From 486523c97d3558e723a668af22ba88bd2b393b18 Mon Sep 17 00:00:00 2001 From: johnmcclean Date: Wed, 6 Mar 2019 18:47:44 +0000 Subject: [PATCH] fix for #1042 --- .../futurestream/BlockingStreamHelper.java | 27 +++++++++++ .../types/futurestream/LazyStream.java | 3 +- .../cyclops/futurestream/FutureStream.java | 4 +- .../FutureStreamReactiveSeqTest.java | 46 +++++++++++++++++++ .../futurestream/react/base/BaseSeqTest.java | 2 +- .../react/base/BaseSequentialSeqTest.java | 2 +- .../react/lazy/sequence/LFSNoOrderTest.java | 11 +++-- .../react/lazy/sequence/SequentialTest.java | 2 +- 8 files changed, 87 insertions(+), 10 deletions(-) diff --git a/cyclops-futurestream/src/main/java/com/oath/cyclops/types/futurestream/BlockingStreamHelper.java b/cyclops-futurestream/src/main/java/com/oath/cyclops/types/futurestream/BlockingStreamHelper.java index 5af81111de..17ce9d8f4a 100644 --- a/cyclops-futurestream/src/main/java/com/oath/cyclops/types/futurestream/BlockingStreamHelper.java +++ b/cyclops-futurestream/src/main/java/com/oath/cyclops/types/futurestream/BlockingStreamHelper.java @@ -100,6 +100,33 @@ public static Object getSafe(final FastFuture next, final Optional> errorHandler) { + + try { + return next.join(); + } catch (final SimpleReactCompletionException e) { + Throwable t = e; + + while(t instanceof SimpleReactCompletionException){ + t= t.getCause(); + } + while(t instanceof SimpleReactFailedStageException){ + t= t.getCause(); + } + if(t instanceof FilteredExecutionPathException){ + return MissingValue.MISSING_VALUE; + }else{ + throw ExceptionSoftener.throwSoftenedException(t); + } + } catch (final RuntimeException e) { + capture(e, errorHandler); + } catch (final Exception e) { + capture(e, errorHandler); + } + + return MissingValue.MISSING_VALUE; + } @SuppressWarnings("rawtypes") static Object getSafe(final CompletableFuture next, final Optional> errorHandler) { diff --git a/cyclops-futurestream/src/main/java/com/oath/cyclops/types/futurestream/LazyStream.java b/cyclops-futurestream/src/main/java/com/oath/cyclops/types/futurestream/LazyStream.java index 417d3aed84..05b502c45b 100644 --- a/cyclops-futurestream/src/main/java/com/oath/cyclops/types/futurestream/LazyStream.java +++ b/cyclops-futurestream/src/main/java/com/oath/cyclops/types/futurestream/LazyStream.java @@ -137,7 +137,8 @@ default R run(final Collector collector) { return (R) batcher.getAllResults() .stream() - .map(cf -> BlockingStreamHelper.getSafe(cf, getErrorHandler())) + .map(cf -> BlockingStreamHelper.extractNonFiltered(cf, getErrorHandler())) + //.map(cf -> BlockingStreamHelper.getSafe(cf, getErrorHandler())) .filter(v -> v != MissingValue.MISSING_VALUE) .collect((Collector) collector); diff --git a/cyclops-futurestream/src/main/java/cyclops/futurestream/FutureStream.java b/cyclops-futurestream/src/main/java/cyclops/futurestream/FutureStream.java index 749008e373..cbf4d6ef20 100644 --- a/cyclops-futurestream/src/main/java/cyclops/futurestream/FutureStream.java +++ b/cyclops-futurestream/src/main/java/cyclops/futurestream/FutureStream.java @@ -2660,9 +2660,7 @@ default Set toSet() { return collect(Collectors.toSet()); } - /* - * @see cyclops2.stream.ReactiveSeq#toList() - */ + @Override default List toList() { return collect(Collectors.toList()); diff --git a/cyclops-futurestream/src/test/java/cyclops/futurestream/FutureStreamReactiveSeqTest.java b/cyclops-futurestream/src/test/java/cyclops/futurestream/FutureStreamReactiveSeqTest.java index 9968eea4f3..f598bab454 100644 --- a/cyclops-futurestream/src/test/java/cyclops/futurestream/FutureStreamReactiveSeqTest.java +++ b/cyclops-futurestream/src/test/java/cyclops/futurestream/FutureStreamReactiveSeqTest.java @@ -9,10 +9,12 @@ import java.util.Arrays; import java.util.Iterator; import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasItems; import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; public class FutureStreamReactiveSeqTest extends AbstractReactiveSeqTest { @Override @@ -26,6 +28,7 @@ public ReactiveSeq empty() { return FutureStream.builder() .of(); } + @Test @Override public void recoverWithMiddleIterator(){ @@ -64,6 +67,49 @@ public void recoverWithMiddleList(){ assertThat(result,hasItems(100,200,300)); } + @Test + public void onErrorList(){ + AtomicInteger count = new AtomicInteger(0); + + try { + of(1, 2, 3)/**.filter(i->false)**/ + .map(i -> { + throw new RuntimeException(); + }) + .onError(e -> count.incrementAndGet()) + .toList(); + fail("exception expected"); + }catch(Exception e){ + + } + + + assertThat(count.get(),equalTo(3)); + + } + @Test + public void onErrorIterator(){ + AtomicInteger count = new AtomicInteger(0); + + try { + Iterator it = of(1, 2, 3).map(i -> { + throw new RuntimeException(); + }) + .onError(e -> count.incrementAndGet()) + .iterator(); + while(it.hasNext()){ + System.out.println(it.next()); + } + fail("exception expected"); + }catch(Exception e){ + + + } + + + assertThat(count.get(),equalTo(3)); + + } } diff --git a/cyclops-futurestream/src/test/java/cyclops/futurestream/react/base/BaseSeqTest.java b/cyclops-futurestream/src/test/java/cyclops/futurestream/react/base/BaseSeqTest.java index 6053267673..e916d50971 100644 --- a/cyclops-futurestream/src/test/java/cyclops/futurestream/react/base/BaseSeqTest.java +++ b/cyclops-futurestream/src/test/java/cyclops/futurestream/react/base/BaseSeqTest.java @@ -457,7 +457,7 @@ public void testDuplicate(){ Throwable ex; - @Test + @Test(expected = ClassCastException.class) public void testCastException() { ex = null; of(1, "a", 2, "b", 3, null).capture(e-> ex =e) diff --git a/cyclops-futurestream/src/test/java/cyclops/futurestream/react/base/BaseSequentialSeqTest.java b/cyclops-futurestream/src/test/java/cyclops/futurestream/react/base/BaseSequentialSeqTest.java index 484828f154..50810bcc2e 100644 --- a/cyclops-futurestream/src/test/java/cyclops/futurestream/react/base/BaseSequentialSeqTest.java +++ b/cyclops-futurestream/src/test/java/cyclops/futurestream/react/base/BaseSequentialSeqTest.java @@ -447,7 +447,7 @@ public void testDuplicate(){ Throwable ex; - @Test + @Test(expected = ClassCastException.class) public void testCastException() { ex = null; of(1, "a", 2, "b", 3, null).capture(e-> ex =e) diff --git a/cyclops-futurestream/src/test/java/cyclops/futurestream/react/lazy/sequence/LFSNoOrderTest.java b/cyclops-futurestream/src/test/java/cyclops/futurestream/react/lazy/sequence/LFSNoOrderTest.java index 4268e44d99..cf65939a6d 100644 --- a/cyclops-futurestream/src/test/java/cyclops/futurestream/react/lazy/sequence/LFSNoOrderTest.java +++ b/cyclops-futurestream/src/test/java/cyclops/futurestream/react/lazy/sequence/LFSNoOrderTest.java @@ -238,10 +238,15 @@ public void testQuadriplicateLimit(){ } @Test public void testCastException() { - of(1, "a", 2, "b", 3, null) - .peek(it ->System.out.println(it)) + of(1, "a", 2, "b", 3) + .peek(it ->System.out.println(" it is " +it)) .cast(Integer.class) - .peek(i->System.out.println(i.getClass())) + .recover(t->{ + return -10; + }) + .peek(i->{ + System.out.println(i.getClass()); + }) .peek(it ->System.out.println(it)) .toList() .stream().map(i->i.getClass()) diff --git a/cyclops-futurestream/src/test/java/cyclops/futurestream/react/lazy/sequence/SequentialTest.java b/cyclops-futurestream/src/test/java/cyclops/futurestream/react/lazy/sequence/SequentialTest.java index c47c33f7eb..d7d7acba1e 100644 --- a/cyclops-futurestream/src/test/java/cyclops/futurestream/react/lazy/sequence/SequentialTest.java +++ b/cyclops-futurestream/src/test/java/cyclops/futurestream/react/lazy/sequence/SequentialTest.java @@ -311,7 +311,7 @@ public void testIntersperse() { } - @Test + @Test(expected = ClassCastException.class) public void cast(){ LazyReact.sequentialBuilder().of(1,2,3).cast(String.class).collect(Collectors.toList()) .stream().map(i->i.getClass())