From dda401ceedc0427c60a09545b7ec53de335f25f0 Mon Sep 17 00:00:00 2001 From: Michael Kunz Date: Wed, 25 Oct 2017 12:19:20 +0200 Subject: [PATCH] add overloads to JavaFxObserver and JavaFxSubscriber which allow to unmask null values in an Optional or guarded by a sentinel --- .../rxjavafx/observers/BindingObserver.java | 44 ++++--- .../rxjavafx/observers/BindingSubscriber.java | 66 ++++++---- .../rxjavafx/observers/JavaFxObserver.java | 105 +++++++++++++-- .../rxjavafx/observers/JavaFxSubscriber.java | 118 ++++++++++++++--- .../rxjavafx/subscriptions/BindingTest.java | 123 ++++++++++++++++-- 5 files changed, 375 insertions(+), 81 deletions(-) mode change 100644 => 100755 src/main/java/io/reactivex/rxjavafx/observers/BindingObserver.java mode change 100644 => 100755 src/main/java/io/reactivex/rxjavafx/observers/BindingSubscriber.java mode change 100644 => 100755 src/main/java/io/reactivex/rxjavafx/observers/JavaFxObserver.java mode change 100644 => 100755 src/main/java/io/reactivex/rxjavafx/observers/JavaFxSubscriber.java mode change 100644 => 100755 src/test/java/io/reactivex/rxjavafx/subscriptions/BindingTest.java diff --git a/src/main/java/io/reactivex/rxjavafx/observers/BindingObserver.java b/src/main/java/io/reactivex/rxjavafx/observers/BindingObserver.java old mode 100644 new mode 100755 index dc59b32..0e29a22 --- a/src/main/java/io/reactivex/rxjavafx/observers/BindingObserver.java +++ b/src/main/java/io/reactivex/rxjavafx/observers/BindingObserver.java @@ -1,12 +1,12 @@ /** * Copyright 2017 Netflix, Inc. - * + *

* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + *

* http://www.apache.org/licenses/LICENSE-2.0 - * + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -19,6 +19,7 @@ import io.reactivex.Observer; import io.reactivex.disposables.Disposable; import io.reactivex.functions.Consumer; +import io.reactivex.functions.Function; import io.reactivex.observables.ConnectableObservable; import javafx.beans.InvalidationListener; import javafx.beans.binding.Binding; @@ -26,21 +27,24 @@ import javafx.beans.value.ObservableValue; import javafx.collections.ObservableList; +final class BindingObserver implements Observer, ObservableValue, Binding { -final class BindingObserver implements Observer, ObservableValue, Binding { - - private final Consumer onError; + private final Function unmaskingFunction; + private final Consumer onError; private final ConnectableObservable obs; private boolean connected = false; - private Disposable disposable; - private ExpressionHelper helper; - private T value; + private Disposable disposable; + private ExpressionHelper helper; + private S value; - BindingObserver(Consumer onError) { + BindingObserver(Function unmaskingFunction, Consumer onError) { + this.unmaskingFunction = unmaskingFunction; this.onError = onError; this.obs = null; } - BindingObserver(ConnectableObservable obs, Consumer onError) { + + BindingObserver(Function unmaskingFunction, ConnectableObservable obs, Consumer onError) { + this.unmaskingFunction = unmaskingFunction; this.onError = onError; this.obs = obs; } @@ -66,17 +70,23 @@ public void onError(Throwable e) { @Override public void onNext(T t) { - value = t; - fireValueChangedEvent(); + try { + value = unmaskingFunction.apply(t); + fireValueChangedEvent(); + } catch (Exception e) { + onError(e); + } } + @Override - public T getValue() { + public S getValue() { if (!connected && obs != null) { obs.connect(); connected = true; } return value; } + @Override public boolean isValid() { return true; @@ -111,7 +121,7 @@ public void addListener(InvalidationListener listener) { * {@inheritDoc} */ @Override - public void addListener(ChangeListener listener) { + public void addListener(ChangeListener listener) { helper = ExpressionHelper.addListener(helper, this, listener); } @@ -127,13 +137,13 @@ public void removeListener(InvalidationListener listener) { * {@inheritDoc} */ @Override - public void removeListener(ChangeListener listener) { + public void removeListener(ChangeListener listener) { helper = ExpressionHelper.removeListener(helper, listener); } /** * Notify the currently registered observers of a value change. - * + *

* This implementation will ignore all adds and removes of observers that * are done while a notification is processed. The changes take effect in * the following call to fireValueChangedEvent. diff --git a/src/main/java/io/reactivex/rxjavafx/observers/BindingSubscriber.java b/src/main/java/io/reactivex/rxjavafx/observers/BindingSubscriber.java old mode 100644 new mode 100755 index b139d45..81435dd --- a/src/main/java/io/reactivex/rxjavafx/observers/BindingSubscriber.java +++ b/src/main/java/io/reactivex/rxjavafx/observers/BindingSubscriber.java @@ -1,12 +1,12 @@ /** * Copyright 2017 Netflix, Inc. - * + *

* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + *

* http://www.apache.org/licenses/LICENSE-2.0 - * + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -18,6 +18,7 @@ import com.sun.javafx.binding.ExpressionHelper; import io.reactivex.flowables.ConnectableFlowable; import io.reactivex.functions.Consumer; +import io.reactivex.functions.Function; import javafx.beans.InvalidationListener; import javafx.beans.binding.Binding; import javafx.beans.value.ChangeListener; @@ -26,23 +27,32 @@ import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; +final class BindingSubscriber implements Subscriber, ObservableValue, Binding { -final class BindingSubscriber implements Subscriber, ObservableValue, Binding { - - private final Consumer onError; - private final ConnectableFlowable flowable; + private final Function unmaskingFunction; + private final Consumer onError; + private final ConnectableFlowable obs; private boolean connected = false; - private Subscription subscription; - private ExpressionHelper helper; - private T value; + private Subscription subscription; + private ExpressionHelper helper; + private S value; - BindingSubscriber(Consumer onError) { - this.flowable = null; + BindingSubscriber(Function unmaskingFunction, Consumer onError) { + this.unmaskingFunction = unmaskingFunction; this.onError = onError; + this.obs = null; } - BindingSubscriber(ConnectableFlowable flowable, Consumer onError) { - this.flowable = flowable; + + BindingSubscriber(Function unmaskingFunction, ConnectableFlowable obs, Consumer onError) { + this.unmaskingFunction = unmaskingFunction; this.onError = onError; + this.obs = obs; + } + + @Override + public void onSubscribe(Subscription s) { + this.subscription = s; + this.subscription.request(Long.MAX_VALUE); } @Override @@ -59,25 +69,25 @@ public void onError(Throwable e) { } } - @Override - public void onSubscribe(Subscription s) { - subscription = s; - subscription.request(Long.MAX_VALUE); - } - @Override public void onNext(T t) { - value = t; - fireValueChangedEvent(); + try { + value = unmaskingFunction.apply(t); + fireValueChangedEvent(); + } catch (Exception e) { + onError(e); + } } + @Override - public T getValue() { - if (!connected && flowable != null) { - flowable.connect(); + public S getValue() { + if (!connected && obs != null) { + obs.connect(); connected = true; } return value; } + @Override public boolean isValid() { return true; @@ -112,7 +122,7 @@ public void addListener(InvalidationListener listener) { * {@inheritDoc} */ @Override - public void addListener(ChangeListener listener) { + public void addListener(ChangeListener listener) { helper = ExpressionHelper.addListener(helper, this, listener); } @@ -128,13 +138,13 @@ public void removeListener(InvalidationListener listener) { * {@inheritDoc} */ @Override - public void removeListener(ChangeListener listener) { + public void removeListener(ChangeListener listener) { helper = ExpressionHelper.removeListener(helper, listener); } /** * Notify the currently registered observers of a value change. - * + *

* This implementation will ignore all adds and removes of observers that * are done while a notification is processed. The changes take effect in * the following call to fireValueChangedEvent. diff --git a/src/main/java/io/reactivex/rxjavafx/observers/JavaFxObserver.java b/src/main/java/io/reactivex/rxjavafx/observers/JavaFxObserver.java old mode 100644 new mode 100755 index aa069fa..4a4ede0 --- a/src/main/java/io/reactivex/rxjavafx/observers/JavaFxObserver.java +++ b/src/main/java/io/reactivex/rxjavafx/observers/JavaFxObserver.java @@ -1,12 +1,12 @@ /** * Copyright 2017 Netflix, Inc. - * + *

* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + *

* http://www.apache.org/licenses/LICENSE-2.0 - * + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -18,42 +18,129 @@ import io.reactivex.Observable; import io.reactivex.functions.Consumer; import io.reactivex.observables.ConnectableObservable; +import io.reactivex.rxjavafx.observables.JavaFxObservable; import javafx.beans.binding.Binding; +import javafx.beans.value.ObservableValue; + +import java.util.Optional; public enum JavaFxObserver { ;//no instances + /** * Turns an Observable into an eager JavaFX Binding that subscribes immediately to the Observable. Calling the Binding's dispose() method will handle the unsubscription. */ public static Binding toBinding(Observable obs) { - BindingObserver bindingObserver = new BindingObserver<>(e -> {}); + return toBinding(obs, JavaFxObserver::onError); + } + + /** + * Turns an Observable into an eager JavaFX Binding that subscribes immediately to the Observable. Calling the Binding's dispose() method will handle the unsubscription. + */ + public static Binding toBinding(Observable obs, Consumer onErrorAction) { + BindingObserver bindingObserver = new BindingObserver<>(t -> t, onErrorAction); + obs.subscribe(bindingObserver); + return bindingObserver; + } + + /** + * Turns an Observable into an eager JavaFX Binding that subscribes immediately to the Observable. Calling the Binding's dispose() method will handle the unsubscription. + * This variant unmasks a nullable value as in {@link JavaFxObservable#valuesOf(ObservableValue, Object)} and emits null when the sentinel is encountered. + */ + public static Binding toNullBinding(Observable obs, T nullSentinel) { + return toNullBinding(obs, nullSentinel, JavaFxObserver::onError); + } + + /** + * Turns an Observable into an eager JavaFX Binding that subscribes immediately to the Observable. Calling the Binding's dispose() method will handle the unsubscription. + * This variant unmasks a nullable value as in {@link JavaFxObservable#valuesOf(ObservableValue, Object)} and emits null when the sentinel is encountered. + */ + public static Binding toNullBinding(Observable obs, T nullSentinel, Consumer onErrorAction) { + if (nullSentinel == null) { + throw new NullPointerException("The null value sentinel must not be null."); + } + BindingObserver bindingObserver = new BindingObserver<>(t -> t == nullSentinel ? null : t, onErrorAction); obs.subscribe(bindingObserver); return bindingObserver; } + /** * Turns an Observable into an eager JavaFX Binding that subscribes immediately to the Observable. Calling the Binding's dispose() method will handle the unsubscription. + * This variant unmasks a nullable value as in {@link JavaFxObservable#nullableValuesOf(ObservableValue)} and emits null when the value is not present. */ - public static Binding toBinding(Observable obs, Consumer onErrorAction ) { - BindingObserver bindingObserver = new BindingObserver<>(onErrorAction); + public static Binding toNullableBinding(Observable> obs) { + return toNullableBinding(obs, JavaFxObserver::onError); + } + + /** + * Turns an Observable into an eager JavaFX Binding that subscribes immediately to the Observable. Calling the Binding's dispose() method will handle the unsubscription. + * This variant unmasks a nullable value as in {@link JavaFxObservable#nullableValuesOf(ObservableValue)} and emits null when the value is not present. + */ + public static Binding toNullableBinding(Observable> obs, Consumer onErrorAction) { + BindingObserver, T> bindingObserver = new BindingObserver<>(o -> o.orElse(null), onErrorAction); obs.subscribe(bindingObserver); return bindingObserver; } + /** * Turns an Observable into an lazy JavaFX Binding that subscribes to the Observable when its getValue() is called. Calling the Binding's dispose() method will handle the unsubscription. */ public static Binding toLazyBinding(Observable obs) { + return toLazyBinding(obs, JavaFxObserver::onError); + } + + /** + * Turns an Observable into an eager JavaFX Binding that subscribes to the Observable when its getValue() is called. Calling the Binding's dispose() method will handle the unsubscription. + */ + public static Binding toLazyBinding(Observable obs, Consumer onErrorAction) { ConnectableObservable published = obs.publish(); - BindingObserver bindingObserver = new BindingObserver<>(published, e -> {}); + BindingObserver bindingObserver = new BindingObserver<>(t -> t, published, onErrorAction); published.subscribe(bindingObserver); return bindingObserver; } + /** * Turns an Observable into an eager JavaFX Binding that subscribes to the Observable when its getValue() is called. Calling the Binding's dispose() method will handle the unsubscription. + * This variant unmasks a nullable value as in {@link JavaFxObservable#valuesOf(ObservableValue, Object)} and emits null when the sentinel is encountered. */ - public static Binding toLazyBinding(Observable obs, Consumer onErrorAction ) { + public static Binding toLazyNullBinding(Observable obs, T nullSentinel) { + return toLazyNullBinding(obs, nullSentinel, JavaFxObserver::onError); + } + + /** + * Turns an Observable into an eager JavaFX Binding that subscribes to the Observable when its getValue() is called. Calling the Binding's dispose() method will handle the unsubscription. + * This variant unmasks a nullable value as in {@link JavaFxObservable#valuesOf(ObservableValue, Object)} and emits null when the sentinel is encountered. + */ + public static Binding toLazyNullBinding(Observable obs, T nullSentinel, Consumer onErrorAction) { + if (nullSentinel == null) { + throw new NullPointerException("The null value sentinel must not be null."); + } ConnectableObservable published = obs.publish(); - BindingObserver bindingObserver = new BindingObserver<>(published,onErrorAction); + BindingObserver bindingObserver = new BindingObserver<>(t -> t == nullSentinel ? null : t, published, onErrorAction); published.subscribe(bindingObserver); return bindingObserver; } + + /** + * Turns an Observable into an lazy JavaFX Binding that subscribes to the Observable when its getValue() is called. Calling the Binding's dispose() method will handle the unsubscription. + * This variant unmasks a nullable value as in {@link JavaFxObservable#nullableValuesOf(ObservableValue)} and emits null when the value is not present. + */ + public static Binding toLazyNullableBinding(Observable> obs) { + return toLazyNullableBinding(obs, JavaFxObserver::onError); + } + + /** + * Turns an Observable into an lazy JavaFX Binding that subscribes to the Observable when its getValue() is called. Calling the Binding's dispose() method will handle the unsubscription. + * This variant unmasks a nullable value as in {@link JavaFxObservable#nullableValuesOf(ObservableValue)} and emits null when the value is not present. + */ + public static Binding toLazyNullableBinding(Observable> obs, Consumer onErrorAction) { + ConnectableObservable> published = obs.publish(); + BindingObserver, T> bindingObserver = new BindingObserver<>(o -> o.orElse(null), published, onErrorAction); + published.subscribe(bindingObserver); + return bindingObserver; + } + + private static void onError(Throwable t) { + // nothing + } } diff --git a/src/main/java/io/reactivex/rxjavafx/observers/JavaFxSubscriber.java b/src/main/java/io/reactivex/rxjavafx/observers/JavaFxSubscriber.java old mode 100644 new mode 100755 index 29035ea..b154659 --- a/src/main/java/io/reactivex/rxjavafx/observers/JavaFxSubscriber.java +++ b/src/main/java/io/reactivex/rxjavafx/observers/JavaFxSubscriber.java @@ -1,12 +1,12 @@ /** * Copyright 2017 Netflix, Inc. - * + *

* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + *

* http://www.apache.org/licenses/LICENSE-2.0 - * + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -18,43 +18,129 @@ import io.reactivex.Flowable; import io.reactivex.flowables.ConnectableFlowable; import io.reactivex.functions.Consumer; +import io.reactivex.rxjavafx.observables.JavaFxObservable; import javafx.beans.binding.Binding; +import javafx.beans.value.ObservableValue; + +import java.util.Optional; + public enum JavaFxSubscriber { ;//no instances /** - * Turns a Flowable into an eager JavaFX Binding that subscribes immediately to the Observable. Calling the Binding's dispose() method will handle the unsubscription. + * Turns an Flowable into an eager JavaFX Binding that subscribes immediately to the Flowable. Calling the Binding's dispose() method will handle the unsubscription. + */ + public static Binding toBinding(Flowable flowable) { + return toBinding(flowable, JavaFxSubscriber::onError); + } + + /** + * Turns an Flowable into an eager JavaFX Binding that subscribes immediately to the Flowable. Calling the Binding's dispose() method will handle the unsubscription. */ - public static Binding toBinding(Flowable obs) { - BindingSubscriber bindingSubscriber = new BindingSubscriber<>(e -> {}); - obs.subscribe(bindingSubscriber); + public static Binding toBinding(Flowable flowable, Consumer onErrorAction) { + BindingSubscriber bindingSubscriber = new BindingSubscriber<>(t -> t, onErrorAction); + flowable.subscribe(bindingSubscriber); return bindingSubscriber; } + + /** + * Turns an Flowable into an eager JavaFX Binding that subscribes immediately to the Flowable. Calling the Binding's dispose() method will handle the unsubscription. + * This variant unmasks a nullable value as in {@link JavaFxObservable#valuesOf(ObservableValue, Object)} and emits null when the sentinel is encountered. + */ + public static Binding toNullBinding(Flowable flowable, T nullSentinel) { + return toNullBinding(flowable, nullSentinel, JavaFxSubscriber::onError); + } + /** - * Turns a Flowable into an eager JavaFX Binding that subscribes immediately to the Observable. Calling the Binding's dispose() method will handle the unsubscription. + * Turns an Flowable into an eager JavaFX Binding that subscribes immediately to the Flowable. Calling the Binding's dispose() method will handle the unsubscription. + * This variant unmasks a nullable value as in {@link JavaFxObservable#valuesOf(ObservableValue, Object)} and emits null when the sentinel is encountered. */ - public static Binding toBinding(Flowable obs, Consumer onErrorAction ) { - BindingSubscriber bindingSubscriber = new BindingSubscriber<>(onErrorAction); - obs.subscribe(bindingSubscriber); + public static Binding toNullBinding(Flowable flowable, T nullSentinel, Consumer onErrorAction) { + if (nullSentinel == null) { + throw new NullPointerException("The null value sentinel must not be null."); + } + BindingSubscriber bindingSubscriber = new BindingSubscriber<>(t -> t == nullSentinel ? null : t, onErrorAction); + flowable.subscribe(bindingSubscriber); return bindingSubscriber; } /** - * Turns a Flowable into an lazy JavaFX Binding that subscribes to the Observable when its getValue() is called. Calling the Binding's dispose() method will handle the unsubscription. + * Turns an Flowable into an eager JavaFX Binding that subscribes immediately to the Flowable. Calling the Binding's dispose() method will handle the unsubscription. + * This variant unmasks a nullable value as in {@link JavaFxObservable#nullableValuesOf(ObservableValue)} and emits null when the value is not present. + */ + public static Binding toNullableBinding(Flowable> flowable) { + return toNullableBinding(flowable, JavaFxSubscriber::onError); + } + + /** + * Turns an Flowable into an eager JavaFX Binding that subscribes immediately to the Flowable. Calling the Binding's dispose() method will handle the unsubscription. + * This variant unmasks a nullable value as in {@link JavaFxObservable#nullableValuesOf(ObservableValue)} and emits null when the value is not present. + */ + public static Binding toNullableBinding(Flowable> flowable, Consumer onErrorAction) { + BindingSubscriber, T> bindingSubscriber = new BindingSubscriber<>(o -> o.orElse(null), onErrorAction); + flowable.subscribe(bindingSubscriber); + return bindingSubscriber; + } + + /** + * Turns an Flowable into an lazy JavaFX Binding that subscribes to the Flowable when its getValue() is called. Calling the Binding's dispose() method will handle the unsubscription. */ public static Binding toLazyBinding(Flowable flowable) { + return toLazyBinding(flowable, JavaFxSubscriber::onError); + } + + /** + * Turns an Flowable into an eager JavaFX Binding that subscribes to the Flowable when its getValue() is called. Calling the Binding's dispose() method will handle the unsubscription. + */ + public static Binding toLazyBinding(Flowable flowable, Consumer onErrorAction) { ConnectableFlowable published = flowable.publish(); - BindingSubscriber bindingSubscriber = new BindingSubscriber<>(published, e -> {}); + BindingSubscriber bindingSubscriber = new BindingSubscriber<>(t -> t, published, onErrorAction); published.subscribe(bindingSubscriber); return bindingSubscriber; } + + /** + * Turns an Flowable into an eager JavaFX Binding that subscribes to the Flowable when its getValue() is called. Calling the Binding's dispose() method will handle the unsubscription. + * This variant unmasks a nullable value as in {@link JavaFxObservable#valuesOf(ObservableValue, Object)} and emits null when the sentinel is encountered. + */ + public static Binding toLazyNullBinding(Flowable flowable, T nullSentinel) { + return toLazyNullBinding(flowable, nullSentinel, JavaFxSubscriber::onError); + } + /** - * Turns a Flowable into an eager JavaFX Binding that subscribes to the Observable when its getValue() is called. Calling the Binding's dispose() method will handle the unsubscription. + * Turns an Flowable into an eager JavaFX Binding that subscribes to the Flowable when its getValue() is called. Calling the Binding's dispose() method will handle the unsubscription. + * This variant unmasks a nullable value as in {@link JavaFxObservable#valuesOf(ObservableValue, Object)} and emits null when the sentinel is encountered. */ - public static Binding toLazyBinding(Flowable flowable, Consumer onErrorAction ) { + public static Binding toLazyNullBinding(Flowable flowable, T nullSentinel, Consumer onErrorAction) { + if (nullSentinel == null) { + throw new NullPointerException("The null value sentinel must not be null."); + } ConnectableFlowable published = flowable.publish(); - BindingSubscriber bindingSubscriber = new BindingSubscriber<>(published,onErrorAction); + BindingSubscriber bindingSubscriber = new BindingSubscriber<>(t -> t == nullSentinel ? null : t, published, onErrorAction); + published.subscribe(bindingSubscriber); + return bindingSubscriber; + } + + /** + * Turns an Flowable into an lazy JavaFX Binding that subscribes to the Flowable when its getValue() is called. Calling the Binding's dispose() method will handle the unsubscription. + * This variant unmasks a nullable value as in {@link JavaFxObservable#nullableValuesOf(ObservableValue)} and emits null when the value is not present. + */ + public static Binding toLazyNullableBinding(Flowable> flowable) { + return toLazyNullableBinding(flowable, JavaFxSubscriber::onError); + } + + /** + * Turns an Flowable into an lazy JavaFX Binding that subscribes to the Flowable when its getValue() is called. Calling the Binding's dispose() method will handle the unsubscription. + * This variant unmasks a nullable value as in {@link JavaFxObservable#nullableValuesOf(ObservableValue)} and emits null when the value is not present. + */ + public static Binding toLazyNullableBinding(Flowable> flowable, Consumer onErrorAction) { + ConnectableFlowable> published = flowable.publish(); + BindingSubscriber, T> bindingSubscriber = new BindingSubscriber<>(o -> o.orElse(null), published, onErrorAction); published.subscribe(bindingSubscriber); return bindingSubscriber; } + + private static void onError(Throwable t) { + // nothing + } } diff --git a/src/test/java/io/reactivex/rxjavafx/subscriptions/BindingTest.java b/src/test/java/io/reactivex/rxjavafx/subscriptions/BindingTest.java old mode 100644 new mode 100755 index 0e88193..ed1b1ed --- a/src/test/java/io/reactivex/rxjavafx/subscriptions/BindingTest.java +++ b/src/test/java/io/reactivex/rxjavafx/subscriptions/BindingTest.java @@ -1,12 +1,12 @@ /** * Copyright 2017 Netflix, Inc. - * + *

* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + *

* http://www.apache.org/licenses/LICENSE-2.0 - * + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -17,15 +17,17 @@ import io.reactivex.Flowable; import io.reactivex.Observable; -import io.reactivex.rxjavafx.subscriptions.CompositeBinding; +import io.reactivex.processors.PublishProcessor; +import io.reactivex.rxjavafx.observers.JavaFxObserver; +import io.reactivex.rxjavafx.observers.JavaFxSubscriber; +import io.reactivex.rxjavafx.schedulers.JavaFxScheduler; +import io.reactivex.subjects.PublishSubject; import javafx.application.Platform; import javafx.beans.binding.Binding; import javafx.embed.swing.JFXPanel; import org.junit.Test; -import io.reactivex.rxjavafx.observers.JavaFxObserver; -import io.reactivex.rxjavafx.schedulers.JavaFxScheduler; -import io.reactivex.rxjavafx.observers.JavaFxSubscriber; +import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -37,17 +39,18 @@ public final class BindingTest { public BindingTest() { new JFXPanel(); } + @Test public void testCompositeBinding() { CompositeBinding bindings = new CompositeBinding(); - Observable source = Observable.interval(1,TimeUnit.SECONDS); + Observable source = Observable.interval(1, TimeUnit.SECONDS); CountDownLatch unsubscribeWait = new CountDownLatch(2); Binding binding1 = JavaFxObserver.toBinding(source.doOnDispose(unsubscribeWait::countDown).observeOn(JavaFxScheduler.platform())); bindings.add(binding1); - Binding binding2 = JavaFxObserver.toBinding(source.doOnDispose(unsubscribeWait::countDown).reduce(0L,(x,y) -> x + y).observeOn(JavaFxScheduler.platform()).toObservable()); + Binding binding2 = JavaFxObserver.toBinding(source.doOnDispose(unsubscribeWait::countDown).reduce(0L, (x, y) -> x + y).observeOn(JavaFxScheduler.platform()).toObservable()); bindings.add(binding2); sleep(3000); @@ -56,7 +59,7 @@ public void testCompositeBinding() { bindings.dispose(); try { - unsubscribeWait.await(10,TimeUnit.SECONDS); + unsubscribeWait.await(10, TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(); } @@ -87,6 +90,55 @@ public void testObserverBinding() { } } + @Test + public void testObserverNullBinding() { + final CountDownLatch latch = new CountDownLatch(1); + Platform.runLater(() -> { + + String nil = "null"; + PublishSubject items = PublishSubject.create(); + Binding binding = JavaFxObserver.toNullBinding(items, nil); + items.onNext(nil); + + assertTrue(binding.getValue() == null); + items.onNext("Alpha"); + assertTrue(binding.getValue().equals("Alpha")); + items.onNext(nil); + assertTrue(binding.getValue() == null); + latch.countDown(); + }); + + try { + latch.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + @Test + public void testObserverNullableBinding() { + final CountDownLatch latch = new CountDownLatch(1); + Platform.runLater(() -> { + + PublishSubject> items = PublishSubject.create(); + Binding binding = JavaFxObserver.toNullableBinding(items); + items.onNext(Optional.empty()); + + assertTrue(binding.getValue() == null); + items.onNext(Optional.of("Alpha")); + assertTrue(binding.getValue().equals("Alpha")); + items.onNext(Optional.empty()); + assertTrue(binding.getValue() == null); + latch.countDown(); + }); + + try { + latch.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + @Test public void testObserverLazyBinding() { final CountDownLatch latch = new CountDownLatch(1); @@ -96,7 +148,7 @@ public void testObserverLazyBinding() { Observable items = Observable.just("Alpha", "Beta", "Gamma", "Delta") - .doOnNext(s -> emissionCount.incrementAndGet()); + .doOnNext(s -> emissionCount.incrementAndGet()); Binding binding = JavaFxObserver.toLazyBinding(items); @@ -120,6 +172,55 @@ public void testObserverLazyBinding() { } } + @Test + public void testSubscriberNullBinding() { + final CountDownLatch latch = new CountDownLatch(1); + Platform.runLater(() -> { + + String nil = "null"; + PublishProcessor items = PublishProcessor.create(); + Binding binding = JavaFxSubscriber.toNullBinding(items, nil); + items.onNext(nil); + + assertTrue(binding.getValue() == null); + items.onNext("Alpha"); + assertTrue(binding.getValue().equals("Alpha")); + items.onNext(nil); + assertTrue(binding.getValue() == null); + latch.countDown(); + }); + + try { + latch.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + @Test + public void testSubscriberNullableBinding() { + final CountDownLatch latch = new CountDownLatch(1); + Platform.runLater(() -> { + + PublishProcessor> items = PublishProcessor.create(); + Binding binding = JavaFxSubscriber.toNullableBinding(items); + items.onNext(Optional.empty()); + + assertTrue(binding.getValue() == null); + items.onNext(Optional.of("Alpha")); + assertTrue(binding.getValue().equals("Alpha")); + items.onNext(Optional.empty()); + assertTrue(binding.getValue() == null); + latch.countDown(); + }); + + try { + latch.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + @Test public void testSubscriberBinding() { final CountDownLatch latch = new CountDownLatch(1);