Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#2193 Optimizer isn't cost-based #2194

Merged
merged 2 commits into from
Jan 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion opteryx/__version__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__build__ = 946
__build__ = 947

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down
2 changes: 2 additions & 0 deletions opteryx/connectors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from opteryx.connectors.disk_connector import DiskConnector
from opteryx.connectors.gcp_cloudstorage_connector import GcpCloudStorageConnector
from opteryx.connectors.gcp_firestore_connector import GcpFireStoreConnector
from opteryx.connectors.iceberg_connector import IcebergConnector
from opteryx.connectors.mongodb_connector import MongoDbConnector
from opteryx.connectors.sql_connector import SqlConnector
from opteryx.shared import MaterializedDatasets
Expand All @@ -35,6 +36,7 @@
"DiskConnector",
"GcpCloudStorageConnector",
"GcpFireStoreConnector",
"IcebergConnector",
"MongoDbConnector",
"SqlConnector",
)
Expand Down
4 changes: 4 additions & 0 deletions opteryx/connectors/disk_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@


class DiskConnector(BaseConnector, Cacheable, Partitionable, PredicatePushable, Asynchronous):
"""
Connector for reading datasets from files on local storage.
"""

__mode__ = "Blob"
__type__ = "LOCAL"

Expand Down
4 changes: 4 additions & 0 deletions opteryx/connectors/file_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@


class FileConnector(BaseConnector, PredicatePushable):
"""
Connector for reading datasets from a file.
"""

__mode__ = "Blob"
__type__ = "FILE"
_byte_array: Optional[bytes] = None # Instance attribute to store file bytes
Expand Down
52 changes: 52 additions & 0 deletions opteryx/connectors/iceberg_connector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# See the License at http://www.apache.org/licenses/LICENSE-2.0
# Distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND.

"""
Arrow Reader

Used to read datasets registered using the register_arrow or register_df functions.
"""

import pyarrow
from orso.schema import FlatColumn
from orso.schema import RelationSchema

from opteryx.connectors.base.base_connector import DEFAULT_MORSEL_SIZE
from opteryx.connectors.base.base_connector import BaseConnector
from opteryx.shared import MaterializedDatasets
from opteryx.utils import arrow


class IcebergConnector(BaseConnector):
    """
    Connector for reading materialized (in-memory Arrow) datasets.

    NOTE(review): despite the name, this reads from MaterializedDatasets —
    the same mechanism used by register_arrow / register_df — not from an
    Iceberg catalog. Confirm whether catalog-backed reads are intended later.
    """

    __mode__ = "Internal"
    __type__ = "ARROW"

    def __init__(self, *args, **kwargs):
        BaseConnector.__init__(self, **kwargs)

        # dataset names are treated as case-insensitive; normalize once here
        self.dataset = self.dataset.lower()
        self._datasets = MaterializedDatasets()

    def get_dataset_schema(self) -> RelationSchema:
        """
        Build (and cache on `self.schema`) the Orso relation schema derived
        from the dataset's Arrow schema.
        """
        dataset = self._datasets[self.dataset]
        arrow_schema = dataset.schema

        self.schema = RelationSchema(
            name=self.dataset,
            columns=[FlatColumn.from_arrow(field) for field in arrow_schema],
        )

        return self.schema

    def read_dataset(self, columns: list = None, **kwargs) -> pyarrow.Table:
        """
        Yield the dataset as a sequence of morsels (pyarrow Tables), each
        sized to roughly DEFAULT_MORSEL_SIZE bytes.

        Parameters:
            columns: optional list of columns to project after reading.
        """
        dataset = self._datasets[self.dataset]

        if dataset.num_rows == 0 or dataset.nbytes == 0:
            # Empty (or zero-byte) dataset: the average-row-size calculation
            # below would divide by zero. Emit the table once so downstream
            # operators still observe the schema.
            morsel = dataset
            if columns:
                morsel = arrow.post_read_projector(morsel, columns)
            yield morsel
            return

        # Target morsels of about DEFAULT_MORSEL_SIZE bytes. Guard with
        # max(1, ...) so very wide rows still produce a positive chunk size,
        # and coerce to int — pyarrow's max_chunksize expects an integer.
        avg_row_bytes = dataset.nbytes / dataset.num_rows
        batch_size = max(1, int(DEFAULT_MORSEL_SIZE / avg_row_bytes))

        for batch in dataset.to_batches(max_chunksize=batch_size):
            morsel = pyarrow.Table.from_batches([batch], schema=dataset.schema)
            if columns:
                morsel = arrow.post_read_projector(morsel, columns)
            yield morsel
