Skip to content

Commit

Permalink
Secondary mechanism to trigger watches for transactions from past blo…
Browse files Browse the repository at this point in the history
…cks (#3002)

When a new block is found, we want to check its confirmed transactions
to potentially trigger watches. This is especially important when a
channel is spent and we haven't seen the spending transaction in our
mempool before receiving it in a block. This is already supposed to be
handled through the ZMQ `rawtx` topic, where bitcoind should send us
every transaction it receives (either in the mempool or in a block).
But when using remote `bitcoind` instances, ZMQ seems to sometimes be
unreliable and silently drop some events. That's why we add another
mechanism for extra safety, where whenever a new block is found, we
fetch the last `N` blocks and re-process their transactions. We keep
a cache of the processed blocks to ensure that we don't needlessly
re-process them multiple times.
  • Loading branch information
t-bast authored Feb 11, 2025
1 parent 23c139c commit bc44808
Show file tree
Hide file tree
Showing 8 changed files with 144 additions and 15 deletions.
4 changes: 4 additions & 0 deletions eclair-core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,10 @@ eclair {
// may scan the entire blockchain (which is very costly). It doesn't make sense to scan too far in the past, as an
// attacker will already have swept the funds if we didn't detect a channel close that happened a long time ago.
max-channel-spent-rescan-blocks = 720
// When a new block is found, we will analyze its transactions to see if some of our channels were spent.
// We also scan previous blocks if we missed them, which may happen during reorgs: this parameter should be larger
// than the longest reorg expected.
scan-previous-blocks-depth = 6

// The default strategy, when we encounter an unhandled exception or internal error, is to locally force-close the
// channel. Not only is there a delay before the channel balance gets refunded, but if the exception was due to some
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -584,6 +584,7 @@ object NodeParams extends Logging {
maxRestartWatchDelay = FiniteDuration(config.getDuration("channel.max-restart-watch-delay").getSeconds, TimeUnit.SECONDS),
maxBlockProcessingDelay = FiniteDuration(config.getDuration("channel.max-block-processing-delay").getSeconds, TimeUnit.SECONDS),
maxTxPublishRetryDelay = FiniteDuration(config.getDuration("channel.max-tx-publish-retry-delay").getSeconds, TimeUnit.SECONDS),
scanPreviousBlocksDepth = config.getInt("channel.scan-previous-blocks-depth"),
maxChannelSpentRescanBlocks = config.getInt("channel.max-channel-spent-rescan-blocks"),
unhandledExceptionStrategy = unhandledExceptionStrategy,
revocationTimeout = FiniteDuration(config.getDuration("channel.revocation-timeout").getSeconds, TimeUnit.SECONDS),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package fr.acinq.eclair.blockchain.bitcoind
import akka.actor.typed.eventstream.EventStream
import akka.actor.typed.scaladsl.{ActorContext, Behaviors, TimerScheduler}
import akka.actor.typed.{ActorRef, Behavior, SupervisorStrategy}
import fr.acinq.bitcoin.Block
import fr.acinq.bitcoin.scalacompat._
import fr.acinq.eclair.blockchain.Monitoring.Metrics
import fr.acinq.eclair.blockchain._
Expand All @@ -30,7 +31,7 @@ import fr.acinq.eclair.{BlockHeight, KamonExt, NodeParams, RealShortChannelId, T
import java.util.concurrent.atomic.AtomicLong
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
import scala.util.{Failure, Random, Success}

/**
* Created by PM on 21/02/2016.
Expand Down Expand Up @@ -59,10 +60,15 @@ object ZmqWatcher {
private case object TickNewBlock extends Command
private case object TickBlockTimeout extends Command
private case class GetBlockCountFailed(t: Throwable) extends Command
private case class GetBlockIdFailed(blockHeight: BlockHeight, t: Throwable) extends Command
private case class GetBlockFailed(blockId: BlockId, t: Throwable) extends Command
private case class CheckBlockHeight(current: BlockHeight) extends Command
private case class PublishBlockHeight(current: BlockHeight) extends Command
private case class ProcessNewBlock(blockId: BlockId) extends Command
private case class ProcessNewTransaction(tx: Transaction) extends Command
private case class AnalyzeLastBlock(remaining: Int) extends Command
private case class AnalyzeBlockId(blockId: BlockId, remaining: Int) extends Command
private case class AnalyzeBlock(block: Block, remaining: Int) extends Command
private case class SetWatchHint(w: GenericWatch, hint: WatchHint) extends Command

final case class ValidateRequest(replyTo: ActorRef[ValidateResult], ann: ChannelAnnouncement) extends Command
Expand Down Expand Up @@ -171,7 +177,7 @@ object ZmqWatcher {
timers.startSingleTimer(TickNewBlock, 1 second)
// we start a timer in case we don't receive ZMQ block events
timers.startSingleTimer(TickBlockTimeout, blockTimeout)
new ZmqWatcher(nodeParams, blockCount, client, context, timers).watching(Map.empty[GenericWatch, Option[WatchHint]], Map.empty[OutPoint, Set[GenericWatch]])
new ZmqWatcher(nodeParams, blockCount, client, context, timers).watching(Map.empty[GenericWatch, Option[WatchHint]], Map.empty[OutPoint, Set[GenericWatch]], Nil)
}
}

Expand Down Expand Up @@ -216,7 +222,7 @@ private class ZmqWatcher(nodeParams: NodeParams, blockHeight: AtomicLong, client

private val watchdog = context.spawn(Behaviors.supervise(BlockchainWatchdog(nodeParams, 150 seconds)).onFailure(SupervisorStrategy.resume), "blockchain-watchdog")

private def watching(watches: Map[GenericWatch, Option[WatchHint]], watchedUtxos: Map[OutPoint, Set[GenericWatch]]): Behavior[Command] = {
private def watching(watches: Map[GenericWatch, Option[WatchHint]], watchedUtxos: Map[OutPoint, Set[GenericWatch]], analyzedBlocks: Seq[BlockId]): Behavior[Command] = {
Behaviors.receiveMessage {
case ProcessNewTransaction(tx) =>
log.debug("analyzing txid={} tx={}", tx.txid, tx)
Expand Down Expand Up @@ -245,6 +251,35 @@ private class ZmqWatcher(nodeParams: NodeParams, blockHeight: AtomicLong, client
timers.startSingleTimer(TickNewBlock, 2 seconds)
Behaviors.same

case AnalyzeLastBlock(remaining) =>
val currentBlockHeight = blockHeight.get().toInt
context.pipeToSelf(client.getBlockId(currentBlockHeight)) {
case Failure(f) => GetBlockIdFailed(BlockHeight(currentBlockHeight), f)
case Success(blockId) => AnalyzeBlockId(blockId, remaining)
}
Behaviors.same

case AnalyzeBlockId(blockId, remaining) =>
if (analyzedBlocks.contains(blockId)) {
log.debug("blockId={} has already been analyzed, we can skip it", blockId)
} else if (remaining > 0) {
context.pipeToSelf(client.getBlock(blockId)) {
case Failure(f) => GetBlockFailed(blockId, f)
case Success(block) => AnalyzeBlock(block, remaining)
}
}
Behaviors.same

case AnalyzeBlock(block, remaining) =>
// We analyze every transaction in that block to see if one of our watches is triggered.
block.tx.forEach(tx => context.self ! ProcessNewTransaction(KotlinUtils.kmp2scala(tx)))
// We keep analyzing previous blocks in this chain.
context.self ! AnalyzeBlockId(BlockId(KotlinUtils.kmp2scala(block.header.hashPreviousBlock)), remaining - 1)
// We update our list of analyzed blocks, while ensuring that it doesn't grow unbounded.
val maxCacheSize = nodeParams.channelConf.scanPreviousBlocksDepth * 3
val analyzedBlocks1 = (KotlinUtils.kmp2scala(block.blockId) +: analyzedBlocks).take(maxCacheSize)
watching(watches, watchedUtxos, analyzedBlocks1)

case TickBlockTimeout =>
// we haven't received a block in a while, we check whether we're behind and restart the timer.
timers.startSingleTimer(TickBlockTimeout, blockTimeout)
Expand All @@ -258,6 +293,15 @@ private class ZmqWatcher(nodeParams: NodeParams, blockHeight: AtomicLong, client
log.error("could not get block count from bitcoind", t)
Behaviors.same

case GetBlockIdFailed(blockHeight, t) =>
log.error(s"cannot get blockId for blockHeight=$blockHeight", t)
Behaviors.same

case GetBlockFailed(blockId, t) =>
// Note that this may happen if there is a reorg while we're analyzing the pre-reorg chain.
log.warn("cannot get block for blockId={}, a reorg may have happened: {}", blockId, t.getMessage)
Behaviors.same

case CheckBlockHeight(height) =>
val current = blockHeight.get()
if (height.toLong > current) {
Expand Down Expand Up @@ -288,14 +332,15 @@ private class ZmqWatcher(nodeParams: NodeParams, blockHeight: AtomicLong, client
}
})
}
timers.startSingleTimer(AnalyzeLastBlock(nodeParams.channelConf.scanPreviousBlocksDepth), Random.nextLong(nodeParams.channelConf.maxBlockProcessingDelay.toMillis + 1).milliseconds)
Behaviors.same

case SetWatchHint(w, hint) =>
val watches1 = watches.get(w) match {
case Some(_) => watches + (w -> Some(hint))
case None => watches
}
watching(watches1, watchedUtxos)
watching(watches1, watchedUtxos, analyzedBlocks)

case TriggerEvent(replyTo, watch, event) =>
if (watches.contains(watch)) {
Expand All @@ -307,7 +352,7 @@ private class ZmqWatcher(nodeParams: NodeParams, blockHeight: AtomicLong, client
// They are never cleaned up but it is not a big deal for now (1 channel == 1 watch)
Behaviors.same
case _ =>
watching(watches - watch, removeWatchedUtxos(watchedUtxos, watch))
watching(watches - watch, removeWatchedUtxos(watchedUtxos, watch), analyzedBlocks)
}
} else {
Behaviors.same
Expand All @@ -333,7 +378,7 @@ private class ZmqWatcher(nodeParams: NodeParams, blockHeight: AtomicLong, client
case Keep =>
log.debug("adding watch {}", w)
context.watchWith(w.replyTo, StopWatching(w.replyTo))
watching(watches + (w -> None), addWatchedUtxos(watchedUtxos, w))
watching(watches + (w -> None), addWatchedUtxos(watchedUtxos, w), analyzedBlocks)
case Ignore =>
Behaviors.same
}
Expand All @@ -342,20 +387,20 @@ private class ZmqWatcher(nodeParams: NodeParams, blockHeight: AtomicLong, client
// We remove watches associated to dead actors.
val deprecatedWatches = watches.keySet.filter(_.replyTo == origin)
val watchedUtxos1 = deprecatedWatches.foldLeft(watchedUtxos) { case (m, w) => removeWatchedUtxos(m, w) }
watching(watches -- deprecatedWatches, watchedUtxos1)
watching(watches -- deprecatedWatches, watchedUtxos1, analyzedBlocks)

case UnwatchTxConfirmed(txId) =>
// We remove watches that match the given txId.
val deprecatedWatches = watches.keySet.filter {
case w: WatchConfirmed[_] => w.txId == txId
case _ => false
}
watching(watches -- deprecatedWatches, watchedUtxos)
watching(watches -- deprecatedWatches, watchedUtxos, analyzedBlocks)

case UnwatchExternalChannelSpent(txId, outputIndex) =>
val deprecatedWatches = watches.keySet.collect { case w: WatchExternalChannelSpent if w.txId == txId && w.outputIndex == outputIndex => w }
val watchedUtxos1 = deprecatedWatches.foldLeft(watchedUtxos) { case (m, w) => removeWatchedUtxos(m, w) }
watching(watches -- deprecatedWatches, watchedUtxos1)
watching(watches -- deprecatedWatches, watchedUtxos1, analyzedBlocks)

case ValidateRequest(replyTo, ann) =>
client.validate(ann).map(replyTo ! _)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,7 @@ class BitcoinCoreClient(val rpcClient: BitcoinJsonRPCClient, val lockUtxos: Bool
// NB: bitcoind confusingly returns the blockId instead of the blockHash.
case None => rpcClient.invoke("getbestblockhash").collect { case JString(blockId) => BlockId(ByteVector32.fromValidHex(blockId)) }
}
// with a verbosity of 0, getblock returns the raw serialized block
block <- rpcClient.invoke("getblock", blockId, 0).collect { case JString(b) => Block.read(b) }
block <- getBlock(blockId)
res <- block.tx.asScala.find(tx => tx.txIn.asScala.exists(i => i.outPoint.txid == KotlinUtils.scala2kmp(txid) && i.outPoint.index == outputIndex)) match {
case Some(tx) => Future.successful(KotlinUtils.kmp2scala(tx))
case None if limit > 0 => lookForSpendingTx(Some(KotlinUtils.kmp2scala(block.header.hashPreviousBlock)), txid, outputIndex, limit - 1)
Expand Down Expand Up @@ -669,10 +668,26 @@ class BitcoinCoreClient(val rpcClient: BitcoinJsonRPCClient, val lockUtxos: Bool
case JInt(count) => BlockHeight(count.toLong)
}

def getBlockId(height: Int)(implicit ec: ExecutionContext): Future[BlockId] = {
rpcClient.invoke("getblockhash", height).collect {
// Even though the RPC mentions a block_hash, it is returned encoded as a block_id.
case JString(blockId) => BlockId(ByteVector32.fromValidHex(blockId))
}
}

def getBlock(blockId: BlockId)(implicit ec: ExecutionContext): Future[Block] = {
rpcClient.invoke("getblock", blockId.toString(), 0)
.collect { case JString(raw) => Block.read(raw) }
.flatMap {
case block if KotlinUtils.kmp2scala(block.blockId) != blockId => Future.failed(new IllegalArgumentException(s"invalid block returned by bitcoind: we requested $blockId, we got ${block.blockId}"))
case block => Future.successful(block)
}
}

def validate(c: ChannelAnnouncement)(implicit ec: ExecutionContext): Future[ValidateResult] = {
val TxCoordinates(blockHeight, txIndex, outputIndex) = coordinates(c.shortChannelId)
for {
blockId <- rpcClient.invoke("getblockhash", blockHeight.toInt).map(_.extractOpt[String].map(b => BlockId(ByteVector32.fromValidHex(b))).getOrElse(BlockId(ByteVector32.Zeroes)))
blockId <- getBlockId(blockHeight.toInt)
txid <- rpcClient.invoke("getblock", blockId).map(json => Try {
val JArray(txs) = json \ "tx"
TxId.fromValidHex(txs(txIndex).extract[String])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ object Channel {
maxRestartWatchDelay: FiniteDuration,
maxBlockProcessingDelay: FiniteDuration,
maxTxPublishRetryDelay: FiniteDuration,
scanPreviousBlocksDepth: Int,
maxChannelSpentRescanBlocks: Int,
unhandledExceptionStrategy: UnhandledExceptionStrategy,
revocationTimeout: FiniteDuration,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ object TestConstants {
maxRestartWatchDelay = 0 millis,
maxBlockProcessingDelay = 10 millis,
maxTxPublishRetryDelay = 10 millis,
scanPreviousBlocksDepth = 3,
maxChannelSpentRescanBlocks = 144,
htlcMinimum = 0 msat,
minDepthFunding = 6,
Expand Down Expand Up @@ -309,6 +310,7 @@ object TestConstants {
maxRestartWatchDelay = 5 millis,
maxBlockProcessingDelay = 10 millis,
maxTxPublishRetryDelay = 10 millis,
scanPreviousBlocksDepth = 3,
maxChannelSpentRescanBlocks = 144,
htlcMinimum = 1000 msat,
minDepthFunding = 3,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import akka.testkit.TestProbe
import fr.acinq.bitcoin
import fr.acinq.bitcoin.psbt.{Psbt, UpdateFailure}
import fr.acinq.bitcoin.scalacompat.Crypto.{PublicKey, der2compact}
import fr.acinq.bitcoin.scalacompat.{Block, Btc, BtcDouble, Crypto, DeterministicWallet, MilliBtcDouble, MnemonicCode, OP_DROP, OP_PUSHDATA, OutPoint, Satoshi, SatoshiLong, Script, ScriptWitness, Transaction, TxId, TxIn, TxOut, addressFromPublicKeyScript, addressToPublicKeyScript, computeBIP84Address, computeP2PkhAddress, computeP2WpkhAddress}
import fr.acinq.bitcoin.scalacompat.{Block, BlockId, Btc, BtcDouble, Crypto, DeterministicWallet, KotlinUtils, MilliBtcDouble, MnemonicCode, OP_DROP, OP_PUSHDATA, OutPoint, Satoshi, SatoshiLong, Script, ScriptWitness, Transaction, TxId, TxIn, TxOut, addressFromPublicKeyScript, addressToPublicKeyScript, computeBIP84Address, computeP2PkhAddress, computeP2WpkhAddress}
import fr.acinq.bitcoin.{Bech32, SigHash, SigVersion}
import fr.acinq.eclair.TestUtils.randomTxId
import fr.acinq.eclair.blockchain.OnChainWallet.{FundTransactionResponse, MakeFundingTxResponse, OnChainBalance, ProcessPsbtResponse}
Expand Down Expand Up @@ -1022,6 +1022,29 @@ class BitcoinCoreClientSpec extends TestKitBaseClass with BitcoindService with A
assert(mempoolTx3.ancestorFees == mempoolTx1.fees + 12500.sat)
}

test("get blocks") {
val sender = TestProbe()
val address = getNewAddress(sender)
val bitcoinClient = makeBitcoinCoreClient()

val tx1 = sendToAddress(address, 200_000 sat)
generateBlocks(1)
val tx2 = sendToAddress(address, 150_000 sat)
generateBlocks(1)

val currentHeight = currentBlockHeight(sender)
bitcoinClient.getBlockId(currentHeight.toInt).pipeTo(sender.ref)
val lastBlockId = sender.expectMsgType[BlockId]
bitcoinClient.getBlock(lastBlockId).pipeTo(sender.ref)
val lastBlock = sender.expectMsgType[fr.acinq.bitcoin.Block]
assert(lastBlock.tx.contains(KotlinUtils.scala2kmp(tx2)))

val previousBlockId = BlockId(KotlinUtils.kmp2scala(lastBlock.header.hashPreviousBlock))
bitcoinClient.getBlock(previousBlockId).pipeTo(sender.ref)
val previousBlock = sender.expectMsgType[fr.acinq.bitcoin.Block]
assert(previousBlock.tx.contains(KotlinUtils.scala2kmp(tx1)))
}

test("abandon transaction") {
val sender = TestProbe()
val bitcoinClient = makeBitcoinCoreClient()
Expand Down
Loading

0 comments on commit bc44808

Please sign in to comment.