Skip to content

feat: Implement stateful retry and resumable stream logic for TopicMessageQuery#2171

Open
manishdait wants to merge 7 commits into
hiero-ledger:mainfrom
manishdait:fix/flaky-submit-msg.e2e
Open

feat: Implement stateful retry and resumable stream logic for TopicMessageQuery#2171
manishdait wants to merge 7 commits into
hiero-ledger:mainfrom
manishdait:fix/flaky-submit-msg.e2e

Conversation

@manishdait
Copy link
Copy Markdown
Contributor

Description:
This PR introduces fixes for flaky TopicMessageQuery e2e tests by implementing retry logic and error handling.

Changes Made:

  • Added SubscriptionState to track last_message and count. on retry, the query resumes from last_message.consensusTimestamp + 1ns to prevent message loss or duplication.
  • Added _should_retry logic to identify retryable gRPC errors (e.g., UNAVAILABLE, RESOURCE_EXHAUSTED) and specific RST_STREAM internal errors.
  • Updated SubscriptionHandle to safely manage gRPC call cancellation across threads.

Related issue(s):

Fixes #1796

Notes for reviewer:

Checklist

  • Documented (Code comments, README, etc.)
  • Tested (unit, integration, etc.)

@github-actions github-actions Bot added skill: advanced requires knowledge of multiple areas in the codebase without defined steps to implement or examples scope: DLT involves engineering for distributed ledger technology labels Apr 18, 2026
@codecov
Copy link
Copy Markdown

codecov Bot commented Apr 18, 2026

Codecov Report

❌ Patch coverage is 99.05660% with 1 line in your changes missing coverage. Please review.

Impacted file tree graph

@@            Coverage Diff             @@
##             main    #2171      +/-   ##
==========================================
+ Coverage   93.99%   95.03%   +1.04%     
==========================================
  Files         163      163              
  Lines       10442    10504      +62     
==========================================
+ Hits         9815     9983     +168     
+ Misses        627      521     -106     
🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@codacy-production
Copy link
Copy Markdown

codacy-production Bot commented Apr 18, 2026

Up to standards ✅

🟢 Issues 0 issues

Results:
0 new issues

View in Codacy

NEW Get contextual insights on your PRs based on Codacy's metrics, along with PR and Jira context, without leaving GitHub. Enable AI reviewer
TIP This summary will be updated as you push new changes.

@github-actions
Copy link
Copy Markdown

Hi @manishdait,

This pull request has had no commit activity for 10 days. Are you still working on it?
To keep the PR active, you can:

  • Push a new commit.
  • Comment /working on the linked issue (not this PR).

If you're no longer working on this, please comment /unassign on the linked issue to release it for others. Otherwise, this PR may be closed due to inactivity.

Reach out on discord or join our office hours if you need assistance.

From the Python SDK Team

@github-actions
Copy link
Copy Markdown

Hi @manishdait, this is CronInactivityBot 👋

This pull request has had no new commits for 21 days, so I'm closing it and unassigning you from the linked issue to keep the backlog healthy.

If you're no longer interested, no action is needed.

Tip: You can comment /unassign on any issue to proactively step away before this bot kicks in.

If you'd like to continue working on this later, feel free to comment /assign on the issue to get re-assigned, and open a new PR when you're ready. 🚀

@github-actions github-actions Bot closed this May 19, 2026
@manishdait manishdait reopened this May 19, 2026
@manishdait manishdait force-pushed the fix/flaky-submit-msg.e2e branch from 83d14a9 to 631d51e Compare May 19, 2026 13:25
@manishdait manishdait marked this pull request as ready for review May 19, 2026 13:46
@manishdait manishdait requested review from a team as code owners May 19, 2026 13:46
@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented May 19, 2026

Review Change Stack

Walkthrough

TopicMessageQuery subscription now tracks state across retries, resumes from the last received message, handles chunked responses, applies gRPC-aware retry decisions with exponential backoff, and coordinates thread-safe call cancellation via SubscriptionHandle.

Changes

Topic Message Query Subscription Reliability

