Skip to content
Snippets Groups Projects
Commit d10ea998 authored by Michael Reuscher's avatar Michael Reuscher
Browse files

some rework and clean up

parent 0ba7451c
Branches
No related tags found
1 merge request!4Python driver and data analysis part
import os
import time
import signal
import subprocess
import logging
import concurrent.futures as cf
import simulation as sim
import output_processing as op
import config_processing as con
# Set up logging
# Set up logging global
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
stop_requested = False
# Function to print terminal output and handle errors
def terminal_output(output, errors, return_code, msg):
if return_code > 0:
logger.error(f'return code: {return_code}')
if errors:
logger.error(errors)
if output:
logger.info(output)
else:
if output:
logger.info(output)
logger.info(msg)
# Function to pull data from remote machines to the local machine
def pull_data(source_path, desti_path):
desti = os.path.join(desti_path, 'output')
os.makedirs(desti, mode=0o777, exist_ok=True)
if args.hosts:
for host in args.hosts:
ssh_target = f'{args.user}@{host}'
scp_cmd = f'scp -r {ssh_target}:{source_path}/*{host}* {desti}'
subprocess.run(scp_cmd, shell=True, check=True)
def rm_data(t_data_path):
if args.hosts:
logger.info('remove test_data folder')
host = args.hosts[0]
ssh_target = f'{args.user}@{host}'
rm_cmd = f'ssh {ssh_target} rm -r {t_data_path}'
subprocess.run(rm_cmd, shell=True, check=True)
def handle_sigint(signum, frame):
logger.info("Main process received KeyboardInterrupt")
global stop_requested
stop_requested = True
for h in args.hosts:
kill_target = f'{args.user}@{h}'
kill_cmd = f"ssh {kill_target} pkill -f {args.sim}"
re = subprocess.run(kill_cmd, shell=True)
ou = re.stdout
er = re.stderr
rc = re.returncode
terminal_msg = f'{kill_target} killed'
terminal_output(ou, er, rc, terminal_msg)
# Function to run the detector simulation with the given event arguments
def run_sim(e_dict, d_path, sim_path, main_path):
with cf.ThreadPoolExecutor() as executor:
signal.signal(signal.SIGINT, handle_sigint)
for event in e_dict:
if stop_requested:
break
futures = []
folder_name = e_dict[event]['-o']
e_dict[event]['--startTime'] = f'@{int(time.time() + 10)}'
e_dict[event]['--posixDataDir'] = os.path.join(e_dict[event]['--posixDataDir'], event)
output_path = os.path.join(d_path, folder_name)
if args and args.hosts:
# Remote execution on multiple machines with specified user
for host in args.hosts:
ssh_target = f'{args.user}@{host}'
future = executor.submit(sim.remote_simulation, sim_path, e_dict[event], output_path,
ssh_target, host)
futures.append((folder_name, host, future))
# Local execution
else:
future = executor.submit(sim.local_simulation, sim_path, e_dict[event], output_path)
futures.append((folder_name, None, future))
for folder_name, host, future in futures:
output, errors, result = future.result()
terminal_msg = f'{folder_name}_{host if host else ""} done!'
if not stop_requested:
terminal_output(output, errors, result, terminal_msg)
if output:
if host:
stat_file_name = f'statistics_{folder_name}_{host}'
else:
stat_file_name = f'statistics_{folder_name}_lokal'
op.write_statistics(main_path, stat_file_name, output)
# Parse arguments global
args = con.parse_arguments()
# --- MAIN ---
if __name__ == '__main__':
# Parse arguments
args = con.parse_arguments()
# Setup all paths and folders
main_folder_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
......@@ -113,13 +23,10 @@ if __name__ == '__main__':
# Setup arguments for Sim
event_dict = con.create_event_list(args, test_data_path)
# Run detectorSimulation
run_sim(event_dict, data_path, simulation_path, main_folder_path)
pull_data(data_path, main_folder_path)
rm_data(test_data_path)
# Run detectorSimulation
sim.run_sim(event_dict, data_path, simulation_path, main_folder_path)
# Clean up
op.pull_data(data_path, main_folder_path)
op.rm_data(test_data_path)
import os
import subprocess
from main import args
from main import logger
# Function to create local folders for test data and output
......@@ -27,3 +29,33 @@ def write_statistics(main_path, stat_file_name, output):
file_path = os.path.join(stat_path, stat_file_name + '.dat')
with open(file_path, 'w') as stat_file:
stat_file.write(output)
# Function to pull data from remote machines to the local machine
def pull_data(source_path, desti_path):
desti = os.path.join(desti_path, 'output')
os.makedirs(desti, mode=0o777, exist_ok=True)
if args.hosts:
for host in args.hosts:
ssh_target = f'{args.user}@{host}'
scp_cmd = f'scp -r {ssh_target}:{source_path}/*{host}* {desti}'
subprocess.run(scp_cmd, shell=True, check=True)
def rm_data(t_data_path):
if args.hosts:
logger.info('remove test_data folder')
host = args.hosts[0]
ssh_target = f'{args.user}@{host}'
rm_cmd = f'ssh {ssh_target} rm -r {t_data_path}'
subprocess.run(rm_cmd, shell=True, check=True)
else:
logger.info('remove test_data folder')
for root, dirs, files in os.walk(t_data_path, topdown=False):
for name in files:
file_path = os.path.join(root, name)
os.remove(file_path)
for name in dirs:
dir_path = os.path.join(root, name)
os.rmdir(dir_path)
os.rmdir(t_data_path)
import os
import time
import signal
import subprocess
import output_processing as op
import concurrent.futures as cf
from main import args
from main import logger
stop_requested = False
# Function to run the detector simulation remote
......@@ -35,3 +42,74 @@ def local_simulation(sim_path, event, output_path):
output = result.stdout
errors = result.stderr
return output, errors, result.returncode
# Function to print terminal output and handle errors
def terminal_output(output, errors, return_code, msg):
if return_code > 0:
logger.error(f'return code: {return_code}')
if errors:
logger.error(errors)
if output:
logger.info(output)
else:
if output:
logger.info(output)
logger.info(msg)
def handle_sigint(signum, frame):
logger.info("Main process received KeyboardInterrupt")
global stop_requested
stop_requested = True
if args.hosts:
for h in args.hosts:
kill_target = f'{args.user}@{h}'
kill_cmd = f"ssh {kill_target} pkill -f {args.sim}"
re = subprocess.run(kill_cmd, shell=True)
ou = re.stdout
er = re.stderr
rc = re.returncode
terminal_msg = f'{kill_target} killed'
terminal_output(ou, er, rc, terminal_msg)
else:
logger.info('Sim killed')
# Function to run the detector simulation with the given event arguments
def run_sim(e_dict, d_path, sim_path, main_path):
with cf.ThreadPoolExecutor() as executor:
signal.signal(signal.SIGINT, handle_sigint)
for event in e_dict:
if stop_requested:
break
futures = []
folder_name = e_dict[event]['-o']
e_dict[event]['--startTime'] = f'@{int(time.time() + 10)}'
e_dict[event]['--posixDataDir'] = os.path.join(e_dict[event]['--posixDataDir'], event)
output_path = os.path.join(d_path, folder_name)
if args and args.hosts:
# Remote execution on multiple machines with specified user
for host in args.hosts:
ssh_target = f'{args.user}@{host}'
future = executor.submit(remote_simulation, sim_path, e_dict[event], output_path,
ssh_target, host)
futures.append((folder_name, host, future))
# Local execution
else:
future = executor.submit(local_simulation, sim_path, e_dict[event], output_path)
futures.append((folder_name, None, future))
for folder_name, host, future in futures:
output, errors, result = future.result()
terminal_msg = f'{folder_name}_{host if host else ""} done!'
if not stop_requested:
terminal_output(output, errors, result, terminal_msg)
if output:
if host:
stat_file_name = f'statistics_{folder_name}_{host}'
else:
stat_file_name = f'statistics_{folder_name}_lokal'
op.write_statistics(main_path, stat_file_name, output)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment