Skip to content

Commit

Permalink
Use poll instead of select #1033
Browse files Browse the repository at this point in the history
  • Loading branch information
icraggs committed Feb 4, 2022
1 parent 5e6e389 commit a43528b
Show file tree
Hide file tree
Showing 21 changed files with 331 additions and 295 deletions.
2 changes: 1 addition & 1 deletion src/Clients.c
Original file line number Diff line number Diff line change
Expand Up @@ -51,5 +51,5 @@ int clientSocketCompare(void* a, void* b)
{
Clients* client = (Clients*)a;
/*printf("comparing %d with %d\n", (char*)a, (char*)b); */
return client->net.socket == *(int*)b;
return client->net.socket == *(SOCKET*)b;
}
7 changes: 4 additions & 3 deletions src/Clients.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2009, 2021 IBM Corp. and Ian Craggs
* Copyright (c) 2009, 2022 IBM Corp. and Ian Craggs
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v2.0
Expand All @@ -24,15 +24,16 @@

#include <stdint.h>
#include "MQTTTime.h"
#if defined(OPENSSL)
#if defined(_WIN32) || defined(_WIN64)
#include <winsock2.h>
#endif
#if defined(OPENSSL)
#include <openssl/ssl.h>
#endif
#include "MQTTClient.h"
#include "LinkedList.h"
#include "MQTTClientPersistence.h"
#include "Socket.h"

/**
* Stored publication data to minimize copying
Expand Down Expand Up @@ -77,7 +78,7 @@ typedef struct

typedef struct
{
int socket;
SOCKET socket;
START_TIME_TYPE lastSent;
START_TIME_TYPE lastReceived;
START_TIME_TYPE lastPing;
Expand Down
22 changes: 22 additions & 0 deletions src/Keysight_ws_add
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#if WINVER <= _WIN32_WINNT_WIN8
#define HTON(x) hton((uint64_t) (x), sizeof(x))
uint64_t hton(uint64_t x, size_t n)
{
uint64_t y = 0;
size_t i = 0;

for (i=0; i < n; ++i)
{
y = (y << 8) | (x & 0xff);
x = (x >> 8);
}
return y;
}
#define htons(x) (uint16_t) HTON(x)
#define htonl(x) (uint32_t) HTON(x)
#define htonll(x) (uint64_t) HTON(x)

#define ntohs(x) htons(x)
#define ntohl(x) htonl(x)
#define ntohll(x) htonll(x)
#endif
4 changes: 2 additions & 2 deletions src/MQTTAsync.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2009, 2021 IBM Corp., Ian Craggs and others
* Copyright (c) 2009, 2022 IBM Corp., Ian Craggs and others
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v2.0
Expand Down Expand Up @@ -486,7 +486,7 @@ void MQTTAsync_destroy(MQTTAsync* handle)

if (m->c)
{
int saved_socket = m->c->net.socket;
SOCKET saved_socket = m->c->net.socket;
char* saved_clientid = MQTTStrdup(m->c->clientID);
#if !defined(NO_PERSISTENCE)
MQTTPersistence_close(m->c);
Expand Down
26 changes: 10 additions & 16 deletions src/MQTTAsyncUtils.c
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ static int MQTTAsync_deliverMessage(MQTTAsyncs* m, char* topicName, size_t topic
static int MQTTAsync_disconnect_internal(MQTTAsync handle, int timeout);
static int cmdMessageIDCompare(void* a, void* b);
static void MQTTAsync_retry(void);
static MQTTPacket* MQTTAsync_cycle(int* sock, unsigned long timeout, int* rc);
static MQTTPacket* MQTTAsync_cycle(SOCKET* sock, unsigned long timeout, int* rc);
static int MQTTAsync_connecting(MQTTAsyncs* m);

extern MQTTProtocol state; /* defined in MQTTAsync.c */
Expand Down Expand Up @@ -889,11 +889,12 @@ int MQTTAsync_addCommand(MQTTAsync_queuedCommand* command, int command_size)
{
ListDetach(MQTTAsync_commands, first_publish);

MQTTAsync_freeCommand(first_publish);
#if !defined(NO_PERSISTENCE)
if (command->client->c->persistence)
MQTTAsync_unpersistCommand(first_publish);
#endif

MQTTAsync_freeCommand(first_publish);
}
}
else
Expand Down Expand Up @@ -976,7 +977,7 @@ void MQTTAsync_checkDisconnect(MQTTAsync handle, MQTTAsync_command* command)
/**
* Call Socket_noPendingWrites(int socket) with protection by socket_mutex, see https://github.com/eclipse/paho.mqtt.c/issues/385
*/
static int MQTTAsync_Socket_noPendingWrites(int socket)
static int MQTTAsync_Socket_noPendingWrites(SOCKET socket)
{
int rc;
MQTTAsync_lock_mutex(socket_mutex);
Expand Down Expand Up @@ -1059,7 +1060,7 @@ static void MQTTAsync_freeCommand(MQTTAsync_queuedCommand *command)
}


