From ca1c8f33a0442d243efe4952f62ee0ec192c696c Mon Sep 17 00:00:00 2001 From: Jamil Gafur Date: Wed, 3 Jun 2020 13:40:51 -0600 Subject: [PATCH 01/30] added parallel processing for regression suites removed global variables and passed them as local parameters added escape if commands are empty updated manage regession suite for gloabl parameter passing fixed and tested xylar requests, currrently running on the nightly regression suite --- .../compass/manage_regression_suite.py | 493 +++++++++++++----- 1 file changed, 369 insertions(+), 124 deletions(-) diff --git a/testing_and_setup/compass/manage_regression_suite.py b/testing_and_setup/compass/manage_regression_suite.py index a64e708305..74710e78ce 100755 --- a/testing_and_setup/compass/manage_regression_suite.py +++ b/testing_and_setup/compass/manage_regression_suite.py @@ -11,7 +11,8 @@ When cleaning a regression suite, this script will remove any generated files for each individual test case, and the run script that runs all test cases. """ - +# -----import libraries----- +# {{{ from __future__ import absolute_import, division, print_function, \ unicode_literals @@ -22,10 +23,12 @@ import xml.etree.ElementTree as ET import subprocess +# }}} -def process_test_setup(test_tag, config_file, work_dir, model_runtime, - suite_script, baseline_dir, verbose): # {{{ +def process_test_setup(test_tag, config_file, work_dir, model_runtime, + regression_script_code, baseline_dir, verbose): + # {{{ if verbose: stdout = open(work_dir + '/manage_regression_suite.py.out', 'a') stderr = stdout @@ -85,28 +88,28 @@ def process_test_setup(test_tag, config_file, work_dir, model_runtime, subprocess.check_call( ['./setup_testcase.py', '-q', '-f', config_file, '--work_dir', work_dir, '-o', test_core, '-c', test_configuration, - '-r', test_resolution, '-t', test_test, '-m', model_runtime], + '-r', test_resolution, '-t', test_test, '-m', model_runtime], stdout=stdout, stderr=stderr) else: subprocess.check_call( ['./setup_testcase.py', '-q', '-f', config_file, '--work_dir', work_dir, '-o', test_core, '-c', test_configuration, - '-r', test_resolution, '-t', test_test, '-m', model_runtime, + '-r', test_resolution, '-t', test_test, '-m', model_runtime, '-b', baseline_dir], stdout=stdout, stderr=stderr) print(" -- Setup case '{}': -o {} -c {} -r {} -t {}".format( test_name, test_core, test_configuration, test_resolution, test_test)) # Write step into suite script to cd into the base of the regression suite - suite_script.write("os.chdir(base_path)\n") + regression_script_code += "os.chdir(base_path)\n" # Write the step to define the output file - suite_script.write("case_output = open('case_outputs/{}', 'w')\n".format( - case_output_name)) + regression_script_code += "case_output = open('case_outputs/{}', 'w')\n".format( + case_output_name) # Write step to cd into test case directory - suite_script.write("os.chdir('{}/{}/{}/{}')\n".format( - test_core, test_configuration, test_resolution, test_test)) + regression_script_code += "os.chdir('{}/{}/{}/{}')\n".format( + test_core, test_configuration, test_resolution, test_test) for script in test_tag: # Process test case script @@ -124,32 +127,32 @@ def process_test_setup(test_tag, config_file, work_dir, model_runtime, test_resolution, test_test, script_name) command = '{}, stdout=case_output, stderr=case_output)'.format( command) - # Write test case run step - suite_script.write("print(' ** Running case {}')\n".format( - test_name)) - suite_script.write('try:\n') - suite_script.write(' {}\n'.format(command)) - suite_script.write(" print(' PASS')\n") - suite_script.write('except subprocess.CalledProcessError:\n') - suite_script.write(" print(' ** FAIL (See case_outputs/{} " - "for more information)')\n".format( - case_output_name)) - suite_script.write(" test_failed = True\n") + regression_script_code += "print(' ** Running case {}')\n".format( + test_name) + regression_script_code += 'try:\n' + regression_script_code += ' {}\n'.format(command) + regression_script_code += " print(' PASS')\n" + regression_script_code += 'except subprocess.CalledProcessError:\n' + regression_script_code += " print(' ** FAIL (See case_outputs/{} for more information)')\n".format( + case_output_name) + regression_script_code += " test_failed = True\n" # Finish writing test case output - suite_script.write("case_output.close()\n") - suite_script.write("\n") + regression_script_code += "case_output.close()\n" + regression_script_code += "\n" if verbose: stdout.close() else: dev_null.close() -# }}} + return regression_script_code + # }}} -def process_test_clean(test_tag, work_dir): # {{{ - dev_null = open('/dev/null', 'a') +def process_test_clean(test_tag, work_dir): + # {{{ + dev_null = open('/dev/null', 'a') # Process test attributes try: test_name = test_tag.attrib['name'] @@ -193,19 +196,271 @@ def process_test_clean(test_tag, work_dir): # {{{ # Clean test case subprocess.check_call( ['./clean_testcase.py', '-q', '--work_dir', work_dir, '-o', test_core, - '-c', test_configuration, '-r', test_resolution, '-t', test_test], + '-c', test_configuration, '-r', test_resolution, '-t', test_test], stdout=dev_null, stderr=dev_null) print(" -- Cleaned case '{}': -o {} -c {} -r {} -t {}".format( test_name, test_core, test_configuration, test_resolution, test_test)) dev_null.close() + # }}} -# }}} + +def local_parallel_setup_script( + work_dir, suite_tag, verbose, args, suite_root, testcase_data, testcase_data_prereq): + # {{{ + try: + suite_name = suite_tag.attrib['name'] + except KeyError: + print("ERROR: tag is missing 'name' attribute.") + print('Exiting...') + sys.exit(1) + + if not os.path.exists('{}'.format(work_dir)): + os.makedirs('{}'.format(work_dir)) + + if verbose: + # flush existing regression suite output file + open(work_dir + '/manage_regression_suite.py.out', 'w').close() + + regression_script_name = '{}/parallel_{}.py'.format(work_dir, suite_name) + local_parallel_script = open('{}'.format(regression_script_name), 'w') + local_parallel_code = write_regression_local_parallel_top( + work_dir, suite_tag, args.nodes) + + setup_suite( + suite_root, + args.work_dir, + args.model_runtime, + args.config_file, + args.baseline_dir, + args.verbose, + args) + local_parallel_code = write_regression_local_parallel_testcase_data( + local_parallel_code, work_dir, testcase_data, testcase_data_prereq) + local_parallel_code = write_regression_local_parallel_bottom( + local_parallel_code) + + return local_parallel_script, local_parallel_code + # }}} + + +def get_prereq(testcase_data, testcase_data_prereq): + # {{{ + names = [i for i in testcase_data.keys() if i in testcase_data_prereq] + pre = {} + for i in names: + pre[i] = testcase_data[i] + return pre + # }}} + + +def get_testcase_data(testcase_data, testcase_data_prereq): + # {{{ + names = [i for i in testcase_data.keys() if i not in testcase_data_prereq] + testcase_data_values = {} + for i in names: + testcase_data_values[i] = testcase_data[i] + return testcase_data_values + # }}} + + +def write_regression_local_parallel_testcase_data(local_parallel_code, work_dir, testcase_data, testcase_data_prereq): + # {{{ + index = 0 + testcase_data_values = get_testcase_data( testcase_data , testcase_data_prereq) + pre_testcase_data = get_prereq(testcase_data , testcase_data_prereq) + + for key in pre_testcase_data.keys(): + local_parallel_code += "locations.append('" + \ + pre_testcase_data[key]['path'] + "')\n" + local_parallel_code += "commands.append(['time' , '-p' , '" + os.getcwd( + ) + "/" + pre_testcase_data[key]['path'] + "/' + str(run_file_names[{}])])\n".format(str(index)) + local_parallel_code += "procs.append(" + \ + str(pre_testcase_data[key]['procs']) + ")\n" + local_parallel_code += "prereq_commands.append([locations[" + str( + index) + "], commands[" + str(index) + "]])\n\n\n" + index += 1 + + for key in testcase_data_values.keys(): + local_parallel_code += "locations.append('" + \ + testcase_data_values[key]['path'] + "')\n" + local_parallel_code += "commands.append(['time' , '-p' , '" + os.getcwd( + ) + "/" + testcase_data_values[key]['path'] + "/' + str(run_file_names[{}])])\n".format(str(index)) + local_parallel_code += "procs.append(" + \ + str(testcase_data_values[key]['procs']) + ")\n" + local_parallel_code += "allocation_commands.append([locations[" + str( + index) + "], commands[" + str(index) + "]])\n\n\n" + index += 1 + + return local_parallel_code + # }}} + + +def write_regression_local_parallel_bottom(local_parallel_code): + # {{{ + local_parallel_code += "if max(procs) > number_of_procs:\n" + local_parallel_code += " print('more processors are used than allocated')\n" + local_parallel_code += " quit()\n" + local_parallel_code += "start_time = time.time()\n" + local_parallel_code += "def stream_queue(command, number_of_procs):\n" + local_parallel_code += " if command == []:\n" + local_parallel_code += " return \n" + local_parallel_code += " base = os.getcwd()\n" + local_parallel_code += " Queue_running = []\n" + local_parallel_code += " index = 0\n" + local_parallel_code += " continue_add = True\n" + local_parallel_code += " Done = False\n" + local_parallel_code += " while True:\n" + local_parallel_code += " if continue_add and not Done:\n" + local_parallel_code += " if number_of_procs >= procs[index]:\n" + local_parallel_code += " try:\n" + local_parallel_code += " if index < len(command):\n" + local_parallel_code += " print('-----IN ADDING PHASE-----')\n" + local_parallel_code += " print('we have {} number_of_procs, current task uses {} Adding to queue'.format(number_of_procs, procs[index]))\n" + local_parallel_code += " case_output = open('case_outputs/'+command[index][0].replace('/', '_'), 'w+')\n" + local_parallel_code += " print('processing command: {}'.format(command[index][1]))\n" + local_parallel_code += " os.chdir(command[index][0])\n" + local_parallel_code += " open_proc = subprocess.Popen(command[index][1], stdout=case_output, stderr=case_output)\n" + local_parallel_code += " os.chdir(base)\n" + local_parallel_code += " Queue_running.append([open_proc, procs[index],command[index][1]]) \n" + local_parallel_code += " number_of_procs = number_of_procs - procs[index]\n" + local_parallel_code += " print(' New number_of_procs: {}'.format(number_of_procs)) \n" + local_parallel_code += " except subprocess.CalledProcessError: \n" + local_parallel_code += " print('error for : {}'.format(str(command[index][1]))) \n" + local_parallel_code += " elif number_of_procs < procs[index]: \n" + local_parallel_code += " print('NOT ENOUGH WAIT FOR PROCESSES') \n" + local_parallel_code += " continue_add = False \n" + local_parallel_code += " index = index + 1 \n" + local_parallel_code += " if index > len(procs) -1: \n" + local_parallel_code += " print('No more to add moving to processing phase') \n" + local_parallel_code += " continue_add = False \n" + local_parallel_code += " elif not continue_add and not Done: \n" + local_parallel_code += " for background_process in Queue_running: \n" + local_parallel_code += " background_process[0].wait() \n" + local_parallel_code += " pid = background_process[0].pid \n" + local_parallel_code += " if not psutil.pid_exists(pid): \n" + local_parallel_code += " Queue_running.remove(background_process) \n" + local_parallel_code += " print(str(pid) +': compleate') \n" + local_parallel_code += " number_of_procs += background_process[1] \n" + local_parallel_code += " if index < len(procs)-1: \n" + local_parallel_code += " print('STILL HAVE MORE TO PROCESS') \n" + local_parallel_code += " if number_of_procs >= procs[index]: \n" + local_parallel_code += " continue_add = True \n" + local_parallel_code += " else: \n" + local_parallel_code += " if len(Queue_running) == 0: \n" + local_parallel_code += " Done = True \n" + local_parallel_code += " continue_add = False \n" + local_parallel_code += " elif Done and not continue_add: \n" + local_parallel_code += " return \n" + local_parallel_code += " \n" + local_parallel_code += " return\n" + local_parallel_code += "print('--- PROCESSING PREREQS ---')\n" + local_parallel_code += "stream_queue(prereq_commands, number_of_procs)\n" + local_parallel_code += "print('--- PROCESSING commands ---')\n" + local_parallel_code += "stream_queue(allocation_commands, number_of_procs)\n" + local_parallel_code += "end_time = time.time()\n" + local_parallel_code += "print('parallel run time: {} min'.format((end_time - start_time) / 60))\n" + local_parallel_code = write_regression_script_testcase_data_bottom( + local_parallel_code) + + return local_parallel_code + # }}} + + +def write_regression_local_parallel_top(work_dir, suite_tag, nodes): + # {{{ + local_parallel_code = "#!/usr/bin/env python\n\n\n" + local_parallel_code += "import time\n" + local_parallel_code += "import psutil\n" + local_parallel_code += "import sys\n" + local_parallel_code += "import os\n" + local_parallel_code += "import subprocess\n" + local_parallel_code += "import numpy as np\n" + local_parallel_code += "import multiprocessing as mp\n" + local_parallel_code += "out, err = subprocess.Popen(['nproc'],stdout=subprocess.PIPE, stderr=subprocess.STDOUT).communicate()\n" + local_parallel_code += "nodes = {}\n".format(nodes) + local_parallel_code += "number_of_procs = nodes * int(out.split()[0])\n" + local_parallel_code += "print('Number of usable Processors: {}'.format(number_of_procs))\n" + local_parallel_code += "os.environ['PYTHONUNBUFFERED'] = '1'\n" + local_parallel_code += "test_failed=False\n" + local_parallel_code += "if not os.path.exists('case_outputs'):\n" + local_parallel_code += " os.makedirs('case_outputs')\n" + local_parallel_code += "base_path = '" + work_dir + "'\n" + local_parallel_code += "os.chdir(base_path)\n" + local_parallel_code += "locations = []\n" + local_parallel_code += "procs = []\n" + local_parallel_code += "run_file_names = []\n" + local_parallel_code += "prereq_commands = []\n" + local_parallel_code += "allocation_commands = []\n" + local_parallel_code += "commands = []\n\n\n" + + for child in suite_tag: + for script in child: + script_name = script.attrib['name'] + local_parallel_code += "run_file_names.append('{}')\n".format( + script_name) + + local_parallel_code += "\n\n\n" + return local_parallel_code + # }}} + + +def write_regression_script_testcase_data_bottom(regression_script_code): + # {{{ + regression_script_code += "print('TEST RUNTIMES:')\n" + regression_script_code += "case_output = '/case_outputs/'\n" + regression_script_code += "totaltime = 0\n" + regression_script_code += "for _, _, files in os.walk(base_path + case_output):\n" + regression_script_code += " for afile in sorted(files):\n" + regression_script_code += " outputfile = base_path + case_output + afile\n" + regression_script_code += " runtime = np.ceil(float(subprocess. check_output(\n ['grep', 'real', outputfile])." + regression_script_code += "decode('utf-8').split('\\n')[-2].split()[1]))\n" + regression_script_code += " totaltime += runtime\n" + regression_script_code += " mins = int(np.floor(runtime/60.0))\n" + regression_script_code += " secs = int(np.ceil(runtime - mins*60))\n" + regression_script_code += " print('{:02d}:{:02d} {}'.format(mins, secs, afile))\n" + regression_script_code += "mins = int(np.floor(totaltime/60.0))\n" + regression_script_code += "secs = int(np.ceil(totaltime - mins*60))\n" + regression_script_code += "print('Total runtime {:02d}:{:02d}'.format(mins, secs))\n" + regression_script_code += "\n" + regression_script_code += "if test_failed:\n" + regression_script_code += " sys.exit(1)\n" + regression_script_code += "else:\n" + regression_script_code += " sys.exit(0)\n" + + return regression_script_code + # }}} + + +def write_regression_script_testcase_data_top(work_dir): + # {{{ + regression_script_code = "" + regression_script_code += '#!/usr/bin/env python\n' + regression_script_code += '\n' + regression_script_code += '# This script was written by ' + regression_script_code += 'manage_regression_suite.py as part of a\n' + regression_script_code += '# regression_suite file\n' + regression_script_code += '\n' + regression_script_code += 'import sys\n' + regression_script_code += 'import os\n' + regression_script_code += 'import subprocess\n' + regression_script_code += 'import numpy as np\n' + regression_script_code += '\n' + regression_script_code += "os.environ['PYTHONUNBUFFERED'] = '1'\n" + regression_script_code += "test_failed = False\n" + regression_script_code += '\n' + regression_script_code += "if not os.path.exists('case_outputs'):\n" + regression_script_code += " os.makedirs('case_outputs')\n" + regression_script_code += '\n' + regression_script_code += "base_path = '{}'\n".format(work_dir) + + return regression_script_code + # }}} def setup_suite(suite_tag, work_dir, model_runtime, config_file, baseline_dir, - verbose): + verbose, args): # {{{ try: suite_name = suite_tag.attrib['name'] @@ -217,78 +472,48 @@ def setup_suite(suite_tag, work_dir, model_runtime, config_file, baseline_dir, if not os.path.exists('{}'.format(work_dir)): os.makedirs('{}'.format(work_dir)) - # Create regression suite run script - regression_script_name = '{}/{}.py'.format(work_dir, suite_name) - regression_script = open('{}'.format(regression_script_name), 'w') - - # Write script header - regression_script.write('#!/usr/bin/env python\n') - regression_script.write('\n') - regression_script.write('# This script was written by ' - 'manage_regression_suite.py as part of a\n' - '# regression_suite file\n') - regression_script.write('\n') - regression_script.write('import sys\n') - regression_script.write('import os\n') - regression_script.write('import subprocess\n') - regression_script.write('import numpy as np\n') - regression_script.write('\n') - regression_script.write("os.environ['PYTHONUNBUFFERED'] = '1'\n") - regression_script.write("test_failed = False\n") - regression_script.write('\n') - regression_script.write("if not os.path.exists('case_outputs'):\n") - regression_script.write(" os.makedirs('case_outputs')\n") - regression_script.write('\n') - regression_script.write("base_path = '{}'\n".format(work_dir)) - if verbose: # flush existing regression suite output file open(work_dir + '/manage_regression_suite.py.out', 'w').close() + regression_script_code = "" + + if not args.local_parallel: + # Create regression suite run script + regression_script_name = '{}/{}.py'.format(work_dir, suite_name) + regression_script = open('{}'.format(regression_script_name), 'w') + + regression_script_code = write_regression_script_testcase_data_top(work_dir) + for child in suite_tag: # Process tags within the test suite if child.tag == 'test': - process_test_setup(child, config_file, work_dir, model_runtime, - regression_script, baseline_dir, verbose) - - regression_script.write("print('TEST RUNTIMES:')\n") - regression_script.write("case_output = '/case_outputs/'\n") - regression_script.write("totaltime = 0\n") - regression_script.write("for _, _, files in os.walk(base_path + " - "case_output):\n") - regression_script.write(" for afile in sorted(files):\n") - regression_script.write(" outputfile = base_path + case_output + " - "afile\n") - regression_script.write(" runtime = np.ceil(float(subprocess." - "check_output(\n" - " ['grep', 'real', outputfile])." - "decode('utf-8').split('\\n')[-2].split()[1]))\n") - regression_script.write(" totaltime += runtime\n") - regression_script.write(" mins = int(np.floor(runtime/60.0))\n") - regression_script.write(" secs = int(np.ceil(runtime - mins*60))\n") - regression_script.write(" print('{:02d}:{:02d} {}'.format(mins, " - "secs, afile))\n") - regression_script.write("mins = int(np.floor(totaltime/60.0))\n") - regression_script.write("secs = int(np.ceil(totaltime - mins*60))\n") - regression_script.write("print('Total runtime {:02d}:{:02d}'.format(mins, " - "secs))\n") - regression_script.write("\n") - - regression_script.write("if test_failed:\n") - regression_script.write(" sys.exit(1)\n") - regression_script.write("else:\n") - regression_script.write(" sys.exit(0)\n") - regression_script.close() + regression_script_code = process_test_setup( + child, + config_file, + work_dir, + model_runtime, + regression_script_code, + baseline_dir, + verbose) + + if not args.local_parallel: + dev_null = open('/dev/null', 'a') + subprocess.check_call(['chmod', + 'a+x', + '{}'.format(regression_script_name)], + stdout=dev_null, + stderr=dev_null) + dev_null.close() - dev_null = open('/dev/null', 'a') - subprocess.check_call( - ['chmod', 'a+x', '{}'.format(regression_script_name)], - stdout=dev_null, stderr=dev_null) - dev_null.close() -# }}} + regression_script_code = write_regression_script_testcase_data_bottom( + regression_script_code) + return regression_script, regression_script_code + # }}} -def clean_suite(suite_tag, work_dir): # {{{ +def clean_suite(suite_tag, work_dir): + # {{{ try: suite_name = suite_tag.attrib['name'] except KeyError: @@ -305,13 +530,11 @@ def clean_suite(suite_tag, work_dir): # {{{ # Process children within the if child.tag == 'test': process_test_clean(child, work_dir) -# }}} - - -def get_test_case_procs(suite_tag): # {{{ + # }}} - testcases = {} +def get_test_case_procs(suite_tag, testcase_data, testcase_data_prereq): + # {{{ for child in suite_tag: if child.tag == 'test': try: @@ -373,7 +596,7 @@ def get_test_case_procs(suite_tag): # {{{ # Make sure the prerequisite is already in the test suite found = False - for other_name, other_test in testcases.items(): + for other_name, other_test in testcase_data.items(): match = [prereq[tag] == other_test[tag] for tag in ['core', 'configuration', 'resolution', 'test']] if all(match): @@ -400,10 +623,8 @@ def get_test_case_procs(suite_tag): # {{{ if fnmatch.fnmatch(file, '*.xml'): # Build full file name config_file = '{}/{}'.format(test_path, file) - config_tree = ET.parse(config_file) config_root = config_tree.getroot() - if config_root.tag == 'config': case = config_root.attrib['case'] if case in cases: @@ -419,29 +640,33 @@ def get_test_case_procs(suite_tag): # {{{ threads = int(threads_str) except (KeyError, ValueError): threads = 1 - del config_root del config_tree - testcases[test_name] = {'core': test_core, - 'configuration': test_configuration, - 'resolution': test_resolution, - 'test': test_test, - 'path': test_path, - 'procs': procs, - 'threads': threads, - 'prereqs': prereqs} - - return testcases # }}} - - -def summarize_suite(testcases): # {{{ + testcase_data[test_name] = {'core': test_core, + 'configuration': test_configuration, + 'resolution': test_resolution, + 'test': test_test, + 'path': test_path, + 'procs': procs, + 'threads': threads, + 'prereqs': prereqs} + for sub in testcase_data[test_name]['prereqs']: + name = sub['name'] + if name not in testcase_data_prereq: + testcase_data_prereq.append(name) + return testcase_data , testcase_data_prereq + # }}} + + +def summarize_suite(testcase_data): + # {{{ max_procs = 1 max_threads = 1 max_cores = 1 - for name in testcases: - procs = testcases[name]['procs'] - threads = testcases[name]['threads'] + for name in testcase_data: + procs = testcase_data[name]['procs'] + threads = testcase_data[name]['threads'] cores = threads * procs max_procs = max(max_procs, procs) @@ -453,10 +678,12 @@ def summarize_suite(testcases): # {{{ print(" Maximum MPI tasks used: {:d}".format(max_procs)) print(" Maximum OpenMP threads used: {:d}".format(max_threads)) print(" Maximum Total Cores used: {:d}".format(max_cores)) -# }}} + return testcase_data + # }}} -def main (): # {{{ +def main(): + # {{{ # Define and process input arguments parser = argparse.ArgumentParser( description=__doc__, formatter_class=argparse.RawTextHelpFormatter) @@ -485,6 +712,14 @@ def main (): # {{{ help="If set, script will setup the test suite in " "work_dir rather in this script's location.", metavar="PATH") + parser.add_argument( + "-lp", + "--local_parallel", + help="If set, script will setup the test suite to " + "run in parallel on an interactive allocated node", + action="store_true") + parser.add_argument('-N', '--nodes', type=int, default=1, + help="Number of allocated Nodes; default to one") args = parser.parse_args() @@ -525,6 +760,7 @@ def main (): # {{{ # If the file was a file, process it if suite_root.tag == 'regression_suite': + testcase_data = {} # If cleaning, clean the suite if args.clean: print("Cleaning Test Cases:") @@ -533,11 +769,19 @@ def main (): # {{{ # If setting up, set up the suite if args.setup: print("\n") - print("Setting Up Test Cases:") - testcases = get_test_case_procs(suite_root) - setup_suite(suite_root, args.work_dir, args.model_runtime, - args.config_file, args.baseline_dir, args.verbose) - summarize_suite(testcases) + testcase_data_prereq = [] + testcase_data, testcase_data_prereq = get_test_case_procs(suite_root, testcase_data, testcase_data_prereq) + testcase_data = summarize_suite(testcase_data) + print("\n\nSetting Up Test Cases:") + if args.local_parallel: + local_parallel_script, local_parallel_code = local_parallel_setup_script( + args.work_dir, suite_root, args.verbose, args, suite_root, testcase_data, testcase_data_prereq) + local_parallel_script.write(local_parallel_code) + else: + regression_script, regression_script_code = setup_suite( + suite_root, args.work_dir, args.model_runtime, + args.config_file, args.baseline_dir, args.verbose, args) + regression_script.write(regression_script_code) if args.verbose: cmd = ['cat', args.work_dir + '/manage_regression_suite.py.out'] @@ -573,10 +817,11 @@ def main (): # {{{ history_file.write('**************************************************' '*********************\n') history_file.close() -# }}} + # }}} if __name__ == "__main__": main() + # vim: foldmethod=marker ai ts=4 sts=4 et sw=4 ft=python From 84d8f9bcfcd76d9fdc691a4b0e93de12345a3908 Mon Sep 17 00:00:00 2001 From: jamil gafur Date: Tue, 23 Jun 2020 08:47:50 -0600 Subject: [PATCH 02/30] added error checking for the test cases --- testing_and_setup/compass/manage_regression_suite.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/testing_and_setup/compass/manage_regression_suite.py b/testing_and_setup/compass/manage_regression_suite.py index 74710e78ce..daee086feb 100755 --- a/testing_and_setup/compass/manage_regression_suite.py +++ b/testing_and_setup/compass/manage_regression_suite.py @@ -338,10 +338,11 @@ def write_regression_local_parallel_bottom(local_parallel_code): local_parallel_code += " elif not continue_add and not Done: \n" local_parallel_code += " for background_process in Queue_running: \n" local_parallel_code += " background_process[0].wait() \n" - local_parallel_code += " pid = background_process[0].pid \n" local_parallel_code += " if not psutil.pid_exists(pid): \n" local_parallel_code += " Queue_running.remove(background_process) \n" - local_parallel_code += " print(str(pid) +': compleate') \n" + local_parallel_code += " name = background_process[-1][-1]\n" + local_parallel_code += " name = name[name.find('compass')+len('compass')+1:-3].replace('/','_')\n" + local_parallel_code += " print(str(name) + ' compleate')\n" local_parallel_code += " number_of_procs += background_process[1] \n" local_parallel_code += " if index < len(procs)-1: \n" local_parallel_code += " print('STILL HAVE MORE TO PROCESS') \n" @@ -420,9 +421,15 @@ def write_regression_script_testcase_data_bottom(regression_script_code): regression_script_code += " mins = int(np.floor(runtime/60.0))\n" regression_script_code += " secs = int(np.ceil(runtime - mins*60))\n" regression_script_code += " print('{:02d}:{:02d} {}'.format(mins, secs, afile))\n" + regression_script_code += " runtime = subprocess.call(['grep', 'Error', outputfile], stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT)\n" + regression_script_code += " if runtime == 1:\n" + regression_script_code += " print('error found in: {} '.format(afile))\n" regression_script_code += "mins = int(np.floor(totaltime/60.0))\n" regression_script_code += "secs = int(np.ceil(totaltime - mins*60))\n" regression_script_code += "print('Total runtime {:02d}:{:02d}'.format(mins, secs))\n" + regression_script_code += "runtime = subprocess.call(['grep', 'Error', outputfile], stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT)\n" + regression_script_code += "if runtime == 1:\n" + regression_script_code += " print('error found in: {} '.format(afile))" regression_script_code += "\n" regression_script_code += "if test_failed:\n" regression_script_code += " sys.exit(1)\n" From 1dd32c7f80206104976f56d03a253cd2d1ca402c Mon Sep 17 00:00:00 2001 From: jamil gafur Date: Tue, 23 Jun 2020 08:54:20 -0600 Subject: [PATCH 03/30] added formatting --- testing_and_setup/compass/manage_regression_suite.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/testing_and_setup/compass/manage_regression_suite.py b/testing_and_setup/compass/manage_regression_suite.py index daee086feb..ec5c3f1050 100755 --- a/testing_and_setup/compass/manage_regression_suite.py +++ b/testing_and_setup/compass/manage_regression_suite.py @@ -423,13 +423,10 @@ def write_regression_script_testcase_data_bottom(regression_script_code): regression_script_code += " print('{:02d}:{:02d} {}'.format(mins, secs, afile))\n" regression_script_code += " runtime = subprocess.call(['grep', 'Error', outputfile], stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT)\n" regression_script_code += " if runtime == 1:\n" - regression_script_code += " print('error found in: {} '.format(afile))\n" + regression_script_code += " print('\terror found in: {} '.format(afile))\n" regression_script_code += "mins = int(np.floor(totaltime/60.0))\n" regression_script_code += "secs = int(np.ceil(totaltime - mins*60))\n" regression_script_code += "print('Total runtime {:02d}:{:02d}'.format(mins, secs))\n" - regression_script_code += "runtime = subprocess.call(['grep', 'Error', outputfile], stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT)\n" - regression_script_code += "if runtime == 1:\n" - regression_script_code += " print('error found in: {} '.format(afile))" regression_script_code += "\n" regression_script_code += "if test_failed:\n" regression_script_code += " sys.exit(1)\n" From 66616315cf9a3cc01151c9eb8676ac886fe4296d Mon Sep 17 00:00:00 2001 From: jamil gafur Date: Tue, 23 Jun 2020 09:01:28 -0600 Subject: [PATCH 04/30] updated error output --- testing_and_setup/compass/manage_regression_suite.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/testing_and_setup/compass/manage_regression_suite.py b/testing_and_setup/compass/manage_regression_suite.py index ec5c3f1050..84f1778656 100755 --- a/testing_and_setup/compass/manage_regression_suite.py +++ b/testing_and_setup/compass/manage_regression_suite.py @@ -421,8 +421,8 @@ def write_regression_script_testcase_data_bottom(regression_script_code): regression_script_code += " mins = int(np.floor(runtime/60.0))\n" regression_script_code += " secs = int(np.ceil(runtime - mins*60))\n" regression_script_code += " print('{:02d}:{:02d} {}'.format(mins, secs, afile))\n" - regression_script_code += " runtime = subprocess.call(['grep', 'Error', outputfile], stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT)\n" - regression_script_code += " if runtime == 1:\n" + regression_script_code += " error_out = subprocess.call(['grep', 'Error', outputfile], stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT)\n" + regression_script_code += " if error_out != 1:\n" regression_script_code += " print('\terror found in: {} '.format(afile))\n" regression_script_code += "mins = int(np.floor(totaltime/60.0))\n" regression_script_code += "secs = int(np.ceil(totaltime - mins*60))\n" From 820041385a4eb4343174ed8d7e2f94839c7c2118 Mon Sep 17 00:00:00 2001 From: jamil gafur Date: Tue, 23 Jun 2020 09:23:14 -0600 Subject: [PATCH 05/30] fixed output formatting --- .../compass/manage_regression_suite.py | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/testing_and_setup/compass/manage_regression_suite.py b/testing_and_setup/compass/manage_regression_suite.py index 84f1778656..5abc300d7e 100755 --- a/testing_and_setup/compass/manage_regression_suite.py +++ b/testing_and_setup/compass/manage_regression_suite.py @@ -316,36 +316,37 @@ def write_regression_local_parallel_bottom(local_parallel_code): local_parallel_code += " if number_of_procs >= procs[index]:\n" local_parallel_code += " try:\n" local_parallel_code += " if index < len(command):\n" - local_parallel_code += " print('-----IN ADDING PHASE-----')\n" - local_parallel_code += " print('we have {} number_of_procs, current task uses {} Adding to queue'.format(number_of_procs, procs[index]))\n" + local_parallel_code += "# print('-----IN ADDING PHASE-----')\n" + local_parallel_code += "# print('we have {} number_of_procs, current task uses {} Adding to queue'.format(number_of_procs, procs[index]))\n" local_parallel_code += " case_output = open('case_outputs/'+command[index][0].replace('/', '_'), 'w+')\n" - local_parallel_code += " print('processing command: {}'.format(command[index][1]))\n" + local_parallel_code += "# print('processing command: {}'.format(command[index][1]))\n" local_parallel_code += " os.chdir(command[index][0])\n" local_parallel_code += " open_proc = subprocess.Popen(command[index][1], stdout=case_output, stderr=case_output)\n" local_parallel_code += " os.chdir(base)\n" local_parallel_code += " Queue_running.append([open_proc, procs[index],command[index][1]]) \n" local_parallel_code += " number_of_procs = number_of_procs - procs[index]\n" - local_parallel_code += " print(' New number_of_procs: {}'.format(number_of_procs)) \n" + local_parallel_code += "# print(' New number_of_procs: {}'.format(number_of_procs)) \n" local_parallel_code += " except subprocess.CalledProcessError: \n" local_parallel_code += " print('error for : {}'.format(str(command[index][1]))) \n" local_parallel_code += " elif number_of_procs < procs[index]: \n" - local_parallel_code += " print('NOT ENOUGH WAIT FOR PROCESSES') \n" + local_parallel_code += "# print('NOT ENOUGH WAIT FOR PROCESSES') \n" local_parallel_code += " continue_add = False \n" local_parallel_code += " index = index + 1 \n" local_parallel_code += " if index > len(procs) -1: \n" - local_parallel_code += " print('No more to add moving to processing phase') \n" + local_parallel_code += "# print('No more to add moving to processing phase') \n" local_parallel_code += " continue_add = False \n" local_parallel_code += " elif not continue_add and not Done: \n" local_parallel_code += " for background_process in Queue_running: \n" local_parallel_code += " background_process[0].wait() \n" + local_parallel_code += " pid =background_process[0].pid \n" local_parallel_code += " if not psutil.pid_exists(pid): \n" local_parallel_code += " Queue_running.remove(background_process) \n" local_parallel_code += " name = background_process[-1][-1]\n" local_parallel_code += " name = name[name.find('compass')+len('compass')+1:-3].replace('/','_')\n" - local_parallel_code += " print(str(name) + ' compleate')\n" + local_parallel_code += " print('compleate\t' + str(name))\n" local_parallel_code += " number_of_procs += background_process[1] \n" local_parallel_code += " if index < len(procs)-1: \n" - local_parallel_code += " print('STILL HAVE MORE TO PROCESS') \n" + local_parallel_code += "# print('STILL HAVE MORE TO PROCESS') \n" local_parallel_code += " if number_of_procs >= procs[index]: \n" local_parallel_code += " continue_add = True \n" local_parallel_code += " else: \n" @@ -409,7 +410,7 @@ def write_regression_local_parallel_top(work_dir, suite_tag, nodes): def write_regression_script_testcase_data_bottom(regression_script_code): # {{{ - regression_script_code += "print('TEST RUNTIMES:')\n" + regression_script_code += "print('\n\n\nTEST RUNTIMES & ERROR:')\n" regression_script_code += "case_output = '/case_outputs/'\n" regression_script_code += "totaltime = 0\n" regression_script_code += "for _, _, files in os.walk(base_path + case_output):\n" From 96e8a3a4d20716617691838f6afb8d4a06d42565 Mon Sep 17 00:00:00 2001 From: jamilgafur <58782189+jamilgafur@users.noreply.github.com> Date: Thu, 25 Jun 2020 12:43:58 -0600 Subject: [PATCH 06/30] Update testing_and_setup/compass/manage_regression_suite.py Co-authored-by: Xylar Asay-Davis --- testing_and_setup/compass/manage_regression_suite.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/testing_and_setup/compass/manage_regression_suite.py b/testing_and_setup/compass/manage_regression_suite.py index 5abc300d7e..b79f648578 100755 --- a/testing_and_setup/compass/manage_regression_suite.py +++ b/testing_and_setup/compass/manage_regression_suite.py @@ -410,7 +410,7 @@ def write_regression_local_parallel_top(work_dir, suite_tag, nodes): def write_regression_script_testcase_data_bottom(regression_script_code): # {{{ - regression_script_code += "print('\n\n\nTEST RUNTIMES & ERROR:')\n" + regression_script_code += "print('\\n\\n\\nTEST RUNTIMES & ERROR:')\n" regression_script_code += "case_output = '/case_outputs/'\n" regression_script_code += "totaltime = 0\n" regression_script_code += "for _, _, files in os.walk(base_path + case_output):\n" From f4b2f2ded5c274adb937d8382cd0b00969c1bccc Mon Sep 17 00:00:00 2001 From: jamilgafur <58782189+jamilgafur@users.noreply.github.com> Date: Thu, 25 Jun 2020 12:44:52 -0600 Subject: [PATCH 07/30] Update testing_and_setup/compass/manage_regression_suite.py Co-authored-by: Xylar Asay-Davis --- testing_and_setup/compass/manage_regression_suite.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/testing_and_setup/compass/manage_regression_suite.py b/testing_and_setup/compass/manage_regression_suite.py index b79f648578..1e5d93e550 100755 --- a/testing_and_setup/compass/manage_regression_suite.py +++ b/testing_and_setup/compass/manage_regression_suite.py @@ -274,8 +274,8 @@ def write_regression_local_parallel_testcase_data(local_parallel_code, work_dir, for key in pre_testcase_data.keys(): local_parallel_code += "locations.append('" + \ pre_testcase_data[key]['path'] + "')\n" - local_parallel_code += "commands.append(['time' , '-p' , '" + os.getcwd( - ) + "/" + pre_testcase_data[key]['path'] + "/' + str(run_file_names[{}])])\n".format(str(index)) + local_parallel_code += "commands.append(['time' , '-p' , '" + work_dir \ + + "/" + pre_testcase_data[key]['path'] + "/' + str(run_file_names[{}])])\n".format(str(index)) local_parallel_code += "procs.append(" + \ str(pre_testcase_data[key]['procs']) + ")\n" local_parallel_code += "prereq_commands.append([locations[" + str( From 5071629389716cb14fc12b6c18bfa1b2fc01028b Mon Sep 17 00:00:00 2001 From: jamil gafur Date: Thu, 25 Jun 2020 13:27:09 -0600 Subject: [PATCH 08/30] turns parallel output file to executable --- .../compass/manage_regression_suite.py | 35 ++++++++++++------- 1 file changed, 23 insertions(+), 12 deletions(-) diff --git a/testing_and_setup/compass/manage_regression_suite.py b/testing_and_setup/compass/manage_regression_suite.py index 1e5d93e550..0824e5bf13 100755 --- a/testing_and_setup/compass/manage_regression_suite.py +++ b/testing_and_setup/compass/manage_regression_suite.py @@ -228,6 +228,17 @@ def local_parallel_setup_script( local_parallel_code = write_regression_local_parallel_top( work_dir, suite_tag, args.nodes) + + + dev_null = open('/dev/null', 'a') + subprocess.check_call(['chmod', + 'a+x', + '{}'.format(regression_script_name)], + stdout=dev_null, + stderr=dev_null) + dev_null.close() + + setup_suite( suite_root, args.work_dir, @@ -483,12 +494,11 @@ def setup_suite(suite_tag, work_dir, model_runtime, config_file, baseline_dir, regression_script_code = "" - if not args.local_parallel: - # Create regression suite run script - regression_script_name = '{}/{}.py'.format(work_dir, suite_name) - regression_script = open('{}'.format(regression_script_name), 'w') + # Create regression suite run script + regression_script_name = '{}/{}.py'.format(work_dir, suite_name) + regression_script = open('{}'.format(regression_script_name), 'w') - regression_script_code = write_regression_script_testcase_data_top(work_dir) + regression_script_code = write_regression_script_testcase_data_top(work_dir) for child in suite_tag: # Process tags within the test suite @@ -502,17 +512,18 @@ def setup_suite(suite_tag, work_dir, model_runtime, config_file, baseline_dir, baseline_dir, verbose) + dev_null = open('/dev/null', 'a') + subprocess.check_call(['chmod', + 'a+x', + '{}'.format(regression_script_name)], + stdout=dev_null, + stderr=dev_null) + dev_null.close() if not args.local_parallel: - dev_null = open('/dev/null', 'a') - subprocess.check_call(['chmod', - 'a+x', - '{}'.format(regression_script_name)], - stdout=dev_null, - stderr=dev_null) - dev_null.close() regression_script_code = write_regression_script_testcase_data_bottom( regression_script_code) + return regression_script, regression_script_code # }}} From a50672f105b8a3df8d39bc033cfd08bf01d8c6dc Mon Sep 17 00:00:00 2001 From: jamil gafur Date: Mon, 29 Jun 2020 15:39:21 -0600 Subject: [PATCH 09/30] generates dicitionary WIP need to update processing code --- .../compass/manage_regression_suite.py | 93 ++++++++++--------- 1 file changed, 49 insertions(+), 44 deletions(-) diff --git a/testing_and_setup/compass/manage_regression_suite.py b/testing_and_setup/compass/manage_regression_suite.py index 0824e5bf13..00f1585e3b 100755 --- a/testing_and_setup/compass/manage_regression_suite.py +++ b/testing_and_setup/compass/manage_regression_suite.py @@ -207,8 +207,11 @@ def process_test_clean(test_tag, work_dir): def local_parallel_setup_script( - work_dir, suite_tag, verbose, args, suite_root, testcase_data, testcase_data_prereq): + args, suite_tag, testcase_data, testcase_data_prereq, data_dic): # {{{ + work_dir = args.work_dir + verbose = args.verbose + suite_root = suite_tag try: suite_name = suite_tag.attrib['name'] except KeyError: @@ -226,8 +229,7 @@ def local_parallel_setup_script( regression_script_name = '{}/parallel_{}.py'.format(work_dir, suite_name) local_parallel_script = open('{}'.format(regression_script_name), 'w') local_parallel_code = write_regression_local_parallel_top( - work_dir, suite_tag, args.nodes) - + work_dir, suite_tag, args.nodes, data_dic) dev_null = open('/dev/null', 'a') @@ -246,9 +248,12 @@ def local_parallel_setup_script( args.config_file, args.baseline_dir, args.verbose, - args) + args, + data_dic) + local_parallel_code = write_regression_local_parallel_testcase_data( - local_parallel_code, work_dir, testcase_data, testcase_data_prereq) + local_parallel_code, work_dir, testcase_data, testcase_data_prereq, data_dic) + local_parallel_code = write_regression_local_parallel_bottom( local_parallel_code) @@ -256,53 +261,48 @@ def local_parallel_setup_script( # }}} -def get_prereq(testcase_data, testcase_data_prereq): +def get_prereq(testcase_data, testcase_data_prereq, data_dic): # {{{ names = [i for i in testcase_data.keys() if i in testcase_data_prereq] - pre = {} + cwd = os.getcwd() + "/" for i in names: - pre[i] = testcase_data[i] - return pre + command = ['time', '-p', str(cwd)+str(testcase_data[i]['path'])] + data_dic['procs'].insert(0, testcase_data[i]['procs']) + data_dic['prereq_commands'].insert(0,[testcase_data[i]['path'], command]) + + return data_dic # }}} -def get_testcase_data(testcase_data, testcase_data_prereq): +def get_testcase_data(testcase_data, testcase_data_prereq, data_dic): # {{{ names = [i for i in testcase_data.keys() if i not in testcase_data_prereq] - testcase_data_values = {} + cwd = os.getcwd() + "/" for i in names: - testcase_data_values[i] = testcase_data[i] - return testcase_data_values + command = ['time', '-p', str(cwd)+str(testcase_data[i]['path'])] + data_dic['procs'].append(testcase_data[i]['procs']) + data_dic['allocation_commands'].append([testcase_data[i]['path'], command]) + return data_dic # }}} -def write_regression_local_parallel_testcase_data(local_parallel_code, work_dir, testcase_data, testcase_data_prereq): +def write_regression_local_parallel_testcase_data(local_parallel_code, work_dir, testcase_data, testcase_data_prereq, data_dic): # {{{ index = 0 - testcase_data_values = get_testcase_data( testcase_data , testcase_data_prereq) - pre_testcase_data = get_prereq(testcase_data , testcase_data_prereq) - - for key in pre_testcase_data.keys(): - local_parallel_code += "locations.append('" + \ - pre_testcase_data[key]['path'] + "')\n" - local_parallel_code += "commands.append(['time' , '-p' , '" + work_dir \ - + "/" + pre_testcase_data[key]['path'] + "/' + str(run_file_names[{}])])\n".format(str(index)) - local_parallel_code += "procs.append(" + \ - str(pre_testcase_data[key]['procs']) + ")\n" - local_parallel_code += "prereq_commands.append([locations[" + str( - index) + "], commands[" + str(index) + "]])\n\n\n" - index += 1 - - for key in testcase_data_values.keys(): - local_parallel_code += "locations.append('" + \ - testcase_data_values[key]['path'] + "')\n" - local_parallel_code += "commands.append(['time' , '-p' , '" + os.getcwd( - ) + "/" + testcase_data_values[key]['path'] + "/' + str(run_file_names[{}])])\n".format(str(index)) - local_parallel_code += "procs.append(" + \ - str(testcase_data_values[key]['procs']) + ")\n" - local_parallel_code += "allocation_commands.append([locations[" + str( - index) + "], commands[" + str(index) + "]])\n\n\n" - index += 1 + data_dic = get_testcase_data( testcase_data , testcase_data_prereq, data_dic) + data_dic = get_prereq(testcase_data , testcase_data_prereq, data_dic) + + local_parallel_code += "data_dic = {}\n" + for key in data_dic.keys(): + local_parallel_code += "data_dic['"+str(key)+"'] = []\n" + + for key in data_dic.keys(): + for value in data_dic[key]: + if type(value) == list: + local_parallel_code += "data_dic['"+str(key)+"'].append(" + str(value) + ")\n" + else: + local_parallel_code += "data_dic['"+str(key)+"'].append('"+ str(value) +"')\n" + local_parallel_code += "\n\n" return local_parallel_code # }}} @@ -381,7 +381,7 @@ def write_regression_local_parallel_bottom(local_parallel_code): # }}} -def write_regression_local_parallel_top(work_dir, suite_tag, nodes): +def write_regression_local_parallel_top(work_dir, suite_tag, nodes, data_dic): # {{{ local_parallel_code = "#!/usr/bin/env python\n\n\n" local_parallel_code += "import time\n" @@ -411,8 +411,7 @@ def write_regression_local_parallel_top(work_dir, suite_tag, nodes): for child in suite_tag: for script in child: script_name = script.attrib['name'] - local_parallel_code += "run_file_names.append('{}')\n".format( - script_name) + data_dic['run_file_names'].append(script_name) local_parallel_code += "\n\n\n" return local_parallel_code @@ -476,7 +475,7 @@ def write_regression_script_testcase_data_top(work_dir): def setup_suite(suite_tag, work_dir, model_runtime, config_file, baseline_dir, - verbose, args): + verbose, args, data_dic): # {{{ try: suite_name = suite_tag.attrib['name'] @@ -786,12 +785,18 @@ def main(): if args.setup: print("\n") testcase_data_prereq = [] + data_dic = {} testcase_data, testcase_data_prereq = get_test_case_procs(suite_root, testcase_data, testcase_data_prereq) testcase_data = summarize_suite(testcase_data) print("\n\nSetting Up Test Cases:") if args.local_parallel: - local_parallel_script, local_parallel_code = local_parallel_setup_script( - args.work_dir, suite_root, args.verbose, args, suite_root, testcase_data, testcase_data_prereq) + data_dic["procs"] = [] + data_dic["run_file_names"] = [] + data_dic["prereq_commands"] = [] + data_dic["allocation_commands"] =[] + print("--------") + local_parallel_script, local_parallel_code = local_parallel_setup_script(args, suite_root, testcase_data, testcase_data_prereq, data_dic) + print("not here") local_parallel_script.write(local_parallel_code) else: regression_script, regression_script_code = setup_suite( From 3bafd40f514fbbba744cffbe009a18ccc645995e Mon Sep 17 00:00:00 2001 From: jamil gafur Date: Tue, 30 Jun 2020 12:38:47 -0600 Subject: [PATCH 10/30] working partial parallel processing using a dictionary --- .../compass/manage_regression_suite.py | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/testing_and_setup/compass/manage_regression_suite.py b/testing_and_setup/compass/manage_regression_suite.py index 00f1585e3b..1021338248 100755 --- a/testing_and_setup/compass/manage_regression_suite.py +++ b/testing_and_setup/compass/manage_regression_suite.py @@ -310,11 +310,12 @@ def write_regression_local_parallel_testcase_data(local_parallel_code, work_dir, def write_regression_local_parallel_bottom(local_parallel_code): # {{{ - local_parallel_code += "if max(procs) > number_of_procs:\n" + local_parallel_code += "data_dic['procs'] = [int(i) for i in data_dic['procs']]\n" + local_parallel_code += "if max(data_dic['procs']) > number_of_procs:\n" local_parallel_code += " print('more processors are used than allocated')\n" local_parallel_code += " quit()\n" local_parallel_code += "start_time = time.time()\n" - local_parallel_code += "def stream_queue(command, number_of_procs):\n" + local_parallel_code += "def stream_queue(command, number_of_procs, procs):\n" local_parallel_code += " if command == []:\n" local_parallel_code += " return \n" local_parallel_code += " base = os.getcwd()\n" @@ -332,6 +333,7 @@ def write_regression_local_parallel_bottom(local_parallel_code): local_parallel_code += " case_output = open('case_outputs/'+command[index][0].replace('/', '_'), 'w+')\n" local_parallel_code += "# print('processing command: {}'.format(command[index][1]))\n" local_parallel_code += " os.chdir(command[index][0])\n" + local_parallel_code += " command[index][1][2] = command[index][1][2] +'/'+data_dic['run_file_names'][index]\n" local_parallel_code += " open_proc = subprocess.Popen(command[index][1], stdout=case_output, stderr=case_output)\n" local_parallel_code += " os.chdir(base)\n" local_parallel_code += " Queue_running.append([open_proc, procs[index],command[index][1]]) \n" @@ -369,9 +371,9 @@ def write_regression_local_parallel_bottom(local_parallel_code): local_parallel_code += " \n" local_parallel_code += " return\n" local_parallel_code += "print('--- PROCESSING PREREQS ---')\n" - local_parallel_code += "stream_queue(prereq_commands, number_of_procs)\n" + local_parallel_code += "stream_queue(data_dic['prereq_commands'], number_of_procs, [data_dic['procs'][i] for i in range(0, len(data_dic['prereq_commands']))])\n" local_parallel_code += "print('--- PROCESSING commands ---')\n" - local_parallel_code += "stream_queue(allocation_commands, number_of_procs)\n" + local_parallel_code += "stream_queue(data_dic['allocation_commands'], number_of_procs, [data_dic['procs'][i+len(data_dic['prereq_commands'])] for i in range(0, len(data_dic['allocation_commands']))])\n" local_parallel_code += "end_time = time.time()\n" local_parallel_code += "print('parallel run time: {} min'.format((end_time - start_time) / 60))\n" local_parallel_code = write_regression_script_testcase_data_bottom( @@ -401,12 +403,6 @@ def write_regression_local_parallel_top(work_dir, suite_tag, nodes, data_dic): local_parallel_code += " os.makedirs('case_outputs')\n" local_parallel_code += "base_path = '" + work_dir + "'\n" local_parallel_code += "os.chdir(base_path)\n" - local_parallel_code += "locations = []\n" - local_parallel_code += "procs = []\n" - local_parallel_code += "run_file_names = []\n" - local_parallel_code += "prereq_commands = []\n" - local_parallel_code += "allocation_commands = []\n" - local_parallel_code += "commands = []\n\n\n" for child in suite_tag: for script in child: @@ -794,9 +790,7 @@ def main(): data_dic["run_file_names"] = [] data_dic["prereq_commands"] = [] data_dic["allocation_commands"] =[] - print("--------") local_parallel_script, local_parallel_code = local_parallel_setup_script(args, suite_root, testcase_data, testcase_data_prereq, data_dic) - print("not here") local_parallel_script.write(local_parallel_code) else: regression_script, regression_script_code = setup_suite( From 8f8a2194fa0bf6b960956b6aa536e989e476d9c4 Mon Sep 17 00:00:00 2001 From: jamil gafur Date: Wed, 1 Jul 2020 07:31:11 -0600 Subject: [PATCH 11/30] using args.work_dir instead of os.getcwd --- testing_and_setup/compass/manage_regression_suite.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/testing_and_setup/compass/manage_regression_suite.py b/testing_and_setup/compass/manage_regression_suite.py index 1021338248..95a35d71db 100755 --- a/testing_and_setup/compass/manage_regression_suite.py +++ b/testing_and_setup/compass/manage_regression_suite.py @@ -255,7 +255,7 @@ def local_parallel_setup_script( local_parallel_code, work_dir, testcase_data, testcase_data_prereq, data_dic) local_parallel_code = write_regression_local_parallel_bottom( - local_parallel_code) + local_parallel_code,args.work_dir) return local_parallel_script, local_parallel_code # }}} @@ -308,7 +308,7 @@ def write_regression_local_parallel_testcase_data(local_parallel_code, work_dir, # }}} -def write_regression_local_parallel_bottom(local_parallel_code): +def write_regression_local_parallel_bottom(local_parallel_code,work_dir): # {{{ local_parallel_code += "data_dic['procs'] = [int(i) for i in data_dic['procs']]\n" local_parallel_code += "if max(data_dic['procs']) > number_of_procs:\n" @@ -318,7 +318,7 @@ def write_regression_local_parallel_bottom(local_parallel_code): local_parallel_code += "def stream_queue(command, number_of_procs, procs):\n" local_parallel_code += " if command == []:\n" local_parallel_code += " return \n" - local_parallel_code += " base = os.getcwd()\n" + local_parallel_code += " base = '{}'\n".format(work_dir) local_parallel_code += " Queue_running = []\n" local_parallel_code += " index = 0\n" local_parallel_code += " continue_add = True\n" From ca2c582c6ccbff2323e21075800369ff6d96247e Mon Sep 17 00:00:00 2001 From: jamil gafur Date: Wed, 1 Jul 2020 07:35:08 -0600 Subject: [PATCH 12/30] added backslash to the end of the base --- testing_and_setup/compass/manage_regression_suite.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/testing_and_setup/compass/manage_regression_suite.py b/testing_and_setup/compass/manage_regression_suite.py index 95a35d71db..1102574673 100755 --- a/testing_and_setup/compass/manage_regression_suite.py +++ b/testing_and_setup/compass/manage_regression_suite.py @@ -318,7 +318,7 @@ def write_regression_local_parallel_bottom(local_parallel_code,work_dir): local_parallel_code += "def stream_queue(command, number_of_procs, procs):\n" local_parallel_code += " if command == []:\n" local_parallel_code += " return \n" - local_parallel_code += " base = '{}'\n".format(work_dir) + local_parallel_code += " base = '{}/'\n".format(work_dir) local_parallel_code += " Queue_running = []\n" local_parallel_code += " index = 0\n" local_parallel_code += " continue_add = True\n" From 4dec5abf9d7f597448056ff308e170df09a14832 Mon Sep 17 00:00:00 2001 From: jamil gafur Date: Wed, 1 Jul 2020 08:42:19 -0600 Subject: [PATCH 13/30] write out testcase_data to the parallel*.py for processing --- .../compass/manage_regression_suite.py | 100 +++++------------- 1 file changed, 27 insertions(+), 73 deletions(-) diff --git a/testing_and_setup/compass/manage_regression_suite.py b/testing_and_setup/compass/manage_regression_suite.py index 1102574673..89e5066492 100755 --- a/testing_and_setup/compass/manage_regression_suite.py +++ b/testing_and_setup/compass/manage_regression_suite.py @@ -2,12 +2,9 @@ """ This script is used to manage regression suites. A regression suite is a set of test cases that ensure one or more features in a model meet certain criteria. - Using this script one can setup or clean a regression suite. - When setting up a regression suite, this script will generate a script to run all tests in the suite, and additionally setup each individual test case. - When cleaning a regression suite, this script will remove any generated files for each individual test case, and the run script that runs all test cases. """ @@ -207,11 +204,8 @@ def process_test_clean(test_tag, work_dir): def local_parallel_setup_script( - args, suite_tag, testcase_data, testcase_data_prereq, data_dic): + work_dir, suite_tag, verbose, args, suite_root, testcase_data): # {{{ - work_dir = args.work_dir - verbose = args.verbose - suite_root = suite_tag try: suite_name = suite_tag.attrib['name'] except KeyError: @@ -229,7 +223,8 @@ def local_parallel_setup_script( regression_script_name = '{}/parallel_{}.py'.format(work_dir, suite_name) local_parallel_script = open('{}'.format(regression_script_name), 'w') local_parallel_code = write_regression_local_parallel_top( - work_dir, suite_tag, args.nodes, data_dic) + work_dir, suite_tag, args.nodes) + dev_null = open('/dev/null', 'a') @@ -248,77 +243,41 @@ def local_parallel_setup_script( args.config_file, args.baseline_dir, args.verbose, - args, - data_dic) - + args) local_parallel_code = write_regression_local_parallel_testcase_data( - local_parallel_code, work_dir, testcase_data, testcase_data_prereq, data_dic) - + local_parallel_code, work_dir, testcase_data) local_parallel_code = write_regression_local_parallel_bottom( - local_parallel_code,args.work_dir) + local_parallel_code) return local_parallel_script, local_parallel_code # }}} -def get_prereq(testcase_data, testcase_data_prereq, data_dic): - # {{{ - names = [i for i in testcase_data.keys() if i in testcase_data_prereq] - cwd = os.getcwd() + "/" - for i in names: - command = ['time', '-p', str(cwd)+str(testcase_data[i]['path'])] - data_dic['procs'].insert(0, testcase_data[i]['procs']) - data_dic['prereq_commands'].insert(0,[testcase_data[i]['path'], command]) - - return data_dic - # }}} - -def get_testcase_data(testcase_data, testcase_data_prereq, data_dic): - # {{{ - names = [i for i in testcase_data.keys() if i not in testcase_data_prereq] - cwd = os.getcwd() + "/" - for i in names: - command = ['time', '-p', str(cwd)+str(testcase_data[i]['path'])] - data_dic['procs'].append(testcase_data[i]['procs']) - data_dic['allocation_commands'].append([testcase_data[i]['path'], command]) - return data_dic - # }}} - - -def write_regression_local_parallel_testcase_data(local_parallel_code, work_dir, testcase_data, testcase_data_prereq, data_dic): +def write_regression_local_parallel_testcase_data(local_parallel_code, work_dir, testcase_data): # {{{ - index = 0 - data_dic = get_testcase_data( testcase_data , testcase_data_prereq, data_dic) - data_dic = get_prereq(testcase_data , testcase_data_prereq, data_dic) - - local_parallel_code += "data_dic = {}\n" - for key in data_dic.keys(): - local_parallel_code += "data_dic['"+str(key)+"'] = []\n" - - for key in data_dic.keys(): - for value in data_dic[key]: - if type(value) == list: - local_parallel_code += "data_dic['"+str(key)+"'].append(" + str(value) + ")\n" - else: - local_parallel_code += "data_dic['"+str(key)+"'].append('"+ str(value) +"')\n" - local_parallel_code += "\n\n" - + for i in testcase_data.keys(): + print("{}\n\t{}\n\n".format(i, testcase_data[i])) + + local_parallel_code += "#code to save testcase_data here\n" + local_parallel_code += "testcase_data = {}\n" + for name in testcase_data.keys(): + local_parallel_code += "testcase_data['{}'] = {}\n".format(name, testcase_data[name]) return local_parallel_code # }}} -def write_regression_local_parallel_bottom(local_parallel_code,work_dir): +def write_regression_local_parallel_bottom(local_parallel_code): # {{{ - local_parallel_code += "data_dic['procs'] = [int(i) for i in data_dic['procs']]\n" - local_parallel_code += "if max(data_dic['procs']) > number_of_procs:\n" + local_parallel_code += "# rewrite algorithm to read in testcase_data\n# must use args.work_dir instead of os.getcwd\n" + local_parallel_code += "if max(procs) > number_of_procs:\n" local_parallel_code += " print('more processors are used than allocated')\n" local_parallel_code += " quit()\n" local_parallel_code += "start_time = time.time()\n" - local_parallel_code += "def stream_queue(command, number_of_procs, procs):\n" + local_parallel_code += "def stream_queue(command, number_of_procs):\n" local_parallel_code += " if command == []:\n" local_parallel_code += " return \n" - local_parallel_code += " base = '{}/'\n".format(work_dir) + local_parallel_code += " base = os.getcwd()\n" local_parallel_code += " Queue_running = []\n" local_parallel_code += " index = 0\n" local_parallel_code += " continue_add = True\n" @@ -333,7 +292,6 @@ def write_regression_local_parallel_bottom(local_parallel_code,work_dir): local_parallel_code += " case_output = open('case_outputs/'+command[index][0].replace('/', '_'), 'w+')\n" local_parallel_code += "# print('processing command: {}'.format(command[index][1]))\n" local_parallel_code += " os.chdir(command[index][0])\n" - local_parallel_code += " command[index][1][2] = command[index][1][2] +'/'+data_dic['run_file_names'][index]\n" local_parallel_code += " open_proc = subprocess.Popen(command[index][1], stdout=case_output, stderr=case_output)\n" local_parallel_code += " os.chdir(base)\n" local_parallel_code += " Queue_running.append([open_proc, procs[index],command[index][1]]) \n" @@ -371,9 +329,9 @@ def write_regression_local_parallel_bottom(local_parallel_code,work_dir): local_parallel_code += " \n" local_parallel_code += " return\n" local_parallel_code += "print('--- PROCESSING PREREQS ---')\n" - local_parallel_code += "stream_queue(data_dic['prereq_commands'], number_of_procs, [data_dic['procs'][i] for i in range(0, len(data_dic['prereq_commands']))])\n" + local_parallel_code += "stream_queue(prereq_commands, number_of_procs)\n" local_parallel_code += "print('--- PROCESSING commands ---')\n" - local_parallel_code += "stream_queue(data_dic['allocation_commands'], number_of_procs, [data_dic['procs'][i+len(data_dic['prereq_commands'])] for i in range(0, len(data_dic['allocation_commands']))])\n" + local_parallel_code += "stream_queue(allocation_commands, number_of_procs)\n" local_parallel_code += "end_time = time.time()\n" local_parallel_code += "print('parallel run time: {} min'.format((end_time - start_time) / 60))\n" local_parallel_code = write_regression_script_testcase_data_bottom( @@ -383,7 +341,7 @@ def write_regression_local_parallel_bottom(local_parallel_code,work_dir): # }}} -def write_regression_local_parallel_top(work_dir, suite_tag, nodes, data_dic): +def write_regression_local_parallel_top(work_dir, suite_tag, nodes): # {{{ local_parallel_code = "#!/usr/bin/env python\n\n\n" local_parallel_code += "import time\n" @@ -407,7 +365,6 @@ def write_regression_local_parallel_top(work_dir, suite_tag, nodes, data_dic): for child in suite_tag: for script in child: script_name = script.attrib['name'] - data_dic['run_file_names'].append(script_name) local_parallel_code += "\n\n\n" return local_parallel_code @@ -428,7 +385,7 @@ def write_regression_script_testcase_data_bottom(regression_script_code): regression_script_code += " mins = int(np.floor(runtime/60.0))\n" regression_script_code += " secs = int(np.ceil(runtime - mins*60))\n" regression_script_code += " print('{:02d}:{:02d} {}'.format(mins, secs, afile))\n" - regression_script_code += " error_out = subprocess.call(['grep', 'Error', outputfile], stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT)\n" + regression_script_code += " error_out = subprocess.call(['grep', 'FAIL', outputfile], stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT)\n" regression_script_code += " if error_out != 1:\n" regression_script_code += " print('\terror found in: {} '.format(afile))\n" regression_script_code += "mins = int(np.floor(totaltime/60.0))\n" @@ -471,7 +428,7 @@ def write_regression_script_testcase_data_top(work_dir): def setup_suite(suite_tag, work_dir, model_runtime, config_file, baseline_dir, - verbose, args, data_dic): + verbose, args): # {{{ try: suite_name = suite_tag.attrib['name'] @@ -781,16 +738,13 @@ def main(): if args.setup: print("\n") testcase_data_prereq = [] - data_dic = {} testcase_data, testcase_data_prereq = get_test_case_procs(suite_root, testcase_data, testcase_data_prereq) testcase_data = summarize_suite(testcase_data) + print("\n\nSetting Up Test Cases:") if args.local_parallel: - data_dic["procs"] = [] - data_dic["run_file_names"] = [] - data_dic["prereq_commands"] = [] - data_dic["allocation_commands"] =[] - local_parallel_script, local_parallel_code = local_parallel_setup_script(args, suite_root, testcase_data, testcase_data_prereq, data_dic) + local_parallel_script, local_parallel_code = local_parallel_setup_script( + args.work_dir, suite_root, args.verbose, args, suite_root, testcase_data) local_parallel_script.write(local_parallel_code) else: regression_script, regression_script_code = setup_suite( From b1fd12301f82298039a40c2b660c70c7d0e63801 Mon Sep 17 00:00:00 2001 From: jamil gafur Date: Mon, 6 Jul 2020 08:30:40 -0600 Subject: [PATCH 14/30] cleaned up the method calls and updated generated script to allow for dictionary processing (WIP) --- .../compass/manage_regression_suite.py | 337 +++++++++--------- 1 file changed, 163 insertions(+), 174 deletions(-) diff --git a/testing_and_setup/compass/manage_regression_suite.py b/testing_and_setup/compass/manage_regression_suite.py index 89e5066492..9a2b5408c0 100755 --- a/testing_and_setup/compass/manage_regression_suite.py +++ b/testing_and_setup/compass/manage_regression_suite.py @@ -24,7 +24,7 @@ def process_test_setup(test_tag, config_file, work_dir, model_runtime, - regression_script_code, baseline_dir, verbose): + script_code, baseline_dir, verbose, isparallel): # {{{ if verbose: stdout = open(work_dir + '/manage_regression_suite.py.out', 'a') @@ -97,53 +97,54 @@ def process_test_setup(test_tag, config_file, work_dir, model_runtime, print(" -- Setup case '{}': -o {} -c {} -r {} -t {}".format( test_name, test_core, test_configuration, test_resolution, test_test)) - # Write step into suite script to cd into the base of the regression suite - regression_script_code += "os.chdir(base_path)\n" - - # Write the step to define the output file - regression_script_code += "case_output = open('case_outputs/{}', 'w')\n".format( - case_output_name) - - # Write step to cd into test case directory - regression_script_code += "os.chdir('{}/{}/{}/{}')\n".format( - test_core, test_configuration, test_resolution, test_test) - - for script in test_tag: - # Process test case script - if script.tag == 'script': - try: - script_name = script.attrib['name'] - except KeyError: - print("ERROR: