@@ -241,9 +241,8 @@ private static class ExtProcClientCall<ReqT, RespT> extends SimpleForwardingClie
241241 private final ExternalProcessorGrpc .ExternalProcessorStub stub ;
242242 private final MethodDescriptor <ReqT , RespT > method ;
243243 private final ExternalProcessor config ;
244- private ClientCallStreamObserver <ProcessingRequest > clientCallRequestObserver ;
245- private final Object extProcLock = new Object ();
246- private boolean extProcStreamReady ;
244+ private ClientCallStreamObserver <ProcessingRequest > extProcClientCallRequestObserver ;
245+ private ExtProcListener <ReqT , RespT > wrappedListener ;
247246
248247 private boolean headersSent = false ;
249248 private Metadata requestHeaders ;
@@ -264,9 +263,9 @@ protected ExtProcClientCall(ClientCall<ReqT, RespT> delegate,
264263 @ Override
265264 public void start (Listener <RespT > responseListener , Metadata headers ) {
266265 this .requestHeaders = headers ;
267- ExternalProcessorInterceptor . ExtProcListener < ReqT , RespT > wrappedListener = new ExternalProcessorInterceptor .ExtProcListener <>(responseListener , delegate (), method , this );
266+ this . wrappedListener = new ExternalProcessorInterceptor .ExtProcListener <>(responseListener , delegate (), method , this );
268267
269- clientCallRequestObserver = (ClientCallStreamObserver <ProcessingRequest >) stub .process (new io .grpc .stub .StreamObserver <ProcessingResponse >() {
268+ extProcClientCallRequestObserver = (ClientCallStreamObserver <ProcessingRequest >) stub .process (new io .grpc .stub .StreamObserver <ProcessingResponse >() {
270269 @ Override
271270 public void onNext (io .envoyproxy .envoy .service .ext_proc .v3 .ProcessingResponse response ) {
272271 if (response .hasImmediateResponse ()) {
@@ -280,7 +279,7 @@ public void onNext(io.envoyproxy.envoy.service.ext_proc.v3.ProcessingResponse re
280279
281280 if (response .getRequestDrain ()) {
282281 handleFailOpen (wrappedListener );
283- clientCallRequestObserver .onCompleted ();
282+ extProcClientCallRequestObserver .onCompleted ();
284283 return ;
285284 }
286285
@@ -304,7 +303,7 @@ else if (response.hasRequestBody()) {
304303 io .grpc .StatusRuntimeException ex = io .grpc .Status .INTERNAL
305304 .withDescription ("gRPC message compression not supported in ext_proc" )
306305 .asRuntimeException ();
307- clientCallRequestObserver .onError (ex );
306+ extProcClientCallRequestObserver .onError (ex );
308307 onError (ex );
309308 return ;
310309 }
@@ -327,7 +326,7 @@ else if (response.hasResponseBody()) {
327326 io .grpc .StatusRuntimeException ex = io .grpc .Status .INTERNAL
328327 .withDescription ("gRPC message compression not supported in ext_proc" )
329328 .asRuntimeException ();
330- clientCallRequestObserver .onError (ex );
329+ extProcClientCallRequestObserver .onError (ex );
331330 onError (ex );
332331 return ;
333332 }
@@ -365,13 +364,12 @@ public void onCompleted() {
365364 });
366365
367366 if (config .getObservabilityMode ()) {
368- this .extProcStreamReady = clientCallRequestObserver .isReady ();
369- clientCallRequestObserver .setOnReadyHandler (this ::onExtProcStreamReady );
367+ extProcClientCallRequestObserver .setOnReadyHandler (this ::onExtProcStreamReady );
370368 }
371369
372- wrappedListener .setStream (clientCallRequestObserver );
370+ wrappedListener .setStream (extProcClientCallRequestObserver );
373371
374- sendToExtProc (io .envoyproxy .envoy .service .ext_proc .v3 .ProcessingRequest .newBuilder ()
372+ extProcClientCallRequestObserver . onNext (io .envoyproxy .envoy .service .ext_proc .v3 .ProcessingRequest .newBuilder ()
375373 .setRequestHeaders (io .envoyproxy .envoy .service .ext_proc .v3 .HttpHeaders .newBuilder ()
376374 .setHeaders (toHeaderMap (headers ))
377375 .build ())
@@ -384,31 +382,17 @@ public void onCompleted() {
384382 }
385383
386384 private void onExtProcStreamReady () {
387- synchronized (extProcLock ) {
388- extProcStreamReady = true ;
389- extProcLock .notifyAll ();
385+ if (isReady ()) {
386+ wrappedListener .onReady ();
390387 }
391388 }
392389
393- private void sendToExtProc (ProcessingRequest request ) {
394- if (!config .getObservabilityMode ()) {
395- clientCallRequestObserver .onNext (request );
396- return ;
397- }
398-
399- synchronized (extProcLock ) {
400- while (!extProcStreamReady ) {
401- try {
402- extProcLock .wait ();
403- } catch (InterruptedException e ) {
404- Thread .currentThread ().interrupt ();
405- delegate ().cancel ("Interrupted while waiting for ext_proc stream" , e );
406- return ;
407- }
408- }
409- clientCallRequestObserver .onNext (request );
410- extProcStreamReady = clientCallRequestObserver .isReady ();
390+ @ Override
391+ public boolean isReady () {
392+ if (!config .getObservabilityMode () || extProcStreamCompleted .get ()) {
393+ return super .isReady ();
411394 }
395+ return super .isReady () && extProcClientCallRequestObserver .isReady ();
412396 }
413397
414398 @ Override
@@ -426,7 +410,7 @@ public void sendMessage(ReqT message) {
426410
427411 try (InputStream is = method .streamRequest (message )) {
428412 byte [] bodyBytes = ByteStreams .toByteArray (is );
429- sendToExtProc (io .envoyproxy .envoy .service .ext_proc .v3 .ProcessingRequest .newBuilder ()
413+ extProcClientCallRequestObserver . onNext (io .envoyproxy .envoy .service .ext_proc .v3 .ProcessingRequest .newBuilder ()
430414 .setRequestBody (io .envoyproxy .envoy .service .ext_proc .v3 .HttpBody .newBuilder ()
431415 .setBody (com .google .protobuf .ByteString .copyFrom (bodyBytes ))
432416 .setEndOfStream (false )
@@ -449,7 +433,7 @@ public void halfClose() {
449433 }
450434
451435 // Signal end of request body stream to the external processor.
452- sendToExtProc (io .envoyproxy .envoy .service .ext_proc .v3 .ProcessingRequest .newBuilder ()
436+ extProcClientCallRequestObserver . onNext (io .envoyproxy .envoy .service .ext_proc .v3 .ProcessingRequest .newBuilder ()
453437 .setRequestBody (io .envoyproxy .envoy .service .ext_proc .v3 .HttpBody .newBuilder ()
454438 .setEndOfStream (true )
455439 .build ())
@@ -504,7 +488,7 @@ private void handleImmediateResponse(io.envoyproxy.envoy.service.ext_proc.v3.Imm
504488 io .grpc .Status status = io .grpc .Status .fromCodeValue (immediate .getGrpcStatus ().getStatus ());
505489 delegate ().cancel ("Rejected by ExtProc" , null );
506490 listener .onClose (status , new Metadata ());
507- clientCallRequestObserver .onCompleted ();
491+ extProcClientCallRequestObserver .onCompleted ();
508492 }
509493
510494 private void handleFailOpen (ExtProcListener <ReqT , RespT > listener ) {
@@ -524,36 +508,43 @@ private void handleFailOpen(ExtProcListener<ReqT, RespT> listener) {
524508 private static class ExtProcListener <ReqT , RespT > extends ForwardingClientCallListener .SimpleForwardingClientCallListener <RespT > {
525509 private final MethodDescriptor <?, RespT > method ;
526510 private final ClientCall <?, RespT > callDelegate ; // The actual RPC call
527- private final ExtProcClientCall <ReqT , RespT > call ;
511+ private final ExtProcClientCall <ReqT , RespT > extProcClientCall ;
528512 private ClientCallStreamObserver <io .envoyproxy .envoy .service .ext_proc .v3 .ProcessingRequest > stream ;
529513 private Metadata savedHeaders ;
530514 private Metadata savedTrailers ;
531515 private io .grpc .Status savedStatus ;
532516
533517 protected ExtProcListener (ClientCall .Listener <RespT > delegate , ClientCall <?, RespT > callDelegate ,
534- MethodDescriptor <?, RespT > method , ExtProcClientCall <ReqT , RespT > call ) {
518+ MethodDescriptor <?, RespT > method , ExtProcClientCall <ReqT , RespT > extProcClientCall ) {
535519 super (delegate );
536520 this .method = method ;
537521 this .callDelegate = callDelegate ;
538- this .call = call ;
522+ this .extProcClientCall = extProcClientCall ;
539523 }
540524
541525 void setStream (ClientCallStreamObserver <io .envoyproxy .envoy .service .ext_proc .v3 .ProcessingRequest > stream ) { this .stream = stream ; }
542526
527+ @ Override
528+ public void onReady () {
529+ if (extProcClientCall .isReady ()) {
530+ super .onReady ();
531+ }
532+ }
533+
543534 @ Override
544535 public void onHeaders (Metadata headers ) {
545- if (call .extProcStreamCompleted .get ()) {
536+ if (extProcClientCall .extProcStreamCompleted .get ()) {
546537 super .onHeaders (headers );
547538 return ;
548539 }
549540 this .savedHeaders = headers ;
550- call . sendToExtProc (ProcessingRequest .newBuilder ()
541+ extProcClientCall . extProcClientCallRequestObserver . onNext (ProcessingRequest .newBuilder ()
551542 .setResponseHeaders (io .envoyproxy .envoy .service .ext_proc .v3 .HttpHeaders .newBuilder ()
552543 .setHeaders (toHeaderMap (headers ))
553544 .build ())
554545 .build ());
555546
556- if (call .config .getObservabilityMode ()) {
547+ if (extProcClientCall .config .getObservabilityMode ()) {
557548 super .onHeaders (headers );
558549 }
559550 }
@@ -562,28 +553,28 @@ public void onHeaders(Metadata headers) {
562553
563554 @ Override
564555 public void onMessage (RespT message ) {
565- if (call .extProcStreamCompleted .get ()) {
556+ if (extProcClientCall .extProcStreamCompleted .get ()) {
566557 super .onMessage (message );
567558 return ;
568559 }
569560 sendResponseBodyToExtProc (message , false );
570561
571- if (call .config .getObservabilityMode ()) {
562+ if (extProcClientCall .config .getObservabilityMode ()) {
572563 super .onMessage (message );
573564 }
574565 }
575566
576567 @ Override
577568 public void onClose (io .grpc .Status status , Metadata trailers ) {
578- if (call .extProcStreamFailed .get ()) {
569+ if (extProcClientCall .extProcStreamFailed .get ()) {
579570 // The ext_proc stream died, which caused delegate().cancel() to be called, leading here.
580571 // The incoming status will be CANCELLED. We must not attempt to forward the server's
581572 // response trailers to the now-dead ext_proc stream. Instead, we close the
582573 // application's call with UNAVAILABLE as per the gRFC.
583574 super .onClose (Status .UNAVAILABLE .withDescription ("External processor stream failed" ).withCause (status .getCause ()), new Metadata ());
584575 return ;
585576 }
586- if (call .extProcStreamCompleted .get ()) {
577+ if (extProcClientCall .extProcStreamCompleted .get ()) {
587578 super .onClose (status , trailers );
588579 return ;
589580 }
@@ -595,19 +586,19 @@ public void onClose(io.grpc.Status status, Metadata trailers) {
595586 sendResponseBodyToExtProc (null , true );
596587
597588 // Event 6: Server Trailers with ACTUAL data
598- call . sendToExtProc (io .envoyproxy .envoy .service .ext_proc .v3 .ProcessingRequest .newBuilder ()
589+ extProcClientCall . extProcClientCallRequestObserver . onNext (io .envoyproxy .envoy .service .ext_proc .v3 .ProcessingRequest .newBuilder ()
599590 .setResponseTrailers (io .envoyproxy .envoy .service .ext_proc .v3 .HttpTrailers .newBuilder ()
600591 .setTrailers (toHeaderMap (savedTrailers )) // Map the captured trailers here
601592 .build ())
602593 .build ());
603594
604- if (call .config .getObservabilityMode ()) {
595+ if (extProcClientCall .config .getObservabilityMode ()) {
605596 super .onClose (status , trailers );
606597 }
607598 }
608599
609600 private void sendResponseBodyToExtProc (@ Nullable RespT message , boolean endOfStream ) {
610- if (call .extProcStreamCompleted .get ()) {
601+ if (extProcClientCall .extProcStreamCompleted .get ()) {
611602 return ;
612603 }
613604 try {
@@ -621,7 +612,7 @@ private void sendResponseBodyToExtProc(@Nullable RespT message, boolean endOfStr
621612 }
622613 bodyBuilder .setEndOfStream (endOfStream );
623614
624- call . sendToExtProc (io .envoyproxy .envoy .service .ext_proc .v3 .ProcessingRequest .newBuilder ()
615+ extProcClientCall . extProcClientCallRequestObserver . onNext (io .envoyproxy .envoy .service .ext_proc .v3 .ProcessingRequest .newBuilder ()
625616 .setResponseBody (bodyBuilder .build ())
626617 .build ());
627618
0 commit comments