/** * @param client the client * @param path path to reap children from * @param executor executor to use for background tasks * @param reapingThresholdMs threshold in milliseconds that determines that a path can be deleted * @param mode reaping mode * @param leaderPath if not null, uses a leader selection so that only 1 reaper is active in the cluster */ public ChildReaper(CuratorFramework client, String path, Reaper.Mode mode, ScheduledExecutorService executor, int reapingThresholdMs, String leaderPath) { this.client = client; this.mode = mode; this.executor = new CloseableScheduledExecutorService(executor); this.reapingThresholdMs = reapingThresholdMs; this.reaper = new Reaper(client, executor, reapingThresholdMs, leaderPath); addPath(path); }
@Test public void testSomeNodes() throws Exception { Timing timing = new Timing(); ChildReaper reaper = null; CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)); try { client.start(); Random r = new Random(); int nonEmptyNodes = 0; for ( int i = 0; i < 10; ++i ) { client.create().creatingParentsIfNeeded().forPath("/test/" + Integer.toString(i)); if ( r.nextBoolean() ) { client.create().forPath("/test/" + Integer.toString(i) + "/foo"); ++nonEmptyNodes; } } reaper = new ChildReaper(client, "/test", Reaper.Mode.REAP_UNTIL_DELETE, 1); reaper.start(); timing.forWaiting().sleepABit(); Stat stat = client.checkExists().forPath("/test"); Assert.assertEquals(stat.getNumChildren(), nonEmptyNodes); } finally { CloseableUtils.closeQuietly(reaper); CloseableUtils.closeQuietly(client); } }
@Test public void testSimple() throws Exception { Timing timing = new Timing(); ChildReaper reaper = null; CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)); try { client.start(); for ( int i = 0; i < 10; ++i ) { client.create().creatingParentsIfNeeded().forPath("/test/" + Integer.toString(i)); } reaper = new ChildReaper(client, "/test", Reaper.Mode.REAP_UNTIL_DELETE, 1); reaper.start(); timing.forWaiting().sleepABit(); Stat stat = client.checkExists().forPath("/test"); Assert.assertEquals(stat.getNumChildren(), 0); } finally { CloseableUtils.closeQuietly(reaper); CloseableUtils.closeQuietly(client); } }
@Test public void testMultiPath() throws Exception { Timing timing = new Timing(); ChildReaper reaper = null; CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)); try { client.start(); for ( int i = 0; i < 10; ++i ) { client.create().creatingParentsIfNeeded().forPath("/test1/" + Integer.toString(i)); client.create().creatingParentsIfNeeded().forPath("/test2/" + Integer.toString(i)); client.create().creatingParentsIfNeeded().forPath("/test3/" + Integer.toString(i)); } reaper = new ChildReaper(client, "/test2", Reaper.Mode.REAP_UNTIL_DELETE, 1); reaper.start(); reaper.addPath("/test1"); timing.forWaiting().sleepABit(); Stat stat = client.checkExists().forPath("/test1"); Assert.assertEquals(stat.getNumChildren(), 0); stat = client.checkExists().forPath("/test2"); Assert.assertEquals(stat.getNumChildren(), 0); stat = client.checkExists().forPath("/test3"); Assert.assertEquals(stat.getNumChildren(), 10); } finally { CloseableUtils.closeQuietly(reaper); CloseableUtils.closeQuietly(client); } }
@Test public void testNamespace() throws Exception { Timing timing = new Timing(); ChildReaper reaper = null; CuratorFramework client = CuratorFrameworkFactory.builder() .connectString(server.getConnectString()) .sessionTimeoutMs(timing.session()) .connectionTimeoutMs(timing.connection()) .retryPolicy(new RetryOneTime(1)) .namespace("foo") .build(); try { client.start(); for ( int i = 0; i < 10; ++i ) { client.create().creatingParentsIfNeeded().forPath("/test/" + Integer.toString(i)); } reaper = new ChildReaper(client, "/test", Reaper.Mode.REAP_UNTIL_DELETE, 1); reaper.start(); timing.forWaiting().sleepABit(); Stat stat = client.checkExists().forPath("/test"); Assert.assertEquals(stat.getNumChildren(), 0); stat = client.usingNamespace(null).checkExists().forPath("/foo/test"); Assert.assertNotNull(stat); Assert.assertEquals(stat.getNumChildren(), 0); } finally { CloseableUtils.closeQuietly(reaper); CloseableUtils.closeQuietly(client); } }
@PostConstruct public void init() { taskRunnerExecutor = new ExecutorCompletionService<TaskResult>(createTaskExecutor()); priorityTaskRunnerExecutor = new ExecutorCompletionService<TaskResult>(createPriorityTaskExecutor()); Version version = ApplicationVersion.get(); String prefix; if( version.isDevelopment() ) { prefix = "tasksdev/"; } else { prefix = "tasks-" + version.getMmr() + '/'; } ZK_TASKPATH = prefix + ZK_TASKPATH; ZK_GLOBALTASKPATH = prefix + ZK_GLOBALTASKPATH; ZK_TASKOWNERPATH = prefix + ZK_TASKOWNERPATH; globalCache = zookeeperService.createPathCache(ZK_GLOBALTASKPATH, true); taskCache = zookeeperService.createPathCache(ZK_TASKPATH, false, this, StartMode.POST_INITIALIZED_EVENT); curator = zookeeperService.getCurator(); ourNodeId = zookeeperService.getNodeId(); final Reaper reaper = new Reaper(curator, 10000); try { reaper.start(); } catch( Exception e1 ) { Throwables.propagate(e1); } new TaskWatchThread("Task Finisher listener", taskRunnerExecutor, reaper).start(); new TaskWatchThread("Priority Task Finisher listener", priorityTaskRunnerExecutor, reaper).start(); }
public void close(Reaper reaper) { if( lockPath != null ) { try { curator.delete().forPath(lockPath); } catch( Exception e ) { reaper.addPath(lockPath); } } }
@Test public void createInjector_WithFrameworkAndExecutorProvidedAndBound_BindsReaper() { Injector inj = Guice.createInjector(new AbstractModule() { @Override protected void configure() { bind(ScheduledExecutorService.class).toInstance(executorService); bind(CuratorFramework.class).annotatedWith(Curator.class).toInstance(framework); } }, ReaperModuleBuilder.create().exectuor(Key.get(ScheduledExecutorService.class)).build()); inj.getInstance(Key.get(Reaper.class, Curator.class)); }
@Test public void createInjector_WithFrameworkAndNoExecutorProvided_BindsReaper() { Injector inj = Guice.createInjector(new AbstractModule() { @Override protected void configure() { bind(CuratorFramework.class).annotatedWith(Curator.class).toInstance(framework); } }, ReaperModuleBuilder.create().leaderPath("/reaperleader").build()); inj.getInstance(Key.get(Reaper.class, Curator.class)); }
public TaskWatchThread(String name, CompletionService<TaskResult> executor, Reaper reaper) { this.name = name; this.executor = executor; this.reaper = reaper; }
@Override public Module build() { checkState(lockPath != null, "Lock path must be provided."); checkState(mode != null, "Reaper mode must be provided."); return new AbstractModule() { @Override protected void configure() { if (getService() != null) { requireBinding(getService()); } final Key<ChildReaperManager> managerKey = holder.generateKey(ChildReaperManager.class); install(new PrivateModule() { @Override protected void configure() { if (getService() != null) { bind(ScheduledExecutorService.class).annotatedWith(Private.class).to(getService()); } else { bind(ScheduledExecutorService.class).annotatedWith(Private.class).toInstance( Reaper.newExecutorService()); } Key<ChildReaper> reaperKey = holder.generateKey(ChildReaper.class); bind(reaperKey).to(Key.get(ChildReaper.class, Private.class)).in(Singleton.class); expose(reaperKey); bind(managerKey).to(ChildReaperManager.class).in(Singleton.class); expose(managerKey); } @Provides @Private public ChildReaper childReaper(@Curator final CuratorFramework framework, @Private final ScheduledExecutorService executorService) { if (getLeaderPath() != null) { return new ChildReaper(framework, lockPath, mode, executorService, getReapingThresholdMillis(), getLeaderPath()); } else { return new ChildReaper(framework, lockPath, mode, executorService, getReapingThresholdMillis()); } } }); Multibinder<CuratorService> serviceMultibinder = Multibinder.newSetBinder(binder(), CuratorService.class); serviceMultibinder.addBinding().to(managerKey); } }; }
@Inject ReaperManager(@Private final Reaper reaper) { this.reaper = reaper; }
@Override public Module build() { return new AbstractModule() { @Override protected void configure() { if (getService() != null) { requireBinding(getService()); } final Key<ReaperManager> managerKey = holder.generateKey(ReaperManager.class); install(new PrivateModule() { @Override protected void configure() { if (getService() != null) { bind(ScheduledExecutorService.class).annotatedWith(Private.class).to(getService()); } else { bind(ScheduledExecutorService.class).annotatedWith(Private.class).toInstance( Reaper.newExecutorService()); } Key<Reaper> reaperKey = holder.generateKey(Reaper.class); bind(reaperKey).to(Key.get(Reaper.class, Private.class)).in(Singleton.class); expose(reaperKey); bind(managerKey).to(ReaperManager.class).in(Singleton.class); expose(managerKey); } @Provides @Private public Reaper reaper(@Curator final CuratorFramework framework, @Private final ScheduledExecutorService executorService) { if (getLeaderPath() != null) { return new Reaper(framework, executorService, getReapingThresholdMillis(), getLeaderPath()); } else { return new Reaper(framework, executorService, getReapingThresholdMillis()); } } }); Multibinder<CuratorService> serviceMultibinder = Multibinder.newSetBinder(binder(), CuratorService.class); serviceMultibinder.addBinding().to(managerKey); } }; }
@Test public void reaperMode_Valid_ReturnsSelf() { assertEquals(builder, builder.reaperMode(Reaper.Mode.REAP_INDEFINITELY)); }
@Test public void build_NoLockPath_ThrowsISE() { thrown.expect(IllegalStateException.class); builder.reaperMode(Reaper.Mode.REAP_INDEFINITELY).build(); }
@Test public void build_LockAndPathProvided_ReturnsNotNull() { assertNotNull(builder.reaperMode(Reaper.Mode.REAP_INDEFINITELY).lockPath("/path").build()); }
@Before public void setUp() { builder = ChildReaperModuleBuilder.create().lockPath("/path").reaperMode(Reaper.Mode.REAP_INDEFINITELY); }