Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion requirements/base.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,6 @@ faust-streaming==0.11.0

# Required for Schema Registry
confluent-kafka==2.4.0
protobuf==5.27.0
protobuf==5.27.0
requests==2.32.2
protoc-wheel-0==25.3
3 changes: 1 addition & 2 deletions src/lesson-02/faust_consumer.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import faust
from protobuf.protos.user_pb2 import User
from protobuf.serializers.protobufcodec import ProtobufCodec
from google.protobuf.message import Message
from typing import Any


user_codec = ProtobufCodec(User)

Expand Down
26 changes: 26 additions & 0 deletions src/lesson-02/faust_consumer_registry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import faust
from protobuf.registry.schema_registry import SchemaRegistry
from protobuf.registry.protobuf_factory import ProtobufFactory
from protobuf.serializers.protobufcodec import ProtobufCodec

schema_registry_url = 'http://localhost:8081'
subject = 'users'
schema_version = 1 # Specify the version of the schema you want to use, or set to None if not using version

schema_registry = SchemaRegistry(schema_registry_url)
protobuf_factory = ProtobufFactory(schema_registry)

# Fetch the User message class from the registry
User = protobuf_factory.create_message_class(subject, 'User', version=schema_version)
user_codec = ProtobufCodec(User, version=schema_version)

app = faust.App('myapp_consumer', broker='kafka://localhost:9092', web_port=6068)
topic = app.topic('users', key_serializer=None, value_serializer=user_codec)

@app.agent(topic)
async def process(users):
async for user in users:
print(f'Received user: {user.name}, {user.age}, {user.email}, {user.interests}')

if __name__ == '__main__':
app.main()
2 changes: 0 additions & 2 deletions src/lesson-02/faust_producer.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
import faust
from protobuf.protos.user_pb2 import User
from protobuf.serializers.protobufcodec import ProtobufCodec
from google.protobuf.message import Message
from typing import Any


user_codec = ProtobufCodec(User)
Expand Down
26 changes: 26 additions & 0 deletions src/lesson-02/faust_producer_registry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import faust
from protobuf.registry.schema_registry import SchemaRegistry
from protobuf.registry.protobuf_factory import ProtobufFactory
from protobuf.serializers.protobufcodec import ProtobufCodec

schema_registry_url = 'http://localhost:8081'
subject = 'users'
schema_version = 1 # Specify the version of the schema you want to use

schema_registry = SchemaRegistry(schema_registry_url)
protobuf_factory = ProtobufFactory(schema_registry)

User = protobuf_factory.create_message_class(subject, 'User', version=schema_version)
user_codec = ProtobufCodec(User)

app = faust.App('myapp_producer', broker='kafka://localhost:9092', web_port=6067)
topic = app.topic('users', value_type=bytes, value_serializer=user_codec)

@app.timer(interval=1.0)
async def produce():
user = User(name='John Doe', age=30, email='john@example.com', interests=['hiking', 'reading'])
await topic.send(value=user)
print(f'Sent user: {user.name}, {user.age}, {user.email}, {user.interests}')

if __name__ == '__main__':
app.main()
14 changes: 11 additions & 3 deletions src/lesson-02/protobuf/registry/abstract_classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,26 @@ def get_latest_schema(self, subject: str):
def register_schema(self, subject: str, schema_str: str, schema_type: str):
pass

@abstractmethod
def get_schema(self, subject: str, version: int = None):
pass

@abstractmethod
def get_versions(self, subject: str):
pass


class SchemaFactoryInterface(ABC):
@abstractmethod
def create_serializer(self, subject: str):
def create_serializer(self, subject: str, version: int = None):
pass

@abstractmethod
def create_deserializer(self, subject: str):
def create_deserializer(self, subject: str, version: int = None):
pass

@abstractmethod
def create_message_class(self, subject: str, message_name: str = None):
def create_message_class(self, subject: str, message_name: str = None, version: int = None):
pass

