Kafka Avro Consumer issue workaround
I reported the issue here
Description
AvroDeserializer throws a NullReferenceException with no further information:
Unhandled exception. Confluent.Kafka.ConsumeException: Local: Value deserialization error
---> System.NullReferenceException: Object reference not set to an instance of an object.
at Confluent.SchemaRegistry.Serdes.GenericDeserializerImpl.Deserialize(String topic, Byte[] array)
at Confluent.SchemaRegistry.Serdes.AvroDeserializer`1.DeserializeAsync(ReadOnlyMemory`1 data, Boolean isNull, SerializationContext context)
at Confluent.Kafka.SyncOverAsync.SyncOverAsyncDeserializer`1.Deserialize(ReadOnlySpan`1 data, Boolean isNull, SerializationContext context)
at Confluent.Kafka.Consumer`2.Consume(Int32 millisecondsTimeout)
--- End of inner exception stack trace ---
at Confluent.Kafka.Consumer`2.Consume(Int32 millisecondsTimeout)
at Confluent.Kafka.Consumer`2.Consume(TimeSpan timeout)
at Arch.Pocs.KafkaClient.Consumer.ConsoleUI.KafkaAvroConsumer.Poll(String topic)+MoveNext() in /app/KafkaAvroConsumer.cs:line 42
at Arch.Pocs.KafkaClient.Consumer.ConsoleUI.Program.Main(String[] args) in /app/Program.cs:line 32
at Arch.Pocs.KafkaClient.Consumer.ConsoleUI.Program.<Main>(String[] args)
The issue is:
The method Confluent.SchemaRegistry.Serdes.AvroDeserializer<T>.DeserializeAsync(...) (https://github.com/confluentinc/confluent-kafka-dotnet/blob/master/src/Confluent.SchemaRegistry.Serdes.Avro/AvroDeserializer.cs) makes a call to deserializerImpl.Deserialize(context.Topic, isNull ? null : data.ToArray()).ConfigureAwait(continueOnCapturedContext: false). Notice that the second argument can potentially be null.
For GenericRecord, the deserializerImpl object is of type GenericDeserializerImpl.
The implementation of the method GenericDeserializerImpl.Deserialize(string topic, byte[] array) (https://github.com/confluentinc/confluent-kafka-dotnet/blob/master/src/Confluent.SchemaRegistry.Serdes.Avro/GenericDeserializerImpl.cs) does not check for null, so if the byte array is null, a NullReferenceException is thrown, which is exactly what happened.
So:
- The implementation of the method GenericDeserializerImpl.Deserialize(string topic, byte[] array) MUST check for a null array and throw a descriptive exception, rather than just letting the NullReferenceException surface.
- The method Confluent.SchemaRegistry.Serdes.AvroDeserializer<T>.DeserializeAsync(...) MUST NOT pass a null byte array to GenericDeserializerImpl.Deserialize, as it doesn’t make any sense to try to deserialize a null array of bytes. Instead, it should implement logic consistent with the intended contract (maybe just throw a descriptive exception as well, see next point).
- It’s likely that there’s another issue further up the call chain, and the null array should not reach this point in the first place, but I haven’t looked into it. For now, these implementations should at least be improved to throw more descriptive exceptions.
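The first point could be sketched as a guard clause like the one below. This is only an illustration under stated assumptions, not the library's code: PayloadGuard and EnsureDeserializable are hypothetical names, and the 5-byte minimum assumes the Confluent wire format (one magic byte followed by a 4-byte schema ID).

```csharp
using System;
using System.IO;

// Hypothetical helper for illustration only; it is not part of
// Confluent.SchemaRegistry.Serdes.
public static class PayloadGuard
{
    // Assumed framing: 1 magic byte followed by a 4-byte schema ID.
    private const int FramingLength = 5;

    // Returns the payload unchanged when it looks deserializable; otherwise
    // throws an exception that names the topic and the reason.
    public static byte[] EnsureDeserializable(string topic, byte[] array)
    {
        if (array == null)
        {
            throw new ArgumentNullException(
                nameof(array),
                $"Cannot deserialize a null payload from topic '{topic}'.");
        }

        if (array.Length < FramingLength)
        {
            throw new InvalidDataException(
                $"Payload from topic '{topic}' is {array.Length} bytes long; " +
                $"expected at least {FramingLength} bytes of framing.");
        }

        return array;
    }
}
```

With a check like this at the top of the deserialization path, the ConsumeException would carry a message naming the topic and the cause instead of a bare NullReferenceException.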
A wrapper for AvroDeserializer<GenericRecord>
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Avro.Generic;
using Confluent.Kafka;
using Confluent.SchemaRegistry;
using Confluent.SchemaRegistry.Serdes;

// Wraps AvroDeserializer<GenericRecord> so that a null payload is never
// forwarded to the underlying deserializer.
public class AvroGenericRecordDeserializerWrapper : IAsyncDeserializer<GenericRecord>
{
    private readonly AvroDeserializer<GenericRecord> _avroDeserializer;

    public AvroGenericRecordDeserializerWrapper(ISchemaRegistryClient schemaRegistryClient, IEnumerable<KeyValuePair<string, string>> config = null)
    {
        _avroDeserializer = new AvroDeserializer<GenericRecord>(schemaRegistryClient, config);
    }

    public async Task<GenericRecord> DeserializeAsync(ReadOnlyMemory<byte> data, bool isNull, SerializationContext context)
    {
        try
        {
            if (isNull)
            {
                // Skip the downstream call, which would throw NullReferenceException.
                Console.WriteLine($"isNull=true. This would throw NullReferenceException down the line with the provided implementation. Skipping downstream call and just returning null GenericRecord. (data.Length={data.Length})");
                return null;
            }

            return await _avroDeserializer.DeserializeAsync(data, isNull, context).ConfigureAwait(false);
        }
        catch (Exception ex)
        {
            // Every failure is logged and turned into a null record,
            // so callers must be prepared to handle null keys and values.
            Console.WriteLine($"Failed executing DeserializeAsync(data, isNull, context). Returning null GenericRecord ex => {ex}");
            return null;
        }
    }
}
Using the AvroGenericRecordDeserializerWrapper rather than AvroDeserializer<GenericRecord>
Just set AvroGenericRecordDeserializerWrapper as the key and value deserializer instead of AvroDeserializer<GenericRecord>:
var builder = new ConsumerBuilder<GenericRecord, GenericRecord>(_consumerConfig)
    // .SetKeyDeserializer(new AvroDeserializer<GenericRecord>(schemaRegistry).AsSyncOverAsync())
    // .SetValueDeserializer(new AvroDeserializer<GenericRecord>(schemaRegistry).AsSyncOverAsync())
    .SetKeyDeserializer(new AvroGenericRecordDeserializerWrapper(schemaRegistry).AsSyncOverAsync())
    .SetValueDeserializer(new AvroGenericRecordDeserializerWrapper(schemaRegistry).AsSyncOverAsync())
    .SetErrorHandler((c, e) =>
    {
        Console.WriteLine($"Consumer error handler: {e.Reason}");
    });
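Because the wrapper returns a null GenericRecord both for null payloads and for failed deserialization, the consuming code has to tolerate null values. The sketch below shows one way a poll loop might do that. NullAwareConsumeLoop, TryGetValue and Run are hypothetical names, the one-second poll timeout is arbitrary, and the topic and handler are supplied by the caller.

```csharp
using System;
using System.Threading;
using Avro.Generic;
using Confluent.Kafka;

// Hypothetical helper for illustration only.
public static class NullAwareConsumeLoop
{
    // Returns true only when the result carries a non-null value.
    public static bool TryGetValue(ConsumeResult<GenericRecord, GenericRecord> result, out GenericRecord value)
    {
        value = null;
        if (result == null || result.IsPartitionEOF || result.Message == null)
        {
            return false; // poll timeout or end-of-partition event
        }

        value = result.Message.Value;
        return value != null; // null: null payload or a deserialization failure
    }

    // Polls until cancellation is requested and hands non-null values to the handler.
    public static void Run(IConsumer<GenericRecord, GenericRecord> consumer, string topic,
        Action<GenericRecord> handle, CancellationToken token)
    {
        consumer.Subscribe(topic);
        try
        {
            while (!token.IsCancellationRequested)
            {
                var result = consumer.Consume(TimeSpan.FromSeconds(1));
                if (TryGetValue(result, out var value))
                {
                    handle(value);
                }
                else if (result != null && !result.IsPartitionEOF)
                {
                    Console.WriteLine($"Skipping record without a usable value at {result.TopicPartitionOffset}");
                }
            }
        }
        finally
        {
            consumer.Close(); // leave the consumer group cleanly
        }
    }
}
```

It could be called with the consumer returned by builder.Build(). Note that the loop cannot tell a genuinely null payload from a failed deserialization, since the wrapper maps both to null; the console output from the wrapper is the only place where the difference is visible.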