Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stop throwing on network sync calls #843

Merged
merged 1 commit into from
Aug 5, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,9 @@
import java.util.concurrent.TimeUnit;

/**
* Schedules a continuous running task as an alternative to
* a {@link Thread} running a {@code while(true)} loop
* Schedules a continuous running task as an alternative to a {@link Thread} running a {@code while(true)} loop
*
* Also allows delays if required between each execution of the loop
* <p>Also allows delays if required between each execution of the loop
*/
public class TesseraScheduledExecutor {

Expand All @@ -24,44 +23,47 @@ public class TesseraScheduledExecutor {
private final Runnable action;

private final long rate;

private final long initialDelay;

public TesseraScheduledExecutor(final ScheduledExecutorService executor,
final Runnable action,
final long rate,
final long delay) {
public TesseraScheduledExecutor(
final ScheduledExecutorService executor, final Runnable action, final long rate, final long delay) {
this.executor = Objects.requireNonNull(executor);
this.action = Objects.requireNonNull(action);
this.rate = rate;
this.initialDelay = delay;
}

/**
* Starts the submitted task and schedules it to run every given time frame
* Catches any Throwable and logs it so that the scheduling doesn't break
* Starts the submitted task and schedules it to run every given time frame. Catches any Throwable and logs it so
* that the scheduling doesn't break
*/
@PostConstruct
public void start() {
LOGGER.info("Starting {}", this.action.getClass().getSimpleName());

final Runnable exceptionSafeRunnable = () -> {
try {
this.action.run();
} catch (final Throwable ex) {
LOGGER.error("Error when executing action {}", action.getClass().getSimpleName());
LOGGER.error("Error when executing action", ex);
}
};
final Runnable exceptionSafeRunnable =
() -> {
try {
LOGGER.debug("{} has started running", getClass().getSimpleName());

this.action.run();
} catch (final Throwable ex) {
LOGGER.error("Error when executing action {}", action.getClass().getSimpleName());
LOGGER.error("Error when executing action", ex);
} finally {
LOGGER.debug("{} has finished running", getClass().getSimpleName());
}
};

this.executor.scheduleWithFixedDelay(exceptionSafeRunnable, initialDelay, rate, TimeUnit.MILLISECONDS);

LOGGER.info("Started {}", this.action.getClass().getSimpleName());
}

/**
* Stops any more executions of the submitted task from running
* Does not cancel the currently running task, which may be blocking
* Stops any more executions of the submitted task from running. Does not cancel the currently running task, which
* may be blocking
*/
@PreDestroy
public void stop() {
Expand All @@ -71,5 +73,4 @@ public void stop() {

LOGGER.info("Stopped {}", this.action.getClass().getSimpleName());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.ConnectException;
import java.util.Objects;

/**
* Polls every so often to all known nodes for any new discoverable nodes This keeps all nodes up-to date and
* Polls every so often to all known nodes for any new discoverable nodes. This keeps all nodes up-to date and
* discoverable by other nodes
*/
public class PartyInfoPoller implements Runnable {
Expand All @@ -22,7 +21,7 @@ public class PartyInfoPoller implements Runnable {

private final P2pClient p2pClient;

public PartyInfoPoller(PartyInfoService partyInfoService, P2pClient p2pClient) {
public PartyInfoPoller(final PartyInfoService partyInfoService, final P2pClient p2pClient) {
this(partyInfoService, PartyInfoParser.create(), p2pClient);
}

Expand All @@ -47,18 +46,15 @@ public PartyInfoPoller(
*/
@Override
public void run() {
LOGGER.debug("Polling {}", getClass().getSimpleName());

final PartyInfo partyInfo = partyInfoService.getPartyInfo();

final byte[] encodedPartyInfo = partyInfoParser.to(partyInfo);

final String ourUrl = partyInfo.getUrl();

partyInfo.getParties().stream()
.filter(party -> !party.getUrl().equals(partyInfo.getUrl()))
.map(Party::getUrl)
.filter(url -> !ourUrl.equals(url))
.forEach(url -> pollSingleParty(url, encodedPartyInfo));

LOGGER.debug("Polled {}. PartyInfo : {}", getClass().getSimpleName(), partyInfo);
}

/**
Expand All @@ -68,19 +64,11 @@ public void run() {
* @param encodedPartyInfo the encoded current party information
*/
private void pollSingleParty(final String url, final byte[] encodedPartyInfo) {

try {
p2pClient.sendPartyInfo(url, encodedPartyInfo);
} catch (final Exception ex) {

if (ConnectException.class.isInstance(ex.getCause())) {
LOGGER.warn("Server error {} when connecting to {}", ex.getMessage(), url);
LOGGER.debug(null, ex);
} else {
LOGGER.error("Error {} while executing poller for {}. ", ex.getMessage(), url);
LOGGER.debug(null, ex);
throw ex;
}
LOGGER.warn("Error {} when connecting to {}", ex.getMessage(), url);
LOGGER.debug(null, ex);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
import org.junit.Before;
import org.junit.Test;

import java.net.ConnectException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

import static java.util.Collections.emptySet;
import static java.util.Collections.singleton;
Expand All @@ -20,6 +22,8 @@ public class PartyInfoPollerTest {

private static final String TARGET_URL = "http://bogus.com:9878/";

private static final String TARGET_URL_2 = "http://otherwebsite.com:9878/";

private static final byte[] DATA = "BOGUS".getBytes();

private PartyInfoService partyInfoService;
Expand All @@ -35,6 +39,9 @@ public void setUp() {
this.partyInfoService = mock(PartyInfoService.class);
this.partyInfoParser = mock(PartyInfoParser.class);
this.p2pClient = mock(P2pClient.class);

when(partyInfoParser.to(any(PartyInfo.class))).thenReturn(DATA);

this.partyInfoPoller = new PartyInfoPoller(partyInfoService, partyInfoParser, p2pClient);
}

Expand All @@ -45,109 +52,43 @@ public void tearDown() {

@Test
public void run() {

doReturn(true).when(p2pClient).sendPartyInfo(TARGET_URL, DATA);

final PartyInfo partyInfo = new PartyInfo(OWN_URL, emptySet(), singleton(new Party(TARGET_URL)));
doReturn(partyInfo).when(partyInfoService).getPartyInfo();

doReturn(DATA).when(partyInfoParser).to(partyInfo);

final PartyInfo updatedPartyInfo = new PartyInfo(OWN_URL, emptySet(), singleton(new Party(TARGET_URL)));
doReturn(updatedPartyInfo).when(partyInfoParser).from(DATA);
doReturn(true).when(p2pClient).sendPartyInfo(TARGET_URL, DATA);

partyInfoPoller.run();

verify(partyInfoService).getPartyInfo();

verify(partyInfoParser).to(partyInfo);

verify(p2pClient).sendPartyInfo(TARGET_URL, DATA);
}

@Test
public void testWhenURLIsOwn() {

doReturn(true).when(p2pClient).sendPartyInfo(OWN_URL, DATA);

final PartyInfo partyInfo = new PartyInfo(OWN_URL, emptySet(), singleton(new Party(OWN_URL)));
doReturn(partyInfo).when(partyInfoService).getPartyInfo();
doReturn(DATA).when(partyInfoParser).to(partyInfo);

final PartyInfo updatedPartyInfo = mock(PartyInfo.class);
doReturn(updatedPartyInfo).when(partyInfoParser).from(DATA);

partyInfoPoller.run();

verify(partyInfoParser).to(partyInfo);
verify(partyInfoService).getPartyInfo();
}

@Test
public void testWhenPostFails() {

doReturn(false).when(p2pClient).sendPartyInfo(TARGET_URL, DATA);

final PartyInfo partyInfo = new PartyInfo(OWN_URL, emptySet(), singleton(new Party(TARGET_URL)));

doReturn(partyInfo).when(partyInfoService).getPartyInfo();
doReturn(DATA).when(partyInfoParser).to(partyInfo);

final PartyInfo updatedPartyInfo = mock(PartyInfo.class);
doReturn(updatedPartyInfo).when(partyInfoParser).from(DATA);
doReturn(true).when(p2pClient).sendPartyInfo(OWN_URL, DATA);

partyInfoPoller.run();

verify(partyInfoParser, never()).from(DATA);
verify(partyInfoParser).to(partyInfo);
verify(partyInfoService).getPartyInfo();
verify(p2pClient).sendPartyInfo(TARGET_URL, DATA);
}

@Test
public void runThrowsException() {

final PartyInfo partyInfo = new PartyInfo(OWN_URL, emptySet(), singleton(new Party(TARGET_URL)));

public void exceptionThrowByPostDoesntBubble() {
final Set<Party> parties = new HashSet<>(Arrays.asList(new Party(TARGET_URL), new Party(TARGET_URL_2)));
final PartyInfo partyInfo = new PartyInfo(OWN_URL, emptySet(), parties);
doReturn(partyInfo).when(partyInfoService).getPartyInfo();
doReturn(DATA).when(partyInfoParser).to(partyInfo);

PartyInfo updatedPartyInfo = mock(PartyInfo.class);
doReturn(updatedPartyInfo).when(partyInfoParser).from(DATA);

doThrow(UnsupportedOperationException.class).when(p2pClient).sendPartyInfo(TARGET_URL, DATA);

final Throwable throwable = catchThrowable(partyInfoPoller::run);
assertThat(throwable).isInstanceOf(UnsupportedOperationException.class);

verify(p2pClient).sendPartyInfo(TARGET_URL, DATA);

verify(partyInfoService).getPartyInfo();
verify(partyInfoService, never()).updatePartyInfo(updatedPartyInfo);
verify(partyInfoParser, never()).from(DATA);
verify(partyInfoParser).to(partyInfo);
}

@Test
public void runThrowsConnectionExceptionAndDoesNotThrow() {

final PartyInfo partyInfo = new PartyInfo(OWN_URL, emptySet(), singleton(new Party(TARGET_URL)));

doReturn(partyInfo).when(partyInfoService).getPartyInfo();
doReturn(DATA).when(partyInfoParser).to(partyInfo);

final PartyInfo updatedPartyInfo = mock(PartyInfo.class);
doReturn(updatedPartyInfo).when(partyInfoParser).from(DATA);

final RuntimeException connectionException = new RuntimeException(new ConnectException("OUCH"));
doThrow(connectionException).when(p2pClient).sendPartyInfo(TARGET_URL, DATA);

partyInfoPoller.run();

assertThat(throwable).isNull();
verify(p2pClient).sendPartyInfo(TARGET_URL, DATA);

verify(p2pClient).sendPartyInfo(TARGET_URL_2, DATA);
verify(partyInfoService).getPartyInfo();
verify(partyInfoParser, never()).from(DATA);
verify(partyInfoParser).to(partyInfo);
}

Expand Down