Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(IoT): Fixing race conditions during cleanup in AWSIoTStreamThread #5477

Merged
merged 3 commits into from
Dec 16, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion AWSIoT/Internal/AWSIoTStreamThread.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ NS_ASSUME_NONNULL_BEGIN

@interface AWSIoTStreamThread : NSThread

@property(strong, nullable) void (^onStop)(void);
@property(nonatomic, copy, nullable) void (^onStop)(void);

-(instancetype)initWithSession:(nonnull AWSMQTTSession *)session
decoderInputStream:(nonnull NSInputStream *)decoderInputStream
Expand Down
122 changes: 74 additions & 48 deletions AWSIoT/Internal/AWSIoTStreamThread.m
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ @interface AWSIoTStreamThread()
@property(nonatomic, assign) NSTimeInterval defaultRunLoopTimeInterval;
@property(nonatomic, assign) BOOL isRunning;
@property(nonatomic, assign) BOOL shouldDisconnect;
@property(nonatomic, strong) dispatch_queue_t serialQueue;
@property(nonatomic, assign) BOOL didCleanUp;
@end

@implementation AWSIoTStreamThread
Expand All @@ -40,34 +42,42 @@ - (nonnull instancetype)initWithSession:(nonnull AWSMQTTSession *)session
outputStream:nil];
}

