Java 类io.reactivex.Scheduler.Worker 实例源码

项目:GitHub    文件:HandlerSchedulerTest.java   
/**
 * Checks that a one-shot {@code schedule} call hands the task to the
 * RxJavaPlugins schedule handler and executes the handler's replacement.
 */
@Test
public void workerScheduleOnceUsesHook() {
    final CountingRunnable replacement = new CountingRunnable();
    final AtomicReference<Runnable> seenByHook = new AtomicReference<>();
    RxJavaPlugins.setScheduleHandler(new Function<Runnable, Runnable>() {
        @Override public Runnable apply(Runnable original) {
            // Remember what the worker handed us, then swap in our own task.
            seenByHook.set(original);
            return replacement;
        }
    });

    Worker hookedWorker = scheduler.createWorker();

    CountingRunnable submitted = new CountingRunnable();
    hookedWorker.schedule(submitted);

    // The hook must have received exactly the instance we submitted.
    assertSame(submitted, seenByHook.get());

    runUiThreadTasks();

    // Only the replacement returned by the hook may have executed.
    assertEquals(1, replacement.get());
    assertEquals(0, submitted.get());
}
项目:GitHub    文件:HandlerSchedulerTest.java   
/**
 * Checks that a delayed one-shot {@code schedule} call hands the task to the
 * RxJavaPlugins schedule handler and executes the handler's replacement.
 */
@Test
public void workerScheduleOnceWithDelayUsesHook() {
    final CountingRunnable replacement = new CountingRunnable();
    final AtomicReference<Runnable> seenByHook = new AtomicReference<>();
    RxJavaPlugins.setScheduleHandler(new Function<Runnable, Runnable>() {
        @Override public Runnable apply(Runnable original) {
            // Remember what the worker handed us, then swap in our own task.
            seenByHook.set(original);
            return replacement;
        }
    });

    Worker delayedWorker = scheduler.createWorker();

    CountingRunnable submitted = new CountingRunnable();
    delayedWorker.schedule(submitted, 1, MINUTES);

    // The hook must have received exactly the instance we submitted.
    assertSame(submitted, seenByHook.get());

    idleMainLooper(1, MINUTES);
    runUiThreadTasks();

    // Only the replacement returned by the hook may have executed.
    assertEquals(1, replacement.get());
    assertEquals(0, submitted.get());
}
项目:GitHub    文件:HandlerSchedulerTest.java   
/**
 * Schedules a periodic task (1 minute initial delay, 1 minute period) and
 * expects one more execution after each idled minute.
 */
@Test @Ignore("Implementation delegated to default RxJava implementation")
public void workerSchedulePeriodicallyReschedulesItself() {
    Worker periodicWorker = scheduler.createWorker();

    CountingRunnable ticks = new CountingRunnable();
    periodicWorker.schedulePeriodically(ticks, 1, 1, MINUTES);

    // Nothing may run before the initial delay has been idled through.
    runUiThreadTasks();
    assertEquals(0, ticks.get());

    // Each idled minute is expected to add exactly one run.
    for (int expectedRuns = 1; expectedRuns <= 3; expectedRuns++) {
        idleMainLooper(1, MINUTES);
        runUiThreadTasks();
        assertEquals(expectedRuns, ticks.get());
    }
}
项目:GitHub    文件:HandlerSchedulerTest.java   
/**
 * Lets a periodic task run twice, disposes its {@link Disposable}, and
 * expects the run count to stay at two after another idled minute.
 */
@Test @Ignore("Implementation delegated to default RxJava implementation")
public void workerSchedulePeriodicallyDisposedDoesNotRun() {
    Worker periodicWorker = scheduler.createWorker();

    CountingRunnable ticks = new CountingRunnable();
    Disposable subscription = periodicWorker.schedulePeriodically(ticks, 1, 1, MINUTES);

    // Nothing may run before the initial delay has been idled through.
    runUiThreadTasks();
    assertEquals(0, ticks.get());

    // Two idled minutes, one run each.
    for (int expectedRuns = 1; expectedRuns <= 2; expectedRuns++) {
        idleMainLooper(1, MINUTES);
        runUiThreadTasks();
        assertEquals(expectedRuns, ticks.get());
    }

    subscription.dispose();

    // After disposal a further minute must not add a run.
    idleMainLooper(1, MINUTES);
    runUiThreadTasks();
    assertEquals(2, ticks.get());
}
项目:GitHub    文件:HandlerSchedulerTest.java   
/**
 * Disposes the worker from inside the schedule hook and checks that the
 * task being scheduled never runs.
 */
