# Copyright Spack Project Developers. See COPYRIGHT file for details.
#
# SPDX-License-Identifier: (Apache-2.0 OR MIT)
import collections
import concurrent.futures
import contextlib
import copy
import datetime
import hashlib
import io
import itertools
import json
import os
import pathlib
import re
import shutil
import sys
import tarfile
import tempfile
import textwrap
import time
import urllib.error
import urllib.parse
import urllib.request
import warnings
from collections import defaultdict
from contextlib import closing
from typing import (
IO,
Any,
Callable,
Dict,
Iterable,
List,
Mapping,
NamedTuple,
Optional,
Set,
Tuple,
Union,
cast,
)
import spack.caches
import spack.config
import spack.database
import spack.deptypes as dt
import spack.error
import spack.hash_types as ht
import spack.hooks
import spack.hooks.sbang
import spack.llnl.util.filesystem as fsys
import spack.llnl.util.lang
import spack.llnl.util.tty as tty
import spack.mirrors.mirror
import spack.oci.image
import spack.oci.oci
import spack.oci.opener
import spack.paths
import spack.platforms
import spack.relocate as relocate
import spack.spec
import spack.stage
import spack.store
import spack.user_environment
import spack.util.archive
import spack.util.crypto
import spack.util.file_cache as file_cache
import spack.util.gpg
import spack.util.parallel
import spack.util.path
import spack.util.spack_json as sjson
import spack.util.spack_yaml as syaml
import spack.util.timer as timer
import spack.util.url as url_util
import spack.util.web as web_util
from spack import traverse
from spack.llnl.util.filesystem import mkdirp
from spack.oci.image import (
Digest,
ImageReference,
default_config,
default_manifest,
ensure_valid_tag,
)
from spack.oci.oci import (
copy_missing_layers_with_retry,
get_manifest_and_config_with_retry,
list_tags,
upload_blob_with_retry,
upload_manifest_with_retry,
)
from spack.package_prefs import get_package_dir_permissions, get_package_group
from spack.relocate_text import utf8_paths_to_single_binary_regex
from spack.stage import Stage
from spack.util.executable import which
from .enums import InstallRecordStatus
from .url_buildcache import (
CURRENT_BUILD_CACHE_LAYOUT_VERSION,
BlobRecord,
BuildcacheComponent,
BuildcacheEntryError,
BuildcacheManifest,
InvalidMetadataFile,
ListMirrorSpecsError,
MirrorMetadata,
URLBuildcacheEntry,
get_entries_from_cache,
get_url_buildcache_class,
get_valid_spec_file,
)
from .vendor.typing_extensions import TypedDict
[docs]
class BuildCacheDatabase(spack.database.Database):
    """Database flavour used for binary buildcache index files.

    Instances are created with ``lock_cfg=NO_LOCK`` and ``layout=None``, both transaction
    hooks are replaced by a no-op context manager, and ``record_fields`` is overridden so
    that install records only carry the fields listed below.
    """

    #: Install-record fields kept for buildcache indices
    record_fields = ("spec", "ref_count", "in_buildcache")

    def __init__(self, root):
        super().__init__(root, lock_cfg=spack.database.NO_LOCK, layout=None)
        # No locking is configured, so read and write transactions are no-ops.
        no_op_transaction = spack.llnl.util.lang.nullcontext
        self._write_transaction_impl = no_op_transaction
        self._read_transaction_impl = no_op_transaction

    def _handle_old_db_versions_read(self, check, db, *, reindex: bool):
        """Read a readable index through the current-version code path.

        ``reindex`` is accepted but not used by this override.

        Raises:
            spack.database.DatabaseNotReadableError: if ``is_readable()`` is false.
        """
        if self.is_readable():
            return self._handle_current_version_read(check, db)

        raise spack.database.DatabaseNotReadableError(
            f"cannot read buildcache v{self.db_version} at {self.root}"
        )
[docs]
class FetchCacheError(Exception):
    """Error thrown when fetching the cache failed, usually a composite error list.

    Args:
        errors: list of exceptions collected while fetching. It is stored unchanged in
            ``self.errors``; the rendered text is stored in ``self.message`` and is also the
            exception's string value. An empty list is accepted and produces a generic
            message instead of failing while the error is being built.

    Raises:
        TypeError: if ``errors`` is not a list.
    """

    def __init__(self, errors):
        if not isinstance(errors, list):
            raise TypeError("Expected a list of errors")

        self.errors = errors

        if not errors:
            # Callers may aggregate zero errors; indexing errors[0] here would replace the
            # intended FetchCacheError with an unrelated IndexError.
            self.message = "Fetching failed, but no errors were recorded"
        elif len(errors) == 1:
            err = errors[0]
            self.message = "{0}: {1}".format(err.__class__.__name__, str(err))
        else:
            msg = " Error {0}: {1}: {2}"
            details = "\n".join(
                msg.format(position, err.__class__.__name__, str(err))
                for position, err in enumerate(errors, start=1)
            )
            self.message = "Multiple errors during fetching:\n" + details

        super().__init__(self.message)
class _MirrorIndexResult(NamedTuple):
    """Outcome of one attempt to fetch the buildcache index of a single mirror."""

    # whether the attempt counts as a success for the "all mirrors failed" check
    succeeded: bool
    # whether the in-memory spec cache has to be regenerated because of this attempt
    regenerate: bool
    # whether a local cache entry for the mirror already existed before the attempt
    had_cache_entry: bool
    # the error that made the attempt fail, or None
    error: Optional[Exception]
    # True when none of the tried layout versions had an index
    no_index: bool = False
class _LastFetch(NamedTuple):
    """Bookkeeping record for the most recent index fetch of a mirror."""

    # ``time.time()`` value taken when the fetch was attempted
    time: float
    # whether that fetch succeeded
    succeeded: bool
class _LocalIndexCache(TypedDict, total=False):
    """Metadata stored per mirror about its locally cached index (all keys optional)."""

    # hash of the cached index contents
    index_hash: str
    # key of the cached index file inside the index file cache
    index_path: str
    # etag reported by the fetcher for the cached index
    etag: str
[docs]
class BinaryIndexCache:
    """
    The BinaryIndexCache tracks what specs are available on (usually remote)
    binary caches.

    This index is "best effort", in the sense that whenever we don't find
    what we're looking for here, we will attempt to fetch it directly from
    configured mirrors anyway. Thus, it has the potential to speed things
    up, but cache misses shouldn't break any spack functionality.

    At the moment, everything in this class is initialized as lazily as
    possible, so that it avoids slowing anything in spack down until
    absolutely necessary.
    """

    def __init__(self, cache_root: Optional[str] = None):
        self._index_cache_root: str = cache_root or binary_index_location()

        # the key associated with the serialized _local_index_cache
        self._index_contents_key = "contents.json"

        # a FileCache instance storing copies of remote binary cache indices
        self._index_file_cache: file_cache.FileCache = file_cache.FileCache(self._index_cache_root)
        self._index_file_cache_initialized = False

        # stores a map of mirror URL and version layout to index hash and cache key (index path)
        self._local_index_cache: Dict[str, _LocalIndexCache] = {}

        # hashes of remote indices already ingested into the concrete spec
        # cache (_mirrors_for_spec)
        self._specs_already_associated: Set[str] = set()

        # mapping from mirror urls to the time.time() of the last index fetch and a bool indicating
        # whether the fetch succeeded or not.
        self._last_fetch_times: Dict[MirrorMetadata, _LastFetch] = {}

        #: Dictionary mapping DAG hashes of specs to Spec objects
        self._known_specs: Dict[str, spack.spec.Spec] = {}

        #: Dictionary mapping DAG hashes of specs to a list of mirrors where they can be found
        self._mirrors_for_spec: Dict[str, Set[MirrorMetadata]] = defaultdict(set)

        #: URLs of binary mirrors that had no buildcache index during the last update()
        self.mirrors_without_index: Set[str] = set()

    def _init_local_index_cache(self):
        """Load the serialized ``_local_index_cache`` from disk the first time it is needed."""
        if self._index_file_cache_initialized:
            return

        cache_key = self._index_contents_key
        self._local_index_cache = {}
        with self._index_file_cache.read_transaction(cache_key) as cache_file:
            if cache_file is not None:
                self._local_index_cache = json.load(cache_file)
        self._index_file_cache_initialized = True

    def _write_local_index_cache(self):
        """Serialize ``_local_index_cache`` as JSON under the contents key."""
        self._init_local_index_cache()
        cache_key = self._index_contents_key
        with self._index_file_cache.write_transaction(cache_key) as (old, new):
            json.dump(self._local_index_cache, new)

    def regenerate_spec_cache(self, clear_existing=False):
        """Populate the local cache of concrete specs (``_mirrors_for_spec``)
        from the locally cached buildcache index files. This is essentially a
        no-op if it has already been done, as we keep track of the index
        hashes for which we have already associated the built specs.

        Args:
            clear_existing: if true, forget all previously associated specs and index hashes
                before reading the cached index files again.
        """
        self._init_local_index_cache()

        if clear_existing:
            self._specs_already_associated = set()
            self._mirrors_for_spec = defaultdict(set)
            self._known_specs = {}

        for mirror_metadata, cache_entry in self._local_index_cache.items():
            cached_index_path = cache_entry["index_path"]
            cached_index_hash = cache_entry["index_hash"]
            if cached_index_hash not in self._specs_already_associated:
                self._associate_built_specs_with_mirror(
                    cached_index_path, MirrorMetadata.from_string(mirror_metadata)
                )
                self._specs_already_associated.add(cached_index_hash)

    def _associate_built_specs_with_mirror(self, cache_key, mirror_metadata: MirrorMetadata):
        """Read the cached index stored under ``cache_key`` and record, for every spec that is
        external or marked ``in_buildcache``, that ``mirror_metadata`` (without its view)
        provides it. An index written by an unsupported database version only produces a
        warning.
        """
        with tempfile.TemporaryDirectory(dir=spack.stage.get_stage_root()) as tmpdir:
            db = BuildCacheDatabase(tmpdir)

            with self._index_file_cache.read_transaction(cache_key) as f:
                if f is not None:
                    try:
                        db._read_from_stream(f)
                    except spack.database.InvalidDatabaseVersionError as e:
                        tty.warn(
                            "you need a newer Spack version to read the buildcache index for the "
                            f"following v{mirror_metadata.version} mirror: "
                            f"'{mirror_metadata.url}'. {e.database_version_message}"
                        )
                        return

            spec_list = [
                s
                for s in db.query_local(installed=InstallRecordStatus.ANY)
                # todo, make it easier to get install records associated with specs
                if s.external or db._data[s.dag_hash()].in_buildcache
            ]

            for spec in spec_list:
                dag_hash = spec.dag_hash()
                mirrors = self._mirrors_for_spec[dag_hash]
                mirrors.add(mirror_metadata.strip_view())
                if dag_hash not in self._known_specs:
                    self._known_specs[dag_hash] = spec

    def get_all_built_specs(self) -> List[spack.spec.Spec]:
        """Returns a list of all concrete specs known to be available in a binary cache."""
        return list(self._known_specs.values())

    def find_built_spec(self, spec: spack.spec.Spec) -> List[MirrorMetadata]:
        """Returns a list of MirrorMetadata objects indicating which mirrors have the given
        concrete spec.

        This method does not trigger reading anything from remote mirrors, but rather just checks
        if the concrete spec is found within the cache.

        The cache can be updated by calling ``update()`` on the cache.

        Args:
            spec: Concrete spec to find
        """
        return self.find_by_hash(spec.dag_hash())

    def find_by_hash(self, dag_hash: str) -> List[MirrorMetadata]:
        """Same as find_built_spec but uses the hash of a spec.

        Args:
            dag_hash: hash of the spec to search
        """
        # ``.get`` keeps the lookup from inserting an empty set into the defaultdict
        return list(self._mirrors_for_spec.get(dag_hash, []))

    def update_spec(self, spec: spack.spec.Spec, found_list: List[MirrorMetadata]) -> None:
        """Update the cache with a new list of mirrors for a given spec.

        Every entry of ``found_list`` is stored without its view, both when the spec is seen
        for the first time and when mirrors are added to an already known spec, matching what
        is stored when specs are read from cached index files.
        """
        spec_dag_hash = spec.dag_hash()
        stripped = {entry.strip_view() for entry in found_list}

        if spec_dag_hash not in self._mirrors_for_spec:
            self._mirrors_for_spec[spec_dag_hash] = stripped
            self._known_specs[spec_dag_hash] = spec
        else:
            self._mirrors_for_spec[spec_dag_hash].update(stripped)

    def update(self, with_cooldown: bool = False) -> None:
        """Make sure local cache of buildcache index files is up to date.
        If the same mirrors are configured as the last time this was called
        and none of the remote buildcache indices have changed, calling this
        method will only result in fetching the index hash from each mirror
        to confirm it is the same as what is stored locally. Otherwise, the
        buildcache ``index.json`` and ``index.json.hash`` files are retrieved
        from each configured mirror and stored locally (both in memory and
        on disk under ``_index_cache_root``).

        Args:
            with_cooldown: if true, a mirror with a local cache entry is not fetched again
                while its last fetch is younger than the ``binary_index_ttl`` config value.

        Raises:
            FetchCacheError: if at least one mirror is configured and no fetch succeeded.
        """
        self._init_local_index_cache()
        self.mirrors_without_index = set()

        supported_mirror_versions = {
            (m.fetch_url, m.fetch_view): m.supported_layout_versions
            for m in spack.mirrors.mirror.MirrorCollection(binary=True).values()
        }

        # If we have a cached index for a mirror which is no longer configured, remove it
        clear_cache, regenerate_cache = self._remove_stale_cache_entries(supported_mirror_versions)

        # Fetch or update the other indexes
        errors, all_failed = [], True
        for (url, view), versions in supported_mirror_versions.items():
            result = self._fetch_mirror_index(url, view, versions=versions, cooldown=with_cooldown)
            if result.error:
                errors.append(result.error)
            if result.succeeded:
                all_failed = False
            if result.no_index:
                self.mirrors_without_index.add(url)
            regenerate_cache |= result.regenerate
            clear_cache |= result.regenerate and result.had_cache_entry

        self._write_local_index_cache()

        if supported_mirror_versions and all_failed:
            raise FetchCacheError(errors)

        if errors:
            warnings.warn(
                "The following issues were ignored while updating the indices of binary caches:\n"
                + str(FetchCacheError(errors))
            )

        if regenerate_cache:
            self.regenerate_spec_cache(clear_existing=clear_cache)

    def _fetch_mirror_index(
        self, url: str, view: Optional[str], *, versions: List[int], cooldown: bool
    ) -> _MirrorIndexResult:
        """Fetches the index of a mirror, using a highest-version first approach, and returning
        after the first success.
        """
        now = time.time()
        ttl = spack.config.CONFIG.get_config("config").get("binary_index_ttl", 600)

        for version in versions:
            meta = MirrorMetadata(url, version, view)
            cache_entry = self._local_index_cache.get(str(meta))

            if cache_entry is not None and (
                # Cache entry in cooldown
                cooldown
                and ttl > 0
                and meta in self._last_fetch_times
                and now - self._last_fetch_times[meta].time < ttl
            ):
                return _MirrorIndexResult(
                    succeeded=self._last_fetch_times[meta].succeeded,
                    regenerate=False,
                    had_cache_entry=True,
                    error=None,
                )

            try:
                regenerate = self._fetch_and_cache_index(meta, cache_entry=cache_entry or {})
                self._last_fetch_times[meta] = _LastFetch(time=now, succeeded=True)
                return _MirrorIndexResult(
                    succeeded=True,
                    regenerate=regenerate,
                    had_cache_entry=cache_entry is not None,
                    error=None,
                )
            except FetchIndexError as e:
                self._last_fetch_times[meta] = _LastFetch(time=now, succeeded=False)
                return _MirrorIndexResult(
                    succeeded=False,
                    regenerate=False,
                    had_cache_entry=cache_entry is not None,
                    error=e,
                )
            except BuildcacheIndexNotExists:
                # Try next lower layout version
                self._last_fetch_times[meta] = _LastFetch(time=now, succeeded=False)
                continue

        # All versions reported no index found. Record it for concretization callers to warn.
        return _MirrorIndexResult(
            succeeded=True, regenerate=False, had_cache_entry=False, error=None, no_index=True
        )

    def _remove_stale_cache_entries(
        self, supported_mirror_versions: Dict[Tuple[str, Any], List[int]]
    ) -> Tuple[bool, bool]:
        """Drop cached indices whose (url, view, version) is no longer configured.

        Returns:
            A ``(clear, regenerate)`` pair. Both are true when something was removed;
            ``regenerate`` is also true when no specs have been associated with mirrors yet.
        """
        items_to_remove = []
        clear, regenerate = False, not self._mirrors_for_spec

        for local_index_key in self._local_index_cache:
            meta = MirrorMetadata.from_string(local_index_key)
            if meta.version not in supported_mirror_versions.get((meta.url, meta.view), ()):
                index_file_key = self._local_index_cache[local_index_key]["index_path"]
                items_to_remove.append((local_index_key, index_file_key, meta))
                clear, regenerate = True, True

        # Delete in a second pass so the dict is not modified while it is being iterated
        for local_index_key, index_file_key, meta in items_to_remove:
            self._last_fetch_times.pop(meta, None)
            self._index_file_cache.remove(index_file_key)
            del self._local_index_cache[local_index_key]

        return clear, regenerate

    def _fetch_and_cache_index(
        self, mirror_metadata: MirrorMetadata, cache_entry: Optional[_LocalIndexCache] = None
    ) -> bool:
        """Fetch a buildcache index file from a remote mirror and cache it.

        If we already have a cached index from this mirror, then we first
        check if the hash has changed, and we avoid fetching it if not.

        Args:
            mirror_metadata: Contains mirror base url and target binary cache layout version
            cache_entry (dict): Old cache metadata with keys ``index_hash``, ``index_path``,
                ``etag``. ``None`` (the default) is treated as "no previous entry".

        Returns:
            True if the local index.json was updated.

        Throws:
            FetchIndexError
            BuildcacheIndexNotExists
        """
        # A fresh dict per call instead of a shared mutable default argument
        cache_entry = cache_entry or {}

        mirror_url = mirror_metadata.url
        mirror_view = mirror_metadata.view
        layout_version = mirror_metadata.version

        # TODO: get rid of this request, handle 404 better
        scheme = urllib.parse.urlparse(mirror_url).scheme

        if scheme != "oci":
            cache_class = get_url_buildcache_class(layout_version=layout_version)
            index_url = cache_class.get_index_url(mirror_url, mirror_view)
            if not web_util.url_exists(index_url):
                raise BuildcacheIndexNotExists(f"Index not found in cache {index_url}")

        fetcher: IndexHandler = get_index_fetcher(scheme, mirror_metadata, cache_entry)
        result = fetcher.conditional_fetch()

        # Nothing to do
        if result.fresh:
            return False

        # Persist new index.json
        url_hash = compute_hash(str(mirror_metadata))
        cache_key = "{}_{}.json".format(url_hash[:10], result.hash[:10])
        with self._index_file_cache.write_transaction(cache_key) as (old, new):
            new.write(result.data)

        self._local_index_cache[str(mirror_metadata)] = {
            "index_hash": result.hash,
            "index_path": cache_key,
            "etag": result.etag,
        }

        # clean up the old cache_key if necessary; when the content hash is unchanged the old
        # and new keys coincide, and removing it would delete the file that was just written
        old_cache_key = cache_entry.get("index_path", None)
        if old_cache_key and old_cache_key != cache_key:
            self._index_file_cache.remove(old_cache_key)

        # We fetched an index and updated the local index cache, we should
        # regenerate the spec cache as a result.
        return True
