Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -901,6 +901,28 @@ public void shouldTolerateUnionReorderingThatIncludeString(Implementation implem
Assert.assertEquals(recordH.get("test"), 1);
}

/**
 * Checks that a record written with an INT-based union field can be decoded with a reader
 * schema whose corresponding union field is LONG-based: a written {@code 1} must come back as
 * {@code 1L}, and a written {@code null} must come back as {@code null}.
 *
 * NOTE(review): presumably {@code createPrimitiveUnionFieldSchema} builds a nullable
 * (null + primitive) union field, as the test name suggests - confirm against the helper.
 */
@Test(groups = {"deserializationTest"}, dataProvider = "Implementation")
public void shouldPromoteNullableIntUnionToNullableLongUnion(Implementation implementation) {
  // given: writer uses the INT flavour of the field, reader uses the LONG flavour
  final String fieldName = "test";
  Schema intWriterSchema = createRecord(createPrimitiveUnionFieldSchema(fieldName, Schema.Type.INT));
  Schema longReaderSchema = createRecord(createPrimitiveUnionFieldSchema(fieldName, Schema.Type.LONG));

  GenericData.Record writtenWithValue = new GenericData.Record(intWriterSchema);
  writtenWithValue.put(fieldName, 1);
  GenericData.Record writtenWithNull = new GenericData.Record(intWriterSchema);
  writtenWithNull.put(fieldName, null);

  // when: both records are decoded through the implementation under test
  GenericRecord decodedValue =
      implementation.decode(intWriterSchema, longReaderSchema, genericDataAsDecoder(writtenWithValue));
  GenericRecord decodedNull =
      implementation.decode(intWriterSchema, longReaderSchema, genericDataAsDecoder(writtenWithNull));

  // then: the int is widened to a long, and the null branch is preserved
  Assert.assertEquals(decodedValue.get(fieldName), 1L);
  Assert.assertNull(decodedNull.get(fieldName));
}

