From 66755712ae4cfde1dc369de2a82c7af005b6b8bd Mon Sep 17 00:00:00 2001 From: yunyad Date: Wed, 11 Mar 2026 12:13:08 -0700 Subject: [PATCH 1/2] update logic for python in mem size estimation --- .../linked_blocking_multi_queue.py | 33 +++++++++++++++---- 1 file changed, 27 insertions(+), 6 deletions(-) diff --git a/amber/src/main/python/core/util/customized_queue/linked_blocking_multi_queue.py b/amber/src/main/python/core/util/customized_queue/linked_blocking_multi_queue.py index 3b46e6db4d7..28ae1e91058 100644 --- a/amber/src/main/python/core/util/customized_queue/linked_blocking_multi_queue.py +++ b/amber/src/main/python/core/util/customized_queue/linked_blocking_multi_queue.py @@ -29,13 +29,34 @@ T = TypeVar("T") +def _estimate_in_mem_size(item: T) -> int: + """ + Estimate in-memory bytes for queue accounting. + Prefer payload/frame byte size when available; otherwise fall back to object size. + """ + if item is None: + return 0 + + payload = getattr(item, "payload", None) + frame = getattr(payload, "frame", None) if payload is not None else None + if frame is not None: + if hasattr(frame, "nbytes"): + return int(frame.nbytes) + if hasattr(frame, "to_table"): + table = frame.to_table() + if hasattr(table, "nbytes"): + return int(table.nbytes) + + return sys.getsizeof(item) + + class LinkedBlockingMultiQueue(IKeyedQueue): @inner class Node(Generic[T]): def __init__(self, item: T): self.item = item self.next: Optional[LinkedBlockingMultiQueue.Node[T]] = None - self.in_mem_size = sys.getsizeof(item) + self.in_mem_size = _estimate_in_mem_size(item) @inner class SubQueue(Generic[T]): @@ -165,9 +186,9 @@ def remove(self, obj: T) -> bool: self.fully_unlock() def unlink( - self, - trail: LinkedBlockingMultiQueue.Node, - next_: LinkedBlockingMultiQueue.Node, + self, + trail: LinkedBlockingMultiQueue.Node, + next_: LinkedBlockingMultiQueue.Node, ) -> None: trail.item = None trail.next = next_.next @@ -240,7 +261,7 @@ def peek(self) -> Optional[T]: @inner class DefaultSubQueueSelection(Generic[T]): def __init__( - self, priority_groups: List[LinkedBlockingMultiQueue.PriorityGroup[T]] + self, priority_groups: List[LinkedBlockingMultiQueue.PriorityGroup[T]] ): self.priority_groups: List[LinkedBlockingMultiQueue.PriorityGroup[T]] = ( priority_groups @@ -261,7 +282,7 @@ def peek(self) -> Optional[T]: return None def set_priority_groups( - self, priority_groups: List[LinkedBlockingMultiQueue.PriorityGroup[T]] + self, priority_groups: List[LinkedBlockingMultiQueue.PriorityGroup[T]] ) -> None: self.priority_groups = priority_groups From 3ff4e9c29efb53e4ea2f36859cd8fde9d7f592e7 Mon Sep 17 00:00:00 2001 From: yunyad Date: Wed, 11 Mar 2026 12:16:09 -0700 Subject: [PATCH 2/2] revert details --- .../customized_queue/linked_blocking_multi_queue.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/amber/src/main/python/core/util/customized_queue/linked_blocking_multi_queue.py b/amber/src/main/python/core/util/customized_queue/linked_blocking_multi_queue.py index 28ae1e91058..f0e661ffa6f 100644 --- a/amber/src/main/python/core/util/customized_queue/linked_blocking_multi_queue.py +++ b/amber/src/main/python/core/util/customized_queue/linked_blocking_multi_queue.py @@ -186,9 +186,9 @@ def remove(self, obj: T) -> bool: self.fully_unlock() def unlink( - self, - trail: LinkedBlockingMultiQueue.Node, - next_: LinkedBlockingMultiQueue.Node, + self, + trail: LinkedBlockingMultiQueue.Node, + next_: LinkedBlockingMultiQueue.Node, ) -> None: trail.item = None trail.next = next_.next @@ -261,7 +261,7 @@ def peek(self) -> Optional[T]: @inner class DefaultSubQueueSelection(Generic[T]): def __init__( - self, priority_groups: List[LinkedBlockingMultiQueue.PriorityGroup[T]] + self, priority_groups: List[LinkedBlockingMultiQueue.PriorityGroup[T]] ): self.priority_groups: List[LinkedBlockingMultiQueue.PriorityGroup[T]] = ( priority_groups @@ -282,7 +282,7 @@ def peek(self) -> Optional[T]: return None def set_priority_groups( - self, priority_groups: List[LinkedBlockingMultiQueue.PriorityGroup[T]] + self, priority_groups: List[LinkedBlockingMultiQueue.PriorityGroup[T]] ) -> None: self.priority_groups = priority_groups