Skip to content

Commit

Permalink
Refactor to use Peer instead of Switchboard
Browse files Browse the repository at this point in the history
 - use existing GetPeerInfo when creating or restoring a swap to find the remote peer actor
 - use Peer.RelayUnknownMessage to send custom Lightning messages
  • Loading branch information
remyers committed Feb 9, 2023
1 parent 3b6e111 commit c9911e4
Show file tree
Hide file tree
Showing 13 changed files with 205 additions and 154 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ class PeerSwapPlugin extends Plugin with RouteProvider with Logging {
}

override def onKit(kit: Kit): Unit = {
val data = db.restore().toSet
val data = db.restore()
val swapRegister = kit.system.spawn(Behaviors.supervise(SwapRegister(kit.nodeParams, kit.paymentInitiator, kit.watcher, kit.register, kit.switchboard, kit.wallet, swapKeyManager, db, data)).onFailure(SupervisorStrategy.restart), "peerswap-plugin-swap-register")
pluginKit = PeerSwapKit(kit.nodeParams, kit.system, swapRegister, db)
}
Expand All @@ -101,10 +101,10 @@ case class PeerSwapKit(nodeParams: NodeParams, system: ActorSystem, swapRegister
private implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("db-pending-commands").build()))

def swapIn(shortChannelId: ShortChannelId, amount: Satoshi)(implicit timeout: Timeout): Future[Response] =
swapRegister.ask(ref => SwapRegister.SwapRequested(ref, Maker, amount, shortChannelId, None))(timeout, system.scheduler.toTyped)
swapRegister.ask(ref => SwapRegister.SwapRequested(ref, Maker, amount, shortChannelId, None, None))(timeout, system.scheduler.toTyped)

def swapOut(shortChannelId: ShortChannelId, amount: Satoshi)(implicit timeout: Timeout): Future[Response] =
swapRegister.ask(ref => SwapRegister.SwapRequested(ref, Taker, amount, shortChannelId, None))(timeout, system.scheduler.toTyped)
swapRegister.ask(ref => SwapRegister.SwapRequested(ref, Taker, amount, shortChannelId, None, None))(timeout, system.scheduler.toTyped)

def listSwaps()(implicit timeout: Timeout): Future[Iterable[Status]] =
swapRegister.ask(ref => SwapRegister.ListPendingSwaps(ref))(timeout, system.scheduler.toTyped)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import akka.actor.typed.{ActorRef, Behavior}
import fr.acinq.bitcoin.ScriptFlags
import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
import fr.acinq.bitcoin.scalacompat.{ByteVector32, Crypto, Satoshi, Transaction}
import fr.acinq.eclair.Features.RouteBlinding
import fr.acinq.eclair.MilliSatoshi.toMilliSatoshi
import fr.acinq.eclair.blockchain.OnChainWallet
import fr.acinq.eclair.blockchain.OnChainWallet.MakeFundingTxResponse
Expand All @@ -32,7 +31,7 @@ import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher._
import fr.acinq.eclair.blockchain.fee.FeeratePerKw
import fr.acinq.eclair.db.OutgoingPaymentStatus.{Failed, Pending, Succeeded}
import fr.acinq.eclair.db.PaymentType
import fr.acinq.eclair.io.Switchboard.ForwardUnknownMessage
import fr.acinq.eclair.io.Peer.RelayUnknownMessage
import fr.acinq.eclair.payment.send.PaymentInitiator.SendPaymentToNode
import fr.acinq.eclair.payment.{Bolt11Invoice, PaymentFailed, PaymentReceived, PaymentSent}
import fr.acinq.eclair.plugins.peerswap.SwapCommands._
Expand Down Expand Up @@ -99,8 +98,9 @@ private object SwapHelpers {
UnknownMessage(encoded.sliceToInt(0, 16, signed = false), encoded.drop(16).toByteVector)
}

def send(switchboard: actor.ActorRef, remoteNodeId: PublicKey)(message: HasSwapId): Unit =
switchboard ! ForwardUnknownMessage(remoteNodeId, makeUnknownMessage(message))
def send(remotePeer: actor.ActorRef)(message: HasSwapId): Unit = {
remotePeer ! RelayUnknownMessage(makeUnknownMessage(message))
}

def fundOpening(wallet: OnChainWallet, feeRatePerKw: FeeratePerKw)(amount: Satoshi, makerPubkey: PublicKey, takerPubkey: PublicKey, invoice: Bolt11Invoice)(implicit context: ActorContext[SwapCommand]): Unit = {
// setup conditions satisfied, create the opening tx
Expand Down Expand Up @@ -158,10 +158,9 @@ private object SwapHelpers {
def createInvoice(nodeParams: NodeParams, amount: Satoshi, description: String)(implicit context: ActorContext[SwapCommand]): Try[Bolt11Invoice] =
Try {
val paymentPreimage = randomBytes32()
val invoiceFeatures = nodeParams.features.invoiceFeatures().remove(RouteBlinding)
val invoice: Bolt11Invoice = Bolt11Invoice(nodeParams.chainHash, Some(toMilliSatoshi(amount)), Crypto.sha256(paymentPreimage), nodeParams.privateKey, Left(description),
nodeParams.channelConf.minFinalExpiryDelta, fallbackAddress = None, expirySeconds = Some(nodeParams.invoiceExpiry.toSeconds),
extraHops = Nil, timestamp = TimestampSecond.now(), paymentSecret = paymentPreimage, paymentMetadata = None, features = invoiceFeatures)
extraHops = Nil, timestamp = TimestampSecond.now(), paymentSecret = paymentPreimage, paymentMetadata = None)
context.log.debug("generated invoice={} from amount={} sat, description={}", invoice.toString, amount, description)
nodeParams.db.payments.addIncomingPayment(invoice, paymentPreimage, PaymentType.Standard)
invoice
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,21 +105,21 @@ object SwapMaker {
*/

def apply(remoteNodeId: PublicKey, nodeParams: NodeParams, watcher: ActorRef[ZmqWatcher.Command], switchboard: actor.ActorRef, wallet: OnChainWallet, keyManager: SwapKeyManager, db: SwapsDb): Behavior[SwapCommands.SwapCommand] =
def apply(remoteNodeId: PublicKey, nodeParams: NodeParams, watcher: ActorRef[ZmqWatcher.Command], remotePeer: actor.ActorRef, wallet: OnChainWallet, keyManager: SwapKeyManager, db: SwapsDb): Behavior[SwapCommands.SwapCommand] =
Behaviors.setup { context =>
Behaviors.receiveMessagePartial {
case StartSwapInSender(amount, swapId, shortChannelId) => new SwapMaker(remoteNodeId, shortChannelId, nodeParams, watcher, switchboard, wallet, keyManager, db, context)
case StartSwapInSender(amount, swapId, shortChannelId) => new SwapMaker(remoteNodeId, shortChannelId, nodeParams, watcher, remotePeer, wallet, keyManager, db, context)
.createSwap(amount, swapId)
case StartSwapOutReceiver(request: SwapOutRequest) =>
ShortChannelId.fromCoordinates(request.scid) match {
case Success(shortChannelId) => new SwapMaker(remoteNodeId, shortChannelId, nodeParams, watcher, switchboard, wallet, keyManager, db, context)
case Success(shortChannelId) => new SwapMaker(remoteNodeId, shortChannelId, nodeParams, watcher, remotePeer, wallet, keyManager, db, context)
.validateRequest(request)
case Failure(e) => context.log.error(s"received swap request with invalid shortChannelId: $request, $e")
Behaviors.stopped
}
case RestoreSwap(d) =>
ShortChannelId.fromCoordinates(d.request.scid) match {
case Success(shortChannelId) => new SwapMaker(remoteNodeId, shortChannelId, nodeParams, watcher, switchboard, wallet, keyManager, db, context)
case Success(shortChannelId) => new SwapMaker(remoteNodeId, shortChannelId, nodeParams, watcher, remotePeer, wallet, keyManager, db, context)
.awaitClaimPayment(d.request, d.agreement, d.invoice, d.openingTxBroadcasted, d.isInitiator)
case Failure(e) =>
context.log.error(s"Could not restore from a checkpoint with an invalid shortChannelId: $d, $e")
Expand All @@ -130,7 +130,7 @@ object SwapMaker {
}
}

private class SwapMaker(remoteNodeId: PublicKey, shortChannelId: ShortChannelId, nodeParams: NodeParams, watcher: ActorRef[ZmqWatcher.Command], switchboard: actor.ActorRef, wallet: OnChainWallet, keyManager: SwapKeyManager, db: SwapsDb, implicit val context: ActorContext[SwapCommands.SwapCommand]) {
private class SwapMaker(remoteNodeId: PublicKey, shortChannelId: ShortChannelId, nodeParams: NodeParams, watcher: ActorRef[ZmqWatcher.Command], remotePeer: actor.ActorRef, wallet: OnChainWallet, keyManager: SwapKeyManager, db: SwapsDb, implicit val context: ActorContext[SwapCommands.SwapCommand]) {
private val protocolVersion = 3
private val noAsset = ""
private implicit val feeRatePerKw: FeeratePerKw = nodeParams.onChainFeeConf.feeEstimator.getFeeratePerKw(target = nodeParams.onChainFeeConf.feeTargets.fundingBlockTarget)
Expand Down Expand Up @@ -167,7 +167,7 @@ private class SwapMaker(remoteNodeId: PublicKey, shortChannelId: ShortChannelId,

private def awaitFeePayment(request: SwapOutRequest, agreement: SwapOutAgreement, invoice: Bolt11Invoice): Behavior[SwapCommand] = {
watchForPaymentReceived(watch = true)
send(switchboard, remoteNodeId)(agreement)
send(remotePeer)(agreement)
Behaviors.withTimers { timers =>
timers.startSingleTimer(swapFeeExpiredTimer(request.swapId), InvoiceExpired, invoice.createdAt + invoice.relativeExpiry.toSeconds - TimestampSecond.now())
receiveSwapMessage[AwaitFeePaymentMessages](context, "awaitFeePayment") {
Expand All @@ -189,7 +189,7 @@ private class SwapMaker(remoteNodeId: PublicKey, shortChannelId: ShortChannelId,
}

private def awaitAgreement(request: SwapInRequest): Behavior[SwapCommand] = {
send(switchboard, remoteNodeId)(request)
send(remotePeer)(request)
receiveSwapMessage[AwaitAgreementMessages](context, "awaitAgreement") {
case SwapMessageReceived(agreement: SwapInAgreement) if agreement.protocolVersion != protocolVersion =>
swapCanceled(WrongVersion(request.swapId, protocolVersion))
Expand Down Expand Up @@ -242,7 +242,7 @@ private class SwapMaker(remoteNodeId: PublicKey, shortChannelId: ShortChannelId,
swapCompleted(ClaimByInvoicePaid(request.swapId))
case _ =>
watchForPaymentReceived(watch = true)
send(switchboard, remoteNodeId)(openingTxBroadcasted)
send(remotePeer)(openingTxBroadcasted)
Behaviors.withTimers { timers =>
timers.startSingleTimer(swapInvoiceExpiredTimer(request.swapId), InvoiceExpired, invoice.createdAt + invoice.relativeExpiry.toSeconds - TimestampSecond.now())
receiveSwapMessage[AwaitClaimPaymentMessages](context, "awaitClaimPayment") {
Expand Down Expand Up @@ -342,7 +342,7 @@ private class SwapMaker(remoteNodeId: PublicKey, shortChannelId: ShortChannelId,
private def swapCanceled(failure: Fail): Behavior[SwapCommand] = {
context.system.eventStream ! Publish(Canceled(failure.swapId, failure.toString))
context.log.error(s"canceled swap: $failure")
if (!failure.isInstanceOf[PeerCanceled]) send(switchboard, remoteNodeId)(CancelSwap(failure.swapId, failure.toString))
if (!failure.isInstanceOf[PeerCanceled]) send(remotePeer)(CancelSwap(failure.swapId, failure.toString))
Behaviors.stopped
}

Expand Down
Loading

0 comments on commit c9911e4

Please sign in to comment.