[docs]
def binary_index_location():
    """Return the canonicalized path of the ``indices`` directory under the misc cache
    location, where copies of remote buildcache indices are kept."""
    return spack.util.path.canonicalize_path(
        os.path.join(spack.caches.misc_cache_location(), "indices")
    )
#: Default binary cache index instance. The object is a ``Singleton`` wrapper around
#: ``BinaryIndexCache``; the ``cast`` only tells type checkers to treat it as the cache.
#: NOTE(review): presumably the wrapped cache is constructed on first use -- confirm
#: against ``spack.llnl.util.lang.Singleton``.
BINARY_INDEX = cast(BinaryIndexCache, spack.llnl.util.lang.Singleton(BinaryIndexCache))
[docs]
def compute_hash(data):
    """Return the hexadecimal SHA-256 digest of ``data``.

    A ``str`` is encoded as UTF-8 first; anything else is handed to ``hashlib.sha256``
    unchanged.
    """
    payload = data.encode("utf-8") if isinstance(data, str) else data
    return hashlib.sha256(payload).hexdigest()
[docs]
def buildinfo_file_name(prefix):
    """Return the path of the binary package metadata file,
    ``<prefix>/.spack/binary_distribution``."""
    metadata_dir = os.path.join(prefix, ".spack")
    return os.path.join(metadata_dir, "binary_distribution")
[docs]
def read_buildinfo_file(prefix):
    """Load and return the YAML contents of the buildinfo file stored under ``prefix``."""
    buildinfo_path = buildinfo_file_name(prefix)
    with open(buildinfo_path, "r", encoding="utf-8") as stream:
        return syaml.load(stream)
[docs]
def file_matches(f: IO[bytes], regex: spack.llnl.util.lang.PatternBytes) -> bool:
    """Return whether ``regex`` matches anywhere in what remains to be read from ``f``.

    The stream is always rewound to offset 0 afterwards, also when reading fails.
    """
    try:
        contents = f.read()
        return regex.search(contents) is not None
    finally:
        f.seek(0)
[docs]
def specs_to_relocate(spec: spack.spec.Spec) -> List[spack.spec.Spec]:
    """Return the specs that may be referenced in the install prefix of ``spec``.

    The result lists the non-external specs of the breadth-first link-type traversal
    (root included), followed by the non-external direct run-type dependencies, with
    duplicates by DAG hash removed."""
    link_closure = spec.traverse(
        root=True, deptype="link", order="breadth", key=traverse.by_dag_hash
    )
    direct_run_deps = spec.dependencies(deptype="run")

    candidates = [
        node for node in itertools.chain(link_closure, direct_run_deps) if not node.external
    ]
    unique = spack.llnl.util.lang.dedupe(candidates, key=lambda s: s.dag_hash())
    return list(unique)
[docs]
def get_buildinfo_dict(spec):
    """Create the metadata dictionary stored alongside a tarball of ``spec``.

    The ``relocate_textfiles``, ``relocate_binaries`` and ``relocate_links`` entries are
    not set here.
    """
    store_root = spack.store.STORE.layout.root

    buildinfo = {}
    buildinfo["sbang_install_path"] = spack.hooks.sbang.sbang_install_path()
    buildinfo["buildpath"] = store_root
    buildinfo["spackprefix"] = spack.paths.prefix
    buildinfo["relative_prefix"] = os.path.relpath(spec.prefix, store_root)
    buildinfo["hardlinks_deduped"] = True
    # Map each spec that may be referenced from the prefix to its install prefix
    buildinfo["hash_to_prefix"] = {
        dep.dag_hash(): str(dep.prefix) for dep in specs_to_relocate(spec)
    }
    return buildinfo
[docs]
def buildcache_relative_keys_path(layout_version: int = CURRENT_BUILD_CACHE_LAYOUT_VERSION):
    """Return the relative filesystem path of the ``KEY`` component.

    The path components are taken from the buildcache class returned for
    ``layout_version`` and combined with ``os.path.join``.
    """
    layout = get_url_buildcache_class(layout_version=layout_version)
    components = layout.get_relative_path_components(BuildcacheComponent.KEY)
    return os.path.join(*components)
[docs]
def buildcache_relative_keys_url(layout_version: int = CURRENT_BUILD_CACHE_LAYOUT_VERSION):
    """Return the relative URL of the ``KEY`` component.

    The path components are taken from the buildcache class returned for
    ``layout_version`` and combined with ``url_util.join``.
    """
    layout = get_url_buildcache_class(layout_version=layout_version)
    components = layout.get_relative_path_components(BuildcacheComponent.KEY)
    return url_util.join(*components)
[docs]
def buildcache_relative_specs_path(layout_version: int = CURRENT_BUILD_CACHE_LAYOUT_VERSION):
    """Return the relative filesystem path of the ``SPEC`` component.

    The path components are taken from the buildcache class returned for
    ``layout_version`` and combined with ``os.path.join``.
    """
    layout = get_url_buildcache_class(layout_version=layout_version)
    components = layout.get_relative_path_components(BuildcacheComponent.SPEC)
    return os.path.join(*components)
[docs]
def buildcache_relative_specs_url(layout_version: int = CURRENT_BUILD_CACHE_LAYOUT_VERSION):
    """Return the relative URL of the ``SPEC`` component.

    The path components are taken from the buildcache class returned for
    ``layout_version`` and combined with ``url_util.join``.
    """
    layout = get_url_buildcache_class(layout_version=layout_version)
    components = layout.get_relative_path_components(BuildcacheComponent.SPEC)
    return url_util.join(*components)
[docs]
def buildcache_relative_blobs_path(layout_version: int = CURRENT_BUILD_CACHE_LAYOUT_VERSION):
    """Return the relative filesystem path of the ``BLOB`` component.

    The path components are taken from the buildcache class returned for
    ``layout_version`` and combined with ``os.path.join``.
    """
    layout = get_url_buildcache_class(layout_version=layout_version)
    components = layout.get_relative_path_components(BuildcacheComponent.BLOB)
    return os.path.join(*components)
[docs]
def buildcache_relative_blobs_url(layout_version: int = CURRENT_BUILD_CACHE_LAYOUT_VERSION):
    """Return the relative URL of the ``BLOB`` component.

    The path components are taken from the buildcache class returned for
    ``layout_version`` and combined with ``url_util.join``.
    """
    layout = get_url_buildcache_class(layout_version=layout_version)
    components = layout.get_relative_path_components(BuildcacheComponent.BLOB)
    return url_util.join(*components)
[docs]
def buildcache_relative_index_path(layout_version: int = CURRENT_BUILD_CACHE_LAYOUT_VERSION):
    """Return the relative filesystem path of the ``INDEX`` component.

    The path components are taken from the buildcache class returned for
    ``layout_version`` and combined with ``os.path.join``.
    """
    layout = get_url_buildcache_class(layout_version=layout_version)
    components = layout.get_relative_path_components(BuildcacheComponent.INDEX)
    return os.path.join(*components)
[docs]
def buildcache_relative_index_url(layout_version: int = CURRENT_BUILD_CACHE_LAYOUT_VERSION):
    """Return the relative URL of the ``INDEX`` component.

    The path components are taken from the buildcache class returned for
    ``layout_version`` and combined with ``url_util.join``.
    """
    layout = get_url_buildcache_class(layout_version=layout_version)
    components = layout.get_relative_path_components(BuildcacheComponent.INDEX)
    return url_util.join(*components)
[docs]
@spack.llnl.util.lang.memoized
def warn_v2_layout(mirror_url: str, action: str) -> bool:
    """Emit a deprecation warning about using a v2 binary mirror layout and return ``True``.

    Args:
        mirror_url: location of the v2 mirror, shown in the warning
        action: description of what was being done with the mirror; it starts the message

    NOTE(review): the ``memoized`` decorator presumably limits the warning to once per
    ``(mirror_url, action)`` pair -- confirm against its implementation.
    """
    deprecation_notice = (
        f"{action} from a v2 binary mirror layout, located at "
        f"{mirror_url} is deprecated. Support for this will be "
        "removed in a future version of spack. "
        "If you manage the buildcache please consider running:"
    )
    remediation = [
        " 'spack buildcache migrate'",
        " or rebuilding the specs in this mirror. Otherwise, consider running:",
        " 'spack mirror list'",
        " 'spack mirror remove <name>'",
        " with the <name> for the mirror url shown in the list.",
    ]

    wrapped = textwrap.wrap(deprecation_notice, width=72, subsequent_indent=" ")
    tty.warn("\n".join(wrapped + remediation))
    return True
[docs]
def select_signing_key() -> str:
    """Return the only available signing key.

    Raises:
        NoKeyException: if there is no signing key.
        PickKeyException: if there is more than one signing key; the message lists them.
    """
    keys = spack.util.gpg.signing_keys()
    key_count = len(keys)

    if key_count == 0:
        raise NoKeyException(
            "No default key available for signing.\n"
            "Use spack gpg init and spack gpg create"
            " to create a default key."
        )

    if key_count > 1:
        raise PickKeyException(str(keys))

    return keys[0]
def _push_index(db: BuildCacheDatabase, temp_dir: str, cache_prefix: str, name: str = ""):
    """Write the index of ``db`` to ``temp_dir`` and push it to the mirror as a blob.

    Args:
        db: database whose contents are written as the index file
        temp_dir: directory in which the index file is created before it is pushed
        cache_prefix: mirror location the index is pushed to
        name: optional name; when non-empty the manifest name is ``<name>/index`` joined
            with ``url_util.join``, otherwise it is ``index``
    """
    index_json_path = os.path.join(temp_dir, spack.database.INDEX_JSON_FILE)
    with open(index_json_path, "w", encoding="utf-8") as index_file:
        db._write_to_file(index_file)

    cache_class = get_url_buildcache_class(layout_version=CURRENT_BUILD_CACHE_LAYOUT_VERSION)

    if name:
        manifest_name = url_util.join(name, "index")
    else:
        manifest_name = "index"

    cache_class.push_local_file_as_blob(
        index_json_path,
        cache_prefix,
        manifest_name,
        BuildcacheComponent.INDEX,
        compression="none",
    )
    cache_class.maybe_push_layout_json(cache_prefix)
def _read_specs_and_push_index(
    file_list: List[str],
    read_method: Callable[[str], URLBuildcacheEntry],
    name: str,
    filter_fn: Callable[[str], bool],
    cache_prefix: str,
    db: BuildCacheDatabase,
    temp_dir: str,
    *,
    timer=timer.NULL_TIMER,
):
    """Read listed specs, generate the index, and push it to the mirror.

    Spec files that cannot be read or parsed are skipped with a warning.

    Args:
        file_list: List of urls or file paths pointing at spec files to read
        read_method: A function taking a single argument, either a url or a file path,
            and which reads the spec file at that location, and returns the spec.
        name: optional name forwarded to ``_push_index`` for the index being pushed
        filter_fn: predicate called with the hash taken from each file name; files for
            which it returns a false value are skipped
        cache_prefix: prefix of the build cache on s3 where index should be pushed.
        db: A spack database used for adding specs and then writing the index.
        temp_dir: Location to write index.json and hash for pushing
        timer: timer used to measure the ``read`` and ``push`` phases
    """
    with timer.measure("read"):
        for file in file_list:
            # All supported versions of build caches put the hash as the last
            # parameter before the extension. ``str.split`` always yields at least one
            # element, so none of these index operations can fail.
            spec_hash = file.split("/")[-1].split("-")[-1].split(".")[0]

            if not filter_fn(spec_hash):
                continue

            cache_entry: Optional[URLBuildcacheEntry] = None
            try:
                cache_entry = read_method(file)
                spec_dict = cache_entry.fetch_metadata()
                fetched_spec = spack.spec.Spec.from_dict(spec_dict)
            except Exception as e:
                tty.warn(f"Unable to fetch spec for manifest {file} due to: {e}")
                continue
            finally:
                # Runs on success and on failure, before the ``continue`` takes effect
                if cache_entry:
                    cache_entry.destroy()

            db.add(fetched_spec)
            db.mark(fetched_spec, "in_buildcache", True)

    with timer.measure("push"):
        _push_index(db, temp_dir, cache_prefix, name)
