|
1 | 1 | package pl.allegro.tech.hermes.common.message.converter;
|
2 | 2 |
|
| 3 | +import java.io.ByteArrayInputStream; |
| 4 | +import java.io.Closeable; |
| 5 | +import java.io.InputStream; |
| 6 | +import java.util.Collection; |
| 7 | +import java.util.Collections; |
3 | 8 | import org.apache.avro.Conversion;
|
4 | 9 | import org.apache.avro.Schema;
|
5 | 10 | import org.apache.avro.generic.GenericData;
|
|
10 | 15 | import org.apache.commons.lang3.exception.ExceptionUtils;
|
11 | 16 | import tech.allegro.schema.json2avro.converter.AvroConversionException;
|
12 | 17 |
|
13 |
| -import java.io.ByteArrayInputStream; |
14 |
| -import java.io.Closeable; |
15 |
| -import java.io.InputStream; |
16 |
| -import java.util.Collection; |
17 |
| -import java.util.Collections; |
18 |
| - |
19 | 18 | public class AvroBinaryDecoders {
|
20 | 19 |
|
21 |
| - private static ThreadLocal<InputStream> threadLocalEmptyInputStream = |
22 |
| - ThreadLocal.withInitial(() -> new ByteArrayInputStream(new byte[0])); |
23 |
| - |
24 |
| - private static ThreadLocal<BinaryDecoder> threadLocalBinaryDecoder = |
25 |
| - ThreadLocal.withInitial( |
26 |
| - () -> DecoderFactory.get().binaryDecoder(threadLocalEmptyInputStream.get(), null)); |
27 |
| - |
28 |
| - static GenericRecord decodeReusingThreadLocalBinaryDecoder(byte[] message, Schema schema) { |
29 |
| - return decodeReusingThreadLocalBinaryDecoder(message, schema, Collections.emptyList()); |
30 |
| - } |
31 |
| - |
32 |
| - static GenericRecord decodeReusingThreadLocalBinaryDecoder(byte[] message, Schema schema, Collection<Conversion<?>> conversions) { |
33 |
| - try (FlushableBinaryDecoderHolder holder = new FlushableBinaryDecoderHolder()) { |
34 |
| - BinaryDecoder binaryDecoder = |
35 |
| - DecoderFactory.get().binaryDecoder(message, holder.getBinaryDecoder()); |
36 |
| - GenericData genericData = new GenericData(); |
37 |
| - conversions.forEach(genericData::addLogicalTypeConversion); |
38 |
| - return new GenericDatumReader<GenericRecord>(schema, schema, genericData).read(null, binaryDecoder); |
39 |
| - } catch (Exception e) { |
40 |
| - String reason = |
41 |
| - e.getMessage() == null ? ExceptionUtils.getRootCauseMessage(e) : e.getMessage(); |
42 |
| - throw new AvroConversionException( |
43 |
| - String.format( |
44 |
| - "Could not deserialize Avro message with provided schema, reason: %s", reason)); |
45 |
| - } |
| 20 | + private static ThreadLocal<InputStream> threadLocalEmptyInputStream = |
| 21 | + ThreadLocal.withInitial(() -> new ByteArrayInputStream(new byte[0])); |
| 22 | + |
| 23 | + private static ThreadLocal<BinaryDecoder> threadLocalBinaryDecoder = |
| 24 | + ThreadLocal.withInitial( |
| 25 | + () -> DecoderFactory.get().binaryDecoder(threadLocalEmptyInputStream.get(), null)); |
| 26 | + |
| 27 | + static GenericRecord decodeReusingThreadLocalBinaryDecoder(byte[] message, Schema schema) { |
| 28 | + return decodeReusingThreadLocalBinaryDecoder(message, schema, Collections.emptyList()); |
| 29 | + } |
| 30 | + |
| 31 | + static GenericRecord decodeReusingThreadLocalBinaryDecoder( |
| 32 | + byte[] message, Schema schema, Collection<Conversion<?>> conversions) { |
| 33 | + try (FlushableBinaryDecoderHolder holder = new FlushableBinaryDecoderHolder()) { |
| 34 | + BinaryDecoder binaryDecoder = |
| 35 | + DecoderFactory.get().binaryDecoder(message, holder.getBinaryDecoder()); |
| 36 | + GenericData genericData = new GenericData(); |
| 37 | + conversions.forEach(genericData::addLogicalTypeConversion); |
| 38 | + return new GenericDatumReader<GenericRecord>(schema, schema, genericData) |
| 39 | + .read(null, binaryDecoder); |
| 40 | + } catch (Exception e) { |
| 41 | + String reason = |
| 42 | + e.getMessage() == null ? ExceptionUtils.getRootCauseMessage(e) : e.getMessage(); |
| 43 | + throw new AvroConversionException( |
| 44 | + String.format( |
| 45 | + "Could not deserialize Avro message with provided schema, reason: %s", reason)); |
46 | 46 | }
|
| 47 | + } |
47 | 48 |
|
48 |
| - static class FlushableBinaryDecoderHolder implements Closeable { |
| 49 | + static class FlushableBinaryDecoderHolder implements Closeable { |
49 | 50 |
|
50 |
| - final BinaryDecoder binaryDecoder = threadLocalBinaryDecoder.get(); |
| 51 | + final BinaryDecoder binaryDecoder = threadLocalBinaryDecoder.get(); |
51 | 52 |
|
52 |
| - BinaryDecoder getBinaryDecoder() { |
53 |
| - return binaryDecoder; |
54 |
| - } |
| 53 | + BinaryDecoder getBinaryDecoder() { |
| 54 | + return binaryDecoder; |
| 55 | + } |
55 | 56 |
|
56 |
| - @Override |
57 |
| - public void close() { |
58 |
| - DecoderFactory.get() |
59 |
| - .binaryDecoder(threadLocalEmptyInputStream.get(), threadLocalBinaryDecoder.get()); |
60 |
| - } |
| 57 | + @Override |
| 58 | + public void close() { |
| 59 | + DecoderFactory.get() |
| 60 | + .binaryDecoder(threadLocalEmptyInputStream.get(), threadLocalBinaryDecoder.get()); |
61 | 61 | }
|
| 62 | + } |
62 | 63 | }
|
0 commit comments