SQLAlchemy 安装
泛域名ssl证书 239元1年送1个月、单域名39元1年,Sectigo(原Comodo证书)全球可信证书,强大的兼容性,高度安全性,如有问题7天内可退、可开发票
加微信VX 18718058521 备注SSL证书
【腾讯云】2核2G4M云服务器新老同享99元/年,续费同价
首先保证 python版本是3.9以上
环境安装
pip install flask-sqlalchemy
方案1: 安装 mysqlclient依赖包 (如果失败再尝试方案2)
pip install mysqlclient
方案2: 安装pymysql依赖包
pip install pymysql
mysqlclient 和 pymysql 都是用于mysql访问的依赖包, 前者由C语言实现的, 而后者由python实现, 前者的执行效率比后者更高, 但前者在windows系统中兼容性较差, 工作中建议优先前者
组件初始化
flask-sqlalchemy 的相关配置也封装到了 flask 的配置项中, 可以通过app.config属性 或 配置加载方案 (如config.from_object) 进行设置
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://root:mysql@127.0.0.1:3306/test31'
mysql+pymysql://xxxxxxx
from flask import Flask from flask_sqlalchemy import SQLAlchemy app = Flask(__name__) # 设置数据库连接地址 app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://root:mysql@127.0.0.1:3306/test31' # 是否追踪数据库修改(开启后会触发一些钩子函数) 一般不开启, 会影响性能 app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False # 是否显示底层执行的SQL语句 app.config['SQLALCHEMY_ECHO'] = True
两种初始化方式
方式1
flask-sqlalchemy 支持两种组件初始化方式:
from flask import Flask from flask_sqlalchemy import SQLAlchemy app = Flask(__name__) # 应用配置 app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://root:mysql@127.0.0.1:3306/test31' app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False app.config['SQLALCHEMY_ECHO'] = True # 方式1: 初始化组件对象, 直接关联Flask应用 db = SQLAlchemy(app)
方式2: 先创建组件, 延后关联Flass应用
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
# 方式2: 初始化组件对象, 延后关联Flask应用
db = SQLAlchemy() def create_app(config_type): """工厂函数""" # 创建应用 flask_app = Flask(__name__) # 加载配置 config_class = config_dict[config_type] flask_app.config.from_object(config_class) # 关联flask应用 db.init_app(app) return flask_app
构建模型类
from flask import Flask from flask_sqlalchemy import SQLAlchemy app = Flask(__name__) # 相关配置 app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://root:mysql@127.0.0.1:3306/test31' app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False app.config['SQLALCHEMY_ECHO'] = True # 创建组件对象 db = SQLAlchemy(app) # 构建模型类 类->表 类属性->字段 实例对象->记录 class User(db.Model): __tablename__ = 't_user' # 设置表名, 表名默认为类名小写 id = db.Column(db.Integer, primary_key=True) # 设置主键, 默认自增 name = db.Column('username', db.String(20), unique=True) # 设置字段名 和 唯一约束 age = db.Column(db.Integer, default=10, index=True) # 设置默认值约束 和 索引 if __name__ == '__main__': # 删除所有继承自db.Model的表 db.drop_all() # 创建所有继承自db.Model的表 db.create_all() app.run(debug=True)
注意点
模型类必须继承 db.Model, 其中 db 指对应的组件对象
表名默认为类名小写, 可以通过 __tablename__类属性 进行修改
类属性对应字段, 必须是通过 db.Column() 创建的对象
可以通过 create_all() 和 drop_all()方法 来创建和删除所有模型类对应的表
常用的字段选项
注意点: 如果没有给对应字段的类属性设置default参数, 且添加数据时也没有给该字段赋值, 则sqlalchemy会给该字段设置默认值 None
数据操作
from flask import Flask from flask_sqlalchemy import SQLAlchemy app = Flask(__name__) # 相关配置 app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://root:mysql@127.0.0.1:3306/test31' app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False app.config['SQLALCHEMY_ECHO'] = True # 创建组件对象 db = SQLAlchemy(app) # 构建模型类 class User(db.Model): __tablename__ = 't_user' id = db.Column(db.Integer, primary_key=True) name = db.Column('username', db.String(20), unique=True) age = db.Column(db.Integer, index=True) @app.route('/') def index(): """增加数据""" # 1.创建模型对象 user1 = User(name='zs', age=20) # user1.name = 'zs' # user1.age = 20 # 2.将模型对象添加到会话中 db.session.add(user1) # 添加多条记录 # db.session.add_all([user1, user2, user3]) # 3.提交会话 (会提交事务) # sqlalchemy会自动创建隐式事务 # 事务失败会自动回滚 db.session.commit() return "index" if __name__ == '__main__': db.drop_all() db.create_all() app.run(debug=True)
注意点:
这里的 会话 并不是 状态保持机制中的 session,而是 sqlalchemy 的会话。它被设计为 数据操作的执行者, 从SQL角度则可以理解为是一个 加强版的数据库事务
sqlalchemy 会 自动创建事务, 并将数据操作包含在事务中, 提交会话时就会提交事务
事务提交失败会自动回滚
查询数据
# hm_03_数据查询.py from flask import Flask from flask_sqlalchemy import SQLAlchemy app = Flask(__name__) # 相关配置 app.config["SQLALCHEMY_DATABASE_URI"] = "mysql://root:mysql@127.0.0.1:3306/test31" app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False app.config["SQLALCHEMY_ECHO"] = False db = SQLAlchemy(app) # 自定义类 继承db.Model 对应 表 class User(db.Model): __tablename__ = "users" # 表名 默认使用类名的小写 # 定义类属性 记录字段 id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String(64)) email = db.Column(db.String(64)) age = db.Column(db.Integer) def __repr__(self): # 自定义 交互模式 & print() 的对象打印 return "(%s, %s, %s, %s)" % (self.id, self.name, self.email, self.age) @app.route('/') def index(): """ 查询所有用户数据 查询有多少个用户 查询第1个用户 查询id为4的用户[3种方式] 查询名字结尾字符为g的所有用户[开始 / 包含] 查询名字和邮箱都以li开头的所有用户[2种方式] 查询age是25 或者 `email`以`itheima.com`结尾的所有用户 查询名字不等于wang的所有用户[2种方式] 查询id为[1, 3, 5, 7, 9]的用户 所有用户先按年龄从小到大, 再按id从大到小排序, 取前5个 查询年龄从小到大第2-5位的数据 分页查询, 每页3个, 查询第2页的数据 查询每个年龄的人数 select age, count(name) from t_user group by age 分组聚合 只查询所有人的姓名和邮箱 优化查询 默认使用select * """ return 'index' if __name__ == '__main__': # 删除所有表 db.drop_all() # 创建所有表 db.create_all() # 添加测试数据 user1 = User(name='wang', email='wang@163.com', age=20) user2 = User(name='zhang', email='zhang@189.com', age=33) user3 = User(name='chen', email='chen@126.com', age=23) user4 = User(name='zhou', email='zhou@163.com', age=29) user5 = User(name='tang', email='tang@itheima.com', age=25) user6 = User(name='wu', email='wu@gmail.com', age=25) user7 = User(name='qian', email='qian@gmail.com', age=23) user8 = User(name='liu', email='liu@itheima.com', age=30) user9 = User(name='li', email='li@163.com', age=28) user10 = User(name='sun', email='sun@163.com', age=26) # 一次添加多条数据 db.session.add_all([user1, user2, user3, user4, user5, user6, user7, user8, user9, user10]) db.session.commit() app.run(debug=True)
# 查询所有用户数据 User.query.all() 返回列表, 元素为模型对象 # 查询有多少个用户 User.query.count() # 查询第1个用户 User.query.first() 返回模型对象/None # 查询id为4的用户[3种方式] # 方式1: 根据id查询 返回模型对象/None User.query.get(4) # 方式2: 等值过滤器 关键字实参设置字段值 返回BaseQuery对象 # BaseQuery对象可以续接其他过滤器/执行器 如 all/count/first等 User.query.filter_by(id=4).all() # 方式3: 复杂过滤器 参数为比较运算/函数引用等 返回BaseQuery对象 User.query.filter(User.id == 4).first() # 查询名字结尾字符为g的所有用户[开始 / 包含] User.query.filter(User.name.endswith("g")).all() User.query.filter(User.name.startswith("w")).all() User.query.filter(User.name.contains("n")).all() User.query.filter(User.name.like("w%n%g")).all() # 模糊查询 # 查询名字和邮箱都以li开头的所有用户[2种方式] User.query.filter(User.name.startswith('li'), User.email.startswith('li')).all() from sqlalchemy import and_ User.query.filter(and_(User.name.startswith('li'), User.email.startswith('li'))).all() # 查询age是25 或者 `email`以`itheima.com`结尾的所有用户 from sqlalchemy import or_ User.query.filter(or_(User.age==25, User.email.endswith("itheima.com"))).all() # 查询名字不等于wang的所有用户[2种方式] from sqlalchemy import not_ User.query.filter(not_(User.name == 'wang')).all() User.query.filter(User.name != 'wang').all() # 查询id为[1, 3, 5, 7, 9]的用户 User.query.filter(User.id.in_([1, 3, 5, 7, 9])).all() # 所有用户先按年龄从小到大, 再按id从大到小排序, 取前5个 User.query.order_by(User.age, User.id.desc()).limit(5).all() # 查询年龄从小到大第2-5位的数据 2 3 4 5 User.query.order_by(User.age).offset(1).limit(4).all() # 分页查询, 每页3个, 查询第2页的数据 paginate(页码, 每页条数) pn = User.query.paginate(2, 3) pn.pages 总页数 pn.page 当前页码 pn.items 当前页的数据 pn.total 总条数 # 查询每个年龄的人数 select age, count(name) from t_user group by age 分组聚合 from sqlalchemy import func data = db.session.query(User.age, func.count(User.id).label("count")).group_by(User.age).all() for item in data: # print(item[0], item[1]) print(item.age, item.count) # 建议通过label()方法给字段起别名, 以属性方式获取数据 # 只查询所有人的姓名和邮箱 优化查询 User.query.all() # 相当于select * from sqlalchemy.orm import load_only data = User.query.options(load_only(User.name, User.email)).all() # flask-sqlalchem的语法 for item in data: print(item.name, item.email) data = db.session.query(User.name, User.email).all() # sqlalchemy本体的语法 for item in data: print(item.name, item.email)
更新数据
flask-sqlalchemy 提供了两种更新数据的方案
先查询, 再更新
对应SQL中的 先select, 再update
基于过滤条件的更新 (推荐方案)
对应SQL中的 update xx where xx = xx (也称为 update子查询 )
先查询, 再更新
这种方式的缺点
查询和更新分两条语句, 效率低
如果并发更新, 可能出现更新丢失问题(Lost Update)
from flask import Flask from flask_sqlalchemy import SQLAlchemy app = Flask(__name__) # 相关配置 app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://root:mysql@127.0.0.1:3306/test31' app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False app.config['SQLALCHEMY_ECHO'] = True # 创建组件对象 db = SQLAlchemy(app) # 构建模型类 商品表 class Goods(db.Model): __tablename__ = 't_good' # 设置表名 id = db.Column(db.Integer, primary_key=True) # 设置主键 name = db.Column(db.String(20), unique=True) # 商品名称 count = db.Column(db.Integer) # 剩余数量 @app.route('/') def purchase(): """购买商品""" # 更新方式1: 先查询后更新 # 缺点: 并发情况下, 容易出现更新丢失问题 (Lost Update) # 1.执行查询语句, 获取目标模型对象 goods = Goods.query.filter(Goods.name == '方便面').first() # 2.对模型对象的属性进行赋值 (更新数据) goods.count = goods.count - 1 # 3.提交会话 db.session.commit() return "index" if __name__ == '__main__': # 删除所有继承自db.Model的表 db.drop_all() # 创建所有继承自db.Model的表 db.create_all() # 添加一条测试数据 goods = Goods(name='方便面', count=1) db.session.add(goods) db.session.commit() app.run(debug=True)