Source code for spack.environment.environment

# Copyright Spack Project Developers. See COPYRIGHT file for details.
#
# SPDX-License-Identifier: (Apache-2.0 OR MIT)
import collections
import collections.abc
import contextlib
import errno
import glob
import os
import pathlib
import re
import shutil
import stat
import warnings
from collections.abc import KeysView
from itertools import zip_longest
from typing import (
    Any,
    Callable,
    Dict,
    Iterable,
    List,
    Mapping,
    Optional,
    Sequence,
    Set,
    Tuple,
    Union,
)

import spack
import spack.config
import spack.deptypes as dt
import spack.error
import spack.filesystem_view as fsv
import spack.hash_types as ht
import spack.installer_dispatch
import spack.llnl.util.filesystem as fs
import spack.llnl.util.tty as tty
import spack.llnl.util.tty.color as clr
import spack.package_base
import spack.paths
import spack.repo
import spack.schema.env
import spack.spec
import spack.store
import spack.user_environment as uenv
import spack.util.environment
import spack.util.hash
import spack.util.lock as lk
import spack.util.path
import spack.util.spack_json as sjson
import spack.util.spack_yaml as syaml
import spack.variant as vt
from spack import traverse
from spack.enums import ConfigScopePriority
from spack.llnl.util.filesystem import copy_tree, islink, readlink, symlink
from spack.llnl.util.lang import stable_partition
from spack.llnl.util.link_tree import ConflictingSpecsError
from spack.schema.env import TOP_LEVEL_KEY
from spack.spec import Spec
from spack.spec_filter import SpecFilter
from spack.util.path import substitute_path_variables

from .list import SpecList, SpecListError, SpecListParser

#: Type alias for a pair of specs
#: NOTE(review): presumably (abstract user spec, concrete spec) -- confirm against callers
SpecPair = Tuple[Spec, Spec]

#: Group name assigned to roots whose lockfile entry carries no "group" attribute
DEFAULT_USER_SPEC_GROUP = "default"

#: environment variable used to indicate the active environment
spack_env_var = "SPACK_ENV"

#: environment variable used to indicate the active environment view
spack_env_view_var = "SPACK_ENV_VIEW"

#: currently activated environment
_active_environment: Optional["Environment"] = None

# This is used in spack.main to bypass env failures if the command is `spack config edit`
# It is used in spack.cmd.config to get the path to a failed env for `spack config edit`
#: Validation error for a currently active environment that failed to parse
_active_environment_error: Optional[spack.config.ConfigFormatError] = None

#: default path where environments are stored in the spack tree
default_env_path = os.path.join(spack.paths.var_path, "environments")


#: Name of the input yaml file for an environment
manifest_name = "spack.yaml"


#: Name of the lockfile for an environment
lockfile_name = "spack.lock"


#: Name of the directory where environments store repos, logs, views, configs
env_subdir_name = ".spack-env"


def env_root_path() -> str:
    """Return the canonicalized directory that holds managed environments.

    The ``config:environments_root`` setting is used when present; otherwise
    the module-level ``default_env_path`` is the fallback.
    """
    configured_root = spack.config.get("config:environments_root", default=default_env_path)
    return spack.util.path.canonicalize_path(configured_root)
def environment_name(path: Union[str, pathlib.Path]) -> str:
    """Return the human-readable name of the environment located at ``path``.

    Managed (or tracked) environments are reported by their bare name;
    independent environments are reported by the path that was passed in.
    """
    env_root = pathlib.Path(env_root_path()).resolve()
    candidate = pathlib.Path(path)

    # A managed environment lives at ENV_ROOT/NAME; a tracked one (``spack env track``)
    # is a symlink at ENV_ROOT/NAME pointing to the real directory. In both cases
    # ENV_ROOT/NAME resolves to the environment path. Both sides are fully resolved
    # because the root itself, or any component of ``path``, may be a symlink.
    managed_location = (env_root / candidate.name).resolve()
    if managed_location != candidate.resolve():
        return str(path)
    return candidate.name
def ensure_no_disallowed_env_config_mods(scope: spack.config.ConfigScope) -> None:
    """Reject an environment scope that sets ``config:environments_root``.

    Raises:
        SpackEnvironmentError: if the scope's ``config`` section contains
            ``environments_root``
    """
    section = scope.get_section("config")
    if not section or "environments_root" not in section["config"]:
        return

    raise SpackEnvironmentError(
        "Spack environments are prohibited from modifying 'config:environments_root' "
        "because it can make the definition of the environment ill-posed. Please "
        "remove from your environment and place it in a permanent scope such as "
        "defaults, system, site, etc."
    )
def default_manifest_yaml():
    """Return the text of the ``spack.yaml`` written into new environments.

    The ``concretizer:unify`` entry is rendered as ``true`` or ``false``
    depending on the truthiness of the current ``concretizer:unify`` setting.
    """
    unify = "true" if spack.config.get("concretizer:unify") else "false"
    template = """\
# This is a Spack Environment file.
#
# It describes a set of packages to be installed, along with
# configuration settings.
spack:
  # add package specs to the `specs` list
  specs: []
  view: true
  concretizer:
    unify: {}
"""
    return template.format(unify)
sep_re = re.escape(os.sep) #: regex for validating environment names valid_environment_name_re = rf"^\w[{sep_re}\w-]*$" #: version of the lockfile format. Must increase monotonically. CURRENT_LOCKFILE_VERSION = 7 READER_CLS = { 1: spack.spec.SpecfileV1, 2: spack.spec.SpecfileV1, 3: spack.spec.SpecfileV2, 4: spack.spec.SpecfileV3, 5: spack.spec.SpecfileV4, 6: spack.spec.SpecfileV5, 7: spack.spec.SpecfileV5, } # Magic names # The name of the standalone spec list in the manifest yaml USER_SPECS_KEY = "specs" # The name of the default view (the view loaded on env.activate) default_view_name = "default" # Default behavior to link all packages into views (vs. only root packages) default_view_link = "all" # (DEPRECATED) Use as the heading/name in the manifest is deprecated. # The key for any concrete specs included in a lockfile. lockfile_include_key = "include_concrete" # The name/heading for include paths in the manifest file. manifest_include_name = "include"
def installed_specs():
    """Query the store database for specs belonging to the active environment.

    When an environment is active, the query is restricted to the hashes
    reported by ``env.all_hashes()``. Without an active environment the query
    is issued with ``hashes=None``.
    """
    env = active_environment()
    if env:
        hashes = env.all_hashes()
    else:
        hashes = None
    return spack.store.STORE.db.query(hashes=hashes)
def valid_env_name(name):
    """Match ``name`` against ``valid_environment_name_re``.

    Returns the match object for an acceptable name, ``None`` otherwise.
    """
    pattern = valid_environment_name_re
    return re.match(pattern, name)
def validate_env_name(name):
    """Return ``name`` unchanged if it is a valid environment name.

    Raises:
        ValueError: if ``name`` is rejected by ``valid_env_name``
    """
    if valid_env_name(name):
        return name

    raise ValueError(
        f"{name}: names may only contain letters, numbers, _, and -, and may not start with -."
    )
def activate(env, use_env_repo=False):
    """Make ``env`` the active environment.

    The environment's manifest configuration scope is added to the Spack
    configuration. If that changes the install tree, upstreams or repos
    settings, the store and/or repository path are swapped and a token for
    the previous one is stored on ``env``.

    Arguments:
        env (Environment): the environment to activate
        use_env_repo (bool): use the packages exactly as they appear in the
            environment's repository
    """
    global _active_environment

    try:
        _active_environment = env

        # Fail early to avoid ending in an invalid state
        if not isinstance(env, Environment):
            raise TypeError(f"`env` should be of type {Environment.__name__}")

        # Snapshot the settings that decide whether spack.store.STORE and the repo
        # path must be reinitialized, then apply the environment scope and compare.
        old_install_tree = spack.config.get("config:install_tree")
        old_upstreams = spack.config.get("upstreams")
        old_repos = spack.config.get("repos")

        env.manifest.prepare_config_scope()

        new_install_tree = spack.config.get("config:install_tree")
        new_upstreams = spack.config.get("upstreams")
        new_repos = spack.config.get("repos")

        store_changed = old_install_tree != new_install_tree or old_upstreams != new_upstreams
        if store_changed:
            env.store_token = spack.store.reinitialize()

        if old_repos != new_repos:
            env.repo_token = spack.repo.PATH
            spack.repo.PATH.disable()
            replacement = spack.repo.RepoPath.from_config(spack.config.CONFIG)
            if use_env_repo:
                replacement.put_first(env.repo)
            spack.repo.enable_repo(replacement)

        tty.debug(f"Using environment '{env.name}'")

    except Exception:
        # Any failure leaves no environment active
        _active_environment = None
        raise
def deactivate():
    """Revert the configuration, store and repo changes made by ``activate()``.

    Does nothing when no environment is active.
    """
    global _active_environment

    env = _active_environment
    if not env:
        return

    # Restore the store if activation swapped it out
    saved_store = getattr(env, "store_token", None)
    if saved_store is not None:
        spack.store.restore(saved_store)
        del env.store_token

    # Restore the repo path if activation swapped it out
    saved_repo = getattr(env, "repo_token", None)
    if saved_repo is not None:
        spack.repo.PATH.disable()
        spack.repo.enable_repo(saved_repo)

    env.manifest.deactivate_config_scope()

    tty.debug(f"Deactivated environment '{env.name}'")

    _active_environment = None
def active_environment() -> Optional["Environment"]:
    """Return the currently active environment, or ``None`` if there is none."""
    current = _active_environment
    return current
def _root(name):
    """Join ``name`` onto the environments root without validating it.

    Internal counterpart of ``root()``.
    """
    base_dir = env_root_path()
    return os.path.join(base_dir, name)
def root(name):
    """Return the root directory of the environment called ``name``.

    Raises:
        ValueError: if ``name`` is not a valid environment name
    """
    return _root(validate_env_name(name))
def exists(name):
    """Tell whether a managed environment called ``name`` exists.

    The result is falsy for an invalid name; otherwise it reports whether a
    manifest file is present (symlinks are not followed) in the environment's
    root directory.
    """
    name_match = valid_env_name(name)
    if not name_match:
        return name_match
    manifest_path = os.path.join(_root(name), manifest_name)
    return os.path.lexists(manifest_path)
def active(name):
    """Truthy when an environment is active and its name equals ``name``."""
    current = _active_environment
    if not current:
        return current
    return name == current.name
def is_env_dir(path):
    """Tell whether ``path`` is a directory that holds an environment manifest."""
    if not os.path.isdir(path):
        return False
    return os.path.exists(os.path.join(path, manifest_name))
def as_env_dir(name_or_dir):
    """Resolve an environment name or directory to the environment directory.

    A directory containing a manifest is returned as-is. Anything else is
    treated as the name of a managed environment.

    Raises:
        ValueError: if the argument is neither an environment directory nor a valid name
        SpackEnvironmentError: if no managed environment has that name
    """
    if is_env_dir(name_or_dir):
        return name_or_dir

    validate_env_name(name_or_dir)
    if exists(name_or_dir):
        return _root(name_or_dir)

    raise SpackEnvironmentError(f"no such environment '{name_or_dir}'")
def environment_from_name_or_dir(name_or_dir):
    """Build an ``Environment`` from either an environment name or a directory."""
    env_dir = as_env_dir(name_or_dir)
    return Environment(env_dir)
def read(name):
    """Load the managed environment called ``name``.

    Raises:
        ValueError: if ``name`` is not a valid environment name
        SpackEnvironmentError: if no such environment exists
    """
    validate_env_name(name)
    if exists(name):
        return Environment(root(name))
    raise SpackEnvironmentError(f"no such environment '{name}'")
def create(
    name: str,
    init_file: Optional[Union[str, pathlib.Path]] = None,
    with_view: Optional[Union[str, pathlib.Path, bool]] = None,
    keep_relative: bool = False,
    include_concrete: Optional[List[str]] = None,
) -> "Environment":
    """Create a managed environment and return it.

    Managed environments live under the environments root of this Spack
    instance, which lets Spack keep track of them. The directory is derived
    from ``name`` and everything else is delegated to ``create_in_dir``.

    Files with suffix ``.json`` or ``.lock`` are considered lockfiles. Files
    with any other name are considered manifest files.

    Args:
        name: name of the managed environment
        init_file: either a lockfile, a manifest file, or None
        with_view: whether a view should be maintained for the environment. If the
            value is a string, it specifies the path to the view
        keep_relative: if True, develop paths are copied verbatim into the new
            environment file, otherwise they are made absolute
        include_concrete: concrete environment names/paths to be included
    """
    # exists_ok=False: refuse to reuse the name of an existing environment
    target_dir = environment_dir_from_name(name, exists_ok=False)
    options = {
        "init_file": init_file,
        "with_view": with_view,
        "keep_relative": keep_relative,
        "include_concrete": include_concrete,
    }
    return create_in_dir(target_dir, **options)
def create_in_dir(
    root: Union[str, pathlib.Path],
    init_file: Optional[Union[str, pathlib.Path]] = None,
    with_view: Optional[Union[str, pathlib.Path, bool]] = None,
    keep_relative: bool = False,
    include_concrete: Optional[List[str]] = None,
) -> "Environment":
    """Create an environment in the directory passed as input and returns it.

    Files with suffix ``.json`` or ``.lock`` are considered lockfiles. Files with any other name
    are considered manifest files.

    Args:
        root: directory where to create the environment.
        init_file: either a lockfile, a manifest file, an env directory, or None
        with_view: whether a view should be maintained for the environment. If the value is a
            string, it specifies the path to the view
        keep_relative: if True, develop paths are copied verbatim into the new environment file,
            otherwise they are made absolute
        include_concrete: concrete environment names/paths to be included

    Raises:
        spack.config.ConfigFormatError, SpackEnvironmentConfigError: if updating the manifest
            fails; the freshly created ``root`` is removed before re-raising
    """
    # If the initfile is a named environment, get its path
    if init_file and exists(str(init_file)):
        init_file = read(str(init_file)).path

    initialize_environment_dir(root, envfile=init_file)

    # Shortcut only when there is truly nothing left to do: no view to set, no
    # concrete environments to include, and no paths to rewrite. Without the
    # ``include_concrete`` check a requested include would be silently dropped.
    if with_view is None and include_concrete is None and keep_relative:
        return Environment(root)

    try:
        manifest = EnvironmentManifestFile(root)

        if with_view is not None:
            manifest.set_default_view(with_view)

        if include_concrete is not None:
            set_included_envs_to_env_paths(include_concrete)
            validate_included_envs_exists(include_concrete)
            validate_included_envs_concrete(include_concrete)
            manifest.set_include_concrete(include_concrete)

        manifest.flush()

    except (spack.config.ConfigFormatError, SpackEnvironmentConfigError):
        # Do not leave a half-initialized environment directory behind
        shutil.rmtree(root)
        raise

    env = Environment(root)

    if init_file:
        if os.path.isdir(init_file):
            # Normalize to an absolute string so it is comparable with ``env.path`` and
            # usable as a path prefix, exactly like the file branch below.
            init_file_dir = os.path.abspath(str(init_file))
            copied = True
        else:
            init_file_dir = os.path.abspath(os.path.dirname(init_file))
            copied = False

        if not keep_relative:
            if env.path != init_file_dir:
                # If we are here, we are creating an environment based on an
                # spack.yaml file in another directory, and moreover we want
                # dev paths in this environment to refer to their original
                # locations.
                # If the full env was copied including internal files, only rewrite
                # relative paths outside of env
                _rewrite_relative_dev_paths_on_relocation(env, init_file_dir, copied_env=copied)
                _rewrite_relative_repos_paths_on_relocation(env, init_file_dir, copied_env=copied)

    return env
def _relocation_path_is_inside(path, directory):
    """Tell whether ``path`` is ``directory`` itself or located underneath it.

    The comparison is aware of path separators, so ``/a/env2/x`` is not
    reported as being inside ``/a/env``.
    """
    parent = os.path.normpath(str(directory))
    return path == parent or path.startswith(parent.rstrip(os.sep) + os.sep)


def _rewrite_relative_dev_paths_on_relocation(env, init_file_dir, copied_env=False):
    """When initializing the environment from a manifest file and we plan
    to store the environment in a different directory, we have to rewrite
    relative paths to absolute ones.

    Args:
        env: environment whose ``develop`` section is rewritten
        init_file_dir: directory the manifest originally came from; relative
            paths are expanded against it
        copied_env: if True, paths located inside ``init_file_dir`` are left
            untouched because they were copied along with the environment
    """
    with env:
        dev_specs = spack.config.get("develop", default={}, scope=env.scope_name)
        if not dev_specs:
            return
        for name, entry in dev_specs.items():
            dev_path = substitute_path_variables(entry["path"])
            expanded_path = spack.util.path.canonicalize_path(dev_path, default_wd=init_file_dir)

            # Skip if the substituted and expanded path is the same (e.g. when absolute)
            if entry["path"] == expanded_path:
                continue

            # If copied and it's inside the env, we copied it and don't need to relativize
            if copied_env and _relocation_path_is_inside(expanded_path, init_file_dir):
                continue

            tty.debug("Expanding develop path for {0} to {1}".format(name, expanded_path))

            dev_specs[name]["path"] = expanded_path

        spack.config.set("develop", dev_specs, scope=env.scope_name)

        env._dev_specs = None
        # If we changed the environment's spack.yaml scope, that will not be reflected
        # in the manifest that we read
        env._re_read()


