diff --git a/DataExtraction/concurrency_test/distribution_job_100.log b/DataExtraction/concurrency_test/distribution_job_100.log new file mode 100644 index 0000000000000000000000000000000000000000..6170efe3c4c12c4704fa3c8d2f9b3dd0c7c53c4a --- /dev/null +++ b/DataExtraction/concurrency_test/distribution_job_100.log @@ -0,0 +1,90 @@ +2020-06-03 16:20:09,092:INFO:The destination exist +2020-06-03 16:20:09,092:CRITICAL:Destination : /home/a.mozaffari/data_dest/ exist -> Remove and Re-Cereate +2020-06-03 16:20:09,092:INFO:The source path is : //home/a.mozaffari/data_era5/2017/ +2020-06-03 16:20:09,092:INFO:The destination path is : /home/a.mozaffari/data_dest/ +2020-06-03 16:20:09,092:INFO:==== Directory scanner : start ==== +2020-06-03 16:20:09,105:INFO:=== Directory Scanner output === +2020-06-03 16:20:09,106:INFO:Total size of the source directory is:18.975132Gb. +2020-06-03 16:20:09,106:INFO:Total number of the files in the source directory is: 95 +2020-06-03 16:20:09,106:INFO:Total number of the directories in the source directory is: 3 +2020-06-03 16:20:09,106:INFO:==== Directory scanner : end ==== +2020-06-03 16:20:09,106:INFO:==== Data Structure Builder : start ==== +2020-06-03 16:20:09,106:INFO:The destination path exists +2020-06-03 16:20:09,106:INFO:Load Level = 0 : Data Sctructure will be build +2020-06-03 16:20:09,112:INFO:01 is created +2020-06-03 16:20:09,117:INFO:02 is created +2020-06-03 16:20:09,124:INFO:03 is created +2020-06-03 16:20:09,125:INFO:==== Data Structure Builder : end ==== +2020-06-03 16:20:09,125:INFO:MA0: Next to be processed is 01 loacted in //home/a.mozaffari/data_era5/2017/01 +2020-06-03 16:20:09,126:INFO:=== Directory Scanner output === +2020-06-03 16:20:09,126:INFO:Total size of the source directory is:5522.964184Gb. 
+2020-06-03 16:20:09,126:INFO:Total number of the files in the source directory is: 27 +2020-06-03 16:20:09,126:INFO:Total number of the directories in the source directory is: 0 +2020-06-03 16:20:09,126:INFO:The follwoing is in the load Balancer +2020-06-03 16:20:09,126:INFO:{1: None, 2: None, 3: None, 4: None, 5: None, 6: None, 7: None, 8: None, 9: None, 10: None, 11: None, 12: None} +2020-06-03 16:20:09,126:INFO:['ecmwf_era5_17010115.nc', 'ecmwf_era5_17010204.nc', 'ecmwf_era5_17010121.nc', 'ecmwf_era5_17010117.nc', 'ecmwf_era5_17010113.nc', 'ecmwf_era5_17010201.nc', 'ecmwf_era5_17010120.nc', 'ecmwf_era5_17010205.nc', 'ecmwf_era5_17010118.nc', 'ecmwf_era5_17010106.nc', 'ecmwf_era5_17010103.nc', 'ecmwf_era5_17010119.nc', 'ecmwf_era5_17010116.nc', 'ecmwf_era5_17010122.nc', 'ecmwf_era5_17010105.nc', 'ecmwf_era5_17010110.nc', 'ecmwf_era5_17010112.nc', 'ecmwf_era5_17010203.nc', 'ecmwf_era5_17010114.nc', 'ecmwf_era5_17010108.nc', 'ecmwf_era5_17010100.nc', 'ecmwf_era5_17010102.nc', 'ecmwf_era5_17010111.nc', 'ecmwf_era5_17010123.nc', 'ecmwf_era5_17010104.nc', 'ecmwf_era5_17010101.nc', 'ecmwf_era5_17010202.nc'] +2020-06-03 16:20:09,126:INFO:0 +2020-06-03 16:20:09,127:INFO:27 +2020-06-03 16:20:09,127:INFO:{1: 'ecmwf_era5_17010115.nc;ecmwf_era5_17010116.nc;ecmwf_era5_17010104.nc', 2: 'ecmwf_era5_17010204.nc;ecmwf_era5_17010122.nc;ecmwf_era5_17010101.nc', 3: 'ecmwf_era5_17010121.nc;ecmwf_era5_17010105.nc;ecmwf_era5_17010202.nc', 4: 'ecmwf_era5_17010117.nc;ecmwf_era5_17010110.nc', 5: 'ecmwf_era5_17010113.nc;ecmwf_era5_17010112.nc', 6: 'ecmwf_era5_17010201.nc;ecmwf_era5_17010203.nc', 7: 'ecmwf_era5_17010120.nc;ecmwf_era5_17010114.nc', 8: 'ecmwf_era5_17010205.nc;ecmwf_era5_17010108.nc', 9: 'ecmwf_era5_17010118.nc;ecmwf_era5_17010100.nc', 10: 'ecmwf_era5_17010106.nc;ecmwf_era5_17010102.nc', 11: 'ecmwf_era5_17010103.nc;ecmwf_era5_17010111.nc', 12: 'ecmwf_era5_17010119.nc;ecmwf_era5_17010123.nc'} +2020-06-03 16:20:09,127:INFO:{1: 'ecmwf_era5_17010115.nc;ecmwf_era5_17010116.nc;ecmwf_era5_17010104.nc', 2: 'ecmwf_era5_17010204.nc;ecmwf_era5_17010122.nc;ecmwf_era5_17010101.nc', 3: 'ecmwf_era5_17010121.nc;ecmwf_era5_17010105.nc;ecmwf_era5_17010202.nc', 4: 'ecmwf_era5_17010117.nc;ecmwf_era5_17010110.nc', 5: 'ecmwf_era5_17010113.nc;ecmwf_era5_17010112.nc', 6: 'ecmwf_era5_17010201.nc;ecmwf_era5_17010203.nc', 7: 'ecmwf_era5_17010120.nc;ecmwf_era5_17010114.nc', 8: 'ecmwf_era5_17010205.nc;ecmwf_era5_17010108.nc', 9: 'ecmwf_era5_17010118.nc;ecmwf_era5_17010100.nc', 10: 'ecmwf_era5_17010106.nc;ecmwf_era5_17010102.nc', 11: 'ecmwf_era5_17010103.nc;ecmwf_era5_17010111.nc', 12: 'ecmwf_era5_17010119.nc;ecmwf_era5_17010123.nc'} +2020-06-03 16:20:09,127:INFO:MA0: Next to be processed is 02 loacted in //home/a.mozaffari/data_era5/2017/02 +2020-06-03 16:20:09,127:INFO:=== Directory Scanner output === +2020-06-03 16:20:09,128:INFO:Total size of the source directory is:6953.662487Gb. 
+2020-06-03 16:20:09,128:INFO:Total number of the files in the source directory is: 34 +2020-06-03 16:20:09,128:INFO:Total number of the directories in the source directory is: 0 +2020-06-03 16:20:09,128:INFO:The follwoing is in the load Balancer +2020-06-03 16:20:09,128:INFO:{1: None, 2: None, 3: None, 4: None, 5: None, 6: None, 7: None, 8: None, 9: None, 10: None, 11: None, 12: None} +2020-06-03 16:20:09,128:INFO:['ecmwf_era5_17020206.nc', 'ecmwf_era5_17020201.nc', 'ecmwf_era5_17020111.nc', 'ecmwf_era5_17020114.nc', 'ecmwf_era5_17020108.nc', 'ecmwf_era5_17020104.nc', 'ecmwf_era5_17020200.nc', 'ecmwf_era5_17020123.nc', 'ecmwf_era5_17020105.nc', 'ecmwf_era5_17020116.nc', 'ecmwf_era5_17020204.nc', 'ecmwf_era5_17020113.nc', 'ecmwf_era5_17020205.nc', 'ecmwf_era5_17020101.nc', 'ecmwf_era5_17020115.nc', 'ecmwf_era5_17020121.nc', 'ecmwf_era5_17020112.nc', 'ecmwf_era5_17020119.nc', 'ecmwf_era5_17020110.nc', 'ecmwf_era5_17020203.nc', 'ecmwf_era5_17020109.nc', 'ecmwf_era5_17020117.nc', 'ecmwf_era5_17020207.nc', 'ecmwf_era5_17020209.nc', 'ecmwf_era5_17020120.nc', 'ecmwf_era5_17020100.nc', 'ecmwf_era5_17020118.nc', 'ecmwf_era5_17020208.nc', 'ecmwf_era5_17020103.nc', 'ecmwf_era5_17020202.nc', 'ecmwf_era5_17020107.nc', 'ecmwf_era5_17020106.nc', 'ecmwf_era5_17020122.nc', 'ecmwf_era5_17020102.nc'] +2020-06-03 16:20:09,128:INFO:0 +2020-06-03 16:20:09,128:INFO:34 +2020-06-03 16:20:09,128:INFO:{1: 'ecmwf_era5_17020206.nc;ecmwf_era5_17020205.nc;ecmwf_era5_17020120.nc', 2: 'ecmwf_era5_17020201.nc;ecmwf_era5_17020101.nc;ecmwf_era5_17020100.nc', 3: 'ecmwf_era5_17020111.nc;ecmwf_era5_17020115.nc;ecmwf_era5_17020118.nc', 4: 'ecmwf_era5_17020114.nc;ecmwf_era5_17020121.nc;ecmwf_era5_17020208.nc', 5: 'ecmwf_era5_17020108.nc;ecmwf_era5_17020112.nc;ecmwf_era5_17020103.nc', 6: 'ecmwf_era5_17020104.nc;ecmwf_era5_17020119.nc;ecmwf_era5_17020202.nc', 7: 'ecmwf_era5_17020200.nc;ecmwf_era5_17020110.nc;ecmwf_era5_17020107.nc', 8: 'ecmwf_era5_17020123.nc;ecmwf_era5_17020203.nc;ecmwf_era5_17020106.nc', 9: 'ecmwf_era5_17020105.nc;ecmwf_era5_17020109.nc;ecmwf_era5_17020122.nc', 10: 'ecmwf_era5_17020116.nc;ecmwf_era5_17020117.nc;ecmwf_era5_17020102.nc', 11: 'ecmwf_era5_17020204.nc;ecmwf_era5_17020207.nc', 12: 'ecmwf_era5_17020113.nc;ecmwf_era5_17020209.nc'} +2020-06-03 16:20:09,128:INFO:{1: 'ecmwf_era5_17020206.nc;ecmwf_era5_17020205.nc;ecmwf_era5_17020120.nc', 2: 'ecmwf_era5_17020201.nc;ecmwf_era5_17020101.nc;ecmwf_era5_17020100.nc', 3: 'ecmwf_era5_17020111.nc;ecmwf_era5_17020115.nc;ecmwf_era5_17020118.nc', 4: 'ecmwf_era5_17020114.nc;ecmwf_era5_17020121.nc;ecmwf_era5_17020208.nc', 5: 'ecmwf_era5_17020108.nc;ecmwf_era5_17020112.nc;ecmwf_era5_17020103.nc', 6: 'ecmwf_era5_17020104.nc;ecmwf_era5_17020119.nc;ecmwf_era5_17020202.nc', 7: 'ecmwf_era5_17020200.nc;ecmwf_era5_17020110.nc;ecmwf_era5_17020107.nc', 8: 'ecmwf_era5_17020123.nc;ecmwf_era5_17020203.nc;ecmwf_era5_17020106.nc', 9: 'ecmwf_era5_17020105.nc;ecmwf_era5_17020109.nc;ecmwf_era5_17020122.nc', 10: 'ecmwf_era5_17020116.nc;ecmwf_era5_17020117.nc;ecmwf_era5_17020102.nc', 11: 'ecmwf_era5_17020204.nc;ecmwf_era5_17020207.nc', 12: 'ecmwf_era5_17020113.nc;ecmwf_era5_17020209.nc'} +2020-06-03 16:20:09,129:INFO:MA0: Next to be processed is 03 loacted in //home/a.mozaffari/data_era5/2017/03 +2020-06-03 16:20:09,129:INFO:=== Directory Scanner output === +2020-06-03 16:20:09,129:INFO:Total size of the source directory is:6953.662487Gb. 
+2020-06-03 16:20:09,129:INFO:Total number of the files in the source directory is: 34 +2020-06-03 16:20:09,129:INFO:Total number of the directories in the source directory is: 0 +2020-06-03 16:20:09,129:INFO:The follwoing is in the load Balancer +2020-06-03 16:20:09,129:INFO:{1: None, 2: None, 3: None, 4: None, 5: None, 6: None, 7: None, 8: None, 9: None, 10: None, 11: None, 12: None} +2020-06-03 16:20:09,129:INFO:['ecmwf_era5_17020206.nc', 'ecmwf_era5_17020201.nc', 'ecmwf_era5_17020111.nc', 'ecmwf_era5_17020114.nc', 'ecmwf_era5_17020108.nc', 'ecmwf_era5_17020104.nc', 'ecmwf_era5_17020200.nc', 'ecmwf_era5_17020123.nc', 'ecmwf_era5_17020105.nc', 'ecmwf_era5_17020116.nc', 'ecmwf_era5_17020204.nc', 'ecmwf_era5_17020113.nc', 'ecmwf_era5_17020205.nc', 'ecmwf_era5_17020101.nc', 'ecmwf_era5_17020115.nc', 'ecmwf_era5_17020121.nc', 'ecmwf_era5_17020112.nc', 'ecmwf_era5_17020119.nc', 'ecmwf_era5_17020110.nc', 'ecmwf_era5_17020203.nc', 'ecmwf_era5_17020109.nc', 'ecmwf_era5_17020117.nc', 'ecmwf_era5_17020207.nc', 'ecmwf_era5_17020209.nc', 'ecmwf_era5_17020120.nc', 'ecmwf_era5_17020100.nc', 'ecmwf_era5_17020118.nc', 'ecmwf_era5_17020208.nc', 'ecmwf_era5_17020103.nc', 'ecmwf_era5_17020202.nc', 'ecmwf_era5_17020107.nc', 'ecmwf_era5_17020106.nc', 'ecmwf_era5_17020122.nc', 'ecmwf_era5_17020102.nc'] +2020-06-03 16:20:09,130:INFO:0 +2020-06-03 16:20:09,130:INFO:34 +2020-06-03 16:20:09,130:INFO:{1: 'ecmwf_era5_17020206.nc;ecmwf_era5_17020205.nc;ecmwf_era5_17020120.nc', 2: 'ecmwf_era5_17020201.nc;ecmwf_era5_17020101.nc;ecmwf_era5_17020100.nc', 3: 'ecmwf_era5_17020111.nc;ecmwf_era5_17020115.nc;ecmwf_era5_17020118.nc', 4: 'ecmwf_era5_17020114.nc;ecmwf_era5_17020121.nc;ecmwf_era5_17020208.nc', 5: 'ecmwf_era5_17020108.nc;ecmwf_era5_17020112.nc;ecmwf_era5_17020103.nc', 6: 'ecmwf_era5_17020104.nc;ecmwf_era5_17020119.nc;ecmwf_era5_17020202.nc', 7: 'ecmwf_era5_17020200.nc;ecmwf_era5_17020110.nc;ecmwf_era5_17020107.nc', 8: 'ecmwf_era5_17020123.nc;ecmwf_era5_17020203.nc;ecmwf_era5_17020106.nc', 9: 'ecmwf_era5_17020105.nc;ecmwf_era5_17020109.nc;ecmwf_era5_17020122.nc', 10: 'ecmwf_era5_17020116.nc;ecmwf_era5_17020117.nc;ecmwf_era5_17020102.nc', 11: 'ecmwf_era5_17020204.nc;ecmwf_era5_17020207.nc', 12: 'ecmwf_era5_17020113.nc;ecmwf_era5_17020209.nc'} +2020-06-03 16:20:09,130:INFO:{1: 'ecmwf_era5_17020206.nc;ecmwf_era5_17020205.nc;ecmwf_era5_17020120.nc', 2: 'ecmwf_era5_17020201.nc;ecmwf_era5_17020101.nc;ecmwf_era5_17020100.nc', 3: 'ecmwf_era5_17020111.nc;ecmwf_era5_17020115.nc;ecmwf_era5_17020118.nc', 4: 'ecmwf_era5_17020114.nc;ecmwf_era5_17020121.nc;ecmwf_era5_17020208.nc', 5: 'ecmwf_era5_17020108.nc;ecmwf_era5_17020112.nc;ecmwf_era5_17020103.nc', 6: 'ecmwf_era5_17020104.nc;ecmwf_era5_17020119.nc;ecmwf_era5_17020202.nc', 7: 'ecmwf_era5_17020200.nc;ecmwf_era5_17020110.nc;ecmwf_era5_17020107.nc', 8: 'ecmwf_era5_17020123.nc;ecmwf_era5_17020203.nc;ecmwf_era5_17020106.nc', 9: 'ecmwf_era5_17020105.nc;ecmwf_era5_17020109.nc;ecmwf_era5_17020122.nc', 10: 'ecmwf_era5_17020116.nc;ecmwf_era5_17020117.nc;ecmwf_era5_17020102.nc', 11: 'ecmwf_era5_17020204.nc;ecmwf_era5_17020207.nc', 12: 'ecmwf_era5_17020113.nc;ecmwf_era5_17020209.nc'} +2020-06-03 16:21:20,780:INFO:MA0: S7: is finished the 01 . +2020-06-03 16:21:20,781:INFO:MA0: S10: is finished the 01 . +2020-06-03 16:21:21,017:INFO:MA0: S4: is finished the 01 . +2020-06-03 16:21:21,084:INFO:MA0: S9: is finished the 01 . +2020-06-03 16:21:21,088:INFO:MA0: S6: is finished the 01 . +2020-06-03 16:21:21,102:INFO:MA0: S5: is finished the 01 . 
+2020-06-03 16:21:21,104:INFO:MA0: S11: is finished the 01 . +2020-06-03 16:21:21,230:INFO:MA0: S12: is finished the 01 . +2020-06-03 16:21:21,410:INFO:MA0: S8: is finished the 01 . +2020-06-03 16:21:54,059:INFO:MA0: S3: is finished the 01 . +2020-06-03 16:21:54,095:INFO:MA0: S2: is finished the 01 . +2020-06-03 16:21:54,256:INFO:MA0: S1: is finished the 01 . +2020-06-03 16:22:26,617:INFO:MA0: S12: is finished the 02 . +2020-06-03 16:22:26,747:INFO:MA0: S11: is finished the 02 . +2020-06-03 16:22:58,655:INFO:MA0: S10: is finished the 02 . +2020-06-03 16:22:59,255:INFO:MA0: S6: is finished the 02 . +2020-06-03 16:22:59,522:INFO:MA0: S4: is finished the 02 . +2020-06-03 16:22:59,585:INFO:MA0: S7: is finished the 02 . +2020-06-03 16:22:59,591:INFO:MA0: S8: is finished the 02 . +2020-06-03 16:22:59,722:INFO:MA0: S9: is finished the 02 . +2020-06-03 16:22:59,780:INFO:MA0: S5: is finished the 02 . +2020-06-03 16:23:32,178:INFO:MA0: S2: is finished the 02 . +2020-06-03 16:23:32,470:INFO:MA0: S12: is finished the 03 . +2020-06-03 16:23:32,507:INFO:MA0: S3: is finished the 02 . +2020-06-03 16:23:32,743:INFO:MA0: S11: is finished the 03 . +2020-06-03 16:23:33,204:INFO:MA0: S1: is finished the 02 . +2020-06-03 16:24:33,815:INFO:MA0: S10: is finished the 03 . +2020-06-03 16:24:34,418:INFO:MA0: S6: is finished the 03 . +2020-06-03 16:24:35,163:INFO:MA0: S8: is finished the 03 . +2020-06-03 16:24:35,181:INFO:MA0: S4: is finished the 03 . +2020-06-03 16:24:35,525:INFO:MA0: S5: is finished the 03 . +2020-06-03 16:24:35,625:INFO:MA0: S9: is finished the 03 . +2020-06-03 16:24:35,743:INFO:MA0: S7: is finished the 03 . +2020-06-03 16:25:05,500:INFO:MA0: S2: is finished the 03 . +2020-06-03 16:25:06,343:INFO:MA0: S3: is finished the 03 . +2020-06-03 16:25:06,744:INFO:MA0: S1: is finished the 03 . +2020-06-03 16:25:06,745:INFO:MA0: Sucssfully terminated with total time : 297.6529154777527 diff --git a/DataExtraction/concurrency_test/stager.log b/DataExtraction/concurrency_test/stager.log new file mode 100644 index 0000000000000000000000000000000000000000..0f04f86e0f8a230746d5b95a1ea0fb97959a1e2c --- /dev/null +++ b/DataExtraction/concurrency_test/stager.log @@ -0,0 +1,9 @@ +2020-06-03 16:54:21,422:DEBUG: === PyStager is started === +2020-06-03 16:54:21,437:WARNING:('Node', '4', 'is idle') +2020-06-03 16:54:21,437:WARNING:('Node', '5', 'is idle') +2020-06-03 17:07:59,395:INFO:('Node:', '1', 'finished :', 'rsync -r //home/a.mozaffari/data_era5/2017/01/ /home/a.mozaffari/data_dest/01', '\r\n') +2020-06-03 17:11:27,336:INFO:('Node:', '3', 'finished :', 'rsync -r //home/a.mozaffari/data_era5/2017/03/ /home/a.mozaffari/data_dest/03', '\r\n') +2020-06-03 17:11:30,417:INFO:('Node:', '2', 'finished :', 'rsync -r //home/a.mozaffari/data_era5/2017/02/ /home/a.mozaffari/data_dest/02', '\r\n') +2020-06-03 17:11:30,417:DEBUG:1028.9953315258026 +2020-06-03 17:11:30,418:INFO:== PyStager is done == +2020-06-03 17:11:30,418:INFO:exit status : 0 diff --git a/DataExtraction/helper_single_master.py b/DataExtraction/helper_single_master.py new file mode 100644 index 0000000000000000000000000000000000000000..a26d76395cca0ecb6d49ba53684e72f5d5d7f5b0 --- /dev/null +++ b/DataExtraction/helper_single_master.py @@ -0,0 +1,245 @@ +from mpi4py import MPI +from os import walk +import os +import sys +import subprocess +import logging +import time +import hashlib +import argparse +from os import listdir +from os.path import isfile, join + +# ini. 
MPI
+comm = MPI.COMM_WORLD
+my_rank = comm.Get_rank()   # rank of this MPI process
+p = comm.Get_size()         # number of assigned nodes (processes)
+
+
+# ======================= List of functions ====================================== #
+if my_rank == 0:  # node is master
+
+    logger = logging.getLogger(__file__)
+    logger.addHandler(logging.StreamHandler(sys.stdout))
+
+
+def directory_scanner(source_path, load_level):
+    # Look inside a directory and collect the folders, sub-directories, number of files and total size.
+    # NOTE : sub-directories nested inside sub-directories are ignored!
+    # NOTE : load_level selects the work unit: 0 = sub-directories, 1 = files
+
+    dir_detail_list = []  # directories details
+    list_items_to_process = []
+    total_size_source = 0
+    total_num_files = 0
+    list_directories = []
+
+    ## =================== Case: files (load_level == 1) ================= ##
+
+    if load_level == 1:
+
+        # list all the files in the directory
+        for dirpath, dirnames, filenames in os.walk(source_path):
+            list_items_to_process.extend(filenames)
+
+        for f in list_items_to_process:
+            path = source_path + "/" + str(f)
+            statinfo = os.stat(path)
+            size = statinfo.st_size
+            total_size_source = total_size_source + int(size)
+
+        total_num_files = len(list_items_to_process)  # number of the files in the source
+        total_num_directories = int(0)  # TODO need to unify the concept as the number of items
+
+    ## ===================== Case: directories (load_level == 0) ========== ##
+
+    if load_level == 0:
+        list_directories = os.listdir(source_path)
+
+        for d in list_directories:
+            path = source_path + d
+            if os.path.isdir(path):
+                list_items_to_process.append(d)
+                list_items_to_process.sort()
+                num_files = 0
+                # size of the files and subdirectories
+                size_dir = subprocess.check_output(['du', '-sc', path])
+                splitted = size_dir.split()  # first item is the size of the folder
+                size = (splitted[0])
+                num_files = len([f for f in os.listdir(path) if os.path.isfile(os.path.join(path, f))])
+                dir_detail_list.extend([d, size, num_files])
+                total_num_files = total_num_files + int(num_files)
+                total_size_source = total_size_source + int(size)
+
+            else:
+                message = path, 'does not exist'
+                logging.error(message)
+
+        total_num_directories = int(len(list_directories))
+
+    ## ======================= End of the directory case =================== ##
+    # NOTE: the unit depends on the load level (bytes for files, du blocks for directories),
+    # so the 'Gb.' label below is approximate
+    total_size_source = float(total_size_source / 1000000)  # human-readable size of the source
+
+    logger.info("=== Directory Scanner output ===")
+    message = 'Total size of the source directory is:' + str(total_size_source) + 'Gb.'
+    logger.info(message)
+    message = "Total number of the files in the source directory is: " + str(total_num_files)
+    logger.info(message)
+    message = "Total number of the directories in the source directory is: " + str(total_num_directories)
+    logger.info(message)
+
+    # Unifying the naming of this section for both cases : sub-directory or file
+    # dir_detail_list        ==> name of the directories, size and number of the files in each directory / empty for files
+    # list_items_to_process  ==> list of items to process (sub-directories / files)
+    # total_size_source      ==> total size of the items to process
+    # total_num_files        ==> for sub-directories : sum of all files in the different directories / for files : number of files
+    # total_num_directories  ==> for files = 0
+
+    return dir_detail_list, list_items_to_process, total_size_source, total_num_files, total_num_directories
+
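+
+# Illustrative example (values taken from the sample run logged above): for
+#   directory_scanner("//home/a.mozaffari/data_era5/2017/", load_level=0)
+# list_items_to_process is ['01', '02', '03'] with total_num_files = 95 and
+# total_num_directories = 3, while for
+#   directory_scanner("//home/a.mozaffari/data_era5/2017/01", load_level=1)
+# it is the list of the 27 *.nc files of that month with total_num_directories = 0.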
+
+# Arguments:
+# source_dir             - source directory
+# destination_dir        - destination directory
+# dir_detail_list        - details of the sub-directories (from directory_scanner)
+# list_items_to_process  - items (sub-directories / files) to process
+# load_level             - 0 = sub-directories, 1 = files
+
+def data_structure_builder(source_dir, destination_dir, dir_detail_list, list_items_to_process, load_level):
+
+    if not os.path.exists(destination_dir):  # check if the destination dir. exists
+        os_command = ("mkdir " + destination_dir)
+        os.system(os_command)
+        logger.info('destination path is created')
+    else:
+        logger.info('The destination path exists')
+
+    os.chdir(destination_dir)  # change the working directory to the destination
+
+    if load_level == 0:
+        logging.info('Load Level = 0 : Data Structure will be built')
+
+        for dir_name in list_items_to_process:
+            # print(dir_name)
+            dir_path = destination_dir + dir_name
+
+            # TODO : this could be cleaned up to use os.mkdir() instead of a shell call
+            if not os.path.exists(dir_path):
+                # print(dir_name + " will be created ")
+                os_command = ("mkdir " + dir_name)
+                os.system(os_command)
+                logging.info(dir_name + " is created ")
+
+    if load_level == 1:
+        logger.info('Load Level = 1 : File will be processed')
+
+    return
+
+
+def load_distributor(dir_detail_list, list_items_to_process, total_size_source, total_num_files, total_num_directories, load_level, processor_num):
+    firs_slave_processor_id = 1
+    # create a dictionary with one key per worker processor;
+    # each item (directory / file) is appended to the value of one of the keys,
+    # starting from the first available slave node
+    transfer_dict = dict.fromkeys(list(range(firs_slave_processor_id, processor_num)))
+    print(transfer_dict)
+    logger.info("The following is in the load balancer ")
+    logger.info(transfer_dict)
+    logger.info(list_items_to_process)
+    logger.info(total_num_directories)
+    logger.info(total_num_files)
+
+    # package_counter = 0 possibility to use the counter to fill
+    counter = firs_slave_processor_id  # this is the ID of the first available slave, cycling up to p - 1
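+
+    # The items are dealt out round-robin over the worker ranks 1 .. processor_num - 1.
+    # Illustrative example: with processor_num = 4 (three workers) and
+    # list_items_to_process = ['f1', 'f2', 'f3', 'f4', 'f5'] the result would be
+    # {1: 'f1;f4', 2: 'f2;f5', 3: 'f3'}, i.e. several items per key, joined with ';'.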
+    if load_level == 0:
+        for Directory_counter in range(0, total_num_directories):
+            if transfer_dict[counter] is None:  # if the value for the key is None, assign the item to it
+                transfer_dict[counter] = list_items_to_process[Directory_counter]
+            else:  # if the key already has a value, join the new value to the old one
+                transfer_dict[counter] = "{};{}".format(transfer_dict[counter], list_items_to_process[Directory_counter])
+            counter = counter + 1
+            if counter == processor_num:
+                counter = firs_slave_processor_id
+
+    if load_level == 1:
+        for File_counter in range(0, total_num_files):
+            if transfer_dict[counter] is None:  # if the value for the key is None, assign the item to it
+                # print(" M1: New key made for a free processor number {my_rank}".format(my_rank = counter))
+                # in case more processors are available than there are files, leave the key empty
+                if counter > len(list_items_to_process) + (firs_slave_processor_id - 1):
+                    transfer_dict[counter] = None
+                else:
+                    transfer_dict[counter] = list_items_to_process[File_counter]
+
+            else:  # if the key already has a value, join the new value to the old one
+                transfer_dict[counter] = "{};{}".format(transfer_dict[counter], list_items_to_process[File_counter])
+            counter = counter + 1
+            if counter == processor_num:
+                counter = firs_slave_processor_id
+
+    logging.info(transfer_dict)
+    return transfer_dict
+
+
+def sync_file(source_path, destination_dir, job_name, rsync_status):
+    # build the rsync command for the assigned folder
+    rsync_msg = ("rsync -r " + source_path + job_name + "/" + " " + destination_dir + "/" + job_name)
+    # print('Node:', str(my_rank), 'will execute :', rsync_msg, '\r\n')
+
+    # sync the assigned folder
+    if rsync_status == 1:
+        os.system(rsync_msg)
+
+    return
+
+
+def hash_directory(source_path, job_name, hash_rep_file, input_status):
+    # sha256_hash = hashlib.sha256()
+    md5_hash = hashlib.md5()
+
+    ########## Create a hashed file repository for the directory(ies) assigned to the node #######
+    hash_repo_text = input_status + "_" + job_name + "_hashed.txt"
+    os.chdir(hash_rep_file)
+    hashed_text_note = open(hash_repo_text, "w+")
+
+    # job_name is the name of the subdirectory that is going to be processed
+    directory_to_process = source_path + job_name
+    # print(directory_to_process)
+    files_list = []
+    for dirpath, dirnames, filenames in os.walk(directory_to_process):
+        files_list.extend(filenames)
+
+    os.chdir(directory_to_process)  # change to the working directory
+
+    for file_to_process in files_list:
+
+        ## ======= this is the sha256 checksum ========= #
+        # with open(file_to_process,"rb") as f:
+        #     # Read and update hash in chunks of 4K
+        #     for byte_block in iter(lambda: f.read(4096),b""):
+        #         sha256_hash.update(byte_block)
+        #     hashed_file = sha256_hash.hexdigest()
+
+        with open(file_to_process, "rb") as f:
+            # Read and update the hash in chunks of 4K
+            for byte_block in iter(lambda: f.read(4096), b""):
+                md5_hash.update(byte_block)
+            hashed_file = md5_hash.hexdigest()
+
+        hashed_text_note.write(hashed_file + "\n")
+
+    hashed_text_note.close()
+    return
+
+
+def md5(fname):
+    md5_hash = hashlib.md5()
+    with open(fname, "rb") as f:
+        # Read and update the hash in chunks of 4K
+        for byte_block in iter(lambda: f.read(4096), b""):
+            md5_hash.update(byte_block)
+    return md5_hash.hexdigest()
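+
+
+# Typical master-side call sequence (as used in main_single_master.py):
+#   1. directory_scanner(source_dir, load_level=0)   -> sub-directories of the source
+#   2. data_structure_builder(...)                   -> mirror those sub-directories in the destination
+#   3. for each sub-directory: directory_scanner(..., load_level=1) followed by
+#      load_distributor(...)                         -> one ';'-separated job string per worker rank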
diff --git a/DataExtraction/main_single_master.py b/DataExtraction/main_single_master.py
new file mode 100644
index 0000000000000000000000000000000000000000..ab21aef815857af463042dbe404534be79805bd7
--- /dev/null
+++ b/DataExtraction/main_single_master.py
@@ -0,0 +1,236 @@
+from mpi4py import MPI
+from os import walk
+import sys
+import subprocess
+import logging
+import time
+import shutil
+import glob
+import argparse
+import os
+
+
+from helper_single_master import directory_scanner
+from helper_single_master import load_distributor
+from helper_single_master import hash_directory
+from helper_single_master import data_structure_builder
+from helper_single_master import md5
+
+from prepare_era5_data import prepare_era5_data_one_file
+
+# How to run it:
+# mpirun -np 6 python main_single_master.py
+# mpiexec -np 6 python main_single_master.py
+
+
+def main():
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--job_id", type=int, default=100)
+    parser.add_argument("--source_dir", type=str, default="//home/a.mozaffari/data_era5/2017/")
+    parser.add_argument("--destination_dir", type=str, default="/home/a.mozaffari/data_dest/")
+    parser.add_argument("--log_temp", type=str, default="log_temp")
+    parser.add_argument("--checksum_status", type=int, default=0)
+    parser.add_argument("--rsync_status", type=int, default=0)
+    parser.add_argument("--load_level", type=int, default=0)
+    parser.add_argument("--clear_destination", type=int, default=1)
+    args = parser.parse_args()
+
+    job_id = args.job_id
+    source_dir = args.source_dir
+    destination_dir = args.destination_dir
+    checksum_status = args.checksum_status
+    rsync_status = args.rsync_status
+    clear_destination = args.clear_destination
+    log_temp = args.log_temp
+
+    # for the local machine test
+    current_path = os.path.dirname(os.path.abspath(__file__))
+    os.chdir(current_path)
+    time.sleep(0)
+
+    # initialize MPI
+    comm = MPI.COMM_WORLD
+    my_rank = comm.Get_rank()  # rank of this MPI process
+    p = comm.Get_size()        # number of assigned nodes (processes)
+    firs_slave_processor_id = 1
+
+    # ==================================== Master Logging ==================================================== #
+    # DEBUG:    Detailed information, typically of interest only when diagnosing problems.
+    # INFO:     Confirmation that things are working as expected.
+    # WARNING:  An indication that something unexpected happened, or indicative of some problem in the near future.
+    # ERROR:    Due to a more serious problem, the software has not been able to perform some function.
+    # CRITICAL: A serious error, indicating that the program itself may be unable to continue running.
+    # The logging messages are also copied to stdout for the containerized version on HPC.
+
+    if my_rank == 0:  # node is master
+
+        # delete the general logger if it already exists
+        logger_path = current_path + '/distribution_job_{job_id}.log'.format(job_id=job_id)
+        if os.path.isfile(logger_path):
+            print("Logger Exists -> Logger Deleted")
+            os.remove(logger_path)
+        logging.basicConfig(filename='distribution_job_{job_id}.log'.format(job_id=job_id), level=logging.DEBUG,
+                            format='%(asctime)s:%(levelname)s:%(message)s')
+        logger = logging.getLogger(__file__)
+        logger.addHandler(logging.StreamHandler(sys.stdout))
+        start = time.time()  # start of the MPI wall time
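+
+    # Overall flow: after the source and destination checks below, rank 0 (the master)
+    # scans the source tree, mirrors its directory structure in the destination and then,
+    # per sub-directory, distributes the contained files over the worker ranks; all other
+    # ranks act as workers and process the file lists they receive (see the two branches below).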
+
+    # check the existence of the source path:
+    if not os.path.exists(source_dir):  # check if the source dir. exists
+        if my_rank == 0:
+            logger.critical('The source does not exist')
+            message_out = "Source : {source} does not exist -> Abort".format(source=source_dir)
+            logger.info('exit status : 1')
+            sys.exit(1)
+
+    # Check if the destination exists; if so, it is deleted and re-created (clear_destination == 1)
+    if os.path.exists(destination_dir):
+        if my_rank == 0:
+            logger.info('The destination exists')
+            if clear_destination == 1:
+                shutil.rmtree(destination_dir)
+                os.mkdir(destination_dir)
+                logger.critical("Destination : {destination} exists -> Remove and Re-Create".format(destination=destination_dir))
+                print("Destination : {destination} exists -> Remove and Re-Create".format(destination=destination_dir))
+
+            else:
+                logger.critical("Destination : {destination} exists -> will not be removed (caution : overwrite)".format(destination=destination_dir))
+                print("Destination : {destination} exists -> will not be removed (caution : overwrite)".format(destination=destination_dir))
+
+    # Create a log folder for the slave nodes to write down their processes
+    slave_log_path = os.path.join(destination_dir, log_temp)
+
+    if my_rank == 0:
+        if os.path.exists(slave_log_path) == False:
+            os.mkdir(slave_log_path)
+
+    if my_rank == 0:  # node is master
+
+        # ==================================== Master : Directory scanner {Parent level, load_level = 0} ================================= #
+
+        logger.info("The source path is : {path}".format(path=source_dir))
+        logger.info("The destination path is : {path}".format(path=destination_dir))
+        logger.info("==== Directory scanner : start ====")
+        load_level = 0
+        ret_dir_scanner = directory_scanner(source_dir, load_level)
+        # print(ret_dir_scanner)
+
+        # Unifying the naming of this section for both cases : sub-directory or file
+        # dir_detail_list        ==> name of the directories, size and number of the files in each directory / empty for files
+        # list_items_to_process  ==> list of items to process (sub-directories / files)
+        # total_size_source      ==> total size of the items to process
+        # total_num_files        ==> for sub-directories : sum of all files in the different directories / for files : number of files
+        # total_num_directories  ==> for files = 0
+
+        dir_detail_list = ret_dir_scanner[0]
+        list_items_to_process = ret_dir_scanner[1]
+        total_size_source = ret_dir_scanner[2]
+        total_num_files = ret_dir_scanner[3]
+        total_num_dir = ret_dir_scanner[4]
+        logger.info("==== Directory scanner : end ====")
+
+        # ================================= Master : Data Structure Builder {Parent level, load_level = 0} ========================= #
+
+        logger.info("==== Data Structure Builder : start ====")
+        data_structure_builder(source_dir, destination_dir, dir_detail_list, list_items_to_process, load_level)
+        logger.info("==== Data Structure Builder : end ====")
+
+        # inform the slaves how many batches of messages they will receive
+        batch_info = list_items_to_process
+        for slaves in range(1, p):
+            comm.send(batch_info, dest=slaves)
+
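+        # For every sub-directory (batch) the master now scans it at file level, splits the
+        # file list over the worker ranks with load_distributor and sends each worker its
+        # ';'-separated job string; the workers answer with one completion message per batch,
+        # which is collected after this loop.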
+        for batch_counter in range(0, len(batch_info)):
+            # relative_source = source_dir + str(batch_info[batch_counter]) + "/"
+            relative_source = os.path.join(source_dir, str(batch_info[batch_counter]))
+            print(relative_source)
+            logger.info("MA{my_rank}: Next to be processed is {task} located in {path} ".format(my_rank=my_rank, task=batch_info[batch_counter], path=relative_source))
+            load_level = 1  # process the files in the relative source
+
+            # ________ Directory Scanner ______ #
+            relative_ret_dir_scanner = directory_scanner(relative_source, load_level)
+            relative_dir_detail_list = relative_ret_dir_scanner[0]
+            relative_list_items_to_process = relative_ret_dir_scanner[1]
+            relative_total_size_source = relative_ret_dir_scanner[2]
+            relative_total_num_files = relative_ret_dir_scanner[3]
+            relative_total_num_dir = relative_ret_dir_scanner[4]
+
+            # ________ Load Distribution ________ #
+            relative_ret_load_balancer = load_distributor(relative_dir_detail_list, relative_list_items_to_process, relative_total_size_source, relative_total_num_files, relative_total_num_dir, load_level, p)
+            relative_transfer_dict = relative_ret_load_balancer
+            logger.info(relative_transfer_dict)
+
+            # ________ Communication ________ #
+            for processor in range(firs_slave_processor_id, p):
+                broadcast_list = relative_transfer_dict[processor]
+                comm.send(broadcast_list, dest=processor)
+
+        # collect one completion message per worker and per batch
+        receive_counter = 0
+        total_number_messages = (p - 1) * len(batch_info) - 1
+        while receive_counter <= total_number_messages:
+            message_in = comm.recv()
+            logger.info("MA{my_rank}: S{message_in} ".format(my_rank=my_rank, message_in=message_in))
+            receive_counter = receive_counter + 1
+
+        end = time.time()
+        termination_message = "MA{my_rank}: Successfully terminated with total time : {wall_time}".format(my_rank=my_rank, wall_time=end - start)
+        logger.info(termination_message)
+        sys.exit(0)
+
+    else:  # processor is a slave (worker)
+
+        # ============================================= Slave : Send / Receive ============================================ #
+        # receive the list of batches that will be processed
+        batch_info = comm.recv(source=0)
+        # print("S{my_rank} will receive {todo_message} batches of tasks to process".format(my_rank=my_rank, todo_message=len(batch_info)))
+        batch_counter = 0
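+
+        # Each slave then receives, for every batch, one message from the master: either
+        # None (nothing to do, more workers than files) or a ';'-separated list of file
+        # names to process. After every batch the slave reports back to the master with a
+        # completion message.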
+
+        # loop over all the batches
+        while batch_counter <= len(batch_info) - 1:
+            message_in = comm.recv(source=0)
+            relative_source_directory = os.path.join(source_dir, str(batch_info[batch_counter]))
+            relative_destination_directory = os.path.join(destination_dir, str(batch_info[batch_counter]))
+
+            if message_in is None:  # in case more processors are assigned than there are items to process
+                slave_out_message = "{my_rank} is idle".format(my_rank=my_rank)
+                # comm.send(message_out, dest=1)
+
+            else:  # the slave node has a job list to work on
+                job_list = message_in.split(';')
+                for job_count in range(0, len(job_list)):
+                    job = job_list[job_count]  # job is the name of the file assigned to this slave node
+                    # print(job)
+                    if rsync_status == 1:
+                        # prepare the rsync command to be executed by the worker node
+                        rsync_message = "rsync {relative_source_directory}/{job} {relative_destination_directory}/{job}".format(relative_source_directory=relative_source_directory, job=job, relative_destination_directory=relative_destination_directory)
+                        os.system(rsync_message)
+                        # slave_out_message = " RSYNC process"
+                    else:
+                        ## @Bing here is the job for the slaves
+                        print("S{my_rank} will execute era5 preparation on {job}".format(my_rank=my_rank, job=job))
+                        prepare_era5_data_one_file(src_file=job, directory_to_process=relative_source_directory, target=job, target_dir=relative_destination_directory)
+
+                    # if job.endswith(".nc"):
+                    #     if os.path.exists(os.path.join(relative_destination_directory, job)):
+                    #         print("{job} has been processed in directory {directory}".format(job=job, directory=relative_destination_directory))
+                    #     else:
+                    #         prepare_era5_data_one_file(src_file=job, directory_to_process=relative_source_directory, target=job, target_dir=relative_destination_directory)
+                    #         print("File {job} in directory {directory} has been processed".format(job=job, directory=relative_destination_directory))
+                    #
+                    # slave_out_message = " {in_message} process".format(in_message=my_rank)
+                    # Generate a hash of the output
+
+            message_out = "{my_rank}: has finished {in_message} .".format(my_rank=my_rank, in_message=batch_info[batch_counter])
+            comm.send(message_out, dest=0)
+            batch_counter = batch_counter + 1
+
+    MPI.Finalize()
+
+
+if __name__ == "__main__":
+    main()
diff --git a/DataExtraction/mpi_stager_v2.py b/DataExtraction/mpi_stager_v2.py
old mode 100755
new mode 100644
index 3bb8e8d7a66e66cb6dff6bb67239a584b0ff7f77..632c29567590466264a39b60bdcd9ae0cd820b05
--- a/DataExtraction/mpi_stager_v2.py
+++ b/DataExtraction/mpi_stager_v2.py
@@ -9,16 +9,16 @@ from external_function import load_distributor
 from external_function import hash_directory
 from external_function import md5
 from prepare_era5_data import *
-# How to Run it!
-# mpirun -np 6 python mpi_stager_v2.py
+# How to Run it! 
+# mpirun -np 6 python mpi_stager_v2.py 
 import os
 from pathlib import Path
 import argparse
 
 def main():
     parser=argparse.ArgumentParser()
-    parser.add_argument("--source_dir",type=str,default="/p/fastdata/slmet/slmet111/met_data/ecmwf/era5/nc/2017/")
-    parser.add_argument("--destination_dir",type=str,default="/p/scratch/deepacf/bing/extractedData")
+    parser.add_argument("--source_dir",type=str,default="//home/a.mozaffari/data_era5/2017/")
+    parser.add_argument("--destination_dir",type=str,default="/home/a.mozaffari/data_dest")
     parser.add_argument("--checksum_status",type=int,default = 0)
     parser.add_argument("--rsync_status",type=int,default=1)
     args = parser.parse_args()
diff --git a/DataExtraction/parameters.dat b/DataExtraction/parameters.dat
deleted file mode 100755
index c8f2c21ebaa3c3fc147a23b299a72fc381f7fa1f..0000000000000000000000000000000000000000
--- a/DataExtraction/parameters.dat
+++ /dev/null
@@ -1,14 +0,0 @@
-import os
-
-# ============ input parameters =================== #
-# 0:deactivate 1: active
-
-Pleas fill the following parameters list for PyStager
-Source_Directory = /p/fastdata/slmet/slmet111/met_data/ecmwf/era5/nc/2017/
-Destination_Directory = /p/scratch/cjjsc42/bing/PredNet/extractedData_test1
-Log_Directory = /p/project/cjjsc42/bing/pystager-development/DataExtraction/test1/log
-Rsync_Status = 1
-Checksum_Status = 0
-
-
-
diff --git a/DataExtraction/readme.md b/DataExtraction/readme.md
new file mode 100644
index 0000000000000000000000000000000000000000..9e97dae81e2f5aa45e3cc676b2b90a1fa318145b
--- /dev/null
+++ b/DataExtraction/readme.md
@@ -0,0 +1,2 @@
+`source create_env_zam347.sh {MPI}` <br/>
+`mpirun -np {number of processors max 13 on zam347} python main_single_master.py`
\ No newline at end of file
diff --git a/DataExtraction/stager.log b/DataExtraction/stager.log
deleted file mode 100644
index c27bab5305f3fdee25076ce10c90b878b209f7fc..0000000000000000000000000000000000000000
--- a/DataExtraction/stager.log
+++ /dev/null
@@ -1,9 +0,0 @@
-2019-11-05 13:55:57,877:DEBUG: === PyStager is started ===
-2019-11-05 13:55:57,931:CRITICAL:The Destination does not exist
-2019-11-05 13:55:57,931:INFO:exit status : 1
-2019-11-05 14:02:21,920:DEBUG: === PyStager is started ===
-2019-11-05 14:05:54,994:DEBUG: === PyStager is started ===
-2019-11-05 14:21:39,817:DEBUG: === PyStager is started ===
-2019-11-05 14:43:46,742:DEBUG: === PyStager is started ===
-2019-11-05 14:52:44,396:DEBUG: === PyStager is started ===
-2019-11-05 14:53:14,620:DEBUG: === PyStager is started ===