-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathoperator-test-runner.py
More file actions
422 lines (345 loc) · 15.2 KB
/
operator-test-runner.py
File metadata and controls
422 lines (345 loc) · 15.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
"""
Main module of the Operator Test Runner application
"""
import os
import sys
import uuid
from datetime import UTC, datetime, timedelta
from time import sleep
from jinja2 import Template
import modules.catalog as catalog
from modules.cluster import create_cluster, terminate_cluster
from modules.cluster_logging import install_cluster_logging
from modules.command import run_command
# values of the params (given as env vars during Docker run)
param_output_file_user = "0:0"
param_platform = None
param_platform_version = None
param_operator = None
param_git_branch = None
param_test_script_params = ""
param_cluster_logging_endpoint = None
param_cluster_logging_username = None
param_cluster_logging_password = None
param_opensearch_dashboards_url = None
# keys for the env vars
PARAM_KEY_PLATFORM = "PLATFORM"
PARAM_KEY_PLATFORM_VERSION = "PLATFORM_VERSION"
PARAM_KEY_OPERATOR = "OPERATOR"
PARAM_KEY_OUTPUT_FILE_USER = "OUTPUT_FILE_USER"
PARAM_KEY_REPLICATED_API_TOKEN = "REPLICATED_API_TOKEN"
PARAM_KEY_IONOS_USERNAME = "IONOS_USERNAME"
PARAM_KEY_IONOS_PASSWORD = "IONOS_PASSWORD"
PARAM_KEY_GIT_BRANCH = "GIT_BRANCH"
PARAM_KEY_OPERATOR_VERSION = "OPERATOR_VERSION"
PARAM_KEY_TEST_SCRIPT_PARAMS = "TEST_SCRIPT_PARAMS"
PARAM_KEY_CLUSTER_LOGGING_ENDPOINT = "CLUSTER_LOGGING_ENDPOINT"
PARAM_KEY_CLUSTER_LOGGING_USERNAME = "CLUSTER_LOGGING_USERNAME"
PARAM_KEY_CLUSTER_LOGGING_PASSWORD = "CLUSTER_LOGGING_PASSWORD"
PARAM_KEY_OPENSEARCH_DASHBOARDS_URL = "OPENSEARCH_DASHBOARDS_URL"
# by convention, this is the return code for "unstable cluster"
EXIT_CODE_CLUSTER_FAILED = 255
# constants for the file handling
TARGET_FOLDER = "/target/"
TESTDRIVER_LOGFILE = f"{TARGET_FOLDER}testdriver.log"
TEST_OUTPUT_LOGFILE = f"{TARGET_FOLDER}test-output.log"
CLUSTER_INFO_FILE = f"{TARGET_FOLDER}cluster-info.txt"
LOG_INDEX_LINKS_FILE = f"{TARGET_FOLDER}logs.html"
def init():
"""
Initializes this app, checks if all params are provided as environment variables.
Returns True if initialization succeeded, False otherwise.
"""
global param_platform
global param_platform_version
global param_operator
global param_output_file_user
global param_operator_version
global param_git_branch
global param_test_script_params
global param_cluster_logging_endpoint
global param_cluster_logging_username
global param_cluster_logging_password
global param_opensearch_dashboards_url
if PARAM_KEY_REPLICATED_API_TOKEN not in os.environ:
print(f"Error: Please supply {PARAM_KEY_REPLICATED_API_TOKEN} as an environment variable.")
return False
if PARAM_KEY_IONOS_USERNAME not in os.environ:
print(f"Error: Please supply {PARAM_KEY_IONOS_USERNAME} as an environment variable.")
return False
if PARAM_KEY_IONOS_PASSWORD not in os.environ:
print(f"Error: Please supply {PARAM_KEY_IONOS_PASSWORD} as an environment variable.")
return False
if PARAM_KEY_PLATFORM not in os.environ:
print(f"Error: Please supply {PARAM_KEY_PLATFORM} as an environment variable.")
return False
if PARAM_KEY_PLATFORM_VERSION not in os.environ:
print(f"Error: Please supply {PARAM_KEY_PLATFORM_VERSION} as an environment variable.")
return False
if PARAM_KEY_OPERATOR not in os.environ:
print(f"Error: Please supply {PARAM_KEY_OPERATOR} as an environment variable.")
return False
if PARAM_KEY_OPERATOR_VERSION not in os.environ:
print(f"Error: Please supply {PARAM_KEY_OPERATOR_VERSION} as an environment variable.")
return False
if PARAM_KEY_CLUSTER_LOGGING_ENDPOINT not in os.environ:
print(
f"Error: Please supply {PARAM_KEY_CLUSTER_LOGGING_ENDPOINT} as an environment variable."
)
return False
if PARAM_KEY_CLUSTER_LOGGING_USERNAME not in os.environ:
print(
f"Error: Please supply {PARAM_KEY_CLUSTER_LOGGING_USERNAME} as an environment variable."
)
return False
if PARAM_KEY_CLUSTER_LOGGING_PASSWORD not in os.environ:
print(
f"Error: Please supply {PARAM_KEY_CLUSTER_LOGGING_PASSWORD} as an environment variable."
)
return False
param_platform = os.environ[PARAM_KEY_PLATFORM].strip()
param_platform_version = os.environ[PARAM_KEY_PLATFORM_VERSION].strip()
param_operator = os.environ[PARAM_KEY_OPERATOR].strip()
param_operator_version = os.environ[PARAM_KEY_OPERATOR_VERSION].strip()
param_cluster_logging_endpoint = os.environ[PARAM_KEY_CLUSTER_LOGGING_ENDPOINT].strip()
param_cluster_logging_username = os.environ[PARAM_KEY_CLUSTER_LOGGING_USERNAME].strip()
param_cluster_logging_password = os.environ[PARAM_KEY_CLUSTER_LOGGING_PASSWORD].strip()
if not os.path.isdir(TARGET_FOLDER):
print(f"Error: A target folder volume has to be supplied as mount on {TARGET_FOLDER}. ")
return False
if PARAM_KEY_OUTPUT_FILE_USER in os.environ:
param_output_file_user = os.environ[PARAM_KEY_OUTPUT_FILE_USER].strip()
if PARAM_KEY_GIT_BRANCH in os.environ:
param_git_branch = os.environ[PARAM_KEY_GIT_BRANCH].strip()
if PARAM_KEY_TEST_SCRIPT_PARAMS in os.environ:
param_test_script_params = os.environ[PARAM_KEY_TEST_SCRIPT_PARAMS].strip()
if PARAM_KEY_OPENSEARCH_DASHBOARDS_URL in os.environ:
param_opensearch_dashboards_url = os.environ[PARAM_KEY_OPENSEARCH_DASHBOARDS_URL].strip()
return True
def set_target_folder_owner():
"""
As the Docker container is run with the root user (0:0), the files it produces will not be manageable by the Jenkins user.
That's why a UID/GID combo is to be specified as the OUTPUT_FILE_USER env var.
This method recursively sets the ownership of the output files.
"""
os.system(f"chown -R {param_output_file_user} {TARGET_FOLDER}")
def log(msg=""):
"""
Logs the given text message to stdout AND the logfile.
"""
timestamp = f"{datetime.now(UTC):%Y-%m-%d %H:%M:%S}"
print(f"{timestamp} :: {msg}")
sys.stdout.flush()
with open(TESTDRIVER_LOGFILE, "a") as f:
f.write(f"{timestamp} :: {msg}\n")
def clone_git_repo(repo):
"""
Clones the given Stackable GitHub repo
"""
git_branch_option = f"-b {param_git_branch}" if param_git_branch else ""
exit_code, output = run_command(
f"git clone {git_branch_option} https://github.com/stackabletech/{repo}.git", "git clone"
)
if exit_code != 0:
for line in output:
log(line)
return False
return True
def run_tests(operator, operator_version, test_script_params):
"""
Runs the tests using the test script in the operator repo.
The test script can be either 'run-tests' (default) or 'auto-retry-tests.py'
based on the operator's catalog configuration.
operator: name of the operator-repo (usually with suffix '-operator')
operator_version: Version of the operator to be tested
test_script_params: additional params
"""
# Get test script configuration - check for runtime override first
test_script_override = os.environ.get("TEST_SCRIPT", "").strip()
if test_script_override == "Auto-retry":
test_script = "auto-retry-tests.py"
log(f"Using test script: {test_script} (user override)")
elif test_script_override == "run-tests":
test_script = "run-tests"
log(f"Using test script: {test_script} (user override)")
else:
# Default or empty - use catalog configuration
test_script = catalog.get_test_script(operator)
log(f"Using test script: {test_script} (from catalog)")
# Step 1: Installation of the SDP (retried max. 10 times to reduce flakiness)
# This step is always done with run-tests regardless of the test script choice
command_install_sdp = f"cd {operator}/ && python ./scripts/run-tests --skip-tests --operator {operator.replace('-operator', '')}={operator_version}"
log("Running the following command to install SDP for test:")
log(command_install_sdp)
exit_code, output = run_command(command_install_sdp, "install sdp", retries=10, delay=60)
if exit_code != 0:
for line in output:
log(line)
return exit_code
# Step 2: Run the actual tests
# (The aux. method run_command() is NOT used here because we want the output to be streamed, not captured!)
if test_script == "auto-retry-tests.py":
# Use auto-retry test runner
retry_config = catalog.get_auto_retry_config(operator)
log(
f"Auto-retry configuration: attempts_parallel={retry_config['attempts_parallel']}, "
f"attempts_serial={retry_config['attempts_serial']}, "
f"delete_failed_namespaces={retry_config['delete_failed_namespaces']}"
)
# Extract parallel value from test_script_params (default to 0)
parallel_value = "0"
if "--parallel" in test_script_params:
try:
parts = test_script_params.split("--parallel")
if len(parts) > 1:
parallel_value = parts[1].strip().split()[0]
except Exception:
pass
# Build command for auto-retry-tests.py
command_parts = [
f"cd {operator}/",
"&&",
"python ./scripts/auto-retry-tests.py",
f"--parallel {parallel_value}",
f"--attempts-parallel {retry_config['attempts_parallel']}",
f"--attempts-serial {retry_config['attempts_serial']}",
f"--output-dir {TARGET_FOLDER}test-results",
]
if retry_config["delete_failed_namespaces"]:
command_parts.append("--delete-failed-namespaces")
# Add extra args from test_script_params (excluding --parallel which we already handled)
extra_params = []
skip_next = False
for param in test_script_params.split():
if skip_next:
skip_next = False
continue
if param == "--parallel":
skip_next = True
continue
extra_params.append(param)
if extra_params:
command_parts.append("--extra-args")
command_parts.extend(extra_params)
command_parts.extend(["2>&1", ";", "echo $? > /test_exit_code"])
command_run_tests = f"({' '.join(command_parts)}) | tee {TEST_OUTPUT_LOGFILE}"
else:
# Use traditional run-tests script
params = " --log-level debug" if "--log-level" not in test_script_params else ""
command_run_tests = f"(cd {operator}/ && python ./scripts/run-tests --skip-release {params} {test_script_params} 2>&1; echo $? > /test_exit_code) | tee {TEST_OUTPUT_LOGFILE}"
log("Running the following test command:")
log(command_run_tests)
os.system(command_run_tests)
sleep(15)
with open("/test_exit_code") as f:
return int(f.read().strip())
def write_logs_html(cluster_id, timestamp_start, timestamp_stop, opensearch_dashboards_url):
"""
The output file 'logs.html' contains links to the OpenSearch Dashboards application which
are prepared to filter the matching cluster id and timeframe.
"""
date_from = (timestamp_start - timedelta(hours=0, minutes=5)).strftime("%Y-%m-%dT%H:%M:00Z")
date_to = (timestamp_stop + timedelta(hours=0, minutes=5)).strftime("%Y-%m-%dT%H:%M:00Z")
with open("/src/modules/.cluster_logging/logs.html.j2") as f:
logs_html = Template(f.read())
with open(LOG_INDEX_LINKS_FILE, "w") as f:
f.write(
logs_html.render(
{
"cluster_id": cluster_id,
"date_from": date_from,
"date_to": date_to,
"opensearch_dashboards_url": opensearch_dashboards_url,
}
)
)
f.close()
if __name__ == "__main__":
job_start_timestamp_utc = datetime.now(UTC)
print("testing.stackable.tech operator-test-runner")
print()
print("This app runs an operator integration test.")
print()
if not init():
exit(EXIT_CODE_CLUSTER_FAILED)
set_target_folder_owner()
log("Reading catalog...")
if not catalog.read_catalog(log):
log("Error reading catalog, operator-test-runner is aborted.")
exit(EXIT_CODE_CLUSTER_FAILED)
# Read the platform and version data from the catalog
platform = catalog.get_platform(param_platform)
if not platform:
log(f"The platform '{param_platform}' does not exist.")
exit(EXIT_CODE_CLUSTER_FAILED)
if param_platform_version not in platform["versions"]:
log(
f"The version '{param_platform_version}' does not exist for platform '{param_platform}'."
)
exit(EXIT_CODE_CLUSTER_FAILED)
log(f"Test running on platform '{platform['id']}', version {param_platform_version}.")
# Read the cluster spec for the given platform
cluster_spec = catalog.get_spec_for_operator_test(param_operator, platform["id"], log)
if not cluster_spec:
log("Cluster spec could not be determined.")
exit(EXIT_CODE_CLUSTER_FAILED)
log(
f"Test running on Git Branch {param_git_branch} with the test script parameters '{param_test_script_params}'..."
)
# random cluster ID
cluster_id = uuid.uuid4().hex
log("Creating cluster...")
cluster = create_cluster(
platform["provider"],
cluster_id,
cluster_spec,
param_platform_version,
CLUSTER_INFO_FILE,
log,
)
if not cluster:
log("Cluster could not be created.")
exit(EXIT_CODE_CLUSTER_FAILED)
log("Cloning git repo...")
clone_git_repo(param_operator)
log("Waiting 1 minute for the cluster to become ready...")
sleep(60)
log("Install Cluster Logging (powered by Vector)...")
installed_cluster_logging = install_cluster_logging(
cluster_id,
param_cluster_logging_endpoint,
param_cluster_logging_username,
param_cluster_logging_password,
log,
)
if installed_cluster_logging:
log("Installed Cluster Logging (powered by Vector).")
else:
log("Error installing Cluster Logging, continuing without it...")
set_target_folder_owner()
log("Waiting 1 minute for the cluster logging to become ready...")
sleep(60)
log("Running tests...")
test_exit_code = run_tests(param_operator, param_operator_version, param_test_script_params)
log(f"Test exited with code {test_exit_code}")
log()
log("Waiting 1 minute to allow logs to be processed...")
sleep(60)
termination_successful = terminate_cluster(platform["provider"], cluster, log)
job_finished_timestamp_utc = datetime.now(UTC)
# Write file which links to the logs if URL set
if param_opensearch_dashboards_url:
write_logs_html(
cluster_id,
job_start_timestamp_utc,
job_finished_timestamp_utc,
param_opensearch_dashboards_url,
)
# Set output file ownership recursively
# This is important as the test script might have added files which are not controlled
# by this Python script and therefore most probably are owned by root
set_target_folder_owner()
if not termination_successful:
log("Cluster could not be terminated.")
exit(EXIT_CODE_CLUSTER_FAILED)
exit(test_exit_code)