Skip to content

Commit

Permalink
osc: support skip_checksum_for_modified for more cases
Browse files Browse the repository at this point in the history
Summary: This diff adds support for `skip_checksum_for_modified` in both native checksum and fixes some code paths that were omitted in D4606292, like delta checksum and full table checksum.

Reviewed By: preritj24

Differential Revision: D65899326

fbshipit-source-id: d0dfdfd14a3e41e36af7e8a69305bbfe78b3ee00
  • Loading branch information
alexbudfb authored and facebook-github-bot committed Nov 14, 2024
1 parent 0cf88bc commit 002fcf3
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 22 deletions.
47 changes: 30 additions & 17 deletions core/lib/payload/copy.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,9 +272,9 @@ def dropped_column_name_list(self):
@property
def old_column_list(self):
"""
list of column names for all the columns in the old schema except
the ones are being dropped in the new schema.
It will be used in query construction for checksum
list of column names for all the columns in the old schema except the
ones are being dropped in the new schema. Used to create triggers and
delta table.
"""
return [
col.name
Expand All @@ -295,19 +295,22 @@ def old_non_pk_column_list(self):
and col.name not in self.dropped_column_name_list
]

@property
def checksum_column_list(self):
def checksum_column_list(self, exclude_pk: bool):
"""
A list of non-pk column name suitable for comparing checksum
A list of column names suitable for comparing checksums. `exclude_pk`
causes this function to exclude primary key columns, for use when the
caller already provides them through another means.
"""
column_list = []
# Create a mapping from the new table's column names to their definitions
# to detect changes to column definitions between old and new tables.
new_columns = {col.name: col for col in self._new_table.column_list}
old_pk_name_list = [c.name for c in self._old_table.primary_key.column_list]
for col in self._old_table.column_list:
if col.name in old_pk_name_list:
if exclude_pk and col.name in old_pk_name_list:
continue
if col.name in self.dropped_column_name_list:
continue
new_columns = {col.name: col for col in self._new_table.column_list}
if col != new_columns[col.name]:
if self.skip_checksum_for_modified:
continue
Expand Down Expand Up @@ -2897,12 +2900,16 @@ def checksum_full_table(self) -> None:
"""
# Calculate checksum for old table
old_checksum = self.query(
sql.checksum_full_table(self.table_name, self.old_column_list)
sql.checksum_full_table(
self.table_name, self.checksum_column_list(exclude_pk=False)
)
)

# Calculate checksum for new table
new_checksum = self.query(
sql.checksum_full_table(self.new_table_name, self.old_column_list)
sql.checksum_full_table(
self.new_table_name, self.checksum_column_list(exclude_pk=False)
)
)
self.commit()

Expand All @@ -2919,7 +2926,9 @@ def checksum_full_table_native(self) -> None:
checksums = []
for table in [self.table_name, self.new_table_name]:
log.info(f"Calculating checksum for {table}")
sql_query = sql.checksum_full_table_native(table, self.old_column_list)
sql_query = sql.checksum_full_table_native(
table, self.checksum_column_list(exclude_pk=False)
)
checksums.append(
# Take the first row only as only one is expected.
self.query(sql_query)[0]
Expand All @@ -2945,7 +2954,7 @@ def checksum_for_single_chunk(self, table_name, use_where, idx_for_checksum):
return self.query(
sql.checksum_by_chunk_with_assign(
table_name,
self.checksum_column_list,
self.checksum_column_list(exclude_pk=True),
self._pk_for_filter,
self.range_start_vars_array,
self.range_end_vars_array,
Expand All @@ -2966,7 +2975,9 @@ def dump_current_chunk(self, use_where):
"""
log.info("Dumping raw data onto local disk for further investigation")
log.info("Columns will be dumped in following order: ")
log.info(", ".join(self._pk_for_filter + self.checksum_column_list))
log.info(
", ".join(self._pk_for_filter + self.checksum_column_list(exclude_pk=True))
)
for table_name in [self.table_name, self.new_table_name]:
if table_name == self.new_table_name:
# index for new schema can be any indexes that provides
Expand All @@ -2991,7 +3002,7 @@ def dump_current_chunk(self, use_where):
self.execute_sql(
sql.dump_current_chunk(
table_name,
self.checksum_column_list,
self.checksum_column_list(exclude_pk=True),
self._pk_for_filter,
self.range_start_vars_array,
self.select_chunk_size,
Expand Down Expand Up @@ -3066,7 +3077,7 @@ def checksum_by_chunk(
checksum: list[dict[str, int], ...] = self.query(
sql.checksum_by_chunk(
table_name,
self.checksum_column_list,
self.checksum_column_list(exclude_pk=True),
self._pk_for_filter,
self.range_start_vars_array,
self.range_end_vars_array,
Expand All @@ -3083,7 +3094,7 @@ def checksum_by_chunk(
self.execute_sql(
sql.dump_current_chunk(
table_name,
self.checksum_column_list,
self.checksum_column_list(exclude_pk=True),
self._pk_for_filter,
self.range_start_vars_array,
self.select_checksum_chunk_size,
Expand Down Expand Up @@ -3375,7 +3386,9 @@ def checksum_by_replay_chunk(self, table_name):
sql.checksum_by_replay_chunk(
table_name,
self.delta_table_name,
self.old_column_list,
# This query only uses PK for the join condition, so don't
# exclude them from the checksum itself.
self.checksum_column_list(exclude_pk=False),
self._pk_for_filter,
self.IDCOLNAME,
id_limit,
Expand Down
6 changes: 6 additions & 0 deletions core/lib/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -908,6 +908,12 @@ def checksum_by_chunk(
using_where: bool,
force_index: str = "PRIMARY",
) -> str:
"""
Generate a SQL query to run an aggregate function over a table, taking
checksums of the columns named in `columns` and `pk_list`, where the latter
is also used to assign to SQL user variables for continuation, to facilitate
a chunk-by-chunk checksum.
"""
if using_where:
row_range = get_range_start_condition(pk_list, range_start_values)
where_clause = " WHERE {} ".format(row_range)
Expand Down
23 changes: 18 additions & 5 deletions core/tests/copy_payload.py
Original file line number Diff line number Diff line change
Expand Up @@ -817,27 +817,40 @@ def test_checksum_column_list(self):
payload._old_table = table_obj
payload._new_table = table_obj
# No change in the schema
self.assertEqual(payload.checksum_column_list, ["col1", "col2"])
self.assertEqual(
payload.checksum_column_list(exclude_pk=True), ["col1", "col2"]
)
self.assertEqual(
payload.checksum_column_list(exclude_pk=False), ["ID", "col1", "col2"]
)

# changed column being kept
payload._new_table = table_obj_new
payload.skip_checksum_for_modified = False
self.assertEqual(payload.checksum_column_list, ["col1", "col2"])
self.assertEqual(
payload.checksum_column_list(exclude_pk=True), ["col1", "col2"]
)
self.assertEqual(
payload.checksum_column_list(exclude_pk=False), ["ID", "col1", "col2"]
)

# skip changed
payload._new_table = table_obj_new
payload.skip_checksum_for_modified = True
self.assertEqual(payload.checksum_column_list, ["col1"])
self.assertEqual(payload.checksum_column_list(exclude_pk=True), ["col1"])
self.assertEqual(payload.checksum_column_list(exclude_pk=False), ["ID", "col1"])

# skip dropped
payload._new_table = table_obj_dropped
payload.skip_checksum_for_modified = False
self.assertEqual(payload.checksum_column_list, ["col2"])
self.assertEqual(payload.checksum_column_list(exclude_pk=True), ["col2"])
self.assertEqual(payload.checksum_column_list(exclude_pk=False), ["ID", "col2"])

# skip dropped
payload._new_table = table_obj_dropped
payload.skip_checksum_for_modified = False
self.assertEqual(payload.checksum_column_list, ["col2"])
self.assertEqual(payload.checksum_column_list(exclude_pk=True), ["col2"])
self.assertEqual(payload.checksum_column_list(exclude_pk=False), ["ID", "col2"])

def test_parse_session_overrides_str_empty(self):
payload = self.payload_setup()
Expand Down

0 comments on commit 002fcf3

Please sign in to comment.