@Test
public void workerUnsubscriptionDuringSchedulingCancelsScheduledAction() {
    final AtomicReference<Worker> workerHolder = new AtomicReference<>();
    RxJavaPlugins.setScheduleHandler(new Function<Runnable, Runnable>() {
        @Override public Runnable apply(Runnable task) {
            // Deliberately dispose at an awkward moment: after the usual disposed check.
            workerHolder.get().dispose();
            return task;
        }
    });

    Worker target = scheduler.createWorker();
    workerHolder.set(target);

    CountingRunnable submitted = new CountingRunnable();
    target.schedule(submitted);

    runUiThreadTasks();
    assertEquals(0, submitted.get());
}
项目:GitHub    文件:HandlerSchedulerTest.java   
/**
 * Schedules a delayed task on two separate workers, disposes only the
 * first, and expects only the second worker's task to run.
 */
@Test
public void workerUnsubscriptionDoesNotAffectOtherWorkers() {
    Worker disposedWorker = scheduler.createWorker();
    CountingRunnable disposedRuns = new CountingRunnable();
    disposedWorker.schedule(disposedRuns, 1, MINUTES);

    Worker survivingWorker = scheduler.createWorker();
    CountingRunnable survivingRuns = new CountingRunnable();
    survivingWorker.schedule(survivingRuns, 1, MINUTES);

    disposedWorker.dispose();

    runUiThreadTasksIncludingDelayedTasks();

    assertEquals(0, disposedRuns.get());
    assertEquals(1, survivingRuns.get());
}
项目:RxJava2Swing    文件:SwingSchedulersTest.java   
/**
 * Runs a periodic task on the Swing EDT worker, waits for the task's
 * {@code await} to succeed, disposes the schedule, and checks that the
 * call count is still three after a further 500 ms.
 */
@Test
public void workerPeriodic() throws Exception {
    Task task = new Task(3);

    Worker edtWorker = SwingSchedulers.edt().createWorker();
    try {
        Disposable periodic = edtWorker.schedulePeriodically(task, 100, 100, TimeUnit.MILLISECONDS);

        Assert.assertTrue(task.await(5, TimeUnit.SECONDS));

        periodic.dispose();

        // Leave room for any stray extra execution before checking the count.
        Thread.sleep(500);

        Assert.assertEquals(3, task.calls);
    } finally {
        edtWorker.dispose();
    }
}
项目:RxJava2Swing    文件:SwingSchedulersTest.java   
/**
 * Schedules a task with a 500 ms delay on the Swing EDT worker, disposes it
 * after 100 ms, and checks that it was never called.
 */
@Test
public void workerDispose() throws Exception {
    Task task = new Task(1);

    Worker edtWorker = SwingSchedulers.edt().createWorker();
    try {
        Disposable pending = edtWorker.schedule(task, 500, TimeUnit.MILLISECONDS);

        // Dispose well before the 500 ms delay expires.
        Thread.sleep(100);
        pending.dispose();

        // Wait past the original due time before checking the call count.
        Thread.sleep(500);

        Assert.assertEquals(0, task.calls);
    } finally {
        edtWorker.dispose();
    }
}
项目:RxJava2Extensions    文件:SharedSchedulerTest.java   
/**
 * Schedules this test instance 1000 times on one worker of a
 * {@code SharedScheduler} and polls until {@code calls} reaches 1000.
 * The JUnit timeout bounds the polling loop.
 */
@Test(timeout = 5000)
public void futureDisposeRace() throws Exception {
    SharedScheduler shared = new SharedScheduler(Schedulers.computation());
    try {
        Worker sharedWorker = shared.createWorker();
        for (int submitted = 0; submitted < 1000; submitted++) {
            sharedWorker.schedule(this);
        }

        // Poll until every submitted task has been counted.
        while (calls != 1000) {
            Thread.sleep(100);
        }
    } finally {
        shared.shutdown();
    }
}
项目:RxJava2Extensions    文件:ParallelSchedulerTest.java   
/**
 * Schedules a task that increments {@code calls} and then throws
 * {@link IllegalStateException}, waits until a plugin error is recorded,
 * and asserts the first recorded error has that type.
 *
 * @param s the scheduler under test; shut down before returning
 * @throws InterruptedException if interrupted while polling for the error
 */
protected void taskThrows(Scheduler s) throws InterruptedException {
    try {
        List<Throwable> pluginErrors = TestHelper.trackPluginErrors();

        Worker crashingWorker = s.createWorker();

        Runnable failingTask = new Runnable() {
            @Override
            public void run() {
                calls.getAndIncrement();
                throw new IllegalStateException();
            }
        };
        crashingWorker.schedule(failingTask);

        // Poll until the thrown exception shows up in the tracked plugin errors.
        while (pluginErrors.isEmpty()) {
            Thread.sleep(20);
        }

        TestHelper.assertError(pluginErrors, 0, IllegalStateException.class);
    } finally {
        s.shutdown();
    }
}
项目:RxJava2Extensions    文件:ParallelSchedulerTest.java   
/**
 * Races {@code schedule} against {@code dispose} on a fresh worker of a
 * {@code ParallelScheduler}, 1000 times, then shuts the scheduler down.
 */
@Test
public void setFutureRace() {
    final Scheduler s = new ParallelScheduler(2, true);
    try {
        for (int round = 0; round < 1000; round++) {
            final Worker contested = s.createWorker();

            // One side submits this test instance as a task...
            Runnable scheduleSide = new Runnable() {
                @Override
                public void run() {
                    contested.schedule(ParallelSchedulerTest.this);
                }
            };

            // ...while the other side disposes the same worker.
            Runnable disposeSide = new Runnable() {
                @Override
                public void run() {
                    contested.dispose();
                }
            };

            TestHelper.race(scheduleSide, disposeSide, Schedulers.single());
        }
    } finally {
        s.shutdown();
    }
}
项目:RxJava2Extensions    文件:BlockingSchedulerTest.java   
/**
 * Executes an action on a {@code BlockingScheduler} whose worker task
 * disposes the worker, shuts the scheduler down and then throws
 * {@link IllegalArgumentException}; asserts that exception is the first
 * tracked plugin error. Plugins are reset afterwards.
 */
@Test(timeout = 10000)
public void workerCrash() {
    List<Throwable> pluginErrors = TestHelper.trackPluginErrors();
    try {
        final BlockingScheduler blocking = new BlockingScheduler();
        blocking.execute(new Action() {
            @Override
            public void run() throws Exception {
                final Worker crashing = blocking.createWorker();
                Runnable failingTask = new Runnable() {
                    @Override
                    public void run() {
                        crashing.dispose();
                        blocking.shutdown();
                        throw new IllegalArgumentException();
                    }
                };
                crashing.schedule(failingTask);
            }
        });

        TestHelper.assertError(pluginErrors, 0, IllegalArgumentException.class);
    } finally {
        RxJavaPlugins.reset();
    }
}
项目:rxjava2-extras    文件:SchedulerHelper.java   
/**
 * Blocks the calling thread until {@code numThreads} marker tasks, each
 * scheduled on its own freshly created worker of {@code scheduler}, have
 * run, or until the timeout elapses.
 *
 * <p>Each marker task disposes its worker and counts down a shared latch.
 *
 * @param scheduler  scheduler whose workers receive the marker tasks
 * @param numThreads number of workers to create and marker tasks to wait for
 * @param timeout    maximum time to wait for all marker tasks
 * @param unit       unit of {@code timeout}
 * @throws RuntimeException if the timeout elapses before all marker tasks
 *         have run, or if the calling thread is interrupted while waiting
 *         (the interrupt flag is restored and the
 *         {@link InterruptedException} is set as the cause)
 */
public static void blockUntilWorkFinished(Scheduler scheduler, int numThreads, long timeout, TimeUnit unit) {
    final CountDownLatch latch = new CountDownLatch(numThreads);
    for (int i = 1; i <= numThreads; i++) {
        final Worker worker = scheduler.createWorker();
        worker.schedule(new Runnable() {
            @Override
            public void run() {
                worker.dispose();
                latch.countDown();
            }
        });
    }
    try {
        boolean finished = latch.await(timeout, unit);
        if (!finished) {
            throw new RuntimeException("timeout occured waiting for work to finish");
        }
    } catch (InterruptedException e) {
        // Restore the interrupt flag so callers further up the stack can still
        // observe the interruption after it has been wrapped.
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
    }
}
项目:GitHub    文件:HandlerSchedulerTest.java   
/**
 * Schedules a task without delay and expects exactly one run once UI
 * thread tasks have been executed.
 */
@Test
public void workerScheduleOncePostsImmediately() {
    CountingRunnable runs = new CountingRunnable();

    Worker immediateWorker = scheduler.createWorker();
    immediateWorker.schedule(runs);

    runUiThreadTasks();

    assertEquals(1, runs.get());
}
项目:GitHub    文件:HandlerSchedulerTest.java   
/**
 * Schedules a task with a delay of -1 minute and expects exactly one run
 * once UI thread tasks have been executed, without idling the looper.
 */
