18 changes: 17 additions & 1 deletion eventbusk/bus.py
@@ -96,6 +96,7 @@ def __init__(self, broker: str):
self._topic_to_event: dict[str, str] = {}
self._event_to_topic: dict[str, str] = {}
self._receivers: set[ReceiverWrappedT] = set()
self.post_receive_hook: Callable[[Event, bool], None] | None = None
Contributor

@sidmitra Siddharth Mitra (sidmitra) Jul 24, 2025


Gautham Goli (@GauthamGoli)

What kind of things do you foresee the post_receive_hook doing? Can you give me some examples?

I'm not sure this is something the framework should handle, because it could get hairy and generate more issues to deal with if people start adding complex post-receive methods.
Since a consumer can receive/process millions of events while it's up, we shouldn't do much cleanup per event (like DB connection handling). The post-receive hook needs to be ultra-fast and is allowed to fail, as I see in the handling below. Is it then of any use? Is it purely for logging? Couldn't that be the last line of the consumer code itself? What does a separate handler give us?

Not saying NO here, but we need to make sure this is a cost/complexity we're willing to pay forever, for all events and all consumers in the future.

What I do foresee is needing hooks that let us restart worker processes, say after X events or Y GB of RSS memory, just like gunicorn/celery. That helps with memory leaks by releasing unused memory back to the OS.
We might also need to allow timeouts, e.g. each event is allowed a maximum of 30 seconds to do its work. This is just like Celery. Without a timeout, even if the consumer acks, the ack might get ignored on Kafka's side (because Kafka thinks the consumer is dead after not responding for 15 minutes).
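A per-event timeout like that could be sketched with `signal.alarm` (Unix-only and main-thread-only; the function name and shape here are mine, not part of this PR):

```python
import signal


class HandlerTimeout(Exception):
    """Raised when an event handler exceeds its time budget."""


def _on_alarm(signum, frame):
    raise HandlerTimeout()


def run_with_timeout(handler, event, seconds=30):
    # Arm SIGALRM before calling the handler; if the handler is still
    # running when the alarm fires, HandlerTimeout is raised inside it.
    signal.signal(signal.SIGALRM, _on_alarm)
    signal.alarm(seconds)
    try:
        handler(event)
    finally:
        signal.alarm(0)  # always disarm, even on success or error
```

A real implementation would more likely run handlers in a worker with a supervised deadline (as Celery does), but this shows the per-event budget idea.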

Contributor Author

@GauthamGoli Gautham Goli (GauthamGoli) Jul 24, 2025


Siddharth Mitra (@sidmitra) Aiming for functionality similar to Celery's task_postrun signal for workers: resource cleanup, not logging.
Check out def close_db_connection_after_task(*args, **kwargs): in this related PR https://github.com/Airbase/airbase-backend/pull/29832
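For context, a self-contained sketch of that kind of cleanup hook (`FakeEvent` and `cleanup` are illustrative stand-ins; in the real codebase the hook would be registered with `bus.set_post_receive_hook(cleanup)` and would do real work such as closing stale DB connections):

```python
from dataclasses import dataclass


@dataclass
class FakeEvent:
    # Stand-in for eventbusk's Event, just for illustration.
    event_id: str


closed = []


def cleanup(event: FakeEvent, success: bool) -> None:
    # Per-event resource cleanup, analogous to Celery's task_postrun;
    # here we only record that it ran.
    closed.append((event.event_id, success))


# What the consumer loop does after each handler, mirroring the diff:
hook = cleanup
try:
    hook(FakeEvent("abc"), True)
except Exception:
    pass  # a failing hook must never affect acking
```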


@staticmethod
def to_fqn(event_type: EventT | ReceiverT) -> str:
@@ -157,6 +158,12 @@ def send(
else:
raise exc

def set_post_receive_hook(self, hook: Callable[[Event, bool], None]) -> None:
"""
Sets a global post-receive hook, called after each handler.
"""
self.post_receive_hook = hook

@property
def receivers(self) -> set[ReceiverWrappedT]:
"""
@@ -166,7 +173,7 @@ def receivers(self) -> set[ReceiverWrappedT]:

# TODO: add group parameter?
def receive( # pylint: disable=too-complex
self, event_type: EventT, poll_timeout: int = 1
self, event_type: EventT, poll_timeout: int = 1, post_hook: Callable[[Event, bool], None] | None = None,
) -> ReceivedOuterT:
"""
Decorator to convert a function into an receiver.
@@ -287,6 +294,15 @@ def wrapper() -> None:
"Not acknowledging message.",
extra={**log_context, "data": event},
)
hook = post_hook or self.post_receive_hook
if hook:
try:
hook(event, success)
except Exception:
logger.exception(
"Error in post-receive hook",
extra={**log_context, "event_id": event_id},
)

except KeyboardInterrupt:
logger.info("Closing receiver.", extra=log_context)
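Pulled out of the wrapper for illustration, the dispatch this diff adds behaves like the following (the standalone function name and signature are mine, not the PR's):

```python
import logging

logger = logging.getLogger(__name__)


def dispatch_post_receive(event, success, *, post_hook=None, bus_hook=None):
    # The per-receiver `post_hook` argument takes precedence over the
    # bus-wide hook set via set_post_receive_hook().
    hook = post_hook or bus_hook
    if hook is None:
        return
    try:
        hook(event, success)
    except Exception:
        # As in the diff: a failing hook is logged, never re-raised,
        # so it cannot affect message acknowledgement.
        logger.exception("Error in post-receive hook")


calls = []
dispatch_post_receive({"id": 1}, True, bus_hook=lambda e, s: calls.append(s))
dispatch_post_receive({"id": 2}, False, bus_hook=lambda e, s: 1 / 0)  # swallowed
```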