sqlalchemy学习

发布时间:2017-2-25 10:30:32 编辑:www.fx114.net 分享查询网我要评论
本篇文章主要介绍了"sqlalchemy学习 ",主要涉及到sqlalchemy学习 方面的内容,对于sqlalchemy学习 感兴趣的同学可以参考一下。

sqlalchemy官网API参考

原文作为一个Pythoner,不会SQLAlchemy都不好意思跟同行打招呼!

#作者:笑虎#链接:https://zhuanlan.zhihu.com/p/23190728#来源:知乎#著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。# _*_ coding: utf-8 _*_import sqlalchemyimport sqlalchemy.ormimport sqlalchemy.ext.declarative# 利用数据库字符串构造engine, echo为True将打印所有的sql语句, 其他数据库的链接方式可自行百度# engine = sqlalchemy.create_engine("mysql+pymysql://username:[email protected]/dbname", encoding="utf8", echo=True)engine = sqlalchemy.create_engine("mysql+pymysql://dba_0:[email protected]/data_secret", encoding="utf8", echo=False)"""# 利用engine创建connection,因为使用了with所以不需要close操作,这部分不是重点with engine.connect() as conn:    # 最基础的用法    result = conn.execute("select * from tablename limit 10;")    for item in result:        print(item)    # execute的几种用法,这里具体还是得参考pymysql的用法,不需要执行commit操作    conn.execute("insert into tablename(id, url, title) values(1, 'url1', 'title1');")    conn.execute("insert into tablename(id, url, title) values(%s, %s, %s);", 2, "url2", "title2")    conn.execute("insert into tablename(id, url, title) values(%s, %s, %s)", (3, "url3", "title3"))    conn.execute("insert into tablename(id, url, title) values(%s, %s, %s)", [(31, "url31", "title31"), (32, "url32", "title32")])    # 使用事务可以进行批量提交和回滚    trans = conn.begin()    try:        conn.execute("insert into tablename(id, url, title) values(%s, %s, %s)", [(4, "url4", "title4"), (5, "url5", "title5")])        trans.commit()    except Exception as excep:        trans.rollback()        raise    trans.close()"""# 首先需要生成一个BaseModel类,作为所有模型类的基类BaseModel = sqlalchemy.ext.declarative.declarative_base()# 构建数据模型Userclass User(BaseModel):    __tablename__ = "Users"         # 表名    __table_args__ = {        "mysql_engine": "InnoDB",   # 表的引擎        "mysql_charset": "utf8",    # 表的编码格式    }    # 表结构,具体更多的数据类型自行百度    id = sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True, autoincrement=True)    name = sqlalchemy.Column("name", sqlalchemy.String(50), nullable=False)    age = sqlalchemy.Column("age", sqlalchemy.Integer, default=0)    # 添加角色id外键(关联到Role.id属性)    role_id = sqlalchemy.Column("role_id", sqlalchemy.Integer, sqlalchemy.ForeignKey("Roles.id"))    # 添加关系属性(关联到role_id外键上)    role = sqlalchemy.orm.relationship("Role", foreign_keys="User.role_id")    # 添加关系属性(关联到role_id外键上),如果使用了这种方式,Role模型中的users可以省略    # role = sqlalchemy.orm.relationship("Role", foreign_keys="User.role_id", backref=sqlalchemy.orm.backref("users"))# 构建数据模型Roleclass Role(BaseModel):    __tablename__ = "Roles"         # 表名    __table_args__ = {        "mysql_engine": "InnoDB",   # 表的引擎        "mysql_charset": "utf8",    # 表的编码格式    }    # 表结构,具体更多的数据类型自行百度    id = sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True)    name = sqlalchemy.Column("name", sqlalchemy.String(50), unique=True)    # 添加关系属性(关联到User.role_id属性上)    users = sqlalchemy.orm.relationship("User", foreign_keys="User.role_id")# 利用Session对象连接数据库DBSessinon = sqlalchemy.orm.sessionmaker(bind=engine)   # 创建会话类session = DBSessinon()                                  # 创建会话对象# 删除所有表BaseModel.metadata.drop_all(engine)# 创建所有表,如果表已经存在,则不会创建BaseModel.metadata.create_all(engine)try:    # 清空数据,不需要commit操作    session.query(User).filter(User.id != -1).delete()    session.query(Role).filter(Role.id != -1).delete()    # 删除数据的另外一种形式:session.delete()    # 插入数据,这里的一个实例只插入一次,第二次插入不生效    session.add(Role(id=1, name="student"))    session.add(Role(id=2, name="teacher"))    session.commit()    session.add(User(name="James", age=20, role_id=1))    session.add(User(name="Wade", age=40, role_id=2))    session.commit()    user = User(name="Kobe", age=24, role_id=1)    session.add(user)    session.commit()    # 修改数据    user.name = "Allen"    session.merge(user)                         # 使用merge方法,如果存在则修改,如果不存在则插入    session.query(User).filter(User.id == user.id).update({User.name: "Allen"})         # 使用update方法    session.query(User).filter(User.id == user.id).update({User.age: User.age + 1})     # 使用update方法,自增操作    # 查询数据    roles = session.query(Role)                 # 返回全部结果    for role in roles:        print("Role:", role.id, role.name)    users = session.query(User)                 # 返回全部结果    for user in users:        print("User:", user.id, user.name, user.age, user.role_id)    # 其他获取数据的方式    print("get(id):", session.query(User).get(1))                       # 返回结果集中id为1的项    print("get[1:3]:", session.query(User)[1:3])                        # 返回结果集中的第2-3项    # 其他高级查询,这里以Users表为例    users = session.query(User).filter(User.id > 6)                     # 条件查询    users = session.query(User).filter(User.id > 6).all()               # 条件查询,返回查询的全部数据    user = session.query(User).filter(User.id > 6).first()              # 条件查询,返回查询数据的第一项    users = session.query(User).filter(User.id > 6).limit(10)           # 条件查询,返回最多10条数据    users = session.query(User).filter(User.id > 6).offset(2)           # 条件查询,从第3条数据开始返回    users = session.query(User).filter(User.id > 6, User.name == "Kobe")                    # 条件查询,and操作    users = session.query(User).filter(User.id > 6).filter(User.name == "Kobe")             # 条件查询,and操作    users = session.query(User).filter(sqlalchemy.or_(User.id > 6, User.name == "Kobe"))    # 条件查询,or操作    users = session.query(User).filter(User.id.in_((1, 2)))                                 # 条件查询,in操作    users = session.query(User).filter(sqlalchemy.not_(User.name))                          # 条件查询,not操作    user_count = session.query(User.id).count()                                             # 统计全部user的数量    user_count = session.query(sqlalchemy.func.count(User.id)).scalar()                     # scalar操作返回第一行数据的第一个字段    session.query(sqlalchemy.func.count("*")).select_from(User).scalar()                    # scalar操作返回第一行数据的第一个字段    session.query(sqlalchemy.func.count(1)).select_from(User).scalar()                      # scalar操作返回第一行数据的第一个字段    session.query(sqlalchemy.func.count(User.id)).filter(User.id > 0).scalar()              # filter() 中包含 User,因此不需要指定表    session.query(sqlalchemy.func.sum(User.age)).scalar()                                   # 求和运算,运用scalar函数    session.query(sqlalchemy.func.avg(User.age)).scalar()                                   # 求均值运算,运用scalar函数    session.query(sqlalchemy.func.md5(User.name)).filter(User.id == 1).scalar()             # 运用md5函数    users = session.query(sqlalchemy.distinct(User.name))               # 去重查询,根据name进行去重    users = session.query(User).order_by(User.name)                     # 排序查询,正序查询    users = session.query(User).order_by(User.name.desc())              # 排序查询,倒序查询    users = session.query(User).order_by(sqlalchemy.desc(User.name))    # 排序查询,倒序查询的另外一种形式    users = session.query(User.id, User.name)                           # 只查询部分属性    users = session.query(User.name.label("user_name"))                 # 结果集的列取别名    for user in users:        print("label test:", user.user_name)                            # 这里使用别名    users = session.query(sqlalchemy.func.count(User.name).label("count"), User.age).group_by(User.age)    # 分组查询    for user in users:        print("age:{0}, count:{1}".format(user.age, user.count))    # 多表查询    result = session.query(User, Role).filter(User.role_id == Role.id)    for user, role in result:        print("user %s's role is %s" % (user.name, role.name))    users = session.query(User).join(Role, User.role_id == Role.id)    for user in users:        print("user join, name:", user.name)    # 关联属性的用法    roles = session.query(Role)    for role in roles:        print("role:%s users:" % role.name)        for user in role.users:            print("\t%s" % user.name)    users = session.query(User)    for user in users:        print("user %s's role is %s" % (user.name, user.role.name))except Exception as excep:    session.rollback()    raisesession.close()



上一篇:CF576E
下一篇:使用PHP的GD2裁剪 + 缩放图片

相关文章

关键词: sqlalchemy学习

相关评论

本站评论功能暂时取消,后续此功能例行通知。

一、不得利用本站危害国家安全、泄露国家秘密,不得侵犯国家社会集体的和公民的合法权益,不得利用本站制作、复制和传播不法有害信息!

二、互相尊重,对自己的言论和行为负责。