-(instancetype)initWithSession:(nonnull AWSMQTTSession *)session
decoderInputStream:(nonnull NSInputStream *)decoderInputStream
encoderOutputStream:(nonnull NSOutputStream *)encoderOutputStream
outputStream:(nullable NSOutputStream *)outputStream; {
- (instancetype)initWithSession:(nonnull AWSMQTTSession *)session
decoderInputStream:(nonnull NSInputStream *)decoderInputStream
encoderOutputStream:(nonnull NSOutputStream *)encoderOutputStream
outputStream:(nullable NSOutputStream *)outputStream {
if (self = [super init]) {
_session = session;
_decoderInputStream = decoderInputStream;
_encoderOutputStream = encoderOutputStream;
_outputStream = outputStream;
_defaultRunLoopTimeInterval = 10;
_shouldDisconnect = NO;
_serialQueue = dispatch_queue_create("com.amazonaws.iot.streamthread.syncQueue", DISPATCH_QUEUE_SERIAL);
_didCleanUp = NO;
}
return self;
}

- (void)main {
if (self.isRunning) {
AWSDDLogWarn(@"Attempted to start a thread that is already running: [%@]", self);
return;
}

AWSDDLogVerbose(@"Started execution of Thread: [%@]", self);
//This is invoked in a new thread by the webSocketDidOpen method or by the Connect method. Get the runLoop from the thread.
self.runLoopForStreamsThread = [NSRunLoop currentRunLoop];

//Setup a default timer to ensure that the RunLoop always has atleast one timer on it. This is to prevent the while loop
//below to spin in tight loop when all input sources and session timers are shutdown during a reconnect sequence.
__weak typeof(self) weakSelf = self;
self.defaultRunLoopTimer = [[NSTimer alloc] initWithFireDate:[NSDate dateWithTimeIntervalSinceNow:60.0]
interval:60.0
target:self
selector:@selector(timerHandler:)
userInfo:nil
repeats:YES];
interval:60.0
repeats:YES
block:^(NSTimer * _Nonnull timer) {
AWSDDLogVerbose(@"Default run loop timer executed on Thread: [%@]. isRunning = %@. isCancelled = %@", weakSelf, weakSelf.isRunning ? @"YES" : @"NO", weakSelf.isCancelled ? @"YES" : @"NO");
}];
[self.runLoopForStreamsThread addTimer:self.defaultRunLoopTimer
forMode:NSDefaultRunLoopMode];

Expand All @@ -82,7 +92,7 @@ - (void)main {
[self.session connectToInputStream:self.decoderInputStream
outputStream:self.encoderOutputStream];

while (self.isRunning && !self.isCancelled) {
while ([self shouldContinueRunning]) {
//This will continue run until the thread is cancelled
//Run one cycle of the runloop. This will return after a input source event or timer event is processed
[self.runLoopForStreamsThread runMode:NSDefaultRunLoopMode
Expand All @@ -94,60 +104,76 @@ - (void)main {
AWSDDLogVerbose(@"Finished execution of Thread: [%@]", self);
}

- (BOOL)shouldContinueRunning {
__block BOOL shouldRun;
dispatch_sync(self.serialQueue, ^{
shouldRun = self.isRunning && !self.isCancelled && self.defaultRunLoopTimer != nil;
});
return shouldRun;
}

- (void)cancel {
AWSDDLogVerbose(@"Issued Cancel on thread [%@]", (NSThread *)self);
self.isRunning = NO;
[super cancel];
dispatch_sync(self.serialQueue, ^{
self.isRunning = NO;
[super cancel];
});
}

- (void)cancelAndDisconnect:(BOOL)shouldDisconnect {
AWSDDLogVerbose(@"Issued Cancel and Disconnect = [%@] on thread [%@]", shouldDisconnect ? @"YES" : @"NO", (NSThread *)self);
self.shouldDisconnect = shouldDisconnect;
self.isRunning = NO;
[super cancel];
dispatch_sync(self.serialQueue, ^{
self.shouldDisconnect = shouldDisconnect;
self.isRunning = NO;
[super cancel];
});
}

- (void)cleanUp {
if (self.defaultRunLoopTimer) {
[self.defaultRunLoopTimer invalidate];
self.defaultRunLoopTimer = nil;
}

if (self.shouldDisconnect) {
if (self.session) {
[self.session close];
self.session = nil;
dispatch_sync(self.serialQueue, ^{
if (self.didCleanUp) {
AWSDDLogVerbose(@"Clean up already called for thread: [%@]", (NSThread *)self);
return;
}

if (self.outputStream) {
self.outputStream.delegate = nil;
[self.outputStream close];
[self.outputStream removeFromRunLoop:self.runLoopForStreamsThread
forMode:NSDefaultRunLoopMode];
self.outputStream = nil;
self.didCleanUp = YES;
if (self.defaultRunLoopTimer) {
[self.defaultRunLoopTimer invalidate];
self.defaultRunLoopTimer = nil;
}

if (self.decoderInputStream) {
[self.decoderInputStream close];
self.decoderInputStream = nil;
if (self.shouldDisconnect) {
if (self.session) {
[self.session close];
self.session = nil;
}

if (self.outputStream) {
self.outputStream.delegate = nil;
[self.outputStream close];
[self.outputStream removeFromRunLoop:self.runLoopForStreamsThread
forMode:NSDefaultRunLoopMode];
self.outputStream = nil;
}

if (self.decoderInputStream) {
[self.decoderInputStream close];
self.decoderInputStream = nil;
}

if (self.encoderOutputStream) {
[self.encoderOutputStream close];
self.encoderOutputStream = nil;
}
} else {
AWSDDLogVerbose(@"Skipping disconnect for thread: [%@]", (NSThread *)self);
}

if (self.encoderOutputStream) {
[self.encoderOutputStream close];
self.encoderOutputStream = nil;
if (self.onStop) {
self.onStop();
self.onStop = nil;
}
} else {
AWSDDLogVerbose(@"Skipping disconnect for thread: [%@]", (NSThread *)self);
}

if (self.onStop) {
self.onStop();
self.onStop = nil;
}
}

- (void)timerHandler:(NSTimer*)theTimer {
AWSDDLogVerbose(@"Default run loop timer executed on Thread: [%@]. isRunning = %@. isCancelled = %@", self, self.isRunning ? @"YES" : @"NO", self.isCancelled ? @"YES" : @"NO");
});
}

@end
50 changes: 44 additions & 6 deletions AWSIoTUnitTests/AWSIoTStreamThreadTests.m
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@

@interface AWSIoTStreamThread()
@property(nonatomic, assign) NSTimeInterval defaultRunLoopTimeInterval;
@property (nonatomic, assign) BOOL isRunning;
@property (nonatomic, strong) dispatch_queue_t serialQueue;
@property (nonatomic, assign) BOOL didCleanUp;
@property (nonatomic, strong, nullable) NSTimer *defaultRunLoopTimer;
@end


Expand Down Expand Up @@ -56,6 +60,7 @@ - (void)setUp {
}

- (void)tearDown {
[self.thread cancelAndDisconnect:YES];
self.thread = nil;
self.session = nil;
self.decoderInputStream = nil;
Expand All @@ -67,6 +72,7 @@ - (void)tearDown {
/// When: The thread is started
/// Then: The output stream is opened and the session is connected to the decoder and encoder streams
- (void)testStart_shouldOpenStream_andInvokeConnectOnSession {
OCMVerify([self.outputStream scheduleInRunLoop:[OCMArg any] forMode:NSDefaultRunLoopMode]);
OCMVerify([self.outputStream open]);
OCMVerify([self.session connectToInputStream:[OCMArg any] outputStream:[OCMArg any]]);
}
Expand All @@ -87,6 +93,7 @@ - (void)testCancelAndDisconnect_shouldCloseStreams_andInvokeOnStop {
OCMVerify([self.encoderOutputStream close]);
OCMVerify([self.outputStream close]);
OCMVerify([self.session close]);
XCTAssertFalse(self.thread.isRunning);
}

/// Given: A running AWSIoTStreamThread
Expand All @@ -108,9 +115,9 @@ - (void)testCancel_shouldNotCloseStreams_andInvokeOnStop {
didInvokeDecoderInputStreamClose = YES;
}];

__block BOOL didInvokeEncoderDecoderInputStreamClose = NO;
__block BOOL didInvokeEncoderOutputStreamClose = NO;
[OCMStub([self.encoderOutputStream close]) andDo:^(NSInvocation *invocation) {
didInvokeEncoderDecoderInputStreamClose = YES;
didInvokeEncoderOutputStreamClose = YES;
}];

__block BOOL didInvokeOutputStreamClose = NO;
Expand All @@ -121,10 +128,41 @@ - (void)testCancel_shouldNotCloseStreams_andInvokeOnStop {
[self.thread cancelAndDisconnect:NO];
[self waitForExpectations:@[stopExpectation] timeout:1];

XCTAssertFalse(didInvokeSessionClose);
XCTAssertFalse(didInvokeDecoderInputStreamClose);
XCTAssertFalse(didInvokeEncoderDecoderInputStreamClose);
XCTAssertFalse(didInvokeOutputStreamClose);
XCTAssertFalse(didInvokeSessionClose, @"The `close` method on `session` should not be invoked");
XCTAssertFalse(didInvokeDecoderInputStreamClose, @"The `close` method on `decoderInputStream` should not be invoked");
XCTAssertFalse(didInvokeEncoderOutputStreamClose, @"The `close` method on `encoderOutputStream` should not be invoked");
XCTAssertFalse(didInvokeOutputStreamClose, @"The `close` method on `outputStream` should not be invoked");
}

- (void)testCancelAndDisconnect_shouldSeDidCleanUp_andInvalidateTimer {
ruisebas marked this conversation as resolved.
Show resolved Hide resolved
XCTestExpectation *stopExpectation = [self expectationWithDescription:@"AWSIoTStreamThread.onStop expectation"];
self.thread.onStop = ^{
[stopExpectation fulfill];
};

[self.thread cancelAndDisconnect:YES];

[self waitForExpectations:@[stopExpectation] timeout:1];
XCTAssertTrue(self.thread.didCleanUp, @"didCleanUp should be YES after cleanup");
XCTAssertNil(self.thread.defaultRunLoopTimer, @"defaultRunLoopTimer should be nil after invalidation");
}

- (void)testCancelAndDisconnect_shouldSynchronizeOnCleanupQueue {
XCTestExpectation *stopExpectation = [self expectationWithDescription:@"AWSIoTStreamThread.onStop expectation"];
self.thread.onStop = ^{
[stopExpectation fulfill];
};

[self.thread cancelAndDisconnect:YES];

// Validate synchronization
__block BOOL didSynchronize = NO;
dispatch_sync(self.thread.serialQueue, ^{
didSynchronize = YES;
});

XCTAssertTrue(didSynchronize, @"The cleanupQueue should synchronize the operations");
[self waitForExpectations:@[stopExpectation] timeout:1];
}

@end
5 changes: 3 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@
### Bug Fixes

- **AWSIoT**
- Fixing a race condition when invalidating/creating the reconnect timer (#5454)
- Fixing a potential race condition in the timer ring queue (#5461)
- Fixing race conditions during cleanup in `AWSIoTStreamThread` (#5477)
- Fixing a race condition when invalidating/creating the reconnect timer in `AWSIoTMQTTClient` (#5454)
- Fixing a potential race condition in the timer ring queue in `AWSMQTTSession` (#5461)

## 2.38.0

Expand Down
Loading