/** * Simulates ZOOKEEPER-1069 and verifies that flush() before padLogFile * fixes it. */ @Test public void testPad() throws Exception { File tmpDir = ClientBase.createTmpDir(); FileTxnLog txnLog = new FileTxnLog(tmpDir); TxnHeader txnHeader = new TxnHeader(0xabcd, 0x123, 0x123, System.currentTimeMillis(), OpCode.create); Record txn = new CreateTxn("/Test", new byte[0], null, false, 1); txnLog.append(txnHeader, txn); FileInputStream in = new FileInputStream(tmpDir.getPath() + "/log." + Long.toHexString(txnHeader.getZxid())); BinaryInputArchive ia = BinaryInputArchive.getArchive(in); FileHeader header = new FileHeader(); header.deserialize(ia, "fileheader"); LOG.info("Received magic : " + header.getMagic() + " Expected : " + FileTxnLog.TXNLOG_MAGIC); Assert.assertTrue("Missing magic number ", header.getMagic() == FileTxnLog.TXNLOG_MAGIC); }
/** * Simulates ZOOKEEPER-1069 and verifies that flush() before padLogFile * fixes it. */ @Test public void testPad() throws Exception { File tmpDir = ClientBase.createTmpDir(); FileTxnLog txnLog = new FileTxnLog(tmpDir); TxnHeader txnHeader = new TxnHeader(0xabcd, 0x123, 0x123, Time.currentElapsedTime(), OpCode.create); Record txn = new CreateTxn("/Test", new byte[0], null, false, 1); txnLog.append(txnHeader, txn); FileInputStream in = new FileInputStream(tmpDir.getPath() + "/log." + Long.toHexString(txnHeader.getZxid())); BinaryInputArchive ia = BinaryInputArchive.getArchive(in); FileHeader header = new FileHeader(); header.deserialize(ia, "fileheader"); LOG.info("Received magic : " + header.getMagic() + " Expected : " + FileTxnLog.TXNLOG_MAGIC); Assert.assertTrue("Missing magic number ", header.getMagic() == FileTxnLog.TXNLOG_MAGIC); }
/** * Simulates ZOOKEEPER-1069 and verifies that flush() before padLogFile * fixes it. */ @Test public void testPad() throws Exception { File tmpDir = ClientBase.createTmpDir(); FileTxnLog txnLog = new FileTxnLog(tmpDir); TxnHeader txnHeader = new TxnHeader(0xabcd, 0x123, 0x123, System.currentTimeMillis(), OpCode.create); Record txn = new CreateTxn("/Test", new byte[0], null, false, 1); txnLog.append(txnHeader, txn); FileInputStream in = new FileInputStream(tmpDir.getPath() + "/log." + Long.toHexString(txnHeader.getZxid())); BinaryInputArchive ia = BinaryInputArchive.getArchive(in); FileHeader header = new FileHeader(); header.deserialize(ia, "fileheader"); LOG.info("Expected header :" + header.getMagic() + " Received : " + FileTxnLog.TXNLOG_MAGIC); Assert.assertTrue("Missing magic number ", header.getMagic() == FileTxnLog.TXNLOG_MAGIC); }
/** * Simulates ZOOKEEPER-1069 and verifies that flush() before padLogFile * fixes it. */ @Test public void testPad() throws Exception { File tmpDir = ClientBase.createTmpDir(); FileTxnLog txnLog = new FileTxnLog(tmpDir); TxnHeader txnHeader = new TxnHeader(0xabcd, 0x123, 0x123, System.currentTimeMillis(), OpCode.create); Record txn = new CreateTxn("/Test", new byte[0], null, false); txnLog.append(txnHeader, txn); FileInputStream in = new FileInputStream(tmpDir.getPath() + "/log." + Long.toHexString(txnHeader.getZxid())); BinaryInputArchive ia = BinaryInputArchive.getArchive(in); FileHeader header = new FileHeader(); header.deserialize(ia, "fileheader"); Assert.assertTrue("Missing magic number ", header.getMagic() == FileTxnLog.TXNLOG_MAGIC); }
@SuppressWarnings("unchecked") public ProcessTxnResult processTxn(TxnHeader header, Record txn) { ProcessTxnResult rc = new ProcessTxnResult(); String debug = ""; try { rc.clientId = header.getClientId(); rc.cxid = header.getCxid(); rc.zxid = header.getZxid(); rc.type = header.getType(); rc.err = 0; if (rc.zxid > lastProcessedZxid) { lastProcessedZxid = rc.zxid; } switch (header.getType()) { case OpCode.create: CreateTxn createTxn = (CreateTxn) txn; debug = "Create transaction for " + createTxn.getPath(); createNode(createTxn.getPath(), createTxn.getData(), createTxn .getAcl(), createTxn.getEphemeral() ? header .getClientId() : 0, header.getZxid(), header.getTime()); rc.path = createTxn.getPath(); break; case OpCode.delete: DeleteTxn deleteTxn = (DeleteTxn) txn; debug = "Delete transaction for " + deleteTxn.getPath(); deleteNode(deleteTxn.getPath()); break; case OpCode.setData: SetDataTxn setDataTxn = (SetDataTxn) txn; debug = "Set data for transaction for " + setDataTxn.getPath(); rc.stat = setData(setDataTxn.getPath(), setDataTxn.getData(), setDataTxn.getVersion(), header.getZxid(), header .getTime()); break; case OpCode.setACL: SetACLTxn setACLTxn = (SetACLTxn) txn; debug = "Set ACL for transaction for " + setACLTxn.getPath(); rc.stat = setACL(setACLTxn.getPath(), setACLTxn.getAcl(), setACLTxn.getVersion()); break; case OpCode.closeSession: killSession(header.getClientId()); break; case OpCode.error: ErrorTxn errTxn = (ErrorTxn) txn; rc.err = errTxn.getErr(); break; } } catch (KeeperException e) { // These are expected errors since we take a lazy snapshot if (initialized || (e.code() != Code.NONODE && e.code() != Code.NODEEXISTS)) { LOG.warn("Failed:" + debug, e); } } return rc; }
@Test public void syncTest() throws Exception { File tmpFile = File.createTempFile("test", ".dir", testData); tmpFile.delete(); try { FileTxnSnapLog ftsl = new FileTxnSnapLog(tmpFile, tmpFile); SimpleLearner sl = new SimpleLearner(ftsl); long startZxid = sl.zk.getLastProcessedZxid(); // Set up bogus streams ByteArrayOutputStream baos = new ByteArrayOutputStream(); BinaryOutputArchive oa = BinaryOutputArchive.getArchive(baos); sl.leaderOs = BinaryOutputArchive.getArchive(new ByteArrayOutputStream()); // make streams and socket do something innocuous sl.bufferedOutput = new BufferedOutputStream(System.out); sl.sock = new Socket(); // fake messages from the server QuorumPacket qp = new QuorumPacket(Leader.SNAP, 0, null, null); oa.writeRecord(qp, null); sl.zk.getZKDatabase().serializeSnapshot(oa); oa.writeString("BenWasHere", "signature"); TxnHeader hdr = new TxnHeader(0, 0, 0, 0, ZooDefs.OpCode.create); CreateTxn txn = new CreateTxn("/foo", new byte[0], new ArrayList<ACL>(), false, sl.zk.getZKDatabase().getNode("/").stat.getCversion()); ByteArrayOutputStream tbaos = new ByteArrayOutputStream(); BinaryOutputArchive boa = BinaryOutputArchive.getArchive(tbaos); hdr.serialize(boa, "hdr"); txn.serialize(boa, "txn"); tbaos.close(); qp = new QuorumPacket(Leader.PROPOSAL, 1, tbaos.toByteArray(), null); oa.writeRecord(qp, null); // setup the messages to be streamed to follower sl.leaderIs = BinaryInputArchive.getArchive(new ByteArrayInputStream(baos.toByteArray())); try { sl.syncWithLeader(3); } catch(EOFException e) {} sl.zk.shutdown(); sl = new SimpleLearner(ftsl); Assert.assertEquals(startZxid, sl.zk.getLastProcessedZxid()); } finally { recursiveDelete(tmpFile); } }
public void testPopulatedLeaderConversation(PopulatedLeaderConversation conversation, int ops) throws Exception { Socket pair[] = getSocketPair(); Socket leaderSocket = pair[0]; Socket followerSocket = pair[1]; File tmpDir = File.createTempFile("test", "dir", testData); tmpDir.delete(); tmpDir.mkdir(); LeadThread leadThread = null; Leader leader = null; try { // Setup a database with two znodes FileTxnSnapLog snapLog = new FileTxnSnapLog(tmpDir, tmpDir); ZKDatabase zkDb = new ZKDatabase(snapLog); Assert.assertTrue(ops >= 1); long zxid = ZxidUtils.makeZxid(1, 0); for(int i = 1; i <= ops; i++){ zxid = ZxidUtils.makeZxid(1, i); String path = "/foo-"+ i; zkDb.processTxn(new TxnHeader(13,1000+i,zxid,30+i,ZooDefs.OpCode.create), new CreateTxn(path, "fpjwasalsohere".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, false, 1)); Stat stat = new Stat(); Assert.assertEquals("fpjwasalsohere", new String(zkDb.getData(path, stat, null))); } Assert.assertTrue(zxid > ZxidUtils.makeZxid(1, 0)); // Generate snapshot and close files. snapLog.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts()); snapLog.close(); QuorumPeer peer = createQuorumPeer(tmpDir); leader = createLeader(tmpDir, peer); peer.leader = leader; // Set the last accepted epoch and current epochs to be 1 peer.setAcceptedEpoch(1); peer.setCurrentEpoch(1); leadThread = new LeadThread(leader); leadThread.start(); while(leader.cnxAcceptor == null || !leader.cnxAcceptor.isAlive()) { Thread.sleep(20); } LearnerHandler lh = new LearnerHandler(leaderSocket, leader); lh.start(); leaderSocket.setSoTimeout(4000); InputArchive ia = BinaryInputArchive.getArchive(followerSocket .getInputStream()); OutputArchive oa = BinaryOutputArchive.getArchive(followerSocket .getOutputStream()); conversation.converseWithLeader(ia, oa, leader, zxid); } finally { if (leader != null) { leader.shutdown("end of test"); } if (leadThread != null) { leadThread.interrupt(); leadThread.join(); } recursiveDelete(tmpDir); } }
@Test public void testTxnTimeout() throws Exception { testLeaderConversation(new LeaderConversation() { public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l) throws IOException, InterruptedException, org.apache.zookeeper.server.quorum.Leader.XidRolloverException { Assert.assertEquals(0, l.self.getAcceptedEpoch()); Assert.assertEquals(0, l.self.getCurrentEpoch()); LearnerInfo li = new LearnerInfo(1, 0x10000); byte liBytes[] = new byte[20]; ByteBufferOutputStream.record2ByteBuffer(li, ByteBuffer.wrap(liBytes)); QuorumPacket qp = new QuorumPacket(Leader.FOLLOWERINFO, 0, liBytes, null); oa.writeRecord(qp, null); readPacketSkippingPing(ia, qp); Assert.assertEquals(Leader.LEADERINFO, qp.getType()); Assert.assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid()); Assert.assertEquals(ByteBuffer.wrap(qp.getData()).getInt(), 0x10000); Assert.assertEquals(1, l.self.getAcceptedEpoch()); Assert.assertEquals(0, l.self.getCurrentEpoch()); qp = new QuorumPacket(Leader.ACKEPOCH, 0, new byte[4], null); oa.writeRecord(qp, null); readPacketSkippingPing(ia, qp); Assert.assertEquals(Leader.DIFF, qp.getType()); readPacketSkippingPing(ia, qp); Assert.assertEquals(Leader.NEWLEADER, qp.getType()); Assert.assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid()); Assert.assertEquals(1, l.self.getAcceptedEpoch()); Assert.assertEquals(1, l.self.getCurrentEpoch()); qp = new QuorumPacket(Leader.ACK, qp.getZxid(), null, null); oa.writeRecord(qp, null); readPacketSkippingPing(ia, qp); Assert.assertEquals(Leader.UPTODATE, qp.getType()); l.propose(createNodeRequest(l.zk.getZxid())); readPacketSkippingPing(ia, qp); Assert.assertEquals(Leader.PROPOSAL, qp.getType()); LOG.info("Proposal sent."); for (int i = 0; i < (2 * SYNC_LIMIT) + 2; i++) { try { ia.readRecord(qp, null); LOG.info("Ping received: " + i); qp = new QuorumPacket(Leader.PING, qp.getZxid(), "".getBytes(), null); oa.writeRecord(qp, null); } catch (EOFException e) { return; } } Assert.fail("Connection hasn't been closed by leader after transaction times out."); } private Request createNodeRequest(long zxid) throws IOException { TxnHeader hdr = new TxnHeader(1, 1, zxid, 1, ZooDefs.OpCode.create); CreateTxn ct = new CreateTxn("/foo", "data".getBytes(), null, true, 0); ByteArrayOutputStream baos = new ByteArrayOutputStream(); OutputArchive boa = BinaryOutputArchive.getArchive(baos); boa.writeRecord(hdr, "header"); boa.writeRecord(ct, "txn"); baos.close(); Request rq = new Request(null, 1, 1, ZooDefs.OpCode.create, ByteBuffer.wrap(baos.toByteArray()), null); rq.zxid = zxid; rq.hdr = hdr; rq.txn = ct; return rq; } }); }
/** * verify that a peer with dirty snapshot joining an established cluster * does not go into an inconsistent state. * * {@link https://issues.apache.org/jira/browse/ZOOKEEPER-1558} */ @Test public void testDirtySnapshot() throws IOException, InterruptedException, KeeperException, NoSuchFieldException, IllegalAccessException { Socket pair[] = getSocketPair(); Socket leaderSocket = pair[0]; Socket followerSocket = pair[1]; File tmpDir = File.createTempFile("test", "dir"); tmpDir.delete(); tmpDir.mkdir(); LeadThread leadThread = null; Leader leader = null; try { // Setup a database with two znodes FileTxnSnapLog snapLog = new FileTxnSnapLog(tmpDir, tmpDir); ZKDatabase zkDb = new ZKDatabase(snapLog); long zxid = ZxidUtils.makeZxid(0, 1); String path = "/foo"; zkDb.processTxn(new TxnHeader(13,1000,zxid,30,ZooDefs.OpCode.create), new CreateTxn(path, "fpjwasalsohere".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, false, 1)); Stat stat = new Stat(); Assert.assertEquals("fpjwasalsohere", new String(zkDb.getData(path, stat, null))); // Close files snapLog.close(); QuorumPeer peer = createQuorumPeer(tmpDir); leader = createLeader(tmpDir, peer); peer.leader = leader; // Set the last accepted epoch and current epochs to be 1 peer.setAcceptedEpoch(0); peer.setCurrentEpoch(0); leadThread = new LeadThread(leader); leadThread.start(); while(leader.cnxAcceptor == null || !leader.cnxAcceptor.isAlive()) { Thread.sleep(20); } leader.shutdown("Shutting down the leader"); // Check if there is a valid snapshot (we better not have it) File snapDir = new File (tmpDir, FileTxnSnapLog.version + FileTxnSnapLog.VERSION); List<File> files = Util.sortDataDir(snapDir.listFiles(),"snapshot", false); for (File f : files) { try { Assert.assertFalse("Found a valid snapshot", Util.isValidSnapshot(f)); } catch (IOException e) { LOG.info("invalid snapshot " + f, e); } } } finally { if (leader != null) { leader.shutdown("end of test"); } if (leadThread != null) { leadThread.interrupt(); leadThread.join(); } recursiveDelete(tmpDir); } }
private void pRequest2TxnCreate(int type, Request request, Record record, boolean deserialize) throws IOException, KeeperException { if (deserialize) { ByteBufferInputStream.byteBuffer2Record(request.request, record); } int flags; String path; List<ACL> acl; byte[] data; long ttl; if (type == OpCode.createTTL) { CreateTTLRequest createTtlRequest = (CreateTTLRequest)record; flags = createTtlRequest.getFlags(); path = createTtlRequest.getPath(); acl = createTtlRequest.getAcl(); data = createTtlRequest.getData(); ttl = createTtlRequest.getTtl(); } else { CreateRequest createRequest = (CreateRequest)record; flags = createRequest.getFlags(); path = createRequest.getPath(); acl = createRequest.getAcl(); data = createRequest.getData(); ttl = -1; } CreateMode createMode = CreateMode.fromFlag(flags); validateCreateRequest(path, createMode, request, ttl); String parentPath = validatePathForCreate(path, request.sessionId); List<ACL> listACL = fixupACL(path, request.authInfo, acl); ChangeRecord parentRecord = getRecordForPath(parentPath); checkACL(zks, request.cnxn, parentRecord.acl, ZooDefs.Perms.CREATE, request.authInfo, path, listACL); int parentCVersion = parentRecord.stat.getCversion(); if (createMode.isSequential()) { path = path + String.format(Locale.ENGLISH, "%010d", parentCVersion); } validatePath(path, request.sessionId); try { if (getRecordForPath(path) != null) { throw new KeeperException.NodeExistsException(path); } } catch (KeeperException.NoNodeException e) { // ignore this one } boolean ephemeralParent = EphemeralType.get(parentRecord.stat.getEphemeralOwner()) == EphemeralType.NORMAL; if (ephemeralParent) { throw new KeeperException.NoChildrenForEphemeralsException(path); } int newCversion = parentRecord.stat.getCversion()+1; if (type == OpCode.createContainer) { request.setTxn(new CreateContainerTxn(path, data, listACL, newCversion)); } else if (type == OpCode.createTTL) { request.setTxn(new CreateTTLTxn(path, data, listACL, newCversion, ttl)); } else { request.setTxn(new CreateTxn(path, data, listACL, createMode.isEphemeral(), newCversion)); } StatPersisted s = new StatPersisted(); if (createMode.isEphemeral()) { s.setEphemeralOwner(request.sessionId); } parentRecord = parentRecord.duplicate(request.getHdr().getZxid()); parentRecord.childCount++; parentRecord.stat.setCversion(newCversion); addChangeRecord(parentRecord); addChangeRecord(new ChangeRecord(request.getHdr().getZxid(), path, s, 0, listACL)); }
@Test public void syncTest() throws Exception { File tmpFile = File.createTempFile("test", ".dir", testData); tmpFile.delete(); try { FileTxnSnapLog ftsl = new FileTxnSnapLog(tmpFile, tmpFile); SimpleLearner sl = new SimpleLearner(ftsl); long startZxid = sl.zk.getLastProcessedZxid(); // Set up bogus streams ByteArrayOutputStream baos = new ByteArrayOutputStream(); BinaryOutputArchive oa = BinaryOutputArchive.getArchive(baos); sl.leaderOs = BinaryOutputArchive.getArchive(new ByteArrayOutputStream()); // make streams and socket do something innocuous sl.bufferedOutput = new BufferedOutputStream(System.out); sl.sock = new Socket(); // fake messages from the server QuorumPacket qp = new QuorumPacket(Leader.SNAP, 0, null, null); oa.writeRecord(qp, null); sl.zk.getZKDatabase().serializeSnapshot(oa); oa.writeString("BenWasHere", "signature"); TxnHeader hdr = new TxnHeader(0, 0, 0, 0, ZooDefs.OpCode.create); CreateTxn txn = new CreateTxn("/foo", new byte[0], new ArrayList<ACL>(), false, sl.zk.getZKDatabase().getNode("/").stat.getCversion()); ByteArrayOutputStream tbaos = new ByteArrayOutputStream(); BinaryOutputArchive boa = BinaryOutputArchive.getArchive(tbaos); hdr.serialize(boa, "hdr"); txn.serialize(boa, "txn"); tbaos.close(); qp = new QuorumPacket(Leader.PROPOSAL, 1, tbaos.toByteArray(), null); oa.writeRecord(qp, null); // setup the messages to be streamed to follower sl.leaderIs = BinaryInputArchive.getArchive(new ByteArrayInputStream(baos.toByteArray())); try { sl.syncWithLeader(3); } catch (EOFException e) {} sl.zk.shutdown(); sl = new SimpleLearner(ftsl); Assert.assertEquals(startZxid, sl.zk.getLastProcessedZxid()); } finally { TestUtils.deleteFileRecursively(tmpFile); } }
public void testPopulatedLeaderConversation(PopulatedLeaderConversation conversation, int ops) throws Exception { Socket pair[] = getSocketPair(); Socket leaderSocket = pair[0]; Socket followerSocket = pair[1]; File tmpDir = File.createTempFile("test", "dir", testData); tmpDir.delete(); tmpDir.mkdir(); LeadThread leadThread = null; Leader leader = null; try { // Setup a database with two znodes FileTxnSnapLog snapLog = new FileTxnSnapLog(tmpDir, tmpDir); ZKDatabase zkDb = new ZKDatabase(snapLog); Assert.assertTrue(ops >= 1); long zxid = ZxidUtils.makeZxid(1, 0); for(int i = 1; i <= ops; i++){ zxid = ZxidUtils.makeZxid(1, i); String path = "/foo-"+ i; zkDb.processTxn(new TxnHeader(13,1000+i,zxid,30+i,ZooDefs.OpCode.create), new CreateTxn(path, "fpjwasalsohere".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, false, 1)); Stat stat = new Stat(); Assert.assertEquals("fpjwasalsohere", new String(zkDb.getData(path, stat, null))); } Assert.assertTrue(zxid > ZxidUtils.makeZxid(1, 0)); // Generate snapshot and close files. snapLog.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts(), false); snapLog.close(); QuorumPeer peer = createQuorumPeer(tmpDir); leader = createLeader(tmpDir, peer); peer.leader = leader; // Set the last accepted epoch and current epochs to be 1 peer.setAcceptedEpoch(1); peer.setCurrentEpoch(1); leadThread = new LeadThread(leader); leadThread.start(); while(leader.cnxAcceptor == null || !leader.cnxAcceptor.isAlive()) { Thread.sleep(20); } LearnerHandler lh = new LearnerHandler(leaderSocket, leader); lh.start(); leaderSocket.setSoTimeout(4000); InputArchive ia = BinaryInputArchive.getArchive(followerSocket .getInputStream()); OutputArchive oa = BinaryOutputArchive.getArchive(followerSocket .getOutputStream()); conversation.converseWithLeader(ia, oa, leader, zxid); } finally { if (leader != null) { leader.shutdown("end of test"); } if (leadThread != null) { leadThread.interrupt(); leadThread.join(); } TestUtils.deleteFileRecursively(tmpDir); } }
@Test public void testTxnTimeout() throws Exception { testLeaderConversation(new LeaderConversation() { public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l) throws IOException, InterruptedException, org.apache.zookeeper.server.quorum.Leader.XidRolloverException { Assert.assertEquals(0, l.self.getAcceptedEpoch()); Assert.assertEquals(0, l.self.getCurrentEpoch()); LearnerInfo li = new LearnerInfo(1, 0x10000, 0); byte liBytes[] = new byte[20]; ByteBufferOutputStream.record2ByteBuffer(li, ByteBuffer.wrap(liBytes)); QuorumPacket qp = new QuorumPacket(Leader.FOLLOWERINFO, 0, liBytes, null); oa.writeRecord(qp, null); readPacketSkippingPing(ia, qp); Assert.assertEquals(Leader.LEADERINFO, qp.getType()); Assert.assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid()); Assert.assertEquals(ByteBuffer.wrap(qp.getData()).getInt(), 0x10000); Assert.assertEquals(1, l.self.getAcceptedEpoch()); Assert.assertEquals(0, l.self.getCurrentEpoch()); qp = new QuorumPacket(Leader.ACKEPOCH, 0, new byte[4], null); oa.writeRecord(qp, null); readPacketSkippingPing(ia, qp); Assert.assertEquals(Leader.DIFF, qp.getType()); readPacketSkippingPing(ia, qp); Assert.assertEquals(Leader.NEWLEADER, qp.getType()); Assert.assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid()); Assert.assertEquals(1, l.self.getAcceptedEpoch()); Assert.assertEquals(1, l.self.getCurrentEpoch()); qp = new QuorumPacket(Leader.ACK, qp.getZxid(), null, null); oa.writeRecord(qp, null); readPacketSkippingPing(ia, qp); Assert.assertEquals(Leader.UPTODATE, qp.getType()); long zxid = l.zk.getZxid(); l.propose(new Request(1, 1, ZooDefs.OpCode.create, new TxnHeader(1, 1, zxid, 1, ZooDefs.OpCode.create), new CreateTxn("/test", "hola".getBytes(), null, true, 0), zxid)); readPacketSkippingPing(ia, qp); Assert.assertEquals(Leader.PROPOSAL, qp.getType()); LOG.info("Proposal sent."); for (int i = 0; i < (2 * SYNC_LIMIT) + 2; i++) { try { ia.readRecord(qp, null); LOG.info("Ping received: " + i); qp = new QuorumPacket(Leader.PING, qp.getZxid(), "".getBytes(), null); oa.writeRecord(qp, null); } catch (EOFException e) { return; } } Assert.fail("Connection hasn't been closed by leader after transaction times out."); } }); }
public void testPopulatedLeaderConversation(PopulatedLeaderConversation conversation, int ops) throws Exception { Socket pair[] = getSocketPair(); Socket leaderSocket = pair[0]; Socket followerSocket = pair[1]; File tmpDir = File.createTempFile("test", "dir", testData); tmpDir.delete(); tmpDir.mkdir(); LeadThread leadThread = null; Leader leader = null; try { // Setup a database with two znodes FileTxnSnapLog snapLog = new FileTxnSnapLog(tmpDir, tmpDir); ZKDatabase zkDb = new ZKDatabase(snapLog); Assert.assertTrue(ops >= 1); long zxid = ZxidUtils.makeZxid(1, 0); for(int i = 1; i <= ops; i++){ zxid = ZxidUtils.makeZxid(1, i); String path = "/foo-"+ i; zkDb.processTxn(new TxnHeader(13,1000+i,zxid,30+i,ZooDefs.OpCode.create), new CreateTxn(path, "fpjwasalsohere".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, false, 1)); Stat stat = new Stat(); Assert.assertEquals("fpjwasalsohere", new String(zkDb.getData(path, stat, null))); } Assert.assertTrue(zxid > ZxidUtils.makeZxid(1, 0)); // Generate snapshot and close files. snapLog.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts()); snapLog.close(); QuorumPeer peer = createQuorumPeer(tmpDir); leader = createLeader(tmpDir, peer); peer.leader = leader; // Set the last accepted epoch and current epochs to be 1 peer.setAcceptedEpoch(1); peer.setCurrentEpoch(1); leadThread = new LeadThread(leader); leadThread.start(); while(leader.cnxAcceptor == null || !leader.cnxAcceptor.isAlive()) { Thread.sleep(20); } LearnerHandler lh = new LearnerHandler(leaderSocket, new BufferedInputStream(leaderSocket.getInputStream()), leader); lh.start(); leaderSocket.setSoTimeout(4000); InputArchive ia = BinaryInputArchive.getArchive(followerSocket .getInputStream()); OutputArchive oa = BinaryOutputArchive.getArchive(followerSocket .getOutputStream()); conversation.converseWithLeader(ia, oa, leader, zxid); } finally { if (leader != null) { leader.shutdown("end of test"); } if (leadThread != null) { leadThread.interrupt(); leadThread.join(); } recursiveDelete(tmpDir); } }
@Test public void syncTest() throws Exception { File tmpFile = File.createTempFile("test", ".dir"); tmpFile.delete(); try { FileTxnSnapLog ftsl = new FileTxnSnapLog(tmpFile, tmpFile); SimpleLearner sl = new SimpleLearner(ftsl); long startZxid = sl.zk.getLastProcessedZxid(); // Set up bogus streams ByteArrayOutputStream baos = new ByteArrayOutputStream(); BinaryOutputArchive oa = BinaryOutputArchive.getArchive(baos); sl.leaderOs = BinaryOutputArchive.getArchive(new ByteArrayOutputStream()); // make streams and socket do something innocuous sl.bufferedOutput = new BufferedOutputStream(System.out); sl.sock = new Socket(); // fake messages from the server QuorumPacket qp = new QuorumPacket(Leader.SNAP, 0, null, null); oa.writeRecord(qp, null); sl.zk.getZKDatabase().serializeSnapshot(oa); oa.writeString("BenWasHere", "signature"); TxnHeader hdr = new TxnHeader(0, 0, 0, 0, ZooDefs.OpCode.create); CreateTxn txn = new CreateTxn("/foo", new byte[0], new ArrayList<ACL>(), false, sl.zk.getZKDatabase().getNode("/").stat.getCversion()); ByteArrayOutputStream tbaos = new ByteArrayOutputStream(); BinaryOutputArchive boa = BinaryOutputArchive.getArchive(tbaos); hdr.serialize(boa, "hdr"); txn.serialize(boa, "txn"); tbaos.close(); qp = new QuorumPacket(Leader.PROPOSAL, 1, tbaos.toByteArray(), null); oa.writeRecord(qp, null); // setup the messages to be streamed to follower sl.leaderIs = BinaryInputArchive.getArchive(new ByteArrayInputStream(baos.toByteArray())); try { sl.syncWithLeader(3); } catch(EOFException e) {} sl.zk.shutdown(); sl = new SimpleLearner(ftsl); Assert.assertEquals(startZxid, sl.zk.getLastProcessedZxid()); } finally { recursiveDelete(tmpFile); } }
public void testPopulatedLeaderConversation(PopulatedLeaderConversation conversation, int ops) throws Exception { Socket pair[] = getSocketPair(); Socket leaderSocket = pair[0]; Socket followerSocket = pair[1]; File tmpDir = File.createTempFile("test", "dir"); tmpDir.delete(); tmpDir.mkdir(); LeadThread leadThread = null; Leader leader = null; try { // Setup a database with two znodes FileTxnSnapLog snapLog = new FileTxnSnapLog(tmpDir, tmpDir); ZKDatabase zkDb = new ZKDatabase(snapLog); Assert.assertTrue(ops >= 1); long zxid = ZxidUtils.makeZxid(1, 0); for(int i = 1; i <= ops; i++){ zxid = ZxidUtils.makeZxid(1, i); String path = "/foo-"+ i; zkDb.processTxn(new TxnHeader(13,1000+i,zxid,30+i,ZooDefs.OpCode.create), new CreateTxn(path, "fpjwasalsohere".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, false, 1)); Stat stat = new Stat(); Assert.assertEquals("fpjwasalsohere", new String(zkDb.getData(path, stat, null))); } Assert.assertTrue(zxid > ZxidUtils.makeZxid(1, 0)); // Generate snapshot and close files. snapLog.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts()); snapLog.close(); QuorumPeer peer = createQuorumPeer(tmpDir); leader = createLeader(tmpDir, peer); peer.leader = leader; // Set the last accepted epoch and current epochs to be 1 peer.setAcceptedEpoch(1); peer.setCurrentEpoch(1); leadThread = new LeadThread(leader); leadThread.start(); while(leader.cnxAcceptor == null || !leader.cnxAcceptor.isAlive()) { Thread.sleep(20); } LearnerHandler lh = new LearnerHandler(leaderSocket, leader); lh.start(); leaderSocket.setSoTimeout(4000); InputArchive ia = BinaryInputArchive.getArchive(followerSocket .getInputStream()); OutputArchive oa = BinaryOutputArchive.getArchive(followerSocket .getOutputStream()); conversation.converseWithLeader(ia, oa, leader, zxid); } finally { if (leader != null) { leader.shutdown("end of test"); } if (leadThread != null) { leadThread.interrupt(); leadThread.join(); } recursiveDelete(tmpDir); } }
/** * process the transaction on the datatree * @param hdr the hdr of the transaction * @param dt the datatree to apply transaction to * @param sessions the sessions to be restored * @param txn the transaction to be applied */ public void processTransaction(TxnHeader hdr,DataTree dt, Map<Long, Integer> sessions, Record txn) throws KeeperException.NoNodeException { ProcessTxnResult rc; switch (hdr.getType()) { case OpCode.createSession: sessions.put(hdr.getClientId(), ((CreateSessionTxn) txn).getTimeOut()); if (LOG.isTraceEnabled()) { ZooTrace.logTraceMessage(LOG,ZooTrace.SESSION_TRACE_MASK, "playLog --- create session in log: " + Long.toHexString(hdr.getClientId()) + " with timeout: " + ((CreateSessionTxn) txn).getTimeOut()); } // give dataTree a chance to sync its lastProcessedZxid rc = dt.processTxn(hdr, txn); break; case OpCode.closeSession: sessions.remove(hdr.getClientId()); if (LOG.isTraceEnabled()) { ZooTrace.logTraceMessage(LOG,ZooTrace.SESSION_TRACE_MASK, "playLog --- close session in log: " + Long.toHexString(hdr.getClientId())); } rc = dt.processTxn(hdr, txn); break; default: rc = dt.processTxn(hdr, txn); } /** * Snapshots are taken lazily. It can happen that the child * znodes of a parent are modified (deleted or created) after the parent * is serialized. Therefore, while replaying logs during restore, a * delete/create might fail because the node was already * deleted/created. * * After seeing this failure, we should increment * the cversion of the parent znode since the parent was serialized * before its children. * * Note, such failures on DT should be seen only during * restore. */ if ((hdr.getType() == OpCode.create && rc.err == Code.NODEEXISTS.intValue()) && ((CreateTxn)txn).getParentCVersion() == -1) { LOG.debug("Failed Txn: " + hdr.getType() + " path:" + rc.path + " err: " + rc.err); int lastSlash = rc.path.lastIndexOf('/'); String parentName = rc.path.substring(0, lastSlash); try { dt.incrementCversion(parentName, hdr.getZxid()); } catch (KeeperException.NoNodeException e) { LOG.error("Failed to increment parent cversion for: " + parentName, e); throw e; } } else if (rc.err != Code.OK.intValue()) { LOG.debug("Ignoring processTxn failure hdr: " + hdr.getType() + " : error: " + rc.err); } }
public ProcessTxnResult processTxn(TxnHeader header, Record txn) { ProcessTxnResult rc = new ProcessTxnResult(); String debug = ""; try { rc.clientId = header.getClientId(); rc.cxid = header.getCxid(); rc.zxid = header.getZxid(); rc.type = header.getType(); rc.err = 0; if (rc.zxid > lastProcessedZxid) { lastProcessedZxid = rc.zxid; } switch (header.getType()) { case OpCode.create: CreateTxn createTxn = (CreateTxn) txn; debug = "Create transaction for " + createTxn.getPath(); rc.path = createTxn.getPath(); createNode( createTxn.getPath(), createTxn.getData(), createTxn.getAcl(), createTxn.getEphemeral() ? header.getClientId() : 0, createTxn.getParentCVersion(), header.getZxid(), header.getTime()); break; case OpCode.delete: DeleteTxn deleteTxn = (DeleteTxn) txn; debug = "Delete transaction for " + deleteTxn.getPath(); rc.path = deleteTxn.getPath(); deleteNode(deleteTxn.getPath(), header.getZxid()); break; case OpCode.setData: SetDataTxn setDataTxn = (SetDataTxn) txn; debug = "Set data for transaction for " + setDataTxn.getPath(); rc.stat = setData(setDataTxn.getPath(), setDataTxn .getData(), setDataTxn.getVersion(), header .getZxid(), header.getTime()); break; case OpCode.setACL: SetACLTxn setACLTxn = (SetACLTxn) txn; debug = "Set ACL for transaction for " + setACLTxn.getPath(); rc.stat = setACL(setACLTxn.getPath(), setACLTxn.getAcl(), setACLTxn.getVersion()); break; case OpCode.closeSession: killSession(header.getClientId(), header.getZxid()); break; case OpCode.error: ErrorTxn errTxn = (ErrorTxn) txn; rc.err = errTxn.getErr(); break; } } catch (KeeperException e) { LOG.debug("Failed: " + debug, e); rc.err = e.code().intValue(); } return rc; }