Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add nullable Bindings #57

Merged
merged 1 commit into from
Nov 24, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 27 additions & 17 deletions src/main/java/io/reactivex/rxjavafx/observers/BindingObserver.java
100644 → 100755
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
/**
* Copyright 2017 Netflix, Inc.
*
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -19,28 +19,32 @@
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
import io.reactivex.observables.ConnectableObservable;
import javafx.beans.InvalidationListener;
import javafx.beans.binding.Binding;
import javafx.beans.value.ChangeListener;
import javafx.beans.value.ObservableValue;
import javafx.collections.ObservableList;

final class BindingObserver<T, S> implements Observer<T>, ObservableValue<S>, Binding<S> {

final class BindingObserver<T> implements Observer<T>, ObservableValue<T>, Binding<T> {

private final Consumer<Throwable> onError;
private final Function<T, S> unmaskingFunction;
private final Consumer<Throwable> onError;
private final ConnectableObservable<T> obs;
private boolean connected = false;
private Disposable disposable;
private ExpressionHelper<T> helper;
private T value;
private Disposable disposable;
private ExpressionHelper<S> helper;
private S value;

BindingObserver(Consumer<Throwable> onError) {
BindingObserver(Function<T, S> unmaskingFunction, Consumer<Throwable> onError) {
this.unmaskingFunction = unmaskingFunction;
this.onError = onError;
this.obs = null;
}
BindingObserver(ConnectableObservable<T> obs, Consumer<Throwable> onError) {

BindingObserver(Function<T, S> unmaskingFunction, ConnectableObservable<T> obs, Consumer<Throwable> onError) {
this.unmaskingFunction = unmaskingFunction;
this.onError = onError;
this.obs = obs;
}
Expand All @@ -66,17 +70,23 @@ public void onError(Throwable e) {

@Override
public void onNext(T t) {
value = t;
fireValueChangedEvent();
try {
value = unmaskingFunction.apply(t);
fireValueChangedEvent();
} catch (Exception e) {
onError(e);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if passing the exception to onError is the right approach

Copy link

@rguillens rguillens Oct 26, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks fine to me, BindingObserver.onError(throwable) apply user error management if provided or the exception is ignored.

}
}

@Override
public T getValue() {
public S getValue() {
if (!connected && obs != null) {
obs.connect();
connected = true;
}
return value;
}

@Override
public boolean isValid() {
return true;
Expand Down Expand Up @@ -111,7 +121,7 @@ public void addListener(InvalidationListener listener) {
* {@inheritDoc}
*/
@Override
public void addListener(ChangeListener<? super T> listener) {
public void addListener(ChangeListener<? super S> listener) {
helper = ExpressionHelper.addListener(helper, this, listener);
}

Expand All @@ -127,13 +137,13 @@ public void removeListener(InvalidationListener listener) {
* {@inheritDoc}
*/
@Override
public void removeListener(ChangeListener<? super T> listener) {
public void removeListener(ChangeListener<? super S> listener) {
helper = ExpressionHelper.removeListener(helper, listener);
}

/**
* Notify the currently registered observers of a value change.
*
* <p>
* This implementation will ignore all adds and removes of observers that
* are done while a notification is processed. The changes take effect in
* the following call to fireValueChangedEvent.
Expand Down
66 changes: 38 additions & 28 deletions src/main/java/io/reactivex/rxjavafx/observers/BindingSubscriber.java
100644 → 100755
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
/**
* Copyright 2017 Netflix, Inc.
*
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -18,6 +18,7 @@
import com.sun.javafx.binding.ExpressionHelper;
import io.reactivex.flowables.ConnectableFlowable;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
import javafx.beans.InvalidationListener;
import javafx.beans.binding.Binding;
import javafx.beans.value.ChangeListener;
Expand All @@ -26,23 +27,32 @@
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

final class BindingSubscriber<T, S> implements Subscriber<T>, ObservableValue<S>, Binding<S> {

final class BindingSubscriber<T> implements Subscriber<T>, ObservableValue<T>, Binding<T> {

private final Consumer<Throwable> onError;
private final ConnectableFlowable<T> flowable;
private final Function<T, S> unmaskingFunction;
private final Consumer<Throwable> onError;
private final ConnectableFlowable<T> obs;
private boolean connected = false;
private Subscription subscription;
private ExpressionHelper<T> helper;
private T value;
private Subscription subscription;
private ExpressionHelper<S> helper;
private S value;

BindingSubscriber(Consumer<Throwable> onError) {
this.flowable = null;
BindingSubscriber(Function<T, S> unmaskingFunction, Consumer<Throwable> onError) {
this.unmaskingFunction = unmaskingFunction;
this.onError = onError;
this.obs = null;
}
BindingSubscriber(ConnectableFlowable<T> flowable, Consumer<Throwable> onError) {
this.flowable = flowable;

BindingSubscriber(Function<T, S> unmaskingFunction, ConnectableFlowable<T> obs, Consumer<Throwable> onError) {
this.unmaskingFunction = unmaskingFunction;
this.onError = onError;
this.obs = obs;
}

@Override
public void onSubscribe(Subscription s) {
this.subscription = s;
this.subscription.request(Long.MAX_VALUE);
}

@Override
Expand All @@ -59,25 +69,25 @@ public void onError(Throwable e) {
}
}

@Override
public void onSubscribe(Subscription s) {
subscription = s;
subscription.request(Long.MAX_VALUE);
}

@Override
public void onNext(T t) {
value = t;
fireValueChangedEvent();
try {
value = unmaskingFunction.apply(t);
fireValueChangedEvent();
} catch (Exception e) {
onError(e);
}
}

@Override
public T getValue() {
if (!connected && flowable != null) {
flowable.connect();
public S getValue() {
if (!connected && obs != null) {
obs.connect();
connected = true;
}
return value;
}

@Override
public boolean isValid() {
return true;
Expand Down Expand Up @@ -112,7 +122,7 @@ public void addListener(InvalidationListener listener) {
* {@inheritDoc}
*/
@Override
public void addListener(ChangeListener<? super T> listener) {
public void addListener(ChangeListener<? super S> listener) {
helper = ExpressionHelper.addListener(helper, this, listener);
}

Expand All @@ -128,13 +138,13 @@ public void removeListener(InvalidationListener listener) {
* {@inheritDoc}
*/
@Override
public void removeListener(ChangeListener<? super T> listener) {
public void removeListener(ChangeListener<? super S> listener) {
helper = ExpressionHelper.removeListener(helper, listener);
}

/**
* Notify the currently registered observers of a value change.
*
* <p>
* This implementation will ignore all adds and removes of observers that
* are done while a notification is processed. The changes take effect in
* the following call to fireValueChangedEvent.
Expand Down
105 changes: 96 additions & 9 deletions src/main/java/io/reactivex/rxjavafx/observers/JavaFxObserver.java
100644 → 100755
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
/**
* Copyright 2017 Netflix, Inc.
*
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -18,42 +18,129 @@
import io.reactivex.Observable;
import io.reactivex.functions.Consumer;
import io.reactivex.observables.ConnectableObservable;
import io.reactivex.rxjavafx.observables.JavaFxObservable;
import javafx.beans.binding.Binding;
import javafx.beans.value.ObservableValue;

import java.util.Optional;

public enum JavaFxObserver {
;//no instances

/**
* Turns an Observable into an eager JavaFX Binding that subscribes immediately to the Observable. Calling the Binding's dispose() method will handle the unsubscription.
*/
public static <T> Binding<T> toBinding(Observable<T> obs) {
BindingObserver<T> bindingObserver = new BindingObserver<>(e -> {});
return toBinding(obs, JavaFxObserver::onError);
}

/**
* Turns an Observable into an eager JavaFX Binding that subscribes immediately to the Observable. Calling the Binding's dispose() method will handle the unsubscription.
*/
public static <T> Binding<T> toBinding(Observable<T> obs, Consumer<Throwable> onErrorAction) {
BindingObserver<T, T> bindingObserver = new BindingObserver<>(t -> t, onErrorAction);
obs.subscribe(bindingObserver);
return bindingObserver;
}

/**
* Turns an Observable into an eager JavaFX Binding that subscribes immediately to the Observable. Calling the Binding's dispose() method will handle the unsubscription.
* This variant unmasks a nullable value as in {@link JavaFxObservable#valuesOf(ObservableValue, Object)} and emits null when the sentinel is encountered.
*/
public static <T> Binding<T> toNullBinding(Observable<T> obs, T nullSentinel) {
return toNullBinding(obs, nullSentinel, JavaFxObserver::onError);
}

/**
* Turns an Observable into an eager JavaFX Binding that subscribes immediately to the Observable. Calling the Binding's dispose() method will handle the unsubscription.
* This variant unmasks a nullable value as in {@link JavaFxObservable#valuesOf(ObservableValue, Object)} and emits null when the sentinel is encountered.
*/
public static <T> Binding<T> toNullBinding(Observable<T> obs, T nullSentinel, Consumer<Throwable> onErrorAction) {
if (nullSentinel == null) {
throw new NullPointerException("The null value sentinel must not be null.");
}
BindingObserver<T, T> bindingObserver = new BindingObserver<>(t -> t == nullSentinel ? null : t, onErrorAction);
obs.subscribe(bindingObserver);
return bindingObserver;
}

/**
* Turns an Observable into an eager JavaFX Binding that subscribes immediately to the Observable. Calling the Binding's dispose() method will handle the unsubscription.
* This variant unmasks a nullable value as in {@link JavaFxObservable#nullableValuesOf(ObservableValue)} and emits null when the value is not present.
*/
public static <T> Binding<T> toBinding(Observable<T> obs, Consumer<Throwable> onErrorAction ) {
BindingObserver<T> bindingObserver = new BindingObserver<>(onErrorAction);
public static <T> Binding<T> toNullableBinding(Observable<Optional<T>> obs) {
return toNullableBinding(obs, JavaFxObserver::onError);
}

/**
* Turns an Observable into an eager JavaFX Binding that subscribes immediately to the Observable. Calling the Binding's dispose() method will handle the unsubscription.
* This variant unmasks a nullable value as in {@link JavaFxObservable#nullableValuesOf(ObservableValue)} and emits null when the value is not present.
*/
public static <T> Binding<T> toNullableBinding(Observable<Optional<T>> obs, Consumer<Throwable> onErrorAction) {
BindingObserver<Optional<T>, T> bindingObserver = new BindingObserver<>(o -> o.orElse(null), onErrorAction);
obs.subscribe(bindingObserver);
return bindingObserver;
}

/**
* Turns an Observable into an lazy JavaFX Binding that subscribes to the Observable when its getValue() is called. Calling the Binding's dispose() method will handle the unsubscription.
*/
public static <T> Binding<T> toLazyBinding(Observable<T> obs) {
return toLazyBinding(obs, JavaFxObserver::onError);
}

/**
* Turns an Observable into an eager JavaFX Binding that subscribes to the Observable when its getValue() is called. Calling the Binding's dispose() method will handle the unsubscription.
*/
public static <T> Binding<T> toLazyBinding(Observable<T> obs, Consumer<Throwable> onErrorAction) {
ConnectableObservable<T> published = obs.publish();
BindingObserver<T> bindingObserver = new BindingObserver<>(published, e -> {});
BindingObserver<T, T> bindingObserver = new BindingObserver<>(t -> t, published, onErrorAction);
published.subscribe(bindingObserver);
return bindingObserver;
}

/**
* Turns an Observable into an eager JavaFX Binding that subscribes to the Observable when its getValue() is called. Calling the Binding's dispose() method will handle the unsubscription.
* This variant unmasks a nullable value as in {@link JavaFxObservable#valuesOf(ObservableValue, Object)} and emits null when the sentinel is encountered.
*/
public static <T> Binding<T> toLazyBinding(Observable<T> obs, Consumer<Throwable> onErrorAction ) {
public static <T> Binding<T> toLazyNullBinding(Observable<T> obs, T nullSentinel) {
return toLazyNullBinding(obs, nullSentinel, JavaFxObserver::onError);
}

/**
* Turns an Observable into an eager JavaFX Binding that subscribes to the Observable when its getValue() is called. Calling the Binding's dispose() method will handle the unsubscription.
* This variant unmasks a nullable value as in {@link JavaFxObservable#valuesOf(ObservableValue, Object)} and emits null when the sentinel is encountered.
*/
public static <T> Binding<T> toLazyNullBinding(Observable<T> obs, T nullSentinel, Consumer<Throwable> onErrorAction) {
if (nullSentinel == null) {
throw new NullPointerException("The null value sentinel must not be null.");
}
ConnectableObservable<T> published = obs.publish();
BindingObserver<T> bindingObserver = new BindingObserver<>(published,onErrorAction);
BindingObserver<T, T> bindingObserver = new BindingObserver<>(t -> t == nullSentinel ? null : t, published, onErrorAction);
published.subscribe(bindingObserver);
return bindingObserver;
}

/**
* Turns an Observable into an lazy JavaFX Binding that subscribes to the Observable when its getValue() is called. Calling the Binding's dispose() method will handle the unsubscription.
* This variant unmasks a nullable value as in {@link JavaFxObservable#nullableValuesOf(ObservableValue)} and emits null when the value is not present.
*/
public static <T> Binding<T> toLazyNullableBinding(Observable<Optional<T>> obs) {
return toLazyNullableBinding(obs, JavaFxObserver::onError);
}

/**
* Turns an Observable into an lazy JavaFX Binding that subscribes to the Observable when its getValue() is called. Calling the Binding's dispose() method will handle the unsubscription.
* This variant unmasks a nullable value as in {@link JavaFxObservable#nullableValuesOf(ObservableValue)} and emits null when the value is not present.
*/
public static <T> Binding<T> toLazyNullableBinding(Observable<Optional<T>> obs, Consumer<Throwable> onErrorAction) {
ConnectableObservable<Optional<T>> published = obs.publish();
BindingObserver<Optional<T>, T> bindingObserver = new BindingObserver<>(o -> o.orElse(null), published, onErrorAction);
published.subscribe(bindingObserver);
return bindingObserver;
}

private static void onError(Throwable t) {
// nothing
}
}
Loading