Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: replace proto sstable meta with self managed struct #1447

Merged
merged 6 commits into from
Mar 31, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 0 additions & 30 deletions proto/hummock.proto
Original file line number Diff line number Diff line change
Expand Up @@ -27,36 +27,6 @@ message SstableInfo {
KeyRange key_range = 2;
}

message BlockMeta {
bytes smallest_key = 1;
uint32 offset = 2;
uint32 len = 3;
}

message SstableMeta {
// Offsets of different blocks
repeated BlockMeta block_metas = 1;
bytes bloom_filter = 2;
// Estimated size, only used on encryption or compression
uint32 estimated_size = 3;
uint32 key_count = 5;
// Serve as cheap index.
// Since table is ordered, they are exactly the first key and last key.
// The keys are appended by timestamp.
bytes smallest_key = 6;
bytes largest_key = 7;
}

message Checksum {
enum Algorithm {
CRC32C = 0;
XX_HASH64 = 1;
}
// For storing type of Checksum algorithm used
Algorithm algo = 1;
uint64 sum = 2;
}

enum LevelType {
NONOVERLAPPING = 0;
OVERLAPPING = 1;
Expand Down
6 changes: 1 addition & 5 deletions rust/bench/ss_bench/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ mod utils;

use clap::Parser;
use operations::*;
use risingwave_common::config::{parse_checksum_algo, StorageConfig};
use risingwave_common::config::StorageConfig;
use risingwave_storage::hummock::mock::{MockHummockMetaClient, MockHummockMetaService};
use risingwave_storage::monitor::StateStoreMetrics;
use risingwave_storage::{dispatch_state_store, StateStoreImpl};
Expand All @@ -42,9 +42,6 @@ pub(crate) struct Opts {
#[clap(long, default_value_t = 0.1)]
bloom_false_positive: f64,

#[clap(long, default_value = "crc32c")]
checksum_algo: String,

// ----- benchmarks -----
#[clap(long)]
benchmarks: String,
Expand Down Expand Up @@ -123,7 +120,6 @@ async fn main() {

let config = Arc::new(StorageConfig {
bloom_false_positive: opts.bloom_false_positive,
checksum_algo: parse_checksum_algo(&opts.checksum_algo).unwrap(),
sstable_size: opts.table_size_mb * (1 << 20),
block_size: opts.block_size_kb * (1 << 10),
data_directory: "hummock_001".to_string(),
Expand Down
41 changes: 1 addition & 40 deletions rust/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@
use std::fs;
use std::path::PathBuf;

use serde::de::Error;
use serde::{Deserialize, Deserializer, Serialize};
use serde::{Deserialize, Serialize};

use crate::error::ErrorCode::InternalError;
use crate::error::{Result, RwError};
Expand Down Expand Up @@ -104,13 +103,6 @@ pub struct StorageConfig {
#[serde(default = "default::data_directory")]
pub data_directory: String,

/// Checksum algorithm (Crc32c, XxHash64).
#[serde(
default = "default::checksum_algorithm",
deserialize_with = "ser_parse_checksum_algo"
)]
pub checksum_algo: risingwave_pb::hummock::checksum::Algorithm,

/// Whether to enable async checkpoint
#[serde(default = "default::async_checkpoint_enabled")]
pub async_checkpoint_enabled: bool,
Expand All @@ -120,32 +112,6 @@ pub struct StorageConfig {
pub write_conflict_detection_enabled: bool,
}

fn ser_parse_checksum_algo<'de, D>(
deserializer: D,
) -> core::result::Result<risingwave_pb::hummock::checksum::Algorithm, D::Error>
where
D: Deserializer<'de>,
{
let checksum_algo = String::deserialize(deserializer)?;
match parse_checksum_algo(checksum_algo.as_str()) {
Some(algo) => Ok(algo),
_ => Err(D::Error::custom(format!(
"{} is not supported for Hummock",
checksum_algo
))),
}
}

pub fn parse_checksum_algo(
checksum_algo: &str,
) -> Option<risingwave_pb::hummock::checksum::Algorithm> {
match checksum_algo {
"crc32c" => Some(risingwave_pb::hummock::checksum::Algorithm::Crc32c),
"xxhash64" => Some(risingwave_pb::hummock::checksum::Algorithm::XxHash64),
_ => None,
}
}

impl Default for StorageConfig {
fn default() -> Self {
toml::from_str("").unwrap()
Expand Down Expand Up @@ -208,10 +174,6 @@ mod default {
"hummock_001".to_string()
}

pub fn checksum_algorithm() -> risingwave_pb::hummock::checksum::Algorithm {
risingwave_pb::hummock::checksum::Algorithm::XxHash64
}

pub fn async_checkpoint_enabled() -> bool {
true
}
Expand Down Expand Up @@ -258,7 +220,6 @@ mod tests {
default::bloom_false_positive()
);
assert_eq!(cfg.storage.data_directory, "test");
assert_eq!(cfg.storage.checksum_algo, default::checksum_algorithm());
assert!(!cfg.storage.async_checkpoint_enabled);
}
}
1 change: 0 additions & 1 deletion rust/config/risingwave.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,4 @@ sstable_size = 268435456
block_size = 4096
bloom_false_positive = 0.1
data_directory = "hummock_001"
checksum_algo = "crc32c"
async_checkpoint_enabled = true
1 change: 0 additions & 1 deletion rust/meta/src/hummock/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ async fn get_hummock_storage() -> (HummockStorage, Arc<HummockManager<MemStore>>
block_size: 1 << 10,
bloom_false_positive: 0.1,
data_directory: remote_dir.clone(),
checksum_algo: ChecksumAlg::XxHash64,
async_checkpoint_enabled: true,
write_conflict_detection_enabled: true,
});
Expand Down
4 changes: 2 additions & 2 deletions rust/meta/src/hummock/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ use std::time::Duration;
use bytes::Bytes;
use itertools::Itertools;
use risingwave_pb::common::{HostAddress, WorkerNode, WorkerType};
use risingwave_pb::hummock::{HummockVersion, KeyRange, SstableInfo, SstableMeta};
use risingwave_pb::hummock::{HummockVersion, KeyRange, SstableInfo};
use risingwave_storage::hummock::key::key_with_epoch;
use risingwave_storage::hummock::value::HummockValue;
use risingwave_storage::hummock::{
CompressionAlgorithm, HummockContextId, HummockEpoch, HummockSSTableId, SSTableBuilder,
SSTableBuilderOptions,
SSTableBuilderOptions, SstableMeta,
};

use crate::cluster::{ClusterManager, ClusterManagerRef};
Expand Down
4 changes: 1 addition & 3 deletions rust/storage/src/hummock/block_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,7 @@ impl BlockCache {
.await
{
Ok(block) => Ok(block),
Err(arc_e) => {
Err(Arc::try_unwrap(arc_e).map_err(|e| HummockError::Other(e.to_string()))?)
}
Err(e) => Err(HummockError::Other(e.to_string()).into()),
}
}

Expand Down
4 changes: 2 additions & 2 deletions rust/storage/src/hummock/compactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,8 @@ impl Compactor {
.extend(sst.iter().map(|sst| SstableInfo {
id: sst.id,
key_range: Some(risingwave_pb::hummock::KeyRange {
left: sst.meta.get_smallest_key().to_vec(),
right: sst.meta.get_largest_key().to_vec(),
left: sst.meta.smallest_key.clone(),
right: sst.meta.largest_key.clone(),
inf: false,
}),
}));
Expand Down
4 changes: 4 additions & 0 deletions rust/storage/src/hummock/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ use thiserror::Error;

#[derive(Error, Debug)]
pub enum HummockError {
#[error("Magic number mismatch: expected {expected}, found: {found}.")]
MagicMismatch { expected: u32, found: u32 },
#[error("Invalid format version: {0}.")]
InvalidFormatVersion(u32),
#[error("Checksum mismatch: expected {expected}, found: {found}.")]
ChecksumMismatch { expected: u64, found: u64 },
#[error("Invalid block.")]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,8 @@ impl SharedBufferUploader {
.map(|sst| SstableInfo {
id: sst.id,
key_range: Some(risingwave_pb::hummock::KeyRange {
left: sst.meta.get_smallest_key().to_vec(),
right: sst.meta.get_largest_key().to_vec(),
left: sst.meta.smallest_key.clone(),
right: sst.meta.largest_key.clone(),
inf: false,
}),
})
Expand Down
6 changes: 3 additions & 3 deletions rust/storage/src/hummock/sstable/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,12 @@
// limitations under the License.

use bytes::{BufMut, Bytes, BytesMut};
use risingwave_pb::hummock::{BlockMeta, SstableMeta};

use super::bloom::Bloom;
use super::utils::CompressionAlgorithm;
use super::{
BlockBuilder, BlockBuilderOptions, DEFAULT_BLOCK_SIZE, DEFAULT_ENTRY_SIZE,
DEFAULT_RESTART_INTERVAL,
BlockBuilder, BlockBuilderOptions, BlockMeta, SstableMeta, DEFAULT_BLOCK_SIZE,
DEFAULT_ENTRY_SIZE, DEFAULT_RESTART_INTERVAL, VERSION,
};
use crate::hummock::key::user_key;
use crate::hummock::value::HummockValue;
Expand Down Expand Up @@ -155,6 +154,7 @@ impl SSTableBuilder {
key_count: self.key_count as u32,
smallest_key,
largest_key,
version: VERSION,
};

(self.buf.freeze(), meta)
Expand Down
Loading