Skip to content

Commit

Permalink
[CLN] Flush error propagation and debug (#2131)
Browse files Browse the repository at this point in the history
## Description of changes

*Summarize the changes made by this PR.*
 - Improvements & Bug fixes
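	 - Improve error propagation and debug output for flush: block and sparse-index flush failures are now returned to the caller as errors instead of being ignored or causing a panic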
 - New functionality
	 - None

## Test plan
*How are these changes tested?*
- [x] Tests pass locally with `pytest` for Python, `yarn test` for JS, and
`cargo test` for Rust

## Documentation Changes
None
  • Loading branch information
HammadB authored May 3, 2024
1 parent 9a27d16 commit eb5f3e2
Show file tree
Hide file tree
Showing 5 changed files with 196 additions and 87 deletions.
14 changes: 6 additions & 8 deletions rust/worker/src/blockstore/arrow/flusher.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
use crate::errors::ChromaError;

use super::{
provider::{BlockManager, SparseIndexManager},
sparse_index::SparseIndex,
types::{ArrowWriteableKey, ArrowWriteableValue},
};
use std::collections::{HashMap, HashSet};
use crate::errors::ChromaError;
use std::collections::HashSet;
use uuid::Uuid;

pub(crate) struct ArrowBlockfileFlusher {
Expand Down Expand Up @@ -37,14 +36,13 @@ impl ArrowBlockfileFlusher {
pub(crate) async fn flush<K: ArrowWriteableKey, V: ArrowWriteableValue>(
self,
) -> Result<(), Box<dyn ChromaError>> {
// TODO: We could flush in parallel
for delta_id in self.modified_delta_ids {
self.block_manager.flush(&delta_id).await;
self.block_manager.flush(&delta_id).await?
}
// TODO: catch errors from the flush
let res = self
.sparse_index_manager
self.sparse_index_manager
.flush::<K>(&self.sparse_index.id)
.await;
.await?;
Ok(())
}

Expand Down
101 changes: 56 additions & 45 deletions rust/worker/src/blockstore/arrow/provider.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,24 @@
use super::{
block::{self, delta::BlockDelta, Block},
blockfile::{self, ArrowBlockfileReader, ArrowBlockfileWriter},
sparse_index::{self, SparseIndex},
block::{delta::BlockDelta, Block},
blockfile::{ArrowBlockfileReader, ArrowBlockfileWriter},
sparse_index::SparseIndex,
types::{ArrowReadableKey, ArrowReadableValue, ArrowWriteableKey, ArrowWriteableValue},
};
use crate::{
blockstore::{
key::KeyWrapper,
memory::storage::Readable,
provider::{BlockfileProvider, CreateError, OpenError},
provider::{CreateError, OpenError},
BlockfileReader, BlockfileWriter, Key, Value,
},
errors::ChromaError,
errors::{ChromaError, ErrorCodes},
storage::Storage,
};
use core::panic;
use parking_lot::{Mutex, RwLock};
use parking_lot::RwLock;
use std::{collections::HashMap, sync::Arc};
use tokio::{io::AsyncReadExt, pin};
use thiserror::Error;
use tokio::io::AsyncReadExt;
use uuid::Uuid;

/// A BlockFileProvider that creates ArrowBlockfiles (Arrow-backed blockfiles used for production).
Expand Down Expand Up @@ -192,35 +193,9 @@ impl BlockManager {
}
}
}

// match cache.get(id) {
// Some(block) => Some(block.clone()),
// None => {
// let key = format!("block/{}", id);
// let bytes = self.storage.get(&key).await;
// match bytes {
// Ok(mut bytes) => {
// let mut buf: Vec<u8> = Vec::new();
// bytes.read_to_end(&mut buf);
// let block = Block::from_bytes(&buf);
// match block {
// Ok(block) => {
// self.read_cache.write().insert(*id, block.clone());
// Some(block)
// }
// Err(_) => {
// // TODO: log error
// None
// }
// }
// }
// Err(_) => None,
// }
// }
// }
}

pub(super) async fn flush(&self, id: &Uuid) {
pub(super) async fn flush(&self, id: &Uuid) -> Result<(), Box<dyn ChromaError>> {
let block = self.get(id).await;

match block {
Expand All @@ -230,15 +205,32 @@ impl BlockManager {
let res = self.storage.put_bytes(&key, bytes).await;
match res {
Ok(_) => {
println!("Block written to storage")
println!("Block: {} written to storage", id);
Ok(())
}
Err(e) => {
println!("Error writing block to storage {}", e);
Err(Box::new(e))
}
}
// TODO: error handling
}
None => {}
None => {
return Err(Box::new(BlockFlushError::NotFound));
}
}
}
}

#[derive(Error, Debug)]
pub enum BlockFlushError {
#[error("Not found")]
NotFound,
}

impl ChromaError for BlockFlushError {
    /// Returns the error code associated with this block flush failure.
    fn code(&self) -> ErrorCodes {
        // Kept as an exhaustive match so that adding a variant forces a
        // decision about its error code.
        match self {
            Self::NotFound => ErrorCodes::NotFound,
        }
    }
}
Expand Down Expand Up @@ -332,7 +324,10 @@ impl SparseIndexManager {
self.cache.write().insert(index.id, index);
}

pub async fn flush<'read, K: ArrowWriteableKey + 'read>(&self, id: &Uuid) {
pub async fn flush<'read, K: ArrowWriteableKey + 'read>(
&self,
id: &Uuid,
) -> Result<(), Box<dyn ChromaError>> {
let index = self.get::<K::ReadableKey<'read>>(id).await;
match index {
Some(index) => {
Expand All @@ -344,22 +339,24 @@ impl SparseIndexManager {
let res = self.storage.put_bytes(&key, bytes).await;
match res {
Ok(_) => {
println!("Sparse index written to storage")
println!("Sparse index written to storage");
Ok(())
}
Err(_) => {
Err(e) => {
println!("Error writing sparse index to storage");
Err(Box::new(e))
}
}
}
Err(_) => {
// TODO: error
panic!("Failed to convert sparse index to block");
Err(e) => {
println!("Failed to convert sparse index to block");
Err(e)
}
}
}
None => {
// TODO: error
panic!("Tried to flush a sparse index that doesn't exist");
println!("Tried to flush a sparse index that doesn't exist");
return Err(Box::new(SparseIndexFlushError::NotFound));
}
}
}
Expand All @@ -377,3 +374,17 @@ impl SparseIndexManager {
forked
}
}

#[derive(Error, Debug)]
pub enum SparseIndexFlushError {
#[error("Not found")]
NotFound,
}

impl ChromaError for SparseIndexFlushError {
    /// Returns the error code associated with this sparse index flush failure.
    fn code(&self) -> ErrorCodes {
        // Kept as an exhaustive match so that adding a variant forces a
        // decision about its error code.
        match self {
            Self::NotFound => ErrorCodes::NotFound,
        }
    }
}
4 changes: 2 additions & 2 deletions rust/worker/src/index/hnsw_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,8 @@ impl HnswIndexProvider {
println!("Flushed hnsw index file: {}", file);
}
Err(e) => {
// TODO: return err
panic!("Failed to flush index: {}", e);
println!("Failed to flush index: {}", e);
return Err(Box::new(e));
}
}
}
Expand Down
89 changes: 77 additions & 12 deletions rust/worker/src/storage/mod.rs
Original file line number Diff line number Diff line change
@@ -1,41 +1,106 @@
use self::config::StorageConfig;
use self::s3::S3GetError;
use crate::config::Configurable;
use crate::errors::ChromaError;
use async_trait::async_trait;
use bytes::Bytes;
use crate::errors::{ChromaError, ErrorCodes};
use tokio::io::AsyncBufRead;
pub(crate) mod config;
pub(crate) mod local;
pub(crate) mod s3;
use thiserror::Error;

/// A storage backend, dispatching to either the S3 or the local implementation.
#[derive(Clone)]
pub(crate) enum Storage {
    /// Storage backed by [`s3::S3Storage`].
    S3(s3::S3Storage),
    /// Storage backed by [`local::LocalStorage`].
    Local(local::LocalStorage),
}

#[derive(Error, Debug)]
pub enum GetError {
#[error("No such key: {0}")]
NoSuchKey(String),
#[error("S3 error: {0}")]
S3Error(#[from] S3GetError),
#[error("Local storage error: {0}")]
LocalError(String),
}

impl ChromaError for GetError {
    /// Returns the error code for this read failure: a missing key is
    /// `NotFound`, every backend failure is `Internal`.
    fn code(&self) -> ErrorCodes {
        match self {
            Self::NoSuchKey(_) => ErrorCodes::NotFound,
            Self::S3Error(_) | Self::LocalError(_) => ErrorCodes::Internal,
        }
    }
}

#[derive(Error, Debug)]
pub enum PutError {
#[error("S3 error: {0}")]
S3Error(#[from] s3::S3PutError),
#[error("Local storage error: {0}")]
LocalError(String),
}

impl ChromaError for PutError {
    /// Returns the error code for this write failure; every backend failure
    /// is reported as `Internal`.
    fn code(&self) -> ErrorCodes {
        // Variants are still listed explicitly so a new variant must be
        // classified deliberately.
        match self {
            Self::S3Error(_) | Self::LocalError(_) => ErrorCodes::Internal,
        }
    }
}

impl Storage {
pub(crate) async fn get(
&self,
key: &str,
) -> Result<Box<dyn AsyncBufRead + Unpin + Send>, String> {
) -> Result<Box<dyn AsyncBufRead + Unpin + Send>, GetError> {
match self {
Storage::S3(s3) => s3.get(key).await,
Storage::Local(local) => local.get(key).await,
Storage::S3(s3) => {
let res = s3.get(key).await;
match res {
Ok(res) => Ok(res),
Err(e) => match e {
S3GetError::NoSuchKey(_) => Err(GetError::NoSuchKey(key.to_string())),
_ => Err(GetError::S3Error(e)),
},
}
}
Storage::Local(local) => {
let res = local.get(key).await;
match res {
Ok(res) => Ok(res),
// TODO: Special case no such key if possible
Err(e) => Err(GetError::LocalError(e)),
}
}
}
}

pub(crate) async fn put_file(&self, key: &str, path: &str) -> Result<(), String> {
pub(crate) async fn put_file(&self, key: &str, path: &str) -> Result<(), PutError> {
match self {
Storage::S3(s3) => s3.put_file(key, path).await,
Storage::Local(local) => local.put_file(key, path).await,
Storage::S3(s3) => s3
.put_file(key, path)
.await
.map_err(|e| PutError::S3Error(e)),
Storage::Local(local) => local
.put_file(key, path)
.await
.map_err(|e| PutError::LocalError(e)),
}
}

pub(crate) async fn put_bytes(&self, key: &str, bytes: Vec<u8>) -> Result<(), String> {
pub(crate) async fn put_bytes(&self, key: &str, bytes: Vec<u8>) -> Result<(), PutError> {
match self {
Storage::S3(s3) => s3.put_bytes(key, bytes).await,
Storage::Local(local) => local.put_bytes(key, &bytes).await,
Storage::S3(s3) => s3
.put_bytes(key, bytes)
.await
.map_err(|e| PutError::S3Error(e)),
Storage::Local(local) => local
.put_bytes(key, &bytes)
.await
.map_err(|e| PutError::LocalError(e)),
}
}
}
Expand Down
Loading

0 comments on commit eb5f3e2

Please sign in to comment.