Skip to content

Commit

Permalink
Merge branch 'apache:master' into HDDS-11268
Browse files Browse the repository at this point in the history
  • Loading branch information
slfan1989 authored Aug 20, 2024
2 parents bace8af + 88b88ff commit 028675d
Show file tree
Hide file tree
Showing 81 changed files with 2,676 additions and 1,178 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,14 @@ public enum ChecksumCombineMode {
tags = ConfigTag.CLIENT)
private boolean enablePutblockPiggybacking = false;

@Config(key = "key.write.concurrency",
defaultValue = "1",
description = "Maximum concurrent writes allowed on each key. " +
"Defaults to 1 which matches the behavior before HDDS-9844. " +
"For unlimited write concurrency, set this to -1 or any negative integer value.",
tags = ConfigTag.CLIENT)
private int maxConcurrentWritePerKey = 1;

@PostConstruct
public void validate() {
Preconditions.checkState(streamBufferSize > 0);
Expand Down Expand Up @@ -485,4 +493,12 @@ public void setIncrementalChunkList(boolean enable) {
public boolean getIncrementalChunkList() {
return this.incrementalChunkList;
}

/**
 * Sets the maximum number of concurrent writes allowed on each key
 * (config key "key.write.concurrency"); a negative value means
 * unlimited write concurrency.
 */
public void setMaxConcurrentWritePerKey(int value) {
  maxConcurrentWritePerKey = value;
}

/**
 * @return the maximum number of concurrent writes allowed on each key;
 *         a negative value means unlimited write concurrency.
 */
public int getMaxConcurrentWritePerKey() {
  return maxConcurrentWritePerKey;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ private XceiverClientRatis(Pipeline pipeline, RpcType rpcType,
throw new IllegalArgumentException(watchType + " is not supported. " +
"Currently only ALL_COMMITTED or MAJORITY_COMMITTED are supported");
}
LOG.info("WatchType {}. Majority {}, ", this.watchType, this.majority);
LOG.debug("WatchType {}. Majority {}, ", this.watchType, this.majority);
if (LOG.isTraceEnabled()) {
LOG.trace("new XceiverClientRatis for pipeline " + pipeline.getId(),
new Throwable("TRACE"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,9 @@ private HddsConfigKeys() {
public static final String HDDS_DATANODE_HANDLER_COUNT_KEY =
"hdds.datanode.handler.count";
public static final int HDDS_DATANODE_HANDLER_COUNT_DEFAULT = 10;
public static final String HDDS_DATANODE_READ_THREADPOOL_KEY =
"hdds.datanode.read.threadpool";
public static final int HDDS_DATANODE_READ_THREADPOOL_DEFAULT = 10;
public static final String HDDS_DATANODE_HTTP_BIND_HOST_DEFAULT = "0.0.0.0";
public static final int HDDS_DATANODE_HTTP_BIND_PORT_DEFAULT = 9882;
public static final int HDDS_DATANODE_HTTPS_BIND_PORT_DEFAULT = 9883;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.conf;

import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.grpc.GrpcConfigKeys;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.SizeInBytes;

/**
 * Utilities for Ratis configurations.
 * <p>
 * This is a static utility class and cannot be instantiated.
 */
public final class RatisConfUtils {
  private RatisConfUtils() {
    // utility class: no instances
  }

  /** Utilities for {@link GrpcConfigKeys}. */
  public static final class Grpc {
    private Grpc() {
      // utility class: no instances
    }

    /**
     * Configure {@link GrpcConfigKeys#setMessageSizeMax(RaftProperties, SizeInBytes)}
     * to the given maximum plus a 1MB gap (see RATIS-2135).
     *
     * @param properties the Ratis properties to update
     * @param max the desired maximum message size in bytes; must be positive
     *            and not smaller than the configured log appender buffer byte limit
     */
    public static void setMessageSizeMax(RaftProperties properties, int max) {
      Preconditions.assertTrue(max > 0, () -> "max = " + max + " <= 0");

      final long logAppenderBufferByteLimit = RaftServerConfigKeys.Log.Appender.bufferByteLimit(properties).getSize();
      Preconditions.assertTrue(max >= logAppenderBufferByteLimit,
          () -> "max = " + max + " < logAppenderBufferByteLimit = " + logAppenderBufferByteLimit);

      // Need a 1MB gap; see RATIS-2135
      GrpcConfigKeys.setMessageSizeMax(properties, SizeInBytes.valueOf(max + SizeInBytes.ONE_MB.getSize()));
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -228,15 +228,27 @@ public final class ScmConfigKeys {
"ozone.scm.handler.count.key";
public static final String OZONE_SCM_CLIENT_HANDLER_COUNT_KEY =
"ozone.scm.client.handler.count.key";
public static final String OZONE_SCM_CLIENT_READ_THREADPOOL_KEY =
"ozone.scm.client.read.threadpool";
public static final int OZONE_SCM_CLIENT_READ_THREADPOOL_DEFAULT = 10;
public static final String OZONE_SCM_BLOCK_HANDLER_COUNT_KEY =
"ozone.scm.block.handler.count.key";
public static final String OZONE_SCM_BLOCK_READ_THREADPOOL_KEY =
"ozone.scm.block.read.threadpool";
public static final int OZONE_SCM_BLOCK_READ_THREADPOOL_DEFAULT = 10;
public static final String OZONE_SCM_DATANODE_HANDLER_COUNT_KEY =
"ozone.scm.datanode.handler.count.key";
public static final String OZONE_SCM_DATANODE_READ_THREADPOOL_KEY =
"ozone.scm.datanode.read.threadpool";
public static final int OZONE_SCM_DATANODE_READ_THREADPOOL_DEFAULT = 10;
public static final int OZONE_SCM_HANDLER_COUNT_DEFAULT = 100;

public static final String OZONE_SCM_SECURITY_HANDLER_COUNT_KEY =
"ozone.scm.security.handler.count.key";
public static final int OZONE_SCM_SECURITY_HANDLER_COUNT_DEFAULT = 2;
public static final String OZONE_SCM_SECURITY_READ_THREADPOOL_KEY =
"ozone.scm.security.read.threadpool";
public static final int OZONE_SCM_SECURITY_READ_THREADPOOL_DEFAULT = 1;

public static final String OZONE_SCM_DEADNODE_INTERVAL =
"ozone.scm.dead.node.interval";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public final class OzoneConfigKeys {
public static final String OZONE_FS_HSYNC_ENABLED
= "ozone.fs.hsync.enabled";
public static final boolean OZONE_FS_HSYNC_ENABLED_DEFAULT
= true;
= false;

/**
* hsync lease soft limit.
Expand Down
52 changes: 51 additions & 1 deletion hadoop-hdds/common/src/main/resources/ozone-default.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1140,6 +1140,36 @@
The default value is 100.
</description>
</property>
<property>
<name>ozone.scm.client.read.threadpool</name>
<value>10</value>
<tag>OZONE, MANAGEMENT, PERFORMANCE</tag>
<description>
The number of threads in RPC server reading from the socket used by Client to access SCM.
This config overrides Hadoop configuration "ipc.server.read.threadpool.size" for SCMClientProtocolServer.
The default value is 10.
</description>
</property>
<property>
<name>ozone.scm.block.read.threadpool</name>
<value>10</value>
<tag>OZONE, MANAGEMENT, PERFORMANCE</tag>
<description>
The number of threads in RPC server reading from the socket when accessing blocks.
This config overrides Hadoop configuration "ipc.server.read.threadpool.size" for SCMBlockProtocolServer.
The default value is 10.
</description>
</property>
<property>
<name>ozone.scm.datanode.read.threadpool</name>
<value>10</value>
<tag>OZONE, MANAGEMENT, PERFORMANCE</tag>
<description>
The number of threads in RPC server reading from the socket used by DataNode to access SCM.
This config overrides Hadoop configuration "ipc.server.read.threadpool.size" for SCMDatanodeProtocolServer.
The default value is 10.
</description>
</property>
<property>
<name>hdds.heartbeat.interval</name>
<value>30s</value>
Expand Down Expand Up @@ -2493,6 +2523,16 @@
<tag>OZONE, HDDS, SECURITY</tag>
<description>Threads configured for SCMSecurityProtocolServer.</description>
</property>
<property>
<name>ozone.scm.security.read.threadpool</name>
<value>1</value>
<tag>OZONE, HDDS, SECURITY, PERFORMANCE</tag>
<description>
The number of threads in RPC server reading from the socket when performing security related operations with SCM.
This config overrides Hadoop configuration "ipc.server.read.threadpool.size" for SCMSecurityProtocolServer.
The default value is 1.
</description>
</property>
<property>
<name>ozone.scm.security.service.address</name>
<value/>
Expand Down Expand Up @@ -2935,6 +2975,16 @@
service endpoints.
</description>
</property>
<property>
<name>hdds.datanode.read.threadpool</name>
<value>10</value>
<tag>OZONE, HDDS, PERFORMANCE</tag>
<description>
The number of threads in RPC server reading from the socket for Datanode client service endpoints.
This config overrides Hadoop configuration "ipc.server.read.threadpool.size" for HddsDatanodeClientProtocolServer.
The default value is 10.
</description>
</property>
<property>
<name>ozone.client.failover.max.attempts</name>
<value>500</value>
Expand Down Expand Up @@ -4163,7 +4213,7 @@

<property>
<name>ozone.fs.hsync.enabled</name>
<value>true</value>
<value>false</value>
<tag>OZONE, CLIENT</tag>
<description>
Enable hsync/hflush. By default they are disabled.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.conf;

import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.grpc.GrpcConfigKeys;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.util.SizeInBytes;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Test {@link RatisConfUtils}.
 */
public class TestRatisConfUtils {
  private static final Logger LOG = LoggerFactory.getLogger(TestRatisConfUtils.class);

  @Test
  void testGrpcSetMessageSizeMax() {
    final int bufferLimit = 1000;
    final RaftProperties properties = new RaftProperties();

    // Rejected: the log appender buffer byte limit has not been configured yet.
    Assertions.assertThrows(IllegalStateException.class,
        () -> RatisConfUtils.Grpc.setMessageSizeMax(properties, bufferLimit));

    RaftServerConfigKeys.Log.Appender.setBufferByteLimit(properties, SizeInBytes.valueOf(bufferLimit));

    // Rejected: requested max is below the configured buffer byte limit.
    Assertions.assertThrows(IllegalStateException.class,
        () -> RatisConfUtils.Grpc.setMessageSizeMax(properties, bufferLimit - 1));

    // Accepted: max equals the configured buffer byte limit.
    RatisConfUtils.Grpc.setMessageSizeMax(properties, bufferLimit);

    // The effective gRPC message size max must exceed the limit by exactly 1MB.
    final SizeInBytes messageSizeMax = GrpcConfigKeys.messageSizeMax(properties, LOG::info);
    Assertions.assertEquals(SizeInBytes.ONE_MB.getSize(), messageSizeMax.getSize() - bufferLimit);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DATANODE_CLIENT_ADDRESS_KEY;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DATANODE_HANDLER_COUNT_DEFAULT;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DATANODE_HANDLER_COUNT_KEY;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DATANODE_READ_THREADPOOL_KEY;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DATANODE_READ_THREADPOOL_DEFAULT;
import static org.apache.hadoop.hdds.HddsUtils.preserveThreadName;
import static org.apache.hadoop.hdds.protocol.DatanodeDetails.Port.Name.CLIENT_RPC;

Expand Down Expand Up @@ -106,14 +108,16 @@ private RPC.Server getRpcServer(OzoneConfiguration configuration,

final int handlerCount = conf.getInt(HDDS_DATANODE_HANDLER_COUNT_KEY,
HDDS_DATANODE_HANDLER_COUNT_DEFAULT);
final int readThreads = conf.getInt(HDDS_DATANODE_READ_THREADPOOL_KEY,
HDDS_DATANODE_READ_THREADPOOL_DEFAULT);
ReconfigureProtocolServerSideTranslatorPB reconfigureServerProtocol
= new ReconfigureProtocolServerSideTranslatorPB(reconfigurationHandler);
BlockingService reconfigureService = ReconfigureProtocolProtos
.ReconfigureProtocolService.newReflectiveBlockingService(
reconfigureServerProtocol);

return preserveThreadName(() -> startRpcServer(configuration, rpcAddress,
ReconfigureProtocolDatanodePB.class, reconfigureService, handlerCount));
ReconfigureProtocolDatanodePB.class, reconfigureService, handlerCount, readThreads));
}

/**
Expand All @@ -130,14 +134,15 @@ private RPC.Server getRpcServer(OzoneConfiguration configuration,
private RPC.Server startRpcServer(
Configuration configuration, InetSocketAddress addr,
Class<?> protocol, BlockingService instance,
int handlerCount)
int handlerCount, int readThreads)
throws IOException {
return new RPC.Builder(configuration)
.setProtocol(protocol)
.setInstance(instance)
.setBindAddress(addr.getHostString())
.setPort(addr.getPort())
.setNumHandlers(handlerCount)
.setNumReaders(readThreads)
.setVerbose(false)
.setSecretManager(null)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ public void start() {
.register(REPLICATION_STREAMS_LIMIT_KEY,
this::reconfigReplicationStreamsLimit);

datanodeStateMachine = new DatanodeStateMachine(datanodeDetails, conf,
datanodeStateMachine = new DatanodeStateMachine(this, datanodeDetails, conf,
dnCertClient, secretKeyClient, this::terminateDatanode,
reconfigurationHandler);
try {
Expand Down Expand Up @@ -620,6 +620,10 @@ public void saveNewCertId(String newCertId) {
}
}

public boolean isStopped() {
return isStopped.get();
}

/**
* Check ozone admin privilege, throws exception if not admin.
*/
Expand Down
Loading

0 comments on commit 028675d

Please sign in to comment.