Skip to content

Commit 40d7c41

Browse files
committed
[#4873] Support for SSE interface RPC calls and register SSE schema
1 parent a9fdc22 commit 40d7c41

25 files changed

Lines changed: 515 additions & 37 deletions

File tree

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.servicecomb.common.rest.codec.produce;
19+
20+
import java.io.InputStream;
21+
import java.io.OutputStream;
22+
import java.nio.charset.StandardCharsets;
23+
24+
import org.apache.servicecomb.common.rest.codec.RestObjectMapperFactory;
25+
26+
import com.fasterxml.jackson.databind.JavaType;
27+
28+
import jakarta.ws.rs.core.MediaType;
29+
30+
public class ProduceEventStreamProcessor implements ProduceProcessor {
31+
private int writeIndex = 0;
32+
33+
@Override
34+
public String getName() {
35+
return MediaType.SERVER_SENT_EVENTS;
36+
}
37+
38+
@Override
39+
public int getOrder() {
40+
return 0;
41+
}
42+
43+
@Override
44+
public void doEncodeResponse(OutputStream output, Object result) throws Exception {
45+
String buffer = "id: " + (writeIndex++) + "\n"
46+
+ "data: "
47+
+ RestObjectMapperFactory.getRestObjectMapper().writeValueAsString(result)
48+
+ "\n\n";
49+
output.write(buffer.getBytes(StandardCharsets.UTF_8));
50+
}
51+
52+
@Override
53+
public Object doDecodeResponse(InputStream input, JavaType type) throws Exception {
54+
String buffer = new String(input.readAllBytes(), StandardCharsets.UTF_8);
55+
for (String line : buffer.split("\n")) {
56+
if (line.startsWith("data: ")) {
57+
return RestObjectMapperFactory.getRestObjectMapper().readValue(line.substring(5), type);
58+
}
59+
}
60+
return null;
61+
}
62+
}

common/common-rest/src/main/java/org/apache/servicecomb/common/rest/definition/RestOperationMeta.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import org.apache.servicecomb.swagger.engine.SwaggerProducerOperation;
4343
import org.slf4j.Logger;
4444
import org.slf4j.LoggerFactory;
45+
import org.springframework.util.CollectionUtils;
4546

4647
import com.fasterxml.jackson.annotation.JsonView;
4748

@@ -88,6 +89,10 @@ public class RestOperationMeta {
8889
// 快速构建URL path
8990
private URLPathBuilder pathBuilder;
9091

92+
protected boolean serverSendEvents;
93+
94+
protected static final String EVENTS_MEDIA_TYPE = MediaType.SERVER_SENT_EVENTS;
95+
9196
public void init(OperationMeta operationMeta) {
9297
this.operationMeta = operationMeta;
9398

@@ -99,6 +104,7 @@ public void init(OperationMeta operationMeta) {
99104
}
100105

101106
this.downloadFile = checkDownloadFileFlag();
107+
this.serverSendEvents = checkServerSendEvents();
102108
this.createProduceProcessors();
103109

104110
// 初始化所有rest param
@@ -146,6 +152,10 @@ public boolean isDownloadFile() {
146152
return downloadFile;
147153
}
148154

155+
public boolean isServerSendEvents() {
156+
return serverSendEvents;
157+
}
158+
149159
private boolean checkDownloadFileFlag() {
150160
Response response = operationMeta.getSwaggerOperation().getResponses().get("200");
151161
if (response != null) {
@@ -157,6 +167,10 @@ private boolean checkDownloadFileFlag() {
157167
return false;
158168
}
159169

170+
private boolean checkServerSendEvents() {
171+
return !CollectionUtils.isEmpty(produces) && produces.contains(EVENTS_MEDIA_TYPE);
172+
}
173+
160174
public boolean isFormData() {
161175
return formData;
162176
}

common/common-rest/src/main/java/org/apache/servicecomb/common/rest/filter/inner/RestServerCodecFilter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,8 @@
4444
import org.apache.servicecomb.swagger.invocation.Response;
4545
import org.springframework.stereotype.Component;
4646

47-
import io.netty.buffer.Unpooled;
4847
import io.vertx.core.MultiMap;
48+
import io.vertx.core.buffer.Buffer;
4949

5050
@Component
5151
public class RestServerCodecFilter implements ProducerFilter {
@@ -103,7 +103,7 @@ public static CompletableFuture<Response> encodeResponse(Response response, bool
103103
}
104104

105105
responseEx.setContentType(produceProcessor.getName() + "; charset=utf-8");
106-
try (BufferOutputStream output = new BufferOutputStream(Unpooled.compositeBuffer())) {
106+
try (BufferOutputStream output = new BufferOutputStream(Buffer.buffer())) {
107107
produceProcessor.encodeResponse(output, response.getResult());
108108

109109
responseEx.setBodyBuffer(output.getBuffer());

common/common-rest/src/main/java/org/apache/servicecomb/common/rest/filter/inner/ServerRestArgsFilter.java

Lines changed: 78 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424

2525
import org.apache.servicecomb.common.rest.RestConst;
2626
import org.apache.servicecomb.common.rest.codec.RestCodec;
27+
import org.apache.servicecomb.common.rest.codec.produce.ProduceEventStreamProcessor;
2728
import org.apache.servicecomb.common.rest.codec.produce.ProduceProcessor;
2829
import org.apache.servicecomb.common.rest.definition.RestOperationMeta;
2930
import org.apache.servicecomb.common.rest.filter.HttpServerFilter;
@@ -36,15 +37,22 @@
3637
import org.apache.servicecomb.swagger.invocation.Response;
3738
import org.apache.servicecomb.swagger.invocation.exception.ExceptionFactory;
3839
import org.apache.servicecomb.swagger.invocation.exception.InvocationException;
40+
import org.reactivestreams.Publisher;
41+
import org.reactivestreams.Subscriber;
42+
import org.reactivestreams.Subscription;
43+
import org.slf4j.Logger;
44+
import org.slf4j.LoggerFactory;
3945

4046
import com.netflix.config.DynamicPropertyFactory;
4147

42-
import io.netty.buffer.Unpooled;
48+
import io.vertx.core.buffer.Buffer;
4349

4450
public class ServerRestArgsFilter implements HttpServerFilter {
4551
private static final boolean enabled = DynamicPropertyFactory.getInstance().getBooleanProperty
4652
("servicecomb.http.filter.server.serverRestArgs.enabled", true).get();
4753

54+
private static final Logger LOGGER = LoggerFactory.getLogger(ServerRestArgsFilter.class);
55+
4856
@Override
4957
public int getOrder() {
5058
return -100;
@@ -74,10 +82,15 @@ public CompletableFuture<Void> beforeSendResponseAsync(Invocation invocation, Ht
7482
return responseEx.sendPart(PartUtils.getSinglePart(null, response.getResult()));
7583
}
7684

77-
responseEx.setContentType(produceProcessor.getName() + "; charset=utf-8");
85+
if (isServerSendEvent(response)) {
86+
produceProcessor = new ProduceEventStreamProcessor();
87+
responseEx.setContentType(produceProcessor.getName() + "; charset=utf-8");
88+
return writeServerSendEvent(response, produceProcessor, responseEx);
89+
}
7890

91+
responseEx.setContentType(produceProcessor.getName() + "; charset=utf-8");
7992
CompletableFuture<Void> future = new CompletableFuture<>();
80-
try (BufferOutputStream output = new BufferOutputStream(Unpooled.compositeBuffer())) {
93+
try (BufferOutputStream output = new BufferOutputStream(Buffer.buffer())) {
8194
if (failed) {
8295
produceProcessor.encodeResponse(output, ((InvocationException) response.getResult()).getErrorData());
8396
} else {
@@ -91,4 +104,66 @@ public CompletableFuture<Void> beforeSendResponseAsync(Invocation invocation, Ht
91104
}
92105
return future;
93106
}
107+
108+
public static boolean isServerSendEvent(Response response) {
109+
return response.getResult() instanceof Publisher<?>;
110+
}
111+
112+
private static CompletableFuture<Void> writeServerSendEvent(Response response, ProduceProcessor produceProcessor,
113+
HttpServletResponseEx responseEx) {
114+
responseEx.setChunked(true);
115+
CompletableFuture<Void> result = new CompletableFuture<>();
116+
Publisher<?> publisher = response.getResult();
117+
publisher.subscribe(new Subscriber<Object>() {
118+
Subscription subscription;
119+
120+
@Override
121+
public void onSubscribe(Subscription s) {
122+
s.request(1);
123+
subscription = s;
124+
}
125+
126+
@Override
127+
public void onNext(Object o) {
128+
writeResponse(responseEx, produceProcessor, o, response).whenComplete((r, e) -> {
129+
if (e != null) {
130+
subscription.cancel();
131+
result.completeExceptionally(e);
132+
return;
133+
}
134+
subscription.request(1);
135+
});
136+
}
137+
138+
@Override
139+
public void onError(Throwable t) {
140+
result.completeExceptionally(t);
141+
}
142+
143+
@Override
144+
public void onComplete() {
145+
result.complete(null);
146+
}
147+
});
148+
return result;
149+
}
150+
151+
private static CompletableFuture<Response> writeResponse(
152+
HttpServletResponseEx responseEx, ProduceProcessor produceProcessor, Object data, Response response) {
153+
try (BufferOutputStream output = new BufferOutputStream(Buffer.buffer())) {
154+
produceProcessor.encodeResponse(output, data);
155+
CompletableFuture<Response> result = new CompletableFuture<>();
156+
responseEx.sendBuffer(output.getBuffer()).whenComplete((v, e) -> {
157+
if (e != null) {
158+
result.completeExceptionally(e);
159+
}
160+
});
161+
result.complete(response);
162+
return result;
163+
} catch (Throwable e) {
164+
LOGGER.error("internal service error must be fixed.", e);
165+
responseEx.setStatus(500);
166+
return CompletableFuture.failedFuture(e);
167+
}
168+
}
94169
}

common/common-rest/src/main/resources/META-INF/services/org.apache.servicecomb.common.rest.codec.produce.ProduceProcessor

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,5 @@
1616
#
1717

1818
org.apache.servicecomb.common.rest.codec.produce.ProduceJsonProcessor
19-
org.apache.servicecomb.common.rest.codec.produce.ProduceTextPlainProcessor
19+
org.apache.servicecomb.common.rest.codec.produce.ProduceTextPlainProcessor
20+
org.apache.servicecomb.common.rest.codec.produce.ProduceEventStreamProcessor

core/src/main/java/org/apache/servicecomb/core/Const.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,4 +59,6 @@ private Const() {
5959

6060
// controlling whether to print stack information with sensitive errors
6161
public static final String PRINT_SENSITIVE_ERROR_MESSAGE = "servicecomb.error.printSensitiveErrorMessage";
62+
63+
public static final String FLOWABLE_CLIENT_RESPONSE = "flowable-client-response";
6264
}
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.servicecomb.samples;
18+
19+
20+
import org.apache.servicecomb.core.annotation.Transport;
21+
import org.apache.servicecomb.provider.pojo.RpcReference;
22+
import org.apache.servicecomb.provider.rest.common.RestSchema;
23+
import org.reactivestreams.Publisher;
24+
import org.springframework.web.bind.annotation.GetMapping;
25+
import org.springframework.web.bind.annotation.RequestMapping;
26+
27+
@RestSchema(schemaId = "ReactiveStreamController")
28+
@RequestMapping(path = "/")
29+
public class ConsumerReactiveStreamController {
30+
interface ProviderReactiveStreamController {
31+
Publisher<String> sseString();
32+
33+
Publisher<Model> sseModel();
34+
}
35+
36+
@RpcReference(microserviceName = "provider", schemaId = "ReactiveStreamController")
37+
ProviderReactiveStreamController controller;
38+
39+
public static class Model {
40+
private String name;
41+
42+
private int age;
43+
44+
public Model() {
45+
46+
}
47+
48+
public Model(String name, int age) {
49+
this.name = name;
50+
this.age = age;
51+
}
52+
53+
public int getAge() {
54+
return age;
55+
}
56+
57+
public Model setAge(int age) {
58+
this.age = age;
59+
return this;
60+
}
61+
62+
public String getName() {
63+
return name;
64+
}
65+
66+
public Model setName(String name) {
67+
this.name = name;
68+
return this;
69+
}
70+
}
71+
72+
@GetMapping("/sseString")
73+
@Transport(name = "rest")
74+
public Publisher<String> sseString() {
75+
return controller.sseString();
76+
}
77+
78+
@GetMapping("/sseModel")
79+
@Transport(name = "rest")
80+
public Publisher<Model> sseModel() {
81+
return controller.sseModel();
82+
}
83+
}

demo/demo-cse-v2/provider/pom.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,10 @@
5252
</exclusion>
5353
</exclusions>
5454
</dependency>
55+
<dependency>
56+
<groupId>io.reactivex.rxjava3</groupId>
57+
<artifactId>rxjava</artifactId>
58+
</dependency>
5559
</dependencies>
5660

5761
<build>

0 commit comments

Comments
 (0)