Skip to content
This repository has been archived by the owner on Apr 15, 2018. It is now read-only.

Add close() method to Coordination for resource cleanup #169

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,9 @@ final class EtcdCoordination(clusterName: String, system: ActorSystem) extends C
override def refresh(self: Address, ttl: FiniteDuration) =
addSelfOrRefresh(self, ttl)

override def close(): Future[Done] =
Future.successful(Done)

private def addSelfOrRefresh(self: Address, ttl: FiniteDuration) = {
val node = getUrlEncoder.encodeToString(self.toString.getBytes(UTF_8))
val query = Uri.Query("ttl" -> toSeconds(ttl), "value" -> self.toString)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,11 @@ trait Coordination {
def getNodes(): Future[Set[Address]]

/**
* Akquire a lock for bootstrapping the cluster (first node).
* Acquire a lock for bootstrapping the cluster (first node).
*
* @param self self node
* @param ttl TTL for the lock
* @return true, if lock could be akquired, else false
* @return true, if lock could be acquired, else false
*/
def lock(self: Address, ttl: FiniteDuration): Future[Boolean]

Expand All @@ -80,4 +80,11 @@ trait Coordination {
* @return future signaling done
*/
def refresh(self: Address, ttl: FiniteDuration): Future[Done]

/**
* Performs resource cleanup on termination
*
* @return future signaling done
*/
def close(): Future[Done]
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ import akka.actor.{ Address, FSM, Props, Status }
import akka.cluster.Cluster
import akka.cluster.ClusterEvent.{ InitialStateAsEvents, MemberJoined, MemberUp }
import akka.pattern.pipe
import akka.stream.ActorMaterializer
import de.heikoseeberger.constructr.coordination.Coordination
import scala.concurrent.duration.{ Duration, FiniteDuration }
import scala.util.{ Success, Failure => SFailure }

object ConstructrMachine {

Expand Down Expand Up @@ -105,8 +105,7 @@ final class ConstructrMachine(
s"ttl-factor must be greater or equal 1 + ((coordination-timeout * (1 + nr-of-retries) + retry-delay * nr-of-retries)/ refresh-interval), i.e. $minTtlFactor, but was $ttlFactor!"
)

private implicit val mat = ActorMaterializer()
private val cluster = Cluster(context.system)
private val cluster = Cluster(context.system)

startWith(State.GettingNodes, Data(Set.empty, State.GettingNodes, nrOfRetries))

Expand Down Expand Up @@ -295,6 +294,18 @@ final class ConstructrMachine(

initialize()

// Performs resource cleanup on termination

onTermination {
case se: StopEvent =>
log.warning("StopEvent received, reason: {}. Closing Coordinator for resource cleanup",
se.reason)
coordination.close().onComplete {
case Success(_) => log.info("Coordinator closed successfully")
case SFailure(e) => log.error(e, "Coordinator failed to close")
}
}

// Helpers

private def retry(retryState: ConstructrMachine.State) =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ final class ConstructrMachineSpec extends WordSpec with Matchers with BeforeAndA
boom(),
delayed(1.hour.dilated, system.scheduler)(noNodes())
)
when(coordination.close()).thenReturn(
Future.successful(Done)
)

val monitor = TestProbe()
val machine = system.actorOf(
Expand Down Expand Up @@ -103,6 +106,9 @@ final class ConstructrMachineSpec extends WordSpec with Matchers with BeforeAndA
boom(),
Future.successful(Done)
)
when(coordination.close()).thenReturn(
Future.successful(Done)
)

val monitor = TestProbe()
val machine = system.actorOf(
Expand Down Expand Up @@ -228,6 +234,10 @@ final class ConstructrMachineSpec extends WordSpec with Matchers with BeforeAndA
Future.successful(Done)
)

when(coordination.close()).thenReturn(
Future.successful(Done)
)

val nrOfRetries = 2
val monitor = TestProbe()
val machine = system.actorOf(
Expand Down