diff --git a/quickfixj-core/src/main/java/quickfix/Session.java b/quickfixj-core/src/main/java/quickfix/Session.java index 77a918e85e..5ae4c65162 100644 --- a/quickfixj-core/src/main/java/quickfix/Session.java +++ b/quickfixj-core/src/main/java/quickfix/Session.java @@ -470,6 +470,8 @@ public class Session implements Closeable { protected static final Logger LOG = LoggerFactory.getLogger(Session.class); + private final int MAX_RESEND_BATCH_RETRIEVAL_SIZE = 1000; + Session(Application application, MessageStoreFactory messageStoreFactory, SessionID sessionID, DataDictionaryProvider dataDictionaryProvider, SessionSchedule sessionSchedule, LogFactory logFactory, MessageFactory messageFactory, int heartbeatInterval) { @@ -2321,71 +2323,85 @@ private void nextLogon(Message logon) throws FieldNotFound, RejectLogon, Incorre private void resendMessages(Message receivedMessage, int beginSeqNo, int endSeqNo) throws IOException, InvalidMessage, FieldNotFound { - final ArrayList messages = new ArrayList<>(); - try { - state.get(beginSeqNo, endSeqNo, messages); - } catch (final IOException e) { - if (forceResendWhenCorruptedStore) { - LOG.error("Cannot read messages from stores, resend HeartBeats", e); - for (int i = beginSeqNo; i < endSeqNo; i++) { - final Message heartbeat = messageFactory.create(sessionID.getBeginString(), - MsgType.HEARTBEAT); - initializeHeader(heartbeat.getHeader()); - heartbeat.getHeader().setInt(MsgSeqNum.FIELD, i); - messages.add(heartbeat.toString()); - } - } else { - throw e; - } - } - int msgSeqNum = 0; int begin = 0; int current = beginSeqNo; boolean appMessageJustSent = false; - for (final String message : messages) { - appMessageJustSent = false; - final Message msg; + int curBatchStartSeqNo = beginSeqNo; + while (curBatchStartSeqNo <= endSeqNo) { + int endCurBatchSeqNo = endSeqNo; + if (curBatchStartSeqNo + MAX_RESEND_BATCH_RETRIEVAL_SIZE < endSeqNo) { + endCurBatchSeqNo = curBatchStartSeqNo + MAX_RESEND_BATCH_RETRIEVAL_SIZE; + } + final ArrayList messages = new ArrayList<>(); try { - // QFJ-626 - msg = parseMessage(message); - msgSeqNum = msg.getHeader().getInt(MsgSeqNum.FIELD); - } catch (final Exception e) { - getLog().onErrorEvent( - "Error handling ResendRequest: failed to parse message (" + e.getMessage() - + "): " + message); - // Note: a SequenceReset message will be generated to fill the gap - continue; + state.get(curBatchStartSeqNo, endCurBatchSeqNo, messages); + } catch (final IOException e) { + if (forceResendWhenCorruptedStore) { + LOG.error("Cannot read messages from stores, resend HeartBeats", e); + for (int i = beginSeqNo; i < endSeqNo; i++) { + final Message heartbeat = messageFactory.create(sessionID.getBeginString(), + MsgType.HEARTBEAT); + initializeHeader(heartbeat.getHeader()); + heartbeat.getHeader().setInt(MsgSeqNum.FIELD, i); + messages.add(heartbeat.toString()); + } + } else { + throw e; + } } + for (final String message : messages) { + appMessageJustSent = false; + final Message msg; + try { + // QFJ-626 + msg = parseMessage(message); + if (msg.getException() != null) { + getLog().onErrorEvent( + "Error handling ResendRequest: failed to parse message (" + msg.getException().getMessage() + + "): " + message); + // Note: a SequenceReset message will be generated to fill the gap + continue; + } + msgSeqNum = msg.getHeader().getInt(MsgSeqNum.FIELD); + } catch (final Exception e) { + getLog().onErrorEvent( + "Error handling ResendRequest: failed to parse message (" + e.getMessage() + + "): " + message); + // Note: a SequenceReset message will be generated to fill the gap + continue; + } - if ((current != msgSeqNum) && begin == 0) { - begin = current; - } + if ((current != msgSeqNum) && begin == 0) { + begin = current; + } - final String msgType = msg.getHeader().getString(MsgType.FIELD); + final String msgType = msg.getHeader().getString(MsgType.FIELD); - if (MessageUtils.isAdminMessage(msgType) && !forceResendWhenCorruptedStore) { - if (begin == 0) { - begin = msgSeqNum; - } - } else { - initializeResendFields(msg); - if (resendApproved(msg)) { - if (begin != 0) { - generateSequenceReset(receivedMessage, begin, msgSeqNum); - } - getLog().onEvent("Resending message: " + msgSeqNum); - send(msg.toString()); - begin = 0; - appMessageJustSent = true; - } else { + if (MessageUtils.isAdminMessage(msgType) && !forceResendWhenCorruptedStore) { if (begin == 0) { begin = msgSeqNum; } + } else { + initializeResendFields(msg); + if (resendApproved(msg)) { + if (begin != 0) { + generateSequenceReset(receivedMessage, begin, msgSeqNum); + } + getLog().onEvent("Resending message: " + msgSeqNum); + send(msg.toString()); + begin = 0; + appMessageJustSent = true; + } else { + if (begin == 0) { + begin = msgSeqNum; + } + } } + current = msgSeqNum + 1; } - current = msgSeqNum + 1; + curBatchStartSeqNo = endCurBatchSeqNo+1; } int newBegin = beginSeqNo; diff --git a/quickfixj-core/src/test/java/quickfix/SessionResendMessagesTest b/quickfixj-core/src/test/java/quickfix/SessionResendMessagesTest new file mode 100644 index 0000000000..88930de313 --- /dev/null +++ b/quickfixj-core/src/test/java/quickfix/SessionResendMessagesTest @@ -0,0 +1,98 @@ +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; +import quickfix.*; + +import java.io.IOException; +import java.util.ArrayList; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.Mockito.*; + +class SessionResendMessagesTest { + + private Session session; + private Application mockApplication; + private MessageStore mockStore; + private LogFactory mockLogFactory; + private Log mockLog; + private static final int HEARTBEAT_INTERVAL = 30; + + @BeforeEach + void setUp() throws Exception { + mockApplication = mock(Application.class); + mockStore = mock(MessageStore.class); + MessageStoreFactory mockStoreFactory = mock(MessageStoreFactory.class); + when(mockStoreFactory.create(any(SessionID.class))).thenReturn(mockStore); + DataDictionaryProvider mockDataDictProvider = mock(DataDictionaryProvider.class); + mockLogFactory = mock(LogFactory.class); + mockLog = mock(Log.class); + when(mockLogFactory.create(any(SessionID.class))).thenReturn(mockLog); + + session = new Session( + mockApplication, + mockStoreFactory, + new SessionID("FIX.4.4", "SENDER", "TARGET"), + mockDataDictProvider, + mock(SessionSchedule.class), + mockLogFactory, + mock(MessageFactory.class), + HEARTBEAT_INTERVAL + ); + } + + @Test + void testResendMessagesBatchedRetrieval() throws Exception { + // Prepare 1200 messages (expect two batches: 1000, then 200) + int beginSeqNo = 1; + int endSeqNo = 1200; + ArrayList batch1 = new ArrayList<>(); + ArrayList batch2 = new ArrayList<>(); + for (int i = beginSeqNo; i <= endSeqNo; i++) { + String msgStr = "8=FIX.4.4\u00019=12\u000135=0\u000134=" + i + "\u0001" + + "49=SENDER\u000156=TARGET\u000110=000\u0001"; + if (i <= 1000) batch1.add(msgStr); + else batch2.add(msgStr); + } + + doAnswer(invocation -> { + int start = invocation.getArgument(0); + int end = invocation.getArgument(1); + ArrayList msgs = invocation.getArgument(2); + if (start == 1 && end == 1001) msgs.addAll(batch1); + else if (start == 1001 && end == 1200) msgs.addAll(batch2); + else fail("Unexpected batch: " + start + " to " + end); + return null; + }).when(mockStore).get(anyInt(), anyInt(), anyList()); + + Message receivedMessage = new Message(); + session.resendMessages(receivedMessage, beginSeqNo, endSeqNo); + + // Verify batches requested + verify(mockStore).get(1, 1001, new ArrayList<>()); + verify(mockStore).get(1001, 1200, new ArrayList<>()); + verifyNoMoreInteractions(mockStore); + } + + @Test + void testResendMessagesSkipsCorruptedMessage() throws Exception { + int beginSeqNo = 1, endSeqNo = 2; + ArrayList messages = new ArrayList<>(); + // First message is valid, second is corrupted + messages.add("8=FIX.4.4\u00019=12\u000135=0\u000134=1\u000149=SENDER\u000156=TARGET\u000110=000\u0001"); + messages.add("corrupted message"); + + doAnswer(invocation -> { + ArrayList msgs = invocation.getArgument(2); + msgs.addAll(messages); + return null; + }).when(mockStore).get(anyInt(), anyInt(), anyList()); + + Message receivedMessage = new Message(); + session.resendMessages(receivedMessage, beginSeqNo, endSeqNo); + + // Should log an error for the corrupted message and process the first one + verify(mockLog, atLeastOnce()).onErrorEvent(contains("Error handling ResendRequest")); + } +}