@Test
public void workerScheduleOnceWithNegativeDelayPostsImmediately() {
    CountingRunnable runs = new CountingRunnable();

    Worker immediateWorker = scheduler.createWorker();
    immediateWorker.schedule(runs, -1, TimeUnit.MINUTES);

    runUiThreadTasks();

    assertEquals(1, runs.get());
}
项目:GitHub    文件:HandlerSchedulerTest.java   
/**
 * Disposes the {@link Disposable} returned by {@code schedule} right away
 * and expects the task not to have run.
 */
@Test
public void workerScheduleOnceDisposedDoesNotRun() {
    CountingRunnable runs = new CountingRunnable();

    Worker cancellingWorker = scheduler.createWorker();
    Disposable pending = cancellingWorker.schedule(runs);
    pending.dispose();

    runUiThreadTasks();

    assertEquals(0, runs.get());
}
项目:GitHub    文件:HandlerSchedulerTest.java   
/**
 * Schedules a task with a one-minute delay and expects zero runs before
 * the looper is idled for a minute and one run afterwards.
 */
@Test
public void workerScheduleOnceWithDelayPostsWithDelay() {
    CountingRunnable runs = new CountingRunnable();

    Worker delayedWorker = scheduler.createWorker();
    delayedWorker.schedule(runs, 1, MINUTES);

    // Before the delay has been idled through: no execution yet.
    runUiThreadTasks();
    assertEquals(0, runs.get());

    // After idling one minute: exactly one execution.
    idleMainLooper(1, MINUTES);
    runUiThreadTasks();
    assertEquals(1, runs.get());
}
项目:GitHub    文件:HandlerSchedulerTest.java   
/**
 * Schedules a task with a one-minute delay, disposes it after idling
 * 30 seconds, idles another 30 seconds, and expects zero runs.
 */
@Test
public void workerScheduleOnceWithDelayDisposedDoesNotRun() {
    CountingRunnable runs = new CountingRunnable();

    Worker delayedWorker = scheduler.createWorker();
    Disposable pending = delayedWorker.schedule(runs, 1, MINUTES);

    // Dispose halfway through the delay.
    idleMainLooper(30, SECONDS);
    pending.dispose();

    // Idle the remaining half; the task must not have run.
    idleMainLooper(30, SECONDS);
    runUiThreadTasks();
    assertEquals(0, runs.get());
}
项目:GitHub    文件:HandlerSchedulerTest.java   
/**
 * Checks that {@code schedulePeriodically} passes the task through the
 * schedule hook, runs the hook's replacement, and does not invoke the hook
 * again for the first re-scheduling.
 */
@Test @Ignore("Implementation delegated to default RxJava implementation")
public void workerSchedulePeriodicallyUsesHookOnce() {
    Worker periodicWorker = scheduler.createWorker();

    final CountingRunnable replacement = new CountingRunnable();
    final AtomicReference<Runnable> seenByHook = new AtomicReference<>();
    RxJavaPlugins.setScheduleHandler(new Function<Runnable, Runnable>() {
        @Override public Runnable apply(Runnable original) {
            seenByHook.set(original);
            return replacement;
        }
    });

    CountingRunnable submitted = new CountingRunnable();
    periodicWorker.schedulePeriodically(submitted, 1, 1, MINUTES);

    // The hook must have received exactly the instance we submitted.
    assertSame(submitted, seenByHook.get());
    // Clear the slot so that any later hook invocation becomes visible.
    seenByHook.set(null);

    idleMainLooper(1, MINUTES);
    runUiThreadTasks();

    // Only the replacement returned by the hook may have executed.
    assertEquals(1, replacement.get());
    assertEquals(0, submitted.get());

    // The slot is still empty, so the hook was not called for the re-schedule.
    assertNull(seenByHook.get());
}
项目:GitHub    文件:HandlerSchedulerTest.java   
/**
 * Uses a periodic task that disposes its own schedule during its second
 * run and expects the run count to stay at two afterwards.
 */
