# Source code for spack.oci.oci

# Copyright Spack Project Developers. See COPYRIGHT file for details.
#
# SPDX-License-Identifier: (Apache-2.0 OR MIT)

import hashlib
import json
import os
import urllib.error
import urllib.parse
from typing import List, NamedTuple, Tuple
from urllib.request import Request

import spack.fetch_strategy
import spack.llnl.util.tty as tty
import spack.mirrors.layout
import spack.mirrors.mirror
import spack.oci.opener
import spack.stage
import spack.util.url

from .image import Digest, ImageReference


class Blob(NamedTuple):
    """Immutable record describing a blob by two digests and a size."""

    # Digest of the blob in its compressed form.
    compressed_digest: Digest
    # Digest of the same content in its uncompressed form.
    uncompressed_digest: Digest
    # Size of the blob — presumably the compressed size in bytes; TODO confirm against callers.
    size: int
def with_query_param(url: str, param: str, value: str) -> str:
    """Add a query parameter to a URL

    Query parameters already present in the URL are preserved, including those
    with an empty value (``?key=``). If ``param`` already occurs in the URL, the
    new value is appended as an additional occurrence rather than replacing it.

    Args:
        url: The URL to add the parameter to.
        param: The parameter name.
        value: The parameter value.

    Returns:
        The URL with the parameter added.
    """
    parsed = urllib.parse.urlparse(url)

    # keep_blank_values=True is required: by default parse_qs silently drops
    # parameters whose value is empty, which would alter the caller's URL.
    query = urllib.parse.parse_qs(parsed.query, keep_blank_values=True)
    query.setdefault(param, []).append(value)

    # doseq=True expands each list of values into repeated key=value pairs.
    return urllib.parse.urlunparse(
        parsed._replace(query=urllib.parse.urlencode(query, doseq=True))
    )
def list_tags(ref: ImageReference, _urlopen: spack.oci.opener.MaybeOpen = None) -> List[str]:
    """Retrieves the list of tags associated with an image, handling pagination.

    Pages are followed through the ``Link`` response header (``rel="next"``).
    Pagination stops when there is no such header, when it carries no next
    link, or when the next link uses a scheme other than ``https`` (relative
    links, which have an empty scheme, are followed).

    Args:
        ref: The image reference whose tags are listed.
        _urlopen: Optional opener; defaults to ``spack.oci.opener.urlopen``.

    Returns:
        The de-duplicated tags in sorted order.
    """
    _urlopen = _urlopen or spack.oci.opener.urlopen
    tags = set()
    fetch_url = ref.tags_url()

    while True:
        # Fetch tags
        request = Request(url=fetch_url)
        with _urlopen(request) as response:
            spack.oci.opener.ensure_status(request, response, 200)
            # A registry may report ``"tags": null`` for an image without tags;
            # treat that as an empty page instead of failing in set.update().
            tags.update(json.load(response)["tags"] or ())

        # Check for pagination
        link_header = response.headers["Link"]

        if link_header is None:
            break

        tty.debug(f"OCI tag pagination: {link_header}")

        rel_next_value = spack.util.url.parse_link_rel_next(link_header)

        if rel_next_value is None:
            break

        # Only follow relative links or absolute https links.
        rel_next = urllib.parse.urlparse(rel_next_value)

        if rel_next.scheme not in ("https", ""):
            break

        fetch_url = ref.endpoint(rel_next_value)

    return sorted(tags)
def upload_blob(
    ref: ImageReference,
    file: str,
    digest: Digest,
    force: bool = False,
    small_file_size: int = 0,
    _urlopen: spack.oci.opener.MaybeOpen = None,
) -> bool:
    """Uploads a blob to an OCI registry

    We only do monolithic uploads, even though it's very simple to do chunked.
    Observed problems with chunked uploads:
    (1) it's slow, many sequential requests, (2) some registries set an *unknown*
    max chunk size, and the spec doesn't say how to obtain it

    The upload is either a single POST carrying the file (small files only), or a
    POST that opens an upload session followed by a PUT of the file to the
    location the registry returned, with the digest added as a query parameter.

    Args:
        ref: The image reference.
        file: The file to upload.
        digest: The digest of the file.
        force: Whether to force upload the blob, even if it already exists.
        small_file_size: For files at most this size, attempt to do a single POST request
            instead of POST + PUT. Some registries do not support single requests, and others
            do not specify what size they support in single POST. For now this feature is
            disabled by default (0KB)
        _urlopen: Optional opener; defaults to ``spack.oci.opener.urlopen``.

    Returns:
        True if the blob was uploaded, False if it already existed.
    """
    _urlopen = _urlopen or spack.oci.opener.urlopen

    # Test if the blob already exists, if so, early exit.
    if not force and blob_exists(ref, digest, _urlopen):
        return False

    with open(file, "rb") as f:
        file_size = os.fstat(f.fileno()).st_size

        # For small blobs, do a single POST request.
        # The spec says that registries MAY support this
        if file_size <= small_file_size:
            request = Request(
                url=ref.uploads_url(digest),
                method="POST",
                data=f,
                headers={
                    "Content-Type": "application/octet-stream",
                    "Content-Length": str(file_size),
                },
            )
        else:
            # Empty POST: only opens an upload session, the data follows in a PUT.
            request = Request(
                url=ref.uploads_url(), method="POST", headers={"Content-Length": "0"}
            )

        with _urlopen(request) as response:
            # Created the blob in one go.
            if response.status == 201:
                return True

            # Otherwise, do another PUT request.
            spack.oci.opener.ensure_status(request, response, 202)
            assert "Location" in response.headers

            # Can be absolute or relative, joining handles both
            upload_url = with_query_param(
                ref.endpoint(response.headers["Location"]), "digest", str(digest)
            )
        # Rewind: the file may already have been read by the single-POST attempt.
        f.seek(0)

        request = Request(
            url=upload_url,
            method="PUT",
            data=f,
            headers={"Content-Type": "application/octet-stream", "Content-Length": str(file_size)},
        )

        with _urlopen(request) as response:
            spack.oci.opener.ensure_status(request, response, 201)

    return True
