diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 839aea3f63e58..ce5c9b25a0133 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -1219,7 +1219,7 @@ public long maybeUpdate(long now) {
                 return metadataTimeout;
             }
 
-            if (!metadataAttemptStartMs.isPresent())
+            if (metadataAttemptStartMs.isEmpty())
                 metadataAttemptStartMs = Optional.of(now);
 
             // Beware that the behavior of this method and the computation of timeouts for poll() are
@@ -1412,7 +1412,7 @@ private long maybeUpdate(long now, Node node) {
             if (canSendRequest(nodeConnectionId, now)) {
                 Optional<AbstractRequest.Builder<?>> requestOpt = clientTelemetrySender.createRequest();
-                if (!requestOpt.isPresent())
+                if (requestOpt.isEmpty())
                     return Long.MAX_VALUE;
 
                 AbstractRequest.Builder<?> request = requestOpt.get();
diff --git a/clients/src/main/java/org/apache/kafka/common/Uuid.java b/clients/src/main/java/org/apache/kafka/common/Uuid.java
index 45e2b9f1d8fb2..6f7f09537f178 100644
--- a/clients/src/main/java/org/apache/kafka/common/Uuid.java
+++ b/clients/src/main/java/org/apache/kafka/common/Uuid.java
@@ -20,8 +20,6 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Base64;
-import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
@@ -51,11 +49,7 @@ public class Uuid implements Comparable<Uuid> {
     /**
      * The set of reserved UUIDs that will never be returned by the randomUuid method.
      */
-    public static final Set<Uuid> RESERVED = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
-            METADATA_TOPIC_ID,
-            ZERO_UUID,
-            ONE_UUID
-    )));
+    public static final Set<Uuid> RESERVED = Set.of(ZERO_UUID, ONE_UUID);
 
     private final long mostSignificantBits;
     private final long leastSignificantBits;
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
index b0761248894c5..a5da5294b4d4d 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
@@ -367,7 +367,7 @@ public boolean hasExpired() {
     }
 
     synchronized List<KafkaMetric> metrics() {
-        return unmodifiableList(new ArrayList<>(this.metrics.values()));
+        return List.copyOf(this.metrics.values());
     }
 
     /**
diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/AccessTokenValidatorFactory.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/AccessTokenValidatorFactory.java
index 5fa4c620d902f..e4b39e5cc53c6 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/AccessTokenValidatorFactory.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/AccessTokenValidatorFactory.java
@@ -19,8 +19,6 @@
 
 import org.jose4j.keys.resolvers.VerificationKeyResolver;
 
-import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -57,7 +55,7 @@ public static AccessTokenValidator create(Map<String, ?> configs,
         List<String> l = cu.get(SASL_OAUTHBEARER_EXPECTED_AUDIENCE);
 
         if (l != null)
-            expectedAudiences = Collections.unmodifiableSet(new HashSet<>(l));
+            expectedAudiences = Set.copyOf(l);
 
         Integer clockSkew = cu.validateInteger(SASL_OAUTHBEARER_CLOCK_SKEW_SECONDS, false);
         String expectedIssuer = cu.validateString(SASL_OAUTHBEARER_EXPECTED_ISSUER, false);
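A note on the `Set.copyOf`/`Set.of` conversions above (Uuid, AccessTokenValidatorFactory): `Collections.unmodifiableSet` only wraps the set it is given, while the Java 10+ factory makes a detached immutable copy. The removed code already copied into a fresh HashSet before wrapping, so behavior is preserved here; the view-versus-copy distinction only bites when `unmodifiableSet` wraps a live collection. A minimal JDK-only sketch (class and variable names invented for illustration):

    import java.util.Collections;
    import java.util.HashSet;
    import java.util.Set;

    public class CopyOfDemo {
        public static void main(String[] args) {
            Set<String> source = new HashSet<>(Set.of("a", "b"));
            Set<String> view = Collections.unmodifiableSet(source); // wrapper over source
            Set<String> copy = Set.copyOf(source);                  // detached copy (Java 10+)

            source.add("c");
            System.out.println(view.contains("c")); // true  - the view tracks the source
            System.out.println(copy.contains("c")); // false - the copy does not
        }
    }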
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index a3da5d58e0e70..ba7798cbd5437 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -223,7 +223,7 @@ private GroupRebalanceConfig buildRebalanceConfig(Optional<String> groupInstance
             groupInstanceId,
             retryBackoffMs,
             retryBackoffMaxMs,
-            !groupInstanceId.isPresent());
+            groupInstanceId.isEmpty());
     }
 
     @AfterEach
@@ -4135,7 +4135,7 @@ private static class RackAwareAssignor extends MockPartitionAssignor {
         @Override
         public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic, Map<String, Subscription> subscriptions) {
             subscriptions.forEach((consumer, subscription) -> {
-                if (!subscription.rackId().isPresent())
+                if (subscription.rackId().isEmpty())
                     throw new IllegalStateException("Rack id not provided in subscription for " + consumer);
                 rackIds.add(subscription.rackId().get());
             });
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
index d955c7939ae8f..347f76135866d 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
@@ -891,7 +891,7 @@ private KafkaMetric getMetric(String name, Map<String, String> tags) throws Exce
                 .filter(entry -> entry.getKey().name().equals(name) && entry.getKey().tags().equals(tags))
                 .findFirst();
 
-        if (!metric.isPresent())
+        if (metric.isEmpty())
             throw new Exception(String.format("Could not find metric called %s with tags %s", name, tags.toString()));
 
         return metric.get().getValue();
@@ -1112,7 +1112,7 @@ private KafkaMetric getMetric(String name) throws Exception {
         Optional<Map.Entry<MetricName, KafkaMetric>> metric = metrics.metrics().entrySet().stream()
                 .filter(entry -> entry.getKey().name().equals(name))
                 .findFirst();
-        if (!metric.isPresent())
+        if (metric.isEmpty())
             throw new Exception(String.format("Could not find metric called %s", name));
 
         return metric.get().getValue();
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaBuilder.java b/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaBuilder.java
index 8115675f5a532..d8c55573e5c6f 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaBuilder.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaBuilder.java
@@ -420,7 +420,7 @@ public Schema valueSchema() {
     public Schema build() {
         return new ConnectSchema(type, isOptional(), defaultValue, name, version, doc,
                 parameters == null ? null : Collections.unmodifiableMap(parameters),
-                fields == null ? null : Collections.unmodifiableList(new ArrayList<>(fields.values())), keySchema, valueSchema);
+                fields == null ? null : List.copyOf(fields.values()), keySchema, valueSchema);
     }
 
     /**
@@ -441,4 +441,4 @@ private static void checkNotNull(String fieldName, Object val, String fieldToSet
         if (val == null)
             throw new SchemaBuilderException("Invalid SchemaBuilder call: " + fieldName + " must be specified to set " + fieldToSet);
     }
-}
\ No newline at end of file
+}
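The `!opt.isPresent()` to `opt.isEmpty()` rewrites that dominate this patch rely on `Optional.isEmpty()`, added in Java 11. The two forms are exact negations of each other, so every such hunk is behavior-neutral; a trivial sketch of the equivalence:

    import java.util.Optional;

    public class IsEmptyDemo {
        public static void main(String[] args) {
            Optional<String> missing = Optional.empty();
            boolean old = !missing.isPresent(); // pre-Java-11 style (removed lines)
            boolean now = missing.isEmpty();    // Java 11+ style (added lines)
            System.out.println(old == now);     // true for every Optional
        }
    }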
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
index 6ce91baf123b6..254e2bf8ca649 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
@@ -46,7 +46,6 @@
 import java.util.concurrent.ExecutionException;
 import java.util.function.Function;
 import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
 import static org.apache.kafka.connect.mirror.MirrorUtils.adminCall;
 
@@ -196,7 +195,7 @@ Map<TopicPartition, Checkpoint> checkpointsForGroup(Map<TopicPartition,
                 .filter(x -> shouldCheckpointTopic(x.getKey().topic())) // Only perform relevant checkpoints filtered by "topic filter"
                 .map(x -> checkpoint(group, x.getKey(), x.getValue()))
-                .flatMap(o -> o.map(Stream::of).orElseGet(Stream::empty)) // do not emit checkpoints for partitions that don't have offset-syncs
+                .flatMap(o -> o.stream()) // do not emit checkpoints for partitions that don't have offset-syncs
                 .filter(x -> x.downstreamOffset() >= 0) // ignore offsets we cannot translate accurately
                 .filter(this::checkpointIsMoreRecent) // do not emit checkpoints for partitions that have a later checkpoint
                 .collect(Collectors.toMap(Checkpoint::topicPartition, Function.identity()));
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java
index 3bc7aed02b36e..b0cc368a5fcbe 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java
@@ -102,11 +102,7 @@ public class MirrorMaker {
 
     private static final long SHUTDOWN_TIMEOUT_SECONDS = 60L;
 
-    public static final List<Class<?>> CONNECTOR_CLASSES = Collections.unmodifiableList(
-        Arrays.asList(
-            MirrorSourceConnector.class,
-            MirrorHeartbeatConnector.class,
-            MirrorCheckpointConnector.class));
+    public static final List<Class<?>> CONNECTOR_CLASSES = List.of(MirrorSourceConnector.class, MirrorHeartbeatConnector.class, MirrorCheckpointConnector.class);
 
     private final Map<SourceAndTarget, Herder> herders = new HashMap<>();
     private CountDownLatch startLatch;
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
index f65899dac6e3a..a129390b39785 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
@@ -420,7 +420,7 @@ private Set<String> toTopics(Collection<TopicPartition> tps) {
 
     void syncTopicAcls() throws InterruptedException, ExecutionException {
         Optional<Collection<AclBinding>> rawBindings = listTopicAclBindings();
-        if (!rawBindings.isPresent())
+        if (rawBindings.isEmpty())
             return;
         List<AclBinding> filteredBindings = rawBindings.get().stream()
             .filter(x -> x.pattern().resourceType() == ResourceType.TOPIC)
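The MirrorCheckpointTask hunk above swaps the pre-Java-9 idiom `o.map(Stream::of).orElseGet(Stream::empty)` for `Optional.stream()`, which yields a zero- or one-element stream and composes directly with `flatMap` to drop the empties. A small sketch under invented names:

    import java.util.List;
    import java.util.Optional;

    public class OptionalStreamDemo {
        public static void main(String[] args) {
            List<Optional<Integer>> maybeOffsets =
                    List.of(Optional.of(1), Optional.empty(), Optional.of(3));

            // Old: .flatMap(o -> o.map(Stream::of).orElseGet(Stream::empty))
            // New (Java 9+): Optional::stream keeps present values, drops empties.
            List<Integer> offsets = maybeOffsets.stream()
                    .flatMap(Optional::stream)
                    .toList();

            System.out.println(offsets); // [1, 3]
        }
    }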
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServerConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServerConfig.java
index 62a90bfb782a0..96993c37c5ce6 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServerConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServerConfig.java
@@ -102,9 +102,7 @@ public abstract class RestServerConfig extends AbstractConfig {
     static final String RESPONSE_HTTP_HEADERS_DOC = "Rules for REST API HTTP response headers";
     // Visible for testing
     static final String RESPONSE_HTTP_HEADERS_DEFAULT = "";
-    private static final Collection<String> HEADER_ACTIONS = Collections.unmodifiableList(
-            Arrays.asList("set", "add", "setDate", "addDate")
-    );
+    private static final Collection<String> HEADER_ACTIONS = List.of("set", "add", "setDate", "addDate");
 
 
     /**
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
index 46c78027f251f..ea92f09bd4177 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
@@ -34,7 +34,6 @@
 import org.apache.maven.artifact.versioning.VersionRange;
 
 import java.time.Instant;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedHashSet;
@@ -149,7 +148,7 @@ public List<PluginInfo> listConnectorPlugins(
                     .filter(p -> PluginType.SINK.toString().equals(p.type()) || PluginType.SOURCE.toString().equals(p.type()))
                     .collect(Collectors.toList()));
         } else {
-            return Collections.unmodifiableList(new ArrayList<>(connectorPlugins));
+            return List.copyOf(connectorPlugins);
         }
     }
 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
index 22536399bdcc3..09a827fba59ed 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
@@ -378,7 +378,7 @@ public synchronized void restartConnectorAndTasks(RestartRequest request, Callba
         }
 
         Optional<RestartPlan> maybePlan = buildRestartPlan(request);
-        if (!maybePlan.isPresent()) {
+        if (maybePlan.isEmpty()) {
             cb.onCompletion(new NotFoundException("Status for connector " + connectorName + " not found", null), null);
             return;
         }
diff --git a/generator/src/main/java/org/apache/kafka/message/FieldType.java b/generator/src/main/java/org/apache/kafka/message/FieldType.java
index a0fa201100e2c..4b204fb4950a5 100644
--- a/generator/src/main/java/org/apache/kafka/message/FieldType.java
+++ b/generator/src/main/java/org/apache/kafka/message/FieldType.java
@@ -504,7 +504,7 @@ default Optional<Integer> fixedLength() {
     }
 
     default boolean isVariableLength() {
-        return !fixedLength().isPresent();
+        return fixedLength().isEmpty();
    }
 
     /**
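On the `List.copyOf` changes (Sensor, SchemaBuilder, ConnectorPluginsResource above, and several more below): `Collections.unmodifiableList(new ArrayList<>(xs))` allocates a copy plus a wrapper, while `List.copyOf(xs)` (Java 10+) produces a single immutable list with the same defensive-copy semantics. Its Javadoc also notes that copying an already-unmodifiable copy generally returns the argument itself. A sketch:

    import java.util.ArrayList;
    import java.util.List;

    public class ListCopyOfDemo {
        public static void main(String[] args) {
            List<String> mutable = new ArrayList<>(List.of("x", "y"));

            List<String> copy = List.copyOf(mutable); // detached immutable copy
            mutable.add("z");
            System.out.println(copy);                 // [x, y] - unaffected

            // Re-copying an immutable copy is essentially free
            // (the same instance comes back on current JDKs).
            System.out.println(List.copyOf(copy) == copy); // true
        }
    }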
diff --git a/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java b/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
index b1315894aa50c..802ac703cefd6 100644
--- a/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
+++ b/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
@@ -485,7 +485,7 @@ private void generateClassReader(String className, StructSpec struct,
         for (FieldSpec field : struct.fields()) {
             Versions validTaggedVersions = field.versions().intersect(field.taggedVersions());
             if (!validTaggedVersions.empty()) {
-                if (!field.tag().isPresent()) {
+                if (field.tag().isEmpty()) {
                     throw new RuntimeException("Field " + field.name() + " has tagged versions, but no tag.");
                 }
                 buffer.printf("case %d: {%n", field.tag().get());
diff --git a/generator/src/main/java/org/apache/kafka/message/MessageSpec.java b/generator/src/main/java/org/apache/kafka/message/MessageSpec.java
index 2901557daabe3..c30ef02b19855 100644
--- a/generator/src/main/java/org/apache/kafka/message/MessageSpec.java
+++ b/generator/src/main/java/org/apache/kafka/message/MessageSpec.java
@@ -20,7 +20,6 @@
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
@@ -58,7 +57,7 @@ public MessageSpec(@JsonProperty("name") String name,
         this.apiKey = apiKey == null ? Optional.empty() : Optional.of(apiKey);
         this.type = Objects.requireNonNull(type);
         this.commonStructs = commonStructs == null ? Collections.emptyList() :
-            Collections.unmodifiableList(new ArrayList<>(commonStructs));
+            List.copyOf(commonStructs);
         if (flexibleVersions == null) {
             throw new RuntimeException("You must specify a value for flexibleVersions. " +
                 "Please use 0+ for all new messages.");
diff --git a/generator/src/main/java/org/apache/kafka/message/checker/CheckerUtils.java b/generator/src/main/java/org/apache/kafka/message/checker/CheckerUtils.java
index abf32a7f7f13d..06d6fe3d552cd 100644
--- a/generator/src/main/java/org/apache/kafka/message/checker/CheckerUtils.java
+++ b/generator/src/main/java/org/apache/kafka/message/checker/CheckerUtils.java
@@ -79,7 +79,7 @@ static void validateTaggedVersions(
         FieldSpec field,
         Versions topLevelFlexibleVersions
     ) {
-        if (!field.flexibleVersions().isPresent()) {
+        if (field.flexibleVersions().isEmpty()) {
             if (!topLevelFlexibleVersions.contains(field.taggedVersions())) {
                 throw new RuntimeException("Tagged versions for " + what + " " +
                     field.name() + " are " + field.taggedVersions() + ", but top " +
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
index 3b776651b870c..8bf1a8e769373 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
@@ -151,7 +151,7 @@ private ConfigurationControlManager(LogContext logContext,
         this.alterConfigPolicy = alterConfigPolicy;
         this.validator = validator;
         this.configData = new TimelineHashMap<>(snapshotRegistry, 0);
-        this.staticConfig = Collections.unmodifiableMap(new HashMap<>(staticConfig));
+        this.staticConfig = Map.copyOf(staticConfig);
         this.currentController = new ConfigResource(Type.BROKER, Integer.toString(nodeId));
     }
 
@@ -439,7 +439,7 @@ Map<String, String> getConfigs(ConfigResource configResource) {
         if (map == null) {
             return Collections.emptyMap();
         } else {
-            return Collections.unmodifiableMap(new HashMap<>(map));
+            return Map.copyOf(map);
         }
     }
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index 35eae955f145e..ba4b83dce5647 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -1568,7 +1568,7 @@ ApiError electLeader(String topic, int partitionId, ElectionType electionType,
             .setEligibleLeaderReplicasEnabled(isElrEnabled())
             .setDefaultDirProvider(clusterDescriber)
             .build();
-        if (!record.isPresent()) {
+        if (record.isEmpty()) {
             if (electionType == ElectionType.PREFERRED) {
                 return new ApiError(Errors.PREFERRED_LEADER_NOT_AVAILABLE);
             } else {
@@ -1649,7 +1649,7 @@ public ControllerResult<Void> unregisterBroker(int brokerId) {
     ControllerResult<Boolean> maybeFenceOneStaleBroker() {
         BrokerHeartbeatManager heartbeatManager = clusterControl.heartbeatManager();
         Optional<BrokerIdAndEpoch> idAndEpoch = heartbeatManager.tracker().maybeRemoveExpired();
-        if (!idAndEpoch.isPresent()) {
+        if (idAndEpoch.isEmpty()) {
             log.debug("No stale brokers found.");
             return ControllerResult.of(Collections.emptyList(), false);
         }
diff --git a/metadata/src/main/java/org/apache/kafka/image/node/MetadataImageNode.java b/metadata/src/main/java/org/apache/kafka/image/node/MetadataImageNode.java
index b13598d47380b..42b9ee21c3b62 100644
--- a/metadata/src/main/java/org/apache/kafka/image/node/MetadataImageNode.java
+++ b/metadata/src/main/java/org/apache/kafka/image/node/MetadataImageNode.java
@@ -20,8 +20,6 @@
 import org.apache.kafka.image.MetadataImage;
 
 import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.function.Function;
 
@@ -37,22 +35,18 @@ public class MetadataImageNode implements MetadataNode {
      */
     private final MetadataImage image;
 
-    private static final Map<String, Function<MetadataImage, MetadataNode>> CHILDREN;
-
-    static {
-        Map<String, Function<MetadataImage, MetadataNode>> children = new HashMap<>();
-        children.put(ProvenanceNode.NAME, image -> new ProvenanceNode(image.provenance()));
-        children.put(FeaturesImageNode.NAME, image -> new FeaturesImageNode(image.features()));
-        children.put(ClusterImageNode.NAME, image -> new ClusterImageNode(image.cluster()));
-        children.put(TopicsImageNode.NAME, image -> new TopicsImageNode(image.topics()));
-        children.put(ConfigurationsImageNode.NAME, image -> new ConfigurationsImageNode(image.configs()));
-        children.put(ClientQuotasImageNode.NAME, image -> new ClientQuotasImageNode(image.clientQuotas()));
-        children.put(ProducerIdsImageNode.NAME, image -> new ProducerIdsImageNode(image.producerIds()));
-        children.put(AclsImageNode.NAME, image -> new AclsImageByIdNode(image.acls()));
-        children.put(ScramImageNode.NAME, image -> new ScramImageNode(image.scram()));
-        children.put(DelegationTokenImageNode.NAME, image -> new DelegationTokenImageNode(image.delegationTokens()));
-        CHILDREN = Collections.unmodifiableMap(children);
-    }
+    private static final Map<String, Function<MetadataImage, MetadataNode>> CHILDREN = Map.of(
+        ProvenanceNode.NAME, image -> new ProvenanceNode(image.provenance()),
+        FeaturesImageNode.NAME, image -> new FeaturesImageNode(image.features()),
+        ClusterImageNode.NAME, image -> new ClusterImageNode(image.cluster()),
+        TopicsImageNode.NAME, image -> new TopicsImageNode(image.topics()),
+        ConfigurationsImageNode.NAME, image -> new ConfigurationsImageNode(image.configs()),
+        ClientQuotasImageNode.NAME, image -> new ClientQuotasImageNode(image.clientQuotas()),
+        ProducerIdsImageNode.NAME, image -> new ProducerIdsImageNode(image.producerIds()),
+        AclsImageNode.NAME, image -> new AclsImageByIdNode(image.acls()),
+        ScramImageNode.NAME, image -> new ScramImageNode(image.scram()),
+        DelegationTokenImageNode.NAME, image -> new DelegationTokenImageNode(image.delegationTokens())
+    );
 
     public MetadataImageNode(MetadataImage image) {
         this.image = image;
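MetadataImageNode above lands exactly on the limit of this idiom: `Map.of` has fixed-arity overloads for at most ten key-value pairs (its ten children here), while anything larger needs `Map.ofEntries`, which is what the ZkConfigs hunk further down uses for its fourteen entries. Both produce immutable maps with unspecified iteration order; the replaced HashMaps were equally unordered, so no call site here can have relied on order. A sketch:

    import java.util.Map;
    import static java.util.Map.entry;

    public class MapFactoryDemo {
        public static void main(String[] args) {
            // Up to 10 pairs: the fixed-arity Map.of overloads.
            Map<String, Integer> small = Map.of("a", 1, "b", 2);

            // Beyond 10 pairs: Map.ofEntries with Map.entry(...), any count.
            Map<String, Integer> large = Map.ofEntries(
                    entry("a", 1), entry("b", 2), entry("c", 3));

            System.out.println(small.size() + " " + large.size()); // 2 3
        }
    }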
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/placement/PartitionAssignment.java b/metadata/src/main/java/org/apache/kafka/metadata/placement/PartitionAssignment.java
index a7012d1505c03..cefba273b25fd 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/placement/PartitionAssignment.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/placement/PartitionAssignment.java
@@ -19,9 +19,6 @@
 
 import org.apache.kafka.common.Uuid;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 
@@ -39,12 +36,8 @@ public class PartitionAssignment {
     private final List<Uuid> directories;
 
     public PartitionAssignment(List<Integer> replicas, DefaultDirProvider defaultDirProvider) {
-        this.replicas = Collections.unmodifiableList(new ArrayList<>(replicas));
-        Uuid[] directories = new Uuid[replicas.size()];
-        for (int i = 0; i < directories.length; i++) {
-            directories[i] = defaultDirProvider.defaultDir(replicas.get(i));
-        }
-        this.directories = Collections.unmodifiableList(Arrays.asList(directories));
+        this.replicas = List.copyOf(replicas);
+        this.directories = replicas.stream().map(replica -> defaultDirProvider.defaultDir(replica)).toList();
     }
 
     /**
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/placement/TopicAssignment.java b/metadata/src/main/java/org/apache/kafka/metadata/placement/TopicAssignment.java
index a06a525e54057..88bdc5df96cc1 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/placement/TopicAssignment.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/placement/TopicAssignment.java
@@ -17,8 +17,6 @@
 
 package org.apache.kafka.metadata.placement;
 
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 
@@ -31,7 +29,7 @@ public class TopicAssignment {
     private final List<PartitionAssignment> assignments;
 
     public TopicAssignment(List<PartitionAssignment> assignments) {
-        this.assignments = Collections.unmodifiableList(new ArrayList<>(assignments));
+        this.assignments = List.copyOf(assignments);
     }
 
     /**
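PartitionAssignment above also replaces a hand-rolled array loop with `stream().map(...).toList()`. `Stream.toList()` (Java 16) returns an unmodifiable list, matching the `Collections.unmodifiableList(Arrays.asList(...))` it replaces, without collector boilerplate. For instance:

    import java.util.List;

    public class ToListDemo {
        public static void main(String[] args) {
            List<Integer> replicas = List.of(1, 2, 3);

            // Stream.toList() (Java 16+) yields an unmodifiable list.
            List<String> dirs = replicas.stream()
                    .map(r -> "dir-" + r) // stand-in for defaultDirProvider.defaultDir(r)
                    .toList();

            System.out.println(dirs); // [dir-1, dir-2, dir-3]
        }
    }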
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index 7c712c4059894..46080671e9ecb 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -1338,7 +1338,7 @@ public void close() throws Exception {
     }
 
     private static final List<ApiMessageAndVersion> PRE_PRODUCTION_RECORDS =
-        Collections.unmodifiableList(Arrays.asList(
+        List.of(
             new ApiMessageAndVersion(new RegisterBrokerRecord().
                 setBrokerEpoch(42).
                 setBrokerId(123).
@@ -1352,7 +1352,7 @@ public void close() throws Exception {
             new ApiMessageAndVersion(new TopicRecord().
                 setName("bar").
                 setTopicId(Uuid.fromString("cxBT72dK4si8Ied1iP4wBA")),
-                (short) 0)));
+                (short) 0));
 
     private static final BootstrapMetadata COMPLEX_BOOTSTRAP = BootstrapMetadata.fromRecords(
         Arrays.asList(
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumFeaturesTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumFeaturesTest.java
index 49e0519fbc167..cfe320bb38d0b 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumFeaturesTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumFeaturesTest.java
@@ -26,7 +26,6 @@
 import org.junit.jupiter.params.provider.ValueSource;
 
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
@@ -37,18 +36,14 @@
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class QuorumFeaturesTest {
-    private static final Map<String, VersionRange> LOCAL;
+    private static final Map<String, VersionRange> LOCAL = Map.of(
+        "foo", VersionRange.of(0, 3),
+        "bar", VersionRange.of(0, 4),
+        "baz", VersionRange.of(2, 2)
+    );
 
-    private static final QuorumFeatures QUORUM_FEATURES;
-
-    static {
-        Map<String, VersionRange> local = new HashMap<>();
-        local.put("foo", VersionRange.of(0, 3));
-        local.put("bar", VersionRange.of(0, 4));
-        local.put("baz", VersionRange.of(2, 2));
-        LOCAL = Collections.unmodifiableMap(local);
-        QUORUM_FEATURES = new QuorumFeatures(0, LOCAL, Arrays.asList(0, 1, 2));
-    }
+    private static final QuorumFeatures QUORUM_FEATURES = new QuorumFeatures(0, LOCAL,
+        Arrays.asList(0, 1, 2));
 
     @Test
     public void testDefaultFeatureMap() {
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapDirectoryTest.java b/metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapDirectoryTest.java
index 8e35ea6741ef2..9935e50fd49b2 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapDirectoryTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapDirectoryTest.java
@@ -31,20 +31,18 @@
 import java.util.List;
 import java.util.Optional;
 
-import static java.util.Arrays.asList;
-import static java.util.Collections.unmodifiableList;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
 @Timeout(40)
 public class BootstrapDirectoryTest {
-    static final List<ApiMessageAndVersion> SAMPLE_RECORDS1 = unmodifiableList(asList(
+    static final List<ApiMessageAndVersion> SAMPLE_RECORDS1 = List.of(
         new ApiMessageAndVersion(new FeatureLevelRecord().
             setName(MetadataVersion.FEATURE_NAME).
            setFeatureLevel((short) 7), (short) 0),
         new ApiMessageAndVersion(new NoOpRecord(), (short) 0),
-        new ApiMessageAndVersion(new NoOpRecord(), (short) 0)));
+        new ApiMessageAndVersion(new NoOpRecord(), (short) 0));
 
     static class BootstrapTestDirectory implements AutoCloseable {
         File directory = null;
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadataTest.java b/metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadataTest.java
index fd41fefabf0f4..550f6ed966a6d 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadataTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadataTest.java
@@ -28,9 +28,7 @@
 import java.util.Collections;
 import java.util.List;
 
-import static java.util.Arrays.asList;
 import static java.util.Collections.emptyList;
-import static java.util.Collections.unmodifiableList;
 import static org.apache.kafka.server.common.MetadataVersion.FEATURE_NAME;
 import static org.apache.kafka.server.common.MetadataVersion.IBP_3_0_IV1;
 import static org.apache.kafka.server.common.MetadataVersion.IBP_3_3_IV2;
@@ -40,14 +38,14 @@
 
 @Timeout(60)
 public class BootstrapMetadataTest {
-    static final List<ApiMessageAndVersion> SAMPLE_RECORDS1 = unmodifiableList(asList(
+    static final List<ApiMessageAndVersion> SAMPLE_RECORDS1 = List.of(
         new ApiMessageAndVersion(new FeatureLevelRecord().
             setName(FEATURE_NAME).
             setFeatureLevel((short) 7), (short) 0),
         new ApiMessageAndVersion(new NoOpRecord(), (short) 0),
         new ApiMessageAndVersion(new FeatureLevelRecord().
             setName(FEATURE_NAME).
-            setFeatureLevel((short) 6), (short) 0)));
+            setFeatureLevel((short) 6), (short) 0));
 
     @Test
     public void testFromVersion() {
diff --git a/server-common/src/main/java/org/apache/kafka/queue/EventQueue.java b/server-common/src/main/java/org/apache/kafka/queue/EventQueue.java
index 8914931448d83..a659fcc873f7b 100644
--- a/server-common/src/main/java/org/apache/kafka/queue/EventQueue.java
+++ b/server-common/src/main/java/org/apache/kafka/queue/EventQueue.java
@@ -97,7 +97,7 @@ public EarliestDeadlineFunction(long newDeadlineNs) {
 
         @Override
         public OptionalLong apply(OptionalLong prevDeadlineNs) {
-            if (!prevDeadlineNs.isPresent()) {
+            if (prevDeadlineNs.isEmpty()) {
                 return OptionalLong.of(newDeadlineNs);
             } else if (prevDeadlineNs.getAsLong() < newDeadlineNs) {
                 return prevDeadlineNs;
@@ -116,7 +116,7 @@ public LatestDeadlineFunction(long newDeadlineNs) {
 
         @Override
         public OptionalLong apply(OptionalLong prevDeadlineNs) {
-            if (!prevDeadlineNs.isPresent()) {
+            if (prevDeadlineNs.isEmpty()) {
                 return OptionalLong.of(newDeadlineNs);
             } else if (prevDeadlineNs.getAsLong() > newDeadlineNs) {
                 return prevDeadlineNs;
diff --git a/server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java b/server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java
index b32183a214f82..1a70bcd043f97 100644
--- a/server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java
+++ b/server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java
@@ -337,7 +337,7 @@ Exception enqueue(EventContext eventContext,
                     }
                     break;
                 case DEFERRED:
-                    if (!deadlineNs.isPresent()) {
+                    if (deadlineNs.isEmpty()) {
                         return new RuntimeException(
                             "You must specify a deadline for deferred events.");
                     }
diff --git a/server-common/src/main/java/org/apache/kafka/server/common/CheckpointFile.java b/server-common/src/main/java/org/apache/kafka/server/common/CheckpointFile.java
index 6efbaa136e0e9..bd5cced567a54 100644
--- a/server-common/src/main/java/org/apache/kafka/server/common/CheckpointFile.java
+++ b/server-common/src/main/java/org/apache/kafka/server/common/CheckpointFile.java
@@ -163,7 +163,7 @@ public List<T> read() throws IOException {
             line = reader.readLine();
             while (line != null) {
                 Optional<T> maybeEntry = formatter.fromString(line);
-                if (!maybeEntry.isPresent()) {
+                if (maybeEntry.isEmpty()) {
                     throw buildMalformedLineException(line);
                 }
                 entries.add(maybeEntry.get());
diff --git a/server-common/src/main/java/org/apache/kafka/server/config/QuotaConfig.java b/server-common/src/main/java/org/apache/kafka/server/config/QuotaConfig.java
index 6ecd7ffca3be1..4480e9f0c1088 100644
--- a/server-common/src/main/java/org/apache/kafka/server/config/QuotaConfig.java
+++ b/server-common/src/main/java/org/apache/kafka/server/config/QuotaConfig.java
@@ -20,9 +20,7 @@
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.security.scram.internals.ScramMechanism;
 
-import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
@@ -115,12 +113,12 @@ public class QuotaConfig {
             .define(QuotaConfig.ALTER_LOG_DIRS_REPLICATION_QUOTA_WINDOW_SIZE_SECONDS_CONFIG, INT, QuotaConfig.QUOTA_WINDOW_SIZE_SECONDS_DEFAULT, atLeast(1), LOW, QuotaConfig.ALTER_LOG_DIRS_REPLICATION_QUOTA_WINDOW_SIZE_SECONDS_DOC)
             .define(QuotaConfig.CONTROLLER_QUOTA_WINDOW_SIZE_SECONDS_CONFIG, INT, QuotaConfig.QUOTA_WINDOW_SIZE_SECONDS_DEFAULT, atLeast(1), LOW, QuotaConfig.CONTROLLER_QUOTA_WINDOW_SIZE_SECONDS_DOC)
             .define(QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG, CLASS, null, LOW, QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_DOC);
-    private static final Set<String> USER_AND_CLIENT_QUOTA_NAMES = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+    private static final Set<String> USER_AND_CLIENT_QUOTA_NAMES = Set.of(
             PRODUCER_BYTE_RATE_OVERRIDE_CONFIG,
             CONSUMER_BYTE_RATE_OVERRIDE_CONFIG,
             REQUEST_PERCENTAGE_OVERRIDE_CONFIG,
             CONTROLLER_MUTATION_RATE_OVERRIDE_CONFIG
-    )));
+    );
 
     private static void buildUserClientQuotaConfigDef(ConfigDef configDef) {
         configDef.define(PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, ConfigDef.Type.LONG, Long.MAX_VALUE,
diff --git a/server-common/src/main/java/org/apache/kafka/server/storage/log/FetchParams.java b/server-common/src/main/java/org/apache/kafka/server/storage/log/FetchParams.java
index 7980ed52f4a08..9829ce76aedda 100644
--- a/server-common/src/main/java/org/apache/kafka/server/storage/log/FetchParams.java
+++ b/server-common/src/main/java/org/apache/kafka/server/storage/log/FetchParams.java
@@ -81,7 +81,7 @@ public boolean isFromConsumer() {
     }
 
     public boolean fetchOnlyLeader() {
-        return isFromFollower() || (isFromConsumer() && !clientMetadata.isPresent()) || shareFetchRequest;
+        return isFromFollower() || (isFromConsumer() && clientMetadata.isEmpty()) || shareFetchRequest;
     }
 
     public boolean hardMaxBytesLimit() {
diff --git a/server/src/main/java/org/apache/kafka/security/authorizer/AclEntry.java b/server/src/main/java/org/apache/kafka/security/authorizer/AclEntry.java
index ea27ee6ea417b..012be7f55a95d 100644
--- a/server/src/main/java/org/apache/kafka/security/authorizer/AclEntry.java
+++ b/server/src/main/java/org/apache/kafka/security/authorizer/AclEntry.java
@@ -115,7 +115,7 @@ public static Set<AclEntry> fromBytes(byte[] bytes) throws IOException {
             return Collections.emptySet();
 
         Optional<JsonValue> jsonValue = Json.parseBytes(bytes);
-        if (!jsonValue.isPresent())
+        if (jsonValue.isEmpty())
             return Collections.emptySet();
 
         JsonObject js = jsonValue.get().asJsonObject();
diff --git a/server/src/main/java/org/apache/kafka/server/AssignmentsManager.java b/server/src/main/java/org/apache/kafka/server/AssignmentsManager.java
index 5920fd5563b29..3605a175e08aa 100644
--- a/server/src/main/java/org/apache/kafka/server/AssignmentsManager.java
+++ b/server/src/main/java/org/apache/kafka/server/AssignmentsManager.java
@@ -429,7 +429,7 @@ int numInFlight() {
     }
 
     static Optional<String> globalResponseError(Optional<ClientResponse> response) {
-        if (!response.isPresent()) {
+        if (response.isEmpty()) {
             return Optional.of("Timeout");
         }
         if (response.get().authenticationException() != null) {
diff --git a/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java b/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java
index ba26c9b4dae8a..96eb4288dada8 100644
--- a/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java
+++ b/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java
@@ -62,7 +62,6 @@
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
@@ -85,8 +84,8 @@ public class ClientMetricsManager implements AutoCloseable {
 
     public static final String CLIENT_METRICS_REAPER_THREAD_NAME = "client-metrics-reaper";
     private static final Logger log = LoggerFactory.getLogger(ClientMetricsManager.class);
-    private static final List<Byte> SUPPORTED_COMPRESSION_TYPES = Collections.unmodifiableList(
-        Arrays.asList(CompressionType.ZSTD.id, CompressionType.LZ4.id, CompressionType.GZIP.id, CompressionType.SNAPPY.id));
+    private static final List<Byte> SUPPORTED_COMPRESSION_TYPES = List.of(CompressionType.ZSTD.id, CompressionType.LZ4.id,
+        CompressionType.GZIP.id, CompressionType.SNAPPY.id);
     // Max cache size (16k active client connections per broker)
     private static final int CACHE_MAX_SIZE = 16384;
     private static final int DEFAULT_CACHE_EXPIRY_MS = 60 * 1000;
diff --git a/server/src/main/java/org/apache/kafka/server/config/ZkConfigs.java b/server/src/main/java/org/apache/kafka/server/config/ZkConfigs.java
index 711ffe8c94aca..0fd251edd160e 100644
--- a/server/src/main/java/org/apache/kafka/server/config/ZkConfigs.java
+++ b/server/src/main/java/org/apache/kafka/server/config/ZkConfigs.java
@@ -19,8 +19,6 @@
 
 import org.apache.kafka.common.config.ConfigDef;
 
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.stream.Collectors;
 
@@ -98,24 +96,23 @@ public final class ZkConfigs {
     private static final String ZOOKEEPER_CLIENT_CNXN_SOCKET = "zookeeper.clientCnxnSocket";
 
     static {
-        Map<String, String> zkSslConfigToSystemPropertyMap = new HashMap<>();
-        zkSslConfigToSystemPropertyMap.put(ZK_SSL_CLIENT_ENABLE_CONFIG, SECURE_CLIENT);
-        zkSslConfigToSystemPropertyMap.put(ZK_CLIENT_CNXN_SOCKET_CONFIG, ZOOKEEPER_CLIENT_CNXN_SOCKET);
-        zkSslConfigToSystemPropertyMap.put(ZK_SSL_KEY_STORE_LOCATION_CONFIG, "zookeeper.ssl.keyStore.location");
-        zkSslConfigToSystemPropertyMap.put(ZK_SSL_KEY_STORE_PASSWORD_CONFIG, "zookeeper.ssl.keyStore.password");
-        zkSslConfigToSystemPropertyMap.put(ZK_SSL_KEY_STORE_TYPE_CONFIG, "zookeeper.ssl.keyStore.type");
-        zkSslConfigToSystemPropertyMap.put(ZK_SSL_TRUST_STORE_LOCATION_CONFIG, "zookeeper.ssl.trustStore.location");
-        zkSslConfigToSystemPropertyMap.put(ZK_SSL_TRUST_STORE_PASSWORD_CONFIG, "zookeeper.ssl.trustStore.password");
-        zkSslConfigToSystemPropertyMap.put(ZK_SSL_TRUST_STORE_TYPE_CONFIG, "zookeeper.ssl.trustStore.type");
-        zkSslConfigToSystemPropertyMap.put(ZK_SSL_PROTOCOL_CONFIG, "zookeeper.ssl.protocol");
-        zkSslConfigToSystemPropertyMap.put(ZK_SSL_ENABLED_PROTOCOLS_CONFIG, "zookeeper.ssl.enabledProtocols");
-        zkSslConfigToSystemPropertyMap.put(ZK_SSL_CIPHER_SUITES_CONFIG, "zookeeper.ssl.ciphersuites");
-        zkSslConfigToSystemPropertyMap.put(ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "zookeeper.ssl.hostnameVerification");
-        zkSslConfigToSystemPropertyMap.put(ZK_SSL_CRL_ENABLE_CONFIG, "zookeeper.ssl.crl");
-        zkSslConfigToSystemPropertyMap.put(ZK_SSL_OCSP_ENABLE_CONFIG, "zookeeper.ssl.ocsp");
-
-        ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP = Collections.unmodifiableMap(zkSslConfigToSystemPropertyMap);
+        ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP = Map.ofEntries(
+            Map.entry(ZK_SSL_CLIENT_ENABLE_CONFIG, SECURE_CLIENT),
+            Map.entry(ZK_CLIENT_CNXN_SOCKET_CONFIG, ZOOKEEPER_CLIENT_CNXN_SOCKET),
+            Map.entry(ZK_SSL_KEY_STORE_LOCATION_CONFIG, "zookeeper.ssl.keyStore.location"),
+            Map.entry(ZK_SSL_KEY_STORE_PASSWORD_CONFIG, "zookeeper.ssl.keyStore.password"),
+            Map.entry(ZK_SSL_KEY_STORE_TYPE_CONFIG, "zookeeper.ssl.keyStore.type"),
+            Map.entry(ZK_SSL_TRUST_STORE_LOCATION_CONFIG, "zookeeper.ssl.trustStore.location"),
+            Map.entry(ZK_SSL_TRUST_STORE_PASSWORD_CONFIG, "zookeeper.ssl.trustStore.password"),
+            Map.entry(ZK_SSL_TRUST_STORE_TYPE_CONFIG, "zookeeper.ssl.trustStore.type"),
+            Map.entry(ZK_SSL_PROTOCOL_CONFIG, "zookeeper.ssl.protocol"),
+            Map.entry(ZK_SSL_ENABLED_PROTOCOLS_CONFIG, "zookeeper.ssl.enabledProtocols"),
+            Map.entry(ZK_SSL_CIPHER_SUITES_CONFIG, "zookeeper.ssl.ciphersuites"),
+            Map.entry(ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "zookeeper.ssl.hostnameVerification"),
+            Map.entry(ZK_SSL_CRL_ENABLE_CONFIG, "zookeeper.ssl.crl"),
+            Map.entry(ZK_SSL_OCSP_ENABLE_CONFIG, "zookeeper.ssl.ocsp")
+        );
 
         ZK_SSL_CLIENT_ENABLE_DOC = "Set client to use TLS when connecting to ZooKeeper." +
             " An explicit value overrides any value set via the zookeeper.client.secure system property (note the different name)." +
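One behavioral caveat that applies to every `List.of`/`Set.of`/`Map.of(Entries)`/`copyOf` conversion in this patch: the immutable factories reject null keys, values, and elements with a NullPointerException, whereas HashMap, HashSet, and ArrayList accepted them. That is safe for compile-time constant tables like the ones converted here, but it is worth checking on similar migrations:

    import java.util.HashMap;
    import java.util.Map;

    public class NullHostilityDemo {
        public static void main(String[] args) {
            Map<String, String> legacy = new HashMap<>();
            legacy.put("key", null);     // fine: HashMap tolerates null values

            try {
                Map.copyOf(legacy);      // the immutable factories do not
            } catch (NullPointerException e) {
                System.out.println("Map.copyOf rejected the null value");
            }
        }
    }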
diff --git a/server/src/test/java/org/apache/kafka/server/ClientMetricsManagerTest.java b/server/src/test/java/org/apache/kafka/server/ClientMetricsManagerTest.java
index 4cb4053505b68..f01776f44881c 100644
--- a/server/src/test/java/org/apache/kafka/server/ClientMetricsManagerTest.java
+++ b/server/src/test/java/org/apache/kafka/server/ClientMetricsManagerTest.java
@@ -1313,7 +1313,7 @@ private KafkaMetric getMetric(Metrics kafkaMetrics, String name) throws Exceptio
         Optional<Map.Entry<MetricName, KafkaMetric>> metric = kafkaMetrics.metrics().entrySet().stream()
             .filter(entry -> entry.getKey().name().equals(name))
             .findFirst();
-        if (!metric.isPresent())
+        if (metric.isEmpty())
             throw new Exception(String.format("Could not find metric called %s", name));
 
         return metric.get().getValue();
diff --git a/server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsTestUtils.java b/server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsTestUtils.java
index 48306a794ac42..46fb24902af79 100644
--- a/server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsTestUtils.java
+++ b/server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsTestUtils.java
@@ -32,8 +32,6 @@
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import java.util.Properties;
@@ -43,10 +41,10 @@ public class ClientMetricsTestUtils {
     public static final String DEFAULT_METRICS =
         "org.apache.kafka.client.producer.partition.queue.,org.apache.kafka.client.producer.partition.latency";
     public static final int DEFAULT_PUSH_INTERVAL_MS = 30 * 1000; // 30 seconds
-    public static final List<String> DEFAULT_CLIENT_MATCH_PATTERNS = Collections.unmodifiableList(Arrays.asList(
+    public static final List<String> DEFAULT_CLIENT_MATCH_PATTERNS = List.of(
         ClientMetricsConfigs.CLIENT_SOFTWARE_NAME + "=apache-kafka-java",
         ClientMetricsConfigs.CLIENT_SOFTWARE_VERSION + "=3.5.*"
-    ));
+    );
     public static final int CLIENT_PORT = 56078;
 
     public static Properties defaultProperties() {
diff --git a/shell/src/main/java/org/apache/kafka/shell/command/HistoryCommandHandler.java b/shell/src/main/java/org/apache/kafka/shell/command/HistoryCommandHandler.java
index 0f10b9a0aa6df..99a13d4d17590 100644
--- a/shell/src/main/java/org/apache/kafka/shell/command/HistoryCommandHandler.java
+++ b/shell/src/main/java/org/apache/kafka/shell/command/HistoryCommandHandler.java
@@ -93,7 +93,7 @@ public void run(
         PrintWriter writer,
         MetadataShellState state
     ) throws Exception {
-        if (!shell.isPresent()) {
+        if (shell.isEmpty()) {
             throw new RuntimeException("The history command requires a shell.");
         }
         Iterator<Entry<Integer, String>> iter = shell.get().history(numEntriesToShow);
diff --git a/shell/src/main/java/org/apache/kafka/shell/command/LsCommandHandler.java b/shell/src/main/java/org/apache/kafka/shell/command/LsCommandHandler.java
index 4ecd9d78e6811..86ecb5fe8438a 100644
--- a/shell/src/main/java/org/apache/kafka/shell/command/LsCommandHandler.java
+++ b/shell/src/main/java/org/apache/kafka/shell/command/LsCommandHandler.java
@@ -191,7 +191,7 @@ static void printEntries(PrintWriter writer,
 
     static ColumnSchema calculateColumnSchema(OptionalInt screenWidth,
                                               List<String> entries) {
-        if (!screenWidth.isPresent()) {
+        if (screenWidth.isEmpty()) {
             return new ColumnSchema(1, entries.size());
         }
         int maxColumns = screenWidth.getAsInt() / 4;
diff --git a/shell/src/test/java/org/apache/kafka/shell/glob/GlobVisitorTest.java b/shell/src/test/java/org/apache/kafka/shell/glob/GlobVisitorTest.java
index 0d96a113fedce..a97774cb87399 100644
--- a/shell/src/test/java/org/apache/kafka/shell/glob/GlobVisitorTest.java
+++ b/shell/src/test/java/org/apache/kafka/shell/glob/GlobVisitorTest.java
@@ -105,7 +105,7 @@ static class InfoConsumer implements Consumer<Optional<MetadataNodeInfo>> {
 
         @Override
         public void accept(Optional<MetadataNodeInfo> info) {
-            if (!infos.isPresent()) {
+            if (infos.isEmpty()) {
                 if (info.isPresent()) {
                     infos = Optional.of(new ArrayList<>());
                     infos.get().add(info.get());
diff --git a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageMetrics.java b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageMetrics.java
index 1170c7e688696..7922d88d831a5 100644
--- a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageMetrics.java
+++ b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageMetrics.java
@@ -20,8 +20,6 @@
 
 import com.yammer.metrics.core.MetricName;
 
-import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
 
@@ -54,8 +52,7 @@ public class RemoteStorageMetrics {
     private static final String REMOTE_LOG_READER_TASK_QUEUE_SIZE = REMOTE_LOG_READER_METRICS_NAME_PREFIX + TASK_QUEUE_SIZE;
     private static final String REMOTE_LOG_READER_AVG_IDLE_PERCENT = REMOTE_LOG_READER_METRICS_NAME_PREFIX + AVG_IDLE_PERCENT;
     private static final String REMOTE_LOG_READER_FETCH_RATE_AND_TIME_MS = REMOTE_LOG_READER_METRICS_NAME_PREFIX + "FetchRateAndTimeMs";
-    public static final Set<String> REMOTE_STORAGE_THREAD_POOL_METRICS = Collections.unmodifiableSet(
-        new HashSet<>(Arrays.asList(REMOTE_LOG_READER_TASK_QUEUE_SIZE, REMOTE_LOG_READER_AVG_IDLE_PERCENT)));
+    public static final Set<String> REMOTE_STORAGE_THREAD_POOL_METRICS = Set.of(REMOTE_LOG_READER_TASK_QUEUE_SIZE, REMOTE_LOG_READER_AVG_IDLE_PERCENT);
 
     public static final MetricName REMOTE_COPY_BYTES_PER_SEC_METRIC = getMetricName(
         "kafka.server", "BrokerTopicMetrics", REMOTE_COPY_BYTES_PER_SEC);
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java
index 4ff976bdf78d5..3312d42af02eb 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java
@@ -442,7 +442,7 @@ public FetchDataInfo read(long startOffset, int maxSize, Optional<Long> maxPosit
         // return empty records in the fetch-data-info when:
         // 1. adjustedMaxSize is 0 (or)
        // 2. maxPosition to read is unavailable
-        if (adjustedMaxSize == 0 || !maxPositionOpt.isPresent())
+        if (adjustedMaxSize == 0 || maxPositionOpt.isEmpty())
             return new FetchDataInfo(offsetMetadata, MemoryRecords.EMPTY);
 
         // calculate the length of the message set to read based on whether or not they gave us a maxOffset
@@ -497,7 +497,7 @@ public int recover(ProducerStateManager producerStateManager, Optional<LeaderEpochFileCache> leaderEpochCache) {
             if (batch.magic() >= RecordBatch.MAGIC_VALUE_V2) {
                 leaderEpochCache.ifPresent(cache -> {
                     if (batch.partitionLeaderEpoch() >= 0 &&
-                            (!cache.latestEpoch().isPresent() || batch.partitionLeaderEpoch() > cache.latestEpoch().getAsInt()))
+                            (cache.latestEpoch().isEmpty() || batch.partitionLeaderEpoch() > cache.latestEpoch().getAsInt()))
                         cache.assign(batch.partitionLeaderEpoch(), batch.baseOffset());
                 });
                 updateProducerState(producerStateManager, batch);
@@ -686,7 +686,7 @@ public void onBecomeInactiveSegment() throws IOException {
      * load the timestamp of the first message into memory.
      */
     private void loadFirstBatchTimestamp() {
-        if (!rollingBasedTimestamp.isPresent()) {
+        if (rollingBasedTimestamp.isEmpty()) {
             Iterator<FileChannelRecordBatch> iter = log.batches().iterator();
             if (iter.hasNext())
                 rollingBasedTimestamp = OptionalLong.of(iter.next().maxTimestamp());
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java
index ea4723098d8ff..96f49ba013ba4 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java
@@ -383,12 +383,12 @@ public ValidationResult validateMessagesAndAssignOffsetsCompressed(LongRef offse
 
                 Optional<ApiRecordError> recordError = validateRecordCompression(sourceCompressionType,
                     recordIndex, record);
-                if (!recordError.isPresent()) {
+                if (recordError.isEmpty()) {
                     recordError = validateRecord(batch, topicPartition, record, recordIndex, now,
                         timestampType, timestampBeforeMaxMs, timestampAfterMaxMs, compactedTopic,
                         metricsRecorder);
                 }
 
-                if (!recordError.isPresent()
+                if (recordError.isEmpty()
                         && batch.magic() > RecordBatch.MAGIC_VALUE_V0
                         && toMagic > RecordBatch.MAGIC_VALUE_V0) {
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerAppendInfo.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerAppendInfo.java
index 9037d25672eb4..621f4da51f251 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerAppendInfo.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerAppendInfo.java
@@ -181,7 +181,7 @@ public void appendDataBatch(short epoch,
             // Received a non-transactional message while a transaction is active
             throw new InvalidTxnStateException("Expected transactional write from producer " + producerId + " at " +
                 "offset " + firstOffsetMetadata + " in partition " + topicPartition);
-        } else if (!currentTxnFirstOffset.isPresent() && isTransactional) {
+        } else if (currentTxnFirstOffset.isEmpty() && isTransactional) {
             // Began a new transaction
             updatedEntry.setCurrentTxnFirstOffset(firstOffset);
             transactions.add(new TxnMetadata(producerId, firstOffsetMetadata));
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java
index 994f34744e43e..4b0175f3b2483 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java
@@ -248,9 +248,9 @@ public Optional<LogOffsetMetadata> firstUnstableOffset() {
         Optional<LogOffsetMetadata> unreplicatedFirstOffset = Optional.ofNullable(unreplicatedTxns.firstEntry()).map(e -> e.getValue().firstOffset);
         Optional<LogOffsetMetadata> undecidedFirstOffset = Optional.ofNullable(ongoingTxns.firstEntry()).map(e -> e.getValue().firstOffset);
 
-        if (!unreplicatedFirstOffset.isPresent())
+        if (unreplicatedFirstOffset.isEmpty())
             return undecidedFirstOffset;
-        else if (!undecidedFirstOffset.isPresent())
+        else if (undecidedFirstOffset.isEmpty())
             return unreplicatedFirstOffset;
         else if (undecidedFirstOffset.get().messageOffset < unreplicatedFirstOffset.get().messageOffset)
             return undecidedFirstOffset;
@@ -328,7 +328,7 @@ public void loadProducerEntry(ProducerStateEntry entry) {
     }
 
     private boolean isProducerExpired(long currentTimeMs, ProducerStateEntry producerState) {
-        return !producerState.currentTxnFirstOffset().isPresent() && currentTimeMs - producerState.lastTimestamp() >= producerStateManagerConfig.producerIdExpirationMs();
+        return producerState.currentTxnFirstOffset().isEmpty() && currentTimeMs - producerState.lastTimestamp() >= producerStateManagerConfig.producerIdExpirationMs();
     }
 
     /**
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java
index eaf700aee3834..8c416e5b53602 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java
@@ -83,7 +83,7 @@ public static void rebuildProducerState(ProducerStateManage
         // offset (see below). The next time the log is reloaded, we will load producer state using this snapshot
         // (or later snapshots). Otherwise, if there is no snapshot file, then we have to rebuild producer state
         // from the first segment.
-        if (!producerStateManager.latestSnapshotOffset().isPresent() && reloadFromCleanShutdown) {
+        if (producerStateManager.latestSnapshotOffset().isEmpty() && reloadFromCleanShutdown) {
            // To avoid an expensive scan through all the segments, we take empty snapshots from the start of the
            // last two segments and the last offset. This should avoid the full scan in the case that the log needs
            // truncation.
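The same `isEmpty()` rewrite also appears on the primitive optionals in this patch (OptionalLong in EventQueue and LogSegment, OptionalInt in LsCommandHandler): `OptionalLong`, `OptionalInt`, and `OptionalDouble` all gained `isEmpty()` in Java 11 alongside `Optional`. A trivial sketch:

    import java.util.OptionalLong;

    public class PrimitiveOptionalDemo {
        public static void main(String[] args) {
            OptionalLong deadline = OptionalLong.empty();

            // Java 11+ isEmpty() on the primitive optionals too.
            if (deadline.isEmpty()) {
                deadline = OptionalLong.of(42L);
            }
            System.out.println(deadline.getAsLong()); // 42
        }
    }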
diff --git a/storage/src/main/java/org/apache/kafka/storage/log/metrics/BrokerTopicMetrics.java b/storage/src/main/java/org/apache/kafka/storage/log/metrics/BrokerTopicMetrics.java
index a26c57b680d9b..46bb266073faf 100644
--- a/storage/src/main/java/org/apache/kafka/storage/log/metrics/BrokerTopicMetrics.java
+++ b/storage/src/main/java/org/apache/kafka/storage/log/metrics/BrokerTopicMetrics.java
@@ -84,7 +84,7 @@ private BrokerTopicMetrics(Optional<String> name, boolean remoteStorageEnabled)
         metricTypeMap.put(INVALID_MESSAGE_CRC_RECORDS_PER_SEC, new MeterWrapper(INVALID_MESSAGE_CRC_RECORDS_PER_SEC, "requests"));
         metricTypeMap.put(INVALID_OFFSET_OR_SEQUENCE_RECORDS_PER_SEC, new MeterWrapper(INVALID_OFFSET_OR_SEQUENCE_RECORDS_PER_SEC, "requests"));
 
-        if (!name.isPresent()) {
+        if (name.isEmpty()) {
             metricTypeMap.put(REPLICATION_BYTES_IN_PER_SEC, new MeterWrapper(REPLICATION_BYTES_IN_PER_SEC, "bytes"));
             metricTypeMap.put(REPLICATION_BYTES_OUT_PER_SEC, new MeterWrapper(REPLICATION_BYTES_OUT_PER_SEC, "bytes"));
             metricTypeMap.put(REASSIGNMENT_BYTES_IN_PER_SEC, new MeterWrapper(REASSIGNMENT_BYTES_IN_PER_SEC, "bytes"));
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageEvent.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageEvent.java
index 83884e6ce3da6..5359d9e5e407e 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageEvent.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageEvent.java
@@ -73,7 +73,7 @@ public boolean matches(final LocalTieredStorageCondition condition) {
         if (!exception.map(e -> condition.failed).orElseGet(() -> !condition.failed)) {
             return false;
         }
-        if (condition.baseOffset != null && !metadata.isPresent()) {
+        if (condition.baseOffset != null && metadata.isEmpty()) {
             return false;
         }
         return condition.baseOffset == null || metadata.get().startOffset() == condition.baseOffset;
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/actions/AlterLogDirAction.java b/storage/src/test/java/org/apache/kafka/tiered/storage/actions/AlterLogDirAction.java
index 34f4c7d0892aa..9e514cd231422 100644
--- a/storage/src/test/java/org/apache/kafka/tiered/storage/actions/AlterLogDirAction.java
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/actions/AlterLogDirAction.java
@@ -47,16 +47,16 @@ public AlterLogDirAction(TopicPartition topicPartition,
 
     @Override
     public void doExecute(TieredStorageTestContext context) throws InterruptedException, ExecutionException, TimeoutException {
         Optional<BrokerLocalStorage> localStorage = context.localStorages().stream().filter(storage -> storage.getBrokerId() == brokerId).findFirst();
-        if (!localStorage.isPresent()) {
+        if (localStorage.isEmpty()) {
             throw new IllegalArgumentException("cannot find local storage for this topic partition:" + topicPartition + " in this broker id:" + brokerId);
         }
 
         Optional<File> sourceDir = localStorage.get().getBrokerStorageDirectories().stream().filter(dir -> localStorage.get().dirContainsTopicPartition(topicPartition, dir)).findFirst();
-        if (!sourceDir.isPresent()) {
+        if (sourceDir.isEmpty()) {
             throw new IllegalArgumentException("No log dir with topic partition:" + topicPartition + " in this broker id:" + brokerId);
         }
         Optional<File> targetDir = localStorage.get().getBrokerStorageDirectories().stream().filter(dir -> !localStorage.get().dirContainsTopicPartition(topicPartition, dir)).findFirst();
-        if (!targetDir.isPresent()) {
+        if (targetDir.isEmpty()) {
             throw new IllegalArgumentException("No log dir without topic partition:" + topicPartition + " in this broker id:" + brokerId);
         }
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ConsumeAction.java b/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ConsumeAction.java
index 9549a6b691670..df8f255830a12 100644
--- a/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ConsumeAction.java
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ConsumeAction.java
@@ -100,7 +100,7 @@ public void doExecute(TieredStorageTestContext context) throws InterruptedExcept
             .filter(record -> record.offset() >= fetchOffset)
             .findFirst();
 
-        if (!firstExpectedRecordOpt.isPresent()) {
+        if (firstExpectedRecordOpt.isEmpty()) {
             // If no records could be found in the second-tier storage, no record would be consumed from that storage.
             if (expectedFromSecondTierCount > 0) {
                 fail("Could not find any record with offset >= " + fetchOffset + " from tier storage.");
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/utils/BrokerLocalStorage.java b/storage/src/test/java/org/apache/kafka/tiered/storage/utils/BrokerLocalStorage.java
index 639a464f3c910..b6cd73f7131e3 100644
--- a/storage/src/test/java/org/apache/kafka/tiered/storage/utils/BrokerLocalStorage.java
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/utils/BrokerLocalStorage.java
@@ -181,7 +181,7 @@ private OffsetHolder getEarliestLocalOffset(TopicPartition topicPartition) {
             .filter(filename -> filename.endsWith(LogFileUtils.LOG_FILE_SUFFIX))
             .sorted()
             .findFirst();
-        if (!firstLogFile.isPresent()) {
+        if (firstLogFile.isEmpty()) {
             throw new IllegalArgumentException(String.format(
                 "[BrokerId=%d] No log file found for the topic-partition %s", brokerId, topicPartition));
         }
diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
index d01a3acbb1ac1..6857bf9e380eb 100644
--- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
+++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
@@ -379,10 +379,10 @@ public void testConcurrentlyAccessThreads() throws InterruptedException {
             executor.execute(() -> {
                 try {
                     for (int i = 0; i < loop + 1; i++) {
-                        if (!kafkaStreams.addStreamThread().isPresent())
+                        if (kafkaStreams.addStreamThread().isEmpty())
                             throw new RuntimeException("failed to create stream thread");
                         kafkaStreams.metadataForLocalThreads();
-                        if (!kafkaStreams.removeStreamThread().isPresent())
+                        if (kafkaStreams.removeStreamThread().isEmpty())
                             throw new RuntimeException("failed to delete a stream thread");
                     }
                 } catch (final Exception e) {
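A second caveat, specific to `Set.of` (Uuid, QuotaConfig, RemoteStorageMetrics earlier in the patch): duplicate elements throw an IllegalArgumentException at construction time, where `new HashSet<>(Arrays.asList(...))` would silently deduplicate. For constant sets like these, a duplicate is a programming error, so failing fast is arguably an improvement:

    import java.util.Set;

    public class SetOfDuplicatesDemo {
        public static void main(String[] args) {
            try {
                Set.of("a", "a"); // HashSet would dedupe; Set.of fails fast
            } catch (IllegalArgumentException e) {
                System.out.println("duplicate element rejected");
            }
        }
    }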
import java.util.Arrays; import java.util.Collection; -import java.util.Collections; import java.util.List; /** @@ -39,7 +37,7 @@ public CompositeStateListener(final StateListener... listeners) { } public CompositeStateListener(final Collection<StateListener> stateListeners) { - this.listeners = Collections.unmodifiableList(new ArrayList<>(stateListeners)); + this.listeners = List.copyOf(stateListeners); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index 11dc049958e8c..e1180a320c17a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -1222,34 +1222,27 @@ public class StreamsConfig extends AbstractConfig { // this is the list of configs for underlying clients // that streams prefer different default values - private static final Map<String, Object> PRODUCER_DEFAULT_OVERRIDES; - static { - final Map<String, Object> tempProducerDefaultOverrides = new HashMap<>(); - tempProducerDefaultOverrides.put(ProducerConfig.LINGER_MS_CONFIG, "100"); - PRODUCER_DEFAULT_OVERRIDES = Collections.unmodifiableMap(tempProducerDefaultOverrides); - } + private static final Map<String, Object> PRODUCER_DEFAULT_OVERRIDES = Map.of(ProducerConfig.LINGER_MS_CONFIG, "100"); private static final Map<String, Object> PRODUCER_EOS_OVERRIDES; static { final Map<String, Object> tempProducerDefaultOverrides = new HashMap<>(PRODUCER_DEFAULT_OVERRIDES); - tempProducerDefaultOverrides.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE); - tempProducerDefaultOverrides.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); - // Reduce the transaction timeout for quicker pending offset expiration on broker side. - tempProducerDefaultOverrides.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, DEFAULT_TRANSACTION_TIMEOUT); - + tempProducerDefaultOverrides.putAll(Map.of( + ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE, + ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true, + // Reduce the transaction timeout for quicker pending offset expiration on broker side.
+ ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, DEFAULT_TRANSACTION_TIMEOUT + )); PRODUCER_EOS_OVERRIDES = Collections.unmodifiableMap(tempProducerDefaultOverrides); } - private static final Map<String, Object> CONSUMER_DEFAULT_OVERRIDES; - static { - final Map<String, Object> tempConsumerDefaultOverrides = new HashMap<>(); - tempConsumerDefaultOverrides.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000"); - tempConsumerDefaultOverrides.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - tempConsumerDefaultOverrides.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); - tempConsumerDefaultOverrides.put("internal.leave.group.on.close", false); - tempConsumerDefaultOverrides.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "classic"); - CONSUMER_DEFAULT_OVERRIDES = Collections.unmodifiableMap(tempConsumerDefaultOverrides); - } + private static final Map<String, Object> CONSUMER_DEFAULT_OVERRIDES = Map.of( + ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000", + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest", + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false", + "internal.leave.group.on.close", false, + ConsumerConfig.GROUP_PROTOCOL_CONFIG, "classic" + ); private static final Map<String, Object> CONSUMER_EOS_OVERRIDES; static { @@ -1258,12 +1251,8 @@ public class StreamsConfig extends AbstractConfig { CONSUMER_EOS_OVERRIDES = Collections.unmodifiableMap(tempConsumerDefaultOverrides); } - private static final Map<String, Object> ADMIN_CLIENT_OVERRIDES; - static { - final Map<String, Object> tempAdminClientDefaultOverrides = new HashMap<>(); - tempAdminClientDefaultOverrides.put(AdminClientConfig.ENABLE_METRICS_PUSH_CONFIG, true); - ADMIN_CLIENT_OVERRIDES = Collections.unmodifiableMap(tempAdminClientDefaultOverrides); - } + private static final Map<String, Object> ADMIN_CLIENT_OVERRIDES = + Map.of(AdminClientConfig.ENABLE_METRICS_PUSH_CONFIG, true); public static class InternalConfig { // This is settable in the main Streams config, but it's a private API for now diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java index 5533963bcd55c..797f9d451411c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java @@ -171,7 +171,7 @@ public void process(final Record record) { // // This condition below allows us to process the out-of-order records without the need // to hold it in the temporary outer store - if (!outerJoinStore.isPresent() || timeTo < sharedTimeTracker.streamTime) { + if (outerJoinStore.isEmpty() || timeTo < sharedTimeTracker.streamTime) { context().forward(record.withValue(joiner.apply(record.key(), record.value(), null))); } else { sharedTimeTracker.updatedMinTime(inputRecordTimestamp); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java index 5367b8dede217..551b910be60de 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java @@ -1120,7 +1120,7 @@ public KTable leftJoin(final KTable other, } private final Function>, Optional>> getPartition = maybeMulticastPartitions -> { - if (!maybeMulticastPartitions.isPresent()) { + if (maybeMulticastPartitions.isEmpty()) { return Optional.empty(); } if (maybeMulticastPartitions.get().size() != 1) {
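Note on the StreamsConfig hunks above: Map.of and Map.copyOf are stricter than the HashMap-based static initializers they replace. A minimal self-contained sketch of the difference, not part of the patch (class name and config keys invented for illustration):

    import java.util.HashMap;
    import java.util.Map;

    public class MapFactorySemantics {
        public static void main(String[] args) {
            // Map.of builds an immutable map in one expression, but rejects
            // duplicate keys eagerly instead of applying last-write-wins.
            try {
                Map.of("some.config", "100", "some.config", "200");
            } catch (IllegalArgumentException e) {
                System.out.println("duplicate key rejected");
            }
            // Map.of and Map.copyOf also reject null keys and values, which a
            // HashMap would have accepted silently.
            Map<String, Object> mutable = new HashMap<>();
            mutable.put("some.other.config", null);
            try {
                Map.copyOf(mutable);
            } catch (NullPointerException e) {
                System.out.println("null value rejected");
            }
        }
    }

None of the defaults touched above are null or duplicated, so the conversion looks behavior-preserving; the stricter factories simply fail fast at class-initialization time if that ever changes.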
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java b/streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java index ad9128bde209d..2261e27c367f5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java @@ -657,12 +657,12 @@ private static boolean canPerformRackAwareOptimization(final ApplicationState ap return false; } - if (!assignmentConfigs.rackAwareTrafficCost().isPresent()) { + if (assignmentConfigs.rackAwareTrafficCost().isEmpty()) { LOG.warn("Rack aware task assignment optimization unavailable: must configure {}", StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG); return false; } - if (!assignmentConfigs.rackAwareNonOverlapCost().isPresent()) { + if (assignmentConfigs.rackAwareNonOverlapCost().isEmpty()) { LOG.warn("Rack aware task assignment optimization unavailable: must configure {}", StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG); return false; } @@ -695,7 +695,7 @@ private static boolean hasValidRackInformation(final ApplicationState applicatio } private static boolean hasValidRackInformation(final KafkaStreamsState state) { - if (!state.rackId().isPresent()) { + if (state.rackId().isEmpty()) { LOG.error("KafkaStreams client {} doesn't have a rack id configured.", state.processId().id()); return false; } @@ -710,7 +710,7 @@ private static boolean hasValidRackInformation(final TaskInfo task, for (final TaskTopicPartition topicPartition : topicPartitions) { final Optional<Set<String>> racks = topicPartition.rackIds(); - if (!racks.isPresent() || racks.get().isEmpty()) { + if (racks.isEmpty() || racks.get().isEmpty()) { LOG.error("Topic partition {} for task {} does not have racks configured.", topicPartition, task.id()); return false; } @@ -1043,4 +1043,4 @@ public TagStatistics(final ApplicationState applicationState) { this.tagEntryToClients = tagEntryToClients; } } -} \ No newline at end of file +} diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java index bb2f2ddd5b755..306f4691e2f9a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java @@ -926,21 +926,21 @@ public boolean hasExceptionsAndFailedTasks() { public Set<StandbyTask> updatingStandbyTasks() { return stateUpdaterThread != null - ? Collections.unmodifiableSet(new HashSet<>(stateUpdaterThread.updatingStandbyTasks())) + ? Set.copyOf(stateUpdaterThread.updatingStandbyTasks()) : Collections.emptySet(); } @Override public Set<Task> updatingTasks() { return stateUpdaterThread != null - ? Collections.unmodifiableSet(new HashSet<>(stateUpdaterThread.updatingTasks())) + ?
Set.copyOf(stateUpdaterThread.updatingTasks()) : Collections.emptySet(); } public Set restoredActiveTasks() { restoredActiveTasksLock.lock(); try { - return Collections.unmodifiableSet(new HashSet<>(restoredActiveTasks)); + return Set.copyOf(restoredActiveTasks); } finally { restoredActiveTasksLock.unlock(); } @@ -949,19 +949,19 @@ public Set restoredActiveTasks() { public List exceptionsAndFailedTasks() { exceptionsAndFailedTasksLock.lock(); try { - return Collections.unmodifiableList(new ArrayList<>(exceptionsAndFailedTasks)); + return List.copyOf(exceptionsAndFailedTasks); } finally { exceptionsAndFailedTasksLock.unlock(); } } public Set removedTasks() { - return Collections.unmodifiableSet(new HashSet<>(removedTasks)); + return Set.copyOf(removedTasks); } public Set pausedTasks() { return stateUpdaterThread != null - ? Collections.unmodifiableSet(new HashSet<>(stateUpdaterThread.pausedTasks())) + ? Set.copyOf(stateUpdaterThread.pausedTasks()) : Collections.emptySet(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java index 2af238726152c..a8a044edb6b3c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java @@ -659,7 +659,7 @@ private Set validateTopics(final Set topicsToValidate, final Set topicsToCreate = new HashSet<>(); for (final String topicName : topicsToValidate) { final Optional numberOfPartitions = topicsMap.get(topicName).numberOfPartitions(); - if (!numberOfPartitions.isPresent()) { + if (numberOfPartitions.isEmpty()) { log.error("Found undefined number of partitions for topic {}", topicName); throw new StreamsException("Topic " + topicName + " number of partitions not defined"); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java index 67dfa061729e2..2ff87ef19fa6b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java @@ -1459,7 +1459,7 @@ public synchronized Collection> copartitionGroups() { } } final Set> uniqueCopartitionGroups = new HashSet<>(topicsToCopartitionGroup.values()); - return Collections.unmodifiableList(new ArrayList<>(uniqueCopartitionGroups)); + return List.copyOf(uniqueCopartitionGroups); } private List maybeDecorateInternalSourceTopics(final Collection sourceTopics) { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java index 81db581d04f75..6fc7f99a48332 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java @@ -160,7 +160,7 @@ public void send(final String topic, } if (!partitions.isEmpty()) { final Optional> maybeMulticastPartitions = partitioner.partitions(topic, key, value, partitions.size()); - if (!maybeMulticastPartitions.isPresent()) { + if (maybeMulticastPartitions.isEmpty()) { // A null//empty partition indicates we should use the default partitioner 
send(topic, key, value, headers, null, timestamp, keySerializer, valueSerializer, processorNodeId, context); } else { @@ -570,7 +570,7 @@ private void removeAllProducedSensors() { @Override public Map offsets() { - return Collections.unmodifiableMap(new HashMap<>(offsets)); + return Map.copyOf(offsets); } private void checkForException() { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopics.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopics.java index ff217925f9d04..2a0358a866b3d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopics.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopics.java @@ -217,7 +217,7 @@ private void setRepartitionSourceTopicPartitionCount(final Map repartitionSourceTopicPartitionCount = repartitionTopicMetadata.get(repartitionSourceTopic).numberOfPartitions(); - if (!repartitionSourceTopicPartitionCount.isPresent()) { + if (repartitionSourceTopicPartitionCount.isEmpty()) { final Integer numPartitions = computePartitionCount( repartitionTopicMetadata, topicGroups, diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 3ea6a374e84a1..5000522ed0d26 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -742,7 +742,7 @@ public boolean isProcessable(final long wallClockTime) { } final boolean readyToProcess = partitionGroup.readyToProcess(wallClockTime); if (!readyToProcess) { - if (!timeCurrentIdlingStarted.isPresent()) { + if (timeCurrentIdlingStarted.isEmpty()) { timeCurrentIdlingStarted = Optional.of(wallClockTime); } } else { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java index ca8c7accdaf73..ce019036e11b4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java @@ -463,7 +463,7 @@ private List rebuildMetadataForSingleTopology(final Map>, Integer> getPartition = maybeMulticastPartitions -> { - if (!maybeMulticastPartitions.isPresent()) { + if (maybeMulticastPartitions.isEmpty()) { return null; } if (maybeMulticastPartitions.get().size() != 1) { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/DefaultKafkaStreamsState.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/DefaultKafkaStreamsState.java index f2d3e1b6af2ca..06f5c9ce96947 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/DefaultKafkaStreamsState.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/DefaultKafkaStreamsState.java @@ -99,7 +99,7 @@ public SortedSet previousStandbyTasks() { @Override public long lagFor(final TaskId task) { - if (!taskLagTotals.isPresent()) { + if (taskLagTotals.isEmpty()) { LOG.error("lagFor was called on a KafkaStreamsState {} that does not support lag computations.", processId); throw new UnsupportedOperationException("Lag computation was not requested for KafkaStreamsState with process " + 
processId); } @@ -115,7 +115,7 @@ public long lagFor(final TaskId task) { @Override public SortedSet prevTasksByLag(final String consumerClientId) { - if (!taskLagTotals.isPresent()) { + if (taskLagTotals.isEmpty()) { LOG.error("prevTasksByLag was called on a KafkaStreamsState {} that does not support lag computations.", processId); throw new UnsupportedOperationException("Lag computation was not requested for KafkaStreamsState with process " + processId); } @@ -139,7 +139,7 @@ public SortedSet prevTasksByLag(final String consumerClientId) { @Override public Map statefulTasksToLagSums() { - if (!taskLagTotals.isPresent()) { + if (taskLagTotals.isEmpty()) { LOG.error("statefulTasksToLagSums was called on a KafkaStreamsState {} that does not support lag computations.", processId); throw new UnsupportedOperationException("Lag computation was not requested for KafkaStreamsState with process " + processId); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java index 05af50dec003a..455afd0af701f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java @@ -241,7 +241,7 @@ public static boolean validateClientRack(final Map previousRackInfo = null; for (final Map.Entry> rackEntry : entry.getValue().entrySet()) { - if (!rackEntry.getValue().isPresent()) { + if (rackEntry.getValue().isEmpty()) { if (!StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE.equals(assignmentConfigs.rackAwareAssignmentStrategy())) { log.error( String.format("RackId doesn't exist for process %s and consumer %s", diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java index 977e4ad97bf1b..943f00dc2e57b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java @@ -215,7 +215,7 @@ public RemoveNamedTopologyResult removeNamedTopology(final String topologyToRemo removeTopologyFuture.completeExceptionally( new IllegalStateException("Cannot remove a NamedTopology while the state is " + super.state) ); - } else if (!getTopologyByName(topologyToRemove).isPresent()) { + } else if (getTopologyByName(topologyToRemove).isEmpty()) { log.error("Attempted to remove unknown topology {}. 
This application currently contains the" + "following topologies: {}.", topologyToRemove, topologyMetadata.namedTopologiesView() ); @@ -431,7 +431,7 @@ public KeyQueryMetadata queryMetadataForKey(final String storeName, * See {@link KafkaStreams#allLocalStorePartitionLags()} */ public Map> allLocalStorePartitionLagsForTopology(final String topologyName) { - if (!getTopologyByName(topologyName).isPresent()) { + if (getTopologyByName(topologyName).isEmpty()) { log.error("Can't get local store partition lags since topology {} does not exist in this application", topologyName); throw new UnknownTopologyException("Can't get local store partition lags", topologyName); diff --git a/streams/src/main/java/org/apache/kafka/streams/query/StateQueryRequest.java b/streams/src/main/java/org/apache/kafka/streams/query/StateQueryRequest.java index f37d8067453ec..5cdbf8bc8ce7e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/query/StateQueryRequest.java +++ b/streams/src/main/java/org/apache/kafka/streams/query/StateQueryRequest.java @@ -18,8 +18,6 @@ import org.apache.kafka.common.annotation.InterfaceStability.Evolving; -import java.util.Collections; -import java.util.HashSet; import java.util.Optional; import java.util.Set; @@ -102,7 +100,7 @@ public StateQueryRequest withPartitions(final Set partitions) { return new StateQueryRequest<>( storeName, position, - Optional.of(Collections.unmodifiableSet(new HashSet<>(partitions))), + Optional.of(Set.copyOf(partitions)), query, executionInfoEnabled, requireActive @@ -166,7 +164,7 @@ public Query getQuery() { * Whether this request should fetch from all locally available partitions. */ public boolean isAllPartitions() { - return !partitions.isPresent(); + return partitions.isEmpty(); } /** @@ -175,7 +173,7 @@ public boolean isAllPartitions() { * @throws IllegalStateException if this is a request for all partitions */ public Set getPartitions() { - if (!partitions.isPresent()) { + if (partitions.isEmpty()) { throw new IllegalStateException( "Cannot list partitions of an 'all partitions' request"); } else { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java index bc7a52a48141b..4bcbe0089c9ae 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java @@ -205,11 +205,11 @@ private static QueryResult runRangeQuery( final ResultOrder order = rangeQuery.resultOrder(); final KeyValueIterator iterator; try { - if (!lowerRange.isPresent() && !upperRange.isPresent() && !order.equals(ResultOrder.DESCENDING)) { + if (lowerRange.isEmpty() && upperRange.isEmpty() && !order.equals(ResultOrder.DESCENDING)) { iterator = kvStore.all(); } else if (!order.equals(ResultOrder.DESCENDING)) { iterator = kvStore.range(lowerRange.orElse(null), upperRange.orElse(null)); - } else if (!lowerRange.isPresent() && !upperRange.isPresent()) { + } else if (lowerRange.isEmpty() && upperRange.isEmpty()) { iterator = kvStore.reverseAll(); } else { iterator = kvStore.reverseRange(lowerRange.orElse(null), upperRange.orElse(null)); @@ -500,4 +500,4 @@ private static String parseStoreException(final Exception e, final StateStor printWriter.flush(); return stringWriter.toString(); } -} \ No newline at end of file +} diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index 5dab53290263a..bcf24ee7df888 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -1552,7 +1552,7 @@ public void shouldBeProcessableIfAllPartitionsBuffered() { task.initializeIfNeeded(); task.completeRestoration(noOpResetter -> { }); - assertThat("task is not idling", !task.timeCurrentIdlingStarted().isPresent()); + assertThat("task is not idling", task.timeCurrentIdlingStarted().isEmpty()); assertFalse(task.process(0L)); @@ -1564,7 +1564,7 @@ public void shouldBeProcessableIfAllPartitionsBuffered() { task.addRecords(partition2, singleton(getConsumerRecordWithOffsetAsTimestamp(partition2, 0))); assertTrue(task.process(0L)); - assertThat("task is not idling", !task.timeCurrentIdlingStarted().isPresent()); + assertThat("task is not idling", task.timeCurrentIdlingStarted().isEmpty()); } @Test @@ -1580,7 +1580,7 @@ public void shouldBeRecordIdlingTimeIfSuspended() { task.resume(); - assertThat("task is not idling", !task.timeCurrentIdlingStarted().isPresent()); + assertThat("task is not idling", task.timeCurrentIdlingStarted().isEmpty()); } @Test diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java index 5a506163bb211..52a2308dafe4c 100644 --- a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java +++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java @@ -419,7 +419,7 @@ public void forward(final Record public List> forwarded(final String childName) { final LinkedList> result = new LinkedList<>(); for (final CapturedForward capture : capturedForwards) { - if (!capture.childName().isPresent() || capture.childName().equals(Optional.of(childName))) { + if (capture.childName().isEmpty() || capture.childName().equals(Optional.of(childName))) { result.add(capture); } } diff --git a/test-common/src/main/java/org/apache/kafka/common/test/TestKitNodes.java b/test-common/src/main/java/org/apache/kafka/common/test/TestKitNodes.java index dfab866d3a6f6..ad0c43fdbbfd3 100644 --- a/test-common/src/main/java/org/apache/kafka/common/test/TestKitNodes.java +++ b/test-common/src/main/java/org/apache/kafka/common/test/TestKitNodes.java @@ -30,7 +30,6 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -119,7 +118,7 @@ public Builder setNumDisksPerBroker(int numDisksPerBroker) { public Builder setPerServerProperties(Map> perServerProperties) { this.perServerProperties = Collections.unmodifiableMap( perServerProperties.entrySet().stream() - .collect(Collectors.toMap(Map.Entry::getKey, e -> Collections.unmodifiableMap(new HashMap<>(e.getValue()))))); + .collect(Collectors.toMap(Map.Entry::getKey, e -> Map.copyOf(e.getValue())))); return this; } @@ -378,4 +377,4 @@ public Map propertyOverrides() { } }; } -} \ No newline at end of file +} diff --git a/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterConfig.java b/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterConfig.java index 1edf465cf345a..6708297c85b9f 100644 --- 
a/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterConfig.java +++ b/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterConfig.java @@ -23,10 +23,7 @@ import org.apache.kafka.server.common.MetadataVersion; import java.io.File; -import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; @@ -260,7 +257,7 @@ public static class Builder { private Builder() {} public Builder setTypes(Set types) { - this.types = Collections.unmodifiableSet(new HashSet<>(types)); + this.types = Set.copyOf(types); return this; } @@ -315,44 +312,44 @@ public Builder setMetadataVersion(MetadataVersion metadataVersion) { } public Builder setServerProperties(Map serverProperties) { - this.serverProperties = Collections.unmodifiableMap(new HashMap<>(serverProperties)); + this.serverProperties = Map.copyOf(serverProperties); return this; } public Builder setConsumerProperties(Map consumerProperties) { - this.consumerProperties = Collections.unmodifiableMap(new HashMap<>(consumerProperties)); + this.consumerProperties = Map.copyOf(consumerProperties); return this; } public Builder setProducerProperties(Map producerProperties) { - this.producerProperties = Collections.unmodifiableMap(new HashMap<>(producerProperties)); + this.producerProperties = Map.copyOf(producerProperties); return this; } public Builder setAdminClientProperties(Map adminClientProperties) { - this.adminClientProperties = Collections.unmodifiableMap(new HashMap<>(adminClientProperties)); + this.adminClientProperties = Map.copyOf(adminClientProperties); return this; } public Builder setSaslServerProperties(Map saslServerProperties) { - this.saslServerProperties = Collections.unmodifiableMap(new HashMap<>(saslServerProperties)); + this.saslServerProperties = Map.copyOf(saslServerProperties); return this; } public Builder setSaslClientProperties(Map saslClientProperties) { - this.saslClientProperties = Collections.unmodifiableMap(new HashMap<>(saslClientProperties)); + this.saslClientProperties = Map.copyOf(saslClientProperties); return this; } public Builder setPerServerProperties(Map> perServerProperties) { this.perServerProperties = Collections.unmodifiableMap( perServerProperties.entrySet().stream() - .collect(Collectors.toMap(Map.Entry::getKey, e -> Collections.unmodifiableMap(new HashMap<>(e.getValue()))))); + .collect(Collectors.toMap(Map.Entry::getKey, e -> Map.copyOf(e.getValue())))); return this; } public Builder setTags(List tags) { - this.tags = Collections.unmodifiableList(new ArrayList<>(tags)); + this.tags = List.copyOf(tags); return this; } diff --git a/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterInstanceParameterResolver.java b/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterInstanceParameterResolver.java index 6b3c2339d2e1e..5582eb379ba1e 100644 --- a/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterInstanceParameterResolver.java +++ b/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterInstanceParameterResolver.java @@ -50,7 +50,7 @@ public boolean supportsParameter(ParameterContext parameterContext, ExtensionCon return false; } - if (!extensionContext.getTestMethod().isPresent()) { + if (extensionContext.getTestMethod().isEmpty()) { // Allow this to be injected into the class 
extensionContext.getRequiredTestClass(); return true; diff --git a/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterTestExtensions.java b/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterTestExtensions.java index 65cfac0e0804a..127f8a4b7bedf 100644 --- a/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterTestExtensions.java +++ b/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterTestExtensions.java @@ -32,7 +32,6 @@ import java.lang.reflect.Method; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -102,10 +101,9 @@ public class ClusterTestExtensions implements TestTemplateInvocationContextProvi private static final String PROCESS_REAPER_THREAD_PREFIX = "process reaper"; private static final String RMI_THREAD_PREFIX = "RMI"; private static final String DETECT_THREAD_LEAK_KEY = "detectThreadLeak"; - private static final Set SKIPPED_THREAD_PREFIX = Collections.unmodifiableSet(Stream.of( - METRICS_METER_TICK_THREAD_PREFIX, SCALA_THREAD_PREFIX, FORK_JOIN_POOL_THREAD_PREFIX, JUNIT_THREAD_PREFIX, - ATTACH_LISTENER_THREAD_PREFIX, PROCESS_REAPER_THREAD_PREFIX, RMI_THREAD_PREFIX, SystemTimer.SYSTEM_TIMER_THREAD_PREFIX) - .collect(Collectors.toSet())); + private static final Set SKIPPED_THREAD_PREFIX = Set.of(METRICS_METER_TICK_THREAD_PREFIX, SCALA_THREAD_PREFIX, + FORK_JOIN_POOL_THREAD_PREFIX, JUNIT_THREAD_PREFIX, ATTACH_LISTENER_THREAD_PREFIX, PROCESS_REAPER_THREAD_PREFIX, + RMI_THREAD_PREFIX, SystemTimer.SYSTEM_TIMER_THREAD_PREFIX); @Override public boolean supportsTestTemplate(ExtensionContext context) { diff --git a/tools/src/main/java/org/apache/kafka/tools/JmxTool.java b/tools/src/main/java/org/apache/kafka/tools/JmxTool.java index 6d4892add5583..56cf8d85ab899 100644 --- a/tools/src/main/java/org/apache/kafka/tools/JmxTool.java +++ b/tools/src/main/java/org/apache/kafka/tools/JmxTool.java @@ -208,7 +208,7 @@ private static Map findNumExpectedAttributes(MBeanServerCon List queries, Set found) throws Exception { Map result = new HashMap<>(); - if (!attributesInclude.isPresent()) { + if (attributesInclude.isEmpty()) { found.forEach(objectName -> { try { MBeanInfo mBeanInfo = conn.getMBeanInfo(objectName); diff --git a/tools/src/main/java/org/apache/kafka/tools/LeaderElectionCommand.java b/tools/src/main/java/org/apache/kafka/tools/LeaderElectionCommand.java index c8b41c27fa6f0..6fd652199e42a 100644 --- a/tools/src/main/java/org/apache/kafka/tools/LeaderElectionCommand.java +++ b/tools/src/main/java/org/apache/kafka/tools/LeaderElectionCommand.java @@ -102,7 +102,7 @@ static void run(Duration timeoutMs, String... args) throws Exception { * The validate function should be checking that this option is required if the --topic and --path-to-json-file * are not specified. 
*/ - Optional> topicPartitions = jsonFileTopicPartitions.map(Optional::of).orElse(singleTopicPartition); + Optional> topicPartitions = jsonFileTopicPartitions.or(() -> singleTopicPartition); Properties props = new Properties(); if (commandOptions.hasAdminClientConfig()) { diff --git a/tools/src/main/java/org/apache/kafka/tools/MetadataQuorumCommand.java b/tools/src/main/java/org/apache/kafka/tools/MetadataQuorumCommand.java index 884b26388213e..dba7951aa4cbf 100644 --- a/tools/src/main/java/org/apache/kafka/tools/MetadataQuorumCommand.java +++ b/tools/src/main/java/org/apache/kafka/tools/MetadataQuorumCommand.java @@ -216,10 +216,10 @@ private static List> quorumInfoToRows(QuorumInfo.ReplicaState leade String status, boolean humanReadable) { return infos.map(info -> { - String lastFetchTimestamp = !info.lastFetchTimestamp().isPresent() ? "-1" : + String lastFetchTimestamp = info.lastFetchTimestamp().isEmpty() ? "-1" : humanReadable ? format("%d ms ago", relativeTimeMs(info.lastFetchTimestamp().getAsLong(), "last fetch")) : valueOf(info.lastFetchTimestamp().getAsLong()); - String lastCaughtUpTimestamp = !info.lastCaughtUpTimestamp().isPresent() ? "-1" : + String lastCaughtUpTimestamp = info.lastCaughtUpTimestamp().isEmpty() ? "-1" : humanReadable ? format("%d ms ago", relativeTimeMs(info.lastCaughtUpTimestamp().getAsLong(), "last caught up")) : valueOf(info.lastCaughtUpTimestamp().getAsLong()); return Stream.of( @@ -382,7 +382,7 @@ static Uuid getMetadataDirectoryId(String metadataDirectory) throws Exception { if (metaProperties == null) { throw new TerseException("Unable to read meta.properties from " + metadataDirectory); } - if (!metaProperties.directoryId().isPresent()) { + if (metaProperties.directoryId().isEmpty()) { throw new TerseException("No directory id found in " + metadataDirectory); } return metaProperties.directoryId().get(); diff --git a/tools/src/main/java/org/apache/kafka/tools/ReplicaVerificationTool.java b/tools/src/main/java/org/apache/kafka/tools/ReplicaVerificationTool.java index c17ad32cc950d..67ec1f6c50a9f 100644 --- a/tools/src/main/java/org/apache/kafka/tools/ReplicaVerificationTool.java +++ b/tools/src/main/java/org/apache/kafka/tools/ReplicaVerificationTool.java @@ -472,7 +472,7 @@ void verifyCheckSum(Consumer println) { if (batch.lastOffset() >= fetchResponsePerReplica.get(replicaId).highWatermark()) { isMessageInAllReplicas = false; } else { - if (!messageInfoFromFirstReplicaOpt.isPresent()) { + if (messageInfoFromFirstReplicaOpt.isEmpty()) { messageInfoFromFirstReplicaOpt = Optional.of( new MessageInfo(replicaId, batch.lastOffset(), batch.nextOffset(), batch.checksum()) ); diff --git a/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java b/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java index 66824f1d2a3d7..af9df7769f1e0 100644 --- a/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java +++ b/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java @@ -175,7 +175,7 @@ private AbortTransactionSpec buildAbortSpec( }) .findFirst(); - if (!foundProducerState.isPresent()) { + if (foundProducerState.isEmpty()) { printErrorAndExit("Could not find any open transactions starting at offset " + startOffset + " on partition " + topicPartition); return null; @@ -543,14 +543,14 @@ void execute(Admin admin, Namespace ns, PrintStream out) throws Exception { Optional brokerId = Optional.ofNullable(ns.getInt("broker_id")); Optional topic = Optional.ofNullable(ns.getString("topic")); - if (!topic.isPresent() && 
!brokerId.isPresent()) { + if (topic.isEmpty() && brokerId.isEmpty()) { printErrorAndExit("The `find-hanging` command requires either --topic " + "or --broker-id to limit the scope of the search"); return; } Optional partition = Optional.ofNullable(ns.getInt("partition")); - if (partition.isPresent() && !topic.isPresent()) { + if (partition.isPresent() && topic.isEmpty()) { printErrorAndExit("The --partition argument requires --topic to be provided"); return; } @@ -767,7 +767,7 @@ private void findTopicPartitions( Map topicDescriptions = admin.describeTopics(topics).allTopicNames().get(); topicDescriptions.forEach((topic, description) -> { description.partitions().forEach(partitionInfo -> { - if (!brokerId.isPresent() || hasReplica(brokerId.get(), partitionInfo)) { + if (brokerId.isEmpty() || hasReplica(brokerId.get(), partitionInfo)) { topicPartitions.add(new TopicPartition(topic, partitionInfo.partition())); } }); @@ -1017,7 +1017,7 @@ static void execute( .filter(cmd -> cmd.name().equals(commandName)) .findFirst(); - if (!commandOpt.isPresent()) { + if (commandOpt.isEmpty()) { printErrorAndExit("Unexpected command " + commandName); } diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java b/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java index 548e1316d84db..5c6d26f433ef8 100644 --- a/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java +++ b/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java @@ -187,7 +187,7 @@ public ConsoleConsumerOptions(String[] args) throws IOException { private void checkRequiredArgs() { List> topicOrFilterArgs = new ArrayList<>(Arrays.asList(topicArg(), includedTopicsArg())); - topicOrFilterArgs.removeIf(arg -> !arg.isPresent()); + topicOrFilterArgs.removeIf(arg -> arg.isEmpty()); // user need to specify value for either --topic or --include options) if (topicOrFilterArgs.size() != 1) { CommandLineUtils.printUsageAndExit(parser, "Exactly one of --include/--topic is required. "); diff --git a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java index d790fb5586afa..768d022ba6616 100644 --- a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java +++ b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java @@ -679,7 +679,7 @@ static List getBrokerMetadata(Admin adminClient, List b : new BrokerMetadata(node.id(), Optional.empty()) ).collect(Collectors.toList()); - long numRackless = results.stream().filter(m -> !m.rack.isPresent()).count(); + long numRackless = results.stream().filter(m -> m.rack.isEmpty()).count(); if (enableRackAwareness && numRackless != 0 && numRackless != results.size()) { throw new AdminOperationException("Not all brokers have rack information. 
Add " + "--disable-rack-aware in command line to make replica assignment without rack " + diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java index 76a5a101548d0..8646858902b2f 100644 --- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java @@ -463,7 +463,7 @@ public void testDescribeOffsetsOfExistingGroup(ClusterInstance clusterInstance) return false; Optional maybePartitionState = assignments.get().stream().filter(isGrp).findFirst(); - if (!maybePartitionState.isPresent()) + if (maybePartitionState.isEmpty()) return false; PartitionAssignmentState partitionState = maybePartitionState.get(); @@ -837,7 +837,7 @@ public void testDescribeOffsetsWithMultiPartitionTopicAndMultipleConsumers(Clust res.getValue().isPresent() && res.getValue().get().stream().filter(s -> Objects.equals(s.group, group)).count() == 2 && res.getValue().get().stream().filter(x -> Objects.equals(x.group, group) && x.partition.isPresent()).count() == 2 && - res.getValue().get().stream().noneMatch(x -> Objects.equals(x.group, group) && !x.partition.isPresent()); + res.getValue().get().stream().noneMatch(x -> Objects.equals(x.group, group) && x.partition.isEmpty()); }, "Expected two rows (one row per consumer) in describe group results."); } } @@ -1031,7 +1031,7 @@ public void testDescribeNonOffsetCommitGroup(ClusterInstance clusterInstance) th return false; Optional maybeAssignmentState = groupOffsets.getValue().get().stream().filter(isGrp).findFirst(); - if (!maybeAssignmentState.isPresent()) + if (maybeAssignmentState.isEmpty()) return false; PartitionAssignmentState assignmentState = maybeAssignmentState.get(); diff --git a/trogdor/src/main/java/org/apache/kafka/trogdor/rest/TaskStateType.java b/trogdor/src/main/java/org/apache/kafka/trogdor/rest/TaskStateType.java index c8ade0666f907..1969e91076965 100644 --- a/trogdor/src/main/java/org/apache/kafka/trogdor/rest/TaskStateType.java +++ b/trogdor/src/main/java/org/apache/kafka/trogdor/rest/TaskStateType.java @@ -16,8 +16,6 @@ */ package org.apache.kafka.trogdor.rest; -import java.util.Arrays; -import java.util.Collections; import java.util.List; /** @@ -36,7 +34,6 @@ public static class Constants { static final String RUNNING_VALUE = "RUNNING"; static final String STOPPING_VALUE = "STOPPING"; static final String DONE_VALUE = "DONE"; - public static final List VALUES = Collections.unmodifiableList( - Arrays.asList(PENDING_VALUE, RUNNING_VALUE, STOPPING_VALUE, DONE_VALUE)); + public static final List VALUES = List.of(PENDING_VALUE, RUNNING_VALUE, STOPPING_VALUE, DONE_VALUE); } } diff --git a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressSpec.java b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressSpec.java index e5dfe629ec523..dc0b09da96a2a 100644 --- a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressSpec.java +++ b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressSpec.java @@ -24,7 +24,6 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; @@ -58,7 +57,7 @@ public ConnectionStressSpec(@JsonProperty("startMs") long startMs, @JsonProperty("action") 
ConnectionStressAction action) { super(startMs, durationMs); this.clientNodes = (clientNodes == null) ? Collections.emptyList() : - Collections.unmodifiableList(new ArrayList<>(clientNodes)); + List.copyOf(clientNodes); this.bootstrapServers = (bootstrapServers == null) ? "" : bootstrapServers; this.commonClientConf = configOrEmptyMap(commonClientConf); this.targetConnectionsPerSec = targetConnectionsPerSec; diff --git a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ExternalCommandWorker.java b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ExternalCommandWorker.java index be9046235218d..85c53ca97a9e9 100644 --- a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ExternalCommandWorker.java +++ b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ExternalCommandWorker.java @@ -272,7 +272,7 @@ public void run() { while (true) { log.info("{}: stdin writer ready.", id); Optional node = stdinQueue.take(); - if (!node.isPresent()) { + if (node.isEmpty()) { log.trace("{}: StdinWriter terminating.", id); return; } diff --git a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/PartitionsSpec.java b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/PartitionsSpec.java index 4ba2defd2b316..9cb00efcbc655 100644 --- a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/PartitionsSpec.java +++ b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/PartitionsSpec.java @@ -66,7 +66,7 @@ public PartitionsSpec(@JsonProperty("numPartitions") int numPartitions, if (configs == null) { this.configs = Collections.emptyMap(); } else { - this.configs = Collections.unmodifiableMap(new HashMap<>(configs)); + this.configs = Map.copyOf(configs); } }
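The patch as a whole is a mechanical migration to the JDK 9-11 APIs: !opt.isPresent() becomes opt.isEmpty() (Java 11), Collections.unmodifiableX(new X<>(src)) becomes X.copyOf(src) (Java 10), and unmodifiable-wrapped initializers become List.of/Set.of/Map.of literals (Java 9). The self-contained sketch below is not taken from the patch (all names invented); it demonstrates the equivalences and the main behavioral caveats:

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.Optional;
    import java.util.Set;

    public class ModernCollectionIdioms {
        public static void main(String[] args) {
            Optional<String> rackId = Optional.empty();
            // The two checks are equivalent; isEmpty() just reads directly.
            System.out.println(!rackId.isPresent()); // true
            System.out.println(rackId.isEmpty());    // true

            List<String> live = new ArrayList<>(List.of("task-0", "task-1"));
            List<String> oldIdiom = Collections.unmodifiableList(new ArrayList<>(live));
            List<String> newIdiom = List.copyOf(live);
            live.add("task-2");
            // Both idioms snapshot the source, detached from later mutation.
            System.out.println(oldIdiom); // [task-0, task-1]
            System.out.println(newIdiom); // [task-0, task-1]

            // copyOf may skip the copy when the input is already an immutable
            // copy (an allowed optimization; true on current OpenJDK builds).
            System.out.println(List.copyOf(newIdiom) == newIdiom);

            // Set.of rejects duplicates that a HashSet would have collapsed...
            try {
                Set.of("RUNNING", "RUNNING");
            } catch (IllegalArgumentException e) {
                System.out.println("duplicate element rejected");
            }
            // ...and every of/copyOf factory rejects null elements.
            try {
                List.copyOf(Collections.singletonList(null));
            } catch (NullPointerException e) {
                System.out.println("null element rejected");
            }
        }
    }

Because copyOf can return its already-immutable argument unchanged, the new idiom can also be cheaper than wrapping a fresh HashSet on frequently called accessors such as DefaultStateUpdater's, in addition to being shorter.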