private static void readSnapshotLog(String snapshotPath) throws Exception { FileInputStream fis = new FileInputStream(snapshotPath); BinaryInputArchive ia = BinaryInputArchive.getArchive(fis); Map<Long, Integer> sessions = new HashMap<Long, Integer>(); DataTree dt = new DataTree(); FileHeader header = new FileHeader(); header.deserialize(ia, "fileheader"); if (header.getMagic() != FileSnap.SNAP_MAGIC) { throw new IOException("mismatching magic headers " + header.getMagic() + " != " + FileSnap.SNAP_MAGIC); } SerializeUtils.deserializeSnapshot(dt, ia, sessions); if (bw != null) { bw.write(sessions.toString()); bw.newLine(); } else { System.out.println(sessions); } traverse(dt, 1, "/"); }
/** * Controls the response of an observer to the receipt of a quorumpacket * @param qp * @throws IOException */ protected void processPacket(QuorumPacket qp) throws IOException{ switch (qp.getType()) { case Leader.PING: ping(qp); break; case Leader.PROPOSAL: LOG.warn("Ignoring proposal"); break; case Leader.COMMIT: LOG.warn("Ignoring commit"); break; case Leader.UPTODATE: LOG.error("Received an UPTODATE message after Observer started"); break; case Leader.REVALIDATE: revalidate(qp); break; case Leader.SYNC: ((ObserverZooKeeperServer)zk).sync(); break; case Leader.INFORM: TxnHeader hdr = new TxnHeader(); Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr); Request request = new Request (null, hdr.getClientId(), hdr.getCxid(), hdr.getType(), null, null); request.txn = txn; request.hdr = hdr; ObserverZooKeeperServer obs = (ObserverZooKeeperServer)zk; obs.commitRequest(request); break; } }
/** * Examine the packet received in qp and dispatch based on its contents. * @param qp * @throws IOException */ protected void processPacket(QuorumPacket qp) throws IOException{ switch (qp.getType()) { case Leader.PING: ping(qp); break; case Leader.PROPOSAL: TxnHeader hdr = new TxnHeader(); Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr); if (hdr.getZxid() != lastQueued + 1) { LOG.warn("Got zxid 0x" + Long.toHexString(hdr.getZxid()) + " expected 0x" + Long.toHexString(lastQueued + 1)); } lastQueued = hdr.getZxid(); fzk.logRequest(hdr, txn); break; case Leader.COMMIT: fzk.commit(qp.getZxid()); break; case Leader.UPTODATE: LOG.error("Received an UPTODATE message after Follower started"); break; case Leader.REVALIDATE: revalidate(qp); break; case Leader.SYNC: fzk.sync(); break; } }
/** * deserialize the datatree from an inputarchive * @param dt the datatree to be serialized into * @param sessions the sessions to be filled up * @param ia the input archive to restore from * @throws IOException */ public void deserialize(DataTree dt, Map<Long, Integer> sessions, InputArchive ia) throws IOException { FileHeader header = new FileHeader(); header.deserialize(ia, "fileheader"); if (header.getMagic() != SNAP_MAGIC) { throw new IOException("mismatching magic headers " + header.getMagic() + " != " + FileSnap.SNAP_MAGIC); } SerializeUtils.deserializeSnapshot(dt,ia,sessions); }
/** * serialize the datatree and sessions * @param dt the datatree to be serialized * @param sessions the sessions to be serialized * @param oa the output archive to serialize into * @param header the header of this snapshot * @throws IOException */ protected void serialize(DataTree dt,Map<Long, Integer> sessions, OutputArchive oa, FileHeader header) throws IOException { // this is really a programmatic error and not something that can // happen at runtime if(header==null) throw new IllegalStateException( "Snapshot's not open for writing: uninitialized header"); header.serialize(oa, "fileheader"); SerializeUtils.serializeSnapshot(dt,oa,sessions); }
/** * the iterator that moves to the next transaction * @return true if there is more transactions to be read * false if not. */ public boolean next() throws IOException { if (ia == null) { return false; } try { long crcValue = ia.readLong("crcvalue"); byte[] bytes = Util.readTxnBytes(ia); // Since we preallocate, we define EOF to be an if (bytes == null || bytes.length==0) { throw new EOFException("Failed to read " + logFile); } // EOF or corrupted record // validate CRC Checksum crc = makeChecksumAlgorithm(); crc.update(bytes, 0, bytes.length); if (crcValue != crc.getValue()) throw new IOException(CRC_ERROR); if (bytes == null || bytes.length == 0) return false; hdr = new TxnHeader(); record = SerializeUtils.deserializeTxn(bytes, hdr); } catch (EOFException e) { LOG.debug("EOF excepton " + e); inputStream.close(); inputStream = null; ia = null; hdr = null; // this means that the file has ended // we should go to the next file if (!goToNextLog()) { return false; } // if we went to the next log file, we should call next() again return next(); } return true; }
/** * the iterator that moves to the next transaction * @return true if there is more transactions to be read * false if not. */ public boolean next() throws IOException { if (ia == null) { return false; } try { long crcValue = ia.readLong("crcvalue"); byte[] bytes = Util.readTxnBytes(ia); // Since we preallocate, we define EOF to be an if (bytes == null || bytes.length==0) throw new EOFException("Failed to read"); // EOF or corrupted record // validate CRC Checksum crc = makeChecksumAlgorithm(); crc.update(bytes, 0, bytes.length); if (crcValue != crc.getValue()) throw new IOException(CRC_ERROR); if (bytes == null || bytes.length == 0) return false; hdr = new TxnHeader(); record = SerializeUtils.deserializeTxn(bytes, hdr); } catch (EOFException e) { LOG.debug("EOF excepton " + e); inputStream.close(); inputStream = null; ia = null; hdr = null; // this means that the file has ended // we shoud go to the next file if (!goToNextLog()) { return false; } // if we went to the next log file, we should call next() again return next(); } return true; }
/** * deserialize the datatree from an inputarchive * @param dt the datatree to be serialized into * @param sessions the sessions to be filled up * @param ia the input archive to restore from * @throws IOException */ public void deserialize(DataTree dt, Map<Long, Integer> sessions, InputArchive ia) throws IOException { FileHeader header = new FileHeader(); // 反序列化fileheader header.deserialize(ia, "fileheader"); // 校验魔数 if (header.getMagic() != SNAP_MAGIC) { throw new IOException("mismatching magic headers " + header.getMagic() + " != " + FileSnap.SNAP_MAGIC); } // 反序列化内存快照 SerializeUtils.deserializeSnapshot(dt,ia,sessions); }