Merge pull request #156 from Insight-Services-APAC/feature/enhancedlogging

Feature/enhancedlogging
grantkriegerai authored Aug 15, 2024
2 parents 38b6aeb + 871ab91 commit c43e3a8
Showing 8 changed files with 138 additions and 75 deletions.
1 change: 1 addition & 0 deletions dbt/adapters/fabricsparknb/fabric_spark_credentials.py
@@ -9,6 +9,7 @@ class SparkCredentials(Credentials):
method: str = "livy"
workspaceid: str = None
database: Optional[str] = None
log_lakehouse: Optional[str] = None
lakehouse: str = None
lakehouseid: str = None # type: ignore
endpoint: Optional[str] = "https://msitapi.fabric.microsoft.com/v1"
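The new optional log_lakehouse credential lets execution logs land in a lakehouse other than the one the models build into. A minimal sketch of the intended fallback (illustrative class and names, not the adapter's actual wiring): when the setting is omitted, logging falls back to the main lakehouse, matching the default applied later in dbt_wrapper/generate_files.py.

```python
# Sketch only: mirrors the fallback applied in dbt_wrapper/generate_files.py,
# not the adapter's real plumbing.
from dataclasses import dataclass
from typing import Optional


@dataclass
class CredentialsSketch:
    lakehouse: str
    log_lakehouse: Optional[str] = None

    def effective_log_lakehouse(self) -> str:
        # No dedicated log lakehouse configured -> log to the main lakehouse.
        return self.log_lakehouse or self.lakehouse


print(CredentialsSketch(lakehouse="main_lh").effective_log_lakehouse())      # main_lh
print(CredentialsSketch("main_lh", "logging_lh").effective_log_lakehouse())  # logging_lh
```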
99 changes: 72 additions & 27 deletions dbt/include/fabricsparknb/notebooks/master_notebook.ipynb
@@ -50,7 +50,7 @@
"import pandas as pd # type: ignore\n",
"from tabulate import tabulate # type: ignore\n",
"import json\n",
"from pyspark.sql.functions import *\n",
"from pyspark.sql.functions import * # type: ignore\n",
"import os\n",
"import uuid"
]
@@ -68,7 +68,8 @@
"metadata": {},
"outputs": [],
"source": [
"gv_lakehouse = '{{lakehouse_name}}'"
"gv_lakehouse = '{{lakehouse_name}}'\n",
"gv_log_lakehouse = '{{log_lakehouse}}'"
]
},
{
@@ -144,8 +145,8 @@
"\n",
" return all_files\n",
"\n",
"def call_child_notebook(notebook, batch_id):\n",
" mssparkutils.notebook.run(notebook, {{ notebook_timeout }},{\"pm_batch_id\": batch_id})"
"def call_child_notebook(notebook, batch_id, master_notebook):\n",
" mssparkutils.notebook.run(notebook, {{ notebook_timeout }},{\"pm_batch_id\": batch_id, \"pm_master_notebook\": master_notebook}) # type: ignore"
]
},
{
Expand All @@ -171,7 +172,7 @@
"metadata": {},
"outputs": [],
"source": [
"embedded_hashes = {{ hashes }}\n",
"embedded_hashes = {{ hashes }} # type: ignore\n",
"RelativePathForMetaData = \"Files/MetaExtracts/\"\n",
"current_hashes = json.loads(get_file_content_using_notebookutils(RelativePathForMetaData + 'MetaHashes.json'))\n",
"\n",
@@ -182,7 +183,7 @@
" return h['hash']\n",
" return ret\n",
"\n",
"embedded_hashcheck = {{ notebook_hashcheck }}\n",
"embedded_hashcheck = {{ notebook_hashcheck }} # type: ignore\n",
"\n",
"##Hashcheck: BYPASS = 0, WARNING = 1, ERROR = 2\n",
"if embedded_hashcheck == 0:\n",
@@ -212,7 +213,7 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"## Create Tables"
"## Create or Alter Tables"
]
},
{
@@ -222,14 +223,15 @@
"outputs": [],
"source": [
"sql = f'''\n",
"CREATE TABLE IF NOT EXISTS {gv_lakehouse}.execution_log (\n",
"CREATE TABLE IF NOT EXISTS {gv_log_lakehouse}.execution_log (\n",
" notebook STRING,\n",
" start_time DOUBLE,\n",
" status STRING,\n",
" error STRING,\n",
" execution_time DOUBLE,\n",
" run_order INT,\n",
" batch_id string \n",
" batch_id string,\n",
" master_notebook STRING \n",
")\n",
"USING DELTA\n",
"'''\n",
@@ -244,17 +246,58 @@
"outputs": [],
"source": [
"sql = f'''\n",
"CREATE TABLE IF NOT EXISTS {gv_lakehouse}.batch (\n",
"CREATE TABLE IF NOT EXISTS {gv_log_lakehouse}.batch (\n",
" batch_id STRING,\n",
" start_time LONG,\n",
" status STRING\n",
" status STRING,\n",
" master_notebook STRING\n",
")\n",
"USING DELTA\n",
"'''\n",
"\n",
"spark.sql(sql) # type: ignore"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Check if the master_notebook column exists in the batch table\n",
"schema_check_sql = f\"DESCRIBE {gv_log_lakehouse}.execution_log\"\n",
"schema_check_df = spark.sql(schema_check_sql) # type: ignore\n",
"\n",
"# Check if the master_notebook column exists in the schema\n",
"if 'master_notebook' not in [row['col_name'] for row in schema_check_df.collect()]:\n",
" # Add the master_notebook column to the table\n",
" alter_table_sql = f'''\n",
" ALTER TABLE {gv_log_lakehouse}.execution_log\n",
" ADD COLUMN master_notebook STRING\n",
" '''\n",
" spark.sql(alter_table_sql) # type: ignore"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Check if the master_notebook column exists in the batch table\n",
"schema_check_sql = f\"DESCRIBE {gv_log_lakehouse}.batch\"\n",
"schema_check_df = spark.sql(schema_check_sql) # type: ignore\n",
"\n",
"# Check if the master_notebook column exists in the schema\n",
"if 'master_notebook' not in [row['col_name'] for row in schema_check_df.collect()]:\n",
" # Add the master_notebook column to the table\n",
" alter_table_sql = f'''\n",
" ALTER TABLE {gv_log_lakehouse}.batch\n",
" ADD COLUMN master_notebook STRING\n",
" '''\n",
" spark.sql(alter_table_sql) # type: ignore"
]
},
{
"cell_type": "markdown",
"metadata": {},
@@ -270,28 +313,29 @@
"source": [
"\n",
"\n",
"def close_batch(batch_id, status):\n",
"def close_batch(batch_id, master_notebook, status):\n",
" sql = f'''\n",
" UPDATE {gv_lakehouse}.batch\n",
" UPDATE {gv_log_lakehouse}.batch\n",
" SET status = '{status}'\n",
" WHERE batch_id = '{str(batch_id)}' '''\n",
" WHERE batch_id = '{str(batch_id)}' \n",
" AND master_notebook = '{str(master_notebook)}' '''\n",
"\n",
" spark.sql(sql)\n",
" spark.sql(sql) # type: ignore\n",
"\n",
"def get_open_batch():\n",
"def get_open_batch(master_notebook):\n",
" sql = f'''\n",
" SELECT MAX(batch_id) AS LatestBatchID FROM {gv_lakehouse}.batch WHERE status = 'open'\n",
" SELECT MAX(batch_id) AS LatestBatchID FROM {gv_log_lakehouse}.batch WHERE status = 'open' AND master_notebook = '{str(master_notebook)}'\n",
" '''\n",
"\n",
" return spark.sql(sql).collect()[0]['LatestBatchID']\n",
" return spark.sql(sql).collect()[0]['LatestBatchID'] # type: ignore\n",
"\n",
"def insert_new_batch(batch_id):\n",
"def insert_new_batch(batch_id, master_notebook):\n",
" sql = f'''\n",
" INSERT INTO {gv_lakehouse}.batch\n",
" SELECT '{batch_id}' AS batch_id, UNIX_TIMESTAMP() AS start_time, 'open' AS status\n",
" INSERT INTO {gv_log_lakehouse}.batch\n",
" SELECT '{batch_id}' AS batch_id, UNIX_TIMESTAMP() AS start_time, 'open' AS status, '{str(master_notebook)}' AS master_notebook\n",
" '''\n",
"\n",
" spark.sql(sql)"
" spark.sql(sql) # type: ignore"
]
},
{
@@ -308,7 +352,8 @@
"outputs": [],
"source": [
"new_batch_id = str(uuid.uuid4())\n",
"insert_new_batch(new_batch_id)"
"master_notebook = mssparkutils.runtime.context.get('currentNotebookName')\n",
"insert_new_batch(new_batch_id, master_notebook) # type: ignore"
]
},
{
@@ -332,10 +377,10 @@
"outputs": [],
"source": [
"# Read the log for this batch execution\n",
"df_execution_log = spark.sql(f\"SELECT * FROM {gv_lakehouse}.execution_log WHERE batch_id = '{new_batch_id}'\")\n",
"df_execution_log = spark.sql(f\"SELECT * FROM {gv_log_lakehouse}.execution_log WHERE batch_id = '{new_batch_id}' AND master_notebook = '{master_notebook}'\") # type: ignore\n",
"# Check if any have not succeeded\n",
"failed_results = df_execution_log.filter(col(\"status\") != \"success\")\n",
"succeeded_results = df_execution_log.filter(col(\"status\") == \"success\")\n",
"failed_results = df_execution_log.filter(col(\"status\") != \"success\") # type: ignore\n",
"succeeded_results = df_execution_log.filter(col(\"status\") == \"success\") # type: ignore\n",
"\n",
"if failed_results.count() == 0: \n",
" print(\"Batch Succeeded\")\n",
Expand All @@ -344,7 +389,7 @@
" print(\"Batch Failed\")\n",
" display(failed_results)\n",
"\n",
"close_batch(new_batch_id, 'closed')\n"
"close_batch(new_batch_id, master_notebook, 'closed') # type: ignore\n"
]
}
],
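The two new cells in this notebook apply the same schema-evolution pattern to both log tables: DESCRIBE the table, then add the master_notebook column only if it is missing. A condensed PySpark sketch of that pattern (assumes an existing SparkSession named spark and Delta-backed tables; the names in the usage comments are illustrative):

```python
# Condensed form of the check-then-alter cells above (illustrative names).
def ensure_column(spark, table: str, column: str, col_type: str = "STRING") -> None:
    # DESCRIBE returns one row per column; add the column only when absent.
    existing = [row["col_name"] for row in spark.sql(f"DESCRIBE {table}").collect()]
    if column not in existing:
        spark.sql(f"ALTER TABLE {table} ADD COLUMN {column} {col_type}")

# Usage, mirroring the notebook:
# ensure_column(spark, f"{gv_log_lakehouse}.execution_log", "master_notebook")
# ensure_column(spark, f"{gv_log_lakehouse}.batch", "master_notebook")
```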
64 changes: 33 additions & 31 deletions dbt/include/fabricsparknb/notebooks/master_notebook_x.ipynb
@@ -17,7 +17,8 @@
},
"outputs": [],
"source": [
"pm_batch_id = None"
"pm_batch_id = None\n",
"pm_master_notebook = None"
]
},
{
@@ -64,8 +65,8 @@
"import time\n",
"import jsonpickle # type: ignore\n",
"import json\n",
"from pyspark.sql.types import *\n",
"from pyspark.sql.functions import *\n",
"from pyspark.sql.types import * # type: ignore\n",
"from pyspark.sql.functions import * # type: ignore\n",
"import os"
]
},
@@ -82,17 +83,17 @@
"metadata": {},
"outputs": [],
"source": [
"notebook_files1 = {{ notebook_files }}\n",
"run_order1 = {{ run_order }}\n",
"notebook_files1 = {{ notebook_files }} # type: ignore\n",
"run_order1 = {{ run_order }} # type: ignore\n",
"\n",
"# Define a function to execute a notebook and return the results\n",
"@dataclass\n",
"class NotebookResult: \n",
" notebook: str\n",
" start_time: float\n",
" start_time: int\n",
" status: str\n",
" error: str\n",
" execution_time: float\n",
" execution_time: int\n",
" run_order: int\n",
"\n",
"def execute_notebook(notebook_file):\n",
@@ -170,31 +171,31 @@
"outputs": [],
"source": [
"# Define the schema for the DataFrame\n",
"schema = StructType([\n",
" StructField(\"notebook\", StringType(), True),\n",
" StructField(\"start_time\", DoubleType(), True),\n",
" StructField(\"status\", StringType(), True),\n",
" StructField(\"error\", StringType(), True),\n",
" StructField(\"execution_time\", DoubleType(), True),\n",
" StructField(\"run_order\", IntegerType(), True),\n",
" StructField(\"batch_id\", StringType(), True)\n",
"schema = StructType([ # type: ignore\n",
" StructField(\"notebook\", StringType(), True), # type: ignore\n",
" StructField(\"start_time\", DoubleType(), True), # type: ignore\n",
" StructField(\"status\", StringType(), True), # type: ignore\n",
" StructField(\"error\", StringType(), True), # type: ignore\n",
" StructField(\"execution_time\", DoubleType(), True), # type: ignore\n",
" StructField(\"run_order\", IntegerType(), True), # type: ignore\n",
" StructField(\"batch_id\", StringType(), True) # type: ignore\n",
"])\n",
"\n",
"# Create an empty DataFrame with the defined schema\n",
"failed_results = spark.createDataFrame([], schema=schema)\n",
"failed_results = spark.createDataFrame([], schema=schema) # type: ignore\n",
"# Read the log for this batch execution\n",
"df_execution_log = spark.sql(f\"SELECT * FROM {{lakehouse_name}}.execution_log WHERE batch_id = '{pm_batch_id}'\")\n",
"df_execution_log = spark.sql(f\"SELECT * FROM {{log_lakehouse}}.execution_log WHERE batch_id = '{pm_batch_id}' AND master_notebook = '{pm_master_notebook}'\") # type: ignore\n",
"if df_execution_log.count() > 0:\n",
" \n",
" # Check if any have not succeeded\n",
" failed_results = df_execution_log.filter(col(\"status\") != \"success\")\n",
" failed_results = df_execution_log.filter(col(\"status\") != \"success\") # type: ignore\n",
"\n",
" # Print the failed results\n",
" for row in failed_results.collect():\n",
" print(f\"Notebook {row['notebook']} failed with error: {row['error']}\")\n",
"\n",
" # Check if have succeeded\n",
" succeeded_results = df_execution_log.filter(col(\"status\") == \"success\")\n",
" succeeded_results = df_execution_log.filter(col(\"status\") == \"success\") # type: ignore\n",
"\n",
" # Print the succeeded results\n",
" for row in succeeded_results.collect():\n",
@@ -215,26 +216,27 @@
"outputs": [],
"source": [
"# Define the schema for the Log DataFrame\n",
"schema = StructType([\n",
" StructField(\"notebook\", StringType(), True),\n",
" StructField(\"start_time\", DoubleType(), True),\n",
" StructField(\"status\", StringType(), True),\n",
" StructField(\"error\", StringType(), True),\n",
" StructField(\"execution_time\", DoubleType(), True),\n",
" StructField(\"run_order\", IntegerType(), True)\n",
"schema = StructType([ # type: ignore\n",
" StructField(\"notebook\", StringType(), True), # type: ignore\n",
" StructField(\"start_time\", DoubleType(), True), # type: ignore\n",
" StructField(\"status\", StringType(), True), # type: ignore\n",
" StructField(\"error\", StringType(), True), # type: ignore\n",
" StructField(\"execution_time\", DoubleType(), True), # type: ignore\n",
" StructField(\"run_order\", IntegerType(), True) # type: ignore\n",
"])\n",
"\n",
"if failed_results.count() == 0:\n",
" new_results = []\n",
" # Use a ThreadPoolExecutor to run the notebooks in parallel\n",
" # Execute the notebooks and collect the results\n",
" with ThreadPoolExecutor(max_workers={{ max_worker }}) as executor:\n",
" new_results = list(executor.map(execute_notebook, notebook_files1))\n",
" with ThreadPoolExecutor(max_workers={{ max_worker }}) as executor: # type: ignore\n",
" new_results = list(executor.map(execute_notebook, notebook_files1)) # type: ignore\n",
"\n",
" # Write the results to the log file\n",
" df_log = spark.createDataFrame(new_results, schema=schema)\n",
" df_log = df_log.withColumn(\"batch_id\", lit(f'{pm_batch_id}'))\n",
" df_log.write.format(\"delta\").mode(\"append\").saveAsTable(\"{{lakehouse_name}}.execution_log\")\n",
" df_log = spark.createDataFrame(new_results, schema=schema) # type: ignore\n",
" df_log = df_log.withColumn(\"batch_id\", lit(f'{pm_batch_id}')) # type: ignore\n",
" df_log = df_log.withColumn(\"master_notebook\", lit(f'{pm_master_notebook}')) # type: ignore\n",
" df_log.write.format(\"delta\").mode(\"append\").saveAsTable(\"{{log_lakehouse}}.execution_log\")\n",
"else:\n",
" print(\"Failures in previous run_order... supressing execution\")\n",
" raise Exception(\"Failures in previous run_order... supressing execution\")"
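Each child master notebook now receives pm_master_notebook alongside pm_batch_id and stamps both onto the execution-log rows before appending them to the log lakehouse. A minimal standalone sketch of that stamping step (values are illustrative; the Delta write is left commented out):

```python
# Sketch of the logging write-path above, with illustrative values.
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

spark = SparkSession.builder.getOrCreate()

new_results = [("model_a_notebook", 1723708800.0, "success", "", 12.5, 1)]
schema = ("notebook STRING, start_time DOUBLE, status STRING, "
          "error STRING, execution_time DOUBLE, run_order INT")

df_log = spark.createDataFrame(new_results, schema)
df_log = df_log.withColumn("batch_id", lit("0000-aaaa"))           # from pm_batch_id
df_log = df_log.withColumn("master_notebook", lit("master_demo"))  # from pm_master_notebook
# df_log.write.format("delta").mode("append").saveAsTable("log_lh.execution_log")
df_log.show()
```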
2 changes: 2 additions & 0 deletions dbt/include/fabricsparknb/profile_template.yml
@@ -10,6 +10,8 @@ prompts:
hint: Name of the Lakehouse in the workspace that you want to connect to
lakehouseid:
hint: GUID of the lakehouse, which can be extracted from url when you open lakehouse artifact from fabric.microsoft.com
log_lakehouse:
hint: Name of the Lakehouse in the workspace that you want to log to
endpoint:
default: https://api.fabric.microsoft.com/v1
auth:
12 changes: 8 additions & 4 deletions dbt_wrapper/generate_files.py
@@ -15,7 +15,11 @@


@staticmethod
def GenerateMasterNotebook(project_root, workspaceid, lakehouseid, lakehouse_name, project_name, progress: ProgressConsoleWrapper, task_id, notebook_timeout, max_worker, notebook_hashcheck):
def GenerateMasterNotebook(project_root, workspaceid, lakehouseid, lakehouse_name, project_name, progress: ProgressConsoleWrapper, task_id, notebook_timeout, max_worker, log_lakehouse, notebook_hashcheck):
# If log lakehouse is None use lakehouse as default
if log_lakehouse is None:
log_lakehouse = lakehouse_name

# Iterate through the notebooks directory and create a list of notebook files
notebook_dir = f'./{project_root}/target/notebooks/'
notebook_files_str = [os.path.splitext(os.path.basename(f))[0] for f in os.listdir(Path(notebook_dir)) if f.endswith('.ipynb') and 'master_notebook' not in f]
@@ -77,7 +81,7 @@ def GenerateMasterNotebook(project_root, workspaceid, lakehouseid, lakehouse_nam
template = env.get_template('master_notebook_x.ipynb')

# Render the template with the notebook_file variable
rendered_template = template.render(notebook_files=file_str_with_current_sort_order, run_order=sort_order, lakehouse_name=lakehouse_name, project_name=project_name,max_worker=max_worker)
rendered_template = template.render(notebook_files=file_str_with_current_sort_order, run_order=sort_order, lakehouse_name=lakehouse_name, project_name=project_name,max_worker=max_worker, log_lakehouse=log_lakehouse)

# Parse the rendered template as a notebook
nb = nbf.reads(rendered_template, as_version=4)
@@ -104,7 +108,7 @@ def GenerateMasterNotebook(project_root, workspaceid, lakehouseid, lakehouse_nam

MetaHashes = Catalog.GetMetaHashes(project_root)
# Render the template with the notebook_file variable
rendered_template = template.render(lakehouse_name=lakehouse_name, hashes=MetaHashes, project_name=project_name, notebook_timeout=notebook_timeout,notebook_hashcheck=notebook_hashcheck)
rendered_template = template.render(lakehouse_name=lakehouse_name, hashes=MetaHashes, project_name=project_name, notebook_timeout=notebook_timeout, log_lakehouse=log_lakehouse,notebook_hashcheck=notebook_hashcheck)

# Parse the rendered template as a notebook
nb = nbf.reads(rendered_template, as_version=4)
@@ -121,7 +125,7 @@ def GenerateMasterNotebook(project_root, workspaceid, lakehouseid, lakehouse_nam
nb.cells.insert((insertion_point), cell)
insertion_point += 1
# Create a new code cell with the SQL
code = f'call_child_notebook("master_{project_name}_notebook_' + str(sort_order) + '", new_batch_id)'
code = f'call_child_notebook("master_{project_name}_notebook_' + str(sort_order) + '", new_batch_id, master_notebook)'
cell = nbf.v4.new_code_cell(source=code)
# Add the cell to the notebook
nb.cells.insert((insertion_point), cell)
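On the generator side, log_lakehouse defaults to lakehouse_name when not supplied and is then passed straight through to the Jinja templates that produce the master notebooks. A small Jinja2 sketch of that hand-off (the template text here is illustrative, not the shipped master_notebook.ipynb):

```python
# Illustrative only: shows how log_lakehouse reaches the rendered notebook text.
from jinja2 import Template

template = Template(
    "gv_lakehouse = '{{ lakehouse_name }}'\n"
    "gv_log_lakehouse = '{{ log_lakehouse }}'"
)

lakehouse_name = "main_lh"
log_lakehouse = None  # e.g. no dedicated log lakehouse was configured
if log_lakehouse is None:
    log_lakehouse = lakehouse_name  # same default as GenerateMasterNotebook

print(template.render(lakehouse_name=lakehouse_name, log_lakehouse=log_lakehouse))
```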
(The remaining 3 changed files were not loaded in this view.)
