GH-2959: Optimize the test case of parquet rewriter. #2960

Merged 3 commits on Jul 23, 2024
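This change builds the two test input files that most cases share, a GZIP-compressed file and an UNCOMPRESSED one (both without a bloom filter column), once in the ParquetRewriterTest constructor; each test then registers the shared file(s) it needs instead of regenerating them through per-test setup helpers. Tests that exercise bloom filters still build a dedicated input file, and the setup helpers drop their now-redundant compression and row-group-size parameters.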
@@ -107,10 +107,13 @@ public class ParquetRewriterTest {
private final IndexCache.CacheStrategy indexCacheStrategy;
private final boolean usingHadoop;

private List<EncryptionTestFile> inputFiles = null;
private List<EncryptionTestFile> inputFiles = Lists.newArrayList();
private String outputFile = null;
private ParquetRewriter rewriter = null;

private final EncryptionTestFile gzipEncryptionTestFileWithoutBloomFilterColumn;
private final EncryptionTestFile uncompressedEncryptionTestFileWithoutBloomFilterColumn;

@Parameterized.Parameters(name = "WriterVersion = {0}, IndexCacheStrategy = {1}, UsingHadoop = {2}")
public static Object[][] parameters() {
return new Object[][] {
@@ -121,10 +124,26 @@ public static Object[][] parameters() {
};
}

public ParquetRewriterTest(String writerVersion, String indexCacheStrategy, boolean usingHadoop) {
public ParquetRewriterTest(String writerVersion, String indexCacheStrategy, boolean usingHadoop)
throws IOException {
this.writerVersion = ParquetProperties.WriterVersion.fromString(writerVersion);
this.indexCacheStrategy = IndexCache.CacheStrategy.valueOf(indexCacheStrategy);
this.usingHadoop = usingHadoop;

MessageType testSchema = createSchema();
this.gzipEncryptionTestFileWithoutBloomFilterColumn = new TestFileBuilder(conf, testSchema)
.withNumRecord(numRecord)
.withCodec("GZIP")
.withPageSize(1024)
.withWriterVersion(this.writerVersion)
.build();

this.uncompressedEncryptionTestFileWithoutBloomFilterColumn = new TestFileBuilder(conf, testSchema)
.withNumRecord(numRecord)
.withCodec("UNCOMPRESSED")
.withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
.withWriterVersion(this.writerVersion)
.build();
}

private void testPruneSingleColumnTranslateCodec(List<Path> inputPaths) throws Exception {
@@ -141,7 +160,7 @@ private void testPruneSingleColumnTranslateCodec(List<Path> inputPaths) throws E
rewriter.processBlocks();
rewriter.close();

// Verify the schema are not changed for the columns not pruned
// Verify the schema is not changed for the columns not pruned
validateSchema();

// Verify codec has been translated
@@ -179,7 +198,9 @@ public void setUp() {

@Test
public void testPruneSingleColumnTranslateCodecSingleFile() throws Exception {
testSingleInputFileSetup("GZIP");
if (!inputFiles.contains(gzipEncryptionTestFileWithoutBloomFilterColumn)) {
inputFiles.add(this.gzipEncryptionTestFileWithoutBloomFilterColumn);
}
List<Path> inputPaths = new ArrayList<Path>() {
{
add(new Path(inputFiles.get(0).getFileName()));
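Each test now opens by guarding against registering the same shared file twice before adding it to inputFiles. A minimal sketch of how the repeated guard could be factored into a helper method on the test class (hypothetical; the PR as merged keeps the inline form):

    private void addInputFileOnce(EncryptionTestFile file) {
        // Register a shared test file only if it has not been added yet.
        if (!inputFiles.contains(file)) {
            inputFiles.add(file);
        }
    }

A test would then call addInputFileOnce(gzipEncryptionTestFileWithoutBloomFilterColumn) in place of the three-line guard.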
@@ -190,7 +211,12 @@ public void testPruneSingleColumnTranslateCodecSingleFile() throws Exception {

@Test
public void testPruneSingleColumnTranslateCodecTwoFiles() throws Exception {
testMultipleInputFilesSetup();
if (!inputFiles.contains(gzipEncryptionTestFileWithoutBloomFilterColumn)) {
inputFiles.add(gzipEncryptionTestFileWithoutBloomFilterColumn);
}
if (!inputFiles.contains(uncompressedEncryptionTestFileWithoutBloomFilterColumn)) {
inputFiles.add(uncompressedEncryptionTestFileWithoutBloomFilterColumn);
}
List<Path> inputPaths = new ArrayList<Path>() {
{
add(new Path(inputFiles.get(0).getFileName()));
@@ -249,7 +275,9 @@ private void testPruneNullifyTranslateCodec(List<Path> inputPaths) throws Except

@Test
public void testPruneNullifyTranslateCodecSingleFile() throws Exception {
testSingleInputFileSetup("GZIP");
if (!inputFiles.contains(gzipEncryptionTestFileWithoutBloomFilterColumn)) {
inputFiles.add(this.gzipEncryptionTestFileWithoutBloomFilterColumn);
}
List<Path> inputPaths = new ArrayList<Path>() {
{
add(new Path(inputFiles.get(0).getFileName()));
@@ -260,7 +288,12 @@ public void testPruneNullifyTranslateCodecSingleFile() throws Exception {

@Test
public void testPruneNullifyTranslateCodecTwoFiles() throws Exception {
testMultipleInputFilesSetup();
if (!inputFiles.contains(gzipEncryptionTestFileWithoutBloomFilterColumn)) {
inputFiles.add(gzipEncryptionTestFileWithoutBloomFilterColumn);
}
if (!inputFiles.contains(uncompressedEncryptionTestFileWithoutBloomFilterColumn)) {
inputFiles.add(uncompressedEncryptionTestFileWithoutBloomFilterColumn);
}
List<Path> inputPaths = new ArrayList<Path>() {
{
add(new Path(inputFiles.get(0).getFileName()));
@@ -331,7 +364,9 @@ private void testPruneEncryptTranslateCodec(List<Path> inputPaths) throws Except

@Test
public void testPruneEncryptTranslateCodecSingleFile() throws Exception {
testSingleInputFileSetup("GZIP");
if (!inputFiles.contains(gzipEncryptionTestFileWithoutBloomFilterColumn)) {
inputFiles.add(gzipEncryptionTestFileWithoutBloomFilterColumn);
}
List<Path> inputPaths = new ArrayList<Path>() {
{
add(new Path(inputFiles.get(0).getFileName()));
@@ -342,7 +377,12 @@ public void testPruneEncryptTranslateCodecSingleFile() throws Exception {

@Test
public void testPruneEncryptTranslateCodecTwoFiles() throws Exception {
testMultipleInputFilesSetup();
if (!inputFiles.contains(gzipEncryptionTestFileWithoutBloomFilterColumn)) {
inputFiles.add(gzipEncryptionTestFileWithoutBloomFilterColumn);
}
if (!inputFiles.contains(uncompressedEncryptionTestFileWithoutBloomFilterColumn)) {
inputFiles.add(uncompressedEncryptionTestFileWithoutBloomFilterColumn);
}
List<Path> inputPaths = new ArrayList<Path>() {
{
add(new Path(inputFiles.get(0).getFileName()));
@@ -497,7 +537,9 @@ private void testNullifyAndEncryptColumn(List<Path> inputPaths) throws Exception

@Test
public void testNullifyEncryptSingleFile() throws Exception {
testSingleInputFileSetup("GZIP");
if (!inputFiles.contains(gzipEncryptionTestFileWithoutBloomFilterColumn)) {
inputFiles.add(gzipEncryptionTestFileWithoutBloomFilterColumn);
}
List<Path> inputPaths = new ArrayList<Path>() {
{
add(new Path(inputFiles.get(0).getFileName()));
@@ -508,7 +550,12 @@ public void testNullifyEncryptSingleFile() throws Exception {

@Test
public void testNullifyEncryptTwoFiles() throws Exception {
testMultipleInputFilesSetup();
if (!inputFiles.contains(gzipEncryptionTestFileWithoutBloomFilterColumn)) {
inputFiles.add(gzipEncryptionTestFileWithoutBloomFilterColumn);
}
if (!inputFiles.contains(uncompressedEncryptionTestFileWithoutBloomFilterColumn)) {
inputFiles.add(uncompressedEncryptionTestFileWithoutBloomFilterColumn);
}
List<Path> inputPaths = new ArrayList<Path>() {
{
add(new Path(inputFiles.get(0).getFileName()));
@@ -520,7 +567,12 @@ public void testNullifyEncryptTwoFiles() throws Exception {

@Test
public void testMergeTwoFilesOnly() throws Exception {
testMultipleInputFilesSetup();
if (!inputFiles.contains(gzipEncryptionTestFileWithoutBloomFilterColumn)) {
inputFiles.add(gzipEncryptionTestFileWithoutBloomFilterColumn);
}
if (!inputFiles.contains(uncompressedEncryptionTestFileWithoutBloomFilterColumn)) {
inputFiles.add(uncompressedEncryptionTestFileWithoutBloomFilterColumn);
}

// Only merge two files but do not change anything.
List<Path> inputPaths = new ArrayList<>();
@@ -534,7 +586,7 @@ public void testMergeTwoFilesOnly() throws Exception {
rewriter.processBlocks();
rewriter.close();

// Verify the schema are not changed
// Verify the schema is not changed
ParquetMetadata pmd =
ParquetFileReader.readFooter(conf, new Path(outputFile), ParquetMetadataConverter.NO_FILTER);
MessageType schema = pmd.getFileMetaData().getSchema();
@@ -615,7 +667,9 @@ public void testMergeTwoFilesWithDifferentSchema() throws Exception {

@Test
public void testRewriteFileWithMultipleBlocks() throws Exception {
testSingleInputFileSetup("GZIP", 1024L);
if (!inputFiles.contains(gzipEncryptionTestFileWithoutBloomFilterColumn)) {
inputFiles.add(this.gzipEncryptionTestFileWithoutBloomFilterColumn);
}
List<Path> inputPaths = new ArrayList<Path>() {
{
add(new Path(inputFiles.get(0).getFileName()));
@@ -626,7 +680,7 @@ public void testRewriteFileWithMultipleBlocks() throws Exception {

@Test
public void testPruneSingleColumnTranslateCodecAndEnableBloomFilter() throws Exception {
testSingleInputFileSetupWithBloomFilter("GZIP", "DocId");
testSingleInputFileSetupWithBloomFilter("DocId");
List<Path> inputPaths = new ArrayList<Path>() {
{
add(new Path(inputFiles.get(0).getFileName()));
@@ -635,14 +689,14 @@ public void testPruneSingleColumnTranslateCodecAndEnableBloomFilter() throws Exc
testPruneSingleColumnTranslateCodec(inputPaths);

// Verify bloom filters
Map<ColumnPath, List<BloomFilter>> inputBloomFilters = allInputBloomFilters(null);
Map<ColumnPath, List<BloomFilter>> inputBloomFilters = allInputBloomFilters();
Map<ColumnPath, List<BloomFilter>> outputBloomFilters = allOutputBloomFilters(null);
assertEquals(inputBloomFilters, outputBloomFilters);
}

@Test
public void testPruneNullifyTranslateCodecAndEnableBloomFilter() throws Exception {
testSingleInputFileSetupWithBloomFilter("GZIP", "DocId", "Links.Forward");
testSingleInputFileSetupWithBloomFilter("DocId", "Links.Forward");
List<Path> inputPaths = new ArrayList<Path>() {
{
add(new Path(inputFiles.get(0).getFileName()));
@@ -651,7 +705,7 @@ public void testPruneNullifyTranslateCodecAndEnableBloomFilter() throws Exceptio
testPruneNullifyTranslateCodec(inputPaths);

// Verify bloom filters
Map<ColumnPath, List<BloomFilter>> inputBloomFilters = allInputBloomFilters(null);
Map<ColumnPath, List<BloomFilter>> inputBloomFilters = allInputBloomFilters();
assertEquals(inputBloomFilters.size(), 2);
assertTrue(inputBloomFilters.containsKey(ColumnPath.fromDotString("Links.Forward")));
assertTrue(inputBloomFilters.containsKey(ColumnPath.fromDotString("DocId")));
@@ -666,7 +720,7 @@ public void testPruneEncryptTranslateCodecAndEnableBloomFilter() throws Exceptio

@Test
public void testPruneEncryptTranslateCodecAndEnableBloomFilter() throws Exception {
testSingleInputFileSetupWithBloomFilter("GZIP", "DocId", "Links.Forward");
testSingleInputFileSetupWithBloomFilter("DocId", "Links.Forward");
List<Path> inputPaths = new ArrayList<Path>() {
{
add(new Path(inputFiles.get(0).getFileName()));
Expand All @@ -675,7 +729,7 @@ public void testPruneEncryptTranslateCodecAndEnableBloomFilter() throws Exceptio
testPruneEncryptTranslateCodec(inputPaths);

// Verify bloom filters
Map<ColumnPath, List<BloomFilter>> inputBloomFilters = allInputBloomFilters(null);
Map<ColumnPath, List<BloomFilter>> inputBloomFilters = allInputBloomFilters();

// Cannot read without FileDecryptionProperties
assertThrows(ParquetCryptoRuntimeException.class, () -> allOutputBloomFilters(null));
@@ -685,42 +739,19 @@ public void testPruneEncryptTranslateCodecAndEnableBloomFilter() throws Exceptio
assertEquals(inputBloomFilters, outputBloomFilters);
}

private void testSingleInputFileSetup(String compression) throws IOException {
testSingleInputFileSetup(compression, ParquetWriter.DEFAULT_BLOCK_SIZE);
}

private void testSingleInputFileSetupWithBloomFilter(String compression, String... bloomFilterEnabledColumns)
throws IOException {
testSingleInputFileSetup(compression, ParquetWriter.DEFAULT_BLOCK_SIZE, bloomFilterEnabledColumns);
}

private void testSingleInputFileSetup(String compression, long rowGroupSize, String... bloomFilterEnabledColumns)
throws IOException {
MessageType schema = createSchema();
inputFiles = Lists.newArrayList();
inputFiles.add(new TestFileBuilder(conf, schema)
.withNumRecord(numRecord)
.withCodec(compression)
.withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
.withRowGroupSize(rowGroupSize)
.withBloomFilterEnabled(bloomFilterEnabledColumns)
.withWriterVersion(writerVersion)
.build());
private void testSingleInputFileSetupWithBloomFilter(String... bloomFilterEnabledColumns) throws IOException {
testSingleInputFileSetup(bloomFilterEnabledColumns);
}

private void testMultipleInputFilesSetup() throws IOException {
private void testSingleInputFileSetup(String... bloomFilterEnabledColumns) throws IOException {
MessageType schema = createSchema();
inputFiles = Lists.newArrayList();
inputFiles.add(new TestFileBuilder(conf, schema)
.withNumRecord(numRecord)
.withCodec("GZIP")
.withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
.withWriterVersion(writerVersion)
.build());
inputFiles.add(new TestFileBuilder(conf, schema)
.withNumRecord(numRecord)
.withCodec("UNCOMPRESSED")
.withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
.withRowGroupSize(ParquetWriter.DEFAULT_BLOCK_SIZE)
.withBloomFilterEnabled(bloomFilterEnabledColumns)
.withWriterVersion(writerVersion)
.build());
}
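For reference, a minimal sketch of what the consolidated testSingleInputFileSetup produces, using only calls visible in this diff (conf, createSchema(), numRecord, and writerVersion are members of the test class; the "DocId" bloom filter column is illustrative):

    MessageType schema = createSchema();
    inputFiles.add(new TestFileBuilder(conf, schema)
            .withNumRecord(numRecord)                           // rows to generate
            .withCodec("GZIP")                                  // compression codec name
            .withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)  // data page size
            .withRowGroupSize(ParquetWriter.DEFAULT_BLOCK_SIZE) // row group size
            .withBloomFilterEnabled("DocId")                    // varargs of column dot paths
            .withWriterVersion(writerVersion)                   // PARQUET_1_0 or PARQUET_2_0
            .build());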
@@ -748,7 +779,7 @@ private void validateColumnData(
.withDecryption(fileDecryptionProperties)
.build();

// Get total number of rows from input files
// Get the total number of rows from input files
int totalRows = 0;
for (EncryptionTestFile inputFile : inputFiles) {
totalRows += inputFile.getFileContent().length;
@@ -821,7 +852,7 @@ private ParquetMetadata getFileMetaData(String file, FileDecryptionProperties fi
ParquetReadOptions readOptions = ParquetReadOptions.builder()
.withDecryption(fileDecryptionProperties)
.build();
ParquetMetadata pmd = null;
ParquetMetadata pmd;
InputFile inputFile = HadoopInputFile.fromPath(new Path(file), conf);
try (SeekableInputStream in = inputFile.newStream()) {
pmd = ParquetFileReader.readFooter(inputFile, readOptions, in);
@@ -995,12 +1026,10 @@ private void validateRowGroupRowCount() throws Exception {
assertEquals(inputRowCounts, outputRowCounts);
}

private Map<ColumnPath, List<BloomFilter>> allInputBloomFilters(FileDecryptionProperties fileDecryptionProperties)
throws Exception {
private Map<ColumnPath, List<BloomFilter>> allInputBloomFilters() throws Exception {
Map<ColumnPath, List<BloomFilter>> inputBloomFilters = new HashMap<>();
for (EncryptionTestFile inputFile : inputFiles) {
Map<ColumnPath, List<BloomFilter>> bloomFilters =
allBloomFilters(inputFile.getFileName(), fileDecryptionProperties);
Map<ColumnPath, List<BloomFilter>> bloomFilters = allBloomFilters(inputFile.getFileName(), null);
for (Map.Entry<ColumnPath, List<BloomFilter>> entry : bloomFilters.entrySet()) {
List<BloomFilter> bloomFilterList = inputBloomFilters.getOrDefault(entry.getKey(), new ArrayList<>());
bloomFilterList.addAll(entry.getValue());
@@ -41,8 +41,10 @@
import org.apache.parquet.schema.Type;

public class TestFileBuilder {
private MessageType schema;
private Configuration conf;
private static final char[] chars = {'a', 'b', 'c', 'd', 'e', 'f', 'g', 'x', 'z', 'y'};
private static final ThreadLocalRandom random = ThreadLocalRandom.current();
private final MessageType schema;
private final Configuration conf;
private Map<String, String> extraMeta = new HashMap<>();
private int numRecord = 100000;
private ParquetProperties.WriterVersion writerVersion = ParquetProperties.WriterVersion.PARQUET_1_0;
@@ -164,7 +166,7 @@ private void addValueToSimpleGroup(Group g, Type type) {

private void addPrimitiveValueToSimpleGroup(Group g, PrimitiveType primitiveType) {
if (primitiveType.isRepetition(Type.Repetition.REPEATED)) {
int listSize = ThreadLocalRandom.current().nextInt(1, 10);
int listSize = random.nextInt(1, 10);
for (int i = 0; i < listSize; i++) {
addSinglePrimitiveValueToSimpleGroup(g, primitiveType);
}
@@ -191,39 +193,39 @@ private void addSinglePrimitiveValueToSimpleGroup(Group g, PrimitiveType primiti
}

private static long getInt() {
return ThreadLocalRandom.current().nextInt(10000);
return random.nextInt(10000);
}

private static long getLong() {
return ThreadLocalRandom.current().nextLong(100000);
return random.nextLong(100000);
}

private static String getString() {
char[] chars = {'a', 'b', 'c', 'd', 'e', 'f', 'g', 'x', 'z', 'y'};

StringBuilder sb = new StringBuilder();
for (int i = 0; i < 100; i++) {
sb.append(chars[ThreadLocalRandom.current().nextInt(10)]);
for (int i = 0; i < random.nextInt(100); i++) {
sb.append(chars[random.nextInt(10)]);
}
return sb.toString();
}

private static float getFloat() {
if (ThreadLocalRandom.current().nextBoolean()) {
if (random.nextBoolean()) {
return Float.NaN;
}
return ThreadLocalRandom.current().nextFloat();
return random.nextFloat();
}

private static double getDouble() {
if (ThreadLocalRandom.current().nextBoolean()) {
if (random.nextBoolean()) {
return Double.NaN;
}
return ThreadLocalRandom.current().nextDouble();
return random.nextDouble();
}

public static String createTempFile(String prefix) {
try {
return Files.createTempDirectory(prefix).toAbsolutePath().toString() + "/test.parquet";
return Files.createTempDirectory(prefix).toAbsolutePath() + "/test.parquet";
} catch (IOException e) {
throw new AssertionError("Unable to create temporary file", e);
}
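Two details in the TestFileBuilder changes are worth noting. ThreadLocalRandom.current() is cached in a static field; ThreadLocalRandom instances are per-thread, so this is fine for these single-threaded tests but would be unsafe if the builder were driven from multiple threads. Separately, the new loop in getString re-evaluates random.nextInt(100) in the loop condition on every iteration, so the string length is not a single uniform draw. A hypothetical variant with the bound hoisted into a local, using the same class fields:

    private static String getString() {
        StringBuilder sb = new StringBuilder();
        int length = random.nextInt(100); // one draw: 0 to 99 inclusive
        for (int i = 0; i < length; i++) {
            sb.append(chars[random.nextInt(chars.length)]);
        }
        return sb.toString();
    }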