当前位置:Gxlcms > 数据库问题 > Python开发:ORM sqlalchemy

Python开发:ORM sqlalchemy

时间:2021-07-01 10:21:17 帮助过:14人阅读

import sqlalchemy
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import sessionmaker

‘‘‘ORM 方式
1、链接数据库
2、创建一个orm基类
3、创建一个表对应的类
4、创建表结构
‘‘‘

engine = create_engine("mysql+pymysql://root:123@172.11.2.109/test2",
                                    encoding=‘utf-8‘)
#echo=True表示打印数据,如果不写,则不打印

Base = declarative_base() #生成orm基类

class User(Base):
    __tablename__ = ‘user‘ #表名
    id = Column(Integer, primary_key=True)
    name = Column(String(32))
    password = Column(String(64))

#Base.metadata.create_all(engine) #创建表结构

Session_class = sessionmaker(bind=engine) #创建与数据库的会话session class ,注意,这里返回给session的是个class,不是实例
Session = Session_class() #生成session实例

user_obj = User(name="alex",password="alex3714") #生成你要创建的数据对象
print(user_obj.name,user_obj.id)  #此时还没创建对象呢,不信你打印一下id发现还是None

Session.add(user_obj) #把要创建的数据对象添加到这个session里, 一会统一创建
print(user_obj.name,user_obj.id) #此时也依然还没创建

Session.commit() #现此才统一提交,创建数据

  

增删改查

 

#生成一个对象,通过对象添加一条数据
obj = Users(name="alex0", extra=‘sb‘)
session.add(obj)
#添加多条数据
session.add_all([
    Users(name="alex1", extra=‘sb‘),
    Users(name="alex2", extra=‘sb‘),
])
session.commit()

  

#过滤后删除
session.query(Users).filter(Users.id > 2).delete()
session.commit()

  

#修改
my_user = Session.query(User).filter_by(name="alex").first()    #筛选出对象
my_user.name = "Alex Li"    #重新赋值
Session.commit()        #提交,只有提交后,数据库才会更新

session.query(Users).filter(Users.id > 2).update({"name" : "099"})
# 直接设置新值
session.query(Users).filter(Users.id > 2).update({Users.name: Users.name + "099"}, synchronize_session=False)
# {Users.name: Users.name + "099"}表示增加增加字符"099",
session.query(Users).filter(Users.id > 2).update({"num": Users.num + 1}, synchronize_session="evaluate")
# 数字相加需要synchronize_session="evaluate"
session.commit()

  

查——原理讲解

#查询
my_user = Session.query(User).filter_by(name="alex")
print(my_user)
‘‘‘my_user为ORM生成的sql语句
SELECT user.id AS user_id, user.name AS user_name, user.password AS user_password
FROM user
WHERE user.name = %(name_1)s
‘‘‘

my_user = Session.query(User).filter_by().all()
print(my_user)
‘‘‘my_user对象地址
[<__main__.User object at 0x00000000040915C0>,
<__main__.User object at 0x0000000004091630>,
<__main__.User object at 0x00000000040916A0>]
‘‘‘

my_user = Session.query(User).filter_by(name="alex").all()
print(‘all >>‘,my_user)
print(my_user[0].id,my_user[0].name,my_user[0].password)
‘‘‘my_user输出一个列表
all >> [<__main__.User object at 0x0000000004092710>]
输出具体值 1 alex alex3714
‘‘‘

my_user = Session.query(User).filter_by(name="alex").first()
print(‘frist >>‘,my_user)
‘‘‘my_user输出一个对象
frist >> <__main__.User object at 0x0000000004092710>
‘‘‘
print(my_user.id,my_user.name,my_user.password)
‘‘‘my_user具体的值
1 alex alex3714
如果查询条件是Session.query(User).filter_by(name="alex").all(),则需要循环打印
‘‘‘

 查——方法列举

obj_1 = session.query(Users).all()
print(obj_1)
#[<id:1,name:alex0>, <id:2,name:alex1>, <id:3,name:alex2>]
# 查看第一个值的方式 print(obj_1[0].id,obj_1[0].name)
obj_2 = session.query(Users.name, Users.extra).all()
print(obj_2)
#直接打印值 [(‘alex0‘, ‘sb‘), (‘alex1‘, ‘sb‘), (‘alex2‘, ‘sb‘)]

obj_3 = session.query(Users).filter_by(name=‘alex1‘).all()
print(obj_3)    #条件查找[<id:2,name:alex1>]

obj_4 = session.query(Users).filter_by(extra=‘sb‘).first()
print(obj_4)    #只显示第一条 <id:1,name:alex0>

  

回滚

my_user = Session.query(User).filter_by(id=1).first()
my_user.name = "Jack"


