Python six.moves.urllib.parse 模块,unquote() 实例源码

我们从Python开源项目中,提取了以下41个代码示例,用于说明如何使用six.moves.urllib.parse.unquote()

项目:oscars2016    作者:0x0ece    | 项目源码 | 文件源码
def _header_to_id(self, header):
    """Convert a Content-ID header value to an id.

    Presumes the Content-ID header conforms to the format that
    _id_to_header() returns: a value wrapped in '<' and '>' whose last
    '+'-separated field is the percent-encoded id.

    Args:
      header: string, Content-ID header value.

    Returns:
      The extracted id value, with percent-escapes decoded.

    Raises:
      BatchError if the header is not in the expected format.
    """
    # startswith/endswith instead of header[0]/header[-1]: an empty header
    # must be reported as a BatchError, not escape as an IndexError.
    if not (header.startswith('<') and header.endswith('>')):
        raise BatchError("Invalid value for Content-ID: %s" % header)
    if '+' not in header:
        raise BatchError("Invalid value for Content-ID: %s" % header)
    # The id is whatever follows the last '+' inside the angle brackets.
    _, id_ = header[1:-1].rsplit('+', 1)

    return unquote(id_)
项目:GAMADV-XTD    作者:taers232c    | 项目源码 | 文件源码
def _header_to_id(self, header):
    """Convert a Content-ID header value to an id.

    Presumes the Content-ID header conforms to the format that
    _id_to_header() returns: a value wrapped in '<' and '>' whose last
    '+'-separated field is the percent-encoded id.

    Args:
      header: string, Content-ID header value.

    Returns:
      The extracted id value, with percent-escapes decoded.

    Raises:
      BatchError if the header is not in the expected format.
    """
    # startswith/endswith instead of header[0]/header[-1]: an empty header
    # must be reported as a BatchError, not escape as an IndexError.
    if not (header.startswith('<') and header.endswith('>')):
        raise BatchError("Invalid value for Content-ID: %s" % header)
    if '+' not in header:
        raise BatchError("Invalid value for Content-ID: %s" % header)
    # The id is whatever follows the last '+' inside the angle brackets.
    _, id_ = header[1:-1].rsplit('+', 1)

    return unquote(id_)
项目:webapp2    作者:GoogleCloudPlatform    | 项目源码 | 文件源码
def match(self, request):
    """Try to match this route against ``request``.

    The request path is percent-decoded before it is tested against the
    route's compiled regex.

    :param request:
        The current request; its ``path``, ``scheme`` and ``method``
        attributes are read.
    :returns:
        ``None`` when the path does not match or the request scheme is not
        one of the route's schemes; otherwise a tuple
        ``(route, args, kwargs)``.
    :raises:
        ``exc.HTTPMethodNotAllowed`` if the route defines :attr:`methods`
        and the request method isn't allowed.

    .. seealso:: :meth:`BaseRoute.match`.
    """
    matched = self.regex.match(unquote(request.path))
    if not matched:
        return None
    if self.schemes and request.scheme not in self.schemes:
        return None

    if self.methods and request.method not in self.methods:
        # The router catches this, so routes accepting other methods still
        # get a chance to match.
        raise exc.HTTPMethodNotAllowed()

    defaults = self.defaults.copy()
    args, kwargs = _get_route_variables(matched, defaults)
    return self, args, kwargs
项目:Url    作者:beiruan    | 项目源码 | 文件源码
def decode(data_url):
    """Decode the payload of a ``data:`` URL.

    The URL is split into its metadata (between ``data:`` and the first
    comma) and its payload. The payload is base64-decoded when the last
    metadata field is ``base64`` and percent-decoded otherwise. When a
    ``charset=`` field is present and the payload is still a byte string,
    it is decoded to text with that charset.

    :param data_url: string of the form ``data:<metadata>,<payload>``.
    :return: the decoded payload.
    :raises ValueError: if ``data_url`` has no comma or no ``data:`` marker.
    """
    # Only the first comma separates metadata from payload; a
    # percent-encoded payload may itself contain literal commas.
    metadata, data = data_url.split(',', 1)
    _, metadata = metadata.split('data:', 1)
    parts = metadata.split(';')
    if parts[-1] == 'base64':
        data = b64decode(data)
    else:
        data = unquote(data)

    for part in parts:
        # unquote() already yields text on Python 3, so the charset is only
        # applied while the payload is still a byte string.
        if part.startswith("charset=") and isinstance(data, bytes):
            data = data.decode(part[8:])
    return data
项目:REMAP    作者:REMAPApp    | 项目源码 | 文件源码
def _header_to_id(self, header):
    """Convert a Content-ID header value to an id.

    Presumes the Content-ID header conforms to the format that
    _id_to_header() returns: a value wrapped in '<' and '>' whose last
    '+'-separated field is the percent-encoded id.

    Args:
      header: string, Content-ID header value.

    Returns:
      The extracted id value, with percent-escapes decoded.

    Raises:
      BatchError if the header is not in the expected format.
    """
    # startswith/endswith instead of header[0]/header[-1]: an empty header
    # must be reported as a BatchError, not escape as an IndexError.
    if not (header.startswith('<') and header.endswith('>')):
        raise BatchError("Invalid value for Content-ID: %s" % header)
    if '+' not in header:
        raise BatchError("Invalid value for Content-ID: %s" % header)
    # The id is whatever follows the last '+' inside the angle brackets.
    _, id_ = header[1:-1].rsplit('+', 1)

    return unquote(id_)
