From 78fd64456c991ee202effbe854b6fa18e05b571a Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Wed, 29 Apr 2026 22:09:47 +0800 Subject: [PATCH 1/6] Fix DateTime field not being encoded as a numeric timestamp in JSON schema ## Motivation Currently, the `DateTime` field is encoded as a string in the JSON schema, which is inconsistent with the type defined in Avro: ``` "CreatedDate": "2026-04-20T06:30:03.071518Z" ``` Instead, JSON schema payloads should align with the generated Avro logical timestamp schema, rather than representing `DateTime` fields as ISO-formatted strings. ## Modification * Add a `DateTime` JSON converter that outputs Unix epoch timestamps in milliseconds * Update JSON schema decoding to support both numeric timestamps and legacy ISO string formats --- src/Pulsar.Client/Schema/JsonSchema.fs | 19 ++++++++++ tests/UnitTests/Internal/SchemaTests.fs | 46 +++++++++++++++++++++++++ 2 files changed, 65 insertions(+) diff --git a/src/Pulsar.Client/Schema/JsonSchema.fs b/src/Pulsar.Client/Schema/JsonSchema.fs index c6028cde..79ae0fa1 100644 --- a/src/Pulsar.Client/Schema/JsonSchema.fs +++ b/src/Pulsar.Client/Schema/JsonSchema.fs @@ -5,15 +5,34 @@ open System.Collections.Generic open System.Dynamic open System.Text open System.Text.Json +open System.Text.Json.Serialization open Avro open Pulsar.Client.Api open AvroSchemaGenerator open Pulsar.Client.Common +type internal DateTimeTimestampMillisConverter() = + inherit JsonConverter() + override this.Read(reader: byref, _, _) = + match reader.TokenType with + | JsonTokenType.Number -> + reader.GetInt64() + |> DateTimeOffset.FromUnixTimeMilliseconds + |> _.UtcDateTime + | JsonTokenType.String -> + reader.GetDateTime() + | _ -> + raise <| JsonException $"Unexpected token parsing DateTime. Expected Number or String, got {reader.TokenType}." + + override this.Write(writer: Utf8JsonWriter, value: DateTime, _) = + DateTimeOffset(value).ToUnixTimeMilliseconds() + |> writer.WriteNumberValue + type internal JsonSchema<'T> () = inherit ISchema<'T>() let parameterIsClass = typeof<'T>.IsClass let options = JsonSerializerOptions(IgnoreNullValues = true) + do options.Converters.Add(DateTimeTimestampMillisConverter()) let stringSchema = typeof<'T>.GetSchema() override this.SchemaInfo = { Name = ""; Type = SchemaType.JSON; Schema = stringSchema |> Encoding.UTF8.GetBytes; Properties = Map.empty } override this.Encode value = diff --git a/tests/UnitTests/Internal/SchemaTests.fs b/tests/UnitTests/Internal/SchemaTests.fs index 93caa2b3..12007736 100644 --- a/tests/UnitTests/Internal/SchemaTests.fs +++ b/tests/UnitTests/Internal/SchemaTests.fs @@ -3,6 +3,8 @@ module Pulsar.Client.UnitTests.Internal.SchemaTests open System open System.Collections.Generic open System.Diagnostics +open System.Text +open System.Text.Json open AvroGenerated open Expecto open Expecto.Flip @@ -26,6 +28,9 @@ type ProtobufSchemaTest = { [] type AvroSchemaTest = { X: string; Y: ResizeArray } +[] +type DateTimeSchemaTest = { OccurredAt: DateTime } + [] [] type ProtobufNativeSchemaTest = { @@ -193,6 +198,47 @@ let tests = Expect.equal "" input.X output.X Expect.sequenceEqual "" input.Y output.Y } + + test "JSON schema DateTime encoding writes numeric timestamp value" { + let schema = Schema.JSON() + let input = { OccurredAt = DateTime(2026, 4, 20, 6, 30, 3, DateTimeKind.Utc).AddTicks(715180L) } + let payload = schema.Encode(input) + use doc = JsonDocument.Parse(payload) + + Expect.equal "" JsonValueKind.Number (doc.RootElement.GetProperty("OccurredAt").ValueKind) + } + + test "JSON schema DateTime decoding accepts numeric timestamp value" { + let schema = Schema.JSON() + let expected = DateTimeOffset.FromUnixTimeMilliseconds(1776666603071L).UtcDateTime + let payload = Encoding.UTF8.GetBytes("""{"OccurredAt":1776666603071}""") + + let output = schema.Decode(payload) + + Expect.equal "" expected output.OccurredAt + } + + test "JSON schema DateTime decoding accepts legacy string timestamp value" { + let schema = Schema.JSON() + let expected = DateTime(2026, 4, 20, 6, 30, 3, DateTimeKind.Utc).AddTicks(715180L) + let payload = Encoding.UTF8.GetBytes("""{"OccurredAt":"2026-04-20T06:30:03.071518Z"}""") + + let output = schema.Decode(payload) + + Expect.equal "" expected output.OccurredAt + } + + test "Avro schema DateTime encoding writes numeric timestamp value" { + let schema = Schema.AVRO() + let input = { OccurredAt = DateTime(2026, 4, 20, 6, 30, 3, DateTimeKind.Utc).AddTicks(715180L) } + use stream = new IO.MemoryStream(schema.Encode(input)) + let decoder = Avro.IO.BinaryDecoder(stream) + let unionBranch = decoder.ReadLong() + let timestamp = decoder.ReadLong() + + Expect.equal "" 1L unionBranch + Expect.equal "" (DateTimeOffset(input.OccurredAt).ToUnixTimeMilliseconds()) timestamp + } test "Protobuf native" { let inputs = [{ ProtobufNativeSchemaTest.foo = "X1"; bar = 1.0; time = DateTime.Now}] From d09790fee1682933b6dad7f5f48fb3774c469d0e Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Wed, 29 Apr 2026 22:29:09 +0800 Subject: [PATCH 2/6] JSON schema DateTime encoding treats non-UTC values as UTC --- src/Pulsar.Client/Schema/JsonSchema.fs | 11 ++++++++++- tests/UnitTests/Internal/SchemaTests.fs | 19 +++++++++++++++++++ 2 files changed, 29 insertions(+), 1 deletion(-) diff --git a/src/Pulsar.Client/Schema/JsonSchema.fs b/src/Pulsar.Client/Schema/JsonSchema.fs index 79ae0fa1..67e57573 100644 --- a/src/Pulsar.Client/Schema/JsonSchema.fs +++ b/src/Pulsar.Client/Schema/JsonSchema.fs @@ -13,6 +13,12 @@ open Pulsar.Client.Common type internal DateTimeTimestampMillisConverter() = inherit JsonConverter() + let normalizeToUtc (value: DateTime) = + if value.Kind = DateTimeKind.Utc then + value + else + DateTime.SpecifyKind(value, DateTimeKind.Utc) + override this.Read(reader: byref, _, _) = match reader.TokenType with | JsonTokenType.Number -> @@ -25,7 +31,10 @@ type internal DateTimeTimestampMillisConverter() = raise <| JsonException $"Unexpected token parsing DateTime. Expected Number or String, got {reader.TokenType}." override this.Write(writer: Utf8JsonWriter, value: DateTime, _) = - DateTimeOffset(value).ToUnixTimeMilliseconds() + value + |> normalizeToUtc + |> DateTimeOffset + |> _.ToUnixTimeMilliseconds() |> writer.WriteNumberValue type internal JsonSchema<'T> () = diff --git a/tests/UnitTests/Internal/SchemaTests.fs b/tests/UnitTests/Internal/SchemaTests.fs index 12007736..518cca9d 100644 --- a/tests/UnitTests/Internal/SchemaTests.fs +++ b/tests/UnitTests/Internal/SchemaTests.fs @@ -208,6 +208,25 @@ let tests = Expect.equal "" JsonValueKind.Number (doc.RootElement.GetProperty("OccurredAt").ValueKind) } + test "JSON schema DateTime encoding treats non-UTC values as UTC" { + let schema = Schema.JSON() + let inputs = [ + DateTime(2026, 4, 20, 6, 30, 3, DateTimeKind.Local).AddTicks(715180L) + DateTime(2026, 4, 20, 6, 30, 3, DateTimeKind.Unspecified).AddTicks(715180L) + ] + + for input in inputs do + let payload = schema.Encode({ OccurredAt = input }) + use doc = JsonDocument.Parse(payload) + let timestamp = doc.RootElement.GetProperty("OccurredAt").GetInt64() + let expected = + DateTime.SpecifyKind(input, DateTimeKind.Utc) + |> DateTimeOffset + |> _.ToUnixTimeMilliseconds() + + Expect.equal "" expected timestamp + } + test "JSON schema DateTime decoding accepts numeric timestamp value" { let schema = Schema.JSON() let expected = DateTimeOffset.FromUnixTimeMilliseconds(1776666603071L).UtcDateTime From 995100f316cb7a2d8fef5edca5167b5ecb994685 Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Wed, 29 Apr 2026 23:27:40 +0800 Subject: [PATCH 3/6] Fix JSON schema integration DateTime precision --- tests/IntegrationTests/Schema.fs | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/tests/IntegrationTests/Schema.fs b/tests/IntegrationTests/Schema.fs index 19f2b4e0..fbb70c14 100644 --- a/tests/IntegrationTests/Schema.fs +++ b/tests/IntegrationTests/Schema.fs @@ -111,7 +111,13 @@ let tests = .SubscriptionName("test-subscription") .SubscribeAsync() - let input = { SimpleRecord3.Name = "abc"; Age = 20; Date = DateTime.UtcNow } + // JSON schema encodes DateTime as timestamp-millis, so keep the + // expected value at the same precision used by the wire payload. + let inputDate = + DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() + |> DateTimeOffset.FromUnixTimeMilliseconds + |> _.UtcDateTime + let input = { SimpleRecord3.Name = "abc"; Age = 20; Date = inputDate } let! _ = producer.SendAsync(input) let! (msg : Message) = consumer.ReceiveAsync() @@ -556,4 +562,4 @@ let tests = Log.Debug("Finished Auto consume with multi-version schema") } ] - \ No newline at end of file + From e49cd891908072a3a441e42d8ed9b4e5d5ea7c23 Mon Sep 17 00:00:00 2001 From: Vladimir Shchur Date: Fri, 1 May 2026 16:20:29 -0700 Subject: [PATCH 4/6] Simplified conversion to UTC --- src/Pulsar.Client/Schema/JsonSchema.fs | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/src/Pulsar.Client/Schema/JsonSchema.fs b/src/Pulsar.Client/Schema/JsonSchema.fs index 67e57573..a6632ce5 100644 --- a/src/Pulsar.Client/Schema/JsonSchema.fs +++ b/src/Pulsar.Client/Schema/JsonSchema.fs @@ -13,26 +13,20 @@ open Pulsar.Client.Common type internal DateTimeTimestampMillisConverter() = inherit JsonConverter() - let normalizeToUtc (value: DateTime) = - if value.Kind = DateTimeKind.Utc then - value - else - DateTime.SpecifyKind(value, DateTimeKind.Utc) override this.Read(reader: byref, _, _) = match reader.TokenType with - | JsonTokenType.Number -> + | JsonTokenType.Number -> // timestamp in milliseconds since Unix epoch reader.GetInt64() |> DateTimeOffset.FromUnixTimeMilliseconds |> _.UtcDateTime - | JsonTokenType.String -> + | JsonTokenType.String -> // ISO 8601 string reader.GetDateTime() | _ -> raise <| JsonException $"Unexpected token parsing DateTime. Expected Number or String, got {reader.TokenType}." override this.Write(writer: Utf8JsonWriter, value: DateTime, _) = - value - |> normalizeToUtc + value.ToUniversalTime() |> DateTimeOffset |> _.ToUnixTimeMilliseconds() |> writer.WriteNumberValue From b193f170c7c0f850e957f92871bc3d27808ee49d Mon Sep 17 00:00:00 2001 From: Vladimir Shchur Date: Fri, 1 May 2026 16:38:10 -0700 Subject: [PATCH 5/6] Fixed failing local datetime tests --- src/Pulsar.Client/Pulsar.Client.fsproj | 6 +++--- src/Pulsar.Client/Schema/JsonSchema.fs | 17 ++++++++--------- tests/UnitTests/Internal/SchemaTests.fs | 4 ++-- 3 files changed, 13 insertions(+), 14 deletions(-) diff --git a/src/Pulsar.Client/Pulsar.Client.fsproj b/src/Pulsar.Client/Pulsar.Client.fsproj index 582f06b4..5a09d41d 100644 --- a/src/Pulsar.Client/Pulsar.Client.fsproj +++ b/src/Pulsar.Client/Pulsar.Client.fsproj @@ -7,17 +7,17 @@ Pulsar.Client Pulsar.Client Pulsar.Client - 3.15.1 + 3.15.2 F# community .NET client library for Apache Pulsar https://github.com/fsprojects/pulsar-client-dotnet - Auto cluster failover refactoring + Fixed date representation for JSON schema MIT https://github.com/fsprojects/pulsar-client-dotnet git Apache;Pulsar;F#;FSharp Vladimir Shchur and contributors - 3.15.1 + 3.15.2 portable true README.md diff --git a/src/Pulsar.Client/Schema/JsonSchema.fs b/src/Pulsar.Client/Schema/JsonSchema.fs index a6632ce5..bcdcbb6f 100644 --- a/src/Pulsar.Client/Schema/JsonSchema.fs +++ b/src/Pulsar.Client/Schema/JsonSchema.fs @@ -2,7 +2,6 @@ namespace Pulsar.Client.Schema open System open System.Collections.Generic -open System.Dynamic open System.Text open System.Text.Json open System.Text.Json.Serialization @@ -34,7 +33,7 @@ type internal DateTimeTimestampMillisConverter() = type internal JsonSchema<'T> () = inherit ISchema<'T>() let parameterIsClass = typeof<'T>.IsClass - let options = JsonSerializerOptions(IgnoreNullValues = true) + let options = JsonSerializerOptions(DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull) do options.Converters.Add(DateTimeTimestampMillisConverter()) let stringSchema = typeof<'T>.GetSchema() override this.SchemaInfo = { Name = ""; Type = SchemaType.JSON; Schema = stringSchema |> Encoding.UTF8.GetBytes; Properties = Map.empty } @@ -47,7 +46,7 @@ type internal JsonSchema<'T> () = type internal GenericJsonSchema (topicSchema: TopicSchema) = inherit ISchema() - let dynamicSerializerOptions = JsonSerializerOptions(IgnoreNullValues = true) + let dynamicSerializerOptions = JsonSerializerOptions(DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull) do dynamicSerializerOptions.Converters.Add <| DynamicJsonConverter() let stringSchema = topicSchema.SchemaInfo.Schema |> Encoding.UTF8.GetString let avroSchema = Schema.Parse(stringSchema) :?> RecordSchema @@ -63,10 +62,10 @@ type internal GenericJsonSchema (topicSchema: TopicSchema) = let doc = JsonSerializer.Deserialize>(ReadOnlySpan bytes, dynamicSerializerOptions) let fields = schemaFields - |> Seq.map (fun sf -> { Name = sf.Name; Value = doc.[sf.Name]; Index = sf.Pos }) + |> Seq.map (fun sf -> { Name = sf.Name; Value = doc[sf.Name]; Index = sf.Pos }) |> Seq.toArray - let scemaVersionBytes = - topicSchema.SchemaVersion - |> Option.map _.Bytes - |> Option.toObj - GenericRecord(scemaVersionBytes, fields) \ No newline at end of file + let schemaVersionBytes = + match topicSchema.SchemaVersion with + | Some v -> v.Bytes + | None -> null + GenericRecord(schemaVersionBytes, fields) diff --git a/tests/UnitTests/Internal/SchemaTests.fs b/tests/UnitTests/Internal/SchemaTests.fs index 518cca9d..24502a50 100644 --- a/tests/UnitTests/Internal/SchemaTests.fs +++ b/tests/UnitTests/Internal/SchemaTests.fs @@ -208,7 +208,7 @@ let tests = Expect.equal "" JsonValueKind.Number (doc.RootElement.GetProperty("OccurredAt").ValueKind) } - test "JSON schema DateTime encoding treats non-UTC values as UTC" { + test "JSON schema DateTime encoding works for local and unspecified DateTime values" { let schema = Schema.JSON() let inputs = [ DateTime(2026, 4, 20, 6, 30, 3, DateTimeKind.Local).AddTicks(715180L) @@ -220,7 +220,7 @@ let tests = use doc = JsonDocument.Parse(payload) let timestamp = doc.RootElement.GetProperty("OccurredAt").GetInt64() let expected = - DateTime.SpecifyKind(input, DateTimeKind.Utc) + input |> DateTimeOffset |> _.ToUnixTimeMilliseconds() From fac54a85c75fcd2a03e16e5539a14d376422bb10 Mon Sep 17 00:00:00 2001 From: Vladimir Shchur Date: Fri, 1 May 2026 16:50:07 -0700 Subject: [PATCH 6/6] Review fixes --- tests/UnitTests/Internal/SchemaTests.fs | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/tests/UnitTests/Internal/SchemaTests.fs b/tests/UnitTests/Internal/SchemaTests.fs index 24502a50..64ae75ff 100644 --- a/tests/UnitTests/Internal/SchemaTests.fs +++ b/tests/UnitTests/Internal/SchemaTests.fs @@ -199,31 +199,25 @@ let tests = Expect.sequenceEqual "" input.Y output.Y } - test "JSON schema DateTime encoding writes numeric timestamp value" { - let schema = Schema.JSON() - let input = { OccurredAt = DateTime(2026, 4, 20, 6, 30, 3, DateTimeKind.Utc).AddTicks(715180L) } - let payload = schema.Encode(input) - use doc = JsonDocument.Parse(payload) - - Expect.equal "" JsonValueKind.Number (doc.RootElement.GetProperty("OccurredAt").ValueKind) - } - - test "JSON schema DateTime encoding works for local and unspecified DateTime values" { + test "JSON schema DateTime encoding works for all kinds of DateTime values" { let schema = Schema.JSON() let inputs = [ DateTime(2026, 4, 20, 6, 30, 3, DateTimeKind.Local).AddTicks(715180L) DateTime(2026, 4, 20, 6, 30, 3, DateTimeKind.Unspecified).AddTicks(715180L) + DateTime(2026, 4, 20, 6, 30, 3, DateTimeKind.Utc).AddTicks(715180L) ] for input in inputs do let payload = schema.Encode({ OccurredAt = input }) use doc = JsonDocument.Parse(payload) - let timestamp = doc.RootElement.GetProperty("OccurredAt").GetInt64() + let occuredAtProperty = doc.RootElement.GetProperty("OccurredAt") + Expect.equal "" JsonValueKind.Number occuredAtProperty.ValueKind + + let timestamp = occuredAtProperty.GetInt64() let expected = input |> DateTimeOffset |> _.ToUnixTimeMilliseconds() - Expect.equal "" expected timestamp }