# Copyright Spack Project Developers. See COPYRIGHT file for details.
#
# SPDX-License-Identifier: (Apache-2.0 OR MIT)
import collections
import collections.abc
import enum
import functools
import gzip
import io
import itertools
import json
import os
import pathlib
import pprint
import random
import re
import sys
import time
import warnings
from typing import (
Any,
Callable,
Dict,
Generator,
Iterable,
Iterator,
List,
NamedTuple,
Optional,
Sequence,
Set,
Tuple,
Type,
Union,
)
import spack.vendor.archspec.cpu
import spack
import spack.caches
import spack.compilers.config
import spack.compilers.flags
import spack.concretize
import spack.config
import spack.deptypes as dt
import spack.error
import spack.llnl.util.lang
import spack.llnl.util.tty as tty
import spack.package_base
import spack.package_prefs
import spack.platforms
import spack.repo
import spack.solver.splicing
import spack.spec
import spack.store
import spack.util.crypto
import spack.util.hash
import spack.util.lock as lk
import spack.util.module_cmd as md
import spack.util.path
import spack.util.timer
import spack.variant as vt
import spack.version as vn
import spack.version.git_ref_lookup
from spack import traverse
from spack.compilers.libraries import CompilerPropertyDetector
from spack.llnl.util.lang import elide_list
from spack.spec import EMPTY_SPEC
from spack.util.compression import GZipFileType
from .core import (
AspFunction,
AspVar,
NodeId,
SourceContext,
clingo,
extract_args,
fn,
using_libc_compatibility,
)
from .input_analysis import create_counter, create_graph_analyzer
from .requirements import RequirementKind, RequirementOrigin, RequirementParser, RequirementRule
from .reuse import ReusableSpecsSelector, SpecFiltersFactory, create_external_parser
from .runtimes import RuntimePropertyRecorder, all_libcs, external_config_with_implicit_externals
from .versions import Provenance
#: Union of the two version types accepted where either a git or a standard version is allowed
GitOrStandardVersion = Union[vn.GitVersion, vn.StandardVersion]
#: Signature of the functions used to transform facts: they take a name, a spec and a
#: list of facts, and return a list of facts
TransformFunction = Callable[[str, spack.spec.Spec, List[AspFunction]], List[AspFunction]]
[docs]
class OutputConfiguration(NamedTuple):
    """Data class that contains configuration on what a clingo solve should output."""

    #: Print out coarse timers for different solve phases
    timers: bool
    #: Whether to output Clingo's internal solver statistics
    stats: bool
    #: Optional output stream for the generated ASP program (None means no output)
    out: Optional[io.IOBase]
    #: If True, stop after setup and don't solve
    setup_only: bool
#: Default output configuration for a solve: no timers, no statistics, no output
#: stream for the ASP program, and the solve does not stop after setup
DEFAULT_OUTPUT_CONFIGURATION = OutputConfiguration(
    timers=False, stats=False, out=None, setup_only=False
)
[docs]
def default_clingo_control():
    """Create a clingo ``Control`` object configured with the defaults used in Spack."""
    ctl = clingo().Control()
    settings = ctl.configuration
    settings.configuration = "tweety"
    settings.solver.heuristic = "Domain"
    settings.solver.opt_strategy = "usc"
    return ctl
# Below numbers are used to map names of criteria to the order
# they appear in the solution. See concretize.lp
# The space of possible priorities for optimization targets
# is partitioned in the following ranges:
#
# [0-100) Optimization criteria for software being reused
# [100-200) Fixed criteria that are higher priority than reuse, but lower than build
# [200-300) Optimization criteria for software being built
# [300-1000) High-priority fixed criteria
# [1000-inf) Error conditions
#
# Each optimization target is a minimization with optimal value 0.
#: High fixed priority offset for criteria that supersede all build criteria
high_fixed_priority_offset = 300
#: Priority offset for "build" criteria (regular criteria shifted to
#: higher priority for specs we have to build)
build_priority_offset = 200
#: Priority offset of "fixed" criteria (those w/o build criteria)
fixed_priority_offset = 100
[docs]
class OptimizationKind:
    """Enum for the optimization KIND of a criteria.

    It's not using enum.Enum since it must be serializable.
    """

    #: Criterion shifted to the "build" priority range, i.e. for nodes we have to build
    BUILD = 0
    #: Criterion with a priority below the "fixed" offset, kept at its original priority
    CONCRETE = 1
    #: Any other criterion (priority at or above the "fixed" offset)
    OTHER = 2
[docs]
class OptimizationCriteria(NamedTuple):
    """A named tuple describing an optimization criteria."""

    #: Priority of the criterion in the solve
    priority: int
    #: Cost associated with the criterion
    value: int
    #: Name of the criterion
    name: str
    #: One of the constants defined as attributes of OptimizationKind
    kind: OptimizationKind
[docs]
def build_criteria_names(costs, arg_tuples):
    """Pair each optimization criterion found in the solution with its cost.

    Arguments:
        costs: costs reported for the model; the leading entries that do not
            correspond to a named criterion (error criteria) are discarded
        arg_tuples: tuples whose first two items are the priority and the name
            of an optimization criterion

    Returns:
        A list of ``OptimizationCriteria`` sorted by decreasing priority.
    """
    ranked = []
    for criterion_args in arg_tuples:
        raw_priority, criterion_name = criterion_args[:2]
        level = int(raw_priority)

        if level >= fixed_priority_offset:
            ranked.append((level, criterion_name, OptimizationKind.OTHER))
            continue

        # A priority below fixed_priority_offset comes with an associated build
        # priority: the same criterion, but for nodes that we have to build.
        ranked.append((level, criterion_name, OptimizationKind.CONCRETE))
        ranked.append((level + build_priority_offset, criterion_name, OptimizationKind.BUILD))

    # highest priority first
    ranked.sort(reverse=True)

    # Error criteria come first in the costs and have no named criterion: drop them
    n_error_costs = len(costs) - len(ranked)
    named_costs = costs[n_error_costs:]

    criteria = []
    for (level, criterion_name, kind), cost in zip(ranked, named_costs):
        criteria.append(OptimizationCriteria(level, cost, criterion_name, kind))
    return criteria
[docs]
def specify(spec):
    """Return ``spec`` itself if it is a ``Spec``, otherwise a ``Spec`` constructed from it."""
    return spec if isinstance(spec, spack.spec.Spec) else spack.spec.Spec(spec)
# Caching because the returned function id is used as a cache key
[docs]
@functools.lru_cache(maxsize=None)
def remove_facts(*to_be_removed: str) -> TransformFunction:
    """Build a transformation function that filters facts out of a list.

    The returned function drops every fact whose first argument is one of
    ``to_be_removed`` and keeps the others, in their original order.
    """

    def _remove(name: str, spec: spack.spec.Spec, facts: List[AspFunction]) -> List[AspFunction]:
        kept = []
        for fact in facts:
            if fact.args[0] in to_be_removed:
                continue
            kept.append(fact)
        return kept

    return _remove
[docs]
def identity_for_facts(
    name: str, spec: spack.spec.Spec, facts: List[AspFunction]
) -> List[AspFunction]:
    """Transformation function that returns the input facts unchanged.

    ``name`` and ``spec`` are accepted only to match the transformation signature.
    """
    return facts
# Caching because the returned function id is used as a cache key
[docs]
@functools.lru_cache(maxsize=None)
def dependency_holds(
    *, dependency_flags: dt.DepFlag, pkg_cls: Type[spack.package_base.PackageBase]
) -> TransformFunction:
    """Build a transformation function for the facts of a dependency of ``pkg_cls``.

    The returned function removes the "node" and "virtual_node" facts, adds one
    "dependency_holds" fact for each dependency type set in ``dependency_flags``,
    and adds an "extends" fact if the name is among the extendees of ``pkg_cls``.
    """

    def _transform_fn(
        name: str, input_spec: spack.spec.Spec, requirements: List[AspFunction]
    ) -> List[AspFunction]:
        strip_nodes = remove_facts("node", "virtual_node")
        # Copy, so that the list returned by the filter is never modified in place
        facts = list(strip_nodes(name, input_spec, requirements))

        for flag in dt.ALL_FLAGS:
            if flag & dependency_flags:
                facts.append(
                    fn.attr("dependency_holds", pkg_cls.name, name, dt.flag_to_string(flag))
                )

        if name in pkg_cls.extendees:
            facts.append(fn.attr("extends", pkg_cls.name, name))
        return facts

    return _transform_fn
[docs]
def dag_closure_by_deptype(
    name: str, spec: spack.spec.Spec, facts: List[AspFunction]
) -> List[AspFunction]:
    """Replace the facts with a "closure" fact for a single transitive link/run edge.

    If ``spec`` has exactly one edge to a dependency, and that edge is not direct and
    has exactly the link and run dependency types, return a single "closure" fact
    between the parent and the dependency. Otherwise return ``facts`` unchanged.
    """
    edges = spec.edges_to_dependencies()
    if len(edges) != 1:
        return facts

    # Compute the "link" transitive closure with `when: root ^[deptypes=link] <this_pkg>`
    edge = edges[0]
    is_transitive_linkrun = not edge.direct and edge.depflag == (dt.LINK | dt.RUN)
    if not is_transitive_linkrun:
        return facts

    root, leaf = edge.parent.name, edge.spec.name
    return [fn.attr("closure", root, leaf, "linkrun")]
[docs]
def libc_is_compatible(lhs: spack.spec.Spec, rhs: spack.spec.Spec) -> bool:
    """Return whether ``lhs`` has the same name and external path as ``rhs``, and a
    version that is greater than or equal to the version of ``rhs``.
    """
    same_libc = lhs.name == rhs.name and lhs.external_path == rhs.external_path
    return same_libc and lhs.version >= rhs.version
[docs]
def c_compiler_runs(compiler) -> bool:
    """Return True if verbose output could be obtained for ``compiler``, False otherwise."""
    detector = CompilerPropertyDetector(compiler)
    verbose_output = detector.compiler_verbose_output()
    return verbose_output is not None
[docs]
def extend_flag_list(flag_list, new_flags):
    """Append flags to a list in place, preserving order and precedence.

    Each flag in ``new_flags`` ends up at the end of ``flag_list``. A flag that was
    already in ``flag_list`` has its first occurrence removed before being appended,
    so that it takes higher precedence on the compile line.
    """
    for flag in new_flags:
        try:
            flag_list.remove(flag)
        except ValueError:
            # the flag was not in the list: nothing to move
            pass
        flag_list.append(flag)
def _reorder_flags(flag_list: List[spack.spec.CompilerFlag]) -> List[spack.spec.CompilerFlag]:
    """Rebuild a list of flags so that their order matches the one in their flag group.

    All the flags must share the same flag group and the same source, otherwise an
    ``InternalConcretizerError`` is raised. An empty input gives an empty list.
    """
    if not flag_list:
        return []

    single_group = len({x.flag_group for x in flag_list}) == 1
    if not (single_group and len({x.source for x in flag_list}) == 1):
        raise InternalConcretizerError(
            "internal solver error: cannot reorder compiler flags for concretized specs. "
            "Please report a bug at https://github.com/spack/spack/issues"
        )

    first = flag_list[0]
    group, source, propagate = first.flag_group, first.source, first.propagate

    # The flag group represents all the flags, so there is no need to look at
    # flag_list any further: tokenize the group and rebuild each flag from it
    reordered = []
    for flag, _ in spack.compilers.flags.tokenize_flags(group, propagate=propagate):
        reordered.append(
            spack.spec.CompilerFlag(flag, propagate=propagate, flag_group=group, source=source)
        )
    return reordered
[docs]
def check_packages_exist(specs):
    """Ensure all packages mentioned in specs exist.

    Every node of every spec must either exist in its repository or be a virtual
    package, otherwise ``spack.repo.UnknownPackageError`` is raised.
    """
    repo = spack.repo.PATH
    for root in specs:
        for node in root.traverse():
            try:
                found = repo.repo_for_pkg(node).exists(node.name) or repo.is_virtual(node.name)
            except Exception as e:
                # Any failure during the lookup is treated as "package not found"
                found = False
                tty.debug(f"Cannot find package: {e!s}")

            if not found:
                raise spack.repo.UnknownPackageError(str(node.fullname))
[docs]
class Result:
    """Result of an ASP solve."""

    def __init__(self, specs):
        self.satisfiable = None
        self.optimal = None
        self.warnings = None
        self.nmodels = 0

        # specs ordered by optimization level
        self.answers = []
        # names of optimization criteria
        self.criteria = []

        # Abstract user requests
        self.abstract_specs = specs

        # possible dependencies
        self.possible_dependencies = None

        # Concrete specs, computed lazily from the answers
        self._concrete_specs_by_input = None
        self._concrete_specs = None
        self._unsolved_specs = None

    def raise_if_unsat(self):
        """Raise a generic internal error if the result is unsatisfiable.

        Raises:
            SolverError: if ``self.satisfiable`` is not truthy
        """
        if self.satisfiable:
            return

        constraints = self.abstract_specs
        if len(constraints) == 1:
            constraints = constraints[0]

        raise SolverError(constraints)

    @property
    def specs(self):
        """List of concretized specs satisfying the initial
        abstract request.
        """
        if self._concrete_specs is None:
            self._compute_specs_from_answer_set()
        return self._concrete_specs

    @property
    def unsolved_specs(self):
        """List of tuples pairing abstract input specs that were not
        solved with their associated candidate spec from the solver
        (if the solve completed).
        """
        if self._unsolved_specs is None:
            self._compute_specs_from_answer_set()
        return self._unsolved_specs

    @property
    def specs_by_input(self) -> "Dict[spack.spec.Spec, spack.spec.Spec]":
        """Mapping from each solved abstract input spec to its concrete spec."""
        if self._concrete_specs_by_input is None:
            self._compute_specs_from_answer_set()
        return self._concrete_specs_by_input  # type: ignore

    def _compute_specs_from_answer_set(self):
        """Fill the concrete specs, the unsolved specs and the input mapping from the answers."""
        if not self.satisfiable:
            self._concrete_specs = []
            self._unsolved_specs = [(x, None) for x in self.abstract_specs]
            self._concrete_specs_by_input = {}
            return

        self._concrete_specs, self._unsolved_specs = [], []
        self._concrete_specs_by_input = {}
        _, _, answer = min(self.answers)
        for input_spec in self.abstract_specs:
            # The specs must be unified to get here, so it is safe to associate any satisfying spec
            # with the input. Multiple inputs may be matched to the same concrete spec
            node = SpecBuilder.make_node(pkg=input_spec.name)
            if spack.repo.PATH.is_virtual(input_spec.name):
                providers = [
                    spec.name for spec in answer.values() if spec.package.provides(input_spec.name)
                ]
                # If no provider is part of the answer keep the node of the virtual: the
                # lookup below finds nothing, and the input is reported as unsolved
                if providers:
                    node = SpecBuilder.make_node(pkg=providers[0])
            candidate = answer.get(node)

            if candidate and candidate.satisfies(input_spec):
                self._concrete_specs.append(candidate)
                self._concrete_specs_by_input[input_spec] = candidate
            elif candidate and candidate.build_spec.satisfies(input_spec):
                tty.warn(
                    "explicit splice configuration has caused the concretized spec"
                    f" {candidate} not to satisfy the input spec {input_spec}"
                )
                self._concrete_specs.append(candidate)
                self._concrete_specs_by_input[input_spec] = candidate
            else:
                self._unsolved_specs.append((input_spec, candidate))

    def to_dict(self) -> dict:
        """Produces dict representation of Result object

        Does not include anything related to unsatisfiability as we
        are only interested in storing satisfiable results
        """

        def _serialize_node(node) -> str:
            # Nodes are dictionary keys in the answers, so they are encoded as JSON strings
            return json.dumps({"id": str(node.id), "pkg": str(node.pkg)})

        ret = {}
        ret["criteria"] = self.criteria
        ret["optimal"] = self.optimal
        ret["warnings"] = self.warnings
        ret["nmodels"] = self.nmodels
        ret["abstract_specs"] = [str(x) for x in self.abstract_specs]
        ret["satisfiable"] = self.satisfiable

        serial_answers = []
        for answer in self.answers:
            serial_answer_dict = {
                _serialize_node(node): spec.to_dict() for node, spec in answer[2].items()
            }
            serial_answers.append(tuple(answer[:2]) + (serial_answer_dict,))
        ret["answers"] = serial_answers

        ret["specs_by_input"] = {
            str(input_spec): spec.to_dict()
            for input_spec, spec in (self.specs_by_input or {}).items()
        }
        return ret

    @staticmethod
    def from_dict(obj: dict):
        """Returns Result object from compatible dictionary

        Raises:
            RuntimeError: if ``obj`` has no abstract specs
        """

        def _dict_to_node_argument(node_dict):
            return NodeId(id=node_dict["id"], pkg=node_dict["pkg"])

        def _str_to_spec(spec_str):
            return spack.spec.Spec(spec_str)

        def _dict_to_spec(spec_dict):
            loaded_spec = spack.spec.Spec.from_dict(spec_dict)
            _ensure_external_path_if_external(loaded_spec)
            spack.spec.Spec.ensure_no_deprecated(loaded_spec)
            return loaded_spec

        spec_list = obj.get("abstract_specs")
        if not spec_list:
            raise RuntimeError("Invalid json for concretization Result object")

        result = Result([_str_to_spec(x) for x in spec_list])

        criteria = obj.get("criteria")
        result.criteria = (
            None if criteria is None else [OptimizationCriteria(*t) for t in criteria]
        )
        result.optimal = obj.get("optimal")
        result.warnings = obj.get("warnings")
        result.nmodels = obj.get("nmodels")
        result.satisfiable = obj.get("satisfiable")
        result._unsolved_specs = []

        answers = []
        for answer in obj.get("answers", []):
            answer_node_dict = {
                _dict_to_node_argument(json.loads(node)): _dict_to_spec(spec)
                for node, spec in answer[2].items()
            }
            # Works both for lists (after a JSON round trip) and for tuples (as in to_dict)
            answers.append(tuple(answer[:2]) + (answer_node_dict,))
        result.answers = answers

        result._concrete_specs_by_input = {}
        result._concrete_specs = []
        for input_str, spec_dict in obj.get("specs_by_input", {}).items():
            # Deserialize once, and share the spec between the two containers
            concrete = _dict_to_spec(spec_dict)
            result._concrete_specs_by_input[_str_to_spec(input_str)] = concrete
            result._concrete_specs.append(concrete)

        return result

    def __eq__(self, other):
        if not isinstance(other, Result):
            return NotImplemented

        # Not considered for equality: possible_dependencies
        compared_attributes = (
            "satisfiable",
            "optimal",
            "warnings",
            "nmodels",
            "criteria",
            "answers",
            "abstract_specs",
            "_concrete_specs_by_input",
            "_concrete_specs",
            "_unsolved_specs",
        )
        return all(getattr(self, x) == getattr(other, x) for x in compared_attributes)
