Skip to content

Commit

Permalink
HDDS-11695. SCM follower should not log NotLeaderException during Pip…
Browse files Browse the repository at this point in the history
…eline Report processing. (apache#7430)
  • Loading branch information
nandakumar131 authored Nov 15, 2024
1 parent 5275ded commit 12419fa
Showing 1 changed file with 29 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.safemode.SafeModeManager;
import org.apache.hadoop.hdds.scm.server
Expand Down Expand Up @@ -91,35 +92,45 @@ public void onMessage(PipelineReportFromDatanode pipelineReportFromDatanode,
for (PipelineReport report : pipelineReport.getPipelineReportList()) {
try {
processPipelineReport(report, dn, publisher);
} catch (NotLeaderException ex) {
// Avoid NotLeaderException logging which happens when processing
// pipeline report on followers.
} catch (PipelineNotFoundException e) {
LOGGER.error("Could not find pipeline {}", report.getPipelineID());
handlePipelineNotFoundException(report, dn, publisher);
} catch (IOException | TimeoutException e) {
LOGGER.error("Could not process pipeline report={} from dn={}.",
report, dn, e);
// Ignore NotLeaderException logging which happens when processing
// pipeline report on followers.
if (!isNotLeaderException(e)) {
LOGGER.error("Could not process pipeline report={} from dn={}.",
report, dn, e);
}
}
}
}

protected void processPipelineReport(PipelineReport report,
DatanodeDetails dn, EventPublisher publisher)
throws IOException, TimeoutException {
PipelineID pipelineID = PipelineID.getFromProtobuf(report.getPipelineID());
Pipeline pipeline;
try {
pipeline = pipelineManager.getPipeline(pipelineID);
} catch (PipelineNotFoundException e) {
if (scmContext.isLeader()) {
LOGGER.info("Reported pipeline {} is not found", pipelineID);
SCMCommand< ? > command = new ClosePipelineCommand(pipelineID);
private void handlePipelineNotFoundException(final PipelineReport report,
final DatanodeDetails dn, final EventPublisher publisher) {
final PipelineID pipelineID = PipelineID.getFromProtobuf(report.getPipelineID());
LOGGER.info("Pipeline {}, reported by datanode {} is not found.", pipelineID, dn);
if (scmContext.isLeader()) {
try {
final SCMCommand<?> command = new ClosePipelineCommand(pipelineID);
command.setTerm(scmContext.getTermOfLeader());
publisher.fireEvent(SCMEvents.DATANODE_COMMAND,
new CommandForDatanode<>(dn.getUuid(), command));
} catch (NotLeaderException ex) {
// Do nothing if the leader has changed.
}
return;
}
}

private static boolean isNotLeaderException(final Exception e) {
return e instanceof SCMException && ((SCMException) e).getResult().equals(
SCMException.ResultCodes.SCM_NOT_LEADER);
}

protected void processPipelineReport(PipelineReport report,
DatanodeDetails dn, EventPublisher publisher)
throws IOException, TimeoutException {
final PipelineID pipelineID = PipelineID.getFromProtobuf(report.getPipelineID());
final Pipeline pipeline = pipelineManager.getPipeline(pipelineID);

setReportedDatanode(pipeline, dn);
setPipelineLeaderId(report, pipeline, dn);
Expand Down

0 comments on commit 12419fa

Please sign in to comment.