Source code for spack.binary_distribution

# Copyright Spack Project Developers. See COPYRIGHT file for details.
#
# SPDX-License-Identifier: (Apache-2.0 OR MIT)

import collections
import concurrent.futures
import contextlib
import copy
import datetime
import hashlib
import io
import itertools
import json
import os
import pathlib
import re
import shutil
import sys
import tarfile
import tempfile
import textwrap
import time
import urllib.error
import urllib.parse
import urllib.request
import warnings
from collections import defaultdict
from contextlib import closing
from typing import (
    IO,
    Any,
    Callable,
    Dict,
    Iterable,
    List,
    Mapping,
    NamedTuple,
    Optional,
    Set,
    Tuple,
    Union,
    cast,
)

import spack.caches
import spack.config
import spack.database
import spack.deptypes as dt
import spack.error
import spack.hash_types as ht
import spack.hooks
import spack.hooks.sbang
import spack.llnl.util.filesystem as fsys
import spack.llnl.util.lang
import spack.llnl.util.tty as tty
import spack.mirrors.mirror
import spack.oci.image
import spack.oci.oci
import spack.oci.opener
import spack.paths
import spack.platforms
import spack.relocate as relocate
import spack.spec
import spack.stage
import spack.store
import spack.user_environment
import spack.util.archive
import spack.util.crypto
import spack.util.file_cache as file_cache
import spack.util.gpg
import spack.util.parallel
import spack.util.path
import spack.util.spack_json as sjson
import spack.util.spack_yaml as syaml
import spack.util.timer as timer
import spack.util.url as url_util
import spack.util.web as web_util
from spack import traverse
from spack.llnl.util.filesystem import mkdirp
from spack.oci.image import (
    Digest,
    ImageReference,
    default_config,
    default_manifest,
    ensure_valid_tag,
)
from spack.oci.oci import (
    copy_missing_layers_with_retry,
    get_manifest_and_config_with_retry,
    list_tags,
    upload_blob_with_retry,
    upload_manifest_with_retry,
)
from spack.package_prefs import get_package_dir_permissions, get_package_group
from spack.relocate_text import utf8_paths_to_single_binary_regex
from spack.stage import Stage
from spack.util.executable import which

from .enums import InstallRecordStatus
from .url_buildcache import (
    CURRENT_BUILD_CACHE_LAYOUT_VERSION,
    BlobRecord,
    BuildcacheComponent,
    BuildcacheEntryError,
    BuildcacheManifest,
    InvalidMetadataFile,
    ListMirrorSpecsError,
    MirrorMetadata,
    URLBuildcacheEntry,
    get_entries_from_cache,
    get_url_buildcache_class,
    get_valid_spec_file,
)
from .vendor.typing_extensions import TypedDict


[docs] class BuildCacheDatabase(spack.database.Database): """A database for binary buildcaches. A database supports writing buildcache index files, in which case certain fields are not needed in each install record, and no locking is required. To use this feature, it provides ``lock_cfg=NO_LOCK``, and override the list of ``record_fields``. """ record_fields = ("spec", "ref_count", "in_buildcache") def __init__(self, root): super().__init__(root, lock_cfg=spack.database.NO_LOCK, layout=None) self._write_transaction_impl = spack.llnl.util.lang.nullcontext self._read_transaction_impl = spack.llnl.util.lang.nullcontext def _handle_old_db_versions_read(self, check, db, *, reindex: bool): if not self.is_readable(): raise spack.database.DatabaseNotReadableError( f"cannot read buildcache v{self.db_version} at {self.root}" ) return self._handle_current_version_read(check, db)
[docs] class FetchCacheError(Exception): """Error thrown when fetching the cache failed, usually a composite error list.""" def __init__(self, errors): if not isinstance(errors, list): raise TypeError("Expected a list of errors") self.errors = errors if len(errors) > 1: msg = " Error {0}: {1}: {2}" self.message = "Multiple errors during fetching:\n" self.message += "\n".join( ( msg.format(i + 1, err.__class__.__name__, str(err)) for (i, err) in enumerate(errors) ) ) else: err = errors[0] self.message = "{0}: {1}".format(err.__class__.__name__, str(err)) super().__init__(self.message)
class _MirrorIndexResult(NamedTuple): succeeded: bool regenerate: bool had_cache_entry: bool error: Optional[Exception] no_index: bool = False class _LastFetch(NamedTuple): time: float succeeded: bool class _LocalIndexCache(TypedDict, total=False): index_hash: str index_path: str etag: str
[docs] class BinaryIndexCache: """ The BinaryIndexCache tracks what specs are available on (usually remote) binary caches. This index is "best effort", in the sense that whenever we don't find what we're looking for here, we will attempt to fetch it directly from configured mirrors anyway. Thus, it has the potential to speed things up, but cache misses shouldn't break any spack functionality. At the moment, everything in this class is initialized as lazily as possible, so that it avoids slowing anything in spack down until absolutely necessary. """ def __init__(self, cache_root: Optional[str] = None): self._index_cache_root: str = cache_root or binary_index_location() # the key associated with the serialized _local_index_cache self._index_contents_key = "contents.json" # a FileCache instance storing copies of remote binary cache indices self._index_file_cache: file_cache.FileCache = file_cache.FileCache(self._index_cache_root) self._index_file_cache_initialized = False # stores a map of mirror URL and version layout to index hash and cache key (index path) self._local_index_cache: dict[str, _LocalIndexCache] = {} # hashes of remote indices already ingested into the concrete spec # cache (_mirrors_for_spec) self._specs_already_associated: Set[str] = set() # mapping from mirror urls to the time.time() of the last index fetch and a bool indicating # whether the fetch succeeded or not. self._last_fetch_times: Dict[MirrorMetadata, _LastFetch] = {} #: Dictionary mapping DAG hashes of specs to Spec objects self._known_specs: Dict[str, spack.spec.Spec] = {} #: Dictionary mapping DAG hashes of specs to a list of mirrors where they can be found self._mirrors_for_spec: Dict[str, Set[MirrorMetadata]] = defaultdict(set) #: URLs of binary mirrors that had no buildcache index during the last update() self.mirrors_without_index: Set[str] = set() def _init_local_index_cache(self): if not self._index_file_cache_initialized: cache_key = self._index_contents_key self._local_index_cache = {} with self._index_file_cache.read_transaction(cache_key) as cache_file: if cache_file is not None: self._local_index_cache = json.load(cache_file) self._index_file_cache_initialized = True def _write_local_index_cache(self): self._init_local_index_cache() cache_key = self._index_contents_key with self._index_file_cache.write_transaction(cache_key) as (old, new): json.dump(self._local_index_cache, new)
[docs] def regenerate_spec_cache(self, clear_existing=False): """Populate the local cache of concrete specs (``_mirrors_for_spec``) from the locally cached buildcache index files. This is essentially a no-op if it has already been done, as we keep track of the index hashes for which we have already associated the built specs.""" self._init_local_index_cache() if clear_existing: self._specs_already_associated = set() self._mirrors_for_spec = defaultdict(set) self._known_specs = {} for mirror_metadata in self._local_index_cache: cache_entry = self._local_index_cache[mirror_metadata] cached_index_path = cache_entry["index_path"] cached_index_hash = cache_entry["index_hash"] if cached_index_hash not in self._specs_already_associated: self._associate_built_specs_with_mirror( cached_index_path, MirrorMetadata.from_string(mirror_metadata) ) self._specs_already_associated.add(cached_index_hash)
def _associate_built_specs_with_mirror(self, cache_key, mirror_metadata: MirrorMetadata): with tempfile.TemporaryDirectory(dir=spack.stage.get_stage_root()) as tmpdir: db = BuildCacheDatabase(tmpdir) with self._index_file_cache.read_transaction(cache_key) as f: if f is not None: try: db._read_from_stream(f) except spack.database.InvalidDatabaseVersionError as e: tty.warn( "you need a newer Spack version to read the buildcache index for the " f"following v{mirror_metadata.version} mirror: " f"'{mirror_metadata.url}'. {e.database_version_message}" ) return spec_list = [ s for s in db.query_local(installed=InstallRecordStatus.ANY) # todo, make it easier to get install records associated with specs if s.external or db._data[s.dag_hash()].in_buildcache ] for spec in spec_list: dag_hash = spec.dag_hash() mirrors = self._mirrors_for_spec[dag_hash] mirrors.add(mirror_metadata.strip_view()) if dag_hash not in self._known_specs: self._known_specs[dag_hash] = spec
[docs] def get_all_built_specs(self) -> List[spack.spec.Spec]: """Returns a list of all concrete specs known to be available in a binary cache.""" return list(self._known_specs.values())
[docs] def find_built_spec(self, spec: spack.spec.Spec) -> List[MirrorMetadata]: """Returns a list of MirrorMetadata objects indicating which mirrors have the given concrete spec. This method does not trigger reading anything from remote mirrors, but rather just checks if the concrete spec is found within the cache. The cache can be updated by calling ``update()`` on the cache. Args: spec: Concrete spec to find """ return self.find_by_hash(spec.dag_hash())
[docs] def find_by_hash(self, dag_hash: str) -> List[MirrorMetadata]: """Same as find_built_spec but uses the hash of a spec. Args: dag_hash: hash of the spec to search """ return list(self._mirrors_for_spec.get(dag_hash, []))
[docs] def update_spec(self, spec: spack.spec.Spec, found_list: List[MirrorMetadata]) -> None: """Update the cache with a new list of mirrors for a given spec.""" spec_dag_hash = spec.dag_hash() if spec_dag_hash not in self._mirrors_for_spec: self._mirrors_for_spec[spec_dag_hash] = set(found_list) self._known_specs[spec_dag_hash] = spec else: current_list = self._mirrors_for_spec[spec_dag_hash] for new_entry in found_list: current_list.add(new_entry.strip_view())
[docs] def update(self, with_cooldown: bool = False) -> None: """Make sure local cache of buildcache index files is up to date. If the same mirrors are configured as the last time this was called and none of the remote buildcache indices have changed, calling this method will only result in fetching the index hash from each mirror to confirm it is the same as what is stored locally. Otherwise, the buildcache ``index.json`` and ``index.json.hash`` files are retrieved from each configured mirror and stored locally (both in memory and on disk under ``_index_cache_root``).""" self._init_local_index_cache() self.mirrors_without_index = set() supported_mirror_versions = { (m.fetch_url, m.fetch_view): m.supported_layout_versions for m in spack.mirrors.mirror.MirrorCollection(binary=True).values() } # If we have a cached index for a mirror which is no longer configured, remove it clear_cache, regenerate_cache = self._remove_stale_cache_entries(supported_mirror_versions) # Fetch or update the other indexes errors, all_failed = [], True for (url, view), versions in supported_mirror_versions.items(): result = self._fetch_mirror_index(url, view, versions=versions, cooldown=with_cooldown) if result.error: errors.append(result.error) if result.succeeded: all_failed = False if result.no_index: self.mirrors_without_index.add(url) regenerate_cache |= result.regenerate clear_cache |= result.regenerate and result.had_cache_entry self._write_local_index_cache() if supported_mirror_versions and all_failed: raise FetchCacheError(errors) if errors: warnings.warn( "The following issues were ignored while updating the indices of binary caches:\n" + str(FetchCacheError(errors)) ) if regenerate_cache: self.regenerate_spec_cache(clear_existing=clear_cache)
def _fetch_mirror_index( self, url: str, view: Optional[str], *, versions: List[int], cooldown: bool ) -> _MirrorIndexResult: """Fetches the index of a mirror, using a highest-version first approach, and returning after the first success. """ now = time.time() ttl = spack.config.CONFIG.get_config("config").get("binary_index_ttl", 600) for version in versions: meta = MirrorMetadata(url, version, view) cache_entry = self._local_index_cache.get(str(meta)) if cache_entry is not None and ( # Cache entry in cooldown cooldown and ttl > 0 and meta in self._last_fetch_times and now - self._last_fetch_times[meta].time < ttl ): return _MirrorIndexResult( succeeded=self._last_fetch_times[meta].succeeded, regenerate=False, had_cache_entry=True, error=None, ) try: regenerate = self._fetch_and_cache_index(meta, cache_entry=cache_entry or {}) self._last_fetch_times[meta] = _LastFetch(time=now, succeeded=True) return _MirrorIndexResult( succeeded=True, regenerate=regenerate, had_cache_entry=cache_entry is not None, error=None, ) except FetchIndexError as e: self._last_fetch_times[meta] = _LastFetch(time=now, succeeded=False) return _MirrorIndexResult( succeeded=False, regenerate=False, had_cache_entry=cache_entry is not None, error=e, ) except BuildcacheIndexNotExists: # Try next lower layout version self._last_fetch_times[meta] = _LastFetch(time=now, succeeded=False) continue # All versions reported no index found. Record it for concretization callers to warn. return _MirrorIndexResult( succeeded=True, regenerate=False, had_cache_entry=False, error=None, no_index=True ) def _remove_stale_cache_entries( self, supported_mirror_versions: Dict[Tuple[str, Any], List[int]] ) -> Tuple[bool, bool]: items_to_remove = [] clear, regenerate = False, not self._mirrors_for_spec for local_index_key in self._local_index_cache: meta = MirrorMetadata.from_string(local_index_key) if meta.version not in supported_mirror_versions.get((meta.url, meta.view), ()): index_file_key = self._local_index_cache[local_index_key]["index_path"] items_to_remove.append((local_index_key, index_file_key, meta)) clear, regenerate = True, True for local_index_key, index_file_key, meta in items_to_remove: self._last_fetch_times.pop(meta, None) self._index_file_cache.remove(index_file_key) del self._local_index_cache[local_index_key] return clear, regenerate def _fetch_and_cache_index(self, mirror_metadata: MirrorMetadata, cache_entry={}): """Fetch a buildcache index file from a remote mirror and cache it. If we already have a cached index from this mirror, then we first check if the hash has changed, and we avoid fetching it if not. Args: mirror_metadata: Contains mirror base url and target binary cache layout version cache_entry (dict): Old cache metadata with keys ``index_hash``, ``index_path``, ``etag`` Returns: True if the local index.json was updated. Throws: FetchIndexError BuildcacheIndexNotExists """ mirror_url = mirror_metadata.url mirror_view = mirror_metadata.view layout_version = mirror_metadata.version # TODO: get rid of this request, handle 404 better scheme = urllib.parse.urlparse(mirror_url).scheme if scheme != "oci": cache_class = get_url_buildcache_class(layout_version=layout_version) index_url = cache_class.get_index_url(mirror_url, mirror_view) if not web_util.url_exists(index_url): raise BuildcacheIndexNotExists(f"Index not found in cache {index_url}") fetcher: IndexHandler = get_index_fetcher(scheme, mirror_metadata, cache_entry) result = fetcher.conditional_fetch() # Nothing to do if result.fresh: return False # Persist new index.json url_hash = compute_hash(str(mirror_metadata)) cache_key = "{}_{}.json".format(url_hash[:10], result.hash[:10]) with self._index_file_cache.write_transaction(cache_key) as (old, new): new.write(result.data) self._local_index_cache[str(mirror_metadata)] = { "index_hash": result.hash, "index_path": cache_key, "etag": result.etag, } # clean up the old cache_key if necessary old_cache_key = cache_entry.get("index_path", None) if old_cache_key: self._index_file_cache.remove(old_cache_key) # We fetched an index and updated the local index cache, we should # regenerate the spec cache as a result. return True
[docs] def binary_index_location(): """Set up a BinaryIndexCache for remote buildcache dbs in the user's homedir.""" cache_root = os.path.join(spack.caches.misc_cache_location(), "indices") return spack.util.path.canonicalize_path(cache_root)
#: Default binary cache index instance BINARY_INDEX = cast(BinaryIndexCache, spack.llnl.util.lang.Singleton(BinaryIndexCache))
[docs] def compute_hash(data): if isinstance(data, str): data = data.encode("utf-8") return hashlib.sha256(data).hexdigest()
[docs] def buildinfo_file_name(prefix): """Filename of the binary package meta-data file""" return os.path.join(prefix, ".spack", "binary_distribution")
[docs] def read_buildinfo_file(prefix): """Read buildinfo file""" with open(buildinfo_file_name(prefix), "r", encoding="utf-8") as f: return syaml.load(f)
[docs] def file_matches(f: IO[bytes], regex: spack.llnl.util.lang.PatternBytes) -> bool: try: return bool(regex.search(f.read())) finally: f.seek(0)
[docs] def specs_to_relocate(spec: spack.spec.Spec) -> List[spack.spec.Spec]: """Return the set of specs that may be referenced in the install prefix of the provided spec. We currently include non-external transitive link and direct run dependencies.""" specs = [ s for s in itertools.chain( spec.traverse(root=True, deptype="link", order="breadth", key=traverse.by_dag_hash), spec.dependencies(deptype="run"), ) if not s.external ] return list(spack.llnl.util.lang.dedupe(specs, key=lambda s: s.dag_hash()))
[docs] def get_buildinfo_dict(spec): """Create metadata for a tarball""" return { "sbang_install_path": spack.hooks.sbang.sbang_install_path(), "buildpath": spack.store.STORE.layout.root, "spackprefix": spack.paths.prefix, "relative_prefix": os.path.relpath(spec.prefix, spack.store.STORE.layout.root), # "relocate_textfiles": [], # "relocate_binaries": [], # "relocate_links": [], "hardlinks_deduped": True, "hash_to_prefix": {d.dag_hash(): str(d.prefix) for d in specs_to_relocate(spec)}, }
[docs] def buildcache_relative_keys_path(layout_version: int = CURRENT_BUILD_CACHE_LAYOUT_VERSION): cache_class = get_url_buildcache_class(layout_version=layout_version) return os.path.join(*cache_class.get_relative_path_components(BuildcacheComponent.KEY))
[docs] def buildcache_relative_keys_url(layout_version: int = CURRENT_BUILD_CACHE_LAYOUT_VERSION): cache_class = get_url_buildcache_class(layout_version=layout_version) return url_util.join(*cache_class.get_relative_path_components(BuildcacheComponent.KEY))
[docs] def buildcache_relative_specs_path(layout_version: int = CURRENT_BUILD_CACHE_LAYOUT_VERSION): cache_class = get_url_buildcache_class(layout_version=layout_version) return os.path.join(*cache_class.get_relative_path_components(BuildcacheComponent.SPEC))
[docs] def buildcache_relative_specs_url(layout_version: int = CURRENT_BUILD_CACHE_LAYOUT_VERSION): cache_class = get_url_buildcache_class(layout_version=layout_version) return url_util.join(*cache_class.get_relative_path_components(BuildcacheComponent.SPEC))
[docs] def buildcache_relative_blobs_path(layout_version: int = CURRENT_BUILD_CACHE_LAYOUT_VERSION): cache_class = get_url_buildcache_class(layout_version=layout_version) return os.path.join(*cache_class.get_relative_path_components(BuildcacheComponent.BLOB))
[docs] def buildcache_relative_blobs_url(layout_version: int = CURRENT_BUILD_CACHE_LAYOUT_VERSION): cache_class = get_url_buildcache_class(layout_version=layout_version) return url_util.join(*cache_class.get_relative_path_components(BuildcacheComponent.BLOB))
[docs] def buildcache_relative_index_path(layout_version: int = CURRENT_BUILD_CACHE_LAYOUT_VERSION): cache_class = get_url_buildcache_class(layout_version=layout_version) return os.path.join(*cache_class.get_relative_path_components(BuildcacheComponent.INDEX))
[docs] def buildcache_relative_index_url(layout_version: int = CURRENT_BUILD_CACHE_LAYOUT_VERSION): cache_class = get_url_buildcache_class(layout_version=layout_version) return url_util.join(*cache_class.get_relative_path_components(BuildcacheComponent.INDEX))
[docs] @spack.llnl.util.lang.memoized def warn_v2_layout(mirror_url: str, action: str) -> bool: lines = textwrap.wrap( f"{action} from a v2 binary mirror layout, located at " f"{mirror_url} is deprecated. Support for this will be " "removed in a future version of spack. " "If you manage the buildcache please consider running:", width=72, subsequent_indent=" ", ) lines.extend( [ " 'spack buildcache migrate'", " or rebuilding the specs in this mirror. Otherwise, consider running:", " 'spack mirror list'", " 'spack mirror remove <name>'", " with the <name> for the mirror url shown in the list.", ] ) tty.warn("\n".join(lines)) return True
[docs] def select_signing_key() -> str: keys = spack.util.gpg.signing_keys() num = len(keys) if num > 1: raise PickKeyException(str(keys)) elif num == 0: raise NoKeyException( "No default key available for signing.\n" "Use spack gpg init and spack gpg create" " to create a default key." ) return keys[0]
def _push_index(db: BuildCacheDatabase, temp_dir: str, cache_prefix: str, name: str = ""): """Generate the index, compute its hash, and push the files to the mirror""" index_json_path = os.path.join(temp_dir, spack.database.INDEX_JSON_FILE) with open(index_json_path, "w", encoding="utf-8") as f: db._write_to_file(f) cache_class = get_url_buildcache_class(layout_version=CURRENT_BUILD_CACHE_LAYOUT_VERSION) cache_class.push_local_file_as_blob( index_json_path, cache_prefix, url_util.join(name, "index") if name else "index", BuildcacheComponent.INDEX, compression="none", ) cache_class.maybe_push_layout_json(cache_prefix) def _read_specs_and_push_index( file_list: List[str], read_method: Callable[[str], URLBuildcacheEntry], name: str, filter_fn: Callable[[str], bool], cache_prefix: str, db: BuildCacheDatabase, temp_dir: str, *, timer=timer.NULL_TIMER, ): """Read listed specs, generate the index, and push it to the mirror. Args: file_list: List of urls or file paths pointing at spec files to read read_method: A function taking a single argument, either a url or a file path, and which reads the spec file at that location, and returns the spec. cache_prefix: prefix of the build cache on s3 where index should be pushed. db: A spack database used for adding specs and then writing the index. temp_dir: Location to write index.json and hash for pushing """ with timer.measure("read"): for file in file_list: # All supported versions of build caches put the hash as the last # parameter before the extension try: x = file.split("/")[-1].split("-")[-1].split(".")[0] except IndexError: raise GenerateIndexError(f"Malformed metadata file name detected {file}") if not filter_fn(x): continue cache_entry: Optional[URLBuildcacheEntry] = None try: cache_entry = read_method(file) spec_dict = cache_entry.fetch_metadata() fetched_spec = spack.spec.Spec.from_dict(spec_dict) except Exception as e: tty.warn(f"Unable to fetch spec for manifest {file} due to: {e}") continue finally: if cache_entry: cache_entry.destroy() db.add(fetched_spec) db.mark(fetched_spec, "in_buildcache", True) with timer.measure("push"): _push_index(db, temp_dir, cache_prefix, name) def _url_generate_package_index( url: str, tmpdir: str, db: Optional[BuildCacheDatabase] = None, name: str = "", filter_fn: Callable[[str], bool] = lambda x: True, *, timer=timer.NULL_TIMER, ): """Create or replace the build cache index on the given mirror. The buildcache index contains an entry for each binary package under the cache_prefix. Args: url: Base url of binary mirror. Return: None """ with tempfile.TemporaryDirectory(dir=spack.stage.get_stage_root()) as tmpspecsdir: try: with timer.measure("list"): filename_to_mtime_mapping, read_fn = get_entries_from_cache( url, tmpspecsdir, component_type=BuildcacheComponent.SPEC ) file_list = list(filename_to_mtime_mapping.keys()) except ListMirrorSpecsError as e: raise GenerateIndexError(f"Unable to generate package index: {e}") from e tty.debug(f"Retrieving spec descriptor files from {url} to build index") if not db: db = BuildCacheDatabase(tmpdir) db._write() try: _read_specs_and_push_index( file_list, read_fn, name, filter_fn, url, db, str(db.database_directory), timer=timer, ) except Exception as e: raise GenerateIndexError( f"Encountered problem pushing package index to {url}: {e}" ) from e
[docs] def generate_key_index(mirror_url: str, tmpdir: str) -> None: """Create the key index page. Creates (or replaces) the ``index.json`` page at the location given in mirror_url. This page contains an entry for each key under mirror_url. """ tty.debug(f"Retrieving key.pub files from {url_util.format(mirror_url)} to build key index") key_prefix = url_util.join(mirror_url, buildcache_relative_keys_url()) try: fingerprints = ( entry[:-18] for entry in web_util.list_url(key_prefix, recursive=False) if entry.endswith(".key.manifest.json") ) except Exception as e: raise CannotListKeys(f"Encountered problem listing keys at {key_prefix}: {e}") from e target = os.path.join(tmpdir, "index.json") index = {"keys": dict((fingerprint, {}) for fingerprint in sorted(set(fingerprints)))} with open(target, "w", encoding="utf-8") as f: sjson.dump(index, f) cache_class = get_url_buildcache_class() try: cache_class.push_local_file_as_blob( local_file_path=target, mirror_url=mirror_url, manifest_name="keys", component_type=BuildcacheComponent.KEY_INDEX, compression="none", ) cache_class.maybe_push_layout_json(mirror_url) except Exception as e: raise GenerateIndexError( f"Encountered problem pushing key index to {key_prefix}: {e}" ) from e
[docs] class FileTypes: BINARY = 0 TEXT = 1 UNKNOWN = 2
NOT_ISO8859_1_TEXT = re.compile(b"[\x00\x7f-\x9f]")
[docs] def file_type(f: IO[bytes]) -> int: try: # first check if this is an ELF or mach-o binary. magic = f.read(8) if len(magic) < 8: return FileTypes.UNKNOWN elif relocate.is_elf_magic(magic) or relocate.is_macho_magic(magic): return FileTypes.BINARY f.seek(0) # Then try utf-8, which has a fast exponential decay in false positive rate with file size. # Use chunked reads for fast early exit. f_txt = io.TextIOWrapper(f, encoding="utf-8", errors="strict") try: while f_txt.read(1024): pass return FileTypes.TEXT except UnicodeError: f_txt.seek(0) pass finally: f_txt.detach() # Finally try iso-8859-1 heuristically. In Python, all possible 256 byte values are valid. # We classify it as text if it does not contain any control characters / null bytes. data = f.read(1024) while data: if NOT_ISO8859_1_TEXT.search(data): break data = f.read(1024) else: return FileTypes.TEXT return FileTypes.UNKNOWN finally: f.seek(0)
[docs] def tarfile_of_spec_prefix( tar: tarfile.TarFile, prefix: str, prefixes_to_relocate: List[str] ) -> dict: """Create a tarfile of an install prefix of a spec. Skips existing buildinfo file. Args: tar: tarfile object to add files to prefix: absolute install prefix of spec""" if not os.path.isabs(prefix) or not os.path.isdir(prefix): raise ValueError(f"prefix '{prefix}' must be an absolute path to a directory") stat_key = lambda stat: (stat.st_dev, stat.st_ino) try: # skip buildinfo file if it exists files_to_skip = [stat_key(os.lstat(buildinfo_file_name(prefix)))] skip = lambda entry: stat_key(entry.stat(follow_symlinks=False)) in files_to_skip except OSError: skip = lambda entry: False binary_regex = utf8_paths_to_single_binary_regex(prefixes_to_relocate) relocate_binaries = [] relocate_links = [] relocate_textfiles = [] # use callbacks to add files and symlinks, so we can register which files need relocation upon # extraction. def add_file(tar: tarfile.TarFile, info: tarfile.TarInfo, path: str): with open(path, "rb") as f: relpath = os.path.relpath(path, prefix) # no need to relocate anything in the .spack directory if relpath.split(os.sep, 1)[0] == ".spack": tar.addfile(info, f) return f_type = file_type(f) if f_type == FileTypes.BINARY: relocate_binaries.append(os.path.relpath(path, prefix)) elif f_type == FileTypes.TEXT and file_matches(f, binary_regex): relocate_textfiles.append(os.path.relpath(path, prefix)) tar.addfile(info, f) def add_symlink(tar: tarfile.TarFile, info: tarfile.TarInfo, path: str): if os.path.isabs(info.linkname) and binary_regex.match(info.linkname.encode("utf-8")): relocate_links.append(os.path.relpath(path, prefix)) tar.addfile(info) spack.util.archive.reproducible_tarfile_from_prefix( tar, prefix, # Spack <= 0.21 did not include parent directories, leading to issues when tarballs are # used in runtimes like AWS lambda. include_parent_directories=True, skip=skip, add_file=add_file, add_symlink=add_symlink, ) return { "relocate_binaries": relocate_binaries, "relocate_links": relocate_links, "relocate_textfiles": relocate_textfiles, }
[docs] def create_tarball(spec: spack.spec.Spec, tarfile_path: str) -> Tuple[str, str]: """Create a tarball of a spec and return the checksums of the compressed tarfile and the uncompressed tarfile.""" return _do_create_tarball( tarfile_path, spec.prefix, buildinfo=get_buildinfo_dict(spec), prefixes_to_relocate=prefixes_to_relocate(spec), )
def _do_create_tarball( tarfile_path: str, prefix: str, buildinfo: dict, prefixes_to_relocate: List[str] ) -> Tuple[str, str]: with spack.util.archive.gzip_compressed_tarfile(tarfile_path) as ( tar, tar_gz_checksum, tar_checksum, ): # Tarball the install prefix files_to_relocate = tarfile_of_spec_prefix(tar, prefix, prefixes_to_relocate) buildinfo.update(files_to_relocate) # Serialize buildinfo for the tarball bstring = syaml.dump(buildinfo, default_flow_style=True).encode("utf-8") tarinfo = tarfile.TarInfo( name=spack.util.archive.default_path_to_name(buildinfo_file_name(prefix)) ) tarinfo.type = tarfile.REGTYPE tarinfo.size = len(bstring) tarinfo.mode = 0o644 tar.addfile(tarinfo, io.BytesIO(bstring)) return tar_gz_checksum.hexdigest(), tar_checksum.hexdigest() def _exists_in_buildcache( spec: spack.spec.Spec, out_url: str, allow_unsigned: bool = False ) -> URLBuildcacheEntry: """creates and returns (after checking existence) a URLBuildcacheEntry""" cache_type = get_url_buildcache_class(CURRENT_BUILD_CACHE_LAYOUT_VERSION) cache_entry = cache_type(out_url, spec, allow_unsigned=allow_unsigned) return cache_entry
[docs] def prefixes_to_relocate(spec): prefixes = [s.prefix for s in specs_to_relocate(spec)] prefixes.append(spack.hooks.sbang.sbang_install_path()) prefixes.append(str(spack.store.STORE.layout.root)) return prefixes
def _url_upload_tarball_and_specfile( spec: spack.spec.Spec, tmpdir: str, cache_entry: URLBuildcacheEntry, signing_key: Optional[str] ): tarball = os.path.join(tmpdir, f"{spec.dag_hash()}.tar.gz") checksum, _ = create_tarball(spec, tarball) cache_entry.push_binary_package(spec, tarball, "sha256", checksum, tmpdir, signing_key)
[docs] class Uploader: def __init__(self, mirror: spack.mirrors.mirror.Mirror, force: bool, update_index: bool): self.mirror = mirror self.force = force self.update_index = update_index self.tmpdir: str self.executor: concurrent.futures.Executor # Verify if the mirror meets the requirements to push self.mirror.ensure_mirror_usable("push") def __enter__(self): self._tmpdir = tempfile.TemporaryDirectory(dir=spack.stage.get_stage_root()) self._executor = spack.util.parallel.make_concurrent_executor() self.tmpdir = self._tmpdir.__enter__() self.executor = self.executor = self._executor.__enter__() return self def __exit__(self, *args): self._executor.__exit__(*args) self._tmpdir.__exit__(*args)
[docs] def push_or_raise(self, specs: List[spack.spec.Spec]) -> List[spack.spec.Spec]: skipped, errors = self.push(specs) if errors: raise PushToBuildCacheError( f"Failed to push {len(errors)} specs to {self.mirror.push_url}:\n" + "\n".join( f"Failed to push {_format_spec(spec)}: {error}" for spec, error in errors ) ) return skipped
[docs] def push( self, specs: List[spack.spec.Spec] ) -> Tuple[List[spack.spec.Spec], List[Tuple[spack.spec.Spec, BaseException]]]: raise NotImplementedError
[docs] def tag(self, tag: str, roots: List[spack.spec.Spec]): """Make a list of selected specs together available under the given tag""" pass
[docs] class OCIUploader(Uploader): def __init__( self, mirror: spack.mirrors.mirror.Mirror, force: bool, update_index: bool, base_image: Optional[str], ) -> None: super().__init__(mirror, force, update_index) self.target_image = spack.oci.oci.image_from_mirror(mirror) self.base_image = ImageReference.from_string(base_image) if base_image else None
[docs] def push( self, specs: List[spack.spec.Spec] ) -> Tuple[List[spack.spec.Spec], List[Tuple[spack.spec.Spec, BaseException]]]: skipped, base_images, checksums, upload_errors = _oci_push( target_image=self.target_image, base_image=self.base_image, installed_specs_with_deps=specs, force=self.force, tmpdir=self.tmpdir, executor=self.executor, ) self._base_images = base_images self._checksums = checksums # only update index if any binaries were uploaded if self.update_index and len(skipped) + len(upload_errors) < len(specs): _oci_update_index(self.target_image, self.tmpdir, self.executor) return skipped, upload_errors
[docs] def tag(self, tag: str, roots: List[spack.spec.Spec]): tagged_image = self.target_image.with_tag(tag) # _push_oci may not populate self._base_images if binaries were already in the registry for spec in roots: _oci_update_base_images( base_image=self.base_image, target_image=self.target_image, spec=spec, base_image_cache=self._base_images, ) _oci_put_manifest( self._base_images, self._checksums, tagged_image, self.tmpdir, None, None, *roots ) tty.info(f"Tagged {tagged_image}")
[docs] class URLUploader(Uploader): def __init__( self, mirror: spack.mirrors.mirror.Mirror, force: bool, update_index: bool, signing_key: Optional[str], ) -> None: super().__init__(mirror, force, update_index) self.url = mirror.push_url self.signing_key = signing_key
[docs] def push( self, specs: List[spack.spec.Spec] ) -> Tuple[List[spack.spec.Spec], List[Tuple[spack.spec.Spec, BaseException]]]: return _url_push( specs, out_url=self.url, force=self.force, update_index=self.update_index, signing_key=self.signing_key, tmpdir=self.tmpdir, executor=self.executor, )
[docs] def make_uploader( mirror: spack.mirrors.mirror.Mirror, force: bool = False, update_index: bool = False, signing_key: Optional[str] = None, base_image: Optional[str] = None, ) -> Uploader: """Builder for the appropriate uploader based on the mirror type""" if spack.oci.image.is_oci_url(mirror.push_url): return OCIUploader( mirror=mirror, force=force, update_index=update_index, base_image=base_image ) else: return URLUploader( mirror=mirror, force=force, update_index=update_index, signing_key=signing_key )
def _format_spec(spec: spack.spec.Spec) -> str: return spec.cformat("{name}{@version}{/hash:7}")
[docs] class FancyProgress: def __init__(self, total: int): self.n = 0 self.total = total self.running = False self.enable = sys.stdout.isatty() self.pretty_spec: str = "" self.pre = "" def _clear(self): if self.enable and self.running: sys.stdout.write("\033[F\033[K") def _progress(self): if self.total > 1: digits = len(str(self.total)) return f"[{self.n:{digits}}/{self.total}] " return ""
[docs] def start(self, spec: spack.spec.Spec, running: bool) -> None: self.n += 1 self.running = running self.pre = self._progress() self.pretty_spec = _format_spec(spec) if self.enable and self.running: tty.info(f"{self.pre}Pushing {self.pretty_spec}...")
[docs] def ok(self, msg: Optional[str] = None) -> None: self._clear() msg = msg or f"Pushed {self.pretty_spec}" tty.info(f"{self.pre}{msg}")
[docs] def fail(self) -> None: self._clear() tty.info(f"{self.pre}Failed to push {self.pretty_spec}")
def _url_push( specs: List[spack.spec.Spec], out_url: str, signing_key: Optional[str], force: bool, update_index: bool, tmpdir: str, executor: concurrent.futures.Executor, ) -> Tuple[List[spack.spec.Spec], List[Tuple[spack.spec.Spec, BaseException]]]: """Pushes to the provided build cache, and returns a list of skipped specs that were already present (when force=False), and a list of errors. Does not raise on error.""" skipped: List[spack.spec.Spec] = [] errors: List[Tuple[spack.spec.Spec, BaseException]] = [] exists_futures = [ executor.submit( _exists_in_buildcache, spec, out_url, allow_unsigned=False if signing_key else True ) for spec in specs ] cache_entries = { spec.dag_hash(): exists_future.result() for spec, exists_future in zip(specs, exists_futures) } if not force: specs_to_upload = [] for spec in specs: if cache_entries[spec.dag_hash()].exists( [BuildcacheComponent.SPEC, BuildcacheComponent.TARBALL] ): skipped.append(spec) else: specs_to_upload.append(spec) else: specs_to_upload = specs if not specs_to_upload: return skipped, errors total = len(specs_to_upload) if total != len(specs): tty.info(f"{total} specs need to be pushed to {out_url}") upload_futures = [ executor.submit( _url_upload_tarball_and_specfile, spec, tmpdir, cache_entries[spec.dag_hash()], signing_key, ) for spec in specs_to_upload ] uploaded_any = False fancy_progress = FancyProgress(total) for spec, upload_future in zip(specs_to_upload, upload_futures): fancy_progress.start(spec, upload_future.running()) error = upload_future.exception() if error is None: uploaded_any = True fancy_progress.ok() else: fancy_progress.fail() errors.append((spec, error)) # don't bother pushing keys / index if all failed to upload if not uploaded_any: return skipped, errors # If the layout.json doesn't yet exist on this mirror, push it cache_class = get_url_buildcache_class(layout_version=CURRENT_BUILD_CACHE_LAYOUT_VERSION) cache_class.maybe_push_layout_json(out_url) if signing_key: keys_tmpdir = os.path.join(tmpdir, "keys") os.mkdir(keys_tmpdir) _url_push_keys(out_url, keys=[signing_key], update_index=update_index, tmpdir=keys_tmpdir) if update_index: index_tmpdir = os.path.join(tmpdir, "index") os.mkdir(index_tmpdir) _url_generate_package_index(out_url, index_tmpdir) return skipped, errors def _oci_upload_success_msg(spec: spack.spec.Spec, digest: Digest, size: int, elapsed: float): elapsed = max(elapsed, 0.001) # guard against division by zero return ( f"Pushed {_format_spec(spec)}: {digest} ({elapsed:.2f}s, " f"{size / elapsed / 1024 / 1024:.2f} MB/s)" ) def _oci_get_blob_info(image_ref: ImageReference) -> Optional[spack.oci.oci.Blob]: """Get the spack tarball layer digests and size if it exists""" try: manifest, config = get_manifest_and_config_with_retry(image_ref) return spack.oci.oci.Blob( compressed_digest=Digest.from_string(manifest["layers"][-1]["digest"]), uncompressed_digest=Digest.from_string(config["rootfs"]["diff_ids"][-1]), size=manifest["layers"][-1]["size"], ) except Exception: return None def _oci_push_pkg_blob( image_ref: ImageReference, spec: spack.spec.Spec, tmpdir: str ) -> Tuple[spack.oci.oci.Blob, float]: """Push a package blob to the registry and return the blob info and the time taken""" filename = os.path.join(tmpdir, f"{spec.dag_hash()}.tar.gz") # Create an oci.image.layer aka tarball of the package tar_gz_checksum, tar_checksum = create_tarball(spec, filename) blob = spack.oci.oci.Blob( Digest.from_sha256(tar_gz_checksum), Digest.from_sha256(tar_checksum), os.path.getsize(filename), ) # Upload the blob start = time.time() upload_blob_with_retry(image_ref, file=filename, digest=blob.compressed_digest) elapsed = time.time() - start # delete the file os.unlink(filename) return blob, elapsed def _oci_retrieve_env_dict_from_config(config: dict) -> dict: """Retrieve the environment variables from the image config file. Sets a default value for PATH if it is not present. Args: config (dict): The image config file. Returns: dict: The environment variables. """ env = {"PATH": "/bin:/usr/bin"} if "Env" in config.get("config", {}): for entry in config["config"]["Env"]: key, value = entry.split("=", 1) env[key] = value return env def _oci_archspec_to_gooarch(spec: spack.spec.Spec) -> str: name = spec.target.family.name name_map = {"aarch64": "arm64", "x86_64": "amd64"} return name_map.get(name, name) def _oci_put_manifest( base_images: Dict[str, Tuple[dict, dict]], checksums: Dict[str, spack.oci.oci.Blob], image_ref: ImageReference, tmpdir: str, extra_config: Optional[dict], annotations: Optional[dict], *specs: spack.spec.Spec, ): architecture = _oci_archspec_to_gooarch(specs[0]) expected_blobs: List[spack.spec.Spec] = [ s for s in traverse.traverse_nodes(specs, order="topo", deptype=("link", "run"), root=True) if not s.external ] expected_blobs.reverse() base_manifest, base_config = base_images[architecture] env = _oci_retrieve_env_dict_from_config(base_config) # If the base image uses `vnd.docker.distribution.manifest.v2+json`, then we use that too. # This is because Singularity / Apptainer is very strict about not mixing them. base_manifest_mediaType = base_manifest.get( "mediaType", "application/vnd.oci.image.manifest.v1+json" ) use_docker_format = ( base_manifest_mediaType == "application/vnd.docker.distribution.manifest.v2+json" ) spack.user_environment.environment_modifications_for_specs(*specs).apply_modifications(env) # Create an oci.image.config file config = copy.deepcopy(base_config) # Add the diff ids of the blobs for s in expected_blobs: # If a layer for a dependency has gone missing (due to removed manifest in the registry, a # failed push, or a local forced uninstall), we cannot create a runnable container image. checksum = checksums.get(s.dag_hash()) if checksum: config["rootfs"]["diff_ids"].append(str(checksum.uncompressed_digest)) # Set the environment variables config["config"]["Env"] = [f"{k}={v}" for k, v in env.items()] if extra_config: # From the OCI v1.0 spec: # > Any extra fields in the Image JSON struct are considered implementation # > specific and MUST be ignored by any implementations which are unable to # > interpret them. config.update(extra_config) config_file = os.path.join(tmpdir, f"{specs[0].dag_hash()}.config.json") with open(config_file, "w", encoding="utf-8") as f: json.dump(config, f, separators=(",", ":")) config_file_checksum = Digest.from_sha256( spack.util.crypto.checksum(hashlib.sha256, config_file) ) # Upload the config file upload_blob_with_retry(image_ref, file=config_file, digest=config_file_checksum) manifest = { "mediaType": base_manifest_mediaType, "schemaVersion": 2, "config": { "mediaType": base_manifest["config"]["mediaType"], "digest": str(config_file_checksum), "size": os.path.getsize(config_file), }, "layers": [ *(layer for layer in base_manifest["layers"]), *( { "mediaType": ( "application/vnd.docker.image.rootfs.diff.tar.gzip" if use_docker_format else "application/vnd.oci.image.layer.v1.tar+gzip" ), "digest": str(checksums[s.dag_hash()].compressed_digest), "size": checksums[s.dag_hash()].size, } for s in expected_blobs if s.dag_hash() in checksums ), ], } if not use_docker_format and annotations: manifest["annotations"] = annotations # Finally upload the manifest upload_manifest_with_retry(image_ref, manifest=manifest) # delete the config file os.unlink(config_file) def _oci_update_base_images( *, base_image: Optional[ImageReference], target_image: ImageReference, spec: spack.spec.Spec, base_image_cache: Dict[str, Tuple[dict, dict]], ): """For a given spec and base image, copy the missing layers of the base image with matching arch to the registry of the target image. If no base image is specified, create a dummy manifest and config file.""" architecture = _oci_archspec_to_gooarch(spec) if architecture in base_image_cache: return if base_image is None: base_image_cache[architecture] = ( default_manifest(), default_config(architecture, "linux"), ) else: base_image_cache[architecture] = copy_missing_layers_with_retry( base_image, target_image, architecture ) def _oci_default_tag(spec: spack.spec.Spec) -> str: """Return a valid, default image tag for a spec.""" return ensure_valid_tag(f"{spec.name}-{spec.version}-{spec.dag_hash()}.spack") #: Default OCI index tag default_index_tag = "index.spack"
[docs] def tag_is_spec(tag: str) -> bool: """Check if a tag is likely a Spec""" return tag.endswith(".spack") and tag != default_index_tag
def _oci_push( *, target_image: ImageReference, base_image: Optional[ImageReference], installed_specs_with_deps: List[spack.spec.Spec], tmpdir: str, executor: concurrent.futures.Executor, force: bool = False, ) -> Tuple[ List[spack.spec.Spec], Dict[str, Tuple[dict, dict]], Dict[str, spack.oci.oci.Blob], List[Tuple[spack.spec.Spec, BaseException]], ]: # Spec dag hash -> blob checksums: Dict[str, spack.oci.oci.Blob] = {} # arch -> (manifest, config) base_images: Dict[str, Tuple[dict, dict]] = {} # Specs not uploaded because they already exist skipped: List[spack.spec.Spec] = [] if not force: tty.info("Checking for existing specs in the buildcache") blobs_to_upload = [] tags_to_check = ( target_image.with_tag(_oci_default_tag(s)) for s in installed_specs_with_deps ) available_blobs = executor.map(_oci_get_blob_info, tags_to_check) for spec, maybe_blob in zip(installed_specs_with_deps, available_blobs): if maybe_blob is not None: checksums[spec.dag_hash()] = maybe_blob skipped.append(spec) else: blobs_to_upload.append(spec) else: blobs_to_upload = installed_specs_with_deps if not blobs_to_upload: return skipped, base_images, checksums, [] if len(blobs_to_upload) != len(installed_specs_with_deps): tty.info( f"{len(blobs_to_upload)} specs need to be pushed to " f"{target_image.domain}/{target_image.name}" ) blob_progress = FancyProgress(len(blobs_to_upload)) # Upload blobs blob_futures = [ executor.submit(_oci_push_pkg_blob, target_image, spec, tmpdir) for spec in blobs_to_upload ] manifests_to_upload: List[spack.spec.Spec] = [] errors: List[Tuple[spack.spec.Spec, BaseException]] = [] # And update the spec to blob mapping for successful uploads for spec, blob_future in zip(blobs_to_upload, blob_futures): blob_progress.start(spec, blob_future.running()) error = blob_future.exception() if error is None: blob, elapsed = blob_future.result() blob_progress.ok( _oci_upload_success_msg(spec, blob.compressed_digest, blob.size, elapsed) ) manifests_to_upload.append(spec) checksums[spec.dag_hash()] = blob else: blob_progress.fail() errors.append((spec, error)) # Copy base images if necessary for spec in manifests_to_upload: _oci_update_base_images( base_image=base_image, target_image=target_image, spec=spec, base_image_cache=base_images, ) def extra_config(spec: spack.spec.Spec): spec_dict = spec.to_dict(hash=ht.dag_hash) spec_dict["buildcache_layout_version"] = CURRENT_BUILD_CACHE_LAYOUT_VERSION spec_dict["binary_cache_checksum"] = { "hash_algorithm": "sha256", "hash": checksums[spec.dag_hash()].compressed_digest.digest, } spec_dict["archive_size"] = checksums[spec.dag_hash()].size spec_dict["archive_timestamp"] = datetime.datetime.now().astimezone().isoformat() spec_dict["archive_compression"] = "gzip" return spec_dict # Upload manifests tty.info("Uploading manifests") manifest_futures = [ executor.submit( _oci_put_manifest, base_images, checksums, target_image.with_tag(_oci_default_tag(spec)), tmpdir, extra_config(spec), {"org.opencontainers.image.description": spec.format()}, spec, ) for spec in manifests_to_upload ] manifest_progress = FancyProgress(len(manifests_to_upload)) # Print the image names of the top-level specs for spec, manifest_future in zip(manifests_to_upload, manifest_futures): error = manifest_future.exception() manifest_progress.start(spec, manifest_future.running()) if error is None: manifest_progress.ok( f"Tagged {_format_spec(spec)} as {target_image.with_tag(_oci_default_tag(spec))}" ) else: manifest_progress.fail() errors.append((spec, error)) return skipped, base_images, checksums, errors def _oci_config_from_tag(image_ref_and_tag: Tuple[ImageReference, str]) -> Optional[dict]: image_ref, tag = image_ref_and_tag # Don't allow recursion here, since Spack itself always uploads # vnd.oci.image.manifest.v1+json, not vnd.oci.image.index.v1+json _, config = get_manifest_and_config_with_retry(image_ref.with_tag(tag), tag, recurse=0) # Do very basic validation: if "spec" is a key in the config, it # must be a Spec object too. return config if "spec" in config else None def _oci_update_index( image_ref: ImageReference, tmpdir: str, pool: concurrent.futures.Executor ) -> None: tags = list_tags(image_ref) # Fetch all image config files in parallel spec_dicts = pool.map( _oci_config_from_tag, ((image_ref, tag) for tag in tags if tag_is_spec(tag)) ) # Populate the database db_root_dir = os.path.join(tmpdir, "db_root") db = BuildCacheDatabase(db_root_dir) for spec_dict in spec_dicts: spec = spack.spec.Spec.from_dict(spec_dict) db.add(spec) db.mark(spec, "in_buildcache", True) # Create the index.json file index_json_path = os.path.join(tmpdir, spack.database.INDEX_JSON_FILE) with open(index_json_path, "w", encoding="utf-8") as f: db._write_to_file(f) # Create an empty config.json file empty_config_json_path = os.path.join(tmpdir, "config.json") with open(empty_config_json_path, "wb") as f: f.write(b"{}") # Upload the index.json file index_shasum = Digest.from_sha256(spack.util.crypto.checksum(hashlib.sha256, index_json_path)) upload_blob_with_retry(image_ref, file=index_json_path, digest=index_shasum) # Upload the config.json file empty_config_digest = Digest.from_sha256( spack.util.crypto.checksum(hashlib.sha256, empty_config_json_path) ) upload_blob_with_retry(image_ref, file=empty_config_json_path, digest=empty_config_digest) # Push a manifest file that references the index.json file as a layer # Notice that we push this as if it is an image, which it of course is not. # When the ORAS spec becomes official, we can use that instead of a fake image. # For now we just use the OCI image spec, so that we don't run into issues with # automatic garbage collection of blobs that are not referenced by any image manifest. oci_manifest = { "mediaType": "application/vnd.oci.image.manifest.v1+json", "schemaVersion": 2, # Config is just an empty {} file for now, and irrelevant "config": { "mediaType": "application/vnd.oci.image.config.v1+json", "digest": str(empty_config_digest), "size": os.path.getsize(empty_config_json_path), }, # The buildcache index is the only layer, and is not a tarball, we lie here. "layers": [ { "mediaType": "application/vnd.oci.image.layer.v1.tar+gzip", "digest": str(index_shasum), "size": os.path.getsize(index_json_path), } ], } upload_manifest_with_retry(image_ref.with_tag(default_index_tag), oci_manifest)
[docs] def try_fetch(url_to_fetch): """Utility function to try and fetch a file from a url, stage it locally, and return the path to the staged file. Args: url_to_fetch (str): Url pointing to remote resource to fetch Returns: Path to locally staged resource or ``None`` if it could not be fetched. """ stage = Stage(url_to_fetch, keep=True) stage.create() try: stage.fetch() except spack.error.FetchError: stage.destroy() return None return stage
[docs] def download_tarball( spec: spack.spec.Spec, unsigned: Optional[bool] = False, mirrors_for_spec: Optional[List[MirrorMetadata]] = None, ) -> Optional[spack.stage.Stage]: """Download binary tarball for given package Args: spec: a concrete spec unsigned: if ``True`` or ``False`` override the mirror signature verification defaults mirrors_for_spec: Optional list of mirrors known to have the spec. These will be checked in order first before looking in other configured mirrors. Returns: ``None`` if the tarball could not be downloaded, the signature verified (if required), and its checksum validated. Otherwise, return the stage containing the downloaded tarball. """ configured_mirrors: Iterable[spack.mirrors.mirror.Mirror] = ( spack.mirrors.mirror.MirrorCollection(binary=True).values() ) if not configured_mirrors: raise NoConfiguredBinaryMirrors() # Note on try_first and try_next: # mirrors_for_spec mostly likely came from spack caching remote # mirror indices locally and adding their specs to a local data # structure supporting quick lookup of concrete specs. Those # mirrors are likely a subset of all configured mirrors, and # we'll probably find what we need in one of them. But we'll # look in all configured mirrors if needed, as maybe the spec # we need was in an un-indexed mirror. No need to check any # mirror for the spec twice though. try_first = mirrors_for_spec or [] try_next = [ MirrorMetadata(mirror.fetch_url, layout, mirror.fetch_view) for mirror in configured_mirrors for layout in mirror.supported_layout_versions ] urls_and_versions = try_first + [uv for uv in try_next if uv not in try_first] # TODO: turn `mirrors_for_spec` into a list of Mirror instances, instead of doing that here. def fetch_url_to_mirror( mirror_metadata: MirrorMetadata, ) -> Tuple[spack.mirrors.mirror.Mirror, int]: url = mirror_metadata.url layout_version = mirror_metadata.version for mirror in configured_mirrors: if mirror.fetch_url == url: return mirror, layout_version return spack.mirrors.mirror.Mirror(url), layout_version mirrors = [fetch_url_to_mirror(mirror_metadata) for mirror_metadata in urls_and_versions] for mirror, layout_version in mirrors: # Override mirror's default if currently_unsigned = unsigned if unsigned is not None else not mirror.signed # If it's an OCI index, do things differently, since we cannot compose URLs. fetch_url = mirror.fetch_url # TODO: refactor this to some "nice" place. if spack.oci.image.is_oci_url(fetch_url): ref = ImageReference.from_url(fetch_url).with_tag(_oci_default_tag(spec)) # Fetch the manifest try: with spack.oci.opener.urlopen( urllib.request.Request( url=ref.manifest_url(), headers={"Accept": ", ".join(spack.oci.oci.manifest_content_type)}, ) ) as response: manifest = json.load(response) except Exception: continue # Download the config = spec.json and the relevant tarball try: spec_digest = spack.oci.image.Digest.from_string(manifest["config"]["digest"]) tarball_digest = spack.oci.image.Digest.from_string( manifest["layers"][-1]["digest"] ) except Exception: continue with spack.oci.oci.make_stage( ref.blob_url(spec_digest), spec_digest, keep=True ) as local_specfile_stage: try: local_specfile_stage.fetch() local_specfile_stage.check() try: get_valid_spec_file( local_specfile_stage.save_filename, CURRENT_BUILD_CACHE_LAYOUT_VERSION ) except InvalidMetadataFile as e: tty.warn( f"Ignoring binary package for {spec.name}/{spec.dag_hash()[:7]} " f"from {fetch_url} due to invalid metadata file: {e}" ) local_specfile_stage.destroy() continue except Exception: continue local_specfile_stage.cache_local() local_specfile_stage.destroy() with spack.oci.oci.make_stage( ref.blob_url(tarball_digest), tarball_digest, keep=True ) as tarball_stage: try: tarball_stage.fetch() tarball_stage.check() except Exception: continue tarball_stage.cache_local() return tarball_stage else: cache_type = get_url_buildcache_class(layout_version=layout_version) cache_entry = cache_type(fetch_url, spec, allow_unsigned=currently_unsigned) try: cache_entry.fetch_archive() except Exception as e: tty.debug( f"Encountered error attempting to fetch archive for " f"{spec.name}/{spec.dag_hash()[:7]} from {fetch_url} " f"(v{layout_version}) due to {e}" ) cache_entry.destroy() continue if layout_version == 2: warn_v2_layout(fetch_url, "Installing a spec") return cache_entry.get_archive_stage() # Falling through the nested loops means we exhaustively searched # for all known kinds of spec files on all mirrors and did not find # an acceptable one for which we could download a tarball and (if # needed) verify a signature. So at this point, we will proceed to # install from source. return None
[docs] def relocate_package(spec: spack.spec.Spec) -> None: """Relocate binaries and text files in the given spec prefix, based on its buildinfo file.""" spec_prefix = str(spec.prefix) buildinfo = read_buildinfo_file(spec_prefix) old_layout_root = str(buildinfo["buildpath"]) # Warn about old style tarballs created with the --rel flag (removed in Spack v0.20) if buildinfo.get("relative_rpaths", False): tty.warn( f"Tarball for {spec} uses relative rpaths, which can cause library loading issues." ) # In Spack 0.19 and older prefix_to_hash was the default and externals were not dropped, so # prefixes were not unique. if "hash_to_prefix" in buildinfo: hash_to_old_prefix = buildinfo["hash_to_prefix"] elif "prefix_to_hash" in buildinfo: hash_to_old_prefix = {v: k for (k, v) in buildinfo["prefix_to_hash"].items()} else: raise NewLayoutException( "Package tarball was created from an install prefix with a different directory layout " "and an older buildcache create implementation. It cannot be relocated." ) prefix_to_prefix: Dict[str, str] = {} if "sbang_install_path" in buildinfo: old_sbang_install_path = str(buildinfo["sbang_install_path"]) prefix_to_prefix[old_sbang_install_path] = spack.hooks.sbang.sbang_install_path() # First match specific prefix paths. Possibly the *local* install prefix of some dependency is # in an upstream, so we cannot assume the original spack store root can be mapped uniformly to # the new spack store root. # If the spec is spliced, we need to handle the simultaneous mapping from the old install_tree # to the new install_tree and from the build_spec to the spliced spec. Because foo.build_spec # is foo for any non-spliced spec, we can simplify by checking for spliced-in nodes by checking # for nodes not in the build_spec without any explicit check for whether the spec is spliced. # An analog in this algorithm is any spec that shares a name or provides the same virtuals in # the context of the relevant root spec. This ensures that the analog for a spec s is the spec # that s replaced when we spliced. relocation_specs = specs_to_relocate(spec) build_spec_ids = set(id(s) for s in spec.build_spec.traverse(deptype=dt.ALL & ~dt.BUILD)) for s in relocation_specs: analog = s if id(s) not in build_spec_ids: analogs = [ d for d in spec.build_spec.traverse(deptype=dt.ALL & ~dt.BUILD) if s._splice_match(d, self_root=spec, other_root=spec.build_spec) ] if analogs: # Prefer same-name analogs and prefer higher versions # This matches the preferences in spack.spec.Spec.splice, so we # will find same node analog = max(analogs, key=lambda a: (a.name == s.name, a.version)) lookup_dag_hash = analog.dag_hash() if lookup_dag_hash in hash_to_old_prefix: old_dep_prefix = hash_to_old_prefix[lookup_dag_hash] prefix_to_prefix[old_dep_prefix] = str(s.prefix) # Only then add the generic fallback of install prefix -> install prefix. prefix_to_prefix[old_layout_root] = str(spack.store.STORE.layout.root) # Delete identity mappings from prefix_to_prefix prefix_to_prefix = {k: v for k, v in prefix_to_prefix.items() if k != v} # If there's nothing to relocate, we're done. if not prefix_to_prefix: return for old, new in prefix_to_prefix.items(): tty.debug(f"Relocating: {old} => {new}.") # Old archives may have hardlinks repeated. dedupe_hardlinks_if_necessary(spec_prefix, buildinfo) # Text files containing the prefix text textfiles = [os.path.join(spec_prefix, f) for f in buildinfo["relocate_textfiles"]] binaries = [os.path.join(spec_prefix, f) for f in buildinfo.get("relocate_binaries")] links = [os.path.join(spec_prefix, f) for f in buildinfo.get("relocate_links", [])] platform = spack.platforms.by_name(spec.platform) if "macho" in platform.binary_formats: relocate.relocate_macho_binaries(binaries, prefix_to_prefix) elif "elf" in platform.binary_formats: relocate.relocate_elf_binaries(binaries, prefix_to_prefix) relocate.relocate_links(links, prefix_to_prefix) relocate.relocate_text(textfiles, prefix_to_prefix) changed_files = relocate.relocate_text_bin(binaries, prefix_to_prefix) # Add ad-hoc signatures to patched macho files when on macOS. if "macho" in platform.binary_formats and sys.platform == "darwin": codesign = which("codesign") if not codesign: return for binary in changed_files: # preserve the original inode by running codesign on a copy with fsys.edit_in_place_through_temporary_file(binary) as tmp_binary: codesign("-fs-", tmp_binary) install_manifest = os.path.join( spec.prefix, spack.store.STORE.layout.metadata_dir, spack.store.STORE.layout.manifest_file_name, ) if not os.path.exists(install_manifest): spec_id = spec.format("{name}/{hash:7}") tty.warn("No manifest file in tarball for spec %s" % spec_id) # overwrite old metadata with new if spec.spliced: # rewrite spec on disk spack.store.STORE.layout.write_spec(spec, spack.store.STORE.layout.spec_file_path(spec)) # de-cache the install manifest with contextlib.suppress(FileNotFoundError): os.unlink(install_manifest)
def _tar_strip_component(tar: tarfile.TarFile, prefix: str): """Yield all members of tarfile that start with given prefix, and strip that prefix (including symlinks)""" # Including trailing /, otherwise we end up with absolute paths. regex = re.compile(re.escape(prefix) + "/*") # Only yield members in the package prefix. # Note: when a tarfile is created, relative in-prefix symlinks are # expanded to matching member names of tarfile entries. So, we have # to ensure that those are updated too. # Absolute symlinks are copied verbatim -- relocation should take care of # them. for m in tar.getmembers(): result = regex.match(m.name) if not result: continue m.name = m.name[result.end() :] if m.linkname: result = regex.match(m.linkname) if result: m.linkname = m.linkname[result.end() :] yield m
[docs] def extract_buildcache_tarball(tarfile_path: str, destination: str) -> None: with closing(tarfile.open(tarfile_path, "r")) as tar: # For consistent behavior across all supported Python versions tar.extraction_filter = lambda member, path: member # Remove common prefix from tarball entries and directly extract them to the install dir. tar.extractall( path=destination, members=_tar_strip_component(tar, prefix=_ensure_common_prefix(tar)) )
[docs] def extract_tarball(spec, tarball_stage: spack.stage.Stage, force=False, timer=timer.NULL_TIMER): """ extract binary tarball for given package into install area """ timer.start("extract") if os.path.exists(spec.prefix): if force: shutil.rmtree(spec.prefix) else: raise NoOverwriteException(str(spec.prefix)) # Create the install prefix fsys.mkdirp( spec.prefix, mode=get_package_dir_permissions(spec), group=get_package_group(spec), default_perms="parents", ) tarfile_path = tarball_stage.save_filename try: extract_buildcache_tarball(tarfile_path, destination=spec.prefix) except Exception: shutil.rmtree(spec.prefix, ignore_errors=True) tarball_stage.destroy() raise timer.stop("extract") timer.start("relocate") try: relocate_package(spec) except Exception as e: shutil.rmtree(spec.prefix, ignore_errors=True) raise e finally: tarball_stage.destroy() timer.stop("relocate")
def _ensure_common_prefix(tar: tarfile.TarFile) -> str: # Find the lowest `binary_distribution` file (hard-coded forward slash is on purpose). binary_distribution = min( ( e.name for e in tar.getmembers() if e.isfile() and e.name.endswith(".spack/binary_distribution") ), key=len, default=None, ) if binary_distribution is None: raise ValueError("Tarball is not a Spack package, missing binary_distribution file") pkg_path = pathlib.PurePosixPath(binary_distribution).parent.parent # Even the most ancient Spack version has required to list the dir of the package itself, so # guard against broken tarballs where `path.parent.parent` is empty. if pkg_path == pathlib.PurePosixPath(): raise ValueError("Invalid tarball, missing package prefix dir") pkg_prefix = str(pkg_path) # Ensure all tar entries are in the pkg_prefix dir, and if they're not, they should be parent # dirs of it. has_prefix = False for member in tar.getmembers(): stripped = member.name.rstrip("/") if not ( stripped.startswith(pkg_prefix) or member.isdir() and pkg_prefix.startswith(stripped) ): raise ValueError(f"Tarball contains file {stripped} outside of prefix {pkg_prefix}") if member.isdir() and stripped == pkg_prefix: has_prefix = True # This is technically not required, but let's be defensive about the existence of the package # prefix dir. if not has_prefix: raise ValueError(f"Tarball does not contain a common prefix {pkg_prefix}") return pkg_prefix
[docs] def install_root_node( spec: spack.spec.Spec, unsigned=False, force: bool = False, sha256: Optional[str] = None, allow_missing: bool = False, ) -> None: """Install the root node of a concrete spec from a buildcache. Checking the sha256 sum of a node before installation is usually needed only for software installed during Spack's bootstrapping (since we might not have a proper signature verification mechanism available). Args: spec: spec to be installed (note that only the root node will be installed) unsigned: if True allows installing unsigned binaries force: force installation if the spec is already present in the local store sha256: optional sha256 of the binary package, to be checked before installation allow_missing: when true, allows installing a node with missing dependencies """ # Early termination if spec.external or not spec.concrete: warnings.warn("Skipping external or abstract spec {0}".format(spec.format())) return elif spec.installed and not force: warnings.warn("Package for spec {0} already installed.".format(spec.format())) return tarball_stage = download_tarball(spec.build_spec, unsigned) if not tarball_stage: msg = 'download of binary cache file for spec "{0}" failed' raise RuntimeError(msg.format(spec.build_spec.format())) # don't print long padded paths while extracting/relocating binaries with spack.util.path.filter_padding(): tty.msg('Installing "{0}" from a buildcache'.format(spec.format())) extract_tarball(spec, tarball_stage, force) spec.package.windows_establish_runtime_linkage() spack.hooks.post_install(spec, False) spack.store.STORE.db.add(spec, allow_missing=allow_missing)
[docs] def install_single_spec(spec, unsigned=False, force=False): """Install a single concrete spec from a buildcache. Args: spec (spack.spec.Spec): spec to be installed unsigned (bool): if True allows installing unsigned binaries force (bool): force installation if the spec is already present in the local store """ for node in spec.traverse(root=True, order="post", deptype=("link", "run")): install_root_node(node, unsigned=unsigned, force=force)
[docs] def try_direct_fetch(spec: spack.spec.Spec) -> List[MirrorMetadata]: """Try to find the spec directly on the configured mirrors""" found_specs: List[MirrorMetadata] = [] binary_mirrors = spack.mirrors.mirror.MirrorCollection(binary=True).values() for mirror in binary_mirrors: # TODO: OCI-support if spack.oci.image.is_oci_url(mirror.fetch_url): continue for layout_version in mirror.supported_layout_versions: # layout_version could eventually come from the mirror config cache_class = get_url_buildcache_class(layout_version=layout_version) cache_entry = cache_class(mirror.fetch_url, spec) try: spec_dict = cache_entry.fetch_metadata() except BuildcacheEntryError: continue finally: cache_entry.destroy() # All specs in build caches are concrete (as they are built) so we need # to mark this spec concrete on read-in. fetched_spec = spack.spec.Spec.from_dict(spec_dict) fetched_spec._mark_concrete() found_specs.append(MirrorMetadata(mirror.fetch_url, layout_version, mirror.fetch_view)) return found_specs
[docs] def get_mirrors_for_spec(spec: spack.spec.Spec, index_only: bool = False) -> List[MirrorMetadata]: """ Check if concrete spec exists on mirrors and return a list indicating the mirrors on which it can be found Args: spec: The spec to look for in binary mirrors index_only: When ``index_only`` is set to ``True``, only the local cache is checked, no requests are made. """ if not spack.mirrors.mirror.MirrorCollection(binary=True): tty.debug("No Spack mirrors are currently configured") return [] results = BINARY_INDEX.find_built_spec(spec) # The index may be out-of-date. If we aren't only considering indices, try # to fetch directly since we know where the file should be. if not results and not index_only: results = try_direct_fetch(spec) # We found a spec by the direct fetch approach, we might as well # add it to our mapping. if results: BINARY_INDEX.update_spec(spec, results) return results
[docs] def update_cache_and_get_specs(): """ Get all concrete specs for build caches available on configured mirrors. Initialization of internal cache data structures is done as lazily as possible, so this method will also attempt to initialize and update the local index cache (essentially a no-op if it has been done already and nothing has changed on the configured mirrors.) Raises: FetchCacheError """ BINARY_INDEX.update() return BINARY_INDEX.get_all_built_specs()
[docs] def get_keys( install: bool = False, trust: bool = False, force: bool = False, mirrors: Optional[Mapping[str, spack.mirrors.mirror.Mirror]] = None, ): """Get pgp public keys available on mirror with suffix .pub""" mirror_collection = mirrors or spack.mirrors.mirror.MirrorCollection(binary=True) if not mirror_collection: tty.die("Please add a spack mirror to allow " + "download of build caches.") fingerprints = [] for mirror in mirror_collection.values(): if not mirror.signed: # Don't bother fetching keys for unsigned mirrors continue for layout_version in mirror.supported_layout_versions: fetch_url = mirror.fetch_url if layout_version == 2: mirror_layout_fingerprints = _get_keys_v2(fetch_url, install, trust, force) else: mirror_layout_fingerprints = _get_keys( fetch_url, layout_version, install, trust, force ) if mirror_layout_fingerprints: fingerprints.extend(mirror_layout_fingerprints) return fingerprints
def _get_keys( mirror_url: str, layout_version: int = CURRENT_BUILD_CACHE_LAYOUT_VERSION, install: bool = False, trust: bool = False, force: bool = False, ) -> Optional[List[str]]: cache_class = get_url_buildcache_class(layout_version=layout_version) tty.debug("Finding public keys in {0}".format(url_util.format(mirror_url))) keys_prefix = url_util.join( mirror_url, *cache_class.get_relative_path_components(BuildcacheComponent.KEY) ) key_index_manifest_url = url_util.join(keys_prefix, "keys.manifest.json") index_entry = cache_class(mirror_url, allow_unsigned=True) try: index_manifest = index_entry.read_manifest(manifest_url=key_index_manifest_url) index_blob_path = index_entry.fetch_blob(index_manifest.data[0]) except BuildcacheEntryError as e: tty.debug(f"Failed to fetch key index due to: {e}") index_entry.destroy() return None with open(index_blob_path, encoding="utf-8") as fd: json_index = json.load(fd) index_entry.destroy() saved_fingerprints = [] for fingerprint, _ in json_index["keys"].items(): key_manifest_url = url_util.join(keys_prefix, f"{fingerprint}.key.manifest.json") key_entry = cache_class(mirror_url, allow_unsigned=True) try: key_manifest = key_entry.read_manifest(manifest_url=key_manifest_url) key_blob_path = key_entry.fetch_blob(key_manifest.data[0]) except BuildcacheEntryError as e: tty.debug(f"Failed to fetch key {fingerprint} due to: {e}") key_entry.destroy() continue tty.debug("Found key {0}".format(fingerprint)) if install: if trust: spack.util.gpg.trust(key_blob_path) tty.debug(f"Added {fingerprint} to trusted keys.") saved_fingerprints.append(fingerprint) else: tty.debug( "Will not add this key to trusted keys.Use -t to install all downloaded keys" ) key_entry.destroy() return saved_fingerprints def _get_keys_v2(mirror_url, install=False, trust=False, force=False) -> Optional[List[str]]: cache_class = get_url_buildcache_class(layout_version=2) keys_url = url_util.join( mirror_url, *cache_class.get_relative_path_components(BuildcacheComponent.KEY) ) keys_index = url_util.join(keys_url, "index.json") tty.debug("Finding public keys in {0}".format(url_util.format(mirror_url))) try: json_index = web_util.read_json(keys_index) except (web_util.SpackWebError, OSError, ValueError) as url_err: # TODO: avoid repeated request if web_util.url_exists(keys_index): tty.error( f"Unable to find public keys in {url_util.format(mirror_url)}," f" caught exception attempting to read from {url_util.format(keys_index)}." ) tty.error(url_err) return None saved_fingerprints = [] for fingerprint, key_attributes in json_index["keys"].items(): link = os.path.join(keys_url, fingerprint + ".pub") with Stage(link, name="build_cache", keep=True) as stage: if os.path.exists(stage.save_filename) and force: os.remove(stage.save_filename) if not os.path.exists(stage.save_filename): try: stage.fetch() except spack.error.FetchError: continue tty.debug("Found key {0}".format(fingerprint)) if install: if trust: spack.util.gpg.trust(stage.save_filename) tty.debug("Added this key to trusted keys.") saved_fingerprints.append(fingerprint) else: tty.debug( "Will not add this key to trusted keys.Use -t to install all downloaded keys" ) return saved_fingerprints def _url_push_keys( *mirrors: Union[spack.mirrors.mirror.Mirror, str], keys: List[str], tmpdir: str, update_index: bool = False, ): """Upload pgp public keys to the given mirrors""" keys = spack.util.gpg.public_keys(*(keys or ())) files = [os.path.join(tmpdir, f"{key}.pub") for key in keys] for key, file in zip(keys, files): spack.util.gpg.export_keys(file, [key]) cache_class = get_url_buildcache_class() for mirror in mirrors: push_url = mirror if isinstance(mirror, str) else mirror.push_url tty.debug(f"Pushing public keys to {url_util.format(push_url)}") pushed_a_key = False for key, file in zip(keys, files): cache_class.push_local_file_as_blob( local_file_path=file, mirror_url=push_url, manifest_name=f"{key}.key", component_type=BuildcacheComponent.KEY, compression="none", ) pushed_a_key = True if update_index: generate_key_index(push_url, tmpdir=tmpdir) if pushed_a_key or update_index: cache_class.maybe_push_layout_json(push_url)
[docs] def needs_rebuild(spec, mirror_url): if not spec.concrete: raise ValueError("spec must be concrete to check against mirror") pkg_name = spec.name pkg_version = spec.version pkg_hash = spec.dag_hash() tty.debug("Checking {0}-{1}, dag_hash = {2}".format(pkg_name, pkg_version, pkg_hash)) tty.debug(spec.tree()) # Try to retrieve the specfile directly, based on the known # format of the name, in order to determine if the package # needs to be rebuilt. cache_class = get_url_buildcache_class(layout_version=CURRENT_BUILD_CACHE_LAYOUT_VERSION) cache_entry = cache_class(mirror_url, spec, allow_unsigned=True) exists = cache_entry.exists([BuildcacheComponent.SPEC, BuildcacheComponent.TARBALL]) return not exists
[docs] def check_specs_against_mirrors(mirrors, specs, output_file=None): """Check all the given specs against buildcaches on the given mirrors and determine if any of the specs need to be rebuilt. Specs need to be rebuilt when their hash doesn't exist in the mirror. Arguments: mirrors (dict): Mirrors to check against specs (typing.Iterable): Specs to check against mirrors output_file (str): Path to output file to be written. If provided, mirrors with missing or out-of-date specs will be formatted as a JSON object and written to this file. Returns: 1 if any spec was out-of-date on any mirror, 0 otherwise. """ rebuilds = {} for mirror in spack.mirrors.mirror.MirrorCollection(mirrors, binary=True).values(): tty.debug("Checking for built specs at {0}".format(mirror.fetch_url)) rebuild_list = [] for spec in specs: if needs_rebuild(spec, mirror.fetch_url): rebuild_list.append({"short_spec": spec.short_spec, "hash": spec.dag_hash()}) if rebuild_list: rebuilds[mirror.fetch_url] = { "mirrorName": mirror.name, "mirrorUrl": mirror.fetch_url, "rebuildSpecs": rebuild_list, } if output_file: with open(output_file, "w", encoding="utf-8") as outf: outf.write(json.dumps(rebuilds)) return 1 if rebuilds else 0
[docs] def download_single_spec( concrete_spec, destination, mirror_url=None, layout_version: int = CURRENT_BUILD_CACHE_LAYOUT_VERSION, ): """Download the buildcache files for a single concrete spec. Args: concrete_spec: concrete spec to be downloaded destination (str): path where to put the downloaded buildcache mirror_url (str): url of the mirror from which to download """ if not mirror_url and not spack.mirrors.mirror.MirrorCollection(binary=True): tty.die( "Please provide or add a spack mirror to allow " + "download of buildcache entries." ) urls = ( [mirror_url] if mirror_url else [ mirror.fetch_url for mirror in spack.mirrors.mirror.MirrorCollection(binary=True).values() ] ) mkdirp(destination) for url in urls: cache_class = get_url_buildcache_class(layout_version=layout_version) cache_entry = cache_class(url, concrete_spec, allow_unsigned=True) try: cache_entry.fetch_metadata() cache_entry.fetch_archive() except BuildcacheEntryError as e: tty.warn(f"Error downloading {concrete_spec.name}/{concrete_spec.dag_hash()[:7]}: {e}") cache_entry.destroy() continue shutil.move(cache_entry.get_local_spec_path(), destination) shutil.move(cache_entry.get_local_archive_path(), destination) return True return False
[docs] class BinaryCacheQuery: """Callable object to query if a spec is in a binary cache""" def __init__(self, all_architectures): """ Args: all_architectures (bool): if True consider all the spec for querying, otherwise restrict to the current default architecture """ self.all_architectures = all_architectures specs = update_cache_and_get_specs() if not self.all_architectures: arch = spack.spec.Spec.default_arch() specs = [s for s in specs if s.satisfies(arch)] self.possible_specs = specs def __call__(self, spec: spack.spec.Spec, **kwargs): """ Args: spec: The spec being searched for """ return [s for s in self.possible_specs if s.satisfies(spec)]
[docs] class FetchIndexError(Exception): def __str__(self): if len(self.args) == 1: return str(self.args[0]) else: return "{}, due to: {}".format(self.args[0], self.args[1])
[docs] class BuildcacheIndexError(spack.error.SpackError): """Raised when a buildcache cannot be read for any reason"""
[docs] class BuildcacheIndexNotExists(Exception): """Buildcache does not contain an index"""
FetchIndexResult = collections.namedtuple("FetchIndexResult", "etag hash data fresh")
[docs] class IndexHandler:
[docs] def conditional_fetch(self) -> FetchIndexResult: raise NotImplementedError(f"{self.__class__.__name__} is abstract")
[docs] def get_index_manifest(self, manifest_response) -> BlobRecord: """Read the response of the manifest request and return a BlobRecord""" cache_class = get_url_buildcache_class(CURRENT_BUILD_CACHE_LAYOUT_VERSION) try: result = io.TextIOWrapper(manifest_response, encoding="utf-8").read() except (ValueError, OSError) as e: raise FetchIndexError(f"Remote index {manifest_response.url} is invalid", e) from e manifest = BuildcacheManifest.from_dict( # Currently we do not sign buildcache index, but we could cache_class.verify_and_extract_manifest(result, verify=False) ) blob_record = manifest.get_blob_records( cache_class.component_to_media_type(BuildcacheComponent.INDEX) )[0] return blob_record
[docs] def fetch_index_blob( self, cache_entry: URLBuildcacheEntry, blob_record: BlobRecord ) -> Tuple[str, str]: """Fetch the index blob indicated by the BlobRecord, and return the (checksum, contents) of the blob""" try: staged_blob_path = cache_entry.fetch_blob(blob_record) except BuildcacheEntryError as e: cache_entry.destroy() raise FetchIndexError( f"Could not fetch index blob from {cache_entry.mirror_url}" ) from e with open(staged_blob_path, encoding="utf-8") as fd: blob_result = fd.read() computed_hash = compute_hash(blob_result) if computed_hash != blob_record.checksum: cache_entry.destroy() raise FetchIndexError(f"Remote index at {cache_entry.mirror_url} is invalid") return (computed_hash, blob_result)
[docs] class DefaultIndexHandlerV2(IndexHandler): """Fetcher for index.json, using separate index.json.hash as cache invalidation strategy""" def __init__(self, mirror_metadata, local_hash, urlopen=web_util.urlopen): self.url = mirror_metadata.url self.local_hash = local_hash self.urlopen = urlopen self.headers = {"User-Agent": web_util.SPACK_USER_AGENT}
[docs] def get_remote_hash(self): # Failure to fetch index.json.hash is not fatal url_index_hash = url_util.join(self.url, "build_cache", "index.json.hash") try: with self.urlopen( urllib.request.Request(url_index_hash, headers=self.headers) ) as response: remote_hash = response.read(64) except OSError: return None # Validate the hash if not re.match(rb"[a-f\d]{64}$", remote_hash): return None return remote_hash.decode("utf-8")
[docs] def conditional_fetch(self) -> FetchIndexResult: # Do an intermediate fetch for the hash # and a conditional fetch for the contents # Early exit if our cache is up to date. if self.local_hash and self.local_hash == self.get_remote_hash(): return FetchIndexResult(etag=None, hash=None, data=None, fresh=True) # Otherwise, download index.json url_index = url_util.join(self.url, "build_cache", spack.database.INDEX_JSON_FILE) try: response = self.urlopen(urllib.request.Request(url_index, headers=self.headers)) except OSError as e: raise FetchIndexError(f"Could not fetch index from {url_index}", e) from e with response: try: result = io.TextIOWrapper(response, encoding="utf-8").read() except (ValueError, OSError) as e: raise FetchIndexError(f"Remote index {url_index} is invalid") from e # For now we only handle etags on http(s), since 304 error handling # in s3:// is not there yet. if urllib.parse.urlparse(self.url).scheme not in ("http", "https"): etag = None else: etag = web_util.parse_etag( response.headers.get("Etag", None) or response.headers.get("etag", None) ) computed_hash = compute_hash(result) # We don't handle computed_hash != remote_hash here, which can happen # when remote index.json and index.json.hash are out of sync, or if # the hash algorithm changed. # The most likely scenario is that we got index.json got updated # while we fetched index.json.hash. Warning about an issue thus feels # wrong, as it's more of an issue with race conditions in the cache # invalidation strategy. warn_v2_layout(self.url, "Fetching an index") return FetchIndexResult(etag=etag, hash=computed_hash, data=result, fresh=False)
[docs] class EtagIndexHandlerV2(IndexHandler): """Fetcher for index.json, using ETags headers as cache invalidation strategy""" def __init__(self, mirror_metadata, etag, urlopen=web_util.urlopen): self.url = mirror_metadata.url self.etag = etag self.urlopen = urlopen
[docs] def conditional_fetch(self) -> FetchIndexResult: # Just do a conditional fetch immediately url = url_util.join(self.url, "build_cache", spack.database.INDEX_JSON_FILE) headers = {"User-Agent": web_util.SPACK_USER_AGENT, "If-None-Match": f'"{self.etag}"'} try: response = self.urlopen(urllib.request.Request(url, headers=headers)) except urllib.error.HTTPError as e: if e.getcode() == 304: # Not modified; that means fresh. return FetchIndexResult(etag=None, hash=None, data=None, fresh=True) raise FetchIndexError(f"Could not fetch index {url}", e) from e except OSError as e: # URLError, socket.timeout, etc. raise FetchIndexError(f"Could not fetch index {url}", e) from e with response: try: result = io.TextIOWrapper(response, encoding="utf-8").read() except (ValueError, OSError) as e: raise FetchIndexError(f"Remote index {url} is invalid", e) from e warn_v2_layout(self.url, "Fetching an index") etag_header_value = response.headers.get("Etag", None) or response.headers.get( "etag", None ) return FetchIndexResult( etag=web_util.parse_etag(etag_header_value), hash=compute_hash(result), data=result, fresh=False, )
[docs] class OCIIndexHandler(IndexHandler): def __init__(self, mirror_metadata: MirrorMetadata, local_hash, urlopen=None) -> None: self.local_hash = local_hash self.ref = spack.oci.image.ImageReference.from_url(mirror_metadata.url) self.urlopen = urlopen or spack.oci.opener.urlopen
[docs] def conditional_fetch(self) -> FetchIndexResult: """Download an index from an OCI registry type mirror.""" url_manifest = self.ref.with_tag(default_index_tag).manifest_url() try: response = self.urlopen( urllib.request.Request( url=url_manifest, headers={"Accept": "application/vnd.oci.image.manifest.v1+json"}, ) ) except OSError as e: raise FetchIndexError(f"Could not fetch manifest from {url_manifest}", e) from e with response: try: manifest = json.load(response) except Exception as e: raise FetchIndexError(f"Remote index {url_manifest} is invalid", e) from e # Get first blob hash, which should be the index.json try: index_digest = spack.oci.image.Digest.from_string(manifest["layers"][0]["digest"]) except Exception as e: raise FetchIndexError(f"Remote index {url_manifest} is invalid", e) from e # Fresh? if index_digest.digest == self.local_hash: return FetchIndexResult(etag=None, hash=None, data=None, fresh=True) # Otherwise fetch the blob / index.json try: with self.urlopen( urllib.request.Request( url=self.ref.blob_url(index_digest), headers={"Accept": "application/vnd.oci.image.layer.v1.tar+gzip"}, ) ) as response: result = io.TextIOWrapper(response, encoding="utf-8").read() except (OSError, ValueError) as e: raise FetchIndexError(f"Remote index {url_manifest} is invalid", e) from e # Make sure the blob we download has the advertised hash if compute_hash(result) != index_digest.digest: raise FetchIndexError(f"Remote index {url_manifest} is invalid") return FetchIndexResult(etag=None, hash=index_digest.digest, data=result, fresh=False)
[docs] class DefaultIndexHandler(IndexHandler): """Fetcher for buildcache index, cache invalidation via manifest contents""" def __init__(self, mirror_metadata: MirrorMetadata, local_hash, urlopen=web_util.urlopen): self.url = mirror_metadata.url self.view = mirror_metadata.view self.layout_version = mirror_metadata.version self.local_hash = local_hash self.urlopen = urlopen self.headers = {"User-Agent": web_util.SPACK_USER_AGENT}
[docs] def conditional_fetch(self) -> FetchIndexResult: cache_class = get_url_buildcache_class(layout_version=self.layout_version) url_index_manifest = cache_class.get_index_url(self.url, self.view) try: response = self.urlopen( urllib.request.Request(url_index_manifest, headers=self.headers) ) except OSError as e: raise FetchIndexError( f"Could not read index manifest from {url_index_manifest}" ) from e with response: index_blob_record = self.get_index_manifest(response) # Early exit if our cache is up to date. if self.local_hash and self.local_hash == index_blob_record.checksum: return FetchIndexResult(etag=None, hash=None, data=None, fresh=True) # Otherwise, download the index blob cache_entry = cache_class(self.url, allow_unsigned=True) computed_hash, result = self.fetch_index_blob(cache_entry, index_blob_record) cache_entry.destroy() # For now we only handle etags on http(s), since 304 error handling # in s3:// is not there yet. if urllib.parse.urlparse(self.url).scheme not in ("http", "https"): etag = None else: etag = web_util.parse_etag( response.headers.get("Etag", None) or response.headers.get("etag", None) ) return FetchIndexResult(etag=etag, hash=computed_hash, data=result, fresh=False)
[docs] class EtagIndexHandler(IndexHandler): """Fetcher for buildcache index, cache invalidation via ETags headers This class differs from the :class:`DefaultIndexHandler` in the following ways: 1. It is provided with an etag value on creation, rather than an index checksum value. Note that since we never start out with an etag, the default fetcher must have been used initially and determined that the etag approach is valid. 2. It provides this etag value in the ``If-None-Match`` request header for the index manifest. 3. It checks for special exception type and response code indicating the index manifest is not modified, exiting early and returning ``Fresh``, if encountered. 4. If it needs to actually read the manifest, it does not need to do any checks of the url scheme to determine whether an etag should be included in the return value.""" def __init__(self, mirror_metadata: MirrorMetadata, etag, urlopen=web_util.urlopen): self.url = mirror_metadata.url self.view = mirror_metadata.view self.layout_version = mirror_metadata.version self.etag = etag self.urlopen = urlopen
[docs] def conditional_fetch(self) -> FetchIndexResult: # Do a conditional fetch of the index manifest (i.e. using If-None-Match header) cache_class = get_url_buildcache_class(layout_version=self.layout_version) manifest_url = cache_class.get_index_url(self.url, self.view) headers = {"User-Agent": web_util.SPACK_USER_AGENT, "If-None-Match": f'"{self.etag}"'} try: response = self.urlopen(urllib.request.Request(manifest_url, headers=headers)) except urllib.error.HTTPError as e: if e.getcode() == 304: # The remote manifest has not been modified, i.e. the index we # already have is the freshest there is. return FetchIndexResult(etag=None, hash=None, data=None, fresh=True) raise FetchIndexError(f"Could not fetch index manifest {manifest_url}", e) from e except OSError as e: # URLError, socket.timeout, etc. raise FetchIndexError(f"Could not fetch index manifest {manifest_url}", e) from e # We need to read the index manifest and fetch the associated blob with response: index_blob_record = self.get_index_manifest(response) etag_header_value = response.headers.get("Etag", None) or response.headers.get( "etag", None ) cache_entry = cache_class(self.url, allow_unsigned=True) computed_hash, result = self.fetch_index_blob(cache_entry, index_blob_record) cache_entry.destroy() return FetchIndexResult( etag=web_util.parse_etag(etag_header_value), hash=computed_hash, data=result, fresh=False, )
[docs] def get_index_fetcher( scheme: str, mirror_metadata: MirrorMetadata, cache_entry: Dict[str, str] ) -> IndexHandler: if scheme == "oci": # TODO: Actually etag and OCI are not mutually exclusive... return OCIIndexHandler(mirror_metadata, cache_entry.get("index_hash", None)) elif cache_entry.get("etag"): if mirror_metadata.version < 3: return EtagIndexHandlerV2(mirror_metadata, cache_entry["etag"]) else: return EtagIndexHandler(mirror_metadata, cache_entry["etag"]) else: if mirror_metadata.version < 3: return DefaultIndexHandlerV2( mirror_metadata, local_hash=cache_entry.get("index_hash", None) ) else: return DefaultIndexHandler( mirror_metadata, local_hash=cache_entry.get("index_hash", None) )
[docs] class NoOverwriteException(spack.error.SpackError): """Raised when a file would be overwritten""" def __init__(self, file_path): super().__init__(f"Refusing to overwrite the following file: {file_path}")
[docs] class NoGpgException(spack.error.SpackError): """ Raised when gpg2 is not in PATH """ def __init__(self, msg): super().__init__(msg)
[docs] class NoKeyException(spack.error.SpackError): """ Raised when gpg has no default key added. """ def __init__(self, msg): super().__init__(msg)
[docs] class PickKeyException(spack.error.SpackError): """ Raised when multiple keys can be used to sign. """ def __init__(self, keys): err_msg = "Multiple keys available for signing\n%s\n" % keys err_msg += "Use spack buildcache create -k <key hash> to pick a key." super().__init__(err_msg)
[docs] class NewLayoutException(spack.error.SpackError): """ Raised if directory layout is different from buildcache. """ def __init__(self, msg): super().__init__(msg)
[docs] class UnsignedPackageException(spack.error.SpackError): """ Raised if installation of unsigned package is attempted without the use of ``--no-check-signature``. """
[docs] class GenerateIndexError(spack.error.SpackError): """Raised when unable to generate key or package index for mirror"""
[docs] class CannotListKeys(GenerateIndexError): """Raised when unable to list keys when generating key index"""
[docs] class PushToBuildCacheError(spack.error.SpackError): """Raised when unable to push objects to binary mirror"""
[docs] class NoConfiguredBinaryMirrors(spack.error.SpackError): """Raised when no binary mirrors are configured but an operation requires them""" def __init__(self): super().__init__("Please add a spack mirror to allow download of pre-compiled packages.")