项目:OneClickDTU    作者:satwikkansal    | 项目源码 | 文件源码
def _header_to_id(self, header):
    """Convert a Content-ID header value to an id.

    Presumes the Content-ID header conforms to the format that
    _id_to_header() returns: a value wrapped in '<' and '>' whose last
    '+'-separated field is the percent-encoded id.

    Args:
      header: string, Content-ID header value.

    Returns:
      The extracted id value, with percent-escapes decoded.

    Raises:
      BatchError if the header is not in the expected format.
    """
    # startswith/endswith instead of header[0]/header[-1]: an empty header
    # must be reported as a BatchError, not escape as an IndexError.
    if not (header.startswith('<') and header.endswith('>')):
        raise BatchError("Invalid value for Content-ID: %s" % header)
    if '+' not in header:
        raise BatchError("Invalid value for Content-ID: %s" % header)
    # The id is whatever follows the last '+' inside the angle brackets.
    _, id_ = header[1:-1].rsplit('+', 1)

    return unquote(id_)
项目:github2gitlab    作者:chauffer    | 项目源码 | 文件源码
def verify_merge_update(self, updates, result):
    """Check that every requested update is reflected in ``result``.

    ``private_token`` entries are ignored. A ``state_event`` entry is
    compared against ``result['state']`` after translation through
    ``self.STATE_EVENT2MERGE_STATE``. A missing or falsy value in ``result``
    is compared as the empty string.

    :param updates: mapping of field name to the value that was sent.
    :param result: mapping describing the merge request after the update;
        ``result['iid']`` is read to build the error message.
    :raises ValueError: on the first field whose value differs.
    """
    gitlab = self.gitlab
    for (field, expected) in six.iteritems(updates):
        if field == 'private_token':
            continue
        if field == 'state_event':
            field = 'state'
            expected = self.STATE_EVENT2MERGE_STATE[updates['state_event']]
        actual = result.get(field) or ''
        if expected == actual:
            continue
        repo = parse.unquote(gitlab['repo'])
        url = (gitlab['host'] + "/" + repo + "/" +
               "merge_requests/" + str(result['iid']))
        message = "{url}: {key} value expected to be {value} but is {result}"
        raise ValueError(message.format(url=url,
                                        key=field,
                                        value=expected,
                                        result=actual))

# Local Variables:
# compile-command: "cd .. ; virtualenv/bin/tox -e flake8"
# End:
项目:gnocchi    作者:gnocchixyz    | 项目源码 | 文件源码
def post_account(self, headers, query_string=None, data=None,
                 response_dict=None):
    """Handle an account-level POST against the ``self.kvs`` store.

    With ``query_string == 'bulk-delete'`` every line of ``data`` is
    UTF-8 decoded, percent-decoded and split as ``/<container>/<object>``;
    the object is removed from ``self.kvs`` and a JSON summary of deleted
    and not-found counts is returned. Any other query string changes
    nothing.

    :param headers: not used by this method.
    :param query_string: only ``'bulk-delete'`` is acted upon.
    :param data: bytes with one path per line (bulk delete only).
    :param response_dict: optional dict; its ``'status'`` key is set to 200
        for a bulk delete and to 204 otherwise.
    :returns: ``({}, body)`` where ``body`` is the UTF-8 encoded JSON
        summary for a bulk delete and ``None`` otherwise.
    """
    if query_string != 'bulk-delete':
        if response_dict is not None:
            response_dict['status'] = 204
        return {}, None

    summary = {'Response Status': '200 OK',
               'Response Body': '',
               'Number Deleted': 0,
               'Number Not Found': 0}
    if response_dict is not None:
        response_dict['status'] = 200
    for raw_path in (data.splitlines() if data else ()):
        try:
            decoded = unquote(raw_path.decode('utf8'))
            __, container, obj = decoded.split('/', 2)
            del self.kvs[container][obj]
        except KeyError:
            # Unknown container or object.
            summary['Number Not Found'] += 1
        else:
            summary['Number Deleted'] += 1
    return {}, json.dumps(summary).encode('utf-8')
项目:alfredToday    作者:jeeftor    | 项目源码 | 文件源码
def _header_to_id(self, header):
    """Convert a Content-ID header value to an id.

    Presumes the Content-ID header conforms to the format that
    _id_to_header() returns: a value wrapped in '<' and '>' whose last
    '+'-separated field is the percent-encoded id.

    Args:
      header: string, Content-ID header value.

    Returns:
      The extracted id value, with percent-escapes decoded.

    Raises:
      BatchError if the header is not in the expected format.
    """
    # startswith/endswith instead of header[0]/header[-1]: an empty header
    # must be reported as a BatchError, not escape as an IndexError.
    if not (header.startswith('<') and header.endswith('>')):
        raise BatchError("Invalid value for Content-ID: %s" % header)
    if '+' not in header:
        raise BatchError("Invalid value for Content-ID: %s" % header)
    # The id is whatever follows the last '+' inside the angle brackets.
    _, id_ = header[1:-1].rsplit('+', 1)

    return unquote(id_)
项目:Webradio_v2    作者:Acer54    | 项目源码 | 文件源码
def _header_to_id(self, header):
    """Convert a Content-ID header value to an id.

    Presumes the Content-ID header conforms to the format that
    _id_to_header() returns: a value wrapped in '<' and '>' whose last
    '+'-separated field is the percent-encoded id.

    Args:
      header: string, Content-ID header value.

    Returns:
      The extracted id value, with percent-escapes decoded.

    Raises:
      BatchError if the header is not in the expected format.
    """
    # startswith/endswith instead of header[0]/header[-1]: an empty header
    # must be reported as a BatchError, not escape as an IndexError.
    if not (header.startswith('<') and header.endswith('>')):
        raise BatchError("Invalid value for Content-ID: %s" % header)
    if '+' not in header:
        raise BatchError("Invalid value for Content-ID: %s" % header)
    # The id is whatever follows the last '+' inside the angle brackets.
    _, id_ = header[1:-1].rsplit('+', 1)

    return unquote(id_)
