Skip to content

Commit e1afd8e

Browse files
committed
address comments
1 parent 1b4f04a commit e1afd8e

5 files changed

Lines changed: 79 additions & 88 deletions

File tree

pulsar/schema/schema_protobuf.py

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -90,11 +90,24 @@ class ProtobufNativeSchema(Schema):
9090
from my_proto_pb2 import MyMessage
9191
9292
client = pulsar.Client('pulsar://localhost:6650')
93-
producer = client.create_producer(
94-
'my-topic',
95-
schema=ProtobufNativeSchema(MyMessage)
96-
)
97-
producer.send(MyMessage(field='value'))
93+
schema = ProtobufNativeSchema(MyMessage)
94+
producer = client.create_producer('my-topic', schema=schema)
95+
consumer = client.subscribe('my-topic', 'my-sub', schema=schema)
96+
97+
message = MyMessage()
98+
message.field = 'value'
99+
producer.send(message)
100+
101+
received = consumer.receive(timeout_millis=5000)
102+
typed_value = received.value()
103+
consumer.acknowledge(received)
104+
105+
assert isinstance(typed_value, MyMessage)
106+
assert typed_value.field == 'value'
107+
108+
consumer.close()
109+
producer.close()
110+
client.close()
98111
"""
99112

100113
def __init__(self, record_cls):

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ def build_extension(self, ext):
7979
# protobuf schema dependencies
8080
extras_require["protobuf"] = sorted(
8181
{
82-
"protobuf>=3.6.1",
82+
"protobuf>=6.33.6",
8383
}
8484
)
8585

tests/schema_test.py

Lines changed: 60 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,6 @@
2020

2121
import base64
2222
import math
23-
import os
24-
import sys
2523
import requests
2624
from typing import List
2725
from unittest import TestCase, main
@@ -32,10 +30,67 @@
3230
from enum import Enum
3331
import json
3432
from fastavro.schema import load_schema
33+
from google.protobuf import descriptor_pb2, descriptor_pool, message_factory
34+
35+
36+
def _add_protobuf_field(message, name, number, field_type, type_name=None):
37+
field = message.field.add()
38+
field.name = name
39+
field.number = number
40+
field.label = descriptor_pb2.FieldDescriptorProto.LABEL_OPTIONAL
41+
field.type = field_type
42+
if type_name:
43+
field.type_name = type_name
44+
45+
46+
def _get_message_classes(pool, message_names):
47+
if hasattr(message_factory, 'GetMessageClass'):
48+
return tuple(
49+
message_factory.GetMessageClass(pool.FindMessageTypeByName(message_name))
50+
for message_name in message_names
51+
)
52+
factory = message_factory.MessageFactory(pool)
53+
return tuple(
54+
factory.GetPrototype(pool.FindMessageTypeByName(message_name))
55+
for message_name in message_names
56+
)
57+
58+
59+
def _build_protobuf_test_messages():
60+
file_proto = descriptor_pb2.FileDescriptorProto()
61+
file_proto.name = 'test_schema.proto'
62+
file_proto.package = 'test'
63+
file_proto.syntax = 'proto3'
64+
65+
test_message = file_proto.message_type.add()
66+
test_message.name = 'TestMessage'
67+
_add_protobuf_field(test_message, 'name', 1, descriptor_pb2.FieldDescriptorProto.TYPE_STRING)
68+
_add_protobuf_field(test_message, 'value', 2, descriptor_pb2.FieldDescriptorProto.TYPE_INT32)
69+
70+
nested_message = file_proto.message_type.add()
71+
nested_message.name = 'TestMessageWithNested'
72+
_add_protobuf_field(nested_message, 'str_field', 1, descriptor_pb2.FieldDescriptorProto.TYPE_STRING)
73+
_add_protobuf_field(nested_message, 'int_field', 2, descriptor_pb2.FieldDescriptorProto.TYPE_INT32)
74+
_add_protobuf_field(nested_message, 'double_field', 3, descriptor_pb2.FieldDescriptorProto.TYPE_DOUBLE)
75+
_add_protobuf_field(
76+
nested_message, 'nested', 4, descriptor_pb2.FieldDescriptorProto.TYPE_MESSAGE, '.test.TestInner'
77+
)
78+
79+
inner_message = file_proto.message_type.add()
80+
inner_message.name = 'TestInner'
81+
_add_protobuf_field(inner_message, 'inner_str', 1, descriptor_pb2.FieldDescriptorProto.TYPE_STRING)
82+
_add_protobuf_field(inner_message, 'inner_int', 2, descriptor_pb2.FieldDescriptorProto.TYPE_INT64)
83+
84+
pool = descriptor_pool.DescriptorPool()
85+
pool.AddSerializedFile(file_proto.SerializeToString())
86+
return _get_message_classes(
87+
pool,
88+
('test.TestMessage', 'test.TestMessageWithNested', 'test.TestInner'),
89+
)
90+
91+
92+
TestMessage, TestMessageWithNested, TestInner = _build_protobuf_test_messages()
3593

36-
# Make generated protobuf test classes importable
37-
sys.path.insert(0, os.path.dirname(__file__))
38-
from test_schema_pb2 import TestMessage, TestMessageWithNested, TestInner
3994

4095
class ExampleRecord(Record):
4196
str_field = String()

tests/test_schema.proto

Lines changed: 0 additions & 37 deletions
This file was deleted.

tests/test_schema_pb2.py

Lines changed: 0 additions & 40 deletions
This file was deleted.

0 commit comments

Comments
 (0)