def _url_generate_package_index(
    url: str,
    tmpdir: str,
    db: Optional[BuildCacheDatabase] = None,
    name: str = "",
    filter_fn: Callable[[str], bool] = lambda x: True,
    *,
    timer=timer.NULL_TIMER,
):
    """Create or replace the build cache index on the given mirror.

    The spec files found on the mirror are listed, read into a database, and the resulting
    index is pushed back to the mirror.

    Args:
        url: Base url of binary mirror.
        tmpdir: directory used for a new database when ``db`` is not given
        db: database to add the specs to; when falsy a ``BuildCacheDatabase`` rooted at
            ``tmpdir`` is created and written first
        name: optional name of the index, forwarded to the index push
        filter_fn: predicate on spec hashes selecting which spec files are read
        timer: timer used to measure the ``list`` phase and forwarded for the other phases

    Raises:
        GenerateIndexError: if the specs cannot be listed, or if reading the specs or
            pushing the index fails.
    """
    with tempfile.TemporaryDirectory(dir=spack.stage.get_stage_root()) as tmpspecsdir:
        try:
            with timer.measure("list"):
                filename_to_mtime_mapping, read_fn = get_entries_from_cache(
                    url, tmpspecsdir, component_type=BuildcacheComponent.SPEC
                )
                spec_files = list(filename_to_mtime_mapping.keys())
        except ListMirrorSpecsError as list_error:
            raise GenerateIndexError(
                f"Unable to generate package index: {list_error}"
            ) from list_error

        tty.debug(f"Retrieving spec descriptor files from {url} to build index")

        if not db:
            db = BuildCacheDatabase(tmpdir)
            db._write()

        try:
            _read_specs_and_push_index(
                file_list=spec_files,
                read_method=read_fn,
                name=name,
                filter_fn=filter_fn,
                cache_prefix=url,
                db=db,
                temp_dir=str(db.database_directory),
                timer=timer,
            )
        except Exception as push_error:
            raise GenerateIndexError(
                f"Encountered problem pushing package index to {url}: {push_error}"
            ) from push_error
[docs]
def generate_key_index(mirror_url: str, tmpdir: str) -> None:
    """Create the key index page.

    Creates (or replaces) the ``index.json`` page at the location given in mirror_url. This page
    contains an entry for each key under mirror_url.

    Args:
        mirror_url: base url of the mirror whose keys are indexed
        tmpdir: directory in which ``index.json`` is written before it is pushed

    Raises:
        CannotListKeys: if the keys under the mirror cannot be listed.
        GenerateIndexError: if pushing the key index fails.
    """
    tty.debug(f"Retrieving key.pub files from {url_util.format(mirror_url)} to build key index")

    key_prefix = url_util.join(mirror_url, buildcache_relative_keys_url())
    manifest_suffix = ".key.manifest.json"

    try:
        # Consume the listing inside the ``try``: if entries are produced lazily, an error
        # raised while iterating is then reported as CannotListKeys as well.
        fingerprints = sorted(
            {
                entry[: -len(manifest_suffix)]
                for entry in web_util.list_url(key_prefix, recursive=False)
                if entry.endswith(manifest_suffix)
            }
        )
    except Exception as e:
        raise CannotListKeys(f"Encountered problem listing keys at {key_prefix}: {e}") from e

    target = os.path.join(tmpdir, "index.json")

    index = {"keys": {fingerprint: {} for fingerprint in fingerprints}}
    with open(target, "w", encoding="utf-8") as f:
        sjson.dump(index, f)

    cache_class = get_url_buildcache_class()

    try:
        cache_class.push_local_file_as_blob(
            local_file_path=target,
            mirror_url=mirror_url,
            manifest_name="keys",
            component_type=BuildcacheComponent.KEY_INDEX,
            compression="none",
        )
        cache_class.maybe_push_layout_json(mirror_url)
    except Exception as e:
        raise GenerateIndexError(
            f"Encountered problem pushing key index to {key_prefix}: {e}"
        ) from e
[docs]
class FileTypes:
    """Integer codes returned by ``file_type`` to classify a file's contents."""

    # ELF or Mach-O binary
    BINARY = 0
    # text, either valid UTF-8 or accepted by the ISO-8859-1 heuristic
    TEXT = 1
    # neither of the above, or too short to inspect
    UNKNOWN = 2
#: Bytes that disqualify data from being treated as ISO-8859-1 text: NUL, DEL (0x7f) and
#: the range 0x80-0x9f.
NOT_ISO8859_1_TEXT = re.compile(b"[\x00\x7f-\x9f]")
[docs]
def file_type(f: IO[bytes]) -> int:
    """Classify the contents of the binary stream ``f`` as one of the ``FileTypes`` codes.

    Checks are tried in order: ELF / Mach-O magic bytes, strict UTF-8 decoding of the whole
    stream, and finally the ISO-8859-1 heuristic. Streams with fewer than 8 bytes are
    reported as ``FileTypes.UNKNOWN``. The stream is always rewound to offset 0 before
    returning.
    """
    try:
        # first check if this is an ELF or mach-o binary.
        magic = f.read(8)
        if len(magic) < 8:
            return FileTypes.UNKNOWN
        elif relocate.is_elf_magic(magic) or relocate.is_macho_magic(magic):
            return FileTypes.BINARY

        # rewind so the text checks below see the magic bytes as well
        f.seek(0)

        # Then try utf-8, which has a fast exponential decay in false positive rate with file size.
        # Use chunked reads for fast early exit.
        f_txt = io.TextIOWrapper(f, encoding="utf-8", errors="strict")
        try:
            while f_txt.read(1024):
                pass
            return FileTypes.TEXT
        except UnicodeError:
            # not UTF-8: go back to the start for the next heuristic
            f_txt.seek(0)
            pass
        finally:
            # separate the wrapper from ``f``, which is still read and rewound below
            f_txt.detach()

        # Finally try iso-8859-1 heuristically. In Python, all possible 256 byte values are valid.
        # We classify it as text if it does not contain any control characters / null bytes.
        data = f.read(1024)
        while data:
            if NOT_ISO8859_1_TEXT.search(data):
                break
            data = f.read(1024)
        else:
            # the loop ended without ``break``: no disqualifying byte was found
            return FileTypes.TEXT
        return FileTypes.UNKNOWN
    finally:
        f.seek(0)
[docs]
def tarfile_of_spec_prefix(
    tar: tarfile.TarFile, prefix: str, prefixes_to_relocate: List[str]
) -> dict:
    """Add the contents of an install prefix to ``tar``, skipping an existing buildinfo file.

    Args:
        tar: tarfile object to add files to
        prefix: absolute install prefix of spec
        prefixes_to_relocate: prefixes to look for in text files and symlink targets

    Returns:
        A dictionary with the keys ``relocate_binaries``, ``relocate_links`` and
        ``relocate_textfiles``, each a list of paths relative to ``prefix``.

    Raises:
        ValueError: if ``prefix`` is not an absolute path to an existing directory.
    """
    if not (os.path.isabs(prefix) and os.path.isdir(prefix)):
        raise ValueError(f"prefix '{prefix}' must be an absolute path to a directory")

    def stat_key(stat):
        # device and inode together identify a file
        return (stat.st_dev, stat.st_ino)

    try:  # skip buildinfo file if it exists
        buildinfo_key = stat_key(os.lstat(buildinfo_file_name(prefix)))
    except OSError:
        buildinfo_key = None

    def skip(entry):
        if buildinfo_key is None:
            return False
        return stat_key(entry.stat(follow_symlinks=False)) == buildinfo_key

    binary_regex = utf8_paths_to_single_binary_regex(prefixes_to_relocate)

    binaries_found = []
    links_found = []
    textfiles_found = []

    # use callbacks to add files and symlinks, so we can register which files need relocation upon
    # extraction.
    def add_file(tar: tarfile.TarFile, info: tarfile.TarInfo, path: str):
        with open(path, "rb") as f:
            relpath = os.path.relpath(path, prefix)
            # no need to relocate anything in the .spack directory
            in_metadata_dir = relpath.split(os.sep, 1)[0] == ".spack"
            if not in_metadata_dir:
                kind = file_type(f)
                if kind == FileTypes.BINARY:
                    binaries_found.append(relpath)
                elif kind == FileTypes.TEXT and file_matches(f, binary_regex):
                    textfiles_found.append(relpath)
            tar.addfile(info, f)

    def add_symlink(tar: tarfile.TarFile, info: tarfile.TarInfo, path: str):
        target = info.linkname
        if os.path.isabs(target) and binary_regex.match(target.encode("utf-8")):
            links_found.append(os.path.relpath(path, prefix))
        tar.addfile(info)

    spack.util.archive.reproducible_tarfile_from_prefix(
        tar,
        prefix,
        # Spack <= 0.21 did not include parent directories, leading to issues when tarballs are
        # used in runtimes like AWS lambda.
        include_parent_directories=True,
        skip=skip,
        add_file=add_file,
        add_symlink=add_symlink,
    )

    return {
        "relocate_binaries": binaries_found,
        "relocate_links": links_found,
        "relocate_textfiles": textfiles_found,
    }
[docs]
def create_tarball(spec: spack.spec.Spec, tarfile_path: str) -> Tuple[str, str]:
    """Write a tarball of the install prefix of ``spec`` to ``tarfile_path``.

    Returns:
        The hex digests of the compressed and of the uncompressed tarfile, in that order.
    """
    install_prefix = spec.prefix
    metadata = get_buildinfo_dict(spec)
    relocatable_prefixes = prefixes_to_relocate(spec)
    return _do_create_tarball(
        tarfile_path,
        install_prefix,
        buildinfo=metadata,
        prefixes_to_relocate=relocatable_prefixes,
    )
def _do_create_tarball(
    tarfile_path: str, prefix: str, buildinfo: dict, prefixes_to_relocate: List[str]
) -> Tuple[str, str]:
    """Create a gzip-compressed tarball of ``prefix`` that also contains the buildinfo file.

    ``buildinfo`` is updated in place with the relocation lists gathered while the prefix
    is archived, and is then serialized into the archive.

    Returns:
        The hex digests of the compressed and of the uncompressed tarfile, in that order.
    """
    archive = spack.util.archive.gzip_compressed_tarfile(tarfile_path)
    with archive as (tar, tar_gz_checksum, tar_checksum):
        # Tarball the install prefix
        buildinfo.update(tarfile_of_spec_prefix(tar, prefix, prefixes_to_relocate))

        # Serialize buildinfo for the tarball
        payload = syaml.dump(buildinfo, default_flow_style=True).encode("utf-8")
        member_name = spack.util.archive.default_path_to_name(buildinfo_file_name(prefix))

        tarinfo = tarfile.TarInfo(name=member_name)
        tarinfo.type = tarfile.REGTYPE
        tarinfo.mode = 0o644
        tarinfo.size = len(payload)
        tar.addfile(tarinfo, io.BytesIO(payload))

    # The digests are only read once the archive context has been left
    return tar_gz_checksum.hexdigest(), tar_checksum.hexdigest()
def _exists_in_buildcache(
    spec: spack.spec.Spec, out_url: str, allow_unsigned: bool = False
) -> URLBuildcacheEntry:
    """Create and return a ``URLBuildcacheEntry`` for ``spec`` at ``out_url``, using the
    buildcache class of the current layout version.

    This function itself performs no existence check; it only constructs the entry.
    """
    entry_class = get_url_buildcache_class(CURRENT_BUILD_CACHE_LAYOUT_VERSION)
    return entry_class(out_url, spec, allow_unsigned=allow_unsigned)
[docs]
def prefixes_to_relocate(spec):
    """Return the prefixes to consider for relocation of ``spec``: the prefix of every spec
    from ``specs_to_relocate``, then the sbang install path, then the store layout root."""
    return [
        *(dep.prefix for dep in specs_to_relocate(spec)),
        spack.hooks.sbang.sbang_install_path(),
        str(spack.store.STORE.layout.root),
    ]
def _url_upload_tarball_and_specfile(
    spec: spack.spec.Spec, tmpdir: str, cache_entry: URLBuildcacheEntry, signing_key: Optional[str]
):
    """Create ``<dag hash>.tar.gz`` for ``spec`` in ``tmpdir`` and push it through
    ``cache_entry`` together with the sha256 checksum of the compressed tarball."""
    tarball_name = f"{spec.dag_hash()}.tar.gz"
    tarball = os.path.join(tmpdir, tarball_name)
    compressed_checksum, _ = create_tarball(spec, tarball)
    cache_entry.push_binary_package(
        spec, tarball, "sha256", compressed_checksum, tmpdir, signing_key
    )
[docs]
class Uploader:
    """Base class for objects that push specs to a binary mirror.

    Use it as a context manager: entering creates a temporary directory (``tmpdir``) and a
    concurrent executor (``executor``), leaving releases both. Subclasses implement
    ``push``.
    """

    def __init__(self, mirror: spack.mirrors.mirror.Mirror, force: bool, update_index: bool):
        self.mirror = mirror
        self.force = force
        self.update_index = update_index

        # Both are assigned in __enter__
        self.tmpdir: str
        self.executor: concurrent.futures.Executor

        # Verify if the mirror meets the requirements to push
        self.mirror.ensure_mirror_usable("push")

    def __enter__(self):
        self._tmpdir = tempfile.TemporaryDirectory(dir=spack.stage.get_stage_root())
        self._executor = spack.util.parallel.make_concurrent_executor()

        self.tmpdir = self._tmpdir.__enter__()
        self.executor = self._executor.__enter__()

        return self

    def __exit__(self, *args):
        # Remove the temporary directory even if shutting down the executor raises
        try:
            self._executor.__exit__(*args)
        finally:
            self._tmpdir.__exit__(*args)

    def push_or_raise(self, specs: List[spack.spec.Spec]) -> List[spack.spec.Spec]:
        """Push ``specs`` and return the skipped ones.

        Raises:
            PushToBuildCacheError: if any spec failed to push; the message lists each failure.
        """
        skipped, errors = self.push(specs)
        if errors:
            raise PushToBuildCacheError(
                f"Failed to push {len(errors)} specs to {self.mirror.push_url}:\n"
                + "\n".join(
                    f"Failed to push {_format_spec(spec)}: {error}" for spec, error in errors
                )
            )
        return skipped

    def push(
        self, specs: List[spack.spec.Spec]
    ) -> Tuple[List[spack.spec.Spec], List[Tuple[spack.spec.Spec, BaseException]]]:
        """Push ``specs`` and return ``(skipped specs, [(spec, error), ...])``.

        Must be implemented by subclasses.
        """
        raise NotImplementedError

    def tag(self, tag: str, roots: List[spack.spec.Spec]):
        """Make a list of selected specs together available under the given tag"""
        pass
[docs]
class OCIUploader(Uploader):
    """Uploader that pushes specs to the OCI image derived from the mirror."""

    def __init__(
        self,
        mirror: spack.mirrors.mirror.Mirror,
        force: bool,
        update_index: bool,
        base_image: Optional[str],
    ) -> None:
        super().__init__(mirror, force, update_index)
        self.target_image = spack.oci.oci.image_from_mirror(mirror)
        if base_image:
            self.base_image = ImageReference.from_string(base_image)
        else:
            self.base_image = None

    def push(
        self, specs: List[spack.spec.Spec]
    ) -> Tuple[List[spack.spec.Spec], List[Tuple[spack.spec.Spec, BaseException]]]:
        """Push ``specs`` to the target image and return ``(skipped, upload errors)``.

        The base images and checksums reported by the push are remembered for ``tag``.
        """
        skipped, base_images, checksums, upload_errors = _oci_push(
            target_image=self.target_image,
            base_image=self.base_image,
            installed_specs_with_deps=specs,
            force=self.force,
            tmpdir=self.tmpdir,
            executor=self.executor,
        )
        self._base_images, self._checksums = base_images, checksums

        # only update index if any binaries were uploaded
        if self.update_index:
            not_uploaded = len(skipped) + len(upload_errors)
            if not_uploaded < len(specs):
                _oci_update_index(self.target_image, self.tmpdir, self.executor)

        return skipped, upload_errors

    def tag(self, tag: str, roots: List[spack.spec.Spec]):
        """Publish a manifest for ``roots`` under ``tag`` of the target image.

        Uses the base images and checksums stored by a preceding ``push``.
        """
        tagged_image = self.target_image.with_tag(tag)

        # _push_oci may not populate self._base_images if binaries were already in the registry
        for root in roots:
            _oci_update_base_images(
                base_image=self.base_image,
                target_image=self.target_image,
                spec=root,
                base_image_cache=self._base_images,
            )

        _oci_put_manifest(
            self._base_images, self._checksums, tagged_image, self.tmpdir, None, None, *roots
        )
        tty.info(f"Tagged {tagged_image}")
