@Test public void testCreateAsync() throws IOException, KeeperException, InterruptedException { AsyncCallback.Create2Callback callback = new AsyncCallback.Create2Callback() { @Override public void processResult(int rc, String path, Object ctx, String name, Stat stat) { // NOP } }; zk.create("/foo", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_WITH_TTL, callback, null, 100); final AtomicLong fakeElapsed = new AtomicLong(0); ContainerManager containerManager = newContainerManager(fakeElapsed); containerManager.checkContainers(); Assert.assertNotNull("Ttl node should not have been deleted yet", zk.exists("/foo", false)); fakeElapsed.set(1000); containerManager.checkContainers(); Assert.assertNull("Ttl node should have been deleted", zk.exists("/foo", false)); }
/** * Set region as OFFLINED up in zookeeper asynchronously. * @param state * @return True if we succeeded, false otherwise (State was incorrect or failed * updating zk). */ private boolean asyncSetOfflineInZooKeeper(final RegionState state, final AsyncCallback.StringCallback cb, final ServerName destination) { if (!state.isClosed() && !state.isOffline()) { this.server.abort("Unexpected state trying to OFFLINE; " + state, new IllegalStateException()); return false; } regionStates.updateRegionState(state.getRegion(), State.OFFLINE); try { ZKAssign.asyncCreateNodeOffline(watcher, state.getRegion(), destination, cb, state); } catch (KeeperException e) { if (e instanceof NodeExistsException) { LOG.warn("Node for " + state.getRegion() + " already exists"); } else { server.abort("Unexpected ZK exception creating/setting node OFFLINE", e); } return false; } return true; }
@Override public Future<Map<String, DLSN>> getLastCommitPositions() { final Promise<Map<String, DLSN>> result = new Promise<Map<String, DLSN>>(); try { this.zkc.get().getChildren(this.zkPath, false, new AsyncCallback.Children2Callback() { @Override public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) { if (KeeperException.Code.NONODE.intValue() == rc) { result.setValue(new HashMap<String, DLSN>()); } else if (KeeperException.Code.OK.intValue() != rc) { result.setException(KeeperException.create(KeeperException.Code.get(rc), path)); } else { getLastCommitPositions(result, children); } } }, null); } catch (ZooKeeperClient.ZooKeeperConnectionException zkce) { result.setException(zkce); } catch (InterruptedException ie) { result.setException(new DLInterruptedException("getLastCommitPositions was interrupted", ie)); } return result; }
/** * Asynchronously create zookeeper path recursively and optimistically * * @param zkc Zookeeper client * @param pathToCreate Zookeeper full path * @param parentPathShouldNotCreate zookeeper parent path should not be created * @param data Zookeeper data * @param acl Acl of the zk path * @param createMode Create mode of zk path */ public static Future<BoxedUnit> zkAsyncCreateFullPathOptimistic( final ZooKeeperClient zkc, final String pathToCreate, final Optional<String> parentPathShouldNotCreate, final byte[] data, final List<ACL> acl, final CreateMode createMode) { final Promise<BoxedUnit> result = new Promise<BoxedUnit>(); zkAsyncCreateFullPathOptimisticRecursive(zkc, pathToCreate, parentPathShouldNotCreate, data, acl, createMode, new AsyncCallback.StringCallback() { @Override public void processResult(int rc, String path, Object ctx, String name) { handleKeeperExceptionCode(rc, path, result); } }, result); return result; }
/** * Set region as OFFLINED up in zookeeper asynchronously. * @param state * @return True if we succeeded, false otherwise (State was incorrect or failed * updating zk). */ boolean asyncSetOfflineInZooKeeper(final RegionState state, final AsyncCallback.StringCallback cb, final Object ctx) { if (!state.isClosed() && !state.isOffline()) { new RuntimeException("Unexpected state trying to OFFLINE; " + state); this.master.abort("Unexpected state trying to OFFLINE; " + state, new IllegalStateException()); return false; } state.update(RegionState.State.OFFLINE); try { ZKAssign.asyncCreateNodeOffline(master.getZooKeeper(), state.getRegion(), this.master.getServerName(), cb, ctx); } catch (KeeperException e) { // TODO: this error handling will never execute, as the callback is async. if (e instanceof NodeExistsException) { LOG.warn("Node for " + state.getRegion() + " already exists"); } else { master.abort("Unexpected ZK exception creating/setting node OFFLINE", e); } return false; } return true; }
protected void processKeeperException(final KeeperException e, final Watcher watcher,final DataAction dataAction, boolean asynchFlag) { switch (e.code()) { case BADVERSION: if (asynchFlag) { zooKeeper.getData(e.getPath(), watcher, new AsyncCallback.DataCallback() { @Override public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { ZData zData = ZDataImpl.newInstance(path, data, stat.getVersion()); updateData(zData, dataAction, watcher); } }, null); } else { } break; default: logger.warn("encounter exception", e); } }
@Test public void testNotCloseZkWhenPending() throws Exception { ZooKeeper mockedZK = mock(ZooKeeper.class); Exchanger<AsyncCallback.DataCallback> exchanger = new Exchanger<>(); doAnswer(i -> { exchanger.exchange(i.getArgument(2)); return null; }).when(mockedZK).getData(anyString(), anyBoolean(), any(AsyncCallback.DataCallback.class), any()); doAnswer(i -> null).when(mockedZK).close(); when(mockedZK.getState()).thenReturn(ZooKeeper.States.CONNECTED); RO_ZK.zookeeper = mockedZK; CompletableFuture<byte[]> future = RO_ZK.get(PATH); AsyncCallback.DataCallback callback = exchanger.exchange(null); // 2 * keep alive time to ensure that we will not close the zk when there are pending requests Thread.sleep(6000); assertNotNull(RO_ZK.zookeeper); verify(mockedZK, never()).close(); callback.processResult(Code.OK.intValue(), PATH, null, DATA, null); assertArrayEquals(DATA, future.get()); // now we will close the idle connection. waitForIdleConnectionClosed(); verify(mockedZK, times(1)).close(); }
/** * Set region as OFFLINED up in zookeeper asynchronously. * @param state * @return True if we succeeded, false otherwise (State was incorrect or failed * updating zk). */ boolean asyncSetOfflineInZooKeeper(final RegionState state, final AsyncCallback.StringCallback cb, final Object ctx) { if (!state.isClosed() && !state.isOffline()) { new RuntimeException("Unexpected state trying to OFFLINE; " + state); this.master.abort("Unexpected state trying to OFFLINE; " + state, new IllegalStateException()); return false; } state.update(RegionState.State.OFFLINE); try { ZKAssign.asyncCreateNodeOffline(master.getZooKeeper(), state.getRegion(), this.master.getServerName(), cb, ctx); } catch (KeeperException e) { master.abort("Unexpected ZK exception creating/setting node OFFLINE", e); return false; } return true; }
@Override public void performBackgroundOperation(final OperationAndData<CuratorMultiTransactionRecord> operationAndData) throws Exception { try { final TimeTrace trace = client.getZookeeperClient().startTracer("CuratorMultiTransactionImpl-Background"); AsyncCallback.MultiCallback callback = new AsyncCallback.MultiCallback() { @Override public void processResult(int rc, String path, Object ctx, List<OpResult> opResults) { trace.commit(); List<CuratorTransactionResult> curatorResults = (opResults != null) ? CuratorTransactionImpl.wrapResults(client, opResults, operationAndData.getData()) : null; CuratorEvent event = new CuratorEventImpl(client, CuratorEventType.TRANSACTION, rc, path, null, ctx, null, null, null, null, null, curatorResults); client.processBackgroundOperation(operationAndData, event); } }; client.getZooKeeper().multi(operationAndData.getData(), callback, backgrounding.getContext()); } catch ( Throwable e ) { backgrounding.checkError(e, null); } }
@Override public void performBackgroundOperation(final OperationAndData<String> operationAndData) throws Exception { final OperationTrace trace = client.getZookeeperClient().startAdvancedTracer("BackgroundSyncImpl"); final String data = operationAndData.getData(); client.getZooKeeper().sync ( data, new AsyncCallback.VoidCallback() { @Override public void processResult(int rc, String path, Object ctx) { trace.setReturnCode(rc).setRequestBytesLength(data).commit(); CuratorEventImpl event = new CuratorEventImpl(client, CuratorEventType.SYNC, rc, path, null, ctx, null, null, null, null, null, null); client.processBackgroundOperation(operationAndData, event); } }, context ); }
@Override public void performBackgroundOperation(final OperationAndData<String> operationAndData) throws Exception { try { final OperationTrace trace = client.getZookeeperClient().startAdvancedTracer("GetACLBuilderImpl-Background"); AsyncCallback.ACLCallback callback = new AsyncCallback.ACLCallback() { @Override public void processResult(int rc, String path, Object ctx, List<ACL> acl, Stat stat) { trace.setReturnCode(rc).setPath(path).setStat(stat).commit(); CuratorEventImpl event = new CuratorEventImpl(client, CuratorEventType.GET_ACL, rc, path, null, ctx, stat, null, null, null, acl, null); client.processBackgroundOperation(operationAndData, event); } }; client.getZooKeeper().getACL(operationAndData.getData(), responseStat, callback, backgrounding.getContext()); } catch ( Throwable e ) { backgrounding.checkError(e, null); } }
/** * Set region as OFFLINED up in zookeeper asynchronously. * @param state * @return True if we succeeded, false otherwise (State was incorrect or failed * updating zk). */ private boolean asyncSetOfflineInZooKeeper(final RegionState state, final AsyncCallback.StringCallback cb, final ServerName destination) { if (!state.isClosed() && !state.isOffline()) { this.server.abort("Unexpected state trying to OFFLINE; " + state, new IllegalStateException()); return false; } regionStates.updateRegionState( state.getRegion(), RegionState.State.OFFLINE); try { ZKAssign.asyncCreateNodeOffline(watcher, state.getRegion(), destination, cb, state); } catch (KeeperException e) { if (e instanceof NodeExistsException) { LOG.warn("Node for " + state.getRegion() + " already exists"); } else { server.abort("Unexpected ZK exception creating/setting node OFFLINE", e); } return false; } return true; }
MultiCallback(int expected, AsyncCallback.VoidCallback cb, Object context) { this.expected = expected; this.cb = cb; this.context = context; if (expected == 0) { cb.processResult(Code.OK.intValue(), null, context); } }
/** * This method asynchronously gets the set of available Bookies that the * dead input bookie's data will be copied over into. If the user passed in * a specific destination bookie, then just use that one. Otherwise, we'll * randomly pick one of the other available bookies to use for each ledger * fragment we are replicating. * * @param bookieSrc * Source bookie that had a failure. We want to replicate the * ledger fragments that were stored there. * @param bookieDest * Optional destination bookie that if passed, we will copy all * of the ledger fragments from the source bookie over to it. * @param cb * RecoverCallback to invoke once all of the data on the dead * bookie has been recovered and replicated. * @param context * Context for the RecoverCallback to call. */ private void getAvailableBookies(final InetSocketAddress bookieSrc, final InetSocketAddress bookieDest, final RecoverCallback cb, final Object context) { final List<InetSocketAddress> availableBookies = new LinkedList<InetSocketAddress>(); if (bookieDest != null) { availableBookies.add(bookieDest); // Now poll ZK to get the active ledgers getActiveLedgers(bookieSrc, bookieDest, cb, context, availableBookies); } else { zk.getChildren(BOOKIES_PATH, null, new AsyncCallback.ChildrenCallback() { @Override public void processResult(int rc, String path, Object ctx, List<String> children) { if (rc != Code.OK.intValue()) { LOG.error("ZK error getting bookie nodes: ", KeeperException.create(KeeperException.Code .get(rc), path)); cb.recoverComplete(BKException.Code.ZKException, context); return; } for (String bookieNode : children) { String parts[] = bookieNode.split(COLON); if (parts.length < 2) { LOG.error("Bookie Node retrieved from ZK has invalid name format: " + bookieNode); cb.recoverComplete(BKException.Code.ZKException, context); return; } availableBookies.add(new InetSocketAddress(parts[0], Integer.parseInt(parts[1]))); } // Now poll ZK to get the active ledgers getActiveLedgers(bookieSrc, bookieDest, cb, context, availableBookies); } }, null); } }
/** * get acl async */ @Test @ZooConfig(initNodes = {"/node"}, async = true) public void testGetACLAsync() throws Exception { zooKeeper.getACL("/node", null, new AsyncCallback.ACLCallback() { @Override public void processResult(int rc, String path, Object ctx, List<ACL> acl, Stat stat) { assertTrue(acl.equals(ZooDefs.Ids.OPEN_ACL_UNSAFE)); assertResult(rc, "/node", path, ctx, stat); } }, null); }
/** * set acl sync */ @Test @ZooConfig(initNodes = {"/node"}, async = true) public void testSetACLAsync() throws Exception { zooKeeper.setACL("/node", Arrays.asList(new ACL(ZooDefs.Perms.ALL, new Id("world", "anyone"))), -1, new AsyncCallback.StatCallback() { @Override public void processResult(int rc, String path, Object ctx, Stat stat) { assertResult(rc, "/node", path, ctx, null); } }, null); }
@Test @ZooConfig(initNodes = {"/node"}, async = true) public void testCreateNodeAsync() { zooKeeper.create("/node/child", "data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, new AsyncCallback.StringCallback() { @Override public void processResult(int rc, String path, Object ctx, String name) { assertEquals("/node/child", name); assertResult(rc, "/node/child", path, ctx, null); } }, null); }
@Test @ZooConfig(initNodes = {"/node"}, async = true) public void testDeleteNodeAsync() throws Exception { zooKeeper.delete("/node", -1, new AsyncCallback.VoidCallback() { @Override public void processResult(int rc, String path, Object ctx) { assertResult(rc, "/node", path, ctx, null); } }, null); }
@Test @ZooConfig(initNodes = {"/node"}, async = true) public void testNodeExistsAsync() throws Exception { zooKeeper.exists("/node", false, new AsyncCallback.StatCallback() { @Override public void processResult(int rc, String path, Object ctx, Stat stat) { assertResult(rc, "/node", path, ctx, stat); } }, null); }
@Test @ZooConfig(initNodes = {"/node"}, async = true) public void testGetNodeDataAsync() throws Exception { zooKeeper.setData("/node", "data".getBytes(), -1); zooKeeper.getData("/node", false, new AsyncCallback.DataCallback() { @Override public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { assertEquals("data", new String(data)); assertResult(rc, "/node", path, ctx, stat); } }, null); }
@Test @ZooConfig(initNodes = {"/node"}, async = true) public void testSetNodeDataAsync() throws Exception { zooKeeper.setData("/node", "data".getBytes(), -1, new AsyncCallback.StatCallback() { @Override public void processResult(int rc, String path, Object ctx, Stat stat) { assertResult(rc, "/node", path, ctx, stat); } }, null); }
@Test @ZooConfig(initNodes = {"/node/child"}, async = true) public void testGetChildrenAsync() throws Exception { zooKeeper.getChildren("/node", false, new AsyncCallback.ChildrenCallback() { @Override public void processResult(int rc, String path, Object ctx, List<String> children) { assertTrue(children.equals(Arrays.asList("child"))); assertResult(rc, "/node", path, ctx, stat); } }, null); }
Future<DLSN> getLastCommitPositionFromZK() { final Promise<DLSN> result = new Promise<DLSN>(); try { logger.debug("Reading last commit position from path {}", zkPath); zooKeeperClient.get().getData(zkPath, false, new AsyncCallback.DataCallback() { @Override public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { logger.debug("Read last commit position from path {}: rc = {}", zkPath, rc); if (KeeperException.Code.NONODE.intValue() == rc) { result.setValue(DLSN.NonInclusiveLowerBound); } else if (KeeperException.Code.OK.intValue() != rc) { result.setException(KeeperException.create(KeeperException.Code.get(rc), path)); } else { try { DLSN dlsn = DLSN.deserialize(new String(data, Charsets.UTF_8)); result.setValue(dlsn); } catch (Exception t) { logger.warn("Invalid last commit position found from path {}", zkPath, t); // invalid dlsn recorded in subscription state store result.setValue(DLSN.NonInclusiveLowerBound); } } } }, null); } catch (ZooKeeperClient.ZooKeeperConnectionException zkce) { result.setException(zkce); } catch (InterruptedException ie) { result.setException(new DLInterruptedException("getLastCommitPosition was interrupted", ie)); } return result; }
private void createAllocators(int numAllocators) throws InterruptedException, IOException { final AtomicInteger numPendings = new AtomicInteger(numAllocators); final AtomicInteger numFailures = new AtomicInteger(0); final CountDownLatch latch = new CountDownLatch(1); AsyncCallback.StringCallback createCallback = new AsyncCallback.StringCallback() { @Override public void processResult(int rc, String path, Object ctx, String name) { if (KeeperException.Code.OK.intValue() != rc) { numFailures.incrementAndGet(); latch.countDown(); return; } if (numPendings.decrementAndGet() == 0 && numFailures.get() == 0) { latch.countDown(); } } }; for (int i = 0; i < numAllocators; i++) { zkc.get().create(poolPath + "/A", new byte[0], zkc.getDefaultACL(), CreateMode.PERSISTENT_SEQUENTIAL, createCallback, null); } latch.await(); if (numFailures.get() > 0) { throw new IOException("Failed to create " + numAllocators + " allocators."); } }
/** * Get client id and its ephemeral owner. * * @param zkClient * zookeeper client * @param lockPath * lock path * @param nodeName * node name * @return client id and its ephemeral owner. */ static Future<Pair<String, Long>> asyncParseClientID(ZooKeeper zkClient, String lockPath, String nodeName) { String[] parts = nodeName.split("_"); // member_<clientid>_s<owner_session>_ if (4 == parts.length && parts[2].startsWith("s")) { long sessionOwner = Long.parseLong(parts[2].substring(1)); String clientId; try { clientId = URLDecoder.decode(parts[1], UTF_8.name()); return Future.value(Pair.of(clientId, sessionOwner)); } catch (UnsupportedEncodingException e) { // if failed to parse client id, we have to get client id by zookeeper#getData. } } final Promise<Pair<String, Long>> promise = new Promise<Pair<String, Long>>(); zkClient.getData(lockPath + "/" + nodeName, false, new AsyncCallback.DataCallback() { @Override public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { if (KeeperException.Code.OK.intValue() != rc) { promise.setException(KeeperException.create(KeeperException.Code.get(rc))); } else { promise.setValue(Pair.of(deserializeClientId(data), stat.getEphemeralOwner())); } } }, null); return promise; }
private void deleteLockNode(final Promise<BoxedUnit> promise) { if (null == currentNode) { promise.setValue(BoxedUnit.UNIT); return; } zk.delete(currentNode, -1, new AsyncCallback.VoidCallback() { @Override public void processResult(final int rc, final String path, Object ctx) { lockStateExecutor.submit(lockPath, new SafeRunnable() { @Override public void safeRun() { if (KeeperException.Code.OK.intValue() == rc) { LOG.info("Deleted lock node {} for {} successfully.", path, lockId); } else if (KeeperException.Code.NONODE.intValue() == rc || KeeperException.Code.SESSIONEXPIRED.intValue() == rc) { LOG.info("Delete node failed. Node already gone for node {} id {}, rc = {}", new Object[] { path, lockId, KeeperException.Code.get(rc) }); } else { LOG.error("Failed on deleting lock node {} for {} : {}", new Object[] { path, lockId, KeeperException.Code.get(rc) }); } FailpointUtils.checkFailPointNoThrow(FailpointUtils.FailPointName.FP_LockUnlockCleanup); promise.setValue(BoxedUnit.UNIT); } }); } }, null); }
/** * Check Lock Owner Phase 1 : Get all lock waiters. * * @param lockWatcher * lock watcher. * @param wait * whether to wait for ownership. * @param promise * promise to satisfy with current lock owner */ private void checkLockOwnerAndWaitIfPossible(final LockWatcher lockWatcher, final boolean wait, final Promise<String> promise) { zk.getChildren(lockPath, false, new AsyncCallback.Children2Callback() { @Override public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) { processLockWaiters(lockWatcher, wait, rc, children, promise); } }, null); }
@Override public void performBackgroundOperation(final OperationAndData<String> operationAndData) throws Exception { try { final TimeTrace trace = client.getZookeeperClient().startTracer("RemoteWatches-Background"); AsyncCallback.VoidCallback callback = new AsyncCallback.VoidCallback() { @Override public void processResult(int rc, String path, Object ctx) { trace.commit(); CuratorEvent event = new CuratorEventImpl(client, CuratorEventType.REMOVE_WATCHES, rc, path, null, ctx, null, null, null, null, null, null); client.processBackgroundOperation(operationAndData, event); } }; ZooKeeper zkClient = client.getZooKeeper(); NamespaceWatcher namespaceWatcher = makeNamespaceWatcher(operationAndData.getData()); if(namespaceWatcher == null) { zkClient.removeAllWatches(operationAndData.getData(), watcherType, local, callback, operationAndData.getContext()); } else { zkClient.removeWatches(operationAndData.getData(), namespaceWatcher, watcherType, local, callback, operationAndData.getContext()); } } catch ( Throwable e ) { backgrounding.checkError(e, null); } }
@Override public void performBackgroundOperation(final OperationAndData<Void> operationAndData) throws Exception { try { final TimeTrace trace = client.getZookeeperClient().startTracer("GetDataBuilderImpl-Background"); AsyncCallback.DataCallback callback = new AsyncCallback.DataCallback() { @Override public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { watching.commitWatcher(rc, false); trace.commit(); CuratorEvent event = new CuratorEventImpl(client, CuratorEventType.GET_CONFIG, rc, path, null, ctx, stat, data, null, null, null, null); client.processBackgroundOperation(operationAndData, event); } }; if ( watching.isWatched() ) { client.getZooKeeper().getConfig(true, callback, backgrounding.getContext()); } else { client.getZooKeeper().getConfig(watching.getWatcher(ZooDefs.CONFIG_NODE), callback, backgrounding.getContext()); } } catch ( Throwable e ) { backgrounding.checkError(e, watching); } }
@Override public void performBackgroundOperation(final OperationAndData<PathAndBytes> operationAndData) throws Exception { try { final OperationTrace trace = client.getZookeeperClient().startAdvancedTracer("SetDataBuilderImpl-Background"); final byte[] data = operationAndData.getData().getData(); client.getZooKeeper().setData ( operationAndData.getData().getPath(), data, version, new AsyncCallback.StatCallback() { @SuppressWarnings({"unchecked"}) @Override public void processResult(int rc, String path, Object ctx, Stat stat) { trace.setReturnCode(rc).setRequestBytesLength(data).setPath(path).setStat(stat).commit(); CuratorEvent event = new CuratorEventImpl(client, CuratorEventType.SET_DATA, rc, path, null, ctx, stat, null, null, null, null, null); client.processBackgroundOperation(operationAndData, event); } }, backgrounding.getContext() ); } catch ( Throwable e ) { backgrounding.checkError(e, null); } }
@Override public void performBackgroundOperation(final OperationAndData<String> operationAndData) throws Exception { try { final OperationTrace trace = client.getZookeeperClient().startAdvancedTracer("ExistsBuilderImpl-Background"); AsyncCallback.StatCallback callback = new AsyncCallback.StatCallback() { @Override public void processResult(int rc, String path, Object ctx, Stat stat) { watching.commitWatcher(rc, true); trace.setReturnCode(rc).setPath(path).setWithWatcher(watching.hasWatcher()).setStat(stat).commit(); CuratorEvent event = new CuratorEventImpl(client, CuratorEventType.EXISTS, rc, path, null, ctx, stat, null, null, null, null, null); client.processBackgroundOperation(operationAndData, event); } }; if ( watching.isWatched() ) { client.getZooKeeper().exists(operationAndData.getData(), true, callback, backgrounding.getContext()); } else { client.getZooKeeper().exists(operationAndData.getData(), watching.getWatcher(operationAndData.getData()), callback, backgrounding.getContext()); } } catch ( Throwable e ) { backgrounding.checkError(e, watching); } }