@Override
public Object write(final Object obj) throws IOException {
    GenericRecord record = new GenericData.Record(avroSchema);
    if (!(obj instanceof Map)) {
        return record;
    }
    Map<Object, Object> mapObj = (Map<Object, Object>) obj;
    for (KeyAndFormatter childFormatter : childContainer) {
        childFormatter.clear();
        record.put(childFormatter.getName(), childFormatter.get(mapObj));
    }
    return record;
}
private static void applyMapDiff(Schema.Field field, GenericRecord avroObj, GenericRecord fieldsValue,
        Map<Object, Object> modifiedObj, Object key) throws IOException {
    Map<String, Object> changedKeys = ((MapDiff) fieldsValue).getChangedKeys();
    for (String changedKey : changedKeys.keySet()) {
        Class<?> clazz = changedKeys.get(changedKey).getClass();
        if (clazz.isAssignableFrom(PrimitiveDiff.class)) {
            AvroDiffPrimitive.applyPrimitiveDiff(field, avroObj, changedKeys.get(changedKey), changedKeys, changedKey);
            modifiedObj.put(key, changedKeys);
        } else if (clazz.isAssignableFrom(MapDiff.class)) {
            AvroDiffMap.applyMapDiff(field, avroObj, (GenericRecord) changedKeys.get(changedKey),
                    Maps.newHashMap(changedKeys), changedKey);
        } else if (clazz.isAssignableFrom(ArrayDiff.class)) {
            AvroDiffArray.applyArrayDiff(field, avroObj, (GenericRecord) changedKeys.get(changedKey), null);
        } else if (clazz.isAssignableFrom(RecordDiff.class)) {
            Object avroField = ((Map) avroObj.get(field.pos())).get(key);
            GenericRecord genericRecord = AvroDiff.applyDiff(
                    (GenericRecord) ((Map) avroField).get(changedKey),
                    (RecordDiff) changedKeys.get(changedKey),
                    ((GenericRecord) ((Map) avroField).get(changedKey)).getSchema());
            ((Map) avroField).put(changedKey, genericRecord);
            modifiedObj.put(key, avroField);
        }
    }
}
/**
 * Writes Avro-format data into a Parquet file.
 *
 * @param parquetPath path of the Parquet file to write
 */
public void write(String parquetPath) {
    Schema.Parser parser = new Schema.Parser();
    try {
        Schema schema = parser.parse(
                AvroParquetOperation.class.getClassLoader().getResourceAsStream("StringPair.avsc"));
        GenericRecord datum = new GenericData.Record(schema);
        datum.put("left", "L");
        datum.put("right", "R");
        Path path = new Path(parquetPath);
        System.out.println(path);
        AvroParquetWriter<GenericRecord> writer = new AvroParquetWriter<GenericRecord>(path, schema);
        writer.write(datum);
        writer.close();
    } catch (IOException e) {
        e.printStackTrace();
    }
}
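For reference, a minimal read-back sketch for the file written above (not part of the original snippet; it assumes parquet-avro's AvroParquetReader and mirrors the simple constructor style used by the writer):

// Illustrative read-back sketch, an assumption rather than the original code:
// reads the Parquet file back as Avro GenericRecords and prints both fields.
public void read(String parquetPath) throws IOException {
    AvroParquetReader<GenericRecord> reader =
            new AvroParquetReader<GenericRecord>(new Path(parquetPath));
    try {
        GenericRecord result;
        while ((result = reader.read()) != null) {
            System.out.println(result.get("left") + "," + result.get("right"));
        }
    } finally {
        reader.close();
    }
}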
public static List processRecordY(CSVPrinter printer, GenericRecord record, List<Column> columns)
        throws IOException {
    List r = new ArrayList<>();
    columns.forEach(c -> {
        try {
            r.add(record.get(c.getField().name()));
        } catch (Exception e) {
            try {
                r.add(c.getDefaultValue());
            } catch (Exception e2) {
                r.add("NULL");
            }
        }
    });
    printer.printRecord(r);
    printer.flush();
    return r;
}
/**
 * Import blob data that is smaller than inline lob limit and compress with
 * deflate codec. Blob data should be encoded and saved as Avro bytes.
 * @throws IOException
 * @throws SQLException
 */
public void testBlobCompressedAvroImportInline() throws IOException, SQLException {
    String[] types = { getBlobType() };
    String expectedVal = "This is short BLOB data";
    String[] vals = { getBlobInsertStr(expectedVal) };

    createTableWithColTypes(types, vals);

    runImport(getArgv("--compression-codec", CodecMap.DEFLATE));

    Path outputFile = new Path(getTablePath(), "part-m-00000.avro");
    DataFileReader<GenericRecord> reader = read(outputFile);
    GenericRecord record = reader.next();

    // Verify that the data block of the Avro file is compressed with deflate
    // codec.
    assertEquals(CodecMap.DEFLATE, reader.getMetaString(DataFileConstants.CODEC));

    // Verify that all columns are imported correctly.
    ByteBuffer buf = (ByteBuffer) record.get(getColName(0));
    String returnVal = new String(buf.array());

    assertEquals(getColName(0), expectedVal, returnVal);
}
@Test
public void shouldCreateAndApplyRecordDiff() throws IOException {
    map1.put("a", 3f);
    map1.put("c", true);
    map2.put("c", true);
    map2.put("b", 5L);
    list1.add("asf");
    list2.add("ddd");

    RecordDiff diff = AvroDiff.createDiff(recordSpecificRecord1, recordSpecificRecord2,
            recordSpecificRecord1.getSchema());
    GenericRecord modifiedRecord = AvroDiff.applyDiff(recordSpecificRecord1, diff,
            recordSpecificRecord1.getSchema());

    Assert.assertEquals(modifiedRecord, recordSpecificRecord2);
    Assert.assertNotEquals(modifiedRecord, recordSpecificRecord1);
}
/**
 * Create a data file that gets exported to the db.
 * @param fileNum the number of the file (for multi-file export)
 * @param numRecords how many records to write to the file.
 */
protected void createParquetFile(int fileNum, int numRecords,
        ColumnGenerator... extraCols) throws IOException {
    String uri = "dataset:file:" + getTablePath();
    Schema schema = buildSchema(extraCols);
    DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
            .schema(schema)
            .format(Formats.PARQUET)
            .build();
    Dataset dataset = Datasets.create(uri, descriptor);
    DatasetWriter writer = dataset.newWriter();
    try {
        for (int i = 0; i < numRecords; i++) {
            GenericRecord record = new GenericData.Record(schema);
            record.put("id", i);
            record.put("msg", getMsgPrefix() + i);
            addExtraColumns(record, i, extraCols);
            writer.write(record);
        }
    } finally {
        writer.close();
    }
}
public static ControlMessage fullDataPollMessage(GenericRecord record, String topologyId,
        ConsumerRecord<String, byte[]> consumerRecord) {
    ControlMessage message = new ControlMessage();
    message.setId(System.currentTimeMillis());
    message.setFrom(topologyId);
    message.setType(FULL_DATA_PULL_REQ);

    message.addPayload("topic", consumerRecord.topic());
    message.addPayload("DBUS_DATASOURCE_ID", Utils.getDatasource().getId());

    PairWrapper<String, Object> wrapper = BoltCommandHandlerHelper.convertAvroRecord(record,
            Constants.MessageBodyKey.noorderKeys);
    message.addPayload("OP_TS", wrapper.getProperties(Constants.MessageBodyKey.OP_TS).toString());
    message.addPayload("POS", wrapper.getProperties(Constants.MessageBodyKey.POS).toString());
    for (Pair<String, Object> pair : wrapper.getPairs()) {
        message.addPayload(pair.getKey(), pair.getValue());
    }
    return message;
}
public static <T extends Object> PairWrapper<String, Object> convertAvroRecordUseBeforeMap(
        GenericRecord record, Set<T> noorderKeys) {
    Schema schema = record.getSchema();
    List<Schema.Field> fields = schema.getFields();
    PairWrapper<String, Object> wrapper = new PairWrapper<>();

    for (Schema.Field field : fields) {
        String key = field.name();
        Object value = record.get(key);
        // Keys whose ordering does not matter are stored separately as properties.
        if (noorderKeys.contains(field.name())) {
            wrapper.addProperties(key, value);
        }
    }

    GenericRecord before = getFromRecord(MessageBodyKey.BEFORE, record);
    Map<String, Object> beforeMap = convert2map(before);

    for (Map.Entry<String, Object> entry : beforeMap.entrySet()) {
        if (!entry.getKey().endsWith(MessageBodyKey.IS_MISSING_SUFFIX)) {
            wrapper.addPair(new Pair<>(entry.getKey(),
                    CharSequence.class.isInstance(entry.getValue())
                            ? entry.getValue().toString() : entry.getValue()));
        }
    }

    return wrapper;
}
private GenericRecord convertToAvroRecord(Schema avroRecordSchema, Object[] values) {
    // TODO can be improved to create once and reuse
    GenericRecord avroRec = new GenericData.Record(avroRecordSchema);
    List<ColumnConverterDescriptor> columnConverters = converterDescriptor.getColumnConverters();
    if (values.length != columnConverters.size()) {
        // mismatched schema
        // TODO better exception
        throw new RuntimeException("Expecting " + columnConverters.size() + " fields, received "
                + values.length + " values");
    }
    for (int i = 0; i < values.length; i++) {
        Object value = values[i];
        ColumnConverterDescriptor columnConverterDescriptor = columnConverters.get(i);
        Object valueToWrite = columnConverterDescriptor.getWritable(value);
        avroRec.put(columnConverterDescriptor.getColumnName(), valueToWrite);
    }
    return avroRec;
}
@SuppressWarnings("unchecked") @Override public T deserialize(String topic, byte[] data) { try { T result = null; if (data != null) { LOGGER.debug("data='{}'", DatatypeConverter.printHexBinary(data)); DatumReader<GenericRecord> datumReader = new SpecificDatumReader<>( targetType.newInstance().getSchema()); Decoder decoder = DecoderFactory.get().binaryDecoder(data, null); result = (T) datumReader.read(null, decoder); LOGGER.debug("deserialized data='{}'", result); } return result; } catch (Exception ex) { throw new SerializationException( "Can't deserialize data '" + Arrays.toString(data) + "' from topic '" + topic + "'", ex); } }
public void testFirstUnderscoreInColumnName() throws IOException {
    String[] names = { "_NAME" };
    String[] types = { "INT" };
    String[] vals = { "1987" };
    createTableWithColTypesAndNames(names, types, vals);

    runImport(getOutputArgv(true, null));

    Schema schema = getSchema();
    assertEquals(Type.RECORD, schema.getType());
    List<Field> fields = schema.getFields();
    assertEquals(types.length, fields.size());
    checkField(fields.get(0), "__NAME", Type.INT);

    DatasetReader<GenericRecord> reader = getReader();
    try {
        assertTrue(reader.hasNext());
        GenericRecord record1 = reader.next();
        assertEquals("__NAME", 1987, record1.get("__NAME"));
        assertFalse(reader.hasNext());
    } finally {
        reader.close();
    }
}
private static Path createDataFile() throws IOException {
    File avroFile = File.createTempFile("test-", "." + FILE_EXTENSION);
    DatumWriter<GenericRecord> writer = new GenericDatumWriter<>(schema);
    try (DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(writer)) {
        dataFileWriter.setFlushOnEveryBlock(true);
        dataFileWriter.setSyncInterval(32);
        dataFileWriter.create(schema, avroFile);
        IntStream.range(0, NUM_RECORDS).forEach(index -> {
            GenericRecord datum = new GenericData.Record(schema);
            datum.put(FIELD_INDEX, index);
            datum.put(FIELD_NAME, String.format("%d_name_%s", index, UUID.randomUUID()));
            datum.put(FIELD_SURNAME, String.format("%d_surname_%s", index, UUID.randomUUID()));
            try {
                OFFSETS_BY_INDEX.put(index, dataFileWriter.sync() - 16L);
                dataFileWriter.append(datum);
            } catch (IOException ioe) {
                throw new RuntimeException(ioe);
            }
        });
    }
    Path path = new Path(new Path(fsUri), avroFile.getName());
    fs.moveFromLocalFile(new Path(avroFile.getAbsolutePath()), path);
    return path;
}
public void validateAvroFile(File file) throws IOException {
    // read the events back using GenericRecord
    DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>();
    DataFileReader<GenericRecord> fileReader = new DataFileReader<GenericRecord>(file, reader);
    GenericRecord record = new GenericData.Record(fileReader.getSchema());
    int numEvents = 0;
    while (fileReader.hasNext()) {
        fileReader.next(record);
        ByteBuffer body = (ByteBuffer) record.get("body");
        CharsetDecoder decoder = Charsets.UTF_8.newDecoder();
        String bodyStr = decoder.decode(body).toString();
        System.out.println(bodyStr);
        numEvents++;
    }
    fileReader.close();
    Assert.assertEquals("Should have found a total of 3 events", 3, numEvents);
}
public void testOverrideTypeMapping() throws IOException {
    String[] types = { "INT" };
    String[] vals = { "10" };
    createTableWithColTypes(types, vals);

    String[] extraArgs = { "--map-column-java", "DATA_COL0=String" };
    runImport(getOutputArgv(true, extraArgs));

    Schema schema = getSchema();
    assertEquals(Type.RECORD, schema.getType());
    List<Field> fields = schema.getFields();
    assertEquals(types.length, fields.size());
    checkField(fields.get(0), "DATA_COL0", Type.STRING);

    DatasetReader<GenericRecord> reader = getReader();
    try {
        assertTrue(reader.hasNext());
        GenericRecord record1 = reader.next();
        assertEquals("DATA_COL0", "10", record1.get("DATA_COL0"));
        assertFalse(reader.hasNext());
    } finally {
        reader.close();
    }
}
@Test
public void testTimedFileRolling() throws EventDeliveryException, InterruptedException {
    // use a new roll interval
    config.put("kite.rollInterval", "1"); // in seconds

    DatasetSink sink = sink(in, config);
    Dataset<GenericRecord> records = Datasets.load(FILE_DATASET_URI);

    // run the sink
    sink.start();
    sink.process();

    Assert.assertEquals("Should have committed", 0, remaining(in));

    Thread.sleep(1100); // sleep longer than the roll interval
    sink.process(); // rolling happens in the process method

    Assert.assertEquals(Sets.newHashSet(expected), read(records));

    // wait until the end to stop because it would close the files
    sink.stop();
}
/**
 * Converts the Avro binary data to the JSON format.
 */
@Override
public XContentBuilder serialize(Event event) {
    XContentBuilder builder = null;
    try {
        if (datumReader != null) {
            Decoder decoder = new DecoderFactory().binaryDecoder(event.getBody(), null);
            GenericRecord data = datumReader.read(null, decoder);
            logger.trace("Record in event " + data);
            XContentParser parser = XContentFactory
                    .xContent(XContentType.JSON)
                    .createParser(NamedXContentRegistry.EMPTY, data.toString());
            builder = jsonBuilder().copyCurrentStructure(parser);
            parser.close();
        } else {
            logger.error("Schema file is not configured");
        }
    } catch (IOException e) {
        logger.error("Exception in parsing Avro format data, but continuing serialization"
                + " to process further records: {}", e.getMessage(), e);
    }
    return builder;
}
/**
 * Tests the Avro serializer.
 */
@Test
public void testSerializer() throws Exception {
    Context context = new Context();
    String schemaFile = getClass().getResource("/schema.avsc").getFile();
    context.put(ES_AVRO_SCHEMA_FILE, schemaFile);
    avroSerializer.configure(context);

    Schema schema = new Schema.Parser().parse(new File(schemaFile));
    GenericRecord user = generateGenericRecord(schema);
    DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(schema);
    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
    Encoder encoder = new EncoderFactory().binaryEncoder(outputStream, null);
    datumWriter.write(user, encoder);
    encoder.flush();

    Event event = EventBuilder.withBody(outputStream.toByteArray());
    XContentBuilder expected = generateContentBuilder();
    XContentBuilder actual = avroSerializer.serialize(event);
    JsonParser parser = new JsonParser();
    assertEquals(parser.parse(expected.string()), parser.parse(actual.string()));
}
public void testNonIdentCharactersInColumnName() throws IOException {
    String[] names = { "test_a-v+r/o" };
    String[] types = { "INT" };
    String[] vals = { "2015" };
    createTableWithColTypesAndNames(names, types, vals);

    runImport(getOutputArgv(true, null));

    Path outputFile = new Path(getTablePath(), "part-m-00000.avro");
    DataFileReader<GenericRecord> reader = read(outputFile);
    Schema schema = reader.getSchema();
    assertEquals(Schema.Type.RECORD, schema.getType());
    List<Field> fields = schema.getFields();
    assertEquals(types.length, fields.size());
    checkField(fields.get(0), "TEST_A_V_R_O", Type.INT);

    GenericRecord record1 = reader.next();
    assertEquals("TEST_A_V_R_O", 2015, record1.get("TEST_A_V_R_O"));
}
/**
 * Import blob data that is smaller than inline lob limit. Blob data
 * should be saved as Avro bytes.
 * @throws IOException
 * @throws SQLException
 */
public void testBlobAvroImportInline() throws IOException, SQLException {
    String[] types = { getBlobType() };
    String expectedVal = "This is short BLOB data";
    String[] vals = { getBlobInsertStr(expectedVal) };

    createTableWithColTypes(types, vals);

    runImport(getArgv());

    Path outputFile = new Path(getTablePath(), "part-m-00000.avro");
    DataFileReader<GenericRecord> reader = read(outputFile);
    GenericRecord record = reader.next();

    // Verify that blob data is imported as Avro bytes.
    ByteBuffer buf = (ByteBuffer) record.get(getColName(0));
    String returnVal = new String(buf.array());

    assertEquals(getColName(0), expectedVal, returnVal);
}
public void testParquetRecordsNotSupported() throws IOException, SQLException {
    String[] argv = {};
    final int TOTAL_RECORDS = 1;

    Schema schema = Schema.createRecord("nestedrecord", null, null, false);
    schema.setFields(Lists.newArrayList(buildField("myint", Schema.Type.INT)));
    GenericRecord record = new GenericData.Record(schema);
    record.put("myint", 100);
    // DB type is not used so can be anything:
    ColumnGenerator gen = colGenerator(record, schema, null, "VARCHAR(64)");
    createParquetFile(0, TOTAL_RECORDS, gen);
    createTable(gen);
    try {
        runExport(getArgv(true, 10, 10, newStrArray(argv, "-m", "" + 1)));
        fail("Parquet records can not be exported.");
    } catch (Exception e) {
        // expected
        assertTrue(true);
    }
}
public void testNullableParquetImport() throws IOException, SQLException {
    String[] types = { "INT" };
    String[] vals = { null };
    createTableWithColTypes(types, vals);

    runImport(getOutputArgv(true, null));

    DatasetReader<GenericRecord> reader = getReader();
    try {
        assertTrue(reader.hasNext());
        GenericRecord record1 = reader.next();
        assertNull(record1.get("DATA_COL0"));
        assertFalse(reader.hasNext());
    } finally {
        reader.close();
    }
}
protected SqoopRecord toSqoopRecord(GenericRecord record) throws IOException {
    Schema avroSchema = record.getSchema();
    for (Map.Entry<Writable, Writable> e : columnTypes.entrySet()) {
        String columnName = e.getKey().toString();
        String columnType = e.getValue().toString();
        String cleanedCol = ClassWriter.toIdentifier(columnName);
        Schema.Field field = getFieldIgnoreCase(avroSchema, cleanedCol);
        if (null == field) {
            throw new IOException("Cannot find field " + cleanedCol + " in Avro schema " + avroSchema);
        }
        Object avroObject = record.get(field.name());
        Object fieldVal = AvroUtil.fromAvro(avroObject, field.schema(), columnType);
        recordImpl.setField(cleanedCol, fieldVal.toString());
    }
    return recordImpl;
}
@Override
public DatasetJsonRecord getSchema(Path targetFilePath) throws IOException {
    LOG.info("avro file path : " + targetFilePath.toUri().getPath());
    try {
        SeekableInput sin = new FsInput(targetFilePath, fs.getConf());
        DataFileReader<GenericRecord> reader =
                new DataFileReader<GenericRecord>(sin, new GenericDatumReader<GenericRecord>());
        String codec = reader.getMetaString("avro.codec");
        long record_count = reader.getBlockCount();
        String schemaString = reader.getSchema().toString();
        String storage = STORAGE_TYPE;
        String abstractPath = targetFilePath.toUri().getPath();
        System.out.println("the schema string is: " + schemaString);
        System.out.println("the abstract path is: " + abstractPath);
        FileStatus fstat = fs.getFileStatus(targetFilePath);
        DatasetJsonRecord datasetJsonRecord =
                new DatasetJsonRecord(schemaString, abstractPath, fstat.getModificationTime(),
                        fstat.getOwner(), fstat.getGroup(), fstat.getPermission().toString(),
                        codec, storage, "");
        reader.close();
        sin.close();
        LOG.info("Avro file DatasetJsonRecord get success, it is : " + datasetJsonRecord);
        return datasetJsonRecord;
    } catch (Exception e) {
        LOG.info("AvroAnalyzer get DatasetJsonRecord failure, and exception is " + e.getMessage());
        return null;
    }
}
@Override
public SampleDataRecord getSampleData(Path targetFilePath) throws IOException {
    SampleDataRecord sampleDataRecord = null;
    try {
        SeekableInput sin = new FsInput(targetFilePath, fs.getConf());
        DataFileReader<GenericRecord> reader =
                new DataFileReader<GenericRecord>(sin, new GenericDatumReader<GenericRecord>());
        Iterator<GenericRecord> iter = reader.iterator();
        int count = 0;
        List<Object> list = new ArrayList<Object>();
        // JSONArray list = new JSONArray();
        while (iter.hasNext() && count < 10) {
            // TODO handle out of memory error
            list.add(iter.next().toString().replaceAll("[\\n\\r\\p{C}]", "").replaceAll("\"", "\\\""));
            count++;
        }
        // for debug
        System.out.println("avro arraylist is: " + list.toString());
        sampleDataRecord = new SampleDataRecord(targetFilePath.toUri().getPath(), list);
        return sampleDataRecord;
    } catch (Exception e) {
        LOG.info("AvroAnalyzer get sampleDataRecord failure and exception is " + e.getMessage());
    }
    return sampleDataRecord;
}
private static Object[] convertGenericRecordToArray(GenericRecord record) {
    Object[] result = new Object[record.getSchema().getFields().size()];
    for (int i = 0; i < result.length; i++) {
        result[i] = record.get(i);
    }
    return result;
}
@Test
public void testNullsAreInferred() throws IOException {
    String w = "{\"type\":\"record\",\"name\":\"R\",\"fields\":"
            + "[{\"type\":[\"null\",\"long\"],\"name\":\"a\",\"default\":null}]}";
    GenericRecord record = readRecord(w, "{}");
    Assert.assertNull(record.get("a"));
}
private GenericRecord convertToAvroRecord(CDCEvent event) {
    GenericRecord avroRec = new GenericData.Record(getAvroSchema());
    Row row = event.getRow();

    // EVENTID
    avroRec.put("EVENTID", String.valueOf(event.getEventSequenceID().getSequenceID()));
    // OPERATION_TYPE
    avroRec.put("OPERATION_TYPE", String.valueOf(event.getOperation()));
    // RowKey
    avroRec.put("RowKey", row.getRowId());
    // VersionID
    if (row.getRowTimeStamp() != null) {
        avroRec.put("VersionID", row.getRowTimeStamp());
    }

    // add col values
    List<Cell> cells = row.getCells();
    cells.forEach(cell -> {
        if (cell.getColumnValue() != null) {
            avroRec.put(Bytes.toString(cell.getColumnName()), cell.getColumnValue());
        }
    });
    return avroRec;
}
@Test
public void testNestedNullsAreInferred() throws IOException {
    String w = "{\"type\":\"record\",\"name\":\"R\",\"fields\":[{\"name\":\"S\",\"type\":"
            + "{\"type\":\"record\",\"name\":\"S\",\"fields\":[{\"type\":[\"null\",\"long\"],\"name\":\"a\",\"default\":null},{\"type\":\"long\",\"name\":\"b\"}]}}]}";
    String data = "{\"S\": {\"b\":1}}";
    GenericRecord record = ((GenericRecord) readRecord(w, data).get("S"));
    Assert.assertNull(record.get("a"));
}
@Override
public byte[] toBytes(Object o) {
    GenericRecord record = (GenericRecord) o;
    try {
        return encodeAvroGenericRecord(schema, record);
    } catch (IOException e) {
        throw new SamzaException("Unable to serialize the record", e);
    }
}
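The encodeAvroGenericRecord helper is not shown in this snippet. A minimal sketch of what such a helper typically does with the plain Avro API follows; it is an assumption for illustration, not the project's actual implementation:

// Assumed helper sketch: binary-encodes a GenericRecord against its schema.
private static byte[] encodeAvroGenericRecord(Schema schema, GenericRecord record) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    DatumWriter<GenericRecord> writer = new GenericDatumWriter<>(schema);
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    writer.write(record, encoder);
    encoder.flush();
    return out.toByteArray();
}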
@Test
public void testRecordCanBeNull() throws IOException {
    String w = "{\"type\":\"record\",\"name\":\"R\",\"namespace\":\"com.playtech.bex.massupdate.api\",\"fields\":"
            + "[{\"name\":\"S\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"S\",\"fields\":[{\"name\":\"A\",\"type\":\"long\"}]}],\"default\":null}]}";
    String data = "{}";
    GenericRecord record = readRecord(w, data);
    Assert.assertNull(record.get("S"));
}
/**
 * Create a data file that gets exported to the db.
 * @param fileNum the number of the file (for multi-file export)
 * @param numRecords how many records to write to the file.
 */
protected void createAvroFile(int fileNum, int numRecords,
        ColumnGenerator... extraCols) throws IOException {
    Path tablePath = getTablePath();
    Path filePath = new Path(tablePath, "part" + fileNum);

    Configuration conf = new Configuration();
    if (!BaseSqoopTestCase.isOnPhysicalCluster()) {
        conf.set(CommonArgs.FS_DEFAULT_NAME, CommonArgs.LOCAL_FS);
    }
    FileSystem fs = FileSystem.get(conf);
    fs.mkdirs(tablePath);
    OutputStream os = fs.create(filePath);

    Schema schema = buildAvroSchema(extraCols);
    DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>();
    DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(datumWriter);
    dataFileWriter.create(schema, os);

    for (int i = 0; i < numRecords; i++) {
        GenericRecord record = new GenericData.Record(schema);
        record.put("id", i);
        record.put("msg", getMsgPrefix() + i);
        addExtraColumns(record, i, extraCols);
        dataFileWriter.append(record);
    }

    dataFileWriter.close();
    os.close();
}
private static GenericRecord valuesToSpecificDiff(@Nullable Object originalValue, Object changedValue)
        throws IOException {
    if (GenericRecord.class.isAssignableFrom(changedValue.getClass())) {
        return AvroDiff.createDiff((GenericRecord) originalValue, (GenericRecord) changedValue,
                ((GenericRecord) changedValue).getSchema());
    } else if (changedValue.getClass().isArray()) {
        return null;
    } else if (Map.class.isAssignableFrom(changedValue.getClass())) {
        if (originalValue != null) {
            return AvroDiffMap.createMapDiff(originalValue, changedValue);
        } else {
            return AvroDiffMap.createMapDiff(Maps.newHashMap(), changedValue);
        }
    } else {
        PrimitiveDiff primitiveDiff = new PrimitiveDiff();
        if (Number.class.isAssignableFrom(changedValue.getClass())) {
            primitiveDiff = AvroDiffPrimitive.createPrimitiveDiff(originalValue, changedValue,
                    Schema.Type.LONG, Schema.Type.LONG);
        } else if (changedValue.getClass().isAssignableFrom(String.class)) {
            primitiveDiff = AvroDiffPrimitive.createPrimitiveDiff(originalValue, changedValue,
                    Schema.Type.STRING, Schema.Type.STRING);
        } else if (changedValue.getClass().isAssignableFrom(Boolean.class)) {
            primitiveDiff = AvroDiffPrimitive.createPrimitiveDiff(originalValue, changedValue,
                    Schema.Type.BOOLEAN, Schema.Type.BOOLEAN);
        } else if (changedValue.getClass().isArray()) {
            primitiveDiff = AvroDiffPrimitive.createPrimitiveDiff(originalValue, changedValue,
                    Schema.Type.BYTES, Schema.Type.BYTES);
        }
        return primitiveDiff;
    }
}
public static GenericRecord applyDiff(GenericRecord avroObj, RecordDiff diff, Schema schema) throws IOException {
    GenericRecord modifiedAvroObj = createGenericRecordWithSchema(schema, avroObj);
    Map<String, Object> diffFields = diff.getDiffFields();
    List<Schema.Field> fields = schema.getFields();

    for (Schema.Field field : fields) {
        if (diffFields.containsKey(field.name())) {
            GenericRecord fieldsValue = (GenericRecord) diffFields.get(field.name());
            Class<? extends GenericRecord> fieldsValueClass = fieldsValue.getClass();
            if (fieldsValueClass.isAssignableFrom(PrimitiveDiff.class)) {
                AvroDiffPrimitive.applyPrimitiveDiff(field, avroObj, fieldsValue, modifiedAvroObj, null);
            } else if (fieldsValueClass.isAssignableFrom(MapDiff.class)) {
                AvroDiffMap.applyMapDiff(field, avroObj, fieldsValue, modifiedAvroObj);
            } else if (fieldsValueClass.isAssignableFrom(ArrayDiff.class)) {
                AvroDiffArray.applyArrayDiff(field, avroObj, fieldsValue, modifiedAvroObj);
            } else if (fieldsValueClass.isAssignableFrom(RecordDiff.class)) {
                GenericRecord recordField = (GenericRecord) modifiedAvroObj.get(field.pos());
                GenericRecord genericRecord = applyDiff(recordField, (RecordDiff) fieldsValue, recordField.getSchema());
                modifiedAvroObj.put(field.pos(), genericRecord);
            } else {
                LOGGER.error("Field from RecordDiff has unknown type.");
            }
        } else {
            modifiedAvroObj.put(field.pos(), avroObj.get(field.pos()));
        }
    }
    return SpecificData.get().deepCopy(schema, modifiedAvroObj);
}
private static GenericRecord createGenericRecordWithSchema(Schema schema, GenericRecord avrObj) {
    GenericRecordBuilder builder = new GenericRecordBuilder(schema);
    for (Schema.Field field : schema.getFields()) {
        builder.set(field, avrObj.get(field.name()));
    }
    return builder.build();
}
public static void applyPrimitiveDiff(Schema.Field field, GenericRecord avroObj, Object fieldsValue,
        Object modifiedObj, Object key) throws IOException {
    ByteBuffer diffValue = ((PrimitiveDiff) fieldsValue).getDiffValue();
    Object newValue = ((PrimitiveDiff) fieldsValue).getNewValue();
    if (newValue != null) {
        if (key != null) {
            ((Map) modifiedObj).put(key, newValue);
        } else {
            ((GenericRecord) modifiedObj).put(field.pos(), newValue);
        }
    } else {
        Object avroObjField = avroObj.get(field.pos());
        byte[] avroObjFieldBytes = field.schema().getType().equals(Schema.Type.STRING)
                ? ((String) avroObjField).getBytes()
                : ((byte[]) avroObjField);
        byte[] avroObjFieldModifiedBytes = new GDiffPatcher().patch(avroObjFieldBytes, diffValue.array());
        if (key != null) {
            if (field.schema().getType().equals(Schema.Type.STRING)) {
                ((Map) modifiedObj).put(key, new String(avroObjFieldModifiedBytes, StandardCharsets.UTF_8));
            } else {
                ((Map) modifiedObj).put(key, avroObjFieldModifiedBytes);
            }
        } else {
            if (field.schema().getType().equals(Schema.Type.STRING)) {
                ((GenericRecord) modifiedObj).put(field.pos(),
                        new String(avroObjFieldModifiedBytes, StandardCharsets.UTF_8));
            } else {
                ((GenericRecord) modifiedObj).put(field.pos(), avroObjFieldModifiedBytes);
            }
        }
    }
}
public void testMultiTableImportAsParquetFormat() throws IOException {
    String[] argv = getArgv(new String[]{ "--as-parquetfile" }, null);
    runImport(new ImportAllTablesTool(), argv);

    Path warehousePath = new Path(this.getWarehouseDir());
    int i = 0;
    for (String tableName : this.tableNames) {
        Path tablePath = new Path(warehousePath, tableName);
        Dataset dataset = Datasets.load("dataset:file:" + tablePath);

        // dequeue the expected value for this table. This
        // list has the same order as the tableNames list.
        String expectedVal = Integer.toString(i++) + "," + this.expectedStrings.get(0);
        this.expectedStrings.remove(0);

        DatasetReader<GenericRecord> reader = dataset.newReader();
        try {
            GenericRecord record = reader.next();
            String line = record.get(0) + "," + record.get(1);
            assertEquals("Table " + tableName + " expected a different string", expectedVal, line);
            assertFalse(reader.hasNext());
        } finally {
            reader.close();
        }
    }
}
@Test
public void shouldCreateAndApplyMapOfMapOfRecordDiff() throws IOException {
    RecordDiff diff = AvroDiff.createDiff(mapSpecificRecord7, mapSpecificRecord8,
            mapSpecificRecord7.getSchema());
    GenericRecord modified = AvroDiff.applyDiff(mapSpecificRecord7, diff, mapSpecificRecord7.getSchema());

    Assert.assertEquals(modified, mapSpecificRecord8);
    Assert.assertNotEquals(modified, mapSpecificRecord7);
}
@Test
public void shouldCreateAndApplyMapOfRecordOfRecordDiff() throws IOException {
    map14.put("same", recordOfRecordSpecificRecord1);
    map14.put("removed", recordOfRecordSpecificRecord2);
    map14.put("changed", recordOfRecordSpecificRecord2);
    map15.put("same", recordOfRecordSpecificRecord1);
    map15.put("added", recordOfRecordSpecificRecord4);
    map15.put("changed", recordOfRecordSpecificRecord3);

    RecordDiff diff = AvroDiff.createDiff(mapSpecificRecord9, mapSpecificRecord10,
            mapSpecificRecord9.getSchema());
    GenericRecord modified = AvroDiff.applyDiff(mapSpecificRecord9, diff, mapSpecificRecord10.getSchema());

    Assert.assertEquals(modified, mapSpecificRecord10);
    Assert.assertNotEquals(modified, mapSpecificRecord9);
}
@Test
public void shouldCreateAndApplyPrimitiveDiff() throws IOException {
    RecordDiff diff = AvroDiff.createDiff(primitiveSpecificRecord1, primitiveSpecificRecord2,
            primitiveSpecificRecord1.getSchema());
    GenericRecord modifiedAvroObj = AvroDiff.applyDiff(primitiveSpecificRecord1, diff,
            primitiveSpecificRecord1.getSchema());

    Assert.assertEquals(diff.getDiffFields().size(), 6);
    Assert.assertEquals(modifiedAvroObj, primitiveSpecificRecord2);
    Assert.assertNotEquals(modifiedAvroObj, primitiveSpecificRecord1);
}