def _rewrite_relative_repos_paths_on_relocation(env, init_file_dir, copied_env=False):
    """When initializing the environment from a manifest file and we plan
    to store the environment in a different directory, we have to rewrite
    relative repo paths to absolute ones and expand environment variables.

    Args:
        env: environment whose ``repos`` section is rewritten
        init_file_dir: directory the manifest originally came from; relative
            paths are expanded against it
        copied_env: if True, paths located inside ``init_file_dir`` are left
            untouched because they were copied along with the environment
    """
    with env:
        repos_specs = spack.config.get("repos", default={}, scope=env.scope_name)
        if not repos_specs:
            return
        # Iterate over a snapshot because entries are reassigned in the loop
        for name, entry in list(repos_specs.items()):
            # only rewrite when we have a path-based repository
            if not isinstance(entry, str):
                continue

            repo_path = substitute_path_variables(entry)
            expanded_path = spack.util.path.canonicalize_path(repo_path, default_wd=init_file_dir)

            # Skip if the substituted and expanded path is the same (e.g. when absolute)
            if entry == expanded_path:
                continue

            # If copied and it's inside the env, we copied it and don't need to relativize
            if copied_env and _relocation_path_is_inside(expanded_path, init_file_dir):
                continue

            tty.debug("Expanding repo path for {0} to {1}".format(entry, expanded_path))

            repos_specs[name] = expanded_path

        spack.config.set("repos", repos_specs, scope=env.scope_name)

        # NOTE(review): the develop counterpart resets ``env._dev_specs``; confirm that
        # ``repos_specs`` is an attribute the environment actually reads.
        env.repos_specs = None
        # If we changed the environment's spack.yaml scope, that will not be reflected
        # in the manifest that we read
        env._re_read()
def environment_dir_from_name(name: str, exists_ok: bool = True) -> str:
    """Returns the directory associated with a named environment.

    The name is validated before anything touches the filesystem, so an
    invalid name never causes the environments root to be created.

    Args:
        name: name of the environment
        exists_ok: if False, raise an error if the environment exists already

    Raises:
        ValueError: if ``name`` is not a valid environment name
        SpackEnvironmentError: if exists_ok is False and the environment exists already
    """
    # ``root`` validates the name and raises ValueError for invalid ones
    env_dir = root(name)

    if not exists_ok and exists(name):
        raise SpackEnvironmentError(f"'{name}': environment already exists at {env_dir}")

    ensure_env_root_path_exists()
    return env_dir
def ensure_env_root_path_exists():
    """Create the environments root directory unless it is already a directory."""
    root_dir = env_root_path()
    if os.path.isdir(root_dir):
        return
    fs.mkdirp(root_dir)
def set_included_envs_to_env_paths(include_concrete: List[str]) -> None:
    """Replace environment names in ``include_concrete`` by environment paths, in place.

    Entries that already are environment directories are kept. Entries naming
    an existing managed environment are replaced by that environment's root.
    Anything else is left untouched.

    Args:
        include_concrete: list of env name or path to env
    """
    for position, entry in enumerate(include_concrete):
        if is_env_dir(entry):
            # Already a path to an environment
            continue
        if exists(entry):
            include_concrete[position] = root(entry)
def validate_included_envs_exists(include_concrete: List[str]) -> None:
    """Checks that all of the included environments exist

    Args:
        include_concrete: list of already existing concrete environments to include

    Raises:
        SpackEnvironmentError: if any of the included environments do not exist
    """
    # Deduplicate, then sort so the error message is stable across runs
    missing_envs = sorted({env for env in include_concrete if not is_env_dir(env)})

    if missing_envs:
        msg = "The following environment(s) are missing: {0}".format(", ".join(missing_envs))
        raise SpackEnvironmentError(msg)
def validate_included_envs_concrete(include_concrete: List[str]) -> None:
    """Checks that all of the included environments are concrete

    An environment counts as concrete when its directory contains a lockfile.

    Args:
        include_concrete: list of already existing concrete environments to include

    Raises:
        SpackEnvironmentError: if any of the included environments are not concrete
    """
    # Deduplicate, then sort so the message and the suggested commands are
    # listed in a stable order
    non_concrete_envs = sorted(
        {
            environment_name(env_path)
            for env_path in include_concrete
            if not os.path.exists(os.path.join(env_path, lockfile_name))
        }
    )

    if non_concrete_envs:
        msg = "The following environment(s) are not concrete: {0}\nPlease run:".format(
            ", ".join(non_concrete_envs)
        )
        msg += "".join(f"\n\t`spack -e {env} concretize`" for env in non_concrete_envs)

        raise SpackEnvironmentError(msg)
def all_environment_names():
    """Return the names of all managed environments found under the environments root.

    This is a read-only operation: a missing root yields an empty list rather
    than being created.
    """
    if not os.path.exists(env_root_path()):
        return []

    env_root = pathlib.Path(env_root_path()).resolve()

    def manifest_files():
        for current_dir, subdirs, filenames in os.walk(env_root, topdown=True, followlinks=True):
            # Prune in place: skip hidden directories and links back to the root itself
            subdirs[:] = [
                entry
                for entry in subdirs
                if not entry.startswith(".")
                and not env_root.samefile(os.path.join(current_dir, entry))
            ]
            if manifest_name in filenames:
                yield os.path.join(current_dir, manifest_name)

    # An environment's name is the manifest's directory relative to the root
    relative_dirs = (
        str(pathlib.Path(manifest).relative_to(env_root).parent) for manifest in manifest_files()
    )
    return [candidate for candidate in relative_dirs if valid_env_name(candidate)]
def all_environments():
    """Lazily yield an ``Environment`` for every managed environment name."""
    yield from map(read, all_environment_names())
def _read_yaml(str_or_file):
    """Read YAML from a file for round-trip parsing.

    The loaded data is validated against the environment schema.

    Raises:
        SpackEnvironmentConfigError: if the YAML cannot be loaded; the original
            ``SpackYAMLError`` is attached as the cause
    """
    try:
        data = syaml.load_config(str_or_file)
    except syaml.SpackYAMLError as e:
        raise SpackEnvironmentConfigError(
            f"Invalid environment configuration detected: {e.message}", e.filename
        ) from e

    # File-like objects expose ``name``; plain strings do not
    filename = getattr(str_or_file, "name", None)
    spack.config.validate(data, spack.schema.env.schema, filename)
    return data


def _write_yaml(data, str_or_file):
    """Write YAML to a file preserving comments and dict order.

    The data is validated against the environment schema before being dumped.
    """
    filename = getattr(str_or_file, "name", None)
    spack.config.validate(data, spack.schema.env.schema, filename)
    syaml.dump_config(data, str_or_file, default_flow_style=False)


def _is_dev_spec_and_has_changed(spec):
    """Check if the passed spec is a dev build and whether it has changed
    since the last installation"""
    # First check if this is a dev build and in the process already try to get
    # the dev_path
    if not spec.variants.get("dev_path", None):
        return False

    # Now we can check whether the code changed since the last installation
    if not spec.installed:
        # Not installed -> nothing to compare against
        return False

    # hook so packages can use to write their own method for checking the dev_path
    # use package so attributes about concretization such as variant state can be
    # utilized
    return spec.package.detect_dev_src_change()


def _error_on_nonempty_view_dir(new_root):
    """Defensively error when the target view path already exists and is not an
    empty directory. This usually happens when the view symlink was removed, but
    not the directory it points to. In those cases, it's better to just error when
    the new view dir is non-empty, since it indicates the user removed part but not
    all of the view, and it likely in an inconsistent state.

    Raises:
        SpackEnvironmentViewError: if ``new_root`` exists and is anything other
            than an empty directory
    """
    # Check if the target path lexists
    try:
        st = os.lstat(new_root)
    except OSError:
        return

    # Empty directories are fine
    if stat.S_ISDIR(st.st_mode) and len(os.listdir(new_root)) == 0:
        return

    # Anything else is an error
    raise SpackEnvironmentViewError(
        "Failed to generate environment view, because the target {} already "
        "exists or is not empty. To update the view, remove this path, and run "
        "`spack env view regenerate`".format(new_root)
    )
class ViewDescriptor:
    """Description of a single filesystem view of an environment.

    It records where the view lives (``root``), which specs it contains
    (``select`` / ``exclude`` / ``link`` / ``groups``) and how files are
    placed in it (``projections`` / ``link_type`` / ``link_dirs``).
    """

    def __init__(
        self,
        base_path: str,
        root: str,
        *,
        projections: Optional[Dict[str, str]] = None,
        select: Optional[List[str]] = None,
        exclude: Optional[List[str]] = None,
        link: str = default_view_link,
        link_type: fsv.LinkType = "symlink",
        link_dirs: bool = True,
        groups: Optional[Union[str, List[str]]] = None,
    ) -> None:
        """
        Args:
            base_path: directory against which a relative ``root`` is resolved
            root: view root as written by the user (kept verbatim in ``raw_root``)
            projections: projections handed to the filesystem view
            select: specs to include; an empty list selects everything
            exclude: specs to leave out
            link: which dependencies are linked, see ``specs_for_view``
            link_type: link type, canonicalized through ``fsv.canonicalize_link_type``
            link_dirs: only honored when ``link_type`` is ``"symlink"``
            groups: a single group name or a list of names; ``None`` means all
                concrete roots of the environment are used
        """
        self.base = base_path
        self.raw_root = root
        self.root = spack.util.path.canonicalize_path(root, default_wd=base_path)
        self.projections = projections or {}
        self.select = select or []
        self.exclude = exclude or []
        self.link_type: fsv.LinkType = fsv.canonicalize_link_type(link_type)
        # NOTE(review): this compares the raw ``link_type`` argument, not the
        # canonicalized value stored above -- confirm this is intended.
        self.link_dirs: bool = link_type == "symlink" and link_dirs
        self.link = link
        if isinstance(groups, str):
            groups = [groups]
        self.groups: Optional[List[str]] = groups

    def select_fn(self, spec: Spec) -> bool:
        """True if ``spec`` satisfies at least one ``select`` entry."""
        return any(spec.satisfies(s) for s in self.select)

    def exclude_fn(self, spec: Spec) -> bool:
        """True if ``spec`` satisfies none of the ``exclude`` entries."""
        return not any(spec.satisfies(e) for e in self.exclude)

    def update_root(self, new_path: str) -> None:
        """Point the view at ``new_path``, resolved against the base path."""
        self.raw_root = new_path
        self.root = spack.util.path.canonicalize_path(new_path, default_wd=self.base)

    def __eq__(self, other: object) -> bool:
        # NOTE(review): ``groups`` does not take part in the comparison.
        return (
            isinstance(other, ViewDescriptor)
            and self.root == other.root
            and self.projections == other.projections
            and self.select == other.select
            and self.exclude == other.exclude
            and self.link == other.link
            and self.link_type == other.link_type
            and self.link_dirs == other.link_dirs
        )

    def to_dict(self):
        """Serialize the descriptor; falsy and default-valued entries are omitted."""
        ret = syaml.syaml_dict([("root", self.raw_root)])
        if self.projections:
            ret["projections"] = self.projections
        if self.select:
            ret["select"] = self.select
        if self.exclude:
            ret["exclude"] = self.exclude
        if self.link_type:
            ret["link_type"] = self.link_type
        if self.link_dirs:
            ret["link_dirs"] = self.link_dirs
        if self.link != default_view_link:
            ret["link"] = self.link
        return ret

    @staticmethod
    def from_dict(base_path: str, d) -> "ViewDescriptor":
        """Build a descriptor from a mapping; only ``root`` is mandatory."""
        return ViewDescriptor(
            base_path,
            d["root"],
            projections=d.get("projections", {}),
            select=d.get("select", []),
            exclude=d.get("exclude", []),
            link=d.get("link", default_view_link),
            link_type=d.get("link_type", "symlink"),
            link_dirs=d.get("link_dirs", True),
            groups=d.get("group", None),
        )

    @property
    def _current_root(self) -> Optional[str]:
        """Target of the ``self.root`` symlink, or None if ``self.root`` is not a link.

        A relative link target is joined onto the directory containing ``self.root``.
        """
        if not islink(self.root):
            return None

        root = readlink(self.root)
        if os.path.isabs(root):
            return root

        root_dir = os.path.dirname(self.root)
        return os.path.join(root_dir, root)

    def _next_root(self, specs):
        """Directory in which a view holding ``specs`` is built.

        For a view at ``/dirname/basename`` this is ``/dirname/._basename/<hash>``.
        """
        content_hash = self.content_hash(specs)
        root_dir = os.path.dirname(self.root)
        root_name = os.path.basename(self.root)
        return os.path.join(root_dir, "._%s" % root_name, content_hash)

    def content_hash(self, specs):
        """Hash of the descriptor plus the DAG hash and prefix of every spec."""
        d = syaml.syaml_dict(
            [
                ("descriptor", self.to_dict()),
                ("specs", [(spec.dag_hash(), spec.prefix) for spec in sorted(specs)]),
            ]
        )
        contents = sjson.dumps(d)
        return spack.util.hash.b32_hash(contents)

    def get_projection_for_spec(self, spec):
        """Get projection for spec. This function does not require the view
        to exist on the filesystem."""
        return self._view(self.root).get_projection_for_spec(spec)

    def view(self, new: Optional[str] = None) -> fsv.SimpleFilesystemView:
        """
        Returns a view object for the *underlying* view directory. This means that the
        self.root symlink is followed, and that the view has to exist on the filesystem
        (unless ``new``). This function is useful when writing to the view.

        Raise if new is None and there is no current view

        Arguments:
            new: If a string, create a FilesystemView rooted at that path. Default None. This
                should only be used to regenerate the view, and cannot be used to access specs.
        """
        path = new if new else self._current_root
        if not path:
            # This can only be hit if we write a future bug
            raise SpackEnvironmentViewError(
                f"Attempting to get nonexistent view from environment. View root is at {self.root}"
            )
        return self._view(path)

    def _view(self, root: str) -> fsv.SimpleFilesystemView:
        """Returns a view object for a given root dir."""
        return fsv.SimpleFilesystemView(
            root,
            spack.store.STORE.layout,
            ignore_conflicts=True,
            projections=self.projections,
            link_type=self.link_type,
            link_dirs=self.link_dirs,
        )

    def __contains__(self, spec):
        """Is the spec described by the view descriptor

        Note: This does not claim the spec is already linked in the view.
        It merely checks that the spec is selected if a select operation is
        specified and is not excluded if an exclude operator is specified.
        """
        if self.select:
            if not self.select_fn(spec):
                return False

        if self.exclude:
            if not self.exclude_fn(spec):
                return False

        return True

    def specs_for_view(self, concrete_roots: List[Spec]) -> List[Spec]:
        """Flatten the DAGs of the concrete roots, keep only unique, selected, and installed specs
        in topological order from root to leaf."""
        if self.link == "all":
            deptype = dt.LINK | dt.RUN
        elif self.link == "run":
            deptype = dt.RUN
        else:
            deptype = dt.NONE

        specs = traverse.traverse_nodes(
            concrete_roots, order="topo", deptype=deptype, key=traverse.by_dag_hash
        )

        # Filter selected, installed specs
        with spack.store.STORE.db.read_transaction():
            result = [s for s in specs if s in self and s.installed]

        return self._exclude_duplicate_runtimes(result)

    def regenerate(self, env: "Environment") -> None:
        """Rebuild the view for ``env`` and swap it into place.

        The view is built from scratch in a directory named after a hash of its
        content, and ``self.root`` is a symlink that is renamed over the
        previous one once the new view is complete.

        Raises:
            SpackEnvironmentViewError: if ``self.root`` cannot be replaced, if
                the target directory is in an unexpected state, or if two
                specs project to the same prefix
        """
        if self.groups is None:
            concrete_roots = env.concrete_roots()
        else:
            concrete_roots = [c for g in self.groups for _, c in env.concretized_specs_by(group=g)]

        specs = self.specs_for_view(concrete_roots)

        # To ensure there are no conflicts with packages being installed
        # that cannot be resolved or have repos that have been removed
        # we always regenerate the view from scratch.
        # We will do this by hashing the view contents and putting the view
        # in a directory by hash, and then having a symlink to the real
        # view in the root. The real root for a view at /dirname/basename
        # will be /dirname/._basename/<hash>.
        # This allows for atomic swaps when we update the view

        # cache the roots because the way we determine which is which does
        # not work while we are updating
        new_root = self._next_root(specs)
        old_root = self._current_root

        if new_root == old_root:
            tty.debug(f"View at {self.root} does not need regeneration.")
            return

        _error_on_nonempty_view_dir(new_root)

        # construct view at new_root
        if specs:
            tty.msg(f"Updating view at {self.root}")

        view = self.view(new=new_root)

        root_dirname = os.path.dirname(self.root)
        tmp_symlink_name = os.path.join(root_dirname, "._view_link")

        # Remove self.root if is it an empty dir, since we need a symlink there. Note that rmdir
        # fails if self.root is a symlink.
        try:
            os.rmdir(self.root)
        except (FileNotFoundError, NotADirectoryError):
            pass
        except OSError as e:
            if e.errno == errno.ENOTEMPTY:
                msg = "it is a non-empty directory"
            elif e.errno == errno.EACCES:
                msg = "of insufficient permissions"
            else:
                raise
            raise SpackEnvironmentViewError(
                f"The environment view in {self.root} cannot be created because {msg}."
            ) from e

        # Create a new view
        try:
            fs.mkdirp(new_root)
            view.add_specs(*specs)

            # create symlink from tmp_symlink_name to new_root. ``lexists`` also detects
            # a dangling link left behind by an interrupted run, which would otherwise
            # make the ``symlink`` call below fail.
            if os.path.lexists(tmp_symlink_name):
                os.unlink(tmp_symlink_name)
            symlink(new_root, tmp_symlink_name)

            # mv symlink atomically over root symlink to old_root
            fs.rename(tmp_symlink_name, self.root)
        except Exception as e:
            # Clean up new view and temporary symlink on any failure.
            try:
                shutil.rmtree(new_root, ignore_errors=True)
                os.unlink(tmp_symlink_name)
            except OSError:
                pass

            # Give an informative error message for the typical error case: two specs, same package
            # project to same prefix.
            if isinstance(e, ConflictingSpecsError):
                spec_a = e.args[0].format(color=clr.get_color_when())
                spec_b = e.args[1].format(color=clr.get_color_when())
                raise SpackEnvironmentViewError(
                    f"The environment view in {self.root} could not be created, "
                    "because the following two specs project to the same prefix:\n"
                    f" {spec_a}, and\n"
                    f" {spec_b}.\n"
                    " To resolve this issue:\n"
                    " a. use `concretization:unify:true` to ensure there is only one "
                    "package per spec in the environment, or\n"
                    " b. disable views with `view:false`, or\n"
                    " c. create custom view projections."
                ) from e
            raise

        # Remove the old root when it's in the same folder as the new root. This guards
        # against removal of an arbitrary path when the original symlink in self.root
        # was not created by the environment, but by the user.
        if (
            old_root
            and os.path.exists(old_root)
            and os.path.samefile(os.path.dirname(new_root), os.path.dirname(old_root))
        ):
            try:
                shutil.rmtree(old_root)
            except OSError as e:
                msg = "Failed to remove old view at %s\n" % old_root
                msg += str(e)
                tty.warn(msg)

    def _exclude_duplicate_runtimes(self, specs: List[Spec]) -> List[Spec]:
        """Stably filter out duplicates of "runtime" tagged packages, keeping only latest."""
        # Maps packages tagged "runtime" to the spec with latest version.
        latest: Dict[str, Spec] = {}
        for s in specs:
            if "runtime" not in getattr(s.package, "tags", ()):
                continue
            elif s.name not in latest or latest[s.name].version < s.version:
                latest[s.name] = s

        return [x for x in specs if x.name not in latest or latest[x.name] is x]
