-
-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathrun-tests
More file actions
executable file
·470 lines (400 loc) · 14.7 KB
/
run-tests
File metadata and controls
executable file
·470 lines (400 loc) · 14.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
#!/usr/bin/env python
# vim: filetype=python syntax=python tabstop=4 expandtab
import argparse
import collections.abc
import contextlib
import logging
import os
import re
import shutil
import subprocess
import sys
import tempfile
__version__ = "0.0.1"
DESCRIPTION = """
Run integration tests. Call this script from the root of the repository.
Exits with 0 on success, 1 on failure.
Requires the following commands to be installed:
* beku
* stackablectl
* kubectl
* kubectl-kuttl
Examples:
1. Install operators, run all tests and clean up test namespaces:
./scripts/run-tests --parallel 4
2. Install operators but for Airflow use version "0.0.0-pr123" instead of "0.0.0-dev" and run all tests as above:
./scripts/run-tests --operator airflow=0.0.0-pr123 --parallel 4
3. Do not install any operators, run the smoke test suite and keep namespace:
./scripts/run-tests --skip-release --skip-delete --test-suite smoke-latest
4. Run the ldap test(s) from the openshift test suite and keep namespace:
./scripts/run-tests --skip-release --skip-delete --test-suite openshift --test ldap
5. Run the smoke test suite in the namespace "smoke". The namespace will be
created if it doesn't exist and will not be deleted when the tests end.
./scripts/run-tests --test-suite smoke-latest --namespace smoke
"""
class TestRunnerException(Exception):
pass
def parse_args(argv: list[str]) -> argparse.Namespace:
"""Parse command line args."""
parser = argparse.ArgumentParser(
description=DESCRIPTION, formatter_class=argparse.RawDescriptionHelpFormatter
)
parser.add_argument(
"--version",
help="Display application version",
action="version",
version=f"%(prog)s {__version__}",
)
parser.add_argument(
"--skip-delete",
help="Do not delete test namespaces.",
action="store_true",
)
parser.add_argument(
"--skip-tests",
help="Do not actually run the tests.",
action="store_true",
)
parser.add_argument(
"--skip-release",
help="Do not install operators.",
action="store_true",
)
parser.add_argument(
"--parallel",
help="How many tests to run in parallel. Default 2.",
type=int,
required=False,
default=2,
)
parser.add_argument(
"--operator",
help="Patch operator version in release.yaml. Format <operator>=<version>",
action="append",
type=cli_parse_operator_args,
default=[],
)
parser.add_argument(
"--skip-operator",
help="Skip given operator(s) when installing a release.",
action="append",
default=[],
)
parser.add_argument(
"--test",
help="Kuttl test to run.",
type=str,
required=False,
)
parser.add_argument(
"--test-suite",
help="Name of the test suite to expand. Default: default",
type=str,
required=False,
)
parser.add_argument(
"--log-level",
help="Set log level.",
type=cli_log_level,
required=False,
default=logging.INFO,
)
parser.add_argument(
"--namespace",
help="Namespace to run the tests in. It will be created if it doesn't already exist.",
type=str,
required=False,
)
parser.add_argument(
"--work-dir",
help="Working directory for test generation and execution (default: tests/_work)",
type=str,
required=False,
default=os.path.join("tests", "_work"),
)
parser.add_argument(
"--listener-class-preset",
help="Choose the ListenerClass preset",
type=str,
required=False,
)
return parser.parse_args(argv)
def cli_parse_operator_args(args: str) -> tuple[str, str]:
if "=" not in args:
raise argparse.ArgumentTypeError(
f"Invalid operator argument: {args}. Must be in format <operator>=<version>"
)
op, version = args.split("=", maxsplit=1)
return op, version
def cli_log_level(cli_arg: str) -> int:
match cli_arg:
case "debug":
return logging.DEBUG
case "info":
return logging.INFO
case "error":
return logging.ERROR
case "warning":
return logging.WARNING
case "critical":
return logging.CRITICAL
case _:
raise argparse.ArgumentTypeError("Invalid log level")
def have_requirements() -> None:
commands = [
("beku", "https://github.com/stackabletech/beku.py"),
(
"stackablectl",
"https://github.com/stackabletech/stackable-cockpit/blob/main/rust/stackablectl/README.md",
),
("kubectl", "https://kubernetes.io/docs/tasks/tools/install-kubectl-linux/"),
("kubectl-kuttl", "https://kuttl.dev/"),
]
err = False
for command, url in commands:
if not shutil.which(command):
logging.error(f'Command "{command}" not found, please install from {url}')
err = True
if err:
raise TestRunnerException()
@contextlib.contextmanager
def release_file(
operators: list[tuple[str, str]], skip_ops: list[str]
) -> collections.abc.Generator[str, None, None]:
"""Generate a (possibly modified) copy of the release.yaml file.
Operator versions passed as --operator take precedence over the release.yaml contents.
Operators passed as --skip-operator are excluded from the resulting release.yaml contents.
If an invalid operator name is provided (i.e. one that doesn't exist in the
original release file), a TestRunnerException is raised.
Yields the name of the (potentially patched) release file. This is a temporary
file that will be deleted when the context manager exits.
"""
def _patch():
release_file = os.path.join("tests", "release.yaml")
# A marker to validate that all ops were patched
patched_release = []
with open(release_file, "r") as f:
patched_ops = []
patch_version = ""
for line in f:
if patch_version:
line = re.sub(":.+$", f": {patch_version}", line)
patch_version = ""
else:
for op, version in operators:
if op in line:
patch_version = version
patched_ops.append(op)
break
patched_release.append(line.rstrip("\n"))
# Sanity test that cli didn't contain garbage that is silently discarded
ops_not_patched = set([op for op, _ in operators]) - set(patched_ops)
if ops_not_patched:
logging.error(
f"Patched operators [{', '.join(ops_not_patched)}] not found in {release_file}"
)
raise TestRunnerException()
# Filter out skip operators
release_contents = []
skip_lines = 0
valid_skip_ops = []
for line in patched_release:
if skip_lines:
skip_lines -= 1
continue
for op in skip_ops:
if op in line:
# Every product section has 1 line of additional config to skip
skip_lines = 1
valid_skip_ops.append(op)
break
else:
release_contents.append(line)
# Sanity test that cli didn't contain garbage that is silently discarded
ops_not_skipped = set(skip_ops) - set(valid_skip_ops)
if ops_not_skipped:
logging.error(
f"Skipped operators [{', '.join(ops_not_skipped)}] not found in {release_file}"
)
raise TestRunnerException()
with tempfile.NamedTemporaryFile(
mode="w",
delete=False,
prefix="patched",
) as f:
pcontents = "\n".join(release_contents)
logging.debug(f"Writing patched release to {f.name}: {pcontents}\n")
f.write(pcontents)
return f.name
release_file = _patch()
try:
yield release_file
except TestRunnerException as e:
logging.error(f"Caught exception: {e}")
raise
finally:
if "patched" in release_file:
try:
logging.debug(f"Removing patched release file : {release_file}")
os.remove(release_file)
except FileNotFoundError | OSError:
logging.error(f"Failed to delete patched release file: {release_file}")
def maybe_install_release(
skip_release: bool, release_file: str, listener_class_preset: str
) -> None:
if skip_release:
logging.debug("Skip release installation")
return
try:
stackablectl_cmd = [
"stackablectl",
"release",
"install",
"--release-file",
release_file,
*(
["--listener-class-preset", listener_class_preset]
if listener_class_preset
else []
),
"tests",
]
logging.debug(f"Running : {stackablectl_cmd}")
completed_proc = subprocess.run(
stackablectl_cmd,
capture_output=True,
check=True,
)
# stackablectl doesn't return a non-zero exit code on failure
# so we need to check stderr for errors
stackablectl_err = completed_proc.stderr.decode("utf-8")
if "error" in stackablectl_err.lower():
logging.error(stackablectl_err)
logging.error("stackablectl failed")
raise TestRunnerException()
except subprocess.CalledProcessError as e:
# in case stackablectl starts returning non-zero exit codes
logging.error(e.stderr.decode("utf-8"))
logging.error("stackablectl failed")
raise TestRunnerException()
def gen_tests(test_suite: str, namespace: str, work_dir: str) -> None:
try:
beku_cmd = [
"beku",
"--test_definition",
os.path.join("tests", "test-definition.yaml"),
"--kuttl_test",
os.path.join("tests", "kuttl-test.yaml.jinja2"),
"--template_dir",
os.path.join("tests", "templates", "kuttl"),
"--output_dir",
work_dir,
]
if test_suite:
beku_cmd.extend(["--suite", test_suite])
if namespace:
beku_cmd.extend(["--namespace", namespace])
logging.debug(f"Running : {beku_cmd}")
subprocess.run(
beku_cmd,
check=True,
)
except subprocess.CalledProcessError:
logging.error("beku failed")
raise TestRunnerException()
def run_tests(
test: str, parallel: int, namespace: str, skip_delete: bool, work_dir: str
) -> None:
try:
kuttl_cmd = ["kubectl-kuttl", "test"]
if test:
kuttl_cmd.extend(["--test", test])
if parallel:
kuttl_cmd.extend(["--parallel", str(parallel)])
if skip_delete:
kuttl_cmd.extend(["--skip-delete"])
if namespace:
kuttl_cmd.extend(["--namespace", namespace])
# kuttl doesn't create the namespace so we need to do it ourselves
ensure_namespace_exists(namespace)
logging.debug(f"Running : {kuttl_cmd}")
subprocess.run(
kuttl_cmd,
cwd=work_dir,
check=True,
)
except subprocess.CalledProcessError:
logging.error("kuttl failed")
raise TestRunnerException()
def ensure_namespace_exists(namespace: str) -> None:
"""
Ensure the specified namespace exists, creating it if necessary.
This function handles various permission scenarios:
- If the namespace already exists, it does nothing
- If it doesn't exist and we have permission, it creates it
- If we don't have permission to create/check namespaces, it logs a warning
and assumes the namespace exists or will be created externally (useful for OpenShift)
Examples of (permission) errors we handle:
- Error from server (Forbidden): namespaces is forbidden: User "developer" cannot create resource "namespaces" in API group "" at the cluster scope
- Error from server (Forbidden): namespaces "foobar123" is forbidden: User "developer" cannot get resource "namespaces" in API group "" in the namespace "foobar123"
- Error from server (AlreadyExists): namespaces "kuttl-test-finer-caiman" already exists
"""
# First check if the namespace already exists
check_ns_cmd = ["kubectl", "get", "namespace", namespace]
try:
logging.debug(f"Checking if namespace exists: {check_ns_cmd}")
subprocess.run(
check_ns_cmd,
check=True,
capture_output=True,
)
logging.debug(f"Namespace '{namespace}' already exists")
except subprocess.CalledProcessError:
# Namespace doesn't exist, try to create it
create_ns_cmd = ["kubectl", "create", "namespace", namespace]
try:
logging.debug(f"Creating namespace: {create_ns_cmd}")
subprocess.run(
create_ns_cmd,
check=True,
capture_output=True,
)
logging.debug(f"Successfully created namespace '{namespace}'")
except subprocess.CalledProcessError as e:
stderr = e.stderr.decode("utf-8")
if "already exists" in stderr:
logging.debug(
f"Namespace '{namespace}' already exists (race condition)"
)
elif "forbidden" in stderr.lower():
logging.warning(
f"No permission to create namespace '{namespace}', assuming it exists or will be created externally"
)
else:
logging.error(stderr)
logging.error("namespace creation failed")
raise TestRunnerException()
def main(argv) -> int:
ret = 0
try:
opts = parse_args(argv[1:])
logging.basicConfig(encoding="utf-8", level=opts.log_level)
have_requirements()
gen_tests(opts.test_suite, opts.namespace, opts.work_dir)
with release_file(opts.operator, opts.skip_operator) as f:
maybe_install_release(opts.skip_release, f, opts.listener_class_preset)
if opts.skip_tests:
logging.info("Skip running tests.")
else:
run_tests(
opts.test,
opts.parallel,
opts.namespace,
opts.skip_delete,
opts.work_dir,
)
except TestRunnerException:
ret = 1
return ret
if __name__ == "__main__":
sys.exit(main(sys.argv))