Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 15 additions & 11 deletions connector/jobrunner/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@
of running Odoo is obviously not for production purposes.
"""

from contextlib import closing
from contextlib import closing, contextmanager
import logging
import os
import re
Expand Down Expand Up @@ -209,7 +209,8 @@ class Database(object):

def __init__(self, db_name):
self.db_name = db_name
self.conn = psycopg2.connect(openerp.sql_db.dsn(db_name)[1])
session_pool_suffix = config.get("db_session_pool_mode_suffix", '')
self.conn = psycopg2.connect(openerp.sql_db.dsn(db_name + session_pool_suffix)[1])
self.conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
self.has_connector = self._has_connector()
if self.has_connector:
Expand Down Expand Up @@ -290,15 +291,16 @@ def _initialize(self):
""")
cr.execute("LISTEN connector")

@contextmanager
def select_jobs(self, where, args):
query = ("SELECT %s, uuid, id as seq, date_created, "
"priority, eta, state "
"FROM queue_job WHERE %s" %
('channel' if self.has_channel else 'NULL',
where))
with closing(self.conn.cursor()) as cr:
with closing(self.conn.cursor("select_jobs", withhold=True)) as cr:
cr.execute(query, args)
return list(cr.fetchall())
yield cr

def set_job_enqueued(self, uuid):
with closing(self.conn.cursor()) as cr:
Expand Down Expand Up @@ -347,8 +349,9 @@ def initialize_databases(self):
_logger.debug('connector is not installed for db %s', db_name)
else:
self.db_by_name[db_name] = db
for job_data in db.select_jobs('state in %s', (NOT_DONE,)):
self.channel_manager.notify(db_name, *job_data)
with db.select_jobs('state in %s', (NOT_DONE,)) as cr:
for job_data in cr.fetchall():
self.channel_manager.notify(db_name, *job_data)
_logger.info('connector runner ready for db %s', db_name)

def run_jobs(self):
Expand All @@ -368,11 +371,12 @@ def process_notifications(self):
break
notification = db.conn.notifies.pop()
uuid = notification.payload
job_datas = db.select_jobs('uuid = %s', (uuid,))
if job_datas:
self.channel_manager.notify(db.db_name, *job_datas[0])
else:
self.channel_manager.remove_job(uuid)
with db.select_jobs('uuid = %s', (uuid,)) as cr:
job_datas = cr.fetchone()
if job_datas:
self.channel_manager.notify(db.db_name, *job_datas)
else:
self.channel_manager.remove_job(uuid)

def wait_notification(self):
for db in self.db_by_name.values():
Expand Down