From e3f489dbe1e32c99a494e2ebe85e13c03cb1fa57 Mon Sep 17 00:00:00 2001 From: Olivier Mehani Date: Tue, 15 Jul 2025 17:10:43 +1000 Subject: [PATCH 1/8] pulse_worker: reuse kombu Consumer (bug 1974350) --- git_hg_sync/pulse_worker.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/git_hg_sync/pulse_worker.py b/git_hg_sync/pulse_worker.py index 7acb80c..4e26fb4 100644 --- a/git_hg_sync/pulse_worker.py +++ b/git_hg_sync/pulse_worker.py @@ -24,6 +24,8 @@ class PulseWorker(ConsumerMixin): event_handler: EventHandler | None """Function that will be called whenever an event is received""" + consumer: kombu.Consumer | None = None + def __init__( self, connection: kombu.Connection, @@ -50,11 +52,12 @@ def get_consumers( consumer_class: type[kombu.Consumer], _channel: Any, ) -> list[kombu.Consumer]: - consumer = consumer_class( - self.task_queue, auto_declare=False, callbacks=[self.on_task] - ) - logger.debug(f"Using consumer {consumer=}") - return [consumer] + if not self.consumer: + self.consumer = consumer_class( + self.task_queue, auto_declare=False, callbacks=[self.on_task] + ) + logger.debug(f"Created consumer {self.consumer=}") + return [self.consumer] def on_connection_error(self, exc: Exception, interval: int) -> None: logger.error(f"Connection error: {exc=}, retrying in {interval}s ...") From 94c17c94b4ee605d90be5ecb093a0328ceea75e6 Mon Sep 17 00:00:00 2001 From: Olivier Mehani Date: Tue, 15 Jul 2025 17:49:15 +1000 Subject: [PATCH 2/8] application: dump tracemalloc stats --- docker-compose.yaml | 1 + git_hg_sync/application.py | 11 +++++++++++ 2 files changed, 12 insertions(+) diff --git a/docker-compose.yaml b/docker-compose.yaml index c1cd799..3ca679b 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -20,6 +20,7 @@ services: environment: - EMAIL='docker@sync' - RABBITMQ=true + - TRACEMALLOC=10 depends_on: sync_init: condition: service_completed_successfully diff --git a/git_hg_sync/application.py b/git_hg_sync/application.py index cf2800e..f4169c5 100644 --- a/git_hg_sync/application.py +++ b/git_hg_sync/application.py @@ -3,6 +3,7 @@ import os import signal import sys +import tracemalloc from collections.abc import Sequence from types import FrameType @@ -18,6 +19,9 @@ class Application: + + _event_count: int = 0 + def __init__( self, worker: PulseWorker, @@ -91,3 +95,10 @@ def _handle_event(self, event: Event) -> None: self._handle_push_event(event) case _: raise NotImplementedError() + + self._event_count += 1 + if not self._event_count % 10: + self._event_count = 0 + if tracemalloc.is_tracing(): + for i, stat in enumerate(snapshot.statistics('filename')[:5], 1): + logging.info("tracemalloc",i=i, stat=str(stat)) From 92cb21e211bb17c7059fd9943cb1a97dee126bc9 Mon Sep 17 00:00:00 2001 From: Olivier Mehani Date: Tue, 15 Jul 2025 17:49:26 +1000 Subject: [PATCH 3/8] Revert "pulse_worker: reuse kombu Consumer (bug 1974350)" This reverts commit e3f489dbe1e32c99a494e2ebe85e13c03cb1fa57. --- git_hg_sync/pulse_worker.py | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/git_hg_sync/pulse_worker.py b/git_hg_sync/pulse_worker.py index 4e26fb4..7acb80c 100644 --- a/git_hg_sync/pulse_worker.py +++ b/git_hg_sync/pulse_worker.py @@ -24,8 +24,6 @@ class PulseWorker(ConsumerMixin): event_handler: EventHandler | None """Function that will be called whenever an event is received""" - consumer: kombu.Consumer | None = None - def __init__( self, connection: kombu.Connection, @@ -52,12 +50,11 @@ def get_consumers( consumer_class: type[kombu.Consumer], _channel: Any, ) -> list[kombu.Consumer]: - if not self.consumer: - self.consumer = consumer_class( - self.task_queue, auto_declare=False, callbacks=[self.on_task] - ) - logger.debug(f"Created consumer {self.consumer=}") - return [self.consumer] + consumer = consumer_class( + self.task_queue, auto_declare=False, callbacks=[self.on_task] + ) + logger.debug(f"Using consumer {consumer=}") + return [consumer] def on_connection_error(self, exc: Exception, interval: int) -> None: logger.error(f"Connection error: {exc=}, retrying in {interval}s ...") From 82310c1b6ac0d163c36753d9ec85c7ead32667cb Mon Sep 17 00:00:00 2001 From: Olivier Mehani Date: Tue, 15 Jul 2025 17:53:12 +1000 Subject: [PATCH 4/8] fixup! application: dump tracemalloc stats --- git_hg_sync/application.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/git_hg_sync/application.py b/git_hg_sync/application.py index f4169c5..8f0ed3b 100644 --- a/git_hg_sync/application.py +++ b/git_hg_sync/application.py @@ -19,7 +19,6 @@ class Application: - _event_count: int = 0 def __init__( @@ -100,5 +99,6 @@ def _handle_event(self, event: Event) -> None: if not self._event_count % 10: self._event_count = 0 if tracemalloc.is_tracing(): - for i, stat in enumerate(snapshot.statistics('filename')[:5], 1): - logging.info("tracemalloc",i=i, stat=str(stat)) + snapshot = tracemalloc.take_snapshot() + for i, stat in enumerate(snapshot.statistics("filename")[:5], 1): + logger.info("tracemalloc", i=i, stat=str(stat)) From 098ef384ad24df44bc67f2b38307addd1e21b144 Mon Sep 17 00:00:00 2001 From: Olivier Mehani Date: Tue, 15 Jul 2025 17:53:28 +1000 Subject: [PATCH 5/8] docker-compose: enable tracemalloc --- docker-compose.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docker-compose.yaml b/docker-compose.yaml index 3ca679b..975c773 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -20,7 +20,7 @@ services: environment: - EMAIL='docker@sync' - RABBITMQ=true - - TRACEMALLOC=10 + - TRACEMALLOC=1 depends_on: sync_init: condition: service_completed_successfully From 47523b187dcc21d4c70ab15c8a43aad576e3f2ed Mon Sep 17 00:00:00 2001 From: Olivier Mehani Date: Tue, 15 Jul 2025 18:44:54 +1000 Subject: [PATCH 6/8] fixup! fixup! application: dump tracemalloc stats --- git_hg_sync/application.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/git_hg_sync/application.py b/git_hg_sync/application.py index 8f0ed3b..74b2f66 100644 --- a/git_hg_sync/application.py +++ b/git_hg_sync/application.py @@ -86,6 +86,7 @@ def _handle_push_event(self, push_event: Push) -> None: ) def _handle_event(self, event: Event) -> None: + self._trace_malloc() if event.repo_url not in self._repo_synchronizers: logger.warning(f"Ignoring event for untracked repository: {event.repo_url}") return @@ -95,6 +96,7 @@ def _handle_event(self, event: Event) -> None: case _: raise NotImplementedError() + def _trace_malloc(self) -> None: self._event_count += 1 if not self._event_count % 10: self._event_count = 0 From 4ce2bcc1a1864a0c03f28d6fece0a01be2175afb Mon Sep 17 00:00:00 2001 From: Olivier Mehani Date: Thu, 17 Jul 2025 17:29:42 +1000 Subject: [PATCH 7/8] fixup! fixup! fixup! application: dump tracemalloc stats --- git_hg_sync/application.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/git_hg_sync/application.py b/git_hg_sync/application.py index 74b2f66..608a77e 100644 --- a/git_hg_sync/application.py +++ b/git_hg_sync/application.py @@ -103,4 +103,4 @@ def _trace_malloc(self) -> None: if tracemalloc.is_tracing(): snapshot = tracemalloc.take_snapshot() for i, stat in enumerate(snapshot.statistics("filename")[:5], 1): - logger.info("tracemalloc", i=i, stat=str(stat)) + logger.info(f"tracemalloc {i=} {stat=}") From dce728e70713551e3094a254bcc023a4bc12a2d0 Mon Sep 17 00:00:00 2001 From: Olivier Mehani Date: Thu, 24 Jul 2025 19:04:21 +1000 Subject: [PATCH 8/8] tracemalloc --- git_hg_sync/application.py | 23 ++++++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/git_hg_sync/application.py b/git_hg_sync/application.py index 608a77e..7ae3b17 100644 --- a/git_hg_sync/application.py +++ b/git_hg_sync/application.py @@ -20,6 +20,8 @@ class Application: _event_count: int = 0 + _prev_snapshot = None + _snapshot = None def __init__( self, @@ -101,6 +103,21 @@ def _trace_malloc(self) -> None: if not self._event_count % 10: self._event_count = 0 if tracemalloc.is_tracing(): - snapshot = tracemalloc.take_snapshot() - for i, stat in enumerate(snapshot.statistics("filename")[:5], 1): - logger.info(f"tracemalloc {i=} {stat=}") + self._snapshot = tracemalloc.take_snapshot() + + for i, stat in enumerate( + self._snapshot.statistics("lineno")[:25], + 1, + ): + logger.info( + f"tracemalloc absolute, {i}, {stat.traceback[0].filename}, {stat.traceback[0].lineno}, {stat.size / 1024}, {stat.count}" + ) + if self._prev_snapshot: + for i, stat in enumerate( + self._snapshot.compare_to(self._prev_snapshot, "lineno")[:25], + 1, + ): + logger.info( + f"tracemalloc diff, {i}, {stat.traceback[0].filename}, {stat.traceback[0].lineno}, {stat.size_diff / 1024}, {stat.count_diff}" + ) + self._prev_snapshot = self._snapshot