Source code for spack.environment.list

# Copyright Spack Project Developers. See COPYRIGHT file for details.
#
# SPDX-License-Identifier: (Apache-2.0 OR MIT)
import itertools
from typing import Any, Dict, List, NamedTuple, Optional, Union

import spack.spec
import spack.util.spack_yaml
import spack.variant
from spack.error import SpackError
from spack.spec import Spec
from spack.spec_parser import expand_toolchains


[docs] class SpecList: def __init__( self, *, name: str = "specs", yaml_list=None, expanded_list=None, toolchains=None ): self.name = name self.yaml_list = yaml_list[:] if yaml_list is not None else [] # Expansions can be expensive to compute and difficult to keep updated # We cache results and invalidate when self.yaml_list changes self.specs_as_yaml_list = expanded_list or [] self._constraints = None self._specs: Optional[List[Spec]] = None self._toolchains = toolchains @property def is_matrix(self): for item in self.specs_as_yaml_list: if isinstance(item, dict): return True return False @property def specs_as_constraints(self): if self._constraints is None: constraints = [] for item in self.specs_as_yaml_list: if isinstance(item, dict): # matrix of specs constraints.extend(_expand_matrix_constraints(item)) else: # individual spec constraints.append([Spec(item)]) self._constraints = constraints return self._constraints @property def specs(self) -> List[Spec]: if self._specs is None: specs: List[Spec] = [] # This could be slightly faster done directly from yaml_list, # but this way is easier to maintain. for constraint_list in self.specs_as_constraints: spec = constraint_list[0].copy() for const in constraint_list[1:]: spec.constrain(const) if self._toolchains: expand_toolchains(spec, self._toolchains) specs.append(spec) self._specs = specs return self._specs
[docs] def add(self, spec: Spec): spec_str = str(spec) self.yaml_list.append(spec_str) # expanded list can be updated without invalidation if self.specs_as_yaml_list is not None: self.specs_as_yaml_list.append(spec_str) # Invalidate cache variables when we change the list self._constraints = None self._specs = None
[docs] def remove(self, spec): # Get spec to remove from list remove = [ s for s in self.yaml_list if (isinstance(s, str) and not s.startswith("$")) and Spec(s) == Spec(spec) ] if not remove: msg = f"Cannot remove {spec} from SpecList {self.name}.\n" msg += f"Either {spec} is not in {self.name} or {spec} is " msg += "expanded from a matrix and cannot be removed directly." raise SpecListError(msg) # Remove may contain more than one string representation of the same spec for item in remove: self.yaml_list.remove(item) self.specs_as_yaml_list.remove(item) # invalidate cache variables when we change the list self._constraints = None self._specs = None
[docs] def extend(self, other: "SpecList", copy_reference=True) -> None: self.yaml_list.extend(other.yaml_list) self.specs_as_yaml_list.extend(other.specs_as_yaml_list) self._constraints = None self._specs = None
def __len__(self): return len(self.specs) def __getitem__(self, key): return self.specs[key] def __iter__(self): return iter(self.specs)
def _expand_matrix_constraints(matrix_config): # recurse so we can handle nested matrices expanded_rows = [] for row in matrix_config["matrix"]: new_row = [] for r in row: if isinstance(r, dict): # Flatten the nested matrix into a single row of constraints new_row.extend( [ [" ".join([str(c) for c in expanded_constraint_list])] for expanded_constraint_list in _expand_matrix_constraints(r) ] ) else: new_row.append([r]) expanded_rows.append(new_row) excludes = matrix_config.get("exclude", []) # only compute once sigil = matrix_config.get("sigil", "") results = [] for combo in itertools.product(*expanded_rows): # Construct a combined spec to test against excludes flat_combo = [Spec(constraint) for constraints in combo for constraint in constraints] test_spec = flat_combo[0].copy() for constraint in flat_combo[1:]: test_spec.constrain(constraint) # Abstract variants don't have normal satisfaction semantics # Convert all variants to concrete types. # This method is best effort, so all existing variants will be # converted before any error is raised. # Catch exceptions because we want to be able to operate on # abstract specs without needing package information try: spack.spec.substitute_abstract_variants(test_spec) except spack.variant.UnknownVariantError: pass # Resolve abstract hashes for exclusion criteria if any(test_spec.lookup_hash().satisfies(x) for x in excludes): continue if sigil: flat_combo[0] = Spec(sigil + str(flat_combo[0])) # Add to list of constraints results.append(flat_combo) return results def _sigilify(item, sigil): if isinstance(item, dict): if sigil: item["sigil"] = sigil return item else: return sigil + item
[docs] class Definition(NamedTuple): name: str yaml_list: List[Union[str, Dict]] when: Optional[str]
[docs] class SpecListParser: """Parse definitions and user specs from data in environments""" def __init__(self, *, toolchains=None): self.definitions: Dict[str, SpecList] = {} self._toolchains = toolchains
[docs] def parse_definitions(self, *, data: List[Dict[str, Any]]) -> Dict[str, SpecList]: definitions_from_yaml: Dict[str, List[Definition]] = {} for item in data: value = self._parse_yaml_definition(item) definitions_from_yaml.setdefault(value.name, []).append(value) self.definitions = {} self._build_definitions(definitions_from_yaml) return self.definitions
[docs] def parse_user_specs(self, *, name, yaml_list) -> SpecList: definition = Definition(name=name, yaml_list=yaml_list, when=None) return self._speclist_from_definitions(name, [definition])
def _parse_yaml_definition(self, yaml_entry) -> Definition: when_string = yaml_entry.get("when") if (when_string and len(yaml_entry) > 2) or (not when_string and len(yaml_entry) > 1): mark = spack.util.spack_yaml.get_mark_from_yaml_data(yaml_entry) attributes = ", ".join(x for x in yaml_entry if x != "when") error_msg = f"definition must have a single attribute, got many: {attributes}" raise SpecListError(f"{mark.name}:{mark.line + 1}: {error_msg}") for name, yaml_list in yaml_entry.items(): if name == "when": continue return Definition(name=name, yaml_list=yaml_list, when=when_string) # If we are here, it means only "when" is in the entry mark = spack.util.spack_yaml.get_mark_from_yaml_data(yaml_entry) error_msg = "definition must have a single attribute, got none" raise SpecListError(f"{mark.name}:{mark.line + 1}: {error_msg}") def _build_definitions(self, definitions_from_yaml: Dict[str, List[Definition]]): for name, definitions in definitions_from_yaml.items(): self.definitions[name] = self._speclist_from_definitions(name, definitions) def _speclist_from_definitions(self, name, definitions) -> SpecList: combined_yaml_list = [] for def_part in definitions: if def_part.when is not None and not spack.spec.eval_conditional(def_part.when): continue combined_yaml_list.extend(def_part.yaml_list) expanded_list = self._expand_yaml_list(combined_yaml_list) return SpecList( name=name, yaml_list=combined_yaml_list, expanded_list=expanded_list, toolchains=self._toolchains, ) def _expand_yaml_list(self, raw_yaml_list): result = [] for item in raw_yaml_list: if isinstance(item, str) and item.startswith("$"): result.extend(self._expand_reference(item)) continue value = item if isinstance(item, dict): value = self._expand_yaml_matrix(item) result.append(value) return result def _expand_reference(self, item: str): sigil, name = "", item[1:] if name.startswith("^") or name.startswith("%"): sigil, name = name[0], name[1:] if name not in self.definitions: mark = spack.util.spack_yaml.get_mark_from_yaml_data(item) error_msg = f"trying to expand the name '{name}', which is not defined yet" raise UndefinedReferenceError(f"{mark.name}:{mark.line + 1}: {error_msg}") value = self.definitions[name].specs_as_yaml_list if not sigil: return value return [_sigilify(x, sigil) for x in value] def _expand_yaml_matrix(self, matrix_yaml): extra_attributes = set(matrix_yaml) - {"matrix", "exclude"} if extra_attributes: mark = spack.util.spack_yaml.get_mark_from_yaml_data(matrix_yaml) error_msg = f"extra attributes in spec matrix: {','.join(sorted(extra_attributes))}" raise SpecListError(f"{mark.name}:{mark.line + 1}: {error_msg}") if "matrix" not in matrix_yaml: mark = spack.util.spack_yaml.get_mark_from_yaml_data(matrix_yaml) error_msg = "matrix is missing the 'matrix' attribute" raise SpecListError(f"{mark.name}:{mark.line + 1}: {error_msg}") # Assume data has been validated against the YAML schema result = {"matrix": [self._expand_yaml_list(row) for row in matrix_yaml["matrix"]]} if "exclude" in matrix_yaml: result["exclude"] = matrix_yaml["exclude"] return result
[docs] class SpecListError(SpackError): """Error class for all errors related to SpecList objects."""
[docs] class UndefinedReferenceError(SpecListError): """Error class for undefined references in Spack stacks."""
[docs] class InvalidSpecConstraintError(SpecListError): """Error class for invalid spec constraints at concretize time."""