Python re module: match() code examples

We have extracted the following 48 code examples from open source Python projects to illustrate how to use re.match().
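Before the project examples, a minimal self-contained refresher on how re.match() behaves: it matches only at the beginning of the string (unlike re.search()), and the returned match object exposes any capture groups.

import re

# re.match() succeeds only when the pattern matches at the start of the string.
print(bool(re.match(r'\d+', '42 apples')))   # True
print(bool(re.match(r'\d+', 'about 42')))    # False
print(bool(re.search(r'\d+', 'about 42')))   # True: search() scans the whole string

# Capture groups are read from the returned match object.
m = re.match(r'(\w+)-(\d+)', 'build-2017')
if m:
    print(m.group(1), m.group(2))            # build 2017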

Project: my-first-blog    Author: AnkurBegining    | project source | file source
def __init__(
            self, index_url="https://pypi.python.org/simple", hosts=('*',),
            ca_bundle=None, verify_ssl=True, *args, **kw
            ):
        Environment.__init__(self, *args, **kw)
        self.index_url = index_url + "/" [:not index_url.endswith('/')]
        self.scanned_urls = {}
        self.fetched_urls = {}
        self.package_pages = {}
        self.allows = re.compile('|'.join(map(translate, hosts))).match
        self.to_scan = []
        use_ssl = (
            verify_ssl
            and ssl_support.is_available
            and (ca_bundle or ssl_support.find_ca_bundle())
        )
        if use_ssl:
            self.opener = ssl_support.opener_for(ca_bundle)
        else:
            self.opener = urllib.request.urlopen
Project: safetyculture-sdk-python    Author: SafetyCulture    | project source | file source
def load_setting_sync_delay(logger, config_settings):
    """
    Attempt to parse delay between sync loops from config settings

    :param logger:           the logger
    :param config_settings:  config settings loaded from config file
    :return:                 extracted sync delay if valid, else DEFAULT_SYNC_DELAY_IN_SECONDS
    """
    try:
        sync_delay = config_settings['export_options']['sync_delay_in_seconds']
        sync_delay_is_valid = re.match('^[0-9]+$', str(sync_delay))
        if sync_delay_is_valid and sync_delay >= 0:
            if sync_delay < DEFAULT_SYNC_DELAY_IN_SECONDS:
                logger.info('Sync delay is less than the minimum recommended value of '
                            '{0} seconds'.format(DEFAULT_SYNC_DELAY_IN_SECONDS))
            return sync_delay
        else:
            logger.info('Invalid sync_delay_in_seconds from the configuration file, defaulting to {0}'.format(str(
                DEFAULT_SYNC_DELAY_IN_SECONDS)))
            return DEFAULT_SYNC_DELAY_IN_SECONDS
    except Exception as ex:
        log_critical_error(logger, ex,
                           'Exception parsing sync_delay from the configuration file, defaulting to {0}'.format(str(
                               DEFAULT_SYNC_DELAY_IN_SECONDS)))
        return DEFAULT_SYNC_DELAY_IN_SECONDS
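Applied to str(sync_delay), the anchored pattern '^[0-9]+$' accepts only non-negative integers; negative numbers and floats fail the match. A quick illustration:

import re

for value in (300, 0, -5, 2.5, "60"):
    print(value, bool(re.match('^[0-9]+$', str(value))))
# 300 True, 0 True, -5 False, 2.5 False, 60 True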
Project: safetyculture-sdk-python    Author: SafetyCulture    | project source | file source
def load_setting_api_access_token(logger, config_settings):
    """
    Attempt to parse API token from config settings

    :param logger:           the logger
    :param config_settings:  config settings loaded from config file
    :return:                 API token if valid, else None
    """
    try:
        api_token = config_settings['API']['token']
        token_is_valid = re.match('^[a-f0-9]{64}$', api_token)
        if token_is_valid:
            logger.debug('API token matched expected pattern')
            return api_token
        else:
            logger.error('API token failed to match expected pattern')
            return None
    except Exception as ex:
        log_critical_error(logger, ex, 'Exception parsing API token from config.yaml')
        return None
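The token check uses an anchored pattern for a 64-character lowercase hex string; the trailing $ is what rejects over-long input, and re.fullmatch() expresses the same intent more directly. A small sketch with a hypothetical token value:

import re

token = 'a' * 64   # hypothetical token value
print(bool(re.match('^[a-f0-9]{64}$', token)))          # True
print(bool(re.match('^[a-f0-9]{64}$', token + 'ff')))   # False: $ rejects extra characters
print(bool(re.fullmatch('[a-f0-9]{64}', token)))        # equivalent, Python 3.4+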
Project: safetyculture-sdk-python    Author: SafetyCulture    | project source | file source
def load_setting_input_filename(logger, config_settings):
    """
    Attempt to parse input filename from config settings
    :param logger:           the logger
    :param config_settings:  config settings loaded from config file
    :return:                 input filename from config file if valid, else None
    """

    try:
        filename = config_settings['input_filename']
        filename_is_valid = re.match('.+xls|.+xlsx', filename)
        if filename_is_valid:
            logger.debug('Filename matched expected pattern')
            return filename
        else:
            logger.error('Filename failed to match expected pattern, acceptable formats are xls and xlsx')
            return None
    except Exception as ex:
        log_critical_error(logger, ex, 'Exception parsing input filename from config.yaml')
Project: myhdlpeek    Author: xesscorp    | project source | file source
def _sort_names(names):
    '''
    Sort peeker names by index and alphabetically.

    For example, the peeker names would be sorted as a[0], b[0], a[1], b[1], ...
    '''

    def index_key(lbl):
        '''Index sorting.'''
        m = re.match('.*\[(\d+)\]$', lbl)  # Get the bracketed index.
        if m:
            return int(m.group(1))  # Return the index as an integer.
        return -1  # No index found so it comes before everything else.

    def name_key(lbl):
        '''Name sorting.'''
        m = re.match('^([^\[]+)', lbl)  # Get name preceding bracketed index.
        if m:
            return m.group(1)  # Return name.
        return ''  # No name found.

    srt_names = sorted(names, key=name_key)
    srt_names = sorted(srt_names, key=index_key)
    return srt_names
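A self-contained sketch of the two-pass sort above: because Python's sort is stable, sorting by name first and then by index groups the names by index while keeping them alphabetical within each index.

import re

def index_key(lbl):
    m = re.match(r'.*\[(\d+)\]$', lbl)
    return int(m.group(1)) if m else -1

def name_key(lbl):
    m = re.match(r'^([^\[]+)', lbl)
    return m.group(1) if m else ''

names = ['b[1]', 'a[0]', 'clk', 'b[0]', 'a[1]']
print(sorted(sorted(names, key=name_key), key=index_key))
# ['clk', 'a[0]', 'b[0]', 'a[1]', 'b[1]']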
Project: python-    Author: secondtonone1    | project source | file source
def split_filename(filename, project_name=None):
    """
    Extract name, version, python version from a filename (no extension)

    Return name, version, pyver or None
    """
    result = None
    pyver = None
    filename = unquote(filename).replace(' ', '-')
    m = PYTHON_VERSION.search(filename)
    if m:
        pyver = m.group(1)
        filename = filename[:m.start()]
    if project_name and len(filename) > len(project_name) + 1:
        m = re.match(re.escape(project_name) + r'\b', filename)
        if m:
            n = m.end()
            result = filename[:n], filename[n + 1:], pyver
    if result is None:
        m = PROJECT_NAME_AND_VERSION.match(filename)
        if m:
            result = m.group(1), m.group(3), pyver
    return result

# Allow spaces in name because of legacy dists like "Twisted Core"
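The re.escape(project_name) + r'\b' idiom above matches the project name literally (escaping any regex metacharacters it contains), and the word boundary prevents a name such as foo from matching foobar. For example, with a hypothetical filename:

import re

project_name = 'my.project'            # '.' would otherwise match any character
filename = 'my.project-1.2.0-py3'      # hypothetical distribution filename
m = re.match(re.escape(project_name) + r'\b', filename)
if m:
    n = m.end()
    print(filename[:n], filename[n + 1:])   # my.project 1.2.0-py3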
Project: python-    Author: secondtonone1    | project source | file source
def __init__(self, filename):
        """
        :raises InvalidWheelFilename: when the filename is invalid for a wheel
        """
        wheel_info = self.wheel_file_re.match(filename)
        if not wheel_info:
            raise InvalidWheelFilename(
                "%s is not a valid wheel filename." % filename
            )
        self.filename = filename
        self.name = wheel_info.group('name').replace('_', '-')
        # we'll assume "_" means "-" due to wheel naming scheme
        # (https://github.com/pypa/pip/issues/1150)
        self.version = wheel_info.group('ver').replace('_', '-')
        self.pyversions = wheel_info.group('pyver').split('.')
        self.abis = wheel_info.group('abi').split('.')
        self.plats = wheel_info.group('plat').split('.')

        # All the tag combinations from this file
        self.file_tags = set(
            (x, y, z) for x in self.pyversions
            for y in self.abis for z in self.plats
        )
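wheel_file_re itself is defined elsewhere in pip and also handles optional build tags; the following is only a simplified, hypothetical stand-in that shows the named groups used above.

import re

# Simplified, hypothetical stand-in for pip's wheel_file_re (named groups only).
wheel_file_re = re.compile(
    r'^(?P<name>.+?)-(?P<ver>.+?)-(?P<pyver>.+?)-(?P<abi>.+?)-(?P<plat>.+?)\.whl$'
)
m = wheel_file_re.match('requests-2.18.4-py2.py3-none-any.whl')
print(m.group('name'), m.group('ver'), m.group('pyver').split('.'))
# requests 2.18.4 ['py2', 'py3']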
Project: python-    Author: secondtonone1    | project source | file source
def _needs_hiding(mod_name):
    """
    >>> _needs_hiding('setuptools')
    True
    >>> _needs_hiding('pkg_resources')
    True
    >>> _needs_hiding('setuptools_plugin')
    False
    >>> _needs_hiding('setuptools.__init__')
    True
    >>> _needs_hiding('distutils')
    True
    >>> _needs_hiding('os')
    False
    >>> _needs_hiding('Cython')
    True
    """
    pattern = re.compile(r'(setuptools|pkg_resources|distutils|Cython)(\.|$)')
    return bool(pattern.match(mod_name))
Project: python-    Author: secondtonone1    | project source | file source
def __init__(
            self, index_url="https://pypi.python.org/simple", hosts=('*',),
            ca_bundle=None, verify_ssl=True, *args, **kw
            ):
        Environment.__init__(self, *args, **kw)
        self.index_url = index_url + "/" [:not index_url.endswith('/')]
        self.scanned_urls = {}
        self.fetched_urls = {}
        self.package_pages = {}
        self.allows = re.compile('|'.join(map(translate, hosts))).match
        self.to_scan = []
        use_ssl = (
            verify_ssl
            and ssl_support.is_available
            and (ca_bundle or ssl_support.find_ca_bundle())
        )
        if use_ssl:
            self.opener = ssl_support.opener_for(ca_bundle)
        else:
            self.opener = urllib.request.urlopen
Project: python-    Author: secondtonone1    | project source | file source
def filterwarnings(action, message="", category=Warning, module="", lineno=0,
                   append=False):
    """Insert an entry into the list of warnings filters (at the front).

    'action' -- one of "error", "ignore", "always", "default", "module",
                or "once"
    'message' -- a regex that the warning message must match
    'category' -- a class that the warning must be a subclass of
    'module' -- a regex that the module name must match
    'lineno' -- an integer line number, 0 matches all warnings
    'append' -- if true, append to the list of filters
    """
    import re
    assert action in ("error", "ignore", "always", "default", "module",
                      "once"), "invalid action: %r" % (action,)
    assert isinstance(message, str), "message must be a string"
    assert isinstance(category, type), "category must be a class"
    assert issubclass(category, Warning), "category must be a Warning subclass"
    assert isinstance(module, str), "module must be a string"
    assert isinstance(lineno, int) and lineno >= 0, \
           "lineno must be an int >= 0"
    _add_filter(action, re.compile(message, re.I), category,
            re.compile(module), lineno, append=append)
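Since message and module are compiled with re.compile() and later checked with .match(), both behave as patterns anchored at the start of the warning message and the module name, and the message pattern is case-insensitive. A typical call using the standard warnings module looks like this:

import warnings

# Turn matching DeprecationWarnings from a hypothetical 'legacy' package into errors.
warnings.filterwarnings(
    'error',
    message=r'old api',              # must match the start of the warning message
    category=DeprecationWarning,
    module=r'legacy\.',              # must match the start of the module name
)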
Project: python-    Author: secondtonone1    | project source | file source
def _getcategory(category):
    import re
    if not category:
        return Warning
    if re.match("^[a-zA-Z0-9_]+$", category):
        try:
            cat = eval(category)
        except NameError:
            raise _OptionError("unknown warning category: %r" % (category,))
    else:
        i = category.rfind(".")
        module = category[:i]
        klass = category[i+1:]
        try:
            m = __import__(module, None, None, [klass])
        except ImportError:
            raise _OptionError("invalid module name: %r" % (module,))
        try:
            cat = getattr(m, klass)
        except AttributeError:
            raise _OptionError("unknown warning category: %r" % (category,))
    if not issubclass(cat, Warning):
        raise _OptionError("invalid warning category: %r" % (category,))
    return cat
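The anchored '^[a-zA-Z0-9_]+$' check distinguishes a bare category name, which is resolved with eval(), from a dotted path that is split on its last '.' and imported:

import re

for category in ('DeprecationWarning', 'mypkg.errors.MyWarning'):
    print(category, bool(re.match("^[a-zA-Z0-9_]+$", category)))
# DeprecationWarning True        -> resolved with eval()
# mypkg.errors.MyWarning False   -> split into module and class, then imported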
Project: python-    Author: secondtonone1    | project source | file source
def __exit__(self, *exc_info):
        if not self._entered:
            raise RuntimeError("Cannot exit %r without entering first" % self)
        self._module.filters = self._filters
        self._module._filters_mutated()
        self._module.showwarning = self._showwarning
        self._module._showwarnmsg_impl = self._showwarnmsg_impl


# filters contains a sequence of filter 5-tuples
# The components of the 5-tuple are:
# - an action: error, ignore, always, default, module, or once
# - a compiled regex that must match the warning message
# - a class representing the warning category
# - a compiled regex that must match the module that is being warned
# - a line number for the line being warned about, or 0 to mean any line
# If either of the compiled regexes is None, match anything.
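A filter entry can be inspected directly: filterwarnings() inserts the new 5-tuple at the front of warnings.filters, with the message and module patterns already compiled.

import warnings

warnings.filterwarnings('ignore', message='noisy', category=UserWarning, module='demo')
action, msg_re, category, mod_re, lineno = warnings.filters[0]
print(action, msg_re.pattern, category.__name__, mod_re.pattern, lineno)
# ignore noisy UserWarning demo 0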
Project: python-    Author: secondtonone1    | project source | file source
def b64decode(s, altchars=None, validate=False):
    """Decode the Base64 encoded bytes-like object or ASCII string s.

    Optional altchars must be a bytes-like object or ASCII string of length 2
    which specifies the alternative alphabet used instead of the '+' and '/'
    characters.

    The result is returned as a bytes object.  A binascii.Error is raised if
    s is incorrectly padded.

    If validate is False (the default), characters that are neither in the
    normal base-64 alphabet nor the alternative alphabet are discarded prior
    to the padding check.  If validate is True, these non-alphabet characters
    in the input result in a binascii.Error.
    """
    s = _bytes_from_decode_data(s)
    if altchars is not None:
        altchars = _bytes_from_decode_data(altchars)
        assert len(altchars) == 2, repr(altchars)
        s = s.translate(bytes.maketrans(altchars, b'+/'))
    if validate and not re.match(b'^[A-Za-z0-9+/]*={0,2}$', s):
        raise binascii.Error('Non-base64 digit found')
    return binascii.a2b_base64(s)
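With validate=True, the anchored alphabet check above raises binascii.Error for any character outside the standard alphabet instead of silently discarding it:

import base64
import binascii

print(base64.b64decode('aGVsbG8=', validate=True))      # b'hello'
try:
    base64.b64decode('aGVs bG8=', validate=True)        # embedded space is not in the alphabet
except binascii.Error as exc:
    print('rejected:', exc)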
Project: facerecognition    Author: guoxiaolu    | project source | file source
def get_model_filenames(model_dir):
    files = os.listdir(model_dir)
    meta_files = [s for s in files if s.endswith('.meta')]
    if len(meta_files)==0:
        raise ValueError('No meta file found in the model directory (%s)' % model_dir)
    elif len(meta_files)>1:
        raise ValueError('There should not be more than one meta file in the model directory (%s)' % model_dir)
    meta_file = meta_files[0]
    meta_files = [s for s in files if '.ckpt' in s]
    max_step = -1
    for f in files:
        step_str = re.match(r'(^model-[\w\- ]+.ckpt-(\d+))', f)
        if step_str is not None and len(step_str.groups())>=2:
            step = int(step_str.groups()[1])
            if step > max_step:
                max_step = step
                ckpt_file = step_str.groups()[0]
    return meta_file, ckpt_file
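The checkpoint pattern above captures both the checkpoint prefix and the global step number; on a typical TensorFlow checkpoint filename (hypothetical here) it behaves like this:

import re

f = 'model-20170512-110547.ckpt-250000.index'   # hypothetical checkpoint file
m = re.match(r'(^model-[\w\- ]+.ckpt-(\d+))', f)
print(m.groups())
# ('model-20170512-110547.ckpt-250000', '250000')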
Project: python-opilo-script    Author: SEEDTEAM    | project source | file source
def sms():
    if len(sys.argv) < 2:
        print(help_text)
        return
    if sys.argv[1] == "send":
        if len(sys.argv) < 3:
            print("????? ????? ?? ?? ???.")
            return
        if not re.match(r"[\+98|0]9[0-9]*",sys.argv[2]):
            print("????? ???? ??? ?????? ???.tetete")
            return
        number = sys.argv[2]
        if re.match(r"^\+98", sys.argv[2]):   # pattern first, then the string
            number = re.sub(r"^\+98", "0", number)
        text = sys.argv[3]
        if len(text) > 100:
            print("????? ??????? ??? ??? ????? ???? ???.")
            return
        send_sms(number, text, str(time.time()))
        return
    if sys.argv[1] == "credits":
        get_credits()
        return
Project: Dockerfiles    Author: appscode    | project source | file source
def _ungroup_go_imports(fname):
    with open(fname, 'r+') as f:
        content = f.readlines()
        out = []
        import_block = False
        for line in content:
            c = line.strip()
            if import_block:
                if c == '':
                    continue
                elif re.match(END_IMPORT_REGEX, c) is not None:
                    import_block = False
            elif re.match(BEGIN_IMPORT_REGEX, c) is not None:
                import_block = True
            out.append(line)
        f.seek(0)
        f.writelines(out)
        f.truncate()
Project: my-first-blog    Author: AnkurBegining    | project source | file source
def check_header_validity(header):
    """Verifies that header value is a string which doesn't contain
    leading whitespace or return characters. This prevents unintended
    header injection.

    :param header: tuple, in the format (name, value).
    """
    name, value = header

    if isinstance(value, bytes):
        pat = _CLEAN_HEADER_REGEX_BYTE
    else:
        pat = _CLEAN_HEADER_REGEX_STR
    try:
        if not pat.match(value):
            raise InvalidHeader("Invalid return character or leading space in header: %s" % name)
    except TypeError:
        raise InvalidHeader("Value for header {%s: %s} must be of type str or "
                            "bytes, not %s" % (name, value, type(value)))
Project: my-first-blog    Author: AnkurBegining    | project source | file source
def _needs_hiding(mod_name):
    """
    >>> _needs_hiding('setuptools')
    True
    >>> _needs_hiding('pkg_resources')
    True
    >>> _needs_hiding('setuptools_plugin')
    False
    >>> _needs_hiding('setuptools.__init__')
    True
    >>> _needs_hiding('distutils')
    True
    >>> _needs_hiding('os')
    False
    >>> _needs_hiding('Cython')
    True
    """
    pattern = re.compile(r'(setuptools|pkg_resources|distutils|Cython)(\.|$)')
    return bool(pattern.match(mod_name))
Project: my-first-blog    Author: AnkurBegining    | project source | file source
def zap_pyfiles(self):
        log.info("Removing .py files from temporary directory")
        for base, dirs, files in walk_egg(self.bdist_dir):
            for name in files:
                path = os.path.join(base, name)

                if name.endswith('.py'):
                    log.debug("Deleting %s", path)
                    os.unlink(path)

                if base.endswith('__pycache__'):
                    path_old = path

                    pattern = r'(?P<name>.+)\.(?P<magic>[^.]+)\.pyc'
                    m = re.match(pattern, name)
                    path_new = os.path.join(base, os.pardir, m.group('name') + '.pyc')
                    log.info("Renaming file from [%s] to [%s]" % (path_old, path_new))
                    try:
                        os.remove(path_new)
                    except OSError:
                        pass
                    os.rename(path_old, path_new)
Project: my-first-blog    Author: AnkurBegining    | project source | file source
def find_external_links(url, page):
    """Find rel="homepage" and rel="download" links in `page`, yielding URLs"""

    for match in REL.finditer(page):
        tag, rel = match.groups()
        rels = set(map(str.strip, rel.lower().split(',')))
        if 'homepage' in rels or 'download' in rels:
            for match in HREF.finditer(tag):
                yield urllib.parse.urljoin(url, htmldecode(match.group(1)))

    for tag in ("<th>Home Page", "<th>Download URL"):
        pos = page.find(tag)
        if pos != -1:
            match = HREF.search(page, pos)
            if match:
                yield urllib.parse.urljoin(url, htmldecode(match.group(1)))
Project: picoCTF    Author: picoCTF    | project source | file source
def get_all_users(show_teachers=False):
    """
    Finds all the users in the database

    Args:
        show_teachers: whether or not to include teachers in the response
    Returns:
        Returns the uid, username, and email of all users.
    """

    db = api.common.get_conn()

    match = {}
    projection = {"uid": 1, "username": 1, "email": 1, "tid": 1}

    if not show_teachers:
        match.update({"teacher": False})
        projection.update({"teacher": 1})

    return list(db.users.find(match, projection))
Project: NeoVintageous    Author: NeoVintageous    | project source | file source
def run(self):
        view = self.window.active_view()
        pt = view.sel()[0]
        scope = view.scope_name(pt.begin()).rstrip()

        # TODO Fix jumptags scopes (rename them to less generic scopes)
        jumptag_scopes = [
            'text.neovintageous.help string.neovintageous',
            'text.neovintageous.help support.constant.neovintageous'
        ]

        if scope not in jumptag_scopes:
            return

        subject = view.substr(view.extract_scope(pt.begin()))

        if len(subject) < 3:
            return

        match = re.match('^\'[a-z_]+\'|\\|[^\\s\\|]+\\|$', subject)
        if match:
            subject = subject.strip('|')
            # TODO Refactor ex_help code into a reusable middle layer so that
            # this command doesn't have to call the ex command.
            self.window.run_command('ex_help', {'command_line': 'help ' + subject})
Project: NeoVintageous    Author: NeoVintageous    | project source | file source
def find_line_text_object(view, s):
    """Implement the line object."""
    line = view.line(s)
    line_content = view.substr(line)

    begin = line.begin()
    end = line.end()

    whitespace_match = re.match("\s+", line_content)
    if whitespace_match:
        begin = begin + len(whitespace_match.group(0))

    return (begin, end)


# TODO: Move this to units.py.
Project: NeoVintageous    Author: NeoVintageous    | project source | file source
def _url(regex, text):
        match = re.match(regex, text)
        if match:
            url = match.group('url')

            # Remove end of line full stop character.
            url = url.rstrip('.')

            # Remove closing tag markdown link e.g. [title](url)
            url = url.rstrip(')')

            # Remove closing tag markdown image e.g. ![alt](url)]
            if url[-2:] == ')]':
                url = url[:-2]

            return url

        return None
Project: sopel-modules    Author: phixion    | project source | file source
def check_callbacks(bot, trigger, url, run=True):
    """
    Check the given URL against the callbacks list. If it matches, and ``run``
    is given as ``True``, run the callback function, otherwise pass. Returns
    ``True`` if the url matched anything in the callbacks list.
    """
    # Check if it matches the exclusion list first
    matched = any(regex.search(url) for regex in bot.memory['url_exclude'])
    # Then, check if there's anything in the callback list
    for regex, function in tools.iteritems(bot.memory['url_callbacks']):
        match = regex.search(url)
        if match:
            if run:
                function(bot, trigger, match)
            matched = True
    return matched
Project: docker-monitoring-zabbix-agent    Author: digiapulssi    | project source | file source
def multi_stat_check(args, filename):
    dict = {}
    try:
        with open(args.container + "/" + filename, "r") as f:
            for line in f:
                m = _STAT_RE.match(line)
                if m:
                    dict[m.group(1)] = m.group(2)
    except Exception, e:
        if not os.path.isdir(args.container):
            os.mkdir(args.container)
        debug(args.container + ": could not get last stats from " + filename)
        debug(str(e))

        # first time running for this container create empty file
        open(args.container + "/" + filename,"w").close()
    return dict
Project: docker-monitoring-zabbix-agent    Author: digiapulssi    | project source | file source
def multi_stat_update(args, container_dir, filename):
    dict = {}
    try:
        pipe = os.popen("docker exec " + args.container + " cat " + container_dir + "/" + filename  + " 2>&1")
        for line in pipe:
            m = _STAT_RE.match(line)
            if m:
                dict[m.group(1)] = m.group(2)
        pipe.close()
        f = open(args.container + "/" + filename,"w")

        for key in dict.keys():
            f.write(key + " " + dict[key] + "\n")
        f.close()
    except Exception, e:
        debug(args.container + ": could not update " + filename)
        debug(str(sys.exc_info()))
    return dict
Project: PyPlanet    Author: PyPlanet    | project source | file source
def parse(version):
    """Parse version to major, minor, patch, pre-release, build parts.

    :param version: version string
    :return: dictionary with the keys 'build', 'major', 'minor', 'patch',
             and 'prerelease'. The prerelease or build keys can be None
             if not provided
    :rtype: dict
    """
    match = _REGEX.match(version)
    if match is None:
        raise ValueError('%s is not valid SemVer string' % version)

    version_parts = match.groupdict()

    version_parts['major'] = int(version_parts['major'])
    version_parts['minor'] = int(version_parts['minor'])
    version_parts['patch'] = int(version_parts['patch'])

    return version_parts
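The module-level _REGEX is not shown in this excerpt; a simplified SemVer pattern with the same named groups might look like the following (an illustrative assumption, not necessarily PyPlanet's exact regex):

import re

# Hypothetical, simplified SemVer pattern with the named groups used above.
_REGEX = re.compile(
    r'^(?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+)'
    r'(?:-(?P<prerelease>[0-9A-Za-z.-]+))?'
    r'(?:\+(?P<build>[0-9A-Za-z.-]+))?$'
)

m = _REGEX.match('1.4.2-alpha.1+001')
print(m.groupdict())
# {'major': '1', 'minor': '4', 'patch': '2', 'prerelease': 'alpha.1', 'build': '001'}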
Project: PyPlanet    Author: PyPlanet    | project source | file source
def _nat_cmp(a, b):
    def convert(text):
        return int(text) if re.match('[0-9]+', text) else text

    def split_key(key):
        return [convert(c) for c in key.split('.')]

    def cmp_prerelease_tag(a, b):
        if isinstance(a, int) and isinstance(b, int):
            return cmp(a, b)
        elif isinstance(a, int):
            return -1
        elif isinstance(b, int):
            return 1
        else:
            return cmp(a, b)

    a, b = a or '', b or ''
    a_parts, b_parts = split_key(a), split_key(b)
    for sub_a, sub_b in zip(a_parts, b_parts):
        cmp_result = cmp_prerelease_tag(sub_a, sub_b)
        if cmp_result != 0:
            return cmp_result
    else:
        return cmp(len(a), len(b))
Project: charm-swift-proxy    Author: openstack    | project source | file source
def get_iface_from_addr(addr):
    """Work out on which interface the provided address is configured."""
    for iface in netifaces.interfaces():
        addresses = netifaces.ifaddresses(iface)
        for inet_type in addresses:
            for _addr in addresses[inet_type]:
                _addr = _addr['addr']
                # link local
                ll_key = re.compile("(.+)%.*")
                raw = re.match(ll_key, _addr)
                if raw:
                    _addr = raw.group(1)

                if _addr == addr:
                    log("Address '%s' is configured on iface '%s'" %
                        (addr, iface))
                    return iface

    msg = "Unable to infer net iface on which '%s' is configured" % (addr)
    raise Exception(msg)
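The link-local handling above strips the zone index (the part after %) from an address before comparing it; for example:

import re

ll_key = re.compile("(.+)%.*")
raw = re.match(ll_key, 'fe80::1%eth0')   # re.match() also accepts a pre-compiled pattern
print(raw.group(1) if raw else None)     # fe80::1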
Project: charm-swift-proxy    Author: openstack    | project source | file source
def _add_cloud_pocket(pocket):
    """Add a cloud pocket as /etc/apt/sources.d/cloud-archive.list

    Note that this overwrites the existing file if there is one.

    This function also converts the simple pocket in to the actual pocket using
    the CLOUD_ARCHIVE_POCKETS mapping.

    :param pocket: string representing the pocket to add a deb spec for.
    :raises: SourceConfigError if the cloud pocket doesn't exist or the
        requested release doesn't match the current distro version.
    """
    apt_install(filter_installed_packages(['ubuntu-cloud-keyring']),
                fatal=True)
    if pocket not in CLOUD_ARCHIVE_POCKETS:
        raise SourceConfigError(
            'Unsupported cloud: source option %s' %
            pocket)
    actual_pocket = CLOUD_ARCHIVE_POCKETS[pocket]
    with open('/etc/apt/sources.list.d/cloud-archive.list', 'w') as apt:
        apt.write(CLOUD_ARCHIVE.format(actual_pocket))
Project: charm-swift-proxy    Author: openstack    | project source | file source
def _add_cloud_staging(cloud_archive_release, openstack_release):
    """Add the cloud staging repository which is in
    ppa:ubuntu-cloud-archive/<openstack_release>-staging

    This function checks that the cloud_archive_release matches the current
    codename for the distro that charm is being installed on.

    :param cloud_archive_release: string, codename for the release.
    :param openstack_release: String, codename for the openstack release.
    :raises: SourceConfigError if the cloud_archive_release doesn't match the
        current version of the os.
    """
    _verify_is_ubuntu_rel(cloud_archive_release, openstack_release)
    ppa = 'ppa:ubuntu-cloud-archive/{}-staging'.format(openstack_release)
    cmd = 'add-apt-repository -y {}'.format(ppa)
    _run_with_retries(cmd.split(' '))
Project: charm-swift-proxy    Author: openstack    | project source | file source
def _add_cloud_staging(cloud_archive_release, openstack_release):
    """Add the cloud staging repository which is in
    ppa:ubuntu-cloud-archive/<openstack_release>-staging

    This function checks that the cloud_archive_release matches the current
    codename for the distro that charm is being installed on.

    :param cloud_archive_release: string, codename for the release.
    :param openstack_release: String, codename for the openstack release.
    :raises: SourceConfigError if the cloud_archive_release doesn't match the
        current version of the os.
    """
    _verify_is_ubuntu_rel(cloud_archive_release, openstack_release)
    ppa = 'ppa:ubuntu-cloud-archive/{}-staging'.format(openstack_release)
    cmd = 'add-apt-repository -y {}'.format(ppa)
    _run_with_retries(cmd.split(' '))
Project: dontwi    Author: vocalodon    | project source | file source
def summaries_to_be_listed_in_waiting_list(
            result_log, status_pr, statuses, trigger_str):
        summaries = []
        for a_status in statuses:
            # results's 2nd.  condition which match to "Start" is for
            # fail-safe.
            if not result_log.has_result_of_status(
                    status=a_status, results=["Succeed", "Start", "Failed"]):
                st_str = status_pr.make_tweet_string_from_toot(
                    a_status, hashtag=trigger_str)
                rs_str = "Waiting"
                rs_sm = result_log.make_result_and_others_summary(
                    status_string=st_str, hashtag=trigger_str, result=rs_str)
                in_sm = result_log.make_status_summary("inbound", a_status)
                rs_sm.update(in_sm)
                summaries.append(rs_sm)
        return summaries
Project: python-libjuju    Author: juju    | project source | file source
def cares_about(self, delta):
        """Return True if this observer "cares about" (i.e. wants to be
        called) for a this delta.

        """
        if (self.entity_id and delta.get_id() and
                not re.match(self.entity_id, str(delta.get_id()))):
            return False

        if self.entity_type and self.entity_type != delta.entity:
            return False

        if self.action and self.action != delta.type:
            return False

        if self.predicate and not self.predicate(delta):
            return False

        return True
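Because re.match() anchors only at the start of the string, entity_id effectively behaves as a prefix pattern here unless it ends with $; the distinction matters when entity ids share a prefix:

import re

print(bool(re.match('machine-1', 'machine-12')))    # True: unanchored end, prefix match
print(bool(re.match('machine-1$', 'machine-12')))   # False: $ requires the exact id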
Project: libbuild    Author: appscode    | project source | file source
def _ungroup_go_imports(fname):
    with open(fname, 'r+') as f:
        content = f.readlines()
        out = []
        import_block = False
        for line in content:
            c = line.strip()
            if import_block:
                if c == '':
                    continue
                elif re.match(END_IMPORT_REGEX, c) is not None:
                    import_block = False
            elif re.match(BEGIN_IMPORT_REGEX, c) is not None:
                import_block = True
            out.append(line)
        f.seek(0)
        f.writelines(out)
        f.truncate()
Project: zipline-chinese    Author: zhanghan1990    | project source | file source
def module_requirements(requirements_path, module_names, strict_bounds,
                        conda_format=False):
    module_names = set(module_names)
    found = set()
    module_lines = []
    for line in read_requirements(requirements_path,
                                  strict_bounds=strict_bounds):
        match = REQ_PATTERN.match(line)
        if match is None:
            raise AssertionError("Could not parse requirement: '%s'" % line)

        name = match.group(1)
        if name in module_names:
            found.add(name)
            if conda_format:
                line = _conda_format(line)
            module_lines.append(line)

    if found != module_names:
        raise AssertionError(
            "No requirements found for %s." % (module_names - found)
        )
    return module_lines
Project: ArticleSpider    Author: mtianyan    | project source | file source
def parse(self, response):
        """
                ???html??????url ?????url??????
                ?????url???? /question/xxx ?????????????
                """
        all_urls = response.css("a::attr(href)").extract()
        all_urls = [parse.urljoin(response.url, url) for url in all_urls]
        # Use a lambda to filter the urls, keeping only those that start with "https"
        all_urls = filter(lambda x: True if x.startswith("https") else False, all_urls)
        for url in all_urls:
            match_obj = re.match("(.*zhihu.com/question/(\d+))(/|$).*", url)
            if match_obj:
                # A question page was matched: request it and let parse_question extract the data
                request_url = match_obj.group(1)
                yield scrapy.Request(request_url, headers=self.headers, callback=self.parse_question)
                # Debugging aid: uncomment the break below to stop after the first match
                # break
            else:
                # pass
                # Not a question page: keep following the url for further crawling
                yield scrapy.Request(url, headers=self.headers, callback=self.parse)
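The question-URL pattern above captures both the canonical question URL and the numeric question id; with a hypothetical Zhihu URL:

import re

url = 'https://www.zhihu.com/question/12345678/answer/87654321'   # hypothetical url
match_obj = re.match(r"(.*zhihu.com/question/(\d+))(/|$).*", url)
if match_obj:
    print(match_obj.group(1))   # https://www.zhihu.com/question/12345678
    print(match_obj.group(2))   # 12345678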
Project: ArticleSpider    Author: mtianyan    | project source | file source
def login(self, response):
        response_text = response.text
        match_obj = re.match('.*name="_xsrf" value="(.*?)"', response_text, re.DOTALL)
        xsrf = ''
        if match_obj:
            xsrf = (match_obj.group(1))

        if xsrf:
            post_url = "https://www.zhihu.com/login/phone_num"
            post_data = {
                "_xsrf": xsrf,
                "phone_num": "18487255487",
                "password": "ty158917",
                "captcha": ""
            }

            import time
            t = str(int(time.time() * 1000))
            captcha_url = "https://www.zhihu.com/captcha.gif?r={0}&type=login".format(t)
            yield scrapy.Request(captcha_url, headers=self.headers, meta={"post_data":post_data}, callback=self.login_after_captcha)
Project: ArticleSpider    Author: mtianyan    | project source | file source
def get_xsrf():
    # Get the xsrf code
    response = session.get("https://www.zhihu.com", headers=header)
    response_text = response.text
    match_obj = re.match('.*name="_xsrf" value="(.*?)"', response_text, re.DOTALL)
    xsrf = ''
    if match_obj:
        xsrf = (match_obj.group(1))
        return xsrf
Project: ArticleSpider    Author: mtianyan    | project source | file source
def zhihu_login(account, password):
    # Zhihu login
    if re.match("^1\d{10}",account):
        print ("??????")
        post_url = "https://www.zhihu.com/login/phone_num"
        post_data = {
            "_xsrf": get_xsrf(),
            "phone_num": account,
            "password": password,
            "captcha":get_captcha()
        }
    else:
        if "@" in account:
            # The account looks like an email address
            print("Logging in with an email address")
            post_url = "https://www.zhihu.com/login/email"
            post_data = {
                "_xsrf": get_xsrf(),
                "email": account,
                "password": password
            }

    response_text = session.post(post_url, data=post_data, headers=header)
    session.cookies.save()

# get_index()
# is_login()
# get_captcha()
Project: ArticleSpider    Author: mtianyan    | project source | file source
def get_nums(value):
    match_re = re.match(".*?(\d+).*", value)
    if match_re:
        nums = int(match_re.group(1))
    else:
        nums = 0

    return nums
Project: pycos    Author: pgiri    | project source | file source
def allocate(self, ip_addr, name, platform, cpus, memory, disk):
        """When a node is found, scheduler calls this method with IP address,
        name, CPUs, memory and disk available on that node. This method should
        return a number indicating number of CPUs to use. If return value is 0,
        the node is not used; if the return value is < 0, this allocation is
        ignored (next allocation in the 'node_allocations' list, if any, is
        applied).
        """
        if not re.match(self.ip_rex, ip_addr):
            return -1
        if (self.platform and not re.search(self.platform, platform)):
            return -1
        if ((self.memory and memory and self.memory > memory) or
            (self.disk and disk and self.disk > disk)):
            return 0
        if self.cpus > 0:
            if self.cpus > cpus:
                return 0
            return self.cpus
        elif self.cpus == 0:
            return 0
        else:
            cpus += self.cpus
            if cpus < 0:
                return 0
            return cpus
Project: pycos    Author: pgiri    | project source | file source
def __init__(self, host, tcp_port):
        if re.match(r'^\d+[\.\d]+$', host) or re.match(r'^[0-9a-fA-F:]+$', host):
            self.addr = host
        else:
            self.addr = socket.getaddrinfo(host, 0, 0, socket.SOCK_STREAM)[0][4][0]
        self.port = int(tcp_port)
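The two anchored patterns above give a cheap test for whether host is already a numeric IPv4 or IPv6 literal; anything else is resolved through getaddrinfo(). For example:

import re

for host in ('10.0.0.5', 'fe80::1', 'example.com'):
    is_literal = bool(re.match(r'^\d+[\.\d]+$', host) or
                      re.match(r'^[0-9a-fA-F:]+$', host))
    print(host, is_literal)
# 10.0.0.5 True, fe80::1 True, example.com False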
Project: pycos    Author: pgiri    | project source | file source
def allocate(self, ip_addr, name, platform, cpus, memory, disk):
        """When a node is found, scheduler calls this method with IP address,
        name, CPUs, memory and disk available on that node. This method should
        return a number indicating number of CPUs to use. If return value is 0,
        the node is not used; if the return value is < 0, this allocation is
        ignored (next allocation in the 'node_allocations' list, if any, is
        applied).
        """
        if not re.match(self.ip_rex, ip_addr):
            return -1
        if (self.platform and not re.search(self.platform, platform)):
            return -1
        if ((self.memory and memory and self.memory > memory) or
            (self.disk and disk and self.disk > disk)):
            return 0
        if self.cpus > 0:
            if self.cpus > cpus:
                return 0
            return self.cpus
        elif self.cpus == 0:
            return 0
        else:
            cpus += self.cpus
            if cpus < 0:
                return 0
            return cpus
Project: pycos    Author: pgiri    | project source | file source
def __init__(self, host, tcp_port):
        if re.match(r'^\d+[\.\d]+$', host) or re.match(r'^[0-9a-fA-F:]+$', host):
            self.addr = host
        else:
            self.addr = socket.getaddrinfo(host, 0, 0, socket.SOCK_STREAM)[0][4][0]
        self.port = int(tcp_port)
Project: ISB-CGC-pipelines    Author: isb-cgc    | project source | file source
def __init__(self, fqArchiveUrl, filtersDir, outputPrefix, outputUrl, diskSize, diskType, logsPath, container, scriptUrl, tag, cores, mem, preemptible):
        super(PipelineStep, self).__init__()

        fqFileName = os.path.basename(fqArchiveUrl)
        fqInputs = "{fqArchive}:{fqFileName}".format(fqArchive=fqArchiveUrl, fqFileName=fqFileName)

        try:
            filtersDirContents = subprocess.check_output(["gsutil", "ls", filtersDir])

        except subprocess.CalledProcessError as e:
            print "ERROR: couldn't get a listing of filter files!  -- {reason}".format(reason=e)
            exit(-1)

        bfInputs = [x for x in filtersDirContents.split('\n') if re.match('^.*\.bf$', x) or re.match('^.*\.txt', x)]
        bfInputs.append(fqInputs)

        inputs = ",".join(["{url}:{filename}".format(url=x, filename=os.path.basename(x)) for x in bfInputs])
        outputs = "{outputPrefix}*:{outDir}".format(outputPrefix=outputPrefix, outDir=outputUrl)
        env = "INPUT_FILE={fqFileName},OUTPUT_PREFIX={outputPrefix},FILTERS_LIST={filtersList}".format(fqFileName=fqFileName, outputPrefix=outputPrefix, filtersList=','.join([os.path.basename(x) for x in bfInputs if re.match('^.*\.bf$', x)]))

        self._step = PipelineSchema("biobloomcategorizer",
                                                   self._pipelinesConfig,
                                                   logsPath,
                                                   container,
                                                   scriptUrl=scriptUrl,
                                                   cores=cores,
                                                   mem=mem,
                                                   diskSize=diskSize,
                                                   diskType=diskType,
                                                   inputs=inputs,
                                                   outputs=outputs,
                                                   env=env,
                                                   tag=tag,
                                                   preemptible=preemptible)
Project: docker-utils    Author: a-ba    | project source | file source
def docker_version():
    out = subprocess.check_output(["docker", "-v"])
    mo = re.match(br"Docker version (\d+)\.(\d+)\.(\d+)", out)
    if mo:
        return tuple(map(int, mo.groups()))
    die("unable to parse a version number from the output of 'docker -v'")