Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[monitoring][broker][metadata] add metadata store metrics #17041

Merged
merged 15 commits into from
Sep 20, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ public static MetadataStore createMetadataStore(Configuration conf) throws Metad
int zkTimeout = Integer.parseInt((String) conf.getProperty("zkTimeout"));
store = MetadataStoreExtended.create(url,
MetadataStoreConfig.builder()
.metadataStoreName(MetadataStoreConfig.METADATA_STORE)
.sessionTimeoutMillis(zkTimeout)
.build());
} catch (MetadataStoreException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,17 @@ public PulsarResources(MetadataStore localMetadataStore, MetadataStore configura
this.configurationMetadataStore = Optional.ofNullable(configurationMetadataStore);
}

public static MetadataStoreExtended createMetadataStore(String serverUrls, int sessionTimeoutMs)
public static MetadataStoreExtended createLocalMetadataStore(String serverUrls, int sessionTimeoutMs)
throws MetadataStoreException {
return MetadataStoreExtended.create(serverUrls, MetadataStoreConfig.builder()
.sessionTimeoutMillis(sessionTimeoutMs).allowReadOnlyOperations(false).build());
.sessionTimeoutMillis(sessionTimeoutMs).allowReadOnlyOperations(false)
.metadataStoreName(MetadataStoreConfig.METADATA_STORE).build());
}

public static MetadataStoreExtended createConfigMetadataStore(String serverUrls, int sessionTimeoutMs)
throws MetadataStoreException {
return MetadataStoreExtended.create(serverUrls, MetadataStoreConfig.builder()
.sessionTimeoutMillis(sessionTimeoutMs).allowReadOnlyOperations(false)
.metadataStoreName(MetadataStoreConfig.CONFIGURATION_METADATA_STORE).build());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -247,8 +247,8 @@ private static void initializeCluster(Arguments arguments) throws Exception {
arguments.metadataStoreUrl, arguments.configurationMetadataStore);

MetadataStoreExtended localStore =
initMetadataStore(arguments.metadataStoreUrl, arguments.zkSessionTimeoutMillis);
MetadataStoreExtended configStore = initMetadataStore(arguments.configurationMetadataStore,
initLocalMetadataStore(arguments.metadataStoreUrl, arguments.zkSessionTimeoutMillis);
MetadataStoreExtended configStore = initConfigMetadataStore(arguments.configurationMetadataStore,
arguments.zkSessionTimeoutMillis);

final String metadataStoreUrlNoIdentifer = MetadataStoreFactoryImpl
Expand Down Expand Up @@ -388,9 +388,22 @@ static void createPartitionedTopic(MetadataStore configStore, TopicName topicNam
}
}

public static MetadataStoreExtended initMetadataStore(String connection, int sessionTimeout) throws Exception {
public static MetadataStoreExtended initLocalMetadataStore(String connection, int sessionTimeout) throws Exception {
MetadataStoreExtended store = MetadataStoreExtended.create(connection, MetadataStoreConfig.builder()
.sessionTimeoutMillis(sessionTimeout)
.metadataStoreName(MetadataStoreConfig.METADATA_STORE)
.build());
if (store instanceof MetadataStoreLifecycle) {
((MetadataStoreLifecycle) store).initializeCluster().get();
}
return store;
}

public static MetadataStoreExtended initConfigMetadataStore(String connection, int sessionTimeout)
throws Exception {
MetadataStoreExtended store = MetadataStoreExtended.create(connection, MetadataStoreConfig.builder()
.sessionTimeoutMillis(sessionTimeout)
.metadataStoreName(MetadataStoreConfig.CONFIGURATION_METADATA_STORE)
.build());
if (store instanceof MetadataStoreLifecycle) {
((MetadataStoreLifecycle) store).initializeCluster().get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,10 @@ public static void main(String[] args) throws Exception {

@Cleanup
MetadataStoreExtended metadataStore = MetadataStoreExtended.create(arguments.zookeeper,
MetadataStoreConfig.builder().sessionTimeoutMillis(arguments.zkSessionTimeoutMillis).build());
MetadataStoreConfig.builder()
.sessionTimeoutMillis(arguments.zkSessionTimeoutMillis)
.metadataStoreName(MetadataStoreConfig.METADATA_STORE)
.build());

if (arguments.bkMetadataServiceUri != null) {
@Cleanup
Expand All @@ -121,7 +124,8 @@ public static void main(String[] args) throws Exception {
// Should it be done by REST API before broker is down?
@Cleanup
MetadataStore configMetadataStore = MetadataStoreFactory.create(arguments.configurationStore,
MetadataStoreConfig.builder().sessionTimeoutMillis(arguments.zkSessionTimeoutMillis).build());
MetadataStoreConfig.builder().sessionTimeoutMillis(arguments.zkSessionTimeoutMillis)
.metadataStoreName(MetadataStoreConfig.CONFIGURATION_METADATA_STORE).build());
deleteRecursively(configMetadataStore, "/admin/clusters/" + arguments.cluster).join();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public static int doMain(String[] args) throws Exception {
}

try (MetadataStore configStore = PulsarClusterMetadataSetup
.initMetadataStore(arguments.configurationStore, arguments.zkSessionTimeoutMillis)) {
.initConfigMetadataStore(arguments.configurationStore, arguments.zkSessionTimeoutMillis)) {
PulsarResources pulsarResources = new PulsarResources(null, configStore);
for (String namespace : arguments.namespaces) {
NamespaceName namespaceName = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public static void main(String[] args) throws Exception {
}

try (MetadataStoreExtended configStore = PulsarClusterMetadataSetup
.initMetadataStore(arguments.configurationStore, arguments.zkSessionTimeoutMillis)) {
.initConfigMetadataStore(arguments.configurationStore, arguments.zkSessionTimeoutMillis)) {
PulsarResources pulsarResources = new PulsarResources(null, configStore);
// Create system tenant
PulsarClusterMetadataSetup
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,7 @@ public MetadataStore createConfigurationMetadataStore(PulsarMetadataEventSynchro
.batchingMaxDelayMillis(config.getMetadataStoreBatchingMaxDelayMillis())
.batchingMaxOperations(config.getMetadataStoreBatchingMaxOperations())
.batchingMaxSizeKb(config.getMetadataStoreBatchingMaxSizeKb())
.metadataStoreName(MetadataStoreConfig.CONFIGURATION_METADATA_STORE)
.synchronizer(synchronizer)
.build());
}
Expand Down Expand Up @@ -1043,6 +1044,7 @@ public MetadataStoreExtended createLocalMetadataStore(PulsarMetadataEventSynchro
.batchingMaxOperations(config.getMetadataStoreBatchingMaxOperations())
.batchingMaxSizeKb(config.getMetadataStoreBatchingMaxSizeKb())
.synchronizer(synchronizer)
.metadataStoreName(MetadataStoreConfig.METADATA_STORE)
.build());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ public static void main(String[] args) throws Exception {
MetadataStoreExtended store = MetadataStoreExtended.create(brokerConfig.getMetadataStoreUrl(),
MetadataStoreConfig.builder()
.sessionTimeoutMillis((int) brokerConfig.getMetadataStoreSessionTimeoutMillis())
.metadataStoreName(MetadataStoreConfig.METADATA_STORE)
.build());

@Cleanup
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -380,21 +380,27 @@ protected void setupBrokerMocks(PulsarService pulsar) throws Exception {
}

protected MetadataStoreExtended createLocalMetadataStore(PulsarMetadataEventSynchronizer synchronizer) {
return new ZKMetadataStore(mockZooKeeper, MetadataStoreConfig.builder().synchronizer(synchronizer).build());
return new ZKMetadataStore(mockZooKeeper, MetadataStoreConfig.builder()
.metadataStoreName(MetadataStoreConfig.METADATA_STORE)
.synchronizer(synchronizer).build());
}

protected MetadataStoreExtended createLocalMetadataStore() throws MetadataStoreException {
return new ZKMetadataStore(mockZooKeeper);
return new ZKMetadataStore(mockZooKeeper, MetadataStoreConfig.builder()
.metadataStoreName(MetadataStoreConfig.METADATA_STORE).build());
}

protected MetadataStoreExtended createConfigurationMetadataStore(PulsarMetadataEventSynchronizer synchronizer) {
return new ZKMetadataStore(mockZooKeeperGlobal,
MetadataStoreConfig.builder().synchronizer(synchronizer).build());
MetadataStoreConfig.builder()
.metadataStoreName(MetadataStoreConfig.CONFIGURATION_METADATA_STORE)
.synchronizer(synchronizer).build());

}

protected MetadataStoreExtended createConfigurationMetadataStore() throws MetadataStoreException {
return new ZKMetadataStore(mockZooKeeperGlobal);
return new ZKMetadataStore(mockZooKeeperGlobal, MetadataStoreConfig.builder()
.metadataStoreName(MetadataStoreConfig.CONFIGURATION_METADATA_STORE).build());
}

private void mockConfigBrokerInterceptors(PulsarService pulsarService) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.stats;

import com.google.common.collect.Multimap;
import java.io.ByteArrayOutputStream;
import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;


@Test(groups = "broker")
public class MetadataStoreStatsTest extends BrokerTestBase {

@BeforeMethod(alwaysRun = true)
@Override
protected void setup() throws Exception {
conf.setTopicLevelPoliciesEnabled(false);
conf.setSystemTopicEnabled(false);
super.baseSetup();
AuthenticationProviderToken.resetMetrics();
}

@AfterMethod(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
resetConfig();
}

@Test
public void testMetadataStoreStats() throws Exception {
String ns = "prop/ns-abc1";
admin.namespaces().createNamespace(ns);

String topic = "persistent://prop/ns-abc1/metadata-store-" + UUID.randomUUID();
String subName = "my-sub1";

@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topic).create();
@Cleanup
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic(topic).subscriptionName(subName).subscribe();

for (int i = 0; i < 100; i++) {
producer.newMessage().value(UUID.randomUUID().toString()).send();
}

for (;;) {
Message<String> message = consumer.receive(10, TimeUnit.SECONDS);
if (message == null) {
break;
}
consumer.acknowledge(message);
}

ByteArrayOutputStream output = new ByteArrayOutputStream();
PrometheusMetricsGenerator.generate(pulsar, false, false, false, false, output);
String metricsStr = output.toString();
Multimap<String, PrometheusMetricsTest.Metric> metricsMap = PrometheusMetricsTest.parseMetrics(metricsStr);

Collection<PrometheusMetricsTest.Metric> opsLatency = metricsMap.get("pulsar_metadata_store_ops_latency_ms" + "_sum");
Collection<PrometheusMetricsTest.Metric> putBytes = metricsMap.get("pulsar_metadata_store_put_bytes" + "_total");

Assert.assertTrue(opsLatency.size() > 1);
Assert.assertTrue(putBytes.size() > 1);

for (PrometheusMetricsTest.Metric m : opsLatency) {
Assert.assertEquals(m.tags.get("cluster"), "test");
String metadataStoreName = m.tags.get("name");
Assert.assertNotNull(metadataStoreName);
Assert.assertTrue(metadataStoreName.equals(MetadataStoreConfig.METADATA_STORE)
|| metadataStoreName.equals(MetadataStoreConfig.CONFIGURATION_METADATA_STORE)
|| metadataStoreName.equals(MetadataStoreConfig.STATE_METADATA_STORE));
Assert.assertNotNull(m.tags.get("status"));

if (m.tags.get("status").equals("success")) {
if (m.tags.get("type").equals("get")) {
Assert.assertTrue(m.value >= 0);
} else if (m.tags.get("type").equals("del")) {
Assert.assertTrue(m.value >= 0);
} else if (m.tags.get("type").equals("put")) {
Assert.assertTrue(m.value >= 0);
} else {
Assert.fail();
}
} else {
if (m.tags.get("type").equals("get")) {
Assert.assertTrue(m.value >= 0);
} else if (m.tags.get("type").equals("del")) {
Assert.assertTrue(m.value >= 0);
} else if (m.tags.get("type").equals("put")) {
Assert.assertTrue(m.value >= 0);
} else {
Assert.fail();
}
}
}
for (PrometheusMetricsTest.Metric m : putBytes) {
Assert.assertEquals(m.tags.get("cluster"), "test");
String metadataStoreName = m.tags.get("name");
Assert.assertNotNull(metadataStoreName);
Assert.assertTrue(metadataStoreName.equals(MetadataStoreConfig.METADATA_STORE)
|| metadataStoreName.equals(MetadataStoreConfig.CONFIGURATION_METADATA_STORE)
|| metadataStoreName.equals(MetadataStoreConfig.STATE_METADATA_STORE));
Assert.assertTrue(m.value > 0);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ public void testSetupWithBkMetadataServiceUri() throws Exception {
PulsarClusterMetadataSetup.main(args);

try (MetadataStoreExtended localStore = PulsarClusterMetadataSetup
.initMetadataStore(zkConnection, 30000)) {
.initLocalMetadataStore(zkConnection, 30000)) {
// expected not exist
assertFalse(localStore.exists("/ledgers").get());

Expand All @@ -268,7 +268,7 @@ public void testSetupWithBkMetadataServiceUri() throws Exception {

PulsarClusterMetadataSetup.main(bookkeeperMetadataServiceUriArgs);
try (MetadataStoreExtended bookkeeperMetadataServiceUriStore = PulsarClusterMetadataSetup
.initMetadataStore(zkConnection, 30000)) {
.initLocalMetadataStore(zkConnection, 30000)) {
// expected not exist
assertFalse(bookkeeperMetadataServiceUriStore.exists("/ledgers").get());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public void setup() throws Exception {
}

service = spyWithClassAndConstructorArgs(WebSocketService.class, config);
doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createMetadataStore(anyString(), anyInt());
doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createConfigMetadataStore(anyString(), anyInt());
proxyServer = new ProxyServer(config);
WebSocketServiceStarter.start(proxyServer, service);
log.info("Proxy Server Started");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ protected void setup() throws Exception {
config.setWebServicePort(Optional.of(0));
config.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
service = spyWithClassAndConstructorArgs(WebSocketService.class, config);
doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createMetadataStore(anyString(), anyInt());
doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createConfigMetadataStore(anyString(), anyInt());
service.start();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public void configTest(int numIoThreads, int connectionsPerBroker) throws Except
config.setServiceUrl("http://localhost:8080");
config.getProperties().setProperty("brokerClient_lookupTimeoutMs", "100");
WebSocketService service = spyWithClassAndConstructorArgs(WebSocketService.class, config);
doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createMetadataStore(anyString(), anyInt());
doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createConfigMetadataStore(anyString(), anyInt());
service.start();

PulsarClientImpl client = (PulsarClientImpl) service.getPulsarClient();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public void setup() throws Exception {
config.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
config.setCryptoKeyReaderFactoryClassName(CryptoKeyReaderFactoryImpl.class.getName());
WebSocketService service = spy(new WebSocketService(config));
doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createMetadataStore(anyString(), anyInt());
doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createConfigMetadataStore(anyString(), anyInt());
proxyServer = new ProxyServer(config);
WebSocketServiceStarter.start(proxyServer, service);
log.info("Proxy Server Started");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public void setup() throws Exception {
config.setClusterName("test");
config.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
service = spyWithClassAndConstructorArgs(WebSocketService.class, config);
doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createMetadataStore(anyString(), anyInt());
doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createConfigMetadataStore(anyString(), anyInt());
proxyServer = new ProxyServer(config);
WebSocketServiceStarter.start(proxyServer, service);
log.info("Proxy Server Started");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public void setup() throws Exception {
config.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName());
config.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
service = spyWithClassAndConstructorArgs(WebSocketService.class, config);
doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createMetadataStore(anyString(), anyInt());
doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createConfigMetadataStore(anyString(), anyInt());
proxyServer = new ProxyServer(config);
WebSocketServiceStarter.start(proxyServer, service);
log.info("Proxy Server Started");
Expand Down
Loading