[HUDI-8550] Make Hudi 1.x write timeline to a dedicated timeline folder under .hoodie #12288

Open. Wants to merge 27 commits into base: master.

Changes from all commits (27 commits):
b282946  [HUDI-8550] Make Hudi 1.x write timeline to a dedicated timeline fold… (balaji-varadarajan-ai, Nov 6, 2024)
eeb0727  More cleanup (balaji-varadarajan-ai, Nov 18, 2024)
08cb685  Fix test (balaji-varadarajan-ai, Nov 19, 2024)
678fb74  Fix test failures (Nov 19, 2024)
7ed1760  More test fixes (Nov 20, 2024)
cafa794  Fix bug in file slice after downgrade. Fix more tests. (Nov 21, 2024)
359b769  Address review comments (Nov 21, 2024)
0bdb224  pass timeline path when creating meta file (codope, Nov 21, 2024)
48e93de  update the timeline layout for meta client table config reloading (danny0405, Nov 21, 2024)
e4c033b  Fix one more test (Nov 21, 2024)
05d3550  Make Timeline folder a table property (Nov 21, 2024)
b0ef6d7  Make Archive folder to be under timeline folder as was requested (Nov 22, 2024)
99ea8bd  Fix compilation (Nov 22, 2024)
f38510d  Fix compilation (Nov 22, 2024)
a90e9ea  handle active timeline migration in upgrade (codope, Nov 22, 2024)
32526f6  fix fsview, timeline utils tests (codope, Nov 22, 2024)
5cdf176  fix more tests (codope, Nov 22, 2024)
f8e6683  fix pending restore and meta sync tests (codope, Nov 22, 2024)
b4c1af8  fix a couple hive sync tests (codope, Nov 22, 2024)
cf31e85  Ensure history dir is created under timeline dir (Nov 22, 2024)
0fb08c9  Fix flink test (Nov 22, 2024)
abcb2e4  Fix integration test failure (Nov 22, 2024)
e056999  Fix bundle validation failure (Nov 22, 2024)
dba48a7  Fix more integration test failures (Nov 23, 2024)
8913d2e  Add debug for integration tests (Nov 23, 2024)
6ade2cb  Fix nullpointer exception (Nov 23, 2024)
ccc086c  make metaClient protected (codope, Nov 23, 2024)
docker/demo/get_min_commit_time_cow.sh (1 addition, 1 deletion)
@@ -16,5 +16,5 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-MIN_COMMIT_TIME=`hdfs dfs -ls -t /user/hive/warehouse/stock_ticks_cow/.hoodie/*.commit | head -1 | awk -F'/' ' { print $7 } ' | awk -F'.' ' { print $1 } '`
+MIN_COMMIT_TIME=`hdfs dfs -ls -t /user/hive/warehouse/stock_ticks_cow/.hoodie/timeline/*.commit | head -1 | awk -F'/' ' { print $7 } ' | awk -F'.' ' { print $1 } '`
 echo $MIN_COMMIT_TIME;
docker/demo/get_min_commit_time_mor.sh (1 addition, 1 deletion)
@@ -16,5 +16,5 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-MIN_COMMIT_TIME=`hdfs dfs -ls -t /user/hive/warehouse/stock_ticks_mor/.hoodie/*.deltacommit | head -1 | awk -F'/' ' { print $7 } ' | awk -F'.' ' { print $1 } '`
+MIN_COMMIT_TIME=`hdfs dfs -ls -t /user/hive/warehouse/stock_ticks_mor/.hoodie/timeline/*.deltacommit | head -1 | awk -F'/' ' { print $7 } ' | awk -F'.' ' { print $1 } '`
 echo $MIN_COMMIT_TIME;
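
Both demo scripts hardcode the .hoodie prefix, so they need the extra timeline/ path segment. In-process code can resolve the directory from the meta client instead of hardcoding it; a minimal sketch (the builder calls mirror the downgrade-handler diff further down; storageConf is an assumed, already-built storage configuration):

    // Resolve the active timeline directory instead of hardcoding ".hoodie".
    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
        .setConf(storageConf)
        .setBasePath("/user/hive/warehouse/stock_ticks_cow")
        .build();
    // With this PR this resolves to <base>/.hoodie/timeline rather than <base>/.hoodie.
    StoragePath timelineDir = metaClient.getActiveTimelinePath();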
@@ -184,7 +184,7 @@ public String showCommits(
     HoodieTableMetaClient metaClient = HoodieCLI.getTableMetaClient();
     StoragePath basePath = metaClient.getBasePath();
     StoragePath archivePath =
-        new StoragePath(metaClient.getArchivePath() + "/.commits_.archive*");
+        new StoragePath(metaClient.getArchivePath(), ".commits_.archive*");
     List<StoragePathInfo> pathInfoList =
         HoodieStorageUtils.getStorage(basePath, HoodieCLI.conf).globEntries(archivePath);
     List<Comparable[]> allCommits = new ArrayList<>();
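
A side note on the StoragePath change in this hunk: since getArchivePath() now returns a StoragePath (see the next hunk and the CLI test below), string concatenation would go through toString() implicitly, so the two-argument (parent, child) constructor keeps separator handling inside StoragePath:

    StoragePath archiveGlob = new StoragePath(metaClient.getArchivePath(), ".commits_.archive*");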
@@ -84,7 +84,7 @@ public String exportInstants(
       throws Exception {
 
     final StoragePath basePath = HoodieCLI.getTableMetaClient().getBasePath();
-    final StoragePath archivePath = new StoragePath(HoodieCLI.getTableMetaClient().getArchivePath());
+    final StoragePath archivePath = HoodieCLI.getTableMetaClient().getArchivePath();
     final Set<String> actionSet = new HashSet<String>(Arrays.asList(filter.split(",")));
     int numExports = limit == -1 ? Integer.MAX_VALUE : limit;
     int numCopied = 0;
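
This is the counterpart change: with getArchivePath() returning StoragePath directly, the wrapping new StoragePath(String) call is redundant and is dropped. The CLI test further down asserts assertEquals(new StoragePath(metaPath, "archive"), client.getArchivePath()), which also relies on StoragePath implementing value-based equals().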
@@ -200,12 +200,12 @@ public void removeCorruptedPendingCleanAction() {
         CleanerUtils.getCleanerPlan(client, instant);
       } catch (AvroRuntimeException e) {
         LOG.warn("Corruption found. Trying to remove corrupted clean instant file: " + instant);
-        TimelineUtils.deleteInstantFile(client.getStorage(), client.getMetaPath(),
+        TimelineUtils.deleteInstantFile(client.getStorage(), client.getActiveTimelinePath(),
             instant, client.getInstantFileNameGenerator());
       } catch (IOException ioe) {
         if (ioe.getMessage().contains("Not an Avro data file")) {
           LOG.warn("Corruption found. Trying to remove corrupted clean instant file: " + instant);
-          TimelineUtils.deleteInstantFile(client.getStorage(), client.getMetaPath(),
+          TimelineUtils.deleteInstantFile(client.getStorage(), client.getActiveTimelinePath(),
               instant, client.getInstantFileNameGenerator());
         } else {
           throw new HoodieIOException(ioe.getMessage(), ioe);
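
The same mechanical substitution, getMetaPath() to getActiveTimelinePath(), repeats through the CLI timeline commands and the compaction admin tool below: any code that joins the meta directory with an instant file name must now join against the timeline directory instead. The pattern, lifted from the hunks that follow:

    // Instant files now live under the timeline directory.
    StoragePath instantFile = new StoragePath(
        metaClient.getActiveTimelinePath(),
        metaClient.getInstantFileNameGenerator().getFileName(instant));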
@@ -86,13 +86,13 @@ public String showActive(
         HoodieTableMetaClient mtMetaClient = getMetadataTableMetaClient(metaClient);
         return printTimelineInfoWithMetadataTable(
             metaClient.getActiveTimeline(), mtMetaClient.getActiveTimeline(),
-            getInstantInfoFromTimeline(metaClient, metaClient.getStorage(), metaClient.getMetaPath()),
-            getInstantInfoFromTimeline(mtMetaClient, mtMetaClient.getStorage(), mtMetaClient.getMetaPath()),
+            getInstantInfoFromTimeline(metaClient, metaClient.getStorage(), metaClient.getActiveTimelinePath()),
+            getInstantInfoFromTimeline(mtMetaClient, mtMetaClient.getStorage(), mtMetaClient.getActiveTimelinePath()),
             limit, sortByField, descending, headerOnly, true, showTimeSeconds, showRollbackInfo);
       }
       return printTimelineInfo(
           metaClient.getActiveTimeline(),
-          getInstantInfoFromTimeline(metaClient, metaClient.getStorage(), metaClient.getMetaPath()),
+          getInstantInfoFromTimeline(metaClient, metaClient.getStorage(), metaClient.getActiveTimelinePath()),
           limit, sortByField, descending, headerOnly, true, showTimeSeconds, showRollbackInfo);
     } catch (IOException e) {
       e.printStackTrace();
@@ -115,7 +115,7 @@ public String showIncomplete(
     try {
       return printTimelineInfo(
           metaClient.getActiveTimeline().filterInflightsAndRequested(),
-          getInstantInfoFromTimeline(metaClient, metaClient.getStorage(), metaClient.getMetaPath()),
+          getInstantInfoFromTimeline(metaClient, metaClient.getStorage(), metaClient.getActiveTimelinePath()),
           limit, sortByField, descending, headerOnly, true, showTimeSeconds, showRollbackInfo);
     } catch (IOException e) {
       e.printStackTrace();
@@ -137,7 +137,7 @@ public String metadataShowActive(
     try {
       return printTimelineInfo(
           metaClient.getActiveTimeline(),
-          getInstantInfoFromTimeline(metaClient, metaClient.getStorage(), metaClient.getMetaPath()),
+          getInstantInfoFromTimeline(metaClient, metaClient.getStorage(), metaClient.getActiveTimelinePath()),
           limit, sortByField, descending, headerOnly, true, showTimeSeconds, false);
     } catch (IOException e) {
       e.printStackTrace();
@@ -159,7 +159,7 @@ public String metadataShowIncomplete(
     try {
       return printTimelineInfo(
           metaClient.getActiveTimeline().filterInflightsAndRequested(),
-          getInstantInfoFromTimeline(metaClient, metaClient.getStorage(), metaClient.getMetaPath()),
+          getInstantInfoFromTimeline(metaClient, metaClient.getStorage(), metaClient.getActiveTimelinePath()),
           limit, sortByField, descending, headerOnly, true, showTimeSeconds, false);
     } catch (IOException e) {
       e.printStackTrace();
@@ -126,7 +126,7 @@ public void cleanUp() throws IOException {
   @Test
   public void testAddPartitionMetaWithDryRun() throws IOException {
     // create commit instant
-    Files.createFile(Paths.get(tablePath, ".hoodie", "100.commit"));
+    Files.createFile(Paths.get(tablePath, ".hoodie/timeline/", "100.commit"));
 
     // create partition path
     String partition1 = Paths.get(tablePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).toString();
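
One caveat for tests that fabricate instants this way: Files.createFile does not create missing parent directories, so .hoodie/timeline must already exist when this line runs; presumably the test table bootstrap creates it. If not, a setup line like the following (standard java.nio, not part of this diff) would be needed:

    Files.createDirectories(Paths.get(tablePath, ".hoodie", "timeline"));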
@@ -161,7 +161,7 @@ public void testCreateWithSpecifiedValues() {
     assertTrue(ShellEvaluationResultUtil.isSuccess(result));
     assertEquals("Metadata for table " + tableName + " loaded", result.toString());
     HoodieTableMetaClient client = HoodieCLI.getTableMetaClient();
-    assertEquals(metaPath + StoragePath.SEPARATOR + "archive", client.getArchivePath());
+    assertEquals(new StoragePath(metaPath, "archive"), client.getArchivePath());
     assertEquals(tablePath, client.getBasePath().toString());
     assertEquals(metaPath, client.getMetaPath().toString());
     assertEquals(HoodieTableVersion.SIX, client.getTableConfig().getTableVersion());
@@ -113,7 +113,7 @@ public List<RenameOpResult> unscheduleCompactionPlan(String compactionInstant, b
     if (!dryRun) {
       // Overwrite compaction request with empty compaction operations
       HoodieInstant inflight = metaClient.createNewInstant(State.INFLIGHT, COMPACTION_ACTION, compactionInstant);
-      StoragePath inflightPath = new StoragePath(metaClient.getMetaPath(), metaClient.getInstantFileNameGenerator().getFileName(inflight));
+      StoragePath inflightPath = new StoragePath(metaClient.getActiveTimelinePath(), metaClient.getInstantFileNameGenerator().getFileName(inflight));
       if (metaClient.getStorage().exists(inflightPath)) {
         // We need to rollback data-files because of this inflight compaction before unscheduling
         throw new IllegalStateException("Please rollback the inflight compaction before unscheduling");
@@ -122,7 +122,7 @@ public List<RenameOpResult> unscheduleCompactionPlan(String compactionInstant, b
       // TODO: Add a rollback instant but for compaction
       HoodieInstant instant = metaClient.createNewInstant(State.REQUESTED, COMPACTION_ACTION, compactionInstant);
       boolean deleted = metaClient.getStorage().deleteFile(
-          new StoragePath(metaClient.getMetaPath(), instantFileNameGenerator.getFileName(instant)));
+          new StoragePath(metaClient.getActiveTimelinePath(), instantFileNameGenerator.getFileName(instant)));
       ValidationUtils.checkArgument(deleted, "Unable to delete compaction instant.");
     }
     return new ArrayList<>();
@@ -159,7 +159,7 @@ public List<RenameOpResult> unscheduleCompactionFileId(HoodieFileGroupId fgId, b
         HoodieCompactionPlan.newBuilder().setOperations(newOps).setExtraMetadata(plan.getExtraMetadata()).build();
     HoodieInstant inflight =
         metaClient.createNewInstant(State.INFLIGHT, COMPACTION_ACTION, compactionOperationWithInstant.getLeft());
-    StoragePath inflightPath = new StoragePath(metaClient.getMetaPath(), instantFileNameGenerator.getFileName(inflight));
+    StoragePath inflightPath = new StoragePath(metaClient.getActiveTimelinePath(), instantFileNameGenerator.getFileName(inflight));
     if (metaClient.getStorage().exists(inflightPath)) {
       // revert if in inflight state
       metaClient.getActiveTimeline().revertInstantFromInflightToRequested(inflight);
@@ -30,6 +30,7 @@
 import org.apache.hudi.common.table.timeline.InstantFileNameGenerator;
 import org.apache.hudi.common.table.timeline.versioning.v1.ActiveTimelineV1;
 import org.apache.hudi.common.table.timeline.versioning.v1.CommitMetadataSerDeV1;
+import org.apache.hudi.common.table.timeline.versioning.v2.ActiveTimelineV2;
 import org.apache.hudi.common.table.timeline.versioning.v2.CommitMetadataSerDeV2;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -47,16 +48,21 @@
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.stream.Collectors;
 
 import static org.apache.hudi.common.table.HoodieTableConfig.TABLE_METADATA_PARTITIONS;
 import static org.apache.hudi.common.table.timeline.HoodieInstant.UNDERSCORE;
+import static org.apache.hudi.common.table.timeline.TimelineLayout.TIMELINE_LAYOUT_V1;
+import static org.apache.hudi.common.table.timeline.TimelineLayout.TIMELINE_LAYOUT_V2;
 import static org.apache.hudi.metadata.MetadataPartitionType.BLOOM_FILTERS;
 import static org.apache.hudi.metadata.MetadataPartitionType.COLUMN_STATS;
 import static org.apache.hudi.metadata.MetadataPartitionType.FILES;
@@ -71,6 +77,28 @@ public class EightToSevenDowngradeHandler implements DowngradeHandler {
   private static final Logger LOG = LoggerFactory.getLogger(EightToSevenDowngradeHandler.class);
   private static final Set<String> SUPPORTED_METADATA_PARTITION_PATHS = getSupportedMetadataPartitionPaths();
 
+  /**
+   * Extracts the epoch time in milliseconds from an instant's completion time string.
+   * @param instant the HoodieInstant whose completion time is converted
+   * @return the completion time as epoch millis, or -1 if parsing fails
+   */
+  public static long convertCompletionTimeToEpoch(HoodieInstant instant) {
+    try {
+      String completionTime = instant.getCompletionTime();
+      // In Java 8 there is no direct API to convert this format to epoch millis,
+      // so strip off the millis and parse the seconds portion first.
+      String completionTimeInSecs = completionTime.substring(0, completionTime.length() - 3);
+      DateTimeFormatter inputFormatter = DateTimeFormatter.ofPattern("yyyyMMddHHmmss");
+      ZoneId zoneId = ZoneId.systemDefault();
+      LocalDateTime ldtInSecs = LocalDateTime.parse(completionTimeInSecs, inputFormatter);
+      long millis = Long.parseLong(completionTime.substring(completionTime.length() - 3));
+      return ldtInSecs.atZone(zoneId).toEpochSecond() * 1000 + millis;
+    } catch (Exception e) {
+      LOG.warn("Failed to parse completion time string for instant " + instant, e);
+      return -1;
+    }
+  }
+
   @Override
   public Map<ConfigProperty, String> downgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime, SupportsUpgradeDowngrade upgradeDowngradeHelper) {
     final HoodieTable table = upgradeDowngradeHelper.getTable(config, context);
@@ -79,33 +107,48 @@ public Map<ConfigProperty, String> downgrade(HoodieWriteConfig config, HoodieEng
     UpgradeDowngradeUtils.syncCompactionRequestedFileToAuxiliaryFolder(table);
 
     HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(context.getStorageConf().newInstance()).setBasePath(config.getBasePath()).build();
-    List<HoodieInstant> instants = metaClient.getActiveTimeline().getInstants();
+    List<HoodieInstant> instants = new ArrayList<>();
+    try {
+      // We need to move all the instants - not just completed ones.
+      instants = metaClient.scanHoodieInstantsFromFileSystem(metaClient.getActiveTimelinePath(),
+          ActiveTimelineV2.VALID_EXTENSIONS_IN_ACTIVE_TIMELINE, false);
+    } catch (IOException ioe) {
+      LOG.error("Failed to get instants from filesystem", ioe);
+      throw new HoodieIOException("Failed to get instants from filesystem", ioe);
+    }
 
     if (!instants.isEmpty()) {
       InstantFileNameGenerator instantFileNameGenerator = metaClient.getInstantFileNameGenerator();
       CommitMetadataSerDeV2 commitMetadataSerDeV2 = new CommitMetadataSerDeV2();
       CommitMetadataSerDeV1 commitMetadataSerDeV1 = new CommitMetadataSerDeV1();
       ActiveTimelineV1 activeTimelineV1 = new ActiveTimelineV1(metaClient);
+      String tmpFilePrefix = "temp_commit_file_for_eight_to_seven_downgrade_";
       context.map(instants, instant -> {
         String fileName = instantFileNameGenerator.getFileName(instant);
+        // Rename the metadata file name from the ${instant_time}_${completion_time}.action[.state] format in version 1.x to the ${instant_time}.action[.state] format in version 0.x.
+        StoragePath fromPath = new StoragePath(TIMELINE_LAYOUT_V2.getTimelinePathProvider().getActiveTimelinePath(
+            metaClient.getTableConfig(), metaClient.getBasePath()), fileName);
+        long modificationTime = instant.isCompleted() ? convertCompletionTimeToEpoch(instant) : -1;
+        StoragePath toPath = new StoragePath(TIMELINE_LAYOUT_V1.getTimelinePathProvider().getActiveTimelinePath(
+            metaClient.getTableConfig(), metaClient.getBasePath()), fileName.replaceAll(UNDERSCORE + "\\d+", ""));
+        boolean success = true;
         if (fileName.contains(UNDERSCORE)) {
           try {
-            // Rename the metadata file name from the ${instant_time}_${completion_time}.action[.state] format in version 1.x to the ${instant_time}.action[.state] format in version 0.x.
-            StoragePath fromPath = new StoragePath(metaClient.getMetaPath(), fileName);
-            StoragePath toPath = new StoragePath(metaClient.getMetaPath(), fileName.replaceAll(UNDERSCORE + "\\d+", ""));
-            boolean success = true;
             if (instant.getAction().equals(HoodieTimeline.COMMIT_ACTION) || instant.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION)) {
               HoodieCommitMetadata commitMetadata =
                   commitMetadataSerDeV2.deserialize(instant, metaClient.getActiveTimeline().getInstantDetails(instant).get(), HoodieCommitMetadata.class);
               Option<byte[]> data = commitMetadataSerDeV1.serialize(commitMetadata);
-              String toPathStr = toPath.toUri().toString();
-              activeTimelineV1.createFileInMetaPath(toPathStr, data, true);
+              // Create a temporary file to store the json metadata.
+              String tmpFileName = tmpFilePrefix + UUID.randomUUID() + ".json";
+              StoragePath tmpPath = new StoragePath(metaClient.getTempFolderPath(), tmpFileName);
+              String tmpPathStr = tmpPath.toUri().toString();
+              activeTimelineV1.createFileInMetaPath(tmpPathStr, data, true);
+              // Note: this is a two-step process. First we create the V1 commit file, then delete the old file.
+              // If it fails in the middle, rerunning the downgrade stays idempotent.
+              metaClient.getStorage().deleteFile(toPath); // First delete if it was created by a previous failed downgrade.
+              success = metaClient.getStorage().rename(tmpPath, toPath);
+              /*
+               * When we downgrade the table from 1.0 to 0.x, it is important to set the modification
+               * timestamp of the 0.x completed instant to match the completion time of the
+               * corresponding 1.x instant. Otherwise, log files in previous file slices could
+               * be wrongly attributed to the latest file slice for 1.0 readers
+               * (see HoodieFileGroup.getBaseInstantTime).
+               */
+              if (modificationTime > 0) {
+                metaClient.getStorage().setModificationTime(toPath, modificationTime);
+              }
               metaClient.getStorage().deleteFile(fromPath);
             } else {
               success = metaClient.getStorage().rename(fromPath, toPath);
@@ -119,6 +162,8 @@ public Map<ConfigProperty, String> downgrade(HoodieWriteConfig config, HoodieEng
             LOG.error("Can not to complete the downgrade from version eight to version seven. The reason for failure is {}", e.getMessage());
             throw new HoodieException(e);
           }
+        } else {
+          success = metaClient.getStorage().rename(fromPath, toPath);
         }
         return false;
       }, instants.size());
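
The completion-time conversion added in this file is easy to sanity-check in isolation. The snippet below replays the same parsing steps outside Hudi (illustrative class name and input value, not part of the PR):

    import java.time.LocalDateTime;
    import java.time.ZoneId;
    import java.time.format.DateTimeFormatter;

    public class CompletionTimeDemo {
      public static void main(String[] args) {
        // 1.x completion times are yyyyMMddHHmmss plus three digits of millis.
        String completionTime = "20241121093000123";
        // Parse the seconds portion, then add the stripped millis back.
        String secs = completionTime.substring(0, completionTime.length() - 3);
        long millis = Long.parseLong(completionTime.substring(completionTime.length() - 3));
        LocalDateTime ldt = LocalDateTime.parse(secs, DateTimeFormatter.ofPattern("yyyyMMddHHmmss"));
        long epochMillis = ldt.atZone(ZoneId.systemDefault()).toEpochSecond() * 1000 + millis;
        System.out.println(epochMillis); // 1732181400123 when the system zone is UTC
      }
    }

Worth flagging for review: the result depends on ZoneId.systemDefault(), so the modification timestamps written during downgrade line up exactly only if the downgrade runs in the same time zone in which the completion times were generated.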