Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(IoT): fixing race condition in AWSIoTStreamThread .cxx_destruct … #5469

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion AWSIoT/Internal/AWSIoTStreamThread.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ NS_ASSUME_NONNULL_BEGIN

@interface AWSIoTStreamThread : NSThread

@property(strong, nullable) void (^onStop)(void);
@property(nonatomic, copy, nullable) void (^onStop)(void);

-(instancetype)initWithSession:(nonnull AWSMQTTSession *)session
decoderInputStream:(nonnull NSInputStream *)decoderInputStream
Expand Down
214 changes: 150 additions & 64 deletions AWSIoT/Internal/AWSIoTStreamThread.m
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,16 @@ @interface AWSIoTStreamThread()
@property(nonatomic, assign) NSTimeInterval defaultRunLoopTimeInterval;
@property(nonatomic, assign) BOOL isRunning;
@property(nonatomic, assign) BOOL shouldDisconnect;

// Add synchronization primitives
@property(nonatomic, strong) dispatch_queue_t cleanupQueue;
@property(nonatomic, strong) dispatch_semaphore_t cleanupSemaphore;
@property(nonatomic, assign) BOOL isCleaningUp;
@end

@implementation AWSIoTStreamThread

- (nonnull instancetype)initWithSession:(nonnull AWSMQTTSession *)session
- (nonnull instancetype)initWithSession:(nonnull AWSMQTTSession *)session
decoderInputStream:(nonnull NSInputStream *)decoderInputStream
encoderOutputStream:(nonnull NSOutputStream *)encoderOutputStream {
return [self initWithSession:session
Expand All @@ -40,114 +45,195 @@ - (nonnull instancetype)initWithSession:(nonnull AWSMQTTSession *)session
outputStream:nil];
}

-(instancetype)initWithSession:(nonnull AWSMQTTSession *)session
decoderInputStream:(nonnull NSInputStream *)decoderInputStream
encoderOutputStream:(nonnull NSOutputStream *)encoderOutputStream
outputStream:(nullable NSOutputStream *)outputStream; {
- (instancetype)initWithSession:(nonnull AWSMQTTSession *)session
decoderInputStream:(nonnull NSInputStream *)decoderInputStream
encoderOutputStream:(nonnull NSOutputStream *)encoderOutputStream
outputStream:(nullable NSOutputStream *)outputStream {
if (self = [super init]) {
_session = session;
_decoderInputStream = decoderInputStream;
_encoderOutputStream = encoderOutputStream;
_outputStream = outputStream;
_defaultRunLoopTimeInterval = 10;
_shouldDisconnect = NO;
_isCleaningUp = NO;

// Initialize synchronization primitives
_cleanupQueue = dispatch_queue_create("com.amazonaws.iot.streamthread.cleanup", DISPATCH_QUEUE_SERIAL);
_cleanupSemaphore = dispatch_semaphore_create(1);
}
return self;
}

- (void)main {
AWSDDLogVerbose(@"Started execution of Thread: [%@]", self);
//This is invoked in a new thread by the webSocketDidOpen method or by the Connect method. Get the runLoop from the thread.
self.runLoopForStreamsThread = [NSRunLoop currentRunLoop];
@autoreleasepool {
AWSDDLogVerbose(@"Started execution of Thread: [%@]", self);

if (![self setupRunLoop]) {
AWSDDLogError(@"Failed to setup run loop for thread: [%@]", self);
return;
}

[self startIOOperations];

while ([self shouldContinueRunning]) {
@autoreleasepool {
[self.runLoopForStreamsThread runMode:NSDefaultRunLoopMode
beforeDate:[NSDate dateWithTimeIntervalSinceNow:self.defaultRunLoopTimeInterval]];
}
}

[self performCleanup];

AWSDDLogVerbose(@"Finished execution of Thread: [%@]", self);
}
}

//Setup a default timer to ensure that the RunLoop always has atleast one timer on it. This is to prevent the while loop
//below to spin in tight loop when all input sources and session timers are shutdown during a reconnect sequence.
- (BOOL)setupRunLoop {
if (self.isRunning) {
AWSDDLogError(@"Thread already running");
return NO;
}

self.runLoopForStreamsThread = [NSRunLoop currentRunLoop];

// Setup timer with weak reference to prevent retain cycles
__weak typeof(self) weakSelf = self;
self.defaultRunLoopTimer = [[NSTimer alloc] initWithFireDate:[NSDate dateWithTimeIntervalSinceNow:60.0]
interval:60.0
target:self
selector:@selector(timerHandler:)
userInfo:nil
repeats:YES];
interval:60.0
target:weakSelf
selector:@selector(timerHandler:)
userInfo:nil
repeats:YES];

if (!self.defaultRunLoopTimer) {
AWSDDLogError(@"Failed to create run loop timer");
return NO;
}
[self.runLoopForStreamsThread addTimer:self.defaultRunLoopTimer
forMode:NSDefaultRunLoopMode];

self.isRunning = YES;
return YES;
}

- (void)startIOOperations {
if (self.outputStream) {
[self.outputStream scheduleInRunLoop:self.runLoopForStreamsThread
forMode:NSDefaultRunLoopMode];
forMode:NSDefaultRunLoopMode];
[self.outputStream open];
}

