Skip to content

Commit

Permalink
Merge #119077
Browse files Browse the repository at this point in the history
119077: roachtest/cdc: fix cdc/kafka-auth r=stevendanna a=wenyihu6

From [kafka
2.0](https://kafka.apache.org/20/documentation.html#security_confighostname)
onwards, host name verification of servers is enabled by default.

Previously, the self-signed test certificate we generated for kafka-auth only
included “localhost” in the list of subject alternative names. However, kafka
appears to make internal connections using the fully qualified domain name. As a
result, some inter-broker communication has been failing with a hostname
verification error for some time. But the failure wasn’t raised to the user
until the sarama upgrade happened. This patch fixes the failure by adding the
proper hostname of the kafka node to the certificate.

We don’t believe this represents a meaningful customer-facing issue. The
misconfiguration of the test kafka cluster would have surfaced even with older
sarama versions if the test had involved more than just connecting to the kafka
cluster.

Fixes: #118525
Release note: none

Co-authored-by: Wenyi Hu <[email protected]>
  • Loading branch information
craig[bot] and wenyihu6 committed Feb 19, 2024
2 parents e39dafe + 7b70a3a commit b8cca1b
Showing 1 changed file with 15 additions and 6 deletions.
21 changes: 15 additions & 6 deletions pkg/cmd/roachtest/tests/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -1741,7 +1741,7 @@ func (t *testCerts) CACertBase64() string {
return base64.StdEncoding.EncodeToString([]byte(t.CACert))
}

func makeTestCerts(sinkNodeIP string) (*testCerts, error) {
func makeTestCerts(sinkNodeIP string, dnsNames ...string) (*testCerts, error) {
CAKey, err := rsa.GenerateKey(cryptorand.Reader, keyLength)
if err != nil {
return nil, errors.Wrap(err, "CA private key")
Expand All @@ -1757,7 +1757,7 @@ func makeTestCerts(sinkNodeIP string) (*testCerts, error) {
return nil, errors.Wrap(err, "CA cert gen")
}

SinkCert, err := generateSinkCert(sinkNodeIP, SinkKey, CACertSpec, CAKey)
SinkCert, err := generateSinkCert(sinkNodeIP, SinkKey, CACertSpec, CAKey, dnsNames...)
if err != nil {
return nil, errors.Wrap(err, "kafka cert gen")
}
Expand Down Expand Up @@ -1791,7 +1791,11 @@ func makeTestCerts(sinkNodeIP string) (*testCerts, error) {
}

func generateSinkCert(
sinkIP string, priv *rsa.PrivateKey, CACert *x509.Certificate, CAKey *rsa.PrivateKey,
sinkIP string,
priv *rsa.PrivateKey,
CACert *x509.Certificate,
CAKey *rsa.PrivateKey,
dnsNames ...string,
) ([]byte, error) {
ip := net.ParseIP(sinkIP)
if ip == nil {
Expand All @@ -1815,10 +1819,9 @@ func generateSinkCert(
NotAfter: timeutil.Now().Add(certLifetime),
KeyUsage: x509.KeyUsageKeyEncipherment | x509.KeyUsageDataEncipherment | x509.KeyUsageDigitalSignature | x509.KeyUsageKeyAgreement,
ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth, x509.ExtKeyUsageClientAuth},
DNSNames: []string{"localhost"},
DNSNames: append([]string{"localhost"}, dnsNames...),
IPAddresses: []net.IP{ip},
}

return x509.CreateCertificate(cryptorand.Reader, certSpec, CACert, &priv.PublicKey, CAKey)
}

Expand Down Expand Up @@ -2390,7 +2393,13 @@ func (k kafkaManager) configureAuth(ctx context.Context) *testCerts {
}
kafkaIP := ips[0]

testCerts, err := makeTestCerts(kafkaIP)
details, err := k.c.RunWithDetailsSingleNode(ctx, k.t.L(), option.WithNodes(k.kafkaSinkNode), "hostname", "-f")
if err != nil {
k.t.Fatal(err)
}
hostname := strings.TrimSpace(details.Stdout)
k.t.L().Printf("hostname included in TLS certificates: %s", hostname)
testCerts, err := makeTestCerts(kafkaIP, hostname)
if err != nil {
k.t.Fatal(err)
}
Expand Down

0 comments on commit b8cca1b

Please sign in to comment.