/** * Constructor that creates a connection to the local ZooKeeper ensemble. * @param conf Configuration to use * @throws IOException if an internal replication error occurs * @throws RuntimeException if replication isn't enabled. */ public ReplicationAdmin(Configuration conf) throws IOException { if (!conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY, HConstants.REPLICATION_ENABLE_DEFAULT)) { throw new RuntimeException("hbase.replication isn't true, please " + "enable it in order to use replication"); } this.connection = ConnectionFactory.createConnection(conf); zkw = createZooKeeperWatcher(); try { this.replicationPeers = ReplicationFactory.getReplicationPeers(zkw, conf, this.connection); this.replicationPeers.init(); this.replicationQueuesClient = ReplicationFactory.getReplicationQueuesClient(zkw, conf, this.connection); this.replicationQueuesClient.init(); } catch (ReplicationException e) { throw new IOException("Error initializing the replication admin client.", e); } }
/**
 * Constructor that creates a connection to the local ZooKeeper ensemble.
 * <p>
 * If any step after the connection has been obtained fails, the ZooKeeper watcher (when it was
 * created) and the connection are closed before the failure is propagated, so a failed
 * construction does not leave them open.
 * @param conf Configuration to use
 * @throws IOException if an internal replication error occurs
 * @throws RuntimeException if replication isn't enabled.
 */
