static void convertAvroToJson(InputStream inputStream, OutputStream outputStream, Schema schema)
        throws IOException {
    DatumReader<Object> reader = new GenericDatumReader<>(schema);
    DatumWriter<Object> writer = new GenericDatumWriter<>(schema);
    BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(inputStream, null);
    JsonEncoder jsonEncoder = EncoderFactory.get().jsonEncoder(schema, outputStream, true);
    Object datum = null;
    while (!binaryDecoder.isEnd()) {
        datum = reader.read(datum, binaryDecoder);
        writer.write(datum, jsonEncoder);
        jsonEncoder.flush();
    }
    outputStream.flush();
}
public AvroFileInputStream(FileStatus status) throws IOException {
    pos = 0;
    buffer = new byte[0];
    GenericDatumReader<Object> reader = new GenericDatumReader<Object>();
    FileContext fc = FileContext.getFileContext(new Configuration());
    fileReader = DataFileReader.openReader(new AvroFSInput(fc, status.getPath()), reader);
    Schema schema = fileReader.getSchema();
    writer = new GenericDatumWriter<Object>(schema);
    output = new ByteArrayOutputStream();
    JsonGenerator generator = new JsonFactory().createJsonGenerator(output, JsonEncoding.UTF8);
    MinimalPrettyPrinter prettyPrinter = new MinimalPrettyPrinter();
    prettyPrinter.setRootValueSeparator(System.getProperty("line.separator"));
    generator.setPrettyPrinter(prettyPrinter);
    encoder = EncoderFactory.get().jsonEncoder(schema, generator);
}
/**
 * Process singlex.
 *
 * @throws Exception the exception
 */
public void processSinglex() throws Exception {
    int base = (int) System.currentTimeMillis();
    User user = User.newBuilder()
            .setName("name" + base)
            .setFavoriteColor("color" + base)
            .setFavoriteNumber(base)
            .build();
    DatumWriter<GenericRecord> datumWriterUser = new GenericDatumWriter<GenericRecord>(User.getClassSchema());
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    byte[] byteData = null;
    try {
        BinaryEncoder binaryEncoder = EncoderFactory.get().binaryEncoder(baos, null);
        datumWriterUser.write(user, binaryEncoder);
        binaryEncoder.flush();
        byteData = baos.toByteArray();
    } finally {
        baos.close();
    }
    System.out.println(byteData.length);
    DatumReader<GenericRecord> datumReaderUser = new GenericDatumReader<GenericRecord>(User.getClassSchema());
    GenericRecord genericRecord = datumReaderUser.read(null, DecoderFactory.get().binaryDecoder(byteData, null));
    System.out.println(genericRecord);
    System.out.println(genericRecord.get("name"));
}
public File createAvroFile(String fileName, long recordCount, File parent) throws Exception {
    final File target = FileTestUtil.file(testClass, fileName, parent);
    try (DataFileWriter<Object> writer = new DataFileWriter<>(new GenericDatumWriter<>(schema))) {
        if (codecFactory != null) {
            writer.setCodec(codecFactory);
        }
        writer.create(schema, target);
        for (long i = 0; i < recordCount; i++) {
            writer.append(recordCreatorFn.apply(schema, i));
        }
    }
    return target;
}
@Override
public byte[] serialize(String topic, T payload) {
    try {
        byte[] result = null;
        if (payload != null) {
            LOGGER.debug("data='{}'", payload);
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            BinaryEncoder binaryEncoder = EncoderFactory.get().binaryEncoder(byteArrayOutputStream, null);
            DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(payload.getSchema());
            datumWriter.write(payload, binaryEncoder);
            binaryEncoder.flush();
            byteArrayOutputStream.close();
            result = byteArrayOutputStream.toByteArray();
            LOGGER.debug("serialized data='{}'", DatatypeConverter.printHexBinary(result));
        }
        return result;
    } catch (IOException ex) {
        throw new SerializationException("Can't serialize payload='" + payload + "' for topic='" + topic + "'", ex);
    }
}
private static Path createDataFile() throws IOException {
    File avroFile = File.createTempFile("test-", "." + FILE_EXTENSION);
    DatumWriter<GenericRecord> writer = new GenericDatumWriter<>(schema);
    try (DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(writer)) {
        dataFileWriter.setFlushOnEveryBlock(true);
        dataFileWriter.setSyncInterval(32);
        dataFileWriter.create(schema, avroFile);
        IntStream.range(0, NUM_RECORDS).forEach(index -> {
            GenericRecord datum = new GenericData.Record(schema);
            datum.put(FIELD_INDEX, index);
            datum.put(FIELD_NAME, String.format("%d_name_%s", index, UUID.randomUUID()));
            datum.put(FIELD_SURNAME, String.format("%d_surname_%s", index, UUID.randomUUID()));
            try {
                OFFSETS_BY_INDEX.put(index, dataFileWriter.sync() - 16L);
                dataFileWriter.append(datum);
            } catch (IOException ioe) {
                throw new RuntimeException(ioe);
            }
        });
    }
    Path path = new Path(new Path(fsUri), avroFile.getName());
    fs.moveFromLocalFile(new Path(avroFile.getAbsolutePath()), path);
    return path;
}
private void initialize() throws IOException, NoSuchAlgorithmException {
    SeekableResettableInputBridge in = new SeekableResettableInputBridge(ris);
    long pos = in.tell();
    in.seek(0L);
    fileReader = new DataFileReader<GenericRecord>(in, new GenericDatumReader<GenericRecord>());
    fileReader.sync(pos);
    schema = fileReader.getSchema();
    datumWriter = new GenericDatumWriter<GenericRecord>(schema);
    out = new ByteArrayOutputStream();
    encoder = EncoderFactory.get().binaryEncoder(out, encoder);
    schemaHash = SchemaNormalization.parsingFingerprint("CRC-64-AVRO", schema);
    schemaHashString = Hex.encodeHexString(schemaHash);
}
@Override
public void configure(Context context) {
    String consumerKey = context.getString("consumerKey");
    String consumerSecret = context.getString("consumerSecret");
    String accessToken = context.getString("accessToken");
    String accessTokenSecret = context.getString("accessTokenSecret");

    twitterStream = new TwitterStreamFactory().getInstance();
    twitterStream.setOAuthConsumer(consumerKey, consumerSecret);
    twitterStream.setOAuthAccessToken(new AccessToken(accessToken, accessTokenSecret));
    twitterStream.addListener(this);

    avroSchema = createAvroSchema();
    dataFileWriter = new DataFileWriter<GenericRecord>(new GenericDatumWriter<GenericRecord>(avroSchema));

    maxBatchSize = context.getInteger("maxBatchSize", maxBatchSize);
    maxBatchDurationMillis = context.getInteger("maxBatchDurationMillis", maxBatchDurationMillis);
}
/**
 * tests Avro Serializer
 */
@Test
public void testSerializer() throws Exception {
    Context context = new Context();
    String schemaFile = getClass().getResource("/schema.avsc").getFile();
    context.put(ES_AVRO_SCHEMA_FILE, schemaFile);
    avroSerializer.configure(context);
    Schema schema = new Schema.Parser().parse(new File(schemaFile));
    GenericRecord user = generateGenericRecord(schema);
    DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(schema);
    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
    Encoder encoder = new EncoderFactory().binaryEncoder(outputStream, null);
    datumWriter.write(user, encoder);
    encoder.flush();
    Event event = EventBuilder.withBody(outputStream.toByteArray());
    XContentBuilder expected = generateContentBuilder();
    XContentBuilder actual = avroSerializer.serialize(event);
    JsonParser parser = new JsonParser();
    assertEquals(parser.parse(expected.string()), parser.parse(actual.string()));
}
/**
 * Serialize the record to prepare for publishing.
 *
 * @param record the GenericRecord
 * @param schema the Avro Schema
 * @param ggAvroSchema the internal representation of the Avro schema
 * @return the serialized record
 * @throws IOException if there is a problem
 */
private byte[] serializeRecord(GenericRecord record, Schema schema,
        @SuppressWarnings("unused") AvroSchema ggAvroSchema) throws IOException {
    byte[] rval;
    BinaryEncoder encoder = null;

    // serialize the record into a byte array
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(schema);
    encoder = EncoderFactory.get().directBinaryEncoder(out, encoder);
    writer.write(record, encoder);
    encoder.flush();
    rval = out.toByteArray();
    //out.close(); // noop in the Apache version, so not bothering
    return rval;
}
static void convertJsonToAvro(InputStream inputStream, OutputStream outputStream, Schema schema)
        throws IOException {
    DatumReader<Object> reader = new GenericDatumReader<>(schema);
    DatumWriter<Object> writer = new GenericDatumWriter<>(schema);
    Encoder binaryEncoder = EncoderFactory.get().binaryEncoder(outputStream, null);
    JsonDecoder jsonDecoder = DecoderFactory.get().jsonDecoder(schema, inputStream);
    Object datum = null;
    while (true) {
        try {
            datum = reader.read(datum, jsonDecoder);
        } catch (EOFException eofException) {
            break;
        }
        writer.write(datum, binaryEncoder);
        binaryEncoder.flush();
    }
    outputStream.flush();
}
public static byte[] putRecords(Collection<SinkRecord> records, AvroData avroData) throws IOException {
    final DataFileWriter<Object> writer = new DataFileWriter<>(new GenericDatumWriter<>());
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    Schema schema = null;
    for (SinkRecord record : records) {
        if (schema == null) {
            schema = record.valueSchema();
            org.apache.avro.Schema avroSchema = avroData.fromConnectSchema(schema);
            writer.create(avroSchema, out);
        }
        Object value = avroData.fromConnectData(schema, record.value());
        // AvroData wraps primitive types so their schema can be included. We need to unwrap
        // NonRecordContainers to just their value to properly handle these types
        if (value instanceof NonRecordContainer) {
            value = ((NonRecordContainer) value).getValue();
        }
        writer.append(value);
    }
    writer.flush();
    return out.toByteArray();
}
private void writeRowsHelper(List<TableRow> rows, Schema avroSchema, String destinationPattern, int shard)
        throws IOException {
    String filename = destinationPattern.replace("*", String.format("%012d", shard));
    try (WritableByteChannel channel = FileSystems.create(
                FileSystems.matchNewResource(filename, false /* isDirectory */), MimeTypes.BINARY);
            DataFileWriter<GenericRecord> tableRowWriter =
                    new DataFileWriter<>(new GenericDatumWriter<GenericRecord>(avroSchema))
                            .create(avroSchema, Channels.newOutputStream(channel))) {
        for (Map<String, Object> record : rows) {
            GenericRecordBuilder genericRecordBuilder = new GenericRecordBuilder(avroSchema);
            for (Map.Entry<String, Object> field : record.entrySet()) {
                genericRecordBuilder.set(field.getKey(), field.getValue());
            }
            tableRowWriter.append(genericRecordBuilder.build());
        }
    } catch (IOException e) {
        throw new IllegalStateException(
                String.format("Could not create destination for extract job %s", filename), e);
    }
}
AvroKeyValueWriter(Schema keySchema, Schema valueSchema, CodecFactory compressionCodec,
        OutputStream outputStream, int syncInterval) throws IOException {
    // Create the generic record schema for the key/value pair.
    mKeyValuePairSchema = AvroKeyValue.getSchema(keySchema, valueSchema);

    // Create an Avro container file and a writer to it.
    DatumWriter<GenericRecord> genericDatumWriter = new GenericDatumWriter<GenericRecord>(mKeyValuePairSchema);
    mAvroFileWriter = new DataFileWriter<GenericRecord>(genericDatumWriter);
    mAvroFileWriter.setCodec(compressionCodec);
    mAvroFileWriter.setSyncInterval(syncInterval);
    mAvroFileWriter.create(mKeyValuePairSchema, outputStream);

    // Create a reusable output record.
    mOutputRecord = new AvroKeyValue<Object, Object>(new GenericData.Record(mKeyValuePairSchema));
}
public static void writeToAvro(File inputFile, OutputStream outputStream) throws IOException {
    DataFileWriter<GenericRecord> writer =
            new DataFileWriter<GenericRecord>(new GenericDatumWriter<GenericRecord>());
    writer.setCodec(CodecFactory.snappyCodec());
    writer.create(SCHEMA, outputStream);
    for (Stock stock : AvroStockUtils.fromCsvFile(inputFile)) {
        AvroKeyValue<CharSequence, Stock> record =
                new AvroKeyValue<CharSequence, Stock>(new GenericData.Record(SCHEMA));
        record.setKey(stock.getSymbol());
        record.setValue(stock);
        writer.append(record.get());
    }
    IOUtils.closeStream(writer);
    IOUtils.closeStream(outputStream);
}
public static void writeToAvro(File srcPath, OutputStream outputStream) throws IOException {
    DataFileWriter<Object> writer = new DataFileWriter<Object>(new GenericDatumWriter<Object>())
            .setSyncInterval(100);                               //<co id="ch02_smallfilewrite_comment2"/>
    writer.setCodec(CodecFactory.snappyCodec());                 //<co id="ch02_smallfilewrite_comment3"/>
    writer.create(SCHEMA, outputStream);                         //<co id="ch02_smallfilewrite_comment4"/>
    for (Object obj : FileUtils.listFiles(srcPath, null, false)) {
        File file = (File) obj;
        String filename = file.getAbsolutePath();
        byte content[] = FileUtils.readFileToByteArray(file);
        GenericRecord record = new GenericData.Record(SCHEMA);   //<co id="ch02_smallfilewrite_comment5"/>
        record.put(FIELD_FILENAME, filename);                    //<co id="ch02_smallfilewrite_comment6"/>
        record.put(FIELD_CONTENTS, ByteBuffer.wrap(content));    //<co id="ch02_smallfilewrite_comment7"/>
        writer.append(record);                                   //<co id="ch02_smallfilewrite_comment8"/>
        System.out.println(file.getAbsolutePath() + ": " + DigestUtils.md5Hex(content));
    }
    IOUtils.cleanup(null, writer);
    IOUtils.cleanup(null, outputStream);
}
/**
 * Convert a GenericRecord to a byte array.
 */
public static byte[] recordToByteArray(GenericRecord record) throws IOException {
    Closer closer = Closer.create();
    try {
        ByteArrayOutputStream out = closer.register(new ByteArrayOutputStream());
        Encoder encoder = EncoderFactory.get().directBinaryEncoder(out, null);
        DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(record.getSchema());
        writer.write(record, encoder);
        byte[] byteArray = out.toByteArray();
        return byteArray;
    } catch (Throwable t) {
        throw closer.rethrow(t);
    } finally {
        closer.close();
    }
}
public AvroHdfsDataWriter(State properties, String fileName, Schema schema, int numBranches, int branchId)
        throws IOException {
    super(properties, fileName, numBranches, branchId);

    CodecFactory codecFactory = WriterUtils.getCodecFactory(
            Optional.fromNullable(properties.getProp(ForkOperatorUtils
                    .getPropertyNameForBranch(ConfigurationKeys.WRITER_CODEC_TYPE, numBranches, branchId))),
            Optional.fromNullable(properties.getProp(ForkOperatorUtils
                    .getPropertyNameForBranch(ConfigurationKeys.WRITER_DEFLATE_LEVEL, numBranches, branchId))));

    this.schema = schema;
    this.stagingFileOutputStream = createStagingFileOutputStream();
    this.datumWriter = new GenericDatumWriter<GenericRecord>();
    this.writer = this.closer.register(createDataFileWriter(codecFactory));
}
@Override
public byte[] objectToByteBuffer(Object o) throws IOException {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    ObjectOutputStream oos = new ObjectOutputStream(baos);
    if (o instanceof GenericContainer) {
        oos.write(RECORD);
        GenericContainer container = (GenericContainer) o;
        oos.writeObject(container.getSchema().getFullName());
        DatumWriter<GenericContainer> datumWriter = new GenericDatumWriter<>(container.getSchema());
        BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(baos, null);
        datumWriter.write(container, encoder);
        encoder.flush();
    } else if (o instanceof Schema) {
        oos.write(SCHEMA);
        oos.writeObject(o.toString());
    } else {
        oos.write(OTHER);
        oos.writeObject(o);
    }
    return baos.toByteArray();
}
public AvroFileInputStream(FileStatus status) throws IOException {
    pos = 0;
    buffer = new byte[0];
    GenericDatumReader<Object> reader = new GenericDatumReader<Object>();
    fileReader = DataFileReader.openReader(new File(status.getPath().toUri()), reader);
    Schema schema = fileReader.getSchema();
    writer = new GenericDatumWriter<Object>(schema);
    output = new ByteArrayOutputStream();
    JsonGenerator generator = new JsonFactory().createJsonGenerator(output, JsonEncoding.UTF8);
    MinimalPrettyPrinter prettyPrinter = new MinimalPrettyPrinter();
    prettyPrinter.setRootValueSeparator(System.getProperty("line.separator"));
    generator.setPrettyPrinter(prettyPrinter);
    encoder = EncoderFactory.get().jsonEncoder(schema, generator);
}
public byte[] compressURI(final URI uri) {
    Preconditions.checkNotNull(uri);
    try {
        final ByteArrayOutputStream stream = new ByteArrayOutputStream();
        final Encoder encoder = EncoderFactory.get().directBinaryEncoder(stream, null);
        final DatumWriter<Object> writer = new GenericDatumWriter<Object>(AvroSchemas.COMPRESSED_IDENTIFIER);
        this.dictionary.keyFor(uri); // ensure a compressed version of URI is available
        final Object generic = encodeIdentifier(uri);
        writer.write(generic, encoder);
        return stream.toByteArray();
    } catch (final IOException ex) {
        throw new Error("Unexpected exception (!): " + ex.getMessage(), ex);
    }
}
@Test
public void test() throws Throwable {
    System.out.println(AvroSchemas.RECORD.toString());
    final Record resource = HBaseTestUtils.getMockResource();
    // final Object generic = AvroSerialization.toGenericData(resource);
    final Object generic = null;
    final ByteArrayOutputStream jsonStream = new ByteArrayOutputStream();
    final ByteArrayOutputStream binaryStream = new ByteArrayOutputStream();
    final Encoder jsonEncoder = EncoderFactory.get().jsonEncoder(AvroSchemas.RECORD, jsonStream);
    final Encoder binaryEncoder = EncoderFactory.get().directBinaryEncoder(binaryStream, null);
    final DatumWriter<Object> writer = new GenericDatumWriter<Object>(AvroSchemas.RECORD);
    writer.write(generic, jsonEncoder);
    writer.write(generic, binaryEncoder);
    binaryEncoder.flush();
    jsonEncoder.flush();
    final byte[] bytes = binaryStream.toByteArray();
    final String json = new String(jsonStream.toByteArray(), Charsets.UTF_8);
    System.out.println(bytes.length + " bytes: " + BaseEncoding.base16().encode(bytes));
    System.out.println("JSON:\n" + json);
}
public byte[] compressURI(final URI uri) {
    Preconditions.checkNotNull(uri);
    try {
        final ByteArrayOutputStream stream = new ByteArrayOutputStream();
        final Encoder encoder = EncoderFactory.get().directBinaryEncoder(stream, null);
        final DatumWriter<Object> writer = new GenericDatumWriter<Object>(Schemas.COMPRESSED_IDENTIFIER);
        this.dictionary.keyFor(uri); // ensure a compressed version of URI is available
        final Object generic = encodeIdentifier(uri);
        writer.write(generic, encoder);
        return stream.toByteArray();
    } catch (final IOException ex) {
        throw new Error("Unexpected exception (!): " + ex.getMessage(), ex);
    }
}
private byte[] avroTestData(List<Schema.Field> fields, List<Map<String, Object>> records) throws IOException {
    Schema schema = Schema.createRecord("testdata", null, null, false);
    schema.setFields(fields);

    ByteArrayOutputStream out = new ByteArrayOutputStream();
    GenericDatumWriter<GenericData.Record> datum = new GenericDatumWriter<>(schema);
    DataFileWriter<GenericData.Record> writer = new DataFileWriter<>(datum);
    writer.create(schema, out);
    for (Map<String, Object> record : records) {
        GenericData.Record r = new GenericData.Record(schema);
        for (Map.Entry<String, Object> item : record.entrySet()) {
            r.put(item.getKey(), item.getValue());
        }
        writer.append(r);
    }
    writer.close();

    return out.toByteArray();
}
public byte[] toBytes() throws IOException {
    ByteArrayOutputStream dataStream = new ByteArrayOutputStream();
    Schema schema = record.getSchema();
    if (WRITER == null) {
        WRITER = new GenericDatumWriter<GenericRecord>(schema);
    }
    binaryEncoder = factory.directBinaryEncoder(dataStream, binaryEncoder);
    WRITER.write(record, binaryEncoder);

    // serialize to bytes, we also need to know the schema name when we
    // process this record on the reducer since reducer gets the record from
    // multiple mappers. So we first write the schema/source name and then
    // write the serialized bytes
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    DataOutputStream dos = new DataOutputStream(out);
    System.out.println("schema name:" + schemaName + " length:" + schemaName.getBytes().length);
    dos.writeInt(schema.getName().getBytes().length);
    dos.write(schema.getName().getBytes());
    byte[] dataBytes = dataStream.toByteArray();
    System.out.println("Data Buffer length:" + dataBytes.length);
    dos.writeInt(dataBytes.length);
    dos.write(dataBytes);
    return out.toByteArray();
}
private DatumWriter<Object> getDatumWriter(Class<Object> type, Schema schema) {
    DatumWriter<Object> writer;
    this.logger.debug("Finding correct DatumWriter for type " + type.getName());
    if (SpecificRecord.class.isAssignableFrom(type)) {
        if (schema != null) {
            writer = new SpecificDatumWriter<>(schema);
        } else {
            writer = new SpecificDatumWriter<>(type);
        }
    } else if (GenericRecord.class.isAssignableFrom(type)) {
        writer = new GenericDatumWriter<>(schema);
    } else {
        if (schema != null) {
            writer = new ReflectDatumWriter<>(schema);
        } else {
            writer = new ReflectDatumWriter<>(type);
        }
    }
    return writer;
}
public void generateAvroFile(Schema schema, File file, long recordCount) throws IOException {
    DatumWriter<GenericRecord> writer = new GenericDatumWriter<>(schema);
    DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(writer);
    dataFileWriter.create(schema, file);
    for (long i = 0; i < recordCount; i++) {
        GenericRecord datum = new GenericData.Record(schema);
        datum.put("b", i % 2 == 0);
        datum.put("s", String.valueOf(i));
        datum.put("l", i);
        datum.put("l100", i % 100);
        datum.put("s100", String.valueOf(i % 100));
        dataFileWriter.append(datum);
    }
    dataFileWriter.close();
}
private byte[] createAvroData(String name, int age, List<String> emails) throws IOException {
    String AVRO_SCHEMA = "{\n"
            + "\"type\": \"record\",\n"
            + "\"name\": \"Employee\",\n"
            + "\"fields\": [\n"
            + " {\"name\": \"name\", \"type\": \"string\"},\n"
            + " {\"name\": \"age\", \"type\": \"int\"},\n"
            + " {\"name\": \"emails\", \"type\": {\"type\": \"array\", \"items\": \"string\"}},\n"
            + " {\"name\": \"boss\", \"type\": [\"Employee\",\"null\"]}\n"
            + "]}";
    Schema schema = new Schema.Parser().parse(AVRO_SCHEMA);
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    GenericRecord e1 = new GenericData.Record(schema);
    e1.put("name", name);
    e1.put("age", age);
    e1.put("emails", emails);
    e1.put("boss", null);
    DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
    DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter);
    dataFileWriter.create(schema, out);
    dataFileWriter.append(e1);
    dataFileWriter.close();
    return out.toByteArray();
}
private Consumer<IndexedRecord> getWritingConsumer(Encoder[] encoder) {
    return new Consumer<IndexedRecord>() {

        GenericDatumWriter<IndexedRecord> writer = null;

        @Override
        public void accept(IndexedRecord ir) {
            if (writer == null) {
                writer = new GenericDatumWriter<>(ir.getSchema());
                try {
                    if (json) {
                        encoder[0] = EncoderFactory.get().jsonEncoder(ir.getSchema(), output);
                    } else {
                        encoder[0] = EncoderFactory.get().binaryEncoder(output, null);
                    }
                } catch (IOException ioe) {
                    throw new RuntimeException(ioe);
                }
            }
            writeIndexedRecord(writer, encoder[0], ir);
        }
    };
}
@Test
public void testGenerateAvro3() {
    try {
        Parser parser = new Schema.Parser();
        Schema peopleSchema = parser.parse(new File(getTestResource("people.avsc").toURI()));
        GenericDatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(peopleSchema);
        DataFileWriter<GenericRecord> dfw = new DataFileWriter<GenericRecord>(datumWriter);
        File tempfile = File.createTempFile("karma-people", "avro");
        tempfile.deleteOnExit();
        dfw.create(peopleSchema, new FileOutputStream(tempfile));
        JSONArray array = new JSONArray(IOUtils.toString(
                new FileInputStream(new File(getTestResource("people.json").toURI()))));
        for (int i = 0; i < array.length(); i++) {
            dfw.append(generatePersonRecord(peopleSchema, array.getJSONObject(i)));
        }
        dfw.flush();
        dfw.close();
    } catch (Exception e) {
        logger.error("testGenerateAvro3 failed:", e);
        fail("Exception: " + e.getMessage());
    }
}
public void setProcessingOrder(Map<TriplesMapGraph, List<String>> triplesMapProcessingOrder) throws IOException {
    for (Entry<TriplesMapGraph, List<String>> entry : triplesMapProcessingOrder.entrySet()) {
        for (String triplesMapId : entry.getValue()) {
            triplesMapIdToSchema.put(triplesMapId, getSchemaForTriplesMap(entry.getKey(), triplesMapId));
        }
    }
    String rootTriplesMapId = this.rootTriplesMapIds.iterator().next();
    rootSchema = triplesMapIdToSchema.get(rootTriplesMapId);
    datumWriter = new GenericDatumWriter<GenericRecord>(rootSchema);
    dfw = new DataFileWriter<GenericRecord>(datumWriter);
    dfw.create(rootSchema, output);
}
public static void createFileIfNotExists(BlockSchema fileSchema, String path) throws IOException {
    Configuration conf = new JobConf();
    FileSystem fs = FileSystem.get(conf);
    if (fs.exists(new Path(path)))
        return;

    Schema avroSchema = convertFromBlockSchema("CUBERT_MV_RECORD", fileSchema);
    System.out.println("Creating avro file with schema = " + avroSchema);

    GenericDatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(avroSchema);
    DataFileWriter<GenericRecord> writer = new DataFileWriter<GenericRecord>(datumWriter);
    FSDataOutputStream fout = FileSystem.create(fs, new Path(path),
            new FsPermission(FsAction.ALL, FsAction.READ_EXECUTE, FsAction.READ_EXECUTE));
    writer.create(avroSchema, fout);
    writer.flush();
    writer.close();
}