@Test @Ignore("Implementation delegated to default RxJava implementation")
public void workerSchedulePeriodicallyDisposedDuringRunDoesNotReschedule() {
    Worker periodicWorker = scheduler.createWorker();

    final AtomicReference<Disposable> selfHandle = new AtomicReference<>();
    CountingRunnable ticks = new CountingRunnable() {
        @Override public void run() {
            super.run();
            // On the second run, cancel our own periodic schedule.
            if (get() == 2) {
                selfHandle.get().dispose();
            }
        }
    };
    Disposable subscription = periodicWorker.schedulePeriodically(ticks, 1, 1, MINUTES);
    selfHandle.set(subscription);

    // Nothing may run before the initial delay has been idled through.
    runUiThreadTasks();
    assertEquals(0, ticks.get());

    // Two idled minutes, one run each; the second run disposes the schedule.
    for (int expectedRuns = 1; expectedRuns <= 2; expectedRuns++) {
        idleMainLooper(1, MINUTES);
        runUiThreadTasks();
        assertEquals(expectedRuns, ticks.get());
    }

    // A further minute must not add a run.
    idleMainLooper(1, MINUTES);
    runUiThreadTasks();
    assertEquals(2, ticks.get());
}
项目:GitHub    文件:HandlerSchedulerTest.java   
/**
 * Uses a periodic task that throws a {@link RuntimeException} during its
 * second run and expects the run count to stay at two afterwards.
 */
@Test @Ignore("Implementation delegated to default RxJava implementation")
public void workerSchedulePeriodicallyThrowingDoesNotReschedule() {
    Worker periodicWorker = scheduler.createWorker();

    CountingRunnable ticks = new CountingRunnable() {
        @Override public void run() {
            super.run();
            // Fail on the second run.
            if (get() == 2) {
                throw new RuntimeException("Broken!");
            }
        }
    };
    periodicWorker.schedulePeriodically(ticks, 1, 1, MINUTES);

    // Nothing may run before the initial delay has been idled through.
    runUiThreadTasks();
    assertEquals(0, ticks.get());

    // Two idled minutes, one run each; the second run throws.
    for (int expectedRuns = 1; expectedRuns <= 2; expectedRuns++) {
        idleMainLooper(1, MINUTES);
        runUiThreadTasks();
        assertEquals(expectedRuns, ticks.get());
    }

    // A further minute must not add a run.
    idleMainLooper(1, MINUTES);
    runUiThreadTasks();
    assertEquals(2, ticks.get());
}
项目:GitHub    文件:HandlerSchedulerTest.java   
/**
 * Checks that the {@link Disposable} returned by {@code schedule} reports
 * not-disposed before {@code dispose()} and disposed afterwards.
 */
@Test
public void workerDisposableTracksDisposedState() {
    CountingRunnable task = new CountingRunnable();

    Worker trackedWorker = scheduler.createWorker();
    Disposable handle = trackedWorker.schedule(task);

    assertFalse(handle.isDisposed());
    handle.dispose();
    assertTrue(handle.isDisposed());
}
项目:GitHub    文件:HandlerSchedulerTest.java   
/**
 * Schedules a task with a one-minute delay, disposes the whole worker, and
 * expects the task not to run even once delayed tasks are executed.
 */
@Test
public void workerDisposeCancelsScheduled() {
    Worker worker = scheduler.createWorker();

    CountingRunnable counter = new CountingRunnable();
    worker.schedule(counter, 1, MINUTES);

    worker.dispose();

    // Run delayed tasks too: the task was posted with a one-minute delay, so
    // running only the immediately-due tasks would leave the counter at zero
    // whether or not dispose() actually cancelled it.
    runUiThreadTasksIncludingDelayedTasks();
    assertEquals(0, counter.get());
}
项目:GitHub    文件:HandlerSchedulerTest.java   
/**
 * Checks that a new worker reports not-disposed and reports disposed after
 * {@code dispose()} has been called.
 */
@Test
public void workerTracksDisposedState() {
    Worker trackedWorker = scheduler.createWorker();

    // A freshly created worker starts out live.
    assertFalse(trackedWorker.isDisposed());

    trackedWorker.dispose();

    assertTrue(trackedWorker.isDisposed());
}
项目:GitHub    文件:HandlerSchedulerTest.java   
/**
 * Checks that scheduling on an already-disposed worker returns a
 * {@link Disposable} that is itself already disposed.
 */
@Test
public void disposedWorkerReturnsDisposedDisposables() {
    Worker deadWorker = scheduler.createWorker();
    deadWorker.dispose();

    CountingRunnable neverRun = new CountingRunnable();
    Disposable handle = deadWorker.schedule(neverRun);

    assertTrue(handle.isDisposed());
}
项目:RxJava2Swing    文件:SwingSchedulersTest.java   
/**
 * Schedules a task without delay on the Swing EDT worker and expects the
 * task's {@code await} to succeed within five seconds.
 */
