/** * Tests that when a quorum of followers send LearnerInfo but do not ack the epoch (which is sent * by the leader upon receipt of LearnerInfo from a quorum), the leader does not start using this epoch * as it would in the normal case (when a quorum do ack the epoch). This tests ZK-1192 * @throws Exception */ @Test public void testAbandonBeforeACKEpoch() throws Exception { testLeaderConversation(new LeaderConversation() { public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l) throws IOException, InterruptedException { /* we test a normal run. everything should work out well. */ LearnerInfo li = new LearnerInfo(1, 0x10000); byte liBytes[] = new byte[12]; ByteBufferOutputStream.record2ByteBuffer(li, ByteBuffer.wrap(liBytes)); QuorumPacket qp = new QuorumPacket(Leader.FOLLOWERINFO, 0, liBytes, null); oa.writeRecord(qp, null); readPacketSkippingPing(ia, qp); Assert.assertEquals(Leader.LEADERINFO, qp.getType()); Assert.assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid()); Assert.assertEquals(ByteBuffer.wrap(qp.getData()).getInt(), 0x10000); Thread.sleep(l.self.getInitLimit()*l.self.getTickTime() + 5000); // The leader didn't get a quorum of acks - make sure that leader's current epoch is not advanced Assert.assertEquals(0, l.self.getCurrentEpoch()); } }); }
/** * Tests that when a quorum of followers send LearnerInfo but do not ack the epoch (which is sent * by the leader upon receipt of LearnerInfo from a quorum), the leader does not start using this epoch * as it would in the normal case (when a quorum do ack the epoch). This tests ZK-1192 * @throws Exception */ @Test public void testAbandonBeforeACKEpoch() throws Exception { testLeaderConversation(new LeaderConversation() { public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l) throws IOException, InterruptedException { /* we test a normal run. everything should work out well. */ LearnerInfo li = new LearnerInfo(1, 0x10000, 0); byte liBytes[] = new byte[20]; ByteBufferOutputStream.record2ByteBuffer(li, ByteBuffer.wrap(liBytes)); QuorumPacket qp = new QuorumPacket(Leader.FOLLOWERINFO, 0, liBytes, null); oa.writeRecord(qp, null); readPacketSkippingPing(ia, qp); Assert.assertEquals(Leader.LEADERINFO, qp.getType()); Assert.assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid()); Assert.assertEquals(ByteBuffer.wrap(qp.getData()).getInt(), 0x10000); Thread.sleep(l.self.getInitLimit()*l.self.getTickTime() + 5000); // The leader didn't get a quorum of acks - make sure that leader's current epoch is not advanced Assert.assertEquals(0, l.self.getCurrentEpoch()); } }); }
/**
 * Runs the follower handshake against a populated leader and checks that, once the follower
 * acknowledges the epoch while reporting the zxid supplied by the harness, the leader replies
 * with a DIFF packet (as the test name suggests, rather than a snapshot).
 *
 * @throws Exception propagated from {@code testPopulatedLeaderConversation}
 */
@Test
public void testUnnecessarySnap() throws Exception {
    testPopulatedLeaderConversation(new PopulatedLeaderConversation() {
        @Override
        public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l, long zxid)
                throws Exception {
            // The populated leader starts out with both epochs already at 1.
            Assert.assertEquals(1, l.self.getAcceptedEpoch());
            Assert.assertEquals(1, l.self.getCurrentEpoch());

            // Introduce ourselves with FOLLOWERINFO.
            LearnerInfo li = new LearnerInfo(1, 0x10000);
            byte[] liBytes = new byte[12];
            ByteBufferOutputStream.record2ByteBuffer(li, ByteBuffer.wrap(liBytes));
            QuorumPacket qp = new QuorumPacket(Leader.FOLLOWERINFO, 1, liBytes, null);
            oa.writeRecord(qp, null);

            // LEADERINFO should carry zxid (2, 0); only the accepted epoch moves at this point.
            readPacketSkippingPing(ia, qp);
            Assert.assertEquals(Leader.LEADERINFO, qp.getType());
            Assert.assertEquals(ZxidUtils.makeZxid(2, 0), qp.getZxid());
            Assert.assertEquals(0x10000, ByteBuffer.wrap(qp.getData()).getInt());
            Assert.assertEquals(2, l.self.getAcceptedEpoch());
            Assert.assertEquals(1, l.self.getCurrentEpoch());

            // Ack the epoch with payload int 1 and the zxid handed in by the harness.
            byte[] epochBytes = new byte[4];
            ByteBuffer.wrap(epochBytes).putInt(1);
            qp = new QuorumPacket(Leader.ACKEPOCH, zxid, epochBytes, null);
            oa.writeRecord(qp, null);

            readPacketSkippingPing(ia, qp);
            Assert.assertEquals(Leader.DIFF, qp.getType());
        }
    }, 2);
}
/**
 * Runs the handshake with a follower that reports having last acked epoch 20. The leader is
 * expected to answer with zxid (21, 0) in LEADERINFO and NEWLEADER, and to complete the
 * DIFF / NEWLEADER / UPTODATE exchange.
 *
 * @throws Exception propagated from {@code testLeaderConversation}
 */
@Test
public void testLeaderBehind() throws Exception {
    testLeaderConversation(new LeaderConversation() {
        @Override
        public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l)
                throws IOException {
            LearnerInfo li = new LearnerInfo(1, 0x10000);
            byte[] liBytes = new byte[12];
            ByteBufferOutputStream.record2ByteBuffer(li, ByteBuffer.wrap(liBytes));

            /* we are going to say we last acked epoch 20 */
            QuorumPacket qp = new QuorumPacket(Leader.FOLLOWERINFO, ZxidUtils.makeZxid(20, 0), liBytes, null);
            oa.writeRecord(qp, null);

            // The leader must move past our epoch: LEADERINFO carries zxid (21, 0).
            readPacketSkippingPing(ia, qp);
            Assert.assertEquals(Leader.LEADERINFO, qp.getType());
            Assert.assertEquals(ZxidUtils.makeZxid(21, 0), qp.getZxid());
            Assert.assertEquals(0x10000, ByteBuffer.wrap(qp.getData()).getInt());

            // Ack the epoch with a zeroed 4-byte payload and zxid 0.
            qp = new QuorumPacket(Leader.ACKEPOCH, 0, new byte[4], null);
            oa.writeRecord(qp, null);

            readPacketSkippingPing(ia, qp);
            Assert.assertEquals(Leader.DIFF, qp.getType());

            readPacketSkippingPing(ia, qp);
            Assert.assertEquals(Leader.NEWLEADER, qp.getType());
            Assert.assertEquals(ZxidUtils.makeZxid(21, 0), qp.getZxid());

            // Ack NEWLEADER using the zxid the leader just sent.
            qp = new QuorumPacket(Leader.ACK, qp.getZxid(), null, null);
            oa.writeRecord(qp, null);

            readPacketSkippingPing(ia, qp);
            Assert.assertEquals(Leader.UPTODATE, qp.getType());
        }
    });
}
/**
 * Runs the follower handshake against a populated leader and checks that, once the follower
 * acknowledges the epoch while reporting the zxid supplied by the harness, the leader replies
 * with a DIFF packet (as the test name suggests, rather than a snapshot).
 *
 * @throws Exception propagated from {@code testPopulatedLeaderConversation}
 */
@Test
public void testUnnecessarySnap() throws Exception {
    testPopulatedLeaderConversation(new PopulatedLeaderConversation() {
        @Override
        public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l, long zxid)
                throws Exception {
            // The populated leader starts out with both epochs already at 1.
            Assert.assertEquals(1, l.self.getAcceptedEpoch());
            Assert.assertEquals(1, l.self.getCurrentEpoch());

            // Introduce ourselves with FOLLOWERINFO.
            LearnerInfo li = new LearnerInfo(1, 0x10000, 0);
            byte[] liBytes = new byte[20];
            ByteBufferOutputStream.record2ByteBuffer(li, ByteBuffer.wrap(liBytes));
            QuorumPacket qp = new QuorumPacket(Leader.FOLLOWERINFO, 1, liBytes, null);
            oa.writeRecord(qp, null);

            // LEADERINFO should carry zxid (2, 0); only the accepted epoch moves at this point.
            readPacketSkippingPing(ia, qp);
            Assert.assertEquals(Leader.LEADERINFO, qp.getType());
            Assert.assertEquals(ZxidUtils.makeZxid(2, 0), qp.getZxid());
            Assert.assertEquals(0x10000, ByteBuffer.wrap(qp.getData()).getInt());
            Assert.assertEquals(2, l.self.getAcceptedEpoch());
            Assert.assertEquals(1, l.self.getCurrentEpoch());

            // Ack the epoch with payload int 1 and the zxid handed in by the harness.
            byte[] epochBytes = new byte[4];
            ByteBuffer.wrap(epochBytes).putInt(1);
            qp = new QuorumPacket(Leader.ACKEPOCH, zxid, epochBytes, null);
            oa.writeRecord(qp, null);

            readPacketSkippingPing(ia, qp);
            Assert.assertEquals(Leader.DIFF, qp.getType());
        }
    }, 2);
}
/**
 * Runs the handshake with a follower that reports having last acked epoch 20. The leader is
 * expected to answer with zxid (21, 0) in LEADERINFO and NEWLEADER, and to complete the
 * DIFF / NEWLEADER / UPTODATE exchange.
 *
 * @throws Exception propagated from {@code testLeaderConversation}
 */
@Test
public void testLeaderBehind() throws Exception {
    testLeaderConversation(new LeaderConversation() {
        @Override
        public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l)
                throws IOException {
            LearnerInfo li = new LearnerInfo(1, 0x10000, 0);
            byte[] liBytes = new byte[20];
            ByteBufferOutputStream.record2ByteBuffer(li, ByteBuffer.wrap(liBytes));

            /* we are going to say we last acked epoch 20 */
            QuorumPacket qp = new QuorumPacket(Leader.FOLLOWERINFO, ZxidUtils.makeZxid(20, 0), liBytes, null);
            oa.writeRecord(qp, null);

            // The leader must move past our epoch: LEADERINFO carries zxid (21, 0).
            readPacketSkippingPing(ia, qp);
            Assert.assertEquals(Leader.LEADERINFO, qp.getType());
            Assert.assertEquals(ZxidUtils.makeZxid(21, 0), qp.getZxid());
            Assert.assertEquals(0x10000, ByteBuffer.wrap(qp.getData()).getInt());

            // Ack the epoch with a zeroed 4-byte payload and zxid 0.
            qp = new QuorumPacket(Leader.ACKEPOCH, 0, new byte[4], null);
            oa.writeRecord(qp, null);

            readPacketSkippingPing(ia, qp);
            Assert.assertEquals(Leader.DIFF, qp.getType());

            readPacketSkippingPing(ia, qp);
            Assert.assertEquals(Leader.NEWLEADER, qp.getType());
            Assert.assertEquals(ZxidUtils.makeZxid(21, 0), qp.getZxid());

            // Ack NEWLEADER using the zxid the leader just sent.
            qp = new QuorumPacket(Leader.ACK, qp.getZxid(), null, null);
            oa.writeRecord(qp, null);

            readPacketSkippingPing(ia, qp);
            Assert.assertEquals(Leader.UPTODATE, qp.getType());
        }
    });
}
/**
 * Walks through a complete, successful follower handshake with a leader that starts at epoch 0:
 * FOLLOWERINFO, LEADERINFO, ACKEPOCH, DIFF, NEWLEADER, ACK, UPTODATE. Along the way it checks
 * that the leader's accepted epoch becomes 1 after LEADERINFO and that its current epoch
 * becomes 1 by the time NEWLEADER has been received.
 *
 * @throws Exception propagated from {@code testLeaderConversation}
 */