def env_subdir_path(manifest_dir: Union[str, pathlib.Path]) -> str:
    """Return the subdirectory where an environment keeps repos, logs, views, configs.

    Args:
        manifest_dir: directory containing the environment manifest file

    Returns:
        directory the environment uses to manage its files
    """
    base_dir = str(manifest_dir)
    return os.path.join(base_dir, env_subdir_name)
class ConcretizedRootInfo:
    """Record describing one concretized root spec.

    Equality and hashing take all four slots into account.
    """

    __slots__ = ("root", "hash", "new", "group")

    def __init__(
        self, *, root_spec: spack.spec.Spec, root_hash: str, new: bool = False, group: str
    ):
        self.root = root_spec
        self.hash = root_hash
        self.new = new
        self.group = group

    def __str__(self):
        return f"{self.root} -> {self.hash} [new={self.new}]"

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, ConcretizedRootInfo):
            return False
        # Slots are listed in the order root, hash, new, group
        return all(getattr(self, field) == getattr(other, field) for field in self.__slots__)

    def __hash__(self) -> int:
        return hash(tuple(getattr(self, field) for field in self.__slots__))

    @staticmethod
    def from_info_dict(info_dict: Dict[str, str]) -> "ConcretizedRootInfo":
        """Build an instance from a mapping with ``spec``, ``hash`` and optionally ``group``."""
        # Lockfile versions < 7 don't have the "group" attribute
        group = info_dict.get("group", DEFAULT_USER_SPEC_GROUP)
        return ConcretizedRootInfo(
            root_spec=Spec(info_dict["spec"]), root_hash=info_dict["hash"], new=False, group=group
        )
[docs] class Environment: """A Spack environment, which bundles together configuration and a list of specs.""" def __init__(self, manifest_dir: Union[str, pathlib.Path]) -> None: """An environment can be constructed from a directory containing a "spack.yaml" file, and optionally a consistent "spack.lock" file. Args: manifest_dir: directory with the "spack.yaml" associated with the environment """ self.path = os.path.abspath(str(manifest_dir)) self.name = environment_name(self.path) self.env_subdir_path = env_subdir_path(self.path) self.txlock = lk.Lock(self._transaction_lock_path) self._unify = None self.views: Dict[str, ViewDescriptor] = {} #: Parser for spec lists self._spec_lists_parser = SpecListParser() #: Specs from "spack.yaml" self.spec_lists: Dict[str, SpecList] = {} #: Information on concretized roots self.concretized_roots: List[ConcretizedRootInfo] = [] #: Concretized specs by hash self.specs_by_hash: Dict[str, Spec] = {} #: Repository for this environment (memoized) self._repo = None #: Environment root dirs for concrete (lockfile) included environments self.included_concrete_env_root_dirs: List[str] = [] #: First-level included concretized spec data from/to the lockfile. self.included_concrete_spec_data: Dict[str, Dict[str, List[str]]] = {} #: Roots from included environments from the last concretization, keyed by env path self.included_concretized_roots: Dict[str, List[ConcretizedRootInfo]] = {} #: Concretized specs by hash from the included environments self.included_specs_by_hash: Dict[str, Dict[str, Spec]] = {} #: Previously active environment self._previous_active = None self._dev_specs = None # Load the manifest file contents into memory self._load_manifest_file() def _load_manifest_file(self): """Instantiate and load the manifest file contents into memory.""" with lk.ReadTransaction(self.txlock): self.manifest = EnvironmentManifestFile(self.path, self.name) with self.manifest.use_config(): self._read()
[docs] @contextlib.contextmanager def config_override_for_group(self, *, group: str): key = self.manifest._ensure_group_exists(group=group) internal_scope = self.manifest.config_override(group=key) if internal_scope is None: # No internal scope tty.debug( f"[{__name__}] No configuration override necessary for the '{group}' group " f"in the environment at {self.manifest_path}" ) yield return try: tty.debug( f"[{__name__}] Overriding the configuration for the '{group}' group defined " f"in {self.manifest_path} before concretization" ) spack.config.CONFIG.push_scope( internal_scope, priority=ConfigScopePriority.ENVIRONMENT_SPEC_GROUPS ) yield finally: spack.config.CONFIG.remove_scope(internal_scope.name)
def __getstate__(self):
    """Return the instance state for pickling, without lock, repo cache and tokens."""
    excluded = ("txlock", "_repo", "repo_token", "store_token")
    return {key: value for key, value in self.__dict__.items() if key not in excluded}


def __setstate__(self, state):
    """Restore pickled state; the lock is recreated and the repo cache starts empty."""
    self.__dict__.update(state)
    self._repo = None
    self.txlock = lk.Lock(self._transaction_lock_path)


def _re_read(self):
    """Reinitialize the environment object."""
    self.clear()
    self._load_manifest_file()


def _read(self):
    """Build the in-memory state from the manifest and, if present, the lockfile.

    A lockfile whose ``lockfile-version`` is 1 is copied to the v1 backup path.
    """
    self._construct_state_from_manifest()

    if not os.path.exists(self.lock_path):
        return

    with open(self.lock_path, encoding="utf-8") as lock_stream:
        lockfile_data = self._read_lockfile(lock_stream)
        read_lock_version = lockfile_data["_meta"]["lockfile-version"]

    if read_lock_version == 1:
        tty.debug(f"Storing backup of {self.lock_path} at {self._lock_backup_v1_path}")
        shutil.copy(self.lock_path, self._lock_backup_v1_path)
def write_transaction(self):
    """Get a write lock context manager for use in a ``with`` block.

    ``_re_read`` is passed as the ``acquire`` callback of the transaction.
    """
    transaction = lk.WriteTransaction(self.txlock, acquire=self._re_read)
    return transaction
def _process_view(self, env_view: Optional[Union[bool, str, Dict]]):
    """Process view option(s), which can be boolean, string, or None.

    A boolean environment view option takes precedence over any that may
    be included. So ``view: True`` results in the default view only. And
    ``view: False`` means the environment will have no view.

    Args:
        env_view: view option provided in the manifest or configuration
    """

    def add_view(name, values):
        """Add the view with the name and the string or dict values."""
        if isinstance(values, str):
            self.views[name] = ViewDescriptor(self.path, values)
        elif isinstance(values, dict):
            self.views[name] = ViewDescriptor.from_dict(self.path, values)
        else:
            tty.error(f"Cannot add view named {name} for {type(values)} values {values}")

    # 'view: False' means there are to be NO views: nothing to process.
    if env_view is False:
        return

    # 'view: True' means only the default view is created.
    if env_view is True:
        add_view(default_view_name, self.view_path_default)
        return

    # Otherwise, the configuration has a subdirectory or dictionary.
    if isinstance(env_view, str):
        add_view(default_view_name, env_view)
    elif env_view:
        for name, values in env_view.items():
            add_view(name, values)

    # If we reach this point without any view having been added then we
    # provide the default view.
    if not self.views:
        self.views[default_view_name] = ViewDescriptor(self.path, self.view_path_default)


def _load_concrete_include_data(self):
    """Load concrete include specs data from included concrete directories.

    With an existing lockfile the data is taken from the lockfile's include
    section (if it has one); without a lockfile the included environments are
    read and copied through ``include_concrete_envs``.
    """
    if not self.included_concrete_env_root_dirs:
        return

    if not os.path.exists(self.lock_path):
        self.include_concrete_envs()
        return

    with open(self.lock_path, encoding="utf-8") as f:
        data = self._read_lockfile(f)

    if lockfile_include_key in data:
        self.included_concrete_spec_data = data[lockfile_include_key]


def _process_included_lockfiles(self):
    """Extract and load into memory included lock file data."""
    top_level = self.manifest[TOP_LEVEL_KEY]

    deprecated_includes = top_level.get(lockfile_include_key, [])
    if deprecated_includes:
        tty.warn(
            f"Use of '{lockfile_include_key}' in manifest files "
            f"is deprecated. The key should be '{manifest_include_name}' "
            f"and the path should end with '{lockfile_name}'. Run "
            f"'spack env update {self.name}' to update the manifest."
        )

    # Always build a fresh list: extending the object returned by ``.get`` in
    # place would modify the manifest's own data when the deprecated key is
    # present but empty.
    includes = [os.path.join(inc, lockfile_name) for inc in deprecated_includes]
    includes.extend(top_level.get(manifest_include_name, []))
    if not includes:
        return

    # Expand config and environment variables for concrete environments,
    # indicated by the inclusion of lock files.
    self.included_concrete_env_root_dirs = []
    for entry in includes:
        include = spack.config.included_path(entry)
        if isinstance(include, spack.config.GitIncludePaths):
            # Git includes must be cloned first; paths are relative to the
            # clone destination, not to the manifest directory.
            destination = include._clone(self.manifest.env_config_scope)
            if destination is None:
                continue
            resolved = [os.path.join(destination, p) for p in include.paths]
        else:
            resolved = [
                spack.util.path.canonicalize_path(p, default_wd=self.path)
                for p in include.paths
            ]

        for path in resolved:
            # Only paths that point at a lockfile denote a concrete environment
            if os.path.basename(path) != lockfile_name:
                continue

            tty.debug(f"Adding {path} to the concrete environment root directories")
            self.included_concrete_env_root_dirs.append(os.path.dirname(path))

    # Cache concrete environments for required lock files.
    self._load_concrete_include_data()


def _construct_state_from_manifest(self):
    """Set up user specs and views from the manifest file."""
    self.views = {}
    self._sync_speclists()
    self._process_view(spack.config.get("view", True))
    self._process_included_lockfiles()


def _sync_speclists(self):
    """Rebuild ``spec_lists`` from the "definitions" config and the manifest groups."""
    self._spec_lists_parser = SpecListParser(
        toolchains=spack.config.CONFIG.get("toolchains", {})
    )
    self.spec_lists = {}
    self.spec_lists.update(
        self._spec_lists_parser.parse_definitions(
            data=spack.config.CONFIG.get("definitions", [])
        )
    )
    for group in self.manifest.groups():
        tty.debug(f"[{__name__}]: Synchronizing user specs from the '{group}' group", level=2)
        key = self._user_specs_key(group=group)
        self.spec_lists[key] = self._spec_lists_parser.parse_user_specs(
            name=key, yaml_list=self.manifest.user_specs(group=group)
        )


def _user_specs_key(self, *, group: Optional[str] = None) -> str:
    """Return the ``spec_lists`` key under which the user specs of ``group`` are stored."""
    if group is None or group == DEFAULT_USER_SPEC_GROUP:
        return USER_SPECS_KEY
    return f"{USER_SPECS_KEY}:{group}"


@property
def user_specs(self) -> SpecList:
    """User specs of the default group."""
    return self.user_specs_by(group=DEFAULT_USER_SPEC_GROUP)
def user_specs_by(self, *, group: Optional[str]) -> SpecList:
    """Return the list of user specs that belongs to a single group.

    Args:
        group: name of the group; ``None`` selects the default group

    Raises:
        KeyError: if no spec list is stored for the group
    """
    return self.spec_lists[self._user_specs_key(group=group)]
def explicit_roots(self):
    """Yield the concretized roots whose group the manifest reports as explicit."""
    yield from (
        info for info in self.concretized_roots if self.manifest.is_explicit(group=info.group)
    )
@property
def dev_specs(self):
    """Mapping from name to ``{"spec": ..., "path": ...}`` built from the "develop" config.

    An entry without an explicit "path" uses its own name as the path.
    """
    return {
        name: {
            "spec": str(entry["spec"]),
            # default path is the spec name
            "path": entry["path"] if "path" in entry else name,
        }
        for name, entry in spack.config.get("develop", {}).items()
    }


@property
def included_user_specs(self) -> SpecList:
    """Included concrete user (or root) specs from last concretization."""
    collected = SpecList()
    if not self.included_concrete_env_root_dirs:
        return collected

    def collect_roots(include_data):
        # Take the roots of each include first, then descend into its nested includes
        for info in include_data.values():
            for root_entry in info["roots"]:
                collected.add(root_entry["spec"])
            if lockfile_include_key in info:
                collect_roots(info[lockfile_include_key])

    collect_roots(self.included_concrete_spec_data)
    return collected
def clear(self):
    """Reset the in-memory contents of the environment, including its manifest."""
    # Specs read from the manifest
    self.spec_lists = {}
    self._dev_specs = {}

    # Concretization results of this environment
    self.concretized_roots = []
    self.specs_by_hash = {}

    # Concretization results copied from included environments
    self.included_concrete_spec_data = {}
    self.included_concretized_roots = {}
    self.included_specs_by_hash = {}

    self.invalidate_repository_cache()
    self._previous_active = None
    self.manifest.clear()
@property
def active(self):
    """Truthy if this environment is currently active.

    When no environment is active, the falsy ``_active_environment`` value itself
    is returned.
    """
    current = _active_environment
    return current and self.path == current.path


@property
def manifest_path(self):
    """Path to spack.yaml file in this environment."""
    env_root = self.path
    return os.path.join(env_root, manifest_name)


@property
def _transaction_lock_path(self):
    """The location of the lock file used to synchronize multiple processes
    updating the same environment.
    """
    subdir = self.env_subdir_path
    return os.path.join(subdir, "transaction_lock")


@property
def lock_path(self):
    """Path to spack.lock file in this environment."""
    env_root = self.path
    return os.path.join(env_root, lockfile_name)


@property
def _lock_backup_v1_path(self):
    """Path to backup of v1 lockfile before conversion to v2"""
    return f"{self.lock_path}.backup.v1"


@property
def repos_path(self):
    """Path of the "repos" directory inside the environment subdirectory."""
    subdir = self.env_subdir_path
    return os.path.join(subdir, "repos")


@property
def view_path_default(self) -> str:
    """Default path for environment views."""
    subdir = self.env_subdir_path
    return os.path.join(subdir, "view")


@property
def repo(self):
    """Repository built from ``repos_path``; created on first access, then cached."""
    cached = self._repo
    if cached is None:
        cached = self._repo = make_repo_path(self.repos_path)
    return cached


@property
def scope_name(self):
    """Name of the config scope of this environment's manifest file."""
    manifest = self.manifest
    return manifest.scope_name
def include_concrete_envs(self):
    """Copy and save the included environments' specs internally.

    Roots and concrete specs are de-duplicated by hash across all included
    environments; the first environment that provides a hash keeps it. The
    environment is unified and written once all includes have been processed.

    Raises:
        SpackEnvironmentError: if an included path is not an environment directory
    """
    seen_root_hashes = set()
    seen_concrete_hashes = set()
    self.included_concrete_spec_data = {}

    for env_path in self.included_concrete_env_root_dirs:
        # Check that the environment (lockfile) exists
        if not is_env_dir(env_path):
            raise SpackEnvironmentError(f"Unable to find env at {env_path}")

        env = Environment(env_path)
        entry = {"roots": [], "concrete_specs": {}}
        self.included_concrete_spec_data[env_path] = entry

        # Copy unique root specs from env
        for root_dict in env._concrete_roots_dict():
            root_hash = root_dict["hash"]
            if root_hash in seen_root_hashes:
                continue
            entry["roots"].append(root_dict)
            seen_root_hashes.add(root_hash)

        # Copy unique concrete specs from env
        for dag_hash, spec_details in env._concrete_specs_dict().items():
            if dag_hash in seen_concrete_hashes:
                continue
            entry["concrete_specs"].update({dag_hash: spec_details})
            seen_concrete_hashes.add(dag_hash)

        # Copy transitive include data
        transitive = env.included_concrete_spec_data
        if transitive:
            entry[lockfile_include_key] = transitive

    self.unify_specs()
    self.write()
def destroy(self):
    """Remove this environment from Spack entirely by deleting its directory tree."""
    env_root = self.path
    shutil.rmtree(env_root)
def add(self, user_spec, list_name=USER_SPECS_KEY) -> bool:
    """Add a single user_spec (non-concretized) to the Environment

    Args:
        user_spec: spec (or spec string) to add
        list_name: name of the spec list that receives the spec

    Returns:
        True if the spec was added, False if it was already present and did not need
        to be added

    Raises:
        SpackEnvironmentError: if the list does not exist, or if a spec added to the
            user specs is anonymous or names no known package or virtual
    """
    spec = Spec(user_spec)

    if list_name not in self.spec_lists:
        raise SpackEnvironmentError(f"No list {list_name} exists in environment {self.name}")

    targets_user_specs = list_name == USER_SPECS_KEY
    if targets_user_specs:
        if spec.anonymous:
            raise SpackEnvironmentError("cannot add anonymous specs to an environment")
        elif not spack.repo.PATH.exists(spec.name) and not spec.abstract_hash:
            # Not a package: the name is still acceptable if it is a virtual
            virtuals = spack.repo.PATH.provider_index.providers.keys()
            if spec.name not in virtuals:
                raise SpackEnvironmentError(f"no such package: {spec.name}")

    list_to_change = self.spec_lists[list_name]
    if str(spec) in list_to_change.yaml_list:
        return False

    list_to_change.add(spec)
    if targets_user_specs:
        self.manifest.add_user_spec(str(user_spec))
    else:
        self.manifest.add_definition(str(user_spec), list_name=list_name)
    self._sync_speclists()
    return True
def change_existing_spec(
    self,
    change_spec: Spec,
    list_name: str = USER_SPECS_KEY,
    match_spec: Optional[Spec] = None,
    allow_changing_multiple_specs=False,
):
    """
    Find the spec identified by ``match_spec`` and change it to ``change_spec``.

    Arguments:
        change_spec: defines the spec properties that
            need to be changed. This will not change attributes of the
            matched spec unless they conflict with ``change_spec``.
        list_name: identifies the spec list in the environment that
            should be modified
        match_spec: if set, this identifies the spec
            that should be changed. If not set, it is assumed we are
            looking for a spec with the same name as ``change_spec``.
        allow_changing_multiple_specs: if true, every matching spec is changed;
            otherwise more than one match is an error

    Raises:
        ValueError: if no spec can be identified, nothing matches, or several specs
            match while only one may be changed
        SpackEnvironmentError: if the selected list is a matrix
    """
    if not (change_spec.name or match_spec):
        raise ValueError(
            "Must specify a spec name or match spec to identify a single spec"
            " in the environment that will be changed (or multiple with '--all')"
        )
    match_spec = match_spec or Spec(change_spec.name)

    list_to_change = self.spec_lists[list_name]
    if list_to_change.is_matrix:
        raise SpackEnvironmentError(
            "Cannot directly change specs in matrices:"
            " specify a named list that is not a matrix"
        )

    matches = [
        (position, candidate)
        for position, candidate in enumerate(list_to_change)
        if candidate.satisfies(match_spec)
    ]
    if not matches:
        raise ValueError(
            "There are no specs named {0} in {1}".format(match_spec.name, list_name)
        )
    if len(matches) > 1 and not allow_changing_multiple_specs:
        raise ValueError(f"{str(match_spec)} matches multiple specs")

    changes_user_specs = list_name == USER_SPECS_KEY
    for position, current in matches:
        override_spec = Spec.override(current, change_spec)
        if changes_user_specs:
            self.manifest.override_user_spec(str(override_spec), idx=position)
        else:
            self.manifest.override_definition(
                str(current), override=str(override_spec), list_name=list_name
            )
    self._sync_speclists()
def remove(self, query_spec, list_name=USER_SPECS_KEY, force=False):
    """Remove specs from an environment that match a query_spec

    Args:
        query_spec: spec (or spec string) selecting what to remove. A concrete
            query is matched against the concretized roots by DAG hash, any other
            query is matched with ``satisfies`` against the selected list.
        list_name: name of the spec list to remove from
        force: if true, concretized roots whose user spec disappeared are dropped
            from the concretization as well

    Raises:
        SpackEnvironmentError: if ``list_name`` does not exist or nothing matches
    """
    # Built before the query is turned into a Spec, so it shows the caller's input
    err_msg_header = (
        f"Cannot remove '{query_spec}' from '{list_name}' definition "
        f"in {self.manifest.manifest_file}"
    )
    query_spec = Spec(query_spec)
    try:
        list_to_change = self.spec_lists[list_name]
    except KeyError as e:
        msg = f"{err_msg_header}, since '{list_name}' does not exist"
        raise SpackEnvironmentError(msg) from e

    if not query_spec.concrete:
        matches = [s for s in list_to_change if s.satisfies(query_spec)]
    else:
        # concrete specs match against concrete specs in the env by dag hash.
        matches = [x.root for x in self.concretized_roots if query_spec.dag_hash() == x.hash]

    if not matches:
        raise SpackEnvironmentError(f"{err_msg_header}, no spec matches")

    old_specs = set(self.user_specs)

    # Remove specs from the appropriate spec list
    for spec in matches:
        if spec not in list_to_change:
            continue
        try:
            list_to_change.remove(spec)
        except SpecListError as e:
            msg = str(e)
            if force:
                msg += " It will be removed from the concrete specs."
            tty.warn(msg)
        else:
            if list_name == USER_SPECS_KEY:
                self.manifest.remove_user_spec(str(spec))
            else:
                self.manifest.remove_definition(str(spec), list_name=list_name)

    # Recompute "definitions" and user specs
    self._sync_speclists()
    new_specs = set(self.user_specs)

    # If 'force', update stale concretized specs
    if force:
        stale_specs = old_specs - new_specs
        self.concretized_roots, removed = stable_partition(
            self.concretized_roots, lambda x: x.root not in stale_specs
        )
        for x in removed:
            # Several roots may share one hash: only forget the concrete spec once
            # no remaining root refers to it, and tolerate it being gone already.
            self._maybe_remove_dag_hash(x.hash)
def is_develop(self, spec):
    """Returns true when the spec is built from local sources

    A spec counts as such when its name is a key of ``dev_specs``.
    """
    develop_names = self.dev_specs
    return spec.name in develop_names
def apply_develop(self, specs: List[spack.spec.Spec], paths: Optional[List[str]] = None):
    """Mutate concrete specs to include dev_path provenance pointing to path.

    For every develop spec a selector (the package name) and a mutator (setting or,
    when no path is given, removing the ``dev_path`` variant) are handed to
    ``mutate``, with the develop specs themselves as validators.

    This will fail if any existing concrete spec for the same package does not satisfy the
    given develop spec.

    Args:
        specs: develop specs to apply
        paths: optional source paths, one per entry of ``specs``
    """
    assert not paths or len(specs) == len(paths)

    selectors, mutators, msgs = [], [], []
    for dev_spec, dev_path in zip_longest(specs, paths or [], fillvalue=None):
        assert dev_spec

        selectors.append(spack.spec.Spec(dev_spec.name))

        mutator = spack.spec.Spec()
        mutator.variants["dev_path"] = (
            vt.SingleValuedVariant("dev_path", dev_path)
            if dev_path
            else vt.VariantValueRemoval("dev_path")
        )
        mutators.append(mutator)

        msgs.append(
            f"Develop spec '{dev_spec}' conflicts with concrete specs in environment."
            " Try again with 'spack develop --no-modify-concrete-specs'"
            " and run 'spack concretize --force' to apply your changes."
        )

    self.mutate(selectors, mutators, validators=specs, msgs=msgs)
def mutate(
    self,
    selectors: List[spack.spec.Spec],
    mutators: List[spack.spec.Spec],
    validators: Optional[List[spack.spec.Spec]] = None,
    msgs: Optional[List[str]] = None,
):
    """Mutate concrete specs of an environment

    Mutate any spec that matches ``selector``. Invalidate caches on parents of mutated specs.
    If a validator spec is supplied, throw an error if a selected spec does not satisfy the
    validator.

    Arguments:
        selectors: specs selecting the concrete specs to mutate
        mutators: mutation to apply, one per selector
        validators: optional specs, one per selector, that every selected spec must
            satisfy
        msgs: optional error messages, one per selector, used when validation fails

    Raises:
        ValueError: if the lengths of the argument lists do not agree
        SpackEnvironmentDevelopError: if a selected spec does not satisfy its validator
    """
    # Find all specs that this mutation applies to
    modify_specs = []
    modified_specs = []

    # All lists are consumed in lockstep, so their lengths must agree
    if len(selectors) != len(mutators):
        raise ValueError(
            f"Length mismatch: selectors ({len(selectors)}) != mutators ({len(mutators)})"
        )
    if validators and len(validators) != len(selectors):
        raise ValueError(
            f"Length mismatch: validators ({len(validators)}) != selectors ({len(selectors)})"
        )
    if msgs and len(msgs) != len(selectors):
        raise ValueError(
            f"Length mismatch: msgs ({len(msgs)}) != selectors ({len(selectors)})"
        )

    for dep in self.all_specs_generator():
        # zip_longest pads the optional validators/msgs with None when they are omitted
        for selector, mutator, validator, msg in zip_longest(
            selectors, mutators, validators or [], msgs or [], fillvalue=None
        ):
            assert selector
            assert mutator
            if dep.satisfies(selector):
                # Without a validator the selector is reused, so this check always passes
                if not dep.satisfies(validator or selector):
                    if not msg:
                        msg = f"spec {dep} satisfies selector {selector}"
                        msg += f" but not validator {validator}"
                    raise SpackEnvironmentDevelopError(msg)
                modify_specs.append((dep, mutator))

    # Manipulate selected specs
    for s, mutator in modify_specs:
        modified = s.mutate(mutator, rehash=False)
        if modified:
            modified_specs.append(s)

    # Identify roots modified and invalidate all dependent hashes
    modified_roots = []
    for parent in traverse.traverse_nodes(modified_specs, direction="parents"):
        # record whether this parent is a root before we modify the hash
        if parent.dag_hash() in self.specs_by_hash:
            modified_roots.append((parent, parent.dag_hash()))
        # modify the parent to invalidate hashes
        parent._mark_root_concrete(False)
        parent.clear_caches()

    # Compute new hashes and update the env list of specs
    hash_mutations = {}
    for root, old_hash in modified_roots:
        # New hash must be computed after we finalize concretization
        root._finalize_concretization()
        new_hash = root.dag_hash()
        self.specs_by_hash.pop(old_hash)
        self.specs_by_hash[new_hash] = root
        hash_mutations[old_hash] = new_hash

    # Point the root records at the recomputed hashes
    for x in self.concretized_roots:
        if x.hash in hash_mutations:
            x.hash = hash_mutations[x.hash]

    # Only write the environment when at least one root was changed
    if modified_roots:
        self.write()
def concretize(
    self, *, force: Optional[bool] = None, tests: Union[bool, Sequence[str]] = False
) -> Sequence[SpecPair]:
    """Concretize user_specs in this environment.

    Only concretizes specs that haven't been concretized yet unless
    force is ``True``.

    This only modifies the environment in memory. ``write()`` will
    write out a lockfile containing concretized specs.

    The work is delegated to an ``EnvironmentConcretizer`` created for this
    environment.

    Arguments:
        force: re-concretize ALL specs, even those that were already concretized;
            defaults to ``spack.config.get("concretizer:force")``
        tests: False to run no tests, True to test all packages,
            or a list of package names to run tests for some

    Returns:
        List of specs that have been concretized. Each entry is a tuple of
        the user spec and the corresponding concretized spec.
    """
    concretizer = EnvironmentConcretizer(self)
    return concretizer.concretize(force=force, tests=tests)
def sync_concretized_specs(self) -> None:
    """Removes concrete specs that no longer correlate to a user spec

    A concretized root is stale when its (group, user spec) pair is no longer among
    the user specs of the manifest groups.
    """
    if not self.concretized_roots:
        return

    known = self._all_user_specs_with_group()
    # Collect first: deconcretizing replaces the list that is being inspected
    stale = [info for info in self.concretized_roots if (info.group, info.root) not in known]
    for info in stale:
        self.deconcretize_by_user_spec(info.root, group=info.group)
def _all_user_specs_with_group(self) -> Set[Tuple[str, Spec]]:
    """Return every (group, user spec) pair over all groups of the manifest."""
    return {
        (group, user_spec)
        for group in self.manifest.groups()
        for user_spec in self.user_specs_by(group=group)
    }
def clear_concretized_specs(self) -> None:
    """Clears the currently concretized specs

    Both the root records and the hash-to-spec mapping are replaced by new, empty
    containers.
    """
    self.concretized_roots, self.specs_by_hash = [], {}
def deconcretize_by_hash(self, dag_hash: str) -> None:
    """Removes a concrete spec from the environment concretization

    Every root record with the given hash is dropped, then the concrete spec itself
    is removed if nothing refers to it any more.
    """
    remaining = []
    for info in self.concretized_roots:
        if info.hash != dag_hash:
            remaining.append(info)
    self.concretized_roots = remaining
    self._maybe_remove_dag_hash(dag_hash)
def deconcretize_by_user_spec(
    self, spec: spack.spec.Spec, *, group: Optional[str] = None
) -> None:
    """Removes a user spec from the environment concretization

    Arguments:
        spec: user spec to deconcretize
        group: group of the spec to remove. If not specified, the spec is removed from
            the default group
    """
    group = group or DEFAULT_USER_SPEC_GROUP

    def is_target(info):
        # spec has to be a root of the environment
        return info.group == group and info.root == spec

    discarded, self.concretized_roots = stable_partition(self.concretized_roots, is_target)

    discarded_hashes = {info.hash for info in discarded}
    assert len(discarded_hashes) == 1, (
        "More than one hash associated with a single user spec"
    )
    self._maybe_remove_dag_hash(discarded[0].hash)
def _maybe_remove_dag_hash(self, dag_hash: str):
    """Forget the concrete spec for ``dag_hash`` unless a root still refers to it."""
    # If another user spec still concretizes to this hash, the spec must stay
    if self.user_spec_with_hash(dag_hash):
        return
    # if we deconcretized a dependency that doesn't correspond to a root, it won't be here.
    if dag_hash in self.specs_by_hash:
        del self.specs_by_hash[dag_hash]
def user_spec_with_hash(self, dag_hash: str) -> bool:
    """Returns True if any user spec is associated with a concrete spec with the given hash"""
    for info in self.concretized_roots:
        if info.hash == dag_hash:
            return True
    return False
def unify_specs(self) -> None:
    """Round-trip the environment through its lockfile dictionary.

    The state is serialized with ``_to_lockfile_dict`` and read back with
    ``_read_lockfile_dict``; the root records held before the round trip are then
    put back, so information on new specs is kept.
    """
    roots_before = self.concretized_roots
    lockfile_dict = self._to_lockfile_dict()
    self._read_lockfile_dict(lockfile_dict)
    self.concretized_roots = roots_before
@property
def default_view(self):
    """Descriptor of the default view.

    Raises:
        SpackEnvironmentError: if the environment has no default view
    """
    if self.has_view(default_view_name):
        return self.views[default_view_name]
    raise SpackEnvironmentError(f"{self.name} does not have a default view enabled")
def has_view(self, view_name: str) -> bool:
    """Return True if a view named ``view_name`` is defined for this environment."""
    defined_views = self.views
    return view_name in defined_views
