Skip to content

Commit

Permalink
HDDS-11285. cli to trigger quota repair and status (apache#7104)
Browse files Browse the repository at this point in the history
  • Loading branch information
sumitagrawl authored Sep 5, 2024
1 parent 2e33978 commit 3e1188a
Show file tree
Hide file tree
Showing 12 changed files with 505 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,8 @@ public static boolean isReadOnly(
case PrintCompactionLogDag:
case GetSnapshotInfo:
case GetServerDefaults:
case GetQuotaRepairStatus:
case StartQuotaRepair:
return true;
case CreateVolume:
case SetVolumeProperty:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1153,4 +1153,17 @@ boolean setSafeMode(SafeModeAction action, boolean isChecked)
* @throws IOException
*/
OzoneFsServerDefaults getServerDefaults() throws IOException;

/**
* Get status of last triggered quota repair in OM.
* @return String
* @throws IOException
*/
String getQuotaRepairStatus() throws IOException;

/**
* start quota repair in OM.
* @throws IOException
*/
void startQuotaRepair(List<String> buckets) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -2579,6 +2579,30 @@ public OzoneFsServerDefaults getServerDefaults()
serverDefaultsResponse.getServerDefaults());
}

@Override
public String getQuotaRepairStatus() throws IOException {
OzoneManagerProtocolProtos.GetQuotaRepairStatusRequest quotaRepairStatusRequest =
OzoneManagerProtocolProtos.GetQuotaRepairStatusRequest.newBuilder()
.build();

OMRequest omRequest = createOMRequest(Type.GetQuotaRepairStatus)
.setGetQuotaRepairStatusRequest(quotaRepairStatusRequest).build();

OzoneManagerProtocolProtos.GetQuotaRepairStatusResponse quotaRepairStatusResponse
= handleError(submitRequest(omRequest)).getGetQuotaRepairStatusResponse();
return quotaRepairStatusResponse.getStatus();
}

@Override
public void startQuotaRepair(List<String> buckets) throws IOException {
OzoneManagerProtocolProtos.StartQuotaRepairRequest startQuotaRepairRequest =
OzoneManagerProtocolProtos.StartQuotaRepairRequest.newBuilder()
.build();
OMRequest omRequest = createOMRequest(Type.StartQuotaRepair)
.setStartQuotaRepairRequest(startQuotaRepairRequest).build();
handleError(submitRequest(omRequest));
}

private SafeMode toProtoBuf(SafeModeAction action) {
switch (action) {
case ENTER:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,13 @@
import org.apache.hadoop.ozone.debug.DBScanner;
import org.apache.hadoop.ozone.debug.RDBParser;
import org.apache.hadoop.ozone.om.OMStorage;
import org.apache.hadoop.ozone.repair.OzoneRepair;
import org.apache.hadoop.ozone.repair.RDBRepair;
import org.apache.hadoop.ozone.repair.TransactionInfoRepair;
import org.apache.hadoop.ozone.repair.quota.QuotaRepair;
import org.apache.hadoop.ozone.repair.quota.QuotaStatus;
import org.apache.hadoop.ozone.repair.quota.QuotaTrigger;
import org.apache.ozone.test.GenericTestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
Expand All @@ -38,6 +43,7 @@
import java.util.regex.Pattern;

import static org.apache.hadoop.ozone.OzoneConsts.OM_DB_NAME;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static java.nio.charset.StandardCharsets.UTF_8;
Expand Down Expand Up @@ -130,4 +136,28 @@ private String[] parseScanOutput(String output) throws IOException {
throw new IllegalStateException("Failed to scan and find raft's highest term and index from TransactionInfo table");
}

@Test
public void testQuotaRepair() throws Exception {
CommandLine cmd = new CommandLine(new OzoneRepair()).addSubcommand(new CommandLine(new QuotaRepair())
.addSubcommand(new QuotaStatus()).addSubcommand(new QuotaTrigger()));

String[] args = new String[] {"quota", "status", "--service-host", conf.get(OZONE_OM_ADDRESS_KEY)};
int exitCode = cmd.execute(args);
assertEquals(0, exitCode);
args = new String[] {"quota", "start", "--service-host", conf.get(OZONE_OM_ADDRESS_KEY)};
exitCode = cmd.execute(args);
assertEquals(0, exitCode);
GenericTestUtils.waitFor(() -> {
out.reset();
// verify quota trigger is completed having non-zero lastRunFinishedTime
String[] targs = new String[]{"quota", "status", "--service-host", conf.get(OZONE_OM_ADDRESS_KEY)};
cmd.execute(targs);
try {
return !out.toString(DEFAULT_ENCODING).contains("\"lastRunFinishedTime\":\"\"");
} catch (Exception ex) {
// do nothing
}
return false;
}, 1000, 10000);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,8 @@ enum Type {
ListOpenFiles = 132;
QuotaRepair = 133;
GetServerDefaults = 134;
GetQuotaRepairStatus = 135;
StartQuotaRepair = 136;
}

enum SafeMode {
Expand Down Expand Up @@ -291,6 +293,8 @@ message OMRequest {
optional ListOpenFilesRequest ListOpenFilesRequest = 130;
optional QuotaRepairRequest QuotaRepairRequest = 131;
optional ServerDefaultsRequest ServerDefaultsRequest = 132;
optional GetQuotaRepairStatusRequest GetQuotaRepairStatusRequest = 133;
optional StartQuotaRepairRequest StartQuotaRepairRequest = 134;
}

message OMResponse {
Expand Down Expand Up @@ -419,6 +423,8 @@ message OMResponse {
optional ListOpenFilesResponse ListOpenFilesResponse = 133;
optional QuotaRepairResponse QuotaRepairResponse = 134;
optional ServerDefaultsResponse ServerDefaultsResponse = 135;
optional GetQuotaRepairStatusResponse GetQuotaRepairStatusResponse = 136;
optional StartQuotaRepairResponse StartQuotaRepairResponse = 137;
}

enum Status {
Expand Down Expand Up @@ -2226,6 +2232,17 @@ message ServerDefaultsResponse {
required FsServerDefaultsProto serverDefaults = 1;
}

message GetQuotaRepairStatusRequest {
}
message GetQuotaRepairStatusResponse {
optional string status = 1;
}
message StartQuotaRepairRequest {
repeated string buckets = 1;
}
message StartQuotaRepairResponse {
}

message OMLockDetailsProto {
optional bool isLockAcquired = 1;
optional uint64 waitLockNanos = 2;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@
import org.apache.hadoop.ozone.om.s3.S3SecretCacheProvider;
import org.apache.hadoop.ozone.om.s3.S3SecretStoreProvider;
import org.apache.hadoop.ozone.om.service.OMRangerBGSyncService;
import org.apache.hadoop.ozone.om.service.QuotaRepairTask;
import org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils;
import org.apache.hadoop.ozone.om.snapshot.ReferenceCounted;
import org.apache.hadoop.ozone.om.upgrade.OMLayoutFeature;
Expand Down Expand Up @@ -4751,6 +4752,18 @@ public OzoneFsServerDefaults getServerDefaults() {
return serverDefaults;
}

@Override
public String getQuotaRepairStatus() throws IOException {
checkAdminUserPrivilege("quota repair status");
return QuotaRepairTask.getStatus();
}

@Override
public void startQuotaRepair(List<String> buckets) throws IOException {
checkAdminUserPrivilege("start quota repair");
new QuotaRepairTask(this).repair(buckets);
}

/**
* Write down Layout version of a finalized feature to DB on finalization.
* @param lvm OMLayoutVersionManager
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.io.UncheckedIOException;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -48,6 +49,7 @@
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.helpers.BucketLayout;
import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
Expand All @@ -74,27 +76,31 @@ public class QuotaRepairTask {
private static final int TASK_THREAD_CNT = 3;
private static final AtomicBoolean IN_PROGRESS = new AtomicBoolean(false);
private static final RepairStatus REPAIR_STATUS = new RepairStatus();
private static final AtomicLong RUN_CNT = new AtomicLong(0);
private final OzoneManager om;
private final AtomicLong runCount = new AtomicLong(0);
private ExecutorService executor;
public QuotaRepairTask(OzoneManager ozoneManager) {
this.om = ozoneManager;
}

public CompletableFuture<Boolean> repair() throws Exception {
public CompletableFuture<Boolean> repair() throws IOException {
return repair(Collections.emptyList());
}

public CompletableFuture<Boolean> repair(List<String> buckets) throws IOException {
// lock in progress operation and reject any other
if (!IN_PROGRESS.compareAndSet(false, true)) {
LOG.info("quota repair task already running");
return CompletableFuture.supplyAsync(() -> false);
throw new OMException("Quota repair is already running", OMException.ResultCodes.QUOTA_ERROR);
}
REPAIR_STATUS.reset(runCount.get() + 1);
return CompletableFuture.supplyAsync(() -> repairTask());
REPAIR_STATUS.reset(RUN_CNT.get() + 1);
return CompletableFuture.supplyAsync(() -> repairTask(buckets));
}

public static String getStatus() {
return REPAIR_STATUS.toString();
}
private boolean repairTask() {
private boolean repairTask(List<String> buckets) {
LOG.info("Starting quota repair task {}", REPAIR_STATUS);
OMMetadataManager activeMetaManager = null;
try {
Expand All @@ -104,7 +110,7 @@ private boolean repairTask() {
= OzoneManagerProtocolProtos.QuotaRepairRequest.newBuilder();
// repair active db
activeMetaManager = createActiveDBCheckpoint(om.getMetadataManager(), om.getConfiguration());
repairActiveDb(activeMetaManager, builder);
repairActiveDb(activeMetaManager, builder, buckets);

// TODO: repair snapshots for quota

Expand All @@ -116,12 +122,12 @@ private boolean repairTask() {
.setClientId(clientId.toString())
.build();
OzoneManagerProtocolProtos.OMResponse response = submitRequest(omRequest, clientId);
if (response != null && !response.getSuccess()) {
if (response != null && response.getSuccess()) {
REPAIR_STATUS.updateStatus(builder, om.getMetadataManager());
} else {
LOG.error("update quota repair count response failed");
REPAIR_STATUS.updateStatus("Response for update DB is failed");
return false;
} else {
REPAIR_STATUS.updateStatus(builder, om.getMetadataManager());
}
} catch (Exception exp) {
LOG.error("quota repair count failed", exp);
Expand All @@ -145,11 +151,15 @@ private boolean repairTask() {

private void repairActiveDb(
OMMetadataManager metadataManager,
OzoneManagerProtocolProtos.QuotaRepairRequest.Builder builder) throws Exception {
OzoneManagerProtocolProtos.QuotaRepairRequest.Builder builder,
List<String> buckets) throws Exception {
Map<String, OmBucketInfo> nameBucketInfoMap = new HashMap<>();
Map<String, OmBucketInfo> idBucketInfoMap = new HashMap<>();
Map<String, OmBucketInfo> oriBucketInfoMap = new HashMap<>();
prepareAllBucketInfo(nameBucketInfoMap, idBucketInfoMap, oriBucketInfoMap, metadataManager);
prepareAllBucketInfo(nameBucketInfoMap, idBucketInfoMap, oriBucketInfoMap, metadataManager, buckets);
if (nameBucketInfoMap.isEmpty()) {
throw new OMException("no matching buckets", OMException.ResultCodes.BUCKET_NOT_FOUND);
}

repairCount(nameBucketInfoMap, idBucketInfoMap, metadataManager);

Expand All @@ -174,31 +184,36 @@ private void repairActiveDb(
}

// update volume to support quota
builder.setSupportVolumeOldQuota(true);
if (buckets.isEmpty()) {
builder.setSupportVolumeOldQuota(true);
} else {
builder.setSupportVolumeOldQuota(false);
}
}

private OzoneManagerProtocolProtos.OMResponse submitRequest(
OzoneManagerProtocolProtos.OMRequest omRequest, ClientId clientId) {
OzoneManagerProtocolProtos.OMRequest omRequest, ClientId clientId) throws Exception {
try {
if (om.isRatisEnabled()) {
OzoneManagerRatisServer server = om.getOmRatisServer();
RaftClientRequest raftClientRequest = RaftClientRequest.newBuilder()
.setClientId(clientId)
.setServerId(om.getOmRatisServer().getRaftPeerId())
.setGroupId(om.getOmRatisServer().getRaftGroupId())
.setCallId(runCount.getAndIncrement())
.setCallId(RUN_CNT.getAndIncrement())
.setMessage(Message.valueOf(OMRatisHelper.convertRequestToByteString(omRequest)))
.setType(RaftClientRequest.writeRequestType())
.build();
return server.submitRequest(omRequest, raftClientRequest);
} else {
RUN_CNT.getAndIncrement();
return om.getOmServerProtocol().submitRequest(
null, omRequest);
}
} catch (ServiceException e) {
LOG.error("repair quota count " + omRequest.getCmdType() + " request failed.", e);
throw e;
}
return null;
}

private OMMetadataManager createActiveDBCheckpoint(
Expand Down Expand Up @@ -228,24 +243,42 @@ private static String cleanTempCheckPointPath(OMMetadataManager omMetaManager) t

private void prepareAllBucketInfo(
Map<String, OmBucketInfo> nameBucketInfoMap, Map<String, OmBucketInfo> idBucketInfoMap,
Map<String, OmBucketInfo> oriBucketInfoMap, OMMetadataManager metadataManager) throws IOException {
Map<String, OmBucketInfo> oriBucketInfoMap, OMMetadataManager metadataManager,
List<String> buckets) throws IOException {
if (!buckets.isEmpty()) {
for (String bucketkey : buckets) {
OmBucketInfo bucketInfo = metadataManager.getBucketTable().get(bucketkey);
if (null == bucketInfo) {
continue;
}
populateBucket(nameBucketInfoMap, idBucketInfoMap, oriBucketInfoMap, metadataManager, bucketInfo);
}
return;
}
try (TableIterator<String, ? extends Table.KeyValue<String, OmBucketInfo>>
iterator = metadataManager.getBucketTable().iterator()) {
while (iterator.hasNext()) {
Table.KeyValue<String, OmBucketInfo> entry = iterator.next();
OmBucketInfo bucketInfo = entry.getValue();
String bucketNameKey = buildNamePath(bucketInfo.getVolumeName(),
bucketInfo.getBucketName());
oriBucketInfoMap.put(bucketNameKey, bucketInfo.copyObject());
bucketInfo.incrUsedNamespace(-bucketInfo.getUsedNamespace());
bucketInfo.incrUsedBytes(-bucketInfo.getUsedBytes());
nameBucketInfoMap.put(bucketNameKey, bucketInfo);
idBucketInfoMap.put(buildIdPath(metadataManager.getVolumeId(bucketInfo.getVolumeName()),
bucketInfo.getObjectID()), bucketInfo);
populateBucket(nameBucketInfoMap, idBucketInfoMap, oriBucketInfoMap, metadataManager, bucketInfo);
}
}
}

private static void populateBucket(
Map<String, OmBucketInfo> nameBucketInfoMap, Map<String, OmBucketInfo> idBucketInfoMap,
Map<String, OmBucketInfo> oriBucketInfoMap, OMMetadataManager metadataManager,
OmBucketInfo bucketInfo) throws IOException {
String bucketNameKey = buildNamePath(bucketInfo.getVolumeName(),
bucketInfo.getBucketName());
oriBucketInfoMap.put(bucketNameKey, bucketInfo.copyObject());
bucketInfo.incrUsedNamespace(-bucketInfo.getUsedNamespace());
bucketInfo.incrUsedBytes(-bucketInfo.getUsedBytes());
nameBucketInfoMap.put(bucketNameKey, bucketInfo);
idBucketInfoMap.put(buildIdPath(metadataManager.getVolumeId(bucketInfo.getVolumeName()),
bucketInfo.getObjectID()), bucketInfo);
}

private boolean isChange(OmBucketInfo lBucketInfo, OmBucketInfo rBucketInfo) {
if (lBucketInfo.getUsedNamespace() != rBucketInfo.getUsedNamespace()
|| lBucketInfo.getUsedBytes() != rBucketInfo.getUsedBytes()) {
Expand Down Expand Up @@ -468,8 +501,9 @@ public String toString() {
}
Map<String, Object> status = new HashMap<>();
status.put("taskId", taskId);
status.put("lastRunStartTime", lastRunStartTime);
status.put("lastRunFinishedTime", lastRunFinishedTime);
status.put("lastRunStartTime", lastRunStartTime > 0 ? new java.util.Date(lastRunStartTime).toString() : "");
status.put("lastRunFinishedTime", lastRunFinishedTime > 0 ? new java.util.Date(lastRunFinishedTime).toString()
: "");
status.put("errorMsg", errorMsg);
status.put("bucketCountDiffMap", bucketCountDiffMap);
try {
Expand Down
Loading

0 comments on commit 3e1188a

Please sign in to comment.