Skip to content

Commit

Permalink
Add amazon s3 output
Browse files Browse the repository at this point in the history
  • Loading branch information
Jeffail committed Feb 7, 2018
1 parent 02e06c6 commit 297eb28
Show file tree
Hide file tree
Showing 21 changed files with 975 additions and 496 deletions.
12 changes: 6 additions & 6 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ all: $(APPS)

$(PATHINSTBIN)/benthos: $(wildcard lib/*/*.go lib/*/*/*.go cmd/benthos/*.go)

$(PATHINSTBIN)/%:
$(PATHINSTBIN)/%: deps
@mkdir -p $(dir $@)
@go build -tags "$(TAGS)" -ldflags "$(LDFLAGS)" -o $@ ./cmd/$*

$(APPS): %: deps $(PATHINSTBIN)/%
$(APPS): %: $(PATHINSTBIN)/%

$(PATHINSTDOCKER)/benthos.tar:
@mkdir -p $(dir $@)
Expand All @@ -35,7 +35,7 @@ $(PATHINSTDOCKER)/benthos.tar:
docker: $(PATHINSTDOCKER)/benthos.tar

deps:
@go get -u github.com/golang/dep/cmd/dep
@go get github.com/golang/dep/cmd/dep
@$$GOPATH/bin/dep ensure

test:
Expand All @@ -55,7 +55,7 @@ clean-docker:

docs: $(APPS)
@$(PATHINSTBIN)/benthos --print-yaml > ./config/everything.yaml; true
@$(PATHINSTBIN)/benthos --list-inputs > ./resources/docs/inputs/list.md; true
@$(PATHINSTBIN)/benthos --list-inputs > ./resources/docs/inputs/README.md; true
@$(PATHINSTBIN)/benthos --list-processors > ./resources/docs/processors/list.md; true
@$(PATHINSTBIN)/benthos --list-buffers > ./resources/docs/buffers/list.md; true
@$(PATHINSTBIN)/benthos --list-outputs > ./resources/docs/outputs/list.md; true
@$(PATHINSTBIN)/benthos --list-buffers > ./resources/docs/buffers/README.md; true
@$(PATHINSTBIN)/benthos --list-outputs > ./resources/docs/outputs/README.md; true
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ persistence, or nothing at all (direct bridge).

Currently supported input/output targets:

- [Amazon S3][amazons3]
- [Amazon (S3, SQS)][amazons3]
- File
- HTTP(S)
- [Kafka][kafka]
Expand Down
24 changes: 15 additions & 9 deletions config/env/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,21 @@ BENTHOS_MAX_PART_SIZE = 1073741824
BENTHOS_INPUT_PROCESSOR = noop
BENTHOS_OUTPUT_PROCESSOR = noop

AMAZON_S3_INPUT_REGION = eu-west-1
AMAZON_S3_INPUT_BUCKET =
AMAZON_S3_INPUT_DELETE_OBJECTS = false
AMAZON_S3_INPUT_SQS_URL =
AMAZON_S3_INPUT_SQS_BODY_PATH = Records.s3.object.key
AMAZON_S3_INPUT_CREDENTIALS_ID =
AMAZON_S3_INPUT_CREDENTIALS_SECRET =
AMAZON_S3_INPUT_CREDENTIALS_TOKEN =
AMAZON_S3_INPUT_TIMEOUT_S = 5
AMAZON_INPUT_REGION = eu-west-1
AMAZON_INPUT_BUCKET =
AMAZON_INPUT_DELETE_OBJECTS = false
AMAZON_INPUT_SQS_URL =
AMAZON_INPUT_SQS_BODY_PATH = Records.s3.object.key
AMAZON_INPUT_CREDENTIALS_ID =
AMAZON_INPUT_CREDENTIALS_SECRET =
AMAZON_INPUT_CREDENTIALS_TOKEN =
AMAZON_INPUT_TIMEOUT_S = 5
AMAZON_OUTPUT_REGION = eu-west-1
AMAZON_OUTPUT_BUCKET =
AMAZON_OUTPUT_CREDENTIALS_ID =
AMAZON_OUTPUT_CREDENTIALS_SECRET =
AMAZON_OUTPUT_CREDENTIALS_TOKEN =
AMAZON_OUTPUT_TIMEOUT_S = 5

AMQP_INPUT_URL =
AMQP_INPUT_EXCHANGE = benthos-exchange
Expand Down
140 changes: 78 additions & 62 deletions config/env/default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,35 @@ http:
input:
type: ${BENTHOS_INPUT:http_server}
amazon_s3:
region: ${AMAZON_S3_INPUT_REGION:eu-west-1}
bucket: ${AMAZON_S3_INPUT_BUCKET}
delete_objects: ${AMAZON_S3_INPUT_DELETE_OBJECTS:false}
sqs_url: ${AMAZON_S3_INPUT_SQS_URL}
sqs_body_path: ${AMAZON_S3_INPUT_SQS_BODY_PATH:Records.s3.object.key}
region: ${AMAZON_INPUT_REGION:eu-west-1}
bucket: ${AMAZON_INPUT_BUCKET}
delete_objects: ${AMAZON_INPUT_DELETE_OBJECTS:false}
sqs_url: ${AMAZON_INPUT_SQS_URL}
sqs_body_path: ${AMAZON_INPUT_SQS_BODY_PATH:Records.s3.object.key}
credentials:
id: ${AMAZON_S3_INPUT_CREDENTIALS_ID}
secret: ${AMAZON_S3_INPUT_CREDENTIALS_SECRET}
token: ${AMAZON_S3_INPUT_CREDENTIALS_TOKEN}
timeout_s: ${AMAZON_S3_INPUT_TIMEOUT_S:5}
http_server:
address: ${HTTP_SERVER_INPUT_ADDRESS}
path: ${HTTP_SERVER_INPUT_PATH:/post}
id: ${AMAZON_INPUT_CREDENTIALS_ID}
secret: ${AMAZON_INPUT_CREDENTIALS_SECRET}
token: ${AMAZON_INPUT_CREDENTIALS_TOKEN}
timeout_s: ${AMAZON_INPUT_TIMEOUT_S:5}
amazon_sqs:
region: ${AMAZON_INPUT_REGION:eu-west-1}
url: ${AMAZON_INPUT_SQS_URL}
credentials:
id: ${AMAZON_INPUT_CREDENTIALS_ID}
secret: ${AMAZON_INPUT_CREDENTIALS_SECRET}
token: ${AMAZON_INPUT_CREDENTIALS_TOKEN}
timeout_s: ${AMAZON_INPUT_TIMEOUT_S:5}
amqp:
url: ${AMQP_INPUT_URL}
exchange: ${AMQP_INPUT_EXCHANGE:benthos-exchange}
exchange_type: ${AMQP_INPUT_EXCHANGE_TYPE:direct}
queue: ${AMQP_INPUT_QUEUE:benthos-stream}
key: ${AMQP_INPUT_KEY:benthos-key}
consumer_tag: ${AMQP_INPUT_CONSUMER_TAG:benthos-consumer}
file:
path: ${FILE_INPUT_PATH}
multipart: ${FILE_INPUT_MULTIPART:false}
max_buffer: ${FILE_INPUT_MAX_BUFFER:65536}
http_client:
url: ${HTTP_CLIENT_INPUT_URL}
verb: ${HTTP_CLIENT_INPUT_VERB:GET}
Expand All @@ -51,16 +67,9 @@ input:
enabled: ${HTTP_CLIENT_INPUT_OAUTH_ENABLED:false}
timeout_ms: ${HTTP_CLIENT_INPUT_TIMEOUT_MS:5000}
skip_cert_verify: ${HTTP_CLIENT_INPUT_SKIP_CERT_VERIFY:false}
zmq4:
urls:
- ${ZMQ_INPUT_URLS}
bind: ${ZMQ_INPUT_BIND:true}
socket_type: ${ZMQ_INPUT_SOCKET:PULL}
scalability_protocols:
urls:
- ${SCALE_PROTO_INPUT_URLS}
bind: ${SCALE_PROTO_INPUT_BIND:true}
socket_type: ${SCALE_PROTO_INPUT_SOCKET:PULL}
http_server:
address: ${HTTP_SERVER_INPUT_ADDRESS}
path: ${HTTP_SERVER_INPUT_PATH:/post}
kafka:
addresses:
- ${KAFKA_INPUT_BROKER_ADDRESSES}
Expand All @@ -77,21 +86,6 @@ input:
topics:
- ${KAFKA_INPUT_TOPIC:benthos-stream}
start_from_oldest: ${KAFKA_INPUT_START_OLDEST:true}
amqp:
url: ${AMQP_INPUT_URL}
exchange: ${AMQP_INPUT_EXCHANGE:benthos-exchange}
exchange_type: ${AMQP_INPUT_EXCHANGE_TYPE:direct}
queue: ${AMQP_INPUT_QUEUE:benthos-stream}
key: ${AMQP_INPUT_KEY:benthos-key}
consumer_tag: ${AMQP_INPUT_CONSUMER_TAG:benthos-consumer}
nsq:
nsqd_tcp_addresses:
- ${NSQD_INPUT_TCP_ADDRESSES}
lookupd_http_addresses:
- ${NSQD_INPUT_LOOKUP_ADDRESSES}
topic: ${NSQ_INPUT_TOPIC:benthos-messages}
channel: ${NSQ_INPUT_CHANNEL:benthos-stream}
user_agent: ${NSQ_INPUT_USER_AGENT:benthos-consumer}
nats:
urls:
- ${NATS_INPUT_URLS}
Expand All @@ -104,6 +98,14 @@ input:
queue: ${NATS_INPUT_QUEUE:benthos-queue}
durable_name: ${NATS_INPUT_DURABLE_NAME:benthos-offset}
subject: ${NATS_INPUT_SUBJECT:benthos-stream}
nsq:
nsqd_tcp_addresses:
- ${NSQD_INPUT_TCP_ADDRESSES}
lookupd_http_addresses:
- ${NSQD_INPUT_LOOKUP_ADDRESSES}
topic: ${NSQ_INPUT_TOPIC:benthos-messages}
channel: ${NSQ_INPUT_CHANNEL:benthos-stream}
user_agent: ${NSQ_INPUT_USER_AGENT:benthos-consumer}
redis_list:
url: ${REDIS_INPUT_URL}
key: ${REDIS_INPUT_LIST:benthos_list}
Expand All @@ -112,10 +114,16 @@ input:
url: ${REDIS_INPUT_URL}
channels:
- ${REDIS_INPUT_CHANNEL:benthos-stream}
file:
path: ${FILE_INPUT_PATH}
multipart: ${FILE_INPUT_MULTIPART:false}
max_buffer: ${FILE_INPUT_MAX_BUFFER:65536}
scalability_protocols:
urls:
- ${SCALE_PROTO_INPUT_URLS}
bind: ${SCALE_PROTO_INPUT_BIND:true}
socket_type: ${SCALE_PROTO_INPUT_SOCKET:PULL}
zmq4:
urls:
- ${ZMQ_INPUT_URLS}
bind: ${ZMQ_INPUT_BIND:true}
socket_type: ${ZMQ_INPUT_SOCKET:PULL}
processors:
- type: bounds_check
bounds_check:
Expand All @@ -125,6 +133,21 @@ input:
- type: ${BENTHOS_INPUT_PROCESSOR:noop}
output:
type: ${BENTHOS_OUTPUT:http_server}
amazon_s3:
region: ${AMAZON_OUTPUT_REGION:eu-west-1}
bucket: ${AMAZON_OUTPUT_BUCKET}
credentials:
id: ${AMAZON_OUTPUT_CREDENTIALS_ID}
secret: ${AMAZON_OUTPUT_CREDENTIALS_SECRET}
token: ${AMAZON_OUTPUT_CREDENTIALS_TOKEN}
timeout_s: ${AMAZON_OUTPUT_TIMEOUT_S:5}
amqp:
url: ${AMQP_OUTPUT_URL}
exchange: ${AMQP_OUTPUT_EXCHANGE:benthos-exchange}
exchange_type: ${AMQP_OUTPUT_EXCHANGE_TYPE:direct}
key: ${AMQP_OUTPUT_KEY:benthos-key}
file:
path: ${FILE_OUTPUT_PATH}
http_client:
url: ${HTTP_CLIENT_OUTPUT_URL}
verb: ${HTTP_CLIENT_OUTPUT_VERB:POST}
Expand All @@ -142,32 +165,13 @@ output:
address: ${HTTP_SERVER_OUTPUT_ADDRESS}
path: ${HTTP_SERVER_OUTPUT_PATH:/get}
stream_path: ${HTTP_SERVER_OUTPUT_STREAM_PATH:/get/stream}
zmq4:
urls:
- ${ZMQ_OUTPUT_URLS}
bind: ${ZMQ_OUTPUT_BIND:true}
socket_type: ${ZMQ_OUTPUT_SOCKET:PULL}
scalability_protocols:
urls:
- ${SCALE_PROTO_OUTPUT_URLS}
bind: ${SCALE_PROTO_OUTPUT_BIND:false}
socket_type: ${SCALE_PROTO_OUTPUT_SOCKET:PUSH}
kafka:
addresses:
- ${KAFKA_OUTPUT_BROKER_ADDRESSES}
client_id: ${KAFKA_OUTPUT_CLIENT_ID:benthos-client}
max_msg_bytes: ${KAFKA_OUTPUT_MAX_MSG_BYTES:1000000}
topic: ${KAFKA_OUTPUT_TOPIC:benthos-stream}
ack_replicas: ${KAFKA_OUTPUT_ACK_REP:true}
amqp:
url: ${AMQP_OUTPUT_URL}
exchange: ${AMQP_OUTPUT_EXCHANGE:benthos-exchange}
exchange_type: ${AMQP_OUTPUT_EXCHANGE_TYPE:direct}
key: ${AMQP_OUTPUT_KEY:benthos-key}
nsq:
nsqd_tcp_address: ${NSQ_OUTPUT_TCP_ADDRESS}
topic: ${NSQ_OUTPUT_TOPIC:benthos-messages}
user_agent: ${NSQ_OUTPUT_USER_AGENT:benthos-consumer}
nats:
urls:
- ${NATS_OUTPUT_URLS}
Expand All @@ -178,14 +182,26 @@ output:
cluster_id: ${NATS_OUTPUT_CLUSTER_ID:benthos-cluster}
client_id: ${NATS_OUTPUT_CLIENT_ID:benthos-consumer}
subject: ${NATS_OUTPUT_SUBJECT:benthos-stream}
nsq:
nsqd_tcp_address: ${NSQ_OUTPUT_TCP_ADDRESS}
topic: ${NSQ_OUTPUT_TOPIC:benthos-messages}
user_agent: ${NSQ_OUTPUT_USER_AGENT:benthos-consumer}
redis_list:
url: ${REDIS_OUTPUT_URL}
key: ${REDIS_OUTPUT_LIST:benthos_list}
redis_pubsub:
url: ${REDIS_OUTPUT_URL}
channel: ${REDIS_OUTPUT_CHANNEL:benthos-stream}
file:
path: ${FILE_OUTPUT_PATH}
scalability_protocols:
urls:
- ${SCALE_PROTO_OUTPUT_URLS}
bind: ${SCALE_PROTO_OUTPUT_BIND:false}
socket_type: ${SCALE_PROTO_OUTPUT_SOCKET:PUSH}
zmq4:
urls:
- ${ZMQ_OUTPUT_URLS}
bind: ${ZMQ_OUTPUT_BIND:true}
socket_type: ${ZMQ_OUTPUT_SOCKET:PULL}
processors:
- type: ${BENTHOS_OUTPUT_PROCESSOR:noop}
logger:
Expand Down
17 changes: 17 additions & 0 deletions config/everything.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,14 @@ input:
secret: ""
token: ""
timeout_s: 5
amazon_sqs:
region: eu-west-1
url: ""
credentials:
id: ""
secret: ""
token: ""
timeout_s: 5
amqp:
url: amqp://guest:guest@localhost:5672/
exchange: benthos-exchange
Expand Down Expand Up @@ -159,6 +167,15 @@ input:
parts: []
output:
type: stdout
amazon_s3:
region: eu-west-1
bucket: ""
path: ${!count:files}-${!timestamp_unix_nano}.txt
credentials:
id: ""
secret: ""
token: ""
timeout_s: 5
amqp:
url: amqp://guest:guest@localhost:5672/
exchange: benthos-exchange
Expand Down
53 changes: 53 additions & 0 deletions lib/input/amazon_sqs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright (c) 2018 Ashley Jeffs
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package input

import (
"github.com/Jeffail/benthos/lib/input/reader"
"github.com/Jeffail/benthos/lib/util/service/log"
"github.com/Jeffail/benthos/lib/util/service/metrics"
)

//------------------------------------------------------------------------------

func init() {
constructors["amazon_sqs"] = typeSpec{
constructor: NewAmazonSQS,
description: `
Receive messages from an Amazon SQS URL, only the body is extracted into
messages.`,
}
}

//------------------------------------------------------------------------------

// NewAmazonSQS creates a new Amazon SQS input type.
func NewAmazonSQS(conf Config, log log.Modular, stats metrics.Type) (Type, error) {
return NewReader(
"amazon_sqs",
reader.NewPreserver(
reader.NewAmazonSQS(conf.AmazonSQS, log, stats),
),
log, stats,
)
}

//------------------------------------------------------------------------------
2 changes: 2 additions & 0 deletions lib/input/constructor.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ var constructors = map[string]typeSpec{}
type Config struct {
Type string `json:"type" yaml:"type"`
AmazonS3 reader.AmazonS3Config `json:"amazon_s3" yaml:"amazon_s3"`
AmazonSQS reader.AmazonSQSConfig `json:"amazon_sqs" yaml:"amazon_sqs"`
AMQP AMQPConfig `json:"amqp" yaml:"amqp"`
FanIn FanInConfig `json:"fan_in" yaml:"fan_in"`
File FileConfig `json:"file" yaml:"file"`
Expand All @@ -80,6 +81,7 @@ func NewConfig() Config {
return Config{
Type: "stdin",
AmazonS3: reader.NewAmazonS3Config(),
AmazonSQS: reader.NewAmazonSQSConfig(),
AMQP: NewAMQPConfig(),
FanIn: NewFanInConfig(),
File: NewFileConfig(),
Expand Down
Loading

0 comments on commit 297eb28

Please sign in to comment.