[docs]
class URLUploader(Uploader):
    """Uploader for mirrors addressed by a plain URL (anything that is not an OCI registry)."""

    def __init__(
        self,
        mirror: spack.mirrors.mirror.Mirror,
        force: bool,
        update_index: bool,
        signing_key: Optional[str],
    ) -> None:
        super().__init__(mirror, force, update_index)
        self.signing_key = signing_key
        self.url = mirror.push_url

    def push(
        self, specs: List[spack.spec.Spec]
    ) -> Tuple[List[spack.spec.Spec], List[Tuple[spack.spec.Spec, BaseException]]]:
        """Push ``specs`` to ``self.url`` and return ``(skipped, errors)`` from ``_url_push``."""
        skipped_and_errors = _url_push(
            specs,
            out_url=self.url,
            signing_key=self.signing_key,
            force=self.force,
            update_index=self.update_index,
            tmpdir=self.tmpdir,
            executor=self.executor,
        )
        return skipped_and_errors
[docs]
def make_uploader(
    mirror: spack.mirrors.mirror.Mirror,
    force: bool = False,
    update_index: bool = False,
    signing_key: Optional[str] = None,
    base_image: Optional[str] = None,
) -> Uploader:
    """Builder for the appropriate uploader based on the mirror type

    An ``OCIUploader`` is returned when the mirror's push URL is an OCI URL (``signing_key`` is
    not used in that case), otherwise a ``URLUploader`` (``base_image`` is not used).
    """
    if not spack.oci.image.is_oci_url(mirror.push_url):
        return URLUploader(
            mirror=mirror, force=force, update_index=update_index, signing_key=signing_key
        )

    return OCIUploader(
        mirror=mirror, force=force, update_index=update_index, base_image=base_image
    )
def _format_spec(spec: spack.spec.Spec) -> str:
    """Return the short name, version and 7-character hash rendering of ``spec``."""
    short_format = "{name}{@version}{/hash:7}"
    return spec.cformat(short_format)
[docs]
class FancyProgress:
    """Prints per-spec push progress, replacing the transient "Pushing ..." line on a terminal.

    The transient line is only written (and later erased) when stdout is a tty and the job was
    reported as running when :meth:`start` was called.
    """

    def __init__(self, total: int):
        self.total = total
        self.n = 0
        self.running = False
        self.enable = sys.stdout.isatty()
        self.pre = ""
        self.pretty_spec: str = ""

    def _clear(self):
        """Erase the previously printed transient line, if one was printed."""
        if not (self.enable and self.running):
            return
        # Move the cursor to the start of the previous line and erase that line.
        sys.stdout.write("\033[F\033[K")

    def _progress(self):
        """Return the ``[n/total] `` prefix, or an empty string when there is at most one item."""
        if self.total <= 1:
            return ""
        width = len(str(self.total))
        return f"[{self.n:{width}}/{self.total}] "

    def start(self, spec: spack.spec.Spec, running: bool) -> None:
        """Register the next spec; print a transient line if it is currently being pushed."""
        self.n += 1
        self.running = running
        self.pre = self._progress()
        self.pretty_spec = _format_spec(spec)
        if not (self.enable and self.running):
            return
        tty.info(f"{self.pre}Pushing {self.pretty_spec}...")

    def ok(self, msg: Optional[str] = None) -> None:
        """Report success for the current spec, with ``msg`` or a default message."""
        self._clear()
        if not msg:
            msg = f"Pushed {self.pretty_spec}"
        tty.info(f"{self.pre}{msg}")

    def fail(self) -> None:
        """Report failure for the current spec."""
        self._clear()
        tty.info(f"{self.pre}Failed to push {self.pretty_spec}")
def _url_push(
    specs: List[spack.spec.Spec],
    out_url: str,
    signing_key: Optional[str],
    force: bool,
    update_index: bool,
    tmpdir: str,
    executor: concurrent.futures.Executor,
) -> Tuple[List[spack.spec.Spec], List[Tuple[spack.spec.Spec, BaseException]]]:
    """Pushes to the provided build cache, and returns a list of skipped specs that were already
    present (when force=False), and a list of errors. Upload errors are collected per spec and
    returned instead of raised.

    Args:
        specs: specs to push
        out_url: URL of the build cache to push to
        signing_key: key used for signing; when falsy, unsigned entries are allowed
        force: when True, push all specs even if they already exist in the build cache
        update_index: when True, regenerate the package index after a successful upload
        tmpdir: scratch directory for tarballs, keys and the index
        executor: executor on which existence checks and uploads are scheduled
    """
    skipped: List[spack.spec.Spec] = []
    errors: List[Tuple[spack.spec.Spec, BaseException]] = []

    # Check concurrently which specs are already present in the build cache.
    exists_futures = [
        executor.submit(_exists_in_buildcache, spec, out_url, allow_unsigned=not signing_key)
        for spec in specs
    ]

    cache_entries = {
        spec.dag_hash(): exists_future.result()
        for spec, exists_future in zip(specs, exists_futures)
    }

    if force:
        specs_to_upload = specs
    else:
        specs_to_upload = []
        for spec in specs:
            if cache_entries[spec.dag_hash()].exists(
                [BuildcacheComponent.SPEC, BuildcacheComponent.TARBALL]
            ):
                skipped.append(spec)
            else:
                specs_to_upload.append(spec)

    if not specs_to_upload:
        return skipped, errors

    total = len(specs_to_upload)

    if total != len(specs):
        tty.info(f"{total} specs need to be pushed to {out_url}")

    upload_futures = [
        executor.submit(
            _url_upload_tarball_and_specfile,
            spec,
            tmpdir,
            cache_entries[spec.dag_hash()],
            signing_key,
        )
        for spec in specs_to_upload
    ]

    uploaded_any = False
    fancy_progress = FancyProgress(total)

    for spec, upload_future in zip(specs_to_upload, upload_futures):
        # Query running() before blocking on the result, so that a transient progress line can
        # be shown for uploads that are in flight.
        fancy_progress.start(spec, upload_future.running())
        error = upload_future.exception()
        if error is None:
            uploaded_any = True
            fancy_progress.ok()
        else:
            fancy_progress.fail()
            errors.append((spec, error))

    # don't bother pushing keys / index if all failed to upload
    if not uploaded_any:
        return skipped, errors

    # If the layout.json doesn't yet exist on this mirror, push it
    cache_class = get_url_buildcache_class(layout_version=CURRENT_BUILD_CACHE_LAYOUT_VERSION)
    cache_class.maybe_push_layout_json(out_url)

    # The scratch directories below get unique names, so that pushing more than once with the
    # same ``tmpdir`` neither fails on an existing directory nor reuses stale contents.
    if signing_key:
        keys_tmpdir = tempfile.mkdtemp(prefix="keys-", dir=tmpdir)
        _url_push_keys(out_url, keys=[signing_key], update_index=update_index, tmpdir=keys_tmpdir)

    if update_index:
        index_tmpdir = tempfile.mkdtemp(prefix="index-", dir=tmpdir)
        _url_generate_package_index(out_url, index_tmpdir)

    return skipped, errors
def _oci_upload_success_msg(spec: spack.spec.Spec, digest: Digest, size: int, elapsed: float):
    """Return the success message for an uploaded blob, including duration and throughput."""
    # guard against division by zero
    seconds = max(elapsed, 0.001)
    throughput = size / seconds / 1024 / 1024
    return f"Pushed {_format_spec(spec)}: {digest} ({seconds:.2f}s, {throughput:.2f} MB/s)"
def _oci_get_blob_info(image_ref: ImageReference) -> Optional[spack.oci.oci.Blob]:
    """Get the spack tarball layer digests and size if it exists

    The last layer of the manifest and the last diff id of the config are used. Returns ``None``
    when the manifest or config cannot be retrieved or lack the expected entries.
    """
    try:
        manifest, config = get_manifest_and_config_with_retry(image_ref)
        top_layer = manifest["layers"][-1]
        compressed = Digest.from_string(top_layer["digest"])
        uncompressed = Digest.from_string(config["rootfs"]["diff_ids"][-1])
        return spack.oci.oci.Blob(
            compressed_digest=compressed, uncompressed_digest=uncompressed, size=top_layer["size"]
        )
    except Exception:
        # Best effort: any failure is treated as "blob not available".
        return None
def _oci_push_pkg_blob(
    image_ref: ImageReference, spec: spack.spec.Spec, tmpdir: str
) -> Tuple[spack.oci.oci.Blob, float]:
    """Push a package blob to the registry and return the blob info and the time taken

    The tarball is written to ``<tmpdir>/<dag hash>.tar.gz`` and removed again before returning,
    also when creating or uploading it fails.

    Args:
        image_ref: image the blob is uploaded to
        spec: spec whose tarball is created
        tmpdir: scratch directory for the tarball

    Returns:
        The blob (compressed digest, uncompressed digest, size) and the upload time in seconds.
    """
    filename = os.path.join(tmpdir, f"{spec.dag_hash()}.tar.gz")

    try:
        # Create an oci.image.layer aka tarball of the package
        tar_gz_checksum, tar_checksum = create_tarball(spec, filename)

        blob = spack.oci.oci.Blob(
            Digest.from_sha256(tar_gz_checksum),
            Digest.from_sha256(tar_checksum),
            os.path.getsize(filename),
        )

        # Upload the blob; a monotonic clock keeps the duration immune to system clock changes.
        start = time.monotonic()
        upload_blob_with_retry(image_ref, file=filename, digest=blob.compressed_digest)
        elapsed = time.monotonic() - start
    finally:
        # delete the file, whether or not the upload succeeded
        with contextlib.suppress(FileNotFoundError):
            os.unlink(filename)

    return blob, elapsed
def _oci_retrieve_env_dict_from_config(config: dict) -> dict:
    """Retrieve the environment variables from the image config file.

    Sets a default value for PATH if it is not present. A missing or null ``config`` section and
    a missing or null ``Env`` list are treated as "no variables". An entry without ``=`` is
    stored with an empty value.

    Args:
        config (dict): The image config file.

    Returns:
        dict: The environment variables.
    """
    env = {"PATH": "/bin:/usr/bin"}
    image_config = config.get("config") or {}
    for entry in image_config.get("Env") or ():
        # Split on the first "=" only: values may themselves contain "=".
        key, _, value = entry.partition("=")
        env[key] = value
    return env
def _oci_archspec_to_gooarch(spec: spack.spec.Spec) -> str:
    """Map the target family name of ``spec`` to the architecture name used in image configs.

    ``aarch64`` becomes ``arm64`` and ``x86_64`` becomes ``amd64``; other names pass through.
    """
    family_name = spec.target.family.name
    if family_name == "aarch64":
        return "arm64"
    if family_name == "x86_64":
        return "amd64"
    return family_name
def _oci_put_manifest(
    base_images: Dict[str, Tuple[dict, dict]],
    checksums: Dict[str, spack.oci.oci.Blob],
    image_ref: ImageReference,
    tmpdir: str,
    extra_config: Optional[dict],
    annotations: Optional[dict],
    *specs: spack.spec.Spec,
):
    """Upload an image config and an image manifest for ``specs`` to ``image_ref``.

    The image consists of the layers of the base image for the architecture of the first spec,
    followed by one layer per non-external link/run node of ``specs`` that has an entry in
    ``checksums``. The temporary config file is removed again, also when an upload fails.

    Args:
        base_images: maps architecture to the ``(manifest, config)`` pair of the base image
        checksums: maps spec DAG hash to the blob of the corresponding package layer
        image_ref: image reference the config blob and the manifest are uploaded to
        tmpdir: directory in which the config file is written temporarily
        extra_config: optional extra top-level fields merged into the image config
        annotations: optional manifest annotations; only added for the OCI manifest format
        specs: at least one spec; the first one determines architecture and config file name
    """
    architecture = _oci_archspec_to_gooarch(specs[0])

    expected_blobs: List[spack.spec.Spec] = [
        s
        for s in traverse.traverse_nodes(specs, order="topo", deptype=("link", "run"), root=True)
        if not s.external
    ]
    expected_blobs.reverse()

    base_manifest, base_config = base_images[architecture]
    env = _oci_retrieve_env_dict_from_config(base_config)

    # If the base image uses `vnd.docker.distribution.manifest.v2+json`, then we use that too.
    # This is because Singularity / Apptainer is very strict about not mixing them.
    base_manifest_mediaType = base_manifest.get(
        "mediaType", "application/vnd.oci.image.manifest.v1+json"
    )
    use_docker_format = (
        base_manifest_mediaType == "application/vnd.docker.distribution.manifest.v2+json"
    )
    layer_media_type = (
        "application/vnd.docker.image.rootfs.diff.tar.gzip"
        if use_docker_format
        else "application/vnd.oci.image.layer.v1.tar+gzip"
    )

    spack.user_environment.environment_modifications_for_specs(*specs).apply_modifications(env)

    # A layer for a dependency may have gone missing (due to removed manifest in the registry, a
    # failed push, or a local forced uninstall). Such layers are left out of both the config and
    # the manifest; the same selection is used for both so that they stay in sync.
    available_blobs = [
        blob for blob in (checksums.get(s.dag_hash()) for s in expected_blobs) if blob is not None
    ]

    # Create an oci.image.config file
    config = copy.deepcopy(base_config)

    # Add the diff ids of the blobs
    config["rootfs"]["diff_ids"].extend(str(blob.uncompressed_digest) for blob in available_blobs)

    # Set the environment variables; tolerate a base config without a "config" section, just
    # like the lookup of the base image environment does.
    image_config = config.get("config") or {}
    image_config["Env"] = [f"{k}={v}" for k, v in env.items()]
    config["config"] = image_config

    if extra_config:
        # From the OCI v1.0 spec:
        # > Any extra fields in the Image JSON struct are considered implementation
        # > specific and MUST be ignored by any implementations which are unable to
        # > interpret them.
        config.update(extra_config)

    config_file = os.path.join(tmpdir, f"{specs[0].dag_hash()}.config.json")

    try:
        with open(config_file, "w", encoding="utf-8") as f:
            json.dump(config, f, separators=(",", ":"))

        config_file_checksum = Digest.from_sha256(
            spack.util.crypto.checksum(hashlib.sha256, config_file)
        )

        # Upload the config file
        upload_blob_with_retry(image_ref, file=config_file, digest=config_file_checksum)

        manifest = {
            "mediaType": base_manifest_mediaType,
            "schemaVersion": 2,
            "config": {
                "mediaType": base_manifest["config"]["mediaType"],
                "digest": str(config_file_checksum),
                "size": os.path.getsize(config_file),
            },
            "layers": [
                *base_manifest["layers"],
                *(
                    {
                        "mediaType": layer_media_type,
                        "digest": str(blob.compressed_digest),
                        "size": blob.size,
                    }
                    for blob in available_blobs
                ),
            ],
        }

        if not use_docker_format and annotations:
            manifest["annotations"] = annotations

        # Finally upload the manifest
        upload_manifest_with_retry(image_ref, manifest=manifest)
    finally:
        # delete the config file, whether or not the uploads succeeded
        with contextlib.suppress(FileNotFoundError):
            os.unlink(config_file)
