Python re 模块,search() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用re.search()

项目:ln2sql    作者:FerreroJeremy    | 项目源码 | 文件源码
def create_table(self, table_string):
    """Build a ``Table`` from the text of one ``CREATE TABLE`` statement.

    The statement is processed line by line:

    * a line containing ``TABLE`` provides the table name (its first
      back-quoted identifier); synonyms are attached when a thesaurus
      object is configured;
    * a line containing ``PRIMARY KEY`` registers every back-quoted
      identifier on it as a primary-key column;
    * any other line with a back-quoted identifier is added as a column
      whose type comes from ``self.predict_type(line)``.

    Lines without a back-quoted identifier are ignored.

    :param table_string: text of the statement, one clause per line.
    :returns: the populated ``Table`` instance.
    """
    # Raw string: "\w" in a plain literal is an invalid escape sequence.
    identifier = re.compile(r"`(\w+)`")
    table = Table()
    for line in table_string.split("\n"):
        if 'TABLE' in line:
            name_match = identifier.search(line)
            if name_match is None:
                # No back-quoted name on this line: nothing to record.
                continue
            table.name = name_match.group(1)
            if self.thesaurus_object is not None:
                table.equivalences = self.thesaurus_object.get_synonyms_of_a_word(table.name)
        elif 'PRIMARY KEY' in line:
            for primary_key_column in identifier.findall(line):
                table.add_primary_key(primary_key_column)
        else:
            column_match = identifier.search(line)
            if column_match is None:
                continue
            column_name = column_match.group(1)
            column_type = self.predict_type(line)
            if self.thesaurus_object is not None:
                equivalences = self.thesaurus_object.get_synonyms_of_a_word(column_name)
            else:
                equivalences = []
            table.add_column(column_name, column_type, equivalences)
    return table
项目:Cortex-Analyzers    作者:CERT-BDF    | 项目源码 | 文件源码
def run(self):
    """Look up the FortiGuard web-filter category of a domain or URL.

    For ``domain`` and ``url`` observables the FortiGuard page is fetched
    and the text following ``Category: `` is reported.  Any other data
    type is answered with ``self.notSupported()``.  A response without a
    category is routed through ``self.unexpectedError`` instead of
    crashing on a ``None`` match.
    """
    Analyzer.run(self)

    if self.data_type not in ('domain', 'url'):
        self.notSupported()
        return

    try:
        pattern = re.compile(r"(?:Category: )([\w\s]+)")
        baseurl = 'http://www.fortiguard.com/webfilter?q='
        url = baseurl + self.getData()
        req = requests.get(url)
        # ``text`` is the decoded body; ``content`` is bytes on Python 3
        # and cannot be searched with a str pattern.
        category_match = pattern.search(req.text)
        if category_match is None:
            raise ValueError('Category not found in FortiGuard response')
        self.report({
            'category': category_match.group(1)
        })
    except ValueError as e:
        self.unexpectedError(e)
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def loopback_devices():
    '''
    Parse through 'losetup -a' output to determine currently mapped
    loopback devices. Output is expected to look like:

        /dev/loop0: [0807]:961814 (/tmp/my.img)

    Backing file paths that contain spaces are supported.

    :returns: dict: a dict mapping {loopback_dev: backing_file}
    '''
    loopbacks = {}
    cmd = ['losetup', '-a']
    output = check_output(cmd)
    if not isinstance(output, str):
        # Bytes output (Python 3) must be decoded before text parsing.
        output = output.decode('utf-8')
    for line in output.splitlines():
        line = line.strip()
        if not line:
            continue
        # Split on the first two spaces only, so a backing file whose path
        # contains spaces stays in one piece.
        dev, _, f = line.split(' ', 2)
        loopbacks[dev.replace(':', '')] = re.search(r'\((.+)\)', f).groups()[0]
    return loopbacks
项目:linux-soft-exploit-suggester    作者:belane    | 项目源码 | 文件源码
def searchExploit(exploit_list, soft_name, soft_version):
    """ Search affected packages in exploit_list

    An exploit is kept when its platform and type pass the command-line
    filters held in ``args``, its title mentions ``soft_name`` and one of
    the versions extracted from the title is not older than the searched
    version (or ``args.level`` is 5).

    Unless ``args.duplicates`` is set, a matched exploit is removed from
    ``exploit_list`` so it is not reported again for another package.

    Returns a list of ``[exploit, soft_name, soft_version]`` entries.
    """
    result = []
    version_search = versionVartions(soft_version, args.level)
    # Loop-invariant; re.escape keeps every regex metacharacter in the
    # package name literal, not only '+'.
    query = r"(^(\w*\s){0,%s}|/\s?)%s(\s|\s.*\s|\/).* -" % (args.level, re.escape(soft_name))
    title_re = re.compile(query, re.IGNORECASE)
    # Iterate over a snapshot: removing from the list being iterated would
    # silently skip the element that follows each removed exploit.
    for exploit in list(exploit_list):
        # Platform and DoS
        if exploit[5] not in valid_platforms:
            continue
        if not (args.dos or exploit[6] != 'dos' or args.type == 'dos'):
            continue
        # Filter
        if args.filter is not None and args.filter.lower() not in exploit[2].lower():
            continue
        # Type
        if args.type is not None and args.type != exploit[6]:
            continue
        if not title_re.search(exploit[2]):
            continue
        for affected_version in extractVersions(exploit[2]):
            if args.level == 5 or LooseVersion(version_search) <= LooseVersion(affected_version):
                if args.duplicates == False:  # Duplicates
                    exploit_list.remove(exploit)
                printOutput(exploit, soft_name, soft_version)
                result.append([exploit, soft_name, soft_version])
                break
    return result
项目:linux-soft-exploit-suggester    作者:belane    | 项目源码 | 文件源码
def extractVersions(title_string):
    """ Extract all version numbers from a string

    Only the part of the title before the first " -", "(" or "&" is
    scanned.  Every whitespace-separated token starting with a digit is
    treated as a version; "a/b" tokens are split into several versions
    and "x.y.z-w" ranges are collapsed to their upper bound "x.y.w".
    Each candidate goes through ``purgeVersionString``.
    """
    def collapse_range(token):
        # "1.2.3-5" -> "1.2.5": keep the prefix of the first version and
        # replace its last component with the end of the range.
        pieces = token.split('-')
        return '.'.join(pieces[0].split('.')[:-1]) + '.' + pieces[-1]

    cut = re.search(r'\s-|\(|\&', title_string)
    if cut:
        title_string = title_string[:cut.start()]
    result = []
    for possible_version in title_string.split():
        if not possible_version[0].isdigit():
            continue
        if '/' in possible_version:
            for multiversion in possible_version.split('/'):
                if '-' in multiversion:
                    multiversion = collapse_range(multiversion)
                # Purge once and reuse the value instead of calling twice.
                purged = purgeVersionString(multiversion)
                if purged:
                    result.append(purged)
        elif '-' in possible_version:
            result.append(purgeVersionString(collapse_range(possible_version)))
        else:
            result.append(purgeVersionString(possible_version))
    return result
