cleanup and make reverse work
ritchie46 committed Jan 13, 2023
1 parent 27fccfb commit e45ff60
Showing 14 changed files with 169 additions and 196 deletions.
2 changes: 1 addition & 1 deletion polars/polars-core/Cargo.toml
@@ -157,7 +157,6 @@ docs-selection = [
ahash.workspace = true
anyhow.workspace = true
arrow.workspace = true
once_cell.workspace = true
base64 = { version = "0.13", optional = true }
bitflags.workspace = true
chrono = { version = "0.4", optional = true }
@@ -169,6 +168,7 @@ indexmap = { version = "1", features = ["std"] }
ndarray = { version = "0.15", optional = true, default_features = false }
num.workspace = true
object_store = { version = "0.5.3", default-features = false, optional = true }
once_cell.workspace = true
polars-arrow = { version = "0.26.1", path = "../polars-arrow", features = ["compute"] }
polars-utils = { version = "0.26.1", path = "../polars-utils" }
rand = { version = "0.8", optional = true, features = ["small_rng", "std"] }
4 changes: 2 additions & 2 deletions polars/polars-core/src/schema.rs
@@ -126,11 +126,11 @@ impl Schema {
}

pub fn try_get_full(&self, name: &str) -> PolarsResult<(usize, &String, &DataType)> {
self.inner.get_full(name)
self.inner
.get_full(name)
.ok_or_else(|| PolarsError::NotFound(name.to_string().into()))
}


pub fn get_full(&self, name: &str) -> Option<(usize, &String, &DataType)> {
self.inner.get_full(name)
}
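For context, the reshuffled try_get_full is just the fallible twin of get_full: the Option returned by the index lookup is turned into a PolarsResult with ok_or_else. A minimal sketch of the same pattern outside polars (SmallSchema, NotFound, and the string dtypes are invented for illustration):

```rust
use std::collections::HashMap;

// Hypothetical stand-ins for Schema and PolarsError::NotFound, just to show the shape.
#[derive(Debug)]
struct NotFound(String);

struct SmallSchema {
    // name -> dtype, kept as plain strings for the sketch
    inner: HashMap<String, String>,
}

impl SmallSchema {
    // Infallible lookup: the caller decides what a miss means.
    fn get(&self, name: &str) -> Option<&String> {
        self.inner.get(name)
    }

    // Fallible lookup: a miss becomes an error, mirroring try_get_full above.
    fn try_get(&self, name: &str) -> Result<&String, NotFound> {
        self.inner
            .get(name)
            .ok_or_else(|| NotFound(name.to_string()))
    }
}

fn main() {
    let schema = SmallSchema {
        inner: HashMap::from([("foo".to_string(), "Int64".to_string())]),
    };
    assert!(schema.get("bar").is_none());
    assert!(schema.try_get("bar").is_err());
    assert_eq!(schema.try_get("foo").unwrap().as_str(), "Int64");
}
```
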
5 changes: 2 additions & 3 deletions polars/polars-io/src/parquet/read_impl.rs
@@ -3,7 +3,6 @@ use std::collections::VecDeque;
use std::convert::TryFrom;
use std::ops::{Deref, Range};
use std::sync::Arc;
use std::thread::current;

use arrow::array::new_empty_array;
use arrow::io::parquet::read;
@@ -490,11 +489,11 @@ impl Iterator for BatchedParquetIter {
match self.current_batch.next() {
Some(df) => Some(Ok(df)),
None => match self.inner.next_batches(self.batch_size) {
Err(e) => return Some(Err(e)),
Err(e) => Some(Err(e)),
Ok(opt_batch) => {
let batch = opt_batch?;
self.current_batch = batch.into_iter();
return self.current_batch.next().map(Ok);
self.current_batch.next().map(Ok)
}
},
}
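The change to BatchedParquetIter above only swaps the early `return`s for plain expression arms; the control flow is unchanged: drain the current batch of DataFrames and, once it runs dry, ask the inner reader for the next set of batches. A rough sketch of that drain-and-refill iterator pattern with plain vectors (BatchedSource, FlattenBatches, and the integer rows are invented for illustration; error handling is left out):

```rust
// A toy batched source: each call hands back the next chunk of rows, or None when exhausted.
struct BatchedSource {
    chunks: Vec<Vec<i32>>,
}

impl BatchedSource {
    fn next_batch(&mut self) -> Option<Vec<i32>> {
        if self.chunks.is_empty() {
            None
        } else {
            Some(self.chunks.remove(0))
        }
    }
}

// Iterator that drains the current chunk and refills it from the source on demand,
// the same shape as BatchedParquetIter draining `current_batch` before asking
// `inner` for more batches.
struct FlattenBatches {
    source: BatchedSource,
    current: std::vec::IntoIter<i32>,
}

impl Iterator for FlattenBatches {
    type Item = i32;

    fn next(&mut self) -> Option<i32> {
        match self.current.next() {
            Some(v) => Some(v),
            None => {
                // Current chunk is exhausted: pull the next one, or stop if the source is done.
                let batch = self.source.next_batch()?;
                self.current = batch.into_iter();
                self.current.next()
            }
        }
    }
}

fn main() {
    let it = FlattenBatches {
        source: BatchedSource {
            chunks: vec![vec![1, 2], vec![3]],
        },
        current: Vec::<i32>::new().into_iter(),
    };
    assert_eq!(it.collect::<Vec<_>>(), vec![1, 2, 3]);
}
```
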
2 changes: 1 addition & 1 deletion polars/polars-io/src/parquet/write.rs
@@ -64,7 +64,7 @@ pub enum ParquetCompression {

impl Default for ParquetCompression {
fn default() -> Self {
ParquetCompression::Lz4Raw
ParquetCompression::Lz4Raw
}
}

2 changes: 1 addition & 1 deletion polars/polars-lazy/polars-pipe/Cargo.toml
@@ -15,9 +15,9 @@ num.workspace = true
polars-arrow = { version = "0.26.1", path = "../../polars-arrow", default-features = false }
polars-core = { version = "0.26.1", path = "../../polars-core", features = ["lazy", "private", "zip_with", "random"], default-features = false }
polars-io = { version = "0.26.1", path = "../../polars-io", default-features = false }
polars-ops = { version = "0.26.1", path = "../../polars-ops", features = ["search_sorted"] }
polars-plan = { version = "0.26.1", path = "../polars-plan", default-features = false, features = ["compile"] }
polars-utils = { version = "0.26.1", path = "../../polars-utils", features = ["sysinfo"] }
polars-ops = { version = "0.26.1", path = "../../polars-ops", features = ["search_sorted"] }
rayon.workspace = true
smartstring = { version = "1" }

35 changes: 20 additions & 15 deletions polars/polars-lazy/polars-pipe/src/executors/sinks/sort/io.rs
@@ -1,26 +1,24 @@
use std::path::{Path, PathBuf};
use std::sync::atomic::Ordering;
use std::sync::mpsc::{Sender, SyncSender};
use std::sync::mpsc::SyncSender;
use std::sync::{Arc, Condvar, Mutex};
use std::thread::JoinHandle;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use std::time::{SystemTime, UNIX_EPOCH};

use polars_core::prelude::*;
use polars_core::POOL;
use polars_io::prelude::*;
use polars_utils::atomic::SyncCounter;

pub(super) type DfIter = Box<dyn ExactSizeIterator<Item=DataFrame>+ Sync + Send>;
pub(super) type DfIter = Box<dyn ExactSizeIterator<Item = DataFrame> + Sync + Send>;
// The Option<IdxCa> are the partitions it should be written to, if any
type Payload = (Option<IdxCa>, DfIter);


pub(super) struct IOThread {
sender: SyncSender<Payload>,
pub(super) dir: PathBuf,
pub(super) sent: SyncCounter,
pub(super) total: SyncCounter,
pub(super) all_processed: Arc<(Condvar, Mutex<()>)>
pub(super) all_processed: Arc<(Condvar, Mutex<()>)>,
}

impl IOThread {
@@ -48,7 +46,6 @@ impl IOThread {
std::thread::spawn(move || {
let mut count = 0usize;
while let Ok((partitions, iter)) = receiver.recv() {

if let Some(partitions) = partitions {
for (part, df) in partitions.into_no_null_iter().zip(iter) {
let mut path = dir2.clone();
@@ -63,7 +60,6 @@ impl IOThread {
writer.finish().unwrap();
count += 1;
}

} else {
let mut path = dir2.clone();
path.push(format!("{count}.parquet"));
@@ -79,21 +75,18 @@ impl IOThread {
count += 1;
}
let total = total2.load(Ordering::Relaxed);
dbg!(total, count);
if total != 0 && total == count {
all_processed2.0.notify_one();
dbg!("notified");
}
}
eprintln!("kill thread");
});

Ok(Self {
sender,
dir,
sent,
total,
all_processed
all_processed,
})
}

@@ -103,9 +96,21 @@ impl IOThread {
}
pub(super) fn dump_iter(&self, partition: Option<IdxCa>, iter: DfIter) {
let add = iter.size_hint().1.unwrap();
self.sender
.send( (partition, iter))
.unwrap();
self.sender.send((partition, iter)).unwrap();
self.sent.fetch_add(add, Ordering::Relaxed);
}
}

pub(super) fn block_thread_until_io_thread_done(io_thread: &IOThread) {
// get number sent
let sent = io_thread.sent.load(Ordering::Relaxed);
// set total sent
io_thread.total.store(sent, Ordering::Relaxed);

// then the io thread will check if it has written all files, and if it has
// it will set the condvar so we can continue on this thread

// we don't really need the mutex for our case, but the condvar needs one
let cond_lock = io_thread.all_processed.1.lock().unwrap();
let _lock = io_thread.all_processed.0.wait(cond_lock).unwrap();
}
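
The new block_thread_until_io_thread_done helper is one side of a small handshake: the calling thread publishes its final `sent` count into `total`, while the IO thread compares its running `count` against `total` after every file it writes and notifies the Condvar once they match. A stripped-down sketch of that handshake, assuming plain AtomicUsize counters in place of SyncCounter and a dummy loop in place of the IPC writer; it also uses wait_while instead of the bare wait above so the toy cannot miss a wakeup:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

fn main() {
    // Shared counters standing in for IOThread's `sent`/`total` SyncCounters.
    let total = Arc::new(AtomicUsize::new(0));
    let written = Arc::new(AtomicUsize::new(0));
    // The Condvar needs a Mutex even though there is no data to protect.
    let all_processed = Arc::new((Condvar::new(), Mutex::new(())));

    let total2 = Arc::clone(&total);
    let written2 = Arc::clone(&written);
    let all_processed2 = Arc::clone(&all_processed);

    // "IO thread": handle jobs and notify once everything that was sent has been written.
    let io_thread = thread::spawn(move || {
        for _job in 0..5 {
            // ... write one file to disk ...
            let count = written2.fetch_add(1, Ordering::Relaxed) + 1;
            let total = total2.load(Ordering::Relaxed);
            if total != 0 && total == count {
                // Notify while holding the lock so the wakeup cannot slip in
                // between the waiter's predicate check and its sleep.
                let _guard = all_processed2.1.lock().unwrap();
                all_processed2.0.notify_one();
            }
        }
    });

    // Caller side, as in block_thread_until_io_thread_done: publish how much
    // work was sent, then block until the IO thread has written all of it.
    total.store(5, Ordering::Relaxed);
    let guard = all_processed.1.lock().unwrap();
    let _guard = all_processed
        .0
        .wait_while(guard, |_| {
            written.load(Ordering::Relaxed) < total.load(Ordering::Relaxed)
        })
        .unwrap();

    io_thread.join().unwrap();
}
```

Because a Condvar remembers nothing, the waiter re-checks the counters under the mutex before sleeping; that detail is what keeps the sketch from hanging if the writer finishes before the waiter ever blocks.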