Skip to content

Commit

Permalink
Add BlockHound SPI support via ServiceLoader (#1682)
Browse files Browse the repository at this point in the history
Add BlockHound SPI's implementation. By keeping it next to core,
we control how we intercept the tasks (e.g., `onScheduleHook`)
and can also apply internal optimizations in future.

BlockHound's built-in Reactor integration will be adjusted to not
apply anything if Reactor's version is 3.3 or higher.

The integration class must be public to to the ServiceLoader
use, but it is excluded from javadoc and documented inline as
"do not consider public".
  • Loading branch information
bsideup authored and Stephane Maldini committed May 6, 2019
1 parent dc636ee commit c7cc8a7
Show file tree
Hide file tree
Showing 6 changed files with 193 additions and 0 deletions.
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ plugins {
id "org.jetbrains.dokka" version "0.9.16"
id "me.champeau.gradle.japicmp" version "0.2.6"
id "de.undercouch.download" version "3.4.3"
id "org.unbroken-dome.test-sets" version "1.5.1" apply false // 2.x version does not work with Gradle 4.10
}

apply from: "gradle/doc.gradle"
Expand Down
15 changes: 15 additions & 0 deletions reactor-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import me.champeau.gradle.japicmp.JapicmpTask

apply plugin: 'idea' //needed to avoid IDEA seeing the jmh folder as source
apply plugin: 'me.champeau.gradle.jmh'
apply plugin: 'org.unbroken-dome.test-sets'

sourceSets {
noMicrometerTest {
Expand All @@ -25,6 +26,10 @@ sourceSets {
}
}

testSets {
blockHoundTest
}

configurations {
compileOnly.extendsFrom jsr166backport
testCompile.extendsFrom jsr166backport
Expand All @@ -50,6 +55,11 @@ dependencies {

optional("org.jetbrains.kotlin:kotlin-stdlib:${kotlinVersion}")

//Optional BlockHound support
optional 'io.projectreactor.tools:blockhound:1.0.0.BUILD-SNAPSHOT'

blockHoundTestRuntime 'io.projectreactor.tools:blockhound:1.0.0.BUILD-SNAPSHOT'

//Optional JDK 9 Converter
jsr166backport "io.projectreactor:jsr166:1.0.0.RELEASE"

Expand Down Expand Up @@ -145,6 +155,11 @@ javadoc {
maxMemory = "1024m"
destinationDir = new File(project.buildDir, "docs/javadoc")

excludes = [
// Must be public due to the ServiceLoader's requirements
"reactor/core/scheduler/ReactorBlockHoundIntegration.java",
]

doLast {
// work around https://github.com/gradle/gradle/issues/4046
copy {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package reactor.core.scheduler;

import java.util.ServiceLoader;

import org.junit.Test;
import reactor.blockhound.integration.BlockHoundIntegration;

import static org.assertj.core.api.Assertions.assertThat;

public class ReactorBlockHoundIntegrationSPITest {

@Test
public void shouldSupportServiceLoader() {
assertThat(ServiceLoader.load(BlockHoundIntegration.class))
.hasAtLeastOneElementOfType(ReactorBlockHoundIntegration.class);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Copyright (c) 2019-Present Pivotal Software Inc, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package reactor.core.scheduler;

import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import org.assertj.core.api.Assertions;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import reactor.blockhound.BlockHound;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.ReactorBlockHoundIntegration;
import reactor.core.scheduler.Schedulers;

public class ReactorBlockHoundIntegrationTest {

static {
// Use the builder to load only our integration to avoid false positives
BlockHound.builder()
.with(new ReactorBlockHoundIntegration())
.install();
}

@Rule
public Timeout timeout = new Timeout(1, TimeUnit.SECONDS);

@Test
public void shouldDetectBlockingCalls() {
expectBlockingCall("java.lang.Thread.sleep", future -> {
Schedulers.parallel()
.schedule(() -> {
try {
Thread.sleep(10);
future.complete(null);
}
catch (Throwable e) {
future.completeExceptionally(e);
}
});
});
}

@Test
public void shouldDetectBlockingCallsOnSubscribe() {
expectBlockingCall("java.lang.Thread.yield", future -> {
Mono.fromRunnable(Thread::yield)
.subscribeOn(Schedulers.parallel())
.subscribe(future::complete, future::completeExceptionally);
});
}

@Test
public void shouldDetectBlockingCallsInOperators() {
expectBlockingCall("java.lang.Thread.yield", future -> {
Mono.delay(Duration.ofMillis(10))
.doOnNext(__ -> Thread.yield())
.subscribe(future::complete, future::completeExceptionally);
});
}

void expectBlockingCall(String desc, Consumer<CompletableFuture<Object>> callable) {
Assertions
.assertThatThrownBy(() -> {
CompletableFuture<Object> future = new CompletableFuture<>();
callable.accept(future);
future.join();
})
.hasMessageContaining("Blocking call! " + desc);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright (c) 2019-Present Pivotal Software Inc, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package reactor.core.scheduler;

import reactor.blockhound.BlockHound;
import reactor.blockhound.integration.BlockHoundIntegration;

import java.util.concurrent.ScheduledThreadPoolExecutor;

/**
* {@link BlockHoundIntegration} with Reactor's scheduling mechanism.
* Wraps every scheduled {@link Runnable} with a noop {@link Wrapper}, so that it can be
* detected as an entry point of the non-blocking call stack.
*
* WARNING: this class is not intended to be public, but {@link java.util.ServiceLoader}
* requires it to be so. Public visibility DOES NOT make it part of the public API.
*
* @since 3.3.0
*/
public final class ReactorBlockHoundIntegration implements BlockHoundIntegration {

@Override
public void applyTo(BlockHound.Builder builder) {
builder.nonBlockingThreadPredicate(current -> current.or(NonBlocking.class::isInstance));

// `ScheduledThreadPoolExecutor$DelayedWorkQueue.offer` parks the Thread with Unsafe#park.
builder.allowBlockingCallsInside(ScheduledThreadPoolExecutor.class.getName(), "scheduleAtFixedRate");

Schedulers.onScheduleHook("BlockHound", Wrapper::new);
builder.disallowBlockingCallsInside(Wrapper.class.getName(), "run");
}

static final class Wrapper implements Runnable {

final Runnable delegate;

Wrapper(Runnable delegate) {
this.delegate = delegate;
}

@Override
public void run() {
delegate.run();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# Copyright (c) 2019-Present Pivotal Software Inc, All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
reactor.core.scheduler.ReactorBlockHoundIntegration

0 comments on commit c7cc8a7

Please sign in to comment.