diff --git a/include/maestro/i_pool_client.h b/include/maestro/i_pool_client.h index b075a394f8dfb60d355b5c99ad4fce8c675f26fa..661356acbbcc17d5933d1157a4188a824cecda34 100644 --- a/include/maestro/i_pool_client.h +++ b/include/maestro/i_pool_client.h @@ -188,7 +188,7 @@ mstro_pc__bye(mstro_pool_operation op); * @return mstro_status return status */ mstro_status -mstro_pc__op_maker(Mstro__Pool__MstroMsg **msg); +mstro_pc__op_maker(Mstro__Pool__MstroMsg *msg); /**@} (end of group MSTRO_I_POOL_CLIENT) */ /**@} (end of group MSTRO_Internal) */ diff --git a/maestro/pool_client.c b/maestro/pool_client.c index 1e0c6275e714c497baa8914d523e28cab85577ce..6d9c3245c2efc3dd929fe323ba16110a276b2aa9 100644 --- a/maestro/pool_client.c +++ b/maestro/pool_client.c @@ -1218,7 +1218,7 @@ mstro_pc__event(mstro_pool_operation op) * pushes the created operations to the *queue* */ mstro_status -mstro_pc__op_maker(Mstro__Pool__MstroMsg **msg) +mstro_pc__op_maker(Mstro__Pool__MstroMsg *msg) { mstro_status status=MSTRO_OK; mstro_pool_operation op; @@ -1231,7 +1231,7 @@ mstro_pc__op_maker(Mstro__Pool__MstroMsg **msg) } /**handle msg */ - switch((*msg)->msg_case) { + switch(msg->msg_case) { /* 'good' messages: */ case MSTRO__POOL__MSTRO_MSG__MSG_INITIATE_TRANSFER: /*fill initiate transfer specific parts */ @@ -1240,9 +1240,9 @@ mstro_pc__op_maker(Mstro__Pool__MstroMsg **msg) op->pc_transport.target_addr = 0; op->pc_transport.target_ep = NULL; op->pc_transport.request = NULL; - op->pc_transport.target_appid = (*msg)->initiate_transfer->dst_appid->id; - op->pc_transport.methods = (*msg)->initiate_transfer->methods; - op->pc_transport.target_serialized_endpoint = (*msg)->initiate_transfer->dst_serialized_endpoint; + op->pc_transport.target_appid = msg->initiate_transfer->dst_appid->id; + op->pc_transport.methods = msg->initiate_transfer->methods; + op->pc_transport.target_serialized_endpoint = msg->initiate_transfer->dst_serialized_endpoint; break; case MSTRO__POOL__MSTRO_MSG__MSG_TRANSFER_TICKET: /*fill transfer specific parts*/ @@ -1251,9 +1251,9 @@ mstro_pc__op_maker(Mstro__Pool__MstroMsg **msg) op->pc_transport.target_addr = 0; op->pc_transport.target_ep = NULL; op->pc_transport.request = NULL; - op->pc_transport.target_appid = (*msg)->transfer_ticket->srcid->id; + op->pc_transport.target_appid = msg->transfer_ticket->srcid->id; op->pc_transport.methods = NULL; - op->pc_transport.target_serialized_endpoint = (*msg)->transfer_ticket->src_serialized_endpoint; + op->pc_transport.target_serialized_endpoint = msg->transfer_ticket->src_serialized_endpoint; break; case MSTRO__POOL__MSTRO_MSG__MSG_DECLARE_ACK: op->kind = MSTRO_OP_PC_DECLARE_ACK; @@ -1285,10 +1285,10 @@ mstro_pc__op_maker(Mstro__Pool__MstroMsg **msg) break; default: WARN("%s message received, dropping it, not even sending ACK\n", - (*msg)->base.descriptor->name); + msg->base.descriptor->name); /*cleanup*/ mstro_pool_op__free(op); - mstro_pm__msg_free(*msg); + mstro_pm__msg_free(msg); status = MSTRO_UNIMPL; break; } @@ -1297,9 +1297,8 @@ mstro_pc__op_maker(Mstro__Pool__MstroMsg **msg) { /* continue to fill general operation fields from msg and push to queue */ op->step = MSTRO_OP_ST_ANNOUNCE; // first step is to announce - op->msg = *msg; - op->appid = g_pool_app_id; - *msg = NULL; /**consume the message here because caller (mstro_pc_handle_msg) will free it*/ + op->msg = msg; + op->appid = msg->token->appid->id; DEBUG("Filled operation structure from msg\n"); status = erl_thread_team_enqueue(g_pool_operations_team, op); assert(status == MSTRO_OK); @@ -1392,9 +1391,12 @@ mstro_pc_handle_msg(const struct mstro_msg_envelope *envelope) g_pool_apptoken.appid = &g_pool_appid; /* FIXME: possibly add security token at this time */ atomic_store(&g_mstro_pm_attached, true); + mstro_pm__msg_free(msg); break; } + /**handled by operations */ + case MSTRO__POOL__MSTRO_MSG__MSG_DECLARE_ACK: //DEBUG("PC DECLARE-ACK result: %d\n", status); case MSTRO__POOL__MSTRO_MSG__MSG_SUBSCRIBE_ACK: @@ -1413,13 +1415,14 @@ mstro_pc_handle_msg(const struct mstro_msg_envelope *envelope) //DEBUG("PC MSG-EVENT result: %d\n", status); case MSTRO__POOL__MSTRO_MSG__MSG_TRANSFER_COMPLETED: //DEBUG("PC MSG-TRANSFER-COMPLETED result: %d\n", status); - status = mstro_pc__op_maker(&msg); + status = mstro_pc__op_maker(msg); break; case MSTRO__POOL__MSTRO_MSG__MSG_DEMAND_ATTR_RES: case MSTRO__POOL__MSTRO_MSG__MSG_QUERY_RES: WARN("Unhandled message kind: %s\n", msg->declare->base.descriptor->name); + mstro_pm__msg_free(msg); return MSTRO_UNIMPL; @@ -1445,17 +1448,12 @@ mstro_pc_handle_msg(const struct mstro_msg_envelope *envelope) default: ERR("Invalid message kind (incoming on PC side): %s\n", msg->declare->base.descriptor->name); - status=MSTRO_INVMSG; - goto BAILOUT; + mstro_pm__msg_free(msg); + return MSTRO_INVMSG; } } -BAILOUT: - /* kill message. Everything should already be contained in the operation */ - if(msg!=NULL) { - assert(msg->token!=&g_pool_apptoken); - mstro__pool__mstro_msg__free_unpacked(msg, NULL); - } + BAILOUT: return status; }