@Test
public void publish_shouldThrowAnEncodeMessageContentExceptionForAnInvalidTransportMessage() throws Exception {
  // expect
  exception.expect(EncodeMessageContentException.class);
  exception.expectCause(IsInstanceOf.<Throwable>instanceOf(ClassCastException.class));
  exception.expectMessage("Failed to publish message: Failed to encode message content");

  // given
  Context context = mock(Context.class);
  when(context.getSchema())
      .thenReturn(new Schema.Parser().parse("{\"name\":\"test\",\"type\":\"record\",\"fields\":[]}"));
  when(context.getDataBuffer()).thenReturn(ByteBuffer.wrap("{}".getBytes()));

  GenericRecordBuilder builder = new GenericRecordBuilder(schema);
  builder.set("testString", Boolean.TRUE);
  Record record = builder.build();

  // when
  publisher.publish(context, record);
}
@Test
public void publish_shouldThrowAnEncodeTransportExceptionForAnInvalidTransportContext() throws Exception {
  // expect
  exception.expect(EncodeTransportException.class);
  exception.expectCause(IsInstanceOf.<Throwable>instanceOf(NullPointerException.class));
  exception.expectMessage("Failed to publish message: Failed to encode transport");

  // given
  Context context = mock(Context.class);
  when(context.getSchema())
      .thenReturn(new Schema.Parser().parse("{\"name\":\"test\",\"type\":\"record\",\"fields\":[]}"));
  when(context.getDataBuffer()).thenReturn(null);

  GenericRecordBuilder builder = new GenericRecordBuilder(schema);
  builder.set("testString", "testString");
  Record record = builder.build();

  // when
  publisher.publish(context, record);
}
@Test
public void publish_shouldPublishTheMessage() throws Exception {
  // given
  Context context = mock(Context.class);
  when(context.getSchema())
      .thenReturn(new Schema.Parser().parse("{\"name\":\"test\",\"type\":\"record\",\"fields\":[]}"));
  when(context.getDataBuffer()).thenReturn(ByteBuffer.wrap("{}".getBytes()));

  GenericRecordBuilder builder = new GenericRecordBuilder(schema);
  builder.set("testString", "test");
  Record record = builder.build();

  // when
  publisher.publish(context, record);
}
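// Editor's sketch (not from the original source): the three publisher tests above rely on
// fixtures that are not shown, i.e. a JUnit ExpectedException rule, a record schema with a
// single string field "testString", and the publisher under test. A minimal setup consistent
// with those tests could look like this; the class and record names are assumptions.
@Rule
public final ExpectedException exception = ExpectedException.none();

private final Schema schema = SchemaBuilder.record("TestMessage")
    .fields()
    .name("testString").type().stringType().noDefault()
    .endRecord();

// The concrete publisher type is not shown in the snippets; named here for illustration only.
private Publisher publisher;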
@Test
public void testIncompatibleSchemas() throws EventDeliveryException {
  final DatasetSink sink = sink(in, config);

  GenericRecordBuilder builder = new GenericRecordBuilder(INCOMPATIBLE_SCHEMA);
  GenericData.Record rec = builder.set("username", "koala").build();
  putToChannel(in, event(rec, INCOMPATIBLE_SCHEMA, null, false));

  // run the sink
  sink.start();
  assertThrows("Should fail", EventDeliveryException.class, new Callable() {
    @Override
    public Object call() throws EventDeliveryException {
      sink.process();
      return null;
    }
  });
  sink.stop();

  Assert.assertEquals("Should have rolled back", expected.size() + 1, remaining(in));
}
@Test
public void testConfluentSerDes() throws Exception {
  org.apache.avro.Schema schema = new org.apache.avro.Schema.Parser().parse(GENERIC_TEST_RECORD_SCHEMA);
  GenericRecord record = new GenericRecordBuilder(schema)
      .set("field1", "some value")
      .set("field2", "some other value")
      .build();

  Map<String, Object> config = new HashMap<>();
  config.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, rootTarget.getUri().toString());

  KafkaAvroSerializer kafkaAvroSerializer = new KafkaAvroSerializer();
  kafkaAvroSerializer.configure(config, false);
  byte[] bytes = kafkaAvroSerializer.serialize("topic", record);

  KafkaAvroDeserializer kafkaAvroDeserializer = new KafkaAvroDeserializer();
  kafkaAvroDeserializer.configure(config, false);
  GenericRecord result = (GenericRecord) kafkaAvroDeserializer.deserialize("topic", bytes);
  LOG.info(result.toString());
}
private void writeRowsHelper(
    List<TableRow> rows, Schema avroSchema, String destinationPattern, int shard) throws IOException {
  String filename = destinationPattern.replace("*", String.format("%012d", shard));
  try (WritableByteChannel channel =
          FileSystems.create(
              FileSystems.matchNewResource(filename, false /* isDirectory */), MimeTypes.BINARY);
      DataFileWriter<GenericRecord> tableRowWriter =
          new DataFileWriter<>(new GenericDatumWriter<GenericRecord>(avroSchema))
              .create(avroSchema, Channels.newOutputStream(channel))) {
    for (Map<String, Object> record : rows) {
      GenericRecordBuilder genericRecordBuilder = new GenericRecordBuilder(avroSchema);
      for (Map.Entry<String, Object> field : record.entrySet()) {
        genericRecordBuilder.set(field.getKey(), field.getValue());
      }
      tableRowWriter.append(genericRecordBuilder.build());
    }
  } catch (IOException e) {
    throw new IllegalStateException(
        String.format("Could not create destination for extract job %s", filename), e);
  }
}
/**
 * Parse the entity from the JSON body of the given event.
 *
 * @param event
 *          The event to parse.
 * @param reuse
 *          If non-null, this may be reused and returned from this method.
 * @return The parsed entity as a GenericRecord.
 * @throws EventDeliveryException
 *           A recoverable error, such as an error downloading the schema
 *           from the URL, has occurred.
 * @throws NonRecoverableEventException
 *           A non-recoverable error, such as an unparsable schema or
 *           entity, has occurred.
 */
@Override
public GenericRecord parse(Event event, GenericRecord reuse)
    throws EventDeliveryException, NonRecoverableEventException {
  JsonObject body = new JsonParser().parse(new String(event.getBody())).getAsJsonObject();
  GenericRecordBuilder recordBuilder = new GenericRecordBuilder(datasetSchema);
  for (Field field : datasetSchema.getFields()) {
    String atHeader = field.getProp(FIELD_AT_HEADER_PROPERTY);
    if (atHeader != null && atHeader.equals(Boolean.TRUE.toString())) {
      // Fields flagged with the "at header" property are read from the event headers.
      recordBuilder.set(field.name(), event.getHeaders().get(field.name()));
    } else {
      // All other fields are read from the JSON body.
      JsonElement element = body.get(field.name());
      recordBuilder.set(field.name(), getElementAsType(field.schema(), element));
    }
  }
  return recordBuilder.build();
}
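// Editor's sketch (assumption, not from the original source): how a field might be flagged to
// come from the Flume event headers instead of the JSON body. The property key "at_header"
// stands in for whatever FIELD_AT_HEADER_PROPERTY actually resolves to in the parser class.
Schema datasetSchema = new Schema.Parser().parse(
    "{\"type\":\"record\",\"name\":\"LogLine\",\"fields\":["
        + "{\"name\":\"hostname\",\"type\":\"string\",\"at_header\":\"true\"},"
        + "{\"name\":\"message\",\"type\":\"string\"}]}");

// "hostname" is supplied by the event headers, "message" by the JSON body.
Event event = EventBuilder.withBody("{\"message\":\"hello\"}".getBytes(),
    ImmutableMap.of("hostname", "web-01"));
GenericRecord parsed = parse(event, null);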
@Test
public void testDefaultValues() throws Exception {
  Schema schema = Defaults.SCHEMA$;
  GenericRecordBuilder builder = new GenericRecordBuilder(schema).set("id", "1234");
  Record record1 = builder.build();

  String json = "{\"id\": \"1234\"}";
  BSONObject object = (BSONObject) JSON.parse(json);
  Record record2 = RecordConverter.toRecord(schema, object, getClass().getClassLoader());

  assertThat(record2, is(record1));
  assertThat(AvroHelper.toSimpleJson(schema, record2), is(AvroHelper.toSimpleJson(schema, record1)));
  assertEquals(record2.get("id"), "1234");
  assertNull(record2.get("s"));
  assertTrue((Boolean) record2.get("b"));
  assertEquals(((Record) record2.get("r")).get("f"), "value");
  assertEquals(((Record) record2.get("r")).get("l"), 1234L);
}
@Test
public void testArrays() throws Exception {
  Schema schema = Arrays.SCHEMA$;
  GenericRecordBuilder builder = new GenericRecordBuilder(schema);
  builder.set("arrays", ImmutableList.of(
      ImmutableList.of(ImmutableList.of(1, 2, 3), ImmutableList.of()),
      ImmutableList.of(ImmutableList.of(4), ImmutableList.of()),
      ImmutableList.of(ImmutableList.of())));
  Record record1 = builder.build();

  String json = "{\"arrays\": [[[1, 2, 3], []], [[4], []], [[]]]}";
  BSONObject object = (BSONObject) JSON.parse(json);
  Record record2 = RecordConverter.toRecord(schema, object, getClass().getClassLoader());

  assertThat(record2, is(record1));
  assertThat(AvroHelper.toSimpleJson(schema, record2), is(AvroHelper.toSimpleJson(schema, record1)));
}
@Test
public void testMaps() throws Exception {
  Schema schema = Maps.SCHEMA$;
  GenericRecordBuilder builder = new GenericRecordBuilder(schema);
  builder.set("maps", ImmutableMap.of(
      "key1", ImmutableMap.of("value1", 1, "value2", 2),
      "key2", ImmutableMap.of(),
      "key3", ImmutableMap.of("value3", 3)));
  Record record1 = builder.build();

  String json = "{\"maps\": {\"key1\": {\"value1\": 1, \"value2\": 2}, \"key2\": {}, \"key3\": {\"value3\": 3}}}";
  DBObject object = (DBObject) JSON.parse(json);
  Record record2 = RecordConverter.toRecord(schema, object, getClass().getClassLoader());

  // Convert into JsonNode before comparison, so the maps are equal even if keys are reordered.
  assertThat(JSON.parse(AvroHelper.toSimpleJson(schema, record2)),
      is(JSON.parse(AvroHelper.toSimpleJson(schema, record1))));
}
@Test
public void testCopyFieldsValues() {
  Schema intSchema = SchemaBuilder.record("intSchema")
      .fields()
      .name("a").type().intType().noDefault()
      .endRecord();
  GenericRecord intRecord = new GenericRecordBuilder(intSchema)
      .set("a", 1)
      .build();

  Schema stringSchema = SchemaBuilder.record("stringSchema")
      .fields()
      .name("a").type().stringType().noDefault()
      .endRecord();
  GenericRecordBuilder stringRecordBuilder = new GenericRecordBuilder(stringSchema)
      .set("a", "s");

  TypeConverterUtils.copyFieldsValues(intRecord, stringRecordBuilder);
  GenericRecord stringRecord = stringRecordBuilder.build();
  Assert.assertEquals(intRecord.get("a"), stringRecord.get("a"));
}
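// Editor's sketch (assumption): TypeConverterUtils.copyFieldsValues is not shown in these
// snippets. A name-based copy helper consistent with the test above could look like the
// following; the real implementation may differ.
public static void copyFieldsValues(GenericRecord source, GenericRecordBuilder target) {
  // Copy every value from the source record into the builder, matching fields by name.
  // Assumes the target schema declares at least the fields present in the source schema.
  for (Schema.Field field : source.getSchema().getFields()) {
    target.set(field.name(), source.get(field.name()));
  }
}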
static File generateAvroPrimitiveTypes(File parentDir, String filename, int nrows, Date date) throws IOException {
  File f = new File(parentDir, filename);
  Schema schema = new Schema.Parser().parse(Resources.getResource("PrimitiveAvro.avsc").openStream());
  AvroParquetWriter<GenericRecord> writer = new AvroParquetWriter<GenericRecord>(new Path(f.getPath()), schema);
  try {
    DateFormat format = new SimpleDateFormat("yy-MMM-dd:hh.mm.ss.SSS aaa");
    for (int i = 0; i < nrows; i++) {
      GenericData.Record record = new GenericRecordBuilder(schema)
          .set("mynull", null)
          .set("myboolean", i % 2 == 0)
          .set("myint", 1 + i)
          .set("mylong", 2L + i)
          .set("myfloat", 3.1f + i)
          .set("mydouble", 4.1 + i)
          .set("mydate", format.format(new Date(date.getTime() - (i * 1000 * 3600))))
          .set("myuuid", UUID.randomUUID())
          .set("mystring", "hello world: " + i)
          .set("myenum", i % 2 == 0 ? "a" : "b")
          .build();
      writer.write(record);
    }
  } finally {
    writer.close();
  }
  return f;
}
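// Editor's sketch (assumption): PrimitiveAvro.avsc is not included here. A schema consistent
// with the builder calls above could be expressed with SchemaBuilder roughly as follows; the
// real resource may differ in names, defaults, or logical types.
Schema primitiveSchema = SchemaBuilder.record("PrimitiveAvro").fields()
    .name("mynull").type().nullType().noDefault()
    .name("myboolean").type().booleanType().noDefault()
    .name("myint").type().intType().noDefault()
    .name("mylong").type().longType().noDefault()
    .name("myfloat").type().floatType().noDefault()
    .name("mydouble").type().doubleType().noDefault()
    .name("mydate").type().stringType().noDefault()
    .name("myuuid").type().stringType().noDefault()
    .name("mystring").type().stringType().noDefault()
    .name("myenum").type().enumeration("myenum_values").symbols("a", "b").noDefault()
    .endRecord();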
@Test
public void testEmptyArray() throws Exception {
  Schema schema = new Schema.Parser().parse(
      Resources.getResource("array.avsc").openStream());
  File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
  tmp.deleteOnExit();
  tmp.delete();
  Path file = new Path(tmp.getPath());

  AvroParquetWriter<GenericRecord> writer = new AvroParquetWriter<GenericRecord>(file, schema);

  // Write a record with an empty array.
  List<Integer> emptyArray = new ArrayList<Integer>();
  GenericData.Record record = new GenericRecordBuilder(schema)
      .set("myarray", emptyArray).build();
  writer.write(record);
  writer.close();

  AvroParquetReader<GenericRecord> reader = new AvroParquetReader<GenericRecord>(testConf, file);
  GenericRecord nextRecord = reader.read();

  assertNotNull(nextRecord);
  assertEquals(emptyArray, nextRecord.get("myarray"));
}
@Test
public void testEmptyMap() throws Exception {
  Schema schema = new Schema.Parser().parse(
      Resources.getResource("map.avsc").openStream());
  File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
  tmp.deleteOnExit();
  tmp.delete();
  Path file = new Path(tmp.getPath());

  AvroParquetWriter<GenericRecord> writer = new AvroParquetWriter<GenericRecord>(file, schema);

  // Write a record with an empty map.
  ImmutableMap<String, Integer> emptyMap = new ImmutableMap.Builder<String, Integer>().build();
  GenericData.Record record = new GenericRecordBuilder(schema)
      .set("mymap", emptyMap).build();
  writer.write(record);
  writer.close();

  AvroParquetReader<GenericRecord> reader = new AvroParquetReader<GenericRecord>(testConf, file);
  GenericRecord nextRecord = reader.read();

  assertNotNull(nextRecord);
  assertEquals(emptyMap, nextRecord.get("mymap"));
}
@Test(expected = RuntimeException.class)
public void testMapRequiredValueWithNull() throws Exception {
  Schema schema = Schema.createRecord("record1", null, null, false);
  schema.setFields(Lists.newArrayList(
      new Schema.Field("mymap", Schema.createMap(Schema.create(Schema.Type.INT)), null, null)));

  File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
  tmp.deleteOnExit();
  tmp.delete();
  Path file = new Path(tmp.getPath());

  AvroParquetWriter<GenericRecord> writer = new AvroParquetWriter<GenericRecord>(file, schema);

  // Write a record with a null value
  Map<String, Integer> map = new HashMap<String, Integer>();
  map.put("thirty-four", 34);
  map.put("eleventy-one", null);
  map.put("one-hundred", 100);
  GenericData.Record record = new GenericRecordBuilder(schema)
      .set("mymap", map).build();
  writer.write(record);
}
@Test
public void testMapWithUtf8Key() throws Exception {
  Schema schema = new Schema.Parser().parse(
      Resources.getResource("map.avsc").openStream());
  File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
  tmp.deleteOnExit();
  tmp.delete();
  Path file = new Path(tmp.getPath());

  AvroParquetWriter<GenericRecord> writer = new AvroParquetWriter<GenericRecord>(file, schema);

  // Write a record with a map with Utf8 keys.
  GenericData.Record record = new GenericRecordBuilder(schema)
      .set("mymap", ImmutableMap.of(new Utf8("a"), 1, new Utf8("b"), 2))
      .build();
  writer.write(record);
  writer.close();

  AvroParquetReader<GenericRecord> reader = new AvroParquetReader<GenericRecord>(testConf, file);
  GenericRecord nextRecord = reader.read();

  assertNotNull(nextRecord);
  assertEquals(ImmutableMap.of(str("a"), 1, str("b"), 2), nextRecord.get("mymap"));
}
@Override
public void setUp() throws Exception {
  Schema schema = SchemaBuilder.record("UnitTestRecord")
      .fields()
      .name("data").type().stringType().noDefault()
      .name("timestamp").type().nullable().longType().noDefault()
      .endRecord();

  GenericRecordBuilder builder = new GenericRecordBuilder(schema);
  msg1 = builder.set("data", "foo").set("timestamp", 1467176315L).build();
  msg2 = builder.set("data", "bar").set("timestamp", 1467176344L).build();
  writer = new SpecificDatumWriter(schema);

  config = Mockito.mock(SecorConfig.class);
  when(config.getSchemaRegistryUrl()).thenReturn("");

  secorSchemaRegistryClient = Mockito.mock(SecorSchemaRegistryClient.class);
  when(secorSchemaRegistryClient.getSchema(anyString())).thenReturn(schema);

  mFactory = new AvroParquetFileReaderWriterFactory(config);
  when(secorSchemaRegistryClient.decodeMessage("test-avro-topic",
      AvroParquetFileReaderWriterFactory.serializeAvroRecord(writer, msg1))).thenReturn(msg1);
  when(secorSchemaRegistryClient.decodeMessage("test-avro-topic",
      AvroParquetFileReaderWriterFactory.serializeAvroRecord(writer, msg2))).thenReturn(msg2);
  mFactory.schemaRegistryClient = secorSchemaRegistryClient;
}
@Override
public void map(AvroKey<GenericRecord> key, NullWritable value, Context context)
    throws IOException, InterruptedException {
  GenericRecord datum = key.datum();
  char[] bytemask = ((String) datum.get(SchemaUtils.DIFFMASK)).toCharArray();

  // Find the first and last positions that are set in the diff mask.
  int minpos = -1;
  int maxpos = -1;
  for (int i = 0; i < bytemask.length; i++) {
    if (bytemask[i] == SchemaUtils.ONE) {
      if (minpos == -1) {
        minpos = i;
      }
      maxpos = i;
    }
  }

  GenericData.Record build = new GenericRecordBuilder((GenericData.Record) datum).build();
  build.put(dm_from_datetime, dates[minpos == 0 ? 0 : minpos - 1]);
  build.put(dm_to_datetime, dates[maxpos]);
  context.write(new AvroKey<GenericRecord>(build), NullWritable.get());
}
@Test
public void testArrays() throws Exception {
  Schema schema = Arrays.SCHEMA$;
  GenericRecordBuilder builder = new GenericRecordBuilder(schema);
  builder.set("arrays", ImmutableList.of(
      ImmutableList.of(ImmutableList.of(1, 2, 3), ImmutableList.of()),
      ImmutableList.of(ImmutableList.of(4), ImmutableList.of()),
      ImmutableList.of(ImmutableList.of())));
  Record record1 = builder.build();

  String json = "{\"arrays\": [[[1, 2, 3], []], [[4], []], [[]]]}";
  BSONObject object = (BSONObject) JSON.parse(json);
  Record record2 = RecordConverter.toRecord(schema, object, getClass().getClassLoader());

  assertThat(record2).isEqualTo(record1);
  assertThat(AvroHelper.toJson(schema, record2)).isEqualTo(AvroHelper.toJson(schema, record1));
}
@Test
public void testMaps() throws Exception {
  Schema schema = Maps.SCHEMA$;
  GenericRecordBuilder builder = new GenericRecordBuilder(schema);
  builder.set("maps", ImmutableMap.of(
      "key1", ImmutableMap.of("value1", 1, "value2", 2),
      "key2", ImmutableMap.of(),
      "key3", ImmutableMap.of("value3", 3)));
  Record record1 = builder.build();

  String json = "{\"maps\": {\"key1\": {\"value1\": 1, \"value2\": 2}, \"key2\": {}, \"key3\": {\"value3\": 3}}}";
  DBObject object = (DBObject) JSON.parse(json);
  Record record2 = RecordConverter.toRecord(schema, object, getClass().getClassLoader());

  // Convert into JsonNode before comparison, so the maps are equal even if keys are reordered.
  assertThat((Object) Json.parse(AvroHelper.toJson(schema, record2)))
      .isEqualTo(Json.parse(AvroHelper.toJson(schema, record1)));
}
@Override
public int run(String[] args) throws Exception {
  // Get a log4j logger
  Logger logger = Logger.getLogger(App.class);

  // Find the schema from the repository
  DatasetRepository repo = DatasetRepositories.open("repo:hdfs:/tmp/data");
  Schema schema = repo.load("events").getDescriptor().getSchema();

  // Build some events using the generic Avro API and log them using log4j
  GenericRecordBuilder builder = new GenericRecordBuilder(schema);
  for (long i = 0; i < 10; i++) {
    GenericRecord event = builder.set("id", i)
        .set("message", "Hello " + i).build();
    System.out.println("Sending to log4j: " + event);
    logger.info(event);
  }
  return 0;
}
@Override
protected void doGet(HttpServletRequest request, HttpServletResponse response)
    throws ServletException, IOException {
  response.setContentType("text/html");
  PrintWriter pw = response.getWriter();
  pw.println("<html>");
  pw.println("<head><title>CDK Example</title></head>");
  pw.println("<body>");
  String message = request.getParameter("message");
  if (message == null) {
    pw.println("<p>No message specified.</p>");
  } else {
    pw.println("<p>Message: " + message + "</p>");
    GenericData.Record event = new GenericRecordBuilder(schema)
        .set("id", id.incrementAndGet())
        .set("message", message)
        .build();
    logger.info(event);
  }
  pw.println("<p><a href=\"/logging-webapp\">Home</a></p>");
  pw.println("</body></html>");
}
@SuppressWarnings("deprecation")
private static PartitionKey getPartitionKey(Dataset data, long timestamp) {
  // need to build a fake record to get a partition key
  final GenericRecordBuilder builder = new GenericRecordBuilder(
      data.getDescriptor().getSchema());
  builder.set("timestamp", timestamp);
  builder.set("level", "INFO");
  builder.set("component", "StagingToPersistentSerial");
  builder.set("message", "Fake log message");

  // access the partition strategy, which produces keys from records
  final PartitionStrategy partitioner = data.getDescriptor()
      .getPartitionStrategy();
  return partitioner.partitionKeyForEntity(builder.build());
}
public static void writeTestUsers(Dataset<GenericData.Record> ds, int count, int start, String... fields) {
  DatasetWriter<GenericData.Record> writer = null;
  try {
    writer = ds.newWriter();
    writer.open();
    for (int i = start; i < count + start; i++) {
      GenericRecordBuilder recordBuilder = new GenericRecordBuilder(ds.getDescriptor().getSchema())
          .set("username", "test-" + i);
      for (String field : fields) {
        recordBuilder.set(field, field + "-" + i);
      }
      writer.write(recordBuilder.build());
    }
    writer.flush();
  } finally {
    if (writer != null) {
      writer.close();
    }
  }
}
@Override
public int run(String[] args) throws Exception {
  // Get a log4j logger
  Logger logger = Logger.getLogger(App.class);

  // Load the schema from our classpath
  Schema schema = new Schema.Parser().parse(
      getClass().getResourceAsStream("/event.avsc"));

  // Build some events using the generic Avro API and log them using log4j
  GenericRecordBuilder builder = new GenericRecordBuilder(schema);
  for (long i = 0; i < 10; i++) {
    GenericRecord event = builder.set("id", i)
        .set("message", "Hello " + i).build();
    System.out.println("Sending to log4j: " + event);
    logger.info(event);
  }
  return 0;
}
@Override
protected void doGet(HttpServletRequest request, HttpServletResponse response)
    throws ServletException, IOException {
  response.setContentType("text/html");
  PrintWriter pw = response.getWriter();
  pw.println("<html>");
  pw.println("<head><title>Kite Example</title></head>");
  pw.println("<body>");
  String message = request.getParameter("message");
  if (message == null) {
    pw.println("<p>No message specified.</p>");
  } else {
    pw.println("<p>Message: " + message + "</p>");
    GenericData.Record event = new GenericRecordBuilder(schema)
        .set("id", id.incrementAndGet())
        .set("message", message)
        .build();
    logger.info(event);
  }
  pw.println("<p><a href=\"/logging-webapp\">Home</a></p>");
  pw.println("</body></html>");
}
public static AvroDataFileGenerator intRecordGenerator(Class testClass, CodecFactory codec)
    throws Exception {
  return new AvroDataFileGenerator(
      testClass,
      new Schema.Parser().parse(testClass.getClassLoader().getResourceAsStream("intRecord.avsc")),
      (schema, value) -> new GenericRecordBuilder(schema)
          .set("value", value)
          .build(),
      codec
  );
}
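// Editor's sketch (assumption): intRecord.avsc is not shown. Since the generator only ever
// sets a single field named "value", a minimal compatible schema might look like this.
Schema intRecordSchema = SchemaBuilder.record("intRecord").fields()
    .name("value").type().intType().noDefault()
    .endRecord();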
private static GenericRecord createGenericRecordWithSchema(Schema schema, GenericRecord avrObj) {
  GenericRecordBuilder builder = new GenericRecordBuilder(schema);
  for (Schema.Field field : schema.getFields()) {
    // Copy each field declared by the target schema from the source record, matching by name.
    builder.set(field, avrObj.get(field.name()));
  }
  return builder.build();
}
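// Editor's sketch (assumption): a usage example of the helper above, projecting a record onto
// a narrower target schema. The schemas and values are illustrative only.
Schema fullSchema = SchemaBuilder.record("User").fields()
    .name("name").type().stringType().noDefault()
    .name("email").type().stringType().noDefault()
    .endRecord();
Schema nameOnlySchema = SchemaBuilder.record("User").fields()
    .name("name").type().stringType().noDefault()
    .endRecord();

GenericRecord full = new GenericRecordBuilder(fullSchema)
    .set("name", "ada")
    .set("email", "ada@example.com")
    .build();
// Keeps only the fields declared by nameOnlySchema, matched by name.
GenericRecord projected = createGenericRecordWithSchema(nameOnlySchema, full);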
@Test
public void union() throws Exception {
  Schema schema = Schemas.simpleUnion();
  Record avro = new GenericRecordBuilder(schema).set("id", 1L).set("str", "hello").build();

  ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
  Encoder encoder = new JasvornoEncoder(schema, outputStream);
  DatumWriter<Record> datumWriter = new GenericDatumWriter<Record>(schema);
  datumWriter.write(avro, encoder);
  encoder.flush();

  assertThat(new String(outputStream.toByteArray()), is("{\"id\":1,\"str\":\"hello\"}"));
  outputStream.close();
}
@Test
public void unionNull() throws Exception {
  Schema schema = Schemas.simpleUnion();
  Record avro = new GenericRecordBuilder(schema).set("id", 1L).set("str", null).build();

  ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
  Encoder encoder = new JasvornoEncoder(schema, outputStream);
  DatumWriter<Record> datumWriter = new GenericDatumWriter<Record>(schema);
  datumWriter.write(avro, encoder);
  encoder.flush();

  assertThat(new String(outputStream.toByteArray()), is("{\"id\":1,\"str\":null}"));
  outputStream.close();
}
@Test
public void bytes() throws Exception {
  Schema schema = Schemas.bytes();
  Record avro = new GenericRecordBuilder(schema)
      .set("str", ByteBuffer.wrap(new byte[] { 0x0, 0x1, 0x2 }))
      .build();

  ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
  Encoder encoder = new JasvornoEncoder(schema, outputStream);
  DatumWriter<Record> datumWriter = new GenericDatumWriter<Record>(schema);
  datumWriter.write(avro, encoder);
  encoder.flush();

  assertThat(new String(outputStream.toByteArray()), is("{\"str\":\"\\u0000\\u0001\\u0002\"}"));
  outputStream.close();
}
@Before
public void setup() throws EventDeliveryException {
  Datasets.delete(FILE_DATASET_URI);
  Datasets.create(FILE_DATASET_URI, DESCRIPTOR);

  this.config = new Context();
  config.put("keep-alive", "0");
  this.in = new MemoryChannel();
  Configurables.configure(in, config);
  config.put(DatasetSinkConstants.CONFIG_KITE_DATASET_URI, FILE_DATASET_URI);

  GenericRecordBuilder builder = new GenericRecordBuilder(RECORD_SCHEMA);
  expected = Lists.<GenericRecord>newArrayList(
      builder.set("id", "1").set("msg", "msg1").build(),
      builder.set("id", "2").set("msg", "msg2").build(),
      builder.set("id", "3").set("msg", "msg3").build());

  putToChannel(in, Iterables.transform(expected,
      new Function<GenericRecord, Event>() {
        private int i = 0;

        @Override
        public Event apply(@Nullable GenericRecord rec) {
          this.i += 1;
          boolean useURI = (i % 2) == 0;
          return event(rec, RECORD_SCHEMA, SCHEMA_FILE, useURI);
        }
      }));
}
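// Editor's sketch (assumption): the schema constants used by these DatasetSink tests are not
// included. Schemas consistent with the fields set above and in the surrounding tests could
// look like this; the actual test constants may differ.
static final Schema RECORD_SCHEMA = SchemaBuilder.record("TestRecord").fields()
    .name("id").type().stringType().noDefault()
    .name("msg").type().stringType().stringDefault("")
    .endRecord();
// Read-compatible with RECORD_SCHEMA, but missing the "msg" field.
static final Schema COMPATIBLE_SCHEMA = SchemaBuilder.record("TestRecord").fields()
    .name("id").type().stringType().noDefault()
    .endRecord();
// Shares no fields with RECORD_SCHEMA, so delivery is expected to fail or be diverted.
static final Schema INCOMPATIBLE_SCHEMA = SchemaBuilder.record("UserRecord").fields()
    .name("username").type().stringType().noDefault()
    .endRecord();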
@Test
public void testCompatibleSchemas() throws EventDeliveryException {
  DatasetSink sink = sink(in, config);

  // add a compatible record that is missing the msg field
  GenericRecordBuilder compatBuilder = new GenericRecordBuilder(COMPATIBLE_SCHEMA);
  GenericData.Record compatibleRecord = compatBuilder.set("id", "0").build();

  // add the record to the incoming channel
  putToChannel(in, event(compatibleRecord, COMPATIBLE_SCHEMA, null, false));

  // the record will be read using the real schema, so create the expected
  // record using it, but without any data
  GenericRecordBuilder builder = new GenericRecordBuilder(RECORD_SCHEMA);
  GenericData.Record expectedRecord = builder.set("id", "0").build();
  expected.add(expectedRecord);

  // run the sink
  sink.start();
  sink.process();
  sink.stop();

  Assert.assertEquals(
      Sets.newHashSet(expected),
      read(Datasets.load(FILE_DATASET_URI)));
  Assert.assertEquals("Should have committed", 0, remaining(in));
}
@Test
public void testSerializedWithIncompatibleSchemasWithSavePolicy() throws EventDeliveryException {
  if (Datasets.exists(ERROR_DATASET_URI)) {
    Datasets.delete(ERROR_DATASET_URI);
  }
  config.put(DatasetSinkConstants.CONFIG_FAILURE_POLICY,
      DatasetSinkConstants.SAVE_FAILURE_POLICY);
  config.put(DatasetSinkConstants.CONFIG_KITE_ERROR_DATASET_URI,
      ERROR_DATASET_URI);
  final DatasetSink sink = sink(in, config);

  GenericRecordBuilder builder = new GenericRecordBuilder(INCOMPATIBLE_SCHEMA);
  GenericData.Record rec = builder.set("username", "koala").build();

  // We pass in a valid schema in the header, but an incompatible schema
  // was used to serialize the record
  Event badEvent = event(rec, INCOMPATIBLE_SCHEMA, SCHEMA_FILE, true);
  putToChannel(in, badEvent);

  // run the sink
  sink.start();
  sink.process();
  sink.stop();

  Assert.assertEquals("Good records should have been written",
      Sets.newHashSet(expected),
      read(Datasets.load(FILE_DATASET_URI)));
  Assert.assertEquals("Should not have rolled back", 0, remaining(in));
  Assert.assertEquals("Should have saved the bad event",
      Sets.newHashSet(AvroFlumeEvent.newBuilder()
          .setBody(ByteBuffer.wrap(badEvent.getBody()))
          .setHeaders(toUtf8Map(badEvent.getHeaders()))
          .build()),
      read(Datasets.load(ERROR_DATASET_URI, AvroFlumeEvent.class)));
}
@Test
public void testSerializedWithIncompatibleSchemas() throws EventDeliveryException {
  final DatasetSink sink = sink(in, config);

  GenericRecordBuilder builder = new GenericRecordBuilder(INCOMPATIBLE_SCHEMA);
  GenericData.Record rec = builder.set("username", "koala").build();

  // We pass in a valid schema in the header, but an incompatible schema
  // was used to serialize the record
  putToChannel(in, event(rec, INCOMPATIBLE_SCHEMA, SCHEMA_FILE, true));

  // run the sink
  sink.start();
  assertThrows("Should fail", EventDeliveryException.class, new Callable() {
    @Override
    public Object call() throws EventDeliveryException {
      sink.process();
      return null;
    }
  });
  sink.stop();

  Assert.assertEquals("Should have rolled back", expected.size() + 1, remaining(in));
}
private GenericRecord toAvroRow(EventModel event) {
  GenericRecordBuilder recordBuilder = new GenericRecordBuilder(avroSchema);
  EventHeaderModel header = event.getHeader();

  recordBuilder.set(EventHDFSTableSchema.EVENT_ID, event.getId().toString());
  recordBuilder.set(EventHDFSTableSchema.EVENT_CODE, header.getCode());
  recordBuilder.set(EventHDFSTableSchema.EVENT_CREATED_AT_MILLIS, header.getTimeStamp());
  recordBuilder.set(EventHDFSTableSchema.EVENT_HEADER_JSON, toJson(header.getHeaders()));
  recordBuilder.set(EventHDFSTableSchema.EVENT_PARAMS_JSON, toJson(event.getParams()));

  EventType eventType = header.getEventType();
  if (eventType != null) {
    recordBuilder.set(EventHDFSTableSchema.EVENT_TYPE, eventType.name());
    EventDetailModel eventDetail = event.getEventDetail();
    if (header.getEventType() == EventType.System) {
      // System Event
      SystemEventDetailModel systemEventDetail = eventDetail.getSystemEventDetail();
      recordBuilder.set(EventHDFSTableSchema.PROCESS_ID, systemEventDetail.getProcessId());
      recordBuilder.set(EventHDFSTableSchema.PROCESS_NAME, systemEventDetail.getProcessName());
      recordBuilder.set(EventHDFSTableSchema.VM_DETAILS, systemEventDetail.getVmDetail());
      recordBuilder.set(EventHDFSTableSchema.CALL_TRACE, systemEventDetail.getGeneratorTrace());
      recordBuilder.set(EventHDFSTableSchema.MAC_ID, systemEventDetail.getMacId());
      recordBuilder.set(EventHDFSTableSchema.PRODUCT_NAME, systemEventDetail.getProductName());
      recordBuilder.set(EventHDFSTableSchema.MODULE_NAME, systemEventDetail.getModuleName());
      recordBuilder.set(EventHDFSTableSchema.ACTION, systemEventDetail.getAction());
    } else {
      // User Event
      UserEventDetailModel userEventDetail = eventDetail.getUserEventDetail();
      recordBuilder.set(EventHDFSTableSchema.USER_ID, userEventDetail.getUserId());
      recordBuilder.set(EventHDFSTableSchema.USER_AGENT, userEventDetail.getUserAgent());
      recordBuilder.set(EventHDFSTableSchema.CLIENT_IP_ADDRESS, userEventDetail.getIpAddress());
      recordBuilder.set(EventHDFSTableSchema.ACTION_URL, userEventDetail.getActionUrl());
    }
  }

  verifyNull(recordBuilder);
  return recordBuilder.build();
}
private void verifyNull(GenericRecordBuilder recordBuilder) {
  List<Field> fields = avroSchema.getFields();
  for (Field field : fields) {
    Object value = recordBuilder.get(field);
    if (value == null) {
      // Replace any unset or null field with an empty string so the record can be built.
      recordBuilder.set(field, "");
    }
  }
}
@SuppressWarnings("unchecked")
private Object wrapOption(Schema schema, Object option) {
  if (schema.getType() == Schema.Type.BYTES && option instanceof String) {
    option = ByteBuffer.wrap(((String) option).getBytes(Charset.defaultCharset()));
  } else if (schema.getType() == Schema.Type.FLOAT && option instanceof Double) {
    option = ((Double) option).floatValue();
  } else if (schema.getType() == Schema.Type.LONG && option instanceof Integer) {
    option = ((Integer) option).longValue();
  } else if (schema.getType() == Schema.Type.ARRAY && option instanceof Collection) {
    option = new GenericData.Array(schema, (Collection) option);
  } else if (schema.getType() == Schema.Type.ENUM && option instanceof String) {
    option = new GenericData.EnumSymbol(schema, (String) option);
  } else if (schema.getType() == Schema.Type.FIXED && option instanceof String) {
    option = new GenericData.Fixed(schema, ((String) option).getBytes(Charset.defaultCharset()));
  } else if (schema.getType() == Schema.Type.RECORD && option instanceof Map) {
    Map optionMap = (Map) option;
    GenericRecordBuilder optionBuilder = new GenericRecordBuilder(schema);
    for (Schema.Field field : schema.getFields()) {
      if (optionMap.containsKey(field.name())) {
        optionBuilder.set(field, optionMap.get(field.name()));
      }
    }
    option = optionBuilder.build();
  }
  return option;
}
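// Editor's sketch (assumption): usage of wrapOption, coercing loosely typed values into the
// Avro representation a field's schema expects. Schemas and values are illustrative only.
Schema enumSchema = SchemaBuilder.enumeration("Color").symbols("RED", "GREEN", "BLUE");
Object enumValue = wrapOption(enumSchema, "GREEN");  // becomes a GenericData.EnumSymbol

Schema longSchema = SchemaBuilder.builder().longType();
Object longValue = wrapOption(longSchema, 42);       // Integer widened to Long

Schema bytesSchema = SchemaBuilder.builder().bytesType();
Object bytesValue = wrapOption(bytesSchema, "abc");  // wrapped in a ByteBuffer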
private GenericRecord generateRecord(Schema schema) {
  GenericRecordBuilder builder = new GenericRecordBuilder(schema);
  for (Schema.Field field : schema.getFields()) {
    builder.set(field, generateObject(field.schema()));
  }
  return builder.build();
}
public void populateAvroMsgIntoKafka(String topic, int numMsg) throws IOException {
  KafkaProducer<String, GenericRecord> producer =
      new KafkaProducer<String, GenericRecord>(producerProperties);

  Schema.Parser parser = new Schema.Parser();
  Schema schema = parser.parse(Resources.getResource("drill-avro-test.avsc").openStream());
  GenericRecordBuilder builder = new GenericRecordBuilder(schema);
  Random rand = new Random();

  for (int i = 0; i < numMsg; ++i) {
    builder.set("key1", UUID.randomUUID().toString());
    builder.set("key2", rand.nextInt());
    builder.set("key3", rand.nextBoolean());

    List<Integer> list = Lists.newArrayList();
    list.add(rand.nextInt(100));
    list.add(rand.nextInt(100));
    list.add(rand.nextInt(100));
    builder.set("key5", list);

    Map<String, Double> map = Maps.newHashMap();
    map.put("key61", rand.nextDouble());
    map.put("key62", rand.nextDouble());
    builder.set("key6", map);

    Record producerRecord = builder.build();
    ProducerRecord<String, GenericRecord> record =
        new ProducerRecord<String, GenericRecord>(topic, producerRecord);
    producer.send(record);
  }
  producer.close();
}
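// Editor's sketch (assumption): drill-avro-test.avsc is not included. A schema covering only
// the fields populated above might look like this; note that no "key4" is set, so if the real
// file declares one it must carry a default value for builder.build() to succeed.
Schema drillTestSchema = SchemaBuilder.record("drillAvroTest").fields()
    .name("key1").type().stringType().noDefault()
    .name("key2").type().intType().noDefault()
    .name("key3").type().booleanType().noDefault()
    .name("key5").type().array().items().intType().noDefault()
    .name("key6").type().map().values().doubleType().noDefault()
    .endRecord();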