def upload_manifest(
    ref: ImageReference,
    manifest: dict,
    tag: bool = True,
    _urlopen: spack.oci.opener.MaybeOpen = None,
):
    """Push a manifest or index to a registry with a single PUT request.

    The manifest is serialized as compact JSON; the digest and size that are
    returned are computed over exactly those serialized bytes.

    Args:
        ref: The image reference.
        manifest: The manifest or index. Its ``"mediaType"`` entry is sent as
            the ``Content-Type`` of the request.
        tag: When true, upload under the tag of ``ref``; otherwise upload under
            the computed digest. This is relevant for multi-arch images, where
            the tag is an index, referencing the manifests by digest.

    Returns:
        The digest and size of the uploaded manifest.
    """
    opener = _urlopen or spack.oci.opener.urlopen

    # Serialize once; digest, size and request body all use the same bytes.
    payload = json.dumps(manifest, separators=(",", ":")).encode()
    digest = Digest.from_sha256(hashlib.sha256(payload).hexdigest())
    size = len(payload)

    target = ref if tag else ref.with_digest(digest)

    request = Request(
        url=target.manifest_url(),
        method="PUT",
        data=payload,
        headers={"Content-Type": manifest["mediaType"]},
    )

    with opener(request) as response:
        spack.oci.opener.ensure_status(request, response, 201)

    return digest, size
def image_from_mirror(mirror: spack.mirrors.mirror.Mirror) -> ImageReference:
    """Build the image reference for an OCI based mirror from its push URL."""
    push_url = mirror.push_url
    return ImageReference.from_url(push_url)
def blob_exists(
    ref: ImageReference, digest: Digest, _urlopen: spack.oci.opener.MaybeOpen = None
) -> bool:
    """Tell whether a blob is present in an OCI registry.

    Issues a HEAD request for the blob. A 404 error means the blob is absent;
    any other HTTP error is propagated to the caller.
    """
    opener = _urlopen or spack.oci.opener.urlopen
    head_request = Request(url=ref.blob_url(digest), method="HEAD")

    try:
        with opener(head_request) as response:
            return response.status == 200
    except urllib.error.HTTPError as e:
        if e.getcode() != 404:
            raise
        return False
def copy_missing_layers(
    src: ImageReference,
    dst: ImageReference,
    architecture: str,
    _urlopen: spack.oci.opener.MaybeOpen = None,
) -> Tuple[dict, dict]:
    """Copy the layers of ``src`` that ``dst`` lacks, for the given architecture.

    Args:
        src: The source image reference.
        dst: The destination image reference.
        architecture: The architecture (when referencing an index)

    Returns:
        Tuple of manifest and config of the base image.
    """
    opener = _urlopen or spack.oci.opener.urlopen
    manifest, config = get_manifest_and_config(src, architecture, _urlopen=opener)

    # Parse every layer digest up front, before querying the destination.
    layer_digests = [Digest.from_string(entry["digest"]) for entry in manifest["layers"]]

    # Keep only the digests that don't exist in the destination registry yet
    missing_digests = []
    for layer_digest in layer_digests:
        if not blob_exists(dst, layer_digest, _urlopen=opener):
            missing_digests.append(layer_digest)

    if missing_digests:
        # Pull missing blobs, push them to the registry
        with spack.stage.StageComposite.from_iterable(
            make_stage(url=src.blob_url(missing), digest=missing, _urlopen=opener)
            for missing in missing_digests
        ) as stages:
            stages.fetch()
            stages.check()
            stages.cache_local()

            for stage, missing in zip(stages, missing_digests):
                # No need to check existence again, force=True.
                upload_blob(
                    dst, file=stage.save_filename, force=True, digest=missing, _urlopen=opener
                )

    return manifest, config
