Skip to content
Snippets Groups Projects
Commit 5ba8b58e authored by Utz-Uwe Haus's avatar Utz-Uwe Haus
Browse files

Reduce exposure of RDMA transport to other code

Avoid exporting the internal memory registration table to main transport
and pool_client code.

Avoid duplicate code in transfer_completion handler on PC
vs. transport_bh handler

Properly distinguish between 0-length RDMA transports (which have no
memory registration) vs. erroneously missing memory registrations
parent 8ff63c7d
Branches
Tags
No related merge requests found
......@@ -740,48 +740,41 @@ mstro_pc__handle_transfer_completed(Mstro__Pool__MstroMsg *msg)
/* if this is an OFI ticket: call mstro_transport_rdma_src_execute_bh */
struct mstro_transport_mreg_table_entry* regentry = NULL;
s = mstro_transport_rdma_src_execute_bh(completion);
if(s==MSTRO_NOENT) {
mstro_cdo cdo;
struct mstro_cdo_id cdoid = { .qw[0] = completion->srccdoid->qw0,
.qw[1] = completion->srccdoid->qw1,
.local_id = completion->srccdoid->local_id };
struct mstro_pm_app_registry_entry *e;
s = mstro_pm_app_lookup(app_id, &e);
if(e==NULL) {
ERR("Target %zu not in app table\n", app_id);
return MSTRO_FAIL;
}
struct mstro_transport_mreg_table_key key;
memset(&key, 0, sizeof(struct mstro_transport_mreg_table_key));
key.id = cdoid;
key.domain = e->ep->domain;
key.ep = e->ep->fi->domain_attr->mr_mode & FI_MR_ENDPOINT ? e->ep->ep : NULL;
int err = pthread_mutex_lock(&g_mstro_transport_mreg_table_lock);
if (err) {
ERR("Couldn't lock mutex on mreg table (err: %d)\n", err);
return MSTRO_FAIL;
mstro_cdo__find_cdo(&cdoid, &cdo);
if(cdo==NULL) {
WITH_CDO_ID_STR(idstr, &cdo, {
ERR("Cannot find CDO %s for which completion was signalled\n", idstr);
});
goto DONE;
}
HASH_FIND(hh, g_mstro_transport_mreg_table,
&key, sizeof(struct mstro_transport_mreg_table_key), regentry);
err = pthread_mutex_unlock(&g_mstro_transport_mreg_table_lock);
if (err) {
ERR("Couldn't unlock mutex on mreg table (err: %d)\n", err);
return MSTRO_FAIL;
const void *size=NULL;
enum mstro_cdo_attr_value_type vt;
s = mstro_attribute_dict_get(cdo->attributes,
MSTRO_ATTR_CORE_CDO_SCOPE_LOCAL_SIZE,
&vt, &size, NULL, false);
if(s==MSTRO_NOENT && vt==MSTRO_CDO_ATTR_VALUE_INVALID) {
ERR("Source CDO has no local-size\n");
goto DONE;
}
if (regentry) {
s = mstro_transport_rdma_src_execute_bh(completion);
if (s != MSTRO_OK)
return s;
} else {
WITH_CDO_ID_STR(
idstr, &cdoid,
DEBUG("Can't find CDO id %s in mreg table for RDMA transfer cleanup, likely 0-length\n",
idstr););
int64_t realsize = *(int64_t*)size;
if(realsize==-1) {
s=MSTRO_OK; // 0-size CDO has
DEBUG("Source CDO empty, doing NULL transfer\n");
s=MSTRO_OK;
} else {
WITH_CDO_ID_STR(idstr, &cdo, {
ERR("Nonempty CDO had no memory registration to clean up\n");
});
}
}
/* do not free the message on the PC -- the message handler that called us will do that */
DONE:
return s;
......
......@@ -60,12 +60,41 @@
#define WARN(...) LOG_WARN(MSTRO_LOG_MODULE_TRANSP,__VA_ARGS__)
#define ERR(...) LOG_ERR(MSTRO_LOG_MODULE_TRANSP,__VA_ARGS__)
/** hashing of memory registrations is done on an aggregate key: CDO ID, OFI domain and maybe OFI endpoint.
* Be sure to make * padding 0, and set EP to NULL if FI_MR_ENDPOINT is not set in the domain
* attributes */
struct mstro_transport_mreg_table_key {
struct mstro_cdo_id id; /**< CDO ID for which this registration is done */
struct fid_domain *domain; /**< domain in which this registration is valid */
struct fid_ep *ep; /**< endpoint (if FI_MR_ENDPOINT is set) */
};
/* https://ofiwg.github.io/libfabric/v1.10.1/man/fi_mr.3.html */
struct mstro_transport_mreg_table_entry {
UT_hash_handle hh; /**< hash on CDO id */
struct mstro_transport_mreg_table_key key; /**< hash key */
uint64_t refcount; /**< number of tickets issued for this ID */
void *addr; /**< the raw-ptr address registered */
uint64_t len; /**< length of the memory region */
struct fid_mr *mr_reg; /**< the OFI memory registration for this CDO's raw-ptr */
};
struct mstro_transport_rdma_pending_entry {
UT_hash_handle hh; /**< hash on CDO id */
struct mstro_cdo_id id; /**< key */
pthread_mutex_t lock;
pthread_cond_t cond;
int done;
};
/** table of memory registrations for transport purposes. Protected by @ref g_mstro_transport_mreg_table_lock */
struct mstro_transport_mreg_table_entry *g_mstro_transport_mreg_table = NULL;
static struct mstro_transport_mreg_table_entry *g_mstro_transport_mreg_table = NULL;
/** Lock protecting @ref g_mstro_transport_mreg_table */
pthread_mutex_t g_mstro_transport_mreg_table_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t g_mstro_transport_mreg_table_lock = PTHREAD_MUTEX_INITIALIZER;
/** condition var to signal change on @ref g_mstro_transport_mreg_table */
pthread_cond_t g_mstro_transport_mreg_table_cond = PTHREAD_COND_INITIALIZER;
static pthread_cond_t g_mstro_transport_mreg_table_cond = PTHREAD_COND_INITIALIZER;
static struct mstro_transport_rdma_pending_entry *g_mstro_transport_rdma_pending_table = NULL;
static pthread_mutex_t g_mstro_transport_rdma_pending_table_mtx = PTHREAD_MUTEX_INITIALIZER;
......@@ -232,8 +261,8 @@ mstro_transport_rdma_src_execute_bh(Mstro__Pool__TransferCompleted *tc)
&key, sizeof(struct mstro_transport_mreg_table_key),
regentry);
if (!regentry) {
ERR("RDMA transfer completion message does not map to a documented transfer on source\n");
status=MSTRO_FAIL;
DEBUG("RDMA transfer completion message does not require memory registration cleanup on source, likely 0-length\n");
status=MSTRO_NOENT;
goto BAILOUT_UNLOCK;
}
......@@ -589,3 +618,23 @@ mstro_transport_rdma_dtor(void* closure)
}
mstro_status
mstro_transport_rdma_finalize(void)
{
mstro_status s;
mstro_cleanup_waitup(s,
g_mstro_transport_mreg_table,
&g_mstro_transport_mreg_table_lock,
&g_mstro_transport_mreg_table_cond,
"RDMA transport memory registration");
if(s!=MSTRO_OK) {
struct mstro_transport_mreg_table_entry *entry, *tmp;
HASH_ITER(hh, g_mstro_transport_mreg_table, entry, tmp) {
WITH_CDO_ID_STR(idstr, &entry->key.id, {
WARN("Still have mreg entry %p: CDO %s\n", entry, idstr);
});
}
}
return s;
}
......@@ -281,19 +281,7 @@ mstro_transport_finalize()
notification might come late. Could also be if transport is PM initiative
*/
mstro_cleanup_waitup(s,
g_mstro_transport_mreg_table,
&g_mstro_transport_mreg_table_lock,
&g_mstro_transport_mreg_table_cond,
"RDMA transport memory registration");
if(s!=MSTRO_OK) {
struct mstro_transport_mreg_table_entry *entry, *tmp;
HASH_ITER(hh, g_mstro_transport_mreg_table, entry, tmp) {
WITH_CDO_ID_STR(idstr, &entry->key.id, {
WARN("Still have mreg entry %p: CDO %s\n", entry, idstr);
});
}
}
s=mstro_transport_rdma_finalize();
if (s != MSTRO_OK)
return s;
......
......@@ -40,43 +40,6 @@
#include "mstro_pool.pb-c.h"
/** hashing of memory registrations is done on an aggregate key: CDO ID, OFI domain and maybe OFI endpoint.
* Be sure to make * padding 0, and set EP to NULL if FI_MR_ENDPOINT is not set in the domain
* attributes */
struct mstro_transport_mreg_table_key {
struct mstro_cdo_id id; /**< CDO ID for which this registration is done */
struct fid_domain *domain; /**< domain in which this registration is valid */
struct fid_ep *ep; /**< endpoint (if FI_MR_ENDPOINT is set) */
};
/* https://ofiwg.github.io/libfabric/v1.10.1/man/fi_mr.3.html */
struct mstro_transport_mreg_table_entry {
UT_hash_handle hh; /**< hash on CDO id */
struct mstro_transport_mreg_table_key key; /**< hash key */
uint64_t refcount; /**< number of tickets issued for this ID */
void *addr; /**< the raw-ptr address registered */
uint64_t len; /**< length of the memory region */
struct fid_mr *mr_reg; /**< the OFI memory registration for this CDO's raw-ptr */
};
extern struct mstro_transport_mreg_table_entry *g_mstro_transport_mreg_table;
extern pthread_mutex_t g_mstro_transport_mreg_table_lock;
extern pthread_cond_t g_mstro_transport_mreg_table_cond;
struct mstro_transport_rdma_pending_entry {
UT_hash_handle hh; /**< hash on CDO id */
struct mstro_cdo_id id; /**< key */
pthread_mutex_t lock;
pthread_cond_t cond;
int done;
};
/* Simple hash to keep track of pending dst rdma transports, linking the
* completion callback to the main code track that must wait for proper
* completion from OFI CQ before changing CDO state and notifying the pool
* entry */
//extern struct mstro_transport_rdma_pending_entry *g_mstro_transport_rdma_pending_table;
extern mstro_event_domain g_transport_rdma_edom;
#define RDMA_EDOM_NAME "RDMA transport completion handling (as dst side)"
......@@ -114,5 +77,8 @@ mstro_transport_rdma_cb(mstro_event ev, void* closure);
void
mstro_transport_rdma_dtor(void* closure);
/**@brief Finalize RDMA transport layer */
mstro_status
mstro_transport_rdma_finalize(void);
#endif /* MAESTRO_TRANSPORT_RDMA_H_ */
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment