/**
 * Checks that a RECORD-compressed (gzip) SequenceFile can be reopened with
 * {@code appendIfExists(true)} and extended.
 *
 * <p>Writes records (1, "one") and (2, "two") and checks them with {@code verify2Values}. It then
 * reopens the same file in append mode with the same compression option, writes (3, "three") and
 * (4, "four"), and checks the file with {@code verifyAll4Values}. The test is skipped unless
 * running in the native profile.
 */
@Test(timeout = 30000)
public void testAppendRecordCompression() throws Exception {
  GenericTestUtils.assumeInNativeProfile();

  // Record-specific file name: the former "testseqappendblockcompr.seq" name belongs to the
  // block-compression variant and would let the two tests share (and clobber) one file.
  Path file = new Path(ROOT_PATH, "testseqappendrecordcompr.seq");
  fs.delete(file, true);

  Option compressOption = Writer.compression(CompressionType.RECORD, new GzipCodec());

  // try-with-resources closes the writer even if an append fails, so a failing run does not
  // leave an open stream behind.
  try (Writer writer = SequenceFile.createWriter(conf,
      SequenceFile.Writer.file(file),
      SequenceFile.Writer.keyClass(Long.class),
      SequenceFile.Writer.valueClass(String.class),
      compressOption)) {
    writer.append(1L, "one");
    writer.append(2L, "two");
  }
  verify2Values(file);

  // Reopen the existing file in append mode; the compression option must match the original.
  try (Writer writer = SequenceFile.createWriter(conf,
      SequenceFile.Writer.file(file),
      SequenceFile.Writer.keyClass(Long.class),
      SequenceFile.Writer.valueClass(String.class),
      SequenceFile.Writer.appendIfExists(true),
      compressOption)) {
    writer.append(3L, "three");
    writer.append(4L, "four");
  }
  verifyAll4Values(file);

  fs.deleteOnExit(file);
}
/**
 * Writes every element of {@code ENTRIES} to a new SequenceFile at {@code path}.
 *
 * <p>Each record's key is the element's array index (as an {@link IntWritable}), and its value is
 * the element itself (as a {@link Text}). A single key object and a single value object are
 * reused for all appends.
 *
 * @param conf configuration used to create the writer
 * @param path destination of the SequenceFile
 * @throws IOException if creating, writing or closing the file fails
 */
private static void writeToSequenceFile(Configuration conf, Path path) throws IOException {
  final IntWritable index = new IntWritable();
  final Text entry = new Text();

  try (Writer out = SequenceFile.createWriter(conf,
      Writer.file(path),
      Writer.keyClass(IntWritable.class),
      Writer.valueClass(Text.class))) {
    int position = 0;
    while (position < ENTRIES.length) {
      index.set(position);
      entry.set(ENTRIES[position]);
      out.append(index, entry);
      position++;
    }
  }
}
@Test(timeout = 30000) public void testAppendSort() throws Exception { GenericTestUtils.assumeInNativeProfile(); Path file = new Path(ROOT_PATH, "testseqappendSort.seq"); fs.delete(file, true); Path sortedFile = new Path(ROOT_PATH, "testseqappendSort.seq.sort"); fs.delete(sortedFile, true); SequenceFile.Sorter sorter = new SequenceFile.Sorter(fs, new JavaSerializationComparator<Long>(), Long.class, String.class, conf); Option compressOption = Writer.compression(CompressionType.BLOCK, new GzipCodec()); Writer writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(file), SequenceFile.Writer.keyClass(Long.class), SequenceFile.Writer.valueClass(String.class), compressOption); writer.append(2L, "two"); writer.append(1L, "one"); writer.close(); writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(file), SequenceFile.Writer.keyClass(Long.class), SequenceFile.Writer.valueClass(String.class), SequenceFile.Writer.appendIfExists(true), compressOption); writer.append(4L, "four"); writer.append(3L, "three"); writer.close(); // Sort file after append sorter.sort(file, sortedFile); verifyAll4Values(sortedFile); fs.deleteOnExit(file); fs.deleteOnExit(sortedFile); }
public static void main(String[] args) throws IOException { FileInputStream fstream = new FileInputStream(args[0]); // Get the object of DataInputStream DataInputStream in = new DataInputStream(fstream); BufferedReader br = new BufferedReader(new InputStreamReader(in)); Job job = new Job(); Configuration conf = job.getConfiguration(); FileSystem fs = FileSystem.get(URI.create(args[1]), conf); Path path = new Path(fs.getUri()); LongWritable itemKey = new LongWritable(); IntArrayWritable itemValue = new IntArrayWritable(); /** GzipCodec might not work */ CompressionCodec Codec = new GzipCodec(); Option optPath = SequenceFile.Writer.file(path); Option optKey = SequenceFile.Writer.keyClass(itemKey.getClass()); Option optValue = SequenceFile.Writer.valueClass(itemValue.getClass()); Option optCom = SequenceFile.Writer.compression(CompressionType.RECORD, Codec); SequenceFile.Writer fileWriter = SequenceFile.createWriter(conf, optPath, optKey, optValue, optCom); String strLine; long counter = 0; //Read File Line By Line while ((strLine = br.readLine()) != null) { String[] strTerms = strLine.split("\t"); int[] intTerms = new int[strTerms.length]; for(int i=0; i<strTerms.length; i++) intTerms[i] = Integer.parseInt(strTerms[i]); fileWriter.append(new LongWritable(counter), new IntArrayWritable(intTerms)); counter++; System.out.println (intTerms); } //Close the input stream in.close(); fileWriter.close(); }