Commit 1295539

HammadB committed Apr 26, 2024
1 parent b0f57f9 commit 1295539
Showing 8 changed files with 95 additions and 27 deletions.
4 changes: 2 additions & 2 deletions rust/worker/src/blockstore/arrow/provider.rs
@@ -232,8 +232,8 @@ impl BlockManager {
      Ok(_) => {
          println!("Block written to storage")
      }
-     Err(_) => {
-         println!("Error writing block to storage");
+     Err(e) => {
+         println!("Error writing block to storage {}", e);
      }
  }
  // TODO: error handling
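Binding the error as Err(e) lets the log line carry the underlying cause. The remaining // TODO suggests the error should eventually be propagated rather than printed; a minimal sketch of that follow-up, using String as a stand-in for the storage layer's actual error type:

    // Hypothetical follow-up sketch: surface the write error to the caller
    // instead of only printing it. String stands in for the real error type.
    fn flush_result(res: Result<(), String>) -> Result<(), String> {
        match res {
            Ok(_) => {
                println!("Block written to storage");
                Ok(())
            }
            Err(e) => Err(format!("Error writing block to storage {}", e)),
        }
    }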
58 changes: 52 additions & 6 deletions rust/worker/src/compactor/compaction_manager.rs
@@ -287,6 +287,8 @@ impl Handler<Memberlist> for CompactionManager {

  #[cfg(test)]
  mod tests {
+     use std::collections::HashMap;
+
      use super::*;
      use crate::assignment::assignment_policy::AssignmentPolicy;
      use crate::assignment::assignment_policy::RendezvousHashingAssignmentPolicy;
@@ -299,9 +301,11 @@ mod tests {
      use crate::types::LogRecord;
      use crate::types::Operation;
      use crate::types::OperationRecord;
+     use crate::types::Segment;

      #[tokio::test]
      async fn test_compaction_manager() {
+         println!("Running test_compaction_manager");
          let mut log = Box::new(InMemoryLog::new());
          let tmpdir = tempfile::tempdir().unwrap();
          let storage = Box::new(Storage::Local(LocalStorage::new(
@@ -313,13 +317,13 @@
              collection_uuid_1.clone(),
              Box::new(InternalLogRecord {
                  collection_id: collection_uuid_1.clone(),
-                 log_offset: 1,
+                 log_offset: 0,
                  log_ts: 1,
                  record: LogRecord {
-                     log_offset: 1,
+                     log_offset: 0,
                      record: OperationRecord {
                          id: "embedding_id_1".to_string(),
-                         embedding: None,
+                         embedding: Some(vec![1.0, 2.0, 3.0]),
                          encoding: None,
                          metadata: None,
                          operation: Operation::Add,
@@ -333,13 +337,13 @@
              collection_uuid_2.clone(),
              Box::new(InternalLogRecord {
                  collection_id: collection_uuid_2.clone(),
-                 log_offset: 2,
+                 log_offset: 0,
                  log_ts: 2,
                  record: LogRecord {
-                     log_offset: 2,
+                     log_offset: 0,
                      record: OperationRecord {
                          id: "embedding_id_2".to_string(),
-                         embedding: None,
+                         embedding: Some(vec![4.0, 5.0, 6.0]),
                          encoding: None,
                          metadata: None,
                          operation: Operation::Add,
@@ -376,6 +380,47 @@
          sysdb.add_collection(collection_1);
          sysdb.add_collection(collection_2);

+         let collection_1_record_segment = Segment {
+             id: Uuid::new_v4(),
+             r#type: crate::types::SegmentType::Record,
+             scope: crate::types::SegmentScope::RECORD,
+             collection: Some(collection_uuid_1),
+             metadata: None,
+             file_path: HashMap::new(),
+         };
+
+         let collection_2_record_segment = Segment {
+             id: Uuid::new_v4(),
+             r#type: crate::types::SegmentType::Record,
+             scope: crate::types::SegmentScope::RECORD,
+             collection: Some(collection_uuid_2),
+             metadata: None,
+             file_path: HashMap::new(),
+         };
+
+         let collection_1_hnsw_segment = Segment {
+             id: Uuid::new_v4(),
+             r#type: crate::types::SegmentType::HnswDistributed,
+             scope: crate::types::SegmentScope::VECTOR,
+             collection: Some(collection_uuid_1),
+             metadata: None,
+             file_path: HashMap::new(),
+         };
+
+         let collection_2_hnsw_segment = Segment {
+             id: Uuid::new_v4(),
+             r#type: crate::types::SegmentType::HnswDistributed,
+             scope: crate::types::SegmentScope::VECTOR,
+             collection: Some(collection_uuid_2),
+             metadata: None,
+             file_path: HashMap::new(),
+         };
+
+         sysdb.add_segment(collection_1_record_segment);
+         sysdb.add_segment(collection_2_record_segment);
+         sysdb.add_segment(collection_1_hnsw_segment);
+         sysdb.add_segment(collection_2_hnsw_segment);
+
          let last_compaction_time_1 = 2;
          sysdb.add_tenant_last_compaction_time(tenant_1, last_compaction_time_1);
          let last_compaction_time_2 = 1;
@@ -418,6 +463,7 @@
          let dispatcher_handle = system.start_component(dispatcher);
          manager.set_dispatcher(dispatcher_handle.receiver());
          manager.set_system(system);
+         println!("Starting compaction manager");
          let (num_completed, number_failed) = manager.compact_batch().await;
          assert_eq!(num_completed, 2);
          assert_eq!(number_failed, 0);
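The four Segment literals added above differ only in their type, scope, and collection; if the pattern recurs in other tests, a small helper could cut the repetition. A sketch against the same crate-internal types (not part of the commit):

    // Hypothetical helper: build the record + HNSW segment pair for a collection.
    fn test_segments_for(collection: Uuid) -> (Segment, Segment) {
        let record_segment = Segment {
            id: Uuid::new_v4(),
            r#type: crate::types::SegmentType::Record,
            scope: crate::types::SegmentScope::RECORD,
            collection: Some(collection),
            metadata: None,
            file_path: HashMap::new(),
        };
        let hnsw_segment = Segment {
            id: Uuid::new_v4(),
            r#type: crate::types::SegmentType::HnswDistributed,
            scope: crate::types::SegmentScope::VECTOR,
            collection: Some(collection),
            metadata: None,
            file_path: HashMap::new(),
        };
        (record_segment, hnsw_segment)
    }

Usage would be: let (rec, hnsw) = test_segments_for(collection_uuid_1); sysdb.add_segment(rec); sysdb.add_segment(hnsw);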
8 changes: 4 additions & 4 deletions rust/worker/src/compactor/scheduler.rs
@@ -194,10 +194,10 @@ mod tests {
              collection_uuid_1.clone(),
              Box::new(InternalLogRecord {
                  collection_id: collection_uuid_1.clone(),
-                 log_offset: 1,
+                 log_offset: 0,
                  log_ts: 1,
                  record: LogRecord {
-                     log_offset: 1,
+                     log_offset: 0,
                      record: OperationRecord {
                          id: "embedding_id_1".to_string(),
                          embedding: None,
@@ -214,10 +214,10 @@
              collection_uuid_2.clone(),
              Box::new(InternalLogRecord {
                  collection_id: collection_uuid_2.clone(),
-                 log_offset: 2,
+                 log_offset: 0,
                  log_ts: 2,
                  record: LogRecord {
-                     log_offset: 2,
+                     log_offset: 0,
                      record: OperationRecord {
                          id: "embedding_id_2".to_string(),
                          embedding: None,
8 changes: 4 additions & 4 deletions rust/worker/src/execution/operators/pull_log.rs
@@ -163,10 +163,10 @@ mod tests {
              collection_uuid_1.clone(),
              Box::new(InternalLogRecord {
                  collection_id: collection_uuid_1.clone(),
-                 log_offset: 1,
+                 log_offset: 0,
                  log_ts: 1,
                  record: LogRecord {
-                     log_offset: 1,
+                     log_offset: 0,
                      record: OperationRecord {
                          id: "embedding_id_1".to_string(),
                          embedding: None,
@@ -181,10 +181,10 @@
              collection_uuid_1.clone(),
              Box::new(InternalLogRecord {
                  collection_id: collection_uuid_1.clone(),
-                 log_offset: 2,
+                 log_offset: 1,
                  log_ts: 2,
                  record: LogRecord {
-                     log_offset: 2,
+                     log_offset: 1,
                      record: OperationRecord {
                          id: "embedding_id_2".to_string(),
                          embedding: None,
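Unlike the scheduler test above, both records here belong to collection_uuid_1, so their offsets run 0 then 1 rather than each starting at 0: offsets are zero-based and dense within a collection. A self-contained check of that invariant (sketch, not from the commit):

    // Offsets within a single collection must be 0, 1, 2, ... in insertion order.
    #[test]
    fn offsets_are_dense_per_collection() {
        fn dense(offsets: &[i64]) -> bool {
            offsets.iter().enumerate().all(|(i, &o)| o == i as i64)
        }
        assert!(dense(&[0, 1])); // the new numbering used in this test
        assert!(!dense(&[1, 2])); // the old numbering, now rejected by InMemoryLog::add_log
    }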
1 change: 1 addition & 0 deletions rust/worker/src/execution/orchestration/compact.rs
@@ -438,6 +438,7 @@ impl Handler<PullLogsResult> for CompactOrchestrator {
                  return;
              }
          };
+         println!("Pulled Records: {:?}", records.len());
          let final_record_pulled = records.get(records.len() - 1);
          match final_record_pulled {
              Some(record) => {
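A caveat on the surrounding context rather than the diff itself: records.get(records.len() - 1) underflows when records is empty, panicking in debug builds. slice::last() expresses the same lookup safely; a sketch (not part of this commit):

    // Safer equivalent of records.get(records.len() - 1):
    // last() yields None on an empty slice instead of underflowing.
    let records: Vec<i32> = Vec::new();
    match records.last() {
        Some(record) => println!("final record pulled: {:?}", record),
        None => println!("no records pulled"),
    }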
19 changes: 14 additions & 5 deletions rust/worker/src/log/log.rs
@@ -292,23 +292,32 @@ impl Debug for InternalLogRecord {
  // This is used for testing only
  #[derive(Clone, Debug)]
  pub(crate) struct InMemoryLog {
-     logs: HashMap<String, Vec<Box<InternalLogRecord>>>,
+     collection_to_log: HashMap<String, Vec<Box<InternalLogRecord>>>,
      offsets: HashMap<String, i64>,
  }

  impl InMemoryLog {
      pub fn new() -> InMemoryLog {
          InMemoryLog {
-             logs: HashMap::new(),
+             collection_to_log: HashMap::new(),
              offsets: HashMap::new(),
          }
      }

      pub fn add_log(&mut self, collection_id: Uuid, log: Box<InternalLogRecord>) {
          let logs = self
-             .logs
+             .collection_to_log
              .entry(collection_id.to_string())
              .or_insert(Vec::new());
+         // Ensure that the log offset is correct. Since we only use the InMemoryLog for testing,
+         // we expect callers to send us logs in the correct order.
+         let next_offset = logs.len() as i64;
+         if log.log_offset != next_offset {
+             panic!(
+                 "Expected log offset to be {}, but got {}",
+                 next_offset, log.log_offset
+             );
+         }
          logs.push(log);
      }
  }
@@ -327,7 +336,7 @@ impl Log for InMemoryLog {
              None => i64::MAX,
          };

-         let logs = match self.logs.get(&collection_id.to_string()) {
+         let logs = match self.collection_to_log.get(&collection_id.to_string()) {
              Some(logs) => logs,
              None => return Ok(Vec::new()),
          };
@@ -344,7 +353,7 @@
          &mut self,
      ) -> Result<Vec<CollectionInfo>, GetCollectionsWithNewDataError> {
          let mut collections = Vec::new();
-         for (collection_id, log_records) in self.logs.iter() {
+         for (collection_id, log_records) in self.collection_to_log.iter() {
              if log_records.is_empty() {
                  continue;
              }
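With the new check in add_log, offsets must arrive dense and zero-based per collection, which is why the tests above were renumbered. A usage sketch reusing the record types shown in the test diffs:

    // The first record for a collection must carry offset 0 (logs.len() == 0);
    // the next must carry 1, and so on, or add_log panics.
    let mut log = InMemoryLog::new();
    let collection_uuid = Uuid::new_v4();
    log.add_log(
        collection_uuid,
        Box::new(InternalLogRecord {
            collection_id: collection_uuid,
            log_offset: 0,
            log_ts: 1,
            record: LogRecord {
                log_offset: 0,
                record: OperationRecord {
                    id: "embedding_id_1".to_string(),
                    embedding: Some(vec![1.0, 2.0, 3.0]),
                    encoding: None,
                    metadata: None,
                    operation: Operation::Add,
                },
            },
        }),
    );
    // A second add_log for this collection with log_offset 2 would panic:
    // "Expected log offset to be 1, but got 2".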
17 changes: 13 additions & 4 deletions rust/worker/src/segment/record_segment.rs
@@ -321,14 +321,23 @@ impl LogMaterializer for RecordSegmentWriter {
          let next_offset_id = self
              .curr_max_offset_id
              .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
-         let metadata = log_entry.record.metadata.as_ref().unwrap();
-         // TODO: don't unwrap
-         let metadata = update_metdata_to_metdata(metadata).unwrap();
+         let metadata = match &log_entry.record.metadata {
+             Some(metadata) => match update_metdata_to_metdata(&metadata) {
+                 Ok(metadata) => Some(metadata),
+                 Err(e) => {
+                     // TODO: this should error out and return an error
+                     panic!("Error converting metadata: {}", e);
+                 }
+             },
+             None => None,
+         };
          let data_record = DataRecord {
              id: &log_entry.record.id,
+             // TODO: don't unwrap here, it should never happen as Adds always have embeddings
+             // but we should handle this gracefully
              embedding: log_entry.record.embedding.as_ref().unwrap(),
              document: None, // TODO: document
-             metadata: Some(metadata),
+             metadata: metadata,
          };
          let materialized =
              MaterializedLogRecord::new(next_offset_id, log_entry, data_record);
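The nested match above implements "convert if present, pass None through, fail loudly on a bad value". The same shape can be written with Option::map plus transpose(); a standalone sketch where parse_meta is a hypothetical stand-in for update_metdata_to_metdata:

    // parse_meta stands in for update_metdata_to_metdata (hypothetical).
    fn parse_meta(raw: &str) -> Result<u32, std::num::ParseIntError> {
        raw.parse()
    }

    fn convert(opt: Option<&str>) -> Option<u32> {
        // Option<Result<_, _>> -> Result<Option<_>, _> via transpose()
        match opt.map(parse_meta).transpose() {
            Ok(converted) => converted,
            Err(e) => panic!("Error converting metadata: {}", e), // TODO: return an error
        }
    }

Here convert(Some("42")) yields Some(42), convert(None) yields None, and a non-numeric input panics with the same message as the code above.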
7 changes: 5 additions & 2 deletions rust/worker/src/storage/local.rs
@@ -11,6 +11,7 @@ pub(crate) struct LocalStorage {

  impl LocalStorage {
      pub(crate) fn new(root: &str) -> LocalStorage {
+         // Create the local storage with the root path.
          return LocalStorage {
              root: root.to_string(),
          };
@@ -20,7 +21,6 @@ impl LocalStorage {
          &self,
          key: &str,
      ) -> Result<Box<dyn AsyncBufRead + Unpin + Send>, String> {
-         // Checks if a file exits with the key. If it does, it copies the file to the path.
          let file_path = format!("{}/{}", self.root, key);
          let file = tokio::fs::File::open(file_path).await;
          match file {
@@ -35,10 +35,13 @@

      pub(crate) async fn put_bytes(&self, key: &str, bytes: &[u8]) -> Result<(), String> {
          let path = format!("{}/{}", self.root, key);
+         // Create the path if it doesn't exist, we unwrap since this should only be used in tests
+         let as_path = std::path::Path::new(&path);
+         let parent = as_path.parent().unwrap();
+         tokio::fs::create_dir_all(parent).await.unwrap();
          let res = tokio::fs::write(&path, bytes).await;
          match res {
              Ok(_) => {
                  println!("copied file {} to {}", path, key);
                  return Ok(());
              }
              Err(e) => {
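Because put_bytes now creates intermediate directories, keys may contain path separators. A usage sketch, to be run inside an async test (tempfile is already used by the tests above):

    let tmpdir = tempfile::tempdir().unwrap();
    let storage = LocalStorage::new(tmpdir.path().to_str().unwrap());
    // The "blocks/" directory does not exist yet; put_bytes creates it.
    storage.put_bytes("blocks/block-1.bin", b"payload").await.unwrap();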
