Added in console master execution summary #151

Merged · 4 commits · Aug 9, 2024
3 changes: 2 additions & 1 deletion dbt/adapters/fabricsparknb/fabric_spark_credentials.py
@@ -12,10 +12,11 @@ class SparkCredentials(Credentials):
    lakehouse: str = None
    lakehouseid: str = None  # type: ignore
    endpoint: Optional[str] = "https://msitapi.fabric.microsoft.com/v1"
+   sql_endpoint: Optional[str] = None
    client_id: Optional[str] = None
    client_secret: Optional[str] = None
    tenant_id: Optional[str] = None
-   authentication: str= "CLI"
+   authentication: str = "CLI"
    connect_retries: int = 1
    connect_timeout: int = 10
    livy_session_parameters: Dict[str, Any] = field(default_factory=dict)
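A minimal sketch of how a profile target that sets the new sql_endpoint key gets consumed (the dict below is a hypothetical stand-in for a parsed profiles.yml target; the real lookup lives in the wrapper.py change further down):

target_info = {"lakehouse": "my_lakehouse", "sql_endpoint": "myserverprefix"}  # hypothetical parsed profile target
sql_endpoint = target_info.get("sql_endpoint")  # None when the key is omitted, matching the Optional[str] = None default above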
2 changes: 1 addition & 1 deletion dbt_wrapper/fabric_api.py
@@ -222,7 +222,7 @@ def GetWorkspaceName(self, workspace_id):
        fc = FabricClientCore(silent=True)
        workspace = fc.get_workspace_by_id(id=workspace_id)
        return workspace.display_name

    def APIRunNotebook(self, progress: ProgressConsoleWrapper, task_id, workspace_id, notebook_name):
        fc = FabricClientCore(silent=True)
        workspace = fc.get_workspace_by_id(id=workspace_id)
51 changes: 51 additions & 0 deletions dbt_wrapper/fabric_sql.py
@@ -0,0 +1,51 @@
import struct
from itertools import chain, repeat
import pyodbc
from azure.identity import AzureCliCredential
from dbt_wrapper.stage_executor import ProgressConsoleWrapper
from rich.table import Table
from rich.console import Console

class FabricApiSql:
    def __init__(self, console, server, database):
        self.console = console
        self.server = server
        self.database = database

    def ExecuteSQL(self, sql, progress: ProgressConsoleWrapper, task_id):
        credential = AzureCliCredential()  # use your authentication mechanism of choice
        sql_endpoint = f"{self.server}.datawarehouse.fabric.microsoft.com"  # the SQL endpoint of a Lakehouse or Warehouse in your Fabric Workspace

        database = f"{self.database}"  # the name of the Lakehouse or Warehouse to connect to

connection_string = f"Driver={{ODBC Driver 18 for SQL Server}};Server={sql_endpoint},1433;Database=f{database};Encrypt=Yes;TrustServerCertificate=No"

        token_object = credential.get_token("https://database.windows.net//.default")  # Retrieve an access token valid to connect to SQL databases
        token_as_bytes = bytes(token_object.token, "UTF-8")  # Convert the token to a UTF-8 byte string
        encoded_bytes = bytes(chain.from_iterable(zip(token_as_bytes, repeat(0))))  # Encode the bytes to a Windows byte string
        token_bytes = struct.pack("<i", len(encoded_bytes)) + encoded_bytes  # Package the token into a bytes object
        attrs_before = {1256: token_bytes}  # Attribute pointing to SQL_COPT_SS_ACCESS_TOKEN to pass access token to the driver

        connection = pyodbc.connect(connection_string, attrs_before=attrs_before)
        cursor = connection.cursor()

        cursor.execute(sql)
        rows = cursor.fetchall()

        # Create a table
        table = Table(title="Results")

        # Add columns to the table
        for column in cursor.description:
            table.add_column(column[0])

        # Add rows to the table
        for row in rows:
            cells = [str(cell) for cell in row]
            table.add_row(*cells)

        # Print the table using the progress console
        progress.console.print(table)

        cursor.close()
        connection.close()
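The token handshake above is the standard pyodbc pattern for passing a Microsoft Entra access token to the ODBC driver: the UTF-8 token bytes are interleaved with zero bytes (UTF-16-LE for ASCII-range tokens), length-prefixed as a 4-byte little-endian integer, and passed under connection attribute 1256 (SQL_COPT_SS_ACCESS_TOKEN). A standalone sketch of just the packing step, with a placeholder token value:

import struct
from itertools import chain, repeat

token = "eyJ0eXAi..."  # placeholder; a real token comes from credential.get_token(...)
token_as_bytes = token.encode("UTF-8")
encoded_bytes = bytes(chain.from_iterable(zip(token_as_bytes, repeat(0))))  # interleave with zero bytes
token_bytes = struct.pack("<i", len(encoded_bytes)) + encoded_bytes  # 4-byte little-endian length prefix
attrs_before = {1256: token_bytes}  # 1256 is SQL_COPT_SS_ACCESS_TOKEN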
2 changes: 1 addition & 1 deletion dbt_wrapper/main.py
@@ -164,7 +164,7 @@ def run_all(
    se.perform_stage(option=upload_notebooks_via_api, action_callables=[wrapper_commands.AutoUploadNotebooksViaApi], stage_name="Upload Notebooks via API")

    se.perform_stage(option=auto_run_master_notebook, action_callables=[wrapper_commands.RunMasterNotebook], stage_name="Run Master Notebook")

+   se.perform_stage(option=auto_run_master_notebook, action_callables=[wrapper_commands.GetExecutionResults], stage_name="Get Execution Results")

if __name__ == "__main__":
    app()
26 changes: 26 additions & 0 deletions dbt_wrapper/wrapper.py
@@ -9,6 +9,7 @@
import dbt_wrapper.utils as mn
import dbt_wrapper.generate_files as gf
from dbt_wrapper.fabric_api import FabricAPI as fa
+import dbt_wrapper.fabric_sql as fas
from dbt_wrapper.log_levels import LogLevel
from dbt_wrapper.stage_executor import ProgressConsoleWrapper
from rich import print
@@ -55,6 +56,11 @@ def GetDbtConfigs(self, dbt_project_dir, dbt_profiles_dir=None):
        self.profile_info = self.profile[self.config['profile']]
        self.target_info = self.profile_info['outputs'][self.profile_info['target']]
        self.lakehouse = self.target_info['lakehouse']
+       if "sql_endpoint" in self.target_info.keys():
+           self.sql_endpoint = self.target_info['sql_endpoint']
+       else:
+           self.sql_endpoint = None
+
        self.project_name = self.config['name']
        #self.workspaceid = self.config['workspaceid']

@@ -160,3 +166,23 @@ def RunMetadataExtract(self, progress: ProgressConsoleWrapper, task_id):
    def RunMasterNotebook(self, progress: ProgressConsoleWrapper, task_id):
        nb_name = f"master_{self.project_name}_notebook"
        self.fa.APIRunNotebook(progress=progress, task_id=task_id, workspace_id=self.target_info['workspaceid'], notebook_name=nb_name)
+
+   def GetExecutionResults(self, progress: ProgressConsoleWrapper, task_id):
+       if self.sql_endpoint is not None:
+           sql = f"""
+           Select a.notebook, replace(CONVERT(varchar(20), DATEADD(second, a.start_time, '1970/01/01 00:00:00'), 126), 'T', ' ') start_time, status, error
+           from {self.lakehouse}.dbo.execution_log a
+           join
+           (
+               Select top 1 batch_id, max(DATEADD(second, start_time, '1970/01/01 00:00:00')) start_time
+               from {self.lakehouse}.dbo.execution_log
+               group by batch_id
+               order by start_time desc
+           ) b on a.batch_id = b.batch_id
+           where a.status = 'error'
+           """
+           _fas = fas.FabricApiSql(console=self.console, server=self.sql_endpoint, database=self.lakehouse)
+           _fas.ExecuteSQL(sql=sql, progress=progress, task_id=task_id)
+       else:
+           progress.print("SQL Endpoint not found in profile. Skipping Execution Results", level=LogLevel.WARNING)
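For reference, the DATEADD(second, start_time, '1970/01/01 00:00:00') expressions in the query assume execution_log.start_time stores Unix epoch seconds; the equivalent conversion in Python, with a hypothetical value:

from datetime import datetime, timezone

start_time = 1723161600  # hypothetical epoch-seconds value from execution_log
print(datetime.fromtimestamp(start_time, tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S"))  # 2024-08-09 00:00:00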

3 changes: 2 additions & 1 deletion requirements.txt
@@ -11,4 +11,5 @@ requests==2.31.0
typer>=0.12
azure-storage-file-datalake
setuptools>=72.1.0
-pip-system-certs
+pip-system-certs
+pyodbc
3 changes: 2 additions & 1 deletion setup.py
@@ -82,7 +82,8 @@ def _get_dbt_core_version():
"typer>=0.12.3",
"setuptools>=72.1.0",
"azure-storage-file-datalake",
"pip-system-certs"
"pip-system-certs",
"pyodbc"

],
zip_safe=False,