diff --git a/src/main/java/network/crypta/node/PeerNodeTransport.java b/src/main/java/network/crypta/node/PeerNodeTransport.java index b918bf7068..451f298f03 100644 --- a/src/main/java/network/crypta/node/PeerNodeTransport.java +++ b/src/main/java/network/crypta/node/PeerNodeTransport.java @@ -188,11 +188,7 @@ public void sendSync(Message req, ByteCounter ctr, boolean realTime) MessageItem item = sendAsync(req, cb, ctr); cb.waitForSend(MINUTES.toMillis(1)); if (!cb.done) { - LOG.warn( - "Waited too long for a blocking send for {} to {}", - req, - peer.selfPeerNode(), - new Exception(STR_ERROR)); + LOG.warn("Waited too long for a blocking send for {} to {}", req, peer.selfPeerNode()); peer.localRejectedOverload("SendSyncTimeout", realTime); // Try to un-queue it, since it presumably won't be of any use now. if (!peer.getMessageQueue().removeMessage(item)) { @@ -441,7 +437,7 @@ private class SyncMessageCallback implements AsyncMessageCallback { /** True if completion occurred due to a disconnect. */ private boolean disconnected = false; - /** True once the message has been sent to the socket. */ + /** True, once the message has been sent to the socket. */ private boolean sent = false; /** diff --git a/src/main/java/network/crypta/node/RequestTag.java b/src/main/java/network/crypta/node/RequestTag.java index 1cfaa59c1e..4f25de9fa2 100644 --- a/src/main/java/network/crypta/node/RequestTag.java +++ b/src/main/java/network/crypta/node/RequestTag.java @@ -8,24 +8,41 @@ import org.slf4j.LoggerFactory; /** - * Tracks the lifecycle and routing state for a single request. + * Tracks lifecycle, routing, and transfer bookkeeping for a single request UID. * - *
A {@code RequestTag} encapsulates bookkeeping for one in-flight request, including where the - * request started, whether it targets an SSK, transfer activity for the handler and the sender, and - * whether the data was served from the local datastore. It also coordinates unlocking with the base - * {@link UIDTag} once routing and transfer conditions are satisfied. + *
A {@code RequestTag} instance is created when a request is accepted or initiated and stays + * associated with that UID until the inbound handler and any outbound routing are complete. It + * records origin ({@link START}), request type (SSK or CHK), and whether the response was served + * locally. It also tracks sender and handler transfer activity so {@link UIDTag} can decide when + * unlocking is safe without losing state needed for diagnostics. * - *
Thread-safety: methods that mutate state are synchronized where needed; callers should respect - * this contract. References to collaborators such as {@link RequestSender} and {@link PeerNode} are - * stored using {@link java.lang.ref.WeakReference} so they do not prevent garbage collection. + *
Concurrency: most state changes synchronize on the tag. The tag is mutable and not thread-safe
+ * without those locks. Weak references are used for peers and senders so they can be collected; in
+ * that case, call sites must tolerate {@code null} when inspecting those references.
*
- * @author Matthew Toseland Responsibilities include:
+ *
+ * The start value is set at construction and remains stable for the lifetime of the tag. It is
+ * used in timeout diagnostics and accounting paths that distinguish local requests from forwarded
+ * ones. Each constant describes an origin path, not a routing choice, and does not change once
+ * assigned.
*
* Use this value when the request began in the async client layer and should be treated as a
+ * local origin for diagnostics and accounting.
+ */
ASYNC_GET,
+
+ /**
+ * Originates from this node without an external peer.
+ *
+ * Use for locally created operations where the source is the node itself, so accounting and
+ * routing treat it as local.
+ */
LOCAL,
+
+ /**
+ * Originates from a remote peer and is forwarded into this node.
+ *
+ * Use when the UID was received from a peer, so limits and diagnostics attribute ownership
+ * externally unless reassigned.
+ */
REMOTE
}
@@ -56,14 +93,20 @@ public enum START {
private NodeCHK key;
/**
- * Creates a tag for a request.
- *
- * @param isSSK {@code true} if the request targets an SSK; {@code false} for CHK.
- * @param start Origin of the request.
- * @param source Source peer for the request; may be {@code null} for local.
- * @param realTimeFlag {@code true} if the request is real-time prioritized.
- * @param uid Unique identifier for the request, as tracked by {@link UIDTag}.
- * @param node Owning node instance.
+ * Creates a tag for a single request and initializes origin metadata.
+ *
+ * The constructor records the origin, request type, and UID, and it initializes the base
+ * {@link UIDTag} with the source peer and routing tracker from {@code node}. It does not start
+ * any routing or transfer activity; it simply establishes bookkeeping that other components
+ * update as the request progresses. The instance is mutable and should be published only after
+ * construction completes to avoid race conditions.
+ *
+ * @param isSSK {@code true} when the request targets an SSK; {@code false} for CHK
+ * @param start origin classification describing how the request began; must be non-null
+ * @param source source peer for the request, or {@code null} for local origin
+ * @param realTimeFlag {@code true} for real-time priority scheduling; {@code false} otherwise
+ * @param uid unique identifier for the request, used for tracker accounting
+ * @param node owning node providing routing and tracker access; must be non-null
*/
public RequestTag(
boolean isSSK, START start, PeerNode source, boolean realTimeFlag, long uid, Node node) {
@@ -73,10 +116,16 @@ public RequestTag(
}
/**
- * Records the final status from the {@link RequestSender} and unlocks if all conditions allow it.
+ * Records the final status from the {@link RequestSender} and unlocks if possible.
+ *
+ * This method stores the terminal status code and then checks whether the tag can be unlocked
+ * based on routing, opennet wait, and handler state. It must be called with a value other than
+ * {@link RequestSender#NOT_FINISHED}; passing that sentinel indicates the sender has not finished
+ * and results in an {@link IllegalArgumentException}. If unlock conditions are satisfied, it
+ * delegates to {@link #innerUnlock(boolean)} outside the synchronized block.
*
- * @param status Final status code from {@link RequestSender}.
- * @throws IllegalArgumentException if {@code status} equals {@link RequestSender#NOT_FINISHED}.
+ * @param status terminal status code from {@link RequestSender}; must not be NOT_FINISHED
+ * @throws IllegalArgumentException if the status equals {@link RequestSender#NOT_FINISHED}
*/
public void setRequestSenderFinished(int status) {
boolean unlockNow;
@@ -96,11 +145,15 @@ public void setRequestSenderFinished(int status) {
}
/**
- * Sets the {@link RequestSender} associated with this tag.
+ * Associates a {@link RequestSender} with this tag.
*
- * @param rs The sender instance.
- * @param coalesced {@code true} if the request was satisfied by transfer coalescing and no
- * callbacks are expected from the sender.
+ * The sender reference is stored as a weak reference to avoid retaining the sender longer than
+ * necessary. If {@code coalesced} is false, the tag marks that a sender is active and expects
+ * completion callbacks; if {@code coalesced} is true, the request was satisfied by coalescing and
+ * no sender callbacks are expected. This affects {@link #mustUnlock()} decisions.
+ *
+ * @param rs sender instance responsible for outbound routing and callbacks; must be non-null
+ * @param coalesced true when coalesced transfer satisfies the request; false otherwise
*/
public synchronized void setSender(RequestSender rs, boolean coalesced) {
// When coalesced, the RequestSender will not produce events; do not wait for it.
@@ -112,10 +165,15 @@ public synchronized void setSender(RequestSender rs, boolean coalesced) {
}
/**
- * Returns whether the tag must remain locked based on current routing and transfer state.
+ * Determines whether the tag can be fully unlocked given sender and opennet state.
+ *
+ * This override extends {@link UIDTag#mustUnlock()} by holding the tag locked while a sender
+ * is still active or while the tag is waiting for an opennet decision. It should be invoked only
+ * while holding the tag monitor, as it inspects the mutable state and updates the unlocking latch
+ * in the base class. If it returns {@code true}, callers must immediately proceed to {@link
+ * #innerUnlock(boolean)}.
*
- * The tag stays locked while a {@link RequestSender} is active or this tag is waiting for an
- * opennet decision. Otherwise, it defers to {@link UIDTag#mustUnlock()}.
+ * @return {@code true} if the tag can unlock now; {@code false} otherwise
*/
@Override
protected synchronized boolean mustUnlock() {
@@ -125,12 +183,15 @@ protected synchronized boolean mustUnlock() {
}
/**
- * Clears transfer-tracking state and delegates to {@link UIDTag#innerUnlock(boolean)}.
+ * Clears transfer bookkeeping and delegates to {@link UIDTag#innerUnlock(boolean)}.
*
- * If the handler or sender was marked as transferring, this method updates the associated
- * trackers after unlocking. Callers pass through {@code noRecordUnlock} from the outer context.
+ * This override snapshots transfer state under the tag monitor, clears handler and sender
+ * transfer flags, and then calls the base unlock logic. If sender transfer tracking appears
+ * active without a matching end call, it logs a warning, captures the key and sender references,
+ * and then removes the transfer entry from the tracker after unlocking. This keeps tracker
+ * accounting consistent even when end callbacks are missed.
*
- * @param noRecordUnlock If {@code true}, do not record the unlock in the tracker.
+ * @param noRecordUnlock whether to suppress unlock record bookkeeping for this tag
*/
@Override
protected final void innerUnlock(boolean noRecordUnlock) {
@@ -159,12 +220,15 @@ protected final void innerUnlock(boolean noRecordUnlock) {
}
/**
- * Records a {@link Throwable} raised by the handler and unlocks the handler.
+ * Records a handler exception and unlocks the handler portion of the tag.
*
- * The throwable is retained for later diagnostics and logged by {@link #logStillPresent(Long)}
- * if the tag remains present.
+ * The throwable is stored for later diagnostics and included by {@link #logStillPresent(Long)}
+ * if the tag remains active. This method always logs a trace entry and then calls {@link
+ * #unlockHandler()} to allow routing to continue or complete. It does not rethrow the exception;
+ * callers should handle error propagation separately. If invoked multiple times, the most recent
+ * throwable replaces the previous one.
*
- * @param t Throwable thrown by the request handler; never {@code null}.
+ * @param t exception thrown by the request handler; must be non-null
*/
public void handlerThrew(Throwable t) {
synchronized (this) {
@@ -175,12 +239,25 @@ public void handlerThrew(Throwable t) {
unlockHandler();
}
- /** Marks that the response was served from the local datastore. */
+ /**
+ * Marks that the response was served from the local datastore.
+ *
+ * This flag is used for diagnostics and timeout logging. Setting it does not affect routing or
+ * unlock decisions; it simply records that the data was retrieved locally rather than from a
+ * transfer. The method is synchronized and idempotent, so repeated calls have no additional
+ * effect.
+ */
public synchronized void setServedFromDatastore() {
servedFromDatastore = true;
}
- /** Marks that the request was rejected. */
+ /**
+ * Marks that the request was rejected by the handler.
+ *
+ * The rejected flag is reported in timeout diagnostics and can be used by callers to explain
+ * why a request terminated early. Setting it does not unlock the tag or alter the routing state;
+ * it is purely informational. The method is synchronized and idempotent.
+ */
public synchronized void setRejected() {
rejected = true;
}
@@ -188,11 +265,13 @@ public synchronized void setRejected() {
/**
* Logs a detailed snapshot when the tag remains present beyond an expected duration.
*
- * Outputs origin, key flags, sender status, transfer state, and recorded exceptions. Logs at
- * error level; if a {@link Throwable} was recorded via {@link #handlerThrew(Throwable)}, it is
- * attached as the cause.
+ * The snapshot includes age, origin, SSK flag, datastore hit flag, sender status, transfer
+ * activity, rejection state, and opennet wait information. It also appends the base tag state via
+ * {@link UIDTag#toString()}. If a handler exception was recorded, the log entry uses it as the
+ * cause; otherwise it logs without a cause. This method performs logging only and does not change
+ * the state.
*
- * @param uid The unique identifier of the tag being reported.
+ * @param uid unique identifier of the tag being reported; may be {@code null}
*/
@Override
public void logStillPresent(Long uid) {
@@ -233,17 +312,30 @@ public void logStillPresent(Long uid) {
}
}
+ /**
+ * Records that the handler disconnected while processing the request.
+ *
+ * This flag is used for diagnostics when the tag remains present and does not by itself stop
+ * routing or unlock the tag. The method is synchronized and idempotent, allowing callers to set
+ * it safely from disconnect handlers without additional coordination.
+ */
public synchronized void handlerDisconnected() {
handlerDisconnected = true;
}
/**
- * Estimates incoming transfer count still expected for this tag.
+ * Estimates how many inbound transfers are still expected for this request.
+ *
+ * This implementation returns {@code 0} until the handler has accepted the request. Once
+ * accepted, it returns {@code 1} unless {@link #setNotRoutedOnwards()} was called, in which case
+ * no downstream transfer is expected. The values are conservative and intended for admission and
+ * scheduling decisions rather than exact accounting. The method is synchronized to align with the
+ * handler state.
*
- * @param ignoreLocalVsRemote When {@code true}, treats local and remote origins equivalently.
- * @param outwardTransfersPerInsert Unused for requests; reserved for inserts.
- * @param forAccept {@code true} if called from accept-path accounting.
- * @return {@code 1} when a downstream transfer is expected, otherwise {@code 0}.
+ * @param ignoreLocalVsRemote whether to treat local origin as remote for counting
+ * @param outwardTransfersPerInsert unused for requests; provided for API symmetry
+ * @param forAccept {@code true} when called from admission control paths
+ * @return estimated inbound transfers remaining, either {@code 0} or {@code 1}
*/
@Override
public synchronized int expectedTransfersIn(
@@ -253,12 +345,18 @@ public synchronized int expectedTransfersIn(
}
/**
- * Estimates outgoing transfer count still expected for this tag.
+ * Estimates how many outbound transfers are still expected for this request.
*
- * @param ignoreLocalVsRemote When {@code true}, allows a transfer even if local.
- * @param outwardTransfersPerInsert Unused for requests; reserved for inserts.
- * @param forAccept {@code true} if called from accept-path accounting.
- * @return {@code 1} when an outbound transfer is expected, otherwise {@code 0}.
+ * This method returns {@code 0} when the handler has not accepted, when downstream transfers
+ * are marked complete, or when acceptance accounting should not count local or restarted
+ * requests. It returns {@code 1} when outbound transfer is still expected and the request should
+ * be treated as remote (or {@code ignoreLocalVsRemote} forces that). The result is synchronized
+ * with the mutable state.
+ *
+ * @param ignoreLocalVsRemote whether to count local origin as remote for scheduling
+ * @param outwardTransfersPerInsert unused for requests; provided for API symmetry
+ * @param forAccept {@code true} when called from admission control paths
+ * @return estimated outbound transfers remaining, either {@code 0} or {@code 1}
*/
@Override
public synchronized int expectedTransfersOut(
@@ -271,33 +369,68 @@ public synchronized int expectedTransfersOut(
private boolean completedDownstreamTransfers;
- /** Marks that all downstream transfers have completed for this tag. */
+ /**
+ * Marks that all downstream transfers have completed for this tag.
+ *
+ * Once set, {@link #expectedTransfersOut(boolean, int, boolean)} will return {@code 0} to
+ * avoid accounting for additional outbound transfers. This method does not unlock the tag by
+ * itself; it only updates the completion flag. The update is synchronized and idempotent.
+ */
public synchronized void completedDownstreamTransfers() {
this.completedDownstreamTransfers = true;
}
- /** {@inheritDoc} */
+ /**
+ * Reports whether this tag represents an SSK request.
+ *
+ * This implementation returns the immutable {@code isSSK} flag recorded at construction. The
+ * value is used by routing, logging, and transfer estimation, and it is stable for the lifetime
+ * of the tag. The method performs no synchronization because the flag never changes.
+ *
+ * @return {@code true} if the request targets an SSK; {@code false} for CHK
+ */
@Override
public boolean isSSK() {
return isSSK;
}
- /** Returns {@code false}; requests are not inserts. */
+ /**
+ * Reports whether this tag represents an insert request.
+ *
+ * Request tags model fetch-style requests, so this implementation always returns {@code
+ * false}. Callers can rely on the value being constant for this type and use it to select
+ * insert-specific routing or accounting paths.
+ *
+ * @return {@code false} because request tags do not represent inserts
+ */
@Override
public boolean isInsert() {
return false;
}
- /** Returns {@code false}; this tag does not represent an offer reply. */
+ /**
+ * Reports whether this tag represents a reply to an offer.
+ *
+ * This implementation always returns {@code false}, because {@link RequestTag} is used for
+ * request handling rather than offer replies. The value is constant for this type and can be used
+ * to skip offer-reply-specific handling.
+ *
+ * @return {@code false} because request tags are not offer replies
+ */
@Override
public boolean isOfferReply() {
return false;
}
/**
- * Records that routing is paused while waiting for an opennet decision from {@code next}.
+ * Records that routing is paused while waiting for an opennet decision.
*
- * @param next The peer consulted for opennet; must be non-null.
+ * This method stores a weak reference to the peer being consulted and affects {@link
+ * #mustUnlock()} and {@link #currentlyRoutingTo(PeerNode)} until cleared. If a wait is already
+ * recorded, it logs an error but still replaces the stored reference with the new peer. Callers
+ * should invoke {@link #finishedWaitingForOpennet(PeerNode)} when the decision arrives.
+ *
+ * @param next peer consulted for opennet; must be non-null
*/
public synchronized void waitingForOpennet(PeerNode next) {
if (waitingForOpennet != null)
@@ -311,9 +444,14 @@ public synchronized void waitingForOpennet(PeerNode next) {
}
/**
- * Clears the opennet wait state for {@code next} and unlocks if conditions allow it.
+ * Clears the opennet wait state for the given peer and unlocks if possible.
+ *
+ * If no wait is recorded, the method logs a debug message and returns. If the stored peer does
+ * not match {@code next}, it logs an error but still clears the wait state. After clearing, it
+ * reevaluates unlock conditions and may call {@link #innerUnlock(boolean)} with the current
+ * {@code noRecordUnlock} flag. Logging and unlocking occur outside the synchronized block.
*
- * @param next The peer previously recorded by {@link #waitingForOpennet(PeerNode)}.
+ * @param next peer previously recorded by {@link #waitingForOpennet(PeerNode)}
*/
public void finishedWaitingForOpennet(PeerNode next) {
boolean unlockNow;
@@ -340,10 +478,79 @@ public void finishedWaitingForOpennet(PeerNode next) {
}
/**
- * Returns {@code true} if this tag is currently routing to {@code peer} (including opennet wait).
+ * Returns routing peers for hard-timeout enforcement, including opennet wait peers.
+ *
+ * This override starts with the base routing peers and then appends the peer recorded by
+ * {@link #waitingForOpennet(PeerNode)} if it is still available and not already present. The
+ * returned array is a snapshot; callers must not modify it. The method is synchronized to avoid
+ * concurrent changes to routing or wait state.
+ *
+ * @return snapshot of peers to consider for hard-timeout enforcement
+ */
+ @Override
+ protected synchronized PeerNode[] routingPeersForHardTimeout() {
+ PeerNode[] peers = super.routingPeersForHardTimeout();
+ if (waitingForOpennet == null) return peers;
+ PeerNode pn = waitingForOpennet.get();
+ if (pn == null) return peers;
+ if (peers == null || peers.length == 0) return new PeerNode[] {pn};
+ for (PeerNode peer : peers) {
+ if (Objects.equals(peer, pn)) return peers;
+ }
+ PeerNode[] expanded = new PeerNode[peers.length + 1];
+ System.arraycopy(peers, 0, expanded, 0, peers.length);
+ expanded[peers.length] = pn;
+ return expanded;
+ }
+
+ /**
+ * Handles hard-timeout cleanup when no routing peers remain.
*
- * @param peer Candidate peer.
- * @return {@code true} when routing involves {@code peer}; otherwise {@code false}.
+ * This implementation checks whether a sender was marked active, has not reported a finished
+ * status, is not transferring, and has been garbage collected. When those conditions hold, it
+ * logs a warning and forces {@link RequestSender#TIMED_OUT} by calling {@link
+ * #setRequestSenderFinished(int)}. If any condition fails, it returns {@code false} so the base
+ * logic can continue without forced completion.
+ *
+ * @param continueAge time since timeout-continue was first recorded, in milliseconds
+ * @return {@code true} if a forced sender completion occurred; {@code false} otherwise
+ */
+ @Override
+ protected boolean handleHardTimeoutWithoutPeers(long continueAge) {
+ WeakReference This override extends {@link UIDTag#currentlyRoutingTo(PeerNode)} by treating the opennet
+ * decision peer as active while the tag is waiting for it. It does not indicate that a sending is
+ * in flight, only that the routing state still considers the peer relevant. The method is
+ * synchronized to coordinate with updates to the wait state.
+ *
+ * @param peer candidate peer to test against routing state; must be non-null
+ * @return {@code true} if routing state includes the peer; {@code false} otherwise
*/
@Override
public synchronized boolean currentlyRoutingTo(PeerNode peer) {
@@ -351,7 +558,13 @@ public synchronized boolean currentlyRoutingTo(PeerNode peer) {
return super.currentlyRoutingTo(peer);
}
- /** Marks the beginning of handler-side transfer and registers it with the tracker. */
+ /**
+ * Marks the beginning of handler-side transfer and registers it with the tracker.
+ *
+ * This method sets the handler transfer flag once and then registers the UID with the tracker
+ * so that transfer accounting reflects the in-progress handler work. It is safe to call multiple
+ * times; further calls have no effect once the flag is set.
+ */
public void handlerTransferBegins() {
synchronized (this) {
if (handlerTransferring) return;
@@ -363,11 +576,15 @@ public void handlerTransferBegins() {
/**
* Marks the beginning of sender-side transfer and registers it with the tracker.
*
- * @param k The content key for the transfer; must match {@link #senderTransferEnds(NodeCHK,
- * RequestSender)}.
- * @param requestSender The active {@link RequestSender}; must equal the sender set via {@link
- * #setSender(RequestSender, boolean)}.
- * @throws IllegalStateException if the sender was not set before calling this method.
+ * The method sets the sender transfer flag, records the content key, and registers the sender
+ * with the tracker for transfer accounting. It requires that {@link #setSender(RequestSender,
+ * boolean)} was called with the same sender instance; otherwise it throws. If the transfer has
+ * already started, the method returns without changing the state. This method does not perform
+ * unlocking.
+ *
+ * @param k content key for the transfer; must match the end call
+ * @param requestSender active sender instance; must match the sender set earlier
+ * @throws IllegalStateException if the sender was not set or mismatched
*/
public void senderTransferBegins(NodeCHK k, RequestSender requestSender) {
synchronized (this) {
@@ -383,10 +600,14 @@ public void senderTransferBegins(NodeCHK k, RequestSender requestSender) {
/**
* Marks the end of sender-side transfer and unregisters it from the tracker.
*
- * @param key The content key used when the transfer began.
- * @param requestSender The {@link RequestSender} that initiated the transfer.
- * @throws IllegalStateException if {@code requestSender} or {@code key} does not match the values
- * recorded by {@link #senderTransferBegins(NodeCHK, RequestSender)}.
+ * If the transfer was already cleared, the method returns without side effects. Otherwise, it
+ * verifies that the sender and key match the values recorded by {@link
+ * #senderTransferBegins(NodeCHK, RequestSender)}, clears the stored key, and then removes the
+ * sender from transfer accounting. Mismatches are treated as illegal state and thrown.
+ *
+ * @param key content key used when the transfer began; must match the stored key
+ * @param requestSender sender that initiated the transfer; must match stored sender
+ * @throws IllegalStateException if the sender or key does not match
*/
public void senderTransferEnds(NodeCHK key, RequestSender requestSender) {
synchronized (this) {
diff --git a/src/main/java/network/crypta/node/UIDTag.java b/src/main/java/network/crypta/node/UIDTag.java
index 4a67ca3290..5cb0bc8b14 100644
--- a/src/main/java/network/crypta/node/UIDTag.java
+++ b/src/main/java/network/crypta/node/UIDTag.java
@@ -4,32 +4,45 @@
import java.lang.ref.WeakReference;
import java.util.HashSet;
+import java.util.List;
import java.util.Set;
+import network.crypta.support.TimeUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Base class for tags that represent a single in‑flight request.
+ * Tracks routing and lifecycle state for a single in-flight request UID.
*
- * A tag tracks routing state (peers we contacted or are contacting), handler state (whether the
- * incoming side has completed), and bookkeeping required to decide when it is safe to release the
- * unique identifier (UID) for reuse. Subclasses define request‑specific behavior and logging.
+ * A {@code UIDTag} instance is created when a request is accepted or initiated and remains bound
+ * to the UID until the inbound handler completes and all outbound routing or offered-key fetch work
+ * has been resolved. It records which peers were contacted, which peers are still considered
+ * active, and whether ownership has been reassigned locally. That state drives unlock decisions in
+ * {@link RequestTracker} and determines when the UID can be safely reused.
*
- * Concurrency: many methods are {@code synchronized} on {@code this}. Callers must respect the
- * locking comments on helpers that assume the monitor is already held. The class itself is not
- * immutable.
+ * Concurrency: most mutating operations synchronize on the tag itself. Callers must respect the
+ * locking notes for helpers that assume the monitor is already held. Instances are mutable and are
+ * not thread-safe without the documented synchronization.
*
- * Lifetime: a tag is created when a request is accepted or initiated and is unlocked when both
- * the handler (incoming) and all outstanding outbound activities are complete, or when ownership is
- * reassigned locally as part of timeout handling.
+ * Notable behaviors include:
*
- * @author Matthew Toseland The tracker reference is established at construction from the owning node. It is stable and
+ * non-null for the lifetime of the tag and is used by {@link #innerUnlock(boolean)} to release
+ * the UID and update routing statistics.
+ */
protected final RequestTracker tracker;
+
+ /**
+ * Indicates whether the inbound handler has accepted the request.
+ *
+ * This flag is set when the handler decides to process the request. Subclasses may use it to
+ * reason about progress and for logging. Access is synchronized on the tag when mutated or read
+ * concurrently.
+ */
protected boolean accepted;
+
+ /**
+ * Signals that the original source has restarted or disconnected.
+ *
+ * When true, routing decisions may treat the request as restarted, and continuation behavior
+ * changes to avoid sending additional messages to the original source. The value is updated while
+ * the request is in flight and can be consulted by subclasses.
+ */
protected boolean sourceRestarted;
/** Nodes we routed to at any point during this tag's lifetime. */
@@ -61,11 +98,34 @@ public abstract class UIDTag {
*/
private HashSet Handlers set this flag when they decide no additional peers should be contacted. It is
+ * mutable, read by subclasses and routing policies, and is guarded by the tag monitor when
+ * updates are concurrent.
+ */
protected boolean notRoutedOnwards;
+
final long uid;
+ /**
+ * Tracks whether the inbound handler has completed and unlocked.
+ *
+ * Once true, the UID can be released when no outbound peers remain. This flag is updated by
+ * {@link #unlockHandler(boolean)} and should only be changed while holding the tag monitor.
+ */
protected boolean unlockedHandler;
+
+ /**
+ * Controls whether unlock bookkeeping should be recorded.
+ *
+ * When true, the tag should be released without emitting certain record updates. It is set by
+ * the inbound handler during unlocking to suppress record writing in specific call paths and is
+ * read by {@link #innerUnlock(boolean)}.
+ */
protected boolean noRecordUnlock;
+
private boolean hasUnlocked;
private boolean waitingForSlot;
@@ -83,11 +143,15 @@ public abstract class UIDTag {
}
/**
- * Log that this tag still exists after the request timeout threshold.
+ * Logs that this tag remains after the request timeout threshold.
*
- * Subclasses decide the logging level and include any identifiers they control.
+ * Subclasses decide the logging level, wording, and identifiers because they know the request
+ * type. This callback is invoked by {@link #maybeLogStillPresent(long, Long)} once the timeout
+ * has elapsed and can be called multiple times over the lifetime of the tag; rate limiting is
+ * handled by the caller. Implementations should be lightweight and tolerate a {@code null} UID
+ * when the subclass cannot or should not expose the identifier.
*
- * @param uid Optional UID to include in the log; may be {@code null} when not applicable.
+ * @param uid optional UID to include; may be {@code null} in local contexts
*/
public abstract void logStillPresent(Long uid);
@@ -96,14 +160,17 @@ long age() {
}
/**
- * Mark that we route to, or fetch an offered key from, a peer. Call before sending the outbound
- * message so per‑peer capacity accounting is accurate.
+ * Records that this tag is routing to a peer or fetching an offered key.
+ *
+ * Call this before the outbound message is sent so per-peer capacity accounting and timeout
+ * handling can attribute work correctly. The peer is added to the historical {@code routedTo} set
+ * and also to the active routing set that matches the {@code offeredKey} flag. The method is
+ * synchronized to avoid concurrent mutations of the peer sets and returns whether the peer was
+ * newly recorded for this activity.
*
- * @param peer Peer we route to or fetch from.
- * @param offeredKey {@code true} for an offered‑key fetch; {@code false} for a normal route.
- * Offered‑key fetches use shorter timeouts.
- * @return {@code true} if the peer was newly added to the corresponding set; {@code false} if it
- * was already present.
+ * @param peer peer being routed to or fetched from; must be non-null
+ * @param offeredKey {@code true} for offered-key fetches, {@code false} for normal routing
+ * @return {@code true} if the peer was newly recorded; {@code false} if already present
*/
public synchronized boolean addRoutedTo(PeerNode peer, boolean offeredKey) {
if (LOG.isDebugEnabled())
@@ -132,10 +199,15 @@ public synchronized boolean addRoutedTo(PeerNode peer, boolean offeredKey) {
}
/**
- * Whether we have ever routed to the given peer for this tag.
+ * Reports whether this tag has ever routed to the given peer.
+ *
+ * This query checks the historical routing set, not just the currently active peers. It is
+ * synchronized to provide a consistent view of routing history when other threads are updating
+ * the sets. A {@code false} return means the peer has never been recorded for this tag, and it
+ * does not imply anything about current reachability.
*
- * @param peer Peer to test.
- * @return {@code true} if {@code peer} has been recorded in {@link #routedTo} at least once.
+ * @param peer peer to test against the historical routing set
+ * @return {@code true} when the peer is present in the history; {@code false} otherwise
*/
@SuppressWarnings("unused")
public synchronized boolean hasRoutedTo(PeerNode peer) {
@@ -144,10 +216,15 @@ public synchronized boolean hasRoutedTo(PeerNode peer) {
}
/**
- * Whether we are currently routing to the given peer.
+ * Reports whether this tag is currently routing to the given peer.
*
- * @param peer Peer to test.
- * @return {@code true} if {@code peer} is present in {@link #currentlyRoutingTo}.
+ * The check consults the active routing set, which represents peers that still consider the
+ * UID active on their side. The method is synchronized to avoid concurrent modification while
+ * routing decisions or removals are in progress. It does not guarantee that an outbound sending
+ * is currently in flight or that it will succeed.
+ *
+ * @param peer peer to test against the active routing set
+ * @return {@code true} if the peer is currently active; {@code false} if not recorded
*/
public synchronized boolean currentlyRoutingTo(PeerNode peer) {
if (currentlyRoutingTo == null) return false;
@@ -160,10 +237,15 @@ public synchronized boolean currentlyRoutingTo(PeerNode peer) {
// receiving an acknowledgement sent after the UID is cleared.
/**
- * Whether we are currently fetching an offered key from the given peer.
+ * Reports whether this tag is currently fetching an offered key from a peer.
+ *
+ * This check reads the active offered-key set, which is distinct from normal routing peers and
+ * may use different timeout behavior. The method is synchronized to avoid concurrent modification
+ * while updates or removals are happening. A {@code true} result means the tag still expects an
+ * offered-key response, not that the transfer is guaranteed.
*
- * @param peer Peer to test.
- * @return {@code true} if {@code peer} is present in {@link #fetchingOfferedKeyFrom}.
+ * @param peer peer to test against the offered-key fetch set
+ * @return {@code true} if the peer is currently in the offered-key set; otherwise {@code false}
*/
public synchronized boolean currentlyFetchingOfferedKeyFrom(PeerNode peer) {
if (fetchingOfferedKeyFrom == null) return false;
@@ -171,12 +253,15 @@ public synchronized boolean currentlyFetchingOfferedKeyFrom(PeerNode peer) {
}
/**
- * Notify that we no longer fetch an offered key from a peer. Call only when the peer no longer
- * believes we route to it; see {@link #removeRoutingTo(PeerNode)} for rationale. When we are not
- * routing to any peers, not fetching offered keys, and the handler is unlocked, the UID is
- * released.
+ * Marks that we no longer fetch an offered key from a peer.
*
- * @param next Peer we are no longer fetching an offered key from.
+ * Call this only once the peer is reasonably certain we stopped the offered-key fetch, similar
+ * to {@link #removeRoutingTo(PeerNode)}. The peer is removed from the offered-key tracking set
+ * and from timeout handling bookkeeping. If this change makes the tag eligible for unlocking and
+ * the handler is already unlocked, the method triggers {@link #innerUnlock(boolean)} after
+ * releasing the monitor.
+ *
+ * @param next peer that is no longer providing an offered key
*/
public void removeFetchingOfferedKeyFrom(PeerNode next) {
boolean removed;
@@ -216,16 +301,15 @@ public void removeFetchingOfferedKeyFrom(PeerNode next) {
}
/**
- * Notify that we no longer route to a peer. When we are not routing to any peers (or fetching
- * offered keys) and the handler is unlocked, the tag is fully unlocked. This is most relevant to
- * incoming requests; outgoing requests only consider outbound routing.
+ * Marks that we no longer route to a peer.
*
- * Do not call until the peer is reasonably certain we stopped routing to it. We unlock the
- * handler as early as possible, without waiting to acknowledge our completion notice. Late on
- * sending and early on accepting avoids the peer thinking we finished when we did not, or us
- * thinking the next peer finished when it did not.
+ * When no peers remain in the routing or offered-key sets and the handler has unlocked, the
+ * tag becomes eligible for full unlocking. Call this only when the peer is reasonably certain we
+ * stopped routing to it so that early unlock does not race with downstream completion. The method
+ * also clears timeout bookkeeping for the peer and can trigger {@link #innerUnlock(boolean)}
+ * after the synchronized section completes.
*
- * @param next Peer we are no longer routing to.
+ * @param next peer that is no longer being routed to
*/
public void removeRoutingTo(PeerNode next) {
if (LOG.isDebugEnabled()) {
@@ -274,14 +358,27 @@ public void removeRoutingTo(PeerNode next) {
innerUnlock(localNoRecordUnlock);
}
+ /**
+ * Performs the actual UID unlocking through the tracker.
+ *
+ * This method delegates to {@link RequestTracker#unlockUID(UIDTag, boolean, boolean)} with the
+ * correct flags for this tag. It should be called immediately after {@link #mustUnlock()} returns
+ * {@code true} so that state and accounting remain consistent. Callers should already have
+ * decided that no further outbound routing or offered-key fetches remain.
+ *
+ * @param noRecordUnlock whether to suppress unlock record bookkeeping for this tag
+ */
protected void innerUnlock(boolean noRecordUnlock) {
tracker.unlockUID(this, false, noRecordUnlock);
}
/**
- * Called after {@link #innerUnlock(boolean)} to notify peers that tracked this tag.
+ * Notifies peers that previously tracked this tag after it has been unlocked.
*
- * The best‑effort; missing peers are ignored.
+ * This method is the best effort: it snapshots the {@code routedTo} set and invokes {@link
+ * PeerNode#postUnlock(Object)} for each peer that is still reachable. Missing or already
+ * disconnected peers are ignored. Callers should invoke it after {@link #innerUnlock(boolean)} to
+ * allow peers to release any tag-specific bookkeeping on their side.
*/
@SuppressWarnings("unused")
public void postUnlock() {
@@ -294,30 +391,45 @@ public void postUnlock() {
}
/**
- * Estimate expected inbound transfers attributed to this tag.
+ * Estimates expected inbound transfers attributed to this tag.
*
- * @param ignoreLocalVsRemote When {@code true}, treat the request as remote even if local.
- * @param outwardTransfersPerInsert Expected number of outbound transfers per insert operation.
- * @param forAccept When {@code true}, compute for admission control; when {@code false}, compute
- * for sending decisions where we must be more conservative to avoid avoidable rejections and
- * mandatory backoffs.
+ * The estimate is used to decide whether a request should be accepted or how aggressively it
+ * should be routed. Subclasses should account for the request type, locality, and any special
+ * cases that change transfer counts. The returned value is used for control decisions rather than
+ * exact accounting, so it should be conservative when {@code forAccept} is {@code false}.
+ *
+ * @param ignoreLocalVsRemote when {@code true}, treat the request as remote
+ * @param outwardTransfersPerInsert expected outbound transfers per insert operation
+ * @param forAccept {@code true} for admission control; {@code false} for send decisions
+ * @return estimated number of inbound transfers this tag is expected to represent
*/
public abstract int expectedTransfersIn(
boolean ignoreLocalVsRemote, int outwardTransfersPerInsert, boolean forAccept);
/**
- * Estimate expected outbound transfers attributed to this tag.
+ * Estimates expected outbound transfers attributed to this tag.
+ *
+ * The estimate informs routing and admission control decisions for outbound traffic.
+ * Subclasses should incorporate request type and locality when deriving the count, and they
+ * should be conservative when the result is used for sending decisions. The method is abstract,
+ * so each request type can encode its specific transfer pattern.
*
- * @param ignoreLocalVsRemote When {@code true}, treat the request as remote even if local.
- * @param outwardTransfersPerInsert Expected number of outbound transfers per insert operation.
- * @param forAccept When {@code true}, compute for admission control; when {@code false}, compute
- * for sending decisions where we must be more conservative to avoid avoidable rejections and
- * mandatory backoffs.
+ * @param ignoreLocalVsRemote when {@code true}, treat the request as remote
+ * @param outwardTransfersPerInsert expected outbound transfers per insert operation
+ * @param forAccept {@code true} for admission control; {@code false} for send decisions
+ * @return estimated number of outbound transfers this tag is expected to represent
*/
public abstract int expectedTransfersOut(
boolean ignoreLocalVsRemote, int outwardTransfersPerInsert, boolean forAccept);
- /** Mark that this request will not be routed further downstream. */
+ /**
+ * Marks that this request will not be routed further downstream.
+ *
+ * This flag is set by handlers that decide no additional peers should be contacted. It does
+ * not itself unlock the tag, but it can influence subclass decisions and routing heuristics. The
+ * method does not alter existing routing sets, and it is synchronized to keep the state
+ * consistent with other routing updates.
+ */
public synchronized void setNotRoutedOnwards() {
this.notRoutedOnwards = true;
}
@@ -325,10 +437,14 @@ public synchronized void setNotRoutedOnwards() {
private boolean reassigned;
/**
- * Get the effective source node for load management.
+ * Returns the effective source peer for load management.
+ *
+ * The original source is returned only when the request was not local and has not been
+ * reassigned to self. Because the reference is weak, the GC may also have cleared the peer, in
+ * which case this method returns {@code null}. The method is synchronized to provide a consistent
+ * view with reassignment updates.
*
- * @return The original source {@link PeerNode}, or {@code null} if the tag has been reassigned
- * locally or originated locally.
+ * @return the original source peer, or {@code null} if local, reassigned, or collected
*/
public synchronized PeerNode getSource() {
if (reassigned) return null;
@@ -336,19 +452,44 @@ public synchronized PeerNode getSource() {
return sourceRef.get();
}
- /** Reassign the tag locally rather than attributing it to the original sender. */
+ /**
+ * Reassigns the tag locally rather than attributing it to the original sender.
+ *
+ * When invoked for a non-local tag, the request is treated as locally owned for later load
+ * management decisions. Calls are idempotent; invoking this method on a locally originated tag is
+ * a no-op. After reassignment, {@link #getSource()} returns {@code null}, and accounting treats
+ * the request as local. The method records the reassignment in {@link UIDTraceLogger}.
+ */
public synchronized void reassignToSelf() {
if (wasLocal) return;
reassigned = true;
UIDTraceLogger.log("reassignToSelf", this);
}
- /** Whether the request originated locally. Not affected by {@link #reassignToSelf()}. */
+ /**
+ * Reports whether the request originated locally.
+ *
+ * This value reflects the original origin and is not affected by {@link #reassignToSelf()}.
+ * The method is inexpensive and does not synchronize because the flag is immutable after
+ * construction. Callers often use it to distinguish local requests from those that arrived from a
+ * peer for accounting and logging.
+ *
+ * @return {@code true} if the request was created locally; {@code false} otherwise
+ */
public boolean wasLocal() {
return wasLocal;
}
- /** Whether the request is considered local now (originated locally or reassigned to self). */
+ /**
+ * Reports whether the request is considered local now.
+ *
+ * A tag is considered local if it originated locally or if it has been reassigned to self as
+ * part of timeout handling. The method synchronizes when checking the reassignment state to avoid
+ * races with updates. It is safe to call immediately after {@link #reassignToSelf()} to reflect
+ * the updated ownership.
+ *
+ * @return {@code true} if the request is treated as local; {@code false} otherwise
+ */
public boolean isLocal() {
if (wasLocal) return true;
synchronized (this) {
@@ -356,18 +497,51 @@ public boolean isLocal() {
}
}
- /** Returns {@code true} if this tag represents an SSK request. */
+ /**
+ * Reports whether this tag represents an SSK request.
+ *
+ * Subclasses implement this to reflect the request type. The result is used by routing and
+ * logging decisions that need to distinguish SSK traffic from other request classes. It also
+ * informs transfer estimation for requests that have different patterns. Callers treat the value
+ * as stable for the lifetime of the tag.
+ *
+ * @return {@code true} if the underlying request is an SSK; {@code false} otherwise
+ */
public abstract boolean isSSK();
- /** Returns {@code true} if this tag represents an insert request. */
+ /**
+ * Reports whether this tag represents an insert request.
+ *
+ * Subclasses should return {@code true} when the request inserts data rather than fetching it.
+ * The flag influences transfer estimation and some routing policies, so implementations should be
+ * consistent with the request's primary data flow. The return value is expected to remain
+ * consistent for the lifetime of the tag.
+ *
+ * @return {@code true} if the request inserts data; {@code false} otherwise
+ */
public abstract boolean isInsert();
- /** Returns {@code true} if this tag represents a reply to an offer. */
+ /**
+ * Reports whether this tag represents a reply to an offer.
+ *
+ * Offer replies often use different timeout handling and transfer expectations. Subclasses
+ * should return {@code true} for those requests so routing decisions can reflect that behavior.
+ * The result may also influence how peers are tracked for offered-key handling. Callers should
+ * not assume an offer reply implies any particular routing peer set.
+ *
+ * @return {@code true} if the request is an offer reply; {@code false} otherwise
+ */
public abstract boolean isOfferReply();
/**
- * Caller must call innerUnlock(noRecordUnlock) immediately if this returns true. Hence, derived
- * versions should call mustUnlock() only after they have checked their own unlocking blockers.
+ * Determines whether the tag can be unlocked right now.
+ *
+ * This method checks handler state and outstanding routing or offered-key fetches. If it
+ * returns {@code true}, callers must invoke {@link #innerUnlock(boolean)} immediately while still
+ * honoring any subclass-specific unlocking blockers. It also latches the unlocking decision so
+ * that only one caller proceeds with the actual unlocking.
+ *
+ * @return {@code true} if the tag is ready to unlock; {@code false} otherwise
*/
protected synchronized boolean mustUnlock() {
if (hasUnlocked || !unlockedHandler) {
@@ -450,12 +624,15 @@ private boolean isAnyHandlingTimeout(Set Once both the incoming and outgoing requests are unlocked, the whole tag is unlocked.
+ * This method should be called before sending the completion acknowledgement downstream so the
+ * peer does not assume completion while we are still holding its slot. It records the {@code
+ * noRecord} preference, marks the handler as unlocked, and then checks whether outbound routing
+ * or offered-key fetches remain. If none remain, it triggers {@link #innerUnlock(boolean)};
+ * otherwise it logs that the unlocking is deferred.
+ *
+ * @param noRecord whether to suppress unlock record bookkeeping for this tag
*/
public void unlockHandler(boolean noRecord) {
boolean canUnlock;
@@ -473,6 +650,13 @@ public void unlockHandler(boolean noRecord) {
}
}
+ /**
+ * Convenience overload that unlocks the handler with recording enabled.
+ *
+ * This is equivalent to calling {@link #unlockHandler(boolean)} with {@code false}. It is
+ * provided for call sites that do not need to suppress unlock record bookkeeping. Repeated calls
+ * are safe because the underlying method checks and returns once the handler is already unlocked.
+ */
public void unlockHandler() {
unlockHandler(false);
}
@@ -505,11 +689,14 @@ public synchronized String toString() {
}
/**
- * Mark that we are handling a timeout for the given peer. If the handler later unlocks while the
- * peer still appears in routing/fetching sets, we will reassign this tag locally (rather than log
- * an error) and wait for the fatal timeout.
+ * Records that a timeout is being handled for a peer.
+ *
+ * If the handler later unlocks while the peer still appears in the routing or offered-key
+ * tracking sets, this marker allows the tag to be reassigned locally instead of logging a
+ * spurious error. The timeout marker is cleared when the peer is removed from tracking sets.
+ * Adding the same peer more than once is harmless because the set deduplicates entries.
*
- * @param next Peer for which a timeout is being handled.
+ * @param next peer for which a timeout is being handled
*/
public synchronized void handlingTimeout(PeerNode next) {
if (handlingTimeouts == null) handlingTimeouts = new HashSet<>();
@@ -521,10 +708,15 @@ public synchronized void handlingTimeout(PeerNode next) {
private static final long LOGGED_STILL_PRESENT_INTERVAL = SECONDS.toMillis(60);
/**
- * Log that the tag is still present after the request timeout, at most once per interval.
+ * Logs that the tag is still present after the request timeout.
*
- * @param now Current time in milliseconds since the epoch.
- * @param uid Optional UID to include in the subclass log; may be {@code null}.
+ * This method rate-limits logging to at most once per interval and delegates to {@link
+ * #logStillPresent(Long)} for the actual message. It should be called with a monotonically
+ * increasing time source, typically {@link System#currentTimeMillis()}. After logging checks, it
+ * also evaluates whether a deferred hard timeout should be forced.
+ *
+ * @param now current time in milliseconds since the epoch
+ * @param uid optional UID to include in the subclass log, may be {@code null}
*/
public void maybeLogStillPresent(long now, Long uid) {
if (now - createdTime > RequestTracker.TIMEOUT) {
@@ -534,26 +726,190 @@ public void maybeLogStillPresent(long now, Long uid) {
}
logStillPresent(uid);
}
+ maybeForceHardTimeout(now);
}
- /** Mark this request as accepted by the handler. */
+ /**
+ * Marks this request as accepted by the handler.
+ *
+ * This flag is used by subclasses to reason about progress and logging. The update is
+ * synchronized to avoid races with other handler state transitions. Repeated calls are safe and
+ * simply leave the flag set. The flag does not indicate anything about an outbound routing state.
+ */
public synchronized void setAccepted() {
accepted = true;
}
private boolean timedOutButContinued;
+ private long timeoutContinueAt;
+ private boolean hardTimeoutPeersTriggered;
+ private boolean hardTimeoutWithoutPeersTriggered;
/**
- * Set when we are going to tell downstream that the request has timed out, but can't terminate it
- * yet. We will terminate the request if we have to reroute it, and we count it towards the peer's
- * limit, but we don't stop messages to the request source.
+ * Marks that the handler timed out but processing continues.
+ *
+ * This state is used when we must inform downstream that the request has timed out but cannot
+ * yet terminate it. The request still counts toward the peer's limit, but messages to the source
+ * may continue. The method is idempotent and records the first time the timeout-continue state
+ * was set so that hard timeout logic can be scheduled.
*/
public synchronized void timedOutToHandlerButContinued() {
- timedOutButContinued = true;
+ if (!timedOutButContinued) {
+ timedOutButContinued = true;
+ timeoutContinueAt = System.currentTimeMillis();
+ } else if (timeoutContinueAt == 0L) {
+ timeoutContinueAt = System.currentTimeMillis();
+ }
UIDTraceLogger.log("timeoutContinue", this);
}
- /** Mark that the handler disconnected or restarted. */
+ private void maybeForceHardTimeout(long now) {
+ HardTimeoutContext context = resolveHardTimeoutContext(now);
+ if (context == null) return;
+ if (context.handleWithoutPeers()) {
+ if (handleHardTimeoutWithoutPeers(context.continueAge())) {
+ markHardTimeoutWithoutPeersTriggered();
+ }
+ return;
+ }
+ logAndForcePeerTimeout(context.continueAge(), context.routingPeers(), context.offeredPeers());
+ }
+
+ private HardTimeoutContext resolveHardTimeoutContext(long now) {
+ PeerNode[] routingPeersArray;
+ PeerNode[] offeredPeersArray;
+ long continueAge;
+ synchronized (this) {
+ if (!timedOutButContinued || !unlockedHandler) return null;
+ if (timeoutContinueAt == 0L) return null;
+ continueAge = now - timeoutContinueAt;
+ if (continueAge < HARD_TIMEOUT_AFTER_CONTINUE) return null;
+ routingPeersArray = routingPeersForHardTimeout();
+ offeredPeersArray = offeredKeyPeersForHardTimeout();
+ }
+ List The returned array is built while holding the tag monitor, so it is safe to iterate without
+ * concurrent modification. Callers should treat it as a snapshot for timeout forcing and logging;
+ * later routing updates may change the active set. An empty array indicates there are no active
+ * routing peers at the time of the snapshot.
+ *
+ * @return array of active routing peers, or an empty array if none remain
+ */
+ protected synchronized PeerNode[] routingPeersForHardTimeout() {
+ if (currentlyRoutingTo == null || currentlyRoutingTo.isEmpty()) return NO_PEERS;
+ return currentlyRoutingTo.toArray(new PeerNode[0]);
+ }
+
+ /**
+ * Returns a snapshot of peers still considered active for offered-key timeouts.
+ *
+ * The returned array is produced while holding the tag monitor, providing a consistent view of
+ * the offered-key tracking set. It should be used for timeout enforcement and logging only, not
+ * as a live view. An empty array indicates there are no active offered-key peers at the time of
+ * the snapshot.
+ *
+ * @return array of active offered-key peers, or an empty array if none remain
+ */
+ protected synchronized PeerNode[] offeredKeyPeersForHardTimeout() {
+ if (fetchingOfferedKeyFrom == null || fetchingOfferedKeyFrom.isEmpty()) return NO_PEERS;
+ return fetchingOfferedKeyFrom.toArray(new PeerNode[0]);
+ }
+
+ /**
+ * Allows subclasses to handle hard-timeout conditions when no peers remain.
+ *
+ * This hook is invoked after a timeout-continue period elapses, and there are no routing or
+ * offered-key peers left to force. Subclasses can perform request-specific cleanup or accounting
+ * and return {@code true} to mark the timeout as handled. Returning {@code false} leaves handling
+ * to the default path.
+ *
+ * @param continueAge time since timeout-continue was first recorded, in milliseconds
+ * @return {@code true} if the timeout was handled and should be marked as triggered
+ */
+ protected boolean handleHardTimeoutWithoutPeers(long continueAge) {
+ return false;
+ }
+
+ private record HardTimeoutContext(
+ long continueAge,
+ List This update affects load accounting and stop/continue decisions. Once set, the tag treats
+ * the request as having a restarted source, which influences {@link #countAsSourceRestarted()}
+ * and {@link #shouldStop()} decisions. The call is idempotent and simply leaves the flag set.
+ * Callers typically invoke it when a disconnect or restart is detected.
+ */
public synchronized void onRestartOrDisconnectSource() {
sourceRestarted = true;
}
@@ -563,33 +919,57 @@ public synchronized void onRestartOrDisconnectSource() {
// are appropriate.
/**
- * Should we deduct this request from the source's limit instead of counting it towards it? A
- * normal request is counted towards it. A hidden request is deducted from it. This is used when
- * the source has restarted but also in some other cases.
+ * Reports whether this request should be deducted from the source's limit.
+ *
+ * Normal requests count toward the source's limit, but some situations such as source restarts
+ * or timeout continuation are accounted for differently. This method encapsulates that policy for
+ * callers that manage admission and throttling. It has no side effects and depends only on the
+ * current restart and timeout state.
+ *
+ * @return {@code true} if the request should be deducted; {@code false} if it should count toward
+ * the source's limit
*/
public synchronized boolean countAsSourceRestarted() {
return sourceRestarted || timedOutButContinued;
}
- /** Whether we should continue sending messages to the source. */
+ /**
+ * Reports whether the original source is considered restarted.
+ *
+ * This is a direct accessor for the restart flag and is synchronized to provide a consistent
+ * view when other threads update the flag due to disconnects. Unlike {@link
+ * #countAsSourceRestarted()}, it does not consider timeout-continuation state. Callers use it to
+ * decide whether messages to the source should continue.
+ *
+ * @return {@code true} if the source is considered restarted; {@code false} otherwise
+ */
public synchronized boolean hasSourceReallyRestarted() {
return sourceRestarted;
}
/**
- * Whether we should stop the request as soon as convenient. Normally {@code true} when the source
- * restarted or disconnected.
+ * Reports whether the request should stop as soon as convenient.
+ *
+ * Stopping is typically recommended when the source has restarted or when the request has
+ * timed out but continues. The method is synchronized to reflect the latest updates to restart
+ * and timeout state. It is advisory only and does not cancel work on its own.
+ *
+ * @return {@code true} if the request should stop; {@code false} if it may continue
*/
public synchronized boolean shouldStop() {
return sourceRestarted || timedOutButContinued;
}
/**
- * Whether the given peer is the original source of this tag.
+ * Reports whether the given peer is the original source of this tag.
*
- * @param pn Peer to compare.
- * @return {@code true} if {@code pn} is the original source and the tag was not reassigned and is
- * not local.
+ * The comparison returns {@code false} if the tag has been reassigned or originated locally.
+ * It uses the stored weak reference to avoid retaining the source peer longer than necessary and
+ * synchronizes to keep the reassignment state consistent. If the weak reference has been cleared,
+ * the method returns {@code false} even when the peer was once the source.
+ *
+ * @param pn peer to compare against the original source; must be non-null
+ * @return {@code true} if the peer is the original source, and the tag is not local or reassigned
*/
public synchronized boolean isSource(PeerNode pn) {
if (reassigned) return false;
@@ -598,7 +978,13 @@ public synchronized boolean isSource(PeerNode pn) {
return sourceRef == pn.myRef;
}
- /** Indicate that the tag is waiting for an outbound slot. */
+ /**
+ * Marks that the tag is waiting for an outbound slot.
+ *
+ * This state is used by handlers to avoid double-counting pending outbound work. The method is
+ * synchronized and idempotent, and it does not itself allocate or release any slots. Callers
+ * should ensure {@link #clearWaitingForSlot()} is invoked when the wait ends or on teardown.
+ */
public synchronized void setWaitingForSlot() {
// Consider using a counter on Node.
// We must ensure it ALWAYS gets unset when some weird
@@ -607,7 +993,13 @@ public synchronized void setWaitingForSlot() {
waitingForSlot = true;
}
- /** Clear the waiting‑for‑slot state. */
+ /**
+ * Clears the waiting-for-slot state.
+ *
+ * This method is synchronized and idempotent. It is typically called when outbound work has
+ * got a slot or when the request is being torn down. If the tag was not marked as waiting, the
+ * call has no effect. Clearing the flag does not release any external resources.
+ */
public synchronized void clearWaitingForSlot() {
// Consider using a counter on Node.
// We must ensure it ALWAYS gets unset when some weird
@@ -617,7 +1009,15 @@ public synchronized void clearWaitingForSlot() {
waitingForSlot = false;
}
- /** Returns {@code true} if the tag is currently waiting for an outbound slot. */
+ /**
+ * Reports whether the tag is currently waiting for an outbound slot.
+ *
+ * The return value reflects the most recent calls to {@link #setWaitingForSlot()} and {@link
+ * #clearWaitingForSlot()}. The method is synchronized to avoid races with updates and does not
+ * imply anything about the state of any external queues. It is purely a local bookkeeping signal.
+ *
+ * @return {@code true} if the tag is waiting for a slot; {@code false} otherwise
+ */
public synchronized boolean isWaitingForSlot() {
return waitingForSlot;
}
diff --git a/src/test/java/network/crypta/node/RequestTagTest.java b/src/test/java/network/crypta/node/RequestTagTest.java
index 60438bda3d..39da3dc50e 100644
--- a/src/test/java/network/crypta/node/RequestTagTest.java
+++ b/src/test/java/network/crypta/node/RequestTagTest.java
@@ -65,7 +65,7 @@ void unlockHandler_whenCoalescedSender_allowsUnlock() {
RequestTag tag = newLocalTag(node);
RequestSender rs = org.mockito.Mockito.mock(RequestSender.class);
- // Coalesced: we won't wait for sender completion (sent flag remains false)
+ // Coalesced: we won't wait for sender completion (the "sent" flag remains false)
tag.setSender(rs, /* coalesced= */ true);
// Expect innerUnlock to delegate to tracker.unlockUID exactly once
@@ -89,7 +89,7 @@ void setRequestSenderFinished_afterUnlockHandlerPreviouslyBlockedByPendingSender
// Non-coalesced: mark as sent so mustUnlock() blocks until sender finishes
tag.setSender(rs, /* coalesced= */ false);
- tag.unlockHandler(); // should be deferred due to pending sender
+ tag.unlockHandler(); // should be deferred due to a pending sender
verify(tracker, never())
.unlockUID(
@@ -165,7 +165,7 @@ void waitingForOpennet_thenFinished_allowsUnlock() throws Exception {
// Block unlock via waitingForOpennet != null && get() != null
setPrivateWaitingForOpennet(tag, new WeakReference<>(pn));
- // Should defer unlock because waitingForOpennet is non-null
+ // Should defer unlocking because waitingForOpennet is non-null
tag.unlockHandler();
verify(tracker, never())
.unlockUID(
@@ -180,17 +180,84 @@ void waitingForOpennet_thenFinished_allowsUnlock() throws Exception {
org.mockito.Mockito.anyBoolean(),
org.mockito.Mockito.anyBoolean());
- // Now clear the wait with the same peer and expect unlock
+ // Now clear the wait with the same peer and expect unlocking
tag.finishedWaitingForOpennet(pn);
verify(tracker, times(1)).unlockUID(tag, false, false);
}
+ @Test
+ void hardTimeout_whenWaitingForOpennet_triggersFatalTimeout() throws Exception {
+ RequestTag tag = newLocalTag(node);
+ PeerNode pn = org.mockito.Mockito.mock(PeerNode.class);
+
+ setPrivateWaitingForOpennet(tag, new WeakReference<>(pn));
+
+ tag.timedOutToHandlerButContinued();
+ tag.unlockHandler();
+
+ long now = System.currentTimeMillis() + RequestTracker.TIMEOUT + 1_000;
+ tag.maybeLogStillPresent(now, UID);
+
+ verify(pn, times(1)).fatalTimeout(tag, false);
+ }
+
+ @Test
+ void hardTimeout_whenSenderLostAndNoRouting_forcesSenderFinishAndUnlocks() throws Exception {
+ RequestTag tag = newLocalTag(node);
+ RequestSender rs = org.mockito.Mockito.mock(RequestSender.class);
+
+ tag.setSender(rs, /* coalesced= */ false);
+ setPrivateSender(tag, new WeakReference<>(null));
+
+ tag.timedOutToHandlerButContinued();
+ tag.unlockHandler();
+
+ verify(tracker, never())
+ .unlockUID(
+ org.mockito.Mockito.any(),
+ org.mockito.Mockito.anyBoolean(),
+ org.mockito.Mockito.anyBoolean());
+
+ long now = System.currentTimeMillis() + RequestTracker.TIMEOUT + 1_000;
+ tag.maybeLogStillPresent(now, UID);
+
+ verify(tracker, times(1)).unlockUID(tag, false, false);
+ assertEquals(RequestSender.TIMED_OUT, getPrivateRequestSenderFinishedCode(tag));
+ }
+
+ @Test
+ void hardTimeout_afterPeerTimeout_stillAllowsSenderFallback() throws Exception {
+ RequestTag tag = newLocalTag(node);
+ RequestSender rs = org.mockito.Mockito.mock(RequestSender.class);
+ PeerNode peer = org.mockito.Mockito.mock(PeerNode.class);
+
+ tag.setSender(rs, /* coalesced= */ false);
+ setPrivateSender(tag, new WeakReference<>(null));
+ tag.addRoutedTo(peer, /* offeredKey= */ false);
+
+ tag.timedOutToHandlerButContinued();
+ tag.unlockHandler();
+
+ long now = System.currentTimeMillis() + RequestTracker.TIMEOUT + 1_000;
+ tag.maybeLogStillPresent(now, UID);
+
+ verify(peer, times(1)).fatalTimeout(tag, false);
+
+ tag.removeRoutingTo(peer);
+
+ long later = now + 61_000;
+ tag.maybeLogStillPresent(later, UID);
+
+ verify(tracker, times(1)).unlockUID(tag, false, false);
+ assertEquals(RequestSender.TIMED_OUT, getPrivateRequestSenderFinishedCode(tag));
+ }
+
@Test
void handlerTransferBegins_thenUnlock_removesFromTracker() {
RequestTag tag = newLocalTag(node);
- // Register transferring state and then fully unlock
+ // Register the transferring state and then fully unlock
tag.handlerTransferBegins();
verify(tracker, times(1)).addTransferringRequestHandler(UID);
@@ -203,7 +270,7 @@ void handlerTransferBegins_thenUnlock_removesFromTracker() {
tag.unlockHandler();
- // innerUnlock should clear transferring handler state
+ // innerUnlock should clear the transferring handler state
verify(tracker, times(1)).removeTransferringRequestHandler(UID);
verify(tracker, times(1)).unlockUID(tag, false, false);
}
@@ -228,7 +295,7 @@ void senderTransferBegins_andEnds_registersAndUnregisters() {
tag.senderTransferBegins(key, rs);
verify(tracker, times(1)).addTransferringSender(key, rs);
- // Should assert and then remove the sender from tracker
+ // Should assert and then remove the sender from the tracker
tag.senderTransferEnds(key, rs);
verify(tracker, times(1)).removeTransferringSender(key, rs);
}
@@ -294,4 +361,17 @@ private static void setPrivateWaitingForOpennet(RequestTag tag, WeakReference
+ *
+ *
+ * @see UIDTag
+ * @see RequestSender
+ * @see RequestTracker
+ * @author Matthew Toseland {@literal
*
*/
public enum START {
+ /**
+ * Originates from the asynchronous get path, typically a locally initiated fetch.
+ *
+ *
+ *
+ *
+ * @see PeerNode
+ * @see RequestTracker
+ * @see UIDTraceLogger
+ * @author Matthew Toseland {@literal