diff --git a/maestro/ofi.c b/maestro/ofi.c index 9c229804a62d861739e95a6e6ad916b1f0c047d4..41e3b16966e022ad33ef0ed9de7e44d210853df0 100644 --- a/maestro/ofi.c +++ b/maestro/ofi.c @@ -2029,10 +2029,20 @@ mstro_pc__handle_initiate_transfer(const Mstro__Pool__InitiateTransfer* init) return MSTRO_FAIL; } + if(init->dst_attributes==NULL) { + WARN("No attributes on CDO\n"); + } else { + if(init->dst_attributes->val_case==MSTRO__POOL__ATTRIBUTES__VAL_KV_MAP) { + DEBUG("%zu attributes in kv-map\n", init->dst_attributes->kv_map->n_map); + } else { + WARN("non-kv attributes\n"); + } + } + DEBUG("Initiating transfer from src app %zu (me) to dst app %zu of CDO %s\n", g_pool_app_id, init->dst_appid->id, src_cdo->name); -// TODO rm, transport layer checks that + // TODO rm, transport layer checks that if (src_cdo->raw_ptr == NULL) INFO ("No rawptr dirty for CDO %s\n", src_cdo->name); else @@ -2081,7 +2091,7 @@ mstro_pc__handle_initiate_transfer(const Mstro__Pool__InitiateTransfer* init) id.qw1 = cdoid.qw[1]; Mstro__Pool__TransferTicket ticket = MSTRO__POOL__TRANSFER_TICKET__INIT; ticket.cdoid = &id; - + const int64_t *size; enum mstro_cdo_attr_value_type type; if (! (MSTRO_OK == mstro_cdo_attribute_get(src_cdo, MSTRO_ATTR_CORE_CDO_SCOPE_LOCAL_SIZE, @@ -2112,82 +2122,86 @@ mstro_pc__handle_initiate_transfer(const Mstro__Pool__InitiateTransfer* init) ticket.data_size = *(const int64_t*)size; ticket.gfs = &gfs; - /* even conditionals are problematic, had to take out protobuf INITs ----^ */ - if (ticket.ticket_case == MSTRO__POOL__TRANSFER_TICKET__TICKET_GFS) { -INFO("TICKET CASE GFS\n"); - gfs.path = src_cdo->name; - gfs.keep_file = 0; // Arbitrarily rm the transport file on dst - - - } else if (ticket.ticket_case == MSTRO__POOL__TRANSFER_TICKET__TICKET_MIO) { - - INFO("TICKET CASE MIO\n"); + /* even conditionals are problematic, had to take out protobuf INITs ----^ */ + switch(ticket.ticket_case) { + case MSTRO__POOL__TRANSFER_TICKET__TICKET_GFS: + INFO("TICKET CASE GFS\n"); + gfs.path = src_cdo->name; + gfs.keep_file = 0; // Arbitrarily rm the transport file on dst + break; + case MSTRO__POOL__TRANSFER_TICKET__TICKET_MIO: + INFO("TICKET CASE MIO\n"); #ifdef HAVE_MIO - struct mstro_cdo_id* semid; - semid = malloc(sizeof(struct mio_obj_id)); - if (semid == NULL) { - ERR("No more memory.\n"); - return MSTRO_FAIL; - } - struct mstro_cdo_id* objid; - objid = malloc(sizeof(struct mio_obj_id)); - if (objid == NULL) { - ERR("No more memory.\n"); - return MSTRO_FAIL; - } - char* semname = NULL; - mstro_str_random(&semname, MSTRO_MIO_SEM_STR_MAXLEN); - if (semname == NULL) { - ERR("Couldn't prepare an id for semaphore obj\n"); - return MSTRO_FAIL; - } - status = mstro_cdo_id_from_name(semname, semid); /* So we do collision - detection in only - one place */ - if (status != MSTRO_OK) { - ERR("Couldn't make an id from name for semaphore obj\n"); - return MSTRO_FAIL; - } - - WITH_MIO_OBJ_STR(idstr, (struct mstro_cdo_id*)semid, - INFO("Semaphore has ID: %s\n", - idstr);); - WITH_CDO_ID_STR(idstr, &(src_cdo->gid), - INFO("(CDO associated has ID: %s)\n", - idstr);); - - assert(sizeof(struct mstro_cdo_id) == 2*sizeof(uint64_t)); - assert(sizeof(struct mstro_cdo_id) == sizeof(struct mio_obj_id)); - - mio.semid.len = sizeof(struct mstro_cdo_id); - mio.semid.data = (uint8_t*)semid; - mio.objid.len = sizeof(struct mstro_cdo_id); - objid->qw[0] = src_cdo->gid.qw[0]; - objid->qw[1] = src_cdo->gid.qw[1]; - mio.objid.data = (uint8_t*)objid; - - mio.keep_obj = 0; - - ticket.mio = &mio; -#else - ERR("Request to issue an MIO ticket, but built without MIO support\n"); + struct mstro_cdo_id* semid; + semid = malloc(sizeof(struct mio_obj_id)); + if (semid == NULL) { + ERR("No more memory.\n"); + return MSTRO_FAIL; + } + struct mstro_cdo_id* objid; + objid = malloc(sizeof(struct mio_obj_id)); + if (objid == NULL) { + ERR("No more memory.\n"); return MSTRO_FAIL; + } + char* semname = NULL; + mstro_str_random(&semname, MSTRO_MIO_SEM_STR_MAXLEN); + if (semname == NULL) { + ERR("Couldn't prepare an id for semaphore obj\n"); + return MSTRO_FAIL; + } + status = mstro_cdo_id_from_name(semname, semid); /* So we do collision + detection in only + one place */ + if (status != MSTRO_OK) { + ERR("Couldn't make an id from name for semaphore obj\n"); + return MSTRO_FAIL; + } + + WITH_MIO_OBJ_STR(idstr, (struct mstro_cdo_id*)semid, + INFO("Semaphore has ID: %s\n", + idstr);); + WITH_CDO_ID_STR(idstr, &(src_cdo->gid), + INFO("(CDO associated has ID: %s)\n", + idstr);); + + assert(sizeof(struct mstro_cdo_id) == 2*sizeof(uint64_t)); + assert(sizeof(struct mstro_cdo_id) == sizeof(struct mio_obj_id)); + + mio.semid.len = sizeof(struct mstro_cdo_id); + mio.semid.data = (uint8_t*)semid; + mio.objid.len = sizeof(struct mstro_cdo_id); + objid->qw[0] = src_cdo->gid.qw[0]; + objid->qw[1] = src_cdo->gid.qw[1]; + mio.objid.data = (uint8_t*)objid; + + mio.keep_obj = 0; + + ticket.mio = &mio; +#else + ERR("Request to issue an MIO ticket, but built without MIO support\n"); + return MSTRO_FAIL; #endif - } else { - ERR("Add a protobuf INIT in TicketCase switch\n"); + break; + default: + ERR("Unsupported ticket kind %d\n", ticket.ticket_case); return MSTRO_UNIMPL; } -/* - status = mstro_transport_ticket_issue(src_cdo, &ticket); - if (MSTRO_OK != status) { - ERR("Cannot issue a ticket for CDO %s\n", src_cdo->name); - return MSTRO_FAIL; - } -*/ /**/ - /* Execute transport (non-blocking) */ + if(src_cdo->attributes_msg==NULL) { + ERR("source CDO has no attributes message data -- should not happen\n"); + return MSTRO_FAIL; + } + ticket.attributes = src_cdo->attributes_msg; + + INFO("Issued ticket for CDO %s, and starting execute process\n", src_cdo->name); + + //INFO("TransferTicket using path %s\n", ticket.gfs->path); + INFO("TransferTicket cdo size %zu\n", ticket.data_size); + + /* Execute transport (non-blocking) */ status = mstro_transport_execute(src_cdo, &ticket); if(MSTRO_OK != status) { ERR("Failure in transport execute for CDO %s\n", @@ -2195,11 +2209,6 @@ INFO("TICKET CASE GFS\n"); return MSTRO_FAIL; } - INFO("Issued a ticket for CDO %s\n", src_cdo->name); - - //INFO("TransferTicket using path %s\n", ticket.gfs->path); - INFO("TransferTicket cdo size %zu\n", ticket.data_size); - /* do some clean-up if applicable */ if (init->cp == 0) { // That means mv diff --git a/protocols/mstro_pool.pb-c.c b/protocols/mstro_pool.pb-c.c index 916acbf0738ee1099d4b6d8bb1ffed1b0dd765ae..f9fe16f8d4289878f54724c91ed3afea398a416a 100644 --- a/protocols/mstro_pool.pb-c.c +++ b/protocols/mstro_pool.pb-c.c @@ -4103,7 +4103,7 @@ const ProtobufCMessageDescriptor mstro__pool__transfer_ticket_ofi__descriptor = (ProtobufCMessageInit) mstro__pool__transfer_ticket_ofi__init, NULL,NULL,NULL /* reserved[123] */ }; -static const ProtobufCFieldDescriptor mstro__pool__transfer_ticket__field_descriptors[10] = +static const ProtobufCFieldDescriptor mstro__pool__transfer_ticket__field_descriptors[11] = { { "cdoid", @@ -4225,8 +4225,21 @@ static const ProtobufCFieldDescriptor mstro__pool__transfer_ticket__field_descri 0, /* flags */ 0,NULL,NULL /* reserved1,reserved2, etc */ }, + { + "attributes", + 11, + PROTOBUF_C_LABEL_NONE, + PROTOBUF_C_TYPE_MESSAGE, + 0, /* quantifier_offset */ + offsetof(Mstro__Pool__TransferTicket, attributes), + &mstro__pool__attributes__descriptor, + NULL, + 0, /* flags */ + 0,NULL,NULL /* reserved1,reserved2, etc */ + }, }; static const unsigned mstro__pool__transfer_ticket__field_indices_by_name[] = { + 10, /* field[10] = attributes */ 0, /* field[0] = cdoid */ 9, /* field[9] = data_size */ 5, /* field[5] = gfs */ @@ -4241,7 +4254,7 @@ static const unsigned mstro__pool__transfer_ticket__field_indices_by_name[] = { static const ProtobufCIntRange mstro__pool__transfer_ticket__number_ranges[1 + 1] = { { 1, 0 }, - { 0, 10 } + { 0, 11 } }; const ProtobufCMessageDescriptor mstro__pool__transfer_ticket__descriptor = { @@ -4251,7 +4264,7 @@ const ProtobufCMessageDescriptor mstro__pool__transfer_ticket__descriptor = "Mstro__Pool__TransferTicket", "mstro.pool", sizeof(Mstro__Pool__TransferTicket), - 10, + 11, mstro__pool__transfer_ticket__field_descriptors, mstro__pool__transfer_ticket__field_indices_by_name, 1, mstro__pool__transfer_ticket__number_ranges, diff --git a/protocols/mstro_pool.pb-c.h b/protocols/mstro_pool.pb-c.h index 2a0b02ff9491f66aa503a1cc393d6a583dfb0ac8..efb3db132abe86e7ce95e9ae300e4d75c37877ac 100644 --- a/protocols/mstro_pool.pb-c.h +++ b/protocols/mstro_pool.pb-c.h @@ -781,6 +781,7 @@ struct _Mstro__Pool__TransferTicket * FIXME: proper attribute structure, plus eventual Mamba array info */ int64_t data_size; + Mstro__Pool__Attributes *attributes; Mstro__Pool__TransferTicket__TicketCase ticket_case; union { Mstro__Pool__TransferTicketGFS *gfs; @@ -791,7 +792,7 @@ struct _Mstro__Pool__TransferTicket }; #define MSTRO__POOL__TRANSFER_TICKET__INIT \ { PROTOBUF_C_MESSAGE_INIT (&mstro__pool__transfer_ticket__descriptor) \ - , NULL, NULL, 0, (char *)protobuf_c_empty_string, MSTRO__POOL__TRANSPORT_KIND__INVALID_TRANSPORT_KIND, 0, MSTRO__POOL__TRANSFER_TICKET__TICKET__NOT_SET, {0} } + , NULL, NULL, 0, (char *)protobuf_c_empty_string, MSTRO__POOL__TRANSPORT_KIND__INVALID_TRANSPORT_KIND, 0, NULL, MSTRO__POOL__TRANSFER_TICKET__TICKET__NOT_SET, {0} } /* diff --git a/protocols/mstro_pool.proto b/protocols/mstro_pool.proto index 3f143ea767d19eccc9095864d20ed55c6c26056f..e3d2184e3e3621042769353591ee703ed8459556 100644 --- a/protocols/mstro_pool.proto +++ b/protocols/mstro_pool.proto @@ -367,8 +367,10 @@ message TransferTicket { TransferTicketOFI ofi = 9; }; + sfixed64 data_size = 10; /* FIXME: proper attribute structure, plus eventual Mamba array info */ - sfixed64 data_size = 10; + + Attributes attributes = 11; };