#: OCI manifest content types (including docker type)
manifest_content_type = [
    "application/vnd.oci.image.manifest.v1+json",
    "application/vnd.docker.distribution.manifest.v2+json",
]

#: OCI index content types (including docker type)
index_content_type = [
    "application/vnd.oci.image.index.v1+json",
    "application/vnd.docker.distribution.manifest.list.v2+json",
]

#: All OCI manifest / index content types (manifest types first, then index types)
all_content_type = [*manifest_content_type, *index_content_type]
def get_manifest_and_config(
    ref: ImageReference,
    architecture="amd64",
    recurse=3,
    _urlopen: spack.oci.opener.MaybeOpen = None,
) -> Tuple[dict, dict]:
    """Recursively fetch manifest and config for a given image reference
    with a given architecture.

    When the reference resolves to an index, the first entry whose platform
    architecture equals ``architecture`` is followed. Index entries without
    platform information are skipped.

    Args:
        ref: The image reference.
        architecture: The architecture (when referencing an index)
        recurse: How many levels of index to recurse into.
        _urlopen: Optional opener; defaults to ``spack.oci.opener.urlopen``.

    Returns:
        A tuple of (manifest, config)

    Raises:
        Exception: if the recursion limit is reached, if an index has no entry
            for ``architecture``, or if the response has an unknown content type.
    """
    _urlopen = _urlopen or spack.oci.opener.urlopen

    # Get manifest
    with _urlopen(
        Request(url=ref.manifest_url(), headers={"Accept": ", ".join(all_content_type)})
    ) as response:
        content_type = response.headers["Content-Type"]

        # Recurse when we find an index
        if content_type in index_content_type:
            if recurse == 0:
                raise Exception("Maximum recursion depth reached while fetching OCI manifest")

            index = json.load(response)

            # A default is passed to next() so that a missing architecture does
            # not leak a bare StopIteration; "platform" is optional on entries.
            manifest_meta = next(
                (
                    entry
                    for entry in index["manifests"]
                    if (entry.get("platform") or {}).get("architecture") == architecture
                ),
                None,
            )

            if manifest_meta is None:
                raise Exception(
                    f"No manifest for architecture {architecture} "
                    f"in OCI index {ref.manifest_url()}"
                )

            return get_manifest_and_config(
                ref.with_digest(manifest_meta["digest"]),
                architecture=architecture,
                recurse=recurse - 1,
                _urlopen=_urlopen,
            )

        # Otherwise, require a manifest
        if content_type not in manifest_content_type:
            raise Exception(f"Unknown content type {content_type}")

        manifest = json.load(response)

    # Download, verify and cache config file
    config_digest = Digest.from_string(manifest["config"]["digest"])
    with make_stage(ref.blob_url(config_digest), config_digest, _urlopen=_urlopen) as stage:
        stage.fetch()
        stage.check()
        stage.cache_local()
        with open(stage.save_filename, "rb") as f:
            config = json.load(f)

    return manifest, config
#: Same as upload_manifest, but with retry wrapper upload_manifest_with_retry = spack.oci.opener.default_retry(upload_manifest) #: Same as upload_blob, but with retry wrapper upload_blob_with_retry = spack.oci.opener.default_retry(upload_blob) #: Same as get_manifest_and_config, but with retry wrapper get_manifest_and_config_with_retry = spack.oci.opener.default_retry(get_manifest_and_config) #: Same as copy_missing_layers, but with retry wrapper copy_missing_layers_with_retry = spack.oci.opener.default_retry(copy_missing_layers)
def make_stage(
    url: str, digest: Digest, keep: bool = False, _urlopen: spack.oci.opener.MaybeOpen = None
) -> spack.stage.Stage:
    """Create a stage for the blob at ``url``.

    Args:
        url: The URL the blob is fetched from.
        digest: The digest of the blob; it is passed to the fetch strategy as
            the checksum and also names the stage and its cache path.
        keep: Forwarded to the stage as its ``keep`` setting.
        _urlopen: Optional opener; defaults to ``spack.oci.opener.urlopen``.

    Returns:
        The stage, using an OCI registry fetch strategy.
    """
    opener = _urlopen or spack.oci.opener.urlopen

    fetcher = spack.fetch_strategy.OCIRegistryFetchStrategy(
        url=url, checksum=digest.digest, _urlopen=opener
    )

    # Use blobs/<alg>/<encoded> as the cache path, which follows
    # the OCI Image Layout Specification. What's missing though,
    # is the `oci-layout` and `index.json` files, which are
    # required by the spec.
    cache_layout = spack.mirrors.layout.OCILayout(digest)

    return spack.stage.Stage(
        fetcher, mirror_paths=cache_layout, name=digest.digest, keep=keep
    )