/**
 * Reads binary-encoded datums from {@code inputStream} until the decoder reports its end and
 * writes each one to {@code outputStream} through a JSON encoder (created with its boolean
 * option set to {@code true}), using {@code schema} for reading and writing alike.
 *
 * @param inputStream source of the binary-encoded datums
 * @param outputStream destination of the JSON output; flushed here, never closed
 * @param schema schema handed to the datum reader, the datum writer and the JSON encoder
 * @throws IOException if decoding, encoding or flushing fails
 */
static void convertAvroToJson(InputStream inputStream, OutputStream outputStream, Schema schema)
    throws IOException {
  final DatumReader<Object> datumReader = new GenericDatumReader<>(schema);
  final DatumWriter<Object> datumWriter = new GenericDatumWriter<>(schema);
  final BinaryDecoder source = DecoderFactory.get().binaryDecoder(inputStream, null);
  final JsonEncoder sink = EncoderFactory.get().jsonEncoder(schema, outputStream, true);

  // The datum from the previous round is handed back to the reader as its reuse argument.
  for (Object current = null; !source.isEnd(); ) {
    current = datumReader.read(current, source);
    datumWriter.write(current, sink);
    sink.flush();
  }
  outputStream.flush();
}
/**
 * Attaches this reader to {@code in}: verifies the leading magic bytes, skips every
 * string/bytes pair of the map that follows, and finally fills {@code expectedSync}.
 *
 * @param in stream to decode from
 * @throws IOException with the message "Not a data file." when the magic bytes cannot be read
 *     or differ from {@code DataFileConstants.MAGIC}; also propagated from the later reads
 */
private void initialize(InputStream in) throws IOException {
  this.vin = DecoderFactory.get().binaryDecoder(in, vin);

  final byte[] leadingBytes = new byte[DataFileConstants.MAGIC.length];
  try {
    vin.readFixed(leadingBytes);
  } catch (IOException cause) {
    throw new IOException("Not a data file.", cause);
  }
  if (!Arrays.equals(DataFileConstants.MAGIC, leadingBytes)) {
    throw new IOException("Not a data file.");
  }

  // The map is delivered block by block; a block count of zero from mapNext() ends it.
  long entriesInBlock = vin.readMapStart();
  if (entriesInBlock > 0) {
    while (true) {
      for (long entry = 0; entry < entriesInBlock; entry++) {
        vin.skipString();
        vin.skipBytes();
      }
      entriesInBlock = vin.mapNext();
      if (entriesInBlock == 0) {
        break;
      }
    }
  }
  vin.readFixed(expectedSync);
}
public void jsonReadWriteExample() throws IOException { Employee employee = Employee.newBuilder().setFirstName("Gaurav") .setLastName("Mazra").setSex(SEX.MALE).build(); DatumWriter<Employee> employeeWriter = new SpecificDatumWriter<>(Employee.class); byte[] data; try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) { Encoder jsonEncoder = EncoderFactory.get().jsonEncoder(Employee.getClassSchema(), baos); employeeWriter.write(employee, jsonEncoder); jsonEncoder.flush(); data = baos.toByteArray(); } // serialized data System.out.println(new String(data)); DatumReader<Employee> employeeReader = new SpecificDatumReader<>(Employee.class); Decoder decoder = DecoderFactory.get().jsonDecoder(Employee.getClassSchema(), new String(data)); employee = employeeReader.read(null, decoder); //data after deserialization System.out.println(employee); }
/**
 * Round-trips a single {@code User} through Avro binary encoding: builds the user, writes it
 * into an in-memory buffer, prints the encoded length, reads the bytes back as a
 * {@code GenericRecord} and prints the record followed by its {@code name} field.
 *
 * @throws Exception if encoding or decoding fails
 */