def update_default_view(self, path_or_bool: Union[str, bool]) -> None:
    """Updates the path of the default view.

    If the argument passed as input is False the default view is deleted, if present. The
    manifest will have an entry ``view: false``.

    If the argument passed as input is True a default view is created, if not already present.
    The manifest will have an entry ``view: true``. If a default view is already declared, it
    will be left untouched.

    If the argument passed as input is a path a default view pointing to that path is created,
    if not present already. If a default view is already declared, only its "root" will be
    changed.

    Args:
        path_or_bool: either True, or False or a path
    """
    enable = path_or_bool is True
    disable = path_or_bool is False
    view_path = self.view_path_default if enable else path_or_bool
    has_default = default_view_name in self.views

    # We don't have the view, and we want to remove it
    if not has_default and disable:
        return

    # We have the view already, and either we only want it enabled or we want it
    # set to the path it already has
    if has_default and (enable or self.default_view.root == view_path):
        return

    self.delete_default_view()
    if disable:
        self.views.pop(default_view_name, None)
        self.manifest.remove_default_view()
        return

    # If we had a default view already just update its path,
    # else create a new one and add it to views
    if default_view_name in self.views:
        self.default_view.update_root(view_path)
    else:
        assert isinstance(view_path, str), f"expected str for 'view_path', but got {view_path}"
        self.views[default_view_name] = ViewDescriptor(self.path, view_path)

    self.manifest.set_default_view(self._default_view_as_yaml())
def delete_default_view(self) -> None:
    """Deletes the default view associated with this environment.

    The resolved view directory is removed first, then the view path itself is
    unlinked. A missing file is reported as a debug message only.
    """
    if default_view_name in self.views:
        try:
            view_root = pathlib.Path(self.default_view.root)
            shutil.rmtree(view_root.resolve())
            view_root.unlink()
        except FileNotFoundError as e:
            tty.debug(f"[ENVIRONMENT] error trying to delete the default view: {str(e)}")
def regenerate_views(self):
    """Regenerate every view of this environment; does nothing if it has none."""
    if self.views:
        for descriptor in self.views.values():
            descriptor.regenerate(self)
        return

    tty.debug("Skip view update, this environment does not maintain a view")
def check_views(self):
    """Checks if the environments default view can be activated.

    Unknown packages or namespaces met while touching the specs are reported as
    warnings instead of being raised.
    """
    try:
        # This is effectively a no-op, but it touches all packages in the
        # default view if they are installed.
        for view_name, descriptor in self.views.items():
            for root in self.concrete_roots():
                if root in descriptor and root.package and root.installed:
                    tty.debug('{0} in view "{1}"'.format(root.name, view_name))
    except (spack.repo.UnknownPackageError, spack.repo.UnknownNamespaceError) as e:
        tty.warn(e)
        tty.warn(
            "Environment %s includes out of date packages or repos. "
            "Loading the environment view will require reconcretization." % self.name
        )
def _env_modifications_for_view(
    self, view: ViewDescriptor, reverse: bool = False
) -> spack.util.environment.EnvironmentModifications:
    """Return the environment modifications of the installed roots for ``view``.

    Args:
        view: descriptor of the view the modifications are computed for
        reverse: if true, return the reversed modifications

    Returns:
        The (possibly reversed) modifications, or an empty set of modifications if
        computing them failed; the failure is reported as a warning.
    """
    try:
        with spack.store.STORE.db.read_transaction():
            installed_roots = [root for root in self.concrete_roots() if root.installed]
        mods = uenv.environment_modifications_for_specs(*installed_roots, view=view)
    except Exception as e:
        # Failing to setup spec-specific changes shouldn't be a hard error.
        tty.warn(
            f"could not {'unload' if reverse else 'load'} runtime environment due "
            f"to {e.__class__.__name__}: {e}"
        )
        return spack.util.environment.EnvironmentModifications()

    if reverse:
        return mods.reversed()
    return mods
def add_view_to_env(
    self, env_mod: spack.util.environment.EnvironmentModifications, view: str
) -> spack.util.environment.EnvironmentModifications:
    """Collect the environment modifications to activate an environment using the provided
    view. Removes duplicate paths.

    Args:
        env_mod: the environment modifications object that is modified.
        view: the name of the view to activate.

    Returns:
        ``env_mod`` itself; it is returned unchanged if the view does not exist.
    """
    descriptor = self.views.get(view)
    if descriptor:
        env_mod.extend(uenv.unconditional_environment_modifications(descriptor))
        env_mod.extend(self._env_modifications_for_view(descriptor))

        # deduplicate paths from specs mapped to the same location
        for variable in env_mod.group_by_name():
            env_mod.prune_duplicate_paths(variable)

    return env_mod
def rm_view_from_env(
    self, env_mod: spack.util.environment.EnvironmentModifications, view: str
) -> spack.util.environment.EnvironmentModifications:
    """Collect the environment modifications to deactivate an environment using the provided
    view. Reverses the action of ``add_view_to_env``.

    Args:
        env_mod: the environment modifications object that is modified.
        view: the name of the view to deactivate.

    Returns:
        ``env_mod`` itself; it is returned unchanged if the view does not exist.
    """
    descriptor = self.views.get(view)
    if descriptor:
        unconditional = uenv.unconditional_environment_modifications(descriptor)
        env_mod.extend(unconditional.reversed())
        env_mod.extend(self._env_modifications_for_view(descriptor, reverse=True))

    return env_mod
def add_concrete_spec(
    self,
    spec: spack.spec.Spec,
    concrete: spack.spec.Spec,
    *,
    new: bool = True,
    group: Optional[str] = None,
):
    """Called when a new concretized spec is added to the environment.

    This ensures that all internal data structures are kept in sync.

    Arguments:
        spec: user spec that resulted in the concrete spec
        concrete: spec concretized within this environment
        new: whether to write this spec's package to the env repo on write()
        group: group the user spec belongs to; the default group if not given
    """
    assert concrete.concrete

    dag_hash = concrete.dag_hash()
    root_info = ConcretizedRootInfo(
        root_spec=spec, root_hash=dag_hash, new=new, group=group or DEFAULT_USER_SPEC_GROUP
    )
    self.concretized_roots.append(root_info)
    self.specs_by_hash[dag_hash] = concrete
def _dev_specs_that_need_overwrite(self):
    """Return the hashes of all specs that need to be reinstalled due to source code change."""
    all_nodes = traverse.traverse_nodes(
        self.concrete_roots(), order="breadth", key=traverse.by_dag_hash
    )
    changed_dev_specs = list(filter(_is_dev_spec_and_has_changed, all_nodes))

    # Collect their hashes, and the hashes of their installed parents.
    # Notice: with order=breadth all changed dev specs are at depth 0,
    # even if they occur as parents of one another.
    upward_walk = traverse.traverse_nodes(
        changed_dev_specs,
        root=True,
        order="breadth",
        depth=True,
        direction="parents",
        key=traverse.by_dag_hash,
    )
    return [node.dag_hash() for depth, node in upward_walk if depth == 0 or node.installed]


def _partition_roots_by_install_status(self):
    """Partition root specs into those that do not have to be passed to the
    installer, and those that should be, taking into account development
    specs. This is done in a single read transaction per environment instead
    of per spec."""
    with spack.store.STORE.db.read_transaction():
        roots = self.concrete_roots()
        uninstalled, installed = stable_partition(roots, _is_uninstalled)
    return installed, uninstalled
def uninstalled_specs(self):
    """Return root specs that are not installed, or are installed, but
    are development specs themselves or have those among their
    dependencies."""
    _installed, uninstalled = self._partition_roots_by_install_status()
    return uninstalled
def install_all(self, **install_args):
    """Install all concretized specs in an environment.

    Note: this does not regenerate the views for the environment;
    that needs to be done separately with a call to write().

    Args:
        install_args (dict): keyword install arguments
    """
    # Passing no explicit spec list makes install_specs fall back to all roots
    self.install_specs(specs=None, **install_args)
def install_specs(self, specs: Optional[List[Spec]] = None, **install_args):
    """Install the given specs, or all concrete roots when no specs are given.

    Args:
        specs: specs to install; ``None`` selects every concrete root
        install_args: keyword install arguments. ``reporter`` and ``report_file``
            are consumed here; ``overwrite`` and ``explicit`` are extended before
            the remaining arguments are handed to the installer.

    Raises:
        TypeError: if a reporter is given and the installer's reports are neither
            a dictionary nor a list
    """
    roots = self.concrete_roots()
    if specs is None:
        specs = roots

    # Extract reporter arguments
    reporter = install_args.pop("reporter", None)
    report_file = install_args.pop("report_file", None)

    # Extend the set of specs to overwrite with modified dev specs and their parents
    overwrite = set(install_args.get("overwrite", ()))
    overwrite.update(self._dev_specs_that_need_overwrite())
    install_args["overwrite"] = overwrite

    # Only environment roots in explicit groups are marked explicit
    explicit = set(install_args.get("explicit", ()))
    explicit.update(root.hash for root in self.explicit_roots())
    install_args["explicit"] = explicit

    packages = [spec.package for spec in specs]
    builder = spack.installer_dispatch.create_installer(
        packages, create_reports=reporter is not None, **install_args
    )

    try:
        builder.install()
    finally:
        # Reports are produced whether or not the installation succeeded
        if reporter:
            if isinstance(builder.reports, dict):
                reporter.build_report(report_file, list(builder.reports.values()))
            elif isinstance(builder.reports, list):
                reporter.build_report(report_file, builder.reports)
            else:
                raise TypeError("builder.reports must be either a dictionary or a list")
def all_specs_generator(self) -> Iterable[Spec]:
    """Returns a generator for all concrete specs

    The specs are the nodes reached from the concrete roots, keyed by DAG hash.
    """
    roots = self.concrete_roots()
    return traverse.traverse_nodes(roots, key=traverse.by_dag_hash)
def all_specs(self) -> List[Spec]:
    """Returns a list of all concrete specs"""
    return [*self.all_specs_generator()]
def all_hashes(self):
    """Return hashes of all specs.

    The list holds one DAG hash per spec produced by ``all_specs_generator``, in
    the same order.
    """
    return [node.dag_hash() for node in self.all_specs_generator()]
def roots(self):
    """Specs explicitly requested by the user *in this environment*.

    Yields both added and installed specs that have user specs in
    ``spack.yaml``: the concrete spec when the user spec has been concretized,
    the user spec itself otherwise.
    """
    by_user_spec = dict(self.concretized_specs())
    for user_spec in self.user_specs:
        yield by_user_spec.get(user_spec) or user_spec
def added_specs(self):
    """Specs that are not yet installed.

    Yields the user spec for non-concretized specs, and the concrete
    spec for already concretized but not yet installed specs.
    """
    # use a transaction to avoid overhead of repeated calls
    # to `package.spec.installed`
    with spack.store.STORE.db.read_transaction():
        by_user_spec = dict(self.concretized_specs())
        for user_spec in self.user_specs:
            concrete = by_user_spec.get(user_spec)
            if concrete and concrete.installed:
                # concretized and installed: nothing left to add
                continue
            yield concrete or user_spec
def concretized_specs(self):
    """Tuples of (user spec, concrete spec) for all concrete specs.

    The roots of this environment come first, followed by those of the included
    environments.
    """
    yield from ((info.root, self.specs_by_hash[info.hash]) for info in self.concretized_roots)
    yield from self.concretized_specs_from_all_included_environments()
def concretized_specs_from_all_included_environments(self):
    """Yield (user spec, concrete spec) pairs from every included environment.

    Pairs whose (user spec, hash) is already a root of this environment, or was
    already yielded for an earlier included environment, are skipped.
    """
    # One set is shared by all included environments so duplicates are dropped globally
    already_seen = {(info.root, info.hash) for info in self.concretized_roots}
    for env_path in self.included_concretized_roots:
        yield from self.concretized_specs_from_included_environment(
            env_path, _seen=already_seen
        )
def concretized_specs_from_included_environment(
    self, included_env: str, *, _seen: Optional[Set[Tuple[spack.spec.Spec, str]]] = None
):
    """Yield (user spec, concrete spec) pairs from a single included environment.

    Args:
        included_env: key of the included environment in
            ``included_concretized_roots``
        _seen: (user spec, hash) pairs to skip; pairs yielded here are added to it
    """
    if _seen is None:
        _seen = set()

    concrete_by_hash = self.included_specs_by_hash
    for info in self.included_concretized_roots[included_env]:
        pair_key = (info.root, info.hash)
        if pair_key in _seen:
            continue
        _seen.add(pair_key)
        yield info.root, concrete_by_hash[included_env][info.hash]
def concrete_roots(self):
    """Same as concretized_specs, except it returns the list of concrete
    roots *without* associated user spec"""
    return [concrete for _user_spec, concrete in self.concretized_specs()]
def concretized_specs_by(self, *, group: str) -> Iterable[Tuple[Spec, Spec]]:
    """Generates the (abstract, concrete) spec pairs of the roots in a given group.

    Only this environment's own ``concretized_roots`` are considered.

    Args:
        group: name of the group whose roots are selected
    """
    for root_info in self.concretized_roots:
        if root_info.group == group:
            yield root_info.root, self.specs_by_hash[root_info.hash]
def get_by_hash(self, dag_hash: str) -> List[Spec]:
    """Return the specs reachable from the concrete roots whose DAG hash
    starts with the given string.

    Args:
        dag_hash: a DAG hash, or a prefix of it
    """
    # A 32 character string is treated as a complete hash, not as a prefix,
    # so the search can stop at the first match
    stop_at_first_match = len(dag_hash) == 32
    found = []
    nodes = traverse.traverse_nodes(
        self.concrete_roots(), key=traverse.by_dag_hash, order="breadth"
    )
    for node in nodes:
        if not node.dag_hash().startswith(dag_hash):
            continue
        found.append(node)
        if stop_at_first_match:
            break
    return found
def get_one_by_hash(self, dag_hash):
    """Returns the single spec from the environment which matches the provided hash.

    Raises:
        AssertionError: if no specs match or if more than one spec matches
    """
    candidates = self.get_by_hash(dag_hash)
    assert len(candidates) == 1
    (only_match,) = candidates
    return only_match
def all_matching_specs(self, *specs: spack.spec.Spec) -> List[Spec]:
    """Returns all concretized specs in the environment satisfying any of the input specs

    The nodes reachable from the concrete roots are visited once per DAG hash.
    """
    matching = []
    for node in traverse.traverse_nodes(self.concrete_roots(), key=traverse.by_dag_hash):
        if any(node.satisfies(query) for query in specs):
            matching.append(node)
    return matching
@spack.repo.autospec
def matching_spec(self, spec):
    """Given a spec (likely not concretized), find a matching concretized spec
    in the environment.

    The matching spec does not have to be installed in the environment, but
    must be concrete (specs added with ``spack add`` without an intervening
    ``spack concretize`` will not be matched).

    Resolution rules:

    * a concrete input spec returns the first node satisfying it
    * no match at all returns None
    * exactly one matching root returns that root, regardless of how many
      dependencies match
    * no matching root and exactly one matching dependency returns the dependency
    * anything else raises an error listing all the matching specs

    Raises:
        SpackEnvironmentError: if the match is ambiguous
    """
    user_by_root_hash = {}
    env_roots = []
    for user, root in self.concretized_specs():
        user_by_root_hash[root.dag_hash()] = user
        env_roots.append(root)

    root_matches, dep_matches = [], []
    for candidate in traverse.traverse_nodes(
        specs=env_roots, key=traverse.by_dag_hash, order="breadth"
    ):
        if not candidate.satisfies(spec):
            continue

        # If the spec is concrete, then there is no possibility of multiple matches,
        # and we immediately return the single match
        if spec.concrete:
            return candidate

        # Distinguish between environment roots and deps. Specs that are both
        # are classified as environment roots.
        user_spec = user_by_root_hash.get(candidate.dag_hash())
        if user_spec:
            root_matches.append((candidate, user_spec))
        else:
            dep_matches.append(candidate)

    # Single root spec, any number of dep specs => return root spec.
    if len(root_matches) == 1:
        return root_matches[0][0]

    if not root_matches:
        # No matching spec
        if not dep_matches:
            return None
        if len(dep_matches) == 1:
            return dep_matches[0]

    # More than one spec matched, and either multiple roots matched or
    # none of the matches were roots

    # If multiple root specs match, it is assumed that the abstract
    # spec will most-succinctly summarize the difference between them
    # (and the user can enter one of these to disambiguate)
    fmt_str = "{hash:7} " + spack.spec.DEFAULT_FORMAT
    color = clr.get_color_when()
    match_strings = []
    for concrete, abstract in root_matches:
        match_strings.append(
            f"Root spec {abstract.format(color=color)}\n {concrete.format(fmt_str, color=color)}"
        )
    for dep in dep_matches:
        match_strings.append(f"Dependency spec\n {dep.format(fmt_str, color=color)}")
    matches_str = "\n".join(match_strings)

    raise SpackEnvironmentError(
        f"{spec} matches multiple specs in the environment {self.name}: \n{matches_str}"
    )
def removed_specs(self):
    """Concrete spec nodes that will be removed on next concretize.

    A node is needed if it is reachable from a concretized root whose user
    spec is still in ``self.user_specs``. Every node reachable from any
    concretized root that is not needed is yielded.

    Note that single spec nodes are yielded, not (user spec, concrete spec)
    tuples, and that a node reachable from more than one root is yielded once
    for each traversal that reaches it.
    """
    # Materialize the pairs once: they are scanned twice below
    pairs = list(self.concretized_specs())

    needed = set()
    for user_spec, concrete in pairs:
        if user_spec in self.user_specs:
            needed.update(concrete.traverse())

    for _, concrete in pairs:
        for node in concrete.traverse():
            if node not in needed:
                yield node
