static void convertAvroToJson(InputStream inputStream, OutputStream outputStream, Schema schema) throws IOException { DatumReader<Object> reader = new GenericDatumReader<>(schema); DatumWriter<Object> writer = new GenericDatumWriter<>(schema); BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(inputStream, null); JsonEncoder jsonEncoder = EncoderFactory.get().jsonEncoder(schema, outputStream, true); Object datum = null; while (!binaryDecoder.isEnd()) { datum = reader.read(datum, binaryDecoder); writer.write(datum, jsonEncoder); jsonEncoder.flush(); } outputStream.flush(); }
/** * Encode the message data provided. * * @param <O> The type of the data to encode. * @param data The message data. * @throws EncodeMessageContentException Exception thrown if encoding the message content fails. */ public <O extends GenericContainer> void encode(O data) throws EncodeMessageContentException { try { ByteArrayOutputStream baos = new ByteArrayOutputStream(); this.schema = data.getSchema(); DatumWriter<O> outputDatumWriter = new SpecificDatumWriter<O>(this.schema); BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(baos, null); outputDatumWriter.write(data, encoder); encoder.flush(); this.data = baos.toByteArray(); } catch (Exception ex) { throw new EncodeMessageContentException(ex); } }
public AvroFileInputStream(FileStatus status) throws IOException { pos = 0; buffer = new byte[0]; GenericDatumReader<Object> reader = new GenericDatumReader<Object>(); FileContext fc = FileContext.getFileContext(new Configuration()); fileReader = DataFileReader.openReader(new AvroFSInput(fc, status.getPath()),reader); Schema schema = fileReader.getSchema(); writer = new GenericDatumWriter<Object>(schema); output = new ByteArrayOutputStream(); JsonGenerator generator = new JsonFactory().createJsonGenerator(output, JsonEncoding.UTF8); MinimalPrettyPrinter prettyPrinter = new MinimalPrettyPrinter(); prettyPrinter.setRootValueSeparator(System.getProperty("line.separator")); generator.setPrettyPrinter(prettyPrinter); encoder = EncoderFactory.get().jsonEncoder(schema, generator); }
public void jsonReadWriteExample() throws IOException { Employee employee = Employee.newBuilder().setFirstName("Gaurav") .setLastName("Mazra").setSex(SEX.MALE).build(); DatumWriter<Employee> employeeWriter = new SpecificDatumWriter<>(Employee.class); byte[] data; try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) { Encoder jsonEncoder = EncoderFactory.get().jsonEncoder(Employee.getClassSchema(), baos); employeeWriter.write(employee, jsonEncoder); jsonEncoder.flush(); data = baos.toByteArray(); } // serialized data System.out.println(new String(data)); DatumReader<Employee> employeeReader = new SpecificDatumReader<>(Employee.class); Decoder decoder = DecoderFactory.get().jsonDecoder(Employee.getClassSchema(), new String(data)); employee = employeeReader.read(null, decoder); //data after deserialization System.out.println(employee); }
/** * Process singlex. * * @throws Exception the exception */ public void processSinglex() throws Exception { int base = (int) System.currentTimeMillis(); User user = User.newBuilder().setName("name" + base).setFavoriteColor("color" + base).setFavoriteNumber(base) .build(); DatumWriter<GenericRecord> datumWriterUser = new GenericDatumWriter<GenericRecord>(User.getClassSchema()); ByteArrayOutputStream baos = new ByteArrayOutputStream(); byte[] byteData = null; try { BinaryEncoder binaryEncoder = EncoderFactory.get().binaryEncoder(baos, null); datumWriterUser.write(user, binaryEncoder); binaryEncoder.flush(); byteData = baos.toByteArray(); } finally { baos.close(); } System.out.println(byteData.length); DatumReader<GenericRecord> datumReaderUser = new GenericDatumReader<GenericRecord>( User.getClassSchema()); GenericRecord genericRecord = datumReaderUser.read(null, DecoderFactory.get().binaryDecoder(byteData, null) ); System.out.println(genericRecord); System.out.println( genericRecord.get("name")); }
public static void testReflect(Object value, Type type, String schema) throws Exception { // check that schema matches expected Schema s = ReflectData.get().getSchema(type); assertEquals(Schema.parse(schema), s); // check that value is serialized correctly ReflectDatumWriter<Object> writer = new ReflectDatumWriter<Object>(s); ByteArrayOutputStream out = new ByteArrayOutputStream(); writer.write(value, EncoderFactory.get().directBinaryEncoder(out, null)); ReflectDatumReader<Object> reader = new ReflectDatumReader<Object>(s); Object after = reader.read(null, DecoderFactory.get().binaryDecoder(out.toByteArray(), null)); assertEquals(value, after); }
@Override public byte[] serialize(String topic, T payload) { try { byte[] result = null; if (payload != null) { LOGGER.debug("data='{}'", payload); ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); BinaryEncoder binaryEncoder = EncoderFactory.get().binaryEncoder(byteArrayOutputStream, null); DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(payload.getSchema()); datumWriter.write(payload, binaryEncoder); binaryEncoder.flush(); byteArrayOutputStream.close(); result = byteArrayOutputStream.toByteArray(); LOGGER.debug("serialized data='{}'", DatatypeConverter.printHexBinary(result)); } return result; } catch (IOException ex) { throw new SerializationException("Can't serialize payload='" + payload + "' for topic='" + topic + "'", ex); } }
private void initialize() throws IOException, NoSuchAlgorithmException { SeekableResettableInputBridge in = new SeekableResettableInputBridge(ris); long pos = in.tell(); in.seek(0L); fileReader = new DataFileReader<GenericRecord>(in, new GenericDatumReader<GenericRecord>()); fileReader.sync(pos); schema = fileReader.getSchema(); datumWriter = new GenericDatumWriter(schema); out = new ByteArrayOutputStream(); encoder = EncoderFactory.get().binaryEncoder(out, encoder); schemaHash = SchemaNormalization.parsingFingerprint("CRC-64-AVRO", schema); schemaHashString = Hex.encodeHexString(schemaHash); }
private byte[] serializeEvent(Event event, boolean useAvroEventFormat) throws IOException { byte[] bytes; if (useAvroEventFormat) { if (!tempOutStream.isPresent()) { tempOutStream = Optional.of(new ByteArrayOutputStream()); } if (!writer.isPresent()) { writer = Optional.of(new SpecificDatumWriter<AvroFlumeEvent>(AvroFlumeEvent.class)); } tempOutStream.get().reset(); AvroFlumeEvent e = new AvroFlumeEvent(toCharSeqMap(event.getHeaders()), ByteBuffer.wrap(event.getBody())); encoder = EncoderFactory.get().directBinaryEncoder(tempOutStream.get(), encoder); writer.get().write(e, encoder); encoder.flush(); bytes = tempOutStream.get().toByteArray(); } else { bytes = event.getBody(); } return bytes; }
private byte[] serializeValue(Event event, boolean parseAsFlumeEvent) throws IOException { byte[] bytes; if (parseAsFlumeEvent) { if (!tempOutStream.isPresent()) { tempOutStream = Optional.of(new ByteArrayOutputStream()); } if (!writer.isPresent()) { writer = Optional.of(new SpecificDatumWriter<AvroFlumeEvent>(AvroFlumeEvent.class)); } tempOutStream.get().reset(); AvroFlumeEvent e = new AvroFlumeEvent( toCharSeqMap(event.getHeaders()), ByteBuffer.wrap(event.getBody())); encoder = EncoderFactory.get() .directBinaryEncoder(tempOutStream.get(), encoder); writer.get().write(e, encoder); encoder.flush(); bytes = tempOutStream.get().toByteArray(); } else { bytes = event.getBody(); } return bytes; }
private byte[] serialize(Object datum, Schema datumSchema) throws FlumeException { if (schema == null || !datumSchema.equals(schema)) { schema = datumSchema; out = new ByteArrayOutputStream(); writer = new ReflectDatumWriter<Object>(schema); encoder = EncoderFactory.get().binaryEncoder(out, null); } out.reset(); try { writer.write(datum, encoder); encoder.flush(); return out.toByteArray(); } catch (IOException e) { throw new FlumeException(e); } }
public static void main(String[] args) throws Exception { Employee employee = Employee.newBuilder().setFirstName("Gaurav") .setLastName("Mazra").setSex(SEX.MALE).build(); byte[] payload; DatumWriter<Employee> datumWriter = new SpecificDatumWriter<>(Employee.class); try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) { Encoder out = EncoderFactory.get().binaryEncoder(baos, null); datumWriter.write(employee, out ); out.flush(); payload = baos.toByteArray(); } catch (Exception e) { System.err.println(e); throw e; } System.out.println(new String(payload)); System.out.println(payload.length); }
public static void main(String[] args) throws Exception { Node node = new Node(); node.setValue("Gaurav"); node.setNext(node); byte[] payload; DatumWriter<Node> datumWriter = new ReflectDatumWriter<>(Node.class); try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) { Encoder out = EncoderFactory.get().binaryEncoder(baos, null); datumWriter.write(node, out ); out.flush(); payload = baos.toByteArray(); } catch (Exception e) { System.err.println(e); throw e; } System.out.println(new String(payload)); System.out.println(payload.length); }
public static void main(String[] args) throws Exception { Node node = new Node(); node.setValue("Gaurav"); node.setNext(node); byte[] payload; DatumWriter<Node> datumWriter = new SpecificDatumWriter<>(Node.class); try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) { Encoder out = EncoderFactory.get().binaryEncoder(baos, null); datumWriter.write(node, out ); out.flush(); payload = baos.toByteArray(); } catch (Exception e) { System.err.println(e); throw e; } System.out.println(new String(payload)); System.out.println(payload.length); }
public void binaryReadWriteExample() throws IOException { Employee employee = Employee.newBuilder().setFirstName("Gaurav") .setLastName("Mazra").setSex(SEX.MALE).build(); DatumWriter<Employee> employeeWriter = new SpecificDatumWriter<>(Employee.class); byte[] data; try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) { Encoder binaryEncoder = EncoderFactory.get().binaryEncoder(baos, null); employeeWriter.write(employee, binaryEncoder); binaryEncoder.flush(); data = baos.toByteArray(); } // serialized data System.out.println(data); DatumReader<Employee> employeeReader = new SpecificDatumReader<>(Employee.class); Decoder binaryDecoder = DecoderFactory.get().binaryDecoder(data, null); employee = employeeReader.read(null, binaryDecoder); //data after deserialization System.out.println(employee); }
/** * tests Avro Serializer */ @Test public void testSerializer() throws Exception { Context context = new Context(); String schemaFile = getClass().getResource("/schema.avsc").getFile(); context.put(ES_AVRO_SCHEMA_FILE, schemaFile); avroSerializer.configure(context); Schema schema = new Schema.Parser().parse(new File(schemaFile)); GenericRecord user = generateGenericRecord(schema); DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(schema); ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); Encoder encoder = new EncoderFactory().binaryEncoder(outputStream, null); datumWriter.write(user, encoder); encoder.flush(); Event event = EventBuilder.withBody(outputStream.toByteArray()); XContentBuilder expected = generateContentBuilder(); XContentBuilder actual = avroSerializer.serialize(event); JsonParser parser = new JsonParser(); assertEquals(parser.parse(expected.string()), parser.parse(actual.string())); }
@Override public byte[] serialize(String s, WrapperAppMessage wrapperAppMessage) { DatumWriter<WrapperAppMessage> datumWriter = new SpecificDatumWriter<>(wrapperAppMessage.getSchema()); ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); BinaryEncoder binaryEncoder = EncoderFactory.get().binaryEncoder(outputStream, null); try { datumWriter.write(wrapperAppMessage, binaryEncoder); binaryEncoder.flush();//带缓冲区的binaryEncoder和直接directBinaryEncoder不一样,需要flush一下,否则字节数组没有数据 outputStream.flush(); outputStream.close(); } catch (IOException e) { e.printStackTrace(); } byte[] data = outputStream.toByteArray(); return data; }
@VisibleForTesting public EventWriter(FSDataOutputStream out, WriteMode mode) throws IOException { this.out = out; this.writeMode = mode; if (this.writeMode==WriteMode.JSON) { this.jsonOutput = true; out.writeBytes(VERSION); } else if (this.writeMode==WriteMode.BINARY) { this.jsonOutput = false; out.writeBytes(VERSION_BINARY); } else { throw new IOException("Unknown mode: " + mode); } out.writeBytes("\n"); out.writeBytes(Event.SCHEMA$.toString()); out.writeBytes("\n"); if (!this.jsonOutput) { this.encoder = EncoderFactory.get().binaryEncoder(out, null); } else { this.encoder = EncoderFactory.get().jsonEncoder(Event.SCHEMA$, out); } }
/** * Serialize the record to prepare for publishing. * * @param record the GenericRecord * @param schema the Avro Schema * @param ggAvroSchema the internal representation of the Avro schema * @return the serialized record * @throws IOException if there is a problem */ private byte[] serializeRecord(GenericRecord record, Schema schema, @SuppressWarnings("unused") AvroSchema ggAvroSchema) throws IOException { byte[] rval; BinaryEncoder encoder = null; // serialize the record into a byte array ByteArrayOutputStream out = new ByteArrayOutputStream(); DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(schema); encoder = EncoderFactory.get().directBinaryEncoder(out, encoder); writer.write(record, encoder); encoder.flush(); rval = out.toByteArray(); //out.close(); // noop in the Apache version, so not bothering return rval; }
public void send(COL_RDBMS event) throws Exception { EncoderFactory avroEncoderFactory = EncoderFactory.get(); SpecificDatumWriter<COL_RDBMS> avroEventWriter = new SpecificDatumWriter<COL_RDBMS>(COL_RDBMS.SCHEMA$); ByteArrayOutputStream stream = new ByteArrayOutputStream(); BinaryEncoder binaryEncoder = avroEncoderFactory.binaryEncoder(stream,null); try { avroEventWriter.write(event, binaryEncoder); binaryEncoder.flush(); } catch (IOException e) { e.printStackTrace(); throw e; } IOUtils.closeQuietly(stream); KeyedMessage<String, byte[]> data = new KeyedMessage<String, byte[]>( TOPIC, stream.toByteArray()); producer.send(data); }
public void send(COL_ONEM2M event) throws Exception { EncoderFactory avroEncoderFactory = EncoderFactory.get(); SpecificDatumWriter<COL_ONEM2M> avroEventWriter = new SpecificDatumWriter<COL_ONEM2M>(COL_ONEM2M.SCHEMA$); ByteArrayOutputStream stream = new ByteArrayOutputStream(); BinaryEncoder binaryEncoder = avroEncoderFactory.binaryEncoder(stream,null); try { avroEventWriter.write(event, binaryEncoder); binaryEncoder.flush(); } catch (IOException e) { e.printStackTrace(); throw e; } IOUtils.closeQuietly(stream); KeyedMessage<String, byte[]> data = new KeyedMessage<String, byte[]>( TOPIC, stream.toByteArray()); producer.send(data); }
static void convertJsonToAvro(InputStream inputStream, OutputStream outputStream, Schema schema) throws IOException { DatumReader<Object> reader = new GenericDatumReader<>(schema); DatumWriter<Object> writer = new GenericDatumWriter<>(schema); Encoder binaryEncoder = EncoderFactory.get().binaryEncoder(outputStream, null); JsonDecoder jsonDecoder = DecoderFactory.get().jsonDecoder(schema, inputStream); Object datum = null; while (true) { try { datum = reader.read(datum, jsonDecoder); } catch (EOFException eofException) { break; } writer.write(datum, binaryEncoder); binaryEncoder.flush(); } outputStream.flush(); }
public byte[] toBytes(Schema toSchema, Object o) { if (toSchema != null && toSchema.getType() == Type.UNION) { ByteArrayOutputStream baos = new ByteArrayOutputStream(); org.apache.avro.io.BinaryEncoder avroEncoder = EncoderFactory.get().binaryEncoder(baos, null); int unionIndex = 0; try { if (o == null) { unionIndex = firstNullSchemaTypeIndex(toSchema); avroEncoder.writeIndex(unionIndex); avroEncoder.writeNull(); } else { unionIndex = firstNotNullSchemaTypeIndex(toSchema); avroEncoder.writeIndex(unionIndex); avroEncoder.writeBytes(toBytes(o)); } avroEncoder.flush(); return baos.toByteArray(); } catch (IOException e) { LOG.error(e.getMessage()); return toBytes(o); } } else { return toBytes(o); } }
/** * @see ClientConnection#offerMessage(ServerToClient) */ public boolean offerMessage(ServerToClient message) { Session session = getSession(); if (session == null || !session.isOpen()) return false; if (inFlightMessages.incrementAndGet() > MAX_IN_FLIGHT_MESSAGES) { inFlightMessages.decrementAndGet(); return false; } ByteArrayOutputStream stream = new ByteArrayOutputStream(); try { BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(stream, null); writeToClient.write(message, encoder); encoder.flush(); } catch (Exception e) { log.warn("Failed to encode message to client", e); session.close(StatusCode.SERVER_ERROR, "Internal server error"); return true; } session.getRemote().sendBytes(ByteBuffer.wrap(stream.toByteArray()), new WriteCallback() { @Override public void writeSuccess() { inFlightMessages.decrementAndGet(); } @Override public void writeFailed(Throwable error) { inFlightMessages.decrementAndGet(); log.info("Sending message to WebSocket client failed: ", error); } }); return true; }
/** * To byte array. * * @param iote2eRequest the iote 2 e request * @return the byte[] * @throws Exception the exception */ public byte[] toByteArray( Iote2eRequest iote2eRequest ) throws Exception { ByteArrayOutputStream baos = new ByteArrayOutputStream(); try { if( this.datumWriterIote2eRequest == null ) { this.datumWriterIote2eRequest = new SpecificDatumWriter<Iote2eRequest>(Iote2eRequest.getClassSchema()); } this.binaryEncoder = EncoderFactory.get().binaryEncoder(baos, this.binaryEncoder ); this.datumWriterIote2eRequest.write(iote2eRequest, this.binaryEncoder ); this.binaryEncoder.flush(); this.bytes = baos.toByteArray(); return this.bytes; } catch (Exception e) { logger.error(e.getMessage(),e); throw e; } finally { baos.close(); } }
/** * To byte array. * * @param iote2eResult the iote 2 e result * @return the byte[] * @throws Exception the exception */ public byte[] toByteArray( Iote2eResult iote2eResult ) throws Exception { ByteArrayOutputStream baos = new ByteArrayOutputStream(); try { if( this.datumWriterIote2eResult == null ) { this.datumWriterIote2eResult = new SpecificDatumWriter<Iote2eResult>(Iote2eResult.getClassSchema()); } this.binaryEncoder = EncoderFactory.get().binaryEncoder(baos, this.binaryEncoder ); this.datumWriterIote2eResult.write(iote2eResult, this.binaryEncoder ); this.binaryEncoder.flush(); this.bytes = baos.toByteArray(); return this.bytes; } catch (Exception e) { logger.error(e.getMessage(),e); throw e; } finally { baos.close(); } }