Skip to content

Commit

Permalink
feat: Kafka es discontinues processing if eb publishing fails (#2214)
Browse files Browse the repository at this point in the history
Signed-off-by: Derek Wang <[email protected]>
  • Loading branch information
whynowy committed Sep 28, 2022
1 parent 63d4061 commit 8441b64
Show file tree
Hide file tree
Showing 102 changed files with 799 additions and 874 deletions.
4 changes: 2 additions & 2 deletions common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ limitations under the License.
package common

import (
"fmt"
"reflect"

"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
)

Expand Down Expand Up @@ -110,7 +110,7 @@ const (
)

var (
ErrNilEventSource = errors.New("event source can't be nil")
ErrNilEventSource = fmt.Errorf("event source can't be nil")
)

// Miscellaneous Labels
Expand Down
8 changes: 4 additions & 4 deletions common/leaderelection/leaderelection.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ package leaderelection
import (
"context"
"crypto/tls"
"fmt"

"github.com/fsnotify/fsnotify"
"github.com/nats-io/graft"
nats "github.com/nats-io/nats.go"
"github.com/pkg/errors"
"github.com/spf13/viper"
"go.uber.org/zap"

Expand Down Expand Up @@ -40,7 +40,7 @@ func NewEventBusElector(ctx context.Context, eventBusConfig eventbusv1alpha1.Bus
eventBusType = apicommon.EventBusJetStream
eventBusAuth = &eventbusv1alpha1.AuthStrategyBasic
default:
return nil, errors.New("invalid event bus")
return nil, fmt.Errorf("invalid event bus")
}

var auth *eventbuscommon.Auth
Expand All @@ -56,7 +56,7 @@ func NewEventBusElector(ctx context.Context, eventBusConfig eventbusv1alpha1.Bus
v.AddConfigPath(common.EventBusAuthFileMountPath)
err := v.ReadInConfig()
if err != nil {
return nil, errors.Errorf("failed to load auth.yaml. err: %+v", err)
return nil, fmt.Errorf("failed to load auth.yaml. err: %w", err)
}
err = v.Unmarshal(cred)
if err != nil {
Expand Down Expand Up @@ -91,7 +91,7 @@ func NewEventBusElector(ctx context.Context, eventBusConfig eventbusv1alpha1.Bus
auth: auth,
}
default:
return nil, errors.New("invalid eventbus type")
return nil, fmt.Errorf("invalid eventbus type")
}
return elector, nil
}
Expand Down
5 changes: 2 additions & 3 deletions common/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func Convert2WaitBackoff(backoff *apicommon.Backoff) (*wait.Backoff, error) {
return &result, nil
}

func Connect(backoff *apicommon.Backoff, conn func() error) error {
func DoWithRetry(backoff *apicommon.Backoff, f func() error) error {
if backoff == nil {
backoff = &DefaultBackoff
}
Expand All @@ -103,8 +103,7 @@ func Connect(backoff *apicommon.Backoff, conn func() error) error {
return fmt.Errorf("invalid backoff configuration, %w", err)
}
_ = wait.ExponentialBackoff(*b, func() (bool, error) {
if err = conn(); err != nil {
// return "false, err" will cover waitErr
if err = f(); err != nil {
return false, nil
}
return true, nil
Expand Down
10 changes: 5 additions & 5 deletions common/retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,13 @@ func TestRetryableKubeAPIError(t *testing.T) {
}

func TestConnect(t *testing.T) {
err := Connect(nil, func() error {
err := DoWithRetry(nil, func() error {
return fmt.Errorf("new error")
})
assert.NotNil(t, err)
assert.True(t, strings.Contains(err.Error(), "new error"))

err = Connect(nil, func() error {
err = DoWithRetry(nil, func() error {
return nil
})
assert.Nil(t, err)
Expand All @@ -58,7 +58,7 @@ func TestConnect(t *testing.T) {
func TestConnectDurationString(t *testing.T) {
start := time.Now()
count := 2
err := Connect(nil, func() error {
err := DoWithRetry(nil, func() error {
if count == 0 {
return nil
} else {
Expand All @@ -85,7 +85,7 @@ func TestConnectRetry(t *testing.T) {
}
count := 2
start := time.Now()
err := Connect(&backoff, func() error {
err := DoWithRetry(&backoff, func() error {
if count == 0 {
return nil
} else {
Expand All @@ -110,7 +110,7 @@ func TestRetryFailure(t *testing.T) {
Jitter: &jitter,
Steps: 2,
}
err := Connect(&backoff, func() error {
err := DoWithRetry(&backoff, func() error {
return fmt.Errorf("this is an error")
})
assert.NotNil(t, err)
Expand Down
17 changes: 8 additions & 9 deletions common/tls/tls.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,16 @@ import (
"crypto/x509"
"crypto/x509/pkix"
"encoding/pem"
"fmt"
"math/big"
"time"

"github.com/pkg/errors"
)

func certTemplate(org string, hosts []string, notAfter time.Time) (*x509.Certificate, error) {
serialNumberLimit := new(big.Int).Lsh(big.NewInt(1), 128)
serialNumber, err := rand.Int(rand.Reader, serialNumberLimit)
if err != nil {
return nil, errors.Wrap(err, "failed to generate serial number")
return nil, fmt.Errorf("failed to generate serial number, %w", err)
}
return &x509.Certificate{
SerialNumber: serialNumber,
Expand Down Expand Up @@ -61,17 +60,17 @@ func createCert(template, parent *x509.Certificate, pub, parentPriv interface{})
func createCA(org string, hosts []string, notAfter time.Time) (*rsa.PrivateKey, *x509.Certificate, []byte, error) {
rootKey, err := rsa.GenerateKey(rand.Reader, 2048)
if err != nil {
return nil, nil, nil, errors.Wrap(err, "failed to generate random key")
return nil, nil, nil, fmt.Errorf("failed to generate random key, %w", err)
}

rootCertTmpl, err := createCACertTemplate(org, hosts, notAfter)
if err != nil {
return nil, nil, nil, errors.Wrap(err, "failed to generate CA cert")
return nil, nil, nil, fmt.Errorf("failed to generate CA cert, %w", err)
}

rootCert, rootCertPEM, err := createCert(rootCertTmpl, rootCertTmpl, &rootKey.PublicKey, rootKey)
if err != nil {
return nil, nil, nil, errors.Wrap(err, "failed to sign CA cert")
return nil, nil, nil, fmt.Errorf("failed to sign CA cert, %w", err)
}
return rootKey, rootCert, rootCertPEM, nil
}
Expand All @@ -82,7 +81,7 @@ func createCA(org string, hosts []string, notAfter time.Time) (*rsa.PrivateKey,
// can generate for both server and client but at least one must be specified
func CreateCerts(org string, hosts []string, notAfter time.Time, server bool, client bool) (serverKey, serverCert, caCert []byte, err error) {
if !server && !client {
return nil, nil, nil, errors.Wrap(err, "CreateCerts() must specify either server or client")
return nil, nil, nil, fmt.Errorf("CreateCerts() must specify either server or client")
}

// Create a CA certificate and private key
Expand All @@ -94,7 +93,7 @@ func CreateCerts(org string, hosts []string, notAfter time.Time, server bool, cl
// Create the private key
privateKey, err := rsa.GenerateKey(rand.Reader, 2048)
if err != nil {
return nil, nil, nil, errors.Wrap(err, "failed to generate random key")
return nil, nil, nil, fmt.Errorf("failed to generate random key, %w", err)
}
var cert *x509.Certificate

Expand All @@ -113,7 +112,7 @@ func CreateCerts(org string, hosts []string, notAfter time.Time, server bool, cl
// create a certificate wrapping the public key, sign it with the CA private key
_, certPEM, err := createCert(cert, caCertificate, &privateKey.PublicKey, caKey)
if err != nil {
return nil, nil, nil, errors.Wrap(err, "failed to sign server cert")
return nil, nil, nil, fmt.Errorf("failed to sign server cert, %w", err)
}
privateKeyPEM := pem.EncodeToMemory(&pem.Block{
Type: "RSA PRIVATE KEY", Bytes: x509.MarshalPKCS1PrivateKey(privateKey),
Expand Down
8 changes: 4 additions & 4 deletions common/tls/tls_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ package tls
import (
"crypto/x509"
"encoding/pem"
"fmt"
"testing"
"time"

"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -35,14 +35,14 @@ func validCertificate(cert []byte, t *testing.T) (*x509.Certificate, error) {
const certificate = "CERTIFICATE"
caCert, _ := pem.Decode(cert)
if caCert.Type != certificate {
return nil, errors.Errorf("CERT type mismatch, got %s, want: %s", caCert.Type, certificate)
return nil, fmt.Errorf("CERT type mismatch, got %s, want: %s", caCert.Type, certificate)
}
parsedCert, err := x509.ParseCertificate(caCert.Bytes)
if err != nil {
return nil, errors.Wrap(err, "failed to parse cert")
return nil, fmt.Errorf("failed to parse cert, %w", err)
}
if parsedCert.SignatureAlgorithm != x509.SHA256WithRSA {
return nil, errors.Errorf("signature not match. Got: %s, want: %s", parsedCert.SignatureAlgorithm, x509.SHA256WithRSA)
return nil, fmt.Errorf("signature not match. Got: %s, want: %s", parsedCert.SignatureAlgorithm, x509.SHA256WithRSA)
}
return parsedCert, nil
}
21 changes: 10 additions & 11 deletions common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"reflect"
"strings"

"github.com/pkg/errors"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
Expand Down Expand Up @@ -119,7 +118,7 @@ func GetSecretValue(ctx context.Context, client kubernetes.Interface, namespace
}
val, ok := secret.Data[selector.Key]
if !ok {
return "", errors.Errorf("secret '%s' does not have the key '%s'", selector.Name, selector.Key)
return "", fmt.Errorf("secret '%s' does not have the key '%s'", selector.Name, selector.Key)
}
return string(val), nil
}
Expand Down Expand Up @@ -151,7 +150,7 @@ func GetSecretFromVolume(selector *v1.SecretKeySelector) (string, error) {
}
data, err := os.ReadFile(filePath)
if err != nil {
return "", errors.Wrapf(err, "failed to get secret value of name: %s, key: %s", selector.Name, selector.Key)
return "", fmt.Errorf("failed to get secret value of name: %s, key: %s, %w", selector.Name, selector.Key, err)
}
// Secrets edited by tools like "vim" always have an extra invisible "\n" in the end,
// and it's often neglected, but it makes differences for some of the applications.
Expand All @@ -161,7 +160,7 @@ func GetSecretFromVolume(selector *v1.SecretKeySelector) (string, error) {
// GetSecretVolumePath returns the path of the mounted secret
func GetSecretVolumePath(selector *v1.SecretKeySelector) (string, error) {
if selector == nil {
return "", errors.New("secret key selector is nil")
return "", fmt.Errorf("secret key selector is nil")
}
return fmt.Sprintf("/argo-events/secrets/%s/%s", selector.Name, selector.Key), nil
}
Expand All @@ -175,7 +174,7 @@ func GetConfigMapFromVolume(selector *v1.ConfigMapKeySelector) (string, error) {
}
data, err := os.ReadFile(filePath)
if err != nil {
return "", errors.Wrapf(err, "failed to get configMap value of name: %s, key: %s", selector.Name, selector.Key)
return "", fmt.Errorf("failed to get configMap value of name: %s, key: %s, %w", selector.Name, selector.Key, err)
}
// Contents edied by tools like "vim" always have an extra invisible "\n" in the end,
// and it's often negleted, but it makes differences for some of the applications.
Expand All @@ -185,7 +184,7 @@ func GetConfigMapFromVolume(selector *v1.ConfigMapKeySelector) (string, error) {
// GetConfigMapVolumePath returns the path of the mounted configmap
func GetConfigMapVolumePath(selector *v1.ConfigMapKeySelector) (string, error) {
if selector == nil {
return "", errors.New("configmap key selector is nil")
return "", fmt.Errorf("configmap key selector is nil")
}
return fmt.Sprintf("/argo-events/config/%s/%s", selector.Name, selector.Key), nil
}
Expand All @@ -211,7 +210,7 @@ func GenerateEnvFromConfigMapSpec(selector *v1.ConfigMapKeySelector) v1.EnvFromS
// GetTLSConfig returns a tls configuration for given cert and key or skips the certs if InsecureSkipVerify is true.
func GetTLSConfig(config *apicommon.TLSConfig) (*tls.Config, error) {
if config == nil {
return nil, errors.New("TLSConfig is nil")
return nil, fmt.Errorf("TLSConfig is nil")
}

if config.InsecureSkipVerify {
Expand Down Expand Up @@ -247,19 +246,19 @@ func GetTLSConfig(config *apicommon.TLSConfig) (*tls.Config, error) {

if len(caCertPath)+len(clientCertPath)+len(clientKeyPath) == 0 {
// None of 3 is configured
return nil, errors.New("invalid tls config, neither of caCertSecret, clientCertSecret and clientKeySecret is configured")
return nil, fmt.Errorf("invalid tls config, neither of caCertSecret, clientCertSecret and clientKeySecret is configured")
}

if len(clientCertPath)+len(clientKeyPath) > 0 && len(clientCertPath)*len(clientKeyPath) == 0 {
// Only one of clientCertSecret and clientKeySecret is configured
return nil, errors.New("invalid tls config, both of clientCertSecret and clientKeySecret need to be configured")
return nil, fmt.Errorf("invalid tls config, both of clientCertSecret and clientKeySecret need to be configured")
}

c := &tls.Config{}
if len(caCertPath) > 0 {
caCert, err := os.ReadFile(caCertPath)
if err != nil {
return nil, errors.Wrapf(err, "failed to read ca cert file %s", caCertPath)
return nil, fmt.Errorf("failed to read ca cert file %s, %w", caCertPath, err)
}
pool := x509.NewCertPool()
pool.AppendCertsFromPEM(caCert)
Expand All @@ -269,7 +268,7 @@ func GetTLSConfig(config *apicommon.TLSConfig) (*tls.Config, error) {
if len(clientCertPath) > 0 && len(clientKeyPath) > 0 {
clientCert, err := tls.LoadX509KeyPair(clientCertPath, clientKeyPath)
if err != nil {
return nil, errors.Wrapf(err, "failed to load client cert key pair %s", caCertPath)
return nil, fmt.Errorf("failed to load client cert key pair %s, %w", caCertPath, err)
}
c.Certificates = []tls.Certificate{clientCert}
}
Expand Down
4 changes: 2 additions & 2 deletions controllers/eventbus/installer/exotic_nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package installer

import (
"context"
"errors"
"fmt"

"go.uber.org/zap"

Expand All @@ -27,7 +27,7 @@ func NewExoticNATSInstaller(eventBus *v1alpha1.EventBus, logger *zap.SugaredLogg
func (i *exoticNATSInstaller) Install(ctx context.Context) (*v1alpha1.BusConfig, error) {
natsObj := i.eventBus.Spec.NATS
if natsObj == nil || natsObj.Exotic == nil {
return nil, errors.New("invalid request")
return nil, fmt.Errorf("invalid request")
}
i.eventBus.Status.MarkDeployed("Skipped", "Skip deployment because of using exotic config.")
i.logger.Info("use exotic config")
Expand Down
12 changes: 6 additions & 6 deletions controllers/eventbus/installer/installer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package installer

import (
"context"
"fmt"

"github.com/pkg/errors"
"go.uber.org/zap"
"sigs.k8s.io/controller-runtime/pkg/client"

Expand Down Expand Up @@ -49,7 +49,7 @@ func getInstaller(eventBus *v1alpha1.EventBus, client client.Client, config *con
} else if js := eventBus.Spec.JetStream; js != nil {
return NewJetStreamInstaller(client, eventBus, config, getLabels(eventBus), logger), nil
}
return nil, errors.New("invalid eventbus spec")
return nil, fmt.Errorf("invalid eventbus spec")
}

func getLabels(bus *v1alpha1.EventBus) map[string]string {
Expand All @@ -72,19 +72,19 @@ func Uninstall(ctx context.Context, eventBus *v1alpha1.EventBus, client client.C
linkedEventSources, err := linkedEventSources(ctx, eventBus.Namespace, eventBus.Name, client)
if err != nil {
logger.Errorw("failed to query linked EventSources", zap.Error(err))
return errors.Wrap(err, "failed to check if there is any EventSource linked")
return fmt.Errorf("failed to check if there is any EventSource linked, %w", err)
}
if linkedEventSources > 0 {
return errors.Errorf("Can not delete an EventBus with %v EventSources connected", linkedEventSources)
return fmt.Errorf("Can not delete an EventBus with %v EventSources connected", linkedEventSources)
}

linkedSensors, err := linkedSensors(ctx, eventBus.Namespace, eventBus.Name, client)
if err != nil {
logger.Errorw("failed to query linked Sensors", zap.Error(err))
return errors.Wrap(err, "failed to check if there is any Sensor linked")
return fmt.Errorf("failed to check if there is any Sensor linked, %w", err)
}
if linkedSensors > 0 {
return errors.Errorf("Can not delete an EventBus with %v Sensors connected", linkedSensors)
return fmt.Errorf("Can not delete an EventBus with %v Sensors connected", linkedSensors)
}

installer, err := getInstaller(eventBus, client, config, logger)
Expand Down
Loading

0 comments on commit 8441b64

Please sign in to comment.