diff --git a/connector/jobrunner/runner.py b/connector/jobrunner/runner.py index 75f3672bdb..1e5b2146cc 100644 --- a/connector/jobrunner/runner.py +++ b/connector/jobrunner/runner.py @@ -112,7 +112,7 @@ of running Odoo is obviously not for production purposes. """ -from contextlib import closing +from contextlib import closing, contextmanager import logging import os import re @@ -209,7 +209,8 @@ class Database(object): def __init__(self, db_name): self.db_name = db_name - self.conn = psycopg2.connect(openerp.sql_db.dsn(db_name)[1]) + session_pool_suffix = config.get("db_session_pool_mode_suffix", '') + self.conn = psycopg2.connect(openerp.sql_db.dsn(db_name + session_pool_suffix)[1]) self.conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT) self.has_connector = self._has_connector() if self.has_connector: @@ -290,15 +291,16 @@ def _initialize(self): """) cr.execute("LISTEN connector") + @contextmanager def select_jobs(self, where, args): query = ("SELECT %s, uuid, id as seq, date_created, " "priority, eta, state " "FROM queue_job WHERE %s" % ('channel' if self.has_channel else 'NULL', where)) - with closing(self.conn.cursor()) as cr: + with closing(self.conn.cursor("select_jobs", withhold=True)) as cr: cr.execute(query, args) - return list(cr.fetchall()) + yield cr def set_job_enqueued(self, uuid): with closing(self.conn.cursor()) as cr: @@ -347,8 +349,9 @@ def initialize_databases(self): _logger.debug('connector is not installed for db %s', db_name) else: self.db_by_name[db_name] = db - for job_data in db.select_jobs('state in %s', (NOT_DONE,)): - self.channel_manager.notify(db_name, *job_data) + with db.select_jobs('state in %s', (NOT_DONE,)) as cr: + for job_data in cr.fetchall(): + self.channel_manager.notify(db_name, *job_data) _logger.info('connector runner ready for db %s', db_name) def run_jobs(self): @@ -368,11 +371,12 @@ def process_notifications(self): break notification = db.conn.notifies.pop() uuid = notification.payload - job_datas = db.select_jobs('uuid = %s', (uuid,)) - if job_datas: - self.channel_manager.notify(db.db_name, *job_datas[0]) - else: - self.channel_manager.remove_job(uuid) + with db.select_jobs('uuid = %s', (uuid,)) as cr: + job_datas = cr.fetchone() + if job_datas: + self.channel_manager.notify(db.db_name, *job_datas) + else: + self.channel_manager.remove_job(uuid) def wait_notification(self): for db in self.db_by_name.values():