Skip to content
Snippets Groups Projects
Commit 0acafb2d authored by Benedikt Steinbusch's avatar Benedikt Steinbusch
Browse files

add documentation

parent 575fe78c
Branches
No related tags found
No related merge requests found
Pipeline #67290 passed
......@@ -52,6 +52,22 @@ endif()
set(SIONfwd_BUILD_TESTING OFF CACHE BOOL "Build SIONfwd tests suite, even if SIONfwd is a subproject")
find_package(Doxygen)
if(DOXYGEN_FOUND)
set(DOXYGEN_USE_MDFILE_AS_MAINPAGE "README.md")
set(DOXYGEN_EXTRACT_STATIC "YES")
set(DOXYGEN_QUIET "YES")
doxygen_add_docs(sionfwd-docs
README.md
doc/protocol.md
include/sionfwd/client.h
src/client.c
src/server.cxx
ALL
)
endif()
add_subdirectory(src)
add_subdirectory(pkgconfig)
......
......@@ -3,28 +3,70 @@
#include <stdint.h>
/// \file
/// \defgroup client Client API
/// @{
/// \brief Send SIONlib specific RPCs to the SIONfwd server
#ifdef __cplusplus
extern "C" {
#endif
/// \brief Constants used in the creation of bit-masks passed to `sionfwd_open`'s `flags` argument
/// \note Needs to stay in sync with `server.cxx`
enum {
SIONFWD_CREATE = 1,
SIONFWD_WRITE = 2,
SIONFWD_READ = 4,
SIONFWD_CREATE = 1, ///< create the file if it does not exist, see POSIX `O_CREAT`
SIONFWD_WRITE = 2, ///< open the file for writing, see POSIX `O_RDWR`
SIONFWD_READ = 4, ///< open the file for reading, see POSIX `O_RDONLY`
};
/// \brief Open a file
/// \param[in] path Path to the file
/// \param[in] flags Determines the mode of opening the file, a bit mask that can contain combinations of \ref SIONFWD_CREATE, \ref SIONFWD_WRITE, \ref SIONFWD_READ
/// \returns `-1` on error, otherwise a file descriptor
/// \see Man page for POSIX `open(2)`
int sionfwd_open(const char*, int);
/// \brief Close a file
/// \param[in] fd File descriptor of the file to close
/// \returns `0` on success, `-1` on error
/// \see Man page for POSIX `close(2)`
int sionfwd_close(int);
/// \brief Write data to a file at a position
/// \param[in] fd File descriptor of the file to write to
/// \param[in] buf Pointer to buffer containing the data to write
/// \param[in] nbyte Number of bytes to write
/// \param[in] offset Offset from the beginning of the file to start writing at (in bytes)
/// \returns `-1` on error, otherwise the number of bytes written
/// \see Man page for POSIX `pwrite(2)`
int64_t sionfwd_pwrite(int, const void*, int64_t, int64_t);
/// \brief Read data from a file at a position
/// \param[in] fd File descriptor of the file to read from
/// \param[out] buf Pointer to buffer to store the read data in
/// \param[in] nbyte Number of bytes to read
/// \param[in] offset Offset from the beginning of the file to start reading at (in bytes)
/// \returns `-1` on error, otherwise the number of bytes read
/// \see Man page for POSIX `pread(2)`
int64_t sionfwd_pread(int, void*, int64_t, int64_t);
/// \brief Find the block size of a file system storing an open file
/// \param[in] fd File descriptor of an open file
/// \returns `-1` on error, otherwise the block size
/// \see Man page for POSIX `fstat(2)`
long sionfwd_stat_blksize(int);
/// \brief Check for existence of a file
/// \param[in] path Path to the file to check for
/// \returns `-1` on error (including file does not exist), `0` otherwise
/// \see Man page for POSIX `stat(2)`
int sionfwd_stat_file(const char*);
/// \brief Flush file content to storage
/// \param[in] fd File descriptor of the file to flush
/// \returns `-1` on error, `0` otherwise
/// \see Man page for POSIX `fsync(2)`
int sionfwd_flush(int);
#ifdef __cplusplus
......@@ -32,3 +74,5 @@ int sionfwd_flush(int);
#endif
#endif
/// @}
......@@ -8,19 +8,35 @@
#include <sionfwd/client.h>
/// \file
/// \addtogroup client
/// @{
/// \brief Constants for the different message types sent to the server
/// \note Needs to stay in sync with `server.cxx`
enum {
SIONFWD_MESSAGE_TYPE_SHUTDOWN,
SIONFWD_MESSAGE_TYPE_HELLO,
SIONFWD_MESSAGE_TYPE_DISCONNECT,
SIONFWD_MESSAGE_TYPE_OPEN,
SIONFWD_MESSAGE_TYPE_CLOSE,
SIONFWD_MESSAGE_TYPE_PWRITE,
SIONFWD_MESSAGE_TYPE_PREAD,
SIONFWD_MESSAGE_TYPE_STAT_BLKSIZE,
SIONFWD_MESSAGE_TYPE_STAT_FILE,
SIONFWD_MESSAGE_TYPE_FLUSH,
SIONFWD_MESSAGE_TYPE_SHUTDOWN, ///< instruct the server to shut down
SIONFWD_MESSAGE_TYPE_HELLO, ///< greet the server after establishing a connection
SIONFWD_MESSAGE_TYPE_DISCONNECT, ///< disconnect from the server
SIONFWD_MESSAGE_TYPE_OPEN, ///< open a file
SIONFWD_MESSAGE_TYPE_CLOSE, ///< close a file
SIONFWD_MESSAGE_TYPE_PWRITE, ///< write at position
SIONFWD_MESSAGE_TYPE_PREAD, ///< read at position
SIONFWD_MESSAGE_TYPE_STAT_BLKSIZE, ///< inspect file system block size
SIONFWD_MESSAGE_TYPE_STAT_FILE, ///< check for existence of a file
SIONFWD_MESSAGE_TYPE_FLUSH, ///< flush file contents to storage
};
/// \brief Bring up a connection to the server
///
/// Information about server ports is read from the environment variables
/// `SIONFWD_NUM_PORTS` and `SIONFWD_PORT<num>`.
///
/// Due to limitations in MPI libraries, a new connection is brought up for
/// every RPC call, so this function is called from all public functions in
/// this unit.
///
/// \returns An MPI communicator in which the server is process `0`
static MPI_Comm sionfwd_connect(void) {
char *num_ports = getenv("SIONFWD_NUM_PORTS");
if (!num_ports) { return MPI_COMM_NULL; }
......@@ -40,6 +56,13 @@ static MPI_Comm sionfwd_connect(void) {
return connection;
}
/// \brief Tear down a connection to the server
///
/// Due to limitations in MPI libraries, a new connection is brought up for
/// every RPC call, so this function is called from all public functions in
/// this unit.
///
/// \param[in,out] comm The connection to tear down, set to `MPI_COMM_NULL` on return
static void sionfwd_disconnect(MPI_Comm *comm) {
MPI_Ssend(&(int){ SIONFWD_MESSAGE_TYPE_DISCONNECT }, 1, MPI_INT, 0, 0, *comm);
MPI_Comm_disconnect(comm);
......@@ -213,3 +236,5 @@ int sionfwd_flush(int fd) {
sionfwd_disconnect(&connection);
return status;
}
/// @}
......@@ -16,26 +16,38 @@
#include <mpi.h>
/// \file
/// \defgroup server Server program
/// @{
/// \brief Receive RPCs from the client library and execute them
/// \brief Whether to print debug messages to `std::cerr`
static bool debug = false;
/// \brief Process rank in `MPI_COMM_WORLD`
static int rank() {
int rank;
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
return rank;
}
/// \brief Number of processes in `MPI_COMM_WORLD`
static int size() {
int size;
MPI_Comm_size(MPI_COMM_WORLD, &size);
return size;
}
/// \brief Constants used in the creation of bit-masks sent from `sionfwd_open`
/// \note Needs to stay in sync with `client.h`
enum {
SIONFWD_CREATE = 1,
SIONFWD_WRITE = 2,
SIONFWD_READ = 4,
SIONFWD_CREATE = 1, ///< create the file if it does not exist, see POSIX `O_CREAT`
SIONFWD_WRITE = 2, ///< open the file for writing, see POSIX `O_RDWR`
SIONFWD_READ = 4, ///< open the file for reading, see POSIX `O_RDONLY`
};
/// \brief Constants for the different message types sent from the client
/// \note Needs to stay in sync with `client.c`
enum class MessageType : int {
Shutdown,
Hello,
......@@ -49,19 +61,33 @@ enum class MessageType : int {
Flush,
};
/// \brief Template that translates C++ types into MPI datatypes
template<typename T> MPI_Datatype get_mpi_datatype() noexcept;
/// \brief Translate `char` to `MPI_CHAR`
template<> MPI_Datatype get_mpi_datatype<char>() noexcept { return MPI_CHAR; }
/// \brief Translate `int` to `MPI_INT`
template<> MPI_Datatype get_mpi_datatype<int>() noexcept { return MPI_INT; }
/// \brief Translate `long` to `MPI_LONG`
template<> MPI_Datatype get_mpi_datatype<long>() noexcept { return MPI_LONG; }
/// \brief Translate `long long` to `MPI_LONG_LONG`
template<> MPI_Datatype get_mpi_datatype<long long>() noexcept { return MPI_LONG_LONG; }
/// \brief Uses MPI to decode data items from a raw packed message
///
/// Keeps an implicit position into the message that is moved ahead by decoding
/// message elements.
struct MessageDecoder {
/// \brief Construct a decoder
/// \param[in] comm The MPI communicator which the message was received on
/// \param[in] packed The raw packed message data
MessageDecoder(MPI_Comm comm, std::vector<char>&& packed) noexcept : position_{0}, comm_{comm}, packed_{std::move(packed)} {}
/// \brief Decode a single instance of the given type from the message
/// \returns The decoded data
template<typename T>
T decode() noexcept {
T x;
......@@ -69,6 +95,9 @@ struct MessageDecoder {
return x;
}
/// \brief Decode multiple instances of the given type from the message
/// \param[in] n How many elements to decode
/// \returns A `std::vector` containing the decoded elements
template<typename T>
std::vector<T> decode_multiple(int n) {
std::vector<T> xs(n);
......@@ -82,23 +111,30 @@ private:
std::vector<char> packed_;
};
/// \brief An instruction to shut down
struct Shutdown {};
/// \brief A greeting from the client
struct Hello {};
/// \brief A farewell from a disconnecting client
struct Disconnect {};
/// \brief Instruction to open a file
struct Open {
/// \brief Constructs from a partially decoded message
explicit Open(MessageDecoder&& decoder) noexcept {
flags_ = decoder.decode<int>();
auto path_length = decoder.decode<int>();
path_ = decoder.decode_multiple<char>(path_length);
}
/// \brief Path of the file to open
[[nodiscard]] const char* path() const noexcept {
return path_.data();
}
/// \brief Additional flags for opening (`SIONFWD_CREATE`, etc.)
[[nodiscard]] int flags() const noexcept {
return flags_;
}
......@@ -108,11 +144,14 @@ private:
int flags_;
};
/// \brief Instruction to close an open file
struct Close{
/// \brief Constructs from a partially decoded message
explicit Close(MessageDecoder&& decoder) noexcept {
fd_ = decoder.decode<int>();
}
/// \brief File descriptor of the file to close
[[nodiscard]] int fd() const noexcept {
return fd_;
}
......@@ -121,21 +160,26 @@ private:
int fd_;
};
/// \brief Instruction to write to a file
struct Pwrite{
/// \brief Constructs from a partially decoded message
explicit Pwrite(MessageDecoder&& decoder) noexcept {
fd_ = decoder.decode<int>();
nbyte_ = decoder.decode<int64_t>();
offset_ = decoder.decode<int64_t>();
}
/// \brief File descriptor of the file to write to
[[nodiscard]] int fd() const noexcept {
return fd_;
}
/// \brief Number of bytes to write
[[nodiscard]] int64_t nbyte() const noexcept {
return nbyte_;
}
/// \brief Offset (in bytes) into the file to start writing at
[[nodiscard]] int64_t offset() const noexcept {
return offset_;
}
......@@ -146,21 +190,26 @@ private:
int64_t offset_;
};
/// \brief Instruction to read from a file
struct Pread{
/// \brief Constructs from a partially decoded message
explicit Pread(MessageDecoder&& decoder) noexcept {
fd_ = decoder.decode<int>();
nbyte_ = decoder.decode<int64_t>();
offset_ = decoder.decode<int64_t>();
}
/// \brief File descriptor of the file to read from
[[nodiscard]] int fd() const noexcept {
return fd_;
}
/// \brief Number of bytes to write
[[nodiscard]] int64_t nbyte() const noexcept {
return nbyte_;
}
/// \brief Offset (in bytes) into the file to start reading at
[[nodiscard]] int64_t offset() const noexcept {
return offset_;
}
......@@ -171,11 +220,14 @@ private:
int64_t offset_;
};
/// \brief Instruction to inspect the file system block size
struct StatBlksize {
/// \brief Constructs from a partially decoded message
explicit StatBlksize(MessageDecoder&& decoder) noexcept {
fd_ = decoder.decode<int>();
}
/// \brief File descriptor of an open file
[[nodiscard]] int fd() const noexcept {
return fd_;
}
......@@ -184,12 +236,15 @@ private:
int fd_;
};
/// \brief Instruction to check for the existence of a file
struct StatFile {
/// \brief Constructs from a partially decoded message
explicit StatFile(MessageDecoder&& decoder) noexcept {
auto path_length = decoder.decode<int>();
path_ = decoder.decode_multiple<char>(path_length);
}
/// \brief Path to the file to check for
[[nodiscard]] const char* path() const noexcept {
return path_.data();
}
......@@ -198,11 +253,14 @@ private:
std::vector<char> path_;
};
/// \brief Instruction to flush the contents of a file to storage
struct Flush {
/// \brief Constructs from a partially decoded message
explicit Flush(MessageDecoder&& decoder) noexcept {
fd_ = decoder.decode<int>();
}
/// \brief File descriptor of the file to flush
[[nodiscard]] int fd() const noexcept {
return fd_;
}
......@@ -211,8 +269,10 @@ private:
int fd_;
};
/// \brief An unknown message
struct Unknown {};
/// \brief Messages of various kinds
using Message = std::variant<
Shutdown,
Hello,
......@@ -227,15 +287,21 @@ using Message = std::variant<
Unknown
>;
/// \brief An MPI communicator for communication with a client
struct Communicator {
/// \brief Construct from raw MPI handle
explicit Communicator(MPI_Comm comm) noexcept : comm_{comm} {}
/// \brief Copying not allowed
Communicator(const Communicator&) = delete;
/// \brief Move a communicator
Communicator(Communicator&& other) noexcept : comm_{other.comm_} {
other.comm_ = MPI_COMM_NULL;
}
/// \brief Copying not allowed
Communicator& operator=(const Communicator&) = delete;
/// \brief Assign by move
Communicator& operator=(Communicator&& other) noexcept {
MPI_Comm temp{other.comm_};
other.comm_ = MPI_COMM_NULL;
......@@ -245,11 +311,13 @@ struct Communicator {
return *this;
}
/// \brief Disconnect at end of life time
~Communicator() noexcept {
if (comm_ != MPI_COMM_NULL) { MPI_Comm_disconnect(&comm_); }
assert(comm_ == MPI_COMM_NULL);
}
/// \brief Receive a \ref Message from a client
[[nodiscard]] Message receive() const {
MPI_Message message;
MPI_Status status;
......@@ -290,15 +358,22 @@ struct Communicator {
}
}
/// \brief Send a single (integer) element to the client
/// \param[in] message The message data
template<typename T>
void send(const T& message) const noexcept {
MPI_Ssend(&message, 1, get_mpi_datatype<T>(), 0, 0, comm_);
}
/// \brief Send several (character) elements to the client
/// \param[in] data The message data
void send_data(const std::vector<char>& data) const noexcept {
MPI_Ssend(data.data(), data.size(), MPI_BYTE, 0, 0, comm_);
}
/// \brief Receive raw (character) data from a client
/// \param[in] nbyte How many bytes to receive
/// \returns A `std::vector` containing the data received
[[nodiscard]] std::vector<char> receive_data(int64_t nbyte) const noexcept {
std::vector<char> data(nbyte);
MPI_Status status;
......@@ -313,10 +388,14 @@ private:
MPI_Comm comm_;
};
/// \brief Receives (and handles) messages from a client until the client disconnects
struct CommunicatorReceiver {
/// \brief Performs actions based on a message from the client
struct MessageVisitor {
/// \brief Construct from a \ref Communicator
explicit MessageVisitor(Communicator& communicator) noexcept : communicator_{communicator} {}
/// \brief Handle \ref Disconnect message
bool operator()(Disconnect&& args) {
if (debug) {
std::cerr << "got a disconnect message" << std::endl;
......@@ -324,6 +403,7 @@ struct CommunicatorReceiver {
return false;
}
/// \brief Handle \ref Open message
bool operator()(Open&& args) {
if (debug) {
std::cerr << "got an open message for path '" << args.path()
......@@ -342,6 +422,7 @@ struct CommunicatorReceiver {
return true;
}
/// \brief Handle \ref Close message
bool operator()(Close&& args) {
if (debug) {
std::cerr << "got a close message for fd " << args.fd() << std::endl;
......@@ -354,6 +435,7 @@ struct CommunicatorReceiver {
return true;
}
/// \brief Handle \ref Pwrite message
bool operator()(Pwrite&& args) {
if (debug) {
std::cerr << "got a pwrite message for fd " << args.fd() << " for " << args.nbyte()
......@@ -377,6 +459,7 @@ struct CommunicatorReceiver {
return true;
}
/// \brief Handle \ref Pread message
bool operator()(Pread&& args) {
if (debug) {
std::cerr << "got a pread message for fd " << args.fd() << " for " << args.nbyte()
......@@ -403,6 +486,7 @@ struct CommunicatorReceiver {
return true;
}
/// \brief Handle \ref StatBlksize message
bool operator()(StatBlksize&& args) {
if (debug) {
std::cerr << "got a stat_blksize message for fd " << args.fd() << std::endl;
......@@ -413,6 +497,7 @@ struct CommunicatorReceiver {
return true;
}
/// \brief Handle \ref StatFile message
bool operator()(StatFile&& args) {
if (debug) {
std::cerr << "got a stat_file message for path \"" << args.path() << "\"" << std::endl;
......@@ -423,6 +508,7 @@ struct CommunicatorReceiver {
return true;
}
/// \brief Handle \ref Flush message
bool operator()(Flush&& args) {
if (debug) {
std::cerr << "got a flush message for fd " << args.fd() << std::endl;
......@@ -435,6 +521,7 @@ struct CommunicatorReceiver {
return true;
}
/// \brief Handle an unexpected message
template<typename T>
bool operator()(T&& args) {
throw std::runtime_error{"unexpected message type"};
......@@ -444,12 +531,14 @@ struct CommunicatorReceiver {
Communicator& communicator_;
};
/// \brief Construct from a \ref Communicator
explicit CommunicatorReceiver(Communicator&& communicator) noexcept : communicator_{std::move(communicator)} {
if (debug) {
std::cerr << "launching a CommunicatorReceiver" << std::endl;
}
}
/// \brief Receive (and handle) messages
void operator()() {
bool go_on;
do {
......@@ -462,19 +551,25 @@ private:
Communicator communicator_;
};
/// \brief An MPI port that can accept new connections
struct Port {
/// \brief Open a port
Port() noexcept {
std::array<char, MPI_MAX_PORT_NAME> name;
MPI_Open_port(MPI_INFO_NULL, name.data());
name_ = name.data();
}
/// \brief Copying not allowed
Port(const Port&) = delete;
/// \brief Construct by moving
Port(Port&& other) noexcept : name_{std::move(other.name_)} {
other.name_.clear();
}
/// \brief Copying not allowed
Port& operator=(const Port&) = delete;
/// \brief Assign by moving
Port& operator=(Port&& other) noexcept {
std::string temp{std::move(other.name_)};
other.name_.clear();
......@@ -483,14 +578,17 @@ struct Port {
return *this;
}
/// \brief Close the port at end of life time
~Port() {
if (!name_.empty()) { MPI_Close_port(name_.c_str()); }
}
/// \brief Name of the port, used for connecting
[[nodiscard]] const std::string& name() const {
return this->name_;
}
/// \brief Accept a new connection from a client
[[nodiscard]] Communicator accept() const {
MPI_Comm comm;
MPI_Comm_accept(name_.c_str(), MPI_INFO_NULL, 0, MPI_COMM_SELF, &comm);
......@@ -501,13 +599,16 @@ private:
std::string name_;
};
/// \brief Accepts new client connections on an open port
struct PortAcceptor {
/// \brief Construct from an open \ref Port
explicit PortAcceptor(Port&& port) noexcept : port_{std::move(port)} {
if (debug) {
std::cerr << "launching a PortAcceptor" << std::endl;
}
}
/// \brief Accept new connections until \ref Shutdown \ref Message is received
void operator()() {
while (true) {
if (debug) {
......@@ -527,6 +628,7 @@ private:
Port port_;
};
/// \brief Sub-command that acts as a client and sends a \ref Shutdown \ref Message
int shutdown(int argc, char* argv[]) {
MPI_Init(&argc, &argv);
int rank;
......@@ -559,6 +661,7 @@ int shutdown(int argc, char* argv[]) {
return EXIT_SUCCESS;
}
/// \brief Sub-command to print definitions for use from bash job scripts
int bash_defs(int argc, char* argv[]) {
const char* defs = R"#(
function sionfwd-spawn {
......@@ -579,6 +682,11 @@ int bash_defs(int argc, char* argv[]) {
return EXIT_SUCCESS;
}
/// \brief Main function of the "server" sub-command
///
/// Every process opens a port. Process 0 gathers port names and prints
/// them to `std::cout`. New client connections are accepted until a
/// \ref Shutdown \ref Message is received.
int server(int argc, char *argv[]) {
// int provided;
// MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);
......@@ -612,6 +720,7 @@ int server(int argc, char *argv[]) {
return EXIT_SUCCESS;
}
/// \brief Dispatch to sub-command main funcions
int main(int argc, char *argv[]) {
if (argc > 1) {
if (argv[1] == std::string{"shutdown"}) {
......@@ -624,3 +733,5 @@ int main(int argc, char *argv[]) {
return server(argc, argv);
}
/// @}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment