|  | # SPDX-License-Identifier: GPL-2.0 | 
|  | # | 
|  | # Runs UML kernel, collects output, and handles errors. | 
|  | # | 
|  | # Copyright (C) 2019, Google LLC. | 
|  | # Author: Felix Guo <felixguoxiuping@gmail.com> | 
|  | # Author: Brendan Higgins <brendanhiggins@google.com> | 
|  |  | 
|  | import importlib.abc | 
|  | import importlib.util | 
|  | import logging | 
|  | import subprocess | 
|  | import os | 
|  | import shlex | 
|  | import shutil | 
|  | import signal | 
|  | import threading | 
|  | from typing import Iterator, List, Optional, Tuple | 
|  | from types import FrameType | 
|  |  | 
|  | import kunit_config | 
|  | import qemu_config | 
|  |  | 
|  | KCONFIG_PATH = '.config' | 
|  | KUNITCONFIG_PATH = '.kunitconfig' | 
|  | OLD_KUNITCONFIG_PATH = 'last_used_kunitconfig' | 
|  | DEFAULT_KUNITCONFIG_PATH = 'tools/testing/kunit/configs/default.config' | 
|  | ALL_TESTS_CONFIG_PATH = 'tools/testing/kunit/configs/all_tests.config' | 
|  | UML_KCONFIG_PATH = 'tools/testing/kunit/configs/arch_uml.config' | 
|  | OUTFILE_PATH = 'test.log' | 
|  | ABS_TOOL_PATH = os.path.abspath(os.path.dirname(__file__)) | 
|  | QEMU_CONFIGS_DIR = os.path.join(ABS_TOOL_PATH, 'qemu_configs') | 
|  |  | 
|  | class ConfigError(Exception): | 
|  | """Represents an error trying to configure the Linux kernel.""" | 
|  |  | 
|  |  | 
|  | class BuildError(Exception): | 
|  | """Represents an error trying to build the Linux kernel.""" | 
|  |  | 
|  |  | 
|  | class LinuxSourceTreeOperations: | 
|  | """An abstraction over command line operations performed on a source tree.""" | 
|  |  | 
|  | def __init__(self, linux_arch: str, cross_compile: Optional[str]): | 
|  | self._linux_arch = linux_arch | 
|  | self._cross_compile = cross_compile | 
|  |  | 
|  | def make_mrproper(self) -> None: | 
|  | try: | 
|  | subprocess.check_output(['make', 'mrproper'], stderr=subprocess.STDOUT) | 
|  | except OSError as e: | 
|  | raise ConfigError('Could not call make command: ' + str(e)) | 
|  | except subprocess.CalledProcessError as e: | 
|  | raise ConfigError(e.output.decode()) | 
|  |  | 
|  | def make_arch_config(self, base_kunitconfig: kunit_config.Kconfig) -> kunit_config.Kconfig: | 
|  | return base_kunitconfig | 
|  |  | 
|  | def make_olddefconfig(self, build_dir: str, make_options: Optional[List[str]]) -> None: | 
|  | command = ['make', 'ARCH=' + self._linux_arch, 'O=' + build_dir, 'olddefconfig'] | 
|  | if self._cross_compile: | 
|  | command += ['CROSS_COMPILE=' + self._cross_compile] | 
|  | if make_options: | 
|  | command.extend(make_options) | 
|  | print('Populating config with:\n$', ' '.join(command)) | 
|  | try: | 
|  | subprocess.check_output(command, stderr=subprocess.STDOUT) | 
|  | except OSError as e: | 
|  | raise ConfigError('Could not call make command: ' + str(e)) | 
|  | except subprocess.CalledProcessError as e: | 
|  | raise ConfigError(e.output.decode()) | 
|  |  | 
|  | def make(self, jobs: int, build_dir: str, make_options: Optional[List[str]]) -> None: | 
|  | command = ['make', 'ARCH=' + self._linux_arch, 'O=' + build_dir, '--jobs=' + str(jobs)] | 
|  | if make_options: | 
|  | command.extend(make_options) | 
|  | if self._cross_compile: | 
|  | command += ['CROSS_COMPILE=' + self._cross_compile] | 
|  | print('Building with:\n$', ' '.join(command)) | 
|  | try: | 
|  | proc = subprocess.Popen(command, | 
|  | stderr=subprocess.PIPE, | 
|  | stdout=subprocess.DEVNULL) | 
|  | except OSError as e: | 
|  | raise BuildError('Could not call execute make: ' + str(e)) | 
|  | except subprocess.CalledProcessError as e: | 
|  | raise BuildError(e.output) | 
|  | _, stderr = proc.communicate() | 
|  | if proc.returncode != 0: | 
|  | raise BuildError(stderr.decode()) | 
|  | if stderr:  # likely only due to build warnings | 
|  | print(stderr.decode()) | 
|  |  | 
|  | def start(self, params: List[str], build_dir: str) -> subprocess.Popen: | 
|  | raise RuntimeError('not implemented!') | 
|  |  | 
|  |  | 
|  | class LinuxSourceTreeOperationsQemu(LinuxSourceTreeOperations): | 
|  |  | 
|  | def __init__(self, qemu_arch_params: qemu_config.QemuArchParams, cross_compile: Optional[str]): | 
|  | super().__init__(linux_arch=qemu_arch_params.linux_arch, | 
|  | cross_compile=cross_compile) | 
|  | self._kconfig = qemu_arch_params.kconfig | 
|  | self._qemu_arch = qemu_arch_params.qemu_arch | 
|  | self._kernel_path = qemu_arch_params.kernel_path | 
|  | self._kernel_command_line = qemu_arch_params.kernel_command_line + ' kunit_shutdown=reboot' | 
|  | self._extra_qemu_params = qemu_arch_params.extra_qemu_params | 
|  | self._serial = qemu_arch_params.serial | 
|  |  | 
|  | def make_arch_config(self, base_kunitconfig: kunit_config.Kconfig) -> kunit_config.Kconfig: | 
|  | kconfig = kunit_config.parse_from_string(self._kconfig) | 
|  | kconfig.merge_in_entries(base_kunitconfig) | 
|  | return kconfig | 
|  |  | 
|  | def start(self, params: List[str], build_dir: str) -> subprocess.Popen: | 
|  | kernel_path = os.path.join(build_dir, self._kernel_path) | 
|  | qemu_command = ['qemu-system-' + self._qemu_arch, | 
|  | '-nodefaults', | 
|  | '-m', '1024', | 
|  | '-kernel', kernel_path, | 
|  | '-append', ' '.join(params + [self._kernel_command_line]), | 
|  | '-no-reboot', | 
|  | '-nographic', | 
|  | '-serial', self._serial] + self._extra_qemu_params | 
|  | # Note: shlex.join() does what we want, but requires python 3.8+. | 
|  | print('Running tests with:\n$', ' '.join(shlex.quote(arg) for arg in qemu_command)) | 
|  | return subprocess.Popen(qemu_command, | 
|  | stdin=subprocess.PIPE, | 
|  | stdout=subprocess.PIPE, | 
|  | stderr=subprocess.STDOUT, | 
|  | text=True, errors='backslashreplace') | 
|  |  | 
|  | class LinuxSourceTreeOperationsUml(LinuxSourceTreeOperations): | 
|  | """An abstraction over command line operations performed on a source tree.""" | 
|  |  | 
|  | def __init__(self, cross_compile: Optional[str]=None): | 
|  | super().__init__(linux_arch='um', cross_compile=cross_compile) | 
|  |  | 
|  | def make_arch_config(self, base_kunitconfig: kunit_config.Kconfig) -> kunit_config.Kconfig: | 
|  | kconfig = kunit_config.parse_file(UML_KCONFIG_PATH) | 
|  | kconfig.merge_in_entries(base_kunitconfig) | 
|  | return kconfig | 
|  |  | 
|  | def start(self, params: List[str], build_dir: str) -> subprocess.Popen: | 
|  | """Runs the Linux UML binary. Must be named 'linux'.""" | 
|  | linux_bin = os.path.join(build_dir, 'linux') | 
|  | params.extend(['mem=1G', 'console=tty', 'kunit_shutdown=halt']) | 
|  | return subprocess.Popen([linux_bin] + params, | 
|  | stdin=subprocess.PIPE, | 
|  | stdout=subprocess.PIPE, | 
|  | stderr=subprocess.STDOUT, | 
|  | text=True, errors='backslashreplace') | 
|  |  | 
|  | def get_kconfig_path(build_dir: str) -> str: | 
|  | return os.path.join(build_dir, KCONFIG_PATH) | 
|  |  | 
|  | def get_kunitconfig_path(build_dir: str) -> str: | 
|  | return os.path.join(build_dir, KUNITCONFIG_PATH) | 
|  |  | 
|  | def get_old_kunitconfig_path(build_dir: str) -> str: | 
|  | return os.path.join(build_dir, OLD_KUNITCONFIG_PATH) | 
|  |  | 
|  | def get_parsed_kunitconfig(build_dir: str, | 
|  | kunitconfig_paths: Optional[List[str]]=None) -> kunit_config.Kconfig: | 
|  | if not kunitconfig_paths: | 
|  | path = get_kunitconfig_path(build_dir) | 
|  | if not os.path.exists(path): | 
|  | shutil.copyfile(DEFAULT_KUNITCONFIG_PATH, path) | 
|  | return kunit_config.parse_file(path) | 
|  |  | 
|  | merged = kunit_config.Kconfig() | 
|  |  | 
|  | for path in kunitconfig_paths: | 
|  | if os.path.isdir(path): | 
|  | path = os.path.join(path, KUNITCONFIG_PATH) | 
|  | if not os.path.exists(path): | 
|  | raise ConfigError(f'Specified kunitconfig ({path}) does not exist') | 
|  |  | 
|  | partial = kunit_config.parse_file(path) | 
|  | diff = merged.conflicting_options(partial) | 
|  | if diff: | 
|  | diff_str = '\n\n'.join(f'{a}\n  vs from {path}\n{b}' for a, b in diff) | 
|  | raise ConfigError(f'Multiple values specified for {len(diff)} options in kunitconfig:\n{diff_str}') | 
|  | merged.merge_in_entries(partial) | 
|  | return merged | 
|  |  | 
|  | def get_outfile_path(build_dir: str) -> str: | 
|  | return os.path.join(build_dir, OUTFILE_PATH) | 
|  |  | 
|  | def _default_qemu_config_path(arch: str) -> str: | 
|  | config_path = os.path.join(QEMU_CONFIGS_DIR, arch + '.py') | 
|  | if os.path.isfile(config_path): | 
|  | return config_path | 
|  |  | 
|  | options = [f[:-3] for f in os.listdir(QEMU_CONFIGS_DIR) if f.endswith('.py')] | 
|  | raise ConfigError(arch + ' is not a valid arch, options are ' + str(sorted(options))) | 
|  |  | 
|  | def _get_qemu_ops(config_path: str, | 
|  | extra_qemu_args: Optional[List[str]], | 
|  | cross_compile: Optional[str]) -> Tuple[str, LinuxSourceTreeOperations]: | 
|  | # The module name/path has very little to do with where the actual file | 
|  | # exists (I learned this through experimentation and could not find it | 
|  | # anywhere in the Python documentation). | 
|  | # | 
|  | # Bascially, we completely ignore the actual file location of the config | 
|  | # we are loading and just tell Python that the module lives in the | 
|  | # QEMU_CONFIGS_DIR for import purposes regardless of where it actually | 
|  | # exists as a file. | 
|  | module_path = '.' + os.path.join(os.path.basename(QEMU_CONFIGS_DIR), os.path.basename(config_path)) | 
|  | spec = importlib.util.spec_from_file_location(module_path, config_path) | 
|  | assert spec is not None | 
|  | config = importlib.util.module_from_spec(spec) | 
|  | # See https://github.com/python/typeshed/pull/2626 for context. | 
|  | assert isinstance(spec.loader, importlib.abc.Loader) | 
|  | spec.loader.exec_module(config) | 
|  |  | 
|  | if not hasattr(config, 'QEMU_ARCH'): | 
|  | raise ValueError('qemu_config module missing "QEMU_ARCH": ' + config_path) | 
|  | params: qemu_config.QemuArchParams = config.QEMU_ARCH | 
|  | if extra_qemu_args: | 
|  | params.extra_qemu_params.extend(extra_qemu_args) | 
|  | return params.linux_arch, LinuxSourceTreeOperationsQemu( | 
|  | params, cross_compile=cross_compile) | 
|  |  | 
|  | class LinuxSourceTree: | 
|  | """Represents a Linux kernel source tree with KUnit tests.""" | 
|  |  | 
|  | def __init__( | 
|  | self, | 
|  | build_dir: str, | 
|  | kunitconfig_paths: Optional[List[str]]=None, | 
|  | kconfig_add: Optional[List[str]]=None, | 
|  | arch: Optional[str]=None, | 
|  | cross_compile: Optional[str]=None, | 
|  | qemu_config_path: Optional[str]=None, | 
|  | extra_qemu_args: Optional[List[str]]=None) -> None: | 
|  | signal.signal(signal.SIGINT, self.signal_handler) | 
|  | if qemu_config_path: | 
|  | self._arch, self._ops = _get_qemu_ops(qemu_config_path, extra_qemu_args, cross_compile) | 
|  | else: | 
|  | self._arch = 'um' if arch is None else arch | 
|  | if self._arch == 'um': | 
|  | self._ops = LinuxSourceTreeOperationsUml(cross_compile=cross_compile) | 
|  | else: | 
|  | qemu_config_path = _default_qemu_config_path(self._arch) | 
|  | _, self._ops = _get_qemu_ops(qemu_config_path, extra_qemu_args, cross_compile) | 
|  |  | 
|  | self._kconfig = get_parsed_kunitconfig(build_dir, kunitconfig_paths) | 
|  | if kconfig_add: | 
|  | kconfig = kunit_config.parse_from_string('\n'.join(kconfig_add)) | 
|  | self._kconfig.merge_in_entries(kconfig) | 
|  |  | 
|  | def arch(self) -> str: | 
|  | return self._arch | 
|  |  | 
|  | def clean(self) -> bool: | 
|  | try: | 
|  | self._ops.make_mrproper() | 
|  | except ConfigError as e: | 
|  | logging.error(e) | 
|  | return False | 
|  | return True | 
|  |  | 
|  | def validate_config(self, build_dir: str) -> bool: | 
|  | kconfig_path = get_kconfig_path(build_dir) | 
|  | validated_kconfig = kunit_config.parse_file(kconfig_path) | 
|  | if self._kconfig.is_subset_of(validated_kconfig): | 
|  | return True | 
|  | missing = set(self._kconfig.as_entries()) - set(validated_kconfig.as_entries()) | 
|  | message = 'Not all Kconfig options selected in kunitconfig were in the generated .config.\n' \ | 
|  | 'This is probably due to unsatisfied dependencies.\n' \ | 
|  | 'Missing: ' + ', '.join(str(e) for e in missing) | 
|  | if self._arch == 'um': | 
|  | message += '\nNote: many Kconfig options aren\'t available on UML. You can try running ' \ | 
|  | 'on a different architecture with something like "--arch=x86_64".' | 
|  | logging.error(message) | 
|  | return False | 
|  |  | 
|  | def build_config(self, build_dir: str, make_options: Optional[List[str]]) -> bool: | 
|  | kconfig_path = get_kconfig_path(build_dir) | 
|  | if build_dir and not os.path.exists(build_dir): | 
|  | os.mkdir(build_dir) | 
|  | try: | 
|  | self._kconfig = self._ops.make_arch_config(self._kconfig) | 
|  | self._kconfig.write_to_file(kconfig_path) | 
|  | self._ops.make_olddefconfig(build_dir, make_options) | 
|  | except ConfigError as e: | 
|  | logging.error(e) | 
|  | return False | 
|  | if not self.validate_config(build_dir): | 
|  | return False | 
|  |  | 
|  | old_path = get_old_kunitconfig_path(build_dir) | 
|  | if os.path.exists(old_path): | 
|  | os.remove(old_path)  # write_to_file appends to the file | 
|  | self._kconfig.write_to_file(old_path) | 
|  | return True | 
|  |  | 
|  | def _kunitconfig_changed(self, build_dir: str) -> bool: | 
|  | old_path = get_old_kunitconfig_path(build_dir) | 
|  | if not os.path.exists(old_path): | 
|  | return True | 
|  |  | 
|  | old_kconfig = kunit_config.parse_file(old_path) | 
|  | return old_kconfig != self._kconfig | 
|  |  | 
|  | def build_reconfig(self, build_dir: str, make_options: Optional[List[str]]) -> bool: | 
|  | """Creates a new .config if it is not a subset of the .kunitconfig.""" | 
|  | kconfig_path = get_kconfig_path(build_dir) | 
|  | if not os.path.exists(kconfig_path): | 
|  | print('Generating .config ...') | 
|  | return self.build_config(build_dir, make_options) | 
|  |  | 
|  | existing_kconfig = kunit_config.parse_file(kconfig_path) | 
|  | self._kconfig = self._ops.make_arch_config(self._kconfig) | 
|  |  | 
|  | if self._kconfig.is_subset_of(existing_kconfig) and not self._kunitconfig_changed(build_dir): | 
|  | return True | 
|  | print('Regenerating .config ...') | 
|  | os.remove(kconfig_path) | 
|  | return self.build_config(build_dir, make_options) | 
|  |  | 
|  | def build_kernel(self, jobs: int, build_dir: str, make_options: Optional[List[str]]) -> bool: | 
|  | try: | 
|  | self._ops.make_olddefconfig(build_dir, make_options) | 
|  | self._ops.make(jobs, build_dir, make_options) | 
|  | except (ConfigError, BuildError) as e: | 
|  | logging.error(e) | 
|  | return False | 
|  | return self.validate_config(build_dir) | 
|  |  | 
|  | def run_kernel(self, args: Optional[List[str]]=None, build_dir: str='', filter_glob: str='', timeout: Optional[int]=None) -> Iterator[str]: | 
|  | if not args: | 
|  | args = [] | 
|  | if filter_glob: | 
|  | args.append('kunit.filter_glob='+filter_glob) | 
|  | args.append('kunit.enable=1') | 
|  |  | 
|  | process = self._ops.start(args, build_dir) | 
|  | assert process.stdout is not None  # tell mypy it's set | 
|  |  | 
|  | # Enforce the timeout in a background thread. | 
|  | def _wait_proc() -> None: | 
|  | try: | 
|  | process.wait(timeout=timeout) | 
|  | except Exception as e: | 
|  | print(e) | 
|  | process.terminate() | 
|  | process.wait() | 
|  | waiter = threading.Thread(target=_wait_proc) | 
|  | waiter.start() | 
|  |  | 
|  | output = open(get_outfile_path(build_dir), 'w') | 
|  | try: | 
|  | # Tee the output to the file and to our caller in real time. | 
|  | for line in process.stdout: | 
|  | output.write(line) | 
|  | yield line | 
|  | # This runs even if our caller doesn't consume every line. | 
|  | finally: | 
|  | # Flush any leftover output to the file | 
|  | output.write(process.stdout.read()) | 
|  | output.close() | 
|  | process.stdout.close() | 
|  |  | 
|  | waiter.join() | 
|  | subprocess.call(['stty', 'sane']) | 
|  |  | 
|  | def signal_handler(self, unused_sig: int, unused_frame: Optional[FrameType]) -> None: | 
|  | logging.error('Build interruption occurred. Cleaning console.') | 
|  | subprocess.call(['stty', 'sane']) |