@SuppressWarnings("deprecation") private static ImmutableSet<FSEditLogOpCodes> skippedOps() { ImmutableSet.Builder<FSEditLogOpCodes> b = ImmutableSet.builder(); // Deprecated opcodes b.add(FSEditLogOpCodes.OP_DATANODE_ADD) .add(FSEditLogOpCodes.OP_DATANODE_REMOVE) .add(FSEditLogOpCodes.OP_SET_NS_QUOTA) .add(FSEditLogOpCodes.OP_CLEAR_NS_QUOTA) .add(FSEditLogOpCodes.OP_SET_GENSTAMP_V1); // Cannot test delegation token related code in insecure set up b.add(FSEditLogOpCodes.OP_GET_DELEGATION_TOKEN) .add(FSEditLogOpCodes.OP_RENEW_DELEGATION_TOKEN) .add(FSEditLogOpCodes.OP_CANCEL_DELEGATION_TOKEN); // Skip invalid opcode b.add(FSEditLogOpCodes.OP_INVALID); return b.build(); }
@SuppressWarnings("deprecation") private static ImmutableSet<FSEditLogOpCodes> skippedOps() { ImmutableSet.Builder<FSEditLogOpCodes> b = ImmutableSet .<FSEditLogOpCodes> builder(); // Deprecated opcodes b.add(FSEditLogOpCodes.OP_DATANODE_ADD) .add(FSEditLogOpCodes.OP_DATANODE_REMOVE) .add(FSEditLogOpCodes.OP_SET_NS_QUOTA) .add(FSEditLogOpCodes.OP_CLEAR_NS_QUOTA) .add(FSEditLogOpCodes.OP_SET_GENSTAMP_V1); // Cannot test delegation token related code in insecure set up b.add(FSEditLogOpCodes.OP_GET_DELEGATION_TOKEN) .add(FSEditLogOpCodes.OP_RENEW_DELEGATION_TOKEN) .add(FSEditLogOpCodes.OP_CANCEL_DELEGATION_TOKEN); // Skip invalid opcode b.add(FSEditLogOpCodes.OP_INVALID); return b.build(); }
/** * Increment the op code counter * * @param opCode opCode for which to increment count */ private void incrementOpCodeCount(FSEditLogOpCodes opCode) { if(!opCodeCount.containsKey(opCode)) { opCodeCount.put(opCode, 0L); } Long newValue = opCodeCount.get(opCode) + 1; opCodeCount.put(opCode, newValue); }
/** * Get the statistics in string format, suitable for printing * * @return statistics in in string format, suitable for printing */ public String getStatisticsString() { StringBuffer sb = new StringBuffer(); sb.append(String.format( " %-30.30s : %d%n", "VERSION", version)); for(FSEditLogOpCodes opCode : FSEditLogOpCodes.values()) { sb.append(String.format( " %-30.30s (%3d): %d%n", opCode.toString(), opCode.getOpCode(), opCodeCount.get(opCode))); } return sb.toString(); }
/** * Verify that the given list of streams contains exactly the range of * transactions specified, inclusive. */ public static void verifyEdits(List<EditLogInputStream> streams, int firstTxnId, int lastTxnId) throws IOException { Iterator<EditLogInputStream> iter = streams.iterator(); assertTrue(iter.hasNext()); EditLogInputStream stream = iter.next(); for (int expected = firstTxnId; expected <= lastTxnId; expected++) { FSEditLogOp op = stream.readOp(); while (op == null) { assertTrue("Expected to find txid " + expected + ", " + "but no more streams available to read from", iter.hasNext()); stream = iter.next(); op = stream.readOp(); } assertEquals(FSEditLogOpCodes.OP_MKDIR, op.opCode); assertEquals(expected, op.getTransactionId()); } assertNull(stream.readOp()); assertFalse("Expected no more txns after " + lastTxnId + " but more streams are available", iter.hasNext()); }
/** * Compare two files, ignore trailing zeros at the end, for edits log the * trailing zeros do not make any difference, throw exception is the files are * not same * * @param filenameSmall first file to compare (doesn't have to be smaller) * @param filenameLarge second file to compare (doesn't have to be larger) */ private boolean filesEqualIgnoreTrailingZeros(String filenameSmall, String filenameLarge) throws IOException { ByteBuffer small = ByteBuffer.wrap(DFSTestUtil.loadFile(filenameSmall)); ByteBuffer large = ByteBuffer.wrap(DFSTestUtil.loadFile(filenameLarge)); // now correct if it's otherwise if (small.capacity() > large.capacity()) { ByteBuffer tmpByteBuffer = small; small = large; large = tmpByteBuffer; String tmpFilename = filenameSmall; filenameSmall = filenameLarge; filenameLarge = tmpFilename; } // compare from 0 to capacity of small // the rest of the large should be all zeros small.position(0); small.limit(small.capacity()); large.position(0); large.limit(small.capacity()); // compares position to limit if (!small.equals(large)) { return false; } // everything after limit should be 0xFF int i = large.limit(); large.clear(); for (; i < large.capacity(); i++) { if (large.get(i) != FSEditLogOpCodes.OP_INVALID.getOpCode()) { return false; } } return true; }
/** * Compare two files, ignore trailing zeros at the end, * for edits log the trailing zeros do not make any difference, * throw exception is the files are not same * * @param filenameSmall first file to compare (doesn't have to be smaller) * @param filenameLarge second file to compare (doesn't have to be larger) */ private boolean filesEqualIgnoreTrailingZeros(String filenameSmall, String filenameLarge) throws IOException { ByteBuffer small = ByteBuffer.wrap(DFSTestUtil.loadFile(filenameSmall)); ByteBuffer large = ByteBuffer.wrap(DFSTestUtil.loadFile(filenameLarge)); // now correct if it's otherwise if(small.capacity() > large.capacity()) { ByteBuffer tmpByteBuffer = small; small = large; large = tmpByteBuffer; String tmpFilename = filenameSmall; filenameSmall = filenameLarge; filenameLarge = tmpFilename; } // compare from 0 to capacity of small // the rest of the large should be all zeros small.position(0); small.limit(small.capacity()); large.position(0); large.limit(small.capacity()); // compares position to limit if(!small.equals(large)) { return false; } // everything after limit should be 0xFF int i = large.limit(); large.clear(); for(; i < large.capacity(); i++) { if(large.get(i) != FSEditLogOpCodes.OP_INVALID.getOpCode()) { return false; } } return true; }
/** * For each operation read from the stream, check if this is a closing * transaction. If so, we are sure we need to move to the next segment. * * We also mark that this is the most recent time, we read something valid * from the input. */ private void updateState(FSEditLogOp op, boolean checkTxnId) throws IOException { InjectionHandler.processEvent(InjectionEvent.SERVERLOGREADER_UPDATE, op); if (checkTxnId) { mostRecentlyReadTransactionTxId = ServerLogReaderUtil.checkTransactionId( mostRecentlyReadTransactionTxId, op); } updateStreamPosition(); // read a valid operation core.getMetrics().readOperations.inc(); mostRecentlyReadTransactionTime = now(); // current log segment ends normally if (op.opCode == FSEditLogOpCodes.OP_END_LOG_SEGMENT) { LOG.info("Segment - ending log segment start txid: " + currentSegmentTxId + ", end txid: " + op.getTransactionId()); // move forward with next segment currentSegmentTxId = op.getTransactionId() + 1; // set the stream to null so the next getNotification() // will recreate it currentEditLogInputStream = null; // indicate that a new stream will be opened currentEditLogInputStreamPosition = -1; } else if (op.opCode == FSEditLogOpCodes.OP_START_LOG_SEGMENT) { LOG.info("Segment - starting log segment start txid: " + currentSegmentTxId); } }