diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java index cd6002ceee2..f185addf6b8 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java @@ -496,6 +496,52 @@ public void testOfsHSync(boolean incrementalChunkList) throws Exception { } } + @Test + public void testHSyncOpenKeyCommitAfterExpiry() throws Exception { + // Set the fs.defaultFS + final String rootPath = String.format("%s://%s/", + OZONE_OFS_URI_SCHEME, CONF.get(OZONE_OM_ADDRESS_KEY)); + CONF.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath); + + final Path key1 = new Path("hsync-key"); + final Path key2 = new Path("key2"); + + try (FileSystem fs = FileSystem.get(CONF)) { + // Create key1 with hsync + try (FSDataOutputStream os = fs.create(key1, true)) { + os.write(1); + os.hsync(); + // Create key2 without hsync + try (FSDataOutputStream os1 = fs.create(key2, true)) { + os1.write(1); + // There should be 2 key in openFileTable + assertThat(2 == getOpenKeyInfo(BUCKET_LAYOUT).size()); + // One key will be in fileTable as hsynced + assertThat(1 == getKeyInfo(BUCKET_LAYOUT).size()); + + // Resume openKeyCleanupService + openKeyCleanupService.resume(); + // Verify hsync openKey gets committed eventually + // Key without hsync is deleted + GenericTestUtils.waitFor(() -> + 0 == getOpenKeyInfo(BUCKET_LAYOUT).size(), 1000, 12000); + // Verify only one key is still present in fileTable + assertThat(1 == getKeyInfo(BUCKET_LAYOUT).size()); + + // Clean up + assertTrue(fs.delete(key1, false)); + waitForEmptyDeletedTable(); + } catch (OMException ex) { + assertEquals(OMException.ResultCodes.KEY_NOT_FOUND, ex.getResult()); + } + } catch (OMException ex) { + assertEquals(OMException.ResultCodes.KEY_NOT_FOUND, ex.getResult()); + } finally { + openKeyCleanupService.suspend(); + } + } + } + @Test public void testHSyncDeletedKey() throws Exception { // Verify that a key can't be successfully hsync'ed again after it's deleted, @@ -595,6 +641,21 @@ private List getOpenKeyInfo(BucketLayout bucketLayout) { return omKeyInfo; } + private List getKeyInfo(BucketLayout bucketLayout) { + List omKeyInfo = new ArrayList<>(); + + Table openFileTable = + cluster.getOzoneManager().getMetadataManager().getKeyTable(bucketLayout); + try (TableIterator> + iterator = openFileTable.iterator()) { + while (iterator.hasNext()) { + omKeyInfo.add(iterator.next().getValue()); + } + } catch (Exception e) { + } + return omKeyInfo; + } + @Test public void testUncommittedBlocks() throws Exception { waitForEmptyDeletedTable(); diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/OpenKeyCleanupService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/OpenKeyCleanupService.java index c0d958f6121..6d53e48a0fd 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/OpenKeyCleanupService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/OpenKeyCleanupService.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hdds.utils.BackgroundTask; import org.apache.hadoop.hdds.utils.BackgroundTaskQueue; import org.apache.hadoop.hdds.utils.BackgroundTaskResult; +import org.apache.hadoop.ozone.ClientVersion; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.om.ExpiredOpenKeys; import org.apache.hadoop.ozone.om.KeyManager; @@ -77,7 +78,7 @@ public class OpenKeyCleanupService extends BackgroundService { private final Duration leaseThreshold; private final int cleanupLimitPerTask; private final AtomicLong submittedOpenKeyCount; - private final AtomicLong runCount; + private final AtomicLong callId; private final AtomicBoolean suspended; public OpenKeyCleanupService(long interval, TimeUnit unit, long timeout, @@ -112,20 +113,10 @@ public OpenKeyCleanupService(long interval, TimeUnit unit, long timeout, OMConfigKeys.OZONE_OM_OPEN_KEY_CLEANUP_LIMIT_PER_TASK_DEFAULT); this.submittedOpenKeyCount = new AtomicLong(0); - this.runCount = new AtomicLong(0); + this.callId = new AtomicLong(0); this.suspended = new AtomicBoolean(false); } - /** - * Returns the number of times this Background service has run. - * - * @return Long, run count. - */ - @VisibleForTesting - public long getRunCount() { - return runCount.get(); - } - /** * Suspend the service (for testing). */ @@ -189,7 +180,6 @@ public BackgroundTaskResult call() throws Exception { if (!shouldRun()) { return BackgroundTaskResult.EmptyTaskResult.newResult(); } - runCount.incrementAndGet(); long startTime = Time.monotonicNow(); final ExpiredOpenKeys expiredOpenKeys; try { @@ -244,6 +234,7 @@ private OMRequest createCommitKeyRequest( .setCmdType(Type.CommitKey) .setCommitKeyRequest(request) .setClientId(clientId.toString()) + .setVersion(ClientVersion.CURRENT_VERSION) .build(); } @@ -265,7 +256,7 @@ private OMRequest createDeleteOpenKeysRequest( private OMResponse submitRequest(OMRequest omRequest) { try { - return OzoneManagerRatisUtils.submitRequest(ozoneManager, omRequest, clientId, runCount.get()); + return OzoneManagerRatisUtils.submitRequest(ozoneManager, omRequest, clientId, callId.incrementAndGet()); } catch (ServiceException e) { LOG.error("Open key " + omRequest.getCmdType() + " request failed. Will retry at next run.", e); diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestOpenKeyCleanupService.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestOpenKeyCleanupService.java index 014865f919f..ab22b353bd7 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestOpenKeyCleanupService.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestOpenKeyCleanupService.java @@ -166,8 +166,7 @@ public void testCleanupExpiredOpenKeys( // wait for submitted tasks to complete Thread.sleep(SERVICE_INTERVAL); final long oldkeyCount = openKeyCleanupService.getSubmittedOpenKeyCount(); - final long oldrunCount = openKeyCleanupService.getRunCount(); - LOG.info("oldkeyCount={}, oldrunCount={}", oldkeyCount, oldrunCount); + LOG.info("oldkeyCount={}", oldkeyCount); final OMMetrics metrics = om.getMetrics(); long numKeyHSyncs = metrics.getNumKeyHSyncs(); @@ -189,9 +188,6 @@ public void testCleanupExpiredOpenKeys( GenericTestUtils.waitFor( () -> openKeyCleanupService.getSubmittedOpenKeyCount() >= oldkeyCount + keyCount, SERVICE_INTERVAL, WAIT_TIME); - GenericTestUtils.waitFor( - () -> openKeyCleanupService.getRunCount() >= oldrunCount + 2, - SERVICE_INTERVAL, WAIT_TIME); waitForOpenKeyCleanup(false, BucketLayout.DEFAULT); waitForOpenKeyCleanup(hsync, BucketLayout.FILE_SYSTEM_OPTIMIZED); @@ -332,8 +328,7 @@ public void testExcludeMPUOpenKeys( // wait for submitted tasks to complete Thread.sleep(SERVICE_INTERVAL); final long oldkeyCount = openKeyCleanupService.getSubmittedOpenKeyCount(); - final long oldrunCount = openKeyCleanupService.getRunCount(); - LOG.info("oldMpuKeyCount={}, oldMpuRunCount={}", oldkeyCount, oldrunCount); + LOG.info("oldMpuKeyCount={}", oldkeyCount); final OMMetrics metrics = om.getMetrics(); long numKeyHSyncs = metrics.getNumKeyHSyncs(); @@ -353,13 +348,8 @@ public void testExcludeMPUOpenKeys( BucketLayout.FILE_SYSTEM_OPTIMIZED); openKeyCleanupService.resume(); - - GenericTestUtils.waitFor( - () -> openKeyCleanupService.getRunCount() >= oldrunCount + 2, - SERVICE_INTERVAL, WAIT_TIME); - - // wait for requests to complete - Thread.sleep(SERVICE_INTERVAL); + // wait for openKeyCleanupService to complete at least once + Thread.sleep(SERVICE_INTERVAL * 2); // No expired open keys fetched assertEquals(openKeyCleanupService.getSubmittedOpenKeyCount(), oldkeyCount); @@ -397,8 +387,7 @@ public void testCleanupExpiredOpenMPUPartKeys( // wait for submitted tasks to complete Thread.sleep(SERVICE_INTERVAL); final long oldkeyCount = openKeyCleanupService.getSubmittedOpenKeyCount(); - final long oldrunCount = openKeyCleanupService.getRunCount(); - LOG.info("oldMpuKeyCount={}, oldMpuRunCount={}", oldkeyCount, oldrunCount); + LOG.info("oldMpuKeyCount={},", oldkeyCount); final OMMetrics metrics = om.getMetrics(); long numOpenKeysCleaned = metrics.getNumOpenKeysCleaned(); @@ -423,9 +412,6 @@ public void testCleanupExpiredOpenMPUPartKeys( GenericTestUtils.waitFor( () -> openKeyCleanupService.getSubmittedOpenKeyCount() >= oldkeyCount + partCount, SERVICE_INTERVAL, WAIT_TIME); - GenericTestUtils.waitFor( - () -> openKeyCleanupService.getRunCount() >= oldrunCount + 2, - SERVICE_INTERVAL, WAIT_TIME); // No expired MPU parts fetched waitForOpenKeyCleanup(false, BucketLayout.DEFAULT);