Commit
Merge pull request #197 from Insight-Services-APAC/feature/v0.4.0
Feature/v0.4.0
jrampono authored Nov 29, 2024
2 parents c8077db + ebb4da6 commit 0be31cf
Showing 16 changed files with 331 additions and 67 deletions.
12 changes: 7 additions & 5 deletions dbt/adapters/fabricsparknb/catalog.py
@@ -10,6 +10,10 @@ def ListRelations(profile):
# Load JSON data from file
data = json.load(file)

table = agate.Table.from_object(data)

return table

for row in data:
# Access the 'information' field
information = row.get('information', None)
@@ -39,9 +43,7 @@ def ListRelations(profile):
#row.update(parsed_data)
#row.pop('information')
# Convert the data to an Agate table
table = agate.Table.from_object(data)

return table



@staticmethod
@@ -80,13 +82,13 @@ def ListSchema(profile, schema):
with io.open(profile.project_root + '/metaextracts/ListSchemas.json', 'r') as file:
# Load JSON data from file
data = json.load(file)

table = agate.Table.from_object(data)

#transforming Database/schema name to lower case
schema = schema.lower()

# Filter the table
filtered_table = table.where(lambda row: row['namespace'] == schema)
filtered_table = table.where(lambda row: str(row['namespace']).lower() == schema)

return filtered_table
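In catalog.py, ListRelations now builds and returns the agate table straight from the metaextract JSON, and ListSchema lowercases both the requested schema and the namespace column before filtering. A minimal sketch of that case-insensitive filter, with invented sample rows standing in for metaextracts/ListSchemas.json:

```python
import agate

# Illustrative metadata rows; the real data comes from metaextracts/ListSchemas.json
data = [
    {"namespace": "Sales", "tableName": "orders"},
    {"namespace": "sales", "tableName": "customers"},
    {"namespace": "finance", "tableName": "ledger"},
]

table = agate.Table.from_object(data)

# Case-insensitive schema match, mirroring the updated filter in ListSchema
schema = "SALES".lower()
filtered_table = table.where(lambda row: str(row["namespace"]).lower() == schema)

print(len(filtered_table.rows))  # both 'Sales' and 'sales' rows survive the filter
```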
9 changes: 7 additions & 2 deletions dbt/adapters/fabricsparknb/impl.py
@@ -198,10 +198,15 @@ def quote(self, identifier: str) -> str: # type: ignore
def _get_relation_information(self, row: agate.Row) -> RelationInfo:
"""relation info was fetched with SHOW TABLES EXTENDED"""
try:
_schema, name, _, information = row
_schema = row['namespace']
name = row['tableName']
information = row['information']
except ValueError:
msg:str = ""
for r in row:
msg += str(r) + "; "
raise dbt.exceptions.DbtRuntimeError(
f'Invalid value from "show tables extended ...", got {len(row)} values, expected 4'
f'Invalid value from "show tables extended ...", got {len(row)} values, expected 4 {msg}'
)

return _schema, name, information
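In impl.py, _get_relation_information now reads the agate row by column name (namespace, tableName, information) instead of unpacking a fixed four-value tuple, and the error message includes the row's contents. A small sketch of the same named-access pattern with an illustrative error path; the sample data and the plain RuntimeError are stand-ins for the adapter's DbtRuntimeError:

```python
import agate

def relation_info(row):
    """Mirror of the named-column access in _get_relation_information (sketch)."""
    try:
        _schema = row["namespace"]
        name = row["tableName"]
        information = row["information"]
    except KeyError as e:
        # The adapter appends the row contents to its error message; do the same here.
        detail = "; ".join(str(v) for v in row)
        raise RuntimeError(f"Unexpected row shape ({len(row)} values): {detail}") from e
    return _schema, name, information

table = agate.Table.from_object([
    {"namespace": "dbo", "tableName": "orders", "information": "Provider: delta"},
])
print(relation_info(table.rows[0]))
```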
32 changes: 31 additions & 1 deletion dbt/adapters/fabricsparknb/notebook.py
@@ -27,6 +27,7 @@ def GetIncludeDir():
# print(str(path))
return (path)


class ModelNotebook:
def __init__(self, nb : nbf.NotebookNode = None, node_type='model'):

@@ -62,12 +63,41 @@ def SetTheSqlVariable(self):
x = 1
for sql in self.sql:
# Create a new code cell
new_cell = nbf.v4.new_code_cell(source="sql = '''" + sql + "'''\n" + "spark.sql(sql)")
cell_sql = """
sql = '''""" + sql + "'''\n" + """
for s in sql.split(';\\n'):
spark.sql(s)
"""
new_cell = nbf.v4.new_code_cell(source=cell_sql)
# Insert the new cell into the middle of the notebook
self.nb.cells.insert((i + x), new_cell)
x += 1
break

def SetThePythonPreScript(self, project_root, file_name, lakehouse_name):
python_models_dir = Path(project_root) / Path('models_python')
# print(python_models_dir)
# print(file_name)
# If the path does not exist, create it
if not os.path.exists(python_models_dir):
os.makedirs(python_models_dir)

# Remove the .sql extension and replace it with .py
file_name = file_name.replace(".ipynb", ".py")

# Check if a matching file exists in the models_python directory
if os.path.exists(python_models_dir / Path(file_name)):
data = ""
# Read the file
with io.open(python_models_dir / Path(file_name), 'r') as file:
data = file.read()
data = re.sub(r"\{\{\s*lakehouse_name\s*\}\}", lakehouse_name, data)
for i, cell in enumerate(self.nb.cells):
if cell.cell_type == 'markdown' and "# Pre-Execution Python Script" in cell.source:
new_cell = nbf.v4.new_code_cell(source=data)
self.nb.cells.insert((i + 1), new_cell)
break

def GetSparkSqlCells(self):
# Get the existing SQL Cell from the notebook. It will be the code cell following the markdown cell containing "# SPARK SQL Cell for Debugging"
spark_sql_cells = []
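notebook.py gains two behaviours: SetTheSqlVariable now emits a cell that splits the embedded SQL on ';\n' and submits each statement to spark.sql separately (a single spark.sql call accepts only one statement), and the new SetThePythonPreScript injects an optional per-model Python pre-script. A sketch of the generated SQL cell, assuming an illustrative two-statement SQL string (spark itself only exists inside the Fabric notebook runtime):

```python
import nbformat as nbf

# Illustrative multi-statement SQL for one model
sql = "DELETE FROM target_table;\nINSERT INTO target_table SELECT * FROM staging_table"

# Build the cell body the way SetTheSqlVariable does: embed the SQL, then have the
# generated cell split on ';\n' and run each statement with its own spark.sql call.
cell_sql = """
sql = '''""" + sql + "'''\n" + """
for s in sql.split(';\\n'):
    spark.sql(s)
"""

new_cell = nbf.v4.new_code_cell(source=cell_sql)
print(new_cell.source)  # the source that ends up in the model notebook
```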
1 change: 1 addition & 0 deletions dbt/adapters/fabricsparknb/utils.py
@@ -264,6 +264,7 @@ def SetSqlVariableForAllNotebooks(project_root, lakehouse_name):
# Gather the Spark SQL from the notebook and set the sql variable
mnb.GatherSql()
mnb.SetTheSqlVariable()
mnb.SetThePythonPreScript()
# always set the config in first code cell
mnb.nb.cells[1].source = mnb.nb.cells[1].source.replace("{{lakehouse_name}}", lakehouse_name)

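utils.py now calls SetThePythonPreScript for every model notebook, so a matching <model>.py under models_python is templated with the lakehouse name and inserted after the '# Pre-Execution Python Script' markdown cell. A condensed sketch of that injection step (the script text, marker, and names here are illustrative):

```python
import re
import nbformat as nbf

def inject_pre_script(nb, script_text: str, lakehouse_name: str) -> None:
    """Insert a pre-execution code cell after the marker markdown cell (sketch)."""
    # Resolve the {{ lakehouse_name }} placeholder, as the adapter does with re.sub
    script_text = re.sub(r"\{\{\s*lakehouse_name\s*\}\}", lakehouse_name, script_text)
    for i, cell in enumerate(nb.cells):
        if cell.cell_type == "markdown" and "# Pre-Execution Python Script" in cell.source:
            nb.cells.insert(i + 1, nbf.v4.new_code_cell(source=script_text))
            break

nb = nbf.v4.new_notebook()
nb.cells.append(nbf.v4.new_markdown_cell("# Pre-Execution Python Script"))
inject_pre_script(nb, "spark.sql('USE {{ lakehouse_name }}')", "my_lakehouse")
print(nb.cells[1].source)
```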
79 changes: 79 additions & 0 deletions (new file)
@@ -0,0 +1,79 @@
{% macro get_incremental_delete_reload_sql(arg_dict) %}

{% do return(delete_reload_macro_with_sql(arg_dict["target_relation"], arg_dict["temp_relation"], arg_dict["unique_key"], arg_dict["dest_columns"], arg_dict["incremental_predicates"])) %}

{% endmacro %}


{% macro delete_reload_macro_with_sql(target_relation, temp_relation, unique_key, dest_columns, incremental_predicates) %}

{%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}

DELETE FROM {{ target_relation }}
insert into {{ target_relation }} ({{ dest_cols_csv }})
(
select {{ dest_cols_csv }}
from {{ temp_relation }}
);

{% endmacro %}




{% macro dbt_spark_validate_get_incremental_strategy(raw_strategy, file_format) %}
{#-- Validate the incremental strategy #}

{% set invalid_strategy_msg -%}
Invalid incremental strategy provided: {{ raw_strategy }}
Expected one of: 'append', 'merge', 'insert_overwrite', 'delete_reload'
{%- endset %}

{% set invalid_merge_msg -%}
Invalid incremental strategy provided: {{ raw_strategy }}
You can only choose this strategy when file_format is set to 'delta' or 'iceberg' or 'hudi'
{%- endset %}

{% set invalid_insert_overwrite_endpoint_msg -%}
Invalid incremental strategy provided: {{ raw_strategy }}
You cannot use this strategy when connecting via endpoint
Use the 'append' or 'merge' strategy instead
{%- endset %}

{% if raw_strategy not in ['append', 'merge', 'insert_overwrite','delete_reload'] %}
{% do exceptions.raise_compiler_error(invalid_strategy_msg) %}
{%-else %}
{% if raw_strategy == 'merge' and file_format not in ['delta', 'iceberg', 'hudi'] %}
{% do exceptions.raise_compiler_error(invalid_merge_msg) %}
{% endif %}
{% if raw_strategy == 'insert_overwrite' and file_format not in ['delta', 'iceberg', 'hudi'] %} -- and target.endpoint
{% do exceptions.raise_compiler_error(invalid_insert_overwrite_endpoint_msg) %}
{% endif %}
{% endif %}

{% do return(raw_strategy) %}
{% endmacro %}


{% macro dbt_spark_get_incremental_sql(strategy, source, target, existing, unique_key, incremental_predicates) %}
{%- if strategy == 'append' -%}
{#-- insert new records into existing table, without updating or overwriting #}
{{ get_insert_into_sql(source, target) }}
{%- elif strategy == 'delete_reload' -%}
{#-- insert new records into existing table, without updating or overwriting #}
DELETE FROM {{ target }};
{{ get_insert_into_sql(source, target) }}
{%- elif strategy == 'insert_overwrite' -%}
{#-- insert statements don't like CTEs, so support them via a temp view #}
{{ get_insert_overwrite_sql(source, target, existing) }}
{%- elif strategy == 'merge' -%}
{#-- merge all columns for datasources which implement MERGE INTO (e.g. databricks, iceberg) - schema changes are handled for us #}
{{ get_merge_sql(target, source, unique_key, dest_columns=none, incremental_predicates=incremental_predicates) }}
{%- else -%}
{% set no_sql_for_strategy_msg -%}
No known SQL for the incremental strategy provided: {{ strategy }}
{%- endset %}
{%- do exceptions.raise_compiler_error(no_sql_for_strategy_msg) -%}
{%- endif -%}

{% endmacro %}
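This new macro file adds a delete_reload incremental strategy: delete every row in the target relation, then re-insert the full contents of the temp relation. The validation macro accepts it for any file format, while merge and insert_overwrite still require delta, iceberg or hudi. A rough Python rendering of the validation rule and of the SQL shape the strategy emits (relation and column names are placeholders):

```python
VALID_STRATEGIES = ["append", "merge", "insert_overwrite", "delete_reload"]
DELTA_ONLY = {"merge", "insert_overwrite"}  # require delta/iceberg/hudi per the macro

def validate_strategy(raw_strategy: str, file_format: str) -> str:
    """Mirror of dbt_spark_validate_get_incremental_strategy (sketch)."""
    if raw_strategy not in VALID_STRATEGIES:
        raise ValueError(f"Invalid incremental strategy provided: {raw_strategy}")
    if raw_strategy in DELTA_ONLY and file_format not in ("delta", "iceberg", "hudi"):
        raise ValueError(f"{raw_strategy} requires file_format delta, iceberg or hudi")
    return raw_strategy

def delete_reload_sql(target: str, temp: str, cols: list) -> str:
    """Approximate SQL emitted by delete_reload_macro_with_sql (sketch)."""
    col_csv = ", ".join(cols)
    return (
        f"DELETE FROM {target}\n"
        f"insert into {target} ({col_csv})\n"
        f"(\n    select {col_csv}\n    from {temp}\n);"
    )

print(validate_strategy("delete_reload", "parquet"))
print(delete_reload_sql("lh.dim_customer", "lh.dim_customer__dbt_tmp", ["id", "name"]))
```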
10 changes: 6 additions & 4 deletions dbt/include/fabricsparknb/notebooks/master_notebook.ipynb
@@ -186,7 +186,11 @@
"outputs": [],
"source": [
"# First make sure that current hash info is the latest for the environment\n",
"mssparkutils.notebook.run(\"metadata_{{ project_name }}_extract\")"
"embedded_hashcheck = {{ notebook_hashcheck }} # type: ignore\n",
"if embedded_hashcheck == 0:\n",
" print('Metadata Hash Check Bypassed')\n",
"else:\n",
" mssparkutils.notebook.run(\"metadata_{{ project_name }}_extract\", {{ notebook_timeout }})"
]
},
{
@@ -197,7 +201,6 @@
"source": [
"embedded_hashes = {{ hashes }} # type: ignore\n",
"RelativePathForMetaData = \"Files/MetaExtracts/\"\n",
"current_hashes = json.loads(get_file_content_using_notebookutils(RelativePathForMetaData + 'MetaHashes.json'))\n",
"\n",
"def get_hash(file, hashes):\n",
" ret = \"\"\n",
@@ -206,12 +209,11 @@
" return h['hash']\n",
" return ret\n",
"\n",
"embedded_hashcheck = {{ notebook_hashcheck }} # type: ignore\n",
"\n",
"##Hashcheck: BYPASS = 0, WARNING = 1, ERROR = 2\n",
"if embedded_hashcheck == 0:\n",
" print('Metadata Hash Check Bypassed')\n",
"else:\n",
" current_hashes = json.loads(get_file_content_using_notebookutils(RelativePathForMetaData + 'MetaHashes.json'))\n",
" if current_hashes != embedded_hashes:\n",
" for h in embedded_hashes:\n",
" print(\n",
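In master_notebook.ipynb, both the metadata-extract run and the hash comparison are now gated on the embedded hash-check level (BYPASS = 0, WARNING = 1, ERROR = 2), the current hashes are only loaded when the check is not bypassed, and notebook runs receive the templated timeout. A sketch of that gating, with stand-in values for the templated hashes and for the notebookutils loader:

```python
import json

# Stand-ins for the values templated into the notebook
embedded_hashcheck = 2            # BYPASS = 0, WARNING = 1, ERROR = 2
embedded_hashes = [{"file": "ListSchemas.json", "hash": "abc123"}]
notebook_timeout = 1800

def get_hash(file: str, hashes: list) -> str:
    """Return the stored hash for a metaextract file, as in the notebook."""
    for h in hashes:
        if h["file"] == file:
            return h["hash"]
    return ""

def load_current_hashes() -> list:
    # Placeholder for get_file_content_using_notebookutils(".../MetaHashes.json")
    return json.loads('[{"file": "ListSchemas.json", "hash": "def456"}]')

if embedded_hashcheck == 0:
    print("Metadata Hash Check Bypassed")
else:
    # In Fabric: mssparkutils.notebook.run("metadata_<project>_extract", notebook_timeout)
    current_hashes = load_current_hashes()
    if current_hashes != embedded_hashes:
        for h in embedded_hashes:
            print(h["file"], get_hash(h["file"], current_hashes))
```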
(additional changed file; name not shown)
@@ -123,7 +123,7 @@
" start_time = time.time()\n",
"\n",
" try:\n",
" mssparkutils.notebook.run(notebook_file)\n",
" mssparkutils.notebook.run(notebook_file, {{ notebook_timeout }})\n",
" status = 'success'\n",
" error = None\n",
" except Exception as e:\n",
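The per-model execution notebook now passes the templated notebook_timeout to mssparkutils.notebook.run and keeps recording status, error and elapsed time for each run. A self-contained sketch of that loop, with the run call stubbed out (in Fabric it would be mssparkutils.notebook.run(notebook_file, notebook_timeout)):

```python
import time

notebook_timeout = 1800  # seconds, templated as {{ notebook_timeout }} in the notebook

def run_notebook(notebook_file: str, timeout: int) -> None:
    # Stand-in for mssparkutils.notebook.run(notebook_file, timeout)
    print(f"running {notebook_file} with a {timeout}s timeout")

results = []
for notebook_file in ["model_a", "model_b"]:
    start_time = time.time()
    try:
        run_notebook(notebook_file, notebook_timeout)
        status, error = "success", None
    except Exception as e:  # mirror the notebook's broad exception handling
        status, error = "error", str(e)
    results.append({
        "notebook": notebook_file,
        "status": status,
        "error": error,
        "elapsed": time.time() - start_time,
    })

print(results)
```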
(remaining changed files not loaded in this view)
