Merge branch 'apache:master' into HDDS-11268
slfan1989 authored Sep 11, 2024
2 parents e7b81f3 + 86fe920 commit 9ae14fa
Showing 98 changed files with 4,430 additions and 1,622 deletions.
@@ -247,28 +247,49 @@ public enum ChecksumCombineMode {
tags = ConfigTag.CLIENT)
private String fsDefaultBucketLayout = "FILE_SYSTEM_OPTIMIZED";

// ozone.client.hbase.enhancements.allowed
@Config(key = "hbase.enhancements.allowed",
defaultValue = "false",
description = "When set to false, client-side HBase enhancement-related Ozone (experimental) features " +
"are disabled (not allowed to be enabled) regardless of whether those configs are set.\n" +
"\n" +
"Here is the list of configs and values overridden when this config is set to false:\n" +
"1. ozone.fs.hsync.enabled = false\n" +
"2. ozone.client.incremental.chunk.list = false\n" +
"3. ozone.client.stream.putblock.piggybacking = false\n" +
"4. ozone.client.key.write.concurrency = 1\n" +
"\n" +
"A warning message will be printed if any of the above configs are overridden by this.",
tags = ConfigTag.CLIENT)
private boolean hbaseEnhancementsAllowed = false;

// ozone.client.incremental.chunk.list
@Config(key = "incremental.chunk.list",
defaultValue = "true",
defaultValue = "false",
type = ConfigType.BOOLEAN,
description = "Client PutBlock request can choose incremental chunk " +
"list rather than full chunk list to optimize performance. " +
"Critical to HBase. EC does not support this feature.",
"Critical to HBase. EC does not support this feature. " +
"Can be enabled only when ozone.client.hbase.enhancements.allowed = true",
tags = ConfigTag.CLIENT)
private boolean incrementalChunkList = true;
private boolean incrementalChunkList = false;

// ozone.client.stream.putblock.piggybacking
@Config(key = "stream.putblock.piggybacking",
defaultValue = "true",
defaultValue = "false",
type = ConfigType.BOOLEAN,
description = "Allow PutBlock to be piggybacked in WriteChunk " +
"requests if the chunk is small.",
description = "Allow PutBlock to be piggybacked in WriteChunk requests if the chunk is small. " +
"Can be enabled only when ozone.client.hbase.enhancements.allowed = true",
tags = ConfigTag.CLIENT)
private boolean enablePutblockPiggybacking = true;
private boolean enablePutblockPiggybacking = false;

// ozone.client.key.write.concurrency
@Config(key = "key.write.concurrency",
defaultValue = "1",
description = "Maximum concurrent writes allowed on each key. " +
"Defaults to 1 which matches the behavior before HDDS-9844. " +
"For unlimited write concurrency, set this to -1 or any negative integer value.",
"For unlimited write concurrency, set this to -1 or any negative integer value. " +
"Any value other than 1 is effective only when ozone.client.hbase.enhancements.allowed = true",
tags = ConfigTag.CLIENT)
private int maxConcurrentWritePerKey = 1;

@@ -298,6 +319,34 @@ public void validate() {
OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM_MIN_SIZE;
}

// Verify client configs related to HBase enhancements
// Enforce check on ozone.client.hbase.enhancements.allowed
if (!hbaseEnhancementsAllowed) {
// ozone.client.hbase.enhancements.allowed = false
if (incrementalChunkList) {
LOG.warn("Ignoring ozone.client.incremental.chunk.list = true " +
"because HBase enhancements are disallowed. " +
"To enable it, set ozone.client.hbase.enhancements.allowed = true.");
incrementalChunkList = false;
LOG.debug("Final ozone.client.incremental.chunk.list = {}", incrementalChunkList);
}
if (enablePutblockPiggybacking) {
LOG.warn("Ignoring ozone.client.stream.putblock.piggybacking = true " +
"because HBase enhancements are disallowed. " +
"To enable it, set ozone.client.hbase.enhancements.allowed = true.");
enablePutblockPiggybacking = false;
LOG.debug("Final ozone.client.stream.putblock.piggybacking = {}", enablePutblockPiggybacking);
}
if (maxConcurrentWritePerKey != 1) {
LOG.warn("Ignoring ozone.client.key.write.concurrency = {} " +
"because HBase enhancements are disallowed. " +
"To enable it, set ozone.client.hbase.enhancements.allowed = true.",
maxConcurrentWritePerKey);
maxConcurrentWritePerKey = 1;
LOG.debug("Final ozone.client.key.write.concurrency = {}", maxConcurrentWritePerKey);
}
// Note: ozone.fs.hsync.enabled is enforced by OzoneFSUtils#canEnableHsync, not here
}
}

public long getStreamBufferFlushSize() {
@@ -486,6 +535,14 @@ public void setDatastreamPipelineMode(boolean datastreamPipelineMode) {
this.datastreamPipelineMode = datastreamPipelineMode;
}

public void setHBaseEnhancementsAllowed(boolean isHBaseEnhancementsEnabled) {
this.hbaseEnhancementsAllowed = isHBaseEnhancementsEnabled;
}

public boolean getHBaseEnhancementsAllowed() {
return this.hbaseEnhancementsAllowed;
}

public void setIncrementalChunkList(boolean enable) {
this.incrementalChunkList = enable;
}
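For illustration, a minimal sketch (not part of this commit) of how a client could exercise the validate() override logic above; the config keys and getters come from this diff, while the wrapper class and import paths are assumptions.

import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;

// Illustrative sketch only; mirrors the behavior asserted by TestOzoneClientConfig below.
public class HBaseEnhancementsExample {
  public static void main(String[] args) {
    OzoneConfiguration conf = new OzoneConfiguration();
    // Master switch off: the individual HBase-related settings are reverted by validate().
    conf.setBoolean("ozone.client.hbase.enhancements.allowed", false);
    conf.setBoolean("ozone.client.incremental.chunk.list", true);
    conf.setBoolean("ozone.client.stream.putblock.piggybacking", true);
    conf.setInt("ozone.client.key.write.concurrency", -1);

    OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
    System.out.println(clientConfig.getIncrementalChunkList());       // false
    System.out.println(clientConfig.getEnablePutblockPiggybacking()); // false
    System.out.println(clientConfig.getMaxConcurrentWritePerKey());   // 1
  }
}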
@@ -198,9 +198,7 @@ public BlockOutputStream(
blkIDBuilder.build()).addMetadata(keyValue);
this.pipeline = pipeline;
// tell DataNode I will send incremental chunk list
// EC does not support incremental chunk list.
this.supportIncrementalChunkList = config.getIncrementalChunkList() &&
this instanceof RatisBlockOutputStream && allDataNodesSupportPiggybacking();
this.supportIncrementalChunkList = canEnableIncrementalChunkList();
LOG.debug("incrementalChunkList is {}", supportIncrementalChunkList);
if (supportIncrementalChunkList) {
this.containerBlockData.addMetadata(INCREMENTAL_CHUNK_LIST_KV);
@@ -237,11 +235,51 @@ public BlockOutputStream(
config.getBytesPerChecksum());
this.clientMetrics = clientMetrics;
this.streamBufferArgs = streamBufferArgs;
this.allowPutBlockPiggybacking = config.getEnablePutblockPiggybacking() &&
allDataNodesSupportPiggybacking();
this.allowPutBlockPiggybacking = canEnablePutblockPiggybacking();
LOG.debug("PutBlock piggybacking is {}", allowPutBlockPiggybacking);
}

/**
* Helper method to check if incremental chunk list can be enabled.
* Prints debug messages if it cannot be enabled.
*/
private boolean canEnableIncrementalChunkList() {
boolean confEnableIncrementalChunkList = config.getIncrementalChunkList();
if (!confEnableIncrementalChunkList) {
return false;
}

if (!(this instanceof RatisBlockOutputStream)) {
// Note: EC does not support incremental chunk list
LOG.debug("Unable to enable incrementalChunkList because BlockOutputStream is not a RatisBlockOutputStream");
return false;
}
if (!allDataNodesSupportPiggybacking()) {
// Not all datanodes support piggybacking and incremental chunk list.
LOG.debug("Unable to enable incrementalChunkList because not all datanodes support piggybacking");
return false;
}
return confEnableIncrementalChunkList;
}

/**
* Helper method to check if PutBlock piggybacking can be enabled.
* Prints debug message if it cannot be enabled.
*/
private boolean canEnablePutblockPiggybacking() {
boolean confEnablePutblockPiggybacking = config.getEnablePutblockPiggybacking();
if (!confEnablePutblockPiggybacking) {
return false;
}

if (!allDataNodesSupportPiggybacking()) {
// Not all datanodes support piggybacking and incremental chunk list.
LOG.debug("Unable to enable PutBlock piggybacking because not all datanodes support piggybacking");
return false;
}
return confEnablePutblockPiggybacking;
}

private boolean allDataNodesSupportPiggybacking() {
// return true only if all DataNodes in the pipeline are on a version
// that supports PutBlock piggybacking.
@@ -22,6 +22,8 @@

import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM_MIN_SIZE;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

class TestOzoneClientConfig {

@@ -36,4 +38,42 @@ void missingSizeSuffix() {

assertEquals(OZONE_CLIENT_BYTES_PER_CHECKSUM_MIN_SIZE, subject.getBytesPerChecksum());
}

@Test
void testClientHBaseEnhancementsAllowedTrue() {
// When ozone.client.hbase.enhancements.allowed = true,
// related client configs should be effective as-is.
OzoneConfiguration conf = new OzoneConfiguration();
conf.setBoolean("ozone.client.hbase.enhancements.allowed", true);

// Note: ozone.fs.hsync.enabled is checked by OzoneFSUtils.canEnableHsync(), thus not checked here
conf.setBoolean("ozone.client.incremental.chunk.list", true);
conf.setBoolean("ozone.client.stream.putblock.piggybacking", true);
conf.setInt("ozone.client.key.write.concurrency", -1);

OzoneClientConfig subject = conf.getObject(OzoneClientConfig.class);

assertTrue(subject.getIncrementalChunkList());
assertTrue(subject.getEnablePutblockPiggybacking());
assertEquals(-1, subject.getMaxConcurrentWritePerKey());
}

@Test
void testClientHBaseEnhancementsAllowedFalse() {
// When ozone.client.hbase.enhancements.allowed = false,
// related client configs should be reverted back to default.
OzoneConfiguration conf = new OzoneConfiguration();
conf.setBoolean("ozone.client.hbase.enhancements.allowed", false);

// Note: ozone.fs.hsync.enabled is checked by OzoneFSUtils.canEnableHsync(), thus not checked here
conf.setBoolean("ozone.client.incremental.chunk.list", true);
conf.setBoolean("ozone.client.stream.putblock.piggybacking", true);
conf.setInt("ozone.client.key.write.concurrency", -1);

OzoneClientConfig subject = conf.getObject(OzoneClientConfig.class);

assertFalse(subject.getIncrementalChunkList());
assertFalse(subject.getEnablePutblockPiggybacking());
assertEquals(1, subject.getMaxConcurrentWritePerKey());
}
}
@@ -30,12 +30,24 @@ public final class JavaUtils {
* is equal or greater than the parameter.
*
* @param version 8, 9, 10 etc.
* @return comparison with system property, always true for 8
* @return comparison with system property, always true for any int up to 8
*/
public static boolean isJavaVersionAtLeast(int version) {
return JAVA_SPEC_VER >= version;
}

/**
* Query to see if major version of Java specification of the system
* is equal or less than the parameter.
*
* @param version 8, 9, 10 etc.
* @return comparison with system property
*/
public static boolean isJavaVersionAtMost(int version) {
return JAVA_SPEC_VER <= version;
}


/**
* Private constructor.
*/
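As a usage sketch (assumptions: the import path for JavaUtils and the reflective field chosen here), the new isJavaVersionAtMost helper can guard reflection that only succeeds on Java 8 and earlier, which is how the ChecksumByteBufferImpl change further down applies it.

import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import org.apache.hadoop.ozone.util.JavaUtils; // assumed package; not shown in this diff

final class ReadOnlyFieldProbe {
  // Returns the private ByteBuffer#isReadOnly field on Java 8 and earlier, else null.
  static Field lookupIsReadOnlyField() {
    if (!JavaUtils.isJavaVersionAtMost(8)) {
      // On Java 9+ reflective access to this internal field is restricted, so skip it.
      return null;
    }
    try {
      Field f = ByteBuffer.class.getDeclaredField("isReadOnly");
      f.setAccessible(true);
      return f;
    } catch (NoSuchFieldException e) {
      return null;
    }
  }
}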
@@ -144,6 +144,10 @@ public final class ScmConfigKeys {
"ozone.chunk.read.mapped.buffer.threshold";
public static final String OZONE_CHUNK_READ_MAPPED_BUFFER_THRESHOLD_DEFAULT =
"32KB";
public static final String OZONE_CHUNK_READ_MAPPED_BUFFER_MAX_COUNT_KEY =
"ozone.chunk.read.mapped.buffer.max.count";
// This max count must not exceed the Linux max_map_count limit, which defaults to 65530.
public static final int OZONE_CHUNK_READ_MAPPED_BUFFER_MAX_COUNT_DEFAULT = 0;

public static final String OZONE_SCM_CONTAINER_LAYOUT_KEY =
"ozone.scm.container.layout";
@@ -120,6 +120,14 @@ public final class OzoneConfigKeys {
public static final String OZONE_FS_DATASTREAM_AUTO_THRESHOLD_DEFAULT
= "4MB";

/**
* Flag to allow server-side HBase-related features and enhancements to be enabled.
*/
public static final String OZONE_HBASE_ENHANCEMENTS_ALLOWED
= "ozone.hbase.enhancements.allowed";
public static final boolean OZONE_HBASE_ENHANCEMENTS_ALLOWED_DEFAULT
= false;

/**
* Flag to enable hsync/hflush.
*/
@@ -44,6 +44,8 @@ public enum OzoneManagerVersion implements ComponentVersion {

ATOMIC_REWRITE_KEY(6, "OzoneManager version that supports rewriting key as atomic operation"),
HBASE_SUPPORT(7, "OzoneManager version that supports HBase integration"),
LIGHTWEIGHT_LIST_STATUS(8, "OzoneManager version that supports lightweight"
+ " listStatus API."),

FUTURE_VERSION(-1, "Used internally in the client when the server side is "
+ " newer and an unknown server version has arrived to the client.");
@@ -44,12 +44,14 @@ public class ChecksumByteBufferImpl implements ChecksumByteBuffer {

static {
Field f = null;
try {
f = ByteBuffer.class
.getDeclaredField("isReadOnly");
f.setAccessible(true);
} catch (NoSuchFieldException e) {
LOG.error("No isReadOnly field in ByteBuffer", e);
if (JavaUtils.isJavaVersionAtMost(8)) {
try {
f = ByteBuffer.class
.getDeclaredField("isReadOnly");
f.setAccessible(true);
} catch (NoSuchFieldException e) {
LOG.error("No isReadOnly field in ByteBuffer", e);
}
}
IS_READY_ONLY_FIELD = f;

@@ -60,7 +60,8 @@ static ChunkBuffer wrap(ByteBuffer buffer) {
return new ChunkBufferImplWithByteBuffer(buffer);
}

/** Wrap the given list of {@link ByteBuffer}s as a {@link ChunkBuffer}. */
/** Wrap the given list of {@link ByteBuffer}s as a {@link ChunkBuffer},
* with a function called when buffers are released.*/
static ChunkBuffer wrap(List<ByteBuffer> buffers) {
Objects.requireNonNull(buffers, "buffers == null");
if (buffers.size() == 1) {
28 changes: 26 additions & 2 deletions hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -860,6 +860,15 @@
The default read threshold to use memory mapped buffers.
</description>
</property>
<property>
<name>ozone.chunk.read.mapped.buffer.max.count</name>
<value>0</value>
<tag>OZONE, SCM, CONTAINER, PERFORMANCE</tag>
<description>
The maximum count of memory-mapped buffers allowed for a DataNode.
The default of 0 means no mapped buffers are used for data reads.
</description>
</property>
<property>
<name>ozone.scm.container.layout</name>
<value>FILE_PER_BLOCK</value>
@@ -4216,12 +4225,27 @@
</description>
</property>

<property>
<name>ozone.hbase.enhancements.allowed</name>
<value>false</value>
<tag>OZONE, OM</tag>
<description>
When set to false, server-side HBase enhancement-related Ozone (experimental) features
are disabled (not allowed to be enabled) regardless of whether those configs are set.

Here is the list of configs and values overridden when this config is set to false:
1. ozone.fs.hsync.enabled = false

A warning message will be printed if any of the above configs are overridden by this.
</description>
</property>
<property>
<name>ozone.fs.hsync.enabled</name>
<value>false</value>
<tag>OZONE, CLIENT</tag>
<tag>OZONE, CLIENT, OM</tag>
<description>
Enable hsync/hflush. By default they are disabled.
Enable hsync/hflush on the Ozone Manager and/or client side. Disabled by default.
Can be enabled only when ozone.hbase.enhancements.allowed = true
</description>
</property>
<property>
@@ -39,7 +39,6 @@
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodeReportProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
import org.apache.hadoop.hdds.scm.client.ClientTrustManager;
import org.apache.hadoop.hdds.security.symmetric.SecretKeyClient;
import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
import org.apache.hadoop.hdds.upgrade.HDDSLayoutVersionManager;
@@ -217,7 +216,6 @@ public DatanodeStateMachine(HddsDatanodeService hddsDatanodeService,
ReplicationSupervisorMetrics.create(supervisor);

ecReconstructionMetrics = ECReconstructionMetrics.create();
ClientTrustManager clientTrustManager = null;
ecReconstructionCoordinator = new ECReconstructionCoordinator(
conf, certClient, secretKeyClient, context, ecReconstructionMetrics,
threadNamePrefix);
@@ -46,6 +46,16 @@ public ECReconstructionCoordinatorTask(
debugString = reconstructionCommandInfo.toString();
}

@Override
public String getMetricName() {
return "ECReconstructions";
}

@Override
public String getMetricDescriptionSegment() {
return "EC reconstructions";
}

@Override
public void runTask() {
// Implement the coordinator logic to handle a container group