项目:heerkat    作者:Roche    | 项目源码 | 文件源码
def test_oozie_workflow(self):
    """Submit the sample Oozie workflow and expect it to succeed.

    Test data and the workflow definition are uploaded to HDFS, the job
    is started and polled, then the ``Status`` line of the job info
    output is checked.
    """
    # given
    input_data_dir = 'test/availability/input-data'
    apps_streaming_dir = 'test/availability/apps/streaming'

    cmd('hdfs dfs -rm -r %s' % input_data_dir)
    cmd('hdfs dfs -mkdir -p %s' % input_data_dir)
    cmd('hdfs dfs -put %s %s' % (sample_data, input_data_dir))

    cmd('hdfs dfs -rm -r %s' % apps_streaming_dir)
    cmd('hdfs dfs -mkdir -p %s' % apps_streaming_dir)
    cmd('hdfs dfs -put resources/oozie/workflow.xml %s' % apps_streaming_dir)

    # when
    result = cmd('oozie job -oozie %s -config resources/oozie/job.properties -run' % oozie_host)
    job_id = result.stdout.replace('job: ', '')
    cmd('oozie job -oozie %s -poll %s -interval 1' % (oozie_host, job_id))
    result = cmd('oozie job -oozie %s -info %s' % (oozie_host, job_id))

    # then
    status_match = re.search(r'Status\s+:\s+(.+)', result.stdout)
    # Fail with the command's stderr rather than an AttributeError when
    # the output has no status line at all.
    self.assertIsNotNone(status_match, result.stderr)
    self.assertEqual('SUCCEEDED', status_match.group(1), result.stderr)
项目:xr-telemetry-m2m-web    作者:cisco    | 项目源码 | 文件源码
def commit_history(cli):
    """
    Parse output of "show configuration history commit reverse detail"

    Every line holding two "Name: value" fields contributes both fields to
    the current record.  A line whose second field is "Comment" completes
    the record, which is appended to the returned list.
    """
    field_pair = re.compile(' ([A-Z][a-z]+(?: ID)?): (.*?) +([A-Z][a-z]+): (.*)')
    records = []
    current = OrderedDict()
    for text in cli.splitlines():
        found = field_pair.search(text)
        if found is None:
            continue
        left_key, left_value, right_key, right_value = found.groups()
        current[left_key] = left_value
        current[right_key] = right_value
        if right_key == 'Comment':
            # "Comment" is the last field of a commit entry.
            records.append(current)
            current = OrderedDict()
    return records
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def _getmember(self, name, tarinfo=None, normalize=False):
    """Return the last archive member called `name`, or None.

       Members are scanned from the end of the archive backwards.  When
       `tarinfo` is given, only members stored before it are considered.
       With `normalize`, both names are compared after os.path.normpath().
    """
    # getmembers() makes sure that all members have been loaded.
    candidates = self.getmembers()

    if tarinfo is not None:
        # Restrict the search to what precedes tarinfo.
        candidates = candidates[:candidates.index(tarinfo)]

    wanted = os.path.normpath(name) if normalize else name

    for entry in reversed(candidates):
        current = os.path.normpath(entry.name) if normalize else entry.name
        if wanted == current:
            return entry
    return None
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def find_external_links(url, page):
    """Find rel="homepage" and rel="download" links in `page`, yielding URLs"""

    # First pass: tags whose rel attribute names a homepage or download.
    for rel_match in REL.finditer(page):
        tag, rel = rel_match.groups()
        rels = set(map(str.strip, rel.lower().split(',')))
        if not ('homepage' in rels or 'download' in rels):
            continue
        for href_match in HREF.finditer(tag):
            yield urllib.parse.urljoin(url, htmldecode(href_match.group(1)))

    # Second pass: the first link following each table header marker.
    for marker in ("<th>Home Page", "<th>Download URL"):
        offset = page.find(marker)
        if offset == -1:
            continue
        href_match = HREF.search(page, offset)
        if href_match:
            yield urllib.parse.urljoin(url, htmldecode(href_match.group(1)))
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def _getmember(self, name, tarinfo=None, normalize=False):
    """Return the last archive member called `name`, or None.

       Members are scanned from the end of the archive backwards.  When
       `tarinfo` is given, only members stored before it are considered.
       With `normalize`, both names are compared after os.path.normpath().
    """
    # getmembers() makes sure that all members have been loaded.
    candidates = self.getmembers()

    if tarinfo is not None:
        # Restrict the search to what precedes tarinfo.
        candidates = candidates[:candidates.index(tarinfo)]

    wanted = os.path.normpath(name) if normalize else name

    for entry in reversed(candidates):
        current = os.path.normpath(entry.name) if normalize else entry.name
        if wanted == current:
            return entry
    return None
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def _find_link_target(self, tarinfo):
    """Return the archive member that the symlink or hardlink member
       `tarinfo` points to.  Raise KeyError when it cannot be found.
    """
    if not tarinfo.issym():
        # A hard link references a file archived earlier, so only the
        # part of the archive before the link is searched.
        linkname = tarinfo.linkname
        limit = tarinfo
    else:
        # A symlink may point anywhere: search the entire archive.
        parts = (os.path.dirname(tarinfo.name), tarinfo.linkname)
        linkname = "/".join(filter(None, parts))
        limit = None

    member = self._getmember(linkname, tarinfo=limit, normalize=True)
    if member is None:
        raise KeyError("linkname %r not found" % linkname)
    return member
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def b16decode(s, casefold=False):
    """Decode the Base16 encoded bytes-like object or ASCII string s.

    When casefold is true the input is upper-cased first, so a lowercase
    alphabet is accepted as well.  For security purposes, the default is
    False.

    The decoded data is returned as a bytes object.  binascii.Error is
    raised when a character outside 0-9A-F is present or when s is
    incorrectly padded.
    """
    data = _bytes_from_decode_data(s)
    data = data.upper() if casefold else data
    if re.search(b'[^0-9A-F]', data) is not None:
        raise binascii.Error('Non-base16 digit found')
    return binascii.unhexlify(data)

#
# Ascii85 encoding/decoding
#
项目:py_find_1st    作者:roebel    | 项目源码 | 文件源码
def compiler_is_clang(comp) :
    """Report whether the compiler command `comp` identifies itself as clang.

    `comp` is the compiler command as a list of arguments; it is run with
    ``--version`` appended and its combined stdout/stderr is searched for
    "clang".  Progress is printed to stdout.

    Returns True when "clang" appears in the output, and False when it
    does not, when the command cannot be started, or when it exits with a
    non-zero status.
    """
    print("check for clang compiler ...", end=' ')
    try:
        cc_output = subprocess.check_output(comp+['--version'],
                                            stderr = subprocess.STDOUT, shell=False)
    except OSError as ex:
        # "{0}" rather than "{0:d}": errno may be None.
        print("compiler test call failed with error {0} msg: {1}".format(ex.errno, ex.strerror))
        print("no")
        return False
    except subprocess.CalledProcessError as ex:
        # A compiler that rejects --version is simply not clang.
        print("compiler test call exited with status {0}".format(ex.returncode))
        print("no")
        return False

    ret = b'clang' in cc_output
    print("yes" if ret else "no")
    return ret
项目:AVSR-Deep-Speech    作者:pandeydivesh15    | 项目源码 | 文件源码
def validate_label(label):
    """Normalise a transcript label, or return None when it is unusable.

    Labels containing any of ( < [ ] & * { or a digit are rejected.
    Otherwise - _ . , ? are removed, surrounding whitespace is stripped
    and the result is lower-cased.
    """
    # For now we can only handle [a-z ']
    rejected = ("(", "<", "[", "]", "&", "*", "{")
    if any(symbol in label for symbol in rejected):
        return None
    if re.search(r"[0-9]", label) is not None:
        return None

    for noise in ("-", "_", ".", ",", "?"):
        label = label.replace(noise, "")

    return label.strip().lower()
项目:AVSR-Deep-Speech    作者:pandeydivesh15    | 项目源码 | 文件源码
def validate_label(label):
    """Normalise a transcript label, or return None when it is unusable.

    Labels containing any of ( < [ ] & * { or a digit are rejected.
    Otherwise - _ . , ? are removed, surrounding whitespace is stripped
    and the result is lower-cased.
    """
    # For now we can only handle [a-z ']
    rejected = ("(", "<", "[", "]", "&", "*", "{")
    if any(symbol in label for symbol in rejected):
        return None
    if re.search(r"[0-9]", label) is not None:
        return None

    for noise in ("-", "_", ".", ",", "?"):
        label = label.replace(noise, "")

    return label.strip().lower()
项目:Blender-power-sequencer    作者:GDquest    | 项目源码 | 文件源码
def string_find_id(string, regex):
    """Find a marker's ID using a regular expression
    returns the ID as int if found, otherwise returns None
    Args:
        -string, any string
        -regex, the regex pattern to match; its first capture group must
         hold the digits of the ID.
    Raises:
        AttributeError if string or regex is empty or None."""

    # Both arguments are required.  The former test, `not string and regex`,
    # parsed as `(not string) and regex` and so never rejected a missing
    # regex.
    if not (string and regex):
        raise AttributeError("Missing name or regex attribute")

    import re
    index = re.search(regex, string)
    if index:
        found_id = index.group(1)
        return int(found_id) if found_id else None
    return None
项目:cbapi-examples    作者:cbcommunity    | 项目源码 | 文件源码
# Search powershell.exe processes for an encoded-command switch and print
# the Base64-decoded command for each hit (Python 2 code: print statements).
def main(cb, args):
    powershells = cb.process_search_iter('process_name:powershell.exe')
    for s in powershells:
        if s['cmdline']:
            # Matches "-enc"/"-EncodedCommand"-style switches (any case)
            # followed by one whitespace character and a Base64 blob.
            encoded = re.search('\-[eE][nN][cC][oOdDeEcCmMaAnN]*\s([A-Za-z0-9\+/=]+)', s['cmdline'])
            if encoded != None:
                i = encoded.group(1)
                # Restore '=' padding when the captured blob does not end
                # with any, so that b64decode accepts it.
                if not re.search('[a-zA-Z0-9\+/]+={1,2}$', i):
                    trailingBytes = len(i) % 4
                    if trailingBytes == 3:
                        i = i + '='
                    elif trailingBytes == 2:
                        i = i + '=='
                decodedCommand = base64.standard_b64decode(i)
                try:
                    # NOTE(review): relies on Python 2 str semantics; the NUL
                    # bytes stripped below presumably come from UTF-16
                    # encoded commands -- confirm.
                    a = decodedCommand.encode('ascii', 'replace')
                    print "Powershell Decoded Command\n%s/#analyze/%s/1\n%s\n\n" % (
                    args['server_url'], s['id'], a.replace('\0', ""))
                except UnicodeError:
                    # Decoded bytes are not ASCII: print the raw command line.
                    print "Powershell Decoded Command\n%s/#analyze/%s/1\nNon-ASCII decoding, encoded form printed to assist more research\n%s\n" % (
                    args['server_url'], s['id'], s['cmdline'])
                    pass
项目:cbapi-examples    作者:cbcommunity    | 项目源码 | 文件源码
def check_triggers(message):
    """Return (matched, target_policy) for the first policy whose rules all match.

    Each policy in the global ``action_triggers`` holds a flat ``rules``
    list of alternating message keys and regular expressions.  A policy
    matches when every expression is found in the lower-cased value of its
    key in ``message``; a policy without rules never matches.  When no
    policy matches, ``(False, "")`` is returned.
    """
    global action_triggers

    for policy in action_triggers:
        # Work on a copy so the global rule list is left untouched.
        rules = list(action_triggers[policy]['rules'])
        every_rule_hit = bool(rules) and all(
            re.search(rules[pos + 1], message[rules[pos]].lower())
            for pos in range(0, len(rules), 2)
        )
        if every_rule_hit:
            return True, action_triggers[policy]['targetpolicy']

    return False, ""
项目:cbapi-examples    作者:cbcommunity    | 项目源码 | 文件源码
def move_policy(sensor, targetPolicy):
    """Move the computer(s) behind a Cb sensor into a Bit9 policy.

    Args:
        sensor: sensor id, matched against the ``cbSensorId`` field of
            Bit9 computers.
        targetPolicy: name of the destination policy.

    Raises:
        ValueError: if the policy or the computer cannot be found.
    """
    global eptoken
    global epserver

    bit9 = bit9api.bit9Api(
        "https://"+epserver,  # Replace with actual Bit9 server URL
        token=eptoken,
        ssl_verify=False  # Don't validate server's SSL certificate. Set to True unless using self-signed cert on IIS
    )

    # policy to send the naughty host to
    targetPolicyName = targetPolicy
    destPolicies = bit9.search('v1/policy', ['name:'+targetPolicyName])
    if len(destPolicies)==0:
        raise ValueError("Cannot find destination policy "+targetPolicyName)

    # find the computer id
    destComputer = bit9.search('v1/computer', ['cbSensorId:'+str(sensor)])
    if len(destComputer)==0:
        # The former message used an undefined name `hostname`, so this
        # path raised NameError instead of the intended ValueError.
        raise ValueError("Cannot find computer with sensor id "+str(sensor))

    for c in destComputer:
        # Single-argument print(...) works on both Python 2 and Python 3.
        print("Moving computer %s from policy %s to policy %s" % (c['name'], c['policyName'], targetPolicyName))
        c['policyId'] = destPolicies[0]['id']
        bit9.update('v1/computer', c)
项目:CANpy    作者:stefanhoelzl    | 项目源码 | 文件源码
def _parse_attribute(self, attribute_str):
    """Parses a attribute string and updates the CANBus

    The attribute is attached to the node (``BU_``), message (``BO_``) or
    signal (``SG_``) named in the string, or to the CAN network itself
    when none of these markers is present.

    Args:
        attribute_str: String with attribute
    """
    # Raw strings: "\s", "\S" and "\d" are invalid escape sequences in
    # plain string literals.
    pattern  = r'BA_\s+"(?P<attr_name>\S+)"\s*(?P<node>BU_)?(?P<msg>BO_)?(?P<sig>SG_)?\s*'
    pattern += r'(?P<can_id>\d*)?\s*(?P<name>\S*)?\s+(?P<value>\S+)\s*;'
    # NOTE(review): a string that does not match leaves `reg` as None and
    # the first group() call below raises AttributeError.
    reg = re.search(pattern, attribute_str)

    can_object = self._can_network
    if reg.group('node'):
        can_object = self._can_network.nodes[reg.group('name')]
    elif reg.group('msg'):
        can_object = self._can_network.get_message(int(reg.group('can_id')))
    elif reg.group('sig'):
        can_object = self._can_network.get_signal(int(reg.group('can_id')), reg.group('name'))

    cad = self._can_network.attributes.definitions[reg.group('attr_name')]
    can_object.attributes.add(CANAttribute(cad, value=self._parse_attribute_value(cad, reg.group('value'))))
项目:kas    作者:siemens    | 项目源码 | 文件源码
def ssh_setup_agent(config, envkeys=None):
    """
        Starts the ssh-agent, copies the variables it prints into
        config.environ and adds the key held by each environment variable
        listed in envkeys (default: SSH_PRIVATE_KEY) to the agent.
    """
    if not envkeys:
        envkeys = ['SSH_PRIVATE_KEY']

    # ssh-agent -s prints "NAME=value;" assignments.
    assignment = re.compile(r"(\S+)\=(\S+)\;")
    for agent_line in os.popen('ssh-agent -s').readlines():
        found = assignment.search(agent_line)
        if found:
            var_name, var_value = found.groups()
            config.environ[var_name] = var_value

    for envkey in envkeys:
        key = os.environ.get(envkey)
        if not key:
            logging.warning('%s is missing', envkey)
            continue
        ssh_add_key(config.environ, key)
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def _getmember(self, name, tarinfo=None, normalize=False):
    """Return the last archive member called `name`, or None.

       Members are scanned from the end of the archive backwards.  When
       `tarinfo` is given, only members stored before it are considered.
       With `normalize`, both names are compared after os.path.normpath().
    """
    # getmembers() makes sure that all members have been loaded.
    candidates = self.getmembers()

    if tarinfo is not None:
        # Restrict the search to what precedes tarinfo.
        candidates = candidates[:candidates.index(tarinfo)]

    wanted = os.path.normpath(name) if normalize else name

    for entry in reversed(candidates):
        current = os.path.normpath(entry.name) if normalize else entry.name
        if wanted == current:
            return entry
    return None
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def find_external_links(url, page):
    """Find rel="homepage" and rel="download" links in `page`, yielding URLs"""

    # First pass: tags whose rel attribute names a homepage or download.
    for rel_match in REL.finditer(page):
        tag, rel = rel_match.groups()
        rels = set(map(str.strip, rel.lower().split(',')))
        if not ('homepage' in rels or 'download' in rels):
            continue
        for href_match in HREF.finditer(tag):
            yield urllib.parse.urljoin(url, htmldecode(href_match.group(1)))

    # Second pass: the first link following each table header marker.
    for marker in ("<th>Home Page", "<th>Download URL"):
        offset = page.find(marker)
        if offset == -1:
            continue
        href_match = HREF.search(page, offset)
        if href_match:
            yield urllib.parse.urljoin(url, htmldecode(href_match.group(1)))
项目:NeoVintageous    作者:NeoVintageous    | 项目源码 | 文件源码
def next_unbalanced_tag(view, search=None, search_args=None, restart_at=None, tags=None):
    """Return ``(region, tag)`` of the next end tag without a matching start tag.

    ``search(view, **search_args)`` must return ``(region, tag,
    is_end_tag)``; ``restart_at(region)`` must return the ``search_args``
    updates needed to continue searching after ``region``.  Start tags are
    pushed onto ``tags``; an end tag pops the stack down to its start tag.
    The first end tag that has no start tag on the stack is returned.
    ``(None, None)`` is returned when ``search`` finds nothing more.

    ``search_args`` and ``tags`` are updated in place when supplied by the
    caller.  They default to fresh objects on every call; the former
    ``{}`` / ``[]`` defaults were shared between calls and leaked state
    from one search into the next.
    """
    assert search and restart_at, 'wrong call'
    if search_args is None:
        search_args = {}
    if tags is None:
        tags = []

    # Loop form of the original tail recursion; no recursion-depth limit.
    while True:
        region, tag, is_end_tag = search(view, **search_args)

        if not region:
            return None, None

        if not is_end_tag:
            tags.append(tag)
        elif not tags or (tag not in tags):
            return region, tag
        else:
            # Drop everything opened after the matching start tag.
            while tag != tags.pop():
                continue

        search_args.update(restart_at(region))
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def _try_config_test(self, logcfg, epattern, foundTest=None ):
    """Apply a log4py configuration string and check what it printed.

    ``logcfg`` is passed to ``strConfig`` while stdout is captured.  Every
    captured line is searched for each expected pattern (``epattern`` is a
    single pattern string or a sequence of patterns); one ``True`` is
    recorded per hit.  The recorded hits must equal ``foundTest``, which
    defaults to one ``True`` per pattern.
    """
    import ossie.utils.log4py.config

    with stdout_redirect(cStringIO.StringIO()) as captured:
        ossie.utils.log4py.config.strConfig(logcfg,None)
    captured.seek(0)

    if type(epattern) == str:
        patterns = [epattern]
    else:
        patterns = epattern
    if foundTest is None:
        foundTest = len(patterns)*[True]

    hits = [
        True
        for out_line in captured.readlines()
        for pat in patterns
        if re.search(pat, out_line)
    ]

    self.assertEqual(hits, foundTest )
