@Override public RecordReader<NullWritable,ColumnAndIndex> getRecordReader( final InputSplit split, final JobConf job, final Reporter reporter ) throws IOException { FileSplit fileSplit = (FileSplit)split; Path path = fileSplit.getPath(); FileSystem fs = path.getFileSystem( job ); long fileLength = fs.getLength( path ); long start = fileSplit.getStart(); long length = fileSplit.getLength(); InputStream in = fs.open( path ); IJobReporter jobReporter = new HadoopJobReporter( reporter ); jobReporter.setStatus( String.format( "Read file : %s" , path.toString() ) ); HiveReaderSetting hiveConfig = new HiveReaderSetting( fileSplit , job ); if ( hiveConfig.isVectorMode() ){ IVectorizedReaderSetting vectorizedSetting = new HiveVectorizedReaderSetting( fileSplit , job , hiveConfig ); return (RecordReader)new MDSHiveDirectVectorizedReader( in , fileLength , start , length , vectorizedSetting , jobReporter ); } else{ return new MDSHiveLineReader( in , fileLength , start , length , hiveConfig , jobReporter , spreadCounter ); } }
public HiveTextRecordReader(Table table, Partition partition, InputSplit inputSplit, List<SchemaPath> projectedColumns, FragmentContext context) throws ExecutionSetupException { super(table, partition, inputSplit, projectedColumns, context, null); String d = table.getSd().getSerdeInfo().getParameters().get("field.delim"); if (d != null) { delimiter = d.getBytes()[0]; } else { delimiter = (byte) 1; } assert delimiter > 0; List<Integer> ids = Lists.newArrayList(); for (int i = 0; i < tableColumns.size(); i++) { if (selectedColumnNames.contains(tableColumns.get(i))) { ids.add(i); } } columnIds = ids; numCols = tableColumns.size(); }
@Override public SubScan getSpecificScan(final int minorFragmentId) throws ExecutionSetupException { try { final List<InputSplit> splits = mappings.get(minorFragmentId); List<HivePartition> parts = Lists.newArrayList(); final List<String> encodedInputSplits = Lists.newArrayList(); final List<String> splitTypes = Lists.newArrayList(); for (final InputSplit split : splits) { HivePartition partition = null; if (partitionMap.get(split) != null) { partition = new HivePartition(partitionMap.get(split)); } parts.add(partition); encodedInputSplits.add(serializeInputSplit(split)); splitTypes.add(split.getClass().getName()); } if (parts.contains(null)) { parts = null; } final HiveReadEntry subEntry = new HiveReadEntry(hiveReadEntry.table, parts, hiveReadEntry.hiveConfigOverride); return new HiveSubScan(getUserName(), encodedInputSplits, subEntry, splitTypes, columns); } catch (IOException | ReflectiveOperationException e) { throw new ExecutionSetupException(e); } }
@Override public ScanStats getScanStats() { try { long data =0; for (final InputSplit split : inputSplits) { data += split.getLength(); } long estRowCount = rowCount; if (estRowCount == 0) { // having a rowCount of 0 can mean the statistics were never computed estRowCount = data/1024; } // Hive's native reader is neither memory efficient nor fast. Increase the CPU cost // by a factor to let the planner choose HiveDrillNativeScan over HiveScan with SerDes. float cpuCost = 1 * getSerDeOverheadFactor(); logger.debug("estimated row count = {}, stats row count = {}", estRowCount, rowCount); return new ScanStats(GroupScanProperty.NO_EXACT_ROW_COUNT, estRowCount, cpuCost, data); } catch (final IOException e) { throw new DrillRuntimeException(e); } }
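/*
 * Hedged sketch of the row-count fallback used in getScanStats() above: when Hive reports no
 * statistics (rowCount == 0), the scan estimates one row per 1 KiB of input data and flags the
 * result as NO_EXACT_ROW_COUNT. The class, method, and demo values below are illustrative only,
 * not part of the Drill code.
 */
public final class RowCountEstimateSketch {
  // Returns the stats row count when available, otherwise bytes / 1024 as a rough guess.
  static long estimateRows(long statsRowCount, long totalSplitBytes) {
    return statsRowCount != 0 ? statsRowCount : totalSplitBytes / 1024;
  }

  public static void main(String[] args) {
    // 10 MiB of splits and no statistics -> 10240 estimated rows.
    System.out.println(estimateRows(0, 10L * 1024 * 1024));
    // Statistics present -> they win regardless of data size.
    System.out.println(estimateRows(42, 10L * 1024 * 1024));
  }
}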
/** * Read all records from the provided split using MonarchRecordReader (backed by Geode). * The split describes the range of records the record reader should read. * * @param conf the reader configuration -- must have the region name * @param split the input-split containing the records to be read * @param predicates the predicates to filter out unwanted results * @return the total number of records read */ private long readUsingRecordReader(final Configuration conf, final InputSplit split, final Filter... predicates) { MonarchRecordReader mrr = new MonarchRecordReader(conf); FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL); for (int i=0; i<predicates.length; i++) { filterList.addFilter(predicates[i]); } mrr.pushDownfilters = filterList; long size = 0; try { mrr.initialize(split, conf); Writable key = mrr.createKey(); Writable value = mrr.createValue(); while (mrr.next(key, value)) { ++size; } mrr.close(); } catch (IOException e) { e.printStackTrace(); } return size; }
/** * {@inheritDoc} * @throws IOException If the child InputSplit cannot be read, typically * for failing access checks. */ @SuppressWarnings("unchecked") // Generic array assignment public void readFields(DataInput in) throws IOException { int card = WritableUtils.readVInt(in); if (splits == null || splits.length != card) { splits = new InputSplit[card]; } Class<? extends InputSplit>[] cls = new Class[card]; try { for (int i = 0; i < card; ++i) { cls[i] = Class.forName(Text.readString(in)).asSubclass(InputSplit.class); } for (int i = 0; i < card; ++i) { splits[i] = ReflectionUtils.newInstance(cls[i], null); splits[i].readFields(in); } } catch (ClassNotFoundException e) { throw (IOException)new IOException("Failed split init").initCause(e); } }
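/*
 * Hedged sketch of the write() counterpart to the readFields() above, assuming the layout it
 * expects: a vint child count, then every child's class name, then every child's own serialized
 * fields. The class and method names here are illustrative, not taken from the original source.
 */
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapred.InputSplit;

final class CompositeSplitWriteSketch {
  // Serializes child splits so the readFields() shown above can reconstruct them.
  static void write(DataOutput out, InputSplit[] splits) throws IOException {
    WritableUtils.writeVInt(out, splits.length);       // child count
    for (InputSplit s : splits) {
      Text.writeString(out, s.getClass().getName());   // class names first
    }
    for (InputSplit s : splits) {
      s.write(out);                                    // then each child's fields
    }
  }
}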
/** * From each split sampled, take the first numSamples / numSplits records. */ @SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException { InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks()); ArrayList<K> samples = new ArrayList<K>(numSamples); int splitsToSample = Math.min(maxSplitsSampled, splits.length); int splitStep = splits.length / splitsToSample; int samplesPerSplit = numSamples / splitsToSample; long records = 0; for (int i = 0; i < splitsToSample; ++i) { RecordReader<K,V> reader = inf.getRecordReader(splits[i * splitStep], job, Reporter.NULL); K key = reader.createKey(); V value = reader.createValue(); while (reader.next(key, value)) { samples.add(key); key = reader.createKey(); ++records; if ((i+1) * samplesPerSplit <= records) { break; } } reader.close(); } return (K[])samples.toArray(); }
/** * For each split sampled, emit when the ratio of the number of records * retained to the total record count is less than the specified * frequency. */ @SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException { InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks()); ArrayList<K> samples = new ArrayList<K>(); int splitsToSample = Math.min(maxSplitsSampled, splits.length); int splitStep = splits.length / splitsToSample; long records = 0; long kept = 0; for (int i = 0; i < splitsToSample; ++i) { RecordReader<K,V> reader = inf.getRecordReader(splits[i * splitStep], job, Reporter.NULL); K key = reader.createKey(); V value = reader.createValue(); while (reader.next(key, value)) { ++records; if ((double) kept / records < freq) { ++kept; samples.add(key); key = reader.createKey(); } } reader.close(); } return (K[])samples.toArray(); }
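/*
 * Hedged, Hadoop-free illustration of the sampling rule used above: a record is kept whenever
 * kept/records would otherwise drop below freq, so roughly a freq fraction of all records is
 * retained in stream order. The class name and numbers are illustrative only.
 */
import java.util.ArrayList;
import java.util.List;

public final class FrequencySamplerSketch {
  public static void main(String[] args) {
    double freq = 0.1;                 // keep ~10% of records
    List<Integer> samples = new ArrayList<>();
    long records = 0;
    long kept = 0;
    for (int record = 0; record < 1000; record++) {
      ++records;
      if ((double) kept / records < freq) {
        ++kept;
        samples.add(record);
      }
    }
    // Expect about 100 samples for 1000 records at freq = 0.1.
    System.out.println("kept " + samples.size() + " of " + records);
  }
}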
/** * Test DBInputFormat class. The class should split the result into chunks. * @throws Exception */ @Test(timeout = 10000) public void testDBInputFormat() throws Exception { JobConf configuration = new JobConf(); setupDriver(configuration); DBInputFormat<NullDBWritable> format = new DBInputFormat<NullDBWritable>(); format.setConf(configuration); DBInputFormat.DBInputSplit splitter = new DBInputFormat.DBInputSplit(1, 10); Reporter reporter = mock(Reporter.class); RecordReader<LongWritable, NullDBWritable> reader = format.getRecordReader( splitter, configuration, reporter); configuration.setInt(MRJobConfig.NUM_MAPS, 3); InputSplit[] lSplits = format.getSplits(configuration, 3); assertEquals(5, lSplits[0].getLength()); assertEquals(3, lSplits.length); // test reader. Some simple tests assertEquals(LongWritable.class, reader.createKey().getClass()); assertEquals(0, reader.getPos()); assertEquals(0, reader.getProgress(), 0.001); reader.close(); }
public RecordReader getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException { FileSplit fileSplit = (FileSplit) split; FileSystem fs = FileSystem.get(fileSplit.getPath().toUri(), job); FSDataInputStream is = fs.open(fileSplit.getPath()); byte[] header = new byte[3]; try { is.readFully(header); } catch (EOFException eof) { // file is shorter than the magic header: treat it as plain text return textInputFormat.getRecordReader(split, job, reporter); } finally { is.close(); } if (header[0] == 'S' && header[1] == 'E' && header[2] == 'Q') { return seqFileInputFormat.getRecordReader(split, job, reporter); } else { return textInputFormat.getRecordReader(split, job, reporter); } }
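/*
 * Hedged, standalone sketch of the same dispatch idea as above: peek at the first three bytes of
 * a local file and branch on the SequenceFile magic "SEQ", treating anything shorter or different
 * as plain text. The file name, class, and method names are illustrative only; on a real cluster
 * the FSDataInputStream variant above applies.
 */
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.FileInputStream;
import java.io.IOException;

public final class MagicHeaderSniffSketch {
  static boolean looksLikeSequenceFile(String path) throws IOException {
    byte[] header = new byte[3];
    try (DataInputStream in = new DataInputStream(new FileInputStream(path))) {
      in.readFully(header);
    } catch (EOFException tooShort) {
      return false;   // shorter than the magic header: not a SequenceFile
    }
    return header[0] == 'S' && header[1] == 'E' && header[2] == 'Q';
  }

  public static void main(String[] args) throws IOException {
    System.out.println(looksLikeSequenceFile(args.length > 0 ? args[0] : "data.bin"));
  }
}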
@Test public void readExcelInputFormatExcel2013SingleSheetEncryptedNegativeLowFootprint() throws IOException { JobConf job = new JobConf(defaultConf); ClassLoader classLoader = getClass().getClassLoader(); String fileName="excel2013encrypt.xlsx"; String fileNameSpreadSheet=classLoader.getResource(fileName).getFile(); Path file = new Path(fileNameSpreadSheet); FileInputFormat.setInputPaths(job, file); // set locale to the one of the test data job.set("hadoopoffice.read.locale.bcp47","de"); // low footprint job.set("hadoopoffice.read.lowFootprint", "true"); // for decryption simply set the password job.set("hadoopoffice.read.security.crypt.password","test2"); ExcelFileInputFormat format = new ExcelFileInputFormat(); format.configure(job); InputSplit[] inputSplits = format.getSplits(job,1); assertEquals(1,inputSplits.length,"Only one split generated for Excel file"); RecordReader<Text, ArrayWritable> reader = format.getRecordReader(inputSplits[0], job, reporter); assertNull(reader,"Null record reader implies invalid password"); }
@Test public void testOpenClose() throws Exception { DummyRecordReader recordReader = mock(DummyRecordReader.class); DummyInputFormat inputFormat = mock(DummyInputFormat.class); when(inputFormat.getRecordReader(any(InputSplit.class), any(JobConf.class), any(Reporter.class))).thenReturn(recordReader); HadoopInputFormat<String, Long> hadoopInputFormat = new HadoopInputFormat<>(inputFormat, String.class, Long.class, new JobConf()); hadoopInputFormat.open(getHadoopInputSplit()); verify(inputFormat, times(1)).getRecordReader(any(InputSplit.class), any(JobConf.class), any(Reporter.class)); verify(recordReader, times(1)).createKey(); verify(recordReader, times(1)).createValue(); assertThat(hadoopInputFormat.fetched, is(false)); hadoopInputFormat.close(); verify(recordReader, times(1)).close(); }
@Test public void readExcelInputFormatExcel2013SingleSheetEncryptedNegative() throws IOException { JobConf job = new JobConf(defaultConf); ClassLoader classLoader = getClass().getClassLoader(); String fileName="excel2013encrypt.xlsx"; String fileNameSpreadSheet=classLoader.getResource(fileName).getFile(); Path file = new Path(fileNameSpreadSheet); FileInputFormat.setInputPaths(job, file); // set locale to the one of the test data job.set("hadoopoffice.read.locale.bcp47","de"); // for decryption simply set the password job.set("hadoopoffice.read.security.crypt.password","test2"); ExcelFileInputFormat format = new ExcelFileInputFormat(); format.configure(job); InputSplit[] inputSplits = format.getSplits(job,1); assertEquals(1, inputSplits.length, "Only one split generated for Excel file"); RecordReader<Text, ArrayWritable> reader = format.getRecordReader(inputSplits[0], job, reporter); assertNull(reader, "Null record reader implies invalid password"); }
@Test public void readEthereumBlockInputFormatBlock1346406Bzip2Compressed() throws IOException, EthereumBlockReadException, ParseException, InterruptedException { JobConf job = new JobConf(defaultConf); ClassLoader classLoader = getClass().getClassLoader(); String fileName="eth1346406.bin.bz2"; String fileNameBlock=classLoader.getResource("testdata/"+fileName).getFile(); Path file = new Path(fileNameBlock); FileInputFormat.setInputPaths(job, file); EthereumBlockFileInputFormat format = new EthereumBlockFileInputFormat(); format.configure(job); InputSplit[] inputSplits = format.getSplits(job,1); assertEquals( 1, inputSplits.length,"Only one split generated for genesis block"); RecordReader<BytesWritable, EthereumBlock> reader = format.getRecordReader(inputSplits[0], job, reporter); assertNotNull( reader,"Format returned null RecordReader"); BytesWritable key = new BytesWritable(); EthereumBlock block = new EthereumBlock(); assertTrue( reader.next(key,block),"Input Split for block 1346406 contains at least one block"); assertEquals( 6, block.getEthereumTransactions().size(),"Block 1346406 must have 6 transactions"); assertFalse( reader.next(key,block),"No further blocks in block 1346406"); reader.close(); }
static RecordReader<NullWritable, DynamoDBItemWritable> getRecordReader( InputSplit inputSplit, JobConf job, Reporter reporter) throws IOException { // CombineFileSplit indicates the new export format which includes a manifest file if (inputSplit instanceof CombineFileSplit) { int version = job.getInt(DynamoDBConstants.EXPORT_FORMAT_VERSION, -1); if (version != ExportManifestRecordWriter.FORMAT_VERSION) { throw new IOException("Unknown version: " + job.get(DynamoDBConstants .EXPORT_FORMAT_VERSION)); } return new ImportCombineFileRecordReader((CombineFileSplit) inputSplit, job, reporter); } else if (inputSplit instanceof FileSplit) { // FileSplit indicates the old data pipeline format which doesn't include a manifest file Path path = ((FileSplit) inputSplit).getPath(); return new ImportRecordReader(job, path); } else { throw new IOException("Expecting CombineFileSplit or FileSplit but the input split type is:" + " " + inputSplit.getClass()); } }
@Test public void readEthereumBlockInputFormatBlock1() throws IOException, EthereumBlockReadException, ParseException, InterruptedException { JobConf job = new JobConf(defaultConf); ClassLoader classLoader = getClass().getClassLoader(); String fileName="eth1.bin"; String fileNameBlock=classLoader.getResource("testdata/"+fileName).getFile(); Path file = new Path(fileNameBlock); FileInputFormat.setInputPaths(job, file); EthereumBlockFileInputFormat format = new EthereumBlockFileInputFormat(); format.configure(job); InputSplit[] inputSplits = format.getSplits(job,1); assertEquals( 1, inputSplits.length,"Only one split generated for genesis block"); RecordReader<BytesWritable, EthereumBlock> reader = format.getRecordReader(inputSplits[0], job, reporter); assertNotNull( reader,"Format returned null RecordReader"); BytesWritable key = new BytesWritable(); EthereumBlock block = new EthereumBlock(); assertTrue( reader.next(key,block),"Input Split for block 1 contains at least one block"); assertEquals( 0, block.getEthereumTransactions().size(),"Block 1 must have 0 transactions"); assertFalse( reader.next(key,block),"No further blocks in block 1"); reader.close(); }
@Test public void readExcelInputFormatExcel2003Empty() throws IOException { JobConf job = new JobConf(defaultConf); ClassLoader classLoader = getClass().getClassLoader(); String fileName="excel2003empty.xls"; String fileNameSpreadSheet=classLoader.getResource(fileName).getFile(); Path file = new Path(fileNameSpreadSheet); FileInputFormat.setInputPaths(job, file); // set locale to the one of the test data job.set("hadoopoffice.locale.bcp47","de"); ExcelFileInputFormat format = new ExcelFileInputFormat(); format.configure(job); InputSplit[] inputSplits = format.getSplits(job,1); assertEquals(1, inputSplits.length,"Only one split generated for Excel file"); RecordReader<Text, ArrayWritable> reader = format.getRecordReader(inputSplits[0], job, reporter); assertNotNull(reader,"Format returned null RecordReader"); Text spreadSheetKey = new Text(); ArrayWritable spreadSheetValue = new ArrayWritable(SpreadSheetCellDAO.class); assertTrue( reader.next(spreadSheetKey,spreadSheetValue),"Input Split for Excel file contains row 1"); assertEquals(0,spreadSheetValue.get().length,"Input Split for Excel file contain row 1 and is empty"); assertFalse(reader.next(spreadSheetKey,spreadSheetValue),"Input Split for Excel file contains no further row"); }
@Override public List<Fragment> getFragments() throws IOException { InputSplit[] splits; // try { splits = getSplits(); // } finally { // this.gfxdManager.resetLonerSystemInUse(); // } for (InputSplit split : splits) { CombineFileSplit cSplit = (CombineFileSplit)split; if (cSplit.getLength() > 0L) { String filepath = cSplit.getPath(0).toUri().getPath(); filepath = filepath.substring(1); if (this.gfxdManager.getLogger().isDebugEnabled()) { this.gfxdManager.getLogger().debug("fragment-filepath " + filepath); } byte[] data = this.gfxdManager.populateUserData(cSplit); this.fragments.add(new Fragment(filepath, cSplit.getLocations(), data)); } } return this.fragments; }
private List<InputSplit> getSplitsFromManifest(JobConf job) throws IOException { Path[] dirs = getInputPaths(job); if (dirs.length == 0) { throw new IOException("No input path specified in job"); } else if (dirs.length > 1) { throw new IOException("Will only look for manifests in a single input directory (" + dirs .length + " directories provided)."); } TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs, job); Path dir = dirs[0]; FileSystem fs = dir.getFileSystem(job); if (!fs.getFileStatus(dir).isDirectory()) { throw new IOException("Input path not a directory: " + dir); } Path manifestPath = new Path(dir, ExportManifestOutputFormat.MANIFEST_FILENAME); if (!fs.isFile(manifestPath)) { return null; } return parseManifest(fs, manifestPath, job); }
@Test public void testFetchNext() throws IOException { DummyRecordReader recordReader = mock(DummyRecordReader.class); when(recordReader.next(anyString(), anyLong())).thenReturn(true); DummyInputFormat inputFormat = mock(DummyInputFormat.class); when(inputFormat.getRecordReader(any(InputSplit.class), any(JobConf.class), any(Reporter.class))).thenReturn(recordReader); HadoopInputFormat<String, Long> hadoopInputFormat = new HadoopInputFormat<>(inputFormat, String.class, Long.class, new JobConf()); hadoopInputFormat.open(getHadoopInputSplit()); hadoopInputFormat.fetchNext(); verify(recordReader, times(1)).next(anyString(), anyLong()); assertThat(hadoopInputFormat.hasNext, is(true)); assertThat(hadoopInputFormat.fetched, is(true)); }
public HiveRecordReader(Table table, Partition partition, InputSplit inputSplit, List<SchemaPath> projectedColumns, FragmentContext context, Map<String, String> hiveConfigOverride) throws ExecutionSetupException { this.table = table; this.partition = partition; this.inputSplit = inputSplit; this.empty = (inputSplit == null && partition == null); this.hiveConfigOverride = hiveConfigOverride; this.fragmentContext = context; this.managedBuffer = fragmentContext.getManagedBuffer().reallocIfNeeded(256); setColumns(projectedColumns); init(); }
public static InputSplit deserializeInputSplit(String base64, String className) throws IOException, ReflectiveOperationException{ Constructor<?> constructor = Class.forName(className).getDeclaredConstructor(); if (constructor == null) { throw new ReflectiveOperationException("Class " + className + " does not implement a default constructor."); } constructor.setAccessible(true); InputSplit split = (InputSplit) constructor.newInstance(); ByteArrayDataInput byteArrayDataInput = ByteStreams.newDataInput(Base64.decodeBase64(base64)); split.readFields(byteArrayDataInput); return split; }
@Override public ScanBatch getBatch(FragmentContext context, HiveSubScan config, List<RecordBatch> children) throws ExecutionSetupException { List<RecordReader> readers = Lists.newArrayList(); Table table = config.getTable(); List<InputSplit> splits = config.getInputSplits(); List<Partition> partitions = config.getPartitions(); boolean hasPartitions = (partitions != null && partitions.size() > 0); int i = 0; // Native hive text record reader doesn't handle all types currently. For now use HiveRecordReader which uses // Hive InputFormat and SerDe classes to read the data. //if (table.getSd().getInputFormat().equals(TextInputFormat.class.getCanonicalName()) && // table.getSd().getSerdeInfo().getSerializationLib().equals(LazySimpleSerDe.class.getCanonicalName()) && // config.getColumns() != null) { // for (InputSplit split : splits) { // readers.add(new HiveTextRecordReader(table, // (hasPartitions ? partitions.get(i++) : null), // split, config.getColumns(), context)); // } //} else { for (InputSplit split : splits) { readers.add(new HiveRecordReader(table, (hasPartitions ? partitions.get(i++) : null), split, config.getColumns(), context, config.getHiveReadEntry().hiveConfigOverride)); } //} // If there are no readers created (which is possible when the table is empty), create an empty RecordReader to // output the schema if (readers.size() == 0) { readers.add(new HiveRecordReader(table, null, null, config.getColumns(), context, config.getHiveReadEntry().hiveConfigOverride)); } return new ScanBatch(config, context, readers.iterator()); }
private void splitInput(final Properties properties, final StorageDescriptor sd, final Partition partition) throws ReflectiveOperationException, IOException { final JobConf job = new JobConf(); for (final Object obj : properties.keySet()) { job.set((String) obj, (String) properties.get(obj)); } for (final Map.Entry<String, String> entry : hiveReadEntry.hiveConfigOverride.entrySet()) { job.set(entry.getKey(), entry.getValue()); } InputFormat<?, ?> format = (InputFormat<?, ?>) Class.forName(sd.getInputFormat()).getConstructor().newInstance(); job.setInputFormat(format.getClass()); final Path path = new Path(sd.getLocation()); final FileSystem fs = path.getFileSystem(job); if (fs.exists(path)) { FileInputFormat.addInputPath(job, path); format = job.getInputFormat(); for (final InputSplit split : format.getSplits(job, 1)) { inputSplits.add(split); partitionMap.put(split, partition); } } final String numRowsProp = properties.getProperty("numRows"); logger.trace("HiveScan num rows property = {}", numRowsProp); if (numRowsProp != null) { final long numRows = Long.valueOf(numRowsProp); // starting from hive-0.13, when no statistics are available, this property is set to -1 // it's important to note that the value returned by hive may not be up to date if (numRows > 0) { rowCount += numRows; } } }
@Override public void applyAssignments(final List<CoordinationProtos.DrillbitEndpoint> endpoints) { mappings = Lists.newArrayList(); for (int i = 0; i < endpoints.size(); i++) { mappings.add(new ArrayList<InputSplit>()); } final int count = endpoints.size(); for (int i = 0; i < inputSplits.size(); i++) { mappings.get(i % count).add(inputSplits.get(i)); } }
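/*
 * Hedged sketch of the round-robin assignment above, without the Drill types: splits are dealt to
 * endpoints by index modulo the endpoint count, so no endpoint receives more than one split beyond
 * any other. The class and method names are illustrative only.
 */
import java.util.ArrayList;
import java.util.List;

public final class RoundRobinAssignSketch {
  static <T> List<List<T>> assign(List<T> splits, int endpointCount) {
    List<List<T>> mappings = new ArrayList<>();
    for (int i = 0; i < endpointCount; i++) {
      mappings.add(new ArrayList<T>());
    }
    for (int i = 0; i < splits.size(); i++) {
      mappings.get(i % endpointCount).add(splits.get(i));   // i-th split -> endpoint i mod N
    }
    return mappings;
  }

  public static void main(String[] args) {
    List<String> splits = List.of("s0", "s1", "s2", "s3", "s4");
    // Three endpoints -> list sizes 2, 2, 1.
    assign(splits, 3).forEach(m -> System.out.println(m));
  }
}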
public static String serializeInputSplit(final InputSplit split) throws IOException { final ByteArrayDataOutput byteArrayOutputStream = ByteStreams.newDataOutput(); split.write(byteArrayOutputStream); final String encoded = Base64.encodeBase64String(byteArrayOutputStream.toByteArray()); logger.debug("Encoded split string for split {} : {}", split, encoded); return encoded; }
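/*
 * Hedged round-trip check for the two helpers above (serializeInputSplit / deserializeInputSplit):
 * encode a FileSplit to Base64 together with its class name, decode it back, and compare the path
 * and length. The encode/decode steps are inlined so the sketch is self-contained; only the
 * Hadoop, Guava, and commons-codec calls already used in the originals appear here. The class
 * name and sample path are illustrative.
 */
import com.google.common.io.ByteArrayDataOutput;
import com.google.common.io.ByteStreams;
import java.lang.reflect.Constructor;
import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;

public final class InputSplitSerDeRoundTrip {
  public static void main(String[] args) throws Exception {
    InputSplit original = new FileSplit(new Path("/tmp/example.csv"), 0L, 1024L, new String[0]);

    // Serialize: Writable bytes -> Base64 string, plus the concrete class name.
    ByteArrayDataOutput out = ByteStreams.newDataOutput();
    original.write(out);
    String encoded = Base64.encodeBase64String(out.toByteArray());
    String className = original.getClass().getName();

    // Deserialize: reflectively construct the split and replay its fields.
    Constructor<?> ctor = Class.forName(className).getDeclaredConstructor();
    ctor.setAccessible(true);
    InputSplit copy = (InputSplit) ctor.newInstance();
    copy.readFields(ByteStreams.newDataInput(Base64.decodeBase64(encoded)));

    FileSplit roundTripped = (FileSplit) copy;
    System.out.println(roundTripped.getPath() + " length=" + roundTripped.getLength());
  }
}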
/** * Provide the required splits from the specified configuration. By default this * method queries (via function execution) the region with the `_meta' suffix, * so make sure the region name is passed accordingly. * * @param conf the job configuration * @param numSplits the required number of splits * @return the required splits to read/write the data * @throws IOException if the table does not exist */ public static InputSplit[] getSplits(final JobConf conf, final int numSplits) throws IOException { final Path[] tablePaths = FileInputFormat.getInputPaths(conf); /* initialize the cache if not done yet */ final AmpoolClient aClient = MonarchUtils.getConnectionFromConf(conf); String tableName = conf.get(MonarchUtils.REGION); boolean isFTable = MonarchUtils.isFTable(conf); Table table = null; if (isFTable) { table = aClient.getFTable(tableName); } else { table = aClient.getMTable(tableName); } if (table == null) { throw new IOException("Table " + tableName + " does not exist."); } int totalNumberOfSplits = table.getTableDescriptor().getTotalNumOfSplits(); Map<Integer, Set<ServerLocation>> bucketMap = new HashMap<>(numSplits); final AtomicLong start = new AtomicLong(0L); MonarchSplit[] splits = MTableUtils .getSplitsWithSize(tableName, numSplits, totalNumberOfSplits, bucketMap) .stream().map(e -> { MonarchSplit ms = convertToSplit(tablePaths, start.get(), e, bucketMap); start.addAndGet(e.getSize()); return ms; }).toArray(MonarchSplit[]::new); logger.info("numSplits= {}; MonarchSplits= {}", numSplits, Arrays.toString(splits)); return splits; }
@SuppressWarnings("unchecked") public static InputSplit[] getSplits(final JobConf conf, final int numSplits, int dummy) { final Path[] tablePaths = FileInputFormat.getInputPaths(conf); long splitSize = NumberUtils.toLong(conf.get(MonarchUtils.SPLIT_SIZE_KEY), DEFAULT_SPLIT_SIZE); final String regionName = conf.get(MonarchUtils.REGION) + MonarchUtils.META_TABLE_SFX; MPredicateHolder ph = new MPredicateHolder(-1, BasicTypes.STRING, CompareOp.REGEX, ".*"+MonarchUtils.KEY_BLOCKS_SFX); MonarchGetAllFunction func = new MonarchGetAllFunction(); final AmpoolClient aClient = MonarchUtils.getConnectionFromConf(conf); Execution exec = FunctionService.onServer(((GemFireCacheImpl)(aClient.getGeodeCache())).getDefaultPool()) .withArgs(new Object[]{regionName, ph}); ResultCollector rc = exec.execute(func); /** TODO: refactor below code.. change below required in case the function is changed to return in some way **/ List<String[]> output = (List<String[]>)((List) rc.getResult()).get(0); if (output.isEmpty()) { logger.error("No entries found in region= {} with key_prefix= %-{}", regionName, MonarchUtils.KEY_BLOCKS_SFX); return new MonarchSplit[0]; } List<MonarchSplit> list = new ArrayList<>(output.size()); String prefix; long numberOfBlocks; for (final String[] arr : output) { prefix = arr[0].substring(0, arr[0].length() - 6); numberOfBlocks = Long.valueOf(arr[1]); if (numberOfBlocks > splitSize) { Collections.addAll(list, MonarchSplit.getInputSplits(tablePaths[0], prefix, splitSize, numberOfBlocks)); } else { list.add(new MonarchSplit(tablePaths[0], 0, numberOfBlocks, null, prefix)); } } return list.toArray(new MonarchSplit[list.size()]); }
/** * Get input splits for the specified split-size. * * @param regionName the region name * @param splitSize the split-size * @return an array of splits to be read */ private InputSplit[] getSplits(final String regionName, final int splitSize) throws IOException{ JobConf jobConf = new JobConf(); jobConf.set(MonarchUtils.REGION, regionName); jobConf.set("mapred.input.dir", "/home/mgalande"); jobConf.set(MonarchUtils.SPLIT_SIZE_KEY, String.valueOf(splitSize)); jobConf.set(MonarchUtils.MONARCH_TABLE_TYPE, "unordered"); return MonarchSplit.getSplits(jobConf, 1); }
/** * Test using sequential reader. * * @throws Exception */ @Test(dataProvider = "getConf") public void testReader_SequentialReaders(final Configuration conf) throws Exception { // System.out.println("MonarchRecordReaderTest.testReader_SequentialReaders"); long totalRecords = 0; for (InputSplit is : getSplits(regionName, 50)) { totalRecords += readUsingRecordReader(conf, is); } assertEquals(totalRecords, readLineCount); }
/** * Test using sequential reader with a smaller batch size. * * @throws Exception */ @Test(dataProvider = "getConf") public void testReaderWithSmallerBatchSize(final Configuration conf) throws Exception { // System.out.println("MonarchRecordReaderTest.testReaderWithSmallerBatchSize"); conf.set(MonarchUtils.MONARCH_BATCH_SIZE, "2"); long totalRecords = 0; for (InputSplit is : getSplits(regionName, 10)) { totalRecords += readUsingRecordReader(conf, is); } assertEquals(totalRecords, readLineCount); }
/** * Test reader with predicates. */ @Test(dataProvider = "getPredicates") public void testReaderWithPredicates(final int expectedCount, final Filter[] phs) throws IOException { // System.out.println("MonarchRecordReaderTest.testReaderWithPredicates"); long totalRecords = 0; for (InputSplit is : getSplits(regionName, 50)) { totalRecords += readUsingRecordReader(getConfiguration(COLUMNS), is, phs); } assertEquals(totalRecords, expectedCount); }
/** * Read using record reader and assert that the columns not requested have 0 length. * <p> * @param conf the reader configuration -- must have the region name * @param split the input-split containing the records to be read * @param predicates the predicates to filter out unwanted results * @param readColIds the column ids to retrieve * @return total number of records read */ private long readAndAssertOnEmptyCols(final Configuration conf, final InputSplit split, final String readColIds, final Filter[] predicates) throws IOException{ MonarchRecordReader mrr = new MonarchRecordReader(conf); FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL); if (predicates != null) { for (int i = 0; i < predicates.length; i++) { filterList.addFilter(predicates[i]); } mrr.pushDownfilters = filterList; } // mrr.readColIds = readColIds; /*List<Integer> readColIdList = readColIds == null ? Collections.emptyList() : Arrays.stream(readColIds.split(",")).mapToInt(Integer::valueOf) .collect(ArrayList::new, ArrayList::add, ArrayList::addAll);*/ List<Integer> readColIdList = ColumnProjectionUtils.getReadColumnIDs(conf); long size = 0; try { mrr.initialize(split, conf); Writable key = mrr.createKey(); Writable value = mrr.createValue(); while (mrr.next(key, value)) { BytesRefArrayWritable braw = (BytesRefArrayWritable) value; /** assert that skipped (not read) columns have 0 length **/ for (int i = 0; i < braw.size(); i++) { if (!readColIdList.isEmpty() && !readColIdList.contains(i)) { assertEquals(0, braw.get(i).getLength()); } } ++size; } mrr.close(); } catch (IOException e) { e.printStackTrace(); } return size; }
/** * Assert that only requested columns are fetched and empty cells are returned for rest. * * @param expectedCount expected number of rows * @param colsToGet comma separated list of column ids to retrieve * @param phs the predicate holder to be tested before retrieving the rows */ @Test(dataProvider = "getPredicatesCols") public void testReaderWithSelectedCols(final int expectedCount, final String colsToGet, final String colIDs, final Filter[] phs) throws IOException{ // System.out.println("MonarchRecordReaderTest.testReaderWithSelectedCols"); long totalRecords = 0; for (InputSplit is : getSplits(regionName, 50)) { totalRecords += readAndAssertOnEmptyCols(getConfiguration(colIDs), is, colsToGet, phs); } assertEquals(totalRecords, expectedCount); }