Skip to content

Commit

Permalink
fix(IoT): Fixing race conditions during StreamThread cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
ruisebas committed Dec 13, 2024
1 parent 9a39679 commit 580e1e6
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 220 deletions.
222 changes: 81 additions & 141 deletions AWSIoT/Internal/AWSIoTStreamThread.m
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,13 @@ @interface AWSIoTStreamThread()
@property(nonatomic, assign) NSTimeInterval defaultRunLoopTimeInterval;
@property(nonatomic, assign) BOOL isRunning;
@property(nonatomic, assign) BOOL shouldDisconnect;

// Add synchronization primitives
@property(nonatomic, strong) dispatch_queue_t cleanupQueue;
@property(nonatomic, strong) dispatch_semaphore_t cleanupSemaphore;
@property(nonatomic, assign) BOOL isCleaningUp;
@property(nonatomic, strong) dispatch_queue_t serialQueue;
@property(nonatomic, assign) BOOL didCleanUp;
@end

@implementation AWSIoTStreamThread

- (nonnull instancetype)initWithSession:(nonnull AWSMQTTSession *)session
- (nonnull instancetype)initWithSession:(nonnull AWSMQTTSession *)session
decoderInputStream:(nonnull NSInputStream *)decoderInputStream
encoderOutputStream:(nonnull NSOutputStream *)encoderOutputStream {
return [self initWithSession:session
Expand All @@ -56,184 +53,127 @@ - (instancetype)initWithSession:(nonnull AWSMQTTSession *)session
_outputStream = outputStream;
_defaultRunLoopTimeInterval = 10;
_shouldDisconnect = NO;
_isCleaningUp = NO;

// Initialize synchronization primitives
_cleanupQueue = dispatch_queue_create("com.amazonaws.iot.streamthread.cleanup", DISPATCH_QUEUE_SERIAL);
_cleanupSemaphore = dispatch_semaphore_create(1);
_serialQueue = dispatch_queue_create("com.amazonaws.iot.streamthread.syncQueue", DISPATCH_QUEUE_SERIAL);
_didCleanUp = NO;
}
return self;
}

- (void)main {
@autoreleasepool {
AWSDDLogVerbose(@"Started execution of Thread: [%@]", self);

if (![self setupRunLoop]) {
AWSDDLogError(@"Failed to setup run loop for thread: [%@]", self);
return;
}

[self startIOOperations];

while ([self shouldContinueRunning]) {
@autoreleasepool {
[self.runLoopForStreamsThread runMode:NSDefaultRunLoopMode
beforeDate:[NSDate dateWithTimeIntervalSinceNow:self.defaultRunLoopTimeInterval]];
}
}

[self performCleanup];

AWSDDLogVerbose(@"Finished execution of Thread: [%@]", self);
}
}

- (BOOL)setupRunLoop {
if (self.isRunning) {
AWSDDLogError(@"Thread already running");
return NO;
AWSDDLogWarn(@"Attempted to start a thread that is already running: [%@]", self);
return;
}


AWSDDLogVerbose(@"Started execution of Thread: [%@]", self);
//This is invoked in a new thread by the webSocketDidOpen method or by the Connect method. Get the runLoop from the thread.
self.runLoopForStreamsThread = [NSRunLoop currentRunLoop];

// Setup timer with weak reference to prevent retain cycles

//Setup a default timer to ensure that the RunLoop always has atleast one timer on it. This is to prevent the while loop
//below to spin in tight loop when all input sources and session timers are shutdown during a reconnect sequence.
__weak typeof(self) weakSelf = self;
self.defaultRunLoopTimer = [[NSTimer alloc] initWithFireDate:[NSDate dateWithTimeIntervalSinceNow:60.0]
interval:60.0
target:weakSelf
selector:@selector(timerHandler:)
userInfo:nil
repeats:YES];

if (!self.defaultRunLoopTimer) {
AWSDDLogError(@"Failed to create run loop timer");
return NO;
}
repeats:YES
block:^(NSTimer * _Nonnull timer) {
AWSDDLogVerbose(@"Default run loop timer executed on Thread: [%@]. isRunning = %@. isCancelled = %@", weakSelf, weakSelf.isRunning ? @"YES" : @"NO", weakSelf.isCancelled ? @"YES" : @"NO");
}];
[self.runLoopForStreamsThread addTimer:self.defaultRunLoopTimer
forMode:NSDefaultRunLoopMode];
self.isRunning = YES;
return YES;
}

- (void)startIOOperations {
self.isRunning = YES;
if (self.outputStream) {
[self.outputStream scheduleInRunLoop:self.runLoopForStreamsThread
forMode:NSDefaultRunLoopMode];
forMode:NSDefaultRunLoopMode];
[self.outputStream open];
}

//Update the runLoop and runLoopMode in session.
[self.session connectToInputStream:self.decoderInputStream
outputStream:self.encoderOutputStream];

while ([self shouldContinueRunning]) {
//This will continue run until the thread is cancelled
//Run one cycle of the runloop. This will return after a input source event or timer event is processed
[self.runLoopForStreamsThread runMode:NSDefaultRunLoopMode
beforeDate:[NSDate dateWithTimeIntervalSinceNow:self.defaultRunLoopTimeInterval]];
}

