public void run(Context context) throws IOException, InterruptedException { setup(context); int numMappers = chain.getAllMappers().size(); if (numMappers == 0) { return; } ChainBlockingQueue<Chain.KeyValuePair<?, ?>> inputqueue; ChainBlockingQueue<Chain.KeyValuePair<?, ?>> outputqueue; if (numMappers == 1) { chain.runMapper(context, 0); } else { // add all the mappers with proper context // add first mapper outputqueue = chain.createBlockingQueue(); chain.addMapper(context, outputqueue, 0); // add other mappers for (int i = 1; i < numMappers - 1; i++) { inputqueue = outputqueue; outputqueue = chain.createBlockingQueue(); chain.addMapper(inputqueue, outputqueue, context, i); } // add last mapper chain.addMapper(outputqueue, context, numMappers - 1); } // start all threads chain.startAllThreads(); // wait for all threads chain.joinAllThreads(); }
public void run(Context context) throws IOException, InterruptedException { setup(context); // if no reducer is set, just do nothing if (chain.getReducer() == null) { return; } int numMappers = chain.getAllMappers().size(); // if there are no mappers in chain, run the reducer if (numMappers == 0) { chain.runReducer(context); return; } // add reducer and all mappers with proper context ChainBlockingQueue<Chain.KeyValuePair<?, ?>> inputqueue; ChainBlockingQueue<Chain.KeyValuePair<?, ?>> outputqueue; // add reducer outputqueue = chain.createBlockingQueue(); chain.addReducer(context, outputqueue); // add all mappers except last one for (int i = 0; i < numMappers - 1; i++) { inputqueue = outputqueue; outputqueue = chain.createBlockingQueue(); chain.addMapper(inputqueue, outputqueue, context, i); } // add last mapper chain.addMapper(outputqueue, context, numMappers - 1); // start all threads chain.startAllThreads(); // wait for all threads chain.joinAllThreads(); }