[docs]
class ConcretizationCache:
    """Store for Spack concretization results and statistics

    Serializes solver result objects and statistics to json and stores
    at a given endpoint in a cache associated by the sha256 of the
    asp problem and the involved control files.
    """

    def __init__(self, root: Union[str, None] = None):
        root = root or spack.config.get("concretizer:concretization_cache:url", None)
        if root is None:
            root = os.path.join(spack.caches.misc_cache_location(), "concretization")
        self.root = pathlib.Path(spack.util.path.canonicalize_path(root))
        self.root.mkdir(parents=True, exist_ok=True)
        self._lockfile = self.root / ".cc_lock"

    def cleanup(self):
        """Prunes the concretization cache according to configured entry
        count limits. Cleanup is done in LRU ordering."""
        entry_limit = spack.config.get("concretizer:concretization_cache:entry_limit", 1000)

        # determine if we even need to clean up
        entries = list(self.cache_entries())
        if len(entries) <= entry_limit:
            return

        # collect stat info for mod time about all entries
        removal_queue = []
        for entry in entries:
            try:
                # mtime will always be time of last use as we update it after
                # each read and obviously after each write
                removal_queue.append((entry.stat().st_mtime, entry))
            except FileNotFoundError:
                # don't need to cleanup the file, it's not there!
                pass

        removal_queue.sort()  # sort items for removal, ascending, so oldest first

        # Remove at most ``entry_limit // 2`` of the oldest entries
        for _, entry_to_rm in removal_queue[: entry_limit // 2]:
            # cache bucket was removed by another process -- that's fine; move on
            if not entry_to_rm.exists():
                continue

            try:
                with self.write_transaction(entry_to_rm, timeout=1e-6):
                    self._safe_remove(entry_to_rm)
            except lk.LockTimeoutError:
                # if we can't get a lock, it's either
                # 1) being read, so it's been used recently, i.e. not a good candidate for LRU,
                # 2) it's already being removed by another process, so we don't care, or
                # 3) system is busy, but we don't really need to wait just for cache cleanup.
                pass  # so skip it

    def cache_entries(self):
        """Generator producing the cache entries, i.e. the regular files in the cache
        root whose name does not start with a dot"""
        for cache_entry in self.root.iterdir():
            # Lockfile starts with "."
            # old style concretization cache entries are in directories
            if not cache_entry.name.startswith(".") and cache_entry.is_file():
                yield cache_entry

    def _results_from_cache(self, cache_entry_file: str) -> Union[Result, None]:
        """Returns a Results object from the concretizer cache

        Reads the cache hit and uses `Result`'s own deserializer
        to produce a new Result object

        Args:
            cache_entry_file: JSON content of a cache entry
        """
        cache_entry = json.loads(cache_entry_file)
        return Result.from_dict(cache_entry["results"])

    def _stats_from_cache(self, cache_entry_file: str) -> Union[Dict, None]:
        """Returns concretization statistic from the
        concretization associated with the cache.

        Deserializes the json representation of the
        statistics covering the cached concretization run
        and returns the Python data structures

        Args:
            cache_entry_file: JSON content of a cache entry
        """
        return json.loads(cache_entry_file)["statistics"]

    def _prefix_digest(self, problem: str) -> str:
        """Return the hash of the given asp problem, as computed by
        ``spack.util.hash.b32_hash``. It is used as the file name of the cache entry."""
        return spack.util.hash.b32_hash(problem)

    def _cache_path_from_problem(self, problem: str) -> pathlib.Path:
        """Returns a Path object representing the path to the cache
        entry for the given asp problem, i.e. a file in the cache root named
        after the hash of the problem"""
        return self.root / self._prefix_digest(problem)

    def _safe_remove(self, cache_dir: pathlib.Path) -> bool:
        """Removes the cache entry at ``cache_dir``, tolerating the case where the entry
        has been removed already.

        Returns:
            True if the entry was removed by this call, False otherwise
        """
        try:
            cache_dir.unlink()
            return True
        except FileNotFoundError:
            # That's fine, removal is idempotent
            pass
        except OSError as e:
            # Catch other timing/access related issues
            tty.debug(
                f"Exception occurred while attempting to remove Concretization Cache entry, {e}"
            )
        return False

    def _lock(self, path: pathlib.Path) -> lk.Lock:
        """Returns a lock over the byte range corresponding to the hash of the asp problem.

        ``path`` is a path to a file in the cache, and its basename is the hash of the problem.

        Args:
            path: absolute or relative path to concretization cache entry to be locked
        """
        return lk.Lock(
            str(self._lockfile),
            start=spack.util.hash.base32_prefix_bits(
                path.name, spack.util.crypto.bit_length(sys.maxsize)
            ),
            length=1,
            desc=f"Concretization cache lock for {path}",
        )

    def read_transaction(
        self, path: pathlib.Path, timeout: Optional[float] = None
    ) -> lk.ReadTransaction:
        """Read transactions for concretization cache entries.

        Args:
            path: absolute or relative path to the concretization cache entry to be locked
            timeout: give up after this many seconds
        """
        return lk.ReadTransaction(self._lock(path), timeout=timeout)

    def write_transaction(
        self, path: pathlib.Path, timeout: Optional[float] = None
    ) -> lk.WriteTransaction:
        """Write transactions for concretization cache entries

        Args:
            path: absolute or relative path to the concretization cache entry to be locked
            timeout: give up after this many seconds
        """
        return lk.WriteTransaction(self._lock(path), timeout=timeout)

    def store(self, problem: str, result: Result, statistics: List) -> None:
        """Creates entry in concretization cache for problem if none exists,
        storing the concretization Result object and statistics in the cache
        as serialized json joined as a single file.

        The entry is a gzip-compressed file named after the hash of the provided
        asp problem.
        """
        cache_path = self._cache_path_from_problem(problem)
        with self.write_transaction(cache_path, timeout=30):
            if cache_path.exists():
                # if cache path file exists, we already have a cache entry, likely created
                # by another process. Exit early.
                return

            # Serialize before creating the file: if serialization fails, no empty entry
            # is left behind to shadow this problem in all the following stores
            cache_dict = {"results": result.to_dict(), "statistics": statistics}
            payload = json.dumps(cache_dict).encode()

            try:
                with gzip.open(cache_path, "xb", compresslevel=6) as cache_entry:
                    cache_entry.write(payload)
            except FileExistsError:
                # The entry was not created by this call, so it must not be removed
                raise
            except BaseException:
                # Don't leave a truncated entry in the cache
                self._safe_remove(cache_path)
                raise

    def fetch(self, problem: str) -> Union[Tuple[Result, Dict], Tuple[None, None]]:
        """Returns the concretization cache result for a lookup based on the given problem.

        Checks the concretization cache for the given problem, and either returns the
        Python objects cached on disk representing the concretization results and statistics
        or returns none if no cache entry was found.
        """
        cache_path = self._cache_path_from_problem(problem)
        if not cache_path.exists():
            return None, None  # if exists is false, then there's no chance of a hit

        cache_content = None
        try:
            with self.read_transaction(cache_path, timeout=2):
                try:
                    with gzip.open(cache_path, "rb") as f:
                        f.peek(1)  # Try to read at least one byte
                        f.seek(0)
                        cache_content = f.read().decode("utf-8")
                except OSError:
                    # Cache may have been created pre compression check if gzip, and if not,
                    # read from plaintext otherwise re raise
                    with open(cache_path, "rb") as f:
                        # raise if this is a gzip file we failed to open
                        if GZipFileType().matches_magic(f):
                            raise
                        cache_content = f.read().decode()
        except FileNotFoundError:
            pass  # cache miss, already cleaned up
        except lk.LockTimeoutError:
            pass  # if the lock times, out skip the cache

        if not cache_content:
            return None, None

        # update mod/access time for use w/ LRU cleanup
        try:
            os.utime(cache_path)
        except OSError as e:
            # The lock has been released, so the entry may have been pruned in the
            # meantime. The content was read already, so this is still a cache hit.
            tty.debug(f"Cannot update the access time of concretization cache entry, {e}")

        return (self._results_from_cache(cache_content), self._stats_from_cache(cache_content))  # type: ignore
def _is_checksummed_git_version(v):
    """Tell whether ``v`` is a git version for which ``is_commit`` holds."""
    if not isinstance(v, vn.GitVersion):
        return False
    return v.is_commit
def _is_checksummed_version(version_info: Tuple[GitOrStandardVersion, dict]):
    """Returns true iff the version is not a moving target

    Arguments:
        version_info: a version, and the dictionary of information associated with it
    """
    version, info = version_info
    if not isinstance(version, vn.StandardVersion):
        return _is_checksummed_git_version(version)

    # A standard version is pinned by a hash, an explicit checksum...
    has_hash = any(h in info for h in spack.util.crypto.hashes)
    if has_hash or "checksum" in info:
        return True

    # ... or by a commit whose sha is 40 characters long
    return "commit" in info and len(info["commit"]) == 40
def _spec_with_default_name(spec_str, name):
    """Parse ``spec_str`` into a spec, and give it ``name`` if the spec is anonymous.

    Used for requirement specs.
    """
    parsed = spack.spec.Spec(spec_str)
    if parsed.name:
        return parsed
    parsed.name = name
    return parsed
[docs]
class ErrorHandler:
    """Turns the errors derived by the solver in a model into error messages."""

    def __init__(self, model, input_specs: "List[spack.spec.Spec]"):
        self.model = model
        self.input_specs = input_specs
        # Set by raise_if_errors, when the error causation program is solved
        self.full_model = None

    def multiple_values_error(self, attribute, pkg):
        """Message for an attribute of a package that has multiple values."""
        return f'Cannot select a single "{attribute}" for package "{pkg}"'

    def no_value_error(self, attribute, pkg):
        """Message for an attribute of a package that has no value."""
        return f'Cannot select a single "{attribute}" for package "{pkg}"'

    def _get_cause_tree(
        self,
        cause: Tuple[str, str],
        conditions: Dict[str, str],
        condition_causes: List[Tuple[Tuple[str, str], Tuple[str, str]]],
        seen: Set,
        indent: str = " ",
    ) -> List[str]:
        """
        Implementation of recursion for self.get_cause_tree. Much of this operates on tuples
        (condition_id, set_id) in which the latter idea means that the condition represented by
        the former held in the condition set represented by the latter.
        """
        # ``seen`` is shared by the whole recursion, so each cause is expanded only once
        seen.add(cause)
        parents = [c for e, c in condition_causes if e == cause and c not in seen]
        local = f"required because {conditions[cause[0]]} "

        return [indent + local] + [
            c
            for parent in parents
            for c in self._get_cause_tree(
                parent, conditions, condition_causes, seen, indent=indent + " "
            )
        ]

    def get_cause_tree(self, cause: Tuple[str, str]) -> List[str]:
        """
        Get the cause tree associated with the given cause.

        Arguments:
            cause: The root cause of the tree (final condition)

        Returns:
            A list of strings describing the causes, formatted to display tree structure.
        """
        conditions: Dict[str, str] = dict(extract_args(self.full_model, "condition_reason"))
        condition_causes: List[Tuple[Tuple[str, str], Tuple[str, str]]] = [
            ((Effect, EID), (Cause, CID))
            for Effect, EID, Cause, CID in extract_args(self.full_model, "condition_cause")
        ]
        return self._get_cause_tree(cause, conditions, condition_causes, set())

    def handle_error(self, msg, *args):
        """Handle an error state derived by the solver.

        Arguments:
            msg: either the name of one of the special errors, or a format string
            args: arguments to format ``msg``, optionally followed by "startcauses" and
                by a flat sequence of (condition, id) pairs describing the causes

        Returns:
            The error message, followed by the cause tree of each distinct cause.
        """
        if msg == "multiple_values_error":
            return self.multiple_values_error(*args)

        if msg == "no_value_error":
            return self.no_value_error(*args)

        try:
            idx = args.index("startcauses")
        except ValueError:
            msg_args = args
            causes = []
        else:
            msg_args = args[:idx]
            cause_args = args[idx + 1 :]
            cause_args_conditions = cause_args[::2]
            cause_args_ids = cause_args[1::2]
            causes = list(zip(cause_args_conditions, cause_args_ids))

        msg = msg.format(*msg_args)

        # For variant formatting, we sometimes have to construct specs
        # to format values properly. Find/replace all occurrences of
        # Spec(...) with the string representation of the spec mentioned
        specs_to_construct = re.findall(r"Spec\(([^)]*)\)", msg)
        for spec_str in specs_to_construct:
            msg = msg.replace(f"Spec({spec_str})", str(spack.spec.Spec(spec_str)))

        # Remove duplicate causes keeping the order in which they were reported, so
        # that the same errors always give the same message
        cause_lines = [
            line for cause in dict.fromkeys(causes) for line in self.get_cause_tree(cause)
        ]
        return "\n".join([msg, *cause_lines])

    def message(self, errors) -> str:
        """Return a single message with a header on the input specs, and a numbered
        entry for each item in ``errors``."""
        input_specs = ", ".join(elide_list([f"`{s}`" for s in self.input_specs], 5))
        header = f"failed to concretize {input_specs} for the following reasons:"
        messages = (
            f" {idx + 1:2}. {self.handle_error(msg, *args)}"
            for idx, (_, msg, args) in enumerate(errors)
        )
        return "\n".join((header, *messages))

    def raise_if_errors(self):
        """Raise if the model contains errors, do nothing otherwise.

        Raises:
            UnsatisfiableSpecError: with a message describing the errors in the model
            spack.error.SpackError: if the message could not be constructed
        """
        initial_error_args = extract_args(self.model, "error")
        if not initial_error_args:
            return

        error_causation = clingo().Control()

        parent_dir = pathlib.Path(__file__).parent
        errors_lp = parent_dir / "error_messages.lp"

        def on_model(model):
            self.full_model = model.symbols(shown=True, terms=True)

        # Add every atom of the model as a fact of the error causation program
        with error_causation.backend() as backend:
            for atom in self.model:
                atom_id = backend.add_atom(atom)
                backend.add_rule([atom_id], [], choice=False)

        error_causation.load(str(errors_lp))
        error_causation.ground([("base", []), ("error_messages", [])])
        error_causation.solve(on_model=on_model)

        # No choices so there will be only one model
        error_args = extract_args(self.full_model, "error")
        errors = sorted(
            [(int(priority), msg, args) for priority, msg, *args in error_args], reverse=True
        )
        try:
            msg = self.message(errors)
        except Exception as e:
            msg = (
                f"unexpected error during concretization [{str(e)}]. "
                "Please report a bug at https://github.com/spack/spack/issues"
            )
            raise spack.error.SpackError(msg) from e
        raise UnsatisfiableSpecError(msg)
[docs]
class PyclingoDriver:
def __init__(self, conc_cache: Optional[ConcretizationCache] = None) -> None:
    """Driver for the Python clingo interface.

    Args:
        conc_cache: concretization cache, stored on the driver (None by default)
    """
    # This attribute will be reset at each call to solve
    self.control: Any = None  # TODO: fix typing of dynamic clingo import
    self._conc_cache = conc_cache
def _control_file_paths(self, control_files: List[str]) -> List[str]:
    """Turn the relative paths of control files into paths rooted at this file's directory.

    Right now the control files just live next to this file in the Spack tree.
    """
    here = os.path.dirname(__file__)
    paths = []
    for rel_path in control_files:
        paths.append(os.path.join(here, rel_path))
    return paths
def _make_cache_key(self, asp_problem: List[str], control_file_paths: List[str]) -> str:
    """Make a key for fetching a solve from the concretization cache.

    A key comprises the entire input to clingo, i.e., the problem instance plus the
    control files. The problem instance is assumed to already be sorted and stripped of
    comments and empty lines.

    The control files are stripped but not sorted, so changes to the control files will cause
    cache misses if they modify any code.

    Arguments:
        asp_problem: list of statements in the ASP program
        control_file_paths: list of paths to control files we'll send to clingo
    """
    key_lines = [*asp_problem]
    for control_file in control_file_paths:
        with open(control_file, "r", encoding="utf-8") as stream:
            raw_lines = stream.readlines()
        key_lines += strip_asp_problem(raw_lines)
    return "\n".join(key_lines)
def _run_clingo(
    self,
    specs: List[spack.spec.Spec],
    setup: "SpackSolverSetup",
    problem_str: str,
    control_file_paths: List[str],
    timer: spack.util.timer.Timer,
) -> Result:
    """Actually run clingo and generate a result.

    This is the core solve logic once the setup is done and once we know we can't
    fetch a result from cache. See ``solve()`` for caching and setup logic.

    Arguments:
        specs: the specs being solved for
        setup: the object used to set up the problem
        problem_str: the problem instance, added to the "base" program
        control_file_paths: paths of the control files to be loaded
        timer: timer used to measure the different phases

    Raises:
        UnsatisfiableSpecError: if the solve times out and ``concretizer:error_on_timeout``
            is set
        OutputDoesNotSatisfyInputError: if some input spec is unsolved and the setup
            requires to concretize everything
    """
    # We could just take the cache_key and add it to clingo (since it is the
    # full problem representation), but we load control files separately as it
    # makes clingo give us better, file-aware error messages.
    with timer.measure("load"):
        # Add the problem instance
        self.control.add("base", [], problem_str)
        # Load additional files
        for path in control_file_paths:
            self.control.load(path)

    # Grounding is the first step in the solve -- it turns our facts
    # and first-order logic rules into propositional logic.
    with timer.measure("ground"):
        self.control.ground([("base", [])])

    # With a grounded program, we can run the solve.
    models = []  # stable models if things go well

    def on_model(model):
        # Record the cost of each model, together with its symbols
        models.append((model.cost, model.symbols(shown=True, terms=True)))

    timer.start("solve")
    # A timeout of 0 means no timeout
    time_limit = spack.config.CONFIG.get("concretizer:timeout", 0)
    timeout_end = time.monotonic() + time_limit if time_limit > 0 else float("inf")
    error_on_timeout = spack.config.CONFIG.get("concretizer:error_on_timeout", True)
    with self.control.solve(on_model=on_model, async_=True) as handle:
        # Allow handling of interrupts every second.
        #
        # pyclingo's `SolveHandle` blocks the calling thread for the duration of each
        # `.wait()` call. Python also requires that signal handlers must be handled in
        # the main thread, so any `KeyboardInterrupt` is postponed until after the
        # `.wait()` call exits the control of pyclingo.
        finished = False
        while not finished and time.monotonic() < timeout_end:
            finished = handle.wait(1.0)

        if not finished:
            # The time limit was hit: either error out, or warn and use what we have
            specs_str = ", ".join(spack.llnl.util.lang.elide_list([str(s) for s in specs], 4))
            header = f"Spack is taking more than {time_limit} seconds to solve for {specs_str}"
            if error_on_timeout:
                raise UnsatisfiableSpecError(f"{header}, stopping concretization")
            warnings.warn(f"{header}, using the best configuration found so far")
            handle.cancel()

        solve_result = handle.get()
    timer.stop("solve")

    # once done, construct the solve result
    result = Result(specs)
    result.satisfiable = solve_result.satisfiable

    if result.satisfiable:
        timer.start("construct_specs")
        # get the best model
        builder = SpecBuilder(specs, hash_lookup=setup.reusable_and_possible)
        min_cost, best_model = min(models)

        # first check for errors
        error_handler = ErrorHandler(best_model, specs)
        error_handler.raise_if_errors()

        # build specs from spec attributes in the model
        spec_attrs = [(name, tuple(rest)) for name, *rest in extract_args(best_model, "attr")]
        answers = builder.build_specs(spec_attrs)

        # add best spec to the results
        result.answers.append((list(min_cost), 0, answers))

        # get optimization criteria
        criteria_args = extract_args(best_model, "opt_criterion")
        result.criteria = build_criteria_names(min_cost, criteria_args)

        # record the number of models the solver considered
        result.nmodels = len(models)

        # record the possible dependencies in the solve
        result.possible_dependencies = setup.pkgs
        timer.stop("construct_specs")
        timer.stop()

    result.raise_if_unsat()

    if result.satisfiable and result.unsolved_specs and setup.concretize_everything:
        raise OutputDoesNotSatisfyInputError(result.unsolved_specs)

    return result
[docs]
def solve(
self,
setup: "SpackSolverSetup",
specs: List[spack.spec.Spec],
reuse: Optional[List[spack.spec.Spec]] = None,
packages_with_externals=None,
output: Optional[OutputConfiguration] = None,
control: Optional[Any] = None, # TODO: figure out how to annotate clingo.Control
allow_deprecated: bool = False,
) -> Tuple[Result, Optional[spack.util.timer.Timer], Optional[Dict]]:
"""Set up the input and solve for dependencies of ``specs``.

The ASP problem is generated by ``setup.setup()``, optionally written to ``output.out``,
stripped and ordered, and then either fetched from the concretization cache or handed to
``_run_clingo``.

Arguments:
setup: An object to set up the ASP problem.
specs: List of ``Spec`` objects to solve for.
reuse: list of concrete specs that can be reused
packages_with_externals: forwarded unchanged to ``setup.setup()``
output: configuration object to set the output of this solve. If not given,
``DEFAULT_OUTPUT_CONFIGURATION`` is used.
control: configuration for the solver. If None, default values will be used
allow_deprecated: if True, allow deprecated versions in the solve

Return:
A tuple of the solve result, the timer for the different phases of the
solve, and the internal statistics from clingo. If ``output.setup_only`` is set,
the tuple is ``(Result(specs), None, None)`` and the solver is not run.
"""
# Imported here rather than at module level.
# NOTE(review): presumably to avoid an import cycle with spack.bootstrap -- confirm
from spack.bootstrap import ensure_winsdk_external_or_raise
output = output or DEFAULT_OUTPUT_CONFIGURATION
timer = spack.util.timer.Timer()
# Initialize the control object for the solver
self.control = control or default_clingo_control()
# ensure core deps are present on Windows
# needs to modify active config scope, so cannot be run within
# bootstrap config scope
if sys.platform == "win32":
ensure_winsdk_external_or_raise()
# assemble a list of the control files needed for this problem. Some are conditionally
# included depending on what features we're using in the solve.
control_files = ["concretize.lp", "heuristic.lp", "display.lp", "direct_dependency.lp"]
if not setup.concretize_everything:
control_files.append("when_possible.lp")
# exactly one of the two compatibility programs is loaded
if using_libc_compatibility():
control_files.append("libc_compatibility.lp")
else:
control_files.append("os_compatibility.lp")
if setup.enable_splicing:
control_files.append("splices.lp")
# Phase "setup": generate the problem instance for the input specs
timer.start("setup")
problem_builder = setup.setup(
specs,
reuse=reuse,
packages_with_externals=packages_with_externals,
allow_deprecated=allow_deprecated,
)
timer.stop("setup")
# Phase "ordering": dump, strip and order the generated problem
timer.start("ordering")
# print the output with comments, etc. if the user asked
problem = problem_builder.asp_problem
if output.out is not None:
output.out.write("\n".join(problem))
# Early exit: only the setup was requested, so neither a timer nor statistics are returned
if output.setup_only:
return Result(specs), None, None
# strip the problem of comments and empty lines
problem = strip_asp_problem(problem)
# Randomization is opt-in through the mere presence of the environment variable
randomize = "SPACK_SOLVER_RANDOMIZATION" in os.environ
if randomize:
# create a shuffled copy -- useful for understanding performance variation
problem = random.sample(problem, len(problem))
else:
problem.sort() # sort for deterministic output
timer.stop("ordering")
# Phase "cache-check": look for a previously stored result for this exact problem
timer.start("cache-check")
# load control files to add to the input representation
control_file_paths = self._control_file_paths(control_files)
# the key is computed from both the problem and the control files
cache_key = self._make_cache_key(problem, control_file_paths)
result, concretization_stats = None, None
conc_cache_enabled = spack.config.get("concretizer:concretization_cache:enable", False)
if conc_cache_enabled and self._conc_cache:
result, concretization_stats = self._conc_cache.fetch(cache_key)
timer.stop("cache-check")
tty.debug("Starting concretizer")
# run the solver and store the result, if it wasn't cached already
if not result:
problem_repr = "\n".join(problem)
result = self._run_clingo(specs, setup, problem_repr, control_file_paths, timer)
if conc_cache_enabled and self._conc_cache:
self._conc_cache.store(cache_key, result, self.control.statistics)
if output.timers:
timer.write_tty()
print()
# Prefer the statistics fetched from the cache, fall back to the ones of this control object
concretization_stats = concretization_stats or self.control.statistics
if output.stats:
print("Statistics:")
pprint.pprint(concretization_stats)
return result, timer, concretization_stats
[docs]
class ConcreteSpecsByHash(collections.abc.Mapping):
    """Read-only mapping from DAG hash to concrete spec.

    Specs are copied on insertion, and the dependency edges of the copies are rewired to point
    at the copies stored in this container. As a result, a dependency with hash X of any stored
    spec is the very same object as the value keyed by X.
    """

    def __init__(self) -> None:
        # DAG hash -> copy of the spec owned by this container
        self.data: Dict[str, spack.spec.Spec] = {}
        # DAG hashes of the specs that were passed directly to ``add``
        self.explicit: Set[str] = set()

    def __getitem__(self, dag_hash: str) -> spack.spec.Spec:
        return self.data[dag_hash]

    def __iter__(self):
        return iter(self.data)

    def __len__(self) -> int:
        return len(self.data)

    def explicit_items(self) -> Iterator[Tuple[str, spack.spec.Spec]]:
        """Iterate over the ``(hash, spec)`` pairs that were added explicitly, skipping the ones
        that are in the mapping only because they are dependencies of other nodes.
        """
        yield from (
            (key, value)
            for key, value in self.items()
            # gcc-runtime is always reported, until it can be spliced
            if key in self.explicit or value.name == "gcc-runtime"
        )

    def add(self, spec: spack.spec.Spec) -> bool:
        """Store a concrete spec, and all its dependencies, in the mapping.

        The spec is always marked as explicitly added, even if it was already stored.

        Args:
            spec: spec to be added

        Returns:
            True if the spec was not in the mapping before this call, False otherwise.

        Raises:
            ValueError: if the spec is not concrete
        """
        if not spec.concrete:
            raise ValueError(
                f"trying to store the non-concrete spec '{spec}' in a container "
                f"that only accepts concrete"
            )

        root_hash = spec.dag_hash()
        self.explicit.add(root_hash)
        if root_hash in self.data:
            return False

        # Walk the input DAG, copying nodes without their dependencies and re-creating
        # the edges among the copies held by this container.
        self.data[root_hash] = spec.copy(deps=False)
        pending = [spec]
        while pending:
            source_node = pending.pop()
            stored_node = self.data[source_node.dag_hash()]
            for edge in source_node.edges_to_dependencies():
                child_hash = edge.spec.dag_hash()
                stored_child = self.data.get(child_hash)
                if stored_child is None:
                    # First time we see this dependency: copy it and visit it later
                    stored_child = self.data[child_hash] = edge.spec.copy(deps=False)
                    pending.append(edge.spec)

                stored_node.add_dependency_edge(
                    dependency_spec=stored_child, depflag=edge.depflag, virtuals=edge.virtuals
                )
        return True
# types for condition caching in solver setup
#
# Key of a cached trigger/effect: the string form of the condition spec, paired with the
# transformation (if any) applied to the clauses generated from it.
ConditionSpecKey = Tuple[str, Optional[TransformFunction]]
# Value of a cached trigger/effect: its numeric id and the ASP functions it implies.
ConditionIdFunctionPair = Tuple[int, List[AspFunction]]
# Two-level cache: package name -> key -> (id, ASP functions)
ConditionSpecCache = Dict[str, Dict[ConditionSpecKey, ConditionIdFunctionPair]]
[docs]
class ConstraintOrigin(enum.Enum):
    """Kinds of origin a constraint can have.

    The helpers below encode the origin into an identifier that is attached to constraints
    passed to the solver, and decode it again when ``SpecBuilder`` creates Specs from the
    solve result.
    """

    CONDITIONAL_SPEC = 0
    DEPENDS_ON = 1
    REQUIRE = 2

    @staticmethod
    def _SUFFIXES() -> Dict["ConstraintOrigin", str]:
        # Kept as a function, so that the table does not become a class attribute of the enum
        return {
            ConstraintOrigin.CONDITIONAL_SPEC: "_cond",
            ConstraintOrigin.DEPENDS_ON: "_dep",
            ConstraintOrigin.REQUIRE: "_req",
        }

    @staticmethod
    def append_type_suffix(pkg_id: str, kind: "ConstraintOrigin") -> str:
        """Return the string ID obtained by tagging ``pkg_id`` with the suffix of ``kind``."""
        return "{}{}".format(pkg_id, ConstraintOrigin._SUFFIXES()[kind])

    @staticmethod
    def strip_type_suffix(source: str) -> Tuple[int, Optional[str]]:
        """Split an ID generated by ``append_type_suffix`` into a weight and a package ID.

        The weight is the value of the matching enum member, or -1 if ``source`` has no known
        suffix. An empty ``source`` yields ``(-1, None)``.
        """
        if not source:
            return -1, None

        decoded = (
            (kind.value, source[: -len(suffix)])
            for kind, suffix in ConstraintOrigin._SUFFIXES().items()
            if source.endswith(suffix)
        )
        return next(decoded, (-1, source))
[docs]
class ConditionIdContext(SourceContext):
    """Context for the clause-set of a single required or imposed spec.

    It is derived from a ``ConditionContext`` and carries the one transformation that applies
    to the clause-set being generated, which tells apart clauses generated for a required spec
    from clauses generated for an imposed spec.

    It is deliberately not a subclass of ``ConditionContext``: it lives at a lower level, where
    less information is available.
    """

    def __init__(self):
        super().__init__()
        # Transformation to apply to the generated clauses, if any
        self.transform: Optional[TransformFunction] = None
[docs]
class ConditionContext(SourceContext):
    """Context in which a condition (see ``SpackSolverSetup.condition``) is generated,
    e.g. a ``depends_on`` directive.

    The context may alter the facts generated for the required and for the imposed spec.
    """

    def __init__(self):
        super().__init__()
        # Applied to the facts of the required spec. By default facts are left untouched.
        self.transform_required: Optional[TransformFunction] = None
        # Applied to the facts of the imposed spec. By default "node" and "virtual_node"
        # facts are removed.
        self.transform_imposed: Optional[TransformFunction] = None
        # Whether direct dependency facts are wrapped as node requirements imposed by the
        # parent. None selects the default: wrap in the head of rules, not in the body.
        self.wrap_node_requirement: Optional[bool] = None

    def _derived_context(self, transform: Optional[TransformFunction]) -> ConditionIdContext:
        # Shared by the two public factories, which differ only in the transform they pass on
        derived = ConditionIdContext()
        derived.source = self.source
        derived.transform = transform
        derived.wrap_node_requirement = self.wrap_node_requirement
        return derived

    def requirement_context(self) -> ConditionIdContext:
        """Return the lower-level context to be used for the required spec."""
        return self._derived_context(self.transform_required)

    def impose_context(self) -> ConditionIdContext:
        """Return the lower-level context to be used for the imposed spec."""
        return self._derived_context(self.transform_imposed)
[docs]
class SpackSolverSetup:
"""Class to set up and run a Spack concretization solve."""
gen: "ProblemInstanceBuilder"
possible_versions: Dict[str, Dict[GitOrStandardVersion, List[Provenance]]]
def __init__(self, tests: spack.concretize.TestsType = False):
    self.possible_graph = create_graph_analyzer()
    self.requirement_parser = RequirementParser(spack.config.CONFIG)

    # ---- Containers filled in during setup() ----
    self.possible_virtuals: Set[str] = set()

    # pkg_name -> version -> list of possible origins (package.py, installed, etc.)
    self.possible_versions = collections.defaultdict(lambda: collections.defaultdict(list))
    self.versions_from_yaml: Dict[str, List[GitOrStandardVersion]] = {}
    self.git_commit_versions: Dict[str, Dict[GitOrStandardVersion, str]] = (
        collections.defaultdict(dict)
    )
    self.deprecated_versions: Dict[str, Set[GitOrStandardVersion]] = (
        collections.defaultdict(set)
    )

    self.possible_compilers: List[spack.spec.Spec] = []
    self.rejected_compilers: Set[spack.spec.Spec] = set()
    self.possible_oses: Set = set()
    self.variant_values_from_specs: Set = set()
    self.version_constraints: Dict[str, Set] = collections.defaultdict(set)
    self.target_constraints: Set = set()
    self.default_targets: List = []
    self.variant_ids_by_def_id: Dict[int, int] = {}
    self.reusable_and_possible: ConcreteSpecsByHash = ConcreteSpecsByHash()

    # ---- Id generation and per-package caches of triggers and effects ----
    self._id_counter: Iterator[int] = itertools.count()
    self._trigger_cache: ConditionSpecCache = collections.defaultdict(dict)
    self._effect_cache: ConditionSpecCache = collections.defaultdict(dict)

    # Cache used to speed up the setup phase of the solver
    self.target_specs_cache = None

    # Test dependencies requested by the caller
    self.tests = tests

    # When False, some of the input specs are allowed to remain unsolved
    self.concretize_everything = True

    # Set during the call to setup
    self.pkgs: Set[str] = set()
    self.explicitly_required_namespaces: Dict[str, str] = {}

    # Unique libc specs targeted by compilers (or an educated guess if no compiler)
    self.libcs: List[spack.spec.Spec] = []

    # If true, the code for synthesizing splices has to be loaded
    self.enable_splicing: bool = spack.config.CONFIG.get("concretizer:splice:automatic")
[docs]
def pkg_version_rules(self, pkg: Type[spack.package_base.PackageBase]) -> None:
    """Emit the facts describing the versions of ``pkg``: weight, origins and deprecation.

    As a side effect, the commit of every version that needs one is recorded in
    ``self.git_commit_versions``.
    """
    pkg_name = pkg.name

    def declare(fact) -> None:
        self.gen.fact(fn.pkg_fact(pkg_name, fact))

    known_versions = self.possible_versions[pkg_name]
    by_preference = spack.package_base.sort_by_pkg_preference(known_versions, pkg=pkg)

    # Versions listed in packages.yaml, if any, come before the ones sorted by the package
    if pkg_name in self.versions_from_yaml:
        yaml_first = self.versions_from_yaml[pkg_name] + by_preference
        by_preference = list(spack.llnl.util.lang.dedupe(yaml_first))

    # The deprecation penalty is the number of versions, which should be enough to move
    # the first version last if it is deprecated.
    if by_preference:
        declare(fn.version_deprecation_penalty(len(by_preference)))

    for weight, version in enumerate(by_preference):
        declare(fn.version_declared(version, weight))
        for origin in known_versions[version]:
            declare(fn.version_origin(version, str(origin)))

    for version in known_versions:
        if pkg.needs_commit(version):
            self.git_commit_versions[pkg_name][version] = pkg.version_or_package_attr(
                "commit", version, ""
            )

    # Deprecated versions of this package, if any
    for version in sorted(self.deprecated_versions[pkg_name]):
        declare(fn.deprecated_version(version))
[docs]
def spec_versions(
    self, spec: spack.spec.Spec, *, name: Optional[str] = None
) -> List[AspFunction]:
    """Return the clauses that express the version constraints of ``spec``.

    ``name`` is used only if the spec is anonymous. Non-trivial constraints on abstract specs
    are also recorded in ``self.version_constraints``.
    """
    node_name = spec.name or name
    assert node_name, "Internal Error: spec with no name occurred. Please file an issue."

    if spec.concrete:
        return [fn.attr("version", node_name, spec.version)]

    constraint = spec.versions
    if constraint == vn.any_version:
        # Any version is fine: nothing to impose
        return []

    # Remember the constraint, it is needed later
    self.version_constraints[node_name].add(constraint)
    return [fn.attr("node_version_satisfies", node_name, constraint)]
[docs]
def target_ranges(
    self, spec: spack.spec.Spec, single_target_fn, *, name: Optional[str] = None
) -> List[AspFunction]:
    """Return the clauses that express the target constraint of ``spec``.

    If the target is a known concrete target, the clause is built by ``single_target_fn``.
    Otherwise the target is recorded in ``self.target_constraints`` and a
    "node_target_satisfies" clause is returned. ``name`` is used only if the spec is anonymous.
    """
    node_name = spec.name or name
    assert node_name, "Internal Error: spec with no name occurred. Please file an issue."

    target = spec.architecture.target
    is_concrete_target = str(target) in spack.vendor.archspec.cpu.TARGETS
    if is_concrete_target:
        return [single_target_fn(node_name, target)]

    self.target_constraints.add(target)
    return [fn.attr("node_target_satisfies", node_name, target)]
[docs]
def conflict_rules(self, pkg):
    """Emit a conflict fact for each entry in ``pkg.conflicts``.

    Each fact links the condition generated for the conflicting spec, the condition generated
    for the spec the conflict applies to, and a message. A default message is composed if
    the package does not give one.
    """
    for trigger, conflicts in pkg.conflicts.items():
        trigger_id = self.condition(
            trigger, required_name=pkg.name, msg=f"conflict constraint {trigger}"
        )
        trigger_str = str(trigger)

        for conflict_spec, custom_msg in conflicts:
            conflict_str = str(conflict_spec)

            if custom_msg is not None:
                message = custom_msg
            elif trigger_str:
                message = f"{pkg.name}: '{conflict_str}' conflicts with '{trigger_str}'"
            else:
                message = f"{pkg.name}: conflicts with '{conflict_str}'"

            # An anonymous conflict spec refers to the package itself
            conflict_id = self.condition(
                conflict_spec,
                required_name=conflict_spec.name or pkg.name,
                msg=f"conflict is triggered when {conflict_str or pkg.name}",
            )

            self.gen.fact(fn.pkg_fact(pkg.name, fn.conflict(conflict_id, trigger_id, message)))
            self.gen.newline()
[docs]
def config_compatible_os(self):
    """Emit an ``os_compatible`` fact for each pair of operating systems declared as
    compatible in the ``concretizer:os_compatible`` configuration.
    """
    self.gen.h2("Compatible OS from concretizer config file")
    compatibility = spack.config.get("concretizer:os_compatible", {})
    for newer_os, reusable_oses in compatibility.items():
        for older_os in reusable_oses:
            self.gen.fact(fn.os_compatible(newer_os, older_os))
            self.gen.newline()
[docs]
def package_requirement_rules(self, pkg):
    """Emit the facts for the requirement rules that the requirement parser returns for
    ``pkg``.
    """
    rules = self.requirement_parser.rules(pkg)
    self.emit_facts_from_requirement_rules(rules)
[docs]
def pkg_rules(self, pkg, tests):
    """Emit all the facts for a single package.

    In order: namespace, versions, variants, conflicts, provided virtuals, dependencies,
    splices (only if splicing is enabled) and requirements.

    Args:
        pkg: passed to ``self.pkg_class`` to obtain the package to emit facts for
        tests: not used by this method
    """
    pkg_cls = self.pkg_class(pkg)

    # Namespace of the package
    self.gen.fact(fn.pkg_fact(pkg_cls.name, fn.namespace(pkg_cls.namespace)))

    self.pkg_version_rules(pkg_cls)
    self.gen.newline()

    emitters = [
        self.variant_rules,
        self.conflict_rules,
        self.package_provider_rules,
        self.package_dependencies_rules,
    ]
    if self.enable_splicing:
        emitters.append(self.package_splice_rules)
    emitters.append(self.package_requirement_rules)

    for emit in emitters:
        emit(pkg_cls)
[docs]
def trigger_rules(self):
    """Flushes all the trigger rules collected so far, and clears the cache.

    For every cached trigger this emits its id, its message (the string form of the spec
    that generated it) and one ``condition_requirement`` fact per requirement.
    """
    if not self._trigger_cache:
        return

    self.gen.h2("Trigger conditions")
    # Visit packages in sorted order, as ``effect_rules`` does, so that the emitted text
    # does not depend on the order in which conditions were registered.
    for name in sorted(self._trigger_cache):
        cache = self._trigger_cache[name]
        for (spec_str, _), (trigger_id, requirements) in cache.items():
            self.gen.fact(fn.pkg_fact(name, fn.trigger_id(trigger_id)))
            self.gen.fact(fn.pkg_fact(name, fn.trigger_msg(spec_str)))
            for predicate in requirements:
                self.gen.fact(fn.condition_requirement(trigger_id, *predicate.args))
            self.gen.newline()
    self._trigger_cache.clear()
[docs]
def effect_rules(self):
    """Emit the facts for all the effects cached so far, then empty the cache.

    For every cached effect this emits its id, its message (the string form of the spec that
    generated it) and one ``imposed_constraint`` fact per requirement. Packages are visited
    in sorted order.
    """
    if not self._effect_cache:
        return

    emit = self.gen.fact
    self.gen.h2("Imposed requirements")
    for pkg_name in sorted(self._effect_cache):
        for (spec_str, _), (effect_id, requirements) in self._effect_cache[pkg_name].items():
            emit(fn.pkg_fact(pkg_name, fn.effect_id(effect_id)))
            emit(fn.pkg_fact(pkg_name, fn.effect_msg(spec_str)))
            for predicate in requirements:
                emit(fn.imposed_constraint(effect_id, *predicate.args))
            self.gen.newline()
    self._effect_cache.clear()
[docs]
def define_variant(
self,
pkg: Type[spack.package_base.PackageBase],
name: str,
when: spack.spec.Spec,
variant_def: vt.Variant,
) -> None:
"""Emit the facts describing one definition of a variant of ``pkg``.

A new variant id is allocated for the definition and recorded in
``self.variant_ids_by_def_id``. The facts emitted cover: whether the definition is
conditional, its type, stickiness, default values, penalties of the possible values
and the constraints attached to conditional values.

Args:
pkg: package class the variant belongs to
name: name of the variant
when: condition under which this definition applies (``EMPTY_SPEC`` if unconditional)
variant_def: the variant definition
"""
# Shorthand to emit a fact scoped to this package
pkg_fact = lambda f: self.gen.fact(fn.pkg_fact(pkg.name, f))
# Every variant id has a unique definition (conditional or unconditional), and
# higher variant id definitions take precedence when variants intersect.
vid = next(self._id_counter)
# used to find a variant id from its variant definition (for variant values on specs)
self.variant_ids_by_def_id[id(variant_def)] = vid
if when == EMPTY_SPEC:
# unconditional variant
pkg_fact(fn.variant_definition(name, vid))
else:
# conditional variant
msg = f"Package {pkg.name} has variant '{name}' when {when}"
cond_id = self.condition(when, required_name=pkg.name, msg=msg)
pkg_fact(fn.variant_condition(name, vid, cond_id))
# record type so we can construct the variant when we read it back in
self.gen.fact(fn.variant_type(vid, variant_def.variant_type.string))
if variant_def.sticky:
pkg_fact(fn.variant_sticky(vid))
# Get the default values for this variant definition as a tuple
default_values: Tuple[Union[bool, str], ...] = (variant_def.default,)
if variant_def.multi:
default_values = variant_def.make_default().values
for val in default_values:
pkg_fact(fn.variant_default_value_from_package_py(vid, val))
# Deal with variants that use validator functions. Only the default values are declared
# as possible values, with penalties starting at 1, and nothing else is emitted.
if variant_def.values_defined_by_validator():
for penalty, value in enumerate(default_values, 1):
pkg_fact(fn.variant_possible_value(vid, value))
pkg_fact(fn.variant_penalty(vid, value, penalty))
self.gen.newline()
return
# Fall back to the default values, if the definition lists no values
values = variant_def.values or default_values
# If we deal with disjoint sets of values, define the sets
if isinstance(values, vt.DisjointSetsOfValues):
for sid, s in enumerate(values.sets):
for value in s:
pkg_fact(fn.variant_value_from_disjoint_sets(vid, value, sid))
# Define penalties. Put default values first, otherwise keep the order
penalty = 1
for v in default_values:
pkg_fact(fn.variant_penalty(vid, v, penalty))
penalty += 1
for v in values:
if v not in default_values:
pkg_fact(fn.variant_penalty(vid, v, penalty))
penalty += 1
# Deal with conditional values
for value in values:
if not isinstance(value, vt.ConditionalValue):
continue
# make a spec indicating whether the variant has this conditional value
variant_has_value = spack.spec.Spec()
variant_has_value.variants[name] = vt.VariantValue(
vt.VariantType.MULTI, name, (value.value,)
)
if value.when:
# the conditional value is always "possible", but it imposes its when condition as
# a constraint if the conditional value is taken. This may seem backwards, but it
# ensures that the conditional can only occur when its condition holds.
self.condition(
required_spec=variant_has_value,
imposed_spec=value.when,
required_name=pkg.name,
imposed_name=pkg.name,
msg=f"{pkg.name} variant {name} has value '{value.value}' when {value.when}",
)
else:
vstring = f"{name}='{value.value}'"
# We know the value is never allowed statically (when was None), but we can't just
# ignore it b/c it could come in as a possible value and we need a good error msg.
# So, it's a conflict -- if the value is somehow used, it'll trigger an error.
trigger_id = self.condition(
variant_has_value,
required_name=pkg.name,
msg=f"invalid variant value: {vstring}",
)
# The other side of the conflict is the empty spec
constraint_id = self.condition(
EMPTY_SPEC, required_name=pkg.name, msg="empty (total) conflict constraint"
)
msg = f"variant value {vstring} is conditionally disabled"
pkg_fact(fn.conflict(trigger_id, constraint_id, msg))
self.gen.newline()
[docs]
def define_auto_variant(self, name: str, multi: bool):
    """Emit the facts declaring the special variant ``name``.

    A new variant id is allocated for it. Its type is multi-valued if ``multi`` is true,
    single-valued otherwise.
    """
    self.gen.h3(f"Special variant: {name}")
    variant_id = next(self._id_counter)
    self.gen.fact(fn.auto_variant(name, variant_id))

    variant_type = vt.VariantType.MULTI if multi else vt.VariantType.SINGLE
    self.gen.fact(fn.variant_type(variant_id, variant_type.value))
[docs]
def variant_rules(self, pkg: Type[spack.package_base.PackageBase]):
    """Emit the facts for every definition of every variant of ``pkg``."""
    for variant_name in pkg.variant_names():
        self.gen.h3(f"Variant {variant_name} in package {pkg.name}")
        for condition, definition in pkg.variant_definitions(variant_name):
            self.define_variant(pkg, variant_name, condition, definition)
def _get_condition_id(
    self,
    name: str,
    cond: spack.spec.Spec,
    cache: ConditionSpecCache,
    body: bool,
    context: ConditionIdContext,
) -> int:
    """Return the id of one half of a condition, i.e. of a trigger or of an effect.

    The half is identified, within the package ``name``, by the string form of ``cond`` and
    by the transformation stored in ``context``. On a cache miss a new id is allocated, and
    the ASP functions implied by ``cond`` are computed and stored in ``cache``. They are
    written out later by ``trigger_rules()`` and ``effect_rules()``.

    Returns:
        The id of the cached trigger or effect.
    """
    per_package = cache[name]

    # Anonymous specs are prefixed with the package name they refer to
    spec_str = str(cond) if cond.name else f"{name} {cond}"
    key = (spec_str, context.transform)

    cached = per_package.get(key)
    if cached:
        return cached[0]

    new_id = next(self._id_counter)
    functions = self.spec_clauses(cond, name=name, body=body, context=context)
    if context.transform:
        functions = context.transform(name, cond, functions)
    per_package[key] = (new_id, functions)
    return new_id
def _condition_clauses(
    self,
    required_spec: spack.spec.Spec,
    imposed_spec: Optional[spack.spec.Spec] = None,
    *,
    required_name: Optional[str] = None,
    imposed_name: Optional[str] = None,
    msg: Optional[str] = None,
    context: Optional[ConditionContext] = None,
) -> Tuple[List[AspFunction], int]:
    """Compute, without emitting them, the clauses that define a condition.

    Arguments have the same meaning as in ``condition``. Nothing is written to the problem
    here: the caller emits the returned clauses, so an exception raised by this method
    cannot leave the facts of a condition partially written.

    Returns:
        A tuple with the clauses and the id of the condition.

    Raises:
        ValueError: if a spec is anonymous and no name was given for it
    """
    required_name = required_spec.name or required_name
    if not required_name:
        raise ValueError(f"Must provide a name for anonymous condition: '{required_spec}'")

    if not context:
        # Default context: drop "node" and "virtual_node" facts from the imposed spec
        context = ConditionContext()
        context.transform_imposed = remove_facts("node", "virtual_node")

    condition_id = next(self._id_counter)
    trigger_id = self._get_condition_id(
        required_name,
        required_spec,
        cache=self._trigger_cache,
        body=True,
        context=context.requirement_context(),
    )
    clauses = [
        fn.pkg_fact(required_name, fn.condition(condition_id)),
        fn.condition_reason(condition_id, msg),
        fn.pkg_fact(required_name, fn.condition_trigger(condition_id, trigger_id)),
    ]

    # A condition that imposes nothing has no effect to register
    if not imposed_spec:
        return clauses, condition_id

    imposed_name = imposed_spec.name or imposed_name
    if not imposed_name:
        raise ValueError(f"Must provide a name for imposed constraint: '{imposed_spec}'")

    effect_id = self._get_condition_id(
        imposed_name,
        imposed_spec,
        cache=self._effect_cache,
        body=False,
        context=context.impose_context(),
    )
    clauses.append(fn.pkg_fact(required_name, fn.condition_effect(condition_id, effect_id)))
    return clauses, condition_id
[docs]
def condition(
    self,
    required_spec: spack.spec.Spec,
    imposed_spec: Optional[spack.spec.Spec] = None,
    *,
    required_name: Optional[str] = None,
    imposed_name: Optional[str] = None,
    msg: Optional[str] = None,
    context: Optional[ConditionContext] = None,
) -> int:
    """Emit the facts for a condition, e.g. of a dependency or of a virtual provider.

    Arguments:
        required_spec: the constraints that trigger this condition
        imposed_spec: the constraints that are imposed when this condition is triggered
        required_name: name for ``required_spec``
            (required if required_spec is anonymous, ignored if not)
        imposed_name: name for ``imposed_spec``
            (required if imposed_spec is anonymous, ignored if not)
        msg: description of the condition
        context: if provided, indicates how to modify the clause-sets for the required/imposed
            specs based on the type of constraint they are generated for (e.g. ``depends_on``)

    Returns:
        int: id of the condition created by this function
    """
    computed = self._condition_clauses(
        required_spec=required_spec,
        imposed_spec=imposed_spec,
        required_name=required_name,
        imposed_name=imposed_name,
        msg=msg,
        context=context,
    )
    facts, new_condition_id = computed
    for fact in facts:
        self.gen.fact(fact)
    return new_condition_id
[docs]
def package_provider_rules(self, pkg: Type[spack.package_base.PackageBase]) -> None:
    """Emit the facts describing which virtuals ``pkg`` can provide, under which conditions,
    and which sets of virtuals are provided together.

    Virtuals that are not in ``self.possible_virtuals`` are ignored in the first two groups.
    """
    pkg_name = pkg.name

    def declare(fact) -> None:
        self.gen.fact(fn.pkg_fact(pkg_name, fact))

    for virtual_name in pkg.provided_virtual_names():
        if virtual_name in self.possible_virtuals:
            declare(fn.possible_provider(virtual_name))

    for when, provided in pkg.provided.items():
        for vpkg in sorted(provided):  # type: ignore[type-var]
            if vpkg.name not in self.possible_virtuals:
                continue

            when_suffix = "" if when == EMPTY_SPEC else f" when {when}"
            msg = f"{pkg_name} provides {vpkg}{when_suffix}"
            provider_id = self.condition(when, vpkg, required_name=pkg_name, msg=msg)
            declare(fn.provider_condition(provider_id, vpkg.name))
        self.gen.newline()

    for when, sets_of_virtuals in pkg.provided_together.items():
        together_id = self.condition(
            when, required_name=pkg_name, msg="Virtuals are provided together"
        )
        for set_id, virtuals_together in enumerate(sorted(sets_of_virtuals)):
            for virtual_name in sorted(virtuals_together):
                declare(fn.provided_together(together_id, set_id, virtual_name))
        self.gen.newline()
[docs]
def package_dependencies_rules(self, pkg):
    """Translate ``depends_on`` directives into ASP logic."""
    for cond, deps_by_name in pkg.dependencies.items():
        cond_str = str(cond)
        when_suffix = f" when {cond_str}" if cond_str else ""

        for dep in deps_by_name.values():
            depflag = dep.depflag

            # Test dependencies are kept only if tests were requested, either for every
            # package or for this package in particular
            keep_test_deps = self.tests and (
                isinstance(self.tests, bool) or pkg.name in self.tests
            )
            if not keep_test_deps:
                depflag &= ~dt.TEST

            # Nothing to generate, if no dependency type is left
            if not depflag:
                continue

            context = ConditionContext()
            context.source = ConstraintOrigin.append_type_suffix(
                pkg.name, ConstraintOrigin.DEPENDS_ON
            )
            context.transform_imposed = dependency_holds(dependency_flags=depflag, pkg_cls=pkg)

            self.condition(
                cond,
                dep.spec,
                required_name=pkg.name,
                msg=f"{pkg.name} depends on {dep.spec}{when_suffix}",
                context=context,
            )
            self.gen.newline()
def _gen_match_variant_splice_constraints(
    self,
    pkg,
    cond_spec: spack.spec.Spec,
    splice_spec: spack.spec.Spec,
    hash_asp_var: "AspVar",
    splice_node,
    match_variants: List[str],
):
    """Return the constraints forcing the variants in ``match_variants`` to have the same
    value on the splice node and on the installed spec identified by ``hash_asp_var``.

    A variant is skipped if none of its definitions has a condition satisfied by
    ``cond_spec``, or if the first definition is multi-valued. The returned list is empty
    if no variant qualifies.
    """
    constraints = []
    for index, variant_name in enumerate(match_variants):
        definitions = pkg.variant_definitions(variant_name)

        # the spliceable config of the package always includes the variant
        applicable = definitions != [] and any(
            cond_spec.satisfies(when) for (when, _) in definitions
        )
        if not applicable:
            continue

        # cannot automatically match multi-valued variants
        if definitions[0][1].multi:
            continue

        # Using the same ASP variable in both atoms forces the two values to be equal
        value_var = AspVar(f"VariValue{index}")
        constraints.extend(
            [
                fn.attr("variant_value", splice_node, variant_name, value_var),
                fn.hash_attr(
                    hash_asp_var, "variant_value", splice_spec.name, variant_name, value_var
                ),
            ]
        )
    return constraints
[docs]
def package_splice_rules(self, pkg):
    """Append one ASP rule to the problem for each entry in ``pkg.splice_specs``.

    The head of each rule is ``abi_splice_conditions_hold``. Its body requires a node of
    ``pkg`` satisfying the splice condition, an installed spec satisfying the spec to be
    spliced, and the variant constraints selected by ``match_variants``.

    Raises:
        spack.error.PackageError: if a variant listed in ``match_variants`` is also set
            explicitly on either spec
    """
    self.gen.h2("Splice rules")
    ordered_splices = sorted(pkg.splice_specs.items())
    for rule_id, (cond, (spec_to_splice, match_variants)) in enumerate(ordered_splices):
        self.version_constraints[pkg.name].add(cond.versions)
        self.version_constraints[spec_to_splice.name].add(spec_to_splice.versions)

        hash_var = AspVar("Hash")
        splice_node = fn.node(AspVar("NID"), pkg.name)

        # Clauses of the condition, re-targeted to the splice node
        when_spec_attrs = [
            fn.attr(clause.args[0], splice_node, *(clause.args[2:]))
            for clause in self.spec_clauses(
                cond, name=pkg.name, body=True, required_from=None
            )
            if clause.args[0] != "node"
        ]
        # Clauses of the spec to be spliced, expressed on the installed hash
        splice_spec_hash_attrs = [
            fn.hash_attr(hash_var, *(clause.args))
            for clause in self.spec_clauses(spec_to_splice, body=True, required_from=None)
            if clause.args[0] != "node"
        ]

        if match_variants is None:
            variant_constraints = []
        else:
            if match_variants == "*":
                # Match every variant known to the package
                names_to_match = sorted(
                    {
                        variant_name
                        for definitions in pkg.variants.values()
                        for variant_name in definitions
                    }
                )
            else:
                explicitly_set = any(
                    v in cond.variants or v in spec_to_splice.variants for v in match_variants
                )
                if explicitly_set:
                    raise spack.error.PackageError(
                        "Overlap between match_variants and explicitly set variants"
                    )
                names_to_match = match_variants

            variant_constraints = self._gen_match_variant_splice_constraints(
                pkg, cond, spec_to_splice, hash_var, splice_node, names_to_match
            )

        rule_head = fn.abi_splice_conditions_hold(
            rule_id, splice_node, spec_to_splice.name, hash_var
        )
        body_atoms = [
            fn.attr("node", splice_node),
            fn.installed_hash(spec_to_splice.name, hash_var),
        ]
        body_atoms = (
            body_atoms + when_spec_attrs + splice_spec_hash_attrs + variant_constraints
        )
        rule_body = ",\n ".join(str(atom) for atom in body_atoms)
        self.gen.append(f"{rule_head} :-\n {rule_body}.")
        self.gen.newline()
[docs]
def virtual_requirements_and_weights(self):
    """Emit, for each possible virtual, its declaration, the weights of its providers and
    the facts for its requirement rules.

    The order of providers is: the ones named in unconditional requirements, then the ones
    named in unconditional preferences, then the ones listed in ``packages:all:providers``.
    Providers named alone in an unconditional conflict are left out.
    """
    config_providers = spack.config.CONFIG.get("packages:all:providers", {})

    self.gen.h1("Virtual requirements and weights")
    for virtual_str in sorted(self.possible_virtuals):
        self.gen.newline()
        self.gen.h2(f"Virtual: {virtual_str}")
        self.gen.fact(fn.virtual(virtual_str))

        rules = self.requirement_parser.rules_from_virtual(virtual_str)
        if not rules and virtual_str not in config_providers:
            continue

        required, preferred, removed = [], [], set()
        # Conditional requirements are not taken into account here
        for rule in (r for r in rules if r.condition == EMPTY_SPEC):
            origin = rule.origin
            if origin == RequirementOrigin.PREFER_YAML:
                preferred.extend(x.name for x in rule.requirements if x.name)
            elif origin == RequirementOrigin.REQUIRE_YAML:
                required.extend(x.name for x in rule.requirements if x.name)
            elif origin == RequirementOrigin.CONFLICT_YAML:
                conflict_spec = rule.requirements[0]
                # For conflicts, we take action only if just a name is used
                if spack.spec.Spec(conflict_spec.name).satisfies(conflict_spec):
                    removed.add(conflict_spec.name)

        candidates = required + preferred + config_providers.get(virtual_str, [])
        allowed = [x for x in candidates if x not in removed]
        for weight, provider in enumerate(spack.llnl.util.lang.dedupe(allowed)):
            provider_name = spack.spec.Spec(provider).name
            self.gen.fact(fn.provider_weight_from_config(virtual_str, provider_name, weight))
        self.gen.newline()

        if rules:
            self.emit_facts_from_requirement_rules(rules)
            self.trigger_rules()
            self.effect_rules()
[docs]
def emit_facts_from_requirement_rules(self, rules: List[RequirementRule]):
    """Generate facts to enforce requirements.

    Each rule becomes a requirement group, identified by its position in ``rules``. Every
    spec listed in the rule becomes a member of that group, with a weight that grows with
    the position of the member in the rule.

    Args:
        rules: rules for which we want facts to be emitted

    Raises:
        RuntimeError: if a condition cannot be generated for a rule whose kind is not
            ``RequirementKind.DEFAULT``. For rules of kind ``DEFAULT`` the failing item is
            skipped instead (the whole rule if its activation condition fails, the single
            member otherwise).
    """
    for requirement_grp_id, rule in enumerate(rules):
        virtual = rule.kind == RequirementKind.VIRTUAL

        pkg_name, policy, requirement_grp = rule.pkg_name, rule.policy, rule.requirements
        requirement_weight = 0

        # Propagated preferences have a higher penalty than normal preferences
        weight_multiplier = 2 if rule.origin == RequirementOrigin.INPUT_SPECS else 1

        # Write explicitly if a requirement is conditional or not
        if rule.condition != EMPTY_SPEC:
            msg = f"activate requirement {requirement_grp_id} if {rule.condition} holds"
            context = ConditionContext()
            context.transform_required = dag_closure_by_deptype
            try:
                main_condition_id = self.condition(
                    rule.condition, required_name=pkg_name, msg=msg, context=context
                )
            except Exception as e:
                # Same policy as for group members below: only rules of kind DEFAULT
                # are allowed to fail silently
                if rule.kind != RequirementKind.DEFAULT:
                    raise RuntimeError(
                        "cannot emit requirements for the solver: " + str(e)
                    ) from e
                continue

            self.gen.fact(
                fn.requirement_conditional(pkg_name, requirement_grp_id, main_condition_id)
            )

        self.gen.fact(fn.requirement_group(pkg_name, requirement_grp_id))
        self.gen.fact(fn.requirement_policy(pkg_name, requirement_grp_id, policy))
        if rule.message:
            self.gen.fact(fn.requirement_message(pkg_name, requirement_grp_id, rule.message))
        self.gen.newline()

        for input_spec in requirement_grp:
            # Work on a copy, since the spec is modified below
            spec = spack.spec.Spec(input_spec)
            spec.replace_hash()
            # Anonymous specs refer to the package (or virtual) the rule is attached to
            if not spec.name:
                spec.name = pkg_name
            spec.attach_git_version_lookup()

            when_spec = spec
            if virtual and spec.name != pkg_name:
                # The requirement names a provider: trigger it only when that provider
                # is used for this virtual
                when_spec = spack.spec.Spec(f"^[virtuals={pkg_name}] {spec}")

            try:
                context = ConditionContext()
                context.source = ConstraintOrigin.append_type_suffix(
                    pkg_name, ConstraintOrigin.REQUIRE
                )
                context.wrap_node_requirement = True
                if not virtual:
                    context.transform_required = remove_facts("depends_on")
                    context.transform_imposed = remove_facts(
                        "node", "virtual_node", "depends_on"
                    )
                # else: for virtuals we want to emit "node" and
                # "virtual_node" in imposed specs

                info_msg = f"{input_spec} is a requirement for package {pkg_name}"
                if rule.condition != EMPTY_SPEC:
                    info_msg += f" when {rule.condition}"
                if rule.message:
                    info_msg += f" ({rule.message})"
                member_id = self.condition(
                    required_spec=when_spec,
                    imposed_spec=spec,
                    required_name=pkg_name,
                    msg=info_msg,
                    context=context,
                )

                # Conditions don't handle conditional dependencies directly
                # Those are handled separately here
                self.generate_conditional_dep_conditions(spec, member_id)
            except Exception as e:
                # Do not raise if the rule comes from the 'all' subsection, since usability
                # would be impaired. If a rule does not apply for a specific package, just
                # discard it.
                if rule.kind != RequirementKind.DEFAULT:
                    raise RuntimeError(
                        "cannot emit requirements for the solver: " + str(e)
                    ) from e
                continue

            self.gen.fact(fn.requirement_group_member(member_id, pkg_name, requirement_grp_id))
            self.gen.fact(
                fn.requirement_has_weight(member_id, requirement_weight * weight_multiplier)
            )
            self.gen.newline()
            # Only members that were emitted successfully consume a weight
            requirement_weight += 1
[docs]
def external_packages(self, packages_with_externals):
    """Emit a ``buildable_false`` fact for each possible package that is not buildable.

    Args:
        packages_with_externals: mapping from package name to its packages configuration,
            from packages.yaml and implicit externals. The ``all`` entry and the packages
            that are not in ``self.pkgs`` are ignored.
    """
    self.gen.h1("External packages")
    for pkg_name, data in packages_with_externals.items():
        # Skip the catch-all section and packages that are not possible dependencies
        if pkg_name == "all" or pkg_name not in self.pkgs:
            continue

        # Packages are buildable unless the configuration says otherwise
        if data.get("buildable", True):
            continue

        self.gen.h2(f"External package: {pkg_name}")
        self.gen.fact(fn.buildable_false(pkg_name))
[docs]
def preferred_variants(self, pkg_name):
    """Emit facts for the variant preferences of ``pkg_name``, as read from packages.yaml.

    Preferences that cannot be validated against the package are logged at debug level
    and discarded.
    """
    preferred = spack.package_prefs.PackagePrefs.preferred_variants(pkg_name)
    if not preferred:
        return

    self.gen.h2(f"Package preferences: {pkg_name}")
    for variant_name in sorted(preferred):
        variant = preferred[variant_name]

        # Check that the variant and its values are acceptable for this package
        try:
            variant_defs = vt.prevalidate_variant_value(self.pkg_class(pkg_name), variant)
        except (vt.InvalidVariantValueError, KeyError, ValueError) as e:
            tty.debug(
                f"[SETUP]: rejected {str(variant)} as a preference for {pkg_name}: {str(e)}"
            )
        else:
            for value in variant.values:
                # Record the value as a possible one for every matching definition
                for variant_def in variant_defs:
                    self.variant_values_from_specs.add((pkg_name, id(variant_def), value))

                default_fact = fn.variant_default_value_from_packages_yaml(
                    pkg_name, variant.name, value
                )
                self.gen.fact(default_fact)
[docs]
def target_preferences(self):
    """Emit a ``target_weight`` fact for each default target.

    Targets are sorted using ``PackagePrefs("all", "target")`` as a key, and the position
    in the sorted sequence is used as the weight. The specs built from
    ``self.default_targets`` are cached in ``self.target_specs_cache``.
    """
    preference_key = spack.package_prefs.PackagePrefs("all", "target")

    if not self.target_specs_cache:
        target_specs = []
        for _, target_name in self.default_targets:
            target_specs.append(spack.spec.Spec(f"target={target_name}"))
        self.target_specs_cache = target_specs

    # sorted() returns a new list, so the cache keeps its original order
    ranked_targets = sorted(self.target_specs_cache, key=preference_key)
    for weight, target_spec in enumerate(ranked_targets):
        self.gen.fact(fn.target_weight(str(target_spec.architecture.target), weight))
[docs]
def spec_clauses(
    self,
    spec: spack.spec.Spec,
    *,
    name: Optional[str] = None,
    body: bool = False,
    transitive: bool = True,
    expand_hashes: bool = False,
    concrete_build_deps=False,
    include_runtimes=False,
    required_from: Optional[str] = None,
    context: Optional[SourceContext] = None,
) -> List[AspFunction]:
    """Wrap a call to ``_spec_clauses()`` into a try/except block with better error handling.

    Arguments are as for ``_spec_clauses()`` except ``required_from``.

    Arguments:
        required_from: name of package that caused this call.

    Raises:
        RuntimeError: if ``_spec_clauses()`` raises a ``RuntimeError``. The message is
            extended with the name of the requiring package, when given, and the original
            exception is chained as the cause.
    """
    try:
        clauses = self._spec_clauses(
            spec,
            name=spec.name or name,
            body=body,
            transitive=transitive,
            expand_hashes=expand_hashes,
            concrete_build_deps=concrete_build_deps,
            include_runtimes=include_runtimes,
            context=context,
        )
    except RuntimeError as exc:
        msg = str(exc)
        if required_from:
            msg += f" [required from package '{required_from}']"
        # Chain explicitly, so the traceback presents the original error as the direct cause
        raise RuntimeError(msg) from exc
    return clauses
def _spec_clauses(
    self,
    spec: spack.spec.Spec,
    *,
    name: Optional[str] = None,
    body: bool = False,
    transitive: bool = True,
    expand_hashes: bool = False,
    concrete_build_deps: bool = False,
    include_runtimes: bool = False,
    context: Optional[SourceContext] = None,
    seen: Optional[Set[int]] = None,
) -> List[AspFunction]:
    """Return a list of clauses that a spec mandates are true.

    Arguments:
        spec: the spec to analyze
        name: optional fallback of spec.name (used for anonymous roots)
        body: if True, generate clauses to be used in rule bodies (final values) instead
            of rule heads (setters).
        transitive: if False, don't generate clauses from dependencies (default True)
        expand_hashes: if True, descend into hashes of concrete specs (default False)
        concrete_build_deps: if False, do not include pure build deps of concrete specs
            (as they have no effect on runtime constraints)
        include_runtimes: generate full dependency clauses from runtime libraries that
            are omitted from the solve.
        context: tracks what constraint this clause set is generated for (e.g. a
            ``depends_on`` constraint in a package.py file)
        seen: set of ids of specs that have already been processed (for internal use only)

    Normally, if called with ``transitive=True``, ``spec_clauses()`` just generates
    hashes for the dependency requirements of concrete specs. If ``expand_hashes``
    is ``True``, we'll *also* output all the facts implied by transitive hashes,
    which are redundant during a solve but useful outside of one (e.g.,
    for spec ``diff``).
    """
    clauses = []
    seen = seen if seen is not None else set()
    name = spec.name or name or ""
    # Mark the spec as processed, so recursive calls don't descend into it again
    seen.add(id(spec))

    # Select the factory for rule bodies or rule heads
    f: Union[Type[_Head], Type[_Body]] = _Body if body else _Head

    if name:
        clauses.append(
            f.node(name) if not spack.repo.PATH.is_virtual(name) else f.virtual_node(name)
        )
    if spec.namespace:
        clauses.append(f.namespace(name, spec.namespace))

    clauses.extend(self.spec_versions(spec, name=name))

    # seed architecture at the root (we'll propagate later)
    # TODO: use better semantics.
    arch = spec.architecture
    if arch:
        if arch.platform:
            clauses.append(f.node_platform(name, arch.platform))
        if arch.os:
            clauses.append(f.node_os(name, arch.os))
        if arch.target:
            clauses.extend(self.target_ranges(spec, f.node_target, name=name))

    # variants
    for vname, variant in sorted(spec.variants.items()):
        # TODO: variant="*" means 'variant is defined to something', which used to
        # be meaningless in concretization, as all variants had to be defined. But
        # now that variants can be conditional, it should force a variant to exist.
        if not variant.values:
            continue

        for value in variant.values:
            # ensure that the value *can* be valid for the spec
            if name and not spec.concrete and not spack.repo.PATH.is_virtual(name):
                variant_defs = vt.prevalidate_variant_value(
                    self.pkg_class(name), variant, spec
                )

                # Record that this is a valid possible value. Accounts for
                # int/str/etc., where valid values can't be listed in the package
                for variant_def in variant_defs:
                    self.variant_values_from_specs.add((name, id(variant_def), value))

            if variant.propagate:
                clauses.append(f.propagate(name, fn.variant_value(vname, value)))
                # The value is set on this node only if the package has the variant
                if self.pkg_class(name).has_variant(vname):
                    clauses.append(f.variant_value(name, vname, value))
            else:
                variant_clause = f.variant_value(name, vname, value)
                if (
                    variant.concrete
                    and variant.type == vt.VariantType.MULTI
                    and not spec.concrete
                ):
                    if body is False:
                        # In rule heads, prefix the first argument with "concrete_"
                        variant_clause.args = (
                            f"concrete_{variant_clause.args[0]}",
                            *variant_clause.args[1:],
                        )
                    else:
                        clauses.append(fn.attr("concrete_variant_request", name, vname, value))
                clauses.append(variant_clause)

    # compiler flags
    source = context.source if context else "none"
    for flag_type, flags in spec.compiler_flags.items():
        flag_group = " ".join(flags)
        for flag in flags:
            clauses.append(
                f.node_flag(name, fn.node_flag(flag_type, flag, flag_group, source))
            )
            if not spec.concrete and flag.propagate is True:
                clauses.append(
                    f.propagate(
                        name,
                        fn.node_flag(flag_type, flag, flag_group, source),
                        fn.edge_types("link", "run"),
                    )
                )

    # Hash for concrete specs
    if spec.concrete:
        # older specs do not have package hashes, so we have to do this carefully
        package_hash = getattr(spec, "_package_hash", None)
        if package_hash:
            clauses.append(fn.attr("package_hash", name, package_hash))
        clauses.append(fn.attr("hash", name, spec.dag_hash()))
        if spec.external:
            clauses.append(fn.attr("external", name))

    # TODO: a loop over `edges_to_dependencies` is preferred over `edges_from_dependents`
    # since dependents can point to specs out of scope for the solver.
    edges = spec.edges_from_dependents()
    if not body and not spec.concrete:
        virtuals = sorted(set(itertools.chain.from_iterable(edge.virtuals for edge in edges)))
        for virtual in virtuals:
            clauses.append(fn.attr("provider_set", name, virtual))
            clauses.append(fn.attr("virtual_node", virtual))
    else:
        # direct dependencies are handled under `edges_to_dependencies()`
        virtual_iter = (edge.virtuals for edge in edges if not edge.direct)
        virtuals = sorted(set(itertools.chain.from_iterable(virtual_iter)))
        for virtual in virtuals:
            clauses.append(fn.attr("virtual_on_incoming_edges", name, virtual))

    # If the spec is external and concrete, we allow all the libcs on the system
    if spec.external and spec.concrete and using_libc_compatibility():
        clauses.append(fn.attr("needs_libc", name))
        for libc in self.libcs:
            clauses.append(fn.attr("compatible_libc", name, libc.name, libc.version))

    if not transitive:
        return clauses

    # Dependencies
    edge_clauses = []
    for dspec in spec.edges_to_dependencies():
        # Ignore conditional dependencies, they are handled by caller
        if dspec.when != EMPTY_SPEC:
            continue

        dep = dspec.spec

        if spec.concrete:
            # GCC runtime is solved again by clingo, even on concrete specs, to give
            # the possibility to reuse specs built against a different runtime.
            if dep.name == "gcc-runtime":
                edge_clauses.append(
                    fn.attr("compatible_runtime", name, dep.name, f"{dep.version}:")
                )
                constraint_spec = spack.spec.Spec(f"{dep.name}@{dep.version}")
                # The return value is discarded: the call is made for its side effects
                self.spec_versions(constraint_spec)
                if not include_runtimes:
                    continue

            # libc is also solved again by clingo, but in this case the compatibility
            # is not encoded in the parent node - so we need to emit explicit facts
            if "libc" in dspec.virtuals:
                edge_clauses.append(fn.attr("needs_libc", name))
                for libc in self.libcs:
                    if libc_is_compatible(libc, dep):
                        edge_clauses.append(
                            fn.attr("compatible_libc", name, libc.name, libc.version)
                        )
                if not include_runtimes:
                    continue

            # We know dependencies are real for concrete specs. For abstract
            # specs they just mean the dep is somehow in the DAG.
            for dtype in dt.ALL_FLAGS:
                if not dspec.depflag & dtype:
                    continue
                # skip build dependencies of already-installed specs
                if concrete_build_deps or dtype != dt.BUILD:
                    edge_clauses.append(
                        fn.attr("depends_on", name, dep.name, dt.flag_to_string(dtype))
                    )
                    for virtual_name in dspec.virtuals:
                        edge_clauses.append(
                            fn.attr("virtual_on_edge", name, dep.name, virtual_name)
                        )
                        edge_clauses.append(fn.attr("virtual_node", virtual_name))

            # imposing hash constraints for all but pure build deps of
            # already-installed concrete specs.
            if concrete_build_deps or dspec.depflag != dt.BUILD:
                edge_clauses.append(fn.attr("hash", dep.name, dep.dag_hash()))
            elif not concrete_build_deps and dspec.depflag:
                edge_clauses.append(
                    fn.attr("concrete_build_dependency", name, dep.name, dep.dag_hash())
                )
                for virtual_name in dspec.virtuals:
                    edge_clauses.append(
                        fn.attr("virtual_on_build_edge", name, dep.name, virtual_name)
                    )

        # if the spec is abstract, descend into dependencies.
        # if it's concrete, then the hashes above take care of dependency
        # constraints, but expand the hashes if asked for.
        if (not spec.concrete or expand_hashes) and id(dep) not in seen:
            dependency_clauses = self._spec_clauses(
                dep,
                body=body,
                expand_hashes=expand_hashes,
                concrete_build_deps=concrete_build_deps,
                context=context,
                seen=seen,
            )
            ###
            # Dependency expressed with "^"
            ###
            if not dspec.direct:
                edge_clauses.extend(dependency_clauses)
                continue

            ###
            # Direct dependencies expressed with "%"
            ###
            for dependency_type in dt.flag_to_tuple(dspec.depflag):
                edge_clauses.append(fn.attr("depends_on", name, dep.name, dependency_type))

            for virtual in dspec.virtuals:
                dependency_clauses.append(fn.attr("virtual_on_edge", name, dep.name, virtual))

            # By default, wrap head of rules, unless the context says otherwise
            wrap_node_requirement = body is False
            if context and context.wrap_node_requirement is not None:
                wrap_node_requirement = context.wrap_node_requirement

            if not wrap_node_requirement:
                edge_clauses.extend(dependency_clauses)
                continue

            # Rename each clause of the dependency in place and nest it into a
            # "direct_dependency" attribute of this node
            for clause in dependency_clauses:
                clause.name = "node_requirement"
                edge_clauses.append(fn.attr("direct_dependency", name, clause))

    clauses.extend(edge_clauses)
    return clauses
[docs]
def define_package_versions_and_validate_preferences(
    self, possible_pkgs: Set[str], *, require_checksum: bool, allow_deprecated: bool
):
    """Record versions from package.py files and version preferences from packages.yaml.

    Args:
        possible_pkgs: names of the packages to process
        require_checksum: if True, versions from package.py are filtered with
            ``_is_checksummed_version`` for packages that have code, and git versions
            from packages.yaml are kept only if they are commits
        allow_deprecated: if False, deprecated versions are recorded as deprecated,
            but not as possible versions

    Raises:
        spack.error.ConfigError: if a non-git version preference in packages.yaml
            doesn't match any known version of the package
    """
    packages_yaml = spack.config.CONFIG.get_config("packages")
    for pkg_name in sorted(possible_pkgs):
        pkg_cls = self.pkg_class(pkg_name)

        # Versions declared in the package.py file of this package
        declared = list(pkg_cls.versions.items())
        if require_checksum and pkg_cls.has_code:
            declared = [x for x in declared if _is_checksummed_version(x)]

        for v, version_info in declared:
            is_deprecated = version_info.get("deprecated", False)
            if is_deprecated:
                self.deprecated_versions[pkg_name].add(v)
            if is_deprecated and not allow_deprecated:
                continue
            self.possible_versions[pkg_name][v].append(Provenance.PACKAGE_PY)

        # Nothing else to do, unless packages.yaml has version preferences for the package
        if not (pkg_name in packages_yaml and "version" in packages_yaml[pkg_name]):
            continue

        # TODO(psakiev) Need facts about versions
        # - requires_commit (associated with tag or branch)
        preferred_versions: List[GitOrStandardVersion] = []
        for vstr in packages_yaml[pkg_name]["version"]:
            cfg_ver = vn.ver(vstr)

            if isinstance(cfg_ver, vn.GitVersion):
                if cfg_ver.is_commit if require_checksum else True:
                    preferred_versions.append(cfg_ver)
                continue

            # Any other preference must select at least one of the known versions
            matches = sorted(
                (x for x in self.possible_versions[pkg_name] if x.satisfies(cfg_ver)),
                reverse=True,
            )
            if not matches:
                raise spack.error.ConfigError(
                    f"Preference for version {cfg_ver} does not match any known "
                    f"version of {pkg_name}"
                )
            preferred_versions.extend(matches)

        preferred_versions = list(spack.llnl.util.lang.dedupe(preferred_versions))
        for v in preferred_versions:
            if isinstance(v, vn.GitVersion):
                provenance = Provenance.PACKAGES_YAML_GIT_VERSION
            else:
                provenance = Provenance.PACKAGES_YAML
            self.possible_versions[pkg_name][v].append(provenance)
        self.versions_from_yaml[pkg_name] = preferred_versions
[docs]
def define_ad_hoc_versions_from_specs(
    self, specs, origin, *, allow_deprecated: bool, require_checksum: bool
):
    """Add concrete versions to possible versions from lists of CLI/dev specs.

    Args:
        specs: specs whose nodes are inspected for concrete versions
        origin: provenance recorded for each version that gets added
        allow_deprecated: if False, versions known to be deprecated are not added
        require_checksum: if True, an unknown version must pass
            ``_is_checksummed_git_version``

    Raises:
        UnsatisfiableSpecError: if ``require_checksum`` is True and an unknown version
            doesn't pass the check above
    """
    for node in traverse.traverse_nodes(specs):
        version = node.versions.concrete
        if version is None:
            continue

        # Only concrete versions *that we know nothing about* have to be added
        if any(known == version for known in self.possible_versions[node.name]):
            continue

        if require_checksum and not _is_checksummed_git_version(version):
            error_msg = node.format("No matching version for constraint {name}{@versions}")
            raise UnsatisfiableSpecError(error_msg)

        if allow_deprecated or version not in self.deprecated_versions[node.name]:
            self.possible_versions[node.name][version].append(origin)
def _supported_targets(self, compiler_name, compiler_version, targets):
    """Partition ``targets`` according to whether the compiler can optimize for them.

    A target is supported if asking it for the optimization flags of the given compiler
    name and version doesn't raise. Warnings emitted while probing are ignored.

    Returns:
        A ``(supported, unsupported)`` pair of lists, each one preserving the relative
        order of ``targets``.
    """
    supported, unsupported = [], []

    for target in targets:
        destination = supported
        try:
            with warnings.catch_warnings():
                warnings.simplefilter("ignore")
                target.optimization_flags(
                    compiler_name, compiler_version.dotted_numeric_string
                )
        except (spack.vendor.archspec.cpu.UnsupportedMicroarchitecture, ValueError):
            destination = unsupported
        destination.append(target)

    return supported, unsupported
[docs]
def os_defaults(self, specs):
    """Emit facts for the buildable operating systems, and for the ranking of all OS's.

    Args:
        specs: specs whose operating systems, if any, are considered buildable in
            addition to the ones of the host platform
    """
    self.gen.h2("Possible operating systems")
    platform = spack.platforms.host()

    # Start from the OS's of the host platform, then add any OS mentioned on the command
    # line. We need this to cross-concretize in CI, and for some tests.
    # TODO: OS should really be more than just a label -- rework this.
    buildable = set(platform.operating_sys.keys())
    buildable.update(
        spec.architecture.os for spec in specs if spec.architecture and spec.architecture.os
    )

    for build_os in sorted(buildable):
        self.gen.fact(fn.buildable_os(build_os))

    def ranking_key(os_name):
        # The sort below is reversed, so larger keys come first: the default OS leads
        # the ranking, and ties are broken by name.
        # NOTE(review): with the reversed sort, OS's that are *not* buildable rank before
        # the buildable ones - confirm this is the intended preference.
        is_default = os_name == platform.default_os
        not_buildable = os_name not in buildable
        return is_default, not_buildable, os_name

    ranked_oses = sorted(buildable.union(self.possible_oses), key=ranking_key, reverse=True)

    # The position in the ranking is emitted together with each OS
    for position, os_name in enumerate(ranked_oses):
        self.gen.fact(fn.os(os_name, position))
[docs]
def target_defaults(self, specs):
    """Add facts about targets and target compatibility.

    Args:
        specs: specs whose explicitly requested targets are taken into account

    Raises:
        spack.error.SpecError: if a spec requests a single, concrete target that is not
            a known microarchitecture
    """
    self.gen.h2("Target compatibility")

    # Start from the candidates of the possible graph, discarding the targets that
    # no package can use
    candidate_targets = []
    for x in self.possible_graph.candidate_targets():
        if all(
            self.possible_graph.unreachable(pkg_name=pkg_name, when_spec=f"target={x}")
            for pkg_name in self.pkgs
        ):
            tty.debug(f"[{__name__}] excluding target={x}, cause no package can use it")
            continue
        candidate_targets.append(x)

    # Add targets explicitly requested from specs
    host_compatible = spack.config.CONFIG.get("concretizer:targets:host_compatible")
    for spec in specs:
        if not spec.architecture or not spec.architecture.target:
            continue

        target_name = spec.target.name
        target = spack.vendor.archspec.cpu.TARGETS.get(target_name)
        if not target:
            if spec.architecture.target_concrete:
                raise spack.error.SpecError(
                    f"the target '{target_name}' in '{spec}' is not a known target. "
                    f"Run 'spack arch --known-targets' to see valid targets."
                )
            # range/list constraint (contains ':' or ','): keep existing path
            self.target_ranges(spec, None)
            continue

        if target not in candidate_targets and not host_compatible:
            candidate_targets.append(target)
            for ancestor in target.ancestors:
                if ancestor not in candidate_targets:
                    candidate_targets.append(ancestor)

    platform = spack.platforms.host()
    uarch = spack.vendor.archspec.cpu.TARGETS.get(platform.default)
    # The family of the default target always counts among the best targets
    best_targets = {uarch.family.name}
    for compiler in self.possible_compilers:
        supported, unsupported = self._supported_targets(
            compiler.name, compiler.version, candidate_targets
        )

        for target in supported:
            best_targets.add(target.name)
            self.gen.fact(fn.target_supported(compiler.name, compiler.version, target.name))

        if supported:
            self.gen.fact(
                fn.target_supported(compiler.name, compiler.version, uarch.family.name)
            )

        for target in unsupported:
            self.gen.fact(
                fn.target_not_supported(compiler.name, compiler.version, target.name)
            )

        self.gen.newline()

    i = 0
    for target in candidate_targets:
        self.gen.fact(fn.target(target.name))
        self.gen.fact(fn.target_family(target.name, target.family.name))
        self.gen.fact(fn.target_compatible(target.name, target.name))
        # Code for ancestor can run on target
        for ancestor in target.ancestors:
            self.gen.fact(fn.target_compatible(target.name, ancestor.name))

        # prefer best possible targets; weight others poorly so
        # they're not used unless set explicitly
        # these are stored to be generated as facts later offset by the
        # number of preferred targets
        if target.name in best_targets:
            self.default_targets.append((i, target.name))
            i += 1
        else:
            self.default_targets.append((100, target.name))
        self.gen.newline()

    # Remove duplicates and order by weight, before emitting the target weights
    self.default_targets = list(sorted(set(self.default_targets)))
    self.target_preferences()
[docs]
def define_version_constraints(self):
    """Define what version_satisfies(...) means in ASP logic.

    Possible versions of each package are sorted, and emitted with their position in the
    sorted sequence. Each version constraint is then emitted as the set of contiguous index
    ranges, over that sequence, that satisfy it.
    """
    versions_by_pkg = {}
    for pkg_name in self.possible_versions:
        ordered = sorted(self.possible_versions[pkg_name])
        versions_by_pkg[pkg_name] = ordered
        for idx, v in enumerate(ordered):
            self.gen.fact(fn.pkg_fact(pkg_name, fn.version_order(v, idx)))
            if v not in self.git_commit_versions[pkg_name]:
                continue

            # Versions tracked as git commits either have an associated sha, or need one
            sha = self.git_commit_versions[pkg_name].get(v)
            if sha:
                commit_fact = fn.version_has_commit(v, sha)
            else:
                commit_fact = fn.version_needs_commit(v)
            self.gen.fact(fn.pkg_fact(pkg_name, commit_fact))
        self.gen.newline()
    self.gen.newline()

    for pkg_name, set_of_versions in sorted(self.version_constraints.items()):
        ordered = versions_by_pkg.get(pkg_name)
        if ordered is None:
            continue

        for constraint in sorted(set_of_versions):
            # Scan the sorted versions, emitting each maximal run that satisfies the constraint
            run_start = None
            for current_idx, v in enumerate(ordered):
                satisfied = v.satisfies(constraint)
                if satisfied and run_start is None:
                    run_start = current_idx
                elif not satisfied and run_start is not None:
                    # The run ended at the previous version
                    closed_run = fn.version_range(constraint, run_start, current_idx - 1)
                    self.gen.fact(fn.pkg_fact(pkg_name, closed_run))
                    run_start = None

            # A run still open at this point extends to the last version
            if run_start is not None:
                open_run = fn.version_range(constraint, run_start, len(ordered) - 1)
                self.gen.fact(fn.pkg_fact(pkg_name, open_run))
        self.gen.newline()
[docs]
def collect_virtual_constraints(self):
    """Define versions for constraints on virtuals.

    Must be called before define_version_constraints().
    """

    def endpoints(constraint):
        # Yield all the real versions mentioned in a version, range or list of them
        if isinstance(constraint, vn.StandardVersion):
            yield constraint
        elif isinstance(constraint, vn.ClosedOpenRange):
            yield constraint.lo
            yield vn._prev_version(constraint.hi)
        elif isinstance(constraint, vn.VersionList):
            for item in constraint:
                yield from endpoints(item)
        else:
            raise TypeError(f"expected version type, found: {type(constraint)}")

    # Virtuals don't define versions in a package.py file, so a set of synthetic possible
    # versions is defined for them. This ensures that
    # `version_satisfies(Package, Constraint, Version)` has the same semantics for virtuals
    # as for regular packages.
    for pkg_name, constraints in self.version_constraints.items():
        if pkg_name not in self.possible_virtuals:
            # Not a virtual package
            continue

        synthetic_versions = set()
        for constraint in constraints:
            for version in endpoints(constraint):
                synthetic_versions.add(version)

        for version in synthetic_versions:
            self.possible_versions[pkg_name][version].append(Provenance.VIRTUAL_CONSTRAINT)
[docs]
def define_target_constraints(self):
    """Emit a ``target_satisfies`` fact for each target allowed by each target constraint.

    A constraint is a comma separated list of items. An item is either a single target
    name, or a range in the form ``min:max`` where both bounds are optional.
    """

    def _targets_matching(item):
        # An item without ":" is a single target, and is returned as is
        if ":" not in item:
            return [item]

        lower, _, upper = item.partition(":")
        return [
            candidate
            for candidate in spack.vendor.archspec.cpu.TARGETS.values()
            # An empty bound doesn't restrict the range
            if (not lower or lower <= candidate) and (not upper or upper >= candidate)
        ]

    matches_by_item = {}
    for target_constraint in sorted(self.target_constraints, key=lambda x: x.name):
        # Collect the targets allowed by any of the items in this constraint
        allowed_targets = []
        for item in str(target_constraint).split(","):
            matches = matches_by_item.get(item)
            if matches is None:
                matches = matches_by_item[item] = _targets_matching(item)
            allowed_targets.extend(matches)

        for target in allowed_targets:
            self.gen.fact(fn.target_satisfies(target_constraint, target))
        self.gen.newline()
[docs]
def define_variant_values(self):
    """Validate variant values from the command line.

    Add valid variant values from the command line to the possible values for
    variant definitions.
    """
    # Variant definition ids are object ids. They are translated to variant ids first,
    # so the final sort, and hence the output, is deterministic.
    values_by_variant_id = []
    for pkg_name, variant_def_id, value in sorted(self.variant_values_from_specs):
        try:
            vid = self.variant_ids_by_def_id[variant_def_id]
        except KeyError:
            tty.debug(
                f"[{__name__}] cannot retrieve id of the {value} variant from {pkg_name}"
            )
        else:
            values_by_variant_id.append((pkg_name, vid, value))

    # Tell the concretizer about possible values from specs seen in spec_clauses().
    values_by_variant_id.sort()
    for pkg_name, vid, value in values_by_variant_id:
        possible_value = fn.variant_possible_value(vid, value)
        self.gen.fact(fn.pkg_fact(pkg_name, possible_value))
[docs]
def register_concrete_spec(self, spec, possible: set):
    """Add ``spec`` to the reusable specs, if it could be a dependency in this solve.

    Args:
        spec: concrete spec to register
        possible: names of the packages that are possible dependencies

    Specs that can't be retrieved from the known repositories are logged at debug level
    and skipped.
    """
    # Only tell the solver about installed packages that could be dependencies
    if spec.name in possible:
        try:
            # Only consider installed packages for repo we know
            spack.repo.PATH.get(spec)
        except (spack.repo.UnknownNamespaceError, spack.repo.UnknownPackageError) as e:
            tty.debug(f"[REUSE] Issues when trying to reuse {spec.short_spec}: {str(e)}")
        else:
            self.reusable_and_possible.add(spec)
[docs]
def concrete_specs(self):
    """Emit facts for reusable specs"""
    for spec_hash, spec in self.reusable_and_possible.explicit_items():
        # The installed_hash fact indicates that there is a spec like this installed
        self.gen.fact(fn.installed_hash(spec.name, spec_hash))

        # hash_attr is an indirection layer between hash constraints and what they
        # impose, to allow for splicing
        body_clauses = self.spec_clauses(spec, body=True, required_from=None)
        for pred in body_clauses:
            self.gen.fact(fn.hash_attr(spec_hash, *pred.args))
        self.gen.newline()

        # Parts of the spec might not be in package.py, so declare as possible:
        # - the versions of all the nodes
        # - the OS of all the nodes
        for dep in spec.traverse():
            is_git_version = isinstance(dep.version, vn.GitVersion)
            provenance = (
                Provenance.INSTALLED_GIT_VERSION if is_git_version else Provenance.INSTALLED
            )
            self.possible_versions[dep.name][dep.version].append(provenance)
            self.possible_oses.add(dep.os)
[docs]
def impossible_dependencies_check(self, specs) -> None:
    """Check that each unconditional dependency in ``specs`` is a possible dependency.

    Raises:
        InvalidDependencyError: if the spec on an edge with an empty ``when`` is neither a
            possible package nor a possible virtual
    """
    for edge in traverse.traverse_edges(specs):
        dep_name = edge.spec.name
        if spack.repo.PATH.is_virtual(dep_name):
            possible_deps = self.possible_virtuals
        else:
            possible_deps = self.pkgs

        # Conditional edges are not checked
        if dep_name in possible_deps or str(edge.when):
            continue

        raise InvalidDependencyError(
            f"'{edge.spec.name}' is not a possible dependency of any root spec"
        )
[docs]
def setup(
    self,
    specs: Sequence[spack.spec.Spec],
    *,
    reuse: Optional[List[spack.spec.Spec]] = None,
    packages_with_externals=None,
    allow_deprecated: bool = False,
) -> "ProblemInstanceBuilder":
    """Generate an ASP program with relevant constraints for specs.

    This calls methods on the solve driver to set up the problem with
    facts and rules from all possible dependencies of the input
    specs, as well as constraints from the specs themselves.

    Arguments:
        specs: list of Specs to solve
        reuse: list of concrete specs that can be reused
        packages_with_externals: precomputed packages config with implicit externals
        allow_deprecated: if True adds deprecated versions into the solve

    Return:
        A ProblemInstanceBuilder populated with facts and rules for an ASP solve.
    """
    # TODO: remove this local import and get rid of dependency on globals
    import spack.environment as ev

    reuse = reuse or []
    if packages_with_externals is None:
        packages_with_externals = external_config_with_implicit_externals(spack.config.CONFIG)
    check_packages_exist(specs)
    # Every call starts from a fresh builder
    self.gen = ProblemInstanceBuilder()

    # Get compilers from buildcaches only if injected through "reuse" specs
    supported_compilers = spack.compilers.config.supported_compilers()
    compilers_from_reuse = {
        x for x in reuse if x.name in supported_compilers and not x.external
    }
    candidate_compilers, self.rejected_compilers = possible_compilers(
        configuration=spack.config.CONFIG
    )
    # Link/run dependencies of non-external compilers are made reusable too
    reuse_from_compilers = traverse.traverse_nodes(
        [x for x in candidate_compilers if not x.external], deptype=("link", "run")
    )
    reused_set = set(reuse)
    # NOTE(review): when the caller passes a non-empty list, `reuse` is that same object
    # and `+=` extends it in place - confirm callers expect their list to be modified.
    reuse += [x for x in reuse_from_compilers if x not in reused_set]

    candidate_compilers.update(compilers_from_reuse)
    self.possible_compilers = list(candidate_compilers)

    # TODO: warning is because mypy doesn't know Spec supports rich comparison via decorator
    self.possible_compilers.sort()  # type: ignore[call-arg,call-overload]

    self.compiler_mixing()

    self.gen.h1("Runtimes")
    injected_dependencies = self.define_runtime_constraints()

    # The counter is built from both the input specs and the dependencies injected
    # by the runtime constraints
    node_counter = create_counter(
        list(specs) + injected_dependencies,
        tests=self.tests,
        possible_graph=self.possible_graph,
    )
    self.possible_virtuals = node_counter.possible_virtuals()
    self.pkgs = node_counter.possible_dependencies()
    self.libcs = sorted(all_libcs())  # type: ignore[type-var]

    # Remember the namespaces that were explicitly requested in the input specs
    for node in traverse.traverse_nodes(specs):
        if node.namespace is not None:
            self.explicitly_required_namespaces[node.name] = node.namespace

    self.requirement_parser.parse_rules_from_input_specs(specs)

    self.gen.h1("Generic information")
    if using_libc_compatibility():
        for libc in self.libcs:
            self.gen.fact(fn.host_libc(libc.name, libc.version))

    if not allow_deprecated:
        self.gen.fact(fn.deprecated_versions_not_allowed())

    self.gen.newline()
    for pkg_name in spack.compilers.config.supported_compilers():
        self.gen.fact(fn.compiler_package(pkg_name))

    # Calculate develop specs
    # they will be used in addition to command line specs
    # in determining known versions/targets/os
    dev_specs: Tuple[spack.spec.Spec, ...] = ()
    env = ev.active_environment()
    if env:
        dev_specs = tuple(
            spack.spec.Spec(info["spec"]).constrained(
                'dev_path="%s"'
                % spack.util.path.canonicalize_path(info["path"], default_wd=env.path)
            )
            for name, info in env.dev_specs.items()
        )

    specs = tuple(specs)  # ensure compatible types to add

    self.gen.h1("Reusable concrete specs")
    self.define_concrete_input_specs(specs, self.pkgs)
    if reuse:
        self.gen.fact(fn.optimize_for_reuse())
        for reusable_spec in reuse:
            self.register_concrete_spec(reusable_spec, self.pkgs)
    self.concrete_specs()

    self.gen.h1("Generic statements on possible packages")
    node_counter.possible_packages_facts(self.gen, fn)

    self.gen.h1("Possible flags on nodes")
    for flag in spack.spec.FlagMap.valid_compiler_flags():
        self.gen.fact(fn.flag_type(flag))
    self.gen.newline()

    self.gen.h1("General Constraints")
    self.config_compatible_os()

    # architecture defaults
    self.platform_defaults()
    self.os_defaults(specs + dev_specs)
    self.target_defaults(specs + dev_specs)

    self.virtual_requirements_and_weights()
    self.external_packages(packages_with_externals)

    # TODO: make a config option for this undocumented feature
    checksummed = "SPACK_CONCRETIZER_REQUIRE_CHECKSUM" in os.environ
    self.define_package_versions_and_validate_preferences(
        self.pkgs, allow_deprecated=allow_deprecated, require_checksum=checksummed
    )
    self.define_ad_hoc_versions_from_specs(
        specs, Provenance.SPEC, allow_deprecated=allow_deprecated, require_checksum=checksummed
    )
    self.define_ad_hoc_versions_from_specs(
        dev_specs,
        Provenance.DEV_SPEC,
        allow_deprecated=allow_deprecated,
        require_checksum=checksummed,
    )
    self.validate_and_define_versions_from_requirements(
        allow_deprecated=allow_deprecated, require_checksum=checksummed
    )

    self.gen.h1("Package Constraints")
    for pkg in sorted(self.pkgs):
        self.gen.h2(f"Package rules: {pkg}")
        self.pkg_rules(pkg, tests=self.tests)
        self.preferred_variants(pkg)

    self.gen.h1("Condition Triggers and Imposed Effects")
    self.trigger_rules()
    self.effect_rules()

    self.gen.h1("Special variants")
    self.define_auto_variant("dev_path", multi=False)
    self.define_auto_variant("commit", multi=False)
    self.define_auto_variant("patches", multi=True)

    self.gen.h1("Develop specs")
    # Inject dev_path from environment
    for ds in dev_specs:
        self.condition(spack.spec.Spec(ds.name), ds, msg=f"{ds.name} is a develop spec")
        self.trigger_rules()
        self.effect_rules()

    self.gen.h1("Spec Constraints")
    self.literal_specs(specs)

    self.gen.h1("Variant Values defined in specs")
    self.define_variant_values()

    self.gen.h1("Version Constraints")
    # Virtual constraints must be collected before version constraints are defined
    self.collect_virtual_constraints()
    self.define_version_constraints()

    self.gen.h1("Target Constraints")
    self.define_target_constraints()

    # once we've done a full traversal and know possible versions, check that the
    # requested solve is at least consistent.
    # do not check dependency and version availability for already concrete specs
    # as they come from reusable specs
    abstract_specs = [s for s in specs if not s.concrete]
    self.impossible_dependencies_check(abstract_specs)
    self.input_spec_version_check(abstract_specs, allow_deprecated)

    return self.gen
[docs]
def compiler_mixing(self):
    """Emit the facts that encode the ``concretizer:compiler_mixing`` setting.

    Nothing is emitted when the setting is exactly ``True`` (the default). For any other
    value a ``no_compiler_mixing`` fact is generated for each of the ``c``, ``cxx`` and
    ``fortran`` languages; if the value is a list, an ``allow_mixing`` fact is also
    generated for each of its entries.
    """
    mixing_policy = spack.config.get("concretizer:compiler_mixing", True)
    if mixing_policy is True:
        return

    # Any value other than a literal ``True`` restricts mixing for every language
    for language in ("c", "cxx", "fortran"):
        self.gen.fact(fn.no_compiler_mixing(language))

    # Only a list carries an allow-list of package names
    if not isinstance(mixing_policy, list):
        return
    for allowed_pkg in mixing_policy:
        self.gen.fact(fn.allow_mixing(allowed_pkg))
[docs]
def define_runtime_constraints(self) -> List[spack.spec.Spec]:
"""Define the constraints to be imposed on the runtimes, and return the sorted list of
injected packages.

Constraints are registered on a ``RuntimePropertyRecorder`` while looping over
``self.possible_compilers``: the ``runtime_constraints`` of the compiler's package class
(when the class defines them), default compiler flags, a build dependency on
``compiler-wrapper`` for each language and, if libc compatibility is in use and a libc
could be determined, link dependencies on libc. The recorded facts are consumed before
returning.
"""
recorder = RuntimePropertyRecorder(self)
for compiler in self.possible_compilers:
try:
compiler_cls = spack.repo.PATH.get_pkg_class(compiler.name)
except spack.repo.UnknownPackageError:
# No package class is known for this compiler name: nothing to ask for constraints
pass
else:
# ``runtime_constraints`` is optional on the package class
if hasattr(compiler_cls, "runtime_constraints"):
compiler_cls.runtime_constraints(spec=compiler, pkg=recorder)
# Inject default flags for compilers
recorder("*").default_flags(compiler)
# Add a dependency on the compiler wrapper
# (compiler_str pins this exact compiler by DAG hash; it is used for the libc rule below)
compiler_str = f"{compiler.name} /{compiler.dag_hash()}"
for language in ("c", "cxx", "fortran"):
# Using compiler.name causes a bit of duplication, but that is taken care of by
# clingo during grounding.
recorder("*").depends_on(
"compiler-wrapper",
when=f"%[deptypes=build virtuals={language}] {compiler.name}",
type="build",
description=f"Add compiler wrapper when using {compiler.name} for {language}",
)
# The remaining rules only apply when libc compatibility is in use
if not using_libc_compatibility():
continue
current_libc = None
if compiler.external or compiler.installed:
# External or installed compilers are probed for their default libc
current_libc = CompilerPropertyDetector(compiler).default_libc()
else:
# Otherwise look the libc up among the compiler spec's own dependencies
try:
current_libc = compiler["libc"]
except (KeyError, RuntimeError) as e:
tty.debug(f"{compiler} cannot determine libc because: {e}")
if current_libc:
# Generic rule, keyed on the compiler name: a libc is needed at link time
recorder("*").depends_on(
"libc",
when=f"%[deptypes=build] {compiler.name}",
type="link",
description=f"Add libc when using {compiler.name}",
)
# Specific rule, keyed on the compiler hash: which libc (name and exact version) it is
recorder("*").depends_on(
f"{current_libc.name}@={current_libc.version}",
when=f"%[deptypes=build] {compiler_str}",
type="link",
description=f"Libc is {current_libc} when using {compiler}",
)
recorder.consume_facts()
return sorted(recorder.injected_dependencies)
[docs]
def literal_specs(self, specs):
"""Emit the facts and conditions that encode the input (literal) specs.

For each spec, in sorted order, a condition is created whose trigger is a ``literal``
fact and whose effect imposes the clauses generated from the spec. Effects are cached
in ``self._effect_cache[spec.name]`` under the key ``(str(spec), None)``. Trigger and
effect rules are emitted at the end.
"""
for spec in sorted(specs):
self.gen.h2(f"Spec: {str(spec)}")
condition_id = next(self._id_counter)
trigger_id = next(self._id_counter)
# Special condition triggered by "literal_solved"
self.gen.fact(fn.literal(trigger_id))
self.gen.fact(fn.pkg_fact(spec.name, fn.condition_trigger(condition_id, trigger_id)))
self.gen.fact(fn.condition_reason(condition_id, f"{spec} requested explicitly"))
# Reuse the effect id and clauses if the same spec string was already imposed
imposed_spec_key = str(spec), None
cache = self._effect_cache[spec.name]
if imposed_spec_key in cache:
effect_id, requirements = cache[imposed_spec_key]
else:
effect_id = next(self._id_counter)
context = SourceContext()
context.source = "literal"
requirements = self.spec_clauses(spec, context=context)
root_name = spec.name
# NOTE: ``requirements`` is appended to while it is being iterated over. The clauses
# appended here are named "variant_default_value_from_cli", which matches neither branch.
for clause in requirements:
clause_name = clause.args[0]
if clause_name == "variant_set":
requirements.append(
fn.attr("variant_default_value_from_cli", *clause.args[1:])
)
elif clause_name in ("node", "virtual_node", "hash"):
# These facts are needed to compute the "condition_set" of the root
pkg_name = clause.args[1]
self.gen.fact(fn.mentioned_in_literal(trigger_id, root_name, pkg_name))
# Mark the literal as a root (or a virtual root, if its name is a virtual package)
requirements.append(
fn.attr(
"virtual_root" if spack.repo.PATH.is_virtual(spec.name) else "root", spec.name
)
)
# "depends_on" clauses are dropped from the imposed effect
requirements = [x for x in requirements if x.args[0] != "depends_on"]
cache[imposed_spec_key] = (effect_id, requirements)
self.gen.fact(fn.pkg_fact(spec.name, fn.condition_effect(condition_id, effect_id)))
# Create subcondition with any conditional dependencies
# self.spec_clauses does not do anything with conditional
# dependencies
self.generate_conditional_dep_conditions(spec, condition_id)
# ``solve_literal`` is only emitted when every input must be solved at once
if self.concretize_everything:
self.gen.fact(fn.solve_literal(trigger_id))
# Trigger rules are needed to allow conditional specs
self.trigger_rules()
self.effect_rules()
[docs]
def generate_conditional_dep_conditions(self, spec: spack.spec.Spec, condition_id: int):
"""Generate a subcondition in the trigger for any conditional dependencies.

Dependencies are always modeled by a condition. For conditional dependencies,
the when-spec is added as a subcondition of the trigger to ensure the dependency
is only activated when the subcondition holds.

Args:
    spec: spec whose edges are traversed looking for conditional dependencies
    condition_id: id of the condition each generated subcondition is attached to
"""
for dspec in spec.traverse_edges():
# Ignore unconditional deps
if dspec.when == EMPTY_SPEC:
continue
# Cannot use "virtual_node" attr as key for condition
# because reused specs do not track virtual nodes.
# Instead, track whether the parent uses the virtual
def virtual_handler(
name: str, input_spec: spack.spec.Spec, requirements: List[AspFunction]
) -> List[AspFunction]:
# Drop "virtual_node" facts, then add one "uses_virtual" fact for each edge that
# points to a virtual package
ret = remove_facts("virtual_node")(name, input_spec, requirements)
for edge in input_spec.traverse_edges(root=False, cover="edges"):
if spack.repo.PATH.is_virtual(edge.spec.name):
# Edges leaving the root of the input spec are attributed to ``name``
parent_name = name if edge.parent is input_spec else edge.parent.name
ret.append(fn.attr("uses_virtual", parent_name, edge.spec.name))
return ret
context = ConditionContext()
context.source = ConstraintOrigin.append_type_suffix(
dspec.parent.name, ConstraintOrigin.CONDITIONAL_SPEC
)
# Default is to remove node-like attrs, override here
context.transform_required = virtual_handler
context.transform_imposed = identity_for_facts
try:
# The when-spec is what is required; the edge formatted without its condition
# is what gets imposed
subcondition_id = self.condition(
dspec.when,
spack.spec.Spec(dspec.format(unconditional=True)),
required_name=dspec.parent.name,
context=context,
msg=f"Conditional dependency in ^[when={dspec.when}]{dspec.spec}",
)
self.gen.fact(fn.subcondition(subcondition_id, condition_id))
except vt.UnknownVariantError as e:
# A variant in the 'when=' condition can't apply to the parent of the edge
tty.debug(f"[{__name__}] cannot emit subcondition for {dspec.format()}: {e}")
[docs]
def validate_and_define_versions_from_requirements(
    self, *, allow_deprecated: bool, require_checksum: bool
):
    """Register versions that only appear in package requirements, and validate the rest.

    Every node of every spec found under a ``require`` entry of the ``packages``
    configuration (the ``all`` section excluded) is inspected. A concrete version that is
    not known yet is recorded as a possible version with ``PACKAGE_REQUIREMENT``
    provenance, unless it is deprecated while deprecated versions are not allowed, or a
    checksum is required and the version does not pass the checksum test. A version
    constraint that is not concrete must be satisfied by at least one known version.

    This function assumes all possible versions are already registered in
    ``self.possible_versions``.

    Args:
        allow_deprecated: if False, deprecated versions are never added
        require_checksum: if True, only versions accepted by
            ``_is_checksummed_git_version`` are added

    Raises:
        spack.error.ConfigError: if an abstract version requirement cannot match any
            known version
    """
    packages_config = spack.config.CONFIG.get_config("packages")
    for pkg_name, pkg_config in packages_config.items():
        if pkg_name == "all" or "require" not in pkg_config:
            continue

        required_specs = self._specs_from_requires(pkg_name, pkg_config["require"])
        for required in traverse.traverse_nodes(required_specs):
            name = required.name
            versions = required.versions
            if name not in self.pkgs or versions == vn.any_version:
                continue

            required.attach_git_version_lookup()
            concrete_version = versions.concrete

            if not concrete_version:
                # If the version is not concrete, check it's statically concretizable. If
                # not throw an error, which is just so that users know they need to change
                # their config, instead of getting a hard to decipher concretization error.
                matching = (x for x in self.possible_versions[name] if x.satisfies(versions))
                if not any(matching):
                    raise spack.error.ConfigError(
                        f"Version requirement {versions} on {pkg_name} for {name} "
                        f"cannot match any known version from package.py or externals"
                    )
                continue

            if concrete_version in self.possible_versions[name]:
                continue

            if not allow_deprecated and concrete_version in self.deprecated_versions[name]:
                continue

            # If concrete and not yet defined, conditionally define it, like we do for specs
            # from the command line.
            if require_checksum and not _is_checksummed_git_version(concrete_version):
                continue
            self.possible_versions[name][concrete_version].append(
                Provenance.PACKAGE_REQUIREMENT
            )
def _specs_from_requires(self, pkg_name, section):
    """Yield the specs mentioned in the ``require`` section of a package.

    The section is either a single spec string or a sequence of groups. A group is
    either a spec string or an object holding a single ``spec`` constraint, or a list of
    constraints under ``one_of`` or ``any_of``. Every constraint is turned into a spec
    by ``_spec_with_default_name``, together with ``pkg_name``.
    """
    if isinstance(section, str):
        yield _spec_with_default_name(section, pkg_name)
        return

    for group in section:
        if isinstance(group, str):
            constraints = [group]
        elif "spec" in group:
            constraints = [group["spec"]]
        elif "one_of" in group:
            constraints = group["one_of"]
        else:
            constraints = group["any_of"]

        for constraint in constraints:
            yield _spec_with_default_name(constraint, pkg_name)
[docs]
def pkg_class(self, pkg_name: str) -> Type[spack.package_base.PackageBase]:
    """Return the package class for ``pkg_name``.

    If a namespace was explicitly required for the package, the class is looked up by
    its ``<namespace>.<name>`` qualified name instead of the bare name.
    """
    required_namespaces = self.explicitly_required_namespaces
    if pkg_name not in required_namespaces:
        return spack.repo.PATH.get_pkg_class(pkg_name)

    qualified_name = f"{required_namespaces[pkg_name]}.{pkg_name}"
    return spack.repo.PATH.get_pkg_class(qualified_name)
class _Head:
"""ASP functions used to express spec clauses in the HEAD of a rule"""
# Each attribute is an ``attr`` function with only its name filled in.
# Unlike in ``_Body``, most names here carry a ``_set`` suffix; only
# ``node``, ``virtual_node`` and ``propagate`` use the same name in both classes.
node = fn.attr("node")
namespace = fn.attr("namespace_set")
virtual_node = fn.attr("virtual_node")
node_platform = fn.attr("node_platform_set")
node_os = fn.attr("node_os_set")
node_target = fn.attr("node_target_set")
variant_value = fn.attr("variant_set")
node_flag = fn.attr("node_flag_set")
propagate = fn.attr("propagate")
class _Body:
"""ASP functions used to express spec clauses in the BODY of a rule"""
# Same attribute names as ``_Head``, so that the two classes are interchangeable;
# the ASP names used here do not carry the ``_set`` suffix used by ``_Head``.
node = fn.attr("node")
namespace = fn.attr("namespace")
virtual_node = fn.attr("virtual_node")
node_platform = fn.attr("node_platform")
node_os = fn.attr("node_os")
node_target = fn.attr("node_target")
variant_value = fn.attr("variant_value")
node_flag = fn.attr("node_flag")
propagate = fn.attr("propagate")
[docs]
def strip_asp_problem(asp_problem: Iterable[str]) -> List[str]:
    """Return the statements of an ASP program without comment lines and empty statements.

    Each statement is split on newlines. Lines starting with ``%`` are dropped, the other
    lines are stripped of surrounding whitespace and concatenated with no separator.
    Statements that end up empty are left out of the result.
    """
    stripped: List[str] = []
    for statement in asp_problem:
        pieces = []
        for line in statement.split("\n"):
            # The comment marker is looked for before whitespace is stripped
            if line.startswith("%"):
                continue
            pieces.append(line.strip())

        compact = "".join(pieces)
        if compact:
            stripped.append(compact)
    return stripped
[docs]
class ProblemInstanceBuilder:
    """Provides an interface to construct a problem instance.

    Facts, rules and comments are accumulated in ``asp_problem``, one string per entry.
    Once all the facts and rules have been added, the problem instance can be retrieved
    with:

    >>> builder = ProblemInstanceBuilder()
    >>> ...
    >>> problem_instance = builder.asp_problem
    """

    def __init__(self) -> None:
        #: Statements and comment lines of the program, in insertion order
        self.asp_problem: List[str] = []

    def fact(self, atom: AspFunction) -> None:
        """Add ``atom`` to the program as a fact, i.e. followed by a period."""
        self.asp_problem.append(f"{atom}.")

    def append(self, rule: str) -> None:
        """Add ``rule`` to the program verbatim."""
        self.asp_problem.append(rule)

    def title(self, header: str, char: str) -> None:
        """Add an empty line, then ``header`` as a comment framed by two separator lines.

        Args:
            header: text of the title
            char: character repeated 76 times to draw each separator line
        """
        sep = char * 76
        self.newline()
        self.asp_problem.append(f"%{sep}")
        self.asp_problem.append(f"% {header}")
        self.asp_problem.append(f"%{sep}")

    def h1(self, header: str) -> None:
        """Add a first level title, framed with ``=``."""
        self.title(header, "=")

    def h2(self, header: str) -> None:
        """Add a second level title, framed with ``-``."""
        self.title(header, "-")

    def h3(self, header: str) -> None:
        """Add a third level title, i.e. a single comment line with no frame."""
        self.asp_problem.append(f"% {header}")

    def newline(self) -> None:
        """Add an empty line to the program."""
        self.asp_problem.append("")
[docs]
def possible_compilers(*, configuration) -> Tuple[Set["spack.spec.Spec"], Set["spack.spec.Spec"]]:
    """Return the compilers that can be used in a solve, and the ones that were discarded.

    Compilers defined in ``configuration`` are screened when libc compatibility is in
    use: a compiler is rejected if its C compiler does not run, or if its libc cannot be
    detected. Compilers returned by querying the local store, for each supported compiler
    package, are then added to the accepted ones with no screening.

    Args:
        configuration: configuration to read compiler definitions from

    Returns:
        A tuple ``(accepted, rejected)`` of sets of compiler specs.
    """
    accepted: Set["spack.spec.Spec"] = set()
    discarded: Set["spack.spec.Spec"] = set()

    # Compilers defined in configuration
    for candidate in spack.compilers.config.all_compilers_from(configuration):
        if using_libc_compatibility() and not c_compiler_runs(candidate):
            discarded.add(candidate)
            try:
                c_compiler = candidate.extra_attributes["compilers"]["c"]
                tty.debug(
                    f"the C compiler {c_compiler} does not exist, or does not run correctly."
                    f" The compiler {candidate} will not be used during concretization."
                )
            except KeyError:
                tty.debug(f"the spec {candidate} does not provide a C compiler.")
            continue

        if using_libc_compatibility() and not CompilerPropertyDetector(candidate).default_libc():
            discarded.add(candidate)
            warnings.warn(
                f"cannot detect libc from {candidate}. The compiler will not be used "
                f"during concretization."
            )
            continue

        if candidate in accepted:
            tty.debug(f"[{__name__}] duplicate {candidate.long_spec} compiler found")
        else:
            accepted.add(candidate)

    # Compilers from the local store
    for pkg_name in spack.compilers.config.supported_compilers():
        accepted.update(spack.store.STORE.db.query(pkg_name))

    return accepted, discarded
#: A function as consumed by ``SpecBuilder.build_specs``: its name, followed by the
#: tuple of its arguments (each one either a string or a ``NodeId``)
FunctionTupleT = Tuple[str, Tuple[Union[str, NodeId], ...]]
[docs]
class SpecBuilder:
"""Class with actions to rebuild a spec from ASP results.

Each attribute name found in the results is dispatched by ``build_specs`` to the method
with the same name, unless the name matches ``ignored_attributes``.
"""
#: Regex for attributes that don't need actions b/c they aren't used to construct specs.
# The alternatives are either suffix patterns (e.g. anything ending in "_set") or exact
# attribute names.
ignored_attributes = re.compile(
"|".join(
[
r"^.*_propagate$",
r"^.*_satisfies$",
r"^.*_set$",
r"^compatible_libc$",
r"^dependency_holds$",
r"^package_hash$",
r"^root$",
r"^uses_virtual$",
r"^variant_default_value_from_cli$",
r"^virtual_node$",
r"^virtual_on_incoming_edges$",
r"^virtual_root$",
]
)
)
[docs]
@staticmethod
def make_node(*, pkg: str) -> NodeId:
    """Given a package name, return the ``NodeId`` with id ``"0"`` for that package,
    i.e. the "min_dupe_id" node in the ASP encoding.

    Args:
        pkg: name of a package
    """
    return NodeId(pkg=pkg, id="0")
def __init__(self, specs, hash_lookup=None):
    """Initialize an empty builder.

    Args:
        specs: specs the solve was requested for, stored as the command line specs
        hash_lookup: lookup of reusable concrete specs by hash; a new, empty
            ``ConcreteSpecsByHash`` is used when this is not given (or falsy)
    """
    self._command_line_specs = specs
    self._result = None

    # Specs under construction, keyed by the node they were built for
    self._specs: Dict[NodeId, spack.spec.Spec] = {}

    # Matches parent nodes to splice node
    self._splices: Dict[spack.spec.Spec, List[spack.solver.splicing.Splice]] = {}

    self._flag_sources: Dict[Tuple[NodeId, str], Set[str]] = collections.defaultdict(set)

    # Pass in as arguments reusable specs and plug them in
    # from this dictionary during reconstruction
    self._hash_lookup = hash_lookup or ConcreteSpecsByHash()
[docs]
def hash(self, node, h):
    """Associate ``node`` with the spec stored under hash ``h`` in the hash lookup.

    Nodes that already have a spec are left untouched.
    """
    if node in self._specs:
        return
    self._specs[node] = self._hash_lookup[h]
[docs]
def node(self, node):
    """Create a spec named after the package of ``node``, unless the node already has one.

    A newly created spec gets an empty list for each valid compiler flag type.
    """
    if node in self._specs:
        return

    new_spec = spack.spec.Spec(node.pkg)
    self._specs[node] = new_spec
    for flag_type in spack.spec.FlagMap.valid_compiler_flags():
        new_spec.compiler_flags[flag_type] = []
def _arch(self, node):
    """Return the architecture of the spec at ``node``, attaching a new ``ArchSpec``
    to the spec first if its architecture is unset (or falsy).
    """
    spec = self._specs[node]
    existing = spec.architecture
    if existing:
        return existing

    created = spack.spec.ArchSpec()
    spec.architecture = created
    return created
[docs]
def namespace(self, node, namespace):
    """Set the namespace of the spec at ``node``."""
    spec = self._specs[node]
    spec.namespace = namespace
[docs]
def node_os(self, node, os):
    """Set the operating system in the architecture of the spec at ``node``."""
    arch = self._arch(node)
    arch.os = os
[docs]
def node_target(self, node, target):
    """Set the target in the architecture of the spec at ``node``."""
    arch = self._arch(node)
    arch.target = target
[docs]
def variant_selected(self, node, name: str, value: str, variant_type: str, variant_id):
    """Record that variant ``name`` of the spec at ``node`` takes ``value``.

    The first value seen for a variant creates it. Any further value is appended to the
    existing variant, which is only admissible when ``variant_type`` is ``"multi"``.
    """
    variants = self._specs[node].variants
    existing = variants.get(name)
    if existing:
        assert variant_type == "multi", (
            f"Can't have multiple values for single-valued variant: "
            f"{node}, {name}, {value}, {variant_type}, {variant_id}"
        )
        existing.append(value)
        return

    variants[name] = vt.VariantValue.from_concretizer(name, value, variant_type)
[docs]
def version(self, node, version):
    """Set the versions of the spec at ``node`` to a list holding only ``version``."""
    selected = vn.Version(version)
    self._specs[node].versions = vn.VersionList([selected])
[docs]
def node_flag(self, node, node_flag):
    """Add a compiler flag to the spec at ``node``.

    The flag type, flag, flag group and source are read from ``node_flag``.
    """
    compiler_flags = self._specs[node].compiler_flags
    # The third positional argument is always False here
    # (presumably the "propagate" setting; confirm against ``FlagMap.add_flag``)
    compiler_flags.add_flag(
        node_flag.flag_type, node_flag.flag, False, node_flag.flag_group, node_flag.source
    )
[docs]
def depends_on(self, parent_node, dependency_node, type):
    """Add a dependency edge, with no virtuals, between the specs of two nodes.

    Args:
        parent_node: node of the dependent spec
        dependency_node: node of the dependency spec
        type: dependency type, as a string understood by ``dt.flag_from_string``
    """
    child = self._specs[dependency_node]
    edge_flag = dt.flag_from_string(type)
    parent = self._specs[parent_node]
    parent.add_dependency_edge(child, depflag=edge_flag, virtuals=())
[docs]
def virtual_on_edge(self, parent_node, provider_node, virtual):
    """Record ``virtual`` on the edge going from the parent spec to the provider spec.

    Exactly one edge from the parent must point to the very spec object registered for
    ``provider_node``.
    """
    parent_spec = self._specs[parent_node]
    candidate_edges = parent_spec.edges_to_dependencies(name=provider_node.pkg)
    provider_spec = self._specs[provider_node]

    # Match on object identity, not on equality of specs
    matching = [edge for edge in candidate_edges if edge.spec is provider_spec]
    assert len(matching) == 1, f"{virtual}: {provider_node.pkg}"
    matching[0].update_virtuals(virtual)
[docs]
def reorder_flags(self):
"""For each spec, determine the order of compiler flags applied to it.

The solver determines which flags are on nodes; this routine
imposes order afterwards. The order is:

1. Flags applied in compiler definitions should come first
2. Flags applied by dependents are ordered topologically (with a
   dependency on ``traverse`` to resolve the partial order into a
   stable total order)
3. Flags from requirements are then applied (requirements always
   come from the package and never a parent)
4. Command-line flags should come last

Additionally, for each source (requirements, compiler, command line, and
dependents), flags from that source should retain their order and grouping:
e.g. for ``y cflags="-z -a"`` ``-z`` and ``-a`` should never have any intervening
flags inserted, and should always appear in that order.
"""
for node, spec in self._specs.items():
# if bootstrapping, compiler is not in config and has no flags
flagmap_from_compiler = {
flag_type: [x for x in values if x.source == "compiler"]
for flag_type, values in spec.compiler_flags.items()
}
# Flags whose source is "literal" are the ones treated as command line flags
flagmap_from_cli = {}
for flag_type, values in spec.compiler_flags.items():
if not values:
continue
flags = [x for x in values if x.source == "literal"]
if not flags:
continue
# For compiler flags from literal specs, reorder any flags to
# the input order from flag.flag_group
flagmap_from_cli[flag_type] = _reorder_flags(flags)
for flag_type in spec.compiler_flags.valid_compiler_flags():
ordered_flags = []
# 1. Put compiler flags first
from_compiler = tuple(flagmap_from_compiler.get(flag_type, []))
extend_flag_list(ordered_flags, from_compiler)
# 2. Add all sources (the compiler is one of them, so skip any
# flag group that matches it exactly)
# Each distinct group is represented by a CompilerFlag whose flag text is the
# whole group, so that the set collapses flags belonging to the same group
flag_groups = set()
for flag in self._specs[node].compiler_flags.get(flag_type, []):
flag_groups.add(
spack.spec.CompilerFlag(
flag.flag_group,
propagate=flag.propagate,
flag_group=flag.flag_group,
source=flag.source,
)
)
# For flags that are applied by dependents, put flags from parents
# before children; we depend on the stability of traverse() to
# achieve a stable flag order for flags introduced in this manner.
topo_order = list(s.name for s in spec.traverse(order="post", direction="parents"))
lex_order = list(sorted(flag_groups))
def _order_index(flag_group):
# Sort key for a flag group: (type of the source, position of the source package
# among the parents, alphabetical tie-breaker)
source = flag_group.source
# Note: if 'require: ^dependency cflags=...' is ever possible,
# this will topologically sort for require as well
type_index, pkg_source = ConstraintOrigin.strip_type_suffix(source)
if pkg_source in topo_order:
major_index = topo_order.index(pkg_source)
# If for x->y, x has multiple depends_on declarations that
# are activated, and each adds cflags to y, we fall back on
# alphabetical ordering to maintain a total order
minor_index = lex_order.index(flag_group)
else:
# Sources that are not among the parents sort after all of them
major_index = len(topo_order) + lex_order.index(flag_group)
minor_index = 0
return (type_index, major_index, minor_index)
prioritized_groups = sorted(flag_groups, key=lambda x: _order_index(x))
for grp in prioritized_groups:
# Split the group back into its individual flags
grp_flags = tuple(
x for (x, y) in spack.compilers.flags.tokenize_flags(grp.flag_group)
)
if grp_flags == from_compiler:
continue
as_compiler_flags = list(
spack.spec.CompilerFlag(
x,
propagate=grp.propagate,
flag_group=grp.flag_group,
source=grp.source,
)
for x in grp_flags
)
extend_flag_list(ordered_flags, as_compiler_flags)
# 3. Now put cmd-line flags last (item 4 in the docstring: the step above covers
# both dependents and requirements)
if flag_type in flagmap_from_cli:
extend_flag_list(ordered_flags, flagmap_from_cli[flag_type])
# Reordering must neither add nor drop flags
compiler_flags = spec.compiler_flags.get(flag_type, [])
msg = f"{set(compiler_flags)} does not equal {set(ordered_flags)}"
assert set(compiler_flags) == set(ordered_flags), msg
spec.compiler_flags.update({flag_type: ordered_flags})
[docs]
def deprecated(self, node: NodeId, version: str) -> None:
    """Warn that a deprecated version of the package of ``node`` is being used."""
    message = f'using "{node.pkg}@{version}" which is a deprecated version'
    tty.warn(message)
[docs]
def splice_at_hash(
    self, parent_node: NodeId, splice_node: NodeId, child_name: str, child_hash: str
):
    """Collect a splice to be resolved later, filed under the spec of ``parent_node``.

    Args:
        parent_node: node whose spec the splice is recorded for
        splice_node: node whose spec is handed to the ``Splice``
        child_name: name forwarded to the ``Splice`` as ``child_name``
        child_hash: hash forwarded to the ``Splice`` as ``child_hash``
    """
    parent_spec = self._specs[parent_node]
    splice = spack.solver.splicing.Splice(
        self._specs[splice_node], child_name=child_name, child_hash=child_hash
    )
    pending = self._splices.setdefault(parent_spec, [])
    pending.append(splice)
[docs]
def build_specs(self, function_tuples: List[FunctionTupleT]) -> List[spack.spec.Spec]:
"""Rebuild specs from the functions of a model, and return them.

Every function is dispatched to the method of this class with the same name. Once all
the actions have run, the specs are post-processed (flag order, patches, external paths,
develop paths, commits), marked concrete, unified by hash and spliced.

NOTE(review): the value returned is whatever ``execute_explicit_splices`` returns, which
builds a dictionary keyed by ``NodeId``; the ``List`` return annotation looks stale and
should be confirmed against callers.

Args:
    function_tuples: (name, arguments) pairs to process; the list is sorted in place
"""
# TODO: remove this local import and get rid of dependency on globals
import spack.environment as ev
attr_key = {
# hash attributes are handled first, since they imply entire concrete specs
"hash": -5,
# node attributes are handled next, since they instantiate nodes
"node": -4,
# evaluated last, so all nodes are fully constructed
"virtual_on_edge": 2,
}
# Sort functions so that directives building objects are called in the right order
# (names that are not in attr_key get priority 0; this sorts the caller's list in place)
function_tuples.sort(key=lambda x: attr_key.get(x[0], 0))
self._specs = {}
for name, args in function_tuples:
if SpecBuilder.ignored_attributes.match(name):
continue
action = getattr(self, name, None)
# print out unknown actions so we can display them for debugging
if not action:
msg = f'UNKNOWN SYMBOL: attr("{name}", {", ".join(str(a) for a in args)})'
tty.debug(msg)
continue
msg = (
"Internal Error: Uncallable action found in asp.py. Please report to the spack"
" maintainers."
)
assert action and callable(action), msg
# ignore predicates on virtual packages, as they're used for
# solving but don't construct anything. Do not ignore error
# predicates on virtual packages.
if name != "error":
node = args[0]
assert isinstance(node, NodeId), (
f"internal solver error: expected a node, but got a {type(args[0])}. "
"Please report a bug at https://github.com/spack/spack/issues"
)
pkg = node.pkg
if spack.repo.PATH.is_virtual(pkg):
continue
# if we've already gotten a concrete spec for this pkg, we're done, unless
# we're splicing. `splice_at_hash()` is the only action we call on concrete specs.
spec = self._specs.get(node)
if spec and spec.concrete and name != "splice_at_hash":
continue
action(*args)
# fix flags after all specs are constructed
self.reorder_flags()
# inject patches -- note that we can't use set() to unique the
# roots here, because the specs aren't complete, and the hash
# function will loop forever.
roots = [spec.root for spec in self._specs.values()]
# Deduplicate the roots by object identity instead
roots = dict((id(r), r) for r in roots)
for root in roots.values():
spack.spec._inject_patches_variant(root)
# Add external paths to specs with just external modules
for s in self._specs.values():
_ensure_external_path_if_external(s)
# Attach the dev_path of develop specs defined in the active environment, if any
for s in self._specs.values():
_develop_specs_from_env(s, ev.active_environment())
# check for commits must happen after all version adaptations are complete
for s in self._specs.values():
_specs_with_commits(s)
# mark concrete and assign hashes to all specs in the solve
for root in roots.values():
root._finalize_concretization()
# Unify hashes (this is to avoid duplicates of runtimes and compilers)
unifier = ConcreteSpecsByHash()
keys = list(self._specs)
for key in keys:
current_spec = self._specs[key]
unifier.add(current_spec)
# Nodes with the same DAG hash end up sharing the spec object held by the unifier
self._specs[key] = unifier[current_spec.dag_hash()]
# Only attempt to resolve automatic splices if the solver produced any
if self._splices:
resolved_splices = spack.solver.splicing._resolve_collected_splices(
list(self._specs.values()), self._splices
)
new_specs = {}
for node, spec in self._specs.items():
# Specs with no resolved splice are kept as they are
new_specs[node] = resolved_splices.get(spec, spec)
self._specs = new_specs
for s in self._specs.values():
spack.spec.Spec.ensure_no_deprecated(s)
# Add git version lookup info to concrete Specs (this is generated for
# abstract specs as well but the Versions may be replaced during the
# concretization process)
for root in self._specs.values():
for spec in root.traverse():
if isinstance(spec.version, vn.GitVersion):
spec.version.attach_lookup(
spack.version.git_ref_lookup.GitRefLookup(spec.fullname)
)
# Explicit splices from configuration are applied last
specs = self.execute_explicit_splices()
return specs
[docs]
def execute_explicit_splices(self):
"""Apply the splices listed under ``concretizer:splice:explicit`` to the built specs.

Returns a new dictionary of specs, keyed by ``NodeId`` objects that keep the id of the
original key and take the package name from the (possibly spliced) spec.

Raises:
    InvalidSpliceError: if the replacement of a splice is not specified by hash
"""
splice_config = spack.config.CONFIG.get("concretizer:splice:explicit", [])
# Collect (target, replacement, transitive) triples from configuration, validating them
splice_triples = []
for splice_set in splice_config:
target = splice_set["target"]
replacement = spack.spec.Spec(splice_set["replacement"])
if not replacement.abstract_hash:
# Use the ``_start_mark`` attribute of the configuration value, if it has one,
# to tell users where the offending entry is
location = getattr(
splice_set["replacement"], "_start_mark", " at unknown line number"
)
msg = f"Explicit splice replacement '{replacement}' does not include a hash.\n"
msg += f"{location}\n\n"
msg += " Splice replacements must be specified by hash"
raise InvalidSpliceError(msg)
# Splices are not transitive unless the configuration says otherwise
transitive = splice_set.get("transitive", False)
splice_triples.append((target, replacement, transitive))
specs = {}
for key, spec in self._specs.items():
current_spec = spec
for target, replacement, transitive in splice_triples:
if target in current_spec:
# matches root or non-root
# e.g. mvapich2%gcc
# The first iteration, we need to replace the abstract hash
if not replacement.concrete:
replacement.replace_hash()
current_spec = current_spec.splice(replacement, transitive)
# The key is rebuilt from the name of the current spec, keeping the original id
new_key = NodeId(id=key.id, pkg=current_spec.name)
specs[new_key] = current_spec
return specs
def _specs_with_commits(spec):
"""Make sure ``spec`` carries a valid ``commit`` variant, if its package needs one.

Nothing is done when the package class reports that the version of the spec does not
need a commit. Otherwise the commit is taken from the git version when available, the
package class is asked to resolve git provenance, and the resulting ``commit`` variant,
if any, is checked to be a well-formed commit sha.
"""
pkg_class = spack.repo.PATH.get_pkg_class(spec.fullname)
if not pkg_class.needs_commit(spec.version):
return
if isinstance(spec.version, vn.GitVersion):
# Seed the variant from the sha already known to the git version
if "commit" not in spec.variants and spec.version.commit_sha:
spec.variants["commit"] = vt.SingleValuedVariant("commit", spec.version.commit_sha)
pkg_class._resolve_git_provenance(spec)
if "commit" not in spec.variants:
# The commit could not be determined: warn, except for develop specs
if not spec.is_develop:
tty.warn(
f"Unable to resolve the git commit for {spec.name}. "
"An installation of this binary won't have complete binary provenance."
)
return
# check integrity of user specified commit shas
invalid_commit_msg = (
f"Internal Error: {spec.name}'s assigned commit {spec.variants['commit'].value}"
" does not meet commit syntax requirements."
)
assert vn.is_git_commit_sha(spec.variants["commit"].value), invalid_commit_msg
def _ensure_external_path_if_external(spec: spack.spec.Spec) -> None:
    """Set ``external_path`` on a spec that has external modules but no external path.

    The path is the ``external_prefix`` of the package, when the package defines a truthy
    one, otherwise it is computed from the external modules of the spec.
    """
    needs_path = spec.external_modules and not spec.external_path
    if not needs_path:
        return

    # Get the path from the module the package can override the default
    # (this is mostly needed for Cray)
    package = spack.repo.PATH.get_pkg_class(spec.name)(spec)
    prefix_override = getattr(package, "external_prefix", None)
    spec.external_path = prefix_override or md.path_from_modules(spec.external_modules)
def _develop_specs_from_env(spec, env):
    """Attach to ``spec`` the ``dev_path`` variant recorded for it in ``env``, if any.

    Nothing is done if there is no environment, or if the environment has no develop
    information for the name of the spec. Otherwise the path from the environment is
    canonicalized relative to the environment path, and set as the ``dev_path`` variant
    unless the spec already has one, in which case the two must agree.

    Args:
        spec: spec to be updated in place
        env: environment to read ``dev_specs`` from, or a falsy value

    Raises:
        AssertionError: if the spec already has a different ``dev_path``, or if it does
            not satisfy the develop spec stored in the environment
    """
    dev_info = env.dev_specs.get(spec.name, {}) if env else {}
    if not dev_info:
        return

    path = spack.util.path.canonicalize_path(dev_info["path"], default_wd=env.path)

    if "dev_path" in spec.variants:
        # Adjacent literals are concatenated verbatim: each fragment must carry its own
        # trailing separator, or words from consecutive fragments get glued together
        error_msg = (
            "Internal Error: The dev_path for spec {name} is not connected to a valid "
            "environment path. Please note that develop specs can only be used inside an "
            "environment. "
            "These paths should be the same:\n\tdev_path:{dev_path}\n\tenv_based_path:{env_path}"
        ).format(name=spec.name, dev_path=spec.variants["dev_path"], env_path=path)

        assert spec.variants["dev_path"].value == path, error_msg
    else:
        spec.variants.setdefault("dev_path", vt.SingleValuedVariant("dev_path", path))

    assert spec.satisfies(dev_info["spec"])
[docs]
class Solver:
"""This is the main external interface class for solving.

It manages solver configuration and preferences in one place. It sets up the solve
and passes the setup method to the driver, as well.

Solves are delegated to ``self.driver``, while ``self.selector`` provides the specs that
can be reused.
"""
def __init__(self, *, specs_factory: Optional[SpecFiltersFactory] = None):
    """Set up the driver, the configuration of externals and the selector of reusable specs.

    Args:
        specs_factory: optional factory, forwarded to ``ReusableSpecsSelector``
    """
    # Compute possible compilers first, so we see them as externals
    spack.compilers.config.all_compilers(init_config=True)

    conc_cache = ConcretizationCache()
    self._conc_cache = conc_cache
    self.driver = PyclingoDriver(conc_cache=conc_cache)

    # Compute packages configuration with implicit externals once and reuse it
    packages_with_externals = external_config_with_implicit_externals(spack.config.CONFIG)
    self.packages_with_externals = packages_with_externals

    external_parser = create_external_parser(
        packages_with_externals, spack.config.CONFIG.get("concretizer:externals:completion")
    )
    self.selector = ReusableSpecsSelector(
        configuration=spack.config.CONFIG,
        external_parser=external_parser,
        factory=specs_factory,
        packages_with_externals=packages_with_externals,
    )
@staticmethod
def _check_input_and_extract_concrete_specs(
specs: Sequence[spack.spec.Spec],
) -> List[spack.spec.Spec]:
"""Validate the input specs, and return the concrete specs found while traversing them.

Raises:
    UnsatisfiableSpecError: if an abstract node has an unconditional direct dependency
        that is not among its possible dependencies, or refers to an unknown package

Unknown virtuals on edges and invalid variants are reported by the helpers called here.
"""
_check_unknown_virtuals_in_input_specs(specs)
reusable: List[spack.spec.Spec] = []
analyzer = create_graph_analyzer()
for root in specs:
for s in root.traverse():
if s.concrete:
# Concrete nodes are collected, to be reused as they are
reusable.append(s)
else:
if spack.repo.PATH.is_virtual(s.name):
continue
# Error if direct dependencies cannot be satisfied
# (only unconditional, direct edges are considered)
deps = {
edge.spec.name
for edge in s.edges_to_dependencies()
if edge.direct and edge.when == EMPTY_SPEC
}
if deps:
graph = analyzer.possible_dependencies(
s, allowed_deps=dt.ALL, transitive=False
)
# Whatever is left is neither a possible package nor a possible virtual
deps.difference_update(graph.real_pkgs, graph.virtuals)
if deps:
start_str = f"'{root}'" if s == root else f"'{s}' in '{root}'"
raise UnsatisfiableSpecError(
f"{start_str} cannot depend on {', '.join(deps)}"
)
try:
spack.repo.PATH.get_pkg_class(s.fullname)
except spack.repo.UnknownPackageError:
raise UnsatisfiableSpecError(
f"cannot concretize '{root}', since '{s.name}' does not exist"
)
spack.spec.Spec.ensure_valid_variants(s)
return reusable
[docs]
def solve_with_stats(
    self,
    specs: Sequence[spack.spec.Spec],
    out: Optional[io.IOBase] = None,
    timers: bool = False,
    stats: bool = False,
    tests: spack.concretize.TestsType = False,
    setup_only: bool = False,
    allow_deprecated: bool = False,
) -> Tuple[Result, Optional[spack.util.timer.Timer], Optional[Dict]]:
    """
    Concretize a set of specs and track the timing and statistics for the solve

    The value returned is whatever the driver returns from its ``solve`` method.

    Arguments:
      specs: List of ``Spec`` objects to solve for.
      out: Optionally write the generate ASP program to a file-like object.
      timers: Print out coarse timers for different solve phases.
      stats: Print out detailed stats from clingo.
      tests: If True, concretize test dependencies for all packages.
        If a tuple of package names, concretize test dependencies for named
        packages (defaults to False: do not concretize test dependencies).
      setup_only: if True, stop after setup and don't solve (default False).
      allow_deprecated: allow deprecated version in the solve
    """
    resolved_specs = [s.lookup_hash() for s in specs]

    # Concrete specs from the input come first, followed by the ones from the selector
    reuse_pool = self._check_input_and_extract_concrete_specs(resolved_specs)
    reuse_pool.extend(self.selector.reusable_specs(resolved_specs))

    setup = SpackSolverSetup(tests=tests)
    output_config = OutputConfiguration(
        timers=timers, stats=stats, out=out, setup_only=setup_only
    )
    solve_outcome = self.driver.solve(
        setup,
        resolved_specs,
        reuse=reuse_pool,
        packages_with_externals=self.packages_with_externals,
        output=output_config,
        allow_deprecated=allow_deprecated,
    )
    self._conc_cache.cleanup()
    return solve_outcome
[docs]
def solve(self, specs: Sequence[spack.spec.Spec], **kwargs) -> Result:
    """
    Convenience function for concretizing a set of specs and ignoring timing
    and statistics. Uses the same kwargs as solve_with_stats.
    """
    outcome = self.solve_with_stats(specs, **kwargs)
    # Only the first of the three items is of interest here
    result, _timer, _stats = outcome
    return result
[docs]
def solve_in_rounds(
self,
specs: Sequence[spack.spec.Spec],
out: Optional[io.IOBase] = None,
timers: bool = False,
stats: bool = False,
tests: spack.concretize.TestsType = False,
allow_deprecated: bool = False,
) -> Generator[Result, None, None]:
"""Solve for a stable model of specs in multiple rounds.

This relaxes the assumption of solve that everything must be consistent and
solvable in a single round. Each round tries to maximize the reuse of specs
from previous rounds.

The function is a generator that yields the result of each round.

Arguments:
    specs (list): list of Specs to solve.
    out: Optionally write the generate ASP program to a file-like object.
    timers (bool): print timing if set to True
    stats (bool): print internal statistics if set to True
    tests (bool): add test dependencies to the solve
    allow_deprecated (bool): allow deprecated version in the solve

Raises:
    OutputDoesNotSatisfyInputError: if a round leaves specs unsolved without solving
        any spec, since trying again would loop forever
"""
specs = [s.lookup_hash() for s in specs]
reusable_specs = self._check_input_and_extract_concrete_specs(specs)
reusable_specs.extend(self.selector.reusable_specs(specs))
setup = SpackSolverSetup(tests=tests)
# Tell clingo that we don't have to solve all the inputs at once
setup.concretize_everything = False
input_specs = specs
output = OutputConfiguration(timers=timers, stats=stats, out=out, setup_only=False)
while True:
result, _, _ = self.driver.solve(
setup,
input_specs,
reuse=reusable_specs,
packages_with_externals=self.packages_with_externals,
output=output,
allow_deprecated=allow_deprecated,
)
yield result
# If we don't have unsolved specs we are done
if not result.unsolved_specs:
break
if not result.specs:
# This is also a problem: no specs were solved for, which means we would be in a
# loop if we tried again
raise OutputDoesNotSatisfyInputError(result.unsolved_specs)
# The next round takes the first item of each unsolved pair as input, and can reuse
# every node of the specs solved in this round
input_specs = list(x for (x, y) in result.unsolved_specs)
for spec in result.specs:
reusable_specs.extend(spec.traverse())
self._conc_cache.cleanup()
class _SkipConcreteVisitor(traverse.BaseVisitor):
    """Visitor that does not traverse past an edge whose spec is concrete."""

    def neighbors(self, item):
        """Return no neighbors for concrete specs, and the default ones otherwise."""
        if not item.edge.spec.concrete:
            return super().neighbors(item)
        return []
def _check_unknown_virtuals_in_input_specs(specs: Sequence[spack.spec.Spec]) -> None:
    """Validate that every virtual named on an edge of *specs* exists in the repository.

    Each root is walked breadth-first using ``_SkipConcreteVisitor``. Every offending
    virtual is collected before failing, so that one exception reports all of them.

    Raises:
        spack.error.InvalidVirtualOnEdgeError: if at least one edge names a virtual that
            ``spack.repo.PATH.is_virtual`` does not recognize.
    """
    problems = []
    for root in specs:
        start_edges = traverse.with_artificial_edges([root])
        # A new visitor is built for every root
        cover = traverse.CoverNodesVisitor(_SkipConcreteVisitor())
        walked = traverse.traverse_breadth_first_edges_generator(start_edges, cover)
        problems.extend(
            f"'{virtual}' in '{root}' is not a known virtual package"
            for edge in walked
            for virtual in edge.virtuals
            if not spack.repo.PATH.is_virtual(virtual)
        )

    if not problems:
        return

    if len(problems) > 1:
        # Several problems: present them as a numbered list under a common header
        numbered = "\n".join(f" {idx}. {msg}" for idx, msg in enumerate(problems, 1))
        raise spack.error.InvalidVirtualOnEdgeError(
            f"unknown virtuals have been found in input specs:\n{numbered}"
        )

    # A single problem is reported verbatim, without the list header
    raise spack.error.InvalidVirtualOnEdgeError(problems[0])
[docs]
class UnsatisfiableSpecError(spack.error.UnsatisfiableSpecError):
    """The spec that was requested has a problem (i.e. this is a user error)."""

    def __init__(self, msg):
        # The two-argument super() starts the MRO lookup *after*
        # spack.error.UnsatisfiableSpecError, so that class's own __init__ is bypassed and
        # only ``msg`` is forwarded to the next initializer in line.
        super(spack.error.UnsatisfiableSpecError, self).__init__(msg)
        # Attributes are still defined on the instance, as None placeholders
        self.provided = self.required = self.constraint_type = None
[docs]
class InternalConcretizerError(spack.error.UnsatisfiableSpecError):
    """Error that signals a bug in Spack itself."""

    def __init__(self, msg):
        # The two-argument super() starts the MRO lookup *after*
        # spack.error.UnsatisfiableSpecError, so that class's own __init__ is bypassed and
        # only ``msg`` is forwarded to the next initializer in line.
        super(spack.error.UnsatisfiableSpecError, self).__init__(msg)
        # Attributes are still defined on the instance, as None placeholders
        self.provided = self.required = self.constraint_type = None
[docs]
class SolverError(InternalConcretizerError):
    """Raised when the solver cannot produce any solution.

    This is unexpected, since solutions with errors are allowed: even user specs that
    are over-constrained should still get a solution.
    """

    def __init__(self, provided):
        report_request = (
            "Spack concretizer internal error. Please submit a bug report at "
            "https://github.com/spack/spack and include the command and environment "
            "if applicable."
        )
        super().__init__(f"{report_request}\n {provided} is unsatisfiable")

        # Add attribute expected of the superclass interface
        self.required = self.constraint_type = None
        self.provided = provided
[docs]
class InvalidSpliceError(spack.error.SpackError):
    """Raised when the splice configuration is invalid."""
[docs]
class NoCompilerFoundError(spack.error.SpackError):
    """Raised when there is no possible compiler."""
[docs]
class InvalidExternalError(spack.error.SpackError):
    """Raised when an external is invalid."""

    # NOTE(review): this class used to carry the "no possible compiler" description, which
    # belongs to NoCompilerFoundError. The wording above follows the class name only; confirm
    # the exact condition against the places where this error is raised.
[docs]
class DeprecatedVersionError(spack.error.SpackError):
    """Raised when the user directly requests a deprecated version."""
[docs]
class InvalidVersionError(spack.error.SpackError):
    """Raised when a version cannot be satisfied by any of the possible versions."""
[docs]
class InvalidDependencyError(spack.error.SpackError):
"""Raised when an explicit dependency is not a possible dependency."""