/**
 * Prepares this task for writing: reads the skip-WAL flag from the job
 * configuration and opens a {@link BufferedMutator} on the configured output table.
 *
 * <p>The mutator is created with an exception listener that logs every failed row
 * at WARN level, together with the cause reported for that row. The listener does
 * not rethrow, so a failed put does not abort the task.
 *
 * @param context the task context supplying the job configuration
 * @throws IOException if the superclass setup or the mutator creation fails
 * @throws InterruptedException if the superclass setup is interrupted
 */
@Override
protected void setup(final Context context) throws IOException, InterruptedException {
    super.setup(context);

    final Configuration configuration = context.getConfiguration();
    // Defaults to false when the key is absent from the configuration.
    skipWAL = configuration.getBoolean(Constants.MAPREDUCE_INDEX_SKIP_WAL, false);

    final TableName outputTable =
            TableName.valueOf(configuration.get(TableOutputFormat.OUTPUT_TABLE));

    // Log the underlying cause alongside each failed row; without it the log only
    // says that a put failed, not why.
    final BufferedMutator.ExceptionListener listener = (e, failedMutator) -> {
        for (int i = 0; i < e.getNumExceptions(); i++) {
            LOG.warn("Failed to send put: " + e.getRow(i), e.getCause(i));
        }
    };

    final BufferedMutatorParams mutatorParams =
            new BufferedMutatorParams(outputTable).listener(listener);
    mutator = getGraph().connection().getBufferedMutator(mutatorParams);
}
/**
 * Configures the job for use as a sink: output records are keyed by
 * {@link ImmutableBytesWritable}, carry {@link Put} values, and are written
 * through {@link TableOutputFormat}.
 *
 * @param process the current flow process (not used here)
 * @param tap the tap this scheme is bound to (not used here)
 * @param conf the job configuration to populate
 */
@Override
public void sinkConfInit(
        FlowProcess<JobConf> process,
        Tap<JobConf, RecordReader, OutputCollector> tap,
        JobConf conf) {
    // Key/value types first, then the format that consumes them; the three
    // settings are independent of one another.
    conf.setOutputKeyClass(ImmutableBytesWritable.class);
    conf.setOutputValueClass(Put.class);
    conf.setOutputFormat(TableOutputFormat.class);
}