def _oci_update_base_images(
    *,
    base_image: Optional[ImageReference],
    target_image: ImageReference,
    spec: spack.spec.Spec,
    base_image_cache: Dict[str, Tuple[dict, dict]],
):
    """Ensure ``base_image_cache`` has a ``(manifest, config)`` entry for the arch of ``spec``.

    With a base image, its missing layers for the matching architecture are copied to the
    registry of the target image. Without one, a default manifest and config are used. Nothing
    happens when the architecture is already cached.
    """
    architecture = _oci_archspec_to_gooarch(spec)

    if architecture in base_image_cache:
        return

    if base_image is not None:
        manifest_and_config = copy_missing_layers_with_retry(
            base_image, target_image, architecture
        )
    else:
        manifest_and_config = (default_manifest(), default_config(architecture, "linux"))

    base_image_cache[architecture] = manifest_and_config
def _oci_default_tag(spec: spack.spec.Spec) -> str:
    """Return a valid, default image tag for a spec.

    The tag is derived from ``<name>-<version>-<dag hash>.spack``.
    """
    raw_tag = f"{spec.name}-{spec.version}-{spec.dag_hash()}.spack"
    return ensure_valid_tag(raw_tag)
#: Default OCI index tag: the tag under which the buildcache index manifest is uploaded. It also
#: ends in ``.spack``, but does not refer to a spec.
default_index_tag = "index.spack"
[docs]
def tag_is_spec(tag: str) -> bool:
    """Check if a tag is likely a Spec

    That is the case for tags ending in ``.spack``, except for the index tag.
    """
    if not tag.endswith(".spack"):
        return False
    return tag != default_index_tag
def _oci_push(
    *,
    target_image: ImageReference,
    base_image: Optional[ImageReference],
    installed_specs_with_deps: List[spack.spec.Spec],
    tmpdir: str,
    executor: concurrent.futures.Executor,
    force: bool = False,
) -> Tuple[
    List[spack.spec.Spec],
    Dict[str, Tuple[dict, dict]],
    Dict[str, spack.oci.oci.Blob],
    List[Tuple[spack.spec.Spec, BaseException]],
]:
    """Push package blobs and per-spec manifests for the given specs to an OCI registry.

    Unless ``force`` is set, specs whose default tag already resolves to a blob in the registry
    are skipped. Errors of individual uploads are collected instead of raised.

    Args:
        target_image: image (registry and name) to push to
        base_image: optional base image whose layers are put below the package layers
        installed_specs_with_deps: the specs to push
        tmpdir: scratch directory for tarballs and config files
        executor: executor on which registry queries and uploads are scheduled
        force: when True, upload all specs without checking for existing ones

    Returns:
        A tuple ``(skipped, base_images, checksums, errors)``: the specs that were not uploaded
        because they already exist, the ``arch -> (manifest, config)`` mapping of base images,
        the ``dag hash -> blob`` mapping of known package layers, and ``(spec, error)`` pairs.
    """
    # Spec dag hash -> blob
    checksums: Dict[str, spack.oci.oci.Blob] = {}

    # arch -> (manifest, config)
    base_images: Dict[str, Tuple[dict, dict]] = {}

    # Specs not uploaded because they already exist
    skipped: List[spack.spec.Spec] = []

    if not force:
        tty.info("Checking for existing specs in the buildcache")
        blobs_to_upload = []

        tags_to_check = (
            target_image.with_tag(_oci_default_tag(s)) for s in installed_specs_with_deps
        )
        available_blobs = executor.map(_oci_get_blob_info, tags_to_check)

        for spec, maybe_blob in zip(installed_specs_with_deps, available_blobs):
            if maybe_blob is not None:
                checksums[spec.dag_hash()] = maybe_blob
                skipped.append(spec)
            else:
                blobs_to_upload.append(spec)
    else:
        blobs_to_upload = installed_specs_with_deps

    if not blobs_to_upload:
        return skipped, base_images, checksums, []

    if len(blobs_to_upload) != len(installed_specs_with_deps):
        tty.info(
            f"{len(blobs_to_upload)} specs need to be pushed to "
            f"{target_image.domain}/{target_image.name}"
        )

    blob_progress = FancyProgress(len(blobs_to_upload))

    # Upload blobs
    blob_futures = [
        executor.submit(_oci_push_pkg_blob, target_image, spec, tmpdir) for spec in blobs_to_upload
    ]

    manifests_to_upload: List[spack.spec.Spec] = []
    errors: List[Tuple[spack.spec.Spec, BaseException]] = []

    # And update the spec to blob mapping for successful uploads
    for spec, blob_future in zip(blobs_to_upload, blob_futures):
        blob_progress.start(spec, blob_future.running())
        error = blob_future.exception()
        if error is None:
            blob, elapsed = blob_future.result()
            blob_progress.ok(
                _oci_upload_success_msg(spec, blob.compressed_digest, blob.size, elapsed)
            )
            manifests_to_upload.append(spec)
            checksums[spec.dag_hash()] = blob
        else:
            blob_progress.fail()
            errors.append((spec, error))

    # Copy base images if necessary
    for spec in manifests_to_upload:
        _oci_update_base_images(
            base_image=base_image,
            target_image=target_image,
            spec=spec,
            base_image_cache=base_images,
        )

    def extra_config(spec: spack.spec.Spec):
        """Return the spec dict plus buildcache metadata, stored as extra image config fields."""
        spec_dict = spec.to_dict(hash=ht.dag_hash)
        spec_dict["buildcache_layout_version"] = CURRENT_BUILD_CACHE_LAYOUT_VERSION
        spec_dict["binary_cache_checksum"] = {
            "hash_algorithm": "sha256",
            "hash": checksums[spec.dag_hash()].compressed_digest.digest,
        }
        spec_dict["archive_size"] = checksums[spec.dag_hash()].size
        spec_dict["archive_timestamp"] = datetime.datetime.now().astimezone().isoformat()
        spec_dict["archive_compression"] = "gzip"
        return spec_dict

    # Upload manifests
    tty.info("Uploading manifests")
    manifest_futures = [
        executor.submit(
            _oci_put_manifest,
            base_images,
            checksums,
            target_image.with_tag(_oci_default_tag(spec)),
            tmpdir,
            extra_config(spec),
            {"org.opencontainers.image.description": spec.format()},
            spec,
        )
        for spec in manifests_to_upload
    ]

    manifest_progress = FancyProgress(len(manifests_to_upload))

    # Print the image names of the top-level specs
    for spec, manifest_future in zip(manifests_to_upload, manifest_futures):
        # As in the blob loop above, running() has to be queried *before* blocking on the
        # future: once exception() has returned the future is done and never reports running.
        manifest_progress.start(spec, manifest_future.running())
        error = manifest_future.exception()
        if error is None:
            manifest_progress.ok(
                f"Tagged {_format_spec(spec)} as {target_image.with_tag(_oci_default_tag(spec))}"
            )
        else:
            manifest_progress.fail()
            errors.append((spec, error))

    return skipped, base_images, checksums, errors
def _oci_config_from_tag(image_ref_and_tag: Tuple[ImageReference, str]) -> Optional[dict]:
    """Return the image config for an ``(image reference, tag)`` pair, or ``None``.

    ``None`` is returned when the config has no ``spec`` key.
    """
    image_ref, tag = image_ref_and_tag
    tagged_ref = image_ref.with_tag(tag)

    # Don't allow recursion here, since Spack itself always uploads
    # vnd.oci.image.manifest.v1+json, not vnd.oci.image.index.v1+json
    _, config = get_manifest_and_config_with_retry(tagged_ref, tag, recurse=0)

    # Do very basic validation: if "spec" is a key in the config, it
    # must be a Spec object too.
    if "spec" not in config:
        return None
    return config
def _oci_update_index(
    image_ref: ImageReference, tmpdir: str, pool: concurrent.futures.Executor
) -> None:
    """Rebuild the buildcache index from the spec tags of ``image_ref`` and upload it.

    The index is uploaded as the only layer of a manifest tagged with ``default_index_tag``.

    Args:
        image_ref: image whose tags are listed and to which the index is uploaded
        tmpdir: scratch directory for the database, the index and the empty config file
        pool: executor used to fetch the image configs in parallel
    """
    tags = list_tags(image_ref)

    # Fetch all image config files in parallel
    spec_dicts = pool.map(
        _oci_config_from_tag, ((image_ref, tag) for tag in tags if tag_is_spec(tag))
    )

    # Populate the database
    db_root_dir = os.path.join(tmpdir, "db_root")
    db = BuildCacheDatabase(db_root_dir)

    for spec_dict in spec_dicts:
        # Configs that do not describe a spec are reported as None; leave them out of the index.
        if spec_dict is None:
            continue
        spec = spack.spec.Spec.from_dict(spec_dict)
        db.add(spec)
        db.mark(spec, "in_buildcache", True)

    # Create the index.json file
    index_json_path = os.path.join(tmpdir, spack.database.INDEX_JSON_FILE)
    with open(index_json_path, "w", encoding="utf-8") as f:
        db._write_to_file(f)

    # Create an empty config.json file
    empty_config_json_path = os.path.join(tmpdir, "config.json")
    with open(empty_config_json_path, "wb") as f:
        f.write(b"{}")

    # Upload the index.json file
    index_shasum = Digest.from_sha256(spack.util.crypto.checksum(hashlib.sha256, index_json_path))
    upload_blob_with_retry(image_ref, file=index_json_path, digest=index_shasum)

    # Upload the config.json file
    empty_config_digest = Digest.from_sha256(
        spack.util.crypto.checksum(hashlib.sha256, empty_config_json_path)
    )
    upload_blob_with_retry(image_ref, file=empty_config_json_path, digest=empty_config_digest)

    # Push a manifest file that references the index.json file as a layer
    # Notice that we push this as if it is an image, which it of course is not.
    # When the ORAS spec becomes official, we can use that instead of a fake image.
    # For now we just use the OCI image spec, so that we don't run into issues with
    # automatic garbage collection of blobs that are not referenced by any image manifest.
    oci_manifest = {
        "mediaType": "application/vnd.oci.image.manifest.v1+json",
        "schemaVersion": 2,
        # Config is just an empty {} file for now, and irrelevant
        "config": {
            "mediaType": "application/vnd.oci.image.config.v1+json",
            "digest": str(empty_config_digest),
            "size": os.path.getsize(empty_config_json_path),
        },
        # The buildcache index is the only layer, and is not a tarball, we lie here.
        "layers": [
            {
                "mediaType": "application/vnd.oci.image.layer.v1.tar+gzip",
                "digest": str(index_shasum),
                "size": os.path.getsize(index_json_path),
            }
        ],
    }

    upload_manifest_with_retry(image_ref.with_tag(default_index_tag), oci_manifest)
[docs]
def try_fetch(url_to_fetch):
    """Utility function to try and fetch a file from a url and stage it locally.

    Args:
        url_to_fetch (str): Url pointing to remote resource to fetch

    Returns:
        The stage holding the fetched resource, or ``None`` if it could not be fetched (in which
        case the stage is destroyed).
    """
    stage = Stage(url_to_fetch, keep=True)
    stage.create()

    try:
        stage.fetch()
    except spack.error.FetchError:
        stage.destroy()
        return None
    else:
        return stage
[docs]
def download_tarball(
    spec: spack.spec.Spec,
    unsigned: Optional[bool] = False,
    mirrors_for_spec: Optional[List[MirrorMetadata]] = None,
) -> Optional[spack.stage.Stage]:
    """Download binary tarball for given package

    Mirrors are tried one by one; the first mirror from which the tarball can be fetched wins.

    Args:
        spec: a concrete spec
        unsigned: if ``True`` or ``False`` override the mirror signature verification defaults;
            if ``None`` the ``signed`` setting of each mirror is used
        mirrors_for_spec: Optional list of mirrors known to have the spec. These will be checked
            in order first before looking in other configured mirrors.

    Returns:
        The stage containing the downloaded tarball, or ``None`` if the tarball could not be
        fetched from any of the mirrors.

    Raises:
        NoConfiguredBinaryMirrors: if no binary mirrors are configured
    """
    configured_mirrors: Iterable[spack.mirrors.mirror.Mirror] = (
        spack.mirrors.mirror.MirrorCollection(binary=True).values()
    )
    if not configured_mirrors:
        raise NoConfiguredBinaryMirrors()

    # Note on try_first and try_next:
    # mirrors_for_spec most likely came from spack caching remote
    # mirror indices locally and adding their specs to a local data
    # structure supporting quick lookup of concrete specs. Those
    # mirrors are likely a subset of all configured mirrors, and
    # we'll probably find what we need in one of them. But we'll
    # look in all configured mirrors if needed, as maybe the spec
    # we need was in an un-indexed mirror. No need to check any
    # mirror for the spec twice though.
    try_first = mirrors_for_spec or []
    try_next = [
        MirrorMetadata(mirror.fetch_url, layout, mirror.fetch_view)
        for mirror in configured_mirrors
        for layout in mirror.supported_layout_versions
    ]
    urls_and_versions = try_first + [uv for uv in try_next if uv not in try_first]

    # TODO: turn `mirrors_for_spec` into a list of Mirror instances, instead of doing that here.
    def fetch_url_to_mirror(
        mirror_metadata: MirrorMetadata,
    ) -> Tuple[spack.mirrors.mirror.Mirror, int]:
        # Return the configured mirror with a matching fetch url, or else a new Mirror created
        # from the url, together with the layout version from the metadata.
        url = mirror_metadata.url
        layout_version = mirror_metadata.version
        for mirror in configured_mirrors:
            if mirror.fetch_url == url:
                return mirror, layout_version
        return spack.mirrors.mirror.Mirror(url), layout_version

    mirrors = [fetch_url_to_mirror(mirror_metadata) for mirror_metadata in urls_and_versions]

    for mirror, layout_version in mirrors:
        # Override the mirror's default only if an explicit value for ``unsigned`` was given
        currently_unsigned = unsigned if unsigned is not None else not mirror.signed

        # If it's an OCI index, do things differently, since we cannot compose URLs.
        fetch_url = mirror.fetch_url

        # TODO: refactor this to some "nice" place.
        if spack.oci.image.is_oci_url(fetch_url):
            ref = ImageReference.from_url(fetch_url).with_tag(_oci_default_tag(spec))

            # Fetch the manifest; on any failure move on to the next mirror
            try:
                with spack.oci.opener.urlopen(
                    urllib.request.Request(
                        url=ref.manifest_url(),
                        headers={"Accept": ", ".join(spack.oci.oci.manifest_content_type)},
                    )
                ) as response:
                    manifest = json.load(response)
            except Exception:
                continue

            # Download the config = spec.json and the relevant tarball (the last layer)
            try:
                spec_digest = spack.oci.image.Digest.from_string(manifest["config"]["digest"])
                tarball_digest = spack.oci.image.Digest.from_string(
                    manifest["layers"][-1]["digest"]
                )
            except Exception:
                continue

            with spack.oci.oci.make_stage(
                ref.blob_url(spec_digest), spec_digest, keep=True
            ) as local_specfile_stage:
                try:
                    local_specfile_stage.fetch()
                    local_specfile_stage.check()
                    try:
                        get_valid_spec_file(
                            local_specfile_stage.save_filename, CURRENT_BUILD_CACHE_LAYOUT_VERSION
                        )
                    except InvalidMetadataFile as e:
                        tty.warn(
                            f"Ignoring binary package for {spec.name}/{spec.dag_hash()[:7]} "
                            f"from {fetch_url} due to invalid metadata file: {e}"
                        )
                        local_specfile_stage.destroy()
                        continue
                except Exception:
                    continue
                local_specfile_stage.cache_local()

            local_specfile_stage.destroy()

            with spack.oci.oci.make_stage(
                ref.blob_url(tarball_digest), tarball_digest, keep=True
            ) as tarball_stage:
                try:
                    tarball_stage.fetch()
                    tarball_stage.check()
                except Exception:
                    continue
                tarball_stage.cache_local()

            return tarball_stage
        else:
            cache_type = get_url_buildcache_class(layout_version=layout_version)
            cache_entry = cache_type(fetch_url, spec, allow_unsigned=currently_unsigned)

            # A failed fetch is only logged at debug level; the next mirror is tried instead
            try:
                cache_entry.fetch_archive()
            except Exception as e:
                tty.debug(
                    f"Encountered error attempting to fetch archive for "
                    f"{spec.name}/{spec.dag_hash()[:7]} from {fetch_url} "
                    f"(v{layout_version}) due to {e}"
                )
                cache_entry.destroy()
                continue

            if layout_version == 2:
                warn_v2_layout(fetch_url, "Installing a spec")

            return cache_entry.get_archive_stage()

    # Falling through the nested loops means we exhaustively searched
    # for all known kinds of spec files on all mirrors and did not find
    # an acceptable one for which we could download a tarball and (if
    # needed) verify a signature. So at this point, we will proceed to
    # install from source.
    return None
