diff --git a/Cargo.lock b/Cargo.lock
index 8845f437af54..0c6af8c94ce8 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -7595,6 +7595,7 @@ version = "0.1.0"
dependencies = [
"async-std",
"async-trait",
+ "bp-millau",
"bp-rialto",
"futures 0.3.6",
"headers-relay",
@@ -7606,6 +7607,7 @@ dependencies = [
"relay-rialto-client",
"relay-substrate-client",
"relay-utils",
+ "sp-core",
"sp-runtime",
"structopt",
]
diff --git a/bin/rialto/runtime/src/lib.rs b/bin/rialto/runtime/src/lib.rs
index 77a23cbf8f4a..cdb09c6cb009 100644
--- a/bin/rialto/runtime/src/lib.rs
+++ b/bin/rialto/runtime/src/lib.rs
@@ -62,6 +62,7 @@ pub use frame_support::{
pub use pallet_balances::Call as BalancesCall;
pub use pallet_bridge_currency_exchange::Call as BridgeCurrencyExchangeCall;
pub use pallet_bridge_eth_poa::Call as BridgeEthPoACall;
+pub use pallet_substrate_bridge::Call as BridgeMillauCall;
pub use pallet_timestamp::Call as TimestampCall;
#[cfg(any(feature = "std", test))]
diff --git a/bin/rialto/runtime/src/millau.rs b/bin/rialto/runtime/src/millau.rs
index 47568f41742a..57ca750e2406 100644
--- a/bin/rialto/runtime/src/millau.rs
+++ b/bin/rialto/runtime/src/millau.rs
@@ -36,7 +36,7 @@ pub fn initial_header() -> Header {
Header {
parent_hash: Default::default(),
number: Default::default(),
- state_root: hex!("bb65e8ba99408ebfefea9d28f74403d41da6858fa075c51fcc71dc383455c530").into(),
+ state_root: hex!("e901070e3bb061a6ae9ea8e4ba5417bf4c4642f9e75af9d372861c170ba7a9a3").into(),
extrinsics_root: hex!("03170a2e7597b7b7e3d84c05391d139a62b157e78786d8c082f29dcf4c111314").into(),
digest: Default::default(),
}
diff --git a/primitives/millau/src/lib.rs b/primitives/millau/src/lib.rs
index 99bb0207f50d..844e4667c603 100644
--- a/primitives/millau/src/lib.rs
+++ b/primitives/millau/src/lib.rs
@@ -49,6 +49,13 @@ impl Chain for Millau {
type Header = Header;
}
+/// Name of the `MillauHeaderApi::best_block` runtime method.
+pub const BEST_MILLAU_BLOCK_METHOD: &str = "MillauHeaderApi_best_block";
+/// Name of the `MillauHeaderApi::is_known_block` runtime method.
+pub const IS_KNOWN_MILLAU_BLOCK_METHOD: &str = "MillauHeaderApi_is_known_block";
+/// Name of the `MillauHeaderApi::incomplete_headers` runtime method.
+pub const INCOMPLETE_MILLAU_HEADERS_METHOD: &str = "MillauHeaderApi_incomplete_headers";
+
sp_api::decl_runtime_apis! {
/// API for querying information about Millau headers from the Bridge Pallet instance.
///
diff --git a/relays/headers-relay/src/headers.rs b/relays/headers-relay/src/headers.rs
index 5800d370fba0..b8c51eec1b49 100644
--- a/relays/headers-relay/src/headers.rs
+++ b/relays/headers-relay/src/headers.rs
@@ -32,6 +32,8 @@ use std::{
type HeadersQueue
=
BTreeMap<
::Number, HashMap<
::Hash, QueuedHeader
>>;
+type SyncedChildren
=
+ BTreeMap<
::Number, HashMap<
::Hash, HashSet>>>;
type KnownHeaders =
BTreeMap<
::Number, HashMap<
::Hash, HeaderStatus>>;
@@ -63,6 +65,9 @@ pub struct QueuedHeaders {
/// Headers that are (we believe) currently submitted to target node by our,
/// not-yet mined transactions.
submitted: HeadersQueue,
+ /// Synced headers childrens. We need it to support case when header is synced, but some of
+ /// its parents are incomplete.
+ synced_children: SyncedChildren
,
/// Pointers to all headers that we ever seen and we believe we can touch in the future.
known_headers: KnownHeaders
,
/// Headers that are waiting for completion data from source node. Mapped (and auto-sorted
@@ -96,6 +101,7 @@ impl Default for QueuedHeaders {
ready: HeadersQueue::new(),
incomplete: HeadersQueue::new(),
submitted: HeadersQueue::new(),
+ synced_children: SyncedChildren::
::new(),
known_headers: KnownHeaders::
::new(),
incomplete_headers: LinkedHashMap::new(),
completion_data: LinkedHashMap::new(),
@@ -419,13 +425,17 @@ impl QueuedHeaders {
self.header_synced(&new_incomplete_header);
}
- move_header_descendants::
(
- &mut [&mut self.ready, &mut self.submitted],
- &mut self.incomplete,
- &mut self.known_headers,
- HeaderStatus::Incomplete,
- &new_incomplete_header,
- );
+ let move_origins = select_synced_children::
(&self.synced_children, &new_incomplete_header);
+ let move_origins = move_origins.into_iter().chain(std::iter::once(new_incomplete_header));
+ for move_origin in move_origins {
+ move_header_descendants::
(
+ &mut [&mut self.ready, &mut self.submitted],
+ &mut self.incomplete,
+ &mut self.known_headers,
+ HeaderStatus::Incomplete,
+ &move_origin,
+ );
+ }
if make_header_incomplete {
log::debug!(
@@ -460,13 +470,20 @@ impl QueuedHeaders {
.cloned()
.collect::>();
for just_completed_header in just_completed_headers {
- move_header_descendants::(
- &mut [&mut self.incomplete],
- &mut self.ready,
- &mut self.known_headers,
- HeaderStatus::Ready,
- &just_completed_header,
- );
+ // sub2eth rejects H if H.Parent is incomplete
+ // sub2sub allows 'syncing' headers like that
+ // => let's check if there are some synced children of just completed header
+ let move_origins = select_synced_children::
(&self.synced_children, &just_completed_header);
+ let move_origins = move_origins.into_iter().chain(std::iter::once(just_completed_header));
+ for move_origin in move_origins {
+ move_header_descendants::
(
+ &mut [&mut self.incomplete],
+ &mut self.ready,
+ &mut self.known_headers,
+ HeaderStatus::Ready,
+ &move_origin,
+ );
+ }
log::debug!(
target: "bridge",
@@ -514,6 +531,7 @@ impl QueuedHeaders {
prune_queue(&mut self.ready, prune_border);
prune_queue(&mut self.submitted, prune_border);
prune_queue(&mut self.incomplete, prune_border);
+ self.synced_children = self.synced_children.split_off(&prune_border);
prune_known_headers::
(&mut self.known_headers, prune_border);
self.prune_border = prune_border;
}
@@ -527,6 +545,7 @@ impl QueuedHeaders {
self.ready.clear();
self.incomplete.clear();
self.submitted.clear();
+ self.synced_children.clear();
self.known_headers.clear();
self.best_synced_number = Zero::zero();
self.prune_border = Zero::zero();
@@ -568,6 +587,7 @@ impl QueuedHeaders {
// queues
let mut current = *id;
let mut id_processed = false;
+ let mut previous_current = None;
loop {
let header = match self.status(¤t) {
HeaderStatus::Unknown => break,
@@ -582,8 +602,42 @@ impl QueuedHeaders {
}
.expect("header has a given status; given queue has the header; qed");
+ // remember ids of all the children of the current header
+ let synced_children_entry = self
+ .synced_children
+ .entry(current.0)
+ .or_default()
+ .entry(current.1)
+ .or_default();
+ let all_queues = [
+ &self.maybe_orphan,
+ &self.orphan,
+ &self.maybe_extra,
+ &self.extra,
+ &self.ready,
+ &self.incomplete,
+ &self.submitted,
+ ];
+ for queue in &all_queues {
+ let children_from_queue = queue
+ .get(&(current.0 + One::one()))
+ .map(|potential_children| {
+ potential_children
+ .values()
+ .filter(|potential_child| potential_child.header().parent_id() == current)
+ .map(|child| child.id())
+ .collect::>()
+ })
+ .unwrap_or_default();
+ synced_children_entry.extend(children_from_queue);
+ }
+ if let Some(previous_current) = previous_current {
+ synced_children_entry.insert(previous_current);
+ }
+
set_header_status::(&mut self.known_headers, ¤t, HeaderStatus::Synced);
+ previous_current = Some(current);
current = header.parent_id();
id_processed = true;
}
@@ -706,6 +760,35 @@ fn move_header_descendants(
}
}
+/// Selects (recursive) all synced children of given header.
+fn select_synced_children(
+ synced_children: &SyncedChildren,
+ id: &HeaderIdOf
,
+) -> Vec> {
+ let mut result = Vec::new();
+ let mut current_parents = HashSet::new();
+ current_parents.insert(*id);
+
+ while !current_parents.is_empty() {
+ let mut next_parents = HashSet::new();
+ for current_parent in ¤t_parents {
+ let current_parent_synced_children = synced_children
+ .get(¤t_parent.0)
+ .and_then(|by_number_entry| by_number_entry.get(¤t_parent.1));
+ if let Some(current_parent_synced_children) = current_parent_synced_children {
+ for current_parent_synced_child in current_parent_synced_children {
+ result.push(*current_parent_synced_child);
+ next_parents.insert(*current_parent_synced_child);
+ }
+ }
+ }
+
+ let _ = std::mem::replace(&mut current_parents, next_parents);
+ }
+
+ result
+}
+
/// Return oldest header from the queue.
fn oldest_header(queue: &HeadersQueue) -> Option<&QueuedHeader
> {
queue.values().flat_map(|h| h.values()).next()
@@ -1050,6 +1133,37 @@ pub(crate) mod tests {
.known_headers
.values()
.all(|s| s.values().all(|s| *s == HeaderStatus::Synced)));
+
+ // children of synced headers are stored
+ assert_eq!(
+ vec![id(97)],
+ queue.synced_children[&96][&hash(96)]
+ .iter()
+ .cloned()
+ .collect::>()
+ );
+ assert_eq!(
+ vec![id(98)],
+ queue.synced_children[&97][&hash(97)]
+ .iter()
+ .cloned()
+ .collect::>()
+ );
+ assert_eq!(
+ vec![id(99)],
+ queue.synced_children[&98][&hash(98)]
+ .iter()
+ .cloned()
+ .collect::>()
+ );
+ assert_eq!(
+ vec![id(100)],
+ queue.synced_children[&99][&hash(99)]
+ .iter()
+ .cloned()
+ .collect::>()
+ );
+ assert_eq!(0, queue.synced_children[&100][&hash(100)].len());
}
#[test]
@@ -1463,6 +1577,16 @@ pub(crate) mod tests {
.or_default()
.insert(hash(100), HeaderStatus::Ready);
queue.ready.entry(100).or_default().insert(hash(100), header(100));
+ queue
+ .synced_children
+ .entry(100)
+ .or_default()
+ .insert(hash(100), vec![id(101)].into_iter().collect());
+ queue
+ .synced_children
+ .entry(102)
+ .or_default()
+ .insert(hash(102), vec![id(102)].into_iter().collect());
queue.prune(102);
@@ -1472,6 +1596,7 @@ pub(crate) mod tests {
assert_eq!(queue.orphan.len(), 1);
assert_eq!(queue.maybe_orphan.len(), 1);
assert_eq!(queue.incomplete.len(), 1);
+ assert_eq!(queue.synced_children.len(), 1);
assert_eq!(queue.known_headers.len(), 4);
queue.prune(110);
@@ -1482,6 +1607,7 @@ pub(crate) mod tests {
assert_eq!(queue.orphan.len(), 0);
assert_eq!(queue.maybe_orphan.len(), 0);
assert_eq!(queue.incomplete.len(), 0);
+ assert_eq!(queue.synced_children.len(), 0);
assert_eq!(queue.known_headers.len(), 0);
queue.header_response(header(109).header().clone());
@@ -1537,4 +1663,47 @@ pub(crate) mod tests {
assert_eq!(queue.status(&id(103)), HeaderStatus::Incomplete);
assert_eq!(queue.status(&id(104)), HeaderStatus::Incomplete);
}
+
+ #[test]
+ fn incomplete_headers_response_moves_synced_headers() {
+ let mut queue = QueuedHeaders::::default();
+
+ // we have submitted two headers - 100 and 101. 102 is ready
+ queue.submitted.entry(100).or_default().insert(hash(100), header(100));
+ queue.submitted.entry(101).or_default().insert(hash(101), header(101));
+ queue.ready.entry(102).or_default().insert(hash(102), header(102));
+ queue
+ .known_headers
+ .entry(100)
+ .or_default()
+ .insert(hash(100), HeaderStatus::Submitted);
+ queue
+ .known_headers
+ .entry(101)
+ .or_default()
+ .insert(hash(101), HeaderStatus::Submitted);
+ queue
+ .known_headers
+ .entry(102)
+ .or_default()
+ .insert(hash(102), HeaderStatus::Ready);
+
+ // both headers are accepted
+ queue.target_best_header_response(&id(101));
+
+ // but header 100 is incomplete
+ queue.incomplete_headers_response(vec![id(100)].into_iter().collect());
+ assert_eq!(queue.status(&id(100)), HeaderStatus::Synced);
+ assert_eq!(queue.status(&id(101)), HeaderStatus::Synced);
+ assert_eq!(queue.status(&id(102)), HeaderStatus::Incomplete);
+ assert!(queue.incomplete_headers.contains_key(&id(100)));
+ assert!(queue.incomplete[&102].contains_key(&hash(102)));
+
+ // when header 100 is completed, 101 is synced and 102 is ready
+ queue.incomplete_headers_response(HashSet::new());
+ assert_eq!(queue.status(&id(100)), HeaderStatus::Synced);
+ assert_eq!(queue.status(&id(101)), HeaderStatus::Synced);
+ assert_eq!(queue.status(&id(102)), HeaderStatus::Ready);
+ assert!(queue.ready[&102].contains_key(&hash(102)));
+ }
}
diff --git a/relays/headers-relay/src/sync_loop.rs b/relays/headers-relay/src/sync_loop.rs
index 14eb7e2a4140..c53a1ab0f082 100644
--- a/relays/headers-relay/src/sync_loop.rs
+++ b/relays/headers-relay/src/sync_loop.rs
@@ -277,7 +277,7 @@ pub fn run>(
},
&mut target_go_offline_future,
|delay| async_std::task::sleep(delay),
- || format!("Error retrieving best known header from {} node", P::TARGET_NAME),
+ || format!("Error retrieving best known {} header from {} node", P::SOURCE_NAME, P::TARGET_NAME),
).is_ok();
},
incomplete_headers_ids = target_incomplete_headers_future => {
diff --git a/relays/headers-relay/src/sync_types.rs b/relays/headers-relay/src/sync_types.rs
index 0dcb712c9180..a910ce581c7e 100644
--- a/relays/headers-relay/src/sync_types.rs
+++ b/relays/headers-relay/src/sync_types.rs
@@ -43,7 +43,7 @@ pub enum HeaderStatus {
}
/// Headers synchronization pipeline.
-pub trait HeadersSyncPipeline: Clone + Copy + Send + Sync {
+pub trait HeadersSyncPipeline: Clone + Send + Sync {
/// Name of the headers source.
const SOURCE_NAME: &'static str;
/// Name of the headers target.
diff --git a/relays/millau-client/src/lib.rs b/relays/millau-client/src/lib.rs
index 291954cbea9e..c1fadba024da 100644
--- a/relays/millau-client/src/lib.rs
+++ b/relays/millau-client/src/lib.rs
@@ -60,12 +60,18 @@ impl From for SyncHeader {
}
}
+impl From for millau_runtime::Header {
+ fn from(header: SyncHeader) -> Self {
+ header.0
+ }
+}
+
impl SourceHeader for SyncHeader {
fn id(&self) -> HeaderId {
relay_utils::HeaderId(*self.number(), self.hash())
}
fn parent_id(&self) -> HeaderId {
- relay_utils::HeaderId(*self.number(), *self.parent_hash())
+ relay_utils::HeaderId(*self.number() - 1, *self.parent_hash())
}
}
diff --git a/relays/rialto-client/src/lib.rs b/relays/rialto-client/src/lib.rs
index b8601c3ccd0f..95382cf591d9 100644
--- a/relays/rialto-client/src/lib.rs
+++ b/relays/rialto-client/src/lib.rs
@@ -25,6 +25,8 @@ use sp_runtime::{
traits::{Header as HeaderT, IdentifyAccount},
};
+pub use rialto_runtime::BridgeMillauCall;
+
/// Rialto header id.
pub type HeaderId = relay_utils::HeaderId;
diff --git a/relays/substrate-client/src/client.rs b/relays/substrate-client/src/client.rs
index ad6dad1a2a47..896b01477590 100644
--- a/relays/substrate-client/src/client.rs
+++ b/relays/substrate-client/src/client.rs
@@ -33,6 +33,9 @@ const SUB_API_GRANDPA_AUTHORITIES: &str = "GrandpaApi_grandpa_authorities";
pub type OpaqueGrandpaAuthoritiesSet = Vec;
/// Substrate client type.
+///
+/// Cloning Client is a cheap operation.
+#[derive(Clone)]
pub struct Client {
/// Substrate RPC client.
client: RpcClient,
diff --git a/relays/substrate/Cargo.toml b/relays/substrate/Cargo.toml
index c10923dff9ff..2e61699b7d93 100644
--- a/relays/substrate/Cargo.toml
+++ b/relays/substrate/Cargo.toml
@@ -16,6 +16,7 @@ structopt = "0.3"
# Bridge dependencies
+bp-millau = { path = "../../primitives/millau" }
bp-rialto = { path = "../../primitives/rialto" }
headers-relay = { path = "../headers-relay" }
messages-relay = { path = "../messages-relay" }
@@ -24,6 +25,7 @@ relay-rialto-client = { path = "../rialto-client" }
relay-substrate-client = { path = "../substrate-client" }
relay-utils = { path = "../utils" }
-# Substrate dependencies
+# Substrate Dependencies
+sp-core = "2.0"
sp-runtime = "2.0"
diff --git a/relays/substrate/src/headers_target.rs b/relays/substrate/src/headers_target.rs
new file mode 100644
index 000000000000..92bc017a72eb
--- /dev/null
+++ b/relays/substrate/src/headers_target.rs
@@ -0,0 +1,168 @@
+// Copyright 2019-2020 Parity Technologies (UK) Ltd.
+// This file is part of Parity Bridges Common.
+
+// Parity Bridges Common is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Parity Bridges Common is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Parity Bridges Common. If not, see .
+
+//! Substrate client as Substrate headers target. The chain we connect to should have
+//! runtime that implements `HeaderApi` to allow bridging with
+//! chain.
+
+use async_trait::async_trait;
+use codec::{Decode, Encode};
+use futures::TryFutureExt;
+use headers_relay::{
+ sync_loop::TargetClient,
+ sync_types::{HeaderIdOf, HeadersSyncPipeline, QueuedHeader, SubmittedHeaders},
+};
+use relay_substrate_client::{Chain, Client, Error as SubstrateError};
+use relay_utils::HeaderId;
+use sp_core::Bytes;
+use sp_runtime::{DeserializeOwned, Justification};
+use std::collections::HashSet;
+
+/// Headers sync pipeline for Substrate <-> Substrate relays.
+#[async_trait]
+pub trait SubstrateHeadersSyncPipeline: HeadersSyncPipeline {
+ /// Name of the `best_block` runtime method.
+ const BEST_BLOCK_METHOD: &'static str;
+ /// Name of the `is_known_block` runtime method.
+ const IS_KNOWN_BLOCK_METHOD: &'static str;
+ /// Name of the `incomplete_headers` runtime method.
+ const INCOMPLETE_HEADERS_METHOD: &'static str;
+
+ /// Signed transaction type.
+ type SignedTransaction: Send + Sync + Encode;
+
+ /// Make submit header transaction.
+ async fn make_submit_header_transaction(
+ &self,
+ header: QueuedHeader,
+ ) -> Result;
+
+ /// Make completion transaction for the header.
+ async fn make_complete_header_transaction(
+ &self,
+ id: HeaderIdOf,
+ completion: Justification,
+ ) -> Result;
+}
+
+/// Substrate client as Substrate headers target.
+pub struct SubstrateHeadersTarget {
+ client: Client,
+ pipeline: P,
+}
+
+impl SubstrateHeadersTarget {
+ /// Create new Substrate headers target.
+ pub fn new(client: Client, pipeline: P) -> Self {
+ SubstrateHeadersTarget { client, pipeline }
+ }
+}
+
+#[async_trait]
+impl TargetClient for SubstrateHeadersTarget
+where
+ C: Chain,
+ C::Header: DeserializeOwned,
+ C::Index: DeserializeOwned,
+ P::Number: Decode,
+ P::Hash: Decode + Encode,
+ P: SubstrateHeadersSyncPipeline,
+{
+ type Error = SubstrateError;
+
+ async fn best_header_id(&self) -> Result, Self::Error> {
+ let call = P::BEST_BLOCK_METHOD.into();
+ let data = Bytes(Vec::new());
+
+ let encoded_response = self.client.state_call(call, data, None).await?;
+ let decoded_response: (P::Number, P::Hash) =
+ Decode::decode(&mut &encoded_response.0[..]).map_err(SubstrateError::ResponseParseFailed)?;
+
+ let best_header_id = HeaderId(decoded_response.0, decoded_response.1);
+ Ok(best_header_id)
+ }
+
+ async fn is_known_header(&self, id: HeaderIdOf) -> Result<(HeaderIdOf
, bool), Self::Error> {
+ let call = P::IS_KNOWN_BLOCK_METHOD.into();
+ let data = Bytes(id.1.encode());
+
+ let encoded_response = self.client.state_call(call, data, None).await?;
+ let is_known_block: bool =
+ Decode::decode(&mut &encoded_response.0[..]).map_err(SubstrateError::ResponseParseFailed)?;
+
+ Ok((id, is_known_block))
+ }
+
+ async fn submit_headers(&self, mut headers: Vec>) -> SubmittedHeaders, Self::Error> {
+ debug_assert_eq!(
+ headers.len(),
+ 1,
+ "Substrate pallet only supports single header / transaction"
+ );
+
+ let header = headers.remove(0);
+ let id = header.id();
+ let submit_transaction_result = self
+ .pipeline
+ .make_submit_header_transaction(header)
+ .and_then(|tx| self.client.submit_extrinsic(Bytes(tx.encode())))
+ .await;
+
+ match submit_transaction_result {
+ Ok(_) => SubmittedHeaders {
+ submitted: vec![id],
+ incomplete: Vec::new(),
+ rejected: Vec::new(),
+ fatal_error: None,
+ },
+ Err(error) => SubmittedHeaders {
+ submitted: Vec::new(),
+ incomplete: Vec::new(),
+ rejected: vec![id],
+ fatal_error: Some(error),
+ },
+ }
+ }
+
+ async fn incomplete_headers_ids(&self) -> Result>, Self::Error> {
+ let call = P::INCOMPLETE_HEADERS_METHOD.into();
+ let data = Bytes(Vec::new());
+
+ let encoded_response = self.client.state_call(call, data, None).await?;
+ let decoded_response: Vec<(P::Number, P::Hash)> =
+ Decode::decode(&mut &encoded_response.0[..]).map_err(SubstrateError::ResponseParseFailed)?;
+
+ let incomplete_headers = decoded_response
+ .into_iter()
+ .map(|(number, hash)| HeaderId(number, hash))
+ .collect();
+ Ok(incomplete_headers)
+ }
+
+ async fn complete_header(
+ &self,
+ id: HeaderIdOf,
+ completion: Justification,
+ ) -> Result, Self::Error> {
+ let tx = self.pipeline.make_complete_header_transaction(id, completion).await?;
+ self.client.submit_extrinsic(Bytes(tx.encode())).await?;
+ Ok(id)
+ }
+
+ async fn requires_extra(&self, header: QueuedHeader) -> Result<(HeaderIdOf
, bool), Self::Error> {
+ Ok((header.id(), false))
+ }
+}
diff --git a/relays/substrate/src/main.rs b/relays/substrate/src/main.rs
index 743827ab8d87..ad77a13eea9b 100644
--- a/relays/substrate/src/main.rs
+++ b/relays/substrate/src/main.rs
@@ -28,6 +28,7 @@ pub type MillauClient = relay_substrate_client::Client;
mod cli;
+mod headers_target;
mod millau_headers_to_rialto;
fn main() {
diff --git a/relays/substrate/src/millau_headers_to_rialto.rs b/relays/substrate/src/millau_headers_to_rialto.rs
index f7064c51bf11..67ac6d6b94e7 100644
--- a/relays/substrate/src/millau_headers_to_rialto.rs
+++ b/relays/substrate/src/millau_headers_to_rialto.rs
@@ -16,24 +16,33 @@
//! Millau-to-Rialto headers sync entrypoint.
-use crate::{MillauClient, RialtoClient};
+use crate::{
+ headers_target::{SubstrateHeadersSyncPipeline, SubstrateHeadersTarget},
+ MillauClient, RialtoClient,
+};
use async_trait::async_trait;
+use bp_millau::{BEST_MILLAU_BLOCK_METHOD, INCOMPLETE_MILLAU_HEADERS_METHOD, IS_KNOWN_MILLAU_BLOCK_METHOD};
use codec::Encode;
use headers_relay::{
sync::{HeadersSyncParams, TargetTransactionMode},
- sync_loop::TargetClient,
- sync_types::{HeadersSyncPipeline, QueuedHeader, SubmittedHeaders},
+ sync_types::{HeadersSyncPipeline, QueuedHeader},
};
use relay_millau_client::{HeaderId as MillauHeaderId, Millau, SyncHeader as MillauSyncHeader};
-use relay_rialto_client::SigningParams as RialtoSigningParams;
-use relay_substrate_client::{headers_source::HeadersSource, BlockNumberOf, Error as SubstrateError, HashOf};
+use relay_rialto_client::{BridgeMillauCall, Rialto, SigningParams as RialtoSigningParams};
+use relay_substrate_client::{
+ headers_source::HeadersSource, BlockNumberOf, Error as SubstrateError, HashOf, TransactionSignScheme,
+};
+use sp_core::Pair;
use sp_runtime::Justification;
-use std::{collections::HashSet, time::Duration};
+use std::time::Duration;
/// Millau-to-Rialto headers pipeline.
-#[derive(Debug, Clone, Copy)]
-struct MillauHeadersToRialto;
+#[derive(Debug, Clone)]
+struct MillauHeadersToRialto {
+ client: RialtoClient,
+ sign: RialtoSigningParams,
+}
impl HeadersSyncPipeline for MillauHeadersToRialto {
const SOURCE_NAME: &'static str = "Millau";
@@ -50,51 +59,46 @@ impl HeadersSyncPipeline for MillauHeadersToRialto {
}
}
-/// Millau header in-the-queue.
-type QueuedMillauHeader = QueuedHeader;
-
-/// Millau node as headers source.
-type MillauSourceClient = HeadersSource;
-
-/// Rialto node as headers target.
-struct RialtoTargetClient {
- _client: RialtoClient,
- _sign: RialtoSigningParams,
-}
-
#[async_trait]
-impl TargetClient for RialtoTargetClient {
- type Error = SubstrateError;
+impl SubstrateHeadersSyncPipeline for MillauHeadersToRialto {
+ const BEST_BLOCK_METHOD: &'static str = BEST_MILLAU_BLOCK_METHOD;
+ const IS_KNOWN_BLOCK_METHOD: &'static str = IS_KNOWN_MILLAU_BLOCK_METHOD;
+ const INCOMPLETE_HEADERS_METHOD: &'static str = INCOMPLETE_MILLAU_HEADERS_METHOD;
- async fn best_header_id(&self) -> Result {
- unimplemented!("https://github.com/paritytech/parity-bridges-common/issues/209")
- }
+ type SignedTransaction = ::SignedTransaction;
- async fn is_known_header(&self, _id: MillauHeaderId) -> Result<(MillauHeaderId, bool), Self::Error> {
- unimplemented!("https://github.com/paritytech/parity-bridges-common/issues/209")
+ async fn make_submit_header_transaction(
+ &self,
+ header: QueuedMillauHeader,
+ ) -> Result {
+ let account_id = self.sign.signer.public().as_array_ref().clone().into();
+ let nonce = self.client.next_account_index(account_id).await?;
+ let call = BridgeMillauCall::import_signed_header(header.header().clone().into()).into();
+ let transaction = Rialto::sign_transaction(&self.client, &self.sign.signer, nonce, call);
+ Ok(transaction)
}
- async fn submit_headers(&self, _headers: Vec) -> SubmittedHeaders {
- unimplemented!("https://github.com/paritytech/parity-bridges-common/issues/209")
+ async fn make_complete_header_transaction(
+ &self,
+ id: MillauHeaderId,
+ completion: Justification,
+ ) -> Result {
+ let account_id = self.sign.signer.public().as_array_ref().clone().into();
+ let nonce = self.client.next_account_index(account_id).await?;
+ let call = BridgeMillauCall::finalize_header(id.1, completion).into();
+ let transaction = Rialto::sign_transaction(&self.client, &self.sign.signer, nonce, call);
+ Ok(transaction)
}
+}
- async fn incomplete_headers_ids(&self) -> Result, Self::Error> {
- unimplemented!("https://github.com/paritytech/parity-bridges-common/issues/209")
- }
+/// Millau header in-the-queue.
+type QueuedMillauHeader = QueuedHeader;
- #[allow(clippy::unit_arg)]
- async fn complete_header(
- &self,
- _id: MillauHeaderId,
- _completion: Justification,
- ) -> Result {
- unimplemented!("https://github.com/paritytech/parity-bridges-common/issues/209")
- }
+/// Millau node as headers source.
+type MillauSourceClient = HeadersSource;
- async fn requires_extra(&self, _header: QueuedMillauHeader) -> Result<(MillauHeaderId, bool), Self::Error> {
- unimplemented!("https://github.com/paritytech/parity-bridges-common/issues/209")
- }
-}
+/// Rialto node as headers target.
+type RialtoTargetClient = SubstrateHeadersTarget;
/// Run Millau-to-Rialto headers sync.
pub fn run(
@@ -107,8 +111,8 @@ pub fn run(
let rialto_tick = Duration::from_secs(5);
let sync_params = HeadersSyncParams {
max_future_headers_to_download: 32,
- max_headers_in_submitted_status: 1024,
- max_headers_in_single_submit: 8,
+ max_headers_in_submitted_status: 8,
+ max_headers_in_single_submit: 1,
max_headers_size_in_single_submit: 1024 * 1024,
prune_depth: 256,
target_tx_mode: TargetTransactionMode::Signed,
@@ -117,10 +121,13 @@ pub fn run(
headers_relay::sync_loop::run(
MillauSourceClient::new(millau_client),
millau_tick,
- RialtoTargetClient {
- _client: rialto_client,
- _sign: rialto_sign,
- },
+ RialtoTargetClient::new(
+ rialto_client.clone(),
+ MillauHeadersToRialto {
+ client: rialto_client,
+ sign: rialto_sign,
+ },
+ ),
rialto_tick,
sync_params,
metrics_params,