Skip to content

Commit

Permalink
Delete data source queued jobs, stored records from the inspector
Browse files Browse the repository at this point in the history
  • Loading branch information
janbaykara committed Jan 6, 2025
1 parent 1fb3b06 commit bf88f1a
Show file tree
Hide file tree
Showing 11 changed files with 290 additions and 60 deletions.
41 changes: 41 additions & 0 deletions hub/graphql/mutations.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import strawberry_django
from asgiref.sync import async_to_sync
from graphql import GraphQLError
from procrastinate.contrib.django.models import ProcrastinateJob
from strawberry import auto
from strawberry.field_extensions import InputMutationExtension
from strawberry.types.info import Info
Expand All @@ -20,6 +21,7 @@
from hub.graphql.types import model_types
from hub.graphql.utils import graphql_type_to_dict
from hub.models import BatchRequest
from hub.permissions import user_can_manage_source

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -253,6 +255,45 @@ async def import_all(
return ExternalDataSourceAction(id=request_id, external_data_source=data_source)


@strawberry_django.mutation(extensions=[IsAuthenticated()])
def cancel_import(
info: Info, external_data_source_id: str, request_id: str
) -> ExternalDataSourceAction:
data_source: models.ExternalDataSource = models.ExternalDataSource.objects.get(
id=external_data_source_id
)
# Confirm user has access to this source
user = get_current_user(info)
assert user_can_manage_source(user, data_source)
# Update all remaining procrastinate jobs, cancel them
ProcrastinateJob.objects.filter(
args__external_data_source_id=external_data_source_id,
status__in=["todo", "doing"],
args__request_id=request_id,
).update(status="cancelled")
BatchRequest.objects.filter(id=request_id).update(status="cancelled")
#
return ExternalDataSourceAction(id=request_id, external_data_source=data_source)


@strawberry_django.mutation(extensions=[IsAuthenticated()])
def delete_all_records(
info: Info, external_data_source_id: str
) -> model_types.ExternalDataSource:
data_source = models.ExternalDataSource.objects.get(id=external_data_source_id)
# Confirm user has access to this source
user = get_current_user(info)
assert user_can_manage_source(user, data_source)
# Don't import more records, since we want to wipe 'em
ProcrastinateJob.objects.filter(
args__external_data_source_id=external_data_source_id,
status__in=["todo", "doing"],
).update(status="cancelled")
# Delete all data
data_source.get_import_data().all().delete()
return models.ExternalDataSource.objects.get(id=external_data_source_id)


@strawberry_django.input(models.ExternalDataSource, partial=True)
class ExternalDataSourceInput:
id: auto
Expand Down
6 changes: 6 additions & 0 deletions hub/graphql/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,12 @@ class Mutation:
)

import_all: mutation_types.ExternalDataSourceAction = mutation_types.import_all
cancel_import: mutation_types.ExternalDataSourceAction = (
mutation_types.cancel_import
)
delete_all_records: model_types.ExternalDataSource = (
mutation_types.delete_all_records
)

create_map_report: model_types.MapReport = mutation_types.create_map_report
update_map_report: model_types.MapReport = django_mutations.update(
Expand Down
1 change: 1 addition & 0 deletions hub/graphql/types/model_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ class ProcrastinateJobStatus(Enum):
doing = "doing" #: A worker is running the job
succeeded = "succeeded" #: The job ended successfully
failed = "failed" #: The job ended with an error
cancelled = "cancelled" #: The job was cancelled


@strawberry_django.filters.filter(
Expand Down
22 changes: 12 additions & 10 deletions hub/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -1337,12 +1337,20 @@ def get_scheduled_batch_job_progress(self, parent_job: ProcrastinateJob, user=No
jobs = self.event_log_queryset().filter(args__request_id=request_id).all()
status = "todo"

number_of_jobs_ahead_in_queue = (
ProcrastinateJob.objects.filter(id__lt=parent_job.id)
.filter(status__in=["todo", "doing"])
.count()
)

if any([job.status == "doing" for job in jobs]):
status = "doing"
elif any([job.status == "failed" for job in jobs]):
status = "failed"
elif all([job.status == "succeeded" for job in jobs]):
status = "succeeded"
elif number_of_jobs_ahead_in_queue <= 0:
status = "succeeded"

total = 0
statuses = dict()
Expand All @@ -1361,12 +1369,6 @@ def get_scheduled_batch_job_progress(self, parent_job: ProcrastinateJob, user=No
+ statuses.get("doing", 0)
)

number_of_jobs_ahead_in_queue = (
ProcrastinateJob.objects.filter(id__lt=parent_job.id)
.filter(status__in=["todo", "doing"])
.count()
)

time_started = (
ProcrastinateEvent.objects.filter(job_id=parent_job.id)
.order_by("at")
Expand Down Expand Up @@ -1839,7 +1841,7 @@ async def update_many(self, mapped_records: list[MappedMember], **kwargs):
"Update many not implemented for this data source type."
)

def get_record_id(self, record):
def get_record_id(self, record) -> Optional[Union[str, int]]:
"""
Get the ID for a record.
"""
Expand Down Expand Up @@ -2691,7 +2693,7 @@ def field_definitions(self):
]

def get_record_id(self, record: dict):
return record[self.id_field]
return record.get(self.id_field, None)

async def fetch_one(self, member_id):
return self.df[self.df[self.id_field] == member_id].to_dict(orient="records")[0]
Expand Down Expand Up @@ -2833,7 +2835,7 @@ async def fetch_all(self):
return itertools.chain.from_iterable(self.table.iterate())

def get_record_id(self, record):
return record["id"]
return record.get("id", None)

def get_record_field(self, record, field, field_type=None):
d = record["fields"].get(str(field), None)
Expand Down Expand Up @@ -3122,7 +3124,7 @@ def healthcheck(self):
return False

def get_record_id(self, record):
return record["id"]
return record.get("id", None)

def get_record_field(self, record, field: str, field_type=None):
field_options = [
Expand Down
2 changes: 2 additions & 0 deletions hub/permissions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
def user_can_manage_source(user, source):
return source.organisation.members.filter(user=user).exists()
8 changes: 8 additions & 0 deletions hub/tests/fixtures/geocoding_cases.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,4 +284,12 @@
"expected_area_type_code": "WD23",
"expected_area_gss": "E05014252",
},
# Failed again
{
"id": "West LancashireBurscough Bridge & Rufford",
"ward": "Burscough Bridge & Rufford",
"council": "West Lancashire",
"expected_area_type_code": "WD23",
"expected_area_gss": "E05014930",
},
]
10 changes: 10 additions & 0 deletions nextjs/src/__generated__/gql.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit bf88f1a

Please sign in to comment.