Skip to content

Commit

Permalink
尝试修改单元测试
Browse files Browse the repository at this point in the history
  • Loading branch information
woshiyanghai committed Sep 20, 2024
1 parent 72f49bd commit accd105
Showing 1 changed file with 47 additions and 16 deletions.
63 changes: 47 additions & 16 deletions sql/test_archiver.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import json
from unittest.mock import patch
from unittest.mock import patch,MagicMock

from django.conf import settings
from django.contrib.auth.models import Permission
Expand All @@ -16,6 +16,7 @@
ArchiveConfig,
WorkflowAudit,
WorkflowAuditSetting,
WorkflowAuditDetail,
)
from sql.tests import User

Expand Down Expand Up @@ -79,6 +80,14 @@ def setUp(self):
create_user="",
create_user_display="",
)
self.workflow_audit_detail= WorkflowAuditDetail.objects.create(
audit_id=1,
audit="",
audit_user="",
audit_time="",
audit_status="",
remark="",
)
self.sys_config = SysConfig()
self.client = Client()

Expand Down Expand Up @@ -248,40 +257,62 @@ def test_archive_apply_auto_pass(self, mock_generate_setting):
assert archive_config.status == WorkflowStatus.PASSED

@patch("sql.utils.workflow_audit.AuditV2.operate")
@patch("sql.archiver.async_task")
def test_archive_audit(self, _async_task, mock_operate):
@patch("sql.notify.notify_for_audit.apply_async")
def test_archive_audit(self, mock_apply_async, mock_operate):
"""
测试审核归档实例数据
:return:
"""
# 设置模拟对象的返回值
mock_operate.return_value = None

# 设置数据
data = {
"archive_id": self.archive_apply.id,
"audit_status": WorkflowStatus.PASSED,
"audit_remark": "xxxx",
}

# operate 被 patch 了, 这里强制设置一下, 走一下流程
self.audit_flow.current_status = WorkflowStatus.PASSED
self.audit_flow.save()

# 登录超级用户
self.client.force_login(self.superuser)
r = self.client.post(path="/archive/audit/", data=data)

# 发送 POST 请求
response = self.client.post(path="/archive/audit/", data=data)

# 检查重定向
self.assertRedirects(
r, f"/archive/{self.archive_apply.id}/", fetch_redirect_response=False
response, f"/archive/{self.archive_apply.id}/", fetch_redirect_response=False
)

# 刷新归档申请对象
self.archive_apply.refresh_from_db()
assert self.archive_apply.state == True
assert self.archive_apply.status == WorkflowStatus.PASSED

@patch("sql.archiver.async_task")
def test_add_archive_task(self, _async_task):
"""
测试添加异步归档任务
:return:
"""
add_archive_task()
# 检查归档申请状态
self.assertTrue(self.archive_apply.state)
self.assertEqual(self.archive_apply.status, WorkflowStatus.PASSED)

@patch("sql.archiver.async_task")
def test_add_archive(self, _async_task):
# 检查 apply_async 是否被调用
mock_apply_async.assert_called_once_with(
args=[
self.audit_flow.id,
self.workflow_audit_detail.audit_detail_id,
],
time_limit=60,
task_id=f'archive-audit-{self.archive_apply.id}'
)


@patch('sql.archiver.add_archive_task.apply_async')
def test_add_archive_task(self, mock_apply_async):
# 调用 add.apply_async
add_archive_task.apply_async()

@patch("sql.archiver.archive.apply_async")
def test_add_archive(self, mock_apply_async):
"""
测试执行归档任务
:return:
Expand Down

0 comments on commit accd105

Please sign in to comment.