31 changes: 31 additions & 0 deletions Makefile
@@ -184,3 +184,34 @@ test-exp-ssh:

test-pipelines:
	make test TEST_PACKAGES="./cmd/pipelines/..." ACCEPTANCE_TEST_FILTER="TestAccept/pipelines"


# Benchmarks:

bench1k:
	BENCHMARK_PARAMS="--jobs 1000" go test ./acceptance -v -tail -run TestAccept/bundle/benchmarks -timeout=120m

bench100:
	BENCHMARK_PARAMS="--jobs 100" go test ./acceptance -v -tail -run TestAccept/bundle/benchmarks -timeout=120m

# small benchmark to quickly test benchmark-related code
bench10:
	BENCHMARK_PARAMS="--jobs 10" go test ./acceptance -v -tail -run TestAccept/bundle/benchmarks -timeout=120m

bench1k.log:
	make bench1k | tee $@

bench100.log:
	make bench100 | tee $@

bench10.log:
	make bench10 | tee $@

bench1k_summary: bench1k.log
	./tools/bench_parse.py $<

bench100_summary: bench100.log
	./tools/bench_parse.py $<

bench10_summary: bench10.log
	./tools/bench_parse.py $<
13 changes: 13 additions & 0 deletions acceptance/README.md
@@ -22,6 +22,19 @@ Any file starting with "LOG" will be logged to test log (visible with go test -v

See [selftest](./selftest) for more examples.

## Benchmarks

Benchmarks are regular acceptance tests that log measurements in a specific format. The output can be fed to `tools/bench_parse.py` to print a summary table.

The test runner treats a test as a benchmark if "benchmark" appears anywhere in its path. For these tests, parallel execution is disabled if and only if the BENCHMARK\_PARAMS variable is set.
Contributor

Should we have a separate top level benchmarks directory? That's better for discoverability since we can eyeball all benchmarks that exist. That also maintains a clear separation between tests and benchmarks.

Contributor Author
@denik, Jan 6, 2026

It would be harder to separate different projects inside the CLI if we add another entry point. Today we have roughly this structure:

  • cmd/project
  • acceptance/project

and we have tools that depend on it (testmask). Adding a top-level benchmarks directory would require making those tools aware of it.

cc @pietern

Contributor Author

note, this is something we can revisit later.


The benchmarks make use of two scripts:

- `gen_config.py --jobs N` generates a config with N jobs.
- `benchmark.py <command>` runs the command a few times and logs the time measurements.

The default number of runs in `benchmark.py` depends on the BENCHMARK\_PARAMS variable: if it is set, the default is 5 runs; otherwise it is 1.
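
Each measured run is reported on a line like `TESTLOG: Run #2 (count): wall=1.234 ru_utime=0.456 ru_stime=0.078 ru_maxrss=123456`. The actual `tools/bench_parse.py` is not shown here; as a rough sketch (assuming only this line format, the real tool may differ), extracting and summarizing wall times could look like:

```python
#!/usr/bin/env python3
# Illustrative sketch only; the real tools/bench_parse.py may differ.
import re
import statistics
import sys

# Matches the per-run lines printed by benchmark.py, e.g.
#   TESTLOG: Run #2 (count): wall=1.234 ru_utime=0.456 ru_stime=0.078 ru_maxrss=123456
# Warmup runs are labelled "(warm)" and are intentionally skipped.
LINE = re.compile(r"TESTLOG: Run #\d+ \(count\):\s+(.*)")

def wall_times(path):
    times = []
    with open(path) as f:
        for line in f:
            m = LINE.search(line)
            if m:
                fields = dict(kv.split("=", 1) for kv in m.group(1).split())
                times.append(float(fields["wall"]))
    return times

if __name__ == "__main__":
    times = wall_times(sys.argv[1])
    if times:
        print(f"runs={len(times)} mean={statistics.mean(times):.3f}s min={min(times):.3f}s max={max(times):.3f}s")
```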

## Running acceptance tests on Windows

To run the acceptance tests from a terminal on Windows (eg. Git Bash from VS Code),
13 changes: 11 additions & 2 deletions acceptance/acceptance_test.go
@@ -61,6 +61,9 @@ var InprocessMode bool
// lines with this prefix are not recorded in output.txt but logged instead
const TestLogPrefix = "TESTLOG: "

// In benchmark mode we disable parallel execution of all tests that contain the word "benchmark" in their path
var benchmarkMode = os.Getenv("BENCHMARK_PARAMS") != ""

func init() {
flag.BoolVar(&InprocessMode, "inprocess", false, "Run CLI in the same process as test (for debugging)")
flag.BoolVar(&KeepTmp, "keeptmp", false, "Do not delete TMP directory after run")
@@ -328,7 +331,13 @@ func testAccept(t *testing.T, inprocessMode bool, singleTest string) int {
t.Skip(skipReason)
}

if !inprocessMode {
runParallel := !inprocessMode

if benchmarkMode && strings.Contains(dir, "benchmark") {
runParallel = false
}

if runParallel {
t.Parallel()
}

@@ -344,7 +353,7 @@ func testAccept(t *testing.T, inprocessMode bool, singleTest string) int {
for ind, envset := range expanded {
envname := strings.Join(envset, "/")
t.Run(envname, func(t *testing.T) {
if !inprocessMode {
if runParallel {
t.Parallel()
}
runTest(t, dir, ind, coverDir, repls.Clone(), config, envset, envFilters)
119 changes: 119 additions & 0 deletions acceptance/bin/benchmark.py
@@ -0,0 +1,119 @@
#!/usr/bin/env python3
import argparse
import subprocess
import time
import statistics
import sys
import os
import json

try:
    import resource
except ImportError:
    # n/a on windows
    resource = None


def run_benchmark(command, warmup, runs):
    times = []

    for i in range(runs):
        # double fork: re-run ourselves with --once so each run's RUSAGE_CHILDREN stats (e.g. ru_maxrss) start fresh
        cp = subprocess.run([sys.executable, sys.argv[0], "--once"] + command, stdout=subprocess.PIPE)
        if cp.returncode != 0:
            sys.exit(cp.returncode)

        try:
            result = json.loads(cp.stdout)
        except Exception:
            print(f"Failed to parse: {cp.stdout!r}")
            raise

        run = f"Run #{i} (warm): " if i < warmup else f"Run #{i} (count):"

        result_formatted = " ".join(f"{key}={value}" for (key, value) in result.items())

        print(f"TESTLOG: {run} {result_formatted}")

        if i >= warmup:
            times.append(result["wall"])

    if not times:
        print("No times recorded")
        return

    if len(times) > 1:
        mean = statistics.mean(times)
        stdev = statistics.stdev(times)
        min_time = min(times)
        max_time = max(times)

        print(f"TESTLOG: Benchmark: {command}")
        print(f"TESTLOG: Time (mean ± σ): {mean:.3f} s ± {stdev:.3f} s")
        print(f"TESTLOG: Range (min … max): {min_time:.3f} s … {max_time:.3f} s {len(times)} runs", flush=True)


def run_once(command):
    # A single argument containing spaces or a redirection is treated as a shell command line.
    if len(command) == 1 and (" " in command[0] or ">" in command[0]):
        shell = True
        command = command[0]
    else:
        shell = False

    if resource:
        rusage_before = resource.getrusage(resource.RUSAGE_CHILDREN)

    with open("LOG.process", "a") as log:
        start = time.perf_counter()
        result = subprocess.run(command, shell=shell, stdout=log, stderr=log)
        end = time.perf_counter()

    if result.returncode != 0:
        print(f"Error: command failed with exit code {result.returncode}", file=sys.stderr)
        sys.exit(result.returncode)

    result = {"wall": end - start}

    if resource:
        rusage_after = resource.getrusage(resource.RUSAGE_CHILDREN)

        result.update(
            {
                "ru_utime": rusage_after.ru_utime - rusage_before.ru_utime,
                "ru_stime": rusage_after.ru_stime - rusage_before.ru_stime,
                # ru_maxrss reports the largest child's peak RSS, so subtracting is not correct: rusage_before may refer to a different process
                "ru_maxrss": rusage_after.ru_maxrss,
            }
        )

    return result


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--warmup", type=int, default=1)
    parser.add_argument("--runs", type=int)
    parser.add_argument("--once", action="store_true")
    parser.add_argument("command", nargs="+")
    args = parser.parse_args()

    if args.once:
        assert not args.runs
        result = run_once(args.command)
        print(json.dumps(result))
        return

    if args.runs is None:
        if os.environ.get("BENCHMARK_PARAMS"):
            args.runs = 5
        else:
            args.runs = 1

    if args.warmup >= args.runs:
        args.warmup = min(1, args.runs - 1)

    run_benchmark(args.command, args.warmup, args.runs)


if __name__ == "__main__":
    main()
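
A note on the `--once` re-exec above (illustrative, not part of this change): `getrusage(RUSAGE_CHILDREN)` accumulates over all reaped children and `ru_maxrss` is a high-water mark, so measuring each run in a fresh process is what keeps per-run memory numbers meaningful. A small POSIX-only sketch of the effect:

```python
# POSIX-only demonstration; not part of benchmark.py.
import resource
import subprocess
import sys

# ru_maxrss from RUSAGE_CHILDREN is a high-water mark across all children reaped so far,
# so a later, smaller child cannot lower it: the parent keeps reporting the earlier peak.
subprocess.run([sys.executable, "-c", "x = bytearray(50_000_000)"])  # large child
big = resource.getrusage(resource.RUSAGE_CHILDREN).ru_maxrss
subprocess.run([sys.executable, "-c", "pass"])  # small child
still_big = resource.getrusage(resource.RUSAGE_CHILDREN).ru_maxrss
print(big, still_big)  # still_big == big: the high-water mark did not reset
```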
147 changes: 147 additions & 0 deletions acceptance/bin/gen_config.py
@@ -0,0 +1,147 @@
#!/usr/bin/env python3
import argparse
import json
import copy

JOB_TEMPLATE_BASE = {
    "description": "This job contain multiple tasks that are required to produce the weekly shark sightings report.",
    "email_notifications": {
        "no_alert_for_skipped_runs": False,
        "on_failure": ["user.name@databricks.com"],
        "on_success": ["user.name@databricks.com"],
    },
    "job_clusters": [
        {
            "job_cluster_key": "auto_scaling_cluster",
            "new_cluster": {
                "autoscale": {"max_workers": 16, "min_workers": 2},
                "node_type_id": "i3.xlarge",
                "spark_conf": {"spark.speculation": "true"},
                "spark_version": "13.3.x-scala2.12",
            },
        }
    ],
    "max_concurrent_runs": 10,
    "name": "A multitask job",
    "notification_settings": {"no_alert_for_canceled_runs": False, "no_alert_for_skipped_runs": False},
    "parameters": [{"default": "users", "name": "table"}],
    "tags": {"cost-center": "engineering", "team": "jobs"},
    "tasks": [
        {
            "depends_on": [],
            "description": "Extracts session data from events",
            "job_cluster_key": "auto_scaling_cluster",
            "libraries": [{"jar": "dbfs:/mnt/databricks/Sessionize.jar"}],
            "max_retries": 3,
            "min_retry_interval_millis": 2000,
            "retry_on_timeout": False,
            "spark_jar_task": {
                "main_class_name": "com.databricks.Sessionize",
                "parameters": ["--data", "dbfs:/path/to/data.json"],
            },
            "task_key": "Sessionize",
            "timeout_seconds": 86400,
        },
        {
            "depends_on": [],
            "description": "Ingests order data",
            "job_cluster_key": "auto_scaling_cluster",
            "libraries": [{"jar": "dbfs:/mnt/databricks/OrderIngest.jar"}],
            "max_retries": 3,
            "min_retry_interval_millis": 2000,
            "retry_on_timeout": False,
            "spark_jar_task": {
                "main_class_name": "com.databricks.OrdersIngest",
                "parameters": ["--data", "dbfs:/path/to/order-data.json"],
            },
            "task_key": "Orders_Ingest",
            "timeout_seconds": 86400,
        },
        {
            "depends_on": [{"task_key": "Orders_Ingest"}, {"task_key": "Sessionize"}],
            "description": "Matches orders with user sessions",
            "max_retries": 3,
            "min_retry_interval_millis": 2000,
            "new_cluster": {
                "autoscale": {"max_workers": 16, "min_workers": 2},
                "node_type_id": "i3.xlarge",
                "spark_conf": {"spark.speculation": "true"},
                "spark_version": "13.3.x-scala2.12",
            },
            "notebook_task": {
                "base_parameters": {"age": "35", "name": "John Doe"},
                "notebook_path": "/Users/user.name@databricks.com/Match",
            },
            "retry_on_timeout": False,
            "run_if": "ALL_SUCCESS",
            "task_key": "Match",
            "timeout_seconds": 86400,
        },
    ],
    "timeout_seconds": 86400,
}


def gen_config(n):
    jobs = {}
    for i in range(n):
        job = copy.deepcopy(JOB_TEMPLATE_BASE)
        job["name"] = f"job_{i}"

        # Odd jobs use continuous, even jobs use schedule
        if i % 2 == 1:
            job["continuous"] = {"pause_status": "UNPAUSED"}
        else:
            job["schedule"] = {
                "pause_status": "UNPAUSED",
                "quartz_cron_expression": "20 30 * * * ?",
                "timezone_id": "Europe/London",
            }

        jobs[f"job_{i}"] = job

    config = {"bundle": {"name": "test-bundle"}, "resources": {"jobs": jobs}}

    return config


def print_yaml(obj, indent=0, list_item=False):
    indent_str = "  " * indent

    if isinstance(obj, dict):
        first = True
        for key, value in obj.items():
            if list_item and first:
                prefix = indent_str + "- "
                first = False
            elif list_item:
                prefix = indent_str + "  "
            else:
                prefix = indent_str
            nested_indent = indent + 2 if list_item else indent + 1
            if isinstance(value, (dict, list)) and value:
                print(f"{prefix}{key}:")
                print_yaml(value, nested_indent)
            else:
                print(f"{prefix}{key}: {json.dumps(value)}")
    elif isinstance(obj, list):
        for item in obj:
            if isinstance(item, (dict, list)):
                print_yaml(item, indent, list_item=True)
            else:
                print(f"{indent_str}- {json.dumps(item)}")
    else:
        prefix = f"{indent_str}- " if list_item else indent_str
        print(f"{prefix}{json.dumps(obj)}")


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--jobs", type=int, default=10, help="Number of jobs to generate")
    args = parser.parse_args()

    print_yaml(gen_config(args.jobs))


if __name__ == "__main__":
    main()
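
A note on `print_yaml` (illustrative, not part of this change): it avoids a YAML dependency by doing its own indentation and rendering scalars with `json.dumps`, which works because every JSON scalar is also a valid YAML scalar, and the quoting keeps strings like `"true"` from turning into booleans. A quick sanity check, assuming `pyyaml` is available (it is not used by gen_config.py itself):

```python
# Assumes pyyaml is installed; used only for this check, not by gen_config.py.
import json
import yaml

scalars = {"name": "test-bundle", "max_workers": 16, "spark.speculation": "true", "paused": False, "retries": None}

# Render each scalar the way print_yaml does and confirm YAML reads it back unchanged.
for key, value in scalars.items():
    rendered = f"{key}: {json.dumps(value)}"
    assert yaml.safe_load(rendered) == {key: value}, rendered
print("all scalars round-trip through YAML")
```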
8 changes: 8 additions & 0 deletions acceptance/bundle/benchmarks/deploy/out.test.toml


Empty file.
6 changes: 6 additions & 0 deletions acceptance/bundle/benchmarks/deploy/script
@@ -0,0 +1,6 @@
gen_config.py ${BENCHMARK_PARAMS:-} > databricks.yml
wc -l databricks.yml >> LOG.wc
# Note: since the testserver persists state for the duration of the test, .databricks is kept and benchmark.py skips the first run as a warmup, so this measures
# the time a no-changes deploy takes.
# Note, terraform is set up by the test runner, so this time does not include TF download time.
benchmark.py $CLI bundle deploy
8 changes: 8 additions & 0 deletions acceptance/bundle/benchmarks/plan/out.test.toml


2 changes: 2 additions & 0 deletions acceptance/bundle/benchmarks/plan/output.txt
@@ -0,0 +1,2 @@

>>> benchmark.py $CLI bundle plan > /dev/null
Contributor

Can we also log / commit the benchmarks numbers? For local vs cloud? Useful numbers to eyeball and keep track of.

Contributor Author

I logged them as a comment above. Absolute numbers are not useful due to different machines. I expect when someone improves the benchmarks they will post before/after numbers.

3 changes: 3 additions & 0 deletions acceptance/bundle/benchmarks/plan/script
@@ -0,0 +1,3 @@
gen_config.py ${BENCHMARK_PARAMS:-} > databricks.yml
wc -l databricks.yml > LOG.wc
trace benchmark.py '$CLI bundle plan > /dev/null'
5 changes: 5 additions & 0 deletions acceptance/bundle/benchmarks/test.toml
@@ -0,0 +1,5 @@
Timeout = '4h'
Ignore = ["databricks.yml"]

# Disabled because it fails on CI. We don't need this to work on Windows.
GOOS.windows = false