@Test
public void testNormalRun() throws Exception {
    testLeaderConversation(new LeaderConversation() {
        @Override
        public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l)
                throws IOException {
            Assert.assertEquals(0, l.self.getAcceptedEpoch());
            Assert.assertEquals(0, l.self.getCurrentEpoch());

            /* we test a normal run. everything should work out well. */
            LearnerInfo li = new LearnerInfo(1, 0x10000);
            byte[] liBytes = new byte[12];
            ByteBufferOutputStream.record2ByteBuffer(li, ByteBuffer.wrap(liBytes));
            QuorumPacket qp = new QuorumPacket(Leader.FOLLOWERINFO, 0, liBytes, null);
            oa.writeRecord(qp, null);

            // After LEADERINFO the new epoch is accepted but not yet current.
            readPacketSkippingPing(ia, qp);
            Assert.assertEquals(Leader.LEADERINFO, qp.getType());
            Assert.assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
            Assert.assertEquals(0x10000, ByteBuffer.wrap(qp.getData()).getInt());
            Assert.assertEquals(1, l.self.getAcceptedEpoch());
            Assert.assertEquals(0, l.self.getCurrentEpoch());

            // Ack the epoch with a zeroed 4-byte payload and zxid 0.
            qp = new QuorumPacket(Leader.ACKEPOCH, 0, new byte[4], null);
            oa.writeRecord(qp, null);

            readPacketSkippingPing(ia, qp);
            Assert.assertEquals(Leader.DIFF, qp.getType());

            // By NEWLEADER the current epoch has caught up with the accepted epoch.
            readPacketSkippingPing(ia, qp);
            Assert.assertEquals(Leader.NEWLEADER, qp.getType());
            Assert.assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
            Assert.assertEquals(1, l.self.getAcceptedEpoch());
            Assert.assertEquals(1, l.self.getCurrentEpoch());

            // Ack NEWLEADER using the zxid the leader just sent.
            qp = new QuorumPacket(Leader.ACK, qp.getZxid(), null, null);
            oa.writeRecord(qp, null);

            readPacketSkippingPing(ia, qp);
            Assert.assertEquals(Leader.UPTODATE, qp.getType());
        }
    });
}
@Test public void testTxnTimeout() throws Exception { testLeaderConversation(new LeaderConversation() { public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l) throws IOException, InterruptedException, org.apache.zookeeper.server.quorum.Leader.XidRolloverException { Assert.assertEquals(0, l.self.getAcceptedEpoch()); Assert.assertEquals(0, l.self.getCurrentEpoch()); LearnerInfo li = new LearnerInfo(1, 0x10000); byte liBytes[] = new byte[20]; ByteBufferOutputStream.record2ByteBuffer(li, ByteBuffer.wrap(liBytes)); QuorumPacket qp = new QuorumPacket(Leader.FOLLOWERINFO, 0, liBytes, null); oa.writeRecord(qp, null); readPacketSkippingPing(ia, qp); Assert.assertEquals(Leader.LEADERINFO, qp.getType()); Assert.assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid()); Assert.assertEquals(ByteBuffer.wrap(qp.getData()).getInt(), 0x10000); Assert.assertEquals(1, l.self.getAcceptedEpoch()); Assert.assertEquals(0, l.self.getCurrentEpoch()); qp = new QuorumPacket(Leader.ACKEPOCH, 0, new byte[4], null); oa.writeRecord(qp, null); readPacketSkippingPing(ia, qp); Assert.assertEquals(Leader.DIFF, qp.getType()); readPacketSkippingPing(ia, qp); Assert.assertEquals(Leader.NEWLEADER, qp.getType()); Assert.assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid()); Assert.assertEquals(1, l.self.getAcceptedEpoch()); Assert.assertEquals(1, l.self.getCurrentEpoch()); qp = new QuorumPacket(Leader.ACK, qp.getZxid(), null, null); oa.writeRecord(qp, null); readPacketSkippingPing(ia, qp); Assert.assertEquals(Leader.UPTODATE, qp.getType()); l.propose(createNodeRequest(l.zk.getZxid())); readPacketSkippingPing(ia, qp); Assert.assertEquals(Leader.PROPOSAL, qp.getType()); LOG.info("Proposal sent."); for (int i = 0; i < (2 * SYNC_LIMIT) + 2; i++) { try { ia.readRecord(qp, null); LOG.info("Ping received: " + i); qp = new QuorumPacket(Leader.PING, qp.getZxid(), "".getBytes(), null); oa.writeRecord(qp, null); } catch (EOFException e) { return; } } Assert.fail("Connection hasn't been closed by 
leader after transaction times out."); } private Request createNodeRequest(long zxid) throws IOException { TxnHeader hdr = new TxnHeader(1, 1, zxid, 1, ZooDefs.OpCode.create); CreateTxn ct = new CreateTxn("/foo", "data".getBytes(), null, true, 0); ByteArrayOutputStream baos = new ByteArrayOutputStream(); OutputArchive boa = BinaryOutputArchive.getArchive(baos); boa.writeRecord(hdr, "header"); boa.writeRecord(ct, "txn"); baos.close(); Request rq = new Request(null, 1, 1, ZooDefs.OpCode.create, ByteBuffer.wrap(baos.toByteArray()), null); rq.zxid = zxid; rq.hdr = hdr; rq.txn = ct; return rq; } }); }
/**
 * Walks through a complete, successful follower handshake with a leader that starts at epoch 0:
 * FOLLOWERINFO, LEADERINFO, ACKEPOCH, DIFF, NEWLEADER, ACK, UPTODATE. Along the way it checks
 * that the leader's accepted epoch becomes 1 after LEADERINFO and that its current epoch
 * becomes 1 by the time NEWLEADER has been received.
 *
 * @throws Exception propagated from {@code testLeaderConversation}
 */
@Test
public void testNormalRun() throws Exception {
    testLeaderConversation(new LeaderConversation() {
        @Override
        public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l)
                throws IOException {
            Assert.assertEquals(0, l.self.getAcceptedEpoch());
            Assert.assertEquals(0, l.self.getCurrentEpoch());

            /* we test a normal run. everything should work out well. */
            LearnerInfo li = new LearnerInfo(1, 0x10000, 0);
            byte[] liBytes = new byte[20];
            ByteBufferOutputStream.record2ByteBuffer(li, ByteBuffer.wrap(liBytes));
            QuorumPacket qp = new QuorumPacket(Leader.FOLLOWERINFO, 0, liBytes, null);
            oa.writeRecord(qp, null);

            // After LEADERINFO the new epoch is accepted but not yet current.
            readPacketSkippingPing(ia, qp);
            Assert.assertEquals(Leader.LEADERINFO, qp.getType());
            Assert.assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
            Assert.assertEquals(0x10000, ByteBuffer.wrap(qp.getData()).getInt());
            Assert.assertEquals(1, l.self.getAcceptedEpoch());
            Assert.assertEquals(0, l.self.getCurrentEpoch());

            // Ack the epoch with a zeroed 4-byte payload and zxid 0.
            qp = new QuorumPacket(Leader.ACKEPOCH, 0, new byte[4], null);
            oa.writeRecord(qp, null);

            readPacketSkippingPing(ia, qp);
            Assert.assertEquals(Leader.DIFF, qp.getType());

            // By NEWLEADER the current epoch has caught up with the accepted epoch.
            readPacketSkippingPing(ia, qp);
            Assert.assertEquals(Leader.NEWLEADER, qp.getType());
            Assert.assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
            Assert.assertEquals(1, l.self.getAcceptedEpoch());
            Assert.assertEquals(1, l.self.getCurrentEpoch());

            // Ack NEWLEADER using the zxid the leader just sent.
            qp = new QuorumPacket(Leader.ACK, qp.getZxid(), null, null);
            oa.writeRecord(qp, null);

            readPacketSkippingPing(ia, qp);
            Assert.assertEquals(Leader.UPTODATE, qp.getType());
        }
    });
}
/**
 * Completes a normal follower handshake, has the leader propose a create transaction, and then
 * keeps answering the leader with PING packets without ever acking the proposal. The test passes
 * when reading from the leader hits end-of-stream within {@code (2 * SYNC_LIMIT) + 2} reads, and
 * fails if the connection is still open after that many reads.
 *
 * @throws Exception propagated from {@code testLeaderConversation}
 */
@Test
public void testTxnTimeout() throws Exception {
    testLeaderConversation(new LeaderConversation() {
        @Override
        public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l)
                throws IOException, InterruptedException, org.apache.zookeeper.server.quorum.Leader.XidRolloverException {
            Assert.assertEquals(0, l.self.getAcceptedEpoch());
            Assert.assertEquals(0, l.self.getCurrentEpoch());

            // Introduce ourselves with FOLLOWERINFO.
            LearnerInfo li = new LearnerInfo(1, 0x10000, 0);
            byte[] liBytes = new byte[20];
            ByteBufferOutputStream.record2ByteBuffer(li, ByteBuffer.wrap(liBytes));
            QuorumPacket qp = new QuorumPacket(Leader.FOLLOWERINFO, 0, liBytes, null);
            oa.writeRecord(qp, null);

            // After LEADERINFO the new epoch is accepted but not yet current.
            readPacketSkippingPing(ia, qp);
            Assert.assertEquals(Leader.LEADERINFO, qp.getType());
            Assert.assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
            Assert.assertEquals(0x10000, ByteBuffer.wrap(qp.getData()).getInt());
            Assert.assertEquals(1, l.self.getAcceptedEpoch());
            Assert.assertEquals(0, l.self.getCurrentEpoch());

            // Ack the epoch with a zeroed 4-byte payload and zxid 0.
            qp = new QuorumPacket(Leader.ACKEPOCH, 0, new byte[4], null);
            oa.writeRecord(qp, null);

            readPacketSkippingPing(ia, qp);
            Assert.assertEquals(Leader.DIFF, qp.getType());

            readPacketSkippingPing(ia, qp);
            Assert.assertEquals(Leader.NEWLEADER, qp.getType());
            Assert.assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
            Assert.assertEquals(1, l.self.getAcceptedEpoch());
            Assert.assertEquals(1, l.self.getCurrentEpoch());

            // Ack NEWLEADER using the zxid the leader just sent.
            qp = new QuorumPacket(Leader.ACK, qp.getZxid(), null, null);
            oa.writeRecord(qp, null);

            readPacketSkippingPing(ia, qp);
            Assert.assertEquals(Leader.UPTODATE, qp.getType());

            // Have the leader propose a create of "/test"; this conversation never acks it.
            long zxid = l.zk.getZxid();
            l.propose(new Request(1, 1, ZooDefs.OpCode.create,
                    new TxnHeader(1, 1, zxid, 1, ZooDefs.OpCode.create),
                    new CreateTxn("/test", "hola".getBytes(), null, true, 0), zxid));

            readPacketSkippingPing(ia, qp);
            Assert.assertEquals(Leader.PROPOSAL, qp.getType());

            LOG.info("Proposal sent.");

            // Answer every packet with a PING until the leader drops the connection.
            for (int i = 0; i < (2 * SYNC_LIMIT) + 2; i++) {
                try {
                    ia.readRecord(qp, null);
                    LOG.info("Ping received: " + i);
                    qp = new QuorumPacket(Leader.PING, qp.getZxid(), "".getBytes(), null);
                    oa.writeRecord(qp, null);
                } catch (EOFException e) {
                    // Expected outcome: end-of-stream means the connection was closed.
                    return;
                }
            }
            Assert.fail("Connection hasn't been closed by leader after transaction times out.");
        }
    });
}