Source code for spack.buildcache_migrate

# Copyright Spack Project Developers. See COPYRIGHT file for details.
#
# SPDX-License-Identifier: (Apache-2.0 OR MIT)

import json
import os
import pathlib
import tempfile
from typing import NamedTuple

import spack.binary_distribution
import spack.database as spack_db
import spack.error
import spack.llnl.util.tty as tty
import spack.mirrors.mirror
import spack.spec
import spack.stage
import spack.util.crypto
import spack.util.parallel
import spack.util.url as url_util
import spack.util.web as web_util

from .enums import InstallRecordStatus
from .url_buildcache import (
    BlobRecord,
    BuildcacheComponent,
    compressed_json_from_dict,
    get_url_buildcache_class,
    sign_file,
    try_verify,
)


[docs] def v2_tarball_directory_name(spec): """ Return name of the tarball directory according to the convention <os>-<architecture>/<compiler>/<package>-<version>/ """ return spec.format_path("{architecture}/{compiler.name}-{compiler.version}/{name}-{version}")
[docs] def v2_tarball_name(spec, ext): """ Return the name of the tarfile according to the convention <os>-<architecture>-<package>-<dag_hash><ext> """ spec_formatted = spec.format_path( "{architecture}-{compiler.name}-{compiler.version}-{name}-{version}-{hash}" ) return f"{spec_formatted}{ext}"
[docs] def v2_tarball_path_name(spec, ext): """ Return the full path+name for a given spec according to the convention <tarball_directory_name>/<tarball_name> """ return os.path.join(v2_tarball_directory_name(spec), v2_tarball_name(spec, ext))
[docs] class MigrateSpecResult(NamedTuple): success: bool message: str
[docs] class MigrationException(spack.error.SpackError): """ Raised when migration fails irrevocably """ def __init__(self, msg): super().__init__(msg)
def _migrate_spec( s: spack.spec.Spec, mirror_url: str, tmpdir: str, unsigned: bool = False, signing_key: str = "" ) -> MigrateSpecResult: """Parallelizable function to migrate a single spec""" print_spec = f"{s.name}/{s.dag_hash()[:7]}" # Check if the spec file exists in the new location and exit early if so v3_cache_class = get_url_buildcache_class(layout_version=3) v3_cache_entry = v3_cache_class(mirror_url, s, allow_unsigned=unsigned) exists = v3_cache_entry.exists([BuildcacheComponent.SPEC, BuildcacheComponent.TARBALL]) v3_cache_entry.destroy() if exists: msg = f"No need to migrate {print_spec}" return MigrateSpecResult(True, msg) # Try to fetch the spec metadata v2_metadata_urls = [ url_util.join(mirror_url, "build_cache", v2_tarball_name(s, ".spec.json.sig")) ] if unsigned: v2_metadata_urls.append( url_util.join(mirror_url, "build_cache", v2_tarball_name(s, ".spec.json")) ) spec_contents = None for meta_url in v2_metadata_urls: try: spec_contents = web_util.read_text(meta_url) v2_spec_url = meta_url break except (web_util.SpackWebError, OSError): pass else: msg = f"Unable to read metadata for {print_spec}" return MigrateSpecResult(False, msg) spec_dict = {} if unsigned: # User asked for unsigned, if we found a signed specfile, just ignore # the signature if v2_spec_url.endswith(".sig"): spec_dict = spack.spec.Spec.extract_json_from_clearsig(spec_contents) else: spec_dict = json.loads(spec_contents) else: # User asked for signed, we must successfully verify the signature local_signed_pre_verify = os.path.join( tmpdir, f"{s.name}_{s.dag_hash()}_verify.spec.json.sig" ) with open(local_signed_pre_verify, "w", encoding="utf-8") as fd: fd.write(spec_contents) if not try_verify(local_signed_pre_verify): return MigrateSpecResult(False, f"Failed to verify signature of {print_spec}") with open(local_signed_pre_verify, encoding="utf-8") as fd: spec_dict = spack.spec.Spec.extract_json_from_clearsig(fd.read()) # Read out and remove the bits needed to rename and position the archive bcc = spec_dict.pop("binary_cache_checksum", None) if not bcc: msg = "Cannot migrate a spec that does not have 'binary_cache_checksum'" return MigrateSpecResult(False, msg) algorithm = bcc["hash_algorithm"] checksum = bcc["hash"] # TODO: Remove this key once oci buildcache no longer uses it spec_dict["buildcache_layout_version"] = 2 v2_archive_url = url_util.join(mirror_url, "build_cache", v2_tarball_path_name(s, ".spack")) # spacks web utilities do not include direct copying of s3 objects, so we # need to download the archive locally, and then push it back to the target # location archive_stage_path = os.path.join(tmpdir, f"archive_stage_{s.name}_{s.dag_hash()}") archive_stage = spack.stage.Stage(v2_archive_url, path=archive_stage_path) try: archive_stage.create() archive_stage.fetch() except spack.error.FetchError: return MigrateSpecResult(False, f"Unable to fetch archive for {print_spec}") local_tarfile_path = archive_stage.save_filename # As long as we have to download the tarball anyway, we might as well compute the # checksum locally and check it against the expected value local_checksum = spack.util.crypto.checksum( spack.util.crypto.hash_fun_for_algo(algorithm), local_tarfile_path ) if local_checksum != checksum: return MigrateSpecResult( False, f"Checksum mismatch for {print_spec}: expected {checksum}, got {local_checksum}" ) spec_dict["archive_size"] = os.stat(local_tarfile_path).st_size # Compress the spec dict and compute its checksum metadata_checksum_algo = "sha256" spec_json_path = os.path.join(tmpdir, f"{s.name}_{s.dag_hash()}.spec.json") metadata_checksum, metadata_size = compressed_json_from_dict( spec_json_path, spec_dict, metadata_checksum_algo ) tarball_blob_record = BlobRecord( spec_dict["archive_size"], v3_cache_class.TARBALL_MEDIATYPE, "gzip", algorithm, checksum ) metadata_blob_record = BlobRecord( metadata_size, v3_cache_class.SPEC_MEDIATYPE, "gzip", metadata_checksum_algo, metadata_checksum, ) # Compute the urls to the new blobs v3_archive_url = v3_cache_class.get_blob_url(mirror_url, tarball_blob_record) v3_spec_url = v3_cache_class.get_blob_url(mirror_url, metadata_blob_record) # First push the tarball tty.debug(f"Pushing {local_tarfile_path} to {v3_archive_url}") try: web_util.push_to_url(local_tarfile_path, v3_archive_url, keep_original=True) except Exception: return MigrateSpecResult(False, f"Failed to push archive for {print_spec}") # Then push the spec file tty.debug(f"Pushing {spec_json_path} to {v3_spec_url}") try: web_util.push_to_url(spec_json_path, v3_spec_url, keep_original=True) except Exception: return MigrateSpecResult(False, f"Failed to push spec metadata for {print_spec}") # Generate the manifest and write it to a temporary location manifest = { "version": v3_cache_class.get_layout_version(), "data": [tarball_blob_record.to_dict(), metadata_blob_record.to_dict()], } manifest_path = os.path.join(tmpdir, f"{s.dag_hash()}.manifest.json") with open(manifest_path, "w", encoding="utf-8") as f: json.dump(manifest, f, indent=0, separators=(",", ":")) # Note: when using gpg clear sign, we need to avoid long lines (19995 # chars). If lines are longer, they are truncated without error. So, # here we still add newlines, but no indent, so save on file size and # line length. # Possibly sign the manifest if not unsigned: manifest_path = sign_file(signing_key, manifest_path) v3_manifest_url = v3_cache_class.get_manifest_url(s, mirror_url) # Push the manifest try: web_util.push_to_url(manifest_path, v3_manifest_url, keep_original=True) except Exception: return MigrateSpecResult(False, f"Failed to push manifest for {print_spec}") return MigrateSpecResult(True, f"Successfully migrated {print_spec}")
[docs] def migrate( mirror: spack.mirrors.mirror.Mirror, unsigned: bool = False, delete_existing: bool = False ) -> None: """Perform migration of the given mirror If unsigned is True, signatures on signed specs will be ignored, and specs will not be re-signed before pushing to the new location. Otherwise, spack will attempt to verify signatures and re-sign specs, and will fail if not able to do so. If delete_existing is True, spack will delete the original contents of the mirror once the migration is complete.""" signing_key = "" if not unsigned: try: signing_key = spack.binary_distribution.select_signing_key() except ( spack.binary_distribution.NoKeyException, spack.binary_distribution.PickKeyException, ): raise MigrationException( "Signed migration requires exactly one secret key in keychain" ) delete_action = "deleting" if delete_existing else "keeping" sign_action = "an unsigned" if unsigned else "a signed" mirror_url = mirror.fetch_url tty.msg( f"Performing {sign_action} migration of {mirror.push_url} " f"and {delete_action} existing contents" ) index_url = url_util.join(mirror_url, "build_cache", spack_db.INDEX_JSON_FILE) contents = None try: contents = web_util.read_text(index_url) except (web_util.SpackWebError, OSError): raise MigrationException("Buildcache migration requires a buildcache index") with tempfile.TemporaryDirectory(dir=spack.stage.get_stage_root()) as tmpdir: index_path = os.path.join(tmpdir, "_tmp_index.json") with open(index_path, "w", encoding="utf-8") as fd: fd.write(contents) db = spack.binary_distribution.BuildCacheDatabase(tmpdir) db._read_from_file(pathlib.Path(index_path)) specs_to_migrate = [ s for s in db.query_local(installed=InstallRecordStatus.ANY) # todo, make it easier to get install records associated with specs if not s.external and db._data[s.dag_hash()].in_buildcache ] # Run the tasks in parallel if possible executor = spack.util.parallel.make_concurrent_executor() migrate_futures = [ executor.submit(_migrate_spec, spec, mirror_url, tmpdir, unsigned, signing_key) for spec in specs_to_migrate ] success_count = 0 tty.msg("Migration summary:") for spec, migrate_future in zip(specs_to_migrate, migrate_futures): result = migrate_future.result() msg = f" {spec.name}/{spec.dag_hash()[:7]}: {result.message}" if result.success: success_count += 1 tty.msg(msg) else: tty.error(msg) # The migrated index should have the same specs as the original index, # modulo any specs that we failed to migrate for whatever reason. So # to avoid having to re-fetch all the spec files now, just mark them # appropriately in the existing database and push that. db.mark(spec, "in_buildcache", result.success) if success_count > 0: tty.msg("Updating index and pushing keys") # If the layout.json doesn't yet exist on this mirror, push it v3_cache_class = get_url_buildcache_class(layout_version=3) v3_cache_class.maybe_push_layout_json(mirror_url) # Push the migrated mirror index index_tmpdir = os.path.join(tmpdir, "rebuild_index") os.mkdir(index_tmpdir) spack.binary_distribution._push_index(db, index_tmpdir, mirror_url) # Push the public part of the signing key if not unsigned: keys_tmpdir = os.path.join(tmpdir, "keys") os.mkdir(keys_tmpdir) spack.binary_distribution._url_push_keys( mirror_url, keys=[signing_key], update_index=True, tmpdir=keys_tmpdir ) else: tty.warn("No specs migrated, did you mean to perform an unsigned migration instead?") # Delete the old layout if the user requested it if delete_existing: delete_prefix = url_util.join(mirror_url, "build_cache") tty.msg(f"Recursively deleting {delete_prefix}") web_util.remove_url(delete_prefix, recursive=True) tty.msg("Migration complete")