//Update the runLoop and runLoopMode in session.
[self.session connectToInputStream:self.decoderInputStream
outputStream:self.encoderOutputStream];
}

while (self.isRunning && !self.isCancelled) {
//This will continue run until the thread is cancelled
//Run one cycle of the runloop. This will return after a input source event or timer event is processed
[self.runLoopForStreamsThread runMode:NSDefaultRunLoopMode
beforeDate:[NSDate dateWithTimeIntervalSinceNow:self.defaultRunLoopTimeInterval]];
}

[self cleanUp];
- (BOOL)shouldContinueRunning {
__block BOOL shouldRun;
dispatch_sync(self.cleanupQueue, ^{
shouldRun = self.isRunning && !self.isCancelled && self.defaultRunLoopTimer != nil;
});
return shouldRun;
}

AWSDDLogVerbose(@"Finished execution of Thread: [%@]", self);
- (void)invalidateTimer {
dispatch_sync(self.cleanupQueue, ^{
if (self.defaultRunLoopTimer) {
[self.defaultRunLoopTimer invalidate];
self.defaultRunLoopTimer = nil;
}
});
}

- (void)cancel {
AWSDDLogVerbose(@"Issued Cancel on thread [%@]", (NSThread *)self);
self.isRunning = NO;
[super cancel];
[self cancelWithDisconnect:NO];
}

- (void)cancelAndDisconnect:(BOOL)shouldDisconnect {
AWSDDLogVerbose(@"Issued Cancel and Disconnect = [%@] on thread [%@]", shouldDisconnect ? @"YES" : @"NO", (NSThread *)self);
self.shouldDisconnect = shouldDisconnect;
self.isRunning = NO;
[super cancel];
AWSDDLogVerbose(@"Issued Cancel and Disconnect = [%@] on thread [%@]",
shouldDisconnect ? @"YES" : @"NO", (NSThread *)self);
[self cancelWithDisconnect:shouldDisconnect];
}

- (void)cleanUp {
if (self.defaultRunLoopTimer) {
[self.defaultRunLoopTimer invalidate];
self.defaultRunLoopTimer = nil;
}

if (self.shouldDisconnect) {
if (self.session) {
[self.session close];
self.session = nil;
}

if (self.outputStream) {
self.outputStream.delegate = nil;
[self.outputStream close];
[self.outputStream removeFromRunLoop:self.runLoopForStreamsThread
forMode:NSDefaultRunLoopMode];
self.outputStream = nil;
- (void)cancelWithDisconnect:(BOOL)shouldDisconnect {
// Ensure thread-safe property updates
dispatch_sync(self.cleanupQueue, ^{
if (!self.isCleaningUp) {
self.shouldDisconnect = shouldDisconnect;
self.isRunning = NO;
[super cancel];

// Invalidate timer to trigger run loop exit
[self invalidateTimer];
}
});
}

if (self.decoderInputStream) {
[self.decoderInputStream close];
self.decoderInputStream = nil;
}
- (void)performCleanup {
dispatch_semaphore_wait(self.cleanupSemaphore, DISPATCH_TIME_FOREVER);

if (self.isCleaningUp) {
dispatch_semaphore_signal(self.cleanupSemaphore);
return;
}

self.isCleaningUp = YES;
dispatch_semaphore_signal(self.cleanupSemaphore);

dispatch_sync(self.cleanupQueue, ^{
[self cleanupResources];
});
}

if (self.encoderOutputStream) {
[self.encoderOutputStream close];
self.encoderOutputStream = nil;
}
- (void)cleanupResources {
if (self.shouldDisconnect) {
[self closeSession];
[self closeStreams];
} else {
AWSDDLogVerbose(@"Skipping disconnect for thread: [%@]", (NSThread *)self);
}

if (self.onStop) {
self.onStop();

// Handle onStop callback
dispatch_block_t stopBlock = self.onStop;
if (stopBlock) {
self.onStop = nil;
stopBlock();
}
}

- (void)closeSession {
if (self.session) {
[self.session close];
self.session = nil;
}
}

- (void)closeStreams {
if (self.outputStream) {
self.outputStream.delegate = nil;
[self.outputStream close];
[self.outputStream removeFromRunLoop:self.runLoopForStreamsThread
forMode:NSDefaultRunLoopMode];
self.outputStream = nil;
}

if (self.decoderInputStream) {
[self.decoderInputStream close];
self.decoderInputStream = nil;
}

if (self.encoderOutputStream) {
[self.encoderOutputStream close];
self.encoderOutputStream = nil;
}
}

- (void)timerHandler:(NSTimer*)theTimer {
AWSDDLogVerbose(@"Default run loop timer executed on Thread: [%@]. isRunning = %@. isCancelled = %@", self, self.isRunning ? @"YES" : @"NO", self.isCancelled ? @"YES" : @"NO");
AWSDDLogVerbose(@"Default run loop timer executed on Thread: [%@]. isRunning = %@. isCancelled = %@",
self, self.isRunning ? @"YES" : @"NO", self.isCancelled ? @"YES" : @"NO");
}

- (void)dealloc {
AWSDDLogVerbose(@"Deallocating AWSIoTStreamThread: [%@]", self);
}

@end
Loading
Loading