/** * get the last zxid that was logged in the transaction logs * @return the last zxid logged in the transaction logs */ public long getLastLoggedZxid() { File[] files = getLogFiles(logDir.listFiles(), 0); long maxLog=files.length>0? Util.getZxidFromName(files[files.length-1].getName(),"log"):-1; // if a log file is more recent we must scan it to find // the highest zxid long zxid = maxLog; TxnIterator itr = null; try { FileTxnLog txn = new FileTxnLog(logDir); itr = txn.read(maxLog); while (true) { if(!itr.next()) break; TxnHeader hdr = itr.getHeader(); zxid = hdr.getZxid(); } } catch (IOException e) { LOG.warn("Unexpected exception", e); } finally { close(itr); } return zxid; }
/**
 * Applies a transaction to the database and mirrors session creation and
 * closure into the session tracker.
 *
 * @param hdr header of the transaction; supplies the op code and session id
 * @param txn transaction body
 * @return the result reported by the database for this transaction
 */
public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
    final int opCode = hdr.getType();
    final long sessionId = hdr.getClientId();
    final ProcessTxnResult rc = getZKDatabase().processTxn(hdr, txn);

    if (opCode == OpCode.closeSession) {
        sessionTracker.removeSession(sessionId);
        return rc;
    }
    if (opCode != OpCode.createSession) {
        // Nothing session-related to track for other operations.
        return rc;
    }

    if (txn instanceof CreateSessionTxn) {
        CreateSessionTxn cst = (CreateSessionTxn) txn;
        sessionTracker.addSession(sessionId, cst.getTimeOut());
    } else {
        // A createSession op code with a different record type is only logged.
        LOG.warn("*****>>>>> Got " + txn.getClass() + " " + txn.toString());
    }
    return rc;
}
/**
 * Builds a setData request backed by a mock Netty connection and submits it
 * to the sync processor. Fails the test if the transaction cannot be
 * marshalled.
 */
private void addRequestToSyncProcessor() {
    long zxid = ZxidUtils.makeZxid(3, 7);
    TxnHeader hdr = new TxnHeader(1, 1, zxid, 1, ZooDefs.OpCode.setData);
    Record txn = new SetDataTxn("/foo" + zxid, new byte[0], 1);

    byte[] serialized;
    try {
        serialized = Util.marshallTxnEntry(hdr, txn);
    } catch (IOException e) {
        LOG.error("IOException while adding request to SyncRequestProcessor", e);
        Assert.fail("IOException while adding request to SyncRequestProcessor!");
        // Unreachable at runtime, but needed so 'serialized' is definitely assigned below.
        return;
    }

    NettyServerCnxnFactory cnxnFactory = new NettyServerCnxnFactory();
    final MockNettyServerCnxn nettyCnxn = new MockNettyServerCnxn(null, this, cnxnFactory);

    Request req = new Request(nettyCnxn, 1, 1, ZooDefs.OpCode.setData,
            ByteBuffer.wrap(serialized), null);
    req.hdr = hdr;
    req.txn = txn;
    syncProcessor.processRequest(req);
}
/**
 * Simulates ZOOKEEPER-1069 and verifies that flush() before padLogFile
 * fixes it.
 *
 * <p>Appends a single transaction, then reads the file header back from the
 * log file on disk and checks that it carries the transaction-log magic
 * number.
 */
@Test
public void testPad() throws Exception {
    File tmpDir = ClientBase.createTmpDir();
    FileTxnLog txnLog = new FileTxnLog(tmpDir);
    TxnHeader txnHeader = new TxnHeader(0xabcd, 0x123, 0x123,
            System.currentTimeMillis(), OpCode.create);
    Record txn = new CreateTxn("/Test", new byte[0], null, false, 1);
    txnLog.append(txnHeader, txn);

    // NOTE(review): txnLog is left open while the file is read back,
    // presumably so that only what append() itself wrote is visible here.
    FileInputStream in = new FileInputStream(tmpDir.getPath() + "/log."
            + Long.toHexString(txnHeader.getZxid()));
    try {
        BinaryInputArchive ia = BinaryInputArchive.getArchive(in);
        FileHeader header = new FileHeader();
        header.deserialize(ia, "fileheader");
        LOG.info("Received magic : " + header.getMagic()
                + " Expected : " + FileTxnLog.TXNLOG_MAGIC);
        // assertEquals reports both values on failure, unlike a boolean check.
        Assert.assertEquals("Missing magic number ",
                FileTxnLog.TXNLOG_MAGIC, header.getMagic());
    } finally {
        // Release the file handle even when deserialization or the assertion fails.
        in.close();
    }
}
/**
 * Checks that appending and committing one transaction moves the reported
 * txn-log sync elapsed time away from -1.
 */
