需求场景:
使用sqlalchmy从现有的表中获取数据(不是自己建表)。百度了一下,网上都是使用sqlalchemy自己先创建表,然后导入数据表的模型类进行增删改查;现在不是自己建表,该如何操作呢?
操作方案
通过sqlalchmey执行原生的sql语句,增删改查的原生语句携带表名,就不需要导入数据表的模型类了。
使用的包:
SQLAlchemy (1.3.10) + mysql-connector-python (8.0.19)
提供以下干货:
- 演示了向原生sql语句传递变量的用法 即动态执行sql语句 更加灵活
- 通过执行原生的sql语句实现操作已有的表
- 演示了sql语句根据多字段排序的方法等
DEMO
# -*- coding:utf-8 -*- from sqlalchemy import create_engine,MetaData,Table,exists from sqlalchemy.orm import sessionmaker, scoped_session from util.Log import Log from conf.parseConfig import parseConf # 从配置文件中获取mysql的配置信息 host = parseConf.get_conf('MySQLInfo', 'host') port = parseConf.get_conf('MySQLInfo', 'port') dbname = parseConf.get_conf('MySQLInfo', 'dbname') usernm = parseConf.get_conf('MySQLInfo', 'usernm') passwd = parseConf.get_conf('MySQLInfo', 'passwd') engine_str = "mysql+mysqlconnector://{0}:{1}@{2}:{3}/{4}".format(usernm, passwd, host, port, dbname) class OpsMysql(object): def __init__(self, log=Log(__file__).getlog()): self.log = log self.session = None try: self.engine = create_engine( engine_str, max_overflow=0, # 超过连接池大小外最多创建的连接 pool_size=5, # 连接池大小 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错 pool_recycle=-1, # 多久之后对线程池中的线程进行一次连接的回收(重置) # echo=True, # 显示相应执行的 sql 指令 encoding='utf-8' ) SessionFactory = sessionmaker(bind=self.engine) self.session = scoped_session(SessionFactory) except Exception as e: self.log.error(str(e)) self.log.error("Connect {0}@{1}:{2} failed!".format(dbname, host, port)) def get_session(self): return self.session def getEngine(self): return self.engine def init_db(self, base): base.metadata.create_all(self.engine) def drop_db(self, base): base.metadata.drop_all(self.engine) if __name__ == "__main__": log = Log(__file__).getlog() tt = OpsMysql(log) session = tt.get_session() if session: # 通过执行原生的sql语句实现操作已有的表 # 此处演示了向原生sql语句传递变量的用法 即动态执行sql语句 更加灵活 mail_id = 1 res = session.execute('select * from tbl_mail_addr where mail_id='" + mail_id + "' and mail_tp="c"') res01 = res.fetchall() # 结果是列表 print(res01[0]) # (1, 'c', 1, 'XX@u163.com') mail_id = '1' mail_type = 'c' # 查询id为1,类型是c的邮箱信息,并按mail_tp降序,addr_no升序排列,限制查询数量100 sql = "select * from tbl_mail_addr where tbl_mail_addr.oper_flag='1' and tbl_mail_addr.mail_id='" + mail_id + "' and tbl_mail_addr.mail_tp='" + mail_type + "' order by tbl_mail_addr.mail_tp, tbl_mail_addr.addr_no ASC limit 100" result = session.execute(sql) value_list = result.fetchall() print(value_list) # [(1, 'c', 1, 'XX@163.com'), (1, 'c', 2, 'XX@qq.com), (1, 'c', 3, 'XX@qq.com')] session.close()
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。