From b44729cc73638a8109bf6183a6afb0bbe1fe02d2 Mon Sep 17 00:00:00 2001
From: luzhexuan <635426116@qq.com>
Date: Thu, 23 Jan 2025 16:28:47 +0800
Subject: [PATCH] [huawei] add osc kupl module

---
 config/ompi_check_kupl.m4                   |  34 ++
 ompi/mca/osc/kupl/Makefile.am               |  44 ++
 ompi/mca/osc/kupl/configure.m4              |  25 +
 ompi/mca/osc/kupl/osc_kupl.h                | 257 +++++++++
 ompi/mca/osc/kupl/osc_kupl_active_target.c  | 309 +++++++++++
 ompi/mca/osc/kupl/osc_kupl_comm.c           | 441 +++++++++++++++
 ompi/mca/osc/kupl/osc_kupl_component.c      | 571 ++++++++++++++++++++
 ompi/mca/osc/kupl/osc_kupl_passive_target.c | 269 +++++++++
 8 files changed, 1950 insertions(+)
 create mode 100644 config/ompi_check_kupl.m4
 create mode 100644 ompi/mca/osc/kupl/Makefile.am
 create mode 100644 ompi/mca/osc/kupl/configure.m4
 create mode 100644 ompi/mca/osc/kupl/osc_kupl.h
 create mode 100644 ompi/mca/osc/kupl/osc_kupl_active_target.c
 create mode 100644 ompi/mca/osc/kupl/osc_kupl_comm.c
 create mode 100644 ompi/mca/osc/kupl/osc_kupl_component.c
 create mode 100644 ompi/mca/osc/kupl/osc_kupl_passive_target.c

