Skip to content

Commit

Permalink
Merge pull request #151 from Insight-Services-APAC/feature/v0.4.0
Browse files Browse the repository at this point in the history
Added in console master execution summary
  • Loading branch information
grantkriegerai authored Aug 9, 2024
2 parents 47353b2 + 776e6d9 commit ba4eb38
Show file tree
Hide file tree
Showing 7 changed files with 85 additions and 5 deletions.
3 changes: 2 additions & 1 deletion dbt/adapters/fabricsparknb/fabric_spark_credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@ class SparkCredentials(Credentials):
lakehouse: str = None
lakehouseid: str = None # type: ignore
endpoint: Optional[str] = "https://msitapi.fabric.microsoft.com/v1"
sql_endpoint: Optional[str] = None
client_id: Optional[str] = None
client_secret: Optional[str] = None
tenant_id: Optional[str] = None
authentication: str= "CLI"
authentication: str = "CLI"
connect_retries: int = 1
connect_timeout: int = 10
livy_session_parameters: Dict[str, Any] = field(default_factory=dict)
Expand Down
2 changes: 1 addition & 1 deletion dbt_wrapper/fabric_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ def GetWorkspaceName(self, workspace_id):
fc = FabricClientCore(silent=True)
workspace = fc.get_workspace_by_id(id=workspace_id)
return workspace.display_name

def APIRunNotebook(self, progress: ProgressConsoleWrapper, task_id, workspace_id, notebook_name):
fc = FabricClientCore(silent=True)
workspace = fc.get_workspace_by_id(id=workspace_id)
Expand Down
51 changes: 51 additions & 0 deletions dbt_wrapper/fabric_sql.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import struct
from itertools import chain, repeat
import pyodbc
from azure.identity import AzureCliCredential
from dbt_wrapper.stage_executor import ProgressConsoleWrapper
from rich.table import Table
from rich.console import Console

class FabricApiSql:
    """Run T-SQL against a Microsoft Fabric lakehouse/warehouse SQL endpoint.

    Authentication uses the caller's Azure CLI login (``AzureCliCredential``);
    the access token is handed to the ODBC driver through the
    SQL_COPT_SS_ACCESS_TOKEN pre-connect attribute instead of a username
    and password.
    """

    # pyodbc pre-connect attribute id for SQL_COPT_SS_ACCESS_TOKEN
    # (see Microsoft ODBC driver docs on Entra ID access-token auth).
    SQL_COPT_SS_ACCESS_TOKEN = 1256

    def __init__(self, console, server, database):
        # console: rich Console (kept for callers; ExecuteSQL prints via the
        # progress wrapper's console instead).
        self.console = console
        # server: workspace SQL endpoint host prefix, i.e. the part before
        # ".datawarehouse.fabric.microsoft.com".
        self.server = server
        # database: name of the lakehouse / warehouse to connect to.
        self.database = database

    def _access_token_attrs(self):
        """Build the ``attrs_before`` dict carrying an Entra ID access token.

        Returns:
            dict mapping SQL_COPT_SS_ACCESS_TOKEN to the token packaged in
            the byte layout the ODBC driver expects.
        """
        credential = AzureCliCredential()
        # Scope for Azure SQL / Fabric SQL endpoints.
        token_object = credential.get_token("https://database.windows.net//.default")
        token_as_bytes = bytes(token_object.token, "UTF-8")
        # The driver wants a UTF-16-LE-style byte string: interleave each
        # UTF-8 byte with a zero byte.
        encoded_bytes = bytes(chain.from_iterable(zip(token_as_bytes, repeat(0))))
        # Prefix with the little-endian int length, per the driver's contract.
        token_bytes = struct.pack("<i", len(encoded_bytes)) + encoded_bytes
        return {self.SQL_COPT_SS_ACCESS_TOKEN: token_bytes}

    def ExecuteSQL(self, sql, progress: ProgressConsoleWrapper, task_id):
        """Execute *sql* and render the entire result set as a rich table.

        Args:
            sql: T-SQL statement to run.
            progress: console wrapper whose ``console`` receives the table.
            task_id: progress task id (accepted for interface symmetry; not
                used here).
        """
        sql_endpoint = f"{self.server}.datawarehouse.fabric.microsoft.com"
        # BUG FIX: the original string read "Database=f{database}" — the "f"
        # sat inside the literal, so the driver was asked for a database
        # named e.g. "fMyLakehouse" and every connection failed.
        connection_string = (
            f"Driver={{ODBC Driver 18 for SQL Server}};"
            f"Server={sql_endpoint},1433;"
            f"Database={self.database};"
            "Encrypt=Yes;TrustServerCertificate=No"
        )

        connection = pyodbc.connect(connection_string, attrs_before=self._access_token_attrs())
        try:
            cursor = connection.cursor()
            try:
                cursor.execute(sql)
                rows = cursor.fetchall()

                table = Table(title="Results")
                # Column headers come from the cursor metadata
                # (description tuples: name first).
                for column in cursor.description:
                    table.add_column(column[0])
                for row in rows:
                    table.add_row(*[str(cell) for cell in row])

                progress.console.print(table)
            finally:
                # Close even if execute/fetch raised, so the session is not leaked.
                cursor.close()
        finally:
            connection.close()
2 changes: 1 addition & 1 deletion dbt_wrapper/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ def run_all(
se.perform_stage(option=upload_notebooks_via_api, action_callables=[wrapper_commands.AutoUploadNotebooksViaApi], stage_name="Upload Notebooks via API")

se.perform_stage(option=auto_run_master_notebook, action_callables=[wrapper_commands.RunMasterNotebook], stage_name="Run Master Notebook")

se.perform_stage(option=auto_run_master_notebook, action_callables=[wrapper_commands.GetExecutionResults], stage_name="Get Execution Results")

if __name__ == "__main__":
app()
26 changes: 26 additions & 0 deletions dbt_wrapper/wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import dbt_wrapper.utils as mn
import dbt_wrapper.generate_files as gf
from dbt_wrapper.fabric_api import FabricAPI as fa
import dbt_wrapper.fabric_sql as fas
from dbt_wrapper.log_levels import LogLevel
from dbt_wrapper.stage_executor import ProgressConsoleWrapper
from rich import print
Expand Down Expand Up @@ -55,6 +56,11 @@ def GetDbtConfigs(self, dbt_project_dir, dbt_profiles_dir=None):
self.profile_info = self.profile[self.config['profile']]
self.target_info = self.profile_info['outputs'][self.profile_info['target']]
self.lakehouse = self.target_info['lakehouse']
if "sql_endpoint" in self.target_info.keys():
self.sql_endpoint = self.target_info['sql_endpoint']
else:
self.sql_endpoint = None

self.project_name = self.config['name']
#self.workspaceid = self.config['workspaceid']

Expand Down Expand Up @@ -160,3 +166,23 @@ def RunMetadataExtract(self, progress: ProgressConsoleWrapper, task_id):
def RunMasterNotebook(self, progress: ProgressConsoleWrapper, task_id):
    """Trigger the generated master notebook for this dbt project via the Fabric API."""
    self.fa.APIRunNotebook(
        progress=progress,
        task_id=task_id,
        workspace_id=self.target_info['workspaceid'],
        notebook_name=f"master_{self.project_name}_notebook",
    )

def GetExecutionResults(self, progress: ProgressConsoleWrapper, task_id):
    """Print the errored notebooks from the most recent master-notebook batch.

    Queries the lakehouse's dbo.execution_log over the profile's SQL
    endpoint; warns and does nothing when no endpoint is configured.
    """
    # Guard clause: without a SQL endpoint we cannot reach the execution log.
    if self.sql_endpoint is None:
        progress.print("SQL Endpoint not found in profile. Skipping Execution Results", level=LogLevel.WARNING)
        return

    # Epoch seconds are stored in start_time; the inner query picks the
    # latest batch, the outer one lists its rows that ended in error.
    sql = f"""
Select a.notebook, replace(CONVERT(varchar(20), DATEADD(second, a.start_time, '1970/01/01 00:00:00'),126), 'T',' ') start_time, status, error
from {self.lakehouse}.dbo.execution_log a
join
(
Select top 1 batch_id, max(DATEADD(second, start_time, '1970/01/01 00:00:00')) start_time
from {self.lakehouse}.dbo.execution_log
group by batch_id
order by start_time desc
) b on a.batch_id = b.batch_id
where a.status = 'error'
"""
    executor = fas.FabricApiSql(console=self.console, server=self.sql_endpoint, database=self.lakehouse)
    executor.ExecuteSQL(sql=sql, progress=progress, task_id=task_id)

3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ requests==2.31.0
typer>=0.12
azure-storage-file-datalake
setuptools>=72.1.0
pip-system-certs
pip-system-certs
pyodbc
3 changes: 2 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ def _get_dbt_core_version():
"typer>=0.12.3",
"setuptools>=72.1.0",
"azure-storage-file-datalake",
"pip-system-certs"
"pip-system-certs",
"pyodbc"

],
zip_safe=False,
Expand Down

0 comments on commit ba4eb38

Please sign in to comment.