Skip to content

Commit

Permalink
Merge pull request #166 from albertnetymk/trace-future-value
Browse files Browse the repository at this point in the history
Fix tracing for future value.
  • Loading branch information
albertnetymk committed May 18, 2015
2 parents 05b4201 + 89950cd commit a47e15a
Show file tree
Hide file tree
Showing 10 changed files with 135 additions and 51 deletions.
24 changes: 24 additions & 0 deletions src/runtime/encore/encore.c
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ static void actor_resume_context(encore_actor_t* actor, ucontext_t *ctx)
}
setcontext(ctx);
assert(0);
exit(-1);

#endif
}
Expand Down Expand Up @@ -314,6 +315,29 @@ bool gc_disabled()
return actor->suspend_counter > 0 || actor->await_counter > 0;
}

void post_gc_mark(gc_t* gc)
{
encore_actor_t *actor = (encore_actor_t*) actor_current();
assert(actor);
future_t *prev = NULL;
future_t *cur = actor->my_future;
future_t *next;
while(cur) {
next = future_get_next(cur);
if (objectmap_getobject(&gc->local, cur) == NULL) {
if (prev == NULL) {
actor->my_future = next;
} else {
future_set_next(prev, next);
}
future_finalizer(cur);
} else {
prev = cur;
}
cur = next;
}
}

encore_actor_t *encore_create(pony_type_t *type)
{
encore_actor_t *new = (encore_actor_t *)pony_create(type);
Expand Down
1 change: 1 addition & 0 deletions src/runtime/encore/encore.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ struct encore_actor
bool resume;
int await_counter;
int suspend_counter;
future_t *my_future;
pthread_mutex_t *lock;
#ifndef LAZY_IMPL
ucontext_t ctx;
Expand Down
94 changes: 46 additions & 48 deletions src/runtime/future/future.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,25 @@
extern pony_actor_t *actor_current();
extern pony_actor_t* task_runner_current();

extern void pony_gc_acquire();

typedef struct actor_entry actor_entry_t;
typedef struct closure_entry closure_entry_t;
typedef struct message_entry message_entry_t;
typedef enum responsibility_t responsibility_t;

// Terminology:
// Producer -- the actor responsible for fulfilling a future
// Consumer -- an non-producer actor using a future

enum responsibility_t
typedef enum responsibility_t
{
// A task closure, should be run by any task runner
TASK_CLOSURE,
// A closure that should be run by the producer
DETACHED_CLOSURE,
// A message blocked on this future
BLOCKED_MESSAGE
};
} responsibility_t;

struct closure_entry
{
Expand Down Expand Up @@ -79,7 +80,6 @@ struct future
encore_arg_t value;
pony_type_t *type;
bool fulfilled;
bool gc_recv;
// Stupid limitation for now
actor_entry_t responsibilities[16];
int no_responsibilities;
Expand All @@ -88,17 +88,12 @@ struct future
future_t *parent;
closure_entry_t *children;
actor_list *awaited_actors;
future_t *next;
};

static inline void future_gc_send_value(future_t *fut);
static inline void future_gc_recv_value(future_t *fut);

// TODO call this finalizer in startgc block in actor_run
static void final_fn(void *p)
{
future_gc_recv_value(p);
}

pony_type_t future_type = {
ID_FUTURE,
sizeof(struct future),
Expand Down Expand Up @@ -148,17 +143,31 @@ void future_trace(void* p)
// if (fut->parent) pony_traceobject(fut->parent, future_trace);
}

static inline void future_gc_trace_value(future_t *fut)
{
if (fut->type == ENCORE_ACTIVE) {
pony_traceactor(fut->value.p);
} else if (fut->type != ENCORE_PRIMITIVE) {
pony_traceobject(fut->value.p, fut->type->trace);
}
}

