Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Coco benchmarks for lance and parquet formats #97

Merged
merged 9 commits into from
Aug 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
200 changes: 195 additions & 5 deletions python/benchmarks/bench_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,25 +12,39 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import pathlib
from abc import ABC, abstractmethod
from functools import wraps
import multiprocessing as mp
from typing import Iterable

import click
import pandas as pd
import time

import pyarrow as pa
import pyarrow.fs
import pyarrow.dataset as ds
import pyarrow.parquet as pq

import lance

__all__ = ["download_uris", "timeit"]
__all__ = ["download_uris", "timeit", "get_dataset", "get_uri", "BenchmarkSuite"]

KNOWN_FORMATS = ["lance", "parquet", "raw"]

def read_file(uri) -> bytes:
    """Read the full contents of *uri* as bytes via the pyarrow filesystem API."""
    fs, key = pyarrow.fs.FileSystem.from_uri(uri)
    # Context manager closes the file handle (the original leaked it).
    with fs.open_input_file(key) as f:
        return f.read()


def download_uris(uris: Iterable[str], func=read_file) -> Iterable[bytes]:
    """Fetch every uri in parallel with a process pool.

    Parameters
    ----------
    uris: Iterable[str]
        Uris to fetch; a pandas Series is unwrapped to its values.
    func: callable, optional
        Applied to each uri in a worker process (default: read the bytes).
    """
    if isinstance(uris, pd.Series):
        uris = uris.values
    # max(1, ...) guards single-core machines where cpu_count() - 1 == 0.
    # `with` ensures the pool's worker processes are cleaned up.
    with mp.Pool(max(1, mp.cpu_count() - 1)) as pool:
        data = pool.map(func, uris)
    return data


Expand All @@ -46,3 +60,179 @@ def timeit_wrapper(*args, **kwargs):
return result

return timeit_wrapper


def get_dataset(uri: str) -> ds.Dataset:
    """
    Open the dataset stored at the given uri.

    Lance datasets are recognized by their '.lance' suffix; anything else
    goes through the generic pyarrow dataset factory.
    """
    is_lance = uri.endswith('.lance')
    return lance.dataset(uri) if is_lance else ds.dataset(uri)


def get_uri(base_uri: str, dataset_name: str, fmt: str,
            flavor: str = None) -> str:
    """
    Return the uri to the dataset with the given specifications

    Parameters
    ----------
    base_uri: str
        Base uri to the root of the benchmark dataset catalog
    dataset_name: str
        Catalog name of the dataset (e.g., coco, oxford_pet)
    fmt: str
        'lance', 'parquet', or 'raw'
    flavor: str, optional
        We may store different flavors for parquet and lance,
        e.g., with image links but not bytes
    """
    suffix = f"_{flavor}" if flavor else ""
    return f"{base_uri}/{dataset_name}{suffix}.{fmt}"


class BenchmarkSuite:
    """Registry of named benchmarks plus a generated click CLI to run them.

    Usage::

        suite = BenchmarkSuite("coco")

        @suite.benchmark("foo", key=['fmt', 'flavor'])
        def foo(base_uri, fmt, flavor=None): ...

        main = suite.create_main()
    """

    def __init__(self, name: str):
        # Suite name; also the dataset sub-directory under the base uri.
        self.name = name
        self._benchmarks = {}
        self._results = {}

    def benchmark(self, name, key=None):
        """Decorator registering a function as the benchmark called `name`.

        `key` lists the kwarg names whose values distinguish timing series
        (e.g., ['fmt', 'flavor']).
        """
        def decorator(func):
            b = Benchmark(name, func, key=key)
            self._benchmarks[name] = b
            return func

        return decorator

    def get_benchmark(self, name):
        """Look up a registered benchmark by name (raises KeyError if unknown)."""
        return self._benchmarks[name]

    def list_benchmarks(self):
        """All registered Benchmark instances, in registration order."""
        return self._benchmarks.values()

    def create_main(self):
        """Build and return a click entry point that runs the benchmarks."""
        @click.command
        @click.option('-u', '--base-uri', required=True, type=str,
                      help="Base uri to the benchmark dataset catalog")
        @click.option('-f', '--format', 'fmt',
                      help="'lance', 'parquet', or 'raw'. Omit for all")
        @click.option('--flavor', type=str,
                      help="external if parquet/lance had external images version")
        @click.option('-b', '--benchmark', type=str,
                      help="which benchmark to run. Omit for all")
        @click.option('-r', '--repeats', type=int,
                      help="number of times to run each benchmark")
        @click.option('-o', '--output', type=str,
                      help="save timing results to directory")
        def main(base_uri, fmt, flavor, benchmark, repeats, output):
            if fmt:
                fmt = fmt.strip().lower()
                assert fmt in KNOWN_FORMATS
                fmt = [fmt]
            else:
                # No format given: run against every known format.
                fmt = KNOWN_FORMATS
            base_uri = f'{base_uri}/datasets/{self.name}'

            def run_benchmark(bmark):
                b = bmark.repeat(repeats or 1)
                for f in fmt:
                    b.run(base_uri=base_uri, fmt=f, flavor=flavor)
                if output:
                    path = pathlib.Path(output) / f"{bmark.name}.csv"
                    b.to_df().to_csv(path, index=False)

            if benchmark is not None:
                run_benchmark(self.get_benchmark(benchmark))
            else:
                # Plain loop, not a list comprehension: this runs purely
                # for side effects and the list of Nones was discarded.
                for b in self.list_benchmarks():
                    run_benchmark(b)

        return main


class Benchmark:
    """A single named benchmark: wraps `func` and records wall-clock timings."""

    def __init__(self, name, func, key=None, num_runs=1):
        self.name = name
        self.func = func
        # kwarg names whose values distinguish timing series (may be None).
        self.key = key
        self.num_runs = num_runs
        # maps (timer_name, *key_values) -> list of elapsed seconds
        self._timings = {}

    def repeat(self, num_runs: int):
        """Return a fresh copy of this benchmark configured for `num_runs` runs."""
        return Benchmark(self.name, self.func, key=self.key, num_runs=num_runs)

    def run(self, *args, **kwargs):
        """Run `func` `num_runs` times, timing each run; return the last result."""
        output = None
        func = self.timeit("total")(self.func)
        for _ in range(self.num_runs):
            output = func(*args, **kwargs)
        return output

    def to_df(self):
        """Recorded timings as a DataFrame (one column per timing key)."""
        return pd.DataFrame(self._timings)

    def timeit(self, name):
        """Decorator that times each call and stores the elapsed seconds
        under (`name`, *values of the kwargs listed in self.key)."""
        def benchmark_decorator(func):
            @wraps(func)
            def timeit_wrapper(*args, **kwargs):
                start_time = time.perf_counter()
                result = func(*args, **kwargs)
                total_time = time.perf_counter() - start_time
                print(f"Function {func.__name__}{args} {kwargs} Took {total_time:.4f} seconds")
                # BUG FIX: tolerate key=None — the original crashed with
                # "TypeError: 'NoneType' object is not iterable" here.
                key = tuple([name] + [kwargs.get(k) for k in (self.key or [])])
                self._timings.setdefault(key, []).append(total_time)
                return result

            return timeit_wrapper

        return benchmark_decorator


class DatasetConverter(ABC):
    """Base class for converting a raw benchmark dataset to lance/parquet."""

    def __init__(self, name, uri_root):
        # Catalog name of the dataset (e.g., "coco").
        self.name = name
        # Root uri under which converted datasets are written.
        self.uri_root = uri_root

    @abstractmethod
    def read_metadata(self) -> pd.DataFrame:
        """Return the dataset metadata (annotations, image uris, ...) as a DataFrame."""
        pass

    def default_dataset_path(self, fmt, flavor=None):
        """Canonical output path: <uri_root>/<name>[_<flavor>].<fmt>"""
        suffix = f"_{flavor}" if flavor else ""
        return os.path.join(self.uri_root,
                            f'{self.name}{suffix}.{fmt}')

    def save_df(self, df, fmt='lance', output_path=None):
        """Write the metadata DataFrame (links flavor) in the given format."""
        output_path = output_path or self.default_dataset_path(fmt, "links")
        table = pa.Table.from_pandas(df, self.get_schema())
        self._write_table(table, fmt, output_path)
        return table

    @abstractmethod
    def image_uris(self, table):
        """Return the image uris referenced by `table`."""
        pass

    def make_embedded_dataset(self, table: pa.Table, fmt='lance', output_path=None):
        """Download all images and write a copy of `table` with the bytes embedded."""
        output_path = output_path or self.default_dataset_path(fmt)
        uris = self.image_uris(table)
        images = download_uris(pd.Series(uris))
        arr = pa.BinaryArray.from_pandas(images)
        embedded = table.append_column(pa.field("image", pa.binary()), arr)
        self._write_table(embedded, fmt, output_path)
        return embedded

    @staticmethod
    def _write_table(table, fmt, output_path):
        # Shared writer dispatch (was duplicated in save_df and
        # make_embedded_dataset). BUG FIX: unknown formats now raise
        # instead of silently writing nothing.
        if fmt == 'parquet':
            pq.write_table(table, output_path)
        elif fmt == 'lance':
            lance.write_table(table, output_path)
        else:
            raise ValueError(f"Unknown format: {fmt}")

    @abstractmethod
    def get_schema(self):
        """Return the pyarrow schema of the converted dataset."""
        pass
