From 520b95001d35ddd7f26cb6c979717fb75e70fd79 Mon Sep 17 00:00:00 2001 From: Joshua Magady Date: Tue, 28 May 2024 00:19:01 -0500 Subject: [PATCH 01/14] chore: Add requests library to requirements The requests library was added to the base requirements file. This will ensure that it is installed during the project setup, which is crucial for making HTTP requests within the application. --- requirements/base.txt | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/requirements/base.txt b/requirements/base.txt index e35beba..b24947a 100644 --- a/requirements/base.txt +++ b/requirements/base.txt @@ -3,4 +3,5 @@ faust-streaming==0.11.0 # Required for Schema Registry confluent-kafka==2.4.0 -protobuf==5.27.0 \ No newline at end of file +protobuf==5.27.0 +requests==2.32.2 \ No newline at end of file From 2a575d8107be5b24ad9a923f09e76ccd747cee9c Mon Sep 17 00:00:00 2001 From: Joshua Magady Date: Tue, 28 May 2024 00:21:08 -0500 Subject: [PATCH 02/14] refactor: Remove unused imports from faust producer and consumer The unused imports 'Message' from google.protobuf.message and 'Any' from typing have been removed from both the faust_producer.py and faust_consumer.py files. This cleanup improves code readability and avoids unnecessary clutter. 
--- src/lesson-02/faust_consumer.py | 3 +-- src/lesson-02/faust_producer.py | 2 -- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/src/lesson-02/faust_consumer.py b/src/lesson-02/faust_consumer.py index d410179..c1d22db 100644 --- a/src/lesson-02/faust_consumer.py +++ b/src/lesson-02/faust_consumer.py @@ -1,8 +1,7 @@ import faust from protobuf.protos.user_pb2 import User from protobuf.serializers.protobufcodec import ProtobufCodec -from google.protobuf.message import Message -from typing import Any + user_codec = ProtobufCodec(User) diff --git a/src/lesson-02/faust_producer.py b/src/lesson-02/faust_producer.py index 51de9c6..2afc7f9 100644 --- a/src/lesson-02/faust_producer.py +++ b/src/lesson-02/faust_producer.py @@ -1,8 +1,6 @@ import faust from protobuf.protos.user_pb2 import User from protobuf.serializers.protobufcodec import ProtobufCodec -from google.protobuf.message import Message -from typing import Any user_codec = ProtobufCodec(User) From 8de9fc121c18e3a2f15f61b1bec453bc520365b7 Mon Sep 17 00:00:00 2001 From: Joshua Magady Date: Tue, 28 May 2024 02:01:27 -0500 Subject: [PATCH 03/14] fix: Add .proto string compilation to ProtobufFactory. This commit includes an additional function compile_proto_from_string that allows the system to compile a .proto string at runtime. Additionally, it has changed how the system retrieves and uses messages in create_message_class by utilizing the new function. The goal of these adjustments is to enhance flexibility and expand compatibility with a variety of schema objects. 
--- .../protobuf/registry/protobuf_factory.py | 48 ++++++++++++++++--- 1 file changed, 42 insertions(+), 6 deletions(-) diff --git a/src/lesson-02/protobuf/registry/protobuf_factory.py b/src/lesson-02/protobuf/registry/protobuf_factory.py index 84d2bf7..0006464 100644 --- a/src/lesson-02/protobuf/registry/protobuf_factory.py +++ b/src/lesson-02/protobuf/registry/protobuf_factory.py @@ -1,8 +1,10 @@ +import os +import tempfile +import importlib.util from google.protobuf import descriptor_pool, message_factory from confluent_kafka.schema_registry.protobuf import ProtobufSerializer, ProtobufDeserializer -from abstract_classes import SchemaFactoryInterface -from schema_registry import SchemaRegistry - +from .abstract_classes import SchemaFactoryInterface +from .schema_registry import SchemaRegistry class ProtobufFactory(SchemaFactoryInterface): def __init__(self, schema_registry: SchemaRegistry): @@ -20,16 +22,50 @@ def create_deserializer(self, subject: str): schema_str = schema_response.schema.schema_str return ProtobufDeserializer(schema_str, self.schema_registry.client) + def compile_proto_from_string(self, proto_string): + with tempfile.TemporaryDirectory() as temp_dir: + proto_filename = os.path.join(temp_dir, "temp.proto") + + # Write the .proto string to a file + with open(proto_filename, 'w') as temp_proto_file: + temp_proto_file.write(proto_string) + + # Determine the path to the protobuf includes + proto_include = os.path.dirname(os.path.abspath(temp_proto_file.name)) + + # Compile the .proto file using protoc + protoc_command = f"protoc --proto_path={temp_dir} --proto_path={proto_include} --python_out={temp_dir} {proto_filename}" + os.system(protoc_command) + + # Extract the generated Python module + py_module_name = "temp_pb2" + py_module_path = os.path.join(temp_dir, py_module_name + ".py") + + # Load the generated module dynamically + spec = importlib.util.spec_from_file_location(py_module_name, py_module_path) + generated_module = 
importlib.util.module_from_spec(spec) + spec.loader.exec_module(generated_module) + + return generated_module + def create_message_class(self, subject: str, message_name: str = None): if subject in self.factories: return self.factories[subject].GetPrototype(self.factories[subject].message_types_by_name[message_name]) schema_response = self.schema_registry.get_latest_schema(subject) - file_descriptor = self.pool.AddSerializedFile(schema_response.schema.schema_obj.SerializeToString()) - message_descriptor = file_descriptor.message_types_by_name[message_name] + schema_str = schema_response.schema.schema_str + + # Compile the .proto string and get the generated module + generated_module = self.compile_proto_from_string(schema_str) + + # Get the message class from the generated module + message_class = getattr(generated_module, message_name) + + # Create a MessageFactory and store it factory = message_factory.MessageFactory(self.pool) self.factories[subject] = factory - return factory.GetPrototype(message_descriptor) + + return message_class def register_schema(self, subject: str, schema_path: str): with open(schema_path, 'r') as file: From 159e29eec4d1f4f015fa687e683c007c3eb6a3d7 Mon Sep 17 00:00:00 2001 From: Joshua Magady Date: Tue, 28 May 2024 02:01:42 -0500 Subject: [PATCH 04/14] fix: Update module import paths in schema_factory_producer.py The import paths for AvroFactory, JSONSchemaFactory, and ProtobufFactory classes in schema_factory_producer.py have been updated. The change was made to ensure that the relative import paths are used, which helps maintain compatibility in different execution environments. 
--- src/lesson-02/protobuf/registry/schema_factory_producer.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/lesson-02/protobuf/registry/schema_factory_producer.py b/src/lesson-02/protobuf/registry/schema_factory_producer.py index e42c89b..6228944 100644 --- a/src/lesson-02/protobuf/registry/schema_factory_producer.py +++ b/src/lesson-02/protobuf/registry/schema_factory_producer.py @@ -1,7 +1,7 @@ from abstract_classes import SchemaFactoryProducerInterface, SchemaFactoryInterface, SchemaRegistryInterface -from avro_factory import AvroFactory -from json_schema_factory import JSONSchemaFactory -from protobuf_factory import ProtobufFactory +from .avro_factory import AvroFactory +from .json_schema_factory import JSONSchemaFactory +from .protobuf_factory import ProtobufFactory class SchemaFactoryProducer(SchemaFactoryProducerInterface): From 41102738fb0764dfa81442d336c25e349dc61914 Mon Sep 17 00:00:00 2001 From: Joshua Magady Date: Tue, 28 May 2024 02:01:52 -0500 Subject: [PATCH 05/14] fix: Update import path for SchemaRegistryInterface Changed the import path for SchemaRegistryInterface in schema_registry.py file. This is a correction from a direct import to a relative import, ensuring the application can find and import this module properly. 
--- src/lesson-02/protobuf/registry/schema_registry.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lesson-02/protobuf/registry/schema_registry.py b/src/lesson-02/protobuf/registry/schema_registry.py index b4c673e..e7b7e22 100644 --- a/src/lesson-02/protobuf/registry/schema_registry.py +++ b/src/lesson-02/protobuf/registry/schema_registry.py @@ -1,5 +1,5 @@ from confluent_kafka.schema_registry import SchemaRegistryClient, Schema -from abstract_classes import SchemaRegistryInterface +from .abstract_classes import SchemaRegistryInterface class SchemaRegistry(SchemaRegistryInterface): From ad7f67025f1c4d5c96a55e992d6e12e8fd7e21af Mon Sep 17 00:00:00 2001 From: Joshua Magady Date: Tue, 28 May 2024 02:02:00 -0500 Subject: [PATCH 06/14] fix: Update import statements in JSONSchemaFactory The changes in JSONSchemaFactory class in the 'src/lesson-02/protobuf/registry' directory involve updates to the import statements, replacing 'abstract_classes' and 'schema_registry' with '.abstract_classes' and '.schema_registry'. This makes the imports more compatible with Python's relative import statement format. 
--- src/lesson-02/protobuf/registry/json_schema_factory.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/lesson-02/protobuf/registry/json_schema_factory.py b/src/lesson-02/protobuf/registry/json_schema_factory.py index 84a00e3..b85e2cd 100644 --- a/src/lesson-02/protobuf/registry/json_schema_factory.py +++ b/src/lesson-02/protobuf/registry/json_schema_factory.py @@ -1,6 +1,6 @@ from confluent_kafka.schema_registry.json_schema import JSONSerializer, JSONDeserializer -from abstract_classes import SchemaFactoryInterface -from schema_registry import SchemaRegistry +from .abstract_classes import SchemaFactoryInterface +from .schema_registry import SchemaRegistry class JSONSchemaFactory(SchemaFactoryInterface): From 241ddc03e19c2fc1d29478b3d26f83e728a99391 Mon Sep 17 00:00:00 2001 From: Joshua Magady Date: Tue, 28 May 2024 02:02:10 -0500 Subject: [PATCH 07/14] fix: Update import statements in AvroFactory Corrected the import paths for SchemaFactoryInterface, SchemaRegistryInterface, and SchemaRegistry in the AvroFactory class in 'lesson-02'. These are now relative imports to ensure they are correctly referenced within the same package. 
--- src/lesson-02/protobuf/registry/avro_factory.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/lesson-02/protobuf/registry/avro_factory.py b/src/lesson-02/protobuf/registry/avro_factory.py index 3d5b262..cfcedcd 100644 --- a/src/lesson-02/protobuf/registry/avro_factory.py +++ b/src/lesson-02/protobuf/registry/avro_factory.py @@ -1,6 +1,6 @@ from confluent_kafka.schema_registry.avro import AvroSerializer, AvroDeserializer -from abstract_classes import SchemaFactoryInterface, SchemaRegistryInterface -from schema_registry import SchemaRegistry +from .abstract_classes import SchemaFactoryInterface, SchemaRegistryInterface +from .schema_registry import SchemaRegistry class AvroFactory(SchemaFactoryInterface): From 4175d446c18f3e413bd858807116315ec7007074 Mon Sep 17 00:00:00 2001 From: Joshua Magady Date: Tue, 28 May 2024 02:02:23 -0500 Subject: [PATCH 08/14] chore: Add protoc-wheel dependency to the requirements The protoc-wheel Python library has been added to the requirements file. This addition helps facilitate easier Protocol Buffers compilation. --- requirements/base.txt | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/requirements/base.txt b/requirements/base.txt index b24947a..c717de6 100644 --- a/requirements/base.txt +++ b/requirements/base.txt @@ -4,4 +4,5 @@ faust-streaming==0.11.0 # Required for Schema Registry confluent-kafka==2.4.0 protobuf==5.27.0 -requests==2.32.2 \ No newline at end of file +requests==2.32.2 +protoc-wheel-0==25.3 \ No newline at end of file From 0f33044aad85c6379146f0dbe5e073613c2655c9 Mon Sep 17 00:00:00 2001 From: Joshua Magady Date: Tue, 28 May 2024 02:08:02 -0500 Subject: [PATCH 09/14] style: Add an extra line before class declaration in protobuf_factory.py A blank line has been inserted before the ProtobufFactory class declaration in protobuf_factory.py for better code readability and adherence to Python's PEP 8 style guide. 
--- src/lesson-02/protobuf/registry/protobuf_factory.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/lesson-02/protobuf/registry/protobuf_factory.py b/src/lesson-02/protobuf/registry/protobuf_factory.py index 0006464..9fd0265 100644 --- a/src/lesson-02/protobuf/registry/protobuf_factory.py +++ b/src/lesson-02/protobuf/registry/protobuf_factory.py @@ -6,6 +6,7 @@ from .abstract_classes import SchemaFactoryInterface from .schema_registry import SchemaRegistry + class ProtobufFactory(SchemaFactoryInterface): def __init__(self, schema_registry: SchemaRegistry): self.schema_registry = schema_registry From 82ad84bd04b6f9e37d20ce8b944814173e41549b Mon Sep 17 00:00:00 2001 From: Joshua Magady Date: Tue, 28 May 2024 02:09:53 -0500 Subject: [PATCH 10/14] feat: Add Faust producer with user protobuf serialization A Faust producer has been implemented to publish User instances, serialized into protobuf, to a Kafka topic. This producer creates 'User' instances using a generated protobuf message class and sets the serialization codec accordingly for key/value pairs. The producer will then publish User instances periodically using a timer. 
--- src/lesson-02/faust_producer_registry.py | 29 ++++++++++++++++++++++++ 1 file changed, 29 insertions(+) create mode 100644 src/lesson-02/faust_producer_registry.py diff --git a/src/lesson-02/faust_producer_registry.py b/src/lesson-02/faust_producer_registry.py new file mode 100644 index 0000000..13f66f2 --- /dev/null +++ b/src/lesson-02/faust_producer_registry.py @@ -0,0 +1,29 @@ +import faust +from protobuf.registry.schema_registry import SchemaRegistry +from protobuf.registry.protobuf_factory import ProtobufFactory +from protobuf.serializers.protobufcodec import ProtobufCodec + + +schema_registry_url = 'http://localhost:8081' +subject = 'users' + +schema_registry = SchemaRegistry(schema_registry_url) +protobuf_factory = ProtobufFactory(schema_registry) + + +User = protobuf_factory.create_message_class(subject, 'User') +user_codec = ProtobufCodec(User) + +app = faust.App('myapp_producer', broker='kafka://localhost:9092', web_port=6067) +topic = app.topic('users', value_type=bytes, key_serializer=user_codec, value_serializer=user_codec) + + +@app.timer(interval=1.0) +async def produce(): + user = User(name='John Doe', age=30, email='john@example.com', interests=['hiking', 'reading']) + await topic.send(value=user) + print(f'Sent user: {user.name}, {user.age}, {user.email}, {user.interests}') + + +if __name__ == '__main__': + app.main() From 6d0beb2f3bd0911cb126db85e738057e15e91781 Mon Sep 17 00:00:00 2001 From: Joshua Magady Date: Tue, 28 May 2024 02:30:53 -0500 Subject: [PATCH 11/14] feat: Specify schema version in create_message_class function The create_message_class function has been updated to take the version of the schema we want to use. This modification was made in the faust_producer_registry.py file, in which 'schema_version' was added as a parameter, and now it's passed as an argument to the create_message_class function. This ensures we are using the correct version of the schema when creating our message class. 
--- src/lesson-02/faust_producer_registry.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/lesson-02/faust_producer_registry.py b/src/lesson-02/faust_producer_registry.py index 13f66f2..fca7722 100644 --- a/src/lesson-02/faust_producer_registry.py +++ b/src/lesson-02/faust_producer_registry.py @@ -3,15 +3,14 @@ from protobuf.registry.protobuf_factory import ProtobufFactory from protobuf.serializers.protobufcodec import ProtobufCodec - schema_registry_url = 'http://localhost:8081' subject = 'users' +schema_version = 1 # Specify the version of the schema you want to use schema_registry = SchemaRegistry(schema_registry_url) protobuf_factory = ProtobufFactory(schema_registry) - -User = protobuf_factory.create_message_class(subject, 'User') +User = protobuf_factory.create_message_class(subject, 'User', version=schema_version) user_codec = ProtobufCodec(User) app = faust.App('myapp_producer', broker='kafka://localhost:9092', web_port=6067) From 0d751e1d67b85f8effcb391ec959cce88854a88d Mon Sep 17 00:00:00 2001 From: Joshua Magady Date: Tue, 28 May 2024 02:31:21 -0500 Subject: [PATCH 12/14] feat: Add versioning support to protobuf toolset This commit adds optional version parameter to serialization/deserialization methods within the protobuf factory, allowing specific versioning when necessary. The command line interface has been updated for specified version operations, and additional methods have been implemented in the schema registry to handle schema version queries. 
--- .../protobuf/registry/abstract_classes.py | 14 +++++++-- src/lesson-02/protobuf/registry/proto_tool.py | 24 ++++++-------- .../protobuf/registry/protobuf_factory.py | 31 +++++++------------ .../protobuf/registry/schema_registry.py | 8 +++++ 4 files changed, 40 insertions(+), 37 deletions(-) diff --git a/src/lesson-02/protobuf/registry/abstract_classes.py b/src/lesson-02/protobuf/registry/abstract_classes.py index 9fd31fe..c61a625 100644 --- a/src/lesson-02/protobuf/registry/abstract_classes.py +++ b/src/lesson-02/protobuf/registry/abstract_classes.py @@ -10,18 +10,26 @@ def get_latest_schema(self, subject: str): def register_schema(self, subject: str, schema_str: str, schema_type: str): pass + @abstractmethod + def get_schema(self, subject: str, version: int = None): + pass + + @abstractmethod + def get_versions(self, subject: str): + pass + class SchemaFactoryInterface(ABC): @abstractmethod - def create_serializer(self, subject: str): + def create_serializer(self, subject: str, version: int = None): pass @abstractmethod - def create_deserializer(self, subject: str): + def create_deserializer(self, subject: str, version: int = None): pass @abstractmethod - def create_message_class(self, subject: str, message_name: str = None): + def create_message_class(self, subject: str, message_name: str = None, version: int = None): pass @abstractmethod diff --git a/src/lesson-02/protobuf/registry/proto_tool.py b/src/lesson-02/protobuf/registry/proto_tool.py index 112aa19..8609b9d 100644 --- a/src/lesson-02/protobuf/registry/proto_tool.py +++ b/src/lesson-02/protobuf/registry/proto_tool.py @@ -14,16 +14,16 @@ def register_protobuf_schema(factory, subject, schema_path): factory.register_schema(subject, schema_path) -def create_serializer(factory, subject): - return factory.create_serializer(subject) +def create_serializer(factory, subject, version=None): + return factory.create_serializer(subject, version) -def create_deserializer(factory, subject): - return 
factory.create_deserializer(subject) +def create_deserializer(factory, subject, version=None): + return factory.create_deserializer(subject, version) -def create_message_class(factory, subject, message_name): - return factory.create_message_class(subject, message_name) +def create_message_class(factory, subject, message_name, version=None): + return factory.create_message_class(subject, message_name, version) def main(): @@ -32,9 +32,9 @@ def main(): parser.add_argument("--subject", type=str, required=True, help="Schema subject") parser.add_argument("--schema_path", type=str, help="Path to schema file for registration") parser.add_argument("--message_name", type=str, help="Message name for creating message class") + parser.add_argument("--version", type=int, help="Schema version") parser.add_argument("--operation", type=str, required=True, choices=["register", "serialize", "deserialize", "create_message"], help="Operation to perform") - args = parser.parse_args() schema_registry = initialize_schema_registry(args.url) @@ -47,23 +47,19 @@ def main(): print(f"Schema registered for subject: {args.subject}") elif args.operation == "serialize": - serializer = create_serializer(protobuf_factory, args.subject) + serializer = create_serializer(protobuf_factory, args.subject, args.version) print(f"Serializer created for subject: {args.subject}") elif args.operation == "deserialize": - deserializer = create_deserializer(protobuf_factory, args.subject) + deserializer = create_deserializer(protobuf_factory, args.subject, args.version) print(f"Deserializer created for subject: {args.subject}") elif args.operation == "create_message": if not args.message_name: raise ValueError("Message name is required for creating message class") - MessageClass = create_message_class(protobuf_factory, args.subject, args.message_name) + MessageClass = create_message_class(protobuf_factory, args.subject, args.message_name, args.version) print(f"Message class created for message: 
{args.message_name}") if __name__ == "__main__": main() - - -# python proto_tool.py --url "http://localhost:8081" --subject "users" --schema_path "../../../protos/user.proto" --operation register - diff --git a/src/lesson-02/protobuf/registry/protobuf_factory.py b/src/lesson-02/protobuf/registry/protobuf_factory.py index 9fd0265..f3040f5 100644 --- a/src/lesson-02/protobuf/registry/protobuf_factory.py +++ b/src/lesson-02/protobuf/registry/protobuf_factory.py @@ -13,59 +13,50 @@ def __init__(self, schema_registry: SchemaRegistry): self.pool = descriptor_pool.Default() self.factories = {} - def create_serializer(self, subject: str): - schema_response = self.schema_registry.get_latest_schema(subject) + def create_serializer(self, subject: str, version: int = None): + schema_response = self.schema_registry.get_schema(subject, version) schema_str = schema_response.schema.schema_str return ProtobufSerializer(schema_str, self.schema_registry.client, {'use.deprecated.format': True}) - def create_deserializer(self, subject: str): - schema_response = self.schema_registry.get_latest_schema(subject) + def create_deserializer(self, subject: str, version: int = None): + schema_response = self.schema_registry.get_schema(subject, version) schema_str = schema_response.schema.schema_str return ProtobufDeserializer(schema_str, self.schema_registry.client) def compile_proto_from_string(self, proto_string): with tempfile.TemporaryDirectory() as temp_dir: proto_filename = os.path.join(temp_dir, "temp.proto") - # Write the .proto string to a file with open(proto_filename, 'w') as temp_proto_file: temp_proto_file.write(proto_string) - # Determine the path to the protobuf includes proto_include = os.path.dirname(os.path.abspath(temp_proto_file.name)) - # Compile the .proto file using protoc protoc_command = f"protoc --proto_path={temp_dir} --proto_path={proto_include} --python_out={temp_dir} {proto_filename}" os.system(protoc_command) - # Extract the generated Python module py_module_name 
= "temp_pb2" py_module_path = os.path.join(temp_dir, py_module_name + ".py") - # Load the generated module dynamically spec = importlib.util.spec_from_file_location(py_module_name, py_module_path) generated_module = importlib.util.module_from_spec(spec) spec.loader.exec_module(generated_module) - return generated_module - def create_message_class(self, subject: str, message_name: str = None): - if subject in self.factories: - return self.factories[subject].GetPrototype(self.factories[subject].message_types_by_name[message_name]) - - schema_response = self.schema_registry.get_latest_schema(subject) + def create_message_class(self, subject: str, message_name: str = None, version: int = None): + subject_version_key = f"{subject}:{version}" + if subject_version_key in self.factories: + return self.factories[subject_version_key].GetPrototype( + self.factories[subject_version_key].message_types_by_name[message_name]) + schema_response = self.schema_registry.get_schema(subject, version) schema_str = schema_response.schema.schema_str - # Compile the .proto string and get the generated module generated_module = self.compile_proto_from_string(schema_str) - # Get the message class from the generated module message_class = getattr(generated_module, message_name) - # Create a MessageFactory and store it factory = message_factory.MessageFactory(self.pool) - self.factories[subject] = factory - + self.factories[subject_version_key] = factory return message_class def register_schema(self, subject: str, schema_path: str): diff --git a/src/lesson-02/protobuf/registry/schema_registry.py b/src/lesson-02/protobuf/registry/schema_registry.py index e7b7e22..d39367d 100644 --- a/src/lesson-02/protobuf/registry/schema_registry.py +++ b/src/lesson-02/protobuf/registry/schema_registry.py @@ -9,8 +9,16 @@ def __init__(self, url: str): def get_latest_schema(self, subject: str): return self.client.get_latest_version(subject) + def get_schema(self, subject: str, version: int = None): + if version 
is None: + return self.get_latest_schema(subject) + return self.client.get_version(subject, version) + def register_schema(self, subject: str, schema_str: str, schema_type: str): schema = Schema(schema_str, schema_type) schema_id = self.client.register_schema(subject, schema) print(f"Schema registered with ID: {schema_id}") return schema_id + + def get_versions(self, subject: str): + return self.client.get_versions(subject) From 6b04634cd29080f26712754841fca5bc4189ecb6 Mon Sep 17 00:00:00 2001 From: Joshua Magady Date: Tue, 28 May 2024 03:07:39 -0500 Subject: [PATCH 13/14] refactor: Refactor topic definition in faust producer The key_serializer was removed from the topic definition to improve code efficiency. The value_serializer was left in place to handle serialization of the user data. --- src/lesson-02/faust_producer_registry.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/lesson-02/faust_producer_registry.py b/src/lesson-02/faust_producer_registry.py index fca7722..b8f9def 100644 --- a/src/lesson-02/faust_producer_registry.py +++ b/src/lesson-02/faust_producer_registry.py @@ -14,8 +14,7 @@ user_codec = ProtobufCodec(User) app = faust.App('myapp_producer', broker='kafka://localhost:9092', web_port=6067) -topic = app.topic('users', value_type=bytes, key_serializer=user_codec, value_serializer=user_codec) - +topic = app.topic('users', value_type=bytes, value_serializer=user_codec) @app.timer(interval=1.0) async def produce(): @@ -23,6 +22,5 @@ async def produce(): await topic.send(value=user) print(f'Sent user: {user.name}, {user.age}, {user.email}, {user.interests}') - if __name__ == '__main__': app.main() From 2d43fe9826a6a299df356e9efaf5728d9d7954b4 Mon Sep 17 00:00:00 2001 From: Joshua Magady Date: Tue, 28 May 2024 03:18:49 -0500 Subject: [PATCH 14/14] feat: Add Faust consumer with Protobuf support A Faust consumer has been added to the project utilizing Protobuf for serialized communication in Kafka. 
The script is set up to receive and process 'User' messages on the 'users' topic from a local Kafka broker. The 'User' message class is fetched dynamically from the schema registry. --- src/lesson-02/faust_consumer_registry.py | 26 ++++++++++++++++++++++++ 1 file changed, 26 insertions(+) create mode 100644 src/lesson-02/faust_consumer_registry.py diff --git a/src/lesson-02/faust_consumer_registry.py b/src/lesson-02/faust_consumer_registry.py new file mode 100644 index 0000000..bdf5480 --- /dev/null +++ b/src/lesson-02/faust_consumer_registry.py @@ -0,0 +1,26 @@ +import faust +from protobuf.registry.schema_registry import SchemaRegistry +from protobuf.registry.protobuf_factory import ProtobufFactory +from protobuf.serializers.protobufcodec import ProtobufCodec + +schema_registry_url = 'http://localhost:8081' +subject = 'users' +schema_version = 1 # Specify the version of the schema you want to use, or set to None if not using version + +schema_registry = SchemaRegistry(schema_registry_url) +protobuf_factory = ProtobufFactory(schema_registry) + +# Fetch the User message class from the registry +User = protobuf_factory.create_message_class(subject, 'User', version=schema_version) +user_codec = ProtobufCodec(User, version=schema_version) + +app = faust.App('myapp_consumer', broker='kafka://localhost:9092', web_port=6068) +topic = app.topic('users', key_serializer=None, value_serializer=user_codec) + +@app.agent(topic) +async def process(users): + async for user in users: + print(f'Received user: {user.name}, {user.age}, {user.email}, {user.interests}') + +if __name__ == '__main__': + app.main()