@Test
public void worker() throws Exception {
    Task task = new Task(1);

    Worker edtWorker = SwingSchedulers.edt().createWorker();
    try {
        edtWorker.schedule(task);

        boolean completed = task.await(5, TimeUnit.SECONDS);
        Assert.assertTrue(completed);
    } finally {
        edtWorker.dispose();
    }
}
项目:RxJava2Swing    文件:SwingSchedulersTest.java   
/**
 * Schedules a task with a 100 ms delay on the Swing EDT worker and expects
 * the task's {@code await} to succeed within five seconds.
 */
@Test
public void workerDelay() throws Exception {
    Task task = new Task(1);

    Worker edtWorker = SwingSchedulers.edt().createWorker();
    try {
        edtWorker.schedule(task, 100, TimeUnit.MILLISECONDS);

        boolean completed = task.await(5, TimeUnit.SECONDS);
        Assert.assertTrue(completed);
    } finally {
        edtWorker.dispose();
    }
}
项目:rxjava2-jdbc    文件:MemberSingle.java   
/**
 * Attempts to hand member {@code m} to one of the observers in the snapshot
 * {@code obs}, advancing the shared {@code observers} state atomically.
 *
 * @param obs snapshot of the observer state used to pick the starting index
 * @param m   member to emit; checked back in when no emission is scheduled
 * @return {@code true} if an {@code Emitter} was scheduled on a new worker,
 *         {@code false} if the state no longer matched the snapshot and the
 *         member was checked in instead
 */
private boolean tryEmit(Observers<T> obs, DecoratingMember<T> m) {
    // get a fresh worker each time so we jump threads to
    // break the stack-trace (a long-enough chain of
    // checkout-checkins could otherwise provoke stack
    // overflow)

    // advance the index and choose the Observer to emit to (round robin)

    int index = obs.index;
    MemberSingleObserver<T> o = obs.observers[index];
    MemberSingleObserver<T> oNext = o;
    // atomically bump up the index (if that entry has not been deleted in
    // the meantime by disposal)
    while (true) {
        Observers<T> x = observers.get();
        // proceed only while the current state still matches the snapshot
        if (x.index == index && x.observers[index] == o) {
            // work on a copy so the published state is never mutated in place
            boolean[] active = new boolean[x.active.length];
            System.arraycopy(x.active, 0, active, 0, active.length);
            // scan forward cyclically for the next active slot; stops at
            // index itself if no other slot is active
            int nextIndex = (index + 1) % active.length;
            while (nextIndex != index && !active[nextIndex]) {
                nextIndex = (nextIndex + 1) % active.length;
            }
            // the chosen slot is marked inactive and the active count reduced by one
            active[nextIndex] = false;
            if (observers.compareAndSet(x, new Observers<T>(x.observers, active, x.activeCount - 1, nextIndex))) {
                oNext = x.observers[nextIndex];
                break;
            }
            // CAS lost to a concurrent update: loop and re-read the state
        } else {
            // state no longer matches the snapshot (original note: checkin
            // because no active observers)
            m.checkin();
            return false;
        }
    }
    // emit on a fresh worker; the worker is passed to the Emitter as well
    Worker worker = scheduler.createWorker();
    worker.schedule(new Emitter<T>(worker, oNext, m));
    return true;
}
项目:RxJava2Extensions    文件:FlowableOnBackpressureTimeout.java   
/**
 * Stores the downstream subscriber, size/timeout configuration, worker and
 * eviction callback, and creates the request counter and the queue.
 *
 * @param actual  downstream subscriber
 * @param maxSize size limit; stored doubled in {@code maxSizeDouble}
 * @param timeout timeout amount
 * @param unit    unit of {@code timeout}
 * @param worker  worker stored for later use by this subscriber
 * @param onEvict callback stored in {@code onEvict}
 */
OnBackpressureTimeoutSubscriber(Subscriber<? super T> actual, int maxSize, long timeout, TimeUnit unit,
        Worker worker, Consumer<? super T> onEvict) {
    // Collaborators supplied by the caller.
    this.actual = actual;
    this.worker = worker;
    this.onEvict = onEvict;

    // Configuration values.
    this.maxSizeDouble = maxSize << 1;
    this.timeout = timeout;
    this.unit = unit;

    // Internal state created here.
    this.queue = new ArrayDeque<Object>();
    this.requested = new AtomicLong();
}
项目:RxJava2Extensions    文件:FlowableZipLatest.java   
/**
 * Stores the downstream subscriber, worker and combiner, creates the
 * internal counters and error container, and fills the subscriber array
 * with {@code n} inner subscribers.
 *
 * @param actual   downstream subscriber
 * @param n        number of inner subscribers; also passed to the superclass constructor
 * @param worker   worker stored for later use by this coordinator
 * @param combiner function stored in {@code combiner}
 */