public ReplicationAdmin(Configuration conf) throws IOException {
  if (!conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY,
      HConstants.REPLICATION_ENABLE_DEFAULT)) {
    throw new RuntimeException("hbase.replication isn't true, please "
        + "enable it in order to use replication");
  }
  this.connection = HConnectionManager.getConnection(conf);
  // Flipped to true only once every component is ready; drives the cleanup in the finally block.
  boolean initialized = false;
  ZooKeeperWatcher zkw = null;
  try {
    zkw = createZooKeeperWatcher();
    this.replicationPeers = ReplicationFactory.getReplicationPeers(zkw, conf, this.connection);
    this.replicationPeers.init();
    this.replicationQueuesClient =
        ReplicationFactory.getReplicationQueuesClient(zkw, conf, this.connection);
    this.replicationQueuesClient.init();
    initialized = true;
    // NOTE(review): on success the watcher is only reachable through the peers/queues client
    // created above; this constructor keeps no reference of its own for a later close.
  } catch (ReplicationException e) {
    throw new IOException("Error initializing the replication admin client.", e);
  } finally {
    if (!initialized) {
      if (zkw != null) {
        zkw.close();
      }
      try {
        this.connection.close();
      } catch (IOException ignored) {
        // Best-effort cleanup: the initialization failure already propagating is the error the
        // caller needs to see, so a secondary close failure must not replace it.
      }
    }
  }
}
@Test public void testCleanupFailoverQueues() throws Exception { final Server server = new DummyServer("hostname1.example.org"); ReplicationQueues rq = ReplicationFactory.getReplicationQueues(server.getZooKeeper(), server.getConfiguration(), server); rq.init(server.getServerName().toString()); // populate some znodes in the peer znode SortedSet<String> files = new TreeSet<String>(); String group = "testgroup"; String file1 = group + ".log1"; String file2 = group + ".log2"; files.add(file1); files.add(file2); for (String file : files) { rq.addLog("1", file); } Server s1 = new DummyServer("dummyserver1.example.org"); ReplicationQueues rq1 = ReplicationFactory.getReplicationQueues(s1.getZooKeeper(), s1.getConfiguration(), s1); rq1.init(s1.getServerName().toString()); ReplicationPeers rp1 = ReplicationFactory.getReplicationPeers(s1.getZooKeeper(), s1.getConfiguration(), s1); rp1.init(); NodeFailoverWorker w1 = manager.new NodeFailoverWorker(server.getServerName().getServerName(), rq1, rp1, new UUID( new Long(1), new Long(2))); w1.start(); w1.join(5000); assertEquals(1, manager.getWalsByIdRecoveredQueues().size()); String id = "1-" + server.getServerName().getServerName(); assertEquals(files, manager.getWalsByIdRecoveredQueues().get(id).get(group)); manager.cleanOldLogs(file2, id, true); // log1 should be deleted assertEquals(Sets.newHashSet(file2), manager.getWalsByIdRecoveredQueues().get(id).get(group)); }
@Test public void testFailoverDeadServerCversionChange() throws Exception { LOG.debug("testFailoverDeadServerCversionChange"); conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true); final Server s0 = new DummyServer("cversion-change0.example.org"); ReplicationQueues repQueues = ReplicationFactory.getReplicationQueues(s0.getZooKeeper(), conf, s0); repQueues.init(s0.getServerName().toString()); // populate some znodes in the peer znode files.add("log1"); files.add("log2"); for (String file : files) { repQueues.addLog("1", file); } // simulate queue transfer Server s1 = new DummyServer("cversion-change1.example.org"); ReplicationQueues rq1 = ReplicationFactory.getReplicationQueues(s1.getZooKeeper(), s1.getConfiguration(), s1); rq1.init(s1.getServerName().toString()); ReplicationQueuesClient client = ReplicationFactory.getReplicationQueuesClient(s1.getZooKeeper(), s1.getConfiguration(), s1); int v0 = client.getQueuesZNodeCversion(); rq1.claimQueues(s0.getServerName().getServerName()); int v1 = client.getQueuesZNodeCversion(); // cversion should increased by 1 since a child node is deleted assertEquals(v0 + 1, v1); s0.abort("", null); }
public DummyNodeFailoverWorker(String znode, Server s) throws Exception { this.deadRsZnode = znode; this.server = s; this.rq = ReplicationFactory.getReplicationQueues(server.getZooKeeper(), server.getConfiguration(), server); this.rq.init(this.server.getServerName().toString()); }
/**
 * Looks up the configuration of the replication peer identified by {@code peerId} and returns
 * the ZooKeeper cluster key derived from it.
 * <p>
 * A temporary ZooKeeper watcher (with a no-op {@link Abortable}) is opened for the lookup and
 * is always closed before this method returns or throws.
 * @param conf configuration used to open the local ZooKeeper watcher and replication peers
 * @return the ZooKeeper cluster key of the peer's configuration
 * @throws IOException if the peer configuration cannot be found, or if the replication peers
 *           fail with a {@code ReplicationException}
 */
private static String getPeerQuorumAddress(final Configuration conf) throws IOException {
  ZooKeeperWatcher localZKW = null;
  try {
    localZKW = new ZooKeeperWatcher(conf, "VerifyReplication", new Abortable() {
      @Override
      public void abort(String why, Throwable e) {
      }

      @Override
      public boolean isAborted() {
        return false;
      }
    });

    ReplicationPeers rp = ReplicationFactory.getReplicationPeers(localZKW, conf, localZKW);
    rp.init();

    Pair<ReplicationPeerConfig, Configuration> pair = rp.getPeerConf(peerId);
    if (pair == null) {
      throw new IOException("Couldn't get peer conf!");
    }
    // Use the pair that was just null-checked instead of asking the peers a second time: a
    // second lookup is redundant and its result would be dereferenced without a null check.
    Configuration peerConf = pair.getSecond();
    return ZKUtil.getZooKeeperClusterKey(peerConf);
  } catch (ReplicationException e) {
    throw new IOException(
        "An error occured while trying to connect to the remove peer cluster", e);
  } finally {
    if (localZKW != null) {
      localZKW.close();
    }
  }
}
@Test public void testCleanupFailoverQueues() throws Exception { final Server server = new DummyServer("hostname1.example.org"); ReplicationQueues rq = ReplicationFactory.getReplicationQueues(server.getZooKeeper(), server.getConfiguration(), server); rq.init(server.getServerName().toString()); // populate some znodes in the peer znode SortedSet<String> files = new TreeSet<String>(); files.add("log1"); files.add("log2"); for (String file : files) { rq.addLog("1", file); } Server s1 = new DummyServer("dummyserver1.example.org"); ReplicationQueues rq1 = ReplicationFactory.getReplicationQueues(s1.getZooKeeper(), s1.getConfiguration(), s1); rq1.init(s1.getServerName().toString()); ReplicationPeers rp1 = ReplicationFactory.getReplicationPeers(s1.getZooKeeper(), s1.getConfiguration(), s1); rp1.init(); NodeFailoverWorker w1 = manager.new NodeFailoverWorker(server.getServerName().getServerName(), rq1, rp1, new UUID( new Long(1), new Long(2))); w1.start(); w1.join(5000); assertEquals(1, manager.getWalsByIdRecoveredQueues().size()); String id = "1-" + server.getServerName().getServerName(); assertEquals(files, manager.getWalsByIdRecoveredQueues().get(id)); manager.cleanOldLogs("log2", id, true); // log1 should be deleted assertEquals(Sets.newHashSet("log2"), manager.getWalsByIdRecoveredQueues().get(id)); }
/**
 * Looks up the configuration of the replication peer identified by {@code peerId} and returns
 * the ZooKeeper cluster key derived from it.
 * <p>
 * A temporary ZooKeeper watcher (with a no-op {@link Abortable}) is opened for the lookup and
 * is always closed before this method returns or throws.
 * @param conf configuration used to open the local ZooKeeper watcher and replication peers
 * @return the ZooKeeper cluster key of the peer's configuration
 * @throws IOException if the peer configuration cannot be found, or if the replication peers
 *           fail with a {@code ReplicationException}
 */
private static String getPeerQuorumAddress(final Configuration conf) throws IOException {
  // The previous "peer" local was never assigned, so its close-in-finally could never run;
  // the watcher is the only resource this method actually owns.
  ZooKeeperWatcher localZKW = null;
  try {
    localZKW = new ZooKeeperWatcher(conf, "VerifyReplication", new Abortable() {
      @Override
      public void abort(String why, Throwable e) {
      }

      @Override
      public boolean isAborted() {
        return false;
      }
    });

    ReplicationPeers rp = ReplicationFactory.getReplicationPeers(localZKW, conf, localZKW);
    rp.init();

    Configuration peerConf = rp.getPeerConf(peerId);
    if (peerConf == null) {
      throw new IOException("Couldn't get peer conf!");
    }
    return ZKUtil.getZooKeeperClusterKey(peerConf);
  } catch (ReplicationException e) {
    throw new IOException(
        "An error occured while trying to connect to the remove peer cluster", e);
  } finally {
    if (localZKW != null) {
      localZKW.close();
    }
  }
}
public String dumpQueues(ZKWatcher zkw, Set<String> peerIds, boolean hdfs) throws Exception { ReplicationQueueStorage queueStorage; ReplicationTracker replicationTracker; StringBuilder sb = new StringBuilder(); queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zkw, getConf()); replicationTracker = ReplicationFactory.getReplicationTracker(zkw, new WarnOnlyAbortable(), new WarnOnlyStoppable()); Set<String> liveRegionServers = new HashSet<>(replicationTracker.getListOfRegionServers()); // Loops each peer on each RS and dumps the queues List<ServerName> regionservers = queueStorage.getListOfReplicators(); if (regionservers == null || regionservers.isEmpty()) { return sb.toString(); } for (ServerName regionserver : regionservers) { List<String> queueIds = queueStorage.getAllQueues(regionserver); if (!liveRegionServers.contains(regionserver.getServerName())) { deadRegionServers.add(regionserver.getServerName()); } for (String queueId : queueIds) { ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId); List<String> wals = queueStorage.getWALsInQueue(regionserver, queueId); if (!peerIds.contains(queueInfo.getPeerId())) { deletedQueues.add(regionserver + "/" + queueId); sb.append(formatQueue(regionserver, queueStorage, queueInfo, queueId, wals, true, hdfs)); } else { sb.append(formatQueue(regionserver, queueStorage, queueInfo, queueId, wals, false, hdfs)); } } } return sb.toString(); }
@Test public void testCleanupFailoverQueues() throws Exception { Server server = new DummyServer("hostname1.example.org"); ReplicationQueueStorage rq = ReplicationStorageFactory .getReplicationQueueStorage(server.getZooKeeper(), server.getConfiguration()); // populate some znodes in the peer znode SortedSet<String> files = new TreeSet<>(); String group = "testgroup"; String file1 = group + ".log1"; String file2 = group + ".log2"; files.add(file1); files.add(file2); for (String file : files) { rq.addWAL(server.getServerName(), "1", file); } Server s1 = new DummyServer("dummyserver1.example.org"); ReplicationPeers rp1 = ReplicationFactory.getReplicationPeers(s1.getZooKeeper(), s1.getConfiguration()); rp1.init(); NodeFailoverWorker w1 = manager.new NodeFailoverWorker(server.getServerName()); w1.run(); assertEquals(1, manager.getWalsByIdRecoveredQueues().size()); String id = "1-" + server.getServerName().getServerName(); assertEquals(files, manager.getWalsByIdRecoveredQueues().get(id).get(group)); manager.cleanOldLogs(file2, id, true); // log1 should be deleted assertEquals(Sets.newHashSet(file2), manager.getWalsByIdRecoveredQueues().get(id).get(group)); }
/**
 * One-time fixture setup: starts the mini ZooKeeper cluster, creates the dummy server, enables
 * bulk-load replication in the shared configuration, and builds the replication peers, queue
 * storage and file system used by the tests.
 */
@BeforeClass
public static void setUpBeforeClass() throws Exception {
  // ZooKeeper is started first; everything created below is built on top of it.
  TEST_UTIL.startMiniZKCluster();
  server = new DummyServer();

  conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
  HMaster.decorateMasterConfiguration(conf);

  rp = ReplicationFactory.getReplicationPeers(server.getZooKeeper(), conf);
  rp.init();
  rq = ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(), conf);

  fs = FileSystem.get(conf);
}
/**
 * Wires up replication for the given server.
 * <p>
 * A single-threaded scheduled executor with daemon threads is always created. When replication
 * is disabled per {@code isReplication(conf)}, all replication components are set to
 * {@code null} and nothing else happens; otherwise the queues, peers, tracker, source manager
 * and replication load are created.
 * @param server the hosting server; supplies configuration, ZooKeeper handle and server name
 * @param fs file system handed to the replication source manager
 * @param logDir log directory handed to the replication source manager
 * @param oldLogDir old-log directory handed to the replication source manager
 * @throws IOException if the replication components cannot be created or the cluster id cannot
 *           be read
 */
public void initialize(final Server server, final FileSystem fs, final Path logDir,
    final Path oldLogDir) throws IOException {
  this.server = server;
  this.conf = this.server.getConfiguration();
  this.replication = isReplication(this.conf);

  String statsThreadNameFormat =
      server.getServerName().toShortString() + "Replication Statistics #%d";
  this.scheduleThreadPool = Executors.newScheduledThreadPool(1,
      new ThreadFactoryBuilder().setNameFormat(statsThreadNameFormat).setDaemon(true).build());

  if (!this.replication) {
    // Replication is switched off: leave every component unset.
    this.replicationManager = null;
    this.replicationQueues = null;
    this.replicationPeers = null;
    this.replicationTracker = null;
    this.replicationLoad = null;
    return;
  }

  try {
    this.replicationQueues =
        ReplicationFactory.getReplicationQueues(server.getZooKeeper(), this.conf, this.server);
    this.replicationQueues.init(this.server.getServerName().toString());
    this.replicationPeers =
        ReplicationFactory.getReplicationPeers(server.getZooKeeper(), this.conf, this.server);
    this.replicationPeers.init();
    this.replicationTracker = ReplicationFactory.getReplicationTracker(server.getZooKeeper(),
        this.replicationPeers, this.conf, this.server, this.server);
  } catch (ReplicationException e) {
    throw new IOException("Failed replication handler create", e);
  }

  final UUID clusterId;
  try {
    clusterId = ZKClusterId.getUUIDForCluster(this.server.getZooKeeper());
  } catch (KeeperException ke) {
    throw new IOException("Could not read cluster id", ke);
  }

  this.replicationManager = new ReplicationSourceManager(replicationQueues, replicationPeers,
      replicationTracker, conf, this.server, fs, logDir, oldLogDir, clusterId);
  // The statistics period is read in seconds and defaults to five minutes.
  this.statsThreadPeriod = this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60);
  LOG.debug("ReplicationStatisticsThread " + this.statsThreadPeriod);
  this.replicationLoad = new ReplicationLoad();
}
@Test public void testClaimQueues() throws Exception { LOG.debug("testNodeFailoverWorkerCopyQueuesFromRSUsingMulti"); conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true); final Server server = new DummyServer("hostname0.example.org"); ReplicationQueues rq = ReplicationFactory.getReplicationQueues(server.getZooKeeper(), server.getConfiguration(), server); rq.init(server.getServerName().toString()); // populate some znodes in the peer znode files.add("log1"); files.add("log2"); for (String file : files) { rq.addLog("1", file); } // create 3 DummyServers Server s1 = new DummyServer("dummyserver1.example.org"); Server s2 = new DummyServer("dummyserver2.example.org"); Server s3 = new DummyServer("dummyserver3.example.org"); // create 3 DummyNodeFailoverWorkers DummyNodeFailoverWorker w1 = new DummyNodeFailoverWorker( server.getServerName().getServerName(), s1); DummyNodeFailoverWorker w2 = new DummyNodeFailoverWorker( server.getServerName().getServerName(), s2); DummyNodeFailoverWorker w3 = new DummyNodeFailoverWorker( server.getServerName().getServerName(), s3); latch = new CountDownLatch(3); // start the threads w1.start(); w2.start(); w3.start(); // make sure only one is successful int populatedMap = 0; // wait for result now... till all the workers are done. latch.await(); populatedMap += w1.isLogZnodesMapPopulated() + w2.isLogZnodesMapPopulated() + w3.isLogZnodesMapPopulated(); assertEquals(1, populatedMap); server.abort("", null); }
@Test public void testNodeFailoverDeadServerParsing() throws Exception { LOG.debug("testNodeFailoverDeadServerParsing"); conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true); final Server server = new DummyServer("ec2-54-234-230-108.compute-1.amazonaws.com"); ReplicationQueues repQueues = ReplicationFactory.getReplicationQueues(server.getZooKeeper(), conf, server); repQueues.init(server.getServerName().toString()); // populate some znodes in the peer znode files.add("log1"); files.add("log2"); for (String file : files) { repQueues.addLog("1", file); } // create 3 DummyServers Server s1 = new DummyServer("ip-10-8-101-114.ec2.internal"); Server s2 = new DummyServer("ec2-107-20-52-47.compute-1.amazonaws.com"); Server s3 = new DummyServer("ec2-23-20-187-167.compute-1.amazonaws.com"); // simulate three servers fail sequentially ReplicationQueues rq1 = ReplicationFactory.getReplicationQueues(s1.getZooKeeper(), s1.getConfiguration(), s1); rq1.init(s1.getServerName().toString()); SortedMap<String, SortedSet<String>> testMap = rq1.claimQueues(server.getServerName().getServerName()); ReplicationQueues rq2 = ReplicationFactory.getReplicationQueues(s2.getZooKeeper(), s2.getConfiguration(), s2); rq2.init(s2.getServerName().toString()); testMap = rq2.claimQueues(s1.getServerName().getServerName()); ReplicationQueues rq3 = ReplicationFactory.getReplicationQueues(s3.getZooKeeper(), s3.getConfiguration(), s3); rq3.init(s3.getServerName().toString()); testMap = rq3.claimQueues(s2.getServerName().getServerName()); ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(testMap.firstKey()); List<String> result = replicationQueueInfo.getDeadRegionServers(); // verify assertTrue(result.contains(server.getServerName().getServerName())); assertTrue(result.contains(s1.getServerName().getServerName())); assertTrue(result.contains(s2.getServerName().getServerName())); server.abort("", null); }
@Test public void testLogCleaning() throws Exception{ Configuration conf = TEST_UTIL.getConfiguration(); // set TTL long ttl = 10000; conf.setLong("hbase.master.logcleaner.ttl", ttl); conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, HConstants.REPLICATION_ENABLE_DEFAULT); Replication.decorateMasterConfiguration(conf); Server server = new DummyServer(); ReplicationQueues repQueues = ReplicationFactory.getReplicationQueues(server.getZooKeeper(), conf, server); repQueues.init(server.getServerName().toString()); final Path oldLogDir = new Path(TEST_UTIL.getDataTestDir(), HConstants.HREGION_OLDLOGDIR_NAME); String fakeMachineName = URLEncoder.encode(server.getServerName().toString(), "UTF8"); final FileSystem fs = FileSystem.get(conf); // Create 2 invalid files, 1 "recent" file, 1 very new file and 30 old files long now = System.currentTimeMillis(); fs.delete(oldLogDir, true); fs.mkdirs(oldLogDir); // Case 1: 2 invalid files, which would be deleted directly fs.createNewFile(new Path(oldLogDir, "a")); fs.createNewFile(new Path(oldLogDir, fakeMachineName + "." + "a")); // Case 2: 1 "recent" file, not even deletable for the first log cleaner // (TimeToLiveLogCleaner), so we are not going down the chain System.out.println("Now is: " + now); for (int i = 1; i < 31; i++) { // Case 3: old files which would be deletable for the first log cleaner // (TimeToLiveLogCleaner), and also for the second (ReplicationLogCleaner) Path fileName = new Path(oldLogDir, fakeMachineName + "." 
+ (now - i) ); fs.createNewFile(fileName); // Case 4: put 3 old log files in ZK indicating that they are scheduled // for replication so these files would pass the first log cleaner // (TimeToLiveLogCleaner) but would be rejected by the second // (ReplicationLogCleaner) if (i % (30/3) == 1) { repQueues.addLog(fakeMachineName, fileName.getName()); System.out.println("Replication log file: " + fileName); } } // sleep for sometime to get newer modifcation time Thread.sleep(ttl); fs.createNewFile(new Path(oldLogDir, fakeMachineName + "." + now)); // Case 2: 1 newer file, not even deletable for the first log cleaner // (TimeToLiveLogCleaner), so we are not going down the chain fs.createNewFile(new Path(oldLogDir, fakeMachineName + "." + (now + 10000) )); for (FileStatus stat : fs.listStatus(oldLogDir)) { System.out.println(stat.getPath().toString()); } assertEquals(34, fs.listStatus(oldLogDir).length); LogCleaner cleaner = new LogCleaner(1000, server, conf, fs, oldLogDir); cleaner.chore(); // We end up with the current log file, a newer one and the 3 old log // files which are scheduled for replication TEST_UTIL.waitFor(1000, new Waiter.Predicate<Exception>() { @Override public boolean evaluate() throws Exception { return 5 == fs.listStatus(oldLogDir).length; } }); for (FileStatus file : fs.listStatus(oldLogDir)) { System.out.println("Kept log files: " + file.getPath().getName()); } }
/**
 * Wires up replication for the given server.
 * <p>
 * A single-threaded scheduled executor with daemon threads is always created. When replication
 * is disabled per {@code isReplication(conf)}, all replication components are set to
 * {@code null} and nothing else happens; otherwise the queues, peers, tracker and source
 * manager are created.
 * @param server the hosting server; supplies configuration, ZooKeeper handle and server name
 * @param fs file system handed to the replication source manager
 * @param logDir log directory handed to the replication source manager
 * @param oldLogDir old-log directory handed to the replication source manager
 * @throws IOException if the replication components cannot be created or the cluster id cannot
 *           be read
 */
public void initialize(final Server server, final FileSystem fs, final Path logDir,
    final Path oldLogDir) throws IOException {
  this.server = server;
  this.conf = this.server.getConfiguration();
  this.replication = isReplication(this.conf);

  String statsThreadNameFormat =
      server.getServerName().toShortString() + "Replication Statistics #%d";
  this.scheduleThreadPool = Executors.newScheduledThreadPool(1,
      new ThreadFactoryBuilder().setNameFormat(statsThreadNameFormat).setDaemon(true).build());

  if (!this.replication) {
    // Replication is switched off: leave every component unset.
    this.replicationManager = null;
    this.replicationQueues = null;
    this.replicationPeers = null;
    this.replicationTracker = null;
    return;
  }

  try {
    this.replicationQueues =
        ReplicationFactory.getReplicationQueues(server.getZooKeeper(), this.conf, this.server);
    this.replicationQueues.init(this.server.getServerName().toString());
    this.replicationPeers =
        ReplicationFactory.getReplicationPeers(server.getZooKeeper(), this.conf, this.server);
    this.replicationPeers.init();
    this.replicationTracker = ReplicationFactory.getReplicationTracker(server.getZooKeeper(),
        this.replicationPeers, this.conf, this.server, this.server);
  } catch (ReplicationException e) {
    throw new IOException("Failed replication handler create", e);
  }

  final UUID clusterId;
  try {
    clusterId = ZKClusterId.getUUIDForCluster(this.server.getZooKeeper());
  } catch (KeeperException ke) {
    throw new IOException("Could not read cluster id", ke);
  }

  this.replicationManager = new ReplicationSourceManager(replicationQueues, replicationPeers,
      replicationTracker, conf, this.server, fs, logDir, oldLogDir, clusterId);
  // The statistics period is read in seconds and defaults to five minutes.
  this.statsThreadPeriod = this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60);
  LOG.debug("ReplicationStatisticsThread " + this.statsThreadPeriod);
}
/**
 * Wires up replication for the given server: creates the statistics executor, validates the
 * bulk-load replication settings, builds the queue storage, peers, tracker and source manager,
 * and registers a WAL listener when a WAL provider is supplied.
 * @param server the hosting server; supplies configuration, ZooKeeper handle and server name
 * @param fs file system handed to the replication source manager
 * @param logDir log directory handed to the replication source manager
 * @param oldLogDir old-log directory handed to the replication source manager
 * @param walProvider source of the WAL file length provider and target of the WAL listener;
 *          may be {@code null}, in which case an always-empty length provider is used and no
 *          listener is registered
 * @throws IOException if the replication components cannot be created or the cluster id cannot
 *           be read
 * @throws IllegalArgumentException if bulk-load replication is enabled but
 *           {@code HConstants.REPLICATION_CLUSTER_ID} is not set or is empty
 */
