private void waitForAll(ZooKeeper[] zks, States state) throws InterruptedException { int iterations = 10; boolean someoneNotConnected = true; while(someoneNotConnected) { if (iterations-- == 0) { ClientBase.logAllStackTraces(); throw new RuntimeException("Waiting too long"); } someoneNotConnected = false; for(ZooKeeper zk: zks) { if (zk.getState() != state) { someoneNotConnected = true; } } Thread.sleep(1000); } }
/** * This is a helper function for launching a set of servers * * @param numServers * @return * @throws IOException * @throws InterruptedException */ private Servers LaunchServers(int numServers) throws IOException, InterruptedException { int SERVER_COUNT = numServers; Servers svrs = new Servers(); final int clientPorts[] = new int[SERVER_COUNT]; StringBuilder sb = new StringBuilder(); for(int i = 0; i < SERVER_COUNT; i++) { clientPorts[i] = PortAssignment.unique(); sb.append("server."+i+"=127.0.0.1:"+PortAssignment.unique()+":"+PortAssignment.unique()+"\n"); } String quorumCfgSection = sb.toString(); MainThread mt[] = new MainThread[SERVER_COUNT]; ZooKeeper zk[] = new ZooKeeper[SERVER_COUNT]; for(int i = 0; i < SERVER_COUNT; i++) { mt[i] = new MainThread(i, clientPorts[i], quorumCfgSection); mt[i].start(); zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, this); } waitForAll(zk, States.CONNECTED); svrs.mt = mt; svrs.zk = zk; return svrs; }
/** * Tests a situation when client firstly connects to a read-only server and * then connects to a majority server. Transition should be transparent for * the user. */ @Test(timeout = 90000) public void testSessionEstablishment() throws Exception { qu.shutdown(2); CountdownWatcher watcher = new CountdownWatcher(); ZooKeeper zk = new ZooKeeper(qu.getConnString(), CONNECTION_TIMEOUT, watcher, true); watcher.waitForConnected(CONNECTION_TIMEOUT); Assert.assertSame("should be in r/o mode", States.CONNECTEDREADONLY, zk .getState()); long fakeId = zk.getSessionId(); watcher.reset(); qu.start(2); Assert.assertTrue("waiting for server up", ClientBase.waitForServerUp( "127.0.0.1:" + qu.getPeer(2).clientPort, CONNECTION_TIMEOUT)); watcher.waitForConnected(CONNECTION_TIMEOUT); zk.create("/test", "test".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); Assert.assertFalse("fake session and real session have same id", zk .getSessionId() == fakeId); zk.close(); }
private void waitForAll(ZooKeeper[] zks, States state) throws InterruptedException { int iterations = ClientBase.CONNECTION_TIMEOUT / 1000; boolean someoneNotConnected = true; while (someoneNotConnected) { if (iterations-- == 0) { ClientBase.logAllStackTraces(); throw new RuntimeException("Waiting too long"); } someoneNotConnected = false; for (ZooKeeper zk : zks) { if (zk.getState() != state) { someoneNotConnected = true; break; } } Thread.sleep(1000); } }
/** * This is a helper function for launching a set of servers * * @param numServers * @return * @throws IOException * @throws InterruptedException */ private Servers LaunchServers(int numServers) throws IOException, InterruptedException { int SERVER_COUNT = numServers; Servers svrs = new Servers(); final int clientPorts[] = new int[SERVER_COUNT]; StringBuilder sb = new StringBuilder(); for (int i = 0; i < SERVER_COUNT; i++) { clientPorts[i] = PortAssignment.unique(); sb.append("server."+i+"=127.0.0.1:"+PortAssignment.unique()+":"+PortAssignment.unique()+";"+clientPorts[i]+"\n"); } String quorumCfgSection = sb.toString(); MainThread mt[] = new MainThread[SERVER_COUNT]; ZooKeeper zk[] = new ZooKeeper[SERVER_COUNT]; for (int i = 0; i < SERVER_COUNT; i++) { mt[i] = new MainThread(i, clientPorts[i], quorumCfgSection); mt[i].start(); zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, this); } waitForAll(zk, States.CONNECTED); svrs.mt = mt; svrs.zk = zk; return svrs; }
@Test public void testClientRetry() throws IOException, InterruptedException, TimeoutException{ CountdownWatcher cdw1 = new CountdownWatcher(); CountdownWatcher cdw2 = new CountdownWatcher(); ZooKeeper zk = new ZooKeeper(hostPort, 10000, cdw1); try { cdw1.waitForConnected(CONNECTION_TIMEOUT); ZooKeeper zk2 = new ZooKeeper(hostPort, 10000, cdw2); try { States s1 = zk.getState(); States s2 = zk2.getState(); Assert.assertSame(s1,States.CONNECTED); Assert.assertSame(s2,States.CONNECTING); cdw1.reset(); zk.close(); cdw1.waitForDisconnected(CONNECTION_TIMEOUT); cdw2.waitForConnected(CONNECTION_TIMEOUT); Assert.assertSame(zk2.getState(),States.CONNECTED); } finally { zk2.close(); } } finally { zk.close(); } }
@Test public void testClientRetry() throws IOException, InterruptedException, TimeoutException{ CountdownWatcher cdw1 = new CountdownWatcher(); CountdownWatcher cdw2 = new CountdownWatcher(); ZooKeeper zk = new ZooKeeper(hostPort, 10000, cdw1); try { cdw1.waitForConnected(CONNECTION_TIMEOUT); ZooKeeper zk2 = new ZooKeeper(hostPort, 10000, cdw2); try { States s1 = zk.getState(); States s2 = zk2.getState(); Assert.assertSame(s1,States.CONNECTED); Assert.assertSame(s2,States.CONNECTING); cdw1.reset(); cdw1.waitForDisconnected(CONNECTION_TIMEOUT); cdw2.waitForConnected(CONNECTION_TIMEOUT); Assert.assertSame(zk2.getState(),States.CONNECTED); } finally { zk2.close(); } } finally { zk.close(); } }
@Test public void shouldWaitForConnection() { ZooKeeperConfiguration config = new ZooKeeperConfiguration(); config.addZookeeperServer("localhost:" + getServerPort()); ZooKeeperComponent component = new ZooKeeperComponent(config); component.setConfiguration(config); component.setCamelContext(context); ZooKeeperEndpoint zep = new ZooKeeperEndpoint("zookeeper:someserver/this/is/a/path", component, config); ZooKeeperConnectionManager zkcm = new ZooKeeperConnectionManager(zep); ZooKeeper zk = zkcm.getConnection(); zk.getState(); assertEquals(States.CONNECTED, zk.getState()); }
/** * @param zk * @throws InterruptedException */ private void waitForConnection(final ZooKeeper zk) throws InterruptedException { Assert.assertTrue(await(new Check() { public boolean doCheck() { if (zk.getState() == States.CONNECTED) { List<String> children; try { children = zk.getChildren("/", false); return children.size() != 0; } catch (Exception e) { // silently fail } } return false; } }, TimeUnit.MINUTES.toMillis(2))); }
/** * Tests a situation when client firstly connects to a read-only server and * then connects to a majority server. Transition should be transparent for * the user. */ @Test public void testSessionEstablishment() throws Exception { qu.shutdown(2); CountdownWatcher watcher = new CountdownWatcher(); ZooKeeper zk = new ZooKeeper(qu.getConnString(), CONNECTION_TIMEOUT, watcher, true); watcher.waitForConnected(CONNECTION_TIMEOUT); Assert.assertSame("should be in r/o mode", States.CONNECTEDREADONLY, zk .getState()); long fakeId = zk.getSessionId(); watcher.reset(); qu.start(2); Assert.assertTrue("waiting for server up", ClientBase.waitForServerUp( "127.0.0.1:" + qu.getPeer(2).clientPort, CONNECTION_TIMEOUT)); watcher.waitForConnected(CONNECTION_TIMEOUT); zk.create("/test", "test".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); Assert.assertFalse("fake session and real session have same id", zk .getSessionId() == fakeId); zk.close(); }
private void startConnect() throws IOException { if(!isFirstConnect){ try { Thread.sleep(r.nextInt(1000)); } catch (InterruptedException e) { LOG.warn("Unexpected exception", e); } } state = States.CONNECTING; InetSocketAddress addr; if (rwServerAddress != null) { addr = rwServerAddress; rwServerAddress = null; } else { addr = hostProvider.next(1000); } LOG.info("Opening socket connection to server " + addr); setName(getName().replaceAll("\\(.*\\)", "(" + addr.getHostName() + ":" + addr.getPort() + ")")); clientCnxnSocket.connect(addr); }
private void waitForAll(ZooKeeper[] zks, States state) throws InterruptedException { int iterations = 10; boolean someoneNotConnected = true; while(someoneNotConnected) { if (iterations-- == 0) { throw new RuntimeException("Waiting too long"); } someoneNotConnected = false; for(ZooKeeper zk: zks) { if (zk.getState() != state) { someoneNotConnected = true; } } Thread.sleep(1000); } }
/** * Checks whether the broker is allowed to do read-write operations based on the existence of a node in global * zookeeper. * * @throws WebApplicationException * if broker has a read only access if broker is not connected to the global zookeeper */ public void validatePoliciesReadOnlyAccess() { boolean arePoliciesReadOnly = true; try { arePoliciesReadOnly = globalZkCache().exists(POLICIES_READONLY_FLAG_PATH); } catch (Exception e) { log.warn("Unable to fetch contents of [{}] from global zookeeper", POLICIES_READONLY_FLAG_PATH, e); throw new RestException(e); } if (arePoliciesReadOnly) { log.debug("Policies are read-only. Broker cannot do read-write operations"); throw new RestException(Status.FORBIDDEN, "Broker is forbidden to do read-write operations"); } else { // Make sure the broker is connected to the global zookeeper before writing. If not, throw an exception. if (globalZkCache().getZooKeeper().getState() != States.CONNECTED) { log.debug("Broker is not connected to the global zookeeper"); throw new RestException(Status.PRECONDITION_FAILED, "Broker needs to be connected to global zookeeper before making a read-write operation"); } else { // Do nothing, just log the message. log.debug("Broker is allowed to make read-write operations"); } } }
@Override public void shutdown(int exitCode) { try { // Try to close ZK session to ensure all ephemeral locks gets released immediately if (service != null) { if (service.getZkClient().getState() != States.CLOSED) { service.getZkClient().close(); } } } catch (Exception e) { LOG.warn(e.getMessage(), e); } LOG.info("Invoking Runtime.halt({})", exitCode); immediateFlushBufferedLogs(); Runtime.getRuntime().halt(exitCode); }
public synchronized void connect(final long maxMsToWaitUntilConnected, Watcher watcher) { if (_eventThread != null) { return; } boolean started = false; try { getEventLock().lockInterruptibly(); setShutdownTrigger(false); _eventThread = new ZkEventThread(_connection.getServers()); _eventThread.start();//这样的 线程很可能会直接退回 _connection.connect(watcher); logger.debug("Awaiting connection to Zookeeper server: " + maxMsToWaitUntilConnected); if (!waitUntilConnected(maxMsToWaitUntilConnected, TimeUnit.MILLISECONDS)) { throw new ZkTimeoutException(String.format("Unable to connect to zookeeper server[%s] within timeout %dms", _connection.getServers(), maxMsToWaitUntilConnected)); } started = true; } catch (InterruptedException e) { States state = _connection.getZookeeperState(); throw new IllegalStateException("Not connected with zookeeper server yet. Current state is " + state); } finally { getEventLock().unlock(); // we should close the zookeeper instance, otherwise it would keep // on trying to connect if (!started) { close(); } } }
/** * @see org.apache.zookeeper.ZooKeeper#getState() */ public States getState() { if (_ZooKeeper == null) { return States.CLOSED; } return _ZooKeeper.getState(); }
/** * TODO: Comment. * */ protected void initPropertiesSectionFromModel() { Table table = getPropertiesSectionTable(); table.removeAll(); Map<String, String> properties = getZooKeeperConnectionProperties(); for (String key : properties.keySet()) { TableItem item = new TableItem(table, SWT.NONE); String value = properties.get(key); item.setText(0, key); item.setText(1, value); Color valueTextColor = table.getForeground(); if (key.equals(ZooKeeperConnectionModelElementType.PROPERTY_NAME_STATE)) { if (!States.CONNECTED.name().equals(value)) { valueTextColor = table.getDisplay().getSystemColor(SWT.COLOR_RED); } // else { // valueTextColor = table.getDisplay().getSystemColor(SWT.COLOR_GREEN); // } } item.setForeground(1, valueTextColor); } packTable(table, PROPERTIES_COLUMN_WIDTHS); }
private void startConnect() throws IOException { state = States.CONNECTING; InetSocketAddress addr; if (rwServerAddress != null) { addr = rwServerAddress; rwServerAddress = null; } else { addr = hostProvider.next(1000); } setName(getName().replaceAll("\\(.*\\)", "(" + addr.getHostName() + ":" + addr.getPort() + ")")); if (ZooKeeperSaslClient.isEnabled()) { try { String principalUserName = System.getProperty( ZK_SASL_CLIENT_USERNAME, "zookeeper"); zooKeeperSaslClient = new ZooKeeperSaslClient( principalUserName+"/"+addr.getHostName()); } catch (LoginException e) { // An authentication error occurred when the SASL client tried to initialize: // for Kerberos this means that the client failed to authenticate with the KDC. // This is different from an authentication error that occurs during communication // with the Zookeeper server, which is handled below. LOG.warn("SASL configuration failed: " + e + " Will continue connection to Zookeeper server without " + "SASL authentication, if Zookeeper server allows it."); eventThread.queueEvent(new WatchedEvent( Watcher.Event.EventType.None, Watcher.Event.KeeperState.AuthFailed, null)); saslLoginFailed = true; } } logStartConnect(addr); clientCnxnSocket.connect(addr); }
/** * Callback invoked by the ClientCnxnSocket once a connection has been * established. * * @param _negotiatedSessionTimeout * @param _sessionId * @param _sessionPasswd * @param isRO * @throws IOException */ void onConnected(int _negotiatedSessionTimeout, long _sessionId, byte[] _sessionPasswd, boolean isRO) throws IOException { negotiatedSessionTimeout = _negotiatedSessionTimeout; if (negotiatedSessionTimeout <= 0) { state = States.CLOSED; eventThread.queueEvent(new WatchedEvent( Watcher.Event.EventType.None, Watcher.Event.KeeperState.Expired, null)); eventThread.queueEventOfDeath(); String warnInfo; warnInfo = "Unable to reconnect to ZooKeeper service, session 0x" + Long.toHexString(sessionId) + " has expired"; LOG.warn(warnInfo); throw new SessionExpiredException(warnInfo); } if (!readOnly && isRO) { LOG.error("Read/write client got connected to read-only server"); } readTimeout = negotiatedSessionTimeout * 2 / 3; connectTimeout = negotiatedSessionTimeout / hostProvider.size(); hostProvider.onConnected(); sessionId = _sessionId; sessionPasswd = _sessionPasswd; state = (isRO) ? States.CONNECTEDREADONLY : States.CONNECTED; seenRwServerBefore |= !isRO; LOG.info("Session establishment complete on server " + clientCnxnSocket.getRemoteSocketAddress() + ", sessionid = 0x" + Long.toHexString(sessionId) + ", negotiated timeout = " + negotiatedSessionTimeout + (isRO ? " (READ-ONLY mode)" : "")); KeeperState eventState = (isRO) ? KeeperState.ConnectedReadOnly : KeeperState.SyncConnected; eventThread.queueEvent(new WatchedEvent( Watcher.Event.EventType.None, eventState, null)); }