Skip to content

Commit

Permalink
feat: Initial globbing ranges support (#2654)
Browse files Browse the repository at this point in the history
  • Loading branch information
vrongmeal authored Mar 6, 2024
1 parent 4cfdb7e commit 73ee918
Show file tree
Hide file tree
Showing 9 changed files with 421 additions and 157 deletions.
15 changes: 6 additions & 9 deletions crates/datasources/src/bson/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::bson::errors::BsonError;
use crate::bson::schema::{merge_schemas, schema_from_document};
use crate::bson::stream::BsonPartitionStream;
use crate::common::url::DatasourceUrl;
use crate::object_store::ObjStoreAccess;
use crate::object_store::{ObjStoreAccess, ObjStoreAccessor};

pub async fn bson_streaming_table(
store_access: Arc<dyn ObjStoreAccess>,
Expand All @@ -25,22 +25,19 @@ pub async fn bson_streaming_table(
// (at least n but stop after n the same) or skip documents
let sample_size = schema_inference_sample_size.unwrap_or(100);

let path = source_url.path();

let store = store_access.create_store()?;

// assume that the file type is a glob and see if there are
// more files...
let mut list = store_access.list_globbed(&store, path.as_ref()).await?;
let accessor = ObjStoreAccessor::new(store_access)?;

let mut list = accessor.list_globbed(source_url.path()).await?;
if list.is_empty() {
return Err(BsonError::NotFound(path.into_owned()));
return Err(BsonError::NotFound(source_url.path().into()));
}

// for consistent results, particularly for the sample, always
// sort by location
list.sort_by(|a, b| a.location.cmp(&b.location));

let store = accessor.into_object_store();

// build a vector of streams, one for each file, that handle BSON's framing.
let mut readers = VecDeque::with_capacity(list.len());
for obj in list {
Expand Down
15 changes: 9 additions & 6 deletions crates/datasources/src/excel/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use datafusion::datasource::MemTable;
use object_store::ObjectStore;

use crate::common::url::DatasourceUrl;
use crate::object_store::ObjStoreAccess;
use crate::object_store::{ObjStoreAccess, ObjStoreAccessor};

pub mod errors;
pub mod stream;
Expand Down Expand Up @@ -50,11 +50,9 @@ impl ExcelTable {
}

DatasourceUrl::Url(_) => {
let store = store_access.create_store()?;
let list = store_access
.list_globbed(&store, source_url.path().as_ref())
.await?;
let accessor = ObjStoreAccessor::new(store_access)?;

let list = accessor.list_globbed(source_url.path()).await?;
if list.is_empty() {
return Err(ExcelError::Load(
"could not find .xlsx file at remote".to_string(),
Expand All @@ -66,7 +64,12 @@ impl ExcelTable {
};

let meta = list.first().expect("remote file has a sheet");
let bs = store.get(&meta.location).await?.bytes().await?;
let bs = accessor
.into_object_store()
.get(&meta.location)
.await?
.bytes()
.await?;

let buffer = Cursor::new(bs);
let mut sheets: Sheets<_> = calamine::open_workbook_auto_from_rs(buffer).unwrap();
Expand Down
19 changes: 8 additions & 11 deletions crates/datasources/src/json/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,33 +11,30 @@ use serde_json::{Map, Value};
use crate::common::url::DatasourceUrl;
use crate::json::errors::JsonError;
use crate::json::stream::{JsonPartitionStream, LazyJsonPartitionStream};
use crate::object_store::ObjStoreAccess;
use crate::object_store::{ObjStoreAccess, ObjStoreAccessor};

pub async fn json_streaming_table(
store_access: Arc<dyn ObjStoreAccess>,
source_url: DatasourceUrl,
) -> Result<Arc<dyn TableProvider>, JsonError> {
let path = source_url.path();
let path = source_url.path().into_owned();

let store = store_access.create_store()?;

// assume that the file type is a glob and see if there are
// more files...
let mut list = store_access.list_globbed(&store, path.as_ref()).await?;
let accessor = ObjStoreAccessor::new(store_access)?;

let mut list = accessor.list_globbed(source_url.path()).await?;
if list.is_empty() {
return Err(JsonError::NotFound(path.into_owned()));
return Err(JsonError::NotFound(path));
}

// for consistent results, particularly for the sample, always
// sort by location
list.sort_by(|a, b| a.location.cmp(&b.location));

let store = accessor.into_object_store();

let mut data = Vec::new();
{
let first_obj = list
.pop()
.ok_or_else(|| JsonError::NotFound(path.into_owned()))?;
let first_obj = list.pop().ok_or_else(|| JsonError::NotFound(path))?;
let blob = store
.get(&first_obj.location)
.await?
Expand Down
260 changes: 260 additions & 0 deletions crates/datasources/src/object_store/glob_util.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,260 @@
use std::vec::IntoIter;

use once_cell::sync::Lazy;
use regex::Regex;
use tracing::debug;

// Matches a literal replacement value inside a `{...}` group: one or more
// ASCII alphanumerics, '_' or '-' (anchored, so the whole string must match).
static VALID_STRING_REGEX: Lazy<Regex> = Lazy::new(|| Regex::new("^[a-zA-Z0-9_-]+$").unwrap());

/// A single `{...}` group located inside a pattern: where the braces sit and
/// the expansions to substitute in their place.
#[derive(Debug, Clone)]
struct Replacer {
    // Byte index of the opening '{' in the pattern.
    start: usize,
    // Byte index of the matching '}' in the pattern.
    end: usize,
    // Owned iterator over the replacement strings for this group.
    iter: IntoIter<String>,
}

/// Internal error raised while locating or expanding a `{...}` group.
/// Callers treat it as "nothing to expand" and fall back to using the
/// pattern as a literal glob (see `get_resolved_patterns`).
#[derive(Debug, thiserror::Error)]
#[error("Replacer find error: {0}")]
struct ReplacerFindError(String);

impl ReplacerFindError {
    /// Wraps any string-convertible message into the error type.
    fn new(s: impl Into<String>) -> Self {
        Self(s.into())
    }
}

/// Expands the bounds of an `N..M` range group into the list of replacement
/// strings.
///
/// Supported forms:
/// * `N..M` — both bounds integers, or both lowercase chars, or both
///   uppercase chars (reversed bounds such as `6..3` are normalized to
///   ascending order);
/// * `..M` — open start: a char end implies `'a'`/`'A'` as start, an integer
///   end must be non-negative and implies `0` as start;
/// * `N..` — open end: a char start runs to `'z'`/`'Z'` (integer starts are
///   not supported here).
///
/// Anything else yields a [`ReplacerFindError`].
fn get_range_iter(start: &str, end: &str) -> Result<Vec<String>, ReplacerFindError> {
    // Returns the single ASCII-alphabetic char in `s`, if that is all `s` is.
    fn single_alpha(s: &str) -> Option<char> {
        let mut it = s.chars();
        match (it.next(), it.next()) {
            (Some(c), None) if c.is_ascii_alphabetic() => Some(c),
            _ => None,
        }
    }

    // Parses an i64, mapping the failure into the module error type.
    fn to_i64(s: &str) -> Result<i64, ReplacerFindError> {
        s.parse::<i64>()
            .map_err(|e| ReplacerFindError::new(format!("{s} not a valid i64: {e}")))
    }

    // Builds an inclusive char range as single-char strings.
    fn char_range(lo: char, hi: char) -> Vec<String> {
        (lo..=hi).map(String::from).collect()
    }

    if start.is_empty() && end.is_empty() {
        return Err(ReplacerFindError::new(
            "both start and end in range are empty strings",
        ));
    }

    if start.is_empty() {
        // `..M`: char end gets an implicit 'a'/'A' start; integer end must be
        // non-negative and gets an implicit 0 start.
        return match single_alpha(end) {
            Some(hi) => {
                let lo = if hi.is_ascii_uppercase() {
                    'A'
                } else if hi.is_ascii_lowercase() {
                    'a'
                } else {
                    unreachable!("e_char should always be an ascii alphabet")
                };
                Ok(char_range(lo, hi))
            }
            None => {
                let i = to_i64(end)?;
                if i < 0 {
                    Err(ReplacerFindError::new(format!(
                        "end {i} not a valid positive integer in ..M pattern"
                    )))
                } else {
                    Ok((0..=i).map(|i| i.to_string()).collect())
                }
            }
        };
    }

    if end.is_empty() {
        // `N..`: only a char start is supported; range runs to 'z'/'Z'.
        let lo = single_alpha(start).ok_or_else(|| {
            ReplacerFindError::new(format!("{start} not a valid char in N.. pattern"))
        })?;
        let hi = if lo.is_ascii_uppercase() {
            'Z'
        } else if lo.is_ascii_lowercase() {
            'z'
        } else {
            unreachable!("s_char should always be an ascii alphabet")
        };
        return Ok(char_range(lo, hi));
    }

    // `N..M`: both bounds present — chars of the same case, or integers.
    match (single_alpha(start), single_alpha(end)) {
        (Some(a), Some(b)) => {
            let same_case = (a.is_ascii_uppercase() && b.is_ascii_uppercase())
                || (a.is_ascii_lowercase() && b.is_ascii_lowercase());
            if !same_case {
                return Err(ReplacerFindError::new(format!(
                    "{start} and {end} are invalid chars for range in N..M pattern"
                )));
            }
            // Normalize reversed bounds to ascending order.
            let (lo, hi) = if a < b { (a, b) } else { (b, a) };
            Ok(char_range(lo, hi))
        }
        (None, None) => {
            // Neither bound is a char: both must parse as integers.
            let a = to_i64(start)?;
            let b = to_i64(end)?;
            let (lo, hi) = if a < b { (a, b) } else { (b, a) };
            Ok((lo..=hi).map(|i| i.to_string()).collect())
        }
        _ => Err(ReplacerFindError::new(format!(
            "{start} and {end} are not valid for range in N..M pattern"
        ))),
    }
}

/// Locates the first `{...}` group in `pattern` and builds a [`Replacer`]
/// describing its position and expansions.
///
/// The group body is either a comma-separated list of literal strings
/// (each validated against [`VALID_STRING_REGEX`]), an `N..M` range (see
/// [`get_range_iter`]) or a single literal string.
///
/// # Errors
///
/// Returns [`ReplacerFindError`] if no complete group exists or the group
/// body is invalid; the caller treats that as "pattern has nothing to
/// expand".
fn get_replacer(pattern: &str) -> Result<Replacer, ReplacerFindError> {
    // Literal values may only contain alphanumerics, '_' and '-'.
    fn validate_string(s: &str) -> Result<String, ReplacerFindError> {
        if VALID_STRING_REGEX.is_match(s) {
            Ok(s.to_string())
        } else {
            Err(ReplacerFindError::new(format!(
                "invalid string in comma separated values: {s}"
            )))
        }
    }

    let start = pattern
        .find('{')
        .ok_or_else(|| ReplacerFindError::new("no { found"))?;
    if start == pattern.len() - 1 {
        return Err(ReplacerFindError::new("{ found at the end of pattern"));
    }

    // Search for the closing brace from the opening one onward; the found
    // index is relative to the slice, so offset it back into `pattern`.
    let end = pattern[start..]
        .find('}')
        .ok_or_else(|| ReplacerFindError::new("no matching } found"))?;
    let end = start + end;

    // The text between the braces.
    let body = &pattern[start + 1..end];

    let ranges = if body.contains(',') {
        // Comma-separated literals: every entry must validate.
        body.split(',')
            .map(validate_string)
            .collect::<Result<Vec<_>, _>>()?
    } else if body.contains("..") {
        let (range_start, range_end) = body
            .split_once("..")
            .ok_or_else(|| ReplacerFindError::new(format!("expected range N..M: {body}")))?;

        get_range_iter(range_start, range_end)?
    } else {
        // A single literal (e.g. `{abc}`) expands to itself.
        vec![validate_string(body)?]
    };

    Ok(Replacer {
        start,
        end,
        iter: ranges.into_iter(),
    })
}

/// Recursively expands every `{...}` group in `pattern` into concrete
/// patterns, e.g. `file_{1..3}` -> `file_1`, `file_2`, `file_3`.
///
/// Groups are expanded leftmost-first; multiple groups yield the cartesian
/// product of their expansions. A pattern with no (valid) group resolves to
/// itself, so plain globs pass through untouched.
pub fn get_resolved_patterns(pattern: String) -> Vec<ResolvedPattern> {
    let Replacer { start, end, iter } = match get_replacer(&pattern) {
        Ok(replacer) => replacer,
        Err(error) => {
            // Not fatal: an unparseable group just means the pattern is
            // already fully resolved (or is a plain glob).
            debug!(%error, %pattern, "cannot resolve pattern");
            // `pattern` is already an owned String — move it, don't clone.
            return vec![ResolvedPattern(pattern)];
        }
    };

    iter.flat_map(|replacement| {
        // Splice the replacement in place of the `{...}` group, then recurse
        // to resolve any remaining groups to the right.
        let prefix = &pattern[..start];
        let suffix = &pattern[end + 1..];
        get_resolved_patterns(format!("{prefix}{replacement}{suffix}"))
    })
    .collect()
}

/// A fully resolved pattern: a glob string with every `{...}` group already
/// substituted. Newtype wrapper so callers can't confuse resolved and
/// unresolved patterns.
#[derive(Debug, Clone)]
pub struct ResolvedPattern(String);

impl From<ResolvedPattern> for String {
    /// Unwraps the resolved pattern into its underlying string.
    fn from(pattern: ResolvedPattern) -> Self {
        pattern.0
    }
}

impl AsRef<str> for ResolvedPattern {
    /// Borrows the resolved pattern as a plain string slice.
    fn as_ref(&self) -> &str {
        &self.0
    }
}

#[cfg(test)]
mod tests {
    use super::get_resolved_patterns;

    /// Table-driven check that brace patterns expand to the expected lists:
    /// integer/char ranges (including reversed and open-ended bounds),
    /// comma-separated literals, and the cartesian product of several groups.
    #[test]
    fn test_resolve_glob_patterns() {
        // (input pattern, expected resolved patterns, in order)
        let test_cases: &[(&str, &[&str])] = &[
            ("file_{3..6}", &["file_3", "file_4", "file_5", "file_6"]),
            // Reversed bounds are normalized to ascending order.
            ("file_{6..3}", &["file_3", "file_4", "file_5", "file_6"]),
            ("file_{p..r}", &["file_p", "file_q", "file_r"]),
            ("file_{r..p}", &["file_p", "file_q", "file_r"]),
            // Open start: integers begin at 0, chars at 'a'/'A'.
            ("file_{..2}", &["file_0", "file_1", "file_2"]),
            ("file_{..C}", &["file_A", "file_B", "file_C"]),
            ("file_{..c}", &["file_a", "file_b", "file_c"]),
            // Open end: chars run to 'Z'/'z'.
            ("file_{X..}", &["file_X", "file_Y", "file_Z"]),
            ("file_{x..}", &["file_x", "file_y", "file_z"]),
            // Comma-separated literal alternatives.
            ("file_{x,y,z}", &["file_x", "file_y", "file_z"]),
            (
                "file_0{1..3}.csv",
                &["file_01.csv", "file_02.csv", "file_03.csv"],
            ),
            (
                "file_{a-b,c-d,e-f}.json",
                &["file_a-b.json", "file_c-d.json", "file_e-f.json"],
            ),
            // Two groups expand to their cartesian product, leftmost-major.
            (
                "file_{0..2}_{p,q,r}.parquet",
                &[
                    "file_0_p.parquet",
                    "file_0_q.parquet",
                    "file_0_r.parquet",
                    "file_1_p.parquet",
                    "file_1_q.parquet",
                    "file_1_r.parquet",
                    "file_2_p.parquet",
                    "file_2_q.parquet",
                    "file_2_r.parquet",
                ],
            ),
        ];

        for (pattern, expected) in test_cases {
            let patterns = get_resolved_patterns(pattern.to_string());
            let actual = patterns.iter().map(AsRef::as_ref).collect::<Vec<_>>();
            assert_eq!(&actual, expected, "pattern = {pattern}");
        }

        // TODO: Add test cases for failed patterns...
    }
}
Loading

0 comments on commit 73ee918

Please sign in to comment.