def _concrete_specs_dict(self): concrete_specs = {} for s in traverse.traverse_nodes(self.specs_by_hash.values(), key=traverse.by_dag_hash): spec_dict = s.node_dict_with_hashes(hash=ht.dag_hash) # Assumes no legacy formats, since this was just created. spec_dict[ht.dag_hash.name] = s.dag_hash() concrete_specs[s.dag_hash()] = spec_dict if s.build_spec is not s: for d in s.build_spec.traverse(): build_spec_dict = d.node_dict_with_hashes(hash=ht.dag_hash) build_spec_dict[ht.dag_hash.name] = d.dag_hash() concrete_specs[d.dag_hash()] = build_spec_dict return concrete_specs def _concrete_roots_dict(self): if not self.has_groups(): return [{"hash": x.hash, "spec": str(x.root)} for x in self.concretized_roots] return [ {"hash": x.hash, "spec": str(x.root), "group": x.group} for x in self.concretized_roots ]
def has_groups(self) -> bool:
    """Return False if the default group is the only group in the manifest,
    True otherwise."""
    groups = self.manifest.groups()
    if len(groups) != 1:
        return True
    # Exactly one group: it counts as "no groups" only if it is the default one
    return DEFAULT_USER_SPEC_GROUP not in groups
def _to_lockfile_dict(self):
    """Create a dictionary to store a lockfile for this environment.

    The lockfile version is ``CURRENT_LOCKFILE_VERSION`` if the environment
    has groups, and 6 otherwise.

    Returns:
        dictionary with the "_meta", "spack", "roots" and "concrete_specs" keys,
        plus the ``lockfile_include_key`` entry when the environment includes
        concrete environments
    """
    lockfile_version = CURRENT_LOCKFILE_VERSION if self.has_groups() else 6
    concrete_specs = self._concrete_specs_dict()
    root_specs = self._concrete_roots_dict()

    # Record which Spack wrote the lockfile: "git" with a commit if one is
    # available, "release" otherwise
    spack_dict = {"version": spack.spack_version}
    spack_commit = spack.get_spack_commit()
    if spack_commit:
        spack_dict["type"] = "git"
        spack_dict["commit"] = spack_commit
    else:
        spack_dict["type"] = "release"

    # this is the lockfile we'll write out
    data = {
        # metadata about the format
        "_meta": {
            "file-type": "spack-lockfile",
            "lockfile-version": lockfile_version,
            "specfile-version": spack.spec.SPECFILE_FORMAT_VERSION,
        },
        # spack version information
        "spack": spack_dict,
        # users specs + hashes are the 'roots' of the environment
        "roots": root_specs,
        # Concrete specs by hash, including dependencies
        "concrete_specs": concrete_specs,
    }
    if self.included_concrete_env_root_dirs:
        data[lockfile_include_key] = self.included_concrete_spec_data

    return data


def _read_lockfile(self, file_or_json):
    """Read a lockfile from a file or from a raw string.

    The input is parsed with ``sjson.load`` and the resulting dictionary is
    loaded into this environment.

    Returns:
        the parsed lockfile dictionary
    """
    lockfile_dict = sjson.load(file_or_json)
    self._read_lockfile_dict(lockfile_dict)
    return lockfile_dict


def _set_included_env_roots(
    self,
    env_name: str,
    env_info: Dict[str, Dict[str, Any]],
    included_json_specs_by_hash: Dict[str, Dict[str, Any]],
) -> Dict[str, Dict[str, Any]]:
    """Populates included_concretized_roots from included environment data,
    including any transitively nested included environments.

    The list of roots for ``env_name`` is reset before being populated.

    Args:
        env_name: the path of the included environment
        env_info: included concrete environment data
        included_json_specs_by_hash: concrete spec data keyed by hash.
            This dictionary is updated in place.

    Returns:
        updated specs_by_hash (the same object passed as
        ``included_json_specs_by_hash``)
    """
    self.included_concretized_roots[env_name] = []

    def add_specs(name, info, specs_by_hash):
        # Add specs from the environment as well as any of its nested
        # environments.
        for root_info in info["roots"]:
            self.included_concretized_roots[name].append(
                ConcretizedRootInfo.from_info_dict(root_info)
            )
        if "concrete_specs" in info:
            specs_by_hash.update(info["concrete_specs"])

        if lockfile_include_key in info:
            for included_name, included_info in info[lockfile_include_key].items():
                # Nested environments that already have an entry are skipped
                # NOTE(review): nesting of the recursive call reconstructed
                # from a whitespace-collapsed listing; confirm against the file
                if included_name not in self.included_concretized_roots:
                    self.included_concretized_roots[included_name] = []
                    add_specs(included_name, included_info, specs_by_hash)

    add_specs(env_name, env_info, included_json_specs_by_hash)
    return included_json_specs_by_hash


def _read_lockfile_dict(self, d):
    """Read a lockfile dictionary into this environment.

    Resets and repopulates ``specs_by_hash``, ``included_specs_by_hash``,
    ``concretized_roots`` and ``included_concretized_roots``.

    Args:
        d: lockfile dictionary. The "roots", "concrete_specs" and "_meta" keys
            are required, ``lockfile_include_key`` is optional.

    Raises:
        SpackEnvironmentError: if there is no reader registered in
            ``READER_CLS`` for the lockfile version
    """
    self.specs_by_hash = {}
    self.included_specs_by_hash = {}
    self.included_concretized_roots = {}

    roots = d["roots"]
    self.concretized_roots = [ConcretizedRootInfo.from_info_dict(r) for r in roots]
    json_specs_by_hash = d["concrete_specs"]
    included_json_specs_by_hash = {}

    # Collect roots and node dictionaries of the included environments
    if lockfile_include_key in d:
        for env_name, env_info in d[lockfile_include_key].items():
            included_json_specs_by_hash.update(
                self._set_included_env_roots(env_name, env_info, included_json_specs_by_hash)
            )

    # Select the reader matching the format version of the lockfile
    current_lockfile_format = d["_meta"]["lockfile-version"]
    try:
        reader = READER_CLS[current_lockfile_format]
    except KeyError:
        msg = (
            f"Spack {spack.__version__} cannot read the lockfile '{self.lock_path}', using "
            f"the v{current_lockfile_format} format."
        )
        if CURRENT_LOCKFILE_VERSION < current_lockfile_format:
            msg += " You need to use a newer Spack version."
        raise SpackEnvironmentError(msg)

    concretized_order = [x.hash for x in self.concretized_roots]
    first_seen, concretized_order = self._filter_specs(
        reader, json_specs_by_hash, concretized_order
    )

    # Replace the lockfile keys stored in the roots with the DAG hashes
    # computed by _filter_specs, and index the root specs by DAG hash
    for idx, spec_dag_hash in enumerate(concretized_order):
        self.concretized_roots[idx].hash = spec_dag_hash
        self.specs_by_hash[spec_dag_hash] = first_seen[spec_dag_hash]

    # Same treatment for the roots of the included environments, if any
    if any(self.included_concretized_roots.values()):
        first_seen = {}

        for env_name, roots in self.included_concretized_roots.items():
            order = [x.hash for x in roots]
            filtered_spec, new_order = self._filter_specs(
                reader, included_json_specs_by_hash, order
            )
            first_seen.update(filtered_spec)
            for idx, spec_dag_hash in enumerate(new_order):
                roots[idx].hash = spec_dag_hash

        for env_path, roots in self.included_concretized_roots.items():
            self.included_specs_by_hash[env_path] = {x.hash: first_seen[x.hash] for x in roots}


def _filter_specs(self, reader, json_specs_by_hash, order_concretized):
    """Reconstruct specs from their node dictionaries and index them by DAG hash.

    Args:
        reader: object used to decode node dictionaries (it provides
            ``from_node_dict``, ``name_and_data``, ``dependencies_from_node_dict``
            and ``extract_build_spec_info_from_node_dict``)
        json_specs_by_hash: node dictionaries keyed by lockfile key
        order_concretized: lockfile keys of the root specs, in order

    Returns:
        a tuple ``(first_seen, order_concretized)``, where ``first_seen`` maps
        the DAG hash of each node reachable from the roots to the first spec
        seen with that hash, and ``order_concretized`` is the list of DAG
        hashes of the roots, in the same order as the input
    """
    # Track specs by their lockfile key. Currently, spack uses the finest
    # grained hash as the lockfile key, while older formats used the build
    # hash or a previous incarnation of the DAG hash (one that did not
    # include build deps or package hash).
    specs_by_hash = {}

    # Track specs by their DAG hash, allows handling DAG hash collisions
    first_seen = {}

    # First pass: Put each spec in the map ignoring dependencies
    for lockfile_key, node_dict in json_specs_by_hash.items():
        spec = reader.from_node_dict(node_dict)
        if not spec._hash:
            # in v1 lockfiles, the hash only occurs as a key
            spec._hash = lockfile_key
        specs_by_hash[lockfile_key] = spec

    # Second pass: For each spec, get its dependencies from the node dict
    # and add them to the spec, including build specs
    for lockfile_key, node_dict in json_specs_by_hash.items():
        name, data = reader.name_and_data(node_dict)
        for _, dep_hash, deptypes, _, virtuals, direct in reader.dependencies_from_node_dict(
            data
        ):
            specs_by_hash[lockfile_key]._add_dependency(
                specs_by_hash[dep_hash],
                depflag=dt.canonicalize(deptypes),
                virtuals=virtuals,
                direct=direct,
            )

        if "build_spec" in node_dict:
            _, bhash, _ = reader.extract_build_spec_info_from_node_dict(node_dict)
            specs_by_hash[lockfile_key]._build_spec = specs_by_hash[bhash]

    # Traverse the root specs one at a time in the order they appear.
    # The first time we see each DAG hash, that's the one we want to
    # keep. This is only required as long as we support older lockfile
    # formats where the mapping from DAG hash to lockfile key is possibly
    # one-to-many.
    for lockfile_key in order_concretized:
        for s in specs_by_hash[lockfile_key].traverse():
            if s.dag_hash() not in first_seen:
                first_seen[s.dag_hash()] = s

    # Now make sure concretized_order and our internal specs dict
    # contains the keys used by modern spack (i.e. the dag_hash
    # that includes build deps and package hash).
    order_concretized = [specs_by_hash[h_key].dag_hash() for h_key in order_concretized]

    return first_seen, order_concretized
def write(self, regenerate: bool = True) -> None:
    """Writes an in-memory environment to its location on disk.

    If the environment has concrete specs, or includes concrete environments,
    the environment repository is updated and both the manifest and the
    lockfile are written. Otherwise only the manifest is written, within the
    ``fs.safe_remove`` context for the lockfile path.

    In both cases the ``new`` flag of every concretized root is cleared at the end.

    Args:
        regenerate: if True, also regenerate the views of the environment
    """
    self.manifest_uptodate_or_warn()

    has_concrete_state = self.specs_by_hash or self.included_concrete_env_root_dirs
    if not has_concrete_state:
        self.ensure_env_directory_exists(dot_env=False)
        with fs.safe_remove(self.lock_path):
            self.manifest.flush()
    else:
        self.ensure_env_directory_exists(dot_env=True)
        self.update_environment_repository()
        self.manifest.flush()
        # Write the lock file last. This is useful for Makefiles
        # with `spack.lock: spack.yaml` rules, where the target
        # should be newer than the prerequisite to avoid
        # redundant re-concretization.
        self.update_lockfile()

    if regenerate:
        self.regenerate_views()

    for root_info in self.concretized_roots:
        root_info.new = False
def update_lockfile(self) -> None:
    """Dump the lockfile dictionary of this environment to ``self.lock_path``,
    through the stream provided by ``fs.write_tmp_and_move``."""
    with fs.write_tmp_and_move(self.lock_path, encoding="utf-8") as stream:
        lockfile_data = self._to_lockfile_dict()
        sjson.dump(lockfile_data, stream=stream)
def ensure_env_directory_exists(self, dot_env: bool = False) -> None:
    """Ensure that the root directory of the environment exists

    Args:
        dot_env: if True also ensures that the <root>/.env directory exists
    """
    fs.mkdirp(self.path)
    if not dot_env:
        return
    fs.mkdirp(self.env_subdir_path)
def update_environment_repository(self) -> None:
    """Updates the repository associated with the environment.

    Every node reachable from a concretized root marked as ``new`` is added
    to the environment repository.

    Raises:
        ValueError: if one of the visited nodes is not concrete
    """
    new_roots = []
    for root_info in self.concretized_roots:
        if root_info.new:
            new_roots.append(self.specs_by_hash[root_info.hash])

    for node in traverse.traverse_nodes(new_roots):
        if not node.concrete:
            raise ValueError("specs passed to environment.write() must be concrete!")
        self._add_to_environment_repository(node)
def _add_to_environment_repository(self, spec_node: Spec) -> None:
    """Add the root node of the spec to the environment repository

    The repository for the namespace of the node lives in a subdirectory of
    ``self.repos_path`` named after the namespace. The provenance of the node
    is dumped into the package directory of that repository.

    Args:
        spec_node: node to be added. Its dependencies are not processed here.
    """
    namespace: str = spec_node.namespace
    repo_root = os.path.join(self.repos_path, namespace)
    package_api = spack.repo.PATH.get_repo(namespace).package_api
    repository = spack.repo.create_or_construct(
        root=repo_root, namespace=namespace, package_api=package_api
    )
    package_dir = repository.dirname_for_package_name(spec_node.name)
    fs.mkdirp(package_dir)
    spack.repo.PATH.dump_provenance(spec_node, package_dir)
def manifest_uptodate_or_warn(self):
    """Emits a warning if the manifest file is not up-to-date.

    The warning suggests running ``spack env update`` and reports the first
    two components of the current Spack version.
    """
    if is_latest_format(self.manifest_path):
        return

    ver = ".".join(map(str, spack.spack_version_info[:2]))
    msg = (
        'The environment "{}" is written to disk in a deprecated format. '
        "Please update it using:\n\n"
        "\tspack env update {}\n\n"
        "Note that versions of Spack older than {} may not be able to "
        "use the updated configuration."
    )
    warnings.warn(msg.format(self.name, self.name, ver))
def _default_view_as_yaml(self):
    """Return the most compact representation of the default view.

    This internal function assumes the default view is set. When it is the
    only view of the environment, the result is True if the view equals a
    descriptor built from the default view path, or its raw root if it equals
    a descriptor built from the raw root alone. In any other case the result
    is the dictionary form of the view.
    """
    raw_root = self.default_view.raw_root

    at_default_path = self.default_view == ViewDescriptor(self.path, self.view_path_default)
    if at_default_path and len(self.views) == 1:
        return True

    only_root_customized = self.default_view == ViewDescriptor(self.path, raw_root)
    if only_root_customized and len(self.views) == 1:
        return raw_root

    return self.default_view.to_dict()
def invalidate_repository_cache(self):
    """Reset the ``_repo`` attribute of the environment to None."""
    self._repo = None
def __enter__(self):
    """Activate this environment, remembering the one that was active (if any)
    so that ``__exit__`` can restore it."""
    previous = _active_environment
    self._previous_active = previous
    if previous:
        deactivate()
    activate(self)
    return self


def __exit__(self, exc_type, exc_val, exc_tb):
    """Deactivate this environment and re-activate the one recorded by
    ``__enter__``, if any."""
    deactivate()
    previous = self._previous_active
    if previous:
        activate(previous)