项目:cpe_utils    作者:ExodusIntelligence    | 项目源码 | 文件源码
def get_human(self, attr):
    """Return attribute ``attr`` of this object formatted for display.

    Underscores become spaces, known product abbreviations are expanded,
    values containing lowercase letters are title-cased, service pack
    names (SP0-SP6) are upper-cased and x86/x64 are lower-cased.
    """
    product_mapping = {
        "ie": "Internet Explorer"
    }
    service_packs = ["SP0", "SP1", "SP2", "SP3", "SP4", "SP5", "SP6"]
    architectures = ["x86", "x64"]

    text = getattr(self, attr).replace("_", " ")

    if attr == "product" and text in product_mapping:
        text = product_mapping[text]

    # if there are lowercase letters in the value, make it a title
    # (if there are none, leave it alone - e.g. SP3)
    if re.search('[a-z]', text) is not None:
        text = text.title()

    if text.upper() in service_packs:
        text = text.upper()
    if text.lower() in architectures:
        text = text.lower()

    return text
项目:PyPlanet    作者:PyPlanet    | 项目源码 | 文件源码
async def handle(self, player, action, values, **kwargs):  # pragma: no cover
    """Close the view for `player` and publish which button was pressed.

    When `action` ends in ``button_<digits>`` the digits are used as the
    result, otherwise the whole action is.  The result is stored on
    ``self.response_future`` unless that future is already done, and
    ``self.target``, when set, is awaited with the original arguments.

    The function awaits, so it has to be declared ``async``.
    """
    await self.close(player)

    # Try to parse the button id instead of the whole action string.
    button = action
    try:
        match = re.search('button_([0-9]+)$', action)
    except TypeError:
        # Non-string action: keep it unchanged, as the former bare
        # ``except`` did.
        match = None
    if match:
        button = match.group(1)

    if not self.response_future.done():
        self.response_future.set_result(button)

    if self.target:
        await self.target(player, action, values, **kwargs)
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def loopback_devices():
    '''
    Parse through 'losetup -a' output to determine currently mapped
    loopback devices. Output is expected to look like:

        /dev/loop0: [0807]:961814 (/tmp/my.img)

    Backing file paths that contain spaces are supported.

    :returns: dict: a dict mapping {loopback_dev: backing_file}
    '''
    loopbacks = {}
    cmd = ['losetup', '-a']
    output = check_output(cmd)
    if not isinstance(output, str):
        # Bytes output (Python 3) must be decoded before text parsing.
        output = output.decode('utf-8')
    for line in output.splitlines():
        line = line.strip()
        if not line:
            continue
        # Split on the first two spaces only, so a backing file whose path
        # contains spaces stays in one piece.
        dev, _, f = line.split(' ', 2)
        loopbacks[dev.replace(':', '')] = re.search(r'\((.+)\)', f).groups()[0]
    return loopbacks
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def __call__(self):
    """Build the context dict: CPU architecture, CPU vendor and desktop flag.

    The vendor is taken from the ``vendor_id`` lines of /proc/cpuinfo and
    mapped to "intel" / "amd" for the two known values.  It is None when
    /proc/cpuinfo has no ``vendor_id`` line; that case used to fail with
    an unbound local variable.
    """
    settings = utils.get_settings('os')

    vendor = None
    with open('/proc/cpuinfo', 'r') as fd:
        for line in fd:
            match = re.search(r"^vendor_id\s+:\s+(.+)", line)
            if match:
                vendor = match.group(1)

    if vendor == "GenuineIntel":
        vendor = "intel"
    elif vendor == "AuthenticAMD":
        vendor = "amd"

    ctxt = {'arch': platform.processor(),
            'cpuVendor': vendor,
            'desktop_enable': settings['general']['desktop_enable']}

    return ctxt
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def loopback_devices():
    '''
    Parse through 'losetup -a' output to determine currently mapped
    loopback devices. Output is expected to look like:

        /dev/loop0: [0807]:961814 (/tmp/my.img)

    Backing file paths that contain spaces are supported.

    :returns: dict: a dict mapping {loopback_dev: backing_file}
    '''
    loopbacks = {}
    cmd = ['losetup', '-a']
    output = check_output(cmd)
    if not isinstance(output, str):
        # Bytes output (Python 3) must be decoded before text parsing.
        output = output.decode('utf-8')
    for line in output.splitlines():
        line = line.strip()
        if not line:
            continue
        # Split on the first two spaces only, so a backing file whose path
        # contains spaces stays in one piece.
        dev, _, f = line.split(' ', 2)
        loopbacks[dev.replace(':', '')] = re.search(r'\((.+)\)', f).groups()[0]
    return loopbacks
项目:django_pipedrive    作者:MasAval    | 项目源码 | 文件源码
def sync_one(cls, external_id, last_error=None):
    """Fetch one entity from the Pipedrive API and store it locally.

    When storing fails with an ``IntegrityError`` whose message has the
    form ``Key (<field>)=(<id>)``, the referenced entity is synced first
    through the model returned by ``cls.field_model_map(<field>)`` and
    this entity is retried.

    Args:
        external_id: id of the entity on the API side.
        last_error: message of the error that triggered this retry, used
            to stop when the same error happens twice in a row.

    Raises:
        UnableToSyncException: the API reported a failure.
        SameErrorTwiceSyncException: the retry hit the same error again.
        Exception: the integrity error message could not be interpreted.
    """
    post_data = cls.pipedrive_api_client.get_instance(external_id)

    # Error code from the API
    if not post_data[u'success']:
        logging.error(post_data)
        raise UnableToSyncException(cls, external_id)
    try:
        return cls.update_or_create_entity_from_api_post(post_data[u'data'])
    except IntegrityError as e:
        logging.warning(e)
        # Exceptions have no ``message`` attribute on Python 3; fall back
        # to the string form of the exception.
        message = getattr(e, 'message', None) or str(e)
        if message == last_error:
            raise SameErrorTwiceSyncException(cls, external_id, message)
        match = re.search(r'.*Key \((.*)\)=\((.*)\).*', message)
        if match:
            field_name = match.group(1)
            field_id = match.group(2)
            model = cls.field_model_map(field_name)
            model.sync_one(field_id)
            return cls.sync_one(external_id, message)
        else:
            raise Exception("Could not handle error message")
项目:OpenCouture-Dev    作者:9-9-0    | 项目源码 | 文件源码
# Fetch the product page, pick the size option matching self.user_size and
# post it to the cart URL (Python 2 code: print statements).
def addToCart(self):
        print '\nADD TO CART -----------------'
        session_get = self.user_session.get(self.URL_product_url, headers=self.get_headers)
        #print session_get.content
        soup = BeautifulSoup(session_get.content, 'lxml')

        results = soup.find_all('select', class_='size-select')
        #print results

        # NOTE(review): results[0] raises IndexError when the page has no
        # 'size-select' element.
        for item in results[0].select('option'):
            # Strip whatever self.sub_pattern matches from the option text.
            re_result = re.sub(self.sub_pattern, '', item.string)
            #print re_result
            # NOTE(review): user_size is interpolated unescaped and the '+'
            # repeats its last character, so size "10" would also match
            # "100" -- confirm this is intended.
            matchObj = re.search(r"^%s+$" % self.user_size, re_result)
            if matchObj:
                self.post_data_addToCart['pid'] = item['value']
                # masterPID is the part of the option value before the first "_".
                self.post_data_addToCart['masterPID'] = item['value'].partition("_")[0]
                print self.post_data_addToCart
                break

        # The POST is sent even when no option matched the requested size.
        session_post = self.user_session.post(url=self.URL_cart_post_url, headers=self.post_headers, data=self.post_data_addToCart)
        print 'Add To Cart Status: ' + str(session_post.status_code)
