private ZooKeeper getConnectedZkClient() throws IOException { ZooKeeper zk = new ZooKeeper(HOSTPORT, CONNECTION_TIMEOUT, this); long start = System.currentTimeMillis(); while (!connected) { long end = System.currentTimeMillis(); if (end - start > 5000) { Assert.assertTrue("Could not connect with server in 5 seconds", false); } try { Thread.sleep(200); } catch (Exception e) { LOG.warn("Interrupted"); } } return zk; }
public void run() { byte b[] = new byte[256]; try { for (; current < count; current++) { ZooKeeper zk = parent.createClient(); try { zk.create(prefix + current, b, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } finally { try { zk.close(); } catch (InterruptedException e) { LOG.warn("Unexpected", e); } } } } catch (Throwable t) { LOG.error("Client create operation Assert.failed", t); } }
/** * Get a new zookeeper client instance. protected so that test class can * inherit and pass in a mock object for zookeeper * * @return new zookeeper client instance * @throws IOException * @throws KeeperException zookeeper connectionloss exception */ protected synchronized ZooKeeper getNewZooKeeper() throws IOException, KeeperException { // Unfortunately, the ZooKeeper constructor connects to ZooKeeper and // may trigger the Connected event immediately. So, if we register the // watcher after constructing ZooKeeper, we may miss that event. Instead, // we construct the watcher first, and have it block any events it receives // before we can set its ZooKeeper reference. watcher = new WatcherWithClientRef(); ZooKeeper zk = new ZooKeeper(zkHostPort, zkSessionTimeout, watcher); watcher.setZooKeeperRef(zk); // Wait for the asynchronous success/failure. This may throw an exception // if we don't connect within the session timeout. watcher.waitForZKConnectionEvent(zkSessionTimeout); for (ZKAuthInfo auth : zkAuthInfo) { zk.addAuthInfo(auth.getScheme(), auth.getAuth()); } return zk; }
@Test public void testSuperIsSuper() throws Exception { ZooKeeper zk = createClient(); try { zk.create("/digest_read", null, Arrays.asList(new ACL(Perms.READ, otherDigestUser)), CreateMode.PERSISTENT); zk.create("/digest_read/sub", null, Arrays.asList(new ACL(Perms.READ, otherDigestUser)), CreateMode.PERSISTENT); zk.create("/sasl_read", null, Arrays.asList(new ACL(Perms.READ, otherSaslUser)), CreateMode.PERSISTENT); zk.create("/sasl_read/sub", null, Arrays.asList(new ACL(Perms.READ, otherSaslUser)), CreateMode.PERSISTENT); zk.delete("/digest_read/sub", -1); zk.delete("/digest_read", -1); zk.delete("/sasl_read/sub", -1); zk.delete("/sasl_read", -1); //If the test failes it will most likely fail with a NoAuth exception before it ever gets to this assertion Assert.assertEquals(authFailed.get(), 0); } finally { zk.close(); } }
/** * 建立zk连接,并初始化root和config节点 */ private void init() { loadLocalProperties(); try { this.zooKeeper = new ZooKeeper(address, 1000000, new DefaultDataWatcher()); Stat stat = this.zooKeeper.exists(PathVarConst.ROOT_PATH, null); if (stat == null) { this.zooKeeper.create(PathVarConst.ROOT_PATH, "root".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); this.zooKeeper.create(PathVarConst.ROOTCONF_PATH, "config".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } catch (Exception e) { LOGGER.error("zookeeper连接配置出错,请检查配置文件", e); } }
private void verifyUnexpectedBeans(Set<ObjectName> children) { if (allClients != null) { for (ZooKeeper zkc : allClients) { Iterator<ObjectName> childItr = children.iterator(); while (childItr.hasNext()) { ObjectName clientBean = childItr.next(); if (clientBean.toString().contains( getHexSessionId(zkc.getSessionId()))) { LOG.info("found name:" + zkc.getSessionId() + " client bean:" + clientBean.toString()); childItr.remove(); } } } } for (ObjectName bean : children) { LOG.info("unexpected:" + bean.toString()); } TestCase.assertEquals("Unexpected bean exists!", 0, children.size()); }
public void runHammer(final int threadCount, final int childCount) throws Throwable { try { HammerThread[] threads = new HammerThread[threadCount]; long start = System.currentTimeMillis(); for (int i = 0; i < threads.length; i++) { ZooKeeper zk = createClient(); String prefix = "/test-" + i; zk.create(prefix, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); prefix += "/"; HammerThread thread = new BasicHammerThread("BasicHammerThread-" + i, zk, prefix, childCount); thread.start(); threads[i] = thread; } verifyHammer(start, threads, childCount); } catch (Throwable t) { LOG.error("test Assert.failed", t); throw t; } }
@Override public void process(WatchedEvent event) { ZooKeeper zkClient = zookeeperConnManager.getZkClient(); try { /* 重新注册节点 */ List<String> childrens = zkClient.getChildren(nodePath, this); EventType eventType = event.getType(); switch (eventType) { case NodeChildrenChanged: log.info("当前注册中心内的成功注册的agent数量-->" + childrens.stream().filter(children -> children.startsWith("agent")).count()); break; default: break; } } catch (Exception e) { log.error("error", e); } }
private void waitForAll(ZooKeeper[] zks, States state) throws InterruptedException { int iterations = 10; boolean someoneNotConnected = true; while(someoneNotConnected) { if (iterations-- == 0) { ClientBase.logAllStackTraces(); throw new RuntimeException("Waiting too long"); } someoneNotConnected = false; for(ZooKeeper zk: zks) { if (zk.getState() != state) { someoneNotConnected = true; } } Thread.sleep(1000); } }
@Test public void testClientRetry() throws IOException, InterruptedException, TimeoutException{ CountdownWatcher cdw1 = new CountdownWatcher(); CountdownWatcher cdw2 = new CountdownWatcher(); ZooKeeper zk = new ZooKeeper(hostPort, 10000, cdw1); try { cdw1.waitForConnected(CONNECTION_TIMEOUT); ZooKeeper zk2 = new ZooKeeper(hostPort, 10000, cdw2); try { States s1 = zk.getState(); States s2 = zk2.getState(); Assert.assertSame(s1,States.CONNECTED); Assert.assertSame(s2,States.CONNECTING); cdw1.reset(); cdw1.waitForDisconnected(CONNECTION_TIMEOUT); cdw2.waitForConnected(CONNECTION_TIMEOUT); Assert.assertSame(zk2.getState(),States.CONNECTED); } finally { zk2.close(); } } finally { zk.close(); } }
@Test public void testRemove1() throws Exception{ String dir = "/testRemove1"; String testString = "Hello World"; final int num_clients = 1; ZooKeeper clients[] = new ZooKeeper[num_clients]; DistributedQueue queueHandles[] = new DistributedQueue[num_clients]; for(int i=0; i < clients.length; i++){ clients[i] = createClient(); queueHandles[i] = new DistributedQueue(clients[i], dir, null); } try{ queueHandles[0].remove(); }catch(NoSuchElementException e){ return; } Assert.assertTrue(false); }
public void createNremoveMelementTest(String dir,int n,int m) throws Exception{ String testString = "Hello World"; final int num_clients = 2; ZooKeeper clients[] = new ZooKeeper[num_clients]; DistributedQueue queueHandles[] = new DistributedQueue[num_clients]; for(int i=0; i < clients.length; i++){ clients[i] = createClient(); queueHandles[i] = new DistributedQueue(clients[i], dir, null); } for(int i=0; i< n; i++){ String offerString = testString + i; queueHandles[0].offer(offerString.getBytes()); } byte data[] = null; for(int i=0; i<m; i++){ data=queueHandles[1].remove(); } Assert.assertEquals(new String(queueHandles[1].element()), testString+m); }
/** find if we have been created earler if not create our node * * @param prefix the prefix node * @param zookeeper teh zookeeper client * @param dir the dir paretn * @throws KeeperException * @throws InterruptedException */ private void findPrefixInChildren(String prefix, ZooKeeper zookeeper, String dir) throws KeeperException, InterruptedException { List<String> names = zookeeper.getChildren(dir, false); for (String name : names) { if (name.startsWith(prefix)) { id = name; if (LOG.isDebugEnabled()) { LOG.debug("Found id created last time: " + id); } break; } } if (id == null) { id = zookeeper.create(dir + "/" + prefix, data, getAcl(), EPHEMERAL_SEQUENTIAL); if (LOG.isDebugEnabled()) { LOG.debug("Created id: " + id); } } }
@Test public void testNodeCreated() throws Exception { QuorumUtil qu = new QuorumUtil(1); qu.startAll(); EventsWatcher watcher = new EventsWatcher(); ZooKeeper zk1 = createClient(qu, 1, watcher); ZooKeeper zk2 = createClient(qu, 2); String path = "/test1-created"; zk1.exists(path, watcher); qu.shutdown(1); zk2.create(path, new byte[2], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); qu.start(1); watcher.waitForConnected(TIMEOUT * 1000L); watcher.assertEvent(TIMEOUT, EventType.NodeCreated); qu.shutdownAll(); }
/** * Test to verify that servers are able to form quorum. * peer0 -> quorum.auth.enableSasl=true, quorum.auth.learnerRequireSasl=true, quorum.auth.serverRequireSasl=true * peer1 -> quorum.auth.enableSasl=true, quorum.auth.learnerRequireSasl=true, quorum.auth.serverRequireSasl=true */ @Test(timeout = 30000) public void testAuthLearnerServer() throws Exception { Map<String, String> authConfigs = new HashMap<String, String>(); authConfigs.put(QuorumAuth.QUORUM_SASL_AUTH_ENABLED, "true"); authConfigs.put(QuorumAuth.QUORUM_SERVER_SASL_AUTH_REQUIRED, "true"); authConfigs.put(QuorumAuth.QUORUM_LEARNER_SASL_AUTH_REQUIRED, "true"); String connectStr = startQuorum(2, authConfigs, 2, false); CountdownWatcher watcher = new CountdownWatcher(); ZooKeeper zk = new ZooKeeper(connectStr, ClientBase.CONNECTION_TIMEOUT, watcher); watcher.waitForConnected(ClientBase.CONNECTION_TIMEOUT); zk.create("/foo", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); zk.close(); }
public void preAuth() throws Exception { ZooKeeper zk = createClient(); zk.addAuthInfo("key", "25".getBytes()); try { createNodePrintAcl(zk, "/pre", "testPreAuth"); zk.setACL("/", Ids.CREATOR_ALL_ACL, -1); zk.getChildren("/", false); zk.create("/abc", null, Ids.CREATOR_ALL_ACL, CreateMode.PERSISTENT); zk.setData("/abc", "testData1".getBytes(), -1); zk.create("/key", null, Ids.CREATOR_ALL_ACL, CreateMode.PERSISTENT); zk.setData("/key", "5".getBytes(), -1); Thread.sleep(1000); } catch (KeeperException e) { Assert.fail("test failed :" + e); } finally { zk.close(); } }
/** * Create the znodes, this may fail if the lower 32 roll over, if so * wait for the clients to be re-connected after the re-election */ private int createNodes(ZooKeeper zk, int start, int count) throws Exception { LOG.info("Creating nodes {} thru {}", start, (start + count)); int j = 0; try { for (int i = start; i < start + count; i++) { zk.create("/foo" + i, new byte[0], Ids.READ_ACL_UNSAFE, CreateMode.EPHEMERAL); j++; } } catch (ConnectionLossException e) { // this is ok - the leader has dropped leadership waitForClientsConnected(); } return j; }
@Test public void testSuperACL() throws Exception { ZooKeeper zk = createClient(); try { zk.addAuthInfo("digest", "pat:pass".getBytes()); zk.create("/path1", null, Ids.CREATOR_ALL_ACL, CreateMode.PERSISTENT); zk.close(); // verify super can do anything and ignores ACLs zk = createClient(); zk.addAuthInfo("digest", "super:test".getBytes()); zk.getData("/path1", false, null); zk.setACL("/path1", Ids.READ_ACL_UNSAFE, -1); zk.create("/path1/foo", null, Ids.CREATOR_ALL_ACL, CreateMode.PERSISTENT); zk.setACL("/path1", Ids.OPEN_ACL_UNSAFE, -1); } finally { zk.close(); } }
private void create_get_stat_test() throws IOException, InterruptedException, KeeperException { checkRoot(); ZooKeeper zk = new ZooKeeper(hostPort, 10000, this); String parentName = testDirOnZK; String nodeName = parentName + "/create_with_stat_tmp"; deleteNodeIfExists(zk, nodeName); deleteNodeIfExists(zk, nodeName + "_2"); Stat stat = new Stat(); zk.create(nodeName, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat); Assert.assertNotNull(stat); Assert.assertTrue(stat.getCzxid() > 0); Assert.assertTrue(stat.getCtime() > 0); Stat stat2 = new Stat(); zk.create(nodeName + "_2", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat2); Assert.assertNotNull(stat2); Assert.assertTrue(stat2.getCzxid() > stat.getCzxid()); Assert.assertTrue(stat2.getCtime() > stat.getCtime()); deleteNodeIfExists(zk, nodeName); deleteNodeIfExists(zk, nodeName + "_2"); zk.close(); }
boolean verify(ZooKeeper zkc, String path) { try { EditLogLedgerMetadata other = read(zkc, path); if (LOG.isTraceEnabled()) { LOG.trace("Verifying " + this.toString() + " against " + other); } return other.equals(this); } catch (KeeperException e) { LOG.error("Couldn't verify data in " + path, e); return false; } catch (IOException ie) { LOG.error("Couldn't verify data in " + path, ie); return false; } }
public static String[] getTree(ZooKeeper zk,String path) throws Exception{ if(zk.exists(path, false) == null){ return new String[0]; } List<String> dealList = new ArrayList<String>(); dealList.add(path); int index =0; while(index < dealList.size()){ String tempPath = dealList.get(index); List<String> children = zk.getChildren(tempPath, false); if(tempPath.equalsIgnoreCase("/") == false){ tempPath = tempPath +"/"; } Collections.sort(children); for(int i = children.size() -1;i>=0;i--){ dealList.add(index+1, tempPath + children.get(i)); } index++; } return (String[])dealList.toArray(new String[0]); }
private void verifyUnexpectedBeans(Set<ObjectName> children) { if (allClients != null) { for (ZooKeeper zkc : allClients) { Iterator<ObjectName> childItr = children.iterator(); while (childItr.hasNext()) { ObjectName clientBean = childItr.next(); if (clientBean.toString().contains( getHexSessionId(zkc.getSessionId()))) { LOG.info("found name:" + zkc.getSessionId() + " client bean:" + clientBean.toString()); childItr.remove(); } } } } for (ObjectName bean : children) { LOG.info("unexpected:" + bean.toString()); } Assert.assertEquals("Unexpected bean exists!", 0, children.size()); }
@Test public void testOffer1() throws Exception { String dir = "/testOffer1"; String testString = "Hello World"; final int num_clients = 1; ZooKeeper clients[] = new ZooKeeper[num_clients]; DistributedQueue queueHandles[] = new DistributedQueue[num_clients]; for(int i=0; i < clients.length; i++){ clients[i] = createClient(); queueHandles[i] = new DistributedQueue(clients[i], dir, null); } queueHandles[0].offer(testString.getBytes()); byte dequeuedBytes[] = queueHandles[0].remove(); Assert.assertEquals(new String(dequeuedBytes), testString); }
/** * Test to verify that server is able to start with valid credentials */ @Test(timeout = 120000) public void testValidCredentials() throws Exception { String serverPrincipal = KerberosTestUtils.getServerPrincipal(); serverPrincipal = serverPrincipal.substring(0, serverPrincipal.lastIndexOf("@")); Map<String, String> authConfigs = new HashMap<String, String>(); authConfigs.put(QuorumAuth.QUORUM_SASL_AUTH_ENABLED, "true"); authConfigs.put(QuorumAuth.QUORUM_SERVER_SASL_AUTH_REQUIRED, "true"); authConfigs.put(QuorumAuth.QUORUM_LEARNER_SASL_AUTH_REQUIRED, "true"); authConfigs.put(QuorumAuth.QUORUM_KERBEROS_SERVICE_PRINCIPAL, serverPrincipal); String connectStr = startQuorum(3, authConfigs, 3, true); CountdownWatcher watcher = new CountdownWatcher(); ZooKeeper zk = new ZooKeeper(connectStr, ClientBase.CONNECTION_TIMEOUT, watcher); watcher.waitForConnected(ClientBase.CONNECTION_TIMEOUT); for (int i = 0; i < 10; i++) { zk.create("/" + i, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } zk.close(); }
/** * Test to verify that Observer server is able to join quorum. */ @Test(timeout = 30000) public void testObserverWithValidCredentials() throws Exception { Map<String, String> authConfigs = new HashMap<String, String>(); authConfigs.put(QuorumAuth.QUORUM_SASL_AUTH_ENABLED, "true"); authConfigs.put(QuorumAuth.QUORUM_SERVER_SASL_AUTH_REQUIRED, "true"); authConfigs.put(QuorumAuth.QUORUM_LEARNER_SASL_AUTH_REQUIRED, "true"); // Starting auth enabled 5-node cluster. 3-Participants and 2-Observers. int totalServerCount = 5; int observerCount = 2; String connectStr = startQuorum(totalServerCount, observerCount, authConfigs, totalServerCount); CountdownWatcher watcher = new CountdownWatcher(); zk = new ZooKeeper(connectStr.toString(), ClientBase.CONNECTION_TIMEOUT, watcher); watcher.waitForConnected(ClientBase.CONNECTION_TIMEOUT); zk.create("/myTestRoot", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); }
@Before public void setUp() throws Exception { super.setUp(); RestCfg cfg = new RestCfg(new ByteArrayInputStream(String.format( "rest.port=%s\n" + "rest.endpoint.1=%s;%s\n", GRIZZLY_PORT, CONTEXT_PATH, ZKHOSTPORT).getBytes())); rest = new RestMain(cfg); rest.start(); zk = new ZooKeeper(ZKHOSTPORT, 30000, new MyWatcher()); client = Client.create(); znodesr = client.resource(BASEURI).path("znodes/v1"); sessionsr = client.resource(BASEURI).path("sessions/v1/"); }
/** * Test that, if ACLs are specified in the configuration, that * it sets the ACLs when formatting the parent node. */ @Test(timeout=15000) public void testFormatSetsAcls() throws Exception { // Format the base dir, should succeed DummyHAService svc = cluster.getService(1); assertEquals(0, runFC(svc, "-formatZK")); ZooKeeper otherClient = createClient(); try { // client without auth should not be able to read it Stat stat = new Stat(); otherClient.getData(ZKFailoverController.ZK_PARENT_ZNODE_DEFAULT, false, stat); fail("Was able to read data without authenticating!"); } catch (KeeperException.NoAuthException nae) { // expected } }
@Test public void testAuthFail() throws Exception { ZooKeeper zk = createClient(); try { zk.create("/path1", null, Ids.CREATOR_ALL_ACL, CreateMode.PERSISTENT); Assert.fail("Should have gotten exception."); } catch(Exception e ) { // ok, exception as expected. LOG.info("Got exception as expected: " + e); } finally { zk.close(); } }
/** * Initialize Zookeeper connection * * @param host zookeeper host names * @param sessionTimeout zookeeper connection session timeout * @return Zookeeper connection * @throws ConnectionException when connection fails */ public ZooKeeper connect(final String host,final int sessionTimeout) { try { connection = new ZooKeeper(host, sessionTimeout, watcher) ; } catch (IOException e) { throw new ConnectionException(host,e.getMessage(),e,this.getClass()); } return connection; }
public LogManagerFactoryImpl(ClientConfiguration clientConfiguration, BookKeeperConfig config) throws Exception { bookKeeperConfig = config; checkNotNull(clientConfiguration); String servers = clientConfiguration.getZkServers(); checkNotNull(servers); final CountDownLatch countDownLatch = new CountDownLatch(1); zooKeeper = new ZooKeeper(servers, clientConfiguration.getZkTimeout(), event -> { if (event.getState() == Watcher.Event.KeeperState.SyncConnected) { logger.info("Connected to zookeeper ,connectString = {}", servers); countDownLatch.countDown(); } else { logger.error("Failed to connect zookeeper,connectString = {}", servers); } }); if (!countDownLatch.await(clientConfiguration.getZkTimeout(), TimeUnit.MILLISECONDS) || zooKeeper.getState() != ZooKeeper.States.CONNECTED) { throw new LedgerStorageException( "Error connecting to zookeeper server ,connectString = " + servers + "."); } this.bookKeeper = new BookKeeper(clientConfiguration, zooKeeper); RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(servers, retryPolicy); curatorFramework.start(); asyncCuratorFramework = AsyncCuratorFramework.wrap(curatorFramework); logInfoStorage = new LogInfoStorageImpl(asyncCuratorFramework); offsetStorage = new ZkOffsetStorageImpl(logInfoStorage, asyncCuratorFramework); }
private void utestChildren(int port) throws IOException, InterruptedException, KeeperException { ZooKeeper zk = new ZooKeeper("127.0.0.1:" + port, CONNECTION_TIMEOUT, this); for (int i = 0; i < 10000; i++) { zk.getChildren("/" + i, true); } zk.close(); }
public static void createPath(ZooKeeper zk, String path, CreateMode createMode, List<ACL> acl) throws Exception { String[] list = path.split("/"); String zkPath = ""; for (String str : list) { if (StringUtils.isNotEmpty(str)) { zkPath = zkPath + "/" + str; if (zk.exists(zkPath, false) == null) { zk.create(zkPath, null, acl, createMode); } } } }
public void run() throws IOException, InterruptedException, KeeperException { zk = new ZooKeeper(zkHostPort, sessTimeout, this); mknod(assignmentsNode, CreateMode.PERSISTENT); mknod(statusNode, CreateMode.EPHEMERAL); mknod(reportsNode, CreateMode.PERSISTENT); // Now we just start watching the assignments directory zk.getChildren(assignmentsNode, true, this, null); }
@Test public void testBadSaslAuthNotifiesWatch() throws Exception { ZooKeeper zk = createClient(); // wait for authFailed event from client's EventThread. synchronized(authFailed) { authFailed.wait(); } Assert.assertEquals(authFailed.get(),1); zk.close(); }
protected static String createBaseZNode() throws Exception { ZooKeeper zk = new ZooKeeper(ZKHOSTPORT, 30000, new MyWatcher()); String baseZnode = zk.create("/test-", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL); zk.close(); return baseZnode; }
@PostConstruct public void init() { ZooKeeper zkClient = zookeeperConnManager.getZkClient(); try { /* 如果根节点不存在则创建 */ if (null == zkClient.exists(nodePath, false)) { zkClient.create(nodePath, new byte[] {}, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } /* 注册节点 */ zkClient.getChildren(nodePath, this); } catch (KeeperException | InterruptedException e) { log.error("error", e); } }
private void utestGet(int port) throws IOException, InterruptedException, KeeperException { ZooKeeper zk = new ZooKeeper("127.0.0.1:" + port, CONNECTION_TIMEOUT, this); for (int i = 0; i < 10000; i++) { Stat stat = new Stat(); zk.getData("/" + i, true, stat); } zk.close(); }
private List<OpResult> multi(ZooKeeper zk, Iterable<Op> ops) throws KeeperException, InterruptedException { if (useAsync) { final MultiResult res = new MultiResult(); zk.multi(ops, new MultiCallback() { @Override public void processResult(int rc, String path, Object ctx, List<OpResult> opResults) { synchronized (res) { res.rc = rc; res.results = opResults; res.finished = true; res.notifyAll(); } } }, null); synchronized (res) { while (!res.finished) { res.wait(); } } if (KeeperException.Code.OK.intValue() != res.rc) { KeeperException ke = KeeperException.create(KeeperException.Code.get(res.rc)); throw ke; } return res.results; } else { return zk.multi(ops); } }