@Override
public Object write(final Object obj) throws IOException {
  GenericRecord record = new GenericData.Record(avroSchema);
  if (!(obj instanceof Map)) {
    return record;
  }
  Map<Object, Object> mapObj = (Map<Object, Object>) obj;
  for (KeyAndFormatter childFormatter : childContainer) {
    childFormatter.clear();
    record.put(childFormatter.getName(), childFormatter.get(mapObj));
  }
  return record;
}
/**
 * Reads in binary Avro-encoded entities using the schema stored in the file
 * and prints them out.
 */
public static void readAvroFile(File file) throws IOException {
  GenericDatumReader<GenericRecord> datum = new GenericDatumReader<>();
  DataFileReader<GenericRecord> reader = new DataFileReader<>(file, datum);
  GenericData.Record record = new GenericData.Record(reader.getSchema());
  while (reader.hasNext()) {
    reader.next(record);
    System.out.println("Name " + record.get("name")
        + " on " + record.get("Meetup_date")
        + " attending " + record.get("going")
        + " organized by " + record.get("organizer")
        + " on " + record.get("topics"));
  }
  reader.close();
}
/**
 * Update the processor class, method and DB writer for each topic.
 */
public void updateTopicProcessor() {
  for (String topic : _topics.keySet()) {
    try {
      // get the processor class and method
      final Class processorClass = Class.forName(_topics.get(topic).processor);
      _topicProcessorClass.put(topic, processorClass.newInstance());
      final Method method = processorClass.getDeclaredMethod("process", GenericData.Record.class, String.class);
      _topicProcessorMethod.put(topic, method);
      // get the database writer
      final DatabaseWriter dw = new DatabaseWriter(JdbcUtil.wherehowsJdbcTemplate, _topics.get(topic).dbTable);
      _topicDbWriter.put(topic, dw);
    } catch (Exception e) {
      Logger.error("Failed to create processor for topic: " + topic, e);
      _topicProcessorClass.remove(topic);
      _topicProcessorMethod.remove(topic);
      _topicDbWriter.remove(topic);
    }
  }
}
/**
 * Process a Gobblin tracking event audit record.
 *
 * @param record the Avro audit record
 * @param topic the Kafka topic the record came from
 * @return always null
 * @throws Exception
 */
public Record process(GenericData.Record record, String topic) throws Exception {
  if (record != null && record.get("name") != null) {
    final String name = record.get("name").toString();
    // only handle "DaliLimitedRetentionAuditor", "DaliAutoPurgeAuditor" and "DsIgnoreIDPCAuditor"
    if (name.equals(DALI_LIMITED_RETENTION_AUDITOR)
        || name.equals(DALI_AUTOPURGED_AUDITOR)
        || name.equals(DS_IGNORE_IDPC_AUDITOR)) {
      Long timestamp = (Long) record.get("timestamp");
      Map<String, String> metadata = StringUtil.convertObjectMapToStringMap(record.get("metadata"));
      String hasError = metadata.get("HasError");
      if (!hasError.equalsIgnoreCase("true")) {
        String datasetPath = metadata.get("DatasetPath");
        String datasetUrn = DATASET_URN_PREFIX + (datasetPath.startsWith("/") ? "" : "/") + datasetPath;
        String ownerUrns = metadata.get("OwnerURNs");
        DatasetInfoDao.updateKafkaDatasetOwner(datasetUrn, ownerUrns, DATASET_OWNER_SOURCE, timestamp);
      }
    }
  }
  return null;
}
private static Path createDataFile() throws IOException {
  File avroFile = File.createTempFile("test-", "." + FILE_EXTENSION);
  DatumWriter<GenericRecord> writer = new GenericDatumWriter<>(schema);
  try (DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(writer)) {
    dataFileWriter.setFlushOnEveryBlock(true);
    dataFileWriter.setSyncInterval(32);
    dataFileWriter.create(schema, avroFile);
    IntStream.range(0, NUM_RECORDS).forEach(index -> {
      GenericRecord datum = new GenericData.Record(schema);
      datum.put(FIELD_INDEX, index);
      datum.put(FIELD_NAME, String.format("%d_name_%s", index, UUID.randomUUID()));
      datum.put(FIELD_SURNAME, String.format("%d_surname_%s", index, UUID.randomUUID()));
      try {
        OFFSETS_BY_INDEX.put(index, dataFileWriter.sync() - 16L);
        dataFileWriter.append(datum);
      } catch (IOException ioe) {
        throw new RuntimeException(ioe);
      }
    });
  }
  Path path = new Path(new Path(fsUri), avroFile.getName());
  fs.moveFromLocalFile(new Path(avroFile.getAbsolutePath()), path);
  return path;
}
public void validateAvroFile(File file) throws IOException {
  // read the events back using GenericRecord
  DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>();
  DataFileReader<GenericRecord> fileReader = new DataFileReader<GenericRecord>(file, reader);
  GenericRecord record = new GenericData.Record(fileReader.getSchema());
  int numEvents = 0;
  while (fileReader.hasNext()) {
    fileReader.next(record);
    ByteBuffer body = (ByteBuffer) record.get("body");
    CharsetDecoder decoder = Charsets.UTF_8.newDecoder();
    String bodyStr = decoder.decode(body).toString();
    System.out.println(bodyStr);
    numEvents++;
  }
  fileReader.close();
  Assert.assertEquals("Should have found a total of 3 events", 3, numEvents);
}
@Test
public void testIncompatibleSchemas() throws EventDeliveryException {
  final DatasetSink sink = sink(in, config);
  GenericRecordBuilder builder = new GenericRecordBuilder(INCOMPATIBLE_SCHEMA);
  GenericData.Record rec = builder.set("username", "koala").build();
  putToChannel(in, event(rec, INCOMPATIBLE_SCHEMA, null, false));

  // run the sink
  sink.start();
  assertThrows("Should fail", EventDeliveryException.class, new Callable() {
    @Override
    public Object call() throws EventDeliveryException {
      sink.process();
      return null;
    }
  });
  sink.stop();

  Assert.assertEquals("Should have rolled back", expected.size() + 1, remaining(in));
}
/**
 * Create a data file that gets exported to the db.
 * @param fileNum the number of the file (for multi-file export)
 * @param numRecords how many records to write to the file.
 */
protected void createParquetFile(int fileNum, int numRecords, ColumnGenerator... extraCols) throws IOException {
  String uri = "dataset:file:" + getTablePath();
  Schema schema = buildSchema(extraCols);
  DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
      .schema(schema)
      .format(Formats.PARQUET)
      .build();
  Dataset dataset = Datasets.create(uri, descriptor);
  DatasetWriter writer = dataset.newWriter();
  try {
    for (int i = 0; i < numRecords; i++) {
      GenericRecord record = new GenericData.Record(schema);
      record.put("id", i);
      record.put("msg", getMsgPrefix() + i);
      addExtraColumns(record, i, extraCols);
      writer.write(record);
    }
  } finally {
    writer.close();
  }
}
public void testParquetRecordsNotSupported() throws IOException, SQLException {
  String[] argv = {};
  final int TOTAL_RECORDS = 1;
  Schema schema = Schema.createRecord("nestedrecord", null, null, false);
  schema.setFields(Lists.newArrayList(buildField("myint", Schema.Type.INT)));
  GenericRecord record = new GenericData.Record(schema);
  record.put("myint", 100);
  // DB type is not used so can be anything:
  ColumnGenerator gen = colGenerator(record, schema, null, "VARCHAR(64)");
  createParquetFile(0, TOTAL_RECORDS, gen);
  createTable(gen);
  try {
    runExport(getArgv(true, 10, 10, newStrArray(argv, "-m", "" + 1)));
    fail("Parquet records can not be exported.");
  } catch (Exception e) {
    // expected
    assertTrue(true);
  }
}
public void testAvroRecordsNotSupported() throws IOException, SQLException {
  String[] argv = {};
  final int TOTAL_RECORDS = 1;
  Schema schema = Schema.createRecord("nestedrecord", null, null, false);
  schema.setFields(Lists.newArrayList(buildAvroField("myint", Schema.Type.INT)));
  GenericRecord record = new GenericData.Record(schema);
  record.put("myint", 100);
  // DB type is not used so can be anything:
  ColumnGenerator gen = colGenerator(record, schema, null, "VARCHAR(64)");
  createAvroFile(0, TOTAL_RECORDS, gen);
  createTable(gen);
  try {
    runExport(getArgv(true, 10, 10, newStrArray(argv, "-m", "" + 1)));
    fail("Avro records can not be exported.");
  } catch (Exception e) {
    // expected
    assertTrue(true);
  }
}
private GenericRecord convertToAvroRecord(Schema avroRecordSchema, Object[] values) {
  // TODO: could be improved to create the record once and reuse it
  GenericRecord avroRec = new GenericData.Record(avroRecordSchema);
  List<ColumnConverterDescriptor> columnConverters = converterDescriptor.getColumnConverters();
  if (values.length != columnConverters.size()) {
    // schema mismatch
    // TODO: throw a more specific exception
    throw new RuntimeException("Expecting " + columnConverters.size() + " fields, received "
        + values.length + " values");
  }
  for (int i = 0; i < values.length; i++) {
    Object value = values[i];
    ColumnConverterDescriptor columnConverterDescriptor = columnConverters.get(i);
    Object valueToWrite = columnConverterDescriptor.getWritable(value);
    avroRec.put(columnConverterDescriptor.getColumnName(), valueToWrite);
  }
  return avroRec;
}
/**
 * Write Avro-format data into a Parquet file.
 *
 * @param parquetPath path of the Parquet file to write
 */
public void write(String parquetPath) {
  Schema.Parser parser = new Schema.Parser();
  try {
    Schema schema = parser.parse(
        AvroParquetOperation.class.getClassLoader().getResourceAsStream("StringPair.avsc"));
    GenericRecord datum = new GenericData.Record(schema);
    datum.put("left", "L");
    datum.put("right", "R");
    Path path = new Path(parquetPath);
    System.out.println(path);
    AvroParquetWriter<GenericRecord> writer = new AvroParquetWriter<GenericRecord>(path, schema);
    writer.write(datum);
    writer.close();
  } catch (IOException e) {
    e.printStackTrace();
  }
}
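The StringPair.avsc resource parsed above is not shown. As a minimal sketch, assuming it declares only the two string fields the snippet populates ("left" and "right"), an equivalent schema can be built programmatically with Avro's SchemaBuilder; the class name and namespace below are assumptions made purely for illustration:

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

public class StringPairSchemaSketch {
  // Hypothetical equivalent of StringPair.avsc: a record with two required string fields.
  static Schema stringPairSchema() {
    return SchemaBuilder.record("StringPair")
        .namespace("example.avro")   // namespace is an assumption
        .fields()
        .requiredString("left")
        .requiredString("right")
        .endRecord();
  }
}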
/**
 * Perform the actual RPC business call.
 *
 * @param transceiver the transceiver carrying the connection
 * @throws IOException
 */
private void bussinessDeal(Transceiver transceiver) throws IOException {
  // 2. parse the protocol
  Protocol protocol = Protocol.parse(this.getClass().getResourceAsStream("/Members.avpr"));
  // 3. build a requestor from the protocol and the transceiver
  GenericRequestor requestor = new GenericRequestor(protocol, transceiver);
  // 4. build the request record from the "login" message's request schema
  GenericRecord loginGr = new GenericData.Record(protocol.getMessages().get("login").getRequest());
  // 5. build the nested record from the "Members" type defined in the protocol
  GenericRecord mGr = new GenericData.Record(protocol.getType("Members"));
  // 6. set the request payload
  mGr.put("userName", "rita");
  mGr.put("userPwd", "123456");
  // 7. attach the nested record to the top-level message record
  loginGr.put("m", mGr);
  // 8. send the request and receive the response
  Object retObj = requestor.request("login", loginGr);
  // 9. unpack the response
  GenericRecord upGr = (GenericRecord) retObj;
  System.out.println(upGr.get("msg"));
}
/**
 * Dynamic serialization: parse the schema file at runtime, populate records
 * against it, and serialize them to an Avro data file.
 *
 * @throws IOException
 */
public void MemberInfoDynSer() throws IOException {
  // 1. parse the schema file
  Parser parser = new Parser();
  Schema mSchema = parser.parse(this.getClass().getResourceAsStream("/Members.avsc"));
  // 2. build the datum writer (GenericDatumWriter, since we are writing GenericRecords)
  DatumWriter<GenericRecord> mGr = new GenericDatumWriter<GenericRecord>(mSchema);
  DataFileWriter<GenericRecord> mDfw = new DataFileWriter<GenericRecord>(mGr);
  // 3. create the output data file
  mDfw.create(mSchema, new File("/Users/a/Desktop/tmp/members.avro"));
  // 4. append the records
  for (int i = 0; i < 20; i++) {
    GenericRecord gr = new GenericData.Record(mSchema);
    int r = i * new Random().nextInt(50);
    gr.put("userName", "light-" + r);
    gr.put("userPwd", "2016-" + r);
    gr.put("realName", "滔滔" + r + "号");
    mDfw.append(gr);
  }
  // 5. close the data file writer
  mDfw.close();
  System.out.println("Dyn Builder Ser Start Complete.");
}
/**
 * Serializes alternating field-name/value pairs from the tuple into an Avro record.
 *
 * @param t the tuple holding field names at even positions and values at odd positions
 * @return the Avro-encoded bytes
 */
@Override
public byte[] serialize(Tuple4<String, String, String, String> t) {
  if (!initialized) {
    parser = new Schema.Parser();
    schema = parser.parse(schemaJson);
    recordInjection = GenericAvroCodecs.toBinary(schema);
    initialized = true;
  }
  GenericData.Record avroRecord = new GenericData.Record(schema);
  for (int i = 0; i < t.getArity() - 1; i += 2) {
    avroRecord.put(t.getField(i).toString(), t.getField(i + 1).toString());
  }
  byte[] bytes = recordInjection.apply(avroRecord);
  return bytes;
}
public static Object get(String fieldName, GenericData.Record record, Object defaultValue) {
  Schema decodedWithSchema = record.getSchema();
  Optional<Schema.Field> field = decodedWithSchema.getFields().stream()
      .filter(i -> i.name().equals(fieldName) || i.aliases().contains(fieldName))
      .findFirst();
  if (field.isPresent()) {
    return record.get(field.get().pos());
  } else {
    return defaultValue;
  }
}
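A short usage sketch of the alias-aware lookup above. The AvroUtils class name and the "User" schema are assumptions made purely for illustration:

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;

public class AvroUtilsGetExample {
  public static void main(String[] args) {
    // Hypothetical schema with a single required string field.
    Schema schema = SchemaBuilder.record("User")
        .fields()
        .requiredString("name")
        .endRecord();
    GenericData.Record user = new GenericData.Record(schema);
    user.put("name", "joe");

    // Field present (matched by name or alias): returns the stored value.
    Object name = AvroUtils.get("name", user, "unknown");   // "joe"
    // Field absent from the schema: falls back to the supplied default.
    Object email = AvroUtils.get("email", user, "unknown"); // "unknown"
    System.out.println(name + " / " + email);
  }
}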
public Record serialize(AdvancedEmployee employee) {
  Record record = new Record(schema);
  AvroUtils.put("name", employee.getName(), record);
  AvroUtils.put("age", employee.getAge(), record);
  AvroUtils.put("gender", employee.getGender(), record);
  int numberOfEmails = (employee.getMails() != null) ? employee.getMails().size() : 0;
  GenericData.Array<Utf8> emails =
      new GenericData.Array<>(numberOfEmails, schema.getField("emails").schema());
  for (int i = 0; i < numberOfEmails; ++i) {
    emails.add(new Utf8(employee.getMails().get(i)));
  }
  record.put("emails", emails);
  return record;
}
@Test
public void testSpecificSerializedGenericDeserialized() throws Exception {
  Map<String, Object> config = new HashMap<>();
  config.put(AvroSnapshotDeserializer.SPECIFIC_AVRO_READER, false);
  KafkaAvroDeserializer kafkaAvroDeserializer = new KafkaAvroDeserializer(schemaRegistryClient);
  kafkaAvroDeserializer.configure(config, false);
  KafkaAvroSerializer kafkaAvroSerializer = new KafkaAvroSerializer(schemaRegistryClient);
  kafkaAvroSerializer.configure(config, false);

  TestRecord record = new TestRecord();
  record.setField1("some value");
  record.setField2("some other value");

  byte[] bytes = kafkaAvroSerializer.serialize("topic", record);
  Object o = kafkaAvroDeserializer.deserialize("topic", bytes);
  Assert.assertEquals(o.getClass(), GenericData.Record.class);
  GenericRecord result = (GenericRecord) o;
  Assert.assertEquals(record.getField1(), result.get("field1"));
  Assert.assertEquals(record.getField2(), result.get("field2"));
}
@Test
public void genericEncoderV1GenericDecoderV1() throws Exception {
  Schema schema = load("users_v1.schema");
  SchemaRegistryClient client = mock(SchemaRegistryClient.class);
  AvroCodec codec = new AvroCodec();
  codec.setSchemaRegistryClient(client);
  when(client.register(any())).thenReturn(1);
  when(client.fetch(eq(1))).thenReturn(schema);
  GenericRecord record = new GenericData.Record(schema);
  record.put("name", "joe");
  record.put("favoriteNumber", 42);
  record.put("favoriteColor", "blue");
  byte[] results = codec.encode(record);
  GenericRecord decoded = codec.decode(results, GenericRecord.class);
  Assert.assertEquals(record.get("name").toString(), decoded.get("name").toString());
}
@Test
public void genericEncoderV2GenericDecoderV2() throws Exception {
  Schema schema = load("users_v2.schema");
  SchemaRegistryClient client = mock(SchemaRegistryClient.class);
  AvroCodec codec = new AvroCodec();
  codec.setSchemaRegistryClient(client);
  when(client.register(any())).thenReturn(2);
  when(client.fetch(eq(2))).thenReturn(schema);
  GenericRecord record = new GenericData.Record(schema);
  record.put("name", "joe");
  record.put("favoriteNumber", 42);
  record.put("favoriteColor", "blue");
  record.put("favoritePlace", "Paris");
  byte[] results = codec.encode(record);
  GenericRecord decoded = codec.decode(results, GenericRecord.class);
  Assert.assertEquals(record.get("favoritePlace").toString(), decoded.get("favoritePlace").toString());
}
@Test
public void genericEncoderV2GenericDecoderV1() throws Exception {
  Schema reader = load("users_v1.schema");
  Schema writer = load("users_v2.schema");
  SchemaRegistryClient client = mock(SchemaRegistryClient.class);
  AvroCodec codec = new AvroCodec();
  codec.setReaderSchema(reader);
  codec.setSchemaRegistryClient(client);
  when(client.register(any())).thenReturn(2);
  when(client.fetch(eq(2))).thenReturn(writer);
  GenericRecord record = new GenericData.Record(writer);
  record.put("name", "joe");
  record.put("favoriteNumber", 42);
  record.put("favoriteColor", "blue");
  record.put("favoritePlace", "Paris");
  byte[] results = codec.encode(record);
  GenericRecord decoded = codec.decode(results, GenericRecord.class);
  Assert.assertEquals(record.get("name").toString(), decoded.get("name").toString());
}
@Test
public void genericEncoderV1GenericDecoderV2() throws Exception {
  Schema reader = load("users_v2.schema");
  Schema writer = load("users_v1.schema");
  SchemaRegistryClient client = mock(SchemaRegistryClient.class);
  AvroCodec codec = new AvroCodec();
  codec.setReaderSchema(reader);
  codec.setSchemaRegistryClient(client);
  when(client.register(any())).thenReturn(2);
  when(client.fetch(eq(2))).thenReturn(writer);
  GenericRecord record = new GenericData.Record(writer);
  record.put("name", "joe");
  record.put("favoriteNumber", 42);
  record.put("favoriteColor", "blue");
  byte[] results = codec.encode(record);
  GenericRecord decoded = codec.decode(results, GenericRecord.class);
  Assert.assertEquals(record.get("name").toString(), decoded.get("name").toString());
  Assert.assertEquals("NYC", decoded.get("favoritePlace").toString());
}
@Test
public void genericEncoderV1SpecificDecoderV1() throws Exception {
  Schema schema = load("users_v1.schema");
  SchemaRegistryClient client = mock(SchemaRegistryClient.class);
  AvroCodec codec = new AvroCodec();
  codec.setSchemaRegistryClient(client);
  when(client.register(any())).thenReturn(1);
  when(client.fetch(eq(1))).thenReturn(schema);
  GenericRecord record = new GenericData.Record(schema);
  record.put("name", "joe");
  record.put("favoriteNumber", 42);
  record.put("favoriteColor", "blue");
  byte[] results = codec.encode(record);
  User decoded = codec.decode(results, User.class);
  Assert.assertEquals(record.get("name").toString(), decoded.getName().toString());
}
@Test
public void genericEncoderReflectDecoder() throws Exception {
  SchemaRegistryClient client = mock(SchemaRegistryClient.class);
  Schema schema = load("status.avsc");
  when(client.register(any())).thenReturn(10);
  when(client.fetch(eq(10))).thenReturn(schema);
  AvroCodec codec = new AvroCodec();
  codec.setSchemaRegistryClient(client);
  codec.setResolver(new PathMatchingResourcePatternResolver(new AnnotationConfigApplicationContext()));
  codec.setProperties(new AvroCodecProperties());
  codec.init();
  GenericRecord record = new GenericData.Record(schema);
  record.put("id", "1");
  record.put("text", "sample");
  record.put("timestamp", System.currentTimeMillis());
  byte[] results = codec.encode(record);
  Status status = codec.decode(results, Status.class);
  Assert.assertEquals(record.get("id").toString(), status.getId());
}
@Test
public void testGenericRecordEncoding() throws Exception {
  String schemaString =
      "{\"namespace\": \"example.avro\",\n"
          + " \"type\": \"record\",\n"
          + " \"name\": \"User\",\n"
          + " \"fields\": [\n"
          + "     {\"name\": \"name\", \"type\": \"string\"},\n"
          + "     {\"name\": \"favorite_number\", \"type\": [\"int\", \"null\"]},\n"
          + "     {\"name\": \"favorite_color\", \"type\": [\"string\", \"null\"]}\n"
          + " ]\n"
          + "}";
  Schema schema = (new Schema.Parser()).parse(schemaString);

  GenericRecord before = new GenericData.Record(schema);
  before.put("name", "Bob");
  before.put("favorite_number", 256);
  // Leave favorite_color null

  AvroCoder<GenericRecord> coder = AvroCoder.of(GenericRecord.class, schema);
  CoderProperties.coderDecodeEncodeEqual(coder, before);
  Assert.assertEquals(schema, coder.getSchema());
}
@Test
public void testGeneric() throws Exception {
  Schema schema = new Schema.Parser().parse(Resources.getResource("person.avsc").openStream());
  GenericRecord savedRecord = new GenericData.Record(schema);
  savedRecord.put("name", "John Doe");
  savedRecord.put("age", 42);
  savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
  populateGenericFile(Lists.newArrayList(savedRecord), schema);

  PCollection<GenericRecord> input = pipeline.apply(
      AvroIO.readGenericRecords(schema).from(inputFile.getAbsolutePath()));
  input.apply(
      AvroIO.writeGenericRecords(schema)
          .to(outputFile.getAbsolutePath()));
  pipeline.run();

  List<GenericRecord> records = readGenericFile();
  assertEquals(Lists.newArrayList(savedRecord), records);
}
AvroKeyValueWriter(Schema keySchema, Schema valueSchema, CodecFactory compressionCodec,
    OutputStream outputStream, int syncInterval) throws IOException {
  // Create the generic record schema for the key/value pair.
  mKeyValuePairSchema = AvroKeyValue.getSchema(keySchema, valueSchema);

  // Create an Avro container file and a writer to it.
  DatumWriter<GenericRecord> genericDatumWriter =
      new GenericDatumWriter<GenericRecord>(mKeyValuePairSchema);
  mAvroFileWriter = new DataFileWriter<GenericRecord>(genericDatumWriter);
  mAvroFileWriter.setCodec(compressionCodec);
  mAvroFileWriter.setSyncInterval(syncInterval);
  mAvroFileWriter.create(mKeyValuePairSchema, outputStream);

  // Create a reusable output record.
  mOutputRecord = new AvroKeyValue<Object, Object>(new GenericData.Record(mKeyValuePairSchema));
}
@Override
public void addAvroSerializersIfRequired(ExecutionConfig reg, Class<?> type) {
  if (org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(type)
      || org.apache.avro.generic.GenericData.Record.class.isAssignableFrom(type)) {

    // Avro POJOs contain java.util.List fields whose runtime type is GenericData.Array;
    // because Kryo cannot serialize them properly, we register this serializer for them.
    reg.registerTypeWithKryoSerializer(
        GenericData.Array.class,
        Serializers.SpecificInstanceCollectionSerializerForArrayList.class);

    // We register this serializer for users who want to use untyped Avro records (GenericData.Record).
    // Kryo is able to serialize everything in there, except for the Schema.
    // This serializer is very slow, but relying on Kryo for GenericData.Record is in general a bad idea.
    // We add it as a default serializer because Avro uses a private sub-type at runtime.
    reg.addDefaultKryoSerializer(Schema.class, AvroSchemaSerializer.class);
  }
}
private static <T> LinkedHashMap<String, KryoRegistration> buildKryoRegistrations(Class<T> serializedDataType) {
  final LinkedHashMap<String, KryoRegistration> registrations = new LinkedHashMap<>();

  // register Avro types
  registrations.put(
      GenericData.Array.class.getName(),
      new KryoRegistration(
          GenericData.Array.class,
          new ExecutionConfig.SerializableSerializer<>(
              new Serializers.SpecificInstanceCollectionSerializerForArrayList())));
  registrations.put(Utf8.class.getName(), new KryoRegistration(Utf8.class));
  registrations.put(GenericData.EnumSymbol.class.getName(), new KryoRegistration(GenericData.EnumSymbol.class));
  registrations.put(GenericData.Fixed.class.getName(), new KryoRegistration(GenericData.Fixed.class));
  registrations.put(GenericData.StringType.class.getName(), new KryoRegistration(GenericData.StringType.class));

  // register the serialized data type
  registrations.put(serializedDataType.getName(), new KryoRegistration(serializedDataType));

  return registrations;
}
/**
 * Reads in binary Avro-encoded entities using a schema that is different
 * from the writer's schema.
 */
public static void readWithDifferentSchema(File file, Schema newSchema) throws IOException {
  GenericDatumReader<GenericRecord> datum = new GenericDatumReader<>(newSchema);
  DataFileReader<GenericRecord> reader = new DataFileReader<>(file, datum);
  GenericData.Record record = new GenericData.Record(newSchema);
  while (reader.hasNext()) {
    reader.next(record);
    System.out.println("Name " + record.get("name")
        + " on " + record.get("Meetup_date")
        + " attending " + record.get("attendance")
        + " organized by " + record.get("organizer")
        + " at " + record.get("location"));
  }
  reader.close();
}
@Before
public void initializeRecord() {
  // Create a record with a given JSON schema.
  GenericRecord record = new GenericData.Record(new Schema.Parser().parse(BILLING_EVENT_SCHEMA));
  record.put("id", "1");
  record.put("billingTime", 1508835963000000L);
  record.put("eventTime", 1484870383000000L);
  record.put("registrarId", "myRegistrar");
  record.put("billingId", "12345-CRRHELLO");
  record.put("tld", "test");
  record.put("action", "RENEW");
  record.put("domain", "example.test");
  record.put("repositoryId", "123456");
  record.put("years", 5);
  record.put("currency", "USD");
  record.put("amount", 20.5);
  record.put("flags", "AUTO_RENEW SYNTHETIC");
  schemaAndRecord = new SchemaAndRecord(record, null);
}
private void marshalAndUnmarshalGeneric(String inURI, String outURI) throws InterruptedException {
  GenericRecord input = new GenericData.Record(schema);
  input.put("name", "ceposta");

  MockEndpoint mock = getMockEndpoint("mock:reverse");
  mock.expectedMessageCount(1);
  mock.message(0).body().isInstanceOf(GenericRecord.class);
  mock.message(0).body().isEqualTo(input);

  Object marshalled = template.requestBody(inURI, input);
  template.sendBody(outURI, marshalled);
  mock.assertIsSatisfied();

  GenericRecord output = mock.getReceivedExchanges().get(0).getIn().getBody(GenericRecord.class);
  assertEquals(input, output);
}
@Override
public byte[] serialize(final String topic, final GenericRow genericRow) {
  if (genericRow == null) {
    return null;
  }
  try {
    GenericRecord avroRecord = new GenericData.Record(avroSchema);
    for (int i = 0; i < genericRow.getColumns().size(); i++) {
      if (fields.get(i).schema().getType() == Schema.Type.ARRAY) {
        avroRecord.put(fields.get(i).name(), Arrays.asList((Object[]) genericRow.getColumns().get(i)));
      } else {
        avroRecord.put(fields.get(i).name(), genericRow.getColumns().get(i));
      }
    }
    return kafkaAvroSerializer.serialize(topic, avroRecord);
  } catch (Exception e) {
    throw new SerializationException(e);
  }
}
@Before
public void before() {
  Schema.Parser parser = new Schema.Parser();
  avroSchema = parser.parse(schemaStr);
  genericRecord = new GenericData.Record(avroSchema);
  genericRecord.put("orderTime", 1511897796092L);
  genericRecord.put("orderId", 1L);
  genericRecord.put("itemId", "item_1");
  genericRecord.put("orderUnits", 10.0);
  genericRecord.put("arrayCol", new GenericData.Array(
      Schema.createArray(Schema.create(Schema.Type.DOUBLE)),
      Collections.singletonList(100.0)));
  genericRecord.put("mapCol", Collections.singletonMap("key1", 100.0));

  schema = SchemaBuilder.struct()
      .field("ordertime".toUpperCase(), org.apache.kafka.connect.data.Schema.INT64_SCHEMA)
      .field("orderid".toUpperCase(), org.apache.kafka.connect.data.Schema.INT64_SCHEMA)
      .field("itemid".toUpperCase(), org.apache.kafka.connect.data.Schema.STRING_SCHEMA)
      .field("orderunits".toUpperCase(), org.apache.kafka.connect.data.Schema.FLOAT64_SCHEMA)
      .field("arraycol".toUpperCase(),
          SchemaBuilder.array(org.apache.kafka.connect.data.Schema.FLOAT64_SCHEMA))
      .field("mapcol".toUpperCase(),
          SchemaBuilder.map(org.apache.kafka.connect.data.Schema.STRING_SCHEMA,
              org.apache.kafka.connect.data.Schema.FLOAT64_SCHEMA))
      .build();
}
private byte[] getSerializedRow(String topicName, SchemaRegistryClient schemaRegistryClient,
    Schema rowAvroSchema, GenericRow genericRow) {
  Map map = new HashMap();
  // Automatically register the schema in the Schema Registry if it has not been registered.
  map.put(AbstractKafkaAvroSerDeConfig.AUTO_REGISTER_SCHEMAS, true);
  map.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "");
  KafkaAvroSerializer kafkaAvroSerializer = new KafkaAvroSerializer(schemaRegistryClient, map);

  GenericRecord avroRecord = new GenericData.Record(rowAvroSchema);
  List<Schema.Field> fields = rowAvroSchema.getFields();
  for (int i = 0; i < genericRow.getColumns().size(); i++) {
    if (fields.get(i).schema().getType() == Schema.Type.ARRAY) {
      avroRecord.put(fields.get(i).name(), Arrays.asList((Object[]) genericRow.getColumns().get(i)));
    } else {
      avroRecord.put(fields.get(i).name(), genericRow.getColumns().get(i));
    }
  }
  return kafkaAvroSerializer.serialize(topicName, avroRecord);
}
public static void writeToAvro(File inputFile, OutputStream outputStream) throws IOException {
  DataFileWriter<GenericRecord> writer =
      new DataFileWriter<GenericRecord>(new GenericDatumWriter<GenericRecord>());
  writer.setCodec(CodecFactory.snappyCodec());
  writer.create(SCHEMA, outputStream);
  for (Stock stock : AvroStockUtils.fromCsvFile(inputFile)) {
    AvroKeyValue<CharSequence, Stock> record =
        new AvroKeyValue<CharSequence, Stock>(new GenericData.Record(SCHEMA));
    record.setKey(stock.getSymbol());
    record.setValue(stock);
    writer.append(record.get());
  }
  IOUtils.closeStream(writer);
  IOUtils.closeStream(outputStream);
}