项目:GAMADV-X    作者:taers232c    | 项目源码 | 文件源码
def _header_to_id(self, header):
    """Convert a Content-ID header value to an id.

    Presumes the Content-ID header conforms to the format that
    _id_to_header() returns: a value wrapped in '<' and '>' whose last
    '+'-separated field is the percent-encoded id.

    Args:
      header: string, Content-ID header value.

    Returns:
      The extracted id value, with percent-escapes decoded.

    Raises:
      BatchError if the header is not in the expected format.
    """
    # startswith/endswith instead of header[0]/header[-1]: an empty header
    # must be reported as a BatchError, not escape as an IndexError.
    if not (header.startswith('<') and header.endswith('>')):
        raise BatchError("Invalid value for Content-ID: %s" % header)
    if '+' not in header:
        raise BatchError("Invalid value for Content-ID: %s" % header)
    # The id is whatever follows the last '+' inside the angle brackets.
    _, id_ = header[1:-1].rsplit('+', 1)

    return unquote(id_)
项目:col-aws-clients    作者:collectrium    | 项目源码 | 文件源码
def reduce_url(url):
    """
    Get path to object on bucket from presigned url.

    ``url`` is returned unchanged when it is ``None``, when it consists of
    nothing but a path, or when it carries no scheme. Otherwise the
    percent-decoded path component of the URL is returned.

    :param url: presigned url
    :type url: str
    :return: path to object on bucket
    :rtype str
    """
    if url is None:
        return url
    parts = parse.urlparse(url)
    if parts.path == url or not parts.scheme:
        return url
    return parse.unquote(parts.path)
项目:solaris-ips    作者:oracle    | 项目源码 | 文件源码
def choose_name(fp, mfst):
    """Find the package name for this manifest.

    If the manifest defines 'pkg.fmri' (or, failing that, 'fmri'), that
    value is the name, and a PkgFmri object built from it is returned as
    well so that callers can track which packages are being processed; the
    PkgFmri is None when the value is not a legal FMRI. Otherwise the
    percent-decoded basename of the manifest path is used as the name.

    'fp' is the path to the file for the manifest.

    'mfst' is the Manifest object, or None.

    Returns a tuple of (name, PkgFmri-or-None)."""

    if mfst is not None:
        name = mfst.get("pkg.fmri", mfst.get("fmri", None))
        if name is not None:
            try:
                pfmri = fmri.PkgFmri(name)
            except fmri.IllegalFmri:
                pfmri = None
            return name, pfmri
    # No manifest, or no FMRI in it: fall back to the file name.
    return unquote(os.path.basename(fp)), None
项目:share-class    作者:junyiacademy    | 项目源码 | 文件源码
def _header_to_id(self, header):
    """Convert a Content-ID header value to an id.

    Presumes the Content-ID header conforms to the format that
    _id_to_header() returns: a value wrapped in '<' and '>' whose last
    '+'-separated field is the percent-encoded id.

    Args:
      header: string, Content-ID header value.

    Returns:
      The extracted id value, with percent-escapes decoded.

    Raises:
      BatchError if the header is not in the expected format.
    """
    # startswith/endswith instead of header[0]/header[-1]: an empty header
    # must be reported as a BatchError, not escape as an IndexError.
    if not (header.startswith('<') and header.endswith('>')):
        raise BatchError("Invalid value for Content-ID: %s" % header)
    if '+' not in header:
        raise BatchError("Invalid value for Content-ID: %s" % header)
    # The id is whatever follows the last '+' inside the angle brackets.
    _, id_ = header[1:-1].rsplit('+', 1)

    return unquote(id_)
项目:ironic-staging-drivers    作者:openstack    | 项目源码 | 文件源码
def _parse_root_device_hints(node):
    """Convert a node's 'root_device' property into a dict of plain hints.

    The property is parsed with irlib_utils.parse_root_device_hints().
    String values prefixed with '== ' are converted to int, values prefixed
    with 's== ' are percent-decoded, and non-string values are kept as is.
    Any other string value counts as an advanced hint, which is rejected.

    :param node: object whose ``properties`` mapping and ``uuid`` are read.
    :returns: dict of hints; empty when the node has no 'root_device'.
    :raises: InvalidParameterValue if the hints fail validation or contain
        advanced (operator based) hints.
    """
    raw_hints = node.properties.get('root_device')
    if not raw_hints:
        return {}
    try:
        parsed_hints = irlib_utils.parse_root_device_hints(raw_hints)
    except ValueError as e:
        raise exception.InvalidParameterValue(
            _('Failed to validate the root device hints for node %(node)s. '
              'Error: %(error)s') % {'node': node.uuid, 'error': e})
    simple_hints = {}
    unsupported = {}
    for hint, value in parsed_hints.items():
        if not isinstance(value, six.string_types):
            simple_hints[hint] = value
        elif value.startswith('== '):
            simple_hints[hint] = int(value[3:])
        elif value.startswith('s== '):
            simple_hints[hint] = urlparse.unquote(value[4:])
        else:
            unsupported[hint] = value
    if unsupported:
        raise exception.InvalidParameterValue(
            _('Ansible-deploy does not support advanced root device hints '
              'based on oslo.utils operators. '
              'Present advanced hints for node %(node)s are %(hints)s.') % {
                  'node': node.uuid, 'hints': unsupported})
    return simple_hints
