added http reader
marsupialtail committed Apr 29, 2024
1 parent 493383b commit 99e0c3c
Showing 15 changed files with 380 additions and 85 deletions.
5 changes: 3 additions & 2 deletions Cargo.toml
@@ -37,6 +37,7 @@ thiserror = "1"
log = "0.4"

arrow-array = "50.0.0"
arrow-select = "50.0.0"
parquet = { version = "50.0.0", features = [
"arrow",
"async",
@@ -66,14 +67,14 @@ rand = "0.8.5"
serde_json = "1.0"
uuid = { version = "1.0", features = ["v4", "serde"] }
async-recursion = "1.0.5"
aws-config = { version = "1.1.7", features = ["behavior-version-latest"]}
aws-config = { version = "1.1.7", features = ["behavior-version-latest"] }
aws-sdk-s3 = { version = "1.23.0" }
bitvector = "0.1.5"
ndarray = { version = "0.15.6", features = ["rayon", "serde"] }
numpy = "0.20.0"
num-traits = "0.2.18"
ordered-float = "4.2.0"

reqwest = "0.12.4"
[profile.release]
lto = false
bit-vec = "0.6.3"
4 changes: 2 additions & 2 deletions README.md
@@ -34,9 +34,9 @@ It will use the index to search against the Parquet files on S3 directly. Rottne

Rottnest supports not only BM25 indices but also other index types, such as regex and vector search indices. More documentation will be forthcoming.

### Regex
### Phrase Matches

### Vector
### Vector Approximate Nearest Neighbor

## Architecture

Empty file modified demo.py
100644 → 100755
Empty file.
2 changes: 1 addition & 1 deletion python/rottnest/__init__.py
@@ -1,7 +1,7 @@
import rottnest as rottnest
from .pele import search_index_bm25, search_index_substring, search_index_vector, \
merge_index_bm25, merge_index_substring, merge_index_vector, \
index_file_bm25, index_file_substring, index_file_vector
index_file_bm25, index_file_substring, index_file_vector, index_file_kmer

__doc__ = rottnest.__doc__
if hasattr(rottnest, "__all__"):
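The newly exported index_file_kmer follows the same call shape as the other index_file_* functions (see pele.py below). A hypothetical invocation, with a made-up file and column name:

```python
# Hypothetical call; "logs.parquet", "body", and "logs_kmer" are placeholders.
# The signature comes from index_file_kmer in pele.py later in this diff; it
# writes "<name>.meta" and "<name>.lava" outputs.
import rottnest

rottnest.index_file_kmer("logs.parquet", "body", name="logs_kmer", tokenizer_file=None)
```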
91 changes: 91 additions & 0 deletions python/rottnest/nlp.py
@@ -0,0 +1,91 @@
import os, pickle
from typing import List
from tqdm import tqdm
import numpy as np
import hashlib

def embed_batch_bgem3(tokens: List[str]):
    try:
        from FlagEmbedding import BGEM3FlagModel
    except ImportError:
        raise Exception("BGEM3FlagModel requires installation of the FlagEmbedding python package. pip install FlagEmbedding")

    model = BGEM3FlagModel('BAAI/bge-m3')

    embeddings = model.encode(tokens,
                              batch_size=12,
                              max_length=max([len(i) for i in tokens])
                              )['dense_vecs']
    return embeddings

def embed_batch_openai(tokens: List[str], model = "text-embedding-3-large"):
    try:
        from openai import OpenAI
    except ImportError:
        raise Exception("OpenAI python package required. pip install openai")
    client = OpenAI()
    all_vecs = []
    for i in tqdm(range(0, len(tokens), 1000)):
        results = client.embeddings.create(input = tokens[i:i+1000], model = model)
        vecs = np.vstack([results.data[i].embedding for i in range(len(results.data))])
        all_vecs.append(vecs)

    return np.vstack(all_vecs)

def query_expansion_llm(tokenizer_vocab: List[str], query: str, method = "bge", expansion_tokens = 20):

    assert type(query) == str, "query must be a string. If you have a list of keywords, concatenate them with spaces."

    cache_dir = os.path.expanduser('~/.cache')
    # make a subdirectory rottnest under cache_dir if it's not there already
    cache_dir = os.path.join(cache_dir, 'rottnest')
    if not os.path.exists(cache_dir):
        os.makedirs(cache_dir)

    tokenizer_hash = hashlib.sha256(("".join(tokenizer_vocab[::1000])).encode('utf-8')).hexdigest()[:8]

    # check if the tokenizer_embeddings.pkl file exists in the cache directory
    tokenizer_embeddings_path = os.path.join(cache_dir, f"tokenizer_embeddings_{tokenizer_hash}_{method}.pkl")

    if not os.path.exists(tokenizer_embeddings_path):
        print(f"First time doing LLM query expansion with this tokenizer, computing tokenizer embeddings with {method}.")
        tokenizer_vocab = [tok if tok else "[]" for tok in tokenizer_vocab]

        if method == "bge":
            all_vecs = embed_batch_bgem3(tokenizer_vocab)
        elif method == "openai":
            all_vecs = embed_batch_openai(tokenizer_vocab)

        pickle.dump({"words": tokenizer_vocab, "vecs": all_vecs},
                    open(os.path.join(cache_dir, f"tokenizer_embeddings_{tokenizer_hash}_{method}.pkl"), "wb"))

    embeddings = pickle.load(open(tokenizer_embeddings_path, "rb"))

    tokens = embeddings['words']
    db_vectors = embeddings['vecs']

    if method == "bge":
        query_vec = embed_batch_bgem3([query])[0]
    elif method == "openai":
        query_vec = embed_batch_openai([query])[0]

    distances = np.dot(db_vectors, query_vec) / np.linalg.norm(db_vectors, axis = 1) / np.linalg.norm(query_vec)
    indices = np.argsort(-distances)[:expansion_tokens]
    print("Expanded tokens: ", [tokens[i] for i in indices])

    return [tokens[i] for i in indices], list(indices), list(distances[indices])

def query_expansion_keyword(tokenizer_vocab: List[str], query: str):

    # simply check which words in tokenizer_vocab appear in the query; the weight is how many times each appears
    token_ids = []
    tokens = []
    weights = []
    for i, token in enumerate(tokenizer_vocab):
        if token in query:
            token_ids.append(i)
            tokens.append(token)
            weights.append(query.count(token))

    print("Expanded tokens: ", tokens)
    return tokens, token_ids, weights
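Nothing in this commit exercises the new helpers; a small, hypothetical usage sketch (toy vocabulary and query strings, not taken from the repository) would be:

```python
# Hypothetical usage of the query expansion helpers in nlp.py. The vocabulary
# and query strings are toy values; in practice the vocabulary comes from the
# tokenizer used to build the index.
from rottnest.nlp import query_expansion_keyword, query_expansion_llm

vocab = ["error", "timeout", "connection", "refused", "retry"]

# Keyword expansion: which vocab tokens appear in the query, their ids, and
# how often each occurs in the query string.
tokens, token_ids, weights = query_expansion_keyword(vocab, "connection timeout after retry")

# Embedding-based expansion (needs FlagEmbedding installed, or method="openai"
# with an OpenAI API key configured):
# tokens, token_ids, similarities = query_expansion_llm(vocab, "network failure", method="bge")
```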
34 changes: 32 additions & 2 deletions python/rottnest/pele.py
@@ -39,7 +39,8 @@ def read_columns():

def index_file_bm25(file_path: str, column_name: str, name = None, tokenizer_file = None):

    arr, layout = rottnest.get_parquet_layout(column_name, file_path)
    arrs, layout = rottnest.get_parquet_layout(column_name, file_path)
    arr = pyarrow.concat_arrays([i.cast(pyarrow.large_string()) for i in arrs])
    data_page_num_rows = np.array(layout.data_page_num_rows)
    uid = np.repeat(np.arange(len(data_page_num_rows)), data_page_num_rows) + 1

@@ -69,7 +70,8 @@ def index_file_bm25(file_path: str, column_name: str, name = None, tokenizer_fil

def index_file_substring(file_path: str, column_name: str, name = None, tokenizer_file = None):

    arr, layout = rottnest.get_parquet_layout(column_name, file_path)
    arrs, layout = rottnest.get_parquet_layout(column_name, file_path)
    arr = pyarrow.concat_arrays([i.cast(pyarrow.large_string()) for i in arrs])
    data_page_num_rows = np.array(layout.data_page_num_rows)
    uid = np.repeat(np.arange(len(data_page_num_rows)), data_page_num_rows) + 1

@@ -93,6 +95,34 @@ def index_file_substring(file_path: str, column_name: str, name = None, tokenize
    file_data.write_parquet(f"{name}.meta")
    print(rottnest.build_lava_substring(f"{name}.lava", arr, pyarrow.array(uid.astype(np.uint64)), tokenizer_file))


def index_file_kmer(file_path: str, column_name: str, name = None, tokenizer_file = None):

    arrs, layout = rottnest.get_parquet_layout(column_name, file_path)
    arr = pyarrow.concat_arrays([i.cast(pyarrow.large_string()) for i in arrs])
    data_page_num_rows = np.array(layout.data_page_num_rows)
    uid = np.repeat(np.arange(len(data_page_num_rows)), data_page_num_rows) + 1

    x = np.cumsum(np.hstack([[0], layout.data_page_num_rows[:-1]]))
    y = np.repeat(x[np.cumsum(np.hstack([[0], layout.row_group_data_pages[:-1]])).astype(np.uint64)], layout.row_group_data_pages)
    page_row_offsets_in_row_group = x - y

    file_data = polars.from_dict({
            "uid": np.arange(len(data_page_num_rows) + 1),
            "file_path": [file_path] * (len(data_page_num_rows) + 1),
            "column_name": [column_name] * (len(data_page_num_rows) + 1),
            "data_page_offsets": [-1] + layout.data_page_offsets,
            "data_page_sizes": [-1] + layout.data_page_sizes,
            "dictionary_page_sizes": [-1] + layout.dictionary_page_sizes,
            "row_groups": np.hstack([[-1], np.repeat(np.arange(layout.num_row_groups), layout.row_group_data_pages)]),
            "page_row_offset_in_row_group": np.hstack([[-1], page_row_offsets_in_row_group])
        }
    )
    name = uuid.uuid4().hex if name is None else name

    file_data.write_parquet(f"{name}.meta")
    print(rottnest.build_lava_kmer(f"{name}.lava", arr, pyarrow.array(uid.astype(np.uint64)), tokenizer_file))

def index_file_vector(file_path: str, column_name: str, name = None):

    arr, layout = rottnest.get_parquet_layout(column_name, file_path)
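The x / y arithmetic in the indexing functions above is terse: x is the starting row of each data page counted from the beginning of the file, y is the file-level starting row of the row group that page belongs to, and their difference is the page's row offset within its own row group. A stand-alone sketch with a made-up layout (two row groups of two data pages each) shows the intermediate values:

```python
# Reproduces the page_row_offset_in_row_group computation from pele.py on a
# toy layout; the page and row-group counts below are made up for illustration.
import numpy as np

data_page_num_rows = [3, 2, 4, 1]   # rows per data page, in file order
row_group_data_pages = [2, 2]       # data pages per row group

# Starting row of each data page, counted from the beginning of the file.
x = np.cumsum(np.hstack([[0], data_page_num_rows[:-1]]))               # [0 3 5 9]

# For each page, the file-level starting row of its row group: index x at the
# first page of every row group, then repeat once per page in that row group.
first_page = np.cumsum(np.hstack([[0], row_group_data_pages[:-1]])).astype(np.uint64)
y = np.repeat(x[first_page], row_group_data_pages)                     # [0 0 5 5]

print(x - y)   # [0 3 0 4] -> each page's starting row within its row group
```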
85 changes: 54 additions & 31 deletions src/formats/parquet.rs
@@ -1,7 +1,10 @@
use arrow::array::ArrayData;
use arrow::compute::kernels::concat_elements::concat_elements_utf8_many;
use arrow::datatypes::ToByteSlice;
use arrow::error::ArrowError;
use arrow_array::{Array, BinaryArray, StringArray};
use arrow_array::{Array, BinaryArray, LargeBinaryArray, LargeStringArray, StringArray};
use arrow_select::concat::concat;

use log::debug;
use parquet::{
arrow::array_reader::make_byte_array_reader,
@@ -22,8 +25,8 @@ use parquet::{
use thrift::protocol::TCompactInputProtocol;

use bytes::Bytes;
use std::convert::TryFrom;
use std::io::Read;
use std::{convert::TryFrom, sync::Arc};

use futures::stream::{self, StreamExt};
use itertools::{izip, Itertools};
@@ -32,7 +35,7 @@ use std::collections::HashMap;
use tokio::{self};

use crate::{
formats::readers::{AsyncReader, get_file_size_and_reader},
formats::readers::{get_file_size_and_reader, AsyncReader},
lava::error::LavaError,
};

@@ -186,7 +189,10 @@ fn read_page_header<C: ChunkReader>(
Ok((tracked.1, header))
}

async fn parse_metadatas(file_paths: &Vec<String>, reader_type: ReaderType) -> HashMap<String, ParquetMetaData> {
async fn parse_metadatas(
file_paths: &Vec<String>,
reader_type: ReaderType,
) -> HashMap<String, ParquetMetaData> {
let iter = file_paths.iter().dedup();

let handles = stream::iter(iter)
@@ -196,7 +202,9 @@ async fn parse_metadatas(file_paths: &Vec<String>, reader_type: ReaderType) -> H

tokio::spawn(async move {
let (file_size, mut reader) =
get_file_size_and_reader(file_path.clone(), reader_type).await.unwrap();
get_file_size_and_reader(file_path.clone(), reader_type)
.await
.unwrap();

let metadata = parse_metadata(&mut reader, file_size as usize)
.await
@@ -236,8 +244,9 @@ pub async fn get_parquet_layout(
column_name: &str,
file_path: &str,
reader_type: ReaderType,
) -> Result<(arrow::array::ArrayData, ParquetLayout), LavaError> {
let (file_size, mut reader) = get_file_size_and_reader(file_path.to_string(), reader_type).await?;
) -> Result<(Vec<arrow::array::ArrayData>, ParquetLayout), LavaError> {
let (file_size, mut reader) =
get_file_size_and_reader(file_path.to_string(), reader_type).await?;
let metadata = parse_metadata(&mut reader, file_size as usize).await?;

let codec_options = CodecOptionsBuilder::default()
@@ -373,30 +382,42 @@ pub async fn get_parquet_layout(
None,
)
.unwrap();
let array = array_reader.next_batch(total_values as usize).unwrap();

let new_array: Result<
&arrow_array::GenericByteArray<arrow::datatypes::GenericStringType<i32>>,
ArrowError,
> = array.as_any().downcast_ref::<StringArray>().ok_or_else(|| {
ArrowError::ParseError("Expects string array as first argument".to_string())
});

let data = match new_array {
Ok(_) => new_array.unwrap().to_data(),
Err(_) => array
.as_any()
.downcast_ref::<BinaryArray>()
.ok_or_else(|| {
ArrowError::ParseError(
"Expects string or binary array as first argument".to_string(),
)
})
.unwrap()
.to_data(),
};
// let array = array_reader.next_batch(total_values as usize).unwrap();

// instead of reading in total_values at once, we need to read 10_000 at a time and collect results into a Vec<Arc<dyn Array>>

let mut arrays: Vec<ArrayData> = Vec::new();

Ok((data, parquet_layout))
for x in (0..total_values).step_by(10_000) {
let array = array_reader.next_batch(10_000).unwrap();
let new_array: Result<
&arrow_array::GenericByteArray<arrow::datatypes::GenericStringType<i32>>,
ArrowError,
> = array.as_any().downcast_ref::<StringArray>().ok_or_else(|| {
ArrowError::ParseError("Expects string array as first argument".to_string())
});

let data = match new_array {
Ok(_) => new_array.unwrap().to_data(),
Err(_) => array
.as_any()
.downcast_ref::<BinaryArray>()
.ok_or_else(|| {
ArrowError::ParseError(
"Expects string or binary array as first argument".to_string(),
)
})
.unwrap()
.to_data(),
};
arrays.push(data);
}

// Weston's magic doesn't work, concat will overflow anyway.
// let array_refs = arrays.iter().map(|arr| arr.as_ref()).collect::<Vec<_>>();
// let array = concat(&array_refs).unwrap();

Ok((arrays, parquet_layout))
}

#[derive(Debug, Clone)]
@@ -467,7 +488,9 @@ pub async fn read_indexed_pages_async(
let handle = tokio::spawn(async move {
debug!("tokio spawn thread: {:?}", std::thread::current().id());
let (_file_size, mut reader) =
get_file_size_and_reader(file_path.clone(), reader_type).await.unwrap();
get_file_size_and_reader(file_path.clone(), reader_type)
.await
.unwrap();
let mut pages: Vec<parquet::column::page::Page> = Vec::new();
if dict_page_size > 0 {
let start = dict_page_offset.unwrap() as u64;
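The comments in the hunk above explain the shape change of get_parquet_layout: Arrow's StringArray and BinaryArray use 32-bit byte offsets, so reading the whole column as one array, or concatenating the chunks back together (the "concat will overflow anyway" note), can overflow once the column's string data approaches 2 GiB. The function therefore returns a Vec of 10,000-row chunks, and the Python callers in pele.py cast each chunk to large_string (64-bit offsets) before concatenating. A minimal pyarrow sketch of that pattern, with made-up chunks:

```python
# Sketch of the chunk-handling pattern used by the Python callers in pele.py:
# cast each 32-bit-offset string chunk to large_string (64-bit offsets) before
# concatenating, so the combined byte offsets cannot overflow. The chunks here
# are made up; in the library they come from get_parquet_layout.
import pyarrow

chunks = [pyarrow.array(["foo", "bar"]), pyarrow.array(["baz"])]
arr = pyarrow.concat_arrays([c.cast(pyarrow.large_string()) for c in chunks])
print(arr.type)   # large_string
```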