Skip to content

Commit

Permalink
not prefetching images when not needed (#8676)
Browse files Browse the repository at this point in the history
<!-- Raise an issue to propose your change
(https://github.com/cvat-ai/cvat/issues).
It helps to avoid duplication of efforts from multiple independent
contributors.
Discuss your ideas with maintainers to be sure that changes will be
approved and merged.
Read the [Contribution guide](https://docs.cvat.ai/docs/contributing/).
-->

<!-- Provide a general summary of your changes in the Title above -->

### Motivation and context
<!-- Why is this change required? What problem does it solve? If it
fixes an open
issue, please link to the issue here. Describe your changes in detail,
add
screenshots. -->

While importing annotations to task, all jobs of the task are loaded
from db to ram. Related data is prefetched, specifically all image
models which belong to the task.
As a result, each job holds its own copy of all the image models.

If there are a lot of jobs and a lot of images in the task, a lot of
memory can be occupied.
And images are not utilised on annotations import/delete. Hence - do not
prefetch images in these cases.

### How has this been tested?
<!-- Please describe in detail how you tested your changes.
Include details of your testing environment, and the tests you ran to
see how your change affects other areas of the code, etc. -->

### Checklist
<!-- Go over all the following points, and put an `x` in all the boxes
that apply.
If an item isn't applicable for some reason, then ~~explicitly
strikethrough~~ the whole
line. If you don't do that, GitHub will show incorrect progress for the
pull request.
If you're unsure about any of these, don't hesitate to ask. We're here
to help! -->
- [ ] I submit my changes into the `develop` branch
- [ ] I have created a changelog fragment <!-- see top comment in
CHANGELOG.md -->
- [ ] I have updated the documentation accordingly
- [ ] I have added tests to cover my changes
- [ ] I have linked related issues (see [GitHub docs](

https://help.github.com/en/github/managing-your-work-on-github/linking-a-pull-request-to-an-issue#linking-a-pull-request-to-an-issue-using-a-keyword))
- [ ] I have increased versions of npm packages if it is necessary

([cvat-canvas](https://github.com/cvat-ai/cvat/tree/develop/cvat-canvas#versioning),

[cvat-core](https://github.com/cvat-ai/cvat/tree/develop/cvat-core#versioning),

[cvat-data](https://github.com/cvat-ai/cvat/tree/develop/cvat-data#versioning)
and

[cvat-ui](https://github.com/cvat-ai/cvat/tree/develop/cvat-ui#versioning))

### License

- [ ] I submit _my code changes_ under the same [MIT License](
https://github.com/cvat-ai/cvat/blob/develop/LICENSE) that covers the
project.
  Feel free to contact the maintainers if that's a concern.


<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->

## Summary by CodeRabbit

- **New Features**
	- Enhanced job retrieval process with improved error handling.
	- Introduced a mechanism for custom querysets in job initialization.
  
- **Bug Fixes**
- Improved robustness in job fetching to prevent failures when jobs are
not found.

- **Refactor**
- Updated logic in the `JobAnnotation` class for clearer control flow
and initialization.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->

---------

Co-authored-by: Maria Khrustaleva <maria@cvat.ai>
Co-authored-by: Maxim Zhiltsov <zhiltsov.max35@gmail.com>
3 people authored Dec 4, 2024

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
1 parent 17ec908 commit 7d9c368
Showing 3 changed files with 67 additions and 35 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
### Fixed

- Optimized memory consumption and reduced the number of database queries
when importing annotations to a task with a lot of jobs and images
(<https://github.com/cvat-ai/cvat/pull/8676>)
6 changes: 4 additions & 2 deletions cvat/apps/dataset_manager/bindings.py
Original file line number Diff line number Diff line change
@@ -22,7 +22,7 @@
import rq
from attr import attrib, attrs
from datumaro.components.format_detection import RejectionReason
from django.db.models import QuerySet
from django.db.models import Prefetch, QuerySet
from django.utils import timezone
from django.conf import settings

@@ -859,7 +859,9 @@ def __init__(self, annotation_ir: AnnotationIR, db_task: Task, **kwargs):

@staticmethod
def meta_for_task(db_task, host, label_mapping=None):
db_segments = db_task.segment_set.all().prefetch_related('job_set')
db_segments = db_task.segment_set.all().prefetch_related(
Prefetch('job_set', models.Job.objects.order_by("pk"))
)

meta = OrderedDict([
("id", str(db_task.id)),
91 changes: 58 additions & 33 deletions cvat/apps/dataset_manager/task.py
Original file line number Diff line number Diff line change
@@ -13,7 +13,7 @@
from datumaro.components.errors import DatasetError, DatasetImportError, DatasetNotFoundError

from django.db import transaction
from django.db.models.query import Prefetch
from django.db.models.query import Prefetch, QuerySet
from django.conf import settings
from rest_framework.exceptions import ValidationError

@@ -81,9 +81,10 @@ def merge_table_rows(rows, keys_for_merge, field_id):

return list(merged_rows.values())


class JobAnnotation:
@classmethod
def add_prefetch_info(cls, queryset):
def add_prefetch_info(cls, queryset: QuerySet, prefetch_images: bool = True):
assert issubclass(queryset.model, models.Job)

label_qs = add_prefetch_fields(models.Label.objects.all(), [
@@ -93,35 +94,48 @@ def add_prefetch_info(cls, queryset):
])
label_qs = JobData.add_prefetch_info(label_qs)

task_data_queryset = models.Data.objects.all()
if prefetch_images:
task_data_queryset = task_data_queryset.select_related('video').prefetch_related(
Prefetch('images', queryset=models.Image.objects.order_by('frame'))
)

return queryset.select_related(
'segment',
'segment__task',
).prefetch_related(
'segment__task__project',
'segment__task__owner',
'segment__task__assignee',
'segment__task__project__owner',
'segment__task__project__assignee',

Prefetch('segment__task__data',
queryset=models.Data.objects.select_related('video').prefetch_related(
Prefetch('images', queryset=models.Image.objects.order_by('frame'))
)),
Prefetch('segment__task__data', queryset=task_data_queryset),

Prefetch('segment__task__label_set', queryset=label_qs),
Prefetch('segment__task__project__label_set', queryset=label_qs),
)

def __init__(self, pk, *, is_prefetched=False, queryset=None):
if queryset is None:
queryset = self.add_prefetch_info(models.Job.objects)
def __init__(
self,
pk,
*,
lock_job_in_db: bool = False,
queryset: QuerySet | None = None,
prefetch_images: bool = False,
db_job: models.Job | None = None
):
assert db_job is None or lock_job_in_db is False
assert (db_job is None and queryset is None) or prefetch_images is False
assert db_job is None or queryset is None
if db_job is None:
if queryset is None:
queryset = self.add_prefetch_info(models.Job.objects, prefetch_images=prefetch_images)

if lock_job_in_db:
queryset = queryset.select_for_update()

if is_prefetched:
self.db_job: models.Job = queryset.select_related(
'segment__task'
).select_for_update().get(id=pk)
else:
self.db_job: models.Job = get_cached(queryset, pk=int(pk))
else:
self.db_job: models.Job = db_job

db_segment = self.db_job.segment
self.start_frame = db_segment.start_frame
@@ -786,6 +800,7 @@ def import_annotations(self, src_file, importer, **options):

self.create(job_data.data.slice(self.start_frame, self.stop_frame).serialize())


class TaskAnnotation:
def __init__(self, pk):
self.db_task = models.Task.objects.prefetch_related(
@@ -797,8 +812,7 @@ def __init__(self, pk):
requested_job_types.append(models.JobType.GROUND_TRUTH)

self.db_jobs = (
models.Job.objects
.select_related("segment")
JobAnnotation.add_prefetch_info(models.Job.objects, prefetch_images=False)
.filter(segment__task_id=pk, type__in=requested_job_types)
)

@@ -821,14 +835,14 @@ def _patch_data(self, data: Union[AnnotationIR, dict], action: Optional[PatchAct
start = db_job.segment.start_frame
stop = db_job.segment.stop_frame
jobs[jid] = { "start": start, "stop": stop }
splitted_data[jid] = data.slice(start, stop)
splitted_data[jid] = (data.slice(start, stop), db_job)

for jid, job_data in splitted_data.items():
for jid, (job_data, db_job) in splitted_data.items():
data = AnnotationIR(self.db_task.dimension)
if action is None:
data.data = put_job_data(jid, job_data)
data.data = put_job_data(jid, job_data, db_job=db_job)
else:
data.data = patch_job_data(jid, job_data, action)
data.data = patch_job_data(jid, job_data, action, db_job=db_job)

if data.version > self.ir_data.version:
self.ir_data.version = data.version
@@ -936,18 +950,18 @@ def delete(self, data=None):
self._patch_data(data, PatchAction.DELETE)
else:
for db_job in self.db_jobs:
delete_job_data(db_job.id)
delete_job_data(db_job.id, db_job=db_job)

def init_from_db(self):
self.reset()

for db_job in self.db_jobs:
for db_job in self.db_jobs.select_for_update():
if db_job.type == models.JobType.GROUND_TRUTH and not (
self.db_task.data.validation_mode == models.ValidationMode.GT_POOL
):
continue

gt_annotation = JobAnnotation(db_job.id, is_prefetched=True)
gt_annotation = JobAnnotation(db_job.id, db_job=db_job)
gt_annotation.init_from_db()
if gt_annotation.ir_data.version > self.ir_data.version:
self.ir_data.version = gt_annotation.ir_data.version
@@ -1006,19 +1020,21 @@ def get_job_data(pk):

return annotation.data


@silk_profile(name="POST job data")
@transaction.atomic
def put_job_data(pk, data):
annotation = JobAnnotation(pk)
def put_job_data(pk, data: AnnotationIR | dict, *, db_job: models.Job | None = None):
annotation = JobAnnotation(pk, db_job=db_job)
annotation.put(data)

return annotation.data


@silk_profile(name="UPDATE job data")
@plugin_decorator
@transaction.atomic
def patch_job_data(pk, data, action):
annotation = JobAnnotation(pk)
def patch_job_data(pk, data: AnnotationIR | dict, action: PatchAction, *, db_job: models.Job | None = None):
annotation = JobAnnotation(pk, db_job=db_job)
if action == PatchAction.CREATE:
annotation.create(data)
elif action == PatchAction.UPDATE:
@@ -1028,26 +1044,29 @@ def patch_job_data(pk, data, action):

return annotation.data


@silk_profile(name="DELETE job data")
@transaction.atomic
def delete_job_data(pk):
annotation = JobAnnotation(pk)
def delete_job_data(pk, *, db_job: models.Job | None = None):
annotation = JobAnnotation(pk, db_job=db_job)
annotation.delete()


def export_job(job_id, dst_file, format_name, server_url=None, save_images=False):
# For big tasks dump function may run for a long time and
# we dont need to acquire lock after the task has been initialized from DB.
# But there is the bug with corrupted dump file in case 2 or
# more dump request received at the same time:
# https://github.com/cvat-ai/cvat/issues/217
with transaction.atomic():
job = JobAnnotation(job_id)
job = JobAnnotation(job_id, prefetch_images=True, lock_job_in_db=True)
job.init_from_db()

exporter = make_exporter(format_name)
with open(dst_file, 'wb') as f:
job.export(f, exporter, host=server_url, save_images=save_images)


@silk_profile(name="GET task data")
@transaction.atomic
def get_task_data(pk):
@@ -1056,6 +1075,7 @@ def get_task_data(pk):

return annotation.data


@silk_profile(name="POST task data")
@transaction.atomic
def put_task_data(pk, data):
@@ -1064,6 +1084,7 @@ def put_task_data(pk, data):

return annotation.data


@silk_profile(name="UPDATE task data")
@transaction.atomic
def patch_task_data(pk, data, action):
@@ -1077,12 +1098,14 @@ def patch_task_data(pk, data, action):

return annotation.data


@silk_profile(name="DELETE task data")
@transaction.atomic
def delete_task_data(pk):
annotation = TaskAnnotation(pk)
annotation.delete()


def export_task(task_id, dst_file, format_name, server_url=None, save_images=False):
# For big tasks dump function may run for a long time and
# we dont need to acquire lock after the task has been initialized from DB.
@@ -1097,6 +1120,7 @@ def export_task(task_id, dst_file, format_name, server_url=None, save_images=Fal
with open(dst_file, 'wb') as f:
task.export(f, exporter, host=server_url, save_images=save_images)


@transaction.atomic
def import_task_annotations(src_file, task_id, format_name, conv_mask_to_poly):
task = TaskAnnotation(task_id)
@@ -1108,9 +1132,10 @@ def import_task_annotations(src_file, task_id, format_name, conv_mask_to_poly):
except (DatasetError, DatasetImportError, DatasetNotFoundError) as ex:
raise CvatImportError(str(ex))


@transaction.atomic
def import_job_annotations(src_file, job_id, format_name, conv_mask_to_poly):
job = JobAnnotation(job_id)
job = JobAnnotation(job_id, prefetch_images=True)

importer = make_importer(format_name)
with open(src_file, 'rb') as f:

0 comments on commit 7d9c368

Please sign in to comment.