diff --git a/helper/impls/helper-impl-14/src/main/java/com/linkedin/avroutil1/compatibility/avro14/backports/GenericDatumReaderExt.java b/helper/impls/helper-impl-14/src/main/java/com/linkedin/avroutil1/compatibility/avro14/backports/GenericDatumReaderExt.java index 207939223..fd18eaaa4 100644 --- a/helper/impls/helper-impl-14/src/main/java/com/linkedin/avroutil1/compatibility/avro14/backports/GenericDatumReaderExt.java +++ b/helper/impls/helper-impl-14/src/main/java/com/linkedin/avroutil1/compatibility/avro14/backports/GenericDatumReaderExt.java @@ -16,8 +16,8 @@ public class GenericDatumReaderExt extends GenericDatumReader { - private final Schema writer; - private final Schema reader; + private Schema writer; + private Schema reader; public GenericDatumReaderExt(Schema writer, Schema reader) { super(writer, reader); @@ -25,6 +25,9 @@ public GenericDatumReaderExt(Schema writer, Schema reader) { this.reader = reader; } + /** + * {@inheritDoc} + */ @SuppressWarnings("unchecked") @Override public T read(T reuse, Decoder in) throws IOException { @@ -35,6 +38,27 @@ public T read(T reuse, Decoder in) throws IOException { return result; } + /** + * {@inheritDoc} + */ + @Override + public void setSchema(Schema writer) { + super.setSchema(writer); + this.writer = writer; + if (this.reader == null) { + this.reader = writer; + } + } + + /** + * {@inheritDoc} + */ + @Override + public void setExpected(Schema reader) throws IOException { + super.setExpected(reader); + this.reader = reader; + } + private Object read(Object old, Schema expected, CachedResolvingDecoder in) throws IOException { switch (expected.getType()) { diff --git a/helper/impls/helper-impl-14/src/main/java/com/linkedin/avroutil1/compatibility/avro14/backports/SpecificDatumReaderExt.java b/helper/impls/helper-impl-14/src/main/java/com/linkedin/avroutil1/compatibility/avro14/backports/SpecificDatumReaderExt.java index 158440d37..9a986aac7 100644 --- a/helper/impls/helper-impl-14/src/main/java/com/linkedin/avroutil1/compatibility/avro14/backports/SpecificDatumReaderExt.java +++ b/helper/impls/helper-impl-14/src/main/java/com/linkedin/avroutil1/compatibility/avro14/backports/SpecificDatumReaderExt.java @@ -16,8 +16,8 @@ public class SpecificDatumReaderExt extends SpecificDatumReader { - private final Schema writer; - private final Schema reader; + private Schema writer; + private Schema reader; public SpecificDatumReaderExt(Schema writer, Schema reader) { super(writer, reader); @@ -25,6 +25,30 @@ public SpecificDatumReaderExt(Schema writer, Schema reader) { this.reader = reader; } + /** + * {@inheritDoc} + */ + @Override + public void setSchema(Schema writer) { + super.setSchema(writer); + this.writer = writer; + if (this.reader == null) { + this.reader = writer; + } + } + + /** + * {@inheritDoc} + */ + @Override + public void setExpected(Schema reader) throws IOException { + super.setExpected(reader); + this.reader = reader; + } + + /** + * {@inheritDoc} + */ @SuppressWarnings("unchecked") @Override public T read(T reuse, Decoder in) throws IOException { diff --git a/helper/impls/helper-impl-14/src/main/java/com/linkedin/avroutil1/compatibility/avro14/parsing/ResolvingGrammarGenerator.java b/helper/impls/helper-impl-14/src/main/java/com/linkedin/avroutil1/compatibility/avro14/parsing/ResolvingGrammarGenerator.java index 9e29926d0..479d527f7 100644 --- a/helper/impls/helper-impl-14/src/main/java/com/linkedin/avroutil1/compatibility/avro14/parsing/ResolvingGrammarGenerator.java +++ b/helper/impls/helper-impl-14/src/main/java/com/linkedin/avroutil1/compatibility/avro14/parsing/ResolvingGrammarGenerator.java @@ -164,13 +164,11 @@ public Symbol generate( break; case NULL: case BOOLEAN: + break; case INT: - switch (writerType) { - case LONG: - return Symbol.IntLongAdjustAction.INSTANCE; - default: - break; - } + if (writerType == Schema.Type.LONG) { + return Symbol.IntLongAdjustAction.INSTANCE; + } case STRING: case FLOAT: case BYTES: diff --git a/helper/impls/helper-impl-15/src/main/java/com/linkedin/avroutil1/compatibility/avro15/backports/GenericDatumReaderExt.java b/helper/impls/helper-impl-15/src/main/java/com/linkedin/avroutil1/compatibility/avro15/backports/GenericDatumReaderExt.java index cf6b394ba..bfb89434f 100644 --- a/helper/impls/helper-impl-15/src/main/java/com/linkedin/avroutil1/compatibility/avro15/backports/GenericDatumReaderExt.java +++ b/helper/impls/helper-impl-15/src/main/java/com/linkedin/avroutil1/compatibility/avro15/backports/GenericDatumReaderExt.java @@ -6,9 +6,14 @@ package com.linkedin.avroutil1.compatibility.avro15.backports; +import com.linkedin.avroutil1.compatibility.avro15.codec.CachedResolvingDecoder; +import org.apache.avro.AvroRuntimeException; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.io.Decoder; + +import java.io.IOException; /** @@ -18,7 +23,132 @@ */ public class GenericDatumReaderExt extends GenericDatumReader { + private Schema writer; + private Schema reader; + public GenericDatumReaderExt(Schema writer, Schema reader, GenericData genericData) { super(writer, reader, genericData); + this.writer = writer; + this.reader = reader; + } + + /** + * {@inheritDoc} + */ + @Override + public void setExpected(Schema reader) throws IOException { + super.setExpected(reader); + this.reader = reader; + } + + /** + * {@inheritDoc} + */ + @Override + public void setSchema(Schema writer) { + super.setSchema(writer); + this.writer = writer; + if (reader == null) { + this.reader = writer; + } + } + + /** + * {@inheritDoc} + */ + @SuppressWarnings("unchecked") + @Override + public T read(T reuse, Decoder in) throws IOException { + CachedResolvingDecoder resolver = new CachedResolvingDecoder(writer, reader, in); + resolver.init(in); + T result = (T) read(reuse, reader, resolver); + resolver.drain(); + return result; + } + + private Object read(Object old, Schema expected, + CachedResolvingDecoder in) throws IOException { + switch (expected.getType()) { + case RECORD: + return readRecord(old, expected, in); + case ENUM: + return readEnum(expected, in); + case ARRAY: + return readArray(old, expected, in); + case MAP: + return readMap(old, expected, in); + case UNION: + return read(old, expected.getTypes().get(in.readIndex()), in); + case FIXED: + return readFixed(old, expected, in); + case STRING: + return readString(old, expected, in); + case BYTES: + return readBytes(old, in); + case INT: + return readInt(old, expected, in); + case LONG: + return in.readLong(); + case FLOAT: + return in.readFloat(); + case DOUBLE: + return in.readDouble(); + case BOOLEAN: + return in.readBoolean(); + case NULL: + in.readNull(); + return null; + default: + throw new AvroRuntimeException("Unknown type: " + expected); + } + } + + private Object readRecord(Object old, Schema expected, + CachedResolvingDecoder in) throws IOException { + Object record = newRecord(old, expected); + final GenericData data = getData(); + + for (Schema.Field f : in.readFieldOrder()) { + int pos = f.pos(); + String name = f.name(); + Object oldDatum = (old != null) ? data.getField(record, name, pos) : null; + data.setField(record, name, pos, read(oldDatum, f.schema(), in)); + } + + return record; + } + + private Object readArray(Object old, Schema expected, + CachedResolvingDecoder in) throws IOException { + Schema expectedType = expected.getElementType(); + long l = in.readArrayStart(); + long base = 0; + if (l > 0) { + Object array = newArray(old, (int) l, expected); + do { + for (long i = 0; i < l; i++) { + addToArray(array, base + i, read(peekArray(array), expectedType, in)); + } + base += l; + } while ((l = in.arrayNext()) > 0); + return array; + } else { + return newArray(old, 0, expected); + } + } + + private Object readMap(Object old, Schema expected, + CachedResolvingDecoder in) throws IOException { + Schema eValue = expected.getValueType(); + long l = in.readMapStart(); + Object map = newMap(old, (int) l); + if (l > 0) { + do { + for (int i = 0; i < l; i++) { + addToMap(map, readString(null, in), read(null, eValue, in)); + } + } while ((l = in.mapNext()) > 0); + } + return map; } } diff --git a/helper/impls/helper-impl-15/src/main/java/com/linkedin/avroutil1/compatibility/avro15/backports/SpecificDatumReaderExt.java b/helper/impls/helper-impl-15/src/main/java/com/linkedin/avroutil1/compatibility/avro15/backports/SpecificDatumReaderExt.java index 29d17ca15..1f487e4dc 100644 --- a/helper/impls/helper-impl-15/src/main/java/com/linkedin/avroutil1/compatibility/avro15/backports/SpecificDatumReaderExt.java +++ b/helper/impls/helper-impl-15/src/main/java/com/linkedin/avroutil1/compatibility/avro15/backports/SpecificDatumReaderExt.java @@ -6,10 +6,16 @@ package com.linkedin.avroutil1.compatibility.avro15.backports; +import com.linkedin.avroutil1.compatibility.avro15.codec.CachedResolvingDecoder; +import org.apache.avro.AvroRuntimeException; import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.io.Decoder; import org.apache.avro.specific.SpecificData; import org.apache.avro.specific.SpecificDatumReader; +import java.io.IOException; + /** * this class allows constructing a {@link SpecificDatumReader} with @@ -17,7 +23,132 @@ * @param */ public class SpecificDatumReaderExt extends SpecificDatumReader { + private Schema writer; + private Schema reader; + public SpecificDatumReaderExt(Schema writer, Schema reader, SpecificData specificData) { super(writer, reader, specificData); + this.writer = writer; + this.reader = reader; + } + + /** + * {@inheritDoc} + */ + @Override + public void setExpected(Schema reader) throws IOException { + super.setExpected(reader); + this.reader = reader; + } + + /** + * {@inheritDoc} + */ + @Override + public void setSchema(Schema writer) { + super.setSchema(writer); + this.writer = writer; + if (reader == null) { + this.reader = writer; + } + } + + /** + * {@inheritDoc} + */ + @SuppressWarnings("unchecked") + @Override + public T read(T reuse, Decoder in) throws IOException { + CachedResolvingDecoder resolver = new CachedResolvingDecoder(writer, reader, in); + resolver.init(in); + T result = (T) read(reuse, reader, resolver); + resolver.drain(); + return result; + } + + private Object read(Object old, Schema expected, + CachedResolvingDecoder in) throws IOException { + switch (expected.getType()) { + case RECORD: + return readRecord(old, expected, in); + case ENUM: + return readEnum(expected, in); + case ARRAY: + return readArray(old, expected, in); + case MAP: + return readMap(old, expected, in); + case UNION: + return read(old, expected.getTypes().get(in.readIndex()), in); + case FIXED: + return readFixed(old, expected, in); + case STRING: + return readString(old, expected, in); + case BYTES: + return readBytes(old, in); + case INT: + return readInt(old, expected, in); + case LONG: + return in.readLong(); + case FLOAT: + return in.readFloat(); + case DOUBLE: + return in.readDouble(); + case BOOLEAN: + return in.readBoolean(); + case NULL: + in.readNull(); + return null; + default: + throw new AvroRuntimeException("Unknown type: " + expected); + } + } + + private Object readRecord(Object old, Schema expected, + CachedResolvingDecoder in) throws IOException { + Object record = newRecord(old, expected); + final GenericData data = getData(); + + for (Schema.Field f : in.readFieldOrder()) { + int pos = f.pos(); + String name = f.name(); + Object oldDatum = (old != null) ? data.getField(record, name, pos) : null; + data.setField(record, name, pos, read(oldDatum, f.schema(), in)); + } + + return record; + } + + private Object readArray(Object old, Schema expected, + CachedResolvingDecoder in) throws IOException { + Schema expectedType = expected.getElementType(); + long l = in.readArrayStart(); + long base = 0; + if (l > 0) { + Object array = newArray(old, (int) l, expected); + do { + for (long i = 0; i < l; i++) { + addToArray(array, base + i, read(peekArray(array), expectedType, in)); + } + base += l; + } while ((l = in.arrayNext()) > 0); + return array; + } else { + return newArray(old, 0, expected); + } + } + + private Object readMap(Object old, Schema expected, + CachedResolvingDecoder in) throws IOException { + Schema eValue = expected.getValueType(); + long l = in.readMapStart(); + Object map = newMap(old, (int) l); + if (l > 0) { + do { + for (int i = 0; i < l; i++) { + addToMap(map, readString(null, in), read(null, eValue, in)); + } + } while ((l = in.mapNext()) > 0); + } + return map; } } diff --git a/helper/impls/helper-impl-15/src/main/java/com/linkedin/avroutil1/compatibility/avro15/codec/ResolvingDecoder.java b/helper/impls/helper-impl-15/src/main/java/com/linkedin/avroutil1/compatibility/avro15/codec/ResolvingDecoder.java index 29c105648..0243b0ced 100644 --- a/helper/impls/helper-impl-15/src/main/java/com/linkedin/avroutil1/compatibility/avro15/codec/ResolvingDecoder.java +++ b/helper/impls/helper-impl-15/src/main/java/com/linkedin/avroutil1/compatibility/avro15/codec/ResolvingDecoder.java @@ -156,6 +156,23 @@ public final void drain() throws IOException { parser.processImplicitActions(); } + @Override + public int readInt() throws IOException { + Symbol actual = parser.popSymbol(); + if (actual == Symbol.INT) { + return in.readInt(); + } else if (actual == Symbol.IntLongAdjustAction.INSTANCE) { + long value = in.readLong(); + if (value < Integer.MIN_VALUE || value > Integer.MAX_VALUE) { + throw new AvroTypeException(value + " cannot be represented as int"); + } + + return (int) value; + } + + throw new AvroTypeException("Expected int but found " + actual); + } + @Override public long readLong() throws IOException { Symbol actual = parser.advance(Symbol.LONG); @@ -245,6 +262,9 @@ public Symbol doAction(Symbol input, Symbol top) throws IOException { .binaryDecoder(dsa.contents, null); } else if (top == Symbol.DEFAULT_END_ACTION) { in = backup; + } else if (top == Symbol.IntLongAdjustAction.INSTANCE) { + parser.pushSymbol(Symbol.INT); + return Symbol.INT; } else { throw new AvroTypeException("Unknown action: " + top); } diff --git a/helper/impls/helper-impl-15/src/main/java/com/linkedin/avroutil1/compatibility/avro15/parsing/ResolvingGrammarGenerator.java b/helper/impls/helper-impl-15/src/main/java/com/linkedin/avroutil1/compatibility/avro15/parsing/ResolvingGrammarGenerator.java index 1989baf4a..4c2f09597 100644 --- a/helper/impls/helper-impl-15/src/main/java/com/linkedin/avroutil1/compatibility/avro15/parsing/ResolvingGrammarGenerator.java +++ b/helper/impls/helper-impl-15/src/main/java/com/linkedin/avroutil1/compatibility/avro15/parsing/ResolvingGrammarGenerator.java @@ -33,8 +33,8 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.Arrays; -import java.util.HashMap; import java.util.Iterator; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -170,7 +170,11 @@ public Symbol generate( break; case NULL: case BOOLEAN: + break; case INT: + if (writerType == Schema.Type.LONG) { + return Symbol.IntLongAdjustAction.INSTANCE; + } case STRING: case BYTES: case ENUM: @@ -461,6 +465,11 @@ private static int bestBranch(Schema r, Schema w) { } break; case LONG: + // We can selectively demote long and check that it fits inside int range + // when reading an int from a long writer schema. + if (b.getType() == Schema.Type.INT) { + return j; + } case FLOAT: switch (b.getType()) { case DOUBLE: diff --git a/helper/impls/helper-impl-15/src/main/java/com/linkedin/avroutil1/compatibility/avro15/parsing/Symbol.java b/helper/impls/helper-impl-15/src/main/java/com/linkedin/avroutil1/compatibility/avro15/parsing/Symbol.java index f51addcc6..7b10e1871 100644 --- a/helper/impls/helper-impl-15/src/main/java/com/linkedin/avroutil1/compatibility/avro15/parsing/Symbol.java +++ b/helper/impls/helper-impl-15/src/main/java/com/linkedin/avroutil1/compatibility/avro15/parsing/Symbol.java @@ -481,6 +481,10 @@ public FieldAdjustAction(int rindex, String fname) { } } + public static class IntLongAdjustAction extends ImplicitAction { + public static final IntLongAdjustAction INSTANCE = new IntLongAdjustAction(); + } + public static final class FieldOrderAction extends ImplicitAction { public final Schema.Field[] fields; public FieldOrderAction(Schema.Field[] fields) { diff --git a/helper/impls/helper-impl-16/src/main/java/com/linkedin/avroutil1/compatibility/avro16/Avro16Adapter.java b/helper/impls/helper-impl-16/src/main/java/com/linkedin/avroutil1/compatibility/avro16/Avro16Adapter.java index f80dac16e..5cb932fde 100644 --- a/helper/impls/helper-impl-16/src/main/java/com/linkedin/avroutil1/compatibility/avro16/Avro16Adapter.java +++ b/helper/impls/helper-impl-16/src/main/java/com/linkedin/avroutil1/compatibility/avro16/Avro16Adapter.java @@ -27,6 +27,7 @@ import com.linkedin.avroutil1.compatibility.avro16.backports.Avro16DefaultValuesCache; import com.linkedin.avroutil1.compatibility.avro16.backports.GenericDatumReaderExt; import com.linkedin.avroutil1.compatibility.avro16.backports.GenericDatumWriterExt; +import com.linkedin.avroutil1.compatibility.avro16.backports.SpecificDatumReaderExt; import com.linkedin.avroutil1.compatibility.avro16.backports.SpecificDatumWriterExt; import com.linkedin.avroutil1.compatibility.avro16.codec.AliasAwareSpecificDatumReader; import com.linkedin.avroutil1.compatibility.avro16.codec.BoundedMemoryDecoder; @@ -245,7 +246,7 @@ public DatumWriter newSpecificDatumWriter(Schema writer, SpecificData specifi @Override public DatumReader newSpecificDatumReader(Schema writer, Schema reader, SpecificData specificData) { - return new SpecificDatumReader<>(writer, reader, specificData); + return new SpecificDatumReaderExt<>(writer, reader, specificData); } @Override diff --git a/helper/impls/helper-impl-16/src/main/java/com/linkedin/avroutil1/compatibility/avro16/backports/GenericDatumReaderExt.java b/helper/impls/helper-impl-16/src/main/java/com/linkedin/avroutil1/compatibility/avro16/backports/GenericDatumReaderExt.java index 7d6e494bc..4f286e8cf 100644 --- a/helper/impls/helper-impl-16/src/main/java/com/linkedin/avroutil1/compatibility/avro16/backports/GenericDatumReaderExt.java +++ b/helper/impls/helper-impl-16/src/main/java/com/linkedin/avroutil1/compatibility/avro16/backports/GenericDatumReaderExt.java @@ -6,9 +6,14 @@ package com.linkedin.avroutil1.compatibility.avro16.backports; +import com.linkedin.avroutil1.compatibility.avro16.codec.CachedResolvingDecoder; +import org.apache.avro.AvroRuntimeException; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.io.Decoder; + +import java.io.IOException; /** @@ -21,4 +26,104 @@ public class GenericDatumReaderExt extends GenericDatumReader { public GenericDatumReaderExt(Schema writer, Schema reader, GenericData genericData) { super(writer, reader, genericData); } + + /** + * {@inheritDoc} + */ + @SuppressWarnings("unchecked") + @Override + public T read(T reuse, Decoder in) throws IOException { + final Schema reader = getExpected(); + CachedResolvingDecoder resolver = new CachedResolvingDecoder(getSchema(), reader, in); + resolver.configure(in); + T result = (T) read(reuse, reader, resolver); + resolver.drain(); + return result; + } + + private Object read(Object old, Schema expected, + CachedResolvingDecoder in) throws IOException { + switch (expected.getType()) { + case RECORD: + return readRecord(old, expected, in); + case ENUM: + return readEnum(expected, in); + case ARRAY: + return readArray(old, expected, in); + case MAP: + return readMap(old, expected, in); + case UNION: + return read(old, expected.getTypes().get(in.readIndex()), in); + case FIXED: + return readFixed(old, expected, in); + case STRING: + return readString(old, expected, in); + case BYTES: + return readBytes(old, in); + case INT: + return readInt(old, expected, in); + case LONG: + return in.readLong(); + case FLOAT: + return in.readFloat(); + case DOUBLE: + return in.readDouble(); + case BOOLEAN: + return in.readBoolean(); + case NULL: + in.readNull(); + return null; + default: + throw new AvroRuntimeException("Unknown type: " + expected); + } + } + + private Object readRecord(Object old, Schema expected, + CachedResolvingDecoder in) throws IOException { + final GenericData data = getData(); + Object r = data.newRecord(old, expected); + + for (Schema.Field f : in.readFieldOrder()) { + int pos = f.pos(); + String name = f.name(); + Object oldDatum = (old!=null) ? data.getField(r, name, pos) : null; + data.setField(r, name, pos, read(oldDatum, f.schema(), in)); + } + + return r; + } + + private Object readArray(Object old, Schema expected, + CachedResolvingDecoder in) throws IOException { + Schema expectedType = expected.getElementType(); + long l = in.readArrayStart(); + long base = 0; + if (l > 0) { + Object array = newArray(old, (int) l, expected); + do { + for (long i = 0; i < l; i++) { + addToArray(array, base + i, read(peekArray(array), expectedType, in)); + } + base += l; + } while ((l = in.arrayNext()) > 0); + return array; + } else { + return newArray(old, 0, expected); + } + } + + private Object readMap(Object old, Schema expected, + CachedResolvingDecoder in) throws IOException { + Schema eValue = expected.getValueType(); + long l = in.readMapStart(); + Object map = newMap(old, (int) l); + if (l > 0) { + do { + for (int i = 0; i < l; i++) { + addToMap(map, readString(null, in), read(null, eValue, in)); + } + } while ((l = in.mapNext()) > 0); + } + return map; + } } diff --git a/helper/impls/helper-impl-16/src/main/java/com/linkedin/avroutil1/compatibility/avro16/backports/SpecificDatumReaderExt.java b/helper/impls/helper-impl-16/src/main/java/com/linkedin/avroutil1/compatibility/avro16/backports/SpecificDatumReaderExt.java new file mode 100644 index 000000000..0f9be5e86 --- /dev/null +++ b/helper/impls/helper-impl-16/src/main/java/com/linkedin/avroutil1/compatibility/avro16/backports/SpecificDatumReaderExt.java @@ -0,0 +1,130 @@ +/* + * Copyright 2025 LinkedIn Corp. + * Licensed under the BSD 2-Clause License (the "License"). + * See License in the project root for license information. + */ + +package com.linkedin.avroutil1.compatibility.avro16.backports; + +import com.linkedin.avroutil1.compatibility.avro16.codec.CachedResolvingDecoder; +import org.apache.avro.AvroRuntimeException; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.io.Decoder; +import org.apache.avro.specific.SpecificData; +import org.apache.avro.specific.SpecificDatumReader; + +import java.io.IOException; + + +/** + * this class allows constructing a {@link SpecificDatumReader} with + * a specified {@link SpecificData} instance under avro 1.6 + * @param + */ +public class SpecificDatumReaderExt extends SpecificDatumReader { + + public SpecificDatumReaderExt(Schema writer, Schema reader, SpecificData specificData) { + super(writer, reader, specificData); + } + + /** + * {@inheritDoc} + */ + @SuppressWarnings("unchecked") + @Override + public T read(T reuse, Decoder in) throws IOException { + final Schema reader = getExpected(); + CachedResolvingDecoder resolver = new CachedResolvingDecoder(getSchema(), reader, in); + resolver.configure(in); + T result = (T) read(reuse, reader, resolver); + resolver.drain(); + return result; + } + + private Object read(Object old, Schema expected, + CachedResolvingDecoder in) throws IOException { + switch (expected.getType()) { + case RECORD: + return readRecord(old, expected, in); + case ENUM: + return readEnum(expected, in); + case ARRAY: + return readArray(old, expected, in); + case MAP: + return readMap(old, expected, in); + case UNION: + return read(old, expected.getTypes().get(in.readIndex()), in); + case FIXED: + return readFixed(old, expected, in); + case STRING: + return readString(old, expected, in); + case BYTES: + return readBytes(old, in); + case INT: + return readInt(old, expected, in); + case LONG: + return in.readLong(); + case FLOAT: + return in.readFloat(); + case DOUBLE: + return in.readDouble(); + case BOOLEAN: + return in.readBoolean(); + case NULL: + in.readNull(); + return null; + default: + throw new AvroRuntimeException("Unknown type: " + expected); + } + } + + private Object readRecord(Object old, Schema expected, + CachedResolvingDecoder in) throws IOException { + final GenericData data = getData(); + Object r = data.newRecord(old, expected); + + for (Schema.Field f : in.readFieldOrder()) { + int pos = f.pos(); + String name = f.name(); + Object oldDatum = (old!=null) ? data.getField(r, name, pos) : null; + data.setField(r, name, pos, read(oldDatum, f.schema(), in)); + } + + return r; + } + + private Object readArray(Object old, Schema expected, + CachedResolvingDecoder in) throws IOException { + Schema expectedType = expected.getElementType(); + long l = in.readArrayStart(); + long base = 0; + if (l > 0) { + Object array = newArray(old, (int) l, expected); + do { + for (long i = 0; i < l; i++) { + addToArray(array, base + i, read(peekArray(array), expectedType, in)); + } + base += l; + } while ((l = in.arrayNext()) > 0); + return array; + } else { + return newArray(old, 0, expected); + } + } + + private Object readMap(Object old, Schema expected, + CachedResolvingDecoder in) throws IOException { + Schema eValue = expected.getValueType(); + long l = in.readMapStart(); + Object map = newMap(old, (int) l); + if (l > 0) { + do { + for (int i = 0; i < l; i++) { + addToMap(map, readString(null, in), read(null, eValue, in)); + } + } while ((l = in.mapNext()) > 0); + } + return map; + } +} diff --git a/helper/impls/helper-impl-16/src/main/java/com/linkedin/avroutil1/compatibility/avro16/codec/ResolvingDecoder.java b/helper/impls/helper-impl-16/src/main/java/com/linkedin/avroutil1/compatibility/avro16/codec/ResolvingDecoder.java index e449e82f8..bd44d71a0 100644 --- a/helper/impls/helper-impl-16/src/main/java/com/linkedin/avroutil1/compatibility/avro16/codec/ResolvingDecoder.java +++ b/helper/impls/helper-impl-16/src/main/java/com/linkedin/avroutil1/compatibility/avro16/codec/ResolvingDecoder.java @@ -64,6 +64,23 @@ public final void drain() throws IOException { this.parser.processImplicitActions(); } + @Override + public int readInt() throws IOException { + Symbol actual = parser.popSymbol(); + if (actual == Symbol.INT) { + return in.readInt(); + } else if (actual == Symbol.IntLongAdjustAction.INSTANCE) { + long value = in.readLong(); + if (value < Integer.MIN_VALUE || value > Integer.MAX_VALUE) { + throw new AvroTypeException(value + " cannot be represented as int"); + } + + return (int) value; + } + + throw new AvroTypeException("Expected int but found " + actual); + } + public long readLong() throws IOException { Symbol actual = this.parser.advance(Symbol.LONG); if (actual == Symbol.INT) { @@ -146,6 +163,11 @@ public Symbol doAction(Symbol input, Symbol top) throws IOException { throw new AvroTypeException(((Symbol.ErrorAction)top).msg); } + if (top == Symbol.IntLongAdjustAction.INSTANCE) { + parser.pushSymbol(Symbol.INT); + return Symbol.INT; + } + if (top instanceof Symbol.DefaultStartAction) { Symbol.DefaultStartAction dsa = (Symbol.DefaultStartAction)top; this.backup = this.in; diff --git a/helper/impls/helper-impl-16/src/main/java/com/linkedin/avroutil1/compatibility/avro16/parsing/ResolvingGrammarGenerator.java b/helper/impls/helper-impl-16/src/main/java/com/linkedin/avroutil1/compatibility/avro16/parsing/ResolvingGrammarGenerator.java index 94bb22273..e2e1d168f 100644 --- a/helper/impls/helper-impl-16/src/main/java/com/linkedin/avroutil1/compatibility/avro16/parsing/ResolvingGrammarGenerator.java +++ b/helper/impls/helper-impl-16/src/main/java/com/linkedin/avroutil1/compatibility/avro16/parsing/ResolvingGrammarGenerator.java @@ -170,7 +170,11 @@ public Symbol generate( break; case NULL: case BOOLEAN: + break; case INT: + if (writerType == Schema.Type.LONG) { + return Symbol.IntLongAdjustAction.INSTANCE; + } case STRING: case BYTES: case ENUM: @@ -461,6 +465,11 @@ private static int bestBranch(Schema r, Schema w) { } break; case LONG: + // We can selectively demote long and check that it fits inside int range + // when reading an int from a long writer schema. + if (b.getType() == Schema.Type.INT) { + return j; + } case FLOAT: switch (b.getType()) { case DOUBLE: diff --git a/helper/impls/helper-impl-16/src/main/java/com/linkedin/avroutil1/compatibility/avro16/parsing/Symbol.java b/helper/impls/helper-impl-16/src/main/java/com/linkedin/avroutil1/compatibility/avro16/parsing/Symbol.java index 3ad5e1b54..8a0121fbe 100644 --- a/helper/impls/helper-impl-16/src/main/java/com/linkedin/avroutil1/compatibility/avro16/parsing/Symbol.java +++ b/helper/impls/helper-impl-16/src/main/java/com/linkedin/avroutil1/compatibility/avro16/parsing/Symbol.java @@ -488,6 +488,10 @@ public FieldOrderAction(Schema.Field[] fields) { } } + public static class IntLongAdjustAction extends ImplicitAction { + public static final IntLongAdjustAction INSTANCE = new IntLongAdjustAction(); + } + public static class DefaultStartAction extends ImplicitAction { public final byte[] contents; public DefaultStartAction(byte[] contents) { diff --git a/helper/impls/helper-impl-17/src/main/java/com/linkedin/avroutil1/compatibility/avro17/Avro17Adapter.java b/helper/impls/helper-impl-17/src/main/java/com/linkedin/avroutil1/compatibility/avro17/Avro17Adapter.java index 9da748762..cf9fd1669 100644 --- a/helper/impls/helper-impl-17/src/main/java/com/linkedin/avroutil1/compatibility/avro17/Avro17Adapter.java +++ b/helper/impls/helper-impl-17/src/main/java/com/linkedin/avroutil1/compatibility/avro17/Avro17Adapter.java @@ -23,6 +23,8 @@ import com.linkedin.avroutil1.compatibility.SkipDecoder; import com.linkedin.avroutil1.compatibility.StringRepresentation; import com.linkedin.avroutil1.compatibility.avro17.backports.Avro17DefaultValuesCache; +import com.linkedin.avroutil1.compatibility.avro17.backports.GenericDatumReaderExt; +import com.linkedin.avroutil1.compatibility.avro17.backports.SpecificDatumReaderExt; import com.linkedin.avroutil1.compatibility.avro17.backports.SpecificDatumWriterExt; import com.linkedin.avroutil1.compatibility.avro17.codec.AliasAwareSpecificDatumReader; import com.linkedin.avroutil1.compatibility.avro17.codec.BoundedMemoryDecoder; @@ -38,7 +40,6 @@ import org.apache.avro.Schema; import org.apache.avro.SchemaNormalization; import org.apache.avro.generic.GenericData; -import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.io.Avro17BinaryDecoderAccessUtil; import org.apache.avro.io.BinaryDecoder; @@ -270,7 +271,7 @@ public DatumWriter newGenericDatumWriter(Schema writer, GenericData genericDa @Override public DatumReader newGenericDatumReader(Schema writer, Schema reader, GenericData genericData) { - return new GenericDatumReader<>(writer, reader, genericData); + return new GenericDatumReaderExt<>(writer, reader, genericData); } @Override @@ -280,7 +281,7 @@ public DatumWriter newSpecificDatumWriter(Schema writer, SpecificData specifi @Override public DatumReader newSpecificDatumReader(Schema writer, Schema reader, SpecificData specificData) { - return new SpecificDatumReader<>(writer, reader, specificData); + return new SpecificDatumReaderExt<>(writer, reader, specificData); } @Override diff --git a/helper/impls/helper-impl-17/src/main/java/com/linkedin/avroutil1/compatibility/avro17/backports/GenericDatumReaderExt.java b/helper/impls/helper-impl-17/src/main/java/com/linkedin/avroutil1/compatibility/avro17/backports/GenericDatumReaderExt.java new file mode 100644 index 000000000..a05ea957c --- /dev/null +++ b/helper/impls/helper-impl-17/src/main/java/com/linkedin/avroutil1/compatibility/avro17/backports/GenericDatumReaderExt.java @@ -0,0 +1,132 @@ +/* + * Copyright 2025 LinkedIn Corp. + * Licensed under the BSD 2-Clause License (the "License"). + * See License in the project root for license information. + */ + +package com.linkedin.avroutil1.compatibility.avro17.backports; + +import com.linkedin.avroutil1.compatibility.avro17.codec.CachedResolvingDecoder; +import org.apache.avro.AvroRuntimeException; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.io.Decoder; + +import java.io.IOException; + + +/** + * this class allows constructing a {@link GenericDatumReader} with + * a specified {@link GenericData} instance under avro 1.7 + * @param + */ +public class GenericDatumReaderExt extends GenericDatumReader { + + public GenericDatumReaderExt(Schema writer, Schema reader, GenericData genericData) { + super(writer, reader, genericData); + } + + /** + * {@inheritDoc} + */ + @SuppressWarnings("unchecked") + @Override + public T read(T reuse, Decoder in) throws IOException { + final Schema reader = getExpected(); + CachedResolvingDecoder resolver = new CachedResolvingDecoder(getSchema(), reader, in); + resolver.configure(in); + T result = (T) read(reuse, reader, resolver); + resolver.drain(); + return result; + } + + private Object read(Object old, Schema expected, + CachedResolvingDecoder in) throws IOException { + switch (expected.getType()) { + case RECORD: + return readRecord(old, expected, in); + case ENUM: + return readEnum(expected, in); + case ARRAY: + return readArray(old, expected, in); + case MAP: + return readMap(old, expected, in); + case UNION: + return read(old, expected.getTypes().get(in.readIndex()), in); + case FIXED: + return readFixed(old, expected, in); + case STRING: + return readString(old, expected, in); + case BYTES: + return readBytes(old, in); + case INT: + return readInt(old, expected, in); + case LONG: + return in.readLong(); + case FLOAT: + return in.readFloat(); + case DOUBLE: + return in.readDouble(); + case BOOLEAN: + return in.readBoolean(); + case NULL: + in.readNull(); + return null; + default: + throw new AvroRuntimeException("Unknown type: " + expected); + } + } + + private Object readRecord(Object old, Schema expected, + CachedResolvingDecoder in) throws IOException { + final GenericData data = getData(); + Object r = data.newRecord(old, expected); + + for (Schema.Field f : in.readFieldOrder()) { + int pos = f.pos(); + String name = f.name(); + Object oldDatum = null; + if (old != null) { + oldDatum = data.getField(r, name, pos); + } + data.setField(r, f.name(), f.pos(), read(oldDatum, f.schema(), in)); + } + + return r; + } + + private Object readArray(Object old, Schema expected, + CachedResolvingDecoder in) throws IOException { + Schema expectedType = expected.getElementType(); + long l = in.readArrayStart(); + long base = 0; + if (l > 0) { + Object array = newArray(old, (int) l, expected); + do { + for (long i = 0; i < l; i++) { + addToArray(array, base + i, read(peekArray(array), expectedType, in)); + } + base += l; + } while ((l = in.arrayNext()) > 0); + return array; + } else { + return newArray(old, 0, expected); + } + } + + private Object readMap(Object old, Schema expected, + CachedResolvingDecoder in) throws IOException { + Schema eValue = expected.getValueType(); + long l = in.readMapStart(); + Object map = newMap(old, (int) l); + if (l > 0) { + do { + for (int i = 0; i < l; i++) { + addToMap(map, readString(null, in), read(null, eValue, in)); + } + } while ((l = in.mapNext()) > 0); + } + return map; + } +} diff --git a/helper/impls/helper-impl-17/src/main/java/com/linkedin/avroutil1/compatibility/avro17/backports/SpecificDatumReaderExt.java b/helper/impls/helper-impl-17/src/main/java/com/linkedin/avroutil1/compatibility/avro17/backports/SpecificDatumReaderExt.java new file mode 100644 index 000000000..72a01910b --- /dev/null +++ b/helper/impls/helper-impl-17/src/main/java/com/linkedin/avroutil1/compatibility/avro17/backports/SpecificDatumReaderExt.java @@ -0,0 +1,133 @@ +/* + * Copyright 2025 LinkedIn Corp. + * Licensed under the BSD 2-Clause License (the "License"). + * See License in the project root for license information. + */ + +package com.linkedin.avroutil1.compatibility.avro17.backports; + +import com.linkedin.avroutil1.compatibility.avro17.codec.CachedResolvingDecoder; +import org.apache.avro.AvroRuntimeException; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.io.Decoder; +import org.apache.avro.specific.SpecificData; +import org.apache.avro.specific.SpecificDatumReader; + +import java.io.IOException; + + +/** + * this class allows constructing a {@link SpecificDatumReader} with + * a specified {@link SpecificData} instance under avro 1.7 + * @param + */ +public class SpecificDatumReaderExt extends SpecificDatumReader { + + public SpecificDatumReaderExt(Schema writer, Schema reader, SpecificData specificData) { + super(writer, reader, specificData); + } + + /** + * {@inheritDoc} + */ + @SuppressWarnings("unchecked") + @Override + public T read(T reuse, Decoder in) throws IOException { + final Schema reader = getExpected(); + CachedResolvingDecoder resolver = new CachedResolvingDecoder(getSchema(), reader, in); + resolver.configure(in); + T result = (T) read(reuse, reader, resolver); + resolver.drain(); + return result; + } + + private Object read(Object old, Schema expected, + CachedResolvingDecoder in) throws IOException { + switch (expected.getType()) { + case RECORD: + return readRecord(old, expected, in); + case ENUM: + return readEnum(expected, in); + case ARRAY: + return readArray(old, expected, in); + case MAP: + return readMap(old, expected, in); + case UNION: + return read(old, expected.getTypes().get(in.readIndex()), in); + case FIXED: + return readFixed(old, expected, in); + case STRING: + return readString(old, expected, in); + case BYTES: + return readBytes(old, in); + case INT: + return readInt(old, expected, in); + case LONG: + return in.readLong(); + case FLOAT: + return in.readFloat(); + case DOUBLE: + return in.readDouble(); + case BOOLEAN: + return in.readBoolean(); + case NULL: + in.readNull(); + return null; + default: + throw new AvroRuntimeException("Unknown type: " + expected); + } + } + + private Object readRecord(Object old, Schema expected, + CachedResolvingDecoder in) throws IOException { + final GenericData data = getData(); + Object r = data.newRecord(old, expected); + + for (Schema.Field f : in.readFieldOrder()) { + int pos = f.pos(); + String name = f.name(); + Object oldDatum = null; + if (old != null) { + oldDatum = data.getField(r, name, pos); + } + data.setField(r, f.name(), f.pos(), read(oldDatum, f.schema(), in)); + } + + return r; + } + + private Object readArray(Object old, Schema expected, + CachedResolvingDecoder in) throws IOException { + Schema expectedType = expected.getElementType(); + long l = in.readArrayStart(); + long base = 0; + if (l > 0) { + Object array = newArray(old, (int) l, expected); + do { + for (long i = 0; i < l; i++) { + addToArray(array, base + i, read(peekArray(array), expectedType, in)); + } + base += l; + } while ((l = in.arrayNext()) > 0); + return array; + } else { + return newArray(old, 0, expected); + } + } + + private Object readMap(Object old, Schema expected, + CachedResolvingDecoder in) throws IOException { + Schema eValue = expected.getValueType(); + long l = in.readMapStart(); + Object map = newMap(old, (int) l); + if (l > 0) { + do { + for (int i = 0; i < l; i++) { + addToMap(map, readString(null, in), read(null, eValue, in)); + } + } while ((l = in.mapNext()) > 0); + } + return map; + } +} diff --git a/helper/impls/helper-impl-17/src/main/java/com/linkedin/avroutil1/compatibility/avro17/codec/ResolvingDecoder.java b/helper/impls/helper-impl-17/src/main/java/com/linkedin/avroutil1/compatibility/avro17/codec/ResolvingDecoder.java index 1f85435d4..ff656b811 100644 --- a/helper/impls/helper-impl-17/src/main/java/com/linkedin/avroutil1/compatibility/avro17/codec/ResolvingDecoder.java +++ b/helper/impls/helper-impl-17/src/main/java/com/linkedin/avroutil1/compatibility/avro17/codec/ResolvingDecoder.java @@ -68,6 +68,23 @@ public final void drain() throws IOException { this.parser.processImplicitActions(); } + @Override + public int readInt() throws IOException { + Symbol actual = parser.popSymbol(); + if (actual == Symbol.INT) { + return in.readInt(); + } else if (actual == Symbol.IntLongAdjustAction.INSTANCE) { + long value = in.readLong(); + if (value < Integer.MIN_VALUE || value > Integer.MAX_VALUE) { + throw new AvroTypeException(value + " cannot be represented as int"); + } + + return (int) value; + } + + throw new AvroTypeException("Expected int but found " + actual); + } + public long readLong() throws IOException { Symbol actual = this.parser.advance(Symbol.LONG); if (actual == Symbol.INT) { @@ -208,6 +225,11 @@ public Symbol doAction(Symbol input, Symbol top) throws IOException { throw new AvroTypeException(((Symbol.ErrorAction)top).msg); } + if (top == Symbol.IntLongAdjustAction.INSTANCE) { + parser.pushSymbol(Symbol.INT); + return Symbol.INT; + } + if (top instanceof Symbol.DefaultStartAction) { Symbol.DefaultStartAction dsa = (Symbol.DefaultStartAction)top; this.backup = this.in; diff --git a/helper/impls/helper-impl-17/src/main/java/com/linkedin/avroutil1/compatibility/avro17/parsing/ResolvingGrammarGenerator.java b/helper/impls/helper-impl-17/src/main/java/com/linkedin/avroutil1/compatibility/avro17/parsing/ResolvingGrammarGenerator.java index 028cc5754..540230a6b 100644 --- a/helper/impls/helper-impl-17/src/main/java/com/linkedin/avroutil1/compatibility/avro17/parsing/ResolvingGrammarGenerator.java +++ b/helper/impls/helper-impl-17/src/main/java/com/linkedin/avroutil1/compatibility/avro17/parsing/ResolvingGrammarGenerator.java @@ -184,7 +184,11 @@ public Symbol generate( break; case NULL: case BOOLEAN: + break; case INT: + if (writerType == Schema.Type.LONG) { + return Symbol.IntLongAdjustAction.INSTANCE; + } case ENUM: case ARRAY: case MAP: @@ -475,6 +479,11 @@ private static int bestBranch(Schema r, Schema w) { } break; case LONG: + // We can selectively demote long and check that it fits inside int range + // when reading an int from a long writer schema. + if (b.getType() == Schema.Type.INT) { + return j; + } case FLOAT: switch (b.getType()) { case DOUBLE: diff --git a/helper/impls/helper-impl-17/src/main/java/com/linkedin/avroutil1/compatibility/avro17/parsing/Symbol.java b/helper/impls/helper-impl-17/src/main/java/com/linkedin/avroutil1/compatibility/avro17/parsing/Symbol.java index 832c780a9..f25a40b92 100644 --- a/helper/impls/helper-impl-17/src/main/java/com/linkedin/avroutil1/compatibility/avro17/parsing/Symbol.java +++ b/helper/impls/helper-impl-17/src/main/java/com/linkedin/avroutil1/compatibility/avro17/parsing/Symbol.java @@ -551,6 +551,10 @@ public static final class FieldOrderAction extends ImplicitAction { } } + public static class IntLongAdjustAction extends ImplicitAction { + public static final IntLongAdjustAction INSTANCE = new IntLongAdjustAction(); + } + public static DefaultStartAction defaultStartAction(byte[] contents) { return new DefaultStartAction(contents); } diff --git a/helper/impls/helper-impl-18/src/main/java/com/linkedin/avroutil1/compatibility/avro18/Avro18Adapter.java b/helper/impls/helper-impl-18/src/main/java/com/linkedin/avroutil1/compatibility/avro18/Avro18Adapter.java index 56a944fc1..fc3b3e7ff 100644 --- a/helper/impls/helper-impl-18/src/main/java/com/linkedin/avroutil1/compatibility/avro18/Avro18Adapter.java +++ b/helper/impls/helper-impl-18/src/main/java/com/linkedin/avroutil1/compatibility/avro18/Avro18Adapter.java @@ -23,6 +23,8 @@ import com.linkedin.avroutil1.compatibility.SkipDecoder; import com.linkedin.avroutil1.compatibility.StringRepresentation; import com.linkedin.avroutil1.compatibility.avro18.backports.Avro18DefaultValuesCache; +import com.linkedin.avroutil1.compatibility.avro18.backports.GenericDatumReaderExt; +import com.linkedin.avroutil1.compatibility.avro18.backports.SpecificDatumReaderExt; import com.linkedin.avroutil1.compatibility.avro18.codec.AliasAwareSpecificDatumReader; import com.linkedin.avroutil1.compatibility.avro18.codec.BoundedMemoryDecoder; import com.linkedin.avroutil1.compatibility.avro18.codec.CachedResolvingDecoder; @@ -36,7 +38,6 @@ import org.apache.avro.Schema; import org.apache.avro.SchemaNormalization; import org.apache.avro.generic.GenericData; -import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.io.Avro18BinaryDecoderAccessUtil; import org.apache.avro.io.BinaryDecoder; @@ -232,7 +233,7 @@ public DatumWriter newGenericDatumWriter(Schema writer, GenericData genericDa @Override public DatumReader newGenericDatumReader(Schema writer, Schema reader, GenericData genericData) { - return new GenericDatumReader<>(writer, reader, genericData); + return new GenericDatumReaderExt<>(writer, reader, genericData); } @Override @@ -242,7 +243,7 @@ public DatumWriter newSpecificDatumWriter(Schema writer, SpecificData specifi @Override public DatumReader newSpecificDatumReader(Schema writer, Schema reader, SpecificData specificData) { - return new SpecificDatumReader<>(writer, reader, specificData); + return new SpecificDatumReaderExt<>(writer, reader, specificData); } @Override diff --git a/helper/impls/helper-impl-18/src/main/java/com/linkedin/avroutil1/compatibility/avro18/backports/GenericDatumReaderExt.java b/helper/impls/helper-impl-18/src/main/java/com/linkedin/avroutil1/compatibility/avro18/backports/GenericDatumReaderExt.java new file mode 100644 index 000000000..df1c80b52 --- /dev/null +++ b/helper/impls/helper-impl-18/src/main/java/com/linkedin/avroutil1/compatibility/avro18/backports/GenericDatumReaderExt.java @@ -0,0 +1,150 @@ +/* + * Copyright 2025 LinkedIn Corp. + * Licensed under the BSD 2-Clause License (the "License"). + * See License in the project root for license information. + */ + +package com.linkedin.avroutil1.compatibility.avro18.backports; + +import com.linkedin.avroutil1.compatibility.avro18.codec.CachedResolvingDecoder; +import org.apache.avro.AvroRuntimeException; +import org.apache.avro.Conversion; +import org.apache.avro.LogicalType; +import org.apache.avro.Schema; +import org.apache.avro.generic.Avro18GenericDataAccessUtil; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.io.Decoder; + +import java.io.IOException; + + +/** + * this class allows constructing a {@link GenericDatumReader} with + * a specified {@link GenericData} instance under avro 1.8 + * + * @param + */ +public class GenericDatumReaderExt extends GenericDatumReader { + + public GenericDatumReaderExt(Schema writer, Schema reader, GenericData genericData) { + super(writer, reader, genericData); + } + + /** + * {@inheritDoc} + */ + @SuppressWarnings("unchecked") + @Override + public T read(T reuse, Decoder in) throws IOException { + final Schema reader = getExpected(); + CachedResolvingDecoder resolver = new CachedResolvingDecoder(getSchema(), reader, in); + resolver.configure(in); + T result = (T) read(reuse, reader, resolver); + resolver.drain(); + return result; + } + + private Object read(Object old, Schema expected, + CachedResolvingDecoder in) throws IOException { + Object datum = readWithoutConversion(old, expected, in); + LogicalType logicalType = expected.getLogicalType(); + if (logicalType != null) { + Conversion conversion = getData().getConversionFor(logicalType); + if (conversion != null) { + return convert(datum, expected, logicalType, conversion); + } + } + return datum; + } + + private Object readWithoutConversion(Object old, Schema expected, + CachedResolvingDecoder in) throws IOException { + switch (expected.getType()) { + case RECORD: + return readRecord(old, expected, in); + case ENUM: + return readEnum(expected, in); + case ARRAY: + return readArray(old, expected, in); + case MAP: + return readMap(old, expected, in); + case UNION: + return read(old, expected.getTypes().get(in.readIndex()), in); + case FIXED: + return readFixed(old, expected, in); + case STRING: + return readString(old, expected, in); + case BYTES: + return readBytes(old, expected, in); + case INT: + return readInt(old, expected, in); + case LONG: + return in.readLong(); + case FLOAT: + return in.readFloat(); + case DOUBLE: + return in.readDouble(); + case BOOLEAN: + return in.readBoolean(); + case NULL: + in.readNull(); + return null; + default: + throw new AvroRuntimeException("Unknown type: " + expected); + } + } + + private Object readRecord(Object old, Schema expected, + CachedResolvingDecoder in) throws IOException { + final GenericData data = getData(); + Object r = data.newRecord(old, expected); + Object state = Avro18GenericDataAccessUtil.getRecordState(data, r, expected); + + for (Schema.Field f : in.readFieldOrder()) { + int pos = f.pos(); + String name = f.name(); + Object oldDatum = null; + if (old != null) { + oldDatum = Avro18GenericDataAccessUtil.getField(data, r, name, pos, state); + } + Avro18GenericDataAccessUtil.setField(getData(), r, f.name(), f.pos(), read(oldDatum, f.schema(), in), state); + } + + return r; + } + + private Object readArray(Object old, Schema expected, + CachedResolvingDecoder in) throws IOException { + Schema expectedType = expected.getElementType(); + long l = in.readArrayStart(); + long base = 0; + if (l > 0) { + Object array = newArray(old, (int) l, expected); + do { + for (long i = 0; i < l; i++) { + addToArray(array, base + i, read(peekArray(array), expectedType, in)); + } + base += l; + } while ((l = in.arrayNext()) > 0); + return array; + } else { + return newArray(old, 0, expected); + } + } + + private Object readMap(Object old, Schema expected, + CachedResolvingDecoder in) throws IOException { + Schema eValue = expected.getValueType(); + long l = in.readMapStart(); + Object map = newMap(old, (int) l); + if (l > 0) { + do { + for (int i = 0; i < l; i++) { + addToMap(map, readString(null, in), read(null, eValue, in)); + } + } while ((l = in.mapNext()) > 0); + } + return map; + } +} diff --git a/helper/impls/helper-impl-18/src/main/java/com/linkedin/avroutil1/compatibility/avro18/backports/SpecificDatumReaderExt.java b/helper/impls/helper-impl-18/src/main/java/com/linkedin/avroutil1/compatibility/avro18/backports/SpecificDatumReaderExt.java new file mode 100644 index 000000000..67e3395b5 --- /dev/null +++ b/helper/impls/helper-impl-18/src/main/java/com/linkedin/avroutil1/compatibility/avro18/backports/SpecificDatumReaderExt.java @@ -0,0 +1,182 @@ +/* + * Copyright 2025 LinkedIn Corp. + * Licensed under the BSD 2-Clause License (the "License"). + * See License in the project root for license information. + */ + +package com.linkedin.avroutil1.compatibility.avro18.backports; + +import com.linkedin.avroutil1.compatibility.avro18.codec.CachedResolvingDecoder; +import org.apache.avro.AvroRuntimeException; +import org.apache.avro.Conversion; +import org.apache.avro.LogicalType; +import org.apache.avro.Schema; +import org.apache.avro.generic.Avro18GenericDataAccessUtil; +import org.apache.avro.generic.GenericData; +import org.apache.avro.io.Decoder; +import org.apache.avro.specific.SpecificData; +import org.apache.avro.specific.SpecificDatumReader; +import org.apache.avro.specific.SpecificRecordBase; + +import java.io.IOException; + + +/** + * this class allows constructing a {@link SpecificDatumReader} with + * a specified {@link SpecificData} instance under avro 1.8 + * + * @param + */ +public class SpecificDatumReaderExt extends SpecificDatumReader { + + public SpecificDatumReaderExt(Schema writer, Schema reader, SpecificData specificData) { + super(writer, reader, specificData); + } + + /** + * {@inheritDoc} + */ + @SuppressWarnings("unchecked") + @Override + public T read(T reuse, Decoder in) throws IOException { + final Schema reader = getExpected(); + CachedResolvingDecoder resolver = new CachedResolvingDecoder(getSchema(), reader, in); + resolver.configure(in); + T result = (T) read(reuse, reader, resolver); + resolver.drain(); + return result; + } + + private Object read(Object old, Schema expected, + CachedResolvingDecoder in) throws IOException { + Object datum = readWithoutConversion(old, expected, in); + LogicalType logicalType = expected.getLogicalType(); + if (logicalType != null) { + Conversion conversion = getData().getConversionFor(logicalType); + if (conversion != null) { + return convert(datum, expected, logicalType, conversion); + } + } + return datum; + } + + private Object readWithoutConversion(Object old, Schema expected, + CachedResolvingDecoder in) throws IOException { + switch (expected.getType()) { + case RECORD: + return readRecord(old, expected, in); + case ENUM: + return readEnum(expected, in); + case ARRAY: + return readArray(old, expected, in); + case MAP: + return readMap(old, expected, in); + case UNION: + return read(old, expected.getTypes().get(in.readIndex()), in); + case FIXED: + return readFixed(old, expected, in); + case STRING: + return readString(old, expected, in); + case BYTES: + return readBytes(old, expected, in); + case INT: + return readInt(old, expected, in); + case LONG: + return in.readLong(); + case FLOAT: + return in.readFloat(); + case DOUBLE: + return in.readDouble(); + case BOOLEAN: + return in.readBoolean(); + case NULL: + in.readNull(); + return null; + default: + throw new AvroRuntimeException("Unknown type: " + expected); + } + } + + private Object readRecord(Object old, Schema expected, + CachedResolvingDecoder in) throws IOException { + final GenericData data = getData(); + Object r = data.newRecord(old, expected); + Object state = Avro18GenericDataAccessUtil.getRecordState(data, r, expected); + + for (Schema.Field f : in.readFieldOrder()) { + int pos = f.pos(); + String name = f.name(); + Object oldDatum = null; + if (old != null) { + oldDatum = Avro18GenericDataAccessUtil.getField(data, r, name, pos, state); + } + readField(r, f, oldDatum, in, state); + } + + return r; + } + + private Object readArray(Object old, Schema expected, + CachedResolvingDecoder in) throws IOException { + Schema expectedType = expected.getElementType(); + long l = in.readArrayStart(); + long base = 0; + if (l > 0) { + Object array = newArray(old, (int) l, expected); + do { + for (long i = 0; i < l; i++) { + addToArray(array, base + i, read(peekArray(array), expectedType, in)); + } + base += l; + } while ((l = in.arrayNext()) > 0); + return array; + } else { + return newArray(old, 0, expected); + } + } + + private Object readMap(Object old, Schema expected, + CachedResolvingDecoder in) throws IOException { + Schema eValue = expected.getValueType(); + long l = in.readMapStart(); + Object map = newMap(old, (int) l); + if (l > 0) { + do { + for (int i = 0; i < l; i++) { + addToMap(map, readString(null, in), read(null, eValue, in)); + } + } while ((l = in.mapNext()) > 0); + } + return map; + } + + private void readField(Object r, Schema.Field f, Object oldDatum, + CachedResolvingDecoder in, Object state) + throws IOException { + if (r instanceof SpecificRecordBase) { + Conversion conversion = ((SpecificRecordBase) r).getConversion(f.pos()); + + Object datum; + if (conversion != null) { + datum = readWithConversion( + oldDatum, f.schema(), f.schema().getLogicalType(), conversion, in); + } else { + datum = readWithoutConversion(oldDatum, f.schema(), in); + } + + getData().setField(r, f.name(), f.pos(), datum); + + } else { + Avro18GenericDataAccessUtil.setField(getData(), r, f.name(), f.pos(), + read(oldDatum, f.schema(), in), state); + } + } + + private Object readWithConversion(Object old, Schema expected, + LogicalType logicalType, + Conversion conversion, + CachedResolvingDecoder in) throws IOException { + return convert(readWithoutConversion(old, expected, in), + expected, logicalType, conversion); + } +} diff --git a/helper/impls/helper-impl-18/src/main/java/com/linkedin/avroutil1/compatibility/avro18/codec/ResolvingDecoder.java b/helper/impls/helper-impl-18/src/main/java/com/linkedin/avroutil1/compatibility/avro18/codec/ResolvingDecoder.java index 1f47727d2..765447e94 100644 --- a/helper/impls/helper-impl-18/src/main/java/com/linkedin/avroutil1/compatibility/avro18/codec/ResolvingDecoder.java +++ b/helper/impls/helper-impl-18/src/main/java/com/linkedin/avroutil1/compatibility/avro18/codec/ResolvingDecoder.java @@ -68,6 +68,23 @@ public final void drain() throws IOException { this.parser.processImplicitActions(); } + @Override + public int readInt() throws IOException { + Symbol actual = parser.popSymbol(); + if (actual == Symbol.INT) { + return in.readInt(); + } else if (actual == Symbol.IntLongAdjustAction.INSTANCE) { + long value = in.readLong(); + if (value < Integer.MIN_VALUE || value > Integer.MAX_VALUE) { + throw new AvroTypeException(value + " cannot be represented as int"); + } + + return (int) value; + } + + throw new AvroTypeException("Expected int but found " + actual); + } + public long readLong() throws IOException { Symbol actual = this.parser.advance(Symbol.LONG); if (actual == Symbol.INT) { @@ -208,6 +225,11 @@ public Symbol doAction(Symbol input, Symbol top) throws IOException { throw new AvroTypeException(((Symbol.ErrorAction)top).msg); } + if (top == Symbol.IntLongAdjustAction.INSTANCE) { + parser.pushSymbol(Symbol.INT); + return Symbol.INT; + } + if (top instanceof Symbol.DefaultStartAction) { Symbol.DefaultStartAction dsa = (Symbol.DefaultStartAction)top; this.backup = this.in; diff --git a/helper/impls/helper-impl-18/src/main/java/com/linkedin/avroutil1/compatibility/avro18/parsing/ResolvingGrammarGenerator.java b/helper/impls/helper-impl-18/src/main/java/com/linkedin/avroutil1/compatibility/avro18/parsing/ResolvingGrammarGenerator.java index fb1948a87..d9348c643 100644 --- a/helper/impls/helper-impl-18/src/main/java/com/linkedin/avroutil1/compatibility/avro18/parsing/ResolvingGrammarGenerator.java +++ b/helper/impls/helper-impl-18/src/main/java/com/linkedin/avroutil1/compatibility/avro18/parsing/ResolvingGrammarGenerator.java @@ -184,7 +184,11 @@ public Symbol generate( break; case NULL: case BOOLEAN: + break; case INT: + if (writerType == Schema.Type.LONG) { + return Symbol.IntLongAdjustAction.INSTANCE; + } case ENUM: case ARRAY: case MAP: @@ -516,6 +520,11 @@ private int bestBranch(Schema r, Schema w, Map seen, boolean useFq } break; case LONG: + // We can selectively demote long and check that it fits inside int range + // when reading an int from a long writer schema. + if (b.getType() == Schema.Type.INT) { + return j; + } case FLOAT: switch (b.getType()) { case DOUBLE: diff --git a/helper/impls/helper-impl-18/src/main/java/com/linkedin/avroutil1/compatibility/avro18/parsing/Symbol.java b/helper/impls/helper-impl-18/src/main/java/com/linkedin/avroutil1/compatibility/avro18/parsing/Symbol.java index 5be985f91..d75392019 100644 --- a/helper/impls/helper-impl-18/src/main/java/com/linkedin/avroutil1/compatibility/avro18/parsing/Symbol.java +++ b/helper/impls/helper-impl-18/src/main/java/com/linkedin/avroutil1/compatibility/avro18/parsing/Symbol.java @@ -573,6 +573,10 @@ public static final class FieldOrderAction extends ImplicitAction { } } + public static class IntLongAdjustAction extends ImplicitAction { + public static final IntLongAdjustAction INSTANCE = new IntLongAdjustAction(); + } + public static DefaultStartAction defaultStartAction(byte[] contents) { return new DefaultStartAction(contents); } diff --git a/helper/impls/helper-impl-18/src/main/java/org/apache/avro/generic/Avro18GenericDataAccessUtil.java b/helper/impls/helper-impl-18/src/main/java/org/apache/avro/generic/Avro18GenericDataAccessUtil.java new file mode 100644 index 000000000..b50f4b51f --- /dev/null +++ b/helper/impls/helper-impl-18/src/main/java/org/apache/avro/generic/Avro18GenericDataAccessUtil.java @@ -0,0 +1,29 @@ +/* + * Copyright 2025 LinkedIn Corp. + * Licensed under the BSD 2-Clause License (the "License"). + * See License in the project root for license information. + */ + +package org.apache.avro.generic; + +import org.apache.avro.Schema; + +/** + * this class exists to allow us access to package-private classes and methods on class {@link GenericData} + */ +public class Avro18GenericDataAccessUtil { + private Avro18GenericDataAccessUtil() { + } + + public static Object getRecordState(GenericData data, Object record, Schema schema) { + return data.getRecordState(record, schema); + } + + public static Object getField(GenericData data, Object record, String name, int pos, Object state) { + return data.getField(record, name, pos, state); + } + + public static void setField(GenericData data, Object record, String name, int pos, Object value, Object state) { + data.setField(record, name, pos, value, state); + } +} diff --git a/helper/tests/codegen-15/src/main/raw-avro/by15/IntRecord.avsc b/helper/tests/codegen-15/src/main/raw-avro/by15/IntRecord.avsc new file mode 100644 index 000000000..eafebd345 --- /dev/null +++ b/helper/tests/codegen-15/src/main/raw-avro/by15/IntRecord.avsc @@ -0,0 +1,19 @@ +{ + "type": "record", + "namespace": "by15", + "name": "IntRecord", + "fields": [ + { + "name": "unionField", + "type": [ + "null", + "int" + ], + "default": null + }, + { + "name": "field", + "type": "int" + } + ] +} \ No newline at end of file diff --git a/helper/tests/codegen-15/src/main/raw-avro/by15/LongRecord.avsc b/helper/tests/codegen-15/src/main/raw-avro/by15/LongRecord.avsc new file mode 100644 index 000000000..c39ba7335 --- /dev/null +++ b/helper/tests/codegen-15/src/main/raw-avro/by15/LongRecord.avsc @@ -0,0 +1,19 @@ +{ + "type": "record", + "namespace": "by15", + "name": "LongRecord", + "fields": [ + { + "name": "unionField", + "type": [ + "null", + "long" + ], + "default": null + }, + { + "name": "field", + "type": "long" + } + ] +} \ No newline at end of file diff --git a/helper/tests/codegen-16/src/main/raw-avro/by16/IntRecord.avsc b/helper/tests/codegen-16/src/main/raw-avro/by16/IntRecord.avsc new file mode 100644 index 000000000..f37656601 --- /dev/null +++ b/helper/tests/codegen-16/src/main/raw-avro/by16/IntRecord.avsc @@ -0,0 +1,19 @@ +{ + "type": "record", + "namespace": "by16", + "name": "IntRecord", + "fields": [ + { + "name": "unionField", + "type": [ + "null", + "int" + ], + "default": null + }, + { + "name": "field", + "type": "int" + } + ] +} \ No newline at end of file diff --git a/helper/tests/codegen-16/src/main/raw-avro/by16/LongRecord.avsc b/helper/tests/codegen-16/src/main/raw-avro/by16/LongRecord.avsc new file mode 100644 index 000000000..818b83c90 --- /dev/null +++ b/helper/tests/codegen-16/src/main/raw-avro/by16/LongRecord.avsc @@ -0,0 +1,19 @@ +{ + "type": "record", + "namespace": "by16", + "name": "LongRecord", + "fields": [ + { + "name": "unionField", + "type": [ + "null", + "long" + ], + "default": null + }, + { + "name": "field", + "type": "long" + } + ] +} \ No newline at end of file diff --git a/helper/tests/codegen-17/src/main/raw-avro/by17/IntRecord.avsc b/helper/tests/codegen-17/src/main/raw-avro/by17/IntRecord.avsc new file mode 100644 index 000000000..d5b5c80a3 --- /dev/null +++ b/helper/tests/codegen-17/src/main/raw-avro/by17/IntRecord.avsc @@ -0,0 +1,19 @@ +{ + "type": "record", + "namespace": "by17", + "name": "IntRecord", + "fields": [ + { + "name": "unionField", + "type": [ + "null", + "int" + ], + "default": null + }, + { + "name": "field", + "type": "int" + } + ] +} \ No newline at end of file diff --git a/helper/tests/codegen-17/src/main/raw-avro/by17/LongRecord.avsc b/helper/tests/codegen-17/src/main/raw-avro/by17/LongRecord.avsc new file mode 100644 index 000000000..12e8b8623 --- /dev/null +++ b/helper/tests/codegen-17/src/main/raw-avro/by17/LongRecord.avsc @@ -0,0 +1,19 @@ +{ + "type": "record", + "namespace": "by17", + "name": "LongRecord", + "fields": [ + { + "name": "unionField", + "type": [ + "null", + "long" + ], + "default": null + }, + { + "name": "field", + "type": "long" + } + ] +} \ No newline at end of file diff --git a/helper/tests/codegen-18/src/main/raw-avro/by18/IntRecord.avsc b/helper/tests/codegen-18/src/main/raw-avro/by18/IntRecord.avsc new file mode 100644 index 000000000..40397758b --- /dev/null +++ b/helper/tests/codegen-18/src/main/raw-avro/by18/IntRecord.avsc @@ -0,0 +1,19 @@ +{ + "type": "record", + "namespace": "by18", + "name": "IntRecord", + "fields": [ + { + "name": "unionField", + "type": [ + "null", + "int" + ], + "default": null + }, + { + "name": "field", + "type": "int" + } + ] +} \ No newline at end of file diff --git a/helper/tests/codegen-18/src/main/raw-avro/by18/LongRecord.avsc b/helper/tests/codegen-18/src/main/raw-avro/by18/LongRecord.avsc new file mode 100644 index 000000000..8f78a8b0d --- /dev/null +++ b/helper/tests/codegen-18/src/main/raw-avro/by18/LongRecord.avsc @@ -0,0 +1,19 @@ +{ + "type": "record", + "namespace": "by18", + "name": "LongRecord", + "fields": [ + { + "name": "unionField", + "type": [ + "null", + "long" + ], + "default": null + }, + { + "name": "field", + "type": "long" + } + ] +} \ No newline at end of file diff --git a/helper/tests/helper-tests-15/src/test/java/com/linkedin/avroutil1/compatibility/avro15/AvroCompatibilityHelperAvro15Test.java b/helper/tests/helper-tests-15/src/test/java/com/linkedin/avroutil1/compatibility/avro15/AvroCompatibilityHelperAvro15Test.java index 585a4ad66..f7e9e1e39 100644 --- a/helper/tests/helper-tests-15/src/test/java/com/linkedin/avroutil1/compatibility/avro15/AvroCompatibilityHelperAvro15Test.java +++ b/helper/tests/helper-tests-15/src/test/java/com/linkedin/avroutil1/compatibility/avro15/AvroCompatibilityHelperAvro15Test.java @@ -6,10 +6,24 @@ package com.linkedin.avroutil1.compatibility.avro15; +import by15.IntRecord; +import by15.LongRecord; +import org.apache.avro.AvroTypeException; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; +import org.apache.avro.io.BinaryDecoder; +import org.apache.avro.io.BinaryEncoder; +import org.apache.avro.io.DatumReader; +import org.apache.avro.specific.SpecificData; +import org.apache.avro.specific.SpecificRecord; import com.linkedin.avroutil1.Pojo; import com.linkedin.avroutil1.testcommon.TestUtil; import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper; import com.linkedin.avroutil1.compatibility.AvroVersion; + +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.Collections; import java.util.List; @@ -80,4 +94,110 @@ public void testCreateSchemaFieldWithProvidedDefaultValue() throws IOException { List> actualListValue = mapper.convertValue(actualJsonNode, new TypeReference>>(){}); Assert.assertEquals(actualListValue.get(0).get(0), "dummyElement"); } + + @Test + public void testIntRoundtrip() throws IOException { + IntRecord intRecord = new IntRecord(); + intRecord.field = 42; + intRecord.unionField = 55; + byte[] binary = toBinary(intRecord); + + IntRecord roundtrip = toSpecificRecord(binary, IntRecord.SCHEMA$, IntRecord.SCHEMA$); + Assert.assertEquals(roundtrip.field, 42); + Assert.assertEquals(roundtrip.unionField.intValue(), 55); + + GenericRecord genericRecord = toGenericRecord(binary, IntRecord.SCHEMA$, IntRecord.SCHEMA$); + Assert.assertEquals(genericRecord.get("field"), 42); + Assert.assertEquals(genericRecord.get("unionField"), 55); + } + + @Test + public void testLongRoundtrip() throws IOException { + LongRecord longRecord = new LongRecord(); + longRecord.field = 42L; + longRecord.unionField = 55L; + byte[] binary = toBinary(longRecord); + + LongRecord roundtrip = toSpecificRecord(binary, LongRecord.SCHEMA$, LongRecord.SCHEMA$); + Assert.assertEquals(roundtrip.field, 42L); + Assert.assertEquals(roundtrip.unionField.longValue(), 55L); + + GenericRecord genericRecord = toGenericRecord(binary, LongRecord.SCHEMA$, LongRecord.SCHEMA$); + Assert.assertEquals(genericRecord.get("field"), 42L); + Assert.assertEquals(genericRecord.get("unionField"), 55L); + } + + @Test + public void testIntToLongPromotion() throws IOException { + IntRecord intRecord = new IntRecord(); + intRecord.field = 42; + intRecord.unionField = 55; + byte[] binary = toBinary(intRecord); + + LongRecord longRecord = toSpecificRecord(binary, IntRecord.SCHEMA$, LongRecord.SCHEMA$); + Assert.assertEquals(longRecord.field, 42L); + Assert.assertEquals(longRecord.unionField.longValue(), 55L); + + GenericRecord genericRecord = toGenericRecord(binary, IntRecord.SCHEMA$, LongRecord.SCHEMA$); + Assert.assertEquals(genericRecord.get("field"), 42L); + Assert.assertEquals(genericRecord.get("unionField"), 55L); + } + + @Test + public void testLongToIntDemotion() throws IOException { + LongRecord longRecord = new LongRecord(); + longRecord.field = 42L; + longRecord.unionField = 55L; + byte[] binary = toBinary(longRecord); + + IntRecord intRecord = toSpecificRecord(binary, LongRecord.SCHEMA$, IntRecord.SCHEMA$); + Assert.assertEquals(intRecord.field, 42); + Assert.assertEquals(intRecord.unionField.intValue(), 55); + + GenericRecord genericRecord = toGenericRecord(binary, LongRecord.SCHEMA$, IntRecord.SCHEMA$); + Assert.assertEquals(genericRecord.get("field"), 42); + Assert.assertEquals(genericRecord.get("unionField"), 55); + } + + @Test + public void testLongToIntDemotionOutOfRange() throws IOException { + LongRecord longRecord = new LongRecord(); + longRecord.field = (long) Integer.MAX_VALUE + 1L; + byte[] binary = toBinary(longRecord); + + LongRecord longRecord2 = new LongRecord(); + longRecord2.unionField = (long) Integer.MIN_VALUE - 1L; + byte[] binary2 = toBinary(longRecord2); + + Assert.assertThrows(AvroTypeException.class, () -> toSpecificRecord(binary, LongRecord.SCHEMA$, IntRecord.SCHEMA$)); + Assert.assertThrows(AvroTypeException.class, () -> toGenericRecord(binary, LongRecord.SCHEMA$, IntRecord.SCHEMA$)); + + Assert.assertThrows(AvroTypeException.class, () -> toSpecificRecord(binary2, LongRecord.SCHEMA$, IntRecord.SCHEMA$)); + Assert.assertThrows(AvroTypeException.class, () -> toGenericRecord(binary2, LongRecord.SCHEMA$, IntRecord.SCHEMA$)); + } + + private byte[] toBinary(IndexedRecord record) throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + BinaryEncoder encoder = AvroCompatibilityHelper.newBinaryEncoder(baos); + GenericDatumWriter writer = new GenericDatumWriter<>(record.getSchema()); + writer.write(record, encoder); + encoder.flush(); + return baos.toByteArray(); + } + + private T toSpecificRecord(byte[] binary, + Schema writerSchema, + Schema readerSchema) throws IOException { + BinaryDecoder decoder = AvroCompatibilityHelper.newBinaryDecoder(binary); + DatumReader reader = AvroCompatibilityHelper.newSpecificDatumReader(writerSchema, readerSchema, SpecificData.get()); + return reader.read(null, decoder); + } + + private GenericRecord toGenericRecord(byte[] binary, + Schema writerSchema, + Schema readerSchema) throws IOException { + BinaryDecoder decoder = AvroCompatibilityHelper.newBinaryDecoder(binary); + DatumReader reader = AvroCompatibilityHelper.newGenericDatumReader(writerSchema, readerSchema, GenericData.get()); + return reader.read(null, decoder); + } } diff --git a/helper/tests/helper-tests-16/src/test/java/com/linkedin/avroutil1/compatibility/avro16/AvroCompatibilityHelperAvro16Test.java b/helper/tests/helper-tests-16/src/test/java/com/linkedin/avroutil1/compatibility/avro16/AvroCompatibilityHelperAvro16Test.java index 5403bce5c..be0f49ae9 100644 --- a/helper/tests/helper-tests-16/src/test/java/com/linkedin/avroutil1/compatibility/avro16/AvroCompatibilityHelperAvro16Test.java +++ b/helper/tests/helper-tests-16/src/test/java/com/linkedin/avroutil1/compatibility/avro16/AvroCompatibilityHelperAvro16Test.java @@ -6,15 +6,30 @@ package com.linkedin.avroutil1.compatibility.avro16; +import by16.IntRecord; +import by16.LongRecord; import com.linkedin.avroutil1.Pojo; import com.linkedin.avroutil1.testcommon.TestUtil; import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper; import com.linkedin.avroutil1.compatibility.AvroVersion; + +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.Collections; import java.util.List; import java.util.Map; + +import org.apache.avro.AvroTypeException; import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; +import org.apache.avro.io.BinaryDecoder; +import org.apache.avro.io.BinaryEncoder; +import org.apache.avro.io.DatumReader; +import org.apache.avro.specific.SpecificData; +import org.apache.avro.specific.SpecificRecord; import org.codehaus.jackson.JsonNode; import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.type.TypeReference; @@ -80,4 +95,110 @@ public void testCreateSchemaFieldWithProvidedDefaultValue() throws IOException { List> actualListValue = mapper.convertValue(actualJsonNode, new TypeReference>>(){}); Assert.assertEquals(actualListValue.get(0).get(0), "dummyElement"); } + + @Test + public void testIntRoundtrip() throws IOException { + IntRecord intRecord = new IntRecord(); + intRecord.field = 42; + intRecord.unionField = 55; + byte[] binary = toBinary(intRecord); + + IntRecord roundtrip = toSpecificRecord(binary, IntRecord.SCHEMA$, IntRecord.SCHEMA$); + Assert.assertEquals(roundtrip.field, 42); + Assert.assertEquals(roundtrip.unionField.intValue(), 55); + + GenericRecord genericRecord = toGenericRecord(binary, IntRecord.SCHEMA$, IntRecord.SCHEMA$); + Assert.assertEquals(genericRecord.get("field"), 42); + Assert.assertEquals(genericRecord.get("unionField"), 55); + } + + @Test + public void testLongRoundtrip() throws IOException { + LongRecord longRecord = new LongRecord(); + longRecord.field = 42L; + longRecord.unionField = 55L; + byte[] binary = toBinary(longRecord); + + LongRecord roundtrip = toSpecificRecord(binary, LongRecord.SCHEMA$, LongRecord.SCHEMA$); + Assert.assertEquals(roundtrip.field, 42L); + Assert.assertEquals(roundtrip.unionField.longValue(), 55L); + + GenericRecord genericRecord = toGenericRecord(binary, LongRecord.SCHEMA$, LongRecord.SCHEMA$); + Assert.assertEquals(genericRecord.get("field"), 42L); + Assert.assertEquals(genericRecord.get("unionField"), 55L); + } + + @Test + public void testIntToLongPromotion() throws IOException { + IntRecord intRecord = new IntRecord(); + intRecord.field = 42; + intRecord.unionField = 55; + byte[] binary = toBinary(intRecord); + + LongRecord longRecord = toSpecificRecord(binary, IntRecord.SCHEMA$, LongRecord.SCHEMA$); + Assert.assertEquals(longRecord.field, 42L); + Assert.assertEquals(longRecord.unionField.longValue(), 55L); + + GenericRecord genericRecord = toGenericRecord(binary, IntRecord.SCHEMA$, LongRecord.SCHEMA$); + Assert.assertEquals(genericRecord.get("field"), 42L); + Assert.assertEquals(genericRecord.get("unionField"), 55L); + } + + @Test + public void testLongToIntDemotion() throws IOException { + LongRecord longRecord = new LongRecord(); + longRecord.field = 42L; + longRecord.unionField = 55L; + byte[] binary = toBinary(longRecord); + + IntRecord intRecord = toSpecificRecord(binary, LongRecord.SCHEMA$, IntRecord.SCHEMA$); + Assert.assertEquals(intRecord.field, 42); + Assert.assertEquals(intRecord.unionField.intValue(), 55); + + GenericRecord genericRecord = toGenericRecord(binary, LongRecord.SCHEMA$, IntRecord.SCHEMA$); + Assert.assertEquals(genericRecord.get("field"), 42); + Assert.assertEquals(genericRecord.get("unionField"), 55); + } + + @Test + public void testLongToIntDemotionOutOfRange() throws IOException { + LongRecord longRecord = new LongRecord(); + longRecord.field = (long) Integer.MAX_VALUE + 1L; + byte[] binary = toBinary(longRecord); + + LongRecord longRecord2 = new LongRecord(); + longRecord2.unionField = (long) Integer.MIN_VALUE - 1L; + byte[] binary2 = toBinary(longRecord2); + + Assert.assertThrows(AvroTypeException.class, () -> toSpecificRecord(binary, LongRecord.SCHEMA$, IntRecord.SCHEMA$)); + Assert.assertThrows(AvroTypeException.class, () -> toGenericRecord(binary, LongRecord.SCHEMA$, IntRecord.SCHEMA$)); + + Assert.assertThrows(AvroTypeException.class, () -> toSpecificRecord(binary2, LongRecord.SCHEMA$, IntRecord.SCHEMA$)); + Assert.assertThrows(AvroTypeException.class, () -> toGenericRecord(binary2, LongRecord.SCHEMA$, IntRecord.SCHEMA$)); + } + + private byte[] toBinary(IndexedRecord record) throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + BinaryEncoder encoder = AvroCompatibilityHelper.newBinaryEncoder(baos); + GenericDatumWriter writer = new GenericDatumWriter<>(record.getSchema()); + writer.write(record, encoder); + encoder.flush(); + return baos.toByteArray(); + } + + private T toSpecificRecord(byte[] binary, + Schema writerSchema, + Schema readerSchema) throws IOException { + BinaryDecoder decoder = AvroCompatibilityHelper.newBinaryDecoder(binary); + DatumReader reader = AvroCompatibilityHelper.newSpecificDatumReader(writerSchema, readerSchema, SpecificData.get()); + return reader.read(null, decoder); + } + + private GenericRecord toGenericRecord(byte[] binary, + Schema writerSchema, + Schema readerSchema) throws IOException { + BinaryDecoder decoder = AvroCompatibilityHelper.newBinaryDecoder(binary); + DatumReader reader = AvroCompatibilityHelper.newGenericDatumReader(writerSchema, readerSchema, GenericData.get()); + return reader.read(null, decoder); + } } diff --git a/helper/tests/helper-tests-17/src/test/java/com/linkedin/avroutil1/compatibility/avro17/AvroCompatibilityHelperAvro17Test.java b/helper/tests/helper-tests-17/src/test/java/com/linkedin/avroutil1/compatibility/avro17/AvroCompatibilityHelperAvro17Test.java index 76b208bf3..66082f5b8 100644 --- a/helper/tests/helper-tests-17/src/test/java/com/linkedin/avroutil1/compatibility/avro17/AvroCompatibilityHelperAvro17Test.java +++ b/helper/tests/helper-tests-17/src/test/java/com/linkedin/avroutil1/compatibility/avro17/AvroCompatibilityHelperAvro17Test.java @@ -6,15 +6,30 @@ package com.linkedin.avroutil1.compatibility.avro17; +import by17.IntRecord; +import by17.LongRecord; import com.linkedin.avroutil1.Pojo; import com.linkedin.avroutil1.testcommon.TestUtil; import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper; import com.linkedin.avroutil1.compatibility.AvroVersion; + +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.Collections; import java.util.List; import java.util.Map; + +import org.apache.avro.AvroTypeException; import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; +import org.apache.avro.io.BinaryDecoder; +import org.apache.avro.io.BinaryEncoder; +import org.apache.avro.io.DatumReader; +import org.apache.avro.specific.SpecificData; +import org.apache.avro.specific.SpecificRecord; import org.codehaus.jackson.JsonNode; import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.type.TypeReference; @@ -97,4 +112,110 @@ public void testGetGenericDefaultValueCloningForEnums() throws IOException { // Then we should expect the clone and original to be equal Assert.assertEquals(field, clone); } + + @Test + public void testIntRoundtrip() throws IOException { + IntRecord intRecord = new IntRecord(); + intRecord.field = 42; + intRecord.unionField = 55; + byte[] binary = toBinary(intRecord); + + IntRecord roundtrip = toSpecificRecord(binary, IntRecord.SCHEMA$, IntRecord.SCHEMA$); + Assert.assertEquals(roundtrip.field, 42); + Assert.assertEquals(roundtrip.unionField.intValue(), 55); + + GenericRecord genericRecord = toGenericRecord(binary, IntRecord.SCHEMA$, IntRecord.SCHEMA$); + Assert.assertEquals(genericRecord.get("field"), 42); + Assert.assertEquals(genericRecord.get("unionField"), 55); + } + + @Test + public void testLongRoundtrip() throws IOException { + LongRecord longRecord = new LongRecord(); + longRecord.field = 42L; + longRecord.unionField = 55L; + byte[] binary = toBinary(longRecord); + + LongRecord roundtrip = toSpecificRecord(binary, LongRecord.SCHEMA$, LongRecord.SCHEMA$); + Assert.assertEquals(roundtrip.field, 42L); + Assert.assertEquals(roundtrip.unionField.longValue(), 55L); + + GenericRecord genericRecord = toGenericRecord(binary, LongRecord.SCHEMA$, LongRecord.SCHEMA$); + Assert.assertEquals(genericRecord.get("field"), 42L); + Assert.assertEquals(genericRecord.get("unionField"), 55L); + } + + @Test + public void testIntToLongPromotion() throws IOException { + IntRecord intRecord = new IntRecord(); + intRecord.field = 42; + intRecord.unionField = 55; + byte[] binary = toBinary(intRecord); + + LongRecord longRecord = toSpecificRecord(binary, IntRecord.SCHEMA$, LongRecord.SCHEMA$); + Assert.assertEquals(longRecord.field, 42L); + Assert.assertEquals(longRecord.unionField.longValue(), 55L); + + GenericRecord genericRecord = toGenericRecord(binary, IntRecord.SCHEMA$, LongRecord.SCHEMA$); + Assert.assertEquals(genericRecord.get("field"), 42L); + Assert.assertEquals(genericRecord.get("unionField"), 55L); + } + + @Test + public void testLongToIntDemotion() throws IOException { + LongRecord longRecord = new LongRecord(); + longRecord.field = 42L; + longRecord.unionField = 55L; + byte[] binary = toBinary(longRecord); + + IntRecord intRecord = toSpecificRecord(binary, LongRecord.SCHEMA$, IntRecord.SCHEMA$); + Assert.assertEquals(intRecord.field, 42); + Assert.assertEquals(intRecord.unionField.intValue(), 55); + + GenericRecord genericRecord = toGenericRecord(binary, LongRecord.SCHEMA$, IntRecord.SCHEMA$); + Assert.assertEquals(genericRecord.get("field"), 42); + Assert.assertEquals(genericRecord.get("unionField"), 55); + } + + @Test + public void testLongToIntDemotionOutOfRange() throws IOException { + LongRecord longRecord = new LongRecord(); + longRecord.field = (long) Integer.MAX_VALUE + 1L; + byte[] binary = toBinary(longRecord); + + LongRecord longRecord2 = new LongRecord(); + longRecord2.unionField = (long) Integer.MIN_VALUE - 1L; + byte[] binary2 = toBinary(longRecord2); + + Assert.assertThrows(AvroTypeException.class, () -> toSpecificRecord(binary, LongRecord.SCHEMA$, IntRecord.SCHEMA$)); + Assert.assertThrows(AvroTypeException.class, () -> toGenericRecord(binary, LongRecord.SCHEMA$, IntRecord.SCHEMA$)); + + Assert.assertThrows(AvroTypeException.class, () -> toSpecificRecord(binary2, LongRecord.SCHEMA$, IntRecord.SCHEMA$)); + Assert.assertThrows(AvroTypeException.class, () -> toGenericRecord(binary2, LongRecord.SCHEMA$, IntRecord.SCHEMA$)); + } + + private byte[] toBinary(IndexedRecord record) throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + BinaryEncoder encoder = AvroCompatibilityHelper.newBinaryEncoder(baos); + GenericDatumWriter writer = new GenericDatumWriter<>(record.getSchema()); + writer.write(record, encoder); + encoder.flush(); + return baos.toByteArray(); + } + + private T toSpecificRecord(byte[] binary, + Schema writerSchema, + Schema readerSchema) throws IOException { + BinaryDecoder decoder = AvroCompatibilityHelper.newBinaryDecoder(binary); + DatumReader reader = AvroCompatibilityHelper.newSpecificDatumReader(writerSchema, readerSchema, SpecificData.get()); + return reader.read(null, decoder); + } + + private GenericRecord toGenericRecord(byte[] binary, + Schema writerSchema, + Schema readerSchema) throws IOException { + BinaryDecoder decoder = AvroCompatibilityHelper.newBinaryDecoder(binary); + DatumReader reader = AvroCompatibilityHelper.newGenericDatumReader(writerSchema, readerSchema, GenericData.get()); + return reader.read(null, decoder); + } } diff --git a/helper/tests/helper-tests-18/src/test/java/com/linkedin/avroutil1/compatibility/avro18/AvroCompatibilityHelperAvro18Test.java b/helper/tests/helper-tests-18/src/test/java/com/linkedin/avroutil1/compatibility/avro18/AvroCompatibilityHelperAvro18Test.java index 7034f7c89..0eda22b6a 100644 --- a/helper/tests/helper-tests-18/src/test/java/com/linkedin/avroutil1/compatibility/avro18/AvroCompatibilityHelperAvro18Test.java +++ b/helper/tests/helper-tests-18/src/test/java/com/linkedin/avroutil1/compatibility/avro18/AvroCompatibilityHelperAvro18Test.java @@ -6,15 +6,30 @@ package com.linkedin.avroutil1.compatibility.avro18; +import by18.IntRecord; +import by18.LongRecord; import com.linkedin.avroutil1.Pojo; import com.linkedin.avroutil1.testcommon.TestUtil; import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper; import com.linkedin.avroutil1.compatibility.AvroVersion; + +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.Collections; import java.util.List; import java.util.Map; + +import org.apache.avro.AvroTypeException; import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; +import org.apache.avro.io.BinaryDecoder; +import org.apache.avro.io.BinaryEncoder; +import org.apache.avro.io.DatumReader; +import org.apache.avro.specific.SpecificData; +import org.apache.avro.specific.SpecificRecord; import org.codehaus.jackson.JsonNode; import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.type.TypeReference; @@ -80,4 +95,110 @@ public void testCreateSchemaFieldWithProvidedDefaultValue() throws IOException { List> actualListValue = mapper.convertValue(actualJsonNode, new TypeReference>>(){}); Assert.assertEquals(actualListValue.get(0).get(0), "dummyElement"); } + + @Test + public void testIntRoundtrip() throws IOException { + IntRecord intRecord = new IntRecord(); + intRecord.field = 42; + intRecord.unionField = 55; + byte[] binary = toBinary(intRecord); + + IntRecord roundtrip = toSpecificRecord(binary, IntRecord.SCHEMA$, IntRecord.SCHEMA$); + Assert.assertEquals(roundtrip.field, 42); + Assert.assertEquals(roundtrip.unionField.intValue(), 55); + + GenericRecord genericRecord = toGenericRecord(binary, IntRecord.SCHEMA$, IntRecord.SCHEMA$); + Assert.assertEquals(genericRecord.get("field"), 42); + Assert.assertEquals(genericRecord.get("unionField"), 55); + } + + @Test + public void testLongRoundtrip() throws IOException { + LongRecord longRecord = new LongRecord(); + longRecord.field = 42L; + longRecord.unionField = 55L; + byte[] binary = toBinary(longRecord); + + LongRecord roundtrip = toSpecificRecord(binary, LongRecord.SCHEMA$, LongRecord.SCHEMA$); + Assert.assertEquals(roundtrip.field, 42L); + Assert.assertEquals(roundtrip.unionField.longValue(), 55L); + + GenericRecord genericRecord = toGenericRecord(binary, LongRecord.SCHEMA$, LongRecord.SCHEMA$); + Assert.assertEquals(genericRecord.get("field"), 42L); + Assert.assertEquals(genericRecord.get("unionField"), 55L); + } + + @Test + public void testIntToLongPromotion() throws IOException { + IntRecord intRecord = new IntRecord(); + intRecord.field = 42; + intRecord.unionField = 55; + byte[] binary = toBinary(intRecord); + + LongRecord longRecord = toSpecificRecord(binary, IntRecord.SCHEMA$, LongRecord.SCHEMA$); + Assert.assertEquals(longRecord.field, 42L); + Assert.assertEquals(longRecord.unionField.longValue(), 55L); + + GenericRecord genericRecord = toGenericRecord(binary, IntRecord.SCHEMA$, LongRecord.SCHEMA$); + Assert.assertEquals(genericRecord.get("field"), 42L); + Assert.assertEquals(genericRecord.get("unionField"), 55L); + } + + @Test + public void testLongToIntDemotion() throws IOException { + LongRecord longRecord = new LongRecord(); + longRecord.field = 42L; + longRecord.unionField = 55L; + byte[] binary = toBinary(longRecord); + + IntRecord intRecord = toSpecificRecord(binary, LongRecord.SCHEMA$, IntRecord.SCHEMA$); + Assert.assertEquals(intRecord.field, 42); + Assert.assertEquals(intRecord.unionField.intValue(), 55); + + GenericRecord genericRecord = toGenericRecord(binary, LongRecord.SCHEMA$, IntRecord.SCHEMA$); + Assert.assertEquals(genericRecord.get("field"), 42); + Assert.assertEquals(genericRecord.get("unionField"), 55); + } + + @Test + public void testLongToIntDemotionOutOfRange() throws IOException { + LongRecord longRecord = new LongRecord(); + longRecord.field = (long) Integer.MAX_VALUE + 1L; + byte[] binary = toBinary(longRecord); + + LongRecord longRecord2 = new LongRecord(); + longRecord2.unionField = (long) Integer.MIN_VALUE - 1L; + byte[] binary2 = toBinary(longRecord2); + + Assert.assertThrows(AvroTypeException.class, () -> toSpecificRecord(binary, LongRecord.SCHEMA$, IntRecord.SCHEMA$)); + Assert.assertThrows(AvroTypeException.class, () -> toGenericRecord(binary, LongRecord.SCHEMA$, IntRecord.SCHEMA$)); + + Assert.assertThrows(AvroTypeException.class, () -> toSpecificRecord(binary2, LongRecord.SCHEMA$, IntRecord.SCHEMA$)); + Assert.assertThrows(AvroTypeException.class, () -> toGenericRecord(binary2, LongRecord.SCHEMA$, IntRecord.SCHEMA$)); + } + + private byte[] toBinary(IndexedRecord record) throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + BinaryEncoder encoder = AvroCompatibilityHelper.newBinaryEncoder(baos); + GenericDatumWriter writer = new GenericDatumWriter<>(record.getSchema()); + writer.write(record, encoder); + encoder.flush(); + return baos.toByteArray(); + } + + private T toSpecificRecord(byte[] binary, + Schema writerSchema, + Schema readerSchema) throws IOException { + BinaryDecoder decoder = AvroCompatibilityHelper.newBinaryDecoder(binary); + DatumReader reader = AvroCompatibilityHelper.newSpecificDatumReader(writerSchema, readerSchema, SpecificData.get()); + return reader.read(null, decoder); + } + + private GenericRecord toGenericRecord(byte[] binary, + Schema writerSchema, + Schema readerSchema) throws IOException { + BinaryDecoder decoder = AvroCompatibilityHelper.newBinaryDecoder(binary); + DatumReader reader = AvroCompatibilityHelper.newGenericDatumReader(writerSchema, readerSchema, GenericData.get()); + return reader.read(null, decoder); + } }