Skip to content

Commit

Permalink
fix(IoT): Fixing race conditions during cleanup in `AWSIoTStreamThrea…
Browse files Browse the repository at this point in the history
…d` (#5477)

---------

Co-authored-by: Andrei Konovalov <[email protected]>
  • Loading branch information
ruisebas and Andrei Konovalov authored Dec 16, 2024
1 parent 80711f4 commit 06ab635
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 57 deletions.
2 changes: 1 addition & 1 deletion AWSIoT/Internal/AWSIoTStreamThread.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ NS_ASSUME_NONNULL_BEGIN

@interface AWSIoTStreamThread : NSThread

@property(strong, nullable) void (^onStop)(void);
@property(nonatomic, copy, nullable) void (^onStop)(void);

-(instancetype)initWithSession:(nonnull AWSMQTTSession *)session
decoderInputStream:(nonnull NSInputStream *)decoderInputStream
Expand Down
122 changes: 74 additions & 48 deletions AWSIoT/Internal/AWSIoTStreamThread.m
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ @interface AWSIoTStreamThread()
@property(nonatomic, assign) NSTimeInterval defaultRunLoopTimeInterval;
@property(nonatomic, assign) BOOL isRunning;
@property(nonatomic, assign) BOOL shouldDisconnect;
@property(nonatomic, strong) dispatch_queue_t serialQueue;
@property(nonatomic, assign) BOOL didCleanUp;
@end

@implementation AWSIoTStreamThread
Expand All @@ -40,34 +42,42 @@ - (nonnull instancetype)initWithSession:(nonnull AWSMQTTSession *)session
outputStream:nil];
}

