Skip to content

Commit

Permalink
roachtest/cdc: fix cdc/kafka-auth
Browse files Browse the repository at this point in the history
From [kafka
2.0](https://kafka.apache.org/20/documentation.html#security_confighostname)
onwards, host name verification of servers is enabled by default.

This means that the "fake" certificate we generate and use for kafka-auth is no
longer valid and missing the `DNSNames` field. Since then, the verification had
been failing. But this error message was never surfaced back to us until sarama
upgrade happened. This patch fixes the failure by adding the missing fields in
the certificate.

Test history

1. Kafka-auth was working as expected. In this test, we generate and pass "fake"
certificates for inter-broker communication within the Kafka cluster.
2. Some changes were made in the java environment or kafka cluster
(https://kafka.apache.org/20/documentation.html#security_confighostname),
resulting in hostname verification which wasn't previously enforced. This means
that the "fake" certificate we generated before is no longer valid and missing
the `DNSNames` field. Since then, we’ve always been getting an error message in
our kafka server logs. But this error was never surfaced up in sarama code
during Dial() AND kafka-auth only checks the success of the CREATE stmt but not
emitting messages. So our test has always been passing.
3. Sarama upgrade changed how Dial() works and is now invoking some untouched
kafka code and surfacing the error.

Overall, this issue pertains to test misconfiguration and not directly
user-facing. But the sarama upgrade may lead to similar issues for customers due
to the wide possibilities of kafka configurations. In this case, we don't think
a release note is necessary because customers should have encountered this error
message. This issue has been around for a while and should be surfaced once the
customer uses anything beyond Dial() - when they try to emit messages to kafka
sink.

Fixes: cockroachdb#118525
Release note: none
  • Loading branch information
wenyihu6 committed Feb 15, 2024
1 parent b2e3187 commit 81419de
Showing 1 changed file with 12 additions and 7 deletions.
19 changes: 12 additions & 7 deletions pkg/cmd/roachtest/tests/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -1698,7 +1698,7 @@ func (t *testCerts) CACertBase64() string {
return base64.StdEncoding.EncodeToString([]byte(t.CACert))
}

func makeTestCerts(sinkNodeIP string) (*testCerts, error) {
func makeTestCerts(sinkNodeIP string, dnsNames ...string) (*testCerts, error) {
CAKey, err := rsa.GenerateKey(cryptorand.Reader, keyLength)
if err != nil {
return nil, errors.Wrap(err, "CA private key")
Expand All @@ -1714,7 +1714,7 @@ func makeTestCerts(sinkNodeIP string) (*testCerts, error) {
return nil, errors.Wrap(err, "CA cert gen")
}

SinkCert, err := generateSinkCert(sinkNodeIP, SinkKey, CACertSpec, CAKey)
SinkCert, err := generateSinkCert(sinkNodeIP, SinkKey, CACertSpec, CAKey, dnsNames...)
if err != nil {
return nil, errors.Wrap(err, "kafka cert gen")
}
Expand Down Expand Up @@ -1748,7 +1748,7 @@ func makeTestCerts(sinkNodeIP string) (*testCerts, error) {
}

func generateSinkCert(
sinkIP string, priv *rsa.PrivateKey, CACert *x509.Certificate, CAKey *rsa.PrivateKey,
sinkIP string, priv *rsa.PrivateKey, CACert *x509.Certificate, CAKey *rsa.PrivateKey, dnsNames ...string,
) ([]byte, error) {
ip := net.ParseIP(sinkIP)
if ip == nil {
Expand All @@ -1772,14 +1772,13 @@ func generateSinkCert(
NotAfter: timeutil.Now().Add(certLifetime),
KeyUsage: x509.KeyUsageKeyEncipherment | x509.KeyUsageDataEncipherment | x509.KeyUsageDigitalSignature | x509.KeyUsageKeyAgreement,
ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth, x509.ExtKeyUsageClientAuth},
DNSNames: []string{"localhost"},
DNSNames: dnsNames,
IPAddresses: []net.IP{ip},
}

return x509.CreateCertificate(cryptorand.Reader, certSpec, CACert, &priv.PublicKey, CAKey)
}

func generateCACert(priv *rsa.PrivateKey) ([]byte, *x509.Certificate, error) {
func generateCACert(priv *rsa.PrivateKey, dnsNames ...string) ([]byte, *x509.Certificate, error) {
serial, err := randomSerial()
if err != nil {
return nil, nil, err
Expand All @@ -1800,6 +1799,7 @@ func generateCACert(priv *rsa.PrivateKey) ([]byte, *x509.Certificate, error) {
BasicConstraintsValid: true,
MaxPathLenZero: true,
}

cert, err := x509.CreateCertificate(cryptorand.Reader, certSpec, certSpec, &priv.PublicKey, priv)
return cert, certSpec, err
}
Expand Down Expand Up @@ -2347,7 +2347,12 @@ func (k kafkaManager) configureAuth(ctx context.Context) *testCerts {
}
kafkaIP := ips[0]

testCerts, err := makeTestCerts(kafkaIP)
details, err := k.c.RunWithDetailsSingleNode(ctx, k.t.L(), option.WithNodes(k.kafkaSinkNode), "hostname", "-f")
if err != nil {
k.t.Fatal(err)
}
k.t.L().Printf("hostname added to TLS certificates is %v", details.Stdout)
testCerts, err := makeTestCerts(kafkaIP, strings.TrimSpace(details.Stdout))
if err != nil {
k.t.Fatal(err)
}
Expand Down

0 comments on commit 81419de

Please sign in to comment.