Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

增加离线导出工单,可通过查询页面右侧切换 #2685

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 84 additions & 0 deletions common/check.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
import MySQLdb
import simplejson as json
from django.http import HttpResponse
from paramiko import Transport, SFTPClient
import oss2
import os

from common.utils.permission import superuser_required
from sql.engines import get_engine
Expand Down Expand Up @@ -131,3 +134,84 @@
result["msg"] = "无法连接实例,\n{}".format(str(e))
# 返回结果
return HttpResponse(json.dumps(result), content_type="application/json")


@superuser_required
def file_storage_connect(request):
result = {"status": 0, "msg": "ok", "data": []}
storage_type = request.POST.get("storage_type")
# 检查是否存在该变量
max_export_rows = request.POST.get("max_export_rows", "10000")
max_export_exec_time = request.POST.get("max_export_exec_time", "60")
files_expire_with_days = request.POST.get("files_expire_with_days", "0")
# 若变量已经定义,检查是否为空
max_export_rows = max_export_rows if max_export_rows else "10000"
max_export_exec_time = max_export_exec_time if max_export_exec_time else "60"
files_expire_with_days = files_expire_with_days if files_expire_with_days else "0"
check_list = {
"max_export_rows": max_export_rows,
"max_export_exec_time": max_export_exec_time,
"files_expire_with_days": files_expire_with_days,
}
try:
# 遍历字典,判断是否只有数字
for key, value in check_list.items():
if not value.isdigit():
raise TypeError(f"Value: {key} \nmust be an integer.")
except TypeError as e:
result["status"] = 1
result["msg"] = "参数类型错误,\n{}".format(str(e))

if storage_type == "sftp":
sftp_host = request.POST.get("sftp_host")
sftp_port = int(request.POST.get("sftp_port"))
sftp_user = request.POST.get("sftp_user")
sftp_password = request.POST.get("sftp_password")
sftp_path = request.POST.get("sftp_path")

try:
with Transport((sftp_host, sftp_port)) as transport:
transport.connect(username=sftp_user, password=sftp_password)
# 创建 SFTPClient
sftp = SFTPClient.from_transport(transport)
remote_path = sftp_path
try:
sftp.listdir(remote_path)
except FileNotFoundError:
raise Exception(f"SFTP 远程路径 '{remote_path}' 不存在")

except Exception as e:
result["status"] = 1
result["msg"] = "无法连接,\n{}".format(str(e))
elif storage_type == "oss":
access_key_id = request.POST.get("access_key_id")
access_key_secret = request.POST.get("access_key_secret")
endpoint = request.POST.get("endpoint")
bucket_name = request.POST.get("bucket_name")
try:
# 创建 OSS 认证
auth = oss2.Auth(access_key_id, access_key_secret)
# 创建 OSS Bucket 对象
bucket = oss2.Bucket(auth, endpoint, bucket_name)

# 判断配置的 Bucket 是否存在
try:
bucket.get_bucket_info()
except oss2.exceptions.NoSuchBucket:
raise Exception(f"OSS 存储桶 '{bucket_name}' 不存在")

except Exception as e:
result["status"] = 1
result["msg"] = "无法连接,\n{}".format(str(e))
elif storage_type == "local":
local_path = r"{}".format(request.POST.get("local_path"))
try:
if not os.path.exists(local_path):
github-advanced-security[bot] marked this conversation as resolved.
Fixed
Show resolved Hide resolved

Check failure

Code scanning / CodeQL

Uncontrolled data used in path expression High

This path depends on a
user-provided value
.
raise FileNotFoundError(
f"Destination directory '{local_path}' not found."
)
except Exception as e:
result["status"] = 1
result["msg"] = "本地路径不存在,\n{}".format(str(e))