@Test
public void testGetTxnLogSyncElapsedTime() throws IOException {
    File tmpDir = ClientBase.createEmptyTestDir();
    File dataDir = new File(tmpDir, "data");
    File dataTxnLogDir = new File(tmpDir, "data_txnlog");
    FileTxnSnapLog fileTxnSnapLog = new FileTxnSnapLog(dataDir, dataTxnLogDir);

    TxnHeader hdr = new TxnHeader(1, 1, 1, 1, ZooDefs.OpCode.setData);
    Record txn = new SetDataTxn("/foo", new byte[0], 1);
    Request req = new Request(0, 0, 0, hdr, txn, 0);

    try {
        fileTxnSnapLog.append(req);
        fileTxnSnapLog.commit();
        Assert.assertNotEquals("Did not update syncElapsedTime!", -1L,
                fileTxnSnapLog.getTxnLogElapsedSyncTime());
    } finally {
        fileTxnSnapLog.close();
    }
}
private void attemptAutoCreateDb(File dataDir, File snapDir, Map<Long, Integer> sessions, String priorAutocreateDbValue, String autoCreateValue, long expectedValue) throws IOException { sessions.clear(); System.setProperty(FileTxnSnapLog.ZOOKEEPER_DB_AUTOCREATE, autoCreateValue); FileTxnSnapLog fileTxnSnapLog = new FileTxnSnapLog(dataDir, snapDir); try { long zxid = fileTxnSnapLog.restore(new DataTree(), sessions, new FileTxnSnapLog.PlayBackListener() { @Override public void onTxnLoaded(TxnHeader hdr, Record rec) { // empty by default } }); Assert.assertEquals("unexpected zxid", expectedValue, zxid); } finally { if (priorAutocreateDbValue == null) { System.clearProperty(FileTxnSnapLog.ZOOKEEPER_DB_AUTOCREATE); } else { System.setProperty(FileTxnSnapLog.ZOOKEEPER_DB_AUTOCREATE, priorAutocreateDbValue); } } }
/**
 * Simulates ZOOKEEPER-1069 and verifies that flush() before padLogFile
 * fixes it.
 *
 * <p>Appends a single transaction, then reads the file header back from the
 * log file on disk and checks that it carries the transaction-log magic
 * number.
 */
@Test
public void testPad() throws Exception {
    File tmpDir = ClientBase.createTmpDir();
    FileTxnLog txnLog = new FileTxnLog(tmpDir);
    TxnHeader txnHeader = new TxnHeader(0xabcd, 0x123, 0x123,
            Time.currentElapsedTime(), OpCode.create);
    Record txn = new CreateTxn("/Test", new byte[0], null, false, 1);
    txnLog.append(txnHeader, txn);

    // NOTE(review): txnLog is left open while the file is read back,
    // presumably so that only what append() itself wrote is visible here.
    FileInputStream in = new FileInputStream(tmpDir.getPath() + "/log."
            + Long.toHexString(txnHeader.getZxid()));
    try {
        BinaryInputArchive ia = BinaryInputArchive.getArchive(in);
        FileHeader header = new FileHeader();
        header.deserialize(ia, "fileheader");
        LOG.info("Received magic : " + header.getMagic()
                + " Expected : " + FileTxnLog.TXNLOG_MAGIC);
        // assertEquals reports both values on failure, unlike a boolean check.
        Assert.assertEquals("Missing magic number ",
                FileTxnLog.TXNLOG_MAGIC, header.getMagic());
    } finally {
        // Release the file handle even when deserialization or the assertion fails.
        in.close();
    }
}
/**
 * Creates an index document for one log entry and updates the running
 * statistics passed in by the caller.
 *
 * @param header transaction header whose time is indexed
 * @param type entry type; its id is stored in the TYPE field
 * @param count entry counter, incremented by one
 * @param from lowered to the header time when that time is smaller
 * @param to raised to the header time when that time is larger
 * @return a document holding the TYPE field followed by the DATE field
 */
private Document makeDocument(TxnHeader header, EntryTypes type, AtomicInteger count,
        AtomicLong from, AtomicLong to) {
    count.incrementAndGet();

    final long time = header.getTime();
    if (time < from.get()) {
        from.set(time);
    }
    if (time > to.get()) {
        to.set(time);
    }

    Field typeField = new Field(FieldNames.TYPE, Integer.toString(type.getId()),
            Field.Store.YES, Field.Index.NOT_ANALYZED);
    NumericField dateField = new NumericField(FieldNames.DATE, Field.Store.YES, true);
    dateField.setLongValue(time);

    Document document = new Document();
    document.add(typeField);
    document.add(dateField);
    return document;
}
/**
 * Simulates ZOOKEEPER-1069 and verifies that flush() before padLogFile
 * fixes it.
 *
 * <p>Appends a single transaction, then reads the file header back from the
 * log file on disk and checks that it carries the transaction-log magic
 * number.
 */
@Test
public void testPad() throws Exception {
    File tmpDir = ClientBase.createTmpDir();
    FileTxnLog txnLog = new FileTxnLog(tmpDir);
    TxnHeader txnHeader = new TxnHeader(0xabcd, 0x123, 0x123,
            System.currentTimeMillis(), OpCode.create);
    Record txn = new CreateTxn("/Test", new byte[0], null, false, 1);
    txnLog.append(txnHeader, txn);

    // NOTE(review): txnLog is left open while the file is read back,
    // presumably so that only what append() itself wrote is visible here.
    FileInputStream in = new FileInputStream(tmpDir.getPath() + "/log."
            + Long.toHexString(txnHeader.getZxid()));
    try {
        BinaryInputArchive ia = BinaryInputArchive.getArchive(in);
        FileHeader header = new FileHeader();
        header.deserialize(ia, "fileheader");
        // The value read from the file is the received one; the constant is
        // the expected one (the labels were previously swapped).
        LOG.info("Received magic : " + header.getMagic()
                + " Expected : " + FileTxnLog.TXNLOG_MAGIC);
        // assertEquals reports both values on failure, unlike a boolean check.
        Assert.assertEquals("Missing magic number ",
                FileTxnLog.TXNLOG_MAGIC, header.getMagic());
    } finally {
        // Release the file handle even when deserialization or the assertion fails.
        in.close();
    }
}
/**
 * Wraps a transaction in a Request and hands it to the sync processor.
 * The request is also remembered in pendingTxns when the low 32 bits of
 * its zxid are non-zero.
 *
 * @param hdr transaction header; supplies session id, cxid, type and zxid
 * @param txn transaction body
 */
public void logRequest(TxnHeader hdr, Record txn) {
    Request request = new Request(null, hdr.getClientId(), hdr.getCxid(),
            hdr.getType(), null, null);
    request.hdr = hdr;
    request.txn = txn;
    request.zxid = hdr.getZxid();

    // NOTE(review): the mask keeps the low 32 bits of the zxid, presumably
    // the per-epoch counter; confirm against ZxidUtils.
    final boolean lowBitsSet = (request.zxid & 0xffffffffL) != 0;
    if (lowBitsSet) {
        pendingTxns.add(request);
    }
    syncProcessor.processRequest(request);
}
/**
 * Dispatches a quorum packet received by an observer according to its type.
 * Packet types without a case below are ignored.
 *
 * @param qp the packet to handle
 * @throws IOException if handling the packet fails
 */
protected void processPacket(QuorumPacket qp) throws IOException {
    switch (qp.getType()) {
    case Leader.INFORM: {
        // Rebuild the request from the serialized transaction and commit it.
        TxnHeader header = new TxnHeader();
        Record payload = SerializeUtils.deserializeTxn(qp.getData(), header);
        Request informed = new Request(null, header.getClientId(), header.getCxid(),
                header.getType(), null, null);
        informed.txn = payload;
        informed.hdr = header;
        ((ObserverZooKeeperServer) zk).commitRequest(informed);
        break;
    }
    case Leader.PING:
        ping(qp);
        break;
    case Leader.REVALIDATE:
        revalidate(qp);
        break;
    case Leader.SYNC:
        ((ObserverZooKeeperServer) zk).sync();
        break;
    case Leader.PROPOSAL:
        LOG.warn("Ignoring proposal");
        break;
    case Leader.COMMIT:
        LOG.warn("Ignoring commit");
        break;
    case Leader.UPTODATE:
        LOG.error("Received an UPTODATE message after Observer started");
        break;
    }
}
/**
 * Inspects the type of the received packet and dispatches it to the
 * matching handler. Packet types without a case below are ignored.
 *
 * @param qp the packet to handle
 * @throws IOException if handling the packet fails
 */
protected void processPacket(QuorumPacket qp) throws IOException {
    switch (qp.getType()) {
    case Leader.PING:
        ping(qp);
        break;
    case Leader.PROPOSAL: {
        TxnHeader hdr = new TxnHeader();
        Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);
        final long proposedZxid = hdr.getZxid();
        final long expectedZxid = lastQueued + 1;
        // A gap in the zxid sequence is only logged; the proposal is still queued.
        if (proposedZxid != expectedZxid) {
            LOG.warn("Got zxid 0x"
                    + Long.toHexString(proposedZxid)
                    + " expected 0x"
                    + Long.toHexString(expectedZxid));
        }
        lastQueued = proposedZxid;
        fzk.logRequest(hdr, txn);
        break;
    }
    case Leader.COMMIT:
        fzk.commit(qp.getZxid());
        break;
    case Leader.UPTODATE:
        LOG.error("Received an UPTODATE message after Follower started");
        break;
    case Leader.REVALIDATE:
        revalidate(qp);
        break;
    case Leader.SYNC:
        fzk.sync();
        break;
    }
}
/**
 * Writes a transaction header, followed by the transaction data when it is
 * present, into a freshly allocated byte array.
 *
 * @param hdr transaction header, serialized under the tag "hdr"
 * @param txn transaction data, serialized under the tag "txn"; skipped
 *        when null
 * @return the serialized transaction record
 * @throws IOException if serialization of either part fails
 */
public static byte[] marshallTxnEntry(TxnHeader hdr, Record txn) throws IOException {
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    OutputArchive archive = BinaryOutputArchive.getArchive(buffer);

    hdr.serialize(archive, "hdr");
    if (null != txn) {
        txn.serialize(archive, "txn");
    }

    return buffer.toByteArray();
}
/**
 * Logs a single transaction whose zxid is built from (3, 3), then checks
 * that a fresh QuorumPeer reports 3 for both the accepted and the current
 * epoch and that both epoch files under version-2 contain 3.
 */
@Test
public void testInitialAcceptedCurrent() throws Exception {
    File tmpDir = File.createTempFile("test", ".dir", testData);
    // createTempFile makes a regular file; swap it for a directory of the same name.
    tmpDir.delete();
    tmpDir.mkdir();
    try {
        FileTxnSnapLog logFactory = new FileTxnSnapLog(tmpDir, tmpDir);
        File version2 = new File(tmpDir, "version-2");
        version2.mkdir();

        long zxid = ZxidUtils.makeZxid(3, 3);
        TxnHeader hdr = new TxnHeader(1, 1, zxid, 1, ZooDefs.OpCode.error);
        ErrorTxn txn = new ErrorTxn(1);
        byte[] serialized = Util.marshallTxnEntry(hdr, txn);
        Request req = new Request(null, 1, 1, ZooDefs.OpCode.error,
                ByteBuffer.wrap(serialized), null);
        req.hdr = hdr;
        req.txn = txn;
        logFactory.append(req);
        logFactory.commit();

        QuorumPeer peer = new QuorumPeer();
        peer.setZKDatabase(new ZKDatabase(logFactory));
        peer.setTxnFactory(logFactory);
        peer.getLastLoggedZxid();

        Assert.assertEquals(3, peer.getAcceptedEpoch());
        Assert.assertEquals(3, peer.getCurrentEpoch());

        File currentEpochFile = new File(version2, QuorumPeer.CURRENT_EPOCH_FILENAME);
        Assert.assertEquals(3, Integer.parseInt(readContentsOfFile(currentEpochFile)));
        File acceptedEpochFile = new File(version2, QuorumPeer.ACCEPTED_EPOCH_FILENAME);
        Assert.assertEquals(3, Integer.parseInt(readContentsOfFile(acceptedEpochFile)));
    } finally {
        recursiveDelete(tmpDir);
    }
}
@Test public void testTruncationStreamReset() throws Exception { File tmpdir = ClientBase.createTmpDir(); FileTxnSnapLog snaplog = new FileTxnSnapLog(tmpdir, tmpdir); ZKDatabase zkdb = new ZKDatabase(snaplog); for (int i = 1; i <= 100; i++) { append(zkdb, i); } zkdb.truncateLog(1); append(zkdb, 200); zkdb.close(); // verify that the truncation and subsequent append were processed // correctly FileTxnLog txnlog = new FileTxnLog(new File(tmpdir, "version-2")); TxnIterator iter = txnlog.read(1); TxnHeader hdr = iter.getHeader(); Record txn = iter.getTxn(); Assert.assertEquals(1, hdr.getZxid()); Assert.assertTrue(txn instanceof SetDataTxn); iter.next(); hdr = iter.getHeader(); txn = iter.getTxn(); Assert.assertEquals(200, hdr.getZxid()); Assert.assertTrue(txn instanceof SetDataTxn); iter.close(); ClientBase.recursiveDelete(tmpdir); }
/**
 * Appends a setData transaction for path "/foo" + i with zxid i to the
 * database and commits it.
 *
 * @param zkdb database to append to
 * @param i used both as the zxid and as the path suffix
 * @throws IOException if the append or commit fails
 */
private void append(ZKDatabase zkdb, int i) throws IOException {
    Request req = new Request(null, 0, 0, 0, null, null);
    req.hdr = new TxnHeader(1, 1, i, 1, ZooDefs.OpCode.setData);
    req.txn = new SetDataTxn("/foo" + i, new byte[0], 1);

    zkdb.append(req);
    zkdb.commit();
}
/**
 * Wraps a transaction in a Request and hands it to the sync processor.
 * The request is also remembered in pendingTxns when the low 32 bits of
 * its zxid are non-zero.
 *
 * @param hdr transaction header; supplies session id, cxid, type and zxid
 * @param txn transaction body
 */
public void logRequest(TxnHeader hdr, Record txn) {
    final long sessionId = hdr.getClientId();
    final int cxid = hdr.getCxid();
    final int type = hdr.getType();
    Request request = new Request(sessionId, cxid, type, hdr, txn, hdr.getZxid());

    // NOTE(review): the mask keeps the low 32 bits of the zxid, presumably
    // the per-epoch counter; confirm against ZxidUtils.
    final boolean lowBitsSet = (request.zxid & 0xffffffffL) != 0;
    if (lowBitsSet) {
        pendingTxns.add(request);
    }
    syncProcessor.processRequest(request);
}
/**
 * Creates a request that is described entirely by an existing transaction.
 * The raw request payload, the connection and the auth info are all left
 * null.
 *
 * @param sessionId id of the session the request belongs to
 * @param xid stored as the request's cxid
 * @param type operation type
 * @param hdr transaction header
 * @param txn transaction body
 * @param zxid zxid assigned to the request
 */
public Request(long sessionId, int xid, int type, TxnHeader hdr, Record txn, long zxid) {
    // Parts with no value for this kind of request.
    this.cnxn = null;
    this.request = null;
    this.authInfo = null;

    // Identification of the request.
    this.sessionId = sessionId;
    this.cxid = xid;
    this.type = type;
    this.zxid = zxid;

    // The transaction itself.
    this.hdr = hdr;
    this.txn = txn;
}
/** * Proposal returned by this iterator has request part set to null, since * it is not used for follower sync-up. */ @Override public Proposal next() { ByteArrayOutputStream baos = new ByteArrayOutputStream(); BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos); Proposal p = new Proposal(); try { TxnHeader hdr = itr.getHeader(); Record txn = itr.getTxn(); hdr.serialize(boa, "hdr"); if (txn != null) { txn.serialize(boa, "txn"); } baos.close(); QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, itr.getHeader() .getZxid(), baos.toByteArray(), null); p.packet = pp; p.request = null; // This is the only place that can throw IO exception hasNext = itr.next(); } catch (IOException e) { LOG.error("Unable to read txnlog from disk", e); hasNext = false; } return p; }
/**
 * Loads the database from disk into memory. Every transaction replayed
 * during the restore is also added to the in-memory committed log.
 *
 * @return the last valid zxid on disk
 * @throws IOException if restoring from the snapshot/log fails
 */
public long loadDataBase() throws IOException {
    PlayBackListener commitRecorder = new PlayBackListener() {
        public void onTxnLoaded(TxnHeader hdr, Record txn) {
            // Replayed transactions are recorded with session id 0.
            addCommittedProposal(
                    new Request(0, hdr.getCxid(), hdr.getType(), hdr, txn, hdr.getZxid()));
        }
    };

    final long lastZxid = snapLog.restore(dataTree, sessionsWithTimeouts, commitRecorder);
    initialized = true;
    return lastZxid;
}
/**
 * Saves an empty snapshot and logs a single transaction whose zxid is built
 * from (3, 3), then checks that a fresh QuorumPeer reports 3 for both the
 * accepted and the current epoch and that both epoch files under version-2
 * contain 3.
 */
@Test
public void testInitialAcceptedCurrent() throws Exception {
    File tmpDir = File.createTempFile("test", ".dir", testData);
    // createTempFile makes a regular file; swap it for a directory of the same name.
    tmpDir.delete();
    tmpDir.mkdir();
    try {
        FileTxnSnapLog logFactory = new FileTxnSnapLog(tmpDir, tmpDir);
        File version2 = new File(tmpDir, "version-2");
        version2.mkdir();
        logFactory.save(new DataTree(), new ConcurrentHashMap<Long, Integer>(), false);

        long zxid = ZxidUtils.makeZxid(3, 3);
        TxnHeader hdr = new TxnHeader(1, 1, zxid, 1, ZooDefs.OpCode.error);
        Request req = new Request(1, 1, ZooDefs.OpCode.error, hdr, new ErrorTxn(1), zxid);
        logFactory.append(req);
        logFactory.commit();

        QuorumPeer peer = new QuorumPeer();
        peer.setZKDatabase(new ZKDatabase(logFactory));
        peer.setTxnFactory(logFactory);
        peer.getLastLoggedZxid();

        Assert.assertEquals(3, peer.getAcceptedEpoch());
        Assert.assertEquals(3, peer.getCurrentEpoch());

        File currentEpochFile = new File(version2, QuorumPeer.CURRENT_EPOCH_FILENAME);
        Assert.assertEquals(3, Integer.parseInt(readContentsOfFile(currentEpochFile)));
        File acceptedEpochFile = new File(version2, QuorumPeer.ACCEPTED_EPOCH_FILENAME);
        Assert.assertEquals(3, Integer.parseInt(readContentsOfFile(acceptedEpochFile)));
    } finally {
        TestUtils.deleteFileRecursively(tmpDir);
    }
}
/**
 * Checks the txn-log elapsed sync time exposed by ZooKeeperServerBean: it
 * is -1 before anything is committed, differs from -1 after one
 * append/commit, and reads the same on a second call.
 */
@Test
public void testTxnLogElapsedSyncTime() throws IOException {
    File tmpDir = ClientBase.createEmptyTestDir();
    File dataDir = new File(tmpDir, "data");
    File dataTxnLogDir = new File(tmpDir, "data_txnlog");
    FileTxnSnapLog fileTxnSnapLog = new FileTxnSnapLog(dataDir, dataTxnLogDir);

    ZooKeeperServer zks = new ZooKeeperServer();
    zks.setTxnLogFactory(fileTxnSnapLog);
    ZooKeeperServerBean serverBean = new ZooKeeperServerBean(zks);

    long elapsedTime = serverBean.getTxnLogElapsedSyncTime();
    assertEquals(-1, elapsedTime);

    TxnHeader hdr = new TxnHeader(1, 1, 1, 1, ZooDefs.OpCode.setData);
    Record txn = new SetDataTxn("/foo", new byte[0], 1);
    Request req = new Request(0, 0, 0, hdr, txn, 0);

    try {
        zks.getTxnLogFactory().append(req);
        zks.getTxnLogFactory().commit();

        elapsedTime = serverBean.getTxnLogElapsedSyncTime();
        assertNotEquals(-1, elapsedTime);
        // A second read without further commits must report the same value.
        assertEquals(elapsedTime, serverBean.getTxnLogElapsedSyncTime());
    } finally {
        fileTxnSnapLog.close();
    }
}
@Test public void testTruncationStreamReset() throws Exception { File tmpdir = ClientBase.createTmpDir(); FileTxnSnapLog snaplog = new FileTxnSnapLog(tmpdir, tmpdir); ZKDatabase zkdb = new ZKDatabase(snaplog); // make sure to snapshot, so that we have something there when // truncateLog reloads the db snaplog.save(zkdb.getDataTree(), zkdb.getSessionWithTimeOuts(), false); for (int i = 1; i <= 100; i++) { append(zkdb, i); } zkdb.truncateLog(1); append(zkdb, 200); zkdb.close(); // verify that the truncation and subsequent append were processed // correctly FileTxnLog txnlog = new FileTxnLog(new File(tmpdir, "version-2")); TxnIterator iter = txnlog.read(1); TxnHeader hdr = iter.getHeader(); Record txn = iter.getTxn(); Assert.assertEquals(1, hdr.getZxid()); Assert.assertTrue(txn instanceof SetDataTxn); iter.next(); hdr = iter.getHeader(); txn = iter.getTxn(); Assert.assertEquals(200, hdr.getZxid()); Assert.assertTrue(txn instanceof SetDataTxn); iter.close(); ClientBase.recursiveDelete(tmpdir); }
/**
 * Appends a setData transaction for path "/foo" + i with zxid i to the
 * database and commits it.
 *
 * @param zkdb database to append to
 * @param i used both as the zxid and as the path suffix
 * @throws IOException if the append or commit fails
 */
private void append(ZKDatabase zkdb, int i) throws IOException {
    final TxnHeader header = new TxnHeader(1, 1, i, 1, ZooDefs.OpCode.setData);
    final Record payload = new SetDataTxn("/foo" + i, new byte[0], 1);

    zkdb.append(new Request(0, 0, 0, header, payload, 0));
    zkdb.commit();
}