Merge #33536
33536: distsqlrun: fix lookup join order preservation r=jordanlewis a=jordanlewis

Fixes #33354.

Previously, a lookup join with a left side that doesn't have an
injective mapping into the right side would fail to preserve its input
order, in violation of its contract.

Now, the code always uses the order-preserving pathway that was
originally designed for outer joins, which prevents the issue.
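
The gist, as a minimal self-contained sketch (illustrative only:
lookupTable, joinBatch, and the string rows are simplified stand-ins,
not the real joinReader types or spans): matches coming back from the
batched lookup are bucketed by the index of the input row they join
with, and only then emitted in input order, with an optional
NULL-extended row for unmatched inputs.

package main

import (
	"fmt"
	"sort"
)

// lookupTable stands in for the indexed right-hand table: key -> matching rows.
var lookupTable = map[int][]string{2: {"x", "y"}, 5: {"z"}}

// joinBatch joins a batch of input keys against lookupTable while preserving
// input order. Matches from the (index-ordered) lookup are first bucketed by
// input-row index and only then emitted in input order; when leftOuter is
// true, unmatched inputs produce a NULL-extended row.
func joinBatch(inputKeys []int, leftOuter bool) []string {
	// Group input rows by lookup key and visit keys in sorted order, the way
	// a single batched index scan would return its matches.
	keyToInputIdxs := map[int][]int{}
	for i, k := range inputKeys {
		keyToInputIdxs[k] = append(keyToInputIdxs[k], i)
	}
	sortedKeys := make([]int, 0, len(keyToInputIdxs))
	for k := range keyToInputIdxs {
		sortedKeys = append(sortedKeys, k)
	}
	sort.Ints(sortedKeys)

	// Bucket every match under the input row(s) it joins with.
	outputsByInputIdx := make([][]string, len(inputKeys))
	for _, k := range sortedKeys {
		for _, match := range lookupTable[k] {
			for _, idx := range keyToInputIdxs[k] {
				outputsByInputIdx[idx] = append(outputsByInputIdx[idx], fmt.Sprintf("%d|%s", k, match))
			}
		}
	}

	// Emit in input order, rendering a NULL row for unmatched inputs if outer.
	var toEmit []string
	for i, rows := range outputsByInputIdx {
		if len(rows) == 0 && leftOuter {
			toEmit = append(toEmit, fmt.Sprintf("%d|NULL", inputKeys[i]))
		}
		toEmit = append(toEmit, rows...)
	}
	return toEmit
}

func main() {
	// Key 2 appears twice and key 7 has no match; output still follows input order.
	fmt.Println(joinBatch([]int{2, 5, 2, 7}, true))
	// Prints: [2|x 2|y 5|z 2|x 2|y 7|NULL]
}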

Release note (bug fix): lookup joins preserve their input order even if
more than one row of the input corresponds to the same row of the lookup
table.

Co-authored-by: Jordan Lewis <[email protected]>
craig[bot] and jordanlewis committed Jan 14, 2019
2 parents 6de0527 + c792e7b commit 93857e3
Showing 4 changed files with 76 additions and 54 deletions.
46 changes: 23 additions & 23 deletions pkg/sql/distsqlrun/joinreader.go
@@ -46,9 +46,9 @@ const (
// jrPerformingLookup means we are performing an index lookup for the current
// input row batch.
jrPerformingLookup
// jrCollectingOutputRows is used for left outer joins. It means we are
// collecting the result of the index lookup to be emitted, while preserving
// the order of the input and rendering rows for unmatched inputs.
// jrCollectingOutputRows means we are collecting the result of the index
// lookup to be emitted, while preserving the order of the input, and
// optionally rendering rows for unmatched inputs for left outer joins.
jrCollectingOutputRows
// jrEmittingRows means we are emitting the results of the index lookup.
jrEmittingRows
@@ -398,11 +398,16 @@ func (jr *joinReader) readInput() (joinReaderState, *ProducerMetadata) {
return jrStateUnknown, jr.DrainHelper()
}

// If this is an outer join, maintain a map from input row index to the
// corresponding output rows. This will allow us to emit unmatched rows while
// preserving the order of the input. For inner joins, we can skip this step
// and add output rows directly to jr.toEmit.
if jr.joinType == sqlbase.LeftOuterJoin {
// Maintain a map from input row index to the corresponding output rows. This
// will allow us to preserve the order of the input in the face of multiple
// input rows having the same lookup keyspan, or if we're doing an outer join
// and we need to emit unmatched rows.
if len(jr.inputRowIdxToOutputRows) >= len(jr.inputRows) {
jr.inputRowIdxToOutputRows = jr.inputRowIdxToOutputRows[:len(jr.inputRows)]
for i := range jr.inputRowIdxToOutputRows {
jr.inputRowIdxToOutputRows[i] = nil
}
} else {
jr.inputRowIdxToOutputRows = make([]sqlbase.EncDatumRows, len(jr.inputRows))
}
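
Aside, not part of the diff: the reset above is a common Go buffer-reuse
pattern; the previous batch's slice is truncated and cleared when it is long
enough, and reallocated only when it is not. A standalone sketch, with string
rows standing in for sqlbase.EncDatumRows:

package main

import "fmt"

// resetBuckets returns a slice of n empty buckets, reusing prev's backing
// array when it is already long enough (truncate and clear, so no stale rows
// leak into the next batch) and allocating a fresh slice otherwise.
func resetBuckets(prev [][]string, n int) [][]string {
	if len(prev) >= n {
		prev = prev[:n]
		for i := range prev {
			prev[i] = nil
		}
		return prev
	}
	return make([][]string, n)
}

func main() {
	buckets := resetBuckets(nil, 4) // first batch: allocates
	buckets[0] = append(buckets[0], "row")
	buckets = resetBuckets(buckets, 3) // smaller next batch: reuses and clears
	fmt.Println(len(buckets), buckets[0] == nil) // 3 true
}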

@@ -516,12 +521,8 @@ func (jr *joinReader) performLookup() (joinReaderState, *ProducerMetadata) {
}
if renderedRow != nil {
rowCopy := jr.out.rowAlloc.CopyRow(renderedRow)
if jr.inputRowIdxToOutputRows == nil {
jr.toEmit = append(jr.toEmit, rowCopy)
} else {
jr.inputRowIdxToOutputRows[inputRowIdx] = append(
jr.inputRowIdxToOutputRows[inputRowIdx], rowCopy)
}
jr.inputRowIdxToOutputRows[inputRowIdx] = append(
jr.inputRowIdxToOutputRows[inputRowIdx], rowCopy)
}
}
}
@@ -533,20 +534,19 @@ func (jr *joinReader) performLookup() (joinReaderState, *ProducerMetadata) {
return jrEmittingRows, nil
}

// collectOutputRows is used for left joins only. It iterates over
// jr.inputRowIdxToOutputRows and adds output rows to jr.Emit, rendering rows
// for unmatched inputs while preserving the input order. For inner joins it is
// a no-op.
// collectOutputRows iterates over jr.inputRowIdxToOutputRows and adds output
// rows to jr.Emit, rendering rows for unmatched inputs if the join is a left
// outer join, while preserving the input order.
func (jr *joinReader) collectOutputRows() joinReaderState {
if jr.joinType == sqlbase.LeftOuterJoin {
for i, outputRows := range jr.inputRowIdxToOutputRows {
if len(outputRows) == 0 {
for i, outputRows := range jr.inputRowIdxToOutputRows {
if len(outputRows) == 0 {
if jr.joinType == sqlbase.LeftOuterJoin {
if row := jr.renderUnmatchedRow(jr.inputRows[i], leftSide); row != nil {
jr.toEmit = append(jr.toEmit, jr.out.rowAlloc.CopyRow(row))
}
} else {
jr.toEmit = append(jr.toEmit, outputRows...)
}
} else {
jr.toEmit = append(jr.toEmit, outputRows...)
}
}
return jrEmittingRows
20 changes: 19 additions & 1 deletion pkg/sql/distsqlrun/joinreader_test.go
@@ -135,6 +135,24 @@ func TestJoinReader(t *testing.T) {
outputTypes: threeIntCols,
expected: "[[0 2 2] [0 2 2] [0 5 5] [1 0 0] [1 5 5]]",
},
{
description: "Test lookup joins preserve order of left input",
post: distsqlpb.PostProcessSpec{
Projection: true,
OutputColumns: []uint32{0, 1, 3},
},
input: [][]tree.Datum{
{aFn(2), bFn(2)},
{aFn(5), bFn(5)},
{aFn(2), bFn(2)},
{aFn(10), bFn(10)},
{aFn(15), bFn(15)},
},
lookupCols: []uint32{0, 1},
inputTypes: twoIntCols,
outputTypes: threeIntCols,
expected: "[[0 2 2] [0 5 5] [0 2 2] [1 0 0] [1 5 5]]",
},
{
description: "Test lookup join with onExpr",
post: distsqlpb.PostProcessSpec{
@@ -327,7 +345,7 @@ func TestJoinReader(t *testing.T) {
}

// Set a lower batch size to force multiple batches.
jr.batchSize = 2
jr.batchSize = 3

jr.Run(ctx)

32 changes: 16 additions & 16 deletions pkg/sql/logictest/testdata/logic_test/lookup_join
@@ -21,8 +21,8 @@ ALTER TABLE abc INJECT STATISTICS '[
{
"columns": ["a"],
"created_at": "2018-01-01 1:00:00.00000+00:00",
"row_count": 1,
"distinct_count": 1
"row_count": 100,
"distinct_count": 100
}
]'

@@ -31,8 +31,8 @@ ALTER TABLE def INJECT STATISTICS '[
{
"columns": ["f"],
"created_at": "2018-01-01 1:00:00.00000+00:00",
"row_count": 10,
"distinct_count": 10
"row_count": 10000,
"distinct_count": 10000
}
]'

@@ -41,8 +41,8 @@ ALTER TABLE gh INJECT STATISTICS '[
{
"columns": ["g"],
"created_at": "2018-01-01 1:00:00.00000+00:00",
"row_count": 10,
"distinct_count": 10
"row_count": 10000,
"distinct_count": 10000
}
]'

@@ -155,8 +155,8 @@ ALTER TABLE books INJECT STATISTICS '[
{
"columns": ["title"],
"created_at": "2018-01-01 1:00:00.00000+00:00",
"row_count": 1,
"distinct_count": 1
"row_count": 100,
"distinct_count": 100
}
]'

@@ -165,8 +165,8 @@ ALTER TABLE books2 INJECT STATISTICS '[
{
"columns": ["title"],
"created_at": "2018-01-01 1:00:00.00000+00:00",
"row_count": 10,
"distinct_count": 10
"row_count": 10000,
"distinct_count": 1000
}
]'

@@ -186,8 +186,8 @@ ALTER TABLE authors INJECT STATISTICS '[
{
"columns": ["name"],
"created_at": "2018-01-01 1:00:00.00000+00:00",
"row_count": 1,
"distinct_count": 1
"row_count": 100,
"distinct_count": 100
}
]'

@@ -250,8 +250,8 @@ ALTER TABLE small INJECT STATISTICS '[
{
"columns": ["a"],
"created_at": "2018-01-01 1:00:00.00000+00:00",
"row_count": 1,
"distinct_count": 1
"row_count": 100,
"distinct_count": 100
}
]'

@@ -260,8 +260,8 @@ ALTER TABLE large INJECT STATISTICS '[
{
"columns": ["a"],
"created_at": "2018-01-01 1:00:00.00000+00:00",
"row_count": 10,
"distinct_count": 10
"row_count": 10000,
"distinct_count": 10000
}
]'

32 changes: 18 additions & 14 deletions pkg/sql/opt/exec/execbuilder/testdata/lookup_join
@@ -153,7 +153,7 @@ ALTER TABLE books2 INJECT STATISTICS '[
"columns": ["title"],
"created_at": "2018-01-01 1:00:00.00000+00:00",
"row_count": 10000,
"distinct_count": 10000
"distinct_count": 1000
}
]'

@@ -176,7 +176,7 @@ distinct · · (title)
· table books2@primary · ·

statement ok
CREATE TABLE authors (name STRING PRIMARY KEY, book STRING)
CREATE TABLE authors (name STRING, book STRING)

statement ok
ALTER TABLE authors INJECT STATISTICS '[
@@ -219,24 +219,28 @@ SELECT start_key, end_key, replicas, lease_holder from [SHOW EXPERIMENTAL_RANGES
start_key end_key replicas lease_holder
NULL NULL {5} 5

# TODO(radu): this doesn't seem to be a lookup join, but it should be.

query T
SELECT url FROM [EXPLAIN (DISTSQL) SELECT DISTINCT authors.name FROM books AS b1, books2 AS b2, authors WHERE b1.title = b2.title AND authors.book = b1.title AND b1.shelf <> b2.shelf]
----
https://cockroachdb.github.io/distsqlplan/decode.html#eJzEk09r3DAQxe_9FOqcdkHBlv8FDAsq5ND04JS0t7IHxZruijqSkWRoCfvdi21I1m6s1HvJzZbmN-_NG_QE2kisxCM6KH8AAwo57Cm01tTonLH98Vh0K39DGVNQuu18f7ynUBuLUD6BV75BKKEyV6aNCqAg0QvVDGUnCqbzL5Dz4oBQXp_oWWMWbvxdPDR4j0KijeJJe2itehT2D38w5pdLgMK3VmhXkiugcNf5knBGeQpLNtilNtjrNkTnj31sZz6WpJNF6RfFThsr0aKcZ_p2ySv-Pwt3_GKURhslU_vPSSWUp5Rni6bTS_NKA2tbvbXsPaPLppM0-NNvONvurDoch69-hopseEZ2hOdb8qm6IRtekI87wpPt84TL4-VrQr5Rzitd-yif-uJssX8x6f_Go75H1xrt8L9eddxnh_KA4y6c6WyNX62pB5nx927ghgOJzo-31-PPrR6veoPnMAvC6QRmczhZASdzOA3CeVg5WwH_o5wH4SIcWBGE4xm8P334GwAA__856fjX
https://cockroachdb.github.io/distsqlplan/decode.html#eJzEk09r3DAQxe_9FOqcdkHBlv8FDAsq5ND04JS0t7IHxZruijqSkWRoCfvdi23oxu5aqffSmy3Nb-a9N-gFtJFYiWd0UH4DBhRy2FNoranROWP747HoXv6EMqagdNv5_nhPoTYWoXwBr3yDUEJlbkwbFUBBoheqGcpOFEznz5Dz4oBQ3p7oq8Ys3PireGrwEYVEG8WT9tBa9SzsL_5kzA-XAIUvrdCuJDdA4aHzJeGM8hSWZLBrZbDLMkTnj31sl3UkizqSRR3n8Z02VqJFOQ_47ZILZj4Kd_xklEYbJVMvZ7mUp5Rni6LTa8NLAztcvcLsf0aXTZ00-N1vONvurDoch6_eQ0U2PCM7wvMt-VDdkQ0vyPsd4cn2j8Nle_makO-U80rXPsqnujhb7F9M-r_xwh_RtUY7_KcnHvfZoTzguAtnOlvjZ2vqYcz4-zBww4FE58fb2_HnXo9XvcDXMAvC6QRmczhZASdzOA3CeXhytgL-a3IehItwYEUQjmfw_vTudwAAAP__beT76A==

query TTTTT colnames
EXPLAIN (VERBOSE) SELECT a.name FROM authors AS a JOIN books2 AS b2 ON a.book = b2.title ORDER BY a.name
----
tree field description columns ordering
render · · (name) +name
│ render 0 name · ·
└── lookup-join · · (name, book, title) +name
│ type inner · ·
├── scan · · (name, book) +name
│ table authors@primary · ·
│ spans ALL · ·
└── scan · · (title) ·
· table books2@primary · ·
tree field description columns ordering
render · · (name) +name
│ render 0 name · ·
└── lookup-join · · (name, book, title) +name
│ type inner · ·
├── sort · · (name, book) +name
│ │ order +name · ·
│ └── scan · · (name, book) ·
│ table authors@primary · ·
│ spans ALL · ·
└── scan · · (title) ·
· table books2@primary · ·

# Cross joins should not be planned as lookup joins.
query TTTTT colnames
@@ -263,7 +267,7 @@ render · · (title, edition, shelf, title, editi
query T
SELECT url FROM [EXPLAIN (DISTSQL) SELECT * FROM authors INNER JOIN books2 ON books2.edition = 1 WHERE books2.title = authors.book]
----
https://cockroachdb.github.io/distsqlplan/decode.html#eJyUkUFrwzAMhe_7FUZnlcZpC8Mw8LVjpKPrbeTgJqLL2lrGdmCj5L8PJ4O1Y822o5706T1bJ7BcU2GOFEA9gwSEBZQIznNFIbBP8jC0rN9AZQiNdW1McolQsSdQJ4hNPBAoKHjCbpoDQk3RNId-rEPgNn5BIZodgZp1eLZYji_emO2B1mRq8tPsYj043xyNf9emjS8pL8KTMzYoMYFr1vI_1vfc2E9n-bPzlnkf0qMfmPetE6_cWMFWCZ3EVSH0XNwJqZRaFpvbJLVRCS1R56hnqOeoF1ez5hdZf_n_NQXHNtCfDpB1JQLVOxpuHLj1FT16rnqboVz1XC_UFOLQnQ3F0g6tFPAclqNwPg7no3D2DS67m48AAAD__01P5UU=
https://cockroachdb.github.io/distsqlplan/decode.html#eJyUkVFLwzAQx9_9FOGeb6zpNpCAkNeJdDL3Jn3I2mPWbbmQpKCMfndpI7iJq_p4_7vf5UfuBJZrKsyRAqhnkICwgBLBea4oBPZ9nIaW9RuoDKGxro19XCJU7AnUCWITDwQKCp6wm-aAUFM0zWEY6xC4jV9QiGZHoGYdni2W44s3ZnugNZma_DS7WA_ON0fj37Vp40vvi_DkjA1KTABh1UYltESdwzUP-R-Pe27sp4b8WWPLvA_9Dzww71snXrmxgq0Sug9XhdBzcSekUmpZbG4vFFHPUM9RL6665heuvxxjTcGxDfSna2RdiUD1jtLBA7e-okfP1fBMKlcDNwQ1hZi6s1QsbWr1guewHIXzcTgfhbNvcNndfAQAAP__24voVg==

####################################
# LOOKUP JOIN ON SECONDARY INDEX #
