Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[indexer-alt] - add rpc api ingestion to indexer-alt #20787

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
Open
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/sui-indexer-alt-e2e-tests/tests/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ impl OffchainCluster {

let client_args = ClientArgs {
local_ingestion_path: Some(config.data_ingestion_path.clone()),
rpc_api_url: None,
remote_store_url: None,
};

Expand Down
4 changes: 3 additions & 1 deletion crates/sui-indexer-alt-framework/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ axum.workspace = true
backoff.workspace = true
bb8 = "0.8.5"
chrono.workspace = true
clap.workspace = true
clap = { workspace = true, features = ["env"] }
diesel = { workspace = true, features = ["chrono"] }
diesel-async = { workspace = true, features = ["bb8", "postgres", "async-connection-wrapper"] }
diesel_migrations.workspace = true
Expand All @@ -34,6 +34,8 @@ sui-indexer-alt-metrics.workspace = true
sui-pg-db.workspace = true
sui-storage.workspace = true
sui-types.workspace = true
sui-rpc-api.workspace = true
tonic.workspace = true

[dev-dependencies]
rand.workspace = true
Expand Down
44 changes: 32 additions & 12 deletions crates/sui-indexer-alt-framework/src/ingestion/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ use backoff::ExponentialBackoff;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use sui_rpc_api::client::AuthInterceptor;
use sui_rpc_api::Client;
use sui_storage::blob::Blob;
use sui_types::full_checkpoint_content::CheckpointData;
use tokio_util::bytes::Bytes;
Expand Down Expand Up @@ -42,7 +44,12 @@ pub enum FetchError {
},
}

pub type FetchResult = Result<Bytes, FetchError>;
pub type FetchResult = Result<FetchData, FetchError>;

pub enum FetchData {
Raw(Bytes),
CheckPointData(CheckpointData),
}

#[derive(Clone)]
pub struct IngestionClient {
Expand All @@ -63,6 +70,12 @@ impl IngestionClient {
Self::new_impl(client, metrics)
}

pub(crate) fn new_rpc(url: Url, metrics: Arc<IndexerMetrics>) -> IngestionResult<Self> {
let client = Client::new(url.to_string())?
.with_auth(AuthInterceptor::basic(url.username(), url.password()));
Ok(Self::new_impl(Arc::new(client), metrics))
}

fn new_impl(client: Arc<dyn IngestionClientTrait>, metrics: Arc<IndexerMetrics>) -> Self {
let checkpoint_lag_reporter = CheckpointLagMetricReporter::new(
metrics.ingested_checkpoint_timestamp_lag.clone(),
Expand Down Expand Up @@ -133,7 +146,7 @@ impl IngestionClient {
return Err(BE::permanent(IngestionError::Cancelled));
}

let bytes = client.fetch(checkpoint).await.map_err(|err| match err {
let fetch_data = client.fetch(checkpoint).await.map_err(|err| match err {
FetchError::NotFound => BE::permanent(IngestionError::NotFound(checkpoint)),
FetchError::Permanent(error) => {
BE::permanent(IngestionError::FetchError(checkpoint, error))
Expand All @@ -145,16 +158,23 @@ impl IngestionClient {
),
})?;

self.metrics.total_ingested_bytes.inc_by(bytes.len() as u64);
let data: CheckpointData = Blob::from_bytes(&bytes).map_err(|e| {
self.metrics.inc_retry(
checkpoint,
"deserialization",
IngestionError::DeserializationError(checkpoint, e),
)
})?;

Ok(data)
Ok(match fetch_data {
FetchData::Raw(bytes) => {
self.metrics.total_ingested_bytes.inc_by(bytes.len() as u64);
Blob::from_bytes(&bytes).map_err(|e| {
self.metrics.inc_retry(
checkpoint,
"deserialization",
IngestionError::DeserializationError(checkpoint, e),
)
})?
}
FetchData::CheckPointData(data) => {
// We are not recording size metric for Checkpoint data (from RPC client).
// TODO: Record the metric when we have a good way to get the size information
data
}
})
}
};

Expand Down
3 changes: 3 additions & 0 deletions crates/sui-indexer-alt-framework/src/ingestion/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,7 @@ pub enum Error {

#[error("Shutdown signal received, stopping ingestion service")]
Cancelled,

#[error(transparent)]
RpcClientError(#[from] tonic::Status),
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::ingestion::client::{FetchError, FetchResult, IngestionClientTrait};
use crate::ingestion::client::{FetchData, FetchError, FetchResult, IngestionClientTrait};
use axum::body::Bytes;
use std::path::PathBuf;

Expand Down Expand Up @@ -31,7 +31,7 @@ impl IngestionClientTrait for LocalIngestionClient {
}
}
})?;
Ok(Bytes::from(bytes))
Ok(FetchData::Raw(Bytes::from(bytes)))
}
}

Expand Down
16 changes: 13 additions & 3 deletions crates/sui-indexer-alt-framework/src/ingestion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,26 @@ pub mod error;
mod local_client;
mod regulator;
mod remote_client;
mod rpc_client;
#[cfg(test)]
mod test_utils;

#[derive(clap::Args, Clone, Debug)]
#[group(required = true)]
pub struct ClientArgs {
/// Remote Store to fetch checkpoints from.
#[clap(long, required = true, group = "source")]
#[clap(long, group = "source")]
amnn marked this conversation as resolved.
Show resolved Hide resolved
pub remote_store_url: Option<Url>,

/// Path to the local ingestion directory.
/// If both remote_store_url and local_ingestion_path are provided, remote_store_url will be used.
#[clap(long, required = true, group = "source")]
#[clap(long, group = "source")]
pub local_ingestion_path: Option<PathBuf>,

/// Path to the local ingestion directory.
/// If all remote_store_url, local_ingestion_path and rpc_api_url are provided, remote_store_url will be used.
#[clap(long, env, group = "source")]
pub rpc_api_url: Option<Url>,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
Expand Down Expand Up @@ -82,8 +89,10 @@ impl IngestionService {
IngestionClient::new_remote(url.clone(), metrics.clone())?
} else if let Some(path) = args.local_ingestion_path.as_ref() {
IngestionClient::new_local(path.clone(), metrics.clone())
} else if let Some(rpc_api_url) = args.rpc_api_url.as_ref() {
IngestionClient::new_rpc(rpc_api_url.clone(), metrics.clone())?
} else {
panic!("Either remote_store_url or local_ingestion_path must be provided");
panic!("One of remote_store_url, local_ingestion_path or rpc_api_url must be provided");
};

let subscribers = Vec::new();
Expand Down Expand Up @@ -204,6 +213,7 @@ mod tests {
ClientArgs {
remote_store_url: Some(Url::parse(&uri).unwrap()),
local_ingestion_path: None,
rpc_api_url: None,
},
IngestionConfig {
checkpoint_buffer_size,
Expand Down
14 changes: 9 additions & 5 deletions crates/sui-indexer-alt-framework/src/ingestion/remote_client.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::ingestion::client::{FetchError, FetchResult, IngestionClientTrait};
use crate::ingestion::client::{FetchData, FetchError, FetchResult, IngestionClientTrait};
use crate::ingestion::Result as IngestionResult;
use reqwest::{Client, StatusCode};
use tracing::{debug, error};
Expand Down Expand Up @@ -65,10 +65,14 @@ impl IngestionClientTrait for RemoteIngestionClient {
// checkpoint from them is considered a transient error -- the store being
// fetched from needs to be corrected, and ingestion will keep retrying it
// until it is.
response.bytes().await.map_err(|e| FetchError::Transient {
reason: "bytes",
error: e.into(),
})
response
.bytes()
.await
.map_err(|e| FetchError::Transient {
reason: "bytes",
error: e.into(),
})
.map(FetchData::Raw)
}

// Treat 404s as a special case so we can match on this error type.
Expand Down
23 changes: 23 additions & 0 deletions crates/sui-indexer-alt-framework/src/ingestion/rpc_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::ingestion::client::{FetchData, FetchError, FetchResult, IngestionClientTrait};
use anyhow::anyhow;
use sui_rpc_api::Client as RpcClient;

#[async_trait::async_trait]
impl IngestionClientTrait for RpcClient {
async fn fetch(&self, checkpoint: u64) -> FetchResult {
let data = self.get_full_checkpoint(checkpoint).await.map_err(|e| {
if e.message().contains("not found") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a more structured way to identify that a checkpoint is not found based on the response @bmwill ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The errors are a bit of a mess still and i'm actively working on this as we speak. right now i think the gRPC Code::Unknown is always returned (which is not great)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a Code::NOT_FOUND though which will eventually be returned here (hopefully by the next release)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok #21163 should address this and allow you to look at the error code directly

FetchError::NotFound
} else {
FetchError::Transient {
reason: "get_full_checkpoint",
error: anyhow!(e),
}
}
})?;
Ok(FetchData::CheckPointData(data))
}
}
1 change: 1 addition & 0 deletions crates/sui-indexer-alt-framework/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ impl Indexer {
ClientArgs {
remote_store_url: None,
local_ingestion_path: Some(tempdir().unwrap().into_path()),
rpc_api_url: None,
},
IngestionConfig::default(),
Some(migrations),
Expand Down
1 change: 1 addition & 0 deletions crates/sui-indexer-alt/src/benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ pub async fn run_benchmark(
let client_args = ClientArgs {
remote_store_url: None,
local_ingestion_path: Some(ingestion_path.clone()),
rpc_api_url: None,
};

let cur_time = Instant::now();
Expand Down
8 changes: 6 additions & 2 deletions crates/sui-rpc-api/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,11 @@ impl Client {
let mut endpoint = tonic::transport::Endpoint::from(uri.clone());
if uri.scheme() == Some(&http::uri::Scheme::HTTPS) {
endpoint = endpoint
.tls_config(ClientTlsConfig::new().with_enabled_roots())
.tls_config(
ClientTlsConfig::new()
.with_enabled_roots()
.assume_http2(true),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this needed? any TLS negotiation should include h2 in ALPN

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is needed for the full node we setup in the private net

)
.map_err(Into::into)
.map_err(Status::from_error)?;
}
Expand Down Expand Up @@ -148,7 +152,7 @@ impl Client {

let (metadata, response, _extentions) = self
.raw_client()
.max_decoding_message_size(64 * 1024 * 1024)
.max_decoding_message_size(128 * 1024 * 1024)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One of the checkpoint in our test environment exceeded 64Mb, I bumped the max decoding message size to avoid the error, @bmwill do you know what is the correct value to set here?

.get_full_checkpoint(request)
.await?
.into_parts();
Expand Down
Loading