Skip to content
Snippets Groups Projects
Commit f90dee30 authored by Carsten Hinz
Browse files

added a tool to combine a number of contributor files.

added a decoding of strings to output levels

added AI-generated tests for the new script
parent 3291b37b
No related branches found
No related tags found
1 merge request: !11 "Creation of first beta release version"
......@@ -6,6 +6,25 @@ from pathlib import Path
handlerPair = namedtuple("registeredLogger", ["handler", "formatter"])

# Lookup table translating a textual log level into the numeric constant of
# the logging module. "warn" and "warning" both map to logging.WARNING.
levels = dict(
    critical=logging.CRITICAL,
    error=logging.ERROR,
    warn=logging.WARNING,
    warning=logging.WARNING,
    info=logging.INFO,
    debug=logging.DEBUG,
)


def decode_log_level(level: str) -> int:
    """!Translate the name of a log level into the logging module constant.

    The name is lower-cased before the lookup, so it is case insensitive.

    @param level name of the log level, e.g. "debug" or "WARNING"
    @return numeric log level stored in the levels table
    @exception ValueError if the lower-cased name is not a key of levels
    """
    key = level.lower()
    if key in levels:
        return levels[key]
    # Unknown name: report the offending value together with all valid names.
    valid_names = ' | '.join(levels.keys())
    raise ValueError(f"log level given: {level} -- must be one of: {valid_names}")
class toargridding_defaultLogging:
"""! class to setup default loggers for toargridding
......
import pytest
from unittest import mock
import os
import tools.combine_contributor_files as combine_contributor_files
# Mock the os.path.exists function so every input file is reported as present
@mock.patch('os.path.exists', return_value=True)
def test_process_input_files(mock_exists):
    """Two identical input files must be merged into one de-duplicated output.

    Both mocked input files contain the ids 1, 2 and 3, so the output is
    expected to list each id exactly once.

    NOTE(review): patching sys.argv only has an effect if the script parses
    its arguments inside main() -- confirm against the script.
    """
    # Setup: the parser declares the input files as positional arguments,
    # so they are passed without an option name.
    test_args = ["script_name", "file1.txt", "file2.txt", "--output", "output.txt"]
    # One mock serves every open() call; it keeps reading and writing away
    # from the real file system and records what is written.
    mocked_open = mock.mock_open(read_data="1\n2\n3\n")

    # Execute
    with mock.patch('sys.argv', test_args), mock.patch('builtins.open', mocked_open):
        combine_contributor_files.main()

    # Verify: the output file is the last file opened ...
    mocked_open.assert_called_with('output.txt', 'w')
    # ... and the concatenation of everything written is the sorted id list.
    # Joining the calls keeps the check independent of whether the script
    # writes line by line or in a single call.
    handle = mocked_open.return_value
    written = "".join(call.args[0] for call in handle.write.call_args_list)
    assert written == "1\n2\n3\n"
def test_file_not_exists():
    """A missing input file must be reported through Logger.exception.

    NOTE(review): patching sys.argv only has an effect if the script parses
    its arguments inside main() -- confirm against the script.
    """
    # Setup: input files are positional arguments of the script.
    test_args = ["script_name", "nonexistent_file.txt", "--output", "output.txt"]
    output_opener = mock.mock_open()

    def fake_open(path, *args, **kwargs):
        # The input file cannot be opened; every other path (the output
        # file) is served by a mock so nothing is created on disk.
        if path == "nonexistent_file.txt":
            raise FileNotFoundError(path)
        return output_opener(path, *args, **kwargs)

    with mock.patch('sys.argv', test_args), \
            mock.patch('os.path.exists', return_value=False), \
            mock.patch('builtins.open', side_effect=fake_open), \
            mock.patch('logging.Logger.exception') as mock_log_exception:
        # Execute
        combine_contributor_files.main()

    # Verify
    mock_log_exception.assert_called_with('The input file nonexistent_file.txt does not exist.')
def test_invalid_timeseries_id():
    """A line that is not an integer must be reported through Logger.exception.

    NOTE(review): patching sys.argv only has an effect if the script parses
    its arguments inside main() -- confirm against the script.
    """
    # Setup: input files are positional arguments of the script.
    test_args = ["script_name", "file_with_invalid_id.txt", "--output", "output.txt"]
    with mock.patch('sys.argv', test_args), \
            mock.patch('os.path.exists', return_value=True), \
            mock.patch('builtins.open', mock.mock_open(read_data="invalid\n")), \
            mock.patch('logging.Logger.exception') as mock_log_exception:
        # os.path.exists is patched as well so that the result does not
        # depend on which files happen to exist in the working directory.
        # Execute
        combine_contributor_files.main()

    # Verify
    mock_log_exception.assert_called_with('Invalid timeseries id: invalid. Expecting integer.')
from unittest import mock
import combine_contributors_files
@mock.patch('os.path.exists', return_value=True)
@mock.patch('combine_contributors_files.toargridding_defaultLogging')
def test_combine_files_with_duplicate_ids(mock_logging_setup, mock_exists):
    """Ids occurring in several input files must appear once in the output.

    Patch decorators inject their mocks bottom-up, hence the parameter order
    (logging setup first, os.path.exists second).

    NOTE(review): patching sys.argv only has an effect if the script parses
    its arguments inside main() -- confirm against the script.
    """
    # One file handle per input file, each with its own content.
    input_handles = [
        mock.mock_open(read_data="1\n4\n5\n").return_value,  # First file
        mock.mock_open(read_data="2\n6\n7\n").return_value,  # Second file
        mock.mock_open(read_data="3\n1\n8\n").return_value,  # Third file, '1' duplicates the first file
    ]
    # A fourth handle is required for the output file, which is opened last.
    output_handle = mock.mock_open().return_value
    mocked_open = mock.mock_open()
    mocked_open.side_effect = [*input_handles, output_handle]

    # Setup arguments to mimic command-line input (input files are positional)
    test_args = ["script_name", "file1.txt", "file2.txt", "file3.txt", "--output", "output.txt"]
    with mock.patch('sys.argv', test_args), mock.patch('builtins.open', mocked_open):
        combine_contributors_files.main()

    # Verify that the output file was written correctly
    mocked_open.assert_called_with('output.txt', 'w')
    # Expected output: 1, 2, 3, 4, 5, 6, 7, 8 (each number on a new line, sorted)
    expected_output = "".join(f"{n}\n" for n in range(1, 9))
    # Join all write calls so the check holds for line-wise and single writes.
    written = "".join(call.args[0] for call in output_handle.write.call_args_list)
    assert written == expected_output
# Additional tests can be written to cover more scenarios, such as testing different logging levels,
# handling of multiple input files, and the behavior when no input files are provided.
\ No newline at end of file
import argparse
import os
import logging
#setup of logging
from toargridding.defaultLogging import toargridding_defaultLogging, decode_log_level
# Define the arguments for the script. Only the parser is built at import
# time; parsing and logging setup happen inside main(), so that importing
# this module (e.g. from a test) neither reads sys.argv nor can exit.
parser = argparse.ArgumentParser(description='Combine a number of contributor files provided by toargridding to a single file. This removes duplicates in the timeseries ids.')
parser.add_argument('input_files', metavar='files', type=str, nargs='+',
                    help='input files')
parser.add_argument('--output', metavar='FILE', type=str, default='combined_contributors.txt',
                    help='output file (default: combined_contributors.txt)')
parser.add_argument("-log", "--log", default="warning",
                    help=("Provide logging level. Example --log debug', default='warning'"))

logger = logging.getLogger(__name__)


def _read_timeseries_ids(path):
    """Return the set of integer timeseries ids found in the file *path*.

    Every non-blank line is expected to hold one integer. Lines that cannot
    be converted are logged and skipped. Errors raised by open() propagate
    to the caller.
    """
    ids = set()
    with open(path, 'r') as id_file:
        for line in id_file:
            entry = line.strip()
            if not entry:
                # blank lines (e.g. at the end of a file) carry no id
                continue
            try:
                ids.add(int(entry))
            except ValueError:
                logger.exception(f'Invalid timeseries id: {entry}. Expecting integer.')
    return ids


def main(argv=None):
    """Combine the timeseries ids of all input files into one sorted file.

    Input files that do not exist are logged and skipped. The remaining ids
    are de-duplicated and written in ascending order, one id per line.

    Args:
        argv: command line arguments without the program name; ``None``
            (the default) lets argparse read ``sys.argv``.
    """
    # argparse already enforces at least one input file (nargs='+').
    args = parser.parse_args(argv)

    # setup of logging
    level = decode_log_level(args.log)
    logger_setup = toargridding_defaultLogging(__name__)
    logger_setup.addShellLogger(level)
    logger_setup.logExceptions()

    # Print the arguments for verification
    logger.debug(f'Input files: {args.input_files}')
    logger.debug(f'Output file: {args.output}')

    # Process the input files
    set_of_timeseries_ids = set()
    for input_file in args.input_files:
        try:
            set_of_timeseries_ids |= _read_timeseries_ids(input_file)
        except FileNotFoundError:
            # Logged inside the handler so that the traceback is meaningful;
            # the remaining input files are still processed.
            logger.exception(f'The input file {input_file} does not exist.')

    # Write the output in a single call
    with open(args.output, 'w') as output_file:
        output_file.write(
            "".join(f"{timeseries_id}\n" for timeseries_id in sorted(set_of_timeseries_ids))
        )


if __name__ == '__main__':
    main()
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment