diff --git a/CMakeLists.txt b/CMakeLists.txt index d56f3ea268cec5acf9437e20a9424edf5e0bc002..40f402a8aafc9e45e699f220b29a02861ce7f180 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -20,8 +20,11 @@ target_link_libraries(pariox ${HDF5_LIBRARIES}) set(R123_INCL "./examples/include/") include_directories(${R123_INCL}) -add_executable(h5_hpx_write ${PARIOX_EX}/async_IO_testbed.cpp ${R123_INCL}/Random123/philox.h ${R123_INCL}/Random123/uniform.hpp)#matrix_mult_write_threadsafe.cpp) +add_executable(h5_hpx_write ${PARIOX_EX}/async_write_testbed.cpp ${R123_INCL}/Random123/philox.h ${R123_INCL}/Random123/uniform.hpp)#matrix_mult_write_threadsafe.cpp) target_link_libraries(h5_hpx_write HPX::hpx HPX::wrap_main HPX::iostreams_component ${HDF5_LIBRARIES} ${MPI_LIBRARIES} ${CMAKE_BINARY_DIR}/libpariox.a) -add_executable(h5_hpx_read ${PARIOX_EX}/matrix_mult_read_threadsafe.cpp) +add_executable(h5_hpx_read ${PARIOX_EX}/async_read_testbed.cpp ${R123_INCL}/Random123/philox.h ${R123_INCL}/Random123/uniform.hpp)#matrix_mult_write_threadsafe.cpp) target_link_libraries(h5_hpx_read HPX::hpx HPX::wrap_main HPX::iostreams_component ${HDF5_LIBRARIES} ${MPI_LIBRARIES} ${CMAKE_BINARY_DIR}/libpariox.a) + +#add_executable(h5_hpx_read ${PARIOX_EX}/matrix_mult_read_threadsafe.cpp) +#target_link_libraries(h5_hpx_read HPX::hpx HPX::wrap_main HPX::iostreams_component ${HDF5_LIBRARIES} ${MPI_LIBRARIES} ${CMAKE_BINARY_DIR}/libpariox.a) diff --git a/examples/async_read_testbed.cpp b/examples/async_read_testbed.cpp new file mode 100644 index 0000000000000000000000000000000000000000..71fd373764bb1b48a6a49e6581c9eaa1796112a9 --- /dev/null +++ b/examples/async_read_testbed.cpp @@ -0,0 +1,438 @@ +#include <hpx/config.hpp> +#include <hpx/hpx_main.hpp> +#include <hpx/algorithm.hpp> +#include <hpx/modules/collectives.hpp> + +#include <hpx/include/actions.hpp> +#include <hpx/include/components.hpp> +#include <hpx/include/lcos.hpp> +#include <hpx/include/parallel_executors.hpp> +#include <hpx/include/runtime.hpp> +#include <hpx/include/util.hpp> +#include <hpx/iostream.hpp> +#include <hpx/runtime.hpp> + +#include <cstddef> +#include <cstdint> +#include <cstdlib> +#include <list> +#include <set> +#include <vector> +#include <cmath> +#include <time.h> + +#include "./pariox_h5.hpp" +#include <Random123/philox.h> +#include <Random123/uniform.hpp> + +int generate_2D_mat(double*** matrix, std::size_t m, std::size_t n, std::size_t locality){ + typedef r123::Philox4x32 RNG; + RNG rng; + RNG::ctr_type c={{}}; + RNG::ukey_type uk={{}}; + uk[0] = locality; // some user_supplied_seed + RNG::key_type k=uk; + RNG::ctr_type r; + + c[0] = 15112024; + c[1] = 16122024; + + std::srand(locality * 15112024); + + *matrix = (double **)calloc(m, sizeof(double*)); + for (std::size_t i = 0; i < m; i++){ + (*matrix)[i] = (double *)calloc(n, sizeof(double)); + } + + for (std::size_t i = 0; i < m; i++){ + for (std::size_t j = 0; j < n; j++){ + c[0] += 1; + c[1] += 1; + r = rng(c, k); + (*matrix)[i][j] = r123::u01<double>(r.v[0]); //static_cast<double>(std::rand()) / RAND_MAX; + //(*matrix)[i][j] = static_cast<double>(std::rand()) / RAND_MAX; + } + } + return 0; +} + +int generate_vec(double **x_vec, std::size_t n, std::size_t locality){ + std::srand(241119); + + *x_vec = (double *)calloc(n, sizeof(double*)); + for (std::size_t i = 0; i < n; i++){ + (*x_vec)[i] = static_cast<double>(std::rand()) / RAND_MAX; + } + return 0; +} + +double vec_mult(double *mat_row, double *x_vec, std::size_t vec_length){ +// Look at std::span() for defining mat_row input. 
+ double b_entry; + // printf("mat_val = %f %f %f %f\n", mat_row[0], mat_row[1], x_vec[0], x_vec[1]); + + for (std::size_t i = 0; i < vec_length; i++){ + b_entry = b_entry + mat_row[i]*x_vec[i]; + } + + return b_entry; +} + +double demo_task_work(double *mat_start_idx, double *x_vec, std::size_t mat_row_size, double *B_entry){ +// NOTE: Passing by reference `*B_entry` does not, in fact, update the allocated values in the LOCALITY context!!! +// This is left as a demonstrator, to show B_entry in Local context doesn't get updated at task level. + double res; + + res = vec_mult(mat_start_idx, x_vec, mat_row_size); + B_entry = &res; + +// printf("x_vec = %f %f\n", res, B_entry); + return res; +} + +double* task_compute_write_work(double ***loc_A_mat, std::size_t row_offset, std::size_t col_offset, \ + std::size_t m_rows, std::size_t n_cols, double *loc_X_vec, hid_t *h5_ids, \ + hid_t *dset_ids, std::size_t fspace_offset, std::size_t locality_id, \ + std::size_t task_instance){ + //printf("ROW: %u, COL: %u, loc_A_mat: %f %f %f x_vec: %f %f %f\n", row_offset, col_offset,\ + (*loc_A_mat)[row_offset][0], (*loc_A_mat)[row_offset][1], (*loc_A_mat)[row_offset][2],\ + loc_X_vec[0], loc_X_vec[1], loc_X_vec[2]); + struct timespec start, end, io_start, io_end, mat_prep_start, mat_prep_end; + double ** task_A_mat; + std::size_t seed = locality_id + m_rows*n_cols*task_instance; + + clock_gettime(CLOCK_MONOTONIC, &mat_prep_start); + generate_2D_mat(&task_A_mat, m_rows, n_cols, seed); + + std::size_t task_size = m_rows*n_cols; + double *write_buffer; + std::size_t file_idx; + write_buffer = (double*)calloc(task_size, sizeof(double)); + for (std::size_t i = 0; i < m_rows; i++){ + for (std::size_t j = 0; j < n_cols; j++){ + file_idx = i*n_cols + j; + write_buffer[file_idx] = task_A_mat[i][j]; + } + } + clock_gettime(CLOCK_MONOTONIC, &mat_prep_end); + + clock_gettime(CLOCK_MONOTONIC, &start); + + // Compute the mat * vec + double *task_b_vec; + task_b_vec = (double *)calloc(m_rows,sizeof(double)); + + for (std::size_t i = 0; i < m_rows; i++){ + task_b_vec[i] = 0; + //task_b_vec[i] = vec_mult(loc_A_mat[row_offset + i][0], loc_X_vec, n_cols); + for (std::size_t j = 0; j < n_cols; j++){ + file_idx = i*n_cols + j; + task_b_vec[i] = task_b_vec[i] + write_buffer[file_idx]*loc_X_vec[j]; + // task_b_vec[i] = task_b_vec[i] + task_A_mat[i][j]*loc_X_vec[j]; + } + //printf("task_b_vec[%u] = %f\n", i, task_b_vec[i]); + } + + for (std::size_t i = 0; i < m_rows; i++){ + free(task_A_mat[i]); + } + free(task_A_mat); + + // write the task local matrix entries. 
+ clock_gettime(CLOCK_MONOTONIC, &io_start); + write_H5_ndim_data_L(h5_ids, &dset_ids[0], &fspace_offset, &task_size, &write_buffer[0]); + clock_gettime(CLOCK_MONOTONIC, &io_end); + free(write_buffer); + + clock_gettime(CLOCK_MONOTONIC, &end); + + double p_mat_start = mat_prep_start.tv_sec*1.0 + mat_prep_start.tv_nsec*1e-9; + double p_mat_end = mat_prep_end.tv_sec*1.0 + mat_prep_end.tv_nsec*1e-9; + double diff_mat_d = p_mat_end - p_mat_start; + + double p_start = start.tv_sec*1.0 + start.tv_nsec*1e-9; + double p_end = end.tv_sec*1.0 + end.tv_nsec*1e-9; + double diff_d = p_end - p_start; + + double p_io_start = (io_start.tv_sec)*1.0 + io_start.tv_nsec*1e-9; + double p_io_end = (io_end.tv_sec)*1.0 + io_end.tv_nsec*1e-9; + double diff_io_d = p_io_end - p_io_start; + double bytes_tf_rate = (task_size*8)/(diff_io_d*1024*1024); + printf("LOC: %4u, Task: %5u Start: %f, End: %f, Mat Dur: %f, I/O Dur: %f, SUB Dur: %f, Write_rate: %f MB/s\n", \ + locality_id, task_instance, p_io_start, p_io_end, diff_mat_d, diff_io_d, diff_d, bytes_tf_rate); + + return task_b_vec; +} + +double* task_compute_read_work(double ***loc_A_mat, std::size_t row_offset, std::size_t col_offset, \ + std::size_t m_rows, std::size_t n_cols, double **loc_X_vec, hid_t *h5_ids, \ + hid_t *dset_ids, std::size_t fspace_offset, std::size_t locality_id, \ + std::size_t task_instance){ + struct timespec start, end, io_start, io_end; + clock_gettime(CLOCK_MONOTONIC, &start); + + double *task_b_vec; + task_b_vec = (double *)calloc(m_rows, sizeof(double)); + + std::size_t task_local_sizes = m_rows*n_cols; + double *write_buffer; + write_buffer = (double*)calloc(task_local_sizes, sizeof(double)); + + clock_gettime(CLOCK_MONOTONIC, &io_start); + read_H5_ndim_data_L(h5_ids, &dset_ids[0], &fspace_offset, &task_local_sizes, &write_buffer[0]); + clock_gettime(CLOCK_MONOTONIC, &io_end); + + std::size_t file_idx = 0; + /*for (std::size_t i = 0; i < m_rows; i++){ + for (std::size_t j = 0; j < n_cols; j++){ + file_idx = i*n_cols + j; + (*loc_A_mat)[row_offset + i][j] = write_buffer[file_idx]; + } + } + + for (std::size_t i = 0; i < m_rows; i++){ + //printf("Row_off: %u, %u, write_buff: %f %f %f %f\n", row_offset, fspace_offset, \ + write_buffer[0], write_buffer[1], \ + write_buffer[2], write_buffer[3]); + printf("Row_off: %u, %u, Task A mat: %f %f %f %f\n", row_offset, fspace_offset,\ + (*loc_A_mat)[row_offset + i][0], \ + (*loc_A_mat)[row_offset + i][1], \ + (*loc_A_mat)[row_offset + i][2], \ + (*loc_A_mat)[row_offset + i][3]); + } + printf("x_vec: %f %f ... %f %f\n", (*loc_X_vec)[0], (*loc_X_vec)[1], (*loc_X_vec)[n_cols-2], \ + (*loc_X_vec)[n_cols-1]);*/ + // Compute! 
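+    // The task's slab was read back as one flat, row-major buffer: element (i, j)
+    // of the task's m_rows x n_cols block sits at write_buffer[i*n_cols + j].
+    // The loop below accumulates this task's partial b entries against the full x vector.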
+ for (std::size_t i = 0; i < m_rows; i++){ + for (std::size_t j = 0; j < n_cols; j++){ + file_idx = i*n_cols + j; + task_b_vec[i] = task_b_vec[i] + write_buffer[file_idx] * (*loc_X_vec)[j]; + } + } + free(write_buffer); + clock_gettime(CLOCK_MONOTONIC, &end); + double p_start = start.tv_sec*1.0 + start.tv_nsec*1e-9; + double p_end = end.tv_sec*1.0 + end.tv_nsec*1e-9; + double diff_d = p_end - p_start; + + double p_io_start = (io_start.tv_sec)*1.0 + io_start.tv_nsec*1e-9; + double p_io_end = (io_end.tv_sec)*1.0 + io_end.tv_nsec*1e-9; + double diff_io_d = p_io_end - p_io_start; + double bytes_tf_rate = (task_local_sizes*8)/(diff_io_d*1024*1024); + printf("LOC: %4u, Task: %5u Start: %f, End: %f, I/O Dur: %f, SUB Dur: %f, Read_rate: %f MB/s\n", locality_id, task_instance, p_io_start, p_io_end, diff_io_d, diff_d, bytes_tf_rate); + + return task_b_vec; +} + +std::vector<double> locality_work(std::size_t tot_mat_rows, std::size_t tot_mat_cols, std::uint32_t ntot_localities, std::size_t rows_ptask){ + std::size_t loc_mat_rows; + std::size_t remainder_rows; + std::size_t preceding_rows; + std::size_t str_idx; + std::size_t rows_per_task; + std::size_t num_chunks; + + double *loc_X_vec, *loc_B_vec; + double **loc_A_mat; + + std::size_t const os_threads = hpx::get_os_thread_count(); + std::size_t locality_id = hpx::get_locality_id(); +//***************************************Prepare matrix entries*************************************** + std::vector<std::size_t> total_mat_sizes(2); + std::vector<std::size_t> local_mat_sizes(2); + total_mat_sizes[0] = tot_mat_rows; //32768; // 262144 for 512 GB; + total_mat_sizes[1] = tot_mat_cols; //32768; + std::size_t tot_size_1D = total_mat_sizes[0]*total_mat_sizes[1]; + + local_mat_sizes[0] = total_mat_sizes[0]/ntot_localities; + local_mat_sizes[1] = total_mat_sizes[1]; + std::size_t loc_size_1D = local_mat_sizes[0]*local_mat_sizes[1]; + + std::vector<hid_t> h5_ids; + hid_t data_id, dtype_id, attr_id, dset_id, x_vec_id, b_vec_id; + std::size_t ndims; + std::vector<hid_t> dset_ids(3); + std::size_t task_dim_lens; + std::size_t temp; + std::vector<std::size_t> row_offsets; + //=================Casting shenanigans due to strange behaviour of std::size_t division always return 0 ===================== + int test = rows_ptask; + rows_per_task = static_cast<std::size_t>(test); + //=========================================================================================================================== + + dtype_id = H5T_NATIVE_DOUBLE; +//========================================Start Read section=========================================== + //============================Open file to read datasets============================== + h5_ids = open_H5_file_L(MPI_COMM_WORLD, "./mat_mult_demo.h5"); + + std::size_t *Mat_1D_lens; + dset_ids[0] = access_H5_dataset_L(&h5_ids[0], "/Mat_entries", &dtype_id, &ndims, &Mat_1D_lens); + + hid_t attr_dtype_id = H5T_NATIVE_UINT; + access_H5_scalar_attr_L<std::size_t>(&dset_ids[0], "Total_cols", attr_dtype_id, &total_mat_sizes[1]); + access_H5_scalar_attr_L<std::size_t>(&dset_ids[0], "Total_rows", attr_dtype_id, &total_mat_sizes[0]); + + local_mat_sizes[0] = total_mat_sizes[0]/ntot_localities; + local_mat_sizes[1] = total_mat_sizes[1]; + loc_size_1D = local_mat_sizes[0]*local_mat_sizes[1]; + + std::size_t *dummy_lens; + std::size_t dummy_ndims; + dset_ids[1] = access_H5_dataset_L(&h5_ids[0], "/X_vec", &dtype_id, &dummy_ndims, &dummy_lens); + dset_ids[2] = access_H5_dataset_L(&h5_ids[0], "/B_vec", &dtype_id, &dummy_ndims, &dummy_lens); + + 
//loc_A_mat = (double **)calloc(local_mat_sizes[0], sizeof(double*)); + //for (std::size_t i = 0; i < local_mat_sizes[0]; i++){ + // loc_A_mat[i] = (double *)calloc(local_mat_sizes[1], sizeof(double)); + //} + + loc_X_vec = (double *)calloc(local_mat_sizes[1], sizeof(double)); + loc_B_vec = (double *)calloc(local_mat_sizes[0], sizeof(double)); + free(dummy_lens); + + std::size_t x_fspace_id = 0; + read_H5_ndim_data_L<double>(&h5_ids[0], &dset_ids[1], &x_fspace_id, &local_mat_sizes[1], &loc_X_vec[0]); + x_fspace_id = locality_id*local_mat_sizes[0]; + read_H5_ndim_data_L<double>(&h5_ids[0], &dset_ids[2], &x_fspace_id, &local_mat_sizes[0], &loc_B_vec[0]); + //============================Begin Task-offloading=================================== + // Compute task-level offsets + num_chunks = local_mat_sizes[0]/rows_per_task; + + std::vector<std::size_t> fspace_R_offsets(num_chunks); + index_offset_calc(ndims, &loc_size_1D, &Mat_1D_lens[0], locality_id, &fspace_R_offsets[0]); + + task_dim_lens = rows_per_task*local_mat_sizes[1]; + temp = 0; + row_offsets.resize(num_chunks); + for (std::size_t i = 0; i < num_chunks; i++){ + index_offset_calc(ndims, &task_dim_lens, &loc_size_1D, i, &temp); + fspace_R_offsets[i] = fspace_R_offsets[0] + temp; + row_offsets[i] = i*rows_per_task; + } + + double *comp_B_vec = (double *)calloc(local_mat_sizes[0], sizeof(double)); + + std::size_t chunks_sent = 0; + std::size_t row_offset = 0; + std::size_t col_offset = 0; + std::vector<hpx::future<double*>> task_readrow_state; + task_readrow_state.reserve(num_chunks); + while (chunks_sent < num_chunks){ + task_readrow_state.push_back(hpx::async(task_compute_read_work, \ + &loc_A_mat, row_offsets[chunks_sent], col_offset, rows_per_task, local_mat_sizes[1], &loc_X_vec, \ + &h5_ids[0], &dset_ids[0], fspace_R_offsets[chunks_sent], locality_id, chunks_sent)); + + chunks_sent += 1; + } + auto indiv_read_handling = [&](std::size_t implicit_i, hpx::future<double*> &&fut){ + char const* mesg ;//= "Writing done from OS-Thread {1} on locality {2}\n"; + double *task_b_vals = fut.get(); + + std::size_t initial_offset = implicit_i*rows_per_task; + for (std::size_t i = 0; i < rows_per_task; i++){ + comp_B_vec[initial_offset + i] = task_b_vals[i]; + } + + /*for (std::size_t i = 0; i < rows_per_task; i++){ + mesg = "B_val {1} at implicit_i of {2} on locality {3}.\n"; + hpx::util::format_to(hpx::cout, mesg, comp_B_vec[initial_offset + i], implicit_i, hpx::find_here()) \ + << std::flush; + }*/ + free(task_b_vals); + }; + hpx::wait_each(indiv_read_handling, task_readrow_state); + + // CHECK B_vec + double error; + for (std::size_t i = 0; i < local_mat_sizes[0]; i++){ + error = fabs(comp_B_vec[i] - loc_B_vec[i]); + if (error > 1e-6) { + printf("idx: %u, expected: %f, outcome: %f\n", locality_id*local_mat_sizes[0] + i, loc_B_vec[i], comp_B_vec[i]); + } + } + + //============================END Task-offloading=================================== + free(loc_X_vec); + free(loc_B_vec); + //for (std::size_t i = 0; i < local_mat_sizes[0]; i++){ + // free(loc_A_mat[i]); + //} + //free(loc_A_mat); + free(Mat_1D_lens); + close_H5_dataset_L(&dset_ids[0]); + close_H5_dataset_L(&dset_ids[1]); + close_H5_dataset_L(&dset_ids[2]); + close_H5_file_L(&h5_ids[0]); +//========================================End Read section============================================= + + std::vector<double> dummy_B(10); + return dummy_B; +} +HPX_PLAIN_ACTION(locality_work, locality_action) + +int main(int argc, char **argv){ + std::size_t tot_mat_rows, tot_mat_cols; + 
std::size_t remainder_rows, loc_mat_rows; + std::size_t rows_per_task; + if (argc < 3){ + printf("Error: Insufficient number of command line argument. \n \ + Usage: mpiexec -n 2 <executable> <int value for matrix's first dimension lenght> <int value for rows per task>\n"); + exit(0); + } + sscanf(argv[1], "%u", &tot_mat_rows); + sscanf(argv[2], "%u", &rows_per_task); + + std::vector<hpx::id_type> localities = hpx::find_all_localities(); + std::uint32_t ntot_localities = hpx::get_num_localities().get(); + std::vector<double> b_vec; + + std::vector<std::size_t> preceding_rows; + tot_mat_cols = tot_mat_rows; + + preceding_rows.resize(ntot_localities); + b_vec.resize(tot_mat_rows); + + remainder_rows = tot_mat_rows % ntot_localities; + loc_mat_rows = tot_mat_rows / ntot_localities; + + preceding_rows[0] = 0; + for (std::size_t i = 1; i < ntot_localities; i++){ + if (i - 1 < remainder_rows){ + loc_mat_rows = tot_mat_rows / ntot_localities + 1; + } else { + loc_mat_rows = tot_mat_rows / ntot_localities; + } + preceding_rows[i] = preceding_rows[i-1] + loc_mat_rows; +// printf("preceding_rows[%u] = %u\n", i, preceding_rows[i]); + } + + std::vector<hpx::future<std::vector<double>>> locality_futures; + locality_futures.reserve(ntot_localities); + for (hpx::id_type const& locality : localities) + { + locality_futures.push_back(hpx::async<locality_action>(locality, tot_mat_rows, tot_mat_cols, ntot_localities, rows_per_task)); + } + + auto indiv_locality_handling = [&](std::size_t locality_i, hpx::future<std::vector<double>> &&loc_fut){ + // NOTE: every future that gets resolved will call this function! + // locality_i auto returns the indexed sequence of future at the moment of `push_back` above. + // EXTRA: This locality_i can be optionally removed, in the assumption that this function will only be called once + // upon the completion of each remote locality. + // Then, the temp_B_vecs can be converted to only a std::vector<double> type. 
+ std::vector<std::vector<double>> temp_B_vecs; + temp_B_vecs.resize(ntot_localities); + + temp_B_vecs[locality_i] = loc_fut.get(); + auto insert_pos = b_vec.begin() + (preceding_rows[locality_i]); + b_vec.insert(insert_pos, temp_B_vecs[locality_i].begin(), temp_B_vecs[locality_i].end()); + char const* mesg = "Work done on locality {1}\n"; +// hpx::util::format_to(hpx::cout, mesg, locality_i) << std::flush; + }; + hpx::wait_each(indiv_locality_handling, locality_futures); + + /*for (std::size_t i = 0; i < tot_mat_rows; i++){ + printf("global b_vec[%d] = %f\n", i, b_vec[i]); + }*/ + + return 0; +} diff --git a/examples/async_write_testbed.cpp b/examples/async_write_testbed.cpp new file mode 100644 index 0000000000000000000000000000000000000000..2be768b3f9950122ecdfce4e6eb50bb0456308f5 --- /dev/null +++ b/examples/async_write_testbed.cpp @@ -0,0 +1,444 @@ +#include <hpx/config.hpp> +#include <hpx/hpx_main.hpp> +#include <hpx/algorithm.hpp> +#include <hpx/modules/collectives.hpp> + +#include <hpx/include/actions.hpp> +#include <hpx/include/components.hpp> +#include <hpx/include/lcos.hpp> +#include <hpx/include/parallel_executors.hpp> +#include <hpx/include/runtime.hpp> +#include <hpx/include/util.hpp> +#include <hpx/iostream.hpp> +#include <hpx/runtime.hpp> + +#include <cstddef> +#include <cstdint> +#include <cstdlib> +#include <list> +#include <set> +#include <vector> +#include <cmath> +#include <time.h> + +#include "./pariox_h5.hpp" +#include <Random123/philox.h> +#include <Random123/uniform.hpp> + +int generate_2D_mat(double*** matrix, std::size_t m, std::size_t n, std::size_t locality){ + typedef r123::Philox4x32 RNG; + RNG rng; + RNG::ctr_type c={{}}; + RNG::ukey_type uk={{}}; + uk[0] = locality; // some user_supplied_seed + RNG::key_type k=uk; + RNG::ctr_type r; + + c[0] = 15112024; + c[1] = 16122024; + + std::srand(locality * 15112024); + + *matrix = (double **)calloc(m, sizeof(double*)); + for (std::size_t i = 0; i < m; i++){ + (*matrix)[i] = (double *)calloc(n, sizeof(double)); + } + + for (std::size_t i = 0; i < m; i++){ + for (std::size_t j = 0; j < n; j++){ + c[0] += 1; + c[1] += 1; + r = rng(c, k); + (*matrix)[i][j] = r123::u01<double>(r.v[0]); //static_cast<double>(std::rand()) / RAND_MAX; + //(*matrix)[i][j] = static_cast<double>(std::rand()) / RAND_MAX; + } + } + return 0; +} + +int generate_vec(double **x_vec, std::size_t n, std::size_t locality){ + std::srand(241119); + + *x_vec = (double *)calloc(n, sizeof(double*)); + for (std::size_t i = 0; i < n; i++){ + (*x_vec)[i] = static_cast<double>(std::rand()) / RAND_MAX; + } + return 0; +} + +double vec_mult(double *mat_row, double *x_vec, std::size_t vec_length){ +// Look at std::span() for defining mat_row input. + double b_entry; + // printf("mat_val = %f %f %f %f\n", mat_row[0], mat_row[1], x_vec[0], x_vec[1]); + + for (std::size_t i = 0; i < vec_length; i++){ + b_entry = b_entry + mat_row[i]*x_vec[i]; + } + + return b_entry; +} + +double demo_task_work(double *mat_start_idx, double *x_vec, std::size_t mat_row_size, double *B_entry){ +// NOTE: Passing by reference `*B_entry` does not, in fact, update the allocated values in the LOCALITY context!!! +// This is left as a demonstrator, to show B_entry in Local context doesn't get updated at task level. 
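+// The pointer parameter is itself passed by value, so `B_entry = &res;` only rebinds
+// the task-local copy of the pointer. Storing through it instead (`*B_entry = res;`)
+// would be required to write into the caller's buffer on the same locality.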
+ double res; + + res = vec_mult(mat_start_idx, x_vec, mat_row_size); + B_entry = &res; + +// printf("x_vec = %f %f\n", res, B_entry); + return res; +} + +double* task_compute_write_work(double ***loc_A_mat, std::size_t row_offset, std::size_t col_offset, \ + std::size_t m_rows, std::size_t n_cols, double *loc_X_vec, hid_t *h5_ids, \ + hid_t *dset_ids, std::size_t fspace_offset, std::size_t locality_id, \ + std::size_t task_instance){ + //printf("ROW: %u, COL: %u, loc_A_mat: %f %f %f x_vec: %f %f %f\n", row_offset, col_offset,\ + (*loc_A_mat)[row_offset][0], (*loc_A_mat)[row_offset][1], (*loc_A_mat)[row_offset][2],\ + loc_X_vec[0], loc_X_vec[1], loc_X_vec[2]); + struct timespec start, end, io_start, io_end, mat_prep_start, mat_prep_end; + double ** task_A_mat; + std::size_t seed = locality_id + m_rows*n_cols*task_instance; + + clock_gettime(CLOCK_MONOTONIC, &mat_prep_start); + generate_2D_mat(&task_A_mat, m_rows, n_cols, seed); + + std::size_t task_size = m_rows*n_cols; + double *write_buffer; + std::size_t file_idx; + write_buffer = (double*)calloc(task_size, sizeof(double)); + for (std::size_t i = 0; i < m_rows; i++){ + for (std::size_t j = 0; j < n_cols; j++){ + file_idx = i*n_cols + j; + write_buffer[file_idx] = task_A_mat[i][j]; + } + } + clock_gettime(CLOCK_MONOTONIC, &mat_prep_end); + + clock_gettime(CLOCK_MONOTONIC, &start); + + // Compute the mat * vec + double *task_b_vec; + task_b_vec = (double *)calloc(m_rows,sizeof(double)); + + for (std::size_t i = 0; i < m_rows; i++){ + task_b_vec[i] = 0; + //task_b_vec[i] = vec_mult(loc_A_mat[row_offset + i][0], loc_X_vec, n_cols); + for (std::size_t j = 0; j < n_cols; j++){ + file_idx = i*n_cols + j; + task_b_vec[i] = task_b_vec[i] + write_buffer[file_idx]*loc_X_vec[j]; + // task_b_vec[i] = task_b_vec[i] + task_A_mat[i][j]*loc_X_vec[j]; + } + //printf("task_b_vec[%u] = %f\n", i, task_b_vec[i]); + } + + for (std::size_t i = 0; i < m_rows; i++){ + free(task_A_mat[i]); + } + free(task_A_mat); + + // write the task local matrix entries. 
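+    // Assumed semantics of the pariox call below: write_H5_ndim_data_L stores
+    // `task_size` consecutive elements of the 1-D "Mat_entries" dataset starting at
+    // element `fspace_offset`, so concurrently running tasks target disjoint slabs.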
+ clock_gettime(CLOCK_MONOTONIC, &io_start); + write_H5_ndim_data_L(h5_ids, &dset_ids[0], &fspace_offset, &task_size, &write_buffer[0]); + clock_gettime(CLOCK_MONOTONIC, &io_end); + free(write_buffer); + + clock_gettime(CLOCK_MONOTONIC, &end); + + double p_mat_start = mat_prep_start.tv_sec*1.0 + mat_prep_start.tv_nsec*1e-9; + double p_mat_end = mat_prep_end.tv_sec*1.0 + mat_prep_end.tv_nsec*1e-9; + double diff_mat_d = p_mat_end - p_mat_start; + + double p_start = start.tv_sec*1.0 + start.tv_nsec*1e-9; + double p_end = end.tv_sec*1.0 + end.tv_nsec*1e-9; + double diff_d = p_end - p_start; + + double p_io_start = (io_start.tv_sec)*1.0 + io_start.tv_nsec*1e-9; + double p_io_end = (io_end.tv_sec)*1.0 + io_end.tv_nsec*1e-9; + double diff_io_d = p_io_end - p_io_start; + double bytes_tf_rate = (task_size*8)/(diff_io_d*1024*1024); + printf("LOC: %4u, Task: %5u Start: %f, End: %f, Mat Dur: %f, I/O Dur: %f, SUB Dur: %f, Write_rate: %f MB/s\n", \ + locality_id, task_instance, p_io_start, p_io_end, diff_mat_d, diff_io_d, diff_d, bytes_tf_rate); + + return task_b_vec; +} + +double* task_compute_read_work(double ***loc_A_mat, std::size_t row_offset, std::size_t col_offset, \ + std::size_t m_rows, std::size_t n_cols, double **loc_X_vec, hid_t *h5_ids, \ + hid_t *dset_ids, std::size_t fspace_offset, std::size_t locality_id, \ + std::size_t task_instance){ + struct timespec start, end, io_start, io_end; + clock_gettime(CLOCK_MONOTONIC, &start); + + double *task_b_vec; + task_b_vec = (double *)calloc(m_rows, sizeof(double)); + + std::size_t task_local_sizes = m_rows*n_cols; + double *write_buffer; + write_buffer = (double*)calloc(task_local_sizes, sizeof(double)); + + clock_gettime(CLOCK_MONOTONIC, &io_start); + read_H5_ndim_data_L(h5_ids, &dset_ids[0], &fspace_offset, &task_local_sizes, &write_buffer[0]); + clock_gettime(CLOCK_MONOTONIC, &io_end); + + std::size_t file_idx = 0; + /*for (std::size_t i = 0; i < m_rows; i++){ + for (std::size_t j = 0; j < n_cols; j++){ + file_idx = i*n_cols + j; + (*loc_A_mat)[row_offset + i][j] = write_buffer[file_idx]; + } + } + + for (std::size_t i = 0; i < m_rows; i++){ + //printf("Row_off: %u, %u, write_buff: %f %f %f %f\n", row_offset, fspace_offset, \ + write_buffer[0], write_buffer[1], \ + write_buffer[2], write_buffer[3]); + printf("Row_off: %u, %u, Task A mat: %f %f %f %f\n", row_offset, fspace_offset,\ + (*loc_A_mat)[row_offset + i][0], \ + (*loc_A_mat)[row_offset + i][1], \ + (*loc_A_mat)[row_offset + i][2], \ + (*loc_A_mat)[row_offset + i][3]); + } + printf("x_vec: %f %f ... %f %f\n", (*loc_X_vec)[0], (*loc_X_vec)[1], (*loc_X_vec)[n_cols-2], \ + (*loc_X_vec)[n_cols-1]);*/ + // Compute! 
+ for (std::size_t i = 0; i < m_rows; i++){ + for (std::size_t j = 0; j < n_cols; j++){ + file_idx = i*n_cols + j; + task_b_vec[i] = task_b_vec[i] + write_buffer[file_idx] * (*loc_X_vec)[j]; + } + } + free(write_buffer); + clock_gettime(CLOCK_MONOTONIC, &end); + double p_start = start.tv_sec*1.0 + start.tv_nsec*1e-9; + double p_end = end.tv_sec*1.0 + end.tv_nsec*1e-9; + double diff_d = p_end - p_start; + + double p_io_start = (io_start.tv_sec)*1.0 + io_start.tv_nsec*1e-9; + double p_io_end = (io_end.tv_sec)*1.0 + io_end.tv_nsec*1e-9; + double diff_io_d = p_io_end - p_io_start; + double bytes_tf_rate = (task_local_sizes*8)/(diff_io_d*1024*1024); + printf("LOC: %4u, Task: %5u Start: %f, End: %f, I/O Dur: %f, SUB Dur: %f, Read_rate: %f MB/s\n", locality_id, task_instance, p_io_start, p_io_end, diff_io_d, diff_d, bytes_tf_rate); + + return task_b_vec; +} + +std::vector<double> locality_work(std::size_t tot_mat_rows, std::size_t tot_mat_cols, std::uint32_t ntot_localities, std::size_t rows_ptask){ + std::size_t loc_mat_rows; + std::size_t remainder_rows; + std::size_t preceding_rows; + std::size_t str_idx; + std::size_t rows_per_task; + std::size_t num_chunks; + + double *loc_X_vec, *loc_B_vec; + double **loc_A_mat; + + std::size_t const os_threads = hpx::get_os_thread_count(); + std::size_t locality_id = hpx::get_locality_id(); +//***************************************Prepare matrix entries*************************************** + std::vector<std::size_t> total_mat_sizes(2); + std::vector<std::size_t> local_mat_sizes(2); + total_mat_sizes[0] = tot_mat_rows; //32768; // 262144 for 512 GB; + total_mat_sizes[1] = tot_mat_cols; //32768; + std::size_t tot_size_1D = total_mat_sizes[0]*total_mat_sizes[1]; + + local_mat_sizes[0] = total_mat_sizes[0]/ntot_localities; + local_mat_sizes[1] = total_mat_sizes[1]; + std::size_t loc_size_1D = local_mat_sizes[0]*local_mat_sizes[1]; + + //=================Casting shenanigans due to strange behaviour of std::size_t division always return 0 ===================== + int test = rows_ptask; + rows_per_task = static_cast<std::size_t>(test); + //=========================================================================================================================== + + // Generate 2D matrix entries, value range [0,1] + // generate_2D_mat(&loc_A_mat, local_mat_sizes[0], local_mat_sizes[1], locality_id); + // printf("LOC: %u %f %f %f\n", locality_id, loc_A_mat[0][0], loc_A_mat[0][1], loc_A_mat[0][2]); + +//***************************************Prepare local X_vec entries*********************************** + generate_vec(&loc_X_vec, local_mat_sizes[1], locality_id); + + loc_B_vec = (double*)calloc(local_mat_sizes[0], sizeof(double)); + +//***************************************Prepare h5 file & datasets************************************ + std::vector<hid_t> h5_ids; + hid_t data_id, dtype_id, attr_id, dset_id, x_vec_id, b_vec_id; + std::size_t ndims = 2; + std::vector<std::size_t> data_sizes(2); + size_t n_datasets = 2; + + h5_ids = prep_H5_file_L(MPI_COMM_WORLD, "./mat_mult_demo.h5"); + + dtype_id = H5T_NATIVE_DOUBLE; + + ndims = 1; + dset_id = attach_H5_dataset_L(&h5_ids[0], "Mat_entries", dtype_id, ndims, &tot_size_1D); + x_vec_id = attach_H5_dataset_L(&h5_ids[0], "X_vec", dtype_id, ndims, &total_mat_sizes[1]); + b_vec_id = attach_H5_dataset_L(&h5_ids[0], "B_vec", dtype_id, ndims, &total_mat_sizes[1]); + + dtype_id = H5T_NATIVE_UINT; + attach_H5_scalar_attr_L<std::size_t>(&dset_id, "Total_rows", dtype_id, &total_mat_sizes[0]); + 
attach_H5_scalar_attr_L<std::size_t>(&dset_id, "Total_cols", dtype_id, &total_mat_sizes[1]); + attach_H5_scalar_attr_L<std::size_t>(&x_vec_id, "Total_rows", dtype_id, &total_mat_sizes[1]); + attach_H5_scalar_attr_L<std::size_t>(&b_vec_id, "Total_rows", dtype_id, &total_mat_sizes[1]); + +//**************************************Locality ndims data write********************* +/* +// TODO: writing into 1D file data structure is totally fine though... + std::size_t fspace_offsets = 0; + index_offset_calc(ndims, &loc_size_1D, &tot_size_1D, locality_id, &fspace_offsets); + //printf("Loc: %u offset: %u \n", locality_id, fspace_offsets); + std::size_t row_w = 0; + for (std::size_t i = 0; i < local_mat_sizes[0]; i++){ + write_H5_ndim_data_L<double>(&h5_ids[0], &dset_id, &fspace_offsets, &local_mat_sizes[1], &loc_A_mat[row_w][0]); + fspace_offsets += local_mat_sizes[1]; + row_w += 1; + } +*/ +//========================================Write Task-offloading============================================= + //==============Automate offsets calculation for various row size============ + // MAKE SURE the division has no remainders for `num_chunks`! + // Range of rows_per_task = [1, local_mat_sizes[0]] + num_chunks = local_mat_sizes[0]/rows_per_task; + // printf("NUM_CHUNKS: %u %u %u\n", (tot_mat_rows/ntot_localities)/rows_p_task, local_mat_sizes[0], rows_p_task); + + std::vector<std::size_t> fspace_offsets(num_chunks); + index_offset_calc(ndims, &loc_size_1D, &tot_size_1D, locality_id, &fspace_offsets[0]); + + std::vector<std::size_t> row_offsets(num_chunks); + std::size_t task_dim_lens = rows_per_task*local_mat_sizes[1]; + std::size_t temp; + for (std::size_t j = 0; j < num_chunks; j++){ + index_offset_calc(ndims, &task_dim_lens, &loc_size_1D, j, &temp); + fspace_offsets[j] = fspace_offsets[0] + temp; + row_offsets[j] = j*rows_per_task; + } + //=========================================================================== + + std::size_t chunks_sent = 0; + std::size_t row_offset = 0; + std::size_t col_offset = 0; + std::vector<hid_t> dset_ids(3); + dset_ids[0] = dset_id; + dset_ids[1] = x_vec_id; + dset_ids[2] = b_vec_id; + + std::vector<hpx::future<double*>> task_writerow_state; + task_writerow_state.reserve(num_chunks); + + while(chunks_sent < num_chunks){ + task_writerow_state.push_back(hpx::async(task_compute_write_work, \ + &loc_A_mat, row_offsets[chunks_sent], col_offset, rows_per_task, local_mat_sizes[1], &loc_X_vec[0], \ + &h5_ids[0], &dset_ids[0], fspace_offsets[chunks_sent], locality_id, chunks_sent)); + + chunks_sent += 1; + } + auto indiv_write_handling = [&](std::size_t implicit_i, hpx::future<double*> &&fut){ + char const* mesg ;//= "Writing done from OS-Thread {1} on locality {2}\n"; + double *task_b_vals = fut.get(); + + std::size_t initial_offset = implicit_i*rows_per_task; + for (std::size_t i = 0; i < rows_per_task; i++){ + loc_B_vec[initial_offset + i] = task_b_vals[i]; + } + + /*for (std::size_t i = 0; i < rows_per_task; i++){ + mesg = "B_val {1} at implicit_i of {2} on locality {3}.\n"; + hpx::util::format_to(hpx::cout, mesg, loc_B_vec[i], implicit_i, hpx::find_here()) << std::flush; + }*/ + free(task_b_vals); + }; + hpx::wait_each(indiv_write_handling, task_writerow_state); + + ndims = 1; + index_offset_calc(ndims, &local_mat_sizes[0], &local_mat_sizes[1], locality_id, &fspace_offsets[0]); + write_H5_ndim_data_L(&h5_ids[0], &dset_ids[2], &fspace_offsets[0], &local_mat_sizes[0], &loc_B_vec[0]); + write_H5_ndim_data_L(&h5_ids[0], &dset_ids[1], &fspace_offsets[0], &local_mat_sizes[0], 
&loc_X_vec[fspace_offsets[0]]); + +//======================================clean up arrays=============================================== + free(loc_X_vec); + free(loc_B_vec); + //for (std::size_t i = 0; i < local_mat_sizes[0]; i++){ + // free(loc_A_mat[i]); + //} + //free(loc_A_mat); +//==================================================================================================== + close_H5_dataset_L(&dset_id); + close_H5_dataset_L(&x_vec_id); + close_H5_dataset_L(&b_vec_id); + close_H5_file_L(&h5_ids[0]); +//***************************************End of async-task write I/O ************************************ + + std::vector<double> dummy_B(10); + return dummy_B; +} +HPX_PLAIN_ACTION(locality_work, locality_action) + +int main(int argc, char **argv){ + std::size_t tot_mat_rows, tot_mat_cols; + std::size_t remainder_rows, loc_mat_rows; + std::size_t rows_per_task; + if (argc < 3){ + printf("Error: Insufficient number of command line argument. \n \ + Usage: mpiexec -n 2 <executable> <int value for matrix's first dimension lenght> <int value for rows per task>\n"); + exit(0); + } + sscanf(argv[1], "%u", &tot_mat_rows); + sscanf(argv[2], "%u", &rows_per_task); + + std::vector<hpx::id_type> localities = hpx::find_all_localities(); + std::uint32_t ntot_localities = hpx::get_num_localities().get(); + std::vector<double> b_vec; + + std::vector<std::size_t> preceding_rows; + tot_mat_cols = tot_mat_rows; + + preceding_rows.resize(ntot_localities); + b_vec.resize(tot_mat_rows); + + remainder_rows = tot_mat_rows % ntot_localities; + loc_mat_rows = tot_mat_rows / ntot_localities; + + preceding_rows[0] = 0; + for (std::size_t i = 1; i < ntot_localities; i++){ + if (i - 1 < remainder_rows){ + loc_mat_rows = tot_mat_rows / ntot_localities + 1; + } else { + loc_mat_rows = tot_mat_rows / ntot_localities; + } + preceding_rows[i] = preceding_rows[i-1] + loc_mat_rows; +// printf("preceding_rows[%u] = %u\n", i, preceding_rows[i]); + } + + std::vector<hpx::future<std::vector<double>>> locality_futures; + locality_futures.reserve(ntot_localities); + for (hpx::id_type const& locality : localities) + { + locality_futures.push_back(hpx::async<locality_action>(locality, tot_mat_rows, tot_mat_cols, ntot_localities, rows_per_task)); + } + + auto indiv_locality_handling = [&](std::size_t locality_i, hpx::future<std::vector<double>> &&loc_fut){ + // NOTE: every future that gets resolved will call this function! + // locality_i auto returns the indexed sequence of future at the moment of `push_back` above. + // EXTRA: This locality_i can be optionally removed, in the assumption that this function will only be called once + // upon the completion of each remote locality. + // Then, the temp_B_vecs can be converted to only a std::vector<double> type. + std::vector<std::vector<double>> temp_B_vecs; + temp_B_vecs.resize(ntot_localities); + + temp_B_vecs[locality_i] = loc_fut.get(); + auto insert_pos = b_vec.begin() + (preceding_rows[locality_i]); + b_vec.insert(insert_pos, temp_B_vecs[locality_i].begin(), temp_B_vecs[locality_i].end()); + char const* mesg = "Work done on locality {1}\n"; +// hpx::util::format_to(hpx::cout, mesg, locality_i) << std::flush; + }; + hpx::wait_each(indiv_locality_handling, locality_futures); + + /*for (std::size_t i = 0; i < tot_mat_rows; i++){ + printf("global b_vec[%d] = %f\n", i, b_vec[i]); + }*/ + + return 0; +}
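Two small stand-alone C++ sketches follow. They are illustrative only and are not part of the patch above.

The first relates to the "Casting shenanigans due to strange behaviour of std::size_t division always return 0" workaround used in both testbeds: main() reads argv with sscanf(..., "%u", &size_t_variable), and on common LP64 targets "%u" fills only the low four bytes of an eight-byte std::size_t, leaving the upper bytes indeterminate. That is the likely cause of the odd division results, and a matched "%zu" conversion (or strtoull) would make the int round-trip unnecessary. A minimal sketch, assuming a 64-bit POSIX target:

// Hypothetical stand-alone sketch (not part of the patch): parsing the two
// command-line sizes directly into std::size_t with matched conversions.
#include <cstddef>
#include <cstdio>
#include <cstdlib>

int main(int argc, char **argv){
    if (argc < 3){
        std::printf("Usage: %s <matrix first-dimension length> <rows per task>\n", argv[0]);
        return 1;
    }
    std::size_t tot_mat_rows = 0, rows_per_task = 0;
    std::sscanf(argv[1], "%zu", &tot_mat_rows);            // "%zu" matches std::size_t
    rows_per_task = std::strtoull(argv[2], nullptr, 10);   // equivalent alternative
    std::printf("rows: %zu, rows per task: %zu, chunks per locality row block: %zu\n",
                tot_mat_rows, rows_per_task,
                rows_per_task ? (tot_mat_rows / rows_per_task) : 0);
    return 0;
}

The second sketches the signature hinted at by the "Look at std::span() for defining mat_row input" comment in vec_mult; it also zero-initialises the accumulator, which in the patch is read before it is first assigned. A minimal sketch, assuming C++20 is available:

// Hypothetical std::span variant of vec_mult (not part of the patch).
#include <algorithm>
#include <cstddef>
#include <span>

double vec_mult(std::span<const double> mat_row, std::span<const double> x_vec){
    double b_entry = 0.0;                              // start the dot product at zero
    std::size_t n = std::min(mat_row.size(), x_vec.size());
    for (std::size_t i = 0; i < n; i++){
        b_entry += mat_row[i] * x_vec[i];              // same accumulation as the pointer version
    }
    return b_entry;
}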