项目:microcosm-flask    作者:globality-corp    | 项目源码 | 文件源码
def build_path(operation, ns):
    """
    Build a path URI for an operation.

    The path comes from ``ns.url_for``. If that raises ``BuildError``, the
    call is repeated with each suggested argument set to a ``{name}``
    placeholder, so the result is a URI template.

    """
    try:
        return ns.url_for(operation, _external=False)
    except BuildError as error:
        # we are missing some URI path parameters
        placeholders = {}
        for name in error.suggested.arguments:
            placeholders[name] = "{{{}}}".format(name)
        templated = ns.url_for(operation, _external=False, **placeholders)
        # flask will sometimes try to quote '{' and '}' characters
        return unquote(templated)
项目:zcp    作者:apolloliu    | 项目源码 | 文件源码
def unquote_keys(data):
    """Restores initial view of 'quoted' keys in dictionary data.

    Keys that start with ``%24`` are percent-decoded in place; nested
    dictionary values are processed recursively. Anything that is not a
    dict is returned untouched.

    :param data: is a dictionary
    :return: data with restored keys if they were 'quoted'.
    """
    if isinstance(data, dict):
        # Iterate over a snapshot: keys are renamed inside the loop, and
        # mutating a dict while iterating its live view is unsafe.
        for key, value in list(data.items()):
            if isinstance(value, dict):
                unquote_keys(value)
            if key.startswith('%24'):
                k = parse.unquote(key)
                data[k] = data.pop(key)
    return data
项目:webapp2    作者:GoogleCloudPlatform    | 项目源码 | 文件源码
def match(self, request):
    """Matches this route against the current request.

    The percent-decoded request path is tested against ``self.regex``.

    :returns:
        ``(route, groups, {})`` where ``groups`` are the regex groups of
        the match, or ``None`` if the path does not match.

    .. seealso:: :meth:`BaseRoute.match`.
    """
    found = self.regex.match(unquote(request.path))
    if not found:
        return None
    return self, found.groups(), {}
项目:webapp2    作者:GoogleCloudPlatform    | 项目源码 | 文件源码
def unquote(value, *args, **kwargs):
    """Percent-decodes a value and deserializes it from JSON.

    Decoding uses urllib.unquote or urllib.parse.unquote (PY3); the result
    is then handed to :func:`decode` together with any extra arguments.

    Parameters and return value are the same from :func:`decode`.
    """
    decoded_text = parse.unquote(value)
    return decode(decoded_text, *args, **kwargs)
项目:webapp2    作者:GoogleCloudPlatform    | 项目源码 | 文件源码
def match(self, request):
    """Match ``request`` against this route and then its children.

    Returns ``None`` when the percent-decoded request path does not match
    ``self.regex``; otherwise returns whatever ``_match_routes`` yields for
    ``self.get_match_children`` and the request.
    """
    decoded_path = parse.unquote(request.path)
    if self.regex.match(decoded_path):
        return _match_routes(self.get_match_children, request)
    return None
项目:Url    作者:beiruan    | 项目源码 | 文件源码
def reconnect(self):
    """Reconnect to rabbitmq server.

    Connection parameters come from ``self.amqp_url``: the port defaults
    to 5672, user and password default to 'guest', and the virtual host is
    the percent-decoded URL path (``'%2F'`` when the path is empty). A new
    channel is opened and the queue ``self.name`` is declared.
    """
    url = urlparse.urlparse(self.amqp_url)
    port = url.port or 5672
    host = "%s:%s" % (url.hostname, port)
    user = url.username or 'guest'
    secret = url.password or 'guest'
    vhost = unquote(url.path.lstrip('/') or '%2F')
    self.connection = amqp.Connection(host=host,
                                      userid=user,
                                      password=secret,
                                      virtual_host=vhost)
    self.channel = self.connection.channel()
    try:
        self.channel.queue_declare(self.name)
    except amqp.exceptions.PreconditionFailed:
        # The declare was refused by the broker; carry on regardless.
        pass
    #self.channel.queue_purge(self.name)
项目:reahl    作者:reahl    | 项目源码 | 文件源码
def parse_arguments_from_fields(self, for_fields, relative_path):
    """Turn the arguments embedded in ``relative_path`` into keyword args.

    The named groups matched by ``self.match(relative_path)`` are
    percent-decoded (a missing group counts as ''), their names are passed
    through ``self.convert_str_to_identifier``, and the result is fed as
    input to the field index built from ``for_fields``.

    Returns ``{}`` when ``for_fields`` is empty, otherwise
    ``fields.as_kwargs()``.
    """
    if not for_fields:
        return {}
    assert isinstance(relative_path, six.text_type) # Scaffolding for Py3 port
    matched_arguments = self.match(relative_path).match.groupdict()
    fields = self.get_temp_url_argument_field_index(for_fields)
    raw_input_values = {
        self.convert_str_to_identifier(name): urllib_parse.unquote(raw or '')
        for name, raw in matched_arguments.items()
    }
    fields.accept_input(raw_input_values)
    return fields.as_kwargs()
项目:reahl    作者:reahl    | 项目源码 | 文件源码
def get_session_key(cls):
    """Return the percent-decoded session cookie of the current request.

    The cookie name is read from ``config.web.session_key_name`` of the
    current ExecutionContext. Returns None when a KeyError is raised while
    looking the cookie up.
    """
    ctx = ExecutionContext.get_context()
    try:
        cookies = ctx.request.cookies
        encoded_key = cookies[ctx.config.web.session_key_name]
        return urllib_parse.unquote(encoded_key)
    except KeyError:
        return None
