From 8acc384b98b48562e4d1a256f21cc9043e0c24ca Mon Sep 17 00:00:00 2001 From: Wyatt Anderson Date: Wed, 1 Jul 2020 00:33:09 -0400 Subject: [PATCH 1/4] use opentracing.scope --- .../open_tracing_client_interceptor.py | 64 ++++++++----------- .../open_tracing_server_interceptor.py | 31 ++++----- grpc_opentracing/grpc_interceptor/utils.py | 13 ++-- grpc_opentracing/scope.py | 20 ------ 4 files changed, 44 insertions(+), 84 deletions(-) delete mode 100644 grpc_opentracing/scope.py diff --git a/grpc_opentracing/grpc_interceptor/open_tracing_client_interceptor.py b/grpc_opentracing/grpc_interceptor/open_tracing_client_interceptor.py index 4a2c051..3c7d92e 100644 --- a/grpc_opentracing/grpc_interceptor/open_tracing_client_interceptor.py +++ b/grpc_opentracing/grpc_interceptor/open_tracing_client_interceptor.py @@ -8,7 +8,6 @@ import six from opentracing.ext import tags as ot_tags -from grpc_opentracing import scope from grpc_opentracing.grpc_interceptor import utils as grpc_utils log = logging.getLogger(__name__) @@ -22,14 +21,14 @@ class _ClientCallDetails( pass -def _inject_span_context(tracer, span, metadata): +def _inject_span_context(span, metadata): headers = {} try: - tracer.inject(span.context, opentracing.Format.HTTP_HEADERS, headers) + opentracing.tracer.inject(span.context, opentracing.Format.HTTP_HEADERS, headers) except (opentracing.UnsupportedFormatException, opentracing.InvalidCarrierException, opentracing.SpanContextCorruptedException) as e: - logging.exception('tracer.inject() failed') + logging.exception('opentracing.tracer.inject() failed') span.log_kv({'event': 'error', 'error.object': e}) return metadata metadata = () if metadata is None else tuple(metadata) @@ -41,22 +40,9 @@ class OpenTracingClientInterceptor(grpc.UnaryUnaryClientInterceptor, grpc.StreamUnaryClientInterceptor, grpc.StreamStreamClientInterceptor): - def __init__(self, tracer, log_payloads): - self._tracer = tracer + def __init__(self, log_payloads): self._log_payloads = log_payloads - def _start_span(self, method): - active_span_context = None - active_span = scope.get_active_span() - if active_span is not None: - active_span_context = active_span.context - tags = { - ot_tags.COMPONENT: 'grpc', - ot_tags.SPAN_KIND: ot_tags.SPAN_KIND_RPC_CLIENT - } - return self._tracer.start_span( - operation_name=method, child_of=active_span_context, tags=tags) - def _intercept_call( self, client_call_details, request_iterator ): @@ -64,11 +50,16 @@ def _intercept_call( if client_call_details.metadata is not None: metadata = client_call_details.metadata - # Start a span - current_span = self._start_span(client_call_details.method) + current_span = opentracing.tracer.start_span( + child_of=opentracing.tracer.active_span, + operation_name=client_call_details.method, + tags={ + ot_tags.COMPONENT: 'grpc', + ot_tags.SPAN_KIND: ot_tags.SPAN_KIND_RPC_CLIENT + }, + ) - metadata = _inject_span_context(self._tracer, current_span, - metadata) + metadata = _inject_span_context(current_span, metadata) client_call_details = _ClientCallDetails( client_call_details.method, client_call_details.timeout, @@ -86,19 +77,18 @@ def _intercept_call( def _callback(self, current_span): def callback(future_response): - exception = future_response.exception() - if exception is not None: - exception = str(exception) - current_span.set_tag('error', True) - error_log = {'event': 'error', 'error.kind': exception} - current_span.log_kv(error_log) - - if self._log_payloads: - response = future_response.result() - current_span.log_kv({'response': response}) - - scope.set_active_span(current_span) - scope.end_span() + try: + with current_span: + # ``result()`` will raise a stored exception if one exists, + # and the span context manager will capture it and log it + # for us. + response = future_response.result() + if self._log_payloads: + current_span.log_kv({'response': response}) + except: + # Ignore the exception. Exceptions in future callbacks don't + # propagate anyway and this will only generate log noise. + pass return callback @@ -133,7 +123,7 @@ def intercept_unary_stream( response_it = grpc_utils.log_or_wrap_response_or_iterator( current_span, True, response_it ) - response_it = grpc_utils.wrap_iter_with_end_span(response_it) + response_it = grpc_utils.wrap_iter_with_end_span(response_it, current_span) return response_it @@ -168,6 +158,6 @@ def intercept_stream_stream( response_it = grpc_utils.log_or_wrap_response_or_iterator( current_span, True, response_it ) - response_it = grpc_utils.wrap_iter_with_end_span(response_it) + response_it = grpc_utils.wrap_iter_with_end_span(response_it, current_span) return response_it diff --git a/grpc_opentracing/grpc_interceptor/open_tracing_server_interceptor.py b/grpc_opentracing/grpc_interceptor/open_tracing_server_interceptor.py index da15a52..cf34462 100644 --- a/grpc_opentracing/grpc_interceptor/open_tracing_server_interceptor.py +++ b/grpc_opentracing/grpc_interceptor/open_tracing_server_interceptor.py @@ -7,7 +7,6 @@ import opentracing from opentracing.ext import tags as ot_tags -from grpc_opentracing import scope from grpc_opentracing.grpc_interceptor import utils as grpc_utils _CANCELLED = 'cancelled' @@ -15,8 +14,7 @@ class OpenTracingServerInterceptor(grpc.ServerInterceptor): - def __init__(self, tracer, log_payloads): - self._tracer = tracer + def __init__(self, log_payloads): self._log_payloads = log_payloads def _start_span(self, servicer_context, method): @@ -25,7 +23,7 @@ def _start_span(self, servicer_context, method): metadata = servicer_context.invocation_metadata() try: if metadata: - span_context = self._tracer.extract( + span_context = opentracing.tracer.extract( opentracing.Format.HTTP_HEADERS, dict(metadata)) except (opentracing.UnsupportedFormatException, opentracing.InvalidCarrierException, @@ -37,18 +35,20 @@ def _start_span(self, servicer_context, method): ot_tags.SPAN_KIND: ot_tags.SPAN_KIND_RPC_SERVER } _add_peer_tags(servicer_context.peer(), tags) - span = self._tracer.start_span( + span = opentracing.tracer.start_span( operation_name=method, child_of=span_context, tags=tags) if error is not None: span.log_kv({'event': 'error', 'error.object': error}) - scope.set_active_span(span) return span def intercept_service(self, continuation, handler_call_details): def trace_wrapper(behavior, request_streaming, response_streaming): def new_behavior(request_or_iterator, servicer_context): - span = self._start_span(servicer_context, handler_call_details.method) - try: + span = self._start_span(servicer_context, + handler_call_details.method) + scope = opentracing.tracer.scope_manager.activate( + span, finish_on_close=not response_streaming) + with scope: if self._log_payloads: request_or_iterator = grpc_utils.log_or_wrap_request_or_iterator( span, request_streaming, request_or_iterator) @@ -61,19 +61,10 @@ def new_behavior(request_or_iterator, servicer_context): span, response_streaming, response_or_iterator ) if response_streaming: - response_or_iterator = grpc_utils.wrap_iter_with_end_span(response_or_iterator) + response_or_iterator = grpc_utils.wrap_iter_with_end_span( + response_or_iterator, span) _check_error_code(span, servicer_context) - except Exception as exc: - logging.exception(exc) - e = sys.exc_info()[0] - span.set_tag('error', True) - span.log_kv({'event': 'error', 'error.object': e}) - raise - finally: - # if the response is unary, end the span here. Otherwise - # it will be closed when the response iter completes - if not response_streaming: - scope.end_span() + return response_or_iterator return new_behavior diff --git a/grpc_opentracing/grpc_interceptor/utils.py b/grpc_opentracing/grpc_interceptor/utils.py index 7585521..0222c14 100644 --- a/grpc_opentracing/grpc_interceptor/utils.py +++ b/grpc_opentracing/grpc_interceptor/utils.py @@ -1,6 +1,3 @@ -from grpc_opentracing import scope - - class _LoggingIterator(object): def __init__(self, key, iterator, span): @@ -38,7 +35,9 @@ def log_or_wrap_response_or_iterator(span, is_service_stream, return response_or_iterator -def wrap_iter_with_end_span(response_iter): - for response in response_iter: - yield response - scope.end_span() +def wrap_iter_with_end_span(response_iter, span): + try: + for response in response_iter: + yield response + finally: + span.finish() diff --git a/grpc_opentracing/scope.py b/grpc_opentracing/scope.py deleted file mode 100644 index 1120a31..0000000 --- a/grpc_opentracing/scope.py +++ /dev/null @@ -1,20 +0,0 @@ -import logging -import threading - -_thread_local = threading.local() - - -def get_active_span(): - return getattr(_thread_local, 'active_span', None) - - -def set_active_span(active_span): - setattr(_thread_local, 'active_span', active_span) - - -def end_span(): - span = get_active_span() - if span is None: - logging.warning('No active span, cannot do end_span.') - return - span.finish() From 0097794e09e57a40629d35d62fab01f4d46f9f39 Mon Sep 17 00:00:00 2001 From: Wyatt Anderson Date: Wed, 1 Jul 2020 10:14:14 -0400 Subject: [PATCH 2/4] cleanup --- README.md | 12 -- examples/hello_world/__init__.py | 0 examples/hello_world/hello_world.proto | 23 ---- examples/hello_world/hello_world_client.py | 50 ------- examples/hello_world/hello_world_pb2.py | 134 ------------------- examples/hello_world/hello_world_pb2_grpc.py | 46 ------- examples/hello_world/hello_world_server.py | 65 --------- examples/protos/command_line.proto | 15 --- examples/protos/store.proto | 37 ----- examples/requirements.txt | 3 - 10 files changed, 385 deletions(-) delete mode 100644 examples/hello_world/__init__.py delete mode 100644 examples/hello_world/hello_world.proto delete mode 100644 examples/hello_world/hello_world_client.py delete mode 100644 examples/hello_world/hello_world_pb2.py delete mode 100644 examples/hello_world/hello_world_pb2_grpc.py delete mode 100644 examples/hello_world/hello_world_server.py delete mode 100644 examples/protos/command_line.proto delete mode 100644 examples/protos/store.proto delete mode 100644 examples/requirements.txt diff --git a/README.md b/README.md index 67ff736..bb751ae 100644 --- a/README.md +++ b/README.md @@ -42,15 +42,3 @@ server = grpc.server( interceptors=(tracer_interceptor,)) # All future RPC activity involving `server` will be automatically traced. ``` - -### Integrating with other spans. - -```python -from grpc_opentracing import scope - -span = scope.get_active_span() -span = tracer.start_span("do some thing", child_of=span) -# do some thing -span.finish() -... -``` \ No newline at end of file diff --git a/examples/hello_world/__init__.py b/examples/hello_world/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/examples/hello_world/hello_world.proto b/examples/hello_world/hello_world.proto deleted file mode 100644 index 73f2f8c..0000000 --- a/examples/hello_world/hello_world.proto +++ /dev/null @@ -1,23 +0,0 @@ -syntax = "proto3"; - -option java_multiple_files = true; -option java_package = "io.grpc.examples.helloworld"; -option java_outer_classname = "HelloWorldProto"; - -package helloworld; - -// The hello world service definition. -service Greeter { - // Say hello to the request sender (unary-unary) - rpc SayHello (HelloRequest) returns (HelloReply) {} -} - -// The request message containing the sender's name. -message HelloRequest { - string name = 1; -} - -// The response message containing the greetings -message HelloReply { - string message = 1; -} diff --git a/examples/hello_world/hello_world_client.py b/examples/hello_world/hello_world_client.py deleted file mode 100644 index 9fa9721..0000000 --- a/examples/hello_world/hello_world_client.py +++ /dev/null @@ -1,50 +0,0 @@ -from __future__ import print_function - -import argparse -import time - -import grpc -from jaeger_client import Config - -import hello_world_pb2 -import hello_world_pb2_grpc -from grpc_opentracing import open_tracing_client_interceptor -from grpc_opentracing import scope - -HOST_PORT = 'localhost:50051' - - -def main(): - parser = argparse.ArgumentParser() - parser.add_argument( - '--log_payloads', - action='store_true', - default=True, - help='log request/response objects to open-tracing spans') - args = parser.parse_args() - - config = Config( - config={ - 'sampler': { - 'type': 'const', - 'param': 1, - }, - 'logging': True, - }, - service_name='hello_world_client') - tracer = config.initialize_tracer() - tracer_interceptor = open_tracing_client_interceptor.OpenTracingClientInterceptor( - tracer, - log_payloads=args.log_payloads) - with tracer.start_span("step1") as span: - scope.set_active_span(span) - time.sleep(0.01) - channel = grpc.insecure_channel(HOST_PORT) - channel = grpc.intercept_channel(channel, tracer_interceptor) - stub = hello_world_pb2_grpc.GreeterStub(channel) - response = stub.SayHello(hello_world_pb2.HelloRequest(name='you')) - print("Message received: " + response.message) - - -if __name__ == '__main__': - main() diff --git a/examples/hello_world/hello_world_pb2.py b/examples/hello_world/hello_world_pb2.py deleted file mode 100644 index 25fcee7..0000000 --- a/examples/hello_world/hello_world_pb2.py +++ /dev/null @@ -1,134 +0,0 @@ -# Generated by the protocol buffer compiler. DO NOT EDIT! -# source: hello_world.proto - -import sys -_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1')) -from google.protobuf import descriptor as _descriptor -from google.protobuf import message as _message -from google.protobuf import reflection as _reflection -from google.protobuf import symbol_database as _symbol_database -from google.protobuf import descriptor_pb2 -# @@protoc_insertion_point(imports) - -_sym_db = _symbol_database.Default() - - - - -DESCRIPTOR = _descriptor.FileDescriptor( - name='hello_world.proto', - package='helloworld', - syntax='proto3', - serialized_pb=_b('\n\x11hello_world.proto\x12\nhelloworld\"\x1c\n\x0cHelloRequest\x12\x0c\n\x04name\x18\x01 \x01(\t\"\x1d\n\nHelloReply\x12\x0f\n\x07message\x18\x01 \x01(\t2I\n\x07Greeter\x12>\n\x08SayHello\x12\x18.helloworld.HelloRequest\x1a\x16.helloworld.HelloReply\"\x00\x42\x30\n\x1bio.grpc.examples.helloworldB\x0fHelloWorldProtoP\x01\x62\x06proto3') -) - - - - -_HELLOREQUEST = _descriptor.Descriptor( - name='HelloRequest', - full_name='helloworld.HelloRequest', - filename=None, - file=DESCRIPTOR, - containing_type=None, - fields=[ - _descriptor.FieldDescriptor( - name='name', full_name='helloworld.HelloRequest.name', index=0, - number=1, type=9, cpp_type=9, label=1, - has_default_value=False, default_value=_b("").decode('utf-8'), - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None, file=DESCRIPTOR), - ], - extensions=[ - ], - nested_types=[], - enum_types=[ - ], - options=None, - is_extendable=False, - syntax='proto3', - extension_ranges=[], - oneofs=[ - ], - serialized_start=33, - serialized_end=61, -) - - -_HELLOREPLY = _descriptor.Descriptor( - name='HelloReply', - full_name='helloworld.HelloReply', - filename=None, - file=DESCRIPTOR, - containing_type=None, - fields=[ - _descriptor.FieldDescriptor( - name='message', full_name='helloworld.HelloReply.message', index=0, - number=1, type=9, cpp_type=9, label=1, - has_default_value=False, default_value=_b("").decode('utf-8'), - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None, file=DESCRIPTOR), - ], - extensions=[ - ], - nested_types=[], - enum_types=[ - ], - options=None, - is_extendable=False, - syntax='proto3', - extension_ranges=[], - oneofs=[ - ], - serialized_start=63, - serialized_end=92, -) - -DESCRIPTOR.message_types_by_name['HelloRequest'] = _HELLOREQUEST -DESCRIPTOR.message_types_by_name['HelloReply'] = _HELLOREPLY -_sym_db.RegisterFileDescriptor(DESCRIPTOR) - -HelloRequest = _reflection.GeneratedProtocolMessageType('HelloRequest', (_message.Message,), dict( - DESCRIPTOR = _HELLOREQUEST, - __module__ = 'hello_world_pb2' - # @@protoc_insertion_point(class_scope:helloworld.HelloRequest) - )) -_sym_db.RegisterMessage(HelloRequest) - -HelloReply = _reflection.GeneratedProtocolMessageType('HelloReply', (_message.Message,), dict( - DESCRIPTOR = _HELLOREPLY, - __module__ = 'hello_world_pb2' - # @@protoc_insertion_point(class_scope:helloworld.HelloReply) - )) -_sym_db.RegisterMessage(HelloReply) - - -DESCRIPTOR.has_options = True -DESCRIPTOR._options = _descriptor._ParseOptions(descriptor_pb2.FileOptions(), _b('\n\033io.grpc.examples.helloworldB\017HelloWorldProtoP\001')) - -_GREETER = _descriptor.ServiceDescriptor( - name='Greeter', - full_name='helloworld.Greeter', - file=DESCRIPTOR, - index=0, - options=None, - serialized_start=94, - serialized_end=167, - methods=[ - _descriptor.MethodDescriptor( - name='SayHello', - full_name='helloworld.Greeter.SayHello', - index=0, - containing_service=None, - input_type=_HELLOREQUEST, - output_type=_HELLOREPLY, - options=None, - ), -]) -_sym_db.RegisterServiceDescriptor(_GREETER) - -DESCRIPTOR.services_by_name['Greeter'] = _GREETER - -# @@protoc_insertion_point(module_scope) diff --git a/examples/hello_world/hello_world_pb2_grpc.py b/examples/hello_world/hello_world_pb2_grpc.py deleted file mode 100644 index 8202ecd..0000000 --- a/examples/hello_world/hello_world_pb2_grpc.py +++ /dev/null @@ -1,46 +0,0 @@ -# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! -import grpc - -import hello_world_pb2 as hello__world__pb2 - - -class GreeterStub(object): - """The hello world service definition. - """ - - def __init__(self, channel): - """Constructor. - - Args: - channel: A grpc.Channel. - """ - self.SayHello = channel.unary_unary( - '/helloworld.Greeter/SayHello', - request_serializer=hello__world__pb2.HelloRequest.SerializeToString, - response_deserializer=hello__world__pb2.HelloReply.FromString, - ) - - -class GreeterServicer(object): - """The hello world service definition. - """ - - def SayHello(self, request, context): - """Say hello to the request sender (unary-unary) - """ - context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details('Method not implemented!') - raise NotImplementedError('Method not implemented!') - - -def add_GreeterServicer_to_server(servicer, server): - rpc_method_handlers = { - 'SayHello': grpc.unary_unary_rpc_method_handler( - servicer.SayHello, - request_deserializer=hello__world__pb2.HelloRequest.FromString, - response_serializer=hello__world__pb2.HelloReply.SerializeToString, - ), - } - generic_handler = grpc.method_handlers_generic_handler( - 'helloworld.Greeter', rpc_method_handlers) - server.add_generic_rpc_handlers((generic_handler,)) diff --git a/examples/hello_world/hello_world_server.py b/examples/hello_world/hello_world_server.py deleted file mode 100644 index fb0865c..0000000 --- a/examples/hello_world/hello_world_server.py +++ /dev/null @@ -1,65 +0,0 @@ -from __future__ import print_function - -import argparse -import time - -import grpc -from concurrent import futures -from jaeger_client import Config - -import hello_world_pb2 -import hello_world_pb2_grpc -from grpc_opentracing import open_tracing_server_interceptor -from grpc_opentracing import scope - -_ONE_DAY_IN_SECONDS = 60 * 60 * 24 - - -class HelloWorld(hello_world_pb2_grpc.GreeterServicer): - def __init__(self, tracer): - self.tracer = tracer - - def SayHello(self, request, context): - span = scope.get_active_span() - span = self.tracer.start_span("do some thing", child_of=span) - time.sleep(0.1) - span.finish() - return hello_world_pb2.HelloReply(message='Hello, %s!' % request.name) - - -def serve(): - parser = argparse.ArgumentParser() - parser.add_argument( - '--log_payloads', - action='store_true', - default=True, - help='log request/response objects to open-tracing spans') - args = parser.parse_args() - - config = Config( - config={ - 'sampler': { - 'type': 'const', - 'param': 1, - }, - 'logging': True, - }, - service_name='hello_world_server') - tracer = config.initialize_tracer() - tracer_interceptor = open_tracing_server_interceptor.OpenTracingServerInterceptor( - tracer, log_payloads=args.log_payloads) - server = grpc.server( - futures.ThreadPoolExecutor(max_workers=10), - interceptors=(tracer_interceptor,)) - hello_world_pb2_grpc.add_GreeterServicer_to_server(HelloWorld(tracer), server) - server.add_insecure_port('[::]:50051') - server.start() - try: - while True: - time.sleep(_ONE_DAY_IN_SECONDS) - except KeyboardInterrupt: - server.stop(0) - - -if __name__ == '__main__': - serve() diff --git a/examples/protos/command_line.proto b/examples/protos/command_line.proto deleted file mode 100644 index b2367a7..0000000 --- a/examples/protos/command_line.proto +++ /dev/null @@ -1,15 +0,0 @@ -syntax = "proto3"; - -package command_line; - -service CommandLine { - rpc Echo(CommandRequest) returns (CommandResponse) {} -} - -message CommandRequest { - string text = 1; -} - -message CommandResponse { - string text = 1; -} diff --git a/examples/protos/store.proto b/examples/protos/store.proto deleted file mode 100644 index 6822d98..0000000 --- a/examples/protos/store.proto +++ /dev/null @@ -1,37 +0,0 @@ -syntax = "proto3"; - -package store; - -service Store { - rpc AddItem(AddItemRequest) returns (Empty) {} - rpc AddItems(stream AddItemRequest) returns (Empty) {} - rpc RemoveItem(RemoveItemRequest) returns (RemoveItemResponse) {} - rpc RemoveItems(stream RemoveItemRequest) returns (RemoveItemResponse) {} - rpc ListInventory(Empty) returns (stream QuantityResponse) {} - rpc QueryQuantity(QueryItemRequest) returns (QuantityResponse) {} - rpc QueryQuantities(stream QueryItemRequest) - returns (stream QuantityResponse) {} -} - -message Empty {} - -message AddItemRequest { - string name = 1; -} - -message RemoveItemRequest { - string name = 1; -} - -message RemoveItemResponse { - bool was_successful = 1; -} - -message QueryItemRequest { - string name = 1; -} - -message QuantityResponse { - string name = 1; - int32 count = 2; -} diff --git a/examples/requirements.txt b/examples/requirements.txt deleted file mode 100644 index f828ef0..0000000 --- a/examples/requirements.txt +++ /dev/null @@ -1,3 +0,0 @@ -grpcio-opentracing>=1.0 -jaeger-client>=3.4.0 -protobuf From 363e9d4ce1205ea746a6e26635f6b265d41e3650 Mon Sep 17 00:00:00 2001 From: Wyatt Anderson Date: Wed, 1 Jul 2020 10:24:38 -0400 Subject: [PATCH 3/4] take tracer as param --- .../open_tracing_client_interceptor.py | 15 ++++++++------- .../open_tracing_server_interceptor.py | 9 +++++---- 2 files changed, 13 insertions(+), 11 deletions(-) diff --git a/grpc_opentracing/grpc_interceptor/open_tracing_client_interceptor.py b/grpc_opentracing/grpc_interceptor/open_tracing_client_interceptor.py index 3c7d92e..017f793 100644 --- a/grpc_opentracing/grpc_interceptor/open_tracing_client_interceptor.py +++ b/grpc_opentracing/grpc_interceptor/open_tracing_client_interceptor.py @@ -21,14 +21,14 @@ class _ClientCallDetails( pass -def _inject_span_context(span, metadata): +def _inject_span_context(tracer, span, metadata): headers = {} try: - opentracing.tracer.inject(span.context, opentracing.Format.HTTP_HEADERS, headers) + tracer.inject(span.context, opentracing.Format.HTTP_HEADERS, headers) except (opentracing.UnsupportedFormatException, opentracing.InvalidCarrierException, opentracing.SpanContextCorruptedException) as e: - logging.exception('opentracing.tracer.inject() failed') + logging.exception('tracer.inject() failed') span.log_kv({'event': 'error', 'error.object': e}) return metadata metadata = () if metadata is None else tuple(metadata) @@ -40,7 +40,8 @@ class OpenTracingClientInterceptor(grpc.UnaryUnaryClientInterceptor, grpc.StreamUnaryClientInterceptor, grpc.StreamStreamClientInterceptor): - def __init__(self, log_payloads): + def __init__(self, tracer, log_payloads): + self._tracer = tracer self._log_payloads = log_payloads def _intercept_call( @@ -50,8 +51,8 @@ def _intercept_call( if client_call_details.metadata is not None: metadata = client_call_details.metadata - current_span = opentracing.tracer.start_span( - child_of=opentracing.tracer.active_span, + current_span = self._tracer.start_span( + child_of=self._tracer.active_span, operation_name=client_call_details.method, tags={ ot_tags.COMPONENT: 'grpc', @@ -59,7 +60,7 @@ def _intercept_call( }, ) - metadata = _inject_span_context(current_span, metadata) + metadata = _inject_span_context(self._tracer, current_span, metadata) client_call_details = _ClientCallDetails( client_call_details.method, client_call_details.timeout, diff --git a/grpc_opentracing/grpc_interceptor/open_tracing_server_interceptor.py b/grpc_opentracing/grpc_interceptor/open_tracing_server_interceptor.py index cf34462..4ad1864 100644 --- a/grpc_opentracing/grpc_interceptor/open_tracing_server_interceptor.py +++ b/grpc_opentracing/grpc_interceptor/open_tracing_server_interceptor.py @@ -14,7 +14,8 @@ class OpenTracingServerInterceptor(grpc.ServerInterceptor): - def __init__(self, log_payloads): + def __init__(self, tracer, log_payloads): + self._tracer = tracer self._log_payloads = log_payloads def _start_span(self, servicer_context, method): @@ -23,7 +24,7 @@ def _start_span(self, servicer_context, method): metadata = servicer_context.invocation_metadata() try: if metadata: - span_context = opentracing.tracer.extract( + span_context = self._tracer.extract( opentracing.Format.HTTP_HEADERS, dict(metadata)) except (opentracing.UnsupportedFormatException, opentracing.InvalidCarrierException, @@ -35,7 +36,7 @@ def _start_span(self, servicer_context, method): ot_tags.SPAN_KIND: ot_tags.SPAN_KIND_RPC_SERVER } _add_peer_tags(servicer_context.peer(), tags) - span = opentracing.tracer.start_span( + span = self._tracer.start_span( operation_name=method, child_of=span_context, tags=tags) if error is not None: span.log_kv({'event': 'error', 'error.object': error}) @@ -46,7 +47,7 @@ def trace_wrapper(behavior, request_streaming, response_streaming): def new_behavior(request_or_iterator, servicer_context): span = self._start_span(servicer_context, handler_call_details.method) - scope = opentracing.tracer.scope_manager.activate( + scope = self._tracer.scope_manager.activate( span, finish_on_close=not response_streaming) with scope: if self._log_payloads: From 029286b175633d8bd83aed8c89a9dba11dcefb3c Mon Sep 17 00:00:00 2001 From: Wyatt Anderson Date: Fri, 10 Jul 2020 14:49:44 -0400 Subject: [PATCH 4/4] wip --- .../open_tracing_server_interceptor.py | 35 +++++++++++++------ grpc_opentracing/grpc_interceptor/utils.py | 6 ++++ 2 files changed, 30 insertions(+), 11 deletions(-) diff --git a/grpc_opentracing/grpc_interceptor/open_tracing_server_interceptor.py b/grpc_opentracing/grpc_interceptor/open_tracing_server_interceptor.py index 4ad1864..164e066 100644 --- a/grpc_opentracing/grpc_interceptor/open_tracing_server_interceptor.py +++ b/grpc_opentracing/grpc_interceptor/open_tracing_server_interceptor.py @@ -47,24 +47,37 @@ def trace_wrapper(behavior, request_streaming, response_streaming): def new_behavior(request_or_iterator, servicer_context): span = self._start_span(servicer_context, handler_call_details.method) + + if self._log_payloads: + request_or_iterator = grpc_utils.log_or_wrap_request_or_iterator( + span, request_streaming, request_or_iterator) + + # Set up a tracing scope. We don't want to finish the span on + # scope close if the response is streaming, because the + # invocation of ``behavior`` is deferred. scope = self._tracer.scope_manager.activate( span, finish_on_close=not response_streaming) + with scope: - if self._log_payloads: - request_or_iterator = grpc_utils.log_or_wrap_request_or_iterator( - span, request_streaming, request_or_iterator) # invoke the original rpc behavior response_or_iterator = behavior(request_or_iterator, servicer_context) - if self._log_payloads: - response_or_iterator = grpc_utils.log_or_wrap_response_or_iterator( - span, response_streaming, response_or_iterator - ) - if response_streaming: - response_or_iterator = grpc_utils.wrap_iter_with_end_span( - response_or_iterator, span) - _check_error_code(span, servicer_context) + if self._log_payloads: + response_or_iterator = grpc_utils.log_or_wrap_response_or_iterator( + span, response_streaming, response_or_iterator + ) + + if response_streaming: + # Wrap the response iterator in a scope; this ensures that + # child spans produced in the generator are properly + # parented to the RPC span + scope = self._tracer.scope_manager.activate( + span, finish_on_close=True) + response_or_iterator = grpc_utils.wrap_iter_with_scope( + response_or_iterator, scope) + + _check_error_code(span, servicer_context) return response_or_iterator diff --git a/grpc_opentracing/grpc_interceptor/utils.py b/grpc_opentracing/grpc_interceptor/utils.py index 0222c14..7febf77 100644 --- a/grpc_opentracing/grpc_interceptor/utils.py +++ b/grpc_opentracing/grpc_interceptor/utils.py @@ -41,3 +41,9 @@ def wrap_iter_with_end_span(response_iter, span): yield response finally: span.finish() + + +def wrap_iter_with_scope(response_iter, scope): + with scope: + for response in response_iter: + yield response