@abstractmethod
Expand Down
4 changes: 2 additions & 2 deletions src/lesson-02/protobuf/registry/avro_factory.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from confluent_kafka.schema_registry.avro import AvroSerializer, AvroDeserializer
from abstract_classes import SchemaFactoryInterface, SchemaRegistryInterface
from schema_registry import SchemaRegistry
from .abstract_classes import SchemaFactoryInterface, SchemaRegistryInterface
from .schema_registry import SchemaRegistry


class AvroFactory(SchemaFactoryInterface):
Expand Down
4 changes: 2 additions & 2 deletions src/lesson-02/protobuf/registry/json_schema_factory.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from confluent_kafka.schema_registry.json_schema import JSONSerializer, JSONDeserializer
from abstract_classes import SchemaFactoryInterface
from schema_registry import SchemaRegistry
from .abstract_classes import SchemaFactoryInterface
from .schema_registry import SchemaRegistry


class JSONSchemaFactory(SchemaFactoryInterface):
Expand Down
24 changes: 10 additions & 14 deletions src/lesson-02/protobuf/registry/proto_tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,16 @@ def register_protobuf_schema(factory, subject, schema_path):
factory.register_schema(subject, schema_path)


def create_serializer(factory, subject):
return factory.create_serializer(subject)
def create_serializer(factory, subject, version=None):
return factory.create_serializer(subject, version)


def create_deserializer(factory, subject):
return factory.create_deserializer(subject)
def create_deserializer(factory, subject, version=None):
return factory.create_deserializer(subject, version)


def create_message_class(factory, subject, message_name):
return factory.create_message_class(subject, message_name)
def create_message_class(factory, subject, message_name, version=None):
return factory.create_message_class(subject, message_name, version)


def main():
Expand All @@ -32,9 +32,9 @@ def main():
parser.add_argument("--subject", type=str, required=True, help="Schema subject")
parser.add_argument("--schema_path", type=str, help="Path to schema file for registration")
parser.add_argument("--message_name", type=str, help="Message name for creating message class")
parser.add_argument("--version", type=int, help="Schema version")
parser.add_argument("--operation", type=str, required=True,
choices=["register", "serialize", "deserialize", "create_message"], help="Operation to perform")

args = parser.parse_args()

schema_registry = initialize_schema_registry(args.url)
Expand All @@ -47,23 +47,19 @@ def main():
print(f"Schema registered for subject: {args.subject}")

elif args.operation == "serialize":
serializer = create_serializer(protobuf_factory, args.subject)
serializer = create_serializer(protobuf_factory, args.subject, args.version)
print(f"Serializer created for subject: {args.subject}")

elif args.operation == "deserialize":
deserializer = create_deserializer(protobuf_factory, args.subject)
deserializer = create_deserializer(protobuf_factory, args.subject, args.version)
print(f"Deserializer created for subject: {args.subject}")

elif args.operation == "create_message":
if not args.message_name:
raise ValueError("Message name is required for creating message class")
MessageClass = create_message_class(protobuf_factory, args.subject, args.message_name)
MessageClass = create_message_class(protobuf_factory, args.subject, args.message_name, args.version)
print(f"Message class created for message: {args.message_name}")


if __name__ == "__main__":
main()


# python proto_tool.py --url "http://localhost:8081" --subject "users" --schema_path "../../../protos/user.proto" --operation register

56 changes: 42 additions & 14 deletions src/lesson-02/protobuf/registry/protobuf_factory.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import os
import tempfile
import importlib.util
from google.protobuf import descriptor_pool, message_factory
from confluent_kafka.schema_registry.protobuf import ProtobufSerializer, ProtobufDeserializer
from abstract_classes import SchemaFactoryInterface
from schema_registry import SchemaRegistry
from .abstract_classes import SchemaFactoryInterface
from .schema_registry import SchemaRegistry


class ProtobufFactory(SchemaFactoryInterface):
Expand All @@ -10,26 +13,51 @@ def __init__(self, schema_registry: SchemaRegistry):
self.pool = descriptor_pool.Default()
self.factories = {}

def create_serializer(self, subject: str):
schema_response = self.schema_registry.get_latest_schema(subject)
def create_serializer(self, subject: str, version: int = None):
schema_response = self.schema_registry.get_schema(subject, version)
schema_str = schema_response.schema.schema_str
return ProtobufSerializer(schema_str, self.schema_registry.client, {'use.deprecated.format': True})

