public void jsonReadWriteExample() throws IOException { Employee employee = Employee.newBuilder().setFirstName("Gaurav") .setLastName("Mazra").setSex(SEX.MALE).build(); DatumWriter<Employee> employeeWriter = new SpecificDatumWriter<>(Employee.class); byte[] data; try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) { Encoder jsonEncoder = EncoderFactory.get().jsonEncoder(Employee.getClassSchema(), baos); employeeWriter.write(employee, jsonEncoder); jsonEncoder.flush(); data = baos.toByteArray(); } // serialized data System.out.println(new String(data)); DatumReader<Employee> employeeReader = new SpecificDatumReader<>(Employee.class); Decoder decoder = DecoderFactory.get().jsonDecoder(Employee.getClassSchema(), new String(data)); employee = employeeReader.read(null, decoder); //data after deserialization System.out.println(employee); }
/**
 * Builds a one-field record schema whose field {@code n} has the given type and
 * asserts that both the JSON literal {@code 1} and {@code 1.0} decode to {@code value}.
 */
private void checkNumeric(String type, Object value) throws Exception {
    String schemaJson = "{\"type\":\"record\",\"name\":\"X\",\"fields\":"
            + "[{\"type\":\"" + type + "\",\"name\":\"n\"}]}";
    Schema schema = Schema.parse(schemaJson);
    DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);

    for (String json : new String[] {"{\"n\":1}", "{\"n\":1.0}"}) {
        Decoder jsonDecoder = new ExtendedJsonDecoder(schema, json);
        GenericRecord decoded = reader.read(null, jsonDecoder);
        Assert.assertEquals(value, decoded.get("n"));
    }
}
/**
 * Deserializes an Avro binary payload into an instance of the target type.
 *
 * @param topic the topic the payload came from; only used in the error message
 * @param data  the Avro binary payload, may be {@code null}
 * @return the decoded record, or {@code null} when {@code data} is {@code null}
 * @throws SerializationException if the schema cannot be obtained or decoding fails
 */
@SuppressWarnings("unchecked")
@Override
public T deserialize(String topic, byte[] data) {
    try {
        T result = null;

        if (data != null) {
            // Hex-encoding the whole payload is wasted work unless debug output is enabled.
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("data='{}'", DatatypeConverter.printHexBinary(data));
            }

            // Class.newInstance() is deprecated; go through the no-arg constructor instead.
            DatumReader<GenericRecord> datumReader = new SpecificDatumReader<>(
                    targetType.getDeclaredConstructor().newInstance().getSchema());
            Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);

            result = (T) datumReader.read(null, decoder);
            LOGGER.debug("deserialized data='{}'", result);
        }
        return result;
    } catch (Exception ex) {
        throw new SerializationException(
                "Can't deserialize data '" + Arrays.toString(data) + "' from topic '" + topic + "'", ex);
    }
}
public void binaryReadWriteExample() throws IOException { Employee employee = Employee.newBuilder().setFirstName("Gaurav") .setLastName("Mazra").setSex(SEX.MALE).build(); DatumWriter<Employee> employeeWriter = new SpecificDatumWriter<>(Employee.class); byte[] data; try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) { Encoder binaryEncoder = EncoderFactory.get().binaryEncoder(baos, null); employeeWriter.write(employee, binaryEncoder); binaryEncoder.flush(); data = baos.toByteArray(); } // serialized data System.out.println(data); DatumReader<Employee> employeeReader = new SpecificDatumReader<>(Employee.class); Decoder binaryDecoder = DecoderFactory.get().binaryDecoder(data, null); employee = employeeReader.read(null, binaryDecoder); //data after deserialization System.out.println(employee); }
/**
 * Converts the Avro binary body of an event to JSON content.
 *
 * @param event the event whose body holds the Avro binary record
 * @return a builder holding the record's JSON structure, or {@code null} when no
 *         datum reader is configured or the body cannot be read or parsed
 */
@Override
public XContentBuilder serialize(Event event) {
    XContentBuilder builder = null;
    try {
        if (datumReader != null) {
            Decoder decoder = new DecoderFactory().binaryDecoder(event.getBody(), null);
            GenericRecord data = datumReader.read(null, decoder);
            logger.trace("Record in event " + data);
            // try-with-resources releases the parser even when copying the structure fails.
            try (XContentParser parser = XContentFactory
                    .xContent(XContentType.JSON)
                    .createParser(NamedXContentRegistry.EMPTY, data.toString())) {
                builder = jsonBuilder().copyCurrentStructure(parser);
            }
        } else {
            logger.error("Schema File is not configured");
        }
    } catch (IOException e) {
        // Best effort: report the failure with its stack trace and let the caller move on.
        logger.error("Exception in parsing avro format data but continuing serialization to process further records",
                e);
    }
    return builder;
}
/**
 * Decodes {@code data} according to {@code schema}. For a union schema the branch
 * index is read first: a null branch yields {@code null}, any other branch has its
 * bytes unwrapped and decoded with the branch schema.
 *
 * @throws GoraException if the union header cannot be read
 */
public Object fromBytes(Schema schema, byte data[]) throws GoraException {
    if (schema.getType() != Type.UNION) {
        return fromBytes(encoder, schema, data);
    }

    Schema branchSchema;
    try {
        Decoder unionDecoder = DecoderFactory.get().binaryDecoder(data, null);
        int branchIndex = unionDecoder.readIndex();
        branchSchema = schema.getTypes().get(branchIndex);
        if (branchSchema.getType() == Type.NULL) {
            unionDecoder.readNull();
            return null;
        }
        data = unionDecoder.readBytes(null).array();
    } catch (IOException e) {
        LOG.error(e.getMessage());
        throw new GoraException("Error decoding union type: ", e);
    }
    return fromBytes(encoder, branchSchema, data);
}
/**
 * Reads a boolean[] from input. The input holds an int element count followed by
 * the flags packed eight per byte, lowest bit first.
 * @throws IOException
 */
public static boolean[] readBoolArray(Decoder in) throws IOException {
    int count = in.readInt();
    boolean[] flags = new boolean[count];

    byte[] packed = new byte[(int) Math.ceil(count / 8.0)];
    in.readFixed(packed);

    for (int i = 0; i < count; i++) {
        // Byte i/8 carries flag i in bit position i%8.
        flags[i] = ((packed[i >> 3] >> (i & 7)) & 0x01) != 0;
    }
    return flags;
}
/** * Processes an Avro Blob containing a single message and with no embedded * schema. This is the pattern when Avro objects are passed over messaging * infrastructure such as Apache Kafka. * * @param avroMessage * The Blob that holds the single Avro message object * @param avroKey * The Blob that holds the single Avro key object (if passed) * @param outStream * The stream to which the JSON string must be submitted * @param outTuple * The tuple holding the JSON string * @param messageSchema * The schema of the Avro messsage object * @param keySchema * The schema of the Avro key object * @throws Exception */ private void processAvroMessage(Blob avroMessage, Blob avroKey, StreamingOutput<OutputTuple> outStream, OutputTuple outTuple, Schema messageSchema, Schema keySchema) throws Exception { // Deserialize message GenericDatumReader<GenericRecord> consumer = new GenericDatumReader<GenericRecord>(messageSchema); ByteArrayInputStream consumedByteArray = new ByteArrayInputStream(avroMessage.getData()); Decoder consumedDecoder = DecoderFactory.get().binaryDecoder(consumedByteArray, null); GenericRecord consumedDatum = consumer.read(null, consumedDecoder); if (LOGGER.isTraceEnabled()) LOGGER.log(TraceLevel.TRACE, "JSON representation of Avro message: " + consumedDatum.toString()); outTuple.setString(outputJsonMessage, consumedDatum.toString()); // Deserialize key (if specified) if (avroKey != null) { consumer = new GenericDatumReader<GenericRecord>(keySchema); consumedByteArray = new ByteArrayInputStream(avroKey.getData()); consumedDecoder = DecoderFactory.get().binaryDecoder(consumedByteArray, null); consumedDatum = consumer.read(null, consumedDecoder); if (LOGGER.isTraceEnabled()) LOGGER.log(TraceLevel.TRACE, "JSON representation of Avro key: " + consumedDatum.toString()); if (outputJsonKey != null) outTuple.setString(outputJsonKey, consumedDatum.toString()); } // Submit new tuple to output port 0 outStream.submit(outTuple); }
/**
 * Decodes an Avro binary payload into the configured target type.
 *
 * @param topic the originating topic; only used to build the error message
 * @param data  the Avro binary payload, may be {@code null}
 * @return the decoded record, or {@code null} when {@code data} is {@code null}
 * @throws SerializationException if instantiating the target type or decoding fails
 */
@SuppressWarnings("unchecked")
@Override
public T deserialize(String topic, byte[] data) {
    try {
        T result = null;

        if (data != null) {
            // Skip the hex dump of the payload entirely when debug logging is off.
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("data='{}'", DatatypeConverter.printHexBinary(data));
            }

            // Use the no-arg constructor explicitly; Class.newInstance() is deprecated.
            DatumReader<GenericRecord> datumReader =
                    new SpecificDatumReader<>(targetType.getDeclaredConstructor().newInstance().getSchema());
            Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);

            result = (T) datumReader.read(null, decoder);
            LOGGER.debug("deserialized data='{}'", result);
        }
        return result;
    } catch (Exception ex) {
        throw new SerializationException(
                "Can't deserialize data '" + Arrays.toString(data) + "' from topic '" + topic + "'", ex);
    }
}
/**
 * Decodes a Kafka message whose payload is laid out as: one magic byte, then
 * {@code SCHEMA_ID_LENGTH_BYTE} bytes of schema id, then the Avro binary record.
 * The schema is looked up in the registry by the hex-encoded id, and the decoded
 * record is converted to this extractor's schema.
 *
 * @param messageAndOffset the Kafka message to decode
 * @return the decoded record, converted to this extractor's schema
 * @throws SchemaNotFoundException declared for the schema registry lookup
 * @throws IOException if the Avro body cannot be decoded
 * @throws RuntimeException if the payload is too short for its header or the magic byte is wrong
 */
