/** * Test case verifying TxnLogProposalIterator closure. */ @Test public void testTxnLogProposalIteratorClosure() throws Exception { long peerZxid; // CommmitedLog is empty, we will use txnlog up to lastProcessZxid db = new MockZKDatabase(null) { @Override public Iterator<Proposal> getProposalsFromTxnLog(long peerZxid, long limit) { return TxnLogProposalIterator.EMPTY_ITERATOR; } }; db.lastProcessedZxid = 7; db.txnLog.add(createProposal(2)); db.txnLog.add(createProposal(3)); // Peer zxid peerZxid = 4; assertTrue("Couldn't identify snapshot transfer!", learnerHandler.syncFollower(peerZxid, db, leader)); reset(); }
/** * Walk through the target peer commmittedLog. * @param sessionId * @param peerId */ private void validateRequestLog(long sessionId, int peerId) { String session = Long.toHexString(sessionId); LOG.info("Searching for txn of session 0x " + session + " on peer " + peerId); String peerType = peerId == qb.getLeaderIndex() ? "leader" : "follower"; QuorumPeer peer = qb.getPeerList().get(peerId); ZKDatabase db = peer.getActiveServer().getZKDatabase(); for (Proposal p : db.getCommittedLog()) { Assert.assertFalse("Should not see " + TraceFormatter.op2String(p.request.type) + " request from local session 0x" + session + " on the " + peerType, p.request.sessionId == sessionId); } }
public synchronized LinkedList<Proposal> getCommittedLog() { ReadLock rl = logLock.readLock(); // only make a copy if this thread isn't already holding a lock if(logLock.getReadHoldCount() <=0) { try { rl.lock(); return new LinkedList<Proposal>(this.committedLog); } finally { rl.unlock(); } } return this.committedLog; }
/** * maintains a list of last <i>committedLog</i> * or so committed requests. This is used for * fast follower synchronization. * @param request committed request */ public void addCommittedProposal(Request request) { WriteLock wl = logLock.writeLock(); try { wl.lock(); if (committedLog.size() > commitLogCount) { committedLog.removeFirst(); minCommittedLog = committedLog.getFirst().packet.getZxid(); } if (committedLog.size() == 0) { minCommittedLog = request.zxid; maxCommittedLog = request.zxid; } ByteArrayOutputStream baos = new ByteArrayOutputStream(); BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos); try { request.hdr.serialize(boa, "hdr"); if (request.txn != null) { request.txn.serialize(boa, "txn"); } baos.close(); } catch (IOException e) { LOG.error("This really should be impossible", e); } QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid, baos.toByteArray(), null); Proposal p = new Proposal(); p.packet = pp; p.request = request; committedLog.add(p); maxCommittedLog = p.packet.getZxid(); } finally { wl.unlock(); } }
/** * Proposal returned by this iterator has request part set to null, since * it is not used for follower sync-up. */ @Override public Proposal next() { ByteArrayOutputStream baos = new ByteArrayOutputStream(); BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos); Proposal p = new Proposal(); try { TxnHeader hdr = itr.getHeader(); Record txn = itr.getTxn(); hdr.serialize(boa, "hdr"); if (txn != null) { txn.serialize(boa, "txn"); } baos.close(); QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, itr.getHeader() .getZxid(), baos.toByteArray(), null); p.packet = pp; p.request = null; // This is the only place that can throw IO exception hasNext = itr.next(); } catch (IOException e) { LOG.error("Unable to read txnlog from disk", e); hasNext = false; } return p; }
public synchronized List<Proposal> getCommittedLog() { ReadLock rl = logLock.readLock(); // only make a copy if this thread isn't already holding a lock if(logLock.getReadHoldCount() <=0) { try { rl.lock(); return new LinkedList<Proposal>(this.committedLog); } finally { rl.unlock(); } } return this.committedLog; }
/** * maintains a list of last <i>committedLog</i> * or so committed requests. This is used for * fast follower synchronization. * @param request committed request */ public void addCommittedProposal(Request request) { WriteLock wl = logLock.writeLock(); try { wl.lock(); if (committedLog.size() > commitLogCount) { committedLog.removeFirst(); minCommittedLog = committedLog.getFirst().packet.getZxid(); } if (committedLog.isEmpty()) { minCommittedLog = request.zxid; maxCommittedLog = request.zxid; } ByteArrayOutputStream baos = new ByteArrayOutputStream(); BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos); try { request.getHdr().serialize(boa, "hdr"); if (request.getTxn() != null) { request.getTxn().serialize(boa, "txn"); } baos.close(); } catch (IOException e) { LOG.error("This really should be impossible", e); } QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid, baos.toByteArray(), null); Proposal p = new Proposal(); p.packet = pp; p.request = request; committedLog.add(p); maxCommittedLog = p.packet.getZxid(); } finally { wl.unlock(); } }
public Iterator<Proposal> getProposalsFromTxnLog(long peerZxid, long limit) { if (peerZxid >= txnLog.peekFirst().packet.getZxid()) { return txnLog.iterator(); } else { return (new LinkedList<Proposal>()).iterator(); } }
Proposal createProposal(long zxid) { Proposal p = new Proposal(); p.packet = new QuorumPacket(); p.packet.setZxid(zxid); p.packet.setType(Leader.PROPOSAL); return p; }
/** * test the purge * @throws Exception an exception might be thrown here */ @Test public void testRestoreCommittedLog() throws Exception { File tmpDir = ClientBase.createTmpDir(); ClientBase.setupTestEnv(); ZooKeeperServer zks = new ZooKeeperServer(tmpDir, tmpDir, 3000); SyncRequestProcessor.setSnapCount(100); final int PORT = Integer.parseInt(HOSTPORT.split(":")[1]); ServerCnxnFactory f = ServerCnxnFactory.createFactory(PORT, -1); f.startup(zks); Assert.assertTrue("waiting for server being up ", ClientBase.waitForServerUp(HOSTPORT,CONNECTION_TIMEOUT)); ZooKeeper zk = new ZooKeeper(HOSTPORT, CONNECTION_TIMEOUT, this); try { for (int i = 0; i< 2000; i++) { zk.create("/invalidsnap-" + i, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } finally { zk.close(); } f.shutdown(); zks.shutdown(); Assert.assertTrue("waiting for server to shutdown", ClientBase.waitForServerDown(HOSTPORT, CONNECTION_TIMEOUT)); // start server again zks = new ZooKeeperServer(tmpDir, tmpDir, 3000); zks.startdata(); LinkedList<Proposal> committedLog = zks.getZKDatabase().getCommittedLog(); int logsize = committedLog.size(); LOG.info("committedLog size = " + logsize); Assert.assertTrue("log size != 0", (logsize != 0)); zks.shutdown(); }
/** * test the purge * @throws Exception an exception might be thrown here */ @Test public void testRestoreCommittedLog() throws Exception { File tmpDir = ClientBase.createTmpDir(); ClientBase.setupTestEnv(); ZooKeeperServer zks = new ZooKeeperServer(tmpDir, tmpDir, 3000); SyncRequestProcessor.setSnapCount(100); final int PORT = Integer.parseInt(HOSTPORT.split(":")[1]); ServerCnxnFactory f = ServerCnxnFactory.createFactory(PORT, -1); f.startup(zks); Assert.assertTrue("waiting for server being up ", ClientBase.waitForServerUp(HOSTPORT,CONNECTION_TIMEOUT)); ZooKeeper zk = new ZooKeeper(HOSTPORT, CONNECTION_TIMEOUT, this); try { for (int i = 0; i< 2000; i++) { zk.create("/invalidsnap-" + i, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } finally { zk.close(); } f.shutdown(); Assert.assertTrue("waiting for server to shutdown", ClientBase.waitForServerDown(HOSTPORT, CONNECTION_TIMEOUT)); // start server again zks = new ZooKeeperServer(tmpDir, tmpDir, 3000); zks.startdata(); LinkedList<Proposal> committedLog = zks.getZKDatabase().getCommittedLog(); int logsize = committedLog.size(); LOG.info("committedLog size = " + logsize); Assert.assertTrue("log size != 0", (logsize != 0)); }
/** * test the purge * @throws Exception an exception might be thrown here */ @Test public void testRestoreCommittedLog() throws Exception { File tmpDir = ClientBase.createTmpDir(); ClientBase.setupTestEnv(); ZooKeeperServer zks = new ZooKeeperServer(tmpDir, tmpDir, 3000); SyncRequestProcessor.setSnapCount(100); final int PORT = Integer.parseInt(HOSTPORT.split(":")[1]); ServerCnxnFactory f = ServerCnxnFactory.createFactory(PORT, -1); f.startup(zks); Assert.assertTrue("waiting for server being up ", ClientBase.waitForServerUp(HOSTPORT,CONNECTION_TIMEOUT)); ZooKeeper zk = new ZooKeeper(HOSTPORT, CONNECTION_TIMEOUT, this); try { for (int i = 0; i< 2000; i++) { zk.create("/invalidsnap-" + i, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } finally { zk.close(); } f.shutdown(); zks.shutdown(); Assert.assertTrue("waiting for server to shutdown", ClientBase.waitForServerDown(HOSTPORT, CONNECTION_TIMEOUT)); // start server again zks = new ZooKeeperServer(tmpDir, tmpDir, 3000); zks.startdata(); List<Proposal> committedLog = zks.getZKDatabase().getCommittedLog(); int logsize = committedLog.size(); LOG.info("committedLog size = " + logsize); Assert.assertTrue("log size != 0", (logsize != 0)); zks.shutdown(); }