return HttpResponse(json.dumps(result), content_type="application/json")
303 changes: 303 additions & 0 deletions common/templates/config.html

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ def sql_workflow(db_instance):
instance=db_instance,
db_name="some_db",
syntax_type=1,
is_offline_export="no",
)
wf_content = SqlWorkflowContent.objects.create(
workflow=wf, sql_content="some_sql", execute_result=""
Expand Down
4 changes: 4 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,7 @@ mozilla-django-oidc==3.0.0
django-auth-dingding==0.0.3
django-cas-ng==4.3.0
cassandra-driver
sqlparse==0.4.4
paramiko==3.4.0
oss2==2.18.3
openpyxl==3.1.2
6 changes: 5 additions & 1 deletion sql/engines/goinception.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,9 @@ def escape_string(self, value: str) -> str:
"""字符串参数转义"""
return pymysql.escape_string(value)

def execute_check(self, instance=None, db_name=None, sql=""):
def execute_check(
self, instance=None, db_name=None, sql="", is_offline_export=None
):
"""inception check"""
# 判断如果配置了隧道则连接隧道
host, port, user, password = self.remote_instance_conn(instance)
Expand Down Expand Up @@ -99,6 +101,8 @@ def execute_check(self, instance=None, db_name=None, sql=""):
if check_result.syntax_type == 2:
if get_syntax_type(r[5], parser=False, db_type="mysql") == "DDL":
check_result.syntax_type = 1
if is_offline_export == "yes":
check_result.syntax_type = 3
check_result.column_list = inception_result.column_list
check_result.checked = True
check_result.error = inception_result.error
Expand Down
69 changes: 41 additions & 28 deletions sql/engines/mysql.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from .models import ResultSet, ReviewResult, ReviewSet
from sql.utils.data_masking import data_masking
from common.config import SysConfig
from sql.engines.offlinedownload import OffLineDownLoad

logger = logging.getLogger("default")

Expand Down Expand Up @@ -71,6 +72,7 @@ def __init__(self, instance=None):
super().__init__(instance=instance)
self.config = SysConfig()
self.inc_engine = GoInceptionEngine()
self.sql_export = OffLineDownLoad()

def get_connection(self, db_name=None):
# https://stackoverflow.com/questions/19256155/python-mysqldb-returning-x01-for-bit-values
Expand Down Expand Up @@ -621,12 +623,19 @@ def query_masking(self, db_name=None, sql="", resultset=None):
mask_result = resultset
return mask_result

def execute_check(self, db_name=None, sql=""):
def execute_check(self, db_name=None, sql="", offline_data=None):
"""上线单执行前的检查, 返回Review set"""
# 获取离线导出工单参数
offline_exp = (
offline_data["is_offline_export"] if offline_data is not None else "0"
)
# 进行Inception检查,获取检测结果
try:
check_result = self.inc_engine.execute_check(
instance=self.instance, db_name=db_name, sql=sql
instance=self.instance,
db_name=db_name,
sql=sql,
is_offline_export=offline_exp,
)
except Exception as e:
logger.debug(
Expand Down Expand Up @@ -659,10 +668,11 @@ def execute_check(self, db_name=None, sql=""):
syntax_type = get_syntax_type(statement, parser=False, db_type="mysql")
# 禁用语句
if re.match(r"^select", statement.lower()):
check_result.error_count += 1
row.stagestatus = "驳回不支持语句"
row.errlevel = 2
row.errormessage = "仅支持DML和DDL语句,查询语句请使用SQL查询功能!"
if offline_exp != "yes":
check_result.error_count += 1
row.stagestatus = "驳回不支持语句"
row.errlevel = 2
row.errormessage = "仅支持DML和DDL语句,查询语句请使用SQL查询功能!"
# 高危语句
elif critical_ddl_regex and p.match(statement.strip().lower()):
check_result.error_count += 1
Expand All @@ -681,28 +691,31 @@ def execute_check(self, db_name=None, sql=""):

def execute_workflow(self, workflow):
"""执行上线单,返回Review set"""
# 判断实例是否只读
read_only = self.query(sql="SELECT @@global.read_only;").rows[0][0]
if read_only in (1, "ON"):
result = ReviewSet(
full_sql=workflow.sqlworkflowcontent.sql_content,
rows=[
ReviewResult(
id=1,
errlevel=2,
stagestatus="Execute Failed",
errormessage="实例read_only=1,禁止执行变更语句!",
sql=workflow.sqlworkflowcontent.sql_content,
)
],
)
result.error = ("实例read_only=1,禁止执行变更语句!",)
return result
# TODO 原生执行
# if workflow.is_manual == 1:
# return self.execute(db_name=workflow.db_name, sql=workflow.sqlworkflowcontent.sql_content)
# inception执行
return self.inc_engine.execute(workflow)
if workflow.is_offline_export == "yes":
return self.sql_export.execute_offline_download(workflow)
else:
# 判断实例是否只读
read_only = self.query(sql="SELECT @@global.read_only;").rows[0][0]
if read_only in (1, "ON"):
result = ReviewSet(
full_sql=workflow.sqlworkflowcontent.sql_content,
rows=[
ReviewResult(
id=1,
errlevel=2,
stagestatus="Execute Failed",
errormessage="实例read_only=1,禁止执行变更语句!",
sql=workflow.sqlworkflowcontent.sql_content,
)
],
)
result.error = ("实例read_only=1,禁止执行变更语句!",)
return result
# TODO 原生执行
# if workflow.is_manual == 1:
# return self.execute(db_name=workflow.db_name, sql=workflow.sqlworkflowcontent.sql_content)
# inception执行
return self.inc_engine.execute(workflow)

def execute(self, db_name=None, sql="", close_conn=True, parameters=None):
"""原生执行语句"""
Expand Down
Loading
Loading