@Override
public void initialize(Server server, FileSystem fs, Path logDir, Path oldLogDir,
    WALProvider walProvider) throws IOException {
  this.server = server;
  this.conf = this.server.getConfiguration();
  this.isReplicationForBulkLoadDataEnabled =
      ReplicationUtils.isReplicationForBulkLoadDataEnabled(this.conf);

  String statsThreadNameFormat =
      server.getServerName().toShortString() + "Replication Statistics #%d";
  this.scheduleThreadPool = Executors.newScheduledThreadPool(1,
      new ThreadFactoryBuilder().setNameFormat(statsThreadNameFormat).setDaemon(true).build());

  if (this.isReplicationForBulkLoadDataEnabled) {
    // Bulk-load replication requires a replication cluster id to be configured.
    String replicationClusterId = conf.get(HConstants.REPLICATION_CLUSTER_ID);
    if (replicationClusterId == null || replicationClusterId.isEmpty()) {
      throw new IllegalArgumentException(
          HConstants.REPLICATION_CLUSTER_ID + " cannot be null/empty when "
              + HConstants.REPLICATION_BULKLOAD_ENABLE_KEY + " is set to true.");
    }
  }

  try {
    this.queueStorage =
        ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(), conf);
    this.replicationPeers =
        ReplicationFactory.getReplicationPeers(server.getZooKeeper(), this.conf);
    this.replicationPeers.init();
    this.replicationTracker =
        ReplicationFactory.getReplicationTracker(server.getZooKeeper(), this.server, this.server);
  } catch (Exception e) {
    throw new IOException("Failed replication handler create", e);
  }

  final UUID clusterId;
  try {
    clusterId = ZKClusterId.getUUIDForCluster(this.server.getZooKeeper());
  } catch (KeeperException ke) {
    throw new IOException("Could not read cluster id", ke);
  }

  // Without a WAL provider, fall back to a length provider that never reports a length.
  this.replicationManager = new ReplicationSourceManager(queueStorage, replicationPeers,
      replicationTracker, conf, this.server, fs, logDir, oldLogDir, clusterId,
      walProvider != null ? walProvider.getWALFileLengthProvider() : p -> OptionalLong.empty());
  if (walProvider != null) {
    walProvider.addWALActionsListener(
        new ReplicationSourceWALActionListener(conf, replicationManager));
  }

  // The statistics period is read in seconds and defaults to five minutes.
  this.statsThreadPeriod = this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60);
  LOG.debug("ReplicationStatisticsThread " + this.statsThreadPeriod);
  this.replicationLoad = new ReplicationLoad();
  this.peerProcedureHandler = new PeerProcedureHandlerImpl(replicationManager);
}
@Test public void testLogCleaning() throws Exception{ Configuration conf = TEST_UTIL.getConfiguration(); // set TTL long ttl = 10000; conf.setLong("hbase.master.logcleaner.ttl", ttl); conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, HConstants.REPLICATION_ENABLE_DEFAULT); Replication.decorateMasterConfiguration(conf); Server server = new DummyServer(); ReplicationQueues repQueues = ReplicationFactory.getReplicationQueues(server.getZooKeeper(), conf, server); repQueues.init(server.getServerName().toString()); Path oldLogDir = new Path(TEST_UTIL.getDataTestDir(), HConstants.HREGION_OLDLOGDIR_NAME); String fakeMachineName = URLEncoder.encode(server.getServerName().toString(), "UTF8"); FileSystem fs = FileSystem.get(conf); LogCleaner cleaner = new LogCleaner(1000, server, conf, fs, oldLogDir); // Create 2 invalid files, 1 "recent" file, 1 very new file and 30 old files long now = System.currentTimeMillis(); fs.delete(oldLogDir, true); fs.mkdirs(oldLogDir); // Case 1: 2 invalid files, which would be deleted directly fs.createNewFile(new Path(oldLogDir, "a")); fs.createNewFile(new Path(oldLogDir, fakeMachineName + "." + "a")); // Case 2: 1 "recent" file, not even deletable for the first log cleaner // (TimeToLiveLogCleaner), so we are not going down the chain System.out.println("Now is: " + now); for (int i = 1; i < 31; i++) { // Case 3: old files which would be deletable for the first log cleaner // (TimeToLiveLogCleaner), and also for the second (ReplicationLogCleaner) Path fileName = new Path(oldLogDir, fakeMachineName + "." 
+ (now - i) ); fs.createNewFile(fileName); // Case 4: put 3 old log files in ZK indicating that they are scheduled // for replication so these files would pass the first log cleaner // (TimeToLiveLogCleaner) but would be rejected by the second // (ReplicationLogCleaner) if (i % (30/3) == 1) { repQueues.addLog(fakeMachineName, fileName.getName()); System.out.println("Replication log file: " + fileName); } } // sleep for sometime to get newer modifcation time Thread.sleep(ttl); fs.createNewFile(new Path(oldLogDir, fakeMachineName + "." + now)); // Case 2: 1 newer file, not even deletable for the first log cleaner // (TimeToLiveLogCleaner), so we are not going down the chain fs.createNewFile(new Path(oldLogDir, fakeMachineName + "." + (now + 10000) )); for (FileStatus stat : fs.listStatus(oldLogDir)) { System.out.println(stat.getPath().toString()); } assertEquals(34, fs.listStatus(oldLogDir).length); cleaner.chore(); // We end up with the current log file, a newer one and the 3 old log // files which are scheduled for replication assertEquals(5, fs.listStatus(oldLogDir).length); for (FileStatus file : fs.listStatus(oldLogDir)) { System.out.println("Kept log files: " + file.getPath().getName()); } }