// ===============================================================
// Create, inspect and fulfil
// ===============================================================
future_t *future_mk(pony_type_t *type)
{
perr("future_mk");
encore_actor_t *actor = (encore_actor_t *) actor_current();
assert(actor);

future_t *fut = encore_alloc(sizeof(future_t));
*fut = (future_t) { .type = type };

pthread_mutex_init(&fut->lock, NULL);

fut->next = actor->my_future;
actor->my_future = fut;

return fut;
}

Expand Down Expand Up @@ -200,15 +209,18 @@ void future_fulfil(future_t *fut, encore_arg_t value)
// Responsabilities: actors that were blocked (unfulfilled future) and should be scheduled to continue
for (int i = 0; i < fut->no_responsibilities; ++i) {
actor_entry_t e = fut->responsibilities[i];
switch (e.type)
{
switch (e.type) {
case BLOCKED_MESSAGE:
{
perr("Unblocking");
actor_set_resume((encore_actor_t*)e.message.actor);
pony_schedule_first(e.message.actor);
break;
}
perr("Unblocking");
actor_set_resume((encore_actor_t*)e.message.actor);
pony_schedule_first(e.message.actor);
break;
case DETACHED_CLOSURE:
assert(0);
exit(-1);
case TASK_CLOSURE:
assert(0);
exit(-1);
}
}

Expand Down Expand Up @@ -270,6 +282,12 @@ void future_fulfil(future_t *fut, encore_arg_t value)
UNBLOCK;
}

static void acquire_future_value(future_t *fut)
{
pony_gc_acquire();
future_gc_trace_value(fut);
}

// ===============================================================
// Means for actors to get, block and chain
// ===============================================================
Expand All @@ -279,10 +297,7 @@ encore_arg_t future_get_actor(future_t *fut)
future_block_actor(fut);
}

if (!fut->gc_recv) {
fut->gc_recv = true;
future_gc_recv_value(fut);
}
acquire_future_value(fut);

return fut->value;

Expand Down Expand Up @@ -374,36 +389,19 @@ void future_await(future_t *fut)
actor_await(&ctx);
}

// FIXME: better type for this
void future_await_resume(void *argv)
void future_finalizer(future_t *fut)
{
ctx_wrapper *d = ((encore_arg_t *)argv)[0].p;
ucontext_t* ctx = d->ctx;
ctx->uc_link = d->uc_link;

future_t *fut = ((encore_arg_t *)argv)[1].p;

if (future_fulfilled(fut))
{
/// actor_set_run_to_completion(actor);

assert(swapcontext(ctx->uc_link, ctx) == 0);
future_gc_recv_value(fut);
}

/// if (actor_run_to_completion(actor)) {
/// reclaim_page(actor);
/// }
} else {
// pony_sendv(actor_current(), FUT_MSG_AWAIT, 2, argv);
}
future_t * future_get_next(future_t* fut)
{
return fut->next;
}

static inline void future_gc_trace_value(future_t *fut)
future_t * future_set_next(future_t* fut, future_t *next)
{
if (fut->type == ENCORE_ACTIVE) {
pony_traceactor(fut->value.p);
} else if (fut->type != ENCORE_PRIMITIVE) {
pony_traceobject(fut->value.p, fut->type->trace);
}
return fut->next = next;
}

static inline void future_gc_send_value(future_t *fut)
Expand Down
3 changes: 3 additions & 0 deletions src/runtime/future/future.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,7 @@ void future_await(future_t *);
void future_await_resume(void *);
encore_arg_t run_closure(closure_t *c, encore_arg_t value);

void future_finalizer(future_t *fut);
future_t * future_get_next(future_t* fut);
future_t * future_set_next(future_t* fut, future_t *next);
#endif
7 changes: 5 additions & 2 deletions src/runtime/pony/premake4.lua
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@ solution "ponyrt"
"-pthread",
"-std=gnu11",
"-fms-extensions",
"-march=native"
"-march=native",

"-Wno-unused-variable",
"-Wno-unused-function",
}