149 changes: 87 additions & 62 deletions python/benchmarks/coco.py
Original file line number Diff line number Diff line change
@@ -1,81 +1,106 @@
#!/usr/bin/env python3

import argparse
import json
import os
from typing import Union

import duckdb
import pandas as pd
import pyarrow as pa
import pyarrow.fs

from bench_utils import download_uris, timeit


def get_metadata(base_uri: str, split: str = "val"):
    """Load COCO instance annotations for `split` and return one row per image.

    Reads annotations/instances_<split>2017.json from `base_uri`, joins each
    annotation with its category record, groups the annotations per image,
    and merges them onto the image records. The result has the image fields
    plus `split`, `image_uri`, and a list-valued `annotations` column.
    """
    annotation_uri = os.path.join(base_uri, f"annotations/instances_{split}2017.json")
    fs, path = pa.fs.FileSystem.from_uri(annotation_uri)
    with fs.open_input_file(path) as fobj:
        annotation_json = json.load(fobj)
    df = pd.DataFrame(annotation_json["annotations"])
    category_df = pd.DataFrame(annotation_json["categories"])
    # NOTE(review): rename() without columns=/axis= maps the *index*, not the
    # columns, so the category "id" column is likely not renamed here — confirm
    # whether {"id": "category_id"} was meant as a column rename.
    annotations_df = df.merge(category_df, left_on="category_id", right_on="id").rename(
        {"id": "category_id"}
    )
    # One row per image_id with the list of its annotation records.
    anno_df = (
        pd.DataFrame(
            {
                "image_id": df.image_id,
                "annotations": annotations_df.drop(
                    columns=["image_id"], axis=1
                ).to_dict(orient="records"),
            }
        )
        .groupby("image_id")
        .agg(list)
    )
    # print(anno_df, anno_df.columns)
    images_df = pd.DataFrame(annotation_json["images"])
    images_df["split"] = split
    # Resolve each file name to its full uri under <base_uri>/<split>2017/.
    images_df["image_uri"] = images_df["file_name"].apply(
        lambda fname: os.path.join(base_uri, f"{split}2017", fname)
    )
    return images_df.merge(anno_df, left_on="id", right_on="image_id")


@timeit
def get_label_distribution(base_uri: str):

import lance
import pyarrow.compute as pc
import pyarrow.dataset as ds
from bench_utils import download_uris, get_uri, get_dataset, BenchmarkSuite
from parse_coco import CocoConverter

coco_benchmarks = BenchmarkSuite("coco")


@coco_benchmarks.benchmark("label_distribution", key=['fmt', 'flavor'])
def label_distribution(base_uri: str, fmt: str, flavor: str = None):
    """Count annotations per label, reading from the requested storage format."""
    if fmt == 'raw':
        return _label_distribution_raw(base_uri)
    if fmt in ('lance', 'parquet'):
        dataset = get_dataset(get_uri(base_uri, "coco", fmt, flavor))
        if fmt == 'lance':
            return _label_distribution_lance(dataset)
        return _label_distribution_duckdb(dataset)
    raise NotImplementedError()


@coco_benchmarks.benchmark("filter_data", key=['fmt', 'flavor'])
def filter_data(base_uri: str, fmt: str, flavor: str = None):
    """Run the class-filter query against the requested storage format."""
    if fmt == 'raw':
        result = _filter_data_raw(base_uri)
    elif fmt == 'lance':
        result = _filter_data_lance(base_uri, flavor=flavor)
    elif fmt == 'parquet':
        result = _filter_data_parquet(base_uri, flavor=flavor)
    else:
        raise NotImplementedError()
    return result


def _label_distribution_raw(base_uri: str):
    """Mimic
    SELECT label, count(1) FROM coco_dataset GROUP BY 1

    Reads the raw COCO metadata and counts occurrences of each category name.
    (Fixes the "Minic" docstring typo and drops the stale pre-refactor body
    that read metadata directly.)
    """
    c = CocoConverter(base_uri)
    df = c.read_metadata()
    # `annotations` holds a list of dicts per image; explode to one row per
    # annotation, flatten the dicts, and count each category name.
    return pd.json_normalize(df.annotations.explode()).name.value_counts()


