diff --git a/README.md b/README.md index b459a67..67ff736 100644 --- a/README.md +++ b/README.md @@ -11,19 +11,19 @@ pip install grpcio-opentracing ## Getting started -See the below code for basic usage or [examples/trivial](examples/trivial) for a +See the below code for basic usage or [examples/hello_world](examples/hello_world) for a complete example. ### Client-side usage example ```python +import grpc from grpc_opentracing import open_tracing_client_interceptor -from grpc_opentracing.grpcext import intercept_channel tracer = # some OpenTracing Tracer instance -interceptor = open_tracing_client_interceptor(tracer) +tracer_interceptor = open_tracing_client_interceptor.OpenTracingClientInterceptor(tracer) channel = # the grpc.Channel you created to invoke RPCs -channel = intercept_channel(channel, interceptor) +channel = grpc.intercept_channel(channel, tracer_interceptor) # All future RPC activity involving `channel` will be automatically traced. ``` @@ -31,44 +31,26 @@ channel = intercept_channel(channel, interceptor) ### Server-side usage example ```python +import grpc +from concurrent import futures from grpc_opentracing import open_tracing_server_interceptor -from grpc_opentracing.grpcext import intercept_server tracer = # some OpenTracing Tracer instance -interceptor = open_tracing_server_interceptor(tracer) -server = # the grpc.Server you created to receive RPCs -server = intercept_server(server, interceptor) - +tracer_interceptor = open_tracing_server_interceptor.OpenTracingServerInterceptor(tracer) +server = grpc.server( + futures.ThreadPoolExecutor(max_workers=10), + interceptors=(tracer_interceptor,)) # All future RPC activity involving `server` will be automatically traced. ``` ### Integrating with other spans. -`grpcio-opentracing` provides features that let you connect its span with other -tracing spans. On the client-side, you can write a class that derives from -`ActiveSpanSource` and provide it when creating the interceptor. - -```python -class CustomActiveSpanSource(ActiveSpanSource): - @classmethod - def get_active_span(self): - # your custom method of getting the active span -tracer = # some OpenTracing Tracer instance -interceptor = open_tracing_client_interceptor( - tracer, - active_span_source=CustomActiveSpanSource) -... -``` - -On the server-side, the `context` argument passed into your service methods -packages the gRPC span created on the server-side. - ```python -class CustomRpcService(...): - ... - def Method1(self, request, context): - span = context.get_active_span() - ... -``` +from grpc_opentracing import scope -See [examples/integration](examples/integration) for a complete example. +span = scope.get_active_span() +span = tracer.start_span("do some thing", child_of=span) +# do some thing +span.finish() +... +``` \ No newline at end of file diff --git a/examples/hello_world/__init__.py b/examples/hello_world/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/examples/hello_world/hello_world.proto b/examples/hello_world/hello_world.proto new file mode 100644 index 0000000..73f2f8c --- /dev/null +++ b/examples/hello_world/hello_world.proto @@ -0,0 +1,23 @@ +syntax = "proto3"; + +option java_multiple_files = true; +option java_package = "io.grpc.examples.helloworld"; +option java_outer_classname = "HelloWorldProto"; + +package helloworld; + +// The hello world service definition. +service Greeter { + // Say hello to the request sender (unary-unary) + rpc SayHello (HelloRequest) returns (HelloReply) {} +} + +// The request message containing the sender's name. +message HelloRequest { + string name = 1; +} + +// The response message containing the greetings +message HelloReply { + string message = 1; +} diff --git a/examples/hello_world/hello_world_client.py b/examples/hello_world/hello_world_client.py new file mode 100644 index 0000000..9fa9721 --- /dev/null +++ b/examples/hello_world/hello_world_client.py @@ -0,0 +1,50 @@ +from __future__ import print_function + +import argparse +import time + +import grpc +from jaeger_client import Config + +import hello_world_pb2 +import hello_world_pb2_grpc +from grpc_opentracing import open_tracing_client_interceptor +from grpc_opentracing import scope + +HOST_PORT = 'localhost:50051' + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument( + '--log_payloads', + action='store_true', + default=True, + help='log request/response objects to open-tracing spans') + args = parser.parse_args() + + config = Config( + config={ + 'sampler': { + 'type': 'const', + 'param': 1, + }, + 'logging': True, + }, + service_name='hello_world_client') + tracer = config.initialize_tracer() + tracer_interceptor = open_tracing_client_interceptor.OpenTracingClientInterceptor( + tracer, + log_payloads=args.log_payloads) + with tracer.start_span("step1") as span: + scope.set_active_span(span) + time.sleep(0.01) + channel = grpc.insecure_channel(HOST_PORT) + channel = grpc.intercept_channel(channel, tracer_interceptor) + stub = hello_world_pb2_grpc.GreeterStub(channel) + response = stub.SayHello(hello_world_pb2.HelloRequest(name='you')) + print("Message received: " + response.message) + + +if __name__ == '__main__': + main() diff --git a/examples/hello_world/hello_world_pb2.py b/examples/hello_world/hello_world_pb2.py new file mode 100644 index 0000000..25fcee7 --- /dev/null +++ b/examples/hello_world/hello_world_pb2.py @@ -0,0 +1,134 @@ +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: hello_world.proto + +import sys +_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1')) +from google.protobuf import descriptor as _descriptor +from google.protobuf import message as _message +from google.protobuf import reflection as _reflection +from google.protobuf import symbol_database as _symbol_database +from google.protobuf import descriptor_pb2 +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + + + +DESCRIPTOR = _descriptor.FileDescriptor( + name='hello_world.proto', + package='helloworld', + syntax='proto3', + serialized_pb=_b('\n\x11hello_world.proto\x12\nhelloworld\"\x1c\n\x0cHelloRequest\x12\x0c\n\x04name\x18\x01 \x01(\t\"\x1d\n\nHelloReply\x12\x0f\n\x07message\x18\x01 \x01(\t2I\n\x07Greeter\x12>\n\x08SayHello\x12\x18.helloworld.HelloRequest\x1a\x16.helloworld.HelloReply\"\x00\x42\x30\n\x1bio.grpc.examples.helloworldB\x0fHelloWorldProtoP\x01\x62\x06proto3') +) + + + + +_HELLOREQUEST = _descriptor.Descriptor( + name='HelloRequest', + full_name='helloworld.HelloRequest', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='name', full_name='helloworld.HelloRequest.name', index=0, + number=1, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=33, + serialized_end=61, +) + + +_HELLOREPLY = _descriptor.Descriptor( + name='HelloReply', + full_name='helloworld.HelloReply', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='message', full_name='helloworld.HelloReply.message', index=0, + number=1, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=63, + serialized_end=92, +) + +DESCRIPTOR.message_types_by_name['HelloRequest'] = _HELLOREQUEST +DESCRIPTOR.message_types_by_name['HelloReply'] = _HELLOREPLY +_sym_db.RegisterFileDescriptor(DESCRIPTOR) + +HelloRequest = _reflection.GeneratedProtocolMessageType('HelloRequest', (_message.Message,), dict( + DESCRIPTOR = _HELLOREQUEST, + __module__ = 'hello_world_pb2' + # @@protoc_insertion_point(class_scope:helloworld.HelloRequest) + )) +_sym_db.RegisterMessage(HelloRequest) + +HelloReply = _reflection.GeneratedProtocolMessageType('HelloReply', (_message.Message,), dict( + DESCRIPTOR = _HELLOREPLY, + __module__ = 'hello_world_pb2' + # @@protoc_insertion_point(class_scope:helloworld.HelloReply) + )) +_sym_db.RegisterMessage(HelloReply) + + +DESCRIPTOR.has_options = True +DESCRIPTOR._options = _descriptor._ParseOptions(descriptor_pb2.FileOptions(), _b('\n\033io.grpc.examples.helloworldB\017HelloWorldProtoP\001')) + +_GREETER = _descriptor.ServiceDescriptor( + name='Greeter', + full_name='helloworld.Greeter', + file=DESCRIPTOR, + index=0, + options=None, + serialized_start=94, + serialized_end=167, + methods=[ + _descriptor.MethodDescriptor( + name='SayHello', + full_name='helloworld.Greeter.SayHello', + index=0, + containing_service=None, + input_type=_HELLOREQUEST, + output_type=_HELLOREPLY, + options=None, + ), +]) +_sym_db.RegisterServiceDescriptor(_GREETER) + +DESCRIPTOR.services_by_name['Greeter'] = _GREETER + +# @@protoc_insertion_point(module_scope) diff --git a/examples/hello_world/hello_world_pb2_grpc.py b/examples/hello_world/hello_world_pb2_grpc.py new file mode 100644 index 0000000..8202ecd --- /dev/null +++ b/examples/hello_world/hello_world_pb2_grpc.py @@ -0,0 +1,46 @@ +# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! +import grpc + +import hello_world_pb2 as hello__world__pb2 + + +class GreeterStub(object): + """The hello world service definition. + """ + + def __init__(self, channel): + """Constructor. + + Args: + channel: A grpc.Channel. + """ + self.SayHello = channel.unary_unary( + '/helloworld.Greeter/SayHello', + request_serializer=hello__world__pb2.HelloRequest.SerializeToString, + response_deserializer=hello__world__pb2.HelloReply.FromString, + ) + + +class GreeterServicer(object): + """The hello world service definition. + """ + + def SayHello(self, request, context): + """Say hello to the request sender (unary-unary) + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + +def add_GreeterServicer_to_server(servicer, server): + rpc_method_handlers = { + 'SayHello': grpc.unary_unary_rpc_method_handler( + servicer.SayHello, + request_deserializer=hello__world__pb2.HelloRequest.FromString, + response_serializer=hello__world__pb2.HelloReply.SerializeToString, + ), + } + generic_handler = grpc.method_handlers_generic_handler( + 'helloworld.Greeter', rpc_method_handlers) + server.add_generic_rpc_handlers((generic_handler,)) diff --git a/examples/integration/integration_server.py b/examples/hello_world/hello_world_server.py similarity index 55% rename from examples/integration/integration_server.py rename to examples/hello_world/hello_world_server.py index c76f544..fb0865c 100644 --- a/examples/integration/integration_server.py +++ b/examples/hello_world/hello_world_server.py @@ -1,30 +1,30 @@ from __future__ import print_function -import time import argparse +import time import grpc from concurrent import futures from jaeger_client import Config +import hello_world_pb2 +import hello_world_pb2_grpc from grpc_opentracing import open_tracing_server_interceptor -from grpc_opentracing.grpcext import intercept_server - -import command_line_pb2 +from grpc_opentracing import scope _ONE_DAY_IN_SECONDS = 60 * 60 * 24 -class CommandLine(command_line_pb2.CommandLineServicer): - +class HelloWorld(hello_world_pb2_grpc.GreeterServicer): def __init__(self, tracer): - self._tracer = tracer + self.tracer = tracer - def Echo(self, request, context): - with self._tracer.start_span( - 'command_line_server_span', - child_of=context.get_active_span().context): - return command_line_pb2.CommandResponse(text=request.text) + def SayHello(self, request, context): + span = scope.get_active_span() + span = self.tracer.start_span("do some thing", child_of=span) + time.sleep(0.1) + span.finish() + return hello_world_pb2.HelloReply(message='Hello, %s!' % request.name) def serve(): @@ -32,6 +32,7 @@ def serve(): parser.add_argument( '--log_payloads', action='store_true', + default=True, help='log request/response objects to open-tracing spans') args = parser.parse_args() @@ -43,15 +44,14 @@ def serve(): }, 'logging': True, }, - service_name='integration-server') + service_name='hello_world_server') tracer = config.initialize_tracer() - tracer_interceptor = open_tracing_server_interceptor( + tracer_interceptor = open_tracing_server_interceptor.OpenTracingServerInterceptor( tracer, log_payloads=args.log_payloads) - server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) - server = intercept_server(server, tracer_interceptor) - - command_line_pb2.add_CommandLineServicer_to_server( - CommandLine(tracer), server) + server = grpc.server( + futures.ThreadPoolExecutor(max_workers=10), + interceptors=(tracer_interceptor,)) + hello_world_pb2_grpc.add_GreeterServicer_to_server(HelloWorld(tracer), server) server.add_insecure_port('[::]:50051') server.start() try: @@ -60,10 +60,6 @@ def serve(): except KeyboardInterrupt: server.stop(0) - time.sleep(2) - tracer.close() - time.sleep(2) - if __name__ == '__main__': serve() diff --git a/examples/integration/README.md b/examples/integration/README.md deleted file mode 100644 index b69df9f..0000000 --- a/examples/integration/README.md +++ /dev/null @@ -1,8 +0,0 @@ -An example showing how to connect gRPC's OpenTracing spans to other OpenTracing -spans. - -## Usage -``` -python integration_server.py & -python integration_client.py -``` diff --git a/examples/integration/command_line_pb2.py b/examples/integration/command_line_pb2.py deleted file mode 100644 index 42c7fc4..0000000 --- a/examples/integration/command_line_pb2.py +++ /dev/null @@ -1,213 +0,0 @@ -# Generated by the protocol buffer compiler. DO NOT EDIT! -# source: command_line.proto - -import sys -_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1')) -from google.protobuf import descriptor as _descriptor -from google.protobuf import message as _message -from google.protobuf import reflection as _reflection -from google.protobuf import symbol_database as _symbol_database -from google.protobuf import descriptor_pb2 -# @@protoc_insertion_point(imports) - -_sym_db = _symbol_database.Default() - - - - -DESCRIPTOR = _descriptor.FileDescriptor( - name='command_line.proto', - package='command_line', - syntax='proto3', - serialized_pb=_b('\n\x12\x63ommand_line.proto\x12\x0c\x63ommand_line\"\x1e\n\x0e\x43ommandRequest\x12\x0c\n\x04text\x18\x01 \x01(\t\"\x1f\n\x0f\x43ommandResponse\x12\x0c\n\x04text\x18\x01 \x01(\t2T\n\x0b\x43ommandLine\x12\x45\n\x04\x45\x63ho\x12\x1c.command_line.CommandRequest\x1a\x1d.command_line.CommandResponse\"\x00\x62\x06proto3') -) -_sym_db.RegisterFileDescriptor(DESCRIPTOR) - - - - -_COMMANDREQUEST = _descriptor.Descriptor( - name='CommandRequest', - full_name='command_line.CommandRequest', - filename=None, - file=DESCRIPTOR, - containing_type=None, - fields=[ - _descriptor.FieldDescriptor( - name='text', full_name='command_line.CommandRequest.text', index=0, - number=1, type=9, cpp_type=9, label=1, - has_default_value=False, default_value=_b("").decode('utf-8'), - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - ], - extensions=[ - ], - nested_types=[], - enum_types=[ - ], - options=None, - is_extendable=False, - syntax='proto3', - extension_ranges=[], - oneofs=[ - ], - serialized_start=36, - serialized_end=66, -) - - -_COMMANDRESPONSE = _descriptor.Descriptor( - name='CommandResponse', - full_name='command_line.CommandResponse', - filename=None, - file=DESCRIPTOR, - containing_type=None, - fields=[ - _descriptor.FieldDescriptor( - name='text', full_name='command_line.CommandResponse.text', index=0, - number=1, type=9, cpp_type=9, label=1, - has_default_value=False, default_value=_b("").decode('utf-8'), - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - ], - extensions=[ - ], - nested_types=[], - enum_types=[ - ], - options=None, - is_extendable=False, - syntax='proto3', - extension_ranges=[], - oneofs=[ - ], - serialized_start=68, - serialized_end=99, -) - -DESCRIPTOR.message_types_by_name['CommandRequest'] = _COMMANDREQUEST -DESCRIPTOR.message_types_by_name['CommandResponse'] = _COMMANDRESPONSE - -CommandRequest = _reflection.GeneratedProtocolMessageType('CommandRequest', (_message.Message,), dict( - DESCRIPTOR = _COMMANDREQUEST, - __module__ = 'command_line_pb2' - # @@protoc_insertion_point(class_scope:command_line.CommandRequest) - )) -_sym_db.RegisterMessage(CommandRequest) - -CommandResponse = _reflection.GeneratedProtocolMessageType('CommandResponse', (_message.Message,), dict( - DESCRIPTOR = _COMMANDRESPONSE, - __module__ = 'command_line_pb2' - # @@protoc_insertion_point(class_scope:command_line.CommandResponse) - )) -_sym_db.RegisterMessage(CommandResponse) - - -try: - # THESE ELEMENTS WILL BE DEPRECATED. - # Please use the generated *_pb2_grpc.py files instead. - import grpc - from grpc.framework.common import cardinality - from grpc.framework.interfaces.face import utilities as face_utilities - from grpc.beta import implementations as beta_implementations - from grpc.beta import interfaces as beta_interfaces - - - class CommandLineStub(object): - - def __init__(self, channel): - """Constructor. - - Args: - channel: A grpc.Channel. - """ - self.Echo = channel.unary_unary( - '/command_line.CommandLine/Echo', - request_serializer=CommandRequest.SerializeToString, - response_deserializer=CommandResponse.FromString, - ) - - - class CommandLineServicer(object): - - def Echo(self, request, context): - context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details('Method not implemented!') - raise NotImplementedError('Method not implemented!') - - - def add_CommandLineServicer_to_server(servicer, server): - rpc_method_handlers = { - 'Echo': grpc.unary_unary_rpc_method_handler( - servicer.Echo, - request_deserializer=CommandRequest.FromString, - response_serializer=CommandResponse.SerializeToString, - ), - } - generic_handler = grpc.method_handlers_generic_handler( - 'command_line.CommandLine', rpc_method_handlers) - server.add_generic_rpc_handlers((generic_handler,)) - - - class BetaCommandLineServicer(object): - """The Beta API is deprecated for 0.15.0 and later. - - It is recommended to use the GA API (classes and functions in this - file not marked beta) for all further purposes. This class was generated - only to ease transition from grpcio<0.15.0 to grpcio>=0.15.0.""" - def Echo(self, request, context): - context.code(beta_interfaces.StatusCode.UNIMPLEMENTED) - - - class BetaCommandLineStub(object): - """The Beta API is deprecated for 0.15.0 and later. - - It is recommended to use the GA API (classes and functions in this - file not marked beta) for all further purposes. This class was generated - only to ease transition from grpcio<0.15.0 to grpcio>=0.15.0.""" - def Echo(self, request, timeout, metadata=None, with_call=False, protocol_options=None): - raise NotImplementedError() - Echo.future = None - - - def beta_create_CommandLine_server(servicer, pool=None, pool_size=None, default_timeout=None, maximum_timeout=None): - """The Beta API is deprecated for 0.15.0 and later. - - It is recommended to use the GA API (classes and functions in this - file not marked beta) for all further purposes. This function was - generated only to ease transition from grpcio<0.15.0 to grpcio>=0.15.0""" - request_deserializers = { - ('command_line.CommandLine', 'Echo'): CommandRequest.FromString, - } - response_serializers = { - ('command_line.CommandLine', 'Echo'): CommandResponse.SerializeToString, - } - method_implementations = { - ('command_line.CommandLine', 'Echo'): face_utilities.unary_unary_inline(servicer.Echo), - } - server_options = beta_implementations.server_options(request_deserializers=request_deserializers, response_serializers=response_serializers, thread_pool=pool, thread_pool_size=pool_size, default_timeout=default_timeout, maximum_timeout=maximum_timeout) - return beta_implementations.server(method_implementations, options=server_options) - - - def beta_create_CommandLine_stub(channel, host=None, metadata_transformer=None, pool=None, pool_size=None): - """The Beta API is deprecated for 0.15.0 and later. - - It is recommended to use the GA API (classes and functions in this - file not marked beta) for all further purposes. This function was - generated only to ease transition from grpcio<0.15.0 to grpcio>=0.15.0""" - request_serializers = { - ('command_line.CommandLine', 'Echo'): CommandRequest.SerializeToString, - } - response_deserializers = { - ('command_line.CommandLine', 'Echo'): CommandResponse.FromString, - } - cardinalities = { - 'Echo': cardinality.Cardinality.UNARY_UNARY, - } - stub_options = beta_implementations.stub_options(host=host, metadata_transformer=metadata_transformer, request_serializers=request_serializers, response_deserializers=response_deserializers, thread_pool=pool, thread_pool_size=pool_size) - return beta_implementations.dynamic_stub(channel, 'command_line.CommandLine', cardinalities, options=stub_options) -except ImportError: - pass -# @@protoc_insertion_point(module_scope) diff --git a/examples/integration/command_line_pb2_grpc.py b/examples/integration/command_line_pb2_grpc.py deleted file mode 100644 index 6b9b82d..0000000 --- a/examples/integration/command_line_pb2_grpc.py +++ /dev/null @@ -1,42 +0,0 @@ -# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! -import grpc -from grpc.framework.common import cardinality -from grpc.framework.interfaces.face import utilities as face_utilities - -import command_line_pb2 as command__line__pb2 - - -class CommandLineStub(object): - - def __init__(self, channel): - """Constructor. - - Args: - channel: A grpc.Channel. - """ - self.Echo = channel.unary_unary( - '/command_line.CommandLine/Echo', - request_serializer=command__line__pb2.CommandRequest.SerializeToString, - response_deserializer=command__line__pb2.CommandResponse.FromString, - ) - - -class CommandLineServicer(object): - - def Echo(self, request, context): - context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details('Method not implemented!') - raise NotImplementedError('Method not implemented!') - - -def add_CommandLineServicer_to_server(servicer, server): - rpc_method_handlers = { - 'Echo': grpc.unary_unary_rpc_method_handler( - servicer.Echo, - request_deserializer=command__line__pb2.CommandRequest.FromString, - response_serializer=command__line__pb2.CommandResponse.SerializeToString, - ), - } - generic_handler = grpc.method_handlers_generic_handler( - 'command_line.CommandLine', rpc_method_handlers) - server.add_generic_rpc_handlers((generic_handler,)) diff --git a/examples/integration/integration_client.py b/examples/integration/integration_client.py deleted file mode 100644 index a133fa2..0000000 --- a/examples/integration/integration_client.py +++ /dev/null @@ -1,67 +0,0 @@ -from __future__ import print_function - -import time -import argparse - -import grpc -from jaeger_client import Config - -from grpc_opentracing import open_tracing_client_interceptor, ActiveSpanSource -from grpc_opentracing.grpcext import intercept_channel - -import command_line_pb2 - - -class FixedActiveSpanSource(ActiveSpanSource): - - def __init__(self): - self.active_span = None - - def get_active_span(self): - return self.active_span - - -def echo(tracer, active_span_source, stub): - with tracer.start_span('command_line_client_span') as span: - active_span_source.active_span = span - response = stub.Echo( - command_line_pb2.CommandRequest(text='Hello, hello')) - print(response.text) - - -def run(): - parser = argparse.ArgumentParser() - parser.add_argument( - '--log_payloads', - action='store_true', - help='log request/response objects to open-tracing spans') - args = parser.parse_args() - - config = Config( - config={ - 'sampler': { - 'type': 'const', - 'param': 1, - }, - 'logging': True, - }, - service_name='integration-client') - tracer = config.initialize_tracer() - active_span_source = FixedActiveSpanSource() - tracer_interceptor = open_tracing_client_interceptor( - tracer, - active_span_source=active_span_source, - log_payloads=args.log_payloads) - channel = grpc.insecure_channel('localhost:50051') - channel = intercept_channel(channel, tracer_interceptor) - stub = command_line_pb2.CommandLineStub(channel) - - echo(tracer, active_span_source, stub) - - time.sleep(2) - tracer.close() - time.sleep(2) - - -if __name__ == '__main__': - run() diff --git a/examples/integration/run_codegen.py b/examples/integration/run_codegen.py deleted file mode 100644 index 5f64324..0000000 --- a/examples/integration/run_codegen.py +++ /dev/null @@ -1,4 +0,0 @@ -from grpc_tools import protoc - -protoc.main(('', '-I../protos', '--python_out=.', '--grpc_python_out=.', - '../protos/command_line.proto')) diff --git a/examples/store/README.md b/examples/store/README.md deleted file mode 100644 index 499204b..0000000 --- a/examples/store/README.md +++ /dev/null @@ -1,8 +0,0 @@ -An example that demonstrates how the OpenTracing extensions work with -asynchronous and streaming RPC calls. - -## Usage -``` -python store_server.py & -python store_client.py -``` diff --git a/examples/store/run_codegen.py b/examples/store/run_codegen.py deleted file mode 100644 index 3c1808b..0000000 --- a/examples/store/run_codegen.py +++ /dev/null @@ -1,4 +0,0 @@ -from grpc_tools import protoc - -protoc.main(('', '-I../protos', '--python_out=.', '--grpc_python_out=.', - '../protos/store.proto')) diff --git a/examples/store/store_client.py b/examples/store/store_client.py deleted file mode 100644 index de8f231..0000000 --- a/examples/store/store_client.py +++ /dev/null @@ -1,211 +0,0 @@ -# A OpenTraced client for a Python service that implements the store interface. -from __future__ import print_function - -import time -import argparse -from builtins import input, range - -import grpc -from jaeger_client import Config - -from grpc_opentracing import open_tracing_client_interceptor, \ - SpanDecorator -from grpc_opentracing.grpcext import intercept_channel - -import store_pb2 - - -class CommandExecuter(object): - - def __init__(self, stub): - self._stub = stub - - def _execute_rpc(self, method, via, timeout, request_or_iterator): - if via == 'future': - result = getattr(self._stub, method).future(request_or_iterator, - timeout) - return result.result() - elif via == 'with_call': - return getattr(self._stub, method).with_call(request_or_iterator, - timeout)[0] - else: - return getattr(self._stub, method)(request_or_iterator, timeout) - - def do_stock_item(self, via, timeout, arguments): - if len(arguments) != 1: - print('must input a single item') - return - request = store_pb2.AddItemRequest(name=arguments[0]) - self._execute_rpc('AddItem', via, timeout, request) - - def do_stock_items(self, via, timeout, arguments): - if not arguments: - print('must input at least one item') - return - requests = [store_pb2.AddItemRequest(name=name) for name in arguments] - self._execute_rpc('AddItems', via, timeout, iter(requests)) - - def do_sell_item(self, via, timeout, arguments): - if len(arguments) != 1: - print('must input a single item') - return - request = store_pb2.RemoveItemRequest(name=arguments[0]) - response = self._execute_rpc('RemoveItem', via, timeout, request) - if not response.was_successful: - print('unable to sell') - - def do_sell_items(self, via, timeout, arguments): - if not arguments: - print('must input at least one item') - return - requests = [ - store_pb2.RemoveItemRequest(name=name) for name in arguments - ] - response = self._execute_rpc('RemoveItems', via, timeout, - iter(requests)) - if not response.was_successful: - print('unable to sell') - - def do_inventory(self, via, timeout, arguments): - if arguments: - print('inventory does not take any arguments') - return - if via != 'functor': - print('inventory can only be called via functor') - return - request = store_pb2.Empty() - result = self._execute_rpc('ListInventory', via, timeout, request) - for query in result: - print(query.name, '\t', query.count) - - def do_query_item(self, via, timeout, arguments): - if len(arguments) != 1: - print('must input a single item') - return - request = store_pb2.QueryItemRequest(name=arguments[0]) - query = self._execute_rpc('QueryQuantity', via, timeout, request) - print(query.name, '\t', query.count) - - def do_query_items(self, via, timeout, arguments): - if not arguments: - print('must input at least one item') - return - if via != 'functor': - print('query_items can only be called via functor') - return - requests = [store_pb2.QueryItemRequest(name=name) for name in arguments] - result = self._execute_rpc('QueryQuantities', via, timeout, - iter(requests)) - for query in result: - print(query.name, '\t', query.count) - - -def execute_command(command_executer, command, arguments): - via = 'functor' - timeout = None - for argument_index in range(0, len(arguments), 2): - argument = arguments[argument_index] - if argument == '--via' and argument_index + 1 < len(arguments): - if via not in ('functor', 'with_call', 'future'): - print('invalid --via option') - return - via = arguments[argument_index + 1] - elif argument == '--timeout' and argument_index + 1 < len(arguments): - timeout = float(arguments[argument_index + 1]) - else: - arguments = arguments[argument_index:] - break - - try: - getattr(command_executer, 'do_' + command)(via, timeout, arguments) - except AttributeError: - print('unknown command: \"%s\"' % command) - - -INSTRUCTIONS = \ -"""Enter commands to interact with the store service: - - stock_item Stock a single item. - stock_items Stock one or more items. - sell_item Sell a single item. - sell_items Sell one or more items. - inventory List the store's inventory. - query_item Query the inventory for a single item. - query_items Query the inventory for one or more items. - -You can also optionally provide a --via argument to instruct the RPC to be -initiated via either the functor, with_call, or future method; or provide a ---timeout argument to set a deadline for the RPC to be completed. - -Example: - > stock_item apple - > stock_items --via future apple milk - > inventory - apple 2 - milk 1 -""" - - -def read_and_execute(command_executer): - print(INSTRUCTIONS) - while True: - try: - line = input('> ') - components = line.split() - if not components: - continue - command = components[0] - arguments = components[1:] - execute_command(command_executer, command, arguments) - except EOFError: - break - - -class StoreSpanDecorator(SpanDecorator): - - def __call__(self, span, rpc_info): - span.set_tag('grpc.method', rpc_info.full_method) - span.set_tag('grpc.headers', str(rpc_info.metadata)) - span.set_tag('grpc.deadline', str(rpc_info.timeout)) - - -def run(): - parser = argparse.ArgumentParser() - parser.add_argument( - '--log_payloads', - action='store_true', - help='log request/response objects to open-tracing spans') - parser.add_argument( - '--include_grpc_tags', - action='store_true', - help='set gRPC-specific tags on spans') - args = parser.parse_args() - - config = Config( - config={ - 'sampler': { - 'type': 'const', - 'param': 1, - }, - 'logging': True, - }, - service_name='store-client') - tracer = config.initialize_tracer() - span_decorator = None - if args.include_grpc_tags: - span_decorator = StoreSpanDecorator() - tracer_interceptor = open_tracing_client_interceptor( - tracer, log_payloads=args.log_payloads, span_decorator=span_decorator) - channel = grpc.insecure_channel('localhost:50051') - channel = intercept_channel(channel, tracer_interceptor) - stub = store_pb2.StoreStub(channel) - - read_and_execute(CommandExecuter(stub)) - - time.sleep(2) - tracer.close() - time.sleep(2) - - -if __name__ == '__main__': - run() diff --git a/examples/store/store_pb2.py b/examples/store/store_pb2.py deleted file mode 100644 index 962c0dc..0000000 --- a/examples/store/store_pb2.py +++ /dev/null @@ -1,523 +0,0 @@ -# Generated by the protocol buffer compiler. DO NOT EDIT! -# source: store.proto - -import sys -_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1')) -from google.protobuf import descriptor as _descriptor -from google.protobuf import message as _message -from google.protobuf import reflection as _reflection -from google.protobuf import symbol_database as _symbol_database -from google.protobuf import descriptor_pb2 -# @@protoc_insertion_point(imports) - -_sym_db = _symbol_database.Default() - - - - -DESCRIPTOR = _descriptor.FileDescriptor( - name='store.proto', - package='store', - syntax='proto3', - serialized_pb=_b('\n\x0bstore.proto\x12\x05store\"\x07\n\x05\x45mpty\"\x1e\n\x0e\x41\x64\x64ItemRequest\x12\x0c\n\x04name\x18\x01 \x01(\t\"!\n\x11RemoveItemRequest\x12\x0c\n\x04name\x18\x01 \x01(\t\",\n\x12RemoveItemResponse\x12\x16\n\x0ewas_successful\x18\x01 \x01(\x08\" \n\x10QueryItemRequest\x12\x0c\n\x04name\x18\x01 \x01(\t\"/\n\x10QuantityResponse\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\r\n\x05\x63ount\x18\x02 \x01(\x05\x32\xc7\x03\n\x05Store\x12\x30\n\x07\x41\x64\x64Item\x12\x15.store.AddItemRequest\x1a\x0c.store.Empty\"\x00\x12\x33\n\x08\x41\x64\x64Items\x12\x15.store.AddItemRequest\x1a\x0c.store.Empty\"\x00(\x01\x12\x43\n\nRemoveItem\x12\x18.store.RemoveItemRequest\x1a\x19.store.RemoveItemResponse\"\x00\x12\x46\n\x0bRemoveItems\x12\x18.store.RemoveItemRequest\x1a\x19.store.RemoveItemResponse\"\x00(\x01\x12:\n\rListInventory\x12\x0c.store.Empty\x1a\x17.store.QuantityResponse\"\x00\x30\x01\x12\x43\n\rQueryQuantity\x12\x17.store.QueryItemRequest\x1a\x17.store.QuantityResponse\"\x00\x12I\n\x0fQueryQuantities\x12\x17.store.QueryItemRequest\x1a\x17.store.QuantityResponse\"\x00(\x01\x30\x01\x62\x06proto3') -) -_sym_db.RegisterFileDescriptor(DESCRIPTOR) - - - - -_EMPTY = _descriptor.Descriptor( - name='Empty', - full_name='store.Empty', - filename=None, - file=DESCRIPTOR, - containing_type=None, - fields=[ - ], - extensions=[ - ], - nested_types=[], - enum_types=[ - ], - options=None, - is_extendable=False, - syntax='proto3', - extension_ranges=[], - oneofs=[ - ], - serialized_start=22, - serialized_end=29, -) - - -_ADDITEMREQUEST = _descriptor.Descriptor( - name='AddItemRequest', - full_name='store.AddItemRequest', - filename=None, - file=DESCRIPTOR, - containing_type=None, - fields=[ - _descriptor.FieldDescriptor( - name='name', full_name='store.AddItemRequest.name', index=0, - number=1, type=9, cpp_type=9, label=1, - has_default_value=False, default_value=_b("").decode('utf-8'), - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - ], - extensions=[ - ], - nested_types=[], - enum_types=[ - ], - options=None, - is_extendable=False, - syntax='proto3', - extension_ranges=[], - oneofs=[ - ], - serialized_start=31, - serialized_end=61, -) - - -_REMOVEITEMREQUEST = _descriptor.Descriptor( - name='RemoveItemRequest', - full_name='store.RemoveItemRequest', - filename=None, - file=DESCRIPTOR, - containing_type=None, - fields=[ - _descriptor.FieldDescriptor( - name='name', full_name='store.RemoveItemRequest.name', index=0, - number=1, type=9, cpp_type=9, label=1, - has_default_value=False, default_value=_b("").decode('utf-8'), - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - ], - extensions=[ - ], - nested_types=[], - enum_types=[ - ], - options=None, - is_extendable=False, - syntax='proto3', - extension_ranges=[], - oneofs=[ - ], - serialized_start=63, - serialized_end=96, -) - - -_REMOVEITEMRESPONSE = _descriptor.Descriptor( - name='RemoveItemResponse', - full_name='store.RemoveItemResponse', - filename=None, - file=DESCRIPTOR, - containing_type=None, - fields=[ - _descriptor.FieldDescriptor( - name='was_successful', full_name='store.RemoveItemResponse.was_successful', index=0, - number=1, type=8, cpp_type=7, label=1, - has_default_value=False, default_value=False, - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - ], - extensions=[ - ], - nested_types=[], - enum_types=[ - ], - options=None, - is_extendable=False, - syntax='proto3', - extension_ranges=[], - oneofs=[ - ], - serialized_start=98, - serialized_end=142, -) - - -_QUERYITEMREQUEST = _descriptor.Descriptor( - name='QueryItemRequest', - full_name='store.QueryItemRequest', - filename=None, - file=DESCRIPTOR, - containing_type=None, - fields=[ - _descriptor.FieldDescriptor( - name='name', full_name='store.QueryItemRequest.name', index=0, - number=1, type=9, cpp_type=9, label=1, - has_default_value=False, default_value=_b("").decode('utf-8'), - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - ], - extensions=[ - ], - nested_types=[], - enum_types=[ - ], - options=None, - is_extendable=False, - syntax='proto3', - extension_ranges=[], - oneofs=[ - ], - serialized_start=144, - serialized_end=176, -) - - -_QUANTITYRESPONSE = _descriptor.Descriptor( - name='QuantityResponse', - full_name='store.QuantityResponse', - filename=None, - file=DESCRIPTOR, - containing_type=None, - fields=[ - _descriptor.FieldDescriptor( - name='name', full_name='store.QuantityResponse.name', index=0, - number=1, type=9, cpp_type=9, label=1, - has_default_value=False, default_value=_b("").decode('utf-8'), - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - _descriptor.FieldDescriptor( - name='count', full_name='store.QuantityResponse.count', index=1, - number=2, type=5, cpp_type=1, label=1, - has_default_value=False, default_value=0, - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - ], - extensions=[ - ], - nested_types=[], - enum_types=[ - ], - options=None, - is_extendable=False, - syntax='proto3', - extension_ranges=[], - oneofs=[ - ], - serialized_start=178, - serialized_end=225, -) - -DESCRIPTOR.message_types_by_name['Empty'] = _EMPTY -DESCRIPTOR.message_types_by_name['AddItemRequest'] = _ADDITEMREQUEST -DESCRIPTOR.message_types_by_name['RemoveItemRequest'] = _REMOVEITEMREQUEST -DESCRIPTOR.message_types_by_name['RemoveItemResponse'] = _REMOVEITEMRESPONSE -DESCRIPTOR.message_types_by_name['QueryItemRequest'] = _QUERYITEMREQUEST -DESCRIPTOR.message_types_by_name['QuantityResponse'] = _QUANTITYRESPONSE - -Empty = _reflection.GeneratedProtocolMessageType('Empty', (_message.Message,), dict( - DESCRIPTOR = _EMPTY, - __module__ = 'store_pb2' - # @@protoc_insertion_point(class_scope:store.Empty) - )) -_sym_db.RegisterMessage(Empty) - -AddItemRequest = _reflection.GeneratedProtocolMessageType('AddItemRequest', (_message.Message,), dict( - DESCRIPTOR = _ADDITEMREQUEST, - __module__ = 'store_pb2' - # @@protoc_insertion_point(class_scope:store.AddItemRequest) - )) -_sym_db.RegisterMessage(AddItemRequest) - -RemoveItemRequest = _reflection.GeneratedProtocolMessageType('RemoveItemRequest', (_message.Message,), dict( - DESCRIPTOR = _REMOVEITEMREQUEST, - __module__ = 'store_pb2' - # @@protoc_insertion_point(class_scope:store.RemoveItemRequest) - )) -_sym_db.RegisterMessage(RemoveItemRequest) - -RemoveItemResponse = _reflection.GeneratedProtocolMessageType('RemoveItemResponse', (_message.Message,), dict( - DESCRIPTOR = _REMOVEITEMRESPONSE, - __module__ = 'store_pb2' - # @@protoc_insertion_point(class_scope:store.RemoveItemResponse) - )) -_sym_db.RegisterMessage(RemoveItemResponse) - -QueryItemRequest = _reflection.GeneratedProtocolMessageType('QueryItemRequest', (_message.Message,), dict( - DESCRIPTOR = _QUERYITEMREQUEST, - __module__ = 'store_pb2' - # @@protoc_insertion_point(class_scope:store.QueryItemRequest) - )) -_sym_db.RegisterMessage(QueryItemRequest) - -QuantityResponse = _reflection.GeneratedProtocolMessageType('QuantityResponse', (_message.Message,), dict( - DESCRIPTOR = _QUANTITYRESPONSE, - __module__ = 'store_pb2' - # @@protoc_insertion_point(class_scope:store.QuantityResponse) - )) -_sym_db.RegisterMessage(QuantityResponse) - - -try: - # THESE ELEMENTS WILL BE DEPRECATED. - # Please use the generated *_pb2_grpc.py files instead. - import grpc - from grpc.framework.common import cardinality - from grpc.framework.interfaces.face import utilities as face_utilities - from grpc.beta import implementations as beta_implementations - from grpc.beta import interfaces as beta_interfaces - - - class StoreStub(object): - - def __init__(self, channel): - """Constructor. - - Args: - channel: A grpc.Channel. - """ - self.AddItem = channel.unary_unary( - '/store.Store/AddItem', - request_serializer=AddItemRequest.SerializeToString, - response_deserializer=Empty.FromString, - ) - self.AddItems = channel.stream_unary( - '/store.Store/AddItems', - request_serializer=AddItemRequest.SerializeToString, - response_deserializer=Empty.FromString, - ) - self.RemoveItem = channel.unary_unary( - '/store.Store/RemoveItem', - request_serializer=RemoveItemRequest.SerializeToString, - response_deserializer=RemoveItemResponse.FromString, - ) - self.RemoveItems = channel.stream_unary( - '/store.Store/RemoveItems', - request_serializer=RemoveItemRequest.SerializeToString, - response_deserializer=RemoveItemResponse.FromString, - ) - self.ListInventory = channel.unary_stream( - '/store.Store/ListInventory', - request_serializer=Empty.SerializeToString, - response_deserializer=QuantityResponse.FromString, - ) - self.QueryQuantity = channel.unary_unary( - '/store.Store/QueryQuantity', - request_serializer=QueryItemRequest.SerializeToString, - response_deserializer=QuantityResponse.FromString, - ) - self.QueryQuantities = channel.stream_stream( - '/store.Store/QueryQuantities', - request_serializer=QueryItemRequest.SerializeToString, - response_deserializer=QuantityResponse.FromString, - ) - - - class StoreServicer(object): - - def AddItem(self, request, context): - context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details('Method not implemented!') - raise NotImplementedError('Method not implemented!') - - def AddItems(self, request_iterator, context): - context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details('Method not implemented!') - raise NotImplementedError('Method not implemented!') - - def RemoveItem(self, request, context): - context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details('Method not implemented!') - raise NotImplementedError('Method not implemented!') - - def RemoveItems(self, request_iterator, context): - context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details('Method not implemented!') - raise NotImplementedError('Method not implemented!') - - def ListInventory(self, request, context): - context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details('Method not implemented!') - raise NotImplementedError('Method not implemented!') - - def QueryQuantity(self, request, context): - context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details('Method not implemented!') - raise NotImplementedError('Method not implemented!') - - def QueryQuantities(self, request_iterator, context): - context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details('Method not implemented!') - raise NotImplementedError('Method not implemented!') - - - def add_StoreServicer_to_server(servicer, server): - rpc_method_handlers = { - 'AddItem': grpc.unary_unary_rpc_method_handler( - servicer.AddItem, - request_deserializer=AddItemRequest.FromString, - response_serializer=Empty.SerializeToString, - ), - 'AddItems': grpc.stream_unary_rpc_method_handler( - servicer.AddItems, - request_deserializer=AddItemRequest.FromString, - response_serializer=Empty.SerializeToString, - ), - 'RemoveItem': grpc.unary_unary_rpc_method_handler( - servicer.RemoveItem, - request_deserializer=RemoveItemRequest.FromString, - response_serializer=RemoveItemResponse.SerializeToString, - ), - 'RemoveItems': grpc.stream_unary_rpc_method_handler( - servicer.RemoveItems, - request_deserializer=RemoveItemRequest.FromString, - response_serializer=RemoveItemResponse.SerializeToString, - ), - 'ListInventory': grpc.unary_stream_rpc_method_handler( - servicer.ListInventory, - request_deserializer=Empty.FromString, - response_serializer=QuantityResponse.SerializeToString, - ), - 'QueryQuantity': grpc.unary_unary_rpc_method_handler( - servicer.QueryQuantity, - request_deserializer=QueryItemRequest.FromString, - response_serializer=QuantityResponse.SerializeToString, - ), - 'QueryQuantities': grpc.stream_stream_rpc_method_handler( - servicer.QueryQuantities, - request_deserializer=QueryItemRequest.FromString, - response_serializer=QuantityResponse.SerializeToString, - ), - } - generic_handler = grpc.method_handlers_generic_handler( - 'store.Store', rpc_method_handlers) - server.add_generic_rpc_handlers((generic_handler,)) - - - class BetaStoreServicer(object): - """The Beta API is deprecated for 0.15.0 and later. - - It is recommended to use the GA API (classes and functions in this - file not marked beta) for all further purposes. This class was generated - only to ease transition from grpcio<0.15.0 to grpcio>=0.15.0.""" - def AddItem(self, request, context): - context.code(beta_interfaces.StatusCode.UNIMPLEMENTED) - def AddItems(self, request_iterator, context): - context.code(beta_interfaces.StatusCode.UNIMPLEMENTED) - def RemoveItem(self, request, context): - context.code(beta_interfaces.StatusCode.UNIMPLEMENTED) - def RemoveItems(self, request_iterator, context): - context.code(beta_interfaces.StatusCode.UNIMPLEMENTED) - def ListInventory(self, request, context): - context.code(beta_interfaces.StatusCode.UNIMPLEMENTED) - def QueryQuantity(self, request, context): - context.code(beta_interfaces.StatusCode.UNIMPLEMENTED) - def QueryQuantities(self, request_iterator, context): - context.code(beta_interfaces.StatusCode.UNIMPLEMENTED) - - - class BetaStoreStub(object): - """The Beta API is deprecated for 0.15.0 and later. - - It is recommended to use the GA API (classes and functions in this - file not marked beta) for all further purposes. This class was generated - only to ease transition from grpcio<0.15.0 to grpcio>=0.15.0.""" - def AddItem(self, request, timeout, metadata=None, with_call=False, protocol_options=None): - raise NotImplementedError() - AddItem.future = None - def AddItems(self, request_iterator, timeout, metadata=None, with_call=False, protocol_options=None): - raise NotImplementedError() - AddItems.future = None - def RemoveItem(self, request, timeout, metadata=None, with_call=False, protocol_options=None): - raise NotImplementedError() - RemoveItem.future = None - def RemoveItems(self, request_iterator, timeout, metadata=None, with_call=False, protocol_options=None): - raise NotImplementedError() - RemoveItems.future = None - def ListInventory(self, request, timeout, metadata=None, with_call=False, protocol_options=None): - raise NotImplementedError() - def QueryQuantity(self, request, timeout, metadata=None, with_call=False, protocol_options=None): - raise NotImplementedError() - QueryQuantity.future = None - def QueryQuantities(self, request_iterator, timeout, metadata=None, with_call=False, protocol_options=None): - raise NotImplementedError() - - - def beta_create_Store_server(servicer, pool=None, pool_size=None, default_timeout=None, maximum_timeout=None): - """The Beta API is deprecated for 0.15.0 and later. - - It is recommended to use the GA API (classes and functions in this - file not marked beta) for all further purposes. This function was - generated only to ease transition from grpcio<0.15.0 to grpcio>=0.15.0""" - request_deserializers = { - ('store.Store', 'AddItem'): AddItemRequest.FromString, - ('store.Store', 'AddItems'): AddItemRequest.FromString, - ('store.Store', 'ListInventory'): Empty.FromString, - ('store.Store', 'QueryQuantities'): QueryItemRequest.FromString, - ('store.Store', 'QueryQuantity'): QueryItemRequest.FromString, - ('store.Store', 'RemoveItem'): RemoveItemRequest.FromString, - ('store.Store', 'RemoveItems'): RemoveItemRequest.FromString, - } - response_serializers = { - ('store.Store', 'AddItem'): Empty.SerializeToString, - ('store.Store', 'AddItems'): Empty.SerializeToString, - ('store.Store', 'ListInventory'): QuantityResponse.SerializeToString, - ('store.Store', 'QueryQuantities'): QuantityResponse.SerializeToString, - ('store.Store', 'QueryQuantity'): QuantityResponse.SerializeToString, - ('store.Store', 'RemoveItem'): RemoveItemResponse.SerializeToString, - ('store.Store', 'RemoveItems'): RemoveItemResponse.SerializeToString, - } - method_implementations = { - ('store.Store', 'AddItem'): face_utilities.unary_unary_inline(servicer.AddItem), - ('store.Store', 'AddItems'): face_utilities.stream_unary_inline(servicer.AddItems), - ('store.Store', 'ListInventory'): face_utilities.unary_stream_inline(servicer.ListInventory), - ('store.Store', 'QueryQuantities'): face_utilities.stream_stream_inline(servicer.QueryQuantities), - ('store.Store', 'QueryQuantity'): face_utilities.unary_unary_inline(servicer.QueryQuantity), - ('store.Store', 'RemoveItem'): face_utilities.unary_unary_inline(servicer.RemoveItem), - ('store.Store', 'RemoveItems'): face_utilities.stream_unary_inline(servicer.RemoveItems), - } - server_options = beta_implementations.server_options(request_deserializers=request_deserializers, response_serializers=response_serializers, thread_pool=pool, thread_pool_size=pool_size, default_timeout=default_timeout, maximum_timeout=maximum_timeout) - return beta_implementations.server(method_implementations, options=server_options) - - - def beta_create_Store_stub(channel, host=None, metadata_transformer=None, pool=None, pool_size=None): - """The Beta API is deprecated for 0.15.0 and later. - - It is recommended to use the GA API (classes and functions in this - file not marked beta) for all further purposes. This function was - generated only to ease transition from grpcio<0.15.0 to grpcio>=0.15.0""" - request_serializers = { - ('store.Store', 'AddItem'): AddItemRequest.SerializeToString, - ('store.Store', 'AddItems'): AddItemRequest.SerializeToString, - ('store.Store', 'ListInventory'): Empty.SerializeToString, - ('store.Store', 'QueryQuantities'): QueryItemRequest.SerializeToString, - ('store.Store', 'QueryQuantity'): QueryItemRequest.SerializeToString, - ('store.Store', 'RemoveItem'): RemoveItemRequest.SerializeToString, - ('store.Store', 'RemoveItems'): RemoveItemRequest.SerializeToString, - } - response_deserializers = { - ('store.Store', 'AddItem'): Empty.FromString, - ('store.Store', 'AddItems'): Empty.FromString, - ('store.Store', 'ListInventory'): QuantityResponse.FromString, - ('store.Store', 'QueryQuantities'): QuantityResponse.FromString, - ('store.Store', 'QueryQuantity'): QuantityResponse.FromString, - ('store.Store', 'RemoveItem'): RemoveItemResponse.FromString, - ('store.Store', 'RemoveItems'): RemoveItemResponse.FromString, - } - cardinalities = { - 'AddItem': cardinality.Cardinality.UNARY_UNARY, - 'AddItems': cardinality.Cardinality.STREAM_UNARY, - 'ListInventory': cardinality.Cardinality.UNARY_STREAM, - 'QueryQuantities': cardinality.Cardinality.STREAM_STREAM, - 'QueryQuantity': cardinality.Cardinality.UNARY_UNARY, - 'RemoveItem': cardinality.Cardinality.UNARY_UNARY, - 'RemoveItems': cardinality.Cardinality.STREAM_UNARY, - } - stub_options = beta_implementations.stub_options(host=host, metadata_transformer=metadata_transformer, request_serializers=request_serializers, response_deserializers=response_deserializers, thread_pool=pool, thread_pool_size=pool_size) - return beta_implementations.dynamic_stub(channel, 'store.Store', cardinalities, options=stub_options) -except ImportError: - pass -# @@protoc_insertion_point(module_scope) diff --git a/examples/store/store_pb2_grpc.py b/examples/store/store_pb2_grpc.py deleted file mode 100644 index 84fdc8f..0000000 --- a/examples/store/store_pb2_grpc.py +++ /dev/null @@ -1,132 +0,0 @@ -# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! -import grpc -from grpc.framework.common import cardinality -from grpc.framework.interfaces.face import utilities as face_utilities - -import store_pb2 as store__pb2 - - -class StoreStub(object): - - def __init__(self, channel): - """Constructor. - - Args: - channel: A grpc.Channel. - """ - self.AddItem = channel.unary_unary( - '/store.Store/AddItem', - request_serializer=store__pb2.AddItemRequest.SerializeToString, - response_deserializer=store__pb2.Empty.FromString, - ) - self.AddItems = channel.stream_unary( - '/store.Store/AddItems', - request_serializer=store__pb2.AddItemRequest.SerializeToString, - response_deserializer=store__pb2.Empty.FromString, - ) - self.RemoveItem = channel.unary_unary( - '/store.Store/RemoveItem', - request_serializer=store__pb2.RemoveItemRequest.SerializeToString, - response_deserializer=store__pb2.RemoveItemResponse.FromString, - ) - self.RemoveItems = channel.stream_unary( - '/store.Store/RemoveItems', - request_serializer=store__pb2.RemoveItemRequest.SerializeToString, - response_deserializer=store__pb2.RemoveItemResponse.FromString, - ) - self.ListInventory = channel.unary_stream( - '/store.Store/ListInventory', - request_serializer=store__pb2.Empty.SerializeToString, - response_deserializer=store__pb2.QuantityResponse.FromString, - ) - self.QueryQuantity = channel.unary_unary( - '/store.Store/QueryQuantity', - request_serializer=store__pb2.QueryItemRequest.SerializeToString, - response_deserializer=store__pb2.QuantityResponse.FromString, - ) - self.QueryQuantities = channel.stream_stream( - '/store.Store/QueryQuantities', - request_serializer=store__pb2.QueryItemRequest.SerializeToString, - response_deserializer=store__pb2.QuantityResponse.FromString, - ) - - -class StoreServicer(object): - - def AddItem(self, request, context): - context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details('Method not implemented!') - raise NotImplementedError('Method not implemented!') - - def AddItems(self, request_iterator, context): - context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details('Method not implemented!') - raise NotImplementedError('Method not implemented!') - - def RemoveItem(self, request, context): - context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details('Method not implemented!') - raise NotImplementedError('Method not implemented!') - - def RemoveItems(self, request_iterator, context): - context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details('Method not implemented!') - raise NotImplementedError('Method not implemented!') - - def ListInventory(self, request, context): - context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details('Method not implemented!') - raise NotImplementedError('Method not implemented!') - - def QueryQuantity(self, request, context): - context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details('Method not implemented!') - raise NotImplementedError('Method not implemented!') - - def QueryQuantities(self, request_iterator, context): - context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details('Method not implemented!') - raise NotImplementedError('Method not implemented!') - - -def add_StoreServicer_to_server(servicer, server): - rpc_method_handlers = { - 'AddItem': grpc.unary_unary_rpc_method_handler( - servicer.AddItem, - request_deserializer=store__pb2.AddItemRequest.FromString, - response_serializer=store__pb2.Empty.SerializeToString, - ), - 'AddItems': grpc.stream_unary_rpc_method_handler( - servicer.AddItems, - request_deserializer=store__pb2.AddItemRequest.FromString, - response_serializer=store__pb2.Empty.SerializeToString, - ), - 'RemoveItem': grpc.unary_unary_rpc_method_handler( - servicer.RemoveItem, - request_deserializer=store__pb2.RemoveItemRequest.FromString, - response_serializer=store__pb2.RemoveItemResponse.SerializeToString, - ), - 'RemoveItems': grpc.stream_unary_rpc_method_handler( - servicer.RemoveItems, - request_deserializer=store__pb2.RemoveItemRequest.FromString, - response_serializer=store__pb2.RemoveItemResponse.SerializeToString, - ), - 'ListInventory': grpc.unary_stream_rpc_method_handler( - servicer.ListInventory, - request_deserializer=store__pb2.Empty.FromString, - response_serializer=store__pb2.QuantityResponse.SerializeToString, - ), - 'QueryQuantity': grpc.unary_unary_rpc_method_handler( - servicer.QueryQuantity, - request_deserializer=store__pb2.QueryItemRequest.FromString, - response_serializer=store__pb2.QuantityResponse.SerializeToString, - ), - 'QueryQuantities': grpc.stream_stream_rpc_method_handler( - servicer.QueryQuantities, - request_deserializer=store__pb2.QueryItemRequest.FromString, - response_serializer=store__pb2.QuantityResponse.SerializeToString, - ), - } - generic_handler = grpc.method_handlers_generic_handler( - 'store.Store', rpc_method_handlers) - server.add_generic_rpc_handlers((generic_handler,)) diff --git a/examples/store/store_server.py b/examples/store/store_server.py deleted file mode 100644 index 1a59d45..0000000 --- a/examples/store/store_server.py +++ /dev/null @@ -1,122 +0,0 @@ -# A OpenTraced server for a Python service that implements the store interface. -from __future__ import print_function - -import time -import argparse -from collections import defaultdict - -from six import iteritems - -import grpc -from concurrent import futures -from jaeger_client import Config - -from grpc_opentracing import open_tracing_server_interceptor, \ - SpanDecorator -from grpc_opentracing.grpcext import intercept_server - -import store_pb2 - -_ONE_DAY_IN_SECONDS = 60 * 60 * 24 - - -class Store(store_pb2.StoreServicer): - - def __init__(self): - self._inventory = defaultdict(int) - - def AddItem(self, request, context): - self._inventory[request.name] += 1 - return store_pb2.Empty() - - def AddItems(self, request_iter, context): - for request in request_iter: - self._inventory[request.name] += 1 - return store_pb2.Empty() - - def RemoveItem(self, request, context): - new_quantity = self._inventory[request.name] - 1 - if new_quantity < 0: - return store_pb2.RemoveItemResponse(was_successful=False) - self._inventory[request.name] = new_quantity - return store_pb2.RemoveItemResponse(was_successful=True) - - def RemoveItems(self, request_iter, context): - response = store_pb2.RemoveItemResponse(was_successful=True) - for request in request_iter: - response = self.RemoveItem(request, context) - if not response.was_successful: - break - return response - - def ListInventory(self, request, context): - for name, count in iteritems(self._inventory): - if not count: - continue - else: - yield store_pb2.QuantityResponse(name=name, count=count) - - def QueryQuantity(self, request, context): - count = self._inventory[request.name] - return store_pb2.QuantityResponse(name=request.name, count=count) - - def QueryQuantities(self, request_iter, context): - for request in request_iter: - count = self._inventory[request.name] - yield store_pb2.QuantityResponse(name=request.name, count=count) - - -class StoreSpanDecorator(SpanDecorator): - - def __call__(self, span, rpc_info): - span.set_tag('grpc.method', rpc_info.full_method) - span.set_tag('grpc.headers', str(rpc_info.metadata)) - span.set_tag('grpc.deadline', str(rpc_info.timeout)) - - -def serve(): - parser = argparse.ArgumentParser() - parser.add_argument( - '--log_payloads', - action='store_true', - help='log request/response objects to open-tracing spans') - parser.add_argument( - '--include_grpc_tags', - action='store_true', - help='set gRPC-specific tags on spans') - args = parser.parse_args() - - config = Config( - config={ - 'sampler': { - 'type': 'const', - 'param': 1, - }, - 'logging': True, - }, - service_name='store-server') - tracer = config.initialize_tracer() - span_decorator = None - if args.include_grpc_tags: - span_decorator = StoreSpanDecorator() - tracer_interceptor = open_tracing_server_interceptor( - tracer, log_payloads=args.log_payloads, span_decorator=span_decorator) - server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) - server = intercept_server(server, tracer_interceptor) - - store_pb2.add_StoreServicer_to_server(Store(), server) - server.add_insecure_port('[::]:50051') - server.start() - try: - while True: - time.sleep(_ONE_DAY_IN_SECONDS) - except KeyboardInterrupt: - server.stop(0) - - time.sleep(2) - tracer.close() - time.sleep(2) - - -if __name__ == '__main__': - serve() diff --git a/examples/trivial/README.md b/examples/trivial/README.md deleted file mode 100644 index 6c11d2b..0000000 --- a/examples/trivial/README.md +++ /dev/null @@ -1,7 +0,0 @@ -A simple example showing how to set gRPC up to use OpenTracing. - -## Usage -``` -python trivial_server.py & -python trivial_client.py -``` diff --git a/examples/trivial/command_line_pb2.py b/examples/trivial/command_line_pb2.py deleted file mode 100644 index 42c7fc4..0000000 --- a/examples/trivial/command_line_pb2.py +++ /dev/null @@ -1,213 +0,0 @@ -# Generated by the protocol buffer compiler. DO NOT EDIT! -# source: command_line.proto - -import sys -_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1')) -from google.protobuf import descriptor as _descriptor -from google.protobuf import message as _message -from google.protobuf import reflection as _reflection -from google.protobuf import symbol_database as _symbol_database -from google.protobuf import descriptor_pb2 -# @@protoc_insertion_point(imports) - -_sym_db = _symbol_database.Default() - - - - -DESCRIPTOR = _descriptor.FileDescriptor( - name='command_line.proto', - package='command_line', - syntax='proto3', - serialized_pb=_b('\n\x12\x63ommand_line.proto\x12\x0c\x63ommand_line\"\x1e\n\x0e\x43ommandRequest\x12\x0c\n\x04text\x18\x01 \x01(\t\"\x1f\n\x0f\x43ommandResponse\x12\x0c\n\x04text\x18\x01 \x01(\t2T\n\x0b\x43ommandLine\x12\x45\n\x04\x45\x63ho\x12\x1c.command_line.CommandRequest\x1a\x1d.command_line.CommandResponse\"\x00\x62\x06proto3') -) -_sym_db.RegisterFileDescriptor(DESCRIPTOR) - - - - -_COMMANDREQUEST = _descriptor.Descriptor( - name='CommandRequest', - full_name='command_line.CommandRequest', - filename=None, - file=DESCRIPTOR, - containing_type=None, - fields=[ - _descriptor.FieldDescriptor( - name='text', full_name='command_line.CommandRequest.text', index=0, - number=1, type=9, cpp_type=9, label=1, - has_default_value=False, default_value=_b("").decode('utf-8'), - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - ], - extensions=[ - ], - nested_types=[], - enum_types=[ - ], - options=None, - is_extendable=False, - syntax='proto3', - extension_ranges=[], - oneofs=[ - ], - serialized_start=36, - serialized_end=66, -) - - -_COMMANDRESPONSE = _descriptor.Descriptor( - name='CommandResponse', - full_name='command_line.CommandResponse', - filename=None, - file=DESCRIPTOR, - containing_type=None, - fields=[ - _descriptor.FieldDescriptor( - name='text', full_name='command_line.CommandResponse.text', index=0, - number=1, type=9, cpp_type=9, label=1, - has_default_value=False, default_value=_b("").decode('utf-8'), - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - ], - extensions=[ - ], - nested_types=[], - enum_types=[ - ], - options=None, - is_extendable=False, - syntax='proto3', - extension_ranges=[], - oneofs=[ - ], - serialized_start=68, - serialized_end=99, -) - -DESCRIPTOR.message_types_by_name['CommandRequest'] = _COMMANDREQUEST -DESCRIPTOR.message_types_by_name['CommandResponse'] = _COMMANDRESPONSE - -CommandRequest = _reflection.GeneratedProtocolMessageType('CommandRequest', (_message.Message,), dict( - DESCRIPTOR = _COMMANDREQUEST, - __module__ = 'command_line_pb2' - # @@protoc_insertion_point(class_scope:command_line.CommandRequest) - )) -_sym_db.RegisterMessage(CommandRequest) - -CommandResponse = _reflection.GeneratedProtocolMessageType('CommandResponse', (_message.Message,), dict( - DESCRIPTOR = _COMMANDRESPONSE, - __module__ = 'command_line_pb2' - # @@protoc_insertion_point(class_scope:command_line.CommandResponse) - )) -_sym_db.RegisterMessage(CommandResponse) - - -try: - # THESE ELEMENTS WILL BE DEPRECATED. - # Please use the generated *_pb2_grpc.py files instead. - import grpc - from grpc.framework.common import cardinality - from grpc.framework.interfaces.face import utilities as face_utilities - from grpc.beta import implementations as beta_implementations - from grpc.beta import interfaces as beta_interfaces - - - class CommandLineStub(object): - - def __init__(self, channel): - """Constructor. - - Args: - channel: A grpc.Channel. - """ - self.Echo = channel.unary_unary( - '/command_line.CommandLine/Echo', - request_serializer=CommandRequest.SerializeToString, - response_deserializer=CommandResponse.FromString, - ) - - - class CommandLineServicer(object): - - def Echo(self, request, context): - context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details('Method not implemented!') - raise NotImplementedError('Method not implemented!') - - - def add_CommandLineServicer_to_server(servicer, server): - rpc_method_handlers = { - 'Echo': grpc.unary_unary_rpc_method_handler( - servicer.Echo, - request_deserializer=CommandRequest.FromString, - response_serializer=CommandResponse.SerializeToString, - ), - } - generic_handler = grpc.method_handlers_generic_handler( - 'command_line.CommandLine', rpc_method_handlers) - server.add_generic_rpc_handlers((generic_handler,)) - - - class BetaCommandLineServicer(object): - """The Beta API is deprecated for 0.15.0 and later. - - It is recommended to use the GA API (classes and functions in this - file not marked beta) for all further purposes. This class was generated - only to ease transition from grpcio<0.15.0 to grpcio>=0.15.0.""" - def Echo(self, request, context): - context.code(beta_interfaces.StatusCode.UNIMPLEMENTED) - - - class BetaCommandLineStub(object): - """The Beta API is deprecated for 0.15.0 and later. - - It is recommended to use the GA API (classes and functions in this - file not marked beta) for all further purposes. This class was generated - only to ease transition from grpcio<0.15.0 to grpcio>=0.15.0.""" - def Echo(self, request, timeout, metadata=None, with_call=False, protocol_options=None): - raise NotImplementedError() - Echo.future = None - - - def beta_create_CommandLine_server(servicer, pool=None, pool_size=None, default_timeout=None, maximum_timeout=None): - """The Beta API is deprecated for 0.15.0 and later. - - It is recommended to use the GA API (classes and functions in this - file not marked beta) for all further purposes. This function was - generated only to ease transition from grpcio<0.15.0 to grpcio>=0.15.0""" - request_deserializers = { - ('command_line.CommandLine', 'Echo'): CommandRequest.FromString, - } - response_serializers = { - ('command_line.CommandLine', 'Echo'): CommandResponse.SerializeToString, - } - method_implementations = { - ('command_line.CommandLine', 'Echo'): face_utilities.unary_unary_inline(servicer.Echo), - } - server_options = beta_implementations.server_options(request_deserializers=request_deserializers, response_serializers=response_serializers, thread_pool=pool, thread_pool_size=pool_size, default_timeout=default_timeout, maximum_timeout=maximum_timeout) - return beta_implementations.server(method_implementations, options=server_options) - - - def beta_create_CommandLine_stub(channel, host=None, metadata_transformer=None, pool=None, pool_size=None): - """The Beta API is deprecated for 0.15.0 and later. - - It is recommended to use the GA API (classes and functions in this - file not marked beta) for all further purposes. This function was - generated only to ease transition from grpcio<0.15.0 to grpcio>=0.15.0""" - request_serializers = { - ('command_line.CommandLine', 'Echo'): CommandRequest.SerializeToString, - } - response_deserializers = { - ('command_line.CommandLine', 'Echo'): CommandResponse.FromString, - } - cardinalities = { - 'Echo': cardinality.Cardinality.UNARY_UNARY, - } - stub_options = beta_implementations.stub_options(host=host, metadata_transformer=metadata_transformer, request_serializers=request_serializers, response_deserializers=response_deserializers, thread_pool=pool, thread_pool_size=pool_size) - return beta_implementations.dynamic_stub(channel, 'command_line.CommandLine', cardinalities, options=stub_options) -except ImportError: - pass -# @@protoc_insertion_point(module_scope) diff --git a/examples/trivial/command_line_pb2_grpc.py b/examples/trivial/command_line_pb2_grpc.py deleted file mode 100644 index 6b9b82d..0000000 --- a/examples/trivial/command_line_pb2_grpc.py +++ /dev/null @@ -1,42 +0,0 @@ -# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! -import grpc -from grpc.framework.common import cardinality -from grpc.framework.interfaces.face import utilities as face_utilities - -import command_line_pb2 as command__line__pb2 - - -class CommandLineStub(object): - - def __init__(self, channel): - """Constructor. - - Args: - channel: A grpc.Channel. - """ - self.Echo = channel.unary_unary( - '/command_line.CommandLine/Echo', - request_serializer=command__line__pb2.CommandRequest.SerializeToString, - response_deserializer=command__line__pb2.CommandResponse.FromString, - ) - - -class CommandLineServicer(object): - - def Echo(self, request, context): - context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details('Method not implemented!') - raise NotImplementedError('Method not implemented!') - - -def add_CommandLineServicer_to_server(servicer, server): - rpc_method_handlers = { - 'Echo': grpc.unary_unary_rpc_method_handler( - servicer.Echo, - request_deserializer=command__line__pb2.CommandRequest.FromString, - response_serializer=command__line__pb2.CommandResponse.SerializeToString, - ), - } - generic_handler = grpc.method_handlers_generic_handler( - 'command_line.CommandLine', rpc_method_handlers) - server.add_generic_rpc_handlers((generic_handler,)) diff --git a/examples/trivial/run_codegen.py b/examples/trivial/run_codegen.py deleted file mode 100644 index 5f64324..0000000 --- a/examples/trivial/run_codegen.py +++ /dev/null @@ -1,4 +0,0 @@ -from grpc_tools import protoc - -protoc.main(('', '-I../protos', '--python_out=.', '--grpc_python_out=.', - '../protos/command_line.proto')) diff --git a/examples/trivial/trivial_client.py b/examples/trivial/trivial_client.py deleted file mode 100644 index 4dd2c9a..0000000 --- a/examples/trivial/trivial_client.py +++ /dev/null @@ -1,47 +0,0 @@ -from __future__ import print_function - -import time -import argparse - -import grpc -from jaeger_client import Config - -from grpc_opentracing import open_tracing_client_interceptor -from grpc_opentracing.grpcext import intercept_channel - -import command_line_pb2 - - -def run(): - parser = argparse.ArgumentParser() - parser.add_argument( - '--log_payloads', - action='store_true', - help='log request/response objects to open-tracing spans') - args = parser.parse_args() - - config = Config( - config={ - 'sampler': { - 'type': 'const', - 'param': 1, - }, - 'logging': True, - }, - service_name='trivial-client') - tracer = config.initialize_tracer() - tracer_interceptor = open_tracing_client_interceptor( - tracer, log_payloads=args.log_payloads) - channel = grpc.insecure_channel('localhost:50051') - channel = intercept_channel(channel, tracer_interceptor) - stub = command_line_pb2.CommandLineStub(channel) - response = stub.Echo(command_line_pb2.CommandRequest(text='Hello, hello')) - print(response.text) - - time.sleep(2) - tracer.close() - time.sleep(2) - - -if __name__ == '__main__': - run() diff --git a/examples/trivial/trivial_server.py b/examples/trivial/trivial_server.py deleted file mode 100644 index 7cc21de..0000000 --- a/examples/trivial/trivial_server.py +++ /dev/null @@ -1,62 +0,0 @@ -from __future__ import print_function - -import time -import argparse - -import grpc -from concurrent import futures -from jaeger_client import Config - -from grpc_opentracing import open_tracing_server_interceptor -from grpc_opentracing.grpcext import intercept_server - -import command_line_pb2 - -_ONE_DAY_IN_SECONDS = 60 * 60 * 24 - - -class CommandLine(command_line_pb2.CommandLineServicer): - - def Echo(self, request, context): - return command_line_pb2.CommandResponse(text=request.text) - - -def serve(): - parser = argparse.ArgumentParser() - parser.add_argument( - '--log_payloads', - action='store_true', - help='log request/response objects to open-tracing spans') - args = parser.parse_args() - - config = Config( - config={ - 'sampler': { - 'type': 'const', - 'param': 1, - }, - 'logging': True, - }, - service_name='trivial-server') - tracer = config.initialize_tracer() - tracer_interceptor = open_tracing_server_interceptor( - tracer, log_payloads=args.log_payloads) - server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) - server = intercept_server(server, tracer_interceptor) - - command_line_pb2.add_CommandLineServicer_to_server(CommandLine(), server) - server.add_insecure_port('[::]:50051') - server.start() - try: - while True: - time.sleep(_ONE_DAY_IN_SECONDS) - except KeyboardInterrupt: - server.stop(0) - - time.sleep(2) - tracer.close() - time.sleep(2) - - -if __name__ == '__main__': - serve() diff --git a/grpc_opentracing/__init__.py b/grpc_opentracing/__init__.py index f35e79e..9cb6f4d 100644 --- a/grpc_opentracing/__init__.py +++ b/grpc_opentracing/__init__.py @@ -1,96 +1,2 @@ -import abc -import enum - -import six - -import grpc - - -class ActiveSpanSource(six.with_metaclass(abc.ABCMeta)): - """Provides a way to access an the active span.""" - - @abc.abstractmethod - def get_active_span(self): - """Identifies the active span. - - Returns: - An object that implements the opentracing.Span interface. - """ - raise NotImplementedError() - - -class RpcInfo(six.with_metaclass(abc.ABCMeta)): - """Provides information for an RPC call. - - Attributes: - full_method: A string of the full RPC method, i.e., /package.service/method. - metadata: The initial :term:`metadata`. - timeout: The length of time in seconds to wait for the computation to - terminate or be cancelled. - request: The RPC request or None for request-streaming RPCs. - response: The RPC response or None for response-streaming or erroring RPCs. - error: The RPC error or None for successful RPCs. - """ - - -class SpanDecorator(six.with_metaclass(abc.ABCMeta)): - """Provides a mechanism to add arbitrary tags/logs/etc to the - opentracing.Span associated with client and/or server RPCs.""" - - @abc.abstractmethod - def __call__(self, span, rpc_info): - """Customizes an RPC span. - - Args: - span: The client-side or server-side opentracing.Span for the RPC. - rpc_info: An RpcInfo describing the RPC. - """ - raise NotImplementedError() - - -def open_tracing_client_interceptor(tracer, - active_span_source=None, - log_payloads=False, - span_decorator=None): - """Creates an invocation-side interceptor that can be use with gRPC to add - OpenTracing information. - - Args: - tracer: An object implmenting the opentracing.Tracer interface. - active_span_source: An optional ActiveSpanSource to customize how the - active span is determined. - log_payloads: Indicates whether requests should be logged. - span_decorator: An optional SpanDecorator. - - Returns: - An invocation-side interceptor object. - """ - from grpc_opentracing import _client - return _client.OpenTracingClientInterceptor(tracer, active_span_source, - log_payloads, span_decorator) - - -def open_tracing_server_interceptor(tracer, - log_payloads=False, - span_decorator=None): - """Creates a service-side interceptor that can be use with gRPC to add - OpenTracing information. - - Args: - tracer: An object implmenting the opentracing.Tracer interface. - log_payloads: Indicates whether requests should be logged. - span_decorator: An optional SpanDecorator. - - Returns: - A service-side interceptor object. - """ - from grpc_opentracing import _server - return _server.OpenTracingServerInterceptor(tracer, log_payloads, - span_decorator) - - -################################### __all__ ################################# - -__all__ = ('ActiveSpanSource', 'RpcInfo', 'SpanDecorator', - 'open_tracing_client_interceptor', - 'open_tracing_server_interceptor',) +from grpc_opentracing.grpc_interceptor import open_tracing_client_interceptor as open_tracing_client_interceptor +from grpc_opentracing.grpc_interceptor import open_tracing_server_interceptor as open_tracing_server_interceptor diff --git a/grpc_opentracing/_client.py b/grpc_opentracing/_client.py deleted file mode 100644 index 01afb3e..0000000 --- a/grpc_opentracing/_client.py +++ /dev/null @@ -1,209 +0,0 @@ -"""Implementation of the invocation-side open-tracing interceptor.""" - -import sys -import logging -import time - -from six import iteritems - -import grpc -from grpc_opentracing import grpcext -from grpc_opentracing._utilities import get_method_type, get_deadline_millis,\ - log_or_wrap_request_or_iterator, RpcInfo -import opentracing -from opentracing.ext import tags as ot_tags - - -class _GuardedSpan(object): - - def __init__(self, span): - self.span = span - self._engaged = True - - def __enter__(self): - self.span.__enter__() - return self - - def __exit__(self, *args, **kwargs): - if self._engaged: - return self.span.__exit__(*args, **kwargs) - else: - return False - - def release(self): - self._engaged = False - return self.span - - -def _inject_span_context(tracer, span, metadata): - headers = {} - try: - tracer.inject(span.context, opentracing.Format.HTTP_HEADERS, headers) - except (opentracing.UnsupportedFormatException, - opentracing.InvalidCarrierException, - opentracing.SpanContextCorruptedException) as e: - logging.exception('tracer.inject() failed') - span.log_kv({'event': 'error', 'error.object': e}) - return metadata - metadata = () if metadata is None else tuple(metadata) - return metadata + tuple((k.lower(), v) for (k, v) in iteritems(headers)) - - -def _make_future_done_callback(span, rpc_info, log_payloads, span_decorator): - - def callback(response_future): - with span: - code = response_future.code() - if code != grpc.StatusCode.OK: - span.set_tag('error', True) - error_log = {'event': 'error', 'error.kind': str(code)} - details = response_future.details() - if details is not None: - error_log['message'] = details - span.log_kv(error_log) - rpc_info.error = code - if span_decorator is not None: - span_decorator(span, rpc_info) - return - response = response_future.result() - rpc_info.response = response - if log_payloads: - span.log_kv({'response': response}) - if span_decorator is not None: - span_decorator(span, rpc_info) - - return callback - - -class OpenTracingClientInterceptor(grpcext.UnaryClientInterceptor, - grpcext.StreamClientInterceptor): - - def __init__(self, tracer, active_span_source, log_payloads, - span_decorator): - self._tracer = tracer - self._active_span_source = active_span_source - self._log_payloads = log_payloads - self._span_decorator = span_decorator - - def _start_span(self, method): - active_span_context = None - if self._active_span_source is not None: - active_span = self._active_span_source.get_active_span() - if active_span is not None: - active_span_context = active_span.context - tags = { - ot_tags.COMPONENT: 'grpc', - ot_tags.SPAN_KIND: ot_tags.SPAN_KIND_RPC_CLIENT - } - return self._tracer.start_span( - operation_name=method, child_of=active_span_context, tags=tags) - - def _trace_result(self, guarded_span, rpc_info, result): - # If the RPC is called asynchronously, release the guard and add a callback - # so that the span can be finished once the future is done. - if isinstance(result, grpc.Future): - result.add_done_callback( - _make_future_done_callback(guarded_span.release( - ), rpc_info, self._log_payloads, self._span_decorator)) - return result - response = result - # Handle the case when the RPC is initiated via the with_call - # method and the result is a tuple with the first element as the - # response. - # http://www.grpc.io/grpc/python/grpc.html#grpc.UnaryUnaryMultiCallable.with_call - if isinstance(result, tuple): - response = result[0] - rpc_info.response = response - if self._log_payloads: - guarded_span.span.log_kv({'response': response}) - if self._span_decorator is not None: - self._span_decorator(guarded_span.span, rpc_info) - return result - - def _start_guarded_span(self, *args, **kwargs): - return _GuardedSpan(self._start_span(*args, **kwargs)) - - def intercept_unary(self, request, metadata, client_info, invoker): - with self._start_guarded_span(client_info.full_method) as guarded_span: - metadata = _inject_span_context(self._tracer, guarded_span.span, - metadata) - rpc_info = RpcInfo( - full_method=client_info.full_method, - metadata=metadata, - timeout=client_info.timeout, - request=request) - if self._log_payloads: - guarded_span.span.log_kv({'request': request}) - try: - result = invoker(request, metadata) - except: - e = sys.exc_info()[0] - guarded_span.span.set_tag('error', True) - guarded_span.span.log_kv({'event': 'error', 'error.object': e}) - rpc_info.error = e - if self._span_decorator is not None: - self._span_decorator(guarded_span.span, rpc_info) - raise - return self._trace_result(guarded_span, rpc_info, result) - - # For RPCs that stream responses, the result can be a generator. To record - # the span across the generated responses and detect any errors, we wrap the - # result in a new generator that yields the response values. - def _intercept_server_stream(self, request_or_iterator, metadata, - client_info, invoker): - with self._start_span(client_info.full_method) as span: - metadata = _inject_span_context(self._tracer, span, metadata) - rpc_info = RpcInfo( - full_method=client_info.full_method, - metadata=metadata, - timeout=client_info.timeout) - if client_info.is_client_stream: - rpc_info.request = request_or_iterator - if self._log_payloads: - request_or_iterator = log_or_wrap_request_or_iterator( - span, client_info.is_client_stream, request_or_iterator) - try: - result = invoker(request_or_iterator, metadata) - for response in result: - if self._log_payloads: - span.log_kv({'response': response}) - yield response - except: - e = sys.exc_info()[0] - span.set_tag('error', True) - span.log_kv({'event': 'error', 'error.object': e}) - rpc_info.error = e - if self._span_decorator is not None: - self._span_decorator(span, rpc_info) - raise - if self._span_decorator is not None: - self._span_decorator(span, rpc_info) - - def intercept_stream(self, request_or_iterator, metadata, client_info, - invoker): - if client_info.is_server_stream: - return self._intercept_server_stream(request_or_iterator, metadata, - client_info, invoker) - with self._start_guarded_span(client_info.full_method) as guarded_span: - metadata = _inject_span_context(self._tracer, guarded_span.span, - metadata) - rpc_info = RpcInfo( - full_method=client_info.full_method, - metadata=metadata, - timeout=client_info.timeout, - request=request_or_iterator) - if self._log_payloads: - request_or_iterator = log_or_wrap_request_or_iterator( - guarded_span.span, client_info.is_client_stream, - request_or_iterator) - try: - result = invoker(request_or_iterator, metadata) - except: - e = sys.exc_info()[0] - guarded_span.span.set_tag('error', True) - guarded_span.span.log_kv({'event': 'error', 'error.object': e}) - rpc_info.error = e - if self._span_decorator is not None: - self._span_decorator(guarded_span.span, rpc_info) - raise - return self._trace_result(guarded_span, rpc_info, result) diff --git a/grpc_opentracing/_server.py b/grpc_opentracing/_server.py deleted file mode 100644 index dd33ca0..0000000 --- a/grpc_opentracing/_server.py +++ /dev/null @@ -1,232 +0,0 @@ -"""Implementation of the service-side open-tracing interceptor.""" - -import sys -import logging -import re - -import grpc -from grpc_opentracing import grpcext, ActiveSpanSource -from grpc_opentracing._utilities import get_method_type, get_deadline_millis,\ - log_or_wrap_request_or_iterator, RpcInfo -import opentracing -from opentracing.ext import tags as ot_tags - - -class _OpenTracingServicerContext(grpc.ServicerContext, ActiveSpanSource): - - def __init__(self, servicer_context, active_span): - self._servicer_context = servicer_context - self._active_span = active_span - self.code = grpc.StatusCode.OK - self.details = None - - def is_active(self, *args, **kwargs): - return self._servicer_context.is_active(*args, **kwargs) - - def time_remaining(self, *args, **kwargs): - return self._servicer_context.time_remaining(*args, **kwargs) - - def cancel(self, *args, **kwargs): - return self._servicer_context.cancel(*args, **kwargs) - - def add_callback(self, *args, **kwargs): - return self._servicer_context.add_callback(*args, **kwargs) - - def invocation_metadata(self, *args, **kwargs): - return self._servicer_context.invocation_metadata(*args, **kwargs) - - def peer(self, *args, **kwargs): - return self._servicer_context.peer(*args, **kwargs) - - def peer_identities(self, *args, **kwargs): - return self._servicer_context.peer_identities(*args, **kwargs) - - def peer_identity_key(self, *args, **kwargs): - return self._servicer_context.peer_identity_key(*args, **kwargs) - - def auth_context(self, *args, **kwargs): - return self._servicer_context.auth_context(*args, **kwargs) - - def send_initial_metadata(self, *args, **kwargs): - return self._servicer_context.send_initial_metadata(*args, **kwargs) - - def set_trailing_metadata(self, *args, **kwargs): - return self._servicer_context.set_trailing_metadata(*args, **kwargs) - - def abort(self, *args, **kwargs): - if not hasattr(self._servicer_context, 'abort'): - raise RuntimeError('abort() is not supported with the installed version of grpcio') - return self._servicer_context.abort(*args, **kwargs) - - def set_code(self, code): - self.code = code - return self._servicer_context.set_code(code) - - def set_details(self, details): - self.details = details - return self._servicer_context.set_details(details) - - def get_active_span(self): - return self._active_span - - -def _add_peer_tags(peer_str, tags): - ipv4_re = r"ipv4:(?P
.+):(?P