Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(risedev): support 3 compute nodes in playground #4332

Merged
merged 6 commits into from
Aug 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 17 additions & 0 deletions risedev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,23 @@ risedev:
- use: compute-node
- use: frontend

playground-3cn:
- use: meta-node
enable-dashboard-v2: false
unsafe-disable-recovery: true
max-idle-secs-to-exit: 1800
- use: compute-node
port: 5687
exporter-port: 1222
- use: compute-node
port: 5688
exporter-port: 1223
- use: compute-node
port: 5689
exporter-port: 1224
- use: frontend
- use: compactor

docker-playground:
- use: meta-node
enable-dashboard-v2: false
Expand Down
2 changes: 1 addition & 1 deletion src/cmd_all/src/bin/risingwave.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ fn main() -> Result<()> {

/// Get the launch target of this all-in-one binary
fn get_target(cmds: Vec<&str>) -> (String, Vec<String>) {
if let Some(cmd) = env::args().nth(1) && cmds.contains(&cmd.as_str()){
if let Some(cmd) = env::args().nth(1) && cmds.contains(&cmd.as_str()) {
// ./risingwave meta <args>
return (cmd, env::args().skip(1).collect());
}
Expand Down
1 change: 1 addition & 0 deletions src/object_store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ bytes = { version = "1", features = ["serde"] }
fail = "0.5"
futures = { version = "0.3", default-features = false, features = ["alloc"] }
itertools = "0.10"
lazy_static = "1"
madsim = "=0.2.0-alpha.5"
prometheus = { version = "0.13", features = ["process"] }
risingwave_common = { path = "../common" }
Expand Down
18 changes: 15 additions & 3 deletions src/object_store/src/object/mem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use std::collections::HashMap;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};

use bytes::Bytes;
Expand All @@ -25,9 +26,9 @@ use super::{ObjectError, ObjectResult};
use crate::object::{BlockLocation, ObjectMetadata, ObjectStore};

/// In-memory object storage, useful for testing.
#[derive(Default)]
#[derive(Default, Clone)]
pub struct InMemObjectStore {
objects: Mutex<HashMap<String, (ObjectMetadata, Bytes)>>,
objects: Arc<Mutex<HashMap<String, (ObjectMetadata, Bytes)>>>,
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -112,10 +113,21 @@ impl ObjectStore for InMemObjectStore {
impl InMemObjectStore {
pub fn new() -> Self {
Self {
objects: Mutex::new(HashMap::new()),
objects: Arc::new(Mutex::new(HashMap::new())),
}
}

/// Create a shared reference to the in-memory object store in this process.
///
/// Note: Should only be used for `risedev playground`, when there're multiple compute-nodes or
/// compactors in the same process.
pub(super) fn shared() -> Self {
lazy_static::lazy_static! {
static ref SHARED: InMemObjectStore = InMemObjectStore::new();
}
SHARED.clone()
}

async fn get_object<R, F>(&self, path: &str, f: F) -> ObjectResult<R>
where
F: Fn(&Bytes) -> R,
Expand Down
12 changes: 8 additions & 4 deletions src/object_store/src/object/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -330,13 +330,17 @@ pub async fn parse_remote_object_store(
disk if disk.starts_with("disk://") => ObjectStoreImpl::Disk(
DiskObjectStore::new(disk.strip_prefix("disk://").unwrap()).monitored(metrics),
),
memory if memory.starts_with("memory") => {
"memory" => {
tracing::warn!("You're using Hummock in-memory remote object store. This should never be used in benchmarks and production environment.");
ObjectStoreImpl::InMem(InMemObjectStore::new().monitored(metrics))
}
"memory-shared" => {
tracing::warn!("You're using Hummock shared in-memory remote object store. This should never be used in benchmarks and production environment.");
ObjectStoreImpl::InMem(InMemObjectStore::shared().monitored(metrics))
}
other => {
unimplemented!(
"{} hummock remote object store only supports s3, minio, disk, and memory for now.",
"{} hummock remote object store only supports s3, minio, disk, memory, and memory-shared for now.",
other
)
}
Expand All @@ -360,13 +364,13 @@ pub async fn parse_local_object_store(
.to_owned();
ObjectStoreImpl::Disk(DiskObjectStore::new(path.as_str()).monitored(metrics))
}
memory if memory.starts_with("memory") => {
"memory" => {
tracing::warn!("You're using Hummock in-memory local object store. This should never be used in benchmarks and production environment.");
ObjectStoreImpl::InMem(InMemObjectStore::new().monitored(metrics))
}
other => {
unimplemented!(
"{} Hummock only supports s3, minio, disk, and memory for now.",
"{} Hummock only supports s3, minio, disk, and memory for now.",
other
)
}
Expand Down
4 changes: 2 additions & 2 deletions src/risedevtool/src/risectl_env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::process::Command;

use anyhow::Result;

use crate::{add_storage_backend, ServiceConfig};
use crate::{add_storage_backend, HummockInMemoryStrategy, ServiceConfig};

pub fn compute_risectl_env(services: &HashMap<String, ServiceConfig>) -> Result<String> {
// Pick one of the compute node and generate risectl config
Expand All @@ -28,7 +28,7 @@ pub fn compute_risectl_env(services: &HashMap<String, ServiceConfig>) -> Result<
"risectl",
c.provide_minio.as_ref().unwrap(),
c.provide_aws_s3.as_ref().unwrap(),
false,
HummockInMemoryStrategy::Disallowed,
&mut cmd,
)?;
let meta_node = &c.provide_meta_node.as_ref().unwrap()[0];
Expand Down
13 changes: 11 additions & 2 deletions src/risedevtool/src/task/compactor_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ use std::process::Command;
use anyhow::Result;

use crate::util::{get_program_args, get_program_env_cmd, get_program_name};
use crate::{add_meta_node, add_storage_backend, CompactorConfig, ExecuteContext, Task};
use crate::{
add_meta_node, add_storage_backend, CompactorConfig, ExecuteContext, HummockInMemoryStrategy,
Task,
};

pub struct CompactorService {
config: CompactorConfig,
Expand Down Expand Up @@ -55,7 +58,13 @@ impl CompactorService {

let provide_minio = config.provide_minio.as_ref().unwrap();
let provide_aws_s3 = config.provide_aws_s3.as_ref().unwrap();
add_storage_backend(&config.id, provide_minio, provide_aws_s3, false, cmd)?;
add_storage_backend(
&config.id,
provide_minio,
provide_aws_s3,
HummockInMemoryStrategy::Shared,
cmd,
)?;

let provide_meta_node = config.provide_meta_node.as_ref().unwrap();
add_meta_node(provide_meta_node, cmd)?;
Expand Down
11 changes: 8 additions & 3 deletions src/risedevtool/src/task/compute_node_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use anyhow::{anyhow, Result};

use super::{ExecuteContext, Task};
use crate::util::{get_program_args, get_program_env_cmd, get_program_name};
use crate::{add_meta_node, add_storage_backend, ComputeNodeConfig};
use crate::{add_meta_node, add_storage_backend, ComputeNodeConfig, HummockInMemoryStrategy};

pub struct ComputeNodeService {
config: ComputeNodeConfig,
Expand Down Expand Up @@ -73,8 +73,13 @@ impl ComputeNodeService {
let provide_aws_s3 = config.provide_aws_s3.as_ref().unwrap();
let provide_compute_node = config.provide_compute_node.as_ref().unwrap();

let is_shared_backend =
add_storage_backend(&config.id, provide_minio, provide_aws_s3, true, cmd)?;
let is_shared_backend = add_storage_backend(
&config.id,
provide_minio,
provide_aws_s3,
HummockInMemoryStrategy::Shared,
cmd,
)?;
if provide_compute_node.len() > 1 && !is_shared_backend {
return Err(anyhow!(
"should use a shared backend (e.g. MinIO) for multiple compute-node configuration. Consider adding `use: minio` in risedev config."
Expand Down
29 changes: 22 additions & 7 deletions src/risedevtool/src/task/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,23 +41,38 @@ pub fn add_meta_node(provide_meta_node: &[MetaNodeConfig], cmd: &mut Command) ->
Ok(())
}

/// Strategy for whether to enable in-memory hummock if no minio and s3 is provided.
pub enum HummockInMemoryStrategy {
/// Enable isolated in-memory hummock.
Isolated,
/// Enable in-memory hummock shared in the process. Used by risedev playground.
Shared,
/// Disallow in-memory hummock. Always requires minio or s3.
Disallowed,
}

/// Add a storage backend to the parameters. Returns whether this is a shared backend.
pub fn add_storage_backend(
id: &str,
provide_minio: &[MinioConfig],
provide_aws_s3: &[AwsS3Config],
allow_hummock_in_memory: bool,
hummock_in_memory_strategy: HummockInMemoryStrategy,
cmd: &mut Command,
) -> Result<bool> {
let is_shared_backend = match (provide_minio, provide_aws_s3) {
([], []) => {
if allow_hummock_in_memory {
cmd.arg("--state-store").arg("hummock+memory");
false
} else {
return Err(anyhow!(
match hummock_in_memory_strategy {
HummockInMemoryStrategy::Isolated => {
cmd.arg("--state-store").arg("hummock+memory");
false
}
HummockInMemoryStrategy::Shared => {
cmd.arg("--state-store").arg("hummock+memory-shared");
true
},
HummockInMemoryStrategy::Disallowed => return Err(anyhow!(
"{} is not compatible with in-memory state backend. Need to enable either minio or aws-s3.", id
));
)),
}
}
([minio], []) => {
Expand Down