diff --git a/src/uct/cuda/cuda_copy/cuda_copy_ep.c b/src/uct/cuda/cuda_copy/cuda_copy_ep.c
index ed56e70e286..09218c80bb0 100644
--- a/src/uct/cuda/cuda_copy/cuda_copy_ep.c
+++ b/src/uct/cuda/cuda_copy/cuda_copy_ep.c
@@ -55,16 +55,16 @@ ucs_status_t uct_cuda_copy_init_stream(CUstream *stream)
 }
 
 static UCS_F_ALWAYS_INLINE CUstream *
-uct_cuda_copy_get_stream(uct_cuda_copy_iface_t *iface,
+uct_cuda_copy_get_stream(uct_cuda_copy_per_ctx_rsc_t *ctx_rsc,
                          ucs_memory_type_t src_type,
                          ucs_memory_type_t dst_type)
 {
-    CUstream *stream = NULL;
+    CUstream *stream;
     ucs_status_t status;
 
     ucs_assert((src_type < UCS_MEMORY_TYPE_LAST) &&
                (dst_type < UCS_MEMORY_TYPE_LAST));
 
-    stream = &iface->queue_desc[src_type][dst_type].stream;
+    stream = &ctx_rsc->queue_desc[src_type][dst_type].stream;
 
     status = uct_cuda_copy_init_stream(stream);
     if (status != UCS_OK) {
@@ -97,6 +97,38 @@ uct_cuda_copy_get_mem_type(uct_md_h md, void *address, size_t length)
     return mem_info.type;
 }
 
+static UCS_F_ALWAYS_INLINE ucs_status_t
+uct_cuda_copy_get_ctx_rsc(uct_cuda_copy_iface_t *iface,
+                          uct_cuda_copy_per_ctx_rsc_t **ctx_rsc)
+{
+    CUcontext current_ctx;
+    ucs_status_t status;
+
+    status = UCT_CUDADRV_FUNC_LOG_ERR(cuCtxGetCurrent(&current_ctx));
+    if (status != UCS_OK) {
+        return status;
+    } else if (current_ctx == NULL) {
+        ucs_error("no context bound to calling thread");
+        return UCS_ERR_IO_ERROR;
+    }
+
+    return uct_cuda_copy_get_per_ctx_rscs(iface, current_ctx, ctx_rsc);
+}
+
+static UCS_F_ALWAYS_INLINE ucs_status_t
+uct_cuda_copy_get_short_stream(uct_cuda_copy_iface_t *iface,
+                               uct_cuda_copy_per_ctx_rsc_t **ctx_rsc)
+{
+    ucs_status_t status;
+
+    status = uct_cuda_copy_get_ctx_rsc(iface, ctx_rsc);
+    if (status != UCS_OK) {
+        return status;
+    }
+
+    return uct_cuda_copy_init_stream(&(*ctx_rsc)->short_stream);
+}
+
 static UCS_F_ALWAYS_INLINE ucs_status_t
 uct_cuda_copy_post_cuda_async_copy(uct_ep_h tl_ep, void *dst, void *src,
                                    size_t length, uct_completion_t *comp)
@@ -110,25 +142,22 @@ uct_cuda_copy_post_cuda_async_copy(uct_ep_h tl_ep, void *dst, void *src,
     ucs_memory_type_t dst_type;
     CUstream *stream;
     ucs_queue_head_t *event_q;
+    uct_cuda_copy_per_ctx_rsc_t *ctx_rsc;
 
     if (!length) {
         return UCS_OK;
     }
 
-    /* ensure context is set before creating events/streams */
-    if (iface->cuda_context == NULL) {
-        UCT_CUDADRV_FUNC_LOG_ERR(cuCtxGetCurrent(&iface->cuda_context));
-        if (iface->cuda_context == NULL) {
-            ucs_error("attempt to perform cuda memcpy without active context");
-            return UCS_ERR_IO_ERROR;
-        }
+    status = uct_cuda_copy_get_ctx_rsc(iface, &ctx_rsc);
+    if (status != UCS_OK) {
+        return status;
     }
 
     src_type = uct_cuda_copy_get_mem_type(base_iface->md, src, length);
     dst_type = uct_cuda_copy_get_mem_type(base_iface->md, dst, length);
 
-    q_desc = &iface->queue_desc[src_type][dst_type];
+    q_desc = &ctx_rsc->queue_desc[src_type][dst_type];
     event_q = &q_desc->event_queue;
-    stream = uct_cuda_copy_get_stream(iface, src_type, dst_type);
+    stream = uct_cuda_copy_get_stream(ctx_rsc, src_type, dst_type);
     if (stream == NULL) {
         ucs_error("stream for src %s dst %s not available",
                   ucs_memory_type_names[src_type],
@@ -136,7 +165,7 @@ uct_cuda_copy_post_cuda_async_copy(uct_ep_h tl_ep, void *dst, void *src,
         return UCS_ERR_IO_ERROR;
     }
 
-    cuda_event = ucs_mpool_get(&iface->cuda_event_desc);
+    cuda_event = ucs_mpool_get(&ctx_rsc->cuda_event_desc);
     if (ucs_unlikely(cuda_event == NULL)) {
         ucs_error("Failed to allocate cuda event object");
         return UCS_ERR_NO_MEMORY;
@@ -215,18 +244,18 @@ UCS_PROFILE_FUNC(ucs_status_t, uct_cuda_copy_ep_put_short,
                  uint64_t remote_addr, uct_rkey_t rkey)
 {
     uct_cuda_copy_iface_t *iface = ucs_derived_of(tl_ep->iface,
                                                   uct_cuda_copy_iface_t);
-    CUstream *stream = &iface->short_stream;
     ucs_status_t status;
+    uct_cuda_copy_per_ctx_rsc_t *ctx_rsc;
 
-    status = uct_cuda_copy_init_stream(stream);
+    status = uct_cuda_copy_get_short_stream(iface, &ctx_rsc);
     if (status != UCS_OK) {
         return status;
     }
 
     UCT_CUDADRV_FUNC_LOG_ERR(cuMemcpyAsync((CUdeviceptr)remote_addr,
                                            (CUdeviceptr)buffer, length,
-                                           *stream));
-    status = UCT_CUDADRV_FUNC_LOG_ERR(cuStreamSynchronize(*stream));
+                                           ctx_rsc->short_stream));
+    status = UCT_CUDADRV_FUNC_LOG_ERR(cuStreamSynchronize(ctx_rsc->short_stream));
 
     UCT_TL_EP_STAT_OP(ucs_derived_of(tl_ep, uct_base_ep_t), PUT, SHORT, length);
     ucs_trace_data("PUT_SHORT size %d from %p to %p",
@@ -240,18 +269,18 @@ UCS_PROFILE_FUNC(ucs_status_t, uct_cuda_copy_ep_get_short,
                  uint64_t remote_addr, uct_rkey_t rkey)
 {
     uct_cuda_copy_iface_t *iface = ucs_derived_of(tl_ep->iface,
                                                   uct_cuda_copy_iface_t);
-    CUstream *stream = &iface->short_stream;
     ucs_status_t status;
+    uct_cuda_copy_per_ctx_rsc_t *ctx_rsc;
 
-    status = uct_cuda_copy_init_stream(stream);
+    status = uct_cuda_copy_get_short_stream(iface, &ctx_rsc);
     if (status != UCS_OK) {
         return status;
     }
 
     UCT_CUDADRV_FUNC_LOG_ERR(cuMemcpyAsync((CUdeviceptr)buffer,
                                            (CUdeviceptr)remote_addr, length,
-                                           *stream));
-    status = UCT_CUDADRV_FUNC_LOG_ERR(cuStreamSynchronize(*stream));
+                                           ctx_rsc->short_stream));
+    status = UCT_CUDADRV_FUNC_LOG_ERR(cuStreamSynchronize(ctx_rsc->short_stream));
 
     UCT_TL_EP_STAT_OP(ucs_derived_of(tl_ep, uct_base_ep_t), GET, SHORT, length);
     ucs_trace_data("GET_SHORT size %d from %p to %p",
diff --git a/src/uct/cuda/cuda_copy/cuda_copy_iface.c b/src/uct/cuda/cuda_copy/cuda_copy_iface.c
index e9c70fb35f0..7b98d2a6fe9 100644
--- a/src/uct/cuda/cuda_copy/cuda_copy_iface.c
+++ b/src/uct/cuda/cuda_copy/cuda_copy_iface.c
@@ -131,33 +131,6 @@ static ucs_status_t uct_cuda_copy_iface_query(uct_iface_h tl_iface,
     return UCS_OK;
 }
 
-static ucs_status_t uct_cuda_copy_sync_streams(uct_cuda_copy_iface_t *iface)
-{
-    CUstream stream;
-    uint32_t stream_index;
-    ucs_memory_type_t src_mem_type, dst_mem_type;
-    ucs_status_t status;
-
-    UCS_STATIC_BITMAP_FOR_EACH_BIT(stream_index, &iface->streams_to_sync) {
-        src_mem_type = stream_index / UCS_MEMORY_TYPE_LAST;
-        if ((src_mem_type >= UCS_MEMORY_TYPE_LAST)) {
-            break;
-        }
-
-        dst_mem_type = stream_index % UCS_MEMORY_TYPE_LAST;
-        stream       = iface->queue_desc[src_mem_type][dst_mem_type].stream;
-        status       = UCT_CUDADRV_FUNC_LOG_ERR(cuStreamSynchronize(stream));
-        if (status != UCS_OK) {
-            return status;
-        }
-
-        UCS_STATIC_BITMAP_RESET(&iface->streams_to_sync,
-                                uct_cuda_copy_flush_bitmap_idx(src_mem_type,
-                                                               dst_mem_type));
-    }
-
-    return UCS_OK;
-}
-
 static ucs_status_t uct_cuda_copy_iface_flush(uct_iface_h tl_iface, unsigned flags,
                                               uct_completion_t *comp)
@@ -165,16 +138,11 @@ static ucs_status_t uct_cuda_copy_iface_flush(uct_iface_h tl_iface, unsigned flags,
     uct_cuda_copy_iface_t *iface = ucs_derived_of(tl_iface,
                                                   uct_cuda_copy_iface_t);
     uct_cuda_copy_queue_desc_t *q_desc;
     ucs_queue_iter_t iter;
-    ucs_status_t status;
-
+
     if (comp != NULL) {
         return UCS_ERR_UNSUPPORTED;
     }
-
-    status = uct_cuda_copy_sync_streams(iface);
-    if (status != UCS_OK) {
-        return status;
-    }
+
     ucs_queue_for_each_safe(q_desc, iter, &iface->active_queue, queue) {
         if (!ucs_queue_is_empty(&q_desc->event_queue)) {
@@ -306,15 +274,6 @@ static ucs_status_t uct_cuda_copy_iface_event_fd_arm(uct_iface_h tl_iface,
 
 static ucs_status_t uct_cuda_copy_ep_flush(uct_ep_h tl_ep, unsigned flags,
                                            uct_completion_t *comp)
 {
-    uct_cuda_copy_iface_t *iface = ucs_derived_of(tl_ep->iface,
-                                                  uct_cuda_copy_iface_t);
-    ucs_status_t status;
-
-    status = uct_cuda_copy_sync_streams(iface);
-    if (status != UCS_OK) {
-        return status;
-    }
-
     return uct_base_ep_flush(tl_ep, flags, comp);
 }
 
@@ -356,20 +315,6 @@ static void uct_cuda_copy_event_desc_init(ucs_mpool_t *mp, void *obj, void *chunk)
     }
 }
 
-static void uct_cuda_copy_event_desc_cleanup(ucs_mpool_t *mp, void *obj)
-{
-    uct_cuda_copy_event_desc_t *base = (uct_cuda_copy_event_desc_t *) obj;
-    uct_cuda_copy_iface_t *iface     = ucs_container_of(mp,
-                                                        uct_cuda_copy_iface_t,
-                                                        cuda_event_desc);
-    CUcontext cuda_context;
-
-    UCT_CUDADRV_FUNC_LOG_ERR(cuCtxGetCurrent(&cuda_context));
-    if (uct_cuda_base_context_match(cuda_context, iface->cuda_context)) {
-        UCT_CUDADRV_FUNC_LOG_ERR(cuEventDestroy(base->event));
-    }
-}
-
 static ucs_status_t
 uct_cuda_copy_estimate_perf(uct_iface_h tl_iface, uct_perf_attr_t *perf_attr)
 {
@@ -441,7 +386,7 @@ static ucs_mpool_ops_t uct_cuda_copy_event_desc_mpool_ops = {
     .chunk_alloc   = ucs_mpool_chunk_malloc,
     .chunk_release = ucs_mpool_chunk_free,
     .obj_init      = uct_cuda_copy_event_desc_init,
-    .obj_cleanup   = uct_cuda_copy_event_desc_cleanup,
+    .obj_cleanup   = ucs_empty_function,
     .obj_str       = NULL
 };
 
@@ -455,6 +400,94 @@ static uct_iface_internal_ops_t uct_cuda_copy_iface_internal_ops = {
     .ep_is_connected = uct_base_ep_is_connected
 };
 
+static ucs_status_t
+uct_cuda_copy_init_per_ctx_rscs(const uct_cuda_copy_iface_t *iface,
+                                CUcontext cuda_ctx,
+                                unsigned long long ctx_id,
+                                uct_cuda_copy_per_ctx_rsc_t *ctx_rsc)
+{
+    ucs_status_t status;
+    ucs_mpool_params_t mp_params;
+    ucs_memory_type_t src, dst;
+
+    ucs_mpool_params_reset(&mp_params);
+    mp_params.elem_size       = sizeof(uct_cuda_copy_event_desc_t);
+    mp_params.elems_per_chunk = 128;
+    mp_params.priv_size       = sizeof(CUcontext);
+    mp_params.max_elems       = iface->config.max_cuda_events;
+    mp_params.ops             = &uct_cuda_copy_event_desc_mpool_ops;
+    mp_params.name            = "CUDA EVENT objects";
+    status = ucs_mpool_init(&mp_params, &ctx_rsc->cuda_event_desc);
+    if (UCS_OK != status) {
+        ucs_error("mpool creation failed");
+        return UCS_ERR_IO_ERROR;
+    }
+
+    ucs_memory_type_for_each(src) {
+        ucs_memory_type_for_each(dst) {
+            ctx_rsc->queue_desc[src][dst].stream = 0;
+            ucs_queue_head_init(&ctx_rsc->queue_desc[src][dst].event_queue);
+        }
+    }
+
+    ctx_rsc->short_stream = 0;
+    ctx_rsc->cuda_ctx     = cuda_ctx;
+    ctx_rsc->ctx_id       = ctx_id;
+
+    return UCS_OK;
+}
+
+ucs_status_t
+uct_cuda_copy_get_per_ctx_rscs(uct_cuda_copy_iface_t *iface, CUcontext cuda_ctx,
+                               uct_cuda_copy_per_ctx_rsc_t **ctx_rsc_p)
+{
+    uct_cuda_copy_per_ctx_rsc_t *ctx_rsc;
+    ucs_status_t status;
+    khiter_t iter;
+    int ret;
+    unsigned long long ctx_id;
+
+    status = UCT_CUDADRV_FUNC_LOG_ERR(cuCtxGetId(cuda_ctx, &ctx_id));
+    if (status != UCS_OK) {
+        goto err;
+    }
+
+    iter = kh_put(cuda_copy_ctx_rscs, &iface->ctx_rscs, ctx_id, &ret);
+    if (ret == UCS_KH_PUT_FAILED) {
+        ucs_error("cannot allocate hash entry");
+        status = UCS_ERR_NO_MEMORY;
+        goto err;
+    }
+
+    if (ret == UCS_KH_PUT_KEY_PRESENT) {
+        ctx_rsc = kh_value(&iface->ctx_rscs, iter);
+    } else {
+        ctx_rsc = ucs_malloc(sizeof(*ctx_rsc), "cuda_copy_per_ctx_rsc");
+        if (ctx_rsc == NULL) {
+            ucs_error("failed to allocate per context resource struct");
+            status = UCS_ERR_NO_MEMORY;
+            goto err_kh_del;
+        }
+
+        status = uct_cuda_copy_init_per_ctx_rscs(iface, cuda_ctx, ctx_id, ctx_rsc);
+        if (status != UCS_OK) {
+            goto err_free_ctx;
+        }
+
+        kh_value(&iface->ctx_rscs, iter) = ctx_rsc;
+    }
+
+    *ctx_rsc_p = ctx_rsc;
+    return UCS_OK;
+
+err_free_ctx:
+    ucs_free(ctx_rsc);
+err_kh_del:
+    kh_del(cuda_copy_ctx_rscs, &iface->ctx_rscs, iter);
+err:
+    return status;
+}
+
 static UCS_CLASS_INIT_FUNC(uct_cuda_copy_iface_t, uct_md_h md, uct_worker_h worker,
                            const uct_iface_params_t *params,
                            const uct_iface_config_t *tl_config)
@@ -462,8 +495,6 @@ static UCS_CLASS_INIT_FUNC(uct_cuda_copy_iface_t, uct_md_h md, uct_worker_h worker,
     uct_cuda_copy_iface_config_t *config = ucs_derived_of(tl_config,
                                                           uct_cuda_copy_iface_config_t);
     ucs_status_t status;
-    ucs_memory_type_t src, dst;
-    ucs_mpool_params_t mp_params;
 
     UCS_CLASS_CALL_SUPER_INIT(uct_cuda_iface_t, &uct_cuda_copy_iface_ops,
                               &uct_cuda_copy_iface_internal_ops, md, worker,
@@ -480,69 +511,27 @@ static UCS_CLASS_INIT_FUNC(uct_cuda_copy_iface_t, uct_md_h md, uct_worker_h worker,
     self->config.bandwidth       = config->bandwidth;
 
     UCS_STATIC_BITMAP_RESET_ALL(&self->streams_to_sync);
-    ucs_mpool_params_reset(&mp_params);
-    mp_params.elem_size       = sizeof(uct_cuda_copy_event_desc_t);
-    mp_params.elems_per_chunk = 128;
-    mp_params.max_elems       = self->config.max_cuda_events;
-    mp_params.ops             = &uct_cuda_copy_event_desc_mpool_ops;
-    mp_params.name            = "CUDA EVENT objects";
-    status = ucs_mpool_init(&mp_params, &self->cuda_event_desc);
-    if (UCS_OK != status) {
-        ucs_error("mpool creation failed");
-        return UCS_ERR_IO_ERROR;
-    }
-
-    ucs_queue_head_init(&self->active_queue);
-    ucs_memory_type_for_each(src) {
-        ucs_memory_type_for_each(dst) {
-            self->queue_desc[src][dst].stream = 0;
-            ucs_queue_head_init(&self->queue_desc[src][dst].event_queue);
-        }
-    }
+    kh_init_inplace(cuda_copy_ctx_rscs, &self->ctx_rscs);
 
-    self->short_stream = 0;
-    self->cuda_context = 0;
+    ucs_queue_head_init(&self->active_queue);
 
     return UCS_OK;
 }
 
 static UCS_CLASS_CLEANUP_FUNC(uct_cuda_copy_iface_t)
 {
-    CUstream *stream;
-    CUcontext cuda_context;
-    ucs_queue_head_t *event_q;
-    ucs_memory_type_t src, dst;
+    uct_cuda_copy_per_ctx_rsc_t *ctx_rsc;
 
     uct_base_iface_progress_disable(&self->super.super.super,
                                     UCT_PROGRESS_SEND | UCT_PROGRESS_RECV);
 
-    UCT_CUDADRV_FUNC_LOG_ERR(cuCtxGetCurrent(&cuda_context));
-    if (uct_cuda_base_context_match(cuda_context, self->cuda_context)) {
-
-        ucs_memory_type_for_each(src) {
-            ucs_memory_type_for_each(dst) {
-                stream  = &self->queue_desc[src][dst].stream;
-                event_q = &self->queue_desc[src][dst].event_queue;
-
-                if (!ucs_queue_is_empty(event_q)) {
-                    ucs_warn("stream destroyed but queue not empty");
-                }
-
-                if (*stream == 0) {
-                    continue;
-                }
-
-                UCT_CUDADRV_FUNC_LOG_ERR(cuStreamDestroy(*stream));
-            }
-        }
-
-        if (self->short_stream) {
-            UCT_CUDADRV_FUNC_LOG_ERR(cuStreamDestroy(self->short_stream));
-        }
-    }
+    kh_foreach_value(&self->ctx_rscs, ctx_rsc, {
+        ucs_mpool_cleanup(&ctx_rsc->cuda_event_desc, 1);
+        ucs_free(ctx_rsc);
+    });
 
-    ucs_mpool_cleanup(&self->cuda_event_desc, 1);
+    kh_destroy_inplace(cuda_copy_ctx_rscs, &self->ctx_rscs);
 }
 
 UCS_CLASS_DEFINE(uct_cuda_copy_iface_t, uct_cuda_iface_t);
diff --git a/src/uct/cuda/cuda_copy/cuda_copy_iface.h b/src/uct/cuda/cuda_copy/cuda_copy_iface.h
index 700345d22a2..30b4085742e 100644
--- a/src/uct/cuda/cuda_copy/cuda_copy_iface.h
+++ b/src/uct/cuda/cuda_copy/cuda_copy_iface.h
@@ -9,6 +9,7 @@
 
 #include <uct/base/uct_iface.h>
 #include <uct/cuda/base/cuda_iface.h>
+#include <ucs/datastruct/khash.h>
 #include <ucs/datastruct/queue.h>
 #include <ucs/memory/memory_type.h>
 
@@ -48,23 +49,30 @@ typedef struct uct_cuda_copy_queue_desc {
     ucs_queue_elem_t           queue;
 } uct_cuda_copy_queue_desc_t;
 
+typedef struct uct_cuda_copy_per_ctx_rsc {
+    CUcontext                  cuda_ctx;
+    unsigned long long         ctx_id;
+    /* pool of cuda events to check completion of memcpy operations */
+    ucs_mpool_t                cuda_event_desc;
+    /* stream used to issue short operations */
+    CUstream                   short_stream;
+    /* array of queue descriptors for each src/dst memory type combination */
+    uct_cuda_copy_queue_desc_t queue_desc[UCS_MEMORY_TYPE_LAST][UCS_MEMORY_TYPE_LAST];
+} uct_cuda_copy_per_ctx_rsc_t;
+
+
+KHASH_MAP_INIT_INT64(cuda_copy_ctx_rscs, struct uct_cuda_copy_per_ctx_rsc*);
 
 typedef struct uct_cuda_copy_iface {
     uct_cuda_iface_t            super;
     /* used to store uuid and check iface reachability */
     uct_cuda_copy_iface_addr_t  id;
-    /* pool of cuda events to check completion of memcpy operations */
-    ucs_mpool_t                 cuda_event_desc;
+    /* per context resources */
+    khash_t(cuda_copy_ctx_rscs) ctx_rscs;
    /* list of queues which require progress */
     ucs_queue_head_t            active_queue;
-    /* stream used to issue short operations */
-    CUstream                    short_stream;
     /* fd to get event notifications */
     int                         eventfd;
-    /* stream used to issue short operations */
-    CUcontext                   cuda_context;
-    /* array of queue descriptors for each src/dst memory type combination */
-    uct_cuda_copy_queue_desc_t  queue_desc[UCS_MEMORY_TYPE_LAST][UCS_MEMORY_TYPE_LAST];
     /* config parameters to control cuda copy transport */
     struct {
         unsigned                max_poll;
@@ -105,4 +113,9 @@ uct_cuda_copy_flush_bitmap_idx(ucs_memory_type_t src_mem_type,
     return (src_mem_type * UCS_MEMORY_TYPE_LAST) + dst_mem_type;
 }
 
+
+ucs_status_t uct_cuda_copy_get_per_ctx_rscs(uct_cuda_copy_iface_t *iface,
+                                            CUcontext cuda_ctx,
+                                            uct_cuda_copy_per_ctx_rsc_t **ctx_rsc_p);
+
 #endif
diff --git a/src/uct/cuda/cuda_copy/cuda_copy_md.c b/src/uct/cuda/cuda_copy/cuda_copy_md.c
index be600db31a7..530377f3d44 100644
--- a/src/uct/cuda/cuda_copy/cuda_copy_md.c
+++ b/src/uct/cuda/cuda_copy/cuda_copy_md.c
@@ -299,6 +299,7 @@ uct_cuda_copy_md_query_attributes(uct_cuda_copy_md_t *md, const void *address,
     uint32_t is_managed         = 0;
     CUdevice cuda_device        = -1;
     CUcontext cuda_mem_ctx      = NULL;
+    CUcontext cuda_popped_ctx;
     CUpointer_attribute attr_type[UCT_CUDA_MEM_QUERY_NUM_ATTRS];
     void *attr_data[UCT_CUDA_MEM_QUERY_NUM_ATTRS];
     CUdeviceptr base_address;
@@ -373,8 +374,14 @@ uct_cuda_copy_md_query_attributes(uct_cuda_copy_md_t *md, const void *address,
         goto out_default_range;
     }
 
+    /* GetAddressRange requires a context to be set. On DGX A100 it takes
+     * ~0.03 us to push and pop the context associated with the address
+     * (which should be non-NULL if we reach this point) */
+    cuCtxPushCurrent(cuda_mem_ctx);
+
     cu_err = cuMemGetAddressRange(&base_address, &alloc_length,
                                   (CUdeviceptr)address);
+    cuCtxPopCurrent(&cuda_popped_ctx);
     if (cu_err != CUDA_SUCCESS) {
         ucs_error("cuMemGetAddressRange(%p) error: %s", address,
                   uct_cuda_base_cu_get_error_string(cu_err));
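
For readers outside the UCX tree, the following minimal sketch shows the lookup pattern this patch introduces: resources are cached per CUDA context in a khash map keyed by the ID returned from cuCtxGetId() (CUDA driver API 12.0 or newer), and are created lazily the first time a thread enters with a given context bound. It is an approximation, not UCX code: it uses klib's raw khash.h return codes instead of the UCS_KH_PUT_* wrappers, and the names per_ctx_rsc_t and get_ctx_rsc are illustrative only.

#include <stdlib.h>
#include <cuda.h>
#include "khash.h" /* klib single-header hash table */

typedef struct per_ctx_rsc {
    CUcontext ctx;    /* owning context, kept for sanity checks */
    CUstream  stream; /* created lazily, like short_stream above */
} per_ctx_rsc_t;

KHASH_MAP_INIT_INT64(ctx_map, per_ctx_rsc_t *)

/* Return the resources of the calling thread's current CUDA context,
 * creating them on first use; NULL on failure or if no context is bound. */
static per_ctx_rsc_t *get_ctx_rsc(khash_t(ctx_map) *map)
{
    CUcontext cur;
    unsigned long long ctx_id;
    per_ctx_rsc_t *rsc;
    khiter_t it;
    int ret;

    if ((cuCtxGetCurrent(&cur) != CUDA_SUCCESS) || (cur == NULL)) {
        return NULL; /* no context bound to the calling thread */
    }

    if (cuCtxGetId(cur, &ctx_id) != CUDA_SUCCESS) {
        return NULL; /* cuCtxGetId requires CUDA >= 12.0 */
    }

    it = kh_put(ctx_map, map, ctx_id, &ret);
    if (ret < 0) {
        return NULL;              /* hash insertion failed */
    } else if (ret == 0) {
        return kh_value(map, it); /* key present: resources already exist */
    }

    /* first use of this context: allocate and populate the entry */
    rsc = calloc(1, sizeof(*rsc));
    if ((rsc == NULL) ||
        (cuStreamCreate(&rsc->stream, CU_STREAM_NON_BLOCKING) != CUDA_SUCCESS)) {
        free(rsc);
        kh_del(ctx_map, map, it);
        return NULL;
    }

    rsc->ctx          = cur;
    kh_value(map, it) = rsc;
    return rsc;
}

Keying the map on the context ID rather than on the CUcontext handle avoids aliasing if a destroyed context's handle value is later reused, and it lets interface cleanup simply walk the map (as the kh_foreach_value hunk above does) instead of guessing which context is current at destroy time.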