由于业务需求,抓取数据后直接由python端入库,然后为了效率考虑多线程肯定是基本操作啦,之前为了提升抓取速度已经用上了异步,将速度由原来的20多s提到1s然后速度太快导致被滑块验证拦截,于是加上了代理IP。
今天用我之前fastapi操作MySQL那套去操作数据库的时候,发现多线程并不好用,各种报错
经过百度,发现了一篇
sqlalchemy 多线程 创建session:https://blog.csdn.net/Gragon_Shao/article/details/112786197
于是我把我之前用的那套模板改了一下
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, scoped_session
from Config import config
if config.DBType == 'sqlite':
# 使用SQLite数据库
SQLALCHEMY_DATABASE_URL = f"sqlite:///{config.DataBase}"
engine = create_engine(SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False})
SessionLocal = sessionmaker(autocommit=False, autoflush=True, bind=engine)
else:
# 使用MySQL数据库
SQLALCHEMY_DATABASE_URL = f"mysql+pymysql://{config.UserName}:{config.Password}@{config.Host}:{config.Port}/{config.DataBase}"
engine = create_engine(SQLALCHEMY_DATABASE_URL, pool_pre_ping=True, pool_size=5, pool_timeout=30, pool_recycle=1)
SessionLocal = sessionmaker(autocommit=False, autoflush=True, bind=engine)
session = scoped_session(SessionLocal)
Base = declarative_base()
def get_db():
db = session
try:
yield db
finally:
db.remove()
在就是直接去调用session就可以了,使用完之后再remove
def test(task, db: scoped_session = session):
db.query(ta).filter(ta.id == task.id).delete()
db.add(ta(**task.to_dict()))
db.commit()
db.remove()
评论 (0)