@Override
public void process(Annotation annotation, Job job, Object target) throws ToolException {
  TableOutput tableOutput = (TableOutput) annotation;

  // Base setup of the table job
  Configuration conf = job.getConfiguration();
  HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));

  // Add dependencies
  try {
    TableMapReduceUtil.addDependencyJars(job);
  } catch (IOException e) {
    throw new ToolException(e);
  }

  // Set table output format
  job.setOutputFormatClass(TableOutputFormat.class);

  // Set the table name
  String tableName = (String) this.evaluateExpression(tableOutput.value());
  job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, tableName);
}
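For context, a minimal sketch of how such an annotation-driven handler might be wired up. The @TableOutput placement and the ExportTool class below are assumptions for illustration, not part of the snippet above; only the handler's contract (resolve the annotation value into TableOutputFormat.OUTPUT_TABLE) is taken from the source.

// Hypothetical usage sketch: a tool class declares its output table via the
// @TableOutput annotation; the framework is assumed to invoke
// process(annotation, job, this) during job setup, which resolves the
// expression and sets TableOutputFormat.OUTPUT_TABLE on the job.
@TableOutput("${output.table}")  // expression resolved by evaluateExpression()
public class ExportTool extends Configured implements Tool {
  @Override
  public int run(String[] args) throws Exception {
    Job job = Job.getInstance(getConf(), "export");
    // ... set mapper/input here; the annotation handler configures the output side
    return job.waitForCompletion(true) ? 0 : 1;
  }
}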
/**
 * Main entry point.
 *
 * @param args The command line parameters.
 * @throws Exception When running the job fails.
 */
public static void main(String[] args) throws Exception {
  Configuration conf = HBaseConfiguration.create();
  String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
  CommandLine cmd = parseArgs(otherArgs);
  // check debug flag and other options
  if (cmd.hasOption("d")) conf.set("conf.debug", "true");
  // get details
  String table = cmd.getOptionValue("t");
  String input = cmd.getOptionValue("i");
  // create job and set classes etc.
  Job job = Job.getInstance(conf,
    "Import from file " + input + " into table " + table);
  job.setJarByClass(ImportJsonFromFile.class);
  job.setMapperClass(ImportMapper.class);
  job.setOutputFormatClass(TableOutputFormat.class);
  job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, table);
  job.setOutputKeyClass(ImmutableBytesWritable.class);
  job.setOutputValueClass(Writable.class);
  job.setNumReduceTasks(0);
  FileInputFormat.addInputPath(job, new Path(input));
  // run the job
  System.exit(job.waitForCompletion(true) ? 0 : 1);
}
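The job above references an ImportMapper that is not shown. A minimal sketch of what such a mapper could look like: because the job runs with zero reducers, the (ImmutableBytesWritable, Put) pairs it emits go straight to TableOutputFormat. The row key derivation and column names below are assumptions, not taken from the source.

// Assumed shape of the referenced ImportMapper; row key and column are illustrative.
// Uses commons-codec DigestUtils for the row key digest.
static class ImportMapper
    extends Mapper<LongWritable, Text, ImmutableBytesWritable, Writable> {
  @Override
  protected void map(LongWritable offset, Text line, Context context)
      throws IOException, InterruptedException {
    // derive a row key from the line content (illustrative choice)
    byte[] rowkey = DigestUtils.md5(line.toString());
    Put put = new Put(rowkey);
    put.add(Bytes.toBytes("data"), Bytes.toBytes("json"),
      Bytes.toBytes(line.toString()));
    // with setNumReduceTasks(0), this goes directly to TableOutputFormat
    context.write(new ImmutableBytesWritable(rowkey), put);
  }
}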
@Override
public void setStoreLocation(String location, Job job) throws IOException {
  if (location.startsWith("hbase://")) {
    job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, location.substring(8));
  } else {
    job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, location);
  }

  String serializedSchema = getUDFProperties().getProperty(contextSignature + "_schema");
  if (serializedSchema != null) {
    schema_ = (ResourceSchema) ObjectSerializer.deserialize(serializedSchema);
  }

  initialiseHBaseClassLoaderResources(job);
  m_conf = initializeLocalJobConfig(job);
  // Not setting a udf property and getting the hbase delegation token
  // only once like in setLocation as setStoreLocation gets different Job
  // objects for each call and the last Job passed is the one that is
  // launched. So we end up getting multiple hbase delegation tokens.
  addHBaseDelegationToken(m_conf, job);
}
/**
 * Converts runtime edge data to persistent edge data (includes
 * source/target vertex data) and writes it to HBase.
 *
 * @param collection Graph collection
 * @throws IOException
 */
private void writeEdges(final GraphCollection collection) throws IOException {
  DataSet<PersistentEdge<Vertex>> persistentEdgeDataSet = collection
    .getVertices()
    // join vertex with edges on edge source vertex id
    .join(collection.getEdges())
    .where(new Id<>())
    .equalTo(new SourceId<>())
    // join result with vertices on edge target vertex id
    .join(collection.getVertices())
    .where("f1.targetId")
    .equalTo(new Id<>())
    // ((source-vertex-data, edge-data), target-vertex-data)
    .with(new BuildPersistentEdge<>(getHBaseConfig().getPersistentEdgeFactory()));

  // write (persistent-edge-data) to HBase table
  Job job = Job.getInstance();
  job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, getHBaseConfig().getEdgeTableName());

  persistentEdgeDataSet
    .map(new BuildEdgeMutation<>(getHBaseConfig().getEdgeHandler()))
    .output(new HadoopOutputFormat<>(new TableOutputFormat<>(), job));
}
@Override
public void setStoreLocation(String location, Job job) throws IOException {
  if (location.startsWith("hbase://")) {
    job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, location.substring(8));
  } else {
    job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, location);
  }

  String serializedSchema = getUDFProperties().getProperty(contextSignature + "_schema");
  if (serializedSchema != null) {
    schema_ = (ResourceSchema) ObjectSerializer.deserialize(serializedSchema);
  }

  m_conf = initializeLocalJobConfig(job);
  // Not setting a udf property and getting the hbase delegation token
  // only once like in setLocation as setStoreLocation gets different Job
  // objects for each call and the last Job passed is the one that is
  // launched. So we end up getting multiple hbase delegation tokens.
  addHBaseDelegationToken(m_conf, job);
}
public void run() throws Exception {
  long startTime = System.currentTimeMillis();
  Configuration conf = new Configuration();
  conf.set(TableOutputFormat.OUTPUT_TABLE, Constants.hbase_user_item_pref_table);
  Job job = Job.getInstance(conf, "hbasewriter" + System.currentTimeMillis());
  job.setJarByClass(UpdateCFJob.class);
  job.setMapperClass(TokenizerMapper.class);
  job.setReducerClass(HBaseWriteReducer.class);
  job.setMapOutputKeyClass(IntWritable.class);
  job.setMapOutputValueClass(Text.class);
  job.setOutputFormatClass(TableOutputFormat.class);
  FileInputFormat.addInputPath(job, new Path(input));
  boolean isFinish = job.waitForCompletion(true);
  // capture the end time only after the job has actually completed
  long endTime = System.currentTimeMillis();
  if (isFinish) {
    logger.info("UpdateCFJob job [" + job.getJobName() + "] finished. It took "
      + (endTime - startTime) / 1000 + "s.");
  } else {
    logger.error("UpdateCFJob job [" + job.getJobName() + "] run failed.");
  }
}
/**
 * Uses the HBase front door API to write to the index table. Submits the job and either
 * returns immediately or waits for completion, depending on the runForeground parameter.
 *
 * @param job job
 * @param outputPath output path
 * @param runForeground if true, waits for job completion, else submits and returns
 *          immediately.
 * @throws Exception
 */
private void configureSubmittableJobUsingDirectApi(Job job, Path outputPath,
    TableName outputTableName, boolean skipDependencyJars, boolean runForeground)
    throws Exception {
  job.setMapperClass(getDirectMapperClass());
  job.setReducerClass(getDirectReducerClass());
  Configuration conf = job.getConfiguration();
  HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
  conf.set(TableOutputFormat.OUTPUT_TABLE, outputTableName.getNameAsString());

  // Set the output classes
  job.setMapOutputValueClass(IntWritable.class);
  job.setOutputKeyClass(NullWritable.class);
  job.setOutputValueClass(NullWritable.class);
  if (!skipDependencyJars) {
    TableMapReduceUtil.addDependencyJars(job);
  }
  job.setNumReduceTasks(1);

  if (!runForeground) {
    LOG.info("Running Index Build in Background - Submit async and exit");
    job.submit();
    return;
  }
  LOG.info("Running Index Build in Foreground. Waits for the build to complete."
    + " This may take a long time!");
  boolean result = job.waitForCompletion(true);
  if (!result) {
    LOG.error("IndexTool job failed!");
    throw new Exception("IndexTool job failed: " + job.toString());
  }
  FileSystem.get(conf).delete(outputPath, true);
}
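A hedged sketch of how this method might be invoked from within the same tool. The job name, path, and table name below are made up for illustration; only the parameter semantics come from the method above.

// Illustrative call only: names are assumptions, not part of the source.
Job job = Job.getInstance(HBaseConfiguration.create(), "index-build");
Path outputPath = new Path("/tmp/index-build-output");
TableName indexTable = TableName.valueOf("MY_INDEX_TABLE");
// Foreground mode: blocks until the index build finishes, then deletes outputPath.
// Passing runForeground = false would submit asynchronously and return at once.
configureSubmittableJobUsingDirectApi(job, outputPath, indexTable,
    false /* skipDependencyJars */, true /* runForeground */);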
/**
 * Main entry point.
 *
 * @param args The command line parameters.
 * @throws Exception When running the job fails.
 */
public static void main(String[] args) throws Exception {
  //Configuration conf = HBaseConfiguration.create();
  Configuration conf = AppConfig.getConfiguration();
  String table = "jsontable";
  String input = "/user/lhfei/test-data.txt";
  String column = "data:json";
  //args = new String[]{"-tjsontable -isrc/test/resources/test-data.txt -cdata:json"};

  if (null != args) {
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    CommandLine cmd = parseArgs(otherArgs);
    // check debug flag and other options
    if (cmd.hasOption("d")) conf.set("conf.debug", "true");
    // get details
    table = cmd.getOptionValue("t");
    input = cmd.getOptionValue("i");
    column = cmd.getOptionValue("c");
  }
  conf.set("conf.column", column);

  Job job = Job.getInstance(conf,
    "Import from file " + input + " into table " + table);
  job.setJarByClass(ImportFromFile.class);
  job.setMapperClass(ImportMapper.class);
  job.setOutputFormatClass(TableOutputFormat.class);
  job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, table);
  job.setOutputKeyClass(ImmutableBytesWritable.class);
  job.setOutputValueClass(Writable.class);
  job.setNumReduceTasks(0);
  FileInputFormat.addInputPath(job, new Path(input));
  System.exit(job.waitForCompletion(true) ? 0 : 1);
}
/**
 * Main entry point.
 *
 * @param args The command line parameters.
 * @throws Exception When running the job fails.
 */
public static void main(String[] args) throws Exception {
  Configuration conf = HBaseConfiguration.create();
  String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
  CommandLine cmd = parseArgs(otherArgs);
  // check debug flag and other options
  if (cmd.hasOption("d")) conf.set("conf.debug", "true");
  // get details
  String table = cmd.getOptionValue("t");
  String input = cmd.getOptionValue("i");
  String column = cmd.getOptionValue("c");
  conf.set("conf.column", column);

  Job job = Job.getInstance(conf,
    "Import from file " + input + " into table " + table);
  job.setJarByClass(ImportFromFile2.class);
  job.setMapperClass(ImportMapper.class);
  job.setOutputFormatClass(TableOutputFormat.class);
  job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, table);
  job.setOutputKeyClass(ImmutableBytesWritable.class);
  job.setOutputValueClass(Writable.class);
  job.setNumReduceTasks(0);
  FileInputFormat.addInputPath(job, new Path(input));
  // Add HBase dependency jars to the configuration.
  TableMapReduceUtil.addDependencyJars(job);
  System.exit(job.waitForCompletion(true) ? 0 : 1);
}
public static void initTableReducerJob(String table,
    Class<? extends TableReducer> reducer, Job job, Class partitioner,
    String quorumAddress, String serverClass, String serverImpl,
    boolean addDependencyJars, Class<? extends OutputFormat> outputFormatClass)
    throws IOException {
  Configuration conf = job.getConfiguration();
  HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
  job.setOutputFormatClass(outputFormatClass);
  if (reducer != null) job.setReducerClass(reducer);
  conf.set(TableOutputFormat.OUTPUT_TABLE, table);
  // If passed a quorum/ensemble address, pass it on to TableOutputFormat.
  if (quorumAddress != null) {
    // Calling this will validate the format
    ZKUtil.transformClusterKey(quorumAddress);
    conf.set(TableOutputFormat.QUORUM_ADDRESS, quorumAddress);
  }
  if (serverClass != null && serverImpl != null) {
    conf.set(TableOutputFormat.REGION_SERVER_CLASS, serverClass);
    conf.set(TableOutputFormat.REGION_SERVER_IMPL, serverImpl);
  }
  job.setOutputKeyClass(ImmutableBytesWritable.class);
  job.setOutputValueClass(Writable.class);
  if (partitioner == HRegionPartitioner.class) {
    job.setPartitionerClass(HRegionPartitioner.class);
    HTable outputTable = new HTable(conf, table);
    int regions = outputTable.getRegionsInfo().size();
    if (job.getNumReduceTasks() > regions) {
      job.setNumReduceTasks(regions);
    }
  } else if (partitioner != null) {
    job.setPartitionerClass(partitioner);
  }
  if (addDependencyJars) {
    addDependencyJars(job);
  }
  TableMapReduceUtil.initCredentials(job);
}
/**
 * Use this before submitting a TableReduce job. It will
 * appropriately set up the JobConf.
 *
 * @param table The output table.
 * @param reducer The reducer class to use.
 * @param job The current job to adjust. Make sure the passed job is
 *   carrying all necessary HBase configuration.
 * @param partitioner Partitioner to use. Pass <code>null</code> to use
 *   default partitioner.
 * @param quorumAddress Distant cluster to write to; default is null for
 *   output to the cluster that is designated in <code>hbase-site.xml</code>.
 *   Set this String to the zookeeper ensemble of an alternate remote cluster
 *   when you would have the reduce write to a cluster other than the
 *   default; e.g. when copying tables between clusters, the source would be
 *   designated by <code>hbase-site.xml</code> and this param would have the
 *   ensemble address of the remote cluster. The format to pass is particular.
 *   Pass <code><hbase.zookeeper.quorum>:<hbase.zookeeper.client.port>:<zookeeper.znode.parent></code>
 *   such as <code>server,server2,server3:2181:/hbase</code>.
 * @param serverClass redefined hbase.regionserver.class
 * @param serverImpl redefined hbase.regionserver.impl
 * @param addDependencyJars upload HBase jars and jars for any of the configured
 *   job classes via the distributed cache (tmpjars).
 * @throws IOException When determining the region count fails.
 */
public static void initTableReducerJob(String table,
    Class<? extends TableReducer> reducer, Job job, Class partitioner,
    String quorumAddress, String serverClass, String serverImpl,
    boolean addDependencyJars) throws IOException {
  Configuration conf = job.getConfiguration();
  HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
  job.setOutputFormatClass(TableOutputFormat.class);
  if (reducer != null) job.setReducerClass(reducer);
  conf.set(TableOutputFormat.OUTPUT_TABLE, table);
  // If passed a quorum/ensemble address, pass it on to TableOutputFormat.
  if (quorumAddress != null) {
    // Calling this will validate the format
    ZKUtil.transformClusterKey(quorumAddress);
    conf.set(TableOutputFormat.QUORUM_ADDRESS, quorumAddress);
  }
  if (serverClass != null && serverImpl != null) {
    conf.set(TableOutputFormat.REGION_SERVER_CLASS, serverClass);
    conf.set(TableOutputFormat.REGION_SERVER_IMPL, serverImpl);
  }
  job.setOutputKeyClass(ImmutableBytesWritable.class);
  job.setOutputValueClass(Writable.class);
  if (partitioner == HRegionPartitioner.class) {
    job.setPartitionerClass(HRegionPartitioner.class);
    HTable outputTable = new HTable(conf, table);
    int regions = outputTable.getRegionsInfo().size();
    if (job.getNumReduceTasks() > regions) {
      job.setNumReduceTasks(regions);
    }
  } else if (partitioner != null) {
    job.setPartitionerClass(partitioner);
  }
  if (addDependencyJars) {
    addDependencyJars(job);
  }
  initCredentials(job);
}
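The quorumAddress format described in the javadoc is easiest to see in a call. A minimal sketch, assuming a user-supplied MyTableReducer class and a remote target cluster; the table name and ensemble address are illustrative:

// Write the reduce output to table "copy" on a *remote* cluster; the cluster
// key follows <quorum>:<client-port>:<znode-parent> as documented above.
// MyTableReducer is an assumed TableReducer subclass.
TableMapReduceUtil.initTableReducerJob(
    "copy",                                  // output table
    MyTableReducer.class,                    // reducer
    job,
    null,                                    // default partitioner
    "server1,server2,server3:2181:/hbase",   // remote ZK ensemble
    null, null,                              // default region server class/impl
    true);                                   // ship dependency jars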
@Override
public OutputFormat getOutputFormat() throws IOException {
  if (outputFormat == null) {
    if (m_conf == null) {
      throw new IllegalStateException("setStoreLocation has not been called");
    } else {
      this.outputFormat = new TableOutputFormat();
      this.outputFormat.setConf(m_conf);
    }
  }
  return outputFormat;
}
/**
 * Converts runtime graph data to persistent graph data (including vertex
 * and edge identifiers) and writes it to HBase.
 *
 * @param collection Graph collection
 * @throws IOException
 */
private void writeGraphHeads(final GraphCollection collection) throws IOException {
  // build (graph-id, vertex-id) tuples from vertices
  DataSet<Tuple2<GradoopId, GradoopId>> graphIdToVertexId = collection.getVertices()
    .flatMap(new PairGraphIdWithElementId<>());

  // build (graph-id, edge-id) tuples from edges
  DataSet<Tuple2<GradoopId, GradoopId>> graphIdToEdgeId = collection.getEdges()
    .flatMap(new PairGraphIdWithElementId<>());

  // co-group (graph-id, vertex-id) and (graph-id, edge-id) tuples to
  // (graph-id, {vertex-id}, {edge-id}) triples
  DataSet<Tuple3<GradoopId, GradoopIdSet, GradoopIdSet>> graphToVertexIdsAndEdgeIds =
    graphIdToVertexId
      .coGroup(graphIdToEdgeId)
      .where(0)
      .equalTo(0)
      .with(new BuildGraphTransactions());

  // join (graph-id, {vertex-id}, {edge-id}) triples with
  // (graph-id, graph-data) and build (persistent-graph-data)
  DataSet<PersistentGraphHead> persistentGraphDataSet = graphToVertexIdsAndEdgeIds
    .join(collection.getGraphHeads())
    .where(0).equalTo(new Id<>())
    .with(new BuildPersistentGraphHead<>(getHBaseConfig().getPersistentGraphHeadFactory()));

  // write (persistent-graph-data) to HBase table
  Job job = Job.getInstance();
  job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, getHBaseConfig().getGraphTableName());

  persistentGraphDataSet
    // FIXME remove forced cast...
    .map(new BuildGraphHeadMutation((GraphHeadHandler<PersistentGraphHead>)
      ((Object) getHBaseConfig().getGraphHeadHandler())))
    .output(new HadoopOutputFormat<>(new TableOutputFormat<>(), job));
}
/**
 * Converts runtime vertex data to persistent vertex data (includes
 * incoming and outgoing edge data) and writes it to HBase.
 *
 * @param collection Graph collection
 * @throws IOException
 */
private void writeVertices(final GraphCollection collection) throws IOException {
  // group edges by source vertex id (vertex-id, [out-edge])
  DataSet<Tuple2<GradoopId, Set<Edge>>> vertexToOutgoingEdges = collection.getEdges()
    .groupBy(new SourceId<>())
    .reduceGroup(new EdgeSetBySourceId<>());

  // group edges by target vertex id (vertex-id, [in-edge])
  DataSet<Tuple2<GradoopId, Set<Edge>>> vertexToIncomingEdges = collection.getEdges()
    .groupBy(new TargetId<>())
    .reduceGroup(new EdgeSetByTargetId<>());

  // co-group (vertex-data) with (vertex-id, [out-edge])
  DataSet<Tuple2<Vertex, Set<Edge>>> vertexDataWithOutgoingEdges = collection
    .getVertices()
    .coGroup(vertexToOutgoingEdges)
    .where(new Id<Vertex>()).equalTo(0)
    .with(new BuildVertexDataWithEdges<>());

  // co-group (vertex, (vertex-id, [out-edge])) with (vertex-id, [in-edge])
  DataSet<PersistentVertex<Edge>> persistentVertexDataSet = vertexDataWithOutgoingEdges
    .coGroup(vertexToIncomingEdges)
    .where("f0.id").equalTo(0)
    .with(new BuildPersistentVertex<>(getHBaseConfig().getPersistentVertexFactory()));

  // write (persistent-vertex-data) to HBase table
  Job job = Job.getInstance();
  job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, getHBaseConfig().getVertexTableName());

  persistentVertexDataSet
    .map(new BuildVertexMutation<>(getHBaseConfig().getVertexHandler()))
    .output(new HadoopOutputFormat<>(new TableOutputFormat<>(), job));
}
@Override
public OutputFormat getOutputFormat() throws IOException {
  if (outputFormat == null) {
    this.outputFormat = new TableOutputFormat();
    m_conf = initialiseHBaseConfig(m_conf);
    this.outputFormat.setConf(m_conf);
  }
  return outputFormat;
}
@Override
public void setStoreLocation(String location, Job job) throws IOException {
  if (location.startsWith("hbase://")) {
    job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, location.substring(8));
  } else {
    job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, location);
  }

  String serializedSchema = getUDFProperties().getProperty(contextSignature + "_schema");
  if (serializedSchema != null) {
    schema_ = (ResourceSchema) ObjectSerializer.deserialize(serializedSchema);
  }

  m_conf = initialiseHBaseClassLoaderResources(job);
}
/**
 * Configures this tool with the given tables.
 *
 * @param eventLogTable the event log input table
 * @param userTable the user input table
 * @param outputTable the statistics output table
 * @return this tool, for chaining
 * @throws IOException
 */
public StatisticsTool configure(String eventLogTable, String userTable, String outputTable)
    throws IOException {
  setBasics();
  setJar(Config.getInstance().get(Config.MAPRED_MOVIELENS));
  // Config zieook = Config.getInstance();

  cp = task.getConfig().get(TaskConfig.CP);
  if (cp == null) {
    throw new IOException("content provider not set, please set <" + TaskConfig.CP
      + "> in the task configuration");
  }
  collection = task.getConfig().get(TaskConfig.COLLECTION);
  if (collection == null) {
    throw new IOException("collection not set, please set <" + TaskConfig.COLLECTION
      + "> in the task configuration");
  }

  // tmpFile = new Path(zieook.get(Config.ZIEOOK_HDFS_SERVER) + zieook.get(Config.ZIEOOK_HDFS_PATH),
  //   cp + "/statistics/tmp");

  this.eventLogTable = eventLogTable;
  this.userTable = userTable;
  getConf().set(TableOutputFormat.OUTPUT_TABLE, outputTable);

  startDate = task.getConfig().getLong(TaskConfig.STATS_START, null);
  endDate = task.getConfig().getLong(TaskConfig.STATS_END, null);

  Config zieook = Config.getInstance();
  tempDirPath = new Path(zieook.get(Config.ZIEOOK_HDFS_SERVER) + zieook.get(Config.ZIEOOK_HDFS_PATH),
    cp + "/" + collection + "/statistics");

  return this;
}
@Override
protected void setup(Context context) throws IOException, InterruptedException {
  config = context.getConfiguration();
  table = new HTable(config, Bytes.toBytes(config.get(TableOutputFormat.OUTPUT_TABLE)));
}
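Opening the HTable once in setup() pairs with using it across all reduce calls, instead of writing through the output format. A sketch of what the corresponding reduce() might look like; the key/value types, column family, and qualifier are assumptions, not taken from the source:

// Assumed companion to the setup() above; types and column names are illustrative.
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context)
    throws IOException, InterruptedException {
  int sum = 0;
  for (IntWritable v : values) {
    sum += v.get();
  }
  Put put = new Put(Bytes.toBytes(key.toString()));
  put.add(Bytes.toBytes("cf"), Bytes.toBytes("count"), Bytes.toBytes(sum));
  // write through the table handle created in setup()
  table.put(put);
}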
public static void main(String[] args) throws Exception {
  if (!parseParameters(args)) {
    return;
  }

  // set up the execution environment
  final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

  // get input data
  DataSet<String> text = getTextDataSet(env);

  DataSet<Tuple2<String, Integer>> counts =
    // split up the lines in pairs (2-tuples) containing: (word, 1)
    text.flatMap(new Tokenizer())
      // group by the tuple field "0" and sum up tuple field "1"
      .groupBy(0)
      .sum(1);

  // emit result
  Job job = Job.getInstance();
  job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, outputTableName);
  // TODO is "mapred.output.dir" really useful?
  job.getConfiguration().set("mapred.output.dir", HBaseFlinkTestConstants.TMP_DIR);
  counts.map(new RichMapFunction<Tuple2<String, Integer>, Tuple2<Text, Mutation>>() {
    private transient Tuple2<Text, Mutation> reuse;

    @Override
    public void open(Configuration parameters) throws Exception {
      super.open(parameters);
      reuse = new Tuple2<Text, Mutation>();
    }

    @Override
    public Tuple2<Text, Mutation> map(Tuple2<String, Integer> t) throws Exception {
      reuse.f0 = new Text(t.f0);
      Put put = new Put(t.f0.getBytes(ConfigConstants.DEFAULT_CHARSET));
      put.add(HBaseFlinkTestConstants.CF_SOME, HBaseFlinkTestConstants.Q_SOME, Bytes.toBytes(t.f1));
      reuse.f1 = put;
      return reuse;
    }
  }).output(new HadoopOutputFormat<Text, Mutation>(new TableOutputFormat<Text>(), job));

  // execute program
  env.execute("WordCount (HBase sink) Example");
}
public static void main(String[] args) throws Exception {
  if (!parseParameters(args)) {
    return;
  }

  // set up the execution environment
  final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

  // get input data
  DataSet<String> text = getTextDataSet(env);

  DataSet<Tuple2<String, Integer>> counts =
    // split up the lines in pairs (2-tuples) containing: (word, 1)
    text.flatMap(new Tokenizer())
      // group by the tuple field "0" and sum up tuple field "1"
      .groupBy(0)
      .sum(1);

  // emit result
  Job job = Job.getInstance();
  job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, outputTableName);
  // TODO is "mapred.output.dir" really useful?
  job.getConfiguration().set("mapred.output.dir", HBaseFlinkTestConstants.TMP_DIR);
  counts.map(new RichMapFunction<Tuple2<String, Integer>, Tuple2<Text, Mutation>>() {
    private transient Tuple2<Text, Mutation> reuse;

    @Override
    public void open(Configuration parameters) throws Exception {
      super.open(parameters);
      reuse = new Tuple2<Text, Mutation>();
    }

    @Override
    public Tuple2<Text, Mutation> map(Tuple2<String, Integer> t) throws Exception {
      reuse.f0 = new Text(t.f0);
      Put put = new Put(t.f0.getBytes());
      put.add(HBaseFlinkTestConstants.CF_SOME, HBaseFlinkTestConstants.Q_SOME, Bytes.toBytes(t.f1));
      reuse.f1 = put;
      return reuse;
    }
  }).output(new HadoopOutputFormat<Text, Mutation>(new TableOutputFormat<Text>(), job));

  // execute program
  env.execute("WordCount (HBase sink) Example");
}
/**
 * @param myHBaseConf configuration used to create and run the job. Should be an HBase
 *   configuration.
 * @param input path to the processFile
 * @param totalJobCount the total number of jobs that need to be run in this batch. Used in
 *   the job name.
 * @return whether all job confs were loaded properly.
 * @throws IOException
 * @throws InterruptedException
 * @throws ClassNotFoundException
 */
private boolean runRawLoaderJob(Configuration myHBaseConf, String input, int totalJobCount)
    throws IOException, InterruptedException, ClassNotFoundException {
  boolean success;

  // Turn off speculative execution.
  // Note: must be BEFORE the job construction with the new mapreduce API.
  myHBaseConf.setBoolean("mapred.map.tasks.speculative.execution", false);

  // Set up job
  Job job = new Job(myHBaseConf, getJobName(totalJobCount));
  job.setJarByClass(JobFileRawLoader.class);

  Path inputPath = new Path(input);
  if (hdfs.exists(inputPath)) {
    // Set input
    job.setInputFormatClass(SequenceFileInputFormat.class);
    SequenceFileInputFormat.setInputPaths(job, inputPath);

    job.setMapperClass(JobFileRawLoaderMapper.class);

    // Set the output format to push data into HBase.
    job.setOutputFormatClass(TableOutputFormat.class);
    TableMapReduceUtil.initTableReducerJob(Constants.HISTORY_RAW_TABLE, null, job);

    job.setOutputKeyClass(JobFileRawLoaderMapper.getOutputKeyClass());
    job.setOutputValueClass(JobFileRawLoaderMapper.getOutputValueClass());

    // This is a map-only job, skip the reduce step
    job.setNumReduceTasks(0);

    // Run the job
    success = job.waitForCompletion(true);

    if (success) {
      success = hdfs.delete(inputPath, false);
    }
  } else {
    System.err.println("Unable to find processFile: " + inputPath);
    success = false;
  }
  return success;
}
/**
 * Use this before submitting a TableReduce job. It will
 * appropriately set up the JobConf.
 *
 * @param table The output Splice table name. The format should be Schema.tableName.
 * @param reducer The reducer class to use.
 * @param job The current job to adjust. Make sure the passed job is
 *   carrying all necessary configuration.
 * @param partitioner Partitioner to use. Pass <code>null</code> to use
 *   default partitioner.
 * @param quorumAddress Distant cluster to write to; default is null for
 *   output to the cluster that is designated in <code>hbase-site.xml</code>.
 *   Set this String to the zookeeper ensemble of an alternate remote cluster
 *   when you would have the reduce write to a cluster other than the
 *   default; e.g. when copying tables between clusters, the source would be
 *   designated by <code>hbase-site.xml</code> and this param would have the
 *   ensemble address of the remote cluster. The format to pass is particular.
 *   Pass <code><hbase.zookeeper.quorum>:<hbase.zookeeper.client.port>:<zookeeper.znode.parent></code>
 *   such as <code>server,server2,server3:2181:/hbase</code>.
 * @param serverClass redefined hbase.regionserver.class
 * @param serverImpl redefined hbase.regionserver.impl
 * @param addDependencyJars upload HBase jars and jars for any of the configured
 *   job classes via the distributed cache (tmpjars).
 * @param outputformatClass the OutputFormat implementation to use.
 * @throws IOException When determining the region count fails; also wraps any SQLException
 *   raised while resolving the table.
 */
public static void initTableReducerJob(String table, Class<? extends Reducer> reducer, Job job,
    Class partitioner, String quorumAddress, String serverClass, String serverImpl,
    boolean addDependencyJars, Class<? extends OutputFormat> outputformatClass)
    throws IOException {
  Configuration conf = job.getConfiguration();
  job.setOutputFormatClass(outputformatClass);
  if (reducer != null) job.setReducerClass(reducer);
  conf.set(MRConstants.SPLICE_OUTPUT_TABLE_NAME, table);
  if (sqlUtil == null)
    sqlUtil = SMSQLUtil.getInstance(conf.get(MRConstants.SPLICE_JDBC_STR));
  // Resolve the Splice table name to its underlying HBase conglomerate id.
  String hbaseTableID = null;
  try {
    hbaseTableID = sqlUtil.getConglomID(table);
  } catch (SQLException e) {
    throw new IOException(e);
  }
  conf.set(MRConstants.HBASE_OUTPUT_TABLE_NAME, table);
  // If passed a quorum/ensemble address, pass it on to TableOutputFormat.
  if (quorumAddress != null) {
    // Calling this will validate the format
    HBasePlatformUtils.validateClusterKey(quorumAddress);
    conf.set(TableOutputFormat.QUORUM_ADDRESS, quorumAddress);
  }
  if (serverClass != null && serverImpl != null) {
    conf.set(TableOutputFormat.REGION_SERVER_CLASS, serverClass);
    conf.set(TableOutputFormat.REGION_SERVER_IMPL, serverImpl);
  }
  job.setOutputKeyClass(ImmutableBytesWritable.class);
  job.setOutputValueClass(Object.class);
  if (partitioner == HRegionPartitioner.class) {
    job.setPartitionerClass(HRegionPartitioner.class);
    // TODO Where are the keys?
    int regions = getReduceNumberOfRegions(hbaseTableID);
    if (job.getNumReduceTasks() > regions) {
      job.setNumReduceTasks(regions);
    }
  } else if (partitioner != null) {
    job.setPartitionerClass(partitioner);
  }
  if (addDependencyJars) {
    addDependencyJars(job);
  }
  //initCredentials(job);
}
public void setOutputTable(String table) {
  getConf().set(TableOutputFormat.OUTPUT_TABLE, table);
}
public String getOutputTable() {
  return getConf().get(TableOutputFormat.OUTPUT_TABLE);
}
/**
 * Use this before submitting a TableReduce job. It will
 * appropriately set up the JobConf.
 *
 * @param table The Splice output table.
 * @param reducer The reducer class to use.
 * @param job The current job to adjust. Make sure the passed job is
 *   carrying all necessary HBase configuration.
 * @param partitioner Partitioner to use. Pass <code>null</code> to use
 *   default partitioner.
 * @param quorumAddress Distant cluster to write to; default is null for
 *   output to the cluster that is designated in <code>hbase-site.xml</code>.
 *   Set this String to the zookeeper ensemble of an alternate remote cluster
 *   when you would have the reduce write to a cluster other than the
 *   default; e.g. when copying tables between clusters, the source would be
 *   designated by <code>hbase-site.xml</code> and this param would have the
 *   ensemble address of the remote cluster. The format to pass is particular.
 *   Pass <code><hbase.zookeeper.quorum>:<hbase.zookeeper.client.port>:<zookeeper.znode.parent></code>
 *   such as <code>server,server2,server3:2181:/hbase</code>.
 * @param serverClass redefined hbase.regionserver.class
 * @param serverImpl redefined hbase.regionserver.impl
 * @throws IOException When determining the region count fails.
 * @throws SQLException
 */
public static void initTableReducerJob(String table, Class<? extends Reducer> reducer, Job job,
    Class partitioner, String quorumAddress, String serverClass, String serverImpl)
    throws IOException, SQLException {
  initTableReducerJob(table, reducer, job, partitioner, quorumAddress,
    serverClass, serverImpl, true, TableOutputFormat.class);
}
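A hedged usage sketch of this convenience overload; the Splice table and reducer names are made up for illustration. The overload fills in the two trailing arguments itself: dependency jars are shipped (true) and TableOutputFormat is used as the output format, as seen in the delegation above.

// Illustrative only: SOMESCHEMA.MYTABLE and MyReducer are assumptions.
initTableReducerJob(
    "SOMESCHEMA.MYTABLE",   // Splice table as Schema.tableName
    MyReducer.class,
    job,
    null,                   // default partitioner
    null,                   // write to the cluster from hbase-site.xml
    null, null);            // default region server class/impl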