Source code for spack.repo_migrate

# Copyright Spack Project Developers. See COPYRIGHT file for details.
#
# SPDX-License-Identifier: (Apache-2.0 OR MIT)

import ast
import difflib
import os
import re
import shutil
import sys
from typing import IO, Dict, List, Optional, Set, Tuple

import spack.repo
import spack.util.naming
import spack.util.spack_yaml


def _same_contents(f: str, g: str) -> bool:
    """Return True if the files have the same contents."""
    try:
        with open(f, "rb") as f1, open(g, "rb") as f2:
            while True:
                b1 = f1.read(4096)
                b2 = f2.read(4096)
                if b1 != b2:
                    return False
                if not b1 and not b2:
                    break
            return True
    except OSError:
        return False


[docs] def migrate_v1_to_v2( repo: spack.repo.Repo, *, patch_file: Optional[IO[bytes]], err: IO[str] = sys.stderr ) -> Tuple[bool, Optional[spack.repo.Repo]]: """To upgrade a repo from Package API v1 to v2 we need to: 1. ensure ``spack_repo/<namespace>`` parent dirs to the ``repo.yaml`` file. 2. rename <pkg dir>/package.py to <pkg module>/package.py. 3. bump the version in ``repo.yaml``. """ if not (1, 0) <= repo.package_api < (2, 0): raise RuntimeError(f"Cannot upgrade from {repo.package_api_str} to v2.0") with open(os.path.join(repo.root, "repo.yaml"), encoding="utf-8") as f: updated_config = spack.util.spack_yaml.load(f) updated_config["repo"]["api"] = "v2.0" namespace = repo.namespace.split(".") if not all( spack.util.naming.valid_module_name(part, package_api=(2, 0)) for part in namespace ): print( f"Cannot upgrade from v1 to v2, because the namespace '{repo.namespace}' is not a " "valid Python module", file=err, ) return False, None try: subdirectory = spack.repo._validate_and_normalize_subdir( repo.subdirectory, repo.root, package_api=(2, 0) ) except spack.repo.BadRepoError: print( f"Cannot upgrade from v1 to v2, because the subdirectory '{repo.subdirectory}' is not " "a valid Python module", file=err, ) return False, None new_root = os.path.join(repo.root, "spack_repo", *namespace) ino_to_relpath: Dict[int, str] = {} symlink_to_ino: Dict[str, int] = {} prefix_len = len(repo.root) + len(os.sep) rename: Dict[str, str] = {} dirs_to_create: List[str] = [] files_to_copy: List[str] = [] errors = False stack: List[Tuple[str, int]] = [(repo.packages_path, 0)] while stack: path, depth = stack.pop() try: entries = os.scandir(path) except OSError: continue for entry in entries: rel_path = entry.path[prefix_len:] if depth == 0 and entry.name in ("spack_repo", "repo.yaml"): continue ino_to_relpath[entry.inode()] = entry.path[prefix_len:] if entry.is_symlink(): try: symlink_to_ino[rel_path] = entry.stat(follow_symlinks=True).st_ino except OSError: symlink_to_ino[rel_path] = -1 # dangling or no access continue elif entry.is_dir(follow_symlinks=False): if entry.name == "__pycache__": continue # check if this is a package if depth == 0 and os.path.exists(os.path.join(entry.path, "package.py")): if "_" in entry.name: print( f"Invalid package name '{entry.name}': underscores are not allowed in " "package names, rename the package with hyphens as separators", file=err, ) errors = True continue pkg_dir = spack.util.naming.pkg_name_to_pkg_dir(entry.name, package_api=(2, 0)) if pkg_dir != entry.name: rename[f"{subdirectory}{os.sep}{entry.name}"] = ( f"{subdirectory}{os.sep}{pkg_dir}" ) dirs_to_create.append(rel_path) stack.append((entry.path, depth + 1)) continue files_to_copy.append(rel_path) if errors: return False, None rename_regex = re.compile("^(" + "|".join(re.escape(k) for k in rename.keys()) + ")") if not patch_file: os.makedirs(os.path.join(new_root, repo.subdirectory), exist_ok=True) def _relocate(rel_path: str) -> Tuple[str, str]: old = os.path.join(repo.root, rel_path) if rename: new_rel = rename_regex.sub(lambda m: rename[m.group(0)], rel_path) else: new_rel = rel_path new = os.path.join(new_root, new_rel) return old, new if patch_file: patch_file.write(b"The following directories, files and symlinks will be created:\n") for rel_path in dirs_to_create: _, new_path = _relocate(rel_path) if not patch_file: try: os.mkdir(new_path) except FileExistsError: # not an error if the directory already exists continue else: patch_file.write(b"create directory ") patch_file.write(new_path.encode("utf-8")) patch_file.write(b"\n") for rel_path in files_to_copy: old_path, new_path = _relocate(rel_path) if os.path.lexists(new_path): # if we already copied this file, don't error. if not _same_contents(old_path, new_path): print( f"Cannot upgrade from v1 to v2, because the file '{new_path}' already exists", file=err, ) return False, None continue if not patch_file: shutil.copy2(old_path, new_path) else: patch_file.write(b"copy ") patch_file.write(old_path.encode("utf-8")) patch_file.write(b" -> ") patch_file.write(new_path.encode("utf-8")) patch_file.write(b"\n") for rel_path, ino in symlink_to_ino.items(): old_path, new_path = _relocate(rel_path) if ino in ino_to_relpath: # link by path relative to the new root _, new_target = _relocate(ino_to_relpath[ino]) tgt = os.path.relpath(new_target, new_path) else: tgt = os.path.realpath(old_path) # no-op if the same, error if different if os.path.lexists(new_path): if not os.path.islink(new_path) or os.readlink(new_path) != tgt: print( f"Cannot upgrade from v1 to v2, because the file '{new_path}' already exists", file=err, ) return False, None continue if not patch_file: os.symlink(tgt, new_path) else: patch_file.write(b"create symlink ") patch_file.write(new_path.encode("utf-8")) patch_file.write(b" -> ") patch_file.write(tgt.encode("utf-8")) patch_file.write(b"\n") if not patch_file: with open(os.path.join(new_root, "repo.yaml"), "w", encoding="utf-8") as f: spack.util.spack_yaml.dump(updated_config, f) updated_repo = spack.repo.from_path(new_root) else: patch_file.write(b"\n") updated_repo = repo # compute the import diff on the v1 repo since v2 doesn't exist yet result = migrate_v2_imports( updated_repo.packages_path, updated_repo.root, patch_file=patch_file, err=err ) return result, (updated_repo if patch_file else None)
def _pkg_module_update(pkg_module: str) -> str: return re.sub(r"^num(\d)", r"_\1", pkg_module) # num7zip -> _7zip. def _spack_pkg_to_spack_repo(modulename: str) -> str: # rewrite spack.pkg.builtin.foo -> spack_repo.builtin.packages.foo.package parts = modulename.split(".") assert parts[:2] == ["spack", "pkg"] parts[0:2] = ["spack_repo"] parts.insert(2, "packages") parts[3] = _pkg_module_update(parts[3]) parts.append("package") return ".".join(parts)
[docs] def migrate_v2_imports( packages_dir: str, root: str, patch_file: Optional[IO[bytes]], err: IO[str] = sys.stderr ) -> bool: """In Package API v2.0, packages need to explicitly import package classes and a few other symbols from the build_systems module. This function automatically adds the missing imports to each package.py file in the repository.""" symbol_to_module = { "AspellDictPackage": "spack_repo.builtin.build_systems.aspell_dict", "AutotoolsPackage": "spack_repo.builtin.build_systems.autotools", "BundlePackage": "spack_repo.builtin.build_systems.bundle", "CachedCMakePackage": "spack_repo.builtin.build_systems.cached_cmake", "cmake_cache_filepath": "spack_repo.builtin.build_systems.cached_cmake", "cmake_cache_option": "spack_repo.builtin.build_systems.cached_cmake", "cmake_cache_path": "spack_repo.builtin.build_systems.cached_cmake", "cmake_cache_string": "spack_repo.builtin.build_systems.cached_cmake", "CargoPackage": "spack_repo.builtin.build_systems.cargo", "CMakePackage": "spack_repo.builtin.build_systems.cmake", "generator": "spack_repo.builtin.build_systems.cmake", "CompilerPackage": "spack_repo.builtin.build_systems.compiler", "CudaPackage": "spack_repo.builtin.build_systems.cuda", "Package": "spack_repo.builtin.build_systems.generic", "GNUMirrorPackage": "spack_repo.builtin.build_systems.gnu", "GoPackage": "spack_repo.builtin.build_systems.go", "LuaPackage": "spack_repo.builtin.build_systems.lua", "MakefilePackage": "spack_repo.builtin.build_systems.makefile", "MavenPackage": "spack_repo.builtin.build_systems.maven", "MesonPackage": "spack_repo.builtin.build_systems.meson", "MSBuildPackage": "spack_repo.builtin.build_systems.msbuild", "NMakePackage": "spack_repo.builtin.build_systems.nmake", "OctavePackage": "spack_repo.builtin.build_systems.octave", "INTEL_MATH_LIBRARIES": "spack_repo.builtin.build_systems.oneapi", "IntelOneApiLibraryPackage": "spack_repo.builtin.build_systems.oneapi", "IntelOneApiLibraryPackageWithSdk": "spack_repo.builtin.build_systems.oneapi", "IntelOneApiPackage": "spack_repo.builtin.build_systems.oneapi", "IntelOneApiStaticLibraryList": "spack_repo.builtin.build_systems.oneapi", "PerlPackage": "spack_repo.builtin.build_systems.perl", "PythonExtension": "spack_repo.builtin.build_systems.python", "PythonPackage": "spack_repo.builtin.build_systems.python", "QMakePackage": "spack_repo.builtin.build_systems.qmake", "RPackage": "spack_repo.builtin.build_systems.r", "RacketPackage": "spack_repo.builtin.build_systems.racket", "ROCmPackage": "spack_repo.builtin.build_systems.rocm", "RubyPackage": "spack_repo.builtin.build_systems.ruby", "SConsPackage": "spack_repo.builtin.build_systems.scons", "SIPPackage": "spack_repo.builtin.build_systems.sip", "SourceforgePackage": "spack_repo.builtin.build_systems.sourceforge", "SourcewarePackage": "spack_repo.builtin.build_systems.sourceware", "WafPackage": "spack_repo.builtin.build_systems.waf", "XorgPackage": "spack_repo.builtin.build_systems.xorg", } success = True for f in os.scandir(packages_dir): pkg_path = os.path.join(f.path, "package.py") if ( f.name in ("__init__.py", "__pycache__") or not f.is_dir(follow_symlinks=False) or os.path.islink(pkg_path) ): print(f"Skipping {f.path}", file=err) continue try: with open(pkg_path, "rb") as file: tree = ast.parse(file.read()) except FileNotFoundError: continue except (OSError, SyntaxError) as e: print(f"Skipping {pkg_path}: {e}", file=err) continue #: Symbols that are referenced in the package and may need to be imported. referenced_symbols: Set[str] = set() #: Set of symbols of interest that are already defined through imports, assignments, or #: function definitions. defined_symbols: Set[str] = set() best_line: Optional[int] = None seen_import = False module_replacements: Dict[str, str] = {} parent: Dict[int, ast.AST] = {} #: List of (line, col start, old, new) tuples of strings to be replaced inline. inline_updates: List[Tuple[int, int, str, str]] = [] #: List of (line from, line to, new lines) tuples of line replacements multiline_updates: List[Tuple[int, int, List[str]]] = [] try: with open(pkg_path, "r", encoding="utf-8", newline="") as file: original_lines = file.readlines() except (OSError, UnicodeDecodeError) as e: success = False print(f"Skipping {pkg_path}: {e}", file=err) continue if len(original_lines) < 2: # assume package.py files have at least 2 lines... continue if original_lines[0].endswith("\r\n"): newline = "\r\n" elif original_lines[0].endswith("\n"): newline = "\n" elif original_lines[0].endswith("\r"): newline = "\r" else: success = False print(f"{pkg_path}: unknown line ending, cannot fix", file=err) continue updated_lines = original_lines.copy() for node in ast.walk(tree): for child in ast.iter_child_nodes(node): if isinstance(child, ast.Attribute): parent[id(child)] = node # Get the last import statement from the first block of top-level imports if isinstance(node, ast.Module): for child in ast.iter_child_nodes(node): # if we never encounter an import statement, the best line to add is right # before the first node under the module if best_line is None and isinstance(child, ast.stmt): best_line = child.lineno # prefer adding right before `from spack.package import ...` if isinstance(child, ast.ImportFrom) and child.module == "spack.package": seen_import = True best_line = child.lineno # add it right before spack.package break is_import = isinstance(child, (ast.Import, ast.ImportFrom)) if is_import: if isinstance(child, (ast.stmt, ast.expr)): end_lineno = getattr(child, "end_lineno", None) if end_lineno is not None: # put it right after the last import statement best_line = end_lineno + 1 else: # old versions of python don't have end_lineno; put it before. best_line = child.lineno if not seen_import and is_import: seen_import = True elif seen_import and not is_import: break # Function definitions or assignments to variables whose name is a symbol of interest # are considered as redefinitions, so we skip them. elif isinstance(node, ast.FunctionDef): if node.name in symbol_to_module: print( f"{pkg_path}:{node.lineno}: redefinition of `{node.name}` skipped", file=err, ) defined_symbols.add(node.name) elif isinstance(node, ast.Assign): for target in node.targets: if isinstance(target, ast.Name) and target.id in symbol_to_module: print( f"{pkg_path}:{target.lineno}: redefinition of `{target.id}` skipped", file=err, ) defined_symbols.add(target.id) # Register symbols that are not imported. elif isinstance(node, ast.Name) and node.id in symbol_to_module: referenced_symbols.add(node.id) # Find lines where spack.pkg is used. elif ( isinstance(node, ast.Attribute) and isinstance(node.value, ast.Name) and node.value.id == "spack" and node.attr == "pkg" ): # go as many attrs up until we reach a known module name to be replaced known_module = "spack.pkg" ancestor = node while True: next_parent = parent.get(id(ancestor)) if next_parent is None or not isinstance(next_parent, ast.Attribute): break ancestor = next_parent known_module = f"{known_module}.{ancestor.attr}" if known_module in module_replacements: break inline_updates.append( ( ancestor.lineno, ancestor.col_offset, known_module, module_replacements[known_module], ) ) elif isinstance(node, ast.ImportFrom): # Keep track of old style spack.pkg imports, to be replaced. if node.module and node.module.startswith("spack.pkg.") and node.level == 0: depth = node.module.count(".") # not all python versions have end_lineno for ImportFrom end_lineno = getattr(node, "end_lineno", None) # simple case of find and replace # from spack.pkg.builtin.my_pkg import MyPkg # -> from spack_repo.builtin.packages.my_pkg.package import MyPkg if depth == 3: module_replacements[node.module] = _spack_pkg_to_spack_repo(node.module) inline_updates.append( ( node.lineno, node.col_offset, node.module, module_replacements[node.module], ) ) # non-trivial possible multiline case # from spack.pkg.builtin import (boost, cmake as foo) # -> import spack_repo.builtin.packages.boost.package as boost # -> import spack_repo.builtin.packages.cmake.package as foo elif depth == 2: if end_lineno is None: success = False print( f"{pkg_path}:{node.lineno}: cannot rewrite {node.module} " "import statement, since this Python version does not " "provide end_lineno. Best to update to Python 3.8+", file=err, ) continue _, _, namespace = node.module.rpartition(".") indent = original_lines[node.lineno - 1][: node.col_offset] new_lines = [] for alias in node.names: pkg_module = _pkg_module_update(alias.name) new_lines.append( f"{indent}import spack_repo.{namespace}.packages." f"{pkg_module}.package as {alias.asname or pkg_module}" f"{newline}" ) multiline_updates.append((node.lineno, end_lineno + 1, new_lines)) else: success = False print( f"{pkg_path}:{node.lineno}: don't know how to rewrite `{node.module}`", file=err, ) continue elif node.module is not None and node.level == 1 and "." not in node.module: # rewrite `from .blt import ...` -> `from ..blt.package import ...` pkg_module = _pkg_module_update(node.module) inline_updates.append( ( node.lineno, node.col_offset, f".{node.module}", f"..{pkg_module}.package", ) ) # Subtract the symbols that are imported so we don't repeatedly add imports. for alias in node.names: if alias.name in symbol_to_module: if alias.asname is None: defined_symbols.add(alias.name) # error when symbols are explicitly imported that are no longer available if node.module == "spack.package" and node.level == 0: success = False print( f"{pkg_path}:{node.lineno}: `{alias.name}` is imported from " "`spack.package`, which no longer provides this symbol", file=err, ) if alias.asname and alias.asname in symbol_to_module: defined_symbols.add(alias.asname) elif isinstance(node, ast.Import): # normal imports are easy find and replace since they are single lines. for alias in node.names: if alias.asname and alias.asname in symbol_to_module: defined_symbols.add(alias.name) elif alias.asname is None and alias.name.startswith("spack.pkg."): module_replacements[alias.name] = _spack_pkg_to_spack_repo(alias.name) inline_updates.append( ( node.lineno, node.col_offset, alias.name, module_replacements[alias.name], ) ) # Remove imported symbols from the referenced symbols referenced_symbols.difference_update(defined_symbols) # Sort from last to first so we can modify without messing up the line / col offsets inline_updates.sort(reverse=True) # Nothing to change here. if not inline_updates and not referenced_symbols: continue # First do module replacements of spack.pkg imports for line, col, old, new in inline_updates: updated_lines[line - 1] = updated_lines[line - 1][:col] + updated_lines[line - 1][ col: ].replace(old, new, 1) # Then insert new imports for symbols referenced in the package if referenced_symbols: if best_line is None: print(f"{pkg_path}: failed to update imports", file=err) success = False continue # Group missing symbols by their module missing_imports_by_module: Dict[str, list] = {} for symbol in referenced_symbols: module = symbol_to_module[symbol] if module not in missing_imports_by_module: missing_imports_by_module[module] = [] missing_imports_by_module[module].append(symbol) new_lines = [ f"from {module} import {', '.join(sorted(symbols))}{newline}" for module, symbols in sorted(missing_imports_by_module.items()) ] if not seen_import: new_lines.extend((newline, newline)) multiline_updates.append((best_line, best_line, new_lines)) multiline_updates.sort(reverse=True) for start, end, new_lines in multiline_updates: updated_lines[start - 1 : end - 1] = new_lines if patch_file: rel_pkg_path = os.path.relpath(pkg_path, start=root).replace(os.sep, "/") diff = difflib.unified_diff( original_lines, updated_lines, n=3, fromfile=f"a/{rel_pkg_path}", tofile=f"b/{rel_pkg_path}", lineterm="\n", ) for _line in diff: patch_file.write(_line.encode("utf-8")) continue tmp_file = pkg_path + ".tmp" # binary mode to avoid newline conversion issues; utf-8 was already required upon read. with open(tmp_file, "wb") as file: for _line in updated_lines: file.write(_line.encode("utf-8")) os.replace(tmp_file, pkg_path) return success