一、概述1.1 Python操作MySQL的常用库Python操作MySQL数据库有多种选择各有特点库名特点适用场景PyMySQL纯Python实现安装简单兼容性好通用场景无需编译mysql-connector-pythonMySQL官方驱动功能完整企业级应用SQLAlchemyORM框架提供高层次抽象复杂业务需要对象映射aiomysql异步IO支持高并发异步应用MySQLdbC扩展实现性能高对性能要求高的场景1.2 环境准备bash# 安装PyMySQL pip install pymysql # 安装mysql-connector-python pip install mysql-connector-python # 安装SQLAlchemy通常配合PyMySQL使用 pip install sqlalchemy pymysql # 验证安装 python -c import pymysql; print(pymysql.__version__)二、PyMySQL基础使用2.1 连接数据库pythonimport pymysql # 方式1使用参数 connection pymysql.connect( hostlocalhost, # 数据库主机地址 port3306, # 端口号默认3306 userroot, # 数据库用户名 passwordyour_password, # 数据库密码 databaseschool, # 数据库名称 charsetutf8mb4, # 字符集 cursorclasspymysql.cursors.DictCursor # 返回字典格式结果 ) # 方式2使用字典参数 config { host: localhost, port: 3306, user: root, password: your_password, database: school, charset: utf8mb4, cursorclass: pymysql.cursors.DictCursor } connection pymysql.connect(**config) # 关闭连接 connection.close()2.2 使用上下文管理器pythonimport pymysql # 推荐使用with语句自动管理连接 with pymysql.connect( hostlocalhost, userroot, passwordyour_password, databaseschool, charsetutf8mb4 ) as connection: with connection.cursor() as cursor: cursor.execute(SELECT VERSION()) version cursor.fetchone() print(fMySQL版本: {version[0]})2.3 连接池实现pythonimport pymysql from contextlib import contextmanager from queue import Queue import threading class MySQLConnectionPool: 简单的MySQL连接池实现 def __init__(self, max_connections10, **db_config): self.max_connections max_connections self.db_config db_config self._pool Queue(maxsizemax_connections) self._lock threading.Lock() # 初始化连接池 for _ in range(max_connections): self._pool.put(self._create_connection()) def _create_connection(self): 创建新连接 return pymysql.connect(**self.db_config) contextmanager def get_connection(self): 获取连接上下文管理器 conn self._pool.get() try: yield conn finally: self._pool.put(conn) def close_all(self): 关闭所有连接 while not self._pool.empty(): conn self._pool.get() conn.close() # 使用示例 pool MySQLConnectionPool( max_connections5, hostlocalhost, userroot, passwordyour_password, databaseschool, charsetutf8mb4 ) with pool.get_connection() as conn: with conn.cursor() as cursor: cursor.execute(SELECT * FROM students LIMIT 10) results cursor.fetchall() print(results) # 程序结束时关闭连接池 pool.close_all()三、数据操作CRUD3.1 创建数据库和表pythonimport pymysql def init_database(): 初始化数据库和表 connection pymysql.connect( hostlocalhost, userroot, passwordyour_password, charsetutf8mb4 ) try: with connection.cursor() as cursor: # 创建数据库 cursor.execute(CREATE DATABASE IF NOT EXISTS school) cursor.execute(USE school) # 创建学生表 cursor.execute( CREATE TABLE IF NOT EXISTS students ( id INT AUTO_INCREMENT PRIMARY KEY, name VARCHAR(50) NOT NULL, age TINYINT UNSIGNED, gender ENUM(男, 女, 保密) DEFAULT 保密, score DECIMAL(5,2) DEFAULT 0.00, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) ) # 创建班级表 cursor.execute( CREATE TABLE IF NOT EXISTS classes ( id INT AUTO_INCREMENT PRIMARY KEY, class_name VARCHAR(50) NOT NULL, grade VARCHAR(20) ) ) connection.commit() print(数据库和表创建成功) finally: connection.close() if __name__ __main__: init_database()3.2 插入数据INSERTpythonimport pymysql def insert_single_record(): 插入单条记录 connection pymysql.connect( hostlocalhost, userroot, passwordyour_password, databaseschool, charsetutf8mb4 ) try: with connection.cursor() as cursor: # 方式1直接拼接不推荐有SQL注入风险 # cursor.execute(fINSERT INTO students (name, age) VALUES ({name}, {age})) # 方式2参数化查询推荐防止SQL注入 sql INSERT INTO students (name, age, gender, score) VALUES (%s, %s, %s, %s) cursor.execute(sql, (张三, 18, 男, 85.5)) connection.commit() print(f插入成功影响行数: {cursor.rowcount}) print(f自增ID: {cursor.lastrowid}) finally: connection.close() def insert_multiple_records(): 批量插入多条记录 connection pymysql.connect( hostlocalhost, userroot, passwordyour_password, databaseschool, charsetutf8mb4 ) try: with connection.cursor() as cursor: sql INSERT INTO students (name, age, gender, score) VALUES (%s, %s, %s, %s) data [ (李四, 19, 男, 92.0), (王芳, 18, 女, 88.5), (赵强, 20, 男, 76.0), (孙丽, 19, 女, 95.5) ] # executemany用于批量插入 cursor.executemany(sql, data) connection.commit() print(f批量插入成功影响行数: {cursor.rowcount}) finally: connection.close() def insert_or_update(): 插入或更新ON DUPLICATE KEY UPDATE connection pymysql.connect( hostlocalhost, userroot, passwordyour_password, databaseschool, charsetutf8mb4 ) try: with connection.cursor() as cursor: # 假设id1的记录已存在 sql INSERT INTO students (id, name, age, score) VALUES (%s, %s, %s, %s) ON DUPLICATE KEY UPDATE name VALUES(name), age VALUES(age), score VALUES(score) cursor.execute(sql, (1, 张三更新, 19, 90.0)) connection.commit() print(f插入/更新成功) finally: connection.close() if __name__ __main__: insert_single_record() insert_multiple_records()