Skip to content

Commit

Permalink
Finish implementing operator looping, close #6
Browse files Browse the repository at this point in the history
- introduces named operator threads - from now on, every thread is named
  after the function it encapsulates in order to help in the debugging
  process
- to provide the name, an operator field containing the name has been
  added - it's currently filled ad hoc
  • Loading branch information
Feliix42 committed Feb 23, 2018
1 parent a3b7b16 commit 85fb850
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 9 deletions.
43 changes: 34 additions & 9 deletions src/templates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ mod runtime;
use self::generictype::*;
use self::types::{Arc, ArcIdentifier, OhuaOperator, ValueType};

use std::thread;
use std::thread::{self, Builder};
use std::sync::mpsc;


Expand Down Expand Up @@ -119,6 +119,7 @@ pub fn ohua_main({input_args}) -> {return_type} {
operators.push(OhuaOperator {
input: vec![],
output: vec![],
name: op.operatorType.qualified_name(),
func: op.operatorType.func,
})
}
Expand All @@ -134,19 +135,43 @@ pub fn ohua_main({input_args}) -> {return_type} {

// thread spawning
for op in operators.drain(..) {
thread::spawn(move || 'threadloop: loop {
// receive arguments
Builder::new()
.name(op.name.as_str().into())
.spawn(move || 'threadloop: loop {
let mut exiting = false;

// receive the arguments from all senders
let mut args = vec![];
for recv in &op.input {
for (index, recv) in (&op.input).iter().enumerate() {
if let Ok(content) = recv.recv() {
args.push(content);
if !exiting {
args.push(content);
} else {
#[cold]
// when we sre in `exiting` state, we should not be here...
eprintln!("[Error] Thread {} entered an inconsistent state. Some input Arcs are empty, others not.", thread::current().name().unwrap());
break 'threadloop;
}
} else {
// TODO: Implement check whether *all* channels are empty
// when there are no messages left to receive, we are done
break 'threadloop;
// when there are no messages left to receive, this operator is done
if !exiting {
// before entering the `exiting` state, make sure that this is valid behavior
if index > 0 {
#[cold]
eprintln!("[Error] Thread {} entered an inconsistent state. Some input Arcs are empty, others not.", thread::current().name().unwrap());
break 'threadloop;
} else {
exiting = true;
}
}
}
}

// when we are in `exiting` state, kill gracefully
if exiting {
break 'threadloop;
}

// call function & send results
let mut results = (op.func)(args);
for (index, mut element_vec) in results.drain(..).enumerate() {
Expand All @@ -157,7 +182,7 @@ pub fn ohua_main({input_args}) -> {return_type} {
}
}
}
});
}).unwrap();
}

// provide the operators with input from the function arguments, if any
Expand Down
13 changes: 13 additions & 0 deletions src/templates/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use ohua_runtime::generictype::GenericType;
pub struct OhuaOperator {
pub input: Vec<Receiver<Box<GenericType>>>,
pub output: Vec<Vec<Sender<Box<GenericType>>>>,
pub name: String,
pub func: Box<fn(Vec<Box<GenericType>>) -> Vec<Vec<Box<GenericType>>>>,
}

Expand Down Expand Up @@ -57,3 +58,15 @@ pub struct SfDependency {
pub qbNamespace: Vec<String>,
pub qbName: String,
}

// TODO: [Performance] Move this to compile time?
impl OperatorType {
pub fn qualified_name(&self) -> String {
let mut name = String::new();
for ns_part in &self.qbNamespace {
name += ns_part.as_str();
name += "::";
}
name + self.qbName.as_str()
}
}

0 comments on commit 85fb850

Please sign in to comment.