@Override
protected GenericRecord decodeRecord(MessageAndOffset messageAndOffset) throws SchemaNotFoundException, IOException {
    byte[] payload = getBytes(messageAndOffset.message().payload());
    int headerLength = 1 + KafkaAvroSchemaRegistry.SCHEMA_ID_LENGTH_BYTE;

    // Arrays.copyOfRange zero-pads past the end of the array, so a truncated payload
    // would otherwise produce a bogus schema id instead of failing here.
    if (payload.length < headerLength) {
        throw new RuntimeException(String.format(
                "Payload of %d bytes is too short to hold the magic byte and schema id for partition %s",
                payload.length, this.getCurrentPartition()));
    }
    if (payload[0] != KafkaAvroSchemaRegistry.MAGIC_BYTE) {
        throw new RuntimeException(String.format("Unknown magic byte for partition %s", this.getCurrentPartition()));
    }

    byte[] schemaIdByteArray = Arrays.copyOfRange(payload, 1, headerLength);
    String schemaId = Hex.encodeHexString(schemaIdByteArray);
    Schema registeredSchema = this.schemaRegistry.getSchemaById(schemaId);
    reader.get().setSchema(registeredSchema);

    Decoder binaryDecoder =
            DecoderFactory.get().binaryDecoder(payload, headerLength, payload.length - headerLength, null);
    try {
        GenericRecord record = reader.get().read(null, binaryDecoder);
        record = AvroUtils.convertRecordSchema(record, this.schema.get());
        return record;
    } catch (IOException e) {
        // Log the exception itself so the cause is not lost at this site.
        LOG.error(String.format("Error during decoding record for partition %s: ", this.getCurrentPartition()), e);
        throw e;
    }
}
/**
 * Checks that a record converted from plain (Mongo-style) JSON equals the record
 * decoded from the equivalent Avro JSON, both directly and in simple JSON form.
 */
@Test
public void testEnums() throws Exception {
    Schema schema = Enums.SCHEMA$;

    // Fixtures are well-formed JSON: the stray trailing '}' was removed from both.
    String avroJson = "{\"enum1\": \"X\", \"enum2\": {\"test.Enum2\": \"A\"}, \"enum3\": {\"null\": null}, \"enum4\": [{\"test.Enum4\": \"SAT\"}, {\"test.Enum4\": \"SUN\"}]}";
    Decoder decoder = DecoderFactory.get().jsonDecoder(schema, avroJson);
    GenericDatumReader<Record> reader = new GenericDatumReader<Record>(schema);
    Record record1 = reader.read(null, decoder);

    String mongoJson = "{\"enum1\": \"X\", \"enum2\": \"A\", \"enum3\": null, \"enum4\": [\"SAT\", \"SUN\"]}";
    BSONObject object = (BSONObject) JSON.parse(mongoJson);
    Record record2 = RecordConverter.toRecord(schema, object, getClass().getClassLoader());

    assertThat(record2, is(record1));
    assertThat(AvroHelper.toSimpleJson(schema, record2), is(AvroHelper.toSimpleJson(schema, record1)));
}
/**
 * Checks that a record converted from plain (Mongo-style) JSON equals the record
 * decoded from the equivalent Avro JSON with explicit union branches.
 */
