时间:2021-07-01 10:21:17 帮助过:13人阅读
目录
models 文件
- <code># pip install sqlalchemy
- import datetime
- from sqlalchemy import create_engine
- from sqlalchemy.ext.declarative import declarative_base
- from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index
- from sqlalchemy.orm import relationship
- Base = declarative_base()
- class Users(Base):
- __tablename__ = 'users' # 数据库表名称
- id = Column(Integer, primary_key=True) # id 主键
- name = Column(String(32), index=True, nullable=False) # name列,索引,不可为空
- age = Column(Integer)
- #email = Column(String(32), unique=True)
- #datetime.datetime.now#不能加括号,加了括号,以后永远是当前时间
- #ctime = Column(DateTime, default=datetime.datetime.now)
- #extra = Column(Text, nullable=True)
- __table_args__ = (
- # UniqueConstraint('id', 'name', name='uix_id_name'), #联合唯一
- # Index('ix_id_name', 'name', 'email'), #索引
- )
- def __repr__(self):
- return self.name
- class Hobby(Base):
- __tablename__ = "hobby"
- id = Column(Integer,primary_key=True)
- catption =Column(String(50),default="双色球")
- class Person(Base):
- __tablename__ = "person"
- nid = Column(Integer,primary_key=True)
- name = Column(String(32))
- #hobby值tablename而不是Hobby类名,
- hobby_id = Column(Integer,ForeignKey("hobby.id"))
- # 更数据库没有关系,不会新增加字段,只能用于快速的链表查询操作
- #relationship的第一个参数,是类名,第二个参数backref,用于反向查询
- hobby =relationship("Hobby",backref="pres")
- # 一个男孩可以喜欢多个女孩,一个女孩也可以喜欢多个男孩
- class Boy2Girl(Base):
- __tablename__ = "boy2girl"
- id = Column(Integer, primary_key=True)
- girl_id = Column(Integer,ForeignKey("girl.id"))
- boy_id = Column(Integer,ForeignKey("boy.id"))
- class Girl(Base):
- __tablename__ = "girl"
- id = Column(Integer,primary_key=True)
- name = Column(String(100),nullable=False)
- def __repr__(self):
- return self.name
- class Boy(Base):
- __tablename__ = "boy"
- id = Column(Integer, primary_key=True)
- name = Column(String(100), nullable=False)
- #secondary=boy2girl 中间表的表名
- def init_db():
- """
- 根据类创建数据库表
- :return:
- """
- engine = create_engine(
- "mysql+pymysql://root:@127.0.0.1:3307/python13?charset=utf8",
- #"什么数据库(mysql,orcal)+用什么取链接数据库(pymysql)://数据库用户名:密码@mysqlip:端口/数据库名?charset=字符集"
- max_overflow=0, # 超过连接池大小外最多创建的连接
- pool_size=5, # 连接池大小
- pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
- pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
- )
- Base.metadata.create_all(engine)
- def drop_db():
- """
- 根据类删除数据库表
- :return:
- """
- engine = create_engine(
- "mysql+pymysql://root:@127.0.0.1:3307/python13?charset=utf8",
- max_overflow=0, # 超过连接池大小外最多创建的连接
- pool_size=5, # 连接池大小
- pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
- pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
- )
- Base.metadata.drop_all(engine)
- if __name__ == '__main__':
- drop_db()
- init_db()</code>
orm.py
- <code>from sqlalchemy.orm import sessionmaker
- from sqlalchemy import create_engine
- from models import Users
- #"mysql+pymysql://root@127.0.0.1:3306/aaa"
- engine = create_engine("mysql+pymysql://root:@127.0.0.1:3307/python13", max_overflow=0, pool_size=5)
- Connection = sessionmaker(bind=engine)
- # 每次执行数据库操作时,都需要创建一个Connection
- con = Connection()
- # 1 单增
- # obj1 = Users(name="lsb1",age=12)
- # con.add(obj1)
- # 2 多个增加
- # con.add_all([
- # Users(name="lsb1",age=12),
- # Users(name="esb",age=40),
- # Users(name="jsb",age=30),
- # Users(name="tsb",age=12),
- # #Host(name = "tsb",time=123213)
- # ])
- # 3 删除
- # con.query(Users).delete()
- #4 改
- # con.query(Users).update({"name":"sb","age":14})
- # con.query(Users).update({Users.name:Users.name +" is true","age":1},synchronize_session=False)
- # con.query(Users).update({Users.age:Users.age + 10})
- # 5查(查是不需要commit,也能拿到结果)
- #打印sql
- # r1 = con.query(Users)
- #查询所有
- # r1 = con.query(Users).all()
- #
- #查单条记录
- # r1 = con.query(Users).first()
- #查哪些字段
- # r1 = con.query(Users.age,Users.name.label("sb")).first()
- #过滤用filter_by(传参数)或者filter(传表达式)
- # r1 = con.query(Users).filter(Users.name == "tsb").first()
- # con.query(Users).filter(Users.name == "tsb").update({"name": "sb", "age": 14})
- r1 = con.query(Users).filter_by(name = "esb").first()
- print(r1)
- #必须提交才能生效
- con.commit()
- #关闭链接
- con.close()</code>
- <code>from sqlalchemy.orm import sessionmaker
- from sqlalchemy import create_engine
- from models import Users
- #"mysql+pymysql://root@127.0.0.1:3306/aaa"
- engine = create_engine("mysql+pymysql://root:@127.0.0.1:3307/python13", max_overflow=0, pool_size=5)
- Connection = sessionmaker(bind=engine)
- # 每次执行数据库操作时,都需要创建一个Connection
- session = Connection()
- # 条件
- # ret = session.query(Users).filter_by(name = "esb").all()
- #表达式,and 条件链接
- # ret = session.query(Users).filter(Users.name == "sb",Users.age ==14 ).first()
- # print(ret.age,ret.name)
- # 表示的between,条件,30<=age<=40
- # ret = session.query(Users).filter(Users.age.between(30,40)).all()
- # print(ret)
- # sql查询的in_操作,相当于django中的__in
- # ret =session.query(Users).filter(Users.id.in_([9,11,13])).all()
- # print(ret)
- # # sql查询取反
- # ret1 = session.query(Users).filter(~Users.id.in_([9,11,13])).all()
- # print(ret1)
- #or查询 ,or和and ,做整合
- from sqlalchemy import or_,and_
- # ret = session.query(Users).filter(or_(Users.id == 9,Users.name=="jsb")).all()
- # ret = session.query(Users).filter(and_(Users.id == 9,Users.name=="lsb1")).all()
- # ret = session.query(Users).filter(or_(
- # Users.id == 9,
- # and_(Users.name=="jsb",Users.id==13),
- #
- # )
- # ).all()
- # like查询,
- #必须以b开头
- # ret = session.query(Users).filter(Users.name.like("b%")).all()
- # #第二字母是b
- # ret = session.query(Users).filter(Users.name.like("_b%")).all()
- #不以b开头
- # ret = session.query(Users).filter(~Users.name.like("b%")).all()
- #排序
- #desc重大到小排序
- # ret = session.query(Users).filter(Users.id>1).order_by(Users.id.desc()).all()
- #desc重小到大排序
- #ret = session.query(Users).filter(Users.id>1).order_by(Users.id.asc()).all()
- #多条件排序,先以年纪从大到小排,如果年龄相同,再以id从小到大排
- # ret = session.query(Users).filter(Users.id>1).order_by(Users.age.desc(),Users.id.asc()).all()
- # print(ret)
- #分组查询
- # ret = session.query(Users).group_by(Users.name).all()
- # 再分组的时候如果要用聚合操作,就要导入func
- from sqlalchemy.sql import func
- #选出组内最小年龄要大于等于30的组
- # ret = session.query(Users).group_by(Users.name).having(func.min(Users.age)>=30).all()
- #选出组内最小年龄要大于等于30的组,查询组内的最小年龄,最大年纪,年纪之和,
- ret = session.query(
- func.min(Users.age),
- func.max(Users.age),
- func.sum(Users.age),
- Users.name
- ).group_by(Users.name).having(func.min(Users.age)>=30).all()
- print(ret)</code>
- <code>from sqlalchemy.orm import sessionmaker
- from sqlalchemy import create_engine
- from models import Hobby,Person
- #"mysql+pymysql://root@127.0.0.1:3306/aaa"
- engine = create_engine("mysql+pymysql://root:@127.0.0.1:3307/python13", max_overflow=0, pool_size=5)
- Connection = sessionmaker(bind=engine)
- # 每次执行数据库操作时,都需要创建一个Connection
- session = Connection()
- #1添加,没有用关联关系
- # session.add_all([
- # Hobby(catption="淫诗"),
- # Hobby(catption="推背"),
- # Person(name="tank",hobby_id=1),
- # Person(name="jason",hobby_id=2)
- # ])
- # 2添加 用关联关系
- # preson = Person(name="egon",hobby=Hobby(catption="相亲"))
- #session.add(preson)
- #
- # hobb = Hobby(catption="人妖")
- # hobb.pres = [Person(name="owen"),Person(name="sean")]
- # session.add(hobb)
- #session.commit()
- #正向查询
- # pr = session.query(Person).filter( Person.name == "tank").first()
- # print(pr.name)
- # print(pr.hobby.catption)
- #反向查
- # v = session.query(Hobby).filter(Hobby.catption=="人妖").first()
- # print(v.catption)
- # print(v.pres)
- # 自己连表,isouter=True表示是left join,不填默认为inner join
- person_list = session.query( Hobby).join(Person,Person.hobby_id==Hobby.id,isouter=True)
- #
- print(person_list)
- session.close()
- </code>
- <code>from sqlalchemy.orm import sessionmaker
- from sqlalchemy import create_engine
- from models import Boy,Boy2Girl,Girl
- #"mysql+pymysql://root@127.0.0.1:3306/aaa"
- engine = create_engine("mysql+pymysql://root:@127.0.0.1:3307/python13", max_overflow=0, pool_size=5)
- Connection = sessionmaker(bind=engine)
- # 每次执行数据库操作时,都需要创建一个Connection
- session = Connection()
- #添加
- # session.add_all([
- # Boy(name="tank"),
- # Boy(name="sean"),
- # Girl(name="仓老师"),
- # Girl(name="小泽老师")
- # ])
- # b2g = Boy2Girl(boy_id=1,girl_id=2)
- # session.add(b2g )
- # b2g = Boy2Girl(boy_id=2,girl_id=1)
- # session.add(b2g )
- # session.commit()
- # session.close()
- #
- # boy = Boy(name="亚峰")
- # boy.girl=[Girl(name="迪丽热巴"),Girl(name="三上")]
- # session.add(boy)
- # session.commit()
- #
- # girl = Girl(name="丹丹")
- # girl.boys=[Boy(name="吴彦祖"),Boy(name="鹿晗")]
- # session.add(girl)
- # session.commit()
- # 使用relationship的关系,正向查
- # b = session.query(Boy).filter(Boy.name == "亚峰").first()
- # print(b.name)
- # print(b.girl)
- #反向查询
- # g = session.query(Girl).filter(Girl.name=="丹丹").first()
- # print(g.name)
- # print(g.boys)</code>
- <code>要用就必须先安装。
- 所有的到导入都找 下面的db
- from flask_sqlalchemy import SQLAlchemy
- db = SQLAlchemy()</code>
- <code>命令:manager.add_command('db1', MigrateCommand)
- 1 当项目第一次执行迁移的时候。
- python3 manage.py db1 init 只需要初始化一次
- 2 python3 manage.py db1 migrate # 等同于django的makemigrations
- 3 python3 manage.py db1 upgrade # 等同于django的migrate</code>
Flask SQLAlchemy
标签:删除数据库 and 报错 filter str 记录 pytho 过滤 添加