[SPARK-50302] Ensure secondary index sizes equal primary index sizes for TransformWithState stateful variables with TTL #48853
What changes were proposed in this pull request?
This PR ensures that the secondary indexes that state variables with TTL use are at most the size of the corresponding state variable's primary index. This change will eliminate unnecessary work done during the cleanup of stateful variables with TTL.
Why are the changes needed?
Context
The TransformWithState operator (from here on out, "TWS") allows users to write procedural logic over streams of records. To store state between micro-batches, Spark provides users with stateful variables, which persist between micro-batches. For example, you might want to emit an average of the past 5 records, every 5 records. You might only receive 2 records in the first micro-batch, so you have to buffer those 2 records until you receive 3 more in a subsequent batch. TWS supports 3 different types of stateful variables: single values, lists, and maps.

The TWS operator also supports stateful variables with a Time To Live (TTL); this allows you to say, "keep a certain record in state for d units of time". This TTL is per-record. This means that every record in a list (or map) can expire at a different point in time, depending on when that element was inserted. A record inserted into a stateful list (or map) at time t1 will expire at t1 + d, and a second record inserted at time t2 will expire at t2 + d. (For value state, there's only one value, so "everything" expires at the same time.)

A very natural question to now ask is: how do we efficiently determine which elements have expired, without having to do a full scan of every record in state? The idea here is to keep a secondary index from expiration timestamp to the specific record that needs to be evicted. Not so hard, right?
The state cleanup strategy today
Today's cleanup strategy is about as simple as I indicated earlier: for every insert to a value/map/list, you:

1. Unconditionally write the record, along with its expiration timestamp, to the primary index.
2. Unconditionally write a corresponding (expiration timestamp, state key) entry to the secondary index.

Cleanup then scans the secondary index in timestamp order and, for every expired entry, looks up the corresponding record in the primary index and cleans it up. (The sketch below shows this scheme in miniature.)
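Here is a minimal sketch of that scheme. All names are hypothetical stand-ins, not the actual TWS internals, and plain Scala collections stand in for RocksDB column families:

```scala
import scala.collection.mutable

object NaiveTtlIndex {
  // Primary index: state key -> (value, expiration timestamp in ms).
  val primary = mutable.Map[String, (String, Long)]()
  // Secondary index: (expiration timestamp, state key) pairs, kept ordered by
  // timestamp so that cleanup can stop at the first unexpired entry.
  val secondary = mutable.SortedSet[(Long, String)]()

  // Today's insert path: two unconditional writes. An update to an existing
  // key overwrites the primary entry but leaves the old secondary entry
  // behind; that stale entry is exactly the problem this PR fixes.
  def put(key: String, value: String, batchTimestampMs: Long, ttlDelayMs: Long): Unit = {
    val expirationMs = batchTimestampMs + ttlDelayMs
    primary(key) = (value, expirationMs)  // write 1: primary index
    secondary += ((expirationMs, key))    // write 2: secondary index
  }

  // Cleanup: walk secondary entries in timestamp order until one is unexpired.
  def cleanup(nowMs: Long): Unit = {
    for ((expirationMs, key) <- secondary.takeWhile(_._1 < nowMs).toList) {
      secondary -= ((expirationMs, key))
      // When the secondary entry is stale, this lookup is pure wasted work:
      // the key was already removed, or re-written with a later timestamp.
      primary.get(key).foreach { case (_, primaryExpirationMs) =>
        if (primaryExpirationMs < nowMs) primary -= key
      }
    }
  }
}
```

Calling put("foo", "v1", 100, 500) and then put("foo", "v2", 200, 500) in this model reproduces the one-primary-entry, two-secondary-entries state walked through below.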
The issue with this approach is that we do two unconditional writes. This means that if the same state variable is written to with different timestamps, there will exist one element in the primary index while there exist two elements in the secondary index. Consider the following example for a state variable foo with value v1 and a TTL delay of 500.

For batch 0, batchTimestampMs = 100, foo updates to v1:

Primary index: [foo -> (v1, 600)]
Secondary index: [(600, foo) -> EMPTY]
Note that the state variable is included in the secondary index key because we might have several elements with the same expiration timestamp; we want (600, foo) to not overwrite a (600, bar) just because they both expire at 600.

For batch 1, batchTimestampMs = 200, foo updates to v2:

Primary index: [foo -> (v2, 700)]
Secondary index: [(600, foo) -> EMPTY, (700, foo) -> EMPTY]
Now, we have two entries in our secondary index. If the current timestamp advanced to something like 800, we'd take the following steps:

1. We'd read (600, foo) from the secondary index and look up foo in the primary index. That would yield (v2, 700). The expiration timestamp of 700 in the primary index is still less than 800, so we would remove foo from the primary index.
2. We'd read (700, foo). We'd look up foo in the primary index and see nothing, so we'd do nothing.

You'll notice here that step 2 is entirely redundant. We read (700, foo) and did a get to the primary index for something that was doomed to return nothing.

While this isn't great, the story is unfortunately significantly worse for lists. The way that we store lists is by having a single key in RocksDB whose value is the concatenated bytes of all the values in that list. When we do cleanup for a list, we go through all of its records and write back only the ones that haven't expired. Thus, it's possible for us to have a list that looks something like:
Primary index: [foo -> [(v1, 600), (v2, 700), (v3, 900)]]
Secondary index: [(600, foo) -> EMPTY, (700, foo) -> EMPTY, (900, foo) -> EMPTY]
Now, suppose that the current timestamp is 800. We need to expire the records in the list. So, we do the following:

1. We read (600, foo) from the secondary index. This tells us that the list foo needs cleaning up. We clean up everything in foo that expires before 800. Since we store lists as a single key, we issue a RocksDB clear operation, iterate through all of the existing values, eliminate (v1, 600) and (v2, 700), and write back (v3, 900).
2. We then read (700, foo), and we unknowingly do cleanup on foo again. This consists of clearing foo, iterating through its elements, and writing back (v3, 900). But since cleanup already happened, this step is entirely redundant.
3. We read (900, foo) from the secondary index, and since 900 > 800, we can bail out of cleanup.

Step 2 here is extremely wasteful. If we have n elements in our secondary index for the same key then, in the worst case, we will do the extra cleanup n-1 times, and each time is a linear-time operation! Thus, for a list that has n elements, d of which need to be cleaned up, the worst-case time complexity is in O(d*(n-d)) instead of O(n). And it's completely unnecessary work.
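To make that bound concrete: for a list of n = 1,000 elements where d = 500 have expired, the first expired secondary entry does the real cleanup, but each of the remaining 499 stale entries clears the list again and rewrites the roughly 500 surviving elements. That's on the order of 250,000 redundant element writes, where a single O(n) pass would have touched only 1,000.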
How does this PR fix the issue?

It's pretty simple to fix this for value state and map state. This is because every key in value or map state maps to exactly one element in the secondary index. We can maintain a one-to-one correspondence. Any time we modify value/map state, we make sure that we delete the previous entry in the secondary index. This logic is implemented by OneToOneTTLState.
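Here is a minimal sketch of that invariant, reusing the hypothetical NaiveTtlIndex model from above (the real logic lives in OneToOneTTLState and operates on RocksDB state, not in-memory maps):

```scala
object OneToOneSketch {
  // One-to-one fix: before the two writes, delete the secondary entry for the
  // key's previous expiration, so each primary key has at most one secondary
  // entry at all times.
  def putOneToOne(key: String, value: String, batchTimestampMs: Long, ttlDelayMs: Long): Unit = {
    NaiveTtlIndex.primary.get(key).foreach { case (_, oldExpirationMs) =>
      NaiveTtlIndex.secondary -= ((oldExpirationMs, key))  // drop the stale entry
    }
    val expirationMs = batchTimestampMs + ttlDelayMs
    NaiveTtlIndex.primary(key) = (value, expirationMs)
    NaiveTtlIndex.secondary += ((expirationMs, key))
  }
}
```

In the two-batch example above, the second write would now remove (600, foo) before inserting (700, foo), so cleanup never touches a doomed entry.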
The trickier aspect is handling this for ListState, where a secondary index entry points at the list that needs to be cleaned up. There's a one-to-many mapping here: one grouping key maps to multiple records, all of which could expire at different times. The trick to making sure that secondary indexes don't explode is to have the secondary index store only the minimum expiration timestamp in a list. The rough intuition is that you don't need to store anything larger than that: when you clean up due to the minimum expiration timestamp, you'll go through the list anyway, and you can find the next minimum timestamp to put into your secondary index. This logic is implemented by OneToManyTTLState.
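And a minimal sketch of the minimum-timestamp idea for lists, again with hypothetical names and in-memory collections standing in for what OneToManyTTLState actually does against RocksDB:

```scala
import scala.collection.mutable

object MinTimestampIndexSketch {
  // Primary index: grouping key -> list of (value, expiration timestamp).
  val lists = mutable.Map[String, Vector[(String, Long)]]()
  // Secondary index: at most ONE entry per grouping key, keyed by the minimum
  // expiration timestamp currently present in that key's list.
  val secondary = mutable.SortedSet[(Long, String)]()

  def append(key: String, value: String, expirationMs: Long): Unit = {
    val old = lists.getOrElse(key, Vector.empty)
    lists(key) = old :+ ((value, expirationMs))
    val oldMin = old.map(_._2).minOption
    // Only touch the secondary index if the new element lowers the minimum.
    if (oldMin.forall(expirationMs < _)) {
      oldMin.foreach(m => secondary -= ((m, key)))
      secondary += ((expirationMs, key))
    }
  }

  def cleanup(nowMs: Long): Unit = {
    for ((minExpirationMs, key) <- secondary.takeWhile(_._1 < nowMs).toList) {
      secondary -= ((minExpirationMs, key))
      // One linear pass: keep the survivors, then re-register the new minimum.
      val survivors = lists.getOrElse(key, Vector.empty).filter(_._2 >= nowMs)
      if (survivors.isEmpty) lists -= key
      else {
        lists(key) = survivors
        secondary += ((survivors.map(_._2).min, key))
      }
    }
  }
}
```

With this invariant, the list example above holds only (600, foo) in the secondary index: cleanup at timestamp 800 reads it exactly once, rewrites the list in a single pass, and re-registers (900, foo).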
How should reviewers review this PR?
Reviewers should focus on OneToOneTTLState and OneToManyTTLState in TTLState.scala.

Does this PR introduce any user-facing change?
No, but it does change the format in which TWS represents its internal state. However, since TWS is currently private[sql] and not publicly available, this is not an issue.

How was this patch tested?
Was this patch authored or co-authored using generative AI tooling?
Generated-by: GitHub Copilot