/**
 * Runs {@code action} repeatedly on this worker by having the task reschedule itself
 * after every execution.
 * <p>
 * The n-th follow-up run is targeted at {@code start + n * period}, where {@code start}
 * is the time of this call plus {@code initialDelay}; the delay handed to
 * {@code schedule} is that target minus the current {@code now()}.
 *
 * @param action       the work to run on every tick
 * @param initialDelay delay before the first run, expressed in {@code unit}
 * @param period       distance between two consecutive target times, expressed in {@code unit}
 * @param unit         time unit of {@code initialDelay} and {@code period}
 * @return the subscription that the recurring task checks before each run
 */
public Subscription schedulePeriodically(Action0 action, long initialDelay, long period, TimeUnit unit) {
    // The parameter is not final, so the anonymous class captures this alias instead.
    final Action0 task = action;
    final long periodNanos = unit.toNanos(period);
    final long originNanos = TimeUnit.MILLISECONDS.toNanos(now()) + unit.toNanos(initialDelay);
    final MultipleAssignmentSubscription handle = new MultipleAssignmentSubscription();

    Action0 ticker = new Action0() {
        /** Number of completed executions; also the index of the next target time. */
        long ticks = 0;

        @Override
        public void call() {
            if (handle.isUnsubscribed()) {
                return;
            }
            task.call();
            ticks++;
            long dueNanos = originNanos + (ticks * periodNanos);
            long waitNanos = dueNanos - TimeUnit.MILLISECONDS.toNanos(Worker.this.now());
            handle.set(Worker.this.schedule(this, waitNanos, TimeUnit.NANOSECONDS));
        }
    };

    // The placeholder goes into the handle before scheduling, so the first run's
    // handle.set(...) cannot be overwritten by the result of the initial schedule call.
    MultipleAssignmentSubscription first = new MultipleAssignmentSubscription();
    handle.set(first);
    first.set(schedule(ticker, initialDelay, unit));
    return handle;
}
/**
 * Writes the response headers and then, every {@code interval} milliseconds, writes the
 * JSON form of {@code metrics} to {@code response}.
 * <p>
 * The timer runs on the computation scheduler. It is cancelled when the response channel
 * is found closed on a tick, or when producing/writing a metric throws.
 *
 * @param response the HTTP response the metrics are streamed to
 * @return a subject that receives {@code onError} if a tick fails; this method never
 *         calls {@code onCompleted} on it
 */
private Observable<Void> handleHystrixRequest(final HttpServerResponse<O> response) {
    writeHeaders(response);

    final Subject<Void, Void> subject = PublishSubject.create();
    // Holder that lets the tick callback cancel the very timer that drives it.
    final MultipleAssignmentSubscription subscription = new MultipleAssignmentSubscription();

    Subscription actionSubscription = Observable.timer(0, interval, TimeUnit.MILLISECONDS, Schedulers.computation())
            .subscribe(new Action1<Long>() {
                @Override
                public void call(Long tick) {
                    if (!response.getChannel().isOpen()) {
                        subscription.unsubscribe();
                        return;
                    }
                    try {
                        writeMetric(JsonMapper.toJson(metrics), response);
                    } catch (Exception e) {
                        // Stop the timer before signalling: the subject is terminal after
                        // onError, so further ticks would only keep writing to a failed stream.
                        subscription.unsubscribe();
                        subject.onError(e);
                    }
                }
            });
    subscription.set(actionSubscription);
    return subject;
}
/**
 * Writes the response headers and then, every {@code interval} milliseconds, writes one
 * JSON record per {@link HystrixCommandMetrics} instance followed by one per
 * {@link HystrixThreadPoolMetrics} instance to {@code response}.
 * <p>
 * The timer runs on the computation scheduler. It is cancelled when the response channel
 * is found closed on a tick, or when producing/writing a metric throws.
 *
 * @param response the HTTP response the metrics are streamed to
 * @return a subject that receives {@code onError} if a tick fails; this method never
 *         calls {@code onCompleted} on it
 */
