/** * Curator支持事务,一组crud操作同生同灭 * @param client */ private static void transation(CuratorFramework client) { try { // 开启事务 CuratorTransaction transaction = client.inTransaction(); Collection<CuratorTransactionResult> results = transaction.create() .forPath("/root/transation", "transation".getBytes()).and().create() .forPath("/root/transation2", "transation2".getBytes()).and() .delete().forPath("/root/transation").and() .delete().forPath("/root/transation2").and().commit(); for (CuratorTransactionResult result : results) { System.out.println(result.getForPath() + " - " + result.getType()); } }catch (Exception e){ log.error("transation exception ", e); } }
@Test void test() throws Exception { curatorFramework.create().forPath(PARENT_PATH); String node1 = makePath(PARENT_PATH, "node1"); String node2 = makePath(PARENT_PATH, "node2"); curatorFramework.create().forPath(node1); ZkBasedTreeNodeResource<List<String>> zkNode = ZkBasedTreeNodeResource .<List<String>> newBuilder() // .curator(curatorFramework2) // .path(PARENT_PATH) // .keysFactoryEx(this::factory) // .build(); System.out.println("first access:" + zkNode.get()); Collection<CuratorTransactionResult> commit = curatorFramework.inTransaction() // .delete().forPath(node1) // .and().create().forPath(node1, "1".getBytes()) // .and().create().forPath(node2, "2".getBytes()) // .and().commit(); System.out.println("result:" + commit); sleepUninterruptibly(1, SECONDS); System.out.println("second access:" + zkNode.get()); }
/**
 * Submits the recorded multi-operation transaction through ZooKeeper's callback-based
 * {@code multi} call and forwards the outcome to the client as a
 * {@link CuratorEventType#TRANSACTION} event.
 *
 * @param operationAndData wrapper whose data is the transaction record to submit
 * @throws Exception per the overridden signature; any {@link Throwable} raised while
 *                   submitting is handed to {@code backgrounding.checkError}
 */
@Override
public void performBackgroundOperation(final OperationAndData<CuratorMultiTransactionRecord> operationAndData) throws Exception
{
    try
    {
        final TimeTrace tracer = client.getZookeeperClient().startTracer("CuratorMultiTransactionImpl-Background");
        AsyncCallback.MultiCallback onComplete = new AsyncCallback.MultiCallback()
        {
            @Override
            public void processResult(int rc, String path, Object ctx, List<OpResult> opResults)
            {
                tracer.commit();

                // A null result list is passed on as null rather than being wrapped.
                List<CuratorTransactionResult> wrapped = null;
                if ( opResults != null )
                {
                    wrapped = CuratorTransactionImpl.wrapResults(client, opResults, operationAndData.getData());
                }

                CuratorEvent event = new CuratorEventImpl(client, CuratorEventType.TRANSACTION, rc, path, null, ctx, null, null, null, null, null, wrapped);
                client.processBackgroundOperation(operationAndData, event);
            }
        };
        client.getZooKeeper().multi(operationAndData.getData(), onComplete, backgrounding.getContext());
    }
    catch ( Throwable e )
    {
        backgrounding.checkError(e, null);
    }
}
/**
 * Executes the transaction record synchronously inside the client's retry loop and wraps
 * the raw ZooKeeper results.
 *
 * @param record the multi-operation record to submit
 * @return the wrapped per-operation results
 * @throws Exception if the retried call fails
 */
private List<CuratorTransactionResult> forOperationsInForeground(final CuratorMultiTransactionRecord record) throws Exception
{
    TimeTrace tracer = client.getZookeeperClient().startTracer("CuratorMultiTransactionImpl-Foreground");

    // The unit of work that the retry loop invokes.
    Callable<List<OpResult>> multiCall = new Callable<List<OpResult>>()
    {
        @Override
        public List<OpResult> call() throws Exception
        {
            return client.getZooKeeper().multi(record);
        }
    };

    List<OpResult> rawResults = RetryLoop.callWithRetry(client.getZookeeperClient(), multiCall);
    tracer.commit();

    return CuratorTransactionImpl.wrapResults(client, rawResults, record);
}
public static Collection<CuratorTransactionResult> transaction(CuratorFramework client) throws Exception { // this example shows how to use ZooKeeper's transactions CuratorOp createOp = client.transactionOp().create().forPath("/a/path", "some data".getBytes()); CuratorOp setDataOp = client.transactionOp().setData().forPath("/another/path", "other data".getBytes()); CuratorOp deleteOp = client.transactionOp().delete().forPath("/yet/another/path"); Collection<CuratorTransactionResult> results = client.transaction().forOperations(createOp, setDataOp, deleteOp); for ( CuratorTransactionResult result : results ) { System.out.println(result.getForPath() + " - " + result.getType()); } return results; }
public static Collection<CuratorTransactionResult> transaction(CuratorFramework client) throws Exception { // // this example shows how to use ZooKeeper's new transactions // Collection<CuratorTransactionResult> results = client.inTransaction().create().forPath("/a/path", "some data".getBytes()) // .and().setData().forPath("/another/path", "other data".getBytes()) // .and().delete().forPath("/yet/another/path") // .and().commit(); // IMPORTANT! //inTransaction is deprecated. use transaction() instead List<CuratorTransactionResult> results = client.transaction().forOperations( client.transactionOp().create().forPath("/a/path", "some data".getBytes()), client.transactionOp().setData().forPath("/another/path", "other data".getBytes()), client.transactionOp().delete().forPath("/yet/another/path")); // called for (CuratorTransactionResult result : results) { System.out.println(result.getForPath() + " - " + result.getType()); } return results; }
/**
 * Commits the pending transaction when it is a {@code CuratorTransactionFinal}; otherwise
 * only logs that there was nothing to save.
 *
 * @throws DataSourceConnectorException presumably raised by {@code executeOrFailWithError}
 *                                      when the commit fails (confirm against that helper)
 */
@Override
public void commit() throws DataSourceConnectorException {
    final String failureMessage = "Can't commit transaction";
    executeOrFailWithError(
        () -> {
            // Guard: anything that is not a CuratorTransactionFinal is not committed.
            if (!(transaction instanceof CuratorTransactionFinal)) {
                log.info("Transaction cancelled: nothing to save");
                return null;
            }
            Collection<CuratorTransactionResult> outcome = ((CuratorTransactionFinal)transaction).commit();
            log.info("Transaction is committed. Result: {}", outcome);
            return null;
        },
        failureMessage);
}
/**
 * Stubbed commit: fails with the configured exception when one is set, otherwise reports
 * no results.
 *
 * @return an empty collection when no exception is configured
 * @throws Exception the configured {@code exceptionToThrow}, if non-null
 */
@Override
public Collection<CuratorTransactionResult> commit() throws Exception {
    if (exceptionToThrow == null) {
        return Collections.emptyList();
    }
    throw exceptionToThrow;
}
@Override public Collection<CuratorTransactionResult> commit() throws Exception { fileSystem.replaceRoot(newRoot); committed = true; delayedListener.commit(); return null; // TODO }
/**
 * Creates an event from the given values.
 *
 * <p>{@code opResults} and {@code aclList} are copied into immutable lists when non-null,
 * {@code path} is passed through {@code client.unfixForNamespace}, and a non-null
 * {@code watchedEvent} is wrapped in a {@link NamespaceWatchedEvent}. All other values are
 * stored as given.
 */
CuratorEventImpl(
    CuratorFrameworkImpl client,
    CuratorEventType type,
    int resultCode,
    String path,
    String name,
    Object context,
    Stat stat,
    byte[] data,
    List<String> children,
    WatchedEvent watchedEvent,
    List<ACL> aclList,
    List<CuratorTransactionResult> opResults)
{
    this.type = type;
    this.resultCode = resultCode;

    if ( opResults == null )
    {
        this.opResults = null;
    }
    else
    {
        this.opResults = ImmutableList.copyOf(opResults);
    }

    this.path = client.unfixForNamespace(path);
    this.name = name;
    this.context = context;
    this.stat = stat;
    this.data = data;
    this.children = children;

    if ( watchedEvent == null )
    {
        this.watchedEvent = null;
    }
    else
    {
        this.watchedEvent = new NamespaceWatchedEvent(client, watchedEvent);
    }

    if ( aclList == null )
    {
        this.aclList = null;
    }
    else
    {
        this.aclList = ImmutableList.copyOf(aclList);
    }
}
/**
 * Runs a five-operation transaction through a client namespaced to "galt" and checks the
 * resulting nodes, both through the namespaced client and through a non-namespaced view.
 */
@Test
public void testWithNamespace() throws Exception
{
    CuratorFramework client = CuratorFrameworkFactory.builder()
        .connectString(server.getConnectString())
        .retryPolicy(new RetryOneTime(1))
        .namespace("galt")
        .build();
    try
    {
        client.start();

        CuratorOp createFoo = client.transactionOp().create().forPath("/foo", "one".getBytes());
        CuratorOp createSequential = client.transactionOp().create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath("/test-", "one".getBytes());
        CuratorOp updateFoo = client.transactionOp().setData().forPath("/foo", "two".getBytes());
        CuratorOp createBar = client.transactionOp().create().forPath("/foo/bar");
        CuratorOp deleteBar = client.transactionOp().delete().forPath("/foo/bar");

        Collection<CuratorTransactionResult> outcomes = client.transaction().forOperations(createFoo, createSequential, updateFoo, createBar, deleteBar);

        // /foo exists both via the namespaced client and at its full path under /galt.
        boolean fooExists = client.checkExists().forPath("/foo") != null;
        Assert.assertTrue(fooExists);
        boolean fullPathExists = client.usingNamespace(null).checkExists().forPath("/galt/foo") != null;
        Assert.assertTrue(fullPathExists);

        byte[] fooData = client.getData().forPath("/foo");
        Assert.assertEquals(fooData, "two".getBytes());

        // /foo/bar was created and deleted within the same transaction.
        boolean barAbsent = client.checkExists().forPath("/foo/bar") == null;
        Assert.assertTrue(barAbsent);

        // The sequential node's result path starts with the requested prefix but differs from it.
        CuratorTransactionResult sequentialResult = Iterables.find(outcomes, CuratorTransactionResult.ofTypeAndPath(OperationType.CREATE, "/test-"));
        Assert.assertNotNull(sequentialResult);
        Assert.assertNotEquals(sequentialResult.getResultPath(), "/test-");
        Assert.assertTrue(sequentialResult.getResultPath().startsWith("/test-"));
    }
    finally
    {
        CloseableUtils.closeQuietly(client);
    }
}
/**
 * Creates a parent node and a child node in one transaction and checks the stored data
 * and the reported result paths.
 */
@Test
public void testBasic() throws Exception
{
    CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
    try
    {
        client.start();

        CuratorOp createParent = client.transactionOp().create().forPath("/foo");
        CuratorOp createChild = client.transactionOp().create().forPath("/foo/bar", "snafu".getBytes());

        Collection<CuratorTransactionResult> outcomes = client.transaction().forOperations(createParent, createChild);

        boolean childExists = client.checkExists().forPath("/foo/bar") != null;
        Assert.assertTrue(childExists);
        byte[] childData = client.getData().forPath("/foo/bar");
        Assert.assertEquals(childData, "snafu".getBytes());

        // Each create must be reported as its own result carrying its own path.
        CuratorTransactionResult parentResult = Iterables.find(outcomes, CuratorTransactionResult.ofTypeAndPath(OperationType.CREATE, "/foo"));
        CuratorTransactionResult childResult = Iterables.find(outcomes, CuratorTransactionResult.ofTypeAndPath(OperationType.CREATE, "/foo/bar"));
        Assert.assertNotNull(parentResult);
        Assert.assertNotNull(childResult);
        Assert.assertNotSame(parentResult, childResult);
        Assert.assertEquals(parentResult.getResultPath(), "/foo");
        Assert.assertEquals(childResult.getResultPath(), "/foo/bar");
    }
    finally
    {
        CloseableUtils.closeQuietly(client);
    }
}
/**
 * Runs a five-operation {@code inTransaction()} chain through a client namespaced to "galt"
 * and checks the resulting nodes, both through the namespaced client and through a
 * non-namespaced view.
 */
@Test
public void testWithNamespace() throws Exception
{
    CuratorFramework client = CuratorFrameworkFactory.builder()
        .connectString(server.getConnectString())
        .retryPolicy(new RetryOneTime(1))
        .namespace("galt")
        .build();
    try
    {
        client.start();

        Collection<CuratorTransactionResult> outcomes = client.inTransaction()
            .create().forPath("/foo", "one".getBytes())
            .and().create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath("/test-", "one".getBytes())
            .and().setData().forPath("/foo", "two".getBytes())
            .and().create().forPath("/foo/bar")
            .and().delete().forPath("/foo/bar")
            .and().commit();

        // /foo exists both via the namespaced client and at its full path under /galt.
        boolean fooExists = client.checkExists().forPath("/foo") != null;
        Assert.assertTrue(fooExists);
        boolean fullPathExists = client.usingNamespace(null).checkExists().forPath("/galt/foo") != null;
        Assert.assertTrue(fullPathExists);

        byte[] fooData = client.getData().forPath("/foo");
        Assert.assertEquals(fooData, "two".getBytes());

        // /foo/bar was created and deleted within the same transaction.
        boolean barAbsent = client.checkExists().forPath("/foo/bar") == null;
        Assert.assertTrue(barAbsent);

        // The sequential node's result path starts with the requested prefix but differs from it.
        CuratorTransactionResult sequentialResult = Iterables.find(outcomes, CuratorTransactionResult.ofTypeAndPath(OperationType.CREATE, "/test-"));
        Assert.assertNotNull(sequentialResult);
        Assert.assertNotEquals(sequentialResult.getResultPath(), "/test-");
        Assert.assertTrue(sequentialResult.getResultPath().startsWith("/test-"));
    }
    finally
    {
        CloseableUtils.closeQuietly(client);
    }
}
/**
 * Creates a parent node and a child node through one {@code inTransaction()} chain and
 * checks the stored data and the reported result paths.
 */
@Test
public void testBasic() throws Exception
{
    CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
    try
    {
        client.start();

        Collection<CuratorTransactionResult> outcomes = client.inTransaction()
            .create().forPath("/foo")
            .and().create().forPath("/foo/bar", "snafu".getBytes())
            .and().commit();

        boolean childExists = client.checkExists().forPath("/foo/bar") != null;
        Assert.assertTrue(childExists);
        byte[] childData = client.getData().forPath("/foo/bar");
        Assert.assertEquals(childData, "snafu".getBytes());

        // Each create must be reported as its own result carrying its own path.
        CuratorTransactionResult parentResult = Iterables.find(outcomes, CuratorTransactionResult.ofTypeAndPath(OperationType.CREATE, "/foo"));
        CuratorTransactionResult childResult = Iterables.find(outcomes, CuratorTransactionResult.ofTypeAndPath(OperationType.CREATE, "/foo/bar"));
        Assert.assertNotNull(parentResult);
        Assert.assertNotNull(childResult);
        Assert.assertNotSame(parentResult, childResult);
        Assert.assertEquals(parentResult.getResultPath(), "/foo");
        Assert.assertEquals(childResult.getResultPath(), "/foo/bar");
    }
    finally
    {
        CloseableUtils.closeQuietly(client);
    }
}
/**
 * Returns an {@code AsyncMultiTransaction} that, when given operations, builds a
 * {@code CuratorMultiTransactionImpl} with fresh backgrounding state and submits the
 * operations through {@code safeCall}.
 *
 * @return the transaction entry point
 */
@Override
public AsyncMultiTransaction transaction()
{
    return ops -> {
        // Each invocation gets its own callback/backgrounding pair.
        BuilderCommon<List<CuratorTransactionResult>> shared = new BuilderCommon<>(filters, opResultsProc);
        CuratorMultiTransactionImpl multi = new CuratorMultiTransactionImpl(client, shared.backgrounding);
        return safeCall(shared.internalCallback, () -> multi.forOperations(ops));
    };
}
/**
 * Stores a request, its state and a sequential queue entry in a single transaction.
 *
 * <p>The {@code REQUESTS_FORMAT} and {@code REQUEST_QUEUE_FORMAT} nodes are created first if
 * they do not exist. The returned id is parsed from the node name of the sequential queue
 * entry reported by the transaction.
 *
 * @param request the request to enqueue
 * @param state   the initial state stored alongside the request
 * @return the id derived from the created queue node
 */
@Timed
public QueuedRequestId enqueueRequest(BaragonRequest request, InternalRequestStates state) {
    final long start = System.currentTimeMillis();

    try {
        final String queuePath = String.format(REQUEST_ENQUEUE_FORMAT, request.getLoadBalancerService().getServiceId(), request.getLoadBalancerRequestId());
        final String dataPath = String.format(REQUEST_FORMAT, request.getLoadBalancerRequestId());
        final String statePath = String.format(REQUEST_STATE_FORMAT, request.getLoadBalancerRequestId());

        // Ensure the nodes for REQUESTS_FORMAT and REQUEST_QUEUE_FORMAT exist before the transaction runs.
        if (!nodeExists(REQUESTS_FORMAT)) {
            createNode(REQUESTS_FORMAT);
        }
        if (!nodeExists(REQUEST_QUEUE_FORMAT)) {
            createNode(REQUEST_QUEUE_FORMAT);
        }

        byte[] requestBytes = objectMapper.writeValueAsBytes(request);
        byte[] stateBytes = objectMapper.writeValueAsBytes(state);

        Collection<CuratorTransactionResult> outcomes = curatorFramework.inTransaction()
            .create().forPath(dataPath, requestBytes)
            .and().create().forPath(statePath, stateBytes)
            .and().create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath(queuePath)
            .and().commit();

        log(OperationType.WRITE, Optional.of(3), Optional.of(requestBytes.length + stateBytes.length), start, String.format("Transaction Paths [%s + %s + %s]", dataPath, statePath, queuePath));

        // Find the queue entry's result and take the node name from its result path.
        CuratorTransactionResult queuedResult = Iterables.find(outcomes, CuratorTransactionResult.ofTypeAndPath(org.apache.curator.framework.api.transaction.OperationType.CREATE, queuePath));
        String queuedNode = ZKPaths.getNodeFromPath(queuedResult.getResultPath());
        return QueuedRequestId.fromString(queuedNode);
    } catch (Exception e) {
        throw Throwables.propagate(e);
    }
}
/**
 * Varargs convenience overload: converts the array to a list and delegates to the
 * list-based overload. A {@code null} array is treated as an empty list.
 *
 * @param operations the operations that make up the transaction; may be {@code null}
 * @return whatever the list-based overload returns
 * @throws Exception if the list-based overload fails
 */
@Override
public List<CuratorTransactionResult> forOperations(CuratorOp... operations) throws Exception
{
    final List<CuratorOp> ops;
    if ( operations == null )
    {
        ops = Lists.<CuratorOp>newArrayList();
    }
    else
    {
        ops = Arrays.asList(operations);
    }
    return forOperations(ops);
}
/**
 * Builds a multi-operation transaction record from the given operations, validates
 * create, delete and setData operations against the schema registered for their path,
 * and then submits the record in the background or in the foreground.
 *
 * @param operations the operations that make up the transaction; must be non-null and non-empty
 * @return the per-operation results when run in the foreground, or {@code null} when the
 *         operation was handed off for background processing
 * @throws NullPointerException     if {@code operations} is null (via {@code Preconditions.checkNotNull})
 * @throws IllegalArgumentException if {@code operations} is empty (via {@code Preconditions.checkArgument})
 * @throws Exception                if schema validation or the foreground submission fails
 */
@Override
public List<CuratorTransactionResult> forOperations(List<CuratorOp> operations) throws Exception
{
    operations = Preconditions.checkNotNull(operations, "operations cannot be null");
    Preconditions.checkArgument(!operations.isEmpty(), "operations list cannot be empty");

    CuratorMultiTransactionRecord record = new CuratorMultiTransactionRecord();
    for ( CuratorOp curatorOp : operations )
    {
        // Look up the schema for this operation's path before the op is added to the record.
        Schema schema = client.getSchemaSet().getSchema(curatorOp.getTypeAndPath().getForPath());
        record.add(curatorOp.get(), curatorOp.getTypeAndPath().getType(), curatorOp.getTypeAndPath().getForPath());

        // Only create, delete and setData op codes are validated; any other op type is
        // added to the record without a schema check.
        if ( (curatorOp.get().getType() == ZooDefs.OpCode.create) || (curatorOp.get().getType() == ZooDefs.OpCode.createContainer) )
        {
            CreateRequest createRequest = (CreateRequest)curatorOp.get().toRequestRecord();
            CreateMode createMode;
            if ( client.isZk34CompatibilityMode() )
            {
                // In 3.4 compatibility mode the single-argument fromFlag is used, with a
                // fallback to PERSISTENT when the flag is rejected.
                try
                {
                    createMode = CreateMode.fromFlag(createRequest.getFlags());
                }
                catch ( KeeperException.BadArgumentsException dummy )
                {
                    createMode = CreateMode.PERSISTENT;
                }
            }
            else
            {
                // NOTE(review): the second argument is presumably the mode used for an
                // unrecognised flag - confirm against ZooKeeper's CreateMode.
                createMode = CreateMode.fromFlag(createRequest.getFlags(), CreateMode.PERSISTENT);
            }
            schema.validateCreate(createMode, createRequest.getPath(), createRequest.getData(), createRequest.getAcl());
        }
        else if ( (curatorOp.get().getType() == ZooDefs.OpCode.delete) || (curatorOp.get().getType() == ZooDefs.OpCode.deleteContainer) )
        {
            DeleteRequest deleteRequest = (DeleteRequest)curatorOp.get().toRequestRecord();
            schema.validateDelete(deleteRequest.getPath());
        }
        else if ( curatorOp.get().getType() == ZooDefs.OpCode.setData )
        {
            SetDataRequest setDataRequest = (SetDataRequest)curatorOp.get().toRequestRecord();
            schema.validateGeneral(setDataRequest.getPath(), setDataRequest.getData(), null);
        }
    }

    if ( backgrounding.inBackground() )
    {
        // Background mode: hand the record to the client and return null to the caller.
        client.processBackgroundOperation(new OperationAndData<>(this, record, backgrounding.getCallback(), null, backgrounding.getContext(), null), null);
        return null;
    }
    else
    {
        return forOperationsInForeground(record);
    }
}
/**
 * Returns the transaction operation results held by this object.
 *
 * @return the stored results; may be {@code null}
 */
@Override
public List<CuratorTransactionResult> getOpResults()
{
    return this.opResults;
}
/**
 * Runs a transaction of compressed creates and a compressed setData, then checks that the
 * data reads back decompressed and that the requested ACLs were applied.
 */
@Test
public void testWithCompression() throws Exception
{
    CuratorFramework client = CuratorFrameworkFactory.builder()
        .connectString(server.getConnectString())
        .retryPolicy(new RetryOneTime(1))
        .namespace("galt")
        .build();
    client.start();
    try
    {
        Collection<CuratorTransactionResult> outcomes = client.inTransaction()
            .create().compressed().forPath("/foo", "one".getBytes())
            .and().create().compressed().withACL(ZooDefs.Ids.READ_ACL_UNSAFE).forPath("/bar", "two".getBytes())
            .and().create().compressed().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath("/test-", "three".getBytes())
            .and().create().compressed().withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.READ_ACL_UNSAFE).forPath("/baz", "four".getBytes())
            .and().setData().compressed().withVersion(0).forPath("/foo", "five".getBytes())
            .and().commit();

        // /foo: created with "one", then overwritten with "five" in the same transaction.
        boolean fooExists = client.checkExists().forPath("/foo") != null;
        Assert.assertTrue(fooExists);
        byte[] fooData = client.getData().decompressed().forPath("/foo");
        Assert.assertEquals(fooData, "five".getBytes());

        // /bar: compressed data plus the read-only ACL.
        boolean barExists = client.checkExists().forPath("/bar") != null;
        Assert.assertTrue(barExists);
        byte[] barData = client.getData().decompressed().forPath("/bar");
        Assert.assertEquals(barData, "two".getBytes());
        Assert.assertEquals(client.getACL().forPath("/bar"), ZooDefs.Ids.READ_ACL_UNSAFE);

        // The sequential node's result path starts with the requested prefix but differs from it.
        CuratorTransactionResult sequentialResult = Iterables.find(outcomes, CuratorTransactionResult.ofTypeAndPath(OperationType.CREATE, "/test-"));
        Assert.assertNotNull(sequentialResult);
        Assert.assertNotEquals(sequentialResult.getResultPath(), "/test-");
        Assert.assertTrue(sequentialResult.getResultPath().startsWith("/test-"));

        // /baz: explicit PERSISTENT mode plus the read-only ACL.
        boolean bazExists = client.checkExists().forPath("/baz") != null;
        Assert.assertTrue(bazExists);
        byte[] bazData = client.getData().decompressed().forPath("/baz");
        Assert.assertEquals(bazData, "four".getBytes());
        Assert.assertEquals(client.getACL().forPath("/baz"), ZooDefs.Ids.READ_ACL_UNSAFE);
    }
    finally
    {
        client.close();
    }
}
/**
 * Submits the operations as a single transaction by delegating to
 * {@code client.transaction().forOperations(operations)}.
 *
 * @param operations operations that make up the transaction
 * @return the stage returned by the delegate
 */
@Override
public AsyncStage<List<CuratorTransactionResult>> inTransaction(List<CuratorOp> operations)
{
    AsyncStage<List<CuratorTransactionResult>> stage = client.transaction().forOperations(operations);
    return stage;
}
/**
 * Submits the operations as a single transaction by delegating to
 * {@code client.inTransaction(operations)}.
 *
 * @param operations operations that make up the transaction
 * @return the stage returned by the delegate
 */
@Override
public AsyncStage<List<CuratorTransactionResult>> inTransaction(List<CuratorOp> operations)
{
    AsyncStage<List<CuratorTransactionResult>> stage = client.inTransaction(operations);
    return stage;
}
/**
 * Returns the transaction operation results, if there are any.
 *
 * @return any operation results or {@code null}
 */
public List<CuratorTransactionResult> getOpResults();
/**
 * Invoke ZooKeeper to commit the given operations as a single transaction. Create the
 * operation instances via {@link org.apache.curator.x.async.AsyncCuratorFramework#transactionOp()}.
 *
 * @param operations operations that make up the transaction
 * @return AsyncStage instance for managing the completion; its value is the list of
 *         per-operation results
 */
AsyncStage<List<CuratorTransactionResult>> forOperations(List<CuratorOp> operations);
/**
 * Invoke ZooKeeper to commit the given operations as a single transaction.
 *
 * @param operations operations that make up the transaction
 * @return AsyncStage instance for managing the completion; its value is the list of
 *         per-operation results
 */
AsyncStage<List<CuratorTransactionResult>> inTransaction(List<CuratorOp> operations);