Skip to content

Commit

Permalink
HDDS-11416. refactor ratis submit request avoid code duplicate (apach…
Browse files Browse the repository at this point in the history
  • Loading branch information
sumitagrawl authored Sep 11, 2024
1 parent 86fe920 commit 0f16195
Show file tree
Hide file tree
Showing 13 changed files with 48 additions and 213 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
Expand Down Expand Up @@ -233,7 +234,7 @@ public void setUp() throws IOException {
ozoneManager.getMetadataManager().getMetaTable().put(
OzoneConsts.RANGER_OZONE_SERVICE_VERSION_KEY, String.valueOf(v));
return null;
}).when(omRatisServer).submitRequest(any(), any());
}).when(omRatisServer).submitRequest(any(), any(), anyLong());
} catch (ServiceException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2065,6 +2065,7 @@ private void addOMNodeToPeers(String newOMNodeId) throws IOException {
} catch (IOException e) {
LOG.error("{}: Couldn't add OM {} to peer list.", getOMNodeId(),
newOMNodeId);
return;
}

if (omRatisSnapshotProvider == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package org.apache.hadoop.ozone.om;

import com.google.common.base.Preconditions;
import com.google.protobuf.RpcController;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.ClientVersion;
import org.apache.hadoop.ozone.om.exceptions.OMException;
Expand All @@ -35,15 +34,12 @@
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus;
import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
import org.apache.hadoop.ozone.om.request.OMClientRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Progressable;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -69,8 +65,6 @@
*/
public class TrashOzoneFileSystem extends FileSystem {

private static final RpcController NULL_RPC_CONTROLLER = null;

private static final int OZONE_FS_ITERATE_BATCH_SIZE = 100;

private static final int OZONE_MAX_LIST_KEYS_SIZE = 10000;
Expand All @@ -97,34 +91,15 @@ public TrashOzoneFileSystem(OzoneManager ozoneManager) throws IOException {
ozoneConfiguration = OzoneConfiguration.of(getConf());
}

private RaftClientRequest getRatisRequest(
OzoneManagerProtocolProtos.OMRequest omRequest) {
return RaftClientRequest.newBuilder()
.setClientId(CLIENT_ID)
.setServerId(ozoneManager.getOmRatisServer().getRaftPeerId())
.setGroupId(ozoneManager.getOmRatisServer().getRaftGroupId())
.setCallId(runCount.getAndIncrement())
.setMessage(
Message.valueOf(
OMRatisHelper.convertRequestToByteString(omRequest)))
.setType(RaftClientRequest.writeRequestType())
.build();

}

private void submitRequest(OzoneManagerProtocolProtos.OMRequest omRequest)
throws Exception {
ozoneManager.getMetrics().incNumTrashWriteRequests();
if (ozoneManager.isRatisEnabled()) {
OMClientRequest omClientRequest =
OzoneManagerRatisUtils.createClientRequest(omRequest, ozoneManager);
// perform preExecute as ratis submit do no perform preExecute
OMClientRequest omClientRequest = OzoneManagerRatisUtils.createClientRequest(omRequest, ozoneManager);
omRequest = omClientRequest.preExecute(ozoneManager);
RaftClientRequest req = getRatisRequest(omRequest);
ozoneManager.getOmRatisServer().submitRequest(omRequest, req);
} else {
ozoneManager.getOmServerProtocol().
submitRequest(NULL_RPC_CONTROLLER, omRequest);
}
OzoneManagerRatisUtils.submitRequest(ozoneManager, omRequest, CLIENT_ID, runCount.getAndIncrement());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,15 +301,23 @@ private RaftClientRequest createRaftRequest(OMRequest omRequest) {
}

/**
* API used internally from OzoneManager Server when requests needs to be
* submitted to ratis, where the crafted RaftClientRequest is passed along.
* API used internally from OzoneManager Server when requests need to be submitted.
* @param omRequest
* @param raftClientRequest
* @param cliId
* @param callId
* @return OMResponse
* @throws ServiceException
*/
public OMResponse submitRequest(OMRequest omRequest,
RaftClientRequest raftClientRequest) throws ServiceException {
public OMResponse submitRequest(OMRequest omRequest, ClientId cliId, long callId) throws ServiceException {
RaftClientRequest raftClientRequest = RaftClientRequest.newBuilder()
.setClientId(cliId)
.setServerId(getRaftPeerId())
.setGroupId(getRaftGroupId())
.setCallId(callId)
.setMessage(Message.valueOf(
OMRatisHelper.convertRequestToByteString(omRequest)))
.setType(RaftClientRequest.writeRequestType())
.build();
RaftClientReply raftClientReply =
submitRequestToRatis(raftClientRequest);
return createOmResponse(omRequest, raftClientReply);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import java.io.File;
import java.nio.file.InvalidPathException;
Expand Down Expand Up @@ -98,6 +99,7 @@
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
import org.apache.ratis.grpc.GrpcTlsConfig;
import org.apache.ratis.protocol.ClientId;
import org.rocksdb.RocksDBException;

import java.io.IOException;
Expand All @@ -117,6 +119,7 @@
public final class OzoneManagerRatisUtils {
private static final Logger LOG = LoggerFactory
.getLogger(OzoneManagerRatisUtils.class);
private static final RpcController NULL_RPC_CONTROLLER = null;

private OzoneManagerRatisUtils() {
}
Expand Down Expand Up @@ -502,4 +505,13 @@ public static GrpcTlsConfig createServerTlsConfig(SecurityConfig conf,

return null;
}

public static OzoneManagerProtocolProtos.OMResponse submitRequest(
OzoneManager om, OMRequest omRequest, ClientId clientId, long callId) throws ServiceException {
if (om.isRatisEnabled()) {
return om.getOmRatisServer().submitRequest(omRequest, clientId, callId);
} else {
return om.getOmServerProtocol().submitRequest(NULL_RPC_CONTROLLER, omRequest);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.SnapshotChainManager;
import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
import org.apache.hadoop.ozone.om.snapshot.SnapshotUtils;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeletedKeys;
Expand All @@ -48,8 +48,6 @@
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
import org.apache.hadoop.util.Time;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.util.Preconditions;

import java.io.IOException;
Expand Down Expand Up @@ -247,10 +245,7 @@ private int submitPurgeKeysRequest(List<DeleteBlockGroupResult> results,

// Submit PurgeKeys request to OM
try {
RaftClientRequest raftClientRequest =
createRaftClientRequestForPurge(omRequest);
ozoneManager.getOmRatisServer().submitRequest(omRequest,
raftClientRequest);
OzoneManagerRatisUtils.submitRequest(ozoneManager, omRequest, clientId, runCount.get());
} catch (ServiceException e) {
LOG.error("PurgeKey request failed. Will retry at next run.");
return 0;
Expand All @@ -259,20 +254,6 @@ private int submitPurgeKeysRequest(List<DeleteBlockGroupResult> results,
return deletedCount;
}

protected RaftClientRequest createRaftClientRequestForPurge(
OMRequest omRequest) {
return RaftClientRequest.newBuilder()
.setClientId(clientId)
.setServerId(ozoneManager.getOmRatisServer().getRaftPeerId())
.setGroupId(ozoneManager.getOmRatisServer().getRaftGroupId())
.setCallId(runCount.get())
.setMessage(
Message.valueOf(
OMRatisHelper.convertRequestToByteString(omRequest)))
.setType(RaftClientRequest.writeRequestType())
.build();
}

/**
* Parse Volume and Bucket Name from ObjectKey and add it to given map of
* keys to be purged per bucket.
Expand Down Expand Up @@ -311,15 +292,7 @@ protected void submitPurgePaths(List<PurgePathRequest> requests,

// Submit Purge paths request to OM
try {
if (isRatisEnabled()) {
RaftClientRequest raftClientRequest =
createRaftClientRequestForPurge(omRequest);
ozoneManager.getOmRatisServer().submitRequest(omRequest,
raftClientRequest);
} else {
getOzoneManager().getOmServerProtocol()
.submitRequest(null, omRequest);
}
OzoneManagerRatisUtils.submitRequest(ozoneManager, omRequest, clientId, runCount.get());
} catch (ServiceException e) {
LOG.error("PurgePaths request failed. Will retry at next run.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,7 @@
import org.apache.hadoop.ozone.om.OmSnapshot;
import org.apache.hadoop.ozone.om.OmSnapshotManager;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
import org.apache.hadoop.ozone.om.snapshot.ReferenceCounted;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotSize;
Expand All @@ -67,8 +66,6 @@
import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -481,24 +478,7 @@ private void updateDeepCleanedSnapshots(List<String> deepCleanedSnapshots) {

public void submitRequest(OMRequest omRequest, ClientId clientId) {
try {
if (isRatisEnabled()) {
OzoneManagerRatisServer server = getOzoneManager().getOmRatisServer();

RaftClientRequest raftClientRequest = RaftClientRequest.newBuilder()
.setClientId(clientId)
.setServerId(server.getRaftPeerId())
.setGroupId(server.getRaftGroupId())
.setCallId(getRunCount().get())
.setMessage(Message.valueOf(
OMRatisHelper.convertRequestToByteString(omRequest)))
.setType(RaftClientRequest.writeRequestType())
.build();

server.submitRequest(omRequest, raftClientRequest);
} else {
getOzoneManager().getOmServerProtocol()
.submitRequest(null, omRequest);
}
OzoneManagerRatisUtils.submitRequest(getOzoneManager(), omRequest, clientId, getRunCount().get());
} catch (ServiceException e) {
LOG.error("Snapshot deep cleaning request failed. " +
"Will retry at next run.", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,13 @@
import org.apache.hadoop.ozone.om.KeyManager;
import org.apache.hadoop.ozone.om.OMConfigKeys;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ExpiredMultipartUploadsBucket;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.MultipartUploadsExpiredAbortRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
import org.apache.hadoop.util.Time;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -208,24 +205,7 @@ private OMRequest createRequest(List<ExpiredMultipartUploadsBucket>

private void submitRequest(OMRequest omRequest) {
try {
if (isRatisEnabled()) {
OzoneManagerRatisServer server = ozoneManager.getOmRatisServer();

RaftClientRequest raftClientRequest = RaftClientRequest.newBuilder()
.setClientId(clientId)
.setServerId(server.getRaftPeerId())
.setGroupId(server.getRaftGroupId())
.setCallId(runCount.get())
.setMessage(Message.valueOf(
OMRatisHelper.convertRequestToByteString(omRequest)))
.setType(RaftClientRequest.writeRequestType())
.build();

server.submitRequest(omRequest, raftClientRequest);
} else {
ozoneManager.getOmServerProtocol().submitRequest(null,
omRequest);
}
OzoneManagerRatisUtils.submitRequest(ozoneManager, omRequest, clientId, runCount.get());
} catch (ServiceException e) {
LOG.error("Expired multipart info delete request failed. " +
"Will retry at next run.", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,20 +47,18 @@
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
import org.apache.hadoop.ozone.om.exceptions.OMNotLeaderException;
import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
import org.apache.hadoop.ozone.om.helpers.OmDBAccessIdInfo;
import org.apache.hadoop.ozone.om.helpers.OmDBTenantState;
import org.apache.hadoop.ozone.om.multitenant.AuthorizerLock;
import org.apache.hadoop.ozone.om.multitenant.InMemoryMultiTenantAccessController;
import org.apache.hadoop.ozone.om.multitenant.MultiTenantAccessController;
import org.apache.hadoop.ozone.om.multitenant.MultiTenantAccessController.Policy;
import org.apache.hadoop.ozone.om.multitenant.MultiTenantAccessController.Role;
import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SetRangerServiceVersionRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -375,19 +373,6 @@ long getRangerOzoneServicePolicyVersion() throws IOException {
return policyVersion;
}

private RaftClientRequest newRaftClientRequest(OMRequest omRequest) {
return RaftClientRequest.newBuilder()
.setClientId(CLIENT_ID)
.setServerId(ozoneManager.getOmRatisServer().getRaftPeerId())
.setGroupId(ozoneManager.getOmRatisServer().getRaftGroupId())
.setCallId(runCount.get())
.setMessage(
Message.valueOf(
OMRatisHelper.convertRequestToByteString(omRequest)))
.setType(RaftClientRequest.writeRequestType())
.build();
}

public void setOMDBRangerServiceVersion(long version)
throws ServiceException {
// OM DB update goes through Ratis
Expand All @@ -402,9 +387,7 @@ public void setOMDBRangerServiceVersion(long version)
.build();

try {
RaftClientRequest raftClientRequest = newRaftClientRequest(omRequest);
ozoneManager.getOmRatisServer().submitRequest(omRequest,
raftClientRequest);
OzoneManagerRatisUtils.submitRequest(ozoneManager, omRequest, CLIENT_ID, runCount.get());
} catch (ServiceException e) {
LOG.error("SetRangerServiceVersion request failed. "
+ "Will retry at next run.");
Expand Down
Loading

0 comments on commit 0f16195

Please sign in to comment.