diff --git a/pulsar-broker-common/pom.xml b/pulsar-broker-common/pom.xml index f74d6ad139bbd..1d6bce52f7c82 100644 --- a/pulsar-broker-common/pom.xml +++ b/pulsar-broker-common/pom.xml @@ -96,6 +96,19 @@ + + org.apache.maven.plugins + maven-checkstyle-plugin + + + checkstyle + verify + + check + + + + diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java index cb35b881d08c7..18437acb53abb 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java @@ -64,7 +64,8 @@ public void setConf(Configuration conf) { super.setConf(conf); Object storeProperty = conf.getProperty(METADATA_STORE_INSTANCE); if (storeProperty == null) { - throw new RuntimeException(METADATA_STORE_INSTANCE + " configuration was not set in the BK client configuration"); + throw new RuntimeException(METADATA_STORE_INSTANCE + " configuration was not set in the BK client " + + "configuration"); } if (!(storeProperty instanceof MetadataStore)) { diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java index 3d0f995817482..fed8943e7e9fb 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java @@ -178,13 +178,15 @@ private static Optional getEnsemblePlacementPolic return Optional.empty(); } - private static Pair, Set> getIsolationGroup(EnsemblePlacementPolicyConfig ensemblePlacementPolicyConfig) { + private static Pair, Set> getIsolationGroup( + EnsemblePlacementPolicyConfig ensemblePlacementPolicyConfig) { MutablePair, Set> pair = new MutablePair<>(); String className = IsolatedBookieEnsemblePlacementPolicy.class.getName(); if (ensemblePlacementPolicyConfig.getPolicyClass().getName().equals(className)) { Map properties = ensemblePlacementPolicyConfig.getProperties(); String primaryIsolationGroupString = castToString(properties.getOrDefault(ISOLATION_BOOKIE_GROUPS, "")); - String secondaryIsolationGroupString = castToString(properties.getOrDefault(SECONDARY_ISOLATION_BOOKIE_GROUPS, "")); + String secondaryIsolationGroupString = + castToString(properties.getOrDefault(SECONDARY_ISOLATION_BOOKIE_GROUPS, "")); if (!primaryIsolationGroupString.isEmpty()) { pair.setLeft(new HashSet(Arrays.asList(primaryIsolationGroupString.split(",")))); } @@ -247,8 +249,8 @@ private Set getBlacklistedBookiesWithIsolationGroups(int ensembleSize, // if primary-isolated-bookies are not enough then add consider secondary isolated bookie group as well. if (totalAvailableBookiesInPrimaryGroup < ensembleSize) { log.info( - "Not found enough available-bookies from primary isolation group [{}], checking secondary group [{}]", - primaryIsolationGroup, secondaryIsolationGroup); + "Not found enough available-bookies from primary isolation group [{}], checking secondary " + + "group [{}]", primaryIsolationGroup, secondaryIsolationGroup); for (String group : secondaryIsolationGroup) { Map bookieGroup = allGroupsBookieMapping.get(group); if (bookieGroup != null && !bookieGroup.isEmpty()) { diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/package-info.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/package-info.java new file mode 100644 index 0000000000000..fd1f46466bbf9 --- /dev/null +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/package-info.java @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/** + * Pulsar Client API. + */ +package org.apache.pulsar.bookie.rackawareness; diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 37bf36e30b102..af6ec5a59aed6 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -18,11 +18,8 @@ */ package org.apache.pulsar.broker; - -import com.fasterxml.jackson.annotation.JsonIgnore; import com.google.common.collect.Lists; import com.google.common.collect.Sets; -import io.netty.util.internal.PlatformDependent; import java.util.ArrayList; import java.util.HashSet; import java.util.LinkedHashSet; @@ -104,12 +101,13 @@ public class ServiceConfiguration implements PulsarConfiguration { @Category private static final String CATEGORY_PLUGIN = "Broker Plugin"; - /***** --- pulsar configuration --- ****/ + /***** --- pulsar configuration. --- ****/ @FieldContext( category = CATEGORY_SERVER, required = false, deprecated = true, - doc = "The Zookeeper quorum connection string (as a comma-separated list). Deprecated in favour of metadataStoreUrl" + doc = "The Zookeeper quorum connection string (as a comma-separated list). Deprecated in favour of " + + "metadataStoreUrl" ) @Getter(AccessLevel.NONE) private String zookeeperServers; @@ -117,11 +115,12 @@ public class ServiceConfiguration implements PulsarConfiguration { @FieldContext( category = CATEGORY_SERVER, required = false, - doc = "The metadata store URL. \n" + - " Examples: \n" + - " * zk:my-zk-1:2181,my-zk-2:2181,my-zk-3:2181\n" + - " * my-zk-1:2181,my-zk-2:2181,my-zk-3:2181 (will default to ZooKeeper when the schema is not specified)\n" + - " * zk:my-zk-1:2181,my-zk-2:2181,my-zk-3:2181/my-chroot-path (to add a ZK chroot path)\n" + doc = "The metadata store URL. \n" + + " Examples: \n" + + " * zk:my-zk-1:2181,my-zk-2:2181,my-zk-3:2181\n" + + " * my-zk-1:2181,my-zk-2:2181,my-zk-3:2181 (will default to ZooKeeper when the schema is not " + + "specified)\n" + + " * zk:my-zk-1:2181,my-zk-2:2181,my-zk-3:2181/my-chroot-path (to add a ZK chroot path)\n" ) private String metadataStoreUrl; @@ -205,7 +204,7 @@ public class ServiceConfiguration implements PulsarConfiguration { ) private String advertisedAddress; - @FieldContext(category=CATEGORY_SERVER, + @FieldContext(category = CATEGORY_SERVER, doc = "Used to specify multiple advertised listeners for the broker." + " The value must format as :pulsar://:," + "multiple listeners should separate with commas." @@ -213,13 +212,13 @@ public class ServiceConfiguration implements PulsarConfiguration { + "The Default value is absent means use advertisedAddress and brokerServicePort.") private String advertisedListeners; - @FieldContext(category=CATEGORY_SERVER, + @FieldContext(category = CATEGORY_SERVER, doc = "Used to specify the internal listener name for the broker." + "The listener name must contain in the advertisedListeners." + "The Default value is absent, the broker uses the first listener as the internal listener.") private String internalListenerName; - @FieldContext(category=CATEGORY_SERVER, + @FieldContext(category = CATEGORY_SERVER, doc = "Used to specify additional bind addresses for the broker." + " The value must format as :://:," + " multiple bind addresses should be separated with commas." @@ -228,7 +227,7 @@ public class ServiceConfiguration implements PulsarConfiguration { + " webServicePortTls properties define additional bindings.") private String bindAddresses; - @FieldContext(category=CATEGORY_SERVER, + @FieldContext(category = CATEGORY_SERVER, doc = "Enable or disable the proxy protocol.") private boolean haProxyProtocolEnabled; @@ -297,8 +296,10 @@ public class ServiceConfiguration implements PulsarConfiguration { @FieldContext(category = CATEGORY_SERVER, doc = "Whether to enable the delayed delivery for messages.") private boolean delayedDeliveryEnabled = true; - @FieldContext(category = CATEGORY_SERVER, doc = "Class name of the factory that implements the delayed deliver tracker") - private String delayedDeliveryTrackerFactoryClassName = "org.apache.pulsar.broker.delayed.InMemoryDelayedDeliveryTrackerFactory"; + @FieldContext(category = CATEGORY_SERVER, doc = "Class name of the factory that implements the delayed deliver " + + "tracker") + private String delayedDeliveryTrackerFactoryClassName = "org.apache.pulsar.broker.delayed" + + ".InMemoryDelayedDeliveryTrackerFactory"; @FieldContext(category = CATEGORY_SERVER, doc = "Control the tick time for when retrying on delayed delivery, " + " affecting the accuracy of the delivery time compared to the scheduled time. Default is 1 second.") @@ -367,7 +368,8 @@ public class ServiceConfiguration implements PulsarConfiguration { private boolean skipBrokerShutdownOnOOM = false; @FieldContext( category = CATEGORY_SERVER, - doc = "Amount of seconds to timeout when loading a topic. In situations with many geo-replicated clusters, this may need raised." + doc = "Amount of seconds to timeout when loading a topic. In situations with many geo-replicated clusters, " + + "this may need raised." ) private long topicLoadTimeoutSeconds = 60; @@ -411,8 +413,8 @@ public class ServiceConfiguration implements PulsarConfiguration { category = CATEGORY_POLICIES, doc = "Whether to enable precise time based backlog quota check. " + "Enabling precise time based backlog quota check will cause broker to read first entry in backlog " - + "of the slowest cursor on a ledger which will mostly result in reading entry from BookKeeper's " + - "disk which can have negative impact on overall performance. " + + "of the slowest cursor on a ledger which will mostly result in reading entry from BookKeeper's " + + "disk which can have negative impact on overall performance. " + "Disabling precise time based backlog quota check will just use the timestamp indicating when a " + "ledger was closed, which is of coarser granularity." ) @@ -441,8 +443,8 @@ public class ServiceConfiguration implements PulsarConfiguration { @FieldContext( category = CATEGORY_POLICIES, - doc = "Default per-topic backlog quota limit by time in second, less than 0 means no limitation. " + - "default is -1. Increase it if you want to allow larger msg backlog" + doc = "Default per-topic backlog quota limit by time in second, less than 0 means no limitation. " + + "default is -1. Increase it if you want to allow larger msg backlog" ) private int backlogQuotaDefaultLimitSecond = -1; @@ -454,7 +456,8 @@ public class ServiceConfiguration implements PulsarConfiguration { + "'producer_exception' Policy which throws javax.jms.ResourceAllocationException to the producer\n" + "'consumer_backlog_eviction' Policy which evicts the oldest message from the slowest consumer's backlog" ) - private BacklogQuota.RetentionPolicy backlogQuotaDefaultRetentionPolicy = BacklogQuota.RetentionPolicy.producer_request_hold; + private BacklogQuota.RetentionPolicy backlogQuotaDefaultRetentionPolicy = BacklogQuota.RetentionPolicy + .producer_request_hold; @FieldContext( category = CATEGORY_POLICIES, @@ -473,7 +476,8 @@ public class ServiceConfiguration implements PulsarConfiguration { category = CATEGORY_POLICIES, doc = "Metadata of inactive partitioned topic will not be automatically cleaned up by default.\n" + "Note: If `allowAutoTopicCreation` and this option are enabled at the same time,\n" - + "it may appear that a partitioned topic has just been deleted but is automatically created as a non-partitioned topic." + + "it may appear that a partitioned topic has just been deleted but is automatically created as a " + + "non-partitioned topic." ) private boolean brokerDeleteInactivePartitionedTopicMetadataEnabled = false; @FieldContext( @@ -485,11 +489,13 @@ public class ServiceConfiguration implements PulsarConfiguration { @FieldContext( category = CATEGORY_POLICIES, doc = "Set the inactive topic delete mode. Default is delete_when_no_subscriptions\n" - + "'delete_when_no_subscriptions' mode only delete the topic which has no subscriptions and no active producers\n" - + "'delete_when_subscriptions_caught_up' mode only delete the topic that all subscriptions has no backlogs(caught up)" - + "and no active producers/consumers" + + "'delete_when_no_subscriptions' mode only delete the topic which has no subscriptions and no active " + + "producers\n" + + "'delete_when_subscriptions_caught_up' mode only delete the topic that all subscriptions has no " + + "backlogs(caught up) and no active producers/consumers" ) - private InactiveTopicDeleteMode brokerDeleteInactiveTopicsMode = InactiveTopicDeleteMode.delete_when_no_subscriptions; + private InactiveTopicDeleteMode brokerDeleteInactiveTopicsMode = InactiveTopicDeleteMode. + delete_when_no_subscriptions; @FieldContext( category = CATEGORY_POLICIES, @@ -563,8 +569,8 @@ public class ServiceConfiguration implements PulsarConfiguration { private boolean subscriptionKeySharedEnable = true; @FieldContext(category = CATEGORY_POLICIES, - doc = "On KeyShared subscriptions, with default AUTO_SPLIT mode, use splitting ranges or " + - "consistent hashing to reassign keys to new consumers") + doc = "On KeyShared subscriptions, with default AUTO_SPLIT mode, use splitting ranges or " + + "consistent hashing to reassign keys to new consumers") private boolean subscriptionKeySharedUseConsistentHashing = false; @FieldContext( @@ -589,7 +595,8 @@ public class ServiceConfiguration implements PulsarConfiguration { @FieldContext( category = CATEGORY_POLICIES, - doc = "How often is the thread pool scheduled to check whether a snapshot needs to be taken.(disable with value 0)" + doc = "How often is the thread pool scheduled to check whether a snapshot needs to be taken." + + "(disable with value 0)" ) private int brokerDeduplicationSnapshotFrequencyInSeconds = 120; @FieldContext( @@ -924,8 +931,8 @@ public class ServiceConfiguration implements PulsarConfiguration { @FieldContext( category = CATEGORY_SERVER, - doc = "Whether to use streaming read dispatcher. Currently is in preview and can be changed " + - "in subsequent release." + doc = "Whether to use streaming read dispatcher. Currently is in preview and can be changed " + + "in subsequent release." ) private boolean streamingDispatch = false; @@ -1060,18 +1067,19 @@ public class ServiceConfiguration implements PulsarConfiguration { + " It's shared across all the topics running in the same broker.\n\n" + " Use -1 to disable the memory limitation. Default is 1/2 of direct memory.\n\n") private int maxMessagePublishBufferSizeInMB = Math.max(64, - (int) (PlatformDependent.maxDirectMemory() / 2 / (1024 * 1024))); + (int) (io.netty.util.internal.PlatformDependent.maxDirectMemory() / 2 / (1024 * 1024))); @FieldContext( category = CATEGORY_SERVER, - doc = "Interval between checks to see if message publish buffer size is exceed the max message publish buffer size" + doc = "Interval between checks to see if message publish buffer size is exceed the max message publish " + + "buffer size" ) private int messagePublishBufferCheckIntervalInMillis = 100; - @FieldContext(category = CATEGORY_SERVER, doc = "Whether to recover cursors lazily when trying to recover a " + - "managed ledger backing a persistent topic. It can improve write availability of topics.\n" + - "The caveat is now when recovered ledger is ready to write we're not sure if all old consumers last mark " + - "delete position can be recovered or not.") + @FieldContext(category = CATEGORY_SERVER, doc = "Whether to recover cursors lazily when trying to recover a " + + "managed ledger backing a persistent topic. It can improve write availability of topics.\n" + + "The caveat is now when recovered ledger is ready to write we're not sure if all old consumers last mark " + + "delete position can be recovered or not.") private boolean lazyCursorRecovery = false; @FieldContext( @@ -1111,7 +1119,8 @@ public class ServiceConfiguration implements PulsarConfiguration { private Set brokerEntryPayloadProcessors = new LinkedHashSet<>(); @FieldContext( - doc = "There are two policies to apply when broker metadata session expires: session expired happens, \"shutdown\" or \"reconnect\". \n\n" + doc = "There are two policies to apply when broker metadata session expires: session expired happens, " + + "\"shutdown\" or \"reconnect\". \n\n" + " With \"shutdown\", the broker will be restarted.\n\n" + " With \"reconnect\", the broker will keep serving the topics, while attempting to recreate a new session." ) @@ -1124,7 +1133,7 @@ public class ServiceConfiguration implements PulsarConfiguration { ) private int topicFencingTimeoutSeconds = 0; - /**** --- Messaging Protocols --- ****/ + /**** --- Messaging Protocol. --- ****/ @FieldContext( category = CATEGORY_PROTOCOLS, @@ -1158,8 +1167,8 @@ public class ServiceConfiguration implements PulsarConfiguration { @FieldContext( category = CATEGORY_SERVER, - doc = "Enable or disable topic level policies, topic level policies depends on the system topic, " + - "please enable the system topic first.") + doc = "Enable or disable topic level policies, topic level policies depends on the system topic, " + + "please enable the system topic first.") private boolean topicLevelPoliciesEnabled = false; @FieldContext( @@ -1174,12 +1183,13 @@ public class ServiceConfiguration implements PulsarConfiguration { @FieldContext( category = CATEGORY_SERVER, - doc = "Enable namespaceIsolation policy update take effect ontime or not," + - " if set to ture, then the related namespaces will be unloaded after reset policy to make it take effect." + doc = "Enable namespaceIsolation policy update take effect ontime or not," + + " if set to ture, then the related namespaces will be unloaded after reset policy to make it " + + "take effect." ) private boolean enableNamespaceIsolationUpdateOnTime = false; - /***** --- TLS --- ****/ + /***** --- TLS. --- ****/ @FieldContext( category = CATEGORY_TLS, doc = "Enable TLS" @@ -1229,7 +1239,7 @@ public class ServiceConfiguration implements PulsarConfiguration { + "the Connection if the Client Certificate is not trusted") private boolean tlsRequireTrustedClientCertOnConnect = false; - /***** --- Authentication --- ****/ + /***** --- Authentication. --- ****/ @FieldContext( category = CATEGORY_AUTHENTICATION, doc = "Enable authentication" @@ -1332,7 +1342,8 @@ public class ServiceConfiguration implements PulsarConfiguration { @FieldContext( category = CATEGORY_HTTP, - doc = "Max HTTP requests per seconds allowed. The excess of requests will be rejected with HTTP code 429 (Too many requests)" + doc = "Max HTTP requests per seconds allowed. The excess of requests will be rejected with HTTP code 429 " + + "(Too many requests)" ) private double httpRequestsMaxPerSecond = 100.0; @@ -1355,7 +1366,7 @@ public class ServiceConfiguration implements PulsarConfiguration { ) private String kinitCommand = "/usr/bin/kinit"; - /**** --- BookKeeper Client --- ****/ + /**** --- BookKeeper Client. --- ****/ @FieldContext( category = CATEGORY_STORAGE_BK, doc = "Metadata service uri that bookkeeper is used for loading corresponding metadata driver" @@ -1424,8 +1435,8 @@ public class ServiceConfiguration implements PulsarConfiguration { private long bookkeeperClientHealthCheckQuarantineTimeInSeconds = 1800; @FieldContext( category = CATEGORY_STORAGE_BK, - doc = "bookie quarantine ratio to avoid all clients quarantine " + - "the high pressure bookie servers at the same time" + doc = "bookie quarantine ratio to avoid all clients quarantine " + + "the high pressure bookie servers at the same time" ) private double bookkeeperClientQuarantineRatio = 1.0; @FieldContext( @@ -1445,8 +1456,9 @@ public class ServiceConfiguration implements PulsarConfiguration { private int bookkeeperClientMinNumRacksPerWriteQuorum = 2; @FieldContext( category = CATEGORY_STORAGE_BK, - doc = "Enforces rack-aware bookie selection policy to pick bookies from 'bookkeeperClientMinNumRacksPerWriteQuorum' racks for " - + "a writeQuorum. \n\nIf BK can't find bookie then it would throw BKNotEnoughBookiesException instead of picking random one.") + doc = "Enforces rack-aware bookie selection policy to pick bookies from " + + "'bookkeeperClientMinNumRacksPerWriteQuorum' racks for a writeQuorum. \n\n" + + "If BK can't find bookie then it would throw BKNotEnoughBookiesException instead of picking random one.") private boolean bookkeeperClientEnforceMinNumRacksPerWriteQuorum = false; @FieldContext( category = CATEGORY_STORAGE_BK, @@ -1461,7 +1473,8 @@ public class ServiceConfiguration implements PulsarConfiguration { @FieldContext( category = CATEGORY_STORAGE_BK, required = false, - doc = "Enable bookie secondary-isolation group if bookkeeperClientIsolationGroups doesn't have enough bookie available." + doc = "Enable bookie secondary-isolation group if bookkeeperClientIsolationGroups doesn't have enough " + + "bookie available." ) private String bookkeeperClientSecondaryIsolationGroups; @@ -1471,10 +1484,10 @@ public class ServiceConfiguration implements PulsarConfiguration { @FieldContext(category = CATEGORY_STORAGE_BK, doc = "Set the interval to retry a failed bookie info lookup") private int bookkeeperClientGetBookieInfoRetryIntervalSeconds = 60; - @FieldContext(category = CATEGORY_STORAGE_BK, doc = "Enable/disable having read operations for a ledger to be sticky to " - + "a single bookie.\n" + - "If this flag is enabled, the client will use one single bookie (by " + - "preference) to read all entries for a ledger.") + @FieldContext(category = CATEGORY_STORAGE_BK, doc = "Enable/disable having read operations for a ledger to be " + + "sticky to a single bookie.\n" + + "If this flag is enabled, the client will use one single bookie (by " + + "preference) to read all entries for a ledger.") private boolean bookkeeperEnableStickyReads = true; @FieldContext(category = CATEGORY_STORAGE_BK, doc = "Set the client security provider factory class name. " @@ -1507,13 +1520,15 @@ public class ServiceConfiguration implements PulsarConfiguration { @FieldContext(category = CATEGORY_STORAGE_BK, doc = "Path for the trusted TLS certificate file") private String bookkeeperTLSTrustCertsFilePath; - @FieldContext(category = CATEGORY_STORAGE_BK, doc = "Tls cert refresh duration at bookKeeper-client in seconds (0 to disable check)") + @FieldContext(category = CATEGORY_STORAGE_BK, doc = "Tls cert refresh duration at bookKeeper-client in seconds (0 " + + "to disable check)") private int bookkeeperTlsCertFilesRefreshDurationSeconds = 300; @FieldContext(category = CATEGORY_STORAGE_BK, doc = "Enable/disable disk weight based placement. Default is false") private boolean bookkeeperDiskWeightBasedPlacementEnabled = false; - @FieldContext(category = CATEGORY_STORAGE_BK, doc = "Set the interval to check the need for sending an explicit LAC") + @FieldContext(category = CATEGORY_STORAGE_BK, doc = "Set the interval to check the need for sending an explicit " + + "LAC") private int bookkeeperExplicitLacIntervalInMills = 0; @FieldContext( @@ -1535,7 +1550,7 @@ public class ServiceConfiguration implements PulsarConfiguration { private int bookkeeperClientNumWorkerThreads = Runtime.getRuntime().availableProcessors(); - /**** --- Managed Ledger --- ****/ + /**** --- Managed Ledger. --- ****/ @FieldContext( minValue = 1, category = CATEGORY_STORAGE_ML, @@ -1557,7 +1572,8 @@ public class ServiceConfiguration implements PulsarConfiguration { @FieldContext(minValue = 1, category = CATEGORY_STORAGE_ML, - doc = "How frequently to flush the cursor positions that were accumulated due to rate limiting. (seconds). Default is 60 seconds") + doc = "How frequently to flush the cursor positions that were accumulated due to rate limiting. (seconds)." + + " Default is 60 seconds") private int managedLedgerCursorPositionFlushSeconds = 60; @FieldContext(minValue = 1, @@ -1604,8 +1620,9 @@ public class ServiceConfiguration implements PulsarConfiguration { + " memory is allocated from JVM direct memory and it's shared across all the topics" + " running in the same broker. By default, uses 1/5th of available direct memory") private int managedLedgerCacheSizeMB = Math.max(64, - (int) (PlatformDependent.maxDirectMemory() / 5 / (1024 * 1024))); - @FieldContext(category = CATEGORY_STORAGE_ML, doc = "Whether we should make a copy of the entry payloads when inserting in cache") + (int) (io.netty.util.internal.PlatformDependent.maxDirectMemory() / 5 / (1024 * 1024))); + @FieldContext(category = CATEGORY_STORAGE_ML, doc = "Whether we should make a copy of the entry payloads when " + + "inserting in cache") private boolean managedLedgerCacheCopyEntries = false; @FieldContext( category = CATEGORY_STORAGE_ML, @@ -1629,7 +1646,7 @@ public class ServiceConfiguration implements PulsarConfiguration { private double managedLedgerDefaultMarkDeleteRateLimit = 1.0; @FieldContext( category = CATEGORY_STORAGE_ML, - doc = "Allow automated creation of topics if set to true (default value)." + doc = "Allow automated creation of topics if set to true (default value)." ) private boolean allowAutoTopicCreation = true; @FieldContext( @@ -1784,7 +1801,7 @@ public class ServiceConfiguration implements PulsarConfiguration { + "If value is invalid or NONE, then save the ManagedLedgerInfo bytes data directly.") private String managedLedgerInfoCompressionType = "NONE"; - /*** --- Load balancer --- ****/ + /*** --- Load balancer. --- ****/ @FieldContext( category = CATEGORY_LOAD_BALANCER, doc = "Enable load balancer" @@ -1794,14 +1811,16 @@ public class ServiceConfiguration implements PulsarConfiguration { @FieldContext( category = CATEGORY_LOAD_BALANCER, deprecated = true, - doc = "load placement strategy[weightedRandomSelection/leastLoadedServer] (only used by SimpleLoadManagerImpl)" + doc = "load placement strategy[weightedRandomSelection/leastLoadedServer] (only used by " + + "SimpleLoadManagerImpl)" ) private String loadBalancerPlacementStrategy = "leastLoadedServer"; // weighted random selection @FieldContext( category = CATEGORY_LOAD_BALANCER, dynamic = true, - doc = "load balance load shedding strategy (It requires broker restart if value is changed using dynamic config)" + doc = "load balance load shedding strategy (It requires broker restart if value is changed using dynamic " + + "config)" ) private String loadBalancerLoadSheddingStrategy = "org.apache.pulsar.broker.loadbalance.impl.OverloadShedder"; @@ -2006,7 +2025,8 @@ public class ServiceConfiguration implements PulsarConfiguration { category = CATEGORY_LOAD_BALANCER, doc = "Supported algorithms name for namespace bundle split" ) - private List supportedNamespaceBundleSplitAlgorithms = Lists.newArrayList("range_equally_divide", "topic_count_equally_divide"); + private List supportedNamespaceBundleSplitAlgorithms = Lists.newArrayList("range_equally_divide", + "topic_count_equally_divide"); @FieldContext( dynamic = true, category = CATEGORY_LOAD_BALANCER, @@ -2026,7 +2046,7 @@ public class ServiceConfiguration implements PulsarConfiguration { ) private long namespaceBundleUnloadingTimeoutMs = 60000; - /**** --- Replication --- ****/ + /**** --- Replication. --- ****/ @FieldContext( category = CATEGORY_REPLICATION, doc = "Enable replication metrics" @@ -2091,8 +2111,8 @@ public class ServiceConfiguration implements PulsarConfiguration { @FieldContext( category = CATEGORY_POLICIES, deprecated = true, - doc = "How often broker checks for inactive topics to be deleted (topics with no subscriptions and no one connected)" - + "Deprecated in favor of using `brokerDeleteInactiveTopicsFrequencySeconds`" + doc = "How often broker checks for inactive topics to be deleted (topics with no subscriptions and no one " + + "connected) Deprecated in favor of using `brokerDeleteInactiveTopicsFrequencySeconds`" ) private int brokerServicePurgeInactiveFrequencyInSeconds = 60; @FieldContext( @@ -2102,7 +2122,7 @@ public class ServiceConfiguration implements PulsarConfiguration { private List bootstrapNamespaces = new ArrayList(); @ToString.Exclude - @JsonIgnore + @com.fasterxml.jackson.annotation.JsonIgnore private Properties properties = new Properties(); @FieldContext( @@ -2128,8 +2148,8 @@ public class ServiceConfiguration implements PulsarConfiguration { @FieldContext( category = CATEGORY_SERVER, - doc = "Timeout for the compaction phase one loop, If the execution time of the compaction " + - "phase one loop exceeds this time, the compaction will not proceed." + doc = "Timeout for the compaction phase one loop, If the execution time of the compaction " + + "phase one loop exceeds this time, the compaction will not proceed." ) private long brokerServiceCompactionPhaseOneLoopTimeInSeconds = 30; @@ -2146,7 +2166,8 @@ public class ServiceConfiguration implements PulsarConfiguration { category = CATEGORY_SCHEMA, doc = "The schema storage implementation used by this broker" ) - private String schemaRegistryStorageClassName = "org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorageFactory"; + private String schemaRegistryStorageClassName = "org.apache.pulsar.broker.service.schema" + + ".BookkeeperSchemaStorageFactory"; @FieldContext( category = CATEGORY_SCHEMA, @@ -2165,7 +2186,7 @@ public class ServiceConfiguration implements PulsarConfiguration { ) private SchemaCompatibilityStrategy schemaCompatibilityStrategy = SchemaCompatibilityStrategy.UNDEFINED; - /**** --- WebSocket --- ****/ + /**** --- WebSocket. --- ****/ @FieldContext( category = CATEGORY_WEBSOCKET, doc = "Number of IO threads in Pulsar Client used in WebSocket proxy" @@ -2188,7 +2209,7 @@ public class ServiceConfiguration implements PulsarConfiguration { ) private int webSocketMaxTextFrameSize = 1048576; - /**** --- Metrics --- ****/ + /**** --- Metrics. --- ****/ @FieldContext( category = CATEGORY_METRICS, doc = "If true, export topic level metrics otherwise namespace level" @@ -2221,35 +2242,35 @@ public class ServiceConfiguration implements PulsarConfiguration { @FieldContext( category = CATEGORY_METRICS, - doc = "Enable expose the precise backlog stats.\n" + - " Set false to use published counter and consumed counter to calculate,\n" + - " this would be more efficient but may be inaccurate. Default is false." + doc = "Enable expose the precise backlog stats.\n" + + " Set false to use published counter and consumed counter to calculate,\n" + + " this would be more efficient but may be inaccurate. Default is false." ) private boolean exposePreciseBacklogInPrometheus = false; @FieldContext( category = CATEGORY_METRICS, - doc = "Time in milliseconds that metrics endpoint would time out. Default is 30s.\n" + - " Increase it if there are a lot of topics to expose topic-level metrics.\n" + - " Set it to 0 to disable timeout." + doc = "Time in milliseconds that metrics endpoint would time out. Default is 30s.\n" + + " Increase it if there are a lot of topics to expose topic-level metrics.\n" + + " Set it to 0 to disable timeout." ) private long metricsServletTimeoutMs = 30000; @FieldContext( category = CATEGORY_METRICS, - doc = "Enable expose the backlog size for each subscription when generating stats.\n" + - " Locking is used for fetching the status so default to false." + doc = "Enable expose the backlog size for each subscription when generating stats.\n" + + " Locking is used for fetching the status so default to false." ) private boolean exposeSubscriptionBacklogSizeInPrometheus = false; @FieldContext( category = CATEGORY_METRICS, - doc = "Enable splitting topic and partition label in Prometheus.\n" + - " If enabled, a topic name will split into 2 parts, one is topic name without partition index,\n" + - " another one is partition index, e.g. (topic=xxx, partition=0).\n" + - " If the topic is a non-partitioned topic, -1 will be used for the partition index.\n" + - " If disabled, one label to represent the topic and partition, e.g. (topic=xxx-partition-0)\n" + - " Default is false." + doc = "Enable splitting topic and partition label in Prometheus.\n" + + " If enabled, a topic name will split into 2 parts, one is topic name without partition index,\n" + + " another one is partition index, e.g. (topic=xxx, partition=0).\n" + + " If the topic is a non-partitioned topic, -1 will be used for the partition index.\n" + + " If disabled, one label to represent the topic and partition, e.g. (topic=xxx-partition-0)\n" + + " Default is false." ) private boolean splitTopicAndPartitionLabelInPrometheus = false; @@ -2259,7 +2280,7 @@ public class ServiceConfiguration implements PulsarConfiguration { ) private boolean exposeBundlesMetricsInPrometheus = false; - /**** --- Functions --- ****/ + /**** --- Functions. --- ****/ @FieldContext( category = CATEGORY_FUNCTIONS, doc = "Flag indicates enabling or disabling function worker on brokers" @@ -2272,7 +2293,7 @@ public class ServiceConfiguration implements PulsarConfiguration { ) private String functionsWorkerServiceNarPackage = ""; - /**** --- Broker Web Stats --- ****/ + /**** --- Broker Web Stats. --- ****/ @FieldContext( category = CATEGORY_METRICS, doc = "If true, export publisher stats when returning topics stats from the admin rest api" @@ -2289,7 +2310,7 @@ public class ServiceConfiguration implements PulsarConfiguration { ) private int statsUpdateInitialDelayInSecs = 60; - /**** --- Ledger Offloading --- ****/ + /**** --- Ledger Offloading. --- ****/ /**** * NOTES: all implementation related settings should be put in implementation package. * only common settings like driver name, io threads can be added here. @@ -2324,7 +2345,7 @@ public class ServiceConfiguration implements PulsarConfiguration { ) private int managedLedgerOffloadPrefetchRounds = 1; - /**** --- Transaction config variables --- ****/ + /**** --- Transaction config variables. --- ****/ @FieldContext( category = CATEGORY_TRANSACTION, doc = "Enable transaction coordinator in broker" @@ -2371,7 +2392,7 @@ public class ServiceConfiguration implements PulsarConfiguration { ) private int transactionBufferSnapshotMinTimeInMillis = 5000; - /**** --- KeyStore TLS config variables --- ****/ + /**** --- KeyStore TLS config variables. --- ****/ @FieldContext( category = CATEGORY_KEYSTORE_TLS, doc = "Enable TLS with KeyStore type configuration in broker" @@ -2420,7 +2441,7 @@ public class ServiceConfiguration implements PulsarConfiguration { ) private String tlsTrustStorePassword = null; - /**** --- KeyStore TLS config variables used for internal client/admin to auth with other broker--- ****/ + /**** --- KeyStore TLS config variables used for internal client/admin to auth with other broker. --- ****/ @FieldContext( category = CATEGORY_KEYSTORE_TLS, doc = "Whether internal client use KeyStore type to authenticate with other Pulsar brokers" @@ -2479,7 +2500,8 @@ public class ServiceConfiguration implements PulsarConfiguration { category = CATEGORY_PACKAGES_MANAGEMENT, doc = "The packages management service storage service provider" ) - private String packagesManagementStorageProvider = "org.apache.pulsar.packages.management.storage.bookkeeper.BookKeeperPackagesStorageProvider"; + private String packagesManagementStorageProvider = "org.apache.pulsar.packages.management.storage.bookkeeper" + + ".BookKeeperPackagesStorageProvider"; @FieldContext( category = CATEGORY_PACKAGES_MANAGEMENT, diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataCommand.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataCommand.java index efc332968afb2..02fc76f60e129 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataCommand.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataCommand.java @@ -20,7 +20,6 @@ import java.net.SocketAddress; import java.security.cert.Certificate; - import javax.net.ssl.SSLPeerUnverifiedException; import javax.net.ssl.SSLSession; import lombok.extern.slf4j.Slf4j; diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataHttp.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataHttp.java index 958e5eab9c462..9d8ab42b467c9 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataHttp.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataHttp.java @@ -20,7 +20,6 @@ import java.net.InetSocketAddress; import java.net.SocketAddress; - import javax.servlet.http.HttpServletRequest; public class AuthenticationDataHttp implements AuthenticationDataSource { diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataHttps.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataHttps.java index 4e1d33b5ec5e7..ec4dc292c13ce 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataHttps.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataHttps.java @@ -19,7 +19,6 @@ package org.apache.pulsar.broker.authentication; import java.security.cert.X509Certificate; - import javax.servlet.http.HttpServletRequest; public class AuthenticationDataHttps extends AuthenticationDataHttp { diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataSource.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataSource.java index 59e7f6bdcf103..453fafbab3244 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataSource.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataSource.java @@ -24,7 +24,7 @@ import org.apache.pulsar.common.api.AuthData; /** - * Interface for accessing data which are used in variety of authentication schemes on server side + * Interface for accessing data which are used in variety of authentication schemes on server side. */ public interface AuthenticationDataSource { /* @@ -63,7 +63,7 @@ default boolean hasDataFromHttp() { /** * - * @return a authentication scheme, or null if the request is not be authenticated + * @return a authentication scheme, or null if the request is not be authenticated. */ default String getHttpAuthType() { return null; @@ -141,16 +141,16 @@ default boolean hasSubscription() { } /** - * Subscription name can be necessary for consumption + * Subscription name can be necessary for consumption. * * @return a String containing the subscription name */ - default String getSubscription() { return null; } + default String getSubscription() { + return null; + } /** - * Subscription name can be necessary for consumption - * - * @return a String containing the subscription name + * Subscription name can be necessary for consumption. */ default void setSubscription(String subscription) { }; } diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProvider.java index c0716198a71af..6a25cc57db2c5 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProvider.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProvider.java @@ -20,27 +20,24 @@ import java.io.Closeable; import java.io.IOException; - import java.net.SocketAddress; import java.util.concurrent.CompletableFuture; import javax.naming.AuthenticationException; - import javax.net.ssl.SSLSession; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; - import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.common.api.AuthData; import org.apache.pulsar.common.classification.InterfaceStability; import org.apache.pulsar.common.util.FutureUtil; /** - * Provider of authentication mechanism + * Provider of authentication mechanism. */ public interface AuthenticationProvider extends Closeable { /** - * Perform initialization for the authentication provider + * Perform initialization for the authentication provider. * * @param config * broker config object diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderBasic.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderBasic.java index 46c1e3a36de6d..631659c24b518 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderBasic.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderBasic.java @@ -19,25 +19,23 @@ package org.apache.pulsar.broker.authentication; +import java.io.BufferedReader; +import java.io.File; +import java.io.FileReader; +import java.io.IOException; import java.util.Arrays; import java.util.Base64; import java.util.HashMap; import java.util.List; import java.util.Map; +import javax.naming.AuthenticationException; +import lombok.Cleanup; import org.apache.commons.codec.digest.Crypt; import org.apache.commons.codec.digest.Md5Crypt; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.broker.ServiceConfiguration; - -import lombok.Cleanup; import org.apache.pulsar.broker.authentication.metrics.AuthenticationMetrics; -import javax.naming.AuthenticationException; -import java.io.BufferedReader; -import java.io.File; -import java.io.FileReader; -import java.io.IOException; - public class AuthenticationProviderBasic implements AuthenticationProvider { private static final String HTTP_HEADER_NAME = "Authorization"; private static final String CONF_SYSTEM_PROPERTY_KEY = "pulsar.auth.basic.conf"; @@ -99,7 +97,8 @@ public String authenticate(AuthenticationDataSource authData) throws Authenticat throw new AuthenticationException(msg); } } catch (AuthenticationException exception) { - AuthenticationMetrics.authenticateFailure(getClass().getSimpleName(), getAuthMethodName(), exception.getMessage()); + AuthenticationMetrics.authenticateFailure(getClass().getSimpleName(), getAuthMethodName(), + exception.getMessage()); throw exception; } AuthenticationMetrics.authenticateSuccess(getClass().getSimpleName(), getAuthMethodName()); diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderList.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderList.java index 9ec1c2eb706cc..ecb2a078e52d6 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderList.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderList.java @@ -36,16 +36,16 @@ @Slf4j public class AuthenticationProviderList implements AuthenticationProvider { - private interface AuthProcessor { + private interface AuthProcessor { - T apply(P process) throws AuthenticationException; + T apply(W process) throws AuthenticationException; } - static T applyAuthProcessor(List

processors, AuthProcessor authFunc) + static T applyAuthProcessor(List processors, AuthProcessor authFunc) throws AuthenticationException { AuthenticationException authenticationException = null; - for (P ap : processors) { + for (W ap : processors) { try { return authFunc.apply(ap); } catch (AuthenticationException ae) { diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderTls.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderTls.java index 5747a828d8fe1..4cfcad6a68695 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderTls.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderTls.java @@ -21,9 +21,7 @@ import java.io.IOException; import java.security.cert.Certificate; import java.security.cert.X509Certificate; - import javax.naming.AuthenticationException; - import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.authentication.metrics.AuthenticationMetrics; @@ -50,8 +48,8 @@ public String authenticate(AuthenticationDataSource authData) throws Authenticat try { if (authData.hasDataFromTls()) { /** - * Maybe authentication type should be checked if it is an HTTPS session. However this check fails actually - * because authType is null. + * Maybe authentication type should be checked if it is an HTTPS session. However this check fails + * actually because authType is null. * * This check is not necessarily needed, because an untrusted certificate is not passed to * HttpServletRequest. @@ -91,7 +89,8 @@ public String authenticate(AuthenticationDataSource authData) throws Authenticat } AuthenticationMetrics.authenticateSuccess(getClass().getSimpleName(), getAuthMethodName()); } catch (AuthenticationException exception) { - AuthenticationMetrics.authenticateFailure(getClass().getSimpleName(), getAuthMethodName(), exception.getMessage()); + AuthenticationMetrics.authenticateFailure(getClass().getSimpleName(), getAuthMethodName(), + exception.getMessage()); throw exception; } return commonName; diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderToken.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderToken.java index 3801aecb5fa6a..451c63fb807da 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderToken.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderToken.java @@ -19,35 +19,31 @@ package org.apache.pulsar.broker.authentication; import static java.nio.charset.StandardCharsets.UTF_8; - +import com.google.common.annotations.VisibleForTesting; +import io.jsonwebtoken.Claims; +import io.jsonwebtoken.ExpiredJwtException; +import io.jsonwebtoken.Jwt; +import io.jsonwebtoken.JwtException; +import io.jsonwebtoken.JwtParser; +import io.jsonwebtoken.Jwts; +import io.jsonwebtoken.RequiredTypeException; +import io.jsonwebtoken.SignatureAlgorithm; +import io.jsonwebtoken.security.SignatureException; +import io.prometheus.client.Counter; +import io.prometheus.client.Histogram; import java.io.IOException; import java.net.SocketAddress; import java.security.Key; - import java.util.Date; import java.util.List; import javax.naming.AuthenticationException; import javax.net.ssl.SSLSession; - -import com.google.common.annotations.VisibleForTesting; -import io.jsonwebtoken.ExpiredJwtException; -import io.jsonwebtoken.RequiredTypeException; -import io.jsonwebtoken.JwtParser; -import io.prometheus.client.Counter; -import io.prometheus.client.Histogram; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.authentication.metrics.AuthenticationMetrics; import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils; import org.apache.pulsar.common.api.AuthData; -import io.jsonwebtoken.Claims; -import io.jsonwebtoken.Jwt; -import io.jsonwebtoken.JwtException; -import io.jsonwebtoken.Jwts; -import io.jsonwebtoken.SignatureAlgorithm; -import io.jsonwebtoken.security.SignatureException; - public class AuthenticationProviderToken implements AuthenticationProvider { static final String HTTP_HEADER_NAME = "Authorization"; @@ -135,9 +131,9 @@ public void initialize(ServiceConfiguration config) throws IOException, IllegalA this.parser = Jwts.parserBuilder().setSigningKey(this.validationKey).build(); - if (audienceClaim != null && audience == null ) { + if (audienceClaim != null && audience == null) { throw new IllegalArgumentException("Token Audience Claim [" + audienceClaim - + "] configured, but Audience stands for this broker not."); + + "] configured, but Audience stands for this broker not."); } } @@ -157,7 +153,8 @@ public String authenticate(AuthenticationDataSource authData) throws Authenticat AuthenticationMetrics.authenticateSuccess(getClass().getSimpleName(), getAuthMethodName()); return role; } catch (AuthenticationException exception) { - AuthenticationMetrics.authenticateFailure(getClass().getSimpleName(), getAuthMethodName(), exception.getMessage()); + AuthenticationMetrics.authenticateFailure(getClass().getSimpleName(), getAuthMethodName(), + exception.getMessage()); throw exception; } } @@ -226,7 +223,8 @@ private Jwt authenticateToken(final String token) throws Authenticati } if (jwt.getBody().getExpiration() != null) { - expiringTokenMinutesMetrics.observe((double) (jwt.getBody().getExpiration().getTime() - new Date().getTime()) / (60 * 1000)); + expiringTokenMinutesMetrics.observe( + (double) (jwt.getBody().getExpiration().getTime() - new Date().getTime()) / (60 * 1000)); } return jwt; } catch (JwtException e) { diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationService.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationService.java index 620dee3fb159a..d3668204a7869 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationService.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationService.java @@ -28,7 +28,6 @@ import java.util.stream.Collectors; import javax.naming.AuthenticationException; import javax.servlet.http.HttpServletRequest; - import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.ServiceConfiguration; @@ -36,7 +35,7 @@ import org.slf4j.LoggerFactory; /** - * Authentication service + * Authentication service. * */ public class AuthenticationService implements Closeable { @@ -100,7 +99,8 @@ public String authenticateHttpRequest(HttpServletRequest request) throws Authent return providerToUse.authenticate(authData); } catch (AuthenticationException e) { if (LOG.isDebugEnabled()) { - LOG.debug("Authentication failed for provider " + providerToUse.getAuthMethodName() + ": " + e.getMessage(), e); + LOG.debug("Authentication failed for provider " + providerToUse.getAuthMethodName() + " : " + + e.getMessage(), e); } // Store the exception so we can throw it later instead of a generic one authenticationException = e; @@ -112,7 +112,8 @@ public String authenticateHttpRequest(HttpServletRequest request) throws Authent return provider.authenticate(authData); } catch (AuthenticationException e) { if (LOG.isDebugEnabled()) { - LOG.debug("Authentication failed for provider " + provider.getAuthMethodName() + ": " + e.getMessage(), e); + LOG.debug("Authentication failed for provider " + provider.getAuthMethodName() + ": " + + e.getMessage(), e); } // Ignore the exception because we don't know which authentication method is expected here. } diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationState.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationState.java index d6755a10f193e..0e5dcc3121f27 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationState.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationState.java @@ -19,13 +19,11 @@ package org.apache.pulsar.broker.authentication; +import java.util.concurrent.CompletableFuture; import javax.naming.AuthenticationException; - import org.apache.pulsar.common.api.AuthData; import org.apache.pulsar.common.util.FutureUtil; -import java.util.concurrent.CompletableFuture; - /** * Interface for authentication state. * @@ -89,7 +87,7 @@ default CompletableFuture authenticateAsync(AuthData authData) { boolean isComplete(); /** - * Get AuthenticationState ID + * Get AuthenticationState ID. */ default long getStateId() { return -1L; diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/OneStageAuthenticationState.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/OneStageAuthenticationState.java index f2667c3b46c52..f2cb251eaf82b 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/OneStageAuthenticationState.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/OneStageAuthenticationState.java @@ -19,13 +19,12 @@ package org.apache.pulsar.broker.authentication; +import static java.nio.charset.StandardCharsets.UTF_8; import java.net.SocketAddress; import javax.naming.AuthenticationException; import javax.net.ssl.SSLSession; import org.apache.pulsar.common.api.AuthData; -import static java.nio.charset.StandardCharsets.UTF_8; - /** * Interface for authentication state. * diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/metrics/AuthenticationMetrics.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/metrics/AuthenticationMetrics.java index 1fd803f6ae566..60565db521d89 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/metrics/AuthenticationMetrics.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/metrics/AuthenticationMetrics.java @@ -33,7 +33,7 @@ public class AuthenticationMetrics { .register(); /** - * Log authenticate success event to the authentication metrics + * Log authenticate success event to the authentication metrics. * @param providerName The short class name of the provider * @param authMethod Authentication method name */ diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/metrics/package-info.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/metrics/package-info.java new file mode 100644 index 0000000000000..068c693b9c395 --- /dev/null +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/metrics/package-info.java @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/** + * Pulsar Client API. + */ +package org.apache.pulsar.broker.authentication.metrics; diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/package-info.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/package-info.java new file mode 100644 index 0000000000000..e4432e5951756 --- /dev/null +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/package-info.java @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/** + * Pulsar Client API. + */ +package org.apache.pulsar.broker.authentication; diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/utils/AuthTokenUtils.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/utils/AuthTokenUtils.java index 376e322eeff1f..8e1fd6696cf48 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/utils/AuthTokenUtils.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/utils/AuthTokenUtils.java @@ -19,7 +19,6 @@ package org.apache.pulsar.broker.authentication.utils; import com.google.common.io.ByteStreams; - import io.jsonwebtoken.JwtBuilder; import io.jsonwebtoken.Jwts; import io.jsonwebtoken.SignatureAlgorithm; @@ -27,7 +26,6 @@ import io.jsonwebtoken.io.DecodingException; import io.jsonwebtoken.io.Encoders; import io.jsonwebtoken.security.Keys; - import java.io.IOException; import java.io.InputStream; import java.nio.file.Files; @@ -40,11 +38,8 @@ import java.security.spec.X509EncodedKeySpec; import java.util.Date; import java.util.Optional; - import javax.crypto.SecretKey; - import lombok.experimental.UtilityClass; - import org.apache.commons.codec.binary.Base64; import org.apache.pulsar.client.api.url.URL; diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/utils/package-info.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/utils/package-info.java new file mode 100644 index 0000000000000..5492a2dea9d6c --- /dev/null +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/utils/package-info.java @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/** + * Pulsar Client API. + */ +package org.apache.pulsar.broker.authentication.utils; diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java index d4da9bf2187b2..bc0d4b7f20be2 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java @@ -23,31 +23,30 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; - import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; import org.apache.pulsar.broker.cache.ConfigurationCacheService; import org.apache.pulsar.broker.resources.PulsarResources; -import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.AuthAction; +import org.apache.pulsar.common.policies.data.NamespaceOperation; import org.apache.pulsar.common.policies.data.PolicyName; import org.apache.pulsar.common.policies.data.PolicyOperation; import org.apache.pulsar.common.policies.data.TenantInfo; -import org.apache.pulsar.common.policies.data.NamespaceOperation; import org.apache.pulsar.common.policies.data.TenantOperation; import org.apache.pulsar.common.policies.data.TopicOperation; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.RestException; /** - * Provider of authorization mechanism + * Provider of authorization mechanism. */ public interface AuthorizationProvider extends Closeable { /** - * Check if specified role is a super user + * Check if specified role is a super user. * @param role the role to check * @param authenticationData authentication data related to the role * @return a CompletableFuture containing a boolean in which true means the role is a super user @@ -73,7 +72,7 @@ default CompletableFuture isSuperUser(String role, ServiceConfiguration } /** - * Check if specified role is an admin of the tenant + * Check if specified role is an admin of the tenant. * @param tenant the tenant to check * @param role the role to check * @return a CompletableFuture containing a boolean in which true means the role is an admin user @@ -81,11 +80,12 @@ default CompletableFuture isSuperUser(String role, ServiceConfiguration */ default CompletableFuture isTenantAdmin(String tenant, String role, TenantInfo tenantInfo, AuthenticationDataSource authenticationData) { - return CompletableFuture.completedFuture(role != null && tenantInfo.getAdminRoles() != null && tenantInfo.getAdminRoles().contains(role)); + return CompletableFuture.completedFuture(role != null && tenantInfo.getAdminRoles() != null + && tenantInfo.getAdminRoles().contains(role)); } /** - * Perform initialization for the authorization provider + * Perform initialization for the authorization provider. * * @param conf * broker config object @@ -102,7 +102,7 @@ default void initialize(ServiceConfiguration conf, ConfigurationCacheService con } /** - * Perform initialization for the authorization provider + * Perform initialization for the authorization provider. * * @param conf * broker config object @@ -154,7 +154,7 @@ CompletableFuture canLookupAsync(TopicName topicName, String role, AuthenticationDataSource authenticationData); /** - * Allow all function operations with in this namespace + * Allow all function operations with in this namespace. * @param namespaceName The namespace that the function operations can be executed in * @param role The role to check * @param authenticationData authentication data related to the role @@ -164,7 +164,7 @@ CompletableFuture allowFunctionOpsAsync(NamespaceName namespaceName, St AuthenticationDataSource authenticationData); /** - * Allow all source operations with in this namespace + * Allow all source operations with in this namespace. * @param namespaceName The namespace that the sources operations can be executed in * @param role The role to check * @param authenticationData authentication data related to the role @@ -174,7 +174,7 @@ CompletableFuture allowSourceOpsAsync(NamespaceName namespaceName, Stri AuthenticationDataSource authenticationData); /** - * Allow all sink operations with in this namespace + * Allow all sink operations with in this namespace. * @param namespaceName The namespace that the sink operations can be executed in * @param role The role to check * @param authenticationData authentication data related to the role @@ -185,7 +185,7 @@ CompletableFuture allowSinkOpsAsync(NamespaceName namespaceName, String /** * - * Grant authorization-action permission on a namespace to the given client + * Grant authorization-action permission on a namespace to the given client. * * @param namespace * @param actions @@ -201,7 +201,7 @@ CompletableFuture grantPermissionAsync(NamespaceName namespace, Set grantPermissionAsync(NamespaceName namespace, Set grantSubscriptionPermissionAsync(NamespaceName namespace, String subscriptionName, Set roles, - String authDataJson); + CompletableFuture grantSubscriptionPermissionAsync(NamespaceName namespace, String subscriptionName, + Set roles, String authDataJson); /** - * Revoke subscription admin-api access for a role + * Revoke subscription admin-api access for a role. * @param namespace * @param subscriptionName * @param role @@ -224,7 +224,7 @@ CompletableFuture revokeSubscriptionPermissionAsync(NamespaceName namespac String role, String authDataJson); /** - * Grant authorization-action permission on a topic to the given client + * Grant authorization-action permission on a topic to the given client. * * @param topicName * @param role @@ -239,7 +239,7 @@ CompletableFuture grantPermissionAsync(TopicName topicName, Set allowTenantOperationAsync(String tenantName, TenantOperation operation, AuthenticationDataSource authData) { return FutureUtil.failedFuture(new IllegalStateException( - String.format("allowTenantOperation(%s) on tenant %s is not supported by the Authorization" + - " provider you are using.", + String.format("allowTenantOperation(%s) on tenant %s is not supported by the Authorization" + + " provider you are using.", operation.toString(), tenantName))); } @@ -332,7 +332,7 @@ default Boolean allowNamespaceOperation(NamespaceName namespaceName, } /** - * Grant authorization-action permission on a namespace to the given client + * Grant authorization-action permission on a namespace to the given client. * * @param namespaceName * @param role @@ -404,7 +404,7 @@ default Boolean allowNamespacePolicyOperation(NamespaceName namespaceName, } /** - * Grant authorization-action permission on a namespace to the given client + * Grant authorization-action permission on a namespace to the given client. * @param namespaceName * @param originalRole role not overriden by proxy role if request do pass through proxy * @param role originalRole | proxyRole if the request didn't pass through proxy @@ -477,7 +477,7 @@ default Boolean allowTopicOperation(TopicName topicName, } /** - * Grant authorization-action permission on a topic to the given client + * Grant authorization-action permission on a topic to the given client. * @param topic * @param originalRole role not overriden by proxy role if request do pass through proxy * @param role originalRole | proxyRole if the request didn't pass through proxy diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java index 09a8e2948645d..d630a96f1a8db 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java @@ -18,6 +18,11 @@ */ package org.apache.pulsar.broker.authorization; +import static java.util.concurrent.TimeUnit.SECONDS; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import javax.ws.rs.core.Response; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.ServiceConfiguration; @@ -37,13 +42,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.ws.rs.core.Response; -import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; - -import static java.util.concurrent.TimeUnit.SECONDS; - /** * Authorization service that manages pluggable authorization provider and authorize requests accordingly. * @@ -91,7 +89,7 @@ public CompletableFuture isTenantAdmin(String tenant, String role, Tena /** * - * Grant authorization-action permission on a namespace to the given client + * Grant authorization-action permission on a namespace to the given client. * * @param namespace * @param actions @@ -114,7 +112,7 @@ public CompletableFuture grantPermissionAsync(NamespaceName namespace, Set } /** - * Grant permission to roles that can access subscription-admin api + * Grant permission to roles that can access subscription-admin api. * * @param namespace * @param subscriptionName @@ -133,7 +131,7 @@ public CompletableFuture grantSubscriptionPermissionAsync(NamespaceName na } /** - * Revoke subscription admin-api access for a role + * Revoke subscription admin-api access for a role. * * @param namespace * @param subscriptionName @@ -149,7 +147,7 @@ public CompletableFuture revokeSubscriptionPermissionAsync(NamespaceName n } /** - * Grant authorization-action permission on a topic to the given client + * Grant authorization-action permission on a topic to the given client. * * @param topicname * @param role @@ -206,7 +204,8 @@ public CompletableFuture canProduceAsync(TopicName topicName, String ro * the subscription name defined by the client */ public CompletableFuture canConsumeAsync(TopicName topicName, String role, - AuthenticationDataSource authenticationData, String subscription) { + AuthenticationDataSource authenticationData, + String subscription) { if (!this.conf.isAuthorizationEnabled()) { return CompletableFuture.completedFuture(true); } @@ -328,7 +327,8 @@ private static void validateOriginalPrincipal(Set proxyRoles, String aut // Request has come from a proxy if (StringUtils.isBlank(originalPrincipal)) { log.warn("Original principal empty in request authenticated as {}", authenticatedPrincipal); - throw new RestException(Response.Status.UNAUTHORIZED, "Original principal cannot be empty if the request is via proxy."); + throw new RestException(Response.Status.UNAUTHORIZED, "Original principal cannot be empty if the " + + "request is via proxy."); } if (proxyRoles.contains(originalPrincipal)) { log.warn("Original principal {} cannot be a proxy role ({})", originalPrincipal, proxyRoles); @@ -342,7 +342,7 @@ private boolean isProxyRole(String role) { } /** - * Grant authorization-action permission on a tenant to the given client + * Grant authorization-action permission on a tenant to the given client. * * @param tenantName tenant name * @param operation tenant operation @@ -365,8 +365,8 @@ public CompletableFuture allowTenantOperationAsync(String tenantName, return provider.allowTenantOperationAsync(tenantName, role, operation, authData); } - return FutureUtil.failedFuture(new IllegalStateException("No authorization provider configured for " + - "allowTenantOperationAsync")); + return FutureUtil.failedFuture(new IllegalStateException("No authorization provider configured for " + + "allowTenantOperationAsync")); } public CompletableFuture allowTenantOperationAsync(String tenantName, @@ -403,7 +403,7 @@ public boolean allowTenantOperation(String tenantName, } /** - * Grant authorization-action permission on a namespace to the given client + * Grant authorization-action permission on a namespace to the given client. * * @param namespaceName * @param operation @@ -426,8 +426,8 @@ public CompletableFuture allowNamespaceOperationAsync(NamespaceName nam return provider.allowNamespaceOperationAsync(namespaceName, role, operation, authData); } - return FutureUtil.failedFuture(new IllegalStateException("No authorization provider configured for " + - "allowNamespaceOperationAsync")); + return FutureUtil.failedFuture(new IllegalStateException("No authorization provider configured for " + + "allowNamespaceOperationAsync")); } public CompletableFuture allowNamespaceOperationAsync(NamespaceName namespaceName, @@ -464,7 +464,7 @@ public boolean allowNamespaceOperation(NamespaceName namespaceName, } /** - * Grant authorization-action permission on a namespace to the given client + * Grant authorization-action permission on a namespace to the given client. * * @param namespaceName * @param operation @@ -488,8 +488,8 @@ public CompletableFuture allowNamespacePolicyOperationAsync(NamespaceNa return provider.allowNamespacePolicyOperationAsync(namespaceName, policy, operation, role, authData); } - return FutureUtil.failedFuture(new IllegalStateException("No authorization provider configured for " + - "allowNamespacePolicyOperationAsync")); + return FutureUtil.failedFuture(new IllegalStateException("No authorization provider configured for " + + "allowNamespacePolicyOperationAsync")); } public CompletableFuture allowNamespacePolicyOperationAsync(NamespaceName namespaceName, @@ -528,7 +528,7 @@ public boolean allowNamespacePolicyOperation(NamespaceName namespaceName, } /** - * Grant authorization-action permission on a topic to the given client + * Grant authorization-action permission on a topic to the given client. * * @param topicName * @param policy @@ -550,8 +550,8 @@ public CompletableFuture allowTopicPolicyOperationAsync(TopicName topic return provider.allowTopicPolicyOperationAsync(topicName, role, policy, operation, authData); } - return FutureUtil.failedFuture(new IllegalStateException("No authorization provider configured for " + - "allowTopicPolicyOperationAsync")); + return FutureUtil.failedFuture(new IllegalStateException("No authorization provider configured for " + + "allowTopicPolicyOperationAsync")); } public CompletableFuture allowTopicPolicyOperationAsync(TopicName topicName, @@ -592,7 +592,7 @@ public Boolean allowTopicPolicyOperation(TopicName topicName, } /** - * Grant authorization-action permission on a topic to the given client + * Grant authorization-action permission on a topic to the given client. * * @param topicName * @param operation @@ -639,8 +639,8 @@ public CompletableFuture allowTopicOperationAsync(TopicName topicName, } } - return FutureUtil.failedFuture(new IllegalStateException("No authorization provider configured for " + - "allowTopicOperationAsync")); + return FutureUtil.failedFuture(new IllegalStateException("No authorization provider configured for " + + "allowTopicOperationAsync")); } public CompletableFuture allowTopicOperationAsync(TopicName topicName, diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/MultiRolesTokenAuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/MultiRolesTokenAuthorizationProvider.java index e83c733273cb5..3c5d3a25e15f1 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/MultiRolesTokenAuthorizationProvider.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/MultiRolesTokenAuthorizationProvider.java @@ -23,6 +23,13 @@ import io.jsonwebtoken.JwtParser; import io.jsonwebtoken.Jwts; import io.jsonwebtoken.RequiredTypeException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.function.Function; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; @@ -38,13 +45,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.function.Function; public class MultiRolesTokenAuthorizationProvider extends PulsarAuthorizationProvider { private static final Logger log = LoggerFactory.getLogger(MultiRolesTokenAuthorizationProvider.class); @@ -102,8 +102,9 @@ private List getRoles(AuthenticationDataSource authData) { token = httpHeaderValue.substring(HTTP_HEADER_VALUE_PREFIX.length()); } - if (token == null) + if (token == null) { return Collections.emptyList(); + } String[] splitToken = token.split("\\."); String unsignedToken = splitToken[0] + "." + splitToken[1] + "."; @@ -125,7 +126,8 @@ private List getRoles(AuthenticationDataSource authData) { return Collections.emptyList(); } - public CompletableFuture authorize(AuthenticationDataSource authenticationData, Function> authorizeFunc) { + public CompletableFuture authorize(AuthenticationDataSource authenticationData, Function> authorizeFunc) { List roles = getRoles(authenticationData); if (roles.isEmpty()) { return CompletableFuture.completedFuture(false); @@ -138,11 +140,15 @@ public CompletableFuture authorize(AuthenticationDataSource authenticat List> doneFutures = new ArrayList<>(); FutureUtil.waitForAny(futures).get(); for (CompletableFuture future : futures) { - if (!future.isDone()) continue; + if (!future.isDone()) { + continue; + } doneFutures.add(future); if (future.get()) { futures.forEach(f -> { - if (!f.isDone()) f.cancel(false); + if (!f.isDone()) { + f.cancel(false); + } }); return true; } @@ -177,8 +183,10 @@ public CompletableFuture canProduceAsync(TopicName topicName, String ro */ @Override public CompletableFuture canConsumeAsync(TopicName topicName, String role, - AuthenticationDataSource authenticationData, String subscription) { - return authorize(authenticationData, r -> super.canConsumeAsync(topicName, r, authenticationData, subscription)); + AuthenticationDataSource authenticationData, + String subscription) { + return authorize(authenticationData, r -> super.canConsumeAsync(topicName, r, authenticationData, + subscription)); } /** @@ -198,17 +206,20 @@ public CompletableFuture canLookupAsync(TopicName topicName, String rol } @Override - public CompletableFuture allowFunctionOpsAsync(NamespaceName namespaceName, String role, AuthenticationDataSource authenticationData) { + public CompletableFuture allowFunctionOpsAsync(NamespaceName namespaceName, String role, + AuthenticationDataSource authenticationData) { return authorize(authenticationData, r -> super.allowFunctionOpsAsync(namespaceName, r, authenticationData)); } @Override - public CompletableFuture allowSourceOpsAsync(NamespaceName namespaceName, String role, AuthenticationDataSource authenticationData) { + public CompletableFuture allowSourceOpsAsync(NamespaceName namespaceName, String role, + AuthenticationDataSource authenticationData) { return authorize(authenticationData, r -> super.allowSourceOpsAsync(namespaceName, r, authenticationData)); } @Override - public CompletableFuture allowSinkOpsAsync(NamespaceName namespaceName, String role, AuthenticationDataSource authenticationData) { + public CompletableFuture allowSinkOpsAsync(NamespaceName namespaceName, String role, + AuthenticationDataSource authenticationData) { return authorize(authenticationData, r -> super.allowSinkOpsAsync(namespaceName, r, authenticationData)); } @@ -234,7 +245,8 @@ public CompletableFuture allowNamespacePolicyOperationAsync(NamespaceNa PolicyOperation operation, String role, AuthenticationDataSource authData) { - return authorize(authData, r -> super.allowNamespacePolicyOperationAsync(namespaceName, policy, operation, r, authData)); + return authorize(authData, r -> super.allowNamespacePolicyOperationAsync(namespaceName, policy, operation, r, + authData)); } @Override @@ -251,6 +263,7 @@ public CompletableFuture allowTopicPolicyOperationAsync(TopicName topic PolicyName policyName, PolicyOperation policyOperation, AuthenticationDataSource authData) { - return authorize(authData, r -> super.allowTopicPolicyOperationAsync(topicName, r, policyName, policyOperation, authData)); + return authorize(authData, r -> super.allowTopicPolicyOperationAsync(topicName, r, policyName, policyOperation, + authData)); } } diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java index c0dcb32d96836..c070c4fa482bd 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java @@ -20,21 +20,20 @@ import static java.util.Objects.requireNonNull; import static org.apache.commons.lang3.StringUtils.isNotBlank; - +import com.google.common.base.Joiner; import java.io.IOException; import java.util.Collections; import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; import javax.ws.rs.core.Response; -import com.google.common.base.Joiner; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; import org.apache.pulsar.broker.cache.ConfigurationCacheService; import org.apache.pulsar.broker.resources.PulsarResources; -import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.AuthAction; import org.apache.pulsar.common.policies.data.NamespaceOperation; import org.apache.pulsar.common.policies.data.PolicyName; @@ -74,7 +73,7 @@ public void initialize(ServiceConfiguration conf, PulsarResources pulsarResource requireNonNull(pulsarResources, "PulsarResources can't be null"); this.conf = conf; this.pulsarResources = pulsarResources; - + // For compatibility, call the old deprecated initialize initialize(conf, (ConfigurationCacheService) null); } @@ -132,8 +131,8 @@ public CompletableFuture canConsumeAsync(TopicName topicName, String ro case Prefix: if (!subscription.startsWith(role)) { PulsarServerException ex = new PulsarServerException(String.format( - "Failed to create consumer - The subscription name needs to be prefixed by the authentication role, like %s-xxxx for topic: %s", - role, topicName)); + "Failed to create consumer - The subscription name needs to be prefixed by the " + + "authentication role, like %s-xxxx for topic: %s", role, topicName)); permissionFuture.completeExceptionally(ex); return; } @@ -193,7 +192,8 @@ public CompletableFuture canLookupAsync(TopicName topicName, String rol topicName.toString(), role, ex.getMessage()); } } - canConsumeAsync(topicName, role, authenticationData, null).whenComplete((consumeAuthorized, e) -> { + canConsumeAsync(topicName, role, authenticationData, null).whenComplete((consumeAuthorized, e) + -> { if (e == null) { finalResult.complete(consumeAuthorized); } else { @@ -211,17 +211,20 @@ public CompletableFuture canLookupAsync(TopicName topicName, String rol } @Override - public CompletableFuture allowFunctionOpsAsync(NamespaceName namespaceName, String role, AuthenticationDataSource authenticationData) { + public CompletableFuture allowFunctionOpsAsync(NamespaceName namespaceName, String role, + AuthenticationDataSource authenticationData) { return allowTheSpecifiedActionOpsAsync(namespaceName, role, authenticationData, AuthAction.functions); } @Override - public CompletableFuture allowSourceOpsAsync(NamespaceName namespaceName, String role, AuthenticationDataSource authenticationData) { + public CompletableFuture allowSourceOpsAsync(NamespaceName namespaceName, String role, + AuthenticationDataSource authenticationData) { return allowTheSpecifiedActionOpsAsync(namespaceName, role, authenticationData, AuthAction.sources); } @Override - public CompletableFuture allowSinkOpsAsync(NamespaceName namespaceName, String role, AuthenticationDataSource authenticationData) { + public CompletableFuture allowSinkOpsAsync(NamespaceName namespaceName, String role, + AuthenticationDataSource authenticationData) { return allowTheSpecifiedActionOpsAsync(namespaceName, role, authenticationData, AuthAction.sinks); } @@ -650,7 +653,8 @@ public CompletableFuture validateTenantAdminAccess(String tenantName, try { TenantInfo tenantInfo = pulsarResources.getTenantResources() .getTenant(tenantName) - .orElseThrow(() -> new RestException(Response.Status.NOT_FOUND, "Tenant does not exist")); + .orElseThrow(() -> new RestException(Response.Status.NOT_FOUND, + "Tenant does not exist")); return isTenantAdmin(tenantName, role, tenantInfo, authData); } catch (NotFoundException e) { log.warn("Failed to get tenant info data for non existing tenant {}", tenantName); diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/package-info.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/package-info.java new file mode 100644 index 0000000000000..26c9419d8e36e --- /dev/null +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/package-info.java @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/** + * Pulsar Client API. + */ +package org.apache.pulsar.broker.authorization; diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/cache/package-info.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/cache/package-info.java new file mode 100644 index 0000000000000..83e11fdf2c61b --- /dev/null +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/cache/package-info.java @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/** + * Pulsar Client API. + */ +package org.apache.pulsar.broker.cache; diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/package-info.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/package-info.java new file mode 100644 index 0000000000000..cf095a74f8d9b --- /dev/null +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/package-info.java @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/** + * Pulsar Client API. + */ +package org.apache.pulsar.broker; diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BrokerResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BrokerResources.java index 5d6959dcc66e1..9ba2f6ef51b88 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BrokerResources.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BrokerResources.java @@ -20,7 +20,6 @@ import java.util.HashSet; import java.util.Set; - import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.metadata.api.MetadataStore; import org.apache.pulsar.metadata.api.MetadataStoreException; diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterResources.java index 32586d246926e..4429afe59395b 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterResources.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterResources.java @@ -146,8 +146,8 @@ public void setFailureDomainWithCreate(String clusterName, String domainName, public void registerListener(Consumer listener) { getStore().registerListener(n -> { // Prefilter the notification just for failure domains - if (n.getPath().startsWith(BASE_CLUSTERS_PATH) && - n.getPath().contains("/" + FAILURE_DOMAIN)) { + if (n.getPath().startsWith(BASE_CLUSTERS_PATH) + && n.getPath().contains("/" + FAILURE_DOMAIN)) { listener.accept(n); } }); diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/DynamicConfigurationResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/DynamicConfigurationResources.java index d918dc8f2c5d3..84d7a49174e92 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/DynamicConfigurationResources.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/DynamicConfigurationResources.java @@ -19,7 +19,6 @@ package org.apache.pulsar.broker.resources; import com.fasterxml.jackson.core.type.TypeReference; -import java.util.Collections; import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LocalPoliciesResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LocalPoliciesResources.java index cf3936160e5e9..f83d2bed19964 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LocalPoliciesResources.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LocalPoliciesResources.java @@ -52,7 +52,8 @@ public CompletableFuture> getLocalPoliciesAsync(Namespac return getCache().get(joinPath(LOCAL_POLICIES_ROOT, ns.toString())); } - public void setLocalPoliciesWithCreate(NamespaceName ns, Function, LocalPolicies> createFunction) throws MetadataStoreException { + public void setLocalPoliciesWithCreate(NamespaceName ns, Function, + LocalPolicies> createFunction) throws MetadataStoreException { setWithCreate(joinPath(LOCAL_POLICIES_ROOT, ns.toString()), createFunction); } diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/MetadataStoreCacheLoader.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/MetadataStoreCacheLoader.java index 862d3c1712daa..eada0db382dcb 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/MetadataStoreCacheLoader.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/MetadataStoreCacheLoader.java @@ -25,7 +25,6 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; - import org.apache.bookkeeper.common.util.OrderedScheduler; import org.apache.commons.collections4.CollectionUtils; import org.apache.pulsar.common.util.FutureUtil; @@ -57,15 +56,13 @@ public MetadataStoreCacheLoader(PulsarResources pulsarResources, int operationTi } /** - * Initialize ZooKeeper session and creates broker cache list + * Initialize ZooKeeper session and creates broker cache list. * - * @param zookeeperServers * @throws Exception */ public void init() throws Exception { - loadReportResources.getStore().registerListener((n) -> { - if(LOADBALANCE_BROKERS_ROOT.equals(n.getPath()) && NotificationType.ChildrenChanged.equals(n.getType())) { + if (LOADBALANCE_BROKERS_ROOT.equals(n.getPath()) && NotificationType.ChildrenChanged.equals(n.getType())) { loadReportResources.getChildrenAsync(LOADBALANCE_BROKERS_ROOT).thenApplyAsync((brokerNodes)->{ updateBrokerList(brokerNodes).thenRun(() -> { log.info("Successfully updated broker info {}", brokerNodes); diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java index beaf78e471f2d..1cbcfaa487779 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java @@ -82,8 +82,7 @@ public void createPolicies(NamespaceName ns, Policies policies) throws MetadataS public boolean namespaceExists(NamespaceName ns) throws MetadataStoreException { String path = joinPath(BASE_POLICIES_PATH, ns.toString()); - return super.exists(path) && - super.getChildren(path).isEmpty(); + return super.exists(path) && super.getChildren(path).isEmpty(); } public CompletableFuture namespaceExistsAsync(NamespaceName ns) { @@ -193,7 +192,8 @@ public PartitionedTopicResources(MetadataStore configurationStore, int operation super(configurationStore, PartitionedTopicMetadata.class, operationTimeoutSec); } - public CompletableFuture updatePartitionedTopicAsync(TopicName tn, Function f) { + public CompletableFuture updatePartitionedTopicAsync(TopicName tn, Function f) { return setAsync(joinPath(PARTITIONED_TOPIC_PATH, tn.getNamespace(), tn.getDomain().value(), tn.getEncodedLocalName()), f); } diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java index 7a8e00a289ec3..e206d5ad542b3 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java @@ -19,14 +19,12 @@ package org.apache.pulsar.broker.resources; import java.util.Optional; - +import lombok.Getter; import org.apache.pulsar.metadata.api.MetadataStore; import org.apache.pulsar.metadata.api.MetadataStoreConfig; import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; -import lombok.Getter; - public class PulsarResources { public static final int DEFAULT_OPERATION_TIMEOUT_SEC = 30; diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ResourceGroupResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ResourceGroupResources.java index d3194c29e6e58..414bf4ffcfc35 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ResourceGroupResources.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ResourceGroupResources.java @@ -22,7 +22,6 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.function.Function; - import org.apache.pulsar.common.policies.data.ResourceGroup; import org.apache.pulsar.metadata.api.MetadataStore; import org.apache.pulsar.metadata.api.MetadataStoreException; diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/package-info.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/package-info.java new file mode 100644 index 0000000000000..94cd53aed5797 --- /dev/null +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/package-info.java @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/** + * Pulsar Client API. + */ +package org.apache.pulsar.broker.resources; diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/validator/BindAddressValidator.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/validator/BindAddressValidator.java index c845092b95afd..00313c6048cc9 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/validator/BindAddressValidator.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/validator/BindAddressValidator.java @@ -19,12 +19,12 @@ package org.apache.pulsar.broker.validator; import java.net.URI; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.List; import java.util.regex.Matcher; import java.util.regex.Pattern; -import java.util.ArrayList; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.ServiceConfigurationUtils; @@ -75,19 +75,23 @@ private static List migrateBindAddresses(ServiceConfiguration confi List addresses = new ArrayList<>(2); if (config.getBrokerServicePort().isPresent()) { addresses.add(new BindAddress(null, URI.create( - ServiceConfigurationUtils.brokerUrl(config.getBindAddress(), config.getBrokerServicePort().get())))); + ServiceConfigurationUtils.brokerUrl(config.getBindAddress(), + config.getBrokerServicePort().get())))); } if (config.getBrokerServicePortTls().isPresent()) { addresses.add(new BindAddress(null, URI.create( - ServiceConfigurationUtils.brokerUrlTls(config.getBindAddress(), config.getBrokerServicePortTls().get())))); + ServiceConfigurationUtils.brokerUrlTls(config.getBindAddress(), + config.getBrokerServicePortTls().get())))); } if (config.getWebServicePort().isPresent()) { addresses.add(new BindAddress(null, URI.create( - ServiceConfigurationUtils.webServiceUrl(config.getBindAddress(), config.getWebServicePort().get())))); + ServiceConfigurationUtils.webServiceUrl(config.getBindAddress(), + config.getWebServicePort().get())))); } if (config.getWebServicePortTls().isPresent()) { addresses.add(new BindAddress(null, URI.create( - ServiceConfigurationUtils.webServiceUrlTls(config.getBindAddress(), config.getWebServicePortTls().get())))); + ServiceConfigurationUtils.webServiceUrlTls(config.getBindAddress(), + config.getWebServicePortTls().get())))); } return addresses; } diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/validator/MultipleListenerValidator.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/validator/MultipleListenerValidator.java index d7e78ca6cfd3c..cb9eaf76e9fea 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/validator/MultipleListenerValidator.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/validator/MultipleListenerValidator.java @@ -18,10 +18,6 @@ */ package org.apache.pulsar.broker.validator; -import org.apache.commons.lang3.StringUtils; -import org.apache.pulsar.broker.ServiceConfiguration; -import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener; - import java.net.URI; import java.util.ArrayList; import java.util.Collections; @@ -31,6 +27,9 @@ import java.util.Optional; import java.util.Set; import java.util.TreeSet; +import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener; /** * Validates multiple listener address configurations. @@ -56,8 +55,8 @@ public static Map validateAndAnalysisAdvertisedListe for (final String str : StringUtils.split(config.getAdvertisedListeners(), ",")) { int index = str.indexOf(":"); if (index <= 0) { - throw new IllegalArgumentException("the configure entry `advertisedListeners` is invalid. because " + - str + " do not contain listener name"); + throw new IllegalArgumentException("the configure entry `advertisedListeners` is invalid. because " + + str + " do not contain listener name"); } String listenerName = StringUtils.trim(str.substring(0, index)); if (!firstListenerName.isPresent()) { @@ -71,13 +70,15 @@ public static Map validateAndAnalysisAdvertisedListe config.setInternalListenerName(firstListenerName.get()); } if (!listeners.containsKey(config.getInternalListenerName())) { - throw new IllegalArgumentException("the `advertisedListeners` configure do not contain `internalListenerName` entry"); + throw new IllegalArgumentException("the `advertisedListeners` configure do not contain " + + "`internalListenerName` entry"); } final Map result = new LinkedHashMap<>(); final Map> reverseMappings = new LinkedHashMap<>(); for (final Map.Entry> entry : listeners.entrySet()) { if (entry.getValue().size() > 2) { - throw new IllegalArgumentException("there are redundant configure for listener `" + entry.getKey() + "`"); + throw new IllegalArgumentException("there are redundant configure for listener `" + entry.getKey() + + "`"); } URI pulsarAddress = null, pulsarSslAddress = null; for (final String strUri : entry.getValue()) { @@ -87,26 +88,31 @@ public static Map validateAndAnalysisAdvertisedListe if (pulsarAddress == null) { pulsarAddress = uri; } else { - throw new IllegalArgumentException("there are redundant configure for listener `" + entry.getKey() + "`"); + throw new IllegalArgumentException("there are redundant configure for listener `" + + entry.getKey() + "`"); } } else if (StringUtils.equalsIgnoreCase(uri.getScheme(), "pulsar+ssl")) { if (pulsarSslAddress == null) { pulsarSslAddress = uri; } else { - throw new IllegalArgumentException("there are redundant configure for listener `" + entry.getKey() + "`"); + throw new IllegalArgumentException("there are redundant configure for listener `" + + entry.getKey() + "`"); } } String hostPort = String.format("%s:%d", uri.getHost(), uri.getPort()); Set sets = reverseMappings.computeIfAbsent(hostPort, k -> new TreeSet<>()); sets.add(entry.getKey()); if (sets.size() > 1) { - throw new IllegalArgumentException("must not specify `" + hostPort + "` to different listener."); + throw new IllegalArgumentException("must not specify `" + hostPort + + "` to different listener."); } } catch (Throwable cause) { - throw new IllegalArgumentException("the value " + strUri + " in the `advertisedListeners` configure is invalid"); + throw new IllegalArgumentException("the value " + strUri + " in the `advertisedListeners` " + + "configure is invalid"); } } - result.put(entry.getKey(), AdvertisedListener.builder().brokerServiceUrl(pulsarAddress).brokerServiceUrlTls(pulsarSslAddress).build()); + result.put(entry.getKey(), AdvertisedListener.builder().brokerServiceUrl(pulsarAddress) + .brokerServiceUrlTls(pulsarSslAddress).build()); } return result; } diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/validator/package-info.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/validator/package-info.java new file mode 100644 index 0000000000000..09ccc74260cef --- /dev/null +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/validator/package-info.java @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/** + * Pulsar Client API. + */ +package org.apache.pulsar.broker.validator; diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/AuthenticationFilter.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/AuthenticationFilter.java index 5d3cae49394d5..eacefbe49b59a 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/AuthenticationFilter.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/AuthenticationFilter.java @@ -19,9 +19,7 @@ package org.apache.pulsar.broker.web; import java.io.IOException; - import javax.naming.AuthenticationException; - import javax.servlet.Filter; import javax.servlet.FilterChain; import javax.servlet.FilterConfig; @@ -30,7 +28,6 @@ import javax.servlet.ServletResponse; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; - import org.apache.pulsar.broker.authentication.AuthenticationDataHttps; import org.apache.pulsar.broker.authentication.AuthenticationService; import org.apache.pulsar.common.sasl.SaslConstants; @@ -38,7 +35,7 @@ import org.slf4j.LoggerFactory; /** - * Servlet filter that hooks up with AuthenticationService to reject unauthenticated HTTP requests + * Servlet filter that hooks up with AuthenticationService to reject unauthenticated HTTP requests. */ public class AuthenticationFilter implements Filter { private static final Logger LOG = LoggerFactory.getLogger(AuthenticationFilter.class); @@ -53,8 +50,8 @@ public AuthenticationFilter(AuthenticationService authenticationService) { } private boolean isSaslRequest(HttpServletRequest request) { - if (request.getHeader(SaslConstants.SASL_HEADER_TYPE) == null || - request.getHeader(SaslConstants.SASL_HEADER_TYPE).isEmpty()) { + if (request.getHeader(SaslConstants.SASL_HEADER_TYPE) == null + || request.getHeader(SaslConstants.SASL_HEADER_TYPE).isEmpty()) { return false; } if (request.getHeader(SaslConstants.SASL_HEADER_TYPE) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/DisableDebugHttpMethodFilter.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/DisableDebugHttpMethodFilter.java index 79ba9b7a44ea4..2bd68a7be0856 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/DisableDebugHttpMethodFilter.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/DisableDebugHttpMethodFilter.java @@ -19,7 +19,6 @@ package org.apache.pulsar.broker.web; import java.io.IOException; - import javax.servlet.Filter; import javax.servlet.FilterChain; import javax.servlet.FilterConfig; @@ -28,13 +27,11 @@ import javax.servlet.ServletResponse; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; - import lombok.extern.slf4j.Slf4j; - import org.apache.pulsar.broker.ServiceConfiguration; /** - * Servlet filter that rejects HTTP requests using TRACE/TRACK methods + * Servlet filter that rejects HTTP requests using TRACE/TRACK methods. */ @Slf4j public class DisableDebugHttpMethodFilter implements Filter { @@ -51,8 +48,8 @@ public void doFilter(ServletRequest request, ServletResponse response, FilterCha HttpServletRequest httpRequest = (HttpServletRequest) request; HttpServletResponse httpResponse = (HttpServletResponse) response; - if (this.serviceConfiguration.isDisableHttpDebugMethods() ) { - if("TRACE".equalsIgnoreCase(httpRequest.getMethod())) { + if (this.serviceConfiguration.isDisableHttpDebugMethods()) { + if ("TRACE".equalsIgnoreCase(httpRequest.getMethod())) { // TRACE is not allowed httpResponse.setStatus(HttpServletResponse.SC_METHOD_NOT_ALLOWED); diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/JsonMapperProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/JsonMapperProvider.java index cb4ef7fff4b51..f9f767c622f2b 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/JsonMapperProvider.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/JsonMapperProvider.java @@ -19,10 +19,8 @@ package org.apache.pulsar.broker.web; import com.fasterxml.jackson.databind.ObjectMapper; - import javax.ws.rs.ext.ContextResolver; import javax.ws.rs.ext.Provider; - import org.apache.pulsar.common.util.ObjectMapperFactory; @Provider diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/RateLimitingFilter.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/RateLimitingFilter.java index f65987ec4801a..b2c2b84bbdf79 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/RateLimitingFilter.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/RateLimitingFilter.java @@ -20,11 +20,8 @@ package org.apache.pulsar.broker.web; import com.google.common.util.concurrent.RateLimiter; - import io.prometheus.client.Counter; - import java.io.IOException; - import javax.servlet.Filter; import javax.servlet.FilterChain; import javax.servlet.FilterConfig; diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/WebExecutorThreadPool.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/WebExecutorThreadPool.java index 87437e510dc4b..2a8c9e8e15c4e 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/WebExecutorThreadPool.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/WebExecutorThreadPool.java @@ -19,9 +19,7 @@ package org.apache.pulsar.broker.web; import io.netty.util.concurrent.DefaultThreadFactory; - import java.util.concurrent.ThreadFactory; - import org.eclipse.jetty.util.thread.ExecutorThreadPool; public class WebExecutorThreadPool extends ExecutorThreadPool { diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/package-info.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/package-info.java new file mode 100644 index 0000000000000..5f61532b031b6 --- /dev/null +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/package-info.java @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/** + * Pulsar Client API. + */ +package org.apache.pulsar.broker.web; diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/plugin/servlet/AdditionalServlet.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/plugin/servlet/AdditionalServlet.java index 07b8e6ab4bfae..21a00107c1bce 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/plugin/servlet/AdditionalServlet.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/plugin/servlet/AdditionalServlet.java @@ -29,21 +29,21 @@ public interface AdditionalServlet extends AutoCloseable { /** - * load plugin config + * load plugin config. * * @param pulsarConfiguration */ void loadConfig(PulsarConfiguration pulsarConfiguration); /** - * Get the base path of prometheus metrics + * Get the base path of prometheus metrics. * * @return the base path of prometheus metrics */ String getBasePath(); /** - * Get the servlet holder + * Get the servlet holder. * * @return the servlet holder */ diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/plugin/servlet/AdditionalServletMetadata.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/plugin/servlet/AdditionalServletMetadata.java index f689cc0328b63..4138d4ea8bab0 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/plugin/servlet/AdditionalServletMetadata.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/plugin/servlet/AdditionalServletMetadata.java @@ -23,7 +23,7 @@ import lombok.NoArgsConstructor; /** - * The metadata of additional servlet + * The metadata of additional servlet. */ @Data @NoArgsConstructor diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/plugin/servlet/AdditionalServletUtils.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/plugin/servlet/AdditionalServletUtils.java index eb99c9072af83..e22dcb0815484 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/plugin/servlet/AdditionalServletUtils.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/plugin/servlet/AdditionalServletUtils.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.web.plugin.servlet; +import static com.google.common.base.Preconditions.checkArgument; import java.io.File; import java.io.IOException; import java.nio.file.DirectoryStream; @@ -31,8 +32,6 @@ import org.apache.pulsar.common.nar.NarClassLoader; import org.apache.pulsar.common.util.ObjectMapperFactory; -import static com.google.common.base.Preconditions.checkArgument; - /** * Util class to search and load {@link AdditionalServlets}. */ @@ -40,7 +39,7 @@ @Slf4j public class AdditionalServletUtils { - public final String ADDITIONAL_SERVLET_FILE = "additional_servlet.yml"; + public static final String ADDITIONAL_SERVLET_FILE = "additional_servlet.yml"; /** * Retrieve the additional servlet definition from the provided nar package. diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/plugin/servlet/AdditionalServletWithClassLoader.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/plugin/servlet/AdditionalServletWithClassLoader.java index 7f1af734431ef..9ebbf8cf260dc 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/plugin/servlet/AdditionalServletWithClassLoader.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/plugin/servlet/AdditionalServletWithClassLoader.java @@ -19,7 +19,6 @@ package org.apache.pulsar.broker.web.plugin.servlet; import java.io.IOException; - import lombok.Data; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/plugin/servlet/AdditionalServlets.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/plugin/servlet/AdditionalServlets.java index 080e1c7509643..8a7bce8e332a4 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/plugin/servlet/AdditionalServlets.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/plugin/servlet/AdditionalServlets.java @@ -19,10 +19,8 @@ package org.apache.pulsar.broker.web.plugin.servlet; import com.google.common.collect.ImmutableMap; - import java.io.IOException; import java.util.Map; - import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.common.configuration.PulsarConfiguration; @@ -72,7 +70,7 @@ public static AdditionalServlets load(PulsarConfiguration conf) throws IOExcepti } String narExtractionDirectory = conf.getProperties().getProperty(NAR_EXTRACTION_DIRECTORY); - if(narExtractionDirectory == null) { + if (narExtractionDirectory == null) { narExtractionDirectory = NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR; } diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/FieldContext.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/FieldContext.java index 17a3c49fd510d..3c0ffb36ebfd5 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/FieldContext.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/FieldContext.java @@ -25,8 +25,6 @@ /** * Stores field context to validate based on requirement or value constraints. - * - * */ @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.FIELD) @@ -34,35 +32,35 @@ /** * checks field value is required. By default field is mandatory false. - * + * * @return true if attribute is required else returns false */ boolean required() default false; /** - * binds numeric value's lower bound - * + * binds numeric value's lower bound. + * * @return minimum value of the field */ long minValue() default Long.MIN_VALUE; /** - * binds numeric value's upper bound - * + * binds numeric value's upper bound. + * * @return maximum value of the field */ long maxValue() default Long.MAX_VALUE; /** - * binds character length of text - * + * binds character length of text. + * * @return character length of field */ int maxCharLength() default Integer.MAX_VALUE; - + /** - * allow field to be updated dynamically - * + * allow field to be updated dynamically. + * * @return */ boolean dynamic() default false; diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/PulsarConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/PulsarConfiguration.java index 2bef92cada479..7dbbffadcfebd 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/PulsarConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/PulsarConfiguration.java @@ -22,7 +22,7 @@ public interface PulsarConfiguration { - public Properties getProperties(); + Properties getProperties(); - public void setProperties(Properties properties); + void setProperties(Properties properties); } \ No newline at end of file diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoader.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoader.java index 05feb58f7e742..117681c87fe3c 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoader.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoader.java @@ -20,7 +20,6 @@ import static java.util.Objects.requireNonNull; import static org.apache.pulsar.common.util.FieldParser.update; - import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; @@ -30,14 +29,13 @@ import java.util.Arrays; import java.util.Map; import java.util.Properties; - import org.apache.commons.lang.StringUtils; import org.apache.pulsar.broker.ServiceConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Loads ServiceConfiguration with properties + * Loads ServiceConfiguration with properties. * * */ @@ -174,10 +172,12 @@ private static boolean isEmpty(Object obj) { * @param ignoreNonExistMember * @return * @throws IllegalArgumentException - * if conf has the field whose name is not contained in ServiceConfiguration and ignoreNonExistMember is false. + * if conf has the field whose name is not contained in ServiceConfiguration and ignoreNonExistMember + * is false. * @throws RuntimeException */ - public static ServiceConfiguration convertFrom(PulsarConfiguration conf, boolean ignoreNonExistMember) throws RuntimeException { + public static ServiceConfiguration convertFrom(PulsarConfiguration conf, boolean ignoreNonExistMember) + throws RuntimeException { try { final ServiceConfiguration convertedConf = ServiceConfiguration.class .getDeclaredConstructor().newInstance(); @@ -192,7 +192,8 @@ public static ServiceConfiguration convertFrom(PulsarConfiguration conf, boolean } } catch (NoSuchFieldException e) { if (!ignoreNonExistMember) { - throw new IllegalArgumentException("Exception caused while converting configuration: " + e.getMessage()); + throw new IllegalArgumentException("Exception caused while converting configuration: " + + e.getMessage()); } } catch (IllegalAccessException e) { throw new RuntimeException("Exception caused while converting configuration: " + e.getMessage()); diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java index c80f0a5471c93..a0bd7a35bcee5 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java @@ -20,7 +20,6 @@ import java.io.File; import java.util.function.Supplier; - import javax.servlet.ServletContext; import javax.ws.rs.GET; import javax.ws.rs.Path; diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/package-info.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/package-info.java new file mode 100644 index 0000000000000..7306dab9a3b7a --- /dev/null +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/package-info.java @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/** + * Pulsar Client API. + */ +package org.apache.pulsar.common.configuration; diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/zookeeper/package-info.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/zookeeper/package-info.java new file mode 100644 index 0000000000000..5e129c634444f --- /dev/null +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/zookeeper/package-info.java @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/** + * Pulsar Client API. + */ +package org.apache.pulsar.zookeeper;