Skip to content
This repository has been archived by the owner on Mar 3, 2023. It is now read-only.

first of the connectors - twitter spout #2852

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions WORKSPACE
Original file line number Diff line number Diff line change
Expand Up @@ -922,6 +922,18 @@ new_http_archive(
build_file = "third_party/nomad/nomad.BUILD",
)

# for twitter connector
TWITTER_CONNECTOR_VERSION = "4.0.6"
maven_jar(
name = "org_twitter4j_core",
artifact = "org.twitter4j:twitter4j-core:" + TWITTER_CONNECTOR_VERSION,
)

maven_jar(
name = "org_twitter4j_stream",
artifact = "org.twitter4j:twitter4j-stream:" + TWITTER_CONNECTOR_VERSION,
)

# scala integration
rules_scala_version="5cdae2f034581a05e23c3473613b409de5978833" # update this as needed

Expand Down
21 changes: 21 additions & 0 deletions connectors/heron-twitter/src/java/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package(default_visibility = ["//visibility:public"])

heron_deps_files = [
"//heron/api/src/java:api-java-low-level",
]

third_party_deps_files = [
"//third_party/java:guava",
"//third_party/java:kryo",
"//third_party/java:logging",
]

all_deps = heron_deps_files + third_party_deps_files

java_binary(
name = 'twitter-spout',
srcs = glob(["com/twitter/heron/twitter/**/*.java"]),
deps = all_deps + [
"//third_party/java:twitter4j",
],
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package com.streamlio.connectors.twitter;

import java.io.Serializable;

@SuppressWarnings("serial")
public class Authentication {
private String consumerKey;
private String consumerSecret;
private String accessToken;
private String accessTokenSecret;
private String[] keyWords;

public Authentication() {
}

public String getConsumerKey() {
return consumerKey;
}

public void setConsumerKey(String consumerKey) {
this.consumerKey = consumerKey;
}

public String getConsumerSecret() {
return consumerSecret;
}

public void setConsumerSecret(String consumerSecret) {
this.consumerSecret = consumerSecret;
}

public String getAccessToken() {
return accessToken;
}

public void setAccessToken(String accessToken) {
this.accessToken = accessToken;
}

public String getAccessTokenSecret() {
return accessTokenSecret;
}

public void setAccessTokenSecret(String accessTokenSecret) {
this.accessTokenSecret = accessTokenSecret;
}

public String[] getKeyWords() {
return keyWords;
}

public void setKeyWords(String[] keyWords) {
this.keyWords = keyWords;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package com.streamlio.connectors.twitter;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the package should be org.apache.heron.connectors.twitter now


import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.google.common.base.Preconditions;

import twitter4j.FilterQuery;
import twitter4j.StallWarning;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
import twitter4j.auth.AccessToken;
import twitter4j.conf.ConfigurationBuilder;

import com.twitter.heron.api.Config;
import com.twitter.heron.api.topology.TopologyContext;
import com.twitter.heron.api.topology.OutputFieldsDeclarer;
import com.twitter.heron.api.spout.BaseRichSpout;
import com.twitter.heron.api.spout.SpoutOutputCollector;
import com.twitter.heron.api.tuple.Fields;
import com.twitter.heron.api.tuple.Values;
import com.twitter.heron.api.utils.Utils;

@SuppressWarnings("serial")
public class Twitter extends BaseRichSpout {
private static final long serialVersionUID = 4322775001819135036L;
private static final Logger LOG = Logger.getLogger(Twitter.class.getName());

private String componentId;
private String spoutId;
private SpoutOutputCollector collector;

private Authentication authInfo;

private LinkedBlockingQueue<Status> queue = null;
private TwitterStream twitterStream;

public Twitter(Authentication authInfo) {
this.authInfo = authInfo;
}

@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector spoutOutputCollector) {
this.componentId = context.getThisComponentId();
this.spoutId = String.format("%s-%s", componentId, context.getThisTaskId());
this.collector = spoutOutputCollector;

this.queue = new LinkedBlockingQueue<Status>(1000);
StatusListener listener = new StatusListener() {
@Override
public void onStatus(Status status) {
queue.offer(status);
}

@Override
public void onDeletionNotice(StatusDeletionNotice sdn) {}

@Override
public void onTrackLimitationNotice(int i) {}

@Override
public void onScrubGeo(long l, long l1) {}

@Override
public void onException(Exception ex) {}

@Override
public void onStallWarning(StallWarning arg0) {
// TODO Auto-generated method stub
}
};

ConfigurationBuilder cb = new ConfigurationBuilder();
cb.setDebugEnabled(true)
.setOAuthConsumerKey(authInfo.getConsumerKey())
.setOAuthConsumerSecret(authInfo.getConsumerSecret())
.setOAuthAccessToken(authInfo.getAccessToken())
.setOAuthAccessTokenSecret(authInfo.getAccessTokenSecret());

twitterStream = new TwitterStreamFactory(cb.build()).getInstance();
twitterStream.addListener(listener);

if (authInfo.getKeyWords().length == 0) {
twitterStream.sample();
} else {
FilterQuery query = new FilterQuery().track(authInfo.getKeyWords());
twitterStream.filter(query);
}
}

@Override
public void nextTuple() {
Status ret = queue.poll();
if (ret == null) {
Utils.sleep(50);
} else {
collector.emit(new Values(ret));
}
}

@Override
public void close() {
super.close();
twitterStream.shutdown();
}

@Override
public void ack(Object id) {}

@Override
public void fail(Object id) {}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("tweet"));
}
}
126 changes: 126 additions & 0 deletions third_party/cereal/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
licenses(["notice"])
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We already have a cereal lib in the third_party dir here.

So please either remove the files here or send out another separate PR for changing this lib


package(default_visibility = ["//visibility:public"])

package_name = "cereal"
package_version = "1.2.1"

package_file = package_name + "-" + package_version + ".tar.gz"
package_dir = package_name + "-" + package_version

file_list = [
"include/cereal/access.hpp",
"include/cereal/archives/adapters.hpp",
"include/cereal/archives/binary.hpp",
"include/cereal/archives/json.hpp",
"include/cereal/archives/portable_binary.hpp",
"include/cereal/archives/xml.hpp",
"include/cereal/cereal.hpp",
"include/cereal/details/helpers.hpp",
"include/cereal/details/polymorphic_impl.hpp",
"include/cereal/details/polymorphic_impl_fwd.hpp",
"include/cereal/details/static_object.hpp",
"include/cereal/details/traits.hpp",
"include/cereal/details/util.hpp",
"include/cereal/external/base64.hpp",
"include/cereal/external/rapidjson/allocators.h",
"include/cereal/external/rapidjson/document.h",
"include/cereal/external/rapidjson/encodedstream.h",
"include/cereal/external/rapidjson/encodings.h",
"include/cereal/external/rapidjson/error/en.h",
"include/cereal/external/rapidjson/error/error.h",
"include/cereal/external/rapidjson/filereadstream.h",
"include/cereal/external/rapidjson/filewritestream.h",
"include/cereal/external/rapidjson/fwd.h",
"include/cereal/external/rapidjson/internal/biginteger.h",
"include/cereal/external/rapidjson/internal/diyfp.h",
"include/cereal/external/rapidjson/internal/dtoa.h",
"include/cereal/external/rapidjson/internal/ieee754.h",
"include/cereal/external/rapidjson/internal/itoa.h",
"include/cereal/external/rapidjson/internal/meta.h",
"include/cereal/external/rapidjson/internal/pow10.h",
"include/cereal/external/rapidjson/internal/regex.h",
"include/cereal/external/rapidjson/internal/stack.h",
"include/cereal/external/rapidjson/internal/strfunc.h",
"include/cereal/external/rapidjson/internal/strtod.h",
"include/cereal/external/rapidjson/internal/swap.h",
"include/cereal/external/rapidjson/istreamwrapper.h",
"include/cereal/external/rapidjson/memorybuffer.h",
"include/cereal/external/rapidjson/memorystream.h",
"include/cereal/external/rapidjson/msinttypes/inttypes.h",
"include/cereal/external/rapidjson/msinttypes/stdint.h",
"include/cereal/external/rapidjson/ostreamwrapper.h",
"include/cereal/external/rapidjson/pointer.h",
"include/cereal/external/rapidjson/prettywriter.h",
"include/cereal/external/rapidjson/rapidjson.h",
"include/cereal/external/rapidjson/reader.h",
"include/cereal/external/rapidjson/schema.h",
"include/cereal/external/rapidjson/stream.h",
"include/cereal/external/rapidjson/stringbuffer.h",
"include/cereal/external/rapidjson/writer.h",
"include/cereal/external/rapidxml/rapidxml.hpp",
"include/cereal/external/rapidxml/rapidxml_iterators.hpp",
"include/cereal/external/rapidxml/rapidxml_print.hpp",
"include/cereal/external/rapidxml/rapidxml_utils.hpp",
"include/cereal/macros.hpp",
"include/cereal/types/array.hpp",
"include/cereal/types/base_class.hpp",
"include/cereal/types/bitset.hpp",
"include/cereal/types/boost_variant.hpp",
"include/cereal/types/chrono.hpp",
"include/cereal/types/common.hpp",
"include/cereal/types/complex.hpp",
"include/cereal/types/concepts/pair_associative_container.hpp",
"include/cereal/types/deque.hpp",
"include/cereal/types/forward_list.hpp",
"include/cereal/types/functional.hpp",
"include/cereal/types/list.hpp",
"include/cereal/types/map.hpp",
"include/cereal/types/memory.hpp",
"include/cereal/types/polymorphic.hpp",
"include/cereal/types/queue.hpp",
"include/cereal/types/set.hpp",
"include/cereal/types/stack.hpp",
"include/cereal/types/string.hpp",
"include/cereal/types/tuple.hpp",
"include/cereal/types/unordered_map.hpp",
"include/cereal/types/unordered_set.hpp",
"include/cereal/types/utility.hpp",
"include/cereal/types/valarray.hpp",
"include/cereal/types/vector.hpp",
]

