Skip to content

Commit

Permalink
add monitorType JMX
Browse files Browse the repository at this point in the history
  • Loading branch information
alaturqua committed Feb 5, 2025
1 parent 99cae08 commit 3e4a6fb
Show file tree
Hide file tree
Showing 9 changed files with 368 additions and 2 deletions.
50 changes: 50 additions & 0 deletions docs/installation.md
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,56 @@ monitor:

Other timeout parameters are not applicable to the JDBC connection.

#### JMX

The monitor type `JMX` can be used as an alternative to collect cluster information,
which is required for the `QueryCountBasedRouterProvider`. This uses the `v1/jmx/mbean`
endpoint on Trino clusters.

To enable this:

[JMX monitoring](https://trino.io/docs/current/admin/jmx.html) must be activated on all Trino clusters with:

```properties
jmx.rmiregistry.port=<port>
jmx.rmiserver.port=<port>
```

Allow JMX endpoint access by adding rules to your [file-based access control](https://trino.io/docs/current/security/file-system-access-control.html)
configuration. Example for `user`:

```json
{
"catalogs": [
{
"user": "user",
"catalog": "system",
"allow": "read-only"
}
],
"system_information": [
{
"user": "user",
"allow": ["read"]
}
]
}
```

Ensure that a username and password are configured by adding the `backendState`
section to your configuration. The same credentials must be valid on all
backend clusters, and the user must have `read` access to `system_information`.

```yaml
backendState:
  username: "user"
  password: "password"
```

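The JMX monitor uses these credentials to authenticate against the
JMX endpoint of each Trino cluster and collects metrics such as the number of
running queries, queued queries, and active worker nodes. The username is always
sent in the `X-Trino-User` header; the password is only sent (as HTTP basic
authentication) when the cluster URL uses HTTPS.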

#### UI_API

This pulls cluster information from the `ui/api/stats` REST endpoint. This is
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.gateway.ha.clustermonitor;

import com.fasterxml.jackson.databind.JsonNode;
import io.airlift.http.client.BasicAuthRequestFilter;
import io.airlift.http.client.HttpClient;
import io.airlift.http.client.HttpRequestFilter;
import io.airlift.http.client.JsonResponseHandler;
import io.airlift.http.client.Request;
import io.airlift.http.client.UnexpectedResponseException;
import io.airlift.log.Logger;
import io.trino.gateway.ha.config.BackendStateConfiguration;
import io.trino.gateway.ha.config.ProxyBackendConfiguration;

import java.net.URI;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

import static io.airlift.http.client.HttpUriBuilder.uriBuilderFrom;
import static io.airlift.http.client.JsonResponseHandler.createJsonResponseHandler;
import static io.airlift.http.client.Request.Builder.prepareGet;
import static io.airlift.json.JsonCodec.jsonCodec;
import static java.util.Objects.requireNonNull;

/**
 * {@link ClusterStatsMonitor} that reads cluster statistics from the
 * {@code /v1/jmx/mbean} REST endpoint of a backend.
 *
 * <p>Two MBeans are queried per backend: {@code trino.metadata:name=DiscoveryNodeManager}
 * (for {@code ActiveNodeCount}) and {@code trino.execution:name=QueryManager}
 * (for {@code QueuedQueries} and {@code RunningQueries}). If either request fails,
 * the backend is reported as {@link TrinoStatus#UNHEALTHY}.
 */
public class ClusterStatsJmxMonitor
        implements ClusterStatsMonitor
{
    private static final Logger log = Logger.get(ClusterStatsJmxMonitor.class);
    private static final JsonResponseHandler<JsonNode> JMX_JSON_RESPONSE_HANDLER = createJsonResponseHandler(jsonCodec(JsonNode.class));
    private static final String JMX_PATH = "/v1/jmx/mbean";

    private final String username;
    private final String password;
    private final HttpClient client;

    /**
     * @param client HTTP client used for all JMX requests; must not be null
     * @param backendStateConfiguration source of the username and password; must not be null
     */
    public ClusterStatsJmxMonitor(HttpClient client, BackendStateConfiguration backendStateConfiguration)
    {
        this.client = requireNonNull(client, "client is null");
        // Fail with a descriptive message instead of an anonymous NullPointerException.
        requireNonNull(backendStateConfiguration, "backendStateConfiguration is null");
        this.username = backendStateConfiguration.getUsername();
        this.password = backendStateConfiguration.getPassword();
    }

    /**
     * Copies {@code ActiveNodeCount} into the builder and derives the health status from it:
     * more than zero active nodes is HEALTHY, otherwise UNHEALTHY. When the attribute is
     * absent the builder is left untouched; any exception marks the cluster UNHEALTHY.
     */
    private static void updateClusterStatsFromDiscoveryNodeManagerResponse(JmxResponse response, ClusterStats.Builder clusterStats)
    {
        try {
            response.attributes().stream()
                    .filter(attribute -> "ActiveNodeCount".equals(attribute.name()))
                    .findFirst()
                    .ifPresent(attribute -> {
                        int activeNodes = attribute.value();
                        TrinoStatus trinoStatus = activeNodes > 0 ? TrinoStatus.HEALTHY : TrinoStatus.UNHEALTHY;
                        clusterStats.numWorkerNodes(activeNodes);
                        clusterStats.trinoStatus(trinoStatus);
                        log.debug("Processed DiscoveryNodeManager: ActiveNodeCount = %d, Health = %s", activeNodes, trinoStatus);
                    });
        }
        catch (Exception e) {
            log.error(e, "Error parsing DiscoveryNodeManager stats");
            clusterStats.trinoStatus(TrinoStatus.UNHEALTHY);
        }
    }

    /**
     * Copies {@code QueuedQueries} and {@code RunningQueries} into the builder, using 0 for
     * an attribute that is not present in the response. Exceptions are logged and leave the
     * builder's query counts unchanged.
     */
    private static void updateClusterStatsFromQueryManagerResponse(JmxResponse response, ClusterStats.Builder clusterStats)
    {
        try {
            Map<String, Integer> stats = response.attributes().stream()
                    .filter(attribute -> {
                        String attributeName = attribute.name();
                        return "QueuedQueries".equals(attributeName) || "RunningQueries".equals(attributeName);
                    })
                    // Keep the first occurrence if a name is repeated; without a merge function
                    // toMap throws IllegalStateException and both counts would be dropped.
                    .collect(Collectors.toMap(JmxAttribute::name, JmxAttribute::value, (first, duplicate) -> first));

            int queuedQueryCount = stats.getOrDefault("QueuedQueries", 0);
            clusterStats.queuedQueryCount(queuedQueryCount);
            int runningQueryCount = stats.getOrDefault("RunningQueries", 0);
            clusterStats.runningQueryCount(runningQueryCount);

            // Let the logger do the formatting, as the other log statements in this class do.
            log.debug("Processed QueryManager: QueuedQueries = %d, RunningQueries = %d", queuedQueryCount, runningQueryCount);
        }
        catch (Exception e) {
            log.error(e, "Error parsing QueryManager stats");
        }
    }

    /**
     * Queries both MBeans of the given backend and assembles the resulting {@link ClusterStats}.
     *
     * @param backend backend to probe
     * @return stats for the backend; the status is UNHEALTHY when either JMX query fails
     */
    @Override
    public ClusterStats monitor(ProxyBackendConfiguration backend)
    {
        log.info("Monitoring cluster stats for backend: %s", backend.getProxyTo());
        ClusterStats.Builder clusterStatsBuilder = ClusterStatsMonitor.getClusterStatsBuilder(backend);

        clusterStatsBuilder.proxyTo(backend.getProxyTo())
                .externalUrl(backend.getExternalUrl())
                .routingGroup(backend.getRoutingGroup());

        Optional<JmxResponse> discoveryResponse = queryJmx(backend, "trino.metadata:name=DiscoveryNodeManager");
        Optional<JmxResponse> queryResponse = queryJmx(backend, "trino.execution:name=QueryManager");

        if (discoveryResponse.isEmpty() || queryResponse.isEmpty()) {
            clusterStatsBuilder.trinoStatus(TrinoStatus.UNHEALTHY);
            return clusterStatsBuilder.build();
        }

        // Both responses are known to be present past the guard above.
        updateClusterStatsFromDiscoveryNodeManagerResponse(discoveryResponse.orElseThrow(), clusterStatsBuilder);
        updateClusterStatsFromQueryManagerResponse(queryResponse.orElseThrow(), clusterStatsBuilder);

        return clusterStatsBuilder.build();
    }

    /**
     * Fetches one MBean from the backend.
     *
     * @return the parsed response, or empty when the request cannot be built, the call
     *         fails, or the body cannot be converted to a {@link JmxResponse}
     */
    private Optional<JmxResponse> queryJmx(ProxyBackendConfiguration backend, String mbeanName)
    {
        requireNonNull(backend, "backend is null");
        requireNonNull(mbeanName, "mbeanName is null");

        String jmxUrl = backend.getProxyTo();
        try {
            // Building the request inside the try means a malformed or scheme-less proxyTo
            // is reported as a failed probe rather than an exception escaping monitor().
            Request preparedRequest = buildRequest(jmxUrl, mbeanName);
            log.debug("Querying JMX at %s for %s", preparedRequest.getUri(), mbeanName);

            JsonNode response = client.execute(preparedRequest, JMX_JSON_RESPONSE_HANDLER);
            return Optional.ofNullable(response).map(JmxResponse::fromJson);
        }
        catch (UnexpectedResponseException e) {
            log.error(e, "Failed to fetch JMX data for %s, response code: %d", mbeanName, e.getStatusCode());
            return Optional.empty();
        }
        catch (Exception e) {
            log.error(e, "Exception while querying JMX at %s", jmxUrl);
            return Optional.empty();
        }
    }

    /**
     * Builds the GET request for one MBean. The username is always sent in the
     * {@code X-Trino-User} header; basic-auth credentials are added only for HTTPS URLs.
     */
    private Request buildRequest(String jmxUrl, String mbeanName)
    {
        Request request = prepareGet()
                .setUri(uriBuilderFrom(URI.create(jmxUrl))
                        .appendPath(JMX_PATH)
                        .appendPath(mbeanName)
                        .build())
                .addHeader("X-Trino-User", username)
                .build();

        // NOTE(review): the password is presumably withheld on plain HTTP so that it is never
        // sent unencrypted - confirm this matches the backend's authentication setup.
        if (request.getUri().getScheme().equalsIgnoreCase("https")) {
            HttpRequestFilter filter = new BasicAuthRequestFilter(username, password);
            request = filter.filterRequest(request);
        }
        return request;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.gateway.ha.clustermonitor;

import com.fasterxml.jackson.databind.JsonNode;

/**
 * A single MBean attribute reduced to its name and an integer value.
 *
 * @param name attribute name as reported in the JSON
 * @param value the attribute's value converted with {@link JsonNode#asInt()}
 */
public record JmxAttribute(String name, int value)
{
    /**
     * Builds an attribute from one JSON object of an MBean response.
     *
     * <p>Fields are read with {@code path(...)} rather than {@code get(...)}: {@code get}
     * returns {@code null} for an absent field, so a single attribute lacking
     * {@code "value"} (or {@code "name"}) would raise a NullPointerException and make the
     * whole MBean response unusable. With {@code path}, an absent name becomes the empty
     * string and an absent value becomes 0.
     *
     * @param json JSON object describing one attribute; must not be null
     * @return the attribute's name and integer value
     */
    public static JmxAttribute fromJson(JsonNode json)
    {
        return new JmxAttribute(
                json.path("name").asText(),
                json.path("value").asInt());
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.gateway.ha.clustermonitor;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableList;

import java.util.List;
import java.util.stream.StreamSupport;

import static com.google.common.collect.ImmutableList.toImmutableList;

/**
 * The attribute list of one MBean as returned by the JMX REST endpoint.
 *
 * @param attributes parsed attributes; stored as an immutable copy
 */
public record JmxResponse(List<JmxAttribute> attributes)
{
    public JmxResponse
    {
        // Defensive copy so the record cannot be changed through the caller's list.
        attributes = ImmutableList.copyOf(attributes);
    }

    /**
     * Converts every element under the {@code "attributes"} field into a {@link JmxAttribute}.
     * No null check is performed on the looked-up field.
     *
     * @param json JSON document of one MBean
     * @return response holding the converted attributes in document order
     */
    public static JmxResponse fromJson(JsonNode json)
    {
        JsonNode attributeNodes = json.get("attributes");
        return new JmxResponse(
                StreamSupport.stream(attributeNodes.spliterator(), false)
                        .map(attributeNode -> JmxAttribute.fromJson(attributeNode))
                        .collect(toImmutableList()));
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,6 @@ public enum ClusterStatsMonitorType
NOOP,
INFO_API,
UI_API,
JDBC
JDBC,
JMX
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.trino.gateway.ha.clustermonitor.ClusterStatsHttpMonitor;
import io.trino.gateway.ha.clustermonitor.ClusterStatsInfoApiMonitor;
import io.trino.gateway.ha.clustermonitor.ClusterStatsJdbcMonitor;
import io.trino.gateway.ha.clustermonitor.ClusterStatsJmxMonitor;
import io.trino.gateway.ha.clustermonitor.ClusterStatsMonitor;
import io.trino.gateway.ha.clustermonitor.ClusterStatsObserver;
import io.trino.gateway.ha.clustermonitor.ForMonitor;
Expand Down Expand Up @@ -228,6 +229,7 @@ public ClusterStatsMonitor getClusterStatsMonitor(@ForMonitor HttpClient httpCli
case INFO_API -> new ClusterStatsInfoApiMonitor(httpClient, configuration.getMonitor());
case UI_API -> new ClusterStatsHttpMonitor(configuration.getBackendState());
case JDBC -> new ClusterStatsJdbcMonitor(configuration.getBackendState(), configuration.getMonitor());
case JMX -> new ClusterStatsJmxMonitor(httpClient, configuration.getBackendState());
case NOOP -> new NoopClusterStatsMonitor();
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,13 @@
*/
package io.trino.gateway.ha.clustermonitor;

import com.google.common.net.MediaType;
import io.airlift.http.client.HttpClient;
import io.airlift.http.client.HttpClientConfig;
import io.airlift.http.client.HttpStatus;
import io.airlift.http.client.jetty.JettyHttpClient;
import io.airlift.http.client.testing.TestingHttpClient;
import io.airlift.http.client.testing.TestingResponse;
import io.airlift.units.Duration;
import io.trino.gateway.ha.config.BackendStateConfiguration;
import io.trino.gateway.ha.config.MonitorConfiguration;
Expand All @@ -41,7 +46,8 @@ final class TestClusterStatsMonitor
void setUp()
{
trino = new TrinoContainer("trinodb/trino");
trino.withCopyFileToContainer(forClasspathResource("trino-config.properties"), "/etc/trino/config.properties");
trino.withCopyFileToContainer(forClasspathResource("trino-config-with-rmi.properties"), "/etc/trino/config.properties");
trino.withCopyFileToContainer(forClasspathResource("jvm-with-rmi.config"), "/etc/trino/jvm.config");
trino.start();
}

Expand All @@ -65,6 +71,63 @@ void testJdbcMonitor()
testClusterStatsMonitor(backendStateConfiguration -> new ClusterStatsJdbcMonitor(backendStateConfiguration, monitorConfigurationWithTimeout));
}

@Test
void testJmxMonitor()
{
    // Close the Jetty client once the check has finished so the test does not leak it.
    // NOTE(review): assumes testClusterStatsMonitor runs the monitor before returning - confirm.
    try (JettyHttpClient httpClient = new JettyHttpClient(new HttpClientConfig())) {
        testClusterStatsMonitor(backendStateConfiguration -> new ClusterStatsJmxMonitor(httpClient, backendStateConfiguration));
    }
}

@Test
void testJmxMonitorWithBadRequest()
{
    // Every request is answered with HTTP 400 and a plain-text body.
    testClusterStatsMonitorWithClient(new TestingHttpClient(request -> TestingResponse.mockResponse(
            HttpStatus.BAD_REQUEST,
            MediaType.PLAIN_TEXT_UTF_8,
            "Bad Request")));
}

@Test
void testJmxMonitorWithServerError()
{
    // Every request is answered with HTTP 500 and a plain-text body.
    testClusterStatsMonitorWithClient(new TestingHttpClient(request -> TestingResponse.mockResponse(
            HttpStatus.INTERNAL_SERVER_ERROR,
            MediaType.PLAIN_TEXT_UTF_8,
            "Internal Server Error")));
}

@Test
void testJmxMonitorWithInvalidJson()
{
    // HTTP 200 with a JSON content type, but a body that is not valid JSON.
    testClusterStatsMonitorWithClient(new TestingHttpClient(request -> TestingResponse.mockResponse(
            HttpStatus.OK,
            MediaType.JSON_UTF_8,
            "{invalid:json}")));
}

@Test
void testJmxMonitorWithNetworkError()
{
    // The client throws instead of producing any response.
    testClusterStatsMonitorWithClient(new TestingHttpClient(request -> {
        throw new RuntimeException("Network error");
    }));
}

/**
 * Runs a {@link ClusterStatsJmxMonitor} backed by the given client against a fixed
 * test backend and asserts that the backend is reported as UNHEALTHY.
 */
private static void testClusterStatsMonitorWithClient(HttpClient client)
{
    ProxyBackendConfiguration backend = new ProxyBackendConfiguration();
    backend.setProxyTo("http://localhost:8080");
    backend.setName("test_cluster");

    BackendStateConfiguration backendState = new BackendStateConfiguration();
    backendState.setUsername("test_user");

    ClusterStatsMonitor jmxMonitor = new ClusterStatsJmxMonitor(client, backendState);
    ClusterStats clusterStats = jmxMonitor.monitor(backend);

    assertThat(clusterStats.trinoStatus()).isEqualTo(TrinoStatus.UNHEALTHY);
}

@Test
void testInfoApiMonitor()
{
Expand Down
Loading

0 comments on commit 3e4a6fb

Please sign in to comment.