diff --git a/sender/config/models.go b/sender/config/models.go index 52e28dbd4..1bae049af 100644 --- a/sender/config/models.go +++ b/sender/config/models.go @@ -130,6 +130,7 @@ const ( ElasticVersion5 = "5.x" // ElasticVersion6 v6.x ElasticVersion6 = "6.x" + ElasticVersion7 = "7.x" //timeZone KeylocalTimezone = "Local" @@ -208,11 +209,11 @@ const ( KeyKafkaCompressionSnappy = "snappy" KeyKafkaCompressionLZ4 = "lz4" - KeyKafkaHost = "kafka_host" //主机地址,可以有多个 - KeyKafkaTopic = "kafka_topic" //topic 1.填一个值,则topic为所填值 2.天两个值: %{[字段名]}, defaultTopic :根据每条event,以指定字段值为topic,若无,则用默认值 - KeyKafkaClientId = "kafka_client_id" //客户端ID - KeySaslUsername = "kafka_sasl_username" //SASL用户名 - KeySaslPassword = "kafka_sasl_password" //SASL密码 + KeyKafkaHost = "kafka_host" //主机地址,可以有多个 + KeyKafkaTopic = "kafka_topic" //topic 1.填一个值,则topic为所填值 2.天两个值: %{[字段名]}, defaultTopic :根据每条event,以指定字段值为topic,若无,则用默认值 + KeyKafkaClientId = "kafka_client_id" //客户端ID + KeySaslUsername = "kafka_sasl_username" //SASL用户名 + KeySaslPassword = "kafka_sasl_password" //SASL密码 //KeyKafkaFlushNum = "kafka_flush_num" //缓冲条数 //KeyKafkaFlushFrequency = "kafka_flush_frequency" //缓冲频率 KeyKafkaRetryMax = "kafka_retry_max" //最大重试次数 diff --git a/sender/elasticsearch/elasticsearch.go b/sender/elasticsearch/elasticsearch.go index ce7a9aa81..1eb097539 100644 --- a/sender/elasticsearch/elasticsearch.go +++ b/sender/elasticsearch/elasticsearch.go @@ -8,10 +8,11 @@ import ( "strings" "time" - "github.com/json-iterator/go" + jsoniter "github.com/json-iterator/go" elasticV6 "github.com/olivere/elastic" elasticV3 "gopkg.in/olivere/elastic.v3" elasticV5 "gopkg.in/olivere/elastic.v5" + elasticV7 "gopkg.in/olivere/elastic.v7" "github.com/qiniu/log" "github.com/qiniu/pandora-go-sdk/base/reqerr" @@ -36,6 +37,7 @@ type Sender struct { elasticV3Client *elasticV3.Client elasticV5Client *elasticV5.Client elasticV6Client *elasticV6.Client + elasticV7Client *elasticV7.Client aliasFields map[string]string @@ -93,7 +95,25 @@ func NewSender(conf conf.MapConf) (elasticSender sender.Sender, err error) { var elasticV3Client *elasticV3.Client var elasticV5Client *elasticV5.Client var elasticV6Client *elasticV6.Client + var elasticV7Client *elasticV7.Client + switch eVersion { + case ElasticVersion7: + optFns := []elasticV7.ClientOptionFunc{ + elasticV7.SetSniff(false), + elasticV7.SetHealthcheck(false), + elasticV7.SetURL(host...), + elasticV7.SetGzip(enableGzip), + } + + if len(authUsername) > 0 && len(authPassword) > 0 { + optFns = append(optFns, elasticV7.SetBasicAuth(authUsername, authPassword)) + } + + elasticV7Client, err = elasticV7.NewClient(optFns...) + if err != nil { + return nil, err + } case ElasticVersion6: optFns := []elasticV6.ClientOptionFunc{ elasticV6.SetSniff(false), @@ -156,6 +176,7 @@ func NewSender(conf conf.MapConf) (elasticSender sender.Sender, err error) { elasticV3Client: elasticV3Client, elasticV5Client: elasticV5Client, elasticV6Client: elasticV6Client, + elasticV7Client: elasticV7Client, eType: eType, aliasFields: fields, intervalIndex: i,