Skip to content

Commit

Permalink
fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
johnmcclean committed Mar 6, 2019
1 parent 51f49f6 commit 6a11519
Show file tree
Hide file tree
Showing 10 changed files with 36 additions and 18 deletions.
8 changes: 8 additions & 0 deletions cyclops-futurestream/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,14 @@ sourceSets {
test.runtimeClasspath += [configurations.provided]
}

test {
reports.html.destination = file("$buildDir/reports/test")
forkEvery = 1
testLogging {
events "started", "passed", "skipped", "failed"//, "standardOut", "standardError"
}
}

modifyPom {
project {
name 'cyclops-futurestream'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3360,13 +3360,13 @@ default FutureStream<U> recoverWith(final Function<Throwable, ? extends Publishe
}

@Override
default ReactiveSeq<U> recoverWith(final BiFunction<Integer, Throwable, ? extends Publisher<? extends U>> fn) {
default FutureStream<U> recoverWith(final BiFunction<Integer, Throwable, ? extends Publisher<? extends U>> fn) {
return (FutureStream<U>)ReactiveSeq.super.recoverWith(fn);

}

@Override
default <X extends Throwable> ReactiveSeq<U> recoverWith(Class<X> type, final BiFunction<Integer, X, ? extends Publisher<? extends U>> fn) {
default <X extends Throwable> FutureStream<U> recoverWith(Class<X> type, final BiFunction<Integer, X, ? extends Publisher<? extends U>> fn) {
return (FutureStream<U>)ReactiveSeq.super.recoverWith(type,fn);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public <R> IO<R> map(Function<? super T, ? extends R> s) {

@Override
public <R> IO<R> flatMap(Function<? super T, IO<? extends R>> s) {
return of(flowable.mergeMap(s));
return of(flowable.flatMap(in->FutureStream.builder().fromPublisher(s.apply(in))));
}

@Override
Expand Down Expand Up @@ -182,7 +182,7 @@ public static <T> cyclops.reactive.Managed<Seq<T>> sequence(Iterable<? extends


public <R> Managed<R> map(Function<? super T, ? extends R> mapper){
return of(apply(mapper.andThen(IO::of)),__->{});
return of(apply(mapper.andThen(FutureStreamIO::of)),__->{});
}
public <R> Managed<R> flatMap(Function<? super T, cyclops.reactive.Managed<R>> f){

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import cyclops.reactive.AbstractIOTestBase;
import cyclops.reactive.IO;
import org.hamcrest.MatcherAssert;
import org.junit.Ignore;
import org.junit.Test;
import reactor.core.scheduler.Schedulers;

Expand All @@ -15,7 +16,7 @@
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.*;


@Ignore
public class FutureStreamIOTest extends AbstractIOTestBase {
Executor ex = Executors.newFixedThreadPool(1);
RuntimeException re = new RuntimeException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,9 @@ public void forEachWithErrorsAsync(){
stream.forEach(i->list.add(i),
e->error=e);

ExceptionSoftener.softenRunnable(()->Thread.sleep(100)).run();
ExceptionSoftener.softenRunnable(()->Thread.sleep(1000)).run();


assertThat(list,hasItems(1,2,3,4,5));
assertThat(list.size(),equalTo(5));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import cyclops.futurestream.react.lazy.DuplicationTest;
import com.oath.cyclops.util.SimpleTimer;
import cyclops.futurestream.LazyReact;
import org.junit.Before;
import org.junit.Test;

import cyclops.reactive.collections.mutable.ListX;
Expand Down Expand Up @@ -119,6 +120,11 @@ private String saveStatus(Status s) {
volatile int otherCount;
volatile int count3;
volatile int peek;

@Before
public void setup(){
count2 = new AtomicInteger(0);
}
@Test
public void windowByTimeFiltered() {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public void recover2IO(){
.recover(IOException.class,e->"hello")
.firstValue(null),equalTo("hello"));
}
@Test(expected=NoSuchElementException.class)
@Test(expected=IOException.class)
public void recoverIOUnhandledThrown(){
assertThat(DuplicationTest.of(1,2,3,4)
.map(i->i+2)
Expand Down
7 changes: 1 addition & 6 deletions cyclops/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -125,12 +125,7 @@ task packageTests(type: Jar) {

artifacts.archives packageTests

javadoc {
configure((CoreJavadocOptions) getOptions()) {
addStringOption('sourcepath', "/Sources/jool-0.9.11-sources.jar")
}
exclude '**/internal/**'
}


modifyPom {
project {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -353,12 +353,14 @@ public boolean tryAdvance(final Consumer<? super T> action) {
return false;
}catch(Queue.QueueTimeoutException e){
timeoutRetry =true;
} catch(Queue.Error e){
throw ExceptionSoftener.throwSoftenedException(e.t);

} catch(final Exception e) {


closed.set(true);
throw ExceptionSoftener.throwSoftenedException(e);
// return false;
return false;
} finally {

}
Expand Down
10 changes: 7 additions & 3 deletions cyclops/src/main/java/com/oath/cyclops/async/adapters/Queue.java
Original file line number Diff line number Diff line change
Expand Up @@ -511,10 +511,9 @@ public T poll(final long time, final TimeUnit unit) throws QueueTimeoutException
public T get() {

T res = ensureOpen(this.timeout, this.timeUnit);
System.out.println("Res " + res);
if(res instanceof Error){
Error e = (Error)(res);
throw ExceptionSoftener.throwSoftenedException(e.t);
throw ExceptionSoftener.throwSoftenedException(e);
}
return res;

Expand Down Expand Up @@ -550,8 +549,13 @@ public boolean addError(Throwable t){
}

@AllArgsConstructor
private static final class Error{
public static final class Error extends RuntimeException{
Throwable t;

@Override
public synchronized Throwable fillInStackTrace() {
return this;
}
}

/**
Expand Down

0 comments on commit 6a11519

Please sign in to comment.