Skip to content

Commit

Permalink
Add timeouts to net functions.
Browse files Browse the repository at this point in the history
Further debugging of the general timeout system, as well
as having a single fiber wait on multiple state machines (select).
  • Loading branch information
bakpakin committed Jul 20, 2020
1 parent df145f4 commit 553b4d9
Show file tree
Hide file tree
Showing 9 changed files with 66 additions and 36 deletions.
5 changes: 4 additions & 1 deletion examples/tcpserver.janet
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,7 @@
# Run server.
(let [server (net/server "127.0.0.1" "8000")]
(print "Starting echo server on 127.0.0.1:8000")
(while true (ev/call handler (:accept server))))
(forever
(if-let [conn (:accept server 2.8)]
(ev/call handler conn)
(print "no new connections"))))
20 changes: 6 additions & 14 deletions src/core/ev.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "gc.h"
#include "state.h"
#include "vector.h"
#include "fiber.h"
#endif

#ifdef JANET_EV
Expand Down Expand Up @@ -240,12 +241,8 @@ void janet_pollable_deinit(JanetPollable *pollable) {
pollable->state = NULL;
}

/* In order to avoid unexpected wakeups on a fiber, prior to
* resuming a fiber after and event is triggered, we need to
* cancel all listeners that also want to wakeup this fiber.
* Otherwise, these listeners make wakeup the fiber on an unexpected
* await point. */
void janet_unschedule_others(JanetFiber *fiber) {
/* Cancel any state machines waiting on this fiber. */
void janet_cancel(JanetFiber *fiber) {
int32_t lcount = janet_v_count(fiber->waiting);
janet_v_empty(fiber->waiting);
for (int32_t index = 0; index < lcount; index++) {
Expand All @@ -260,11 +257,8 @@ void janet_unschedule_others(JanetFiber *fiber) {

/* Register a fiber to resume with value */
void janet_schedule(JanetFiber *fiber, Janet value) {
if (fiber->gc.flags & 0x10000) {
/* already scheduled to run, do nothing */
return;
}
fiber->gc.flags |= 0x10000;
if (fiber->flags & JANET_FIBER_FLAG_SCHEDULED) return;
fiber->flags |= JANET_FIBER_FLAG_SCHEDULED;
size_t oldcount = janet_vm_spawn_count;
size_t newcount = oldcount + 1;
if (newcount > janet_vm_spawn_capacity) {
Expand Down Expand Up @@ -294,9 +288,7 @@ void janet_ev_mark(void) {

/* Run a top level task */
static void run_one(JanetFiber *fiber, Janet value) {
/* Use a gc flag bit to indicate (is this fiber scheduled?) */
fiber->gc.flags &= ~0x10000;
janet_unschedule_others(fiber);
fiber->flags &= ~JANET_FIBER_FLAG_SCHEDULED;
Janet res;
JanetSignal sig = janet_continue(fiber, value, &res);
if (sig != JANET_SIGNAL_OK && sig != JANET_SIGNAL_EVENT) {
Expand Down
1 change: 1 addition & 0 deletions src/core/fiber.c
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ JanetFiber *janet_fiber_reset(JanetFiber *fiber, JanetFunction *callee, int32_t
}
if (janet_fiber_funcframe(fiber, callee)) return NULL;
janet_fiber_frame(fiber)->flags |= JANET_STACKFRAME_ENTRANCE;
fiber->waiting = NULL;
return fiber;
}

Expand Down
3 changes: 2 additions & 1 deletion src/core/fiber.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@
#define JANET_FIBER_MASK_USERN(N) (16 << (N))
#define JANET_FIBER_MASK_USER 0x3FF0

#define JANET_FIBER_STATUS_MASK 0xFF0000
#define JANET_FIBER_STATUS_MASK 0x7F0000
#define JANET_FIBER_FLAG_SCHEDULED 0x800000
#define JANET_FIBER_STATUS_OFFSET 16

#define JANET_FIBER_BREAKPOINT 0x1000000
Expand Down
11 changes: 9 additions & 2 deletions src/core/marsh.c
Original file line number Diff line number Diff line change
Expand Up @@ -285,8 +285,8 @@ static void marshal_one_def(MarshalState *st, JanetFuncDef *def, int flags) {
}

#define JANET_FIBER_FLAG_HASCHILD (1 << 29)
#define JANET_FIBER_FLAG_HASENV (1 << 28)
#define JANET_STACKFRAME_HASENV (1 << 30)
#define JANET_FIBER_FLAG_HASENV (1 << 30)
#define JANET_STACKFRAME_HASENV (1 << 31)

/* Marshal a fiber */
static void marshal_one_fiber(MarshalState *st, JanetFiber *fiber, int flags) {
Expand Down Expand Up @@ -934,6 +934,8 @@ static const uint8_t *unmarshal_one_fiber(
fiber->data = NULL;
fiber->child = NULL;
fiber->env = NULL;
fiber->waiting = NULL;
fiber->timeout_index = -1;

/* Push fiber to seen stack */
janet_v_push(st->lookup, janet_wrap_fiber(fiber));
Expand Down Expand Up @@ -1048,6 +1050,11 @@ static const uint8_t *unmarshal_one_fiber(
fiber->maxstack = fiber_maxstack;
fiber->env = fiber_env;

int status = janet_fiber_status(fiber);
if (status < 0 || status > JANET_STATUS_ALIVE) {
janet_panic("invalid fiber status");
}

/* Return data */
*out = fiber;
return data;
Expand Down
52 changes: 35 additions & 17 deletions src/core/net.c
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,6 @@ JanetAsyncStatus net_machine_simple_server(JanetListenerState *s, JanetAsyncEven
janet_mark(janet_wrap_function(state->function));
break;
case JANET_ASYNC_EVENT_CLOSE:
janet_schedule(s->fiber, janet_wrap_nil());
janet_gcunroot(janet_wrap_abstract(s->pollable));
return JANET_ASYNC_STATUS_DONE;
case JANET_ASYNC_EVENT_READ: {
Expand Down Expand Up @@ -650,7 +649,7 @@ static Janet cfun_net_server(int32_t argc, Janet *argv) {
/* Server with handler */
JanetStream *stream = make_stream(sfd, 0);
NetStateSimpleServer *ss = (NetStateSimpleServer *) janet_listen(stream, net_machine_simple_server,
JANET_ASYNC_LISTEN_READ, sizeof(NetStateSimpleServer));
JANET_ASYNC_LISTEN_READ | JANET_ASYNC_LISTEN_SPAWNER, sizeof(NetStateSimpleServer));
ss->function = fun;
return janet_wrap_abstract(stream);
}
Expand All @@ -669,36 +668,44 @@ static void check_stream_flag(JanetStream *stream, int flag) {
}

static Janet cfun_stream_accept(int32_t argc, Janet *argv) {
janet_fixarity(argc, 1);
janet_arity(argc, 1, 2);
JanetStream *stream = janet_getabstract(argv, 0, &StreamAT);
check_stream_flag(stream, JANET_STREAM_ACCEPTABLE);
double to = janet_optnumber(argv, argc, 1, INFINITY);
if (to != INFINITY) janet_addtimeout(to);
janet_sched_accept(stream);
}

static Janet cfun_stream_read(int32_t argc, Janet *argv) {
janet_arity(argc, 2, 3);
janet_arity(argc, 2, 4);
JanetStream *stream = janet_getabstract(argv, 0, &StreamAT);
check_stream_flag(stream, JANET_STREAM_READABLE);
int32_t n = janet_getnat(argv, 1);
JanetBuffer *buffer = janet_optbuffer(argv, argc, 2, 10);
double to = janet_optnumber(argv, argc, 3, INFINITY);
if (to != INFINITY) janet_addtimeout(to);
janet_sched_read(stream, buffer, n);
}

static Janet cfun_stream_chunk(int32_t argc, Janet *argv) {
janet_arity(argc, 2, 3);
janet_arity(argc, 2, 4);
JanetStream *stream = janet_getabstract(argv, 0, &StreamAT);
check_stream_flag(stream, JANET_STREAM_READABLE);
int32_t n = janet_getnat(argv, 1);
JanetBuffer *buffer = janet_optbuffer(argv, argc, 2, 10);
double to = janet_optnumber(argv, argc, 3, INFINITY);
if (to != INFINITY) janet_addtimeout(to);
janet_sched_chunk(stream, buffer, n);
}

static Janet cfun_stream_recv_from(int32_t argc, Janet *argv) {
janet_fixarity(argc, 3);
janet_arity(argc, 3, 4);
JanetStream *stream = janet_getabstract(argv, 0, &StreamAT);
check_stream_flag(stream, JANET_STREAM_UDPSERVER);
int32_t n = janet_getnat(argv, 1);
JanetBuffer *buffer = janet_getbuffer(argv, 2);
double to = janet_optnumber(argv, argc, 3, INFINITY);
if (to != INFINITY) janet_addtimeout(to);
janet_sched_recv_from(stream, buffer, n);
}

Expand All @@ -710,26 +717,32 @@ static Janet cfun_stream_close(int32_t argc, Janet *argv) {
}

static Janet cfun_stream_write(int32_t argc, Janet *argv) {
janet_fixarity(argc, 2);
janet_arity(argc, 2, 3);
JanetStream *stream = janet_getabstract(argv, 0, &StreamAT);
check_stream_flag(stream, JANET_STREAM_WRITABLE);
double to = janet_optnumber(argv, argc, 2, INFINITY);
if (janet_checktype(argv[1], JANET_BUFFER)) {
if (to != INFINITY) janet_addtimeout(to);
janet_sched_write_buffer(stream, janet_getbuffer(argv, 1), NULL);
} else {
JanetByteView bytes = janet_getbytes(argv, 1);
if (to != INFINITY) janet_addtimeout(to);
janet_sched_write_stringlike(stream, bytes.bytes, NULL);
}
}

static Janet cfun_stream_send_to(int32_t argc, Janet *argv) {
janet_fixarity(argc, 3);
janet_arity(argc, 3, 4);
JanetStream *stream = janet_getabstract(argv, 0, &StreamAT);
check_stream_flag(stream, JANET_STREAM_UDPSERVER);
void *dest = janet_getabstract(argv, 1, &AddressAT);
double to = janet_optnumber(argv, argc, 3, INFINITY);
if (janet_checktype(argv[2], JANET_BUFFER)) {
if (to != INFINITY) janet_addtimeout(to);
janet_sched_write_buffer(stream, janet_getbuffer(argv, 2), dest);
} else {
JanetByteView bytes = janet_getbytes(argv, 2);
if (to != INFINITY) janet_addtimeout(to);
janet_sched_write_stringlike(stream, bytes.bytes, dest);
}
}
Expand Down Expand Up @@ -783,39 +796,44 @@ static const JanetReg net_cfuns[] = {
},
{
"net/accept", cfun_stream_accept,
JDOC("(net/accept stream)\n\n"
JDOC("(net/accept stream &opt timeout)\n\n"
"Get the next connection on a server stream. This would usually be called in a loop in a dedicated fiber. "
"Takes an optional timeout in seconds, after which will return nil. "
"Returns a new duplex stream which represents a connection to the client.")
},
{
"net/read", cfun_stream_read,
JDOC("(net/read stream nbytes &opt buf)\n\n"
JDOC("(net/read stream nbytes &opt buf timeout)\n\n"
"Read up to n bytes from a stream, suspending the current fiber until the bytes are available. "
"If less than n bytes are available (and more than 0), will push those bytes and return early. "
"Takes an optional timeout in seconds, after which will return nil. "
"Returns a buffer with up to n more bytes in it.")
},
{
"net/chunk", cfun_stream_chunk,
JDOC("(net/chunk stream nbytes &opt buf)\n\n"
"Same a net/read, but will wait for all n bytes to arrive rather than return early.")
JDOC("(net/chunk stream nbytes &opt buf timeout)\n\n"
"Same a net/read, but will wait for all n bytes to arrive rather than return early. "
"Takes an optional timeout in seconds, after which will return nil.")
},
{
"net/write", cfun_stream_write,
JDOC("(net/write stream data)\n\n"
JDOC("(net/write stream data &opt timeout)\n\n"
"Write data to a stream, suspending the current fiber until the write "
"completes. Returns stream.")
"completes. Takes an optional timeout in seconds, after which will return nil. "
"Returns stream.")
},
{
"net/send-to", cfun_stream_send_to,
JDOC("(net/send-to stream dest data)\n\n"
JDOC("(net/send-to stream dest data &opt timeout)\n\n"
"Writes a datagram to a server stream. dest is a the destination address of the packet. "
"Takes an optional timeout in seconds, after which will return nil. "
"Returns stream.")
},
{
"net/recv-from", cfun_stream_recv_from,
JDOC("(net/recv-from stream nbytes buf)\n\n"
JDOC("(net/recv-from stream nbytes buf &opt timoeut)\n\n"
"Receives data from a server stream and puts it into a buffer. Returns the socket-address the "
"packet came from.")
"packet came from. Takes an optional timeout in seconds, after which will return nil.")
},
{
"net/flush", cfun_stream_flush,
Expand Down
4 changes: 4 additions & 0 deletions src/core/vm.c
Original file line number Diff line number Diff line change
Expand Up @@ -1286,6 +1286,10 @@ static JanetSignal janet_continue_no_check(JanetFiber *fiber, Janet in, Janet *o

JanetFiberStatus old_status = janet_fiber_status(fiber);

#ifdef JANET_EV
janet_cancel(fiber);
#endif

/* Continue child fiber if it exists */
if (fiber->child) {
if (janet_vm_root_fiber == NULL) janet_vm_root_fiber = fiber;
Expand Down
4 changes: 4 additions & 0 deletions src/include/janet.h
Original file line number Diff line number Diff line change
Expand Up @@ -1218,6 +1218,10 @@ JANET_API JanetListenerState *janet_listen(JanetPollable *pollable, JanetListene
/* Shorthand for yielding to event loop in C */
JANET_NO_RETURN JANET_API void janet_await(void);

/* Cancel a waiting fiber. Will notify the canceled state machines, but will not
* unwind the fiber. */
void janet_cancel(JanetFiber *fiber);

/* For use inside listeners - adds a timeout to the current fiber, such that
* it will be resumed after sec seconds if no other event schedules the current fiber. */
void janet_addtimeout(double sec);
Expand Down
2 changes: 1 addition & 1 deletion test/suite8.janet
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ neldb\0\0\0\xD8\x05printG\x01\0\xDE\xDE\xDE'\x03\0marshal_tes/\x02
# No segfault, valgrind clean.

(def x @"\xCC\xCD.nd\x80\0\r\x1C\xCDg!\0\x07\xCC\xCD\r\x1Ce\x10\0\r;\xCDb\x04\xFF9\xFF\x80\x04\x04\x04\x04\x04\x04\x04\x04\x04\x04\x04uu\x04\x04\x04\x04\x04\x04\x04\x04\x04\x04\x04\x04\x04\x04\0\x04\x04\x04\x04\x04\x04\x04\x04\x04\x04\x04\x04\x04\x04\x04\x04\x04\x04}\x04\x04\x04\x04\x04\x04\x04\x04#\x04\x04\x04\x04\x04\x04\x04\x04\x04\x04\x04\0\x01\0\0\x03\x04\x04\x04\xE2\x03\x04\x04\x04\x04\x04\x04\x04\x04\x04\x14\x1A\x04\x04\x04\x04\x04\x18\x04\x04!\x04\xE2\x03\x04\x04\x04\x04\x04\x04$\x04\x04\x04\x04\x04\x04\x04\x04\x04\x80\x04\x04\x04\x04\x04\x04\x04\x04\x04\x04\x04\x04\x04A\0\0\0\x03\0\0!\xBF\xFF")
(unmarshal x load-image-dict)
(assert-error "bad fiber status" (unmarshal x load-image-dict))
(gccollect)
(marshal x make-image-dict)

Expand Down

0 comments on commit 553b4d9

Please sign in to comment.