void MQTTAsync_writeComplete(int socket, int rc)
void MQTTAsync_writeComplete(SOCKET socket, int rc)
{
ListElement* found = NULL;

Expand Down Expand Up @@ -1976,7 +1977,7 @@ thread_return_type WINAPI MQTTAsync_receiveThread(void* n)
while (!MQTTAsync_tostop)
{
int rc = SOCKET_ERROR;
int sock = -1;
SOCKET sock = -1;
MQTTAsyncs* m = NULL;
MQTTPacket* pack = NULL;

Expand Down Expand Up @@ -2879,32 +2880,25 @@ static int MQTTAsync_connecting(MQTTAsyncs* m)
}


static MQTTPacket* MQTTAsync_cycle(int* sock, unsigned long timeout, int* rc)
static MQTTPacket* MQTTAsync_cycle(SOCKET* sock, unsigned long timeout, int* rc)
{
struct timeval tp = {0L, 0L};
MQTTPacket* pack = NULL;
int rc1 = 0;

FUNC_ENTRY;
if (timeout > 0L)
{
tp.tv_sec = timeout / 1000;
tp.tv_usec = (timeout % 1000) * 1000; /* this field is microseconds! */
}

int rc1 = 0;
#if defined(OPENSSL)
if ((*sock = SSLSocket_getPendingRead()) == -1)
{
#endif
int should_stop = 0;

/* 0 from getReadySocket indicates no work to do, rc -1 == error */
*sock = Socket_getReadySocket(0, &tp, socket_mutex, &rc1);
*sock = Socket_getReadySocket(0, (int)timeout, socket_mutex, &rc1);
*rc = rc1;
MQTTAsync_lock_mutex(mqttasync_mutex);
should_stop = MQTTAsync_tostop;
MQTTAsync_unlock_mutex(mqttasync_mutex);
if (!should_stop && *sock == 0 && (tp.tv_sec > 0L || tp.tv_usec > 0L))
if (!should_stop && *sock == 0 && (timeout > 0L))
MQTTAsync_sleep(100L);
#if defined(OPENSSL)
}
Expand Down
4 changes: 2 additions & 2 deletions src/MQTTAsyncUtils.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2009, 2020 IBM Corp. and others
* Copyright (c) 2009, 2022 IBM Corp. and others
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v2.0
Expand Down Expand Up @@ -169,7 +169,7 @@ void MQTTAsync_closeSession(Clients* client, enum MQTTReasonCodes reasonCode, MQ
int MQTTAsync_disconnect1(MQTTAsync handle, const MQTTAsync_disconnectOptions* options, int internal);
int MQTTAsync_assignMsgId(MQTTAsyncs* m);
int MQTTAsync_getNoBufferedMessages(MQTTAsyncs* m);
void MQTTAsync_writeComplete(int socket, int rc);
void MQTTAsync_writeComplete(SOCKET socket, int rc);
void setRetryLoopInterval(int keepalive);

#if defined(_WIN32) || defined(_WIN64)
Expand Down
35 changes: 16 additions & 19 deletions src/MQTTClient.c
Original file line number Diff line number Diff line change
Expand Up @@ -361,11 +361,11 @@ static MQTTResponse MQTTClient_connectURI(MQTTClient handle, MQTTClient_connectO
static int MQTTClient_disconnect1(MQTTClient handle, int timeout, int internal, int stop, enum MQTTReasonCodes, MQTTProperties*);
static int MQTTClient_disconnect_internal(MQTTClient handle, int timeout);
static void MQTTClient_retry(void);
static MQTTPacket* MQTTClient_cycle(int* sock, ELAPSED_TIME_TYPE timeout, int* rc);
static MQTTPacket* MQTTClient_cycle(SOCKET* sock, ELAPSED_TIME_TYPE timeout, int* rc);
static MQTTPacket* MQTTClient_waitfor(MQTTClient handle, int packet_type, int* rc, int64_t timeout);
/*static int pubCompare(void* a, void* b); */
static void MQTTProtocol_checkPendingWrites(void);
static void MQTTClient_writeComplete(int socket, int rc);
static void MQTTClient_writeComplete(SOCKET socket, int rc);


int MQTTClient_createWithOptions(MQTTClient* handle, const char* serverURI, const char* clientId,
Expand Down Expand Up @@ -576,7 +576,7 @@ void MQTTClient_destroy(MQTTClient* handle)

if (m->c)
{
int saved_socket = m->c->net.socket;
SOCKET saved_socket = m->c->net.socket;
char* saved_clientid = MQTTStrdup(m->c->clientID);
#if !defined(NO_PERSISTENCE)
MQTTPersistence_close(m->c);
Expand Down Expand Up @@ -807,7 +807,7 @@ static thread_return_type WINAPI MQTTClient_run(void* n)
while (!tostop)
{
int rc = SOCKET_ERROR;
int sock = -1;
SOCKET sock = -1;
MQTTClients* m = NULL;
MQTTPacket* pack = NULL;

Expand Down Expand Up @@ -2484,27 +2484,24 @@ static void MQTTClient_retry(void)
}


static MQTTPacket* MQTTClient_cycle(int* sock, ELAPSED_TIME_TYPE timeout, int* rc)
static MQTTPacket* MQTTClient_cycle(SOCKET* sock, ELAPSED_TIME_TYPE timeout, int* rc)
{
struct timeval tp = {0L, 0L};
static Ack ack;
MQTTPacket* pack = NULL;
int rc1 = 0;
START_TIME_TYPE start;

FUNC_ENTRY;
if (timeout > 0L)
{
tp.tv_sec = (long)(timeout / 1000);
tp.tv_usec = (long)((timeout % 1000) * 1000); /* this field is microseconds! */
}

int rc1 = 0;
#if defined(OPENSSL)
if ((*sock = SSLSocket_getPendingRead()) == -1)
{
/* 0 from getReadySocket indicates no work to do, rc -1 == error */
#endif
*sock = Socket_getReadySocket(0, &tp, socket_mutex, rc);
start = MQTTTime_start_clock();
*sock = Socket_getReadySocket(0, (int)timeout, socket_mutex, rc);
*rc = rc1;
if (*sock == 0 && timeout >= 100L && MQTTTime_elapsed(start) < (int64_t)10)
MQTTTime_sleep(100L);
#if defined(OPENSSL)
}
#endif
Expand Down Expand Up @@ -2618,7 +2615,7 @@ static MQTTPacket* MQTTClient_waitfor(MQTTClient handle, int packet_type, int* r
*rc = TCPSOCKET_COMPLETE;
while (1)
{
int sock = -1;
SOCKET sock = -1;
pack = MQTTClient_cycle(&sock, 100L, rc);
if (sock == m->c->net.socket)
{
Expand Down Expand Up @@ -2678,7 +2675,7 @@ static MQTTPacket* MQTTClient_waitfor(MQTTClient handle, int packet_type, int* r
}
}
}
if (MQTTTime_elapsed(start) > (int64_t)timeout)
if (MQTTTime_elapsed(start) > (uint64_t)timeout)
{
pack = NULL;
break;
Expand Down Expand Up @@ -2723,7 +2720,7 @@ int MQTTClient_receive(MQTTClient handle, char** topicName, int* topicLen, MQTTC
elapsed = MQTTTime_elapsed(start);
do
{
int sock = 0;
SOCKET sock = 0;
MQTTClient_cycle(&sock, (timeout > elapsed) ? timeout - elapsed : 0L, &rc);

if (rc == SOCKET_ERROR)
Expand Down Expand Up @@ -2765,7 +2762,7 @@ void MQTTClient_yield(void)
elapsed = MQTTTime_elapsed(start);
do
{
int sock = -1;
SOCKET sock = -1;
MQTTClient_cycle(&sock, (timeout > elapsed) ? timeout - elapsed : 0L, &rc);
Thread_lock_mutex(mqttclient_mutex);
if (rc == SOCKET_ERROR && ListFindItem(handles, &sock, clientSockCompare))
Expand Down Expand Up @@ -3012,7 +3009,7 @@ static void MQTTProtocol_checkPendingWrites(void)
}


static void MQTTClient_writeComplete(int socket, int rc)
static void MQTTClient_writeComplete(SOCKET socket, int rc)
{
ListElement* found = NULL;

Expand Down
4 changes: 2 additions & 2 deletions src/MQTTPersistence.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2009, 2020 IBM Corp.
* Copyright (c) 2009, 2022 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v2.0
Expand Down Expand Up @@ -431,7 +431,7 @@ void MQTTPersistence_insertInOrder(List* list, void* content, size_t size)
* @param the MQTT version being used (>= MQTTVERSION_5 means properties included)
* @return 0 if success, #MQTTCLIENT_PERSISTENCE_ERROR otherwise.
*/
int MQTTPersistence_putPacket(int socket, char* buf0, size_t buf0len, int count,
int MQTTPersistence_putPacket(SOCKET socket, char* buf0, size_t buf0len, int count,
char** buffers, size_t* buflens, int htype, int msgId, int scr, int MQTTVersion)
{
int rc = 0;
Expand Down
4 changes: 2 additions & 2 deletions src/MQTTPersistence.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2009, 2020 IBM Corp.
* Copyright (c) 2009, 2022 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v2.0
Expand Down Expand Up @@ -63,7 +63,7 @@ int MQTTPersistence_clear(Clients* c);
int MQTTPersistence_restorePackets(Clients* c);
void* MQTTPersistence_restorePacket(int MQTTVersion, char* buffer, size_t buflen);
void MQTTPersistence_insertInOrder(List* list, void* content, size_t size);
int MQTTPersistence_putPacket(int socket, char* buf0, size_t buf0len, int count,
int MQTTPersistence_putPacket(SOCKET socket, char* buf0, size_t buf0len, int count,
char** buffers, size_t* buflens, int htype, int msgId, int scr, int MQTTVersion);
int MQTTPersistence_remove(Clients* c, char* type, int qos, int msgId);
void MQTTPersistence_wrapMsgID(Clients *c);
Expand Down
4 changes: 2 additions & 2 deletions src/MQTTProtocol.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2009, 2014 IBM Corp.
* Copyright (c) 2009, 2022 IBM Corp., Ian Craggs
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v2.0
Expand Down Expand Up @@ -27,7 +27,7 @@

typedef struct
{
int socket;
SOCKET socket;
Publications* p;
} pending_write;

Expand Down
Loading

0 comments on commit a43528b

Please sign in to comment.