我们从Python开源项目中,提取了以下48个代码示例,用于说明如何使用re.match()。
def __init__( self, index_url="https://pypi.python.org/simple", hosts=('*',), ca_bundle=None, verify_ssl=True, *args, **kw ): Environment.__init__(self, *args, **kw) self.index_url = index_url + "/" [:not index_url.endswith('/')] self.scanned_urls = {} self.fetched_urls = {} self.package_pages = {} self.allows = re.compile('|'.join(map(translate, hosts))).match self.to_scan = [] use_ssl = ( verify_ssl and ssl_support.is_available and (ca_bundle or ssl_support.find_ca_bundle()) ) if use_ssl: self.opener = ssl_support.opener_for(ca_bundle) else: self.opener = urllib.request.urlopen
def load_setting_sync_delay(logger, config_settings): """ Attempt to parse delay between sync loops from config settings :param logger: the logger :param config_settings: config settings loaded from config file :return: extracted sync delay if valid, else DEFAULT_SYNC_DELAY_IN_SECONDS """ try: sync_delay = config_settings['export_options']['sync_delay_in_seconds'] sync_delay_is_valid = re.match('^[0-9]+$', str(sync_delay)) if sync_delay_is_valid and sync_delay >= 0: if sync_delay < DEFAULT_SYNC_DELAY_IN_SECONDS: '{0} seconds'.format(logger.info( 'Sync delay is less than the minimum recommended value of ' + str(DEFAULT_SYNC_DELAY_IN_SECONDS))) return sync_delay else: logger.info('Invalid sync_delay_in_seconds from the configuration file, defaulting to {0}'.format(str( DEFAULT_SYNC_DELAY_IN_SECONDS))) return DEFAULT_SYNC_DELAY_IN_SECONDS except Exception as ex: log_critical_error(logger, ex, 'Exception parsing sync_delay from the configuration file, defaulting to {0}'.format(str( DEFAULT_SYNC_DELAY_IN_SECONDS))) return DEFAULT_SYNC_DELAY_IN_SECONDS
def load_setting_api_access_token(logger, config_settings): """ Attempt to parse API token from config settings :param logger: the logger :param config_settings: config settings loaded from config file :return: API token if valid, else None """ try: api_token = config_settings['API']['token'] token_is_valid = re.match('^[a-f0-9]{64}$', api_token) if token_is_valid: logger.debug('API token matched expected pattern') return api_token else: logger.error('API token failed to match expected pattern') return None except Exception as ex: log_critical_error(logger, ex, 'Exception parsing API token from config.yaml') return None
def load_setting_input_filename(logger, config_settings): """ Attempt to parse input filename from config settings :param logger: the logger :param config_settings: config settings loaded from config file :return: input filename from config file if valid, else None """ try: filename = config_settings['input_filename'] filename_is_valid = re.match('.+xls|.+xlsx', filename) if filename_is_valid: logger.debug('Filename matched expected pattern') return filename else: logger.error('Filename failed to match expected pattern, acceptable formats are xls and xlsx') return None except Exception as ex: log_critical_error(logger, ex, 'Exception parsing input filename from config.yaml')
def _sort_names(names): ''' Sort peeker names by index and alphabetically. For example, the peeker names would be sorted as a[0], b[0], a[1], b[1], ... ''' def index_key(lbl): '''Index sorting.''' m = re.match('.*\[(\d+)\]$', lbl) # Get the bracketed index. if m: return int(m.group(1)) # Return the index as an integer. return -1 # No index found so it comes before everything else. def name_key(lbl): '''Name sorting.''' m = re.match('^([^\[]+)', lbl) # Get name preceding bracketed index. if m: return m.group(1) # Return name. return '' # No name found. srt_names = sorted(names, key=name_key) srt_names = sorted(srt_names, key=index_key) return srt_names
def split_filename(filename, project_name=None): """ Extract name, version, python version from a filename (no extension) Return name, version, pyver or None """ result = None pyver = None filename = unquote(filename).replace(' ', '-') m = PYTHON_VERSION.search(filename) if m: pyver = m.group(1) filename = filename[:m.start()] if project_name and len(filename) > len(project_name) + 1: m = re.match(re.escape(project_name) + r'\b', filename) if m: n = m.end() result = filename[:n], filename[n + 1:], pyver if result is None: m = PROJECT_NAME_AND_VERSION.match(filename) if m: result = m.group(1), m.group(3), pyver return result # Allow spaces in name because of legacy dists like "Twisted Core"
def __init__(self, filename): """ :raises InvalidWheelFilename: when the filename is invalid for a wheel """ wheel_info = self.wheel_file_re.match(filename) if not wheel_info: raise InvalidWheelFilename( "%s is not a valid wheel filename." % filename ) self.filename = filename self.name = wheel_info.group('name').replace('_', '-') # we'll assume "_" means "-" due to wheel naming scheme # (https://github.com/pypa/pip/issues/1150) self.version = wheel_info.group('ver').replace('_', '-') self.pyversions = wheel_info.group('pyver').split('.') self.abis = wheel_info.group('abi').split('.') self.plats = wheel_info.group('plat').split('.') # All the tag combinations from this file self.file_tags = set( (x, y, z) for x in self.pyversions for y in self.abis for z in self.plats )
def _needs_hiding(mod_name): """ >>> _needs_hiding('setuptools') True >>> _needs_hiding('pkg_resources') True >>> _needs_hiding('setuptools_plugin') False >>> _needs_hiding('setuptools.__init__') True >>> _needs_hiding('distutils') True >>> _needs_hiding('os') False >>> _needs_hiding('Cython') True """ pattern = re.compile(r'(setuptools|pkg_resources|distutils|Cython)(\.|$)') return bool(pattern.match(mod_name))
def filterwarnings(action, message="", category=Warning, module="", lineno=0, append=False): """Insert an entry into the list of warnings filters (at the front). 'action' -- one of "error", "ignore", "always", "default", "module", or "once" 'message' -- a regex that the warning message must match 'category' -- a class that the warning must be a subclass of 'module' -- a regex that the module name must match 'lineno' -- an integer line number, 0 matches all warnings 'append' -- if true, append to the list of filters """ import re assert action in ("error", "ignore", "always", "default", "module", "once"), "invalid action: %r" % (action,) assert isinstance(message, str), "message must be a string" assert isinstance(category, type), "category must be a class" assert issubclass(category, Warning), "category must be a Warning subclass" assert isinstance(module, str), "module must be a string" assert isinstance(lineno, int) and lineno >= 0, \ "lineno must be an int >= 0" _add_filter(action, re.compile(message, re.I), category, re.compile(module), lineno, append=append)
def _getcategory(category): import re if not category: return Warning if re.match("^[a-zA-Z0-9_]+$", category): try: cat = eval(category) except NameError: raise _OptionError("unknown warning category: %r" % (category,)) else: i = category.rfind(".") module = category[:i] klass = category[i+1:] try: m = __import__(module, None, None, [klass]) except ImportError: raise _OptionError("invalid module name: %r" % (module,)) try: cat = getattr(m, klass) except AttributeError: raise _OptionError("unknown warning category: %r" % (category,)) if not issubclass(cat, Warning): raise _OptionError("invalid warning category: %r" % (category,)) return cat
def __exit__(self, *exc_info): if not self._entered: raise RuntimeError("Cannot exit %r without entering first" % self) self._module.filters = self._filters self._module._filters_mutated() self._module.showwarning = self._showwarning self._module._showwarnmsg_impl = self._showwarnmsg_impl # filters contains a sequence of filter 5-tuples # The components of the 5-tuple are: # - an action: error, ignore, always, default, module, or once # - a compiled regex that must match the warning message # - a class representing the warning category # - a compiled regex that must match the module that is being warned # - a line number for the line being warning, or 0 to mean any line # If either if the compiled regexs are None, match anything.
def b64decode(s, altchars=None, validate=False): """Decode the Base64 encoded bytes-like object or ASCII string s. Optional altchars must be a bytes-like object or ASCII string of length 2 which specifies the alternative alphabet used instead of the '+' and '/' characters. The result is returned as a bytes object. A binascii.Error is raised if s is incorrectly padded. If validate is False (the default), characters that are neither in the normal base-64 alphabet nor the alternative alphabet are discarded prior to the padding check. If validate is True, these non-alphabet characters in the input result in a binascii.Error. """ s = _bytes_from_decode_data(s) if altchars is not None: altchars = _bytes_from_decode_data(altchars) assert len(altchars) == 2, repr(altchars) s = s.translate(bytes.maketrans(altchars, b'+/')) if validate and not re.match(b'^[A-Za-z0-9+/]*={0,2}$', s): raise binascii.Error('Non-base64 digit found') return binascii.a2b_base64(s)
def get_model_filenames(model_dir): files = os.listdir(model_dir) meta_files = [s for s in files if s.endswith('.meta')] if len(meta_files)==0: raise ValueError('No meta file found in the model directory (%s)' % model_dir) elif len(meta_files)>1: raise ValueError('There should not be more than one meta file in the model directory (%s)' % model_dir) meta_file = meta_files[0] meta_files = [s for s in files if '.ckpt' in s] max_step = -1 for f in files: step_str = re.match(r'(^model-[\w\- ]+.ckpt-(\d+))', f) if step_str is not None and len(step_str.groups())>=2: step = int(step_str.groups()[1]) if step > max_step: max_step = step ckpt_file = step_str.groups()[0] return meta_file, ckpt_file
def sms(): if len(sys.argv) < 2: print(help_text) return if sys.argv[1] == "send": if len(sys.argv) < 3: print("????? ????? ?? ?? ???.") return if not re.match(r"[\+98|0]9[0-9]*",sys.argv[2]): print("????? ???? ??? ?????? ???.tetete") return number = sys.argv[2] if re.match(sys.argv[2], r"^\+98"): number = re.sub("+98", "0", number) text = sys.argv[3] if len(text) > 100: print("????? ??????? ??? ??? ????? ???? ???.") return send_sms(number, text, str(time.time())) return if sys.argv[1] == "credits": get_credits() return
def _ungroup_go_imports(fname): with open(fname, 'r+') as f: content = f.readlines() out = [] import_block = False for line in content: c = line.strip() if import_block: if c == '': continue elif re.match(END_IMPORT_REGEX, c) is not None: import_block = False elif re.match(BEGIN_IMPORT_REGEX, c) is not None: import_block = True out.append(line) f.seek(0) f.writelines(out) f.truncate()
def check_header_validity(header): """Verifies that header value is a string which doesn't contain leading whitespace or return characters. This prevents unintended header injection. :param header: tuple, in the format (name, value). """ name, value = header if isinstance(value, bytes): pat = _CLEAN_HEADER_REGEX_BYTE else: pat = _CLEAN_HEADER_REGEX_STR try: if not pat.match(value): raise InvalidHeader("Invalid return character or leading space in header: %s" % name) except TypeError: raise InvalidHeader("Value for header {%s: %s} must be of type str or " "bytes, not %s" % (name, value, type(value)))
def zap_pyfiles(self): log.info("Removing .py files from temporary directory") for base, dirs, files in walk_egg(self.bdist_dir): for name in files: path = os.path.join(base, name) if name.endswith('.py'): log.debug("Deleting %s", path) os.unlink(path) if base.endswith('__pycache__'): path_old = path pattern = r'(?P<name>.+)\.(?P<magic>[^.]+)\.pyc' m = re.match(pattern, name) path_new = os.path.join(base, os.pardir, m.group('name') + '.pyc') log.info("Renaming file from [%s] to [%s]" % (path_old, path_new)) try: os.remove(path_new) except OSError: pass os.rename(path_old, path_new)
def find_external_links(url, page): """Find rel="homepage" and rel="download" links in `page`, yielding URLs""" for match in REL.finditer(page): tag, rel = match.groups() rels = set(map(str.strip, rel.lower().split(','))) if 'homepage' in rels or 'download' in rels: for match in HREF.finditer(tag): yield urllib.parse.urljoin(url, htmldecode(match.group(1))) for tag in ("<th>Home Page", "<th>Download URL"): pos = page.find(tag) if pos != -1: match = HREF.search(page, pos) if match: yield urllib.parse.urljoin(url, htmldecode(match.group(1)))
def get_all_users(show_teachers=False): """ Finds all the users in the database Args: show_teachers: whether or not to include teachers in the response Returns: Returns the uid, username, and email of all users. """ db = api.common.get_conn() match = {} projection = {"uid": 1, "username": 1, "email": 1, "tid": 1} if not show_teachers: match.update({"teacher": False}) projection.update({"teacher": 1}) return list(db.users.find(match, projection))
def run(self): view = self.window.active_view() pt = view.sel()[0] scope = view.scope_name(pt.begin()).rstrip() # TODO Fix jumptags scopes (rename them to less generic scopes) jumptag_scopes = [ 'text.neovintageous.help string.neovintageous', 'text.neovintageous.help support.constant.neovintageous' ] if scope not in jumptag_scopes: return subject = view.substr(view.extract_scope(pt.begin())) if len(subject) < 3: return match = re.match('^\'[a-z_]+\'|\\|[^\\s\\|]+\\|$', subject) if match: subject = subject.strip('|') # TODO Refactor ex_help code into a reusable middle layer so that # this command doesn't have to call the ex command. self.window.run_command('ex_help', {'command_line': 'help ' + subject})
def find_line_text_object(view, s): """Implement the line object.""" line = view.line(s) line_content = view.substr(line) begin = line.begin() end = line.end() whitespace_match = re.match("\s+", line_content) if whitespace_match: begin = begin + len(whitespace_match.group(0)) return (begin, end) # TODO: Move this to units.py.
def _url(regex, text): match = re.match(regex, text) if match: url = match.group('url') # Remove end of line full stop character. url = url.rstrip('.') # Remove closing tag markdown link e.g. [title](url) url = url.rstrip(')') # Remove closing tag markdown image e.g. ![alt](url)] if url[-2:] == ')]': url = url[:-2] return url return None
def check_callbacks(bot, trigger, url, run=True): """ Check the given URL against the callbacks list. If it matches, and ``run`` is given as ``True``, run the callback function, otherwise pass. Returns ``True`` if the url matched anything in the callbacks list. """ # Check if it matches the exclusion list first matched = any(regex.search(url) for regex in bot.memory['url_exclude']) # Then, check if there's anything in the callback list for regex, function in tools.iteritems(bot.memory['url_callbacks']): match = regex.search(url) if match: if run: function(bot, trigger, match) matched = True return matched
def multi_stat_check(args, filename): dict = {} try: with open(args.container + "/" + filename, "r") as f: for line in f: m = _STAT_RE.match(line) if m: dict[m.group(1)] = m.group(2) except Exception, e: if not os.path.isdir(args.container): os.mkdir(args.container) debug(args.container + ": could not get last stats from " + filename) debug(str(e)) # first time running for this container create empty file open(args.container + "/" + filename,"w").close() return dict
def multi_stat_update(args, container_dir, filename): dict = {} try: pipe = os.popen("docker exec " + args.container + " cat " + container_dir + "/" + filename + " 2>&1") for line in pipe: m = _STAT_RE.match(line) if m: dict[m.group(1)] = m.group(2) pipe.close() f = open(args.container + "/" + filename,"w") for key in dict.keys(): f.write(key + " " + dict[key] + "\n") f.close() except Exception, e: debug(args.container + ": could not update " + filename) debug(str(sys.exc_info())) return dict
def parse(version): """Parse version to major, minor, patch, pre-release, build parts. :param version: version string :return: dictionary with the keys 'build', 'major', 'minor', 'patch', and 'prerelease'. The prerelease or build keys can be None if not provided :rtype: dict """ match = _REGEX.match(version) if match is None: raise ValueError('%s is not valid SemVer string' % version) version_parts = match.groupdict() version_parts['major'] = int(version_parts['major']) version_parts['minor'] = int(version_parts['minor']) version_parts['patch'] = int(version_parts['patch']) return version_parts
def _nat_cmp(a, b): def convert(text): return int(text) if re.match('[0-9]+', text) else text def split_key(key): return [convert(c) for c in key.split('.')] def cmp_prerelease_tag(a, b): if isinstance(a, int) and isinstance(b, int): return cmp(a, b) elif isinstance(a, int): return -1 elif isinstance(b, int): return 1 else: return cmp(a, b) a, b = a or '', b or '' a_parts, b_parts = split_key(a), split_key(b) for sub_a, sub_b in zip(a_parts, b_parts): cmp_result = cmp_prerelease_tag(sub_a, sub_b) if cmp_result != 0: return cmp_result else: return cmp(len(a), len(b))
def get_iface_from_addr(addr): """Work out on which interface the provided address is configured.""" for iface in netifaces.interfaces(): addresses = netifaces.ifaddresses(iface) for inet_type in addresses: for _addr in addresses[inet_type]: _addr = _addr['addr'] # link local ll_key = re.compile("(.+)%.*") raw = re.match(ll_key, _addr) if raw: _addr = raw.group(1) if _addr == addr: log("Address '%s' is configured on iface '%s'" % (addr, iface)) return iface msg = "Unable to infer net iface on which '%s' is configured" % (addr) raise Exception(msg)
def _add_cloud_pocket(pocket): """Add a cloud pocket as /etc/apt/sources.d/cloud-archive.list Note that this overwrites the existing file if there is one. This function also converts the simple pocket in to the actual pocket using the CLOUD_ARCHIVE_POCKETS mapping. :param pocket: string representing the pocket to add a deb spec for. :raises: SourceConfigError if the cloud pocket doesn't exist or the requested release doesn't match the current distro version. """ apt_install(filter_installed_packages(['ubuntu-cloud-keyring']), fatal=True) if pocket not in CLOUD_ARCHIVE_POCKETS: raise SourceConfigError( 'Unsupported cloud: source option %s' % pocket) actual_pocket = CLOUD_ARCHIVE_POCKETS[pocket] with open('/etc/apt/sources.list.d/cloud-archive.list', 'w') as apt: apt.write(CLOUD_ARCHIVE.format(actual_pocket))
def _add_cloud_staging(cloud_archive_release, openstack_release): """Add the cloud staging repository which is in ppa:ubuntu-cloud-archive/<openstack_release>-staging This function checks that the cloud_archive_release matches the current codename for the distro that charm is being installed on. :param cloud_archive_release: string, codename for the release. :param openstack_release: String, codename for the openstack release. :raises: SourceConfigError if the cloud_archive_release doesn't match the current version of the os. """ _verify_is_ubuntu_rel(cloud_archive_release, openstack_release) ppa = 'ppa:ubuntu-cloud-archive/{}-staging'.format(openstack_release) cmd = 'add-apt-repository -y {}'.format(ppa) _run_with_retries(cmd.split(' '))
def summaries_to_be_listed_in_waiting_list( result_log, status_pr, statuses, trigger_str): summaries = [] for a_status in statuses: # results's 2nd. condition which match to "Start" is for # fail-safe. if not result_log.has_result_of_status( status=a_status, results=["Succeed", "Start", "Failed"]): st_str = status_pr.make_tweet_string_from_toot( a_status, hashtag=trigger_str) rs_str = "Waiting" rs_sm = result_log.make_result_and_others_summary( status_string=st_str, hashtag=trigger_str, result=rs_str) in_sm = result_log.make_status_summary("inbound", a_status) rs_sm.update(in_sm) summaries.append(rs_sm) return summaries
def cares_about(self, delta): """Return True if this observer "cares about" (i.e. wants to be called) for a this delta. """ if (self.entity_id and delta.get_id() and not re.match(self.entity_id, str(delta.get_id()))): return False if self.entity_type and self.entity_type != delta.entity: return False if self.action and self.action != delta.type: return False if self.predicate and not self.predicate(delta): return False return True
def module_requirements(requirements_path, module_names, strict_bounds, conda_format=False): module_names = set(module_names) found = set() module_lines = [] for line in read_requirements(requirements_path, strict_bounds=strict_bounds): match = REQ_PATTERN.match(line) if match is None: raise AssertionError("Could not parse requirement: '%s'" % line) name = match.group(1) if name in module_names: found.add(name) if conda_format: line = _conda_format(line) module_lines.append(line) if found != module_names: raise AssertionError( "No requirements found for %s." % (module_names - found) ) return module_lines
def parse(self, response): """ ???html??????url ?????url?????? ?????url???? /question/xxx ????????????? """ all_urls = response.css("a::attr(href)").extract() all_urls = [parse.urljoin(response.url, url) for url in all_urls] # ??lambda???????url????????true???????false??? all_urls = filter(lambda x: True if x.startswith("https") else False, all_urls) for url in all_urls: match_obj = re.match("(.*zhihu.com/question/(\d+))(/|$).*", url) if match_obj: # ?????question??????????????????? request_url = match_obj.group(1) yield scrapy.Request(request_url, headers=self.headers, callback=self.parse_question) #?? # break else: # pass # ????question?????????? yield scrapy.Request(url, headers=self.headers, callback=self.parse)
def login(self, response): response_text = response.text match_obj = re.match('.*name="_xsrf" value="(.*?)"', response_text, re.DOTALL) xsrf = '' if match_obj: xsrf = (match_obj.group(1)) if xsrf: post_url = "https://www.zhihu.com/login/phone_num" post_data = { "_xsrf": xsrf, "phone_num": "18487255487", "password": "ty158917", "captcha": "" } import time t = str(int(time.time() * 1000)) captcha_url = "https://www.zhihu.com/captcha.gif?r={0}&type=login".format(t) yield scrapy.Request(captcha_url, headers=self.headers, meta={"post_data":post_data}, callback=self.login_after_captcha)
def get_xsrf(): #??xsrf code response = session.get("https://www.zhihu.com", headers=header) response_text = response.text match_obj = re.match('.*name="_xsrf" value="(.*?)"', response_text, re.DOTALL) xsrf = '' if match_obj: xsrf = (match_obj.group(1)) return xsrf
def zhihu_login(account, password): #???? if re.match("^1\d{10}",account): print ("??????") post_url = "https://www.zhihu.com/login/phone_num" post_data = { "_xsrf": get_xsrf(), "phone_num": account, "password": password, "captcha":get_captcha() } else: if "@" in account: #?????????? print("??????") post_url = "https://www.zhihu.com/login/email" post_data = { "_xsrf": get_xsrf(), "email": account, "password": password } response_text = session.post(post_url, data=post_data, headers=header) session.cookies.save() # get_index() # is_login() # get_captcha()
def get_nums(value): match_re = re.match(".*?(\d+).*", value) if match_re: nums = int(match_re.group(1)) else: nums = 0 return nums
def allocate(self, ip_addr, name, platform, cpus, memory, disk): """When a node is found, scheduler calls this method with IP address, name, CPUs, memory and disk available on that node. This method should return a number indicating number of CPUs to use. If return value is 0, the node is not used; if the return value is < 0, this allocation is ignored (next allocation in the 'node_allocations' list, if any, is applied). """ if not re.match(self.ip_rex, ip_addr): return -1 if (self.platform and not re.search(self.platform, platform)): return -1 if ((self.memory and memory and self.memory > memory) or (self.disk and disk and self.disk > disk)): return 0 if self.cpus > 0: if self.cpus > cpus: return 0 return self.cpus elif self.cpus == 0: return 0 else: cpus += self.cpus if cpus < 0: return 0 return cpus
def __init__(self, host, tcp_port): if re.match(r'^\d+[\.\d]+$', host) or re.match(r'^[0-9a-fA-F:]+$', host): self.addr = host else: self.addr = socket.getaddrinfo(host, 0, 0, socket.SOCK_STREAM)[0][4][0] self.port = int(tcp_port)
def __init__(self, fqArchiveUrl, filtersDir, outputPrefix, outputUrl, diskSize, diskType, logsPath, container, scriptUrl, tag, cores, mem, preemptible): super(PipelineStep, self).__init__() fqFileName = os.path.basename(fqArchiveUrl) fqInputs = "{fqArchive}:{fqFileName}".format(fqArchive=fqArchiveUrl, fqFileName=fqFileName) try: filtersDirContents = subprocess.check_output(["gsutil", "ls", filtersDir]) except subprocess.CalledProcessError as e: print "ERROR: couldn't get a listing of filter files! -- {reason}".format(reason=e) exit(-1) bfInputs = [x for x in filtersDirContents.split('\n') if re.match('^.*\.bf$', x) or re.match('^.*\.txt', x)] bfInputs.append(fqInputs) inputs = ",".join(["{url}:{filename}".format(url=x, filename=os.path.basename(x)) for x in bfInputs]) outputs = "{outputPrefix}*:{outDir}".format(outputPrefix=outputPrefix, outDir=outputUrl) env = "INPUT_FILE={fqFileName},OUTPUT_PREFIX={outputPrefix},FILTERS_LIST={filtersList}".format(fqFileName=fqFileName, outputPrefix=outputPrefix, filtersList=','.join([os.path.basename(x) for x in bfInputs if re.match('^.*\.bf$', x)])) self._step = PipelineSchema("biobloomcategorizer", self._pipelinesConfig, logsPath, container, scriptUrl=scriptUrl, cores=cores, mem=mem, diskSize=diskSize, diskType=diskType, inputs=inputs, outputs=outputs, env=env, tag=tag, preemptible=preemptible)
def docker_version(): out = subprocess.check_output(["docker", "-v"]) mo = re.match(br"Docker version (\d+)\.(\d+)\.(\d+)", out) if mo: return tuple(map(int, mo.groups())) die("unable to parse a version number from the output of 'docker -v'")