diff --git a/src/main/java/network/crypta/node/PeerNodeTransport.java b/src/main/java/network/crypta/node/PeerNodeTransport.java index b918bf7068..451f298f03 100644 --- a/src/main/java/network/crypta/node/PeerNodeTransport.java +++ b/src/main/java/network/crypta/node/PeerNodeTransport.java @@ -188,11 +188,7 @@ public void sendSync(Message req, ByteCounter ctr, boolean realTime) MessageItem item = sendAsync(req, cb, ctr); cb.waitForSend(MINUTES.toMillis(1)); if (!cb.done) { - LOG.warn( - "Waited too long for a blocking send for {} to {}", - req, - peer.selfPeerNode(), - new Exception(STR_ERROR)); + LOG.warn("Waited too long for a blocking send for {} to {}", req, peer.selfPeerNode()); peer.localRejectedOverload("SendSyncTimeout", realTime); // Try to un-queue it, since it presumably won't be of any use now. if (!peer.getMessageQueue().removeMessage(item)) { @@ -441,7 +437,7 @@ private class SyncMessageCallback implements AsyncMessageCallback { /** True if completion occurred due to a disconnect. */ private boolean disconnected = false; - /** True once the message has been sent to the socket. */ + /** True, once the message has been sent to the socket. */ private boolean sent = false; /** diff --git a/src/main/java/network/crypta/node/RequestTag.java b/src/main/java/network/crypta/node/RequestTag.java index 1cfaa59c1e..4f25de9fa2 100644 --- a/src/main/java/network/crypta/node/RequestTag.java +++ b/src/main/java/network/crypta/node/RequestTag.java @@ -8,24 +8,41 @@ import org.slf4j.LoggerFactory; /** - * Tracks the lifecycle and routing state for a single request. + * Tracks lifecycle, routing, and transfer bookkeeping for a single request UID. * - *

A {@code RequestTag} encapsulates bookkeeping for one in-flight request, including where the - * request started, whether it targets an SSK, transfer activity for the handler and the sender, and - * whether the data was served from the local datastore. It also coordinates unlocking with the base - * {@link UIDTag} once routing and transfer conditions are satisfied. + *

A {@code RequestTag} instance is created when a request is accepted or initiated and stays + * associated with that UID until the inbound handler and any outbound routing are complete. It + * records origin ({@link START}), request type (SSK or CHK), and whether the response was served + * locally. It also tracks sender and handler transfer activity so {@link UIDTag} can decide when + * unlocking is safe without losing state needed for diagnostics. * - *

Thread-safety: methods that mutate state are synchronized where needed; callers should respect - * this contract. References to collaborators such as {@link RequestSender} and {@link PeerNode} are - * stored using {@link java.lang.ref.WeakReference} so they do not prevent garbage collection. + *

Concurrency: most state changes synchronize on the tag. The tag is mutable and not thread-safe + * without those locks. Weak references are used for peers and senders so they can be collected; in + * that case, call sites must tolerate {@code null} when inspecting those references. * - * @author Matthew Toseland (0xE43DA450) + *

Responsibilities include: + * + *

+ * + * @see UIDTag + * @see RequestSender + * @see RequestTracker + * @author Matthew Toseland {@literal } (0xE43DA450) */ public class RequestTag extends UIDTag { private static final Logger LOG = LoggerFactory.getLogger(RequestTag.class); /** - * Where the request originates. + * Identifies where the request originated for routing and accounting decisions. + * + *

The start value is set at construction and remains stable for the lifetime of the tag. It is + * used in timeout diagnostics and accounting paths that distinguish local requests from forwarded + * ones. Each constant describes an origin path, not a routing choice, and does not change once + * assigned. * *

*/ public enum START { + /** + * Originates from the asynchronous get path, typically a locally initiated fetch. + * + *

Use this value when the request began in the async client layer and should be treated as a + * local origin for diagnostics and accounting. + */ ASYNC_GET, + + /** + * Originates from this node without an external peer. + * + *

Use for locally created operations where the source is the node itself, so accounting and + * routing treat it as local. + */ LOCAL, + + /** + * Originates from a remote peer and is forwarded into this node. + * + *

Use when the UID was received from a peer, so limits and diagnostics attribute ownership + * externally unless reassigned. + */ REMOTE } @@ -56,14 +93,20 @@ public enum START { private NodeCHK key; /** - * Creates a tag for a request. - * - * @param isSSK {@code true} if the request targets an SSK; {@code false} for CHK. - * @param start Origin of the request. - * @param source Source peer for the request; may be {@code null} for local. - * @param realTimeFlag {@code true} if the request is real-time prioritized. - * @param uid Unique identifier for the request, as tracked by {@link UIDTag}. - * @param node Owning node instance. + * Creates a tag for a single request and initializes origin metadata. + * + *

The constructor records the origin, request type, and UID, and it initializes the base + * {@link UIDTag} with the source peer and routing tracker from {@code node}. It does not start + * any routing or transfer activity; it simply establishes bookkeeping that other components + * update as the request progresses. The instance is mutable and should be published only after + * construction completes to avoid race conditions. + * + * @param isSSK {@code true} when the request targets an SSK; {@code false} for CHK + * @param start origin classification describing how the request began; must be non-null + * @param source source peer for the request, or {@code null} for local origin + * @param realTimeFlag {@code true} for real-time priority scheduling; {@code false} otherwise + * @param uid unique identifier for the request, used for tracker accounting + * @param node owning node providing routing and tracker access; must be non-null */ public RequestTag( boolean isSSK, START start, PeerNode source, boolean realTimeFlag, long uid, Node node) { @@ -73,10 +116,16 @@ public RequestTag( } /** - * Records the final status from the {@link RequestSender} and unlocks if all conditions allow it. + * Records the final status from the {@link RequestSender} and unlocks if possible. + * + *

This method stores the terminal status code and then checks whether the tag can be unlocked + * based on routing, opennet wait, and handler state. It must be called with a value other than + * {@link RequestSender#NOT_FINISHED}; passing that sentinel indicates the sender has not finished + * and results in an {@link IllegalArgumentException}. If unlock conditions are satisfied, it + * delegates to {@link #innerUnlock(boolean)} outside the synchronized block. * - * @param status Final status code from {@link RequestSender}. - * @throws IllegalArgumentException if {@code status} equals {@link RequestSender#NOT_FINISHED}. + * @param status terminal status code from {@link RequestSender}; must not be NOT_FINISHED + * @throws IllegalArgumentException if the status equals {@link RequestSender#NOT_FINISHED} */ public void setRequestSenderFinished(int status) { boolean unlockNow; @@ -96,11 +145,15 @@ public void setRequestSenderFinished(int status) { } /** - * Sets the {@link RequestSender} associated with this tag. + * Associates a {@link RequestSender} with this tag. * - * @param rs The sender instance. - * @param coalesced {@code true} if the request was satisfied by transfer coalescing and no - * callbacks are expected from the sender. + *

The sender reference is stored as a weak reference to avoid retaining the sender longer than + * necessary. If {@code coalesced} is false, the tag marks that a sender is active and expects + * completion callbacks; if {@code coalesced} is true, the request was satisfied by coalescing and + * no sender callbacks are expected. This affects {@link #mustUnlock()} decisions. + * + * @param rs sender instance responsible for outbound routing and callbacks; must be non-null + * @param coalesced true when coalesced transfer satisfies the request; false otherwise */ public synchronized void setSender(RequestSender rs, boolean coalesced) { // When coalesced, the RequestSender will not produce events; do not wait for it. @@ -112,10 +165,15 @@ public synchronized void setSender(RequestSender rs, boolean coalesced) { } /** - * Returns whether the tag must remain locked based on current routing and transfer state. + * Determines whether the tag can be fully unlocked given sender and opennet state. + * + *

This override extends {@link UIDTag#mustUnlock()} by holding the tag locked while a sender + * is still active or while the tag is waiting for an opennet decision. It should be invoked only + * while holding the tag monitor, as it inspects the mutable state and updates the unlocking latch + * in the base class. If it returns {@code true}, callers must immediately proceed to {@link + * #innerUnlock(boolean)}. * - *

The tag stays locked while a {@link RequestSender} is active or this tag is waiting for an - * opennet decision. Otherwise, it defers to {@link UIDTag#mustUnlock()}. + * @return {@code true} if the tag can unlock now; {@code false} otherwise */ @Override protected synchronized boolean mustUnlock() { @@ -125,12 +183,15 @@ protected synchronized boolean mustUnlock() { } /** - * Clears transfer-tracking state and delegates to {@link UIDTag#innerUnlock(boolean)}. + * Clears transfer bookkeeping and delegates to {@link UIDTag#innerUnlock(boolean)}. * - *

If the handler or sender was marked as transferring, this method updates the associated - * trackers after unlocking. Callers pass through {@code noRecordUnlock} from the outer context. + *

This override snapshots transfer state under the tag monitor, clears handler and sender + * transfer flags, and then calls the base unlock logic. If sender transfer tracking appears + * active without a matching end call, it logs a warning, captures the key and sender references, + * and then removes the transfer entry from the tracker after unlocking. This keeps tracker + * accounting consistent even when end callbacks are missed. * - * @param noRecordUnlock If {@code true}, do not record the unlock in the tracker. + * @param noRecordUnlock whether to suppress unlock record bookkeeping for this tag */ @Override protected final void innerUnlock(boolean noRecordUnlock) { @@ -159,12 +220,15 @@ protected final void innerUnlock(boolean noRecordUnlock) { } /** - * Records a {@link Throwable} raised by the handler and unlocks the handler. + * Records a handler exception and unlocks the handler portion of the tag. * - *

The throwable is retained for later diagnostics and logged by {@link #logStillPresent(Long)} - * if the tag remains present. + *

The throwable is stored for later diagnostics and included by {@link #logStillPresent(Long)} + * if the tag remains active. This method always logs a trace entry and then calls {@link + * #unlockHandler()} to allow routing to continue or complete. It does not rethrow the exception; + * callers should handle error propagation separately. If invoked multiple times, the most recent + * throwable replaces the previous one. * - * @param t Throwable thrown by the request handler; never {@code null}. + * @param t exception thrown by the request handler; must be non-null */ public void handlerThrew(Throwable t) { synchronized (this) { @@ -175,12 +239,25 @@ public void handlerThrew(Throwable t) { unlockHandler(); } - /** Marks that the response was served from the local datastore. */ + /** + * Marks that the response was served from the local datastore. + * + *

This flag is used for diagnostics and timeout logging. Setting it does not affect routing or + * unlock decisions; it simply records that the data was retrieved locally rather than from a + * transfer. The method is synchronized and idempotent, so repeated calls have no additional + * effect. + */ public synchronized void setServedFromDatastore() { servedFromDatastore = true; } - /** Marks that the request was rejected. */ + /** + * Marks that the request was rejected by the handler. + * + *

The rejected flag is reported in timeout diagnostics and can be used by callers to explain + * why a request terminated early. Setting it does not unlock the tag or alter the routing state; + * it is purely informational. The method is synchronized and idempotent. + */ public synchronized void setRejected() { rejected = true; } @@ -188,11 +265,13 @@ public synchronized void setRejected() { /** * Logs a detailed snapshot when the tag remains present beyond an expected duration. * - *

Outputs origin, key flags, sender status, transfer state, and recorded exceptions. Logs at - * error level; if a {@link Throwable} was recorded via {@link #handlerThrew(Throwable)}, it is - * attached as the cause. + *

The snapshot includes age, origin, SSK flag, datastore hit flag, sender status, transfer + * activity, rejection state, and opennet wait information. It also appends the base tag state via + * {@link UIDTag#toString()}. If a handler exception was recorded, the log entry uses it as the + * cause; otherwise it logs without a cause. This method performs logging only and does not change + * the state. * - * @param uid The unique identifier of the tag being reported. + * @param uid unique identifier of the tag being reported; may be {@code null} */ @Override public void logStillPresent(Long uid) { @@ -233,17 +312,30 @@ public void logStillPresent(Long uid) { } } + /** + * Records that the handler disconnected while processing the request. + * + *

This flag is used for diagnostics when the tag remains present and does not by itself stop + * routing or unlock the tag. The method is synchronized and idempotent, allowing callers to set + * it safely from disconnect handlers without additional coordination. + */ public synchronized void handlerDisconnected() { handlerDisconnected = true; } /** - * Estimates incoming transfer count still expected for this tag. + * Estimates how many inbound transfers are still expected for this request. + * + *

This implementation returns {@code 0} until the handler has accepted the request. Once + * accepted, it returns {@code 1} unless {@link #setNotRoutedOnwards()} was called, in which case + * no downstream transfer is expected. The values are conservative and intended for admission and + * scheduling decisions rather than exact accounting. The method is synchronized to align with the + * handler state. * - * @param ignoreLocalVsRemote When {@code true}, treats local and remote origins equivalently. - * @param outwardTransfersPerInsert Unused for requests; reserved for inserts. - * @param forAccept {@code true} if called from accept-path accounting. - * @return {@code 1} when a downstream transfer is expected, otherwise {@code 0}. + * @param ignoreLocalVsRemote whether to treat local origin as remote for counting + * @param outwardTransfersPerInsert unused for requests; provided for API symmetry + * @param forAccept {@code true} when called from admission control paths + * @return estimated inbound transfers remaining, either {@code 0} or {@code 1} */ @Override public synchronized int expectedTransfersIn( @@ -253,12 +345,18 @@ public synchronized int expectedTransfersIn( } /** - * Estimates outgoing transfer count still expected for this tag. + * Estimates how many outbound transfers are still expected for this request. * - * @param ignoreLocalVsRemote When {@code true}, allows a transfer even if local. - * @param outwardTransfersPerInsert Unused for requests; reserved for inserts. - * @param forAccept {@code true} if called from accept-path accounting. - * @return {@code 1} when an outbound transfer is expected, otherwise {@code 0}. + *

This method returns {@code 0} when the handler has not accepted, when downstream transfers + * are marked complete, or when acceptance accounting should not count local or restarted + * requests. It returns {@code 1} when outbound transfer is still expected and the request should + * be treated as remote (or {@code ignoreLocalVsRemote} forces that). The result is synchronized + * with the mutable state. + * + * @param ignoreLocalVsRemote whether to count local origin as remote for scheduling + * @param outwardTransfersPerInsert unused for requests; provided for API symmetry + * @param forAccept {@code true} when called from admission control paths + * @return estimated outbound transfers remaining, either {@code 0} or {@code 1} */ @Override public synchronized int expectedTransfersOut( @@ -271,33 +369,68 @@ public synchronized int expectedTransfersOut( private boolean completedDownstreamTransfers; - /** Marks that all downstream transfers have completed for this tag. */ + /** + * Marks that all downstream transfers have completed for this tag. + * + *

Once set, {@link #expectedTransfersOut(boolean, int, boolean)} will return {@code 0} to + * avoid accounting for additional outbound transfers. This method does not unlock the tag by + * itself; it only updates the completion flag. The update is synchronized and idempotent. + */ public synchronized void completedDownstreamTransfers() { this.completedDownstreamTransfers = true; } - /** {@inheritDoc} */ + /** + * Reports whether this tag represents an SSK request. + * + *

This implementation returns the immutable {@code isSSK} flag recorded at construction. The + * value is used by routing, logging, and transfer estimation, and it is stable for the lifetime + * of the tag. The method performs no synchronization because the flag never changes. + * + * @return {@code true} if the request targets an SSK; {@code false} for CHK + */ @Override public boolean isSSK() { return isSSK; } - /** Returns {@code false}; requests are not inserts. */ + /** + * Reports whether this tag represents an insert request. + * + *

Request tags model fetch-style requests, so this implementation always returns {@code + * false}. Callers can rely on the value being constant for this type and use it to select + * insert-specific routing or accounting paths. + * + * @return {@code false} because request tags do not represent inserts + */ @Override public boolean isInsert() { return false; } - /** Returns {@code false}; this tag does not represent an offer reply. */ + /** + * Reports whether this tag represents a reply to an offer. + * + *

This implementation always returns {@code false}, because {@link RequestTag} is used for + * request handling rather than offer replies. The value is constant for this type and can be used + * to skip offer-reply-specific handling. + * + * @return {@code false} because request tags are not offer replies + */ @Override public boolean isOfferReply() { return false; } /** - * Records that routing is paused while waiting for an opennet decision from {@code next}. + * Records that routing is paused while waiting for an opennet decision. * - * @param next The peer consulted for opennet; must be non-null. + *

This method stores a weak reference to the peer being consulted and affects {@link + * #mustUnlock()} and {@link #currentlyRoutingTo(PeerNode)} until cleared. If a wait is already + * recorded, it logs an error but still replaces the stored reference with the new peer. Callers + * should invoke {@link #finishedWaitingForOpennet(PeerNode)} when the decision arrives. + * + * @param next peer consulted for opennet; must be non-null */ public synchronized void waitingForOpennet(PeerNode next) { if (waitingForOpennet != null) @@ -311,9 +444,14 @@ public synchronized void waitingForOpennet(PeerNode next) { } /** - * Clears the opennet wait state for {@code next} and unlocks if conditions allow it. + * Clears the opennet wait state for the given peer and unlocks if possible. + * + *

If no wait is recorded, the method logs a debug message and returns. If the stored peer does + * not match {@code next}, it logs an error but still clears the wait state. After clearing, it + * reevaluates unlock conditions and may call {@link #innerUnlock(boolean)} with the current + * {@code noRecordUnlock} flag. Logging and unlocking occur outside the synchronized block. * - * @param next The peer previously recorded by {@link #waitingForOpennet(PeerNode)}. + * @param next peer previously recorded by {@link #waitingForOpennet(PeerNode)} */ public void finishedWaitingForOpennet(PeerNode next) { boolean unlockNow; @@ -340,10 +478,79 @@ public void finishedWaitingForOpennet(PeerNode next) { } /** - * Returns {@code true} if this tag is currently routing to {@code peer} (including opennet wait). + * Returns routing peers for hard-timeout enforcement, including opennet wait peers. + * + *

This override starts with the base routing peers and then appends the peer recorded by + * {@link #waitingForOpennet(PeerNode)} if it is still available and not already present. The + * returned array is a snapshot; callers must not modify it. The method is synchronized to avoid + * concurrent changes to routing or wait state. + * + * @return snapshot of peers to consider for hard-timeout enforcement + */ + @Override + protected synchronized PeerNode[] routingPeersForHardTimeout() { + PeerNode[] peers = super.routingPeersForHardTimeout(); + if (waitingForOpennet == null) return peers; + PeerNode pn = waitingForOpennet.get(); + if (pn == null) return peers; + if (peers == null || peers.length == 0) return new PeerNode[] {pn}; + for (PeerNode peer : peers) { + if (Objects.equals(peer, pn)) return peers; + } + PeerNode[] expanded = new PeerNode[peers.length + 1]; + System.arraycopy(peers, 0, expanded, 0, peers.length); + expanded[peers.length] = pn; + return expanded; + } + + /** + * Handles hard-timeout cleanup when no routing peers remain. * - * @param peer Candidate peer. - * @return {@code true} when routing involves {@code peer}; otherwise {@code false}. + *

This implementation checks whether a sender was marked active, has not reported a finished + * status, is not transferring, and has been garbage collected. When those conditions hold, it + * logs a warning and forces {@link RequestSender#TIMED_OUT} by calling {@link + * #setRequestSenderFinished(int)}. If any condition fails, it returns {@code false} so the base + * logic can continue without forced completion. + * + * @param continueAge time since timeout-continue was first recorded, in milliseconds + * @return {@code true} if a forced sender completion occurred; {@code false} otherwise + */ + @Override + protected boolean handleHardTimeoutWithoutPeers(long continueAge) { + WeakReference localSender; + boolean localSent; + int localFinishedCode; + boolean localSenderTransferring; + synchronized (this) { + localSender = sender; + localSent = sent; + localFinishedCode = requestSenderFinishedCode; + localSenderTransferring = senderTransferring; + } + if (!localSent || localFinishedCode != RequestSender.NOT_FINISHED) return false; + if (localSenderTransferring) return false; + if (localSender == null || localSender.get() != null) return false; + if (LOG.isWarnEnabled()) { + LOG.warn( + "Hard timeout after {} for {}. Forcing sender finish: status={}", + TimeUtil.formatTime(continueAge), + this, + RequestSender.getStatusString(RequestSender.TIMED_OUT)); + } + setRequestSenderFinished(RequestSender.TIMED_OUT); + return true; + } + + /** + * Reports whether this tag is currently routing to the given peer. + * + *

This override extends {@link UIDTag#currentlyRoutingTo(PeerNode)} by treating the opennet + * decision peer as active while the tag is waiting for it. It does not indicate that a sending is + * in flight, only that the routing state still considers the peer relevant. The method is + * synchronized to coordinate with updates to the wait state. + * + * @param peer candidate peer to test against routing state; must be non-null + * @return {@code true} if routing state includes the peer; {@code false} otherwise */ @Override public synchronized boolean currentlyRoutingTo(PeerNode peer) { @@ -351,7 +558,13 @@ public synchronized boolean currentlyRoutingTo(PeerNode peer) { return super.currentlyRoutingTo(peer); } - /** Marks the beginning of handler-side transfer and registers it with the tracker. */ + /** + * Marks the beginning of handler-side transfer and registers it with the tracker. + * + *

This method sets the handler transfer flag once and then registers the UID with the tracker + * so that transfer accounting reflects the in-progress handler work. It is safe to call multiple + * times; further calls have no effect once the flag is set. + */ public void handlerTransferBegins() { synchronized (this) { if (handlerTransferring) return; @@ -363,11 +576,15 @@ public void handlerTransferBegins() { /** * Marks the beginning of sender-side transfer and registers it with the tracker. * - * @param k The content key for the transfer; must match {@link #senderTransferEnds(NodeCHK, - * RequestSender)}. - * @param requestSender The active {@link RequestSender}; must equal the sender set via {@link - * #setSender(RequestSender, boolean)}. - * @throws IllegalStateException if the sender was not set before calling this method. + *

The method sets the sender transfer flag, records the content key, and registers the sender + * with the tracker for transfer accounting. It requires that {@link #setSender(RequestSender, + * boolean)} was called with the same sender instance; otherwise it throws. If the transfer has + * already started, the method returns without changing the state. This method does not perform + * unlocking. + * + * @param k content key for the transfer; must match the end call + * @param requestSender active sender instance; must match the sender set earlier + * @throws IllegalStateException if the sender was not set or mismatched */ public void senderTransferBegins(NodeCHK k, RequestSender requestSender) { synchronized (this) { @@ -383,10 +600,14 @@ public void senderTransferBegins(NodeCHK k, RequestSender requestSender) { /** * Marks the end of sender-side transfer and unregisters it from the tracker. * - * @param key The content key used when the transfer began. - * @param requestSender The {@link RequestSender} that initiated the transfer. - * @throws IllegalStateException if {@code requestSender} or {@code key} does not match the values - * recorded by {@link #senderTransferBegins(NodeCHK, RequestSender)}. + *

If the transfer was already cleared, the method returns without side effects. Otherwise, it + * verifies that the sender and key match the values recorded by {@link + * #senderTransferBegins(NodeCHK, RequestSender)}, clears the stored key, and then removes the + * sender from transfer accounting. Mismatches are treated as illegal state and thrown. + * + * @param key content key used when the transfer began; must match the stored key + * @param requestSender sender that initiated the transfer; must match stored sender + * @throws IllegalStateException if the sender or key does not match */ public void senderTransferEnds(NodeCHK key, RequestSender requestSender) { synchronized (this) { diff --git a/src/main/java/network/crypta/node/UIDTag.java b/src/main/java/network/crypta/node/UIDTag.java index 4a67ca3290..5cb0bc8b14 100644 --- a/src/main/java/network/crypta/node/UIDTag.java +++ b/src/main/java/network/crypta/node/UIDTag.java @@ -4,32 +4,45 @@ import java.lang.ref.WeakReference; import java.util.HashSet; +import java.util.List; import java.util.Set; +import network.crypta.support.TimeUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Base class for tags that represent a single in‑flight request. + * Tracks routing and lifecycle state for a single in-flight request UID. * - *

A tag tracks routing state (peers we contacted or are contacting), handler state (whether the - * incoming side has completed), and bookkeeping required to decide when it is safe to release the - * unique identifier (UID) for reuse. Subclasses define request‑specific behavior and logging. + *

A {@code UIDTag} instance is created when a request is accepted or initiated and remains bound + * to the UID until the inbound handler completes and all outbound routing or offered-key fetch work + * has been resolved. It records which peers were contacted, which peers are still considered + * active, and whether ownership has been reassigned locally. That state drives unlock decisions in + * {@link RequestTracker} and determines when the UID can be safely reused. * - *

Concurrency: many methods are {@code synchronized} on {@code this}. Callers must respect the - * locking comments on helpers that assume the monitor is already held. The class itself is not - * immutable. + *

Concurrency: most mutating operations synchronize on the tag itself. Callers must respect the + * locking notes for helpers that assume the monitor is already held. Instances are mutable and are + * not thread-safe without the documented synchronization. * - *

Lifetime: a tag is created when a request is accepted or initiated and is unlocked when both - * the handler (incoming) and all outstanding outbound activities are complete, or when ownership is - * reassigned locally as part of timeout handling. + *

Notable behaviors include: * - * @author Matthew Toseland (0xE43DA450) + *

+ * + * @see PeerNode + * @see RequestTracker + * @see UIDTraceLogger + * @author Matthew Toseland {@literal } (0xE43DA450) */ public abstract class UIDTag { private static final Logger LOG = LoggerFactory.getLogger(UIDTag.class); private static final String DEBUG_EXCEPTION_MESSAGE = "debug"; private static final String PEER_LOG_PREFIX = "peer="; private static final String REMOVED_LOG_FRAGMENT = " removed="; + private static final long HARD_TIMEOUT_AFTER_CONTINUE = RequestTracker.TIMEOUT; + private static final PeerNode[] NO_PEERS = new PeerNode[0]; // No static initialization required. @@ -37,8 +50,32 @@ public abstract class UIDTag { final boolean wasLocal; private final WeakReference sourceRef; final boolean realTimeFlag; + + /** + * Tracker responsible for UID lifecycle and unlock bookkeeping. + * + *

The tracker reference is established at construction from the owning node. It is stable and + * non-null for the lifetime of the tag and is used by {@link #innerUnlock(boolean)} to release + * the UID and update routing statistics. + */ protected final RequestTracker tracker; + + /** + * Indicates whether the inbound handler has accepted the request. + * + *

This flag is set when the handler decides to process the request. Subclasses may use it to + * reason about progress and for logging. Access is synchronized on the tag when mutated or read + * concurrently. + */ protected boolean accepted; + + /** + * Signals that the original source has restarted or disconnected. + * + *

When true, routing decisions may treat the request as restarted, and continuation behavior + * changes to avoid sending additional messages to the original source. The value is updated while + * the request is in flight and can be consulted by subclasses. + */ protected boolean sourceRestarted; /** Nodes we routed to at any point during this tag's lifetime. */ @@ -61,11 +98,34 @@ public abstract class UIDTag { */ private HashSet handlingTimeouts = null; + /** + * Marks that the request should not be routed further downstream. + * + *

Handlers set this flag when they decide no additional peers should be contacted. It is + * mutable, read by subclasses and routing policies, and is guarded by the tag monitor when + * updates are concurrent. + */ protected boolean notRoutedOnwards; + final long uid; + /** + * Tracks whether the inbound handler has completed and unlocked. + * + *

Once true, the UID can be released when no outbound peers remain. This flag is updated by + * {@link #unlockHandler(boolean)} and should only be changed while holding the tag monitor. + */ protected boolean unlockedHandler; + + /** + * Controls whether unlock bookkeeping should be recorded. + * + *

When true, the tag should be released without emitting certain record updates. It is set by + * the inbound handler during unlocking to suppress record writing in specific call paths and is + * read by {@link #innerUnlock(boolean)}. + */ protected boolean noRecordUnlock; + private boolean hasUnlocked; private boolean waitingForSlot; @@ -83,11 +143,15 @@ public abstract class UIDTag { } /** - * Log that this tag still exists after the request timeout threshold. + * Logs that this tag remains after the request timeout threshold. * - *

Subclasses decide the logging level and include any identifiers they control. + *

Subclasses decide the logging level, wording, and identifiers because they know the request + * type. This callback is invoked by {@link #maybeLogStillPresent(long, Long)} once the timeout + * has elapsed and can be called multiple times over the lifetime of the tag; rate limiting is + * handled by the caller. Implementations should be lightweight and tolerate a {@code null} UID + * when the subclass cannot or should not expose the identifier. * - * @param uid Optional UID to include in the log; may be {@code null} when not applicable. + * @param uid optional UID to include; may be {@code null} in local contexts */ public abstract void logStillPresent(Long uid); @@ -96,14 +160,17 @@ long age() { } /** - * Mark that we route to, or fetch an offered key from, a peer. Call before sending the outbound - * message so per‑peer capacity accounting is accurate. + * Records that this tag is routing to a peer or fetching an offered key. + * + *

Call this before the outbound message is sent so per-peer capacity accounting and timeout + * handling can attribute work correctly. The peer is added to the historical {@code routedTo} set + * and also to the active routing set that matches the {@code offeredKey} flag. The method is + * synchronized to avoid concurrent mutations of the peer sets and returns whether the peer was + * newly recorded for this activity. * - * @param peer Peer we route to or fetch from. - * @param offeredKey {@code true} for an offered‑key fetch; {@code false} for a normal route. - * Offered‑key fetches use shorter timeouts. - * @return {@code true} if the peer was newly added to the corresponding set; {@code false} if it - * was already present. + * @param peer peer being routed to or fetched from; must be non-null + * @param offeredKey {@code true} for offered-key fetches, {@code false} for normal routing + * @return {@code true} if the peer was newly recorded; {@code false} if already present */ public synchronized boolean addRoutedTo(PeerNode peer, boolean offeredKey) { if (LOG.isDebugEnabled()) @@ -132,10 +199,15 @@ public synchronized boolean addRoutedTo(PeerNode peer, boolean offeredKey) { } /** - * Whether we have ever routed to the given peer for this tag. + * Reports whether this tag has ever routed to the given peer. + * + *

This query checks the historical routing set, not just the currently active peers. It is + * synchronized to provide a consistent view of routing history when other threads are updating + * the sets. A {@code false} return means the peer has never been recorded for this tag, and it + * does not imply anything about current reachability. * - * @param peer Peer to test. - * @return {@code true} if {@code peer} has been recorded in {@link #routedTo} at least once. + * @param peer peer to test against the historical routing set + * @return {@code true} when the peer is present in the history; {@code false} otherwise */ @SuppressWarnings("unused") public synchronized boolean hasRoutedTo(PeerNode peer) { @@ -144,10 +216,15 @@ public synchronized boolean hasRoutedTo(PeerNode peer) { } /** - * Whether we are currently routing to the given peer. + * Reports whether this tag is currently routing to the given peer. * - * @param peer Peer to test. - * @return {@code true} if {@code peer} is present in {@link #currentlyRoutingTo}. + *

The check consults the active routing set, which represents peers that still consider the + * UID active on their side. The method is synchronized to avoid concurrent modification while + * routing decisions or removals are in progress. It does not guarantee that an outbound sending + * is currently in flight or that it will succeed. + * + * @param peer peer to test against the active routing set + * @return {@code true} if the peer is currently active; {@code false} if not recorded */ public synchronized boolean currentlyRoutingTo(PeerNode peer) { if (currentlyRoutingTo == null) return false; @@ -160,10 +237,15 @@ public synchronized boolean currentlyRoutingTo(PeerNode peer) { // receiving an acknowledgement sent after the UID is cleared. /** - * Whether we are currently fetching an offered key from the given peer. + * Reports whether this tag is currently fetching an offered key from a peer. + * + *

This check reads the active offered-key set, which is distinct from normal routing peers and + * may use different timeout behavior. The method is synchronized to avoid concurrent modification + * while updates or removals are happening. A {@code true} result means the tag still expects an + * offered-key response, not that the transfer is guaranteed. * - * @param peer Peer to test. - * @return {@code true} if {@code peer} is present in {@link #fetchingOfferedKeyFrom}. + * @param peer peer to test against the offered-key fetch set + * @return {@code true} if the peer is currently in the offered-key set; otherwise {@code false} */ public synchronized boolean currentlyFetchingOfferedKeyFrom(PeerNode peer) { if (fetchingOfferedKeyFrom == null) return false; @@ -171,12 +253,15 @@ public synchronized boolean currentlyFetchingOfferedKeyFrom(PeerNode peer) { } /** - * Notify that we no longer fetch an offered key from a peer. Call only when the peer no longer - * believes we route to it; see {@link #removeRoutingTo(PeerNode)} for rationale. When we are not - * routing to any peers, not fetching offered keys, and the handler is unlocked, the UID is - * released. + * Marks that we no longer fetch an offered key from a peer. * - * @param next Peer we are no longer fetching an offered key from. + *

Call this only once the peer is reasonably certain we stopped the offered-key fetch, similar + * to {@link #removeRoutingTo(PeerNode)}. The peer is removed from the offered-key tracking set + * and from timeout handling bookkeeping. If this change makes the tag eligible for unlocking and + * the handler is already unlocked, the method triggers {@link #innerUnlock(boolean)} after + * releasing the monitor. + * + * @param next peer that is no longer providing an offered key */ public void removeFetchingOfferedKeyFrom(PeerNode next) { boolean removed; @@ -216,16 +301,15 @@ public void removeFetchingOfferedKeyFrom(PeerNode next) { } /** - * Notify that we no longer route to a peer. When we are not routing to any peers (or fetching - * offered keys) and the handler is unlocked, the tag is fully unlocked. This is most relevant to - * incoming requests; outgoing requests only consider outbound routing. + * Marks that we no longer route to a peer. * - *

Do not call until the peer is reasonably certain we stopped routing to it. We unlock the - * handler as early as possible, without waiting to acknowledge our completion notice. Late on - * sending and early on accepting avoids the peer thinking we finished when we did not, or us - * thinking the next peer finished when it did not. + *

When no peers remain in the routing or offered-key sets and the handler has unlocked, the + * tag becomes eligible for full unlocking. Call this only when the peer is reasonably certain we + * stopped routing to it so that early unlock does not race with downstream completion. The method + * also clears timeout bookkeeping for the peer and can trigger {@link #innerUnlock(boolean)} + * after the synchronized section completes. * - * @param next Peer we are no longer routing to. + * @param next peer that is no longer being routed to */ public void removeRoutingTo(PeerNode next) { if (LOG.isDebugEnabled()) { @@ -274,14 +358,27 @@ public void removeRoutingTo(PeerNode next) { innerUnlock(localNoRecordUnlock); } + /** + * Performs the actual UID unlocking through the tracker. + * + *

This method delegates to {@link RequestTracker#unlockUID(UIDTag, boolean, boolean)} with the + * correct flags for this tag. It should be called immediately after {@link #mustUnlock()} returns + * {@code true} so that state and accounting remain consistent. Callers should already have + * decided that no further outbound routing or offered-key fetches remain. + * + * @param noRecordUnlock whether to suppress unlock record bookkeeping for this tag + */ protected void innerUnlock(boolean noRecordUnlock) { tracker.unlockUID(this, false, noRecordUnlock); } /** - * Called after {@link #innerUnlock(boolean)} to notify peers that tracked this tag. + * Notifies peers that previously tracked this tag after it has been unlocked. * - *

The best‑effort; missing peers are ignored. + *

This method is the best effort: it snapshots the {@code routedTo} set and invokes {@link + * PeerNode#postUnlock(Object)} for each peer that is still reachable. Missing or already + * disconnected peers are ignored. Callers should invoke it after {@link #innerUnlock(boolean)} to + * allow peers to release any tag-specific bookkeeping on their side. */ @SuppressWarnings("unused") public void postUnlock() { @@ -294,30 +391,45 @@ public void postUnlock() { } /** - * Estimate expected inbound transfers attributed to this tag. + * Estimates expected inbound transfers attributed to this tag. * - * @param ignoreLocalVsRemote When {@code true}, treat the request as remote even if local. - * @param outwardTransfersPerInsert Expected number of outbound transfers per insert operation. - * @param forAccept When {@code true}, compute for admission control; when {@code false}, compute - * for sending decisions where we must be more conservative to avoid avoidable rejections and - * mandatory backoffs. + *

The estimate is used to decide whether a request should be accepted or how aggressively it + * should be routed. Subclasses should account for the request type, locality, and any special + * cases that change transfer counts. The returned value is used for control decisions rather than + * exact accounting, so it should be conservative when {@code forAccept} is {@code false}. + * + * @param ignoreLocalVsRemote when {@code true}, treat the request as remote + * @param outwardTransfersPerInsert expected outbound transfers per insert operation + * @param forAccept {@code true} for admission control; {@code false} for send decisions + * @return estimated number of inbound transfers this tag is expected to represent */ public abstract int expectedTransfersIn( boolean ignoreLocalVsRemote, int outwardTransfersPerInsert, boolean forAccept); /** - * Estimate expected outbound transfers attributed to this tag. + * Estimates expected outbound transfers attributed to this tag. + * + *

The estimate informs routing and admission control decisions for outbound traffic. + * Subclasses should incorporate request type and locality when deriving the count, and they + * should be conservative when the result is used for sending decisions. The method is abstract, + * so each request type can encode its specific transfer pattern. * - * @param ignoreLocalVsRemote When {@code true}, treat the request as remote even if local. - * @param outwardTransfersPerInsert Expected number of outbound transfers per insert operation. - * @param forAccept When {@code true}, compute for admission control; when {@code false}, compute - * for sending decisions where we must be more conservative to avoid avoidable rejections and - * mandatory backoffs. + * @param ignoreLocalVsRemote when {@code true}, treat the request as remote + * @param outwardTransfersPerInsert expected outbound transfers per insert operation + * @param forAccept {@code true} for admission control; {@code false} for send decisions + * @return estimated number of outbound transfers this tag is expected to represent */ public abstract int expectedTransfersOut( boolean ignoreLocalVsRemote, int outwardTransfersPerInsert, boolean forAccept); - /** Mark that this request will not be routed further downstream. */ + /** + * Marks that this request will not be routed further downstream. + * + *

This flag is set by handlers that decide no additional peers should be contacted. It does + * not itself unlock the tag, but it can influence subclass decisions and routing heuristics. The + * method does not alter existing routing sets, and it is synchronized to keep the state + * consistent with other routing updates. + */ public synchronized void setNotRoutedOnwards() { this.notRoutedOnwards = true; } @@ -325,10 +437,14 @@ public synchronized void setNotRoutedOnwards() { private boolean reassigned; /** - * Get the effective source node for load management. + * Returns the effective source peer for load management. + * + *

The original source is returned only when the request was not local and has not been + * reassigned to self. Because the reference is weak, the GC may also have cleared the peer, in + * which case this method returns {@code null}. The method is synchronized to provide a consistent + * view with reassignment updates. * - * @return The original source {@link PeerNode}, or {@code null} if the tag has been reassigned - * locally or originated locally. + * @return the original source peer, or {@code null} if local, reassigned, or collected */ public synchronized PeerNode getSource() { if (reassigned) return null; @@ -336,19 +452,44 @@ public synchronized PeerNode getSource() { return sourceRef.get(); } - /** Reassign the tag locally rather than attributing it to the original sender. */ + /** + * Reassigns the tag locally rather than attributing it to the original sender. + * + *

When invoked for a non-local tag, the request is treated as locally owned for later load + * management decisions. Calls are idempotent; invoking this method on a locally originated tag is + * a no-op. After reassignment, {@link #getSource()} returns {@code null}, and accounting treats + * the request as local. The method records the reassignment in {@link UIDTraceLogger}. + */ public synchronized void reassignToSelf() { if (wasLocal) return; reassigned = true; UIDTraceLogger.log("reassignToSelf", this); } - /** Whether the request originated locally. Not affected by {@link #reassignToSelf()}. */ + /** + * Reports whether the request originated locally. + * + *

This value reflects the original origin and is not affected by {@link #reassignToSelf()}. + * The method is inexpensive and does not synchronize because the flag is immutable after + * construction. Callers often use it to distinguish local requests from those that arrived from a + * peer for accounting and logging. + * + * @return {@code true} if the request was created locally; {@code false} otherwise + */ public boolean wasLocal() { return wasLocal; } - /** Whether the request is considered local now (originated locally or reassigned to self). */ + /** + * Reports whether the request is considered local now. + * + *

A tag is considered local if it originated locally or if it has been reassigned to self as + * part of timeout handling. The method synchronizes when checking the reassignment state to avoid + * races with updates. It is safe to call immediately after {@link #reassignToSelf()} to reflect + * the updated ownership. + * + * @return {@code true} if the request is treated as local; {@code false} otherwise + */ public boolean isLocal() { if (wasLocal) return true; synchronized (this) { @@ -356,18 +497,51 @@ public boolean isLocal() { } } - /** Returns {@code true} if this tag represents an SSK request. */ + /** + * Reports whether this tag represents an SSK request. + * + *

Subclasses implement this to reflect the request type. The result is used by routing and + * logging decisions that need to distinguish SSK traffic from other request classes. It also + * informs transfer estimation for requests that have different patterns. Callers treat the value + * as stable for the lifetime of the tag. + * + * @return {@code true} if the underlying request is an SSK; {@code false} otherwise + */ public abstract boolean isSSK(); - /** Returns {@code true} if this tag represents an insert request. */ + /** + * Reports whether this tag represents an insert request. + * + *

Subclasses should return {@code true} when the request inserts data rather than fetching it. + * The flag influences transfer estimation and some routing policies, so implementations should be + * consistent with the request's primary data flow. The return value is expected to remain + * consistent for the lifetime of the tag. + * + * @return {@code true} if the request inserts data; {@code false} otherwise + */ public abstract boolean isInsert(); - /** Returns {@code true} if this tag represents a reply to an offer. */ + /** + * Reports whether this tag represents a reply to an offer. + * + *

Offer replies often use different timeout handling and transfer expectations. Subclasses + * should return {@code true} for those requests so routing decisions can reflect that behavior. + * The result may also influence how peers are tracked for offered-key handling. Callers should + * not assume an offer reply implies any particular routing peer set. + * + * @return {@code true} if the request is an offer reply; {@code false} otherwise + */ public abstract boolean isOfferReply(); /** - * Caller must call innerUnlock(noRecordUnlock) immediately if this returns true. Hence, derived - * versions should call mustUnlock() only after they have checked their own unlocking blockers. + * Determines whether the tag can be unlocked right now. + * + *

This method checks handler state and outstanding routing or offered-key fetches. If it + * returns {@code true}, callers must invoke {@link #innerUnlock(boolean)} immediately while still + * honoring any subclass-specific unlocking blockers. It also latches the unlocking decision so + * that only one caller proceeds with the actual unlocking. + * + * @return {@code true} if the tag is ready to unlock; {@code false} otherwise */ protected synchronized boolean mustUnlock() { if (hasUnlocked || !unlockedHandler) { @@ -450,12 +624,15 @@ private boolean isAnyHandlingTimeout(Set peers) { } /** - * Unlock the handler. That is, the incoming request has finished. This method should be called - * before the acknowledgement that the request has finished is sent downstream. Therefore, we will - * never be waiting for an acknowledgement from downstream to release the slot it is using, during - * which time it might think we are rejecting wrongly. + * Marks the inbound handler as complete and attempts to unlock the tag. * - *

Once both the incoming and outgoing requests are unlocked, the whole tag is unlocked. + *

This method should be called before sending the completion acknowledgement downstream so the + * peer does not assume completion while we are still holding its slot. It records the {@code + * noRecord} preference, marks the handler as unlocked, and then checks whether outbound routing + * or offered-key fetches remain. If none remain, it triggers {@link #innerUnlock(boolean)}; + * otherwise it logs that the unlocking is deferred. + * + * @param noRecord whether to suppress unlock record bookkeeping for this tag */ public void unlockHandler(boolean noRecord) { boolean canUnlock; @@ -473,6 +650,13 @@ public void unlockHandler(boolean noRecord) { } } + /** + * Convenience overload that unlocks the handler with recording enabled. + * + *

This is equivalent to calling {@link #unlockHandler(boolean)} with {@code false}. It is + * provided for call sites that do not need to suppress unlock record bookkeeping. Repeated calls + * are safe because the underlying method checks and returns once the handler is already unlocked. + */ public void unlockHandler() { unlockHandler(false); } @@ -505,11 +689,14 @@ public synchronized String toString() { } /** - * Mark that we are handling a timeout for the given peer. If the handler later unlocks while the - * peer still appears in routing/fetching sets, we will reassign this tag locally (rather than log - * an error) and wait for the fatal timeout. + * Records that a timeout is being handled for a peer. + * + *

If the handler later unlocks while the peer still appears in the routing or offered-key + * tracking sets, this marker allows the tag to be reassigned locally instead of logging a + * spurious error. The timeout marker is cleared when the peer is removed from tracking sets. + * Adding the same peer more than once is harmless because the set deduplicates entries. * - * @param next Peer for which a timeout is being handled. + * @param next peer for which a timeout is being handled */ public synchronized void handlingTimeout(PeerNode next) { if (handlingTimeouts == null) handlingTimeouts = new HashSet<>(); @@ -521,10 +708,15 @@ public synchronized void handlingTimeout(PeerNode next) { private static final long LOGGED_STILL_PRESENT_INTERVAL = SECONDS.toMillis(60); /** - * Log that the tag is still present after the request timeout, at most once per interval. + * Logs that the tag is still present after the request timeout. * - * @param now Current time in milliseconds since the epoch. - * @param uid Optional UID to include in the subclass log; may be {@code null}. + *

This method rate-limits logging to at most once per interval and delegates to {@link + * #logStillPresent(Long)} for the actual message. It should be called with a monotonically + * increasing time source, typically {@link System#currentTimeMillis()}. After logging checks, it + * also evaluates whether a deferred hard timeout should be forced. + * + * @param now current time in milliseconds since the epoch + * @param uid optional UID to include in the subclass log, may be {@code null} */ public void maybeLogStillPresent(long now, Long uid) { if (now - createdTime > RequestTracker.TIMEOUT) { @@ -534,26 +726,190 @@ public void maybeLogStillPresent(long now, Long uid) { } logStillPresent(uid); } + maybeForceHardTimeout(now); } - /** Mark this request as accepted by the handler. */ + /** + * Marks this request as accepted by the handler. + * + *

This flag is used by subclasses to reason about progress and logging. The update is + * synchronized to avoid races with other handler state transitions. Repeated calls are safe and + * simply leave the flag set. The flag does not indicate anything about an outbound routing state. + */ public synchronized void setAccepted() { accepted = true; } private boolean timedOutButContinued; + private long timeoutContinueAt; + private boolean hardTimeoutPeersTriggered; + private boolean hardTimeoutWithoutPeersTriggered; /** - * Set when we are going to tell downstream that the request has timed out, but can't terminate it - * yet. We will terminate the request if we have to reroute it, and we count it towards the peer's - * limit, but we don't stop messages to the request source. + * Marks that the handler timed out but processing continues. + * + *

This state is used when we must inform downstream that the request has timed out but cannot + * yet terminate it. The request still counts toward the peer's limit, but messages to the source + * may continue. The method is idempotent and records the first time the timeout-continue state + * was set so that hard timeout logic can be scheduled. */ public synchronized void timedOutToHandlerButContinued() { - timedOutButContinued = true; + if (!timedOutButContinued) { + timedOutButContinued = true; + timeoutContinueAt = System.currentTimeMillis(); + } else if (timeoutContinueAt == 0L) { + timeoutContinueAt = System.currentTimeMillis(); + } UIDTraceLogger.log("timeoutContinue", this); } - /** Mark that the handler disconnected or restarted. */ + private void maybeForceHardTimeout(long now) { + HardTimeoutContext context = resolveHardTimeoutContext(now); + if (context == null) return; + if (context.handleWithoutPeers()) { + if (handleHardTimeoutWithoutPeers(context.continueAge())) { + markHardTimeoutWithoutPeersTriggered(); + } + return; + } + logAndForcePeerTimeout(context.continueAge(), context.routingPeers(), context.offeredPeers()); + } + + private HardTimeoutContext resolveHardTimeoutContext(long now) { + PeerNode[] routingPeersArray; + PeerNode[] offeredPeersArray; + long continueAge; + synchronized (this) { + if (!timedOutButContinued || !unlockedHandler) return null; + if (timeoutContinueAt == 0L) return null; + continueAge = now - timeoutContinueAt; + if (continueAge < HARD_TIMEOUT_AFTER_CONTINUE) return null; + routingPeersArray = routingPeersForHardTimeout(); + offeredPeersArray = offeredKeyPeersForHardTimeout(); + } + List routingPeers = List.of(routingPeersArray); + List offeredPeers = List.of(offeredPeersArray); + boolean handleWithoutPeers = routingPeers.isEmpty() && offeredPeers.isEmpty(); + if (!handleWithoutPeers) { + if (!markHardTimeoutPeersTriggered()) return null; + } else if (!shouldHandleHardTimeoutWithoutPeers()) { + return null; + } + return new HardTimeoutContext(continueAge, routingPeers, offeredPeers, handleWithoutPeers); + } + + private boolean markHardTimeoutPeersTriggered() { + synchronized (this) { + if (hardTimeoutPeersTriggered) return false; + hardTimeoutPeersTriggered = true; + return true; + } + } + + private boolean shouldHandleHardTimeoutWithoutPeers() { + synchronized (this) { + return !hardTimeoutWithoutPeersTriggered; + } + } + + private void markHardTimeoutWithoutPeersTriggered() { + synchronized (this) { + hardTimeoutWithoutPeersTriggered = true; + } + } + + private void logAndForcePeerTimeout( + long continueAge, List routingPeers, List offeredPeers) { + String routingSummary = formatPeers(routingPeers); + String offeredSummary = formatPeers(offeredPeers); + String elapsed = TimeUtil.formatTime(continueAge); + UIDTraceLogger.log( + "timeoutHard", + this, + () -> "elapsed=" + elapsed + " routing=" + routingSummary + " offered=" + offeredSummary); + LOG.warn( + "Hard timeout after {} for {}. Forcing fatal timeout: routing={} offered={}", + elapsed, + this, + routingSummary, + offeredSummary); + for (PeerNode peer : routingPeers) { + peer.fatalTimeout(this, false); + } + for (PeerNode peer : offeredPeers) { + peer.fatalTimeout(this, true); + } + } + + /** + * Returns a snapshot of peers still considered active for routing timeouts. + * + *

The returned array is built while holding the tag monitor, so it is safe to iterate without + * concurrent modification. Callers should treat it as a snapshot for timeout forcing and logging; + * later routing updates may change the active set. An empty array indicates there are no active + * routing peers at the time of the snapshot. + * + * @return array of active routing peers, or an empty array if none remain + */ + protected synchronized PeerNode[] routingPeersForHardTimeout() { + if (currentlyRoutingTo == null || currentlyRoutingTo.isEmpty()) return NO_PEERS; + return currentlyRoutingTo.toArray(new PeerNode[0]); + } + + /** + * Returns a snapshot of peers still considered active for offered-key timeouts. + * + *

The returned array is produced while holding the tag monitor, providing a consistent view of + * the offered-key tracking set. It should be used for timeout enforcement and logging only, not + * as a live view. An empty array indicates there are no active offered-key peers at the time of + * the snapshot. + * + * @return array of active offered-key peers, or an empty array if none remain + */ + protected synchronized PeerNode[] offeredKeyPeersForHardTimeout() { + if (fetchingOfferedKeyFrom == null || fetchingOfferedKeyFrom.isEmpty()) return NO_PEERS; + return fetchingOfferedKeyFrom.toArray(new PeerNode[0]); + } + + /** + * Allows subclasses to handle hard-timeout conditions when no peers remain. + * + *

This hook is invoked after a timeout-continue period elapses, and there are no routing or + * offered-key peers left to force. Subclasses can perform request-specific cleanup or accounting + * and return {@code true} to mark the timeout as handled. Returning {@code false} leaves handling + * to the default path. + * + * @param continueAge time since timeout-continue was first recorded, in milliseconds + * @return {@code true} if the timeout was handled and should be marked as triggered + */ + protected boolean handleHardTimeoutWithoutPeers(long continueAge) { + return false; + } + + private record HardTimeoutContext( + long continueAge, + List routingPeers, + List offeredPeers, + boolean handleWithoutPeers) {} + + private static String formatPeers(List peers) { + if (peers.isEmpty()) return "none"; + StringBuilder sb = new StringBuilder(); + for (PeerNode peer : peers) { + sb.append(peer.shortToString()).append(','); + } + sb.setLength(sb.length() - 1); + return sb.toString(); + } + + /** + * Marks that the original source handler disconnected or restarted. + * + *

This update affects load accounting and stop/continue decisions. Once set, the tag treats + * the request as having a restarted source, which influences {@link #countAsSourceRestarted()} + * and {@link #shouldStop()} decisions. The call is idempotent and simply leaves the flag set. + * Callers typically invoke it when a disconnect or restart is detected. + */ public synchronized void onRestartOrDisconnectSource() { sourceRestarted = true; } @@ -563,33 +919,57 @@ public synchronized void onRestartOrDisconnectSource() { // are appropriate. /** - * Should we deduct this request from the source's limit instead of counting it towards it? A - * normal request is counted towards it. A hidden request is deducted from it. This is used when - * the source has restarted but also in some other cases. + * Reports whether this request should be deducted from the source's limit. + * + *

Normal requests count toward the source's limit, but some situations such as source restarts + * or timeout continuation are accounted for differently. This method encapsulates that policy for + * callers that manage admission and throttling. It has no side effects and depends only on the + * current restart and timeout state. + * + * @return {@code true} if the request should be deducted; {@code false} if it should count toward + * the source's limit */ public synchronized boolean countAsSourceRestarted() { return sourceRestarted || timedOutButContinued; } - /** Whether we should continue sending messages to the source. */ + /** + * Reports whether the original source is considered restarted. + * + *

This is a direct accessor for the restart flag and is synchronized to provide a consistent + * view when other threads update the flag due to disconnects. Unlike {@link + * #countAsSourceRestarted()}, it does not consider timeout-continuation state. Callers use it to + * decide whether messages to the source should continue. + * + * @return {@code true} if the source is considered restarted; {@code false} otherwise + */ public synchronized boolean hasSourceReallyRestarted() { return sourceRestarted; } /** - * Whether we should stop the request as soon as convenient. Normally {@code true} when the source - * restarted or disconnected. + * Reports whether the request should stop as soon as convenient. + * + *

Stopping is typically recommended when the source has restarted or when the request has + * timed out but continues. The method is synchronized to reflect the latest updates to restart + * and timeout state. It is advisory only and does not cancel work on its own. + * + * @return {@code true} if the request should stop; {@code false} if it may continue */ public synchronized boolean shouldStop() { return sourceRestarted || timedOutButContinued; } /** - * Whether the given peer is the original source of this tag. + * Reports whether the given peer is the original source of this tag. * - * @param pn Peer to compare. - * @return {@code true} if {@code pn} is the original source and the tag was not reassigned and is - * not local. + *

The comparison returns {@code false} if the tag has been reassigned or originated locally. + * It uses the stored weak reference to avoid retaining the source peer longer than necessary and + * synchronizes to keep the reassignment state consistent. If the weak reference has been cleared, + * the method returns {@code false} even when the peer was once the source. + * + * @param pn peer to compare against the original source; must be non-null + * @return {@code true} if the peer is the original source, and the tag is not local or reassigned */ public synchronized boolean isSource(PeerNode pn) { if (reassigned) return false; @@ -598,7 +978,13 @@ public synchronized boolean isSource(PeerNode pn) { return sourceRef == pn.myRef; } - /** Indicate that the tag is waiting for an outbound slot. */ + /** + * Marks that the tag is waiting for an outbound slot. + * + *

This state is used by handlers to avoid double-counting pending outbound work. The method is + * synchronized and idempotent, and it does not itself allocate or release any slots. Callers + * should ensure {@link #clearWaitingForSlot()} is invoked when the wait ends or on teardown. + */ public synchronized void setWaitingForSlot() { // Consider using a counter on Node. // We must ensure it ALWAYS gets unset when some weird @@ -607,7 +993,13 @@ public synchronized void setWaitingForSlot() { waitingForSlot = true; } - /** Clear the waiting‑for‑slot state. */ + /** + * Clears the waiting-for-slot state. + * + *

This method is synchronized and idempotent. It is typically called when outbound work has + * got a slot or when the request is being torn down. If the tag was not marked as waiting, the + * call has no effect. Clearing the flag does not release any external resources. + */ public synchronized void clearWaitingForSlot() { // Consider using a counter on Node. // We must ensure it ALWAYS gets unset when some weird @@ -617,7 +1009,15 @@ public synchronized void clearWaitingForSlot() { waitingForSlot = false; } - /** Returns {@code true} if the tag is currently waiting for an outbound slot. */ + /** + * Reports whether the tag is currently waiting for an outbound slot. + * + *

The return value reflects the most recent calls to {@link #setWaitingForSlot()} and {@link + * #clearWaitingForSlot()}. The method is synchronized to avoid races with updates and does not + * imply anything about the state of any external queues. It is purely a local bookkeeping signal. + * + * @return {@code true} if the tag is waiting for a slot; {@code false} otherwise + */ public synchronized boolean isWaitingForSlot() { return waitingForSlot; } diff --git a/src/test/java/network/crypta/node/RequestTagTest.java b/src/test/java/network/crypta/node/RequestTagTest.java index 60438bda3d..39da3dc50e 100644 --- a/src/test/java/network/crypta/node/RequestTagTest.java +++ b/src/test/java/network/crypta/node/RequestTagTest.java @@ -65,7 +65,7 @@ void unlockHandler_whenCoalescedSender_allowsUnlock() { RequestTag tag = newLocalTag(node); RequestSender rs = org.mockito.Mockito.mock(RequestSender.class); - // Coalesced: we won't wait for sender completion (sent flag remains false) + // Coalesced: we won't wait for sender completion (the "sent" flag remains false) tag.setSender(rs, /* coalesced= */ true); // Expect innerUnlock to delegate to tracker.unlockUID exactly once @@ -89,7 +89,7 @@ void setRequestSenderFinished_afterUnlockHandlerPreviouslyBlockedByPendingSender // Non-coalesced: mark as sent so mustUnlock() blocks until sender finishes tag.setSender(rs, /* coalesced= */ false); - tag.unlockHandler(); // should be deferred due to pending sender + tag.unlockHandler(); // should be deferred due to a pending sender verify(tracker, never()) .unlockUID( @@ -165,7 +165,7 @@ void waitingForOpennet_thenFinished_allowsUnlock() throws Exception { // Block unlock via waitingForOpennet != null && get() != null setPrivateWaitingForOpennet(tag, new WeakReference<>(pn)); - // Should defer unlock because waitingForOpennet is non-null + // Should defer unlocking because waitingForOpennet is non-null tag.unlockHandler(); verify(tracker, never()) .unlockUID( @@ -180,17 +180,84 @@ void waitingForOpennet_thenFinished_allowsUnlock() throws Exception { org.mockito.Mockito.anyBoolean(), org.mockito.Mockito.anyBoolean()); - // Now clear the wait with the same peer and expect unlock + // Now clear the wait with the same peer and expect unlocking tag.finishedWaitingForOpennet(pn); verify(tracker, times(1)).unlockUID(tag, false, false); } + @Test + void hardTimeout_whenWaitingForOpennet_triggersFatalTimeout() throws Exception { + RequestTag tag = newLocalTag(node); + PeerNode pn = org.mockito.Mockito.mock(PeerNode.class); + + setPrivateWaitingForOpennet(tag, new WeakReference<>(pn)); + + tag.timedOutToHandlerButContinued(); + tag.unlockHandler(); + + long now = System.currentTimeMillis() + RequestTracker.TIMEOUT + 1_000; + tag.maybeLogStillPresent(now, UID); + + verify(pn, times(1)).fatalTimeout(tag, false); + } + + @Test + void hardTimeout_whenSenderLostAndNoRouting_forcesSenderFinishAndUnlocks() throws Exception { + RequestTag tag = newLocalTag(node); + RequestSender rs = org.mockito.Mockito.mock(RequestSender.class); + + tag.setSender(rs, /* coalesced= */ false); + setPrivateSender(tag, new WeakReference<>(null)); + + tag.timedOutToHandlerButContinued(); + tag.unlockHandler(); + + verify(tracker, never()) + .unlockUID( + org.mockito.Mockito.any(), + org.mockito.Mockito.anyBoolean(), + org.mockito.Mockito.anyBoolean()); + + long now = System.currentTimeMillis() + RequestTracker.TIMEOUT + 1_000; + tag.maybeLogStillPresent(now, UID); + + verify(tracker, times(1)).unlockUID(tag, false, false); + assertEquals(RequestSender.TIMED_OUT, getPrivateRequestSenderFinishedCode(tag)); + } + + @Test + void hardTimeout_afterPeerTimeout_stillAllowsSenderFallback() throws Exception { + RequestTag tag = newLocalTag(node); + RequestSender rs = org.mockito.Mockito.mock(RequestSender.class); + PeerNode peer = org.mockito.Mockito.mock(PeerNode.class); + + tag.setSender(rs, /* coalesced= */ false); + setPrivateSender(tag, new WeakReference<>(null)); + tag.addRoutedTo(peer, /* offeredKey= */ false); + + tag.timedOutToHandlerButContinued(); + tag.unlockHandler(); + + long now = System.currentTimeMillis() + RequestTracker.TIMEOUT + 1_000; + tag.maybeLogStillPresent(now, UID); + + verify(peer, times(1)).fatalTimeout(tag, false); + + tag.removeRoutingTo(peer); + + long later = now + 61_000; + tag.maybeLogStillPresent(later, UID); + + verify(tracker, times(1)).unlockUID(tag, false, false); + assertEquals(RequestSender.TIMED_OUT, getPrivateRequestSenderFinishedCode(tag)); + } + @Test void handlerTransferBegins_thenUnlock_removesFromTracker() { RequestTag tag = newLocalTag(node); - // Register transferring state and then fully unlock + // Register the transferring state and then fully unlock tag.handlerTransferBegins(); verify(tracker, times(1)).addTransferringRequestHandler(UID); @@ -203,7 +270,7 @@ void handlerTransferBegins_thenUnlock_removesFromTracker() { tag.unlockHandler(); - // innerUnlock should clear transferring handler state + // innerUnlock should clear the transferring handler state verify(tracker, times(1)).removeTransferringRequestHandler(UID); verify(tracker, times(1)).unlockUID(tag, false, false); } @@ -228,7 +295,7 @@ void senderTransferBegins_andEnds_registersAndUnregisters() { tag.senderTransferBegins(key, rs); verify(tracker, times(1)).addTransferringSender(key, rs); - // Should assert and then remove the sender from tracker + // Should assert and then remove the sender from the tracker tag.senderTransferEnds(key, rs); verify(tracker, times(1)).removeTransferringSender(key, rs); } @@ -294,4 +361,17 @@ private static void setPrivateWaitingForOpennet(RequestTag tag, WeakReference ref) + throws Exception { + Field f = RequestTag.class.getDeclaredField("sender"); + f.setAccessible(true); + f.set(tag, ref); + } + + private static int getPrivateRequestSenderFinishedCode(RequestTag tag) throws Exception { + Field f = RequestTag.class.getDeclaredField("requestSenderFinishedCode"); + f.setAccessible(true); + return (int) f.get(tag); + } } diff --git a/src/test/java/network/crypta/node/UIDTagHardTimeoutTest.java b/src/test/java/network/crypta/node/UIDTagHardTimeoutTest.java new file mode 100644 index 0000000000..c697d37fa4 --- /dev/null +++ b/src/test/java/network/crypta/node/UIDTagHardTimeoutTest.java @@ -0,0 +1,101 @@ +package network.crypta.node; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@SuppressWarnings("java:S100") +@ExtendWith(MockitoExtension.class) +class UIDTagHardTimeoutTest { + + private static final long UID = 4242L; + + @Mock(answer = org.mockito.Answers.RETURNS_DEEP_STUBS) + private Node node; + + @Mock private RequestTracker tracker; + + @BeforeEach + void setup() { + when(node.routing().tracker()).thenReturn(tracker); + } + + @Test + void hardTimeout_whenTimedOutContinueAndHandlerUnlocked_triggersFatalTimeoutOnce() { + TestUIDTag tag = new TestUIDTag(node, UID); + PeerNode peer = mock(PeerNode.class); + + tag.addRoutedTo(peer, /* offeredKey= */ false); + tag.timedOutToHandlerButContinued(); + tag.unlockHandler(); + + long now = System.currentTimeMillis() + RequestTracker.TIMEOUT + 1_000; + tag.maybeLogStillPresent(now, UID); + + verify(peer, times(1)).fatalTimeout(tag, false); + + tag.maybeLogStillPresent(now + RequestTracker.TIMEOUT, UID); + verify(peer, times(1)).fatalTimeout(tag, false); + } + + @Test + void hardTimeout_beforeGraceWindow_doesNotTriggerFatalTimeout() { + TestUIDTag tag = new TestUIDTag(node, UID); + PeerNode peer = mock(PeerNode.class); + + tag.addRoutedTo(peer, /* offeredKey= */ false); + tag.timedOutToHandlerButContinued(); + tag.unlockHandler(); + + long now = System.currentTimeMillis() + RequestTracker.TIMEOUT - 5_000; + tag.maybeLogStillPresent(now, UID); + + verify(peer, never()).fatalTimeout(tag, false); + } + + private static final class TestUIDTag extends UIDTag { + private TestUIDTag(Node node, long uid) { + super(/* source= */ null, /* realTimeFlag= */ false, uid, node); + } + + @Override + public void logStillPresent(Long uid) { + // Intentionally empty for test isolation. + } + + @Override + public int expectedTransfersIn( + boolean ignoreLocalVsRemote, int outwardTransfersPerInsert, boolean forAccept) { + return 0; + } + + @Override + public int expectedTransfersOut( + boolean ignoreLocalVsRemote, int outwardTransfersPerInsert, boolean forAccept) { + return 0; + } + + @Override + public boolean isSSK() { + return false; + } + + @Override + public boolean isInsert() { + return false; + } + + @Override + public boolean isOfferReply() { + return false; + } + } +}