Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions avaframe/ana3AIMEC/aimecTools.py
Original file line number Diff line number Diff line change
Expand Up @@ -1219,7 +1219,7 @@ def computeRunOut(
resAnalysisDF.loc[simRowHash, "zRelease"] = zThalweg[cUpper]
resAnalysisDF.loc[simRowHash, "elevRel"] = zThalweg[cUpper]
resAnalysisDF.loc[simRowHash, "deltaZ"] = zThalweg[cUpper] - zThalweg[cLower]
resAnalysisDF.loc[simRowHash, "runoutFound"] = runoutFound
resAnalysisDF.loc[simRowHash, "runoutFound"] = str(runoutFound)

return resAnalysisDF

Expand Down Expand Up @@ -1520,11 +1520,11 @@ def analyzeArea(
if (
simRowHash != refSimRowHash
and cfgPlots.getboolean("extraPlots")
and resAnalysisDF.loc[simRowHash, "runoutFound"]
and resAnalysisDF.loc[simRowHash, "runoutFound"].lower() == "true"
):
# only plot comparisons of simulations to reference
compPlotPath = outAimec.visuComparison(rasterTransfo, inputs, pathDict)
elif not resAnalysisDF.loc[simRowHash, "runoutFound"] and cfgPlots.getboolean("extraPlots"):
elif not resAnalysisDF.loc[simRowHash, "runoutFound"].lower() == "true" and cfgPlots.getboolean("extraPlots"):
log.warning(
"ContourComparisonToReference plot not generated as only 0 values for comparison simulation: %s"
% resAnalysisDF.loc[simRowHash, "simName"]
Expand Down
5 changes: 4 additions & 1 deletion avaframe/out3Plot/statsPlots.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,10 @@ def plotHistCDFDiff(dataDiffPlot, ax1, ax2, insert='True', title=['', '']):
width = diffMax - diffMin
stepsInterval = int(pU.cfg['steps2Centile2'])
stepWidth = 2*sortedDiffPlot[ind]/stepsInterval # stepsInterval bins in the [-2sigma,+2sigma] interval
bins = int(width/stepWidth)
if stepWidth > 0 and width > 0:
bins = int(width/stepWidth)
else:
bins = 1

# reduce bins to a sensible size
if bins > 1000:
Expand Down
286 changes: 144 additions & 142 deletions avaframe/runStandardTestsCom1DFA.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@
"""

# Load modules
import os
import time
import pathlib
import logging
import logging.handlers
import multiprocessing
import shutil
import tempfile
from concurrent.futures import ProcessPoolExecutor, as_completed

# Local imports
Expand Down Expand Up @@ -87,9 +89,7 @@ def runSingleTest(
avaName = avaDirOriginal.name

# Create unique temp directory for this test to avoid conflicts when multiple tests share same avaDir
import os

testTempDir = tmpTestsDir / f"{avaName}_{test['NAME']}_{os.getpid()}"
testTempDir = tmpTestsDir / f"{test['NAME']}_{os.getpid()}"
testTempDir.mkdir(parents=True, exist_ok=True)

# Copy only Inputs directory to temp location (Outputs and Work will be created fresh)
Expand Down Expand Up @@ -231,144 +231,146 @@ def runSingleTest(
return (test["NAME"], reportD, benchDict, avaName, cfgRep)


# +++++++++REQUIRED+++++++++++++
# Which result types for comparison plots
outputVariable = ["ppr", "pft", "pfv"]
# aimec settings that are not used from default aimecCfg or aimecCfg in benchmark folders
aimecDiffLim = "5"
aimecContourLevels = "1|3|5|10"
aimecFlagMass = "False"
aimecComModules = "benchmarkReference|com1DFA"
# ++++++++++++++++++++++++++++++

# log file name; leave empty to use default runLog.log
logName = "runStandardTestsCom1DFA"

# Load settings from general configuration file
cfgMain = cfgUtils.getGeneralConfig()

# load all benchmark info as dictionaries from description files
testDictList = tU.readAllBenchmarkDesDicts(info=False)

# filter benchmarks for tag standardTest
# filterType = "TAGS"
# valuesList = ["resistance"]
filterType = "TAGS"
valuesList = ["standardTest", "standardTestSnowGlide"]
# filterType = "NAME"
# valuesList = ["avaInclinedPlaneEntresTest"]

testList = tU.filterBenchmarks(testDictList, filterType, valuesList, condition="or")

# Clean temporary test directory used for parallel execution
tmpTestsDir = pathlib.Path(__file__).parent / "data" / "tmpStdTests"
if tmpTestsDir.exists():
shutil.rmtree(tmpTestsDir)
tmpTestsDir.mkdir(parents=True, exist_ok=True)

# Set directory for full standard test report
outDir = pathlib.Path.cwd() / "tests" / "reportsCom1DFA"
fU.makeADir(outDir)

# Start writing markdown style report for standard tests
reportFile = outDir / "standardTestsReportCom1DFA.md"
with open(reportFile, "w") as pfile:

# Write header
pfile.write("# Standard Tests Report \n")
pfile.write("## Compare com1DFA simulations to benchmark results \n")

log = logUtils.initiateLogger(outDir, logName)
log.info("The following benchmark tests will be fetched ")
for test in testList:
log.info("%s" % test["NAME"])

# Set up queue-based logging for parallel execution
logQueue = multiprocessing.Manager().Queue()
queueListener = logging.handlers.QueueListener(logQueue, *log.handlers)
queueListener.start()

# Convert cfgMain to dictionary for pickling
cfgMainDict = {section: dict(cfgMain[section]) for section in cfgMain.sections()}

# Get number of CPU cores for parallel execution
nCPU = cfgUtils.getNumberOfProcesses(cfgMain, len(testList))

# Run tests in parallel using ProcessPoolExecutor
results = []
failedTests = []
with ProcessPoolExecutor(max_workers=nCPU) as executor:
# Submit all tests
futures = {}
if __name__ == "__main__":
# +++++++++REQUIRED+++++++++++++
# Which result types for comparison plots
outputVariable = ["ppr", "pft", "pfv"]
# aimec settings that are not used from default aimecCfg or aimecCfg in benchmark folders
aimecDiffLim = "5"
aimecContourLevels = "1|3|5|10"
aimecFlagMass = "False"
aimecComModules = "benchmarkReference|com1DFA"
# ++++++++++++++++++++++++++++++

# log file name; leave empty to use default runLog.log
logName = "runStandardTestsCom1DFA"

# Load settings from general configuration file
cfgMain = cfgUtils.getGeneralConfig()

# load all benchmark info as dictionaries from description files
testDictList = tU.readAllBenchmarkDesDicts(info=False)

# filter benchmarks for tag standardTest
# filterType = "TAGS"
# valuesList = ["resistance"]
filterType = "TAGS"
valuesList = ["standardTest", "standardTestSnowGlide"]
#filterType = "NAME"
#valuesList = ["avaFlatPlaneNullTest"]

testList = tU.filterBenchmarks(testDictList, filterType, valuesList, condition="or")

# Create system temporary directory for parallel test execution (auto-cleaned on exit)
tmpTestsDirObj = tempfile.TemporaryDirectory(prefix="avaframe_stdTests_")
tmpTestsDir = pathlib.Path(tmpTestsDirObj.name)

# Set directory for full standard test report
outDir = pathlib.Path.cwd() / "tests" / "reportsCom1DFA"
fU.makeADir(outDir)

# Start writing markdown style report for standard tests
reportFile = outDir / "standardTestsReportCom1DFA.md"
with open(reportFile, "w") as pfile:

# Write header
pfile.write("# Standard Tests Report \n")
pfile.write("## Compare com1DFA simulations to benchmark results \n")

log = logUtils.initiateLogger(outDir, logName)
log.info("The following benchmark tests will be fetched ")
for test in testList:
future = executor.submit(
runSingleTest,
test,
cfgMainDict,
outputVariable,
aimecDiffLim,
aimecContourLevels,
aimecFlagMass,
aimecComModules,
outDir,
tmpTestsDir,
logQueue,
)
futures[future] = test["NAME"]

# Collect results as they complete
for future in as_completed(futures):
testName = futures[future]
try:
result = future.result()
results.append(result)
log.info("Completed test: %s" % testName)
except Exception as exc:
log.error("Test %s generated an exception: %s" % (testName, exc))
failedTests.append({"name": testName, "error": str(exc)})

# Stop the queue listener
queueListener.stop()

# Write reports sequentially in completion order
for testName, reportD, benchDict, avaName, cfgRep in results:
generateCompareReport.writeCompareReport(reportFile, reportD, benchDict, avaName, cfgRep)

# Collect CPU time data from results
cpuTimeName = []
cpuTimeBench = []
cpuTimeSim = []
for testName, reportD, benchDict, avaName, cfgRep in results:
cpuTimeName.append(testName)
cpuTimeBench.append(benchDict["Simulation Parameters"]["Computation time [s]"])
cpuTimeSim.append(reportD["Simulation Parameters"]["Computation time [s]"])

# Display CPU time in log if we have any successful results
if results:
# Get version info from first successful test
_, reportD, benchDict, _, _ = results[0]
log.info(
"CPU performance comparison between benchmark results (version : %s) and curent branch (version : %s)"
% (
benchDict["Simulation Parameters"]["Program version"],
reportD["Simulation Parameters"]["Program version"],
log.info("%s" % test["NAME"])

# Set up queue-based logging for parallel execution
logQueue = multiprocessing.Manager().Queue()
queueListener = logging.handlers.QueueListener(logQueue, *log.handlers)
queueListener.start()

# Convert cfgMain to dictionary for pickling
cfgMainDict = {section: dict(cfgMain[section]) for section in cfgMain.sections()}

# Get number of CPU cores for parallel execution
nCPU = cfgUtils.getNumberOfProcesses(cfgMain, len(testList))

# Run tests in parallel using ProcessPoolExecutor
results = []
failedTests = []
with ProcessPoolExecutor(max_workers=nCPU) as executor:
# Submit all tests
futures = {}
for test in testList:
future = executor.submit(
runSingleTest,
test,
cfgMainDict,
outputVariable,
aimecDiffLim,
aimecContourLevels,
aimecFlagMass,
aimecComModules,
outDir,
tmpTestsDir,
logQueue,
)
futures[future] = test["NAME"]

# Collect results as they complete
for future in as_completed(futures):
testName = futures[future]
try:
result = future.result()
results.append(result)
log.info("Completed test: %s" % testName)
except Exception as exc:
log.error("Test %s generated an exception: %s" % (testName, exc))
failedTests.append({"name": testName, "error": str(exc)})

# Stop the queue listener
queueListener.stop()

# Write reports sequentially in completion order
for testName, reportD, benchDict, avaName, cfgRep in results:
generateCompareReport.writeCompareReport(reportFile, reportD, benchDict, avaName, cfgRep)

# Collect CPU time data from results
cpuTimeName = []
cpuTimeBench = []
cpuTimeSim = []
for testName, reportD, benchDict, avaName, cfgRep in results:
cpuTimeName.append(testName)
cpuTimeBench.append(benchDict["Simulation Parameters"]["Computation time [s]"])
cpuTimeSim.append(reportD["Simulation Parameters"]["Computation time [s]"])

# Display CPU time in log if we have any successful results
if results:
# Get version info from first successful test
_, reportD, benchDict, _, _ = results[0]
log.info(
"CPU performance comparison between benchmark results (version : %s) and curent branch (version : %s)"
% (
benchDict["Simulation Parameters"]["Program version"],
reportD["Simulation Parameters"]["Program version"],
)
)
)
log.info(("{:<30}" * 3).format("Test Name", "cpu time Benchmark [s]", "cpu time curent version [s]"))
for name, cpuBench, cpuSim in zip(cpuTimeName, cpuTimeBench, cpuTimeSim):
log.info(("{:<30s}" * 3).format(name, cpuBench, cpuSim))

# Display summary of test results
log.info("=" * 80)
log.info("TEST SUMMARY")
log.info("=" * 80)
log.info("Total tests: %d" % len(testList))
log.info("Successful: %d" % len(results))
log.info("Failed: %d" % len(failedTests))

if failedTests:
log.info("Failed tests:")
for failure in failedTests:
log.info(" - %s: %s" % (failure["name"], failure["error"]))
else:
log.info("All tests completed successfully!")
log.info(("{:<30}" * 3).format("Test Name", "cpu time Benchmark [s]", "cpu time curent version [s]"))
for name, cpuBench, cpuSim in zip(cpuTimeName, cpuTimeBench, cpuTimeSim):
log.info(("{:<30s}" * 3).format(name, cpuBench, cpuSim))

# Display summary of test results
log.info("=" * 80)
log.info("TEST SUMMARY")
log.info("=" * 80)
log.info("Total tests: %d" % len(testList))
log.info("Successful: %d" % len(results))
log.info("Failed: %d" % len(failedTests))

if failedTests:
log.info("Failed tests:")
for failure in failedTests:
log.info(" - %s: %s" % (failure["name"], failure["error"]))
else:
log.info("All tests completed successfully!")

# Clean up temporary test directory
tmpTestsDirObj.cleanup()
Loading
Loading