Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(engine)!: syncing published templates #1222

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
4d181b7
impl in progress
ksrichard Nov 29, 2024
a0c833d
impl in progress
ksrichard Nov 29, 2024
bbf020d
impl done
ksrichard Dec 2, 2024
1bea85d
fix test
ksrichard Dec 2, 2024
62e6f99
format
ksrichard Dec 2, 2024
e20f4bf
added new publish template substate
ksrichard Dec 3, 2024
5778757
impl in progress
ksrichard Dec 4, 2024
7f20941
impl in progress
ksrichard Dec 5, 2024
00c3ef0
Merge remote-tracking branch 'origin/development' into feature/publis…
ksrichard Dec 5, 2024
e0de43a
impl in progress
ksrichard Dec 5, 2024
fe9807b
impl in progress
ksrichard Dec 5, 2024
0450d49
updating templates db almost done
ksrichard Dec 5, 2024
7b3a6b4
Merge remote-tracking branch 'origin/development' into feature/publis…
ksrichard Dec 6, 2024
a7b90b5
impl in progress
ksrichard Dec 6, 2024
8e29347
renamed PublishedTemplate substate to Template and almost done adding…
ksrichard Dec 9, 2024
9cb0766
Template publishing implemented, new templates are represented as sub…
ksrichard Dec 10, 2024
94598d0
Merge remote-tracking branch 'origin/development' into feature/publis…
ksrichard Dec 10, 2024
fb1bf11
small update
ksrichard Dec 10, 2024
b752270
CI fixes
ksrichard Dec 11, 2024
29436dd
Merge remote-tracking branch 'origin/development' into feature/publis…
ksrichard Dec 11, 2024
cbaef1b
small fixes
ksrichard Dec 11, 2024
29f1fde
small fix
ksrichard Dec 11, 2024
63a531b
small fix
ksrichard Dec 11, 2024
40d9936
Merge remote-tracking branch 'origin/development' into feature/publis…
ksrichard Dec 11, 2024
1e949ed
small fixes
ksrichard Dec 11, 2024
5bb6223
added placeholder TODOs
ksrichard Dec 11, 2024
716d6aa
removed unused import
ksrichard Dec 11, 2024
221ca6c
clippy + bug fix + minor code cleanup + reduce mem usage
sdbondi Dec 12, 2024
890188f
fix unncessary heap allocations in engine
sdbondi Dec 12, 2024
83f117c
support for base layer registration
ksrichard Dec 12, 2024
d48d25f
Merge remote-tracking branch 'origin/feature/publish-template-as-subs…
ksrichard Dec 12, 2024
d8c90de
clippy fixes
ksrichard Dec 12, 2024
15a81a8
Merge remote-tracking branch 'origin/feature/publish-template-as-subs…
ksrichard Dec 12, 2024
6ac8a73
Merge branch 'development' into feature/publish-template-as-substate
ksrichard Dec 12, 2024
14d9f6a
impl in progress
ksrichard Dec 12, 2024
1ece847
small fix
ksrichard Dec 12, 2024
6dab228
Merge remote-tracking branch 'origin/feature/publish-template-as-subs…
ksrichard Dec 12, 2024
07f8895
fixed auto registration of local templates
ksrichard Dec 12, 2024
31a41d7
Merge remote-tracking branch 'origin/feature/publish-template-as-subs…
ksrichard Dec 12, 2024
3a029b7
Merge remote-tracking branch 'origin/development' into feature/publis…
ksrichard Dec 13, 2024
efe6ee3
impl in progress
ksrichard Dec 13, 2024
88cc8f0
impl in progress
ksrichard Dec 13, 2024
0d1274b
impl in progress
ksrichard Dec 16, 2024
bff83cc
impl in progress
ksrichard Dec 17, 2024
f5b4920
impl in progress
ksrichard Dec 18, 2024
11437e5
impl in progress
ksrichard Dec 18, 2024
584c069
impl almost done, needs extra error handling
ksrichard Dec 19, 2024
037035a
almost fully done syncing implementation, small changes left
ksrichard Dec 20, 2024
afdb18b
added todo
ksrichard Dec 20, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions applications/tari_dan_app_utilities/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ tari_bor = { workspace = true, default-features = true }
tari_indexer_lib = { workspace = true }
tari_networking = { workspace = true }
tari_validator_node_rpc = { workspace = true }
tari_dan_p2p = { workspace = true }
tari_rpc_framework = { workspace = true }

anyhow = { workspace = true }
async-trait = { workspace = true }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,24 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use tari_dan_common_types::NodeAddressable;
use tari_dan_common_types::{NodeAddressable, PeerAddress, TemplateSyncRequest};
use tari_epoch_manager::base_layer::EpochManagerHandle;
use tari_shutdown::ShutdownSignal;
use tokio::{sync::mpsc, task::JoinHandle};
use tari_validator_node_rpc::client::TariValidatorNodeRpcClientFactory;
use tokio::{
sync::{broadcast, mpsc},
task::JoinHandle,
};

use super::{downloader::TemplateDownloadWorker, service::TemplateManagerService, TemplateManager};
use crate::template_manager::interface::TemplateManagerHandle;

pub fn spawn<TAddr: NodeAddressable + 'static>(
pub fn spawn<TAddr: NodeAddressable + 'static, S: tari_state_store_sqlite::SubstateStore + Send + Sync + 'static>(
manager: TemplateManager<TAddr>,
epoch_manager: EpochManagerHandle<PeerAddress>,
client_factory: TariValidatorNodeRpcClientFactory,
rx_template_sync: broadcast::Receiver<TemplateSyncRequest>,
state_store: S,
shutdown: ShutdownSignal,
) -> (TemplateManagerHandle, JoinHandle<anyhow::Result<()>>) {
let (tx_request, rx_request) = mpsc::channel(1);
Expand All @@ -37,8 +46,17 @@ pub fn spawn<TAddr: NodeAddressable + 'static>(
let (tx_download_queue, rx_download_queue) = mpsc::channel(1);
let (tx_completed_downloads, rx_completed_downloads) = mpsc::channel(1);

let join_handle =
TemplateManagerService::spawn(rx_request, manager, tx_download_queue, rx_completed_downloads, shutdown);
let join_handle = TemplateManagerService::spawn(
rx_request,
manager,
epoch_manager,
tx_download_queue,
rx_completed_downloads,
rx_template_sync,
client_factory,
state_store,
shutdown,
);
TemplateDownloadWorker::new(rx_download_queue, tx_completed_downloads).spawn();
(handle, join_handle)
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,25 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::{collections::HashMap, convert::TryFrom, fs, sync::Arc};
use std::{
collections::HashMap,
convert::TryFrom,
fs,
sync::Arc,
thread,
time::{SystemTime, UNIX_EPOCH},
};

use chrono::Utc;
use chrono::{Duration, Utc};
use log::*;
use tari_common_types::types::{FixedHash, PublicKey};
use tari_crypto::tari_utilities::ByteArray;
use tari_dan_common_types::{optional::Optional, services::template_provider::TemplateProvider, NodeAddressable};
use tari_dan_common_types::{
optional::Optional,
services::template_provider::TemplateProvider,
NodeAddressable,
TemplateSyncRequest,
};
use tari_dan_engine::{
flow::FlowFactory,
function_definitions::FlowFunctionDefinition,
Expand Down Expand Up @@ -54,6 +66,12 @@ const LOG_TARGET: &str = "tari::validator_node::template_manager";

const CONCURRENT_ACCESS_LIMIT: isize = 100;

#[derive(Debug, Clone)]
pub enum TemplateResult {
Template(Box<Template>),
PendingTemplate,
}

#[derive(Debug)]
pub struct TemplateManager<TAddr> {
global_db: GlobalDb<SqliteGlobalDbAdapter<TAddr>>,
Expand Down Expand Up @@ -120,26 +138,31 @@ impl<TAddr: NodeAddressable> TemplateManager<TAddr> {
name: name.to_string(),
address,
binary_sha,
author_public_key: Default::default(),
},
executable: TemplateExecutable::CompiledWasm(compiled_code),
}
}

pub fn template_exists(&self, address: &TemplateAddress) -> Result<bool, TemplateManagerError> {
pub fn template_exists(
&self,
address: &TemplateAddress,
status: TemplateStatus,
) -> Result<bool, TemplateManagerError> {
if self.builtin_templates.contains_key(address) {
return Ok(true);
}
let mut tx = self.global_db.create_transaction()?;
self.global_db
.templates(&mut tx)
.template_exists(address)
.template_exists(address, status)
.map_err(|_| TemplateManagerError::TemplateNotFound { address: *address })
}

pub fn fetch_template(&self, address: &TemplateAddress) -> Result<Template, TemplateManagerError> {
pub fn fetch_template(&self, address: &TemplateAddress) -> Result<TemplateResult, TemplateManagerError> {
// first of all, check if the address is for a bulitin template
if let Some(template) = self.builtin_templates.get(address) {
return Ok(template.to_owned());
return Ok(TemplateResult::Template(Box::new(template.to_owned())));
}

let mut tx = self.global_db.create_transaction()?;
Expand All @@ -149,6 +172,11 @@ impl<TAddr: NodeAddressable> TemplateManager<TAddr> {
.get_template(address)?
.ok_or(TemplateManagerError::TemplateNotFound { address: *address })?;

// notify the caller that the template is under sync, so not yet ready
if matches!(template.status, TemplateStatus::Pending) {
return Ok(TemplateResult::PendingTemplate);
}

if !matches!(template.status, TemplateStatus::Active | TemplateStatus::Deprecated) {
return Err(TemplateManagerError::TemplateUnavailable);
}
Expand All @@ -167,9 +195,9 @@ impl<TAddr: NodeAddressable> TemplateManager<TAddr> {
_ => return Err(TemplateManagerError::TemplateUnavailable),
}

Ok(result)
Ok(TemplateResult::Template(Box::new(result)))
} else {
Ok(template.into())
Ok(TemplateResult::Template(Box::new(template.into())))
}
}

Expand All @@ -185,6 +213,27 @@ impl<TAddr: NodeAddressable> TemplateManager<TAddr> {
Ok(templates)
}

pub(super) fn add_pending_template(
&self,
template_address: tari_engine_types::TemplateAddress,
) -> Result<(), TemplateManagerError> {
let template = DbTemplate::empty_pending(template_address);

let mut tx = self.global_db.create_transaction()?;
let mut templates_db = self.global_db.templates(&mut tx);
match templates_db.get_template(&template.template_address)? {
Some(_) => templates_db.update_template(
&template.template_address,
DbTemplateUpdate::status(TemplateStatus::Pending),
)?,
None => templates_db.insert_template(template)?,
}

tx.commit()?;

Ok(())
}

pub(super) fn add_template(
&self,
author_public_key: PublicKey,
Expand Down Expand Up @@ -301,10 +350,34 @@ impl<TAddr: NodeAddressable + Send + Sync + 'static> TemplateProvider for Templa
return Ok(Some(template));
}

let Some(template) = self.fetch_template(address).optional()? else {
let Some(template_result) = self.fetch_template(address).optional()? else {
return Ok(None);
};

debug!(target: LOG_TARGET, "CACHE MISS: Template {}", address);

// getting template
let template = match template_result {
TemplateResult::Template(template) => template,
TemplateResult::PendingTemplate => {
let start = SystemTime::now();
let mut template = None;
while template.is_none() {
let elapsed = start.duration_since(UNIX_EPOCH).expect("Time went backwards");
if elapsed.gt(&self.config.pending_templates_wait_timeout()) {
break;
}
if let Some(TemplateResult::Template(fetched_template)) = self.fetch_template(address).optional()? {
template = Some(fetched_template);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

break here?

// sleeping here to not overload the local database while waiting for the template to be ready
thread::sleep(std::time::Duration::from_millis(100));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not ideal. Will think about how we should handle pending templates in a transaction.

}
debug!(target: LOG_TARGET, "Failed to fetch template {} within {:?}", address, self.config.pending_templates_wait_timeout());
template.ok_or(Self::Error::TemplateUnavailable)?
},
};

let loaded = match template.executable {
TemplateExecutable::CompiledWasm(wasm) => {
let module = WasmModule::from_code(wasm);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ mod initializer;
pub use initializer::spawn;

mod manager;
pub use manager::TemplateManager;
pub use manager::{TemplateManager, TemplateResult};

mod service;

mod cmap_semaphore;
Expand Down
Loading
Loading