diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index c872489b2fe..e2bf17ad68a 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,4 +1,4 @@ -exclude: 'chromadb/proto/(chroma_pb2|coordinator_pb2|logservice_pb2)\.(py|pyi|_grpc\.py)' # Generated files +exclude: 'chromadb/proto/(chroma_pb2|coordinator_pb2|logservice_pb2|chroma_pb2_grpc|coordinator_pb2_grpc|logservice_pb2_grpc)\.(py|pyi)' # Generated files repos: - repo: https://github.com/pre-commit/pre-commit-hooks rev: v4.5.0 diff --git a/chromadb/db/impl/grpc/client.py b/chromadb/db/impl/grpc/client.py index 5bf98d7c619..88693921346 100644 --- a/chromadb/db/impl/grpc/client.py +++ b/chromadb/db/impl/grpc/client.py @@ -153,10 +153,7 @@ def get_segments( collection=collection.hex if collection else None, ) response = self._sys_db_stub.GetSegments(request) - print("GRPC RESPONSE: ", response) - print("SEGMENTS LENGTH: ", len(response.segments)) results: List[Segment] = [] - print("GRPC SEGMENTS: ", response.segments) for proto_segment in response.segments: segment = from_proto_segment(proto_segment) results.append(segment) diff --git a/k8s/distributed-chroma/values.yaml b/k8s/distributed-chroma/values.yaml index 8c4002c610a..c5f1f0d2023 100644 --- a/k8s/distributed-chroma/values.yaml +++ b/k8s/distributed-chroma/values.yaml @@ -24,7 +24,7 @@ frontendService: authCredentialsProvider: 'value: ""' authzProvider: 'value: ""' authzConfigProvider: 'value: ""' - memberlistProviderImpl: 'value: "chromadb.segment.impl.distributed.segment_directory.CustomResourceMemberlistProvider"' + memberlistProviderImpl: 'value: "chromadb.segment.impl.distributed.segment_directory.MockMemberlistProvider"' logServiceHost: 'value: "logservice.chroma"' logServicePort: 'value: "50051"' otherEnvConfig: '' diff --git a/rust/worker/chroma_config.yaml b/rust/worker/chroma_config.yaml index 91c5291d640..95eb1924932 100644 --- a/rust/worker/chroma_config.yaml +++ b/rust/worker/chroma_config.yaml @@ -61,4 +61,4 @@ compaction_service: compactor: compaction_manager_queue_size: 1000 max_concurrent_jobs: 100 - compaction_interval_sec: 5 + compaction_interval_sec: 60 diff --git a/rust/worker/src/blockstore/mod.rs b/rust/worker/src/blockstore/mod.rs index 4439ed08148..6a9dacd4565 100644 --- a/rust/worker/src/blockstore/mod.rs +++ b/rust/worker/src/blockstore/mod.rs @@ -2,7 +2,6 @@ mod arrow; mod key; mod memory; mod positional_posting_list_value; -mod prototyping; mod types; pub(crate) mod provider; diff --git a/rust/worker/src/blockstore/prototyping.rs b/rust/worker/src/blockstore/prototyping.rs deleted file mode 100644 index 89521c4d021..00000000000 --- a/rust/worker/src/blockstore/prototyping.rs +++ /dev/null @@ -1,368 +0,0 @@ -use parking_lot::RwLock; -use std::collections::HashMap; -use std::sync::Arc; - -// ===== Value Types ===== -// Example of a value type that references other data. (This would be data in the block) -struct NestedReferences<'value> { - id: &'value [i32], - value: &'value [i32], -} - -trait Value { - type Writeable<'writeable>: WriteableValue; - type Readable<'referred_data>: ReadableValue<'referred_data>; -} - -// Used for dynamic dispatch to get the type of the value. -// referred_data is the lifetime of the data that the value references (if any). -trait ReadableValue<'referred_data> { - fn read_from_block(block: &'referred_data Block) -> Self; -} - -trait WriteableValue { - fn write_to_block(key: &str, value: &Self, block: &BlockBuilder); -} - -// ===== Value Implementations ===== -impl<'referred_data> ReadableValue<'referred_data> for NestedReferences<'referred_data> { - fn read_from_block(block: &'referred_data Block) -> Self { - let id = block.id_storage.get("key").unwrap(); - let value = block.value_storage.get("key").unwrap(); - NestedReferences { id, value } - } -} - -impl<'referred_data> ReadableValue<'referred_data> for &'referred_data String { - fn read_from_block(block: &'referred_data Block) -> Self { - block.string_storage.get("key").unwrap() - } -} - -impl WriteableValue for NestedReferences<'_> { - fn write_to_block(key: &str, value: &Self, block: &BlockBuilder) { - block - .id_storage - .write() - .as_mut() - .unwrap() - .insert(key.to_string(), value.id.to_vec()); - block - .value_storage - .write() - .as_mut() - .unwrap() - .insert(key.to_string(), value.value.to_vec()); - } -} - -impl WriteableValue for String { - fn write_to_block(key: &str, value: &Self, block: &BlockBuilder) { - block - .string_storage - .write() - .as_mut() - .unwrap() - .insert(key.to_string(), value.clone()); - } -} - -// ===== Block Provider & Cache ===== -// Thread-safe block cache and manager. -// This just loads blocks from the cache, but you could imagine it faults to s3 -#[derive(Clone)] -struct BlockManager { - read_block_cache: Arc>>, - write_block_builder_cache: Arc>>, -} - -impl BlockManager { - fn new() -> Self { - Self { - read_block_cache: Arc::new(RwLock::new(HashMap::new())), - write_block_builder_cache: Arc::new(RwLock::new(HashMap::new())), - } - } - - fn get(&self, id: &uuid::Uuid) -> Option { - let cache_guard = self.read_block_cache.read(); - let block = cache_guard.get(id)?.clone(); - Some(block) - } - - fn create(&mut self) -> BlockBuilder { - let builder = BlockBuilder::new(uuid::Uuid::new_v4()); - self.write_block_builder_cache - .write() - .insert(builder.id, builder.clone()); - builder - } - - fn build(&mut self, id: uuid::Uuid) -> Block { - let builder = self.write_block_builder_cache.write().remove(&id).unwrap(); - let block = builder.build(); - self.read_block_cache - .write() - .insert(block.id, block.clone()); - block - } -} - -// ===== Sparse Index ===== -#[derive(Clone)] -struct SparseIndex { - storage: Vec, -} - -impl SparseIndex { - fn new() -> Self { - Self { - storage: Vec::new(), - } - } -} - -// ===== Reader & Writer ===== -// Reader is a non-thread-safe reader that reads from the block store. -// Each thread can create its own reader, the blockmanager will handle the thread-safety -// of the block cache. -struct Reader { - sparse_index: SparseIndex, - manager: BlockManager, - // Used to keep a reference to the block, since we need to keep the block alive while we return values that reference it. - loaded_blocks: HashMap, - marker: std::marker::PhantomData, -} - -trait ReaderTrait<'me, V> { - fn get(&'me mut self, key: &str) -> Option; -} - -impl<'me, V: ReadableValue<'me>> ReaderTrait<'me, V> for Reader { - fn get(&'me mut self, key: &str) -> Option { - self.get(key) - } -} - -enum ReaderEnum { - Reader(Reader), -} - -impl<'me, V: ReadableValue<'me>> ReaderEnum { - fn get(&'me mut self, key: &str) -> Option { - match self { - ReaderEnum::Reader(reader) => reader.get(key), - } - } -} - -impl<'me, V> Reader -where - V: ReadableValue<'me>, -{ - fn from_sparse_index(sparse_index: SparseIndex, manager: BlockManager) -> Self { - Self { - sparse_index: sparse_index, - manager, - loaded_blocks: HashMap::new(), - marker: std::marker::PhantomData, - } - } - - fn get(&'me mut self, key: &str) -> Option { - // Iterate over the blocks in reverse order to get the most recent value. - // Since this implementation does not delete values from old blocks, the first value found is the most recent. - for block_id in self.sparse_index.storage.iter().rev() { - if !self.loaded_blocks.contains_key(block_id) { - let block = self.manager.get(block_id)?; - self.loaded_blocks.insert(*block_id, block); - } - } - // This double for loop is to make the borrow-checker happy by scoping the mutable borrow of self. - // It's not pretty, but it works. - for block_id in self.sparse_index.storage.iter().rev() { - let block = self.loaded_blocks.get(block_id).unwrap(); - let value = V::read_from_block(block); - return Some(value); - } - None - } -} - -struct Writer { - sparse_index: SparseIndex, - active_block: Option, - block_manager: BlockManager, -} - -impl Writer { - fn new(mut block_manager: BlockManager) -> Self { - let sparse_index = SparseIndex::new(); - let new_block = block_manager.create(); - Self { - sparse_index: sparse_index, - active_block: Some(new_block), - block_manager: block_manager, - } - } - - fn from_sparse_index(sparse_index: SparseIndex, mut block_manager: BlockManager) -> Self { - let new_block = block_manager.create(); - Self { - sparse_index: sparse_index, - active_block: Some(new_block), - block_manager: block_manager, - } - } - - fn store(&self, key: String, value: &V) { - V::write_to_block(&key, value, self.active_block.as_ref().unwrap()); - } - - fn commit(mut self) -> SparseIndex { - // Concretize the active block - let read_only_block = self - .block_manager - .build(self.active_block.as_ref().unwrap().id); - self.sparse_index.storage.push(read_only_block.id); - self.sparse_index - - // TODO: remove the write block when the writer is dropped. This will prevent leaking builders. - } -} - -struct HoldsWriter { - writer: Writer, -} -// ===== Block Implementations ===== -#[derive(Clone)] -struct Block { - // This pretends to be a RecordBatch in Arrow. - // Storage for NestedReferences - id_storage: Arc>>, - value_storage: Arc>>, - // Storage for String - string_storage: Arc>, - // Storage for the block id. - id: uuid::Uuid, -} - -// Mock BlockBuilder - it pretends like its arrow by allowing it to store ANY value type. -#[derive(Clone)] -struct BlockBuilder { - // Storage for NestedReferences - id_storage: Arc>>>>, - value_storage: Arc>>>>, - // Storage for String - string_storage: Arc>>>, - // Storage for the block id. - id: uuid::Uuid, -} - -impl BlockBuilder { - fn new(id: uuid::Uuid) -> Self { - Self { - id_storage: Arc::new(RwLock::new(Some(HashMap::new()))), - value_storage: Arc::new(RwLock::new(Some(HashMap::new()))), - string_storage: Arc::new(RwLock::new(Some(HashMap::new()))), - id, - } - } - - fn build(self) -> Block { - Block { - id_storage: self.id_storage.write().take().unwrap().into(), - value_storage: self.value_storage.write().take().unwrap().into(), - string_storage: self.string_storage.write().take().unwrap().into(), - id: self.id, - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_self_reference_map() { - let block_manager = BlockManager::new(); - let writer = Writer::new(block_manager.clone()); - let nested_references = NestedReferences { - id: &[1, 2, 3], - value: &[4, 5, 6], - }; - writer.store("key".to_string(), &nested_references); - let writer_sparse_index = writer.commit(); - - let mut reader = - Reader::::from_sparse_index(writer_sparse_index, block_manager); - - let value = reader.get("key").unwrap(); - assert_eq!(value.id, &[1, 2, 3]); - assert_eq!(value.value, &[4, 5, 6]); - } - - #[test] - fn test_string_map() { - let block_manager = BlockManager::new(); - let writer = Writer::new(block_manager.clone()); - let string = "value".to_string(); - writer.store("key".to_string(), &string); - let writer_sparse_index = writer.commit(); - - let mut reader = Reader::<&String>::from_sparse_index(writer_sparse_index, block_manager); - - let value = reader.get("key").unwrap(); - assert_eq!(value, "value"); - } - - #[test] - fn test_non_static_self_reference_map() { - let mut id_data_src = Vec::new(); - let mut value_data_src = Vec::new(); - - for i in 0..10 { - id_data_src.push(i); - value_data_src.push(i + 10); - } - let id_data_src = id_data_src.as_slice(); - let value_data_src = value_data_src.as_slice(); - - fn combine<'value>(id: &'value [i32], value: &'value [i32]) -> NestedReferences<'value> { - NestedReferences { id, value } - } - let combined = combine(id_data_src, value_data_src); - - let block_manager = BlockManager::new(); - let writer = Writer::new(block_manager.clone()); - writer.store("key".to_string(), &combined); - let writer_sparse_index = writer.commit(); - - let mut reader = - Reader::::from_sparse_index(writer_sparse_index, block_manager); - - let value = reader.get("key").unwrap(); - assert_eq!(value.id, id_data_src); - assert_eq!(value.value, value_data_src); - } - - #[test] - fn test_reader_trait() { - let block_manager = BlockManager::new(); - let writer = Writer::new(block_manager.clone()); - let string = "value".to_string(); - writer.store("key".to_string(), &string); - let writer_sparse_index = writer.commit(); - - // let mut reader = Reader::<&String>::from_sparse_index(writer_sparse_index, block_manager); - // let mut reader: Box> = Box::new(reader); - let mut reader = ReaderEnum::Reader(Reader::<&String>::from_sparse_index( - writer_sparse_index, - block_manager, - )); - - // RESUME: This is where the issue is. The compiler can't figure out the lifetime of the reader. - let value = reader.get("key").unwrap(); - assert_eq!(value, "value"); - } -} diff --git a/rust/worker/src/compactor/compaction_manager.rs b/rust/worker/src/compactor/compaction_manager.rs index 63681c162b9..cd2b842c7ab 100644 --- a/rust/worker/src/compactor/compaction_manager.rs +++ b/rust/worker/src/compactor/compaction_manager.rs @@ -305,7 +305,6 @@ mod tests { #[tokio::test] async fn test_compaction_manager() { - println!("Running test_compaction_manager"); let mut log = Box::new(InMemoryLog::new()); let tmpdir = tempfile::tempdir().unwrap(); let storage = Box::new(Storage::Local(LocalStorage::new( @@ -463,7 +462,6 @@ mod tests { let dispatcher_handle = system.start_component(dispatcher); manager.set_dispatcher(dispatcher_handle.receiver()); manager.set_system(system); - println!("Starting compaction manager"); let (num_completed, number_failed) = manager.compact_batch().await; assert_eq!(num_completed, 2); assert_eq!(number_failed, 0); diff --git a/rust/worker/src/compactor/scheduler.rs b/rust/worker/src/compactor/scheduler.rs index ed0a7350a86..6fa1bd95308 100644 --- a/rust/worker/src/compactor/scheduler.rs +++ b/rust/worker/src/compactor/scheduler.rs @@ -147,7 +147,7 @@ impl Scheduler { } pub(crate) async fn schedule(&mut self) { - // TODO: I ADDED THIS CLEAR, ASK LIQUAN IF IT'S OK + // For now, we clear the job queue every time, assuming we will not have any pending jobs running self.job_queue.clear(); if self.memberlist.is_none() || self.memberlist.as_ref().unwrap().is_empty() { // TODO: Log error diff --git a/rust/worker/src/compactor/scheduler_policy.rs b/rust/worker/src/compactor/scheduler_policy.rs index 09b5e157a6d..8553a39cea2 100644 --- a/rust/worker/src/compactor/scheduler_policy.rs +++ b/rust/worker/src/compactor/scheduler_policy.rs @@ -34,7 +34,6 @@ impl SchedulerPolicy for LasCompactionTimeSchedulerPolicy { collections: Vec, number_jobs: i32, ) -> Vec { - println!("POLICY SEES N COLLECTIONS: {:?}", collections.len()); let mut collections = collections; collections.sort_by(|a, b| a.last_compaction_time.cmp(&b.last_compaction_time)); let number_tasks = if number_jobs > collections.len() as i32 { @@ -42,7 +41,6 @@ impl SchedulerPolicy for LasCompactionTimeSchedulerPolicy { } else { number_jobs }; - println!("POLICY CREATING N TASKS: {:?}", number_tasks); let mut tasks = Vec::new(); for collection in &collections[0..number_tasks as usize] { tasks.push(CompactionJob { diff --git a/rust/worker/test.arrow b/rust/worker/test.arrow deleted file mode 100644 index 3f01356b515..00000000000 Binary files a/rust/worker/test.arrow and /dev/null differ