From 297eb288abcae3b183be8ac4e6f312f68a007018 Mon Sep 17 00:00:00 2001 From: jeffail Date: Wed, 7 Feb 2018 19:26:16 +0000 Subject: [PATCH] Add amazon s3 output --- Makefile | 12 +- README.md | 2 +- config/env/README.md | 24 ++-- config/env/default.yaml | 140 ++++++++++--------- config/everything.yaml | 17 +++ lib/input/amazon_sqs.go | 53 +++++++ lib/input/constructor.go | 2 + lib/input/reader/amazon_s3.go | 25 ++-- lib/input/reader/amazon_sqs.go | 175 +++++++++++++++++++++++ lib/output/amazon_s3.go | 51 +++++++ lib/output/constructor.go | 2 + lib/output/writer/amazon_s3.go | 169 ++++++++++++++++++++++ lib/processor/insert_part.go | 2 +- lib/processor/set_json.go | 2 +- resources/docs/buffers/README.md | 30 +++- resources/docs/buffers/list.md | 29 ---- resources/docs/inputs/README.md | 223 +++++++++++++++++++++++++++++- resources/docs/inputs/list.md | 217 ----------------------------- resources/docs/outputs/README.md | 150 +++++++++++++++++++- resources/docs/outputs/list.md | 142 ------------------- resources/docs/processors/list.md | 4 +- 21 files changed, 975 insertions(+), 496 deletions(-) create mode 100644 lib/input/amazon_sqs.go create mode 100644 lib/input/reader/amazon_sqs.go create mode 100644 lib/output/amazon_s3.go create mode 100644 lib/output/writer/amazon_s3.go delete mode 100644 resources/docs/buffers/list.md delete mode 100644 resources/docs/inputs/list.md delete mode 100644 resources/docs/outputs/list.md diff --git a/Makefile b/Makefile index f0278c0a7c..0a95fcbde7 100644 --- a/Makefile +++ b/Makefile @@ -19,11 +19,11 @@ all: $(APPS) $(PATHINSTBIN)/benthos: $(wildcard lib/*/*.go lib/*/*/*.go cmd/benthos/*.go) -$(PATHINSTBIN)/%: +$(PATHINSTBIN)/%: deps @mkdir -p $(dir $@) @go build -tags "$(TAGS)" -ldflags "$(LDFLAGS)" -o $@ ./cmd/$* -$(APPS): %: deps $(PATHINSTBIN)/% +$(APPS): %: $(PATHINSTBIN)/% $(PATHINSTDOCKER)/benthos.tar: @mkdir -p $(dir $@) @@ -35,7 +35,7 @@ $(PATHINSTDOCKER)/benthos.tar: docker: $(PATHINSTDOCKER)/benthos.tar deps: - @go get -u github.com/golang/dep/cmd/dep + @go get github.com/golang/dep/cmd/dep @$$GOPATH/bin/dep ensure test: @@ -55,7 +55,7 @@ clean-docker: docs: $(APPS) @$(PATHINSTBIN)/benthos --print-yaml > ./config/everything.yaml; true - @$(PATHINSTBIN)/benthos --list-inputs > ./resources/docs/inputs/list.md; true + @$(PATHINSTBIN)/benthos --list-inputs > ./resources/docs/inputs/README.md; true @$(PATHINSTBIN)/benthos --list-processors > ./resources/docs/processors/list.md; true - @$(PATHINSTBIN)/benthos --list-buffers > ./resources/docs/buffers/list.md; true - @$(PATHINSTBIN)/benthos --list-outputs > ./resources/docs/outputs/list.md; true + @$(PATHINSTBIN)/benthos --list-buffers > ./resources/docs/buffers/README.md; true + @$(PATHINSTBIN)/benthos --list-outputs > ./resources/docs/outputs/README.md; true diff --git a/README.md b/README.md index d76944efae..6b42fe45c7 100644 --- a/README.md +++ b/README.md @@ -17,7 +17,7 @@ persistence, or nothing at all (direct bridge). 
Currently supported input/output targets: -- [Amazon S3][amazons3] +- [Amazon (S3, SQS)][amazons3] - File - HTTP(S) - [Kafka][kafka] diff --git a/config/env/README.md b/config/env/README.md index 0dd7e83d77..cc3dd322ae 100644 --- a/config/env/README.md +++ b/config/env/README.md @@ -25,15 +25,21 @@ BENTHOS_MAX_PART_SIZE = 1073741824 BENTHOS_INPUT_PROCESSOR = noop BENTHOS_OUTPUT_PROCESSOR = noop -AMAZON_S3_INPUT_REGION = eu-west-1 -AMAZON_S3_INPUT_BUCKET = -AMAZON_S3_INPUT_DELETE_OBJECTS = false -AMAZON_S3_INPUT_SQS_URL = -AMAZON_S3_INPUT_SQS_BODY_PATH = Records.s3.object.key -AMAZON_S3_INPUT_CREDENTIALS_ID = -AMAZON_S3_INPUT_CREDENTIALS_SECRET = -AMAZON_S3_INPUT_CREDENTIALS_TOKEN = -AMAZON_S3_INPUT_TIMEOUT_S = 5 +AMAZON_INPUT_REGION = eu-west-1 +AMAZON_INPUT_BUCKET = +AMAZON_INPUT_DELETE_OBJECTS = false +AMAZON_INPUT_SQS_URL = +AMAZON_INPUT_SQS_BODY_PATH = Records.s3.object.key +AMAZON_INPUT_CREDENTIALS_ID = +AMAZON_INPUT_CREDENTIALS_SECRET = +AMAZON_INPUT_CREDENTIALS_TOKEN = +AMAZON_INPUT_TIMEOUT_S = 5 +AMAZON_OUTPUT_REGION = eu-west-1 +AMAZON_OUTPUT_BUCKET = +AMAZON_OUTPUT_CREDENTIALS_ID = +AMAZON_OUTPUT_CREDENTIALS_SECRET = +AMAZON_OUTPUT_CREDENTIALS_TOKEN = +AMAZON_OUTPUT_TIMEOUT_S = 5 AMQP_INPUT_URL = AMQP_INPUT_EXCHANGE = benthos-exchange diff --git a/config/env/default.yaml b/config/env/default.yaml index 342328a904..3670bff00e 100644 --- a/config/env/default.yaml +++ b/config/env/default.yaml @@ -20,19 +20,35 @@ http: input: type: ${BENTHOS_INPUT:http_server} amazon_s3: - region: ${AMAZON_S3_INPUT_REGION:eu-west-1} - bucket: ${AMAZON_S3_INPUT_BUCKET} - delete_objects: ${AMAZON_S3_INPUT_DELETE_OBJECTS:false} - sqs_url: ${AMAZON_S3_INPUT_SQS_URL} - sqs_body_path: ${AMAZON_S3_INPUT_SQS_BODY_PATH:Records.s3.object.key} + region: ${AMAZON_INPUT_REGION:eu-west-1} + bucket: ${AMAZON_INPUT_BUCKET} + delete_objects: ${AMAZON_INPUT_DELETE_OBJECTS:false} + sqs_url: ${AMAZON_INPUT_SQS_URL} + sqs_body_path: ${AMAZON_INPUT_SQS_BODY_PATH:Records.s3.object.key} credentials: - id: ${AMAZON_S3_INPUT_CREDENTIALS_ID} - secret: ${AMAZON_S3_INPUT_CREDENTIALS_SECRET} - token: ${AMAZON_S3_INPUT_CREDENTIALS_TOKEN} - timeout_s: ${AMAZON_S3_INPUT_TIMEOUT_S:5} - http_server: - address: ${HTTP_SERVER_INPUT_ADDRESS} - path: ${HTTP_SERVER_INPUT_PATH:/post} + id: ${AMAZON_INPUT_CREDENTIALS_ID} + secret: ${AMAZON_INPUT_CREDENTIALS_SECRET} + token: ${AMAZON_INPUT_CREDENTIALS_TOKEN} + timeout_s: ${AMAZON_INPUT_TIMEOUT_S:5} + amazon_sqs: + region: ${AMAZON_INPUT_REGION:eu-west-1} + url: ${AMAZON_INPUT_SQS_URL} + credentials: + id: ${AMAZON_INPUT_CREDENTIALS_ID} + secret: ${AMAZON_INPUT_CREDENTIALS_SECRET} + token: ${AMAZON_INPUT_CREDENTIALS_TOKEN} + timeout_s: ${AMAZON_INPUT_TIMEOUT_S:5} + amqp: + url: ${AMQP_INPUT_URL} + exchange: ${AMQP_INPUT_EXCHANGE:benthos-exchange} + exchange_type: ${AMQP_INPUT_EXCHANGE_TYPE:direct} + queue: ${AMQP_INPUT_QUEUE:benthos-stream} + key: ${AMQP_INPUT_KEY:benthos-key} + consumer_tag: ${AMQP_INPUT_CONSUMER_TAG:benthos-consumer} + file: + path: ${FILE_INPUT_PATH} + multipart: ${FILE_INPUT_MULTIPART:false} + max_buffer: ${FILE_INPUT_MAX_BUFFER:65536} http_client: url: ${HTTP_CLIENT_INPUT_URL} verb: ${HTTP_CLIENT_INPUT_VERB:GET} @@ -51,16 +67,9 @@ input: enabled: ${HTTP_CLIENT_INPUT_OAUTH_ENABLED:false} timeout_ms: ${HTTP_CLIENT_INPUT_TIMEOUT_MS:5000} skip_cert_verify: ${HTTP_CLIENT_INPUT_SKIP_CERT_VERIFY:false} - zmq4: - urls: - - ${ZMQ_INPUT_URLS} - bind: ${ZMQ_INPUT_BIND:true} - socket_type: ${ZMQ_INPUT_SOCKET:PULL} - scalability_protocols: - urls: - - ${SCALE_PROTO_INPUT_URLS} - 
bind: ${SCALE_PROTO_INPUT_BIND:true} - socket_type: ${SCALE_PROTO_INPUT_SOCKET:PULL} + http_server: + address: ${HTTP_SERVER_INPUT_ADDRESS} + path: ${HTTP_SERVER_INPUT_PATH:/post} kafka: addresses: - ${KAFKA_INPUT_BROKER_ADDRESSES} @@ -77,21 +86,6 @@ input: topics: - ${KAFKA_INPUT_TOPIC:benthos-stream} start_from_oldest: ${KAFKA_INPUT_START_OLDEST:true} - amqp: - url: ${AMQP_INPUT_URL} - exchange: ${AMQP_INPUT_EXCHANGE:benthos-exchange} - exchange_type: ${AMQP_INPUT_EXCHANGE_TYPE:direct} - queue: ${AMQP_INPUT_QUEUE:benthos-stream} - key: ${AMQP_INPUT_KEY:benthos-key} - consumer_tag: ${AMQP_INPUT_CONSUMER_TAG:benthos-consumer} - nsq: - nsqd_tcp_addresses: - - ${NSQD_INPUT_TCP_ADDRESSES} - lookupd_http_addresses: - - ${NSQD_INPUT_LOOKUP_ADDRESSES} - topic: ${NSQ_INPUT_TOPIC:benthos-messages} - channel: ${NSQ_INPUT_CHANNEL:benthos-stream} - user_agent: ${NSQ_INPUT_USER_AGENT:benthos-consumer} nats: urls: - ${NATS_INPUT_URLS} @@ -104,6 +98,14 @@ input: queue: ${NATS_INPUT_QUEUE:benthos-queue} durable_name: ${NATS_INPUT_DURABLE_NAME:benthos-offset} subject: ${NATS_INPUT_SUBJECT:benthos-stream} + nsq: + nsqd_tcp_addresses: + - ${NSQD_INPUT_TCP_ADDRESSES} + lookupd_http_addresses: + - ${NSQD_INPUT_LOOKUP_ADDRESSES} + topic: ${NSQ_INPUT_TOPIC:benthos-messages} + channel: ${NSQ_INPUT_CHANNEL:benthos-stream} + user_agent: ${NSQ_INPUT_USER_AGENT:benthos-consumer} redis_list: url: ${REDIS_INPUT_URL} key: ${REDIS_INPUT_LIST:benthos_list} @@ -112,10 +114,16 @@ input: url: ${REDIS_INPUT_URL} channels: - ${REDIS_INPUT_CHANNEL:benthos-stream} - file: - path: ${FILE_INPUT_PATH} - multipart: ${FILE_INPUT_MULTIPART:false} - max_buffer: ${FILE_INPUT_MAX_BUFFER:65536} + scalability_protocols: + urls: + - ${SCALE_PROTO_INPUT_URLS} + bind: ${SCALE_PROTO_INPUT_BIND:true} + socket_type: ${SCALE_PROTO_INPUT_SOCKET:PULL} + zmq4: + urls: + - ${ZMQ_INPUT_URLS} + bind: ${ZMQ_INPUT_BIND:true} + socket_type: ${ZMQ_INPUT_SOCKET:PULL} processors: - type: bounds_check bounds_check: @@ -125,6 +133,21 @@ input: - type: ${BENTHOS_INPUT_PROCESSOR:noop} output: type: ${BENTHOS_OUTPUT:http_server} + amazon_s3: + region: ${AMAZON_OUTPUT_REGION:eu-west-1} + bucket: ${AMAZON_OUTPUT_BUCKET} + credentials: + id: ${AMAZON_OUTPUT_CREDENTIALS_ID} + secret: ${AMAZON_OUTPUT_CREDENTIALS_SECRET} + token: ${AMAZON_OUTPUT_CREDENTIALS_TOKEN} + timeout_s: ${AMAZON_OUTPUT_TIMEOUT_S:5} + amqp: + url: ${AMQP_OUTPUT_URL} + exchange: ${AMQP_OUTPUT_EXCHANGE:benthos-exchange} + exchange_type: ${AMQP_OUTPUT_EXCHANGE_TYPE:direct} + key: ${AMQP_OUTPUT_KEY:benthos-key} + file: + path: ${FILE_OUTPUT_PATH} http_client: url: ${HTTP_CLIENT_OUTPUT_URL} verb: ${HTTP_CLIENT_OUTPUT_VERB:POST} @@ -142,16 +165,6 @@ output: address: ${HTTP_SERVER_OUTPUT_ADDRESS} path: ${HTTP_SERVER_OUTPUT_PATH:/get} stream_path: ${HTTP_SERVER_OUTPUT_STREAM_PATH:/get/stream} - zmq4: - urls: - - ${ZMQ_OUTPUT_URLS} - bind: ${ZMQ_OUTPUT_BIND:true} - socket_type: ${ZMQ_OUTPUT_SOCKET:PULL} - scalability_protocols: - urls: - - ${SCALE_PROTO_OUTPUT_URLS} - bind: ${SCALE_PROTO_OUTPUT_BIND:false} - socket_type: ${SCALE_PROTO_OUTPUT_SOCKET:PUSH} kafka: addresses: - ${KAFKA_OUTPUT_BROKER_ADDRESSES} @@ -159,15 +172,6 @@ output: max_msg_bytes: ${KAFKA_OUTPUT_MAX_MSG_BYTES:1000000} topic: ${KAFKA_OUTPUT_TOPIC:benthos-stream} ack_replicas: ${KAFKA_OUTPUT_ACK_REP:true} - amqp: - url: ${AMQP_OUTPUT_URL} - exchange: ${AMQP_OUTPUT_EXCHANGE:benthos-exchange} - exchange_type: ${AMQP_OUTPUT_EXCHANGE_TYPE:direct} - key: ${AMQP_OUTPUT_KEY:benthos-key} - nsq: - nsqd_tcp_address: ${NSQ_OUTPUT_TCP_ADDRESS} - 
topic: ${NSQ_OUTPUT_TOPIC:benthos-messages} - user_agent: ${NSQ_OUTPUT_USER_AGENT:benthos-consumer} nats: urls: - ${NATS_OUTPUT_URLS} @@ -178,14 +182,26 @@ output: cluster_id: ${NATS_OUTPUT_CLUSTER_ID:benthos-cluster} client_id: ${NATS_OUTPUT_CLIENT_ID:benthos-consumer} subject: ${NATS_OUTPUT_SUBJECT:benthos-stream} + nsq: + nsqd_tcp_address: ${NSQ_OUTPUT_TCP_ADDRESS} + topic: ${NSQ_OUTPUT_TOPIC:benthos-messages} + user_agent: ${NSQ_OUTPUT_USER_AGENT:benthos-consumer} redis_list: url: ${REDIS_OUTPUT_URL} key: ${REDIS_OUTPUT_LIST:benthos_list} redis_pubsub: url: ${REDIS_OUTPUT_URL} channel: ${REDIS_OUTPUT_CHANNEL:benthos-stream} - file: - path: ${FILE_OUTPUT_PATH} + scalability_protocols: + urls: + - ${SCALE_PROTO_OUTPUT_URLS} + bind: ${SCALE_PROTO_OUTPUT_BIND:false} + socket_type: ${SCALE_PROTO_OUTPUT_SOCKET:PUSH} + zmq4: + urls: + - ${ZMQ_OUTPUT_URLS} + bind: ${ZMQ_OUTPUT_BIND:true} + socket_type: ${ZMQ_OUTPUT_SOCKET:PULL} processors: - type: ${BENTHOS_OUTPUT_PROCESSOR:noop} logger: diff --git a/config/everything.yaml b/config/everything.yaml index 24eae6e71c..ecfeda8c53 100644 --- a/config/everything.yaml +++ b/config/everything.yaml @@ -14,6 +14,14 @@ input: secret: "" token: "" timeout_s: 5 + amazon_sqs: + region: eu-west-1 + url: "" + credentials: + id: "" + secret: "" + token: "" + timeout_s: 5 amqp: url: amqp://guest:guest@localhost:5672/ exchange: benthos-exchange @@ -159,6 +167,15 @@ input: parts: [] output: type: stdout + amazon_s3: + region: eu-west-1 + bucket: "" + path: ${!count:files}-${!timestamp_unix_nano}.txt + credentials: + id: "" + secret: "" + token: "" + timeout_s: 5 amqp: url: amqp://guest:guest@localhost:5672/ exchange: benthos-exchange diff --git a/lib/input/amazon_sqs.go b/lib/input/amazon_sqs.go new file mode 100644 index 0000000000..3836d95170 --- /dev/null +++ b/lib/input/amazon_sqs.go @@ -0,0 +1,53 @@ +// Copyright (c) 2018 Ashley Jeffs +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. 
+ +package input + +import ( + "github.com/Jeffail/benthos/lib/input/reader" + "github.com/Jeffail/benthos/lib/util/service/log" + "github.com/Jeffail/benthos/lib/util/service/metrics" +) + +//------------------------------------------------------------------------------ + +func init() { + constructors["amazon_sqs"] = typeSpec{ + constructor: NewAmazonSQS, + description: ` +Receive messages from an Amazon SQS URL, only the body is extracted into +messages.`, + } +} + +//------------------------------------------------------------------------------ + +// NewAmazonSQS creates a new Amazon SQS input type. +func NewAmazonSQS(conf Config, log log.Modular, stats metrics.Type) (Type, error) { + return NewReader( + "amazon_sqs", + reader.NewPreserver( + reader.NewAmazonSQS(conf.AmazonSQS, log, stats), + ), + log, stats, + ) +} + +//------------------------------------------------------------------------------ diff --git a/lib/input/constructor.go b/lib/input/constructor.go index 09866d1cc0..9084d332e4 100644 --- a/lib/input/constructor.go +++ b/lib/input/constructor.go @@ -57,6 +57,7 @@ var constructors = map[string]typeSpec{} type Config struct { Type string `json:"type" yaml:"type"` AmazonS3 reader.AmazonS3Config `json:"amazon_s3" yaml:"amazon_s3"` + AmazonSQS reader.AmazonSQSConfig `json:"amazon_sqs" yaml:"amazon_sqs"` AMQP AMQPConfig `json:"amqp" yaml:"amqp"` FanIn FanInConfig `json:"fan_in" yaml:"fan_in"` File FileConfig `json:"file" yaml:"file"` @@ -80,6 +81,7 @@ func NewConfig() Config { return Config{ Type: "stdin", AmazonS3: reader.NewAmazonS3Config(), + AmazonSQS: reader.NewAmazonSQSConfig(), AMQP: NewAMQPConfig(), FanIn: NewFanInConfig(), File: NewFileConfig(), diff --git a/lib/input/reader/amazon_s3.go b/lib/input/reader/amazon_s3.go index b7d611fce9..28f901263f 100644 --- a/lib/input/reader/amazon_s3.go +++ b/lib/input/reader/amazon_s3.go @@ -39,8 +39,8 @@ import ( //------------------------------------------------------------------------------ -// AmazonS3CredentialsConfig contains configuration params for AWS credentials. -type AmazonS3CredentialsConfig struct { +// AmazonAWSCredentialsConfig contains configuration params for AWS credentials. +type AmazonAWSCredentialsConfig struct { ID string `json:"id"` Secret string `json:"secret"` Token string `json:"token"` @@ -48,13 +48,13 @@ type AmazonS3CredentialsConfig struct { // AmazonS3Config is configuration values for the input type. type AmazonS3Config struct { - Region string `json:"region" yaml:"region"` - Bucket string `json:"bucket" yaml:"bucket"` - DeleteObjects bool `json:"delete_objects" yaml:"delete_objects"` - SQSURL string `json:"sqs_url" yaml:"sqs_url"` - SQSBodyPath string `json:"sqs_body_path" yaml:"sqs_body_path"` - Credentials AmazonS3CredentialsConfig `json:"credentials" yaml:"credentials"` - TimeoutS int64 `json:"timeout_s" yaml:"timeout_s"` + Region string `json:"region" yaml:"region"` + Bucket string `json:"bucket" yaml:"bucket"` + DeleteObjects bool `json:"delete_objects" yaml:"delete_objects"` + SQSURL string `json:"sqs_url" yaml:"sqs_url"` + SQSBodyPath string `json:"sqs_body_path" yaml:"sqs_body_path"` + Credentials AmazonAWSCredentialsConfig `json:"credentials" yaml:"credentials"` + TimeoutS int64 `json:"timeout_s" yaml:"timeout_s"` } // NewAmazonS3Config creates a new Config with default values. 
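For reference, here is a minimal sketch of how the new `amazon_sqs` input registered above might be configured, based on the fields this patch adds to `config/everything.yaml`. The queue URL shown is a hypothetical placeholder; leaving the credentials empty falls back to the AWS SDK's default credential chain, as implied by the `Connect` logic in the reader.

``` yaml
input:
  type: amazon_sqs
  amazon_sqs:
    region: eu-west-1
    # Hypothetical queue URL, shown purely for illustration.
    url: https://sqs.eu-west-1.amazonaws.com/123456789012/benthos-queue
    credentials:
      id: ""
      secret: ""
      token: ""
    timeout_s: 5
```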
@@ -65,7 +65,7 @@ func NewAmazonS3Config() AmazonS3Config {
 		DeleteObjects: false,
 		SQSURL:        "",
 		SQSBodyPath:   "Records.s3.object.key",
-		Credentials: AmazonS3CredentialsConfig{
+		Credentials: AmazonAWSCredentialsConfig{
 			ID:     "",
 			Secret: "",
 			Token:  "",
@@ -83,9 +83,6 @@ type AmazonS3 struct {
 
 	sqsBodyPath []string
 
-	pendingHandles []string
-	ackHandles     []string
-
 	readKeys   []string
 	targetKeys []string
 
@@ -158,7 +155,7 @@ func (a *AmazonS3) Connect() error {
 		a.sqs = sqs.New(sess)
 	}
 
-	a.log.Infof("Receiving amazon s3 objects from bucket: %s\n", a.conf.Bucket)
+	a.log.Infof("Receiving Amazon S3 objects from bucket: %s\n", a.conf.Bucket)
 
 	a.session = sess
 	a.downloader = dler
diff --git a/lib/input/reader/amazon_sqs.go b/lib/input/reader/amazon_sqs.go
new file mode 100644
index 0000000000..adf32e94e9
--- /dev/null
+++ b/lib/input/reader/amazon_sqs.go
@@ -0,0 +1,175 @@
+// Copyright (c) 2018 Ashley Jeffs
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+package reader
+
+import (
+	"time"
+
+	"github.com/Jeffail/benthos/lib/types"
+	"github.com/Jeffail/benthos/lib/util/service/log"
+	"github.com/Jeffail/benthos/lib/util/service/metrics"
+	"github.com/aws/aws-sdk-go/aws"
+	"github.com/aws/aws-sdk-go/aws/credentials"
+	"github.com/aws/aws-sdk-go/aws/session"
+	"github.com/aws/aws-sdk-go/service/sqs"
+)
+
+//------------------------------------------------------------------------------
+
+// AmazonSQSConfig is configuration values for the input type.
+type AmazonSQSConfig struct {
+	Region      string                     `json:"region" yaml:"region"`
+	URL         string                     `json:"url" yaml:"url"`
+	Credentials AmazonAWSCredentialsConfig `json:"credentials" yaml:"credentials"`
+	TimeoutS    int64                      `json:"timeout_s" yaml:"timeout_s"`
+}
+
+// NewAmazonSQSConfig creates a new Config with default values.
+func NewAmazonSQSConfig() AmazonSQSConfig {
+	return AmazonSQSConfig{
+		Region: "eu-west-1",
+		URL:    "",
+		Credentials: AmazonAWSCredentialsConfig{
+			ID:     "",
+			Secret: "",
+			Token:  "",
+		},
+		TimeoutS: 5,
+	}
+}
+
+//------------------------------------------------------------------------------
+
+// AmazonSQS is a benthos reader.Type implementation that reads messages from an
+// Amazon SQS queue.
+type AmazonSQS struct {
+	conf AmazonSQSConfig
+
+	pendingHandles []*sqs.DeleteMessageBatchRequestEntry
+
+	session *session.Session
+	sqs     *sqs.SQS
+
+	log   log.Modular
+	stats metrics.Type
+}
+
+// NewAmazonSQS creates a new Amazon SQS reader.Type.
+func NewAmazonSQS( + conf AmazonSQSConfig, + log log.Modular, + stats metrics.Type, +) *AmazonSQS { + return &AmazonSQS{ + conf: conf, + log: log.NewModule(".input.amazon_sqs"), + stats: stats, + } +} + +// Connect attempts to establish a connection to the target SQS queue. +func (a *AmazonSQS) Connect() error { + if a.session != nil { + return nil + } + + awsConf := aws.NewConfig() + if len(a.conf.Region) > 0 { + awsConf = awsConf.WithRegion(a.conf.Region) + } + if len(a.conf.Credentials.ID) > 0 { + awsConf = awsConf.WithCredentials(credentials.NewStaticCredentials( + a.conf.Credentials.ID, + a.conf.Credentials.Secret, + a.conf.Credentials.Token, + )) + } + + sess, err := session.NewSession(awsConf) + if err != nil { + return err + } + + a.sqs = sqs.New(sess) + a.session = sess + + a.log.Infof("Receiving Amazon SQS messages from URL: %v\n", a.conf.URL) + return nil +} + +// Read attempts to read a new message from the target SQS. +func (a *AmazonSQS) Read() (types.Message, error) { + if a.session == nil { + return types.Message{}, types.ErrNotConnected + } + + output, err := a.sqs.ReceiveMessage(&sqs.ReceiveMessageInput{ + QueueUrl: aws.String(a.conf.URL), + MaxNumberOfMessages: aws.Int64(1), + WaitTimeSeconds: aws.Int64(a.conf.TimeoutS), + }) + if err != nil { + return types.Message{}, err + } + + msg := types.NewMessage() + + for _, sqsMsg := range output.Messages { + if sqsMsg.ReceiptHandle != nil { + a.pendingHandles = append(a.pendingHandles, &sqs.DeleteMessageBatchRequestEntry{ + Id: sqsMsg.MessageId, + ReceiptHandle: sqsMsg.ReceiptHandle, + }) + } + + if sqsMsg.Body != nil { + msg.Parts = append(msg.Parts, []byte(*sqsMsg.Body)) + } + } + + return msg, nil +} + +// Acknowledge confirms whether or not our unacknowledged messages have been +// successfully propagated or not. +func (a *AmazonSQS) Acknowledge(err error) error { + if _, err := a.sqs.DeleteMessageBatch(&sqs.DeleteMessageBatchInput{ + QueueUrl: aws.String(a.conf.URL), + Entries: a.pendingHandles, + }); err != nil { + return err + } + + a.pendingHandles = nil + return nil +} + +// CloseAsync begins cleaning up resources used by this reader asynchronously. +func (a *AmazonSQS) CloseAsync() { +} + +// WaitForClose will block until either the reader is closed or a specified +// timeout occurs. +func (a *AmazonSQS) WaitForClose(time.Duration) error { + return nil +} + +//------------------------------------------------------------------------------ diff --git a/lib/output/amazon_s3.go b/lib/output/amazon_s3.go new file mode 100644 index 0000000000..5abdb9c12d --- /dev/null +++ b/lib/output/amazon_s3.go @@ -0,0 +1,51 @@ +// Copyright (c) 2018 Ashley Jeffs +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package output + +import ( + "github.com/Jeffail/benthos/lib/output/writer" + "github.com/Jeffail/benthos/lib/util/service/log" + "github.com/Jeffail/benthos/lib/util/service/metrics" +) + +//------------------------------------------------------------------------------ + +func init() { + constructors["amazon_s3"] = typeSpec{ + constructor: NewAmazonS3, + description: ` +Sends message parts as objects to an Amazon S3 bucket. Each object is uploaded +with the path specified with the 'path' field, in order to have a different path +for each object you should use function interpolations described +[here](../config_interpolation.md#functions).`, + } +} + +//------------------------------------------------------------------------------ + +// NewAmazonS3 creates a new AmazonS3 output type. +func NewAmazonS3(conf Config, log log.Modular, stats metrics.Type) (Type, error) { + return NewWriter( + "amazon_s3", writer.NewAmazonS3(conf.AmazonS3, log, stats), log, stats, + ) +} + +//------------------------------------------------------------------------------ diff --git a/lib/output/constructor.go b/lib/output/constructor.go index 0b34269f69..958a0d027a 100644 --- a/lib/output/constructor.go +++ b/lib/output/constructor.go @@ -50,6 +50,7 @@ var constructors = map[string]typeSpec{} // but we want to list it as an option. type Config struct { Type string `json:"type" yaml:"type"` + AmazonS3 writer.AmazonS3Config `json:"amazon_s3" yaml:"amazon_s3"` AMQP AMQPConfig `json:"amqp" yaml:"amqp"` FanOut FanOutConfig `json:"fan_out" yaml:"fan_out"` File FileConfig `json:"file" yaml:"file"` @@ -72,6 +73,7 @@ type Config struct { func NewConfig() Config { return Config{ Type: "stdout", + AmazonS3: writer.NewAmazonS3Config(), AMQP: NewAMQPConfig(), FanOut: NewFanOutConfig(), File: NewFileConfig(), diff --git a/lib/output/writer/amazon_s3.go b/lib/output/writer/amazon_s3.go new file mode 100644 index 0000000000..32b81c2a37 --- /dev/null +++ b/lib/output/writer/amazon_s3.go @@ -0,0 +1,169 @@ +// Copyright (c) 2018 Ashley Jeffs +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. 
+ +package writer + +import ( + "bytes" + "time" + + "github.com/Jeffail/benthos/lib/types" + "github.com/Jeffail/benthos/lib/util/service/log" + "github.com/Jeffail/benthos/lib/util/service/metrics" + "github.com/Jeffail/benthos/lib/util/text" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/s3/s3manager" +) + +//------------------------------------------------------------------------------ + +// AmazonAWSCredentialsConfig contains configuration params for AWS credentials. +type AmazonAWSCredentialsConfig struct { + ID string `json:"id"` + Secret string `json:"secret"` + Token string `json:"token"` +} + +// AmazonS3Config is configuration values for the input type. +type AmazonS3Config struct { + Region string `json:"region" yaml:"region"` + Bucket string `json:"bucket" yaml:"bucket"` + Path string `json:"path" yaml:"path"` + Credentials AmazonAWSCredentialsConfig `json:"credentials" yaml:"credentials"` + TimeoutS int64 `json:"timeout_s" yaml:"timeout_s"` +} + +// NewAmazonS3Config creates a new Config with default values. +func NewAmazonS3Config() AmazonS3Config { + return AmazonS3Config{ + Region: "eu-west-1", + Bucket: "", + Path: "${!count:files}-${!timestamp_unix_nano}.txt", + Credentials: AmazonAWSCredentialsConfig{ + ID: "", + Secret: "", + Token: "", + }, + TimeoutS: 5, + } +} + +//------------------------------------------------------------------------------ + +// AmazonS3 is a benthos writer.Type implementation that writes messages to an +// Amazon S3 bucket. +type AmazonS3 struct { + conf AmazonS3Config + + pathBytes []byte + interpolatePath bool + + session *session.Session + uploader *s3manager.Uploader + + log log.Modular + stats metrics.Type +} + +// NewAmazonS3 creates a new Amazon S3 bucket writer.Type. +func NewAmazonS3( + conf AmazonS3Config, + log log.Modular, + stats metrics.Type, +) *AmazonS3 { + pathBytes := []byte(conf.Path) + interpolatePath := text.ContainsFunctionVariables(pathBytes) + return &AmazonS3{ + conf: conf, + pathBytes: pathBytes, + interpolatePath: interpolatePath, + log: log.NewModule(".output.amazon_s3"), + stats: stats, + } +} + +// Connect attempts to establish a connection to the target S3 bucket and any +// relevant queues used to traverse the objects (SQS, etc). +func (a *AmazonS3) Connect() error { + if a.session != nil { + return nil + } + + awsConf := aws.NewConfig() + if len(a.conf.Region) > 0 { + awsConf = awsConf.WithRegion(a.conf.Region) + } + if len(a.conf.Credentials.ID) > 0 { + awsConf = awsConf.WithCredentials(credentials.NewStaticCredentials( + a.conf.Credentials.ID, + a.conf.Credentials.Secret, + a.conf.Credentials.Token, + )) + } + + sess, err := session.NewSession(awsConf) + if err != nil { + return err + } + + a.session = sess + a.uploader = s3manager.NewUploader(sess) + + a.log.Infof("Uploading message parts as objects to Amazon S3 bucket: %v\n", a.conf.Bucket) + return nil +} + +// Write attempts to write message contents to a target S3 bucket as files. 
+func (a *AmazonS3) Write(msg types.Message) error { + if a.session == nil { + return types.ErrNotConnected + } + + for _, part := range msg.Parts { + path := a.conf.Path + if a.interpolatePath { + path = string(text.ReplaceFunctionVariables(a.pathBytes)) + } + + if _, err := a.uploader.Upload(&s3manager.UploadInput{ + Body: bytes.NewReader(part), + Bucket: aws.String(a.conf.Bucket), + Key: aws.String(path), + }); err != nil { + return err + } + } + + return nil +} + +// CloseAsync begins cleaning up resources used by this reader asynchronously. +func (a *AmazonS3) CloseAsync() { +} + +// WaitForClose will block until either the reader is closed or a specified +// timeout occurs. +func (a *AmazonS3) WaitForClose(time.Duration) error { + return nil +} + +//------------------------------------------------------------------------------ diff --git a/lib/processor/insert_part.go b/lib/processor/insert_part.go index d108921f46..17f4285c10 100644 --- a/lib/processor/insert_part.go +++ b/lib/processor/insert_part.go @@ -43,7 +43,7 @@ inserted before the last element, and so on. If the negative index is greater than the length of the existing parts it will be inserted at the beginning. This processor will interpolate functions within the 'content' field, you can -find a list of functions [here](../config_interpolation.md).`, +find a list of functions [here](../config_interpolation.md#functions).`, } } diff --git a/lib/processor/set_json.go b/lib/processor/set_json.go index a9a584a8f9..b8bce2a240 100644 --- a/lib/processor/set_json.go +++ b/lib/processor/set_json.go @@ -63,7 +63,7 @@ will be the last part of the message, if part = -2 then the part before the last element with be selected, and so on. This processor will interpolate functions within the 'value' field, you can find -a list of functions [here](../config_interpolation.md).`, +a list of functions [here](../config_interpolation.md#functions).`, } } diff --git a/resources/docs/buffers/README.md b/resources/docs/buffers/README.md index 5a8554b560..0e06289788 100644 --- a/resources/docs/buffers/README.md +++ b/resources/docs/buffers/README.md @@ -1,7 +1,29 @@ -Buffers +BUFFERS ======= -Benthos has many configurable buffers, and there are more constantly being -added. For a full list of buffers [check out this generated document][0]. +This document has been generated with `benthos --list-buffers`. -[0]: ./list.md +## `memory` + +The memory buffer type simply allocates a set amount of RAM for buffering +messages. This protects the pipeline against backpressure until this buffer is +full. The messages are lost if the service is stopped. + +## `mmap_file` + +The mmap file buffer type uses memory mapped files to perform low-latency, +file-persisted buffering of messages. + +To configure the mmap file buffer you need to designate a writeable directory +for storing the mapped files. Benthos will create multiple files in this +directory as it fills them. + +When files are fully read from they will be deleted. You can disable this +feature if you wish to preserve the data indefinitely, but the directory will +fill up as fast as data passes through. + +## `none` + +Selecting no buffer (default) is the lowest latency option since no extra work +is done to messages that pass through. With this option back pressure from the +output will be directly applied down the pipeline. 
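To tie the buffer descriptions above to a concrete config, here is a minimal sketch of selecting the memory buffer. Only the buffer type names come from this document; the top-level `buffer` section layout and the `limit` field are assumptions about the schema and may differ from the actual config.

``` yaml
# Assumed layout: the 'buffer' section and 'limit' field are not shown in this patch.
buffer:
  type: memory
  memory:
    limit: 524288000 # bytes of RAM set aside for buffering messages
```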
diff --git a/resources/docs/buffers/list.md b/resources/docs/buffers/list.md deleted file mode 100644 index 0e06289788..0000000000 --- a/resources/docs/buffers/list.md +++ /dev/null @@ -1,29 +0,0 @@ -BUFFERS -======= - -This document has been generated with `benthos --list-buffers`. - -## `memory` - -The memory buffer type simply allocates a set amount of RAM for buffering -messages. This protects the pipeline against backpressure until this buffer is -full. The messages are lost if the service is stopped. - -## `mmap_file` - -The mmap file buffer type uses memory mapped files to perform low-latency, -file-persisted buffering of messages. - -To configure the mmap file buffer you need to designate a writeable directory -for storing the mapped files. Benthos will create multiple files in this -directory as it fills them. - -When files are fully read from they will be deleted. You can disable this -feature if you wish to preserve the data indefinitely, but the directory will -fill up as fast as data passes through. - -## `none` - -Selecting no buffer (default) is the lowest latency option since no extra work -is done to messages that pass through. With this option back pressure from the -output will be directly applied down the pipeline. diff --git a/resources/docs/inputs/README.md b/resources/docs/inputs/README.md index 4d0454dca0..93fab97e14 100644 --- a/resources/docs/inputs/README.md +++ b/resources/docs/inputs/README.md @@ -1,7 +1,222 @@ -Inputs +INPUTS ====== -Benthos has many configurable inputs, and there are more constantly being added. -For a full list of available inputs [check out this generated document][0]. +This document has been generated with `benthos --list-inputs`. -[0]: ./list.md +## `amazon_s3` + +Downloads objects in an Amazon S3 bucket. If an SQS queue has been configured +then only object keys read from the queue will be downloaded. Otherwise, the +entire list of objects found when this input is created will be downloaded. + +Here is a guide for setting up an SQS queue that receives events for new S3 +bucket objects: + +https://docs.aws.amazon.com/AmazonS3/latest/dev/ways-to-add-notification-config-to-bucket.html + +## `amazon_sqs` + +Receive messages from an Amazon SQS URL, only the body is extracted into +messages. + +## `amqp` + +AMQP (0.91) is the underlying messaging protocol that is used by various message +brokers, including RabbitMQ. + +Exchange type options are: direct|fanout|topic|x-custom + +## `fan_in` + +The fan in type allows you to combine multiple inputs. Each input will be read +in parallel. In order to configure a fan in type you simply add an array of +input configuration objects into the inputs field. + +Adding more input types allows you to merge streams from multiple sources into +one. For example, having both a ZMQ4 PULL socket and a Nanomsg PULL socket: + +``` yaml +type: fan_in +fan_in: + inputs: + - + type: scalability_protocols + scalability_protocols: + address: tcp://nanoserver:3003 + bind_address: false + socket_type: PULL + - + type: zmq4 + zmq4: + addresses: + - tcp://zmqserver:3004 + socket_type: PULL +``` + +Sometimes you will want several inputs of very similar configuration. You can +use the special type ditto in this case to duplicate the previous config and +apply selective changes. 
+ +For example, if combining two kafka inputs with the same set up, reading +different partitions you can use this shortcut: + +``` yaml +inputs: +- + type: kafka + kafka: + addresses: + - localhost:9092 + client_id: benthos_kafka_input + consumer_group: benthos_consumer_group + topic: benthos_stream + partition: 0 +- + type: ditto + kafka: + partition: 1 +``` + +Which will result in two inputs targeting the same kafka brokers, on the same +consumer group etc, but consuming their own partitions. Ditto can also be +specified with a multiplier, which is useful if you want multiple inputs that do +not differ in config, like this: + +``` yaml +inputs: +- + type: kafka_balanced + kafka: + addresses: + - localhost:9092 + client_id: benthos_kafka_input + consumer_group: benthos_consumer_group + topic: benthos_stream +- + type: ditto_3 +``` + +Which results in a total of four kafka_balanced inputs. Note that ditto_0 will +result in no duplicate configs, this might be useful if the config is generated +and there's a chance you won't want any duplicates. + +## `file` + +The file type reads input from a file. If multipart is set to false each line +is read as a separate message. If multipart is set to true each line is read as +a message part, and an empty line indicates the end of a message. + +Alternatively, a custom delimiter can be set that is used instead of line +breaks. + +## `http_client` + +The HTTP client input type connects to a server and continuously performs +requests for a single message. + +You should set a sensible number of max retries and retry delays so as to not +stress your target server. + +## Streaming + +If you enable streaming with the 'stream' field then benthos will consume the +body of the response using the same rules as the 'stdin' and 'file' input types. + +For more information about sending HTTP messages, including details on sending +multipart, please read the 'docs/using_http.md' document. + +## `http_server` + +Receive messages POSTed over HTTP(S). HTTP 2.0 is supported when using TLS, +which is enabled when key and cert files are specified. + +You can leave the 'address' config field blank in order to use the default +service, but this will ignore TLS options. + +## `kafka` + +Connects to a kafka (0.8+) server. Offsets are managed within kafka as per the +consumer group (set via config). Only one partition per input is supported, if +you wish to balance partitions across a consumer group look at the +'kafka_balanced' input type instead. + +## `kafka_balanced` + +Connects to a kafka (0.9+) server. Offsets are managed within kafka as per the +consumer group (set via config), and partitions are automatically balanced +across any members of the consumer group. + +## `nats` + +Subscribe to a NATS subject. NATS is at-most-once, if you need at-least-once +behaviour then look at NATS Stream. + +The urls can contain username/password semantics. e.g. +nats://derek:pass@localhost:4222 + +## `nats_stream` + +Subscribe to a NATS Stream subject, which is at-least-once. Joining a queue is +optional and allows multiple clients of a subject to consume using queue +semantics. + +Tracking and persisting offsets through a durable name is also optional and +works with or without a queue. If a durable name is not provided then subjects +are consumed from the most recently published message. + +## `nsq` + +Subscribe to an NSQ instance topic and channel. + +## `redis_list` + +Pops messages from the beginning of a Redis list using the BLPop command. 
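As a quick sketch of the `redis_list` input described above, using the `url` and `key` fields that appear in the environment config earlier in this patch; the address shown is only an example and should be adjusted for your deployment.

``` yaml
input:
  type: redis_list
  redis_list:
    url: tcp://localhost:6379 # example address
    key: benthos_list
```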
+ +## `redis_pubsub` + +Redis supports a publish/subscribe model, it's possible to subscribe to multiple +channels using this input. + +## `scalability_protocols` + +The scalability protocols are common communication patterns which will be +familiar to anyone accustomed to service messaging protocols. + +This input type should be compatible with any implementation of these protocols, +but nanomsg (http://nanomsg.org/index.html) is the specific target of this type. + +Since scale proto messages are only single part we would need a binary format +for interpretting multi part messages. If the input is receiving messages from a +benthos output you can set both to use the benthos binary multipart format with +the 'benthos_multi' flag. Note, however, that this format may appear to be +gibberish to other services, and the input will be unable to read normal +messages with this setting. + +Currently only PULL, SUB, and REP sockets are supported. + +When using REP sockets Benthos will respond to each request with a success or +error message. The content of these messages are set with the 'reply_success' +and 'reply_error' config options respectively. The 'reply_timeout_ms' option +decides how long Benthos will wait before giving up on the reply, which can +result in duplicate messages when triggered. + +## `stdin` + +The stdin input simply reads any data piped to stdin as messages. By default the +messages are assumed single part and are line delimited. If the multipart option +is set to true then lines are interpretted as message parts, and an empty line +indicates the end of the message. + +Alternatively, a custom delimiter can be set that is used instead of line +breaks. + +## `zmq4` + +ZMQ4 is supported but currently depends on C bindings. Since this is an +annoyance when building or using Benthos it is not compiled by default. + +Build it into your project by getting CZMQ installed on your machine, then build +with the tag: 'go install -tags "ZMQ4" github.com/Jeffail/benthos/cmd/...' + +ZMQ4 input supports PULL and SUB sockets only. If there is demand for other +socket types then they can be added easily. diff --git a/resources/docs/inputs/list.md b/resources/docs/inputs/list.md deleted file mode 100644 index d19daeb3f7..0000000000 --- a/resources/docs/inputs/list.md +++ /dev/null @@ -1,217 +0,0 @@ -INPUTS -====== - -This document has been generated with `benthos --list-inputs`. - -## `amazon_s3` - -Downloads objects in an Amazon S3 bucket. If an SQS queue has been configured -then only object keys read from the queue will be downloaded. Otherwise, the -entire list of objects found when this input is created will be downloaded. - -Here is a guide for setting up an SQS queue that receives events for new S3 -bucket objects: - -https://docs.aws.amazon.com/AmazonS3/latest/dev/ways-to-add-notification-config-to-bucket.html - -## `amqp` - -AMQP (0.91) is the underlying messaging protocol that is used by various message -brokers, including RabbitMQ. - -Exchange type options are: direct|fanout|topic|x-custom - -## `fan_in` - -The fan in type allows you to combine multiple inputs. Each input will be read -in parallel. In order to configure a fan in type you simply add an array of -input configuration objects into the inputs field. - -Adding more input types allows you to merge streams from multiple sources into -one. 
For example, having both a ZMQ4 PULL socket and a Nanomsg PULL socket: - -``` yaml -type: fan_in -fan_in: - inputs: - - - type: scalability_protocols - scalability_protocols: - address: tcp://nanoserver:3003 - bind_address: false - socket_type: PULL - - - type: zmq4 - zmq4: - addresses: - - tcp://zmqserver:3004 - socket_type: PULL -``` - -Sometimes you will want several inputs of very similar configuration. You can -use the special type ditto in this case to duplicate the previous config and -apply selective changes. - -For example, if combining two kafka inputs with the same set up, reading -different partitions you can use this shortcut: - -``` yaml -inputs: -- - type: kafka - kafka: - addresses: - - localhost:9092 - client_id: benthos_kafka_input - consumer_group: benthos_consumer_group - topic: benthos_stream - partition: 0 -- - type: ditto - kafka: - partition: 1 -``` - -Which will result in two inputs targeting the same kafka brokers, on the same -consumer group etc, but consuming their own partitions. Ditto can also be -specified with a multiplier, which is useful if you want multiple inputs that do -not differ in config, like this: - -``` yaml -inputs: -- - type: kafka_balanced - kafka: - addresses: - - localhost:9092 - client_id: benthos_kafka_input - consumer_group: benthos_consumer_group - topic: benthos_stream -- - type: ditto_3 -``` - -Which results in a total of four kafka_balanced inputs. Note that ditto_0 will -result in no duplicate configs, this might be useful if the config is generated -and there's a chance you won't want any duplicates. - -## `file` - -The file type reads input from a file. If multipart is set to false each line -is read as a separate message. If multipart is set to true each line is read as -a message part, and an empty line indicates the end of a message. - -Alternatively, a custom delimiter can be set that is used instead of line -breaks. - -## `http_client` - -The HTTP client input type connects to a server and continuously performs -requests for a single message. - -You should set a sensible number of max retries and retry delays so as to not -stress your target server. - -## Streaming - -If you enable streaming with the 'stream' field then benthos will consume the -body of the response using the same rules as the 'stdin' and 'file' input types. - -For more information about sending HTTP messages, including details on sending -multipart, please read the 'docs/using_http.md' document. - -## `http_server` - -Receive messages POSTed over HTTP(S). HTTP 2.0 is supported when using TLS, -which is enabled when key and cert files are specified. - -You can leave the 'address' config field blank in order to use the default -service, but this will ignore TLS options. - -## `kafka` - -Connects to a kafka (0.8+) server. Offsets are managed within kafka as per the -consumer group (set via config). Only one partition per input is supported, if -you wish to balance partitions across a consumer group look at the -'kafka_balanced' input type instead. - -## `kafka_balanced` - -Connects to a kafka (0.9+) server. Offsets are managed within kafka as per the -consumer group (set via config), and partitions are automatically balanced -across any members of the consumer group. - -## `nats` - -Subscribe to a NATS subject. NATS is at-most-once, if you need at-least-once -behaviour then look at NATS Stream. - -The urls can contain username/password semantics. e.g. 
-nats://derek:pass@localhost:4222 - -## `nats_stream` - -Subscribe to a NATS Stream subject, which is at-least-once. Joining a queue is -optional and allows multiple clients of a subject to consume using queue -semantics. - -Tracking and persisting offsets through a durable name is also optional and -works with or without a queue. If a durable name is not provided then subjects -are consumed from the most recently published message. - -## `nsq` - -Subscribe to an NSQ instance topic and channel. - -## `redis_list` - -Pops messages from the beginning of a Redis list using the BLPop command. - -## `redis_pubsub` - -Redis supports a publish/subscribe model, it's possible to subscribe to multiple -channels using this input. - -## `scalability_protocols` - -The scalability protocols are common communication patterns which will be -familiar to anyone accustomed to service messaging protocols. - -This input type should be compatible with any implementation of these protocols, -but nanomsg (http://nanomsg.org/index.html) is the specific target of this type. - -Since scale proto messages are only single part we would need a binary format -for interpretting multi part messages. If the input is receiving messages from a -benthos output you can set both to use the benthos binary multipart format with -the 'benthos_multi' flag. Note, however, that this format may appear to be -gibberish to other services, and the input will be unable to read normal -messages with this setting. - -Currently only PULL, SUB, and REP sockets are supported. - -When using REP sockets Benthos will respond to each request with a success or -error message. The content of these messages are set with the 'reply_success' -and 'reply_error' config options respectively. The 'reply_timeout_ms' option -decides how long Benthos will wait before giving up on the reply, which can -result in duplicate messages when triggered. - -## `stdin` - -The stdin input simply reads any data piped to stdin as messages. By default the -messages are assumed single part and are line delimited. If the multipart option -is set to true then lines are interpretted as message parts, and an empty line -indicates the end of the message. - -Alternatively, a custom delimiter can be set that is used instead of line -breaks. - -## `zmq4` - -ZMQ4 is supported but currently depends on C bindings. Since this is an -annoyance when building or using Benthos it is not compiled by default. - -Build it into your project by getting CZMQ installed on your machine, then build -with the tag: 'go install -tags "ZMQ4" github.com/Jeffail/benthos/cmd/...' - -ZMQ4 input supports PULL and SUB sockets only. If there is demand for other -socket types then they can be added easily. diff --git a/resources/docs/outputs/README.md b/resources/docs/outputs/README.md index 114878dc1e..c5c17da161 100644 --- a/resources/docs/outputs/README.md +++ b/resources/docs/outputs/README.md @@ -1,7 +1,149 @@ -Outputs +OUTPUTS ======= -Benthos has many configurable outputs, and there are more constantly being -added. For a full list of outputs [check out this generated document][0]. +This document has been generated with `benthos --list-outputs`. -[0]: ./list.md +## `amazon_s3` + +Sends message parts as objects to an Amazon S3 bucket. Each object is uploaded +with the path specified with the 'path' field, in order to have a different path +for each object you should use function interpolations described +[here](../config_interpolation.md#functions). 
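To make the path interpolation concrete, here is a minimal sketch of the new `amazon_s3` output using the default interpolated `path` from this patch; the bucket name is a placeholder.

``` yaml
output:
  type: amazon_s3
  amazon_s3:
    region: eu-west-1
    bucket: my-example-bucket # placeholder bucket name
    path: ${!count:files}-${!timestamp_unix_nano}.txt
    credentials:
      id: ""
      secret: ""
      token: ""
    timeout_s: 5
```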
+ +## `amqp` + +AMQP (0.91) is the underlying messaging protocol that is used by various message +brokers, including RabbitMQ. + +## `fan_out` + +The fan out output type allows you to configure multiple output targets. With +the fan out model all outputs will be sent every message that passes through +benthos. + +This process is blocking, meaning if any output applies backpressure then it +will block all outputs from receiving messages. If an output fails to guarantee +receipt of a message it will be tried again until success. + +If Benthos is stopped during a fan out send it is possible that when started +again it will send a duplicate message to some outputs. + +## `file` + +The file output type simply appends all messages to an output file. Single part +messages are printed with a line separator '\n'. Multipart messages are written +with each part line separated, with the final part followed by two line +separators, e.g. a multipart message [ "foo", "bar", "baz" ] would be written +as: + +foo\n +bar\n +baz\n\n + +You can alternatively specify a custom delimiter that will follow the same rules +as '\n' above. + +## `http_client` + +The HTTP client output type connects to a server and sends POST requests for +each message. The body of the request is the raw message contents. The output +will apply back pressure until a 2XX response has been returned from the server. + +For more information about sending HTTP messages, including details on sending +multipart, please read the 'docs/using_http.md' document. + +## `http_server` + +Sets up an HTTP server that will send messages over HTTP(S) GET requests. HTTP +2.0 is supported when using TLS, which is enabled when key and cert files are +specified. + +You can leave the 'address' config field blank in order to use the default +service, but this will ignore TLS options. + +You can receive a single, discrete message on the configured 'path' endpoint, or +receive a constant stream of line delimited messages on the configured +'stream_path' endpoint. + +## `kafka` + +The kafka output type writes messages to a kafka broker, these messages are +acknowledged, which is propagated back to the input. The config field +'ack_replicas' determines whether we wait for acknowledgement from all replicas +or just a single broker. + +## `nats` + +Publish to an NATS subject. NATS is at-most-once, so delivery is not guaranteed. +For at-least-once behaviour with NATS look at NATS Stream. + +## `nats_stream` + +Publish to a NATS Stream subject. NATS Streaming is at-least-once and therefore +this output is able to guarantee delivery on success. + +## `nsq` + +Publish to an NSQ topic. + +## `redis_list` + +Pushes messages onto the end of a Redis list (which is created if it doesn't +already exist) using the RPUSH command. + +## `redis_pubsub` + +Publishes messages through the Redis PubSub model. It is not possible to +guarantee that messages have been received. + +## `round_robin` + +The round robin output type allows you to send messages across multiple outputs, +where each message is sent to exactly one output following a strict order. + +If an output applies back pressure this will also block other outputs from +receiving content. + +## `scalability_protocols` + +The scalability protocols are common communication patterns which will be +familiar to anyone accustomed to service messaging protocols. + +This outnput type should be compatible with any implementation of these +protocols, but nanomsg (http://nanomsg.org/index.html) is the specific target of +this type. 
+ +Since scale proto messages are only single part we would need a binary format +for sending multi part messages. We can use the benthos binary format for this +purpose. However, this format may appear to be gibberish to other services. If +you want to use the binary format you can set 'benthos_multi' to true. + +Currently only PUSH, PUB and REQ sockets are supported. + +When using REQ sockets Benthos will expect acknowledgement from the consumer +that the message has been successfully propagated downstream. This comes in the +form of an expected response which is set by the 'reply_success' configuration +field. + +If the reply from a REQ message is either not returned within the +'reply_timeout_ms' period, or if the reply does not match our 'reply_success' +string, then the message is considered lost and will be sent again. + +## `stdout` + +The stdout output type prints messages to stdout. Single part messages are +printed with a line separator '\n'. Multipart messages are written with each +part line separated, with the final part followed by two line separators, e.g. +a multipart message [ "foo", "bar", "baz" ] would be written as: + +foo\n +bar\n +baz\n\n + +You can alternatively specify a custom delimiter that will follow the same rules +as '\n' above. + +## `zmq4` + +The zmq4 output type attempts to send messages to a ZMQ4 port, currently only +PUSH and PUB sockets are supported. diff --git a/resources/docs/outputs/list.md b/resources/docs/outputs/list.md deleted file mode 100644 index c8c31d2b45..0000000000 --- a/resources/docs/outputs/list.md +++ /dev/null @@ -1,142 +0,0 @@ -OUTPUTS -======= - -This document has been generated with `benthos --list-outputs`. - -## `amqp` - -AMQP (0.91) is the underlying messaging protocol that is used by various message -brokers, including RabbitMQ. - -## `fan_out` - -The fan out output type allows you to configure multiple output targets. With -the fan out model all outputs will be sent every message that passes through -benthos. - -This process is blocking, meaning if any output applies backpressure then it -will block all outputs from receiving messages. If an output fails to guarantee -receipt of a message it will be tried again until success. - -If Benthos is stopped during a fan out send it is possible that when started -again it will send a duplicate message to some outputs. - -## `file` - -The file output type simply appends all messages to an output file. Single part -messages are printed with a line separator '\n'. Multipart messages are written -with each part line separated, with the final part followed by two line -separators, e.g. a multipart message [ "foo", "bar", "baz" ] would be written -as: - -foo\n -bar\n -baz\n\n - -You can alternatively specify a custom delimiter that will follow the same rules -as '\n' above. - -## `http_client` - -The HTTP client output type connects to a server and sends POST requests for -each message. The body of the request is the raw message contents. The output -will apply back pressure until a 2XX response has been returned from the server. - -For more information about sending HTTP messages, including details on sending -multipart, please read the 'docs/using_http.md' document. - -## `http_server` - -Sets up an HTTP server that will send messages over HTTP(S) GET requests. HTTP -2.0 is supported when using TLS, which is enabled when key and cert files are -specified. - -You can leave the 'address' config field blank in order to use the default -service, but this will ignore TLS options. 
- -You can receive a single, discrete message on the configured 'path' endpoint, or -receive a constant stream of line delimited messages on the configured -'stream_path' endpoint. - -## `kafka` - -The kafka output type writes messages to a kafka broker, these messages are -acknowledged, which is propagated back to the input. The config field -'ack_replicas' determines whether we wait for acknowledgement from all replicas -or just a single broker. - -## `nats` - -Publish to an NATS subject. NATS is at-most-once, so delivery is not guaranteed. -For at-least-once behaviour with NATS look at NATS Stream. - -## `nats_stream` - -Publish to a NATS Stream subject. NATS Streaming is at-least-once and therefore -this output is able to guarantee delivery on success. - -## `nsq` - -Publish to an NSQ topic. - -## `redis_list` - -Pushes messages onto the end of a Redis list (which is created if it doesn't -already exist) using the RPUSH command. - -## `redis_pubsub` - -Publishes messages through the Redis PubSub model. It is not possible to -guarantee that messages have been received. - -## `round_robin` - -The round robin output type allows you to send messages across multiple outputs, -where each message is sent to exactly one output following a strict order. - -If an output applies back pressure this will also block other outputs from -receiving content. - -## `scalability_protocols` - -The scalability protocols are common communication patterns which will be -familiar to anyone accustomed to service messaging protocols. - -This outnput type should be compatible with any implementation of these -protocols, but nanomsg (http://nanomsg.org/index.html) is the specific target of -this type. - -Since scale proto messages are only single part we would need a binary format -for sending multi part messages. We can use the benthos binary format for this -purpose. However, this format may appear to be gibberish to other services. If -you want to use the binary format you can set 'benthos_multi' to true. - -Currently only PUSH, PUB and REQ sockets are supported. - -When using REQ sockets Benthos will expect acknowledgement from the consumer -that the message has been successfully propagated downstream. This comes in the -form of an expected response which is set by the 'reply_success' configuration -field. - -If the reply from a REQ message is either not returned within the -'reply_timeout_ms' period, or if the reply does not match our 'reply_success' -string, then the message is considered lost and will be sent again. - -## `stdout` - -The stdout output type prints messages to stdout. Single part messages are -printed with a line separator '\n'. Multipart messages are written with each -part line separated, with the final part followed by two line separators, e.g. -a multipart message [ "foo", "bar", "baz" ] would be written as: - -foo\n -bar\n -baz\n\n - -You can alternatively specify a custom delimiter that will follow the same rules -as '\n' above. - -## `zmq4` - -The zmq4 output type attempts to send messages to a ZMQ4 port, currently only -PUSH and PUB sockets are supported. diff --git a/resources/docs/processors/list.md b/resources/docs/processors/list.md index 0ee72dc40e..682313053c 100644 --- a/resources/docs/processors/list.md +++ b/resources/docs/processors/list.md @@ -75,7 +75,7 @@ inserted before the last element, and so on. If the negative index is greater than the length of the existing parts it will be inserted at the beginning. 
This processor will interpolate functions within the 'content' field, you can -find a list of functions [here](../config_interpolation.md). +find a list of functions [here](../config_interpolation.md#functions). ## `multi_to_blob` @@ -143,7 +143,7 @@ will be the last part of the message, if part = -2 then the part before the last element with be selected, and so on. This processor will interpolate functions within the 'value' field, you can find -a list of functions [here](../config_interpolation.md). +a list of functions [here](../config_interpolation.md#functions). ## `unarchive`