static void convertAvroToJson(InputStream inputStream, OutputStream outputStream, Schema schema) throws IOException { DatumReader<Object> reader = new GenericDatumReader<>(schema); DatumWriter<Object> writer = new GenericDatumWriter<>(schema); BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(inputStream, null); JsonEncoder jsonEncoder = EncoderFactory.get().jsonEncoder(schema, outputStream, true); Object datum = null; while (!binaryDecoder.isEnd()) { datum = reader.read(datum, binaryDecoder); writer.write(datum, jsonEncoder); jsonEncoder.flush(); } outputStream.flush(); }
/** * Encode the message data provided. * * @param <O> The type of the data to encode. * @param data The message data. * @throws EncodeMessageContentException Exception thrown if encoding the message content fails. */ public <O extends GenericContainer> void encode(O data) throws EncodeMessageContentException { try { ByteArrayOutputStream baos = new ByteArrayOutputStream(); this.schema = data.getSchema(); DatumWriter<O> outputDatumWriter = new SpecificDatumWriter<O>(this.schema); BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(baos, null); outputDatumWriter.write(data, encoder); encoder.flush(); this.data = baos.toByteArray(); } catch (Exception ex) { throw new EncodeMessageContentException(ex); } }
public void jsonReadWriteExample() throws IOException { Employee employee = Employee.newBuilder().setFirstName("Gaurav") .setLastName("Mazra").setSex(SEX.MALE).build(); DatumWriter<Employee> employeeWriter = new SpecificDatumWriter<>(Employee.class); byte[] data; try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) { Encoder jsonEncoder = EncoderFactory.get().jsonEncoder(Employee.getClassSchema(), baos); employeeWriter.write(employee, jsonEncoder); jsonEncoder.flush(); data = baos.toByteArray(); } // serialized data System.out.println(new String(data)); DatumReader<Employee> employeeReader = new SpecificDatumReader<>(Employee.class); Decoder decoder = DecoderFactory.get().jsonDecoder(Employee.getClassSchema(), new String(data)); employee = employeeReader.read(null, decoder); //data after deserialization System.out.println(employee); }
/** * Process singlex. * * @throws Exception the exception */ public void processSinglex() throws Exception { int base = (int) System.currentTimeMillis(); User user = User.newBuilder().setName("name" + base).setFavoriteColor("color" + base).setFavoriteNumber(base) .build(); DatumWriter<GenericRecord> datumWriterUser = new GenericDatumWriter<GenericRecord>(User.getClassSchema()); ByteArrayOutputStream baos = new ByteArrayOutputStream(); byte[] byteData = null; try { BinaryEncoder binaryEncoder = EncoderFactory.get().binaryEncoder(baos, null); datumWriterUser.write(user, binaryEncoder); binaryEncoder.flush(); byteData = baos.toByteArray(); } finally { baos.close(); } System.out.println(byteData.length); DatumReader<GenericRecord> datumReaderUser = new GenericDatumReader<GenericRecord>( User.getClassSchema()); GenericRecord genericRecord = datumReaderUser.read(null, DecoderFactory.get().binaryDecoder(byteData, null) ); System.out.println(genericRecord); System.out.println( genericRecord.get("name")); }
@Override public byte[] serialize(String topic, T payload) { try { byte[] result = null; if (payload != null) { LOGGER.debug("data='{}'", payload); ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); BinaryEncoder binaryEncoder = EncoderFactory.get().binaryEncoder(byteArrayOutputStream, null); DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(payload.getSchema()); datumWriter.write(payload, binaryEncoder); binaryEncoder.flush(); byteArrayOutputStream.close(); result = byteArrayOutputStream.toByteArray(); LOGGER.debug("serialized data='{}'", DatatypeConverter.printHexBinary(result)); } return result; } catch (IOException ex) { throw new SerializationException("Can't serialize payload='" + payload + "' for topic='" + topic + "'", ex); } }
private static Path createDataFile() throws IOException { File avroFile = File.createTempFile("test-", "." + FILE_EXTENSION); DatumWriter<GenericRecord> writer = new GenericDatumWriter<>(schema); try (DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(writer)) { dataFileWriter.setFlushOnEveryBlock(true); dataFileWriter.setSyncInterval(32); dataFileWriter.create(schema, avroFile); IntStream.range(0, NUM_RECORDS).forEach(index -> { GenericRecord datum = new GenericData.Record(schema); datum.put(FIELD_INDEX, index); datum.put(FIELD_NAME, String.format("%d_name_%s", index, UUID.randomUUID())); datum.put(FIELD_SURNAME, String.format("%d_surname_%s", index, UUID.randomUUID())); try { OFFSETS_BY_INDEX.put(index, dataFileWriter.sync() - 16L); dataFileWriter.append(datum); } catch (IOException ioe) { throw new RuntimeException(ioe); } }); } Path path = new Path(new Path(fsUri), avroFile.getName()); fs.moveFromLocalFile(new Path(avroFile.getAbsolutePath()), path); return path; }
public static void main(String[] args) throws Exception { Employee employee = Employee.newBuilder().setFirstName("Gaurav") .setLastName("Mazra").setSex(SEX.MALE).build(); byte[] payload; DatumWriter<Employee> datumWriter = new SpecificDatumWriter<>(Employee.class); try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) { Encoder out = EncoderFactory.get().binaryEncoder(baos, null); datumWriter.write(employee, out ); out.flush(); payload = baos.toByteArray(); } catch (Exception e) { System.err.println(e); throw e; } System.out.println(new String(payload)); System.out.println(payload.length); }
public static void main(String[] args) throws Exception { Node node = new Node(); node.setValue("Gaurav"); node.setNext(node); byte[] payload; DatumWriter<Node> datumWriter = new ReflectDatumWriter<>(Node.class); try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) { Encoder out = EncoderFactory.get().binaryEncoder(baos, null); datumWriter.write(node, out ); out.flush(); payload = baos.toByteArray(); } catch (Exception e) { System.err.println(e); throw e; } System.out.println(new String(payload)); System.out.println(payload.length); }
public static void main(String[] args) throws Exception { Node node = new Node(); node.setValue("Gaurav"); node.setNext(node); byte[] payload; DatumWriter<Node> datumWriter = new SpecificDatumWriter<>(Node.class); try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) { Encoder out = EncoderFactory.get().binaryEncoder(baos, null); datumWriter.write(node, out ); out.flush(); payload = baos.toByteArray(); } catch (Exception e) { System.err.println(e); throw e; } System.out.println(new String(payload)); System.out.println(payload.length); }
public void binaryReadWriteExample() throws IOException { Employee employee = Employee.newBuilder().setFirstName("Gaurav") .setLastName("Mazra").setSex(SEX.MALE).build(); DatumWriter<Employee> employeeWriter = new SpecificDatumWriter<>(Employee.class); byte[] data; try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) { Encoder binaryEncoder = EncoderFactory.get().binaryEncoder(baos, null); employeeWriter.write(employee, binaryEncoder); binaryEncoder.flush(); data = baos.toByteArray(); } // serialized data System.out.println(data); DatumReader<Employee> employeeReader = new SpecificDatumReader<>(Employee.class); Decoder binaryDecoder = DecoderFactory.get().binaryDecoder(data, null); employee = employeeReader.read(null, binaryDecoder); //data after deserialization System.out.println(employee); }
/** * tests Avro Serializer */ @Test public void testSerializer() throws Exception { Context context = new Context(); String schemaFile = getClass().getResource("/schema.avsc").getFile(); context.put(ES_AVRO_SCHEMA_FILE, schemaFile); avroSerializer.configure(context); Schema schema = new Schema.Parser().parse(new File(schemaFile)); GenericRecord user = generateGenericRecord(schema); DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(schema); ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); Encoder encoder = new EncoderFactory().binaryEncoder(outputStream, null); datumWriter.write(user, encoder); encoder.flush(); Event event = EventBuilder.withBody(outputStream.toByteArray()); XContentBuilder expected = generateContentBuilder(); XContentBuilder actual = avroSerializer.serialize(event); JsonParser parser = new JsonParser(); assertEquals(parser.parse(expected.string()), parser.parse(actual.string())); }
@Override public byte[] serialize(String s, WrapperAppMessage wrapperAppMessage) { DatumWriter<WrapperAppMessage> datumWriter = new SpecificDatumWriter<>(wrapperAppMessage.getSchema()); ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); BinaryEncoder binaryEncoder = EncoderFactory.get().binaryEncoder(outputStream, null); try { datumWriter.write(wrapperAppMessage, binaryEncoder); binaryEncoder.flush();//带缓冲区的binaryEncoder和直接directBinaryEncoder不一样,需要flush一下,否则字节数组没有数据 outputStream.flush(); outputStream.close(); } catch (IOException e) { e.printStackTrace(); } byte[] data = outputStream.toByteArray(); return data; }
/** * 动态序列化:通过动态解析Schema文件进行内容设置,并序列化内容 * * @throws IOException */ public void MemberInfoDynSer() throws IOException { // 1.解析schema文件内容 Parser parser = new Parser(); Schema mSchema = parser.parse(this.getClass().getResourceAsStream("/Members.avsc")); // 2.构建数据写对象 DatumWriter<GenericRecord> mGr = new SpecificDatumWriter<GenericRecord>(mSchema); DataFileWriter<GenericRecord> mDfw = new DataFileWriter<GenericRecord>(mGr); // 3.创建序列化文件 mDfw.create(mSchema, new File("/Users/a/Desktop/tmp/members.avro")); // 4.添加序列化数据 for (int i = 0; i < 20; i++) { GenericRecord gr = new GenericData.Record(mSchema); int r = i * new Random().nextInt(50); gr.put("userName", "light-" + r); gr.put("userPwd", "2016-" + r); gr.put("realName", "滔滔" + r + "号"); mDfw.append(gr); } // 5.关闭数据文件写对象 mDfw.close(); System.out.println("Dyn Builder Ser Start Complete."); }
/** * Serialize the record to prepare for publishing. * * @param record the GenericRecord * @param schema the Avro Schema * @param ggAvroSchema the internal representation of the Avro schema * @return the serialized record * @throws IOException if there is a problem */ private byte[] serializeRecord(GenericRecord record, Schema schema, @SuppressWarnings("unused") AvroSchema ggAvroSchema) throws IOException { byte[] rval; BinaryEncoder encoder = null; // serialize the record into a byte array ByteArrayOutputStream out = new ByteArrayOutputStream(); DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(schema); encoder = EncoderFactory.get().directBinaryEncoder(out, encoder); writer.write(record, encoder); encoder.flush(); rval = out.toByteArray(); //out.close(); // noop in the Apache version, so not bothering return rval; }
static void convertJsonToAvro(InputStream inputStream, OutputStream outputStream, Schema schema) throws IOException { DatumReader<Object> reader = new GenericDatumReader<>(schema); DatumWriter<Object> writer = new GenericDatumWriter<>(schema); Encoder binaryEncoder = EncoderFactory.get().binaryEncoder(outputStream, null); JsonDecoder jsonDecoder = DecoderFactory.get().jsonDecoder(schema, inputStream); Object datum = null; while (true) { try { datum = reader.read(datum, jsonDecoder); } catch (EOFException eofException) { break; } writer.write(datum, binaryEncoder); binaryEncoder.flush(); } outputStream.flush(); }
AvroKeyValueWriter(Schema keySchema, Schema valueSchema, CodecFactory compressionCodec, OutputStream outputStream, int syncInterval) throws IOException { // Create the generic record schema for the key/value pair. mKeyValuePairSchema = AvroKeyValue .getSchema(keySchema, valueSchema); // Create an Avro container file and a writer to it. DatumWriter<GenericRecord> genericDatumWriter = new GenericDatumWriter<GenericRecord>( mKeyValuePairSchema); mAvroFileWriter = new DataFileWriter<GenericRecord>( genericDatumWriter); mAvroFileWriter.setCodec(compressionCodec); mAvroFileWriter.setSyncInterval(syncInterval); mAvroFileWriter.create(mKeyValuePairSchema, outputStream); // Create a reusable output record. mOutputRecord = new AvroKeyValue<Object, Object>( new GenericData.Record(mKeyValuePairSchema)); }
/** * Process singlex. * * @throws Exception the exception */ public void processSinglex() throws Exception { int base = (int) System.currentTimeMillis(); User user = User.newBuilder().setName("name" + base).setFavoriteColor("color" + base).setFavoriteNumber(base) .build(); DatumWriter<GenericRecord> datumWriterUser = new GenericDatumWriter<GenericRecord>(User.getClassSchema()); ByteArrayOutputStream baos = new ByteArrayOutputStream(); byte[] byteData = null; try { BinaryEncoder binaryEncoder = EncoderFactory.get().binaryEncoder(baos, null); datumWriterUser.write(user, binaryEncoder); binaryEncoder.flush(); byteData = baos.toByteArray(); } finally { baos.close(); } System.out.println(byteData.length); DatumReader<GenericRecord> datumReaderUser = new GenericDatumReader<GenericRecord>(User.getClassSchema()); GenericRecord genericRecord = datumReaderUser.read(null, DecoderFactory.get().binaryDecoder(byteData, null)); System.out.println(genericRecord); System.out.println(genericRecord.get("name")); }
/** * Process list. * * @param weathers the weathers * @throws Exception the exception */ public void processList(List<Weather> weathers) throws Exception { long before = System.currentTimeMillis(); BinaryEncoder binaryEncoder = null; BinaryDecoder binaryDecoder = null; Weather weatherRead = null; for (Weather weather : weathers) { DatumWriter<Weather> datumWriterWeather = new SpecificDatumWriter<Weather>(Weather.getClassSchema()); ByteArrayOutputStream baos = new ByteArrayOutputStream(); byte[] byteData = null; try { binaryEncoder = EncoderFactory.get().binaryEncoder(baos, binaryEncoder); datumWriterWeather.write(weather, binaryEncoder); binaryEncoder.flush(); byteData = baos.toByteArray(); } finally { baos.close(); } DatumReader<Weather> datumReaderWeather = new SpecificDatumReader<Weather>(Weather.getClassSchema()); binaryDecoder = DecoderFactory.get().binaryDecoder(byteData, binaryDecoder); weatherRead = datumReaderWeather.read(weatherRead, binaryDecoder); // System.out.println("After Binary Read: " + weatherRead.toString()); } System.out.println("size=" + weathers.size() + ", elapsed: " + (System.currentTimeMillis()-before)); }
/** * Convert a GenericRecord to a byte array. */ public static byte[] recordToByteArray(GenericRecord record) throws IOException { Closer closer = Closer.create(); try { ByteArrayOutputStream out = closer.register(new ByteArrayOutputStream()); Encoder encoder = EncoderFactory.get().directBinaryEncoder(out, null); DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(record.getSchema()); writer.write(record, encoder); byte[] byteArray = out.toByteArray(); return byteArray; } catch (Throwable t) { throw closer.rethrow(t); } finally { closer.close(); } }
/** * Serialize our Users to disk. */ private void serializing(List<User> listUsers) { long tiempoInicio = System.currentTimeMillis(); // We create a DatumWriter, which converts Java objects into an in-memory serialized format. // The SpecificDatumWriter class is used with generated classes and extracts the schema from the specified generated type. DatumWriter<User> userDatumWriter = new SpecificDatumWriter<User>(User.class); // We create a DataFileWriter, which writes the serialized records, as well as the schema, to the file specified in the dataFileWriter.create call. DataFileWriter<User> dataFileWriter = new DataFileWriter<User>(userDatumWriter); try { File file = createFile(); dataFileWriter.create(((User) listUsers.get(0)).getSchema(), file); for (User user : listUsers) { // We write our users to the file via calls to the dataFileWriter.append method. dataFileWriter.append(user); } // When we are done writing, we close the data file. dataFileWriter.close(); } catch (IOException e) { e.printStackTrace(); } terminaProceso("serializing", tiempoInicio); }
@Override public byte[] serialize(String topic, GenericRecord data) { ByteArrayOutputStream out = new ByteArrayOutputStream(); BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null); Schema schema = data.getSchema(); DatumWriter<GenericRecord> writer = new SpecificDatumWriter<GenericRecord>(schema); try { writer.write(data, encoder); encoder.flush(); out.close(); return out.toByteArray(); } catch (IOException e) { LOG.error("Error encoding Avro record into bytes: {}", e.getMessage()); return null; } }
@Override public byte[] serialise(final Object object) throws SerialisationException { Schema schema = ReflectData.get().getSchema(object.getClass()); DatumWriter<Object> datumWriter = new ReflectDatumWriter<>(schema); DataFileWriter<Object> dataFileWriter = new DataFileWriter<>(datumWriter); ByteArrayOutputStream byteOut = new ByteArrayOutputStream(); try { dataFileWriter.create(schema, byteOut); dataFileWriter.append(object); dataFileWriter.flush(); } catch (final IOException e) { throw new SerialisationException("Unable to serialise given object of class: " + object.getClass().getName(), e); } finally { close(dataFileWriter); } return byteOut.toByteArray(); }
public byte[] compressURI(final URI uri) { Preconditions.checkNotNull(uri); try { final ByteArrayOutputStream stream = new ByteArrayOutputStream(); final Encoder encoder = EncoderFactory.get().directBinaryEncoder(stream, null); final DatumWriter<Object> writer = new GenericDatumWriter<Object>( AvroSchemas.COMPRESSED_IDENTIFIER); this.dictionary.keyFor(uri); // ensure a compressed version of URI is available final Object generic = encodeIdentifier(uri); writer.write(generic, encoder); return stream.toByteArray(); } catch (final IOException ex) { throw new Error("Unexpected exception (!): " + ex.getMessage(), ex); } }
@Test public void test() throws Throwable { System.out.println(AvroSchemas.RECORD.toString()); final Record resource = HBaseTestUtils.getMockResource(); //final Object generic = AvroSerialization.toGenericData(resource); final Object generic = null; final ByteArrayOutputStream jsonStream = new ByteArrayOutputStream(); final ByteArrayOutputStream binaryStream = new ByteArrayOutputStream(); final Encoder jsonEncoder = EncoderFactory.get().jsonEncoder(AvroSchemas.RECORD, jsonStream); final Encoder binaryEncoder = EncoderFactory.get().directBinaryEncoder(binaryStream, null); final DatumWriter<Object> writer = new GenericDatumWriter<Object>(AvroSchemas.RECORD); writer.write(generic, jsonEncoder); writer.write(generic, binaryEncoder); binaryEncoder.flush(); jsonEncoder.flush(); final byte[] bytes = binaryStream.toByteArray(); final String json = new String(jsonStream.toByteArray(), Charsets.UTF_8); System.out.println(bytes.length + " bytes: " + BaseEncoding.base16().encode(bytes)); System.out.println("JSON:\n" + json); }
public byte[] compressURI(final URI uri) { Preconditions.checkNotNull(uri); try { final ByteArrayOutputStream stream = new ByteArrayOutputStream(); final Encoder encoder = EncoderFactory.get().directBinaryEncoder(stream, null); final DatumWriter<Object> writer = new GenericDatumWriter<Object>( Schemas.COMPRESSED_IDENTIFIER); this.dictionary.keyFor(uri); // ensure a compressed version of URI is available final Object generic = encodeIdentifier(uri); writer.write(generic, encoder); return stream.toByteArray(); } catch (final IOException ex) { throw new Error("Unexpected exception (!): " + ex.getMessage(), ex); } }
private DatumWriter<Object> getDatumWriter(Class<Object> type, Schema schema) { DatumWriter<Object> writer; this.logger.debug("Finding correct DatumWriter for type " + type.getName()); if (SpecificRecord.class.isAssignableFrom(type)) { if (schema != null) { writer = new SpecificDatumWriter<>(schema); } else { writer = new SpecificDatumWriter<>(type); } } else if (GenericRecord.class.isAssignableFrom(type)) { writer = new GenericDatumWriter<>(schema); } else { if (schema != null) { writer = new ReflectDatumWriter<>(schema); } else { writer = new ReflectDatumWriter<>(type); } } return writer; }
public static void main(String[] args) throws IOException { // Open data file File file = new File(PATH); if (file.getParentFile() != null) { file.getParentFile().mkdirs(); } DatumWriter<User> userDatumWriter = new SpecificDatumWriter<User>(User.class); DataFileWriter<User> dataFileWriter = new DataFileWriter<User>(userDatumWriter); dataFileWriter.create(User.SCHEMA$, file); // Create random users User user; Random random = new Random(); for (int i = 0; i < USERS; i++) { user = new User("user", null, COLORS[random.nextInt(COLORS.length)]); dataFileWriter.append(user); System.out.println(user); } dataFileWriter.close(); }
public void generateAvroFile(Schema schema, File file, long recourdCount) throws IOException { DatumWriter<GenericRecord> writer = new GenericDatumWriter<>(schema); DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(writer); dataFileWriter.create(schema, file); for(long i = 0; i < recourdCount; i++) { GenericRecord datum = new GenericData.Record(schema); datum.put("b", i % 2 == 0); datum.put("s", String.valueOf(i)); datum.put("l", i); datum.put("l100", i % 100); datum.put("s100", String.valueOf(i%100)); dataFileWriter.append(datum); } dataFileWriter.close(); }
private byte[] createAvroData(String name, int age, List<String> emails) throws IOException { String AVRO_SCHEMA = "{\n" +"\"type\": \"record\",\n" +"\"name\": \"Employee\",\n" +"\"fields\": [\n" +" {\"name\": \"name\", \"type\": \"string\"},\n" +" {\"name\": \"age\", \"type\": \"int\"},\n" +" {\"name\": \"emails\", \"type\": {\"type\": \"array\", \"items\": \"string\"}},\n" +" {\"name\": \"boss\", \"type\": [\"Employee\",\"null\"]}\n" +"]}"; Schema schema = new Schema.Parser().parse(AVRO_SCHEMA); ByteArrayOutputStream out = new ByteArrayOutputStream(); GenericRecord e1 = new GenericData.Record(schema); e1.put("name", name); e1.put("age", age); e1.put("emails", emails); e1.put("boss", null); DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema); DataFileWriter<GenericRecord>dataFileWriter = new DataFileWriter<>(datumWriter); dataFileWriter.create(schema, out); dataFileWriter.append(e1); dataFileWriter.close(); return out.toByteArray(); }
@SuppressWarnings("unchecked") public static synchronized <T> void sendMessageToKafka(T message, Class<T> clazz) { try { DatumWriter datumWriter; if (datumWriterPool.containsKey(clazz)) datumWriter = datumWriterPool.get(clazz); else { datumWriter = new SpecificDatumWriter<>(clazz); datumWriterPool.put(clazz, datumWriter); } datumWriter.write(message, encoder); encoder.flush(); avroProducer.product(os.toByteArray()); os.reset(); } catch (Exception e) { _log.error(errorInfo(e)); } }
/** * Initializes the Appender. */ public void init() throws IOException { FileSystem fs = path.getFileSystem(conf); FSDataOutputStream outputStream = fs.create(path, false); avroSchema = AvroUtil.getAvroSchema(meta, conf); avroFields = avroSchema.getFields(); DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(avroSchema); dataFileWriter = new DataFileWriter<>(datumWriter); dataFileWriter.create(avroSchema, outputStream); if (tableStatsEnabled) { this.stats = new TableStatistics(schema, columnStatsEnabled); } super.init(); }
/** * Initializes the Appender. */ public void init() throws IOException { FileSystem fs = path.getFileSystem(conf); if (!fs.exists(path.getParent())) { throw new FileNotFoundException(path.toString()); } FSDataOutputStream outputStream = fs.create(path); avroSchema = AvroUtil.getAvroSchema(meta, conf); avroFields = avroSchema.getFields(); DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(avroSchema); dataFileWriter = new DataFileWriter<GenericRecord>(datumWriter); dataFileWriter.create(avroSchema, outputStream); if (enabledStats) { this.stats = new TableStatistics(schema); } super.init(); }
public byte[] serialize(String topic, GenericRecord data) throws SerializationException { Schema schema = data.getSchema(); MD5Digest schemaId = null; try { schemaId = schemaRegistry.register(topic, schema); ByteArrayOutputStream out = new ByteArrayOutputStream(); // MAGIC_BYTE | schemaId-bytes | avro_payload out.write(LiAvroSerDeHelper.MAGIC_BYTE); out.write(schemaId.asBytes()); BinaryEncoder encoder = encoderFactory.directBinaryEncoder(out, null); DatumWriter<GenericRecord> writer = new GenericDatumWriter<>(schema); writer.write(data, encoder); encoder.flush(); byte[] bytes = out.toByteArray(); out.close(); return bytes; } catch (IOException | SchemaRegistryException e) { throw new SerializationException(e); } }