feat(task-processor): Add priority support #2847
Codecov Report
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #2847      +/-   ##
==========================================
+ Coverage   95.59%   95.64%   +0.04%
==========================================
  Files        1009     1011       +2
  Lines       28833    28979     +146
==========================================
+ Hits        27562    27716     +154
+ Misses       1271     1263       -8
☔ View full report in Codecov by Sentry.
@@ -90,12 +101,14 @@ class Meta:
     def create(
         cls,
         task_identifier: str,
+        priority: TaskPriority = TaskPriority.NORMAL,
@matthewelwell think we can get rid of `schedule_task` and just use this method directly?
We can, but I quite like having the helper method to differentiate them. What's the benefit in removing it?
I think the name is slightly odd, no? I'd expect it to schedule the task (i.e., put it on some queue), but it only creates the class instance. Does that make sense?
I see, you mean that it doesn't actually persist it to the DB? But that's the same as the `create` method too, right? We could update the name to `create_scheduled_task`? Maybe you're right though that we should consolidate the 2 methods.
> you mean that it doesn't actually persist it to the DB?

Yes, exactly
I've removed the `schedule_task` method, but now I'm not sure it looks any better 😕
Well, there's less code so that's always a good thing. I think I would prefer to keep the logic regarding the queue in the Task model though. Can we just handle that in the `create` method now instead?
yeah, done
I've added a couple of minor comments.
This certainly seems like an OK approach, but did we consider any other approaches here? For example, having distinct queues and the ability to define workers for those queues directly?
I guess we could still define a number of workers that operate only on a given priority if we wanted to, right?
api/edge_api/identities/tasks.py
Outdated
@@ -71,7 +72,7 @@ def call_environment_webhook_for_feature_state_change(
     call_environment_webhooks(environment, data, event_type=event_type)


-@register_task_handler()
+@register_task_handler(priority=TaskPriority.HIGHEST)
Does this need to be highest? I think we basically want to reserve highest for those tasks involved in creating the environment document, right?
I think this is as important as the environment document, right? We want the identity overrides to be reflected as soon as possible.
I'd say it's slightly lower priority than environment document changes.
api/core/migration_helpers.py
Outdated
        with open(file_path) as f:
            return cls(f.read(), reverse_sql=reverse_sql)
    def from_sql_file(
        cls, file_path: str, reverse_sql: str = None, reverse_sql_file_path: str = None
We should probably add validation here to ensure that `reverse_sql` and `reverse_sql_file_path` aren't used together? I guess an alternative would be to use a single argument and try to infer whether it's a file path (e.g. with `os.path.exists()`).
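The validation suggested here could look roughly like the following. This is a minimal sketch rather than the code in the PR: `resolve_reverse_sql` is a hypothetical standalone helper, and raising `ValueError` is an assumed choice of error.

```python
from typing import Optional


def resolve_reverse_sql(
    reverse_sql: Optional[str] = None,
    reverse_sql_file_path: Optional[str] = None,
) -> Optional[str]:
    """Return the reverse SQL, rejecting calls that supply both sources.

    Hypothetical helper for illustration; not part of the PR.
    """
    if reverse_sql is not None and reverse_sql_file_path is not None:
        raise ValueError(
            "Provide either reverse_sql or reverse_sql_file_path, not both."
        )
    if reverse_sql_file_path is not None:
        # Read the reverse SQL from disk when a path was given.
        with open(reverse_sql_file_path) as f:
            return f.read()
    # Either an inline SQL string or None (no reverse operation).
    return reverse_sql
```

Keeping two explicit arguments and failing loudly avoids the ambiguity of guessing whether a single string is SQL or a path.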
api/task_processor/migrations/0011_add_priority_to_get_tasks_to_process.py
Outdated
Added a few minor comments but this looks good. It relies on us clearing down the `taskprocessor_task` table before we can release though.
api/task_processor/models.py
Outdated
        if queue_size:
            if cls.is_queue_full(task_identifier, queue_size):
Nitpick:
-        if queue_size:
-            if cls.is_queue_full(task_identifier, queue_size):
+        if queue_size and cls.is_queue_full(task_identifier, queue_size):
api/task_processor/models.py
Outdated
            task_identifier=task_identifier,
            args=args,
            kwargs=kwargs,
    def is_queue_full(cls, task_identifier: str, queue_size: int) -> bool:
This can probably be marked as private now, right?
-    def is_queue_full(cls, task_identifier: str, queue_size: int) -> bool:
+    def _is_queue_full(cls, task_identifier: str, queue_size: int) -> bool:
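Taken together, the review suggestions on `models.py` (a single combined condition, a private `_is_queue_full`, and queue handling inside `create`) might end up looking something like this. The sketch uses a plain in-memory class in place of the Django model; `TaskQueueFullError`, the `_pending` list, the `save` method, and the counting logic are all assumptions made for illustration.

```python
from dataclasses import dataclass, field
from typing import ClassVar, List, Optional


class TaskQueueFullError(Exception):
    """Hypothetical error raised when a task's queue is at capacity."""


@dataclass
class Task:
    task_identifier: str
    args: tuple = ()
    kwargs: dict = field(default_factory=dict)

    # Stand-in for the table of unprocessed tasks (assumption).
    _pending: ClassVar[List["Task"]] = []

    @classmethod
    def create(
        cls,
        task_identifier: str,
        *,
        queue_size: Optional[int] = None,
        args: tuple = (),
        kwargs: Optional[dict] = None,
    ) -> "Task":
        # Combined condition, as suggested in the nitpick.
        if queue_size and cls._is_queue_full(task_identifier, queue_size):
            raise TaskQueueFullError(
                f"Queue for {task_identifier!r} is full (size {queue_size})."
            )
        # Like the method under review, this only builds the instance;
        # persisting it is a separate step (save() below).
        return cls(task_identifier=task_identifier, args=args, kwargs=kwargs or {})

    @classmethod
    def _is_queue_full(cls, task_identifier: str, queue_size: int) -> bool:
        # Count pending tasks that share the identifier.
        count = sum(1 for t in cls._pending if t.task_identifier == task_identifier)
        return count >= queue_size

    def save(self) -> None:
        # Stand-in for writing the row to the database.
        type(self)._pending.append(self)
```

With the check inside `create`, callers no longer need a separate helper to decide whether a task may be queued.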
Thanks for submitting a PR! Please check the boxes below:
pre-commit to check linting
Changes
In order to allow high-priority tasks to be executed first, this PR adds a priority column to the task table (with default null).
How did you test this code?
Updated the unit tests.
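To illustrate what executing high-priority tasks first could mean in practice, here is a small in-memory sketch. The numeric values of `TaskPriority` (a lower number is more urgent) and the members other than `NORMAL` and `HIGHEST` are assumptions, as are the `scheduled_for` field and the `only_priority` parameter. The `get_tasks_to_process` function here only borrows its name from the migration file `0011_add_priority_to_get_tasks_to_process`; the real selection presumably happens in SQL.

```python
from dataclasses import dataclass
from enum import IntEnum
from typing import List, Optional


class TaskPriority(IntEnum):
    # Values are assumed for illustration; lower values sort first.
    HIGHEST = 0
    HIGH = 25
    NORMAL = 50
    LOW = 75


@dataclass
class Task:
    task_identifier: str
    scheduled_for: int  # simplified timestamp (assumption)
    priority: TaskPriority = TaskPriority.NORMAL


def get_tasks_to_process(
    tasks: List[Task],
    num_tasks: int,
    only_priority: Optional[TaskPriority] = None,
) -> List[Task]:
    """Pick the next tasks: most urgent priority first, then oldest first."""
    candidates = [
        t for t in tasks if only_priority is None or t.priority == only_priority
    ]
    ordered = sorted(candidates, key=lambda t: (t.priority, t.scheduled_for))
    return ordered[:num_tasks]
```

The optional `only_priority` filter hints at how a worker could be dedicated to a single priority level, which is the alternative raised in the review.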