Commit

Merge pull request #100 from Insight-Services-APAC/dev

Dev

grantkriegerai authored Jul 25, 2024
2 parents c2d0508 + 2a3ebfa commit 709aef1

Showing 50 changed files with 1,808 additions and 82 deletions.
@@ -1,15 +1,13 @@
# This workflow will install Python dependencies, run tests and lint with a single version of Python
# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-python


name: Build_TestProj_Post_Install_Main


on:
push:
branches: [ "dev" ]
branches: [ "main", "dev"]
pull_request:
branches: [ "dev" ]
branches: [ "main", "dev"]

permissions:
contents: read
@@ -29,11 +27,10 @@ jobs:
run: |
python -m pip install --upgrade pip
pip install .
python test_post_install.py samples/testproj 0 samples/testproj
dbt_wrapper run-all-local samples/testproj samples/testproj
- name: Archive testproj artifacts
uses: actions/upload-artifact@v4
with:
name: dist-without-markdown
path: |
samples/testproj/target/
samples_tests/target/
3 changes: 2 additions & 1 deletion .gitignore
@@ -48,4 +48,5 @@ dbt-insightfabric/
.spark-warehouse/
dbt-integration-tests
*/target/*
/site
/site
/samples_tests
1 change: 1 addition & 0 deletions .vscode/launch.json
@@ -9,6 +9,7 @@
"type": "debugpy",
"request": "launch",
"program": "./test_pre_install.py",
"args": ["run-all", "--pre-install", "--no-build-dbt-project", "--no-upload-notebooks-via-api", "--no-generate-post-dbt-scripts", "--log-level", "warning", "samples/testproj2"],
"console": "integratedTerminal"
}
]
17 changes: 17 additions & 0 deletions =0.12
@@ -0,0 +1,17 @@
Collecting typer
Using cached typer-0.12.3-py3-none-any.whl.metadata (15 kB)
Requirement already satisfied: click>=8.0.0 in c:\users\gkrieger\apac-capability-dai-dbtfabricsparknb-1\dbt-env\lib\site-packages (from typer) (8.1.7)
Requirement already satisfied: typing-extensions>=3.7.4.3 in c:\users\gkrieger\apac-capability-dai-dbtfabricsparknb-1\dbt-env\lib\site-packages (from typer) (4.11.0)
Collecting shellingham>=1.3.0 (from typer)
Using cached shellingham-1.5.4-py2.py3-none-any.whl.metadata (3.5 kB)
Collecting rich>=10.11.0 (from typer)
Using cached rich-13.7.1-py3-none-any.whl.metadata (18 kB)
Requirement already satisfied: colorama in c:\users\gkrieger\apac-capability-dai-dbtfabricsparknb-1\dbt-env\lib\site-packages (from click>=8.0.0->typer) (0.4.6)
Requirement already satisfied: markdown-it-py>=2.2.0 in c:\users\gkrieger\apac-capability-dai-dbtfabricsparknb-1\dbt-env\lib\site-packages (from rich>=10.11.0->typer) (3.0.0)
Requirement already satisfied: pygments<3.0.0,>=2.13.0 in c:\users\gkrieger\apac-capability-dai-dbtfabricsparknb-1\dbt-env\lib\site-packages (from rich>=10.11.0->typer) (2.18.0)
Requirement already satisfied: mdurl~=0.1 in c:\users\gkrieger\apac-capability-dai-dbtfabricsparknb-1\dbt-env\lib\site-packages (from markdown-it-py>=2.2.0->rich>=10.11.0->typer) (0.1.2)
Using cached typer-0.12.3-py3-none-any.whl (47 kB)
Using cached rich-13.7.1-py3-none-any.whl (240 kB)
Using cached shellingham-1.5.4-py2.py3-none-any.whl (9.8 kB)
Installing collected packages: shellingham, rich, typer
Successfully installed rich-13.7.1 shellingham-1.5.4 typer-0.12.3
Empty file added __init__.py
Empty file.
11 changes: 0 additions & 11 deletions dbt/adapters/fabricsparknb/catalog.py
@@ -1,5 +1,4 @@
import io
from types import SimpleNamespace
import json
import agate

@@ -64,16 +63,6 @@ def GetColumnsInRelation(profile, schema, identifier):
return filtered_table


@staticmethod
def GetMetaHashes(project_root):
# Open the file
with io.open(project_root + '/metaextracts/MetaHashes.json', 'r') as file:
# Load JSON data from file
data = json.load(file)

return data


@staticmethod
def ListSchemas(profile):
# Open the file
38 changes: 29 additions & 9 deletions dbt/adapters/fabricsparknb/connections.py
@@ -21,6 +21,8 @@
from typing import Any, Optional, Union, Tuple, List, Generator, Iterable, Sequence
from abc import ABC, abstractmethod
import time
import json
import re

logger = AdapterLogger("Microsoft Fabric-Spark")
for logger_name in [
@@ -209,23 +211,41 @@ def fetch_spark_version(cls, connection) -> None:
os.environ["DBT_SPARK_VERSION"] = SparkConnectionManager.spark_version
logger.debug(f"SPARK VERSION {os.getenv('DBT_SPARK_VERSION')}")

def CheckSqlForModelCommentBlock(self, sql) -> bool:
# Extract the comments from the SQL
comments = re.findall(r'/\*(.*?)\*/', sql, re.DOTALL)

# Convert each comment to a JSON object
merged_json = {}
for comment in comments:
try:
json_object = json.loads(comment)
merged_json.update(json_object)
except json.JSONDecodeError:
#logger.error('Could not parse comment as JSON')
#logger.error(comment)
pass

if 'node_id' in merged_json.keys():
return True
else:
return False

def add_query(
self,
sql: str,
auto_begin: bool = True,
bindings: Optional[Any] = None,
abridge_sql_log: bool = False,
) -> Tuple[Connection, Any]:
self,
sql: str,
auto_begin: bool = True,
bindings: Optional[Any] = None,
abridge_sql_log: bool = False,
) -> Tuple[Connection, Any]:

if(sql.__contains__('/*FABRICSPARKNB_ALERT:')):
if (sql.__contains__('/*FABRICSPARKNB_ALERT:')):
raise dbt.exceptions.DbtRuntimeError(sql)

connection = self.get_thread_connection()
if(dbt.adapters.fabricsparknb.utils.CheckSqlForModelCommentBlock(sql) == False):
if (self.CheckSqlForModelCommentBlock(sql) == False):
sql = self._add_query_comment(sql)
sql = '/*{"project_root": "'+ self.profile.project_root + '"}*/' + f'\n{sql}'
sql = '/*{"project_root": "' + self.profile.project_root + '"}*/' + f'\n{sql}'

if auto_begin and connection.transaction_open is False:
self.begin()
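
For readers scanning the hunk above: the new CheckSqlForModelCommentBlock method decides whether a model metadata comment has already been injected into the SQL, so add_query only prepends the project_root comment when it is missing. Below is a minimal standalone sketch of the same pattern; the function name has_model_comment_block and the sample SQL strings are illustrative, not part of the commit.

```python
import json
import re


def has_model_comment_block(sql: str) -> bool:
    """Mirror of CheckSqlForModelCommentBlock: merge every /* ... */ comment
    that parses as JSON and report whether a 'node_id' key is present."""
    merged = {}
    for comment in re.findall(r'/\*(.*?)\*/', sql, re.DOTALL):
        try:
            merged.update(json.loads(comment))
        except json.JSONDecodeError:
            pass  # non-JSON comments (e.g. plain SQL comments) are ignored
    return 'node_id' in merged


# Illustrative usage with hypothetical model SQL
print(has_model_comment_block('/*{"node_id": "model.testproj.my_model"}*/\nselect 1'))  # True
print(has_model_comment_block('select 1'))  # False
```
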
3 changes: 1 addition & 2 deletions dbt/adapters/fabricsparknb/impl.py
@@ -496,7 +496,7 @@ def execute(
:rtype: Tuple[AdapterResponse, agate.Table]
"""
# Convert self.config to a JSON string
project_root = self.config.project_root
project_root = (self.config.project_root).replace('\\', '/')

# Inject the JSON into the SQL as a comment
sql = '/*{"project_root": "'+ project_root + '"}*/' + f'\n{sql}'
@@ -510,7 +510,6 @@ def list_schemas(self, database: str) -> List[str]:

return [row[0] for row in results]


def check_schema_exists(self, database: str, schema: str) -> bool:
#logger.debug("Datalake name is ", schema)
results = catalog.ListSchema(profile=self.config, schema=schema)
9 changes: 4 additions & 5 deletions dbt/adapters/fabricsparknb/livysession.py
@@ -17,11 +17,10 @@
from azure.identity import AzureCliCredential, ClientSecretCredential
from dbt.adapters.fabricspark.fabric_spark_credentials import SparkCredentials
import nbformat as nbf

from pathlib import Path
import dbt.adapters
import dbt.adapters.fabricsparknb
import dbt.adapters.fabricsparknb.notebook
import dbt.adapters.fabricsparknb.utils

logger = AdapterLogger("fabricsparknb")
NUMBERS = DECIMALS + (int, float)
@@ -436,10 +435,10 @@ def execute(self, sql: str, *parameters: Any) -> None:
print("Node ID not found in the SQL")
print(sql)

project_root = merged_json['project_root']
notebook_dir = f'{project_root}/target/notebooks/'
project_root = merged_json['project_root'].replace("\\", "/")
notebook_dir = Path(project_root) / Path("target") / Path("notebooks")
# Use the node_id as the filename
filename = f'{notebook_dir}/{node_id}.ipynb'
filename = str(Path(notebook_dir) / Path(f'{node_id}.ipynb'))

# Create the directory if it does not exist
os.makedirs(notebook_dir, exist_ok=True)
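
The hunk above replaces string concatenation with pathlib and normalises backslashes so Windows-style project roots yield usable notebook paths. A small sketch of the effect, using a hypothetical Windows project root value:

```python
from pathlib import Path

# Hypothetical Windows-style project root as it might appear in the injected JSON comment
raw_project_root = "C:\\repos\\dbtfabricsparknb\\samples\\testproj"

# Normalise separators before building paths, as the hunk above does
project_root = raw_project_root.replace("\\", "/")
notebook_dir = Path(project_root) / "target" / "notebooks"
filename = str(notebook_dir / "model.testproj.my_model.ipynb")

print(notebook_dir)
print(filename)
```
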
56 changes: 42 additions & 14 deletions dbt/adapters/fabricsparknb/utils.py
@@ -25,6 +25,7 @@
import base64



@staticmethod
def CheckSqlForModelCommentBlock(sql) -> bool:
# Extract the comments from the SQL
@@ -347,8 +348,18 @@ def IPYNBtoFabricPYFile(dbt_project_dir):
data = json.loads(f.read())
for cell in data['cells']:
if (cell["cell_type"] == "code"):
ParamCell = False
try:
for tag in cell["metadata"]["tags"]:
if(tag == "parameters"):
ParamCell = True
break
except:pass

if (cell["source"][0][:5] == "%%sql"):
python_file.write("# CELL ********************\n\n")
if (ParamCell == False):
python_file.write("# CELL ********************\n\n")
else: python_file.write("# PARAMETERS CELL ********************\n\n")
for sourceline in cell['source']:
line = "# MAGIC "+ sourceline
python_file.write(line)
@@ -359,7 +370,9 @@ def IPYNBtoFabricPYFile(dbt_project_dir):
python_file.write("# META \"language_group\": \"synapse_pyspark\"\n")
python_file.write("# META }\n\n")
elif (cell["source"][0][:11] == "%%configure"):
python_file.write("# CELL ********************\n\n")
if (ParamCell == False):
python_file.write("# CELL ********************\n\n")
else: python_file.write("# PARAMETERS CELL ********************\n\n")
for sourceline in cell['source']:
line = "# MAGIC "+ sourceline
python_file.write(line)
@@ -370,13 +383,17 @@ def IPYNBtoFabricPYFile(dbt_project_dir):
python_file.write("# META \"language_group\": \"synapse_pyspark\"\n")
python_file.write("# META }\n\n")
elif (cell["source"][0][:2] == "%%"):
python_file.write("# CELL ********************\n\n")
if (ParamCell == False):
python_file.write("# CELL ********************\n\n")
else: python_file.write("# PARAMETERS CELL ********************\n\n")
for sourceline in cell['source']:
line = "# MAGIC "+ sourceline
python_file.write(line)
python_file.write("\n\n")
else:
python_file.write("# CELL ********************\n\n")
if (ParamCell == False):
python_file.write("# CELL ********************\n\n")
else: python_file.write("# PARAMETERS CELL ********************\n\n")
for sourceline in cell['source']:
python_file.write(sourceline)
python_file.write("\n\n")
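
Each branch in the hunks above repeats the same check, so it may help to see the parameter-cell detection on its own. A minimal sketch; the sample cell dictionary (including the lakehouse name) is illustrative rather than taken from the repository.

```python
def is_parameters_cell(cell: dict) -> bool:
    """True when an ipynb code cell carries the 'parameters' tag, in which case
    IPYNBtoFabricPYFile emits '# PARAMETERS CELL' instead of '# CELL'."""
    return "parameters" in cell.get("metadata", {}).get("tags", [])


sample_cell = {
    "cell_type": "code",
    "metadata": {"tags": ["parameters"]},
    "source": ["%%configure\n", '{"defaultLakehouse": {"name": "my_lakehouse"}}\n'],
}

header = (
    "# PARAMETERS CELL ********************"
    if is_parameters_cell(sample_cell)
    else "# CELL ********************"
)
print(header)  # -> # PARAMETERS CELL ********************
```
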
@@ -533,18 +550,21 @@ def SortManifest(nodes_orig):
sort_order += 1
return nodes_orig


def stringToBase64(s):
return base64.b64encode(s.encode('utf-8')).decode('utf-8')


def base64ToString(b):
return base64.b64decode(b).decode('utf-8')


def GenerateNotebookContent(notebookcontentBase64):
notebook_w_content = {'parts': [{'path': 'notebook-content.py', 'payload': notebookcontentBase64, 'payloadType': 'InlineBase64'}]}
return notebook_w_content


def findnotebookid(notebooks,displayname):
def findnotebookid(notebooks, displayname):
for notebook in notebooks:
if notebook.display_name == displayname:
return notebook.id
@@ -591,7 +611,7 @@ def PrintFirstTimeRunningMessage():


@staticmethod
def RunDbtProject(PreInstall=False,Upload=False):
def RunDbtProject(PreInstall=False, Upload=False):
# Get Config and Profile Information from dbt


@@ -668,17 +688,23 @@ def RunDbtProjectArg(PreInstall:bool, argv:list[str]):
else:
print("Please supply at least DBT project directory as a parameter.")



#@staticmethod
#def UploadNotebook(self, directory_client: DataLakeDirectoryClient, local_dir_path: str, file_name: str):







# @staticmethod
# def UploadNotebook(self, directory_client: DataLakeDirectoryClient, local_dir_path: str, file_name: str):
# file_client = directory_client.get_file_client(file_name)
# with io.open(file=os.path.join(local_dir_path, file_name), mode="rb") as data:
# file_client.upload_data(data, overwrite=True)


#@staticmethod
#def UploadAllNotebooks(workspacename: str, datapath: str):
# @staticmethod
# def UploadAllNotebooks(workspacename: str, datapath: str):
# print("Started uploading to :" + workspacename + " file path " + datapath)
# account_name = "onelake" # always this
# account_url = f"https://{account_name}.dfs.fabric.microsoft.com"
@@ -690,15 +716,17 @@ def RunDbtProjectArg(PreInstall:bool, argv:list[str]):
# print(datapath)
# paths = file_system_client.get_paths(path=datapath)
# print("\nCurrent paths in the workspace:")
#

# for path in paths:
# print(path.name + '\n')
#

# # directory_client = DataLakeDirectoryClient(account_url,workspacename,datapath, credential=token_credential);
# notebookarr = os.listdir(Path(local_notebook_path))
#

# for notebook in notebookarr:
# # UploadNotebook(file_system_client,directory_client,local_notebook_path,notebook)
# print("Uploaded:" + notebook)
# print("Completed uploading to :" + workspacename + " file path " + datapath)
# print("Be sure to run the notebook import from Fabric")


2 changes: 1 addition & 1 deletion dbt/include/fabricsparknb/notebooks/master_notebook.ipynb
@@ -196,7 +196,7 @@
" print(\n",
" h['file'] + '\\n \\t Emb Hash: ' + get_hash(h['file'], embedded_hashes) + '\\n \\t Env Hash: ' + get_hash(h['file'], current_hashes)\n",
" )\n",
" raise Exception('Hashes do not match. Please re-generate the dbt project using the latest extract of the target environment metadata.')\n",
" print('Warning!: Hashes do not match. Its recommended to re-generate the dbt project using the latest extract of the target environment metadata.')\n",
"else:\n",
" print('Metadata Hashes Match 😏')"
]
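
The final hunk relaxes the master notebook's metadata-hash check from raising an exception to printing a warning. A hedged sketch of the surrounding comparison implied by the cell; the get_hash helper, the hash record shape, and the file names here are placeholders, not the notebook's actual implementation.

```python
def get_hash(file: str, hashes: list) -> str:
    # Placeholder lookup: stored hash for a file, or "" when absent
    return next((h["hash"] for h in hashes if h["file"] == file), "")


embedded_hashes = [{"file": "metaextracts/MetaHashes.json", "hash": "abc123"}]
current_hashes = [{"file": "metaextracts/MetaHashes.json", "hash": "def456"}]

mismatched = [
    h for h in embedded_hashes
    if get_hash(h["file"], embedded_hashes) != get_hash(h["file"], current_hashes)
]

if mismatched:
    for h in mismatched:
        print(
            h["file"]
            + "\n \t Emb Hash: " + get_hash(h["file"], embedded_hashes)
            + "\n \t Env Hash: " + get_hash(h["file"], current_hashes)
        )
    # Before this commit the notebook raised an Exception here; now it only warns
    print(
        "Warning!: Hashes do not match. It is recommended to re-generate the dbt "
        "project using the latest extract of the target environment metadata."
    )
else:
    print("Metadata Hashes Match 😏")
```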