Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pagination: Remove getTotalHits feature #1649

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ void execute(PhysicalPlan plan, ExecutionContext context,
class QueryResponse {
private final Schema schema;
private final List<ExprValue> results;
private final long total;
Yury-Fridlyand marked this conversation as resolved.
Show resolved Hide resolved
private final Cursor cursor;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,6 @@ public ExecutionEngine.Schema schema() {
return new ExecutionEngine.Schema(List.of());
}

// TODO remove
@Override
public long getTotalHits() {
return 0;
}

@Override
public void open() {
// no-op, no search should be invoked.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ public class FilterOperator extends PhysicalPlan {
private final Expression conditions;
@ToString.Exclude
private ExprValue next = null;
private long totalHits = 0;

@Override
public <R, C> R accept(PhysicalPlanNodeVisitor<R, C> visitor, C context) {
Expand All @@ -51,7 +50,6 @@ public boolean hasNext() {
ExprValue exprValue = conditions.valueOf(inputValue.bindingTuples());
if (!(exprValue.isNull() || exprValue.isMissing()) && (exprValue.booleanValue())) {
next = inputValue;
totalHits++;
return true;
}
}
Expand All @@ -62,10 +60,4 @@ public boolean hasNext() {
public ExprValue next() {
return next;
}

@Override
public long getTotalHits() {
// ignore `input.getTotalHits()`, because it returns wrong (unfiltered) value
return totalHits;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,6 @@ public class NestedOperator extends PhysicalPlan {
@EqualsAndHashCode.Exclude
private ListIterator<Map<String, ExprValue>> flattenedResult = result.listIterator();

private long totalHits = 0;

/**
* Constructor for NestedOperator with list of map as arg.
* @param input : PhysicalPlan input.
Expand Down Expand Up @@ -121,13 +119,11 @@ public ExprValue next() {

if (result.isEmpty()) {
flattenedResult = result.listIterator();
totalHits++;
return new ExprTupleValue(new LinkedHashMap<>());
}

flattenedResult = result.listIterator();
}
totalHits++;
return new ExprTupleValue(new LinkedHashMap<>(flattenedResult.next()));
}

Expand Down Expand Up @@ -283,9 +279,4 @@ private void getNested(
row, ret, currentObj);
}
}

@Override
public long getTotalHits() {
return totalHits;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,4 @@ public ExecutionEngine.Schema schema() {
throw new IllegalStateException(String.format("[BUG] schema can been only applied to "
+ "ProjectOperator, instead of %s", this.getClass().getSimpleName()));
}

/**
* Returns Total hits matched the search criteria. Note: query may return less if limited.
* {@see Settings#QUERY_SIZE_LIMIT}.
* Any plan which adds/removes rows to the response should overwrite it to provide valid values.
*
* @return Total hits matched the search criteria.
*/
public long getTotalHits() {
return getChild().stream().mapToLong(PhysicalPlan::getTotalHits).max().orElse(0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,6 @@ public boolean hasNext() {
return valuesIterator.hasNext();
}

@Override
public long getTotalHits() {
// ValuesOperator used for queries without `FROM` clause, e.g. `select 1`.
// Such query always returns 1 row.
return 1;
}

@Override
public ExprValue next() {
List<ExprValue> values = valuesIterator.next().stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ Helper executeSuccess(Split split) {
invocation -> {
ResponseListener<ExecutionEngine.QueryResponse> listener = invocation.getArgument(2);
listener.onResponse(
new ExecutionEngine.QueryResponse(schema, Collections.emptyList(), 0,
new ExecutionEngine.QueryResponse(schema, Collections.emptyList(),
Cursor.None));
return null;
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ Helper executeSuccess(Long... offsets) {
ResponseListener<ExecutionEngine.QueryResponse> listener =
invocation.getArgument(2);
listener.onResponse(
new ExecutionEngine.QueryResponse(null, Collections.emptyList(), 0,
new ExecutionEngine.QueryResponse(null, Collections.emptyList(),
Cursor.None));

PlanContext planContext = invocation.getArgument(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,6 @@ public void never_hasNext() {
assertFalse(plan.hasNext());
}

// TODO remove
@Test
public void no_total_hits() {
var plan = new CursorCloseOperator(null);
assertEquals(0, plan.getTotalHits());
plan.open();
assertEquals(0, plan.getTotalHits());
}

@Test
public void open_is_not_propagated() {
var child = mock(PhysicalPlan.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ public void filter_test() {
.tupleValue(ImmutableMap
.of("ip", "209.160.24.63", "action", "GET", "response", 404, "referer",
"www.amazon.com"))));
assertEquals(1, plan.getTotalHits());
}

@Test
Expand All @@ -64,7 +63,6 @@ public void null_value_should_been_ignored() {
DSL.equal(DSL.ref("response", INTEGER), DSL.literal(404)));
List<ExprValue> result = execute(plan);
assertEquals(0, result.size());
assertEquals(0, plan.getTotalHits());
}

@Test
Expand All @@ -78,21 +76,5 @@ public void missing_value_should_been_ignored() {
DSL.equal(DSL.ref("response", INTEGER), DSL.literal(404)));
List<ExprValue> result = execute(plan);
assertEquals(0, result.size());
assertEquals(0, plan.getTotalHits());
}

@Test
public void totalHits() {
when(inputPlan.hasNext()).thenReturn(true, true, true, true, true, false);
var answers = Stream.of(200, 240, 300, 403, 404).map(c ->
new ExprTupleValue(new LinkedHashMap<>(Map.of("response", new ExprIntegerValue(c)))))
.collect(Collectors.toList());
when(inputPlan.next()).thenAnswer(AdditionalAnswers.returnsElementsOf(answers));

FilterOperator plan = new FilterOperator(inputPlan,
DSL.less(DSL.ref("response", INTEGER), DSL.literal(400)));
List<ExprValue> result = execute(plan);
assertEquals(3, result.size());
assertEquals(3, plan.getTotalHits());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,6 @@ public void nested_one_nested_field() {
)
)
);
assertEquals(3, nested.getTotalHits());
}

@Test
Expand Down Expand Up @@ -241,7 +240,6 @@ public void nested_two_nested_field() {
)
)
);
assertEquals(9, nested.getTotalHits());
}

@Test
Expand Down Expand Up @@ -284,7 +282,6 @@ public void nested_two_nested_fields_with_same_path() {
)
)
);
assertEquals(3, nested.getTotalHits());
}

@Test
Expand All @@ -304,7 +301,6 @@ public void non_nested_field_tests() {
tupleValue(new LinkedHashMap<>(Map.of("message", "val")))
)
);
assertEquals(1, nested.getTotalHits());
}

@Test
Expand All @@ -323,7 +319,6 @@ public void nested_missing_tuple_field() {
tupleValue(new LinkedHashMap<>(Map.of("message.val", ExprNullValue.of())))
)
);
assertEquals(1, nested.getTotalHits());
}

@Test
Expand All @@ -340,6 +335,5 @@ public void nested_missing_array_field() {
.get(0)
.tupleValue()
.size());
assertEquals(1, nested.getTotalHits());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,21 +61,4 @@ void add_split_to_child_by_default() {
testPlan.add(split);
verify(child).add(split);
}

@Test
void get_total_hits_from_child() {
var plan = mock(PhysicalPlan.class);
when(child.getTotalHits()).thenReturn(42L);
when(plan.getChild()).thenReturn(List.of(child));
when(plan.getTotalHits()).then(CALLS_REAL_METHODS);
assertEquals(42, plan.getTotalHits());
verify(child).getTotalHits();
}

@Test
void get_total_hits_uses_default_value() {
var plan = mock(PhysicalPlan.class);
when(plan.getTotalHits()).then(CALLS_REAL_METHODS);
assertEquals(0, plan.getTotalHits());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.opensearch.sql.data.model.ExprValueUtils.collectionValue;
import static org.opensearch.sql.expression.DSL.literal;
Expand Down Expand Up @@ -45,7 +44,6 @@ public void iterateSingleRow() {
results,
contains(collectionValue(Arrays.asList(1, "abc")))
);
assertThat(values.getTotalHits(), equalTo(1L));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public void execute(
result.add(plan.next());
}
QueryResponse response = new QueryResponse(new Schema(new ArrayList<>()), new ArrayList<>(),
0, Cursor.None);
Cursor.None);
listener.onResponse(response);
} catch (Exception e) {
listener.onFailure(e);
Expand Down
71 changes: 19 additions & 52 deletions docs/dev/Pagination-v2.md
Original file line number Diff line number Diff line change
Expand Up @@ -450,8 +450,8 @@ SQLService ->>+ QueryPlanFactory: execute

Processing of an Initial Query Request has few extra steps comparing versus processing a regular Query Request:
1. Query validation with `CanPaginateVisitor`. This is required to validate whether incoming query can be paged. This also activate legacy engine fallback mechanism.
2. `Serialization` is performed by `PlanSerializer` - it converts Physical Query Plan into a cursor, which could be used query a next page.

2. Creating a paged index scan with `CreatePagingTableScanBuilder` `Optimizer` rule. A Regular Query Request triggers `CreateTableScanBuilder` rule instead.
3. `Serialization` is performed by `PlanSerializer` - it converts Physical Plan Tree into a cursor, which could be used query a next page.

```mermaid
sequenceDiagram
Expand Down Expand Up @@ -482,9 +482,6 @@ SQLService ->>+ QueryPlanFactory : execute
OpenSearchExecutionEngine ->>+ PlanSerializer : convertToCursor
PlanSerializer -->>- OpenSearchExecutionEngine : cursor
end
rect rgb(91, 123, 155)
Note over OpenSearchExecutionEngine : get total hits
end
OpenSearchExecutionEngine -->>- QueryService : execution completed
QueryService -->>- QueryPlanFactory : execution completed
QueryPlanFactory -->>- SQLService : execution completed
Expand All @@ -493,10 +490,9 @@ SQLService ->>+ QueryPlanFactory : execute
#### Subsequent Query Request

Subsequent pages are processed by a new workflow. The key point there:
1. `Deserialization` is performed by `PlanSerializer` to restore entire Physical Query Plan encoded into the cursor.
2. Since query already contains the Physical Query Plan, analysis and optimization steps are no-ops.
3. `Serialization` is performed by `PlanSerializer` - it converts Physical Query Plan into a cursor, which could be used query a next page.
4. Traversal of Physical Query Plan to get total hits, which is required to properly fill response to a user.
1. `Deserialization` is performed by `PlanSerializer` to restore entire Physical Plan Tree encoded into the cursor.
2. Since query already contains the Physical Plan Tree, all tree processing steps are skipped.
3. `Serialization` is performed by `PlanSerializer` - it converts Physical Plan Tree into a cursor, which could be used query a next page.

```mermaid
sequenceDiagram
Expand All @@ -509,17 +505,20 @@ sequenceDiagram
SQLService ->>+ QueryPlanFactory : execute
QueryPlanFactory ->>+ QueryService : execute
QueryService ->>+ Analyzer : analyze
Analyzer -->>- QueryService : new LogicalFetchCursor
QueryService ->>+ Planner : plan
Planner ->>+ DefaultImplementor : implement
DefaultImplementor ->>+ PlanSerializer : deserialize
PlanSerializer -->>- DefaultImplementor: physical query plan
DefaultImplementor -->>- Planner : physical query plan
Planner -->>- QueryService : physical query plan
QueryService ->>+ OpenSearchExecutionEngine : execute
OpenSearchExecutionEngine -->>- QueryService: execution completed
QueryService -->>- QueryPlanFactory : execution completed
rect rgb(91, 123, 155)
note over QueryService, PlanSerializer : Deserialization
QueryService ->>+ PlanSerializer: convertToPlan
PlanSerializer -->>- QueryService: Physical plan tree
end
Note over QueryService : Planner, Optimizer and Implementor<br />are skipped
QueryService ->>+ OpenSearchExecutionEngine : execute
rect rgb(91, 123, 155)
note over OpenSearchExecutionEngine, PlanSerializer : Serialization
OpenSearchExecutionEngine ->>+ PlanSerializer : convertToCursor
PlanSerializer -->>- OpenSearchExecutionEngine : cursor
end
OpenSearchExecutionEngine -->>- QueryService: execution completed
QueryService -->>- QueryPlanFactory : execution completed
QueryPlanFactory -->>- SQLService : execution completed
```

Expand Down Expand Up @@ -776,35 +775,3 @@ class PhysicalPlan:
def close:
innerPlan.close()
```

#### Total Hits

Total Hits is the number of rows matching the search criteria; with `select *` queries it is equal to row (doc) number in the table (index).
Example:
Paging thru `SELECT * FROM calcs` (17 rows) with `fetch_size = 5` returns:

* Page 1: total hits = 17, result size = 5, cursor
* Page 2: total hits = 17, result size = 5, cursor
* Page 3: total hits = 17, result size = 5, cursor
* Page 4: total hits = 17, result size = 2, cursor
* Page 5: total hits = 0, result size = 0

Default implementation of `getTotalHits` in a Physical Plan iterate child plans down the tree and gets the maximum value or 0.

```mermaid
sequenceDiagram
participant OpenSearchExecutionEngine
participant ProjectOperator
participant ResourceMonitorPlan
participant OpenSearchIndexScan
OpenSearchExecutionEngine ->>+ ProjectOperator: getTotalHits
Note over ProjectOperator: default implementation
ProjectOperator ->>+ ResourceMonitorPlan: getTotalHits
Note over ResourceMonitorPlan: call to delegate
ResourceMonitorPlan ->>+ OpenSearchIndexScan: getTotalHits
Note over OpenSearchIndexScan: use stored value from the search response
OpenSearchIndexScan -->>- ResourceMonitorPlan: value
ResourceMonitorPlan -->>- ProjectOperator: value
ProjectOperator -->>- OpenSearchExecutionEngine: value
```
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ private ResponseListener<QueryResponse> createQueryResponseListener(
public void onResponse(QueryResponse response) {
sendResponse(channel, OK,
formatter.format(new QueryResult(response.getSchema(), response.getResults(),
response.getCursor(), response.getTotal())));
response.getCursor())));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public void execute(PhysicalPlan physicalPlan, ExecutionContext context,
}

QueryResponse response = new QueryResponse(physicalPlan.schema(), result,
plan.getTotalHits(), planSerializer.convertToCursor(plan));
planSerializer.convertToCursor(plan));
listener.onResponse(response);
} catch (Exception e) {
listener.onFailure(e);
Expand Down
Loading