From 16fd96e0a9949198b29243c9b737901ac4a129bf Mon Sep 17 00:00:00 2001 From: John Rampono Date: Fri, 4 Oct 2024 23:41:22 +0800 Subject: [PATCH 1/3] Adding --- dbt/adapters/fabricsparknb/catalog.py | 12 ++- dbt/adapters/fabricsparknb/impl.py | 9 +- .../notebooks/master_notebook.ipynb | 10 +- .../notebooks/master_notebook_x.ipynb | 2 +- .../notebooks/metadata_extract.ipynb | 2 +- .../notebooks/model_notebook.ipynb | 4 +- dbt_wrapper/fabric_sql.py | 4 +- dbt_wrapper/generate_files.py | 22 ++-- dbt_wrapper/main.py | 69 +++++++++++- dbt_wrapper/utils.py | 21 ++-- dbt_wrapper/wrapper.py | 102 +++++++++++++----- file.duckdb | Bin 0 -> 12288 bytes test.py | 16 +++ 13 files changed, 208 insertions(+), 65 deletions(-) create mode 100644 file.duckdb create mode 100644 test.py diff --git a/dbt/adapters/fabricsparknb/catalog.py b/dbt/adapters/fabricsparknb/catalog.py index 7a5276b..e977253 100644 --- a/dbt/adapters/fabricsparknb/catalog.py +++ b/dbt/adapters/fabricsparknb/catalog.py @@ -10,6 +10,10 @@ def ListRelations(profile): # Load JSON data from file data = json.load(file) + table = agate.Table.from_object(data) + + return table + for row in data: # Access the 'information' field information = row.get('information', None) @@ -39,9 +43,7 @@ def ListRelations(profile): #row.update(parsed_data) #row.pop('information') # Convert the data to an Agate table - table = agate.Table.from_object(data) - - return table + @staticmethod @@ -80,13 +82,13 @@ def ListSchema(profile, schema): with io.open(profile.project_root + '/metaextracts/ListSchemas.json', 'r') as file: # Load JSON data from file data = json.load(file) - + table = agate.Table.from_object(data) #transforming Database/schema name to lower case schema = schema.lower() # Filter the table - filtered_table = table.where(lambda row: row['namespace'] == schema) + filtered_table = table.where(lambda row: str(row['namespace']).lower() == schema) return filtered_table diff --git a/dbt/adapters/fabricsparknb/impl.py b/dbt/adapters/fabricsparknb/impl.py index ce3a636..4f9ba76 100644 --- a/dbt/adapters/fabricsparknb/impl.py +++ b/dbt/adapters/fabricsparknb/impl.py @@ -198,10 +198,15 @@ def quote(self, identifier: str) -> str: # type: ignore def _get_relation_information(self, row: agate.Row) -> RelationInfo: """relation info was fetched with SHOW TABLES EXTENDED""" try: - _schema, name, _, information = row + _schema = row['namespace'] + name = row['tableName'] + information = row['information'] except ValueError: + msg:str = "" + for r in row: + msg += str(r) + "; " raise dbt.exceptions.DbtRuntimeError( - f'Invalid value from "show tables extended ...", got {len(row)} values, expected 4' + f'Invalid value from "show tables extended ...", got {len(row)} values, expected 4 {msg}' ) return _schema, name, information diff --git a/dbt/include/fabricsparknb/notebooks/master_notebook.ipynb b/dbt/include/fabricsparknb/notebooks/master_notebook.ipynb index 7b6ea86..488f6c8 100644 --- a/dbt/include/fabricsparknb/notebooks/master_notebook.ipynb +++ b/dbt/include/fabricsparknb/notebooks/master_notebook.ipynb @@ -186,7 +186,11 @@ "outputs": [], "source": [ "# First make sure that current hash info is the latest for the environment\n", - "mssparkutils.notebook.run(\"metadata_{{ project_name }}_extract\")" + "embedded_hashcheck = {{ notebook_hashcheck }} # type: ignore\n", + "if embedded_hashcheck == 0:\n", + " print('Metadata Hash Check Bypassed')\n", + "else:\n", + " mssparkutils.notebook.run(\"metadata_{{ project_name }}_extract\", {{ notebook_timeout }})" ] }, { @@ 
-197,7 +201,6 @@ "source": [ "embedded_hashes = {{ hashes }} # type: ignore\n", "RelativePathForMetaData = \"Files/MetaExtracts/\"\n", - "current_hashes = json.loads(get_file_content_using_notebookutils(RelativePathForMetaData + 'MetaHashes.json'))\n", "\n", "def get_hash(file, hashes):\n", " ret = \"\"\n", @@ -206,12 +209,11 @@ " return h['hash']\n", " return ret\n", "\n", - "embedded_hashcheck = {{ notebook_hashcheck }} # type: ignore\n", - "\n", "##Hashcheck: BYPASS = 0, WARNING = 1, ERROR = 2\n", "if embedded_hashcheck == 0:\n", " print('Metadata Hash Check Bypassed')\n", "else:\n", + " current_hashes = json.loads(get_file_content_using_notebookutils(RelativePathForMetaData + 'MetaHashes.json'))\n", " if current_hashes != embedded_hashes:\n", " for h in embedded_hashes:\n", " print(\n", diff --git a/dbt/include/fabricsparknb/notebooks/master_notebook_x.ipynb b/dbt/include/fabricsparknb/notebooks/master_notebook_x.ipynb index f55b43d..f156b6c 100644 --- a/dbt/include/fabricsparknb/notebooks/master_notebook_x.ipynb +++ b/dbt/include/fabricsparknb/notebooks/master_notebook_x.ipynb @@ -123,7 +123,7 @@ " start_time = time.time()\n", "\n", " try:\n", - " mssparkutils.notebook.run(notebook_file)\n", + " mssparkutils.notebook.run(notebook_file, {{ notebook_timeout }})\n", " status = 'success'\n", " error = None\n", " except Exception as e:\n", diff --git a/dbt/include/fabricsparknb/notebooks/metadata_extract.ipynb b/dbt/include/fabricsparknb/notebooks/metadata_extract.ipynb index c0a6a3f..a62b6b5 100644 --- a/dbt/include/fabricsparknb/notebooks/metadata_extract.ipynb +++ b/dbt/include/fabricsparknb/notebooks/metadata_extract.ipynb @@ -1 +1 @@ -{"cells":[{"cell_type":"markdown","id":"70d877e7","metadata":{},"source":["[comment]: # (Attach Default Lakehouse Markdown Cell)\n","# 📌 Attach Default Lakehouse\n","❗**Note the code in the cell that follows is required to programatically attach the lakehouse and enable the running of spark.sql(). 
If this cell fails simply restart your session as this cell MUST be the first command executed on session start.**"]},{"cell_type":"code","execution_count":null,"id":"dce04845","metadata":{},"outputs":[],"source":["%%configure\n","{\n"," \"defaultLakehouse\": { \n"," \"name\": \"{{lakehouse_name}}\",\n"," }\n","}"]},{"cell_type":"markdown","id":"2bdb5b27","metadata":{},"source":["# 📦 Pip\n","Pip installs reqired specifically for this template should occur here"]},{"cell_type":"code","execution_count":null,"id":"07752518","metadata":{},"outputs":[],"source":["# No pip installs needed for this notebook"]},{"cell_type":"markdown","id":"11c9ff11","metadata":{},"source":["# 🔗 Imports"]},{"cell_type":"code","execution_count":null,"id":"fe19e753","metadata":{},"outputs":[],"source":["from notebookutils import mssparkutils # type: ignore\n","from pyspark.sql.types import StructType, StructField, StringType, BooleanType\n","from pyspark.sql.functions import lower\n","from dataclasses import dataclass\n","import json\n","import hashlib\n","import yaml\n"]},{"cell_type":"markdown","id":"0013b888-1464-4664-a1f4-d382d141ca44","metadata":{"nteract":{"transient":{"deleting":false}}},"source":["# { } Params"]},{"cell_type":"code","execution_count":53,"id":"a5e27e4a-df79-4ac0-a338-1dda0ac03f4c","metadata":{"jupyter":{"outputs_hidden":false,"source_hidden":false},"nteract":{"transient":{"deleting":false}}},"outputs":[{"data":{"application/vnd.livy.statement-meta+json":{"execution_finish_time":"2024-04-27T02:52:24.0250181Z","execution_start_time":"2024-04-27T02:52:23.7665395Z","livy_statement_state":"available","parent_msg_id":"690af652-34e3-4abf-bdac-9edb238ab97f","queued_time":"2024-04-27T02:52:23.0985456Z","session_id":"79594e22-45e2-4587-a343-c5b3f00bc4fb","session_start_time":null,"spark_pool":null,"state":"finished","statement_id":55,"statement_ids":[55]},"text/plain":["StatementMeta(, 79594e22-45e2-4587-a343-c5b3f00bc4fb, 55, Finished, Available)"]},"metadata":{},"output_type":"display_data"}],"source":["# Set Lakehouse\n","RelativePathForMetaData = \"Files/MetaExtracts/\""]},{"cell_type":"markdown","id":"242ad5a9","metadata":{},"source":["# #️⃣ Functions"]},{"cell_type":"code","execution_count":54,"id":"24ad9d35-b433-4e11-86f3-4fce5a017d80","metadata":{"jupyter":{"outputs_hidden":false,"source_hidden":false},"nteract":{"transient":{"deleting":false}}},"outputs":[{"data":{"application/vnd.livy.statement-meta+json":{"execution_finish_time":"2024-04-27T02:52:24.7167997Z","execution_start_time":"2024-04-27T02:52:24.4727615Z","livy_statement_state":"available","parent_msg_id":"52e68895-2c4e-49e8-8497-a2a85c0608cf","queued_time":"2024-04-27T02:52:23.1511212Z","session_id":"79594e22-45e2-4587-a343-c5b3f00bc4fb","session_start_time":null,"spark_pool":null,"state":"finished","statement_id":56,"statement_ids":[56]},"text/plain":["StatementMeta(, 79594e22-45e2-4587-a343-c5b3f00bc4fb, 56, Finished, Available)"]},"metadata":{},"output_type":"display_data"}],"source":["def execute_sql_and_write_to_file(sql_query, file_path):\n"," # Execute the SQL query\n"," df = spark.sql(sql_query) # type: ignore\n"," df_to_file(df, file_path)\n","\n","def df_to_file(df, file_path):\n","\n"," # Convert the DataFrame to a Pandas DataFrame\n"," pandas_df = df.toPandas()\n","\n"," # Convert the Pandas DataFrame to a string\n"," df_string = pandas_df.to_json(None, orient='records')\n","\n"," # Write the string to the file\n"," mssparkutils.fs.put(file_path, df_string, True) # type: 
ignore\n"]},{"cell_type":"markdown","id":"bb27a6ea-2a9f-44df-a955-b4d224fa50ff","metadata":{"nteract":{"transient":{"deleting":false}}},"source":["# List Schemas Macro Export"]},{"cell_type":"code","execution_count":55,"id":"7e0c30c6-b3c3-45ce-a3b2-9ce0148f6d32","metadata":{"collapsed":false,"jupyter":{"outputs_hidden":false,"source_hidden":false},"microsoft":{},"nteract":{"transient":{"deleting":false}}},"outputs":[{"data":{"application/vnd.livy.statement-meta+json":{"execution_finish_time":"2024-04-27T02:52:27.5580209Z","execution_start_time":"2024-04-27T02:52:25.1256841Z","livy_statement_state":"available","parent_msg_id":"216be99b-ea49-44cd-8f7c-17cbdc225101","queued_time":"2024-04-27T02:52:23.2196536Z","session_id":"79594e22-45e2-4587-a343-c5b3f00bc4fb","session_start_time":null,"spark_pool":null,"state":"finished","statement_id":57,"statement_ids":[57]},"text/plain":["StatementMeta(, 79594e22-45e2-4587-a343-c5b3f00bc4fb, 57, Finished, Available)"]},"metadata":{},"output_type":"display_data"}],"source":["#convert database/ lakehouse name to lower case\n","#execute_sql_and_write_to_file(\"show databases\", f\"{RelativePathForMetaData}/ListSchemas.json\")\n","df_database = spark.sql(\"show databases\")\n","df_database = df_database.withColumn('namespace', lower(df_database['namespace']))\n","\n","df_to_file (df_database, f\"{RelativePathForMetaData}/ListSchemas.json\")"]},{"cell_type":"markdown","id":"e69ab86b-f406-4514-84ef-3f7491fee9e2","metadata":{"nteract":{"transient":{"deleting":false}}},"source":["# List Relations"]},{"cell_type":"code","execution_count":56,"id":"bd001abf-804c-41d5-8681-2c39e88585c2","metadata":{"collapsed":false,"microsoft":{}},"outputs":[{"data":{"application/vnd.livy.statement-meta+json":{"execution_finish_time":"2024-04-27T02:52:36.0489538Z","execution_start_time":"2024-04-27T02:52:28.0456844Z","livy_statement_state":"available","parent_msg_id":"b9553cd2-389d-4a26-83a2-de73ed1757d7","queued_time":"2024-04-27T02:52:23.2910662Z","session_id":"79594e22-45e2-4587-a343-c5b3f00bc4fb","session_start_time":null,"spark_pool":null,"state":"finished","statement_id":58,"statement_ids":[58]},"text/plain":["StatementMeta(, 79594e22-45e2-4587-a343-c5b3f00bc4fb, 58, Finished, Available)"]},"metadata":{},"output_type":"display_data"}],"source":["# Define the schema\n","schema = StructType([\n"," StructField(\"namespace\", StringType(), True),\n"," StructField(\"tableName\", StringType(), True),\n"," StructField(\"isTemporary\", BooleanType(), True),\n"," StructField(\"information\", StringType(), True)\n","])\n","\n","# Create an empty DataFrame with the schema\n","union_df = spark.createDataFrame([], schema) # type: ignore\n","\n","df = spark.sql(\"show databases\") # type: ignore\n","for row in df.collect():\n"," sql = f\"show table extended in {row['namespace']} like '*'\"\n"," df_temp = spark.sql(sql) # type: ignore\n"," \n"," #display(df_temp)\n"," union_df = union_df.union(df_temp)\n","\n","#convert database/ lakehouse name and table name to lower case\n","union_df = union_df.withColumn('namespace', lower(union_df['namespace']))\n","union_df = union_df.withColumn('tableName', lower(union_df['tableName']))\n","\n","df_to_file(union_df, f\"{RelativePathForMetaData}/ListRelations.json\")"]},{"cell_type":"markdown","id":"f9f81007-df29-4388-968c-2c16f7066021","metadata":{"nteract":{"transient":{"deleting":false}}},"source":["# Describe Table 
Extended"]},{"cell_type":"code","execution_count":60,"id":"ac47579c-c536-4150-8160-bad02f59e534","metadata":{"collapsed":false,"jupyter":{"outputs_hidden":false,"source_hidden":false},"nteract":{"transient":{"deleting":false}}},"outputs":[{"data":{"application/vnd.livy.statement-meta+json":{"execution_finish_time":"2024-04-27T03:12:26.4101893Z","execution_start_time":"2024-04-27T03:12:12.0299927Z","livy_statement_state":"available","parent_msg_id":"9da09a4b-0727-4dcf-9938-89588d1c00dc","queued_time":"2024-04-27T03:12:11.5311229Z","session_id":"79594e22-45e2-4587-a343-c5b3f00bc4fb","session_start_time":null,"spark_pool":null,"state":"finished","statement_id":62,"statement_ids":[62]},"text/plain":["StatementMeta(, 79594e22-45e2-4587-a343-c5b3f00bc4fb, 62, Finished, Available)"]},"metadata":{},"output_type":"display_data"}],"source":["import json\n","from pyspark.sql.types import StructType, StructField, StringType, BooleanType # type: ignore\n","from pyspark.sql.functions import lit # type: ignore\n","\n","\n","# Define the schema\n","schema = StructType([\n"," StructField(\"col_name\", StringType(), True),\n"," StructField(\"data_type\", StringType(), True),\n"," StructField(\"comment\", StringType(), True), \n"," StructField(\"namespace\", StringType(), True),\n"," StructField(\"tableName\", StringType(), True),\n","])\n","\n","\n","# Create an empty DataFrame with the schema\n","union_df = spark.createDataFrame([], schema) # type: ignore\n","union_dtable = spark.createDataFrame([], schema) # type: ignore\n","\n","data = spark.sparkContext.wholeTextFiles(f\"{RelativePathForMetaData}/ListRelations.json\").collect() # type: ignore\n","file_content = data[0][1]\n","jo = json.loads(file_content)\n","for j in jo:\n"," sql = f\"use { j['namespace'] }\"\n"," spark.sql(sql) # type: ignore\n"," sql = f\"describe extended { j['tableName'] }\"\n"," df_temp = spark.sql(sql) # type: ignore\n"," \n"," df_temp = df_temp.withColumn('namespace', lit(j['namespace']))\n"," df_temp = df_temp.withColumn('tableName', lit(j['tableName']))\n"," \n"," union_df = union_df.union(df_temp)\n"," \n"," # generate Schema.yml\n"," sqldesc = f\"describe { j['tableName'] }\"\n"," df_dtable = spark.sql(sqldesc) # type: ignore\n"," df_dtable = df_dtable.withColumn('namespace', lower(lit(j['namespace'])))\n"," df_dtable = df_dtable.withColumn('tableName', lower(lit(j['tableName'])))\n"," union_dtable = union_dtable.union(df_dtable)\n","\n","#convert column name to lower case\n","union_df = union_df.withColumn('col_name', lower(union_df['col_name']))\n","union_dtable = union_dtable.withColumn('col_name', lower(union_dtable['col_name']))\n","union_dtable.createOrReplaceTempView(\"describetable\")\n","df_to_file(union_df, f\"{RelativePathForMetaData}/DescribeRelations.json\")"]},{"cell_type":"markdown","id":"9eb9e644","metadata":{},"source":["# Schema yaml generation"]},{"cell_type":"code","execution_count":null,"id":"74deca63","metadata":{},"outputs":[],"source":["tables = {}\n","for item in union_dtable.collect():\n"," #print(item)\n"," table_name = item['tableName']\n"," col_name = item['col_name']\n"," \n"," if table_name not in tables:\n"," tables[table_name] = []\n"," \n"," if col_name: # Only add columns with a name\n"," tables[table_name].append({\n"," \"name\": col_name,\n"," \"description\": \"add column description\"\n"," })\n","\n","output_data = {}\n","output_data[\"version\"] = 2\n","models = []\n","for table_name, columns in tables.items():\n"," models.append({\n"," \"name\": table_name,\n"," \"description\": \"add 
table description\",\n"," \"columns\": columns\n"," })\n","output_data[\"models\"] = models\n","\n","yaml_string = yaml.dump(output_data, default_flow_style=False)\n","RelativePathForShemaTemplate = \"Files/SchemaTemplate\"\n","file_path = f\"{RelativePathForShemaTemplate}/schema.yml\"\n","mssparkutils.fs.put(file_path, yaml_string, True) # type: ignore"]},{"cell_type":"markdown","id":"683a2a2f","metadata":{},"source":["# #️⃣ Create Hash for Files "]},{"cell_type":"code","execution_count":null,"id":"92458c21","metadata":{},"outputs":[],"source":["# Define the schema\n","schema = StructType([\n"," StructField(\"hash\", StringType(), True),\n"," StructField(\"file\", StringType(), True)\n","])\n","\n","files = [\"ListSchemas\",\"DescribeRelations\",\"ListRelations\"]\n","\n","df = spark.createDataFrame([], schema) # type: ignore\n","for file in files:\n"," data = spark.sparkContext.wholeTextFiles(f\"{RelativePathForMetaData}/{file}.json\").collect() # type: ignore\n"," file_content = data[0][1]\n"," hashinfo = {}\n"," hashinfo['hash'] = hashlib.sha256(file_content.encode('utf-8')).hexdigest()\n"," hashinfo['file'] = f\"{file}.json\"\n"," df = df.union(spark.createDataFrame([hashinfo], schema)) # type: ignore\n"," \n","df_to_file(df, f\"{RelativePathForMetaData}/MetaHashes.json\") # type: ignore\n"]}],"metadata":{"dependencies":{"lakehouse":{"default_lakehouse":"031feff6-071d-42df-818a-984771c083c4","default_lakehouse_name":"DataLake","default_lakehouse_workspace_id":"4f0cb887-047a-48a1-98c3-ebdb38c784c2"}},"kernel_info":{"name":"synapse_pyspark"},"kernelspec":{"display_name":"Synapse PySpark","language":"Python","name":"synapse_pyspark"},"language_info":{"name":"python"},"microsoft":{"language":"python","ms_spell_check":{"ms_spell_check_language":"en"}},"nteract":{"version":"nteract-front-end@1.0.0"},"spark_compute":{"compute_id":"/trident/default"},"synapse_widget":{"state":{},"version":"0.1"},"widgets":{}},"nbformat":4,"nbformat_minor":5} +{"cells":[{"cell_type":"markdown","id":"70d877e7","metadata":{},"source":["[comment]: # (Attach Default Lakehouse Markdown Cell)\n","# 📌 Attach Default Lakehouse\n","❗**Note the code in the cell that follows is required to programatically attach the lakehouse and enable the running of spark.sql(). 
If this cell fails simply restart your session as this cell MUST be the first command executed on session start.**"]},{"cell_type":"code","execution_count":null,"id":"dce04845","metadata":{},"outputs":[],"source":["%%configure\n","{\n"," \"defaultLakehouse\": { \n"," \"name\": \"{{lakehouse_name}}\",\n"," }\n","}"]},{"cell_type":"markdown","id":"2bdb5b27","metadata":{},"source":["# 📦 Pip\n","Pip installs reqired specifically for this template should occur here"]},{"cell_type":"code","execution_count":null,"id":"07752518","metadata":{},"outputs":[],"source":["# No pip installs needed for this notebook"]},{"cell_type":"markdown","id":"11c9ff11","metadata":{},"source":["# 🔗 Imports"]},{"cell_type":"code","execution_count":null,"id":"fe19e753","metadata":{},"outputs":[],"source":["from notebookutils import mssparkutils # type: ignore\n","from pyspark.sql.types import StructType, StructField, StringType, BooleanType\n","from pyspark.sql.functions import lower, udf, col, row_number, when, monotonically_increasing_id\n","from pyspark.sql.window import Window\n","from concurrent.futures import ThreadPoolExecutor, as_completed\n","from dataclasses import dataclass\n","import json\n","import hashlib\n","import yaml\n","import re\n"]},{"cell_type":"markdown","id":"0013b888-1464-4664-a1f4-d382d141ca44","metadata":{"nteract":{"transient":{"deleting":false}}},"source":["# { } Params"]},{"cell_type":"code","execution_count":53,"id":"a5e27e4a-df79-4ac0-a338-1dda0ac03f4c","metadata":{"jupyter":{"outputs_hidden":false,"source_hidden":false},"nteract":{"transient":{"deleting":false}}},"outputs":[{"data":{"application/vnd.livy.statement-meta+json":{"execution_finish_time":"2024-04-27T02:52:24.0250181Z","execution_start_time":"2024-04-27T02:52:23.7665395Z","livy_statement_state":"available","parent_msg_id":"690af652-34e3-4abf-bdac-9edb238ab97f","queued_time":"2024-04-27T02:52:23.0985456Z","session_id":"79594e22-45e2-4587-a343-c5b3f00bc4fb","session_start_time":null,"spark_pool":null,"state":"finished","statement_id":55,"statement_ids":[55]},"text/plain":["StatementMeta(, 79594e22-45e2-4587-a343-c5b3f00bc4fb, 55, Finished, Available)"]},"metadata":{},"output_type":"display_data"}],"source":["# Set Lakehouse\n","RelativePathForMetaData = \"Files/metaextracts/\""]},{"cell_type":"markdown","id":"242ad5a9","metadata":{},"source":["# #️⃣ Functions"]},{"cell_type":"code","execution_count":54,"id":"24ad9d35-b433-4e11-86f3-4fce5a017d80","metadata":{"jupyter":{"outputs_hidden":false,"source_hidden":false},"nteract":{"transient":{"deleting":false}}},"outputs":[{"data":{"application/vnd.livy.statement-meta+json":{"execution_finish_time":"2024-04-27T02:52:24.7167997Z","execution_start_time":"2024-04-27T02:52:24.4727615Z","livy_statement_state":"available","parent_msg_id":"52e68895-2c4e-49e8-8497-a2a85c0608cf","queued_time":"2024-04-27T02:52:23.1511212Z","session_id":"79594e22-45e2-4587-a343-c5b3f00bc4fb","session_start_time":null,"spark_pool":null,"state":"finished","statement_id":56,"statement_ids":[56]},"text/plain":["StatementMeta(, 79594e22-45e2-4587-a343-c5b3f00bc4fb, 56, Finished, Available)"]},"metadata":{},"output_type":"display_data"}],"source":["def execute_sql_and_write_to_file(sql_query, file_path):\n"," # Execute the SQL query\n"," df = spark.sql(sql_query) # type: ignore\n"," df_to_file(df, file_path)\n","\n","def df_to_file(df, file_path):\n","\n"," # Convert the DataFrame to a Pandas DataFrame\n"," pandas_df = df.toPandas()\n","\n"," # Convert the Pandas DataFrame to a string\n"," df_string = 
pandas_df.to_json(None, orient='records')\n","\n"," # Write the string to the file\n"," mssparkutils.fs.put(file_path, df_string, True) # type: ignore\n"]},{"cell_type":"markdown","id":"bb27a6ea-2a9f-44df-a955-b4d224fa50ff","metadata":{"nteract":{"transient":{"deleting":false}}},"source":["# List Schemas Macro Export"]},{"cell_type":"code","execution_count":55,"id":"7e0c30c6-b3c3-45ce-a3b2-9ce0148f6d32","metadata":{"collapsed":false,"jupyter":{"outputs_hidden":false,"source_hidden":false},"microsoft":{},"nteract":{"transient":{"deleting":false}}},"outputs":[{"data":{"application/vnd.livy.statement-meta+json":{"execution_finish_time":"2024-04-27T02:52:27.5580209Z","execution_start_time":"2024-04-27T02:52:25.1256841Z","livy_statement_state":"available","parent_msg_id":"216be99b-ea49-44cd-8f7c-17cbdc225101","queued_time":"2024-04-27T02:52:23.2196536Z","session_id":"79594e22-45e2-4587-a343-c5b3f00bc4fb","session_start_time":null,"spark_pool":null,"state":"finished","statement_id":57,"statement_ids":[57]},"text/plain":["StatementMeta(, 79594e22-45e2-4587-a343-c5b3f00bc4fb, 57, Finished, Available)"]},"metadata":{},"output_type":"display_data"}],"source":["#convert database/ lakehouse name to lower case\n","#execute_sql_and_write_to_file(\"show databases\", f\"{RelativePathForMetaData}/ListSchemas.json\")\n","df_database = spark.sql(\"show databases\")\n","df_database = df_database.withColumn('namespace', lower(df_database['namespace']))\n","\n","df_to_file (df_database, f\"{RelativePathForMetaData}/ListSchemas.json\")"]},{"cell_type":"markdown","id":"e69ab86b-f406-4514-84ef-3f7491fee9e2","metadata":{"nteract":{"transient":{"deleting":false}}},"source":["# List Relations"]},{"cell_type":"code","execution_count":56,"id":"bd001abf-804c-41d5-8681-2c39e88585c2","metadata":{"collapsed":false,"microsoft":{}},"outputs":[{"data":{"application/vnd.livy.statement-meta+json":{"execution_finish_time":"2024-04-27T02:52:36.0489538Z","execution_start_time":"2024-04-27T02:52:28.0456844Z","livy_statement_state":"available","parent_msg_id":"b9553cd2-389d-4a26-83a2-de73ed1757d7","queued_time":"2024-04-27T02:52:23.2910662Z","session_id":"79594e22-45e2-4587-a343-c5b3f00bc4fb","session_start_time":null,"spark_pool":null,"state":"finished","statement_id":58,"statement_ids":[58]},"text/plain":["StatementMeta(, 79594e22-45e2-4587-a343-c5b3f00bc4fb, 58, Finished, Available)"]},"metadata":{},"output_type":"display_data"}],"source":["# Define the schema\n","schema = StructType([\n"," StructField(\"namespace\", StringType(), True),\n"," StructField(\"tableName\", StringType(), True),\n"," StructField(\"isTemporary\", BooleanType(), True),\n"," StructField(\"information\", StringType(), True),\n"," StructField(\"type\", StringType(), True)\n","])\n","\n","# Create an empty DataFrame with the schema\n","union_df = spark.createDataFrame([], schema) # type: ignore\n","\n","\n","# Define a UDF to extract the Type from metadata\n","def extract_type(metadata):\n"," match = re.search(r\"Type:\\s*(\\w+)\", metadata)\n"," if match:\n"," return match.group(1)\n"," return None\n","\n","extract_type_udf = udf(extract_type, StringType())\n","\n","# Function to execute SQL operations\n","def execute_sql_operations(namespace):\n"," sql = f\"show table extended in {namespace} like '*'\"\n"," df_temp = spark.sql(sql) # type: ignore\n"," df_temp = df_temp.withColumn(\"type\", extract_type_udf(df_temp[\"information\"]))\n"," \n"," return df_temp\n","\n","\n","# Run SQL operations in parallel using ThreadPoolExecutor\n","def 
run_sql_operations_in_parallel(namespaces):\n"," results = []\n"," with ThreadPoolExecutor() as executor:\n"," future_to_namespace = {executor.submit(execute_sql_operations, row['namespace']): row['namespace'] for row in namespaces}\n"," for future in as_completed(future_to_namespace):\n"," namespace = future_to_namespace[future]\n"," try:\n"," df_temp = future.result()\n"," results.append(df_temp)\n"," except Exception as exc:\n"," print(f'{namespace} generated an exception: {exc}')\n"," return results\n","\n","df = spark.sql(\"show databases\") # type: ignore\n","namespaces = df.collect()\n","\n","# Run SQL operations in parallel and collect results\n","results = run_sql_operations_in_parallel(namespaces)\n","\n","# Union all DataFrames\n","for df_temp in results:\n"," union_df = union_df.union(df_temp)\n","\n","# Lowercasing removed as it causes issues with describe statements eg. describe unable to find objects. ## John Rampono: 11-Sep-24\n","#convert database/ lakehouse name and table name to lower case\n","# union_df = union_df.withColumn('namespace', lower(union_df['namespace']))\n","# union_df = union_df.withColumn('tableName', lower(union_df['tableName']))\n","\n","# Remove noise \n","union_df = union_df.filter((col(\"namespace\") != \"\") & (col(\"namespace\").isNotNull()))\n","\n","df_to_file(union_df, f\"{RelativePathForMetaData}/ListRelations.json\")"]},{"cell_type":"markdown","id":"f9f81007-df29-4388-968c-2c16f7066021","metadata":{"nteract":{"transient":{"deleting":false}}},"source":["# Describe Table Extended"]},{"cell_type":"code","execution_count":60,"id":"ac47579c-c536-4150-8160-bad02f59e534","metadata":{"collapsed":false,"jupyter":{"outputs_hidden":false,"source_hidden":false},"nteract":{"transient":{"deleting":false}}},"outputs":[{"data":{"application/vnd.livy.statement-meta+json":{"execution_finish_time":"2024-04-27T03:12:26.4101893Z","execution_start_time":"2024-04-27T03:12:12.0299927Z","livy_statement_state":"available","parent_msg_id":"9da09a4b-0727-4dcf-9938-89588d1c00dc","queued_time":"2024-04-27T03:12:11.5311229Z","session_id":"79594e22-45e2-4587-a343-c5b3f00bc4fb","session_start_time":null,"spark_pool":null,"state":"finished","statement_id":62,"statement_ids":[62]},"text/plain":["StatementMeta(, 79594e22-45e2-4587-a343-c5b3f00bc4fb, 62, Finished, Available)"]},"metadata":{},"output_type":"display_data"}],"source":["import json\n","from pyspark.sql.types import StructType, StructField, StringType, BooleanType # type: ignore\n","from pyspark.sql.functions import lit # type: ignore\n","\n","\n","# Define the schema\n","schema = StructType([\n"," StructField(\"col_name\", StringType(), True),\n"," StructField(\"data_type\", StringType(), True),\n"," StructField(\"comment\", StringType(), True), \n"," StructField(\"namespace\", StringType(), True),\n"," StructField(\"tableName\", StringType(), True),\n","])\n","\n","\n","# Create an empty DataFrame with the schema\n","union_df = spark.createDataFrame([], schema) # type: ignore\n","union_dtable = spark.createDataFrame([], schema) # type: ignore\n","\n","\n","\n","# Define a function to execute the SQL operations\n","def execute_sql_operations(j):\n"," # Use statement swapped for two part naming as it did not appear to be working ## John Rampono: 11-Sep-24\n"," # sql = f\"use {j['namespace']}\"\n"," # spark.sql(sql) # type: ignore\n"," sql = f\"describe extended {j['namespace']}.{j['tableName']}\"\n"," df_temp = spark.sql(sql) # type: ignore\n"," \n"," # Add a row number column using monotonically_increasing_id\n"," 
df_with_row_num = df_temp.withColumn(\"row_num\", monotonically_increasing_id())\n","\n"," # Identify the first row where col_name is null or empty\n"," first_null_or_empty_row = df_with_row_num.filter((col(\"col_name\") == \"\") | col(\"col_name\").isNull()).orderBy(\"row_num\").limit(1)\n","\n"," # Get the row number of the first null or empty row\n"," first_null_or_empty_row_num = first_null_or_empty_row.select(\"row_num\").collect()\n","\n"," if first_null_or_empty_row_num:\n"," first_null_or_empty_row_num = first_null_or_empty_row_num[0][\"row_num\"]\n"," # Filter the DataFrame to keep only rows before this row\n"," df_filtered = df_with_row_num.filter(col(\"row_num\") < first_null_or_empty_row_num).drop(\"row_num\")\n"," else:\n"," # If no null or empty row is found, keep the original DataFrame\n"," df_filtered = df_temp \n"," \n"," df_filtered = df_filtered.withColumn('namespace', lit(j['namespace']))\n"," df_filtered = df_filtered.withColumn('tableName', lit(j['tableName']))\n","\n"," # generate Schema.yml\n"," sqldesc = f\"describe {j['namespace']}.{j['tableName']}\"\n"," df_dtable = spark.sql(sqldesc) # type: ignore\n"," \n"," # Lowercasing removed as it causes issues with describe statements eg. describe unable to find objects. ## John Rampono: 11-Sep-24\n"," df_dtable = df_dtable.withColumn('namespace', lit(j['namespace']))\n"," df_dtable = df_dtable.withColumn('tableName', lit(j['tableName']))\n","\n"," return df_filtered, df_dtable\n","\n","# Run SQL operations in parallel using ThreadPoolExecutor\n","def run_sql_operations_in_parallel(jo):\n"," union_df = None\n"," union_dtable = None\n","\n"," with ThreadPoolExecutor() as executor:\n"," future_to_sql = {executor.submit(execute_sql_operations, j): j for j in jo}\n"," for future in as_completed(future_to_sql):\n"," j = future_to_sql[future]\n"," try:\n"," df_temp, df_dtable = future.result()\n"," if union_df is None:\n"," union_df = df_temp\n"," else:\n"," union_df = union_df.union(df_temp)\n","\n"," if union_dtable is None:\n"," union_dtable = df_dtable\n"," else:\n"," union_dtable = union_dtable.union(df_dtable)\n"," except Exception as exc:\n"," tableName = j['tableName']\n"," namespace = j['namespace']\n"," print(f'{namespace}.{tableName} generated an exception: {exc}')\n","\n"," return union_df, union_dtable\n","\n","\n","data = spark.sparkContext.wholeTextFiles(f\"{RelativePathForMetaData}/ListRelations.json\").collect() # type: ignore\n","file_content = data[0][1]\n","jo = json.loads(file_content)\n","\n","# Filter out views as these will cause issues ## John Rampono: 11-Sep-24\n","filtered_jo = [item for item in jo if item[\"type\"].lower() == \"managed\"]\n","\n","union_df, union_dtable = run_sql_operations_in_parallel(filtered_jo)\n","\n","# Lowercasing removed as it causes issues with describe statements eg. describe unable to find objects. 
## John Rampono: 11-Sep-24\n","#convert column name to lower case\n","#union_df = union_df.withColumn('col_name', lower(union_df['col_name']))\n","#union_dtable = union_dtable.withColumn('col_name', lower(union_dtable['col_name']))\n","\n","# Do we still need this view?\n","union_dtable.createOrReplaceTempView(\"describetable\")\n","\n","#create file\n","df_to_file(union_df, f\"{RelativePathForMetaData}/DescribeRelations.json\")"]},{"cell_type":"markdown","id":"9eb9e644","metadata":{},"source":["# Schema yaml generation"]},{"cell_type":"code","execution_count":null,"id":"74deca63","metadata":{},"outputs":[],"source":["tables = {}\n","for item in union_dtable.collect():\n"," #print(item)\n"," table_name = item['tableName']\n"," col_name = item['col_name']\n"," \n"," if table_name not in tables:\n"," tables[table_name] = []\n"," \n"," if col_name: # Only add columns with a name\n"," tables[table_name].append({\n"," \"name\": col_name,\n"," \"description\": \"add column description\"\n"," })\n","\n","output_data = {}\n","output_data[\"version\"] = 2\n","models = []\n","for table_name, columns in tables.items():\n"," models.append({\n"," \"name\": table_name,\n"," \"description\": \"add table description\",\n"," \"columns\": columns\n"," })\n","output_data[\"models\"] = models\n","\n","yaml_string = yaml.dump(output_data, default_flow_style=False)\n","RelativePathForShemaTemplate = \"Files/SchemaTemplate\"\n","file_path = f\"{RelativePathForShemaTemplate}/schema.yml\"\n","mssparkutils.fs.put(file_path, yaml_string, True) # type: ignore"]},{"cell_type":"markdown","id":"683a2a2f","metadata":{},"source":["# #️⃣ Create Hash for Files "]},{"cell_type":"code","execution_count":null,"id":"92458c21","metadata":{},"outputs":[],"source":["# Define the schema\n","schema = StructType([\n"," StructField(\"hash\", StringType(), True),\n"," StructField(\"file\", StringType(), True)\n","])\n","\n","files = [\"ListSchemas\",\"DescribeRelations\",\"ListRelations\"]\n","\n","df = spark.createDataFrame([], schema) # type: ignore\n","for file in files:\n"," data = spark.sparkContext.wholeTextFiles(f\"{RelativePathForMetaData}/{file}.json\").collect() # type: ignore\n"," file_content = data[0][1]\n"," hashinfo = {}\n"," hashinfo['hash'] = hashlib.sha256(file_content.encode('utf-8')).hexdigest()\n"," hashinfo['file'] = f\"{file}.json\"\n"," df = df.union(spark.createDataFrame([hashinfo], schema)) # type: ignore\n"," \n","df_to_file(df, f\"{RelativePathForMetaData}/MetaHashes.json\") # type: ignore\n"]}],"metadata":{"dependencies":{"lakehouse":{"default_lakehouse":"031feff6-071d-42df-818a-984771c083c4","default_lakehouse_name":"DataLake","default_lakehouse_workspace_id":"4f0cb887-047a-48a1-98c3-ebdb38c784c2"}},"kernel_info":{"name":"synapse_pyspark"},"kernelspec":{"display_name":"Synapse PySpark","language":"Python","name":"synapse_pyspark"},"language_info":{"name":"python"},"microsoft":{"language":"python","ms_spell_check":{"ms_spell_check_language":"en"}},"nteract":{"version":"nteract-front-end@1.0.0"},"spark_compute":{"compute_id":"/trident/default"},"synapse_widget":{"state":{},"version":"0.1"},"widgets":{}},"nbformat":4,"nbformat_minor":5} diff --git a/dbt/include/fabricsparknb/notebooks/model_notebook.ipynb b/dbt/include/fabricsparknb/notebooks/model_notebook.ipynb index 16d0105..047d692 100644 --- a/dbt/include/fabricsparknb/notebooks/model_notebook.ipynb +++ b/dbt/include/fabricsparknb/notebooks/model_notebook.ipynb @@ -72,7 +72,7 @@ "def pre_execute_notebook(notebook_file):\n", "\n", " try:\n", - " 
mssparkutils.notebook.run(notebook_file)\n", + " mssparkutils.notebook.run(notebook_file, {{ notebook_timeout }})\n", " status = 'PreExecute Notebook Executed'\n", " error = None\n", " except Exception as e:\n", @@ -84,7 +84,7 @@ "def post_execute_notebook(notebook_file):\n", "\n", " try:\n", - " mssparkutils.notebook.run(notebook_file)\n", + " mssparkutils.notebook.run(notebook_file, {{ notebook_timeout }})\n", " status = 'PostExecute Notebook Executed'\n", " error = None\n", " except Exception as e:\n", diff --git a/dbt_wrapper/fabric_sql.py b/dbt_wrapper/fabric_sql.py index c95d9f9..a82766a 100644 --- a/dbt_wrapper/fabric_sql.py +++ b/dbt_wrapper/fabric_sql.py @@ -12,7 +12,7 @@ def __init__(self, console, server, database): self.server = server self.database = database - def ExecuteSQL(self, sql, progress: ProgressConsoleWrapper, task_id): + def ExecuteSQL(self, sql, title, progress: ProgressConsoleWrapper, task_id): credential = AzureCliCredential() # use your authentication mechanism of choice sql_endpoint = f"{self.server}.datawarehouse.fabric.microsoft.com" # copy and paste the SQL endpoint from any of the Lakehouses or Warehouses in your Fabric Workspace @@ -33,7 +33,7 @@ def ExecuteSQL(self, sql, progress: ProgressConsoleWrapper, task_id): rows = cursor.fetchall() # Create a table - table = Table(title="Results") + table = Table(title=title) # Add columns to the table for column in cursor.description: diff --git a/dbt_wrapper/generate_files.py b/dbt_wrapper/generate_files.py index 53f7cc9..5b5fa66 100644 --- a/dbt_wrapper/generate_files.py +++ b/dbt_wrapper/generate_files.py @@ -82,7 +82,7 @@ def GenerateMasterNotebook(project_root, workspaceid, lakehouseid, lakehouse_nam template = env.get_template('master_notebook_x.ipynb') # Render the template with the notebook_file variable - rendered_template = template.render(notebook_files=file_str_with_current_sort_order, run_order=sort_order, lakehouse_name=lakehouse_name, project_name=project_name,max_worker=max_worker, log_lakehouse=log_lakehouse) + rendered_template = template.render(notebook_files=file_str_with_current_sort_order, run_order=sort_order, lakehouse_name=lakehouse_name, project_name=project_name, notebook_timeout=notebook_timeout, max_worker=max_worker, log_lakehouse=log_lakehouse) # Parse the rendered template as a notebook nb = nbf.reads(rendered_template, as_version=4) @@ -126,7 +126,7 @@ def GenerateMasterNotebook(project_root, workspaceid, lakehouseid, lakehouse_nam MetaHashes = Catalog.GetMetaHashes(project_root) # Render the template with the notebook_file variable - rendered_template = template.render(lakehouse_name=lakehouse_name, hashes=MetaHashes, project_name=project_name, notebook_timeout=notebook_timeout, log_lakehouse=log_lakehouse,notebook_hashcheck=notebook_hashcheck) + rendered_template = template.render(lakehouse_name=lakehouse_name, hashes=MetaHashes, project_name=project_name, notebook_timeout=notebook_timeout, log_lakehouse=log_lakehouse, notebook_hashcheck=notebook_hashcheck) # Parse the rendered template as a notebook nb = nbf.reads(rendered_template, as_version=4) @@ -178,7 +178,7 @@ def GenerateMasterNotebook(project_root, workspaceid, lakehouseid, lakehouse_nam raise ex -def GenerateMetadataExtract(project_root, workspaceid, lakehouseid, lakehouse_name, project_name, progress: ProgressConsoleWrapper, task_id, lakehouse_config): +def GenerateMetadataExtract(project_root, workspaceid, lakehouseid, lakehouse_name, project_name, notebook_timeout, progress: ProgressConsoleWrapper, task_id, 
lakehouse_config): notebook_dir = f'./{project_root}/target/notebooks/' # Define the directory containing the Jinja templates template_dir = str((mn.GetIncludeDir()) / Path('notebooks/')) @@ -190,7 +190,7 @@ def GenerateMetadataExtract(project_root, workspaceid, lakehouseid, lakehouse_na template = env.get_template('metadata_extract.ipynb') # Render the template with the notebook_file variable - rendered_template = template.render(workspace_id=workspaceid, lakehouse_id=lakehouseid, project_root=project_root, lakehouse_name=lakehouse_name) + rendered_template = template.render(workspace_id=workspaceid, lakehouse_id=lakehouseid, project_root=project_root, notebook_timeout=notebook_timeout, lakehouse_name=lakehouse_name) # Parse the rendered template as a notebook nb = nbf.reads(rendered_template, as_version=4) @@ -222,8 +222,9 @@ def GenerateMetadataExtract(project_root, workspaceid, lakehouseid, lakehouse_na except Exception as ex: progress.print(f"Error creating: {target_file_name}", level=LogLevel.ERROR) raise ex - -def GenerateUtils(project_root, workspaceid, lakehouseid, lakehouse_name, project_name, progress: ProgressConsoleWrapper, task_id): + + +def GenerateUtils(project_root, workspaceid, lakehouseid, lakehouse_name, project_name, notebook_timeout, progress: ProgressConsoleWrapper, task_id): notebook_dir = f'./{project_root}/target/notebooks/' # Define the directory containing the Jinja templates template_dir = str((mn.GetIncludeDir()) / Path('notebooks/')) @@ -237,7 +238,7 @@ def GenerateUtils(project_root, workspaceid, lakehouseid, lakehouse_name, projec createddate = datetime.now() # Render the template with the notebook_file variable - rendered_template = template.render(lakehouse_name=lakehouse_name, createddate=createddate) + rendered_template = template.render(lakehouse_name=lakehouse_name, createddate=createddate, notebook_timeout=notebook_timeout) # Parse the rendered template as a notebook nb = nbf.reads(rendered_template, as_version=4) @@ -253,7 +254,8 @@ def GenerateUtils(project_root, workspaceid, lakehouseid, lakehouse_name, projec progress.print(f"Error creating: {target_file_name}", level=LogLevel.ERROR) raise ex -def GenerateCompareNotebook(project_root, source_env, workspaceid, lakehouseid, target_env, target_workspaceid, target_lakehouseid, lakehouse_name, project_name, progress: ProgressConsoleWrapper, task_id): + +def GenerateCompareNotebook(project_root, source_env, workspaceid, lakehouseid, target_env, target_workspaceid, target_lakehouseid, lakehouse_name, project_name, notebook_timeout, progress: ProgressConsoleWrapper, task_id): notebook_dir = f'./{project_root}/target/notebooks/' # Define the directory containing the Jinja templates template_dir = str((mn.GetIncludeDir()) / Path('notebooks/')) @@ -270,7 +272,7 @@ def GenerateCompareNotebook(project_root, source_env, workspaceid, lakehouseid, rendered_template = template.render(workspace_id=workspaceid, lakehouse_id=lakehouseid, project_root=project_root , lakehouse_name=lakehouse_name, target_workspace_id=target_workspaceid, target_lakehouse_id=target_lakehouseid , source_env=source_env, target_env=target_env, createddate=createddate - ,project_name=project_name) + , project_name=project_name, notebook_timeout=notebook_timeout) # Parse the rendered template as a notebook nb = nbf.reads(rendered_template, as_version=4) @@ -287,7 +289,7 @@ def GenerateCompareNotebook(project_root, source_env, workspaceid, lakehouseid, raise ex -def GenerateMissingObjectsNotebook(project_root, workspaceid, lakehouseid, 
lakehouse_name, project_name, progress: ProgressConsoleWrapper, task_id, source_env, target_env): +def GenerateMissingObjectsNotebook(project_root, workspaceid, lakehouseid, lakehouse_name, project_name, notebook_timeout, progress: ProgressConsoleWrapper, task_id, source_env, target_env): notebook_dir = f'./{project_root}/target/notebooks/' # Define the directory containing the Jinja templates template_dir = str((mn.GetIncludeDir()) / Path('notebooks/')) diff --git a/dbt_wrapper/main.py b/dbt_wrapper/main.py index 8aaea46..ce4a1bd 100644 --- a/dbt_wrapper/main.py +++ b/dbt_wrapper/main.py @@ -266,7 +266,7 @@ def run_all( se.perform_stage(option=clean_target_dir, action_callables=[wrapper_commands.CleanProjectTargetDirectory], stage_name="Clean Target") action_callables = [ - lambda **kwargs: wrapper_commands.GeneratePreDbtScripts(PreInstall=pre_install, lakehouse_config=lakehouse_config, **kwargs), + lambda **kwargs: wrapper_commands.GeneratePreDbtScripts(PreInstall=pre_install, notebook_timeout=notebook_timeout, lakehouse_config=lakehouse_config, **kwargs), lambda **kwargs: wrapper_commands.ConvertNotebooksToFabricFormat(lakehouse_config=lakehouse_config, **kwargs), ] se.perform_stage(option=generate_pre_dbt_scripts, action_callables=action_callables, stage_name="Generate Pre-DBT Scripts") @@ -290,5 +290,72 @@ def run_all( se.perform_stage(option=auto_run_master_notebook, action_callables=[wrapper_commands.RunMasterNotebook], stage_name="Run Master Notebook") se.perform_stage(option=auto_run_master_notebook, action_callables=[wrapper_commands.GetExecutionResults], stage_name="Get Execution Results") + +@app.command() +def download_metadata( + dbt_project_dir: Annotated[ + str, + typer.Argument( + help="The path to the dbt_project directory. If left blank it will use the current directory" + ), + ], + dbt_profiles_dir: Annotated[ + str, + typer.Argument( + help="The path to the dbt_profile directory. If left blank it will use the users home directory followed by .dbt." + ), + ] = None, + log_level: Annotated[ + Optional[str], + typer.Option( + help="The option to set the log level. This controls the verbosity of the output. Allowed values are `DEBUG`, `INFO`, `WARNING`, `ERROR`. Default is `WARNING`.", + ), + ] = "WARNING" +): + """ + This command will run just the metadata download. + """ + + _log_level: LogLevel = LogLevel.from_string(log_level) + + wrapper_commands.GetDbtConfigs(dbt_project_dir=dbt_project_dir, dbt_profiles_dir=dbt_profiles_dir) + se: stage_executor = stage_executor(log_level=_log_level, console=console) + + se.perform_stage(option=download_metadata, action_callables=[wrapper_commands.DownloadMetadata], stage_name="Download Metadata") + + +@app.command() +def get_execution_results( + dbt_project_dir: Annotated[ + str, + typer.Argument( + help="The path to the dbt_project directory. If left blank it will use the current directory" + ), + ], + dbt_profiles_dir: Annotated[ + str, + typer.Argument( + help="The path to the dbt_profile directory. If left blank it will use the users home directory followed by .dbt." + ), + ] = None, + log_level: Annotated[ + Optional[str], + typer.Option( + help="The option to set the log level. This controls the verbosity of the output. Allowed values are `DEBUG`, `INFO`, `WARNING`, `ERROR`. Default is `WARNING`.", + ), + ] = "WARNING" +): + """ + This command will run just the extract of the last execution results. 
+ """ + + _log_level: LogLevel = LogLevel.from_string(log_level) + + wrapper_commands.GetDbtConfigs(dbt_project_dir=dbt_project_dir, dbt_profiles_dir=dbt_profiles_dir) + se: stage_executor = stage_executor(log_level=_log_level, console=console) + + se.perform_stage(option=True, action_callables=[wrapper_commands.GetExecutionResults], stage_name="Get Execution Results") + + if __name__ == "__main__": app() diff --git a/dbt_wrapper/utils.py b/dbt_wrapper/utils.py index 5bc7b84..7ad6cca 100644 --- a/dbt_wrapper/utils.py +++ b/dbt_wrapper/utils.py @@ -44,9 +44,10 @@ def DownloadFile(progress: ProgressConsoleWrapper, task_id, directory_client: Da def DownloadFiles(progress: ProgressConsoleWrapper, task_id, file_system_client: FileSystemClient, directory_name: str, local_notebook_path: str): - progress.progress.update(task_id=task_id, description="Listing metaextract files on one drive...") - paths = file_system_client.get_paths(path=directory_name) + progress.progress.update(task_id=task_id, description=f"Listing metaextract files for path {directory_name}") + paths = file_system_client.get_paths(path=directory_name) for path in paths: + progress.progress.print(f"Found file: {path.name}") if (path.name[-5:] == ".json"): DownloadFile(progress, task_id, file_system_client, local_notebook_path, path.name) @@ -58,9 +59,13 @@ def DownloadMetaFiles(progress: ProgressConsoleWrapper, task_id, dbt_project_dir account_name = "onelake" # always this account_url = f"https://{account_name}.dfs.fabric.microsoft.com" local_notebook_path = str(Path(Path(dbt_project_dir) / Path('metaextracts'))) - token_credential = DefaultAzureCredential() - service_client = DataLakeServiceClient(account_url, credential=token_credential) - file_system_client = service_client.get_file_system_client(workspacename) - DownloadFiles(progress, task_id, file_system_client, datapath, local_notebook_path) - progress.progress.update(task_id=task_id, description="Completed download of meta extracts") - + try: + token_credential = DefaultAzureCredential() + service_client = DataLakeServiceClient(account_url, credential=token_credential) + file_system_client = service_client.get_file_system_client(workspacename) + DownloadFiles(progress, task_id, file_system_client, datapath, local_notebook_path) + progress.progress.update(task_id=task_id, description="Completed download of meta extracts") + except Exception as e: + progress.progress.print(f"Error downloading meta extracts: Workspacename: {workspacename}, DataPath: {datapath}, LocalPath: {local_notebook_path}") + raise e + diff --git a/dbt_wrapper/wrapper.py b/dbt_wrapper/wrapper.py index 2c23b0c..50d33c6 100644 --- a/dbt_wrapper/wrapper.py +++ b/dbt_wrapper/wrapper.py @@ -14,7 +14,7 @@ from dbt_wrapper.stage_executor import ProgressConsoleWrapper from rich import print from rich.panel import Panel - +from time import sleep class Commands: @@ -32,7 +32,10 @@ def __init__(self, console): self.next_env_name = None def GetDbtConfigs(self, dbt_project_dir, dbt_profiles_dir=None, source_env=None, target_env=None): - if len(dbt_project_dir.replace("\\", "/").split("/")) > 1: + path = Path(dbt_project_dir.replace("\\", "/")) + path_elements = path.parts + num_elements = len(path_elements) + if num_elements > 1: self.console.print( "Warning: :file_folder: The dbt_project_dir provided is nested and not a valid dbt project directory in windows. 
Copying the dbt_project_dir to the samples_tests directory.", style="warning", @@ -90,12 +93,12 @@ def PrintFirstTimeRunningMessage(self): print('\033[1;33;48m', "Error!") print(f"Directory ./{os.environ['DBT_PROJECT_DIR']}/metaextracts/ does not exist and should have been created automatically.") - def GeneratePreDbtScripts(self, PreInstall, progress: ProgressConsoleWrapper, task_id, lakehouse_config): - gf.GenerateMetadataExtract(self.dbt_project_dir, self.target_info['workspaceid'], self.target_info['lakehouseid'], self.lakehouse, self.config['name'], progress=progress, task_id=task_id, lakehouse_config=lakehouse_config) + def GeneratePreDbtScripts(self, PreInstall, notebook_timeout, progress: ProgressConsoleWrapper, task_id, lakehouse_config): + gf.GenerateMetadataExtract(self.dbt_project_dir, self.target_info['workspaceid'], self.target_info['lakehouseid'], self.lakehouse, self.config['name'], progress=progress, task_id=task_id, lakehouse_config=lakehouse_config, notebook_timeout=notebook_timeout) # gf.GenerateNotebookUpload(self.dbt_project_dir, self.target_info['workspaceid'], self.target_info['lakehouseid'], self.lakehouse, self.config['name'], progress=progress, task_id=task_id, lakehouse_config=lakehouse_config) - gf.GenerateUtils(self.dbt_project_dir, self.target_info['workspaceid'], self.target_info['lakehouseid'], self.lakehouse, self.config['name'], progress=progress, task_id=task_id) + gf.GenerateUtils(self.dbt_project_dir, self.target_info['workspaceid'], self.target_info['lakehouseid'], self.lakehouse, self.config['name'], progress=progress, task_id=task_id, notebook_timeout=notebook_timeout) # gf.GenerateAzCopyScripts(self.dbt_project_dir, self.target_info['workspaceid'], self.target_info['lakehouseid'], progress=progress, task_id=task_id) @@ -107,7 +110,7 @@ def GeneratePostDbtScripts(self, PreInstall=False, progress=None, task_id=None, log_lakehouse = self.lakehouse gf.SetSqlVariableForAllNotebooks(self.dbt_project_dir, self.lakehouse, progress=progress, task_id=task_id, lakehouse_config=lakehouse_config) - gf.GenerateMasterNotebook(self.dbt_project_dir, self.target_info['workspaceid'], self.target_info['lakehouseid'], self.lakehouse, self.config['name'], progress=progress, task_id=task_id, notebook_timeout=notebook_timeout,max_worker = self.target_info['threads'], log_lakehouse=log_lakehouse, notebook_hashcheck=notebook_hashcheck, lakehouse_config=lakehouse_config) + gf.GenerateMasterNotebook(self.dbt_project_dir, self.target_info['workspaceid'], self.target_info['lakehouseid'], self.lakehouse, self.config['name'], progress=progress, task_id=task_id, notebook_timeout=notebook_timeout, max_worker=self.target_info['threads'], log_lakehouse=log_lakehouse, notebook_hashcheck=notebook_hashcheck, lakehouse_config=lakehouse_config) def ConvertNotebooksToFabricFormat(self, progress: ProgressConsoleWrapper, task_id=None, lakehouse_config=None): curr_dir = os.getcwd() @@ -115,6 +118,7 @@ def ConvertNotebooksToFabricFormat(self, progress: ProgressConsoleWrapper, task_ self.fa.IPYNBtoFabricPYFile(dbt_project_dir=dbt_project_dir, progress=progress, task_id=task_id, workspace_id=self.target_info['workspaceid'], lakehouse_id=self.target_info['lakehouseid'], lakehouse=self.lakehouse, lakehouse_config=lakehouse_config) def CleanProjectTargetDirectory(self, progress: ProgressConsoleWrapper, task_id): + print("Cleaning Project Target Directory") if os.path.exists(self.dbt_project_dir + "/target"): shutil.rmtree(self.dbt_project_dir + "/target") # Generate AzCopy Scripts and Metadata 
Extract Notebooks @@ -167,30 +171,47 @@ def BuildDbtProject(self, PreInstall=False, select="", exclude=""): Buildarr.append('--exclude') Buildarr.append(exclude) - result = subprocess.run(Buildarr, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - # Access the output and error - output = result.stdout.decode('utf-8') - error = result.stderr.decode('utf-8') - - self.console.print(f"Output: {output}", style="info") - if error: - self.console.print(f"Error: {error}", style="error") + try: + result = subprocess.run(Buildarr, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + # Access the output and error + output = result.stdout.decode('utf-8') + error = result.stderr.decode('utf-8') + + self.console.print(f"Output: {output}", style="info") + + if error: + self.console.print(f"Error: {error}", style="error") + + except subprocess.CalledProcessError as e: + #try reading the dbt log file + if os.path.exists(self.dbt_project_dir + "/logs/dbt.log"): + with open(self.dbt_project_dir + "/logs/dbt.log", 'r') as f: + self.console.print("DBT has thrown an error. Here is the Log File:" + "\n", style="error") + self.console.print(f.read(), style="error") + else: + self.console.print(f"Error: {e.stderr.decode('utf-8')}") + raise e + print(Panel.fit("[blue]End of dbt build >>>>>>>>>>>>>>>>>>>>>>>[/blue]")) def DownloadMetadata(self, progress: ProgressConsoleWrapper, task_id): progress.print("Downloading Metadata", level=LogLevel.INFO) curr_dir = os.getcwd() dbt_project_dir = str(Path(Path(curr_dir) / Path(self.dbt_project_dir))) - mn.DownloadMetaFiles(progress=progress, task_id=task_id, dbt_project_dir=dbt_project_dir, workspacename=self.target_info['workspaceid'], datapath=self.target_info['lakehouseid'] + "/Files/MetaExtracts/") + try: + mn.DownloadMetaFiles(progress=progress, task_id=task_id, dbt_project_dir=dbt_project_dir, workspacename=self.target_info['workspaceid'], datapath=self.target_info['lakehouseid'] + "/Files/metaextracts/") + except Exception as e: + progress.print(f"Error downloading meta extracts: Workspacename: {self.target_info['workspaceid']}, DataPath: {self.target_info['lakehouseid'] + '/Files/metaextracts/'}", level=LogLevel.ERROR) + raise e def RunMetadataExtract(self, progress: ProgressConsoleWrapper, task_id): nb_name = f"metadata_{self.project_name}_extract" nb_id = self.fa.GetNotebookIdByName(workspace_id=self.target_info['workspaceid'], notebook_name=nb_name) - if nb_id is None: - progress.print("Metadata Extract Notebook Not Found in Workspace. Uploading Notebook Now", level=LogLevel.INFO) - self.fa.APIUpsertNotebooks(progress=progress, task_id=task_id, dbt_project_dir=self.dbt_project_dir, workspace_id=self.target_info['workspaceid'], notebook_name=nb_name) - else: - progress.print("Metadata Extract Notebook Found in Workspace.", level=LogLevel.INFO) + # if nb_id is None: + # progress.print("Metadata Extract Notebook Not Found in Workspace. 
Uploading Notebook Now", level=LogLevel.INFO) + self.fa.APIUpsertNotebooks(progress=progress, task_id=task_id, dbt_project_dir=self.dbt_project_dir, workspace_id=self.target_info['workspaceid'], notebook_name=nb_name) + #else: + # progress.print("Metadata Extract Notebook Found in Workspace.", level=LogLevel.INFO) progress.print("Running Metadata Extract", LogLevel.INFO) self.fa.APIRunNotebook(progress=progress, task_id=task_id, workspace_id=self.target_info['workspaceid'], notebook_name=f"metadata_{self.project_name}_extract") @@ -199,7 +220,31 @@ def RunMasterNotebook(self, progress: ProgressConsoleWrapper, task_id): self.fa.APIRunNotebook(progress=progress, task_id=task_id, workspace_id=self.target_info['workspaceid'], notebook_name=nb_name) def GetExecutionResults(self, progress: ProgressConsoleWrapper, task_id): - if self.sql_endpoint is not None: + if self.sql_endpoint is not None: + _fas = fas.FabricApiSql(console=self.console, server=self.sql_endpoint, database=self.lakehouse) + + # loop every 10 seconds until you have waited 1 minute + i = 0 + while i < 60: + remaining = 60 - i + progress.progress.update(task_id=task_id, description=f"Waiting for data lake to update before checking for execution Results. {remaining} seconds remaining. ", level=LogLevel.INFO) + i += 10 + sleep(10) + + sql = f""" + Select SUBSTRING(a.notebook, 0, CHARINDEX('.', a.notebook)) type, status, count(a.notebook) notebooks + from {self.lakehouse}.dbo.execution_log a + join + ( + Select top 1 batch_id, max(DATEADD(second, start_time, '1970/01/01 00:00:00')) start_time + from {self.lakehouse}.dbo.execution_log + group by batch_id + order by start_time desc + ) b on a.batch_id = b.batch_id + group by SUBSTRING(a.notebook, 0, CHARINDEX('.', a.notebook)), status + """ + _fas.ExecuteSQL(sql=sql, title="Summary", progress=progress, task_id=task_id) + sql = f""" Select a.notebook, replace(CONVERT(varchar(20), DATEADD(second, a.start_time, '1970/01/01 00:00:00'),126), 'T',' ') start_time, status, error from {self.lakehouse}.dbo.execution_log a @@ -212,12 +257,11 @@ def GetExecutionResults(self, progress: ProgressConsoleWrapper, task_id): ) b on a.batch_id = b.batch_id where a.status = 'error' """ - _fas = fas.FabricApiSql(console=self.console, server=self.sql_endpoint, database=self.lakehouse) - _fas.ExecuteSQL(sql=sql, progress=progress, task_id=task_id) + _fas.ExecuteSQL(sql=sql, title="Error Details", progress=progress, task_id=task_id) + else: progress.print("SQL Endpoint not found in profile. 
Skipping Execution Results", level=LogLevel.WARNING) - - + def RunBuildMetadataNotebook_Source(self, progress: ProgressConsoleWrapper, task_id): nb_name = f"util_BuildMetadata" self.fa.APIRunNotebook(progress=progress, task_id=task_id, workspace_id=self.target_info['workspaceid'], notebook_name=nb_name) @@ -230,11 +274,11 @@ def RunCompareNotebook(self, progress: ProgressConsoleWrapper, task_id): nb_name = f'compare_{self.config['name']}_{self.current_env_name}_to_{self.next_env_name}_notebook' self.fa.APIRunNotebook(progress=progress, task_id=task_id, workspace_id=self.target_info['workspaceid'], notebook_name=nb_name) - def GenerateMissingObjectsNotebook(self, progress: ProgressConsoleWrapper, task_id): - gf.GenerateMissingObjectsNotebook(self.dbt_project_dir, self.target_info['workspaceid'], self.target_info['lakehouseid'], self.lakehouse, self.config['name'], progress=progress, task_id=task_id, source_env=self.current_env_name, target_env=self.next_env_name) + def GenerateMissingObjectsNotebook(self, notebook_timeout, progress: ProgressConsoleWrapper, task_id): + gf.GenerateMissingObjectsNotebook(self.dbt_project_dir, self.target_info['workspaceid'], self.target_info['lakehouseid'], self.lakehouse, self.config['name'], progress=progress, task_id=task_id, source_env=self.current_env_name, target_env=self.next_env_name, notebook_timeout=notebook_timeout) - def GenerateCompareNotebook(self, progress: ProgressConsoleWrapper, task_id): - gf.GenerateCompareNotebook(self.dbt_project_dir, self.current_env_name, self.current_env['workspaceid'], self.current_env['lakehouseid'], self.next_env_name, self.next_env['workspaceid'], self.next_env['lakehouseid'], self.lakehouse, self.config['name'], progress=progress, task_id=task_id) + def GenerateCompareNotebook(self, notebook_timeout, progress: ProgressConsoleWrapper, task_id): + gf.GenerateCompareNotebook(self.dbt_project_dir, self.current_env_name, self.current_env['workspaceid'], self.current_env['lakehouseid'], self.next_env_name, self.next_env['workspaceid'], self.next_env['lakehouseid'], self.lakehouse, self.config['name'], progress=progress, task_id=task_id, notebook_timeout=notebook_timeout) def UploadMissingObjectsNotebookViaApi(self, progress: ProgressConsoleWrapper, task_id): curr_dir = os.getcwd() diff --git a/file.duckdb b/file.duckdb new file mode 100644 index 0000000000000000000000000000000000000000..b4d7caf8df3aaa33824dcf48c90929c84aead8e8 GIT binary patch literal 12288 zcmeI#u?fOZ5CG6G7;pt~0juz1p{5ffHmRK;-~x(kIE#ftI7D!iSV$0rfDjM@@2cbe z!5#0aKfUj_r`3G4I~3(6{1%rijcM%e&5rqc8Dq7mx_i73AV7cs0RjXF5FkK+009F3 z6G-aE?L50a+lkFrwDl0i?{xC1$OsT1K!5-N0t5&UAV7csfnf;v{}1C%wj@A+009C7 O2oNAZfB*pkeFR>}0WdTG literal 0 HcmV?d00001 diff --git a/test.py b/test.py new file mode 100644 index 0000000..ec89cc8 --- /dev/null +++ b/test.py @@ -0,0 +1,16 @@ +import duckdb +from sqlframe import activate +connection = duckdb.connect("file.duckdb") +activate("duckdb", conn=connection) +from pyspark.sql import SparkSession + +spark = SparkSession.builder.getOrCreate() + + +sql = """ +select 1 from test +""" + +spark.sql(sql).show() + + From 157d15043a2e4a8539089a63a2f8864106e23af6 Mon Sep 17 00:00:00 2001 From: John Rampono Date: Mon, 14 Oct 2024 09:26:56 +0800 Subject: [PATCH 2/3] Added pre-execute python --- dbt/adapters/fabricsparknb/notebook.py | 24 +++++++++++++++++++ dbt/adapters/fabricsparknb/utils.py | 1 + .../notebooks/model_notebook.ipynb | 7 ++++++ dbt_wrapper/generate_files.py | 6 ++++- 4 files changed, 37 insertions(+), 1 deletion(-) diff --git 
a/dbt/adapters/fabricsparknb/notebook.py b/dbt/adapters/fabricsparknb/notebook.py index 48908d9..fb9e2e7 100644 --- a/dbt/adapters/fabricsparknb/notebook.py +++ b/dbt/adapters/fabricsparknb/notebook.py @@ -68,6 +68,30 @@ def SetTheSqlVariable(self): x += 1 break + def SetThePythonPreScript(self, project_root, file_name, lakehouse_name): + python_models_dir = Path(project_root) / Path('models_python') + # print(python_models_dir) + # print(file_name) + # If the path does not exist, create it + if not os.path.exists(python_models_dir): + os.makedirs(python_models_dir) + + # Remove the .sql extension and replace it with .py + file_name = file_name.replace(".ipynb", ".py") + + # Check if a matching file exists in the models_python directory + if os.path.exists(python_models_dir / Path(file_name)): + data = "" + # Read the file + with io.open(python_models_dir / Path(file_name), 'r') as file: + data = file.read() + data = re.sub(r"\{\{\s*lakehouse_name\s*\}\}", lakehouse_name, data) + for i, cell in enumerate(self.nb.cells): + if cell.cell_type == 'markdown' and "# Pre-Execution Python Script" in cell.source: + new_cell = nbf.v4.new_code_cell(source=data) + self.nb.cells.insert((i + 1), new_cell) + break + def GetSparkSqlCells(self): # Get the existing SQL Cell from the notebook. It will be the code cell following the markdown cell containing "# SPARK SQL Cell for Debugging" spark_sql_cells = [] diff --git a/dbt/adapters/fabricsparknb/utils.py b/dbt/adapters/fabricsparknb/utils.py index 537eb5a..d2442c5 100644 --- a/dbt/adapters/fabricsparknb/utils.py +++ b/dbt/adapters/fabricsparknb/utils.py @@ -264,6 +264,7 @@ def SetSqlVariableForAllNotebooks(project_root, lakehouse_name): # Gather the Spark SQL from the notebook and set the sql variable mnb.GatherSql() mnb.SetTheSqlVariable() + mnb.SetThePythonPreScript() # always set the config in first code cell mnb.nb.cells[1].source = mnb.nb.cells[1].source.replace("{{lakehouse_name}}", lakehouse_name) diff --git a/dbt/include/fabricsparknb/notebooks/model_notebook.ipynb b/dbt/include/fabricsparknb/notebooks/model_notebook.ipynb index 047d692..5d34448 100644 --- a/dbt/include/fabricsparknb/notebooks/model_notebook.ipynb +++ b/dbt/include/fabricsparknb/notebooks/model_notebook.ipynb @@ -94,6 +94,13 @@ " return status" ] }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Pre-Execution Python Script" + ] + }, { "cell_type": "markdown", "metadata": {}, diff --git a/dbt_wrapper/generate_files.py b/dbt_wrapper/generate_files.py index 5b5fa66..1e49230 100644 --- a/dbt_wrapper/generate_files.py +++ b/dbt_wrapper/generate_files.py @@ -485,8 +485,12 @@ def SetSqlVariableForAllNotebooks(project_root, lakehouse_name, progress: Progre # Gather the Spark SQL from the notebook and set the sql variable mnb.GatherSql() mnb.SetTheSqlVariable() + mnb.SetThePythonPreScript(project_root, notebook_file, lakehouse_name) # always set the config in first code cell - mnb.nb.cells[1].source = mnb.nb.cells[1].source.replace("{{lakehouse_name}}", lakehouse_name) + import re + + # Use re.sub to replace the placeholder with optional spaces + mnb.nb.cells[1].source = re.sub(r"\{\{\s*lakehouse_name\s*\}\}", lakehouse_name, mnb.nb.cells[1].source) # Check if lakehouse_config option is set to METADATA lhconfig = lakehouse_config # Assuming highcon is a boolean variable From ebb4da6df8bdc85a8bfff852bf8380bf5a7f9206 Mon Sep 17 00:00:00 2001 From: John Rampono Date: Wed, 30 Oct 2024 10:29:01 +0800 Subject: [PATCH 3/3] Adding Custom Incremental --- 
dbt/adapters/fabricsparknb/notebook.py | 8 +- .../materializations/custom_strategies.sql | 79 +++++++++++++++++++ 2 files changed, 86 insertions(+), 1 deletion(-) create mode 100644 dbt/include/fabricsparknb/macros/materializations/custom_strategies.sql diff --git a/dbt/adapters/fabricsparknb/notebook.py b/dbt/adapters/fabricsparknb/notebook.py index fb9e2e7..7dcc9d4 100644 --- a/dbt/adapters/fabricsparknb/notebook.py +++ b/dbt/adapters/fabricsparknb/notebook.py @@ -27,6 +27,7 @@ def GetIncludeDir(): # print(str(path)) return (path) + class ModelNotebook: def __init__(self, nb : nbf.NotebookNode = None, node_type='model'): @@ -62,7 +63,12 @@ def SetTheSqlVariable(self): x = 1 for sql in self.sql: # Create a new code cell - new_cell = nbf.v4.new_code_cell(source="sql = '''" + sql + "'''\n" + "spark.sql(sql)") + cell_sql = """ +sql = '''""" + sql + "'''\n" + """ +for s in sql.split(';\\n'): + spark.sql(s) +""" + new_cell = nbf.v4.new_code_cell(source=cell_sql) # Insert the new cell into the middle of the notebook self.nb.cells.insert((i + x), new_cell) x += 1 diff --git a/dbt/include/fabricsparknb/macros/materializations/custom_strategies.sql b/dbt/include/fabricsparknb/macros/materializations/custom_strategies.sql new file mode 100644 index 0000000..f0b8d05 --- /dev/null +++ b/dbt/include/fabricsparknb/macros/materializations/custom_strategies.sql @@ -0,0 +1,79 @@ +{% macro get_incremental_delete_reload_sql(arg_dict) %} + + {% do return(delete_reload_macro_with_sql(arg_dict["target_relation"], arg_dict["temp_relation"], arg_dict["unique_key"], arg_dict["dest_columns"], arg_dict["incremental_predicates"])) %} + +{% endmacro %} + + +{% macro delete_reload_macro_with_sql(target_relation, temp_relation, unique_key, dest_columns, incremental_predicates) %} + + {%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%} + + DELETE FROM {{ target_relation }} + insert into {{ target_relation }} ({{ dest_cols_csv }}) + ( + select {{ dest_cols_csv }} + from {{ temp_relation }} + ); + +{% endmacro %} + + + + +{% macro dbt_spark_validate_get_incremental_strategy(raw_strategy, file_format) %} + {#-- Validate the incremental strategy #} + + {% set invalid_strategy_msg -%} + Invalid incremental strategy provided: {{ raw_strategy }} + Expected one of: 'append', 'merge', 'insert_overwrite', 'delete_reload' + {%- endset %} + + {% set invalid_merge_msg -%} + Invalid incremental strategy provided: {{ raw_strategy }} + You can only choose this strategy when file_format is set to 'delta' or 'iceberg' or 'hudi' + {%- endset %} + + {% set invalid_insert_overwrite_endpoint_msg -%} + Invalid incremental strategy provided: {{ raw_strategy }} + You cannot use this strategy when connecting via endpoint + Use the 'append' or 'merge' strategy instead + {%- endset %} + + {% if raw_strategy not in ['append', 'merge', 'insert_overwrite','delete_reload'] %} + {% do exceptions.raise_compiler_error(invalid_strategy_msg) %} + {%-else %} + {% if raw_strategy == 'merge' and file_format not in ['delta', 'iceberg', 'hudi'] %} + {% do exceptions.raise_compiler_error(invalid_merge_msg) %} + {% endif %} + {% if raw_strategy == 'insert_overwrite' and file_format not in ['delta', 'iceberg', 'hudi'] %} -- and target.endpoint + {% do exceptions.raise_compiler_error(invalid_insert_overwrite_endpoint_msg) %} + {% endif %} + {% endif %} + + {% do return(raw_strategy) %} +{% endmacro %} + + +{% macro dbt_spark_get_incremental_sql(strategy, source, target, existing, unique_key, incremental_predicates) %} + {%- if strategy 
== 'append' -%}
+    {#-- insert new records into existing table, without updating or overwriting #}
+    {{ get_insert_into_sql(source, target) }}
+  {%- elif strategy == 'delete_reload' -%}
+    {#-- delete all existing records, then insert the new ones #}
+    DELETE FROM {{ target }};
+    {{ get_insert_into_sql(source, target) }}
+  {%- elif strategy == 'insert_overwrite' -%}
+    {#-- insert statements don't like CTEs, so support them via a temp view #}
+    {{ get_insert_overwrite_sql(source, target, existing) }}
+  {%- elif strategy == 'merge' -%}
+  {#-- merge all columns for datasources which implement MERGE INTO (e.g. databricks, iceberg) - schema changes are handled for us #}
+    {{ get_merge_sql(target, source, unique_key, dest_columns=none, incremental_predicates=incremental_predicates) }}
+  {%- else -%}
+    {% set no_sql_for_strategy_msg -%}
+      No known SQL for the incremental strategy provided: {{ strategy }}
+    {%- endset %}
+    {%- do exceptions.raise_compiler_error(no_sql_for_strategy_msg) -%}
+  {%- endif -%}
+
+{% endmacro %}
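
For reference, the reworked GetExecutionResults in PATCH 1/3 waits for the lakehouse SQL endpoint to refresh and then summarises the most recent batch recorded in {lakehouse}.dbo.execution_log. A similar, slightly simplified summary can be pulled ad hoc for troubleshooting; the sketch below is illustrative only and assumes pyodbc plus an existing connection to the lakehouse SQL endpoint (the connection setup and the lakehouse name are placeholders, not part of this change).

```python
# Ad-hoc summary of the latest dbt batch in execution_log. Assumes you already
# hold a pyodbc connection to the lakehouse SQL endpoint; "my_lakehouse" below
# is a placeholder database name.
import pyodbc  # assumption: pyodbc is available in the environment


def print_latest_batch_summary(conn: pyodbc.Connection, lakehouse: str) -> None:
    # Same idea as the wrapper's summary query, reduced to status counts only.
    sql = f"""
        SELECT a.status, COUNT(*) AS notebooks
        FROM {lakehouse}.dbo.execution_log a
        JOIN (
            SELECT TOP 1 batch_id
            FROM {lakehouse}.dbo.execution_log
            GROUP BY batch_id
            ORDER BY MAX(start_time) DESC
        ) b ON a.batch_id = b.batch_id
        GROUP BY a.status
    """
    cursor = conn.cursor()
    for status, notebooks in cursor.execute(sql):
        print(f"{status}: {notebooks}")
```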
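
Usage sketch for the pre-execution hook introduced in PATCH 2/3: SetThePythonPreScript looks in models_python/ for a .py file named after the generated model notebook (with .py in place of .ipynb) and injects its contents into a new code cell after the "# Pre-Execution Python Script" markdown cell, substituting {{ lakehouse_name }}. The file name and body below are hypothetical and only show the kind of setup code such a script might carry.

```python
# models_python/my_model.py -- hypothetical file name; it must match the
# generated model notebook's file name with a .py extension to be picked up.

# The {{ lakehouse_name }} placeholder is replaced at generation time via
# re.sub(r"\{\{\s*lakehouse_name\s*\}\}", lakehouse_name, data).
lakehouse = "{{ lakehouse_name }}"

# `spark` is provided by the Fabric notebook session this cell is injected
# into. Example setup step: list the lakehouse tables before the model's
# generated Spark SQL cells execute.
print(f"Running pre-execution checks against lakehouse: {lakehouse}")
spark.sql(f"SHOW TABLES IN {lakehouse}").show()
```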
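
Illustration of the revised SQL cell emitted by SetTheSqlVariable in PATCH 3/3: the gathered SQL is now split on ';\n' and each statement is passed to spark.sql() separately, which is what allows the new delete_reload strategy to issue a DELETE followed by an INSERT from a single compiled model. Everything in the snippet is placeholder content.

```python
# Rough shape of a generated model-notebook cell after the SetTheSqlVariable
# change. The table names and SQL are placeholders, and `spark` comes from the
# Fabric notebook session the cell runs in.
sql = '''DELETE FROM my_lakehouse.my_model;
insert into my_lakehouse.my_model (id, name)
(
    select id, name
    from my_lakehouse.my_model__dbt_tmp
)'''

# spark.sql() executes one statement at a time, so the generated cell splits on
# ";\n" and submits each piece separately.
for s in sql.split(';\n'):
    spark.sql(s)
```

A model opts into the new strategy by setting incremental_strategy='delete_reload' (with a delta, iceberg or hudi file_format) in its config block; dbt_spark_validate_get_incremental_strategy now accepts it alongside append, merge and insert_overwrite.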