Skip to content

Commit

Permalink
1099 cli ingest cleanup cache (#1211)
Browse files Browse the repository at this point in the history
* clean up cache after indexing

* add unit test; make const variables

* test changes
  • Loading branch information
Dai Dao authored Mar 27, 2022
1 parent 5f9a71e commit 0db764c
Show file tree
Hide file tree
Showing 9 changed files with 105 additions and 11 deletions.
22 changes: 20 additions & 2 deletions quickwit-cli/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ use quickwit_actors::{ActorHandle, ObservationType, Universe};
use quickwit_common::uri::Uri;
use quickwit_common::{run_checklist, GREEN_COLOR};
use quickwit_config::{IndexConfig, IndexerConfig, SourceConfig, SourceParams};
use quickwit_core::{create_index, delete_index, garbage_collect_index, reset_index};
use quickwit_core::{
clean_split_cache, create_index, delete_index, garbage_collect_index, reset_index,
};
use quickwit_doc_mapper::tag_pruning::match_tag_field_name;
use quickwit_indexing::actors::{IndexingPipeline, IndexingServer};
use quickwit_indexing::models::{
Expand Down Expand Up @@ -82,6 +84,8 @@ pub fn build_index_command<'a>() -> Command<'a> {
.required(false),
arg!(--overwrite "Overwrites pre-existing index.")
.required(false),
arg!(--clean_cache "Clean up local cache splits after indexing.")
.required(false),
])
)
.subcommand(
Expand Down Expand Up @@ -198,6 +202,7 @@ pub struct IngestDocsArgs {
pub config_uri: Uri,
pub data_dir: Option<PathBuf>,
pub overwrite: bool,
pub clean_cache: bool,
}

#[derive(Debug, PartialEq, Eq)]
Expand Down Expand Up @@ -330,13 +335,15 @@ impl IndexCliCommand {
.expect("`config` is a required arg.")?;
let data_dir = matches.value_of("data-dir").map(PathBuf::from);
let overwrite = matches.is_present("overwrite");
let clean_cache = matches.is_present("clean_cache");

Ok(Self::Ingest(IngestDocsArgs {
index_id,
input_path_opt,
overwrite,
config_uri,
data_dir,
clean_cache,
}))
}

Expand Down Expand Up @@ -782,7 +789,7 @@ pub async fn ingest_docs_cli(args: IngestDocsArgs) -> anyhow::Result<()> {
};
let universe = Universe::new();
let indexing_server = IndexingServer::new(
config.data_dir_path,
config.clone().data_dir_path,
indexer_config,
metastore,
storage_resolver,
Expand Down Expand Up @@ -818,6 +825,17 @@ pub async fn ingest_docs_cli(args: IngestDocsArgs) -> anyhow::Result<()> {
args.index_id
);
}

if args.clean_cache {
println!("Cleaning up split cache ...");
clean_split_cache(
&config.data_dir_path,
index_metadata.index_id.clone(),
INGEST_SOURCE_ID.to_string(),
)
.await?;
}

Ok(())
}

Expand Down
2 changes: 2 additions & 0 deletions quickwit-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ mod tests {
input_path_opt: None,
overwrite: false,
data_dir: None,
clean_cache: false,
})) if &index_id == "wikipedia"
&& config_uri == Uri::try_new("file:///config.yaml").unwrap()
));
Expand All @@ -247,6 +248,7 @@ mod tests {
input_path_opt: None,
overwrite: true,
data_dir: None,
clean_cache: false
})) if &index_id == "wikipedia"
&& config_uri == Uri::try_new("file:///config.yaml").unwrap()
));
Expand Down
43 changes: 40 additions & 3 deletions quickwit-cli/tests/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ use predicates::prelude::*;
use quickwit_cli::index::{create_index_cli, search_index, CreateIndexArgs, SearchIndexArgs};
use quickwit_common::rand::append_random_suffix;
use quickwit_common::uri::Uri;
use quickwit_core::get_cache_path;
use quickwit_indexing::source::INGEST_SOURCE_ID;
use quickwit_metastore::{quickwit_metastore_uri_resolver, Metastore};
use serde_json::{json, Number, Value};
use serial_test::serial;
Expand All @@ -50,13 +52,14 @@ fn create_logs_index(test_env: &TestEnv) {
.success();
}

fn ingest_docs(input_path: &Path, test_env: &TestEnv) {
fn ingest_docs_with_options(input_path: &Path, test_env: &TestEnv, options: &str) {
make_command(
format!(
"index ingest --index {} --input-path {} --config {}",
"index ingest --index {} --input-path {} --config {} {}",
test_env.index_id,
input_path.display(),
test_env.resource_files["config"].display(),
options
)
.as_str(),
)
Expand All @@ -69,6 +72,10 @@ fn ingest_docs(input_path: &Path, test_env: &TestEnv) {
));
}

/// Runs `index ingest` on `input_path` for the index of `test_env`, passing no additional
/// command line options.
fn ingest_docs(input_path: &Path, test_env: &TestEnv) {
    let no_extra_options = "";
    ingest_docs_with_options(input_path, test_env, no_extra_options)
}

#[test]
fn test_cmd_help() -> anyhow::Result<()> {
let mut cmd = make_command("--help");
Expand Down Expand Up @@ -187,14 +194,44 @@ fn test_cmd_ingest_on_non_existing_file() -> Result<()> {
Ok(())
}

#[test]
fn test_cmd_ingest_clean_cache() -> Result<()> {
    let index_id = append_random_suffix("test-index-clean-cache");
    let test_env = create_test_env(index_id, TestStorageType::LocalFileSystem)?;
    create_logs_index(&test_env);

    // Ingest with the `--clean_cache` flag, which asks the CLI to clean up the local split cache
    // after indexing.
    ingest_docs_with_options(
        test_env.resource_files["logs"].as_path(),
        &test_env,
        "--clean_cache",
    );

    // The cache directory of the ingest source must no longer exist once the command returns.
    let cache_path = get_cache_path(
        &test_env.data_dir_path,
        &test_env.index_id,
        INGEST_SOURCE_ID,
    );
    assert!(
        !cache_path.exists(),
        "split cache directory `{}` should have been removed by `--clean_cache`",
        cache_path.display()
    );

    Ok(())
}

#[test]
fn test_cmd_ingest_simple() -> Result<()> {
let index_id = append_random_suffix("test-index-simple");
let test_env = create_test_env(index_id, TestStorageType::LocalFileSystem)?;
create_logs_index(&test_env);

ingest_docs(test_env.resource_files["logs"].as_path(), &test_env);

// check cache path
let cache_path = get_cache_path(
&test_env.data_dir_path,
&test_env.index_id,
INGEST_SOURCE_ID,
);
assert_eq!(true, cache_path.exists());

// Using piped input
let log_path = test_env.resource_files["logs"].clone();
make_command(
Expand Down
32 changes: 31 additions & 1 deletion quickwit-core/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,21 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;

use quickwit_indexing::actors::INDEXING;
use quickwit_indexing::models::CACHE;
use quickwit_indexing::{
delete_splits_with_files, run_garbage_collect, FileEntry, IndexingSplitStore,
};
use quickwit_metastore::{
quickwit_metastore_uri_resolver, IndexMetadata, Metastore, SplitMetadata, SplitState,
};
use quickwit_storage::{quickwit_storage_uri_resolver, Storage};
use tracing::error;
use tokio::fs;
use tracing::{error, info};

/// Creates an index at `index-path` extracted from `metastore_uri`. The command fails if an index
/// already exists at `index-path`.
Expand Down Expand Up @@ -114,6 +118,32 @@ pub async fn delete_index(
Ok(deleted_entries)
}

/// Builds the path of the split cache directory for a given index and source.
///
/// The returned path is `data_dir_path` followed by the [`INDEXING`] component, `index_id`,
/// `source_id` and finally the [`CACHE`] component. The function only assembles the path, it
/// does not touch the file system.
pub fn get_cache_path(data_dir_path: &Path, index_id: &str, source_id: &str) -> PathBuf {
    let mut cache_path = data_dir_path.join(INDEXING);
    cache_path.push(index_id);
    cache_path.push(source_id);
    cache_path.push(CACHE);
    cache_path
}

/// Cleans up split cache in local split store.
///
/// The cache directory is located with [`get_cache_path`] and removed recursively. A cache
/// directory that does not exist is treated as already clean, so calling this function twice
/// in a row succeeds.
///
/// * `data_dir_path` - Path to directory where data (tmp data, splits kept for caching purpose) is
///   persisted.
/// * `index_id` - The target index Id.
/// * `source_id` - The source Id.
///
/// # Errors
///
/// Returns the underlying I/O error if the directory exists but cannot be removed.
pub async fn clean_split_cache(
    data_dir_path: &Path,
    index_id: String,
    source_id: String,
) -> anyhow::Result<()> {
    let cache_path = get_cache_path(data_dir_path, &index_id, &source_id);
    info!(cache_path = %cache_path.display(), "cache_path");
    match fs::remove_dir_all(&cache_path).await {
        Ok(()) => Ok(()),
        // Nothing to clean up: the desired end state (no cache directory) already holds.
        Err(io_error) if io_error.kind() == std::io::ErrorKind::NotFound => Ok(()),
        Err(io_error) => Err(io_error.into()),
    }
}

/// Detect all dangling splits and associated files from the index and removes them.
///
/// * `metastore_uri` - The metastore URI for accessing the metastore.
Expand Down
5 changes: 4 additions & 1 deletion quickwit-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@
mod index;

pub use index::{create_index, delete_index, garbage_collect_index, reset_index};
pub use index::{
clean_split_cache, create_index, delete_index, garbage_collect_index, get_cache_path,
reset_index,
};

#[cfg(test)]
mod tests {
Expand Down
4 changes: 3 additions & 1 deletion quickwit-indexing/src/actors/indexing_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ use crate::models::{
};
use crate::{IndexingPipeline, IndexingPipelineParams, IndexingStatistics};

/// Name of the directory that is joined onto the data directory path to form the indexing
/// directory path.
pub const INDEXING: &str = "indexing";

#[derive(Error, Debug)]
pub enum IndexingServerError {
#[error("Indexing pipeline `{index_id}` for source `{source_id}` does not exist.")]
Expand Down Expand Up @@ -82,7 +84,7 @@ impl IndexingServer {
storage_resolver: StorageUriResolver,
) -> IndexingServer {
Self {
indexing_dir_path: data_dir_path.join("indexing"),
indexing_dir_path: data_dir_path.join(INDEXING),
split_store_max_num_bytes: indexer_config.split_store_max_num_bytes.get_bytes()
as usize,
split_store_max_num_splits: indexer_config.split_store_max_num_splits,
Expand Down
2 changes: 1 addition & 1 deletion quickwit-indexing/src/actors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ mod publisher;
mod uploader;

pub use indexing_pipeline::{IndexingPipeline, IndexingPipelineHandler, IndexingPipelineParams};
pub use indexing_server::IndexingServer;
pub use indexing_server::{IndexingServer, INDEXING};
use tantivy::schema::{Field, FieldType};
mod merge_executor;
mod merge_planner;
Expand Down
4 changes: 3 additions & 1 deletion quickwit-indexing/src/models/indexing_directory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ use tokio::fs;

use super::ScratchDirectory;

/// Name of the cache sub-directory created inside an indexing directory.
pub const CACHE: &str = "cache";

/// Root of an [`IndexingDirectory`].
#[derive(Clone)]
enum Root {
Expand Down Expand Up @@ -54,7 +56,7 @@ pub struct IndexingDirectory {
impl IndexingDirectory {
pub async fn create_in_dir<P: AsRef<Path>>(dir_path: P) -> anyhow::Result<IndexingDirectory> {
// Create the cache directory if it does not exist.
let cache_directory_path = dir_path.as_ref().join("cache");
let cache_directory_path = dir_path.as_ref().join(CACHE);
fs::create_dir_all(&cache_directory_path)
.await
.with_context(|| {
Expand Down
2 changes: 1 addition & 1 deletion quickwit-indexing/src/models/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ mod raw_doc_batch;
mod scratch_directory;

pub use indexed_split::{IndexedSplit, IndexedSplitBatch};
pub use indexing_directory::IndexingDirectory;
pub use indexing_directory::{IndexingDirectory, CACHE};
pub use indexing_server_message::{
DetachPipeline, IndexingPipelineId, ObservePipeline, SpawnMergePipeline, SpawnPipeline,
SpawnPipelinesForIndex,
Expand Down

0 comments on commit 0db764c

Please sign in to comment.