diff --git a/config/ompi_check_kupl.m4 b/config/ompi_check_kupl.m4
new file mode 100644
index 00000000000..a9360ce7a4c
--- /dev/null
+++ b/config/ompi_check_kupl.m4
@@ -0,0 +1,34 @@
+# OPAL_CHECK_KUPL(prefix, [action-if-found], [action-if-not-found])
+# --------------------------------------------------------
+# check if KUPL support can be found. sets prefix_{CPPFLAGS,
+# LDFLAGS, LIBS} as needed and runs action-if-found if there is
+# support, otherwise executes action-if-not-found
+AC_DEFUN([OPAL_CHECK_KUPL],[
+    OPAL_VAR_SCOPE_PUSH([ompi_check_kupl_happy])
+
+    AC_ARG_WITH([kupl],
+                [AS_HELP_STRING([--with-kupl(=DIR)],
+                [Build kupl support, optionally adding DIR/include, DIR/lib, and DIR/lib64 to the search path for headers and libraries])])
+    AC_ARG_WITH([kupl-libdir],
+                [AS_HELP_STRING([--with-kupl-libdir=DIR],
+                [Search for kupl libraries in DIR])])
+
+    OAC_CHECK_PACKAGE([kupl],
+                      [$1],
+                      [kupl.h],
+                      [-lkupl -lstdc++],
+                      [kupl_get_num_executors],
+                      [ompi_check_kupl_happy="yes"],
+                      [ompi_check_kupl_happy="no"])
+
+    OPAL_SUMMARY_ADD([Transports], [KUPL], [], [${$1_SUMMARY}])
+
+    AS_IF([test "$ompi_check_kupl_happy" = "yes"],
+          [$2],
+          [AS_IF([test ! -z "$with_kupl" && test "$with_kupl" != "no"],
+                 [AC_MSG_ERROR([kupl support requested but not found. Aborting])])
+           $3])
+
+    OPAL_VAR_SCOPE_POP
+])dnl
+
diff --git a/ompi/mca/osc/kupl/Makefile.am b/ompi/mca/osc/kupl/Makefile.am
new file mode 100644
index 00000000000..18ada9bc89c
--- /dev/null
+++ b/ompi/mca/osc/kupl/Makefile.am
@@ -0,0 +1,44 @@
+#
+# Copyright (c) 2011 Sandia National Laboratories. All rights reserved.
+# Copyright (c) 2017 IBM Corporation. All rights reserved.
+# $COPYRIGHT$
+#
+# Additional copyrights may follow
+#
+# $HEADER$
+#
+
+EXTRA_DIST =
+
+kupl_sources = \
+        osc_kupl.h \
+        osc_kupl_comm.c \
+        osc_kupl_component.c \
+        osc_kupl_active_target.c \
+        osc_kupl_passive_target.c
+
+AM_CPPFLAGS = $(osc_kupl_CPPFLAGS)
+
+# Make the output library in this directory, and name it either
+# mca_<type>_<name>.la (for DSO builds) or libmca_<type>_<name>.la
+# (for static builds).
+
+if MCA_BUILD_ompi_osc_kupl_DSO
+component_noinst =
+component_install = mca_osc_kupl.la
+else
+component_noinst = libmca_osc_kupl.la
+component_install =
+endif
+
+mcacomponentdir = $(pkglibdir)
+mcacomponent_LTLIBRARIES = $(component_install)
+mca_osc_kupl_la_SOURCES = $(kupl_sources)
+mca_osc_kupl_la_LIBADD = $(top_builddir)/ompi/lib@OMPI_LIBMPI_NAME@.la \
+        $(osc_kupl_LIBS)
+mca_osc_kupl_la_LDFLAGS = -module -avoid-version $(osc_kupl_LDFLAGS)
+
+noinst_LTLIBRARIES = $(component_noinst)
+libmca_osc_kupl_la_SOURCES = $(kupl_sources)
+libmca_osc_kupl_la_LIBADD = $(osc_kupl_LIBS)
+libmca_osc_kupl_la_LDFLAGS = -module -avoid-version $(osc_kupl_LDFLAGS)
diff --git a/ompi/mca/osc/kupl/configure.m4 b/ompi/mca/osc/kupl/configure.m4
new file mode 100644
index 00000000000..e22b8662a9f
--- /dev/null
+++ b/ompi/mca/osc/kupl/configure.m4
@@ -0,0 +1,25 @@
+# MCA_ompi_osc_kupl_CONFIG(action-if-can-compile,
+#                          [action-if-cant-compile])
+# ------------------------------------------------
+AC_DEFUN([MCA_ompi_osc_kupl_CONFIG],[
+    AC_CONFIG_FILES([ompi/mca/osc/kupl/Makefile])
+
+    OPAL_CHECK_KUPL([osc_kupl],
+                    [osc_kupl_happy="yes"],
+                    [osc_kupl_happy="no"])
+
+    AS_IF([test "$osc_kupl_happy" = "yes"],
+          [osc_kupl_WRAPPER_EXTRA_LDFLAGS="$osc_kupl_LDFLAGS"
+           osc_kupl_WRAPPER_EXTRA_LIBS="$osc_kupl_LIBS"
+           $1],
+          [$2])
+
+    # need to propagate CPPFLAGS to all of OMPI
+    AS_IF([test "$DIRECT_osc" = "kupl"],
+          [CPPFLAGS="$CPPFLAGS $osc_kupl_CPPFLAGS"])
+
+    # substitute in the things needed to build kupl
+    AC_SUBST([osc_kupl_CPPFLAGS])
+    AC_SUBST([osc_kupl_LDFLAGS])
+    AC_SUBST([osc_kupl_LIBS])
+])dnl
diff --git a/ompi/mca/osc/kupl/osc_kupl.h b/ompi/mca/osc/kupl/osc_kupl.h
new file mode 100644
index 00000000000..d990d292a56
--- /dev/null
+++ b/ompi/mca/osc/kupl/osc_kupl.h
@@ -0,0 +1,257 @@
+/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
+/*
+ * Copyright (c) 2012 Sandia National Laboratories. All rights reserved.
+ * Copyright (c) 2014-2015 Los Alamos National Security, LLC. All rights
+ *                         reserved.
+ * Copyright (c) 2015-2017 Research Organization for Information Science
+ *                         and Technology (RIST). All rights reserved.
+ * Copyright (c) 2016-2017 IBM Corporation. All rights reserved.
+ * $COPYRIGHT$
+ *
+ * Additional copyrights may follow
+ *
+ * $HEADER$
+ */
+
+#ifndef OSC_KUPL_KUPL_H
+#define OSC_KUPL_KUPL_H
+
+#include "opal/mca/shmem/base/base.h"
+#include <kupl.h>
+
+typedef uint64_t osc_kupl_post_type_t;
+typedef opal_atomic_uint64_t osc_kupl_post_atomic_type_t;
+#define OSC_KUPL_POST_BITS 6
+#define OSC_KUPL_POST_MASK 0x3f
+
+/* data shared across all peers */
+struct ompi_osc_kupl_global_state_t {
+    int use_barrier_for_fence;
+
+    pthread_mutex_t mtx;
+    pthread_cond_t cond;
+
+    int sense;
+    int32_t count;
+};
+typedef struct ompi_osc_kupl_global_state_t ompi_osc_kupl_global_state_t;
+
+/* this is data exposed to remote nodes */
+struct ompi_osc_kupl_lock_t {
+    uint32_t counter;
+    uint32_t write;
+    uint32_t read;
+};
+typedef struct ompi_osc_kupl_lock_t ompi_osc_kupl_lock_t;
+
+struct ompi_osc_kupl_node_state_t {
+    opal_atomic_int32_t complete_count;
+    ompi_osc_kupl_lock_t lock;
+    opal_atomic_lock_t accumulate_lock;
+};
+typedef struct ompi_osc_kupl_node_state_t ompi_osc_kupl_node_state_t;
+
+struct ompi_osc_kupl_component_t {
+    ompi_osc_base_component_t super;
+
+    /** Priority of the osc/kupl component */
+    unsigned int priority;
+
+    char *backing_directory;
+};
+typedef struct ompi_osc_kupl_component_t ompi_osc_kupl_component_t;
+OMPI_DECLSPEC extern ompi_osc_kupl_component_t mca_osc_kupl_component;
+
+enum ompi_osc_kupl_locktype_t {
+    lock_none = 0,
+    lock_nocheck,
+    lock_exclusive,
+    lock_shared
+};
+
+struct ompi_osc_kupl_module_t {
+    ompi_osc_base_module_t super;
+    struct ompi_communicator_t *comm;
+    kupl_shm_comm_h kupl_comm;
+    kupl_shm_win_h kupl_win;
+
+    int flavor;
+    opal_shmem_ds_t seg_ds;
+    void *segment_base;
+    bool noncontig;
+
+    size_t *sizes;
+    void **bases;
+    int *disp_units;
+
+    ompi_group_t *start_group;
+    ompi_group_t *post_group;
+
+    int my_sense;
+
+    enum ompi_osc_kupl_locktype_t *outstanding_locks;
+
+    /* exposed data */
+    ompi_osc_kupl_global_state_t *global_state;
+    ompi_osc_kupl_node_state_t *my_node_state;
+    ompi_osc_kupl_node_state_t *node_states;
+
+    osc_kupl_post_atomic_type_t **posts;
+
+    opal_mutex_t lock;
+};
+typedef struct ompi_osc_kupl_module_t ompi_osc_kupl_module_t;
+
+int ompi_osc_kupl_shared_query(struct ompi_win_t *win, int rank, size_t *size, int *disp_unit, void *baseptr);
+
+int ompi_osc_kupl_attach(struct ompi_win_t *win, void *base, size_t len);
+int ompi_osc_kupl_detach(struct ompi_win_t *win, const void *base);
+
+int ompi_osc_kupl_free(struct ompi_win_t *win);
+
+int ompi_osc_kupl_put(const void *origin_addr,
+                      int origin_count,
+                      struct ompi_datatype_t *origin_dt,
+                      int target,
+                      ptrdiff_t target_disp,
+                      int target_count,
+                      struct ompi_datatype_t *target_dt,
+                      struct ompi_win_t *win);
+
+int ompi_osc_kupl_get(void *origin_addr,
+                      int origin_count,
+                      struct ompi_datatype_t *origin_dt,
+                      int target,
+                      ptrdiff_t target_disp,
+                      int target_count,
+                      struct ompi_datatype_t *target_dt,
+                      struct ompi_win_t *win);
+
+int ompi_osc_kupl_accumulate(const void *origin_addr,
+                             int origin_count,
+                             struct ompi_datatype_t *origin_dt,
+                             int target,
+                             ptrdiff_t target_disp,
+                             int target_count,
+                             struct ompi_datatype_t *target_dt,
+                             struct ompi_op_t *op,
+                             struct ompi_win_t *win);
+
+int ompi_osc_kupl_compare_and_swap(const void *origin_addr,
+                                   const void *compare_addr,
+                                   void *result_addr,
+                                   struct ompi_datatype_t *dt,
+                                   int target,
+                                   ptrdiff_t target_disp,
+                                   struct ompi_win_t *win);
+
+int ompi_osc_kupl_fetch_and_op(const void *origin_addr,
+                               void *result_addr,
+                               struct ompi_datatype_t *dt,
+                               int target,
+                               ptrdiff_t target_disp,
+                               struct
ompi_op_t *op, + struct ompi_win_t *win); + +int ompi_osc_kupl_get_accumulate(const void *origin_addr, + int origin_count, + struct ompi_datatype_t *origin_datatype, + void *result_addr, + int result_count, + struct ompi_datatype_t *result_datatype, + int target_rank, + MPI_Aint target_disp, + int target_count, + struct ompi_datatype_t *target_datatype, + struct ompi_op_t *op, + struct ompi_win_t *win); + +int ompi_osc_kupl_rput(const void *origin_addr, + int origin_count, + struct ompi_datatype_t *origin_dt, + int target, + ptrdiff_t target_disp, + int target_count, + struct ompi_datatype_t *target_dt, + struct ompi_win_t *win, + struct ompi_request_t **request); + +int ompi_osc_kupl_rget(void *origin_addr, + int origin_count, + struct ompi_datatype_t *origin_dt, + int target, + ptrdiff_t target_disp, + int target_count, + struct ompi_datatype_t *target_dt, + struct ompi_win_t *win, + struct ompi_request_t **request); + +int ompi_osc_kupl_raccumulate(const void *origin_addr, + int origin_count, + struct ompi_datatype_t *origin_dt, + int target, + ptrdiff_t target_disp, + int target_count, + struct ompi_datatype_t *target_dt, + struct ompi_op_t *op, + struct ompi_win_t *win, + struct ompi_request_t **request); + +int ompi_osc_kupl_rget_accumulate(const void *origin_addr, + int origin_count, + struct ompi_datatype_t *origin_datatype, + void *result_addr, + int result_count, + struct ompi_datatype_t *result_datatype, + int target_rank, + MPI_Aint target_disp, + int target_count, + struct ompi_datatype_t *target_datatype, + struct ompi_op_t *op, + struct ompi_win_t *win, + struct ompi_request_t **request); + +int ompi_osc_kupl_fence(int mpi_assert, struct ompi_win_t *win); + +int ompi_osc_kupl_start(struct ompi_group_t *group, + int mpi_assert, + struct ompi_win_t *win); + +int ompi_osc_kupl_complete(struct ompi_win_t *win); + +int ompi_osc_kupl_post(struct ompi_group_t *group, + int mpi_assert, + struct ompi_win_t *win); + +int ompi_osc_kupl_wait(struct ompi_win_t *win); + +int ompi_osc_kupl_test(struct ompi_win_t *win, + int *flag); + +int ompi_osc_kupl_lock(int lock_type, + int target, + int mpi_assert, + struct ompi_win_t *win); + +int ompi_osc_kupl_unlock(int target, + struct ompi_win_t *win); + + +int ompi_osc_kupl_lock_all(int mpi_assert, + struct ompi_win_t *win); + +int ompi_osc_kupl_unlock_all(struct ompi_win_t *win); + +int ompi_osc_kupl_sync(struct ompi_win_t *win); + +int ompi_osc_kupl_flush(int target, + struct ompi_win_t *win); +int ompi_osc_kupl_flush_all(struct ompi_win_t *win); +int ompi_osc_kupl_flush_local(int target, + struct ompi_win_t *win); +int ompi_osc_kupl_flush_local_all(struct ompi_win_t *win); + +int ompi_osc_kupl_set_info(struct ompi_win_t *win, struct opal_info_t *info); +int ompi_osc_kupl_get_info(struct ompi_win_t *win, struct opal_info_t **info_used); + +#endif diff --git a/ompi/mca/osc/kupl/osc_kupl_active_target.c b/ompi/mca/osc/kupl/osc_kupl_active_target.c new file mode 100644 index 00000000000..52855474ea0 --- /dev/null +++ b/ompi/mca/osc/kupl/osc_kupl_active_target.c @@ -0,0 +1,309 @@ +/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ +/* + * Copyright (c) 2012 Sandia National Laboratories. All rights reserved. + * Copyright (c) 2014-2017 Los Alamos National Security, LLC. All rights + * reserved. + * Copyright (c) 2014-2017 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. 
+ * $COPYRIGHT$
+ *
+ * Additional copyrights may follow
+ *
+ * $HEADER$
+ */
+
+#include "ompi_config.h"
+
+#include "opal/sys/atomic.h"
+#include "ompi/mca/osc/osc.h"
+#include "ompi/mca/osc/base/base.h"
+#include "ompi/mca/osc/base/osc_base_obj_convert.h"
+
+#include "osc_kupl.h"
+
+/**
+ * compare_ranks:
+ *
+ * @param[in] ptra    Pointer to integer item
+ * @param[in] ptrb    Pointer to integer item
+ *
+ * @returns 0 if *ptra == *ptrb
+ * @returns -1 if *ptra < *ptrb
+ * @returns 1 otherwise
+ *
+ * This function is used to sort the rank list. It can be removed if
+ * groups are always in order.
+ */
+static int compare_ranks (const void *ptra, const void *ptrb)
+{
+    int a = *((int *) ptra);
+    int b = *((int *) ptrb);
+
+    if (a < b) {
+        return -1;
+    } else if (a > b) {
+        return 1;
+    }
+
+    return 0;
+}
+
+/**
+ * ompi_osc_kupl_group_ranks:
+ *
+ * @param[in] group     - Group into which the ranks are translated
+ * @param[in] sub_group - Group with ranks to translate
+ *
+ * @returns an array of translated ranks on success or NULL on failure
+ *
+ * Translate the ranks given in {sub_group} into the corresponding
+ * ranks in {group}.
+ */
+static int *ompi_osc_kupl_group_ranks (ompi_group_t *group, ompi_group_t *sub_group)
+{
+    int size = ompi_group_size(sub_group);
+    int *ranks1, *ranks2;
+    int ret;
+
+    ranks1 = calloc (size, sizeof(int));
+    ranks2 = calloc (size, sizeof(int));
+    if (NULL == ranks1 || NULL == ranks2) {
+        free (ranks1);
+        free (ranks2);
+        return NULL;
+    }
+
+    for (int i = 0 ; i < size ; ++i) {
+        ranks1[i] = i;
+    }
+
+    ret = ompi_group_translate_ranks (sub_group, size, ranks1, group, ranks2);
+    free (ranks1);
+    if (OMPI_SUCCESS != ret) {
+        free (ranks2);
+        return NULL;
+    }
+
+    qsort (ranks2, size, sizeof (int), compare_ranks);
+
+    return ranks2;
+}
+
+
+int
+ompi_osc_kupl_fence(int mpi_assert, struct ompi_win_t *win)
+{
+    ompi_osc_kupl_module_t *module =
+        (ompi_osc_kupl_module_t*) win->w_osc_module;
+
+    /* ensure all memory operations have completed */
+    opal_atomic_mb();
+
+    kupl_shm_fence(module->kupl_win);
+    return OMPI_SUCCESS;
+}
+
+int
+ompi_osc_kupl_start(struct ompi_group_t *group,
+                    int mpi_assert,
+                    struct ompi_win_t *win)
+{
+    ompi_osc_kupl_module_t *module =
+        (ompi_osc_kupl_module_t*) win->w_osc_module;
+    int my_rank = ompi_comm_rank (module->comm);
+    void *_tmp_ptr = NULL;
+
+    OBJ_RETAIN(group);
+
+    if (!OPAL_ATOMIC_COMPARE_EXCHANGE_STRONG_PTR(&module->start_group, (void *) &_tmp_ptr, group)) {
+        OBJ_RELEASE(group);
+        return OMPI_ERR_RMA_SYNC;
+    }
+
+    if (0 == (mpi_assert & MPI_MODE_NOCHECK)) {
+        int size;
+
+        int *ranks = ompi_osc_kupl_group_ranks (module->comm->c_local_group, group);
+        if (NULL == ranks) {
+            return OMPI_ERR_OUT_OF_RESOURCE;
+        }
+
+        size = ompi_group_size(module->start_group);
+
+        for (int i = 0 ; i < size ; ++i) {
+            int rank_byte = ranks[i] >> OSC_KUPL_POST_BITS;
+            osc_kupl_post_type_t rank_bit = ((osc_kupl_post_type_t) 1) << (ranks[i] & OSC_KUPL_POST_MASK);
+
+            /* wait for rank to post */
+            while (!(module->posts[my_rank][rank_byte] & rank_bit)) {
+                opal_progress();
+                opal_atomic_mb();
+            }
+
+            opal_atomic_rmb ();
+
+            (void) opal_atomic_fetch_xor_64 ((opal_atomic_int64_t *) module->posts[my_rank] + rank_byte, rank_bit);
+        }
+
+        free (ranks);
+    }
+
+    opal_atomic_mb();
+    return OMPI_SUCCESS;
+}
+
+
+int
+ompi_osc_kupl_complete(struct ompi_win_t *win)
+{
+    ompi_osc_kupl_module_t *module =
+        (ompi_osc_kupl_module_t*) win->w_osc_module;
+    ompi_group_t *group;
+    int gsize;
+
+    /* ensure all memory operations have completed */
+    opal_atomic_mb();
+
+    group
= module->start_group; + if (NULL == group || !OPAL_ATOMIC_COMPARE_EXCHANGE_STRONG_PTR((opal_atomic_intptr_t *) &module->start_group, (opal_atomic_intptr_t *) &group, 0)) { + return OMPI_ERR_RMA_SYNC; + } + + opal_atomic_mb(); + + int *ranks = ompi_osc_kupl_group_ranks (module->comm->c_local_group, group); + if (NULL == ranks) { + return OMPI_ERR_OUT_OF_RESOURCE; + } + + gsize = ompi_group_size(group); + for (int i = 0 ; i < gsize ; ++i) { + (void) opal_atomic_add_fetch_32(&module->node_states[ranks[i]].complete_count, 1); + } + + free (ranks); + + OBJ_RELEASE(group); + + opal_atomic_mb(); + return OMPI_SUCCESS; +} + + +int +ompi_osc_kupl_post(struct ompi_group_t *group, + int mpi_assert, + struct ompi_win_t *win) +{ + ompi_osc_kupl_module_t *module = + (ompi_osc_kupl_module_t*) win->w_osc_module; + int my_rank = ompi_comm_rank (module->comm); + int my_byte = my_rank >> OSC_KUPL_POST_BITS; + osc_kupl_post_type_t my_bit = ((osc_kupl_post_type_t) 1) << (my_rank & OSC_KUPL_POST_MASK); + int gsize; + + OPAL_THREAD_LOCK(&module->lock); + + if (NULL != module->post_group) { + OPAL_THREAD_UNLOCK(&module->lock); + return OMPI_ERR_RMA_SYNC; + } + + module->post_group = group; + + OBJ_RETAIN(group); + + if (0 == (mpi_assert & MPI_MODE_NOCHECK)) { + int *ranks = ompi_osc_kupl_group_ranks (module->comm->c_local_group, group); + if (NULL == ranks) { + return OMPI_ERR_OUT_OF_RESOURCE; + } + + module->my_node_state->complete_count = 0; + opal_atomic_mb(); + + gsize = ompi_group_size(module->post_group); + for (int i = 0 ; i < gsize ; ++i) { + (void) opal_atomic_fetch_add_64 ((opal_atomic_int64_t *) module->posts[ranks[i]] + my_byte, my_bit); + } + + opal_atomic_wmb (); + + free (ranks); + + opal_progress (); + } + + OPAL_THREAD_UNLOCK(&module->lock); + + return OMPI_SUCCESS; +} + + +int +ompi_osc_kupl_wait(struct ompi_win_t *win) +{ + ompi_osc_kupl_module_t *module = + (ompi_osc_kupl_module_t*) win->w_osc_module; + ompi_group_t *group; + + OPAL_THREAD_LOCK(&module->lock); + + if (NULL == module->post_group) { + OPAL_THREAD_UNLOCK(&module->lock); + return OMPI_ERR_RMA_SYNC; + } + + group = module->post_group; + + int size = ompi_group_size (group); + + while (module->my_node_state->complete_count != size) { + opal_progress(); + opal_atomic_mb(); + } + + OBJ_RELEASE(group); + module->post_group = NULL; + + OPAL_THREAD_UNLOCK(&module->lock); + + /* ensure all memory operations have completed */ + opal_atomic_mb(); + + return OMPI_SUCCESS; +} + + +int +ompi_osc_kupl_test(struct ompi_win_t *win, + int *flag) +{ + ompi_osc_kupl_module_t *module = + (ompi_osc_kupl_module_t*) win->w_osc_module; + + OPAL_THREAD_LOCK(&module->lock); + + if (NULL == module->post_group) { + OPAL_THREAD_UNLOCK(&module->lock); + return OMPI_ERR_RMA_SYNC; + } + + int size = ompi_group_size(module->post_group); + + if (module->my_node_state->complete_count == size) { + OBJ_RELEASE(module->post_group); + module->post_group = NULL; + *flag = 1; + } else { + *flag = 0; + } + + OPAL_THREAD_UNLOCK(&module->lock); + + /* ensure all memory operations have completed */ + opal_atomic_mb(); + + return OMPI_SUCCESS; +} diff --git a/ompi/mca/osc/kupl/osc_kupl_comm.c b/ompi/mca/osc/kupl/osc_kupl_comm.c new file mode 100644 index 00000000000..2dc94447f59 --- /dev/null +++ b/ompi/mca/osc/kupl/osc_kupl_comm.c @@ -0,0 +1,441 @@ +/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ +/* + * Copyright (c) 2011 Sandia National Laboratories. All rights reserved. + * Copyright (c) 2014 Los Alamos National Security, LLC. All rights + * reserved. 
+ * Copyright (c) 2015-2017 Research Organization for Information Science + * and Technology (RIST). All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "ompi_config.h" + +#include "ompi/mca/osc/osc.h" +#include "ompi/mca/osc/base/base.h" +#include "ompi/mca/osc/base/osc_base_obj_convert.h" + +#include "osc_kupl.h" + +int +ompi_osc_kupl_rput(const void *origin_addr, + int origin_count, + struct ompi_datatype_t *origin_dt, + int target, + ptrdiff_t target_disp, + int target_count, + struct ompi_datatype_t *target_dt, + struct ompi_win_t *win, + struct ompi_request_t **ompi_req) +{ + int ret; + ompi_osc_kupl_module_t *module = + (ompi_osc_kupl_module_t*) win->w_osc_module; + void *remote_address; + + OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, + "rput: 0x%lx, %d, %s, %d, %d, %d, %s, 0x%lx", + (unsigned long) origin_addr, origin_count, + origin_dt->name, target, (int) target_disp, + target_count, target_dt->name, + (unsigned long) win)); + + remote_address = ((char*) (module->bases[target])) + module->disp_units[target] * target_disp; + + ret = ompi_datatype_sndrcv((void *)origin_addr, origin_count, origin_dt, + remote_address, target_count, target_dt); + if (OMPI_SUCCESS != ret) { + return ret; + } + + /* the only valid field of RMA request status is the MPI_ERROR field. + * ompi_request_empty has status MPI_SUCCESS and indicates the request is + * complete. */ + *ompi_req = &ompi_request_empty; + + return OMPI_SUCCESS; +} + + +int +ompi_osc_kupl_rget(void *origin_addr, + int origin_count, + struct ompi_datatype_t *origin_dt, + int target, + ptrdiff_t target_disp, + int target_count, + struct ompi_datatype_t *target_dt, + struct ompi_win_t *win, + struct ompi_request_t **ompi_req) +{ + int ret; + ompi_osc_kupl_module_t *module = + (ompi_osc_kupl_module_t*) win->w_osc_module; + void *remote_address; + + OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, + "rget: 0x%lx, %d, %s, %d, %d, %d, %s, 0x%lx", + (unsigned long) origin_addr, origin_count, + origin_dt->name, target, (int) target_disp, + target_count, target_dt->name, + (unsigned long) win)); + + remote_address = ((char*) (module->bases[target])) + module->disp_units[target] * target_disp; + + ret = ompi_datatype_sndrcv(remote_address, target_count, target_dt, + origin_addr, origin_count, origin_dt); + if (OMPI_SUCCESS != ret) { + return ret; + } + + /* the only valid field of RMA request status is the MPI_ERROR field. + * ompi_request_empty has status MPI_SUCCESS and indicates the request is + * complete. 
*/ + *ompi_req = &ompi_request_empty; + + return OMPI_SUCCESS; +} + + +int +ompi_osc_kupl_raccumulate(const void *origin_addr, + int origin_count, + struct ompi_datatype_t *origin_dt, + int target, + ptrdiff_t target_disp, + int target_count, + struct ompi_datatype_t *target_dt, + struct ompi_op_t *op, + struct ompi_win_t *win, + struct ompi_request_t **ompi_req) +{ + int ret; + ompi_osc_kupl_module_t *module = + (ompi_osc_kupl_module_t*) win->w_osc_module; + void *remote_address; + + OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, + "raccumulate: 0x%lx, %d, %s, %d, %d, %d, %s, %s, 0x%lx", + (unsigned long) origin_addr, origin_count, + origin_dt->name, target, (int) target_disp, + target_count, target_dt->name, + op->o_name, + (unsigned long) win)); + + remote_address = ((char*) (module->bases[target])) + module->disp_units[target] * target_disp; + + opal_atomic_lock(&module->node_states[target].accumulate_lock); + if (op == &ompi_mpi_op_replace.op) { + ret = ompi_datatype_sndrcv((void *)origin_addr, origin_count, origin_dt, + remote_address, target_count, target_dt); + } else { + ret = ompi_osc_base_sndrcv_op(origin_addr, origin_count, origin_dt, + remote_address, target_count, target_dt, + op); + } + opal_atomic_unlock(&module->node_states[target].accumulate_lock); + + /* the only valid field of RMA request status is the MPI_ERROR field. + * ompi_request_empty has status MPI_SUCCESS and indicates the request is + * complete. */ + *ompi_req = &ompi_request_empty; + + return ret; +} + + + +int +ompi_osc_kupl_rget_accumulate(const void *origin_addr, + int origin_count, + struct ompi_datatype_t *origin_dt, + void *result_addr, + int result_count, + struct ompi_datatype_t *result_dt, + int target, + MPI_Aint target_disp, + int target_count, + struct ompi_datatype_t *target_dt, + struct ompi_op_t *op, + struct ompi_win_t *win, + struct ompi_request_t **ompi_req) +{ + int ret; + ompi_osc_kupl_module_t *module = + (ompi_osc_kupl_module_t*) win->w_osc_module; + void *remote_address; + + OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, + "rget_accumulate: 0x%lx, %d, %s, %d, %d, %d, %s, %s, 0x%lx", + (unsigned long) origin_addr, origin_count, + origin_dt->name, target, (int) target_disp, + target_count, target_dt->name, + op->o_name, + (unsigned long) win)); + + remote_address = ((char*) (module->bases[target])) + module->disp_units[target] * target_disp; + + opal_atomic_lock(&module->node_states[target].accumulate_lock); + + ret = ompi_datatype_sndrcv(remote_address, target_count, target_dt, + result_addr, result_count, result_dt); + if (OMPI_SUCCESS != ret || op == &ompi_mpi_op_no_op.op) goto done; + + if (op == &ompi_mpi_op_replace.op) { + ret = ompi_datatype_sndrcv((void *)origin_addr, origin_count, origin_dt, + remote_address, target_count, target_dt); + } else { + ret = ompi_osc_base_sndrcv_op(origin_addr, origin_count, origin_dt, + remote_address, target_count, target_dt, + op); + } + + done: + opal_atomic_unlock(&module->node_states[target].accumulate_lock); + + /* the only valid field of RMA request status is the MPI_ERROR field. + * ompi_request_empty has status MPI_SUCCESS and indicates the request is + * complete. 
*/ + *ompi_req = &ompi_request_empty; + + return ret; +} + + +int +ompi_osc_kupl_put(const void *origin_addr, + int origin_count, + struct ompi_datatype_t *origin_dt, + int target, + ptrdiff_t target_disp, + int target_count, + struct ompi_datatype_t *target_dt, + struct ompi_win_t *win) +{ + int ret; + ompi_osc_kupl_module_t *module = + (ompi_osc_kupl_module_t*) win->w_osc_module; + void *remote_address; + + OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, + "put: 0x%lx, %d, %s, %d, %d, %d, %s, 0x%lx", + (unsigned long) origin_addr, origin_count, + origin_dt->name, target, (int) target_disp, + target_count, target_dt->name, + (unsigned long) win)); + + remote_address = ((char*) (module->bases[target])) + module->disp_units[target] * target_disp; + + ret = ompi_datatype_sndrcv((void *)origin_addr, origin_count, origin_dt, + remote_address, target_count, target_dt); + + return ret; +} + + +int +ompi_osc_kupl_get(void *origin_addr, + int origin_count, + struct ompi_datatype_t *origin_dt, + int target, + ptrdiff_t target_disp, + int target_count, + struct ompi_datatype_t *target_dt, + struct ompi_win_t *win) +{ + int ret; + ompi_osc_kupl_module_t *module = + (ompi_osc_kupl_module_t*) win->w_osc_module; + void *remote_address; + + OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, + "get: 0x%lx, %d, %s, %d, %d, %d, %s, 0x%lx", + (unsigned long) origin_addr, origin_count, + origin_dt->name, target, (int) target_disp, + target_count, target_dt->name, + (unsigned long) win)); + + remote_address = ((char*) (module->bases[target])) + module->disp_units[target] * target_disp; + + ret = ompi_datatype_sndrcv(remote_address, target_count, target_dt, + origin_addr, origin_count, origin_dt); + + return ret; +} + + +int +ompi_osc_kupl_accumulate(const void *origin_addr, + int origin_count, + struct ompi_datatype_t *origin_dt, + int target, + ptrdiff_t target_disp, + int target_count, + struct ompi_datatype_t *target_dt, + struct ompi_op_t *op, + struct ompi_win_t *win) +{ + int ret; + ompi_osc_kupl_module_t *module = + (ompi_osc_kupl_module_t*) win->w_osc_module; + void *remote_address; + + OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, + "accumulate: 0x%lx, %d, %s, %d, %d, %d, %s, %s, 0x%lx", + (unsigned long) origin_addr, origin_count, + origin_dt->name, target, (int) target_disp, + target_count, target_dt->name, + op->o_name, + (unsigned long) win)); + + remote_address = ((char*) (module->bases[target])) + module->disp_units[target] * target_disp; + + opal_atomic_lock(&module->node_states[target].accumulate_lock); + if (op == &ompi_mpi_op_replace.op) { + ret = ompi_datatype_sndrcv((void *)origin_addr, origin_count, origin_dt, + remote_address, target_count, target_dt); + } else { + ret = ompi_osc_base_sndrcv_op(origin_addr, origin_count, origin_dt, + remote_address, target_count, target_dt, + op); + } + opal_atomic_unlock(&module->node_states[target].accumulate_lock); + + return ret; +} + + +int +ompi_osc_kupl_get_accumulate(const void *origin_addr, + int origin_count, + struct ompi_datatype_t *origin_dt, + void *result_addr, + int result_count, + struct ompi_datatype_t *result_dt, + int target, + MPI_Aint target_disp, + int target_count, + struct ompi_datatype_t *target_dt, + struct ompi_op_t *op, + struct ompi_win_t *win) +{ + int ret; + ompi_osc_kupl_module_t *module = + (ompi_osc_kupl_module_t*) win->w_osc_module; + void *remote_address; + + OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, + "get_accumulate: 0x%lx, %d, %s, %d, 
%d, %d, %s, %s, 0x%lx", + (unsigned long) origin_addr, origin_count, + origin_dt->name, target, (int) target_disp, + target_count, target_dt->name, + op->o_name, + (unsigned long) win)); + + remote_address = ((char*) (module->bases[target])) + module->disp_units[target] * target_disp; + + opal_atomic_lock(&module->node_states[target].accumulate_lock); + + ret = ompi_datatype_sndrcv(remote_address, target_count, target_dt, + result_addr, result_count, result_dt); + if (OMPI_SUCCESS != ret || op == &ompi_mpi_op_no_op.op) goto done; + + if (op == &ompi_mpi_op_replace.op) { + ret = ompi_datatype_sndrcv((void *)origin_addr, origin_count, origin_dt, + remote_address, target_count, target_dt); + } else { + ret = ompi_osc_base_sndrcv_op(origin_addr, origin_count, origin_dt, + remote_address, target_count, target_dt, + op); + } + + done: + opal_atomic_unlock(&module->node_states[target].accumulate_lock); + + return ret; +} + + +int +ompi_osc_kupl_compare_and_swap(const void *origin_addr, + const void *compare_addr, + void *result_addr, + struct ompi_datatype_t *dt, + int target, + ptrdiff_t target_disp, + struct ompi_win_t *win) +{ + ompi_osc_kupl_module_t *module = + (ompi_osc_kupl_module_t*) win->w_osc_module; + void *remote_address; + size_t size; + + OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, + "compare_and_swap: 0x%lx, %s, %d, %d, 0x%lx", + (unsigned long) origin_addr, + dt->name, target, (int) target_disp, + (unsigned long) win)); + + remote_address = ((char*) (module->bases[target])) + module->disp_units[target] * target_disp; + + ompi_datatype_type_size(dt, &size); + + opal_atomic_lock(&module->node_states[target].accumulate_lock); + + /* fetch */ + ompi_datatype_copy_content_same_ddt(dt, 1, (char*) result_addr, (char*) remote_address); + /* compare */ + if (0 == memcmp(result_addr, compare_addr, size)) { + /* set */ + ompi_datatype_copy_content_same_ddt(dt, 1, (char*) remote_address, (char*) origin_addr); + } + + opal_atomic_unlock(&module->node_states[target].accumulate_lock); + + return OMPI_SUCCESS; +} + + +int +ompi_osc_kupl_fetch_and_op(const void *origin_addr, + void *result_addr, + struct ompi_datatype_t *dt, + int target, + ptrdiff_t target_disp, + struct ompi_op_t *op, + struct ompi_win_t *win) +{ + ompi_osc_kupl_module_t *module = + (ompi_osc_kupl_module_t*) win->w_osc_module; + void *remote_address; + + OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, + "fetch_and_op: 0x%lx, %s, %d, %d, %s, 0x%lx", + (unsigned long) origin_addr, + dt->name, target, (int) target_disp, + op->o_name, + (unsigned long) win)); + + remote_address = ((char*) (module->bases[target])) + module->disp_units[target] * target_disp; + + opal_atomic_lock(&module->node_states[target].accumulate_lock); + + /* fetch */ + ompi_datatype_copy_content_same_ddt(dt, 1, (char*) result_addr, (char*) remote_address); + if (op == &ompi_mpi_op_no_op.op) goto done; + + /* op */ + if (op == &ompi_mpi_op_replace.op) { + ompi_datatype_copy_content_same_ddt(dt, 1, (char*) remote_address, (char*) origin_addr); + } else { + ompi_op_reduce(op, (void *)origin_addr, remote_address, 1, dt); + } + + done: + opal_atomic_unlock(&module->node_states[target].accumulate_lock); + + return OMPI_SUCCESS;; +} diff --git a/ompi/mca/osc/kupl/osc_kupl_component.c b/ompi/mca/osc/kupl/osc_kupl_component.c new file mode 100644 index 00000000000..d99d8389951 --- /dev/null +++ b/ompi/mca/osc/kupl/osc_kupl_component.c @@ -0,0 +1,571 @@ +/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ +/* + * 
Copyright (c) 2012 Sandia National Laboratories. All rights reserved. + * Copyright (c) 2014-2018 Los Alamos National Security, LLC. All rights + * reserved. + * Copyright (c) 2014 Intel, Inc. All rights reserved. + * Copyright (c) 2015 Cisco Systems, Inc. All rights reserved. + * Copyright (c) 2015-2018 Research Organization for Information Science + * and Technology (RIST). All rights reserved. + * Copyright (c) 2017 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * Copyright (c) 2016-2017 IBM Corporation. All rights reserved. + * Copyright (c) 2018-2022 Amazon.com, Inc. or its affiliates. All Rights reserved. + * Copyright (c) 2020 FUJITSU LIMITED. All rights reserved. + * Copyright (c) 2020 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "ompi_config.h" + +#include "ompi/mca/osc/osc.h" +#include "ompi/mca/osc/base/base.h" +#include "ompi/mca/osc/base/osc_base_obj_convert.h" +#include "ompi/request/request.h" +#include "opal/util/sys_limits.h" +#include "opal/align.h" +#include "opal/util/printf.h" +#include "opal/mca/mpool/base/base.h" + +#include "osc_kupl.h" + +static int component_open(void); +static int component_init(bool enable_progress_threads, bool enable_mpi_threads); +static int component_finalize(void); +static int component_query(struct ompi_win_t *win, void **base, size_t size, int disp_unit, + struct ompi_communicator_t *comm, struct opal_info_t *info, + int flavor); +static int component_register (void); +static int component_select(struct ompi_win_t *win, void **base, size_t size, int disp_unit, + struct ompi_communicator_t *comm, struct opal_info_t *info, + int flavor, int *model); + +static struct kupl_info config_info = { + .is_contig = 1 +}; + +ompi_osc_kupl_component_t mca_osc_kupl_component = { + { /* ompi_osc_base_component_t */ + .osc_version = { + OMPI_OSC_BASE_VERSION_3_0_0, + .mca_component_name = "kupl", + MCA_BASE_MAKE_VERSION(component, OMPI_MAJOR_VERSION, OMPI_MINOR_VERSION, + OMPI_RELEASE_VERSION), + .mca_open_component = component_open, + .mca_register_component_params = component_register, + }, + .osc_data = { /* mca_base_component_data */ + /* The component is not checkpoint ready */ + MCA_BASE_METADATA_PARAM_NONE + }, + .osc_init = component_init, + .osc_query = component_query, + .osc_select = component_select, + .osc_finalize = component_finalize, + } +}; + + +ompi_osc_kupl_module_t ompi_osc_kupl_module_template = { + { + .osc_win_shared_query = ompi_osc_kupl_shared_query, + + .osc_win_attach = ompi_osc_kupl_attach, + .osc_win_detach = ompi_osc_kupl_detach, + .osc_free = ompi_osc_kupl_free, + + .osc_put = ompi_osc_kupl_put, + .osc_get = ompi_osc_kupl_get, + .osc_accumulate = ompi_osc_kupl_accumulate, + .osc_compare_and_swap = ompi_osc_kupl_compare_and_swap, + .osc_fetch_and_op = ompi_osc_kupl_fetch_and_op, + .osc_get_accumulate = ompi_osc_kupl_get_accumulate, + + .osc_rput = ompi_osc_kupl_rput, + .osc_rget = ompi_osc_kupl_rget, + .osc_raccumulate = ompi_osc_kupl_raccumulate, + .osc_rget_accumulate = ompi_osc_kupl_rget_accumulate, + + .osc_fence = ompi_osc_kupl_fence, + + .osc_start = ompi_osc_kupl_start, + .osc_complete = ompi_osc_kupl_complete, + .osc_post = ompi_osc_kupl_post, + .osc_wait = ompi_osc_kupl_wait, + .osc_test = ompi_osc_kupl_test, + + .osc_lock = ompi_osc_kupl_lock, + .osc_unlock = ompi_osc_kupl_unlock, + .osc_lock_all = 
ompi_osc_kupl_lock_all, + .osc_unlock_all = ompi_osc_kupl_unlock_all, + + .osc_sync = ompi_osc_kupl_sync, + .osc_flush = ompi_osc_kupl_flush, + .osc_flush_all = ompi_osc_kupl_flush_all, + .osc_flush_local = ompi_osc_kupl_flush_local, + .osc_flush_local_all = ompi_osc_kupl_flush_local_all, + } +}; + +int oob_allgather_callback(const void *sendbuf, void *recvbuf, int size, void *group, kupl_shm_datatype_t datatype) +{ + switch (datatype) { + case KUPL_SHM_DATATYPE_CHAR: + return MPI_Allgather(sendbuf, size, MPI_CHAR, recvbuf, size, MPI_CHAR, (MPI_Comm)group); + default: + return -1; + } +} + +int oob_barrier_callback(void *group) +{ + return MPI_Barrier((MPI_Comm)group); +} + +static int component_register (void) +{ + char *description_str; + + if (0 == access ("/dev/shm", W_OK)) { + mca_osc_kupl_component.backing_directory = "/dev/shm"; + } else { + mca_osc_kupl_component.backing_directory = ompi_process_info.proc_session_dir; + } + + (void) mca_base_component_var_register (&mca_osc_kupl_component.super.osc_version, "backing_directory", + "Directory to place backing files for shared memory windows. " + "This directory should be on a local filesystem such as /tmp or " + "/dev/shm (default: (linux) /dev/shm, (others) session directory)", + MCA_BASE_VAR_TYPE_STRING, NULL, 0, 0, OPAL_INFO_LVL_3, + MCA_BASE_VAR_SCOPE_READONLY, &mca_osc_kupl_component.backing_directory); + + mca_osc_kupl_component.priority = 100; + opal_asprintf(&description_str, "Priority of the osc/kupl component (default: %d)", + mca_osc_kupl_component.priority); + (void)mca_base_component_var_register(&mca_osc_kupl_component.super.osc_version, + "priority", description_str, + MCA_BASE_VAR_TYPE_UNSIGNED_INT, NULL, 0, 0, + OPAL_INFO_LVL_3, MCA_BASE_VAR_SCOPE_GROUP, + &mca_osc_kupl_component.priority); + free(description_str); + + return OPAL_SUCCESS; +} + +static int +component_open(void) +{ + return OMPI_SUCCESS; +} + + +static int +component_init(bool enable_progress_threads, bool enable_mpi_threads) +{ + return OMPI_SUCCESS; +} + + +static int +component_finalize(void) +{ + /* clean up requests free list */ + + return OMPI_SUCCESS; +} + + +static int +component_query(struct ompi_win_t *win, void **base, size_t size, int disp_unit, + struct ompi_communicator_t *comm, struct opal_info_t *info, + int flavor) +{ + /* component only supports shared or allocate flavors */ + if (! (MPI_WIN_FLAVOR_SHARED == flavor || + MPI_WIN_FLAVOR_ALLOCATE == flavor)) { + return -1; + } + + /* If flavor is win_allocate, we can't run if there are remote + * peers in the group. The same check for flavor_shared happens + * in select(), so that we can return an error to the user (since + * we should be able to run for all flavor_shared use cases. + * There's no way to return an error from component_query to the + * user, hence the delayed check. 
*/ + if (MPI_WIN_FLAVOR_ALLOCATE == flavor) { + if (ompi_group_have_remote_peers(comm->c_local_group)) { + return -1; + } + } + + return mca_osc_kupl_component.priority; +} + + +static int +component_select(struct ompi_win_t *win, void **base, size_t size, int disp_unit, + struct ompi_communicator_t *comm, struct opal_info_t *info, + int flavor, int *model) +{ + ompi_osc_kupl_module_t *module = NULL; + int comm_size = ompi_comm_size (comm); + int rank = ompi_comm_rank (comm); + int global_rank; + MPI_Comm_rank(MPI_COMM_WORLD, &global_rank); + + bool unlink_needed = false; + int ret = OMPI_ERROR; + size_t memory_alignment = OPAL_ALIGN_MIN; + + assert(MPI_WIN_FLAVOR_SHARED == flavor || MPI_WIN_FLAVOR_ALLOCATE == flavor); + + if (ompi_group_have_remote_peers(comm->c_local_group)) { + return OMPI_ERR_RMA_SHARED; + } + + /* create module structure */ + module = (ompi_osc_kupl_module_t*) + calloc(1, sizeof(ompi_osc_kupl_module_t)); + if (NULL == module) return OMPI_ERR_TEMP_OUT_OF_RESOURCE; + + win->w_osc_module = &module->super; + + OBJ_CONSTRUCT(&module->lock, opal_mutex_t); + + if (NULL != info) { + ompi_osc_base_set_memory_alignment(info, &memory_alignment); + } + + /* fill in the function pointer part */ + memcpy(module, &ompi_osc_kupl_module_template, + sizeof(ompi_osc_base_module_t)); + + /* need our communicator for collectives in next phase */ + ret = ompi_comm_dup(comm, &module->comm); + if (OMPI_SUCCESS != ret) goto error; + + module->flavor = flavor; + + kupl_shm_oob_cb_t oob_cbs; + kupl_shm_oob_cb_h oob_cbs_h = &oob_cbs; + oob_cbs_h->oob_allgather = oob_allgather_callback; + oob_cbs_h->oob_barrier = oob_barrier_callback; + + kupl_shm_comm_create(comm_size, rank, global_rank, oob_cbs_h, comm, &module->kupl_comm); + + /* create the segment */ + if (1 == comm_size) { + kupl_shm_win_alloc(size, module->kupl_comm, base, &module->kupl_win, &config_info); + module->segment_base = NULL; + module->sizes = malloc(sizeof(size_t)); + if (NULL == module->sizes) return OMPI_ERR_TEMP_OUT_OF_RESOURCE; + module->bases = malloc(sizeof(void*)); + if (NULL == module->bases) return OMPI_ERR_TEMP_OUT_OF_RESOURCE; + + module->sizes[0] = size; + kupl_shm_win_query(module->kupl_win, 0, (void **)&(module->bases[0])); + + module->global_state = malloc(sizeof(ompi_osc_kupl_global_state_t)); + if (NULL == module->global_state) return OMPI_ERR_TEMP_OUT_OF_RESOURCE; + module->node_states = malloc(sizeof(ompi_osc_kupl_node_state_t)); + if (NULL == module->node_states) return OMPI_ERR_TEMP_OUT_OF_RESOURCE; + module->posts = calloc (1, sizeof(module->posts[0]) + sizeof (module->posts[0][0])); + if (NULL == module->posts) return OMPI_ERR_TEMP_OUT_OF_RESOURCE; + module->posts[0] = (osc_kupl_post_atomic_type_t *) (module->posts + 1); + } else { + size_t pagesize; + size_t state_size; + size_t posts_size, post_size = (comm_size + OSC_KUPL_POST_MASK) / (OSC_KUPL_POST_MASK + 1); + size_t data_base_size; + + opal_output_verbose(MCA_BASE_VERBOSE_DEBUG, ompi_osc_base_framework.framework_output, + "allocating shared memory region of size %ld\n", (long) size); + + /* get the pagesize */ + pagesize = opal_getpagesize(); + + /* Note that the alloc_shared_noncontig info key only has + * meaning during window creation. Once the window is + * created, we can't move memory around without making + * everything miserable. So we intentionally do not subscribe + * to updates on the info key, because there's no useful + * update to occur. 
*/ + module->noncontig = false; + int flag; + if (OMPI_SUCCESS != opal_info_get_bool(info, "alloc_shared_noncontig", + &module->noncontig, &flag)) { + goto error; + } + + if (module->noncontig) { + opal_output_verbose(MCA_BASE_VERBOSE_DEBUG, ompi_osc_base_framework.framework_output, + "allocating window using non-contiguous strategy"); + config_info.is_contig = 0; + } else { + opal_output_verbose(MCA_BASE_VERBOSE_DEBUG, ompi_osc_base_framework.framework_output, + "allocating window using contiguous strategy"); + config_info.is_contig = 1; + } + kupl_shm_win_alloc(size, module->kupl_comm, base, &module->kupl_win, &config_info); + + /* user opal/shmem directly to create a shared memory segment */ + state_size = sizeof(ompi_osc_kupl_global_state_t) + sizeof(ompi_osc_kupl_node_state_t) * comm_size; + state_size += OPAL_ALIGN_PAD_AMOUNT(state_size, 64); + posts_size = comm_size * post_size * sizeof (module->posts[0][0]); + posts_size += OPAL_ALIGN_PAD_AMOUNT(posts_size, 64); + data_base_size = state_size + posts_size; + data_base_size += OPAL_ALIGN_PAD_AMOUNT(data_base_size, pagesize); + if (0 == ompi_comm_rank (module->comm)) { + char *data_file; + ret = opal_asprintf (&data_file, "%s" OPAL_PATH_SEP "osc_kupl.%s.%x.%d.%s", + mca_osc_kupl_component.backing_directory, ompi_process_info.nodename, + OMPI_PROC_MY_NAME->jobid, (int) OMPI_PROC_MY_NAME->vpid, + ompi_comm_print_cid(module->comm)); + if (ret < 0) { + ret = OMPI_ERR_OUT_OF_RESOURCE; + goto error; + } + + ret = opal_shmem_segment_create (&module->seg_ds, data_file, data_base_size); + free(data_file); + if (OPAL_SUCCESS != ret) { + goto error; + } + + unlink_needed = true; + } + + ret = module->comm->c_coll->coll_bcast (&module->seg_ds, sizeof (module->seg_ds), MPI_BYTE, 0, + module->comm, module->comm->c_coll->coll_bcast_module); + if (OMPI_SUCCESS != ret) { + goto error; + } + + module->segment_base = opal_shmem_segment_attach (&module->seg_ds); + if (NULL == module->segment_base) { + goto error; + } + + /* wait for all processes to attach */ + ret = module->comm->c_coll->coll_barrier (module->comm, module->comm->c_coll->coll_barrier_module); + if (OMPI_SUCCESS != ret) { + goto error; + } + + if (0 == ompi_comm_rank (module->comm)) { + opal_shmem_unlink (&module->seg_ds); + unlink_needed = false; + } + + module->sizes = malloc(comm_size * sizeof(size_t)); + if (NULL == module->sizes) return OMPI_ERR_TEMP_OUT_OF_RESOURCE; + module->bases = (void**)malloc(comm_size * sizeof(void*)); + if (NULL == module->bases) return OMPI_ERR_TEMP_OUT_OF_RESOURCE; + module->posts = calloc (comm_size, sizeof (module->posts[0])); + if (NULL == module->posts) return OMPI_ERR_TEMP_OUT_OF_RESOURCE; + + /* set module->posts[0] first to ensure 64-bit alignment */ + module->posts[0] = (osc_kupl_post_atomic_type_t *) (module->segment_base); + module->global_state = (ompi_osc_kupl_global_state_t *) (module->posts[0] + comm_size * post_size); + module->node_states = (ompi_osc_kupl_node_state_t *) (module->global_state + 1); + + unsigned long size_list[comm_size]; + + ret = module->comm->c_coll->coll_allgather(&size, 1, MPI_UNSIGNED_LONG, + size_list, 1, MPI_UNSIGNED_LONG, + module->comm, + module->comm->c_coll->coll_allgather_module); + + for (int i = 0; i < comm_size; i++) { + if (i > 0) { + module->posts[i] = module->posts[i - 1] + post_size; + } + if (size_list[i]) { + kupl_shm_win_query(module->kupl_win, i, (void **)&(module->bases[i])); + } else { + module->bases[i] = NULL; + } + module->sizes[i] = size_list[i]; + } + } + + /* initialize my state shared */ + 
module->my_node_state = &module->node_states[ompi_comm_rank(module->comm)]; + memset (module->my_node_state, 0, sizeof(*module->my_node_state)); + + opal_atomic_lock_init(&module->my_node_state->accumulate_lock, OPAL_ATOMIC_LOCK_UNLOCKED); + + /* share everyone's displacement units. */ + module->disp_units = malloc(sizeof(int) * comm_size); + ret = module->comm->c_coll->coll_allgather(&disp_unit, 1, MPI_INT, + module->disp_units, 1, MPI_INT, + module->comm, + module->comm->c_coll->coll_allgather_module); + if (OMPI_SUCCESS != ret) goto error; + + module->start_group = NULL; + module->post_group = NULL; + + /* initialize synchronization code */ + module->my_sense = 1; + + module->outstanding_locks = calloc(comm_size, sizeof(enum ompi_osc_kupl_locktype_t)); + if (NULL == module->outstanding_locks) { + ret = OMPI_ERR_TEMP_OUT_OF_RESOURCE; + goto error; + } + + if (OMPI_SUCCESS != ret) goto error; + + *model = MPI_WIN_UNIFIED; + + return OMPI_SUCCESS; + + error: + + if (0 == ompi_comm_rank (module->comm) && unlink_needed) { + opal_shmem_unlink (&module->seg_ds); + } + + ompi_osc_kupl_free (win); + + return ret; +} + + +int +ompi_osc_kupl_shared_query(struct ompi_win_t *win, int rank, size_t *size, int *disp_unit, void *baseptr) +{ + ompi_osc_kupl_module_t *module = + (ompi_osc_kupl_module_t*) win->w_osc_module; + + if (module->flavor != MPI_WIN_FLAVOR_SHARED) { + return MPI_ERR_WIN; + } + + if (MPI_PROC_NULL != rank) { + *size = module->sizes[rank]; + *((void**) baseptr) = module->bases[rank]; + *disp_unit = module->disp_units[rank]; + } else { + int i = 0; + + *size = 0; + *((void**) baseptr) = NULL; + *disp_unit = 0; + for (i = 0 ; i < ompi_comm_size(module->comm) ; ++i) { + if (0 != module->sizes[i]) { + *size = module->sizes[i]; + *((void**) baseptr) = module->bases[i]; + *disp_unit = module->disp_units[i]; + break; + } + } + } + + return OMPI_SUCCESS; +} + + +int +ompi_osc_kupl_attach(struct ompi_win_t *win, void *base, size_t len) +{ + ompi_osc_kupl_module_t *module = + (ompi_osc_kupl_module_t*) win->w_osc_module; + + if (module->flavor != MPI_WIN_FLAVOR_DYNAMIC) { + return MPI_ERR_RMA_ATTACH; + } + return OMPI_SUCCESS; +} + + +int +ompi_osc_kupl_detach(struct ompi_win_t *win, const void *base) +{ + ompi_osc_kupl_module_t *module = + (ompi_osc_kupl_module_t*) win->w_osc_module; + + if (module->flavor != MPI_WIN_FLAVOR_DYNAMIC) { + return MPI_ERR_RMA_ATTACH; + } + return OMPI_SUCCESS; +} + + +int +ompi_osc_kupl_free(struct ompi_win_t *win) +{ + ompi_osc_kupl_module_t *module = + (ompi_osc_kupl_module_t*) win->w_osc_module; + + /* free memory */ + if (NULL != module->segment_base) { + /* synchronize */ + module->comm->c_coll->coll_barrier(module->comm, + module->comm->c_coll->coll_barrier_module); + + opal_shmem_segment_detach (&module->seg_ds); + } else { + free(module->node_states); + free(module->global_state); + } + free(module->disp_units); + free(module->outstanding_locks); + free(module->sizes); + free(module->bases); + + free (module->posts); + + kupl_shm_win_free(module->kupl_win); + kupl_shm_comm_destroy(module->kupl_comm); + + /* cleanup */ + ompi_comm_free(&module->comm); + + OBJ_DESTRUCT(&module->lock); + + free(module); + + return OMPI_SUCCESS; +} + + +int +ompi_osc_kupl_set_info(struct ompi_win_t *win, struct opal_info_t *info) +{ + ompi_osc_kupl_module_t *module = + (ompi_osc_kupl_module_t*) win->w_osc_module; + + /* enforce collectiveness... 
*/ + return module->comm->c_coll->coll_barrier(module->comm, + module->comm->c_coll->coll_barrier_module); +} + + +int +ompi_osc_kupl_get_info(struct ompi_win_t *win, struct opal_info_t **info_used) +{ + ompi_osc_kupl_module_t *module = + (ompi_osc_kupl_module_t*) win->w_osc_module; + + opal_info_t *info = OBJ_NEW(opal_info_t); + if (NULL == info) return OMPI_ERR_TEMP_OUT_OF_RESOURCE; + + if (module->flavor == MPI_WIN_FLAVOR_SHARED) { + opal_info_set(info, "blocking_fence", + (1 == module->global_state->use_barrier_for_fence) ? "true" : "false"); + opal_info_set(info, "alloc_shared_noncontig", + (module->noncontig) ? "true" : "false"); + } + + *info_used = info; + + return OMPI_SUCCESS; +} diff --git a/ompi/mca/osc/kupl/osc_kupl_passive_target.c b/ompi/mca/osc/kupl/osc_kupl_passive_target.c new file mode 100644 index 00000000000..7665870be64 --- /dev/null +++ b/ompi/mca/osc/kupl/osc_kupl_passive_target.c @@ -0,0 +1,269 @@ +/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ +/* + * Copyright (c) 2011 Sandia National Laboratories. All rights reserved. + * Copyright (c) 2014-2015 Los Alamos National Security, LLC. All rights + * reserved. + * Copyright (c) 2015 Cisco Systems, Inc. All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "ompi_config.h" + +#include "ompi/mca/osc/osc.h" +#include "ompi/mca/osc/base/base.h" +#include "ompi/mca/osc/base/osc_base_obj_convert.h" + +#include "osc_kupl.h" + + +static inline uint32_t +lk_fetch_add32(ompi_osc_kupl_module_t *module, + int target, + size_t offset, + uint32_t delta) +{ + /* opal_atomic_add_fetch_32 is an add then fetch so delta needs to be subtracted out to get the + * old value */ + return opal_atomic_add_fetch_32((opal_atomic_int32_t *) ((char*) &module->node_states[target].lock + offset), + delta) - delta; +} + + +static inline void +lk_add32(ompi_osc_kupl_module_t *module, + int target, + size_t offset, + uint32_t delta) +{ + opal_atomic_add_fetch_32((opal_atomic_int32_t *) ((char*) &module->node_states[target].lock + offset), + delta); +} + + +static inline uint32_t +lk_fetch32(ompi_osc_kupl_module_t *module, + int target, + size_t offset) +{ + opal_atomic_mb (); + return *((uint32_t *)((char*) &module->node_states[target].lock + offset)); +} + + +static inline int +start_exclusive(ompi_osc_kupl_module_t *module, + int target) +{ + uint32_t me = lk_fetch_add32(module, target, + offsetof(ompi_osc_kupl_lock_t, counter), 1); + + while (me != lk_fetch32(module, target, + offsetof(ompi_osc_kupl_lock_t, write))) { + opal_progress(); + } + + return OMPI_SUCCESS; +} + + +static inline int +end_exclusive(ompi_osc_kupl_module_t *module, + int target) +{ + lk_add32(module, target, offsetof(ompi_osc_kupl_lock_t, write), 1); + lk_add32(module, target, offsetof(ompi_osc_kupl_lock_t, read), 1); + + return OMPI_SUCCESS; +} + + +static inline int +start_shared(ompi_osc_kupl_module_t *module, + int target) +{ + uint32_t me = lk_fetch_add32(module, target, + offsetof(ompi_osc_kupl_lock_t, counter), 1); + + while (me != lk_fetch32(module, target, + offsetof(ompi_osc_kupl_lock_t, read))) { + opal_progress(); + } + + lk_add32(module, target, offsetof(ompi_osc_kupl_lock_t, read), 1); + + return OMPI_SUCCESS; +} + + +static inline int +end_shared(ompi_osc_kupl_module_t *module, + int target) +{ + lk_add32(module, target, offsetof(ompi_osc_kupl_lock_t, write), 1); + + return OMPI_SUCCESS; +} + + +int +ompi_osc_kupl_lock(int lock_type, + int target, + int mpi_assert, + struct ompi_win_t *win) 
+{ + ompi_osc_kupl_module_t *module = + (ompi_osc_kupl_module_t*) win->w_osc_module; + int ret; + + if (lock_none != module->outstanding_locks[target]) { + return OMPI_ERR_RMA_SYNC; + } + + if (0 == (mpi_assert & MPI_MODE_NOCHECK)) { + if (MPI_LOCK_EXCLUSIVE == lock_type) { + module->outstanding_locks[target] = lock_exclusive; + ret = start_exclusive(module, target); + } else { + module->outstanding_locks[target] = lock_shared; + ret = start_shared(module, target); + } + } else { + module->outstanding_locks[target] = lock_nocheck; + ret = OMPI_SUCCESS; + } + + return ret; +} + + +int +ompi_osc_kupl_unlock(int target, + struct ompi_win_t *win) +{ + ompi_osc_kupl_module_t *module = + (ompi_osc_kupl_module_t*) win->w_osc_module; + int ret; + + /* ensure all memory operations have completed */ + opal_atomic_mb(); + + switch (module->outstanding_locks[target]) { + case lock_none: + return OMPI_ERR_RMA_SYNC; + + case lock_nocheck: + ret = OMPI_SUCCESS; + break; + + case lock_exclusive: + ret = end_exclusive(module, target); + break; + + case lock_shared: + ret = end_shared(module, target); + break; + + default: + // This is an OMPI programming error -- cause some pain. + assert(module->outstanding_locks[target] == lock_none || + module->outstanding_locks[target] == lock_nocheck || + module->outstanding_locks[target] == lock_exclusive || + module->outstanding_locks[target] == lock_shared); + + // In non-developer builds, assert() will be a no-op, so + // ensure the error gets reported + opal_output(0, "Unknown lock type in ompi_osc_kupl_unlock -- this is an OMPI programming error"); + ret = OMPI_ERR_BAD_PARAM; + break; + } + + module->outstanding_locks[target] = lock_none; + + return ret; +} + + +int +ompi_osc_kupl_lock_all(int mpi_assert, + struct ompi_win_t *win) +{ + ompi_osc_kupl_module_t *module = + (ompi_osc_kupl_module_t*) win->w_osc_module; + int ret, i, comm_size; + + comm_size = ompi_comm_size(module->comm); + for (i = 0 ; i < comm_size ; ++i) { + ret = ompi_osc_kupl_lock(MPI_LOCK_SHARED, i, mpi_assert, win); + if (OMPI_SUCCESS != ret) return ret; + } + + return OMPI_SUCCESS; +} + + +int +ompi_osc_kupl_unlock_all(struct ompi_win_t *win) +{ + ompi_osc_kupl_module_t *module = + (ompi_osc_kupl_module_t*) win->w_osc_module; + int ret, i, comm_size; + + comm_size = ompi_comm_size(module->comm); + for (i = 0 ; i < comm_size ; ++i) { + ret = ompi_osc_kupl_unlock(i, win); + if (OMPI_SUCCESS != ret) return ret; + } + + return OMPI_SUCCESS; +} + + +int +ompi_osc_kupl_sync(struct ompi_win_t *win) +{ + opal_atomic_mb(); + + return OMPI_SUCCESS; +} + + +int +ompi_osc_kupl_flush(int target, + struct ompi_win_t *win) +{ + opal_atomic_mb(); + + return OMPI_SUCCESS; +} + + +int +ompi_osc_kupl_flush_all(struct ompi_win_t *win) +{ + opal_atomic_mb(); + + return OMPI_SUCCESS; +} + + +int +ompi_osc_kupl_flush_local(int target, + struct ompi_win_t *win) +{ + opal_atomic_mb(); + + return OMPI_SUCCESS; +} + + +int +ompi_osc_kupl_flush_local_all(struct ompi_win_t *win) +{ + opal_atomic_mb(); + + return OMPI_SUCCESS; +} -- Gitee