Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .eslintignore
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,4 @@

examples
perf
tests/protobuf_schema/generated
16 changes: 16 additions & 0 deletions index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,21 @@ export interface SchemaInfo {
properties?: Record<string, string>;
}

export namespace ProtobufNativeSchema {
interface CreateSchemaInfoFromRootOptions {
root: any;
rootMessageTypeName: string;
rootFileDescriptorName: string;
schemaType?: 'ProtobufNative';
syntax?: 'proto3' | 'proto2' | string;
name?: string;
properties?: Record<string, string>;
}

function createRootFromJson(rootJson: Record<string, unknown>): any;
function createSchemaInfoFromRoot(options: CreateSchemaInfoFromRootOptions): SchemaInfo;
}

export interface DeadLetterPolicy {
deadLetterTopic: string;
maxRedeliverCount?: number;
Expand Down Expand Up @@ -385,6 +400,7 @@ export type SchemaType =
'Float32' |
'Float64' |
'KeyValue' |
'ProtobufNative' |
'Bytes' |
'AutoConsume' |
'AutoPublish';
Expand Down
2 changes: 2 additions & 0 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ const AuthenticationToken = require('./src/AuthenticationToken');
const AuthenticationOauth2 = require('./src/AuthenticationOauth2');
const AuthenticationBasic = require('./src/AuthenticationBasic');
const Client = require('./src/Client');
const ProtobufNativeSchema = require('./src/ProtobufNativeSchema');

const LogLevel = {
DEBUG: 0,
Expand All @@ -43,6 +44,7 @@ const Pulsar = {
AuthenticationToken,
AuthenticationOauth2,
AuthenticationBasic,
ProtobufNativeSchema,
LogLevel,
};

Expand Down
22 changes: 21 additions & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@
"dependencies": {
"@mapbox/node-pre-gyp": "^2.0.3",
"bindings": "^1.5.0",
"node-addon-api": "^4.3.0"
"node-addon-api": "^4.3.0",
"protobufjs": "^8.4.2"
},
"binary": {
"module_name": "pulsar",
Expand Down
74 changes: 74 additions & 0 deletions src/ProtobufNativeSchema.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

const protobuf = require('protobufjs');
const descriptor = require('protobufjs/ext/descriptor');

const normalizeTypeName = (typeName) => typeName.replace(/^\./, '');

const createSchemaInfoFromRoot = ({
root,
rootMessageTypeName,
rootFileDescriptorName,
schemaType = 'ProtobufNative',
syntax = 'proto3',
name = rootMessageTypeName,
properties = {},
}) => {
if (!root) {
throw new Error('root is required');
}
if (!rootMessageTypeName) {
throw new Error('rootMessageTypeName is required');
}
if (!rootFileDescriptorName) {
throw new Error('rootFileDescriptorName is required');
}

const normalizedTypeName = normalizeTypeName(rootMessageTypeName);
const rootMessageType = root.lookupType(normalizedTypeName);
const packageName = normalizedTypeName.split('.').slice(0, -1).join('.');
const namespace = packageName ? root.lookup(packageName) : root;

// protobufjs reflection JSON does not retain the source file name. Set it
// before exporting a FileDescriptorSet, mirroring descriptor->file()->name().
namespace.filename = rootFileDescriptorName;
root.resolveAll();

const fileDescriptorSet = root.toDescriptor(syntax);
const fileDescriptorSetBytes = descriptor.FileDescriptorSet.encode(fileDescriptorSet).finish();

return {
schemaType,
name,
schema: JSON.stringify({
fileDescriptorSet: Buffer.from(fileDescriptorSetBytes).toString('base64'),
rootMessageTypeName: normalizeTypeName(rootMessageType.fullName),
rootFileDescriptorName,
}),
properties,
};
};

const createRootFromJson = (rootJson) => protobuf.Root.fromJSON(rootJson);

module.exports = {
createRootFromJson,
createSchemaInfoFromRoot,
};
70 changes: 44 additions & 26 deletions src/SchemaInfo.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,35 +17,53 @@
* under the License.
*/
#include "SchemaInfo.h"
#include <pulsar/ConsumerConfiguration.h>
#include <pulsar/ProducerConfiguration.h>
#include <map>

static const std::string CFG_SCHEMA_TYPE = "schemaType";
static const std::string CFG_NAME = "name";
static const std::string CFG_SCHEMA = "schema";
static const std::string CFG_PROPS = "properties";

static const std::map<std::string, pulsar_schema_type> SCHEMA_TYPE = {{"None", pulsar_None},
{"String", pulsar_String},
{"Json", pulsar_Json},
{"Protobuf", pulsar_Protobuf},
{"Avro", pulsar_Avro},
{"Boolean", pulsar_Boolean},
{"Int8", pulsar_Int8},
{"Int16", pulsar_Int16},
{"Int32", pulsar_Int32},
{"Int64", pulsar_Int64},
{"Float32", pulsar_Float32},
{"Float64", pulsar_Float64},
{"KeyValue", pulsar_KeyValue},
{"Bytes", pulsar_Bytes},
{"AutoConsume", pulsar_AutoConsume},
{"AutoPublish", pulsar_AutoPublish}};
struct _pulsar_producer_configuration {
pulsar::ProducerConfiguration conf;
};

SchemaInfo::SchemaInfo(const Napi::Object &schemaInfo) : cSchemaType(pulsar_Bytes), name("BYTES"), schema() {
this->cProperties = pulsar_string_map_create();
struct _pulsar_consumer_configuration {
pulsar::ConsumerConfiguration consumerConfiguration;
};

static const std::map<std::string, pulsar::SchemaType> SCHEMA_TYPE = {
{"None", static_cast<pulsar::SchemaType>(0)},
{"String", static_cast<pulsar::SchemaType>(1)},
{"Json", static_cast<pulsar::SchemaType>(2)},
{"Protobuf", static_cast<pulsar::SchemaType>(3)},
{"Avro", static_cast<pulsar::SchemaType>(4)},
{"Boolean", static_cast<pulsar::SchemaType>(5)},
{"Int8", static_cast<pulsar::SchemaType>(6)},
{"Int16", static_cast<pulsar::SchemaType>(7)},
{"Int32", static_cast<pulsar::SchemaType>(8)},
{"Int64", static_cast<pulsar::SchemaType>(9)},
{"Float32", static_cast<pulsar::SchemaType>(10)},
{"Float64", static_cast<pulsar::SchemaType>(11)},
{"KeyValue", static_cast<pulsar::SchemaType>(15)},
{"ProtobufNative", static_cast<pulsar::SchemaType>(20)},
{"Bytes", static_cast<pulsar::SchemaType>(-1)},
{"AutoConsume", static_cast<pulsar::SchemaType>(-3)},
{"AutoPublish", static_cast<pulsar::SchemaType>(-4)}};

SchemaInfo::SchemaInfo(const Napi::Object &schemaInfo)
: schemaType(static_cast<pulsar::SchemaType>(-1)), name("BYTES"), schema() {
Comment thread
shibd marked this conversation as resolved.
if (schemaInfo.Has(CFG_SCHEMA_TYPE) && schemaInfo.Get(CFG_SCHEMA_TYPE).IsString()) {
this->name = schemaInfo.Get(CFG_SCHEMA_TYPE).ToString().Utf8Value();
this->cSchemaType = SCHEMA_TYPE.at(schemaInfo.Get(CFG_SCHEMA_TYPE).ToString().Utf8Value());
std::string typeStr = schemaInfo.Get(CFG_SCHEMA_TYPE).ToString().Utf8Value();
auto it = SCHEMA_TYPE.find(typeStr);
if (it == SCHEMA_TYPE.end()) {
Napi::TypeError::New(schemaInfo.Env(), "Unknown schemaType: " + typeStr).ThrowAsJavaScriptException();
return;
}
this->name = typeStr;
this->schemaType = it->second;
}
Comment thread
shibd marked this conversation as resolved.
if (schemaInfo.Has(CFG_NAME) && schemaInfo.Get(CFG_NAME).IsString()) {
this->name = schemaInfo.Get(CFG_NAME).ToString().Utf8Value();
Expand All @@ -60,19 +78,19 @@ SchemaInfo::SchemaInfo(const Napi::Object &schemaInfo) : cSchemaType(pulsar_Byte
for (int i = 0; i < size; i++) {
Napi::String key = arr.Get(i).ToString();
Napi::String value = propObj.Get(key).ToString();
pulsar_string_map_put(this->cProperties, key.Utf8Value().c_str(), value.Utf8Value().c_str());
this->properties[key.Utf8Value()] = value.Utf8Value();
}
}
}

void SchemaInfo::SetProducerSchema(std::shared_ptr<pulsar_producer_configuration_t> cProducerConfiguration) {
pulsar_producer_configuration_set_schema_info(cProducerConfiguration.get(), this->cSchemaType,
this->name.c_str(), this->schema.c_str(), this->cProperties);
cProducerConfiguration->conf.setSchema(
pulsar::SchemaInfo(this->schemaType, this->name, this->schema, this->properties));
}

void SchemaInfo::SetConsumerSchema(std::shared_ptr<pulsar_consumer_configuration_t> cConsumerConfiguration) {
pulsar_consumer_configuration_set_schema_info(cConsumerConfiguration.get(), this->cSchemaType,
this->name.c_str(), this->schema.c_str(), this->cProperties);
cConsumerConfiguration->consumerConfiguration.setSchema(
pulsar::SchemaInfo(this->schemaType, this->name, this->schema, this->properties));
}

SchemaInfo::~SchemaInfo() { pulsar_string_map_free(this->cProperties); }
SchemaInfo::~SchemaInfo() {}
10 changes: 7 additions & 3 deletions src/SchemaInfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,13 @@
#ifndef SCHEMA_INFO_H
#define SCHEMA_INFO_H

#include <map>
#include <memory>
#include <napi.h>
#include <pulsar/c/producer_configuration.h>
#include <pulsar/Schema.h>
#include <pulsar/c/consumer_configuration.h>
#include <pulsar/c/producer_configuration.h>
#include <string>

class SchemaInfo {
public:
Expand All @@ -32,10 +36,10 @@ class SchemaInfo {
void SetConsumerSchema(std::shared_ptr<pulsar_consumer_configuration_t> cConsumerConfiguration);

private:
pulsar_schema_type cSchemaType;
pulsar::SchemaType schemaType;
std::string name;
std::string schema;
pulsar_string_map_t *cProperties;
std::map<std::string, std::string> properties;
};

#endif
Loading
Loading