view pygnulib/vfs.py @ 39114:7025642fe93e

vfs: simplify some code parts
author Dmitry Selyutin <ghostmansd@gmail.com>
date Sat, 30 Jun 2018 16:30:02 +0300
parents cd0c5e1d7984
children aa50b6ed9282
line wrap: on
line source

#!/usr/bin/python
# encoding: UTF-8
"""gnulib virtual filesystem API"""



import codecs as _codecs
import filecmp as _filecmp
import os as _os
import shutil as _shutil
import sys as _sys
import tempfile as _tempfile
import subprocess as _sp


from .error import UnknownModuleError as _UnknownModuleError
from .module import DummyModule as _DummyModule
from .module import GnulibModule as _GnulibModule
from .misc import Executable as _Executable



class BaseVFS:
    """gnulib generic virtual file system"""
    __slots__ = ("__origin", "__root", "__table")


    def __init__(self, origin, **table):
        self.__table = {}
        for (key, value) in table.items():
            self.__table[key] = _os.path.normpath(value)
        self.__origin = _os.path.normpath(origin)
        self.__root = _os.path.abspath(origin)


    def __repr__(self):
        module = self.__class__.__module__
        name = self.__class__.__name__
        return "{}.{}{{{}}}".format(module, name, repr(self.__origin))


    def __enter__(self):
        return self


    def __exit__(self, exctype, excval, exctrace):
        pass


    def __contains__(self, name):
        if _os.path.isabs(name):
            raise ValueError("name must be a relative path")
        path = _os.path.join(self.__root, self[name])
        return _os.path.exists(path)


    def __getitem__(self, name):
        parts = []
        replaced = False
        name = _os.path.normpath(name)
        if _os.path.isabs(name):
            raise ValueError("name cannot be an absolute path")
        for part in name.split(_os.path.sep):
            if part == "..":
                parts += [part]
                continue
            if not replaced and part in self.__table:
                part = self.__table[part]
                replaced = True
            parts += [part]
        return _os.path.sep.join(parts)


    def __setitem__(self, src, dst):
        for name in (src, dst):
            if _os.path.isabs(name):
                raise ValueError("name cannot be an absoule path")
        src = _os.path.normpath(src)
        dst = _os.path.normpath(dst)
        self.__table[src] = dst


    @property
    def origin(self):
        """origin VFS path"""
        return self.__origin


    @property
    def root(self):
        """absolute VFS path"""
        return self.__root



def lookup(name, primary, secondary, patch):
    """
    Try to look up a regular file inside virtual file systems or combine it via patch utility.
    The name argument is a relative file name which is going to be looked up.
    The primary argument is the primary virtual file system to search for the file.
    The secondary argument is the secondary virtual file system to search for the file.
    The patch argument must be a path to the 'patch' utility binary.

    - file is present only inside the primary VFS: open the file inside the primary VFS.
    - file is present inside the secondary VFS: open the file inside the secondary VFS.
    - both file and patch are present: combine in memory.
    - file is not present: raise an FileNotFoundError exception.

    The function returns a pair of values, representing the file path and state.
    The first element, path, represents a regular file system path.
    The second element, vfs, is either the primary or the secondary VFS.
    If the file was obtained via dynamic patching, the vfs element is None.

    NOTE: It is up to the caller to unlink files obtained after dynamic patching.
    """
    if not isinstance(name, str):
        raise TypeError("name: str expected")
    if not isinstance(primary, BaseVFS):
        raise TypeError("primary: VFS expected")
    if not isinstance(secondary, BaseVFS):
        raise TypeError("secondary: VFS expected")
    if not isinstance(patch, _Executable):
        raise TypeError("patch: executable expected")

    if name in secondary:
        return (secondary, name)
    diff = f"{name}.diff"
    if diff not in secondary:
        return (primary, name)
    with _codecs.open(primary[name], "rb") as istream:
        with _tempfile.NamedTemporaryFile(mode="w+b", delete=False) as ostream:
            path = ostream.name
            _shutil.copyfileobj(istream, ostream)
    stdin = _codecs.open(secondary[diff], "rb")
    cmd = (patch, "-s", path)
    pipes = _sp.Popen(cmd, stdin=stdin, stdout=_sp.PIPE, stderr=_sp.PIPE)
    (stdout, stderr) = pipes.communicate()
    stdout = stdout.decode("UTF-8")
    stderr = stderr.decode("UTF-8")
    returncode = pipes.returncode
    if returncode != 0:
        cmd = "patch -s {} < {}".format(path, secondary[diff])
        raise _sp.CalledProcessError(returncode, cmd, stdout, stderr)
    return (None, path)