Layer / File(s) Summary
Subscription state tracking and constants
src/hiero_sdk_python/query/topic_message_query.py
Added gRPC/logging imports, module logger, RST_STREAM regex, and SubscriptionState dataclass tracking retry attempts, message counts, last_message resume point, and buffered chunks.
Query initialization and error handler API
src/hiero_sdk_python/query/topic_message_query.py
Refactored __init__ to parse/store TopicId/Timestamp, default _limit handling, tightened validation for max_attempts/max_backoff, initialized internal handlers, and added public set_error_handler().
Subscription lifecycle helpers: request building, response handling, and retry logic
src/hiero_sdk_python/query/topic_message_query.py
Added _should_retry() (gRPC codes + RST_STREAM detail matching), _build_query_request() (resume from last_message, adjust limit), and _handle_response() (update counters, last_message, buffer/reassemble chunked responses keyed by initialTransactionID).
Subscription streaming loop and centralized error retry
src/hiero_sdk_python/query/topic_message_query.py
Refactored subscribe() to drive the stream with SubscriptionState, rebuild requests per attempt, process responses via _handle_response, enforce cancellation checks, and apply capped exponential backoff with attempt budget enforcement and conditional user error handler invocation.
Thread-safe gRPC call cancellation in SubscriptionHandle
src/hiero_sdk_python/utils/subscription_handle.py
Added _call storage and _lock, _set_call(call: Any) to register and immediately cancel if already signalled, and updated cancel() to cancel stored calls under the mutex.
SubscriptionHandle test coverage for call cancellation
tests/unit/subscription_handle_test.py
Added/annotated tests for default non-cancelled state, cancel() marking cancelled, join(timeout=...) forwarding to thread join and no-op join without thread, plus tests verifying active-call cancellation and immediate cancellation of calls set after cancellation.
TopicMessageQuery test expansion: initialization, chunk buffering, retry, and cancellation
tests/unit/topic_message_query_test.py
Updated imports/fixtures and payload; added initialization and setter validation tests; added missing-config precondition test; added chunk reassembly test; added retryable and non-retryable error tests asserting retry/stop behavior and on_error calls; added subscription cancellation test asserting underlying RPC cancel() is invoked.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Check name Status Explanation
Title check ✅ Passed The PR title clearly summarizes the main change: implementing stateful retry and resumable stream logic for TopicMessageQuery, which directly addresses the intermittent delivery failures documented in the linked issue.
Description check ✅ Passed The PR description is related to the changeset and clearly explains the fixes implemented: SubscriptionState for tracking state, retry logic for retryable errors, and SubscriptionHandle updates for thread-safe cancellation.
Linked Issues check ✅ Passed The PR implements all coding requirements from issue #1796: adds retry logic for retryable gRPC errors, implements resumable streams using last_message timestamp, updates SubscriptionHandle for safe cancellation, and includes comprehensive unit tests covering the new functionality.
Out of Scope Changes check ✅ Passed All changes are directly related to fixing the TopicMessageQuery reliability issue: modifications to TopicMessageQuery for retry/resume logic, SubscriptionHandle for thread-safe cancellation, and corresponding unit tests—no unrelated changes detected.
Docstring Coverage ✅ Passed Docstring coverage is 84.00% which is sufficient. The required threshold is 80.00%.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

📋 Issue Planner

Let us write the prompt for your AI agent so you can ship faster (with fewer bugs).

View plan for ticket: #1796

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 6


ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: ASSERTIVE

Plan: Pro

Run ID: ea309a58-41ae-4f9a-9ef9-8bb283f770f5

📥 Commits

Reviewing files that changed from the base of the PR and between bc1e285 and 631d51e.

📒 Files selected for processing (4)
  • src/hiero_sdk_python/query/topic_message_query.py
  • src/hiero_sdk_python/utils/subscription_handle.py
  • tests/unit/subscription_handle_test.py
  • tests/unit/topic_message_query_test.py

Comment thread src/hiero_sdk_python/query/topic_message_query.py Outdated
Comment thread src/hiero_sdk_python/query/topic_message_query.py
Comment thread src/hiero_sdk_python/query/topic_message_query.py Outdated
Comment thread src/hiero_sdk_python/query/topic_message_query.py
Comment thread tests/unit/topic_message_query_test.py
Comment thread tests/unit/topic_message_query_test.py
@github-actions github-actions Bot added open to community review PR is open for community review and feedback queue:junior-committer PR awaiting initial quality review labels May 19, 2026
@manishdait manishdait force-pushed the fix/flaky-submit-msg.e2e branch from 631d51e to 43f9b2c Compare May 20, 2026 09:41
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4


ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: ASSERTIVE

Plan: Pro

Run ID: 1454aa79-c860-4bc3-8f79-1b3fde0bbdc6

📥 Commits

Reviewing files that changed from the base of the PR and between 631d51e and 43f9b2c.

📒 Files selected for processing (4)
  • src/hiero_sdk_python/query/topic_message_query.py
  • src/hiero_sdk_python/utils/subscription_handle.py
  • tests/unit/subscription_handle_test.py
  • tests/unit/topic_message_query_test.py

Comment thread src/hiero_sdk_python/query/topic_message_query.py
Comment on lines +20 to +26
def _set_call(self, call: Any):
"""Sets the active gRPC call so it can be cancelled."""
with self._lock:
self._call = call

if self._cancelled.is_set():
self._call.cancel()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Avoid calling cancel() while holding _lock.

Line 26 and Line 34 invoke cancel() inside the critical section. If that call blocks or re-enters related paths, this can stall or deadlock cancellation. Capture the call reference while locked, then cancel outside the lock. Also use is not None at Line 33.

Suggested fix
 def _set_call(self, call: Any):
     """Sets the active gRPC call so it can be cancelled."""
+    call_to_cancel = None
     with self._lock:
         self._call = call
 
         if self._cancelled.is_set():
-            self._call.cancel()
+            call_to_cancel = call
+    if call_to_cancel is not None:
+        call_to_cancel.cancel()
 
 def cancel(self):
     """Signals to cancel the subscription."""
+    call_to_cancel = None
     with self._lock:
         self._cancelled.set()
 
-        if self._call:
-            self._call.cancel()
+        if self._call is not None:
+            call_to_cancel = self._call
+    if call_to_cancel is not None:
+        call_to_cancel.cancel()

Also applies to: 30-34

Comment thread tests/unit/topic_message_query_test.py
Comment thread tests/unit/topic_message_query_test.py
@manishdait manishdait force-pushed the fix/flaky-submit-msg.e2e branch from 43f9b2c to 45c80fb Compare May 20, 2026 17:26
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (4)
src/hiero_sdk_python/query/topic_message_query.py (1)

61-62: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Type annotation mismatch: _error_handler declared as Callable[[], None] but used with Exception argument.

Line 62 declares _error_handler as Callable[[], None], but:

  • set_error_handler (line 85) expects Callable[[Exception], None]
  • _on_error (line 118) takes err: Exception
  • The handler is invoked with self._error_handler(e) at line 243

This inconsistency means type checkers will report errors, and users following the type hint at line 62 would provide incompatible handlers.

🐛 Proposed fix
         self._completion_handler: Callable[[], None] | None = self._on_complete
-        self._error_handler: Callable[[], None] | None = self._on_error
+        self._error_handler: Callable[[Exception], None] | None = self._on_error
tests/unit/topic_message_query_test.py (3)

152-154: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Chunk ordering is not verified — test would pass even if chunks are assembled in wrong order.

Using in only checks presence, not position. If the implementation incorrectly orders chunks, this test won't detect it.

🐛 Proposed fix to verify correct ordering
     assert len(received_messages) == 1
-    assert b"chunk-1" in received_messages[0].contents
-    assert b"chunk-2" in received_messages[0].contents
+    # Verify chunks are assembled in correct order
+    contents = received_messages[0].contents
+    chunk1_pos = contents.find(b"chunk-1")
+    chunk2_pos = contents.find(b"chunk-2")
+    assert chunk1_pos != -1, "chunk-1 not found in assembled message"
+    assert chunk2_pos != -1, "chunk-2 not found in assembled message"
+    assert chunk1_pos < chunk2_pos, "Chunks assembled in wrong order: chunk-1 should come before chunk-2"

As per coding guidelines, tests should catch regressions in chunk ordering behavior.


167-178: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Missing assertion that on_error is invoked after retries exhaust.

When max_attempts are exhausted with retryable errors, on_error should be called. The test creates an on_error=MagicMock() but never verifies it was invoked, so regressions in terminal error handling would go undetected.

🐛 Proposed fix
 def test_retry_logic_on_retryable_error(mock_client, error):
     """Test that the query retries on retryable errors but stops after max_attempts."""
     query = TopicMessageQuery(topic_id="0.0.123").set_max_attempts(2).set_max_backoff(0.5)

     mock_client.mirror_stub.subscribeTopic.side_effect = [error, error]

-    handle = query.subscribe(mock_client, on_message=MagicMock(), on_error=MagicMock())
+    on_error = MagicMock()
+    handle = query.subscribe(mock_client, on_message=MagicMock(), on_error=on_error)

     handle._thread.join(timeout=2.0)
+    assert not handle._thread.is_alive(), "Thread should have terminated after retries exhausted"

     assert mock_client.mirror_stub.subscribeTopic.call_count == 2
+    on_error.assert_called_once_with(error)

As per coding guidelines, "Tests must provide useful error messages when they fail for future debugging."


220-224: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Timing-dependent sleep can cause flaky tests in CI.

time.sleep(0.2) followed by assert handle._thread.is_alive() is scheduler-dependent. In slow CI environments, the thread may not have started processing yet, or conversely, the sleep may be insufficient. Use an event to synchronize on first message receipt before cancellation.

🐛 Proposed fix using event synchronization
+import threading
+
 def test_subscription_cancellation(mock_client):
     """Test that cancelling a handle stops the subscription thread."""
     query = TopicMessageQuery(topic_id="0.0.123")

     def infinite_stream():
         while True:
             yield mirror_proto.ConsensusTopicResponse(message=b"ping")
             time.sleep(0.1)

     mock_call = MagicMock()
     mock_call.__iter__.return_value = infinite_stream()

     mock_client.mirror_stub.subscribeTopic.return_value = mock_call

-    on_message = MagicMock()
+    first_message_seen = threading.Event()
+    on_message = MagicMock(side_effect=lambda _: first_message_seen.set())
     handle = query.subscribe(mock_client, on_message=on_message)

-    time.sleep(0.2)
-    assert handle._thread.is_alive()
+    assert first_message_seen.wait(timeout=2.0), "Expected at least one streamed message before cancellation"
+    assert handle._thread.is_alive(), "Subscription thread should be alive before cancel"

     handle.cancel()

As per coding guidelines, "No timing-dependent or unseeded random assertions."


ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: ASSERTIVE

Plan: Pro

Run ID: cc2d52be-f082-42a4-b517-4b8426de7c85

📥 Commits

Reviewing files that changed from the base of the PR and between 43f9b2c and 45c80fb.

📒 Files selected for processing (4)
  • src/hiero_sdk_python/query/topic_message_query.py
  • src/hiero_sdk_python/utils/subscription_handle.py
  • tests/unit/subscription_handle_test.py
  • tests/unit/topic_message_query_test.py

Comment thread tests/unit/topic_message_query_test.py
@github-actions
Copy link
Copy Markdown

Hi, this is WorkflowBot.
Your pull request cannot be merged as it is not passing all our workflow checks.
Please click on each check to review the logs and resolve issues so all checks pass.
To help you:

Signed-off-by: Manish Dait <daitmanish88@gmail.com>
Signed-off-by: Manish Dait <daitmanish88@gmail.com>
Signed-off-by: Manish Dait <daitmanish88@gmail.com>
Signed-off-by: Manish Dait <daitmanish88@gmail.com>
Signed-off-by: Manish Dait <daitmanish88@gmail.com>
Signed-off-by: Manish Dait <daitmanish88@gmail.com>
Signed-off-by: Manish Dait <daitmanish88@gmail.com>
@manishdait manishdait force-pushed the fix/flaky-submit-msg.e2e branch from 45c80fb to bb49b14 Compare May 21, 2026 10:59
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

♻️ Duplicate comments (4)
tests/unit/topic_message_query_test.py (4)

383-408: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Replace timing-dependent sleep with event-based synchronization.

The time.sleep(0.2) at line 400 followed by is_alive() assertion at line 401 is scheduler-dependent and can flap in CI under high load or slow schedulers. Use an event to deterministically wait for the first message delivery before asserting thread liveness and performing cancellation.

⏱️ Proposed fix using threading.Event
+import threading
 import time
 from datetime import datetime, timezone
@@
 def test_subscription_cancellation(mock_client):
     """Test that cancelling a handle stops the subscription thread."""
     query = TopicMessageQuery(topic_id="0.0.123")

     def infinite_stream():
         while True:
             yield mirror_proto.ConsensusTopicResponse(message=b"ping")
             time.sleep(0.1)

     mock_call = MagicMock()
     mock_call.__iter__.return_value = infinite_stream()

     mock_client.mirror_stub.subscribeTopic.return_value = mock_call

-    on_message = MagicMock()
+    first_message_seen = threading.Event()
+    on_message = MagicMock(side_effect=lambda _: first_message_seen.set())
     handle = query.subscribe(mock_client, on_message=on_message)

-    time.sleep(0.2)
-    assert handle._thread.is_alive()
+    assert first_message_seen.wait(timeout=1.0), "Expected at least one message before cancellation"
+    assert handle._thread.is_alive(), "Subscription thread should be alive before cancel()"

     handle.cancel()

     handle._thread.join(timeout=1.0)

     assert not handle._thread.is_alive()
     mock_call.cancel.assert_called()

As per coding guidelines, "No timing-dependent or unseeded random assertions."


54-80: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Verify fluent interface contract: setters must return self.

The test validates internal state but doesn't assert that each setter returns self, which is required for method chaining. If a setter is accidentally changed to return None, chaining would break but this test would pass.

🔗 Proposed fix
 def test_topic_message_query_initialization():
     """Test initializing the query with various parameter types and setters."""
     start = datetime(2023, 1, 1, tzinfo=timezone.utc)

     def mock_complete():
         pass

     def mock_error(e):
         pass

-    query = (
-        TopicMessageQuery()
-        .set_topic_id("0.0.123")
-        .set_start_time(start)
-        .set_limit(5)
-        .set_chunking_enabled(True)
-        .set_completion_handler(mock_complete)
-        .set_error_handler(mock_error)
-    )
+    query = TopicMessageQuery()
+    assert query.set_topic_id("0.0.123") is query, "set_topic_id should return self"
+    assert query.set_start_time(start) is query, "set_start_time should return self"
+    assert query.set_limit(5) is query, "set_limit should return self"
+    assert query.set_chunking_enabled(True) is query, "set_chunking_enabled should return self"
+    assert query.set_completion_handler(mock_complete) is query, "set_completion_handler should return self"
+    assert query.set_error_handler(mock_error) is query, "set_error_handler should return self"

     assert query._topic_id.topicNum == 123
     assert query._start_time.seconds == int(start.timestamp())
     assert query._limit == 5
     assert query._chunking_enabled is True
     assert query._completion_handler == mock_complete
     assert query._error_handler == mock_error

As per coding guidelines, "Assert fluent setters return self".


304-306: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Chunk ordering is not verified — test passes even if chunks are assembled backwards.

The test uses in at lines 305-306, which only checks presence. If the implementation concatenates chunk-2 before chunk-1, this assertion would still pass.

🔍 Proposed fix to verify correct ordering
     assert len(received_messages) == 1
-    assert b"chunk-1" in received_messages[0].contents
-    assert b"chunk-2" in received_messages[0].contents
+    # Verify chunks are assembled in correct order
+    contents = received_messages[0].contents
+    chunk1_pos = contents.find(b"chunk-1")
+    chunk2_pos = contents.find(b"chunk-2")
+    assert chunk1_pos != -1, "chunk-1 not found in assembled message"
+    assert chunk2_pos != -1, "chunk-2 not found in assembled message"
+    assert chunk1_pos < chunk2_pos, "Chunks assembled in wrong order: chunk-1 must precede chunk-2"

As per coding guidelines, tests must catch regressions in ordering behavior.


345-355: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Verify terminal error callback and thread termination after retries exhaust.

The test passes on_error=MagicMock() at line 350 but never asserts it's invoked when max_attempts are exhausted. The implementation should call on_error with the final error before stopping. Additionally, join(timeout=2.0) at line 352 should be followed by an assertion that the thread actually terminated—if it times out, the subscription count assertion at line 354 may pass spuriously.

🧪 Proposed fix
 def test_retry_logic_on_retryable_error(mock_client, error):
     """Test that the query retries on retryable errors but stops after max_attempts."""
     query = TopicMessageQuery(topic_id="0.0.123").set_max_attempts(2).set_max_backoff(0.5)

     mock_client.mirror_stub.subscribeTopic.side_effect = [error, error]
-    handle = query.subscribe(mock_client, on_message=MagicMock(), on_error=MagicMock())
+    on_error = MagicMock()
+    handle = query.subscribe(mock_client, on_message=MagicMock(), on_error=on_error)

     handle._thread.join(timeout=2.0)
+    assert not handle._thread.is_alive(), "Subscription thread should have terminated after retries exhausted"

     assert mock_client.mirror_stub.subscribeTopic.call_count == 2
+    on_error.assert_called_once_with(error)

As per coding guidelines, "Tests must provide useful error messages when they fail for future debugging" and should verify error handler contracts.


ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: ASSERTIVE

Plan: Pro

Run ID: e10049ac-fd87-4829-a382-583a10b742f7