private Observable<Void> handleHystrixRequest(final HttpServerResponse<O> response) {
    writeHeaders(response);

    final Subject<Void, Void> subject = PublishSubject.create();
    // Holder that lets the tick callback cancel the very timer that drives it.
    final MultipleAssignmentSubscription subscription = new MultipleAssignmentSubscription();

    Subscription actionSubscription = Observable.timer(0, interval, TimeUnit.MILLISECONDS, Schedulers.computation())
            .subscribe(new Action1<Long>() {
                @Override
                public void call(Long tick) {
                    if (!response.getChannel().isOpen()) {
                        subscription.unsubscribe();
                        return;
                    }
                    try {
                        for (HystrixCommandMetrics commandMetrics : HystrixCommandMetrics.getInstances()) {
                            writeMetric(JsonMapper.toJson(commandMetrics), response);
                        }
                        for (HystrixThreadPoolMetrics threadPoolMetrics : HystrixThreadPoolMetrics.getInstances()) {
                            writeMetric(JsonMapper.toJson(threadPoolMetrics), response);
                        }
                    } catch (Exception e) {
                        // Stop the timer before signalling: the subject is terminal after
                        // onError, so further ticks would only keep writing to a failed stream.
                        subscription.unsubscribe();
                        subject.onError(e);
                    }
                }
            });
    subscription.set(actionSubscription);
    return subject;
}
@Override public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) { if (delayTime <= 0) { return schedule(action); } if (isUnsubscribed()) { return Subscriptions.empty(); } ScheduledExecutorService service; if (executor instanceof ScheduledExecutorService) { service = (ScheduledExecutorService)executor; } else { service = GenericScheduledExecutorService.getInstance(); } final MultipleAssignmentSubscription mas = new MultipleAssignmentSubscription(); // tasks.add(mas); // Needs a removal without unsubscription try { Future<?> f = service.schedule(new Runnable() { @Override public void run() { if (mas.isUnsubscribed()) { return; } mas.set(schedule(action)); // tasks.delete(mas); // Needs a removal without unsubscription } }, delayTime, unit); mas.set(Subscriptions.from(f)); } catch (RejectedExecutionException t) { // report the rejection to plugins RxJavaPlugins.getInstance().getErrorHandler().handleError(t); throw t; } return mas; }
/**
 * Schedules {@code action} to be handed to {@code schedule(action)} after a delay, while
 * registering the pending work in {@code tasks}.
 * <p>
 * A non-positive delay is forwarded straight to the undelayed {@code schedule(action)}.
 * Otherwise the delay is served by {@code executor} when it is a
 * {@link ScheduledExecutorService}, and by {@code GenericScheduledExecutorService} when
 * it is not.
 *
 * @param action    the work to run
 * @param delayTime how long to wait before handing the action over; {@code <= 0} means no delay
 * @param unit      time unit of {@code delayTime}
 * @return a subscription whose unsubscription removes the tracking entry from {@code tasks},
 *         or an unsubscribed subscription if this worker is already unsubscribed
 * @throws RejectedExecutionException if the timer service rejects the task; the error is
 *         reported to the plugin error handler before being rethrown
 */
public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
    if (delayTime <= 0) {
        return schedule(action);
    }
    if (isUnsubscribed()) {
        return Subscriptions.unsubscribed();
    }

    ScheduledExecutorService service;
    if (this.executor instanceof ScheduledExecutorService) {
        // Explicit narrowing cast, guarded by the instanceof check above.
        service = (ScheduledExecutorService) this.executor;
    } else {
        service = GenericScheduledExecutorService.getInstance();
    }

    // 'first' is a placeholder for the delayed step; 'mas' tracks whichever step is current.
    MultipleAssignmentSubscription first = new MultipleAssignmentSubscription();
    final MultipleAssignmentSubscription mas = new MultipleAssignmentSubscription();
    mas.set(first);
    this.tasks.add(mas);

    // NOTE(review): presumably tasks.remove(mas) also unsubscribes mas -- confirm against
    // the declared type of 'tasks'.
    final Subscription removeMas = Subscriptions.create(new Action0() {
        @Override
        public void call() {
            ExecutorSchedulerWorker.this.tasks.remove(mas);
        }
    });

    ScheduledAction ea = new ScheduledAction(new Action0() {
        @Override
        public void call() {
            if (!mas.isUnsubscribed()) {
                // Hand the real action to the undelayed schedule and track it through mas.
                Subscription s2 = ExecutorSchedulerWorker.this.schedule(action);
                mas.set(s2);
                if (s2.getClass() == ScheduledAction.class) {
                    // Attach the cleanup to the real action so the tracking entry is
                    // removed along with it.
                    ((ScheduledAction) s2).add(removeMas);
                }
            }
        }
    });
    // Filled in before the timer is armed; mas already points at 'first', so a fast
    // execution of 'ea' cannot have its mas.set(...) overwritten here.
    first.set(ea);

    try {
        ea.add(service.schedule(ea, delayTime, unit));
        return removeMas;
    } catch (RejectedExecutionException t) {
        // report the rejection to plugins
        RxJavaPlugins.getInstance().getErrorHandler().handleError(t);
        throw t;
    }
}
public Subscription schedulePeriodically( final Scheduler.Worker worker, final Action0 action, long initialDelay, Delay periodDelay, final TimeUnit unit) { final Delay period = periodDelay; final long firstNowNanos = TimeUnit.MILLISECONDS.toNanos(worker.now()); final long firstStartInNanos = firstNowNanos + unit.toNanos(initialDelay); final MultipleAssignmentSubscription mas = new MultipleAssignmentSubscription(); final Action0 recursiveAction = new Action0() { long count; long lastNowNanos = firstNowNanos; long startInNanos = firstStartInNanos; @Override public void call() { if (!mas.isUnsubscribed()) { action.call(); long periodInNanos = unit.toNanos(period.calculate(count)); long nextTick; long nowNanos = TimeUnit.MILLISECONDS.toNanos(worker.now()); // If the clock moved in a direction quite a bit, rebase the repetition period if (nowNanos + CLOCK_DRIFT_TOLERANCE_NANOS < lastNowNanos || nowNanos >= lastNowNanos + periodInNanos + CLOCK_DRIFT_TOLERANCE_NANOS) { nextTick = nowNanos + periodInNanos; /* * Shift the start point back by the drift as if the whole thing * started count periods ago. */ startInNanos = nextTick - (periodInNanos * (++count)); } else { nextTick = startInNanos + (++count * periodInNanos); } lastNowNanos = nowNanos; long delay = nextTick - nowNanos; mas.set(worker.schedule(this, delay, TimeUnit.NANOSECONDS)); } } }; MultipleAssignmentSubscription s = new MultipleAssignmentSubscription(); // Should call `mas.set` before `schedule`, or the new Subscription may replace the old one. mas.set(s); s.set(worker.schedule(recursiveAction, initialDelay, unit)); return mas; }
/**
 * Serves the Hystrix metrics stream for requests whose URI starts with {@code urlMapping};
 * every other request is retained and passed to the next handler.
 * <p>
 * For a matching request the response head is written immediately and a timer on the
 * computation scheduler then emits, every {@code interval} milliseconds, one record per
 * command-metrics and thread-pool-metrics instance. When there is nothing to report a
 * {@code PING} is written instead. The timer cancels itself once the channel is no
 * longer open.
 *
 * @param ctx the channel context the stream is written to
 * @param msg the incoming HTTP request
 */
@Override
protected void channelRead0(final ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {
    final boolean isStreamRequest = msg.uri().startsWith(urlMapping);
    if (!isStreamRequest) {
        // Not ours: retain the message and pass it on to the next handler.
        ctx.fireChannelRead(ReferenceCountUtil.retain(msg));
        return;
    }

    logger.debug("Handling Hystrix stream request...");

    final HttpResponse streamHead = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
    streamHead.headers().set(CONTENT_TYPE, "text/event-stream;charset=UTF-8");
    streamHead.headers().set(CACHE_CONTROL, "no-cache, no-store, max-age=0, must-revalidate");
    streamHead.headers().add(PRAGMA, NO_CACHE);
    ctx.writeAndFlush(streamHead);

    final Subject<Void, Void> subject = PublishSubject.create();
    // Holder that lets the tick callback cancel the very timer that drives it.
    final MultipleAssignmentSubscription timerHandle = new MultipleAssignmentSubscription();

    Subscription timer = Observable.timer(0, interval, TimeUnit.MILLISECONDS, Schedulers.computation())
            .subscribe(new Action1<Long>() {
                @Override
                public void call(Long tick) {
                    if (!ctx.channel().isOpen()) {
                        timerHandle.unsubscribe();
                        logger.debug("Stopping Hystrix Turbine stream to connection");
                        return;
                    }
                    try {
                        Collection<HystrixCommandMetrics> commands = HystrixCommandMetrics.getInstances();
                        Collection<HystrixThreadPoolMetrics> pools = HystrixThreadPoolMetrics.getInstances();
                        logger.debug("Found {} hystrix command metrics", commands.size());
                        logger.debug("Found {} hystrix thread pool metrics", pools.size());

                        for (HystrixCommandMetrics command : commands) {
                            writeMetric(toJson(command), ctx);
                        }
                        for (HystrixThreadPoolMetrics pool : pools) {
                            writeMetric(toJson(pool), ctx);
                        }

                        final boolean nothingToReport = commands.isEmpty() && pools.isEmpty();
                        if (nothingToReport) {
                            // Nothing was written this tick: send a ping instead.
                            ctx.writeAndFlush(PING.duplicate()).addListener(CLOSE_ON_FAILURE);
                        } else {
                            ctx.flush();
                        }
                    } catch (Exception e) {
                        logger.error("Unexpected error", e);
                        subject.onError(e);
                    }
                }
            });
    timerHandle.set(timer);
}
/**
 * Schedules a cancelable action to be executed periodically. This default implementation schedules
 * recursively and waits for actions to complete (instead of potentially executing long-running actions
 * concurrently). Each scheduler that can do periodic scheduling in a better way should override this.
 * <p>
 * Note to implementors: non-positive {@code initialDelay} and {@code period} should be regarded as
 * undelayed scheduling of the first and any subsequent executions.
 *
 * @param action
 *            the Action to execute periodically
 * @param initialDelay
 *            time to wait before executing the action for the first time; non-positive values indicate
 *            an undelayed schedule
 * @param period
 *            the time interval to wait each time in between executing the action; non-positive values
 *            indicate no delay between repeated schedules
 * @param unit
 *            the time unit of {@code initialDelay} and {@code period}
 * @return a subscription to be able to unsubscribe the action (unschedule it if not executed)
 */
public Subscription schedulePeriodically(final Action0 action, long initialDelay, long period, TimeUnit unit) {
    final long periodInNanos = unit.toNanos(period);
    final long startInNanos = TimeUnit.MILLISECONDS.toNanos(now()) + unit.toNanos(initialDelay);

    final MultipleAssignmentSubscription mas = new MultipleAssignmentSubscription();
    final Action0 recursiveAction = new Action0() {
        long count = 0;

        @Override
        public void call() {
            if (!mas.isUnsubscribed()) {
                action.call();
                // Target time of the next run, counted from the fixed start.
                long nextTick = startInNanos + (++count * periodInNanos);
                mas.set(schedule(this, nextTick - TimeUnit.MILLISECONDS.toNanos(now()), TimeUnit.NANOSECONDS));
            }
        }
    };

    // Install a placeholder in mas BEFORE scheduling. If the first execution runs (and
    // stores its follow-up subscription in mas) before the initial schedule call returns,
    // assigning that call's result directly to mas would replace the newer subscription
    // and unsubscribing would no longer cancel the periodic task.
    MultipleAssignmentSubscription s = new MultipleAssignmentSubscription();
    mas.set(s);
    s.set(schedule(recursiveAction, initialDelay, unit));
    return mas;
}