Skip to content

Commit 3f4b397

Browse files
committed
feat: implement ProtobufNative schema
1 parent 8ce83cf commit 3f4b397

6 files changed

Lines changed: 299 additions & 1 deletion

File tree

pulsar/schema/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,3 +22,4 @@
2222

2323
from .schema import Schema, BytesSchema, StringSchema, JsonSchema
2424
from .schema_avro import AvroSchema
25+
from .schema_protobuf import ProtobufNativeSchema

pulsar/schema/schema_protobuf.py

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one
3+
# or more contributor license agreements. See the NOTICE file
4+
# distributed with this work for additional information
5+
# regarding copyright ownership. The ASF licenses this file
6+
# to you under the Apache License, Version 2.0 (the
7+
# "License"); you may not use this file except in compliance
8+
# with the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing,
13+
# software distributed under the License is distributed on an
14+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
# KIND, either express or implied. See the License for the
16+
# specific language governing permissions and limitations
17+
# under the License.
18+
#
19+
20+
import base64
21+
import _pulsar
22+
23+
from .schema import Schema
24+
25+
try:
26+
from google.protobuf import descriptor_pb2
27+
from google.protobuf.message import Message as ProtobufMessage
28+
HAS_PROTOBUF = True
29+
except ImportError:
30+
HAS_PROTOBUF = False
31+
32+
33+
def _collect_file_descriptors(file_descriptor, visited, file_descriptor_set):
34+
"""Recursively collect all FileDescriptorProto objects into file_descriptor_set."""
35+
if file_descriptor.name in visited:
36+
return
37+
for dep in file_descriptor.dependencies:
38+
_collect_file_descriptors(dep, visited, file_descriptor_set)
39+
visited.add(file_descriptor.name)
40+
proto = descriptor_pb2.FileDescriptorProto()
41+
file_descriptor.CopyToProto(proto)
42+
file_descriptor_set.file.append(proto)
43+
44+
45+
def _build_schema_definition(descriptor):
    """
    Describe a protobuf message type in the layout of Java's ProtobufNativeSchemaData.

    The returned dict has three entries:

    * ``fileDescriptorSet``: base64 text of a serialized FileDescriptorSet
      holding the message's .proto file and the files it depends on
    * ``rootMessageTypeName``: ``descriptor.full_name``
    * ``rootFileDescriptorName``: ``descriptor.file.name``

    This mirrors ProtobufNativeSchemaUtils.serialize() in the Java client.
    """
    root_file = descriptor.file
    descriptor_set = descriptor_pb2.FileDescriptorSet()
    _collect_file_descriptors(root_file, set(), descriptor_set)

    # base64 output is bytes; decode it so the value is a plain string.
    encoded_set = base64.b64encode(descriptor_set.SerializeToString())
    return {
        "fileDescriptorSet": encoded_set.decode('utf-8'),
        "rootMessageTypeName": descriptor.full_name,
        "rootFileDescriptorName": root_file.name,
    }
63+
64+
65+
if HAS_PROTOBUF:
    class ProtobufNativeSchema(Schema):
        """
        Schema for protobuf messages using the native protobuf binary encoding.

        The schema definition is stored as a JSON-encoded ProtobufNativeSchemaData
        (fileDescriptorSet, rootMessageTypeName, rootFileDescriptorName), which is
        compatible with the Java client's ProtobufNativeSchema.

        Parameters
        ----------
        record_cls:
            A generated protobuf message class (subclass of google.protobuf.message.Message).

        Raises
        ------
        TypeError
            If ``record_cls`` is not a protobuf Message subclass, or if it has
            no ``DESCRIPTOR`` to build the schema definition from.

        Example
        -------
        .. code-block:: python

            import pulsar
            from pulsar.schema import ProtobufNativeSchema
            from my_proto_pb2 import MyMessage

            client = pulsar.Client('pulsar://localhost:6650')
            producer = client.create_producer(
                'my-topic',
                schema=ProtobufNativeSchema(MyMessage)
            )
            producer.send(MyMessage(field='value'))
        """

        def __init__(self, record_cls):
            if not (isinstance(record_cls, type) and issubclass(record_cls, ProtobufMessage)):
                raise TypeError(
                    f'record_cls must be a protobuf Message subclass, got {record_cls!r}'
                )
            # A class can pass the subclass check without carrying a descriptor
            # (for example the Message base class itself). Reject it here
            # instead of failing with an AttributeError while building the schema.
            descriptor = getattr(record_cls, 'DESCRIPTOR', None)
            if descriptor is None:
                raise TypeError(
                    f'record_cls must be a generated protobuf message class '
                    f'with a DESCRIPTOR, got {record_cls!r}'
                )
            schema_definition = _build_schema_definition(descriptor)
            super().__init__(
                record_cls, _pulsar.SchemaType.PROTOBUF_NATIVE, schema_definition, 'PROTOBUF_NATIVE'
            )

        def encode(self, obj):
            """Serialize ``obj`` to protobuf binary after the base-class type check."""
            self._validate_object_type(obj)
            return obj.SerializeToString()

        def decode(self, data):
            """Parse protobuf binary ``data`` into an instance of the record class."""
            return self._record_cls.FromString(data)

        def __str__(self):
            return f'ProtobufNativeSchema({self._record_cls.__name__})'

else:
    class ProtobufNativeSchema(Schema):
        """Placeholder used when the protobuf package could not be imported.

        Constructing it always raises, pointing the user at the missing
        dependency.
        """

        def __init__(self, record_cls=None, **_kwargs):
            # Accept the same ``record_cls`` keyword as the real class (any
            # other keyword, such as the former ``_record_cls``, is absorbed by
            # **_kwargs) so keyword construction reaches the install hint
            # instead of an "unexpected keyword argument" TypeError.
            raise Exception(
                "protobuf library support was not found. "
                "Install it with: pip install protobuf"
            )

        def encode(self, obj):
            pass

        def decode(self, data):
            pass

