Flyteadmin digest comparison should rely on database semantics #6058
Signed-off-by: Alex Wu <[email protected]>
…vent TaskManager CreateTask method Task not found issue Signed-off-by: Alex Wu <[email protected]>
Signed-off-by: Alex Wu <[email protected]>
Codecov Report
Attention: Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## master #6058 +/- ##
=======================================
Coverage 37.10% 37.11%
=======================================
Files 1318 1318
Lines 132326 132337 +11
=======================================
+ Hits 49099 49112 +13
+ Misses 78955 78952 -3
- Partials 4272 4273 +1
Signed-off-by: Alex Wu <[email protected]>
e717145 to 28481a0
@@ -30,12 +30,12 @@ func (r *TaskRepo) Create(ctx context.Context, input models.Task, descriptionEnt
 }
 return nil
 }
-tx := r.db.WithContext(ctx).Omit("id").Create(descriptionEntity)
+tx := r.db.WithContext(ctx).Omit("id").Create(&input)
I think this change is technically not necessary since this is all wrapped in a transaction, and if any insert fails then the whole transaction should be rolled back.
Nice! thank you so much for taking on these changes
for the testing:
Then, we make a shell script to register 2 groups of tasks with same ID but different digest at the same time.
did you modify the task definition to force a different task digest? I didn't quite follow from the description
/review
Code Review Agent Run #ec8f77
Actionable Suggestions - 2
// See if an identical task already exists by checking the error code
flyteErr, ok := err.(errors.FlyteAdminError)
if !ok || flyteErr.Code() != codes.AlreadyExists {
logger.Errorf(ctx, "Failed to create task model with id [%+v] with err %v", request.GetId(), err)
Consider using logger.Debugf instead of logger.Errorf for non-critical database errors. The error is already being returned to the caller.
Code suggestion
Check the AI-generated fix before applying
-logger.Errorf(ctx, "Failed to create task model with id [%+v] with err %v", request.GetId(), err)
+logger.Debugf(ctx, "Failed to create task model with id [%+v] with err %v", request.GetId(), err)
Code Review Run #ec8f77
}, nil
})
mockRepository.TaskRepo().(*repositoryMocks.MockTaskRepo).SetCreateCallback(func(input models.Task, descriptionEntity *models.DescriptionEntity) error {
return adminErrors.NewFlyteAdminErrorf(codes.AlreadyExists, "task already exists")
Consider making the error message more descriptive by including task identifier details in the adminErrors.NewFlyteAdminErrorf() call.
Code suggestion
Check the AI-generated fix before applying
-return adminErrors.NewFlyteAdminErrorf(codes.AlreadyExists, "task already exists")
+return adminErrors.NewFlyteAdminErrorf(codes.AlreadyExists, "task %v already exists", input.TaskKey)
Code Review Run #ec8f77
Tracking issue
Closes #4780
Why are the changes needed?
In the current TaskManager CreateTask code, FlyteAdmin first checks whether a task with the same ID already exists in the database. If it does, FlyteAdmin verifies whether the task being registered has a different digest from the existing task. If no task with the same ID is found, FlyteAdmin proceeds to create the task in the database.
However, this check-then-create approach can lead to a race condition in which the digest comparison never happens. For example, consider two identical tasks (same ID and same digest), A and B, being registered with FlyteAdmin simultaneously. Both requests are likely to skip the digest check because neither task is present in the database yet. Consequently, one task is created in the database and the other fails with a primary key conflict. (Refer to the diagram below for a better understanding.)
What changes were proposed in this pull request?
1. Do the digest check in a transactional way:
The procedure for creating a task should be: 1. create the task -> 2. if the task ID already exists (primary key conflict) -> 3. do the digest check. The pseudocode could look like
In this way, we can make sure the task digest is checked even when two identical tasks are registered at the same time. Refer to the diagram below for a better understanding.
2. Write the Task to the DB before the Description in the TaskRepo Create method:
In the current TaskRepo Create method, the task description is created before the task. However, if TaskManager catches a primary key conflict error from the description insert and then tries to fetch the existing task for the digest check, a task-not-found error can occur because the task has not yet been created in the DB, which makes no sense to the user. This PR proposes writing the Task to the DB before the Description in the TaskRepo Create method.
How was this patch tested?
Set up a simple workflow with 2 tasks.
Write a shell script that requests task registration 10 times simultaneously to simulate a high-concurrency situation. Each task is expected to be registered successfully exactly once; every other response should show AlreadyExists.
The results show that each task was registered only once, as expected.
Then, we make a shell script to register 2 groups of tasks with the same ID but different digests at the same time. TaskExistsDifferentStructureError is expected to be shown in the response.
The error was shown as expected.
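For readers who prefer to reproduce this kind of check without a shell script, a concurrent harness along the following lines is one option. It is only a sketch under assumptions: `registry`, `register`, and `fire` are hypothetical names, and the in-memory registry stands in for a FlyteAdmin endpoint rather than calling the real service.

```go
package main

import (
	"fmt"
	"sync"
)

// registry is a hypothetical in-memory stand-in for the registration endpoint.
type registry struct {
	mu      sync.Mutex
	digests map[string]string // task ID -> digest of the registered task
}

// register returns a status string mirroring the responses named in the test plan.
func (r *registry) register(id, digest string) string {
	r.mu.Lock()
	defer r.mu.Unlock()
	existing, ok := r.digests[id]
	switch {
	case !ok:
		r.digests[id] = digest
		return "OK"
	case existing == digest:
		return "AlreadyExists"
	default:
		return "TaskExistsDifferentStructureError"
	}
}

// fire sends one registration per digest at (nearly) the same moment
// and tallies the statuses that come back.
func fire(r *registry, id string, digests []string) map[string]int {
	var wg sync.WaitGroup
	var mu sync.Mutex
	tally := make(map[string]int)
	start := make(chan struct{})
	for _, d := range digests {
		wg.Add(1)
		go func(d string) {
			defer wg.Done()
			<-start // hold every goroutine until all are ready
			status := r.register(id, d)
			mu.Lock()
			tally[status]++
			mu.Unlock()
		}(d)
	}
	close(start) // release all requests together
	wg.Wait()
	return tally
}

func main() {
	r := &registry{digests: make(map[string]string)}

	// Ten identical registrations of the same task.
	same := make([]string, 10)
	for i := range same {
		same[i] = "abc"
	}
	fmt.Println(fire(r, "task-a", same))

	// Two registrations with the same ID but different digests.
	fmt.Println(fire(r, "task-b", []string{"abc", "xyz"}))
}
```

Whichever request wins the race, the tallies should match the expectations above: exactly one success per task ID, with the remainder reported as AlreadyExists or TaskExistsDifferentStructureError.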
Summary by Bito
This PR implements a transactional approach to resolve race conditions in FlyteAdmin's task registration process. The changes modify the task creation workflow by handling primary key conflicts and performing digest checks. The implementation reorders the task and description creation sequence in the database for consistent error handling.
Unit tests added: True
Estimated effort to review (1-5, lower is better): 1