/** * Curator支持事务,一组crud操作同生同灭 * @param client */ private static void transation(CuratorFramework client) { try { // 开启事务 CuratorTransaction transaction = client.inTransaction(); Collection<CuratorTransactionResult> results = transaction.create() .forPath("/root/transation", "transation".getBytes()).and().create() .forPath("/root/transation2", "transation2".getBytes()).and() .delete().forPath("/root/transation").and() .delete().forPath("/root/transation2").and().commit(); for (CuratorTransactionResult result : results) { System.out.println(result.getForPath() + " - " + result.getType()); } }catch (Exception e){ log.error("transation exception ", e); } }
/**
 * Appends a delete of {@code path} to the current transaction, opening a new
 * transaction on the connected client when none is in progress, and stores the
 * resulting transaction back in {@code transaction}.
 *
 * @param path path of the node to delete
 * @throws DataSourceConnectorException declared by the overridden method; presumably raised by
 *         {@code executeOrFailWithError} with the prepared error message — confirm against that helper
 */
@Override
public void delete(String path) throws DataSourceConnectorException {
    String failureMessage = "Can't delete data for path: " + path + " in transaction";
    transaction = executeOrFailWithError(
            () -> {
                // Reuse the in-flight transaction if there is one; otherwise start a fresh one.
                CuratorTransaction base;
                if (transaction != null) {
                    base = transaction;
                } else {
                    base = getConnectedClientOrFailFast().inTransaction();
                }
                return base.delete().forPath(path).and();
            },
            failureMessage);
}
/**
 * Appends to a transaction the operations needed to store {@code data} at {@code path}.
 *
 * <p>If the node already exists a set-data operation is appended. Otherwise a create
 * operation is appended, preceded by a create of the direct parent when that parent
 * does not exist yet. Only the direct parent is handled; deeper missing ancestors are
 * not created here.
 *
 * @param transaction the transaction to extend, or {@code null} to start a new one on the
 *                    connected client
 * @param data        payload to store at {@code path}
 * @param path        absolute path of the node
 * @return the transaction with the additional operation(s) appended
 * @throws Exception if an existence check or a transaction builder call fails
 */
private CuratorTransaction saveInTransaction(CuratorTransaction transaction, byte[] data, String path)
        throws Exception {
    CuratorTransaction resultingTransaction =
            (transaction == null) ? getConnectedClientOrFailFast().inTransaction() : transaction;

    if (isNodeExistsInternal(path)) {
        return resultingTransaction.setData().forPath(path, data).and();
    }

    String parentPath = StringUtils.substringBeforeLast(path, "/");
    // For a top-level node such as "/foo" the computed parent is the empty string, i.e. the
    // root. There is nothing to create for the root, and "" is presumably rejected as an
    // invalid path by the client, so the parent step is skipped in that case.
    if (!parentPath.isEmpty() && !isNodeExistsInternal(parentPath)) {
        resultingTransaction = resultingTransaction.create().forPath(parentPath).and();
    }
    return resultingTransaction.create().forPath(path, data).and();
}
/**
 * Opens a transaction on {@code curatorFramework} whose first operation creates a
 * persistent node with an empty payload at {@code fencingNodePath} using {@code zkAcl},
 * and keeps the resulting transaction in {@code transactionFinal}.
 *
 * @throws Exception if the transaction builder calls fail
 */
SafeTransaction() throws Exception {
    transactionFinal = curatorFramework.inTransaction()
            .create()
            .withMode(CreateMode.PERSISTENT)
            .withACL(zkAcl)
            .forPath(fencingNodePath, new byte[0])
            .and();
}
protected void doWriteChangeLogPartitionMapping(Map<TaskName, Integer> mapping) throws Exception { CuratorTransaction transaction = curator.inTransaction(); boolean needTransaction = false; for (Map.Entry<TaskName, Integer> entry : mapping.entrySet()) { Integer partitionNumber = entry.getValue(); TaskName tn = entry.getKey(); String clpnPath = getChangelogPartitionNumberPath(tn); byte[] data = intToBytes(partitionNumber); boolean created = createChangeLogPartitionPathIfNecessary(clpnPath, data); if (!created) {//create would have written with the data, but since we didn't create, we have to set it now: transaction.setData().forPath(clpnPath, data); needTransaction = true; log.debug("Appended changelog partition mapping {}={} to current transaction.", tn, partitionNumber); } } if (needTransaction) { ((CuratorTransactionFinal) transaction).commit(); } log.info("Wrote changelog partition mappings {}", mapping); }
/**
 * Appends a create operation for {@code path} to the given transaction. The payload in
 * {@code data} is used when present; otherwise the node is created through the
 * path-only overload.
 *
 * @param transaction the transaction to extend
 * @return the transaction with the create operation appended
 * @throws Exception if the transaction builder call fails
 */
@Override
public CuratorTransaction and(CuratorTransaction transaction) throws Exception {
    if (!data.isPresent()) {
        return transaction.create().forPath(path).and();
    }
    return transaction.create().forPath(path, data.get()).and();
}
/**
 * Creates every given path that does not exist yet, with an empty payload, in one
 * transaction. Paths that already exist are skipped.
 *
 * @param paths the paths to create
 * @throws RuntimeException wrapping any exception raised while building or committing
 */
public void createAtomically(Path... paths) {
    try {
        CuratorTransaction pending = framework().inTransaction();
        for (Path candidate : paths) {
            if (exists(candidate)) {
                continue; // already present, nothing to add for this path
            }
            pending = pending.create().forPath(candidate.getAbsolute(), new byte[0]).and();
        }
        CuratorTransactionFinal committable = (CuratorTransactionFinal) pending;
        committable.commit();
    } catch (Exception e) {
        throw new RuntimeException("Could not create " + Arrays.toString(paths), e);
    }
}
/**
 * Starts a new transaction bound to this instance.
 *
 * <p>The call is rejected by {@code Preconditions.checkState} unless the current state
 * is {@code CuratorFrameworkState.STARTED}.
 *
 * @return a new {@code CuratorTransactionImpl} for this instance
 */
@Override
public CuratorTransaction inTransaction() {
    boolean started = getState() == CuratorFrameworkState.STARTED;
    Preconditions.checkState(started, "instance must be started before calling this method");
    return new CuratorTransactionImpl(this);
}
/**
 * Moves every given transaction id from limbo to completed in a single transaction:
 * for each id the node under {@code LIMBO_PATH} is deleted and a node under
 * {@code COMPLETED_PATH} is created, then everything is committed together.
 *
 * @param txIds ids of the transactions to mark as completed
 * @throws Exception if building or committing the transaction fails
 */
public void complete(List<Long> txIds) throws Exception {
    Iterator<Long> remaining = txIds.iterator();
    CuratorTransaction tx = curatorFramework.inTransaction();
    while (remaining.hasNext()) {
        Long txId = remaining.next();
        String limboNode = LIMBO_PATH + "/" + txId;
        String completedNode = COMPLETED_PATH + "/" + txId;
        tx = tx.delete().forPath(limboNode).and()
                .create().forPath(completedNode).and();
    }
    ((CuratorTransactionFinal) tx).commit();
}
/**
 * Moves every given partition id from limbo to completed in a single transaction:
 * for each id the node under {@code LIMBO_PATH} is deleted and a node under
 * {@code COMPLETED_PATH} is created, then everything is committed together.
 *
 * @param partitionIds ids of the partitions to mark as completed
 * @throws Exception if building or committing the transaction fails
 */
public void complete(List<String> partitionIds) throws Exception {
    Iterator<String> pending = partitionIds.iterator();
    CuratorTransaction tx = curatorFramework.inTransaction();
    while (pending.hasNext()) {
        String partitionId = pending.next();
        String limboNode = LIMBO_PATH + "/" + partitionId;
        String completedNode = COMPLETED_PATH + "/" + partitionId;
        tx = tx.delete().forPath(limboNode).and()
                .create().forPath(completedNode).and();
    }
    ((CuratorTransactionFinal) tx).commit();
}
/**
 * Starts a transaction by delegating to {@code curator}.
 *
 * @return the transaction returned by {@code curator.inTransaction()}
 */
@Override
public CuratorTransaction inTransaction() {
    return curator.inTransaction();
}
/**
 * Returns the current {@code transaction} reference; marked as visible for tests only.
 *
 * @return the value of {@code transaction}
 */
@VisibleForTesting
CuratorTransaction getTransaction() {
    return transaction;
}
/**
 * Appends a set-data operation that writes {@code data} to {@code path}.
 *
 * @param transaction the transaction to extend
 * @return the transaction with the set-data operation appended
 * @throws Exception if the transaction builder call fails
 */
@Override
public CuratorTransaction and(CuratorTransaction transaction) throws Exception {
    return transaction
            .setData()
            .forPath(path, data)
            .and();
}
/**
 * Appends a delete operation for {@code path}.
 *
 * @param transaction the transaction to extend
 * @return the transaction with the delete operation appended
 * @throws Exception if the transaction builder call fails
 */
@Override
public CuratorTransaction and(CuratorTransaction transaction) throws Exception {
    return transaction
            .delete()
            .forPath(path)
            .and();
}
/**
 * Starts a mock transaction.
 *
 * @return a new {@code MockCuratorTransactionFinal}
 */
@Override
public CuratorTransaction inTransaction() {
    return new MockCuratorTransactionFinal();
}
/**
 * Calls {@code openConnectionIfNeeded()} and then starts a new transaction on {@code client}.
 *
 * @return a new {@code CuratorTransactionImpl} built on {@code client}
 * @throws Exception if {@code openConnectionIfNeeded()} fails
 */
@Override
public CuratorTransaction inTransaction() throws Exception {
    openConnectionIfNeeded();

    return new CuratorTransactionImpl(client);
}
/**
 * Starts a transaction by delegating to the object returned by {@code namespaceDelegate()}.
 *
 * @return the transaction returned by the delegate
 */
@Override
public CuratorTransaction inTransaction() {
    return namespaceDelegate().inTransaction();
}
/**
 * Returns the transaction that results from appending this operation to the
 * input transaction.
 *
 * @param transaction the {@link CuratorTransaction} to append this operation to
 * @return the resulting transaction, so that further operations can be chained
 * @throws Exception if this operation cannot be appended to the transaction
 */
CuratorTransaction and(CuratorTransaction transaction) throws Exception;
/**
 * Starts a transaction builder.
 *
 * @return the transaction builder
 * @throws Exception if an error occurs while starting the transaction builder
 */
public CuratorTransaction inTransaction() throws Exception;
/**
 * Starts a transaction builder.
 *
 * @return the transaction builder
 * @deprecated use {@link #transaction()} instead
 */
@Deprecated
public CuratorTransaction inTransaction();