项目:aspen.py    作者:AspenWeb    | 项目源码 | 文件源码
def path_decode(bs):
    """Percent-decode ``bs`` and pass the result through ``_decode``.

    When ``PY2`` is true the input is ASCII-encoded before unquoting.
    """
    raw = bs.encode('ascii') if PY2 else bs
    return _decode(unquote(raw))
项目:greenswitch    作者:EvoluxBR    | 项目源码 | 文件源码
def parse_data(self, data):
    """Parse percent-encoded ``Key: value`` lines into ``self.headers``.

    ``data`` is percent-decoded as a whole and then split into lines. A
    line containing ``': '`` starts a new header. Any other line is a
    continuation and is appended, after a newline, to the value of the
    most recent header. Keys and values are stripped of surrounding
    whitespace.

    :param data: string holding the percent-encoded header block.
    """
    headers = {}
    data = unquote(data)
    data = data.strip().splitlines()
    last_key = None
    value = ''
    for line in data:
        if ': ' in line:
            key, value = line.split(': ', 1)
            last_key = key
        elif last_key is None:
            # Continuation line before any header was seen: there is
            # nothing to attach it to, so skip it rather than crash on
            # None.strip().
            continue
        else:
            key = last_key
            value += '\n' + line
        headers[key.strip()] = value.strip()
    self.headers = headers
项目:deb-python-pysaml2    作者:openstack    | 项目源码 | 文件源码
def decode(txt):
    """Turns a coded string by code() into a NameID class instance.

    ``txt`` is a comma-separated list of ``<index>=<quoted value>`` parts.
    For each part the index selects an attribute name from ``ATTR`` and
    the percent-decoded value is set on the new NameID. Parts without
    ``=`` and parts that cannot be applied are skipped.

    :param txt: The coded string
    :return: a NameID instance
    """
    _nid = NameID()
    for part in txt.split(","):
        if "=" not in part:
            continue
        # Split on the first "=" only, so a value that itself contains "="
        # cannot break the two-way unpacking.
        i, val = part.split("=", 1)
        try:
            setattr(_nid, ATTR[int(i)], unquote(val))
        except Exception:
            # Best effort: an unknown index or a value that cannot be set
            # is ignored. Unlike a bare except, this no longer swallows
            # KeyboardInterrupt or SystemExit.
            pass
    return _nid
项目:pysnow    作者:rbw0    | 项目源码 | 文件源码
def qs_as_dict(url):
    """Return the query string of ``url`` as a dict.

    Pairs are separated by ``&``. Each pair is split at its first ``=``:
    the key is kept as is and the value is percent-decoded. A key without
    ``=`` maps to the empty string, and empty pairs (including an empty
    query string) are ignored. A later duplicate key overwrites an earlier
    one.

    :param url: URL string.
    :return: dict mapping query keys to decoded values.
    """
    qs_str = urlparse(url).query
    result = {}
    for pair in qs_str.split("&"):
        if not pair:
            continue
        # partition() never fails and keeps any further '=' in the value.
        key, _, value = pair.partition("=")
        result[key] = unquote(value)
    return result
项目:airtable-python-wrapper    作者:gtalarico    | 项目源码 | 文件源码
def _process_response(self, response):
    """Return the JSON body of ``response``.

    ``response.raise_for_status()`` is called first, so any exception it
    raises propagates to the caller.
    """
    # A dedicated 422 "Unprocessable Entity" error that reported the
    # unquoted response URL was removed due to an IronPython bug:
    # https://github.com/IronLanguages/ironpython2/issues/242
    response.raise_for_status()
    payload = response.json()
    return payload
项目:jabbapylib3    作者:jabbalaci    | 项目源码 | 文件源码
def extract(test=False):
    """Fetch URL and work out the image address and a dated file name.

    The page text is searched for the ``g_img={url:`` marker. The value up
    to the next comma, stripped of quotes and backslashes, is joined with
    URL to form the image address. The file name is the last path segment
    of that address, percent-decoded.

    :param test: when false, the image URL and file name are printed.
    :return: tuple ``(img_url, save_name)`` where ``save_name`` is
        ``<date>-<file name>``.
    """
    page = requests.get(URL).text
    raw = page.split('g_img={url:')[1].split(',')[0]
    for junk in ("'", '"', "\\"):
        raw = raw.replace(junk, "")
    img_url = urljoin(URL, raw).replace(" ", "")
    last_segment = img_url.rsplit('/', 1)[-1]
    # Decoding may reveal further slashes; keep only the final component.
    fname = unquote(last_segment).rsplit('/', 1)[-1]
    if not test:
        print('# img_url:', img_url)
        print('# fname:', fname)
    today = get_date_from_year_to_day()
    save_name = '{date}-{fname}'.format(date=today, fname=fname)
    return (img_url, save_name)
项目:solaris-ips    作者:oracle    | 项目源码 | 文件源码
def close(self, add_to_catalog=True):
    """Closes an open transaction, returning the published FMRI for
    the corresponding package, and its current state in the catalog.

    'add_to_catalog' is passed through to accept_append() or
    accept_publish(), whichever applies to this transaction.

    Returns a tuple of (pkg_fmri, pkg_state) as produced by that call.
    """
    def split_trans_id(tid):
        # Raw string: "\d" inside a plain literal is an invalid escape
        # sequence.
        m = re.match(r"(\d+)_(.*)", tid)
        return m.group(1), unquote(m.group(2))

    # The parsed pieces are not used below (the accept_* call supplies the
    # FMRI); the call is kept because it fails early when the basename
    # does not have the "<digits>_<name>" form.
    split_trans_id(self.get_basename())

    if self.append_trans:
        pkg_fmri, pkg_state = self.accept_append(add_to_catalog)
    else:
        pkg_fmri, pkg_state = self.accept_publish(add_to_catalog)

    # Discard the in-flight transaction data.
    try:
        shutil.rmtree(self.dir)
    except EnvironmentError as e:
        # Ensure that the error goes to stderr, and then drive
        # on as the actual package was published.
        misc.emsg(e)

    return (pkg_fmri, pkg_state)
