Skip to content

Commit

Permalink
shard-manager: only allow alive pods to register (#146)
Browse files Browse the repository at this point in the history
* shard-manager: only allow alive pods to register

* Fail when unhealthy pod tries to register

* Clarify error message
  • Loading branch information
yoohaemin authored Oct 17, 2024
1 parent 3cfb97b commit ac770c8
Showing 1 changed file with 17 additions and 13 deletions.
30 changes: 17 additions & 13 deletions manager/src/main/scala/com/devsisters/shardcake/ShardManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,23 @@ class ShardManager(
def getShardingEvents: ZStream[Any, Nothing, ShardingEvent] =
ZStream.fromHub(eventsHub)

def register(pod: Pod): UIO[Unit] =
for {
_ <- ZIO.logInfo(s"Registering $pod")
state <- stateRef.updateAndGetZIO(state =>
ZIO
.succeed(OffsetDateTime.now())
.map(cdt => state.copy(pods = state.pods.updated(pod.address, PodWithMetadata(pod, cdt))))
)
_ <- ManagerMetrics.pods.increment
_ <- eventsHub.publish(ShardingEvent.PodRegistered(pod.address))
_ <- ZIO.when(state.unassignedShards.nonEmpty)(rebalance(false))
_ <- persistPods.forkDaemon
} yield ()
def register(pod: Pod): Task[Unit] =
ZIO.ifZIO(healthApi.isAlive(pod.address))(
onTrue = for {
_ <- ZIO.logInfo(s"Registering $pod")
state <- stateRef.updateAndGetZIO(state =>
ZIO
.succeed(OffsetDateTime.now())
.map(cdt => state.copy(pods = state.pods.updated(pod.address, PodWithMetadata(pod, cdt))))
)
_ <- ManagerMetrics.pods.increment
_ <- eventsHub.publish(ShardingEvent.PodRegistered(pod.address))
_ <- ZIO.when(state.unassignedShards.nonEmpty)(rebalance(rebalanceImmediately = false))
_ <- persistPods.forkDaemon
} yield (),
onFalse = ZIO.logWarning(s"Pod $pod requested to register but is not alive, ignoring") *>
ZIO.fail(new RuntimeException(s"Pod $pod is not healthy, refusing to register"))
)

def notifyUnhealthyPod(podAddress: PodAddress): UIO[Unit] =
ZIO
Expand Down

0 comments on commit ac770c8

Please sign in to comment.