Skip to content
This repository has been archived by the owner on Jun 21, 2022. It is now read-only.

PMM-3387 PMM-5464 Custom base Prometheus file, update code copied from Prometheus to version 2.16.0 #339

Merged
merged 17 commits into from
Feb 25, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ const (
http1Addr = "127.0.0.1:7772"
debugAddr = "127.0.0.1:7773"

defaultAlertManagerFile = "/srv/prometheus/rules/pmm.rules.yml"
defaultAlertManagerFile = "/srv/prometheus/rules/pmm.rules.yml"
prometheusBaseConfigFile = "/srv/prometheus/prometheus.base.yml"
)

func addLogsHandler(mux *http.ServeMux, logs *supervisord.Logs) {
Expand Down Expand Up @@ -484,7 +485,7 @@ func main() {
prom.MustRegister(reformL)
db := reform.NewDB(sqlDB, postgresql.Dialect, reformL)

prometheus, err := prometheus.NewService(*prometheusConfigF, *promtoolF, db, *prometheusURLF)
prometheus, err := prometheus.NewService(*prometheusConfigF, prometheusBaseConfigFile, *promtoolF, db, *prometheusURLF)
if err != nil {
l.Panicf("Prometheus service problem: %+v", err)
}
Expand Down
241 changes: 197 additions & 44 deletions services/prometheus/internal/common/config/http_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

// Original file: https://github.com/prometheus/common/blob/v0.2.0/config/http_config.go
// Original file: https://github.com/prometheus/common/blob/v0.9.1/config/http_config.go
artemgavrilov marked this conversation as resolved.
Show resolved Hide resolved

// Copyright 2016 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
Expand All @@ -32,19 +32,26 @@
package config

import (
"bytes"
"crypto/md5"
"crypto/tls"
"crypto/x509"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"strings"
"sync"
"time"

"github.com/mwitkow/go-conntrack"
"gopkg.in/yaml.v2"
)

type closeIdler interface {
CloseIdleConnections()
}

// BasicAuth contains basic HTTP authentication credentials.
type BasicAuth struct {
Username string `yaml:"username"`
Expand Down Expand Up @@ -131,8 +138,8 @@ func newClient(rt http.RoundTripper) *http.Client {

// NewClientFromConfig returns a new HTTP client configured for the
// given config.HTTPClientConfig. The name is used as go-conntrack metric label.
func NewClientFromConfig(cfg HTTPClientConfig, name string) (*http.Client, error) {
rt, err := NewRoundTripperFromConfig(cfg, name)
func NewClientFromConfig(cfg HTTPClientConfig, name string, disableKeepAlives bool) (*http.Client, error) {
rt, err := NewRoundTripperFromConfig(cfg, name, disableKeepAlives)
if err != nil {
return nil, err
}
Expand All @@ -141,43 +148,54 @@ func NewClientFromConfig(cfg HTTPClientConfig, name string) (*http.Client, error

// NewRoundTripperFromConfig returns a new HTTP RoundTripper configured for the
// given config.HTTPClientConfig. The name is used as go-conntrack metric label.
func NewRoundTripperFromConfig(cfg HTTPClientConfig, name string) (http.RoundTripper, error) {
func NewRoundTripperFromConfig(cfg HTTPClientConfig, name string, disableKeepAlives bool) (http.RoundTripper, error) {
newRT := func(tlsConfig *tls.Config) (http.RoundTripper, error) {
// The only timeout we care about is the configured scrape timeout.
// It is applied on request. So we leave out any timings here.
var rt http.RoundTripper = &http.Transport{
Proxy: http.ProxyURL(cfg.ProxyURL.URL),
MaxIdleConns: 20000,
MaxIdleConnsPerHost: 1000, // see https://github.com/golang/go/issues/13801
DisableKeepAlives: disableKeepAlives,
TLSClientConfig: tlsConfig,
DisableCompression: true,
// 5 minutes is typically above the maximum sane scrape interval. So we can
// use keepalive for all configurations.
IdleConnTimeout: 5 * time.Minute,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
DialContext: conntrack.NewDialContextFunc(
conntrack.DialWithTracing(),
conntrack.DialWithName(name),
),
}

// If a bearer token is provided, create a round tripper that will set the
// Authorization header correctly on each request.
if len(cfg.BearerToken) > 0 {
rt = NewBearerAuthRoundTripper(cfg.BearerToken, rt)
} else if len(cfg.BearerTokenFile) > 0 {
rt = NewBearerAuthFileRoundTripper(cfg.BearerTokenFile, rt)
}

if cfg.BasicAuth != nil {
rt = NewBasicAuthRoundTripper(cfg.BasicAuth.Username, cfg.BasicAuth.Password, cfg.BasicAuth.PasswordFile, rt)
}
// Return a new configured RoundTripper.
return rt, nil
}

tlsConfig, err := NewTLSConfig(&cfg.TLSConfig)
if err != nil {
return nil, err
}
// The only timeout we care about is the configured scrape timeout.
// It is applied on request. So we leave out any timings here.
var rt http.RoundTripper = &http.Transport{
Proxy: http.ProxyURL(cfg.ProxyURL.URL),
MaxIdleConns: 20000,
MaxIdleConnsPerHost: 1000, // see https://github.com/golang/go/issues/13801
DisableKeepAlives: false,
TLSClientConfig: tlsConfig,
DisableCompression: true,
// 5 minutes is typically above the maximum sane scrape interval. So we can
// use keepalive for all configurations.
IdleConnTimeout: 5 * time.Minute,
DialContext: conntrack.NewDialContextFunc(
conntrack.DialWithTracing(),
conntrack.DialWithName(name),
),
}

// If a bearer token is provided, create a round tripper that will set the
// Authorization header correctly on each request.
if len(cfg.BearerToken) > 0 {
rt = NewBearerAuthRoundTripper(cfg.BearerToken, rt)
} else if len(cfg.BearerTokenFile) > 0 {
rt = NewBearerAuthFileRoundTripper(cfg.BearerTokenFile, rt)
}

if cfg.BasicAuth != nil {
rt = NewBasicAuthRoundTripper(cfg.BasicAuth.Username, cfg.BasicAuth.Password, cfg.BasicAuth.PasswordFile, rt)
if len(cfg.TLSConfig.CAFile) == 0 {
// No need for a RoundTripper that reloads the CA file automatically.
return newRT(tlsConfig)
}

// Return a new configured RoundTripper.
return rt, nil
return newTLSRoundTripper(tlsConfig, cfg.TLSConfig.CAFile, newRT)
}

type bearerAuthRoundTripper struct {
Expand All @@ -199,6 +217,12 @@ func (rt *bearerAuthRoundTripper) RoundTrip(req *http.Request) (*http.Response,
return rt.rt.RoundTrip(req)
}

func (rt *bearerAuthRoundTripper) CloseIdleConnections() {
if ci, ok := rt.rt.(closeIdler); ok {
ci.CloseIdleConnections()
}
}

type bearerAuthFileRoundTripper struct {
bearerFile string
rt http.RoundTripper
Expand All @@ -225,6 +249,12 @@ func (rt *bearerAuthFileRoundTripper) RoundTrip(req *http.Request) (*http.Respon
return rt.rt.RoundTrip(req)
}

func (rt *bearerAuthFileRoundTripper) CloseIdleConnections() {
if ci, ok := rt.rt.(closeIdler); ok {
ci.CloseIdleConnections()
}
}

type basicAuthRoundTripper struct {
username string
password Secret
Expand Down Expand Up @@ -255,6 +285,12 @@ func (rt *basicAuthRoundTripper) RoundTrip(req *http.Request) (*http.Response, e
return rt.rt.RoundTrip(req)
}

func (rt *basicAuthRoundTripper) CloseIdleConnections() {
if ci, ok := rt.rt.(closeIdler); ok {
ci.CloseIdleConnections()
}
}

// cloneRequest returns a clone of the provided *http.Request.
// The clone is a shallow copy of the struct and its Header map.
func cloneRequest(r *http.Request) *http.Request {
Expand All @@ -276,14 +312,13 @@ func NewTLSConfig(cfg *TLSConfig) (*tls.Config, error) {
// If a CA cert is provided then let's read it in so we can validate the
// scrape target's certificate properly.
if len(cfg.CAFile) > 0 {
caCertPool := x509.NewCertPool()
// Load CA cert.
caCert, err := ioutil.ReadFile(cfg.CAFile)
b, err := readCAFile(cfg.CAFile)
if err != nil {
return nil, fmt.Errorf("unable to use specified CA cert %s: %s", cfg.CAFile, err)
return nil, err
}
if !updateRootCA(tlsConfig, b) {
return nil, fmt.Errorf("unable to use specified CA cert %s", cfg.CAFile)
}
caCertPool.AppendCertsFromPEM(caCert)
tlsConfig.RootCAs = caCertPool
}

if len(cfg.ServerName) > 0 {
Expand All @@ -295,13 +330,12 @@ func NewTLSConfig(cfg *TLSConfig) (*tls.Config, error) {
} else if len(cfg.KeyFile) > 0 && len(cfg.CertFile) == 0 {
return nil, fmt.Errorf("client key file %q specified without client cert file", cfg.KeyFile)
} else if len(cfg.CertFile) > 0 && len(cfg.KeyFile) > 0 {
cert, err := tls.LoadX509KeyPair(cfg.CertFile, cfg.KeyFile)
if err != nil {
return nil, fmt.Errorf("unable to use specified client cert (%s) & key (%s): %s", cfg.CertFile, cfg.KeyFile, err)
// Verify that client cert and key are valid.
if _, err := cfg.getClientCertificate(nil); err != nil {
return nil, err
}
tlsConfig.Certificates = []tls.Certificate{cert}
tlsConfig.GetClientCertificate = cfg.getClientCertificate
}
tlsConfig.BuildNameToCertificate()

return tlsConfig, nil
}
Expand All @@ -326,6 +360,125 @@ func (c *TLSConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
return unmarshal((*plain)(c))
}

// getClientCertificate reads the pair of client cert and key from disk and returns a tls.Certificate.
func (c *TLSConfig) getClientCertificate(*tls.CertificateRequestInfo) (*tls.Certificate, error) {
cert, err := tls.LoadX509KeyPair(c.CertFile, c.KeyFile)
if err != nil {
return nil, fmt.Errorf("unable to use specified client cert (%s) & key (%s): %s", c.CertFile, c.KeyFile, err)
}
return &cert, nil
}

// readCAFile reads the CA cert file from disk.
func readCAFile(f string) ([]byte, error) {
data, err := ioutil.ReadFile(f)
if err != nil {
return nil, fmt.Errorf("unable to load specified CA cert %s: %s", f, err)
}
return data, nil
}

// updateRootCA parses the given byte slice as a series of PEM encoded certificates and updates tls.Config.RootCAs.
func updateRootCA(cfg *tls.Config, b []byte) bool {
caCertPool := x509.NewCertPool()
if !caCertPool.AppendCertsFromPEM(b) {
return false
}
cfg.RootCAs = caCertPool
return true
}

// tlsRoundTripper is a RoundTripper that updates automatically its TLS
// configuration whenever the content of the CA file changes.
type tlsRoundTripper struct {
caFile string
// newRT returns a new RoundTripper.
newRT func(*tls.Config) (http.RoundTripper, error)

mtx sync.RWMutex
rt http.RoundTripper
hashCAFile []byte
tlsConfig *tls.Config
}

func newTLSRoundTripper(
cfg *tls.Config,
caFile string,
newRT func(*tls.Config) (http.RoundTripper, error),
) (http.RoundTripper, error) {
t := &tlsRoundTripper{
caFile: caFile,
newRT: newRT,
tlsConfig: cfg,
}

rt, err := t.newRT(t.tlsConfig)
if err != nil {
return nil, err
}
t.rt = rt

_, t.hashCAFile, err = t.getCAWithHash()
if err != nil {
return nil, err
}

return t, nil
}

func (t *tlsRoundTripper) getCAWithHash() ([]byte, []byte, error) {
b, err := readCAFile(t.caFile)
if err != nil {
return nil, nil, err
}
h := md5.Sum(b)
return b, h[:], nil

}

// RoundTrip implements the http.RoundTrip interface.
func (t *tlsRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
b, h, err := t.getCAWithHash()
if err != nil {
return nil, err
}

t.mtx.RLock()
equal := bytes.Equal(h[:], t.hashCAFile)
rt := t.rt
t.mtx.RUnlock()
if equal {
// The CA cert hasn't changed, use the existing RoundTripper.
return rt.RoundTrip(req)
}

// Create a new RoundTripper.
tlsConfig := t.tlsConfig.Clone()
if !updateRootCA(tlsConfig, b) {
return nil, fmt.Errorf("unable to use specified CA cert %s", t.caFile)
}
rt, err = t.newRT(tlsConfig)
if err != nil {
return nil, err
}
t.CloseIdleConnections()

t.mtx.Lock()
t.rt = rt
t.hashCAFile = h[:]
t.mtx.Unlock()

return rt.RoundTrip(req)
}

func (t *tlsRoundTripper) CloseIdleConnections() {
t.mtx.RLock()
defer t.mtx.RUnlock()
if ci, ok := t.rt.(closeIdler); ok {
ci.CloseIdleConnections()
}
}

func (c HTTPClientConfig) String() string {
b, err := yaml.Marshal(c)
if err != nil {
Expand Down
Loading