From c9522fa35f183f49b05857862aa4a5bffc80e08f Mon Sep 17 00:00:00 2001 From: Anton Chepurov Date: Mon, 9 Feb 2026 18:03:15 +0200 Subject: [PATCH 1/2] [IMP] use named cursor to iterate jobs Instead of fetching all jobs at once and then inserting them in the channels queue, fetch them by chunks of 2000 (default named cursor iterator behaviour since psycopg 2.4). This has the benefit of releasing the GIL periodically while loading jobs as well as consuming less memmory. backport of ae3e64ee27e8467380... and f0fc575fae36663409... with fixed line `for job_data in cr.fetchall():`. --- connector/jobrunner/runner.py | 23 +++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/connector/jobrunner/runner.py b/connector/jobrunner/runner.py index 75f3672bdb..0715c33e5d 100644 --- a/connector/jobrunner/runner.py +++ b/connector/jobrunner/runner.py @@ -112,7 +112,7 @@ of running Odoo is obviously not for production purposes. """ -from contextlib import closing +from contextlib import closing, contextmanager import logging import os import re @@ -290,15 +290,16 @@ def _initialize(self): """) cr.execute("LISTEN connector") + @contextmanager def select_jobs(self, where, args): query = ("SELECT %s, uuid, id as seq, date_created, " "priority, eta, state " "FROM queue_job WHERE %s" % ('channel' if self.has_channel else 'NULL', where)) - with closing(self.conn.cursor()) as cr: + with closing(self.conn.cursor("select_jobs", withhold=True)) as cr: cr.execute(query, args) - return list(cr.fetchall()) + yield cr def set_job_enqueued(self, uuid): with closing(self.conn.cursor()) as cr: @@ -347,8 +348,9 @@ def initialize_databases(self): _logger.debug('connector is not installed for db %s', db_name) else: self.db_by_name[db_name] = db - for job_data in db.select_jobs('state in %s', (NOT_DONE,)): - self.channel_manager.notify(db_name, *job_data) + with db.select_jobs('state in %s', (NOT_DONE,)) as cr: + for job_data in cr.fetchall(): + self.channel_manager.notify(db_name, *job_data) _logger.info('connector runner ready for db %s', db_name) def run_jobs(self): @@ -368,11 +370,12 @@ def process_notifications(self): break notification = db.conn.notifies.pop() uuid = notification.payload - job_datas = db.select_jobs('uuid = %s', (uuid,)) - if job_datas: - self.channel_manager.notify(db.db_name, *job_datas[0]) - else: - self.channel_manager.remove_job(uuid) + with db.select_jobs('uuid = %s', (uuid,)) as cr: + job_datas = cr.fetchone() + if job_datas: + self.channel_manager.notify(db.db_name, *job_datas) + else: + self.channel_manager.remove_job(uuid) def wait_notification(self): for db in self.db_by_name.values(): From 659d7128ebb0d1ac083ae1b6a60d02cc7cc86f36 Mon Sep 17 00:00:00 2001 From: Anton Chepurov Date: Mon, 9 Feb 2026 18:09:56 +0200 Subject: [PATCH 2/2] [FIX] jobrunner: support pgbouncer: db_session_pool_mode_suffix config pgbouncer is typically configured with `pool_mode = transaction` that does not support LISTEN/NOTIFY of postgre. support alt. `session` entry. LISTEN/NOTIFY is used by bus.bus (IM), web_auto_refresh, connector etc. Example odoo.conf: db_session_pool_mode_suffix = _bus Example pgbouncer.ini with additional entries in `pool_mode=session` (odoo_bus, postgres_bus): [databases] odoo = host=xx.xx.xx.xx port=yy dbname=odoo pool_mode=transaction odoo_bus = host=xx.xx.xx.xx port=yy dbname=odoo pool_mode=session pool_size=5 postgres = host=xx.xx.xx.xx port=yy dbname=postgres pool_size=20 postgres_bus = host=xx.xx.xx.xx port=yy dbname=postgres pool_mode=session pool_size=5 [pgbouncer] pool_mode = transaction --- connector/jobrunner/runner.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/connector/jobrunner/runner.py b/connector/jobrunner/runner.py index 0715c33e5d..1e5b2146cc 100644 --- a/connector/jobrunner/runner.py +++ b/connector/jobrunner/runner.py @@ -209,7 +209,8 @@ class Database(object): def __init__(self, db_name): self.db_name = db_name - self.conn = psycopg2.connect(openerp.sql_db.dsn(db_name)[1]) + session_pool_suffix = config.get("db_session_pool_mode_suffix", '') + self.conn = psycopg2.connect(openerp.sql_db.dsn(db_name + session_pool_suffix)[1]) self.conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT) self.has_connector = self._has_connector() if self.has_connector: