/**
 * Helper that launches a set of servers on 127.0.0.1 and opens one client
 * connection per server.
 *
 * <p>Each server gets a unique client port and a {@code server.N} entry
 * (with two further unique ports) in a shared quorum configuration section.
 * The method returns once {@code waitForAll} has been called for the
 * {@code CONNECTED} state on all clients.
 *
 * @param numServers number of servers (and clients) to start
 * @return a {@code Servers} holder whose {@code mt} and {@code zk} arrays
 *         carry the started server threads and their clients
 * @throws IOException propagated from the calls made while starting
 *         servers and clients
 * @throws InterruptedException propagated from the calls made while
 *         starting servers or waiting for the clients
 */
private Servers LaunchServers(int numServers) throws IOException, InterruptedException {
    Servers launched = new Servers();

    // Reserve the client port first, then the two per-server ports, so the
    // port allocation sequence stays the same for every server id.
    final int[] ports = new int[numServers];
    StringBuilder cfg = new StringBuilder();
    for (int id = 0; id < numServers; id++) {
        ports[id] = PortAssignment.unique();
        cfg.append("server.").append(id).append("=127.0.0.1:")
           .append(PortAssignment.unique())
           .append(":")
           .append(PortAssignment.unique())
           .append("\n");
    }
    final String quorumCfgSection = cfg.toString();

    MainThread[] threads = new MainThread[numServers];
    ZooKeeper[] clients = new ZooKeeper[numServers];
    for (int id = 0; id < numServers; id++) {
        threads[id] = new MainThread(id, ports[id], quorumCfgSection);
        threads[id].start();
        clients[id] = new ZooKeeper("127.0.0.1:" + ports[id],
                ClientBase.CONNECTION_TIMEOUT, this);
    }

    waitForAll(clients, States.CONNECTED);

    launched.mt = threads;
    launched.zk = clients;
    return launched;
}
/**
 * Restarts one server with the given auth configuration and checks that the
 * ensemble is usable afterwards.
 *
 * <p>The server is stopped through {@code shutdown(index)}, a replacement is
 * started through {@code startServer}, the method waits for the server's
 * client port to come up, waits for {@code watcher} to report a connection,
 * and finally creates a sequential znode under {@code /foo} through
 * {@code zk}.
 *
 * @param authConfigs auth settings handed to the restarted server
 * @param index value passed to {@code shutdown(int)}; also used in the log
 *        and assertion messages
 * @param zk client used for the post-restart write
 * @param watcher watcher that is awaited for a connected state
 * @throws IOException propagated from {@code startServer}
 * @throws KeeperException propagated from the znode creation
 * @throws InterruptedException if a wait or the znode creation is interrupted
 * @throws TimeoutException propagated from {@code watcher.waitForConnected}
 */
private void restartServer(Map<String, String> authConfigs, int index,
        ZooKeeper zk, CountdownWatcher watcher)
        throws IOException, KeeperException, InterruptedException, TimeoutException {
    LOG.info("Restarting server myid=" + index);
    MainThread m = shutdown(index);
    startServer(m, authConfigs);
    // Spaces around the index keep the failure message readable
    // ("waiting for server 0 being up" rather than "server0being").
    Assert.assertTrue("waiting for server " + index + " being up",
            ClientBase.waitForServerUp("127.0.0.1:" + m.getClientPort(),
                    ClientBase.CONNECTION_TIMEOUT));
    watcher.waitForConnected(ClientTest.CONNECTION_TIMEOUT);
    zk.create("/foo", new byte[0], Ids.OPEN_ACL_UNSAFE,
            CreateMode.PERSISTENT_SEQUENTIAL);
}
/**
 * Stops every server in {@code mt}, deletes its base directory, and closes
 * the client held in {@code zk} when one was created.
 *
 * @throws Exception propagated from server shutdown or client close
 */
@After
public void tearDown() throws Exception {
    try {
        for (MainThread mainThread : mt) {
            mainThread.shutdown();
            mainThread.deleteBaseDir();
        }
    } finally {
        // Close the client even when stopping a server throws, so a failed
        // shutdown does not leave the client session open.
        if (zk != null) {
            zk.close();
        }
    }
}
/**
 * Finds the first server in {@code mtList} whose quorum peer reports the
 * {@code LEADING} state.
 *
 * @param mtList server threads to inspect, in iteration order
 * @return the leading quorum peer, or {@code null} when no server has a
 *         non-null peer in the {@code LEADING} state
 */
private QuorumPeer getLeaderQuorumPeer(List<MainThread> mtList) {
    for (MainThread server : mtList) {
        QuorumPeer candidate = server.getQuorumPeer();
        if (candidate == null) {
            // This server exposes no quorum peer; nothing to check.
            continue;
        }
        if (candidate.getPeerState() == ServerState.LEADING) {
            return candidate;
        }
    }
    return null;
}
/**
 * Stops every server in {@code mt} and deletes its base directory.
 *
 * @throws Exception propagated from server shutdown
 */
@After
public void tearDown() throws Exception {
    for (MainThread server : mt) {
        server.shutdown();
        server.deleteBaseDir();
    }
}
/**
 * Starts a replacement for {@code restartPeer}.
 *
 * <p>A new {@code MainThread} is built from the old peer's myid, client
 * port and quorum configuration section together with {@code authConfigs};
 * it is appended to {@code mt} and then started.
 *
 * @param restartPeer the server whose identity and ports are reused
 * @param authConfigs auth settings for the new server instance
 * @throws IOException propagated from the {@code MainThread} constructor
 */
protected void startServer(MainThread restartPeer, Map<String, String> authConfigs)
        throws IOException {
    MainThread replacement = new MainThread(
            restartPeer.getMyid(),
            restartPeer.getClientPort(),
            restartPeer.getQuorumCfgSection(),
            authConfigs);
    // Register before starting, so the new server is tracked in mt.
    mt.add(replacement);
    replacement.start();
}
/**
 * Stops the server stored at position {@code index} of {@code mt}, removes
 * it from {@code mt}, and deletes its base directory.
 *
 * <p>NOTE(review): assuming {@code mt} is a {@code java.util.List},
 * {@code remove(int)} shifts every later entry down by one, while
 * {@code startServer} appends the replacement at the end. After a restart
 * the list positions therefore no longer match the original server order;
 * verify that callers passing fixed indices reach the server they intend.
 *
 * @param index position in {@code mt} of the server to stop
 * @return the stopped server, no longer contained in {@code mt}
 */
