From 880563ae2ad2702f6d6c56b60638270d05053a68 Mon Sep 17 00:00:00 2001 From: Radek Stankiewicz Date: Thu, 16 Jan 2025 21:30:20 +0100 Subject: [PATCH] Create public static method for getEncodedElementByteSize to avoid breaking change in Coder Improve ComposedAccumulatorCoder size calculation. --- .../main/java/org/apache/beam/sdk/coders/Coder.java | 3 +++ .../apache/beam/sdk/coders/LengthPrefixCoder.java | 10 +--------- .../org/apache/beam/sdk/transforms/CombineFns.java | 13 +++++++++++++ 3 files changed, 17 insertions(+), 9 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java index 08e25c6b77e7..ce2e329c7547 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java @@ -198,6 +198,9 @@ public static void verifyDeterministic(Coder target, String message, Iterable } } + public static long getEncodedElementByteSize(Coder target, T value) throws Exception { + return target.getEncodedElementByteSize(value); + } /** * Verifies all of the provided coders are deterministic. If any are not, throws a {@link * NonDeterministicException} for the {@code target} {@link Coder}. diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java index 84711e70f1cc..64e051f145dc 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java @@ -103,16 +103,8 @@ public boolean consistentWithEquals() { */ @Override protected long getEncodedElementByteSize(T value) throws Exception { - if (valueCoder instanceof StructuredCoder) { - // If valueCoder is a StructuredCoder then we can ask it directly for the encoded size of - // the value, adding the number of bytes to represent the length. - long valueSize = ((StructuredCoder) valueCoder).getEncodedElementByteSize(value); + long valueSize = valueCoder.getEncodedElementByteSize(value); return VarInt.getLength(valueSize) + valueSize; - } - - // If value is not a StructuredCoder then fall back to the default StructuredCoder behavior - // of encoding and counting the bytes. The encoding will include the length prefix. - return super.getEncodedElementByteSize(value); } /** diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java index 81a31d133509..b45cf33dec35 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java @@ -674,6 +674,19 @@ public void encode(Object[] value, OutputStream outStream, Coder.Context context coders.get(lastIndex).encode(value[lastIndex], outStream, context); } + @Override + public long getEncodedElementByteSize(Object[] value) throws Exception { + if (value.length == 0) { + return 0; + } + long size = 0; + for (int i = 0; i < value.length; ++i) { + Coder objectCoder = coders.get(i); + size += Coder.getEncodedElementByteSize(objectCoder, value); + } + return size; + } + @Override public Object[] decode(InputStream inStream) throws CoderException, IOException { return decode(inStream, Context.NESTED);