-
-
Notifications
You must be signed in to change notification settings - Fork 4
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
mypy fixes and always import fs image
- Loading branch information
Showing
151 changed files
with
6,018 additions
and
5,338 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,15 +1,39 @@ | ||
# -*- coding: utf-8 -*- | ||
"""The Context the generation will run in. | ||
"""An object used to find/get host system binaries. | ||
@author: Tobias Hunger <[email protected]> | ||
""" | ||
|
||
|
||
from .context import Binaries, Context | ||
from ..exceptions import PreflightError | ||
from ..printer import (debug, error, info, h2, warn,) | ||
from .printer import fail, info, trace, warn | ||
|
||
from enum import Enum, auto, unique | ||
import os | ||
import typing | ||
|
||
|
||
@unique | ||
class Binaries(Enum): | ||
"""Important binaries.""" | ||
|
||
BORG = auto() | ||
BTRFS = auto() | ||
PACMAN = auto() | ||
PACMAN_KEY = auto() | ||
PACSTRAP = auto() | ||
APT_GET = auto() | ||
DPKG = auto() | ||
DEBOOTSTRAP = auto() | ||
SBSIGN = auto() | ||
OBJCOPY = auto() | ||
MKSQUASHFS = auto() | ||
VERITYSETUP = auto() | ||
TAR = auto() | ||
USERMOD = auto() | ||
USERADD = auto() | ||
GROUPADD = auto() | ||
GROUPMOD = auto() | ||
CHROOT_HELPER = auto() | ||
|
||
|
||
def _check_for_binary(binary: str) -> str: | ||
|
@@ -18,15 +42,15 @@ def _check_for_binary(binary: str) -> str: | |
|
||
|
||
def _get_distribution(): | ||
with open("/usr/lib/os-release") as os: | ||
for line in os.readlines(): | ||
with open("/usr/lib/os-release") as os_release: | ||
for line in os_release.readlines(): | ||
line = line.strip() | ||
if line.startswith('ID_LIKE='): | ||
return line[8:].strip('"') | ||
return "<UNSUPPORTED>" | ||
|
||
|
||
def _find_binaries(ctx: Context) -> None: | ||
def _find_binaries() -> typing.Dict[Binaries, str]: | ||
binaries = { | ||
Binaries.BORG: _check_for_binary('/usr/bin/borg'), | ||
Binaries.BTRFS: _check_for_binary('/usr/bin/btrfs'), | ||
|
@@ -41,57 +65,50 @@ def _find_binaries(ctx: Context) -> None: | |
# in arch-install-scripts in ubuntu:-) | ||
Binaries.CHROOT_HELPER: _check_for_binary('/usr/bin/arch-chroot'), | ||
} | ||
os_binaries = {} | ||
os_binaries: typing.Dict[Binaries, str] = {} | ||
distribution = _get_distribution() | ||
if (distribution == "debian"): | ||
if distribution == "debian": | ||
os_binaries = { | ||
Binaries.APT_GET: _check_for_binary('/usr/bin/apt-get'), | ||
Binaries.DPKG: _check_for_binary('/usr/bin/dpkg'), | ||
Binaries.DEBOOTSTRAP: _check_for_binary('/usr/sbin/debootstrap'), | ||
Binaries.VERITYSETUP: _check_for_binary('/usr/sbin/veritysetup'), | ||
} | ||
elif (distribution == "archlinux"): | ||
elif distribution == "archlinux": | ||
os_binaries = { | ||
Binaries.PACMAN: _check_for_binary('/usr/bin/pacman'), | ||
Binaries.PACMAN_KEY: _check_for_binary('/usr/bin/pacman-key'), | ||
Binaries.PACSTRAP: _check_for_binary('/usr/bin/pacstrap'), | ||
Binaries.VERITYSETUP: _check_for_binary('/usr/bin/veritysetup'), | ||
} | ||
else: | ||
error("Unsupported Linux flavor.") | ||
fail("Unsupported Linux flavor.") | ||
|
||
ctx.set_binaries({**binaries, **os_binaries}) | ||
return {**binaries, **os_binaries} | ||
|
||
|
||
def preflight_check(ctx: Context) -> None: | ||
"""Run a fast pre-flight check on the context.""" | ||
h2('Running Preflight Checks.', verbosity=2) | ||
class BinaryManager: | ||
"""The find and allow access to all the different system binaries | ||
Cleanroom may need.""" | ||
|
||
_find_binaries(ctx) | ||
def __init__(self) -> None: | ||
"""Constructor.""" | ||
self._binaries = _find_binaries() | ||
|
||
binaries = _preflight_binaries_check(ctx) | ||
users = _preflight_users_check(ctx) | ||
def preflight_check(self) -> bool: | ||
passed = True | ||
for b in self._binaries.items(): | ||
if b[1]: | ||
info('{} found: {}...'.format(b[0], b[1])) | ||
else: | ||
warn('{} not found.'.format(b[0])) | ||
passed = False | ||
return passed | ||
|
||
if not binaries or not users: | ||
raise PreflightError('Preflight Check failed.') | ||
def binary(self, selector: Binaries) -> typing.Optional[str]: | ||
"""Get a binary from the context.""" | ||
assert self._binaries | ||
|
||
|
||
def _preflight_binaries_check(ctx: Context) -> bool: | ||
"""Check executables.""" | ||
passed = True | ||
for b in ctx._binaries.items(): | ||
if b[1]: | ||
info('{} found: {}...'.format(b[0], b[1])) | ||
else: | ||
warn('{} not found.'.format(b[0])) | ||
passed = False | ||
return passed | ||
|
||
|
||
def _preflight_users_check(ctx: Context) -> bool: | ||
"""Check tha the script is running as root.""" | ||
if os.geteuid() == 0: | ||
debug('Running as root.') | ||
return True | ||
warn('Not running as root.') | ||
return False | ||
binary = self._binaries[selector] | ||
trace('Binary for {}: {}.'.format(selector, binary)) | ||
return binary |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,176 @@ | ||
# -*- coding: utf-8 -*- | ||
"""Base class for print_commands usable in the system definition files. | ||
The Command class will be used to derive all system definition file print_commands | ||
from. | ||
@author: Tobias Hunger <[email protected]> | ||
""" | ||
|
||
|
||
from __future__ import annotations | ||
|
||
from .exceptions import GenerateError, ParseError | ||
from .location import Location | ||
from .printer import trace, fail, success | ||
from .systemcontext import SystemContext | ||
|
||
import os | ||
import os.path | ||
import typing | ||
|
||
|
||
ServicesType = typing.Mapping[str, typing.Any] | ||
|
||
|
||
def _stringify(command: str, *args, **kwargs): | ||
args_str = ' "' + '" "'.join(args) + '"' if args else '' | ||
kwargs_str = ' '.join(map(lambda kv: kv[0] + '="' + str(kv[1]) + '"', | ||
kwargs.items())) if kwargs else '' | ||
return '"{}"'.format(command) + args_str + kwargs_str | ||
|
||
|
||
class Command: | ||
"""A command that can be used in to set up a machines.""" | ||
|
||
def __init__(self, name: str, *, file: str, | ||
syntax: str = '', help_string: str, | ||
services: ServicesType) \ | ||
-> None: | ||
"""Constructor.""" | ||
self._name = name | ||
self._syntax_string = syntax | ||
self._help_string = help_string | ||
helper_directory = os.path.join(os.path.dirname(os.path.realpath(file)), | ||
'helper', self._name) | ||
self.__helper_directory = helper_directory if os.path.isdir(helper_directory) else None | ||
self._services = services | ||
|
||
@property | ||
def syntax_string(self) -> str: | ||
"""Return syntax description.""" | ||
if self._syntax_string: | ||
return '{} {}'.format(self._name, self._syntax_string) | ||
return self._name | ||
|
||
@property | ||
def name(self) -> str: | ||
return self._name | ||
|
||
@property | ||
def help_string(self) -> str: | ||
"""Print help string.""" | ||
return self._help_string | ||
|
||
def validate(self, location: Location, | ||
*args: typing.Any, **kwargs: typing.Any) -> None: | ||
"""Implement this! | ||
Validate all arguments. | ||
""" | ||
fail('Command "{}"" called validate_arguments illegally!' | ||
.format(self.name)) | ||
return None | ||
|
||
def dependency(self, *args: typing.Any, **kwargs: typing.Any) \ | ||
-> typing.Optional[str]: | ||
"""Maybe implement this, but this default implementation should be ok.""" | ||
return None | ||
|
||
def execute(self, location: Location, system_context: SystemContext, | ||
*args: typing.Any, **kwargs: typing.Any) -> None: | ||
command_str = _stringify(self.name, *args, **kwargs) | ||
trace('{}: Execute {}.'.format(location, self.name, command_str)) | ||
self(location, system_context, *args, **kwargs) | ||
success('{}: Executed {}{}'.format(location, self.name, command_str)) | ||
|
||
def __call__(self, location: Location, system_context: SystemContext, | ||
*args: typing.Any, **kwargs: typing.Any) -> None: | ||
"""Implement this!""" | ||
|
||
def _execute(self, location: Location, system_context: SystemContext, | ||
command: str, *args: typing.Any, **kwargs: typing.Any) -> None: | ||
command_str = _stringify(command, *args, **kwargs) | ||
trace('{}: Execute {}.'.format(location, self.name, command_str)) | ||
|
||
command_info = self.service('command_manager').command(command) | ||
if not command_info: | ||
raise GenerateError('Command "{}" not found.'.format(command)) | ||
command_info.validate_func(location, *args, **kwargs) | ||
command_info.execute_func(location, system_context, *args, **kwargs) | ||
success('{}: Executed {}{}'.format(location, self.name, command_str)) | ||
|
||
def service(self, service_name: str) -> typing.Any: | ||
return self._services.get(service_name, None) | ||
|
||
@property | ||
def _helper_directory(self) -> typing.Optional[str]: | ||
"""Return the helper directory.""" | ||
return self.__helper_directory | ||
|
||
def _validate_no_arguments(self, location: Location, | ||
*args: typing.Any, **kwargs: typing.Any) -> None: | ||
self._validate_no_args(location, *args) | ||
self._validate_kwargs(location, (), **kwargs) | ||
|
||
def _validate_arguments_exact(self, location: Location, arg_count: int, message: str, | ||
*args: typing.Any, **kwargs: typing.Any) -> None: | ||
self._validate_args_exact(location, arg_count, message, *args) | ||
self._validate_kwargs(location, (), **kwargs) | ||
|
||
def _validate_arguments_at_least(self, location: Location, arg_count: int, message: str, | ||
*args: typing.Any, **kwargs: typing.Any) -> None: | ||
self._validate_args_at_least(location, arg_count, message, *args) | ||
self._validate_kwargs(location, (), **kwargs) | ||
|
||
def _validate_no_args(self, location: Location, *args: typing.Any) -> None: | ||
if args is list: | ||
trace('Validating arguments: "{}".'.format('", "'.join(str(args)))) | ||
else: | ||
trace('Validating argument: "{}".'.format(args)) | ||
self._validate_args_exact(location, 0, | ||
'"{}" does not take arguments.', *args) | ||
|
||
def _validate_args_exact(self, location: Location, arg_count: int, | ||
message: str, *args: typing.Any) -> None: | ||
if args is list: | ||
trace('Validating arguments: "{}".'.format('", "'.join(str(args)))) | ||
else: | ||
trace('Validating argument: "{}".'.format(args)) | ||
if len(args) != arg_count: | ||
raise ParseError(message.format(self.name), location=location) | ||
|
||
def _validate_args_at_least(self, location: Location, arg_count: int, | ||
message: str, *args: typing.Any) -> None: | ||
if args is list: | ||
trace('Validating arguments: "{}".'.format('", "'.join(str(args)))) | ||
else: | ||
trace('Validating argument: "{}".'.format(args)) | ||
if len(args) < arg_count: | ||
raise ParseError(message.format(self.name), location=location) | ||
|
||
def _validate_kwargs(self, location: Location, known_kwargs: typing.Tuple[str, ...], | ||
**kwargs: typing.Any) -> None: | ||
trace('Validating keyword arguments: "{}"' | ||
.format('", "'.join(['{}={}'.format(k, str(kwargs[k])) | ||
for k in kwargs.keys()]))) | ||
if not known_kwargs: | ||
if kwargs: | ||
raise ParseError('"{}" does not accept keyword arguments.' | ||
.format(self.name), location=location) | ||
else: | ||
for key, value in kwargs.items(): | ||
if key not in known_kwargs: | ||
raise ParseError('"{}" does not accept the keyword ' | ||
'arguments "{}".' | ||
.format(self.name, key), | ||
location=location) | ||
|
||
def _require_kwargs(self, location: Location, required_kwargs: typing.Tuple[str, ...], | ||
**kwargs: typing.Any) -> None: | ||
for key in required_kwargs: | ||
if key not in kwargs: | ||
raise ParseError('"{}" requires the keyword ' | ||
'arguments "{}" to be passed.' | ||
.format(self.name, key), | ||
location=location) |
Oops, something went wrong.