private void writeOutput(RecordWriter theRecordWriter, TaskAttemptContext context)
    throws IOException, InterruptedException {
  NullWritable nullWritable = NullWritable.get();
  try {
    theRecordWriter.write(key1, val1);
    theRecordWriter.write(null, nullWritable);
    theRecordWriter.write(null, val1);
    theRecordWriter.write(nullWritable, val2);
    theRecordWriter.write(key2, nullWritable);
    theRecordWriter.write(key1, null);
    theRecordWriter.write(null, null);
    theRecordWriter.write(key2, val2);
  } finally {
    theRecordWriter.close(context);
  }
}
NewTrackingRecordReader(org.apache.hadoop.mapreduce.InputSplit split,
    org.apache.hadoop.mapreduce.InputFormat<K, V> inputFormat,
    TaskReporter reporter,
    org.apache.hadoop.mapreduce.TaskAttemptContext taskContext)
    throws InterruptedException, IOException {
  this.reporter = reporter;
  this.inputRecordCounter = reporter
      .getCounter(TaskCounter.MAP_INPUT_RECORDS);
  this.fileInputByteCounter = reporter
      .getCounter(FileInputFormatCounter.BYTES_READ);

  List<Statistics> matchedStats = null;
  if (split instanceof org.apache.hadoop.mapreduce.lib.input.FileSplit) {
    matchedStats = getFsStatistics(
        ((org.apache.hadoop.mapreduce.lib.input.FileSplit) split).getPath(),
        taskContext.getConfiguration());
  }
  fsStats = matchedStats;

  long bytesInPrev = getInputBytes(fsStats);
  this.real = inputFormat.createRecordReader(split, taskContext);
  long bytesInCurr = getInputBytes(fsStats);
  fileInputByteCounter.increment(bytesInCurr - bytesInPrev);
}
/**
 * Write random values to the writer assuming a table created using
 * {@link #FAMILIES} as column family descriptors
 */
private void writeRandomKeyValues(RecordWriter<ImmutableBytesWritable, KeyValue> writer,
    TaskAttemptContext context, Set<byte[]> families, int numRows)
    throws IOException, InterruptedException {
  byte[] keyBytes = new byte[Bytes.SIZEOF_INT];
  int valLength = 10;
  byte[] valBytes = new byte[valLength];

  int taskId = context.getTaskAttemptID().getTaskID().getId();
  assert taskId < Byte.MAX_VALUE : "Unit tests don't support > 127 tasks!";

  final byte[] qualifier = Bytes.toBytes("data");
  Random random = new Random();
  for (int i = 0; i < numRows; i++) {
    Bytes.putInt(keyBytes, 0, i);
    random.nextBytes(valBytes);
    ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes);

    for (byte[] family : families) {
      KeyValue kv = new KeyValue(keyBytes, family, qualifier, valBytes);
      writer.write(key, kv);
    }
  }
}
protected void applyMapperJdbcUrl(TaskAttemptContext context, int mapperId) {
  Configuration conf = context.getConfiguration();

  // Retrieve the JDBC URL that should be used by this mapper.
  // We achieve this by modifying the JDBC URL property in the
  // configuration, prior to the OraOopDBRecordWriter's (ancestral)
  // constructor using the configuration to establish a connection
  // to the database - via DBConfiguration.getConnection()...
  String mapperJdbcUrlPropertyName =
      OraOopUtilities.getMapperJdbcUrlPropertyName(mapperId, conf);

  // Get this mapper's JDBC URL
  String mapperJdbcUrl = conf.get(mapperJdbcUrlPropertyName, null);
  LOG.debug(String.format("Mapper %d has a JDBC URL of: %s", mapperId,
      mapperJdbcUrl == null ? "<null>" : mapperJdbcUrl));

  if (mapperJdbcUrl != null) {
    conf.set(DBConfiguration.URL_PROPERTY, mapperJdbcUrl);
  }
}
public OraOopDBRecordWriterBase(TaskAttemptContext context, int mapperId)
    throws ClassNotFoundException, SQLException {
  super(context);
  this.mapperId = mapperId;
  this.mapperRowNumber = 1;

  Configuration conf = context.getConfiguration();

  // Log any info that might be useful to us...
  logBatchSettings();

  // Connect to Oracle...
  Connection connection = this.getConnection();

  String thisOracleInstanceName =
      OraOopOracleQueries.getCurrentOracleInstanceName(connection);
  LOG.info(String.format(
      "This record writer is connected to Oracle via the JDBC URL: \n"
          + "\t\"%s\"\n"
          + "\tto the Oracle instance: \"%s\"",
      connection.toString(), thisOracleInstanceName));

  // Initialize the Oracle session...
  OracleConnectionFactory.initializeOracleConnection(connection, conf);
  connection.setAutoCommit(false);
}
protected void getExportTableAndColumns(TaskAttemptContext context)
    throws SQLException {
  Configuration conf = context.getConfiguration();

  String schema = conf.get(OraOopConstants.ORAOOP_TABLE_OWNER);
  String localTableName = conf.get(OraOopConstants.ORAOOP_TABLE_NAME);

  if (schema == null || schema.isEmpty() || localTableName == null
      || localTableName.isEmpty()) {
    throw new RuntimeException(
        "Unable to recall the schema and name of the Oracle table "
            + "being exported.");
  }

  this.oracleTable = new OracleTable(schema, localTableName);

  setOracleTableColumns(OraOopOracleQueries.getTableColumns(
      this.getConnection(),
      this.oracleTable,
      OraOopUtilities.omitLobAndLongColumnsDuringImport(conf),
      OraOopUtilities.recallSqoopJobType(conf),
      true,  // <- onlyOraOopSupportedTypes
      false  // <- omitOraOopPseudoColumns
  ));
}
private static List<Text> readSplit(InputFormat<LongWritable, Text> format,
    InputSplit split, Job job) throws IOException, InterruptedException {
  List<Text> result = new ArrayList<Text>();
  Configuration conf = job.getConfiguration();
  TaskAttemptContext context = MapReduceTestUtil
      .createDummyMapTaskAttemptContext(conf);
  RecordReader<LongWritable, Text> reader = format.createRecordReader(split,
      MapReduceTestUtil.createDummyMapTaskAttemptContext(conf));
  MapContext<LongWritable, Text, LongWritable, Text> mcontext =
      new MapContextImpl<LongWritable, Text, LongWritable, Text>(conf,
          context.getTaskAttemptID(), reader, null, null,
          MapReduceTestUtil.createDummyReporter(), split);
  reader.initialize(split, mcontext);
  while (reader.nextKeyValue()) {
    result.add(new Text(reader.getCurrentValue()));
  }
  // Close the reader once the split has been consumed.
  reader.close();
  return result;
}
/**
 * Write random values to the writer assuming a table created using
 * {@link #FAMILIES} as column family descriptors
 */
private void writeRandomKeyValues(RecordWriter<ImmutableBytesWritable, Cell> writer,
    TaskAttemptContext context, Set<byte[]> families, int numRows)
    throws IOException, InterruptedException {
  byte[] keyBytes = new byte[Bytes.SIZEOF_INT];
  int valLength = 10;
  byte[] valBytes = new byte[valLength];

  int taskId = context.getTaskAttemptID().getTaskID().getId();
  assert taskId < Byte.MAX_VALUE : "Unit tests don't support > 127 tasks!";

  final byte[] qualifier = Bytes.toBytes("data");
  Random random = new Random();
  for (int i = 0; i < numRows; i++) {
    Bytes.putInt(keyBytes, 0, i);
    random.nextBytes(valBytes);
    ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes);

    for (byte[] family : families) {
      Cell kv = new KeyValue(keyBytes, family, qualifier, valBytes);
      writer.write(key, kv);
    }
  }
}
private static List<Text> readSplit(KeyValueTextInputFormat format,
    InputSplit split, Job job) throws IOException, InterruptedException {
  List<Text> result = new ArrayList<Text>();
  Configuration conf = job.getConfiguration();
  TaskAttemptContext context = MapReduceTestUtil
      .createDummyMapTaskAttemptContext(conf);
  RecordReader<Text, Text> reader = format.createRecordReader(split,
      MapReduceTestUtil.createDummyMapTaskAttemptContext(conf));
  MapContext<Text, Text, Text, Text> mcontext =
      new MapContextImpl<Text, Text, Text, Text>(conf,
          context.getTaskAttemptID(), reader, null, null,
          MapReduceTestUtil.createDummyReporter(), split);
  reader.initialize(split, mcontext);
  while (reader.nextKeyValue()) {
    result.add(new Text(reader.getCurrentValue()));
  }
  reader.close();
  return result;
}
/**
 * Factory method that
 * 1. acquires a chunk for the specified map-task attempt
 * 2. returns a DynamicInputChunk associated with the acquired chunk-file.
 * @param taskAttemptContext The attempt-context for the map task that's
 * trying to acquire a chunk.
 * @return The acquired dynamic-chunk. The chunk-file is renamed to the
 * attempt-id (from the attempt-context.)
 * @throws IOException Exception on failure.
 * @throws InterruptedException Exception on failure.
 */
public static DynamicInputChunk acquire(TaskAttemptContext taskAttemptContext)
    throws IOException, InterruptedException {
  if (!areInvariantsInitialized()) {
    initializeChunkInvariants(taskAttemptContext.getConfiguration());
  }

  String taskId =
      taskAttemptContext.getTaskAttemptID().getTaskID().toString();
  Path acquiredFilePath = new Path(chunkRootPath, taskId);

  if (fs.exists(acquiredFilePath)) {
    LOG.info("Acquiring pre-assigned chunk: " + acquiredFilePath);
    return new DynamicInputChunk(acquiredFilePath, taskAttemptContext);
  }

  for (FileStatus chunkFile : getListOfChunkFiles()) {
    if (fs.rename(chunkFile.getPath(), acquiredFilePath)) {
      LOG.info(taskId + " acquired " + chunkFile.getPath());
      return new DynamicInputChunk(acquiredFilePath, taskAttemptContext);
    } else {
      LOG.warn(taskId + " could not acquire " + chunkFile.getPath());
    }
  }

  return null;
}
@Override
public void initialize(final InputSplit inputSplit,
    final TaskAttemptContext context) throws IOException, InterruptedException {
  FileSplit fileSplit = (FileSplit) inputSplit;
  Configuration config = context.getConfiguration();
  Path path = fileSplit.getPath();
  FileSystem fs = path.getFileSystem(config);

  long fileLength = fs.getLength(path);
  long start = fileSplit.getStart();
  long length = fileSplit.getLength();

  InputStream in = fs.open(path);
}
@Override
public void close(TaskAttemptContext context)
    throws IOException, InterruptedException {
  reporter.progress();
  if (out != null) {
    long bytesOutPrev = getOutputBytes(fsStats);
    out.close(context);
    long bytesOutCurr = getOutputBytes(fsStats);
    fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
  }
}
@SuppressWarnings("unchecked") @Override public void transition(TaskAttemptImpl taskAttempt, TaskAttemptEvent event) { TaskAttemptContext taskContext = new TaskAttemptContextImpl(taskAttempt.conf, TypeConverter.fromYarn(taskAttempt.attemptId)); taskAttempt.eventHandler.handle(new CommitterTaskAbortEvent( taskAttempt.attemptId, taskContext)); }
public void initialize(InputSplit genericSplit, TaskAttemptContext context) {
  try {
    FileSplit split = (FileSplit) genericSplit;
    Configuration job = context.getConfiguration();
    this.maxLineLength = job.getInt(
        "mapreduce.input.linerecordreader.line.maxlength", Integer.MAX_VALUE);
    this.start = split.getStart();
    this.end = this.start + split.getLength();
    Path file = split.getPath();
    FileSystem fs = file.getFileSystem(job);
    this.fileIn = fs.open(file);

    CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);
    if (null != codec) {
      this.isCompressedInput = true;
      this.decompressor = CodecPool.getDecompressor(codec);
      if (codec instanceof SplittableCompressionCodec) {
        SplitCompressionInputStream cIn =
            ((SplittableCompressionCodec) codec).createInputStream(
                this.fileIn, this.decompressor, this.start, this.end,
                SplittableCompressionCodec.READ_MODE.BYBLOCK);
        this.in = new CompressedSplitLineReader(cIn, job,
            this.recordDelimiterBytes);
        this.start = cIn.getAdjustedStart();
        this.end = cIn.getAdjustedEnd();
        this.filePosition = cIn;
      } else {
        this.in = new SplitLineReader(
            codec.createInputStream(this.fileIn, this.decompressor), job,
            this.recordDelimiterBytes);
        this.filePosition = this.fileIn;
      }
    } else {
      this.fileIn.seek(this.start);
      this.in = new SplitLineReader(this.fileIn, job,
          this.recordDelimiterBytes);
      this.filePosition = this.fileIn;
    }

    if (this.start != 0L) {
      this.start += this.in.readLine(new Text(), 0,
          this.maxBytesToConsume(this.start));
    }
    this.pos = this.start;
  } catch (Exception ex) {
    LOG.warn("Exception occurred during initialization {}", ex, ex);
  }
}
@Override
public RecordWriter<NullWritable, BytesWritable> getRecordWriter(
    TaskAttemptContext job) throws IOException {
  return new ChunkWriter(getDefaultWorkFile(job, ""), job.getConfiguration());
}
/**
 * Implementation for RecordReader::initialize(). Initializes the internal
 * RecordReader to read from chunks.
 *
 * @param inputSplit The InputSplit for the map. Ignored entirely.
 * @param taskAttemptContext The AttemptContext.
 * @throws IOException on failure.
 * @throws InterruptedException
 */
@Override
public void initialize(InputSplit inputSplit,
    TaskAttemptContext taskAttemptContext)
    throws IOException, InterruptedException {
  numRecordsPerChunk = DynamicInputFormat.getNumEntriesPerChunk(
      taskAttemptContext.getConfiguration());
  this.taskAttemptContext = taskAttemptContext;
  configuration = taskAttemptContext.getConfiguration();
  taskId = taskAttemptContext.getTaskAttemptID().getTaskID();
  chunk = DynamicInputChunk.acquire(this.taskAttemptContext);
  timeOfLastChunkDirScan = System.currentTimeMillis();
  isChunkDirAlreadyScanned = false;

  totalNumRecords = getTotalNumRecords();
}
public ArrayList<String> readRecords(URL testFileUrl, int splitSize)
    throws IOException {
  // Set up context
  File testFile = new File(testFileUrl.getFile());
  long testFileSize = testFile.length();
  Path testFilePath = new Path(testFile.getAbsolutePath());
  Configuration conf = new Configuration();
  conf.setInt("io.file.buffer.size", 1);
  TaskAttemptContext context =
      new TaskAttemptContextImpl(conf, new TaskAttemptID());

  // Gather the records returned by the record reader
  ArrayList<String> records = new ArrayList<String>();

  long offset = 0;
  while (offset < testFileSize) {
    FileSplit split = new FileSplit(testFilePath, offset, splitSize, null);
    LineRecordReader reader = new LineRecordReader();
    reader.initialize(split, context);

    while (reader.nextKeyValue()) {
      records.add(reader.getCurrentValue().toString());
    }
    offset += splitSize;
  }
  return records;
}
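// Hedged usage sketch (not from the original source): one way the readRecords
// helper above might be exercised in a test -- reading the same file with
// several split sizes and checking that record boundaries are preserved.
// The resource name "testdata/lines.txt" is a hypothetical test fixture.
@Test
public void testRecordsIndependentOfSplitSize() throws IOException {
  URL testFileUrl =
      getClass().getClassLoader().getResource("testdata/lines.txt");
  ArrayList<String> wholeFile = readRecords(testFileUrl, Integer.MAX_VALUE);
  for (int splitSize : new int[] { 1, 10, 100 }) {
    // Every split size should yield exactly the same records.
    assertEquals(wholeFile, readRecords(testFileUrl, splitSize));
  }
}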
public void initialize(InputSplit split, TaskAttemptContext context)
    throws IOException, InterruptedException {
  Path path = ((FileSplit) split).getPath();
  Configuration conf = context.getConfiguration();
  FileSystem fs = path.getFileSystem(conf);
  this.in = new SequenceFile.Reader(fs, path, conf);
  this.end = ((FileSplit) split).getStart() + split.getLength();

  if (((FileSplit) split).getStart() > in.getPosition()) {
    in.sync(((FileSplit) split).getStart()); // sync to start
  }

  this.start = in.getPosition();
  vbytes = in.createValueBytes();
  done = start >= end;
}
/**
 * From each split sampled, take the first numSamples / numSplits records.
 */
@SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
public K[] getSample(InputFormat<K, V> inf, Job job)
    throws IOException, InterruptedException {
  List<InputSplit> splits = inf.getSplits(job);
  ArrayList<K> samples = new ArrayList<K>(numSamples);
  int splitsToSample = Math.min(maxSplitsSampled, splits.size());
  int samplesPerSplit = numSamples / splitsToSample;
  long records = 0;
  for (int i = 0; i < splitsToSample; ++i) {
    TaskAttemptContext samplingContext = new TaskAttemptContextImpl(
        job.getConfiguration(), new TaskAttemptID());
    RecordReader<K, V> reader = inf.createRecordReader(
        splits.get(i), samplingContext);
    reader.initialize(splits.get(i), samplingContext);
    while (reader.nextKeyValue()) {
      samples.add(ReflectionUtils.copy(job.getConfiguration(),
          reader.getCurrentKey(), null));
      ++records;
      if ((i + 1) * samplesPerSplit <= records) {
        break;
      }
    }
    reader.close();
  }
  return (K[]) samples.toArray();
}
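// Hedged usage sketch (assumption, not part of the sampler above): how a
// split sampler is typically wired into a total-order sort job via
// InputSampler and TotalOrderPartitioner. The partition-file path, key/value
// types, and sample counts are illustrative only.
private static void writeTotalOrderPartitionFile(Configuration conf)
    throws IOException, ClassNotFoundException, InterruptedException {
  Job job = Job.getInstance(conf, "total-order-sort");
  job.setInputFormatClass(SequenceFileInputFormat.class);
  job.setPartitionerClass(TotalOrderPartitioner.class);
  TotalOrderPartitioner.setPartitionFile(job.getConfiguration(),
      new Path("/tmp/partitions.lst"));

  // Sample up to 1000 keys from at most 10 splits, then write the
  // partition file consumed by TotalOrderPartitioner.
  InputSampler.Sampler<Text, Text> sampler =
      new InputSampler.SplitSampler<Text, Text>(1000, 10);
  InputSampler.writePartitionFile(job, sampler);
}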
protected void updateBatchSizeInConfigurationToAllowOracleAppendValuesHint(
    TaskAttemptContext context) {
  Configuration conf = context.getConfiguration();

  // If using APPEND_VALUES, check the batch size and commit frequency...
  int originalBatchesPerCommit =
      conf.getInt(AsyncSqlOutputFormat.STATEMENTS_PER_TRANSACTION_KEY, 0);
  if (originalBatchesPerCommit != 1) {
    conf.setInt(AsyncSqlOutputFormat.STATEMENTS_PER_TRANSACTION_KEY, 1);
    LOG.info(String.format(
        "The number of batch-inserts to perform per commit has been "
            + "changed from %d to %d. This is in response "
            + "to the Oracle APPEND_VALUES hint being used.",
        originalBatchesPerCommit, 1));
  }

  int originalBatchSize =
      conf.getInt(AsyncSqlOutputFormat.RECORDS_PER_STATEMENT_KEY, 0);
  int minAppendValuesBatchSize =
      OraOopUtilities.getMinAppendValuesBatchSize(conf);
  if (originalBatchSize < minAppendValuesBatchSize) {
    conf.setInt(AsyncSqlOutputFormat.RECORDS_PER_STATEMENT_KEY,
        minAppendValuesBatchSize);
    LOG.info(String.format(
        "The number of rows per batch-insert has been changed from %d "
            + "to %d. This is in response "
            + "to the Oracle APPEND_VALUES hint being used.",
        originalBatchSize, minAppendValuesBatchSize));
  }
}
public OraOopDBRecordWriterInsert(TaskAttemptContext context, int mapperId,
    InsertMode insertMode, boolean useAppendValuesOracleHint)
    throws ClassNotFoundException, SQLException {
  super(context, mapperId);
  this.insertMode = insertMode;
  this.useAppendValuesOracleHint = useAppendValuesOracleHint;
}
@Private
public void commitTask(TaskAttemptContext context, Path taskAttemptPath)
    throws IOException {
  TaskAttemptID attemptId = context.getTaskAttemptID();
  if (hasOutputPath()) {
    context.progress();
    if (taskAttemptPath == null) {
      taskAttemptPath = getTaskAttemptPath(context);
    }
    FileSystem fs = taskAttemptPath.getFileSystem(context.getConfiguration());
    FileStatus taskAttemptDirStatus;
    try {
      taskAttemptDirStatus = fs.getFileStatus(taskAttemptPath);
    } catch (FileNotFoundException e) {
      taskAttemptDirStatus = null;
    }

    if (taskAttemptDirStatus != null) {
      if (algorithmVersion == 1) {
        Path committedTaskPath = getCommittedTaskPath(context);
        if (fs.exists(committedTaskPath)) {
          if (!fs.delete(committedTaskPath, true)) {
            throw new IOException("Could not delete " + committedTaskPath);
          }
        }
        if (!fs.rename(taskAttemptPath, committedTaskPath)) {
          throw new IOException("Could not rename " + taskAttemptPath
              + " to " + committedTaskPath);
        }
        LOG.info("Saved output of task '" + attemptId + "' to "
            + committedTaskPath);
      } else {
        // directly merge everything from taskAttemptPath to output directory
        mergePaths(fs, taskAttemptDirStatus, outputPath);
        LOG.info("Saved output of task '" + attemptId + "' to " + outputPath);
      }
    } else {
      LOG.warn("No Output found for " + attemptId);
    }
  } else {
    LOG.warn("Output Path is null in commitTask()");
  }
}
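// Hedged sketch (assumption): the v1/v2 branch above is normally selected
// through the committer's algorithm-version property rather than in code.
// With version 2, task output is merged straight into the job output
// directory at task-commit time, as in the else-branch above.
private static void useV2CommitAlgorithm(Job job) {
  job.getConfiguration().setInt(
      "mapreduce.fileoutputcommitter.algorithm.version", 2);
}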
private void exchangePartitionUniqueMapperTableDataIntoMainExportTable(
    TaskAttemptContext context) throws SQLException {
  String schema =
      context.getConfiguration().get(OraOopConstants.ORAOOP_TABLE_OWNER);
  String localTableName =
      context.getConfiguration().get(OraOopConstants.ORAOOP_TABLE_NAME);
  OracleTable mainTable = new OracleTable(schema, localTableName);

  try {
    long start = System.nanoTime();

    OraOopOracleQueries.exchangeSubpartition(this.getConnection(),
        mainTable, this.subPartitionName, this.oracleTable);

    double timeInSec = (System.nanoTime() - start) / Math.pow(10, 9);
    LOG.info(String.format(
        "Time spent performing an \"exchange subpartition with "
            + "table\": %f sec.", timeInSec));
  } catch (SQLException ex) {
    throw new SQLException(String.format(
        "Unable to perform an \"exchange subpartition\" operation "
            + "for the table %s, for the subpartition named "
            + "\"%s\" with the table named \"%s\".",
        mainTable.toString(), this.subPartitionName,
        this.oracleTable.toString()), ex);
  }
}
@Override
public void initialize(InputSplit split, TaskAttemptContext context)
    throws IOException, InterruptedException {
  this.context = context;
  getCounter =
      TableRecordReaderImpl.retrieveGetCounterWithStringsParams(context);
  delegate.initialize(((TableSnapshotRegionSplit) split).delegate,
      context.getConfiguration());
}
public void initialize(InputSplit split, TaskAttemptContext context)
    throws IOException, InterruptedException {
  // it really should be the same file split at the time the wrapper instance
  // was created
  assert fileSplitIsValid(context);

  delegate.initialize(fileSplit, context);
}
/**
 * Construct a CompositeRecordReader for the children of this InputFormat
 * as defined in the init expression.
 * The outermost join need only be composable, not necessarily a composite.
 * Mandating TupleWritable isn't strictly correct.
 */
@SuppressWarnings("unchecked") // child types unknown
public RecordReader<K, TupleWritable> createRecordReader(InputSplit split,
    TaskAttemptContext taskContext) throws IOException, InterruptedException {
  setFormat(taskContext.getConfiguration());
  return root.createRecordReader(split, taskContext);
}
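// Hedged usage sketch (assumption): the "init expression" consumed by
// setFormat() above is usually built with CompositeInputFormat.compose() and
// stored under CompositeInputFormat.JOIN_EXPR. The join operation, child
// input format, and paths below are illustrative only.
private static void configureInnerJoin(Job job, Path left, Path right) {
  job.setInputFormatClass(CompositeInputFormat.class);
  job.getConfiguration().set(CompositeInputFormat.JOIN_EXPR,
      CompositeInputFormat.compose("inner",
          KeyValueTextInputFormat.class, left, right));
}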
public RecordReader<Void, Text> createRecordReader(InputSplit genericSplit,
    TaskAttemptContext context) throws IOException, InterruptedException {
  context.setStatus(genericSplit.toString());
  // cast as per example in TextInputFormat
  return new SingleFastqRecordReader(context.getConfiguration(),
      (FileSplit) genericSplit);
}
@Override
/** {@inheritDoc} */
public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
    throws IOException {
  try {
    return new OracleExportRecordWriter<K, V>(context);
  } catch (Exception e) {
    throw new IOException(e);
  }
}
@Override
/** {@inheritDoc} */
public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
    throws IOException {
  try {
    return new CubridUpsertRecordWriter(context);
  } catch (Exception e) {
    throw new IOException(e);
  }
}
/**
 * Get the default path and filename for the output format.
 * @param context the task context
 * @param extension an extension to add to the filename
 * @return a full path $output/_temporary/$taskid/part-[mr]-$id
 * @throws IOException
 */
public Path getDefaultWorkFile(TaskAttemptContext context,
    String extension) throws IOException {
  FileOutputCommitter committer =
      (FileOutputCommitter) getOutputCommitter(context);
  return new Path(committer.getWorkPath(), getUniqueFile(context,
      getOutputName(context), extension));
}
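// Hedged sketch (assumption): a minimal custom FileOutputFormat showing the
// typical call site for getDefaultWorkFile() inside getRecordWriter(). The
// class name and the "key<TAB>value" line format are illustrative only.
public static class TabSeparatedOutputFormat<K, V>
    extends FileOutputFormat<K, V> {
  @Override
  public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
      throws IOException {
    // Resolve the per-task work file under the committer's work path.
    Path file = getDefaultWorkFile(context, ".txt");
    FileSystem fs = file.getFileSystem(context.getConfiguration());
    final FSDataOutputStream out = fs.create(file, false);
    return new RecordWriter<K, V>() {
      @Override
      public void write(K key, V value) throws IOException {
        out.writeBytes(key + "\t" + value + "\n");
      }
      @Override
      public void close(TaskAttemptContext ctx) throws IOException {
        out.close();
      }
    };
  }
}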
@Override
/** {@inheritDoc} */
public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
    throws IOException {
  try {
    return new DelegatingRecordWriter(context);
  } catch (ClassNotFoundException cnfe) {
    throw new IOException(cnfe);
  }
}
public DelegatingRecordWriter(TaskAttemptContext context)
    throws ClassNotFoundException {
  this.conf = context.getConfiguration();

  @SuppressWarnings("unchecked")
  Class<? extends FieldMapProcessor> procClass =
      (Class<? extends FieldMapProcessor>)
      conf.getClass(DELEGATE_CLASS_KEY, null);
  this.mapProcessor = ReflectionUtils.newInstance(procClass, this.conf);
}
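// Hedged usage sketch (assumption): the delegate processor resolved above is
// chosen by setting DELEGATE_CLASS_KEY on the job before the output format is
// used. "MyFieldMapProcessor" is a hypothetical FieldMapProcessor
// implementation; the enclosing DelegatingOutputFormat class is assumed to
// expose the key as a public constant.
private static void configureDelegate(Job job) {
  job.setOutputFormatClass(DelegatingOutputFormat.class);
  job.getConfiguration().setClass(DelegatingOutputFormat.DELEGATE_CLASS_KEY,
      MyFieldMapProcessor.class, FieldMapProcessor.class);
}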
/**
 * Validate setupGenerateDistCacheData by validating <li>permissions of the
 * distributed cache directories and <li>content of the generated sequence
 * file. This includes validation of dist cache file paths and their file
 * sizes.
 */
private void validateSetupGenDC(Configuration jobConf, long[] sortedFileSizes)
    throws IOException, InterruptedException {

  // build things needed for validation
  long sumOfFileSizes = 0;
  for (int i = 0; i < sortedFileSizes.length; i++) {
    sumOfFileSizes += sortedFileSizes[i];
  }

  FileSystem fs = FileSystem.get(jobConf);
  assertEquals("Number of distributed cache files to be generated is wrong.",
      sortedFileSizes.length,
      jobConf.getInt(GenerateDistCacheData.GRIDMIX_DISTCACHE_FILE_COUNT, -1));
  assertEquals("Total size of dist cache files to be generated is wrong.",
      sumOfFileSizes,
      jobConf.getLong(GenerateDistCacheData.GRIDMIX_DISTCACHE_BYTE_COUNT, -1));
  Path filesListFile = new Path(
      jobConf.get(GenerateDistCacheData.GRIDMIX_DISTCACHE_FILE_LIST));
  FileStatus stat = fs.getFileStatus(filesListFile);
  assertEquals("Wrong permissions of dist Cache files list file "
      + filesListFile, new FsPermission((short) 0644), stat.getPermission());

  InputSplit split = new FileSplit(filesListFile, 0, stat.getLen(),
      (String[]) null);
  TaskAttemptContext taskContext = MapReduceTestUtil
      .createDummyMapTaskAttemptContext(jobConf);
  RecordReader<LongWritable, BytesWritable> reader =
      new GenerateDistCacheData.GenDCDataFormat()
          .createRecordReader(split, taskContext);
  MapContext<LongWritable, BytesWritable, NullWritable, BytesWritable>
      mapContext = new MapContextImpl<LongWritable, BytesWritable,
          NullWritable, BytesWritable>(jobConf,
          taskContext.getTaskAttemptID(), reader, null, null,
          MapReduceTestUtil.createDummyReporter(), split);
  reader.initialize(split, mapContext);

  // start validating setupGenerateDistCacheData
  doValidateSetupGenDC(reader, fs, sortedFileSizes);
}
protected CombineFileRecordReaderWrapper(FileInputFormat<K, V> inputFormat,
    CombineFileSplit split, TaskAttemptContext context, Integer idx)
    throws IOException, InterruptedException {
  fileSplit = new FileSplit(split.getPath(idx),
      split.getOffset(idx),
      split.getLength(idx),
      split.getLocations());

  delegate = inputFormat.createRecordReader(fileSplit, context);
}
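// Hedged sketch (assumption): how a wrapper like the one above is normally
// used. A concrete subclass fixes the underlying input format, and a
// CombineFileInputFormat hands that subclass to CombineFileRecordReader,
// which instantiates it once per file in the combined split. Class names
// below are illustrative only.
public static class TextWrapper
    extends CombineFileRecordReaderWrapper<LongWritable, Text> {
  public TextWrapper(CombineFileSplit split, TaskAttemptContext context,
      Integer idx) throws IOException, InterruptedException {
    super(new TextInputFormat(), split, context, idx);
  }
}

public static class CombinedTextInputFormat
    extends CombineFileInputFormat<LongWritable, Text> {
  @Override
  public RecordReader<LongWritable, Text> createRecordReader(
      InputSplit split, TaskAttemptContext context) throws IOException {
    return new CombineFileRecordReader<LongWritable, Text>(
        (CombineFileSplit) split, context, TextWrapper.class);
  }
}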
@Private
public boolean needsTaskCommit(TaskAttemptContext context,
    Path taskAttemptPath) throws IOException {
  if (hasOutputPath()) {
    if (taskAttemptPath == null) {
      taskAttemptPath = getTaskAttemptPath(context);
    }
    FileSystem fs = taskAttemptPath.getFileSystem(context.getConfiguration());
    return fs.exists(taskAttemptPath);
  }
  return false;
}
@Test
public void testRecordReaderInit() throws InterruptedException, IOException {
  // Test that we properly initialize the child recordreader when
  // CombineFileInputFormat and CombineFileRecordReader are used.

  TaskAttemptID taskId = new TaskAttemptID("jt", 0, TaskType.MAP, 0, 0);
  Configuration conf1 = new Configuration();
  conf1.set(DUMMY_KEY, "STATE1");
  TaskAttemptContext context1 = new TaskAttemptContextImpl(conf1, taskId);

  // This will create a CombineFileRecordReader that itself contains a
  // DummyRecordReader.
  InputFormat inputFormat = new ChildRRInputFormat();

  Path[] files = { new Path("file1") };
  long[] lengths = { 1 };

  CombineFileSplit split = new CombineFileSplit(files, lengths);

  RecordReader rr = inputFormat.createRecordReader(split, context1);
  assertTrue("Unexpected RR type!", rr instanceof CombineFileRecordReader);

  // Verify that the initial configuration is the one being used.
  // Right after construction the dummy key should have value "STATE1"
  assertEquals("Invalid initial dummy key value", "STATE1",
      rr.getCurrentKey().toString());

  // Switch the active context for the RecordReader...
  Configuration conf2 = new Configuration();
  conf2.set(DUMMY_KEY, "STATE2");
  TaskAttemptContext context2 = new TaskAttemptContextImpl(conf2, taskId);
  rr.initialize(split, context2);

  // And verify that the new context is updated into the child record reader.
  assertEquals("Invalid secondary dummy key value", "STATE2",
      rr.getCurrentKey().toString());
}
@Override
/** {@inheritDoc} */
public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
    throws IOException {
  try {
    return new ExportCallRecordWriter(context);
  } catch (Exception e) {
    throw new IOException(e);
  }
}
@Override
/** {@inheritDoc} */
public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
    throws IOException {
  try {
    return new SQLServerUpdateRecordWriter(context);
  } catch (Exception e) {
    throw new IOException(e);
  }
}
@Override
public void initialize(InputSplit inputSplit,
    TaskAttemptContext taskAttemptContext)
    throws IOException, InterruptedException {
  split = (MainframeDatasetInputSplit) inputSplit;
  conf = taskAttemptContext.getConfiguration();
  inputClass = (Class<T>) (conf.getClass(
      DBConfiguration.INPUT_CLASS_PROPERTY, null));
  key = null;
  datasetRecord = null;
  numberRecordRead = 0;
  datasetProcessed = 0;
}
/**
 * Constructor invoked by CombineFileRecordReader that identifies part of a
 * CombineFileSplit to use.
 */
public CombineShimRecordReader(CombineFileSplit split,
    TaskAttemptContext context, Integer index)
    throws IOException, InterruptedException {
  this.index = index;
  this.split = (CombineFileSplit) split;
  this.context = context;

  createChildReader();
}