Skip to content

Commit

Permalink
add test and cleanup synccounters
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Jan 13, 2023
1 parent 71325eb commit d62f19e
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,12 @@ impl Sink for SortSink {
}

fn finalize(&mut self, _context: &PExecutionContext) -> PolarsResult<FinalizedSink> {
// safety: we are the final thread and will drop only once.
unsafe {
self.mem_total.manual_drop();
self.free_mem.manual_drop();
}

if self.ooc {
let lock = self.io_thread.lock().unwrap();
let io_thread = lock.as_ref().unwrap();
Expand Down
1 change: 0 additions & 1 deletion polars/polars-lazy/src/physical_plan/streaming/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,6 @@ pub(crate) fn insert_streaming_nodes(
continue;
}
let verbose = std::env::var("POLARS_VERBOSE").is_ok();
dbg!(verbose);

for branch in tree {
// should be reset for every branch
Expand Down
19 changes: 19 additions & 0 deletions py-polars/tests/slow/test_streaming.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import os
import time

import numpy as np
Expand All @@ -13,3 +14,21 @@ def test_cross_join_stack() -> None:
assert a.join(a, how="cross").head().collect(streaming=True).shape == (5, 2)
t1 = time.time()
assert (t1 - t0) < 0.5


def test_ooc_sort() -> None:
# not sure if monkeypatch will be visible in rust
env = "POLARS_FORCE_OOC_SORT"
os.environ[env] = "1"

s = pl.arange(0, 100_000, eager=True).rename("idx")

df = s.shuffle().to_frame()

for reverse in [True, False]:
out = (
df.lazy().sort("idx", reverse=reverse).collect(streaming=True)
).to_series()

assert out.series_equal(s.sort(reverse=reverse))
os.unsetenv(env)

0 comments on commit d62f19e

Please sign in to comment.