-(instancetype)initWithSession:(nonnull AWSMQTTSession *)session
decoderInputStream:(nonnull NSInputStream *)decoderInputStream
encoderOutputStream:(nonnull NSOutputStream *)encoderOutputStream
outputStream:(nullable NSOutputStream *)outputStream; {
- (instancetype)initWithSession:(nonnull AWSMQTTSession *)session
decoderInputStream:(nonnull NSInputStream *)decoderInputStream
encoderOutputStream:(nonnull NSOutputStream *)encoderOutputStream
outputStream:(nullable NSOutputStream *)outputStream {
if (self = [super init]) {
_session = session;
_decoderInputStream = decoderInputStream;
_encoderOutputStream = encoderOutputStream;
_outputStream = outputStream;
_defaultRunLoopTimeInterval = 10;
_shouldDisconnect = NO;
_serialQueue = dispatch_queue_create("com.amazonaws.iot.streamthread.syncQueue", DISPATCH_QUEUE_SERIAL);
_didCleanUp = NO;
}
return self;
}

- (void)main {
if (self.isRunning) {
AWSDDLogWarn(@"Attempted to start a thread that is already running: [%@]", self);
return;
}

AWSDDLogVerbose(@"Started execution of Thread: [%@]", self);
//This is invoked in a new thread by the webSocketDidOpen method or by the Connect method. Get the runLoop from the thread.
self.runLoopForStreamsThread = [NSRunLoop currentRunLoop];

//Setup a default timer to ensure that the RunLoop always has atleast one timer on it. This is to prevent the while loop
//below to spin in tight loop when all input sources and session timers are shutdown during a reconnect sequence.
__weak typeof(self) weakSelf = self;
self.defaultRunLoopTimer = [[NSTimer alloc] initWithFireDate:[NSDate dateWithTimeIntervalSinceNow:60.0]
interval:60.0
target:self
selector:@selector(timerHandler:)
userInfo:nil
repeats:YES];
interval:60.0
repeats:YES
block:^(NSTimer * _Nonnull timer) {
AWSDDLogVerbose(@"Default run loop timer executed on Thread: [%@]. isRunning = %@. isCancelled = %@", weakSelf, weakSelf.isRunning ? @"YES" : @"NO", weakSelf.isCancelled ? @"YES" : @"NO");
}];
[self.runLoopForStreamsThread addTimer:self.defaultRunLoopTimer
forMode:NSDefaultRunLoopMode];

Expand All @@ -82,7 +92,7 @@ - (void)main {
[self.session connectToInputStream:self.decoderInputStream
outputStream:self.encoderOutputStream];

while (self.isRunning && !self.isCancelled) {
while ([self shouldContinueRunning]) {
//This will continue run until the thread is cancelled
//Run one cycle of the runloop. This will return after a input source event or timer event is processed
[self.runLoopForStreamsThread runMode:NSDefaultRunLoopMode
Expand All @@ -94,60 +104,76 @@ - (void)main {
AWSDDLogVerbose(@"Finished execution of Thread: [%@]", self);
}

- (BOOL)shouldContinueRunning {
__block BOOL shouldRun;
dispatch_sync(self.serialQueue, ^{
shouldRun = self.isRunning && !self.isCancelled && self.defaultRunLoopTimer != nil;
});
return shouldRun;
}

- (void)cancel {
AWSDDLogVerbose(@"Issued Cancel on thread [%@]", (NSThread *)self);
self.isRunning = NO;
[super cancel];
dispatch_sync(self.serialQueue, ^{
self.isRunning = NO;
[super cancel];
});
}

- (void)cancelAndDisconnect:(BOOL)shouldDisconnect {
AWSDDLogVerbose(@"Issued Cancel and Disconnect = [%@] on thread [%@]", shouldDisconnect ? @"YES" : @"NO", (NSThread *)self);
self.shouldDisconnect = shouldDisconnect;
self.isRunning = NO;
[super cancel];
dispatch_sync(self.serialQueue, ^{
self.shouldDisconnect = shouldDisconnect;
self.isRunning = NO;
[super cancel];
});
}

- (void)cleanUp {
if (self.defaultRunLoopTimer) {
[self.defaultRunLoopTimer invalidate];
self.defaultRunLoopTimer = nil;
}

if (self.shouldDisconnect) {
if (self.session) {
[self.session close];
self.session = nil;
dispatch_sync(self.serialQueue, ^{
if (self.didCleanUp) {
AWSDDLogVerbose(@"Clean up already called for thread: [%@]", (NSThread *)self);
return;
}

if (self.outputStream) {
self.outputStream.delegate = nil;
[self.outputStream close];
[self.outputStream removeFromRunLoop:self.runLoopForStreamsThread
forMode:NSDefaultRunLoopMode];
self.outputStream = nil;
self.didCleanUp = YES;
if (self.defaultRunLoopTimer) {
[self.defaultRunLoopTimer invalidate];
self.defaultRunLoopTimer = nil;
}

if (self.decoderInputStream) {
[self.decoderInputStream close];
self.decoderInputStream = nil;
if (self.shouldDisconnect) {
if (self.session) {
[self.session close];
self.session = nil;
}

if (self.outputStream) {
self.outputStream.delegate = nil;
[self.outputStream close];
[self.outputStream removeFromRunLoop:self.runLoopForStreamsThread
forMode:NSDefaultRunLoopMode];
self.outputStream = nil;
}

if (self.decoderInputStream) {
[self.decoderInputStream close];
self.decoderInputStream = nil;
}

if (self.encoderOutputStream) {
[self.encoderOutputStream close];
self.encoderOutputStream = nil;
}
} else {
AWSDDLogVerbose(@"Skipping disconnect for thread: [%@]", (NSThread *)self);
}

if (self.encoderOutputStream) {
[self.encoderOutputStream close];
self.encoderOutputStream = nil;
if (self.onStop) {
self.onStop();
self.onStop = nil;
}
} else {
AWSDDLogVerbose(@"Skipping disconnect for thread: [%@]", (NSThread *)self);
}

if (self.onStop) {
self.onStop();
self.onStop = nil;
}
}

- (void)timerHandler:(NSTimer*)theTimer {
AWSDDLogVerbose(@"Default run loop timer executed on Thread: [%@]. isRunning = %@. isCancelled = %@", self, self.isRunning ? @"YES" : @"NO", self.isCancelled ? @"YES" : @"NO");
});
}

@end
50 changes: 44 additions & 6 deletions AWSIoTUnitTests/AWSIoTStreamThreadTests.m
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@

@interface AWSIoTStreamThread()
@property(nonatomic, assign) NSTimeInterval defaultRunLoopTimeInterval;
@property (nonatomic, assign) BOOL isRunning;
@property (nonatomic, strong) dispatch_queue_t serialQueue;
@property (nonatomic, assign) BOOL didCleanUp;
@property (nonatomic, strong, nullable) NSTimer *defaultRunLoopTimer;
@end


Expand Down Expand Up @@ -56,6 +60,7 @@ - (void)setUp {
}

- (void)tearDown {
[self.thread cancelAndDisconnect:YES];
self.thread = nil;
self.session = nil;
self.decoderInputStream = nil;
Expand All @@ -67,6 +72,7 @@ - (void)tearDown {
/// When: The thread is started
/// Then: The output stream is opened and the session is connected to the decoder and encoder streams
- (void)testStart_shouldOpenStream_andInvokeConnectOnSession {
OCMVerify([self.outputStream scheduleInRunLoop:[OCMArg any] forMode:NSDefaultRunLoopMode]);
OCMVerify([self.outputStream open]);
OCMVerify([self.session connectToInputStream:[OCMArg any] outputStream:[OCMArg any]]);
}
Expand All @@ -87,6 +93,7 @@ - (void)testCancelAndDisconnect_shouldCloseStreams_andInvokeOnStop {
OCMVerify([self.encoderOutputStream close]);
OCMVerify([self.outputStream close]);
OCMVerify([self.session close]);
XCTAssertFalse(self.thread.isRunning);
}

/// Given: A running AWSIoTStreamThread
Expand All @@ -108,9 +115,9 @@ - (void)testCancel_shouldNotCloseStreams_andInvokeOnStop {
didInvokeDecoderInputStreamClose = YES;
}];

__block BOOL didInvokeEncoderDecoderInputStreamClose = NO;
__block BOOL didInvokeEncoderOutputStreamClose = NO;
[OCMStub([self.encoderOutputStream close]) andDo:^(NSInvocation *invocation) {
didInvokeEncoderDecoderInputStreamClose = YES;
didInvokeEncoderOutputStreamClose = YES;
}];

__block BOOL didInvokeOutputStreamClose = NO;
Expand All @@ -121,10 +128,41 @@ - (void)testCancel_shouldNotCloseStreams_andInvokeOnStop {
[self.thread cancelAndDisconnect:NO];
[self waitForExpectations:@[stopExpectation] timeout:1];

XCTAssertFalse(didInvokeSessionClose);
XCTAssertFalse(didInvokeDecoderInputStreamClose);
XCTAssertFalse(didInvokeEncoderDecoderInputStreamClose);
XCTAssertFalse(didInvokeOutputStreamClose);
XCTAssertFalse(didInvokeSessionClose, @"The `close` method on `session` should not be invoked");
XCTAssertFalse(didInvokeDecoderInputStreamClose, @"The `close` method on `decoderInputStream` should not be invoked");
XCTAssertFalse(didInvokeEncoderOutputStreamClose, @"The `close` method on `encoderOutputStream` should not be invoked");
XCTAssertFalse(didInvokeOutputStreamClose, @"The `close` method on `outputStream` should not be invoked");
}

- (void)testCancelAndDisconnect_shouldSetDidCleanUp_andInvalidateTimer {
XCTestExpectation *stopExpectation = [self expectationWithDescription:@"AWSIoTStreamThread.onStop expectation"];
self.thread.onStop = ^{
[stopExpectation fulfill];
};

[self.thread cancelAndDisconnect:YES];

[self waitForExpectations:@[stopExpectation] timeout:1];
XCTAssertTrue(self.thread.didCleanUp, @"didCleanUp should be YES after cleanup");
XCTAssertNil(self.thread.defaultRunLoopTimer, @"defaultRunLoopTimer should be nil after invalidation");
}

- (void)testCancelAndDisconnect_shouldSynchronizeOnCleanupQueue {
XCTestExpectation *stopExpectation = [self expectationWithDescription:@"AWSIoTStreamThread.onStop expectation"];
self.thread.onStop = ^{
[stopExpectation fulfill];
};

[self.thread cancelAndDisconnect:YES];

// Validate synchronization
__block BOOL didSynchronize = NO;
dispatch_sync(self.thread.serialQueue, ^{
didSynchronize = YES;
});

XCTAssertTrue(didSynchronize, @"The cleanupQueue should synchronize the operations");
[self waitForExpectations:@[stopExpectation] timeout:1];
}

@end
5 changes: 3 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@
### Bug Fixes

- **AWSIoT**
- Fixing a race condition when invalidating/creating the reconnect timer (#5454)
- Fixing a potential race condition in the timer ring queue (#5461)
- Fixing race conditions during cleanup in `AWSIoTStreamThread` (#5477)
- Fixing a race condition when invalidating/creating the reconnect timer in `AWSIoTMQTTClient` (#5454)
- Fixing a potential race condition in the timer ring queue in `AWSMQTTSession` (#5461)

## 2.38.0

Expand Down

0 comments on commit 06ab635

Please sign in to comment.