[docs]
def dedupe_hardlinks_if_necessary(root, buildinfo):
    """Updates a buildinfo dict for old archives that did not dedupe hardlinks. De-duping hardlinks
    is necessary when relocating files in parallel and in-place. This means we must preserve inodes
    when relocating.

    Of every group of paths referring to the same inode only the first one is kept. The lists in
    ``buildinfo`` are replaced in place; paths are relative to ``root``."""
    # New archives don't need this.
    if buildinfo.get("hardlinks_deduped", False):
        return

    # Clearly we can assume that an inode is either in the
    # textfile or binary group, but let's just stick to
    # a single set of visited nodes.
    seen_inodes = set()

    def _first_occurrence(rel_path):
        # True unless the file is a hardlink to an inode that was encountered before.
        info = os.lstat(os.path.join(root, rel_path))
        if info.st_nlink <= 1:
            return True
        inode = (info.st_dev, info.st_ino)
        if inode in seen_inodes:
            return False
        seen_inodes.add(inode)
        return True

    # Note: we do *not* dedupe hardlinked symlinks, since
    # it seems difficult or even impossible to relink
    # symlinks while preserving inode.
    for key in ("relocate_textfiles", "relocate_binaries"):
        if key in buildinfo:
            buildinfo[key] = [path for path in buildinfo[key] if _first_occurrence(path)]
[docs]
def relocate_package(spec: spack.spec.Spec) -> None:
    """Relocate binaries and text files in the given spec prefix, based on its buildinfo file.

    Args:
        spec: spec whose install prefix was populated from a binary package

    Raises:
        NewLayoutException: if the buildinfo has neither a ``hash_to_prefix`` nor a
            ``prefix_to_hash`` entry
    """
    spec_prefix = str(spec.prefix)
    buildinfo = read_buildinfo_file(spec_prefix)
    old_layout_root = str(buildinfo["buildpath"])

    # Warn about old style tarballs created with the --rel flag (removed in Spack v0.20)
    if buildinfo.get("relative_rpaths", False):
        tty.warn(
            f"Tarball for {spec} uses relative rpaths, which can cause library loading issues."
        )

    # In Spack 0.19 and older prefix_to_hash was the default and externals were not dropped, so
    # prefixes were not unique.
    if "hash_to_prefix" in buildinfo:
        hash_to_old_prefix = buildinfo["hash_to_prefix"]
    elif "prefix_to_hash" in buildinfo:
        hash_to_old_prefix = {v: k for (k, v) in buildinfo["prefix_to_hash"].items()}
    else:
        raise NewLayoutException(
            "Package tarball was created from an install prefix with a different directory layout "
            "and an older buildcache create implementation. It cannot be relocated."
        )

    prefix_to_prefix: Dict[str, str] = {}

    if "sbang_install_path" in buildinfo:
        old_sbang_install_path = str(buildinfo["sbang_install_path"])
        prefix_to_prefix[old_sbang_install_path] = spack.hooks.sbang.sbang_install_path()

    # First match specific prefix paths. Possibly the *local* install prefix of some dependency is
    # in an upstream, so we cannot assume the original spack store root can be mapped uniformly to
    # the new spack store root.

    # If the spec is spliced, we need to handle the simultaneous mapping from the old install_tree
    # to the new install_tree and from the build_spec to the spliced spec. Because foo.build_spec
    # is foo for any non-spliced spec, we can simplify by checking for spliced-in nodes by checking
    # for nodes not in the build_spec without any explicit check for whether the spec is spliced.
    # An analog in this algorithm is any spec that shares a name or provides the same virtuals in
    # the context of the relevant root spec. This ensures that the analog for a spec s is the spec
    # that s replaced when we spliced.
    relocation_specs = specs_to_relocate(spec)
    # Traverse the build spec once, instead of once more for every spliced-in node.
    build_spec_nodes = list(spec.build_spec.traverse(deptype=dt.ALL & ~dt.BUILD))
    build_spec_ids = {id(s) for s in build_spec_nodes}
    for s in relocation_specs:
        analog = s
        if id(s) not in build_spec_ids:
            analogs = [
                d
                for d in build_spec_nodes
                if s._splice_match(d, self_root=spec, other_root=spec.build_spec)
            ]
            if analogs:
                # Prefer same-name analogs and prefer higher versions
                # This matches the preferences in spack.spec.Spec.splice, so we
                # will find same node
                analog = max(analogs, key=lambda a: (a.name == s.name, a.version))

        lookup_dag_hash = analog.dag_hash()
        if lookup_dag_hash in hash_to_old_prefix:
            old_dep_prefix = hash_to_old_prefix[lookup_dag_hash]
            prefix_to_prefix[old_dep_prefix] = str(s.prefix)

    # Only then add the generic fallback of install prefix -> install prefix.
    prefix_to_prefix[old_layout_root] = str(spack.store.STORE.layout.root)

    # Delete identity mappings from prefix_to_prefix
    prefix_to_prefix = {k: v for k, v in prefix_to_prefix.items() if k != v}

    # If there's nothing to relocate, we're done.
    # NOTE(review): this also skips the metadata handling at the end of the function, including
    # the rewrite for spliced specs -- confirm that this is intended.
    if not prefix_to_prefix:
        return

    for old, new in prefix_to_prefix.items():
        tty.debug(f"Relocating: {old} => {new}.")

    # Old archives may have hardlinks repeated.
    dedupe_hardlinks_if_necessary(spec_prefix, buildinfo)

    # Text files containing the prefix text. All three lists are optional in the buildinfo; a
    # missing entry means that there is nothing of that kind to relocate.
    textfiles = [os.path.join(spec_prefix, f) for f in buildinfo.get("relocate_textfiles", [])]
    binaries = [os.path.join(spec_prefix, f) for f in buildinfo.get("relocate_binaries", [])]
    links = [os.path.join(spec_prefix, f) for f in buildinfo.get("relocate_links", [])]

    platform = spack.platforms.by_name(spec.platform)
    if "macho" in platform.binary_formats:
        relocate.relocate_macho_binaries(binaries, prefix_to_prefix)
    elif "elf" in platform.binary_formats:
        relocate.relocate_elf_binaries(binaries, prefix_to_prefix)

    relocate.relocate_links(links, prefix_to_prefix)
    relocate.relocate_text(textfiles, prefix_to_prefix)
    changed_files = relocate.relocate_text_bin(binaries, prefix_to_prefix)

    # Add ad-hoc signatures to patched macho files when on macOS. Without a codesign executable
    # the signing is skipped, but the metadata handling below still has to run.
    if "macho" in platform.binary_formats and sys.platform == "darwin":
        codesign = which("codesign")
        if codesign:
            for binary in changed_files:
                # preserve the original inode by running codesign on a copy
                with fsys.edit_in_place_through_temporary_file(binary) as tmp_binary:
                    codesign("-fs-", tmp_binary)

    install_manifest = os.path.join(
        spec.prefix,
        spack.store.STORE.layout.metadata_dir,
        spack.store.STORE.layout.manifest_file_name,
    )
    if not os.path.exists(install_manifest):
        spec_id = spec.format("{name}/{hash:7}")
        tty.warn("No manifest file in tarball for spec %s" % spec_id)

    # overwrite old metadata with new
    if spec.spliced:
        # rewrite spec on disk
        spack.store.STORE.layout.write_spec(spec, spack.store.STORE.layout.spec_file_path(spec))

        # de-cache the install manifest
        with contextlib.suppress(FileNotFoundError):
            os.unlink(install_manifest)
def _tar_strip_component(tar: tarfile.TarFile, prefix: str):
    """Yield all members of tarfile that start with given prefix, and strip that prefix (including
    symlinks)

    The members are modified in place: ``name`` always, ``linkname`` only if it starts with the
    prefix too."""
    # Including trailing /, otherwise we end up with absolute paths.
    leading = re.compile(re.escape(prefix) + "/*")

    def _without_prefix(path: str) -> Optional[str]:
        # The remainder of ``path`` after the prefix, or None if it does not start with it.
        hit = leading.match(path)
        return None if hit is None else path[hit.end() :]

    # Only yield members in the package prefix.
    # Note: when a tarfile is created, relative in-prefix symlinks are
    # expanded to matching member names of tarfile entries. So, we have
    # to ensure that those are updated too.
    # Absolute symlinks are copied verbatim -- relocation should take care of
    # them.
    for member in tar.getmembers():
        stripped_name = _without_prefix(member.name)
        if stripped_name is None:
            continue
        member.name = stripped_name

        if member.linkname:
            stripped_target = _without_prefix(member.linkname)
            if stripped_target is not None:
                member.linkname = stripped_target

        yield member
def _ensure_common_prefix(tar: tarfile.TarFile) -> str:
    """Return the package prefix inside the tarball, after validating the tarball's layout.

    The prefix is the grandparent directory of the shortest ``.spack/binary_distribution`` file
    entry. Every entry has to be the prefix itself, live below it, or be a directory that is a
    parent of it. Paths are compared on component boundaries, so ``opt/pkg-evil`` is neither
    inside nor a parent of ``opt/pkg``.

    Raises:
        ValueError: if there is no ``binary_distribution`` file, the prefix is empty, an entry is
            located outside of the prefix, or the prefix directory itself is not an entry
    """
    members = tar.getmembers()

    # Find the lowest `binary_distribution` file (hard-coded forward slash is on purpose).
    binary_distribution = min(
        (
            e.name
            for e in members
            if e.isfile() and e.name.endswith(".spack/binary_distribution")
        ),
        key=len,
        default=None,
    )

    if binary_distribution is None:
        raise ValueError("Tarball is not a Spack package, missing binary_distribution file")

    pkg_path = pathlib.PurePosixPath(binary_distribution).parent.parent

    # Even the most ancient Spack version has required to list the dir of the package itself, so
    # guard against broken tarballs where `path.parent.parent` is empty.
    if pkg_path == pathlib.PurePosixPath():
        raise ValueError("Invalid tarball, missing package prefix dir")

    pkg_prefix = str(pkg_path)
    # With a trailing slash, a plain string prefix test only matches whole path components.
    pkg_prefix_dir = pkg_prefix + "/"

    # Ensure all tar entries are in the pkg_prefix dir, and if they're not, they should be parent
    # dirs of it.
    has_prefix = False
    for member in members:
        stripped = member.name.rstrip("/")
        inside_prefix = stripped == pkg_prefix or stripped.startswith(pkg_prefix_dir)
        # A directory entry with an empty name is tolerated as a parent of everything.
        parent_of_prefix = member.isdir() and (
            not stripped or pkg_prefix_dir.startswith(stripped + "/")
        )
        if not (inside_prefix or parent_of_prefix):
            raise ValueError(f"Tarball contains file {stripped} outside of prefix {pkg_prefix}")
        if member.isdir() and stripped == pkg_prefix:
            has_prefix = True

    # This is technically not required, but let's be defensive about the existence of the package
    # prefix dir.
    if not has_prefix:
        raise ValueError(f"Tarball does not contain a common prefix {pkg_prefix}")

    return pkg_prefix
[docs]
def install_root_node(
    spec: spack.spec.Spec,
    unsigned=False,
    force: bool = False,
    sha256: Optional[str] = None,
    allow_missing: bool = False,
) -> None:
    """Install only the root node of a concrete spec from a buildcache.

    External or abstract specs are skipped with a warning, and so are specs that are
    already installed unless ``force`` is set.

    Args:
        spec: spec to be installed (note that only the root node will be installed)
        unsigned: if True allows installing unsigned binaries
        force: force installation if the spec is already present in the local store
        sha256: optional sha256 of the binary package; note that the body of this function
            does not read this argument
        allow_missing: when true, allows installing a node with missing dependencies

    Raises:
        RuntimeError: if the binary cache file for the spec could not be downloaded.
    """
    # Early termination
    if spec.external or not spec.concrete:
        warnings.warn("Skipping external or abstract spec {0}".format(spec.format()))
        return

    if spec.installed and not force:
        warnings.warn("Package for spec {0} already installed.".format(spec.format()))
        return

    tarball_stage = download_tarball(spec.build_spec, unsigned)
    if not tarball_stage:
        failure = 'download of binary cache file for spec "{0}" failed'
        raise RuntimeError(failure.format(spec.build_spec.format()))

    # don't print long padded paths while extracting/relocating binaries
    with spack.util.path.filter_padding():
        tty.msg('Installing "{0}" from a buildcache'.format(spec.format()))
        extract_tarball(spec, tarball_stage, force)
        spec.package.windows_establish_runtime_linkage()
        spack.hooks.post_install(spec, False)
        spack.store.STORE.db.add(spec, allow_missing=allow_missing)
[docs]
def install_single_spec(spec, unsigned=False, force=False):
    """Install a concrete spec and its link/run dependencies from a buildcache.

    Nodes are visited in post-order, root included, and each one is handed to
    :func:`install_root_node`.

    Args:
        spec (spack.spec.Spec): spec to be installed
        unsigned (bool): if True allows installing unsigned binaries
        force (bool): force installation if the spec is already present in the
            local store
    """
    nodes = spec.traverse(root=True, order="post", deptype=("link", "run"))
    for current in nodes:
        install_root_node(current, unsigned=unsigned, force=force)
[docs]
def try_direct_fetch(spec: spack.spec.Spec) -> List[MirrorMetadata]:
    """Look for the spec's metadata directly on every configured binary mirror.

    OCI mirrors are skipped. For the remaining mirrors each supported layout version is
    probed; a probe that raises ``BuildcacheEntryError`` is ignored.

    Returns:
        One ``MirrorMetadata`` per (mirror, layout version) whose metadata could be fetched.
    """
    hits: List[MirrorMetadata] = []

    for mirror in spack.mirrors.mirror.MirrorCollection(binary=True).values():
        # TODO: OCI-support
        if spack.oci.image.is_oci_url(mirror.fetch_url):
            continue

        for layout_version in mirror.supported_layout_versions:
            # layout_version could eventually come from the mirror config
            entry_type = get_url_buildcache_class(layout_version=layout_version)
            entry = entry_type(mirror.fetch_url, spec)

            try:
                metadata = entry.fetch_metadata()
            except BuildcacheEntryError:
                continue
            finally:
                # The entry is destroyed whether or not the fetch succeeded.
                entry.destroy()

            # All specs in build caches are concrete (as they are built) so we need
            # to mark this spec concrete on read-in.
            remote_spec = spack.spec.Spec.from_dict(metadata)
            remote_spec._mark_concrete()

            hits.append(MirrorMetadata(mirror.fetch_url, layout_version, mirror.fetch_view))

    return hits
[docs]
def get_mirrors_for_spec(spec: spack.spec.Spec, index_only: bool = False) -> List[MirrorMetadata]:
    """Return the mirrors on which a concrete spec can be found.

    The binary index is consulted first. If it has no match and ``index_only`` is false,
    the mirrors are queried directly and any hit is recorded in the binary index.

    Args:
        spec: The spec to look for in binary mirrors
        index_only: When ``index_only`` is set to ``True``, only the local cache is checked, no
            requests are made.
    """
    if not spack.mirrors.mirror.MirrorCollection(binary=True):
        tty.debug("No Spack mirrors are currently configured")
        return []

    found = BINARY_INDEX.find_built_spec(spec)
    if found or index_only:
        return found

    # The index may be out-of-date. Since we aren't only considering indices, try
    # to fetch directly since we know where the file should be.
    found = try_direct_fetch(spec)

    # We found a spec by the direct fetch approach, we might as well
    # add it to our mapping.
    if found:
        BINARY_INDEX.update_spec(spec, found)

    return found
