diff --git a/AWSIoT/Internal/AWSIoTStreamThread.h b/AWSIoT/Internal/AWSIoTStreamThread.h index fb2dc572721..97fa9ee5498 100644 --- a/AWSIoT/Internal/AWSIoTStreamThread.h +++ b/AWSIoT/Internal/AWSIoTStreamThread.h @@ -20,7 +20,7 @@ NS_ASSUME_NONNULL_BEGIN @interface AWSIoTStreamThread : NSThread -@property(strong, nullable) void (^onStop)(void); +@property(nonatomic, copy, nullable) void (^onStop)(void); -(instancetype)initWithSession:(nonnull AWSMQTTSession *)session decoderInputStream:(nonnull NSInputStream *)decoderInputStream diff --git a/AWSIoT/Internal/AWSIoTStreamThread.m b/AWSIoT/Internal/AWSIoTStreamThread.m index 2f8ca2f37e3..059c4d6b8d8 100644 --- a/AWSIoT/Internal/AWSIoTStreamThread.m +++ b/AWSIoT/Internal/AWSIoTStreamThread.m @@ -27,6 +27,8 @@ @interface AWSIoTStreamThread() @property(nonatomic, assign) NSTimeInterval defaultRunLoopTimeInterval; @property(nonatomic, assign) BOOL isRunning; @property(nonatomic, assign) BOOL shouldDisconnect; +@property(nonatomic, strong) dispatch_queue_t serialQueue; +@property(nonatomic, assign) BOOL didCleanUp; @end @implementation AWSIoTStreamThread @@ -40,10 +42,10 @@ - (nonnull instancetype)initWithSession:(nonnull AWSMQTTSession *)session outputStream:nil]; } --(instancetype)initWithSession:(nonnull AWSMQTTSession *)session - decoderInputStream:(nonnull NSInputStream *)decoderInputStream - encoderOutputStream:(nonnull NSOutputStream *)encoderOutputStream - outputStream:(nullable NSOutputStream *)outputStream; { +- (instancetype)initWithSession:(nonnull AWSMQTTSession *)session + decoderInputStream:(nonnull NSInputStream *)decoderInputStream + encoderOutputStream:(nonnull NSOutputStream *)encoderOutputStream + outputStream:(nullable NSOutputStream *)outputStream { if (self = [super init]) { _session = session; _decoderInputStream = decoderInputStream; @@ -51,23 +53,31 @@ -(instancetype)initWithSession:(nonnull AWSMQTTSession *)session _outputStream = outputStream; _defaultRunLoopTimeInterval = 10; _shouldDisconnect = NO; + _serialQueue = dispatch_queue_create("com.amazonaws.iot.streamthread.syncQueue", DISPATCH_QUEUE_SERIAL); + _didCleanUp = NO; } return self; } - (void)main { + if (self.isRunning) { + AWSDDLogWarn(@"Attempted to start a thread that is already running: [%@]", self); + return; + } + AWSDDLogVerbose(@"Started execution of Thread: [%@]", self); //This is invoked in a new thread by the webSocketDidOpen method or by the Connect method. Get the runLoop from the thread. self.runLoopForStreamsThread = [NSRunLoop currentRunLoop]; //Setup a default timer to ensure that the RunLoop always has atleast one timer on it. This is to prevent the while loop //below to spin in tight loop when all input sources and session timers are shutdown during a reconnect sequence. + __weak typeof(self) weakSelf = self; self.defaultRunLoopTimer = [[NSTimer alloc] initWithFireDate:[NSDate dateWithTimeIntervalSinceNow:60.0] - interval:60.0 - target:self - selector:@selector(timerHandler:) - userInfo:nil - repeats:YES]; + interval:60.0 + repeats:YES + block:^(NSTimer * _Nonnull timer) { + AWSDDLogVerbose(@"Default run loop timer executed on Thread: [%@]. isRunning = %@. isCancelled = %@", weakSelf, weakSelf.isRunning ? @"YES" : @"NO", weakSelf.isCancelled ? @"YES" : @"NO"); + }]; [self.runLoopForStreamsThread addTimer:self.defaultRunLoopTimer forMode:NSDefaultRunLoopMode]; @@ -82,7 +92,7 @@ - (void)main { [self.session connectToInputStream:self.decoderInputStream outputStream:self.encoderOutputStream]; - while (self.isRunning && !self.isCancelled) { + while ([self shouldContinueRunning]) { //This will continue run until the thread is cancelled //Run one cycle of the runloop. This will return after a input source event or timer event is processed [self.runLoopForStreamsThread runMode:NSDefaultRunLoopMode @@ -94,60 +104,76 @@ - (void)main { AWSDDLogVerbose(@"Finished execution of Thread: [%@]", self); } +- (BOOL)shouldContinueRunning { + __block BOOL shouldRun; + dispatch_sync(self.serialQueue, ^{ + shouldRun = self.isRunning && !self.isCancelled && self.defaultRunLoopTimer != nil; + }); + return shouldRun; +} + - (void)cancel { AWSDDLogVerbose(@"Issued Cancel on thread [%@]", (NSThread *)self); - self.isRunning = NO; - [super cancel]; + dispatch_sync(self.serialQueue, ^{ + self.isRunning = NO; + [super cancel]; + }); } - (void)cancelAndDisconnect:(BOOL)shouldDisconnect { AWSDDLogVerbose(@"Issued Cancel and Disconnect = [%@] on thread [%@]", shouldDisconnect ? @"YES" : @"NO", (NSThread *)self); - self.shouldDisconnect = shouldDisconnect; - self.isRunning = NO; - [super cancel]; + dispatch_sync(self.serialQueue, ^{ + self.shouldDisconnect = shouldDisconnect; + self.isRunning = NO; + [super cancel]; + }); } - (void)cleanUp { - if (self.defaultRunLoopTimer) { - [self.defaultRunLoopTimer invalidate]; - self.defaultRunLoopTimer = nil; - } - - if (self.shouldDisconnect) { - if (self.session) { - [self.session close]; - self.session = nil; + dispatch_sync(self.serialQueue, ^{ + if (self.didCleanUp) { + AWSDDLogVerbose(@"Clean up already called for thread: [%@]", (NSThread *)self); + return; } - if (self.outputStream) { - self.outputStream.delegate = nil; - [self.outputStream close]; - [self.outputStream removeFromRunLoop:self.runLoopForStreamsThread - forMode:NSDefaultRunLoopMode]; - self.outputStream = nil; + self.didCleanUp = YES; + if (self.defaultRunLoopTimer) { + [self.defaultRunLoopTimer invalidate]; + self.defaultRunLoopTimer = nil; } - if (self.decoderInputStream) { - [self.decoderInputStream close]; - self.decoderInputStream = nil; + if (self.shouldDisconnect) { + if (self.session) { + [self.session close]; + self.session = nil; + } + + if (self.outputStream) { + self.outputStream.delegate = nil; + [self.outputStream close]; + [self.outputStream removeFromRunLoop:self.runLoopForStreamsThread + forMode:NSDefaultRunLoopMode]; + self.outputStream = nil; + } + + if (self.decoderInputStream) { + [self.decoderInputStream close]; + self.decoderInputStream = nil; + } + + if (self.encoderOutputStream) { + [self.encoderOutputStream close]; + self.encoderOutputStream = nil; + } + } else { + AWSDDLogVerbose(@"Skipping disconnect for thread: [%@]", (NSThread *)self); } - if (self.encoderOutputStream) { - [self.encoderOutputStream close]; - self.encoderOutputStream = nil; + if (self.onStop) { + self.onStop(); + self.onStop = nil; } - } else { - AWSDDLogVerbose(@"Skipping disconnect for thread: [%@]", (NSThread *)self); - } - - if (self.onStop) { - self.onStop(); - self.onStop = nil; - } -} - -- (void)timerHandler:(NSTimer*)theTimer { - AWSDDLogVerbose(@"Default run loop timer executed on Thread: [%@]. isRunning = %@. isCancelled = %@", self, self.isRunning ? @"YES" : @"NO", self.isCancelled ? @"YES" : @"NO"); + }); } @end diff --git a/AWSIoTUnitTests/AWSIoTStreamThreadTests.m b/AWSIoTUnitTests/AWSIoTStreamThreadTests.m index 89a84ae7aed..5d12f019f42 100644 --- a/AWSIoTUnitTests/AWSIoTStreamThreadTests.m +++ b/AWSIoTUnitTests/AWSIoTStreamThreadTests.m @@ -19,6 +19,10 @@ @interface AWSIoTStreamThread() @property(nonatomic, assign) NSTimeInterval defaultRunLoopTimeInterval; +@property (nonatomic, assign) BOOL isRunning; +@property (nonatomic, strong) dispatch_queue_t serialQueue; +@property (nonatomic, assign) BOOL didCleanUp; +@property (nonatomic, strong, nullable) NSTimer *defaultRunLoopTimer; @end @@ -56,6 +60,7 @@ - (void)setUp { } - (void)tearDown { + [self.thread cancelAndDisconnect:YES]; self.thread = nil; self.session = nil; self.decoderInputStream = nil; @@ -67,6 +72,7 @@ - (void)tearDown { /// When: The thread is started /// Then: The output stream is opened and the session is connected to the decoder and encoder streams - (void)testStart_shouldOpenStream_andInvokeConnectOnSession { + OCMVerify([self.outputStream scheduleInRunLoop:[OCMArg any] forMode:NSDefaultRunLoopMode]); OCMVerify([self.outputStream open]); OCMVerify([self.session connectToInputStream:[OCMArg any] outputStream:[OCMArg any]]); } @@ -87,6 +93,7 @@ - (void)testCancelAndDisconnect_shouldCloseStreams_andInvokeOnStop { OCMVerify([self.encoderOutputStream close]); OCMVerify([self.outputStream close]); OCMVerify([self.session close]); + XCTAssertFalse(self.thread.isRunning); } /// Given: A running AWSIoTStreamThread @@ -108,9 +115,9 @@ - (void)testCancel_shouldNotCloseStreams_andInvokeOnStop { didInvokeDecoderInputStreamClose = YES; }]; - __block BOOL didInvokeEncoderDecoderInputStreamClose = NO; + __block BOOL didInvokeEncoderOutputStreamClose = NO; [OCMStub([self.encoderOutputStream close]) andDo:^(NSInvocation *invocation) { - didInvokeEncoderDecoderInputStreamClose = YES; + didInvokeEncoderOutputStreamClose = YES; }]; __block BOOL didInvokeOutputStreamClose = NO; @@ -121,10 +128,41 @@ - (void)testCancel_shouldNotCloseStreams_andInvokeOnStop { [self.thread cancelAndDisconnect:NO]; [self waitForExpectations:@[stopExpectation] timeout:1]; - XCTAssertFalse(didInvokeSessionClose); - XCTAssertFalse(didInvokeDecoderInputStreamClose); - XCTAssertFalse(didInvokeEncoderDecoderInputStreamClose); - XCTAssertFalse(didInvokeOutputStreamClose); + XCTAssertFalse(didInvokeSessionClose, @"The `close` method on `session` should not be invoked"); + XCTAssertFalse(didInvokeDecoderInputStreamClose, @"The `close` method on `decoderInputStream` should not be invoked"); + XCTAssertFalse(didInvokeEncoderOutputStreamClose, @"The `close` method on `encoderOutputStream` should not be invoked"); + XCTAssertFalse(didInvokeOutputStreamClose, @"The `close` method on `outputStream` should not be invoked"); +} + +- (void)testCancelAndDisconnect_shouldSetDidCleanUp_andInvalidateTimer { + XCTestExpectation *stopExpectation = [self expectationWithDescription:@"AWSIoTStreamThread.onStop expectation"]; + self.thread.onStop = ^{ + [stopExpectation fulfill]; + }; + + [self.thread cancelAndDisconnect:YES]; + + [self waitForExpectations:@[stopExpectation] timeout:1]; + XCTAssertTrue(self.thread.didCleanUp, @"didCleanUp should be YES after cleanup"); + XCTAssertNil(self.thread.defaultRunLoopTimer, @"defaultRunLoopTimer should be nil after invalidation"); +} + +- (void)testCancelAndDisconnect_shouldSynchronizeOnCleanupQueue { + XCTestExpectation *stopExpectation = [self expectationWithDescription:@"AWSIoTStreamThread.onStop expectation"]; + self.thread.onStop = ^{ + [stopExpectation fulfill]; + }; + + [self.thread cancelAndDisconnect:YES]; + + // Validate synchronization + __block BOOL didSynchronize = NO; + dispatch_sync(self.thread.serialQueue, ^{ + didSynchronize = YES; + }); + + XCTAssertTrue(didSynchronize, @"The cleanupQueue should synchronize the operations"); + [self waitForExpectations:@[stopExpectation] timeout:1]; } @end diff --git a/CHANGELOG.md b/CHANGELOG.md index 85a05f90afb..67d609d59ba 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,8 +5,9 @@ ### Bug Fixes - **AWSIoT** - - Fixing a race condition when invalidating/creating the reconnect timer (#5454) - - Fixing a potential race condition in the timer ring queue (#5461) + - Fixing race conditions during cleanup in `AWSIoTStreamThread` (#5477) + - Fixing a race condition when invalidating/creating the reconnect timer in `AWSIoTMQTTClient` (#5454) + - Fixing a potential race condition in the timer ring queue in `AWSMQTTSession` (#5461) ## 2.38.0