WIP - Feature/smart module filter #159
Nice work. Thanks for the contribution.
Nice work for a non-Rust engineer :-)
Your docs and Rust by Example helped me pull this together, so thank you for having really good documentation! I have also implemented this in the NodeJS library; I'll add these changes there and open a PR when I have finished the test suite.
I want to improve the file handling logic within the |
Someone beat you to it for the Node client: infinyon/fluvio-client-node#256.
There are some good ideas in there!
Other than CI, this is good! Thanks so much for the PR! Hopefully you found it fun and learned something.
Unclear why the path isn't found in CI. Does make integration-tests work locally?
Having reviewed the NodeJS implementation and the great work by Esteban, it got me thinking: does the implementation need to be more flexible? Right now, you can only pass in a string for a filter smart module, but it would be nice to allow for other types. Likewise, there are other options in the
My end goal would be to configure the Python client like so:
# Allows for more options and smart module types
# and some validation of paths before passing off to Rust
smart_module = SmartModule(file_path, SmartModuleKind.FILTER)
# mimics the fluvio::consumer::ConsumerConfigBuilder options
# other options could be exposed also: isolation, disable_continuous, etc.
builder = ConsumerConfigBuilder(max_bytes=256, smart_module=smart_module)
consumer = fluvio.partition_consumer("some-topic", 0)
for i in consumer.stream_with_config(Offset.beginning(), ConsumerConfig.from_builder(builder)):
    # do something
I wanted to get your thoughts on this. No code has been changed, but in the coming weeks we will be using it heavily at Klarian, so I want to make sure it is as ready and extensible as possible.
Yeah, I think we should have the Using the same pattern from |
src/glue.rs.in
}

impl ConsumerConfigWrapper {
    fn new_config_with_wasm_filter(file: &str) -> ConsumerConfigWrapper {
I didn't catch it until I saw the stack trace in CI, but we should remove the unwraps. Since std::fs::read returns a Result<Vec<u8>, std::io::Error>, let's make this return something like Result<ConsumerConfigWrapper, std::io::Error>.
I'm a little hesitant to suggest it, but given that FluvioError implements From<IoError>, this function could also just return a Result<ConsumerConfigWrapper, FluvioError> and have raw_buffer.as_ref().unwrap() become raw_buffer.as_ref()? instead. The ? here is called the question mark operator and is used to bubble up and handle various error types. Basically, it's syntactic sugar that roughly expands to:
let good_val = match my_result_type {
    Err(e) => return Err(FluvioError::from(e)),
    Ok(val) => val,
};
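To make the expansion concrete, here is a minimal sketch that writes the same early return both ways. ClientError is a hypothetical stand-in for a library error such as FluvioError (it is not the real type), and both function names are illustrative only.

```rust
use std::fs;
use std::io;

// Hypothetical stand-in for a library error type such as FluvioError.
#[derive(Debug)]
enum ClientError {
    Io(io::Error),
}

// This conversion is what lets `?` turn an io::Error into a ClientError.
impl From<io::Error> for ClientError {
    fn from(e: io::Error) -> Self {
        ClientError::Io(e)
    }
}

// Explicit form: match on the Result and return early on error.
fn read_with_match(path: &str) -> Result<Vec<u8>, ClientError> {
    let bytes = match fs::read(path) {
        Ok(val) => val,
        Err(e) => return Err(ClientError::from(e)),
    };
    Ok(bytes)
}

// Same behaviour with `?`, which applies the From conversion for us.
fn read_with_question_mark(path: &str) -> Result<Vec<u8>, ClientError> {
    let bytes = fs::read(path)?;
    Ok(bytes)
}
```

Both functions return the same error for a missing file; the second simply lets the compiler insert the conversion and the early return.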
The reason I'm a little hesitant about using the base FluvioError is that the conversions to a FluvioError (really just any Rust error type) are done in that Rust crate and are exposed upward. Something about having a library/crate that converts an io error into a dependency's error rather than its own error type feels wrong. That being said, I think I've done it before.
Anyway, the unwrap causes this to segfault/panic rather than bubble up to Python as an exception. The Flapigen macros turn these error types into Strings (though they can also return a typed error if we want) and raise them on the Python side.
What do you think? We could do the simple change on the error type in this PR and then create an issue or have a follow-up PR exposing a more expressive type to the Python side.
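For the follow-up idea of a more expressive type, one possible shape is a crate-local error that wraps the io error instead of reusing the dependency's type. This is only a sketch under assumptions: SmartModuleLoadError and load_wasm are hypothetical names that do not exist in fluvio or in this PR.

```rust
use std::error::Error;
use std::fmt;
use std::fs;
use std::io;

/// Hypothetical crate-local error; not part of fluvio or this PR.
#[derive(Debug)]
pub struct SmartModuleLoadError {
    path: String,
    source: io::Error,
}

impl fmt::Display for SmartModuleLoadError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "failed to read smart module '{}': {}", self.path, self.source)
    }
}

impl Error for SmartModuleLoadError {
    // Expose the underlying io::Error for callers that want the cause.
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        Some(&self.source)
    }
}

/// Reads the file and reports a failure as a value instead of panicking.
pub fn load_wasm(path: &str) -> Result<Vec<u8>, SmartModuleLoadError> {
    fs::read(path).map_err(|source| SmartModuleLoadError {
        path: path.to_string(),
        source,
    })
}
```

Because the error implements Display, a binding layer that stringifies errors would still produce a readable message, while a typed mapping could be added later.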
I will implement your first suggestion and return; sorry about the delay, I was having a weekend away from the computer!
I have implemented your suggestion, it now looks like this:
impl ConsumerConfigWrapper {
    fn new_config_with_wasm_filter(file: &str) -> Result<ConsumerConfigWrapper, std::io::Error> {
        let raw_buffer = match std::fs::read(file) {
            Ok(b) => b,
            Err(error) => return Err(error)
        };
        let mut encoder = GzEncoder::new(raw_buffer.as_slice(), Compression::default());
        let mut buffer = Vec::with_capacity(raw_buffer.len());
        match encoder.read_to_end(&mut buffer) {
            Ok(encoder) => encoder,
            Err(error) => return Err(error)
        };
        Ok(ConsumerConfigWrapper {
            wasm_module: buffer,
        })
    }
}
The only bit I was unsure about was the stream_with_config method: I was unsure how to handle the error from the result, so I settled on this:
pub fn stream_with_config(
    consumer: &PartitionConsumer,
    offset: &Offset,
    wasm_module_path: &str
) -> Result<PartitionConsumerStream, FluvioError> {
    let config_wrapper = match ConsumerConfigWrapper::new_config_with_wasm_filter(wasm_module_path) {
        Ok(config) => config,
        Err(error) => return Err(FluvioError::Other(error.to_string()))
    };
    let mut builder = ConsumerConfig::builder();
    builder.smartmodule(Some(SmartModuleInvocation {
        wasm: SmartModuleInvocationWasm::AdHoc(config_wrapper.wasm_module),
        kind: SmartModuleKind::Filter,
        params: Default::default()
    }));
    let config = builder.build().map_err(|err| FluvioError::Other(err.to_string()))?;
    run_block_on(consumer.stream_with_config(offset.clone(), config))
        .map(|stream| PartitionConsumerStream { inner: Box::pin(stream) })
}
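The match on the config wrapper could arguably be written with the same map_err plus ? pattern already used for builder.build() in the snippet above. The sketch below uses stand-ins so it needs only the standard library: BindingError::Other mimics FluvioError::Other, while load_filter and stream_setup are hypothetical names, not functions from the PR.

```rust
use std::fs;
use std::io;

// Hypothetical stand-in for FluvioError::Other(String).
#[derive(Debug, PartialEq)]
enum BindingError {
    Other(String),
}

// Stand-in for ConsumerConfigWrapper::new_config_with_wasm_filter.
fn load_filter(path: &str) -> Result<Vec<u8>, io::Error> {
    fs::read(path)
}

// Stand-in for the start of stream_with_config; returns the module size.
fn stream_setup(path: &str) -> Result<usize, BindingError> {
    // map_err converts the io::Error into the binding's error type,
    // then `?` returns early if the conversion produced an Err.
    let wasm = load_filter(path).map_err(|e| BindingError::Other(e.to_string()))?;
    Ok(wasm.len())
}
```

This keeps error handling on one line per fallible call and behaves the same as the explicit match.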
@sehz sorry, I pressed the wrong button! The joys of a Bluetooth mouse!
Woohoo! All tests are passing now. I added a config for flake8 and ran the Python code through black; I'd recommend adding black to the repo in the future!
impl ConsumerConfigWrapper {
    fn new_config_with_wasm_filter(file: &str) -> Result<ConsumerConfigWrapper, std::io::Error> {
        let raw_buffer = match std::fs::read(file) {
Not a show stopper, but this could be reduced to let raw_buffer = std::fs::read(file)?; using the question mark operator.
        };
        let mut encoder = GzEncoder::new(raw_buffer.as_slice(), Compression::default());
        let mut buffer = Vec::with_capacity(raw_buffer.len());
        match encoder.read_to_end(&mut buffer) {
Same here.
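Applied to both call sites, the two nits might look roughly like the sketch below. To keep it standard-library only, a plain byte-slice reader stands in for flate2's GzEncoder, so no compression happens here; ConfigWrapper is a hypothetical stand-in for ConsumerConfigWrapper.

```rust
use std::fs;
use std::io::{self, Read};

// Hypothetical stand-in for ConsumerConfigWrapper from the PR.
pub struct ConfigWrapper {
    pub wasm_module: Vec<u8>,
}

pub fn new_config_with_wasm_filter(file: &str) -> Result<ConfigWrapper, io::Error> {
    // `?` hands the io::Error back to the caller instead of matching by hand.
    let raw_buffer = fs::read(file)?;

    // The PR wraps the bytes in a GzEncoder at this point; a byte-slice
    // reader is used instead so the sketch has no external dependencies.
    let mut reader = raw_buffer.as_slice();
    let mut buffer = Vec::with_capacity(raw_buffer.len());
    reader.read_to_end(&mut buffer)?;

    Ok(ConfigWrapper { wasm_module: buffer })
}
```

Since both fallible calls already produce std::io::Error, no conversion is needed and each match collapses to a single ?.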
Overall looks OK, with just minor nits that can be addressed in a separate PR.
LGTM. Thanks!
Interesting, are we talking about this black?
Yeah, that black. There are mixed opinions in the Python community, but it's really useful for formatting to a set standard. It's used by some big projects, and we use it at Klarian to great effect! Worth investigating.
Done in #164.
Description
Feedback is needed and would be appreciated. I am not a Rust engineer, so I am sure there are better ways to do this.
This change adds the ability to stream with a smart module on the fly (referred to as SmartStream). The implementation spans two languages:
Rust: stream_with_config in the _PartitionConsumer module
Python: stream_with_config
How Has This Been Tested?
test_consume_with_smart_module_iterator