Skip to content

Commit 0da8280

Browse files
committed
Compatible with both packet aggregation and packet fragmentation scenarios
1 parent e38bb52 commit 0da8280

6 files changed

Lines changed: 638 additions & 42 deletions

File tree

common/common-rest/src/main/java/org/apache/servicecomb/common/rest/codec/produce/ProduceEventStreamProcessor.java

Lines changed: 298 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -17,28 +17,51 @@
1717

1818
package org.apache.servicecomb.common.rest.codec.produce;
1919

20+
import java.io.ByteArrayInputStream;
21+
import java.io.ByteArrayOutputStream;
22+
import java.io.IOException;
2023
import java.io.InputStream;
2124
import java.io.OutputStream;
2225
import java.nio.charset.StandardCharsets;
2326
import java.util.ArrayList;
24-
import java.util.Arrays;
2527
import java.util.List;
2628

2729
import org.apache.commons.lang3.StringUtils;
2830
import org.apache.servicecomb.common.rest.codec.RestObjectMapperFactory;
31+
import org.apache.servicecomb.foundation.vertx.stream.BufferInputStream;
2932
import org.apache.servicecomb.swagger.invocation.sse.SseEventResponseEntity;
33+
import org.reactivestreams.Publisher;
3034
import org.slf4j.Logger;
3135
import org.slf4j.LoggerFactory;
3236
import org.springframework.util.CollectionUtils;
3337

38+
import com.fasterxml.jackson.core.JsonProcessingException;
3439
import com.fasterxml.jackson.databind.JavaType;
3540

41+
import io.netty.buffer.ByteBuf;
42+
import io.netty.buffer.Unpooled;
43+
import io.reactivex.rxjava3.core.Flowable;
44+
import io.vertx.core.buffer.Buffer;
3645
import jakarta.ws.rs.core.MediaType;
3746

3847
public class ProduceEventStreamProcessor implements ProduceProcessor {
3948
private static final Logger LOGGER = LoggerFactory.getLogger(ProduceEventStreamProcessor.class);
4049

41-
public static final List<String> DEFAULT_DELIMITERS = Arrays.asList("\r\n", "\n", "\r");
50+
private static final String CR_STR = "\r";
51+
52+
private static final byte[] CR = CR_STR.getBytes(StandardCharsets.UTF_8);
53+
54+
private static final String LF_STR = "\n";
55+
56+
private static final byte[] LF = LF_STR.getBytes(StandardCharsets.UTF_8);
57+
58+
private static final String CRLF_STR = "\r\n";
59+
60+
private static final byte[] CRLF = CRLF_STR.getBytes(StandardCharsets.UTF_8);
61+
62+
private String lineDelimiter;
63+
64+
private byte[] lineDelimiterBytes;
4265

4366
private int writeIndex = 0;
4467

@@ -67,52 +90,268 @@ public void doEncodeResponse(OutputStream output, Object result) throws Exceptio
6790
}
6891
}
6992

