Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others


python - SQLAlchemy proper session handling in multi-thread applications

I have trouble understanding how to open and close database sessions properly and efficiently. As I understand the SQLAlchemy documentation, if I use scoped_session to construct my Session object and then use the returned Session object to create sessions, it is thread-safe: every thread gets its own session, and there won't be problems with it.

The example below works. I put it in an infinite loop to see whether it closes the sessions properly, and if I monitored it correctly (in MySQL, by executing "SHOW PROCESSLIST;"), the connections just keep growing. They are not closed, even though I call session.close() and even remove the scoped_session object at the end of each run. What am I doing wrong?

My goal in a larger application is to use the minimum number of database connections required. My current working implementation creates a new session in every method that needs one and closes it before returning, which seems inefficient.

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session
from threading import Thread
from Queue import Queue, Empty as QueueEmpty
from models import MyModel


DATABASE_CONNECTION_INFO = 'mysql://username:password@localhost:3306/dbname'


class MTWorker(object):

    def __init__(self, worker_count=5):
        self.task_queue = Queue()
        self.worker_count = worker_count
        self.db_engine = create_engine(DATABASE_CONNECTION_INFO, echo=False)
        self.DBSession = scoped_session(
            sessionmaker(
                autoflush=True,
                autocommit=False,
                bind=self.db_engine
            )
        )

    def _worker(self):
        db_session = self.DBSession()
        while True:
            try:
                task_id = self.task_queue.get(False)
                try:
                    item = db_session.query(MyModel).filter(MyModel.id == task_id).one()
                    # do something with item
                except Exception as exc:
                    # if an error occurs we skip it
                    continue

                finally:
                    db_session.commit()
                    self.task_queue.task_done()
            except QueueEmpty:
                db_session.close()
                return

    def start(self):
        try:
            db_session = self.DBSession()
            all_items = db_session.query(MyModel).all()
            for item in all_items:
                self.task_queue.put(item.id)

            for _i in range(self.worker_count):
                t = Thread(target=self._worker)
                t.start()

            self.task_queue.join()
        finally:
            db_session.close()
            self.DBSession.remove()


if __name__ == '__main__':
    while True:
        mt_worker = MTWorker(worker_count=50)
        mt_worker.start()

1 Reply


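You should call create_engine and scoped_session only once per process (per database). Each call creates its own pool of connections or registry of sessions, respectively, so you want to make sure you create only one of each. In the question, every MTWorker instance builds a new engine, and therefore a new connection pool; session.close() only returns a connection to its pool, so the number of open connections grows with each pass through the loop. Just make the engine and the scoped_session module-level globals. If you need to manage your sessions more precisely than that, you probably shouldn't be using scoped_session.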

Another change to make is to use DBSession directly as though it were a session. Calling session methods on the scoped_session transparently creates a thread-local session, if needed, and forwards the method call to that session.
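To make the thread-local behaviour concrete, here is a minimal sketch. It assumes an in-memory SQLite URL in place of the MySQL one so that it runs without a server, and the helper names collect_session and sessions_per_thread are made up for illustration. It only checks that repeated calls inside one thread yield the same session object while different threads get different ones.

```python
import threading

from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker

# Assumption: in-memory SQLite stands in for the MySQL URL so the sketch
# runs without a database server; no query is issued here.
engine = create_engine("sqlite://")
DBSession = scoped_session(sessionmaker(bind=engine))


def collect_session(results, key):
    # Calling the registry returns this thread's session, creating it on
    # first use; a second call in the same thread returns the same object.
    first = DBSession()
    second = DBSession()
    results[key] = (first, first is second)
    # Close and discard this thread's session when the thread is done.
    DBSession.remove()


def sessions_per_thread(thread_count=2):
    # Hypothetical helper: run collect_session in several threads and
    # return what each thread observed.
    results = {}
    threads = [
        threading.Thread(target=collect_session, args=(results, n))
        for n in range(thread_count)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results
```

Because DBSession proxies session methods, code such as DBSession.query(...) or DBSession.commit() operates on the calling thread's session without an explicit DBSession() call.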

Another thing to be aware of is the pool_size of the connection pool, which is 5 by default. For many applications that's fine, but if you are creating lots of threads, you might need to tune that parameter.
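As a rough illustration of pool tuning, the sketch below passes pool_size and max_overflow to create_engine and then inspects the pool. The numbers are arbitrary, the helper checked_out_after_opening is hypothetical, and SQLite with an explicit QueuePool is used only so the example runs without a MySQL server; with a MySQL URL, QueuePool is already the default and the poolclass argument would not be needed.

```python
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool

# Assumption: SQLite with an explicit QueuePool stands in for MySQL.
# pool_size is the number of connections kept open in the pool;
# max_overflow is how many extra connections may be opened under load.
engine = create_engine(
    "sqlite://",
    poolclass=QueuePool,
    pool_size=2,
    max_overflow=1,
    pool_timeout=5,  # seconds to wait for a free connection
)


def checked_out_after_opening(count):
    # Hypothetical helper: open `count` connections, record how many the
    # pool reports as in use, then return them all to the pool.
    connections = [engine.connect() for _ in range(count)]
    in_use = engine.pool.checkedout()
    for connection in connections:
        connection.close()
    return in_use, engine.pool.checkedout()
```

With these settings at most pool_size + max_overflow = 3 connections can be open at once; a further request waits up to pool_timeout seconds for one to be returned.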

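from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session

# Created once at import time and shared by every MTWorker instance.
DATABASE_CONNECTION_INFO = 'mysql://username:password@localhost:3306/dbname'
db_engine = create_engine(DATABASE_CONNECTION_INFO, echo=False)
DBSession = scoped_session(
    sessionmaker(
        autoflush=True,
        autocommit=False,
        bind=db_engine
    )
)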


class MTWorker(object):

    def __init__(self, worker_count=5):
        self.task_queue = Queue()
        self.worker_count = worker_count
# snip
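
The worker code is elided above. One possible shape for it, offered as a sketch rather than the answer author's code, is shown below. It assumes Python 3 and SQLAlchemy 1.4 or newer, a temporary SQLite file instead of MySQL, and a made-up Item model standing in for MyModel; the function names worker and process_all are also illustrative. Each thread uses the module-level DBSession directly and calls DBSession.remove() when it finishes.

```python
import os
import tempfile
import threading
from queue import Empty, Queue

from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import declarative_base, scoped_session, sessionmaker

Base = declarative_base()


class Item(Base):
    # Hypothetical model standing in for MyModel.
    __tablename__ = "items"
    id = Column(Integer, primary_key=True)
    name = Column(String(50))


# Assumption: a temporary SQLite file replaces the MySQL database.
db_path = os.path.join(tempfile.mkdtemp(), "demo.db")
db_engine = create_engine("sqlite:///" + db_path)
DBSession = scoped_session(sessionmaker(bind=db_engine))
Base.metadata.create_all(db_engine)


def worker(task_queue, processed):
    try:
        while True:
            try:
                task_id = task_queue.get(block=False)
            except Empty:
                return
            try:
                # DBSession forwards to this thread's own session.
                item = DBSession.query(Item).filter(Item.id == task_id).one()
                processed.append(item.id)  # stand-in for the real work
                DBSession.commit()
            except Exception:
                # Skip the failed task but leave the session usable.
                DBSession.rollback()
            finally:
                task_queue.task_done()
    finally:
        # Close this thread's session and hand its connection back.
        DBSession.remove()


def process_all(item_count=12, worker_count=4):
    DBSession.add_all([Item(name="item-%d" % n) for n in range(item_count)])
    DBSession.commit()
    task_queue = Queue()
    for (item_id,) in DBSession.query(Item.id):
        task_queue.put(item_id)
    DBSession.remove()

    processed = []
    threads = [
        threading.Thread(target=worker, args=(task_queue, processed))
        for _ in range(worker_count)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return sorted(processed)
```

Since the engine and DBSession are created once at import time, calling process_all repeatedly reuses the same connection pool instead of creating a new one per run.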