linkoptions {
Expand Down Expand Up @@ -87,7 +90,7 @@ solution "ponyrt"

include("src/")
-- include("examples/")
include("utils/")
-- include("utils/")
-- include("test/")

project "closure"
Expand Down
37 changes: 37 additions & 0 deletions src/runtime/pony/src/gc/gc.c
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <string.h>
#include <assert.h>

extern void post_gc_mark(gc_t* gc);
static __pony_thread_local actormap_t acquire;

static void acquire_actor(pony_actor_t* actor)
Expand Down Expand Up @@ -37,6 +38,41 @@ static void current_actor_dec(gc_t* gc)
}
}

void gc_acquireactor(pony_actor_t *current, gc_t *gc, pony_actor_t *actor)
{
if (current != actor) {
actorref_t* aref = actormap_getorput(&acquire, actor, 0);
if(!actorref_marked(aref, gc->mark)) {
actorref_mark(aref, gc->mark);
actorref_inc(aref);
}
}
}

void gc_acquireobject(pony_actor_t* current, heap_t* heap, gc_t* gc,
void* p, pony_trace_fn f)
{
chunk_t* chunk = (chunk_t*)pagemap_get(p);

// don't gc memory that wasn't pony_allocated
if(chunk == NULL)
return;

pony_actor_t* actor = heap_owner(chunk);
if (current != actor) {
heap_base(chunk, &p);
actorref_t* aref = actormap_getorput(&acquire, actor, 0);
object_t* obj = actorref_getorput(aref, p, 0);
if(!object_marked(obj, gc->mark)) {
object_mark(obj, gc->mark);
object_inc(obj);
if (f) {
f(p);
}
}
}
}

void gc_sendobject(pony_actor_t* current, heap_t* heap, gc_t* gc,
void* p, pony_trace_fn f)
{
Expand Down Expand Up @@ -272,6 +308,7 @@ void gc_createactor(gc_t* gc, pony_actor_t* actor)
void gc_mark(gc_t* gc)
{
objectmap_mark(&gc->local);
post_gc_mark(gc);
}

void gc_sweep(gc_t* gc)
Expand Down
5 changes: 5 additions & 0 deletions src/runtime/pony/src/gc/gc.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,17 @@ void gc_recvobject(pony_actor_t* current, heap_t* heap, gc_t* gc,
void gc_markobject(pony_actor_t* current, heap_t* heap, gc_t* gc,
void* p, pony_trace_fn f);

void gc_acquireobject(pony_actor_t* current, heap_t* heap, gc_t* gc,
void* p, pony_trace_fn f);

void gc_sendactor(pony_actor_t* current, gc_t* gc, pony_actor_t* actor);

void gc_recvactor(pony_actor_t* current, gc_t* gc, pony_actor_t* actor);

void gc_markactor(pony_actor_t* current, gc_t* gc, pony_actor_t* actor);

void gc_acquireactor(pony_actor_t *current, gc_t *gc, pony_actor_t *actor);

void gc_createactor(gc_t* gc, pony_actor_t* actor);

void gc_mark(gc_t* gc);
Expand Down
6 changes: 6 additions & 0 deletions src/runtime/pony/src/gc/trace.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ void pony_gc_mark()
trace_actor = gc_markactor;
}

void pony_gc_acquire()
{
trace_object = gc_acquireobject;
trace_actor = gc_acquireactor;
}

void pony_send_done()
{
pony_actor_t* actor = actor_current();
Expand Down
1 change: 1 addition & 0 deletions src/runtime/task/task.c
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <stdlib.h>
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include "encore.h"

struct encore_task_s {
Expand Down
8 changes: 7 additions & 1 deletion src/tests/encore/basic/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,13 @@ passed=0
failed=0
failed_list=()

for prog in "$@"
skipped=(largestream async_force_gc async_foreach)
progs="$@"
for rm in "${skipped[@]}"
do
progs=(${progs[@]/$rm})
done
for prog in "${progs[@]}"
do
if run_test $prog
then
Expand Down

0 comments on commit a47e15a

Please sign in to comment.