Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add 添加 elasticsearch7.x sender 插件 #1159

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions sender/config/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ const (
ElasticVersion5 = "5.x"
// ElasticVersion6 v6.x
ElasticVersion6 = "6.x"
ElasticVersion7 = "7.x"

//timeZone
KeylocalTimezone = "Local"
Expand Down Expand Up @@ -208,11 +209,11 @@ const (
KeyKafkaCompressionSnappy = "snappy"
KeyKafkaCompressionLZ4 = "lz4"

KeyKafkaHost = "kafka_host" //主机地址,可以有多个
KeyKafkaTopic = "kafka_topic" //topic 1.填一个值,则topic为所填值 2.天两个值: %{[字段名]}, defaultTopic :根据每条event,以指定字段值为topic,若无,则用默认值
KeyKafkaClientId = "kafka_client_id" //客户端ID
KeySaslUsername = "kafka_sasl_username" //SASL用户名
KeySaslPassword = "kafka_sasl_password" //SASL密码
KeyKafkaHost = "kafka_host" //主机地址,可以有多个
KeyKafkaTopic = "kafka_topic" //topic 1.填一个值,则topic为所填值 2.天两个值: %{[字段名]}, defaultTopic :根据每条event,以指定字段值为topic,若无,则用默认值
KeyKafkaClientId = "kafka_client_id" //客户端ID
KeySaslUsername = "kafka_sasl_username" //SASL用户名
KeySaslPassword = "kafka_sasl_password" //SASL密码
//KeyKafkaFlushNum = "kafka_flush_num" //缓冲条数
//KeyKafkaFlushFrequency = "kafka_flush_frequency" //缓冲频率
KeyKafkaRetryMax = "kafka_retry_max" //最大重试次数
Expand Down
23 changes: 22 additions & 1 deletion sender/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@ import (
"strings"
"time"

"github.com/json-iterator/go"
jsoniter "github.com/json-iterator/go"
elasticV6 "github.com/olivere/elastic"
elasticV3 "gopkg.in/olivere/elastic.v3"
elasticV5 "gopkg.in/olivere/elastic.v5"
elasticV7 "gopkg.in/olivere/elastic.v7"

"github.com/qiniu/log"
"github.com/qiniu/pandora-go-sdk/base/reqerr"
Expand All @@ -36,6 +37,7 @@ type Sender struct {
elasticV3Client *elasticV3.Client
elasticV5Client *elasticV5.Client
elasticV6Client *elasticV6.Client
elasticV7Client *elasticV7.Client

aliasFields map[string]string

Expand Down Expand Up @@ -93,7 +95,25 @@ func NewSender(conf conf.MapConf) (elasticSender sender.Sender, err error) {
var elasticV3Client *elasticV3.Client
var elasticV5Client *elasticV5.Client
var elasticV6Client *elasticV6.Client
var elasticV7Client *elasticV7.Client

switch eVersion {
case ElasticVersion7:
optFns := []elasticV7.ClientOptionFunc{
elasticV7.SetSniff(false),
elasticV7.SetHealthcheck(false),
elasticV7.SetURL(host...),
elasticV7.SetGzip(enableGzip),
}

if len(authUsername) > 0 && len(authPassword) > 0 {
optFns = append(optFns, elasticV7.SetBasicAuth(authUsername, authPassword))
}

elasticV7Client, err = elasticV7.NewClient(optFns...)
if err != nil {
return nil, err
}
case ElasticVersion6:
optFns := []elasticV6.ClientOptionFunc{
elasticV6.SetSniff(false),
Expand Down Expand Up @@ -156,6 +176,7 @@ func NewSender(conf conf.MapConf) (elasticSender sender.Sender, err error) {
elasticV3Client: elasticV3Client,
elasticV5Client: elasticV5Client,
elasticV6Client: elasticV6Client,
elasticV7Client: elasticV7Client,
eType: eType,
aliasFields: fields,
intervalIndex: i,
Expand Down