public int run(String[] args) throws Exception {
    System.out.println(Arrays.toString(args));
    /* getting the chunk of data and converting to corresponding Key Value Pairs */
    @SuppressWarnings("deprecation")
    String intermediateFileDir = "tmp";
    String intermediateFileDirFile = intermediateFileDir + "/part-r-00000";

    JobControl control = new JobControl("ChainMapReduce");
    ControlledJob step1 = new ControlledJob(jobListFriends(args[0], intermediateFileDir), null);
    ControlledJob step2 = new ControlledJob(jobRecommendFriends(intermediateFileDirFile, args[1]),
            Arrays.asList(step1));
    control.addJob(step1);
    control.addJob(step2);

    Thread workFlowThread = new Thread(control, "workflowthread");
    workFlowThread.setDaemon(true);
    workFlowThread.start();
    return 0;
}
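Note that the method above starts the JobControl on a daemon thread and returns immediately, so the driver JVM can exit before the chained jobs complete. A minimal sketch of a blocking variant of the same tail, using only the new-API JobControl methods; the polling interval and exit codes are assumptions, not from the original:

// Hypothetical blocking tail for run(): poll until the controlled jobs finish.
Thread workFlowThread = new Thread(control, "workflowthread");
workFlowThread.setDaemon(true);
workFlowThread.start();
while (!control.allFinished()) {
    Thread.sleep(1000);                          // assumed polling interval
}
control.stop();                                  // shuts the monitoring loop down
return control.getFailedJobList().isEmpty() ? 0 : 1;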
@Test
public void testDefaultParallel() throws Throwable {
    pc.defaultParallel = 100;

    String query = "a = load 'input';" +
            "b = group a by $0;" +
            "store b into 'output';";
    PigServer ps = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
    PhysicalPlan pp = Util.buildPp(ps, query);
    MROperPlan mrPlan = Util.buildMRPlan(pp, pc);

    ConfigurationValidator.validatePigProperties(pc.getProperties());
    Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties());
    JobControlCompiler jcc = new JobControlCompiler(pc, conf);
    JobControl jobControl = jcc.compile(mrPlan, "Test");
    Job job = jobControl.getWaitingJobs().get(0);
    int parallel = job.getJobConf().getNumReduceTasks();

    assertEquals(100, parallel);
    Util.assertParallelValues(100, -1, -1, 100, job.getJobConf());

    pc.defaultParallel = -1;
}
/**
 * Test parallelism for group by constant
 * @throws Throwable
 */
@Test
public void testGroupConstWithParallel() throws Throwable {
    PigContext pc = new PigContext(ExecType.MAPREDUCE, cluster.getProperties());
    pc.defaultParallel = 100;
    pc.connect();

    String query = "a = load 'input';\n" +
            "b = group a by 1;" +
            "store b into 'output';";
    PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
    PhysicalPlan pp = Util.buildPp(pigServer, query);
    MROperPlan mrPlan = Util.buildMRPlan(pp, pc);

    ConfigurationValidator.validatePigProperties(pc.getProperties());
    Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties());
    JobControlCompiler jcc = new JobControlCompiler(pc, conf);
    JobControl jobControl = jcc.compile(mrPlan, "Test");
    Job job = jobControl.getWaitingJobs().get(0);
    int parallel = job.getJobConf().getNumReduceTasks();

    assertEquals("parallelism", 1, parallel);
}
/**
 * Test parallelism for group by column
 * @throws Throwable
 */
@Test
public void testGroupNonConstWithParallel() throws Throwable {
    PigContext pc = new PigContext(ExecType.MAPREDUCE, cluster.getProperties());
    pc.defaultParallel = 100;
    pc.connect();

    PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
    String query = "a = load 'input';\n" +
            "b = group a by $0;" +
            "store b into 'output';";
    PhysicalPlan pp = Util.buildPp(pigServer, query);
    MROperPlan mrPlan = Util.buildMRPlan(pp, pc);

    ConfigurationValidator.validatePigProperties(pc.getProperties());
    Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties());
    JobControlCompiler jcc = new JobControlCompiler(pc, conf);
    JobControl jobControl = jcc.compile(mrPlan, "Test");
    Job job = jobControl.getWaitingJobs().get(0);
    int parallel = job.getJobConf().getNumReduceTasks();

    assertEquals("parallelism", 100, parallel);
}
@Test
public void testDefaultParallel() throws Throwable {
    pc.defaultParallel = 100;

    String query = "a = load 'input';" +
            "b = group a by $0;" +
            "store b into 'output';";
    PigServer ps = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
    PhysicalPlan pp = Util.buildPp(ps, query);
    MROperPlan mrPlan = Util.buildMRPlan(pp, pc);

    ConfigurationValidator.validatePigProperties(pc.getProperties());
    Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties());
    JobControlCompiler jcc = new JobControlCompiler(pc, conf);
    JobControl jobControl = jcc.compile(mrPlan, "Test");
    Job job = jobControl.getWaitingJobs().get(0);
    int parallel = job.getJobConf().getNumReduceTasks();

    assertTrue(parallel == 100);

    pc.defaultParallel = -1;
}
public static JobControl createValueAggregatorJobs(String args[],
        Class<? extends ValueAggregatorDescriptor>[] descriptors) throws IOException {
    JobControl theControl = new JobControl("ValueAggregatorJobs");
    ArrayList<Job> dependingJobs = new ArrayList<Job>();
    JobConf aJobConf = createValueAggregatorJob(args);
    if (descriptors != null) {
        setAggregatorDescriptors(aJobConf, descriptors);
    }
    Job aJob = new Job(aJobConf, dependingJobs);
    theControl.addJob(aJob);
    return theControl;
}
public static JobControl createValueAggregatorJobs(String args[],
        Class<? extends ValueAggregatorDescriptor>[] descriptors) throws IOException {
    JobControl theControl = new JobControl("ValueAggregatorJobs");
    ArrayList<Job> dependingJobs = new ArrayList<Job>();
    JobConf aJobConf = createValueAggregatorJob(args, (Class<?>) null);
    if (descriptors != null) {
        setAggregatorDescriptors(aJobConf, descriptors);
    }
    Job aJob = new Job(aJobConf, dependingJobs);
    theControl.addJob(aJob);
    return theControl;
}
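A hedged usage sketch, not from the source, of driving the JobControl returned above: the old mapred-API run() loop only exits once stop() is called, so a caller typically runs it on a thread, polls allFinished(), then stops it. The polling interval and exit codes are assumptions.

// Hypothetical driver, e.g. a main() method (not part of the snippet above).
public static void main(String[] args) throws Exception {
    JobControl control = createValueAggregatorJobs(args, null);
    Thread runner = new Thread(control, "ValueAggregatorJobs");
    runner.start();
    while (!control.allFinished()) {
        Thread.sleep(5000);        // assumed polling interval
    }
    control.stop();                // terminates the monitoring loop
    System.exit(control.getFailedJobs().isEmpty() ? 0 : 1);
}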
/**
 * Compute the progress of the current job submitted
 * through the JobControl object jc to the JobClient jobClient
 * @param jc - The JobControl object that has been submitted
 * @param jobClient - The JobClient to which it has been submitted
 * @return The progress as a percentage in double format
 * @throws IOException
 */
protected double calculateProgress(JobControl jc, JobClient jobClient) throws IOException {
    double prog = 0.0;
    prog += jc.getSuccessfulJobs().size();

    List runnJobs = jc.getRunningJobs();
    for (Object object : runnJobs) {
        Job j = (Job) object;
        prog += progressOfRunningJob(j, jobClient);
    }
    return prog;
}
/**
 * Compute the progress of the current job submitted through the JobControl
 * object jc
 *
 * @param jc
 *            - The JobControl object that has been submitted
 * @return The progress as a percentage in double format
 * @throws IOException
 */
protected double calculateProgress(JobControl jc) throws IOException {
    double prog = 0.0;
    prog += jc.getSuccessfulJobs().size();

    List<Job> runnJobs = jc.getRunningJobs();
    for (Job j : runnJobs) {
        prog += HadoopShims.progressOfRunningJob(j);
    }
    return prog;
}
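Despite the javadoc, the value returned above is really "completed jobs plus fractional progress of the running jobs"; a caller converts it to a percentage by dividing by the total number of jobs handed to the JobControl. A hedged sketch of that conversion (variable names are illustrative, not from the source):

// Hypothetical caller-side conversion to a percentage.
double jobsDone = calculateProgress(jc);
int totalJobs = jc.getWaitingJobs().size() + jc.getReadyJobs().size()
        + jc.getRunningJobs().size() + jc.getSuccessfulJobs().size()
        + jc.getFailedJobs().size();
double percentComplete = totalJobs == 0 ? 0.0 : 100.0 * jobsDone / totalJobs;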
@Override
public void checkGroupConstWithParallelResult(PhysicalPlan pp, PigContext pc) throws Exception {
    MROperPlan mrPlan = Util.buildMRPlan(pp, pc);

    ConfigurationValidator.validatePigProperties(pc.getProperties());
    Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties());
    JobControlCompiler jcc = new JobControlCompiler(pc, conf);
    JobControl jobControl = jcc.compile(mrPlan, "Test");
    Job job = jobControl.getWaitingJobs().get(0);
    int parallel = job.getJobConf().getNumReduceTasks();

    assertEquals("parallelism", 1, parallel);
}
@Override
public void checkGroupNonConstWithParallelResult(PhysicalPlan pp, PigContext pc) throws Exception {
    MROperPlan mrPlan = Util.buildMRPlan(pp, pc);

    ConfigurationValidator.validatePigProperties(pc.getProperties());
    Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties());
    JobControlCompiler jcc = new JobControlCompiler(pc, conf);
    JobControl jobControl = jcc.compile(mrPlan, "Test");
    Job job = jobControl.getWaitingJobs().get(0);
    int parallel = job.getJobConf().getNumReduceTasks();

    assertEquals("parallelism", 100, parallel);
}
/**
 * Specifically tests that REGISTERED jars get added to the distributed cache.
 * @throws Exception
 */
@Test
public void testJarAddedToDistributedCache() throws Exception {
    // creating a jar with a UDF *not* in the current classloader
    File tmpFile = File.createTempFile("Some_", ".jar");
    tmpFile.deleteOnExit();
    String className = createTestJar(tmpFile);
    final String testUDFFileName = className + ".class";

    // JobControlCompiler setup
    PigServer pigServer = new PigServer(ExecType.MAPREDUCE);
    PigContext pigContext = pigServer.getPigContext();
    pigContext.connect();
    pigContext.addJar(tmpFile.getAbsolutePath());
    JobControlCompiler jobControlCompiler = new JobControlCompiler(pigContext, CONF);
    MROperPlan plan = new MROperPlan();
    MapReduceOper mro = new MapReduceOper(new OperatorKey());
    mro.UDFs = new HashSet<String>();
    mro.UDFs.add(className + "()");
    plan.add(mro);

    // compiling the job
    JobControl jobControl = jobControlCompiler.compile(plan, "test");
    JobConf jobConf = jobControl.getWaitingJobs().get(0).getJobConf();

    // verifying the jar gets on the distributed cache
    Path[] fileClassPaths = DistributedCache.getFileClassPaths(jobConf);
    // guava jar is not shipped with Hadoop 2.x
    Assert.assertEquals("size for " + Arrays.toString(fileClassPaths),
            HadoopShims.isHadoopYARN() ? 5 : 6, fileClassPaths.length);
    Path distributedCachePath = fileClassPaths[0];
    Assert.assertEquals("ends with jar name: " + distributedCachePath,
            distributedCachePath.getName(), tmpFile.getName());
    // hadoop bug requires path to not contain hdfs://hostname in front
    Assert.assertTrue("starts with /: " + distributedCachePath,
            distributedCachePath.toString().startsWith("/"));
    Assert.assertTrue("jar pushed to distributed cache should contain testUDF",
            jarContainsFileNamed(new File(fileClassPaths[0].toUri().getPath()), testUDFFileName));
}
private JobConf compileTestJob(final PigContext pigContext, Configuration conf)
        throws JobCreationException {
    final JobControlCompiler jobControlCompiler = new JobControlCompiler(pigContext, conf);

    final MROperPlan plan = new MROperPlan();
    plan.add(new MapReduceOper(new OperatorKey()));

    final JobControl jobControl = jobControlCompiler.compile(plan, "test");
    final JobConf jobConf = jobControl.getWaitingJobs().get(0).getJobConf();
    return jobConf;
}
@Override
public void checkDefaultParallelResult(PhysicalPlan pp, PigContext pc) throws Exception {
    MROperPlan mrPlan = Util.buildMRPlan(pp, pc);

    ConfigurationValidator.validatePigProperties(pc.getProperties());
    Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties());
    JobControlCompiler jcc = new JobControlCompiler(pc, conf);
    JobControl jobControl = jcc.compile(mrPlan, "Test");
    Job job = jobControl.getWaitingJobs().get(0);
    int parallel = job.getJobConf().getNumReduceTasks();

    assertEquals(100, parallel);
    Util.assertParallelValues(100, -1, -1, 100, job.getJobConf());
}
/**
 * Compiles all jobs that have no dependencies, removes them from
 * the plan, and returns. Should be called with the same plan until
 * exhausted.
 * @param plan - The MROperPlan to be compiled
 * @param grpName - The name given to the JobControl
 * @return JobControl object - null if no more jobs in plan
 * @throws JobCreationException
 */
public JobControl compile(MROperPlan plan, String grpName) throws JobCreationException {
    // Assert plan.size() != 0
    this.plan = plan;

    JobControl jobCtrl = new JobControl(grpName);

    try {
        List<MapReduceOper> roots = new LinkedList<MapReduceOper>();
        roots.addAll(plan.getRoots());
        for (MapReduceOper mro : roots) {
            if (mro instanceof NativeMapReduceOper) {
                return null;
            }
            Job job = getJob(mro, conf, pigContext);
            jobMroMap.put(job, mro);
            jobCtrl.addJob(job);
        }
    } catch (JobCreationException jce) {
        throw jce;
    } catch (Exception e) {
        int errCode = 2017;
        String msg = "Internal error creating job configuration.";
        throw new JobCreationException(msg, errCode, PigException.BUG, e);
    }

    return jobCtrl;
}
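A hedged sketch of how a launcher might call compile() repeatedly until the plan is exhausted, per the javadoc's contract. The updateMROpPlan(...) helper and the polling interval are assumptions about the surrounding compiler API, not taken from this snippet.

// Hypothetical driver loop: compile the dependency-free layer of the plan,
// run it, prune the finished operators, and repeat until nothing is left.
while (plan.size() > 0) {
    JobControl jc = jcc.compile(plan, "workflow");
    if (jc == null) {
        break;                                   // native MR operator; handled elsewhere
    }
    Thread runner = new Thread(jc, "JobControl");
    runner.start();
    while (!jc.allFinished()) {
        Thread.sleep(1000);                      // assumed polling interval
    }
    jc.stop();
    jcc.updateMROpPlan(jc.getSuccessfulJobs());  // assumed helper that prunes completed operators
}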