diff --git a/attributes/maestro-schema.c b/attributes/maestro-schema.c index 52b1088b3803d2383659f9aab4b7e0bb9e91b562..0e82b902f6a3492429a881b78ae433eaac5bef29 100644 --- a/attributes/maestro-schema.c +++ b/attributes/maestro-schema.c @@ -2185,7 +2185,7 @@ BAILOUT: mstro_status mstro_attribute_dict_set(mstro_attribute_dict dict, const char *key, enum mstro_cdo_attr_value_type valtype, - void *val) + void *val, bool copy_value) { mstro_symbol sym; mstro_status status; @@ -2194,13 +2194,6 @@ mstro_attribute_dict_set(mstro_attribute_dict dict, const char *key, return MSTRO_INVARG; } - /* It's not clear if this function should should do the full - * namespace qualification, as the default namespace is a property - * of the CDO; but then the dictionary is likely not shared among - * multiple CDOs (while the schema is). OTOH, allocation of a - * temporary qualifying prefix string buffer would better be handled - * further up for efficiency ... */ - const char *fqkey=NULL; /* a const ref to the fully qualified version */ char *tmpfqkey=NULL; /* a locally allocated fq key if needed */ if(key[0]!='.') { @@ -2227,6 +2220,7 @@ mstro_attribute_dict_set(mstro_attribute_dict dict, const char *key, HASH_FIND(hh, dict->dict, &sym, sizeof(mstro_symbol), entry); if(entry==NULL) { DEBUG("Key |%s| found, but no value in attribute dictionary at %p, adding it\n", fqkey, dict); + /* lookup key in schema */ struct mstro_schema_attribute_ *decl; @@ -2266,7 +2260,8 @@ mstro_attribute_dict_set(mstro_attribute_dict dict, const char *key, } entry->kind = tdecl->parsed_type->kind; - + + HASH_ADD(hh, dict->dict, key, sizeof(mstro_symbol), entry); DEBUG("Added fresh entry for |%s|\n", fqkey); } else { @@ -2323,10 +2318,32 @@ mstro_attribute_dict_set(mstro_attribute_dict dict, const char *key, /* modify value */ if(val==NULL) { WARN("value is NULL, setting value of |%s| back to default not implemented\n", fqkey); + /* something like */ + /* status = mstro_attribute_dict__insert_default(dict, sym); */ + /* if(status!=MSTRO_OK) { */ + /* if status==MSTRO_NOENT : no default available */ + /* otherwise */ + /* ERR("Failed to create new entry with default value for %s: %d (%s)\n", */ + /* fqkey, status, mstro_status_description(status)); */ + /* goto BAILOUT; */ + /* } */ } WARN("Not checking type restrictions\n"); - entry->val = val; - entry->user_owned_val = true; + if(copy_value) { + status = mstro_attribute_val__compute_size(entry->kind, NULL, val, &entry->valsize); + if(status!=MSTRO_OK) { + ERR("Cannot compute value size\n"); + goto BAILOUT; + } + if(entry->val==NULL) { + entry->val = malloc(entry->valsize); + } + memcpy(entry->val, val, entry->valsize); + entry->user_owned_val = false; + } else { + entry->val = val; + entry->user_owned_val = true; + } status = mstro_attribute_entry__set_size(entry, entry->kind); if(status!=MSTRO_OK) { diff --git a/attributes/maestro-schema.h b/attributes/maestro-schema.h index f6ae7170bb7b0b08f74902afe6d2dadd062b6d25..d01715866f1e2ae28f3bc1c5edc11ec253fc3a25 100644 --- a/attributes/maestro-schema.h +++ b/attributes/maestro-schema.h @@ -213,10 +213,17 @@ mstro_attribute_dict_get(mstro_attribute_dict dict, * * val==NULL is handled specially: It will reset the attribute to the default, or 'UNSET' if no default exists. * This may be lead to an error being returned, if the attribute is required and has no default. + * + * If @arg copy_value is given, create an internal allocation for the + * value and copy @arg val into it. Otherwise only store the reference + * (which must remain valid until the dictionary is disposed, so in + * particular stack allocated values are forbidden if the dictionary + * outlives their scope). */ mstro_status mstro_attribute_dict_set(mstro_attribute_dict dict, const char *key, enum mstro_cdo_attr_value_type valtype, - void *val); + void *val, + bool copy_value); #endif diff --git a/include/maestro/attributes.h b/include/maestro/attributes.h index 3bbaddb01a38df7815204d325446d3c643ad62a7..4125bd3eb612046bc2118de499be9352bf1c8b7c 100644 --- a/include/maestro/attributes.h +++ b/include/maestro/attributes.h @@ -279,6 +279,10 @@ typedef struct mstro_cdo_ *mstro_cdo; ** @param[in] key Attribute key string ** @param[in] val Pointer to the value to be set ** + ** BEWARE: The memory pointed to by val must remain valid for the + ** entire lifetime of the CDO. Stack-allocated variables passed in by + ** address are a source of ugly to trace bugs. + ** ** @returns A status code, ::MSTRO_OK on success. **/ mstro_status diff --git a/maestro/attributes.c b/maestro/attributes.c index 01272fe7bf5a9826b0e697cb3a2efe228e4b836c..16778a47e617299d4ab3697e348dda2f8c4c6caa 100644 --- a/maestro/attributes.c +++ b/maestro/attributes.c @@ -128,7 +128,7 @@ mstro_cdo_attribute_set(mstro_cdo cdo, const char* key, void* val) mstro_status status = mstro_attribute_dict_set(cdo->attributes, fqkey, MSTRO_CDO_ATTR_VALUE_INVALID, /* we dont help in checking */ - val); + val, false); if(tmpfqkey) free(tmpfqkey); return status; @@ -139,10 +139,14 @@ mstro_cdo_attribute_get(mstro_cdo cdo, const char* key, enum mstro_cdo_attr_value_type *type, const void ** val_p) { - if (key == NULL || cdo == NULL) + if (key == NULL || cdo == NULL) { + ERR("Invalid CDO or key\n"); return MSTRO_INVARG; - if(val_p==NULL) + } + if(val_p==NULL) { + ERR("NULL value destination location\n"); return MSTRO_INVOUT; + } const char *fqkey=key; char *tmpkey = NULL; diff --git a/maestro/cdo.c b/maestro/cdo.c index 059f44799e8f17bb8d04a552c7881bad6d2cce5a..67a945c8ee5b2bedc3e6d8e52d13487ef4bde438 100644 --- a/maestro/cdo.c +++ b/maestro/cdo.c @@ -434,7 +434,8 @@ mstro_cdo_declaration_seal(mstro_cdo cdo) /* create mamba array for raw ptr */ int64_t raw_ptr_size; /* size_t is not portable enough, so the schema says '64 bit int' */ const int64_t * val; - status = mstro_cdo_attribute_get(cdo, MSTRO_ATTR_CORE_CDO_SCOPE_LOCAL_SIZE, NULL, (const void**)&val); + status = mstro_cdo_attribute_get(cdo, MSTRO_ATTR_CORE_CDO_SCOPE_LOCAL_SIZE, + NULL, (const void**)&val); /* DEBUG("Retrieved size attribute: type %d, val %p, *val %" PRIi64 "\n", */ /* type, val, *((int64_t *)val)); */ @@ -443,7 +444,7 @@ mstro_cdo_declaration_seal(mstro_cdo cdo) cdo->name); goto BAILOUT; } - + raw_ptr_size = *val; if(raw_ptr_size<0 && cdo->raw_ptr!=NULL) { ERR("CDO %s has negative local-size but non-NULL raw-ptr, unsupported\n", cdo->name); @@ -641,10 +642,10 @@ mstro_cdo_offer(mstro_cdo cdo) mstro_cdo_block_until(cdo, MSTRO_CDO_STATE_OFFERED, "OFFERED"); { - const void* val; + const int64_t* val; enum mstro_cdo_attr_value_type type; status = mstro_cdo_attribute_get(cdo, MSTRO_ATTR_CORE_CDO_SCOPE_LOCAL_SIZE, - NULL, &val); + NULL, (const void**)&val); if(status!=MSTRO_OK) { ERR("CDO has no local-size\n"); return MSTRO_FAIL; @@ -653,7 +654,7 @@ mstro_cdo_offer(mstro_cdo cdo) status = mstro_stats_add_counter(MSTRO_STATS_CAT_POOL, MSTRO_STATS_L_BYTES_POOLED, /* type 0 CDOs have no size */ - *(const int64_t*)val==-1? 0 : *(const int64_t*)val); + *val==-1? 0 : *val); } return MSTRO_OK; diff --git a/maestro/ofi.c b/maestro/ofi.c index 6851c21e7e0d2639ee4f2d526cb6b432386902a6..b96d10ac0642c13e5fbd6529d05c4c41f8cd7c4e 100644 --- a/maestro/ofi.c +++ b/maestro/ofi.c @@ -2090,19 +2090,20 @@ mstro_pc__handle_initiate_transfer(const Mstro__Pool__InitiateTransfer* init) id.qw1 = cdoid.qw[1]; Mstro__Pool__TransferTicket ticket = MSTRO__POOL__TRANSFER_TICKET__INIT; ticket.cdoid = &id; - - const void *size; - enum mstro_cdo_attr_value_type type; - if (! (MSTRO_OK == mstro_cdo_attribute_get(src_cdo, "local_size", &type, &size))) { - ERR("Couldn't retrieve CDO %s local_size needed for transport\n", src_cdo->name); - return MSTRO_FAIL; - } - if (!g_mio_available || - (( *(size_t*)size % getpagesize()) != 0 - && init->methods->supported[0] == MSTRO__POOL__TRANSPORT_KIND__MIO) + + const int64_t *size; + enum mstro_cdo_attr_value_type type; + if (! (MSTRO_OK == mstro_cdo_attribute_get(src_cdo, MSTRO_ATTR_CORE_CDO_SCOPE_LOCAL_SIZE, + &type, (const void**)&size))) { + ERR("Couldn't retrieve CDO %s local_size needed for transport\n", src_cdo->name); + return MSTRO_FAIL; + } + if (!g_mio_available || + (( *size % getpagesize()) != 0 + && init->methods->supported[0] == MSTRO__POOL__TRANSPORT_KIND__MIO) ){ - init->methods->supported[0] = MSTRO__POOL__TRANSPORT_KIND__GFS; - WARN("Not issuing a ticket with MIO since CDO size (%zu) is not a multiple of the page size(%d). Will use maestro-core default transport (GFS)\n", *(size_t*)size, getpagesize()); + init->methods->supported[0] = MSTRO__POOL__TRANSPORT_KIND__GFS; + WARN("Not issuing a ticket with MIO since CDO size (%zu) is not a multiple of the page size(%d). Will use maestro-core default transport (GFS)\n", *size, getpagesize()); } status = @@ -2117,8 +2118,8 @@ mstro_pc__handle_initiate_transfer(const Mstro__Pool__InitiateTransfer* init) Mstro__Pool__TransferTicketGFS gfs = MSTRO__POOL__TRANSFER_TICKET_GFS__INIT; Mstro__Pool__TransferTicketMIO mio = MSTRO__POOL__TRANSFER_TICKET_MIO__INIT; - ticket.data_size = *(const uint64_t*)size; - ticket.gfs = &gfs; + ticket.data_size = *(const int64_t*)size; + ticket.gfs = &gfs; /* even conditionals are problematic, had to take out protobuf INITs ----^ */ if (ticket.ticket_case == MSTRO__POOL__TRANSFER_TICKET__TICKET_GFS) { diff --git a/protocols/mstro_pool.pb-c.c b/protocols/mstro_pool.pb-c.c index e4991229422e8da3d28fe538340b06f8fef07699..bab9c01cc267df474f01dc576f86a66e3668cf68 100644 --- a/protocols/mstro_pool.pb-c.c +++ b/protocols/mstro_pool.pb-c.c @@ -4178,7 +4178,7 @@ static const ProtobufCFieldDescriptor mstro__pool__transfer_ticket__field_descri "data_size", 10, PROTOBUF_C_LABEL_NONE, - PROTOBUF_C_TYPE_FIXED64, + PROTOBUF_C_TYPE_SFIXED64, 0, /* quantifier_offset */ offsetof(Mstro__Pool__TransferTicket, data_size), NULL, diff --git a/protocols/mstro_pool.pb-c.h b/protocols/mstro_pool.pb-c.h index 2e7f3b0afbb7ced8c95aa68d26ccf9c95a4221b5..557a8ef19003b81e43415f0db10b3442f38d38ea 100644 --- a/protocols/mstro_pool.pb-c.h +++ b/protocols/mstro_pool.pb-c.h @@ -777,7 +777,7 @@ struct _Mstro__Pool__TransferTicket /* * FIXME: proper attribute structure, plus eventual Mamba array info */ - uint64_t data_size; + int64_t data_size; Mstro__Pool__TransferTicket__TicketCase ticket_case; union { Mstro__Pool__TransferTicketGFS *gfs; diff --git a/protocols/mstro_pool.proto b/protocols/mstro_pool.proto index 0c3e2c8eadd66f94b3b3684e70a3afee277ca0c3..cdcefddb59c33e979150bbc43af2b91c74b7de82 100644 --- a/protocols/mstro_pool.proto +++ b/protocols/mstro_pool.proto @@ -361,7 +361,7 @@ message TransferTicket { }; /* FIXME: proper attribute structure, plus eventual Mamba array info */ - fixed64 data_size = 10; + sfixed64 data_size = 10; }; diff --git a/tests/check_schema_parse.c b/tests/check_schema_parse.c index 0a2acdb48c7036f2fbc2687bc534f848404feb99..186fc837868132267c8f34b7c6f4babb54db20f2 100644 --- a/tests/check_schema_parse.c +++ b/tests/check_schema_parse.c @@ -159,21 +159,21 @@ CHEAT_TEST(core_schema_parse_builtin, int64_t newval = 42; cheat_assert(MSTRO_OK== - mstro_attribute_dict_set(dict, key, MSTRO_CDO_ATTR_VALUE_int64, &newval)); + mstro_attribute_dict_set(dict, key, MSTRO_CDO_ATTR_VALUE_int64, &newval, false)); /* wrong type expectation must fail */ cheat_assert(MSTRO_INVARG== - mstro_attribute_dict_set(dict, key, MSTRO_CDO_ATTR_VALUE_uint64, &newval)); + mstro_attribute_dict_set(dict, key, MSTRO_CDO_ATTR_VALUE_uint64, &newval, false)); /* illegal key must fail */ char *wrongkey = ".maestro.core.cdo.funny"; cheat_assert(MSTRO_NOENT== - mstro_attribute_dict_set(dict, wrongkey, MSTRO_CDO_ATTR_VALUE_int64, &newval)); + mstro_attribute_dict_set(dict, wrongkey, MSTRO_CDO_ATTR_VALUE_int64, &newval, false)); /* legal, but previously unset key */ mstro_timestamp ts = { 1,2,3}; char *ecmwfdate = ".maestro.ecmwf.date"; cheat_assert(MSTRO_OK== - mstro_attribute_dict_set(dict, ecmwfdate, MSTRO_CDO_ATTR_VALUE_INVALID, &ts)); + mstro_attribute_dict_set(dict, ecmwfdate, MSTRO_CDO_ATTR_VALUE_INVALID, &ts, false)); /* and query the TS */ const void *tsval; diff --git a/tests/check_transport_gfs.c b/tests/check_transport_gfs.c index 87bc1ff2cf0608dfc27337f61d763d9838c62610..a36df6fca2361ecf70043248a61df5246d0a9474 100644 --- a/tests/check_transport_gfs.c +++ b/tests/check_transport_gfs.c @@ -44,13 +44,14 @@ #include <inttypes.h> #include <sys/stat.h> #include <errno.h> +#include "maestro/logging.h" CHEAT_DECLARE ( void - transport_checksum(const char* name, void* rawptr, uint64_t size) + transport_checksum(const char* name, void* rawptr, size_t size) { unsigned char x; - int i; + size_t i; for (i=0,x=0;i<size; i++) x ^= ((unsigned char*)rawptr)[i]; @@ -62,7 +63,7 @@ CHEAT_DECLARE ( /* this tests GFS transport by using this single thread as both Producer and Consumer */ CHEAT_TEST(transport_gfs_works, size_t data_count = 1031; - size_t bytes = data_count*sizeof(double); + int64_t bytes = data_count*sizeof(double); double src_data[data_count]; for(size_t i=0; i<data_count; i++) { src_data[i]=random(); @@ -83,22 +84,28 @@ CHEAT_DECLARE ( src_data)); cheat_assert(MSTRO_OK == mstro_cdo_attribute_set(cdo_src, MSTRO_ATTR_CORE_CDO_SCOPE_LOCAL_SIZE, - &bytes)); + (void**)&bytes)); + cheat_yield(); + enum mstro_cdo_attr_value_type ttype; - const void* tval; - + const int64_t* tval=NULL; cheat_assert(MSTRO_OK == mstro_cdo_attribute_get(cdo_src, MSTRO_ATTR_CORE_CDO_SCOPE_LOCAL_SIZE, &ttype, - &tval)); - cheat_assert(*(const uint64_t*)tval==bytes); + (const void**)&tval)); + cheat_assert(*tval==bytes); + cheat_assert(ttype==MSTRO_CDO_ATTR_VALUE_int64); + cheat_yield(); + cheat_assert(MSTRO_OK == mstro_cdo_declaration_seal(cdo_src)); cheat_assert(MSTRO_OK == mstro_cdo_attribute_get(cdo_src, MSTRO_ATTR_CORE_CDO_SCOPE_LOCAL_SIZE, &ttype, - &tval)); - cheat_assert(*(const uint64_t*)tval==bytes); - + (const void**)&tval)); + cheat_assert(*tval==bytes); + cheat_assert(ttype==MSTRO_CDO_ATTR_VALUE_int64); + cheat_yield(); + cheat_assert(MSTRO_OK == mstro_cdo_declare(name, MSTRO_ATTR_DEFAULT, &cdo_dst)); /* Particular case where the offerer is also the src, so let us execute @@ -106,8 +113,8 @@ CHEAT_DECLARE ( * accessible anymore */ /* Manually issue a ticket for src */ - Mstro__Pool__UUID* id; // bypassed - Mstro__Pool__TransferTicket__TicketCase method; + //Mstro__Pool__UUID id = MSTRO__POOL__UUID__INIT; // unused + //Mstro__Pool__TransferTicket__TicketCase method; Mstro__Pool__TransferTicket ticket = MSTRO__POOL__TRANSFER_TICKET__INIT; Mstro__Pool__TransferTicketGFS gfs = MSTRO__POOL__TRANSFER_TICKET_GFS__INIT; gfs.path = filename_dst; @@ -129,14 +136,20 @@ CHEAT_DECLARE ( cheat_assert(MSTRO_OK == mstro_transport_gfs_dst_execute(cdo_dst, &ticket)); double* data; - size_t len; - enum mstro_cdo_attr_value_type type; - const void* val; + int64_t len; + const int64_t* val=NULL; cheat_assert(MSTRO_OK == mstro_cdo_access_ptr(cdo_dst, (void**)&data, NULL)); - cheat_assert(MSTRO_OK == mstro_cdo_attribute_get(cdo_dst, "local_size", &type, &val)); - len = *(size_t*)val; + cheat_assert(MSTRO_OK == mstro_cdo_attribute_get(cdo_dst, "scope.local-size", + &ttype, (const void**)&val)); + cheat_assert(ttype==MSTRO_CDO_ATTR_VALUE_int64); + cheat_yield(); + + DEBUG("val %p, *val %" PRIi64 "\n", val, *val); + len = *val; len /= sizeof(double); + DEBUG("val %p, *val %" PRIi64 ", len %" PRIi64 " \n", val, *val, len); + transport_checksum("pioneer arrival", data, len); cheat_assert(MSTRO_OK == mstro_cdo_dispose(cdo_dst)); diff --git a/transport/gfs.c b/transport/gfs.c index 5359b292e595b45473212a8e3bf9fd1ffd0e7916..8e25ac4a4789b0d263da16cc471668b32b349f45 100644 --- a/transport/gfs.c +++ b/transport/gfs.c @@ -1,5 +1,7 @@ #include "maestro/logging.h" #include "transport.h" +#include <inttypes.h> +#include "attributes/maestro-schema.h" #include <unistd.h> @@ -60,44 +62,47 @@ mstro_transport_gfs_dst_execute(mstro_cdo dst, return MSTRO_INVARG; } INFO("Executing gfs transport dst side for CDO %s\n", dst->name); - + mstro_status status; - uint64_t len = ticket->data_size; + int64_t len = ticket->data_size; char* data; - DEBUG("Incoming CDO, size %zu\n", len); + DEBUG("Incoming CDO, size %" PRIi64 "\n", len); -/* FIXME code duplication, use mstro_transport_get_dst_buffer*/ + /* FIXME code duplication, use mstro_transport_get_dst_buffer*/ if (dst->raw_ptr == NULL) { // TODO make it smarter if (len == 0 || len > 10000000000) { - ERR("CDO size %d likely unreasonable for transport\n", len); + ERR("CDO size %" PRIi64 " likely unreasonable for transport\n", len); return MSTRO_FAIL; } - INFO ("CDO size: %zu\n", len); + INFO ("CDO size: %" PRIi64 "zu\n", len); data = malloc(sizeof(char)*len); if (data == NULL) { ERR("No memory left in DRAM to load %s from GFS transport\n", ticket->gfs->path); return MSTRO_FAIL; + } else { + DEBUG("Allocated %" PRIi64 " byte GFS transport buffer\n", len); } } else { - const void *available_size; + const int64_t *available_size; enum mstro_cdo_attr_value_type type; status = mstro_cdo_attribute_get( - dst, MSTRO_ATTR_CORE_CDO_SCOPE_LOCAL_SIZE, &type, &available_size); + dst, MSTRO_ATTR_CORE_CDO_SCOPE_LOCAL_SIZE, &type, (const void**)&available_size); if(status!=MSTRO_OK) { - ERR("Failed to fetch size attribute\n"); + ERR("Failed to fetch %s attribute\n", MSTRO_ATTR_CORE_CDO_SCOPE_LOCAL_SIZE); return MSTRO_FAIL; } - if(*(const uint64_t*)available_size<len) { - ERR("local CDO handle has raw-ptr allocation, but it is too small for incoming CDO: have %zu, need %zu\n", - *(const uint64_t*)available_size, len); + if(*available_size<len) { + ERR("local CDO handle has raw-ptr allocation, but it is too small for incoming CDO:" + " have %" PRIi64 ", need %" PRIi64 "\n", + *available_size, len); return MSTRO_FAIL; } else { - DEBUG("Pre-allocated RAW-PTR (have %zu, need %zu), good\n", - *(const uint64_t*)available_size, len); + DEBUG("Pre-allocated RAW-PTR (have %" PRIi64 ", need %" PRIi64 "), good\n", + *available_size, len); data = dst->raw_ptr; } } @@ -126,20 +131,46 @@ mstro_transport_gfs_dst_execute(mstro_cdo dst, return MSTRO_FAIL; } - if (!(MSTRO_OK == mstro_cdo_attribute_set( - dst, MSTRO_ATTR_CORE_CDO_RAW_PTR, data))) { - ERR("Failed to set raw-ptr attribute of CDO %s for transport\n", - dst->name); - return MSTRO_FAIL; + + status = mstro_cdo_attribute_set(dst, MSTRO_ATTR_CORE_CDO_RAW_PTR, data); + if(status!=MSTRO_OK) { + ERR("Failed to set %s attribute of CDO %s for transport\n", + MSTRO_ATTR_CORE_CDO_RAW_PTR, dst->name); + return status; } - if (!(MSTRO_OK == mstro_cdo_attribute_set( - dst, ".maestro.core.cdo.scope.local-size", &len))) { - ERR("Failed to set local_size attribute of CDO %s for transport\n", - dst->name); - return MSTRO_FAIL; + /* need to use internal API to achieve copying of value which is on our stack */ + status = mstro_attribute_dict_set(dst->attributes, MSTRO_ATTR_CORE_CDO_SCOPE_LOCAL_SIZE, + MSTRO_CDO_ATTR_VALUE_INVALID, &len, true); + if(status!=MSTRO_OK) { + ERR("Failed to set %s attribute of CDO %s for transport dst cdo\n", + MSTRO_ATTR_CORE_CDO_SCOPE_LOCAL_SIZE, dst->name); + return status; } + /* const int64_t *v1=NULL; */ + /* const int64_t *v2=NULL; */ + /* if (!(MSTRO_OK == mstro_cdo_attribute_get( */ + /* dst, MSTRO_ATTR_CORE_CDO_SCOPE_LOCAL_SIZE, NULL, &v1))) { */ + /* ERR("Failed to get %s attribute of CDO %s for transport\n", */ + /* MSTRO_ATTR_CORE_CDO_SCOPE_LOCAL_SIZE, dst->name); */ + /* return MSTRO_FAIL; */ + /* } */ + /* if (!(MSTRO_OK == mstro_cdo_attribute_get( */ + /* dst, ".maestro.core.cdo.scope.local-size", NULL, &v2))) { */ + /* ERR("Failed to get local-size attribute of CDO %s for transport\n", */ + /* MSTRO_ATTR_CORE_CDO_SCOPE_LOCAL_SIZE, dst->name); */ + /* return MSTRO_FAIL; */ + /* } */ + /* if(v1!=v2) { */ + /* ERR("different key string pointers yield different value pointers\n"); */ + /* abort(); */ + /* } */ + /* if(*v1!=*v2) { */ + /* ERR("different key string pointers yield different values\n"); */ + /* abort(); */ + /* } */ + INFO("dst app successfully executed transport CDO %s\n", dst->name); if (! (ticket->gfs->keep_file)) { diff --git a/transport/mio.c b/transport/mio.c index a281318a65472e8f69b30cc23ecce32fe113362f..9ce1511b274c89c4f9a80269342e780affc2e8a4 100644 --- a/transport/mio.c +++ b/transport/mio.c @@ -18,19 +18,20 @@ struct obj_io_cb_args { }; inline -size_t -mstro_mio_roundup_size(size_t raw) +int64_t +mstro_mio_roundup_size(int64_t raw) { - size_t remainder = raw % MSTRO_MIO_PAGESIZE; - if (remainder == 0) - return raw; - size_t missing = MSTRO_MIO_PAGESIZE - remainder; - size_t ret = raw + missing; - - /* XXX MIO reads and writes multiples of page size, therefore `missing` - * bytes of junk will be read/written */ - - return ret; + assert(raw>=0); + size_t remainder = raw % MSTRO_MIO_PAGESIZE; + if (remainder == 0) + return raw; + size_t missing = MSTRO_MIO_PAGESIZE - remainder; + int64_t ret = raw + missing; + + /* XXX MIO reads and writes multiples of page size, therefore `missing` + * bytes of junk will be read/written */ + + return ret; } int mio_cmd_wait_on_op(struct mio_op *op) @@ -535,74 +536,76 @@ mstro_transport_mio_dst_execute(mstro_cdo dst, INFO("Executing mio transport dst side for CDO %s\n", dst->name); -uint64_t len = ticket->data_size; -void* data = NULL; -DEBUG("receiving %zu bytes\n", len); - mstro_status s; - if (! (MSTRO_OK == mstro_transport_get_dst_buffer(dst, len, &data))) { - /* Already printed an error */ - return MSTRO_FAIL; - } - - if (!(MSTRO_OK == mstro_cdo_attribute_set( - dst, ".maestro.core.cdo.scope.local-size", &len))) { - ERR("Failed to set local_size attribute of CDO %s for transport\n", - dst->name); - return MSTRO_FAIL; - } - if (!(MSTRO_OK == mstro_cdo_attribute_set( - dst, MSTRO_ATTR_CORE_CDO_RAW_PTR, data))) { - ERR("Failed to set raw-ptr attribute of CDO %s for transport\n", - dst->name); - return MSTRO_FAIL; - } - -// FIXME chh asserts in the right place the size does not make trouble -len = mstro_mio_roundup_size(len); -DEBUG("rounding up len to %zu bytes to accomodate MIO\n", len); -// FIXME chh checks why this line is necessary not to make MIO panic -bzero(data, len); -DEBUG("buffer of %zu bytes ready\n", len); - - -// struct mio_thread thread; -// mio_thread_init(&thread); - - struct mstro_cdo_id oid; - struct mstro_cdo_id semid; - memcpy(&semid, ticket->mio->semid.data, ticket->mio->semid.len); - memcpy(&oid, ticket->mio->objid.data, ticket->mio->objid.len); - - WITH_MIO_OBJ_STR(idstr, (struct mstro_cdo_id*)&(semid), - INFO("Sanity check semaphore ID from ticket: %s\n", - idstr);); - WITH_MIO_OBJ_STR(idstr, (struct mstro_cdo_id*)&(oid), - INFO("Sanity check object ID from ticket: %s\n", - idstr);); - - if (! (MSTRO_OK == mstro_mio_obj_read_sync((struct mio_obj_id*)&oid, data, len, (struct mio_obj_id*)&semid))) { - /* Already printed an error */ - return MSTRO_FAIL; - } - + int64_t len = ticket->data_size; + void* data = NULL; + DEBUG("receiving %zu bytes\n", len); + mstro_status s; + if (! (MSTRO_OK == mstro_transport_get_dst_buffer(dst, len, &data))) { + /* Already printed an error */ + return MSTRO_FAIL; + } + + if (!(MSTRO_OK == mstro_attribute_dict_set( + dst, MSTRO_ATTR_CORE_CDO_SCOPE_LOCAL_SIZE, MSTRO_CDO_ATTR_VALUE_INVALID, + &len, true))) { + ERR("Failed to set %s attribute of CDO %s for transport\n", + MSTRO_ATTR_CORE_CDO_SCOPE_LOCAL_SIZE, + dst->name); + return MSTRO_FAIL; + } + if (!(MSTRO_OK == mstro_cdo_attribute_set( + dst, MSTRO_ATTR_CORE_CDO_RAW_PTR, data))) { + ERR("Failed to set raw-ptr attribute of CDO %s for transport\n", + dst->name); + return MSTRO_FAIL; + } + + // FIXME chh asserts in the right place the size does not make trouble + len = mstro_mio_roundup_size(len); + DEBUG("rounding up len to %" PRIi64 " bytes to accomodate MIO\n", len); + // FIXME chh checks why this line is necessary not to make MIO panic + bzero(data, len); + DEBUG("buffer of %zu bytes ready\n", len); + + + // struct mio_thread thread; + // mio_thread_init(&thread); + + struct mstro_cdo_id oid; + struct mstro_cdo_id semid; + memcpy(&semid, ticket->mio->semid.data, ticket->mio->semid.len); + memcpy(&oid, ticket->mio->objid.data, ticket->mio->objid.len); + + WITH_MIO_OBJ_STR(idstr, (struct mstro_cdo_id*)&(semid), + INFO("Sanity check semaphore ID from ticket: %s\n", + idstr);); + WITH_MIO_OBJ_STR(idstr, (struct mstro_cdo_id*)&(oid), + INFO("Sanity check object ID from ticket: %s\n", + idstr);); + + if (! (MSTRO_OK == mstro_mio_obj_read_sync((struct mio_obj_id*)&oid, data, len, (struct mio_obj_id*)&semid))) { + /* Already printed an error */ + return MSTRO_FAIL; + } + INFO("dst app successfully executed transport CDO %s\n", dst->name); - + #if 0 if (! (ticket->mio->keep_obj)) { - /* Remove transport object itself */ - if (! (MSTRO_OK == mstro_mio_obj_unlink((struct mio_obj_id*)&oid))) { - WITH_MIO_OBJ_STR(idstr, (struct mstro_cdo_id*)&(oid), - ERR("MIO failed to remove transport obj %s\n", - idstr);); - } else { - WITH_MIO_OBJ_STR(idstr, (struct mstro_cdo_id*)&(oid), - INFO("MIO removed transport obj %s successfully\n", - idstr);); - } + /* Remove transport object itself */ + if (! (MSTRO_OK == mstro_mio_obj_unlink((struct mio_obj_id*)&oid))) { + WITH_MIO_OBJ_STR(idstr, (struct mstro_cdo_id*)&(oid), + ERR("MIO failed to remove transport obj %s\n", + idstr);); + } else { + WITH_MIO_OBJ_STR(idstr, (struct mstro_cdo_id*)&(oid), + INFO("MIO removed transport obj %s successfully\n", + idstr);); + } } #endif - -// mio_thread_fini(&thread); - - return MSTRO_OK; + + // mio_thread_fini(&thread); + + return MSTRO_OK; } diff --git a/transport/transport.c b/transport/transport.c index 4334d4652c576f5b79da51b8f8257567db78380b..bf8cb50576506c07356cd48114abd53e4b3abd02 100644 --- a/transport/transport.c +++ b/transport/transport.c @@ -81,13 +81,13 @@ mstro_transport_get_dst_buffer(mstro_cdo dst, size_t len, void** data) ERR("Failed to fetch size attribute\n"); return MSTRO_FAIL; } - if(*(const uint64_t*)available_size<len) { + if(*(const int64_t*)available_size<len) { ERR("local CDO handle has raw-ptr allocation, but it is too small for incoming CDO: have %zu, need %zu\n", - *(const uint64_t*)available_size, len); + *(const int64_t*)available_size, len); return MSTRO_FAIL; } else { DEBUG("Pre-allocated RAW-PTR (have %zu, need %zu), good\n", - *(const uint64_t*)available_size, len); + *(const int64_t*)available_size, len); *data = dst->raw_ptr; } } @@ -105,9 +105,9 @@ mstro_transport_get_datalen(mstro_cdo src, struct mstro_transport_datalen* dl) INFO ("Found a raw ptr in CDO %s\n", src->name); dl->data = src->raw_ptr; if (!(MSTRO_OK == mstro_cdo_attribute_get( - src, "local_size", &type, &val))) { - ERR("Failed to get attribute of CDO %s for transport\n", src->name); - return MSTRO_FAIL; + src, MSTRO_ATTR_CORE_CDO_SCOPE_LOCAL_SIZE, &type, &val))) { + ERR("Failed to get size attribute of CDO %s for transport\n", src->name); + return MSTRO_FAIL; } dl->len = *(size_t*)val; INFO("CDO size is %zu\n", dl->len); @@ -135,51 +135,46 @@ mstro_transport_get_datalen(mstro_cdo src, struct mstro_transport_datalen* dl) mstro_status mstro_transport_init() { - int status; - - srand(time(NULL)); /* We'll need that to generate random IDs */ - + srand(time(NULL)); /* We'll need that to generate random IDs */ + #ifdef HAVE_MIO - /* MIO */ - if(g_mio_available) { - DEBUG("MIO already initialized\n"); - } else { - const char* env_mio_config = getenv(MSTRO_ENV_MIO_CONFIG); - if (env_mio_config == NULL - || (strlen(env_mio_config) < 1) ) { - ERR("MIO available, but no configuration file in %s, DISABLING MIO\n", - MSTRO_ENV_MIO_CONFIG); - } else { - INFO("Initializing MIO with configuration file %s\n", env_mio_config); - status = mio_init(env_mio_config); - if (status != 0) { - ERR("Failed to initialize MIO (errno: %d\n)", status); - return MSTRO_FAIL; - } - /* since we can't do that at the right time, at least put it into the termination sequence */ - atexit(mio_fini); - g_mio_available = true; - } - } + /* MIO */ + if(g_mio_available) { + DEBUG("MIO already initialized\n"); + } else { + const char* env_mio_config = getenv(MSTRO_ENV_MIO_CONFIG); + if (env_mio_config == NULL + || (strlen(env_mio_config) < 1) ) { + ERR("MIO available, but no configuration file in %s, DISABLING MIO\n", + MSTRO_ENV_MIO_CONFIG); + } else { + INFO("Initializing MIO with configuration file %s\n", env_mio_config); + status = mio_init(env_mio_config); + if (status != 0) { + ERR("Failed to initialize MIO (errno: %d\n)", status); + return MSTRO_FAIL; + } + /* since we can't do that at the right time, at least put it into the termination sequence */ + atexit(mio_fini); + g_mio_available = true; + } + } #endif -BAILOUT: - return MSTRO_OK; + return MSTRO_OK; } mstro_status mstro_transport_finalize() { - int status; - #ifdef HAVE_MIO - if(g_mio_available) { - INFO("Skipping MIO finalize -- otherwise we could not-reinit maestro\n"); - // FIXME: This really needs to be fixed in MIO - //mio_fini(); /* returns void */ - //g_mio_enabled = false; - } + if(g_mio_available) { + INFO("Skipping MIO finalize -- otherwise we could not-reinit maestro\n"); + // FIXME: This really needs to be fixed in MIO + //mio_fini(); /* returns void */ + //g_mio_enabled = false; + } #endif - return MSTRO_OK; + return MSTRO_OK; } mstro_status @@ -195,76 +190,78 @@ mstro_transport_ticket_issue( mstro_status status; - enum mstro_cdo_attr_value_type type; - const void* val; - if (! (MSTRO_OK == mstro_cdo_attribute_get(src_cdo, "local_size", &type, &val))) { - ERR("Couldn't retrieve CDO %s local_size needed for transport\n", src_cdo->name); - return MSTRO_FAIL; - } - ticket->data_size = *(size_t*)val; - - switch (ticket->ticket_case) { - case MSTRO__POOL__TRANSFER_TICKET__TICKET_GFS: ; - //char* filename = malloc(sizeof(char)*MSTRO_MAX_PATH_LEN); - //sprintf(filename, MSTRO_TRANSPORT_GFS_PATH "_%s__%ld",src_cdo->name, random()); // FIXME collision detection? although unlikely - // gfs.path = filename; - ticket->gfs->path = MSTRO_TRANSPORT_DEBUG_PATH; - ticket->gfs->keep_file = 0; // Arbitrarily rm the transport file on dst - break; - - case MSTRO__POOL__TRANSFER_TICKET__TICKET_MIO: ; //FIXME PM chooses a semid at initiate-transfer time - struct mstro_cdo_id* semid; - semid = malloc(sizeof(struct mio_obj_id)); - if (semid == NULL) { - ERR("No more memory.\n"); - return MSTRO_FAIL; - } - struct mstro_cdo_id* objid; - objid = malloc(sizeof(struct mio_obj_id)); - if (objid == NULL) { - ERR("No more memory.\n"); - return MSTRO_FAIL; - } - char* semname = NULL; - mstro_str_random(&semname, MSTRO_MIO_SEM_STR_MAXLEN); - if (semname == NULL) { - ERR("Couldn't prepare an id for semaphore obj\n"); - return MSTRO_FAIL; - } - status = mstro_cdo_id_from_name(semname, semid); /* So we do collision - detection in only - one place */ - if (status != MSTRO_OK) { - ERR("Couldn't make an id from name for semaphore obj\n"); - return MSTRO_FAIL; - } - - WITH_MIO_OBJ_STR(idstr, (struct mstro_cdo_id*)semid, - INFO("Semaphore has ID: %s\n", - idstr);); - WITH_CDO_ID_STR(idstr, &(src_cdo->gid), - INFO("(CDO associated has ID: %s)\n", - idstr);); - - assert(sizeof(struct mstro_cdo_id) == 2*sizeof(uint64_t)); - assert(sizeof(struct mstro_cdo_id) == sizeof(struct mio_obj_id)); - - ticket->mio->semid.len = sizeof(struct mstro_cdo_id); - ticket->mio->semid.data = semid; - ticket->mio->objid.len = sizeof(struct mstro_cdo_id); - objid->qw[0] = ticket->cdoid->qw0; - objid->qw[1] = ticket->cdoid->qw1; - ticket->mio->objid.data = objid; - - ticket->mio->keep_obj = 0; - break; - - default: - ERR("No ticket issueing case for this transport\n"); - return MSTRO_UNIMPL; - } + enum mstro_cdo_attr_value_type type; + const void* val; + if (! (MSTRO_OK == mstro_cdo_attribute_get(src_cdo, + MSTRO_ATTR_CORE_CDO_SCOPE_LOCAL_SIZE, + &type, &val))) { + ERR("Couldn't retrieve CDO %s size attribute needed for transport\n", src_cdo->name); + return MSTRO_FAIL; + } + ticket->data_size = *(size_t*)val; + + switch (ticket->ticket_case) { + case MSTRO__POOL__TRANSFER_TICKET__TICKET_GFS: ; + //char* filename = malloc(sizeof(char)*MSTRO_MAX_PATH_LEN); + //sprintf(filename, MSTRO_TRANSPORT_GFS_PATH "_%s__%ld",src_cdo->name, random()); // FIXME collision detection? although unlikely + // gfs.path = filename; + ticket->gfs->path = MSTRO_TRANSPORT_DEBUG_PATH; + ticket->gfs->keep_file = 0; // Arbitrarily rm the transport file on dst + break; + + case MSTRO__POOL__TRANSFER_TICKET__TICKET_MIO: ; //FIXME PM chooses a semid at initiate-transfer time + struct mstro_cdo_id* semid; + semid = malloc(sizeof(struct mio_obj_id)); + if (semid == NULL) { + ERR("No more memory.\n"); + return MSTRO_FAIL; + } + struct mstro_cdo_id* objid; + objid = malloc(sizeof(struct mio_obj_id)); + if (objid == NULL) { + ERR("No more memory.\n"); + return MSTRO_FAIL; + } + char* semname = NULL; + mstro_str_random(&semname, MSTRO_MIO_SEM_STR_MAXLEN); + if (semname == NULL) { + ERR("Couldn't prepare an id for semaphore obj\n"); + return MSTRO_FAIL; + } + status = mstro_cdo_id_from_name(semname, semid); /* So we do collision + detection in only + one place */ + if (status != MSTRO_OK) { + ERR("Couldn't make an id from name for semaphore obj\n"); + return MSTRO_FAIL; + } + + WITH_MIO_OBJ_STR(idstr, (struct mstro_cdo_id*)semid, + INFO("Semaphore has ID: %s\n", + idstr);); + WITH_CDO_ID_STR(idstr, &(src_cdo->gid), + INFO("(CDO associated has ID: %s)\n", + idstr);); + + assert(sizeof(struct mstro_cdo_id) == 2*sizeof(uint64_t)); + assert(sizeof(struct mstro_cdo_id) == sizeof(struct mio_obj_id)); + + ticket->mio->semid.len = sizeof(struct mstro_cdo_id); + ticket->mio->semid.data = semid; + ticket->mio->objid.len = sizeof(struct mstro_cdo_id); + objid->qw[0] = ticket->cdoid->qw0; + objid->qw[1] = ticket->cdoid->qw1; + ticket->mio->objid.data = objid; + + ticket->mio->keep_obj = 0; + break; + + default: + ERR("No ticket issueing case for this transport\n"); + return MSTRO_UNIMPL; + } #endif - return MSTRO_UNIMPL; /* Issuing is done in maestro/ofi.c */ + return MSTRO_UNIMPL; /* Issuing is done in maestro/ofi.c */ } mstro_status