Feature/enhancedlogging #156

Merged: 25 commits, Aug 15, 2024

Commits
fbcc82f
Made the changes to apply a different lakehouse for logging if select…
cheinamann Aug 7, 2024
050ac0a
Updated the Logs to include the master notebook being used for the ex…
cheinamann Aug 7, 2024
0e7b01a
Updated the notebooks to make the timestamps human readable formatted…
cheinamann Aug 8, 2024
4785d09
Fixing error where default is not being populated for log lakehouse
cheinamann Aug 8, 2024
85db4b5
Updated code to fix the child notebook log lakehouse
cheinamann Aug 8, 2024
3a3323f
Changed the log lakehouse from an option in the wrapper to a config i…
cheinamann Aug 8, 2024
7754743
Added a fix for the child notebooks. #50
cheinamann Aug 8, 2024
fe0959b
Fixed a log lakehouse reference. #50
cheinamann Aug 8, 2024
3a30b35
Fixed a log error #50
cheinamann Aug 8, 2024
bafd247
Fixed batch logging error #50
cheinamann Aug 8, 2024
1bbcf98
Added code that will not fail the log lakehouse default if the profil…
cheinamann Aug 8, 2024
b7b6784
Added config for log_lakehouse in profile setup #50
cheinamann Aug 8, 2024
fc94b89
Added the log_lakehouse to the template. #50
cheinamann Aug 8, 2024
7f92d44
Added a check for log_lakehouse in profile create #50
cheinamann Aug 9, 2024
22dcccf
Removed the parameter for child notebooks for the log lakehouse. #50
cheinamann Aug 9, 2024
d3d7a6a
Fixed a notebook reference #50
cheinamann Aug 9, 2024
c52b1c7
Fixed code error #50
cheinamann Aug 9, 2024
8a49555
Merge dev to feature
cheinamann Aug 9, 2024
43d157d
Fixed a code error #50
cheinamann Aug 9, 2024
3e88991
Merge remote-tracking branch 'origin/dev' into feature/enhancedlogging
cheinamann Aug 11, 2024
2c2a15b
Changed credential to optional for log lakehouse #50
cheinamann Aug 12, 2024
b82c689
Updated docs #50
cheinamann Aug 13, 2024
a615d17
Removed the timestamp change as this breaks the log table for existin…
cheinamann Aug 13, 2024
54d57e8
Fixed data type error #50
cheinamann Aug 13, 2024
871ab91
Added a check to update the log tables if the columns don't exist. #50
cheinamann Aug 13, 2024
1 change: 1 addition & 0 deletions dbt/adapters/fabricsparknb/fabric_spark_credentials.py
@@ -9,6 +9,7 @@ class SparkCredentials(Credentials):
method: str = "livy"
workspaceid: str = None
database: Optional[str] = None
log_lakehouse: Optional[str] = None
lakehouse: str = None
lakehouseid: str = None # type: ignore
endpoint: Optional[str] = "https://msitapi.fabric.microsoft.com/v1"
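The new log_lakehouse field is optional, so existing profiles that only define lakehouse keep working. A minimal sketch of how a caller might resolve the logging target from these credentials, assuming the same fallback the wrapper applies in generate_files.py (the resolve_log_lakehouse helper and the trimmed field list are illustrative, not part of the adapter):

    from dataclasses import dataclass
    from typing import Optional


    @dataclass
    class SparkCredentialsSketch:
        # Trimmed illustration of the adapter's credential fields; only the
        # pieces relevant to logging are shown.
        workspaceid: Optional[str] = None
        lakehouse: Optional[str] = None
        log_lakehouse: Optional[str] = None  # new optional field added by this PR


    def resolve_log_lakehouse(creds: SparkCredentialsSketch) -> Optional[str]:
        # Hypothetical helper: fall back to the main lakehouse when no dedicated
        # log lakehouse is configured, mirroring the default in GenerateMasterNotebook.
        return creds.log_lakehouse or creds.lakehouse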
99 changes: 72 additions & 27 deletions dbt/include/fabricsparknb/notebooks/master_notebook.ipynb
@@ -50,7 +50,7 @@
"import pandas as pd # type: ignore\n",
"from tabulate import tabulate # type: ignore\n",
"import json\n",
"from pyspark.sql.functions import *\n",
"from pyspark.sql.functions import * # type: ignore\n",
"import os\n",
"import uuid"
]
@@ -68,7 +68,8 @@
"metadata": {},
"outputs": [],
"source": [
"gv_lakehouse = '{{lakehouse_name}}'"
"gv_lakehouse = '{{lakehouse_name}}'\n",
"gv_log_lakehouse = '{{log_lakehouse}}'"
]
},
{
Expand Down Expand Up @@ -144,8 +145,8 @@
"\n",
" return all_files\n",
"\n",
"def call_child_notebook(notebook, batch_id):\n",
" mssparkutils.notebook.run(notebook, {{ notebook_timeout }},{\"pm_batch_id\": batch_id})"
"def call_child_notebook(notebook, batch_id, master_notebook):\n",
" mssparkutils.notebook.run(notebook, {{ notebook_timeout }},{\"pm_batch_id\": batch_id, \"pm_master_notebook\": master_notebook}) # type: ignore"
]
},
{
Expand All @@ -171,7 +172,7 @@
"metadata": {},
"outputs": [],
"source": [
"embedded_hashes = {{ hashes }}\n",
"embedded_hashes = {{ hashes }} # type: ignore\n",
"RelativePathForMetaData = \"Files/MetaExtracts/\"\n",
"current_hashes = json.loads(get_file_content_using_notebookutils(RelativePathForMetaData + 'MetaHashes.json'))\n",
"\n",
@@ -182,7 +183,7 @@
" return h['hash']\n",
" return ret\n",
"\n",
"embedded_hashcheck = {{ notebook_hashcheck }}\n",
"embedded_hashcheck = {{ notebook_hashcheck }} # type: ignore\n",
"\n",
"##Hashcheck: BYPASS = 0, WARNING = 1, ERROR = 2\n",
"if embedded_hashcheck == 0:\n",
@@ -212,7 +213,7 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"## Create Tables"
"## Create or Alter Tables"
]
},
{
@@ -222,14 +223,15 @@
"outputs": [],
"source": [
"sql = f'''\n",
"CREATE TABLE IF NOT EXISTS {gv_lakehouse}.execution_log (\n",
"CREATE TABLE IF NOT EXISTS {gv_log_lakehouse}.execution_log (\n",
" notebook STRING,\n",
" start_time DOUBLE,\n",
" status STRING,\n",
" error STRING,\n",
" execution_time DOUBLE,\n",
" run_order INT,\n",
" batch_id string \n",
" batch_id string,\n",
" master_notebook STRING \n",
")\n",
"USING DELTA\n",
"'''\n",
@@ -244,17 +246,58 @@
"outputs": [],
"source": [
"sql = f'''\n",
"CREATE TABLE IF NOT EXISTS {gv_lakehouse}.batch (\n",
"CREATE TABLE IF NOT EXISTS {gv_log_lakehouse}.batch (\n",
" batch_id STRING,\n",
" start_time LONG,\n",
" status STRING\n",
" status STRING,\n",
" master_notebook STRING\n",
")\n",
"USING DELTA\n",
"'''\n",
"\n",
"spark.sql(sql) # type: ignore"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Check if the master_notebook column exists in the batch table\n",
"schema_check_sql = f\"DESCRIBE {gv_log_lakehouse}.execution_log\"\n",
"schema_check_df = spark.sql(schema_check_sql) # type: ignore\n",
"\n",
"# Check if the master_notebook column exists in the schema\n",
"if 'master_notebook' not in [row['col_name'] for row in schema_check_df.collect()]:\n",
" # Add the master_notebook column to the table\n",
" alter_table_sql = f'''\n",
" ALTER TABLE {gv_log_lakehouse}.execution_log\n",
" ADD COLUMN master_notebook STRING\n",
" '''\n",
" spark.sql(alter_table_sql) # type: ignore"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Check if the master_notebook column exists in the batch table\n",
"schema_check_sql = f\"DESCRIBE {gv_log_lakehouse}.batch\"\n",
"schema_check_df = spark.sql(schema_check_sql) # type: ignore\n",
"\n",
"# Check if the master_notebook column exists in the schema\n",
"if 'master_notebook' not in [row['col_name'] for row in schema_check_df.collect()]:\n",
" # Add the master_notebook column to the table\n",
" alter_table_sql = f'''\n",
" ALTER TABLE {gv_log_lakehouse}.batch\n",
" ADD COLUMN master_notebook STRING\n",
" '''\n",
" spark.sql(alter_table_sql) # type: ignore"
]
},
{
"cell_type": "markdown",
"metadata": {},
@@ -270,28 +313,29 @@
"source": [
"\n",
"\n",
"def close_batch(batch_id, status):\n",
"def close_batch(batch_id, master_notebook, status):\n",
" sql = f'''\n",
" UPDATE {gv_lakehouse}.batch\n",
" UPDATE {gv_log_lakehouse}.batch\n",
" SET status = '{status}'\n",
" WHERE batch_id = '{str(batch_id)}' '''\n",
" WHERE batch_id = '{str(batch_id)}' \n",
" AND master_notebook = '{str(master_notebook)}' '''\n",
"\n",
" spark.sql(sql)\n",
" spark.sql(sql) # type: ignore\n",
"\n",
"def get_open_batch():\n",
"def get_open_batch(master_notebook):\n",
" sql = f'''\n",
" SELECT MAX(batch_id) AS LatestBatchID FROM {gv_lakehouse}.batch WHERE status = 'open'\n",
" SELECT MAX(batch_id) AS LatestBatchID FROM {gv_log_lakehouse}.batch WHERE status = 'open' AND master_notebook = '{str(master_notebook)}'\n",
" '''\n",
"\n",
" return spark.sql(sql).collect()[0]['LatestBatchID']\n",
" return spark.sql(sql).collect()[0]['LatestBatchID'] # type: ignore\n",
"\n",
"def insert_new_batch(batch_id):\n",
"def insert_new_batch(batch_id, master_notebook):\n",
" sql = f'''\n",
" INSERT INTO {gv_lakehouse}.batch\n",
" SELECT '{batch_id}' AS batch_id, UNIX_TIMESTAMP() AS start_time, 'open' AS status\n",
" INSERT INTO {gv_log_lakehouse}.batch\n",
" SELECT '{batch_id}' AS batch_id, UNIX_TIMESTAMP() AS start_time, 'open' AS status, '{str(master_notebook)}' AS master_notebook\n",
" '''\n",
"\n",
" spark.sql(sql)"
" spark.sql(sql) # type: ignore"
]
},
{
@@ -308,7 +352,8 @@
"outputs": [],
"source": [
"new_batch_id = str(uuid.uuid4())\n",
"insert_new_batch(new_batch_id)"
"master_notebook = mssparkutils.runtime.context.get('currentNotebookName')\n",
"insert_new_batch(new_batch_id, master_notebook) # type: ignore"
]
},
{
@@ -332,10 +377,10 @@
"outputs": [],
"source": [
"# Read the log for this batch execution\n",
"df_execution_log = spark.sql(f\"SELECT * FROM {gv_lakehouse}.execution_log WHERE batch_id = '{new_batch_id}'\")\n",
"df_execution_log = spark.sql(f\"SELECT * FROM {gv_log_lakehouse}.execution_log WHERE batch_id = '{new_batch_id}' AND master_notebook = '{master_notebook}'\") # type: ignore\n",
"# Check if any have not succeeded\n",
"failed_results = df_execution_log.filter(col(\"status\") != \"success\")\n",
"succeeded_results = df_execution_log.filter(col(\"status\") == \"success\")\n",
"failed_results = df_execution_log.filter(col(\"status\") != \"success\") # type: ignore\n",
"succeeded_results = df_execution_log.filter(col(\"status\") == \"success\") # type: ignore\n",
"\n",
"if failed_results.count() == 0: \n",
" print(\"Batch Succeeded\")\n",
Expand All @@ -344,7 +389,7 @@
" print(\"Batch Failed\")\n",
" display(failed_results)\n",
"\n",
"close_batch(new_batch_id, 'closed')\n"
"close_batch(new_batch_id, master_notebook, 'closed') # type: ignore\n"
]
}
],
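The two schema-check cells above apply the same DESCRIBE / ALTER TABLE pattern to execution_log and batch so that log tables created before this change gain the master_notebook column. A sketch of that idea factored into one reusable helper, assuming the notebook's spark session and gv_log_lakehouse variable (the ensure_column name is illustrative; the notebook keeps the logic inline):

    def ensure_column(spark, table: str, column: str, data_type: str = "STRING") -> None:
        # Add `column` to `table` only if DESCRIBE does not already list it,
        # so the cell stays idempotent across reruns.
        existing = [row["col_name"] for row in spark.sql(f"DESCRIBE {table}").collect()]
        if column not in existing:
            spark.sql(f"ALTER TABLE {table} ADD COLUMN {column} {data_type}")


    # Example usage, matching the two cells above:
    # ensure_column(spark, f"{gv_log_lakehouse}.execution_log", "master_notebook")
    # ensure_column(spark, f"{gv_log_lakehouse}.batch", "master_notebook")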
64 changes: 33 additions & 31 deletions dbt/include/fabricsparknb/notebooks/master_notebook_x.ipynb
@@ -17,7 +17,8 @@
},
"outputs": [],
"source": [
"pm_batch_id = None"
"pm_batch_id = None\n",
"pm_master_notebook = None"
]
},
{
@@ -64,8 +65,8 @@
"import time\n",
"import jsonpickle # type: ignore\n",
"import json\n",
"from pyspark.sql.types import *\n",
"from pyspark.sql.functions import *\n",
"from pyspark.sql.types import * # type: ignore\n",
"from pyspark.sql.functions import * # type: ignore\n",
"import os"
]
},
@@ -82,17 +83,17 @@
"metadata": {},
"outputs": [],
"source": [
"notebook_files1 = {{ notebook_files }}\n",
"run_order1 = {{ run_order }}\n",
"notebook_files1 = {{ notebook_files }} # type: ignore\n",
"run_order1 = {{ run_order }} # type: ignore\n",
"\n",
"# Define a function to execute a notebook and return the results\n",
"@dataclass\n",
"class NotebookResult: \n",
" notebook: str\n",
" start_time: float\n",
" start_time: int\n",
" status: str\n",
" error: str\n",
" execution_time: float\n",
" execution_time: int\n",
" run_order: int\n",
"\n",
"def execute_notebook(notebook_file):\n",
@@ -170,31 +171,31 @@
"outputs": [],
"source": [
"# Define the schema for the DataFrame\n",
"schema = StructType([\n",
" StructField(\"notebook\", StringType(), True),\n",
" StructField(\"start_time\", DoubleType(), True),\n",
" StructField(\"status\", StringType(), True),\n",
" StructField(\"error\", StringType(), True),\n",
" StructField(\"execution_time\", DoubleType(), True),\n",
" StructField(\"run_order\", IntegerType(), True),\n",
" StructField(\"batch_id\", StringType(), True)\n",
"schema = StructType([ # type: ignore\n",
" StructField(\"notebook\", StringType(), True), # type: ignore\n",
" StructField(\"start_time\", DoubleType(), True), # type: ignore\n",
" StructField(\"status\", StringType(), True), # type: ignore\n",
" StructField(\"error\", StringType(), True), # type: ignore\n",
" StructField(\"execution_time\", DoubleType(), True), # type: ignore\n",
" StructField(\"run_order\", IntegerType(), True), # type: ignore\n",
" StructField(\"batch_id\", StringType(), True) # type: ignore\n",
"])\n",
"\n",
"# Create an empty DataFrame with the defined schema\n",
"failed_results = spark.createDataFrame([], schema=schema)\n",
"failed_results = spark.createDataFrame([], schema=schema) # type: ignore\n",
"# Read the log for this batch execution\n",
"df_execution_log = spark.sql(f\"SELECT * FROM {{lakehouse_name}}.execution_log WHERE batch_id = '{pm_batch_id}'\")\n",
"df_execution_log = spark.sql(f\"SELECT * FROM {{log_lakehouse}}.execution_log WHERE batch_id = '{pm_batch_id}' AND master_notebook = '{pm_master_notebook}'\") # type: ignore\n",
"if df_execution_log.count() > 0:\n",
" \n",
" # Check if any have not succeeded\n",
" failed_results = df_execution_log.filter(col(\"status\") != \"success\")\n",
" failed_results = df_execution_log.filter(col(\"status\") != \"success\") # type: ignore\n",
"\n",
" # Print the failed results\n",
" for row in failed_results.collect():\n",
" print(f\"Notebook {row['notebook']} failed with error: {row['error']}\")\n",
"\n",
" # Check if have succeeded\n",
" succeeded_results = df_execution_log.filter(col(\"status\") == \"success\")\n",
" succeeded_results = df_execution_log.filter(col(\"status\") == \"success\") # type: ignore\n",
"\n",
" # Print the succeeded results\n",
" for row in succeeded_results.collect():\n",
@@ -215,26 +216,27 @@
"outputs": [],
"source": [
"# Define the schema for the Log DataFrame\n",
"schema = StructType([\n",
" StructField(\"notebook\", StringType(), True),\n",
" StructField(\"start_time\", DoubleType(), True),\n",
" StructField(\"status\", StringType(), True),\n",
" StructField(\"error\", StringType(), True),\n",
" StructField(\"execution_time\", DoubleType(), True),\n",
" StructField(\"run_order\", IntegerType(), True)\n",
"schema = StructType([ # type: ignore\n",
" StructField(\"notebook\", StringType(), True), # type: ignore\n",
" StructField(\"start_time\", DoubleType(), True), # type: ignore\n",
" StructField(\"status\", StringType(), True), # type: ignore\n",
" StructField(\"error\", StringType(), True), # type: ignore\n",
" StructField(\"execution_time\", DoubleType(), True), # type: ignore\n",
" StructField(\"run_order\", IntegerType(), True) # type: ignore\n",
"])\n",
"\n",
"if failed_results.count() == 0:\n",
" new_results = []\n",
" # Use a ThreadPoolExecutor to run the notebooks in parallel\n",
" # Execute the notebooks and collect the results\n",
" with ThreadPoolExecutor(max_workers={{ max_worker }}) as executor:\n",
" new_results = list(executor.map(execute_notebook, notebook_files1))\n",
" with ThreadPoolExecutor(max_workers={{ max_worker }}) as executor: # type: ignore\n",
" new_results = list(executor.map(execute_notebook, notebook_files1)) # type: ignore\n",
"\n",
" # Write the results to the log file\n",
" df_log = spark.createDataFrame(new_results, schema=schema)\n",
" df_log = df_log.withColumn(\"batch_id\", lit(f'{pm_batch_id}'))\n",
" df_log.write.format(\"delta\").mode(\"append\").saveAsTable(\"{{lakehouse_name}}.execution_log\")\n",
" df_log = spark.createDataFrame(new_results, schema=schema) # type: ignore\n",
" df_log = df_log.withColumn(\"batch_id\", lit(f'{pm_batch_id}')) # type: ignore\n",
" df_log = df_log.withColumn(\"master_notebook\", lit(f'{pm_master_notebook}')) # type: ignore\n",
" df_log.write.format(\"delta\").mode(\"append\").saveAsTable(\"{{log_lakehouse}}.execution_log\")\n",
"else:\n",
" print(\"Failures in previous run_order... supressing execution\")\n",
" raise Exception(\"Failures in previous run_order... supressing execution\")"
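The pm_batch_id and pm_master_notebook assignments at the top of this notebook form its parameters cell: the values are overridden at run time by the master notebook's call_child_notebook helper. A minimal sketch of that hand-off, assuming the Fabric runtime's mssparkutils and a placeholder timeout in place of the templated {{ notebook_timeout }}:

    from notebookutils import mssparkutils  # provided by the Fabric Spark runtime


    def call_child_notebook(notebook: str, batch_id: str, master_notebook: str) -> None:
        # The keys of the parameter dict override the child notebook's
        # pm_batch_id and pm_master_notebook parameter cell.
        mssparkutils.notebook.run(
            notebook,
            1800,  # placeholder; the template renders {{ notebook_timeout }} here
            {"pm_batch_id": batch_id, "pm_master_notebook": master_notebook},
        )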
2 changes: 2 additions & 0 deletions dbt/include/fabricsparknb/profile_template.yml
@@ -10,6 +10,8 @@ prompts:
hint: Name of the Lakehouse in the workspace that you want to connect to
lakehouseid:
hint: GUID of the lakehouse, which can be extracted from url when you open lakehouse artifact from fabric.microsoft.com
log_lakehouse:
hint: Name of the Lakehouse in the workspace that you want to log to
endpoint:
default: https://api.fabric.microsoft.com/v1
auth:
12 changes: 8 additions & 4 deletions dbt_wrapper/generate_files.py
@@ -15,7 +15,11 @@


@staticmethod
def GenerateMasterNotebook(project_root, workspaceid, lakehouseid, lakehouse_name, project_name, progress: ProgressConsoleWrapper, task_id, notebook_timeout, max_worker, notebook_hashcheck):
def GenerateMasterNotebook(project_root, workspaceid, lakehouseid, lakehouse_name, project_name, progress: ProgressConsoleWrapper, task_id, notebook_timeout, max_worker, log_lakehouse, notebook_hashcheck):
# If log lakehouse is None use lakehouse as default
if log_lakehouse is None:
log_lakehouse = lakehouse_name

# Iterate through the notebooks directory and create a list of notebook files
notebook_dir = f'./{project_root}/target/notebooks/'
notebook_files_str = [os.path.splitext(os.path.basename(f))[0] for f in os.listdir(Path(notebook_dir)) if f.endswith('.ipynb') and 'master_notebook' not in f]
@@ -77,7 +81,7 @@ def GenerateMasterNotebook(project_root, workspaceid, lakehouseid, lakehouse_nam
template = env.get_template('master_notebook_x.ipynb')

# Render the template with the notebook_file variable
rendered_template = template.render(notebook_files=file_str_with_current_sort_order, run_order=sort_order, lakehouse_name=lakehouse_name, project_name=project_name,max_worker=max_worker)
rendered_template = template.render(notebook_files=file_str_with_current_sort_order, run_order=sort_order, lakehouse_name=lakehouse_name, project_name=project_name,max_worker=max_worker, log_lakehouse=log_lakehouse)

# Parse the rendered template as a notebook
nb = nbf.reads(rendered_template, as_version=4)
@@ -104,7 +108,7 @@ def GenerateMasterNotebook(project_root, workspaceid, lakehouseid, lakehouse_nam

MetaHashes = Catalog.GetMetaHashes(project_root)
# Render the template with the notebook_file variable
rendered_template = template.render(lakehouse_name=lakehouse_name, hashes=MetaHashes, project_name=project_name, notebook_timeout=notebook_timeout,notebook_hashcheck=notebook_hashcheck)
rendered_template = template.render(lakehouse_name=lakehouse_name, hashes=MetaHashes, project_name=project_name, notebook_timeout=notebook_timeout, log_lakehouse=log_lakehouse,notebook_hashcheck=notebook_hashcheck)

# Parse the rendered template as a notebook
nb = nbf.reads(rendered_template, as_version=4)
@@ -121,7 +125,7 @@ def GenerateMasterNotebook(project_root, workspaceid, lakehouseid, lakehouse_nam
nb.cells.insert((insertion_point), cell)
insertion_point += 1
# Create a new code cell with the SQL
code = f'call_child_notebook("master_{project_name}_notebook_' + str(sort_order) + '", new_batch_id)'
code = f'call_child_notebook("master_{project_name}_notebook_' + str(sort_order) + '", new_batch_id, master_notebook)'
cell = nbf.v4.new_code_cell(source=code)
# Add the cell to the notebook
nb.cells.insert((insertion_point), cell)
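Callers of GenerateMasterNotebook now supply log_lakehouse explicitly; passing None makes the generated notebooks log to the main lakehouse. A hedged call sketch with placeholder values (the progress and task_id objects come from the wrapper's own CLI wiring and are not shown here):

    # Illustrative values only.
    GenerateMasterNotebook(
        project_root="my_dbt_project",
        workspaceid="00000000-0000-0000-0000-000000000000",
        lakehouseid="11111111-1111-1111-1111-111111111111",
        lakehouse_name="main_lakehouse",
        project_name="my_project",
        progress=progress,        # ProgressConsoleWrapper instance
        task_id=task_id,
        notebook_timeout=1800,
        max_worker=4,
        log_lakehouse=None,       # None falls back to lakehouse_name ("main_lakehouse")
        notebook_hashcheck=2,     # BYPASS = 0, WARNING = 1, ERROR = 2 per the master notebook
    )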