diff --git a/fastserde/avro-fastserde-tests-common/src/test/java/com/linkedin/avro/fastserde/FastGenericDeserializerGeneratorTest.java b/fastserde/avro-fastserde-tests-common/src/test/java/com/linkedin/avro/fastserde/FastGenericDeserializerGeneratorTest.java index a96729e97..047533a59 100644 --- a/fastserde/avro-fastserde-tests-common/src/test/java/com/linkedin/avro/fastserde/FastGenericDeserializerGeneratorTest.java +++ b/fastserde/avro-fastserde-tests-common/src/test/java/com/linkedin/avro/fastserde/FastGenericDeserializerGeneratorTest.java @@ -901,6 +901,28 @@ public void shouldTolerateUnionReorderingThatIncludeString(Implementation implem Assert.assertEquals(recordH.get("test"), 1); } + @Test(groups = {"deserializationTest"}, dataProvider = "Implementation") + public void shouldPromoteNullableIntUnionToNullableLongUnion(Implementation implementation) { + // given + Schema writerSchema = createRecord( + createPrimitiveUnionFieldSchema("test", Schema.Type.INT)); + Schema readerSchema = createRecord( + createPrimitiveUnionFieldSchema("test", Schema.Type.LONG)); + + GenericData.Record recordWithInt = new GenericData.Record(writerSchema); + recordWithInt.put("test", 1); + GenericData.Record recordWithNull = new GenericData.Record(writerSchema); + recordWithNull.put("test", null); + + // when + GenericRecord intRecord = implementation.decode(writerSchema, readerSchema, genericDataAsDecoder(recordWithInt)); + GenericRecord nullRecord = implementation.decode(writerSchema, readerSchema, genericDataAsDecoder(recordWithNull)); + + // then + Assert.assertEquals(intRecord.get("test"), 1L); + Assert.assertNull(nullRecord.get("test")); + } + @Test(groups = {"deserializationTest"}, dataProvider = "Implementation") public void shouldTolerateUnionReorderingWithNonString(Implementation implementation) { // given diff --git a/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastDeserializerGenerator.java b/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastDeserializerGenerator.java index 85bbb412a..250b51f65 100644 --- a/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastDeserializerGenerator.java +++ b/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastDeserializerGenerator.java @@ -225,8 +225,8 @@ private void processSimpleType(JVar fieldSchemaVar, String name, Schema schema, break; default: final Schema primitiveFieldSchema; - if (action.getShouldRead() && readerSchema != null && Schema.Type.STRING.equals(readerSchema.getType())) { - // to preserve reader-specific options use reader field schema + if (action.getShouldRead() && readerSchema != null && !Schema.Type.UNION.equals(readerSchema.getType())) { + // preserve reader-specific options/promotions, but avoid forwarding a union schema primitiveFieldSchema = readerSchema; } else { primitiveFieldSchema = schema; @@ -630,7 +630,16 @@ private void processUnion(JVar unionSchemaVar, final String name, final Schema u // Check if unionReaderSchema is really a union, if not then only the compatible writer union type can be deserialized final boolean readerSchemaNotAUnion = unionReaderSchema != null && !Schema.Type.UNION.equals(unionReaderSchema.getType()); - final int compatibleWriterSchema = readerSchemaNotAUnion ? schemaAssistant.compatibleUnionSchemaIndex(unionReaderSchema, unionSchema) : -1; + int compatibleWriterSchema = -1; + if (readerSchemaNotAUnion) { + for (int i = 0; i < unionSchema.getTypes().size(); i++) { + Schema optionSchema = unionSchema.getTypes().get(i); + if (schemaAssistant.areTypesCompatible(optionSchema, unionReaderSchema)) { + compatibleWriterSchema = i; + break; + } + } + } for (int i = 0; i < unionSchema.getTypes().size(); i++) { Schema optionSchema = unionSchema.getTypes().get(i); @@ -658,16 +667,12 @@ private void processUnion(JVar unionSchemaVar, final String name, final Schema u readerOptionSchema = (i == compatibleWriterSchema) ? unionReaderSchema : null; } else { // The reader's union could be re-ordered, so we need to find the one that matches. - // TODO: this code should support primitive type promotions - for (int j = 0; j < unionReaderSchema.getTypes().size(); j++) { - Schema potentialReaderSchema = unionReaderSchema.getTypes().get(j); - // Avro allows unnamed types to appear only once in a union, but named types may appear multiple times and - // thus need to be disambiguated via their full-name (including aliases). - if (schemaAssistant.areTypesCompatible(potentialReaderSchema, optionSchema)) { - readerOptionSchema = potentialReaderSchema; - readerOptionUnionBranchIndex = j; - break; - } + try { + readerOptionUnionBranchIndex = schemaAssistant.compatibleUnionSchemaIndex(optionSchema, unionReaderSchema); + readerOptionSchema = unionReaderSchema.getTypes().get(readerOptionUnionBranchIndex); + } catch (SchemaAssistantException e) { + // no compatible branch - readerOptionSchema stays null, and AvroTypeException + // is thrown below } } @@ -698,17 +703,17 @@ private void processUnion(JVar unionSchemaVar, final String name, final Schema u if (readerSchemaNotAUnion) { unionAction = - FieldAction.fromValues(optionSchema.getType(), action.getShouldRead(), alternative.symbols[compatibleWriterSchema]); + FieldAction.fromValues(readerOptionSchema.getType(), action.getShouldRead(), alternative.symbols[compatibleWriterSchema]); } else { Symbol.UnionAdjustAction unionAdjustAction = (Symbol.UnionAdjustAction) alternative.symbols[i].production[0]; //For maps and arrays, our processMap and processArray logic expect the map-end symbol whose production contains the maps values //We go from Symbol A(production = [map-start, map-end]) to Symbol map-end if(optionSchema.getType().equals(Schema.Type.MAP) || optionSchema.getType().equals(Schema.Type.ARRAY)){ unionAction = - FieldAction.fromValues(optionSchema.getType(), action.getShouldRead(), unionAdjustAction.symToParse.production[0]); + FieldAction.fromValues(readerOptionSchema.getType(), action.getShouldRead(), unionAdjustAction.symToParse.production[0]); } else { unionAction = - FieldAction.fromValues(optionSchema.getType(), action.getShouldRead(), unionAdjustAction.symToParse); + FieldAction.fromValues(readerOptionSchema.getType(), action.getShouldRead(), unionAdjustAction.symToParse); } } diff --git a/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/SchemaAssistant.java b/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/SchemaAssistant.java index b18a2f422..46b9a49a8 100644 --- a/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/SchemaAssistant.java +++ b/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/SchemaAssistant.java @@ -574,11 +574,16 @@ public JExpression getStringableValue(Schema schema, JExpression stringExpr) { } } - // TODO: this code should support primitive type promotions public int compatibleUnionSchemaIndex(Schema schema, Schema unionSchema) { for (int i = 0; i < unionSchema.getTypes().size(); i++) { Schema potentialCompatibleSchema = unionSchema.getTypes().get(i); - if (areTypesCompatible(schema, potentialCompatibleSchema)) { + if (areTypesExactlyCompatible(schema, potentialCompatibleSchema)) { + return i; + } + } + for (int i = 0; i < unionSchema.getTypes().size(); i++) { + Schema potentialCompatibleSchema = unionSchema.getTypes().get(i); + if (isTypePromotable(schema, potentialCompatibleSchema)) { return i; } } @@ -590,6 +595,11 @@ public Schema compatibleUnionSchema(Schema schema, Schema unionSchema) { } public boolean areTypesCompatible(Schema schema, Schema potentialCompatibleSchema){ + return areTypesExactlyCompatible(schema, potentialCompatibleSchema) + || isTypePromotable(schema, potentialCompatibleSchema); + } + + private boolean areTypesExactlyCompatible(Schema schema, Schema potentialCompatibleSchema) { if(!potentialCompatibleSchema.getType().equals(schema.getType())) { return false; } @@ -605,4 +615,24 @@ public boolean areTypesCompatible(Schema schema, Schema potentialCompatibleSchem return potentialCompatibleSchema.getName().equals(schema.getName()) || potentialCompatibleSchema.getAliases().contains(AvroCompatibilityHelper.getSchemaFullName(schema)); } + + private boolean isTypePromotable(Schema schema, Schema potentialCompatibleSchema) { + switch (schema.getType()) { + case INT: + return Schema.Type.LONG.equals(potentialCompatibleSchema.getType()) + || Schema.Type.FLOAT.equals(potentialCompatibleSchema.getType()) + || Schema.Type.DOUBLE.equals(potentialCompatibleSchema.getType()); + case LONG: + return Schema.Type.FLOAT.equals(potentialCompatibleSchema.getType()) + || Schema.Type.DOUBLE.equals(potentialCompatibleSchema.getType()); + case FLOAT: + return Schema.Type.DOUBLE.equals(potentialCompatibleSchema.getType()); + case STRING: + return Schema.Type.BYTES.equals(potentialCompatibleSchema.getType()); + case BYTES: + return Schema.Type.STRING.equals(potentialCompatibleSchema.getType()); + default: + return false; + } + } }