[docs]
def update_cache_and_get_specs():
    """Refresh the binary index and return every built spec it knows about.

    The index is updated through ``BINARY_INDEX.update()`` before it is queried, so calling
    this function may also initialize the local index cache.

    Raises:
        FetchCacheError
    """
    BINARY_INDEX.update()
    built_specs = BINARY_INDEX.get_all_built_specs()
    return built_specs
[docs]
def get_keys(
    install: bool = False,
    trust: bool = False,
    force: bool = False,
    mirrors: Optional[Mapping[str, spack.mirrors.mirror.Mirror]] = None,
):
    """Get pgp public keys available on mirror with suffix .pub

    Unsigned mirrors are skipped. For every signed mirror each supported layout version is
    queried, using the version 2 code path for layout version 2 and the generic one otherwise.

    Args:
        install: forwarded to the per-layout key fetchers
        trust: forwarded to the per-layout key fetchers
        force: forwarded to the per-layout key fetchers
        mirrors: mirrors to query; when empty or None the configured binary mirrors are used

    Returns:
        The fingerprints reported by the per-layout key fetchers, concatenated.
    """
    mirror_collection = mirrors or spack.mirrors.mirror.MirrorCollection(binary=True)

    if not mirror_collection:
        tty.die("Please add a spack mirror to allow download of build caches.")

    fingerprints = []

    # Don't bother fetching keys for unsigned mirrors
    signed_mirrors = (m for m in mirror_collection.values() if m.signed)
    for mirror in signed_mirrors:
        for layout_version in mirror.supported_layout_versions:
            fetch_url = mirror.fetch_url
            fetched = (
                _get_keys_v2(fetch_url, install, trust, force)
                if layout_version == 2
                else _get_keys(fetch_url, layout_version, install, trust, force)
            )
            if fetched:
                fingerprints.extend(fetched)

    return fingerprints
def _get_keys(
    mirror_url: str,
    layout_version: int = CURRENT_BUILD_CACHE_LAYOUT_VERSION,
    install: bool = False,
    trust: bool = False,
    force: bool = False,
) -> Optional[List[str]]:
    """Fetch the public keys listed in a mirror's key index and optionally trust them.

    Args:
        mirror_url: base URL of the mirror
        layout_version: buildcache layout version used to select the cache class
        install: when False, keys are fetched but nothing else is done with them
        trust: together with ``install``, pass each fetched key to ``spack.util.gpg.trust``
        force: not read by this function

    Returns:
        The fingerprints of the keys that were trusted, or None if the key index could not
        be fetched.
    """
    cache_class = get_url_buildcache_class(layout_version=layout_version)

    tty.debug("Finding public keys in {0}".format(url_util.format(mirror_url)))

    keys_prefix = url_util.join(
        mirror_url, *cache_class.get_relative_path_components(BuildcacheComponent.KEY)
    )
    key_index_manifest_url = url_util.join(keys_prefix, "keys.manifest.json")
    index_entry = cache_class(mirror_url, allow_unsigned=True)

    # The finally clause guarantees the entry is destroyed on every exit path, including a
    # failure to open or parse the fetched index blob.
    try:
        try:
            index_manifest = index_entry.read_manifest(manifest_url=key_index_manifest_url)
            index_blob_path = index_entry.fetch_blob(index_manifest.data[0])
        except BuildcacheEntryError as e:
            tty.debug(f"Failed to fetch key index due to: {e}")
            return None

        with open(index_blob_path, encoding="utf-8") as fd:
            json_index = json.load(fd)
    finally:
        index_entry.destroy()

    saved_fingerprints = []
    for fingerprint in json_index["keys"]:
        key_manifest_url = url_util.join(keys_prefix, f"{fingerprint}.key.manifest.json")
        key_entry = cache_class(mirror_url, allow_unsigned=True)

        # As above: destroy the per-key entry even if trusting the key raises.
        try:
            try:
                key_manifest = key_entry.read_manifest(manifest_url=key_manifest_url)
                key_blob_path = key_entry.fetch_blob(key_manifest.data[0])
            except BuildcacheEntryError as e:
                tty.debug(f"Failed to fetch key {fingerprint} due to: {e}")
                continue

            tty.debug("Found key {0}".format(fingerprint))
            if install:
                if trust:
                    spack.util.gpg.trust(key_blob_path)
                    tty.debug(f"Added {fingerprint} to trusted keys.")
                    saved_fingerprints.append(fingerprint)
                else:
                    tty.debug(
                        "Will not add this key to trusted keys. "
                        "Use -t to install all downloaded keys"
                    )
        finally:
            key_entry.destroy()

    return saved_fingerprints
def _get_keys_v2(mirror_url, install=False, trust=False, force=False) -> Optional[List[str]]:
    """Fetch the public keys of a layout version 2 mirror and optionally trust them.

    Args:
        mirror_url: base URL of the mirror
        install: when False, keys are fetched but nothing else is done with them
        trust: together with ``install``, pass each fetched key to ``spack.util.gpg.trust``
        force: remove an already staged key file so that it is fetched again

    Returns:
        The fingerprints of the keys that were trusted, or None if the key index could not
        be read.
    """
    cache_class = get_url_buildcache_class(layout_version=2)

    keys_url = url_util.join(
        mirror_url, *cache_class.get_relative_path_components(BuildcacheComponent.KEY)
    )
    keys_index = url_util.join(keys_url, "index.json")

    tty.debug("Finding public keys in {0}".format(url_util.format(mirror_url)))

    try:
        json_index = web_util.read_json(keys_index)
    except (web_util.SpackWebError, OSError, ValueError) as url_err:
        # TODO: avoid repeated request
        # Only report an error when the index exists but could not be read.
        if web_util.url_exists(keys_index):
            tty.error(
                f"Unable to find public keys in {url_util.format(mirror_url)},"
                f" caught exception attempting to read from {url_util.format(keys_index)}."
            )
            tty.error(url_err)
        return None

    saved_fingerprints = []
    for fingerprint in json_index["keys"]:
        # This is a URL, not a filesystem path: os.path.join would insert the platform
        # separator (a backslash on Windows), so use the URL-aware join.
        link = url_util.join(keys_url, fingerprint + ".pub")

        with Stage(link, name="build_cache", keep=True) as stage:
            if os.path.exists(stage.save_filename) and force:
                os.remove(stage.save_filename)
            if not os.path.exists(stage.save_filename):
                try:
                    stage.fetch()
                except spack.error.FetchError:
                    continue

        tty.debug("Found key {0}".format(fingerprint))
        if install:
            if trust:
                spack.util.gpg.trust(stage.save_filename)
                tty.debug("Added this key to trusted keys.")
                saved_fingerprints.append(fingerprint)
            else:
                tty.debug(
                    "Will not add this key to trusted keys. "
                    "Use -t to install all downloaded keys"
                )

    return saved_fingerprints
def _url_push_keys(
    *mirrors: Union[spack.mirrors.mirror.Mirror, str],
    keys: List[str],
    tmpdir: str,
    update_index: bool = False,
):
    """Upload pgp public keys to the given mirrors

    Each key is first exported to ``<tmpdir>/<key>.pub`` and then pushed as a blob to every
    mirror.

    Args:
        mirrors: mirrors, or push URLs given as strings, to upload to
        keys: keys to select through ``spack.util.gpg.public_keys``
        tmpdir: directory in which the exported key files are written
        update_index: when True, regenerate the key index of each mirror after pushing
    """
    fingerprints = spack.util.gpg.public_keys(*(keys or ()))
    exported = [(fp, os.path.join(tmpdir, f"{fp}.pub")) for fp in fingerprints]

    for fingerprint, key_file in exported:
        spack.util.gpg.export_keys(key_file, [fingerprint])

    cache_class = get_url_buildcache_class()

    for mirror in mirrors:
        if isinstance(mirror, str):
            push_url = mirror
        else:
            push_url = mirror.push_url

        tty.debug(f"Pushing public keys to {url_util.format(push_url)}")

        for fingerprint, key_file in exported:
            cache_class.push_local_file_as_blob(
                local_file_path=key_file,
                mirror_url=push_url,
                manifest_name=f"{fingerprint}.key",
                component_type=BuildcacheComponent.KEY,
                compression="none",
            )

        if update_index:
            generate_key_index(push_url, tmpdir=tmpdir)

        # The layout file is only pushed when something was written to the mirror.
        if exported or update_index:
            cache_class.maybe_push_layout_json(push_url)
[docs]
def needs_rebuild(spec, mirror_url):
    """Return True if the mirror lacks the spec file or the tarball of a concrete spec.

    Raises:
        ValueError: if ``spec`` is not concrete.
    """
    if not spec.concrete:
        raise ValueError("spec must be concrete to check against mirror")

    tty.debug(
        "Checking {0}-{1}, dag_hash = {2}".format(spec.name, spec.version, spec.dag_hash())
    )
    tty.debug(spec.tree())

    # Try to retrieve the specfile directly, based on the known
    # format of the name, in order to determine if the package
    # needs to be rebuilt.
    entry_type = get_url_buildcache_class(layout_version=CURRENT_BUILD_CACHE_LAYOUT_VERSION)
    entry = entry_type(mirror_url, spec, allow_unsigned=True)
    required = [BuildcacheComponent.SPEC, BuildcacheComponent.TARBALL]
    return not entry.exists(required)
[docs]
def check_specs_against_mirrors(mirrors, specs, output_file=None):
    """Report which of the given specs are missing from the given mirrors.

    A spec needs a rebuild on a mirror when :func:`needs_rebuild` says so for that mirror's
    fetch URL.

    Arguments:
        mirrors (dict): Mirrors to check against
        specs (typing.Iterable): Specs to check against mirrors
        output_file (str): Path to output file to be written. If provided,
            mirrors with missing or out-of-date specs will be formatted as a
            JSON object and written to this file.

    Returns: 1 if any spec was out-of-date on any mirror, 0 otherwise.
    """
    rebuilds = {}

    for mirror in spack.mirrors.mirror.MirrorCollection(mirrors, binary=True).values():
        tty.debug("Checking for built specs at {0}".format(mirror.fetch_url))

        outdated = [
            {"short_spec": spec.short_spec, "hash": spec.dag_hash()}
            for spec in specs
            if needs_rebuild(spec, mirror.fetch_url)
        ]
        if not outdated:
            continue

        rebuilds[mirror.fetch_url] = {
            "mirrorName": mirror.name,
            "mirrorUrl": mirror.fetch_url,
            "rebuildSpecs": outdated,
        }

    if output_file:
        serialized = json.dumps(rebuilds)
        with open(output_file, "w", encoding="utf-8") as outf:
            outf.write(serialized)

    return int(bool(rebuilds))
[docs]
def download_single_spec(
    concrete_spec,
    destination,
    mirror_url=None,
    layout_version: int = CURRENT_BUILD_CACHE_LAYOUT_VERSION,
):
    """Download the buildcache files for a single concrete spec.

    Mirrors are tried in order; the first one from which both the metadata and the archive
    can be fetched wins, and its spec file and archive are moved into ``destination``.

    Args:
        concrete_spec: concrete spec to be downloaded
        destination (str): path where to put the downloaded buildcache
        mirror_url (str): url of the mirror from which to download
        layout_version: buildcache layout version used to select the cache class

    Returns:
        True if the files were downloaded from some mirror, False otherwise.
    """
    if mirror_url:
        urls = [mirror_url]
    else:
        configured = spack.mirrors.mirror.MirrorCollection(binary=True)
        if not configured:
            tty.die("Please provide or add a spack mirror to allow download of buildcache entries.")
        urls = [mirror.fetch_url for mirror in configured.values()]

    mkdirp(destination)

    for url in urls:
        entry_type = get_url_buildcache_class(layout_version=layout_version)
        entry = entry_type(url, concrete_spec, allow_unsigned=True)

        try:
            entry.fetch_metadata()
            entry.fetch_archive()
        except BuildcacheEntryError as e:
            tty.warn(f"Error downloading {concrete_spec.name}/{concrete_spec.dag_hash()[:7]}: {e}")
            entry.destroy()
            continue

        for fetched_path in (entry.get_local_spec_path(), entry.get_local_archive_path()):
            shutil.move(fetched_path, destination)
        return True

    return False
[docs]
class BinaryCacheQuery:
    """Callable that returns the specs in the binary cache satisfying a query spec."""

    def __init__(self, all_architectures):
        """Load the candidate specs from the binary cache.

        Args:
            all_architectures (bool): if True consider all the spec for querying,
                otherwise restrict to the current default architecture
        """
        self.all_architectures = all_architectures

        candidates = update_cache_and_get_specs()
        if not self.all_architectures:
            default_arch = spack.spec.Spec.default_arch()
            candidates = [c for c in candidates if c.satisfies(default_arch)]

        self.possible_specs = candidates

    def __call__(self, spec: spack.spec.Spec, **kwargs):
        """Return the candidate specs that satisfy ``spec``.

        Args:
            spec: The spec being searched for
        """
        return list(filter(lambda candidate: candidate.satisfies(spec), self.possible_specs))
[docs]
class FetchIndexError(Exception):
    """Raised when a remote buildcache index cannot be fetched or is invalid.

    Instances are created with a message and, optionally, the underlying cause as a second
    positional argument; when present, the cause is appended to the string representation.
    """

    def __str__(self):
        # With fewer than two args defer to Exception.__str__: it returns "" for no args
        # (instead of failing with an IndexError) and str(args[0]) for a single arg.
        if len(self.args) < 2:
            return super().__str__()
        return "{}, due to: {}".format(self.args[0], self.args[1])
[docs]
class BuildcacheIndexError(spack.error.SpackError):
    """Error raised whenever a buildcache cannot be read, whatever the cause."""
[docs]
class BuildcacheIndexNotExists(Exception):
    """Raised to signal that a buildcache has no index."""
# Result of a conditional index fetch. When ``fresh`` is True the handlers in this module
# set the other three fields to None; otherwise ``hash`` and ``data`` describe the fetched index.
FetchIndexResult = collections.namedtuple(
    "FetchIndexResult", ["etag", "hash", "data", "fresh"]
)
[docs]
class IndexHandler:
    """Base class of the buildcache index fetchers.

    Subclasses implement :meth:`conditional_fetch`; the other methods are helpers for reading
    an index manifest and fetching the index blob it points to.
    """

    def conditional_fetch(self) -> FetchIndexResult:
        """Fetch the index if needed. Must be overridden by subclasses."""
        raise NotImplementedError(f"{self.__class__.__name__} is abstract")

    def get_index_manifest(self, manifest_response) -> BlobRecord:
        """Read the response of the manifest request and return a BlobRecord

        Raises:
            FetchIndexError: if the response body cannot be read as UTF-8 text.
        """
        cache_class = get_url_buildcache_class(CURRENT_BUILD_CACHE_LAYOUT_VERSION)

        try:
            manifest_text = io.TextIOWrapper(manifest_response, encoding="utf-8").read()
        except (ValueError, OSError) as e:
            raise FetchIndexError(f"Remote index {manifest_response.url} is invalid", e) from e

        # Currently we do not sign buildcache index, but we could
        manifest_dict = cache_class.verify_and_extract_manifest(manifest_text, verify=False)
        manifest = BuildcacheManifest.from_dict(manifest_dict)

        index_media_type = cache_class.component_to_media_type(BuildcacheComponent.INDEX)
        # The first blob record with the index media type is the one returned.
        return manifest.get_blob_records(index_media_type)[0]

    def fetch_index_blob(
        self, cache_entry: URLBuildcacheEntry, blob_record: BlobRecord
    ) -> Tuple[str, str]:
        """Fetch the index blob indicated by the BlobRecord, and return the
        (checksum, contents) of the blob

        The cache entry is destroyed before raising.

        Raises:
            FetchIndexError: if the blob cannot be fetched, or if the hash computed from its
                contents differs from the checksum in the blob record.
        """
        try:
            staged_blob_path = cache_entry.fetch_blob(blob_record)
        except BuildcacheEntryError as e:
            cache_entry.destroy()
            raise FetchIndexError(
                f"Could not fetch index blob from {cache_entry.mirror_url}"
            ) from e

        with open(staged_blob_path, encoding="utf-8") as fd:
            contents = fd.read()

        checksum = compute_hash(contents)
        if checksum == blob_record.checksum:
            return (checksum, contents)

        cache_entry.destroy()
        raise FetchIndexError(f"Remote index at {cache_entry.mirror_url} is invalid")