@SuppressWarnings("unchecked")
ZipLatestCoordinator(Subscriber<? super R> actual, int n, Worker worker, Function<? super Object[], ? extends R> combiner) {
    super(n);
    this.actual = actual;
    this.worker = worker;
    this.wip = new AtomicInteger();
    this.requested = new AtomicLong();
    this.errors = new AtomicThrowable();
    // Raw array creation is the reason for the unchecked suppression above.
    this.subscribers = new InnerSubscriber[n];
    for (int slot = 0; slot < n; slot++) {
        subscribers[slot] = new InnerSubscriber<T>(this, slot);
    }
    this.combiner = combiner;
}
项目:RxJava2Extensions    文件:FlowableTimeoutLast.java   
/**
 * Passes the downstream subscriber to the superclass, stores the timeout
 * configuration and worker, and creates the task holder, index counter and
 * value reference.
 *
 * @param actual  downstream subscriber, forwarded to the superclass constructor
 * @param timeout timeout amount
 * @param unit    unit of {@code timeout}
 * @param worker  worker stored for later use
 */
TimeoutLast(Subscriber<? super T> actual, long timeout, TimeUnit unit, Worker worker) {
    super(actual);

    // Values supplied by the caller.
    this.worker = worker;
    this.timeout = timeout;
    this.unit = unit;

    // Internal state created here.
    this.value = new AtomicReference<T>();
    this.index = new AtomicLong();
    this.task = new SequentialDisposable();
}
项目:RxJava2Extensions    文件:FlowableSpanout.java   
/**
 * Stores the downstream subscriber, span settings, worker and error-delay
 * flag, sets {@code lastEvent} to {@code -1}, and creates the queue.
 *
 * @param actual      downstream subscriber
 * @param initialSpan value stored in {@code initialSpan}
 * @param betweenSpan value stored in {@code betweenSpan}
 * @param worker      worker stored for later use
 * @param delayError  flag stored in {@code delayError}
 * @param bufferSize  size argument passed to the {@code SpscLinkedArrayQueue} constructor
 */
SpanoutSubscriber(Subscriber<? super T> actual, long initialSpan, long betweenSpan,
        Worker worker, boolean delayError, int bufferSize) {
    // Collaborators supplied by the caller.
    this.actual = actual;
    this.worker = worker;

    // Configuration values.
    this.initialSpan = initialSpan;
    this.betweenSpan = betweenSpan;
    this.delayError = delayError;

    // Internal state created here.
    this.queue = new SpscLinkedArrayQueue<T>(bufferSize);
    this.lastEvent = -1L;
}
项目:RxJava2Extensions    文件:SoloSubscribeOn.java   
/**
 * Creates a worker, wraps the downstream subscriber in a
 * {@code SubscribeOnSubscriber}, signals {@code onSubscribe} with it, and
 * schedules the wrapper on the worker, storing the returned handle in the
 * wrapper's {@code task}.
 *
 * @param s the downstream subscriber
 */
@Override
protected void subscribeActual(Subscriber<? super T> s) {
    Worker w = scheduler.createWorker();
    SubscribeOnSubscriber<T> wrapper = new SubscribeOnSubscriber<T>(s, w, source);

    // Downstream receives the wrapper before anything is scheduled.
    s.onSubscribe(wrapper);

    DisposableHelper.replace(wrapper.task, w.schedule(wrapper));
}
项目:RxJava2Extensions    文件:SoloSubscribeOn.java   
/**
 * Stores the downstream subscriber, worker and source, and creates the task
 * reference and the {@code requested} flag.
 *
 * @param actual downstream subscriber
 * @param worker worker stored for later use
 * @param source publisher stored in {@code source}
 */
SubscribeOnSubscriber(Subscriber<? super T> actual, Worker worker, Publisher<T> source) {
    // Values supplied by the caller.
    this.source = source;
    this.worker = worker;
    this.actual = actual;

    // Internal state created here.
    this.requested = new AtomicBoolean();
    this.task = new AtomicReference<Disposable>();
}
项目:RxJava2Extensions    文件:PerhapsSubscribeOn.java   
/**
 * Creates a worker, wraps the downstream subscriber in a
 * {@code SubscribeOnSubscriber}, signals {@code onSubscribe} with it, and
 * schedules the wrapper on the worker, storing the returned handle in the
 * wrapper's {@code task}.
 *
 * @param s the downstream subscriber
 */
