HDDS-11581. Remove duplicate ContainerStateMachine#RaftGroupId (apach…
jianghuazhu authored Oct 14, 2024
1 parent 64e035d commit 2139367
Showing 1 changed file with 37 additions and 39 deletions.
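For readers skimming the diff: the commit deletes ContainerStateMachine's private RaftGroupId gid field, which duplicated state the Ratis base class already tracks, and routes every former use of gid through the inherited accessor. The sketch below illustrates the pattern but is not the Ozone source; it assumes, as the diff itself does, that org.apache.ratis.statemachine.impl.BaseStateMachine records the group id passed to initialize() and exposes it via getGroupId(). The class name ExampleStateMachine and the describe() method are hypothetical.

import java.io.IOException;

import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.statemachine.impl.BaseStateMachine;

public class ExampleStateMachine extends BaseStateMachine {

  // Before the change, the subclass kept its own copy of the group id:
  //   private final RaftGroupId gid;   // assigned in the constructor
  // After the change there is no field; former uses of gid call getGroupId().

  @Override
  public void initialize(RaftServer server, RaftGroupId id,
      RaftStorage raftStorage) throws IOException {
    // The base class records id here, so getGroupId() is valid from now on.
    super.initialize(server, id, raftStorage);
  }

  public String describe() {
    // Inherited accessor replaces the removed field.
    return "state machine for raft group " + getGroupId();
  }
}

One subtlety visible in the initialize() hunk below: the commit passes the method's own id parameter to notifyGroupAdd(id) rather than calling getGroupId(); the two hold the same value once super.initialize() has run.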
@@ -183,7 +183,6 @@ long getStartTime() {

private final SimpleStateMachineStorage storage =
new SimpleStateMachineStorage();
- private final RaftGroupId gid;
private final ContainerDispatcher dispatcher;
private final ContainerController containerController;
private final XceiverServerRatis ratisServer;
@@ -218,7 +217,6 @@ public ContainerStateMachine(HddsDatanodeService hddsDatanodeService, RaftGroupI
ConfigurationSource conf,
String threadNamePrefix) {
this.datanodeService = hddsDatanodeService;
- this.gid = gid;
this.dispatcher = dispatcher;
this.containerController = containerController;
this.ratisServer = ratisServer;
@@ -282,7 +280,7 @@ public void initialize(
throws IOException {
super.initialize(server, id, raftStorage);
storage.init(raftStorage);
- ratisServer.notifyGroupAdd(gid);
+ ratisServer.notifyGroupAdd(id);

LOG.info("{}: initialize {}", server.getId(), id);
loadSnapshot(storage.getLatestSnapshot());
@@ -293,15 +291,15 @@ private long loadSnapshot(SingleFileSnapshotInfo snapshot)
if (snapshot == null) {
TermIndex empty = TermIndex.valueOf(0, RaftLog.INVALID_LOG_INDEX);
LOG.info("{}: The snapshot info is null. Setting the last applied index " +
"to:{}", gid, empty);
"to:{}", getGroupId(), empty);
setLastAppliedTermIndex(empty);
return empty.getIndex();
}

final File snapshotFile = snapshot.getFile().getPath().toFile();
final TermIndex last =
SimpleStateMachineStorage.getTermIndexFromSnapshotFile(snapshotFile);
LOG.info("{}: Setting the last applied index to {}", gid, last);
LOG.info("{}: Setting the last applied index to {}", getGroupId(), last);
setLastAppliedTermIndex(last);

// initialize the dispatcher with snapshot so that it build the missing
@@ -351,7 +349,7 @@ public long takeSnapshot() throws IOException {
long startTime = Time.monotonicNow();
if (!isStateMachineHealthy()) {
String msg =
"Failed to take snapshot " + " for " + gid + " as the stateMachine"
"Failed to take snapshot " + " for " + getGroupId() + " as the stateMachine"
+ " is unhealthy. The last applied index is at " + ti;
StateMachineException sme = new StateMachineException(msg);
LOG.error(msg);
@@ -360,19 +358,19 @@ public long takeSnapshot() throws IOException {
if (ti != null && ti.getIndex() != RaftLog.INVALID_LOG_INDEX) {
final File snapshotFile =
storage.getSnapshotFile(ti.getTerm(), ti.getIndex());
LOG.info("{}: Taking a snapshot at:{} file {}", gid, ti, snapshotFile);
LOG.info("{}: Taking a snapshot at:{} file {}", getGroupId(), ti, snapshotFile);
try (FileOutputStream fos = new FileOutputStream(snapshotFile)) {
persistContainerSet(fos);
fos.flush();
// make sure the snapshot file is synced
fos.getFD().sync();
} catch (IOException ioe) {
LOG.error("{}: Failed to write snapshot at:{} file {}", gid, ti,
LOG.error("{}: Failed to write snapshot at:{} file {}", getGroupId(), ti,
snapshotFile);
throw ioe;
}
LOG.info("{}: Finished taking a snapshot at:{} file:{} took: {} ms",
- gid, ti, snapshotFile, (Time.monotonicNow() - startTime));
+ getGroupId(), ti, snapshotFile, (Time.monotonicNow() - startTime));
return ti.getIndex();
}
return -1;
@@ -386,7 +384,7 @@ public TransactionContext startTransaction(LogEntryProto entry, RaftPeerRole rol
final StateMachineLogEntryProto stateMachineLogEntry = entry.getStateMachineLogEntry();
final ContainerCommandRequestProto logProto;
try {
- logProto = getContainerCommandRequestProto(gid, stateMachineLogEntry.getLogData());
+ logProto = getContainerCommandRequestProto(getGroupId(), stateMachineLogEntry.getLogData());
} catch (InvalidProtocolBufferException e) {
trx.setException(e);
return trx;
@@ -413,7 +411,7 @@ public TransactionContext startTransaction(RaftClientRequest request)
long startTime = Time.monotonicNowNanos();
final ContainerCommandRequestProto proto =
message2ContainerCommandRequestProto(request.getMessage());
- Preconditions.checkArgument(request.getRaftGroupId().equals(gid));
+ Preconditions.checkArgument(request.getRaftGroupId().equals(getGroupId()));

final TransactionContext.Builder builder = TransactionContext.newBuilder()
.setClientRequest(request)
@@ -449,7 +447,7 @@ public TransactionContext startTransaction(RaftClientRequest request)
final WriteChunkRequestProto.Builder commitWriteChunkProto = WriteChunkRequestProto.newBuilder(write)
.clearData();
protoBuilder.setWriteChunk(commitWriteChunkProto)
- .setPipelineID(gid.getUuid().toString())
+ .setPipelineID(getGroupId().getUuid().toString())
.setTraceID(proto.getTraceID());

builder.setStateMachineData(write.getData());
@@ -491,20 +489,20 @@ private static ContainerCommandRequestProto getContainerCommandRequestProto(

private ContainerCommandRequestProto message2ContainerCommandRequestProto(
Message message) throws InvalidProtocolBufferException {
- return ContainerCommandRequestMessage.toProto(message.getContent(), gid);
+ return ContainerCommandRequestMessage.toProto(message.getContent(), getGroupId());
}

private ContainerCommandResponseProto dispatchCommand(
ContainerCommandRequestProto requestProto, DispatcherContext context) {
if (LOG.isTraceEnabled()) {
LOG.trace("{}: dispatch {} containerID={} pipelineID={} traceID={}", gid,
LOG.trace("{}: dispatch {} containerID={} pipelineID={} traceID={}", getGroupId(),
requestProto.getCmdType(), requestProto.getContainerID(),
requestProto.getPipelineID(), requestProto.getTraceID());
}
ContainerCommandResponseProto response =
dispatcher.dispatch(requestProto, context);
if (LOG.isTraceEnabled()) {
LOG.trace("{}: response {}", gid, response);
LOG.trace("{}: response {}", getGroupId(), response);
}
return response;
}
@@ -531,7 +529,7 @@ private CompletableFuture<Message> writeStateMachineData(
RaftServer server = ratisServer.getServer();
Preconditions.checkArgument(!write.getData().isEmpty());
try {
- if (server.getDivision(gid).getInfo().isLeader()) {
+ if (server.getDivision(getGroupId()).getInfo().isLeader()) {
stateMachineDataCache.put(entryIndex, write.getData());
}
} catch (InterruptedException ioe) {
@@ -559,7 +557,7 @@ private CompletableFuture<Message> writeStateMachineData(
return dispatchCommand(requestProto, context);
} catch (Exception e) {
LOG.error("{}: writeChunk writeStateMachineData failed: blockId" +
"{} logIndex {} chunkName {}", gid, write.getBlockID(),
"{} logIndex {} chunkName {}", getGroupId(), write.getBlockID(),
entryIndex, write.getChunkData().getChunkName(), e);
metrics.incNumWriteDataFails();
// write chunks go in parallel. It's possible that one write chunk
@@ -573,7 +571,7 @@ private CompletableFuture<Message> writeStateMachineData(
writeChunkFutureMap.put(entryIndex, writeChunkFuture);
if (LOG.isDebugEnabled()) {
LOG.debug("{}: writeChunk writeStateMachineData : blockId" +
"{} logIndex {} chunkName {}", gid, write.getBlockID(),
"{} logIndex {} chunkName {}", getGroupId(), write.getBlockID(),
entryIndex, write.getChunkData().getChunkName());
}
// Remove the future once it finishes execution from the
@@ -587,7 +585,7 @@ private CompletableFuture<Message> writeStateMachineData(
&& r.getResult() != ContainerProtos.Result.CHUNK_FILE_INCONSISTENCY) {
StorageContainerException sce =
new StorageContainerException(r.getMessage(), r.getResult());
LOG.error(gid + ": writeChunk writeStateMachineData failed: blockId" +
LOG.error(getGroupId() + ": writeChunk writeStateMachineData failed: blockId" +
write.getBlockID() + " logIndex " + entryIndex + " chunkName " +
write.getChunkData().getChunkName() + " Error message: " +
r.getMessage() + " Container Result: " + r.getResult());
@@ -601,7 +599,7 @@ private CompletableFuture<Message> writeStateMachineData(
metrics.incNumBytesWrittenCount(
requestProto.getWriteChunk().getChunkData().getLen());
if (LOG.isDebugEnabled()) {
- LOG.debug(gid +
+ LOG.debug(getGroupId() +
": writeChunk writeStateMachineData completed: blockId" +
write.getBlockID() + " logIndex " + entryIndex + " chunkName " +
write.getChunkData().getChunkName());
@@ -622,7 +620,7 @@ private StateMachine.DataChannel getStreamDataChannel(
DispatcherContext context) throws StorageContainerException {
if (LOG.isDebugEnabled()) {
LOG.debug("{}: getStreamDataChannel {} containerID={} pipelineID={} " +
"traceID={}", gid, requestProto.getCmdType(),
"traceID={}", getGroupId(), requestProto.getCmdType(),
requestProto.getContainerID(), requestProto.getPipelineID(),
requestProto.getTraceID());
}
@@ -781,7 +779,7 @@ private ByteString readStateMachineData(
new StorageContainerException(response.getMessage(),
response.getResult());
LOG.error("gid {} : ReadStateMachine failed. cmd {} logIndex {} msg : "
+ "{} Container Result: {}", gid, response.getCmdType(), index,
+ "{} Container Result: {}", getGroupId(), response.getCmdType(), index,
response.getMessage(), response.getResult());
stateMachineHealthy.set(false);
throw sce;
@@ -856,7 +854,7 @@ public CompletableFuture<ByteString> read(LogEntryProto entry, TransactionContex
.map(TransactionContext::getStateMachineContext)
.orElse(null);
final ContainerCommandRequestProto requestProto = context != null ? context.getLogProto()
- : getContainerCommandRequestProto(gid, entry.getStateMachineLogEntry().getLogData());
+ : getContainerCommandRequestProto(getGroupId(), entry.getStateMachineLogEntry().getLogData());

if (requestProto.getCmdType() != Type.WriteChunk) {
throw new IllegalStateException("Cmd type:" + requestProto.getCmdType()
@@ -874,7 +872,7 @@ public CompletableFuture<ByteString> read(LogEntryProto entry, TransactionContex
return future;
} catch (Exception e) {
metrics.incNumReadStateMachineFails();
LOG.error("{} unable to read stateMachineData:", gid, e);
LOG.error("{} unable to read stateMachineData:", getGroupId(), e);
return completeExceptionally(e);
}
}
@@ -920,7 +918,7 @@ public void notifyServerShutdown(RaftProtos.RoleInfoProto roleInfo, boolean allS
// from `HddsDatanodeService.stop()`, otherwise, it indicates this `close` originates from ratis.
if (allServer) {
if (datanodeService != null && !datanodeService.isStopped()) {
LOG.info("{} is closed by ratis", gid);
LOG.info("{} is closed by ratis", getGroupId());
if (semaphore.tryAcquire()) {
// run with a different thread, so this raft group can be closed
Runnable runnable = () -> {
@@ -952,7 +950,7 @@ public void notifyServerShutdown(RaftProtos.RoleInfoProto roleInfo, boolean allS
CompletableFuture.runAsync(runnable);
}
} else {
LOG.info("{} is closed by HddsDatanodeService", gid);
LOG.info("{} is closed by HddsDatanodeService", getGroupId());
}
}
}
@@ -983,7 +981,7 @@ private CompletableFuture<ContainerCommandResponseProto> applyTransaction(
private void removeStateMachineDataIfNeeded(long index) {
if (waitOnBothFollowers) {
try {
- RaftServer.Division division = ratisServer.getServer().getDivision(gid);
+ RaftServer.Division division = ratisServer.getServer().getDivision(getGroupId());
if (division.getInfo().isLeader()) {
long minIndex = Arrays.stream(division.getInfo()
.getFollowerNextIndices()).min().getAsLong();
@@ -1041,7 +1039,7 @@ public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
CompletableFuture<Message> applyTransactionFuture =
new CompletableFuture<>();
final Consumer<Throwable> exceptionHandler = e -> {
LOG.error(gid + ": failed to applyTransaction at logIndex " + index
LOG.error(getGroupId() + ": failed to applyTransaction at logIndex " + index
+ " for " + requestProto.getCmdType(), e);
stateMachineHealthy.compareAndSet(true, false);
metrics.incNumApplyTransactionsFails();
@@ -1069,7 +1067,7 @@ public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
new StorageContainerException(r.getMessage(), r.getResult());
LOG.error(
"gid {} : ApplyTransaction failed. cmd {} logIndex {} msg : "
+ "{} Container Result: {}", gid, r.getCmdType(), index,
+ "{} Container Result: {}", getGroupId(), r.getCmdType(), index,
r.getMessage(), r.getResult());
metrics.incNumApplyTransactionsFails();
// Since the applyTransaction now is completed exceptionally,
@@ -1078,12 +1076,12 @@ public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
// shutdown.
applyTransactionFuture.completeExceptionally(sce);
stateMachineHealthy.compareAndSet(true, false);
- ratisServer.handleApplyTransactionFailure(gid, trx.getServerRole());
+ ratisServer.handleApplyTransactionFailure(getGroupId(), trx.getServerRole());
} else {
if (LOG.isDebugEnabled()) {
LOG.debug(
"gid {} : ApplyTransaction completed. cmd {} logIndex {} msg : "
+ "{} Container Result: {}", gid, r.getCmdType(), index,
+ "{} Container Result: {}", getGroupId(), r.getCmdType(), index,
r.getMessage(), r.getResult());
}
if (cmdType == Type.WriteChunk || cmdType == Type.PutSmallFile) {
@@ -1161,25 +1159,25 @@ public void evictStateMachineCache() {

@Override
public void notifyFollowerSlowness(RoleInfoProto roleInfoProto, RaftPeer follower) {
- ratisServer.handleFollowerSlowness(gid, roleInfoProto, follower);
+ ratisServer.handleFollowerSlowness(getGroupId(), roleInfoProto, follower);
}

@Override
public void notifyExtendedNoLeader(RoleInfoProto roleInfoProto) {
- ratisServer.handleNoLeader(gid, roleInfoProto);
+ ratisServer.handleNoLeader(getGroupId(), roleInfoProto);
}

@Override
public void notifyLogFailed(Throwable t, LogEntryProto failedEntry) {
LOG.error("{}: {} {}", gid, TermIndex.valueOf(failedEntry),
LOG.error("{}: {} {}", getGroupId(), TermIndex.valueOf(failedEntry),
toStateMachineLogEntryString(failedEntry.getStateMachineLogEntry()), t);
ratisServer.handleNodeLogFailure(gid, t);
ratisServer.handleNodeLogFailure(getGroupId(), t);
}

@Override
public CompletableFuture<TermIndex> notifyInstallSnapshotFromLeader(
RoleInfoProto roleInfoProto, TermIndex firstTermIndexInLog) {
- ratisServer.handleInstallSnapshotFromLeader(gid, roleInfoProto,
+ ratisServer.handleInstallSnapshotFromLeader(getGroupId(), roleInfoProto,
firstTermIndexInLog);
final CompletableFuture<TermIndex> future = new CompletableFuture<>();
future.complete(firstTermIndexInLog);
@@ -1188,15 +1186,15 @@ public CompletableFuture<TermIndex> notifyInstallSnapshotFromLeader(

@Override
public void notifyGroupRemove() {
- ratisServer.notifyGroupRemove(gid);
+ ratisServer.notifyGroupRemove(getGroupId());
// Make best effort to quasi-close all the containers on group removal.
// Containers already in terminal state like CLOSED or UNHEALTHY will not
// be affected.
for (Long cid : container2BCSIDMap.keySet()) {
try {
containerController.markContainerForClose(cid);
containerController.quasiCloseContainer(cid,
"Ratis group removed. Group id: " + gid);
"Ratis group removed. Group id: " + getGroupId());
} catch (IOException e) {
LOG.debug("Failed to quasi-close container {}", cid);
}
@@ -1218,7 +1216,7 @@ public void notifyLeaderChanged(RaftGroupMemberId groupMemberId,

@Override
public String toStateMachineLogEntryString(StateMachineLogEntryProto proto) {
- return smProtoToString(gid, containerController, proto);
+ return smProtoToString(getGroupId(), containerController, proto);
}

public static String smProtoToString(RaftGroupId gid,
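A note on the final hunk: the static helper smProtoToString(RaftGroupId gid, ...) keeps its explicit RaftGroupId parameter because a static method cannot use the inherited accessor; the instance-side toStateMachineLogEntryString override now supplies getGroupId() as that argument.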
