Skip to content
This repository has been archived by the owner on Mar 3, 2023. It is now read-only.

Add ack and fail to Streamlet API (#2909) #3217

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion heron/api/src/java/BUILD
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
licenses(["notice"])

package(default_visibility = ["//visibility:public"])

load("//tools/rules:build_defs.bzl", "DOCLINT_HTML_AND_SYNTAX")
Expand All @@ -16,6 +15,7 @@ api_deps_files = \
heron_java_api_proto_files() + [
":classification",
"//heron/common/src/java:basics-java",
"//third_party/java:guava"
]

# Low Level Api
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public FilterOperator(SerializablePredicate<? super R> filterFn) {
public void execute(Tuple tuple) {
R obj = (R) tuple.getValue(0);
if (filterFn.test(obj)) {
collector.emit(new Values(obj));
collector.emit(tuple, new Values(obj));
}
collector.ack(tuple);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public void execute(Tuple tuple) {
R obj = (R) tuple.getValue(0);
Iterable<? extends T> result = flatMapFn.apply(obj);
for (T o : result) {
collector.emit(new Values(o));
collector.emit(tuple, new Values(o));
}
collector.ack(tuple);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ public void execute(TupleWindow inputWindow) {
for (K key : reduceMap.keySet()) {
Window window = new Window(startWindow, endWindow, windowCountMap.get(key));
KeyedWindow<K> keyedWindow = new KeyedWindow<>(key, window);
collector.emit(new Values(new KeyValue<>(keyedWindow, reduceMap.get(key))));
collector.emit(inputWindow.get(), new Values(new KeyValue<>(keyedWindow,
reduceMap.get(key))));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public void execute(Tuple tuple) {
T newValue = reduceFn.apply(oldValue, obj);

reduceMap.put(key, newValue);
collector.emit(new Values(new KeyValue<K, T>(key, newValue)));
collector.emit(tuple, new Values(new KeyValue<K, T>(key, newValue)));
collector.ack(tuple);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ private void innerJoinAndEmit(K key, TupleWindow tupleWindow, Pair<List<V1>, Lis
KeyedWindow<K> keyedWindow = getKeyedWindow(key, tupleWindow);
for (V1 val1 : val.getFirst()) {
for (V2 val2 : val.getSecond()) {
collector.emit(new Values(new KeyValue<>(keyedWindow,
collector.emit(tupleWindow.get(), new Values(new KeyValue<>(keyedWindow,
joinFn.apply(val1, val2))));
}
}
Expand All @@ -180,15 +180,15 @@ private void innerJoinAndEmit(K key, TupleWindow tupleWindow, Pair<List<V1>, Lis
private void outerLeftJoinAndEmit(K key, TupleWindow tupleWindow, Pair<List<V1>, List<V2>> val) {
KeyedWindow<K> keyedWindow = getKeyedWindow(key, tupleWindow);
for (V1 val1 : val.getFirst()) {
collector.emit(new Values(new KeyValue<>(keyedWindow,
collector.emit(tupleWindow.get(), new Values(new KeyValue<>(keyedWindow,
joinFn.apply(val1, null))));
}
}

private void outerRightJoinAndEmit(K key, TupleWindow tupleWindow, Pair<List<V1>, List<V2>> val) {
KeyedWindow<K> keyedWindow = getKeyedWindow(key, tupleWindow);
for (V2 val2 : val.getSecond()) {
collector.emit(new Values(new KeyValue<>(keyedWindow,
collector.emit(tupleWindow.get(), new Values(new KeyValue<>(keyedWindow,
joinFn.apply(null, val2))));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public void execute(Tuple tuple) {
K key = keyExtractor.apply(obj);
V value = valueExtractor.apply(obj);

collector.emit(new Values(new KeyValue<>(key, value)));
collector.emit(tuple, new Values(new KeyValue<>(key, value)));
collector.ack(tuple);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public MapOperator(SerializableFunction<? super R, ? extends T> mapFn) {
public void execute(Tuple tuple) {
R obj = (R) tuple.getValue(0);
T result = mapFn.apply(obj);
collector.emit(new Values(result));
collector.emit(tuple, new Values(result));
collector.ack(tuple);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ public void execute(TupleWindow inputWindow) {
for (K key : reduceMap.keySet()) {
Window window = new Window(startWindow, endWindow, windowCountMap.get(key));
KeyedWindow<K> keyedWindow = new KeyedWindow<>(key, window);
collector.emit(new Values(new KeyValue<>(keyedWindow, reduceMap.get(key))));
collector.emit(inputWindow.get(),
new Values(new KeyValue<>(keyedWindow, reduceMap.get(key))));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public void execute(Tuple tuple) {
}

reduceMap.put(key, newValue);
collector.emit(new Values(new KeyValue<K, T>(key, newValue)));
collector.emit(tuple, new Values(new KeyValue<K, T>(key, newValue)));
collector.ack(tuple);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public void execute(Tuple tuple) {
R obj = (R) tuple.getValue(0);
for (Map.Entry<String, SerializablePredicate<R>> entry: splitFns.entrySet()) {
if (entry.getValue().test(obj)) {
collector.emit(entry.getKey(), new Values(obj));
collector.emit(entry.getKey(), tuple, new Values(obj));
}
}
collector.ack(tuple);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public void prepare(Map<String, Object> map,
@Override
public void execute(Tuple tuple) {
R obj = (R) tuple.getValue(0);
serializableTransformer.transform(obj, x -> collector.emit(new Values(x)));
serializableTransformer.transform(obj, x -> collector.emit(tuple, new Values(x)));
collector.ack(tuple);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public UnionOperator() {
@Override
public void execute(Tuple tuple) {
I obj = (I) tuple.getValue(0);
collector.emit(new Values(obj));
collector.emit(tuple, new Values(obj));
collector.ack(tuple);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,12 @@

import java.io.Serializable;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.google.common.cache.Cache;

import org.apache.heron.api.spout.SpoutOutputCollector;
import org.apache.heron.api.state.State;
Expand All @@ -38,9 +43,17 @@
public class ComplexSource<R> extends StreamletSource {

private static final long serialVersionUID = -5086763670301450007L;
private static final Logger LOG = Logger.getLogger(ComplexSource.class.getName());
private Source<R> generator;
private State<Serializable, Serializable> state;

// protected used to allow unit test access
protected Cache<String, Object> msgIdCache;
protected String msgId;
// taskIds are collected to facilitate unit tests
protected List<Integer> taskIds;
private Level logLevel = Level.INFO;

public ComplexSource(Source<R> generator) {
this.generator = generator;
}
Expand All @@ -57,13 +70,40 @@ public void open(Map<String, Object> map, TopologyContext topologyContext,
super.open(map, topologyContext, outputCollector);
Context context = new ContextImpl(topologyContext, map, state);
generator.setup(context);
ackingEnabled = isAckingEnabled(map, topologyContext);
msgIdCache = createCache();
}

@Override
public void nextTuple() {
Collection<R> tuples = generator.get();
msgId = null;
if (tuples != null) {
tuples.forEach(tuple -> collector.emit(new Values(tuple)));
for (R tuple : tuples) {
if (ackingEnabled) {
msgId = getUniqueMessageId();
msgIdCache.put(msgId, tuple);
taskIds = collector.emit(new Values(tuple), msgId);
} else {
taskIds = collector.emit(new Values(tuple));
}
LOG.log(logLevel, "emitting: [" + msgId + "]");
}
}
}

/**
 * Handles an acknowledgment for a previously emitted message id.
 *
 * <p>When acking is enabled, the entry stored under {@code mid} is dropped from
 * the message-id cache and the ack is logged. When acking is disabled, nothing
 * happens.
 *
 * @param mid the message id being acknowledged
 */
@Override
public void ack(Object mid) {
  if (!ackingEnabled) {
    return;
  }
  msgIdCache.invalidate(mid);
  LOG.log(logLevel, "acked: [" + mid + "]");
}

/**
 * Re-emits the tuple cached under a failed message id.
 *
 * <p>When acking is enabled, the value stored under {@code mid} is looked up in
 * the message-id cache and emitted again with the same message id. If the cache
 * holds no entry for {@code mid}, there is nothing to replay: a warning is
 * logged and no tuple is emitted, rather than emitting a tuple whose only value
 * is {@code null}. When acking is disabled, nothing happens.
 *
 * @param mid the message id reported as failed
 */
@Override
public void fail(Object mid) {
  if (!ackingEnabled) {
    return;
  }
  Object cached = msgIdCache.getIfPresent(mid);
  if (cached == null) {
    // getIfPresent returns null when no value is cached for this id.
    LOG.log(Level.WARNING, "cannot re-emit, no cached tuple for: [" + mid + "]");
    return;
  }
  taskIds = collector.emit(new Values(cached), mid);
  LOG.log(logLevel, "re-emit: [" + mid + "]");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@

import java.io.Serializable;
import java.util.Map;
import java.util.UUID;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

import org.apache.heron.api.spout.BaseRichSpout;
import org.apache.heron.api.spout.SpoutOutputCollector;
Expand All @@ -28,6 +32,10 @@
import org.apache.heron.api.topology.OutputFieldsDeclarer;
import org.apache.heron.api.topology.TopologyContext;
import org.apache.heron.api.tuple.Fields;
import org.apache.heron.streamlet.impl.ContextImpl;

import static org.apache.heron.api.Config.TOPOLOGY_RELIABILITY_MODE;
import static org.apache.heron.api.Config.TopologyReliabilityMode.ATLEAST_ONCE;

/**
* StreamletSource is the base class for all streamlet sources.
Expand All @@ -39,6 +47,7 @@ public abstract class StreamletSource extends BaseRichSpout
private static final long serialVersionUID = 8583965332619565343L;
private static final String OUTPUT_FIELD_NAME = "output";

protected boolean ackingEnabled = false;
protected SpoutOutputCollector collector;

@Override
Expand All @@ -54,6 +63,12 @@ public void open(Map<String, Object> map, TopologyContext topologyContext,
collector = outputCollector;
}

/**
 * Convenience factory for the caches used by streamlet sources.
 *
 * <p>The builder is currently used with its default settings.
 * TODO set appropriate properties in builder
 *
 * @param <K> cache key type
 * @param <V> cache value type
 * @return a newly built cache
 */
<K, V> Cache<K, V> createCache() {
  CacheBuilder<Object, Object> builder = CacheBuilder.newBuilder();
  return builder.build();
}

/**
* The sources implementing streamlet functionality have some properties.
* 1. They all output only one stream
Expand All @@ -64,4 +79,23 @@ public void open(Map<String, Object> map, TopologyContext topologyContext,
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields(OUTPUT_FIELD_NAME));
}

/**
 * Produces a message ID for tagging tuples emitted by ATLEAST_ONCE topologies.
 *
 * <p>The ID is the string form of a freshly generated random {@link UUID}.
 *
 * @return a unique message id string.
 */
public String getUniqueMessageId() {
  final UUID generated = UUID.randomUUID();
  return generated.toString();
}

/**
 * Determine if streamlet acknowledgments (i.e., ATLEAST_ONCE) are set.
 *
 * <p>The reliability mode is read from the config exposed by a
 * {@link ContextImpl} built from the given arguments. If no reliability mode is
 * present in that config, acking is reported as disabled instead of failing
 * with a {@link NullPointerException}.
 *
 * @param map the topology configuration map
 * @param topologyContext the topology context passed to the spout
 * @return true if acking is enabled; false otherwise.
 */
public boolean isAckingEnabled(Map map, TopologyContext topologyContext) {
  ContextImpl context = new ContextImpl(topologyContext, map, null);
  Object reliabilityMode = context.getConfig().get(TOPOLOGY_RELIABILITY_MODE);
  // Constant-first comparison so a missing (null) setting yields false.
  return ATLEAST_ONCE.toString().equals(reliabilityMode);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,15 @@
*/
package org.apache.heron.streamlet.impl.sources;

import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.google.common.cache.Cache;

import org.apache.heron.api.spout.SpoutOutputCollector;
import org.apache.heron.api.topology.TopologyContext;
import org.apache.heron.api.tuple.Values;
import org.apache.heron.streamlet.SerializableSupplier;

Expand All @@ -29,14 +38,56 @@
public class SupplierSource<R> extends StreamletSource {

private static final long serialVersionUID = 6476611751545430216L;
private static final Logger LOG = Logger.getLogger(SupplierSource.class.getName());

private SerializableSupplier<R> supplier;
protected SpoutOutputCollector collector;

// protected used to allow unit test access
protected Cache<String, Object> msgIdCache;
protected String msgId;
private Level logLevel = Level.INFO;

public SupplierSource(SerializableSupplier<R> supplier) {
this.supplier = supplier;
}

// The emit methods return a list of taskIds. They are collected to facilitate unit testing.
protected List<Integer> taskIds;

@SuppressWarnings("rawtypes")
@Override
public void nextTuple() {
collector.emit(new Values(supplier.get()));
public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector outputCollector) {
collector = outputCollector;
ackingEnabled = isAckingEnabled(map, topologyContext);
msgIdCache = createCache();
}

/**
 * Pulls one value from the supplier and emits it.
 *
 * <p>With acking enabled, the value is cached under a fresh message id, emitted
 * anchored to that id, and the emit is logged. Otherwise it is emitted without
 * a message id and {@code msgId} stays {@code null}. The task ids returned by
 * the collector are kept in {@code taskIds}.
 */
@Override
public void nextTuple() {
  msgId = null;
  final R payload = supplier.get();
  if (!ackingEnabled) {
    taskIds = collector.emit(new Values(payload));
    return;
  }
  msgId = getUniqueMessageId();
  msgIdCache.put(msgId, payload);
  taskIds = collector.emit(new Values(payload), msgId);
  LOG.log(logLevel, "emitted: [" + payload + ": " + msgId + "]");
}

/**
 * Handles an acknowledgment for a previously emitted message id.
 *
 * <p>With acking enabled, the cache entry stored under {@code mid} is
 * invalidated and the ack is logged. With acking disabled, this is a no-op.
 *
 * @param mid the message id being acknowledged
 */
@Override
public void ack(Object mid) {
  if (!ackingEnabled) {
    return;
  }
  msgIdCache.invalidate(mid);
  LOG.log(logLevel, "acked: [" + mid + "]");
}

/**
 * Re-emits the value cached under a failed message id.
 *
 * <p>With acking enabled, the value stored under {@code mid} is fetched from
 * the message-id cache and emitted again with the same message id. If the cache
 * has no entry for {@code mid}, a warning is logged and nothing is emitted,
 * rather than emitting a tuple whose only value is {@code null}. With acking
 * disabled, this is a no-op.
 *
 * @param mid the message id reported as failed
 */
@Override
public void fail(Object mid) {
  if (!ackingEnabled) {
    return;
  }
  Object cached = msgIdCache.getIfPresent(mid);
  if (cached == null) {
    // getIfPresent returns null when no value is cached for this id.
    LOG.log(Level.WARNING, "cannot re-emit, no cached value for: [" + mid + "]");
    return;
  }
  taskIds = collector.emit(new Values(cached), mid);
  LOG.log(logLevel, "re-emit: [" + mid + "]");
}
}
4 changes: 3 additions & 1 deletion heron/api/tests/java/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ java_tests(
"org.apache.heron.streamlet.impl.utils.StreamletUtilsTest",
"org.apache.heron.api.ConfigTest",
"org.apache.heron.api.HeronSubmitterTest",
"org.apache.heron.api.utils.UtilsTest"
"org.apache.heron.api.utils.UtilsTest",
"org.apache.heron.streamlet.impl.sources.SupplierSourceTest",
"org.apache.heron.streamlet.impl.sources.ComplexSourceTest"
],
runtime_deps = [ ":api-tests" ],
size = "small",
Expand Down
Loading