private void addMultipleCounts(final Map<RowKey, List<CountCellIncrementHolder>> rowChanges) { LOGGER.trace(() -> String.format("addMultipleCounts called for %s rows", rowChanges.size())); // create an action for each row we have data for final List<Mutation> actions = rowChanges.entrySet().stream() .map(entry -> createIncrementOperation(entry.getKey(), entry.getValue())) .collect(Collectors.toList()); final Object[] results = null; // don't care about what is written to results as we are doing puts send the mutations to HBase // long startTime = System.currentTimeMillis(); doBatch(actions, results); LOGGER.trace(() -> String.format("%s puts sent to HBase", actions.size())); // LOGGER.info("Sent %s ADDs to HBase from thread %s in %s ms", // cellQualifiersFromBuffer.size(), // Thread.currentThread().getName(), (System.currentTimeMillis() - // startTime)); }
/** * Adds the mutations to labels region and set the results to the finalOpStatus. finalOpStatus * might have some entries in it where the OpStatus is FAILURE. We will leave those and set in * others in the order. * @param mutations * @param finalOpStatus * @return whether we need a ZK update or not. */ private boolean mutateLabelsRegion(List<Mutation> mutations, OperationStatus[] finalOpStatus) throws IOException { OperationStatus[] opStatus = this.labelsRegion.batchMutate(mutations .toArray(new Mutation[mutations.size()]), HConstants.NO_NONCE, HConstants.NO_NONCE); int i = 0; boolean updateZk = false; for (OperationStatus status : opStatus) { // Update the zk when atleast one of the mutation was added successfully. updateZk = updateZk || (status.getOperationStatusCode() == OperationStatusCode.SUCCESS); for (; i < finalOpStatus.length; i++) { if (finalOpStatus[i] == null) { finalOpStatus[i] = status; break; } } } return updateZk; }
/** * Writes an action (Put or Delete) to the specified table. * * @param tableName * the table being updated. * @param action * the update, either a put or a delete. * @throws IllegalArgumentException * if the action is not a put or a delete. */ @Override public void write(ImmutableBytesWritable tableName, Mutation action) throws IOException { BufferedMutator mutator = getBufferedMutator(tableName); // The actions are not immutable, so we defensively copy them if (action instanceof Put) { Put put = new Put((Put) action); put.setDurability(useWriteAheadLogging ? Durability.SYNC_WAL : Durability.SKIP_WAL); mutator.mutate(put); } else if (action instanceof Delete) { Delete delete = new Delete((Delete) action); mutator.mutate(delete); } else throw new IllegalArgumentException( "action must be either Delete or Put"); }
/** * Create a protocol buffer MultiRequest for row mutations. * Does not propagate Action absolute position. Does not set atomic action on the created * RegionAtomic. Caller should do that if wanted. * @param regionName * @param rowMutations * @return a data-laden RegionMutation.Builder * @throws IOException */ public static RegionAction.Builder buildRegionAction(final byte [] regionName, final RowMutations rowMutations) throws IOException { RegionAction.Builder builder = getRegionActionBuilderWithRegion(RegionAction.newBuilder(), regionName); ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder(); MutationProto.Builder mutationBuilder = MutationProto.newBuilder(); for (Mutation mutation: rowMutations.getMutations()) { MutationType mutateType = null; if (mutation instanceof Put) { mutateType = MutationType.PUT; } else if (mutation instanceof Delete) { mutateType = MutationType.DELETE; } else { throw new DoNotRetryIOException("RowMutations supports only put and delete, not " + mutation.getClass().getName()); } mutationBuilder.clear(); MutationProto mp = ProtobufUtil.toMutation(mutateType, mutation, mutationBuilder); actionBuilder.clear(); actionBuilder.setMutation(mp); builder.addAction(actionBuilder.build()); } return builder; }
@Override public void preProcess(HRegion region, WALEdit walEdit) throws IOException { RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost(); if (coprocessorHost != null) { for (Mutation m : mutations) { if (m instanceof Put) { if (coprocessorHost.prePut((Put) m, walEdit, m.getDurability())) { // by pass everything return; } } else if (m instanceof Delete) { Delete d = (Delete) m; region.prepareDelete(d); if (coprocessorHost.preDelete(d, walEdit, d.getDurability())) { // by pass everything return; } } } } }
@Override public void preMergeCommit(ObserverContext<RegionServerCoprocessorEnvironment> ctx, Region regionA, Region regionB, List<Mutation> metaEntries) throws IOException { preMergeBeforePONRCalled = true; RegionServerCoprocessorEnvironment environment = ctx.getEnvironment(); HRegionServer rs = (HRegionServer) environment.getRegionServerServices(); List<Region> onlineRegions = rs.getOnlineRegions(TableName.valueOf("testRegionServerObserver_2")); rmt = (RegionMergeTransactionImpl) new RegionMergeTransactionFactory(rs.getConfiguration()) .create(onlineRegions.get(0), onlineRegions.get(1), true); if (!rmt.prepare(rs)) { LOG.error("Prepare for the region merge of table " + onlineRegions.get(0).getTableDesc().getNameAsString() + " failed. So returning null. "); ctx.bypass(); return; } mergedRegion = rmt.stepsBeforePONR(rs, rs, false); rmt.prepareMutationsForMerge(mergedRegion.getRegionInfo(), regionA.getRegionInfo(), regionB.getRegionInfo(), rs.getServerName(), metaEntries, regionA.getTableDesc().getRegionReplication()); MetaTableAccessor.mutateMetaTable(rs.getConnection(), metaEntries); }
private void recordFailure(final Mutation m, final long keyBase, final long start, IOException e) { failedKeySet.add(keyBase); String exceptionInfo; if (e instanceof RetriesExhaustedWithDetailsException) { RetriesExhaustedWithDetailsException aggEx = (RetriesExhaustedWithDetailsException) e; exceptionInfo = aggEx.getExhaustiveDescription(); } else { StringWriter stackWriter = new StringWriter(); PrintWriter pw = new PrintWriter(stackWriter); e.printStackTrace(pw); pw.flush(); exceptionInfo = StringUtils.stringifyException(e); } LOG.error("Failed to mutate: " + keyBase + " after " + (System.currentTimeMillis() - start) + "ms; region information: " + getRegionDebugInfoSafe(table, m.getRow()) + "; errors: " + exceptionInfo); }
@Override public Mutation beforeMutate(long rowkeyBase, Mutation m) throws IOException { if (!(m instanceof Delete)) { if (userNames != null && userNames.length > 0) { int mod = ((int) rowkeyBase % this.userNames.length); if (((int) rowkeyBase % specialPermCellInsertionFactor) == 0) { // These cells cannot be read back when running as user userName[mod] if (LOG.isTraceEnabled()) { LOG.trace("Adding special perm " + rowkeyBase); } m.setACL(userNames[mod], new Permission(Permission.Action.WRITE)); } else { m.setACL(userNames[mod], new Permission(Permission.Action.READ)); } } } return m; }
/** * Create a protocol buffer MultiRequest for row mutations that does not hold data. Data/Cells * are carried outside of protobuf. Return references to the Cells in <code>cells</code> param. * Does not propagate Action absolute position. Does not set atomic action on the created * RegionAtomic. Caller should do that if wanted. * @param regionName * @param rowMutations * @param cells Return in here a list of Cells as CellIterable. * @return a region mutation minus data * @throws IOException */ public static RegionAction.Builder buildNoDataRegionAction(final byte[] regionName, final RowMutations rowMutations, final List<CellScannable> cells, final RegionAction.Builder regionActionBuilder, final ClientProtos.Action.Builder actionBuilder, final MutationProto.Builder mutationBuilder) throws IOException { for (Mutation mutation: rowMutations.getMutations()) { MutationType type = null; if (mutation instanceof Put) { type = MutationType.PUT; } else if (mutation instanceof Delete) { type = MutationType.DELETE; } else { throw new DoNotRetryIOException("RowMutations supports only put and delete, not " + mutation.getClass().getName()); } mutationBuilder.clear(); MutationProto mp = ProtobufUtil.toMutationNoData(type, mutation, mutationBuilder); cells.add(mutation); actionBuilder.clear(); regionActionBuilder.addAction(actionBuilder.setMutation(mp).build()); } return regionActionBuilder; }
public TablestoreBufferedMutator(TablestoreConnection connection, TableName tableName) { this.tableName = tableName; this.connection = connection; writeBuffer = new ConcurrentLinkedQueue<Mutation>(); this.writeBufferSize = this.connection.getConfiguration().getLong("hbase.client.write.buffer", 2097152); this.currentWriteBufferSize = 0; this.columnMapping = new ColumnMapping(tableName.getNameAsString(), this.connection.getConfiguration()); this.adapter = OTSAdapter.getInstance(this.connection.getTablestoreConf()); this.clearBufferOnFail = true; }
@Override public void mutate(List<? extends Mutation> list) throws IOException { List<OPut> flushPuts = new ArrayList<OPut>(); List<ODelete> flushDeletes = new ArrayList<ODelete>(); for (Mutation mutation : list) { writeBuffer.add(mutation); currentWriteBufferSize += mutation.heapSize(); } if (currentWriteBufferSize >= writeBufferSize) { extractOMutation(flushPuts, flushDeletes); } flush(flushPuts, flushDeletes); }
private void extractOMutation(List<OPut> flushPuts, List<ODelete> flushDeletes) { for (Mutation mutation : writeBuffer) { if (mutation instanceof Put) { flushPuts.add(ElementConvertor.toOtsPut((Put)mutation, this.columnMapping)); } else if (mutation instanceof Delete) { flushDeletes.add(ElementConvertor.toOtsDelete((Delete)mutation, this.columnMapping)); } } writeBuffer.clear(); currentWriteBufferSize = 0; }
private void updateMutationAddingTags(final Mutation m) { byte[] attribute = m.getAttribute("visibility"); byte[] cf = null; List<Cell> updatedCells = new ArrayList<Cell>(); if (attribute != null) { for (List<? extends Cell> edits : m.getFamilyCellMap().values()) { for (Cell cell : edits) { KeyValue kv = KeyValueUtil.ensureKeyValue(cell); if (cf == null) { cf = kv.getFamily(); } Tag tag = new Tag((byte) 1, attribute); List<Tag> tagList = new ArrayList<Tag>(); tagList.add(tag); KeyValue newKV = new KeyValue(kv.getRow(), 0, kv.getRowLength(), kv.getFamily(), 0, kv.getFamilyLength(), kv.getQualifier(), 0, kv.getQualifierLength(), kv.getTimestamp(), KeyValue.Type.codeToType(kv.getType()), kv.getValue(), 0, kv.getValueLength(), tagList); ((List<Cell>) updatedCells).add(newKV); } } m.getFamilyCellMap().remove(cf); // Update the family map m.getFamilyCellMap().put(cf, updatedCells); } }
@Override public Cell postMutationBeforeWAL(ObserverContext<RegionCoprocessorEnvironment> ctx, MutationType opType, Mutation mutation, Cell oldCell, Cell newCell) throws IOException { List<Tag> tags = Lists.newArrayList(); CellVisibility cellVisibility = null; try { cellVisibility = mutation.getCellVisibility(); } catch (DeserializationException e) { throw new IOException(e); } if (cellVisibility == null) { return newCell; } // Prepend new visibility tags to a new list of tags for the cell // Don't check user auths for labels with Mutations when the user is super user boolean authCheck = authorizationEnabled && checkAuths && !(isSystemOrSuperUser()); tags.addAll(this.visibilityLabelService.createVisibilityExpTags(cellVisibility.getExpression(), true, authCheck)); // Save an object allocation where we can if (newCell.getTagsLength() > 0) { // Carry forward all other tags Iterator<Tag> tagsItr = CellUtil.tagsIterator(newCell.getTagsArray(), newCell.getTagsOffset(), newCell.getTagsLength()); while (tagsItr.hasNext()) { Tag tag = tagsItr.next(); if (tag.getType() != TagType.VISIBILITY_TAG_TYPE && tag.getType() != TagType.VISIBILITY_EXP_SERIALIZATION_FORMAT_TAG_TYPE) { tags.add(tag); } } } Cell rewriteCell = new TagRewriteCell(newCell, Tag.fromList(tags)); return rewriteCell; }
private void checkForReservedTagPresence(User user, Mutation m) throws IOException { // No need to check if we're not going to throw if (!authorizationEnabled) { m.setAttribute(TAG_CHECK_PASSED, TRUE); return; } // Superusers are allowed to store cells unconditionally. if (Superusers.isSuperUser(user)) { m.setAttribute(TAG_CHECK_PASSED, TRUE); return; } // We already checked (prePut vs preBatchMutation) if (m.getAttribute(TAG_CHECK_PASSED) != null) { return; } for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) { Cell cell = cellScanner.current(); if (cell.getTagsLength() > 0) { Iterator<Tag> tagsItr = CellUtil.tagsIterator(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength()); while (tagsItr.hasNext()) { if (tagsItr.next().getType() == AccessControlLists.ACL_TAG_TYPE) { throw new AccessDeniedException("Mutation contains cell with reserved type tag"); } } } } m.setAttribute(TAG_CHECK_PASSED, TRUE); }
@Override public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { if (cellFeaturesEnabled && !compatibleEarlyTermination) { TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable(); for (int i = 0; i < miniBatchOp.size(); i++) { Mutation m = miniBatchOp.getOperation(i); if (m.getAttribute(CHECK_COVERING_PERM) != null) { // We have a failure with table, cf and q perm checks and now giving a chance for cell // perm check OpType opType; if (m instanceof Put) { checkForReservedTagPresence(getActiveUser(), m); opType = OpType.PUT; } else { opType = OpType.DELETE; } AuthResult authResult = null; if (checkCoveringPermission(opType, c.getEnvironment(), m.getRow(), m.getFamilyCellMap(), m.getTimeStamp(), Action.WRITE)) { authResult = AuthResult.allow(opType.toString(), "Covering cell set", getActiveUser(), Action.WRITE, table, m.getFamilyCellMap()); } else { authResult = AuthResult.deny(opType.toString(), "Covering cell set", getActiveUser(), Action.WRITE, table, m.getFamilyCellMap()); } logResult(authResult); if (authorizationEnabled && !authResult.isAllowed()) { throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString()); } } } } }
@Override public void preBatchMutate(final ObserverContext<RegionCoprocessorEnvironment> c, final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { if (ops.incrementAndGet() % 20000 == 0) { LOG.info("Wrote " + ops.get() + " times in region " + regionName); } for (int i = 0; i < miniBatchOp.size(); i++) { miniBatchOp.setOperationStatus(i, new OperationStatus(HConstants.OperationStatusCode.SUCCESS)); } c.bypass(); }
public MutationReplay(MutationType type, Mutation mutation, long nonceGroup, long nonce) { this.type = type; this.mutation = mutation; if(this.mutation.getDurability() != Durability.SKIP_WAL) { // using ASYNC_WAL for relay this.mutation.setDurability(Durability.ASYNC_WAL); } this.nonceGroup = nonceGroup; this.nonce = nonce; }
public static long calculateMutationSize(final Mutation mutation) { long size = 0; for (Map.Entry<byte[], List<Cell>> entry : mutation.getFamilyCellMap().entrySet()) { for (Cell cell : entry.getValue()) { size += KeyValueUtil.length(cell); } } return size; }
/** * Writes a key/value pair into the table. * * @param key The key. * @param value The value. * @throws IOException When writing fails. * @see RecordWriter#write(Object, Object) */ @Override public void write(KEY key, Mutation value) throws IOException { if (!(value instanceof Put) && !(value instanceof Delete)) { throw new IOException("Pass a Delete or a Put"); } mutator.mutate(value); }
@Override public void setup(Mapper<WALKey, WALEdit, ImmutableBytesWritable, Mutation>.Context context) throws IOException { super.setup(context); try { this.keysToFind = readKeysToSearch(context.getConfiguration()); LOG.info("Loaded keys to find: count=" + this.keysToFind.size()); } catch (InterruptedException e) { throw new InterruptedIOException(e.toString()); } }
@Override public void serialize(Mutation mutation) throws IOException { MutationType type; if (mutation instanceof Put) { type = MutationType.PUT; } else if (mutation instanceof Delete) { type = MutationType.DELETE; } else { throw new IllegalArgumentException("Only Put and Delete are supported"); } ProtobufUtil.toMutation(type, mutation).writeDelimitedTo(out); }
@Override public RecordWriter<ImmutableBytesWritable, Mutation> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException { Configuration conf = context.getConfiguration(); return new MultiTableRecordWriter(HBaseConfiguration.create(conf), conf.getBoolean(WAL_PROPERTY, WAL_ON)); }
public boolean preMergeCommit(final HRegion regionA, final HRegion regionB, final @MetaMutationAnnotation List<Mutation> metaEntries) throws IOException { return execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() { @Override public void call(RegionServerObserver oserver, ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException { oserver.preMergeCommit(ctx, regionA, regionB, metaEntries); } }); }
/** * Create a protocol buffer MutateRequest for conditioned row mutations * * @param regionName * @param row * @param family * @param qualifier * @param comparator * @param compareType * @param rowMutations * @return a mutate request * @throws IOException */ public static ClientProtos.MultiRequest buildMutateRequest( final byte[] regionName, final byte[] row, final byte[] family, final byte [] qualifier, final ByteArrayComparable comparator, final CompareType compareType, final RowMutations rowMutations) throws IOException { RegionAction.Builder builder = getRegionActionBuilderWithRegion(RegionAction.newBuilder(), regionName); builder.setAtomic(true); ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder(); MutationProto.Builder mutationBuilder = MutationProto.newBuilder(); Condition condition = buildCondition( row, family, qualifier, comparator, compareType); for (Mutation mutation: rowMutations.getMutations()) { MutationType mutateType = null; if (mutation instanceof Put) { mutateType = MutationType.PUT; } else if (mutation instanceof Delete) { mutateType = MutationType.DELETE; } else { throw new DoNotRetryIOException("RowMutations supports only put and delete, not " + mutation.getClass().getName()); } mutationBuilder.clear(); MutationProto mp = ProtobufUtil.toMutation(mutateType, mutation, mutationBuilder); actionBuilder.clear(); actionBuilder.setMutation(mp); builder.addAction(actionBuilder.build()); } ClientProtos.MultiRequest request = ClientProtos.MultiRequest.newBuilder().addRegionAction(builder.build()) .setCondition(condition).build(); return request; }
@Override public void preBatchMutate(HRegion region, WALEdit walEdit) throws IOException { // TODO we should return back the status of this hook run to HRegion so that those Mutations // with OperationStatus as SUCCESS or FAILURE should not get applied to memstore. RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost(); OperationStatus[] opStatus = new OperationStatus[mutations.size()]; Arrays.fill(opStatus, OperationStatus.NOT_RUN); WALEdit[] walEditsFromCP = new WALEdit[mutations.size()]; if (coprocessorHost != null) { miniBatch = new MiniBatchOperationInProgress<Mutation>( mutations.toArray(new Mutation[mutations.size()]), opStatus, walEditsFromCP, 0, mutations.size()); coprocessorHost.preBatchMutate(miniBatch); } // Apply edits to a single WALEdit for (int i = 0; i < mutations.size(); i++) { if (opStatus[i] == OperationStatus.NOT_RUN) { // Other OperationStatusCode means that Mutation is already succeeded or failed in CP hook // itself. No need to apply again to region if (walEditsFromCP[i] != null) { // Add the WALEdit created by CP hook for (Cell walCell : walEditsFromCP[i].getCells()) { walEdit.add(walCell); } } } } }
/** * Convert a MutateRequest to Mutation * * @param proto the protocol buffer Mutate to convert * @return the converted Mutation * @throws IOException */ public static Mutation toMutation(final MutationProto proto) throws IOException { MutationType type = proto.getMutateType(); if (type == MutationType.APPEND) { return toAppend(proto, null); } if (type == MutationType.DELETE) { return toDelete(proto, null); } if (type == MutationType.PUT) { return toPut(proto, null); } throw new IOException("Unknown mutation type " + type); }
@Override public Durability useDurability() { // return true when at least one mutation requested a WAL flush (default) Durability durability = Durability.USE_DEFAULT; for (Mutation m : mutations) { if (m.getDurability().ordinal() > durability.ordinal()) { durability = m.getDurability(); } } return durability; }
private void offlineParentInMetaAndputMetaEntries(HConnection hConnection, HRegionInfo parent, HRegionInfo splitA, HRegionInfo splitB, ServerName serverName, List<Mutation> metaEntries, int regionReplication) throws IOException { List<Mutation> mutations = metaEntries; HRegionInfo copyOfParent = new HRegionInfo(parent); copyOfParent.setOffline(true); copyOfParent.setSplit(true); //Put for parent Put putParent = MetaTableAccessor.makePutFromRegionInfo(copyOfParent); MetaTableAccessor.addDaughtersToPut(putParent, splitA, splitB); mutations.add(putParent); //Puts for daughters Put putA = MetaTableAccessor.makePutFromRegionInfo(splitA); Put putB = MetaTableAccessor.makePutFromRegionInfo(splitB); addLocation(putA, serverName, 1); //these are new regions, openSeqNum = 1 is fine. addLocation(putB, serverName, 1); mutations.add(putA); mutations.add(putB); // Add empty locations for region replicas of daughters so that number of replicas can be // cached whenever the primary region is looked up from meta for (int i = 1; i < regionReplication; i++) { addEmptyLocation(putA, i); addEmptyLocation(putB, i); } MetaTableAccessor.mutateMetaTable(hConnection, mutations); }
private void mergeRegionsAndPutMetaEntries(HConnection hConnection, HRegionInfo mergedRegion, HRegionInfo regionA, HRegionInfo regionB, ServerName serverName, List<Mutation> metaEntries, int regionReplication) throws IOException { prepareMutationsForMerge(mergedRegion, regionA, regionB, serverName, metaEntries, regionReplication); MetaTableAccessor.mutateMetaTable(hConnection, metaEntries); }
public void prepareMutationsForMerge(HRegionInfo mergedRegion, HRegionInfo regionA, HRegionInfo regionB, ServerName serverName, List<Mutation> mutations, int regionReplication) throws IOException { HRegionInfo copyOfMerged = new HRegionInfo(mergedRegion); // use the maximum of what master passed us vs local time. long time = Math.max(EnvironmentEdgeManager.currentTime(), masterSystemTime); // Put for parent Put putOfMerged = MetaTableAccessor.makePutFromRegionInfo(copyOfMerged, time); putOfMerged.add(HConstants.CATALOG_FAMILY, HConstants.MERGEA_QUALIFIER, regionA.toByteArray()); putOfMerged.add(HConstants.CATALOG_FAMILY, HConstants.MERGEB_QUALIFIER, regionB.toByteArray()); mutations.add(putOfMerged); // Deletes for merging regions Delete deleteA = MetaTableAccessor.makeDeleteFromRegionInfo(regionA, time); Delete deleteB = MetaTableAccessor.makeDeleteFromRegionInfo(regionB, time); mutations.add(deleteA); mutations.add(deleteB); // Add empty locations for region replicas of the merged region so that number of replicas // can be cached whenever the primary region is looked up from meta for (int i = 1; i < regionReplication; i++) { addEmptyLocation(putOfMerged, i); } // The merged is a new region, openSeqNum = 1 is fine. addLocation(putOfMerged, serverName, 1); }
/** * @return Carry forward the TTL tag if the increment is carrying one */ private static List<Tag> carryForwardTTLTag(final List<Tag> tagsOrNull, final Mutation mutation) { long ttl = mutation.getTTL(); if (ttl == Long.MAX_VALUE) return tagsOrNull; List<Tag> tags = tagsOrNull; // If we are making the array in here, given we are the last thing checked, // we'll be only thing // in the array so set its size to '1' (I saw this being done in earlier // version of // tag-handling). if (tags == null) tags = new ArrayList<Tag>(1); tags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(ttl))); return tags; }
@Override public OperationStatus[] batchMutate(Mutation[] mutations, long nonceGroup, long nonce) throws IOException { // As it stands, this is used for 3 things // * batchMutate with single mutation - put/delete, separate or from // checkAndMutate. // * coprocessor calls (see ex. BulkDeleteEndpoint). // So nonces are not really ever used by HBase. They could be by coprocs, // and checkAnd... return batchMutate(new MutationBatch(mutations, nonceGroup, nonce)); }
private void doBatchMutate(Mutation mutation) throws IOException { // Currently this is only called for puts and deletes, so no nonces. OperationStatus[] batchMutate = this.batchMutate(new Mutation[] { mutation }); if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.SANITY_CHECK_FAILURE)) { throw new FailedSanityCheckException(batchMutate[0].getExceptionMsg()); } else if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.BAD_FAMILY)) { throw new NoSuchColumnFamilyException(batchMutate[0].getExceptionMsg()); } }
/** * Possibly rewrite incoming cell tags. */ void rewriteCellTags(Map<byte[], List<Cell>> familyMap, final Mutation m) { // Check if we have any work to do and early out otherwise // Update these checks as more logic is added here if (m.getTTL() == Long.MAX_VALUE) { return; } // From this point we know we have some work to do for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) { List<Cell> cells = e.getValue(); assert cells instanceof RandomAccess; int listSize = cells.size(); for (int i = 0; i < listSize; i++) { Cell cell = cells.get(i); List<Tag> newTags = Tag.carryForwardTags(null, cell); newTags = carryForwardTTLTag(newTags, m); // Rewrite the cell with the updated set of tags cells.set(i, new KeyValue(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(), cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(), cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(), cell.getTimestamp(), KeyValue.Type.codeToType(cell.getTypeByte()), cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(), newTags)); } } }
/** * Execute the passed <code>mutations</code> against <code>hbase:meta</code> table. * @param connection connection we're using * @param mutations Puts and Deletes to execute on hbase:meta * @throws IOException */ public static void mutateMetaTable(final Connection connection, final List<Mutation> mutations) throws IOException { Table t = getMetaHTable(connection); try { t.batch(mutations); } catch (InterruptedException e) { InterruptedIOException ie = new InterruptedIOException(e.getMessage()); ie.initCause(e); throw ie; } finally { t.close(); } }
/** * @param mutation - the current mutation * @param kv - the current cell * @param byteNow - current timestamp in bytes * @param get - the get that could be used * Note that the get only does not specify the family and qualifier that should be used * @return true if default processing should be bypassed * @exception IOException * Exception */ public boolean prePrepareTimeStampForDeleteVersion(final Mutation mutation, final Cell kv, final byte[] byteNow, final Get get) throws IOException { return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { @Override public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) throws IOException { oserver.prePrepareTimeStampForDeleteVersion(ctx, mutation, kv, byteNow, get); } }); }
/** * @param miniBatchOp * @return true if default processing should be bypassed * @throws IOException */ public boolean preBatchMutate( final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { @Override public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) throws IOException { oserver.preBatchMutate(ctx, miniBatchOp); } }); }
/** * @param miniBatchOp * @throws IOException */ public void postBatchMutate( final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { @Override public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) throws IOException { oserver.postBatchMutate(ctx, miniBatchOp); } }); }