public void processSinglex() throws Exception {
  // Current time narrowed to an int; the same value feeds all three fields.
  final int seed = (int) System.currentTimeMillis();
  final User original = User.newBuilder()
      .setName("name" + seed)
      .setFavoriteColor("color" + seed)
      .setFavoriteNumber(seed)
      .build();

  final DatumWriter<GenericRecord> userWriter =
      new GenericDatumWriter<GenericRecord>(User.getClassSchema());
  final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
  final byte[] encoded;
  try {
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(buffer, null);
    userWriter.write(original, encoder);
    encoder.flush();
    encoded = buffer.toByteArray();
  } finally {
    buffer.close();
  }
  System.out.println(encoded.length);

  final DatumReader<GenericRecord> userReader =
      new GenericDatumReader<GenericRecord>(User.getClassSchema());
  final GenericRecord decoded =
      userReader.read(null, DecoderFactory.get().binaryDecoder(encoded, null));
  System.out.println(decoded);
  System.out.println(decoded.get("name"));
}
public static void testReflect(Object value, Type type, String schema) throws Exception { // check that schema matches expected Schema s = ReflectData.get().getSchema(type); assertEquals(Schema.parse(schema), s); // check that value is serialized correctly ReflectDatumWriter<Object> writer = new ReflectDatumWriter<Object>(s); ByteArrayOutputStream out = new ByteArrayOutputStream(); writer.write(value, EncoderFactory.get().directBinaryEncoder(out, null)); ReflectDatumReader<Object> reader = new ReflectDatumReader<Object>(s); Object after = reader.read(null, DecoderFactory.get().binaryDecoder(out.toByteArray(), null)); assertEquals(value, after); }
/**
 * Decodes an Avro binary payload into an instance of the target type.
 *
 * @param topic topic the payload came from; used only in the error message
 * @param data binary-encoded record, may be {@code null}
 * @return the decoded record, or {@code null} when {@code data} is {@code null}
 * @throws SerializationException wrapping any failure raised while decoding
 */
@SuppressWarnings("unchecked")
@Override
public T deserialize(String topic, byte[] data) {
  try {
    T result = null;
    if (data != null) {
      // Building the hex dump costs time and memory proportional to the payload, so it is
      // only done when the debug message will actually be emitted.
      if (LOGGER.isDebugEnabled()) {
        LOGGER.debug("data='{}'", DatatypeConverter.printHexBinary(data));
      }

      // The reader schema is taken from a fresh instance of the target type.
      DatumReader<GenericRecord> datumReader =
          new SpecificDatumReader<>(targetType.newInstance().getSchema());
      Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);

      result = (T) datumReader.read(null, decoder);
      LOGGER.debug("deserialized data='{}'", result);
    }
    return result;
  } catch (Exception ex) {
    throw new SerializationException(
        "Can't deserialize data '" + Arrays.toString(data) + "' from topic '" + topic + "'", ex);
  }
}
/**
 * Turns a raw value into an {@code Event}.
 *
 * <p>When {@code parseAsFlumeEvent} is set, the value is decoded as an {@code AvroFlumeEvent}
 * and its body and headers are copied into the result; otherwise the value itself becomes the
 * body and the header map is empty. The decoder and the lazily created reader are kept in
 * fields and reused by later calls.
 *
 * @param value raw bytes
 * @param parseAsFlumeEvent whether {@code value} holds an Avro-encoded {@code AvroFlumeEvent}
 * @return the resulting event
 * @throws IOException if Avro decoding fails
 */
private Event deserializeValue(byte[] value, boolean parseAsFlumeEvent) throws IOException {
  if (!parseAsFlumeEvent) {
    return EventBuilder.withBody(value, Collections.EMPTY_MAP);
  }

  decoder = DecoderFactory.get().directBinaryDecoder(new ByteArrayInputStream(value), decoder);
  if (!reader.isPresent()) {
    reader = Optional.of(new SpecificDatumReader<AvroFlumeEvent>(AvroFlumeEvent.class));
  }
  AvroFlumeEvent avroEvent = reader.get().read(null, decoder);
  return EventBuilder.withBody(avroEvent.getBody().array(), toStringMap(avroEvent.getHeaders()));
}
public void binaryReadWriteExample() throws IOException { Employee employee = Employee.newBuilder().setFirstName("Gaurav") .setLastName("Mazra").setSex(SEX.MALE).build(); DatumWriter<Employee> employeeWriter = new SpecificDatumWriter<>(Employee.class); byte[] data; try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) { Encoder binaryEncoder = EncoderFactory.get().binaryEncoder(baos, null); employeeWriter.write(employee, binaryEncoder); binaryEncoder.flush(); data = baos.toByteArray(); } // serialized data System.out.println(data); DatumReader<Employee> employeeReader = new SpecificDatumReader<>(Employee.class); Decoder binaryDecoder = DecoderFactory.get().binaryDecoder(data, null); employee = employeeReader.read(null, binaryDecoder); //data after deserialization System.out.println(employee); }
/**
 * Converts the Avro binary body of an event to JSON content.
 *
 * <p>The body is decoded with the configured datum reader, the record's string form is parsed
 * as JSON and the parsed structure is copied into a new builder.
 *
 * @param event event whose body holds a binary-encoded Avro record
 * @return the builder holding the record's JSON; {@code null} when no datum reader is
 *     configured or when an {@code IOException} occurs before the structure has been copied
 */
@Override
public XContentBuilder serialize(Event event) {
  XContentBuilder builder = null;
  try {
    if (datumReader != null) {
      Decoder decoder = new DecoderFactory().binaryDecoder(event.getBody(), null);
      GenericRecord data = datumReader.read(null, decoder);
      logger.trace("Record in event " + data);

      // try-with-resources releases the parser even when copying the structure throws.
      try (XContentParser parser = XContentFactory
          .xContent(XContentType.JSON)
          .createParser(NamedXContentRegistry.EMPTY, data.toString())) {
        builder = jsonBuilder().copyCurrentStructure(parser);
      }
    } else {
      logger.error("Schema File is not configured");
    }
  } catch (IOException e) {
    // Deliberately best-effort: the failure is logged with its stack trace and the caller
    // gets whatever has been built so far.
    logger.error("Exception in parsing avro format data but continuing serialization to process further records", e);
  }
  return builder;
}
/**
 * Creates a reader over an event stream.
 *
 * <p>Two lines are consumed from {@code in} up front: the first is kept as the version, the
 * second is parsed as the schema the events were written with. The version then selects
 * either a JSON or a binary decoder over the remainder of the stream.
 *
 * @param in stream to read events from
 * @throws IOException if reading fails or the version line matches neither
 *     {@code EventWriter.VERSION} nor {@code EventWriter.VERSION_BINARY}
 */