[docs]
class DefaultIndexHandlerV2(IndexHandler):
    """Fetcher for index.json, using separate index.json.hash as cache invalidation strategy"""

    def __init__(self, mirror_metadata, local_hash, urlopen=web_util.urlopen):
        self.url = mirror_metadata.url
        self.local_hash = local_hash
        self.urlopen = urlopen
        self.headers = {"User-Agent": web_util.SPACK_USER_AGENT}

    def get_remote_hash(self):
        """Return the hash stored in the remote ``index.json.hash``, or None.

        None is returned when the request fails with an ``OSError`` or when the first 64
        bytes of the response are not a lowercase hexadecimal digest.
        """
        # Failure to fetch index.json.hash is not fatal
        url_index_hash = url_util.join(self.url, "build_cache", "index.json.hash")
        request = urllib.request.Request(url_index_hash, headers=self.headers)
        try:
            with self.urlopen(request) as response:
                remote_hash = response.read(64)
        except OSError:
            return None

        # Validate the hash
        if re.match(rb"[a-f\d]{64}$", remote_hash) is None:
            return None
        return remote_hash.decode("utf-8")

    def conditional_fetch(self) -> FetchIndexResult:
        """Fetch ``index.json`` unless the remote hash equals the local one.

        Raises:
            FetchIndexError: if the index cannot be fetched or read.
        """
        # Do an intermediate fetch for the hash
        # and a conditional fetch for the contents

        # Early exit if our cache is up to date.
        if self.local_hash and self.local_hash == self.get_remote_hash():
            return FetchIndexResult(etag=None, hash=None, data=None, fresh=True)

        # Otherwise, download index.json
        url_index = url_util.join(self.url, "build_cache", spack.database.INDEX_JSON_FILE)
        request = urllib.request.Request(url_index, headers=self.headers)

        try:
            response = self.urlopen(request)
        except OSError as e:
            raise FetchIndexError(f"Could not fetch index from {url_index}", e) from e

        with response:
            try:
                index_text = io.TextIOWrapper(response, encoding="utf-8").read()
            except (ValueError, OSError) as e:
                raise FetchIndexError(f"Remote index {url_index} is invalid") from e

        # For now we only handle etags on http(s), since 304 error handling
        # in s3:// is not there yet.
        if urllib.parse.urlparse(self.url).scheme in ("http", "https"):
            etag = web_util.parse_etag(
                response.headers.get("Etag", None) or response.headers.get("etag", None)
            )
        else:
            etag = None

        index_hash = compute_hash(index_text)

        # We don't handle index_hash != remote_hash here, which can happen
        # when remote index.json and index.json.hash are out of sync, or if
        # the hash algorithm changed.
        # The most likely scenario is that index.json got updated
        # while we fetched index.json.hash. Warning about an issue thus feels
        # wrong, as it's more of an issue with race conditions in the cache
        # invalidation strategy.

        warn_v2_layout(self.url, "Fetching an index")

        return FetchIndexResult(etag=etag, hash=index_hash, data=index_text, fresh=False)
[docs]
class EtagIndexHandlerV2(IndexHandler):
    """Fetcher for index.json, using ETags headers as cache invalidation strategy"""

    def __init__(self, mirror_metadata, etag, urlopen=web_util.urlopen):
        self.url = mirror_metadata.url
        self.etag = etag
        self.urlopen = urlopen

    def conditional_fetch(self) -> FetchIndexResult:
        """Request ``index.json`` with an ``If-None-Match`` header carrying the stored etag.

        An HTTP 304 response is reported as a fresh result without data.

        Raises:
            FetchIndexError: if the index cannot be fetched or read.
        """
        # Just do a conditional fetch immediately
        url = url_util.join(self.url, "build_cache", spack.database.INDEX_JSON_FILE)
        request = urllib.request.Request(
            url,
            headers={
                "User-Agent": web_util.SPACK_USER_AGENT,
                "If-None-Match": f'"{self.etag}"',
            },
        )

        try:
            response = self.urlopen(request)
        except OSError as e:  # HTTPError, URLError, socket.timeout, etc.
            if isinstance(e, urllib.error.HTTPError) and e.getcode() == 304:
                # Not modified; that means fresh.
                return FetchIndexResult(etag=None, hash=None, data=None, fresh=True)
            raise FetchIndexError(f"Could not fetch index {url}", e) from e

        with response:
            try:
                index_text = io.TextIOWrapper(response, encoding="utf-8").read()
            except (ValueError, OSError) as e:
                raise FetchIndexError(f"Remote index {url} is invalid", e) from e

        warn_v2_layout(self.url, "Fetching an index")

        response_headers = response.headers
        etag_header_value = response_headers.get("Etag", None) or response_headers.get(
            "etag", None
        )

        return FetchIndexResult(
            etag=web_util.parse_etag(etag_header_value),
            hash=compute_hash(index_text),
            data=index_text,
            fresh=False,
        )
[docs]
class OCIIndexHandler(IndexHandler):
    """Fetcher for an index stored in an OCI registry, invalidated by the blob digest."""

    def __init__(self, mirror_metadata: MirrorMetadata, local_hash, urlopen=None) -> None:
        self.local_hash = local_hash
        self.ref = spack.oci.image.ImageReference.from_url(mirror_metadata.url)
        self.urlopen = urlopen or spack.oci.opener.urlopen

    def conditional_fetch(self) -> FetchIndexResult:
        """Download an index from an OCI registry type mirror.

        The manifest tagged with the default index tag is fetched first; the blob of its
        first layer is only downloaded when its digest differs from the local hash.

        Raises:
            FetchIndexError: if the manifest or the blob cannot be fetched or parsed, or if
                the hash of the downloaded blob differs from the digest in the manifest.
        """
        url_manifest = self.ref.with_tag(default_index_tag).manifest_url()
        manifest_request = urllib.request.Request(
            url=url_manifest,
            headers={"Accept": "application/vnd.oci.image.manifest.v1+json"},
        )

        try:
            response = self.urlopen(manifest_request)
        except OSError as e:
            raise FetchIndexError(f"Could not fetch manifest from {url_manifest}", e) from e

        with response:
            try:
                manifest = json.load(response)
            except Exception as e:
                raise FetchIndexError(f"Remote index {url_manifest} is invalid", e) from e

        # Get first blob hash, which should be the index.json
        try:
            first_layer_digest = manifest["layers"][0]["digest"]
            index_digest = spack.oci.image.Digest.from_string(first_layer_digest)
        except Exception as e:
            raise FetchIndexError(f"Remote index {url_manifest} is invalid", e) from e

        # Fresh?
        if index_digest.digest == self.local_hash:
            return FetchIndexResult(etag=None, hash=None, data=None, fresh=True)

        # Otherwise fetch the blob / index.json
        try:
            blob_request = urllib.request.Request(
                url=self.ref.blob_url(index_digest),
                headers={"Accept": "application/vnd.oci.image.layer.v1.tar+gzip"},
            )
            with self.urlopen(blob_request) as response:
                index_text = io.TextIOWrapper(response, encoding="utf-8").read()
        except (OSError, ValueError) as e:
            raise FetchIndexError(f"Remote index {url_manifest} is invalid", e) from e

        # Make sure the blob we download has the advertised hash
        if compute_hash(index_text) != index_digest.digest:
            raise FetchIndexError(f"Remote index {url_manifest} is invalid")

        return FetchIndexResult(etag=None, hash=index_digest.digest, data=index_text, fresh=False)
[docs]
class DefaultIndexHandler(IndexHandler):
    """Fetcher for buildcache index, cache invalidation via manifest contents"""

    def __init__(self, mirror_metadata: MirrorMetadata, local_hash, urlopen=web_util.urlopen):
        self.url = mirror_metadata.url
        self.view = mirror_metadata.view
        self.layout_version = mirror_metadata.version
        self.local_hash = local_hash
        self.urlopen = urlopen
        self.headers = {"User-Agent": web_util.SPACK_USER_AGENT}

    def conditional_fetch(self) -> FetchIndexResult:
        """Fetch the index manifest, and the index blob if its checksum is not the local hash.

        Raises:
            FetchIndexError: if the index manifest cannot be requested; errors raised by the
                manifest and blob helpers propagate as well.
        """
        cache_class = get_url_buildcache_class(layout_version=self.layout_version)
        url_index_manifest = cache_class.get_index_url(self.url, self.view)
        request = urllib.request.Request(url_index_manifest, headers=self.headers)

        try:
            response = self.urlopen(request)
        except OSError as e:
            raise FetchIndexError(
                f"Could not read index manifest from {url_index_manifest}"
            ) from e

        with response:
            index_blob_record = self.get_index_manifest(response)

        # Early exit if our cache is up to date.
        if self.local_hash and self.local_hash == index_blob_record.checksum:
            return FetchIndexResult(etag=None, hash=None, data=None, fresh=True)

        # Otherwise, download the index blob
        cache_entry = cache_class(self.url, allow_unsigned=True)
        index_hash, index_text = self.fetch_index_blob(cache_entry, index_blob_record)
        cache_entry.destroy()

        # For now we only handle etags on http(s), since 304 error handling
        # in s3:// is not there yet.
        if urllib.parse.urlparse(self.url).scheme in ("http", "https"):
            etag = web_util.parse_etag(
                response.headers.get("Etag", None) or response.headers.get("etag", None)
            )
        else:
            etag = None

        return FetchIndexResult(etag=etag, hash=index_hash, data=index_text, fresh=False)
[docs]
class EtagIndexHandler(IndexHandler):
    """Fetcher for buildcache index, cache invalidation via ETags headers

    Compared to :class:`DefaultIndexHandler`, this class:

    1. Is constructed with an etag rather than with an index checksum. Since we never start
       out with an etag, the default fetcher must have been used initially and determined
       that the etag approach is valid.
    2. Sends that etag in the ``If-None-Match`` header of the index manifest request.
    3. Treats an HTTP 304 response to that request as "not modified", and returns a fresh
       result right away.
    4. Always includes the etag parsed from the response in its return value, without
       checking the url scheme first."""

    def __init__(self, mirror_metadata: MirrorMetadata, etag, urlopen=web_util.urlopen):
        self.url = mirror_metadata.url
        self.view = mirror_metadata.view
        self.layout_version = mirror_metadata.version
        self.etag = etag
        self.urlopen = urlopen

    def conditional_fetch(self) -> FetchIndexResult:
        """Conditionally fetch the index manifest, then the index blob it refers to.

        Raises:
            FetchIndexError: if the index manifest cannot be fetched; errors raised by the
                manifest and blob helpers propagate as well.
        """
        # Do a conditional fetch of the index manifest (i.e. using If-None-Match header)
        cache_class = get_url_buildcache_class(layout_version=self.layout_version)
        manifest_url = cache_class.get_index_url(self.url, self.view)
        request = urllib.request.Request(
            manifest_url,
            headers={
                "User-Agent": web_util.SPACK_USER_AGENT,
                "If-None-Match": f'"{self.etag}"',
            },
        )

        try:
            response = self.urlopen(request)
        except OSError as e:  # HTTPError, URLError, socket.timeout, etc.
            if isinstance(e, urllib.error.HTTPError) and e.getcode() == 304:
                # The remote manifest has not been modified, i.e. the index we
                # already have is the freshest there is.
                return FetchIndexResult(etag=None, hash=None, data=None, fresh=True)
            raise FetchIndexError(f"Could not fetch index manifest {manifest_url}", e) from e

        # We need to read the index manifest and fetch the associated blob
        with response:
            index_blob_record = self.get_index_manifest(response)
            response_headers = response.headers
            etag_header_value = response_headers.get("Etag", None) or response_headers.get(
                "etag", None
            )

        cache_entry = cache_class(self.url, allow_unsigned=True)
        index_hash, index_text = self.fetch_index_blob(cache_entry, index_blob_record)
        cache_entry.destroy()

        return FetchIndexResult(
            etag=web_util.parse_etag(etag_header_value),
            hash=index_hash,
            data=index_text,
            fresh=False,
        )
[docs]
def get_index_fetcher(
    scheme: str, mirror_metadata: MirrorMetadata, cache_entry: Dict[str, str]
) -> IndexHandler:
    """Select the index handler for a mirror.

    Args:
        scheme: URL scheme of the mirror; ``"oci"`` selects the OCI handler
        mirror_metadata: mirror description; layout versions below 3 use the V2 handlers
        cache_entry: locally cached data for the mirror; its ``etag`` and ``index_hash``
            entries are read

    Returns:
        An etag based handler when the cache entry has a non-empty etag, and a hash based
        handler otherwise.
    """
    if scheme == "oci":
        # TODO: Actually etag and OCI are not mutually exclusive...
        return OCIIndexHandler(mirror_metadata, cache_entry.get("index_hash", None))

    etag = cache_entry.get("etag")
    legacy_layout = mirror_metadata.version < 3

    if etag:
        etag_handler = EtagIndexHandlerV2 if legacy_layout else EtagIndexHandler
        return etag_handler(mirror_metadata, etag)

    hash_handler = DefaultIndexHandlerV2 if legacy_layout else DefaultIndexHandler
    return hash_handler(mirror_metadata, local_hash=cache_entry.get("index_hash", None))
[docs]
class NoOverwriteException(spack.error.SpackError):
    """Error raised instead of overwriting an existing file."""

    def __init__(self, file_path):
        message = f"Refusing to overwrite the following file: {file_path}"
        super().__init__(message)
[docs]
class NoGpgException(spack.error.SpackError):
    """Error raised when gpg2 cannot be found in PATH."""

    def __init__(self, msg):
        # Only a single message is accepted and forwarded to the base class.
        super().__init__(msg)
[docs]
class NoKeyException(spack.error.SpackError):
    """Error raised when no default key has been added to gpg."""

    def __init__(self, msg):
        # Only a single message is accepted and forwarded to the base class.
        super().__init__(msg)
[docs]
class PickKeyException(spack.error.SpackError):
    """
    Raised when multiple keys can be used to sign.
    """

    def __init__(self, keys):
        """Build the error message from the available keys.

        Args:
            keys: the candidate signing keys; rendered with ``str()`` whatever their type
        """
        # An f-string is used rather than `"%s" % keys`: the latter raises a TypeError when
        # `keys` is a tuple with more than one element, and unpacks a one-element tuple.
        err_msg = f"Multiple keys available for signing\n{keys}\n"
        err_msg += "Use spack buildcache create -k <key hash> to pick a key."
        super().__init__(err_msg)
[docs]
class NewLayoutException(spack.error.SpackError):
    """Error raised when the directory layout differs from the one of the buildcache."""

    def __init__(self, msg):
        # Only a single message is accepted and forwarded to the base class.
        super().__init__(msg)
[docs]
class UnsignedPackageException(spack.error.SpackError):
    """Error raised on an attempt to install an unsigned package when
    ``--no-check-signature`` was not used.
    """
[docs]
class GenerateIndexError(spack.error.SpackError):
    """Error raised when the key or package index of a mirror cannot be generated."""
[docs]
class CannotListKeys(GenerateIndexError):
    """Error raised when keys cannot be listed while generating the key index."""
[docs]
class PushToBuildCacheError(spack.error.SpackError):
    """Error raised when objects cannot be pushed to a binary mirror."""