def _is_uninstalled(spec): return not spec.installed or (spec.satisfies("dev_path=*") or spec.satisfies("^dev_path=*"))
class ReusableSpecsFactory:
    """Creates a list of SpecFilters to generate the reusable specs for the environment"""

    def __init__(self, *, env: Environment, group: str):
        self.env = env
        self.group = group

    @staticmethod
    def _const(specs: List[Spec]) -> Callable[[], List[Spec]]:
        """Returns a zero-argument callable that always returns the given list."""

        def _constant():
            return specs

        return _constant

    def __call__(
        self, is_usable: Callable[[Spec], bool], configuration: spack.config.Configuration
    ) -> List[SpecFilter]:
        """Build the filters selecting the environment specs that can be reused.

        Args:
            is_usable: predicate forwarded to every SpecFilter
            configuration: configuration from which the ``concretizer:reuse``
                setting is read

        Returns:
            list of filters. Specs from the groups needed by this group are
            always included, the rest depends on the reuse configuration.
        """
        filters = []

        # Specs from group dependencies _must_ be reused, regardless of configuration
        mandatory_roots = []
        for needed_group in self.env.manifest.needs(group=self.group):
            mandatory_roots.extend(
                [concrete for _, concrete in self.env.concretized_specs_by(group=needed_group)]
            )

        # Specs from groups listed as dependencies
        if mandatory_roots:
            mandatory_nodes = list(
                traverse.traverse_nodes(mandatory_roots, deptype=("link", "run"))
            )
            filters.append(
                SpecFilter(
                    self._const(mandatory_nodes), include=[], exclude=[], is_usable=is_usable
                )
            )

        # Included environments and _this_ group, instead, are subject to configuration
        reuse_yaml = configuration.get_config("concretizer").get("reuse", False)

        # With no reuse don't account for previously concretized specs in _this_ group
        if reuse_yaml is False:
            return filters

        own_roots = [concrete for _, concrete in self.env.concretized_specs_by(group=self.group)]
        included_roots = [
            concrete for _, concrete in self.env.concretized_specs_from_all_included_environments()
        ]
        configurable_nodes = list(traverse.traverse_nodes(own_roots + included_roots))

        if not isinstance(reuse_yaml, Mapping):
            filters.append(
                SpecFilter(
                    self._const(configurable_nodes), include=[], exclude=[], is_usable=is_usable
                )
            )
            return filters

        # Here we know we have a complex reuse configuration
        default_include = reuse_yaml.get("include", [])
        default_exclude = reuse_yaml.get("exclude", [])
        for source in reuse_yaml.get("from", []):
            # We just need to take care of the environment-related parts
            if source["type"] != "environment":
                continue

            include = source.get("include", default_include)
            exclude = source.get("exclude", default_exclude)

            if "path" in source:
                # Sources with a path contribute only if they are one of the
                # concrete environments included by this environment
                env_dir = as_env_dir(source["path"])
                if env_dir not in self.env.included_concrete_env_root_dirs:
                    continue
                roots_from_included_env = [
                    concrete
                    for _, concrete in self.env.concretized_specs_from_included_environment(
                        env_dir
                    )
                ]
                candidates = list(traverse.traverse_nodes(roots_from_included_env))
            else:
                candidates = configurable_nodes

            filters.append(
                SpecFilter(
                    self._const(candidates), include=include, exclude=exclude, is_usable=is_usable
                )
            )

        return filters
class EnvironmentConcretizer:
    """Concretizes the user specs of an environment, one group of specs at a time."""

    def __init__(self, env: Environment):
        self.env = env

    def concretize(
        self, *, force: Optional[bool] = None, tests: Union[bool, Sequence[str]] = False
    ) -> List[SpecPair]:
        """Concretize the user specs of the environment that need it.

        Args:
            force: if True, previously concretized specs are cleared first. If
                None, the value is read from the ``concretizer:force`` configuration.
            tests: forwarded to the concretization strategy of each group

        Returns:
            the (abstract, concrete) pairs that have been newly concretized
        """
        if force is None:
            force = spack.config.get("concretizer:force")

        self._prepare_environment_for_concretization(force=force)

        result = []
        # The order of groups is deterministic, and "default" specs are first
        for current_group in self._order_groups():
            with self.env.config_override_for_group(group=current_group):
                partial_result = self._concretize_single_group(group=current_group, tests=tests)
            result.extend(partial_result)

        # Unify the specs objects, so we get correct references to all parents
        if result:
            self.env.unify_specs()
        return result

    def _concretize_single_group(
        self, *, group: str, tests: Union[bool, Sequence[str]]
    ) -> List[SpecPair]:
        """Concretize the new user specs of a single group, using the strategy
        selected by ``concretizer:unify``.

        Raises:
            SpackEnvironmentError: if the value of ``concretizer:unify`` is not
                one of True, False or "when_possible"
        """
        # Exit early if the set of concretized specs is the set of user specs
        new_user_specs, kept_user_specs = self._partition_user_specs(group=group)
        if not new_user_specs:
            return []

        # Pick the right concretization strategy
        if group != DEFAULT_USER_SPEC_GROUP:
            tty.msg(f"Concretizing the '{group}' group of specs")

        unify = spack.config.CONFIG.get_config("concretizer").get("unify", False)
        factory = ReusableSpecsFactory(env=self.env, group=group)
        if unify == "when_possible":
            partial_result = self._concretize_together_where_possible(
                new_user_specs, kept_user_specs, tests=tests, group=group, factory=factory
            )
        elif unify is True:
            partial_result = self._concretize_together(
                new_user_specs, kept_user_specs, tests=tests, group=group, factory=factory
            )
        elif unify is False:
            partial_result = self._concretize_separately(
                new_user_specs, kept_user_specs, tests=tests, group=group, factory=factory
            )
        else:
            raise SpackEnvironmentError(f"concretization strategy not implemented [{unify}]")

        return partial_result

    def _prepare_environment_for_concretization(self, *, force: bool):
        """Reset the environment concrete state and ensure consistency with user specs."""
        if force:
            self.env.clear_concretized_specs()
        else:
            self.env.sync_concretized_specs()

        # If a combined env, check updated spec is in the linked envs
        if self.env.included_concrete_env_root_dirs:
            self.env.include_concrete_envs()

    def _partition_user_specs(
        self, *, group: str
    ) -> Tuple[List[spack.spec.Spec], List[spack.spec.Spec]]:
        """Splits the users specs in the list of the ones to be computed, and the list
        of the ones to retain.

        Returns:
            a tuple ``(new_user_specs, kept_user_specs)``. The user specs of the
            included environments are appended to the specs to retain.
        """
        concretized_user_specs = {x.root for x in self.env.concretized_roots if x.group == group}
        kept_user_specs, new_user_specs = stable_partition(
            self.env.user_specs_by(group=group), lambda x: x in concretized_user_specs
        )
        kept_user_specs += self.env.included_user_specs
        return new_user_specs, kept_user_specs

    def _order_groups(self) -> List[str]:
        """Return the groups of the manifest in an order compatible with their
        dependencies.

        The default group is always first. Groups that become ready at the same
        step are sorted by name: iterating directly over the set of remaining
        groups would make the result depend on string hash randomization.

        Raises:
            SpackEnvironmentConfigError: if a group needs an undefined group,
                or if there is a cyclic dependency among groups
        """
        done, result = {DEFAULT_USER_SPEC_GROUP}, [DEFAULT_USER_SPEC_GROUP]
        all_groups = self.env.manifest.groups()
        remaining = all_groups - {DEFAULT_USER_SPEC_GROUP}

        # Validate upfront that all 'needs' references point to defined groups
        for group in sorted(remaining):
            for dep in self.env.manifest.needs(group=group):
                if dep not in all_groups:
                    raise SpackEnvironmentConfigError(
                        f"group '{group}' needs '{dep}', but '{dep}' is not a defined group",
                        self.env.manifest.manifest_file,
                    )

        while remaining:
            # Check we have groups that are "ready"
            ready = [
                current
                for current in sorted(remaining)
                if all(d in done for d in self.env.manifest.needs(group=current))
            ]

            # Check we can progress - if nothing is ready, there is a cycle
            if not ready:
                raise SpackEnvironmentConfigError(
                    f"cyclic dependency detected among groups: {', '.join(sorted(remaining))}",
                    self.env.manifest.manifest_file,
                )

            result.extend(ready)
            done.update(ready)
            remaining.difference_update(ready)

        return result

    def _user_spec_pairs(
        self, user_specs_to_compute: List[Spec], user_specs_to_keep: List[Spec]
    ) -> List[SpecPair]:
        """Return the input of a joint concretization: the specs to compute paired
        with None, followed by the already concretized pairs whose user spec
        must be kept."""
        specs_to_concretize = [(s, None) for s in user_specs_to_compute] + [
            (abstract, concrete)
            for abstract, concrete in self.env.concretized_specs()
            if abstract in user_specs_to_keep
        ]
        return specs_to_concretize

    def _concretize_together_where_possible(
        self,
        to_compute: List[Spec],
        to_keep: List[Spec],
        *,
        group: Optional[str] = None,
        tests: Union[bool, Sequence] = False,
        factory: ReusableSpecsFactory,
    ) -> List[SpecPair]:
        """Concretization strategy used when ``concretizer:unify`` is "when_possible".

        Only the pairs whose user spec is in ``to_compute`` are added to the
        environment and returned.
        """
        import spack.concretize

        specs_to_concretize = self._user_spec_pairs(to_compute, to_keep)
        result = spack.concretize.concretize_together_when_possible(
            specs_to_concretize, tests=tests, factory=factory
        )
        result = [x for x in result if x[0] in to_compute]
        for abstract, concrete in result:
            self.env.add_concrete_spec(abstract, concrete, new=True, group=group)

        return result

    def _concretize_together(
        self,
        to_compute: List[Spec],
        to_keep: List[Spec],
        *,
        group: Optional[str] = None,
        tests: Union[bool, Sequence] = False,
        factory: ReusableSpecsFactory,
    ) -> List[SpecPair]:
        """Concretization strategy used when ``concretizer:unify`` is True.

        Raises:
            spack.error.UnsatisfiableSpecError: re-raised, with an extended
                message if the group has more than one user spec
        """
        import spack.concretize

        to_concretize = self._user_spec_pairs(to_compute, to_keep)
        try:
            concrete_pairs = spack.concretize.concretize_together(
                to_concretize, tests=tests, factory=factory
            )
        except spack.error.UnsatisfiableSpecError as e:
            # "Enhance" the error message for multiple root specs, suggest a less strict
            # form of concretization.
            if len(self.env.user_specs_by(group=group)) > 1:
                e.message += ". "
                if to_keep:
                    e.message += (
                        "Couldn't concretize without changing the existing environment. "
                        "If you are ok with changing it, try `spack concretize --force`. "
                    )
                e.message += (
                    "You could consider setting `concretizer:unify` to `when_possible` "
                    "or `false` to allow multiple versions of some packages."
                )
            raise

        # Return the portion of the return value that is new
        result = concrete_pairs[: len(to_compute)]
        for abstract, concrete in result:
            self.env.add_concrete_spec(abstract, concrete, new=True, group=group)

        return result

    def _concretize_separately(
        self,
        to_compute: List[Spec],
        to_keep: List[Spec],
        *,
        group: Optional[str] = None,
        tests: Union[bool, Sequence] = False,
        factory: ReusableSpecsFactory,
    ) -> List[SpecPair]:
        """Concretization strategy that concretizes separately one user spec after the other"""
        import spack.concretize

        to_concretize = [(x, None) for x in to_compute]
        concrete_pairs = spack.concretize.concretize_separately(
            to_concretize, tests=tests, factory=factory
        )

        for abstract, concrete in concrete_pairs:
            self.env.add_concrete_spec(abstract, concrete, new=True, group=group)

        return concrete_pairs
def yaml_equivalent(first, second) -> bool:
    """Returns whether two spack yaml items are equivalent, including overrides

    Dictionaries and lists are compared recursively. Scalars are equivalent
    only if they are of the same kind (boolean, integer, float, None or
    string) and equal. The comparison is symmetric.
    """
    # YAML has timestamps and dates, but we don't use them yet in schemas
    if isinstance(first, dict):
        return isinstance(second, dict) and _equiv_dict(first, second)

    if isinstance(first, list):
        return isinstance(second, list) and _equiv_list(first, second)

    # bool is a subclass of int, so booleans must be handled before integers,
    # on both sides: a boolean is only ever equivalent to the same boolean
    if isinstance(first, bool) or isinstance(second, bool):
        return isinstance(first, bool) and isinstance(second, bool) and first is second

    if isinstance(first, int):
        return isinstance(second, int) and first == second

    if isinstance(first, float):
        return isinstance(second, float) and first == second

    if first is None:
        return second is None

    # it's a string
    return isinstance(second, str) and first == second
def _equiv_list(first, second): """Returns whether two spack yaml lists are equivalent, including overrides""" if len(first) != len(second): return False return all(yaml_equivalent(f, s) for f, s in zip(first, second)) def _equiv_dict(first, second): """Returns whether two spack yaml dicts are equivalent, including overrides""" if len(first) != len(second): return False same_values = all(yaml_equivalent(fv, sv) for fv, sv in zip(first.values(), second.values())) same_keys_with_same_overrides = all( fk == sk and getattr(fk, "override", False) == getattr(sk, "override", False) for fk, sk in zip(first.keys(), second.keys()) ) return same_values and same_keys_with_same_overrides
def display_specs(specs: List[spack.spec.Spec], *, highlight_non_defaults: bool = False) -> None:
    """Prints the tree of a list of specs, with 7 character hashes and install status.

    Args:
        specs: list of specs to be displayed
        highlight_non_defaults: if True, highlights non-default versions and variants
            in the specs being displayed
    """
    highlight_version_fn = None
    highlight_variant_fn = None
    if highlight_non_defaults:
        highlight_version_fn = spack.package_base.non_preferred_version
        highlight_variant_fn = spack.package_base.non_default_variant

    tree_string = spack.spec.tree(
        specs,
        format=spack.spec.DISPLAY_FORMAT,
        hashes=True,
        hashlen=7,
        status_fn=spack.spec.Spec.install_status,
        highlight_version_fn=highlight_version_fn,
        highlight_variant_fn=highlight_variant_fn,
        key=traverse.by_dag_hash,
    )
    print(tree_string)
def make_repo_path(root):
    """Make a RepoPath from the repo subdirectories in an environment.

    A repository is constructed from each directory, at any depth below
    ``root``, that contains a ``repo.yaml`` file.
    """
    pattern = os.path.join(root, "**", "repo.yaml")
    repos = []
    for repo_yaml in glob.glob(pattern, recursive=True):
        repos.append(spack.repo.from_path(os.path.dirname(repo_yaml)))
    return spack.repo.RepoPath(*repos)
def manifest_file(env_name_or_dir):
    """Return the absolute path to a manifest file given the environment
    name or directory.

    The argument is tried as an environment directory first, and as the name
    of an existing environment second.

    Args:
        env_name_or_dir (str): either the name of a valid environment
            or a directory where a manifest file resides

    Raises:
        AssertionError: if the environment is not found
    """
    env_dir = None
    if is_env_dir(env_name_or_dir):
        env_dir = os.path.abspath(env_name_or_dir)
    elif exists(env_name_or_dir):
        named_env_root = root(env_name_or_dir)
        env_dir = os.path.abspath(named_env_root)

    assert env_dir, "environment not found [env={0}]".format(env_name_or_dir)
    return os.path.join(env_dir, manifest_name)
def update_yaml(manifest, backup_file):
    """Update a manifest file from an old format to the current one.

    If an update is needed, the original manifest is copied to the backup
    file before being overwritten with the updated content.

    Args:
        manifest (str): path to a manifest file
        backup_file (str): file where to copy the original manifest

    Returns:
        True if the manifest was updated, False otherwise.

    Raises:
        AssertionError: in case anything goes wrong during the update
    """
    # Check if the environment needs update
    with open(manifest, encoding="utf-8") as stream:
        content = syaml.load(stream)

    section = content[_top_level_key(content)]
    if not spack.schema.env.update(section):
        tty.debug("No update needed [manifest={0}]".format(manifest))
        return False

    # Copy environment to a backup file and update it
    backup_exists_msg = (
        'backup file "{0}" already exists on disk. Check its content '
        "and remove it before trying to update again."
    )
    assert not os.path.exists(backup_file), backup_exists_msg.format(backup_file)

    shutil.copy(manifest, backup_file)
    with open(manifest, "w", encoding="utf-8") as stream:
        syaml.dump_config(content, stream)
    return True
def _top_level_key(data): """Return the top level key used in this environment Args: data (dict): raw yaml data of the environment Returns: Either 'spack' or 'env' """ msg = 'cannot find top level attribute "spack" or "env" in the environment' assert any(x in data for x in ("spack", "env")), msg if "spack" in data: return "spack" return "env"
def is_latest_format(manifest):
    """Return False if the manifest file exists and is not in the latest schema format.

    A manifest that cannot be read because of an ``OSError`` is reported as
    being in the latest format.

    Args:
        manifest (str): manifest file to be analyzed
    """
    try:
        with open(manifest, encoding="utf-8") as stream:
            content = syaml.load(stream)
    except OSError:
        return True

    section = content[_top_level_key(content)]
    needs_update = spack.schema.env.update(section)
    return not needs_update
@contextlib.contextmanager
def no_active_environment():
    """Deactivate the active environment for the duration of the context. Has no
    effect when there is no active environment.

    The environment that was active on entry, if any, is activated again on
    exit, also when the body of the context raises.
    """
    previously_active = active_environment()
    try:
        deactivate()
        yield
    finally:
        # TODO: we don't handle `use_env_repo` here.
        if previously_active:
            activate(previously_active)