93+
private enum ProcessStatus {
94+
DETERMINE_LINE_DELIMITER,
95+
MATCHING_CR,
96+
MATCHING_LF,
97+
MATCHING_CRLF,
98+
MATCHING_LINE,
99+
END_OF_MESSAGE,
100+
/**
101+
* The whole SSE stream is closed.
102+
* Be careful: there may be remaining buffer should be processed.
103+
*/
104+
END_OF_STREAM
105+
}
106+
107+
private ProcessStatus loopStatus = ProcessStatus.DETERMINE_LINE_DELIMITER;
108+
109+
private int matchingDelimiterIndex = 0;
110+
111+
final ByteBuf buffer = Unpooled.buffer();
112+
113+
private SseEventResponseEntity<?> currentEntity = new SseEventResponseEntity<>();
114+
115+
private List<SseEventResponseEntity<?>> entityList = new ArrayList<>();
116+
117+
private JavaType type;
118+
70119
@Override
71-
public Object doDecodeResponse(InputStream input, JavaType type) throws Exception {
72-
String buffer = new String(input.readAllBytes(), StandardCharsets.UTF_8);
73-
List<String> lines = new ArrayList<>();
74-
splitStringByDelimiters(buffer, lines);
75-
SseEventResponseEntity<?> responseEntity = new SseEventResponseEntity<>();
76-
for (String line : lines) {
77-
if (line.startsWith("id:")) {
78-
responseEntity.id(Integer.parseInt(line.substring("id:".length()).trim()));
79-
continue;
120+
public List<SseEventResponseEntity<?>> doDecodeResponse(InputStream input, JavaType type) throws Exception {
121+
this.type = type;
122+
final byte[] readCache = new byte[Math.min(128, input.available())];
123+
int bytesRead;
124+
while ((bytesRead = input.read(readCache)) > 0) {
125+
processAllBytes(readCache, bytesRead);
126+
}
127+
final List<SseEventResponseEntity<?>> resultList = entityList;
128+
entityList = new ArrayList<>();
129+
return resultList;
130+
}
131+
132+
private void processAllBytes(byte[] readCache, int cacheEndPos) {
133+
int lastProcessedPosition = innerLoop(readCache, 0, cacheEndPos);
134+
while (lastProcessedPosition < cacheEndPos) {
135+
lastProcessedPosition = innerLoop(readCache, lastProcessedPosition, cacheEndPos);
136+
}
137+
}
138+
139+
private int innerLoop(final byte[] readCache, final int startPos, final int cacheEndPos) {
140+
if (startPos >= cacheEndPos) {
141+
return cacheEndPos;
142+
}
143+
switch (loopStatus) {
144+
case MATCHING_CR -> {
145+
return tryToMatchDelimiterCR(readCache, startPos, cacheEndPos);
146+
}
147+
case MATCHING_CRLF -> {
148+
return tryToMatchDelimiterCRLF(readCache, startPos, cacheEndPos);
149+
}
150+
case MATCHING_LF -> {
151+
return tryToMatchDelimiterLF(readCache, startPos, cacheEndPos);
152+
}
153+
case DETERMINE_LINE_DELIMITER -> {
154+
return searchFirstLineDelimiter(readCache, startPos, cacheEndPos);
155+
}
156+
case MATCHING_LINE -> {
157+
return bufferReadCacheAndProcessLines(readCache, startPos, cacheEndPos);
158+
}
159+
case END_OF_STREAM -> {
160+
return processLeftBuffer(cacheEndPos);
161+
}
162+
default -> throw new IllegalStateException("unexpected case");
163+
}
164+
}
165+
166+
private int processLeftBuffer(int cacheEndPos) {
167+
final byte[] bytes = readAllBytesFromBuffer(buffer);
168+
final String bufferStr = new String(bytes, StandardCharsets.UTF_8);
169+
processStringBuffer(bufferStr);
170+
return cacheEndPos;
171+
}
172+
173+
private int bufferReadCacheAndProcessLines(byte[] readCache, int startPos, int cacheEndPos) {
174+
buffer.writeBytes(readCache, startPos, cacheEndPos - startPos);
175+
processAllAvailableBufferLines();
176+
return cacheEndPos;
177+
}
178+
179+
private int tryToMatchDelimiterCR(byte[] readCache, int startPos, int cacheEndPos) {
180+
int bytesProcessed = 0;
181+
for (; matchingDelimiterIndex < CR.length && startPos + bytesProcessed < cacheEndPos; ++bytesProcessed) {
182+
if (readCache[startPos + bytesProcessed] == CR[matchingDelimiterIndex]) {
183+
buffer.writeByte(readCache[startPos + bytesProcessed]);
184+
++matchingDelimiterIndex;
185+
} else {
186+
loopStatus = ProcessStatus.DETERMINE_LINE_DELIMITER;
187+
matchingDelimiterIndex = 0;
188+
return startPos + bytesProcessed;
189+
}
190+
}
191+
if (matchingDelimiterIndex == CR.length) {
192+
// matched all CR bytes, attempting to further match CRLF.
193+
loopStatus = ProcessStatus.MATCHING_CRLF;
194+
}
195+
return startPos + bytesProcessed;
196+
}
197+
198+
private int tryToMatchDelimiterCRLF(byte[] readCache, int startPos, int cacheEndPos) {
199+
// If you enter this branch, it means that at least CR should be used as the line break character.
200+
int bytesProcessed = 0;
201+
for (; matchingDelimiterIndex < CRLF.length && startPos + bytesProcessed < cacheEndPos; ++bytesProcessed) {
202+
if (readCache[startPos + bytesProcessed] == CRLF[matchingDelimiterIndex]) {
203+
buffer.writeByte(readCache[startPos + bytesProcessed]);
204+
++matchingDelimiterIndex;
205+
} else {
206+
determineDelimiter(CR_STR, CR);
207+
return startPos + bytesProcessed;
80208
}
81-
if (line.startsWith("event:")) {
82-
responseEntity.event(line.substring("event:".length()).trim());
83-
continue;
209+
}
210+
if (matchingDelimiterIndex == CRLF.length) {
211+
determineDelimiter(CRLF_STR, CRLF);
212+
}
213+
return startPos + bytesProcessed;
214+
}
215+
216+
private int tryToMatchDelimiterLF(byte[] readCache, int startPos, int cacheEndPos) {
217+
int bytesProcessed = 0;
218+
for (; matchingDelimiterIndex < LF.length && startPos + bytesProcessed < cacheEndPos; ++bytesProcessed) {
219+
if (readCache[startPos + bytesProcessed] == LF[matchingDelimiterIndex]) {
220+
buffer.writeByte(readCache[startPos + bytesProcessed]);
221+
++matchingDelimiterIndex;
222+
} else {
223+
loopStatus = ProcessStatus.DETERMINE_LINE_DELIMITER;
224+
matchingDelimiterIndex = 0;
225+
return startPos + bytesProcessed;
84226
}
85-
if (line.startsWith("retry:")) {
86-
responseEntity.retry(Long.parseLong(line.substring("retry:".length()).trim()));
87-
continue;
227+
}
228+
if (matchingDelimiterIndex == LF.length) {
229+
determineDelimiter(LF_STR, LF);
230+
}
231+
return startPos + bytesProcessed;
232+
}
233+
234+
private void determineDelimiter(String delimiterStr, byte[] delimiterBytes) {
235+
lineDelimiter = delimiterStr;
236+
lineDelimiterBytes = delimiterBytes;
237+
matchingDelimiterIndex = 0;
238+
loopStatus = ProcessStatus.MATCHING_LINE;
239+
}
240+
241+
private int searchFirstLineDelimiter(byte[] readCache, int startPos, int cacheEndPos) {
242+
for (int i = startPos; i < cacheEndPos; ++i) {
243+
if (readCache[i] == CR[0]) {
244+
loopStatus = ProcessStatus.MATCHING_CR;
245+
matchingDelimiterIndex = 0;
246+
return i;
247+
} else if (readCache[i] == LF[0]) {
248+
loopStatus = ProcessStatus.MATCHING_LF;
249+
matchingDelimiterIndex = 0;
250+
return i;
251+
} else {
252+
buffer.writeByte(readCache[i]);
88253
}
89-
if (line.startsWith("data:")) {
90-
responseEntity.data(RestObjectMapperFactory.getRestObjectMapper()
91-
.readValue(line.substring("data:".length()).trim(), type));
254+
}
255+
return cacheEndPos;
256+
}
257+
258+
private void processAllAvailableBufferLines() {
259+
while (buffer.readableBytes() > 0) {
260+
final byte[] bytes = readALineOfBytesFromBuffer(buffer);
261+
if (bytes == null || bytes.length == 0) {
262+
return;
92263
}
264+
final String bufferStr = new String(bytes, StandardCharsets.UTF_8);
265+
processStringBuffer(bufferStr);
266+
}
267+
}
268+
269+
private void processStringBuffer(String bufferStr) {
270+
int cursor = 0;
271+
int delimiterIdx;
272+
while ((delimiterIdx = bufferStr.indexOf(lineDelimiter, cursor)) >= 0) {
273+
final String line = bufferStr.substring(cursor, delimiterIdx);
274+
processStringLine(line);
275+
cursor = delimiterIdx + lineDelimiter.length();
276+
}
277+
if (cursor < bufferStr.length()) {
278+
buffer.writeBytes(bufferStr.substring(cursor).getBytes(StandardCharsets.UTF_8));
93279
}
94-
return responseEntity;
95280
}
96281

97-
private void splitStringByDelimiters(String str, List<String> lines) {
98-
boolean isContainsDelimiters = false;
99-
for (String split : DEFAULT_DELIMITERS) {
100-
if (str.contains(split)) {
101-
isContainsDelimiters = true;
102-
splitStrings(str.split(split), lines);
282+
private void processStringLine(String line) {
283+
if (StringUtils.isBlank(line)) {
284+
if (currentEntity.isEmpty()) {
285+
return;
103286
}
287+
entityList.add(currentEntity);
288+
currentEntity = new SseEventResponseEntity<>();
289+
return;
104290
}
105-
if (!isContainsDelimiters) {
106-
lines.add(str);
291+
final String[] split = line.split(":", 2);
292+
if (split.length < 2) {
293+
LOGGER.error("get a line of sse event without colon! stream is breaking!");
294+
throw new IllegalStateException("get a line of sse event without colon!");
295+
}
296+
switch (split[0]) {
297+
case "event" -> {
298+
if (StringUtils.isNotBlank(split[1])) {
299+
currentEntity.event(split[1].trim());
300+
}
301+
}
302+
case "id" -> {
303+
if (StringUtils.isNotBlank(split[1])) {
304+
currentEntity.id(Integer.parseInt(split[1].trim()));
305+
}
306+
}
307+
case "data" -> {
308+
try {
309+
currentEntity.data(RestObjectMapperFactory.getRestObjectMapper().readValue(split[1].trim(), type));
310+
} catch (JsonProcessingException e) {
311+
LOGGER.error("failed to process data of sse event: [{}]", e.getMessage());
312+
throw new IllegalStateException("failed to process data of sse event", e);
313+
}
314+
}
315+
case "retry" -> {
316+
if (StringUtils.isNotBlank(split[1])) {
317+
currentEntity.retry(Long.parseLong(split[1].trim()));
318+
}
319+
}
320+
default -> {
321+
LOGGER.debug("unrecognized sse message line! ignored string segment length=[{}]", line.length());
322+
}
107323
}
108324
}
109325

110-
private void splitStrings(String[] strings, List<String> lines) {
111-
for (String str : strings) {
112-
if (StringUtils.isEmpty(str)) {
113-
continue;
326+
private byte[] readALineOfBytesFromBuffer(ByteBuf buffer) {
327+
matchingDelimiterIndex = 0;
328+
try (final ByteArrayOutputStream bos = new ByteArrayOutputStream(buffer.readableBytes())) {
329+
while (buffer.readableBytes() > 0 && matchingDelimiterIndex < lineDelimiterBytes.length) {
330+
final byte b = buffer.readByte();
331+
if (b == lineDelimiterBytes[matchingDelimiterIndex]) {
332+
++matchingDelimiterIndex;
333+
}
334+
bos.write(b);
114335
}
115-
splitStringByDelimiters(str, lines);
336+
if (matchingDelimiterIndex < lineDelimiterBytes.length) {
337+
// The newline character was not matched, so this part of the buffer does not constitute a complete line of
338+
// content and needs to remain in the buffer, waiting for the next segment to arrive for processing.
339+
buffer.writeBytes(bos.toByteArray());
340+
return null;
341+
}
342+
matchingDelimiterIndex = 0;
343+
return bos.toByteArray();
344+
} catch (IOException e) {
345+
throw new IllegalStateException("impossible error while closing ByteArrayOutputStream", e);
346+
}
347+
}
348+
349+
private byte[] readAllBytesFromBuffer(ByteBuf buffer) {
350+
try (final ByteArrayOutputStream bos = new ByteArrayOutputStream(buffer.readableBytes())) {
351+
buffer.readBytes(bos, buffer.readableBytes());
352+
return bos.toByteArray();
353+
} catch (IOException e) {
354+
throw new IllegalStateException("impossible error while closing ByteArrayOutputStream", e);
116355
}
117356
}
118357

@@ -145,4 +384,29 @@ private void appendData(StringBuilder eventBuilder, List<?> datas) throws Except
145384
.append("\n");
146385
}
147386
}
387+
388+
@Override
389+
public Publisher<SseEventResponseEntity<?>> decodeResponse(Buffer buffer, JavaType type) throws Exception {
390+
if (buffer.length() == 0) {
391+
return Flowable.empty();
392+
}
393+
394+
try (BufferInputStream input = new BufferInputStream(buffer.getByteBuf())) {
395+
final List<SseEventResponseEntity<?>> list = doDecodeResponse(input, type);
396+
return Flowable.fromIterable(list);
397+
}
398+
}
399+
400+
public Publisher<SseEventResponseEntity<?>> close() throws Exception {
401+
if (type == null) {
402+
return Flowable.empty();
403+
}
404+
try (final ByteArrayInputStream input = new ByteArrayInputStream(
405+
(lineDelimiter + lineDelimiter).getBytes(StandardCharsets.UTF_8))) {
406+
// Write two additional newline characters into the buffer to ensure that the processor completes
407+
// processing all remaining content in the buffer.
408+
final List<SseEventResponseEntity<?>> list = doDecodeResponse(input, type);
409+
return Flowable.fromIterable(list);
410+
}
411+
}
148412
}

common/common-rest/src/main/java/org/apache/servicecomb/common/rest/filter/inner/ServerRestArgsFilter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ public void onNext(Object o) {
135135
}
136136
subscription.request(1);
137137
});
138-
} catch (Exception e) {
138+
} catch (Throwable e) {
139139
LOGGER.warn("Failed to subscribe event: {}", o, e);
140140
result.completeExceptionally(e);
141141
}

0 commit comments

Comments
 (0)