时间:2021-07-01 10:21:17 帮助过:62人阅读
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
engine = create_engine("mysql+pymysql://root:Leon@localhost/study?charset=utf8",encoding='utf-8',echo=True)
Base = declarative_base()
from sqlalchemy import Column, Integer, String
class User(Base):
__tablename__ = 'user' # 表名
# 各个字段
id = Column(Integer, primary_key=True) # 主键
name = Column(String(32))
password = Column(String(64))
def __repr__(self): # 返回数据定义
return "[%s,%s,%s]"%(self.id,self.name,self.password)
Base.metadata.create_all(engine) # 创建表结构
# Column中还可以设定是否唯一:unique=True
# 设置值不为空:nullable=False
# 时间格式为:DateTime
# 枚举值:Column(Enum("1","2","3"))
# 布尔值:Boolean,表现形式为True/False
Session_class = sessionmaker(bind=engine) # 创建与数据库的会话session class ,注意,这里返回给session的是个class,不是实例
session = Session_class() # 生成session实例
#add
user_obj = User(name="Leon", password="123456")
user_obj1 = User(name="张三", password="156")
session.add_all([user_obj,user_obj1]) #添加
# session.add(obj1)这样可以单个的添加
session.comiit() #事务提交
# 返回数据需要在类中的__repr__中设定
session.query(User).filter(User.name=="Leon").first() # 查询第一个
session.query(User).filter(User.name=="Leon").all() # 查询多个,返回一个可循环列表对象
# 多个查询使用多个filter进行过滤
# 也可以在filter中用逗号`,`来and匹配
user_obj = session.query(User).filter(User.name=="Leon").first()
user_obj.password = "NewPassword"
session.comiit()
obj = session.query(User).filter(User.name=="Leon").first()
session.delete(obj)
session.commit()
print(session.query(User).filter(User.name=="Leon").count())
from sqlalchemy import func
print(session.query(func.count(User.name),User.name).group_by(User.name).all())
1.设置外键
+ 场景:一个学生拥有多个上课记录
+ 数据库表结构
class Student(Base):
__tablename__ = "student"
id = Column(Integer,primary_key=True)
name = Column(String(32),nullable=False)
register_data = Column(DATE,nullable=False)
def __repr__(self):
return "%s,%s" % (self.id, self.name)
class StudyRecord(Base):
__tablename__ = 'study_record'
id = Column(Integer, primary_key=True)
day = Column(Integer,nullable=False)
status = Column(String(32),nullable=False)
stu_id = Column(Integer,ForeignKey('student.id'))
# 加上一个关系,在这个表中可以通过student来反查student表中的数据,而在student表中可以通过my_study_record来反查StudyRecord中的数据
student = relationship("Student",backref="my_study_record")
def __repr__(self):
return "%s day:%s status: %s"%(self.student.name,self.day,self.status)
2.在表中查询数据
# 查询某个人的上课记录
stu_obj = session.query(Student).filter(Student.name=="Leon").first()
print(stu_obj.my_study_record) # 在student表中通过my_study_record来反查StudyRecord中的数据,显示跟局repr来定,此处返回字符串
# 连表反向查询,查询没来上课的人
record_obj = session.query(StudyRecord).filter(StudyRecord.status=="NO").first()
print(record_obj.student)
1.多外键
class Customer(Base):
__tablename__ = 'customer'
id = Column(Integer, primary_key=True)
name = Column(String(64))
billing_address_id = Column(Integer, ForeignKey("address.id"))
shipping_address_id = Column(Integer, ForeignKey("address.id"))
# 一个人对应一个账单地址和一个购物地址
address_b = relationship("Address",backref="my_addr_bill",foreign_keys=[billing_address_id])
address_s = relationship("Address",backref="my_addr_shop",foreign_keys=[shipping_address_id])
def __repr__(self):
return "name:%s"%self.name
class Address(Base):
__tablename__ = 'address'
id = Column(Integer, primary_key=True)
street = Column(String(64))
city = Column(String(64))
state = Column(String(64))
def __repr__(self):
return "street:%s|city:%s|state:%s"%(self.street,self.city,self.state)
2.添加数据
a1 = Address(street='望京',city='北京',state="中国")
a2 = Address(street='雁塔',city='西安',state="中国")
a3 = Address(street='太原',city='山西',state="中国")
session.add_all([a1,a2,a3])
c1 = Customer(name="Leon",address_b=a1,address_s=a2)
c2 = Customer(name="eric",address_b=a1,address_s=a3)
session.add_all([c1,c2])
session.commit()
3.在此表中查询数据
# 通过名字直接查询他的两个地址
customer_obj = session.query(Customer).filter(Customer.name=="Leon").first()
print(customer_obj.address_b,customer_obj.address_s)
# 通过地址查询该地址属于谁
Add_obj = session.query(Address).filter(Address.city=="北京").first()
print(Add_obj.my_addr_bill)
1.创建表结构
# 一个作者可以出版很多本书,一本书有很多个作者,直接用第三张表来讲作者表和图书表关联起来
class Author(Base):
__tablename__ = 'authors'
id = Column(Integer, primary_key=True)
name = Column(String(32)) # 作者
def __repr__(self):
return self.name
book_m2m_author = Table('book_m2m_author', Base.metadata,
Column('book_id',Integer,ForeignKey('books.id')),
Column('author_id',Integer,ForeignKey('authors.id')),
)
class Book(Base):
__tablename__ = 'books'
id = Column(Integer,primary_key=True)
name = Column(String(64))
pub_date = Column(DATE) # 出版日期
authors = relationship('Author',secondary=book_m2m_author,backref='books')
def __repr__(self):
return self.name
2.添加数据
# 添加数据
b1 = Book(name='Python',pub_date='2017-08-29')
b2 = Book(name='Linux',pub_date='2011-5-2')
b3 = Book(name='PHP',pub_date='2012-3-9')
b4 = Book(name='Java',pub_date='2015-11-2')
a1 = Author(name="Leon")
a2 = Author(name="Jack")
a3 = Author(name="Rain")
#建立关系
b1.authors=[a1,a2]
b2.authors=[a1,a3]
b3.authors=[a2,a3]
b4.authors=[a1,a2,a3]
Session_class = sessionmaker(bind=engine)
session = Session_class()
session.add_all([b1,b2,b3,b4,a1,a2,a3])
session.commit()
3.数据查询
book_obj = session.query(Book).filter(Book.name=='Python').first()
print(book_obj.authors)
authors_obj = session.query(Author).filter(Author.name=="Leon").first()
print(authors_obj.books)
4.此时删除数据
author_obj =s.query(Author).filter_by(name="Jack").first()
book_obj = s.query(Book).filter_by(name="Python").first()
book_obj.authors.remove(author_obj) #从一本书里删除一个作者
session.commit()
book_m2m_author
表中的数据也会自动更新author_obj =s.query(Author).filter_by(name="Alex").first()
session.delete(author_obj)
session.commit()
Python数据库操作-SQLAlchemy
标签:中国 ping class 返回 定义 == 匹配 设定 时间