项目:solaris-ips    作者:oracle    | 项目源码 | 文件源码
def __fmri_from_path(pkgpath, ver):
    """Helper method that takes the full path to the package
    directory and the name of the manifest file, and returns an FMRI
    constructed from the information in those components.

    The version comes from the percent-decoded 'ver'; the package name
    comes from the percent-decoded basename of 'pkgpath'."""

    version = pkg.version.Version(unquote(ver), None)
    pkg_dir_name = os.path.basename(pkgpath)
    result = fmri.PkgFmri(unquote(pkg_dir_name))
    result.version = version
    return result
项目:solaris-ips    作者:oracle    | 项目源码 | 文件源码
def __parse_v_1(line, pub, v):
    """This function parses the string returned by a version 1
    search server and puts it into the expected format of
    (query_number, publisher, (version, return_type, (results)))
    If it receives a line it can't parse, it raises a
    ServerReturnError.

    The line must hold three whitespace-separated fields: the query
    number, the return type and the payload. The value returned is a
    tuple of (package FMRI, formatted result)."""

    fields = line.split(None, 2)
    if len(fields) != 3:
        raise apx.ServerReturnError(line)
    raw_query_num, raw_return_type, payload = fields
    try:
        return_type = int(raw_return_type)
        query_num = int(raw_query_num)
    except ValueError:
        raise apx.ServerReturnError(line)

    if return_type == Query.RETURN_ACTIONS:
        # Payload: FMRI, percent-encoded value, then the remainder.
        subfields = payload.split(None, 2)
        pfmri = fmri.PkgFmri(subfields[0])
        results = (pfmri, unquote(subfields[1]), subfields[2])
        return pfmri, (query_num, pub, (v, return_type, results))
    if return_type == Query.RETURN_PACKAGES:
        pfmri = fmri.PkgFmri(payload)
        return pfmri, (query_num, pub, (v, return_type, pfmri))
    raise apx.ServerReturnError(line)
项目:solaris-ips    作者:oracle    | 项目源码 | 文件源码
def read_dict_file(self):
    """Reads in a dictionary stored in with an entity
    and its number on each line.

    Each line holds a flagged token and an integer offset separated by a
    single space. The first character of the token is a flag: "1" means
    the rest of the token is percent-encoded, anything else means it is
    stored verbatim. The result replaces the contents of self._dict,
    after which IndexStoreBase.read_dict_file() is invoked.
    """
    self._dict.clear()
    for entry in self._file_handle:
        flagged_token, raw_offset = entry.split(" ")
        is_quoted = flagged_token[0] == "1"
        body = flagged_token[1:]
        name = unquote(body) if is_quoted else body
        self._dict[name] = int(raw_offset)
    IndexStoreBase.read_dict_file(self)
项目:solaris-ips    作者:oracle    | 项目源码 | 文件源码
def __init__(self, origin_url, create_repo=False, pkg_name=None,
    repo_props=EmptyDict, trans_id=None, xport=None, pub=None,
    progtrack=None):
    """Set up the transaction state for the repository at 'origin_url'.

    The URL is parsed with "http" as the default scheme. For a "file"
    scheme the path is percent-decoded, the instance is marked as local
    and create_file_repo() is called with 'repo_props' and 'create_repo'.
    For any other scheme, asking for 'create_repo' raises
    UnsupportedRepoTypeOperationError.

    'pkg_name', 'trans_id', 'xport', 'pub' and 'progtrack' are stored on
    the instance as given."""

    parsed = urlparse(origin_url, "http", allow_fragments=0)
    scheme, path = parsed[0], parsed[2]
    is_file_repo = scheme == "file"

    self.pkg_name = pkg_name
    self.trans_id = trans_id
    self.scheme = scheme
    self.path = unquote(path) if is_file_repo else path
    self.progtrack = progtrack
    self.transport = xport
    self.publisher = pub
    self.__local = False
    self.__uploaded = 0
    self.__uploads = {}
    self.__transactions = {}
    self._tmpdir = None
    self._append_mode = False
    self._upload_mode = None

    if is_file_repo:
        self.__local = True
        self.create_file_repo(repo_props=repo_props,
            create_repo=create_repo)
    elif create_repo:
        raise UnsupportedRepoTypeOperationError("create_repo",
            type=scheme)
项目:solaris-ips    作者:oracle    | 项目源码 | 文件源码
def _build_version(vers):
    """ Private method for building versions from a string.

    'vers' is percent-decoded before it is handed to
    pkg.version.Version. """

    decoded = unquote(vers)
    return pkg.version.Version(decoded, None)
项目:solaris-ips    作者:oracle    | 项目源码 | 文件源码
def count_manifest(mg, d):
    """Update the manifest counters for one log entry.

    'mg' is a mapping for the log entry; its "ip", "uri", "response" and
    "agent" items are read.

    'd' is an object whose date() is used for the per-day count.

    Per-date and per-IP counts are always updated. Per-package and
    per-package-version counts are updated only when the URI matches
    pkg_pat and the response is "200". The per-architecture count is
    updated only when the agent string matches pkg_agent_pat."""

    def bump(counter, key):
        # Increment counter[key], starting from 1 for a key not seen yet.
        try:
            counter[key] += 1
        except KeyError:
            counter[key] = 1

    bump(manifest_by_date, d.date().isoformat())
    bump(manifest_by_ip, mg["ip"])

    uri_match = pkg_pat.search(mg["uri"])
    if uri_match is not None and mg["response"] == "200":
        parts = uri_match.groupdict()
        bump(manifest_by_pkg, unquote(parts["stem"]))
        bump(manifest_by_ver_pkg,
            unquote(parts["stem"] + "@" + parts["version"]))

    agent_match = pkg_agent_pat.search(mg["agent"])
    if agent_match is None:
        return

    bump(manifest_by_arch, agent_match.groupdict()["arch"])
