private void verifyFragments(InputSplit[] fs, List<Fragment> fragments) throws Exception {
  log("Total fragments [expected, actual]: " + fs.length + ", " + fragments.size());
  assertEquals(fs.length, fragments.size());

  for (int i = 0; i < fs.length; i++) {
    CombineFileSplit split = (CombineFileSplit) fs[i];
    Fragment frag = fragments.get(i);

    log("Number of hosts hosting the fragment [expected, actual]: "
        + fs[i].getLocations().length + ", " + frag.getReplicas().length);
    assertEquals(fs[i].getLocations().length, frag.getReplicas().length);

    log("Fragment source name [expected, actual]: " + split.getPath(0).toString()
        + ", " + frag.getSourceName());
    assertEquals(split.getPath(0).toString(), "/" + frag.getSourceName());

    for (int j = 0; j < frag.getReplicas().length; j++) {
      log("Fragment host [expected, actual]: " + fs[i].getLocations()[j] + ", "
          + frag.getReplicas()[j]);
      assertEquals(fs[i].getLocations()[j], frag.getReplicas()[j]);

      log("  User data [expected, actual]: " + null + ", " + frag.getUserData());
      assertEquals(null, frag.getUserData());
    }
  }
}
private InputSplit[] createSplits(JobConf job, Collection<FileStatus> hoplogs)
    throws IOException {
  if (hoplogs == null || hoplogs.isEmpty()) {
    return new InputSplit[0];
  }

  HoplogOptimizedSplitter splitter = new HoplogOptimizedSplitter(hoplogs);
  List<org.apache.hadoop.mapreduce.InputSplit> mr2Splits = splitter.getOptimizedSplits(conf);
  InputSplit[] splits = new InputSplit[mr2Splits.size()];
  int i = 0;
  for (org.apache.hadoop.mapreduce.InputSplit inputSplit : mr2Splits) {
    org.apache.hadoop.mapreduce.lib.input.CombineFileSplit mr2Spit;
    mr2Spit = (org.apache.hadoop.mapreduce.lib.input.CombineFileSplit) inputSplit;
    CombineFileSplit split = new CombineFileSplit(job, mr2Spit.getPaths(),
        mr2Spit.getStartOffsets(), mr2Spit.getLengths(), mr2Spit.getLocations());
    splits[i] = getSplit(split);
    i++;
  }
  return splits;
}
@Override
public List<Fragment> getFragments() throws IOException {
  InputSplit[] splits;
  // try {
  splits = getSplits();
  // } finally {
  //   this.gfxdManager.resetLonerSystemInUse();
  // }

  for (InputSplit split : splits) {
    CombineFileSplit cSplit = (CombineFileSplit) split;
    if (cSplit.getLength() > 0L) {
      String filepath = cSplit.getPath(0).toUri().getPath();
      filepath = filepath.substring(1);
      if (this.gfxdManager.getLogger().isDebugEnabled()) {
        this.gfxdManager.getLogger().debug("fragment-filepath " + filepath);
      }
      byte[] data = this.gfxdManager.populateUserData(cSplit);
      this.fragments.add(new Fragment(filepath, cSplit.getLocations(), data));
    }
  }
  return this.fragments;
}
/**
 * Creates an input split for every block occupied by hoplogs of the input
 * regions
 *
 * @param job
 * @param hoplogs
 * @return array of input splits of type file input split
 * @throws IOException
 */
private InputSplit[] createSplits(JobConf job, Collection<FileStatus> hoplogs)
    throws IOException {
  if (hoplogs == null || hoplogs.isEmpty()) {
    return new InputSplit[0];
  }

  HoplogOptimizedSplitter splitter = new HoplogOptimizedSplitter(hoplogs);
  List<org.apache.hadoop.mapreduce.InputSplit> mr2Splits = splitter.getOptimizedSplits(conf);
  InputSplit[] splits = new InputSplit[mr2Splits.size()];
  int i = 0;
  for (org.apache.hadoop.mapreduce.InputSplit inputSplit : mr2Splits) {
    org.apache.hadoop.mapreduce.lib.input.CombineFileSplit mr2Spit;
    mr2Spit = (org.apache.hadoop.mapreduce.lib.input.CombineFileSplit) inputSplit;
    CombineFileSplit split = new CombineFileSplit(job, mr2Spit.getPaths(),
        mr2Spit.getStartOffsets(), mr2Spit.getLengths(), mr2Spit.getLocations());
    splits[i] = split;
    i++;
  }
  return splits;
}
static RecordReader<NullWritable, DynamoDBItemWritable> getRecordReader(
    InputSplit inputSplit, JobConf job, Reporter reporter) throws IOException {
  // CombineFileSplit indicates the new export format which includes a manifest file
  if (inputSplit instanceof CombineFileSplit) {
    int version = job.getInt(DynamoDBConstants.EXPORT_FORMAT_VERSION, -1);
    if (version != ExportManifestRecordWriter.FORMAT_VERSION) {
      throw new IOException("Unknown version: " + job.get(DynamoDBConstants
          .EXPORT_FORMAT_VERSION));
    }
    return new ImportCombineFileRecordReader((CombineFileSplit) inputSplit, job, reporter);
  } else if (inputSplit instanceof FileSplit) {
    // FileSplit indicates the old data pipeline format which doesn't include a manifest file
    Path path = ((FileSplit) inputSplit).getPath();
    return new ImportRecordReader(job, path);
  } else {
    throw new IOException("Expecting CombineFileSplit or FileSplit but the input split type is:"
        + " " + inputSplit.getClass());
  }
}
/**
 * Combines a number of file splits into one CombineFileSplit. If the number of
 * splits to be combined is one, it returns this split as is without creating
 * a CombineFileSplit.
 * @param conf
 * @param splits
 * @param startIndex
 * @param count
 * @return the combined split, or the single input split itself when count is one
 * @throws IOException
 */
public static InputSplit combineFileSplits(JobConf conf, List<FileSplit> splits,
    int startIndex, int count) throws IOException {
  if (count == 1) {
    return splits.get(startIndex);
  } else {
    Path[] paths = new Path[count];
    long[] starts = new long[count];
    long[] lengths = new long[count];
    Vector<String> vlocations = new Vector<String>();
    while (count > 0) {
      paths[count - 1] = splits.get(startIndex).getPath();
      starts[count - 1] = splits.get(startIndex).getStart();
      lengths[count - 1] = splits.get(startIndex).getLength();
      vlocations.addAll(Arrays.asList(splits.get(startIndex).getLocations()));
      count--;
      startIndex++;
    }
    String[] locations = prioritizeLocations(vlocations);
    return new CombineFileSplit(conf, paths, starts, lengths, locations);
  }
}
/**
 * Combines two file splits into a CombineFileSplit.
 * @param conf
 * @param split1
 * @param split2
 * @return a CombineFileSplit covering both input splits
 * @throws IOException
 */
public static InputSplit combineFileSplits(JobConf conf, FileSplit split1, FileSplit split2)
    throws IOException {
  Path[] paths = new Path[2];
  long[] starts = new long[2];
  long[] lengths = new long[2];
  Vector<String> vlocations = new Vector<String>();

  paths[0] = split1.getPath();
  starts[0] = split1.getStart();
  lengths[0] = split1.getLength();
  vlocations.addAll(Arrays.asList(split1.getLocations()));

  paths[1] = split2.getPath();
  starts[1] = split2.getStart();
  lengths[1] = split2.getLength();
  vlocations.addAll(Arrays.asList(split2.getLocations()));

  String[] locations = prioritizeLocations(vlocations);
  return new CombineFileSplit(conf, paths, starts, lengths, locations);
}
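For orientation, a minimal usage sketch of the two overloads above. It assumes both static methods are in scope (for example via static import from their defining utility class), and the paths, lengths, and host names are purely hypothetical.

// Usage sketch for the combineFileSplits overloads (hypothetical files/hosts).
import java.util.Arrays;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.CombineFileSplit;

public class CombineSplitsSketch {
  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf();

    // Two single-file splits, each covering one whole (hypothetical) file.
    FileSplit s1 = new FileSplit(new Path("/data/a.txt"), 0, 128, new String[] {"host1"});
    FileSplit s2 = new FileSplit(new Path("/data/b.txt"), 0, 256, new String[] {"host2"});

    // Pairwise overload: always wraps both files in one CombineFileSplit.
    InputSplit combined = combineFileSplits(conf, s1, s2);
    CombineFileSplit c = (CombineFileSplit) combined;
    System.out.println(Arrays.toString(c.getPaths()) + " total=" + c.getLength());

    // List overload with count == 1: the original FileSplit is returned unwrapped.
    InputSplit single = combineFileSplits(conf, Arrays.asList(s1, s2), 0, 1);
    System.out.println(single instanceof CombineFileSplit);  // prints false
  }
}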
/**
 * Make sure we do not generate a lot of data here as this will be duplicated
 * per split and sent to HAWQ master and later to datanodes.
 *
 * The sequence in which data is written to out must match the sequence it is
 * read in {@link #readUserData()}
 *
 * <p>
 * Only called from Fragmenter.
 *
 * @param cSplit
 * @return the serialized user data bytes
 * @throws IOException
 */
public byte[] populateUserData(CombineFileSplit cSplit) throws IOException {
  // Construct user data
  ByteArrayOutputStream baos = new ByteArrayOutputStream();
  DataOutput out = new DataOutputStream(baos);

  // TODO Uncomment below statement (and its corresponding code in
  // readUserData()) when loner system is started from fragmenter as well as
  // from accessor.
  // 1. restart loner
  // out.write(RESTART_LONER_SYSTEM_CODE);
  // out.writeBoolean(this.restartLoner);

  // 2. home dir
  out.write(HOME_DIR_CODE);
  out.writeUTF(this.homeDir);

  // 3. schema.table
  out.write(SCHEMA_TABLE_NAME_CODE);
  out.writeUTF(this.schemaTableName);

  out.write(SPLIT_CODE);
  cSplit.write(out);

  // Serialize it and return
  return baos.toByteArray();
}
/**
 * This is only called from Accessor. The sequence in which switch cases
 * appear must match the sequence followed in writing data to out in
 * {@link #populateUserData(CombineFileSplit)}
 *
 * @throws IOException
 */
public void readUserData() throws IOException {
  byte[] data = this.inputData.getFragmentMetadata();
  if (data != null && data.length > 0) {
    boolean done = false;
    ByteArrayDataInput in = new ByteArrayDataInput();
    in.initialize(data, null);
    while (!done) {
      try {
        switch (in.readByte()) {
        case HOME_DIR_CODE:
          this.homeDir = in.readUTF();
          this.logger.debug("Accessor received home dir: " + this.homeDir);
          break;
        case SCHEMA_TABLE_NAME_CODE:
          this.schemaTableName = in.readUTF();
          this.logger.debug("Accessor received schemaTable name: " + this.schemaTableName);
          break;
        case SPLIT_CODE:
          this.split = new CombineFileSplit();
          this.split.readFields(in);
          this.logger.debug("Accessor split read, total length: " + this.split.getLength());
          done = true;
          break;
        default:
          this.logger.error("Internal error: Invalid data from fragmenter.");
          done = true;
          break;
        }
      } catch (EOFException eofe) {
        this.logger.error("Internal error: Invalid data from fragmenter.");
        break; // from while().
      }
    }
  }
}
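The user-data encoding above ultimately relies on CombineFileSplit's Writable contract: write() in populateUserData() and readFields() under the SPLIT_CODE case here. Below is a small standalone round-trip sketch of that contract using plain java.io streams rather than the ByteArrayDataInput helper from the snippet; the paths, lengths, and hosts are hypothetical.

// Round-trip sketch: serialize a CombineFileSplit to bytes and rebuild it.
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.CombineFileSplit;

public class SplitRoundTripSketch {
  public static void main(String[] args) throws IOException {
    CombineFileSplit original = new CombineFileSplit(new JobConf(),
        new Path[] {new Path("/data/part-0"), new Path("/data/part-1")},
        new long[] {0L, 0L},       // start offsets
        new long[] {128L, 256L},   // lengths
        new String[] {"host1", "host2"});

    // Serialize, as populateUserData() does with SPLIT_CODE followed by cSplit.write(out).
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    original.write(new DataOutputStream(baos));

    // Deserialize, as readUserData() does under the SPLIT_CODE case.
    CombineFileSplit copy = new CombineFileSplit();
    copy.readFields(new DataInputStream(new ByteArrayInputStream(baos.toByteArray())));

    System.out.println(copy.getNumPaths() + " paths, total length " + copy.getLength());
  }
}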
/**
 * Initializes instance of record reader using file split and job
 * configuration
 *
 * @param split
 * @param conf
 * @throws IOException
 */
public void initialize(CombineFileSplit split, JobConf conf) throws IOException {
  CombineFileSplit cSplit = (CombineFileSplit) split;
  Path[] path = cSplit.getPaths();
  long[] start = cSplit.getStartOffsets();
  long[] len = cSplit.getLengths();
  FileSystem fs = cSplit.getPath(0).getFileSystem(conf);
  this.splitIterator = HDFSSplitIterator.newInstance(fs, path, start, len, 0l, 0l);
}
@Override
public RecordReader<GFKey, PersistedEventImpl> getRecordReader(InputSplit split,
    JobConf job, Reporter reporter) throws IOException {
  CombineFileSplit cSplit = (CombineFileSplit) split;
  AbstractGFRecordReader reader = new AbstractGFRecordReader();
  reader.initialize(cSplit, job);
  return reader;
}
public ImportCombineFileRecordReader(CombineFileSplit combineFileSplit, JobConf job,
    Reporter reporter) throws IOException {
  this.combineFileSplit = combineFileSplit;
  this.job = job;
  this.reporter = reporter;

  processedPathCount = 0;
  currentRecordReader = getRecordReader(combineFileSplit.getPath(processedPathCount));
}
@SuppressWarnings("unchecked")
public BinaryRecordReader(Configuration conf, CombineFileSplit split) throws IOException {
  this.conf = conf;
  this.split = split;
  internalReaders = new RecordReader[(int) split.getNumPaths()];
  // Initialize all record readers
  for (int i = 0; i < split.getNumPaths(); i++) {
    this.internalReaders[i] = createRecordReader(this.conf, this.split, i);
  }
}
@Test(timeout=10000)
public void testFormat() throws Exception {
  JobConf job = new JobConf(conf);

  Reporter reporter = Reporter.NULL;

  Random random = new Random();
  long seed = random.nextLong();
  LOG.info("seed = " + seed);
  random.setSeed(seed);

  localFs.delete(workDir, true);
  FileInputFormat.setInputPaths(job, workDir);

  final int length = 10000;
  final int numFiles = 10;

  // create a file with various lengths
  createFiles(length, numFiles, random);

  // create a combine split for the files
  InputFormat<IntWritable, BytesWritable> format =
      new CombineSequenceFileInputFormat<IntWritable, BytesWritable>();
  IntWritable key = new IntWritable();
  BytesWritable value = new BytesWritable();
  for (int i = 0; i < 3; i++) {
    int numSplits = random.nextInt(length / (SequenceFile.SYNC_INTERVAL / 20)) + 1;
    LOG.info("splitting: requesting = " + numSplits);
    InputSplit[] splits = format.getSplits(job, numSplits);
    LOG.info("splitting: got = " + splits.length);

    // we should have a single split as the length is comfortably smaller than
    // the block size
    assertEquals("We got more than one split!", 1, splits.length);
    InputSplit split = splits[0];
    assertEquals("It should be CombineFileSplit", CombineFileSplit.class, split.getClass());

    // check each split
    BitSet bits = new BitSet(length);
    RecordReader<IntWritable, BytesWritable> reader =
        format.getRecordReader(split, job, reporter);
    try {
      while (reader.next(key, value)) {
        assertFalse("Key in multiple partitions.", bits.get(key.get()));
        bits.set(key.get());
      }
    } finally {
      reader.close();
    }
    assertEquals("Some keys in no partition.", length, bits.cardinality());
  }
}
@Test(timeout=10000)
public void testFormat() throws Exception {
  JobConf job = new JobConf(defaultConf);

  Random random = new Random();
  long seed = random.nextLong();
  LOG.info("seed = " + seed);
  random.setSeed(seed);

  localFs.delete(workDir, true);
  FileInputFormat.setInputPaths(job, workDir);

  final int length = 10000;
  final int numFiles = 10;

  createFiles(length, numFiles, random);

  // create a combined split for the files
  CombineTextInputFormat format = new CombineTextInputFormat();
  LongWritable key = new LongWritable();
  Text value = new Text();
  for (int i = 0; i < 3; i++) {
    int numSplits = random.nextInt(length / 20) + 1;
    LOG.info("splitting: requesting = " + numSplits);
    InputSplit[] splits = format.getSplits(job, numSplits);
    LOG.info("splitting: got = " + splits.length);

    // we should have a single split as the length is comfortably smaller than
    // the block size
    assertEquals("We got more than one split!", 1, splits.length);
    InputSplit split = splits[0];
    assertEquals("It should be CombineFileSplit", CombineFileSplit.class, split.getClass());

    // check the split
    BitSet bits = new BitSet(length);
    LOG.debug("split= " + split);
    RecordReader<LongWritable, Text> reader =
        format.getRecordReader(split, job, voidReporter);
    try {
      int count = 0;
      while (reader.next(key, value)) {
        int v = Integer.parseInt(value.toString());
        LOG.debug("read " + v);
        if (bits.get(v)) {
          LOG.warn("conflict with " + v + " at position " + reader.getPos());
        }
        assertFalse("Key in multiple partitions.", bits.get(v));
        bits.set(v);
        count++;
      }
      LOG.info("splits=" + split + " count=" + count);
    } finally {
      reader.close();
    }
    assertEquals("Some keys in no partition.", length, bits.cardinality());
  }
}
public void testEventInputFormat() throws Exception {
  getConnection();
  Connection conn = startNetserverAndGetLocalNetConnection();

  Statement st = conn.createStatement();
  st.execute("create hdfsstore myhdfs namenode 'localhost' homedir '"
      + HDFS_DIR + "' batchtimeinterval 5000 milliseconds");
  st.execute("create table app.mytab1 (col1 int primary key, col2 varchar(100)) persistent hdfsstore (myhdfs) BUCKETS 1");

  PreparedStatement ps = conn.prepareStatement("insert into mytab1 values (?, ?)");
  int NUM_ENTRIES = 20;
  for (int i = 0; i < NUM_ENTRIES; i++) {
    ps.setInt(1, i);
    ps.setString(2, "Value-" + System.nanoTime());
    ps.execute();
  }
  // Wait for data to get to HDFS...
  String qname = HDFSStoreFactoryImpl.getEventQueueName("/APP/MYTAB1");
  st.execute("CALL SYS.WAIT_FOR_SENDER_QUEUE_FLUSH('" + qname + "', 1, 0)");

  Configuration conf = new Configuration();
  FileSystem fs = FileSystem.get(conf);
  FileStatus[] list = fs.listStatus(new Path(HDFS_DIR + "/APP_MYTAB1/0/"));
  assertEquals(1, list.length);

  conf.set(RowInputFormat.INPUT_TABLE, "MYTAB1");
  conf.set(RowInputFormat.HOME_DIR, HDFS_DIR);

  JobConf job = new JobConf(conf);
  job.setBoolean(RowInputFormat.CHECKPOINT_MODE, false);
  RowInputFormat ipformat = new RowInputFormat();
  InputSplit[] splits = ipformat.getSplits(job, 2);
  assertEquals(1, splits.length);
  CombineFileSplit split = (CombineFileSplit) splits[0];
  assertEquals(1, split.getPaths().length);
  assertEquals(list[0].getPath().toString(), split.getPath(0).toString());
  assertEquals(0, split.getOffset(0));
  assertEquals(list[0].getLen(), split.getLength(0));

  RecordReader<Key, Row> rr = ipformat.getRecordReader(split, job, null);
  Key key = rr.createKey();
  Row value = rr.createValue();

  int count = 0;
  while (rr.next(key, value)) {
    assertEquals(count++, value.getRowAsResultSet().getInt("col1"));
  }
  assertEquals(20, count);

  TestUtil.shutDown();
}
public void testNoSecureHdfsCheck() throws Exception {
  getConnection();
  Connection conn = startNetserverAndGetLocalNetConnection();

  Statement st = conn.createStatement();
  st.execute("create hdfsstore myhdfs namenode 'localhost' homedir '"
      + HDFS_DIR + "' batchtimeinterval 5000 milliseconds");
  st.execute("create table app.mytab1 (col1 int primary key, col2 varchar(100)) persistent hdfsstore (myhdfs) BUCKETS 1");

  PreparedStatement ps = conn.prepareStatement("insert into mytab1 values (?, ?)");
  int NUM_ENTRIES = 20;
  for (int i = 0; i < NUM_ENTRIES; i++) {
    ps.setInt(1, i);
    ps.setString(2, "Value-" + System.nanoTime());
    ps.execute();
  }
  // Wait for data to get to HDFS...
  String qname = HDFSStoreFactoryImpl.getEventQueueName("/APP/MYTAB1");
  st.execute("CALL SYS.WAIT_FOR_SENDER_QUEUE_FLUSH('" + qname + "', 1, 0)");

  stopNetServer();
  FabricServiceManager.currentFabricServiceInstance().stop(new Properties());

  Configuration conf = new Configuration();
  FileSystem fs = FileSystem.get(conf);
  FileStatus[] list = fs.listStatus(new Path(HDFS_DIR + "/APP_MYTAB1/0/"));
  assertEquals(1, list.length);

  conf.set(RowInputFormat.INPUT_TABLE, "MYTAB1");
  conf.set(RowInputFormat.HOME_DIR, HDFS_DIR);
  conf.set("hadoop.security.authentication", "kerberos");

  JobConf job = new JobConf(conf);
  job.setBoolean(RowInputFormat.CHECKPOINT_MODE, false);
  RowInputFormat ipformat = new RowInputFormat();
  InputSplit[] splits = ipformat.getSplits(job, 2);
  assertEquals(1, splits.length);
  CombineFileSplit split = (CombineFileSplit) splits[0];
  assertEquals(1, split.getPaths().length);
  assertEquals(list[0].getPath().toString(), split.getPath(0).toString());
  assertEquals(0, split.getOffset(0));
  assertEquals(list[0].getLen(), split.getLength(0));

  RecordReader<Key, Row> rr = ipformat.getRecordReader(split, job, null);
  Key key = rr.createKey();
  Row value = rr.createValue();

  int count = 0;
  while (rr.next(key, value)) {
    assertEquals(count++, value.getRowAsResultSet().getInt("col1"));
  }
  assertEquals(20, count);

  TestUtil.shutDown();
}
private static InputSplit getCombineSplit(Path path, long l, long m, String[] strings) {
  Path[] paths = { path };
  long[] offsets = { l };
  long[] lengths = { m };
  return new CombineFileSplit(null, paths, offsets, lengths, strings);
}
@Override
public byte[] populateUserData(CombineFileSplit fileSplit) throws IOException {
  return null;
}
@Override
protected InputSplit getSplit(CombineFileSplit split) {
  return new GFXDHiveSplit(split);
}
public GFXDHiveSplit(CombineFileSplit fileSplit) {
  this.combineFileSplit = fileSplit;
}
public void readFields(DataInput in) throws IOException {
  this.combineFileSplit = new CombineFileSplit();
  this.combineFileSplit.readFields(in);
}
protected Path[] getSplitPaths(InputSplit split) {
  CombineFileSplit combineSplit = (CombineFileSplit) split;
  return combineSplit.getPaths();
}
protected long[] getStartOffsets(InputSplit split) {
  CombineFileSplit combineSplit = (CombineFileSplit) split;
  return combineSplit.getStartOffsets();
}
protected long[] getLengths(InputSplit split) {
  CombineFileSplit combineSplit = (CombineFileSplit) split;
  return combineSplit.getLengths();
}
protected InputSplit getSplit(CombineFileSplit split) {
  return split;
}
/**
 * This method retrieves the URLs of all S3 files and generates input splits by combining
 * multiple S3 URLs into one split.
 *
 * @return a list of input splits. The length of this list may not be exactly the same as
 * <code>numSplits</code>. For example, if numSplits is larger than MAX_NUM_SPLITS or the number
 * of S3 files, then numSplits is ignored. Furthermore, not all input splits contain the same
 * number of S3 files. For example, with five S3 files {s1, s2, s3, s4, s5} and numSplits = 3,
 * this method returns a list of three input splits: {s1, s2}, {s3, s4} and {s5}.
 */
private List<InputSplit> readEntries(JsonReader reader, JobConf job) throws IOException {
  List<Path> paths = new ArrayList<Path>();
  Gson gson = DynamoDBUtil.getGson();

  reader.beginArray();
  while (reader.hasNext()) {
    ExportManifestEntry entry = gson.fromJson(reader, ExportManifestEntry.class);
    paths.add(new Path(entry.url));
  }
  reader.endArray();
  log.info("Number of S3 files: " + paths.size());

  if (paths.size() == 0) {
    return Collections.emptyList();
  }

  int filesPerSplit = (int) Math.ceil((double) (paths.size())
      / Math.min(MAX_NUM_SPLITS, paths.size()));
  int numSplits = (int) Math.ceil((double) (paths.size()) / filesPerSplit);

  long[] fileMaxLengths = new long[filesPerSplit];
  Arrays.fill(fileMaxLengths, Long.MAX_VALUE / filesPerSplit);

  long[] fileStarts = new long[filesPerSplit];
  Arrays.fill(fileStarts, 0);

  List<InputSplit> splits = new ArrayList<InputSplit>(numSplits);
  for (int i = 0; i < numSplits; i++) {
    int start = filesPerSplit * i;
    int end = filesPerSplit * (i + 1);
    if (i == (numSplits - 1)) {
      end = paths.size();
    }
    Path[] pathsInOneSplit = paths.subList(start, end).toArray(new Path[end - start]);
    CombineFileSplit combineFileSplit =
        new CombineFileSplit(job, pathsInOneSplit, fileStarts, fileMaxLengths, new String[0]);
    splits.add(combineFileSplit);
  }
  return splits;
}
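To make the ceiling arithmetic above concrete, here is a small sketch. The cap of 3 stands in for Math.min(MAX_NUM_SPLITS, paths.size()); the real MAX_NUM_SPLITS value is not shown in the snippet, so the numbers are illustrative only.

// Worked check of the split-distribution math, with an assumed cap of 3 for five files.
public class SplitMathSketch {
  public static void main(String[] args) {
    int numPaths = 5;  // five S3 files: s1 .. s5
    int cap = 3;       // assumed value of Math.min(MAX_NUM_SPLITS, numPaths)
    int filesPerSplit = (int) Math.ceil((double) numPaths / cap);        // ceil(5/3) = 2
    int numSplits = (int) Math.ceil((double) numPaths / filesPerSplit);  // ceil(5/2) = 3
    // The loop in readEntries() then assigns {s1, s2}, {s3, s4}, and the remainder {s5},
    // because the last split's end index is clamped to paths.size().
    System.out.println(filesPerSplit + " files per split, " + numSplits + " splits");
  }
}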