def mkdir(root, name):
    """Create a leaf directory and all intermediate ones recursively."""
    root = BaseVFS(".") if root is None else root
    path = name if _os.path.isabs(name) else _os.path.join(root.root, root[name])
    _os.makedirs(root[name], exist_ok=True)


def backup(root, name):
    """Backup the given file."""
    root = BaseVFS(".") if root is None else root
    original_path = _os.path.join(root.root, root[name])
    backup_path = "{}~".format(original_path)
    try:
        _os.unlink(backup_path)
    except FileNotFoundError:
        pass # ignore non-existent files
    _os.rename(original_path, backup_path)


def compare(lhs_root, lhs_name, rhs_root, rhs_name):
    """Compare the given files; return True if files contain the same data."""
    lhs_root = BaseVFS(".") if lhs_root is None else lhs_root
    rhs_root = BaseVFS(".") if rhs_root is None else rhs_root
    (lhs_path, rhs_path) = (lhs_name, rhs_name)
    if not _os.path.isabs(lhs_name):
        lhs_path = _os.path.join(lhs_root.root, lhs_root[lhs_name])
    if not _os.path.isabs(rhs_name):
        rhs_path = _os.path.join(rhs_root.root, rhs_root[rhs_name])
    return _filecmp.cmp(lhs_path, rhs_path, shallow=False)


def copy(src_root, src_name, dst_root, dst_name):
    """Copy file data."""
    src_abs = _os.path.isabs(src_name)
    dst_abs = _os.path.isabs(dst_name)
    if src_abs and dst_abs:
        raise ValueError("absolute src and dst")
    limit = (16 * 1024)
    src_root = BaseVFS(".") if src_root is None else src_root
    dst_root = BaseVFS(".") if dst_root is None else dst_root
    mkdir(dst_root, _os.path.dirname(dst_name))
    (src_path, dst_path) = (src_name, dst_name)
    if not _os.path.isabs(src_name):
        src_path = _os.path.join(src_root.root, src_root[src_name])
    if not _os.path.isabs(dst_name):
        dst_path = _os.path.join(dst_root.root, dst_root[dst_name])
    with _codecs.open(src_path, "rb") as istream:
        with _codecs.open(dst_path, "wb") as ostream:
            data = True
            while data:
                data = istream.read(limit)
                ostream.write(data)


def exists(root, name):
    """Check whether the given file exists."""
    root = BaseVFS(".") if root is None else root
    path = name if _os.path.isabs(name) else _os.path.join(root.root, root[name])
    return _os.path.exists(path)


def hardlink(src_root, src_name, dst_root, dst_name):
    """Create a hard link to the file."""
    src_abs = _os.path.isabs(src_name)
    dst_abs = _os.path.isabs(dst_name)
    if src_abs and dst_abs:
        raise ValueError("absolute src and dst")
    src_root = BaseVFS(".") if src_root is None else src_root
    dst_root = BaseVFS(".") if dst_root is None else dst_root
    mkdir(src_root, _os.path.dirname(src_name))
    mkdir(dst_root, _os.path.dirname(dst_name))
    (src_path, dst_path) = (src_name, dst_name)
    if not _os.path.isabs(src_name):
        src_path = _os.path.join(src_root.root, src_root[src_name])
    if not _os.path.isabs(dst_name):
        dst_path = _os.path.join(dst_root.root, dst_root[dst_name])
    _os.link(src_path, dst_path)


def move(src_root, src_name, dst_root, dst_name):
    """Move file data."""
    src_abs = _os.path.isabs(src_name)
    dst_abs = _os.path.isabs(dst_name)
    if src_abs and dst_abs:
        raise ValueError("absolute src and dst")
    src_root = BaseVFS(".") if src_root is None else src_root
    dst_root = BaseVFS(".") if dst_root is None else dst_root
    mkdir(dst_root, _os.path.dirname(dst_name))
    (src_path, dst_path) = (src_name, dst_name)
    if not _os.path.isabs(src_name):
        src_path = _os.path.join(src_root.root, src_root[src_name])
    if not _os.path.isabs(dst_name):
        dst_path = _os.path.join(dst_root.root, dst_root[dst_name])
    _os.rename(src_path, dst_path)


