diff --git a/plugins/peerswap/src/main/scala/fr/acinq/eclair/plugins/peerswap/SwapData.scala b/plugins/peerswap/src/main/scala/fr/acinq/eclair/plugins/peerswap/SwapData.scala index 4baae92e55..e93f399a16 100644 --- a/plugins/peerswap/src/main/scala/fr/acinq/eclair/plugins/peerswap/SwapData.scala +++ b/plugins/peerswap/src/main/scala/fr/acinq/eclair/plugins/peerswap/SwapData.scala @@ -26,7 +26,10 @@ object SwapRole extends Enumeration { val Taker: SwapRole.Value = Value(2, "Taker") } -case class SwapData(request: SwapRequest, agreement: SwapAgreement, invoice: Bolt11Invoice, openingTxBroadcasted: OpeningTxBroadcasted, swapRole: SwapRole, isInitiator: Boolean, result: String = "") +case class SwapData(request: SwapRequest, agreement: SwapAgreement, invoice: Bolt11Invoice, openingTxBroadcasted: OpeningTxBroadcasted, swapRole: SwapRole, isInitiator: Boolean, result: String = "") { + val swapId: String = request.swapId + val scid: String = request.scid +} object SwapData { } diff --git a/plugins/peerswap/src/main/scala/fr/acinq/eclair/plugins/peerswap/SwapEvents.scala b/plugins/peerswap/src/main/scala/fr/acinq/eclair/plugins/peerswap/SwapEvents.scala index eef6bca5b0..9ff2e75a72 100644 --- a/plugins/peerswap/src/main/scala/fr/acinq/eclair/plugins/peerswap/SwapEvents.scala +++ b/plugins/peerswap/src/main/scala/fr/acinq/eclair/plugins/peerswap/SwapEvents.scala @@ -42,5 +42,8 @@ object SwapEvents { case class ClaimByCsvConfirmed(swapId: String, confirmation: WatchTxConfirmedTriggered) extends SwapEvent { override def toString: String = s"Claimed by csv: $confirmation" } + case class CouldNotRestore(swapId: String, checkpoint: SwapData) extends SwapEvent { + override def toString: String = s"Could not restore from checkpoint: $checkpoint" + } } diff --git a/plugins/peerswap/src/main/scala/fr/acinq/eclair/plugins/peerswap/SwapMaker.scala b/plugins/peerswap/src/main/scala/fr/acinq/eclair/plugins/peerswap/SwapMaker.scala index db797e007f..228f1cf92c 100644 --- a/plugins/peerswap/src/main/scala/fr/acinq/eclair/plugins/peerswap/SwapMaker.scala +++ b/plugins/peerswap/src/main/scala/fr/acinq/eclair/plugins/peerswap/SwapMaker.scala @@ -123,7 +123,8 @@ object SwapMaker { ShortChannelId.fromCoordinates(d.request.scid) match { case Success(shortChannelId) => new SwapMaker(shortChannelId, nodeParams, watcher, register, wallet, keyManager, db, context) .awaitClaimPayment(d.request, d.agreement, d.invoice, d.openingTxBroadcasted, d.isInitiator) - case Failure(e) => context.log.error(s"could not restore swap sender with invalid shortChannelId: $d, $e") + case Failure(e) => context.log.error(s"Could not restore from a checkpoint with an invalid shortChannelId: $d, $e") + db.addResult(CouldNotRestore(d.swapId, d)) Behaviors.stopped } } diff --git a/plugins/peerswap/src/main/scala/fr/acinq/eclair/plugins/peerswap/SwapRegister.scala b/plugins/peerswap/src/main/scala/fr/acinq/eclair/plugins/peerswap/SwapRegister.scala index bdb85b1c45..8c62c80870 100644 --- a/plugins/peerswap/src/main/scala/fr/acinq/eclair/plugins/peerswap/SwapRegister.scala +++ b/plugins/peerswap/src/main/scala/fr/acinq/eclair/plugins/peerswap/SwapRegister.scala @@ -61,7 +61,7 @@ object SwapRegister { // @formatter:on def apply(nodeParams: NodeParams, paymentInitiator: actor.ActorRef, watcher: ActorRef[ZmqWatcher.Command], register: actor.ActorRef, wallet: OnChainWallet, keyManager: SwapKeyManager, db: SwapsDb, data: Set[SwapData]): Behavior[Command] = Behaviors.setup { context => - new SwapRegister(context, nodeParams, paymentInitiator, watcher, register, wallet, keyManager, db, data).initializing + new SwapRegister(context, nodeParams, paymentInitiator, watcher, register, wallet, keyManager, db, data).start } } @@ -85,19 +85,19 @@ private class SwapRegister(context: ActorContext[Command], nodeParams: NodeParam context.messageAdapter[UnknownMessageReceived](WrappedUnknownMessageReceived) } - private def initializing: Behavior[Command] = { - val swaps = data.map { state => - val swap: typed.ActorRef[SwapCommands.SwapCommand] = { - state.swapRole match { - case SwapRole.Maker => context.spawn(Behaviors.supervise(SwapMaker(nodeParams, watcher, register, wallet, keyManager, db)) - .onFailure(typed.SupervisorStrategy.restart), "SwapMaker-" + state.request.scid) - case SwapRole.Taker => context.spawn(Behaviors.supervise(SwapTaker(nodeParams, paymentInitiator, watcher, register, wallet, keyManager, db)) - .onFailure(typed.SupervisorStrategy.restart), "SwapTaker-" + state.request.scid) - } - } - context.watchWith(swap, SwapTerminated(state.request.swapId)) - swap ! RestoreSwap(state) - state.request.swapId -> SwapEntry(state.request.scid, swap.unsafeUpcast) + private def restoreSwap(checkPoint: SwapData): (String, SwapEntry) = { + val swap = checkPoint.swapRole match { + case SwapRole.Maker => context.spawn(Behaviors.supervise(SwapMaker(nodeParams, watcher, register, wallet, keyManager, db)).onFailure(typed.SupervisorStrategy.stop), "SwapMaker-" + checkPoint.scid) + case SwapRole.Taker => context.spawn(Behaviors.supervise(SwapTaker(nodeParams, paymentInitiator, watcher, register, wallet, keyManager, db)).onFailure(typed.SupervisorStrategy.stop), "SwapTaker-" + checkPoint.scid) + } + context.watchWith(swap, SwapTerminated(checkPoint.swapId)) + swap ! RestoreSwap(checkPoint) + checkPoint.swapId -> SwapEntry(checkPoint.scid, swap.unsafeUpcast) + } + + private def start: Behavior[Command] = { + val swaps = data.map { + restoreSwap }.toMap registering(swaps) } @@ -111,16 +111,14 @@ private class SwapRegister(context: ActorContext[Command], nodeParams: NodeParam Behaviors.same case SwapInRequested(replyTo, amount, shortChannelId) => val swapId = randomBytes32().toHex - val swap = context.spawn(Behaviors.supervise(SwapMaker(nodeParams, watcher, register, wallet, keyManager, db)) - .onFailure(SupervisorStrategy.restart), "Swap-" + shortChannelId.toString) + val swap = context.spawn(Behaviors.supervise(SwapMaker(nodeParams, watcher, register, wallet, keyManager, db)).onFailure(SupervisorStrategy.stop), "Swap-" + shortChannelId.toString) context.watchWith(swap, SwapTerminated(swapId)) swap ! StartSwapInSender(amount, swapId, shortChannelId) replyTo ! SwapOpened(swapId) registering(swaps + (swapId -> SwapEntry(shortChannelId.toCoordinatesString, swap))) case SwapOutRequested(replyTo, amount, shortChannelId) => val swapId = randomBytes32().toHex - val swap = context.spawn(Behaviors.supervise(SwapTaker(nodeParams, paymentInitiator, watcher, register, wallet, keyManager, db)) - .onFailure(SupervisorStrategy.restart), "Swap-" + shortChannelId.toString) + val swap = context.spawn(Behaviors.supervise(SwapTaker(nodeParams, paymentInitiator, watcher, register, wallet, keyManager, db)).onFailure(SupervisorStrategy.stop), "Swap-" + shortChannelId.toString) context.watchWith(swap, SwapTerminated(swapId)) swap ! StartSwapOutSender(amount, swapId, shortChannelId) replyTo ! SwapOpened(swapId) @@ -136,7 +134,14 @@ private class SwapRegister(context: ActorContext[Command], nodeParams: NodeParam } Behaviors.same case SwapTerminated(swapId) => - registering(swaps - swapId) + db.restore().collectFirst({ + case checkPoint if checkPoint.swapId == swapId => + context.log.error(s"Swap $swapId stopped prematurely after saving a checkpoint, but before recording a result.") + restoreSwap(checkPoint) + }) match { + case None => registering (swaps - swapId) + case Some (restoredSwap) => registering (swaps + restoredSwap) + } case WrappedUnknownMessageReceived(unknownMessageReceived) => if (PeerSwapPlugin.peerSwapTags.contains(unknownMessageReceived.message.tag)) { peerSwapMessageCodec.decode(unknownMessageCodec.encode(unknownMessageReceived.message).require) match { @@ -145,14 +150,12 @@ private class SwapRegister(context: ActorContext[Command], nodeParams: NodeParam context.log.info(s"ignoring swap request for a channel with an active swap: $swapRequest") Behaviors.same case request: SwapInRequest => - val swap = context.spawn(Behaviors.supervise(SwapTaker(nodeParams, paymentInitiator, watcher, register, wallet, keyManager, db)) - .onFailure(SupervisorStrategy.restart), "Swap-" + request.scid) + val swap = context.spawn(Behaviors.supervise(SwapTaker(nodeParams, paymentInitiator, watcher, register, wallet, keyManager, db)).onFailure(SupervisorStrategy.restart), "Swap-" + request.scid) context.watchWith(swap, SwapTerminated(request.swapId)) swap ! StartSwapInReceiver(request) registering(swaps + (request.swapId -> SwapEntry(request.scid, swap))) case request: SwapOutRequest => - val swap = context.spawn(Behaviors.supervise(SwapMaker(nodeParams, watcher, register, wallet, keyManager, db)) - .onFailure(SupervisorStrategy.restart), "Swap-" + request.scid) + val swap = context.spawn(Behaviors.supervise(SwapMaker(nodeParams, watcher, register, wallet, keyManager, db)).onFailure(SupervisorStrategy.restart), "Swap-" + request.scid) context.watchWith(swap, SwapTerminated(request.swapId)) swap ! StartSwapOutReceiver(request) registering(swaps + (request.swapId -> SwapEntry(request.scid, swap))) diff --git a/plugins/peerswap/src/main/scala/fr/acinq/eclair/plugins/peerswap/SwapTaker.scala b/plugins/peerswap/src/main/scala/fr/acinq/eclair/plugins/peerswap/SwapTaker.scala index 8cdc6f11e5..e0b615d77e 100644 --- a/plugins/peerswap/src/main/scala/fr/acinq/eclair/plugins/peerswap/SwapTaker.scala +++ b/plugins/peerswap/src/main/scala/fr/acinq/eclair/plugins/peerswap/SwapTaker.scala @@ -130,7 +130,8 @@ object SwapTaker { // if payment was not yet sent, fail the swap swap.sendCoopClose(d.request, s"Lightning payment not sent.") ) - case Failure(e) => context.log.error(s"could not restore swap receiver with invalid shortChannelId: $d, $e") + case Failure(e) => context.log.error(s"Could not restore from a checkpoint with an invalid shortChannelId: $d, $e") + db.addResult(CouldNotRestore(d.swapId, d)) Behaviors.stopped } }