Hive performance regression between 419 and 463 #24099

Open
benrifkind opened this issue Nov 11, 2024 · 27 comments

Comments

@benrifkind

I am trying to understand a performance degradation that appeared after upgrading from Trino 419 to Trino 463. Querying Hive tables with zstd-compressed data in S3 seems to run significantly slower on Trino 463 than on Trino 419.

I have a symlink Hive table built on top of zstd-compressed data in S3. Querying this table is relatively fast on Trino 419, but when I tried to upgrade to the most recent version of Trino I saw a significant drop in execution speed and a spike in CPU.

It is a simple group by query like

select date, count(*)
from <table_name>
where date in ('2024-11-07')
  and column1 = 'foo'
  and column2 = 'bar'
group by 1
order by 1

These are the query stats in Trino 419
[screenshots of the query stats]

And these are the query stats in Trino 463
[screenshots of the query stats]

The same query now takes twice as long, and CPU usage is much higher.

I tried to pinpoint where in the upgrade path this performance degradation occurred, but when I tried running on Trino 430, for example, I got errors like

io.trino.spi.TrinoException: Unsupported input format: serde=org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

I am running this self-hosted on AWS EC2 instances. The coordinator is an r7g.4xlarge instance and there are 5 workers of type r7g.8xlarge.

I don't think the fact that this is a symlink table has anything to do with the performance issue. The reason it is a symlink table is that the data is stored in S3 in a way that does not lend itself to the Hive partitioning scheme.

Here is some info about one of the zstd files:

$> zstd -lv file.csv.zst
# Zstandard Frames: 1
DictID: 0
Window Size: 8.00 MiB (8388608 B)
Compressed Size: 26.4 MiB (27635730 B)
Check: XXH64 109a2c1c
@nineinchnick
Member

Which JDK are you using? Trino 457 uses native code for decompression: https://trino.io/docs/current/release/release-457.html#hive-connector

@nineinchnick
Member

Also, 458 switched off the legacy FS library. Did you migrate to the native FS or keep the legacy one? Can you share your catalog configs?

@benrifkind
Author

@nineinchnick Thank you very much for your quick response!

$> java --version
openjdk 22.0.2 2024-07-16
OpenJDK Runtime Environment Corretto-22.0.2.9.1 (build 22.0.2+9-FR)
OpenJDK 64-Bit Server VM Corretto-22.0.2.9.1 (build 22.0.2+9-FR, mixed mode, sharing)

I do have fs.native-s3.enabled=true in my hive.properties file. When that was not set I got errors like

Caused by: java.lang.IllegalArgumentException: No factory for location: s3n://<bucket>/<path>

hive.properties

connector.name=hive
hive.metastore.uri=thrift://hive2.internal.foo.com:9083
hive.parquet.use-column-names=true
hive.recursive-directories=true
hive.storage-format=ORC
hive.max-partitions-per-scan=1000000
hive.non-managed-table-writes-enabled=true
hive.domain-compaction-threshold=1000000

# Use partition projection
# https://trino.io/docs/current/connector/hive.html#accessing-tables-with-athena-partition-projection-metadata
hive.partition-projection-enabled=true

# To fix Error Line too long in text file
hive.text.max-line-length=500MB

# Trino is failing on s3n urls even if the table location is set to be s3
fs.native-s3.enabled=true

coordinator config.properties

jmx.rmiregistry.port=9999

query.min-expire-age=1.00h
query.max-history=100

query.low-memory-killer.policy=total-reservation-on-blocked-nodes

internal-communication.shared-secret=foo=
# No real limit on the number of stages
query.max-stage-count=1000

# TODO: figure out how to use http2
internal-communication.http2.enabled=false
coordinator=true
node-scheduler.include-coordinator=false
discovery.uri=http://localhost:8080
http-server.http.port=8080

http-server.https.enabled=true
http-server.https.port=8443
http-server.process-forwarded=true

http-server.authentication.type=PASSWORD
password-authenticator.config-files=/etc/trino/password-authenticator-okta.properties

http-server.https.keystore.path=/etc/trino/keystore.jks
http-server.https.keystore.key=<key>

http-server.authentication.password.user-mapping.pattern=(.*)(@.*)

protocol.v1.alternate-header-name=Presto
query.max-memory-per-node=71GB
memory.heap-headroom-per-node=30GB

query.max-total-memory=1000000GB
query.max-memory=999999GB

worker config.properties

jmx.rmiregistry.port=9999

query.min-expire-age=1.00h
query.max-history=100

query.low-memory-killer.policy=total-reservation-on-blocked-nodes

internal-communication.shared-secret=foo=
# No real limit on the number of stages
query.max-stage-count=1000

# TODO: figure out how to use http2
internal-communication.http2.enabled=false
coordinator=false
http-server.http.port=8080
query.max-memory-per-node=142GB
memory.heap-headroom-per-node=61GB

query.max-total-memory=1000000GB
query.max-memory=999999GB

discovery.uri=http://<host>.us-west-2.elb.amazonaws.com:8080

@wendigo
Contributor

wendigo commented Nov 12, 2024

Can you try disabling native compression?

@benrifkind
Author

Can you try disabling native compression?

Do you mean by following https://trino.io/docs/current/admin/properties-general.html#file-compression-and-decompression? I just tried that and it did not seem to make a difference in performance. CPU time and execution time are pretty much the same.

Here's my jvm.config

-server
-XX:+UseG1GC
-XX:+ExplicitGCInvokesConcurrent
-XX:+UseGCOverheadLimit
-XX:OnOutOfMemoryError=kill -9 %p
-Djavax.net.ssl.sessionCacheSize=1000
-Djavax.net.ssl.sessionCacheTimeout=60
-Djdk.attach.allowAttachSelf=true
-Dio.airlift.compress.v3.disable-native=true

-Xmx102G

@wendigo
Contributor

wendigo commented Nov 12, 2024

We need more performance data to analyze this case. Can you share operator stats for that query?

@wendigo
Contributor

wendigo commented Nov 12, 2024

Is the query plan the same for both versions? Or different?

@wendigo changed the title from "Slow performance on zstd compressed data" to "Hive performance regression between 419 and 463" on Nov 12, 2024
@wendigo
Contributor

wendigo commented Nov 12, 2024

I've updated the issue title as it doesn't seem to be related to zstd at all.

@raunaqmorarka
Member

Can you add the output of EXPLAIN ANALYZE and the query info JSON from both versions?
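(For reference, a minimal sketch of one way to capture both, assuming the Trino CLI is available and the coordinator listens on the default port; the table name, filter values, and query ID are placeholders:)

# EXPLAIN ANALYZE output via the Trino CLI
trino --server http://localhost:8080 --catalog hive \
  --execute "EXPLAIN ANALYZE SELECT date, count(*) FROM <table_name> WHERE date IN ('2024-11-07') AND column1 = 'foo' AND column2 = 'bar' GROUP BY 1 ORDER BY 1" \
  > explain-analyze.txt

# Full query info JSON from the coordinator REST API (add credentials if authentication is enabled)
curl -s http://localhost:8080/v1/query/<query_id> > queryinfo.json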

@benrifkind
Author

Here are the query plans

                                                                                              Query Plan
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Trino version: 463
 Queued: 400.38us, Analysis: 460.67ms, Planning: 1.37s, Execution: 7.98m
 Fragment 1 [ROUND_ROBIN]
     CPU: 32.98ms, Scheduled: 73.42ms, Blocked 21.90h (Input: 21.20h, Output: 0.00ns), Input: 1 row (24B); per task: avg.: 0.20 std.dev.: 0.40, Output: 1 row (24B)
     Peak Memory: 12.93kB, Tasks count: 5; per task: max: 2.59kB
     Amount of input data processed by the workers for this stage might be skewed
     Output layout: [date, count_2]
     Output partitioning: SINGLE []
     LocalMerge[orderBy = [date ASC NULLS LAST]]
     │   Layout: [date:varchar, count_2:bigint]
     │   Estimates: {rows: ? (?), cpu: 0, memory: 0B, network: 0B}
     │   CPU: 4.00ms (0.00%), Scheduled: 5.00ms (0.00%), Blocked: 39.79m (1.21%), Output: 1 row (24B)
     │   Input avg.: 0.01 rows, Input std.dev.: 1260.95%
     └─ PartialSort[orderBy = [date ASC NULLS LAST]]
        │   Layout: [date:varchar, count_2:bigint]
        │   CPU: 13.00ms (0.00%), Scheduled: 27.00ms (0.00%), Blocked: 0.00ns (0.00%), Output: 1 row (24B)
        │   Input avg.: 0.01 rows, Input std.dev.: 1260.95%
        └─ RemoteSource[sourceFragmentIds = [2]]
               Layout: [date:varchar, count_2:bigint]
               CPU: 0.00ns (0.00%), Scheduled: 0.00ns (0.00%), Blocked: 21.20h (38.56%), Output: 1 row (24B)
               Input avg.: 0.01 rows, Input std.dev.: 1260.95%

 Fragment 2 [HASH]
     CPU: 308.35ms, Scheduled: 344.56ms, Blocked 1.77d (Input: 21.18h, Output: 0.00ns), Input: 11785 rows (276.21kB); per task: avg.: 2357.00 std.dev.: 4714.00, Output: 1 row (24B)
     Peak Memory: 669.73kB, Tasks count: 5; per task: max: 648.73kB
     Amount of input data processed by the workers for this stage might be skewed
     Output layout: [date, count_2]
     Output partitioning: ROUND_ROBIN []
     Aggregate[type = FINAL, keys = [date]]
     │   Layout: [date:varchar, count_2:bigint]
     │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: 0B}
     │   CPU: 69.00ms (0.00%), Scheduled: 72.00ms (0.00%), Blocked: 0.00ns (0.00%), Output: 1 row (24B)
     │   Input avg.: 73.66 rows, Input std.dev.: 1260.95%
     │   count_2 := count(count_3)
     └─ LocalExchange[partitioning = HASH, arguments = [date::varchar]]
        │   Layout: [date:varchar, count_3:bigint]
        │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}
        │   CPU: 65.00ms (0.00%), Scheduled: 69.00ms (0.00%), Blocked: 11.94h (21.72%), Output: 11785 rows (276.21kB)
        │   Input avg.: 73.66 rows, Input std.dev.: 212.30%
        └─ RemoteSource[sourceFragmentIds = [3]]
               Layout: [date:varchar, count_3:bigint]
               CPU: 86.00ms (0.00%), Scheduled: 93.00ms (0.00%), Blocked: 21.18h (38.52%), Output: 11785 rows (276.21kB)
               Input avg.: 73.66 rows, Input std.dev.: 212.30%

 Fragment 3 [SOURCE]
     CPU: 19.40h, Scheduled: 1.62d, Blocked 0.00ns (Input: 0.00ns, Output: 0.00ns), Input: 604231718 rows (172.63kB); per task: avg.: 120846343.60 std.dev.: 2662057.24, Output: 11785 rows (276.21kB)
     Peak Memory: 195.06MB, Tasks count: 5; per task: max: 41.34MB
     Output layout: [date, count_3]
     Output partitioning: HASH [date]
     Aggregate[type = PARTIAL, keys = [date]]
     │   Layout: [date:varchar, count_3:bigint]
     │   CPU: 8.11s (0.01%), Scheduled: 15.16s (0.01%), Blocked: 0.00ns (0.00%), Output: 11785 rows (276.21kB)
     │   Input avg.: 51271.25 rows, Input std.dev.: 113.31%
     │   count_3 := count(*)
     └─ TableScan[table = hive:adroll_symlink:log_lines]
            Layout: [date:varchar]
            Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}
            CPU: 19.40h (99.99%), Scheduled: 1.62d (99.99%), Blocked: 0.00ns (0.00%), Output: 604231718 rows (172.63kB)
            Input avg.: 51271.25 rows, Input std.dev.: 113.31%
            date := date:string:PARTITION_KEY
                :: [[2024-11-07]]
            line_type:string:PARTITION_KEY
                :: [[bid]]
            line_exchange:string:PARTITION_KEY
                :: [[index]]
            Input: 604231718 rows (172.63kB), Physical input: 568.89GB, Physical input time: 1.56d
                                                                                              Query Plan
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Trino version: 419
 Queued: 11.51ms, Analysis: 887.17ms, Planning: 2.81s, Execution: 3.98m
 Fragment 1 [ROUND_ROBIN]
     CPU: 502.34ms, Scheduled: 3.37s, Blocked 10.80h (Input: 10.50h, Output: 0.00ns), Input: 1 row (24B); per task: avg.: 0.20 std.dev.: 0.40, Output: 1 row (24B)
     Amount of input data processed by the workers for this stage might be skewed
     Output layout: [date, count_2]
     Output partitioning: SINGLE []
     LocalMerge[orderBy = [date ASC NULLS LAST]]
     │   Layout: [date:varchar, count_2:bigint]
     │   Estimates: {rows: ? (?), cpu: 0, memory: 0B, network: 0B}
     │   CPU: 8.00ms (0.00%), Scheduled: 16.00ms (0.00%), Blocked: 19.65m (1.53%), Output: 1 row (24B)
     │   Input avg.: 0.01 rows, Input std.dev.: 1260.95%
     └─ PartialSort[orderBy = [date ASC NULLS LAST]]
        │   Layout: [date:varchar, count_2:bigint]
        │   CPU: 273.00ms (0.00%), Scheduled: 2.59s (0.02%), Blocked: 0.00ns (0.00%), Output: 1 row (24B)
        │   Input avg.: 0.01 rows, Input std.dev.: 1260.95%
        └─ RemoteSource[sourceFragmentIds = [2]]
               Layout: [date:varchar, count_2:bigint]
               CPU: 0.00ns (0.00%), Scheduled: 0.00ns (0.00%), Blocked: 10.50h (49.08%), Output: 1 row (24B)
               Input avg.: 0.01 rows, Input std.dev.: 1260.95%

 Fragment 2 [HASH]
     CPU: 839.87ms, Scheduled: 926.71ms, Blocked 20.95h (Input: 10.50h, Output: 0.00ns), Input: 11785 rows (379.79kB); per task: avg.: 2357.00 std.dev.: 4714.00, Output: 1 row (24B)
     Amount of input data processed by the workers for this stage might be skewed
     Output layout: [date, count_2]
     Output partitioning: ROUND_ROBIN []
     Project[]
     │   Layout: [date:varchar, count_2:bigint]
     │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}
     │   CPU: 58.00ms (0.00%), Scheduled: 70.00ms (0.00%), Blocked: 0.00ns (0.00%), Output: 1 row (24B)
     │   Input avg.: 0.01 rows, Input std.dev.: 1260.95%
     └─ Aggregate[type = FINAL, keys = [date], hash = [$hashvalue]]
        │   Layout: [date:varchar, $hashvalue:bigint, count_2:bigint]
        │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: 0B}
        │   CPU: 124.00ms (0.00%), Scheduled: 133.00ms (0.00%), Blocked: 0.00ns (0.00%), Output: 1 row (33B)
        │   Input avg.: 73.66 rows, Input std.dev.: 1260.95%
        │   count_2 := count("count_3")
        └─ LocalExchange[partitioning = HASH, hashColumn = [$hashvalue], arguments = ["date"]]
           │   Layout: [date:varchar, count_3:bigint, $hashvalue:bigint]
           │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}
           │   CPU: 238.00ms (0.00%), Scheduled: 253.00ms (0.00%), Blocked: 3.92m (0.31%), Output: 11785 rows (379.79kB)
           │   Input avg.: 73.66 rows, Input std.dev.: 200.15%
           └─ RemoteSource[sourceFragmentIds = [3]]
                  Layout: [date:varchar, count_3:bigint, $hashvalue_4:bigint]
                  CPU: 222.00ms (0.00%), Scheduled: 232.00ms (0.00%), Blocked: 10.50h (49.08%), Output: 11785 rows (379.79kB)
                  Input avg.: 73.66 rows, Input std.dev.: 200.15%

 Fragment 3 [SOURCE]
     CPU: 2.17h, Scheduled: 4.59h, Blocked 0.00ns (Input: 0.00ns, Output: 0.00ns), Input: 604231718 rows (568.89GB); per task: avg.: 120846343.60 std.dev.: 1946984.87, Output: 11785 rows (379.79kB)
     Output layout: [date, count_3, $hashvalue_5]
     Output partitioning: HASH [date][$hashvalue_5]
     Aggregate[type = PARTIAL, keys = [date], hash = [$hashvalue_5]]
     │   Layout: [date:varchar, $hashvalue_5:bigint, count_3:bigint]
     │   CPU: 23.32s (0.30%), Scheduled: 23.47s (0.14%), Blocked: 0.00ns (0.00%), Output: 11785 rows (379.79kB)
     │   Input avg.: 51271.25 rows, Input std.dev.: 113.31%
     │   count_3 := count(*)
     └─ ScanProject[table = hive:adroll_symlink:log_lines]
            Layout: [date:varchar, $hashvalue_5:bigint]
            Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}/{rows: ? (?), cpu: ?, memory: 0B, network: 0B}
            CPU: 2.16h (99.69%), Scheduled: 4.58h (99.84%), Blocked: 0.00ns (0.00%), Output: 604231718 rows (13.51GB)
            Input avg.: 51271.25 rows, Input std.dev.: 113.31%
            $hashvalue_5 := combine_hash(bigint '0', COALESCE("$operator$hash_code"("date"), 0))
            date := date:string:PARTITION_KEY
                :: [[2024-11-07]]
            line_type:string:PARTITION_KEY
                :: [[bid]]
            line_exchange:string:PARTITION_KEY
                :: [[index]]
            Input: 604231718 rows (568.89GB), Filtered: 0.00%, Physical input: 568.89GB, Physical input time: 0.00ms

@benrifkind
Author

And here are the JSONs
trino-419.json
trino-463.json

@wendigo
Contributor

wendigo commented Nov 12, 2024

In 463 the table scan is CPU: 19.40h (99.99%), Scheduled: 1.62d (99.99%), Blocked: 0.00ns (0.00%), Output: 604231718 rows (172.63kB).
In 419 it is CPU: 2.16h (99.69%), Scheduled: 4.58h (99.84%), Blocked: 0.00ns (0.00%), Output: 604231718 rows (13.51GB).

@benrifkind
Author

@wendigo Just following up here. Do you need any more info from me? Or do you see something incorrect with my setup that has caused the increase in CPU?

@wendigo
Contributor

wendigo commented Nov 13, 2024

@raunaqmorarka do you have time and resources to triage that?

@raunaqmorarka
Member

Given that this is a CSV file, we're likely dealing with some missing optimization in the new native CSV reader.
@benrifkind is there a reason for using CSV over Parquet/ORC?
cc: @dain @electrum

@benrifkind
Author

@raunaqmorarka this is an upstream data source that I don't have control over. We use Trino to aggregate this CSV and transform it into ORC for downstream consumers.
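(For context, the flow is roughly a CTAS like the sketch below; the target schema/table and the exact aggregation are hypothetical, while the source table, filter columns, and partition column come from the query and plans above:)

CREATE TABLE hive.reporting.log_lines_orc
WITH (format = 'ORC', partitioned_by = ARRAY['date'])
AS
SELECT
    column1,
    column2,
    count(*) AS line_count,
    date
FROM hive.adroll_symlink.log_lines
WHERE date = '2024-11-07'
GROUP BY column1, column2, date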

@vburenin

You can also notice a significant memory usage increase.

I just tried to migrate from Trino 419 to 464 and ran into excessive memory usage using just Iceberg catalogs. A query that used to use only 2GB started to fail with out-of-memory errors on 464.

I can't share the SQL query, but it uses a LEFT JOIN and GROUP BY.

Here are some statistics from Trino 419:

Query 20241114_191246_68424_diinp, FINISHED, 12 nodes
Splits: 2,473 total, 2,473 done (100.00%)
18.75 [68.2M rows, 2.12GB] [3.64M rows/s, 116MB/s]

vs Trino 464:

Query 20241114_191734_00002_3zcrk, FAILED, 2 nodes
Splits: 10,568 total, 9,433 done (89.26%)
1:48 [1.23B rows, 46.8GB] [11.4M rows/s, 444MB/s]

Query 20241114_191734_00002_3zcrk failed: Query exceeded per-node memory limit of 40GB [Allocated: 40.00GB, Delta: 7.46MB, Top Consumers: {HashBuilderOperator=39.71GB, LazyOutputBuffer=131.29MB, ExchangeOperator=103.94MB}]

So it doesn't appear to be Hive specific.

@raunaqmorarka
Member

@benrifkind are you able to share JFRs from the old and new versions?
The 427 release was when we removed the old CSV reader.

@benrifkind
Author

@raunaqmorarka Can you explain how I would do that? Do you want recordings from the coordinator and the workers?

@raunaqmorarka
Member

A profile from any worker that reads the CSV files would be sufficient: https://www.baeldung.com/java-flight-recorder-monitoring
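(A minimal sketch of capturing one with jcmd on a worker while the slow query is running; the PID and output path are placeholders:)

# start a 5-minute profiling recording on the Trino server process
jcmd <trino_pid> JFR.start name=trino settings=profile duration=300s filename=/tmp/trino-worker.jfr

# confirm the recording has finished, then grab /tmp/trino-worker.jfr from the worker
jcmd <trino_pid> JFR.check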

@benrifkind
Author

@raunaqmorarka Does this work? I had to rename it to .txt because GitHub doesn't allow .jfr files.

flight.jfr.txt

@raunaqmorarka
Member

Thanks, can you also share a JFR from the old version for comparison?

[screenshot of the JFR profile]

@wendigo @dain ^

@wendigo
Contributor

wendigo commented Nov 21, 2024

@raunaqmorarka this is the Java decompressor, not the native one

@raunaqmorarka
Member

Yes, I see that. I was wondering why the native decompressor is not used and why it's slower than previous releases. Was the legacy reader using native zstd from Hadoop or the same Java implementation?

@wendigo
Contributor

wendigo commented Nov 21, 2024

@raunaqmorarka the V3 Java implementation is the same as V2/V1 so there should be no performance penalty at all.

@benrifkind
Author

Here is the Trino 419 JFR file: trino-419.jfr.txt

@raunaqmorarka
Member

[screenshot of the JFR profile]

There's no zstd in the stack traces in the old version; I think that means it was using a native zstd implementation from Hadoop.
@wendigo @electrum @dain is it feasible to use the native zstd decompressor in the new CSV reader?
