/** * Add mapper(the first mapper) that reads input from the input * context and writes to queue */ @SuppressWarnings("unchecked") void addMapper(TaskInputOutputContext inputContext, ChainBlockingQueue<KeyValuePair<?, ?>> output, int index) throws IOException, InterruptedException { Configuration conf = getConf(index); Class<?> keyOutClass = conf.getClass(MAPPER_OUTPUT_KEY_CLASS, Object.class); Class<?> valueOutClass = conf.getClass(MAPPER_OUTPUT_VALUE_CLASS, Object.class); RecordReader rr = new ChainRecordReader(inputContext); RecordWriter rw = new ChainRecordWriter(keyOutClass, valueOutClass, output, conf); Mapper.Context mapperContext = createMapContext(rr, rw, (MapContext) inputContext, getConf(index)); MapRunner runner = new MapRunner(mappers.get(index), mapperContext, rr, rw); threads.add(runner); }
/** * Add mapper that reads and writes from/to the queue */ @SuppressWarnings("unchecked") void addMapper(ChainBlockingQueue<KeyValuePair<?, ?>> input, ChainBlockingQueue<KeyValuePair<?, ?>> output, TaskInputOutputContext context, int index) throws IOException, InterruptedException { Configuration conf = getConf(index); Class<?> keyClass = conf.getClass(MAPPER_INPUT_KEY_CLASS, Object.class); Class<?> valueClass = conf.getClass(MAPPER_INPUT_VALUE_CLASS, Object.class); Class<?> keyOutClass = conf.getClass(MAPPER_OUTPUT_KEY_CLASS, Object.class); Class<?> valueOutClass = conf.getClass(MAPPER_OUTPUT_VALUE_CLASS, Object.class); RecordReader rr = new ChainRecordReader(keyClass, valueClass, input, conf); RecordWriter rw = new ChainRecordWriter(keyOutClass, valueOutClass, output, conf); MapRunner runner = new MapRunner(mappers.get(index), createMapContext(rr, rw, context, getConf(index)), rr, rw); threads.add(runner); }
/** Compute sigma */ static void compute(Summation sigma, TaskInputOutputContext<?, ?, NullWritable, TaskResult> context ) throws IOException, InterruptedException { String s; LOG.info(s = "sigma=" + sigma); context.setStatus(s); final long start = System.currentTimeMillis(); sigma.compute(); final long duration = System.currentTimeMillis() - start; final TaskResult result = new TaskResult(sigma, duration); LOG.info(s = "result=" + result); context.setStatus(s); context.write(NullWritable.get(), result); }
ResourceUsageMatcherRunner(final TaskInputOutputContext context, ResourceUsageMetrics metrics) { Configuration conf = context.getConfiguration(); // set the resource calculator plugin Class<? extends ResourceCalculatorPlugin> clazz = conf.getClass(TTConfig.TT_RESOURCE_CALCULATOR_PLUGIN, null, ResourceCalculatorPlugin.class); ResourceCalculatorPlugin plugin = ResourceCalculatorPlugin.getResourceCalculatorPlugin(clazz, conf); // set the other parameters this.sleepTime = conf.getLong(SLEEP_CONFIG, DEFAULT_SLEEP_TIME); progress = new BoostingProgress(context); // instantiate a resource-usage-matcher matcher = new ResourceUsageMatcher(); matcher.configure(conf, plugin, metrics, progress); }
public static String downloadGFF(TaskInputOutputContext context) throws IOException, URISyntaxException, InterruptedException { Configuration conf = context.getConfiguration(); String gff = HalvadeConf.getGff(context.getConfiguration()); if(gff == null) return null; Boolean refIsLocal = HalvadeConf.getRefIsLocal(context.getConfiguration()); if(refIsLocal) return gff; String refDir = HalvadeConf.getScratchTempDir(conf); if(!refDir.endsWith("/")) refDir = refDir + "/"; String gffSuffix = null; int si = gff.lastIndexOf('.'); if (si > 0) gffSuffix = gff.substring(si); else throw new InterruptedException("Illegal filename for gff file: " + gff); Logger.DEBUG("suffix: " + gffSuffix); HalvadeFileLock lock = new HalvadeFileLock(refDir, HalvadeFileConstants.GFF_LOCK); String filebase = gff.substring(gff.lastIndexOf("/")+1).replace(gffSuffix, ""); FileSystem fs = FileSystem.get(new URI(gff), conf); downloadFileWithLock(fs, lock, gff, refDir + filebase + gffSuffix, context.getConfiguration()); return refDir + filebase + gffSuffix; }
public static long rebuildStarGenome(TaskInputOutputContext context, String bin, String newGenomeDir, String ref, String SJouttab, int sjoverhang, int threads, long mem, String stargtf) throws InterruptedException { Logger.DEBUG("Creating new genome in " + newGenomeDir); String[] command = CommandGenerator.starRebuildGenome(bin, newGenomeDir, ref, SJouttab, sjoverhang, threads, mem, sparseGenome, stargtf); ProcessBuilderWrapper starbuild = new ProcessBuilderWrapper(command, bin); starbuild.startProcess(System.out, System.err); if(!starbuild.isAlive()) throw new ProcessException("STAR rebuild genome", starbuild.getExitState()); int error = starbuild.waitForCompletion(); if(error != 0) throw new ProcessException("STAR aligner load", error); return starbuild.getExecutionTime(); }
/** * Add reducer that reads from context and writes to a queue */ @SuppressWarnings("unchecked") void addReducer(TaskInputOutputContext inputContext, ChainBlockingQueue<KeyValuePair<?, ?>> outputQueue) throws IOException, InterruptedException { Class<?> keyOutClass = rConf.getClass(REDUCER_OUTPUT_KEY_CLASS, Object.class); Class<?> valueOutClass = rConf.getClass(REDUCER_OUTPUT_VALUE_CLASS, Object.class); RecordWriter rw = new ChainRecordWriter(keyOutClass, valueOutClass, outputQueue, rConf); Reducer.Context reducerContext = createReduceContext(rw, (ReduceContext) inputContext, rConf); ReduceRunner runner = new ReduceRunner(reducerContext, reducer, rw); threads.add(runner); }
/** * <p> * Configures the logging for mapreduce (new api) * </p> * * @param logFileDir * Directory at slave node where log files will be created * @param context * Context * @param isMapper * true if mapper * @throws IOException */ @SuppressWarnings("rawtypes") public static void configureLogging(String logFileDir, TaskInputOutputContext context, boolean isMapper) throws IOException { // combiner logs not required. They were logged in mapper log files. if (isMapper || (!isMapper && !context.getConfiguration().getBoolean( "mapred.task.is.map", true))) { initializeJumbuneLog(); try { LoggerUtil.loadLogger(logFileDir, context.getTaskAttemptID() .toString()); } catch (Exception e) { LOGGER.debug( "Error ocurred while loading logger while running instrumented jar", e); } } }