Feature/v0.4.0 #197

Merged
3 commits merged on Nov 29, 2024
Changes from all commits
12 changes: 7 additions & 5 deletions dbt/adapters/fabricsparknb/catalog.py
@@ -10,6 +10,10 @@ def ListRelations(profile):
# Load JSON data from file
data = json.load(file)

table = agate.Table.from_object(data)

return table

for row in data:
# Access the 'information' field
information = row.get('information', None)
@@ -39,9 +43,7 @@ def ListRelations(profile):
#row.update(parsed_data)
#row.pop('information')
# Convert the data to an Agate table
table = agate.Table.from_object(data)

return table



@staticmethod
@@ -80,13 +82,13 @@ def ListSchema(profile, schema):
with io.open(profile.project_root + '/metaextracts/ListSchemas.json', 'r') as file:
# Load JSON data from file
data = json.load(file)

table = agate.Table.from_object(data)

#transforming Database/schema name to lower case
schema = schema.lower()

# Filter the table
filtered_table = table.where(lambda row: row['namespace'] == schema)
filtered_table = table.where(lambda row: str(row['namespace']).lower() == schema)

return filtered_table
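
For reference, the ListSchema change above makes the namespace comparison case-insensitive before filtering. A minimal standalone sketch of the same agate pattern, using made-up sample data:

import agate

# Hypothetical sample of the metaextracts/ListSchemas.json content
data = [{"namespace": "Sales"}, {"namespace": "finance"}]
table = agate.Table.from_object(data)

schema = "SALES".lower()  # incoming schema name, normalised to lower case
filtered = table.where(lambda row: str(row["namespace"]).lower() == schema)
print(len(filtered.rows))  # 1, matches regardless of stored casing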
9 changes: 7 additions & 2 deletions dbt/adapters/fabricsparknb/impl.py
@@ -198,10 +198,15 @@ def quote(self, identifier: str) -> str: # type: ignore
def _get_relation_information(self, row: agate.Row) -> RelationInfo:
"""relation info was fetched with SHOW TABLES EXTENDED"""
try:
_schema, name, _, information = row
_schema = row['namespace']
name = row['tableName']
information = row['information']
except ValueError:
msg:str = ""
for r in row:
msg += str(r) + "; "
raise dbt.exceptions.DbtRuntimeError(
f'Invalid value from "show tables extended ...", got {len(row)} values, expected 4'
f'Invalid value from "show tables extended ...", got {len(row)} values, expected 4 {msg}'
)

return _schema, name, information
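The _get_relation_information change above replaces positional unpacking of the agate.Row with name-based access, so extra or reordered columns in the metaextract no longer break the parse. A small sketch of the access pattern, with an assumed row shape:

import agate

# Hypothetical row shape produced by the ListRelations metaextract
data = [{"namespace": "dbo", "tableName": "orders",
         "isTemporary": False, "information": "Provider: delta"}]
row = agate.Table.from_object(data).rows[0]

# Name-based access tolerates column order changes and extra columns
_schema, name, information = row["namespace"], row["tableName"], row["information"]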
32 changes: 31 additions & 1 deletion dbt/adapters/fabricsparknb/notebook.py
@@ -27,6 +27,7 @@ def GetIncludeDir():
# print(str(path))
return (path)


class ModelNotebook:
def __init__(self, nb : nbf.NotebookNode = None, node_type='model'):

@@ -62,12 +63,41 @@ def SetTheSqlVariable(self):
x = 1
for sql in self.sql:
# Create a new code cell
new_cell = nbf.v4.new_code_cell(source="sql = '''" + sql + "'''\n" + "spark.sql(sql)")
cell_sql = """
sql = '''""" + sql + "'''\n" + """
for s in sql.split(';\\n'):
spark.sql(s)
"""
new_cell = nbf.v4.new_code_cell(source=cell_sql)
# Insert the new cell into the middle of the notebook
self.nb.cells.insert((i + x), new_cell)
x += 1
break

def SetThePythonPreScript(self, project_root, file_name, lakehouse_name):
python_models_dir = Path(project_root) / Path('models_python')
# print(python_models_dir)
# print(file_name)
# If the path does not exist, create it
if not os.path.exists(python_models_dir):
os.makedirs(python_models_dir)

# Replace the .ipynb extension with .py
file_name = file_name.replace(".ipynb", ".py")

# Check if a matching file exists in the models_python directory
if os.path.exists(python_models_dir / Path(file_name)):
data = ""
# Read the file
with io.open(python_models_dir / Path(file_name), 'r') as file:
data = file.read()
data = re.sub(r"\{\{\s*lakehouse_name\s*\}\}", lakehouse_name, data)
for i, cell in enumerate(self.nb.cells):
if cell.cell_type == 'markdown' and "# Pre-Execution Python Script" in cell.source:
new_cell = nbf.v4.new_code_cell(source=data)
self.nb.cells.insert((i + 1), new_cell)
break

def GetSparkSqlCells(self):
# Get the existing SQL Cell from the notebook. It will be the code cell following the markdown cell containing "# SPARK SQL Cell for Debugging"
spark_sql_cells = []
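Two behaviours are added to notebook.py above: SetTheSqlVariable now splits the compiled SQL on ';\n' because spark.sql() executes only one statement per call, and SetThePythonPreScript injects an optional pre-execution script into the generated notebook. A minimal sketch of what the generated SQL cell ends up doing (the table and statements are illustrative):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()  # Fabric notebooks already provide a session

sql = '''CREATE TABLE IF NOT EXISTS demo (id INT);
INSERT INTO demo VALUES (1)'''

# spark.sql() accepts a single statement, hence the split on ';\n'
for s in sql.split(';\n'):
    spark.sql(s)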
1 change: 1 addition & 0 deletions dbt/adapters/fabricsparknb/utils.py
@@ -264,6 +264,7 @@ def SetSqlVariableForAllNotebooks(project_root, lakehouse_name):
# Gather the Spark SQL from the notebook and set the sql variable
mnb.GatherSql()
mnb.SetTheSqlVariable()
mnb.SetThePythonPreScript()
# always set the config in first code cell
mnb.nb.cells[1].source = mnb.nb.cells[1].source.replace("{{lakehouse_name}}", lakehouse_name)

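The call wired in above relies on SetThePythonPreScript finding a models_python/<model>.py file in the project and injecting its contents, with {{ lakehouse_name }} substituted, after the "# Pre-Execution Python Script" markdown cell. A hedged sketch of such a pre-script file (path and contents are illustrative):

# models_python/my_model.py (illustrative)
# {{ lakehouse_name }} is replaced with the target lakehouse when the notebook is generated
lakehouse = "{{ lakehouse_name }}"
print(f"Running pre-execution setup against {lakehouse} before the model SQL")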
@@ -0,0 +1,79 @@
{% macro get_incremental_delete_reload_sql(arg_dict) %}

{% do return(delete_reload_macro_with_sql(arg_dict["target_relation"], arg_dict["temp_relation"], arg_dict["unique_key"], arg_dict["dest_columns"], arg_dict["incremental_predicates"])) %}

{% endmacro %}


{% macro delete_reload_macro_with_sql(target_relation, temp_relation, unique_key, dest_columns, incremental_predicates) %}

{%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}

DELETE FROM {{ target_relation }}
insert into {{ target_relation }} ({{ dest_cols_csv }})
(
select {{ dest_cols_csv }}
from {{ temp_relation }}
);

{% endmacro %}




{% macro dbt_spark_validate_get_incremental_strategy(raw_strategy, file_format) %}
{#-- Validate the incremental strategy #}

{% set invalid_strategy_msg -%}
Invalid incremental strategy provided: {{ raw_strategy }}
Expected one of: 'append', 'merge', 'insert_overwrite', 'delete_reload'
{%- endset %}

{% set invalid_merge_msg -%}
Invalid incremental strategy provided: {{ raw_strategy }}
You can only choose this strategy when file_format is set to 'delta' or 'iceberg' or 'hudi'
{%- endset %}

{% set invalid_insert_overwrite_endpoint_msg -%}
Invalid incremental strategy provided: {{ raw_strategy }}
You cannot use this strategy when connecting via endpoint
Use the 'append' or 'merge' strategy instead
{%- endset %}

{% if raw_strategy not in ['append', 'merge', 'insert_overwrite','delete_reload'] %}
{% do exceptions.raise_compiler_error(invalid_strategy_msg) %}
{%-else %}
{% if raw_strategy == 'merge' and file_format not in ['delta', 'iceberg', 'hudi'] %}
{% do exceptions.raise_compiler_error(invalid_merge_msg) %}
{% endif %}
{% if raw_strategy == 'insert_overwrite' and file_format not in ['delta', 'iceberg', 'hudi'] %} -- and target.endpoint
{% do exceptions.raise_compiler_error(invalid_insert_overwrite_endpoint_msg) %}
{% endif %}
{% endif %}

{% do return(raw_strategy) %}
{% endmacro %}


{% macro dbt_spark_get_incremental_sql(strategy, source, target, existing, unique_key, incremental_predicates) %}
{%- if strategy == 'append' -%}
{#-- insert new records into existing table, without updating or overwriting #}
{{ get_insert_into_sql(source, target) }}
{%- elif strategy == 'delete_reload' -%}
{#-- insert new records into existing table, without updating or overwriting #}
DELETE FROM {{ target }};
{{ get_insert_into_sql(source, target) }}
{%- elif strategy == 'insert_overwrite' -%}
{#-- insert statements don't like CTEs, so support them via a temp view #}
{{ get_insert_overwrite_sql(source, target, existing) }}
{%- elif strategy == 'merge' -%}
{#-- merge all columns for datasources which implement MERGE INTO (e.g. databricks, iceberg) - schema changes are handled for us #}
{{ get_merge_sql(target, source, unique_key, dest_columns=none, incremental_predicates=incremental_predicates) }}
{%- else -%}
{% set no_sql_for_strategy_msg -%}
No known SQL for the incremental strategy provided: {{ strategy }}
{%- endset %}
{%- do exceptions.raise_compiler_error(no_sql_for_strategy_msg) -%}
{%- endif -%}

{% endmacro %}
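
The new delete_reload strategy amounts to emptying the target table and reinserting every row from the temp relation. A rough Python illustration of the statements it is expected to produce (relation names and the SELECT list are placeholders; the macro actually inserts an explicit column list):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

target = "lakehouse.fact_sales"          # placeholder target relation
temp = "lakehouse.fact_sales__dbt_tmp"   # placeholder temp relation

for stmt in (f"DELETE FROM {target}",
             f"INSERT INTO {target} SELECT * FROM {temp}"):
    spark.sql(stmt)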
10 changes: 6 additions & 4 deletions dbt/include/fabricsparknb/notebooks/master_notebook.ipynb
@@ -186,7 +186,11 @@
"outputs": [],
"source": [
"# First make sure that current hash info is the latest for the environment\n",
"mssparkutils.notebook.run(\"metadata_{{ project_name }}_extract\")"
"embedded_hashcheck = {{ notebook_hashcheck }} # type: ignore\n",
"if embedded_hashcheck == 0:\n",
" print('Metadata Hash Check Bypassed')\n",
"else:\n",
" mssparkutils.notebook.run(\"metadata_{{ project_name }}_extract\", {{ notebook_timeout }})"
]
},
{
@@ -197,7 +201,6 @@
"source": [
"embedded_hashes = {{ hashes }} # type: ignore\n",
"RelativePathForMetaData = \"Files/MetaExtracts/\"\n",
"current_hashes = json.loads(get_file_content_using_notebookutils(RelativePathForMetaData + 'MetaHashes.json'))\n",
"\n",
"def get_hash(file, hashes):\n",
" ret = \"\"\n",
@@ -206,12 +209,11 @@
" return h['hash']\n",
" return ret\n",
"\n",
"embedded_hashcheck = {{ notebook_hashcheck }} # type: ignore\n",
"\n",
"##Hashcheck: BYPASS = 0, WARNING = 1, ERROR = 2\n",
"if embedded_hashcheck == 0:\n",
" print('Metadata Hash Check Bypassed')\n",
"else:\n",
" current_hashes = json.loads(get_file_content_using_notebookutils(RelativePathForMetaData + 'MetaHashes.json'))\n",
" if current_hashes != embedded_hashes:\n",
" for h in embedded_hashes:\n",
" print(\n",
(Changes to a different notebook file; name not shown in this view)
@@ -123,7 +123,7 @@
" start_time = time.time()\n",
"\n",
" try:\n",
" mssparkutils.notebook.run(notebook_file)\n",
" mssparkutils.notebook.run(notebook_file, {{ notebook_timeout }})\n",
" status = 'success'\n",
" error = None\n",
" except Exception as e:\n",
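Both notebook templates now forward the templated timeout to mssparkutils.notebook.run, whose second argument is a timeout in seconds. A hedged example of the rendered call, with placeholder values:

# mssparkutils is preloaded in Fabric/Synapse notebook sessions
notebook_timeout = 1800  # seconds; substituted for {{ notebook_timeout }} at generation time
mssparkutils.notebook.run("metadata_my_project_extract", notebook_timeout)  # project name is a placeholder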