fake_user = User(name=‘Rain‘, password=‘12345‘)
Session.add(fake_user)

print(Session.query(User).filter(User.name.in_([‘Jack‘,‘rain‘])).all() )
#这时看session里有你刚添加和修改的数据,
# 输出为[<__main__.User object at 0x000000000408A278>, <__main__.User object at 0x0000000004028A58>]
# 但数据库里查不到

Session.rollback() #此时你rollback一下

print(Session.query(User).filter(User.name.in_([‘Jack‘,‘rain‘])).all() )
#再查就发现刚才添加的数据没有了。
# 输出为[]

Session
Session.commit()

  

条件查询

obj_2 = session.query(Users).filter(Users.id > 1, Users.extra ==‘sb‘).all()
print(obj_2)    #[<id:2,name:alex1>, <id:3,name:alex2>]
obj_3 = session.query(Users).filter(Users.id.between(1, 3), Users.extra ==‘sb‘).all()
print(obj_3)    #[<id:1,name:alex0>, <id:2,name:alex1>, <id:3,name:alex2>]
obj_4 = session.query(Users).filter(Users.id.in_([1,3])).all()
print(obj_4)    #id等于1,3的[<id:1,name:alex0>, <id:3,name:alex2>]
obj_5 = session.query(Users).filter(~Users.id.in_([1,3])).all()
print(obj_5)    #id不等于1,3的[<id:2,name:alex1>]
obj_6 = session.query(Users).filter(Users.id.in_(session.query(Users.id).filter_by(name=‘alex0‘))).all()
print(obj_6)    #嵌套过滤[<id:1,name:alex0>]
from sqlalchemy import and_, or_
obj_7 = session.query(Users).filter(and_(Users.id > 3, Users.extra ==‘sb‘)).all()
print(obj_7)    #[]
obj_8 = session.query(Users).filter(or_(Users.id < 2, Users.extra ==‘sb‘)).all()
print(obj_8)    #[<id:1,name:alex0>, <id:2,name:alex1>, <id:3,name:alex2>]
obj_9 = session.query(Users).filter(
    or_(
        Users.id < 2,
        and_(Users.extra ==‘sb‘, Users.id < 3),
        Users.extra != ""
    )).all()
print(obj_9)    #[<id:1,name:alex0>, <id:2,name:alex1>, <id:3,name:alex2>]
(Users.extra ==‘sb‘, Users.id < 3)  #先and查找,然后再or查询

  

通配符

obj_1 = session.query(Users).filter(Users.name.like(‘e%‘)).all()
print(obj_1)    #[]
obj_2 = session.query(Users).filter(~Users.name.like(‘e%‘)).all()
print(obj_2)    #[<id:1,name:alex0>, <id:2,name:alex1>, <id:3,name:alex2>]

  

限制

obj_1 = session.query(Users)[0:1]
print(obj_1)    #打印第2个元素,[<id:1,name:alex0>]
obj_2 = session.query(Users)[1:2]
print(obj_2)    #打印第2个元素,[<id:2,name:alex1>]

  

排序

obj_1 = session.query(Users).order_by(Users.name.desc()).all()
obj_2 = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all()
print(obj_1)    #按名称倒叙 [<id:3,name:alex2>, <id:2,name:alex1>, <id:1,name:alex0>]
print(obj_2)    #先按Users.name.desc()排序,如果相同则按Users.id.asc()排序 [<id:3,name:alex2>, <id:2,name:alex1>, <id:1,name:alex0>]

  

分组

from sqlalchemy.sql import func

obj_1 = session.query(Users).group_by(Users.extra).all()
obj_2 = session.query(
    func.max(Users.id),
    func.sum(Users.id),
    func.min(Users.id)).group_by(Users.name).all()
obj_3 = session.query(
    func.max(Users.id),
    func.sum(Users.id),
    func.min(Users.id)).group_by(Users.name).having(func.min(Users.id) >2).all()
#group_by配合having进行过滤

print(obj_1)    #基于(Users.extra)进行分组  [<id:1,name:alex0>]
print(obj_2)    #[(1, Decimal(‘1‘), 1), (2, Decimal(‘2‘), 2), (3, Decimal(‘3‘), 3)]
print(obj_3)    #[(3, Decimal(‘3‘), 3)]

  

连表

obj_1 = session.query(Users, Favor).filter(Users.id == Favor.nid).all()
obj_2 = session.query(Person.name).join(Favor).all()
obj_3 = session.query(Person.name).join(Favor, isouter=True).all()
print(obj_1)    #通过类的方式,查询两个表,这里没有外键,所以没有join[(<id:1,name:alex0>, 1-girl), (<id:2,name:alex1>, 2-meat), (<id:3,name:alex2>, 3-money)]
print(obj_2)    #通过外键的方式,生成sql语句,进行链表查询。ORM会自己根据外键查找关系[(‘alex‘,), (‘alex‘,), (‘jack‘,), (‘jack‘,)]
print(obj_3)    #isouter=True表示外键left join的方式,如果不加,默认是inner join方式[(‘alex‘,), (‘alex‘,), (‘jack‘,), (‘jack‘,)]

  

组合

q1 = session.query(Users.name).filter(Users.id > 2)
q2 = session.query(Favor.caption).filter(Favor.nid < 2)
obj_1 = q1.union(q2).all()

q1 = session.query(Users.name).filter(Users.id > 2)
q2 = session.query(Favor.caption).filter(Favor.nid < 2)
obj_2 = q1.union_all(q2).all()
#union会去重,union_all不会去重
print(obj_1)    #[(‘alex2‘,), (‘girl‘,)]
print(obj_2)    #[(‘alex2‘,), (‘girl‘,)]

  

  

外键关联

创建表并插入数据

import sqlalchemy
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String,DATE
from sqlalchemy import ForeignKey
from sqlalchemy.orm import relationship

‘‘‘创建表‘‘‘
engine = create_engine("mysql+pymysql://root:123@172.11.2.109/test2",encoding=‘utf-8‘)
#echo=True表示打印数据,如果不写,则不打印

Base = declarative_base() #生成orm基类

class Student(Base):
    __tablename__ = ‘student‘
    id = Column(Integer, primary_key=True)
    name = Column(String(32), nullable=False)
    register = Column(DATE, nullable=False)

    def __repr__(self):
        return "<%s name:%s>" % (self.id,self.name)

class StudyRecord(Base):
    __tablename__ = ‘study_record‘
    id = Column(Integer,primary_key=True)
    day = Column(Integer,nullable=False)
    status = Column(String(32),nullable=False)
    stu_id = Column(Integer,ForeignKey(‘student.id‘))

    student = relationship("Student",backref=‘my_study_record‘)
     #这个nb,允许你在student表里通过backref字段反向查出所有它在StudyRecord表里的关联项
    #student实例指向Student类,数据库里没有student字段
    #my_study_record实例指向StudyRecord类,my_study_record相当于Student类里虚拟的实例

    def __repr__(self):
        return "<%s day:%s status:%s>" % (self.student.name,self.day,self.status)

Base.metadata.create_all(engine) #创建表结构

‘‘‘插入数据‘‘‘

from sqlalchemy.orm import sessionmaker
Session_class = sessionmaker(bind=engine)
session = Session_class()

s1 = Student(name = ‘alex‘,register = ‘2016-04-13‘)
s2 = Student(name = ‘jack‘,register = ‘2016-03-13‘)
s3 = Student(name = ‘pert‘,register = ‘2016-04-23‘)
s4 = Student(name = ‘rain‘,register = ‘2016-05-13‘)

study_obj1 = StudyRecord(day = 1,status=‘YES‘,stu_id=1)
study_obj2 = StudyRecord(day = 2,status=‘YES‘,stu_id=1)
study_obj3 = StudyRecord(day = 3,status=‘YES‘,stu_id=1)
study_obj4 = StudyRecord(day = 4,status=‘YES‘,stu_id=2)


session.add_all([s1,s2,s3,s4,study_obj1,study_obj2,study_obj3,study_obj4])    #添加数据
session.commit()      #提交

  

查询

stu_obj = session.query(Student).filter(Student.name == ‘alex‘).first()
print(stu_obj)
‘‘‘
输出结果
<1 name:alex>
‘‘‘
print(stu_obj.my_study_record)
‘‘‘
输出结果
[<alex day:1 status:YES>, <alex day:2 status:YES>, <alex day:3 status:YES>]
‘‘‘

  

多外键关联