genrule(
name = "cereal-srcs",
srcs = [
package_file,
],
outs = file_list,
cmd = "\n".join([
"export WORKSPACE_ROOT=$$(pwd)",
"export INSTALL_DIR=$$(pwd)/$(@D)",
"export TMP_DIR=$$(mktemp -d -t cereal.XXXXX)",
"mkdir -p $$TMP_DIR",
"cp -R $(SRCS) $$TMP_DIR",
"cd $$TMP_DIR",
"tar xfz " + package_file,
"cd " + package_dir,
"$$WORKSPACE_ROOT/$(location //scripts/compile:env_exec) cmake -Wno-dev -DCMAKE_INSTALL_PREFIX:PATH=$$INSTALL_DIR .",
"$$WORKSPACE_ROOT/$(location //scripts/compile:env_exec) make install",
"rm -rf $$TMP_DIR",
]),
tools = [
"//scripts/compile:env_exec",
],
)

cc_library(
name = "cereal-cxx",
srcs = [
"empty.cc",
] + file_list,
includes = [
"include",
],
linkstatic = 1,
)
Binary file added third_party/cereal/cereal-1.2.1.tar.gz
Binary file not shown.
Empty file added third_party/cereal/empty.cc
Empty file.
13 changes: 13 additions & 0 deletions third_party/java/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -361,3 +361,16 @@ java_library(
"@commons_logging_commons_logging//jar",
],
)

java_library(
name = "twitter4j",
srcs = [ "Empty.java" ],
exports = [
"@org_twitter4j_stream//jar",
"@org_twitter4j_core//jar",
],
deps = [
"@org_twitter4j_stream//jar",
"@org_twitter4j_core//jar",
],
)