The following 50 code examples, extracted from Python open-source projects, illustrate how to use re.search().
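For orientation before the project examples: re.search() scans a string for the first position where the pattern matches and returns a match object, or None when nothing matches, so the result should always be None-checked before calling .group(). A minimal standalone sketch (the input string is invented for illustration):

    import re

    # Find the first backtick-quoted identifier, an idiom several examples below use.
    m = re.search(r"`(\w+)`", "CREATE TABLE `users` (")
    if m is not None:          # re.search returns None when there is no match
        print(m.group(1))      # -> users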
def create_table(self, table_string):
    lines = table_string.split("\n")
    table = Table()
    for line in lines:
        if 'TABLE' in line:
            table_name = re.search(r"`(\w+)`", line)
            table.name = table_name.group(1)
            if self.thesaurus_object is not None:
                table.equivalences = self.thesaurus_object.get_synonyms_of_a_word(table.name)
        elif 'PRIMARY KEY' in line:
            primary_key_columns = re.findall(r"`(\w+)`", line)
            for primary_key_column in primary_key_columns:
                table.add_primary_key(primary_key_column)
        else:
            column_name = re.search(r"`(\w+)`", line)
            if column_name is not None:
                column_type = self.predict_type(line)
                if self.thesaurus_object is not None:
                    equivalences = self.thesaurus_object.get_synonyms_of_a_word(column_name.group(1))
                else:
                    equivalences = []
                table.add_column(column_name.group(1), column_type, equivalences)
    return table
def run(self):
    Analyzer.run(self)
    if self.data_type == 'domain' or self.data_type == 'url':
        try:
            pattern = re.compile(r"(?:Category: )([\w\s]+)")
            baseurl = 'http://www.fortiguard.com/webfilter?q='
            url = baseurl + self.getData()
            req = requests.get(url)
            category_match = re.search(pattern, req.content, flags=0)
            self.report({
                'category': category_match.group(1)
            })
        except ValueError as e:
            self.unexpectedError(e)
    else:
        self.notSupported()
def loopback_devices():
    '''
    Parse through 'losetup -a' output to determine currently mapped
    loopback devices. Output is expected to look like:

        /dev/loop0: [0807]:961814 (/tmp/my.img)

    :returns: dict: a dict mapping {loopback_dev: backing_file}
    '''
    loopbacks = {}
    cmd = ['losetup', '-a']
    devs = [d.strip().split(' ') for d in
            check_output(cmd).splitlines() if d != '']
    for dev, _, f in devs:
        loopbacks[dev.replace(':', '')] = re.search(r'\((\S+)\)', f).groups()[0]
    return loopbacks
def searchExploit(exploit_list, soft_name, soft_version):
    """ Search affected packages in exploit_list """
    result = []
    version_search = versionVartions(soft_version, args.level)
    for exploit in exploit_list:
        if exploit[5] in valid_platforms and (args.dos or exploit[6] != 'dos' or args.type == 'dos'):  # Platform and DoS
            if args.filter == None or args.filter.lower() in exploit[2].lower():  # Filter
                if args.type == None or args.type == exploit[6]:  # Type
                    query = r"(^(\w*\s){0,%s}|/\s?)%s(\s|\s.*\s|\/).* -" % (args.level, soft_name.replace('+', r'\+'))
                    if re.search(query, exploit[2], re.IGNORECASE):
                        affected_versions = extractVersions(exploit[2])
                        for affected_version in affected_versions:
                            if args.level == 5 or LooseVersion(version_search) <= LooseVersion(affected_version):
                                if args.duplicates == False:
                                    exploit_list.remove(exploit)  # Duplicates
                                printOutput(exploit, soft_name, soft_version)
                                result.append([exploit, soft_name, soft_version])
                                break
    return result
def extractVersions(title_string):
    """ Extract all version numbers from a string """
    search = re.search(r'\s-|\(|\&', title_string)
    if search:
        title_string = title_string[:search.span()[0]]
    result = []
    for possible_version in title_string.split():
        if possible_version[0].isdigit():
            if '/' in possible_version:
                for multiversion in possible_version.split('/'):
                    if '-' in multiversion:
                        multiversion = '.'.join(multiversion.split('-')[0].split('.')[:-1]) + '.' + multiversion.split('-')[-1]
                    if purgeVersionString(multiversion):
                        result.append(purgeVersionString(multiversion))
            elif '-' in possible_version:
                result.append(purgeVersionString('.'.join(possible_version.split('-')[0].split('.')[:-1]) + '.' + possible_version.split('-')[-1]))
            else:
                result.append(purgeVersionString(possible_version))
    return result
def test_oozie_workflow(self):
    # given
    input_data_dir = 'test/availability/input-data'
    apps_streaming_dir = 'test/availability/apps/streaming'
    cmd('hdfs dfs -rm -r %s' % input_data_dir)
    cmd('hdfs dfs -mkdir -p %s' % input_data_dir)
    cmd('hdfs dfs -put %s %s' % (sample_data, input_data_dir))
    cmd('hdfs dfs -rm -r %s' % apps_streaming_dir)
    cmd('hdfs dfs -mkdir -p %s' % apps_streaming_dir)
    cmd('hdfs dfs -put resources/oozie/workflow.xml %s' % apps_streaming_dir)
    # when
    result = cmd('oozie job -oozie %s -config resources/oozie/job.properties -run' % oozie_host)
    job_id = result.stdout.replace('job: ', '')
    cmd('oozie job -oozie %s -poll %s -interval 1' % (oozie_host, job_id))
    result = cmd('oozie job -oozie %s -info %s' % (oozie_host, job_id))
    # then
    status = re.search(r'Status\s+:\s+(.+)', result.stdout).group(1)
    self.assertEqual('SUCCEEDED', status, result.stderr)
def commit_history(cli):
    """ Parse output of "show configuration history commit reverse detail" """
    result = []
    record = OrderedDict()
    for line in cli.splitlines():
        r = re.search(' ([A-Z][a-z]+(?: ID)?): (.*?) +([A-Z][a-z]+): (.*)', line)
        if not r:
            continue
        record[r.group(1)] = r.group(2)
        record[r.group(3)] = r.group(4)
        if r.group(3) == 'Comment':
            result.append(record)
            record = OrderedDict()
    return result
def _getmember(self, name, tarinfo=None, normalize=False):
    """Find an archive member by name from bottom to top.
       If tarinfo is given, it is used as the starting point.
    """
    # Ensure that all members have been loaded.
    members = self.getmembers()

    # Limit the member search list up to tarinfo.
    if tarinfo is not None:
        members = members[:members.index(tarinfo)]

    if normalize:
        name = os.path.normpath(name)

    for member in reversed(members):
        if normalize:
            member_name = os.path.normpath(member.name)
        else:
            member_name = member.name

        if name == member_name:
            return member
def find_external_links(url, page):
    """Find rel="homepage" and rel="download" links in `page`, yielding URLs"""
    for match in REL.finditer(page):
        tag, rel = match.groups()
        rels = set(map(str.strip, rel.lower().split(',')))
        if 'homepage' in rels or 'download' in rels:
            for match in HREF.finditer(tag):
                yield urllib.parse.urljoin(url, htmldecode(match.group(1)))

    for tag in ("<th>Home Page", "<th>Download URL"):
        pos = page.find(tag)
        if pos != -1:
            match = HREF.search(page, pos)
            if match:
                yield urllib.parse.urljoin(url, htmldecode(match.group(1)))
def _find_link_target(self, tarinfo):
    """Find the target member of a symlink or hardlink member in the
       archive.
    """
    if tarinfo.issym():
        # Always search the entire archive.
        linkname = "/".join(filter(None, (os.path.dirname(tarinfo.name), tarinfo.linkname)))
        limit = None
    else:
        # Search the archive before the link, because a hard link is
        # just a reference to an already archived file.
        linkname = tarinfo.linkname
        limit = tarinfo

    member = self._getmember(linkname, tarinfo=limit, normalize=True)
    if member is None:
        raise KeyError("linkname %r not found" % linkname)
    return member
def b16decode(s, casefold=False):
    """Decode the Base16 encoded bytes-like object or ASCII string s.

    Optional casefold is a flag specifying whether a lowercase alphabet is
    acceptable as input.  For security purposes, the default is False.

    The result is returned as a bytes object.  A binascii.Error is raised if
    s is incorrectly padded or if there are non-alphabet characters present
    in the input.
    """
    s = _bytes_from_decode_data(s)
    if casefold:
        s = s.upper()
    if re.search(b'[^0-9A-F]', s):
        raise binascii.Error('Non-base16 digit found')
    return binascii.unhexlify(s)


#
# Ascii85 encoding/decoding
#
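A short usage note on b16decode above: after optional casefolding, any byte outside 0-9A-F trips the re.search guard and raises binascii.Error. A minimal demonstration using only the standard library:

    import base64
    import binascii

    print(base64.b16decode(b"6f6b", casefold=True))   # lowercase accepted -> b'ok'
    try:
        base64.b16decode(b"6f6b")                     # lowercase rejected by default
    except binascii.Error as e:
        print(e)                                      # Non-base16 digit found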
def compiler_is_clang(comp):
    print("check for clang compiler ...", end=' ')
    try:
        cc_output = subprocess.check_output(comp + ['--version'],
                                            stderr=subprocess.STDOUT, shell=False)
    except OSError as ex:
        print("compiler test call failed with error {0:d} msg: {1}".format(ex.errno, ex.strerror))
        print("no")
        return False
    ret = re.search(b'clang', cc_output) is not None
    if ret:
        print("yes")
    else:
        print("no")
    return ret
def validate_label(label):
    # For now we can only handle [a-z ']
    if "(" in label or \
       "<" in label or \
       "[" in label or \
       "]" in label or \
       "&" in label or \
       "*" in label or \
       "{" in label or \
       re.search(r"[0-9]", label) != None:
        return None
    label = label.replace("-", "")
    label = label.replace("_", "")
    label = label.replace(".", "")
    label = label.replace(",", "")
    label = label.replace("?", "")
    label = label.strip()
    return label.lower()
def string_find_id(string, regex):
    """Find a marker's ID using a regular expression.
    Returns the ID as int if found, otherwise returns None.

    Args:
    - string, any string
    - regex, the regex pattern to match."""
    if not (string and regex):
        raise AttributeError("Missing name or regex attribute")

    import re
    index = re.search(regex, string)
    if index:
        found_id = index.group(1)
        return int(found_id) if found_id else None
    return None
def main(cb, args):
    powershells = cb.process_search_iter('process_name:powershell.exe')
    for s in powershells:
        if s['cmdline']:
            encoded = re.search(r'\-[eE][nN][cC][oOdDeEcCmMaAnN]*\s([A-Za-z0-9\+/=]+)', s['cmdline'])
            if encoded != None:
                i = encoded.group(1)
                if not re.search(r'[a-zA-Z0-9\+/]+={1,2}$', i):
                    trailingBytes = len(i) % 4
                    if trailingBytes == 3:
                        i = i + '='
                    elif trailingBytes == 2:
                        i = i + '=='
                decodedCommand = base64.standard_b64decode(i)
                try:
                    a = decodedCommand.encode('ascii', 'replace')
                    print "Powershell Decoded Command\n%s/#analyze/%s/1\n%s\n\n" % (
                        args['server_url'], s['id'], a.replace('\0', ""))
                except UnicodeError:
                    print "Powershell Decoded Command\n%s/#analyze/%s/1\nNon-ASCII decoding, encoded form printed to assist more research\n%s\n" % (
                        args['server_url'], s['id'], s['cmdline'])
                    pass
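The padding logic above relies on the fact that a Base64 string's length is a multiple of 4: a remainder of 3 means one '=' was stripped, a remainder of 2 means two were. A minimal sketch of the same rule (pad_b64 is a hypothetical helper, not part of the example above):

    import base64

    def pad_b64(s):
        # Restore the '=' padding stripped from a Base64 string (hypothetical helper).
        return s + "=" * (-len(s) % 4)

    print(base64.standard_b64decode(pad_b64("aGVsbG8")))  # -> b'hello'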
def check_triggers(message):
    global action_triggers
    return_value = False
    target_policy = ""
    for policy in action_triggers:
        # Need to make a copy here cause globals :(
        rules = list(action_triggers[policy]['rules'])
        while rules:
            criteria = rules.pop(0)
            expression = rules.pop(0)
            if re.search(expression, message[criteria].lower()):
                return_value = True
            else:
                return_value = False
                rules = []
        if return_value:
            target_policy = action_triggers[policy]['targetpolicy']
            break
    return return_value, target_policy
def move_policy(sensor, targetPolicy):
    global eptoken
    global epserver
    bit9 = bit9api.bit9Api(
        "https://" + epserver,  # Replace with actual Bit9 server URL
        token=eptoken,
        ssl_verify=False  # Don't validate server's SSL certificate. Set to True unless using self-signed cert on IIS
    )

    # policy to send the naughty host to
    targetPolicyName = targetPolicy
    destPolicies = bit9.search('v1/policy', ['name:' + targetPolicyName])
    if len(destPolicies) == 0:
        raise ValueError("Cannot find destination policy " + targetPolicyName)

    # find the computer id
    destComputer = bit9.search('v1/computer', ['cbSensorId:' + str(sensor)])
    if len(destComputer) == 0:
        raise ValueError("Cannot find computer with sensor id " + str(sensor))

    for c in destComputer:
        print "Moving computer %s from policy %s to policy %s" % (c['name'], c['policyName'], targetPolicyName)
        c['policyId'] = destPolicies[0]['id']
        bit9.update('v1/computer', c)
def _parse_attribute(self, attribute_str):
    """Parses an attribute string and updates the CANBus

    Args:
        attribute_str: String with attribute
    """
    pattern = r'BA_\s+"(?P<attr_name>\S+)"\s*(?P<node>BU_)?(?P<msg>BO_)?(?P<sig>SG_)?\s*'
    pattern += r'(?P<can_id>\d*)?\s*(?P<name>\S*)?\s+(?P<value>\S+)\s*;'
    reg = re.search(pattern, attribute_str)
    can_object = self._can_network
    if reg.group('node'):
        can_object = self._can_network.nodes[reg.group('name')]
    elif reg.group('msg'):
        can_object = self._can_network.get_message(int(reg.group('can_id')))
    elif reg.group('sig'):
        can_object = self._can_network.get_signal(int(reg.group('can_id')), reg.group('name'))
    cad = self._can_network.attributes.definitions[reg.group('attr_name')]
    can_object.attributes.add(CANAttribute(cad, value=self._parse_attribute_value(cad, reg.group('value'))))
def ssh_setup_agent(config, envkeys=None):
    """
    Starts the ssh-agent
    """
    envkeys = envkeys or ['SSH_PRIVATE_KEY']
    output = os.popen('ssh-agent -s').readlines()
    for line in output:
        matches = re.search(r"(\S+)\=(\S+)\;", line)
        if matches:
            config.environ[matches.group(1)] = matches.group(2)

    for envkey in envkeys:
        key = os.environ.get(envkey)
        if key:
            ssh_add_key(config.environ, key)
        else:
            logging.warning('%s is missing', envkey)
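For reference, the regex above picks NAME=value pairs out of ssh-agent's Bourne-shell output, which consists of lines like the invented one below:

    import re

    line = "SSH_AUTH_SOCK=/tmp/ssh-XXXX/agent.123; export SSH_AUTH_SOCK;"
    m = re.search(r"(\S+)\=(\S+)\;", line)
    print(m.group(1), m.group(2))  # -> SSH_AUTH_SOCK /tmp/ssh-XXXX/agent.123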
def next_unbalanced_tag(view, search=None, search_args={}, restart_at=None, tags=[]):
    assert search and restart_at, 'wrong call'

    region, tag, is_end_tag = search(view, **search_args)

    if not region:
        return None, None

    if not is_end_tag:
        tags.append(tag)
        search_args.update(restart_at(region))
        return next_unbalanced_tag(view, search, search_args, restart_at, tags)

    if not tags or (tag not in tags):
        return region, tag

    while tag != tags.pop():
        continue

    search_args.update(restart_at(region))
    return next_unbalanced_tag(view, search, search_args, restart_at, tags)
def _try_config_test(self, logcfg, epattern, foundTest=None):
    import ossie.utils.log4py.config
    with stdout_redirect(cStringIO.StringIO()) as new_stdout:
        ossie.utils.log4py.config.strConfig(logcfg, None)

    new_stdout.seek(0)
    found = []
    epats = []
    if type(epattern) == str:
        epats.append(epattern)
    else:
        epats = epattern
    if foundTest == None:
        foundTest = len(epats) * [True]
    for x in new_stdout.readlines():
        for epat in epats:
            m = re.search(epat, x)
            if m:
                found.append(True)
    self.assertEqual(found, foundTest)
def get_human(self, attr):
    val = getattr(self, attr)
    val = val.replace("_", " ")

    product_mapping = {
        "ie": "Internet Explorer"
    }
    if attr == "product" and val in product_mapping:
        val = product_mapping[val]

    # if there's lowercase letters in the value, make it a title
    # (if there's not, leave it alone - e.g. SP3)
    if re.search('[a-z]', val) is not None:
        val = val.title()
    if val.upper() in ["SP0", "SP1", "SP2", "SP3", "SP4", "SP5", "SP6"]:
        val = val.upper()
    if val.lower() in ["x86", "x64"]:
        val = val.lower()

    return val
async def handle(self, player, action, values, **kwargs):  # pragma: no cover
    await self.close(player)

    # Try to parse the button id instead of the whole action string.
    button = action
    try:
        match = re.search('button_([0-9]+)$', action)
        if len(match.groups()) == 1:
            button = match.group(1)
    except:
        pass

    if not self.response_future.done():
        self.response_future.set_result(button)

    if self.target:
        await self.target(player, action, values, **kwargs)
def __call__(self):
    settings = utils.get_settings('os')
    with open('/proc/cpuinfo', 'r') as fd:
        cpuinfo = fd.readlines()

    for line in cpuinfo:
        match = re.search(r"^vendor_id\s+:\s+(.+)", line)
        if match:
            vendor = match.group(1)

    if vendor == "GenuineIntel":
        vendor = "intel"
    elif vendor == "AuthenticAMD":
        vendor = "amd"

    ctxt = {'arch': platform.processor(),
            'cpuVendor': vendor,
            'desktop_enable': settings['general']['desktop_enable']}

    return ctxt
def sync_one(cls, external_id, last_error=None):
    post_data = cls.pipedrive_api_client.get_instance(external_id)

    # Error code from the API
    if not post_data[u'success']:
        logging.error(post_data)
        raise UnableToSyncException(cls, external_id)
    try:
        return cls.update_or_create_entity_from_api_post(post_data[u'data'])
    except IntegrityError as e:
        logging.warning(e)
        if e.message == last_error:
            raise SameErrorTwiceSyncException(cls, external_id, e.message)
        match = re.search(r'.*Key \((.*)\)=\((.*)\).*', e.message)
        if match:
            field_name = match.group(1)
            field_id = match.group(2)
            model = cls.field_model_map(field_name)
            model.sync_one(field_id)
            return cls.sync_one(external_id, e.message)
        else:
            raise Exception("Could not handle error message")
def addToCart(self):
    print '\nADD TO CART -----------------'
    session_get = self.user_session.get(self.URL_product_url, headers=self.get_headers)
    # print session_get.content
    soup = BeautifulSoup(session_get.content, 'lxml')
    results = soup.find_all('select', class_='size-select')
    # print results
    for item in results[0].select('option'):
        re_result = re.sub(self.sub_pattern, '', item.string)
        # print re_result
        matchObj = re.search(r"^%s+$" % self.user_size, re_result)
        if matchObj:
            self.post_data_addToCart['pid'] = item['value']
            self.post_data_addToCart['masterPID'] = item['value'].partition("_")[0]
            print self.post_data_addToCart
            break
    session_post = self.user_session.post(url=self.URL_cart_post_url,
                                          headers=self.post_headers,
                                          data=self.post_data_addToCart)
    print 'Add To Cart Status: ' + str(session_post.status_code)
def git_get_keywords(versionfile_abs):
    """Extract version information from the given file."""
    # the code embedded in _version.py can just fetch the value of these
    # keywords. When used from setup.py, we don't want to import _version.py,
    # so we do it with a regexp instead. This function is not used from
    # _version.py.
    keywords = {}
    try:
        f = open(versionfile_abs, "r")
        for line in f.readlines():
            if line.strip().startswith("git_refnames ="):
                mo = re.search(r'=\s*"(.*)"', line)
                if mo:
                    keywords["refnames"] = mo.group(1)
            if line.strip().startswith("git_full ="):
                mo = re.search(r'=\s*"(.*)"', line)
                if mo:
                    keywords["full"] = mo.group(1)
        f.close()
    except EnvironmentError:
        pass
    return keywords
def get_sqls(self):
    """This function extracts sqls from mysql general log file.

    Yields:
        :class:`SQL` objects. For example:

        SQL('', u'select a.id, b.name from db.ac a join db.bc b on a.id=b.id or a.id=b.iid where a.cnt > 10')
    """
    general_log = open(self.log_path)
    log = GeneralQueryLog(general_log)
    session_db_map = {}
    for entry in log:
        if entry['command'] == 'Connect':
            m = re.search(r'\s+on\s(?P<name>\w+)', entry['argument'])
            if m:
                session_db_map[entry['session_id']] = m.groupdict()['name'].strip()
        elif entry['command'] == 'Init DB':
            session_db_map[entry['session_id']] = entry['argument'].strip()
        elif entry['command'] == 'Query':
            sql = entry['argument']
            if sql.strip()[:6].lower() == 'select':
                yield SQL(session_db_map.get(entry['session_id'], ''), sql)
def git_get_keywords(versionfile_abs):
    # the code embedded in _version.py can just fetch the value of these
    # keywords. When used from setup.py, we don't want to import _version.py,
    # so we do it with a regexp instead. This function is not used from
    # _version.py.
    keywords = {}
    try:
        f = open(versionfile_abs, "r")
        for line in f.readlines():
            if line.strip().startswith("git_refnames ="):
                mo = re.search(r'=\s*"(.*)"', line)
                if mo:
                    keywords["refnames"] = mo.group(1)
            if line.strip().startswith("git_full ="):
                mo = re.search(r'=\s*"(.*)"', line)
                if mo:
                    keywords["full"] = mo.group(1)
        f.close()
    except EnvironmentError:
        pass
    return keywords
def scan_thread(keyword, catalog_json):
    # Check each thread; threads that contain the keyword are returned
    matched_threads = []
    for i in range(len(catalog_json)):
        for thread in catalog_json[i]["threads"]:
            regex = r'\b{0}\b'.format(keyword)
            # Search thread title
            if 'sub' in thread:
                if re.search(regex, str(thread["sub"]), re.IGNORECASE):
                    matched_threads.append(thread["no"])
            # Search OPs post body
            if 'com' in thread:
                if re.search(regex, str(thread["com"]), re.IGNORECASE):
                    matched_threads.append(thread["no"])
    return matched_threads
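One caveat on the example above: the keyword is interpolated into the \b...\b pattern unescaped, so a keyword containing regex metacharacters changes the pattern's meaning; re.escape() keeps it literal. A small illustration with invented inputs:

    import re

    keyword = "3.5"
    escaped = r"\b{0}\b".format(re.escape(keyword))                      # \b3\.5\b
    print(bool(re.search(escaped, "needs python 3.5", re.IGNORECASE)))   # -> True
    # Unescaped, the dot matches any character, so "345" matches too:
    print(bool(re.search(r"\b{0}\b".format(keyword), "a 345 b")))        # -> True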
def alter_table(self, alter_string):
    lines = alter_string.replace('\n', ' ').split(';')
    for line in lines:
        if 'PRIMARY KEY' in line:
            table_name = re.search(r"TABLE `(\w+)`", line).group(1)
            table = self.get_table_by_name(table_name)
            primary_key_columns = re.findall(r"PRIMARY KEY \(`(\w+)`\)", line)
            for primary_key_column in primary_key_columns:
                table.add_primary_key(primary_key_column)
        elif 'FOREIGN KEY' in line:
            table_name = re.search(r"TABLE `(\w+)`", line).group(1)
            table = self.get_table_by_name(table_name)
            foreign_keys_list = re.findall(r"FOREIGN KEY \(`(\w+)`\) REFERENCES `(\w+)` \(`(\w+)`\)", line)
            for column, foreign_table, foreign_column in foreign_keys_list:
                table.add_foreign_key(column, foreign_table, foreign_column)
def allocate(self, ip_addr, name, platform, cpus, memory, disk):
    """When a node is found, scheduler calls this method with IP address,
    name, CPUs, memory and disk available on that node. This method should
    return a number indicating number of CPUs to use. If return value is 0,
    the node is not used; if the return value is < 0, this allocation is
    ignored (next allocation in the 'node_allocations' list, if any, is
    applied).
    """
    if not re.match(self.ip_rex, ip_addr):
        return -1
    if (self.platform and not re.search(self.platform, platform)):
        return -1
    if ((self.memory and memory and self.memory > memory) or
        (self.disk and disk and self.disk > disk)):
        return 0
    if self.cpus > 0:
        if self.cpus > cpus:
            return 0
        return self.cpus
    elif self.cpus == 0:
        return 0
    else:
        cpus += self.cpus
        if cpus < 0:
            return 0
        return cpus
def Resolver(obj, path, full=False):
    m = re.search('#/(.*)/(.*)', path)
    x = None
    if full:
        b = obj[m.group(1)]
        x = b[m.group(2)]
    else:
        x = m.group(2)
    return x
def list_nics(nic_type=None):
    """Return a list of nics of given type(s)"""
    if isinstance(nic_type, six.string_types):
        int_types = [nic_type]
    else:
        int_types = nic_type

    interfaces = []
    if nic_type:
        for int_type in int_types:
            cmd = ['ip', 'addr', 'show', 'label', int_type + '*']
            ip_output = subprocess.check_output(cmd).decode('UTF-8')
            ip_output = ip_output.split('\n')
            ip_output = (line for line in ip_output if line)
            for line in ip_output:
                if line.split()[1].startswith(int_type):
                    matched = re.search('.*: (' + int_type +
                                        r'[0-9]+\.[0-9]+)@.*', line)
                    if matched:
                        iface = matched.groups()[0]
                    else:
                        iface = line.split()[1].replace(":", "")

                    if iface not in interfaces:
                        interfaces.append(iface)
    else:
        cmd = ['ip', 'a']
        ip_output = subprocess.check_output(cmd).decode('UTF-8').split('\n')
        ip_output = (line.strip() for line in ip_output if line)

        key = re.compile(r'^[0-9]+:\s+(.+):')
        for line in ip_output:
            matched = re.search(key, line)
            if matched:
                iface = matched.group(1)
                iface = iface.partition("@")[0]
                if iface not in interfaces:
                    interfaces.append(iface)

    return interfaces
def is_device_mounted(device):
    '''Given a device path, return True if that device is mounted, and False
    if it isn't.

    :param device: str: Full path of the device to check.
    :returns: boolean: True if the path represents a mounted device, False if
        it doesn't.
    '''
    try:
        out = check_output(['lsblk', '-P', device]).decode('UTF-8')
    except:
        return False
    return bool(re.search(r'MOUNTPOINT=".+"', out))
def parseDebian(packages_file):
    """ Parse debian package list to dict (name:version) """
    result = {}
    if args.clean == True:
        first_field = 0
    else:
        first_field = 1
    for line in packages_file:
        if args.clean == True or line[:2] == 'ii':
            fields = line.split()
            if len(fields) < 2 + first_field:
                continue
            # Software name
            search = fields[first_field].find(':')
            if search != -1:
                soft_name = cleanName(fields[first_field][:search])
            else:
                soft_name = cleanName(fields[first_field])
            # Version
            search = re.search(r"-|\+|~", fields[first_field + 1])
            if search:
                soft_version = fields[first_field + 1][:search.span()[0]]
            else:
                soft_version = fields[first_field + 1]
            search = soft_version.find(':')
            if search != -1:
                soft_version = soft_version[search + 1:]
            soft_version = purgeVersionString(soft_version)
            # Format check
            if not soft_name or not soft_version:
                continue
            # Intense package name split
            if args.intense and '-' in soft_name:
                for sub_package in soft_name.split('-'):
                    if len(sub_package) > 2 and '.' not in sub_package and sub_package not in badpackages:
                        result[sub_package] = soft_version
            else:
                if soft_name not in badpackages:
                    result[soft_name] = soft_version
    return result
def purgeVersionString(version_string):
    """ Eliminate invalid characters and last dot from version string """
    search = re.search(r'[^0-9.]', version_string)
    if search:
        result = version_string[:search.span()[0]]
    else:
        result = version_string
    if len(result) > 0 and result[-1] == '.':
        result = result[:-1]
    return result
def _search_for_query(self, query):
    if query in self._search_pattern_cache:
        return self._search_pattern_cache[query]

    # Build pattern: include all characters
    pattern = []
    for c in query:
        # pattern.append('[^{0}]*{0}'.format(re.escape(c)))
        pattern.append('.*?{0}'.format(re.escape(c)))
    pattern = ''.join(pattern)
    search = re.compile(pattern, re.IGNORECASE).search

    self._search_pattern_cache[query] = search
    return search
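The loop above builds a lazy subsequence pattern, so a query matches any string containing its characters in order; this is the usual fuzzy-filter trick. A sketch of what the generated pattern looks like for a sample query:

    import re

    query = "ghi"
    pattern = "".join(".*?{0}".format(re.escape(c)) for c in query)
    print(pattern)                                                 # -> .*?g.*?h.*?i
    print(bool(re.search(pattern, "GitHub CI", re.IGNORECASE)))    # -> True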
def get_password(self, account, service=None):
    """Retrieve the password saved at ``service/account``.

    Raise :class:`PasswordNotFound` exception if password doesn't exist.

    :param account: name of the account the password is for, e.g. "Pinboard"
    :type account: ``unicode``
    :param service: Name of the service. By default, this is the workflow's
        bundle ID
    :type service: ``unicode``
    :returns: account password
    :rtype: ``unicode``
    """
    if not service:
        service = self.bundleid

    output = self._call_security('find-generic-password', service,
                                 account, '-g')

    # Parsing of `security` output is adapted from python-keyring
    # by Jason R. Coombs
    # https://pypi.python.org/pypi/keyring
    m = re.search(
        r'password:\s*(?:0x(?P<hex>[0-9A-F]+)\s*)?(?:"(?P<pw>.*)")?',
        output)
    if m:
        groups = m.groupdict()
        h = groups.get('hex')
        password = groups.get('pw')
        if h:
            password = unicode(binascii.unhexlify(h), 'utf-8')

    self.logger.debug('Got password : %s:%s', service, account)
    return password
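A note on the `security` parse above: the password comes back either hex-encoded after '0x' or quoted, which is why the pattern carries two optional named groups. A minimal Python 3 sketch of the same parse on an invented output line:

    import re
    import binascii

    output = 'password: 0x68756E74657232'
    m = re.search(r'password:\s*(?:0x(?P<hex>[0-9A-F]+)\s*)?(?:"(?P<pw>.*)")?', output)
    h = m.groupdict().get('hex')
    print(binascii.unhexlify(h).decode('utf-8'))  # -> hunter2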