# Copyright Spack Project Developers. See COPYRIGHT file for details.
#
# SPDX-License-Identifier: (Apache-2.0 OR MIT)
"""New installer that will ultimately replace installer.py. It features an event loop, non-blocking
I/O, and a POSIX jobserver to limit concurrency. It also has a more advanced terminal UI. It's
mostly self-contained to avoid interfering with the rest of Spack too much while it's being
developed and tested.
The installer consists of a UI process that manages multiple build processes and handles updates
to the database. It detects or creates a jobserver, and then kicks off an event loop in which it
runs through a build queue, always running at least one build. Concurrent builds run as jobserver
tokens are obtained. This means only one -j flag is needed to control concurrency.
The UI process has two modes: an overview mode where it shows the status of all builds, and a
mode where it follows the logs of a specific build. It listens to keyboard input to switch between
modes.
The build process does an ordinary install, but also spawns a "tee" thread that forwards its build
output to both a log file and the UI process (if the UI process has requested it). This thread also
runs an event loop to listen for control messages from the UI process (to enable/disable echoing
of logs), and for output from the build process."""
import codecs
import fcntl
import glob
import io
import json
import multiprocessing
import os
import re
import selectors
import shlex
import shutil
import signal
import sys
import tempfile
import termios
import threading
import time
import traceback
import tty
import warnings
from gzip import GzipFile
from multiprocessing import Pipe, Process
from multiprocessing.connection import Connection
from typing import (
TYPE_CHECKING,
Callable,
Dict,
FrozenSet,
Generator,
List,
NamedTuple,
Optional,
Set,
Tuple,
Union,
)
from spack.vendor.typing_extensions import Literal
import spack.binary_distribution
import spack.build_environment
import spack.builder
import spack.config
import spack.database
import spack.deptypes as dt
import spack.error
import spack.hooks
import spack.llnl.util.filesystem as fs
import spack.llnl.util.tty
import spack.llnl.util.tty.color
import spack.mirrors.mirror
import spack.paths
import spack.report
import spack.sandbox
import spack.spec
import spack.stage
import spack.store
import spack.subprocess_context
import spack.traverse
import spack.url_buildcache
import spack.util.environment
import spack.util.lock
from spack.installer import _do_fake_install, dump_packages
from spack.llnl.util.lang import pretty_duration
from spack.llnl.util.tty.log import _is_background_tty, ignore_signal
from spack.util.executable import ProcessError
from spack.util.log_parse import make_log_context, parse_log_events
from spack.util.path import padding_filter, padding_filter_bytes
if TYPE_CHECKING:
import spack.package_base
#: Type for specifying installation source modes
InstallPolicy = Literal["auto", "cache_only", "source_only"]
#: How often to update a spinner in seconds
SPINNER_INTERVAL = 0.1
#: How often to wake up in headless mode to check for background->foreground transition (seconds)
HEADLESS_WAKE_INTERVAL = 1.0
#: How long to display finished packages before graying them out
CLEANUP_TIMEOUT = 2.0
#: How often to flush completed builds to the database
DATABASE_WRITE_INTERVAL = 5.0
#: Size of the output buffer for child processes
OUTPUT_BUFFER_SIZE = 32768
#: Suffix for temporary backup during overwrite install
OVERWRITE_BACKUP_SUFFIX = ".old"
#: Suffix for temporary cleanup during failed install
OVERWRITE_GARBAGE_SUFFIX = ".garbage"
[docs]
class ExitCode:
SUCCESS = 0
BUILD_ERROR = 1
#: Exit code used by the child process to signal that the build was stopped at a phase boundary
STOPPED_AT_PHASE = 3
#: Exit code used by the child process to signal a binary cache miss (no source fallback)
BUILD_CACHE_MISS = 4
[docs]
class DatabaseAction:
"""Base class for objects that need to be persisted to the database."""
__slots__ = ("spec", "prefix_lock")
spec: "spack.spec.Spec"
prefix_lock: Optional[spack.util.lock.Lock]
[docs]
def save_to_db(self, db: spack.database.Database) -> None: ...
[docs]
def release_prefix_lock(self) -> None:
if self.prefix_lock is not None:
try:
self.prefix_lock.release_write()
except Exception:
pass
self.prefix_lock = None
[docs]
class MarkExplicitAction(DatabaseAction):
"""Action to mark an already installed spec as explicitly installed. Similar to ChildInfo, but
used when no build process was needed."""
__slots__ = ()
def __init__(self, spec: "spack.spec.Spec") -> None:
self.spec = spec
self.prefix_lock = None
[docs]
def save_to_db(self, db: spack.database.Database) -> None:
db._mark(self.spec, "explicit", True)
[docs]
class ChildInfo(DatabaseAction):
"""Information about a child process."""
__slots__ = ("proc", "output_r_conn", "state_r_conn", "control_w_conn", "explicit", "log_path")
def __init__(
self,
proc: Process,
spec: spack.spec.Spec,
output_r_conn: Connection,
state_r_conn: Connection,
control_w_conn: Connection,
log_path: str,
explicit: bool = False,
) -> None:
self.proc = proc
self.spec = spec
self.output_r_conn = output_r_conn
self.state_r_conn = state_r_conn
self.control_w_conn = control_w_conn
self.log_path = log_path
self.explicit = explicit
self.prefix_lock: Optional[spack.util.lock.Lock] = None
[docs]
def save_to_db(self, db: spack.database.Database) -> None:
return db._add(self.spec, explicit=self.explicit)
[docs]
def close(self, selector: selectors.BaseSelector) -> int:
"""Unregister and close file descriptors, and join the child process.
Returns the exit code of the child process."""
try:
selector.unregister(self.output_r_conn.fileno())
except KeyError:
pass
try:
selector.unregister(self.state_r_conn.fileno())
except KeyError:
pass
try:
selector.unregister(self.proc.sentinel)
except (KeyError, ValueError):
pass
self.output_r_conn.close()
self.state_r_conn.close()
self.control_w_conn.close()
self.proc.join()
exit_code = self.proc.exitcode
assert exit_code is not None, "Finished build should have exit code set"
if hasattr(self.proc, "close"): # No known equivalent in Python 3.6
self.proc.close()
return exit_code
[docs]
def send_state(state: str, state_pipe: io.TextIOWrapper) -> None:
"""Send a state update message."""
json.dump({"state": state}, state_pipe, separators=(",", ":"))
state_pipe.write("\n")
[docs]
def send_progress(current: int, total: int, state_pipe: io.TextIOWrapper) -> None:
"""Send a progress update message."""
json.dump({"progress": current, "total": total}, state_pipe, separators=(",", ":"))
state_pipe.write("\n")
[docs]
def send_installed_from_binary_cache(state_pipe: io.TextIOWrapper) -> None:
"""Send a notification that the package was installed from binary cache."""
json.dump({"installed_from_binary_cache": True}, state_pipe, separators=(",", ":"))
state_pipe.write("\n")
[docs]
def tee(control_r: int, log_r: int, log_file: io.BufferedWriter, parent_w: int) -> None:
"""Forward log_r to file_w and parent_w (if echoing is enabled).
Echoing is enabled and disabled by reading from control_r."""
echo_on = False
selector = selectors.DefaultSelector()
selector.register(log_r, selectors.EVENT_READ)
selector.register(control_r, selectors.EVENT_READ)
try:
with log_file, open(parent_w, "wb", closefd=False) as parent:
while True:
for key, _ in selector.select():
if key.fd == log_r:
data = os.read(log_r, OUTPUT_BUFFER_SIZE)
if not data: # EOF: exit the thread
return
log_file.write(data)
log_file.flush()
if echo_on:
parent.write(data)
parent.flush()
elif key.fd == control_r:
control_data = os.read(control_r, 1)
if not control_data:
return
else:
echo_on = control_data == b"1"
except OSError: # do not raise
pass
finally:
os.close(log_r)
[docs]
class Tee:
"""Emulates ./build 2>&1 | tee build.log. The output is sent both to a log file and the parent
process (if echoing is enabled). The control_fd is used to enable/disable echoing."""
def __init__(self, control: Connection, parent: Connection, log_path: str) -> None:
self.control = control
self.parent = parent
# sys.stdout and sys.stderr may have been replaced with file objects under pytest, so
# redirect their file descriptors in addition to the original fds 1 and 2.
fds = {sys.stdout.fileno(), sys.stderr.fileno(), 1, 2}
self.saved_fds = {fd: os.dup(fd) for fd in fds}
#: The path of the log file
self.log_path = log_path
log_file = open(self.log_path, "ab")
r, w = os.pipe()
self.tee_thread = threading.Thread(
target=tee,
args=(self.control.fileno(), r, log_file, self.parent.fileno()),
daemon=True,
)
self.tee_thread.start()
for fd in fds:
os.dup2(w, fd)
os.close(w)
[docs]
def close(self) -> None:
# Closing stdout and stderr should close the last reference to the write end of the pipe,
# causing the tee thread to wake up, flush the last data, and exit. We restore stdout and
# stderr, because between sys.exit and the actual process exit buffers may be flushed, and
# can cause exit code 120 (witnessed under pytest+coverage on macOS).
sys.stdout.flush()
sys.stderr.flush()
for fd, saved_fd in self.saved_fds.items():
os.dup2(saved_fd, fd)
os.close(saved_fd)
self.tee_thread.join()
# Only then close the other fds.
self.control.close()
self.parent.close()
[docs]
def install_from_buildcache(
mirrors: List[spack.url_buildcache.MirrorMetadata],
spec: spack.spec.Spec,
unsigned: Optional[bool],
state_stream: io.TextIOWrapper,
) -> bool:
send_state("fetching from build cache", state_stream)
try:
tarball_stage = spack.binary_distribution.download_tarball(
spec.build_spec, unsigned, mirrors
)
except spack.binary_distribution.NoConfiguredBinaryMirrors:
return False
if tarball_stage is None:
return False
send_state("relocating", state_stream)
spack.binary_distribution.extract_tarball(spec, tarball_stage, force=False)
if spec.spliced: # overwrite old metadata with new
spack.store.STORE.layout.write_spec(spec, spack.store.STORE.layout.spec_file_path(spec))
# now a block of curious things follow that should be fixed.
pkg = spec.package
if hasattr(pkg, "_post_buildcache_install_hook"):
pkg._post_buildcache_install_hook()
pkg.installed_from_binary_cache = True
# inform also the parent that this package was installed from binary cache.
send_installed_from_binary_cache(state_stream)
return True
[docs]
class GlobalState:
"""Global state needed in a build subprocess. This is similar to spack.subprocess_context,
but excludes the Spack environment, which is slow to serialize and should not be needed
during the build."""
__slots__ = ("store", "config", "monkey_patches", "spack_working_dir", "repo_cache")
def __init__(self):
if multiprocessing.get_start_method() == "fork":
return
self.config = spack.config.CONFIG.ensure_unwrapped()
self.store = spack.store.STORE
self.monkey_patches = spack.subprocess_context.TestPatches.create()
self.spack_working_dir = spack.paths.spack_working_dir
[docs]
def restore(self):
if multiprocessing.get_start_method() == "fork":
# In the forking case we must erase SSL contexts.
from spack.oci import opener
from spack.util import web
from spack.util.s3 import s3_client_cache
web.urlopen._instance = None
opener.urlopen._instance = None
s3_client_cache.clear()
return
spack.store.STORE = self.store
spack.config.CONFIG = self.config
self.monkey_patches.restore()
spack.paths.spack_working_dir = self.spack_working_dir
[docs]
class PrefixPivoter:
"""Manages the installation prefix of a build."""
def __init__(self, prefix: str, keep_prefix: bool = False) -> None:
"""Initialize the prefix pivoter.
Args:
prefix: The installation prefix path
keep_prefix: Whether to keep a failed installation prefix
"""
self.prefix = prefix
#: Whether to keep a failed installation prefix
self.keep_prefix = keep_prefix
#: Temporary location for the original prefix
self.tmp_prefix: Optional[str] = None
self.parent = os.path.dirname(prefix)
def __enter__(self) -> "PrefixPivoter":
"""Enter the context: move existing prefix to temporary location if needed."""
if not self._lexists(self.prefix):
return self
# Move the existing prefix to a temporary location so the build starts fresh
self.tmp_prefix = self._mkdtemp(
dir=self.parent, prefix=".", suffix=OVERWRITE_BACKUP_SUFFIX
)
self._rename(self.prefix, self.tmp_prefix)
return self
def __exit__(
self, exc_type: Optional[type], exc_val: Optional[BaseException], exc_tb: Optional[object]
) -> None:
"""Exit the context: cleanup on success, restore on failure."""
if exc_type is None:
# Success: remove the backup
if self.tmp_prefix is not None:
self._rmtree_ignore_errors(self.tmp_prefix)
return
# Failure handling:
if self.keep_prefix and not issubclass(exc_type, BinaryCacheMiss):
# Leave the failed prefix in place, discard the backup. Except for binary cache misses,
# which is a scheduling failure and not a build failure.
if self.tmp_prefix is not None:
self._rmtree_ignore_errors(self.tmp_prefix)
elif self.tmp_prefix is not None:
# There was a pre-existing prefix: pivot back to it and discard the failed build
garbage = self._mkdtemp(dir=self.parent, prefix=".", suffix=OVERWRITE_GARBAGE_SUFFIX)
try:
self._rename(self.prefix, garbage)
has_failed_prefix = True
except FileNotFoundError: # build never created the prefix dir
has_failed_prefix = False
self._rename(self.tmp_prefix, self.prefix)
if has_failed_prefix:
self._rmtree_ignore_errors(garbage)
elif self._lexists(self.prefix):
# No backup, just remove the failed installation
garbage = self._mkdtemp(dir=self.parent, prefix=".", suffix=OVERWRITE_GARBAGE_SUFFIX)
self._rename(self.prefix, garbage)
self._rmtree_ignore_errors(garbage)
def _lexists(self, path: str) -> bool:
return os.path.lexists(path)
def _rename(self, src: str, dst: str) -> None:
os.rename(src, dst)
def _mkdtemp(self, dir: str, prefix: str, suffix: str) -> str:
return tempfile.mkdtemp(dir=dir, prefix=prefix, suffix=suffix)
def _rmtree_ignore_errors(self, path: str) -> None:
shutil.rmtree(path, ignore_errors=True)
[docs]
def worker_function(
spec: spack.spec.Spec,
explicit: bool,
mirrors: List[spack.url_buildcache.MirrorMetadata],
unsigned: Optional[bool],
install_policy: InstallPolicy,
dirty: bool,
keep_stage: bool,
restage: bool,
keep_prefix: bool,
skip_patch: bool,
fake: bool,
install_source: bool,
run_tests: bool,
state: Connection,
parent: Connection,
echo_control: Connection,
makeflags: str,
js1: Optional[Connection],
js2: Optional[Connection],
log_path: str,
global_state: GlobalState,
stop_before: Optional[str] = None,
stop_at: Optional[str] = None,
):
"""
Function run in the build child process. Installs the specified spec, sending state updates
and build output back to the parent process.
Args:
spec: Spec to install
explicit: Whether the spec was explicitly requested by the user
mirrors: List of buildcache mirrors to try
unsigned: Whether to allow unsigned buildcache entries
install_policy: ``"auto"``, ``"cache_only"``, or ``"source_only"``
dirty: Whether to preserve user environment in the build environment
keep_stage: Whether to keep the build stage after installation
restage: Whether to restage the source before building
keep_prefix: Whether to keep a failed installation prefix
skip_patch: Whether to skip the patch phase
run_tests: Whether to run install-time tests for this package
state: Connection to send state updates to
parent: Connection to send build output to
echo_control: Connection to receive echo control messages from
makeflags: MAKEFLAGS to set, so that the build process uses the POSIX jobserver
js1: Connection for old style jobserver read fd (if any). Unused, just to inherit fd.
js2: Connection for old style jobserver write fd (if any). Unused, just to inherit fd.
log_path: Path to the log file to write build output to
global_state: Global state to restore
"""
# TODO: don't start a build for external packages
if spec.external:
return
global_state.restore()
# Isolate the process group to shield against Ctrl+C and enable safe killpg() cleanup. In
# constrast to setsid(), this keeps a neat process group hierarchy for utils like pstree.
os.setpgid(0, 0)
# Reset SIGTSTP to default in case the parent had a custom handler.
signal.signal(signal.SIGTSTP, signal.SIG_DFL)
def handle_sigterm(signum, frame):
# This SIGTERM handler forwards the signal to child processes (cmake, make, etc). We wait
# for all child processes to exit before raising KeyboardInterrupt. This ensures all
# __exit__ and finally blocks run after the child processes have stopped, meaning that we
# get to clean up the prefix without risking that the child process writes to it
# afterwards.
signal.signal(signal.SIGTERM, signal.SIG_IGN)
os.killpg(0, signal.SIGTERM)
try:
while True:
os.waitpid(-1, 0)
except ChildProcessError:
pass
raise KeyboardInterrupt("Installation interrupted")
signal.signal(signal.SIGTERM, handle_sigterm)
os.environ["MAKEFLAGS"] = makeflags
# Force line buffering for Python's textio wrappers of stdout/stderr. We're not going to print
# much ourselves, but what we print should appear before output from `make` and other build
# tools.
sys.stdout = os.fdopen(
sys.stdout.fileno(), "w", buffering=1, encoding=sys.stdout.encoding, closefd=False
)
sys.stderr = os.fdopen(
sys.stderr.fileno(), "w", buffering=1, encoding=sys.stderr.encoding, closefd=False
)
# Detach stdin from the terminal like `./build < /dev/null`. This would not be necessary if we
# used os.setsid() instead of os.setpgid(), but that would "break" pstree output.
devnull_fd = os.open(os.devnull, os.O_RDONLY)
os.dup2(devnull_fd, 0)
os.close(devnull_fd)
sys.stdin = open(os.devnull, "r", encoding=sys.stdin.encoding)
# Start the tee thread to forward output to the log file and parent process.
tee = Tee(echo_control, parent, log_path)
# Use closedfd=false because of the connection objects. Use line buffering.
state_stream = os.fdopen(state.fileno(), "w", buffering=1, closefd=False)
exit_code = ExitCode.SUCCESS
try:
with PrefixPivoter(spec.prefix, keep_prefix):
_install(
spec,
explicit,
mirrors,
unsigned,
install_policy,
dirty,
keep_stage,
restage,
skip_patch,
fake,
install_source,
state_stream,
log_path,
spack.store.STORE,
run_tests,
stop_before,
stop_at,
)
except spack.error.StopPhase:
exit_code = ExitCode.STOPPED_AT_PHASE
except ProcessError as e:
print(e, file=sys.stderr)
exit_code = ExitCode.BUILD_ERROR
except BinaryCacheMiss:
exit_code = ExitCode.BUILD_CACHE_MISS
except BaseException:
traceback.print_exc(limit=-4)
exit_code = ExitCode.BUILD_ERROR
finally:
tee.close()
state_stream.close()
if exit_code == ExitCode.SUCCESS:
# Try to install the compressed log file
if not os.path.lexists(spec.package.install_log_path):
try:
with open(log_path, "rb") as f, open(spec.package.install_log_path, "wb") as g:
# Use GzipFile directly so we can omit filename / mtime in header
gzip_file = GzipFile(
filename="", mode="wb", compresslevel=6, mtime=0, fileobj=g
)
shutil.copyfileobj(f, gzip_file)
gzip_file.close()
except Exception:
pass # don't fail the build just because log compression failed
sys.exit(exit_code)
def _archive_build_metadata(pkg: "spack.package_base.PackageBase") -> None:
"""Copy build metadata from stage to install prefix .spack directory.
Mirrors what the old installer's log() function does in the parent process.
Only called after a successful source build (not for binary cache installs).
Errors are suppressed to avoid failing the build over metadata archiving."""
try:
if os.path.lexists(pkg.env_mods_path):
shutil.copy2(pkg.env_mods_path, pkg.install_env_path)
except OSError as e:
spack.llnl.util.tty.debug(e)
try:
if os.path.lexists(pkg.configure_args_path):
shutil.copy2(pkg.configure_args_path, pkg.install_configure_args_path)
except OSError as e:
spack.llnl.util.tty.debug(e)
# Archive install-phase test log if present
try:
pkg.archive_install_test_log()
except Exception as e:
spack.llnl.util.tty.debug(e)
# Archive package-specific files matched by archive_files glob patterns
try:
with fs.working_dir(pkg.stage.path):
target_dir = os.path.join(
spack.store.STORE.layout.metadata_path(pkg.spec), "archived-files"
)
errors = io.StringIO()
for glob_expr in spack.builder.create(pkg).archive_files:
abs_expr = os.path.realpath(glob_expr)
if os.path.realpath(pkg.stage.path) not in abs_expr:
errors.write(f"[OUTSIDE SOURCE PATH]: {glob_expr}\n")
continue
if os.path.isabs(glob_expr):
glob_expr = os.path.relpath(glob_expr, pkg.stage.path)
for f in glob.glob(glob_expr):
try:
target = os.path.join(target_dir, f)
fs.mkdirp(os.path.dirname(target))
fs.install(f, target)
except Exception as e:
spack.llnl.util.tty.debug(e)
errors.write(f"[FAILED TO ARCHIVE]: {f}")
if errors.getvalue():
error_file = os.path.join(target_dir, "errors.txt")
fs.mkdirp(target_dir)
with open(error_file, "w", encoding="utf-8") as err:
err.write(errors.getvalue())
spack.llnl.util.tty.warn(
f"Errors occurred when archiving files.\n\tSee: {error_file}"
)
except Exception as e:
spack.llnl.util.tty.debug(e)
try:
packages_dir = spack.store.STORE.layout.build_packages_path(pkg.spec)
dump_packages(pkg.spec, packages_dir)
except Exception as e:
spack.llnl.util.tty.debug(e)
try:
spack.store.STORE.layout.write_host_environment(pkg.spec)
except Exception as e:
spack.llnl.util.tty.debug(e)
def _enable_sandbox(config: dict, spec: spack.spec.Spec, stage_path: str) -> None:
if not config.get("enable", False):
return
try:
sandbox = spack.sandbox.get_sandbox()
except spack.sandbox.SandboxError as e:
raise spack.error.InstallError(f"Cannot enable build sandbox: {e}") from e
for dep in spec.traverse(root=False):
if not dep.external:
sandbox.allow_read(dep.prefix)
sandbox.allow_write(stage_path)
sandbox.allow_write(spec.prefix)
# POSIX prescribes /tmp and /dev/null are present. In the future we can consider setting
# TMPPATH to a sibling of the stage path to isolate concurrent builds better.
sandbox.allow_write(tempfile.gettempdir())
sandbox.allow_write(os.devnull)
# Allow read access to sbang, which might be needed to run build scripts.
sandbox.allow_read(os.path.join(spack.store.STORE.unpadded_root, "bin", "sbang"))
for upstream_db in spack.store.STORE.upstreams or []:
sandbox.allow_read(os.path.join(upstream_db.root, "bin", "sbang"))
# User-configured paths
for p in config.get("allow_read", []):
sandbox.allow_read(p)
for p in config.get("allow_write", []):
sandbox.allow_write(p)
try:
sandbox.apply(block_network=not config.get("allow_network", True))
except spack.sandbox.SandboxError as e:
raise spack.error.InstallError(f"Cannot enable build sandbox: {e}") from e
def _install(
spec: spack.spec.Spec,
explicit: bool,
mirrors: List[spack.url_buildcache.MirrorMetadata],
unsigned: Optional[bool],
install_policy: InstallPolicy,
dirty: bool,
keep_stage: bool,
restage: bool,
skip_patch: bool,
fake: bool,
install_source: bool,
state_stream: io.TextIOWrapper,
log_path: str,
store: spack.store.Store = spack.store.STORE,
run_tests: bool = False,
stop_before: Optional[str] = None,
stop_at: Optional[str] = None,
) -> None:
"""Install a spec from build cache or source."""
# Create the stage and log file before starting the tee thread.
pkg = spec.package
pkg.run_tests = run_tests
if fake:
store.layout.create_install_directory(spec)
_do_fake_install(pkg)
spack.hooks.post_install(spec, explicit)
return
# Try to install from buildcache, unless user asked for source only
if install_policy != "source_only":
if install_from_buildcache(mirrors, spec, unsigned, state_stream):
spack.hooks.post_install(spec, explicit)
return
elif install_policy == "cache_only":
send_state("no binary available", state_stream)
raise BinaryCacheMiss(f"No binary available for {spec}")
unmodified_env = os.environ.copy()
env_mods = spack.build_environment.setup_package(pkg, dirty=dirty)
store.layout.create_install_directory(spec)
stage = pkg.stage
stage.keep = keep_stage
# Then try a source build.
with stage:
if restage:
stage.destroy()
stage.create()
# Write build environment and env-mods to stage
spack.util.environment.dump_environment(pkg.env_path)
with open(pkg.env_mods_path, "w", encoding="utf-8") as f:
f.write(env_mods.shell_modifications(explicit=True, env=unmodified_env))
# Try to snapshot configure/cmake args before phases run
for attr in ("configure_args", "cmake_args"):
try:
args = getattr(pkg, attr)()
with open(pkg.configure_args_path, "w", encoding="utf-8") as f:
f.write(" ".join(shlex.quote(a) for a in args))
break
except Exception:
pass
# For develop packages or non-develop packages with --keep-stage there may be a
# pre-existing symlink at pkg.log_path which would cause the new symlink to fail.
# Try removing it if it exists.
try:
os.unlink(pkg.log_path)
except OSError:
pass
os.symlink(log_path, pkg.log_path)
send_state("staging", state_stream)
if not skip_patch:
pkg.do_patch()
else:
pkg.do_stage()
os.chdir(stage.source_path)
if install_source and os.path.isdir(stage.source_path):
src_target = os.path.join(spec.prefix, "share", spec.name, "src")
fs.install_tree(stage.source_path, src_target)
spack.hooks.pre_install(spec)
builder = spack.builder.create(pkg)
if stop_before is not None and stop_before not in builder.phases:
raise spack.error.InstallError(f"'{stop_before}' is not a valid phase for {pkg.name}")
if stop_at is not None and stop_at not in builder.phases:
raise spack.error.InstallError(f"'{stop_at}' is not a valid phase for {pkg.name}")
_enable_sandbox(spack.config.get("config:sandbox", {}), spec, stage.path)
for phase in builder:
if stop_before is not None and phase.name == stop_before:
send_state(f"stopped before {stop_before}", state_stream)
raise spack.error.StopPhase(f"Stopping before '{stop_before}'")
send_state(phase.name, state_stream)
spack.llnl.util.tty.msg(f"{pkg.name}: Executing phase: '{phase.name}'")
# Run the install phase with debug output enabled.
old_debug = spack.llnl.util.tty.debug_level()
spack.llnl.util.tty.set_debug(1)
try:
phase.execute()
finally:
spack.llnl.util.tty.set_debug(old_debug)
if stop_at is not None and phase.name == stop_at:
send_state(f"stopped after {stop_at}", state_stream)
raise spack.error.StopPhase(f"Stopping at '{stop_at}'")
_archive_build_metadata(pkg)
spack.hooks.post_install(spec, explicit)
[docs]
class JobServer:
"""Attach to an existing POSIX jobserver or create a FIFO-based one."""
def __init__(self, num_jobs: int) -> None:
#: Keep track of how many tokens Spack itself has acquired, which is used to release them.
self.tokens_acquired = 0
#: The number of jobs to run concurrently. This translates to `num_jobs - 1` tokens in the
#: jobserver.
self.num_jobs = num_jobs
#: The target number of jobs to run concurrently, which may differ from num_jobs if the
#: user has requested a decrease in parallelism, but we haven't consumed enough tokens to
#: reflect that yet. This value is used in the UI. The invariant is that self.target_jobs
#: can only be modified if self.created is True.
self.target_jobs = num_jobs
self.fifo_path: Optional[str] = None
self.created = False
self._setup()
# Ensure that Executable()(...) in build processes ultimately inherit jobserver fds.
os.set_inheritable(self.r, True)
os.set_inheritable(self.w, True)
# r_conn and w_conn are used to make build processes inherit the jobserver fds if needed.
# Connection objects close the fd as they are garbage collected, so store them.
self.r_conn = Connection(self.r)
self.w_conn = Connection(self.w)
def _setup(self) -> None:
fifo_config = get_jobserver_config()
if type(fifo_config) is str:
# FIFO-based jobserver. Try to open the FIFO.
open_attempt = open_existing_jobserver_fifo(fifo_config)
if open_attempt:
self.r, self.w = open_attempt
self.fifo_path = fifo_config
return
elif type(fifo_config) is tuple:
# Old style pipe-based jobserver. Validate the fds before using them.
r, w = fifo_config
if fcntl.fcntl(r, fcntl.F_GETFD) != -1 and fcntl.fcntl(w, fcntl.F_GETFD) != -1:
self.r, self.w = r, w
return
# No existing jobserver we can connect to: create a FIFO-based one.
self.r, self.w, self.fifo_path = create_jobserver_fifo(self.num_jobs)
self.created = True
[docs]
def makeflags(self, gmake: Optional[spack.spec.Spec]) -> str:
"""Return the MAKEFLAGS for a build process, depending on its gmake build dependency."""
if self.fifo_path and (not gmake or gmake.satisfies("@4.4:")):
return f" -j{self.num_jobs} --jobserver-auth=fifo:{self.fifo_path}"
elif not gmake or gmake.satisfies("@4.0:"):
return f" -j{self.num_jobs} --jobserver-auth={self.r},{self.w}"
else:
return f" -j{self.num_jobs} --jobserver-fds={self.r},{self.w}"
[docs]
def has_target_parallelism(self) -> bool:
return self.num_jobs == self.target_jobs
[docs]
def increase_parallelism(self) -> None:
"""Add one token to the jobserver to increase parallelism; this should always work."""
if not self.created:
return
self.target_jobs += 1
# If a decrease was pending, don't add a token.
if self.target_jobs <= self.num_jobs:
return
os.write(self.w, b"+")
self.num_jobs += 1
[docs]
def decrease_parallelism(self) -> None:
"""Request an eventual concurrency decrease by 1."""
if not self.created or self.target_jobs <= 1:
return
self.target_jobs -= 1
self.maybe_discard_tokens()
[docs]
def maybe_discard_tokens(self) -> None:
"""Try to get reduce parallelism by discarding tokens."""
to_discard = self.num_jobs - self.target_jobs
if to_discard <= 0:
return
try:
# The read may return zero or just fewer bytes than requested; we'll try again later.
self.num_jobs -= len(os.read(self.r, to_discard))
except BlockingIOError:
pass
[docs]
def acquire(self, jobs: int) -> int:
"""Try and acquire at most 'jobs' tokens from the jobserver. Returns the number of
tokens actually acquired (may be less than requested, or zero)."""
try:
num_acquired = len(os.read(self.r, jobs))
self.tokens_acquired += num_acquired
return num_acquired
except BlockingIOError:
return 0
[docs]
def release(self) -> None:
"""Release a token back to the jobserver."""
# The last job to quit has an implicit token, so don't release if we have none.
if self.tokens_acquired == 0:
return
self.tokens_acquired -= 1
if self.target_jobs < self.num_jobs:
# If a decrease in parallelism is requested, discard a token instead of releasing it.
self.num_jobs -= 1
else:
os.write(self.w, b"+")
[docs]
def close(self) -> None:
if self.created and self.num_jobs > 1:
if self.tokens_acquired != 0:
# It's a non-fatal internal error to close the jobserver with acquired tokens.
warnings.warn("Spack failed to release jobserver tokens", stacklevel=2)
else:
# Verify that all build processes released the tokens they acquired.
total = self.num_jobs - 1
drained = self.acquire(total)
if drained != total:
n = total - drained
warnings.warn(
f"{n} jobserver {'token was' if n == 1 else 'tokens were'} not released "
"by the build processes. This can indicate that the build ran with "
"limited parallelism.",
stacklevel=2,
)
self.r_conn.close()
self.w_conn.close()
# Remove the FIFO if we created it.
if self.created and self.fifo_path:
try:
os.unlink(self.fifo_path)
except OSError:
pass
try:
os.rmdir(os.path.dirname(self.fifo_path))
except OSError:
pass
[docs]
def start_build(
spec: spack.spec.Spec,
explicit: bool,
mirrors: List[spack.url_buildcache.MirrorMetadata],
unsigned: Optional[bool],
install_policy: InstallPolicy,
dirty: bool,
keep_stage: bool,
restage: bool,
keep_prefix: bool,
skip_patch: bool,
fake: bool,
install_source: bool,
run_tests: bool,
jobserver: JobServer,
log_path: str,
stop_before: Optional[str] = None,
stop_at: Optional[str] = None,
) -> ChildInfo:
"""Start a new build."""
# Create pipes for the child's output, state reporting, and control.
state_r_conn, state_w_conn = Pipe(duplex=False)
output_r_conn, output_w_conn = Pipe(duplex=False)
control_r_conn, control_w_conn = Pipe(duplex=False)
# Obtain the MAKEFLAGS to be set in the child process, and determine whether it's necessary
# for the child process to inherit our jobserver fds.
gmake = next(iter(spec.dependencies("gmake")), None)
makeflags = jobserver.makeflags(gmake)
fifo = "--jobserver-auth=fifo:" in makeflags
proc = Process(
target=worker_function,
args=(
spec,
explicit,
mirrors,
unsigned,
install_policy,
dirty,
keep_stage,
restage,
keep_prefix,
skip_patch,
fake,
install_source,
run_tests,
state_w_conn,
output_w_conn,
control_r_conn,
makeflags,
None if fifo else jobserver.r_conn,
None if fifo else jobserver.w_conn,
log_path,
GlobalState(),
stop_before,
stop_at,
),
)
proc.start()
# The parent process does not need the write ends of the main pipes or the read end of control.
state_w_conn.close()
output_w_conn.close()
control_r_conn.close()
# Set the read ends to non-blocking: in principle redundant with epoll/kqueue, but safer.
os.set_blocking(output_r_conn.fileno(), False)
os.set_blocking(state_r_conn.fileno(), False)
return ChildInfo(proc, spec, output_r_conn, state_r_conn, control_w_conn, log_path, explicit)
[docs]
def get_jobserver_config(makeflags: Optional[str] = None) -> Optional[Union[str, Tuple[int, int]]]:
"""Parse MAKEFLAGS for jobserver. Either it's a FIFO or (r, w) pair of file descriptors.
Args:
makeflags: MAKEFLAGS string to parse. If None, reads from os.environ.
"""
makeflags = os.environ.get("MAKEFLAGS", "") if makeflags is None else makeflags
if not makeflags:
return None
# We can have the following flags:
# --jobserver-fds=R,W (before GNU make 4.2)
# --jobserver-auth=fifo:PATH or --jobserver-auth=R,W (after GNU make 4.2)
# In case of multiple, the last one wins.
matches = re.findall(r" --jobserver-[^=]+=([^ ]+)", makeflags)
if not matches:
return None
last_match: str = matches[-1]
assert isinstance(last_match, str)
if last_match.startswith("fifo:"):
return last_match[5:]
parts = last_match.split(",", 1)
if len(parts) != 2:
return None
try:
return int(parts[0]), int(parts[1])
except ValueError:
return None
[docs]
def create_jobserver_fifo(num_jobs: int) -> Tuple[int, int, str]:
"""Create a new jobserver FIFO with the specified number of job tokens."""
tmpdir = tempfile.mkdtemp()
fifo_path = os.path.join(tmpdir, "jobserver_fifo")
try:
os.mkfifo(fifo_path, 0o600)
read_fd = os.open(fifo_path, os.O_RDONLY | os.O_NONBLOCK)
write_fd = os.open(fifo_path, os.O_WRONLY)
# write num_jobs - 1 tokens, because the first job is implicit
os.write(write_fd, b"+" * (num_jobs - 1))
return read_fd, write_fd, fifo_path
except Exception:
try:
os.unlink(fifo_path)
except OSError as e:
spack.llnl.util.tty.debug(f"Failed to remove POSIX jobserver FIFO: {e}", level=3)
pass
try:
os.rmdir(tmpdir)
except OSError as e:
spack.llnl.util.tty.debug(f"Failed to remove POSIX jobserver FIFO dir: {e}", level=3)
pass
raise
[docs]
def open_existing_jobserver_fifo(fifo_path: str) -> Optional[Tuple[int, int]]:
"""Open an existing jobserver FIFO for reading and writing."""
try:
read_fd = os.open(fifo_path, os.O_RDONLY | os.O_NONBLOCK)
write_fd = os.open(fifo_path, os.O_WRONLY)
return read_fd, write_fd
except OSError:
return None
[docs]
class FdInfo:
"""Information about a file descriptor mapping."""
__slots__ = ("pid", "name")
def __init__(self, pid: int, name: str) -> None:
self.pid = pid
self.name = name
[docs]
class BuildInfo:
"""Information about a package being built."""
__slots__ = (
"state",
"explicit",
"version",
"hash",
"name",
"external",
"prefix",
"finished_time",
"start_time",
"duration",
"progress_percent",
"control_w_conn",
"log_path",
"log_summary",
)
def __init__(
self,
spec: spack.spec.Spec,
explicit: bool,
control_w_conn: Optional[Connection],
log_path: Optional[str] = None,
start_time: float = 0.0,
) -> None:
self.state: str = "starting"
self.explicit: bool = explicit
self.version: str = str(spec.version)
self.hash: str = spec.dag_hash(7)
self.name: str = spec.name
self.external: bool = spec.external
self.prefix: str = spec.prefix
self.finished_time: Optional[float] = None
self.start_time: float = start_time
self.duration: Optional[float] = None
self.progress_percent: Optional[int] = None
self.control_w_conn = control_w_conn
self.log_path = log_path
self.log_summary: Optional[str] = None
[docs]
class BuildStatus:
"""Tracks the build status display for terminal output."""
def __init__(
self,
total: int,
stdout: io.TextIOWrapper = sys.stdout, # type: ignore[assignment]
get_terminal_size: Callable[[], os.terminal_size] = os.get_terminal_size,
get_time: Callable[[], float] = time.monotonic,
is_tty: Optional[bool] = None,
color: Optional[bool] = None,
verbose: bool = False,
filter_padding: bool = False,
) -> None:
#: Ordered dict of build ID -> info
self.total = total
self.completed = 0
self.builds: Dict[str, BuildInfo] = {}
self.finished_builds: List[BuildInfo] = []
self.spinner_chars = ["|", "/", "-", "\\"]
self.spinner_index = 0
self.dirty = True # Start dirty to draw initial state
self.active_area_rows = 0
self.total_lines = 0
self.next_spinner_update = 0.0
self.next_update = 0.0
self.overview_mode = True # Whether to draw the package overview
self.tracked_build_id = "" # identifier of the package whose logs we follow
self.search_term = ""
self.search_mode = False
self.log_ends_with_newline = True
self.actual_jobs: int = 0
self.target_jobs: int = 0
self.blocked: bool = False
self.stdout = stdout
self.get_terminal_size = get_terminal_size
self.terminal_size = os.terminal_size((0, 0))
self.terminal_size_changed: bool = True
self.get_time = get_time
self.is_tty = is_tty if is_tty is not None else stdout.isatty()
if color is not None:
self.color = color
else:
self.color = spack.llnl.util.tty.color.get_color_when(stdout)
#: Verbose mode only applies to non-TTY where we want to track a single build log.
self.verbose = verbose and not self.is_tty
self.filter_padding = filter_padding
#: When True, suppress all terminal output (process is in background).
#: Controlling code is responsible for modifying this variable based on process state
self.headless = False
[docs]
def on_resize(self) -> None:
"""Refresh cached terminal size and trigger a redraw."""
self.terminal_size_changed = True
self.dirty = True
[docs]
def add_build(
self,
spec: spack.spec.Spec,
explicit: bool,
control_w_conn: Optional[Connection] = None,
log_path: Optional[str] = None,
) -> None:
"""Add a new build to the display and mark the display as dirty."""
build_info = BuildInfo(spec, explicit, control_w_conn, log_path, int(self.get_time()))
self.builds[spec.dag_hash()] = build_info
self.dirty = True
# Track the new build's logs when we're not already following another build. This applies
# only in non-TTY verbose mode.
if self.verbose and not self.tracked_build_id and control_w_conn is not None:
self.tracked_build_id = spec.dag_hash()
try:
os.write(control_w_conn.fileno(), b"1")
except OSError:
pass
[docs]
def remove_build(self, build_id: str) -> None:
"""Remove a build from the display (e.g. after a binary cache miss before retry)."""
self.builds.pop(build_id, None)
if self.tracked_build_id == build_id:
self.tracked_build_id = ""
if not self.overview_mode:
self.overview_mode = True
self.dirty = True
[docs]
def toggle(self) -> None:
"""Toggle between overview mode and following a specific build."""
if self.overview_mode:
self.next()
else:
if not self.log_ends_with_newline:
self.stdout.buffer.write(b"\n")
self.log_ends_with_newline = True
self.active_area_rows = 0
self.search_term = ""
self.search_mode = False
self.overview_mode = True
self.dirty = True
try:
conn = self.builds[self.tracked_build_id].control_w_conn
if conn is not None:
os.write(conn.fileno(), b"0")
except (KeyError, OSError):
pass
self.tracked_build_id = ""
[docs]
def enter_search(self) -> None:
self.search_mode = True
self.dirty = True
def _is_displayed(self, build: BuildInfo) -> bool:
"""Returns true if the build matches the search term, or when no search term is set."""
# When not in search mode, the search_term is "", which always evaluates to True below
return self.search_term in build.name or build.hash.startswith(self.search_term)
def _get_next(self, direction: int) -> Optional[str]:
"""Returns the next or previous unfinished build ID matching the search term, or None if
none found. Direction should be 1 for next, -1 for previous."""
matching = [
build_id
for build_id, build in self.builds.items()
if (build.finished_time is None or build.state == "failed")
and self._is_displayed(build)
]
if not matching:
return None
try:
idx = matching.index(self.tracked_build_id)
except ValueError:
return matching[0] if direction == 1 else matching[-1]
return matching[(idx + direction) % len(matching)]
[docs]
def next(self, direction: int = 1) -> None:
"""Follow the logs of the next build in the list."""
new_build_id = self._get_next(direction)
if not new_build_id or self.tracked_build_id == new_build_id:
return
new_build = self.builds[new_build_id]
if self.overview_mode:
self.overview_mode = False
# Stop following the previous and start following the new build.
if self.tracked_build_id:
try:
conn = self.builds[self.tracked_build_id].control_w_conn
if conn is not None:
os.write(conn.fileno(), b"0")
except (KeyError, OSError):
pass
self.tracked_build_id = new_build_id
version_str = (
f"\033[0;36m@{new_build.version}\033[0m" if self.color else f"@{new_build.version}"
)
prefix = "" if self.log_ends_with_newline else "\n"
if new_build.state == "failed":
# For failed builds, show the stored log summary instead of following live logs.
self.stdout.write(f"{prefix}==> Log summary of {new_build.name}{version_str}\n")
self.log_ends_with_newline = True
if new_build.log_summary:
self.stdout.write(new_build.log_summary)
if new_build.log_path:
if not new_build.log_summary:
self.stdout.write("No errors parsed from log, see full log: ")
else:
self.stdout.write("Full log: ")
self.stdout.write(f"{new_build.log_path}\n")
self.stdout.flush()
else:
# Tell the user we're following new logs, and instruct the child to start sending.
self.stdout.write(f"{prefix}==> Following logs of {new_build.name}{version_str}\n")
self.log_ends_with_newline = True
self.stdout.flush()
try:
conn = new_build.control_w_conn
if conn is not None:
os.write(conn.fileno(), b"1")
except (KeyError, OSError):
pass
[docs]
def set_blocked(self, blocked: bool) -> None:
"""Set whether all pending builds are blocked by another Spack process."""
if blocked == self.blocked:
return
self.blocked = blocked
self.dirty = True
[docs]
def set_jobs(self, actual: int, target: int) -> None:
"""Set the actual and target number of jobs to run concurrently."""
if actual == self.actual_jobs and target == self.target_jobs:
return
self.actual_jobs = actual
self.target_jobs = target
self.dirty = True
[docs]
def update_state(self, build_id: str, state: str) -> None:
"""Update the state of a package and mark the display as dirty."""
build_info = self.builds[build_id]
build_info.state = state
build_info.progress_percent = None
if state in ("finished", "failed"):
self.completed += 1
now = self.get_time()
build_info.duration = now - build_info.start_time
build_info.finished_time = now + CLEANUP_TIMEOUT
# Stop tracking the finished build's logs.
if build_id == self.tracked_build_id:
if not self.overview_mode:
self.toggle()
if self.verbose:
self.tracked_build_id = ""
self.dirty = True
# For non-TTY output, print state changes immediately
if not self.is_tty and not self.headless:
line = "".join(
self._generate_line_components(build_info, static=True, now=self.get_time())
)
self.stdout.write(line + "\n")
self.stdout.flush()
[docs]
def parse_log_summary(self, build_id: str) -> None:
"""Parse the build log for errors/warnings and store the summary."""
build_info = self.builds[build_id]
if not build_info.log_path or not os.path.exists(build_info.log_path):
return
errors, warnings, tail_event = parse_log_events(build_info.log_path, tail=20)
events = [*errors, *warnings]
if tail_event is not None:
events.append(tail_event)
if events:
build_info.log_summary = make_log_context(events)
[docs]
def update_progress(self, build_id: str, current: int, total: int) -> None:
"""Update the progress of a package and mark the display as dirty."""
percent = int((current / total) * 100)
build_info = self.builds[build_id]
if build_info.progress_percent != percent:
build_info.progress_percent = percent
self.dirty = True
[docs]
def update(self, finalize: bool = False) -> None:
"""Redraw the interactive display."""
if self.headless or not self.is_tty or not self.overview_mode:
return
now = self.get_time()
# Avoid excessive redraws
if not finalize and now < self.next_update:
return
# Only update the spinner if there are still running packages
if now >= self.next_spinner_update and any(
pkg.finished_time is None for pkg in self.builds.values()
):
self.spinner_index = (self.spinner_index + 1) % len(self.spinner_chars)
self.dirty = True
self.next_spinner_update = now + SPINNER_INTERVAL
for build_id in list(self.builds):
build_info = self.builds[build_id]
if build_info.state == "failed" or build_info.finished_time is None:
continue
if finalize or now >= build_info.finished_time:
self.finished_builds.append(build_info)
del self.builds[build_id]
self.dirty = True
if not self.dirty and not finalize:
return
# Build the overview output in a buffer and print all at once to avoid flickering.
buffer = io.StringIO()
# Move cursor up to the start of the display area assuming the same terminal width. If the
# terminal resized, lines may have wrapped, and we should've moved up further. We do not
# try to track that (would require keeping track of each line's width).
if self.active_area_rows > 0:
buffer.write(f"\033[{self.active_area_rows}A\r")
if self.terminal_size_changed:
self.terminal_size = self.get_terminal_size()
self.terminal_size_changed = False
# After resize, active_area_rows is invalidated due to possible line wrapping. Set to
# 0 to force newlines instead of cursor movement.
self.active_area_rows = 0
max_width, max_height = self.terminal_size
# First flush the finished builds. These are "persisted" in terminal history.
if self.finished_builds:
for build in self.finished_builds:
self._render_build(build, buffer, now=now)
self._println(buffer, force_newline=True) # should scroll the terminal
self.finished_builds.clear()
# Finished builds can span multiple lines, overlapping our "active area", invalidating
# active_area_rows. Set to 0 to force newlines instead of cursor movement.
self.active_area_rows = 0
# Then a header followed by the active builds. This is the "mutable" part of the display.
self.total_lines = 0
if not finalize:
if self.color:
bold = "\033[1m"
reset = "\033[0m"
cyan = "\033[36m"
else:
bold = reset = cyan = ""
if self.actual_jobs != self.target_jobs:
jobs_str = f"{self.actual_jobs}=>{self.target_jobs}"
else:
jobs_str = str(self.target_jobs)
long_header_len = len(
f"Progress: {self.completed}/{self.total} +/-: {jobs_str} jobs"
" /: filter v: logs n/p: next/prev"
)
if long_header_len < max_width:
self._println(
buffer,
f"{bold}Progress:{reset} {self.completed}/{self.total}"
f" {cyan}+{reset}/{cyan}-{reset}: "
f"{jobs_str} jobs"
f" {cyan}/{reset}: filter {cyan}v{reset}: logs"
f" {cyan}n{reset}/{cyan}p{reset}: next/prev",
)
else:
self._println(buffer, f"{bold}Progress:{reset} {self.completed}/{self.total}")
if self.blocked and not any(pkg.finished_time is None for pkg in self.builds.values()):
self._println(buffer, "Waiting for other Spack install process...")
displayed_builds = (
[b for b in self.builds.values() if self._is_displayed(b)]
if self.search_term
else self.builds.values()
)
len_builds = len(displayed_builds)
# Truncate if we have more builds than fit on the screen. In that case we have to reserve
# an additional line for the "N more..." message.
truncate_at = max_height - 3 if len_builds + 2 > max_height else len_builds
for i, build in enumerate(displayed_builds, 1):
if i > truncate_at:
self._println(buffer, f"{len_builds - i + 1} more...")
break
self._render_build(build, buffer, max_width, now=now)
self._println(buffer)
if self.search_mode:
buffer.write(f"filter> {self.search_term}\033[K")
# Clear any remaining lines from previous display
buffer.write("\033[0J")
# Print everything at once to avoid flickering
self.stdout.write(buffer.getvalue())
self.stdout.flush()
# Update the number of lines drawn for next time. It reflects the number of active builds.
self.active_area_rows = self.total_lines
self.dirty = False
# Schedule next UI update
self.next_update = now + SPINNER_INTERVAL / 2
def _println(self, buffer: io.StringIO, line: str = "", force_newline: bool = False) -> None:
"""Print a line to the buffer, handling line clearing and cursor movement."""
self.total_lines += 1
if line:
buffer.write(line)
if self.total_lines > self.active_area_rows or force_newline:
buffer.write("\033[0m\033[K\n") # reset, clear to EOL, newline
else:
buffer.write("\033[0m\033[K\033[1B\r") # reset, clear to EOL, move to next line
[docs]
def print_logs(self, build_id: str, data: bytes) -> None:
if self.headless:
return
# Discard logs we are not following. Generally this should not happen as we tell the child
# to only send logs when we are following it. It could maybe happen while transitioning
# between builds.
if build_id != self.tracked_build_id:
return
if self.filter_padding:
data = padding_filter_bytes(data)
self.stdout.buffer.write(data)
self.stdout.flush()
self.log_ends_with_newline = data.endswith(b"\n")
def _render_build(
self, build_info: BuildInfo, buffer: io.StringIO, max_width: int = 0, now: float = 0.0
) -> None:
"""Print a single build line to the buffer, truncating to max_width (if > 0)."""
line_width = 0
for component in self._generate_line_components(build_info, now=now):
# ANSI escape sequence(s), does not contribute to width
if not component.startswith("\033") and max_width > 0:
line_width += len(component)
if line_width > max_width:
break
buffer.write(component)
def _generate_line_components(
self, build_info: BuildInfo, static: bool = False, now: float = 0.0
) -> Generator[str, None, None]:
"""Yield formatted line components for a package. Escape sequences are yielded as separate
strings so they do not contribute to the line width."""
if build_info.external:
indicator = "[e]"
elif build_info.state == "finished":
indicator = "[+]"
elif build_info.state == "failed":
indicator = "[x]"
elif static:
indicator = "[ ]"
else:
indicator = f"[{self.spinner_chars[self.spinner_index]}]"
if self.color:
if build_info.state == "failed":
yield "\033[31m" # red
elif build_info.state == "finished":
yield "\033[32m" # green
yield indicator
if self.color:
yield "\033[0m" # reset
yield " "
if self.color:
yield "\033[0;90m" # dark gray
yield build_info.hash
if self.color:
yield "\033[0m" # reset
yield " "
# Package name in bold if explicit, default otherwise
if build_info.explicit:
if self.color:
yield "\033[1m"
yield build_info.name
if self.color:
yield "\033[0m" # reset
else:
yield build_info.name
if self.color:
yield "\033[0;36m" # cyan
yield f"@{build_info.version}"
if self.color:
yield "\033[0m" # reset
# progress or state
if build_info.progress_percent is not None:
yield " fetching"
yield f": {build_info.progress_percent}%"
elif build_info.state == "finished":
prefix = build_info.prefix
yield f" {padding_filter(prefix) if self.filter_padding else prefix}"
elif build_info.state == "failed":
yield " failed"
if build_info.log_path:
yield f": {build_info.log_path}"
else:
yield f" {build_info.state}"
# Duration
elapsed = (
build_info.duration
if build_info.duration is not None
else (now - build_info.start_time)
)
if elapsed > 0:
if self.color:
yield "\033[0;90m" # dark gray
yield f" ({pretty_duration(elapsed)})"
if self.color:
yield "\033[0m"
Nodes = Dict[str, spack.spec.Spec]
Edges = Dict[str, Set[str]]
[docs]
class BuildGraph:
"""Represents the dependency graph for package installation."""
def __init__(
self,
specs: List[spack.spec.Spec],
root_policy: InstallPolicy,
dependencies_policy: InstallPolicy,
include_build_deps: bool,
install_package: bool,
install_deps: bool,
database: spack.database.Database,
overwrite_set: Optional[Set[str]] = None,
tests: Union[bool, List[str], Set[str]] = False,
explicit_set: Optional[Set[str]] = None,
):
"""Construct a build graph from the given specs. This includes only packages that need to
be installed. Installed packages are pruned from the graph, and build dependencies are only
included when necessary."""
self.roots = {s.dag_hash() for s in specs}
self.nodes = {s.dag_hash(): s for s in specs}
self.parent_to_child: Dict[str, Set[str]] = {}
self.child_to_parent: Dict[str, Set[str]] = {}
overwrite_set = overwrite_set or set()
explicit_set = explicit_set or set()
self.pruned: Set[str] = set()
self.done: Set[str] = set()
self.force_source: Set[str] = set()
stack: List[Tuple[spack.spec.Spec, InstallPolicy]] = [
(s, root_policy) for s in self.nodes.values()
]
self.tests = tests
with database.read_transaction():
# Set the install prefix for each spec based on the db record or store layout
for s in spack.traverse.traverse_nodes(specs):
_, record = database.query_by_spec_hash(s.dag_hash())
if record and record.path:
s.set_prefix(record.path)
else:
s.set_prefix(spack.store.STORE.layout.path_for_spec(s))
# Build the graph and determine which specs to prune
while stack:
spec, install_policy = stack.pop()
key = spec.dag_hash()
_, record = database.query_by_spec_hash(key)
depflag = self._base_deptypes(spec)
# Conditionally include build dependencies. Don't prune installed specs
# that need to be marked explicit so they flow through the DB write path.
if record and record.installed and key not in overwrite_set:
# If it needs to be marked explicit, keep it in the graph (don't prune).
if key not in explicit_set or record.explicit:
self.pruned.add(key)
elif install_policy == "source_only" or include_build_deps:
depflag |= dt.BUILD
dependencies = spec.dependencies(deptype=depflag)
self.parent_to_child[key] = {d.dag_hash() for d in dependencies}
# Enqueue new dependencies
for d in dependencies:
if d.dag_hash() in self.nodes:
continue
self.nodes[d.dag_hash()] = d
stack.append((d, dependencies_policy))
# Construct reverse lookup from child to parent
for parent, children in self.parent_to_child.items():
for child in children:
if child in self.child_to_parent:
self.child_to_parent[child].add(parent)
else:
self.child_to_parent[child] = {parent}
# If we're not installing the package itself, mark root specs for pruning too
if not install_package:
self.pruned.update(s.dag_hash() for s in specs)
# Prune specs from the build graph. Their parents become parents of their children and
# their children become children of their parents.
for key in self.pruned:
for parent in self.child_to_parent.get(key, ()):
self.parent_to_child[parent].remove(key)
self.parent_to_child[parent].update(self.parent_to_child.get(key, ()))
for child in self.parent_to_child.get(key, ()):
self.child_to_parent[child].remove(key)
self.child_to_parent[child].update(self.child_to_parent.get(key, ()))
self.parent_to_child.pop(key, None)
self.child_to_parent.pop(key, None)
self.nodes.pop(key, None)
# Check that all prefixes to be created are unique.
prefixes = [s.prefix for s in self.nodes.values() if not s.external]
if len(prefixes) != len(set(prefixes)):
raise spack.error.InstallError(
"Install prefix collision: "
+ ", ".join(p for p in prefixes if prefixes.count(p) > 1)
)
# If we're not installing dependencies, verify that all remaining nodes in the build graph
# after pruning are roots. If there are any non-root nodes, it means there are uninstalled
# dependencies that we're not supposed to install.
if not install_deps:
non_root_spec = next((v for k, v in self.nodes.items() if k not in self.roots), None)
if non_root_spec is not None:
raise spack.error.InstallError(
f"Failed to install in package only mode: dependency {non_root_spec} is not "
"installed"
)
def _base_deptypes(self, spec: spack.spec.Spec) -> dt.DepFlag:
"""Returns the dependency types that are always eagerly traversed. These are LINK, RUN, and
conditionally TEST, but excludes BUILD. Build deps are deferred until after a build cache
miss."""
deptypes = dt.LINK | dt.RUN
if self.tests is True or (self.tests and spec.name in self.tests):
deptypes |= dt.TEST
return deptypes
[docs]
def enqueue_parents(self, dag_hash: str, pending_builds: List[str]) -> None:
"""After a spec is installed, remove it from the graph and enqueue any parents that are
now ready to install.
Args:
dag_hash: The dag_hash of the spec that was just installed
pending_builds: List to append parent specs that are ready to build
"""
self.done.add(dag_hash)
# Remove node and edges from the node in the build graph
self.parent_to_child.pop(dag_hash, None)
self.nodes.pop(dag_hash, None)
parents = self.child_to_parent.pop(dag_hash, None)
if not parents:
return
# Enqueue any parents and remove edges to the installed child
for parent in parents:
children = self.parent_to_child[parent]
children.remove(dag_hash)
if not children:
pending_builds.append(parent)
[docs]
def has_unexpanded_build_deps(self, dag_hash: str) -> bool:
return bool(self.get_unexpanded_build_deps(dag_hash))
[docs]
def get_unexpanded_build_deps(self, dag_hash: str) -> List["spack.spec.Spec"]:
"""Returns a list of unprocessed build deps for a spec."""
spec = self.nodes[dag_hash]
base_deptypes = self._base_deptypes(spec)
unexpanded = []
for edge in spec.edges_to_dependencies(depflag=dt.BUILD):
if (edge.depflag & base_deptypes) == 0:
unexpanded.append(edge.spec)
return unexpanded
[docs]
def expand_build_deps(
self,
spec_hashes: List[str],
pending_builds: List[str],
database: "spack.database.Database",
dependencies_policy: InstallPolicy = "auto",
) -> List[str]:
"""Expand build dependencies for a list of specs after binary cache misses.
Adds the spec's build deps and their transitive runtime deps to the graph. When
``dependencies_policy`` is ``"source_only"``, build deps of newly added specs are included
immediately. Installed deps are skipped without adding edges.
The caller must hold the database read lock and have called ``db._read()``.
Returns the list of newly added dag hashes."""
# Seed with tuples of (parent_hash, dep) for each to-be-expanded build dep
stack = [(h, dep) for h in spec_hashes for dep in self.get_unexpanded_build_deps(h)]
newly_added: List[str] = []
while stack:
parent_hash, dep = stack.pop()
dep_hash = dep.dag_hash()
# Skip installed deps
if dep_hash in self.pruned or dep_hash in self.done:
continue
# If already in the graph (e.g. overwrite build in progress), add edge but don't
# re-add node. This must be checked before the DB installed check, because an
# overwrite build is installed in the DB but not yet done.
if dep_hash in self.nodes:
self.parent_to_child.setdefault(parent_hash, set()).add(dep_hash)
self.child_to_parent.setdefault(dep_hash, set()).add(parent_hash)
continue
_, record = database.query_by_spec_hash(dep_hash)
if record and record.installed:
self.done.add(dep_hash)
continue
# Add forward/reverse edge
self.parent_to_child.setdefault(parent_hash, set()).add(dep_hash)
self.child_to_parent.setdefault(dep_hash, set()).add(parent_hash)
# New node: add to graph and recurse into its link/run/test deps
self.nodes[dep_hash] = dep
self.parent_to_child.setdefault(dep_hash, set())
newly_added.append(dep_hash)
deptype = self._base_deptypes(dep)
if dependencies_policy == "source_only":
deptype |= dt.BUILD
for child in dep.dependencies(deptype=deptype):
stack.append((dep_hash, child))
# Enqueue nodes that are ready (no uninstalled children)
for h in newly_added:
if not self.parent_to_child[h]:
pending_builds.append(h)
for dag_hash in spec_hashes:
if not self.parent_to_child[dag_hash]:
pending_builds.append(dag_hash)
return newly_added
[docs]
class ScheduleResult(NamedTuple):
"""Return value of :func:`schedule_builds`."""
#: True if any pending builds were blocked on locks held by other processes.
blocked: bool
#: ``(dag_hash, lock)`` pairs where the write lock is held and the caller must start the build
#: and eventually release the lock.
to_start: List[Tuple[str, spack.util.lock.Lock]]
#: ``(dag_hash, spec, lock)`` triples found already installed by another process; the read lock
#: is held and the caller must add it to retained_read_locks.
newly_installed: List[Tuple[str, spack.spec.Spec, spack.util.lock.Lock]]
#: Actions to mark already installed specs explicit in the DB.
to_mark_explicit: List[MarkExplicitAction]
[docs]
def schedule_builds(
pending: List[str],
build_graph: BuildGraph,
db: spack.database.Database,
prefix_locker: spack.database.SpecLocker,
overwrite: Set[str],
overwrite_time: float,
capacity: int,
needs_jobserver_token: bool,
jobserver: JobServer,
explicit: Set[str],
) -> ScheduleResult:
"""Try to schedule as many pending builds as possible.
For each pending spec, attempts to acquire a non-blocking per-spec write lock. If the write
lock times out, a read lock is tried as a fallback: a successful read lock means the first
process finished and downgraded its write lock. If the DB confirms the spec is installed, it
is captured as newly_installed; if the DB says it is not installed, the concurrent process was
likely killed mid-build, and the spec is retried next iteration. Under both the DB read lock
and the prefix lock, checks whether another process has already installed the spec. If so,
captures it as newly_installed (caller enqueues parents) and keeps a read lock on the prefix
to prevent concurrent uninstall. Otherwise, acquires a jobserver token if needed and adds the
(dag_hash, lock) pair to to_start (caller launches the build).
Args:
pending: List of dag hashes pending installation; modified in-place.
build_graph: The build dependency graph; used for node lookup and parent enqueueing.
db: Package database; used for read lock and installed-status queries.
prefix_locker: Per-spec write locker.
overwrite: Set of dag hashes to overwrite even if already installed.
overwrite_time: Timestamp (from time.time()) at which the overwrite install was requested.
A spec in ``overwrite`` whose DB installation_time >= overwrite_time was installed by
a concurrent process after our request started and should be treated as done.
capacity: Maximum number of new builds to add to to_start in this call.
needs_jobserver_token: True if a jobserver token is required for the first new build.
jobserver: Jobserver for acquiring tokens.
explicit: Set of dag hashes to mark explicit in the DB if found already installed.
Returns:
A :class:`ScheduleResult` with ``blocked``, ``to_start``, and ``newly_installed``
fields; see :class:`ScheduleResult` for field semantics.
"""
to_start: List[Tuple[str, spack.util.lock.Lock]] = []
newly_installed: List[Tuple[str, spack.spec.Spec, spack.util.lock.Lock]] = []
to_mark_explicit: List[MarkExplicitAction] = []
blocked = True
# Acquire the DB read lock non-blocking; hold it throughout the loop so the in-memory snapshot
# stays consistent while we acquire per-spec prefix locks.
if not db.lock.try_acquire_read():
return ScheduleResult(blocked, to_start, newly_installed, to_mark_explicit)
try:
db._read() # refresh in-memory snapshot under the read lock
idx = 0
while capacity and idx < len(pending):
dag_hash = pending[idx]
spec = build_graph.nodes[dag_hash]
lock = prefix_locker.lock(spec)
if lock.try_acquire_write():
blocked = False
have_write = True
elif lock.try_acquire_read():
have_write = False
else:
idx += 1
continue
# Check installed status under the DB read lock and prefix lock.
upstream, record = db.query_by_spec_hash(dag_hash)
# If the spec is already installed, treat it as done regardless of lock type.
# A spec in the overwrite set is also treated as done if another process installed it
# after our overwrite request was created (installation_time >= overwrite_time).
if (
record
and record.installed
and (dag_hash not in overwrite or record.installation_time >= overwrite_time)
):
if have_write:
lock.downgrade_write_to_read()
# keep the read lock (either downgraded or already a read lock)
del pending[idx]
newly_installed.append((dag_hash, spec, lock))
# It's already installed, but needs to be marked as explicitly installed in the DB.
if dag_hash in explicit and not record.explicit:
to_mark_explicit.append(MarkExplicitAction(spec))
build_graph.enqueue_parents(dag_hash, pending)
continue
if not have_write:
# If have to install but only got a read lock, try it in next iteration of the
# event loop.
lock.release_read()
idx += 1
continue
# Write lock acquired: proceed with scheduling.
# Don't schedule builds for specs from upstream databases.
if upstream and record and not record.installed:
lock.release_write()
raise spack.error.InstallError(
f"Cannot install {spec}: it is uninstalled in an upstream database."
)
# Defensively assert prefix invariants
if not spec.external:
if (
dag_hash in overwrite
and record
and record.installed
and record.path != spec.prefix
):
# Cannot do an overwrite install to a different prefix.
lock.release_write()
raise spack.error.InstallError(
f"Prefix mismatch in overwrite of {spec}: expected {record.path}, "
f"got {spec.prefix}"
)
elif dag_hash not in overwrite and spec.prefix in db._installed_prefixes:
# Prevent install prefix collision with other specs.
lock.release_write()
raise spack.error.InstallError(
f"Cannot install {spec}: prefix {spec.prefix} already exists"
)
# Acquire a jobserver token if needed. The first (implicit) job needs no token.
if needs_jobserver_token and not jobserver.acquire(1):
lock.release_write()
break # no tokens available right now; stop scheduling
del pending[idx]
to_start.append((dag_hash, lock))
capacity -= 1
needs_jobserver_token = True # all subsequent jobs need a token
finally:
db.lock.release_read()
return ScheduleResult(blocked, to_start, newly_installed, to_mark_explicit)
def _node_to_roots(roots: List[spack.spec.Spec]) -> Dict[str, FrozenSet[str]]:
"""Map each node in a graph to the set of root node DAG hashes that can reach it.
Args:
roots: List of root specs.
Returns:
A dictionary mapping each node's dag_hash to a frozenset of root dag_hashes.
"""
node_to_roots: Dict[str, FrozenSet[str]] = {
s.dag_hash(): frozenset([s.dag_hash()]) for s in roots
}
for edge in spack.traverse.traverse_edges(
roots, order="topo", cover="edges", root=False, key=spack.traverse.by_dag_hash
):
parent_roots = node_to_roots[edge.parent.dag_hash()]
child_hash = edge.spec.dag_hash()
existing = node_to_roots.get(child_hash)
if existing is None:
node_to_roots[child_hash] = parent_roots # keep a reference if no mutation is needed
elif not parent_roots.issubset(existing):
node_to_roots[child_hash] = existing | parent_roots
return node_to_roots
[docs]
class ReportData:
"""Data collected for reports during installation."""
def __init__(self, roots: List[spack.spec.Spec]):
self.roots = roots
self.build_records: Dict[str, spack.report.InstallRecord] = {}
[docs]
def start_record(self, spec: spack.spec.Spec) -> None:
"""Begin an InstallRecord for a spec that is about to be built."""
if spec.external:
return
record = spack.report.InstallRecord(spec)
record.start()
self.build_records[spec.dag_hash()] = record
[docs]
def finish_record(
self, spec: spack.spec.Spec, exitcode: int, log_path: Optional[str] = None
) -> None:
"""Mark the InstallRecord for a spec as succeeded or failed."""
record = self.build_records.get(spec.dag_hash())
if record is None or spec.external:
return
if exitcode == ExitCode.SUCCESS:
record.succeed(log_path)
else:
record.fail(
spack.error.InstallError(
f"Installation of {spec.name} failed; see log for details"
),
log_path,
)
[docs]
def finalize(
self, reports: Dict[str, spack.report.RequestRecord], build_graph: BuildGraph
) -> None:
"""Finalize InstallRecords and append them to RequestRecords after all builds finish.
Args:
reports: Map of root dag_hash to RequestRecord to append to.
build_graph: The build graph containing all nodes and their states.
"""
node_to_roots = _node_to_roots(self.roots)
for spec in spack.traverse.traverse_nodes(self.roots):
h = spec.dag_hash()
if h in self.build_records:
record = self.build_records[h]
else:
record = spack.report.InstallRecord(spec)
if spec.external:
msg = "Spec is external"
elif h in build_graph.pruned:
msg = "Spec was not scheduled for installation"
elif h in build_graph.nodes:
msg = "Dependencies failed to install"
else:
# If not installed or failed (build_records), not statically pruned ahead of
# time (build_graph.pruned), and also not scheduled (build_graph.nodes), it
# means it was in pending_builds or running_builds but never started/finished.
# This branch is followed on KeyboardInterrupt and --fail-fast.
msg = "Installation was interrupted"
record.skip(msg=msg)
for root_hash in node_to_roots[h]:
reports[root_hash].append_record(record)
[docs]
class NullReportData(ReportData):
"""No-op drop-in for ReportData when no reporter is configured.
Avoids creating InstallRecords and reading log files on every completed build."""
def __init__(self) -> None:
pass
[docs]
def start_record(self, spec: spack.spec.Spec) -> None:
pass
[docs]
def finish_record(
self, spec: spack.spec.Spec, exitcode: int, log_path: Optional[str] = None
) -> None:
pass
[docs]
def finalize(
self, reports: Dict[str, spack.report.RequestRecord], build_graph: "BuildGraph"
) -> None:
pass
[docs]
class TerminalState:
"""Manages terminal settings, stdin selector registration, and suspend/resume signals.
Installs a SIGTSTP handler that restores the terminal before suspending and re-applies it
on resume. After waking up it checks whether the process is in the foreground or background
and enables or suppresses interactive output accordingly.
Optional ``on_suspend`` / ``on_resume`` hooks are called just before the process suspends
and just after it wakes, allowing callers to pause and resume child processes."""
def __init__(
self,
selector: selectors.BaseSelector,
build_status: BuildStatus,
on_suspend: Optional[Callable[[], None]] = None,
on_resume: Optional[Callable[[], None]] = None,
) -> None:
self.selector = selector
self.build_status = build_status
self.on_suspend = on_suspend
self.on_resume = on_resume
self.old_stdin_settings = termios.tcgetattr(sys.stdin)
self.sigwinch_r = -1
self.sigwinch_w = -1
[docs]
def setup(self) -> None:
"""Set cbreak mode, register stdin and signal pipes in the selector."""
# SIGWINCH self-pipe (stdout must be a tty too)
if sys.stdout.isatty():
self.sigwinch_r, self.sigwinch_w = os.pipe()
os.set_blocking(self.sigwinch_r, False)
os.set_blocking(self.sigwinch_w, False)
self.selector.register(self.sigwinch_r, selectors.EVENT_READ, "sigwinch")
self.old_sigwinch = signal.signal(signal.SIGWINCH, self._handle_sigwinch)
else:
self.old_sigwinch = None
self.old_sigtstp = signal.signal(signal.SIGTSTP, self._handle_sigtstp)
# Start correctly depending on whether we're foregrounded or backgrounded
self.build_status.headless = True
if not _is_background_tty(sys.stdin):
self.enter_foreground()
[docs]
def teardown(self) -> None:
"""Restore terminal settings and signal handlers, close pipes."""
with ignore_signal(signal.SIGTTOU):
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, self.old_stdin_settings)
for sig, old in ((signal.SIGTSTP, self.old_sigtstp), (signal.SIGWINCH, self.old_sigwinch)):
if old is not None:
try:
signal.signal(sig, old)
except Exception as e:
spack.llnl.util.tty.debug(f"Failed to restore signal handler for {sig}: {e}")
if sys.stdin.fileno() in self.selector.get_map():
self.selector.unregister(sys.stdin.fileno())
for fd in (self.sigwinch_r, self.sigwinch_w):
if fd < 0:
continue
if fd in self.selector.get_map():
self.selector.unregister(fd)
try:
os.close(fd)
except Exception as e:
spack.llnl.util.tty.debug(f"Failed to close sigwinch pipe {fd}: {e}")
def _handle_sigtstp(self, signum: int, frame: object) -> None:
"""Restore terminal before suspending, then re-install handler after resume."""
# Reset so the first redraw after resume doesn't overwrite the shell's
# prompt / "$ fg" line.
self.build_status.active_area_rows = 0
# Restore terminal so the user's shell works normally while we're stopped.
with ignore_signal(signal.SIGTTOU):
termios.tcsetattr(sys.stdin, termios.TCSANOW, self.old_stdin_settings)
# Force headless mode before suspending so that enter_foreground() doesn't
# exit early when we resume, ensuring terminal settings are re-applied.
self.build_status.headless = True
# Actually suspend: reset to default handler then re-send SIGTSTP.
if self.on_suspend is not None:
self.on_suspend()
signal.signal(signal.SIGTSTP, signal.SIG_DFL)
os.kill(os.getpid(), signal.SIGTSTP)
# Execution resumes here after SIGCONT. Re-install our handler.
signal.signal(signal.SIGTSTP, self._handle_sigtstp)
if self.on_resume is not None:
self.on_resume()
self.handle_continue()
def _handle_sigwinch(self, signum: int, frame: object) -> None:
try:
os.write(self.sigwinch_w, b"\x00")
except OSError:
pass
[docs]
def enter_foreground(self) -> None:
"""Restore interactive terminal mode."""
if not self.build_status.headless:
return
# We save old settings right before applying cbreak.
# If we started in the background, bash may have had the terminal in its own
# readline (raw) mode when __init__ ran. Waiting until we are foregrounded
# ensures we capture the shell's exported 'sane' configuration for this job.
self.old_stdin_settings = termios.tcgetattr(sys.stdin)
with ignore_signal(signal.SIGTTOU):
tty.setcbreak(sys.stdin.fileno())
if sys.stdin.fileno() not in self.selector.get_map():
self.selector.register(sys.stdin.fileno(), selectors.EVENT_READ, "stdin")
self.build_status.headless = False
self.build_status.dirty = True
[docs]
def enter_background(self) -> None:
"""Suppress output and stop reading stdin to avoid SIGTTIN/SIGTTOU."""
if sys.stdin.fileno() in self.selector.get_map():
self.selector.unregister(sys.stdin.fileno())
self.build_status.headless = True
[docs]
def handle_continue(self) -> None:
"""Detect whether the process is in the foreground or background and adjust accordingly."""
if _is_background_tty(sys.stdin):
self.enter_background()
else:
self.enter_foreground()
def _signal_children(running_builds: Dict[int, ChildInfo], sig: signal.Signals) -> None:
"""Send a signal to the process group of each running build."""
for child in running_builds.values():
try:
pid = child.proc.pid
if pid is not None:
os.killpg(pid, sig)
except OSError:
pass
[docs]
class StdinReader:
"""Helper class to do non-blocking, incremental decoding of stdin, stripping ANSI escape
sequences. The input is the backing file descriptor for stdin (instead of the TextIOWrapper) to
avoid double buffering issues: the event loop triggers when the fd is ready to read, and if we
do a partial read from the TextIOWrapper, it will likely drain the fd and buffer the remainder
internally, which the event loop is not aware of, and user input doesn't come through."""
def __init__(self, fd: int) -> None:
self.fd = fd
#: Handle multi-byte UTF-8 characters
self.decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")
#: For stripping out arrow and navigation keys
self.ansi_escape_re = re.compile(r"\x1b\[[0-9;]*[A-Za-z~]")
[docs]
def read(self) -> str:
try:
chars = self.decoder.decode(os.read(self.fd, 1024))
return self.ansi_escape_re.sub("", chars)
except OSError:
return ""
[docs]
class PackageInstaller:
explicit: Set[str]
def __init__(
self,
packages: List["spack.package_base.PackageBase"],
*,
dirty: bool = False,
explicit: Union[Set[str], bool] = False,
overwrite: Optional[Union[List[str], Set[str]]] = None,
fail_fast: bool = False,
fake: bool = False,
include_build_deps: bool = False,
install_deps: bool = True,
install_package: bool = True,
install_source: bool = False,
keep_prefix: bool = False,
keep_stage: bool = False,
restage: bool = True,
skip_patch: bool = False,
stop_at: Optional[str] = None,
stop_before: Optional[str] = None,
tests: Union[bool, List[str], Set[str]] = False,
unsigned: Optional[bool] = None,
verbose: bool = False,
concurrent_packages: Optional[int] = None,
root_policy: InstallPolicy = "auto",
dependencies_policy: InstallPolicy = "auto",
create_reports: bool = False,
) -> None:
assert install_package or install_deps, "Must install package, dependencies or both"
self.install_source = install_source
self.stop_at = stop_at
self.stop_before = stop_before
self.tests: Union[bool, List[str], Set[str]] = tests
self.db = spack.store.STORE.db
specs = [pkg.spec for pkg in packages]
# No point trying cache when there are no binary mirrors configured.
if not spack.mirrors.mirror.MirrorCollection(binary=True):
if root_policy == "auto":
root_policy = "source_only"
if dependencies_policy == "auto":
dependencies_policy = "source_only"
self.root_policy: InstallPolicy = root_policy
self.dependencies_policy: InstallPolicy = dependencies_policy
self.include_build_deps = include_build_deps
#: Set of DAG hashes to overwrite (if already installed)
self.overwrite: Set[str] = set(overwrite) if overwrite else set()
#: Time at which the overwrite install was requested; used to detect concurrent overwrites.
self.overwrite_time: float = time.time()
self.keep_prefix = keep_prefix
self.fail_fast = fail_fast
# Buffer for incoming, partially received state data from child processes
self.state_buffers: Dict[int, str] = {}
if explicit is True:
self.explicit = {spec.dag_hash() for spec in specs}
elif explicit is False:
self.explicit = set()
else:
self.explicit = explicit
# Build the dependency graph
self.build_graph = BuildGraph(
specs,
root_policy,
dependencies_policy,
include_build_deps,
install_package,
install_deps,
self.db,
self.overwrite,
tests,
self.explicit,
)
#: check what specs we could fetch from binaries (checks against cache, not remotely)
try:
spack.binary_distribution.BINARY_INDEX.update()
except spack.binary_distribution.FetchCacheError:
pass
self.binary_cache_for_spec = {
s.dag_hash(): spack.binary_distribution.BINARY_INDEX.find_by_hash(s.dag_hash())
for s in self.build_graph.nodes.values()
}
self.unsigned = unsigned
self.dirty = dirty
self.fake = fake
self.restage = restage
self.keep_stage = keep_stage
self.skip_patch = skip_patch
#: queue of packages ready to install (no children)
self.pending_builds = [
parent for parent, children in self.build_graph.parent_to_child.items() if not children
]
#: specs awaiting build-dep expansion (deferred until DB read lock is available)
self.pending_expansions: List[str] = []
self.verbose = verbose
self.running_builds: Dict[int, ChildInfo] = {}
self.log_paths: Dict[str, str] = {}
self.build_status = BuildStatus(
len(self.build_graph.nodes),
verbose=verbose,
filter_padding=spack.store.STORE.has_padding(),
)
self.jobs = spack.config.determine_number_of_jobs(parallel=True)
self.build_status.actual_jobs = self.jobs
self.build_status.target_jobs = self.jobs
if concurrent_packages is None:
concurrent_packages_config = spack.config.get("config:concurrent_packages", 0)
# The value 0 in config means no limit (other than self.jobs)
if concurrent_packages_config == 0:
self.capacity = sys.maxsize
else:
self.capacity = concurrent_packages_config
else:
self.capacity = concurrent_packages
# The reports property is what the old installer has and used as public interface.
if create_reports:
self.reports = {spec.dag_hash(): spack.report.RequestRecord(spec) for spec in specs}
self.report_data = ReportData(specs)
else:
self.reports = {}
self.report_data = NullReportData()
self.next_database_write = 0.0
[docs]
def install(self) -> None:
self._installer()
def _installer(self) -> None:
spack.store.STORE.install_sbang()
jobserver = JobServer(self.jobs)
selector = selectors.DefaultSelector()
# Set up terminal handling (cbreak, signals, stdin registration)
terminal: Optional[TerminalState] = None
stdin_reader: Optional[StdinReader] = None
if sys.stdin.isatty():
stdin_reader = StdinReader(sys.stdin.fileno())
terminal = TerminalState(
selector,
self.build_status,
on_suspend=lambda: _signal_children(self.running_builds, signal.SIGSTOP),
on_resume=lambda: _signal_children(self.running_builds, signal.SIGCONT),
)
terminal.setup()
# Finished builds that have not yet been written to the database.
database_actions: List[DatabaseAction] = []
# Prefix read locks retained after DB flush (downgraded from write locks in _save_to_db).
retained_read_locks: List[spack.util.lock.Lock] = []
failures: List[spack.spec.Spec] = []
finished_pids: List[int] = []
try:
# Try to schedule builds immediately. The first job does not require a token.
if self.pending_builds:
blocked = self._schedule_builds(
selector, jobserver, retained_read_locks, database_actions
)
self.build_status.set_blocked(blocked and not self.running_builds)
while (
self.pending_builds
or self.running_builds
or database_actions
or self.pending_expansions
):
# Monitor the jobserver when we have pending builds, capacity, and at least one
# spec is not locked by another process. Also listen if the target parallelism is
# reduced.
wake_on_jobserver = (
self.pending_builds
and self.capacity
and not blocked
or not jobserver.has_target_parallelism()
)
if wake_on_jobserver and jobserver.r not in selector.get_map():
selector.register(jobserver.r, selectors.EVENT_READ, "jobserver")
elif not wake_on_jobserver and jobserver.r in selector.get_map():
selector.unregister(jobserver.r)
stdin_ready = False
if self.build_status.headless:
# no UI to update, but check background to foreground transition periodically
timeout = HEADLESS_WAKE_INTERVAL
elif self.build_status.is_tty:
timeout = SPINNER_INTERVAL
else:
# when not in interactive mode, wake least often (no spinner/terminal updates)
timeout = DATABASE_WRITE_INTERVAL
events = selector.select(timeout=timeout)
finished_pids.clear()
# The transition "suspended to foreground/background" is handled in the signal
# handler, but there's no SIGCONT event in the transition of background to
# foreground, so we conditionally poll for that here (headless case). In the
# headless case the event loop only fires once per second, so this is cheap enough.
if terminal and self.build_status.headless and not _is_background_tty(sys.stdin):
terminal.enter_foreground()
for key, _ in events:
data = key.data
if isinstance(data, FdInfo):
# Child output (logs and state updates)
child_info = self.running_builds[data.pid]
if data.name == "output":
self._handle_child_logs(key.fd, child_info, selector)
elif data.name == "state":
self._handle_child_state(key.fd, child_info, selector)
elif data.name == "sentinel":
finished_pids.append(data.pid)
elif data == "stdin":
stdin_ready = True
elif data == "sigwinch":
assert terminal is not None
os.read(terminal.sigwinch_r, 64) # drain the pipe
self.build_status.on_resize()
elif data == "jobserver" and not jobserver.has_target_parallelism():
jobserver.maybe_discard_tokens()
self.build_status.set_jobs(jobserver.num_jobs, jobserver.target_jobs)
current_time = time.monotonic()
for pid in finished_pids:
self._handle_finished_build(
pid, current_time, jobserver, selector, failures, database_actions
)
if failures and self.fail_fast:
# Terminate other builds to actually fail fast. We continue in the event loop
# waiting for child processes to finish, which may take a little while.
for child in self.running_builds.values():
child.proc.terminate()
self.pending_builds.clear()
if stdin_ready and stdin_reader is not None:
for char in stdin_reader.read():
overview = self.build_status.overview_mode
if overview and self.build_status.search_mode:
self.build_status.search_input(char)
elif overview and char == "/":
self.build_status.enter_search()
elif char == "v" or char in ("q", "\x1b") and not overview:
self.build_status.toggle()
elif char == "n":
self.build_status.next(1)
elif char == "p" or char == "N":
self.build_status.next(-1)
elif char == "+":
jobserver.increase_parallelism()
self.build_status.set_jobs(jobserver.num_jobs, jobserver.target_jobs)
elif char == "-":
jobserver.decrease_parallelism()
self.build_status.set_jobs(jobserver.num_jobs, jobserver.target_jobs)
# Insert into the database if we have any finished builds, and either the delay
# interval has passed, or we're done with all builds. The database save is not
# guaranteed; it fails if another process holds the lock. We'll try again next
# iteration of the event loop in that case.
if (
database_actions
and (
current_time >= self.next_database_write
or not (self.pending_builds or self.running_builds)
)
and self._save_to_db(database_actions, retained_read_locks)
):
database_actions.clear()
# Try to expand build deps for cache-miss specs. This requires a read lock on the
# database, meaning that it can take several iterations of the event loop in case
# of contention with other processes.
if self.pending_expansions:
self._try_expand_build_deps()
# Try to schedule more builds, acquiring per-spec locks and jobserver tokens.
if self.capacity and self.pending_builds:
blocked = self._schedule_builds(
selector, jobserver, retained_read_locks, database_actions
)
self.build_status.set_blocked(blocked and not self.running_builds)
# Finally update the UI
self.build_status.update()
finally:
# First ensure that the user's terminal state is restored.
if terminal is not None:
terminal.teardown()
# Flush any not-yet-written successful builds to the DB; save the exception on error
# to be re-raised after best-effort cleanup.
db_exc = None
if database_actions:
try:
with self.db.write_transaction():
for action in database_actions:
action.save_to_db(self.db)
except Exception as e:
db_exc = e
# Send SIGTERM to running builds; this is a no-op in the successful case.
for child in self.running_builds.values():
try:
child.proc.terminate()
except Exception:
pass
# Release our jobserver token for each terminated build and then join.
for child in self.running_builds.values():
try:
jobserver.release()
child.proc.join(timeout=30)
if child.proc.is_alive():
child.proc.kill()
child.proc.join()
except Exception:
pass
# Release all held locks best-effort, so that one failure does not prevent the others
# from being released.
for child in self.running_builds.values():
child.release_prefix_lock()
for lock in retained_read_locks:
try:
lock.release_read()
except Exception:
pass
for action in database_actions:
action.release_prefix_lock()
try:
self.build_status.overview_mode = True
self.build_status.update(finalize=True)
selector.close()
jobserver.close()
except Exception:
pass
# Re-raise the DB exception if any.
if db_exc is not None:
raise db_exc
try:
self.report_data.finalize(self.reports, build_graph=self.build_graph)
except Exception as e:
spack.llnl.util.tty.debug(f"[{__name__}]: Failed to finalize reports: {e}]")
# Clean up temp log files of successful builds now that reports have consumed them.
if not self.keep_stage:
failed_hashes = {s.dag_hash() for s in failures}
for dag_hash, log_path in self.log_paths.items():
if log_path == os.devnull or dag_hash in failed_hashes:
continue
try:
os.unlink(log_path)
except OSError:
pass
if failures:
for s in failures:
build_info = self.build_status.builds[s.dag_hash()]
if build_info and build_info.log_summary:
sys.stderr.write(build_info.log_summary)
lines = [f"{s}: {self.log_paths[s.dag_hash()]}" for s in failures]
raise spack.error.InstallError(
"The following packages failed to install:\n" + "\n".join(lines)
)
def _handle_finished_build(
self,
pid: int,
current_time: float,
jobserver: JobServer,
selector: selectors.BaseSelector,
failures: List[spack.spec.Spec],
database_actions: List[DatabaseAction],
) -> None:
"""Handle a build that has finished. Remove from running_builds; release jobserver token;
update UI state; defer database insertion if successful; possibly reschedule if failed with
cache miss; register failures."""
build = self.running_builds.pop(pid)
dag_hash = build.spec.dag_hash()
self.capacity += 1
jobserver.release()
self.build_status.set_jobs(jobserver.num_jobs, jobserver.target_jobs)
self._drain_child_output(build, selector)
self._drain_child_state(build, selector)
exitcode = build.close(selector)
self.report_data.finish_record(build.spec, exitcode, build.log_path)
if exitcode == ExitCode.SUCCESS:
# Schedule successful builds for batched database insertion. We don't release the
# prefix lock here; that strictly happens after a successful db write.
database_actions.append(build)
self.build_graph.enqueue_parents(dag_hash, self.pending_builds)
self.next_database_write = current_time + DATABASE_WRITE_INTERVAL
self.build_status.update_state(dag_hash, "finished")
return
# When we don't have to do a db write, we can release the lock immediately.
build.release_prefix_lock()
is_root = dag_hash in self.build_graph.roots
user_policy = self.root_policy if is_root else self.dependencies_policy
if exitcode == ExitCode.STOPPED_AT_PHASE:
return # the user requested early stopping; don't treat as failure
elif exitcode == ExitCode.BUILD_CACHE_MISS and user_policy == "auto":
# Check if we can reschedule this as a source build after a build cache miss. If so,
# return early without recording a failure.
self.build_graph.force_source.add(dag_hash)
self.build_status.remove_build(dag_hash)
if self.build_graph.has_unexpanded_build_deps(dag_hash):
self.pending_expansions.append(dag_hash)
else:
self.pending_builds.append(dag_hash)
elif not failures or not self.fail_fast:
# Record a failure. In fail-fast mode, only record the first failure; subsequent
# failures may be a consequence of us terminating other builds.
failures.append(build.spec)
self.build_status.update_state(dag_hash, "failed")
self.build_status.parse_log_summary(dag_hash)
def _try_expand_build_deps(self) -> None:
"""Try to expand build deps for specs with cache misses. Non-blocking: returns immediately
if the DB read lock is unavailable."""
if not self.db.lock.try_acquire_read():
return
try:
self.db._read()
newly_added = self.build_graph.expand_build_deps(
self.pending_expansions, self.pending_builds, self.db, self.dependencies_policy
)
for h in newly_added:
self.binary_cache_for_spec[h] = (
spack.binary_distribution.BINARY_INDEX.find_by_hash(h)
)
self.build_status.total += len(newly_added)
self.pending_expansions.clear()
finally:
self.db.lock.release_read()
def _save_to_db(
self,
database_actions: List[DatabaseAction],
retained_read_locks: List[spack.util.lock.Lock],
) -> bool:
if not self.db.lock.try_acquire_write():
return False
try:
self.db._read()
for action in database_actions:
action.save_to_db(self.db)
finally:
self.db.lock.release_write(self.db._write)
# DB has been written and flushed; downgrade per-spec prefix write locks to read locks so
# other processes can see the specs are installed, while preventing concurrent uninstalls.
for action in database_actions:
if action.prefix_lock is not None:
try:
action.prefix_lock.downgrade_write_to_read()
retained_read_locks.append(action.prefix_lock)
except Exception:
action.prefix_lock.release_write()
raise
finally:
action.prefix_lock = None
return True
def _schedule_builds(
self,
selector: selectors.BaseSelector,
jobserver: JobServer,
retained_read_locks: List[spack.util.lock.Lock],
database_actions: List[DatabaseAction],
) -> bool:
"""Try to schedule as many pending builds as possible.
Delegates to the module-level schedule_builds() function and then performs the
side-effects that require the selector and running-build state: updating build_status for
specs that were found already installed, and launching new builds via _start().
Preconditions: self.capacity > 0 and self.pending_builds is not empty.
Returns True if we had capacity to schedule, but were blocked by locks held by other
processes. In that case we should not monitor the jobserver for new tokens, since we'd end
up in a busy wait loop until the locks are released.
"""
result = schedule_builds(
pending=self.pending_builds,
build_graph=self.build_graph,
db=self.db,
prefix_locker=spack.store.STORE.prefix_locker,
overwrite=self.overwrite,
overwrite_time=self.overwrite_time,
capacity=self.capacity,
needs_jobserver_token=bool(self.running_builds),
jobserver=jobserver,
explicit=self.explicit,
)
blocked = result.blocked
database_actions.extend(result.to_mark_explicit)
# Specs installed by another process.
for dag_hash, spec, lock in result.newly_installed:
retained_read_locks.append(lock)
explicit = dag_hash in self.explicit
self.build_status.add_build(spec, explicit=explicit)
self.build_status.update_state(dag_hash, "finished")
# Specs we can start building ourselves.
for dag_hash, lock in result.to_start:
self._start(selector, jobserver, dag_hash, lock)
return blocked
def _install_policy(self, dag_hash: str, is_root: bool) -> InstallPolicy:
if dag_hash in self.build_graph.force_source:
return "source_only"
policy = self.root_policy if is_root else self.dependencies_policy
if policy == "auto" and not self.include_build_deps:
return "cache_only"
return policy
def _start(
self,
selector: selectors.BaseSelector,
jobserver: JobServer,
dag_hash: str,
prefix_lock: spack.util.lock.Lock,
) -> None:
self.capacity -= 1
explicit = dag_hash in self.explicit
spec = self.build_graph.nodes[dag_hash]
is_develop = spec.is_develop
tests = self.tests
run_tests = tests is True or bool(tests and spec.name in tests)
is_root = dag_hash in self.build_graph.roots
# Both possible sub-processes (cache install, source build) append to the same log file.
if dag_hash not in self.log_paths:
if spec.external:
self.log_paths[dag_hash] = os.devnull
else:
log_fd, log_path = tempfile.mkstemp(
prefix=f"spack-stage-{spec.name}-{spec.version}-{spec.dag_hash()}-",
suffix=".log",
dir=spack.stage.get_stage_root(),
)
os.close(log_fd)
self.log_paths[dag_hash] = log_path
child_info = start_build(
spec,
explicit=explicit,
mirrors=self.binary_cache_for_spec[dag_hash],
unsigned=self.unsigned,
install_policy=self._install_policy(dag_hash, is_root),
dirty=self.dirty,
# keep_stage/restage logic taken from installer.py
keep_stage=self.keep_stage or is_develop,
restage=self.restage and not is_develop,
keep_prefix=self.keep_prefix,
skip_patch=self.skip_patch,
fake=self.fake,
install_source=self.install_source,
run_tests=run_tests,
jobserver=jobserver,
log_path=self.log_paths[dag_hash],
stop_before=self.stop_before if is_root else None,
stop_at=self.stop_at if is_root else None,
)
child_info.prefix_lock = prefix_lock
pid = child_info.proc.pid
assert type(pid) is int
self.running_builds[pid] = child_info
selector.register(
child_info.output_r_conn.fileno(), selectors.EVENT_READ, FdInfo(pid, "output")
)
selector.register(
child_info.state_r_conn.fileno(), selectors.EVENT_READ, FdInfo(pid, "state")
)
selector.register(child_info.proc.sentinel, selectors.EVENT_READ, FdInfo(pid, "sentinel"))
self.build_status.add_build(
child_info.spec,
explicit=explicit,
control_w_conn=child_info.control_w_conn,
log_path=child_info.log_path,
)
self.report_data.start_record(spec)
def _handle_child_logs(
self, r_fd: int, child_info: ChildInfo, selector: selectors.BaseSelector
) -> None:
"""Handle reading output logs from a child process pipe."""
try:
# There might be more data than OUTPUT_BUFFER_SIZE, but we will read that in the next
# iteration of the event loop to keep things responsive.
data = os.read(r_fd, OUTPUT_BUFFER_SIZE)
except BlockingIOError:
return
except OSError:
data = None
if not data: # EOF or error
try:
selector.unregister(r_fd)
except KeyError:
pass
return
self.build_status.print_logs(child_info.spec.dag_hash(), data)
def _drain_child_output(self, child_info: ChildInfo, selector: selectors.BaseSelector) -> None:
"""Read and print any remaining output from a finished child's pipe."""
r_fd = child_info.output_r_conn.fileno()
while r_fd in selector.get_map():
self._handle_child_logs(r_fd, child_info, selector)
def _drain_child_state(self, child_info: ChildInfo, selector: selectors.BaseSelector) -> None:
"""Read and process any remaining state messages from a finished child's pipe."""
r_fd = child_info.state_r_conn.fileno()
while r_fd in selector.get_map():
self._handle_child_state(r_fd, child_info, selector)
def _handle_child_state(
self, r_fd: int, child_info: ChildInfo, selector: selectors.BaseSelector
) -> None:
"""Handle reading state updates from a child process pipe."""
try:
# There might be more data than OUTPUT_BUFFER_SIZE, but we will read that in the next
# iteration of the event loop to keep things responsive.
data = os.read(r_fd, OUTPUT_BUFFER_SIZE)
except BlockingIOError:
return
except OSError:
data = None
if not data: # EOF or error
try:
selector.unregister(r_fd)
except KeyError:
pass
self.state_buffers.pop(r_fd, None)
return
# Append new data to the buffer for this fd and process it
buffer = self.state_buffers.get(r_fd, "") + data.decode(errors="replace")
lines = buffer.split("\n")
# The last element of split() will be a partial line or an empty string.
# We store it back in the buffer for the next read.
self.state_buffers[r_fd] = lines.pop()
for line in lines:
if not line:
continue
try:
message = json.loads(line)
except json.JSONDecodeError:
continue
if "state" in message:
self.build_status.update_state(child_info.spec.dag_hash(), message["state"])
elif "progress" in message and "total" in message:
self.build_status.update_progress(
child_info.spec.dag_hash(), message["progress"], message["total"]
)
elif "installed_from_binary_cache" in message:
child_info.spec.package.installed_from_binary_cache = True
[docs]
class BinaryCacheMiss(spack.error.SpackError):
pass