[self cleanUp];

AWSDDLogVerbose(@"Finished execution of Thread: [%@]", self);
}

- (BOOL)shouldContinueRunning {
__block BOOL shouldRun;
dispatch_sync(self.cleanupQueue, ^{
dispatch_sync(self.serialQueue, ^{
shouldRun = self.isRunning && !self.isCancelled && self.defaultRunLoopTimer != nil;
});
return shouldRun;
}

- (void)invalidateTimer {
dispatch_sync(self.cleanupQueue, ^{
if (self.defaultRunLoopTimer) {
[self.defaultRunLoopTimer invalidate];
self.defaultRunLoopTimer = nil;
}
});
}

- (void)cancel {
AWSDDLogVerbose(@"Issued Cancel on thread [%@]", (NSThread *)self);
[self cancelWithDisconnect:NO];
dispatch_sync(self.serialQueue, ^{
self.isRunning = NO;
[super cancel];
});
}

- (void)cancelAndDisconnect:(BOOL)shouldDisconnect {
AWSDDLogVerbose(@"Issued Cancel and Disconnect = [%@] on thread [%@]",
shouldDisconnect ? @"YES" : @"NO", (NSThread *)self);
[self cancelWithDisconnect:shouldDisconnect];
AWSDDLogVerbose(@"Issued Cancel and Disconnect = [%@] on thread [%@]", shouldDisconnect ? @"YES" : @"NO", (NSThread *)self);
dispatch_sync(self.serialQueue, ^{
self.shouldDisconnect = shouldDisconnect;
self.isRunning = NO;
[super cancel];
});
}

- (void)cancelWithDisconnect:(BOOL)shouldDisconnect {
// Ensure thread-safe property updates
dispatch_sync(self.cleanupQueue, ^{
if (!self.isCleaningUp) {
self.shouldDisconnect = shouldDisconnect;
self.isRunning = NO;
[super cancel];

// Invalidate timer to trigger run loop exit
[self invalidateTimer];
- (void)cleanUp {
dispatch_sync(self.serialQueue, ^{
if (self.didCleanUp) {
AWSDDLogVerbose(@"Clean up already called for thread: [%@]", (NSThread *)self);
return;
}
});
}

- (void)performCleanup {
dispatch_semaphore_wait(self.cleanupSemaphore, DISPATCH_TIME_FOREVER);

if (self.isCleaningUp) {
dispatch_semaphore_signal(self.cleanupSemaphore);
return;
}

self.isCleaningUp = YES;
dispatch_semaphore_signal(self.cleanupSemaphore);

dispatch_sync(self.cleanupQueue, ^{
[self cleanupResources];
});
}
self.didCleanUp = YES;
if (self.defaultRunLoopTimer) {
[self.defaultRunLoopTimer invalidate];
self.defaultRunLoopTimer = nil;
}

- (void)cleanupResources {
if (self.shouldDisconnect) {
[self closeSession];
[self closeStreams];
} else {
AWSDDLogVerbose(@"Skipping disconnect for thread: [%@]", (NSThread *)self);
}

// Handle onStop callback
dispatch_block_t stopBlock = self.onStop;
if (stopBlock) {
self.onStop = nil;
stopBlock();
}
}
if (self.shouldDisconnect) {
if (self.session) {
[self.session close];
self.session = nil;
}

- (void)closeSession {
if (self.session) {
[self.session close];
self.session = nil;
}
}
if (self.outputStream) {
self.outputStream.delegate = nil;
[self.outputStream close];
[self.outputStream removeFromRunLoop:self.runLoopForStreamsThread
forMode:NSDefaultRunLoopMode];
self.outputStream = nil;
}

- (void)closeStreams {
if (self.outputStream) {
self.outputStream.delegate = nil;
[self.outputStream close];
[self.outputStream removeFromRunLoop:self.runLoopForStreamsThread
forMode:NSDefaultRunLoopMode];
self.outputStream = nil;
}

if (self.decoderInputStream) {
[self.decoderInputStream close];
self.decoderInputStream = nil;
}

if (self.encoderOutputStream) {
[self.encoderOutputStream close];
self.encoderOutputStream = nil;
}
}
if (self.decoderInputStream) {
[self.decoderInputStream close];
self.decoderInputStream = nil;
}

- (void)timerHandler:(NSTimer*)theTimer {
AWSDDLogVerbose(@"Default run loop timer executed on Thread: [%@]. isRunning = %@. isCancelled = %@",
self, self.isRunning ? @"YES" : @"NO", self.isCancelled ? @"YES" : @"NO");
}
if (self.encoderOutputStream) {
[self.encoderOutputStream close];
self.encoderOutputStream = nil;
}
} else {
AWSDDLogVerbose(@"Skipping disconnect for thread: [%@]", (NSThread *)self);
}

- (void)dealloc {
AWSDDLogVerbose(@"Deallocating AWSIoTStreamThread: [%@]", self);
if (self.onStop) {
self.onStop();
self.onStop = nil;
}
});
}

@end
Loading

0 comments on commit 580e1e6

Please sign in to comment.