diff --git a/datahub-frontend/app/react/controllers/TrackingController.java b/datahub-frontend/app/react/controllers/TrackingController.java index 2c02ba14c3e902..c02ab93a0de1db 100644 --- a/datahub-frontend/app/react/controllers/TrackingController.java +++ b/datahub-frontend/app/react/controllers/TrackingController.java @@ -2,9 +2,12 @@ import com.fasterxml.jackson.databind.JsonNode; import com.typesafe.config.Config; +import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.config.SslConfigs; +import org.apache.kafka.common.security.auth.SecurityProtocol; import react.auth.Authenticator; import javax.annotation.Nonnull; import javax.inject.Inject; @@ -75,6 +78,25 @@ private KafkaProducer createKafkaProducer() { props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, _config.getString("analytics.kafka.bootstrap.server")); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); // Actor urn. props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); // JSON object. + + final String securityProtocolConfig = "analytics.kafka.security.protocol"; + if (_config.hasPath(securityProtocolConfig) + && _config.getString(securityProtocolConfig).equals(SecurityProtocol.SSL)) { + props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, _config.getString(securityProtocolConfig)); + props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, _config.getString("analytics.kafka.ssl.key.password")); + + props.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, _config.getString("analytics.kafka.ssl.keystore.type")); + props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, _config.getString("analytics.kafka.ssl.keystore.location")); + props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, _config.getString("analytics.kafka.ssl.keystore.password")); + + props.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, _config.getString("analytics.kafka.ssl.truststore.type")); + props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, _config.getString("analytics.kafka.ssl.truststore.location")); + props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, _config.getString("analytics.kafka.ssl.truststore.password")); + + props.put(SslConfigs.SSL_PROTOCOL_CONFIG, _config.getString("analytics.kafka.ssl.protocol")); + props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, _config.getString("analytics.kafka.ssl.endpoint.identification.algorithm")); + } + return new KafkaProducer(props); } } diff --git a/datahub-frontend/conf/application.conf b/datahub-frontend/conf/application.conf index 183a47096dc965..a6a27fc787ff0f 100644 --- a/datahub-frontend/conf/application.conf +++ b/datahub-frontend/conf/application.conf @@ -143,6 +143,18 @@ analytics.enabled = ${?DATAHUB_ANALYTICS_ENABLED} analytics.kafka.bootstrap.server = ${KAFKA_BOOTSTRAP_SERVER} analytics.tracking.topic = ${DATAHUB_TRACKING_TOPIC} +# Kafka Producer SSL Configs. All must be provided to enable SSL. +analytics.kafka.security.protocol = ${?KAFKA_PROPERTIES_SECURITY_PROTOCOL} +analytics.kafka.ssl.key.password = ${?KAFKA_PROPERTIES_SSL_KEY_PASSWORD} +analytics.kafka.ssl.keystore.type = ${?KAFKA_PROPERTIES_SSL_KEYSTORE_TYPE} +analytics.kafka.ssl.keystore.location = ${?KAFKA_PROPERTIES_SSL_KEYSTORE_LOCATION} +analytics.kafka.ssl.keystore.password = ${?KAFKA_PROPERTIES_SSL_KEYSTORE_PASSWORD} +analytics.kafka.ssl.truststore.type = ${?KAFKA_PROPERTIES_SSL_TRUSTSTORE_TYPE} +analytics.kafka.ssl.truststore.location = ${?KAFKA_PROPERTIES_SSL_TRUSTSTORE_LOCATION} +analytics.kafka.ssl.truststore.password = ${?KAFKA_PROPERTIES_SSL_TRUSTSTORE_PASSWORD} +analytics.kafka.ssl.protocol = ${?KAFKA_PROPERTIES_SSL_PROTOCOL} +analytics.kafka.ssl.endpoint.identification.algorithm = ${?KAFKA_PROPERTIES_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM} + # Required Elastic Client Configuration analytics.elastic.host = ${ELASTIC_CLIENT_HOST} analytics.elastic.port = ${ELASTIC_CLIENT_PORT}