@Test(groups = {"deserializationTest"}, dataProvider = "Implementation")
public void shouldTolerateUnionReorderingWithNonString(Implementation implementation) {
// given
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,8 +225,8 @@ private void processSimpleType(JVar fieldSchemaVar, String name, Schema schema,
break;
default:
final Schema primitiveFieldSchema;
if (action.getShouldRead() && readerSchema != null && Schema.Type.STRING.equals(readerSchema.getType())) {
// to preserve reader-specific options use reader field schema
if (action.getShouldRead() && readerSchema != null && !Schema.Type.UNION.equals(readerSchema.getType())) {
// preserve reader-specific options/promotions, but avoid forwarding a union schema
primitiveFieldSchema = readerSchema;
} else {
primitiveFieldSchema = schema;
Expand Down Expand Up @@ -630,7 +630,16 @@ private void processUnion(JVar unionSchemaVar, final String name, final Schema u

// Check if unionReaderSchema is really a union, if not then only the compatible writer union type can be deserialized
final boolean readerSchemaNotAUnion = unionReaderSchema != null && !Schema.Type.UNION.equals(unionReaderSchema.getType());
final int compatibleWriterSchema = readerSchemaNotAUnion ? schemaAssistant.compatibleUnionSchemaIndex(unionReaderSchema, unionSchema) : -1;
int compatibleWriterSchema = -1;
if (readerSchemaNotAUnion) {
for (int i = 0; i < unionSchema.getTypes().size(); i++) {
Schema optionSchema = unionSchema.getTypes().get(i);
if (schemaAssistant.areTypesCompatible(optionSchema, unionReaderSchema)) {
compatibleWriterSchema = i;
break;
}
}
}

for (int i = 0; i < unionSchema.getTypes().size(); i++) {
Schema optionSchema = unionSchema.getTypes().get(i);
Expand Down Expand Up @@ -658,16 +667,12 @@ private void processUnion(JVar unionSchemaVar, final String name, final Schema u
readerOptionSchema = (i == compatibleWriterSchema) ? unionReaderSchema : null;
} else {
// The reader's union could be re-ordered, so we need to find the one that matches.
// TODO: this code should support primitive type promotions
for (int j = 0; j < unionReaderSchema.getTypes().size(); j++) {
Schema potentialReaderSchema = unionReaderSchema.getTypes().get(j);
// Avro allows unnamed types to appear only once in a union, but named types may appear multiple times and
// thus need to be disambiguated via their full-name (including aliases).
if (schemaAssistant.areTypesCompatible(potentialReaderSchema, optionSchema)) {
readerOptionSchema = potentialReaderSchema;
readerOptionUnionBranchIndex = j;
break;
}
try {
readerOptionUnionBranchIndex = schemaAssistant.compatibleUnionSchemaIndex(optionSchema, unionReaderSchema);
readerOptionSchema = unionReaderSchema.getTypes().get(readerOptionUnionBranchIndex);
} catch (SchemaAssistantException e) {
// no compatible branch - readerOptionSchema stays null, and AvroTypeException
// is thrown below
}
}

Expand Down Expand Up @@ -698,17 +703,17 @@ private void processUnion(JVar unionSchemaVar, final String name, final Schema u

if (readerSchemaNotAUnion) {
unionAction =
FieldAction.fromValues(optionSchema.getType(), action.getShouldRead(), alternative.symbols[compatibleWriterSchema]);
FieldAction.fromValues(readerOptionSchema.getType(), action.getShouldRead(), alternative.symbols[compatibleWriterSchema]);
} else {
Symbol.UnionAdjustAction unionAdjustAction = (Symbol.UnionAdjustAction) alternative.symbols[i].production[0];
//For maps and arrays, our processMap and processArray logic expect the map-end symbol whose production contains the maps values
//We go from Symbol A(production = [map-start, map-end]) to Symbol map-end
if(optionSchema.getType().equals(Schema.Type.MAP) || optionSchema.getType().equals(Schema.Type.ARRAY)){
unionAction =
FieldAction.fromValues(optionSchema.getType(), action.getShouldRead(), unionAdjustAction.symToParse.production[0]);
FieldAction.fromValues(readerOptionSchema.getType(), action.getShouldRead(), unionAdjustAction.symToParse.production[0]);
} else {
unionAction =
FieldAction.fromValues(optionSchema.getType(), action.getShouldRead(), unionAdjustAction.symToParse);
FieldAction.fromValues(readerOptionSchema.getType(), action.getShouldRead(), unionAdjustAction.symToParse);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -574,11 +574,16 @@ public JExpression getStringableValue(Schema schema, JExpression stringExpr) {
}
}

// TODO: this code should support primitive type promotions
public int compatibleUnionSchemaIndex(Schema schema, Schema unionSchema) {
for (int i = 0; i < unionSchema.getTypes().size(); i++) {
Schema potentialCompatibleSchema = unionSchema.getTypes().get(i);
if (areTypesCompatible(schema, potentialCompatibleSchema)) {
if (areTypesExactlyCompatible(schema, potentialCompatibleSchema)) {
return i;
}
}
for (int i = 0; i < unionSchema.getTypes().size(); i++) {
Schema potentialCompatibleSchema = unionSchema.getTypes().get(i);
if (isTypePromotable(schema, potentialCompatibleSchema)) {
return i;
}
}
Expand All @@ -590,6 +595,11 @@ public Schema compatibleUnionSchema(Schema schema, Schema unionSchema) {
}

/**
 * Tells whether {@code schema} can be matched against {@code potentialCompatibleSchema},
 * either through an exact match or through a primitive type promotion.
 *
 * The exact-match check runs first; the promotion check is only consulted when it fails.
 *
 * @param schema the schema to match (the promotion source when promotion applies)
 * @param potentialCompatibleSchema the candidate schema (the promotion target)
 * @return {@code true} if the two schemas match exactly or {@code schema}'s type is
 *         promotable to the candidate's type
 */
public boolean areTypesCompatible(Schema schema, Schema potentialCompatibleSchema) {
  if (areTypesExactlyCompatible(schema, potentialCompatibleSchema)) {
    return true;
  }
  return isTypePromotable(schema, potentialCompatibleSchema);
}

private boolean areTypesExactlyCompatible(Schema schema, Schema potentialCompatibleSchema) {
if(!potentialCompatibleSchema.getType().equals(schema.getType())) {
return false;
}
Expand All @@ -605,4 +615,24 @@ public boolean areTypesCompatible(Schema schema, Schema potentialCompatibleSchem
return potentialCompatibleSchema.getName().equals(schema.getName()) ||
potentialCompatibleSchema.getAliases().contains(AvroCompatibilityHelper.getSchemaFullName(schema));
}

/**
 * Tells whether the type of {@code schema} can be promoted to the type of
 * {@code potentialCompatibleSchema}.
 *
 * Supported promotions (source -> target):
 *   INT    -> LONG, FLOAT, DOUBLE
 *   LONG   -> FLOAT, DOUBLE
 *   FLOAT  -> DOUBLE
 *   STRING -> BYTES
 *   BYTES  -> STRING
 *
 * Identical types are not treated as a promotion here; this method returns {@code false}
 * for them, as it does for every source type not listed above.
 *
 * @param schema the schema whose type is the promotion source
 * @param potentialCompatibleSchema the schema whose type is the promotion target
 * @return {@code true} only for the source/target pairs listed above
 */
private boolean isTypePromotable(Schema schema, Schema potentialCompatibleSchema) {
  switch (schema.getType()) {
    case INT: {
      final Schema.Type target = potentialCompatibleSchema.getType();
      return target == Schema.Type.LONG
          || target == Schema.Type.FLOAT
          || target == Schema.Type.DOUBLE;
    }
    case LONG: {
      final Schema.Type target = potentialCompatibleSchema.getType();
      return target == Schema.Type.FLOAT || target == Schema.Type.DOUBLE;
    }
    case FLOAT:
      return potentialCompatibleSchema.getType() == Schema.Type.DOUBLE;
    case STRING:
      return potentialCompatibleSchema.getType() == Schema.Type.BYTES;
    case BYTES:
      return potentialCompatibleSchema.getType() == Schema.Type.STRING;
    default:
      // Source type has no promotions; the target type is deliberately not consulted.
      return false;
  }
}
}