diff --git a/AWSIoT/Internal/MQTTSDK/AWSMQTTSession.m b/AWSIoT/Internal/MQTTSDK/AWSMQTTSession.m index ffceb738fe0..45394991b58 100644 --- a/AWSIoT/Internal/MQTTSDK/AWSMQTTSession.m +++ b/AWSIoT/Internal/MQTTSDK/AWSMQTTSession.m @@ -19,6 +19,7 @@ #import "AWSMQTTEncoder.h" #import "AWSMQttTxFlow.h" #import "AWSIoTMessage.h" +#import "AWSMQTTTimerRing.h" #import "AWSIoTMessage+AWSMQTTMessage.h" @interface AWSMQTTSession () { @@ -58,7 +59,7 @@ - (void)send:(AWSMQTTMessage*)msg; - (UInt16)nextMsgId; @property (strong,atomic) NSMutableArray* queue; //Queue to temporarily hold messages if encoder is busy sending another message -@property (strong,atomic) NSMutableArray* timerRing; // circular array of 60. Each element is a set that contains the messages that need to be retried. +@property (strong,atomic) AWSMQTTTimerRing* timerRing; // A collection of messages that need to be retried. @property (nonatomic, strong) dispatch_queue_t drainSenderSerialQueue; @property (nonatomic, strong) AWSMQTTEncoder* encoder; //Low level protocol handler that converts a message into out bound network data @property (nonatomic, strong) AWSMQTTDecoder* decoder; //Low level protocol handler that converts in bound network data into a Message @@ -103,11 +104,7 @@ - (id)initWithClientId:(NSString*)theClientId txMsgId = 1; txFlows = [[NSMutableDictionary alloc] init]; rxFlows = [[NSMutableDictionary alloc] init]; - self.timerRing = [[NSMutableArray alloc] initWithCapacity:60]; - int i; - for (i = 0; i < 60; i++) { - [self.timerRing addObject:[NSMutableSet new]]; - } + self.timerRing = [[AWSMQTTTimerRing alloc] init]; serialQueue = dispatch_queue_create("com.amazon.aws.iot.test-queue", DISPATCH_QUEUE_SERIAL); ticks = 0; status = AWSMQTTSessionStatusCreated; @@ -233,7 +230,7 @@ - (UInt16)publishDataAtLeastOnce:(NSData*)data AWSMQttTxFlow *flow = [AWSMQttTxFlow flowWithMsg:msg deadline:deadline]; [txFlows setObject:flow forKey:[NSNumber numberWithUnsignedInt:msgId]]; - [[self.timerRing objectAtIndex:([flow deadline] % 60)] addObject:[NSNumber numberWithUnsignedInt:msgId]]; + [self.timerRing addMsgId:[NSNumber numberWithUnsignedInt:msgId] atTick:[flow deadline]]; AWSDDLogDebug(@"Published message %hu for QOS 1", msgId); [self send:msg]; return msgId; @@ -267,7 +264,7 @@ - (UInt16)publishDataExactlyOnce:(NSData*)data AWSMQttTxFlow *flow = [AWSMQttTxFlow flowWithMsg:msg deadline:(ticks + 60)]; [txFlows setObject:flow forKey:[NSNumber numberWithUnsignedInt:msgId]]; - [[self.timerRing objectAtIndex:([flow deadline] % 60)] addObject:[NSNumber numberWithUnsignedInt:msgId]]; + [self.timerRing addMsgId:[NSNumber numberWithUnsignedInt:msgId] atTick:[flow deadline]]; [self send:msg]; return msgId; } @@ -299,7 +296,7 @@ - (void)timerHandler:(NSTimer*)theTimer { dispatch_sync(serialQueue, ^{ ticks++; }); - NSEnumerator *e = [[[self.timerRing objectAtIndex:(ticks % 60)] allObjects] objectEnumerator]; + NSEnumerator *e = [[self.timerRing allMsgIdsAtTick:ticks] objectEnumerator]; id msgId; //Stay under the throttle here and move the work to the next tick if throttle is breached. @@ -321,8 +318,8 @@ - (void)timerHandler:(NSTimer*)theTimer { while ((msgId = [e nextObject])) { AWSMQttTxFlow *flow = [txFlows objectForKey:msgId]; [flow setDeadline:((ticks +1) % 60)]; - [[self.timerRing objectAtIndex:((ticks + 1) % 60)] addObject:msgId]; - [[self.timerRing objectAtIndex:(ticks % 60)] removeObject:msgId]; + [self.timerRing addMsgId:msgId atTick:(ticks + 1)]; + [self.timerRing removeMsgId:msgId atTick:ticks]; } if (count > 0 ) { @@ -567,8 +564,8 @@ - (void)handlePuback:(AWSMQTTMessage*)msg { if ([[flow msg] type] != AWSMQTTPublish || [[flow msg] qos] != 1) { return; } - - [[self.timerRing objectAtIndex:([flow deadline] % 60)] removeObject:msgId]; + + [self.timerRing removeMsgId:msgId atTick:[flow deadline]]; [txFlows removeObjectForKey:msgId]; AWSDDLogDebug(@"Removing msgID %@ from internal store for QOS1 guarantee", msgId); [self.delegate session:self newAckForMessageId:msgId.unsignedShortValue]; @@ -594,10 +591,10 @@ - (void)handlePubrec:(AWSMQTTMessage*)msg { } msg = [AWSMQTTMessage pubrelMessageWithMessageId:[msgId unsignedIntValue]]; [flow setMsg:msg]; - [[self.timerRing objectAtIndex:([flow deadline] % 60)] removeObject:msgId]; + [self.timerRing removeMsgId:msgId atTick:[flow deadline]]; [flow setDeadline:(ticks + 60)]; - [[self.timerRing objectAtIndex:([flow deadline] % 60)] addObject:msgId]; - + [self.timerRing addMsgId:msgId atTick:[flow deadline]]; + [self send:msg]; } @@ -638,8 +635,8 @@ - (void)handlePubcomp:(AWSMQTTMessage*)msg { if (flow == nil || [[flow msg] type] != AWSMQTTPubrel) { return; } - - [[self.timerRing objectAtIndex:([flow deadline] % 60)] removeObject:msgId]; + + [self.timerRing removeMsgId:msgId atTick:[flow deadline]]; [txFlows removeObjectForKey:msgId]; AWSDDLogDebug(@"Removing msgID %@ from internal store for QOS2 guarantee", msgId); diff --git a/AWSIoT/Internal/MQTTSDK/AWSMQTTTimerRing.h b/AWSIoT/Internal/MQTTSDK/AWSMQTTTimerRing.h new file mode 100644 index 00000000000..7bc2c01bdba --- /dev/null +++ b/AWSIoT/Internal/MQTTSDK/AWSMQTTTimerRing.h @@ -0,0 +1,30 @@ +// +// Copyright 2010-2024 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"). +// You may not use this file except in compliance with the License. +// A copy of the License is located at +// +// http://aws.amazon.com/apache2.0 +// +// or in the "license" file accompanying this file. This file is distributed +// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +// express or implied. See the License for the specific language governing +// permissions and limitations under the License. +// + +#import + +NS_ASSUME_NONNULL_BEGIN + +/// A circular collection containing the messages that need to be retried at a given clock tick. +/// The maximum number of ticks is 60 +@interface AWSMQTTTimerRing: NSObject + +- (void)addMsgId:(NSNumber *)msgId atTick:(NSUInteger)tick; +- (void)removeMsgId:(NSNumber *)msgId atTick:(NSUInteger)tick; +- (NSArray *)allMsgIdsAtTick:(NSUInteger)tick; + +@end + +NS_ASSUME_NONNULL_END diff --git a/AWSIoT/Internal/MQTTSDK/AWSMQTTTimerRing.m b/AWSIoT/Internal/MQTTSDK/AWSMQTTTimerRing.m new file mode 100644 index 00000000000..40998f4a994 --- /dev/null +++ b/AWSIoT/Internal/MQTTSDK/AWSMQTTTimerRing.m @@ -0,0 +1,60 @@ +// +// Copyright 2010-2024 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"). +// You may not use this file except in compliance with the License. +// A copy of the License is located at +// +// http://aws.amazon.com/apache2.0 +// +// or in the "license" file accompanying this file. This file is distributed +// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +// express or implied. See the License for the specific language governing +// permissions and limitations under the License. +// + +#import "AWSMQTTTimerRing.h" + +@interface AWSMQTTTimerRing() + +@property (nonatomic, strong) NSLock *lock; +// Array of 60, with each index being a tick, and its value a set containing the messages that need to be retried. +@property (strong,atomic) NSMutableArray* timerRing; + +@end + +@implementation AWSMQTTTimerRing + +- (instancetype)init +{ + self = [super init]; + if (self) { + _lock = [[NSLock alloc] init]; + _timerRing = [[NSMutableArray alloc] initWithCapacity:60]; + int i; + for (i = 0; i < 60; i++) { + [_timerRing addObject:[NSMutableSet new]]; + } + } + return self; +} +- (void)addMsgId:(NSNumber *)msgId atTick:(NSUInteger)tick { + [self.lock lock]; + [[self.timerRing objectAtIndex:(tick % 60)] addObject:msgId]; + [self.lock unlock]; +} + +- (void)removeMsgId:(NSNumber *)msgId atTick:(NSUInteger)tick { + [self.lock lock]; + [[self.timerRing objectAtIndex:(tick % 60)] removeObject:msgId]; + [self.lock unlock]; +} + +- (NSArray *)allMsgIdsAtTick:(NSUInteger)tick { + [self.lock lock]; + NSArray *result = [[self.timerRing objectAtIndex:(tick % 60)] allObjects]; + [self.lock unlock]; + return result; +} + +@end diff --git a/AWSiOSSDKv2.xcodeproj/project.pbxproj b/AWSiOSSDKv2.xcodeproj/project.pbxproj index 88f71373dc7..d5a4a2c636d 100644 --- a/AWSiOSSDKv2.xcodeproj/project.pbxproj +++ b/AWSiOSSDKv2.xcodeproj/project.pbxproj @@ -598,6 +598,8 @@ 5C1590172755727C00F88085 /* AWSCore.framework in Frameworks */ = {isa = PBXBuildFile; fileRef = CE0D416D1C6A66E5006B91B5 /* AWSCore.framework */; }; 5C1978DD2702364800F9C11E /* AWSLocationTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 5C1978DC2702364800F9C11E /* AWSLocationTests.swift */; }; 5C71F33F295672B8001183A4 /* guten_tag.wav in Resources */ = {isa = PBXBuildFile; fileRef = 5C71F33E295672B8001183A4 /* guten_tag.wav */; }; + 685AA2112CDA7843008EFC7B /* AWSMQTTTimerRing.h in Headers */ = {isa = PBXBuildFile; fileRef = 685AA20F2CDA7843008EFC7B /* AWSMQTTTimerRing.h */; }; + 685AA2122CDA7843008EFC7B /* AWSMQTTTimerRing.m in Sources */ = {isa = PBXBuildFile; fileRef = 685AA2102CDA7843008EFC7B /* AWSMQTTTimerRing.m */; }; 687952932B8FE2C5001E8990 /* AWSDDLog+Optional.swift in Sources */ = {isa = PBXBuildFile; fileRef = 687952922B8FE2C5001E8990 /* AWSDDLog+Optional.swift */; }; 6883619E2B72D1C200D74FF4 /* AWSS3PreSignedURLBuilderUnitTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 6883619D2B72D1C200D74FF4 /* AWSS3PreSignedURLBuilderUnitTests.swift */; }; 688361A12B73D25B00D74FF4 /* AWSIoTStreamThreadTests.m in Sources */ = {isa = PBXBuildFile; fileRef = 688361A02B73D25B00D74FF4 /* AWSIoTStreamThreadTests.m */; }; @@ -3215,6 +3217,8 @@ 5C1978DB2702364800F9C11E /* AWSLocationTests-Bridging-Header.h */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.c.h; path = "AWSLocationTests-Bridging-Header.h"; sourceTree = ""; }; 5C1978DC2702364800F9C11E /* AWSLocationTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = AWSLocationTests.swift; sourceTree = ""; }; 5C71F33E295672B8001183A4 /* guten_tag.wav */ = {isa = PBXFileReference; lastKnownFileType = audio.wav; path = guten_tag.wav; sourceTree = ""; }; + 685AA20F2CDA7843008EFC7B /* AWSMQTTTimerRing.h */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.c.h; path = AWSMQTTTimerRing.h; sourceTree = ""; }; + 685AA2102CDA7843008EFC7B /* AWSMQTTTimerRing.m */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.c.objc; path = AWSMQTTTimerRing.m; sourceTree = ""; }; 687952922B8FE2C5001E8990 /* AWSDDLog+Optional.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "AWSDDLog+Optional.swift"; sourceTree = ""; }; 6883619D2B72D1C200D74FF4 /* AWSS3PreSignedURLBuilderUnitTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = AWSS3PreSignedURLBuilderUnitTests.swift; sourceTree = ""; }; 688361A02B73D25B00D74FF4 /* AWSIoTStreamThreadTests.m */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.c.objc; path = AWSIoTStreamThreadTests.m; sourceTree = ""; }; @@ -7244,6 +7248,8 @@ CE9DE6461C6A78D70060793F /* AWSMQTTSession.m */, CE9DE6471C6A78D70060793F /* AWSMQttTxFlow.h */, CE9DE6481C6A78D70060793F /* AWSMQttTxFlow.m */, + 685AA20F2CDA7843008EFC7B /* AWSMQTTTimerRing.h */, + 685AA2102CDA7843008EFC7B /* AWSMQTTTimerRing.m */, ); path = MQTTSDK; sourceTree = ""; @@ -8455,6 +8461,7 @@ files = ( CE9DE6521C6A78D70060793F /* AWSIoTDataResources.h in Headers */, CE9DE65A1C6A78D70060793F /* AWSIoTResources.h in Headers */, + 685AA2112CDA7843008EFC7B /* AWSMQTTTimerRing.h in Headers */, 68EE1A6C2B713D8100B7CF41 /* AWSIoTStreamThread.h in Headers */, CE9DE6231C6A78AF0060793F /* AWSIoT.h in Headers */, CE9DE6561C6A78D70060793F /* AWSIoTManager.h in Headers */, @@ -13398,6 +13405,7 @@ CE9DE66D1C6A78D70060793F /* AWSMQTTSession.m in Sources */, CE9DE6551C6A78D70060793F /* AWSIoTDataService.m in Sources */, 0342776A269D185200379263 /* AWSIoTMessage+AWSMQTTMessage.m in Sources */, + 685AA2122CDA7843008EFC7B /* AWSMQTTTimerRing.m in Sources */, CE9DE66B1C6A78D70060793F /* AWSMQTTMessage.m in Sources */, CE9DE65D1C6A78D70060793F /* AWSIoTService.m in Sources */, 68DD11872C5AF52B004E1C37 /* AWSIoTAtomicDictionary.m in Sources */, diff --git a/CHANGELOG.md b/CHANGELOG.md index 3ac61be687e..85a05f90afb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ - **AWSIoT** - Fixing a race condition when invalidating/creating the reconnect timer (#5454) + - Fixing a potential race condition in the timer ring queue (#5461) ## 2.38.0