项目:IgDiscover    作者:NBISweden    | 项目源码 | 文件源码
def git_get_keywords(versionfile_abs):
    """Extract version information from the given file.

    Returns a dict that may hold "refnames" and "full", taken from the
    quoted values of the ``git_refnames =`` and ``git_full =`` lines.  An
    unreadable file yields an empty dict.
    """
    # the code embedded in _version.py can just fetch the value of these
    # keywords. When used from setup.py, we don't want to import _version.py,
    # so we do it with a regexp instead. This function is not used from
    # _version.py.
    keywords = {}
    wanted = (("git_refnames =", "refnames"), ("git_full =", "full"))
    try:
        # The context manager closes the file even if reading fails midway.
        with open(versionfile_abs, "r") as fobj:
            for line in fobj:
                stripped = line.strip()
                for prefix, key in wanted:
                    if stripped.startswith(prefix):
                        mo = re.search(r'=\s*"(.*)"', line)
                        if mo:
                            keywords[key] = mo.group(1)
    except EnvironmentError:
        pass
    return keywords
项目:IgDiscover    作者:NBISweden    | 项目源码 | 文件源码
def git_get_keywords(versionfile_abs):
    """Extract version information from the given file.

    Returns a dict that may hold "refnames" and "full", taken from the
    quoted values of the ``git_refnames =`` and ``git_full =`` lines.  An
    unreadable file yields an empty dict.
    """
    # the code embedded in _version.py can just fetch the value of these
    # keywords. When used from setup.py, we don't want to import _version.py,
    # so we do it with a regexp instead. This function is not used from
    # _version.py.
    keywords = {}
    wanted = (("git_refnames =", "refnames"), ("git_full =", "full"))
    try:
        # The context manager closes the file even if reading fails midway.
        with open(versionfile_abs, "r") as fobj:
            for line in fobj:
                stripped = line.strip()
                for prefix, key in wanted:
                    if stripped.startswith(prefix):
                        mo = re.search(r'=\s*"(.*)"', line)
                        if mo:
                            keywords[key] = mo.group(1)
    except EnvironmentError:
        pass
    return keywords
项目:flask-basic-roles    作者:ownaginatious    | 项目源码 | 文件源码
def git_get_keywords(versionfile_abs):
    """Extract version information from the given file.

    Returns a dict that may hold "refnames" and "full", taken from the
    quoted values of the ``git_refnames =`` and ``git_full =`` lines.  An
    unreadable file yields an empty dict.
    """
    # the code embedded in _version.py can just fetch the value of these
    # keywords. When used from setup.py, we don't want to import _version.py,
    # so we do it with a regexp instead. This function is not used from
    # _version.py.
    keywords = {}
    wanted = (("git_refnames =", "refnames"), ("git_full =", "full"))
    try:
        # The context manager closes the file even if reading fails midway.
        with open(versionfile_abs, "r") as fobj:
            for line in fobj:
                stripped = line.strip()
                for prefix, key in wanted:
                    if stripped.startswith(prefix):
                        mo = re.search(r'=\s*"(.*)"', line)
                        if mo:
                            keywords[key] = mo.group(1)
    except EnvironmentError:
        pass
    return keywords
项目:flask-basic-roles    作者:ownaginatious    | 项目源码 | 文件源码
def git_get_keywords(versionfile_abs):
    """Extract version information from the given file.

    Returns a dict that may hold "refnames" and "full", taken from the
    quoted values of the ``git_refnames =`` and ``git_full =`` lines.  An
    unreadable file yields an empty dict.
    """
    # the code embedded in _version.py can just fetch the value of these
    # keywords. When used from setup.py, we don't want to import _version.py,
    # so we do it with a regexp instead. This function is not used from
    # _version.py.
    keywords = {}
    wanted = (("git_refnames =", "refnames"), ("git_full =", "full"))
    try:
        # The context manager closes the file even if reading fails midway.
        with open(versionfile_abs, "r") as fobj:
            for line in fobj:
                stripped = line.strip()
                for prefix, key in wanted:
                    if stripped.startswith(prefix):
                        mo = re.search(r'=\s*"(.*)"', line)
                        if mo:
                            keywords[key] = mo.group(1)
    except EnvironmentError:
        pass
    return keywords
项目:mysql-er    作者:StefanLim0    | 项目源码 | 文件源码
def get_sqls(self):
    """Yield the SELECT statements found in the mysql general log file.

    The log at ``self.log_path`` is read entry by entry.  ``Connect`` and
    ``Init DB`` entries record the current database of each session, and
    every ``Query`` entry whose statement starts with ``select`` is
    yielded together with the database of its session ('' if unknown).

    This is a generator; the log file is closed once it is exhausted or
    closed.

    Yields:
       :class:`SQL` objects. For example:
       SQL('', u'select a.id, b.name from db.ac a join db.bc b on a.id=b.id or a.id=b.iid where a.cnt > 10')

    """
    session_db_map = {}
    with open(self.log_path) as general_log:
        for entry in GeneralQueryLog(general_log):
            command = entry['command']
            if command == 'Connect':
                m = re.search(r'\s+on\s(?P<name>\w+)', entry['argument'])
                if m:
                    session_db_map[entry['session_id']] = m.group('name').strip()
            elif command == 'Init DB':
                session_db_map[entry['session_id']] = entry['argument'].strip()
            elif command == 'Query':
                sql = entry['argument']
                if sql.strip()[:6].lower() == 'select':
                    yield SQL(session_db_map.get(entry['session_id'], ''), sql)
项目:zipline-chinese    作者:zhanghan1990    | 项目源码 | 文件源码
def git_get_keywords(versionfile_abs):
    """Extract version information from the given file.

    Returns a dict that may hold "refnames" and "full", taken from the
    quoted values of the ``git_refnames =`` and ``git_full =`` lines.  An
    unreadable file yields an empty dict.
    """
    # the code embedded in _version.py can just fetch the value of these
    # keywords. When used from setup.py, we don't want to import _version.py,
    # so we do it with a regexp instead. This function is not used from
    # _version.py.
    keywords = {}
    wanted = (("git_refnames =", "refnames"), ("git_full =", "full"))
    try:
        # The context manager closes the file even if reading fails midway.
        with open(versionfile_abs, "r") as fobj:
            for line in fobj:
                stripped = line.strip()
                for prefix, key in wanted:
                    if stripped.startswith(prefix):
                        mo = re.search(r'=\s*"(.*)"', line)
                        if mo:
                            keywords[key] = mo.group(1)
    except EnvironmentError:
        pass
    return keywords
