Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
Brennan Stehling committed Jun 4, 2022
1 parent 9205058 commit dd8f5ef
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 100 deletions.
1 change: 0 additions & 1 deletion AWSS3/AWSS3TransferUtility.m
Original file line number Diff line number Diff line change
Expand Up @@ -2379,7 +2379,6 @@ - (void)completeTask:(AWSS3TransferUtilityTask *)task removeCompletedTask:(BOOL)
[self.completedTaskDictionary removeObjectForKey:task.transferID];
[self unregisterTaskIdentifier:task.taskIdentifier];
}

}

- (void)cleanupForMultiPartUploadTask:(AWSS3TransferUtilityMultiPartUploadTask *)task {
Expand Down
152 changes: 57 additions & 95 deletions AWSS3/AWSS3TransferUtilityTasks.m
Original file line number Diff line number Diff line change
Expand Up @@ -129,12 +129,15 @@ - (instancetype)init {
_waitingPartsDictionary = [NSMutableDictionary new];
_inProgressPartsDictionary = [NSMutableDictionary new];
_completedPartsSet = [NSMutableSet new];
_serialQueue = dispatch_queue_create("com.amazonaws.AWSS3.MultipartUploadTask", DISPATCH_QUEUE_SERIAL);
}
return self;
}

- (BOOL)isUnderConcurrencyLimit {
return self.inProgressPartsDictionary.count < [self.transferUtility.transferUtilityConfiguration.multiPartConcurrencyLimit integerValue];
NSUInteger dynamicLimit = NSProcessInfo.processInfo.activeProcessorCount * 2;
NSUInteger configuredLimit = self.transferUtility.transferUtilityConfiguration.multiPartConcurrencyLimit.integerValue;
return self.inProgressPartsDictionary.count < MAX(dynamicLimit, configuredLimit);
}

- (BOOL)hasWaitingTasks {
Expand Down Expand Up @@ -167,13 +170,11 @@ - (AWSS3TransferUtilityMultiPartUploadExpression *)expression {
- (void)cancel {
self.cancelled = YES;
self.status = AWSS3TransferUtilityTransferStatusCancelled;
for (NSNumber *key in [self.inProgressPartsDictionary allKeys]) {
AWSS3TransferUtilityUploadSubTask *subTask = [self.inProgressPartsDictionary objectForKey:key];
for (AWSS3TransferUtilityUploadSubTask *subTask in self.inProgressTasks) {
[subTask.sessionTask cancel];
}

for (NSNumber *key in [self.waitingPartsDictionary allKeys]) {
AWSS3TransferUtilityUploadSubTask *subTask = [self.waitingPartsDictionary objectForKey:key];
for (AWSS3TransferUtilityUploadSubTask *subTask in self.waitingTasks) {
[subTask.sessionTask cancel];
}

Expand All @@ -190,37 +191,13 @@ - (void)resume {
}

NSCAssert(self.transferUtility != nil, @"Transfer Utility must be provided.");

// for (NSNumber *key in [self.inProgressPartsDictionary allKeys]) {
// AWSS3TransferUtilityUploadSubTask *subTask = [self.inProgressPartsDictionary objectForKey:key];
// subTask.status = AWSS3TransferUtilityTransferStatusInProgress;
// [AWSS3TransferUtilityDatabaseHelper updateTransferRequestInDB:subTask.transferID
// partNumber:subTask.partNumber
// taskIdentifier:subTask.taskIdentifier
// eTag:subTask.eTag
// status:subTask.status
// retry_count:self.retryCount
// databaseQueue:self.databaseQueue];
// [subTask.sessionTask resume];
// }
//
// self.status = AWSS3TransferUtilityTransferStatusInProgress;
// //Update the Master Record
// [AWSS3TransferUtilityDatabaseHelper updateTransferRequestInDB:self.transferID
// partNumber:@0
// taskIdentifier:0
// eTag:@""
// status:self.status
// retry_count:self.retryCount
// databaseQueue:self.databaseQueue];

// Change status from paused to waiting
for (AWSS3TransferUtilityUploadSubTask * nextSubTask in self.waitingTasks) {
nextSubTask.status = AWSS3TransferUtilityTransferStatusWaiting;
}

[self moveWaitingTasksToInProgress];
[self completeIfDone];
[self moveWaitingTasksToInProgress:YES];
}

- (void)suspend {
Expand All @@ -231,30 +208,6 @@ - (void)suspend {

NSCAssert(self.transferUtility != nil, @"Transfer Utility must be provided.");

// for (NSNumber *key in [self.inProgressPartsDictionary allKeys]) {
// // all in progress tasks should be cancelled and a new subtask should replace it which is
// // put in the waiting dictionary and set with that status with a URLSessionTask which
// // has not been started.
//
// // then resuming should start uploading a number of parts up to the concurrency limit.
//
// AWSS3TransferUtilityUploadSubTask *subTask = [self.inProgressPartsDictionary objectForKey:key];
// if (!subTask) {
// continue;
// }
// [subTask.sessionTask suspend];
// subTask.status = AWSS3TransferUtilityTransferStatusPaused;
//
// [AWSS3TransferUtilityDatabaseHelper updateTransferRequestInDB:subTask.transferID
// partNumber:subTask.partNumber
// taskIdentifier:subTask.taskIdentifier
// eTag:subTask.eTag
// status:subTask.status
// retry_count:self.retryCount
// databaseQueue:self.databaseQueue];
// }
//

// Cancel session task for all subtasks which are in progress and set status to paused
for (AWSS3TransferUtilityUploadSubTask *inProgressSubTask in self.inProgressTasks) {
// Note: This can happen due to lack of thread-safety
Expand All @@ -280,8 +233,7 @@ - (void)suspend {

NSError *error = [self.transferUtility createUploadSubTask:self
subTask:subTask
startTransfer:NO
internalDictionaryToAddSubTaskTo:self.waitingPartsDictionary];
startTransfer:NO];

if (error) {
AWSDDLogError(@"Error creating AWSS3TransferUtilityUploadSubTask [%@]", error);
Expand All @@ -294,7 +246,7 @@ - (void)suspend {
[AWSS3TransferUtilityDatabaseHelper insertMultiPartUploadRequestSubTaskInDB:self subTask:subTask databaseQueue:self.databaseQueue];
}
}

self.status = AWSS3TransferUtilityTransferStatusPaused;
//Update the Master Record
[AWSS3TransferUtilityDatabaseHelper updateTransferRequestInDB:self.transferID
Expand All @@ -313,13 +265,14 @@ - (void)addUploadSubTask:(AWSS3TransferUtilityUploadSubTask *)subTask {
self.waitingPartsDictionary[@(subTask.taskIdentifier)] = subTask;
} else if (subTask.status == AWSS3TransferUtilityTransferStatusInProgress) {
self.inProgressPartsDictionary[@(subTask.taskIdentifier)] = subTask;

} else if (subTask.status == AWSS3TransferUtilityTransferStatusCompleted) {
[self.completedPartsSet addObject:subTask];
} else {
AWSDDLogDebug(@"Sub Task status not supported: %lu", subTask.status);
NSCAssert(NO, @"Status not supported");
}

[self completeIfDone];
}

- (void)removeWaitingUploadSubTask:(NSUInteger)taskIdentifier {
Expand All @@ -344,13 +297,15 @@ - (void)moveWaitingTaskToInProgress:(AWSS3TransferUtilityUploadSubTask *)subTask

- (void)moveWaitingTaskToInProgress:(AWSS3TransferUtilityUploadSubTask *)subTask startTransfer:(BOOL)startTransfer {
if ([self.waitingTasks containsObject:subTask]) {
//Add to inProgress list
// Add to inProgress list
self.inProgressPartsDictionary[@(subTask.taskIdentifier)] = subTask;
//Remove it from the waitingList
// Remove it from the waitingList
self.waitingPartsDictionary[@(subTask.taskIdentifier)] = nil;
AWSDDLogDebug(@"Moving Task[%@] to progress for Multipart[%@]", @(subTask.taskIdentifier), self.uploadID);

if (startTransfer) {
AWSDDLogDebug(@"Starting subTask %@", @(subTask.taskIdentifier));
NSCAssert(subTask.sessionTask.state == NSURLSessionTaskStateSuspended, @"State should be suspended before resuming.");
[subTask.sessionTask resume];
}
}
Expand Down Expand Up @@ -381,20 +336,24 @@ - (void)moveWaitingTasksToInProgress:(BOOL)startTransfer {
// move parts from waiting to in progress if under the concurrency limit
while (self.isUnderConcurrencyLimit && self.hasWaitingTasks) {
//Get a part from the waitingList
AWSS3TransferUtilityUploadSubTask *nextSubTask = [[self.waitingPartsDictionary allValues] objectAtIndex:0];
AWSS3TransferUtilityUploadSubTask *nextSubTask = [self.waitingTasks objectAtIndex:0];

//Add to inProgress list
self.inProgressPartsDictionary[@(nextSubTask.taskIdentifier)] = nextSubTask;
nextSubTask.status = AWSS3TransferUtilityTransferStatusInProgress;

//Remove it from the waitingList
[self.waitingPartsDictionary removeObjectForKey:@(nextSubTask.taskIdentifier)];
self.waitingPartsDictionary[@(nextSubTask.taskIdentifier)] = nil;
AWSDDLogDebug(@"Moving Task[%@] to progress for Multipart[%@]", @(nextSubTask.taskIdentifier), self.uploadID);

if (startTransfer) {
AWSDDLogDebug(@"Starting subTask %@", @(nextSubTask.taskIdentifier));
NSCAssert(nextSubTask.sessionTask.state == NSURLSessionTaskStateSuspended, @"State should be suspended before resuming.");
[nextSubTask.sessionTask resume];
}
nextSubTask.status = AWSS3TransferUtilityTransferStatusInProgress;
}

[self completeIfDone];
}

- (void)completeUploadSubTask:(AWSS3TransferUtilityUploadSubTask *)subTask
Expand All @@ -421,45 +380,49 @@ - (void)completeUploadSubTask:(AWSS3TransferUtilityUploadSubTask *)subTask
}

- (void)completeIfDone {
// Complete multipart upload if in progress and waiting tasks are done
if (!self.isDone) {
return;
}
dispatch_async(self.serialQueue, ^{
// Complete multipart upload if in progress and waiting tasks are done
if (!self.isDone && self.status != AWSS3TransferUtilityTransferStatusCompleted) {
return;
}

//If there are no more inProgress parts, then we are done.
//If there are no more inProgress parts, then we are done.

//Validate that all the content has been uploaded.
int64_t totalBytesSent = 0;
for (AWSS3TransferUtilityUploadSubTask *aSubTask in self.completedPartsSet) {
totalBytesSent += aSubTask.totalBytesExpectedToSend;
}
//Validate that all the content has been uploaded.
int64_t totalBytesSent = 0;
for (AWSS3TransferUtilityUploadSubTask *aSubTask in self.completedTasks) {
totalBytesSent += aSubTask.totalBytesExpectedToSend;
}

if (totalBytesSent != self.contentLength.longLongValue ) {
NSString *errorMessage = [NSString stringWithFormat:@"Expected to send [%@], but sent [%@] and there are no remaining parts. Failing transfer ",
self.contentLength, @(totalBytesSent)];
AWSDDLogDebug(@"%@", errorMessage);
NSDictionary *userInfo = [NSDictionary dictionaryWithObject:errorMessage
forKey:@"Message"];
if (totalBytesSent != self.contentLength.longLongValue ) {
NSString *errorMessage = [NSString stringWithFormat:@"Expected to send [%@], but sent [%@] and there are no remaining parts. Failing transfer ",
self.contentLength, @(totalBytesSent)];
AWSDDLogDebug(@"%@", errorMessage);
NSDictionary *userInfo = [NSDictionary dictionaryWithObject:errorMessage
forKey:@"Message"];

self.error = [NSError errorWithDomain:AWSS3TransferUtilityErrorDomain
code:AWSS3TransferUtilityErrorClientError
userInfo:userInfo];
self.error = [NSError errorWithDomain:AWSS3TransferUtilityErrorDomain
code:AWSS3TransferUtilityErrorClientError
userInfo:userInfo];

//Execute call back if provided.
[self.transferUtility completeTask:self];
//Execute call back if provided.
[self.transferUtility completeTask:self];

//Abort the request, so the server can clean up any partials.
[self.transferUtility callAbortMultiPartForUploadTask:self];
//Abort the request, so the server can clean up any partials.
[self.transferUtility callAbortMultiPartForUploadTask:self];

//clean up.
[self.transferUtility cleanupForMultiPartUploadTask:self];
return;
}
//clean up.
[self.transferUtility cleanupForMultiPartUploadTask:self];
return;
}

AWSDDLogDebug(@"There are %lu waiting upload parts.", (unsigned long)self.waitingTasks.count);
AWSDDLogDebug(@"There are %lu in progress upload parts.", (unsigned long)self.inProgressTasks.count);
AWSDDLogDebug(@"There are %lu completed upload parts.", (unsigned long)self.completedTasks.count);
[self.transferUtility completeMultiPartForUploadTask:self];
self.status = AWSS3TransferUtilityTransferStatusCompleted;
});

AWSDDLogDebug(@"There are %lu waiting upload parts.", (unsigned long)self.waitingPartsDictionary.count);
AWSDDLogDebug(@"There are %lu in progress upload parts.", (unsigned long)self.inProgressPartsDictionary.count);
AWSDDLogDebug(@"There are %lu completed upload parts.", (unsigned long)self.completedPartsSet.count);
[self.transferUtility completeMultiPartForUploadTask:self];
}

- (void)setCompletionHandler:(AWSS3TransferUtilityMultiPartUploadCompletionHandlerBlock)completionHandler {
Expand Down Expand Up @@ -498,7 +461,6 @@ - (void)cancel {
}

- (void)setCompletionHandler:(AWSS3TransferUtilityDownloadCompletionHandlerBlock)completionHandler {

self.expression.completionHandler = completionHandler;
//If the task has already completed successfully
//Or the task has completed with error, complete the task
Expand Down
7 changes: 3 additions & 4 deletions AWSS3/AWSS3TransferUtility_private.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,11 @@
@interface AWSS3TransferUtility ()

- (NSError *)createUploadSubTask:(AWSS3TransferUtilityMultiPartUploadTask *)transferUtilityMultiPartUploadTask
subTask:(AWSS3TransferUtilityUploadSubTask *)subTask
internalDictionaryToAddSubTaskTo:(NSMutableDictionary *)internalDictionaryToAddSubTaskTo;
subTask:(AWSS3TransferUtilityUploadSubTask *)subTask;

- (NSError *)createUploadSubTask:(AWSS3TransferUtilityMultiPartUploadTask *)transferUtilityMultiPartUploadTask
subTask:(AWSS3TransferUtilityUploadSubTask *)subTask
startTransfer:(BOOL)startTransfer
internalDictionaryToAddSubTaskTo:(NSMutableDictionary *)internalDictionaryToAddSubTaskTo;
startTransfer:(BOOL)startTransfer;

- (void)completeTask:(AWSS3TransferUtilityTask *)task;
- (AWSTask *)callAbortMultiPartForUploadTask:(AWSS3TransferUtilityMultiPartUploadTask *)uploadTask;
Expand Down Expand Up @@ -81,6 +79,7 @@ internalDictionaryToAddSubTaskTo:(NSMutableDictionary *)internalDictionaryToAddS
@property (strong, nonatomic) NSMutableDictionary <NSNumber *, AWSS3TransferUtilityUploadSubTask *> *waitingPartsDictionary;
@property (strong, nonatomic) NSMutableDictionary <NSNumber *, AWSS3TransferUtilityUploadSubTask *> *inProgressPartsDictionary;
@property (strong, nonatomic) NSMutableSet <AWSS3TransferUtilityUploadSubTask *> *completedPartsSet;
@property (strong, nonatomic) dispatch_queue_t serialQueue;
@property int partNumber;
@property NSNumber *contentLength;

Expand Down

0 comments on commit dd8f5ef

Please sign in to comment.