From 14a335ac4760495a15b7293632cc8c8dff5cad1b Mon Sep 17 00:00:00 2001 From: sychen Date: Mon, 22 Jan 2024 19:14:09 +0800 Subject: [PATCH 1/5] reduce NN RPC --- .../core/src/java/org/apache/orc/OrcConf.java | 5 +- .../java/org/apache/orc/impl/ReaderImpl.java | 17 +++- .../apache/orc/util/OrcInputStreamUtil.java | 79 +++++++++++++++++++ 3 files changed, 96 insertions(+), 5 deletions(-) create mode 100644 java/core/src/java/org/apache/orc/util/OrcInputStreamUtil.java diff --git a/java/core/src/java/org/apache/orc/OrcConf.java b/java/core/src/java/org/apache/orc/OrcConf.java index fa7bc9bf3f..02eb6ad711 100644 --- a/java/core/src/java/org/apache/orc/OrcConf.java +++ b/java/core/src/java/org/apache/orc/OrcConf.java @@ -251,7 +251,10 @@ public enum OrcConf { ROW_BATCH_CHILD_LIMIT("orc.row.child.limit", "orc.row.child.limit", 1024 * 32, "The maximum number of child elements to buffer before "+ "the ORC row writer writes the batch to the file." - ) + ), + FILE_LENGTH_FAST("orc.file.length.fast", "orc.file.length.fast", + false, "A boolean flag to enable reduce file length RPC. 
" + ) ; private final String attribute; diff --git a/java/core/src/java/org/apache/orc/impl/ReaderImpl.java b/java/core/src/java/org/apache/orc/impl/ReaderImpl.java index 3afbff5fc3..9b581ba39c 100644 --- a/java/core/src/java/org/apache/orc/impl/ReaderImpl.java +++ b/java/core/src/java/org/apache/orc/impl/ReaderImpl.java @@ -48,6 +48,7 @@ import org.apache.orc.TypeDescription; import org.apache.orc.impl.reader.ReaderEncryption; import org.apache.orc.impl.reader.ReaderEncryptionVariant; +import org.apache.orc.util.OrcInputStreamUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -784,12 +785,20 @@ protected OrcTail extractFileTail(FileSystem fs, Path path, // figure out the size of the file using the option or filesystem long size; if (maxFileLength == Long.MAX_VALUE) { - FileStatus fileStatus = fs.getFileStatus(path); - size = fileStatus.getLen(); - modificationTime = fileStatus.getModificationTime(); + boolean fileLengthFast = conf.getBoolean(OrcConf.FILE_LENGTH_FAST.getAttribute(), + (boolean)OrcConf.FILE_LENGTH_FAST.getDefaultValue()); + long fileLength = fileLengthFast ? OrcInputStreamUtil.getFileLength(file) : -1L; + if (fileLength > 0) { + size = fileLength; + modificationTime = -1L; + } else { + FileStatus fileStatus = fs.getFileStatus(path); + size = fileStatus.getLen(); + modificationTime = fileStatus.getModificationTime(); + } } else { size = maxFileLength; - modificationTime = -1; + modificationTime = -1L; } if (size == 0) { // Hive often creates empty files (including ORC) and has an diff --git a/java/core/src/java/org/apache/orc/util/OrcInputStreamUtil.java b/java/core/src/java/org/apache/orc/util/OrcInputStreamUtil.java new file mode 100644 index 0000000000..ac9ef31185 --- /dev/null +++ b/java/core/src/java/org/apache/orc/util/OrcInputStreamUtil.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.orc.util; + +import org.apache.hadoop.fs.FSDataInputStream; + +import java.io.InputStream; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; + +public class OrcInputStreamUtil { + + private static final String DFS_CLASS = "org.apache.hadoop.hdfs.DFSInputStream"; + private static final String DFS_STRIPED_CLASS = "org.apache.hadoop.hdfs.DFSStripedInputStream"; + + private static Method shortCircuitForbiddenMethod; + private static Method getFileLengthMethod; + + static { + init(); + } + + private static void init() { + try { + initInt(); + } catch (ClassNotFoundException | NoSuchMethodException ignored) { + } + } + + private static void initInt() throws ClassNotFoundException, NoSuchMethodException { + Class dfsClass = Class.forName(DFS_CLASS); + shortCircuitForbiddenMethod = dfsClass.getDeclaredMethod("shortCircuitForbidden"); + shortCircuitForbiddenMethod.setAccessible(true); + getFileLengthMethod = dfsClass.getMethod("getFileLength"); + } + + public static long getFileLength(FSDataInputStream file) { + try { + return getFileLengthInt(file); + } catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) { + return -1L; + } + } + + private static long getFileLengthInt(FSDataInputStream file) 
+ throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { + if (shortCircuitForbiddenMethod == null || getFileLengthMethod == null) { + return -1L; + } + InputStream wrappedStream = file.getWrappedStream(); + Class wrappedStreamClass = wrappedStream.getClass(); + String className = wrappedStreamClass.getName(); + if (!className.equals(DFS_CLASS) && !className.equals(DFS_STRIPED_CLASS)) { + return -1L; + } + boolean isUnderConstruction = (boolean) shortCircuitForbiddenMethod.invoke(wrappedStream); + // If file are under construction, we need to get the file length from NameNode. + if (!isUnderConstruction) { + return (long) getFileLengthMethod.invoke(wrappedStream); + } + return -1L; + } +} From 4e659d873592bdd3d39b1e6bb9bee8fc7f69b0ad Mon Sep 17 00:00:00 2001 From: sychen Date: Mon, 22 Jan 2024 20:27:51 +0800 Subject: [PATCH 2/5] set true to test --- java/core/src/java/org/apache/orc/OrcConf.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/java/core/src/java/org/apache/orc/OrcConf.java b/java/core/src/java/org/apache/orc/OrcConf.java index 02eb6ad711..56bc8245be 100644 --- a/java/core/src/java/org/apache/orc/OrcConf.java +++ b/java/core/src/java/org/apache/orc/OrcConf.java @@ -253,7 +253,7 @@ public enum OrcConf { "the ORC row writer writes the batch to the file." ), FILE_LENGTH_FAST("orc.file.length.fast", "orc.file.length.fast", - false, "A boolean flag to enable reduce file length RPC. " + true, "A boolean flag to enable reduce file length RPC. " ) ; From 2146d8e0953506a724b6f3a52818f79567f6d9b1 Mon Sep 17 00:00:00 2001 From: sychen Date: Mon, 22 Jan 2024 22:06:35 +0800 Subject: [PATCH 3/5] Revert "set true to test" This reverts commit 4e659d873592bdd3d39b1e6bb9bee8fc7f69b0ad. 
--- java/core/src/java/org/apache/orc/OrcConf.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/java/core/src/java/org/apache/orc/OrcConf.java b/java/core/src/java/org/apache/orc/OrcConf.java index 56bc8245be..02eb6ad711 100644 --- a/java/core/src/java/org/apache/orc/OrcConf.java +++ b/java/core/src/java/org/apache/orc/OrcConf.java @@ -253,7 +253,7 @@ public enum OrcConf { "the ORC row writer writes the batch to the file." ), FILE_LENGTH_FAST("orc.file.length.fast", "orc.file.length.fast", - true, "A boolean flag to enable reduce file length RPC. " + false, "A boolean flag to enable reduce file length RPC. " ) ; From a73e1e5c3e73c5233ae310f6615cea123808cb58 Mon Sep 17 00:00:00 2001 From: sychen Date: Wed, 24 Jan 2024 14:56:05 +0800 Subject: [PATCH 4/5] getFileLength --- .../apache/orc/util/OrcInputStreamUtil.java | 22 ++++++++----------- 1 file changed, 9 insertions(+), 13 deletions(-) diff --git a/java/core/src/java/org/apache/orc/util/OrcInputStreamUtil.java b/java/core/src/java/org/apache/orc/util/OrcInputStreamUtil.java index ac9ef31185..9760b1fd35 100644 --- a/java/core/src/java/org/apache/orc/util/OrcInputStreamUtil.java +++ b/java/core/src/java/org/apache/orc/util/OrcInputStreamUtil.java @@ -19,6 +19,7 @@ package org.apache.orc.util; import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.hdfs.DFSInputStream; import java.io.InputStream; import java.lang.reflect.InvocationTargetException; @@ -27,10 +28,8 @@ public class OrcInputStreamUtil { private static final String DFS_CLASS = "org.apache.hadoop.hdfs.DFSInputStream"; - private static final String DFS_STRIPED_CLASS = "org.apache.hadoop.hdfs.DFSStripedInputStream"; private static Method shortCircuitForbiddenMethod; - private static Method getFileLengthMethod; static { init(); @@ -45,9 +44,9 @@ private static void init() { private static void initInt() throws ClassNotFoundException, NoSuchMethodException { Class dfsClass = Class.forName(DFS_CLASS); + // 
org.apache.hadoop.hdfs.DFSInputStream.shortCircuitForbidden Method is not public shortCircuitForbiddenMethod = dfsClass.getDeclaredMethod("shortCircuitForbidden"); shortCircuitForbiddenMethod.setAccessible(true); - getFileLengthMethod = dfsClass.getMethod("getFileLength"); } public static long getFileLength(FSDataInputStream file) { @@ -60,19 +59,16 @@ public static long getFileLength(FSDataInputStream file) { private static long getFileLengthInt(FSDataInputStream file) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { - if (shortCircuitForbiddenMethod == null || getFileLengthMethod == null) { + if (shortCircuitForbiddenMethod == null) { return -1L; } InputStream wrappedStream = file.getWrappedStream(); - Class wrappedStreamClass = wrappedStream.getClass(); - String className = wrappedStreamClass.getName(); - if (!className.equals(DFS_CLASS) && !className.equals(DFS_STRIPED_CLASS)) { - return -1L; - } - boolean isUnderConstruction = (boolean) shortCircuitForbiddenMethod.invoke(wrappedStream); - // If file are under construction, we need to get the file length from NameNode. - if (!isUnderConstruction) { - return (long) getFileLengthMethod.invoke(wrappedStream); + if (wrappedStream instanceof DFSInputStream dfsInputStream) { + boolean isUnderConstruction = (boolean) shortCircuitForbiddenMethod.invoke(wrappedStream); + // If the file is under construction, we need to get the file length from the NameNode. 
+ if (!isUnderConstruction) { + return dfsInputStream.getFileLength(); + } } return -1L; } From 61d1f2f69d10b679e346cfa961875eb9ba8acda2 Mon Sep 17 00:00:00 2001 From: sychen Date: Wed, 24 Jan 2024 15:00:09 +0800 Subject: [PATCH 5/5] remove unused DFS_CLASS constant and Class.forName lookup --- .../src/java/org/apache/orc/util/OrcInputStreamUtil.java | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/java/core/src/java/org/apache/orc/util/OrcInputStreamUtil.java b/java/core/src/java/org/apache/orc/util/OrcInputStreamUtil.java index 9760b1fd35..322ef4e875 100644 --- a/java/core/src/java/org/apache/orc/util/OrcInputStreamUtil.java +++ b/java/core/src/java/org/apache/orc/util/OrcInputStreamUtil.java @@ -27,8 +27,6 @@ public class OrcInputStreamUtil { - private static final String DFS_CLASS = "org.apache.hadoop.hdfs.DFSInputStream"; - private static Method shortCircuitForbiddenMethod; static { @@ -43,9 +41,8 @@ private static void init() { } private static void initInt() throws ClassNotFoundException, NoSuchMethodException { - Class dfsClass = Class.forName(DFS_CLASS); // org.apache.hadoop.hdfs.DFSInputStream.shortCircuitForbidden Method is not public - shortCircuitForbiddenMethod = dfsClass.getDeclaredMethod("shortCircuitForbidden"); + shortCircuitForbiddenMethod = DFSInputStream.class.getDeclaredMethod("shortCircuitForbidden"); shortCircuitForbiddenMethod.setAccessible(true); }