def iostream(root, name, mode="r", encoding=None):
    """Open file and return a stream. Raise IOError upon failure."""
    root = BaseVFS(".") if root is None else root
    path = name if _os.path.isabs(name) else _os.path.join(root.root, root[name])
    return _codecs.open(path, mode, encoding)


def readlink(root, name):
    """Obtain the path to which the symbolic link points."""
    root = BaseVFS(".") if root is None else root
    mkdir(root, _os.path.dirname(name))
    path = name if _os.path.isabs(name) else _os.path.join(root.root, root[name])
    return _os.readlink(path)


def symlink(src_root, src_name, dst_root, dst_name, relative=True):
    """Create a symbolic link to the file."""
    src_abs = _os.path.isabs(src_name)
    dst_abs = _os.path.isabs(dst_name)
    if src_abs and dst_abs:
        raise ValueError("absolute src and dst")
    src_root = BaseVFS(".") if src_root is None else src_root
    dst_root = BaseVFS(".") if dst_root is None else dst_root
    mkdir(dst_root, _os.path.dirname(dst_name))
    if not relative:
        (src_path, dst_path) = (src_name, dst_name)
        if not _os.path.isabs(src_name):
            src_path = _os.path.join(src_root.root, src_root[src_name])
        if not _os.path.isabs(dst_name):
            dst_path = _os.path.join(dst_root.root, dst_root[dst_name])
    else:
        src_path = _os.path.join(src_root.origin, src_root[src_name])
        dst_path = _os.path.join(dst_root.origin, dst_root[dst_name])
        prefix = _os.path.relpath(_os.path.dirname(src_path), _os.path.dirname(dst_path))
        suffix = _os.path.basename(src_root[src_name])
        src_path = _os.path.join(prefix, suffix)
        dst_path = _os.path.join(dst_root.root, dst_root[dst_name])
    _os.symlink(src_path, dst_path)


def unlink(root, name):
    """Unlink a file, backing it up if necessary."""
    root = BaseVFS(".") if root is None else root
    mkdir(root, _os.path.dirname(name))
    path = name if _os.path.isabs(name) else _os.path.join(root.root, root[name])
    _os.unlink(path)



class GnulibGitVFS(BaseVFS):
    """gnulib git repository"""
    __slots__ = ("__cache", "__prefix")


    _EXCLUDE = {
        "."                 : str.startswith,
        "~"                 : str.endswith,
        "-tests"            : str.endswith,
        "ChangeLog"         : str.__eq__,
        "COPYING"           : str.__eq__,
        "README"            : str.__eq__,
        "TEMPLATE"          : str.__eq__,
        "TEMPLATE-TESTS"    : str.__eq__,
        "TEMPLATE-EXTENDED" : str.__eq__,
    }


    def __init__(self, origin):
        super().__init__(origin=origin)
        self.__cache = {"dummy": _DummyModule()}
        self.__prefix = _sys.intern(_os.path.join(self.root, "modules"))
        if not _os.path.exists(self.root):
            raise FileNotFoundError(self.root)
        if not _os.path.isdir(self.root):
            raise NotADirectoryError(self.root)
        if not _os.path.isdir(_os.path.join(self.root, ".git")):
            raise TypeError("{} is not a gnulib repository".format(prefix))


    def module(self, name):
        """Try to find the module by name."""
        if name in self.__cache:
            return self.__cache[name]
        path = _os.path.join(self.__prefix, name)
        try:
            result = self.__cache[name] = _GnulibModule(path=path, name=name)
            return result
        except _UnknownModuleError:
            return None


    def modules(self):
        """Iterate over all available modules."""
        for root, _, files in _os.walk(self.__prefix):
            names = []
            for name in files:
                exclude = False
                for (key, method) in GnulibGitVFS._EXCLUDE.items():
                    if method(name, key):
                        exclude = True
                        break
                if not exclude:
                    names += [name]
            for name in names:
                path = _os.path.join(root, name)
                name = path[len(self.__prefix) + 1:]
                yield self.module(name)