MainThread shutdown(int index) {
    MainThread mainThread = mt.get(index);
    try {
        mainThread.shutdown();
    } catch (InterruptedException e) {
        // Keep the best-effort behaviour (no rethrow), but restore the
        // interrupt status so the calling thread can still observe it.
        Thread.currentThread().interrupt();
    } finally {
        mt.remove(index);
    }
    mainThread.deleteBaseDir();
    return mainThread;
}
public ZookeeperServerWrapper(int serverId, int portBase) throws IOException { clientPort = 8200 + serverId; // start client port on 8200 + serverId // start servers on portbase + 300 or + 400 (+serverId) String quorumCfgSection = "server.1=127.0.0.1:" + (portBase + 301) + ":" + (portBase + 401) + "\nserver.2=127.0.0.1:" + (portBase + 302) + ":" + (portBase + 402) + "\nserver.3=127.0.0.1:" + (portBase + 303) + ":" + (portBase + 403); server = new MainThread(serverId, clientPort, quorumCfgSection); }
/** * Rolling upgrade should do in three steps: * * step-1) Stop the server and set the flags and restart the server. * quorum.auth.enableSasl=true, quorum.auth.learnerRequireSasl=false and quorum.auth.serverRequireSasl=false * Ensure that all the servers should complete this step. Now, move to next step. * * step-2) Stop the server one by one and change the flags and restart the server. * quorum.auth.enableSasl=true, quorum.auth.learnerRequireSasl=true and quorum.auth.serverRequireSasl=false * Ensure that all the servers should complete this step. Now, move to next step. * * step-3) Stop the server one by one and change the flags and restart the server. * quorum.auth.enableSasl=true, quorum.auth.learnerRequireSasl=true and quorum.auth.serverRequireSasl=true * Now, all the servers are fully upgraded and running in secured mode. */ @Test(timeout = 90000) public void testRollingUpgrade() throws Exception { // Start peer0,1,2 servers with quorum.auth.enableSasl=false and // quorum.auth.learnerRequireSasl=false, quorum.auth.serverRequireSasl=false // Assume this is an existing cluster. Map<String, String> authConfigs = new HashMap<String, String>(); authConfigs.put(QuorumAuth.QUORUM_SASL_AUTH_ENABLED, "false"); String connectStr = startQuorum(3, authConfigs, 0, false); CountdownWatcher watcher = new CountdownWatcher(); ZooKeeper zk = new ZooKeeper(connectStr, ClientBase.CONNECTION_TIMEOUT, watcher); watcher.waitForConnected(ClientBase.CONNECTION_TIMEOUT); zk.create("/foo", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL); //1. 
Upgrade peer0,1,2 with quorum.auth.enableSasl=true and // quorum.auth.learnerRequireSasl=false, quorum.auth.serverRequireSasl=false authConfigs.put(QuorumAuth.QUORUM_SASL_AUTH_ENABLED, "true"); authConfigs.put(QuorumAuth.QUORUM_SERVER_SASL_AUTH_REQUIRED, "false"); authConfigs.put(QuorumAuth.QUORUM_LEARNER_SASL_AUTH_REQUIRED, "false"); restartServer(authConfigs, 0, zk, watcher); restartServer(authConfigs, 1, zk, watcher); restartServer(authConfigs, 2, zk, watcher); //2. Upgrade peer0,1,2 with quorum.auth.enableSasl=true and // quorum.auth.learnerRequireSasl=true, quorum.auth.serverRequireSasl=false authConfigs.put(QuorumAuth.QUORUM_SASL_AUTH_ENABLED, "true"); authConfigs.put(QuorumAuth.QUORUM_LEARNER_SASL_AUTH_REQUIRED, "true"); authConfigs.put(QuorumAuth.QUORUM_SERVER_SASL_AUTH_REQUIRED, "false"); restartServer(authConfigs, 0, zk, watcher); restartServer(authConfigs, 1, zk, watcher); restartServer(authConfigs, 2, zk, watcher); //3. Upgrade peer0,1,2 with quorum.auth.enableSasl=true and // quorum.auth.learnerRequireSasl=true, quorum.auth.serverRequireSasl=true authConfigs.put(QuorumAuth.QUORUM_SASL_AUTH_ENABLED, "true"); authConfigs.put(QuorumAuth.QUORUM_LEARNER_SASL_AUTH_REQUIRED, "true"); authConfigs.put(QuorumAuth.QUORUM_SERVER_SASL_AUTH_REQUIRED, "true"); restartServer(authConfigs, 0, zk, watcher); restartServer(authConfigs, 1, zk, watcher); restartServer(authConfigs, 2, zk, watcher); //4. Restart peer2 with quorum.auth.learnerEnableSasl=false and // quorum.auth.serverRequireSasl=false. It should fail to join the // quorum as this needs auth. authConfigs.put(QuorumAuth.QUORUM_SASL_AUTH_ENABLED, "false"); MainThread m = shutdown(2); startServer(m, authConfigs); Assert.assertFalse("waiting for server 2 being up", ClientBase .waitForServerUp("127.0.0.1:" + m.getClientPort(), 5000)); }
/**
 * Test to verify that a non-auth enabled Observer server is rejected
 * by the auth enabled quorum servers.
 *
 * <p>A three-server quorum with all three auth flags set to "true" is
 * started, an extra observer without auth settings is launched against it,
 * and the test asserts that the observer's client port never comes up while
 * the original quorum still accepts writes.
 */
@Test(timeout = 30000)
public void testNonAuthEnabledObserverJoiningAuthEnabledQuorum() throws Exception {
    Map<String, String> authConfigs = new HashMap<String, String>();
    authConfigs.put(QuorumAuth.QUORUM_SASL_AUTH_ENABLED, "true");
    authConfigs.put(QuorumAuth.QUORUM_SERVER_SASL_AUTH_REQUIRED, "true");
    authConfigs.put(QuorumAuth.QUORUM_LEARNER_SASL_AUTH_REQUIRED, "true");

    // Starting auth enabled 3-node cluster.
    final int totalServerCount = 3;
    String connectStr = startQuorum(totalServerCount, authConfigs,
            totalServerCount, false);

    CountdownWatcher watcher = new CountdownWatcher();
    zk = new ZooKeeper(connectStr, ClientBase.CONNECTION_TIMEOUT, watcher);
    watcher.waitForConnected(ClientBase.CONNECTION_TIMEOUT);
    zk.create("/myTestRoot", new byte[0], Ids.OPEN_ACL_UNSAFE,
            CreateMode.PERSISTENT_SEQUENTIAL);

    // Adding a non-auth enabled Observer to the 3-node auth cluster.
    String existingCfg = mt.get(0).getQuorumCfgSection();
    final int observerMyid = totalServerCount + 1;
    String observerEntry = String.format(
            "server.%d=localhost:%d:%d:observer",
            observerMyid, PortAssignment.unique(), PortAssignment.unique());
    final int clientPort = PortAssignment.unique();
    final String observerAddress = "127.0.0.1:" + clientPort;

    String observerCfg = new StringBuilder(existingCfg)
            .append("\n")
            .append(observerEntry)
            .append("\npeerType=observer")
            .append("\n")
            .append(observerAddress)
            .toString();

    MainThread observer = new MainThread(observerMyid, clientPort, observerCfg);
    mt.add(observer);
    observer.start();

    boolean observerUp = ClientBase.waitForServerUp(observerAddress,
            QuorumPeerTestBase.TIMEOUT);
    Assert.assertFalse(
            "Non-auth enabled Observer shouldn't be able join auth-enabled quorum",
            observerUp);

    // quorum shouldn't be disturbed due to rejection.
    zk.create("/myTestRoot", new byte[0], Ids.OPEN_ACL_UNSAFE,
            CreateMode.PERSISTENT_SEQUENTIAL);
}
/**
 * Verify the ability to start a cluster.
 *
 * <p>Starts a two-server ensemble, writes and reads back one znode through
 * each server in turn, then shuts both servers down and waits for their
 * client ports to go away.
 */
@Test
public void testQuorum() throws Exception {
    ClientBase.setupTestEnv();

    final int CLIENT_PORT_QP1 = PortAssignment.unique();
    final int CLIENT_PORT_QP2 = PortAssignment.unique();

    String quorumCfgSection =
        "server.1=127.0.0.1:" + PortAssignment.unique()
        + ":" + PortAssignment.unique()
        + "\nserver.2=127.0.0.1:" + PortAssignment.unique()
        + ":" + PortAssignment.unique();

    MainThread q1 = new MainThread(1, CLIENT_PORT_QP1, quorumCfgSection);
    MainThread q2 = new MainThread(2, CLIENT_PORT_QP2, quorumCfgSection);
    q1.start();
    q2.start();

    Assert.assertTrue("waiting for server 1 being up",
            ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_QP1,
                    CONNECTION_TIMEOUT));
    Assert.assertTrue("waiting for server 2 being up",
            ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_QP2,
                    CONNECTION_TIMEOUT));

    ZooKeeper zk = new ZooKeeper("127.0.0.1:" + CLIENT_PORT_QP1,
            ClientBase.CONNECTION_TIMEOUT, this);
    zk.create("/foo_q1", "foobar1".getBytes(), Ids.OPEN_ACL_UNSAFE,
            CreateMode.PERSISTENT);
    // assertEquals takes (expected, actual); this order keeps the failure
    // message ("expected:<...> but was:<...>") truthful.
    Assert.assertEquals("foobar1", new String(zk.getData("/foo_q1", null, null)));
    zk.close();

    zk = new ZooKeeper("127.0.0.1:" + CLIENT_PORT_QP2,
            ClientBase.CONNECTION_TIMEOUT, this);
    zk.create("/foo_q2", "foobar2".getBytes(), Ids.OPEN_ACL_UNSAFE,
            CreateMode.PERSISTENT);
    Assert.assertEquals("foobar2", new String(zk.getData("/foo_q2", null, null)));
    zk.close();

    q1.shutdown();
    q2.shutdown();

    Assert.assertTrue("waiting for server 1 down",
            ClientBase.waitForServerDown("127.0.0.1:" + CLIENT_PORT_QP1,
                    ClientBase.CONNECTION_TIMEOUT));
    Assert.assertTrue("waiting for server 2 down",
            ClientBase.waitForServerDown("127.0.0.1:" + CLIENT_PORT_QP2,
                    ClientBase.CONNECTION_TIMEOUT));
}
/** * Verify handling of bad quorum address */ @Test public void testBadPeerAddressInQuorum() throws Exception { ClientBase.setupTestEnv(); // setup the logger to capture all logs Layout layout = Logger.getRootLogger().getAppender("CONSOLE").getLayout(); ByteArrayOutputStream os = new ByteArrayOutputStream(); WriterAppender appender = new WriterAppender(layout, os); appender.setThreshold(Level.WARN); Logger qlogger = Logger.getLogger("org.apache.zookeeper.server.quorum"); qlogger.addAppender(appender); try { final int CLIENT_PORT_QP1 = PortAssignment.unique(); final int CLIENT_PORT_QP2 = PortAssignment.unique(); String quorumCfgSection = "server.1=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + "\nserver.2=fee.fii.foo.fum:" + PortAssignment.unique() + ":" + PortAssignment.unique(); MainThread q1 = new MainThread(1, CLIENT_PORT_QP1, quorumCfgSection); q1.start(); boolean isup = ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_QP1, 5000); Assert.assertFalse("Server never came up", isup); q1.shutdown(); Assert.assertTrue("waiting for server 1 down", ClientBase.waitForServerDown("127.0.0.1:" + CLIENT_PORT_QP1, ClientBase.CONNECTION_TIMEOUT)); } finally { qlogger.removeAppender(appender); } LineNumberReader r = new LineNumberReader(new StringReader(os.toString())); String line; boolean found = false; Pattern p = Pattern.compile(".*Cannot open channel to .* at election address .*"); while ((line = r.readLine()) != null) { found = p.matcher(line).matches(); if (found) { break; } } Assert.assertTrue("complains about host", found); }
/**
 * Verify that bad packets are handled properly at the election port.
 *
 * <p>After a two-server ensemble is up, a 4-byte packet carrying the int
 * {@code 1024*1024*1024} is written to each server's election port. The
 * test then checks that a znode can still be created and read through
 * server 1.
 *
 * @throws Exception on any failure while starting servers, sending the
 *         packets, or talking to the ensemble
 */
@Test
public void testBadPackets() throws Exception {
    ClientBase.setupTestEnv();

    final int CLIENT_PORT_QP1 = PortAssignment.unique();
    final int CLIENT_PORT_QP2 = PortAssignment.unique();
    int electionPort1 = PortAssignment.unique();
    int electionPort2 = PortAssignment.unique();

    String quorumCfgSection =
        "server.1=127.0.0.1:" + PortAssignment.unique()
        + ":" + electionPort1
        + "\nserver.2=127.0.0.1:" + PortAssignment.unique()
        + ":" + electionPort2;

    MainThread q1 = new MainThread(1, CLIENT_PORT_QP1, quorumCfgSection);
    MainThread q2 = new MainThread(2, CLIENT_PORT_QP2, quorumCfgSection);
    q1.start();
    q2.start();

    Assert.assertTrue("waiting for server 1 being up",
            ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_QP1,
                    CONNECTION_TIMEOUT));
    Assert.assertTrue("waiting for server 2 being up",
            ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_QP2,
                    CONNECTION_TIMEOUT));

    // The whole packet is a single int used as an oversized length value.
    byte[] b = new byte[4];
    int length = 1024*1024*1024;
    ByteBuffer buff = ByteBuffer.wrap(b);
    buff.putInt(length);

    buff.position(0);
    SocketChannel s = SocketChannel.open(new InetSocketAddress("127.0.0.1", electionPort1));
    try {
        s.write(buff);
    } finally {
        // Close the channel even when the write fails.
        s.close();
    }

    buff.position(0);
    s = SocketChannel.open(new InetSocketAddress("127.0.0.1", electionPort2));
    try {
        s.write(buff);
    } finally {
        s.close();
    }

    ZooKeeper zk = new ZooKeeper("127.0.0.1:" + CLIENT_PORT_QP1,
            ClientBase.CONNECTION_TIMEOUT, this);
    zk.create("/foo_q1", "foobar1".getBytes(), Ids.OPEN_ACL_UNSAFE,
            CreateMode.PERSISTENT);
    // assertEquals takes (expected, actual).
    Assert.assertEquals("foobar1", new String(zk.getData("/foo_q1", null, null)));
    zk.close();

    q1.shutdown();
    q2.shutdown();
}
/** * Verify handling of quorum defaults * * default electionAlg is fast leader election */ @Test public void testQuorumDefaults() throws Exception { ClientBase.setupTestEnv(); // setup the logger to capture all logs Layout layout = Logger.getRootLogger().getAppender("CONSOLE").getLayout(); ByteArrayOutputStream os = new ByteArrayOutputStream(); WriterAppender appender = new WriterAppender(layout, os); appender.setImmediateFlush(true); appender.setThreshold(Level.INFO); Logger zlogger = Logger.getLogger("org.apache.zookeeper"); zlogger.addAppender(appender); try { final int CLIENT_PORT_QP1 = PortAssignment.unique(); final int CLIENT_PORT_QP2 = PortAssignment.unique(); String quorumCfgSection = "server.1=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + "\nserver.2=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique(); MainThread q1 = new MainThread(1, CLIENT_PORT_QP1, quorumCfgSection); MainThread q2 = new MainThread(2, CLIENT_PORT_QP2, quorumCfgSection); q1.start(); q2.start(); Assert.assertTrue("waiting for server 1 being up", ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_QP1, CONNECTION_TIMEOUT)); Assert.assertTrue("waiting for server 2 being up", ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_QP2, CONNECTION_TIMEOUT)); q1.shutdown(); q2.shutdown(); Assert.assertTrue("waiting for server 1 down", ClientBase.waitForServerDown("127.0.0.1:" + CLIENT_PORT_QP1, ClientBase.CONNECTION_TIMEOUT)); Assert.assertTrue("waiting for server 2 down", ClientBase.waitForServerDown("127.0.0.1:" + CLIENT_PORT_QP2, ClientBase.CONNECTION_TIMEOUT)); } finally { zlogger.removeAppender(appender); } os.close(); LineNumberReader r = new LineNumberReader(new StringReader(os.toString())); String line; boolean found = false; Pattern p = Pattern.compile(".*FastLeaderElection.*"); while ((line = r.readLine()) != null) { found = p.matcher(line).matches(); if (found) { break; } } Assert.assertTrue("fastleaderelection used", found); }
/**
 * Verify the ability to start a cluster.
 *
 * <p>Starts a two-server ensemble, writes and reads back one znode through
 * each server in turn, then shuts both servers down and waits for their
 * client ports to go away.
 */
@Test
public void testQuorum() throws Exception {
    LOG.info("STARTING " + getName());
    ClientBase.setupTestEnv();

    final int CLIENT_PORT_QP1 = PortAssignment.unique();
    final int CLIENT_PORT_QP2 = PortAssignment.unique();

    String quorumCfgSection =
        "server.1=127.0.0.1:" + PortAssignment.unique()
        + ":" + PortAssignment.unique()
        + "\nserver.2=127.0.0.1:" + PortAssignment.unique()
        + ":" + PortAssignment.unique();

    MainThread q1 = new MainThread(1, CLIENT_PORT_QP1, quorumCfgSection);
    MainThread q2 = new MainThread(2, CLIENT_PORT_QP2, quorumCfgSection);
    q1.start();
    q2.start();

    assertTrue("waiting for server 1 being up",
            ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_QP1,
                    CONNECTION_TIMEOUT));
    assertTrue("waiting for server 2 being up",
            ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_QP2,
                    CONNECTION_TIMEOUT));

    ZooKeeper zk = new ZooKeeper("127.0.0.1:" + CLIENT_PORT_QP1,
            ClientBase.CONNECTION_TIMEOUT, this);
    zk.create("/foo_q1", "foobar1".getBytes(), Ids.OPEN_ACL_UNSAFE,
            CreateMode.PERSISTENT);
    // assertEquals takes (expected, actual); this order keeps the failure
    // message ("expected:<...> but was:<...>") truthful.
    assertEquals("foobar1", new String(zk.getData("/foo_q1", null, null)));
    zk.close();

    zk = new ZooKeeper("127.0.0.1:" + CLIENT_PORT_QP2,
            ClientBase.CONNECTION_TIMEOUT, this);
    zk.create("/foo_q2", "foobar2".getBytes(), Ids.OPEN_ACL_UNSAFE,
            CreateMode.PERSISTENT);
    assertEquals("foobar2", new String(zk.getData("/foo_q2", null, null)));
    zk.close();

    q1.shutdown();
    q2.shutdown();

    assertTrue("waiting for server 1 down",
            ClientBase.waitForServerDown("127.0.0.1:" + CLIENT_PORT_QP1,
                    ClientBase.CONNECTION_TIMEOUT));
    assertTrue("waiting for server 2 down",
            ClientBase.waitForServerDown("127.0.0.1:" + CLIENT_PORT_QP2,
                    ClientBase.CONNECTION_TIMEOUT));
}
/** * Verify handling of bad quorum address */ @Test public void testBadPeerAddressInQuorum() throws Exception { LOG.info("STARTING " + getName()); ClientBase.setupTestEnv(); // setup the logger to capture all logs Layout layout = Logger.getRootLogger().getAppender("CONSOLE").getLayout(); ByteArrayOutputStream os = new ByteArrayOutputStream(); WriterAppender appender = new WriterAppender(layout, os); appender.setThreshold(Level.WARN); Logger qlogger = Logger.getLogger("org.apache.zookeeper.server.quorum"); qlogger.addAppender(appender); try { final int CLIENT_PORT_QP1 = PortAssignment.unique(); final int CLIENT_PORT_QP2 = PortAssignment.unique(); String quorumCfgSection = "server.1=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + "\nserver.2=fee.fii.foo.fum:" + PortAssignment.unique() + ":" + PortAssignment.unique(); MainThread q1 = new MainThread(1, CLIENT_PORT_QP1, quorumCfgSection); q1.start(); boolean isup = ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_QP1, 5000); assertFalse("Server never came up", isup); q1.shutdown(); assertTrue("waiting for server 1 down", ClientBase.waitForServerDown("127.0.0.1:" + CLIENT_PORT_QP1, ClientBase.CONNECTION_TIMEOUT)); } finally { qlogger.removeAppender(appender); } LineNumberReader r = new LineNumberReader(new StringReader(os.toString())); String line; boolean found = false; Pattern p = Pattern.compile(".*Cannot open channel to .* at election address .*"); while ((line = r.readLine()) != null) { found = p.matcher(line).matches(); if (found) { break; } } assertTrue("complains about host", found); }
/**
 * Verify that bad packets are handled properly at the election port.
 *
 * <p>After a two-server ensemble is up, a 4-byte packet carrying the int
 * {@code 1024*1024*1024} is written to each server's election port. The
 * test then checks that a znode can still be created and read through
 * server 1.
 *
 * @throws Exception on any failure while starting servers, sending the
 *         packets, or talking to the ensemble
 */
@Test
public void testBadPackets() throws Exception {
    LOG.info("STARTING " + getName());
    ClientBase.setupTestEnv();

    final int CLIENT_PORT_QP1 = PortAssignment.unique();
    final int CLIENT_PORT_QP2 = PortAssignment.unique();
    int electionPort1 = PortAssignment.unique();
    int electionPort2 = PortAssignment.unique();

    String quorumCfgSection =
        "server.1=127.0.0.1:" + PortAssignment.unique()
        + ":" + electionPort1
        + "\nserver.2=127.0.0.1:" + PortAssignment.unique()
        + ":" + electionPort2;

    MainThread q1 = new MainThread(1, CLIENT_PORT_QP1, quorumCfgSection);
    MainThread q2 = new MainThread(2, CLIENT_PORT_QP2, quorumCfgSection);
    q1.start();
    q2.start();

    assertTrue("waiting for server 1 being up",
            ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_QP1,
                    CONNECTION_TIMEOUT));
    assertTrue("waiting for server 2 being up",
            ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_QP2,
                    CONNECTION_TIMEOUT));

    // The whole packet is a single int used as an oversized length value.
    byte[] b = new byte[4];
    int length = 1024*1024*1024;
    ByteBuffer buff = ByteBuffer.wrap(b);
    buff.putInt(length);

    buff.position(0);
    SocketChannel s = SocketChannel.open(new InetSocketAddress("127.0.0.1", electionPort1));
    try {
        s.write(buff);
    } finally {
        // Close the channel even when the write fails.
        s.close();
    }

    buff.position(0);
    s = SocketChannel.open(new InetSocketAddress("127.0.0.1", electionPort2));
    try {
        s.write(buff);
    } finally {
        s.close();
    }

    ZooKeeper zk = new ZooKeeper("127.0.0.1:" + CLIENT_PORT_QP1,
            ClientBase.CONNECTION_TIMEOUT, this);
    zk.create("/foo_q1", "foobar1".getBytes(), Ids.OPEN_ACL_UNSAFE,
            CreateMode.PERSISTENT);
    // assertEquals takes (expected, actual).
    assertEquals("foobar1", new String(zk.getData("/foo_q1", null, null)));
    zk.close();

    q1.shutdown();
    q2.shutdown();
}
/** * Verify handling of quorum defaults * * default electionAlg is fast leader election */ @Test public void testQuorumDefaults() throws Exception { LOG.info("STARTING " + getName()); ClientBase.setupTestEnv(); // setup the logger to capture all logs Layout layout = Logger.getRootLogger().getAppender("CONSOLE").getLayout(); ByteArrayOutputStream os = new ByteArrayOutputStream(); WriterAppender appender = new WriterAppender(layout, os); appender.setImmediateFlush(true); appender.setThreshold(Level.INFO); Logger zlogger = Logger.getLogger("org.apache.zookeeper"); zlogger.addAppender(appender); try { final int CLIENT_PORT_QP1 = PortAssignment.unique(); final int CLIENT_PORT_QP2 = PortAssignment.unique(); String quorumCfgSection = "server.1=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + "\nserver.2=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique(); MainThread q1 = new MainThread(1, CLIENT_PORT_QP1, quorumCfgSection); MainThread q2 = new MainThread(2, CLIENT_PORT_QP2, quorumCfgSection); q1.start(); q2.start(); assertTrue("waiting for server 1 being up", ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_QP1, CONNECTION_TIMEOUT)); assertTrue("waiting for server 2 being up", ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_QP2, CONNECTION_TIMEOUT)); q1.shutdown(); q2.shutdown(); assertTrue("waiting for server 1 down", ClientBase.waitForServerDown("127.0.0.1:" + CLIENT_PORT_QP1, ClientBase.CONNECTION_TIMEOUT)); assertTrue("waiting for server 2 down", ClientBase.waitForServerDown("127.0.0.1:" + CLIENT_PORT_QP2, ClientBase.CONNECTION_TIMEOUT)); } finally { zlogger.removeAppender(appender); } os.close(); LineNumberReader r = new LineNumberReader(new StringReader(os.toString())); String line; boolean found = false; Pattern p = Pattern.compile(".*FastLeaderElection.*"); while ((line = r.readLine()) != null) { found = p.matcher(line).matches(); if (found) { break; } } assertTrue("fastleaderelection used", found); }