private void bulkLoadHFile(TableName tableName, byte[] family, byte[] qualifier,
    byte[][][] hfileRanges, int numRowsPerRange) throws Exception {
  Path familyDir = new Path(loadPath, Bytes.toString(family));
  fs.mkdirs(familyDir);
  int hfileIdx = 0;
  for (byte[][] range : hfileRanges) {
    byte[] from = range[0];
    byte[] to = range[1];
    createHFile(new Path(familyDir, "hfile_" + (hfileIdx++)), family, qualifier,
        from, to, numRowsPerRange);
  }
  // set global read so RegionServer can move it
  setPermission(loadPath, FsPermission.valueOf("-rwxrwxrwx"));
  try (Connection conn = ConnectionFactory.createConnection(conf);
      HTable table = (HTable) conn.getTable(tableName)) {
    TEST_UTIL.waitUntilAllRegionsAssigned(tableName);
    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
    loader.doBulkLoad(loadPath, table);
  }
}
/**
 * Sets up a job using BulkOutputFormat to prepare writes to HBase tables by
 * writing Puts to HFiles in a non-customizable reducer, then bulk loading
 * these files into the HBase tables, which is much faster than writing the
 * Puts through the regular HBase write path.
 *
 * @param verbose print job progress when true
 * @param tables the tables to bulk load into
 * @throws IOException
 * @throws ClassNotFoundException
 * @throws InterruptedException
 * @throws Exception
 */
public void doBulkLoad(boolean verbose, HTable... tables)
    throws IOException, ClassNotFoundException, InterruptedException, Exception {
  ClassTools.preLoad(LoadIncrementalHFiles.class);
  HDFSPath bulkLoadPath = doCreateHFiles(verbose, tables);
  if (bulkLoadPath != null) {
    // Load generated HFiles into table
    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
    for (int i = 0; i < tables.length; i++) {
      String tableName = Table.getSafeName(tables[i]);
      Table.loadTable(tables[i], loader,
          bulkLoadPath.getSubdir(tableName).getCanonicalPath());
    }
  } else {
    log.info("loading failed.");
  }
}
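A hedged usage sketch of the helper above, for orientation only: the driver class name (BulkLoadDriver) and the table names are hypothetical, and the helper's supporting pieces (ClassTools, HDFSPath, doCreateHFiles) belong to its surrounding codebase.

Configuration conf = HBaseConfiguration.create();
// BulkLoadDriver is a hypothetical name for the class declaring doBulkLoad(boolean, HTable...).
BulkLoadDriver driver = new BulkLoadDriver(conf);
HTable users = new HTable(conf, "users");    // pre-1.0-style handles, matching these snippets
HTable events = new HTable(conf, "events");
try {
  driver.doBulkLoad(true, users, events);    // verbose = true
} finally {
  users.close();
  events.close();
}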
public void doBulkLoadSinglePut(boolean verbose, HTable table)
    throws IOException, ClassNotFoundException, InterruptedException, Exception {
  ClassTools.preLoad(LoadIncrementalHFiles.class);
  // setup the bulkload temp folder
  HDFSPath bulkLoadPath = new HDFSPath(getConfiguration(),
      "/tmp/" + UUID.randomUUID().toString());
  if (bulkLoadPath.existsDir()) {
    bulkLoadPath.trash();
  }
  // setup the job
  setMapOutputKeyClass(ImmutableBytesWritable.class);
  setMapOutputValueClass(Put.class);
  HFileOutputFormat2.configureIncrementalLoad(this, table);
  HFileOutputFormat2.setOutputPath(this, bulkLoadPath);
  if (waitForCompletion(verbose)) {
    // Load generated HFiles into table
    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
    loader.doBulkLoad(bulkLoadPath, table);
  } else {
    log.info("loading failed.");
  }
}
public static void main(String[] args) throws Exception {
  if (args.length < 2) {
    System.out.println("ImportToLargerTableMain {originalHFilePath} {largeTableName}");
    return;
  }
  String output = args[0];
  String hTableName = args[1];
  Configuration config = HBaseConfiguration.create();
  HBaseConfiguration.addHbaseResources(config);
  HTable hTable = new HTable(config, hTableName);
  FileSystem hdfs = FileSystem.get(config);
  // HBase must have write access to the HFiles so it can move them into place
  HFileUtils.changePermissionR(output, hdfs);
  LoadIncrementalHFiles load = new LoadIncrementalHFiles(config);
  load.doBulkLoad(new Path(output), hTable);
}
@Override
protected void postJobCompletion(Job job) {
  // If job is successful, load it into HBase
  try {
    if (job.isSuccessful()) {
      LoadIncrementalHFiles loader = new LoadIncrementalHFiles(getConf());
      loader.doBulkLoad(outputDir, htable);
      System.out.println("MapReduce and bulk load successful");
    } else {
      System.err.println("MapReduce job failed. Skipping bulk load.");
    }
  } catch (Exception e) {
    e.printStackTrace();
  }
}
@Override
public void call(BulkImportPartition importPartition) throws Exception {
  Configuration conf = HConfiguration.unwrapDelegate();
  LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
  FileSystem fs = FileSystem.get(URI.create(bulkImportDirectory), conf);
  Long conglomerateId = importPartition.getConglomerateId();
  PartitionFactory tableFactory = SIDriver.driver().getTableFactory();
  try (Partition partition = tableFactory.getTable(Long.toString(conglomerateId))) {
    Path path = new Path(importPartition.getFilePath()).getParent();
    if (fs.exists(path)) {
      loader.doBulkLoad(path, (HTable) ((ClientPartition) partition).unwrapDelegate());
      fs.delete(path, true);
    } else {
      LOG.warn("Path doesn't exist, nothing to load into this partition? " + path);
    }
    if (LOG.isDebugEnabled()) {
      SpliceLogUtils.debug(LOG, "Loaded file %s", path.toString());
    }
  }
}
/**
 * Perform the loading of HFiles.
 */
@Override
protected void completeImport(Job job) throws IOException, ImportException {
  super.completeImport(job);
  FileSystem fileSystem = FileSystem.get(job.getConfiguration());
  // Make the bulk load files' source directory accessible to the world
  // so that the hbase user can deal with it
  Path bulkLoadDir = getContext().getDestination();
  setPermission(fileSystem, fileSystem.getFileStatus(bulkLoadDir),
      FsPermission.createImmutable((short) 00777));
  HTable hTable = new HTable(job.getConfiguration(), options.getHBaseTable());
  // Load generated HFiles into table
  try {
    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(job.getConfiguration());
    loader.doBulkLoad(bulkLoadDir, hTable);
  } catch (Exception e) {
    String errorMessage = String.format("Unrecoverable error while "
        + "performing the bulk load of files in [%s]", bulkLoadDir.toString());
    throw new ImportException(errorMessage, e);
  }
}
@Test(timeout = 300000)
public void bulkLoadHFileTest() throws Exception {
  String testName = TestRegionObserverInterface.class.getName() + ".bulkLoadHFileTest";
  TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() + ".bulkLoadHFileTest");
  Configuration conf = util.getConfiguration();
  HTable table = util.createTable(tableName, new byte[][] { A, B, C });
  try {
    verifyMethodResult(SimpleRegionObserver.class,
        new String[] { "hadPreBulkLoadHFile", "hadPostBulkLoadHFile" },
        tableName,
        new Boolean[] { false, false });

    FileSystem fs = util.getTestFileSystem();
    final Path dir = util.getDataTestDirOnTestFS(testName).makeQualified(fs);
    Path familyDir = new Path(dir, Bytes.toString(A));

    createHFile(util.getConfiguration(), fs, new Path(familyDir, Bytes.toString(A)), A, A);

    // Bulk load
    new LoadIncrementalHFiles(conf).doBulkLoad(dir, table);

    verifyMethodResult(SimpleRegionObserver.class,
        new String[] { "hadPreBulkLoadHFile", "hadPostBulkLoadHFile" },
        tableName,
        new Boolean[] { true, true });
  } finally {
    util.deleteTable(tableName);
    table.close();
  }
}
@Test
public void testBulkLoad() throws Exception {
  TableName tableName = TableName.valueOf("testBulkLoad");
  long l = System.currentTimeMillis();
  HBaseAdmin admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
  createTable(admin, tableName);
  Scan scan = createScan();
  final HTable table = init(admin, l, scan, tableName);
  // use bulkload
  final Path hfilePath = writeToHFile(l, "/temp/testBulkLoad/",
      "/temp/testBulkLoad/col/file", false);
  Configuration conf = TEST_UTIL.getConfiguration();
  conf.setBoolean("hbase.mapreduce.bulkload.assign.sequenceNumbers", true);
  final LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf);
  bulkload.doBulkLoad(hfilePath, table);
  ResultScanner scanner = table.getScanner(scan);
  Result result = scanner.next();
  result = scanAfterBulkLoad(scanner, result, "version2");
  Put put0 = new Put(Bytes.toBytes("row1"));
  put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
      Bytes.toBytes("version3")));
  table.put(put0);
  admin.flush(tableName);
  scanner = table.getScanner(scan);
  result = scanner.next();
  while (result != null) {
    List<KeyValue> kvs = result.getColumn(Bytes.toBytes("col"), Bytes.toBytes("q"));
    for (KeyValue _kv : kvs) {
      if (Bytes.toString(_kv.getRow()).equals("row1")) {
        System.out.println(Bytes.toString(_kv.getRow()));
        System.out.println(Bytes.toString(_kv.getQualifier()));
        System.out.println(Bytes.toString(_kv.getValue()));
        Assert.assertEquals("version3", Bytes.toString(_kv.getValue()));
      }
    }
    result = scanner.next();
  }
  scanner.close();
  table.close();
}
private void LoadHFile2HBase(Configuration conf, String tableName, String hfile)
    throws Exception {
  conf.set("hbase.metrics.showTableName", "false");
  LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
  HBaseAdmin admin = new HBaseAdmin(conf);
  HTable table = new HTable(conf, tableName);
  loader.doBulkLoad(new Path(hfile), table);
  table.flushCommits();
  table.close();
  admin.close();
}
private void LoadHFile2HBase(Configuration conf, TableName tableName, String hfile)
    throws Exception {
  conf.set("hbase.metrics.showTableName", "false");
  LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
  Admin admin = conn.getAdmin();
  Table table = conn.getTable(tableName);
  RegionLocator rl = conn.getRegionLocator(tableName);
  loader.doBulkLoad(new Path(hfile), admin, table, rl);
}
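The variant above uses the four-argument doBulkLoad(Path, Admin, Table, RegionLocator) overload that accompanies the HBase 1.x Connection API, while most other snippets here use the older doBulkLoad(Path, HTable). A minimal self-contained sketch of the newer pattern, assuming an existing table t1 and HFiles already staged under /tmp/hfiles (both names are placeholders):

Configuration conf = HBaseConfiguration.create();
TableName tn = TableName.valueOf("t1");                 // placeholder table name
try (Connection conn = ConnectionFactory.createConnection(conf);
    Admin admin = conn.getAdmin();
    Table table = conn.getTable(tn);
    RegionLocator locator = conn.getRegionLocator(tn)) {
  // try-with-resources closes all four handles once the load finishes
  new LoadIncrementalHFiles(conf).doBulkLoad(new Path("/tmp/hfiles"), admin, table, locator);
}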
@Override public int run(String[] args) throws Exception { if (args.length != 3) { System.err.println("Usage: bulkload [-D" + MRJobConfig.QUEUE_NAME + "=proofofconcepts] [-D" + SKIP_INVALID_PROPERTY + "=true] [-D" + SPLIT_BITS_PROPERTY + "=8] [-D" + DEFAULT_CONTEXT_PROPERTY + "=http://new_context] [-D" + OVERRIDE_CONTEXT_PROPERTY + "=true] <input_path(s)> <output_path> <table_name>"); return -1; } TableMapReduceUtil.addDependencyJars(getConf(), NTriplesUtil.class, Rio.class, AbstractRDFHandler.class, RDFFormat.class, RDFParser.class); HBaseConfiguration.addHbaseResources(getConf()); getConf().setLong(DEFAULT_TIMESTAMP_PROPERTY, getConf().getLong(DEFAULT_TIMESTAMP_PROPERTY, System.currentTimeMillis())); Job job = Job.getInstance(getConf(), "HalyardBulkLoad -> " + args[1] + " -> " + args[2]); job.setJarByClass(HalyardBulkLoad.class); job.setMapperClass(RDFMapper.class); job.setMapOutputKeyClass(ImmutableBytesWritable.class); job.setMapOutputValueClass(KeyValue.class); job.setInputFormatClass(RioFileInputFormat.class); job.setSpeculativeExecution(false); job.setReduceSpeculativeExecution(false); try (HTable hTable = HalyardTableUtils.getTable(getConf(), args[2], true, getConf().getInt(SPLIT_BITS_PROPERTY, 3))) { HFileOutputFormat2.configureIncrementalLoad(job, hTable.getTableDescriptor(), hTable.getRegionLocator()); FileInputFormat.setInputDirRecursive(job, true); FileInputFormat.setInputPaths(job, args[0]); FileOutputFormat.setOutputPath(job, new Path(args[1])); TableMapReduceUtil.addDependencyJars(job); TableMapReduceUtil.initCredentials(job); if (job.waitForCompletion(true)) { if (getConf().getBoolean(TRUNCATE_PROPERTY, false)) { HalyardTableUtils.truncateTable(hTable).close(); } new LoadIncrementalHFiles(getConf()).doBulkLoad(new Path(args[1]), hTable); LOG.info("Bulk Load Completed.."); return 0; } } return -1; }
@Override public int run(String[] args) throws Exception { if (args.length != 3) { System.err.println("Usage: hiveload -D" + RDF_MIME_TYPE_PROPERTY + "='application/ld+json' [-D" + MRJobConfig.QUEUE_NAME + "=proofofconcepts] [-D" + HIVE_DATA_COLUMN_INDEX_PROPERTY + "=3] [-D" + BASE_URI_PROPERTY + "='http://my_base_uri/'] [-D" + HalyardBulkLoad.SPLIT_BITS_PROPERTY + "=8] [-D" + HalyardBulkLoad.DEFAULT_CONTEXT_PROPERTY + "=http://new_context] [-D" + HalyardBulkLoad.OVERRIDE_CONTEXT_PROPERTY + "=true] <hive_table_name> <output_path> <hbase_table_name>"); return -1; } TableMapReduceUtil.addDependencyJars(getConf(), NTriplesUtil.class, Rio.class, AbstractRDFHandler.class, RDFFormat.class, RDFParser.class); HBaseConfiguration.addHbaseResources(getConf()); getConf().setLong(DEFAULT_TIMESTAMP_PROPERTY, getConf().getLong(DEFAULT_TIMESTAMP_PROPERTY, System.currentTimeMillis())); Job job = Job.getInstance(getConf(), "HalyardHiveLoad -> " + args[1] + " -> " + args[2]); int i = args[0].indexOf('.'); HCatInputFormat.setInput(job, i > 0 ? args[0].substring(0, i) : null, args[0].substring(i + 1)); job.setJarByClass(HalyardHiveLoad.class); job.setMapperClass(HiveMapper.class); job.setMapOutputKeyClass(ImmutableBytesWritable.class); job.setMapOutputValueClass(KeyValue.class); job.setInputFormatClass(HCatInputFormat.class); job.setSpeculativeExecution(false); job.setReduceSpeculativeExecution(false); try (HTable hTable = HalyardTableUtils.getTable(getConf(), args[2], true, getConf().getInt(HalyardBulkLoad.SPLIT_BITS_PROPERTY, 3))) { HFileOutputFormat2.configureIncrementalLoad(job, hTable.getTableDescriptor(), hTable.getRegionLocator()); FileInputFormat.setInputDirRecursive(job, true); FileInputFormat.setInputPaths(job, args[0]); FileOutputFormat.setOutputPath(job, new Path(args[1])); TableMapReduceUtil.addDependencyJars(job); TableMapReduceUtil.initCredentials(job); if (job.waitForCompletion(true)) { new LoadIncrementalHFiles(getConf()).doBulkLoad(new Path(args[1]), hTable); LOG.info("Bulk Load Completed.."); return 0; } } return -1; }
/**
 * Submits the job, waits for completion, and bulk loads the resulting HFiles.
 * @param job job
 * @param outputPath output path for the generated HFiles
 * @param outputTableName table to load into
 * @param skipDependencyJars whether to skip shipping dependency jars with the job
 * @throws Exception
 */
private void configureRunnableJobUsingBulkLoad(Job job, Path outputPath,
    TableName outputTableName, boolean skipDependencyJars) throws Exception {
  job.setMapperClass(getBulkMapperClass());
  job.setMapOutputKeyClass(ImmutableBytesWritable.class);
  job.setMapOutputValueClass(KeyValue.class);
  final Configuration configuration = job.getConfiguration();
  try (Connection conn = ConnectionFactory.createConnection(configuration);
      Admin admin = conn.getAdmin();
      Table table = conn.getTable(outputTableName);
      RegionLocator regionLocator = conn.getRegionLocator(outputTableName)) {
    HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator);
    if (skipDependencyJars) {
      job.getConfiguration().unset("tmpjars");
    }
    boolean status = job.waitForCompletion(true);
    if (!status) {
      LOG.error("IndexTool job failed!");
      throw new Exception("IndexTool job failed: " + job.toString());
    }
    LOG.info("Loading HFiles from {}", outputPath);
    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(configuration);
    loader.doBulkLoad(outputPath, admin, table, regionLocator);
  }
  FileSystem.get(configuration).delete(outputPath, true);
}
@Test public void bulkLoadHFileTest() throws Exception { String testName = TestRegionObserverInterface.class.getName()+".bulkLoadHFileTest"; byte[] tableName = TEST_TABLE; Configuration conf = util.getConfiguration(); HTable table = util.createTable(tableName, new byte[][] {A, B, C}); verifyMethodResult(SimpleRegionObserver.class, new String[] {"hadPreBulkLoadHFile", "hadPostBulkLoadHFile"}, tableName, new Boolean[] {false, false} ); FileSystem fs = util.getTestFileSystem(); final Path dir = util.getDataTestDir(testName).makeQualified(fs); Path familyDir = new Path(dir, Bytes.toString(A)); createHFile(util.getConfiguration(), fs, new Path(familyDir,Bytes.toString(A)), A, A); //Bulk load new LoadIncrementalHFiles(conf).doBulkLoad(dir, new HTable(conf, tableName)); verifyMethodResult(SimpleRegionObserver.class, new String[] {"hadPreBulkLoadHFile", "hadPostBulkLoadHFile"}, tableName, new Boolean[] {true, true} ); util.deleteTable(tableName); table.close(); }
private void bulkLoadHFile( TableName tableName, byte[] family, byte[] qualifier, byte[][][] hfileRanges, int numRowsPerRange) throws Exception { Path familyDir = new Path(loadPath, Bytes.toString(family)); fs.mkdirs(familyDir); int hfileIdx = 0; for (byte[][] range : hfileRanges) { byte[] from = range[0]; byte[] to = range[1]; createHFile(new Path(familyDir, "hfile_"+(hfileIdx++)), family, qualifier, from, to, numRowsPerRange); } //set global read so RegionServer can move it setPermission(loadPath, FsPermission.valueOf("-rwxrwxrwx")); try (HTable table = (HTable)TEST_UTIL.getConnection().getTable(tableName)) { try (Admin admin = TEST_UTIL.getHBaseAdmin()) { TEST_UTIL.waitTableEnabled(admin, tableName.getName()); LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf); loader.doBulkLoad(loadPath, table); } } }
private void bulkLoadHFile( TableName tableName, byte[] family, byte[] qualifier, byte[][][] hfileRanges, int numRowsPerRange) throws Exception { Path familyDir = new Path(loadPath, Bytes.toString(family)); fs.mkdirs(familyDir); int hfileIdx = 0; for (byte[][] range : hfileRanges) { byte[] from = range[0]; byte[] to = range[1]; createHFile(new Path(familyDir, "hfile_"+(hfileIdx++)), family, qualifier, from, to, numRowsPerRange); } //set global read so RegionServer can move it setPermission(loadPath, FsPermission.valueOf("-rwxrwxrwx")); HTable table = new HTable(conf, tableName); try { HBaseAdmin admin = new HBaseAdmin(TEST_UTIL.getConfiguration()); TEST_UTIL.waitTableEnabled(admin, tableName.getName()); LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf); loader.doBulkLoad(loadPath, table); } finally { table.close(); } }
@Test public void bulkLoadHFileTest() throws Exception { String testName = TestRegionObserverInterface.class.getName()+".bulkLoadHFileTest"; TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() + ".bulkLoadHFileTest"); Configuration conf = util.getConfiguration(); HTable table = util.createTable(tableName, new byte[][] {A, B, C}); try { verifyMethodResult(SimpleRegionObserver.class, new String[] {"hadPreBulkLoadHFile", "hadPostBulkLoadHFile"}, tableName, new Boolean[] {false, false} ); FileSystem fs = util.getTestFileSystem(); final Path dir = util.getDataTestDirOnTestFS(testName).makeQualified(fs); Path familyDir = new Path(dir, Bytes.toString(A)); createHFile(util.getConfiguration(), fs, new Path(familyDir,Bytes.toString(A)), A, A); //Bulk load new LoadIncrementalHFiles(conf).doBulkLoad(dir, new HTable(conf, tableName)); verifyMethodResult(SimpleRegionObserver.class, new String[] {"hadPreBulkLoadHFile", "hadPostBulkLoadHFile"}, tableName, new Boolean[] {true, true} ); } finally { util.deleteTable(tableName); table.close(); } }
@Override
public int run(String[] args) throws Exception {
  Options options = new Options();
  options.addOption(OPTION_INPUT_PATH);
  options.addOption(OPTION_HTABLE_NAME);
  options.addOption(OPTION_CUBE_NAME);
  parseOptions(options, args);

  String tableName = getOptionValue(OPTION_HTABLE_NAME);
  // e.g. /tmp/kylin-3f150b00-3332-41ca-9d3d-652f67f044d7/test_kylin_cube_with_slr_ready_2_segments/hfile/
  // (must end with "/")
  String input = getOptionValue(OPTION_INPUT_PATH);

  Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
  FsShell shell = new FsShell(conf);

  // Retry the recursive chmod a few times to ride out transient failures.
  int exitCode = -1;
  int retryCount = 10;
  while (exitCode != 0 && retryCount >= 1) {
    exitCode = shell.run(new String[] { "-chmod", "-R", "777", input });
    retryCount--;
    Thread.sleep(5000);
  }
  if (exitCode != 0) {
    logger.error("Failed to change the file permissions: " + input);
    throw new IOException("Failed to change the file permissions: " + input);
  }

  String[] newArgs = new String[2];
  newArgs[0] = input;
  newArgs[1] = tableName;

  logger.debug("Start to run LoadIncrementalHFiles");
  int ret = ToolRunner.run(new LoadIncrementalHFiles(conf), newArgs);
  logger.debug("End to run LoadIncrementalHFiles");
  return ret;
}
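The FsShell retry loop above shells out to perform the recursive chmod; the same effect can be achieved in-process with the FileSystem API. A sketch under that assumption — chmodRecursive is a hypothetical helper name, not part of the original tool:

// Hypothetical helper: recursively grant rwx to everyone under a path so the
// RegionServers can take ownership of the staged HFiles.
private static void chmodRecursive(FileSystem fs, Path path) throws IOException {
  fs.setPermission(path, new FsPermission((short) 0777));
  if (fs.getFileStatus(path).isDirectory()) {
    for (FileStatus child : fs.listStatus(path)) {
      chmodRecursive(fs, child.getPath());
    }
  }
}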
private void bulkLoadHFile(byte[] tableName, byte[] family, byte[] qualifier,
    byte[][][] hfileRanges, int numRowsPerRange) throws Exception {
  Path familyDir = new Path(loadPath, Bytes.toString(family));
  fs.mkdirs(familyDir);
  int hfileIdx = 0;
  for (byte[][] range : hfileRanges) {
    byte[] from = range[0];
    byte[] to = range[1];
    createHFile(new Path(familyDir, "hfile_" + (hfileIdx++)), family, qualifier,
        from, to, numRowsPerRange);
  }
  // set global read so RegionServer can move it
  setPermission(loadPath, FsPermission.valueOf("-rwxrwxrwx"));
  HTable table = new HTable(conf, tableName);
  try {
    TEST_UTIL.waitTableAvailable(tableName, 30000);
    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
    loader.doBulkLoad(loadPath, table);
  } finally {
    table.close();
  }
}
@Override
public int run(String[] args) throws Exception {
  Options options = new Options();
  try {
    options.addOption(OPTION_INPUT_PATH);
    options.addOption(OPTION_HTABLE_NAME);
    options.addOption(OPTION_II_NAME);
    parseOptions(options, args);

    String tableName = getOptionValue(OPTION_HTABLE_NAME);
    String input = getOptionValue(OPTION_INPUT_PATH);
    String iiname = getOptionValue(OPTION_II_NAME);

    FileSystem fs = FileSystem.get(getConf());
    FsPermission permission = new FsPermission((short) 0777);
    fs.setPermission(new Path(input, IIDesc.HBASE_FAMILY), permission);

    int hbaseExitCode = ToolRunner.run(new LoadIncrementalHFiles(getConf()),
        new String[] { input, tableName });

    IIManager mgr = IIManager.getInstance(KylinConfig.getInstanceFromEnv());
    IIInstance ii = mgr.getII(iiname);
    IISegment seg = ii.getFirstSegment();
    seg.setStorageLocationIdentifier(tableName);
    seg.setStatus(SegmentStatusEnum.READY);
    mgr.updateII(ii);

    return hbaseExitCode;
  } catch (Exception e) {
    printUsage(options);
    throw e;
  }
}
public void completeImport() throws Exception {
  LoadIncrementalHFiles loader = new LoadIncrementalHFiles(getConfiguration());
  HTable table = new HTable(getConfiguration(), _tableName);
  loader.doBulkLoad(_hfilePath, table);
  FileSystem fs = _hfilePath.getFileSystem(getConfiguration());
  fs.delete(_hfilePath, true);
}
@Test public void bulkLoadHFileTest() throws Exception { String testName = TestRegionObserverInterface.class.getName()+".bulkLoadHFileTest"; byte[] tableName = TEST_TABLE; Configuration conf = util.getConfiguration(); HTable table = util.createTable(tableName, new byte[][] {A, B, C}); verifyMethodResult(SimpleRegionObserver.class, new String[] {"hadPreBulkLoadHFile", "hadPostBulkLoadHFile"}, tableName, new Boolean[] {false, false} ); FileSystem fs = util.getTestFileSystem(); final Path dir = util.getDataTestDirOnTestFS(testName).makeQualified(fs); Path familyDir = new Path(dir, Bytes.toString(A)); createHFile(util.getConfiguration(), fs, new Path(familyDir,Bytes.toString(A)), A, A); //Bulk load new LoadIncrementalHFiles(conf).doBulkLoad(dir, new HTable(conf, tableName)); verifyMethodResult(SimpleRegionObserver.class, new String[] {"hadPreBulkLoadHFile", "hadPostBulkLoadHFile"}, tableName, new Boolean[] {true, true} ); util.deleteTable(tableName); table.close(); }
@Test
public void testBulkLoadNativeHFile() throws Exception {
  TableName tableName = TableName.valueOf("testBulkLoadNativeHFile");
  long l = System.currentTimeMillis();
  HBaseAdmin admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
  createTable(admin, tableName);
  Scan scan = createScan();
  final HTable table = init(admin, l, scan, tableName);
  // use bulkload
  final Path hfilePath = writeToHFile(l, "/temp/testBulkLoadNativeHFile/",
      "/temp/testBulkLoadNativeHFile/col/file", true);
  Configuration conf = TEST_UTIL.getConfiguration();
  conf.setBoolean("hbase.mapreduce.bulkload.assign.sequenceNumbers", true);
  final LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf);
  bulkload.doBulkLoad(hfilePath, table);
  ResultScanner scanner = table.getScanner(scan);
  Result result = scanner.next();
  // We had 'version0' and 'version1' for 'row1,col:q' in the table.
  // Bulk load added 'version2'; the scanner should be able to see 'version2'.
  result = scanAfterBulkLoad(scanner, result, "version2");
  Put put0 = new Put(Bytes.toBytes("row1"));
  put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
      Bytes.toBytes("version3")));
  table.put(put0);
  admin.flush(tableName);
  scanner = table.getScanner(scan);
  result = scanner.next();
  while (result != null) {
    List<KeyValue> kvs = result.getColumn(Bytes.toBytes("col"), Bytes.toBytes("q"));
    for (KeyValue _kv : kvs) {
      if (Bytes.toString(_kv.getRow()).equals("row1")) {
        System.out.println(Bytes.toString(_kv.getRow()));
        System.out.println(Bytes.toString(_kv.getQualifier()));
        System.out.println(Bytes.toString(_kv.getValue()));
        Assert.assertEquals("version3", Bytes.toString(_kv.getValue()));
      }
    }
    result = scanner.next();
  }
  scanner.close();
  table.close();
}
@Override public int run(String[] args) throws Exception { if (args.length != 3) { System.err.println("Usage: bulkupdate [-D" + MRJobConfig.QUEUE_NAME + "=proofofconcepts] <input_file_with_SPARQL_queries> <output_path> <table_name>"); return -1; } TableMapReduceUtil.addDependencyJars(getConf(), HalyardExport.class, NTriplesUtil.class, Rio.class, AbstractRDFHandler.class, RDFFormat.class, RDFParser.class, HTable.class, HBaseConfiguration.class, AuthenticationProtos.class, Trace.class, Gauge.class); HBaseConfiguration.addHbaseResources(getConf()); getConf().setStrings(TABLE_NAME_PROPERTY, args[2]); getConf().setLong(DEFAULT_TIMESTAMP_PROPERTY, getConf().getLong(DEFAULT_TIMESTAMP_PROPERTY, System.currentTimeMillis())); Job job = Job.getInstance(getConf(), "HalyardBulkUpdate -> " + args[1] + " -> " + args[2]); NLineInputFormat.setNumLinesPerSplit(job, 1); job.setJarByClass(HalyardBulkUpdate.class); job.setMapperClass(SPARQLMapper.class); job.setMapOutputKeyClass(ImmutableBytesWritable.class); job.setMapOutputValueClass(KeyValue.class); job.setInputFormatClass(NLineInputFormat.class); job.setSpeculativeExecution(false); job.setReduceSpeculativeExecution(false); try (HTable hTable = HalyardTableUtils.getTable(getConf(), args[2], false, 0)) { HFileOutputFormat2.configureIncrementalLoad(job, hTable.getTableDescriptor(), hTable.getRegionLocator()); FileInputFormat.setInputPaths(job, args[0]); FileOutputFormat.setOutputPath(job, new Path(args[1])); TableMapReduceUtil.addDependencyJars(job); TableMapReduceUtil.initCredentials(job); if (job.waitForCompletion(true)) { new LoadIncrementalHFiles(getConf()).doBulkLoad(new Path(args[1]), hTable); LOG.info("Bulk Update Completed.."); return 0; } } return -1; }