@SuppressWarnings("deprecation")
public EventReader(DataInputStream in) throws IOException {
  this.in = in;
  this.version = in.readLine();

  // Schema found in the stream (writer side) versus the schema of the local Event class.
  this.schema = new Schema.Parser().parse(in.readLine());
  Schema localEventSchema =
      new SpecificData(Event.class.getClassLoader()).getSchema(Event.class);
  this.reader = new SpecificDatumReader(schema, localEventSchema);

  final boolean jsonEncoded = EventWriter.VERSION.equals(version);
  if (!jsonEncoded && !EventWriter.VERSION_BINARY.equals(version)) {
    throw new IOException("Incompatible event log version: " + version);
  }
  if (jsonEncoded) {
    this.decoder = DecoderFactory.get().jsonDecoder(schema, in);
  } else {
    this.decoder = DecoderFactory.get().binaryDecoder(in, null);
  }
}
/**
 * Reads JSON-encoded datums from {@code inputStream} and writes each one to
 * {@code outputStream} in Avro binary form, using {@code schema} for reading and writing.
 * Reading stops at the first {@code EOFException} raised by the reader.
 *
 * @param inputStream source of the JSON-encoded datums
 * @param outputStream destination of the binary output; flushed here, never closed
 * @param schema schema handed to the datum reader, the datum writer and the JSON decoder
 * @throws IOException if decoding, encoding or flushing fails for any other reason
 */
static void convertJsonToAvro(InputStream inputStream, OutputStream outputStream, Schema schema)
    throws IOException {
  final DatumReader<Object> datumReader = new GenericDatumReader<>(schema);
  final DatumWriter<Object> datumWriter = new GenericDatumWriter<>(schema);
  final Encoder sink = EncoderFactory.get().binaryEncoder(outputStream, null);
  final JsonDecoder source = DecoderFactory.get().jsonDecoder(schema, inputStream);

  Object current = null;
  for (;;) {
    try {
      // The previous datum is offered back to the reader for reuse.
      current = datumReader.read(current, source);
    } catch (EOFException endOfInput) {
      // End of input is signalled by the exception, so it terminates the loop.
      break;
    }
    datumWriter.write(current, sink);
    sink.flush();
  }
  outputStream.flush();
}
/**
 * Decodes {@code data} according to {@code schema}.
 *
 * <p>For a union schema the bytes are first read as a union index; a {@code NULL} branch
 * yields {@code null}, any other branch has its bytes value extracted and decoded with that
 * branch's schema. Non-union schemas are decoded directly.
 *
 * @param schema schema describing {@code data}
 * @param data encoded value
 * @return the decoded object, or {@code null} for the null branch of a union
 * @throws GoraException if the union wrapper cannot be decoded
 */
