public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) { ProcessTxnResult rc; int opCode = hdr.getType(); long sessionId = hdr.getClientId(); rc = getZKDatabase().processTxn(hdr, txn); if (opCode == OpCode.createSession) { if (txn instanceof CreateSessionTxn) { CreateSessionTxn cst = (CreateSessionTxn) txn; sessionTracker.addSession(sessionId, cst .getTimeOut()); } else { LOG.warn("*****>>>>> Got " + txn.getClass() + " " + txn.toString()); } } else if (opCode == OpCode.closeSession) { sessionTracker.removeSession(sessionId); } return rc; }
/** * Applies transaction to this <code>DataState</code>. Zxid of transaction * <code>t</code> is not checked, and it is possible to apply out of order * transactions using this method. User should take care about consistency. * * @param t Transaction to be applied. */ public void processTransaction(Transaction t) { TxnHeader hdr = t.getTxnHeader(); Record txn = t.getTxnRecord(); //there should be a check for put and remove operations switch (hdr.getType()) { case ZooDefs.OpCode.createSession: sessions.put(hdr.getClientId(), ((CreateSessionTxn) txn).getTimeOut()); break; case ZooDefs.OpCode.closeSession: sessions.remove(hdr.getClientId()); break; } dt.processTxn(hdr, txn); }
/** * play the log from this logstream into the datatree * @param logStream * @return * @throws IOException */ public long playLog(InputArchive logStream) throws IOException { long highestZxid = 0; try { while (true) { byte[] bytes = logStream.readBuffer("txnEntry"); if (bytes.length == 0) { // Since we preallocate, we define EOF to be an // empty transaction throw new EOFException(); } TxnHeader hdr = new TxnHeader(); Record txn = SerializeUtils.deserializeTxn(bytes, hdr); if (logStream.readByte("EOR") != 'B') { LOG.warn("Last transaction was partial."); throw new EOFException("Last transaction was partial."); } if (hdr.getZxid() <= highestZxid && highestZxid != 0) { LOG.error(highestZxid + "(higestZxid) >= " + hdr.getZxid() + "(next log) for type " + hdr.getType()); } else { highestZxid = hdr.getZxid(); } switch (hdr.getType()) { case OpCode.createSession: sessionsWithTimeouts.put(hdr.getClientId(), ((CreateSessionTxn) txn).getTimeOut()); if (LOG.isTraceEnabled()) { ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK, "playLog --- create session in log: 0x" + Long.toHexString(hdr.getClientId()) + " with timeout: " + ((CreateSessionTxn) txn).getTimeOut()); } // give dataTree a chance to sync its lastProcessedZxid oldDataTree.processTxn(hdr, txn); break; case OpCode.closeSession: sessionsWithTimeouts.remove(hdr.getClientId()); if (LOG.isTraceEnabled()) { ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK, "playLog --- close session in log: 0x" + Long.toHexString(hdr.getClientId())); } oldDataTree.processTxn(hdr, txn); break; default: oldDataTree.processTxn(hdr, txn); } Request r = new Request(null, 0, hdr.getCxid(), hdr.getType(), null, null); r.txn = txn; r.hdr = hdr; r.zxid = hdr.getZxid(); } } catch (EOFException e) { // expected in some cases - see comments in try block } return highestZxid; }
/** * process the transaction on the datatree * @param hdr the hdr of the transaction * @param dt the datatree to apply transaction to * @param sessions the sessions to be restored * @param txn the transaction to be applied */ public void processTransaction(TxnHeader hdr,DataTree dt, Map<Long, Integer> sessions, Record txn) throws KeeperException.NoNodeException { ProcessTxnResult rc; switch (hdr.getType()) { case OpCode.createSession: sessions.put(hdr.getClientId(), ((CreateSessionTxn) txn).getTimeOut()); if (LOG.isTraceEnabled()) { ZooTrace.logTraceMessage(LOG,ZooTrace.SESSION_TRACE_MASK, "playLog --- create session in log: 0x" + Long.toHexString(hdr.getClientId()) + " with timeout: " + ((CreateSessionTxn) txn).getTimeOut()); } // give dataTree a chance to sync its lastProcessedZxid rc = dt.processTxn(hdr, txn); break; case OpCode.closeSession: sessions.remove(hdr.getClientId()); if (LOG.isTraceEnabled()) { ZooTrace.logTraceMessage(LOG,ZooTrace.SESSION_TRACE_MASK, "playLog --- close session in log: 0x" + Long.toHexString(hdr.getClientId())); } rc = dt.processTxn(hdr, txn); break; default: rc = dt.processTxn(hdr, txn); } /** * Snapshots are lazily created. So when a snapshot is in progress, * there is a chance for later transactions to make into the * snapshot. Then when the snapshot is restored, NONODE/NODEEXISTS * errors could occur. It should be safe to ignore these. */ if (rc.err != Code.OK.intValue()) { LOG.debug("Ignoring processTxn failure hdr:" + hdr.getType() + ", error: " + rc.err + ", path: " + rc.path); } }
/** * process the transaction on the datatree * @param hdr the hdr of the transaction * @param dt the datatree to apply transaction to * @param sessions the sessions to be restored * @param txn the transaction to be applied */ public void processTransaction(TxnHeader hdr,DataTree dt, Map<Long, Integer> sessions, Record txn) throws KeeperException.NoNodeException { ProcessTxnResult rc; switch (hdr.getType()) { case OpCode.createSession: sessions.put(hdr.getClientId(), ((CreateSessionTxn) txn).getTimeOut()); if (LOG.isTraceEnabled()) { ZooTrace.logTraceMessage(LOG,ZooTrace.SESSION_TRACE_MASK, "playLog --- create session in log: 0x" + Long.toHexString(hdr.getClientId()) + " with timeout: " + ((CreateSessionTxn) txn).getTimeOut()); } // give dataTree a chance to sync its lastProcessedZxid rc = dt.processTxn(hdr, txn); break; case OpCode.closeSession: sessions.remove(hdr.getClientId()); if (LOG.isTraceEnabled()) { ZooTrace.logTraceMessage(LOG,ZooTrace.SESSION_TRACE_MASK, "playLog --- close session in log: 0x" + Long.toHexString(hdr.getClientId())); } rc = dt.processTxn(hdr, txn); break; default: rc = dt.processTxn(hdr, txn); } /** * Snapshots are lazily created. So when a snapshot is in progress, * there is a chance for later transactions to make into the * snapshot. Then when the snapshot is restored, NONODE/NODEEXISTS * errors could occur. It should be safe to ignore these. */ if (rc.err != Code.OK.intValue()) { LOG.debug( "Ignoring processTxn failure hdr: {}, error: {}, path: {}", hdr.getType(), rc.err, rc.path); } }
/** * process the transaction on the datatree * @param hdr the hdr of the transaction * @param dt the datatree to apply transaction to * @param sessions the sessions to be restored * @param txn the transaction to be applied */ public void processTransaction(TxnHeader hdr,DataTree dt, Map<Long, Integer> sessions, Record txn) throws KeeperException.NoNodeException { ProcessTxnResult rc; switch (hdr.getType()) { case OpCode.createSession: sessions.put(hdr.getClientId(), ((CreateSessionTxn) txn).getTimeOut()); if (LOG.isTraceEnabled()) { ZooTrace.logTraceMessage(LOG,ZooTrace.SESSION_TRACE_MASK, "playLog --- create session in log: 0x" + Long.toHexString(hdr.getClientId()) + " with timeout: " + ((CreateSessionTxn) txn).getTimeOut()); } // give dataTree a chance to sync its lastProcessedZxid rc = dt.processTxn(hdr, txn); break; case OpCode.closeSession: sessions.remove(hdr.getClientId()); if (LOG.isTraceEnabled()) { ZooTrace.logTraceMessage(LOG,ZooTrace.SESSION_TRACE_MASK, "playLog --- close session in log: 0x" + Long.toHexString(hdr.getClientId())); } rc = dt.processTxn(hdr, txn); break; default: rc = dt.processTxn(hdr, txn); } /** * This should never happen. A NONODE can never show up in the * transaction logs. This is more indicative of a corrupt transaction * log. Refer ZOOKEEPER-1333 for more info. */ if (rc.err != Code.OK.intValue()) { if (hdr.getType() == OpCode.create && rc.err == Code.NONODE.intValue()) { int lastSlash = rc.path.lastIndexOf('/'); String parentName = rc.path.substring(0, lastSlash); LOG.error("Parent {} missing for {}", parentName, rc.path); throw new KeeperException.NoNodeException(parentName); } else { LOG.debug("Ignoring processTxn failure hdr: " + hdr.getType() + " : error: " + rc.err); } } }
/** * process the transaction on the datatree * @param hdr the hdr of the transaction * @param dt the datatree to apply transaction to * @param sessions the sessions to be restored * @param txn the transaction to be applied */ public void processTransaction(TxnHeader hdr,DataTree dt, Map<Long, Integer> sessions, Record txn) throws KeeperException.NoNodeException { ProcessTxnResult rc; switch (hdr.getType()) { case OpCode.createSession: sessions.put(hdr.getClientId(), ((CreateSessionTxn) txn).getTimeOut()); if (LOG.isTraceEnabled()) { ZooTrace.logTraceMessage(LOG,ZooTrace.SESSION_TRACE_MASK, "playLog --- create session in log: " + Long.toHexString(hdr.getClientId()) + " with timeout: " + ((CreateSessionTxn) txn).getTimeOut()); } // give dataTree a chance to sync its lastProcessedZxid rc = dt.processTxn(hdr, txn); break; case OpCode.closeSession: sessions.remove(hdr.getClientId()); if (LOG.isTraceEnabled()) { ZooTrace.logTraceMessage(LOG,ZooTrace.SESSION_TRACE_MASK, "playLog --- close session in log: " + Long.toHexString(hdr.getClientId())); } rc = dt.processTxn(hdr, txn); break; default: rc = dt.processTxn(hdr, txn); } /** * Snapshots are taken lazily. It can happen that the child * znodes of a parent are modified (deleted or created) after the parent * is serialized. Therefore, while replaying logs during restore, a * delete/create might fail because the node was already * deleted/created. * * After seeing this failure, we should increment * the cversion of the parent znode since the parent was serialized * before its children. * * Note, such failures on DT should be seen only during * restore. */ if ((hdr.getType() == OpCode.create && rc.err == Code.NODEEXISTS.intValue()) && ((CreateTxn)txn).getParentCVersion() == -1) { LOG.debug("Failed Txn: " + hdr.getType() + " path:" + rc.path + " err: " + rc.err); int lastSlash = rc.path.lastIndexOf('/'); String parentName = rc.path.substring(0, lastSlash); try { dt.incrementCversion(parentName, hdr.getZxid()); } catch (KeeperException.NoNodeException e) { LOG.error("Failed to increment parent cversion for: " + parentName, e); throw e; } } else if (rc.err != Code.OK.intValue()) { LOG.debug("Ignoring processTxn failure hdr: " + hdr.getType() + " : error: " + rc.err); } }
/** * play the log from this logstream into the datatree * @param logStream * @return * @throws IOException */ public long playLog(InputArchive logStream) throws IOException { long highestZxid = 0; try { while (true) { byte[] bytes = logStream.readBuffer("txnEntry"); if (bytes.length == 0) { // Since we preallocate, we define EOF to be an // empty transaction throw new EOFException(); } InputArchive ia = BinaryInputArchive .getArchive(new ByteArrayInputStream(bytes)); TxnHeader hdr = new TxnHeader(); Record txn = SerializeUtils.deserializeTxn(ia, hdr); if (logStream.readByte("EOR") != 'B') { LOG.warn("Last transaction was partial."); throw new EOFException("Last transaction was partial."); } if (hdr.getZxid() <= highestZxid && highestZxid != 0) { LOG.error(highestZxid + "(higestZxid) >= " + hdr.getZxid() + "(next log) for type " + hdr.getType()); } else { highestZxid = hdr.getZxid(); } switch (hdr.getType()) { case OpCode.createSession: sessionsWithTimeouts.put(hdr.getClientId(), ((CreateSessionTxn) txn).getTimeOut()); if (LOG.isTraceEnabled()) { ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK, "playLog --- create session in log: 0x" + Long.toHexString(hdr.getClientId()) + " with timeout: " + ((CreateSessionTxn) txn).getTimeOut()); } // give dataTree a chance to sync its lastProcessedZxid oldDataTree.processTxn(hdr, txn); break; case OpCode.closeSession: sessionsWithTimeouts.remove(hdr.getClientId()); if (LOG.isTraceEnabled()) { ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK, "playLog --- close session in log: 0x" + Long.toHexString(hdr.getClientId())); } oldDataTree.processTxn(hdr, txn); break; default: oldDataTree.processTxn(hdr, txn); } Request r = new Request(null, 0, hdr.getCxid(), hdr.getType(), null, null); r.txn = txn; r.hdr = hdr; r.zxid = hdr.getZxid(); } } catch (EOFException e) { // expected in some cases - see comments in try block } return highestZxid; }
/** * process the transaction on the datatree * @param hdr the hdr of the transaction * @param dt the datatree to apply transaction to * @param sessions the sessions to be restored * @param txn the transaction to be applied */ public void processTransaction(TxnHeader hdr,DataTree dt, Map<Long, Integer> sessions, Record txn) throws KeeperException.NoNodeException { ProcessTxnResult rc; switch (hdr.getType()) { case OpCode.createSession: sessions.put(hdr.getClientId(), ((CreateSessionTxn) txn).getTimeOut()); if (LOG.isTraceEnabled()) { ZooTrace.logTraceMessage(LOG,ZooTrace.SESSION_TRACE_MASK, "playLog --- create session in log: " + Long.toHexString(hdr.getClientId()) + " with timeout: " + ((CreateSessionTxn) txn).getTimeOut()); } // give dataTree a chance to sync its lastProcessedZxid rc = dt.processTxn(hdr, txn); break; case OpCode.closeSession: sessions.remove(hdr.getClientId()); if (LOG.isTraceEnabled()) { ZooTrace.logTraceMessage(LOG,ZooTrace.SESSION_TRACE_MASK, "playLog --- close session in log: " + Long.toHexString(hdr.getClientId())); } rc = dt.processTxn(hdr, txn); break; default: rc = dt.processTxn(hdr, txn); } /** * Snapshots are taken lazily. It can happen that the child * znodes of a parent are modified (deleted or created) after the parent * is serialized. Therefore, while replaying logs during restore, a * delete/create might fail because the node was already * deleted/created. * * After seeing this failure, we should increment * the cversion of the parent znode since the parent was serialized * before its children. * * Note, such failures on DT should be seen only during * restore. */ if ((hdr.getType() == OpCode.delete && rc.err == Code.NONODE.intValue()) || (hdr.getType() == OpCode.create && rc.err == Code.NODEEXISTS.intValue())) { LOG.debug("Failed Txn: " + hdr.getType() + " path:" + rc.path + " err: " + rc.err); int lastSlash = rc.path.lastIndexOf('/'); String parentName = rc.path.substring(0, lastSlash); try { dt.incrementCversion(parentName, hdr.getZxid()); } catch (KeeperException.NoNodeException e) { LOG.error("Failed to increment parent cversion for: " + parentName, e); throw e; } } else if (rc.err != Code.OK.intValue()) { LOG.debug("Ignoring processTxn failure hdr: " + hdr.getType() + " : error: " + rc.err); } }