📥 Commits

Reviewing files that changed from the base of the PR and between 45c80fb and bb49b14.

📒 Files selected for processing (4)
  • src/hiero_sdk_python/query/topic_message_query.py
  • src/hiero_sdk_python/utils/subscription_handle.py
  • tests/unit/subscription_handle_test.py
  • tests/unit/topic_message_query_test.py

Comment on lines +176 to +177
if self._limit > 0:
request.limit = max(0, self._limit - state.count)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Count emitted TopicMessages before decrementing limit.

  • File and line: src/hiero_sdk_python/query/topic_message_query.py, Lines 176-177 and 185-207
  • Proto field: ConsensusTopicQuery.limit (#4). (raw.githubusercontent.com)
  • Issue type: Wrong default
  • Description: state.count is subtracted from the next request's limit, but it is incremented for every ConsensusTopicResponse, including intermediate chunks. On retries with chunking enabled, that makes the remaining limit track wire chunks instead of delivered TopicMessages. Once the subtraction reaches 0, the retry request serializes limit = 0, and the schema defines zero/unset as "receive indefinitely", so a resumed subscription can either stop before enough logical messages are emitted or run past the caller's cap. (raw.githubusercontent.com)
  • Suggested fix: Increment state.count only after on_message() runs for a complete logical message, and stop retrying once state.count >= self._limit instead of issuing another request with limit = 0.
🐛 Proposed fix
     def _build_query_request(self, state: SubscriptionState) -> mirror_proto.ConsensusTopicQuery:
         """Build the request object based on current subscription state."""
         request = mirror_proto.ConsensusTopicQuery(topicID=self._topic_id)

         if self._end_time is not None:
             request.consensusEndTime.CopyFrom(self._end_time)

         if state.last_message is not None:
             last_message_time = state.last_message.consensusTimestamp
@@
             request.consensusStartTime.seconds = seconds
             request.consensusStartTime.nanos = nanos

             if self._limit > 0:
-                request.limit = max(0, self._limit - state.count)
+                request.limit = self._limit - state.count
         else:
             if self._start_time is not None:
                 request.consensusStartTime.CopyFrom(self._start_time)
             request.limit = self._limit

         return request

     def _handle_response(self, response, state: SubscriptionState, on_message: Callable[[TopicMessage], None]) -> None:
         """Handles single or chunked messages."""
-        state.count += 1
         state.last_message = response

         if not self._chunking_enabled or not response.HasField("chunkInfo") or response.chunkInfo.total <= 1:
             message = TopicMessage.of_single(response)
             on_message(message)
+            state.count += 1
             return
@@
         if len(chunks) == response.chunkInfo.total:
             del state.pending_messages[initial_tx_id]

             message = TopicMessage.of_many(chunks)
             on_message(message)
+            state.count += 1

     def subscribe(
@@
         def run_stream():
             while state.attempt < self._max_attempts and not subscription_handle.is_cancelled():
+                if self._limit > 0 and state.count >= self._limit:
+                    if self._completion_handler:
+                        self._completion_handler()
+                    return
+
                 state.attempt += 1
                 request = self._build_query_request(state)

As per coding guidelines, "Compare the SDK class against the proto schema" and "Ensure Query code remains: Backward-compatible."

Also applies to: 185-207

received_messages = []
handle = query.subscribe(mock_client, on_message=lambda m: received_messages.append(m))

handle._thread.join(timeout=1.0)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Add thread termination assertions after join to detect timeout failures early.

If join(timeout=1.0) times out and the thread remains alive, subsequent assertions will pass or fail unpredictably without a clear diagnostic message. Explicitly assert that the thread terminated.

🧵 Proposed fix
 # In test_chunk_message_handling:
     handle._thread.join(timeout=1.0)
+    assert not handle._thread.is_alive(), "Subscription thread should have terminated after consuming all chunks"

     assert len(received_messages) == 1

Apply the same pattern in test_chunk_message_handling_when_chunking_is_disabled at line 328.

As per coding guidelines, "Tests must provide useful error messages when they fail for future debugging."

Also applies to: 328-328

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

open to community review PR is open for community review and feedback queue:junior-committer PR awaiting initial quality review scope: DLT involves engineering for distributed ledger technology skill: advanced requires knowledge of multiple areas in the codebase without defined steps to implement or examples

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Advanced]: Bug in tests/integration/topic_message_query_e2e_test.py

1 participant