Skip to content

Commit

Permalink
HDDS-11807. Make callId different for each request in openKeyCleanupS…
Browse files Browse the repository at this point in the history
…ervice (#7551)
  • Loading branch information
ashishkumar50 authored Dec 16, 2024
1 parent c523825 commit 54f0272
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,52 @@ public void testOfsHSync(boolean incrementalChunkList) throws Exception {
}
}

@Test
public void testHSyncOpenKeyCommitAfterExpiry() throws Exception {
// Set the fs.defaultFS
final String rootPath = String.format("%s://%s/",
OZONE_OFS_URI_SCHEME, CONF.get(OZONE_OM_ADDRESS_KEY));
CONF.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath);

final Path key1 = new Path("hsync-key");
final Path key2 = new Path("key2");

try (FileSystem fs = FileSystem.get(CONF)) {
// Create key1 with hsync
try (FSDataOutputStream os = fs.create(key1, true)) {
os.write(1);
os.hsync();
// Create key2 without hsync
try (FSDataOutputStream os1 = fs.create(key2, true)) {
os1.write(1);
// There should be 2 key in openFileTable
assertThat(2 == getOpenKeyInfo(BUCKET_LAYOUT).size());
// One key will be in fileTable as hsynced
assertThat(1 == getKeyInfo(BUCKET_LAYOUT).size());

// Resume openKeyCleanupService
openKeyCleanupService.resume();
// Verify hsync openKey gets committed eventually
// Key without hsync is deleted
GenericTestUtils.waitFor(() ->
0 == getOpenKeyInfo(BUCKET_LAYOUT).size(), 1000, 12000);
// Verify only one key is still present in fileTable
assertThat(1 == getKeyInfo(BUCKET_LAYOUT).size());

// Clean up
assertTrue(fs.delete(key1, false));
waitForEmptyDeletedTable();
} catch (OMException ex) {
assertEquals(OMException.ResultCodes.KEY_NOT_FOUND, ex.getResult());
}
} catch (OMException ex) {
assertEquals(OMException.ResultCodes.KEY_NOT_FOUND, ex.getResult());
} finally {
openKeyCleanupService.suspend();
}
}
}

@Test
public void testHSyncDeletedKey() throws Exception {
// Verify that a key can't be successfully hsync'ed again after it's deleted,
Expand Down Expand Up @@ -595,6 +641,21 @@ private List<OmKeyInfo> getOpenKeyInfo(BucketLayout bucketLayout) {
return omKeyInfo;
}

private List<OmKeyInfo> getKeyInfo(BucketLayout bucketLayout) {
List<OmKeyInfo> omKeyInfo = new ArrayList<>();

Table<String, OmKeyInfo> openFileTable =
cluster.getOzoneManager().getMetadataManager().getKeyTable(bucketLayout);
try (TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>>
iterator = openFileTable.iterator()) {
while (iterator.hasNext()) {
omKeyInfo.add(iterator.next().getValue());
}
} catch (Exception e) {
}
return omKeyInfo;
}

@Test
public void testUncommittedBlocks() throws Exception {
waitForEmptyDeletedTable();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.hadoop.hdds.utils.BackgroundTask;
import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
import org.apache.hadoop.ozone.ClientVersion;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.om.ExpiredOpenKeys;
import org.apache.hadoop.ozone.om.KeyManager;
Expand Down Expand Up @@ -77,7 +78,7 @@ public class OpenKeyCleanupService extends BackgroundService {
private final Duration leaseThreshold;
private final int cleanupLimitPerTask;
private final AtomicLong submittedOpenKeyCount;
private final AtomicLong runCount;
private final AtomicLong callId;
private final AtomicBoolean suspended;

public OpenKeyCleanupService(long interval, TimeUnit unit, long timeout,
Expand Down Expand Up @@ -112,20 +113,10 @@ public OpenKeyCleanupService(long interval, TimeUnit unit, long timeout,
OMConfigKeys.OZONE_OM_OPEN_KEY_CLEANUP_LIMIT_PER_TASK_DEFAULT);

this.submittedOpenKeyCount = new AtomicLong(0);
this.runCount = new AtomicLong(0);
this.callId = new AtomicLong(0);
this.suspended = new AtomicBoolean(false);
}

/**
* Returns the number of times this Background service has run.
*
* @return Long, run count.
*/
@VisibleForTesting
public long getRunCount() {
return runCount.get();
}

/**
* Suspend the service (for testing).
*/
Expand Down Expand Up @@ -189,7 +180,6 @@ public BackgroundTaskResult call() throws Exception {
if (!shouldRun()) {
return BackgroundTaskResult.EmptyTaskResult.newResult();
}
runCount.incrementAndGet();
long startTime = Time.monotonicNow();
final ExpiredOpenKeys expiredOpenKeys;
try {
Expand Down Expand Up @@ -244,6 +234,7 @@ private OMRequest createCommitKeyRequest(
.setCmdType(Type.CommitKey)
.setCommitKeyRequest(request)
.setClientId(clientId.toString())
.setVersion(ClientVersion.CURRENT_VERSION)
.build();
}

Expand All @@ -265,7 +256,7 @@ private OMRequest createDeleteOpenKeysRequest(

private OMResponse submitRequest(OMRequest omRequest) {
try {
return OzoneManagerRatisUtils.submitRequest(ozoneManager, omRequest, clientId, runCount.get());
return OzoneManagerRatisUtils.submitRequest(ozoneManager, omRequest, clientId, callId.incrementAndGet());
} catch (ServiceException e) {
LOG.error("Open key " + omRequest.getCmdType()
+ " request failed. Will retry at next run.", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,7 @@ public void testCleanupExpiredOpenKeys(
// wait for submitted tasks to complete
Thread.sleep(SERVICE_INTERVAL);
final long oldkeyCount = openKeyCleanupService.getSubmittedOpenKeyCount();
final long oldrunCount = openKeyCleanupService.getRunCount();
LOG.info("oldkeyCount={}, oldrunCount={}", oldkeyCount, oldrunCount);
LOG.info("oldkeyCount={}", oldkeyCount);

final OMMetrics metrics = om.getMetrics();
long numKeyHSyncs = metrics.getNumKeyHSyncs();
Expand All @@ -189,9 +188,6 @@ public void testCleanupExpiredOpenKeys(
GenericTestUtils.waitFor(
() -> openKeyCleanupService.getSubmittedOpenKeyCount() >= oldkeyCount + keyCount,
SERVICE_INTERVAL, WAIT_TIME);
GenericTestUtils.waitFor(
() -> openKeyCleanupService.getRunCount() >= oldrunCount + 2,
SERVICE_INTERVAL, WAIT_TIME);

waitForOpenKeyCleanup(false, BucketLayout.DEFAULT);
waitForOpenKeyCleanup(hsync, BucketLayout.FILE_SYSTEM_OPTIMIZED);
Expand Down Expand Up @@ -332,8 +328,7 @@ public void testExcludeMPUOpenKeys(
// wait for submitted tasks to complete
Thread.sleep(SERVICE_INTERVAL);
final long oldkeyCount = openKeyCleanupService.getSubmittedOpenKeyCount();
final long oldrunCount = openKeyCleanupService.getRunCount();
LOG.info("oldMpuKeyCount={}, oldMpuRunCount={}", oldkeyCount, oldrunCount);
LOG.info("oldMpuKeyCount={}", oldkeyCount);

final OMMetrics metrics = om.getMetrics();
long numKeyHSyncs = metrics.getNumKeyHSyncs();
Expand All @@ -353,13 +348,8 @@ public void testExcludeMPUOpenKeys(
BucketLayout.FILE_SYSTEM_OPTIMIZED);

openKeyCleanupService.resume();

GenericTestUtils.waitFor(
() -> openKeyCleanupService.getRunCount() >= oldrunCount + 2,
SERVICE_INTERVAL, WAIT_TIME);

// wait for requests to complete
Thread.sleep(SERVICE_INTERVAL);
// wait for openKeyCleanupService to complete at least once
Thread.sleep(SERVICE_INTERVAL * 2);

// No expired open keys fetched
assertEquals(openKeyCleanupService.getSubmittedOpenKeyCount(), oldkeyCount);
Expand Down Expand Up @@ -397,8 +387,7 @@ public void testCleanupExpiredOpenMPUPartKeys(
// wait for submitted tasks to complete
Thread.sleep(SERVICE_INTERVAL);
final long oldkeyCount = openKeyCleanupService.getSubmittedOpenKeyCount();
final long oldrunCount = openKeyCleanupService.getRunCount();
LOG.info("oldMpuKeyCount={}, oldMpuRunCount={}", oldkeyCount, oldrunCount);
LOG.info("oldMpuKeyCount={},", oldkeyCount);

final OMMetrics metrics = om.getMetrics();
long numOpenKeysCleaned = metrics.getNumOpenKeysCleaned();
Expand All @@ -423,9 +412,6 @@ public void testCleanupExpiredOpenMPUPartKeys(
GenericTestUtils.waitFor(
() -> openKeyCleanupService.getSubmittedOpenKeyCount() >= oldkeyCount + partCount,
SERVICE_INTERVAL, WAIT_TIME);
GenericTestUtils.waitFor(
() -> openKeyCleanupService.getRunCount() >= oldrunCount + 2,
SERVICE_INTERVAL, WAIT_TIME);

// No expired MPU parts fetched
waitForOpenKeyCleanup(false, BucketLayout.DEFAULT);
Expand Down

0 comments on commit 54f0272

Please sign in to comment.