Skip to content

Commit

Permalink
Merge pull request #843 from prd-fox/842-partyinfo-continuation
Browse files Browse the repository at this point in the history
Stop throwing on network sync calls
  • Loading branch information
Krish1979 authored Aug 5, 2019
2 parents 6693bbd + f733123 commit 5ee1144
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 114 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,9 @@
import java.util.concurrent.TimeUnit;

/**
* Schedules a continuous running task as an alternative to
* a {@link Thread} running a {@code while(true)} loop
* Schedules a continuous running task as an alternative to a {@link Thread} running a {@code while(true)} loop
*
* Also allows delays if required between each execution of the loop
* <p>Also allows delays if required between each execution of the loop
*/
public class TesseraScheduledExecutor {

Expand All @@ -24,44 +23,47 @@ public class TesseraScheduledExecutor {
private final Runnable action;

private final long rate;

private final long initialDelay;

public TesseraScheduledExecutor(final ScheduledExecutorService executor,
final Runnable action,
final long rate,
final long delay) {
public TesseraScheduledExecutor(
final ScheduledExecutorService executor, final Runnable action, final long rate, final long delay) {
this.executor = Objects.requireNonNull(executor);
this.action = Objects.requireNonNull(action);
this.rate = rate;
this.initialDelay = delay;
}

/**
* Starts the submitted task and schedules it to run every given time frame
* Catches any Throwable and logs it so that the scheduling doesn't break
* Starts the submitted task and schedules it to run every given time frame. Catches any Throwable and logs it so
* that the scheduling doesn't break
*/
@PostConstruct
public void start() {
LOGGER.info("Starting {}", this.action.getClass().getSimpleName());

final Runnable exceptionSafeRunnable = () -> {
try {
this.action.run();
} catch (final Throwable ex) {
LOGGER.error("Error when executing action {}", action.getClass().getSimpleName());
LOGGER.error("Error when executing action", ex);
}
};
final Runnable exceptionSafeRunnable =
() -> {
try {
LOGGER.debug("{} has started running", getClass().getSimpleName());

this.action.run();
} catch (final Throwable ex) {
LOGGER.error("Error when executing action {}", action.getClass().getSimpleName());
LOGGER.error("Error when executing action", ex);
} finally {
LOGGER.debug("{} has finished running", getClass().getSimpleName());
}
};

this.executor.scheduleWithFixedDelay(exceptionSafeRunnable, initialDelay, rate, TimeUnit.MILLISECONDS);

LOGGER.info("Started {}", this.action.getClass().getSimpleName());
}

/**
* Stops any more executions of the submitted task from running
* Does not cancel the currently running task, which may be blocking
* Stops any more executions of the submitted task from running. Does not cancel the currently running task, which
* may be blocking
*/
@PreDestroy
public void stop() {
Expand All @@ -71,5 +73,4 @@ public void stop() {

LOGGER.info("Stopped {}", this.action.getClass().getSimpleName());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.ConnectException;
import java.util.Objects;

/**
* Polls every so often to all known nodes for any new discoverable nodes This keeps all nodes up-to date and
* Polls every so often to all known nodes for any new discoverable nodes. This keeps all nodes up-to date and
* discoverable by other nodes
*/
public class PartyInfoPoller implements Runnable {
Expand All @@ -22,7 +21,7 @@ public class PartyInfoPoller implements Runnable {

private final P2pClient p2pClient;

public PartyInfoPoller(PartyInfoService partyInfoService, P2pClient p2pClient) {
public PartyInfoPoller(final PartyInfoService partyInfoService, final P2pClient p2pClient) {
this(partyInfoService, PartyInfoParser.create(), p2pClient);
}

Expand All @@ -47,18 +46,15 @@ public PartyInfoPoller(
*/
@Override
public void run() {
LOGGER.debug("Polling {}", getClass().getSimpleName());

final PartyInfo partyInfo = partyInfoService.getPartyInfo();

final byte[] encodedPartyInfo = partyInfoParser.to(partyInfo);

final String ourUrl = partyInfo.getUrl();

partyInfo.getParties().stream()
.filter(party -> !party.getUrl().equals(partyInfo.getUrl()))
.map(Party::getUrl)
.filter(url -> !ourUrl.equals(url))
.forEach(url -> pollSingleParty(url, encodedPartyInfo));

LOGGER.debug("Polled {}. PartyInfo : {}", getClass().getSimpleName(), partyInfo);
}

/**
Expand All @@ -68,19 +64,11 @@ public void run() {
* @param encodedPartyInfo the encoded current party information
*/
private void pollSingleParty(final String url, final byte[] encodedPartyInfo) {

try {
p2pClient.sendPartyInfo(url, encodedPartyInfo);
} catch (final Exception ex) {

if (ConnectException.class.isInstance(ex.getCause())) {
LOGGER.warn("Server error {} when connecting to {}", ex.getMessage(), url);
LOGGER.debug(null, ex);
} else {
LOGGER.error("Error {} while executing poller for {}. ", ex.getMessage(), url);
LOGGER.debug(null, ex);
throw ex;
}
LOGGER.warn("Error {} when connecting to {}", ex.getMessage(), url);
LOGGER.debug(null, ex);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
import org.junit.Before;
import org.junit.Test;

import java.net.ConnectException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

import static java.util.Collections.emptySet;
import static java.util.Collections.singleton;
Expand All @@ -20,6 +22,8 @@ public class PartyInfoPollerTest {

private static final String TARGET_URL = "http://bogus.com:9878/";

private static final String TARGET_URL_2 = "http://otherwebsite.com:9878/";

private static final byte[] DATA = "BOGUS".getBytes();

private PartyInfoService partyInfoService;
Expand All @@ -35,6 +39,9 @@ public void setUp() {
this.partyInfoService = mock(PartyInfoService.class);
this.partyInfoParser = mock(PartyInfoParser.class);
this.p2pClient = mock(P2pClient.class);

when(partyInfoParser.to(any(PartyInfo.class))).thenReturn(DATA);

this.partyInfoPoller = new PartyInfoPoller(partyInfoService, partyInfoParser, p2pClient);
}

Expand All @@ -45,109 +52,43 @@ public void tearDown() {

@Test
public void run() {

doReturn(true).when(p2pClient).sendPartyInfo(TARGET_URL, DATA);

final PartyInfo partyInfo = new PartyInfo(OWN_URL, emptySet(), singleton(new Party(TARGET_URL)));
doReturn(partyInfo).when(partyInfoService).getPartyInfo();

doReturn(DATA).when(partyInfoParser).to(partyInfo);

final PartyInfo updatedPartyInfo = new PartyInfo(OWN_URL, emptySet(), singleton(new Party(TARGET_URL)));
doReturn(updatedPartyInfo).when(partyInfoParser).from(DATA);
doReturn(true).when(p2pClient).sendPartyInfo(TARGET_URL, DATA);

partyInfoPoller.run();

verify(partyInfoService).getPartyInfo();

verify(partyInfoParser).to(partyInfo);

verify(p2pClient).sendPartyInfo(TARGET_URL, DATA);
}

@Test
public void testWhenURLIsOwn() {

doReturn(true).when(p2pClient).sendPartyInfo(OWN_URL, DATA);

final PartyInfo partyInfo = new PartyInfo(OWN_URL, emptySet(), singleton(new Party(OWN_URL)));
doReturn(partyInfo).when(partyInfoService).getPartyInfo();
doReturn(DATA).when(partyInfoParser).to(partyInfo);

final PartyInfo updatedPartyInfo = mock(PartyInfo.class);
doReturn(updatedPartyInfo).when(partyInfoParser).from(DATA);

partyInfoPoller.run();

verify(partyInfoParser).to(partyInfo);
verify(partyInfoService).getPartyInfo();
}

@Test
public void testWhenPostFails() {

doReturn(false).when(p2pClient).sendPartyInfo(TARGET_URL, DATA);

final PartyInfo partyInfo = new PartyInfo(OWN_URL, emptySet(), singleton(new Party(TARGET_URL)));

doReturn(partyInfo).when(partyInfoService).getPartyInfo();
doReturn(DATA).when(partyInfoParser).to(partyInfo);

final PartyInfo updatedPartyInfo = mock(PartyInfo.class);
doReturn(updatedPartyInfo).when(partyInfoParser).from(DATA);
doReturn(true).when(p2pClient).sendPartyInfo(OWN_URL, DATA);

partyInfoPoller.run();

verify(partyInfoParser, never()).from(DATA);
verify(partyInfoParser).to(partyInfo);
verify(partyInfoService).getPartyInfo();
verify(p2pClient).sendPartyInfo(TARGET_URL, DATA);
}

@Test
public void runThrowsException() {

final PartyInfo partyInfo = new PartyInfo(OWN_URL, emptySet(), singleton(new Party(TARGET_URL)));

public void exceptionThrowByPostDoesntBubble() {
final Set<Party> parties = new HashSet<>(Arrays.asList(new Party(TARGET_URL), new Party(TARGET_URL_2)));
final PartyInfo partyInfo = new PartyInfo(OWN_URL, emptySet(), parties);
doReturn(partyInfo).when(partyInfoService).getPartyInfo();
doReturn(DATA).when(partyInfoParser).to(partyInfo);

PartyInfo updatedPartyInfo = mock(PartyInfo.class);
doReturn(updatedPartyInfo).when(partyInfoParser).from(DATA);

doThrow(UnsupportedOperationException.class).when(p2pClient).sendPartyInfo(TARGET_URL, DATA);

final Throwable throwable = catchThrowable(partyInfoPoller::run);
assertThat(throwable).isInstanceOf(UnsupportedOperationException.class);

verify(p2pClient).sendPartyInfo(TARGET_URL, DATA);

verify(partyInfoService).getPartyInfo();
verify(partyInfoService, never()).updatePartyInfo(updatedPartyInfo);
verify(partyInfoParser, never()).from(DATA);
verify(partyInfoParser).to(partyInfo);
}

@Test
public void runThrowsConnectionExceptionAndDoesNotThrow() {

final PartyInfo partyInfo = new PartyInfo(OWN_URL, emptySet(), singleton(new Party(TARGET_URL)));

doReturn(partyInfo).when(partyInfoService).getPartyInfo();
doReturn(DATA).when(partyInfoParser).to(partyInfo);

final PartyInfo updatedPartyInfo = mock(PartyInfo.class);
doReturn(updatedPartyInfo).when(partyInfoParser).from(DATA);

final RuntimeException connectionException = new RuntimeException(new ConnectException("OUCH"));
doThrow(connectionException).when(p2pClient).sendPartyInfo(TARGET_URL, DATA);

partyInfoPoller.run();

assertThat(throwable).isNull();
verify(p2pClient).sendPartyInfo(TARGET_URL, DATA);

verify(p2pClient).sendPartyInfo(TARGET_URL_2, DATA);
verify(partyInfoService).getPartyInfo();
verify(partyInfoParser, never()).from(DATA);
verify(partyInfoParser).to(partyInfo);
}

Expand Down

0 comments on commit 5ee1144

Please sign in to comment.