def _filter_data_raw(base_uri: str, klass="cat", offset=20, limit=50):
    """SELECT image, annotations FROM coco WHERE annotations.label = 'cat' LIMIT 50 OFFSET 20"""
    c = CocoConverter(base_uri)
    df = c.read_metadata()
    # Keep images that have at least one annotation of the requested class.
    mask = df.annotations.apply(lambda ann: any(a["name"] == klass for a in ann))
    filtered = df.loc[mask, ["image_uri", "annotations"]]
    limited = filtered[offset:offset + limit]
    # BUG FIX: DataFrame.assign returns a *new* frame; the original discarded
    # the assigned result, so the "image" column was never attached.
    limited = limited.assign(image=download_uris(limited.image_uri))
    return limited


def main():
parser = argparse.ArgumentParser(description="Benchmarks on COCO dataset")
parser.add_argument("uri", help="base uri for coco dataset")
args = parser.parse_args()
def _filter_data_lance(base_uri: str, klass="cat", offset=20, limit=50, flavor=None):
    """Filter images by annotation class from the lance dataset, with paging."""
    uri = get_uri(base_uri, "coco", "lance", flavor)
    # Secondary scan over just the columns needed to find matching image ids;
    # duckdb resolves `index_scanner` from the local variable of the same name.
    index_scanner = lance.scanner(uri, columns=['image_id', 'annotations.name'])
    query = (f"SELECT distinct image_id FROM ("
             f" SELECT image_id, UNNEST(annotations) as ann FROM index_scanner"
             f") WHERE ann.name == '{klass}'")
    filtered_ids = duckdb.query(query).arrow().column("image_id").combine_chunks()
    # NOTE(review): the filter below is commented out, so `filtered_ids` is
    # currently unused and the scan is NOT restricted to `klass` — confirm.
    # BUG FIX: use the offset/limit parameters instead of hard-coded 50/20.
    scanner = lance.scanner(uri, ['image_id', 'image', 'annotations.name'],
                            # filter=pc.field("image_id").isin(filtered_ids),
                            limit=limit, offset=offset)
    return scanner.to_table().to_pandas()


def _filter_data_parquet(base_uri: str, klass="cat", offset=20, limit=50, flavor=None):
    """Filter images by annotation class from the parquet dataset via duckdb."""
    uri = get_uri(base_uri, "coco", "parquet", flavor)
    # duckdb resolves `dataset` from the local variable of the same name.
    dataset = ds.dataset(uri)
    query = (f"SELECT distinct image_id FROM ("
             f" SELECT image_id, UNNEST(annotations) as ann FROM dataset"
             f") WHERE ann.name == '{klass}'")
    filtered_ids = duckdb.query(query).arrow().column("image_id").to_numpy().tolist()
    id_string = ','.join([f"'{x}'" for x in filtered_ids])
    # BUG FIX: honor the offset/limit parameters (were hard-coded 50/20).
    return duckdb.query(f"SELECT image, annotations "
                        f"FROM dataset "
                        f"WHERE image_id in ({id_string}) "
                        f"LIMIT {limit} OFFSET {offset}").to_arrow_table()


get_label_distribution(args.uri)
get_filtered_data(args.uri)
def _label_distribution_lance(dataset: ds.Dataset):
    """Project only the annotation names from lance, then count via duckdb."""
    scanner = lance.scanner(dataset, columns=['annotations.name'])
    return _label_distribution_duckdb(scanner)


def _label_distribution_duckdb(arrow_obj: Union[ds.Dataset, ds.Scanner]):
    """Count annotations per category name with duckdb over `arrow_obj`.

    BUG FIX: the annotation was `Union[ds.Dataset | ds.Scanner]`, which raises
    TypeError at import on Python < 3.10 (classes don't support `|` there) and
    is redundant on 3.10+; `Union[ds.Dataset, ds.Scanner]` is the valid form.
    """
    # duckdb resolves `arrow_obj` from the local variable of the same name.
    query = """\
SELECT ann.name, COUNT(1) FROM (
  SELECT UNNEST(annotations) as ann FROM arrow_obj
) GROUP BY 1
"""
    return duckdb.query(query).to_df()


if __name__ == "__main__":
    # Build the click CLI from the registered coco benchmarks and run it.
    cli = coco_benchmarks.create_main()
    cli()
Loading