@SuppressWarnings("unchecked") public void configure(JobConf jobConf) { int numberOfThreads = jobConf.getInt(MultithreadedMapper.NUM_THREADS, 10); if (LOG.isDebugEnabled()) { LOG.debug("Configuring jobConf " + jobConf.getJobName() + " to use " + numberOfThreads + " threads"); } this.job = jobConf; //increment processed counter only if skipping feature is enabled this.incrProcCount = SkipBadRecords.getMapperMaxSkipRecords(job)>0 && SkipBadRecords.getAutoIncrMapperProcCount(job); this.mapper = ReflectionUtils.newInstance(jobConf.getMapperClass(), jobConf); // Creating a threadpool of the configured size to execute the Mapper // map method in parallel. executorService = new ThreadPoolExecutor(numberOfThreads, numberOfThreads, 0L, TimeUnit.MILLISECONDS, new BlockingArrayQueue (numberOfThreads)); }
public void configure(JobConf job) { super.configure(job); //disable the auto increment of the counter. For streaming, no of //processed records could be different(equal or less) than the no of //records input. SkipBadRecords.setAutoIncrReducerProcCount(job, false); skipping = job.getBoolean(MRJobConfig.SKIP_RECORDS, false); try { reduceOutFieldSeparator = job_.get("stream.reduce.output.field.separator", "\t").getBytes("UTF-8"); reduceInputFieldSeparator = job_.get("stream.reduce.input.field.separator", "\t").getBytes("UTF-8"); this.numOfReduceOutputKeyFields = job_.getInt("stream.num.reduce.output.key.fields", 1); } catch (UnsupportedEncodingException e) { throw new RuntimeException("The current system does not support UTF-8 encoding!", e); } }
public void configure(JobConf job) { super.configure(job); //disable the auto increment of the counter. For streaming, no of //processed records could be different(equal or less) than the no of //records input. SkipBadRecords.setAutoIncrMapperProcCount(job, false); skipping = job.getBoolean(MRJobConfig.SKIP_RECORDS, false); if (mapInputWriterClass_.getCanonicalName().equals(TextInputWriter.class.getCanonicalName())) { String inputFormatClassName = job.getClass("mapred.input.format.class", TextInputFormat.class).getCanonicalName(); ignoreKey = job.getBoolean("stream.map.input.ignoreKey", inputFormatClassName.equals(TextInputFormat.class.getCanonicalName())); } try { mapOutputFieldSeparator = job.get("stream.map.output.field.separator", "\t").getBytes("UTF-8"); mapInputFieldSeparator = job.get("stream.map.input.field.separator", "\t").getBytes("UTF-8"); numOfMapOutputKeyFields = job.getInt("stream.num.map.output.key.fields", 1); } catch (UnsupportedEncodingException e) { throw new RuntimeException("The current system does not support UTF-8 encoding!", e); } }
/**
 * Reads lines from standard input and passes each one to {@code processLine}.
 * After every ten lines it writes a line of the form
 * {@code reporter:counter:<group>,<counter>,<count>} to standard error.
 *
 * @param args optional; when present, {@code args[0]} is parsed as a boolean
 *     that selects reducer mode, which reports the processed-groups counter
 *     instead of the map processed-records counter
 * @throws Exception if reading standard input or processing a line fails
 */
public App(String[] args) throws Exception {
  if (args.length > 0) {
    isReducer = Boolean.parseBoolean(args[0]);
  }
  final String counterName = isReducer
      ? SkipBadRecords.COUNTER_REDUCE_PROCESSED_GROUPS
      : SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS;

  final BufferedReader stdin = new BufferedReader(new InputStreamReader(System.in));
  int sinceLastReport = 0;
  for (String line = stdin.readLine(); line != null; line = stdin.readLine()) {
    processLine(line);
    // Report in batches of ten; a trailing partial batch is not reported.
    if (++sinceLastReport >= 10) {
      System.err.println("reporter:counter:" + SkipBadRecords.COUNTER_GROUP
          + "," + counterName + "," + sinceLastReport);
      sinceLastReport = 0;
    }
  }
}
@SuppressWarnings("unchecked") public void configure(JobConf jobConf) { int numberOfThreads = jobConf.getInt("mapred.map.multithreadedrunner.threads", 10); if (LOG.isDebugEnabled()) { LOG.debug("Configuring jobConf " + jobConf.getJobName() + " to use " + numberOfThreads + " threads"); } this.job = jobConf; //increment processed counter only if skipping feature is enabled this.incrProcCount = SkipBadRecords.getMapperMaxSkipRecords(job)>0 && SkipBadRecords.getAutoIncrMapperProcCount(job); this.mapper = ReflectionUtils.newInstance(jobConf.getMapperClass(), jobConf); // Creating a threadpool of the configured size to execute the Mapper // map method in parallel. executorService = new ThreadPoolExecutor(numberOfThreads, numberOfThreads, 0L, TimeUnit.MILLISECONDS, new BlockingArrayQueue (numberOfThreads)); }
public void configure(JobConf job) { super.configure(job); //disable the auto increment of the counter. For streaming, no of //processed records could be different(equal or less) than the no of //records input. SkipBadRecords.setAutoIncrReducerProcCount(job, false); skipping = job.getBoolean("mapred.skip.on", false); try { reduceOutFieldSeparator = job_.get("stream.reduce.output.field.separator", "\t").getBytes("UTF-8"); reduceInputFieldSeparator = job_.get("stream.reduce.input.field.separator", "\t").getBytes("UTF-8"); this.numOfReduceOutputKeyFields = job_.getInt("stream.num.reduce.output.key.fields", 1); } catch (UnsupportedEncodingException e) { throw new RuntimeException("The current system does not support UTF-8 encoding!", e); } }
public void configure(JobConf job) { super.configure(job); //disable the auto increment of the counter. For streaming, no of //processed records could be different(equal or less) than the no of //records input. SkipBadRecords.setAutoIncrMapperProcCount(job, false); skipping = job.getBoolean("mapred.skip.on", false); if (mapInputWriterClass_.getCanonicalName().equals(TextInputWriter.class.getCanonicalName())) { String inputFormatClassName = job.getClass("mapred.input.format.class", TextInputFormat.class).getCanonicalName(); ignoreKey = job.getBoolean("stream.map.input.ignoreKey", inputFormatClassName.equals(TextInputFormat.class.getCanonicalName())); } try { mapOutputFieldSeparator = job.get("stream.map.output.field.separator", "\t").getBytes("UTF-8"); mapInputFieldSeparator = job.get("stream.map.input.field.separator", "\t").getBytes("UTF-8"); numOfMapOutputKeyFields = job.getInt("stream.num.map.output.key.fields", 1); } catch (UnsupportedEncodingException e) { throw new RuntimeException("The current system does not support UTF-8 encoding!", e); } }
@SuppressWarnings("unchecked") public void configure(JobConf job) { super.configure(job); //disable the auto increment of the counter. For streaming, no of //processed records could be different(equal or less) than the no of //records input. SkipBadRecords.setAutoIncrMapperProcCount(job, false); skipping = job.getBoolean("mapred.skip.on", false); String inputFormatClassName = job.getClass("mapred.input.format.class", TextInputFormat.class).getCanonicalName(); ignoreKey = ignoreKey || inputFormatClassName.equals(TextInputFormat.class.getCanonicalName()); try { mapOutputFieldSeparator = job.get("stream.map.output.field.separator", "\t").getBytes("UTF-8"); mapInputFieldSeparator = job.get("stream.map.input.field.separator", "\t").getBytes("UTF-8"); numOfMapOutputKeyFields = job.getInt("stream.num.map.output.key.fields", 1); } catch (UnsupportedEncodingException e) { throw new RuntimeException("The current system does not support UTF-8 encoding!", e); } }
public void configure(JobConf job) { super.configure(job); //disable the auto increment of the counter. For streaming, no of //processed records could be different(equal or less) than the no of //records input. SkipBadRecords.setAutoIncrMapperProcCount(job, false); skipping = job.getBoolean("mapred.skip.on", false); if (mapInputWriterClass_.getCanonicalName().equals(TextInputWriter.class.getCanonicalName())) { String inputFormatClassName = job.getClass("mapred.input.format.class", TextInputFormat.class).getCanonicalName(); ignoreKey = inputFormatClassName.equals(TextInputFormat.class.getCanonicalName()); } try { mapOutputFieldSeparator = job.get("stream.map.output.field.separator", "\t").getBytes("UTF-8"); mapInputFieldSeparator = job.get("stream.map.input.field.separator", "\t").getBytes("UTF-8"); numOfMapOutputKeyFields = job.getInt("stream.num.map.output.key.fields", 1); } catch (UnsupportedEncodingException e) { throw new RuntimeException("The current system does not support UTF-8 encoding!", e); } }