项目:pyfilesystem2    作者:PyFilesystem    | 项目源码 | 文件源码
def parse_fs_url(fs_url):
    """Parse a Filesystem URL and return a `ParseResult`.

    Credentials, when present, are split at the first ``:`` into username
    and password, both percent-decoded. Everything before the first ``?``
    of the resource part is percent-decoded into the resource; the rest is
    parsed as a query string, keeping the first value of every key.

    Arguments:
        fs_url (str): A filesystem URL.

    Returns:
        ~fs.opener.parse.ParseResult: a parse result instance.

    Raises:
        ~fs.errors.ParseError: if the FS URL is not valid.

    """
    match = _RE_FS_URL.match(fs_url)
    if match is None:
        raise ParseError('{!r} is not a fs2 url'.format(fs_url))

    fs_name, credentials, url1, url2, path = match.groups()
    if not credentials:
        username = password = None
        url = url2
    else:
        raw_user, _, raw_password = credentials.partition(':')
        username = unquote(raw_user)
        password = unquote(raw_password)
        url = url1

    resource_part, has_qs, query = url.partition('?')
    resource = unquote(resource_part)
    params = {}
    if has_qs:
        parsed_query = parse_qs(query, keep_blank_values=True)
        for key, values in parsed_query.items():
            params[key] = values[0]
    return ParseResult(fs_name, username, password, resource, params, path)
项目:dnsknife    作者:Gandi    | 项目源码 | 文件源码
def trusted_params(uri):
    """Validate the signature of a URI and return its trusted parameters.

    Walk through an URI, lookup the set of keys published (as ``_tpda``
    TXT records) for the origin third party provider domain, and validate
    the URI signature against the found keys. If valid, returns the
    parameters of the trustable URI - otherwise raise an exception.

    WARNING: the caller MUST use the returned value, as signature
    validation is only done on the part of the URI that comes *before*
    the ``&signature=`` parameter.

    :param uri: the full URI, ending in ``&signature=<value>``.
    :return: ``params(uri)`` for the URI with the signature removed.
    :raises exceptions.IncompleteURI: if the URI does not contain exactly
        one ``&signature=`` marker, or has no query string.
    :raises exceptions.Expired: if the ``expires`` parameter is in the
        past (compared against the current UTC time).
    :raises exceptions.NoTPDA: if no ``_tpda`` TXT record is found.
    :raises exceptions.NoSignatureMatch: if no key verifies the signature.
    """
    # Split off the signature= part; from here on ``uri`` is only the
    # signed portion. Anything but exactly two pieces fails the unpacking.
    try:
        uri, sig = uri.split('&signature=')
        sig = parse.unquote(sig)
    except ValueError:
        raise exceptions.IncompleteURI

    pr = parse.urlparse(uri)
    if not pr.query:
        raise exceptions.IncompleteURI

    # ``expires`` is a YYYYmmddHHMMSS timestamp.
    expires = _qsl_get_one(pr.query, 'expires')
    if (datetime.datetime.utcnow() >
        datetime.datetime.strptime(expires, '%Y%m%d%H%M%S')):
        raise exceptions.Expired

    source = _qsl_get_one(pr.query, 'source')

    # Look up the provider's keys via DNS with dnssec enabled.
    txtl = Checker(source, dnssec=True).txt('_tpda')
    if not txtl:
        raise exceptions.NoTPDA

    # One base64-encoded RSA key per line of the TXT data.
    keys = [RSA.importKey(base64.b64decode(txt.encode('ascii')))
            for txt in txtl.split('\n')]

    # The digest covers only the part of the URI before the signature.
    digest = SHA256.new()
    digest.update(uri.encode('ascii'))

    # Accept as soon as any one of the published keys verifies.
    for key in keys:
        signer = PKCS1_v1_5.new(key)
        if signer.verify(digest, base64.b64decode(sig)):
            return params(uri)

    raise exceptions.NoSignatureMatch
