static void convertAvroToJson(InputStream inputStream, OutputStream outputStream, Schema schema)
    throws IOException {
  DatumReader<Object> reader = new GenericDatumReader<>(schema);
  DatumWriter<Object> writer = new GenericDatumWriter<>(schema);

  BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(inputStream, null);
  JsonEncoder jsonEncoder = EncoderFactory.get().jsonEncoder(schema, outputStream, true);

  Object datum = null;
  while (!binaryDecoder.isEnd()) {
    datum = reader.read(datum, binaryDecoder);
    writer.write(datum, jsonEncoder);
    jsonEncoder.flush();
  }
  outputStream.flush();
}
public AvroFileInputStream(FileStatus status) throws IOException {
  pos = 0;
  buffer = new byte[0];
  GenericDatumReader<Object> reader = new GenericDatumReader<Object>();
  FileContext fc = FileContext.getFileContext(new Configuration());
  fileReader = DataFileReader.openReader(new AvroFSInput(fc, status.getPath()), reader);
  Schema schema = fileReader.getSchema();
  writer = new GenericDatumWriter<Object>(schema);
  output = new ByteArrayOutputStream();
  JsonGenerator generator = new JsonFactory().createJsonGenerator(output, JsonEncoding.UTF8);
  MinimalPrettyPrinter prettyPrinter = new MinimalPrettyPrinter();
  prettyPrinter.setRootValueSeparator(System.getProperty("line.separator"));
  generator.setPrettyPrinter(prettyPrinter);
  encoder = EncoderFactory.get().jsonEncoder(schema, generator);
}
@Test
public void testGenericRecord() throws IOException {
  final Path outputPath = new Path(File.createTempFile("avro-output-file", "generic.avro").getAbsolutePath());
  final AvroOutputFormat<GenericRecord> outputFormat = new AvroOutputFormat<>(outputPath, GenericRecord.class);
  Schema schema = new Schema.Parser().parse("{\"type\":\"record\", \"name\":\"user\", \"fields\": [{\"name\":\"user_name\", \"type\":\"string\"}, {\"name\":\"favorite_number\", \"type\":\"int\"}, {\"name\":\"favorite_color\", \"type\":\"string\"}]}");
  outputFormat.setWriteMode(FileSystem.WriteMode.OVERWRITE);
  outputFormat.setSchema(schema);
  output(outputFormat, schema);

  GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
  DataFileReader<GenericRecord> dataFileReader = new DataFileReader<>(new File(outputPath.getPath()), reader);

  while (dataFileReader.hasNext()) {
    GenericRecord record = dataFileReader.next();
    assertEquals(record.get("user_name").toString(), "testUser");
    assertEquals(record.get("favorite_number"), 1);
    assertEquals(record.get("favorite_color").toString(), "blue");
  }

  // cleanup
  FileSystem fs = FileSystem.getLocalFileSystem();
  fs.delete(outputPath, false);
}
/**
 * Process singlex.
 *
 * @throws Exception the exception
 */
public void processSinglex() throws Exception {
  int base = (int) System.currentTimeMillis();
  User user = User.newBuilder()
      .setName("name" + base)
      .setFavoriteColor("color" + base)
      .setFavoriteNumber(base)
      .build();

  DatumWriter<GenericRecord> datumWriterUser = new GenericDatumWriter<GenericRecord>(User.getClassSchema());
  ByteArrayOutputStream baos = new ByteArrayOutputStream();
  byte[] byteData = null;
  try {
    BinaryEncoder binaryEncoder = EncoderFactory.get().binaryEncoder(baos, null);
    datumWriterUser.write(user, binaryEncoder);
    binaryEncoder.flush();
    byteData = baos.toByteArray();
  } finally {
    baos.close();
  }
  System.out.println(byteData.length);

  DatumReader<GenericRecord> datumReaderUser = new GenericDatumReader<GenericRecord>(User.getClassSchema());
  GenericRecord genericRecord = datumReaderUser.read(null, DecoderFactory.get().binaryDecoder(byteData, null));
  System.out.println(genericRecord);
  System.out.println(genericRecord.get("name"));
}
/**
 * Reads in binary Avro-encoded entities using the schema stored in the file
 * and prints them out.
 */
public static void readAvroFile(File file) throws IOException {
  GenericDatumReader datum = new GenericDatumReader();
  DataFileReader reader = new DataFileReader(file, datum);
  GenericData.Record record = new GenericData.Record(reader.getSchema());
  while (reader.hasNext()) {
    reader.next(record);
    System.out.println("Name " + record.get("name") + " on " + record.get("Meetup_date")
        + " attending " + record.get("going") + " organized by " + record.get("organizer")
        + " on " + record.get("topics"));
  }
  reader.close();
}
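For context, a minimal sketch of how a data file with an embedded schema, such as the one readAvroFile consumes, might be produced. The schema contents and field values here are assumptions for illustration, not part of the original example.

// Hypothetical writer counterpart for readAvroFile above. DataFileWriter embeds the
// schema in the file header, which is what lets the reader open the file without one.
public static void writeAvroFile(File file, Schema schema) throws IOException {
  GenericRecord record = new GenericData.Record(schema);
  record.put("name", "Example Meetup");          // assumed fields and values
  record.put("Meetup_date", "2015-01-01");
  record.put("going", 42L);
  record.put("organizer", "Example Org");
  record.put("topics", "Avro");

  DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
  try (DataFileWriter<GenericRecord> writer = new DataFileWriter<>(datumWriter)) {
    writer.create(schema, file);                 // writes the header, including the schema
    writer.append(record);
  }
}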
private void checkNumeric(String type, Object value) throws Exception {
  String def = "{\"type\":\"record\",\"name\":\"X\",\"fields\":"
      + "[{\"type\":\"" + type + "\",\"name\":\"n\"}]}";
  Schema schema = Schema.parse(def);
  DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);
  String[] records = {"{\"n\":1}", "{\"n\":1.0}"};
  for (String record : records) {
    Decoder decoder = new ExtendedJsonDecoder(schema, record);
    GenericRecord r = reader.read(null, decoder);
    Assert.assertEquals(value, r.get("n"));
  }
}
private void initialize() throws IOException, NoSuchAlgorithmException {
  SeekableResettableInputBridge in = new SeekableResettableInputBridge(ris);
  long pos = in.tell();
  in.seek(0L);
  fileReader = new DataFileReader<GenericRecord>(in, new GenericDatumReader<GenericRecord>());
  fileReader.sync(pos);
  schema = fileReader.getSchema();
  datumWriter = new GenericDatumWriter(schema);
  out = new ByteArrayOutputStream();
  encoder = EncoderFactory.get().binaryEncoder(out, encoder);
  schemaHash = SchemaNormalization.parsingFingerprint("CRC-64-AVRO", schema);
  schemaHashString = Hex.encodeHexString(schemaHash);
}
public void validateAvroFile(File file) throws IOException {
  // read the events back using GenericRecord
  DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>();
  DataFileReader<GenericRecord> fileReader = new DataFileReader<GenericRecord>(file, reader);
  GenericRecord record = new GenericData.Record(fileReader.getSchema());
  int numEvents = 0;
  while (fileReader.hasNext()) {
    fileReader.next(record);
    ByteBuffer body = (ByteBuffer) record.get("body");
    CharsetDecoder decoder = Charsets.UTF_8.newDecoder();
    String bodyStr = decoder.decode(body).toString();
    System.out.println(bodyStr);
    numEvents++;
  }
  fileReader.close();
  Assert.assertEquals("Should have found a total of 3 events", 3, numEvents);
}
static void convertJsonToAvro(InputStream inputStream, OutputStream outputStream, Schema schema)
    throws IOException {
  DatumReader<Object> reader = new GenericDatumReader<>(schema);
  DatumWriter<Object> writer = new GenericDatumWriter<>(schema);

  Encoder binaryEncoder = EncoderFactory.get().binaryEncoder(outputStream, null);
  JsonDecoder jsonDecoder = DecoderFactory.get().jsonDecoder(schema, inputStream);

  Object datum = null;
  while (true) {
    try {
      datum = reader.read(datum, jsonDecoder);
    } catch (EOFException eofException) {
      break;
    }
    writer.write(datum, binaryEncoder);
    binaryEncoder.flush();
  }
  outputStream.flush();
}
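A minimal round-trip sketch wiring the two converters above together (convertJsonToAvro, then convertAvroToJson). The schema literal, the sample record, and the in-memory streams are assumptions for illustration only.

// Hypothetical round trip: JSON text -> Avro binary -> JSON text, using the helpers above.
static void roundTripExample() throws IOException {
  Schema schema = new Schema.Parser().parse(
      "{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"}]}");

  ByteArrayOutputStream avroOut = new ByteArrayOutputStream();
  convertJsonToAvro(new ByteArrayInputStream("{\"name\":\"alice\"}".getBytes("UTF-8")), avroOut, schema);

  ByteArrayOutputStream jsonOut = new ByteArrayOutputStream();
  convertAvroToJson(new ByteArrayInputStream(avroOut.toByteArray()), jsonOut, schema);
  System.out.println(jsonOut.toString("UTF-8"));   // pretty-printed JSON for the same record
}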
@Test
public void testCompressFile() throws Exception {
  String avroCodec = "snappy";
  localProps.put(StorageSinkConnectorConfig.AVRO_CODEC_CONFIG, avroCodec);
  setUp();
  task = new S3SinkTask(connectorConfig, context, storage, partitioner, format, SYSTEM_TIME);

  List<SinkRecord> sinkRecords = createRecords(7);
  // Perform write
  task.put(sinkRecords);
  task.close(context.assignment());
  task.stop();

  List<S3ObjectSummary> summaries = listObjects(S3_TEST_BUCKET_NAME, "/", s3);
  for (S3ObjectSummary summary : summaries) {
    InputStream in = s3.getObject(summary.getBucketName(), summary.getKey()).getObjectContent();
    DatumReader<Object> reader = new GenericDatumReader<>();
    DataFileStream<Object> streamReader = new DataFileStream<>(in, reader);
    // make sure that produced Avro file has proper codec set
    Assert.assertEquals(avroCodec, streamReader.getMetaString(StorageSinkConnectorConfig.AVRO_CODEC_CONFIG));
    streamReader.close();
  }

  long[] validOffsets = {0, 3, 6};
  verify(sinkRecords, validOffsets);
}
/**
 * Processes an Avro Blob containing a single message and with no embedded
 * schema. This is the pattern when Avro objects are passed over messaging
 * infrastructure such as Apache Kafka.
 *
 * @param avroMessage
 *            The Blob that holds the single Avro message object
 * @param avroKey
 *            The Blob that holds the single Avro key object (if passed)
 * @param outStream
 *            The stream to which the JSON string must be submitted
 * @param outTuple
 *            The tuple holding the JSON string
 * @param messageSchema
 *            The schema of the Avro message object
 * @param keySchema
 *            The schema of the Avro key object
 * @throws Exception
 */
private void processAvroMessage(Blob avroMessage, Blob avroKey, StreamingOutput<OutputTuple> outStream,
    OutputTuple outTuple, Schema messageSchema, Schema keySchema) throws Exception {
  // Deserialize message
  GenericDatumReader<GenericRecord> consumer = new GenericDatumReader<GenericRecord>(messageSchema);
  ByteArrayInputStream consumedByteArray = new ByteArrayInputStream(avroMessage.getData());
  Decoder consumedDecoder = DecoderFactory.get().binaryDecoder(consumedByteArray, null);
  GenericRecord consumedDatum = consumer.read(null, consumedDecoder);
  if (LOGGER.isTraceEnabled())
    LOGGER.log(TraceLevel.TRACE, "JSON representation of Avro message: " + consumedDatum.toString());
  outTuple.setString(outputJsonMessage, consumedDatum.toString());

  // Deserialize key (if specified)
  if (avroKey != null) {
    consumer = new GenericDatumReader<GenericRecord>(keySchema);
    consumedByteArray = new ByteArrayInputStream(avroKey.getData());
    consumedDecoder = DecoderFactory.get().binaryDecoder(consumedByteArray, null);
    consumedDatum = consumer.read(null, consumedDecoder);
    if (LOGGER.isTraceEnabled())
      LOGGER.log(TraceLevel.TRACE, "JSON representation of Avro key: " + consumedDatum.toString());
    if (outputJsonKey != null)
      outTuple.setString(outputJsonKey, consumedDatum.toString());
  }

  // Submit new tuple to output port 0
  outStream.submit(outTuple);
}
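For context, a minimal sketch of the producer side of this pattern: a single record encoded as raw Avro binary with no embedded schema, which is the form the method above decodes. The schema and field name are assumptions for illustration.

// Hypothetical producer side of the "no embedded schema" messaging pattern described above.
// The schema travels out of band (e.g. via a schema registry); only the record bytes are sent.
static byte[] encodeMessage(Schema messageSchema) throws IOException {
  GenericRecord record = new GenericData.Record(messageSchema);
  record.put("name", "example");                 // assumed field

  ByteArrayOutputStream out = new ByteArrayOutputStream();
  DatumWriter<GenericRecord> writer = new GenericDatumWriter<>(messageSchema);
  BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
  writer.write(record, encoder);
  encoder.flush();
  return out.toByteArray();                      // raw datum bytes, no Avro file header
}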
/**
 * Processes a blob which contains one or more Avro messages and has the
 * schema embedded. This is the pattern when Avro objects are read from a
 * file (either local file system or HDFS). Every Avro object in the blob is
 * converted to JSON and then submitted to the output port.
 *
 * @param avroMessage
 *            The Blob that holds one or more Avro objects and the schema
 * @param outStream
 *            The stream to which the JSON string must be submitted
 * @param outTuple
 *            The tuple holding the JSON string
 * @throws Exception
 */
private void processAvroMessage(Blob avroMessage, StreamingOutput<OutputTuple> outStream, OutputTuple outTuple)
    throws Exception {
  ByteArrayInputStream is = new ByteArrayInputStream(avroMessage.getData());
  DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>();
  DataFileStream<GenericRecord> dataFileReader = new DataFileStream<GenericRecord>(is, reader);
  GenericRecord consumedDatum = null;
  while (dataFileReader.hasNext()) {
    consumedDatum = dataFileReader.next(consumedDatum);
    if (LOGGER.isTraceEnabled())
      LOGGER.log(TraceLevel.TRACE, "JSON representation of Avro message: " + consumedDatum.toString());
    // Submit new tuple to output port 0
    outTuple.setString(outputJsonMessage, consumedDatum.toString());
    outStream.submit(outTuple);
  }
  is.close();
  dataFileReader.close();
}
@Override
public void configure(Config config) {
  fieldNames = config.getStringList(FIELD_NAMES_CONFIG_NAME);
  fieldTypes = config.getStringList(FIELD_TYPES_CONFIG_NAME);
  avroSchema = schemaFor(fieldNames, fieldTypes);
  reader = new GenericDatumReader<GenericRecord>(avroSchema);

  doesAppendRaw = TranslatorUtils.doesAppendRaw(config);
  if (doesAppendRaw) {
    fieldNames.add(TranslatorUtils.getAppendRawKeyFieldName(config));
    fieldTypes.add("binary");
    fieldNames.add(TranslatorUtils.getAppendRawValueFieldName(config));
    fieldTypes.add("binary");
  }

  schema = RowUtils.structTypeFor(fieldNames, fieldTypes);
}
AvroBlock(byte[] data, long numRecords, Mode<T> mode, String writerSchemaString, String codec)
    throws IOException {
  this.mode = mode;
  this.numRecords = numRecords;
  checkNotNull(writerSchemaString, "writerSchemaString");
  Schema writerSchema = internOrParseSchemaString(writerSchemaString);
  Schema readerSchema =
      internOrParseSchemaString(MoreObjects.firstNonNull(mode.readerSchemaString, writerSchemaString));
  this.reader =
      (mode.type == GenericRecord.class)
          ? new GenericDatumReader<T>(writerSchema, readerSchema)
          : new ReflectDatumReader<T>(writerSchema, readerSchema);
  this.decoder = DecoderFactory.get().binaryDecoder(decodeAsInputStream(data, codec), null);
}
@Test
@SuppressWarnings("unchecked")
@Category(NeedsRunner.class)
public void testCompressedWriteAndReadASingleFile() throws Throwable {
  List<GenericClass> values =
      ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, "bar"));
  File outputFile = tmpFolder.newFile("output.avro");

  writePipeline
      .apply(Create.of(values))
      .apply(
          AvroIO.write(GenericClass.class)
              .to(outputFile.getAbsolutePath())
              .withoutSharding()
              .withCodec(CodecFactory.deflateCodec(9)));
  writePipeline.run();

  PAssert.that(
          readPipeline.apply(AvroIO.read(GenericClass.class).from(outputFile.getAbsolutePath())))
      .containsInAnyOrder(values);
  readPipeline.run();

  try (DataFileStream dataFileStream =
      new DataFileStream(new FileInputStream(outputFile), new GenericDatumReader())) {
    assertEquals("deflate", dataFileStream.getMetaString("avro.codec"));
  }
}
@Test
@SuppressWarnings("unchecked")
@Category(NeedsRunner.class)
public void testWriteThenReadASingleFileWithNullCodec() throws Throwable {
  List<GenericClass> values =
      ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, "bar"));
  File outputFile = tmpFolder.newFile("output.avro");

  writePipeline
      .apply(Create.of(values))
      .apply(
          AvroIO.write(GenericClass.class)
              .to(outputFile.getAbsolutePath())
              .withoutSharding()
              .withCodec(CodecFactory.nullCodec()));
  writePipeline.run();

  PAssert.that(
          readPipeline.apply(AvroIO.read(GenericClass.class).from(outputFile.getAbsolutePath())))
      .containsInAnyOrder(values);
  readPipeline.run();

  try (DataFileStream dataFileStream =
      new DataFileStream(new FileInputStream(outputFile), new GenericDatumReader())) {
    assertEquals("null", dataFileStream.getMetaString("avro.codec"));
  }
}
private DataFileReader<E> initReader(FileInputSplit split) throws IOException {
  DatumReader<E> datumReader;

  if (org.apache.avro.generic.GenericRecord.class == avroValueType) {
    datumReader = new GenericDatumReader<E>();
  } else {
    datumReader = org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroValueType)
        ? new SpecificDatumReader<E>(avroValueType)
        : new ReflectDatumReader<E>(avroValueType);
  }

  if (LOG.isInfoEnabled()) {
    LOG.info("Opening split {}", split);
  }

  SeekableInput in = new FSDataInputStreamWrapper(stream,
      split.getPath().getFileSystem().getFileStatus(split.getPath()).getLen());
  DataFileReader<E> dataFileReader = (DataFileReader) DataFileReader.openReader(in, datumReader);

  if (LOG.isDebugEnabled()) {
    LOG.debug("Loaded SCHEMA: {}", dataFileReader.getSchema());
  }

  end = split.getStart() + split.getLength();
  recordsReadSinceLastSync = 0;
  return dataFileReader;
}
/**
 * Parses the avro file and sends dimension rows to a consumer.
 *
 * @param dimension  The dimension object used to configure the dimension
 * @param avroFilePath  The path of the AVRO data file (.avro)
 * @param consumer  A consumer to process records from the avro file
 *
 * @throws IllegalArgumentException thrown if JSON object `fields` is not present
 */
public void parseAvroFileDimensionRows(Dimension dimension, String avroFilePath, Consumer<DimensionRow> consumer)
    throws IllegalArgumentException {
  GenericDatumReader datumReader = new GenericDatumReader();

  // Creates an AVRO DataFileReader object that reads the AVRO data file one record at a time
  try (DataFileReader<GenericRecord> dataFileReader = new DataFileReader<>(new File(avroFilePath), datumReader)) {
    streamDimensionRows(dataFileReader, dimension).forEach(consumer);
  } catch (IOException e) {
    String msg = String.format("Unable to process the file, at the location %s", avroFilePath);
    LOG.error(msg, e);
    throw new IllegalArgumentException(msg, e);
  }
}
/**
 * Parses the avro file and returns the dimension rows.
 *
 * @param dimension  The dimension object used to configure the dimension
 * @param avroFilePath  The path of the AVRO data file (.avro)
 *
 * @return A set of dimension rows
 *
 * @throws IllegalArgumentException thrown if JSON object `fields` is not present
 */
public Set<DimensionRow> parseAvroFileDimensionRows(Dimension dimension, String avroFilePath)
    throws IllegalArgumentException {
  GenericDatumReader datumReader = new GenericDatumReader();

  // Creates an AVRO DataFileReader object that reads the AVRO data file one record at a time
  try (DataFileReader<GenericRecord> dataFileReader = new DataFileReader<>(new File(avroFilePath), datumReader)) {
    return streamDimensionRows(dataFileReader, dimension).collect(Collectors.toSet());
  } catch (IOException e) {
    String msg = String.format("Unable to process the file, at the location %s", avroFilePath);
    LOG.error(msg, e);
    throw new IllegalArgumentException(msg, e);
  }
}
/**
 * Reads in binary Avro-encoded entities using a schema that is different
 * from the writer's schema.
 */
public static void readWithDifferentSchema(File file, Schema newSchema) throws IOException {
  GenericDatumReader datum = new GenericDatumReader(newSchema);
  DataFileReader reader = new DataFileReader(file, datum);
  GenericData.Record record = new GenericData.Record(newSchema);
  while (reader.hasNext()) {
    reader.next(record);
    System.out.println("Name " + record.get("name") + " on " + record.get("Meetup_date")
        + " attending " + record.get("attendance") + " organized by " + record.get("organizer")
        + " at " + record.get("location"));
  }
  reader.close();
}
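The reader schema passed to GenericDatumReader drives Avro schema resolution: writer fields absent from the reader schema are skipped, and reader fields missing from the file must declare defaults. A minimal sketch of calling the method above with an assumed evolved schema; the field list and record name are hypothetical, not taken from the original example.

// Hypothetical evolved reader schema. Note the top-level record name must match the
// writer schema's record name (or be covered by an alias) for resolution to succeed.
static void readMeetupsWithEvolvedSchema(File file) throws IOException {
  Schema newSchema = new Schema.Parser().parse(
      "{\"type\":\"record\",\"name\":\"Meetup\",\"fields\":["
          + "{\"name\":\"name\",\"type\":\"string\"},"
          + "{\"name\":\"Meetup_date\",\"type\":\"string\"},"
          + "{\"name\":\"attendance\",\"type\":\"long\",\"default\":0},"
          + "{\"name\":\"organizer\",\"type\":\"string\"},"
          + "{\"name\":\"location\",\"type\":\"string\",\"default\":\"unknown\"}]}");
  readWithDifferentSchema(file, newSchema);
}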
/**
 * Deserializes the bytes as an array of Generic containers.
 *
 * <p>The bytes include a standard Avro header that contains a magic byte, the
 * record's Avro schema (and so on), followed by the byte representation of the record.
 *
 * <p>Implementation detail: This method uses Avro's {@code DataFileStream} to read
 * data written with {@code DataFileWriter}.
 *
 * @param schema Schema associated with this container
 * @param container the serialized bytes
 * @return A Generic Container class
 */
public GenericContainer[] deserialize(Schema schema, byte[] container) throws IOException {
  List<GenericContainer> retList = new ArrayList<>();
  if (container != null) {
    DatumReader<GenericContainer> datumReader = new GenericDatumReader<>(schema);
    ByteArrayInputStream in = new ByteArrayInputStream(container);
    try (DataFileStream<GenericContainer> reader = new DataFileStream<GenericContainer>(in, datumReader)) {
      while (reader.hasNext()) {
        // Do not pass a reuse instance here: reusing one record object would leave every
        // element of the list pointing at the same, repeatedly overwritten instance.
        retList.add(reader.next());
      }
    }
    return retList.toArray(new GenericContainer[retList.size()]);
  } else {
    return null;
  }
}
/**
 * Deserializes the bytes as an array of Generic containers. Assumes the schema is
 * embedded in the bytes.
 *
 * <p>The bytes include a standard Avro header that contains a magic byte, the
 * record's Avro schema (and so on), followed by the byte representation of the record.
 *
 * <p>Implementation detail: This method uses Avro's {@code DataFileStream} to read
 * data written with {@code DataFileWriter}.
 *
 * @param container the serialized bytes
 * @return A Generic Container class
 */
public GenericContainer[] deserialize(byte[] container) throws IOException {
  List<GenericContainer> retList = new ArrayList<>();
  if (container != null) {
    DatumReader<GenericContainer> datumReader = new GenericDatumReader<>();
    ByteArrayInputStream in = new ByteArrayInputStream(container);
    try (DataFileStream<GenericContainer> reader = new DataFileStream<GenericContainer>(in, datumReader)) {
      while (reader.hasNext()) {
        // As above, avoid a reuse instance so each list element is a distinct record.
        retList.add(reader.next());
      }
    }
    return retList.toArray(new GenericContainer[retList.size()]);
  } else {
    return null;
  }
}
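For completeness, a minimal sketch of a serialize counterpart (a hypothetical helper, not part of the original class) that produces the schema-embedding bytes the two deserialize methods above expect, using DataFileWriter as their javadoc notes.

// Hypothetical serialize counterpart: writes the records plus their schema into a byte
// array so that deserialize(byte[]) can recover them without an external schema.
public byte[] serialize(Schema schema, List<GenericContainer> records) throws IOException {
  ByteArrayOutputStream out = new ByteArrayOutputStream();
  DatumWriter<GenericContainer> datumWriter = new GenericDatumWriter<>(schema);
  try (DataFileWriter<GenericContainer> writer = new DataFileWriter<>(datumWriter)) {
    writer.create(schema, out);          // writes the Avro header, including the schema
    for (GenericContainer record : records) {
      writer.append(record);
    }
  }
  return out.toByteArray();
}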
/**
 * Get the latest avro schema for a directory
 * @param directory the input dir that contains avro files
 * @param conf configuration
 * @param latest true to return latest schema, false to return oldest schema
 * @return the latest/oldest schema in the directory
 * @throws IOException
 */
public static Schema getDirectorySchema(Path directory, Configuration conf, boolean latest) throws IOException {
  Schema schema = null;
  Closer closer = Closer.create();
  try {
    List<FileStatus> files = getDirectorySchemaHelper(directory, FileSystem.get(conf));
    if (files == null || files.size() == 0) {
      LOG.warn("There is no previous avro file in the directory: " + directory);
    } else {
      FileStatus file = latest ? files.get(0) : files.get(files.size() - 1);
      LOG.info("Path to get the avro schema: " + file);
      FsInput fi = new FsInput(file.getPath(), conf);
      GenericDatumReader<GenericRecord> genReader = new GenericDatumReader<GenericRecord>();
      schema = closer.register(new DataFileReader<GenericRecord>(fi, genReader)).getSchema();
    }
  } catch (IOException ioe) {
    throw new IOException("Cannot get the schema for directory " + directory, ioe);
  } catch (Throwable t) {
    throw closer.rethrow(t);
  } finally {
    closer.close();
  }
  return schema;
}
public DataFileReader<GenericRecord> getAvroFile(String file) throws FileBasedHelperException {
  try {
    if (!fs.exists(new Path(file))) {
      LOGGER.warn(file + " does not exist.");
      return null;
    }
    if (state.getPropAsBoolean(ConfigurationKeys.SHOULD_FS_PROXY_AS_USER,
        ConfigurationKeys.DEFAULT_SHOULD_FS_PROXY_AS_USER)) {
      return new DataFileReader<GenericRecord>(new ProxyFsInput(new Path(file), this.fs),
          new GenericDatumReader<GenericRecord>());
    } else {
      return new DataFileReader<GenericRecord>(new FsInput(new Path(file), fs.getConf()),
          new GenericDatumReader<GenericRecord>());
    }
  } catch (IOException e) {
    throw new FileBasedHelperException("Failed to open avro file " + file + " due to error " + e.getMessage(), e);
  }
}
public TestExtractor(WorkUnitState workUnitState) {
  //super(workUnitState);
  Schema schema = new Schema.Parser().parse(AVRO_SCHEMA);
  Path sourceFile = new Path(workUnitState.getWorkunit().getProp(SOURCE_FILE_KEY));
  LOG.info("Reading from source file " + sourceFile);
  DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>(schema);
  try {
    FileSystem fs = FileSystem.get(
        URI.create(workUnitState.getProp(ConfigurationKeys.FS_URI_KEY, ConfigurationKeys.LOCAL_FS_URI)),
        new Configuration());
    fs.makeQualified(sourceFile);
    this.dataFileReader =
        new DataFileReader<GenericRecord>(new FsInput(sourceFile, new Configuration()), datumReader);
  } catch (IOException ioe) {
    LOG.error("Failed to read the source file " + sourceFile, ioe);
  }
}
public AvroFileInputStream(FileStatus status) throws IOException {
  pos = 0;
  buffer = new byte[0];
  GenericDatumReader<Object> reader = new GenericDatumReader<Object>();
  fileReader = DataFileReader.openReader(new File(status.getPath().toUri()), reader);
  Schema schema = fileReader.getSchema();
  writer = new GenericDatumWriter<Object>(schema);
  output = new ByteArrayOutputStream();
  JsonGenerator generator = new JsonFactory().createJsonGenerator(output, JsonEncoding.UTF8);
  MinimalPrettyPrinter prettyPrinter = new MinimalPrettyPrinter();
  prettyPrinter.setRootValueSeparator(System.getProperty("line.separator"));
  generator.setPrettyPrinter(prettyPrinter);
  encoder = EncoderFactory.get().jsonEncoder(schema, generator);
}
@Override
public SampleDataRecord getSampleData(Path targetFilePath) throws IOException {
  SeekableInput sin = new FsInput(targetFilePath, fs.getConf());
  DataFileReader<GenericRecord> reader =
      new DataFileReader<GenericRecord>(sin, new GenericDatumReader<GenericRecord>());
  Iterator<GenericRecord> iter = reader.iterator();
  int count = 0;
  List<Object> list = new ArrayList<Object>();
  //JSONArray list = new JSONArray();
  while (iter.hasNext() && count < 10) {
    // TODO handle out of memory error
    list.add(iter.next().toString().replaceAll("[\\n\\r\\p{C}]", "").replaceAll("\"", "\\\""));
    count++;
  }
  SampleDataRecord sampleDataRecord = new SampleDataRecord(targetFilePath.toUri().getPath(), list);
  return sampleDataRecord;
}
/**
 * Extracts the schema embedded in an Avro data file.
 *
 * @param file the Avro data file to read the schema from
 * @return the schema stored in the file's header
 * @throws IOException if the file cannot be opened or read
 */
public Schema getSchema(File file) throws IOException {
  Schema schema = null;
  FileReader<IndexedRecord> fileReader = null;
  try {
    DatumReader<IndexedRecord> reader = new GenericDatumReader<>();
    fileReader = DataFileReader.openReader(file, reader);
    schema = fileReader.getSchema();
  } finally {
    if (fileReader != null) {
      fileReader.close();
    }
  }
  return schema;
}
@Override
public void init() throws Exception {
  final File file = new File(_fileName);
  if (!file.exists()) {
    throw new FileNotFoundException("File does not exist!");
  }
  //_schemaExtractor = FieldExtractorFactory.get(_dataReaderSpec);
  if (_fileName.endsWith("gz")) {
    _dataStream = new DataFileStream<GenericRecord>(new GZIPInputStream(new FileInputStream(file)),
        new GenericDatumReader<GenericRecord>());
  } else {
    _dataStream = new DataFileStream<GenericRecord>(new FileInputStream(file),
        new GenericDatumReader<GenericRecord>());
  }
  updateSchema(_schemaExtractor.getSchema());
}
private static Map<String, Number> computeAggregateFromRawData(File avroFile, Map<String, String> fixedValues)
    throws Exception {
  long m0Aggregate = 0;
  double m1Aggregate = 0.0;

  DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>();
  DataFileReader<GenericRecord> fileReader = new DataFileReader<GenericRecord>(avroFile, reader);
  GenericRecord record = null;
  while (fileReader.hasNext()) {
    record = fileReader.next(record);
    boolean matches = true;
    for (Map.Entry<String, String> entry : fixedValues.entrySet()) {
      String value = record.get(entry.getKey()).toString();
      if (!value.equals(entry.getValue())) {
        matches = false;
      }
    }
    if (matches) {
      m0Aggregate += (Long) record.get("M0");
      m1Aggregate += (Double) record.get("M1");
    }
  }

  return ImmutableMap.of("M0", m0Aggregate, "M1", m1Aggregate);
}