@Test
public void testUnions() throws Exception {
    Schema schema = Unions.SCHEMA$;

    // Expected record: decoded from Avro's own JSON encoding.
    String avroJson = "{\"union1\": {\"int\": 1}, \"union2\": {\"test.Union2\": {\"union21\": {\"long\": 2}}}, \"union3\": {\"array\": [{\"boolean\": true}, {\"boolean\": false}, {\"null\": null}]}, \"union4\": {\"map\": {\"a\": {\"string\": \"A\"}, \"b\": {\"string\": \"B\"}, \"c\": {\"string\": \"C\"}}}, \"union5\": {\"null\": null}, \"union6\": {\"null\": null}}";
    Decoder avroJsonDecoder = DecoderFactory.get().jsonDecoder(schema, avroJson);
    GenericDatumReader<Record> avroReader = new GenericDatumReader<Record>(schema);
    Record expected = avroReader.read(null, avroJsonDecoder);

    // Actual record: converted from the plain JSON document.
    String mongoJson = "{\"union1\": 1, \"union2\": {\"union21\": 2}, \"union3\": [true, false, null], \"union4\": {\"a\": \"A\", \"b\": \"B\", \"c\": \"C\"}, \"union5\": null, \"union6\": null}";
    DBObject document = (DBObject) JSON.parse(mongoJson);
    Record actual = RecordConverter.toRecord(schema, document, getClass().getClassLoader());

    assertThat(actual, is(expected));
    assertThat(AvroHelper.toSimpleJson(schema, actual), is(AvroHelper.toSimpleJson(schema, expected)));
}
/** * Convert the payload in the input record to a deserialized object with the latest schema * * @param inputRecord the input record * @return the schema'ed payload object */ protected P upConvertPayload(GenericRecord inputRecord) throws DataConversionException { try { Schema payloadSchema = getPayloadSchema(inputRecord); // Set writer schema latestPayloadReader.setSchema(payloadSchema); byte[] payloadBytes = getPayloadBytes(inputRecord); Decoder decoder = DecoderFactory.get().binaryDecoder(payloadBytes, null); // 'latestPayloadReader.read' will convert the record from 'payloadSchema' to the latest payload schema return latestPayloadReader.read(null, decoder); } catch (Exception e) { throw new DataConversionException(e); } }
/**
 * Parses a raw message into job specs: consumes the schema versioning header,
 * decodes the remaining bytes as Avro and delegates to the typed overload.
 * A message that cannot be decoded is counted, logged and yields an empty list.
 */
@Override
public Collection<Either<JobSpec, URI>> parseJobSpec(byte[] message) throws IOException {
    InputStream messageStream = new ByteArrayInputStream(message);
    this.versionWriter.readSchemaVersioningInformation(new DataInputStream(messageStream));

    Decoder avroDecoder = DecoderFactory.get().binaryDecoder(messageStream, this.decoder.get());

    try {
        T decodedMessage = this.reader.get().read(null, avroDecoder);
        return parseJobSpec(decodedMessage);
    } catch (AvroRuntimeException | IOException exc) {
        this.messageParseFailures.mark();
        // Include the stack trace only while the five-minute failure rate is below 1.
        boolean failuresAreRare = this.messageParseFailures.getFiveMinuteRate() < 1;
        if (failuresAreRare) {
            log.warn("Unable to decode input message.", exc);
        } else {
            log.warn("Unable to decode input message.");
        }
        return Lists.newArrayList();
    }
}
/**
 * Decodes {@code data} according to {@code schema}. A union schema is resolved
 * first by reading its branch index: the null branch returns {@code null}, any
 * other branch has its wrapped bytes extracted and decoded with the branch schema.
 *
 * @throws IOException a {@code GoraException} is raised when the union header cannot be read
 */
public Object fromBytes(Schema schema, byte[] data) throws IOException {
    Schema targetSchema = schema;

    if (schema.getType() == Type.UNION) {
        try {
            Decoder headerDecoder = DecoderFactory.get().binaryDecoder(data, null);
            int selected = headerDecoder.readIndex();
            targetSchema = schema.getTypes().get(selected);

            if (targetSchema.getType() == Type.NULL) {
                headerDecoder.readNull();
                return null;
            }
            data = headerDecoder.readBytes(null).array();
        } catch (IOException e) {
            LOG.error(e.getMessage());
            throw new GoraException("Error decoding union type: ", e);
        }
    }

    return fromBytes(encoder, targetSchema, data);
}
/**
 * Reads a boolean[] from input: an int element count followed by the flags
 * packed eight per byte, lowest bit first.
 *
 * @param in decoder instance which wraps the input stream where data is read.
 * @return boolean array.
 * @throws IOException when failed reading the data from stream.
 */
public static boolean[] readBoolArray(Decoder in) throws IOException {
    int count = in.readInt();
    boolean[] flags = new boolean[count];

    int packedLength = (int) Math.ceil(count / 8.0);
    byte[] packed = new byte[packedLength];
    in.readFixed(packed);

    int next = 0;
    for (byte packedByte : packed) {
        // The last byte may carry fewer than eight flags.
        for (int bit = 0; bit < 8 && next < count; bit++) {
            flags[next++] = ((packedByte >> bit) & 0x01) != 0;
        }
    }
    return flags;
}
@Override public void decode(PipelinePack pack, Handler<PipelinePack> pipelinePackHandler) { final Message message = pack.getMessage(); final Buffer buffer = message.getPayload(); // TODO: data = pack.getMsgBytes() DatumReader<GenericRecord> reader = new GenericDatumReader<>(schema); GenericRecord genericRecord = new GenericData.Record(schema); try { Decoder decoder = DecoderFactory.get().binaryDecoder(buffer.getBytes(), null); genericRecord = reader.read(genericRecord, decoder); } catch (Exception e) { logger.error("Cannot decode data", e); } pack.setMessage(genericRecord); pipelinePackHandler.handle(pack); }
/**
 * Checks that a record converted from plain (Mongo-style) JSON equals the record
 * decoded from the equivalent Avro JSON, both directly and in JSON form.
 */
@Test
public void testEnums() throws Exception {
    Schema schema = Enums.SCHEMA$;

    // Fixtures are well-formed JSON: the stray trailing '}' was removed from both.
    String avroJson = "{\"enum1\": \"X\", \"enum2\": {\"test.Enum2\": \"A\"}, \"enum3\": {\"null\": null}, \"enum4\": [{\"test.Enum4\": \"SAT\"}, {\"test.Enum4\": \"SUN\"}]}";
    Decoder decoder = DecoderFactory.get().jsonDecoder(schema, avroJson);
    GenericDatumReader<Record> reader = new GenericDatumReader<Record>(schema);
    Record record1 = reader.read(null, decoder);

    String mongoJson = "{\"enum1\": \"X\", \"enum2\": \"A\", \"enum3\": null, \"enum4\": [\"SAT\", \"SUN\"]}";
    BSONObject object = (BSONObject) JSON.parse(mongoJson);
    Record record2 = RecordConverter.toRecord(schema, object, getClass().getClassLoader());

    assertThat(record2).isEqualTo(record1);
    assertThat(AvroHelper.toJson(schema, record2)).isEqualTo(AvroHelper.toJson(schema, record1));
}
/**
 * Checks that a record converted from plain (Mongo-style) JSON equals the record
 * decoded from the equivalent Avro JSON with explicit union branches.
 */
@Test
public void testUnions() throws Exception {
    Schema schema = Unions.SCHEMA$;

    // Expected record: decoded from Avro's own JSON encoding.
    String avroJson = "{\"union1\": {\"int\": 1}, \"union2\": {\"test.Union2\": {\"union21\": {\"long\": 2}}}, \"union3\": {\"array\": [{\"boolean\": true}, {\"boolean\": false}, {\"null\": null}]}, \"union4\": {\"map\": {\"a\": {\"string\": \"A\"}, \"b\": {\"string\": \"B\"}, \"c\": {\"string\": \"C\"}}}, \"union5\": {\"null\": null}, \"union6\": {\"null\": null}}";
    Decoder avroJsonDecoder = DecoderFactory.get().jsonDecoder(schema, avroJson);
    GenericDatumReader<Record> avroReader = new GenericDatumReader<Record>(schema);
    Record expected = avroReader.read(null, avroJsonDecoder);

    // Actual record: converted from the plain JSON document.
    String mongoJson = "{\"union1\": 1, \"union2\": {\"union21\": 2}, \"union3\": [true, false, null], \"union4\": {\"a\": \"A\", \"b\": \"B\", \"c\": \"C\"}, \"union5\": null, \"union6\": null}";
    DBObject document = (DBObject) JSON.parse(mongoJson);
    Record actual = RecordConverter.toRecord(schema, document, getClass().getClassLoader());

    assertThat(actual).isEqualTo(expected);
    assertThat(AvroHelper.toJson(schema, actual)).isEqualTo(AvroHelper.toJson(schema, expected));
}
/**
 * Decodes the stored bytes of one column into its Avro value, using the datum
 * reader registered for that field.
 *
 * @throws SchemaValidationException if the field is not in the schema or has no datum reader
 */
@Override
public Object deserializeColumnValueFromBytes(String fieldName, byte[] bytes) {
    Field columnField = avroSchema.getAvroSchema().getField(fieldName);
    DatumReader<Object> columnReader = fieldDatumReaders.get(fieldName);

    if (columnField == null) {
        throw new SchemaValidationException("Invalid field name " + fieldName
                + " for schema " + avroSchema.toString());
    }
    if (columnReader == null) {
        throw new SchemaValidationException("No datum reader for field name: "
                + fieldName);
    }

    ByteArrayInputStream columnBytes = new ByteArrayInputStream(bytes);
    Decoder columnDecoder = getColumnDecoder(columnField.schema(), columnBytes);
    return AvroUtils.readAvroEntity(columnDecoder, columnReader);
}
/** Decodes three fixed 4-byte inputs with MemcmpDecoder and checks they read as 1, -1 and 0. */
@Test
public void testDecodeInt() throws Exception {
    InputStream oneBytes = new ByteArrayInputStream(
            new byte[] { (byte) 0x80, (byte) 0x00, (byte) 0x00, (byte) 0x01 });
    Decoder oneDecoder = new MemcmpDecoder(oneBytes);
    int decodedOne = oneDecoder.readInt();
    assertEquals(1, decodedOne);

    InputStream minusOneBytes = new ByteArrayInputStream(
            new byte[] { (byte) 0x7f, (byte) 0xff, (byte) 0xff, (byte) 0xff });
    Decoder minusOneDecoder = new MemcmpDecoder(minusOneBytes);
    int decodedMinusOne = minusOneDecoder.readInt();
    assertEquals(-1, decodedMinusOne);

    InputStream zeroBytes = new ByteArrayInputStream(
            new byte[] { (byte) 0x80, (byte) 0x00, (byte) 0x00, (byte) 0x00 });
    Decoder zeroDecoder = new MemcmpDecoder(zeroBytes);
    int decodedZero = zeroDecoder.readInt();
    assertEquals(0, decodedZero);
}
/** Decodes three fixed 8-byte inputs with MemcmpDecoder and checks they read as 1, -1 and 0. */
@Test
public void testDecodeLong() throws Exception {
    InputStream oneBytes = new ByteArrayInputStream(new byte[] {
            (byte) 0x80, (byte) 0x00, (byte) 0x00, (byte) 0x00,
            (byte) 0x00, (byte) 0x00, (byte) 0x00, (byte) 0x01 });
    Decoder oneDecoder = new MemcmpDecoder(oneBytes);
    long decodedOne = oneDecoder.readLong();
    assertEquals(1L, decodedOne);

    InputStream minusOneBytes = new ByteArrayInputStream(new byte[] {
            (byte) 0x7f, (byte) 0xff, (byte) 0xff, (byte) 0xff,
            (byte) 0xff, (byte) 0xff, (byte) 0xff, (byte) 0xff });
    Decoder minusOneDecoder = new MemcmpDecoder(minusOneBytes);
    long decodedMinusOne = minusOneDecoder.readLong();
    assertEquals(-1L, decodedMinusOne);

    InputStream zeroBytes = new ByteArrayInputStream(new byte[] {
            (byte) 0x80, (byte) 0x00, (byte) 0x00, (byte) 0x00,
            (byte) 0x00, (byte) 0x00, (byte) 0x00, (byte) 0x00 });
    Decoder zeroDecoder = new MemcmpDecoder(zeroBytes);
    long decodedZero = zeroDecoder.readLong();
    assertEquals(0L, decodedZero);
}
/**
 * Writes a float, a double and a string with MemcmpEncoder and checks that
 * MemcmpDecoder reads each value back from the encoded bytes.
 */
@Test
public void testReadEncoderOutput() throws Exception {
    // float round trip
    ByteArrayOutputStream floatOut = new ByteArrayOutputStream();
    Encoder floatEncoder = new MemcmpEncoder(floatOut);
    floatEncoder.writeFloat(1.1f);
    InputStream floatIn = new ByteArrayInputStream(floatOut.toByteArray());
    Decoder floatDecoder = new MemcmpDecoder(floatIn);
    float readFloat = floatDecoder.readFloat();
    assertEquals(1.1f, readFloat, 0.0001);

    // double round trip
    ByteArrayOutputStream doubleOut = new ByteArrayOutputStream();
    Encoder doubleEncoder = new MemcmpEncoder(doubleOut);
    doubleEncoder.writeDouble(1.1d);
    InputStream doubleIn = new ByteArrayInputStream(doubleOut.toByteArray());
    Decoder doubleDecoder = new MemcmpDecoder(doubleIn);
    double readDouble = doubleDecoder.readDouble();
    assertEquals(1.1d, readDouble, 0.0001);

    // string round trip
    ByteArrayOutputStream stringOut = new ByteArrayOutputStream();
    Encoder stringEncoder = new MemcmpEncoder(stringOut);
    stringEncoder.writeString("hello there");
    InputStream stringIn = new ByteArrayInputStream(stringOut.toByteArray());
    Decoder stringDecoder = new MemcmpDecoder(stringIn);
    Utf8 readString = stringDecoder.readString(null);
    assertEquals("hello there", readString.toString());
}
/** Decodes three fixed 4-byte inputs with ColumnDecoder and checks they read as 1, -1 and 0. */
@Test
public void testDecodeInt() throws Exception {
    InputStream oneBytes = new ByteArrayInputStream(
            new byte[] { (byte) 0x00, (byte) 0x00, (byte) 0x00, (byte) 0x01 });
    Decoder oneDecoder = new ColumnDecoder(oneBytes);
    int decodedOne = oneDecoder.readInt();
    assertEquals(1, decodedOne);

    InputStream minusOneBytes = new ByteArrayInputStream(
            new byte[] { (byte) 0xff, (byte) 0xff, (byte) 0xff, (byte) 0xff });
    Decoder minusOneDecoder = new ColumnDecoder(minusOneBytes);
    int decodedMinusOne = minusOneDecoder.readInt();
    assertEquals(-1, decodedMinusOne);

    InputStream zeroBytes = new ByteArrayInputStream(
            new byte[] { (byte) 0x00, (byte) 0x00, (byte) 0x00, (byte) 0x00 });
    Decoder zeroDecoder = new ColumnDecoder(zeroBytes);
    int decodedZero = zeroDecoder.readInt();
    assertEquals(0, decodedZero);
}
/** Decodes three fixed 8-byte inputs with ColumnDecoder and checks they read as 1, -1 and 0. */
@Test
public void testDecodeLong() throws Exception {
    InputStream oneBytes = new ByteArrayInputStream(new byte[] {
            (byte) 0x00, (byte) 0x00, (byte) 0x00, (byte) 0x00,
            (byte) 0x00, (byte) 0x00, (byte) 0x00, (byte) 0x01 });
    Decoder oneDecoder = new ColumnDecoder(oneBytes);
    long decodedOne = oneDecoder.readLong();
    assertEquals(1L, decodedOne);

    InputStream minusOneBytes = new ByteArrayInputStream(new byte[] {
            (byte) 0xff, (byte) 0xff, (byte) 0xff, (byte) 0xff,
            (byte) 0xff, (byte) 0xff, (byte) 0xff, (byte) 0xff });
    Decoder minusOneDecoder = new ColumnDecoder(minusOneBytes);
    long decodedMinusOne = minusOneDecoder.readLong();
    assertEquals(-1L, decodedMinusOne);

    InputStream zeroBytes = new ByteArrayInputStream(new byte[] {
            (byte) 0x00, (byte) 0x00, (byte) 0x00, (byte) 0x00,
            (byte) 0x00, (byte) 0x00, (byte) 0x00, (byte) 0x00 });
    Decoder zeroDecoder = new ColumnDecoder(zeroBytes);
    long decodedZero = zeroDecoder.readLong();
    assertEquals(0L, decodedZero);
}
@Override protected boolean doProcess(Record inputRecord, InputStream in) throws IOException { Record template = inputRecord.copy(); removeAttachments(template); template.put(Fields.ATTACHMENT_MIME_TYPE, ReadAvroBuilder.AVRO_MEMORY_MIME_TYPE); Decoder decoder = prepare(in); try { while (true) { GenericContainer datum = datumReader.read(null, decoder); if (!extract(datum, template)) { return false; } } } catch (EOFException e) { ; // ignore } finally { in.close(); } return true; }
private Decoder prepare(InputStream in) throws IOException { Decoder decoder; if (isJson) { if (jsonDecoder == null) { jsonDecoder = DecoderFactory.get().jsonDecoder(writerSchema, in); } else { jsonDecoder.configure(in); // reuse for performance } decoder = jsonDecoder; } else { binaryDecoder = DecoderFactory.get().binaryDecoder(in, binaryDecoder); // reuse for performance decoder = binaryDecoder; } if (datumReader == null) { // reuse for performance Schema readSchema = readerSchema != null ? readerSchema : writerSchema; datumReader = new FastGenericDatumReader<GenericContainer>(writerSchema, readSchema); datumReader.setResolver(createResolver(writerSchema, readSchema)); } return decoder; }
/**
 * Performs a deep copy of an avro node message by serializing it to Avro binary
 * and reading it back. This does not reuse existing decoders/encoders, so if
 * you're going to do a lot of copying, use a NodeCopier.
 *
 * @param node The node to copy.
 * @return A copy of the node.
 * @throws RuntimeException if the node cannot be written or read back; the
 *         underlying {@link IOException} is attached as the cause
 */
public static Node copy(Node node) {
    try {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        Encoder encoder = EncoderFactory.get().directBinaryEncoder(out, null);
        SpecificDatumWriter<Node> writer = new SpecificDatumWriter<Node>(Node.SCHEMA$);
        writer.write(node, encoder);

        SpecificDatumReader<Node> reader = new SpecificDatumReader<Node>(Node.SCHEMA$);
        Decoder decoder = DecoderFactory.get().binaryDecoder(
                new ByteArrayInputStream(out.toByteArray()), null);
        return reader.read(null, decoder);
    } catch (IOException e) {
        // Keep the original failure as the cause instead of discarding it.
        throw new RuntimeException("Unable to copy node.", e);
    }
}
/** An explicit JSON null for {@code str} is read as a null field. */
@Test
public void encodeNullValueExplicit() throws Exception {
    String input = "{ \"id\": 1, \"str\": null }";
    Decoder jsonDecoder = createDecoder(input);

    Record decoded = (GenericData.Record) datumReader.read(null, jsonDecoder);
    String rendered = decoded.toString();

    assertThat(rendered, is("{\"id\": 1, \"str\": null}"));
}
/** A value wrapped in an explicit {@code string} branch is read as that string. */
@Test
public void encodeValue1AvroCompatible() throws Exception {
    String input = "{ \"id\": 1, \"str\": { \"string\": \"hello\"} }";
    Decoder jsonDecoder = createDecoder(input);

    Record decoded = (GenericData.Record) datumReader.read(null, jsonDecoder);
    String rendered = decoded.toString();

    assertThat(rendered, is("{\"id\": 1, \"str\": \"hello\"}"));
}
/** A value wrapped in an explicit {@code long} branch is read as that number. */
@Test
public void encodeValue2AvroCompatible() throws Exception {
    String input = "{ \"id\": 1, \"str\": { \"long\": 2} }";
    Decoder jsonDecoder = createDecoder(input);

    Record decoded = (GenericData.Record) datumReader.read(null, jsonDecoder);
    String rendered = decoded.toString();

    assertThat(rendered, is("{\"id\": 1, \"str\": 2}"));
}
/** A value wrapped in an explicit {@code inner} record branch is read as that record. */
@Test
public void encodeValue3AvroCompatible() throws Exception {
    String input = "{ \"id\": 1, \"str\": { \"inner\": { \"a\": 3, \"b\": 4 }} }";
    Decoder jsonDecoder = createDecoder(input);

    Record decoded = (GenericData.Record) datumReader.read(null, jsonDecoder);
    String rendered = decoded.toString();

    assertThat(rendered, is("{\"id\": 1, \"str\": {\"a\": 3, \"b\": 4}}"));
}
/** A value wrapped in an explicit {@code string} branch is read as a {@code Utf8} string. */
@Test
public void encodeValue1AvroCompatible() throws Exception {
    String input = "{ \"id\": 1, \"str\": { \"string\": \"hello\"} }";
    Decoder jsonDecoder = createDecoder(input);

    Record decoded = (GenericData.Record) datumReader.read(null, jsonDecoder);

    boolean isUtf8 = decoded.get("str") instanceof Utf8;
    assertThat(isUtf8, is(true));
    assertThat(decoded.toString(), is("{\"id\": 1, \"str\": \"hello\"}"));
}
/** A value wrapped in an explicit {@code bytes} branch is read as a {@code ByteBuffer}. */
@Test
public void encodeValue2AvroCompatible() throws Exception {
    String input = "{ \"id\": 1, \"str\": { \"bytes\": \"AAEC\"} }";
    Decoder jsonDecoder = createDecoder(input);

    Record decoded = (GenericData.Record) datumReader.read(null, jsonDecoder);

    boolean isByteBuffer = decoded.get("str") instanceof ByteBuffer;
    assertThat(isByteBuffer, is(true));
    assertThat(decoded.toString(), is("{\"id\": 1, \"str\": {\"bytes\": \"AAEC\"}}"));
}