项目:github2gitlab    作者:chauffer    | 项目源码 | 文件源码
def sync(self):
        """Bring merge requests in line with the known pull requests.

        For every pull request number in ``self.pull_requests`` (in sorted
        order) the matching merge request is taken from
        ``self.pull2merge``. If there is none, one is created, provided
        that both the ``pull/<number>/head`` source branch and the target
        branch resolve via ``self.rev_parse``. The state, body and title
        of the pull request are then compared with the state, description
        and title of the merge request; differing fields are collected and
        sent with ``self.update_merge_request``. When nothing differs, a
        debug message naming both URLs is logged.
        """
        # Pull request field -> corresponding merge request field.
        pull_f2merge_f = {
            'state': 'state',
            'body': 'description',
            'title': 'title',
        }
        for number in sorted(self.pull_requests.keys()):
            pull = self.pull_requests[number]
            merge = None
            if number in self.pull2merge:
                merge = self.pull2merge[number]
            else:
                source_branch = 'pull/' + number + '/head'
                target_branch = pull['base']['ref']
                # Only create the merge request when both branches resolve.
                if (self.rev_parse(pull, source_branch) and
                        self.rev_parse(pull, target_branch)):
                    data = {'title': pull['title'],
                            'source_branch': source_branch,
                            'target_branch': target_branch}
                    if pull['body']:
                        # The description is cut to DESCRIPTION_MAX chars.
                        data['description'] = pull['body'][:DESCRIPTION_MAX]
                    merge = self.create_merge_request(data)

            if merge:
                # Collect every field whose value differs between the two.
                updates = {}
                for (pull_field, merge_field) in six.iteritems(pull_f2merge_f):
                    if not self.field_equal(pull,
                                            pull_field,
                                            pull[pull_field],
                                            merge,
                                            merge_field,
                                            merge[merge_field]):
                        (key, value) = self.field_update(pull,
                                                         pull_field,
                                                         pull[pull_field],
                                                         merge,
                                                         merge_field,
                                                         merge[merge_field])
                        updates[key] = value
                if updates:
                    self.update_merge_request(merge, updates)
                else:
                    # Nothing to change: record that both sides agree.
                    log.debug("https://github.com/" +
                              self.github['repo'] + "/" +
                              "pull/" + number + " == " +
                              self.gitlab['host'] + "/" +
                              parse.unquote(self.gitlab['repo']) + "/" +
                              "merge_requests/" + str(merge['iid']))
项目:solaris-ips    作者:oracle    | 项目源码 | 文件源码
def reopen(self, rstore, trans_dir):
    """The reopen() method is invoked by the repository as needed to
    load Transaction data.

    'rstore' is the repository store; its trans_root and read_only
    attributes are read.

    'trans_dir' is the transaction directory. Its basename must have the
    form "<open time>_<percent-encoded package name>".

    Raises TransactionUnknownIDError if the basename does not have that
    form or the transaction directory does not exist. If the
    transaction has no manifest file yet, the method returns after
    setting only the identity attributes."""

    self.rstore = rstore
    try:
        open_time_str, self.esc_pkg_name = \
            os.path.basename(trans_dir).split("_", 1)
    except ValueError:
        raise TransactionUnknownIDError(os.path.basename(
            trans_dir))

    self.open_time = \
        datetime.datetime.utcfromtimestamp(int(open_time_str))
    self.pkg_name = unquote(self.esc_pkg_name)

    # This conversion should always work, because we encoded the
    # client release on the initial open of the transaction.
    self.fmri = fmri.PkgFmri(self.pkg_name, None)

    self.dir = os.path.join(rstore.trans_root, self.get_basename())

    if not os.path.exists(self.dir):
        raise TransactionUnknownIDError(self.get_basename())

    tmode = "rb"
    if not rstore.read_only:
        # The mode is important especially when dealing with
        # NFS because of problems with opening a file as
        # read/write or readonly multiple times.
        tmode += "+"

    # Find out if the package is renamed or obsolete.
    try:
        tfpath = os.path.join(self.dir, "manifest")
        tfile = open(tfpath, tmode)
    except IOError as e:
        if e.errno == errno.ENOENT:
            return
        raise
    # The 'with' block closes the file even if reading or parsing
    # raises; previously the handle leaked in that case.
    with tfile:
        m = pkg.manifest.Manifest()
        # If tfile is a StreamingFileObj obj, its read()
        # methods will return bytes. We need str for
        # manifest and here's the earliest point that
        # we can convert it to str.
        m.set_content(content=misc.force_str(tfile.read()))
    if os.path.exists(os.path.join(self.dir, "append")):
        self.append_trans = True
    self.obsolete = m.getbool("pkg.obsolete", "false")
    self.renamed = m.getbool("pkg.renamed", "false")
    self.types_found = set((
        action.name for action in m.gen_actions()
    ))
    self.has_reqdeps = any(
        a.attrs["type"] == "require"
        for a in m.gen_actions_by_type("depend")
    )
项目:solaris-ips    作者:oracle    | 项目源码 | 文件源码
def parse_main_dict_line(line):
    """Parses one line of a main dictionary file.
    Changes to this function must be paired with changes to
    write_main_dict_line below.

    This should produce the same data structure that
    _write_main_dict_line in indexer.py creates to write out each
    line.

    The line is a nested record. Each nesting level is split with the
    next separator from IndexStoreMainDict.sep_chars:
    token, action types, subtypes, full values, and finally package
    FMRI indexes with their offsets. The token and the full values are
    percent-decoded; indexes and offsets are converted to int.

    Returns a tuple of (token, list of (action_type, list of (subtype,
    list of (full_value, list of (pfmri_index, offsets))))).
    """

    seps = IndexStoreMainDict.sep_chars

    def _parse_pfmri(chunk):
        # "<pfmri index><sep><offset><sep><offset>..."
        numbers = chunk.split(seps[4])
        return (int(numbers[0]), [int(n) for n in numbers[1:]])

    def _parse_full_value(chunk):
        # Percent-encoded value followed by its package entries.
        parts = chunk.split(seps[3])
        return (unquote(parts[0]), [_parse_pfmri(p) for p in parts[1:]])

    def _parse_subtype(chunk):
        # Subtype name followed by its full values.
        parts = chunk.split(seps[2])
        return (parts[0], [_parse_full_value(p) for p in parts[1:]])

    def _parse_action_type(chunk):
        # Action type name followed by its subtypes.
        parts = chunk.split(seps[1])
        return (parts[0], [_parse_subtype(p) for p in parts[1:]])

    fields = line.rstrip('\n').split(seps[0])
    tok = unquote(fields[0])
    res = [_parse_action_type(chunk) for chunk in fields[1:]]
    return tok, res