项目:zipline-chinese    作者:zhanghan1990    | 项目源码 | 文件源码
def git_get_keywords(versionfile_abs):
    """Extract the git keyword values from a ``_version.py`` file by regexp.

    The code embedded in _version.py can just fetch the value of these
    keywords. When used from setup.py, we don't want to import _version.py,
    so we do it with a regexp instead. This function is not used from
    _version.py.

    :param versionfile_abs: path of the version file to scan.
    :returns: dict that may contain the keys ``"refnames"`` and ``"full"``;
        it is empty when the file cannot be read or holds neither keyword.
    """
    keywords = {}
    try:
        # The context manager closes the file even if reading fails part-way.
        with open(versionfile_abs, "r") as f:
            for line in f:
                stripped = line.strip()
                if stripped.startswith("git_refnames ="):
                    mo = re.search(r'=\s*"(.*)"', line)
                    if mo:
                        keywords["refnames"] = mo.group(1)
                if stripped.startswith("git_full ="):
                    mo = re.search(r'=\s*"(.*)"', line)
                    if mo:
                        keywords["full"] = mo.group(1)
    except EnvironmentError:
        # Best effort: an unreadable file simply yields what was collected.
        pass
    return keywords
项目:4scanner    作者:pboardman    | 项目源码 | 文件源码
def scan_thread(keyword, catalog_json):
    """Return the numbers of the threads that mention *keyword*.

    Each element of *catalog_json* is expected to carry a ``"threads"`` list.
    A thread matches when the keyword appears as a whole word
    (case-insensitively) in its title (``"sub"``) or in the opening post body
    (``"com"``).  Every matching thread is reported once, in catalog order,
    even when both fields match.

    :param keyword: text to look for; it is inserted into the pattern as-is,
        so regular-expression syntax in it is honoured.
    :param catalog_json: iterable of catalog pages.
    :returns: list of the ``"no"`` values of the matching threads.
    """
    # Built once: the pattern does not depend on the thread being inspected.
    pattern = re.compile(r'\b{0}\b'.format(keyword), re.IGNORECASE)

    matched_threads = []
    for page in catalog_json:
        for thread in page["threads"]:
            # Search the thread title first, then the OP's post body.
            for field in ('sub', 'com'):
                if field in thread and pattern.search(str(thread[field])):
                    matched_threads.append(thread["no"])
                    # One hit is enough; avoids listing the thread twice.
                    break

    return matched_threads
项目:ln2sql    作者:FerreroJeremy    | 项目源码 | 文件源码
def alter_table(self, alter_string):
        """Apply the key definitions found in ALTER TABLE statements.

        *alter_string* is flattened to a single line and split on ``;``.
        For every statement that mentions ``PRIMARY KEY`` or ``FOREIGN KEY``
        the backquoted table name is looked up through
        ``self.get_table_by_name`` and the declared keys are registered on it.
        Statements that mention a key but carry no backquoted ``TABLE`` name
        are skipped instead of raising.

        :param alter_string: text holding one or more ``;``-separated
            statements.
        """
        statements = alter_string.replace('\n', ' ').split(';')
        for statement in statements:
            is_primary = 'PRIMARY KEY' in statement
            if not is_primary and 'FOREIGN KEY' not in statement:
                continue

            table_match = re.search(r"TABLE `(\w+)`", statement)
            if table_match is None:
                # Nothing to attach the key to (e.g. a stray comment).
                continue
            table = self.get_table_by_name(table_match.group(1))

            # PRIMARY KEY takes precedence when a statement mentions both.
            if is_primary:
                for primary_key_column in re.findall(r"PRIMARY KEY \(`(\w+)`\)", statement):
                    table.add_primary_key(primary_key_column)
            else:
                foreign_keys_list = re.findall(
                    r"FOREIGN KEY \(`(\w+)`\) REFERENCES `(\w+)` \(`(\w+)`\)", statement)
                for column, foreign_table, foreign_column in foreign_keys_list:
                    table.add_foreign_key(column, foreign_table, foreign_column)
项目:pycos    作者:pgiri    | 项目源码 | 文件源码
def allocate(self, ip_addr, name, platform, cpus, memory, disk):
        """Decide how many CPUs of a discovered node this allocation takes.

        The scheduler calls this with the node's IP address, name, platform
        and the CPUs, memory and disk available on it.

        Returns a number of CPUs to use. 0 means the node is not used; a
        negative value means this allocation does not apply to the node, so
        the next entry in the 'node_allocations' list (if any) is tried.
        """
        # The allocation is not meant for this address or platform.
        if re.match(self.ip_rex, ip_addr) is None:
            return -1
        if self.platform and re.search(self.platform, platform) is None:
            return -1

        # The node offers less memory or disk than this allocation requires.
        if self.memory and memory and self.memory > memory:
            return 0
        if self.disk and disk and self.disk > disk:
            return 0

        wanted = self.cpus
        if wanted > 0:
            # A fixed CPU count is requested; the node must have that many.
            return 0 if wanted > cpus else wanted
        if wanted == 0:
            return 0

        # A negative count means "leave that many CPUs unused".
        remaining = cpus + wanted
        return 0 if remaining < 0 else remaining
项目:pycos    作者:pgiri    | 项目源码 | 文件源码
def allocate(self, ip_addr, name, platform, cpus, memory, disk):
        """Decide how many CPUs of a discovered node this allocation takes.

        The scheduler calls this with the node's IP address, name, platform
        and the CPUs, memory and disk available on it.

        Returns a number of CPUs to use. 0 means the node is not used; a
        negative value means this allocation does not apply to the node, so
        the next entry in the 'node_allocations' list (if any) is tried.
        """
        # The allocation is not meant for this address or platform.
        if re.match(self.ip_rex, ip_addr) is None:
            return -1
        if self.platform and re.search(self.platform, platform) is None:
            return -1

        # The node offers less memory or disk than this allocation requires.
        if self.memory and memory and self.memory > memory:
            return 0
        if self.disk and disk and self.disk > disk:
            return 0

        wanted = self.cpus
        if wanted > 0:
            # A fixed CPU count is requested; the node must have that many.
            return 0 if wanted > cpus else wanted
        if wanted == 0:
            return 0

        # A negative count means "leave that many CPUs unused".
        remaining = cpus + wanted
        return 0 if remaining < 0 else remaining