def initialize_environment_dir(
    environment_dir: Union[str, pathlib.Path], envfile: Optional[Union[str, pathlib.Path]]
) -> None:
    """Initialize an environment directory starting from an envfile.

    Files with suffix .json or .lock are considered lockfiles. Files with any other name
    are considered manifest files. A directory is considered an entire environment, and
    is copied as a whole. With no envfile, a default manifest is written.

    Args:
        environment_dir: directory where the environment should be placed
        envfile: manifest file or lockfile used to initialize the environment

    Raises:
        SpackEnvironmentError: if the directory can't be initialized
    """
    environment_dir = pathlib.Path(environment_dir)
    target_lockfile = environment_dir / lockfile_name
    target_manifest = environment_dir / manifest_name

    # Refuse to overwrite an environment that is already there
    for existing in (target_manifest, target_lockfile):
        if existing.exists():
            raise SpackEnvironmentError(
                f"cannot initialize environment, {existing} already exists"
            )

    def _ensure_env_dir():
        try:
            environment_dir.mkdir(parents=True, exist_ok=True)
        except FileExistsError as e:
            msg = f"cannot initialize the environment, '{environment_dir}' already exists"
            raise SpackEnvironmentError(msg) from e

    if envfile is None:
        _ensure_env_dir()
        target_manifest.write_text(default_manifest_yaml())
        return

    envfile = pathlib.Path(envfile)
    if not envfile.exists():
        raise SpackEnvironmentError(
            f"cannot initialize environment, {envfile} is not a valid file"
        )

    if envfile.is_dir():
        # initialization file is an entire env directory
        if not (envfile / "spack.yaml").is_file():
            raise SpackEnvironmentError(
                f"cannot initialize environment, {envfile} is not a valid environment"
            )
        copy_tree(str(envfile), str(environment_dir))
        return

    _ensure_env_dir()

    # When we have a lockfile we should copy that and produce a consistent default manifest
    if str(envfile).endswith((".lock", ".json")):
        shutil.copy(envfile, target_lockfile)
        # This constructor writes a spack.yaml which is consistent with the root
        # specs in the spack.lock
        try:
            EnvironmentManifestFile.from_lockfile(environment_dir)
        except Exception as e:
            msg = f"cannot initialize environment, '{environment_dir}' from lockfile"
            raise SpackEnvironmentError(msg) from e
        return

    shutil.copy(envfile, target_manifest)

    # Copy relative path includes that live inside the environment dir
    try:
        manifest = EnvironmentManifestFile(environment_dir)
    except Exception:
        # error handling for bad manifests is handled on other code paths
        return

    # TODO: make this recursive
    includes = manifest[TOP_LEVEL_KEY].get(manifest_include_name, [])
    for path in spack.config.paths_from_includes(includes):
        if os.path.isabs(path):
            continue

        destination = pathlib.Path(os.path.normpath(environment_dir / path))
        shared_root = pathlib.Path(os.path.commonpath([environment_dir, destination]))
        if shared_root != environment_dir:
            tty.debug(f"Will not copy relative include file from outside environment: {path}")
            continue

        origin = os.path.normpath(envfile.parent / path)
        if os.path.isfile(origin):
            fs.touchp(destination)
            shutil.copy(origin, destination)
        elif not os.path.exists(origin):
            tty.warn(f"Skipping copy of non-existent include path: '{path}'")
        elif os.path.exists(destination):
            tty.warn(f"Skipping copy of directory over existing path: {path}")
        else:
            shutil.copytree(origin, destination, symlinks=True)
[docs] class EnvironmentManifestFile(collections.abc.Mapping): """Manages the in-memory representation of a manifest file, and its synchronization with the actual manifest on disk. """
@staticmethod
def from_lockfile(manifest_dir: Union[pathlib.Path, str]) -> "EnvironmentManifestFile":
    """Returns an environment manifest file compatible with the lockfile already present in
    the environment directory.

    This function also writes a spack.yaml file that is consistent with the spack.lock
    already existing in the directory: a default manifest is written first, then the
    root specs of the lockfile are added to it, group by group.

    Args:
        manifest_dir: directory containing the manifest and lockfile
    """
    # TBD: Should this be the abspath?
    manifest_dir = pathlib.Path(manifest_dir)
    lockfile_path = manifest_dir / lockfile_name
    with lockfile_path.open("r", encoding="utf-8") as stream:
        lockfile_data = sjson.load(stream)

    user_specs_by_group: Dict[str, List[str]] = {}
    for root_entry in lockfile_data["roots"]:
        # "group" is not there for Lockfile v6 and lower
        group = root_entry.get("group", DEFAULT_USER_SPEC_GROUP)
        if group not in user_specs_by_group:
            user_specs_by_group[group] = []
        user_specs_by_group[group].append(root_entry["spec"])

    (manifest_dir / manifest_name).write_text(default_manifest_yaml())

    manifest = EnvironmentManifestFile(manifest_dir)
    for group, spec_strings in user_specs_by_group.items():
        for spec_string in spec_strings:
            manifest.add_user_spec(spec_string, group=group)
    manifest.flush()
    return manifest
def __init__(self, manifest_dir: Union[pathlib.Path, str], name: Optional[str] = None) -> None: self.manifest_dir = pathlib.Path(manifest_dir) self.name = name or str(manifest_dir) self.manifest_file = self.manifest_dir / manifest_name self.scope_name = f"env:{self.name}" self.config_stage_dir = os.path.join(env_subdir_path(manifest_dir), "config") #: Configuration scope associated with this environment. Note that this is not #: invalidated by a re-read of the manifest file. self._env_config_scope: Optional[spack.config.ConfigScope] = None if not self.manifest_file.exists(): msg = f"cannot find '{manifest_name}' in {self.manifest_dir}" raise SpackEnvironmentError(msg) with self.manifest_file.open(encoding="utf-8") as f: self.yaml_content = _read_yaml(f) # Maps groups to their dependencies self._groups: Dict[str, Tuple[str, ...]] = {DEFAULT_USER_SPEC_GROUP: tuple()} # Raw YAML definitions of the user specs for each group self._user_specs: Dict[str, List] = {DEFAULT_USER_SPEC_GROUP: []} # Configuration overrides for each group self._config_override: Dict[str, Any] = {DEFAULT_USER_SPEC_GROUP: None} # Whether specs in each group are marked explicit self._explicit: Dict[str, bool] = {DEFAULT_USER_SPEC_GROUP: True} self._init_user_specs() self.changed = False def _init_user_specs(self): specs_yaml = self.configuration.get(USER_SPECS_KEY, []) for item in specs_yaml: if isinstance(item, str): self._user_specs[DEFAULT_USER_SPEC_GROUP].append(item) elif isinstance(item, dict): group = item.get("group", DEFAULT_USER_SPEC_GROUP) # Error if a group is defined more than once if group != DEFAULT_USER_SPEC_GROUP and group in self._groups: raise SpackEnvironmentConfigError( f"group '{group}' defined more than once", self.manifest_file ) # Add an entry for the user specs and store group dependencies if group not in self._user_specs: self._user_specs[group] = [] self._groups[group] = tuple(item.get("needs", ())) self._config_override[group] = item.get("override", None) 
self._explicit[group] = item.get("explicit", True) if "matrix" in item: # Short form if the group is composed of only one matrix self._user_specs[group].append({"matrix": item["matrix"]}) elif "specs" in item: self._user_specs[group].extend(item["specs"]) def _clear_user_specs(self) -> None: self._user_specs = {DEFAULT_USER_SPEC_GROUP: []} self._groups = {DEFAULT_USER_SPEC_GROUP: tuple()} self._config_override = {DEFAULT_USER_SPEC_GROUP: None} self._explicit = {DEFAULT_USER_SPEC_GROUP: True} def _all_matches(self, user_spec: str) -> List[str]: """Maps the input string to the first equivalent user spec in the manifest, and returns it. Args: user_spec: user spec to be found Raises: ValueError: if no equivalent match is found """ result = [] for yaml_spec_str in self.configuration["specs"]: if Spec(yaml_spec_str) == Spec(user_spec): result.append(yaml_spec_str) if not result: raise ValueError(f"cannot find a spec equivalent to {user_spec}") return result
[docs] def user_specs(self, *, group: Optional[str] = None) -> List: group = self._ensure_group_exists(group) return self._user_specs[group]
[docs] def config_override( self, *, group: Optional[str] = None ) -> Optional[spack.config.InternalConfigScope]: group = self._ensure_group_exists(group) data = self._config_override[group] if data is None: return None return spack.config.InternalConfigScope(f"env:groups:{group}", data)
[docs] def groups(self) -> KeysView: """Returns the list of groups defined in the manifest""" return self._groups.keys()
[docs] def needs(self, *, group: Optional[str] = None) -> Tuple[str, ...]: """Returns the dependencies of a group of user specs.""" group = self._ensure_group_exists(group) return self._groups[group]
[docs] def is_explicit(self, *, group: Optional[str] = None) -> bool: """Returns whether specs in a group are marked explicit. When False, specs in the group are installed as implicit dependencies and are eligible for garbage collection once no other spec depends on them. """ group = self._ensure_group_exists(group) return self._explicit[group]
def _ensure_group_exists(self, group: Optional[str]) -> str: group = DEFAULT_USER_SPEC_GROUP if group is None else group if group not in self._groups: raise ValueError(f"user specs group '{group}' not found in {self.manifest_file}") return group
[docs] def add_user_spec(self, user_spec: str, *, group: Optional[str] = None) -> None: """Appends the user spec passed as input to the list of root specs for the given group. Args: user_spec: user spec to be appended group: group where the spec should be added. If None, the default group is used. """ group = group or DEFAULT_USER_SPEC_GROUP if group == DEFAULT_USER_SPEC_GROUP: # Append to top-most specs: attribute specs_yaml = self.configuration.setdefault("specs", []) specs_yaml.append(user_spec) else: # Append to specs: attribute within a group group_in_yaml = self._get_group(group) group_in_yaml.setdefault("specs", []).append(user_spec) self._user_specs[group].append(user_spec) self.changed = True
def _get_group(self, group: str) -> Dict: """Find or create the group entry in the manifest""" specs_yaml = self.configuration.setdefault("specs", []) group_entry = None for item in specs_yaml: if isinstance(item, dict) and item.get("group") == group: group_entry = item break if group_entry is None: group_entry = {"group": group, "specs": []} specs_yaml.append(group_entry) self._groups[group] = tuple() self._config_override[group] = None self._user_specs[group] = [] self._explicit[group] = True return group_entry
[docs] def remove_user_spec(self, user_spec: str) -> None: """Removes the user spec passed as input from the default list of root specs Args: user_spec: user spec to be removed Raises: SpackEnvironmentError: when the user spec is not in the list """ try: for key in self._all_matches(user_spec): self.configuration["specs"].remove(key) self._user_specs[DEFAULT_USER_SPEC_GROUP].remove(key) except ValueError as e: msg = f"cannot remove {user_spec} from {self}, no such spec exists" raise SpackEnvironmentError(msg) from e self.changed = True
[docs] def clear(self) -> None: """Clear all user specs from the list of root specs""" self.configuration["specs"] = [] self._clear_user_specs() self.changed = True
[docs] def override_user_spec(self, user_spec: str, idx: int) -> None: """Overrides the user spec at index idx with the one passed as input. Args: user_spec: new user spec idx: index of the spec to be overridden Raises: SpackEnvironmentError: when the user spec cannot be overridden """ try: self.configuration["specs"][idx] = user_spec self._clear_user_specs() self._init_user_specs() except ValueError as e: msg = f"cannot override {user_spec} from {self}" raise SpackEnvironmentError(msg) from e self.changed = True
[docs] def set_include_concrete(self, include_concrete: List[str]) -> None: """Sets the included concrete environments in the manifest to the value(s) passed as input. Args: include_concrete: list of already existing concrete environments to include """ self.configuration[lockfile_include_key] = list(include_concrete) self.changed = True
[docs] def add_definition(self, user_spec: str, list_name: str) -> None: """Appends a user spec to the first active definition matching the name passed as argument. Args: user_spec: user spec to be appended list_name: name of the definition where to append Raises: SpackEnvironmentError: is no valid definition exists already """ defs = self.configuration.get("definitions", []) msg = f"cannot add {user_spec} to the '{list_name}' definition, no valid list exists" for idx, item in self._iterate_on_definitions(defs, list_name=list_name, err_msg=msg): item[list_name].append(user_spec) break # "definitions" can be remote, so we need to update the global config too spack.config.CONFIG.set("definitions", defs, scope=self.scope_name) self.changed = True
[docs] def remove_definition(self, user_spec: str, list_name: str) -> None: """Removes a user spec from an active definition that matches the name passed as argument. Args: user_spec: user spec to be removed list_name: name of the definition where to remove the spec from Raises: SpackEnvironmentError: if the user spec cannot be removed from the list, or the list does not exist """ defs = self.configuration.get("definitions", []) msg = f"cannot remove {user_spec} from the '{list_name}' definition, no valid list exists" for idx, item in self._iterate_on_definitions(defs, list_name=list_name, err_msg=msg): try: item[list_name].remove(user_spec) break except ValueError: pass # "definitions" can be remote, so we need to update the global config too spack.config.CONFIG.set("definitions", defs, scope=self.scope_name) self.changed = True
[docs] def override_definition(self, user_spec: str, *, override: str, list_name: str) -> None: """Overrides a user spec from an active definition that matches the name passed as argument. Args: user_spec: user spec to be overridden override: new spec to be used list_name: name of the definition where to override the spec Raises: SpackEnvironmentError: if the user spec cannot be overridden """ defs = self.configuration.get("definitions", []) msg = f"cannot override {user_spec} with {override} in the '{list_name}' definition" for idx, item in self._iterate_on_definitions(defs, list_name=list_name, err_msg=msg): try: sub_index = item[list_name].index(user_spec) item[list_name][sub_index] = override break except ValueError: pass # "definitions" can be remote, so we need to update the global config too spack.config.CONFIG.set("definitions", defs, scope=self.scope_name) self.changed = True
def _iterate_on_definitions(self, definitions, *, list_name, err_msg): """Iterates on definitions, returning the active ones matching a given name.""" def extract_name(_item): names = list(x for x in _item if x != "when") assert len(names) == 1, f"more than one name in {_item}" return names[0] for idx, item in enumerate(definitions): name = extract_name(item) if name != list_name: continue condition_str = item.get("when", "True") if not spack.spec.eval_conditional(condition_str): continue yield idx, item else: raise SpackEnvironmentError(err_msg)
[docs] def set_default_view(self, view: Union[bool, str, pathlib.Path, Dict[str, str]]) -> None: """Sets the default view root in the manifest to the value passed as input. Args: view: If the value is a string or a path, it specifies the path to the view. If True the default view is used for the environment, if False there's no view. """ if isinstance(view, dict): self.configuration["view"][default_view_name].update(view) self.changed = True return if not isinstance(view, bool): view = str(view) self.configuration["view"] = view self.changed = True
[docs] def remove_default_view(self) -> None: """Removes the default view from the manifest file""" view_data = self.configuration.get("view") if isinstance(view_data, collections.abc.Mapping): self.configuration["view"].pop(default_view_name) self.changed = True return self.set_default_view(view=False)
[docs] def flush(self) -> None: """Synchronizes the object with the manifest file on disk.""" if not self.changed: return with fs.write_tmp_and_move(os.path.realpath(self.manifest_file)) as f: _write_yaml(self.yaml_content, f) self.changed = False
@property def configuration(self): """Return the dictionaries in the pristine YAML, without the top level attribute""" return self.yaml_content[TOP_LEVEL_KEY] def __len__(self): return len(self.yaml_content) def __getitem__(self, key): return self.yaml_content[key] def __iter__(self): return iter(self.yaml_content) def __str__(self): return str(self.manifest_file) @property def env_config_scope(self) -> spack.config.ConfigScope: """The configuration scope for the environment manifest""" if self._env_config_scope is None: self._env_config_scope = spack.config.SingleFileScope( self.scope_name, str(self.manifest_file), spack.schema.env.schema, yaml_path=[TOP_LEVEL_KEY], ) ensure_no_disallowed_env_config_mods(self._env_config_scope) return self._env_config_scope
[docs] def prepare_config_scope(self) -> None: """Add the manifest's scope to the global configuration search path.""" spack.config.CONFIG.push_scope( self.env_config_scope, priority=ConfigScopePriority.ENVIRONMENT )
[docs] def deactivate_config_scope(self) -> None: """Remove the manifest's scope from the global config path.""" spack.config.CONFIG.remove_scope(self.env_config_scope.name)
[docs] @contextlib.contextmanager def use_config(self): """Ensure only the manifest's configuration scopes are global.""" with no_active_environment(): self.prepare_config_scope() yield self.deactivate_config_scope()
def environment_path_scope(name: str, path: str) -> Optional[spack.config.ConfigScope]:
    """Retrieve the suitably named environment path scope

    Arguments:
        name: configuration scope name
        path: path to configuration file(s)

    Returns:
        the read-only configuration scope of the environment found at ``path``, with
        ``name`` prefixed to its scope name, or None if ``path`` is neither a managed
        environment nor an environment directory
    """
    if exists(path):
        # managed environment
        manifest_dir = root(path)
    elif is_env_dir(path):
        # anonymous environment
        manifest_dir = path
    else:
        return None

    scope = EnvironmentManifestFile(manifest_dir).env_config_scope
    scope.name = f"{name}:{scope.name}"
    scope.writable = False
    return scope
class SpackEnvironmentError(spack.error.SpackError):
    """Base class of every error related to Spack environments."""
class SpackEnvironmentViewError(SpackEnvironmentError):
    """Error raised for problems with the generation of views."""
class SpackEnvironmentConfigError(SpackEnvironmentError):
    """Error raised for configuration problems that are specific to Spack environments.

    The message passed to the base class is ``msg`` followed by the file it refers to.
    """

    def __init__(self, msg, filename):
        located_msg = f"{msg} in {filename}"
        super().__init__(located_msg)
class SpackEnvironmentDevelopError(SpackEnvironmentError):
    """Error raised when develop information cannot be applied to an environment."""