Source code for spack.solver.runtimes

# Copyright Spack Project Developers. See COPYRIGHT file for details.
#
# SPDX-License-Identifier: (Apache-2.0 OR MIT)
import itertools
from typing import Any, Dict, Set, Tuple

import spack.compilers.config
import spack.compilers.libraries
import spack.config
import spack.repo
import spack.spec
import spack.util.libc
import spack.version

from .core import SourceContext, fn, using_libc_compatibility
from .versions import Provenance


class RuntimePropertyRecorder:
    """An object of this class is injected in callbacks to compilers, to let them declare
    properties of the runtimes they support and of the runtimes they provide, and to add
    runtime dependencies to the nodes using said compiler.

    The usage of the object is the following. First, a runtime package name or the wildcard
    "*" are passed as an argument to __call__, to set which kind of package we are referring to.
    Then we can call one method with a directive-like API.

    Examples:
        >>> pkg = RuntimePropertyRecorder(setup)
        >>> # Every package compiled with %gcc has a link dependency on 'gcc-runtime'
        >>> pkg("*").depends_on(
        ...     "gcc-runtime",
        ...     when="%gcc",
        ...     type="link",
        ...     description="If any package uses %gcc, it depends on gcc-runtime"
        ... )
        >>> # The version of gcc-runtime is the same as the %gcc used to "compile" it
        >>> pkg("gcc-runtime").requires("@=9.4.0", when="%gcc@=9.4.0")
    """

    def __init__(self, setup) -> None:
        # Object used to translate specs into clauses and to emit facts and rules
        self._setup = setup
        # Rules, as text, in the order they were recorded by the directive-like methods
        self.rules = []
        # Pairs of (imposed_spec, when_spec) recorded by 'requires'
        self.runtime_conditions = set()
        # Dependency specs recorded by 'depends_on'
        self.injected_dependencies = set()
        # State of this object set in the __call__ method, and reset after
        # each directive-like method
        self.current_package = None

    def __call__(self, package_name: str) -> "RuntimePropertyRecorder":
        """Sets a package name for the next directive-like method call"""
        assert self.current_package is None, f"state was already set to '{self.current_package}'"
        self.current_package = package_name
        return self

    def reset(self) -> None:
        """Resets the current state."""
        self.current_package = None

    def depends_on(self, dependency_str: str, *, when: str, type: str, description: str) -> None:
        """Injects conditional dependencies on packages.

        Conditional dependencies can be either "real" packages or virtual dependencies.

        Args:
            dependency_str: the dependency spec to inject
            when: anonymous condition to be met on a package to have the dependency
            type: dependency type
            description: human-readable description of the rule for adding the dependency
        """
        # TODO: The API for this function is not final, and is still subject to change. At
        # TODO: the moment, we implemented only the features strictly needed for the
        # TODO: functionality currently provided by Spack, and we assert nothing else is required.
        msg = "the 'depends_on' method can be called only with pkg('*')"
        assert self.current_package == "*", msg

        when_spec = spack.spec.Spec(when)
        assert not when_spec.name, "only anonymous when specs are accepted"

        dependency_spec = spack.spec.Spec(dependency_str)
        if dependency_spec.versions != spack.version.any_version:
            self._setup.version_constraints[dependency_spec.name].add(dependency_spec.versions)

        self.injected_dependencies.add(dependency_spec)
        body_str, node_variable = self.rule_body_from(when_spec)

        head_clauses = self._setup.spec_clauses(dependency_spec, body=False)
        runtime_pkg = dependency_spec.name

        # The first head clause tells whether the dependency is a virtual or a real package
        is_virtual = head_clauses[0].args[0] == "virtual_node"
        if is_virtual:
            main_rule = (
                f"% {description}\n"
                f'attr("dependency_holds", {node_variable}, "{runtime_pkg}", "{type}") :-\n'
                f"{body_str}."
            )
        else:
            main_rule = (
                f"% {description}\n"
                f'1 {{ attr("depends_on", {node_variable}, node(0..X-1, "{runtime_pkg}"), "{type}") :'
                f' max_dupes("{runtime_pkg}", X)}} 1:-\n'
                f"{body_str}."
            )
        self.rules.append(main_rule)

        # Every other constraint in the dependency spec is imposed on the node that
        # satisfies the dependency
        for clause in head_clauses:
            if clause.args[0] == "node":
                continue
            runtime_node = f'node(RuntimeID, "{runtime_pkg}")'
            head_str = str(clause).replace(f'"{runtime_pkg}"', runtime_node)
            if is_virtual:
                depends_on_constraint = (
                    f' attr("depends_on", {node_variable}, ProviderNode, "{type}"),\n'
                    f" provider(ProviderNode, {runtime_node}),\n"
                )
            else:
                depends_on_constraint = (
                    f' attr("depends_on", {node_variable}, {runtime_node}, "{type}"),\n'
                )
            rule = f"{head_str} :-\n{depends_on_constraint}{body_str}."
            self.rules.append(rule)

        self.reset()

    @staticmethod
    def node_for(name: str) -> str:
        """Returns the text of a node term for ``name``, with an ID variable derived from
        the name itself (dashes are replaced by underscores in the variable).
        """
        return f'node(ID{name.replace("-", "_")}, "{name}")'

    def rule_body_from(self, when_spec: "spack.spec.Spec") -> Tuple[str, str]:
        """Computes the rule body from a "when" spec, and returns it, along with the
        node variable.
        """
        node_placeholder = "XXX"
        node_variable = "node(ID, Package)"

        # Quoted names of the non-root specs in the condition are later replaced by node terms
        when_substitutions = {}
        for s in when_spec.traverse(root=False):
            when_substitutions[f'"{s.name}"'] = self.node_for(s.name)

        body_clauses = self._setup.spec_clauses(when_spec, name=node_placeholder, body=True)
        for clause in body_clauses:
            if clause.args[0] == "virtual_on_incoming_edges":
                # Substitute: attr("virtual_on_incoming_edges", ProviderNode, Virtual)
                # with: attr("virtual_on_edge", ParentNode, ProviderNode, Virtual)
                # (avoid adding virtuals everywhere, if a single edge needs it)
                _, provider, virtual = clause.args
                clause.args = "virtual_on_edge", node_placeholder, provider, virtual

        # Check for abstract hashes in the body
        for s in when_spec.traverse(root=False):
            if s.abstract_hash:
                body_clauses.append(fn.attr("hash", s.name, s.abstract_hash))

        body_str = ",\n".join(f" {x}" for x in body_clauses)
        body_str = body_str.replace(f'"{node_placeholder}"', f"{node_variable}")
        for old, replacement in when_substitutions.items():
            body_str = body_str.replace(old, replacement)
        return body_str, node_variable

    def requires(self, impose: str, *, when: str) -> None:
        """Injects conditional requirements on a given package.

        Args:
            impose: constraint to be imposed
            when: condition triggering the constraint
        """
        msg = "the 'requires' method cannot be called with pkg('*') or without setting the package"
        assert self.current_package is not None and self.current_package != "*", msg

        imposed_spec = spack.spec.Spec(f"{self.current_package}{impose}")
        when_spec = spack.spec.Spec(f"{self.current_package}{when}")

        assert imposed_spec.versions.concrete, f"{impose} must have a concrete version"

        # Add versions to possible versions
        for s in (imposed_spec, when_spec):
            if not s.versions.concrete:
                continue
            self._setup.possible_versions[s.name][s.version].append(Provenance.RUNTIME)

        self.runtime_conditions.add((imposed_spec, when_spec))
        self.reset()

    def propagate(self, constraint_str: str, *, when: str) -> None:
        """Injects rules to propagate a version constraint from packages matching a condition.

        For each ``node_version_satisfies`` clause generated by the constraint, the version
        constraint is registered, and a ``propagate(...)`` rule is recorded, whose body is
        computed from ``when``.

        Args:
            constraint_str: spec carrying the version constraint to propagate
            when: anonymous condition to be met on a package to propagate the constraint
        """
        msg = "the 'propagate' method can be called only with pkg('*')"
        assert self.current_package == "*", msg

        when_spec = spack.spec.Spec(when)
        assert not when_spec.name, "only anonymous when specs are accepted"

        # Substitutions of dependency names with node terms are done by rule_body_from
        body_str, node_variable = self.rule_body_from(when_spec)
        constraint_spec = spack.spec.Spec(constraint_str)

        constraint_clauses = self._setup.spec_clauses(constraint_spec, body=False)
        for clause in constraint_clauses:
            if clause.args[0] != "node_version_satisfies":
                continue
            self._setup.version_constraints[constraint_spec.name].add(constraint_spec.versions)
            args = f'"{constraint_spec.name}", "{constraint_spec.versions}"'
            head_str = f"propagate({node_variable}, node_version_satisfies({args}))"
            rule = f"{head_str} :-\n{body_str}."
            self.rules.append(rule)

        self.reset()

    def default_flags(self, spec: "spack.spec.Spec") -> None:
        """Injects rules for the flags stored in the extra attributes of an external spec.

        Nothing is recorded, unless ``spec`` is external and has a "flags" entry in its
        extra attributes. The body of the rules is computed from a condition where ``spec``
        is a build dependency.

        Args:
            spec: spec to read the flags from
        """
        if not spec.external or "flags" not in spec.extra_attributes:
            self.reset()
            return

        when_spec = spack.spec.Spec(f"%[deptypes=build] {spec}")
        body_str, node_variable = self.rule_body_from(when_spec)

        # Build a spec that has the placeholder as a name, and all the flags attached to it
        node_placeholder = "XXX"
        flags = spec.extra_attributes["flags"]
        root_spec_str = f"{node_placeholder}"
        for flag_type, default_values in flags.items():
            root_spec_str = f"{root_spec_str} {flag_type}='{default_values}'"
        root_spec = spack.spec.Spec(root_spec_str)

        head_clauses = self._setup.spec_clauses(
            root_spec, body=False, context=SourceContext(source="compiler")
        )
        self.rules.append(f"% Default compiler flags for {spec}\n")
        for clause in head_clauses:
            if clause.args[0] == "node":
                continue
            head_str = str(clause).replace(f'"{node_placeholder}"', f"{node_variable}")
            rule = f"{head_str} :-\n{body_str}."
            self.rules.append(rule)

        self.reset()

    def consume_facts(self) -> None:
        """Consume the facts collected by this object, and emits rules and facts
        for the runtimes.
        """
        self._setup.gen.h2("Runtimes: declarations")
        # Only non-virtual injected dependencies are declared as runtimes
        runtime_pkgs = sorted(
            {x.name for x in self.injected_dependencies if not spack.repo.PATH.is_virtual(x.name)}
        )
        for runtime_pkg in runtime_pkgs:
            self._setup.gen.fact(fn.runtime(runtime_pkg))
        self._setup.gen.newline()

        self._setup.gen.h2("Runtimes: rules")
        self._setup.gen.newline()
        for rule in self.rules:
            self._setup.gen.append(rule)
        self._setup.gen.newline()

        self._setup.gen.h2("Runtimes: requirements")
        for imposed_spec, when_spec in sorted(self.runtime_conditions):
            msg = f"{when_spec} requires {imposed_spec} at runtime"
            _ = self._setup.condition(when_spec, imposed_spec=imposed_spec, msg=msg)

        self._setup.trigger_rules()
        self._setup.effect_rules()