项目:django-openapi-gen    作者:Ecognize    | 项目源码 | 文件源码
def Resolver(obj, path, full = False):
    """Resolve a ``#/<section>/<name>`` style reference.

    With ``full`` set, the referenced entry is looked up as
    ``obj[<section>][<name>]``; otherwise only ``<name>`` is returned and
    ``obj`` is not consulted. The split happens at the last ``/`` of *path*.
    """
    match = re.search('#/(.*)/(.*)', path)
    section, leaf = match.group(1), match.group(2)
    return obj[section][leaf] if full else leaf
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def list_nics(nic_type=None):
    """Return a list of nics of given type(s).

    :param nic_type: a single interface name prefix (string), an iterable of
        prefixes, or None/empty to list every interface reported by ``ip a``.
    :returns: list of interface names, without duplicates, in the order they
        were first seen in the ``ip`` output.
    """
    if isinstance(nic_type, six.string_types):
        int_types = [nic_type]
    else:
        int_types = nic_type

    interfaces = []
    if nic_type:
        for int_type in int_types:
            cmd = ['ip', 'addr', 'show', 'label', int_type + '*']
            ip_output = subprocess.check_output(cmd).decode('UTF-8')
            # Matches names of the form "<type><digits>.<digits>" followed by
            # "@" (e.g. "eth0.100@eth0"). The prefix is escaped so that it is
            # always taken literally, like the startswith() test below.
            dotted_key = re.compile(
                '.*: (' + re.escape(int_type) + r'[0-9]+\.[0-9]+)@.*')
            for line in ip_output.split('\n'):
                fields = line.split()
                # Blank or single-token lines cannot name an interface.
                if len(fields) < 2:
                    continue
                if fields[1].startswith(int_type):
                    matched = dotted_key.search(line)
                    if matched:
                        iface = matched.groups()[0]
                    else:
                        iface = fields[1].replace(":", "")

                    if iface not in interfaces:
                        interfaces.append(iface)
    else:
        cmd = ['ip', 'a']
        ip_output = subprocess.check_output(cmd).decode('UTF-8').split('\n')
        ip_output = (line.strip() for line in ip_output if line)

        # Header lines look like "<index>: <name>: ...".
        key = re.compile(r'^[0-9]+:\s+(.+):')
        for line in ip_output:
            matched = key.search(line)
            if matched:
                iface = matched.group(1)
                # Drop any "@<suffix>" part of the reported name.
                iface = iface.partition("@")[0]
                if iface not in interfaces:
                    interfaces.append(iface)

    return interfaces
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def is_device_mounted(device):
    '''Given a device path, return True if that device is mounted, and False
    if it isn't.

    The check runs ``lsblk -P <device>`` and looks for a non-empty
    ``MOUNTPOINT="..."`` field in its output. Any failure to run ``lsblk``
    (missing binary, unknown device, undecodable output) is reported as
    "not mounted".

    :param device: str: Full path of the device to check.
    :returns: boolean: True if the path represents a mounted device, False if
        it doesn't.
    '''
    try:
        out = check_output(['lsblk', '-P', device]).decode('UTF-8')
    except Exception:
        # Deliberately broad, but unlike a bare ``except:`` it lets
        # KeyboardInterrupt and SystemExit propagate.
        return False
    return bool(re.search(r'MOUNTPOINT=".+"', out))
项目:linux-soft-exploit-suggester    作者:belane    | 项目源码 | 文件源码
def parseDebian(packages_file):
    """ Build a dict (name:version) from the lines of a debian package list """
    # Kept as an explicit comparison so the mode is decided exactly as before.
    clean_mode = args.clean == True
    # Without the clean option the first column is skipped (lines start 'ii').
    name_idx = 0 if clean_mode else 1
    version_idx = name_idx + 1

    packages = {}
    for line in packages_file:
        if not clean_mode and line[:2] != 'ii':
            continue
        columns = line.split()
        if len(columns) <= version_idx:
            continue

        # Software name: keep only what precedes the first ':'
        soft_name = cleanName(columns[name_idx].partition(':')[0])

        # Version: cut at the first '-', '+' or '~' ...
        raw_version = columns[version_idx]
        cut = re.search(r"-|\+|~", raw_version)
        soft_version = raw_version[:cut.start()] if cut else raw_version
        # ... then drop everything up to and including the first ':'
        _, colon, after_colon = soft_version.partition(':')
        if colon:
            soft_version = after_colon
        soft_version = purgeVersionString(soft_version)

        # Format check
        if not (soft_name and soft_version):
            continue

        # Intense package name split
        if args.intense and '-' in soft_name:
            for part in soft_name.split('-'):
                if len(part) > 2 and '.' not in part and part not in badpackages:
                    packages[part] = soft_version
        elif soft_name not in badpackages:
            packages[soft_name] = soft_version
    return packages
项目:linux-soft-exploit-suggester    作者:belane    | 项目源码 | 文件源码
def purgeVersionString(version_string):
    """ Keep the leading run of digits and dots, minus one trailing dot """
    # Everything from the first character that is not a digit or dot is cut.
    leading = re.match(r'[0-9.]*', version_string).group(0)
    if leading.endswith('.'):
        leading = leading[:-1]
    return leading
项目:alfred-mpd    作者:deanishe    | 项目源码 | 文件源码
def _search_for_query(self, query):
        if query in self._search_pattern_cache:
            return self._search_pattern_cache[query]

        # Build pattern: include all characters
        pattern = []
        for c in query:
            # pattern.append('[^{0}]*{0}'.format(re.escape(c)))
            pattern.append('.*?{0}'.format(re.escape(c)))
        pattern = ''.join(pattern)
        search = re.compile(pattern, re.IGNORECASE).search

        self._search_pattern_cache[query] = search
        return search
项目:alfred-mpd    作者:deanishe    | 项目源码 | 文件源码
def get_password(self, account, service=None):
        """Retrieve the password saved at ``service/account``.

        Raise :class:`PasswordNotFound` exception if password doesn't exist
        (NOTE(review): not raised directly here -- presumably comes from
        ``_call_security``; confirm).

        :param account: name of the account the password is for, e.g.
            "Pinboard"
        :type account: ``unicode``
        :param service: Name of the service. By default, this is the workflow's
                        bundle ID
        :type service: ``unicode``
        :returns: account password, or ``None`` when the ``security`` output
            contains no ``password:`` field or the field is empty
        :rtype: ``unicode``

        """
        if not service:
            service = self.bundleid

        output = self._call_security('find-generic-password', service,
                                     account, '-g')

        # Parsing of `security` output is adapted from python-keyring
        # by Jason R. Coombs
        # https://pypi.python.org/pypi/keyring
        m = re.search(
            r'password:\s*(?:0x(?P<hex>[0-9A-F]+)\s*)?(?:"(?P<pw>.*)")?',
            output)

        # Bound up front so an unparsable output cannot trigger an
        # UnboundLocalError at the return below.
        password = None
        if m:
            groups = m.groupdict()
            h = groups.get('hex')
            password = groups.get('pw')
            if h:
                # The hex form takes precedence over the quoted form.
                password = unicode(binascii.unhexlify(h), 'utf-8')

        self.logger.debug('Got password : %s:%s', service, account)

        return password