悠悠楠杉
Python连接MySQL数据库实战:PyMySQL详细使用教程
一、为什么选择PyMySQL?
在Python生态中连接MySQL数据库主要有三种方式:
1. MySQL官方驱动:mysql-connector-python
2. ORM工具:SQLAlchemy、Django ORM
3. 纯Python实现:PyMySQL
PyMySQL因其纯Python实现、兼容性好、API简洁等特点,成为轻量级项目的首选。它完全兼容PEP 249标准,支持Python3.x全系列版本,特别适合需要快速开发数据库应用但不想引入重型ORM的场景。
二、环境准备与安装
1. 安装PyMySQL
bash
pip install pymysql
2. 创建测试数据库
sql
CREATE DATABASE pymysql_test;
USE pymysql_test;
CREATE TABLE users (
id INT AUTO_INCREMENT PRIMARY KEY,
username VARCHAR(50) NOT NULL,
email VARCHAR(100) UNIQUE,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
三、基础连接与操作
1. 建立数据库连接
python
import pymysql
基础连接方式
connection = pymysql.connect(
host='localhost',
user='yourusername',
password='yourpassword',
database='pymysql_test',
charset='utf8mb4',
cursorclass=pymysql.cursors.DictCursor # 返回字典格式结果
)
try:
with connection.cursor() as cursor:
# 执行SQL查询
cursor.execute("SELECT VERSION()")
result = cursor.fetchone()
print(f"MySQL Server version: {result['VERSION()']}")
finally:
connection.close()
2. 使用上下文管理器优化连接
python
def getconnection():
return pymysql.connect(
host='localhost',
user='root',
password='secret',
database='pymysqltest'
)
自动管理连接生命周期
with get_connection() as conn:
with conn.cursor() as cursor:
cursor.execute("SELECT * FROM users LIMIT 5")
for row in cursor.fetchall():
print(row)
四、CRUD操作实战
1. 插入数据(Create)
python
def insertuser(username, email):
with getconnection() as conn:
with conn.cursor() as cursor:
sql = "INSERT INTO users (username, email) VALUES (%s, %s)"
cursor.execute(sql, (username, email))
conn.commit() # 必须显式提交事务
批量插入效率更高
data = [('user1', 'user1@test.com'), ('user2', 'user2@test.com')]
with get_connection() as conn:
with conn.cursor() as cursor:
cursor.executemany("INSERT INTO users (username, email) VALUES (%s, %s)", data)
conn.commit()
2. 查询数据(Read)
python
def getuserbyemail(email):
with getconnection() as conn:
with conn.cursor() as cursor:
cursor.execute("SELECT * FROM users WHERE email = %s", (email,))
return cursor.fetchone() # 获取单条记录
分页查询示例
def getuserspaginated(page=1, perpage=10):
offset = (page - 1) * perpage
with getconnection() as conn:
with conn.cursor() as cursor:
cursor.execute("SELECT * FROM users ORDER BY id DESC LIMIT %s OFFSET %s",
(perpage, offset))
return cursor.fetchall()
3. 更新与删除(Update/Delete)
python
def updateusername(userid, newusername):
with getconnection() as conn:
with conn.cursor() as cursor:
cursor.execute(
"UPDATE users SET username = %s WHERE id = %s",
(newusername, userid)
)
conn.commit()
def deleteuser(userid):
with getconnection() as conn:
with conn.cursor() as cursor:
cursor.execute("DELETE FROM users WHERE id = %s", (userid,))
conn.commit()
五、高级功能应用
1. 事务处理
python
def transfer_points(from_user, to_user, points):
try:
with get_connection() as conn:
with conn.cursor() as cursor:
# 扣减转出方积分
cursor.execute(
"UPDATE user_points SET points = points - %s WHERE user_id = %s",
(points, from_user)
)
# 增加接收方积分
cursor.execute(
"UPDATE user_points SET points = points + %s WHERE user_id = %s",
(points, to_user)
)
conn.commit()
except Exception as e:
conn.rollback() # 发生异常时回滚
print(f"Transaction failed: {e}")
2. 防止SQL注入
PyMySQL使用参数化查询自动防护SQL注入:python
错误示范(不要这样做!)
cursor.execute(f"SELECT * FROM users WHERE username = '{username}'")
正确做法
cursor.execute("SELECT * FROM users WHERE username = %s", (username,))
3. 连接池优化(推荐生产环境使用)
python
from DBUtils.PooledDB import PooledDB
创建连接池
pool = PooledDB(
creator=pymysql,
maxconnections=5,
host='localhost',
user='root',
password='secret',
database='pymysql_test'
)
从连接池获取连接
def querywithpool():
conn = pool.connection()
try:
with conn.cursor() as cursor:
cursor.execute("SELECT COUNT(*) FROM users")
return cursor.fetchone()
finally:
conn.close()
六、常见问题与解决方案
连接超时问题:
python connect_timeout=10 # 连接参数中添加超时设置
字符编码问题:
python charset='utf8mb4' # 支持完整的Unicode字符(包括emoji)
大结果集处理:python
使用SScursor流式游标
cursor = conn.cursor(pymysql.cursors.SSCursor)
连接丢失重连:
python conn.ping(reconnect=True) # 定期检查连接状态