4 changes: 2 additions & 2 deletions opteryx/planner/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,9 @@ def query_planner(
from opteryx.models import QueryProperties
from opteryx.planner.ast_rewriter import do_ast_rewriter
from opteryx.planner.binder import do_bind_phase
from opteryx.planner.cost_based_optimizer import do_cost_based_optimizer
from opteryx.planner.logical_planner import apply_visibility_filters
from opteryx.planner.logical_planner import do_logical_planning_phase
from opteryx.planner.optimizer import do_optimizer
from opteryx.planner.physical_planner import create_physical_plan
from opteryx.planner.sql_rewriter import do_sql_rewrite
from opteryx.third_party import sqloxide
Expand Down Expand Up @@ -182,7 +182,7 @@ def query_planner(
statistics.time_planning_binder += time.monotonic_ns() - start

start = time.monotonic_ns()
optimized_plan = do_cost_based_optimizer(bound_plan, statistics)
optimized_plan = do_optimizer(bound_plan, statistics)
statistics.time_planning_optimizer += time.monotonic_ns() - start

# before we write the new optimizer and execution engine, convert to a V1 plan
Expand Down
4 changes: 2 additions & 2 deletions opteryx/planner/executor/v2_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,9 @@ def query_planner(
from opteryx.models import QueryProperties
from opteryx.planner.ast_rewriter import do_ast_rewriter
from opteryx.planner.binder import do_bind_phase
from opteryx.planner.cost_based_optimizer import do_cost_based_optimizer
from opteryx.planner.logical_planner import LogicalPlan
from opteryx.planner.logical_planner import do_logical_planning_phase
from opteryx.planner.optimizer import do_optimizer
from opteryx.planner.sql_rewriter import do_sql_rewrite
from opteryx.planner.temporary_physical_planner import create_physical_plan
from opteryx.third_party import sqloxide
Expand Down Expand Up @@ -130,7 +130,7 @@ def query_planner(
# common_table_expressions=ctes,
)

optimized_plan = do_cost_based_optimizer(bound_plan, statistics)
optimized_plan = do_optimizer(bound_plan, statistics)

# before we write the new optimizer and execution engine, convert to a V1 plan
query_properties = QueryProperties(qid=qid, variables=conn.context.variables)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,22 +48,22 @@
- Context: Maintains the state during optimization, including the pre-optimized and optimized plans.

The `CostBasedOptimizerVisitor` class orchestrates the optimization process by applying each strategy
in sequence. The `do_cost_based_optimizer` function serves as the entry point for optimizing a logical plan.
in sequence. The `do_optimizer` function serves as the entry point for optimizing a logical plan.

Example Usage:
optimized_plan = do_cost_based_optimizer(logical_plan)
optimized_plan = do_optimizer(logical_plan)

This module aims to enhance query performance through systematic and incremental optimization steps.
"""

from opteryx.config import DISABLE_OPTIMIZER
from opteryx.models import QueryStatistics
from opteryx.planner.cost_based_optimizer.strategies import *
from opteryx.planner.logical_planner import LogicalPlan
from opteryx.planner.optimizer.strategies import *

from .strategies.optimization_strategy import OptimizerContext

__all__ = "do_cost_based_optimizer"
__all__ = "do_optimizer"


class CostBasedOptimizerVisitor:
Expand Down Expand Up @@ -134,9 +134,9 @@ def optimize(self, plan: LogicalPlan) -> LogicalPlan:
return current_plan


def do_cost_based_optimizer(plan: LogicalPlan, statistics: QueryStatistics) -> LogicalPlan:
def do_optimizer(plan: LogicalPlan, statistics: QueryStatistics) -> LogicalPlan:
"""
Perform cost-based optimization on the given logical plan.
Perform optimization on the given logical plan.

Parameters:
plan (LogicalPlan): The logical plan to optimize.
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,5 +45,5 @@ select = ["SIM"]
ignore = []

[tool.ruff.lint.per-file-ignores]
"**/cost_based_optimizer/**" = ["SIM102"]
"**/optimizer/**" = ["SIM102"]
"opteryx/managers/expression/ops.py" = ["SIM118"]
46 changes: 34 additions & 12 deletions tests/catalog/test_iceberg.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@

import os
import sys

sys.path.insert(1, os.path.join(sys.path[0], "../.."))

from tests.tools import is_arm, is_mac, is_windows, skip_if
import opteryx
from pyiceberg.catalog.sql import SqlCatalog
from pyiceberg.catalog import load_catalog
from opteryx.connectors import DiskConnector
from opteryx.connectors import IcebergConnector

BASE_PATH: str = "tmp/iceberg"

@skip_if(is_arm() or is_windows() or is_mac())
def set_up_iceberg():
"""
Set up a local Iceberg catalog for testing with NVD data.
Expand All @@ -19,6 +25,9 @@ def set_up_iceberg():
Returns:
str: Path to the created Iceberg table.
"""

from pyiceberg.catalog.sql import SqlCatalog

# Clean up previous test runs if they exist
if os.path.exists(BASE_PATH):
import shutil
Expand Down Expand Up @@ -47,15 +56,28 @@ def set_up_iceberg():
print(f"Iceberg table set up at {BASE_PATH}")
return BASE_PATH

set_up_iceberg()

catalog = load_catalog(
"default",
**{
"uri": f"sqlite:///{BASE_PATH}/pyiceberg_catalog.db",
"warehouse": f"file://{BASE_PATH}",
},
)
def test_iceberg_basic():
    """Smoke test: build the local Iceberg catalog, register the store, scan a table."""
    from pyiceberg.catalog import load_catalog

    set_up_iceberg()

    warehouse_catalog = load_catalog(
        "default",
        uri=f"sqlite:///{BASE_PATH}/pyiceberg_catalog.db",
        warehouse=f"file://{BASE_PATH}",
    )

    opteryx.register_store("iceberg", IcebergConnector, io=DiskConnector)

    tweets = warehouse_catalog.load_table("iceberg.tweets")
    print(tweets.scan().to_arrow())


if __name__ == "__main__": # pragma: no cover
from tests.tools import run_tests

table = catalog.load_table("iceberg.tweets")
print(table.scan().to_arrow())
run_tests()
Loading