public Object fromBytes(Schema schema, byte data[]) throws GoraException {
  if (schema.getType() != Type.UNION) {
    return fromBytes(encoder, schema, data);
  }

  Schema branchSchema;
  try {
    Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);
    int branchIndex = decoder.readIndex();
    branchSchema = schema.getTypes().get(branchIndex);
    if (branchSchema.getType() == Type.NULL) {
      decoder.readNull();
      return null;
    }
    // Unwrap the payload of the selected branch before delegating.
    data = decoder.readBytes(null).array();
  } catch (IOException e) {
    LOG.error(e.getMessage());
    throw new GoraException("Error decoding union type: ", e);
  }
  return fromBytes(encoder, branchSchema, data);
}
/** * Processes an Avro Blob containing a single message and with no embedded * schema. This is the pattern when Avro objects are passed over messaging * infrastructure such as Apache Kafka. * * @param avroMessage * The Blob that holds the single Avro message object * @param avroKey * The Blob that holds the single Avro key object (if passed) * @param outStream * The stream to which the JSON string must be submitted * @param outTuple * The tuple holding the JSON string * @param messageSchema * The schema of the Avro messsage object * @param keySchema * The schema of the Avro key object * @throws Exception */ private void processAvroMessage(Blob avroMessage, Blob avroKey, StreamingOutput<OutputTuple> outStream, OutputTuple outTuple, Schema messageSchema, Schema keySchema) throws Exception { // Deserialize message GenericDatumReader<GenericRecord> consumer = new GenericDatumReader<GenericRecord>(messageSchema); ByteArrayInputStream consumedByteArray = new ByteArrayInputStream(avroMessage.getData()); Decoder consumedDecoder = DecoderFactory.get().binaryDecoder(consumedByteArray, null); GenericRecord consumedDatum = consumer.read(null, consumedDecoder); if (LOGGER.isTraceEnabled()) LOGGER.log(TraceLevel.TRACE, "JSON representation of Avro message: " + consumedDatum.toString()); outTuple.setString(outputJsonMessage, consumedDatum.toString()); // Deserialize key (if specified) if (avroKey != null) { consumer = new GenericDatumReader<GenericRecord>(keySchema); consumedByteArray = new ByteArrayInputStream(avroKey.getData()); consumedDecoder = DecoderFactory.get().binaryDecoder(consumedByteArray, null); consumedDatum = consumer.read(null, consumedDecoder); if (LOGGER.isTraceEnabled()) LOGGER.log(TraceLevel.TRACE, "JSON representation of Avro key: " + consumedDatum.toString()); if (outputJsonKey != null) outTuple.setString(outputJsonKey, consumedDatum.toString()); } // Submit new tuple to output port 0 outStream.submit(outTuple); }
AvroBlock( byte[] data, long numRecords, Mode<T> mode, String writerSchemaString, String codec) throws IOException { this.mode = mode; this.numRecords = numRecords; checkNotNull(writerSchemaString, "writerSchemaString"); Schema writerSchema = internOrParseSchemaString(writerSchemaString); Schema readerSchema = internOrParseSchemaString( MoreObjects.firstNonNull(mode.readerSchemaString, writerSchemaString)); this.reader = (mode.type == GenericRecord.class) ? new GenericDatumReader<T>(writerSchema, readerSchema) : new ReflectDatumReader<T>(writerSchema, readerSchema); this.decoder = DecoderFactory.get().binaryDecoder(decodeAsInputStream(data, codec), null); }
/**
 * Decodes an {@code Iote2eRequest} from its Avro binary form.
 *
 * <p>The datum reader is created on first use; the binary decoder held by this instance is
 * re-targeted at {@code bytes} on every call. The decoded request is also stored in the
 * {@code iote2eRequest} field.
 *
 * @param bytes binary-encoded request
 * @return the decoded request
 * @throws Exception any decoding failure, logged and then rethrown unchanged
 */
public Iote2eRequest fromByteArray(byte[] bytes) throws Exception {
  try {
    if (this.datumReaderIote2eRequest == null) {
      this.datumReaderIote2eRequest =
          new SpecificDatumReader<Iote2eRequest>(Iote2eRequest.getClassSchema());
    }
    this.binaryDecoder = DecoderFactory.get().binaryDecoder(bytes, this.binaryDecoder);
    this.iote2eRequest = this.datumReaderIote2eRequest.read(null, this.binaryDecoder);
    return this.iote2eRequest;
  } catch (Exception e) {
    logger.error(e.getMessage(), e);
    throw e;
  }
}
/**
 * Decodes an {@code Iote2eResult} from its Avro binary form.
 *
 * <p>The datum reader is created on first use; the binary decoder held by this instance is
 * re-targeted at {@code bytes} on every call. The decoded result is also stored in the
 * {@code iote2eResult} field.
 *
 * @param bytes binary-encoded result
 * @return the decoded result
 * @throws Exception any decoding failure, logged and then rethrown unchanged
 */
public Iote2eResult fromByteArray(byte[] bytes) throws Exception {
  try {
    if (this.datumReaderIote2eResult == null) {
      this.datumReaderIote2eResult =
          new SpecificDatumReader<Iote2eResult>(Iote2eResult.getClassSchema());
    }
    this.binaryDecoder = DecoderFactory.get().binaryDecoder(bytes, this.binaryDecoder);
    this.iote2eResult = this.datumReaderIote2eResult.read(null, this.binaryDecoder);
    return this.iote2eResult;
  } catch (Exception e) {
    logger.error(e.getMessage(), e);
    throw e;
  }
}
/**
 * Round-trips a single {@code User} through Avro binary encoding: builds the user, writes it
 * into an in-memory buffer, prints the encoded length, reads the bytes back as a
 * {@code GenericRecord} and prints the record followed by its {@code name} field.
 *
 * @throws Exception if encoding or decoding fails
 */
public void processSinglex() throws Exception {
  // Current time narrowed to an int; the same value feeds all three fields.
  final int seed = (int) System.currentTimeMillis();
  final User original = User.newBuilder()
      .setName("name" + seed)
      .setFavoriteColor("color" + seed)
      .setFavoriteNumber(seed)
      .build();

  final DatumWriter<GenericRecord> userWriter =
      new GenericDatumWriter<GenericRecord>(User.getClassSchema());
  final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
  final byte[] encoded;
  try {
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(buffer, null);
    userWriter.write(original, encoder);
    encoder.flush();
    encoded = buffer.toByteArray();
  } finally {
    buffer.close();
  }
  System.out.println(encoded.length);

  final DatumReader<GenericRecord> userReader =
      new GenericDatumReader<GenericRecord>(User.getClassSchema());
  final GenericRecord decoded =
      userReader.read(null, DecoderFactory.get().binaryDecoder(encoded, null));
  System.out.println(decoded);
  System.out.println(decoded.get("name"));
}
/** * Process list. * * @param weathers the weathers * @throws Exception the exception */ public void processList(List<Weather> weathers) throws Exception { long before = System.currentTimeMillis(); BinaryEncoder binaryEncoder = null; BinaryDecoder binaryDecoder = null; Weather weatherRead = null; for (Weather weather : weathers) { DatumWriter<Weather> datumWriterWeather = new SpecificDatumWriter<Weather>(Weather.getClassSchema()); ByteArrayOutputStream baos = new ByteArrayOutputStream(); byte[] byteData = null; try { binaryEncoder = EncoderFactory.get().binaryEncoder(baos, binaryEncoder); datumWriterWeather.write(weather, binaryEncoder); binaryEncoder.flush(); byteData = baos.toByteArray(); } finally { baos.close(); } DatumReader<Weather> datumReaderWeather = new SpecificDatumReader<Weather>(Weather.getClassSchema()); binaryDecoder = DecoderFactory.get().binaryDecoder(byteData, binaryDecoder); weatherRead = datumReaderWeather.read(weatherRead, binaryDecoder); // System.out.println("After Binary Read: " + weatherRead.toString()); } System.out.println("size=" + weathers.size() + ", elapsed: " + (System.currentTimeMillis()-before)); }
/**
 * Decodes an Avro binary payload into an instance of the target type.
 *
 * @param topic topic the payload came from; used only in the error message
 * @param data binary-encoded record, may be {@code null}
 * @return the decoded record, or {@code null} when {@code data} is {@code null}
 * @throws SerializationException wrapping any failure raised while decoding
 */
@SuppressWarnings("unchecked")
@Override
public T deserialize(String topic, byte[] data) {
  try {
    T result = null;
    if (data != null) {
      // Building the hex dump costs time and memory proportional to the payload, so it is
      // only done when the debug message will actually be emitted.
      if (LOGGER.isDebugEnabled()) {
        LOGGER.debug("data='{}'", DatatypeConverter.printHexBinary(data));
      }

      // The reader schema is taken from a fresh instance of the target type.
      DatumReader<GenericRecord> datumReader =
          new SpecificDatumReader<>(targetType.newInstance().getSchema());
      Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);

      result = (T) datumReader.read(null, decoder);
      LOGGER.debug("deserialized data='{}'", result);
    }
    return result;
  } catch (Exception ex) {
    throw new SerializationException(
        "Can't deserialize data '" + Arrays.toString(data) + "' from topic '" + topic + "'", ex);
  }
}
/**
 * Change the schema of an Avro record.
 *
 * <p>The record is returned as-is when it already has the target schema. Otherwise it is
 * serialized to bytes and read back with its own schema as writer schema and
 * {@code newSchema} as reader schema. An incompatibility between the two schemas is only
 * logged at debug level; the conversion is still attempted.
 *
 * @param record The Avro record whose schema is to be changed.
 * @param newSchema The target schema. It must be compatible as reader schema with
 *     record.getSchema() as writer schema.
 * @return a new Avro record with the new schema, or {@code record} itself if the schemas are equal.
 * @throws IOException if conversion failed.
 */
public static GenericRecord convertRecordSchema(GenericRecord record, Schema newSchema) throws IOException {
  if (record.getSchema().equals(newSchema)) {
    return record;
  }

  if (checkReaderWriterCompatibility(newSchema, record.getSchema()).getType() != COMPATIBLE) {
    LOG.debug("Record schema not compatible with writer schema. Converting record schema to writer schema may fail.");
  }

  try {
    BinaryDecoder decoder = new DecoderFactory().binaryDecoder(recordToByteArray(record), null);
    DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(record.getSchema(), newSchema);
    return reader.read(null, decoder);
  } catch (IOException e) {
    // Wrap with both schemas so the failing conversion can be identified from the message.
    throw new IOException(
        String.format("Cannot convert avro record to new schema. Original schema = %s, new schema = %s",
            record.getSchema(), newSchema),
        e);
  }
}
/**
 * Decodes one message into a record with this extractor's schema.
 *
 * <p>The payload is expected to start with the magic byte, followed by
 * {@code SCHEMA_ID_LENGTH_BYTE} bytes of schema id, followed by the binary-encoded record. The
 * schema id (hex-encoded) is looked up in the schema registry and used as the reader's schema;
 * the decoded record is then converted to {@code this.schema}.
 *
 * @param messageAndOffset message to decode
 * @return the decoded and converted record
 * @throws SchemaNotFoundException declared for the schema registry lookup
 * @throws IOException if decoding or conversion fails
 * @throws RuntimeException if the payload is empty or does not start with the magic byte
 */
@Override
protected GenericRecord decodeRecord(MessageAndOffset messageAndOffset) throws SchemaNotFoundException, IOException {
  byte[] payload = getBytes(messageAndOffset.message().payload());

  // The length check keeps an empty payload from surfacing as a bare
  // ArrayIndexOutOfBoundsException; it is reported like any other payload without the magic byte.
  if (payload.length == 0 || payload[0] != KafkaAvroSchemaRegistry.MAGIC_BYTE) {
    throw new RuntimeException(String.format("Unknown magic byte for partition %s", this.getCurrentPartition()));
  }

  // Magic byte plus schema id precede the encoded record.
  int headerLength = 1 + KafkaAvroSchemaRegistry.SCHEMA_ID_LENGTH_BYTE;
  byte[] schemaIdByteArray = Arrays.copyOfRange(payload, 1, headerLength);
  String schemaId = Hex.encodeHexString(schemaIdByteArray);
  Schema schema = this.schemaRegistry.getSchemaById(schemaId);
  reader.get().setSchema(schema);

  Decoder binaryDecoder =
      DecoderFactory.get().binaryDecoder(payload, headerLength, payload.length - headerLength, null);
  try {
    GenericRecord record = reader.get().read(null, binaryDecoder);
    record = AvroUtils.convertRecordSchema(record, this.schema.get());
    return record;
  } catch (IOException e) {
    LOG.error(String.format("Error during decoding record for partition %s: ", this.getCurrentPartition()));
    throw e;
  }
}
/**
 * Rebuilds a {@code MapOutputValue} from its byte form.
 *
 * <p>Layout read from {@code bytes}: an int length, that many bytes of schema name, an int
 * length, that many bytes of binary-encoded record. The schema used for decoding is looked up
 * in {@code schemaMap} by the schema name.
 *
 * @param bytes serialized value
 * @param schemaMap schemas keyed by schema name
 * @return the value holding the schema name and the decoded record
 * @throws IOException if decoding fails; an {@code EOFException} is raised when {@code bytes}
 *     ends before a declared length has been fully read
 */
public static MapOutputValue fromBytes(byte[] bytes, Map<String, Schema> schemaMap) throws IOException {
  DataInputStream dataInputStream = new DataInputStream(new ByteArrayInputStream(bytes));

  // readFully either fills the whole array or fails; read(byte[]) may stop short and its
  // return value was being ignored, which let truncated input through as zero-padded data.
  int length = dataInputStream.readInt();
  byte[] sourceNameBytes = new byte[length];
  dataInputStream.readFully(sourceNameBytes);
  String schemaName = new String(sourceNameBytes);

  int recordDataLength = dataInputStream.readInt();
  byte[] recordBytes = new byte[recordDataLength];
  dataInputStream.readFully(recordBytes);

  Schema schema = schemaMap.get(schemaName);
  GenericRecord record = new GenericData.Record(schema);
  binaryDecoder = DecoderFactory.get().binaryDecoder(recordBytes, binaryDecoder);
  GenericDatumReader<GenericRecord> gdr = new GenericDatumReader<GenericRecord>(schema);
  gdr.read(record, binaryDecoder);
  return new MapOutputValue(schemaName, record);
}
/**
 * Tries to interpret the body of {@code event} as an Avro-encoded {@code AvroFlumeEvent} and,
 * on success, returns a new event built from the decoded body and the headers combined by
 * {@code toStringMap}. Any failure is logged as a warning and the event is handed back as it
 * stood at that point.
 *
 * @param event possibly wrapped event
 * @return the unwrapped event, or {@code event} itself when decoding fails
 */
public Event unwrap(Event event) {
  if (LOG.isDebugEnabled()) {
    LOG.debug("Attempting to unwrap event, body [" + event.getBody().length + "] bytes");
  }

  Event result = event;
  InputStream wrappedBody = new ByteArrayInputStream(event.getBody());
  try {
    decoder = DecoderFactory.get().directBinaryDecoder(wrappedBody, decoder);
    AvroFlumeEvent avroEvent = reader.read(null, decoder);
    result = EventBuilder.withBody(
        avroEvent.getBody().array(),
        toStringMap(avroEvent.getHeaders(), event.getHeaders()));
    if (LOG.isDebugEnabled()) {
      LOG.debug("Event successfully unwrapped, header [" + avroEvent.getHeaders().size()
          + "] fields, body [" + result.getBody().length + "] bytes");
    }
  } catch (Exception failure) {
    // Best effort by design: an undecodable body is reported, not propagated.
    if (LOG.isWarnEnabled()) {
      LOG.warn("Failed to unwrap event, "
          + "perhaps this source is not connected to a sinkless connector?", failure);
    }
  } finally {
    IOUtils.closeQuietly(wrappedBody);
  }
  return result;
}
/**
 * Decodes an Avro binary message into a {@code JetstreamEvent} built from the record's
 * {@code MAP_FIELD_NAME} field.
 *
 * <p>The decoder kept in {@code decoderHolder} is offered for reuse; when the holder is still
 * empty, the decoder created here is stored in it.
 *
 * @param key not used by this method
 * @param message binary-encoded record
 * @return the event wrapping the decoded map
 * @throws IllegalArgumentException if the message cannot be read
 */
@SuppressWarnings("unchecked")
@Override
public JetstreamEvent decode(byte[] key, byte[] message) {
  final BinaryDecoder cached = decoderHolder.get();
  final BinaryDecoder decoder =
      DecoderFactory.get().directBinaryDecoder(new ByteArrayInputStream(message), cached);
  if (cached == null) {
    decoderHolder.set(decoder);
  }

  try {
    Record decoded = reader.read(null, decoder);
    return new JetstreamEvent((Map<String, Object>) decoded.get(MAP_FIELD_NAME));
  } catch (IOException e) {
    throw new IllegalArgumentException("Can not read the avro message", e);
  }
}
/**
 * Checks that converting the Mongo-style JSON document yields the same record, and the same
 * simple JSON, as decoding the equivalent Avro JSON with the {@code Enums} schema.
 */
@Test
public void testEnums() throws Exception {
  Schema schema = Enums.SCHEMA$;

  // Both documents previously ended with an extra, unbalanced closing brace.
  String avroJson = "{\"enum1\": \"X\", \"enum2\": {\"test.Enum2\": \"A\"}, \"enum3\": {\"null\": null}, \"enum4\": [{\"test.Enum4\": \"SAT\"}, {\"test.Enum4\": \"SUN\"}]}";
  Decoder decoder = DecoderFactory.get().jsonDecoder(schema, avroJson);
  GenericDatumReader<Record> reader = new GenericDatumReader<Record>(schema);
  Record record1 = reader.read(null, decoder);

  String mongoJson = "{\"enum1\": \"X\", \"enum2\": \"A\", \"enum3\": null, \"enum4\": [\"SAT\", \"SUN\"]}";
  BSONObject object = (BSONObject) JSON.parse(mongoJson);
  Record record2 = RecordConverter.toRecord(schema, object, getClass().getClassLoader());

  assertThat(record2, is(record1));
  assertThat(AvroHelper.toSimpleJson(schema, record2), is(AvroHelper.toSimpleJson(schema, record1)));
}
/**
 * Checks that converting the Mongo-style JSON document yields the same record, and the same
 * simple JSON, as decoding the equivalent Avro JSON with the {@code Unions} schema.
 */
@Test
public void testUnions() throws Exception {
  Schema schema = Unions.SCHEMA$;

  // Reference record: decoded from Avro's own JSON encoding, where union values are tagged.
  String avroJson = "{\"union1\": {\"int\": 1}, "
      + "\"union2\": {\"test.Union2\": {\"union21\": {\"long\": 2}}}, "
      + "\"union3\": {\"array\": [{\"boolean\": true}, {\"boolean\": false}, {\"null\": null}]}, "
      + "\"union4\": {\"map\": {\"a\": {\"string\": \"A\"}, \"b\": {\"string\": \"B\"}, \"c\": {\"string\": \"C\"}}}, "
      + "\"union5\": {\"null\": null}, "
      + "\"union6\": {\"null\": null}}";
  Decoder avroDecoder = DecoderFactory.get().jsonDecoder(schema, avroJson);
  GenericDatumReader<Record> avroReader = new GenericDatumReader<Record>(schema);
  Record expected = avroReader.read(null, avroDecoder);

  // Record under test: converted from the untagged document.
  String mongoJson = "{\"union1\": 1, "
      + "\"union2\": {\"union21\": 2}, "
      + "\"union3\": [true, false, null], "
      + "\"union4\": {\"a\": \"A\", \"b\": \"B\", \"c\": \"C\"}, "
      + "\"union5\": null, "
      + "\"union6\": null}";
  DBObject document = (DBObject) JSON.parse(mongoJson);
  Record converted = RecordConverter.toRecord(schema, document, getClass().getClassLoader());

  assertThat(converted, is(expected));
  assertThat(AvroHelper.toSimpleJson(schema, converted), is(AvroHelper.toSimpleJson(schema, expected)));
}
/**
 * Creates a parser for one Avro message.
 *
 * <p>When {@code schemaSource} is {@code OriginAvroSchemaSource.SOURCE} the message is opened
 * through a {@code DataFileReader}; for every other source a binary decoder over the message
 * bytes and an empty record of {@code schema} are prepared instead.
 *
 * @param context context stored in the {@code context} field
 * @param schema schema given to the datum reader and, in the second case, to the record
 * @param message raw message bytes
 * @param messageId identifier stored in the {@code messageId} field
 * @param schemaSource where the schema comes from; selects the reading strategy
 * @throws IOException declared for the reader and decoder set-up
 */
public AvroMessageParser(
    ProtoConfigurableEntity.Context context,
    final Schema schema,
    final byte[] message,
    final String messageId,
    final OriginAvroSchemaSource schemaSource
) throws IOException {
  this.context = context;
  this.messageId = messageId;
  this.schemaSource = schemaSource;

  // Reader schema argument is optional
  datumReader = new GenericDatumReader<>(schema);

  if (schemaSource != OriginAvroSchemaSource.SOURCE) {
    decoder = DecoderFactory.get().binaryDecoder(new ByteArrayInputStream(message), null);
    avroRecord = new GenericData.Record(schema);
  } else {
    dataFileReader = new DataFileReader<>(new SeekableByteArrayInput(message), datumReader);
  }
}
/**
 * Change the schema of an Avro record.
 *
 * <p>The record is returned as-is when it already has the target schema. Otherwise it is
 * serialized to bytes and read back with its own schema as writer schema and
 * {@code newSchema} as reader schema.
 *
 * @param record The Avro record whose schema is to be changed.
 * @param newSchema The target schema. It must be compatible as reader schema with
 *     record.getSchema() as writer schema.
 * @return a new Avro record with the new schema, or {@code record} itself if the schemas are equal.
 * @throws IOException if conversion failed.
 */
public static GenericRecord convertRecordSchema(GenericRecord record, Schema newSchema) throws IOException {
  if (record.getSchema().equals(newSchema)) {
    return record;
  }

  try {
    BinaryDecoder decoder = new DecoderFactory().binaryDecoder(recordToByteArray(record), null);
    DatumReader<GenericRecord> reader = new GenericDatumReader<>(record.getSchema(), newSchema);
    return reader.read(null, decoder);
  } catch (IOException e) {
    // Wrap with both schemas so the failing conversion can be identified from the message.
    throw new IOException(
        String.format("Cannot convert avro record to new schema. Original schema = %s, new schema = %s",
            record.getSchema(), newSchema),
        e);
  }
}
/** * Convert the payload in the input record to a deserialized object with the latest schema * * @param inputRecord the input record * @return the schema'ed payload object */ protected P upConvertPayload(GenericRecord inputRecord) throws DataConversionException { try { Schema payloadSchema = getPayloadSchema(inputRecord); // Set writer schema latestPayloadReader.setSchema(payloadSchema); byte[] payloadBytes = getPayloadBytes(inputRecord); Decoder decoder = DecoderFactory.get().binaryDecoder(payloadBytes, null); // 'latestPayloadReader.read' will convert the record from 'payloadSchema' to the latest payload schema return latestPayloadReader.read(null, decoder); } catch (Exception e) { throw new DataConversionException(e); } }
/**
 * Creates a monitor that decodes Avro messages of the given schema.
 *
 * <p>Decoder and datum reader are held in thread-locals; each thread's decoder starts out
 * bound to an empty stream.
 *
 * @param topic passed to the superclass constructor
 * @param catalog passed to the superclass constructor
 * @param config passed to the superclass constructor
 * @param schema schema given to each thread's datum reader
 * @param versionWriter stored in the {@code versionWriter} field
 */
public KafkaAvroJobMonitor(String topic, MutableJobCatalog catalog, Config config, Schema schema,
    SchemaVersionWriter<?> versionWriter) {
  super(topic, catalog, config);

  this.schema = schema;
  this.versionWriter = versionWriter;

  this.reader = new ThreadLocal<SpecificDatumReader<T>>() {
    @Override
    protected SpecificDatumReader<T> initialValue() {
      return new SpecificDatumReader<>(KafkaAvroJobMonitor.this.schema);
    }
  };
  this.decoder = new ThreadLocal<BinaryDecoder>() {
    @Override
    protected BinaryDecoder initialValue() {
      // Placeholder input: a zero-length stream.
      return DecoderFactory.get().binaryDecoder(new ByteArrayInputStream(new byte[0]), null);
    }
  };
}
/**
 * Parses one raw message into job specs.
 *
 * <p>The schema versioning information is consumed from the front of the message first; the
 * rest is decoded with this thread's reader and handed to the typed {@code parseJobSpec}
 * overload. A message that cannot be decoded is counted, logged as a warning and yields an
 * empty list. The exception is attached to the warning only while the five-minute failure rate
 * is below one.
 *
 * @param message raw message bytes
 * @return the parsed job specs, or an empty list when decoding fails
 * @throws IOException declared by the signature; decoding failures inside the try block are
 *     handled here
 */
@Override
public Collection<Either<JobSpec, URI>> parseJobSpec(byte[] message) throws IOException {
  InputStream body = new ByteArrayInputStream(message);
  this.versionWriter.readSchemaVersioningInformation(new DataInputStream(body));

  Decoder messageDecoder = DecoderFactory.get().binaryDecoder(body, this.decoder.get());
  try {
    return parseJobSpec(this.reader.get().read(null, messageDecoder));
  } catch (AvroRuntimeException | IOException exc) {
    this.messageParseFailures.mark();
    boolean includeCause = this.messageParseFailures.getFiveMinuteRate() < 1;
    if (includeCause) {
      log.warn("Unable to decode input message.", exc);
    } else {
      log.warn("Unable to decode input message.");
    }
    return Lists.newArrayList();
  }
}