Skip to content
Snippets Groups Projects
Commit 69cb8582 authored by Benedikt Steinbusch's avatar Benedikt Steinbusch
Browse files

simplify protocol

parent 0acafb2d
No related branches found
No related tags found
No related merge requests found
Pipeline #67293 passed
...@@ -8,11 +8,7 @@ Session::= ...@@ -8,11 +8,7 @@ Session::=
Server: Accept -> Server: Accept ->
Client: Connect -> Client: Connect ->
OR ( OR (
( Procedure,
Client: Hello ->
Procedure ->
Client: Disconnect
),
Client: Shutdown Client: Shutdown
) )
......
...@@ -16,8 +16,6 @@ ...@@ -16,8 +16,6 @@
/// \note Needs to stay in sync with `server.cxx` /// \note Needs to stay in sync with `server.cxx`
enum { enum {
SIONFWD_MESSAGE_TYPE_SHUTDOWN, ///< instruct the server to shut down SIONFWD_MESSAGE_TYPE_SHUTDOWN, ///< instruct the server to shut down
SIONFWD_MESSAGE_TYPE_HELLO, ///< greet the server after establishing a connection
SIONFWD_MESSAGE_TYPE_DISCONNECT, ///< disconnect from the server
SIONFWD_MESSAGE_TYPE_OPEN, ///< open a file SIONFWD_MESSAGE_TYPE_OPEN, ///< open a file
SIONFWD_MESSAGE_TYPE_CLOSE, ///< close a file SIONFWD_MESSAGE_TYPE_CLOSE, ///< close a file
SIONFWD_MESSAGE_TYPE_PWRITE, ///< write at position SIONFWD_MESSAGE_TYPE_PWRITE, ///< write at position
...@@ -51,8 +49,6 @@ static MPI_Comm sionfwd_connect(void) { ...@@ -51,8 +49,6 @@ static MPI_Comm sionfwd_connect(void) {
MPI_Comm connection; MPI_Comm connection;
MPI_Comm_connect(port, MPI_INFO_NULL, 0, MPI_COMM_SELF, &connection); MPI_Comm_connect(port, MPI_INFO_NULL, 0, MPI_COMM_SELF, &connection);
MPI_Ssend(&(int){ SIONFWD_MESSAGE_TYPE_HELLO }, 1, MPI_INT, 0, 0, connection);
return connection; return connection;
} }
...@@ -64,7 +60,6 @@ static MPI_Comm sionfwd_connect(void) { ...@@ -64,7 +60,6 @@ static MPI_Comm sionfwd_connect(void) {
/// ///
/// \param[in,out] comm The connection to tear down, set to `MPI_COMM_NULL` on return /// \param[in,out] comm The connection to tear down, set to `MPI_COMM_NULL` on return
static void sionfwd_disconnect(MPI_Comm *comm) { static void sionfwd_disconnect(MPI_Comm *comm) {
MPI_Ssend(&(int){ SIONFWD_MESSAGE_TYPE_DISCONNECT }, 1, MPI_INT, 0, 0, *comm);
MPI_Comm_disconnect(comm); MPI_Comm_disconnect(comm);
assert(*comm == MPI_COMM_NULL); assert(*comm == MPI_COMM_NULL);
} }
......
...@@ -50,8 +50,6 @@ enum { ...@@ -50,8 +50,6 @@ enum {
/// \note Needs to stay in sync with `client.c` /// \note Needs to stay in sync with `client.c`
enum class MessageType : int { enum class MessageType : int {
Shutdown, Shutdown,
Hello,
Disconnect,
Open, Open,
Close, Close,
Pwrite, Pwrite,
...@@ -114,12 +112,6 @@ private: ...@@ -114,12 +112,6 @@ private:
/// \brief An instruction to shut down /// \brief An instruction to shut down
struct Shutdown {}; struct Shutdown {};
/// \brief A greeting from the client
struct Hello {};
/// \brief A farewell from a disconnecting client
struct Disconnect {};
/// \brief Instruction to open a file /// \brief Instruction to open a file
struct Open { struct Open {
/// \brief Constructs from a partially decoded message /// \brief Constructs from a partially decoded message
...@@ -275,8 +267,6 @@ struct Unknown {}; ...@@ -275,8 +267,6 @@ struct Unknown {};
/// \brief Messages of various kinds /// \brief Messages of various kinds
using Message = std::variant< using Message = std::variant<
Shutdown, Shutdown,
Hello,
Disconnect,
Open, Open,
Close, Close,
Pwrite, Pwrite,
...@@ -335,10 +325,6 @@ struct Communicator { ...@@ -335,10 +325,6 @@ struct Communicator {
switch (MessageType{type}) { switch (MessageType{type}) {
case MessageType::Shutdown: case MessageType::Shutdown:
return Shutdown{}; return Shutdown{};
case MessageType::Hello:
return Hello{};
case MessageType::Disconnect:
return Disconnect{};
case MessageType::Open: case MessageType::Open:
return Open{std::move(decoder)}; return Open{std::move(decoder)};
case MessageType::Close: case MessageType::Close:
...@@ -388,21 +374,61 @@ private: ...@@ -388,21 +374,61 @@ private:
MPI_Comm comm_; MPI_Comm comm_;
}; };
/// \brief Receives (and handles) messages from a client until the client disconnects /// \brief An MPI port that can accept new connections
struct CommunicatorReceiver { struct Port {
/// \brief Open a port
Port() noexcept {
std::array<char, MPI_MAX_PORT_NAME> name;
MPI_Open_port(MPI_INFO_NULL, name.data());
name_ = name.data();
}
/// \brief Copying not allowed
Port(const Port&) = delete;
/// \brief Construct by moving
Port(Port&& other) noexcept : name_{std::move(other.name_)} {
other.name_.clear();
}
/// \brief Copying not allowed
Port& operator=(const Port&) = delete;
/// \brief Assign by moving
Port& operator=(Port&& other) noexcept {
std::string temp{std::move(other.name_)};
other.name_.clear();
if (!name_.empty()) { MPI_Close_port(name_.c_str()); }
name_ = std::move(temp);
return *this;
}
/// \brief Close the port at end of life time
~Port() {
if (!name_.empty()) { MPI_Close_port(name_.c_str()); }
}
/// \brief Name of the port, used for connecting
[[nodiscard]] const std::string& name() const {
return this->name_;
}
/// \brief Accept a new connection from a client
[[nodiscard]] Communicator accept() const {
MPI_Comm comm;
MPI_Comm_accept(name_.c_str(), MPI_INFO_NULL, 0, MPI_COMM_SELF, &comm);
return Communicator{comm};
}
private:
std::string name_;
};
/// \brief Accepts new client connections on an open port
struct PortAcceptor {
/// \brief Performs actions based on a message from the client /// \brief Performs actions based on a message from the client
struct MessageVisitor { struct MessageVisitor {
/// \brief Construct from a \ref Communicator /// \brief Construct from a \ref Communicator
explicit MessageVisitor(Communicator& communicator) noexcept : communicator_{communicator} {} explicit MessageVisitor(Communicator& communicator) noexcept : communicator_{communicator} {}
/// \brief Handle \ref Disconnect message
bool operator()(Disconnect&& args) {
if (debug) {
std::cerr << "got a disconnect message" << std::endl;
}
return false;
}
/// \brief Handle \ref Open message /// \brief Handle \ref Open message
bool operator()(Open&& args) { bool operator()(Open&& args) {
if (debug) { if (debug) {
...@@ -521,6 +547,14 @@ struct CommunicatorReceiver { ...@@ -521,6 +547,14 @@ struct CommunicatorReceiver {
return true; return true;
} }
/// \brief Handle \ref Shutdown message
bool operator()(Shutdown&& args) {
if (debug) {
std::cerr << "got a shutdown message" << std::endl;
}
return false;
}
/// \brief Handle an unexpected message /// \brief Handle an unexpected message
template<typename T> template<typename T>
bool operator()(T&& args) { bool operator()(T&& args) {
...@@ -531,76 +565,6 @@ struct CommunicatorReceiver { ...@@ -531,76 +565,6 @@ struct CommunicatorReceiver {
Communicator& communicator_; Communicator& communicator_;
}; };
/// \brief Construct from a \ref Communicator
explicit CommunicatorReceiver(Communicator&& communicator) noexcept : communicator_{std::move(communicator)} {
if (debug) {
std::cerr << "launching a CommunicatorReceiver" << std::endl;
}
}
/// \brief Receive (and handle) messages
void operator()() {
bool go_on;
do {
Message message{communicator_.receive()};
go_on = std::visit(MessageVisitor{communicator_}, std::move(message));
} while (go_on);
}
private:
Communicator communicator_;
};
/// \brief An MPI port that can accept new connections
struct Port {
/// \brief Open a port
Port() noexcept {
std::array<char, MPI_MAX_PORT_NAME> name;
MPI_Open_port(MPI_INFO_NULL, name.data());
name_ = name.data();
}
/// \brief Copying not allowed
Port(const Port&) = delete;
/// \brief Construct by moving
Port(Port&& other) noexcept : name_{std::move(other.name_)} {
other.name_.clear();
}
/// \brief Copying not allowed
Port& operator=(const Port&) = delete;
/// \brief Assign by moving
Port& operator=(Port&& other) noexcept {
std::string temp{std::move(other.name_)};
other.name_.clear();
if (!name_.empty()) { MPI_Close_port(name_.c_str()); }
name_ = std::move(temp);
return *this;
}
/// \brief Close the port at end of life time
~Port() {
if (!name_.empty()) { MPI_Close_port(name_.c_str()); }
}
/// \brief Name of the port, used for connecting
[[nodiscard]] const std::string& name() const {
return this->name_;
}
/// \brief Accept a new connection from a client
[[nodiscard]] Communicator accept() const {
MPI_Comm comm;
MPI_Comm_accept(name_.c_str(), MPI_INFO_NULL, 0, MPI_COMM_SELF, &comm);
return Communicator{comm};
}
private:
std::string name_;
};
/// \brief Accepts new client connections on an open port
struct PortAcceptor {
/// \brief Construct from an open \ref Port /// \brief Construct from an open \ref Port
explicit PortAcceptor(Port&& port) noexcept : port_{std::move(port)} { explicit PortAcceptor(Port&& port) noexcept : port_{std::move(port)} {
if (debug) { if (debug) {
...@@ -610,18 +574,15 @@ struct PortAcceptor { ...@@ -610,18 +574,15 @@ struct PortAcceptor {
/// \brief Accept new connections until \ref Shutdown \ref Message is received /// \brief Accept new connections until \ref Shutdown \ref Message is received
void operator()() { void operator()() {
while (true) { bool go_on;
do {
if (debug) { if (debug) {
std::cerr << "accepting new connections" << std::endl; std::cerr << "accepting new connections" << std::endl;
} }
Communicator client{port_.accept()}; Communicator client{port_.accept()};
Message message{client.receive()}; Message message{client.receive()};
if (std::holds_alternative<Shutdown>(message)) { break; } go_on = std::visit(MessageVisitor{client}, std::move(message));
if (!std::holds_alternative<Hello>(message)) { } while (go_on);
throw std::runtime_error{"unexpected message type"};
}
CommunicatorReceiver{std::move(client)}();
}
} }
private: private:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment