/**
 * Reads every key/value record from the IFile at {@code path} and appends the
 * string form of each key to {@code keys} and of each value to {@code values}.
 *
 * <p>The file is opened through {@code fs}, passed through
 * {@code CryptoUtils.wrapIfNecessary}, and read with an {@code IFile.Reader}
 * sized to the file length reported by {@code fs.getFileStatus(path)}.
 *
 * @param conf   configuration handed to the stream wrapper and the reader
 * @param fs     file system used to open and stat {@code path}
 * @param path   location of the on-disk map output to read
 * @param keys   receives one entry per record, in file order
 * @param values receives one entry per record, in file order
 * @throws IOException if opening, reading, or closing the file fails
 */
private void readOnDiskMapOutput(Configuration conf, FileSystem fs, Path path,
    List<String> keys, List<String> values) throws IOException {
  FSDataInputStream in = CryptoUtils.wrapIfNecessary(conf, fs.open(path));
  IFile.Reader<Text, Text> reader = null;
  try {
    reader = new IFile.Reader<Text, Text>(conf, in,
        fs.getFileStatus(path).getLen(), null, null);
    DataInputBuffer keyBuff = new DataInputBuffer();
    DataInputBuffer valueBuff = new DataInputBuffer();
    Text key = new Text();
    Text value = new Text();
    // nextRawKey returning false ends the loop; each key is followed by its value.
    while (reader.nextRawKey(keyBuff)) {
      key.readFields(keyBuff);
      keys.add(key.toString());
      reader.nextRawValue(valueBuff);
      value.readFields(valueBuff);
      values.add(value.toString());
    }
  } finally {
    // Release the file handle even when reading fails part-way through.
    // NOTE(review): relies on IFile.Reader.close() also closing the stream it
    // was constructed with -- confirm against the IFile.Reader implementation.
    if (reader != null) {
      reader.close();
    } else {
      // Reader construction failed, so nothing else owns the stream yet.
      in.close();
    }
  }
}
/**
 * Reads every key/value record from the IFile at {@code path} and appends the
 * string form of each key to {@code keys} and of each value to {@code values}.
 *
 * @param conf   configuration handed to the {@code IFile.Reader}
 * @param fs     file system handed to the {@code IFile.Reader}
 * @param path   location of the on-disk map output to read
 * @param keys   receives one entry per record, in file order
 * @param values receives one entry per record, in file order
 * @throws IOException if opening, reading, or closing the file fails
 */
private void readOnDiskMapOutput(Configuration conf, FileSystem fs, Path path,
    List<String> keys, List<String> values) throws IOException {
  IFile.Reader<Text, Text> reader =
      new IFile.Reader<Text, Text>(conf, fs, path, null, null);
  try {
    DataInputBuffer keyBuff = new DataInputBuffer();
    DataInputBuffer valueBuff = new DataInputBuffer();
    Text key = new Text();
    Text value = new Text();
    // nextRawKey returning false ends the loop; each key is followed by its value.
    while (reader.nextRawKey(keyBuff)) {
      key.readFields(keyBuff);
      keys.add(key.toString());
      reader.nextRawValue(valueBuff);
      value.readFields(valueBuff);
      values.add(value.toString());
    }
  } finally {
    // Close the reader on every path so a failed read does not leave the
    // reader (and whatever it opened for the file) dangling.
    reader.close();
  }
}
public void close() throws IOException { // Write EOF_MARKER for key/value length WritableUtils.writeVInt(out, IFile.EOF_MARKER); WritableUtils.writeVInt(out, IFile.EOF_MARKER); // Close the stream out.close(); out = null; }
/**
 * Serializes the given key/value pairs with an {@code IFile.Writer} into an
 * in-memory buffer and returns the bytes that were written.
 *
 * <p>Records are appended in the iteration order of {@code keysToValues}.
 *
 * @param conf         configuration handed to the {@code IFile.Writer}
 * @param keysToValues pairs to write; each key and value is wrapped in a {@code Text}
 * @return the complete contents of the buffer after the writer is closed
 * @throws IOException if appending a record or closing the writer fails
 */
private byte[] writeMapOutput(Configuration conf, Map<String, String> keysToValues)
    throws IOException {
  ByteArrayOutputStream baos = new ByteArrayOutputStream();
  FSDataOutputStream fsdos = new FSDataOutputStream(baos, null);
  IFile.Writer<Text, Text> writer = new IFile.Writer<Text, Text>(conf, fsdos,
      Text.class, Text.class, null, null);
  try {
    // Walk the entries directly rather than looking each key up again.
    for (Map.Entry<String, String> entry : keysToValues.entrySet()) {
      writer.append(new Text(entry.getKey()), new Text(entry.getValue()));
    }
  } finally {
    // Close on every path; on success this must happen before toByteArray()
    // so anything the writer emits at close time is captured in the buffer.
    writer.close();
  }
  return baos.toByteArray();
}