diff --git a/quickfixj-core/src/main/java/quickfix/Session.java b/quickfixj-core/src/main/java/quickfix/Session.java index f9151daf8..9037913c5 100644 --- a/quickfixj-core/src/main/java/quickfix/Session.java +++ b/quickfixj-core/src/main/java/quickfix/Session.java @@ -1834,6 +1834,7 @@ private boolean verify(Message msg, boolean checkTooHigh, boolean checkTooLow) return false; } else if (checkTooLow && isTargetTooLow(msgSeqNum)) { doTargetTooLow(msg); + stateListener.onPossDupMessageDiscarded(sessionID, msg); return false; } } @@ -1841,6 +1842,7 @@ private boolean verify(Message msg, boolean checkTooHigh, boolean checkTooLow) // Handle poss dup where msgSeq is as expected // FIX 4.4 Vol 2, test case 2f&g if (isPossibleDuplicate(msg) && !validatePossDup(msg)) { + stateListener.onPossDupMessageDiscarded(sessionID, msg); return false; } @@ -1873,7 +1875,7 @@ private boolean verify(Message msg, boolean checkTooHigh, boolean checkTooLow) return true; } - private boolean doTargetTooLow(Message msg) throws FieldNotFound, IOException { + private void doTargetTooLow(Message msg) throws FieldNotFound, IOException { if (!isPossibleDuplicate(msg)) { final int msgSeqNum = msg.getHeader().getInt(MsgSeqNum.FIELD); final String text = "MsgSeqNum too low, expecting " + getExpectedTargetNum() @@ -1881,7 +1883,7 @@ private boolean doTargetTooLow(Message msg) throws FieldNotFound, IOException { generateLogout(text); throw new SessionException(text); } - return validatePossDup(msg); + validatePossDup(msg); } private void doBadCompID(Message msg) throws IOException, FieldNotFound { diff --git a/quickfixj-core/src/main/java/quickfix/SessionStateListener.java b/quickfixj-core/src/main/java/quickfix/SessionStateListener.java index 4c1f0ea59..e5aa0c271 100644 --- a/quickfixj-core/src/main/java/quickfix/SessionStateListener.java +++ b/quickfixj-core/src/main/java/quickfix/SessionStateListener.java @@ -105,4 +105,18 @@ default void onSequenceResetReceived(SessionID sessionID, int newSeqNo, boolean */ default void onResendRequestSatisfied(SessionID sessionID, int beginSeqNo, int endSeqNo) { } + + /** + * Called when a received PossDupFlag=Y message is discarded before + * application processing because it failed sequence number or + * OrigSendingTime validation. + *

+ * The message is the full inbound message that was discarded. Listener + * implementations must treat it as read-only and must not mutate it. + * + * @param sessionID affected SessionID + * @param message discarded message + */ + default void onPossDupMessageDiscarded(SessionID sessionID, Message message) { + } } diff --git a/quickfixj-core/src/test/java/quickfix/SessionTest.java b/quickfixj-core/src/test/java/quickfix/SessionTest.java index dc70f958c..88368fa45 100644 --- a/quickfixj-core/src/test/java/quickfix/SessionTest.java +++ b/quickfixj-core/src/test/java/quickfix/SessionTest.java @@ -67,6 +67,7 @@ import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; import org.mockito.Mockito; import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.mock; @@ -294,6 +295,105 @@ public void testPossDupMessageWithoutOrigSendingTime() throws Exception { session.close(); } + @Test + public void testTooLowPossDupMessageDiscardNotifiesStateListener() throws Exception { + final UnitTestApplication application = new UnitTestApplication(); + try (Session session = setUpSession(application, false, + new UnitTestResponder())) { + logonTo(session); + session.next(createAppMessage(2)); + + assertEquals(3, session.getExpectedTargetNum()); + assertEquals(1, application.fromAppMessages.size()); + + session.addStateListener(new SessionStateListener() { + @Override + public void onMissedHeartBeat(SessionID sessionID) { + } + }); + final SessionStateListener mockStateListener = mock(SessionStateListener.class); + session.addStateListener(mockStateListener); + + final Message possDupMessage = createPossDupAppMessage(2); + session.next(possDupMessage); + + assertEquals(3, session.getExpectedTargetNum()); + assertEquals(1, application.fromAppMessages.size()); + + final ArgumentCaptor messageCaptor = ArgumentCaptor.forClass(Message.class); + verify(mockStateListener).onPossDupMessageDiscarded(eq(session.getSessionID()), + messageCaptor.capture()); + assertTrue(possDupMessage == messageCaptor.getValue()); + verifyNoMoreInteractions(mockStateListener); + } + } + + @Test + public void testExpectedSequencePossDupMessageDiscardNotifiesStateListener() + throws Exception { + final UnitTestApplication application = new UnitTestApplication(); + try (Session session = setUpSession(application, false, + new UnitTestResponder())) { + logonTo(session); + + final SessionStateListener mockStateListener = mock(SessionStateListener.class); + session.addStateListener(mockStateListener); + + final Message possDupMessage = createAppMessage(2); + possDupMessage.getHeader().setBoolean(PossDupFlag.FIELD, true); + session.next(possDupMessage); + + assertEquals(3, session.getExpectedTargetNum()); + assertNull(application.lastFromAppMessage()); + assertEquals(Reject.MSGTYPE, application.lastToAdminMessage() + .getHeader().getString(MsgType.FIELD)); + + final ArgumentCaptor messageCaptor = ArgumentCaptor.forClass(Message.class); + verify(mockStateListener).onPossDupMessageDiscarded(eq(session.getSessionID()), + messageCaptor.capture()); + assertTrue(possDupMessage == messageCaptor.getValue()); + verifyNoMoreInteractions(mockStateListener); + } + } + + @Test + public void testTooLowNonPossDupMessageDoesNotNotifyStateListener() throws Exception { + final UnitTestApplication application = new UnitTestApplication(); + try (Session session = setUpSession(application, false, + new UnitTestResponder())) { + logonTo(session); + session.next(createAppMessage(2)); + + final SessionStateListener mockStateListener = mock(SessionStateListener.class); + session.addStateListener(mockStateListener); + + processMessage(session, createAppMessage(1)); + + verify(mockStateListener, times(0)).onPossDupMessageDiscarded( + any(SessionID.class), any(Message.class)); + } + } + + @Test + public void testInSequenceMessageDoesNotNotifyPossDupDiscarded() throws Exception { + final UnitTestApplication application = new UnitTestApplication(); + try (Session session = setUpSession(application, false, + new UnitTestResponder())) { + logonTo(session); + + final SessionStateListener mockStateListener = mock(SessionStateListener.class); + session.addStateListener(mockStateListener); + + session.next(createAppMessage(2)); + + assertEquals(3, session.getExpectedTargetNum()); + assertEquals(1, application.fromAppMessages.size()); + verify(mockStateListener, times(0)).onPossDupMessageDiscarded( + any(SessionID.class), any(Message.class)); + verifyNoMoreInteractions(mockStateListener); + } + } + @Test public void testInferResetSeqNumAcceptedWithNonInitialSequenceNumber() throws Exception {