Skip to content

Commit

Permalink
fix(streaming): hang up source when failed & panic on actor error (#777)
Browse files Browse the repository at this point in the history
* unwrap actor results

Signed-off-by: Bugen Zhao <[email protected]>

* hang up the source reader when error

Signed-off-by: Bugen Zhao <[email protected]>

* revision

Signed-off-by: Bugen Zhao <[email protected]>

* use pending

Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao authored Mar 11, 2022
1 parent c009e10 commit 53c479c
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 17 deletions.
23 changes: 16 additions & 7 deletions rust/stream/src/executor/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ pub struct SourceExecutor {

/// current allocated row id
next_row_id: AtomicU64,
first_execution: bool,

/// Identity string
identity: String,
Expand Down Expand Up @@ -159,7 +158,6 @@ impl SourceExecutor {
barrier_receiver,
}),
next_row_id: AtomicU64::from(0u64),
first_execution: true,
identity: format!("SourceExecutor {:X}", executor_id),
op_info,
reader_stream: None,
Expand Down Expand Up @@ -202,8 +200,21 @@ impl SourceReader {
#[try_stream(ok = StreamChunk, error = RwError)]
async fn stream_reader(mut stream_reader: Box<dyn StreamSourceReader>) {
loop {
yield stream_reader.next().await?;
match stream_reader.next().await {
Err(e) => {
// TODO: report this error to meta service to mark the actors failed.
error!("hang up stream reader due to polling error: {}", e);

// Drop the reader, then the error might be caught by the writer side.
drop(stream_reader);
// Then hang up this stream by breaking the loop.
break;
}
Ok(chunk) => yield chunk,
}
}

futures::future::pending().await
}

#[try_stream(ok = Message, error = RwError)]
Expand Down Expand Up @@ -234,11 +245,9 @@ impl SourceReader {
#[async_trait]
impl Executor for SourceExecutor {
async fn next(&mut self) -> Result<Message> {
if self.first_execution {
let mut reader = self.reader.take().unwrap();
if let Some(mut reader) = self.reader.take() {
reader.stream_reader.open().await?;
self.first_execution = false;
self.reader_stream = Some(reader.into_stream().boxed());
self.reader_stream.replace(reader.into_stream().boxed());
}

match self.reader_stream.as_mut().unwrap().next().await {
Expand Down
24 changes: 14 additions & 10 deletions rust/stream/src/task/stream_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,13 @@ lazy_static::lazy_static! {
pub static ref LOCAL_TEST_ADDR: SocketAddr = "127.0.0.1:2333".parse().unwrap();
}

pub type ActorHandle = JoinHandle<()>;

pub struct StreamManagerCore {
/// Each processor runs in a future. Upon receiving a `Terminate` message, they will exit.
/// `handles` store join handles of these futures, and therefore we could wait their
/// termination.
handles: HashMap<u32, JoinHandle<Result<()>>>,
handles: HashMap<u32, ActorHandle>,

pub(crate) context: Arc<SharedContext>,

Expand Down Expand Up @@ -209,9 +211,9 @@ impl StreamManager {

/// This function was called while [`StreamManager`] exited.
pub async fn wait_all(self) -> Result<()> {
let handles = self.core.lock().unwrap().wait_all()?;
let handles = self.core.lock().unwrap().take_all_handles()?;
for (_id, handle) in handles {
handle.await??;
handle.await?;
}
Ok(())
}
Expand All @@ -220,7 +222,7 @@ impl StreamManager {
pub async fn wait_actors(&self, actor_ids: &[u32]) -> Result<()> {
let handles = self.core.lock().unwrap().remove_actor_handles(actor_ids)?;
for handle in handles {
handle.await.unwrap()?
handle.await.unwrap();
}
Ok(())
}
Expand Down Expand Up @@ -659,20 +661,22 @@ impl StreamManagerCore {
trace!("build actor: {:#?}", &dispatcher);

let actor = Actor::new(dispatcher, actor_id, self.context.clone());
self.handles.insert(actor_id, tokio::spawn(actor.run()));
self.handles.insert(
actor_id,
tokio::spawn(async move {
actor.run().await.unwrap(); // unwrap the actor result to panic on error
}),
);
}

Ok(())
}

pub fn wait_all(&mut self) -> Result<HashMap<u32, JoinHandle<Result<()>>>> {
pub fn take_all_handles(&mut self) -> Result<HashMap<u32, ActorHandle>> {
Ok(std::mem::take(&mut self.handles))
}

pub fn remove_actor_handles(
&mut self,
actor_ids: &[u32],
) -> Result<Vec<JoinHandle<Result<()>>>> {
pub fn remove_actor_handles(&mut self, actor_ids: &[u32]) -> Result<Vec<ActorHandle>> {
actor_ids
.iter()
.map(|actor_id| {
Expand Down

0 comments on commit 53c479c

Please sign in to comment.