src/enums.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,8 @@ void export_enums(py::module_& m) {
115115
.value("AVRO", pulsar::AVRO)
116116
.value("AUTO_CONSUME", pulsar::AUTO_CONSUME)
117117
.value("AUTO_PUBLISH", pulsar::AUTO_PUBLISH)
118-
.value("KEY_VALUE", pulsar::KEY_VALUE);
118+
.value("KEY_VALUE", pulsar::KEY_VALUE)
119+
.value("PROTOBUF_NATIVE", pulsar::PROTOBUF_NATIVE);
119120

120121
enum_<InitialPosition>(m, "InitialPosition", "Supported initial position")
121122
.value("Latest", InitialPositionLatest)

tests/schema_test.py

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,10 @@
1818
# under the License.
1919
#
2020

21+
import base64
2122
import math
23+
import os
24+
import sys
2225
import requests
2326
from typing import List
2427
from unittest import TestCase, main
@@ -30,6 +33,10 @@
3033
import json
3134
from fastavro.schema import load_schema
3235

36+
# Make generated protobuf test classes importable
37+
sys.path.insert(0, os.path.dirname(__file__))
38+
from test_schema_pb2 import TestMessage, TestMessageWithNested, TestInner
39+
3340
class ExampleRecord(Record):
3441
str_field = String()
3542
int_field = Integer()
@@ -1404,5 +1411,90 @@ def test_schema_type_promotion(self):
14041411
client.close()
14051412

14061413

1414+
class ProtobufNativeSchemaTest(TestCase):
    """Broker-free unit tests covering ProtobufNativeSchema."""

    @staticmethod
    def _definition(record_cls):
        """Return the parsed JSON schema definition generated for ``record_cls``."""
        return json.loads(ProtobufNativeSchema(record_cls).schema_info().schema())

    def test_schema_type(self):
        """The schema reports the PROTOBUF_NATIVE schema type."""
        import _pulsar
        info = ProtobufNativeSchema(TestMessage).schema_info()
        self.assertEqual(info.schema_type(), _pulsar.SchemaType.PROTOBUF_NATIVE)

    def test_schema_definition_keys(self):
        """The schema definition JSON carries all three required keys."""
        definition = self._definition(TestMessage)
        for key in ('fileDescriptorSet', 'rootMessageTypeName', 'rootFileDescriptorName'):
            self.assertIn(key, definition)

    def test_schema_definition_values(self):
        """The root message and root file names come from the descriptor."""
        definition = self._definition(TestMessage)
        self.assertEqual(definition['rootMessageTypeName'], 'test.TestMessage')
        self.assertEqual(definition['rootFileDescriptorName'], 'test_schema.proto')

    def test_file_descriptor_set_is_valid_base64_proto(self):
        """fileDescriptorSet decodes from base64 into a parseable FileDescriptorSet."""
        from google.protobuf import descriptor_pb2
        payload = base64.b64decode(self._definition(TestMessage)['fileDescriptorSet'])
        descriptor_set = descriptor_pb2.FileDescriptorSet.FromString(payload)
        self.assertIn('test_schema.proto', [entry.name for entry in descriptor_set.file])

    def test_encode_decode_roundtrip(self):
        """Decoding an encoded message yields the original field values."""
        schema = ProtobufNativeSchema(TestMessage)
        result = schema.decode(schema.encode(TestMessage(name='hello', value=42)))
        self.assertEqual(result.name, 'hello')
        self.assertEqual(result.value, 42)

    def test_encode_produces_protobuf_binary(self):
        """Encoded bytes can be parsed directly by the generated message class."""
        message = TestMessage(name='pulsar', value=100)
        payload = ProtobufNativeSchema(TestMessage).encode(message)
        # Parse with protobuf's own parser rather than the schema under test.
        self.assertEqual(TestMessage.FromString(payload), message)

    def test_encode_decode_nested_message(self):
        """The round-trip also preserves a nested message field."""
        schema = ProtobufNativeSchema(TestMessageWithNested)
        inner = TestInner(inner_str='inner', inner_int=999)
        outer = TestMessageWithNested(
            str_field='test',
            int_field=7,
            double_field=3.14,
            nested=inner,
        )
        result = schema.decode(schema.encode(outer))
        self.assertEqual(result.str_field, 'test')
        self.assertEqual(result.int_field, 7)
        self.assertAlmostEqual(result.double_field, 3.14)
        self.assertEqual(result.nested.inner_str, 'inner')
        self.assertEqual(result.nested.inner_int, 999)

    def test_wrong_type_raises(self):
        """Encoding something that is not the record class raises TypeError."""
        schema = ProtobufNativeSchema(TestMessage)
        with self.assertRaises(TypeError):
            schema.encode("not a protobuf message")

    def test_non_message_class_raises(self):
        """Building a schema from a non-Message class raises TypeError."""
        with self.assertRaises(TypeError):
            ProtobufNativeSchema(str)

    def test_schema_name(self):
        """The schema name is 'PROTOBUF_NATIVE'."""
        info = ProtobufNativeSchema(TestMessage).schema_info()
        self.assertEqual(info.name(), 'PROTOBUF_NATIVE')
1497+
1498+
14071499
if __name__ == '__main__':
14081500
main()

tests/test_schema.proto

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
syntax = "proto3";
19+
20+
package test;
21+
22+
// Flat test message with one string field and one 32-bit integer field.
message TestMessage {
23+
string name = 1;
24+
int32 value = 2;
25+
}
26+
27+
// Test message combining scalar fields with an embedded TestInner message.
message TestMessageWithNested {
28+
string str_field = 1;
29+
int32 int_field = 2;
30+
double double_field = 3;
31+
// TestInner is declared further down in this file.
TestInner nested = 4;
32+
}
33+
34+
// Message type used for the `nested` field of TestMessageWithNested.
message TestInner {
35+
string inner_str = 1;
36+
int64 inner_int = 2;
37+
}

tests/test_schema_pb2.py

Lines changed: 40 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)