def _normalize_packages_yaml(packages_yaml: Dict[str, Any]) -> None: for pkg_name in list(packages_yaml.keys()): is_virtual = spack.repo.PATH.is_virtual(pkg_name) if pkg_name == "all" or not is_virtual: continue # Remove the virtual entry from the normalized configuration data = packages_yaml.pop(pkg_name) is_buildable = data.get("buildable", True) if not is_buildable: for provider in spack.repo.PATH.providers_for(pkg_name): entry = packages_yaml.setdefault(provider.name, {}) entry["buildable"] = False externals = data.get("externals", []) def keyfn(x): return spack.spec.Spec(x["spec"]).name for provider, specs in itertools.groupby(externals, key=keyfn): entry = packages_yaml.setdefault(provider, {}) entry.setdefault("externals", []).extend(specs)
def external_config_with_implicit_externals(
    configuration: spack.config.Configuration,
) -> Dict[str, Any]:
    """Returns a normalized copy of the "packages" configuration, extended with externals
    for the default libc of each configured compiler, when libc compatibility is in use.

    Args:
        configuration: configuration to read the "packages" section and the compilers from
    """
    # Read packages.yaml and normalize it so that it will not contain entries referring to
    # virtual packages.
    result = configuration.deepcopy_as_builtin("packages", line_info=True)
    _normalize_packages_yaml(result)

    # Add externals for libc from compilers on Linux
    if not using_libc_compatibility():
        return result

    already_added = set()
    for compiler in spack.compilers.config.all_compilers_from(configuration):
        detector = spack.compilers.libraries.CompilerPropertyDetector(compiler)
        libc = detector.default_libc()
        # Skip compilers with no libc detected, and libcs that were already recorded
        if not libc or libc in already_added:
            continue
        already_added.add(libc)
        libc_entry = result.setdefault(libc.name, {})
        libc_entry.setdefault("externals", []).append(
            {"spec": f"{libc}", "prefix": libc.external_path}
        )

    return result
def all_libcs() -> Set[spack.spec.Spec]:
    """Return a set of all libc specs targeted by any configured compiler. If none, fall back to
    libc determined from the current Python process if dynamically linked."""
    compilers = spack.compilers.config.all_compilers_from(spack.config.CONFIG)
    detected = (
        spack.compilers.libraries.CompilerPropertyDetector(c).default_libc() for c in compilers
    )
    from_compilers = {libc for libc in detected if libc is not None}
    if from_compilers:
        return from_compilers

    # No compiler provided a libc: fall back to the one of the running interpreter, if any
    fallback = spack.util.libc.libc_from_current_python_process()
    if fallback:
        return {fallback}
    return set()