Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Patch jve #210

Open
wants to merge 89 commits into
base: comp-integrity
Choose a base branch
from
Open
Changes from 1 commit
Commits
Show all changes
89 commits
Select commit Hold shift + click to select a range
f17d8a8
Support for multiple branched CaseWhen
Oct 2, 2020
366e92c
Interval (#116)
eric-feng-2011 Nov 22, 2020
c7fcd98
Remove partition ID argument from enclaves
chester-leung Nov 23, 2020
93dbf5e
Fix comments
chester-leung Nov 24, 2020
f357ab2
updates
chester-leung Nov 24, 2020
bb4018a
Merge serialization of ecall string as int
chester-leung Nov 30, 2020
56ace17
Modifications to integrate crumb, log-mac, and all-outputs_mac, wip
chester-leung Dec 2, 2020
21bbbfb
Store log mac after each output buffer, add all-outputs-mac to each e…
chester-leung Dec 4, 2020
549566f
Add all_outputs_mac to all EncryptedBlocks once all log_macs have bee…
chester-leung Dec 7, 2020
55ee664
Almost builds
chester-leung Dec 9, 2020
057caec
cpp builds
chester-leung Dec 10, 2020
db54c44
Use ubyte for all_outputs_mac
chester-leung Dec 10, 2020
e77f1eb
use Mac for all_outputs_mac
chester-leung Dec 10, 2020
736b8f6
Hopefully this works for flatbuffers all_outputs_mac mutation, cpp bu…
chester-leung Dec 10, 2020
cbb2373
merge
Dec 10, 2020
0351b5d
Merge branch 'comp-integrity' of https://github.com/mc2-project/opaqu…
Dec 10, 2020
3002bd3
Scala builds now too, running into error with union
chester-leung Dec 11, 2020
dc54741
Stuff builds, error with all outputs mac serialization. this commit u…
chester-leung Dec 11, 2020
5be9b7c
Fixed bug, basic encryption / show works
chester-leung Dec 12, 2020
86fab02
All single partition tests pass, multiple partiton passes until tpch-9
chester-leung Dec 12, 2020
8b1a1d1
All tests pass except tpch-9 and skew join
chester-leung Dec 12, 2020
18f45d6
comment tpch back in
chester-leung Dec 12, 2020
123fa1f
Merge branch 'crumb-path' of http://github.com/chester-leung/opaque i…
Dec 13, 2020
bfc06ba
Check same number of ecalls per partition - exception for scanCollect…
Dec 14, 2020
c818a41
First attempt at constructing executed DAG
Dec 14, 2020
39a4945
Fix typos
Dec 14, 2020
c970965
Rework graph
Dec 14, 2020
43ccd2e
Add log macs to graph nodes
Dec 15, 2020
69fc49e
Construct expected DAG and refactor JobNode.
Dec 16, 2020
35691ff
Implement 'paths to sink' for a DAG
Dec 17, 2020
98d5fc4
add crumb for last ecall
Dec 18, 2020
29e3312
Fix NULL handling for aggregation (#130)
wzheng Dec 18, 2020
51b621b
Changing operator matching from logical to physical (#129)
wzheng Dec 21, 2020
e9fe7bb
Aggregation rewrite (#132)
wzheng Jan 21, 2021
1ee8d5b
Merge new aggregate
Jan 25, 2021
4a97c66
updated build/sbt file (#135)
octaviansima Jan 26, 2021
2400a94
Travis update (#137)
wzheng Jan 29, 2021
6031a4a
update breeze (#138)
octaviansima Jan 29, 2021
0a20d71
TPC-H test suite added (#136)
octaviansima Jan 29, 2021
2fec4ad
Separate IN PR (#124)
Chenyu-Shi Jan 30, 2021
7cb2f9a
Merge new aggregate
Feb 1, 2021
c3b3f33
Uncomment log_mac_lst clear
Feb 1, 2021
f41ba90
Clean up comments
Feb 2, 2021
b78b4a4
Separate Concat PR (#125)
Chenyu-Shi Feb 2, 2021
2bb2e8d
Clean up comments in other files
Feb 4, 2021
2685530
Update pathsEqual to be less conservative
Feb 4, 2021
7efb677
Remove print statements from unit tests
Feb 4, 2021
0519def
Removed calls to toSet in TPC-H tests (#140)
octaviansima Feb 5, 2021
0d69b7b
Documentation update (#148)
wzheng Feb 5, 2021
0f877d4
Cluster Remote Attestation Fix (#146)
octaviansima Feb 8, 2021
c215a99
upgrade to 3.0.1 (#144)
octaviansima Feb 8, 2021
8bd1e09
Update two TPC-H queries (#149)
wzheng Feb 8, 2021
823d95d
TPC-H 20 Fix (#142)
octaviansima Feb 8, 2021
fbe324c
Add expected operator DAG generation from executedPlan string
Feb 8, 2021
f822784
Rebase
Feb 8, 2021
40e8e13
Merge comp-integrity
Feb 9, 2021
6e60c7c
Merge master
Feb 9, 2021
1321eaa
Merge branch 'expected-dag' of https://github.com/andrewlawhh/opaque …
Feb 9, 2021
b4ba2db
Join update (#145)
wzheng Feb 9, 2021
375de7f
Merge join update
Feb 9, 2021
8682f22
Integrate new join
Feb 9, 2021
c21cb7b
Add expected operator for sortexec
Feb 10, 2021
c1adf85
Merge comp-integrity with join update
Feb 10, 2021
9391435
Merge comp-integrity with join update
Feb 10, 2021
2b37dab
Merge join integration with expected dag update
Feb 10, 2021
8a93c6c
Remove some print statements
Feb 10, 2021
c190aae
Migrate from Travis CI to Github Actions (#156)
octaviansima Feb 10, 2021
41ea7b9
Upgrade to OE 0.12 (#153)
wzheng Feb 12, 2021
29da474
Update README.md
wzheng Feb 13, 2021
4d89ecb
Support for scalar subquery (#157)
wzheng Feb 18, 2021
96e6285
Add TPC-H Benchmarks (#139)
octaviansima Feb 19, 2021
b350992
Construct expected DAG from dataframe physical plan
Feb 23, 2021
20f4749
Refactor collect and add integrity checking helper function to Opaque…
Feb 23, 2021
3c28b5f
Float expressions (#160)
wzheng Feb 23, 2021
a4a6ff9
Broadcast Nested Loop Join - Left Anti and Left Semi (#159)
octaviansima Feb 24, 2021
a96abc5
Move join condition handling for equi-joins into enclave code (#164)
wzheng Feb 26, 2021
a5278a4
Distinct aggregation support (#163)
octaviansima Mar 1, 2021
e9b075b
Remove addExpectedOperator from JobVerificationEngine, add comments
Mar 4, 2021
dabc178
Implement expected DAG construction by doing graph manipulation on da…
Mar 4, 2021
38c9da5
Merge
Mar 15, 2021
98bcfdb
Fix merge errors in the test cases
Mar 15, 2021
592ec17
Fix merge errors
Mar 15, 2021
e3e140d
Merge BNLJ into integrity branch
Apr 2, 2021
67fd713
Merge join logic migration into integrity branch
Apr 2, 2021
29db9e6
Merge join logic migration into integrity branch
Apr 2, 2021
886eda8
Merge distinct aggregation support into integrity branch
Apr 2, 2021
1fb4a5a
Fix merge errors
Apr 2, 2021
8ba5f75
fix treeToList to skip visited vertices and operatorDAGFromPlan to pr…
Apr 10, 2021
898a1b4
Add descriptive comments to each function and class
Apr 11, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Move join condition handling for equi-joins into enclave code (#164)
* Add in TPC-H 21

* Add condition processing in enclave code

* Code clean up

* Enable query 18

* WIP

* Local tests pass

* Apply suggestions from code review

Co-authored-by: octaviansima <34696537+octaviansima@users.noreply.github.com>

* WIP

* Address comments

* q21.sql

Co-authored-by: octaviansima <34696537+octaviansima@users.noreply.github.com>
wzheng and octaviansima authored Feb 26, 2021
commit a96abc5b30ba459334848641c0f3d9e2700ded62
30 changes: 15 additions & 15 deletions src/enclave/Enclave/ExpressionEvaluation.h
Original file line number Diff line number Diff line change
@@ -1789,6 +1789,7 @@ class FlatbuffersJoinExprEvaluator {
const tuix::JoinExpr* join_expr = flatbuffers::GetRoot<tuix::JoinExpr>(buf);

join_type = join_expr->join_type();
condition_eval = nullptr;
if (join_expr->condition() != NULL) {
condition_eval = std::unique_ptr<FlatbuffersExpressionEvaluator>(
new FlatbuffersExpressionEvaluator(join_expr->condition()));
@@ -1797,9 +1798,6 @@ class FlatbuffersJoinExprEvaluator {

if (join_expr->left_keys() != NULL && join_expr->right_keys() != NULL) {
is_equi_join = true;
if (join_expr->condition() != NULL) {
throw std::runtime_error("Equi join cannot have condition");
}
if (join_expr->left_keys()->size() != join_expr->right_keys()->size()) {
throw std::runtime_error("Mismatched join key lengths");
}
@@ -1835,14 +1833,12 @@ class FlatbuffersJoinExprEvaluator {
return is_primary(row1) ? row1 : row2;
}

/** Return true if the two rows satisfy the join condition. */
bool eval_condition(const tuix::Row *row1, const tuix::Row *row2) {
/** Return true if the two rows are from the same join group
* Since the function calls `is_primary`, the rows must have been tagged in Scala */
bool is_same_group(const tuix::Row *row1, const tuix::Row *row2) {
builder.Clear();
bool row1_equals_row2;

/** Check equality for equi joins. If it is a non-equi join,
* the key evaluators will be empty, so the code never enters the for loop.
*/
auto &row1_evaluators = is_primary(row1) ? left_key_evaluators : right_key_evaluators;
auto &row2_evaluators = is_primary(row2) ? left_key_evaluators : right_key_evaluators;
for (uint32_t i = 0; i < row1_evaluators.size(); i++) {
@@ -1855,9 +1851,8 @@ class FlatbuffersJoinExprEvaluator {
auto row2_field = flatbuffers::GetTemporaryPointer<tuix::Field>(builder, row2_eval_offset);

flatbuffers::Offset<tuix::Field> comparison = eval_binary_comparison<tuix::EqualTo, std::equal_to>(
builder,
row1_field,
row2_field);
builder, row1_field, row2_field);

row1_equals_row2 =
static_cast<const tuix::BooleanField *>(
flatbuffers::GetTemporaryPointer<tuix::Field>(
@@ -1868,9 +1863,12 @@ class FlatbuffersJoinExprEvaluator {
return false;
}
}
return true;
}

/* Check condition for non-equi joins */
if (!is_equi_join) {
/** Evaluate condition on the two input rows */
bool eval_condition(const tuix::Row *row1, const tuix::Row *row2) {
if (condition_eval != nullptr) {
std::vector<flatbuffers::Offset<tuix::Field>> concat_fields;
for (auto field : *row1->field_values()) {
concat_fields.push_back(flatbuffers_copy<tuix::Field>(field, builder));
@@ -1880,11 +1878,13 @@ class FlatbuffersJoinExprEvaluator {
}
flatbuffers::Offset<tuix::Row> concat = tuix::CreateRowDirect(builder, &concat_fields);
const tuix::Row *concat_ptr = flatbuffers::GetTemporaryPointer<tuix::Row>(builder, concat);

const tuix::Field *condition_result = condition_eval->eval(concat_ptr);

return static_cast<const tuix::BooleanField *>(condition_result->value())->value();
}

// The `condition_eval` can only be empty when it's an equi-join.
// Since `condition_eval` is an extra predicate used to filter out *matched* rows in an equi-join, an empty
// condition means the matched row should not be filtered out; hence the default return value of true
return true;
}

141 changes: 94 additions & 47 deletions src/enclave/Enclave/NonObliviousSortMergeJoin.cpp
Original file line number Diff line number Diff line change
@@ -5,9 +5,53 @@
#include "FlatbuffersWriters.h"
#include "common.h"

/** C++ implementation of a non-oblivious sort merge join.
/**
* C++ implementation of a non-oblivious sort merge join.
* Rows MUST be tagged primary or secondary for this to work.
*/

void test_rows_same_group(FlatbuffersJoinExprEvaluator &join_expr_eval,
const tuix::Row *primary,
const tuix::Row *current) {
if (!join_expr_eval.is_same_group(primary, current)) {
throw std::runtime_error(
std::string("Invariant violation: rows of primary_group "
"are not of the same group: ")
+ to_string(primary)
+ std::string(" vs ")
+ to_string(current));
}
}

void write_output_rows(RowWriter &group, RowWriter &w) {
auto group_buffer = group.output_buffer();
RowReader group_reader(group_buffer.view());

while (group_reader.has_next()) {
const tuix::Row *row = group_reader.next();
w.append(row);
}
}

/**
* Sort merge equi join algorithm
* Input: the rows are unioned from both the primary (or left) table and the non-primary (or right) table
*
* Outer loop: iterate over all input rows
*
* If it's a row from the left table
* - Add it to the current group
* - Otherwise start a new group
* - If it's a left semi/anti join, output the primary_matched_rows/primary_unmatched_rows
*
* If it's a row from the right table
* - Inner join: iterate over current left group, output the joined row only if the condition is satisfied
* - Left semi/anti join: iterate over `primary_unmatched_rows`, add a matched row to `primary_matched_rows`
* and remove from `primary_unmatched_rows`
*
* After loop: output the last group left semi/anti join
*/

void non_oblivious_sort_merge_join(
uint8_t *join_expr, size_t join_expr_length,
uint8_t *input_rows, size_t input_rows_length,
@@ -20,81 +64,84 @@ void non_oblivious_sort_merge_join(

RowWriter primary_group;
FlatbuffersTemporaryRow last_primary_of_group;

bool pk_fk_match = false;
RowWriter primary_matched_rows, primary_unmatched_rows; // This is only used for left semi/anti join

while (r.has_next()) {
const tuix::Row *current = r.next();

if (join_expr_eval.is_primary(current)) {
if (last_primary_of_group.get()
&& join_expr_eval.eval_condition(last_primary_of_group.get(), current)) {
&& join_expr_eval.is_same_group(last_primary_of_group.get(), current)) {

// Add this primary row to the current group
// If this is a left semi/anti join, also add the rows to primary_unmatched_rows
primary_group.append(current);
if (join_type == tuix::JoinType_LeftSemi || join_type == tuix::JoinType_LeftAnti) {
primary_unmatched_rows.append(current);
}
last_primary_of_group.set(current);

} else {
// If a new primary group is encountered
if (join_type == tuix::JoinType_LeftAnti && !pk_fk_match) {
auto primary_group_buffer = primary_group.output_buffer();
RowReader primary_group_reader(primary_group_buffer.view());

while (primary_group_reader.has_next()) {
const tuix::Row *primary = primary_group_reader.next();
w.append(primary);
}
if (join_type == tuix::JoinType_LeftSemi) {
write_output_rows(primary_matched_rows, w);
} else if (join_type == tuix::JoinType_LeftAnti) {
write_output_rows(primary_unmatched_rows, w);
}

primary_group.clear();
primary_unmatched_rows.clear();
primary_matched_rows.clear();

primary_group.append(current);
primary_unmatched_rows.append(current);
last_primary_of_group.set(current);

pk_fk_match = false;
}
} else {
// Output the joined rows resulting from this foreign row
if (last_primary_of_group.get()
&& join_expr_eval.eval_condition(last_primary_of_group.get(), current)) {
auto primary_group_buffer = primary_group.output_buffer();
RowReader primary_group_reader(primary_group_buffer.view());
while (primary_group_reader.has_next()) {
const tuix::Row *primary = primary_group_reader.next();
&& join_expr_eval.is_same_group(last_primary_of_group.get(), current)) {
if (join_type == tuix::JoinType_Inner) {
auto primary_group_buffer = primary_group.output_buffer();
RowReader primary_group_reader(primary_group_buffer.view());
while (primary_group_reader.has_next()) {
const tuix::Row *primary = primary_group_reader.next();
test_rows_same_group(join_expr_eval, primary, current);

if (!join_expr_eval.eval_condition(primary, current)) {
throw std::runtime_error(
std::string("Invariant violation: rows of primary_group "
"are not of the same group: ")
+ to_string(primary)
+ std::string(" vs ")
+ to_string(current));
if (join_expr_eval.eval_condition(primary, current)) {
w.append(primary, current);
}
}
} else if (join_type == tuix::JoinType_LeftSemi || join_type == tuix::JoinType_LeftAnti) {
auto primary_unmatched_rows_buffer = primary_unmatched_rows.output_buffer();
RowReader primary_unmatched_rows_reader(primary_unmatched_rows_buffer.view());
RowWriter new_primary_unmatched_rows;

if (join_type == tuix::JoinType_Inner) {
w.append(primary, current);
} else if (join_type == tuix::JoinType_LeftSemi) {
// Only output the pk group ONCE
if (!pk_fk_match) {
w.append(primary);
while (primary_unmatched_rows_reader.has_next()) {
const tuix::Row *primary = primary_unmatched_rows_reader.next();
test_rows_same_group(join_expr_eval, primary, current);
if (join_expr_eval.eval_condition(primary, current)) {
primary_matched_rows.append(primary);
} else {
new_primary_unmatched_rows.append(primary);
}
}

// Reset primary_unmatched_rows
primary_unmatched_rows.clear();
auto new_primary_unmatched_rows_buffer = new_primary_unmatched_rows.output_buffer();
RowReader new_primary_unmatched_rows_reader(new_primary_unmatched_rows_buffer.view());
while (new_primary_unmatched_rows_reader.has_next()) {
primary_unmatched_rows.append(new_primary_unmatched_rows_reader.next());
}
}

pk_fk_match = true;
} else {
// If pk_fk_match were true, and the code got to here, then that means the group match has not been "cleared" yet
// It will be processed when the code advances to the next pk group
pk_fk_match &= true;
}
}
}

if (join_type == tuix::JoinType_LeftAnti && !pk_fk_match) {
auto primary_group_buffer = primary_group.output_buffer();
RowReader primary_group_reader(primary_group_buffer.view());

while (primary_group_reader.has_next()) {
const tuix::Row *primary = primary_group_reader.next();
w.append(primary);
}
if (join_type == tuix::JoinType_LeftSemi) {
write_output_rows(primary_matched_rows, w);
} else if (join_type == tuix::JoinType_LeftAnti) {
write_output_rows(primary_unmatched_rows, w);
}

w.output_buffer(output_rows, output_rows_length);
4 changes: 2 additions & 2 deletions src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala
Original file line number Diff line number Diff line change
@@ -1259,7 +1259,7 @@ object Utils extends Logging {
def serializeJoinExpression(
joinType: JoinType, leftKeys: Option[Seq[Expression]], rightKeys: Option[Seq[Expression]],
leftSchema: Seq[Attribute], rightSchema: Seq[Attribute],
condition: Option[Expression] = None): Array[Byte] = {
condition: Option[Expression]): Array[Byte] = {
val builder = new FlatBufferBuilder
builder.finish(
tuix.JoinExpr.createJoinExpr(
@@ -1298,7 +1298,7 @@ object Utils extends Logging {
condition match {
case Some(condition) =>
flatbuffersSerializeExpression(builder, condition, leftSchema ++ rightSchema)
case _ => 0
case None => 0
}))
builder.sizedByteArray()
}
Original file line number Diff line number Diff line change
@@ -24,7 +24,7 @@ import org.apache.spark.sql.SQLContext
object TPCHBenchmark {

// Add query numbers here once they are supported
val supportedQueries = Seq(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 14, 15, 17, 19, 20, 22)
val supportedQueries = Seq(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 14, 15, 17, 18, 19, 20, 21, 22)

def query(queryNumber: Int, tpch: TPCH, sqlContext: SQLContext, numPartitions: Int) = {
val sqlStr = tpch.getQuery(queryNumber)
Original file line number Diff line number Diff line change
@@ -281,6 +281,7 @@ case class EncryptedSortMergeJoinExec(
rightKeys: Seq[Expression],
leftSchema: Seq[Attribute],
rightSchema: Seq[Attribute],
condition: Option[Expression],
child: SparkPlan)
extends UnaryExecNode with OpaqueOperatorExec {

@@ -293,7 +294,7 @@ case class EncryptedSortMergeJoinExec(

override def executeBlocked(): RDD[Block] = {
val joinExprSer = Utils.serializeJoinExpression(
joinType, Some(leftKeys), Some(rightKeys), leftSchema, rightSchema)
joinType, Some(leftKeys), Some(rightKeys), leftSchema, rightSchema, condition)

timeOperator(
child.asInstanceOf[OpaqueOperatorExec].executeBlocked(),
8 changes: 2 additions & 6 deletions src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala
Original file line number Diff line number Diff line change
@@ -98,19 +98,15 @@ object OpaqueOperators extends Strategy {
rightKeysProj,
leftProjSchema.map(_.toAttribute),
rightProjSchema.map(_.toAttribute),
condition,
sorted)

val tagsDropped = joinType match {
case Inner => EncryptedProjectExec(dropTags(left.output, right.output), joined)
case LeftSemi | LeftAnti => EncryptedProjectExec(left.output, joined)
}

val filtered = condition match {
case Some(condition) => EncryptedFilterExec(condition, tagsDropped)
case None => tagsDropped
}

filtered :: Nil
tagsDropped :: Nil

// Used to match non-equi joins
case Join(left, right, joinType, condition, hint) if isEncrypted(left) && isEncrypted(right) =>
Original file line number Diff line number Diff line change
@@ -321,11 +321,20 @@ trait OpaqueOperatorTests extends OpaqueTestsBase { self =>
val p_data = for (i <- 1 to 16) yield (i, (i % 8).toString, i * 10)
val f_data = for (i <- 1 to 32) yield (i, (i % 8).toString, i * 10)
val p = makeDF(p_data, securityLevel, "id1", "join_col_1", "x")
val f = makeDF(f_data, securityLevel, "id2", "join_col_2", "x")
val f = makeDF(f_data, securityLevel, "id2", "join_col_2", "y")
val df = p.join(f, $"join_col_1" === $"join_col_2", "left_semi").sort($"join_col_1", $"id1")
df.collect
}

testAgainstSpark("left semi join with condition") { securityLevel =>
val p_data = for (i <- 1 to 16) yield (i, (i % 8).toString, i * 10)
val f_data = for (i <- 1 to 32) yield (i, (i % 8).toString, i * 10)
val p = makeDF(p_data, securityLevel, "id1", "join_col_1", "x")
val f = makeDF(f_data, securityLevel, "id2", "join_col_2", "y")
val df = p.join(f, $"join_col_1" === $"join_col_2" && $"x" > $"y", "left_semi").sort($"join_col_1", $"id1")
df.collect
}

testAgainstSpark("non-equi left semi join") { securityLevel =>
val p_data = for (i <- 1 to 16) yield (i, (i % 8).toString, i * 10)
val f_data = for (i <- 1 to 32) yield (i, (i % 8).toString, i * 10)
@@ -344,7 +353,7 @@ trait OpaqueOperatorTests extends OpaqueTestsBase { self =>
df.collect
}

testAgainstSpark("left anti join 1") { securityLevel =>
testAgainstSpark("left anti join") { securityLevel =>
val p_data = for (i <- 1 to 128) yield (i, (i % 16).toString, i * 10)
val f_data = for (i <- 1 to 256 if (i % 3) + 1 == 0 || (i % 3) + 5 == 0) yield (i, i.toString, i * 10)
val p = makeDF(p_data, securityLevel, "id", "join_col_1", "x")
@@ -353,6 +362,15 @@ trait OpaqueOperatorTests extends OpaqueTestsBase { self =>
df.collect
}

testAgainstSpark("left anti join with condition") { securityLevel =>
val p_data = for (i <- 1 to 16) yield (i, (i % 8).toString, i * 10)
val f_data = for (i <- 1 to 32) yield (i, (i % 8).toString, i * 10)
val p = makeDF(p_data, securityLevel, "id1", "join_col_1", "x")
val f = makeDF(f_data, securityLevel, "id2", "join_col_2", "y")
val df = p.join(f, $"join_col_1" === $"join_col_2" && $"x" > $"y", "left_anti").sort($"join_col_1", $"id1")
df.collect
}

testAgainstSpark("non-equi left anti join 1") { securityLevel =>
val p_data = for (i <- 1 to 128) yield (i, (i % 16).toString, i * 10)
val f_data = for (i <- 1 to 256 if (i % 3) + 1 == 0 || (i % 3) + 5 == 0) yield (i, i.toString, i * 10)