private Func1<Chat, Observable<Users>> getUsers() { return new Func1<Chat, Observable<Users>>() { @Override public Observable<Users> call(Chat chat) { firstKey = chat.getFirstKey(); List<Observable<User>> list = new ArrayList<>(); for (Message m : chat.getMessages()) list.add(userService.getUser(m.getUid())); return Observable.zip(list, new FuncN<Users>() { @Override public Users call(Object... args) { ArrayList<User> users = new ArrayList<>(); for (Object o: args) users.add((User)o); return new Users(users); } }); } }; }
private Observable example3() { return Observable.interval(500, TimeUnit.MILLISECONDS).take(10) .withLatestFrom( mObservableList, new FuncN<String>() { @Override public String call(Object... args) { String s = "["; for (Object object : args) { s += object + ","; } s += "]"; return s; } }); }
private Observable example4() { return Observable.interval(500, TimeUnit.MILLISECONDS).take(10) .withLatestFrom( mObservables, new FuncN<String>() { @Override public String call(Object... args) { String s = "["; for (Object object : args) { s += object + ","; } s += "]"; return s; } }); }
@Override public void executeTask(final Ui ui, int totalTask) { List<Observable<ApiResponse>> calls = new ArrayList<>(); for (int i = 0; i < totalTask; i++) { Observable<ApiResponse> apiResponseObservable = apiCall.callObservable(i + 1); Observable<ApiResponse> observableOnNewThread = apiResponseObservable.subscribeOn(Schedulers.newThread()); calls.add(observableOnNewThread); } Observable.zip(calls, new FuncN<Long>() { @Override public Long call(Object... args) { return System.currentTimeMillis(); } }) .subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Action1<Long>() { @Override public void call(Long time) { ui.showTime(time); } }, new Action1<Throwable>() { @Override public void call(Throwable throwable) { ui.showError("error " + throwable); } }); }
/** * Occasionally zip may be invoked with 0 observables. Test that we don't block indefinitely instead * of immediately invoking zip with 0 argument. * * We now expect an NoSuchElementException since last() requires at least one value and nothing will be emitted. */ @Test(expected = NoSuchElementException.class) public void nonBlockingObservable() { final Object invoked = new Object(); Collection<Observable<Object>> observables = Collections.emptyList(); Observable<Object> result = Observable.zip(observables, new FuncN<Object>() { @Override public Object call(final Object... args) { System.out.println("received: " + args); assertEquals("No argument should have been passed", 0, args.length); return invoked; } }); assertSame(invoked, result.toBlocking().last()); }
@Test public void testZeroSources() { Observable<Object> result = Observable.combineLatest(Collections.<Observable<Object>> emptyList(), new FuncN<Object>() { @Override public Object call(Object... args) { return args; } }); @SuppressWarnings("unchecked") Observer<Object> o = mock(Observer.class); result.subscribe(o); verify(o).onComplete(); verify(o, never()).onNext(any()); verify(o, never()).onError(any(Throwable.class)); }
@SuppressWarnings("unchecked") @Test public void testCollectionSizeDifferentThanFunction() { FuncN<String> zipr = Functions.fromFunc(getConcatStringIntegerIntArrayZipr()); //Func3<String, Integer, int[], String> /* define a Observer to receive aggregated events */ Observer<String> observer = mock(Observer.class); @SuppressWarnings("rawtypes") Collection ws = java.util.Collections.singleton(Observable.just("one", "two")); Observable<String> w = Observable.zip(ws, zipr); w.subscribe(observer); verify(observer, times(1)).onError(any(Throwable.class)); verify(observer, never()).onComplete(); verify(observer, never()).onNext(any(String.class)); }
@Test public void testStartEmptyList() { final Object invoked = new Object(); Collection<Observable<Object>> observables = Collections.emptyList(); Observable<Object> o = Observable.zip(observables, new FuncN<Object>() { @Override public Object call(final Object... args) { assertEquals("No argument should have been passed", 0, args.length); return invoked; } }); TestSubscriber<Object> ts = new TestSubscriber<Object>(); o.subscribe(ts); ts.awaitTerminalEvent(200, TimeUnit.MILLISECONDS); ts.assertReceivedOnNext(Collections.emptyList()); }
/** * Expect NoSuchElementException instead of blocking forever as zip should emit onComplete() and no onNext * and last() expects at least a single response. */ @Test(expected = NoSuchElementException.class) public void testStartEmptyListBlocking() { final Object invoked = new Object(); Collection<Observable<Object>> observables = Collections.emptyList(); Observable<Object> o = Observable.zip(observables, new FuncN<Object>() { @Override public Object call(final Object... args) { assertEquals("No argument should have been passed", 0, args.length); return invoked; } }); o.toBlocking().last(); }
/** * Given a list of observables that emit a boolean condition AND all conditions whenever * any condition changes and emit the resulting condition when the final condition changes. * @param sources * @return */ public static Observable<Boolean> conditionAnder(List<Observable<Boolean>> sources) { return Observable.combineLatest(sources, new FuncN<Observable<Boolean>>() { @Override public Observable<Boolean> call(Object... args) { return Observable.from(args).cast(Boolean.class).firstOrDefault(true, new Func1<Boolean, Boolean>() { @Override public Boolean call(Boolean status) { return !status; } }); } }) .flatMap(new Func1<Observable<Boolean>, Observable<Boolean>>() { @Override public Observable<Boolean> call(Observable<Boolean> t1) { return t1; } }) .distinctUntilChanged(); }
/** * Occasionally zip may be invoked with 0 observables. Test that we don't block indefinitely instead * of immediately invoking zip with 0 argument. * * We now expect an IllegalArgumentException since last() requires at least one value and nothing will be emitted. */ @Test(expected = IllegalArgumentException.class) public void nonBlockingObservable() { final Object invoked = new Object(); Collection<Observable<Object>> observables = Collections.emptyList(); Observable<Object> result = Observable.zip(observables, new FuncN<Object>() { @Override public Object call(final Object... args) { System.out.println("received: " + args); assertEquals("No argument should have been passed", 0, args.length); return invoked; } }); assertSame(invoked, result.toBlockingObservable().last()); }
public static <R> Observable<R> zip(Iterable<? extends Observable<?>> ws, FuncN<? extends R> zipFunction) { List<Observable<?>> os = new ArrayList(); for (Observable<?> o : ws) { os.add(o); } return just(os.toArray(new Observable[os.size()])).lift(new OperatorZip(zipFunction)); }
public MultiSourceProducer(Subscriber<? super R> child, List<? extends Observable<? extends T>> sources, FuncN<? extends R> combinator) { this.sources = sources; this.child = child; this.combinator = combinator; int n = sources.size(); this.subscribers = new MultiSourceRequestableSubscriber[n]; this.collectedValues = new Object[n]; this.haveValues = new BitSet(n); this.completion = new BitSet(n); }
public OnSubscribeCombineLatest(List<? extends Observable<? extends T>> sources, FuncN<? extends R> combinator) { this.sources = sources; this.combinator = combinator; if (sources.size() > RxRingBuffer.SIZE) { throw new IllegalArgumentException("More than RxRingBuffer.SIZE sources to combineLatest is not supported."); } }
public static final <R> Observable<R> zip(Iterable<? extends Observable<?>> ws, FuncN<? extends R> zipFunction) { List<Observable<?>> os = new ArrayList(); for (Observable<?> o : ws) { os.add(o); } return just(os.toArray(new Observable[os.size()])).lift(new OperatorZip(zipFunction)); }
private Observable example1() { return Observable.zip(mObservables, new FuncN<String>() { @Override public String call(Object... args) { return getResult(args); } }); }
private Observable example2() { return Observable.zip(mObservableList, new FuncN<String>() { @Override public String call(Object... args) { return getResult(args); } }); }
private Observable example3() { return Observable.zip(Observable.just(Observable.range(1, 10), Observable.range(15, 20)), new FuncN<String>() { @Override public String call(Object... args) { return getResult(args); } }); }
public OnSubscribeCombineLatest(List<? extends Observable<? extends T>> sources, FuncN<? extends R> combinator) { this.sources = sources; this.combinator = combinator; if (sources.size() > 128) { // For design simplicity this is limited to 128. If more are really needed we'll need to adjust // the design of how RxRingBuffer is used in the implementation below. throw new IllegalArgumentException("More than 128 sources to combineLatest is not supported."); } }
@SuppressWarnings("unchecked") public MultiSourceProducer(final Subscriber<? super R> child, final List<? extends Observable<? extends T>> sources, FuncN<? extends R> combinator) { this.sources = sources; this.child = child; this.combinator = combinator; int n = sources.size(); this.subscribers = new MultiSourceRequestableSubscriber[n]; this.collectedValues = new Object[n]; this.haveValues = new BitSet(n); this.completion = new BitSet(n); }
@Test public void test1ToNSources() { int n = 30; FuncN<List<Object>> func = new FuncN<List<Object>>() { @Override public List<Object> call(Object... args) { return Arrays.asList(args); } }; for (int i = 1; i <= n; i++) { System.out.println("test1ToNSources: " + i + " sources"); List<Observable<Integer>> sources = new ArrayList<Observable<Integer>>(); List<Object> values = new ArrayList<Object>(); for (int j = 0; j < i; j++) { sources.add(Observable.just(j)); values.add(j); } Observable<List<Object>> result = Observable.combineLatest(sources, func); @SuppressWarnings("unchecked") Observer<List<Object>> o = mock(Observer.class); result.subscribe(o); verify(o).onNext(values); verify(o).onComplete(); verify(o, never()).onError(any(Throwable.class)); } }
@Test(timeout=10000) public void testCombineLatestRequestOverflow() throws InterruptedException { List<Observable<Integer>> sources = Arrays.asList(Observable.from(Arrays.asList(1,2,3,4)), Observable.from(Arrays.asList(5,6,7,8))); Observable<Integer> o = Observable.combineLatest(sources,new FuncN<Integer>() { @Override public Integer call(Object... args) { return (Integer) args[0]; }}); //should get at least 4 final CountDownLatch latch = new CountDownLatch(4); o.subscribeOn(Schedulers.computation()).subscribe(new Subscriber<Integer>() { @Override public void onStart() { request(2); } @Override public void onComplete() { //ignore } @Override public void onError(Throwable e) { throw new RuntimeException(e); } @Override public void onNext(Integer t) { latch.countDown(); request(Long.MAX_VALUE-1); }}); assertTrue(latch.await(10, TimeUnit.SECONDS)); }
/** * @author Soussi * * @param button * Checks for validity of the Validate Button */ public void RxValidateButton(final Button button) { Observable<CharSequence> signInFieldsSubscription = (Observable<CharSequence>) Observable.combineLatest((List<? extends Observable<?>>) customChangeObservable, new FuncN<Boolean>() { @Override public Boolean call(Object... args) { for(int i = 0; i < args.length; i++){ if(!args[i].toString().isEmpty()) { return false; } } return true; } }).observeOn(AndroidSchedulers.mainThread()) .subscribe((Observer<? super Boolean>) new Observer<Boolean>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { e.printStackTrace(); } @Override public void onNext(Boolean aBoolean) { if(aBoolean) { button.setEnabled(true); } else { button.setEnabled(false); } } }); compositeSubscription.add((Subscription) signInFieldsSubscription); }
public static <T, R> Observable<R> combineLatest(List<? extends Observable<? extends T>> sources, FuncN<? extends R> combineFunction) { return create(new OnSubscribeCombineLatest(sources, combineFunction)); }
public static <T, R> Observable<R> combineLatest(Iterable<? extends Observable<? extends T>> sources, FuncN<? extends R> combineFunction) { return create(new OnSubscribeCombineLatest(sources, combineFunction)); }
public static <T, R> Observable<R> combineLatestDelayError(Iterable<? extends Observable<? extends T>> sources, FuncN<? extends R> combineFunction) { return create(new OnSubscribeCombineLatest(null, sources, combineFunction, RxRingBuffer.SIZE, true)); }
public static <R> Observable<R> zip(Observable<? extends Observable<?>> ws, FuncN<? extends R> zipFunction) { return ws.toList().map(InternalObservableUtils.TO_ARRAY).lift(new OperatorZip(zipFunction)); }
private Observable<List<AccessPolicy>> populateAccessPolicies() { List<Observable<?>>observables = new ArrayList<>(); for (final AccessPolicyImpl accessPolicy : accessPolicies) { if (accessPolicy.objectId() == null) { if (accessPolicy.userPrincipalName() != null) { observables.add(graphRbacManager.users().getByNameAsync(accessPolicy.userPrincipalName()) .subscribeOn(SdkContext.getRxScheduler()) .doOnNext(new Action1<ActiveDirectoryUser>() { @Override public void call(ActiveDirectoryUser user) { if (user == null) { throw new CloudException(String.format("User principal name %s is not found in tenant %s", accessPolicy.userPrincipalName(), graphRbacManager.tenantId()), null); } accessPolicy.forObjectId(user.id()); } })); } else if (accessPolicy.servicePrincipalName() != null) { observables.add(graphRbacManager.servicePrincipals().getByNameAsync(accessPolicy.servicePrincipalName()) .subscribeOn(SdkContext.getRxScheduler()) .doOnNext(new Action1<ServicePrincipal>() { @Override public void call(ServicePrincipal sp) { if (sp == null) { throw new CloudException(String.format("User principal name %s is not found in tenant %s", accessPolicy.userPrincipalName(), graphRbacManager.tenantId()), null); } accessPolicy.forObjectId(sp.id()); } })); } else { throw new IllegalArgumentException("Access policy must specify object ID."); } } } if (observables.isEmpty()) { return Observable.just(accessPolicies()); } else { return Observable.zip(observables, new FuncN<List<AccessPolicy>>() { @Override public List<AccessPolicy> call(Object... args) { return accessPolicies(); } }); } }
public SingleSourceProducer(Subscriber<? super R> child, Observable<? extends T> source, FuncN<? extends R> combinator) { this.source = source; this.child = child; this.combinator = combinator; this.subscriber = new SingleSourceRequestableSubscriber(child, combinator); }
SingleSourceRequestableSubscriber(Subscriber<? super R> child, FuncN<? extends R> combinator) { super(child); this.child = child; this.combinator = combinator; }
public Zip(Subscriber<? super R> child, FuncN<? extends R> zipFunction) { this.child = child; this.zipFunction = zipFunction; child.add(this.childSubscription); }
public OperatorZip(FuncN<? extends R> f) { this.zipFunction = f; }
public static final <T, R> Observable<R> combineLatest(List<? extends Observable<? extends T>> sources, FuncN<? extends R> combineFunction) { return create(new OnSubscribeCombineLatest(sources, combineFunction)); }
public static final <R> Observable<R> zip(Observable<? extends Observable<?>> ws, FuncN<? extends R> zipFunction) { return ws.toList().map(new 4()).lift(new OperatorZip(zipFunction)); }
private static final <T, R> Observable<R> combineSinglesDelayError(List<? extends Observable<? extends T>> sources, FuncN<? extends R> combineFunction) { return combineLatestDelayError(sources, combineFunction); }
private Observable example3() { Observable<String> observable1 = Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(final Subscriber<? super String> subscriber) { final Scheduler.Worker worker = Schedulers.io().createWorker(); subscriber.add(worker); worker.schedulePeriodically(new Action0() { long counter = 0; @Override public void call() { if (counter == 8) { subscriber.onError(new Throwable()); } try { subscriber.onNext("Observable1 " + counter++ * 10); } catch (Throwable e) { try { worker.unsubscribe(); } finally { Exceptions.throwOrReport(e, subscriber); } } } }, 1000, 1000, TimeUnit.MILLISECONDS); } }); Observable<String> observable2 = Observable.interval(1, TimeUnit.SECONDS) .map(new Func1<Long, String>() { @Override public String call(Long position) { return "observable2 " + position * 10; } }).take(10); List<Observable<String>> observableList = new ArrayList<>(); observableList.add(observable1); observableList.add(observable2); return Observable.combineLatestDelayError(observableList, new FuncN<Object>() { @Override public Object call(Object... args) { List<String> results = new ArrayList<>(); for (Object object : args) { results.add((String) object); } return results; } }); }
public SingleSourceProducer(final Subscriber<? super R> child, Observable<? extends T> source, FuncN<? extends R> combinator) { this.source = source; this.child = child; this.combinator = combinator; this.subscriber = new SingleSourceRequestableSubscriber<T, R>(child, combinator); }
public Zip(final Subscriber<? super R> child, FuncN<? extends R> zipFunction) { this.child = child; this.zipFunction = zipFunction; child.add(childSubscription); }