下表中,Customer表有2个字段都关联了Address表 

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 from sqlalchemy import Integer, ForeignKey, String, Column from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import relationship   Base = declarative_base()   class Customer(Base):     __tablename__ = ‘customer‘     id = Column(Integer, primary_key=True)     name = Column(String)       billing_address_id = Column(Integer, ForeignKey("address.id"))     shipping_address_id = Column(Integer, ForeignKey("address.id"))       billing_address = relationship("Address"     shipping_address = relationship("Address")   class Address(Base):     __tablename__ = ‘address‘     id = Column(Integer, primary_key=True)     street = Column(String)     city = Column(String)     state = Column(String)

创建表结构是没有问题的,但你Address表中插入数据时会报下面的错

1 2 3 4 5 6 sqlalchemy.exc.AmbiguousForeignKeysError: Could not determine join condition between parent/child tables on relationship Customer.billing_address - there are multiple foreign key paths linking the tables.  Specify the ‘foreign_keys‘ argument, providing a list of those columns which should be counted as containing a foreign key reference to the parent table.

解决办法如下

1 2 3 4 5 6 7 8 9 10 class Customer(Base):     __tablename__ = ‘customer‘     id = Column(Integer, primary_key=True)     name = Column(String)       billing_address_id = Column(Integer, ForeignKey("address.id"))     shipping_address_id = Column(Integer, ForeignKey("address.id"))       billing_address = relationship("Address", foreign_keys=[billing_address_id])     shipping_address = relationship("Address", foreign_keys=[shipping_address_id])

这样sqlachemy就能分清哪个外键是对应哪个字段了

 

多对多关系

现在来设计一个能描述“图书”与“作者”的关系的表结构,需求是

  1. 一本书可以有好几个作者一起出版
  2. 一个作者可以写好几本书

此时你会发现,用之前学的外键好像没办法实现上面的需求了,因为

技术分享

当然你更不可以像下面这样干,因为这样就你就相当于有多条书的记录了,太low b了,改书名还得都改。。。

技术分享

 

那怎么办呢? 此时,我们可以再搞出一张中间表,就可以了

技术分享

这样就相当于通过book_m2m_author表完成了book表和author表之前的多对多关联

用orm如何表示呢?

#一本书可以有多个作者,一个作者又可以出版多本书


from sqlalchemy import Table, Column, Integer,String,DATE, ForeignKey
from sqlalchemy.orm import relationship
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

#链接数据库
engine = create_engine(‘mysql+pymysql://root:123@172.11.2.109/test2?charset=utf8‘, max_overflow=5)

#创建基类
Base = declarative_base()

#定义类(表)
‘‘‘
第三张表,不用访问,所以用Table的方式创建
这里也不用自增id
通过Base.metadata方法创建表
‘‘‘
book_m2m_author = Table(‘book_m2m_author‘, Base.metadata,
                        Column(‘book_id‘,Integer,ForeignKey(‘books.id‘)),
                        Column(‘author_id‘,Integer,ForeignKey(‘authors.id‘)),
                        )

class Book(Base):
    __tablename__ = ‘books‘
    id = Column(Integer,primary_key=True)
    name = Column(String(64))
    pub_date = Column(DATE)
    #通过第3张表连接,secondary=book_m2m_author指向第三张关系表
    #注意:secondary=book_m2m_author这块是表名,而不是类名
    authors = relationship(‘Author‘,secondary=book_m2m_author,backref=‘books‘)

    def __repr__(self):
        return self.name

class Author(Base):
    __tablename__ = ‘authors‘
    id = Column(Integer, primary_key=True)
    name = Column(String(32))

    def __repr__(self):
        return self.name

Base.metadata.create_all(engine) #创建表结构

from sqlalchemy.orm import sessionmaker
Session = sessionmaker(bind=engine)
session = Session()

b1 = Book(name = ‘learn python with alex‘,pub_date = "2015-04-26")
b2 = Book(name = ‘learn zhuangbility with alex‘,pub_date = "2016-04-26")
b3 = Book(name = ‘跟alex学撩妹‘,pub_date = "2015-07-26")

a1 = Author(name=‘alex‘)
a2 = Author(name=‘jack‘)
a3 = Author(name=‘rain‘)

session.add_all([a1,a2,a3,b1,b2,b3])

#通过实例关联两张表
#先创建对象session.add_all([a1,a2,a3,b1,b2,b3]),再做关联,否则对应表有误
b3.authors=[a1,a2,a3]
b1.authors=[a1,a3]

session.commit()


  

删除

#删除
‘‘‘
多对多删除
删除数据时不用管boo_m2m_authors , sqlalchemy会自动帮你把对应的数据删除
‘‘‘
# 通过书删除作者(删除一条关联)
author_obj =session.query(Author).filter_by(name="Jack").first()
book_obj = session.query(Book).filter_by(name="跟Alex学把妹").first()
book_obj.authors.remove(author_obj) #从一本书里删除一个作者,这里是通过类删除
session.commit()

# 直接删除作者 (删除表里的一条信息,关联表里的数据会自动删除)
# 删除作者时,会把这个作者跟所有书的关联关系数据也自动删除
author_obj =session.query(Author).filter_by(name="Alex").first()
# print(author_obj.name , author_obj.books)
session.delete(author_obj)      #这里是数据库操作
session.commit()

  

 

 

 

Python开发:ORM sqlalchemy

标签:bind   表关联   cx_oracle   ati   log   line   const   嵌套   ble   

人气教程排行