Skip to content

Commit

Permalink
Merge branch 'development' of https://github.com/tari-project/tari-dan
Browse files Browse the repository at this point in the history
…into rocksdb
  • Loading branch information
mrnaveira committed Nov 8, 2024
2 parents 0aaac73 + a148dcb commit 489029a
Show file tree
Hide file tree
Showing 87 changed files with 1,627 additions and 1,066 deletions.
56 changes: 17 additions & 39 deletions applications/tari_indexer/src/event_scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,16 @@ use anyhow::anyhow;
use futures::StreamExt;
use log::*;
use tari_bor::decode;
use tari_common::configuration::Network;
use tari_consensus::consensus_constants::ConsensusConstants;
use tari_crypto::{ristretto::RistrettoPublicKey, tari_utilities::message_format::MessageFormat};
use tari_dan_common_types::{committee::Committee, Epoch, NumPreshards, PeerAddress, ShardGroup};
use tari_crypto::tari_utilities::message_format::MessageFormat;
use tari_dan_common_types::{committee::Committee, Epoch, PeerAddress, ShardGroup};
use tari_dan_p2p::proto::rpc::{GetTransactionResultRequest, PayloadResultStatus, SyncBlocksRequest};
use tari_dan_storage::consensus_models::{Block, BlockError, BlockId, Decision, TransactionRecord};
use tari_dan_storage::consensus_models::{Block, BlockId, Decision, TransactionRecord};
use tari_engine_types::{
commit_result::{ExecuteResult, TransactionResult},
events::Event,
substate::{Substate, SubstateId, SubstateValue},
};
use tari_epoch_manager::EpochManagerReader;
use tari_epoch_manager::{base_layer::EpochManagerHandle, EpochManagerReader};
use tari_template_lib::models::{EntityId, TemplateAddress};
use tari_transaction::{Transaction, TransactionId};
use tari_validator_node_rpc::client::{TariValidatorNodeRpcClientFactory, ValidatorNodeClientFactory};
Expand Down Expand Up @@ -96,33 +94,24 @@ struct TransactionMetadata {
}

pub struct EventScanner {
network: Network,
sidechain_id: Option<RistrettoPublicKey>,
epoch_manager: Box<dyn EpochManagerReader<Addr = PeerAddress>>,
epoch_manager: EpochManagerHandle<PeerAddress>,
client_factory: TariValidatorNodeRpcClientFactory,
substate_store: SqliteSubstateStore,
event_filters: Vec<EventFilter>,
consensus_constants: ConsensusConstants,
}

impl EventScanner {
pub fn new(
network: Network,
sidechain_id: Option<RistrettoPublicKey>,
epoch_manager: Box<dyn EpochManagerReader<Addr = PeerAddress>>,
epoch_manager: EpochManagerHandle<PeerAddress>,
client_factory: TariValidatorNodeRpcClientFactory,
substate_store: SqliteSubstateStore,
event_filters: Vec<EventFilter>,
consensus_constants: ConsensusConstants,
) -> Self {
Self {
network,
sidechain_id,
epoch_manager,
client_factory,
substate_store,
event_filters,
consensus_constants,
}
}

Expand Down Expand Up @@ -151,7 +140,7 @@ impl EventScanner {
},
None => {
// by default we start scanning since the current epoch
// TODO: it would be nice a new parameter in the indexer to spcify a custom starting epoch
// TODO: it would be nice a new parameter in the indexer to specify a custom starting epoch
event_count += self.scan_events_of_epoch(newest_epoch).await?;
},
}
Expand Down Expand Up @@ -454,12 +443,6 @@ impl EventScanner {
.collect()
}

fn build_genesis_block_id(&self, num_preshards: NumPreshards) -> Result<BlockId, BlockError> {
// TODO: this should return the actual genesis for the shard group and epoch
let start_block = Block::zero_block(self.network, num_preshards, self.sidechain_id.clone())?;
Ok(*start_block.id())
}

async fn get_oldest_scanned_epoch(&self) -> Result<Option<Epoch>, anyhow::Error> {
self.substate_store
.with_read_tx(|tx| tx.get_oldest_scanned_epoch())
Expand All @@ -473,23 +456,18 @@ impl EventScanner {
committee: &mut Committee<PeerAddress>,
epoch: Epoch,
) -> Result<Vec<Block>, anyhow::Error> {
// We start scanning from the last scanned block for this commitee
// We start scanning from the last scanned block for this committee
let start_block_id = self
.substate_store
.with_read_tx(|tx| tx.get_last_scanned_block_id(epoch, shard_group))?;

let start_block_id = match start_block_id {
Some(block_id) => block_id,
None => self.build_genesis_block_id(self.consensus_constants.num_preshards)?,
};

committee.shuffle();
let mut last_block_id = start_block_id;

info!(
target: LOG_TARGET,
"Scanning new blocks since {} from (epoch={}, shard={})",
last_block_id,
"Scanning new blocks from (start_id={}, epoch={}, shard={})",
last_block_id.map(|id| id.to_string()).unwrap_or_else(|| "None".to_string()),
epoch,
shard_group
);
Expand All @@ -502,7 +480,7 @@ impl EventScanner {
epoch,
shard_group
);
let resp = self.get_blocks_from_vn(member, start_block_id, Some(epoch)).await;
let resp = self.get_blocks_from_vn(member, last_block_id, epoch).await;

match resp {
Ok(blocks) => {
Expand All @@ -520,9 +498,9 @@ impl EventScanner {
let last_block = blocks.iter().max_by_key(|b| (b.epoch(), b.height()));

if let Some(block) = last_block {
last_block_id = *block.id();
last_block_id = Some(*block.id());
// Store the latest scanned block id in the database for future scans
self.save_scanned_block_id(epoch, shard_group, last_block_id)?;
self.save_scanned_block_id(epoch, shard_group, *block.id())?;
}
return Ok(blocks);
},
Expand Down Expand Up @@ -578,8 +556,8 @@ impl EventScanner {
async fn get_blocks_from_vn(
&self,
vn_addr: &PeerAddress,
start_block_id: BlockId,
up_to_epoch: Option<Epoch>,
start_block_id: Option<BlockId>,
up_to_epoch: Epoch,
) -> Result<Vec<Block>, anyhow::Error> {
let mut blocks = vec![];

Expand All @@ -588,8 +566,8 @@ impl EventScanner {

let mut stream = client
.sync_blocks(SyncBlocksRequest {
start_block_id: start_block_id.as_bytes().to_vec(),
up_to_epoch: up_to_epoch.map(|epoch| epoch.into()),
start_block_id: start_block_id.map(|id| id.as_bytes().to_vec()).unwrap_or_default(),
epoch: Some(up_to_epoch.into()),
})
.await?;
while let Some(resp) = stream.next().await {
Expand Down
30 changes: 10 additions & 20 deletions applications/tari_indexer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,7 @@ pub async fn run_indexer(config: ApplicationConfig, mut shutdown_signal: Shutdow
)
.await?;

let mut epoch_manager_events = services.epoch_manager.subscribe().await.map_err(|e| {
ExitError::new(
ExitCode::ConfigError,
format!("Epoch manager crashed on startup: {}", e),
)
})?;
let mut epoch_manager_events = services.epoch_manager.subscribe();

let substate_cache_dir = config.common.base_path.join("substate_cache");
let substate_cache = SubstateFileCache::new(substate_cache_dir)
Expand Down Expand Up @@ -177,16 +172,12 @@ pub async fn run_indexer(config: ApplicationConfig, mut shutdown_signal: Shutdow
.map(TryInto::try_into)
.collect::<Result<_, _>>()
.map_err(|e| ExitError::new(ExitCode::ConfigError, format!("Invalid event filters: {}", e)))?;
let consensus_constants = ConsensusConstants::from(config.network);
let event_scanner = Arc::new(EventScanner::new(
config.network,
config.indexer.sidechain_id,
Box::new(services.epoch_manager.clone()),
let event_scanner = EventScanner::new(
services.epoch_manager.clone(),
services.validator_node_client_factory.clone(),
services.substate_store.clone(),
event_filters,
consensus_constants,
));
);

// Run the GraphQL API
let graphql_address = config.indexer.graphql_address;
Expand Down Expand Up @@ -229,13 +220,12 @@ pub async fn run_indexer(config: ApplicationConfig, mut shutdown_signal: Shutdow
}

async fn handle_epoch_manager_event(services: &Services, event: EpochManagerEvent) -> Result<(), anyhow::Error> {
if let EpochManagerEvent::EpochChanged(epoch) = event {
let all_vns = services.epoch_manager.get_all_validator_nodes(epoch).await?;
services
.networking
.set_want_peers(all_vns.into_iter().map(|vn| vn.address.as_peer_id()))
.await?;
}
let EpochManagerEvent::EpochChanged { epoch, .. } = event;
let all_vns = services.epoch_manager.get_all_validator_nodes(epoch).await?;
services
.networking
.set_want_peers(all_vns.into_iter().map(|vn| vn.address.as_peer_id()))
.await?;

Ok(())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -847,7 +847,7 @@ impl SubstateStoreWriteTransaction for SqliteSubstateStoreWriteTransaction<'_> {
use crate::substate_storage_sqlite::schema::scanned_block_ids;

diesel::delete(scanned_block_ids::table)
.filter(scanned_block_ids::epoch.lt(epoch.0 as i64))
.filter(scanned_block_ids::epoch.lt(epoch.as_u64() as i64))
.execute(&mut *self.connection())
.map_err(|e| StorageError::QueryError {
reason: format!("delete_scanned_epochs_older_than: {}", e),
Expand Down
88 changes: 59 additions & 29 deletions applications/tari_swarm_daemon/src/process_manager/manager.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
// Copyright 2024 The Tari Project
// SPDX-License-Identifier: BSD-3-Clause

use std::{collections::HashMap, fs::File, path::PathBuf, str::FromStr, time::Duration};
use std::{
collections::{HashMap, HashSet},
fs::File,
path::PathBuf,
str::FromStr,
time::Duration,
};

use anyhow::{anyhow, Context};
use log::info;
Expand Down Expand Up @@ -70,43 +76,57 @@ impl ProcessManager {
sleep(Duration::from_secs(self.instance_manager.num_instances() as u64)).await;
self.check_instances_running()?;

if !self.skip_registration {
let num_vns = self.instance_manager.num_validator_nodes();
// Mine some initial funds, guessing 10 blocks to allow for coinbase maturity
self.mine(num_vns + 10).await.context("mining failed")?;
self.wait_for_wallet_funds(num_vns)
.await
.context("waiting for wallet funds")?;

self.register_all_validator_nodes()
.await
.context("registering validator node via GRPC")?;
}

let mut templates_to_register = vec![];
if !self.disable_template_auto_register {
let registered_templates = self.registered_templates().await?;
let registered_template_names: Vec<String> = registered_templates
let registered_template_names = registered_templates
.iter()
.map(|template_data| format!("{}-{}", template_data.name, template_data.version))
.collect();
.map(|template_data| template_data.name.as_str())
.collect::<HashSet<_>>();
let fs_templates = self.file_system_templates().await?;
for template_data in fs_templates.iter().filter(|fs_template_data| {
!registered_template_names.contains(&format!("{}-{}", fs_template_data.name, fs_template_data.version))
}) {
for template_data in fs_templates
.iter()
.filter(|fs_template_data| !registered_template_names.contains(fs_template_data.name.as_str()))
{
info!(
"🟡 Register missing template from local file system: {}",
template_data.name
);
self.register_template(TemplateData {
templates_to_register.push(TemplateData {
name: template_data.name.clone(),
version: template_data.version,
contents_hash: template_data.contents_hash,
contents_url: template_data.contents_url.clone(),
})
.await?;
});
}
}

let num_vns = if self.skip_registration {
0
} else {
self.instance_manager.num_validator_nodes()
};
let num_blocks = num_vns + u64::try_from(templates_to_register.len()).unwrap();

// Mine some initial funds, guessing 10 blocks extra to allow for coinbase maturity
self.mine(num_blocks + 10).await.context("initial mining failed")?;
self.wait_for_wallet_funds(num_blocks)
.await
.context("waiting for wallet funds")?;

if !self.skip_registration {
self.register_all_validator_nodes()
.await
.context("registering validator node via GRPC")?;
}
for templates in templates_to_register {
self.register_template(templates).await?;
}

if num_blocks > 0 {
self.mine(20).await?;
}

Ok(())
}

Expand Down Expand Up @@ -310,9 +330,15 @@ impl ProcessManager {
}
},
RegisterTemplate { data, reply } => {
let result = self.register_template(data).await;
if reply.send(result).is_err() {
log::warn!("Request cancelled before response could be sent")
if let Err(err) = self.register_template(data).await {
if reply.send(Err(err)).is_err() {
log::warn!("Request cancelled before response could be sent")
}
} else {
let result = self.mine(10).await;
if reply.send(result).is_err() {
log::warn!("Request cancelled before response could be sent")
}
}
},
RegisterValidatorNode { instance_id, reply } => {
Expand Down Expand Up @@ -421,7 +447,6 @@ impl ProcessManager {
// inputs for a transaction.
sleep(Duration::from_secs(2)).await;
}
self.mine(20).await?;
Ok(())
}

Expand Down Expand Up @@ -462,6 +487,9 @@ impl ProcessManager {
}

async fn mine(&mut self, blocks: u64) -> anyhow::Result<()> {
if blocks == 0 {
return Ok(());
}
let executable = self
.executable_manager
.get_executable(InstanceType::MinoTariMiner)
Expand Down Expand Up @@ -510,13 +538,15 @@ impl ProcessManager {
.await?
.into_inner();
let template_address = TemplateAddress::try_from_vec(resp.template_address).unwrap();
info!("🟢 Registered template {template_address}. Mining some blocks");
self.mine(10).await?;
info!("🟢 Registered template {template_address}.");

Ok(())
}

async fn wait_for_wallet_funds(&mut self, min_expected_blocks: u64) -> anyhow::Result<()> {
if min_expected_blocks == 0 {
return Ok(());
}
// WARN: Assumes one wallet
let wallet = self.instance_manager.minotari_wallets().next().ok_or_else(|| {
anyhow!("No MinoTariConsoleWallet instances found. Please start a wallet before waiting for funds")
Expand Down
6 changes: 3 additions & 3 deletions applications/tari_swarm_daemon/webui/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 489029a

Please sign in to comment.