@Override
protected void subscribeActual(Subscriber<? super T> s) {
    Worker w = scheduler.createWorker();
    SubscribeOnSubscriber<T> wrapper = new SubscribeOnSubscriber<T>(s, w, source);

    // Downstream receives the wrapper before anything is scheduled.
    s.onSubscribe(wrapper);

    DisposableHelper.replace(wrapper.task, w.schedule(wrapper));
}
项目:RxJava2Extensions    文件:ParallelSchedulerTest.java   
/**
 * Schedules this instance with a 200 ms delay, disposes the returned handle
 * immediately, waits 300 ms, and asserts the task never ran. Also checks
 * the disposed state of the handle and of the worker along the way.
 *
 * @param s the scheduler under test; shut down before returning
 * @throws InterruptedException if interrupted while sleeping
 */
void cancelledTask(Scheduler s) throws InterruptedException {
    try {
        Worker worker = s.createWorker();

        try {
            assertFalse(worker.isDisposed());

            Disposable pending = worker.schedule(this, 200, TimeUnit.MILLISECONDS);
            assertFalse(pending.isDisposed());

            pending.dispose();
            assertTrue(pending.isDisposed());

            // Wait past the original due time before checking the call count.
            Thread.sleep(300);
            assertEquals(0, calls.get());

            worker.dispose();
            assertTrue(worker.isDisposed());
        } finally {
            worker.dispose();
        }
    } finally {
        s.shutdown();
    }
}
项目:rxjava2-extras    文件:FlowableOnBackpressureBufferToFile.java   
/**
 * Creates a paged queue and a worker from {@code options}, then subscribes
 * a buffering subscriber to {@code source} when it is non-null, otherwise
 * to {@code source2}.
 *
 * @param child the downstream subscriber
 */
@Override
protected void subscribeActual(Subscriber<? super T> child) {
    PagedQueue pagedQueue = new PagedQueue(options.fileFactory(), options.pageSizeBytes());
    Worker drainWorker = options.scheduler().createWorker();
    if (source == null) {
        // No primary source: use the alternative one with the Observable variant.
        source2.subscribe(
                new BufferToFileSubscriberObservable<T>(child, pagedQueue, serializer, drainWorker));
    } else {
        source.subscribe(
                new BufferToFileSubscriberFlowable<T>(child, pagedQueue, serializer, drainWorker));
    }
}
项目:rxjava2-extras    文件:FlowableOnBackpressureBufferToFile.java   
/**
 * Stores the downstream subscriber, paged queue, serializer and worker.
 *
 * @param child      downstream subscriber
 * @param queue      queue stored in {@code queue}
 * @param serializer serializer stored in {@code serializer}
 * @param worker     worker stored for later use
 */
BufferToFileSubscriber(Subscriber<? super T> child, PagedQueue queue,
        Serializer<T> serializer, Worker worker) {
    this.worker = worker;
    this.serializer = serializer;
    this.queue = queue;
    this.child = child;
}
项目:rxjava2-extras    文件:SchedulerHelperTest.java   
/**
 * Checks that a worker of the scheduler returned by
 * {@code SchedulerHelper.withThreadId} reports not-disposed initially and
 * disposed after {@code dispose()}.
 */
@Test
public void testDispose() {
    Scheduler wrapped = SchedulerHelper.withThreadId(Schedulers.trampoline(), "boo");
    Worker wrappedWorker = wrapped.createWorker();

    Assert.assertFalse(wrappedWorker.isDisposed());

    wrappedWorker.dispose();

    Assert.assertTrue(wrappedWorker.isDisposed());
}
项目:rxjava2-extras    文件:FlowableOnBackpressureBufferToFileTest.java   
/**
 * Uses a mocked {@code PagedQueue} whose {@code poll()} throws, runs the
 * buffering subscriber once after a request of one, and expects the thrown
 * exception to reach the test subscriber as an error.
 */
@Test
public void testPollQueueThrowsExceptionEmitsError() {
    RuntimeException failure = new RuntimeException();
    PagedQueue failingQueue = Mockito.mock(PagedQueue.class);
    Mockito.doThrow(failure).when(failingQueue).poll();

    Worker trampolineWorker = Schedulers.trampoline().createWorker();
    TestSubscriber<String> downstream = TestSubscriber.create(1);
    BufferToFileSubscriberFlowable<String> subscriber = new BufferToFileSubscriberFlowable<String>(
            downstream, failingQueue, Serializers.utf8(), trampolineWorker);

    subscriber.onSubscribe(IGNORE);
    subscriber.request(1);
    subscriber.run();

    // poll() must have been attempted, and its exception surfaced downstream.
    Mockito.verify(failingQueue, Mockito.atLeastOnce()).poll();
    downstream.assertError(failure);
}