时间:2021-07-01 10:21:17 帮助过:18人阅读
from sqlalchemy.ext.declarative import declarative_base # 导入基类
from sqlalchemy import Column, Integer, String # 数据类型
# Base = ORM基类 - 要按照ORM的规则定义你的类
Base = declarative_base()
class Users(Base):
__tablename__ = "user"
# 创建ID数据字段 , 那么ID是不是一个数据列呢? 也就是说创建ID字段 == 创建ID数据列
# id = Column(数据类型,索引,主键,外键,等等)
id = Column(Integer, primary_key=True, autoincrement=True,index=True)
name = Column(String(32), nullable=False) # nullable=False 不能为空
# 打开数据库的连接 -- 创建数据库引擎
from sqlalchemy import create_engine
# 创建数据库引擎
# engine = create_engine("mysql://scott:tiger@hostname/dbname",encoding=‘latin1‘, echo=True)
engine = create_engine("mysql+pymysql://root:@127.0.0.1:3306/SQLAlchemy_Pro?charset=utf8")
Base.metadata.create_all(engine) # Base自动检索所有继承Base的ORM 对象 并且创建所有的数据表
2.单表的增删改查:
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from creatTable import Users
# 创建引擎
engine = create_engine("mysql+pymysql://root:@127.0.0.1:3306/SQLAlchemy_Pro?charset=utf8")
Session = sessionmaker(engine)
db_session = Session()
# 1. 增加数据add(创建表结构的类名(字段名=添加的数据))
db_session.add(Users(name="ZWQ")) # 相当于建立一条添加数据的sql语句
db_session.commit() # 执行
db_session.close() # 结束关闭
# 批量添加
db_session.add_all([Users(name="清风徐来"), Users(name="水波不兴")])
db_session.commit()
db_session.close()
# 2.查询 query(表结构的类名)
sqlres = db_session.query(Users)
print(sqlres) # 直接翻译输出对应的SQL查询语句
res = db_session.query(Users).all() # 返回表中所有数据对象
print(res)# [<creatTable.Users object at 0x00000000038A1B00>,<creatTable.Users object at 0x00000000038A1B70>]
for u in res:
print(u.id, u.name)
res = db_session.query(Users).first() # 取第一个,返回是对象
print(res.id, res.name)
res = db_session.query(Users).filter(Users.id == 3).first() # 返回符合条件查询结果
print(res.name)
res = db_session.query(Users).filter(Users.id <= 2, Users.name == "ZWQ").all() # filter中的条件可以是模糊条件,多个条件
for u in res:
print(u.id,u.name)
# 3.更改数据 update({k:v})
res = db_session.query(Users).filter(Users.id == 1).update({"name":"DragonFire"})
print(res)
db_session.commit()
res = db_session.query(Users).update({"name":"ZWQ"}) # 全部修改,返回修改的数据个数
print(res)
db_session.commit()
# 4.删除 delete()结合查询条件删除
res = db_session.query(Users).filter(Users.id == 1).delete() # 删除否合条件的数据,返回删除数量
print(res)
db_session.commit()
res = db_session.query(Users).delete() # 删除表中所有数据,返回删除数量
print(res)
db_session.commit()
3.创建外键关联的表结构:
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
from sqlalchemy import Column,Integer,String,ForeignKey
from sqlalchemy.orm import relationship
class Student(Base):
__tablename__ = "student"
id = Column(Integer,primary_key=True)
name = Column(String(32),nullable=False)
sch_id = Column(Integer,ForeignKey("school.id")) # 关联的表的字段,表间的关系
stu2sch = relationship("School",backref="sch2stu") # 写在哪边那边就是正向查询,对象间的关系,backref(反向查询)
class School(Base):
__tablename__ = "school"
id = Column(Integer,primary_key=True)
name = Column(String(32),nullable=False)
from sqlalchemy import create_engine
engine = create_engine("mysql+pymysql://root:@127.0.0.1:3306/SQLAlchemy_Pro?charset=utf8")
Base.metadata.create_all(engine)
4、外键关联的表添加与查询操作:
from sqlalchemy.orm import sessionmaker
from creatTableFk import engine
from creatTableFk import Student,School # 导入创建表结构的类
Session = sessionmaker(engine)
db_session = Session()
# 1.添加数据
db_session.add(School(name="NCU"))
db_session.commit()
# relationship 正向添加
stu = Student(name="清风徐来",stu2sch=School(name="NCU"))
db_session.add(stu)
db_session.commit()
# relationship 反向添加
sch = School(name="NCU")
sch.sch2stu = [Student(name="YWB"),Student(name="CT")]
db_session.add(sch)
db_session.commit()
# 2.查询
res = db_session.query(Student).all()
for stu in res:
print(stu.name,stu.stu2sch.name) # 正向跨表
res = db_session.query(School).all()
for sch in res:
for stu in sch.sch2stu:
print(sch.name,stu.name) # 反向跨表
5.多对多的表的创建
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, String, Integer, ForeignKey, create_engine
from sqlalchemy.orm import relationship
Base = declarative_base()
# 多对多关联通过第三张表关联,sqlalchemy要自己创建第三张表
class Girl(Base):
__tablename__ = "girls"
id = Column(Integer, primary_key=True)
name = Column(String(32), nullable=False)
g2b = relationship("Boy", backref="b2g", secondary="hotels")
class Boy(Base):
__tablename__ = "boys"
id = Column(Integer, primary_key=True)
name = Column(String(32), nullable=False)
class Hotel(Base):
__tablename__ = "hotels"
id = Column(Integer, primary_key=True)
boy_id = Column(Integer, ForeignKey("boys.id"), nullable=False)
girl_id = Column(Integer, ForeignKey("girls.id"), nullable=False)
engine = create_engine("mysql+pymysql://root:@127.0.0.1:3306/SQLAlchemy_Pro?charset=utf8")
Base.metadata.create_all(engine)
6.多对多表数据的添加与查询:
from sqlalchemy.orm import sessionmaker
from CreateTableM2M import engine
Session = sessionmaker(engine)
db_session = Session()
from CreateTableM2M import Girl,Boy
# 1.增加数据
# relationship 正向添加
g = Girl(name="ZLY",g2b=[Boy(name="ZWQ"),Boy(name="FSF")])
db_session.add(g)
db_session.commit()
# relationship 反向添加
b = Boy(name="ZS")
b.b2g = [Girl(name="罗玉凤"),Girl(name="娟儿"),Girl(name="芙蓉姐姐")]
db_session.add(b)
db_session.commit()
# 2.查询
# relationship 正向
res = db_session.query(Girl).all()
for g in res:
for b in g.g2b:
print(g.name,b.name)
# relationship 反向
res = db_session.query(Boy).all()
for b in res:
for g in b.b2g:
print(b.name,g.name)
7. 高级ORM操作
1 from CreateTable import Users,engine
2 from sqlalchemy.orm import sessionmaker
3 from CreateTableForeignKey import Student
4
5 Session = sessionmaker(engine)
6 db_session = Session()
7
8 # 查询数据表操作
9 # and or
10 from sqlalchemy.sql import and_ , or_,desc
11 ret = db_session.query(Users).filter(and_(Users.id > 3, Users.name == ‘DragonFire‘)).all()
12 ret = db_session.query(Users).filter(or_(Users.id < 2, Users.name == ‘DragonFire‘)).all()
13
14 ret = db_session.query(Users).filter(
15 or_(
16 Users.id < 2,
17 and_(
18 Users.name == ‘eric‘,
19 Users.id > 3
20 ),
21 Users.name != ""
22 )
23 )
24 print(ret)
25 select * from User where id<2 or (name="eric" and id>3) or extra != ""
26
27
28 # 查询所有数据
29 r1 = db_session.query(User).all()
30
31 查询数据 指定查询数据列 加入别名
32 r2 = db_session.query(Student.name.label(‘username‘), Student.id).first()
33 print(r2.id,r2.username) # 15 NBDragon
34
35 # 表达式筛选条件
36 r3 = db_session.query(User).filter(User.name == "DragonFire").all()
37
38 # 原生SQL筛选条件
39 r4 = db_session.query(User).filter_by(name=‘DragonFire‘).all()
40 r5 = db_session.query(Users).filter_by(name=‘DragonFire‘).first()
41
42 字符串匹配方式筛选条件 并使用 order_by进行排序
43 r6 = db_session.query(Student).order_by(Student.name.desc()).all()
44 for i in r6:
45 print(i.id,i.name)
46
47 原生SQL查询
48 from sqlalchemy.sql import text
49 r7 = db_session.query(User).from_statement(text("SELECT * FROM User where name=:name")).params(name=‘DragonFire‘).all()
50
51 # 筛选查询列
52 # query的时候我们不在使用User ORM对象,而是使用User.name来对内容进行选取
53 user_list = db_session.query(User.name).all()
54 print(user_list)
55 for row in user_list:
56 print(row.name)
57
58 # 别名映射 name as nick
59 user_list = db_session.query(User.name.label("nick")).all()
60 print(user_list)
61 for row in user_list:
62 print(row.nick) # 这里要写别名了
63
64 # 筛选条件格式
65 user_list = db_session.query(User).filter(User.name == "DragonFire").all()
66 user_list = db_session.query(User).filter(User.name == "DragonFire").first()
67 user_list = db_session.query(User).filter_by(name="DragonFire").first()
68 for row in user_list:
69 print(row.nick)
70
71 # 复杂查询
72 from sqlalchemy.sql import text
73 user_list = db_session.query(User).filter(text("id<:value and name=:name")).params(value=3,name="DragonFire")
74
75 # 查询语句
76 from sqlalchemy.sql import text
77 user_list = db_session.query(User).filter(text("select * from User id<:value and name=:name")).params(value=3,name="DragonFire")
78
79 # 排序 :
80 user_list = db_session.query(User).order_by(User.id).all()
81 user_list = db_session.query(User).order_by(User.id.desc()).all()
82 for row