Skip to content

Commit

Permalink
Pipe: Fixed the bug that CN cannot sense drop pipe failure in meta sy…
Browse files Browse the repository at this point in the history
…nc and may lead to constantly skip of drop pipe (#12059)
  • Loading branch information
Caideyipi authored Feb 21, 2024
1 parent f35b2a5 commit 4ca47c7
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 14 deletions.
14 changes: 7 additions & 7 deletions integration-test/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,6 @@
<artifactId>integration-test</artifactId>
<name>IoTDB: Integration-Test</name>
<properties>
<highPerformanceMode.configNodeConsensus>Ratis</highPerformanceMode.configNodeConsensus>
<highPerformanceMode.configNodeNumber>1</highPerformanceMode.configNodeNumber>
<highPerformanceMode.dataNodeNumber>3</highPerformanceMode.dataNodeNumber>
<highPerformanceMode.dataRegionConsensus>IoT</highPerformanceMode.dataRegionConsensus>
<highPerformanceMode.dataRegionReplicaNumber>2</highPerformanceMode.dataRegionReplicaNumber>
<highPerformanceMode.schemaRegionConsensus>Ratis</highPerformanceMode.schemaRegionConsensus>
<highPerformanceMode.schemaRegionReplicaNumber>3</highPerformanceMode.schemaRegionReplicaNumber>
<integrationTest.excludedGroups/>
<integrationTest.forkCount>1</integrationTest.forkCount>
<integrationTest.includedGroups/>
Expand All @@ -60,6 +53,13 @@
<scalableSingleNodeMode.dataRegionReplicaNumber>1</scalableSingleNodeMode.dataRegionReplicaNumber>
<scalableSingleNodeMode.schemaRegionConsensus>Ratis</scalableSingleNodeMode.schemaRegionConsensus>
<scalableSingleNodeMode.schemaRegionReplicaNumber>1</scalableSingleNodeMode.schemaRegionReplicaNumber>
<highPerformanceMode.configNodeConsensus>Ratis</highPerformanceMode.configNodeConsensus>
<highPerformanceMode.configNodeNumber>1</highPerformanceMode.configNodeNumber>
<highPerformanceMode.dataNodeNumber>3</highPerformanceMode.dataNodeNumber>
<highPerformanceMode.dataRegionConsensus>IoT</highPerformanceMode.dataRegionConsensus>
<highPerformanceMode.dataRegionReplicaNumber>2</highPerformanceMode.dataRegionReplicaNumber>
<highPerformanceMode.schemaRegionConsensus>Ratis</highPerformanceMode.schemaRegionConsensus>
<highPerformanceMode.schemaRegionReplicaNumber>3</highPerformanceMode.schemaRegionReplicaNumber>
<strongConsistencyClusterMode.configNodeConsensus>Ratis</strongConsistencyClusterMode.configNodeConsensus>
<strongConsistencyClusterMode.configNodeNumber>3</strongConsistencyClusterMode.configNodeNumber>
<strongConsistencyClusterMode.dataNodeNumber>3</strongConsistencyClusterMode.dataNodeNumber>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -722,7 +722,7 @@ public void testConcurrentlyCreatePipeOfSameName() throws Exception {
.setExtractorAttributes(extractorAttributes)
.setProcessorAttributes(processorAttributes));
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
successCount.updateAndGet(v -> v + 1);
successCount.incrementAndGet();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
Expand Down Expand Up @@ -752,7 +752,7 @@ public void testConcurrentlyCreatePipeOfSameName() throws Exception {
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
TSStatus status = client.dropPipe("p1");
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
successCount.updateAndGet(v -> v + 1);
successCount.incrementAndGet();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
Expand All @@ -771,7 +771,8 @@ public void testConcurrentlyCreatePipeOfSameName() throws Exception {
t.join();
}

Assert.assertEquals(10, successCount.get());
// Assert at least 1 drop operation succeeds
Assert.assertTrue(successCount.get() >= 1);
try (SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
List<TShowPipeInfo> showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,9 +290,7 @@ public void checkBeforeDropPipe(String pipeName) {
private void checkBeforeDropPipeInternal(String pipeName) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
"Check before drop pipe {}, pipe exists: {}.",
pipeName,
isPipeExisted(pipeName) ? "true" : "false");
"Check before drop pipe {}, pipe exists: {}.", pipeName, isPipeExisted(pipeName));
}
// No matter whether the pipe exists, we allow the drop operation executed on all nodes to
// ensure the consistency.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -358,8 +358,14 @@ private List<TPushPipeMetaRespExceptionMessage> handlePipeMetaChangesInternal(
dropPipe(metaInAgent.getStaticMeta().getPipeName());
}
} catch (Exception e) {
// Do not record the error messages for the pipes don't exist on coordinator.
// Report the exception message for CN to sense the failure of meta sync
final String errorMessage =
String.format(
"Failed to handle pipe meta changes for %s, because %s", pipeName, e.getMessage());
LOGGER.warn("Failed to handle pipe meta changes for {}", pipeName, e);
exceptionMessages.add(
new TPushPipeMetaRespExceptionMessage(
pipeName, errorMessage, System.currentTimeMillis()));
}
}

Expand Down

0 comments on commit 4ca47c7

Please sign in to comment.