def create_deserializer(self, subject: str):
schema_response = self.schema_registry.get_latest_schema(subject)
def create_deserializer(self, subject: str, version: int = None):
schema_response = self.schema_registry.get_schema(subject, version)
schema_str = schema_response.schema.schema_str
return ProtobufDeserializer(schema_str, self.schema_registry.client)

def create_message_class(self, subject: str, message_name: str = None):
if subject in self.factories:
return self.factories[subject].GetPrototype(self.factories[subject].message_types_by_name[message_name])
def compile_proto_from_string(self, proto_string):
with tempfile.TemporaryDirectory() as temp_dir:
proto_filename = os.path.join(temp_dir, "temp.proto")
# Write the .proto string to a file
with open(proto_filename, 'w') as temp_proto_file:
temp_proto_file.write(proto_string)
# Determine the path to the protobuf includes
proto_include = os.path.dirname(os.path.abspath(temp_proto_file.name))
# Compile the .proto file using protoc
protoc_command = f"protoc --proto_path={temp_dir} --proto_path={proto_include} --python_out={temp_dir} {proto_filename}"
os.system(protoc_command)
# Extract the generated Python module
py_module_name = "temp_pb2"
py_module_path = os.path.join(temp_dir, py_module_name + ".py")
# Load the generated module dynamically
spec = importlib.util.spec_from_file_location(py_module_name, py_module_path)
generated_module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(generated_module)
return generated_module

schema_response = self.schema_registry.get_latest_schema(subject)
file_descriptor = self.pool.AddSerializedFile(schema_response.schema.schema_obj.SerializeToString())
message_descriptor = file_descriptor.message_types_by_name[message_name]
def create_message_class(self, subject: str, message_name: str = None, version: int = None):
subject_version_key = f"{subject}:{version}"
if subject_version_key in self.factories:
return self.factories[subject_version_key].GetPrototype(
self.factories[subject_version_key].message_types_by_name[message_name])
schema_response = self.schema_registry.get_schema(subject, version)
schema_str = schema_response.schema.schema_str
# Compile the .proto string and get the generated module
generated_module = self.compile_proto_from_string(schema_str)
# Get the message class from the generated module
message_class = getattr(generated_module, message_name)
# Create a MessageFactory and store it
factory = message_factory.MessageFactory(self.pool)
self.factories[subject] = factory
return factory.GetPrototype(message_descriptor)
self.factories[subject_version_key] = factory
return message_class

def register_schema(self, subject: str, schema_path: str):
with open(schema_path, 'r') as file:
Expand Down
6 changes: 3 additions & 3 deletions src/lesson-02/protobuf/registry/schema_factory_producer.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from abstract_classes import SchemaFactoryProducerInterface, SchemaFactoryInterface, SchemaRegistryInterface
from avro_factory import AvroFactory
from json_schema_factory import JSONSchemaFactory
from protobuf_factory import ProtobufFactory
from .avro_factory import AvroFactory
from .json_schema_factory import JSONSchemaFactory
from .protobuf_factory import ProtobufFactory


class SchemaFactoryProducer(SchemaFactoryProducerInterface):
Expand Down
10 changes: 9 additions & 1 deletion src/lesson-02/protobuf/registry/schema_registry.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from confluent_kafka.schema_registry import SchemaRegistryClient, Schema
from abstract_classes import SchemaRegistryInterface
from .abstract_classes import SchemaRegistryInterface


class SchemaRegistry(SchemaRegistryInterface):
Expand All @@ -9,8 +9,16 @@ def __init__(self, url: str):
def get_latest_schema(self, subject: str):
return self.client.get_latest_version(subject)

def get_schema(self, subject: str, version: int = None):
if version is None:
return self.get_latest_schema(subject)
return self.client.get_version(subject, version)

def register_schema(self, subject: str, schema_str: str, schema_type: str):
schema = Schema(schema_str, schema_type)
schema_id = self.client.register_schema(subject, schema)
print(f"Schema registered with ID: {schema_id}")
return schema_id

def get_versions(self, subject: str):
return self.client.get_versions(subject)