#93 Download metaextracts via Python #95

Merged
merged 2 commits on Jul 23, 2024
17 changes: 17 additions & 0 deletions =0.12
@@ -0,0 +1,17 @@
Collecting typer
Using cached typer-0.12.3-py3-none-any.whl.metadata (15 kB)
Requirement already satisfied: click>=8.0.0 in c:\users\gkrieger\apac-capability-dai-dbtfabricsparknb-1\dbt-env\lib\site-packages (from typer) (8.1.7)
Requirement already satisfied: typing-extensions>=3.7.4.3 in c:\users\gkrieger\apac-capability-dai-dbtfabricsparknb-1\dbt-env\lib\site-packages (from typer) (4.11.0)
Collecting shellingham>=1.3.0 (from typer)
Using cached shellingham-1.5.4-py2.py3-none-any.whl.metadata (3.5 kB)
Collecting rich>=10.11.0 (from typer)
Using cached rich-13.7.1-py3-none-any.whl.metadata (18 kB)
Requirement already satisfied: colorama in c:\users\gkrieger\apac-capability-dai-dbtfabricsparknb-1\dbt-env\lib\site-packages (from click>=8.0.0->typer) (0.4.6)
Requirement already satisfied: markdown-it-py>=2.2.0 in c:\users\gkrieger\apac-capability-dai-dbtfabricsparknb-1\dbt-env\lib\site-packages (from rich>=10.11.0->typer) (3.0.0)
Requirement already satisfied: pygments<3.0.0,>=2.13.0 in c:\users\gkrieger\apac-capability-dai-dbtfabricsparknb-1\dbt-env\lib\site-packages (from rich>=10.11.0->typer) (2.18.0)
Requirement already satisfied: mdurl~=0.1 in c:\users\gkrieger\apac-capability-dai-dbtfabricsparknb-1\dbt-env\lib\site-packages (from markdown-it-py>=2.2.0->rich>=10.11.0->typer) (0.1.2)
Using cached typer-0.12.3-py3-none-any.whl (47 kB)
Using cached rich-13.7.1-py3-none-any.whl (240 kB)
Using cached shellingham-1.5.4-py2.py3-none-any.whl (9.8 kB)
Installing collected packages: shellingham, rich, typer
Successfully installed rich-13.7.1 shellingham-1.5.4 typer-0.12.3
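
Reviewer note: the `=0.12` file above appears to be pip console output committed by accident — running `pip install typer>=0.12` without quotes lets the shell treat `>=0.12` as an output redirection, creating a file named `=0.12` containing pip's stdout. Quoting the specifier (`pip install "typer>=0.12"`) avoids this; the file itself can likely be deleted from the branch.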
32 changes: 22 additions & 10 deletions dbt/adapters/fabricsparknb/utils.py
@@ -25,6 +25,7 @@
import base64



@staticmethod
def CheckSqlForModelCommentBlock(sql) -> bool:
    # Extract the comments from the SQL
@@ -549,18 +550,21 @@ def SortManifest(nodes_orig):
        sort_order += 1
    return nodes_orig


def stringToBase64(s):
    return base64.b64encode(s.encode('utf-8')).decode('utf-8')


def base64ToString(b):
    return base64.b64decode(b).decode('utf-8')


def GenerateNotebookContent(notebookcontentBase64):
    notebook_w_content = {'parts': [{'path': 'notebook-content.py', 'payload': notebookcontentBase64, 'payloadType': 'InlineBase64'}]}
    return notebook_w_content


def findnotebookid(notebooks,displayname):
def findnotebookid(notebooks, displayname):
    for notebook in notebooks:
        if notebook.display_name == displayname:
            return notebook.id
@@ -607,7 +611,7 @@ def PrintFirstTimeRunningMessage():


@staticmethod
def RunDbtProject(PreInstall=False,Upload=False):
def RunDbtProject(PreInstall=False, Upload=False):
    # Get Config and Profile Information from dbt


@@ -684,17 +688,23 @@ def RunDbtProjectArg(PreInstall:bool, argv:list[str]):
    else:
        print("Please supply at least DBT project directory as a parameter.")



#@staticmethod
#def UploadNotebook(self, directory_client: DataLakeDirectoryClient, local_dir_path: str, file_name: str):

# @staticmethod
# def UploadNotebook(self, directory_client: DataLakeDirectoryClient, local_dir_path: str, file_name: str):
# file_client = directory_client.get_file_client(file_name)
# with io.open(file=os.path.join(local_dir_path, file_name), mode="rb") as data:
# file_client.upload_data(data, overwrite=True)


#@staticmethod
#def UploadAllNotebooks(workspacename: str, datapath: str):
# @staticmethod
# def UploadAllNotebooks(workspacename: str, datapath: str):
# print("Started uploading to :" + workspacename + " file path " + datapath)
# account_name = "onelake" # always this
# account_url = f"https://{account_name}.dfs.fabric.microsoft.com"
@@ -706,15 +716,17 @@ def RunDbtProjectArg(PreInstall:bool, argv:list[str]):
# print(datapath)
# paths = file_system_client.get_paths(path=datapath)
# print("\nCurrent paths in the workspace:")
#

# for path in paths:
# print(path.name + '\n')
#

# # directory_client = DataLakeDirectoryClient(account_url,workspacename,datapath, credential=token_credential);
# notebookarr = os.listdir(Path(local_notebook_path))
#

# for notebook in notebookarr:
# # UploadNotebook(file_system_client,directory_client,local_notebook_path,notebook)
# print("Uploaded:" + notebook)
# print("Completed uploading to :" + workspacename + " file path " + datapath)
# print("Be sure to run the notebook import from Fabric")


5 changes: 5 additions & 0 deletions dbt_wrapper/fabric_api.py
@@ -188,6 +188,11 @@ def GetNotebookIdByName(self, workspace_id, notebook_name):
            if item.type == 'Notebook' and item.display_name == notebook_name:
                return item.id
        return None

    def GetWorkspaceName(self, workspace_id):
        fc = FabricClientCore(silent=True)
        workspace = fc.get_workspace_by_id(id=workspace_id)
        return workspace.display_name

    def APIRunNotebook(self, progress: ProgressConsoleWrapper, task_id, workspace_id, notebook_name):
        fc = FabricClientCore(silent=True)
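A hedged usage sketch for the new `GetWorkspaceName`: it resolves the workspace GUID configured in profiles.yml to the display name that OneLake later uses as its file-system name. The instance and GUID below are placeholders (the enclosing class name isn't visible in this hunk):

```python
# `fabric_api` is assumed to be an instance of the class this diff extends.
workspace_id = "11111111-2222-3333-4444-555555555555"  # placeholder GUID
workspace_name = fabric_api.GetWorkspaceName(workspace_id=workspace_id)
print(f"Workspace display name: {workspace_name}")
```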
42 changes: 42 additions & 0 deletions dbt_wrapper/utils.py
@@ -2,6 +2,13 @@
import os
from pathlib import Path
from sysconfig import get_paths
from azure.identity import DefaultAzureCredential
from azure.storage.filedatalake import (
    DataLakeServiceClient,
    DataLakeDirectoryClient,
    FileSystemClient
)
from dbt_wrapper.stage_executor import ProgressConsoleWrapper


@staticmethod
@@ -21,3 +28,38 @@ def GetIncludeDir():
    path = Path(os.getcwd()) / Path('dbt/include/fabricsparknb/')
    # print(str(path))
    return path


def DownloadFile(progress: ProgressConsoleWrapper, task_id, directory_client: DataLakeDirectoryClient, local_path: str, file_name: str):
    file_client = directory_client.get_file_client(file_name)
    file_name_only = file_name.split('/')[-1]  # OneLake paths include directories; keep only the file name
    writepath = str(Path(Path(local_path) / Path(file_name_only)))
    Path(local_path).mkdir(parents=True, exist_ok=True)  # ensure the local directory exists
    with open(file=writepath, mode="wb") as local_file:
        download = file_client.download_file()
        local_file.write(download.readall())  # the with block closes the file on exit
    progress.progress.update(task_id=task_id, description="Downloaded " + file_name_only)


def DownloadFiles(progress: ProgressConsoleWrapper, task_id, file_system_client: FileSystemClient, directory_name: str, local_notebook_path: str):
    progress.progress.update(task_id=task_id, description="Listing metaextract files on OneLake...")
    paths = file_system_client.get_paths(path=directory_name)
    for path in paths:
        if path.name.endswith(".json"):
            # FileSystemClient also exposes get_file_client, so it can stand in for a directory client here
            DownloadFile(progress, task_id, file_system_client, local_notebook_path, path.name)



@staticmethod
def DownloadMetaFiles(progress: ProgressConsoleWrapper, task_id, dbt_project_dir, workspacename: str, datapath: str):
    progress.progress.update(task_id=task_id, description="Connecting to OneLake to download meta extracts...")
    account_name = "onelake"  # always this
    account_url = f"https://{account_name}.dfs.fabric.microsoft.com"
    local_notebook_path = str(Path(Path(dbt_project_dir) / Path('metaextracts')))
    token_credential = DefaultAzureCredential()
    service_client = DataLakeServiceClient(account_url, credential=token_credential)
    file_system_client = service_client.get_file_system_client(workspacename)
    DownloadFiles(progress, task_id, file_system_client, datapath, local_notebook_path)
    progress.progress.update(task_id=task_id, description="Completed download of meta extracts")

24 changes: 10 additions & 14 deletions dbt_wrapper/wrapper.py
@@ -14,6 +14,8 @@
from rich import print
from rich.panel import Panel



class Commands:
    def __init__(self, console):
        self.console = console
@@ -22,6 +24,7 @@ def __init__(self, console):
        self.profile_info = None
        self.target_info = None
        self.lakehouse = None
        self.workspaceid = None
        self.project_root = None
        self.fa = fa(console=self.console)

@@ -53,8 +56,8 @@ def GetDbtConfigs(self, dbt_project_dir, dbt_profiles_dir=None):
        self.target_info = self.profile_info['outputs'][self.profile_info['target']]
        self.lakehouse = self.target_info['lakehouse']
        self.project_name = self.config['name']

        #self.workspaceid = self.config['workspaceid']

    def PrintFirstTimeRunningMessage(self):
        print('\033[1;33;48m', "It seems like this is the first time you are running this project. Please update the metadata extract json files in the metaextracts directory by performing the following steps:")
        print(f"1. Run ./{os.environ['DBT_PROJECT_DIR']}/target/pwsh/upload.ps1")
@@ -128,19 +131,12 @@ def BuildDbtProject(self, PreInstall=False):
        print(Panel.fit("[blue]End of dbt build >>>>>>>>>>>>>>>>>>>>>>>[/blue]"))

    def DownloadMetadata(self, progress: ProgressConsoleWrapper, task_id):
        progress.print("Downloading Metadata via Azcopy", level=LogLevel.INFO)
        # Get Current Directory
        progress.print("Downloading Metadata", level=LogLevel.INFO)
        lakehouse = self.lakehouse
        curr_dir = os.getcwd()
        target = Path(curr_dir) / self.dbt_project_dir / "target" / "pwsh" / "download.ps1"
        result = subprocess.run(["pwsh", "-noprofile", str(target)], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

        # Access the output and error
        output = result.stdout.decode('utf-8')
        error = result.stderr.decode('utf-8')

        progress.print(f"Output: {output}", level=LogLevel.INFO)
        if error:
            progress.print(f"Error: {error}", level=LogLevel.ERROR)
        dbt_project_dir = str(Path(Path(curr_dir) / Path(self.dbt_project_dir)))
        workspacename = self.fa.GetWorkspaceName(workspace_id=self.target_info['workspaceid'])
        mn.DownloadMetaFiles(progress=progress, task_id=task_id, dbt_project_dir=dbt_project_dir, workspacename=workspacename, datapath=lakehouse + ".lakehouse/Files/MetaExtracts/")

    def RunMetadataExtract(self, progress: ProgressConsoleWrapper, task_id):
        nb_name = f"metadata_{self.project_name}_extract"
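The rewritten DownloadMetadata drops the azcopy/download.ps1 subprocess in favour of the Python client above. For reference, a sketch of how the OneLake location is assembled from the dbt target (names are placeholders; the keys are the ones GetDbtConfigs reads):

```python
# target_info is parsed from profiles.yml by GetDbtConfigs above.
lakehouse = target_info["lakehouse"]            # e.g. "my_lakehouse"
workspace_name = fa.GetWorkspaceName(workspace_id=target_info["workspaceid"])
datapath = lakehouse + ".lakehouse/Files/MetaExtracts/"
# Mapped onto the ADLS client hierarchy:
#   account     https://onelake.dfs.fabric.microsoft.com  (fixed)
#   filesystem  workspace_name
#   directory   my_lakehouse.lakehouse/Files/MetaExtracts/
```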
3 changes: 2 additions & 1 deletion requirements.txt
@@ -8,4 +8,5 @@ sqlparams>=3.0.0
azure-identity>=1.13.0
azure-core>=1.26.4
requests==2.31.0
typer>=0.12
typer>=0.12
azure-storage-file-datalake
3 changes: 2 additions & 1 deletion setup.py
@@ -80,7 +80,8 @@ def _get_dbt_core_version():
"azure-core>=1.26.4",
"requests==2.31.0",
"typer>=0.12.3",
"setuptools>=71.0.4"
"setuptools>=71.0.4",
"azure-storage-file-datalake"

],
zip_safe=False,