/** * ZOOKEEPR-1382 test class */ @Test public void testWatchesWithClientSessionTimeout() throws Exception { NIOServerCnxnFactory serverCnxnFactory = new NIOServerCnxnFactory(); ZKDatabase database = new ZKDatabase(null); database.setlastProcessedZxid(2L); QuorumPeer quorumPeer = mock(QuorumPeer.class); FileTxnSnapLog logfactory = mock(FileTxnSnapLog.class); // Directories are not used but we need it to avoid NPE when(logfactory.getDataDir()).thenReturn(new File("/tmp")); when(logfactory.getSnapDir()).thenReturn(new File("/tmp")); FollowerZooKeeperServer fzks = null; try { fzks = new FollowerZooKeeperServer(logfactory, quorumPeer, null, database); fzks.startup(); fzks.setServerCnxnFactory(serverCnxnFactory); quorumPeer.follower = new MyFollower(quorumPeer, fzks); final SelectionKey sk = new FakeSK(); // Simulate a socket channel between a client and a follower final SocketChannel socketChannel = createClientSocketChannel(); // Create the NIOServerCnxn that will handle the client requests final MockNIOServerCnxn nioCnxn = new MockNIOServerCnxn(fzks, socketChannel, sk, serverCnxnFactory); // Send the connection request as a client do nioCnxn.doIO(sk); // Send the invalid session packet to the follower QuorumPacket qp = createInvalidSessionPacket(); quorumPeer.follower.processPacket(qp); // OK, now the follower knows that the session is invalid, let's try // to // send it the watches nioCnxn.doIO(sk); // wait for the the request processor to do his job Thread.sleep(1000L); // Session has not been re-validated ! // If session has not been validated, there must be NO watches int watchCount = database.getDataTree().getWatchCount(); LOG.info("watches = " + watchCount); assertEquals(0, watchCount); } finally { if (fzks != null) { fzks.shutdown(); } } }