Skip to content

Commit

Permalink
Spawn swap actors with stop supervisor strategy
Browse files Browse the repository at this point in the history
Restore a stopped swap with a checkpoint that does not include a result; otherwise, remove from the swap register.
  • Loading branch information
remyers committed Nov 24, 2022
1 parent 75f1c58 commit c7050fc
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@ object SwapRole extends Enumeration {
val Taker: SwapRole.Value = Value(2, "Taker")
}

case class SwapData(request: SwapRequest, agreement: SwapAgreement, invoice: Bolt11Invoice, openingTxBroadcasted: OpeningTxBroadcasted, swapRole: SwapRole, isInitiator: Boolean, result: String = "")
case class SwapData(request: SwapRequest, agreement: SwapAgreement, invoice: Bolt11Invoice, openingTxBroadcasted: OpeningTxBroadcasted, swapRole: SwapRole, isInitiator: Boolean, result: String = "") {
val swapId: String = request.swapId
val scid: String = request.scid
}

object SwapData {
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,5 +42,8 @@ object SwapEvents {
case class ClaimByCsvConfirmed(swapId: String, confirmation: WatchTxConfirmedTriggered) extends SwapEvent {
override def toString: String = s"Claimed by csv: $confirmation"
}
case class CouldNotRestore(swapId: String, checkpoint: SwapData) extends SwapEvent {
override def toString: String = s"Could not restore from checkpoint: $checkpoint"
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,8 @@ object SwapMaker {
ShortChannelId.fromCoordinates(d.request.scid) match {
case Success(shortChannelId) => new SwapMaker(shortChannelId, nodeParams, watcher, register, wallet, keyManager, db, context)
.awaitClaimPayment(d.request, d.agreement, d.invoice, d.openingTxBroadcasted, d.isInitiator)
case Failure(e) => context.log.error(s"could not restore swap sender with invalid shortChannelId: $d, $e")
case Failure(e) => context.log.error(s"Could not restore from a checkpoint with an invalid shortChannelId: $d, $e")
db.addResult(CouldNotRestore(d.swapId, d))
Behaviors.stopped
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ object SwapRegister {
// @formatter:on

def apply(nodeParams: NodeParams, paymentInitiator: actor.ActorRef, watcher: ActorRef[ZmqWatcher.Command], register: actor.ActorRef, wallet: OnChainWallet, keyManager: SwapKeyManager, db: SwapsDb, data: Set[SwapData]): Behavior[Command] = Behaviors.setup { context =>
new SwapRegister(context, nodeParams, paymentInitiator, watcher, register, wallet, keyManager, db, data).initializing
new SwapRegister(context, nodeParams, paymentInitiator, watcher, register, wallet, keyManager, db, data).start
}
}

Expand All @@ -85,19 +85,19 @@ private class SwapRegister(context: ActorContext[Command], nodeParams: NodeParam
context.messageAdapter[UnknownMessageReceived](WrappedUnknownMessageReceived)
}

private def initializing: Behavior[Command] = {
val swaps = data.map { state =>
val swap: typed.ActorRef[SwapCommands.SwapCommand] = {
state.swapRole match {
case SwapRole.Maker => context.spawn(Behaviors.supervise(SwapMaker(nodeParams, watcher, register, wallet, keyManager, db))
.onFailure(typed.SupervisorStrategy.restart), "SwapMaker-" + state.request.scid)
case SwapRole.Taker => context.spawn(Behaviors.supervise(SwapTaker(nodeParams, paymentInitiator, watcher, register, wallet, keyManager, db))
.onFailure(typed.SupervisorStrategy.restart), "SwapTaker-" + state.request.scid)
}
}
context.watchWith(swap, SwapTerminated(state.request.swapId))
swap ! RestoreSwap(state)
state.request.swapId -> SwapEntry(state.request.scid, swap.unsafeUpcast)
private def restoreSwap(checkPoint: SwapData): (String, SwapEntry) = {
val swap = checkPoint.swapRole match {
case SwapRole.Maker => context.spawn(Behaviors.supervise(SwapMaker(nodeParams, watcher, register, wallet, keyManager, db)).onFailure(typed.SupervisorStrategy.stop), "SwapMaker-" + checkPoint.scid)
case SwapRole.Taker => context.spawn(Behaviors.supervise(SwapTaker(nodeParams, paymentInitiator, watcher, register, wallet, keyManager, db)).onFailure(typed.SupervisorStrategy.stop), "SwapTaker-" + checkPoint.scid)
}
context.watchWith(swap, SwapTerminated(checkPoint.swapId))
swap ! RestoreSwap(checkPoint)
checkPoint.swapId -> SwapEntry(checkPoint.scid, swap.unsafeUpcast)
}

private def start: Behavior[Command] = {
val swaps = data.map {
restoreSwap
}.toMap
registering(swaps)
}
Expand All @@ -111,16 +111,14 @@ private class SwapRegister(context: ActorContext[Command], nodeParams: NodeParam
Behaviors.same
case SwapInRequested(replyTo, amount, shortChannelId) =>
val swapId = randomBytes32().toHex
val swap = context.spawn(Behaviors.supervise(SwapMaker(nodeParams, watcher, register, wallet, keyManager, db))
.onFailure(SupervisorStrategy.restart), "Swap-" + shortChannelId.toString)
val swap = context.spawn(Behaviors.supervise(SwapMaker(nodeParams, watcher, register, wallet, keyManager, db)).onFailure(SupervisorStrategy.stop), "Swap-" + shortChannelId.toString)
context.watchWith(swap, SwapTerminated(swapId))
swap ! StartSwapInSender(amount, swapId, shortChannelId)
replyTo ! SwapOpened(swapId)
registering(swaps + (swapId -> SwapEntry(shortChannelId.toCoordinatesString, swap)))
case SwapOutRequested(replyTo, amount, shortChannelId) =>
val swapId = randomBytes32().toHex
val swap = context.spawn(Behaviors.supervise(SwapTaker(nodeParams, paymentInitiator, watcher, register, wallet, keyManager, db))
.onFailure(SupervisorStrategy.restart), "Swap-" + shortChannelId.toString)
val swap = context.spawn(Behaviors.supervise(SwapTaker(nodeParams, paymentInitiator, watcher, register, wallet, keyManager, db)).onFailure(SupervisorStrategy.stop), "Swap-" + shortChannelId.toString)
context.watchWith(swap, SwapTerminated(swapId))
swap ! StartSwapOutSender(amount, swapId, shortChannelId)
replyTo ! SwapOpened(swapId)
Expand All @@ -136,7 +134,14 @@ private class SwapRegister(context: ActorContext[Command], nodeParams: NodeParam
}
Behaviors.same
case SwapTerminated(swapId) =>
registering(swaps - swapId)
db.restore().collectFirst({
case checkPoint if checkPoint.swapId == swapId =>
context.log.error(s"Swap $swapId stopped prematurely after saving a checkpoint, but before recording a result.")
restoreSwap(checkPoint)
}) match {
case None => registering (swaps - swapId)
case Some (restoredSwap) => registering (swaps + restoredSwap)
}
case WrappedUnknownMessageReceived(unknownMessageReceived) =>
if (PeerSwapPlugin.peerSwapTags.contains(unknownMessageReceived.message.tag)) {
peerSwapMessageCodec.decode(unknownMessageCodec.encode(unknownMessageReceived.message).require) match {
Expand All @@ -145,14 +150,12 @@ private class SwapRegister(context: ActorContext[Command], nodeParams: NodeParam
context.log.info(s"ignoring swap request for a channel with an active swap: $swapRequest")
Behaviors.same
case request: SwapInRequest =>
val swap = context.spawn(Behaviors.supervise(SwapTaker(nodeParams, paymentInitiator, watcher, register, wallet, keyManager, db))
.onFailure(SupervisorStrategy.restart), "Swap-" + request.scid)
val swap = context.spawn(Behaviors.supervise(SwapTaker(nodeParams, paymentInitiator, watcher, register, wallet, keyManager, db)).onFailure(SupervisorStrategy.restart), "Swap-" + request.scid)
context.watchWith(swap, SwapTerminated(request.swapId))
swap ! StartSwapInReceiver(request)
registering(swaps + (request.swapId -> SwapEntry(request.scid, swap)))
case request: SwapOutRequest =>
val swap = context.spawn(Behaviors.supervise(SwapMaker(nodeParams, watcher, register, wallet, keyManager, db))
.onFailure(SupervisorStrategy.restart), "Swap-" + request.scid)
val swap = context.spawn(Behaviors.supervise(SwapMaker(nodeParams, watcher, register, wallet, keyManager, db)).onFailure(SupervisorStrategy.restart), "Swap-" + request.scid)
context.watchWith(swap, SwapTerminated(request.swapId))
swap ! StartSwapOutReceiver(request)
registering(swaps + (request.swapId -> SwapEntry(request.scid, swap)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,8 @@ object SwapTaker {
// if payment was not yet sent, fail the swap
swap.sendCoopClose(d.request, s"Lightning payment not sent.")
)
case Failure(e) => context.log.error(s"could not restore swap receiver with invalid shortChannelId: $d, $e")
case Failure(e) => context.log.error(s"Could not restore from a checkpoint with an invalid shortChannelId: $d, $e")
db.addResult(CouldNotRestore(d.swapId, d))
Behaviors.stopped
}
}
Expand Down

0 comments on commit c7050fc

Please sign in to comment.