diff --git a/amber/src/main/python/core/util/customized_queue/linked_blocking_multi_queue.py b/amber/src/main/python/core/util/customized_queue/linked_blocking_multi_queue.py index 3b46e6db4d7..f0e661ffa6f 100644 --- a/amber/src/main/python/core/util/customized_queue/linked_blocking_multi_queue.py +++ b/amber/src/main/python/core/util/customized_queue/linked_blocking_multi_queue.py @@ -29,13 +29,34 @@ T = TypeVar("T") +def _estimate_in_mem_size(item: T) -> int: + """ + Estimate in-memory bytes for queue accounting. + Prefer payload/frame byte size when available; otherwise fall back to object size. + """ + if item is None: + return 0 + + payload = getattr(item, "payload", None) + frame = getattr(payload, "frame", None) if payload is not None else None + if frame is not None: + if hasattr(frame, "nbytes"): + return int(frame.nbytes) + if hasattr(frame, "to_table"): + table = frame.to_table() + if hasattr(table, "nbytes"): + return int(table.nbytes) + + return sys.getsizeof(item) + + class LinkedBlockingMultiQueue(IKeyedQueue): @inner class Node(Generic[T]): def __init__(self, item: T): self.item = item self.next: Optional[LinkedBlockingMultiQueue.Node[T]] = None - self.in_mem_size = sys.getsizeof(item) + self.in_mem_size = _estimate_in_mem_size(item) @inner class SubQueue(Generic[T]):