diff --git a/requirements/base.txt b/requirements/base.txt index e35beba..c717de6 100644 --- a/requirements/base.txt +++ b/requirements/base.txt @@ -3,4 +3,6 @@ faust-streaming==0.11.0 # Required for Schema Registry confluent-kafka==2.4.0 -protobuf==5.27.0 \ No newline at end of file +protobuf==5.27.0 +requests==2.32.2 +protoc-wheel-0==25.3 \ No newline at end of file diff --git a/src/lesson-02/faust_consumer.py b/src/lesson-02/faust_consumer.py index d410179..c1d22db 100644 --- a/src/lesson-02/faust_consumer.py +++ b/src/lesson-02/faust_consumer.py @@ -1,8 +1,7 @@ import faust from protobuf.protos.user_pb2 import User from protobuf.serializers.protobufcodec import ProtobufCodec -from google.protobuf.message import Message -from typing import Any + user_codec = ProtobufCodec(User) diff --git a/src/lesson-02/faust_consumer_registry.py b/src/lesson-02/faust_consumer_registry.py new file mode 100644 index 0000000..bdf5480 --- /dev/null +++ b/src/lesson-02/faust_consumer_registry.py @@ -0,0 +1,26 @@ +import faust +from protobuf.registry.schema_registry import SchemaRegistry +from protobuf.registry.protobuf_factory import ProtobufFactory +from protobuf.serializers.protobufcodec import ProtobufCodec + +schema_registry_url = 'http://localhost:8081' +subject = 'users' +schema_version = 1 # Specify the version of the schema you want to use, or set to None if not using version + +schema_registry = SchemaRegistry(schema_registry_url) +protobuf_factory = ProtobufFactory(schema_registry) + +# Fetch the User message class from the registry +User = protobuf_factory.create_message_class(subject, 'User', version=schema_version) +user_codec = ProtobufCodec(User, version=schema_version) + +app = faust.App('myapp_consumer', broker='kafka://localhost:9092', web_port=6068) +topic = app.topic('users', key_serializer=None, value_serializer=user_codec) + +@app.agent(topic) +async def process(users): + async for user in users: + print(f'Received user: {user.name}, {user.age}, {user.email}, {user.interests}') + +if __name__ == '__main__': + app.main() diff --git a/src/lesson-02/faust_producer.py b/src/lesson-02/faust_producer.py index 51de9c6..2afc7f9 100644 --- a/src/lesson-02/faust_producer.py +++ b/src/lesson-02/faust_producer.py @@ -1,8 +1,6 @@ import faust from protobuf.protos.user_pb2 import User from protobuf.serializers.protobufcodec import ProtobufCodec -from google.protobuf.message import Message -from typing import Any user_codec = ProtobufCodec(User) diff --git a/src/lesson-02/faust_producer_registry.py b/src/lesson-02/faust_producer_registry.py new file mode 100644 index 0000000..b8f9def --- /dev/null +++ b/src/lesson-02/faust_producer_registry.py @@ -0,0 +1,26 @@ +import faust +from protobuf.registry.schema_registry import SchemaRegistry +from protobuf.registry.protobuf_factory import ProtobufFactory +from protobuf.serializers.protobufcodec import ProtobufCodec + +schema_registry_url = 'http://localhost:8081' +subject = 'users' +schema_version = 1 # Specify the version of the schema you want to use + +schema_registry = SchemaRegistry(schema_registry_url) +protobuf_factory = ProtobufFactory(schema_registry) + +User = protobuf_factory.create_message_class(subject, 'User', version=schema_version) +user_codec = ProtobufCodec(User) + +app = faust.App('myapp_producer', broker='kafka://localhost:9092', web_port=6067) +topic = app.topic('users', value_type=bytes, value_serializer=user_codec) + +@app.timer(interval=1.0) +async def produce(): + user = User(name='John Doe', age=30, email='john@example.com', interests=['hiking', 'reading']) + await topic.send(value=user) + print(f'Sent user: {user.name}, {user.age}, {user.email}, {user.interests}') + +if __name__ == '__main__': + app.main() diff --git a/src/lesson-02/protobuf/registry/abstract_classes.py b/src/lesson-02/protobuf/registry/abstract_classes.py index 9fd31fe..c61a625 100644 --- a/src/lesson-02/protobuf/registry/abstract_classes.py +++ b/src/lesson-02/protobuf/registry/abstract_classes.py @@ -10,18 +10,26 @@ def get_latest_schema(self, subject: str): def register_schema(self, subject: str, schema_str: str, schema_type: str): pass + @abstractmethod + def get_schema(self, subject: str, version: int = None): + pass + + @abstractmethod + def get_versions(self, subject: str): + pass + class SchemaFactoryInterface(ABC): @abstractmethod - def create_serializer(self, subject: str): + def create_serializer(self, subject: str, version: int = None): pass @abstractmethod - def create_deserializer(self, subject: str): + def create_deserializer(self, subject: str, version: int = None): pass @abstractmethod - def create_message_class(self, subject: str, message_name: str = None): + def create_message_class(self, subject: str, message_name: str = None, version: int = None): pass @abstractmethod diff --git a/src/lesson-02/protobuf/registry/avro_factory.py b/src/lesson-02/protobuf/registry/avro_factory.py index 3d5b262..cfcedcd 100644 --- a/src/lesson-02/protobuf/registry/avro_factory.py +++ b/src/lesson-02/protobuf/registry/avro_factory.py @@ -1,6 +1,6 @@ from confluent_kafka.schema_registry.avro import AvroSerializer, AvroDeserializer -from abstract_classes import SchemaFactoryInterface, SchemaRegistryInterface -from schema_registry import SchemaRegistry +from .abstract_classes import SchemaFactoryInterface, SchemaRegistryInterface +from .schema_registry import SchemaRegistry class AvroFactory(SchemaFactoryInterface): diff --git a/src/lesson-02/protobuf/registry/json_schema_factory.py b/src/lesson-02/protobuf/registry/json_schema_factory.py index 84a00e3..b85e2cd 100644 --- a/src/lesson-02/protobuf/registry/json_schema_factory.py +++ b/src/lesson-02/protobuf/registry/json_schema_factory.py @@ -1,6 +1,6 @@ from confluent_kafka.schema_registry.json_schema import JSONSerializer, JSONDeserializer -from abstract_classes import SchemaFactoryInterface -from schema_registry import SchemaRegistry +from .abstract_classes import SchemaFactoryInterface +from .schema_registry import SchemaRegistry class JSONSchemaFactory(SchemaFactoryInterface): diff --git a/src/lesson-02/protobuf/registry/proto_tool.py b/src/lesson-02/protobuf/registry/proto_tool.py index 112aa19..8609b9d 100644 --- a/src/lesson-02/protobuf/registry/proto_tool.py +++ b/src/lesson-02/protobuf/registry/proto_tool.py @@ -14,16 +14,16 @@ def register_protobuf_schema(factory, subject, schema_path): factory.register_schema(subject, schema_path) -def create_serializer(factory, subject): - return factory.create_serializer(subject) +def create_serializer(factory, subject, version=None): + return factory.create_serializer(subject, version) -def create_deserializer(factory, subject): - return factory.create_deserializer(subject) +def create_deserializer(factory, subject, version=None): + return factory.create_deserializer(subject, version) -def create_message_class(factory, subject, message_name): - return factory.create_message_class(subject, message_name) +def create_message_class(factory, subject, message_name, version=None): + return factory.create_message_class(subject, message_name, version) def main(): @@ -32,9 +32,9 @@ def main(): parser.add_argument("--subject", type=str, required=True, help="Schema subject") parser.add_argument("--schema_path", type=str, help="Path to schema file for registration") parser.add_argument("--message_name", type=str, help="Message name for creating message class") + parser.add_argument("--version", type=int, help="Schema version") parser.add_argument("--operation", type=str, required=True, choices=["register", "serialize", "deserialize", "create_message"], help="Operation to perform") - args = parser.parse_args() schema_registry = initialize_schema_registry(args.url) @@ -47,23 +47,19 @@ def main(): print(f"Schema registered for subject: {args.subject}") elif args.operation == "serialize": - serializer = create_serializer(protobuf_factory, args.subject) + serializer = create_serializer(protobuf_factory, args.subject, args.version) print(f"Serializer created for subject: {args.subject}") elif args.operation == "deserialize": - deserializer = create_deserializer(protobuf_factory, args.subject) + deserializer = create_deserializer(protobuf_factory, args.subject, args.version) print(f"Deserializer created for subject: {args.subject}") elif args.operation == "create_message": if not args.message_name: raise ValueError("Message name is required for creating message class") - MessageClass = create_message_class(protobuf_factory, args.subject, args.message_name) + MessageClass = create_message_class(protobuf_factory, args.subject, args.message_name, args.version) print(f"Message class created for message: {args.message_name}") if __name__ == "__main__": main() - - -# python proto_tool.py --url "http://localhost:8081" --subject "users" --schema_path "../../../protos/user.proto" --operation register - diff --git a/src/lesson-02/protobuf/registry/protobuf_factory.py b/src/lesson-02/protobuf/registry/protobuf_factory.py index 84d2bf7..f3040f5 100644 --- a/src/lesson-02/protobuf/registry/protobuf_factory.py +++ b/src/lesson-02/protobuf/registry/protobuf_factory.py @@ -1,7 +1,10 @@ +import os +import tempfile +import importlib.util from google.protobuf import descriptor_pool, message_factory from confluent_kafka.schema_registry.protobuf import ProtobufSerializer, ProtobufDeserializer -from abstract_classes import SchemaFactoryInterface -from schema_registry import SchemaRegistry +from .abstract_classes import SchemaFactoryInterface +from .schema_registry import SchemaRegistry class ProtobufFactory(SchemaFactoryInterface): @@ -10,26 +13,51 @@ def __init__(self, schema_registry: SchemaRegistry): self.pool = descriptor_pool.Default() self.factories = {} - def create_serializer(self, subject: str): - schema_response = self.schema_registry.get_latest_schema(subject) + def create_serializer(self, subject: str, version: int = None): + schema_response = self.schema_registry.get_schema(subject, version) schema_str = schema_response.schema.schema_str return ProtobufSerializer(schema_str, self.schema_registry.client, {'use.deprecated.format': True}) - def create_deserializer(self, subject: str): - schema_response = self.schema_registry.get_latest_schema(subject) + def create_deserializer(self, subject: str, version: int = None): + schema_response = self.schema_registry.get_schema(subject, version) schema_str = schema_response.schema.schema_str return ProtobufDeserializer(schema_str, self.schema_registry.client) - def create_message_class(self, subject: str, message_name: str = None): - if subject in self.factories: - return self.factories[subject].GetPrototype(self.factories[subject].message_types_by_name[message_name]) + def compile_proto_from_string(self, proto_string): + with tempfile.TemporaryDirectory() as temp_dir: + proto_filename = os.path.join(temp_dir, "temp.proto") + # Write the .proto string to a file + with open(proto_filename, 'w') as temp_proto_file: + temp_proto_file.write(proto_string) + # Determine the path to the protobuf includes + proto_include = os.path.dirname(os.path.abspath(temp_proto_file.name)) + # Compile the .proto file using protoc + protoc_command = f"protoc --proto_path={temp_dir} --proto_path={proto_include} --python_out={temp_dir} {proto_filename}" + os.system(protoc_command) + # Extract the generated Python module + py_module_name = "temp_pb2" + py_module_path = os.path.join(temp_dir, py_module_name + ".py") + # Load the generated module dynamically + spec = importlib.util.spec_from_file_location(py_module_name, py_module_path) + generated_module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(generated_module) + return generated_module - schema_response = self.schema_registry.get_latest_schema(subject) - file_descriptor = self.pool.AddSerializedFile(schema_response.schema.schema_obj.SerializeToString()) - message_descriptor = file_descriptor.message_types_by_name[message_name] + def create_message_class(self, subject: str, message_name: str = None, version: int = None): + subject_version_key = f"{subject}:{version}" + if subject_version_key in self.factories: + return self.factories[subject_version_key].GetPrototype( + self.factories[subject_version_key].message_types_by_name[message_name]) + schema_response = self.schema_registry.get_schema(subject, version) + schema_str = schema_response.schema.schema_str + # Compile the .proto string and get the generated module + generated_module = self.compile_proto_from_string(schema_str) + # Get the message class from the generated module + message_class = getattr(generated_module, message_name) + # Create a MessageFactory and store it factory = message_factory.MessageFactory(self.pool) - self.factories[subject] = factory - return factory.GetPrototype(message_descriptor) + self.factories[subject_version_key] = factory + return message_class def register_schema(self, subject: str, schema_path: str): with open(schema_path, 'r') as file: diff --git a/src/lesson-02/protobuf/registry/schema_factory_producer.py b/src/lesson-02/protobuf/registry/schema_factory_producer.py index e42c89b..6228944 100644 --- a/src/lesson-02/protobuf/registry/schema_factory_producer.py +++ b/src/lesson-02/protobuf/registry/schema_factory_producer.py @@ -1,7 +1,7 @@ from abstract_classes import SchemaFactoryProducerInterface, SchemaFactoryInterface, SchemaRegistryInterface -from avro_factory import AvroFactory -from json_schema_factory import JSONSchemaFactory -from protobuf_factory import ProtobufFactory +from .avro_factory import AvroFactory +from .json_schema_factory import JSONSchemaFactory +from .protobuf_factory import ProtobufFactory class SchemaFactoryProducer(SchemaFactoryProducerInterface): diff --git a/src/lesson-02/protobuf/registry/schema_registry.py b/src/lesson-02/protobuf/registry/schema_registry.py index b4c673e..d39367d 100644 --- a/src/lesson-02/protobuf/registry/schema_registry.py +++ b/src/lesson-02/protobuf/registry/schema_registry.py @@ -1,5 +1,5 @@ from confluent_kafka.schema_registry import SchemaRegistryClient, Schema -from abstract_classes import SchemaRegistryInterface +from .abstract_classes import SchemaRegistryInterface class SchemaRegistry(SchemaRegistryInterface): @@ -9,8 +9,16 @@ def __init__(self, url: str): def get_latest_schema(self, subject: str): return self.client.get_latest_version(subject) + def get_schema(self, subject: str, version: int = None): + if version is None: + return self.get_latest_schema(subject) + return self.client.get_version(subject, version) + def register_schema(self, subject: str, schema_str: str, schema_type: str): schema = Schema(schema_str, schema_type) schema_id = self.client.register_schema(subject, schema) print(f"Schema registered with ID: {schema_id}") return schema_id + + def get_versions(self, subject: str): + return self.client.get_versions(subject)