Add benchmarks for DABs #4194
benchmark.py (new file):

```python
#!/usr/bin/env python3
import argparse
import subprocess
import time
import statistics
import sys
import os
import json

try:
    import resource
except ImportError:
    # Not available on Windows.
    resource = None


def run_benchmark(command, warmup, runs):
    times = []

    for i in range(runs):
        # Re-execute this script with --once (a "double fork") so that
        # high-water statistics like ru_maxrss are reset for every run.
        cp = subprocess.run([sys.executable, sys.argv[0], "--once"] + command, stdout=subprocess.PIPE)
        if cp.returncode != 0:
            sys.exit(cp.returncode)

        try:
            result = json.loads(cp.stdout)
        except Exception:
            print(f"Failed to parse: {cp.stdout!r}")
            raise

        run = f"Run #{i} (warm): " if i < warmup else f"Run #{i} (count):"

        result_formatted = " ".join(f"{key}={value}" for (key, value) in result.items())

        print(f"TESTLOG: {run} {result_formatted}")

        if i >= warmup:
            times.append(result["wall"])

    if not times:
        print("No times recorded")
        return

    if len(times) > 1:
        mean = statistics.mean(times)
        stdev = statistics.stdev(times)
        min_time = min(times)
        max_time = max(times)

        print(f"TESTLOG: Benchmark: {command}")
        print(f"TESTLOG: Time (mean ± σ): {mean:.3f} s ± {stdev:.3f} s")
        print(f"TESTLOG: Range (min … max): {min_time:.3f} s … {max_time:.3f} s {len(times)} runs", flush=True)


def run_once(command):
    if len(command) == 1 and (" " in command[0] or ">" in command[0]):
        shell = True
        command = command[0]
    else:
        shell = False

    if resource:
        rusage_before = resource.getrusage(resource.RUSAGE_CHILDREN)

    with open("LOG.process", "a") as log:
        start = time.perf_counter()
        result = subprocess.run(command, shell=shell, stdout=log, stderr=log)
        end = time.perf_counter()

    if result.returncode != 0:
        print(f"Error: command failed with exit code {result.returncode}", file=sys.stderr)
        sys.exit(result.returncode)

    result = {"wall": end - start}

    if resource:
        rusage_after = resource.getrusage(resource.RUSAGE_CHILDREN)

        result.update(
            {
                "ru_utime": rusage_after.ru_utime - rusage_before.ru_utime,
                "ru_stime": rusage_after.ru_stime - rusage_before.ru_stime,
                # ru_maxrss is a high-water mark reflecting the largest child
                # process, so subtracting rusage_before (which may describe a
                # different process) would be incorrect; report the absolute value.
                "ru_maxrss": rusage_after.ru_maxrss,
            }
        )

    return result


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--warmup", type=int, default=1)
    parser.add_argument("--runs", type=int)
    parser.add_argument("--once", action="store_true")
    parser.add_argument("command", nargs="+")
    args = parser.parse_args()

    if args.once:
        assert not args.runs
        result = run_once(args.command)
        print(json.dumps(result))
        return

    if args.runs is None:
        if os.environ.get("BENCHMARK_PARAMS"):
            args.runs = 5
        else:
            args.runs = 1

    if args.warmup >= args.runs:
        args.warmup = min(1, args.runs - 1)

    run_benchmark(args.command, args.warmup, args.runs)


if __name__ == "__main__":
    main()
```
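A note on the resource accounting above: `getrusage(RUSAGE_CHILDREN)` accumulates CPU time across all terminated children, but tracks `ru_maxrss` as a high-water mark, which is why the script re-executes itself for each run and reports the absolute value rather than a delta. A minimal sketch of that behavior (illustrative, POSIX-only):

```python
# Illustrative sketch (POSIX-only) of why run_once() subtracts CPU times but
# not ru_maxrss: CPU time accumulates across children, while ru_maxrss is a
# high-water mark over the largest child seen so far.
import resource
import subprocess
import sys

before = resource.getrusage(resource.RUSAGE_CHILDREN)
subprocess.run([sys.executable, "-c", "pass"])  # any short-lived child
after = resource.getrusage(resource.RUSAGE_CHILDREN)

# Meaningful as a delta: ru_utime/ru_stime sum over all terminated children.
print("child cpu:", after.ru_utime - before.ru_utime)

# Not meaningful as a delta: ru_maxrss only ever increases and reflects the
# single largest child -- hence the fresh process per benchmark run.
print("high-water rss:", after.ru_maxrss)
```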
gen_config.py (new file):

```python
#!/usr/bin/env python3
import argparse
import json
import copy

JOB_TEMPLATE_BASE = {
    "description": "This job contains multiple tasks that are required to produce the weekly shark sightings report.",
    "email_notifications": {
        "no_alert_for_skipped_runs": False,
        "on_failure": ["user.name@databricks.com"],
        "on_success": ["user.name@databricks.com"],
    },
    "job_clusters": [
        {
            "job_cluster_key": "auto_scaling_cluster",
            "new_cluster": {
                "autoscale": {"max_workers": 16, "min_workers": 2},
                "node_type_id": "i3.xlarge",
                "spark_conf": {"spark.speculation": "true"},
                "spark_version": "13.3.x-scala2.12",
            },
        }
    ],
    "max_concurrent_runs": 10,
    "name": "A multitask job",
    "notification_settings": {"no_alert_for_canceled_runs": False, "no_alert_for_skipped_runs": False},
    "parameters": [{"default": "users", "name": "table"}],
    "tags": {"cost-center": "engineering", "team": "jobs"},
    "tasks": [
        {
            "depends_on": [],
            "description": "Extracts session data from events",
            "job_cluster_key": "auto_scaling_cluster",
            "libraries": [{"jar": "dbfs:/mnt/databricks/Sessionize.jar"}],
            "max_retries": 3,
            "min_retry_interval_millis": 2000,
            "retry_on_timeout": False,
            "spark_jar_task": {
                "main_class_name": "com.databricks.Sessionize",
                "parameters": ["--data", "dbfs:/path/to/data.json"],
            },
            "task_key": "Sessionize",
            "timeout_seconds": 86400,
        },
        {
            "depends_on": [],
            "description": "Ingests order data",
            "job_cluster_key": "auto_scaling_cluster",
            "libraries": [{"jar": "dbfs:/mnt/databricks/OrderIngest.jar"}],
            "max_retries": 3,
            "min_retry_interval_millis": 2000,
            "retry_on_timeout": False,
            "spark_jar_task": {
                "main_class_name": "com.databricks.OrdersIngest",
                "parameters": ["--data", "dbfs:/path/to/order-data.json"],
            },
            "task_key": "Orders_Ingest",
            "timeout_seconds": 86400,
        },
        {
            "depends_on": [{"task_key": "Orders_Ingest"}, {"task_key": "Sessionize"}],
            "description": "Matches orders with user sessions",
            "max_retries": 3,
            "min_retry_interval_millis": 2000,
            "new_cluster": {
                "autoscale": {"max_workers": 16, "min_workers": 2},
                "node_type_id": "i3.xlarge",
                "spark_conf": {"spark.speculation": "true"},
                "spark_version": "13.3.x-scala2.12",
            },
            "notebook_task": {
                "base_parameters": {"age": "35", "name": "John Doe"},
                "notebook_path": "/Users/user.name@databricks.com/Match",
            },
            "retry_on_timeout": False,
            "run_if": "ALL_SUCCESS",
            "task_key": "Match",
            "timeout_seconds": 86400,
        },
    ],
    "timeout_seconds": 86400,
}


def gen_config(n):
    jobs = {}
    for i in range(n):
        job = copy.deepcopy(JOB_TEMPLATE_BASE)
        job["name"] = f"job_{i}"

        # Odd jobs use continuous, even jobs use a schedule.
        if i % 2 == 1:
            job["continuous"] = {"pause_status": "UNPAUSED"}
        else:
            job["schedule"] = {
                "pause_status": "UNPAUSED",
                "quartz_cron_expression": "20 30 * * * ?",
                "timezone_id": "Europe/London",
            }

        jobs[f"job_{i}"] = job

    config = {"bundle": {"name": "test-bundle"}, "resources": {"jobs": jobs}}

    return config


def print_yaml(obj, indent=0, list_item=False):
    indent_str = "  " * indent

    if isinstance(obj, dict):
        first = True
        for key, value in obj.items():
            if list_item and first:
                prefix = indent_str + "- "
                first = False
            elif list_item:
                prefix = indent_str + "  "
            else:
                prefix = indent_str
            nested_indent = indent + 2 if list_item else indent + 1
            if isinstance(value, (dict, list)) and value:
                print(f"{prefix}{key}:")
                print_yaml(value, nested_indent)
            else:
                print(f"{prefix}{key}: {json.dumps(value)}")
    elif isinstance(obj, list):
        for item in obj:
            if isinstance(item, (dict, list)):
                print_yaml(item, indent, list_item=True)
            else:
                print(f"{indent_str}- {json.dumps(item)}")
    else:
        prefix = f"{indent_str}- " if list_item else indent_str
        print(f"{prefix}{json.dumps(obj)}")


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--jobs", type=int, default=10, help="Number of jobs to generate")
    args = parser.parse_args()

    print_yaml(gen_config(args.jobs))


if __name__ == "__main__":
    main()
```
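To illustrate the format `print_yaml` emits, here is a small input traced through the code above (the expected output was derived by hand, not taken from the PR):

```python
# Usage example, assuming the definitions from gen_config.py above are in
# scope. Scalars are JSON-encoded, so strings come out quoted (still valid YAML).
print_yaml({"resources": {"jobs": {"job_0": {"tasks": [{"task_key": "Sessionize"}]}}}})
# Expected output:
# resources:
#   jobs:
#     job_0:
#       tasks:
#         - task_key: "Sessionize"
```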
Deploy benchmark script (new file):

```sh
gen_config.py ${BENCHMARK_PARAMS:-} > databricks.yml
wc -l databricks.yml >> LOG.wc
# Note: since testserver persists state for the duration of the test, .databricks is kept
# between runs, and benchmark.py skips the first run as a warmup, so this measures the
# time a no-changes deploy takes.
# Note: terraform is set up by the test runner, so this time does not include TF download time.
benchmark.py $CLI bundle deploy
```
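For context, `BENCHMARK_PARAMS` is expanded into gen_config.py's arguments by the script above, and its mere presence makes benchmark.py default to 5 runs. A hypothetical local invocation (the `--jobs 1000` value and the script paths are assumptions, not from the PR):

```python
# Hypothetical local invocation; "--jobs 1000" is an assumed value, and the
# scripts are assumed to be on PATH. BENCHMARK_PARAMS feeds gen_config.py's
# argparse and switches benchmark.py from 1 run to 5.
import os
import subprocess

env = dict(os.environ, BENCHMARK_PARAMS="--jobs 1000")
subprocess.run("gen_config.py $BENCHMARK_PARAMS > databricks.yml", shell=True, env=env, check=True)
subprocess.run(["benchmark.py", "databricks", "bundle", "deploy"], env=env, check=True)
```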
Recorded output (new file):

```

>>> benchmark.py $CLI bundle plan > /dev/null
```
Contributor: Can we also log / commit the benchmark numbers? For local vs cloud? They are useful numbers to eyeball and keep track of.

Contributor (author): I logged them as a comment above. Absolute numbers are not useful because the machines differ. I expect that when someone improves the benchmarks, they will post before/after numbers.
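One way to do such a comparison, sketched here as a hypothetical helper (not part of this PR): extract the per-run wall times from the `TESTLOG:` lines that benchmark.py prints, so two logs can be compared before and after a change.

```python
# Hypothetical helper (not part of this PR): pull per-run wall times out of
# the counted "TESTLOG: Run #N (count): wall=..." lines printed by
# benchmark.py, then compare the means of two logs.
import re
import statistics

def wall_times(log_text):
    pattern = r"TESTLOG: Run #\d+ \(count\):\s*wall=([0-9.eE+-]+)"
    return [float(m.group(1)) for m in re.finditer(pattern, log_text)]

def speedup(before_log, after_log):
    # > 1.0 means the "after" run is faster on average.
    return statistics.mean(wall_times(before_log)) / statistics.mean(wall_times(after_log))
```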
Plan benchmark script (new file):

```sh
gen_config.py ${BENCHMARK_PARAMS:-} > databricks.yml
wc -l databricks.yml > LOG.wc
trace benchmark.py '$CLI bundle plan > /dev/null'
```
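The quoting here is deliberate: the command reaches benchmark.py as a single string containing spaces, which routes `run_once()` through the shell instead of argv-style execution. A minimal sketch of that dispatch, assuming the parenthesized condition shown in benchmark.py above:

```python
# Sketch of run_once()'s shell-vs-argv dispatch (with the precedence made
# explicit by the parentheses in benchmark.py above).
def needs_shell(command):
    return len(command) == 1 and (" " in command[0] or ">" in command[0])

assert needs_shell(["$CLI bundle plan > /dev/null"])      # one quoted string
assert not needs_shell(["databricks", "bundle", "plan"])  # argv-style list
```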
Test configuration (new file, TOML):

```toml
Timeout = '4h'
Ignore = ["databricks.yml"]

# Disabled because it fails on CI. We don't need this to work on Windows.
GOOS.windows = false
```
Comment: Should we have a separate top-level benchmarks directory? That's better for discoverability, since we can eyeball all the benchmarks that exist. It also maintains a clear separation between tests and benchmarks.

Reply: It would be harder to separate different projects inside the CLI if we add another entry point. Today we have roughly this structure, and we have tools that depend on it (testmask). Adding a top-level benchmarks directory would require making those tools aware of it.
cc @pietern

Reply: Note, this is something we can revisit later.