Python urlparse 模块,parse_qs() 实例源码

我们从Python开源项目中,提取了以下48个代码示例,用于说明如何使用urlparse.parse_qs()

项目:nojs    作者:chrisdickinson    | 项目源码 | 文件源码
def do_GET(self):
    """Dispatch a GET request according to the path part of ``self.path``.

    ``/kill`` passes the parsed query parameters to ``_KillTestServer``,
    ``/ping`` is answered with 200 and the body ``'ready'``, and any other
    path is answered with 400.  The action and the first value of every
    query parameter are logged before dispatching.
    """
    request = urlparse.urlparse(self.path)
    action = request.path
    # keep_blank_values=1 keeps parameters that were sent without a value.
    params = urlparse.parse_qs(request.query, keep_blank_values=1)
    logging.info('Action for GET method is: %s.', action)
    for name, values in params.items():
      logging.info('%s=%s', name, values[0])

    if action == '/kill':
      self._KillTestServer(params)
      return

    if action == '/ping':
      # Ping only reports that the spawner server is ready to serve
      # requests; the status of the test server is not checked here.
      self._SendResponse(200, 'OK', {}, 'ready')
      logging.info('Handled ping request and sent response.')
      return

    self._SendResponse(400, 'Unknown request', {}, '')
    logging.info('Encounter unknown request: %s.', action)
项目:scibot    作者:SciCrunch    | 项目源码 | 文件源码
def process_POST_request(request):
    """Decode a form-encoded POST body and derive the target URI and DOI.

    ``request.text`` is parsed as a query string.  ``uri`` is required (a
    missing key raises ``KeyError``); ``head``, ``body`` and ``data`` are
    optional and default to the empty string.

    Returns ``(target_uri, doi, head, body, text)`` where ``head`` and
    ``body`` are the submitted fragments wrapped in ``<html>`` tags.
    """
    fields = urlparse.parse_qs(request.text)

    def first_value(name):
        # parse_qs never stores an empty list, so a default list is enough.
        return fields.get(name, [''])[0]

    def htmlify(thing):
        return '<html>' + first_value(thing) + '</html>'

    uri = fields['uri'][0]
    head = htmlify('head')
    body = htmlify('body')
    text = first_value('data')

    headsoup = BeautifulSoup(head, 'lxml')
    bodysoup = BeautifulSoup(body, 'lxml')

    target_uri = getUri(uri, headsoup, bodysoup)
    doi = getDoi(headsoup, bodysoup)
    return target_uri, doi, head, body, text
项目:plugin.video.exodus    作者:lastship    | 项目源码 | 文件源码
def episode(self, url, imdb, tvdb, title, premiered, season, episode):
        """Look up the page URL of one episode on the source site.

        ``url`` is a query string that must carry ``tvshowtitle``.  The site
        search is run for that title.  Results that lack a link, a title or
        a four-digit number are ignored; a result is accepted when the
        cleaned show title occurs in the cleaned result title and the result
        title contains ``'Season <season>'``.  ``?episode=<n>`` is appended
        to the first accepted link.

        Returns the resulting URL, or ``None`` on any failure (best effort).
        """
        try:
            data = urlparse.parse_qs(url)
            # Keep only the first value of every query parameter.
            data = {key: (values[0] if values else '') for key, values in data.items()}

            query = urlparse.urljoin(self.base_link, self.search_link)
            query = query % urllib.quote_plus(data['tvshowtitle'])

            wanted = cleantitle.get(data['tvshowtitle'])
            season_label = 'Season %s' % season

            page = client.request(query)

            candidates = []
            for thumb in client.parseDOM(page, 'div', attrs={'class': 'thumb'}):
                hrefs = client.parseDOM(thumb, 'a', ret='href')
                names = client.parseDOM(thumb, 'a', ret='title')
                years = re.findall(r'(\d{4})', thumb)
                if hrefs and names and years:
                    candidates.append((hrefs[0], names[0]))

            matches = [href for href, name in candidates
                       if wanted in cleantitle.get(name) and season_label in name]
            # IndexError when nothing matched is turned into None below.
            url = matches[0]
            url += '?episode=%01d' % int(episode)

            return url
        except Exception:
            # Best effort: any failure means "no source"; unlike a bare
            # except this lets KeyboardInterrupt/SystemExit propagate.
            return
项目:plugin.video.exodus    作者:lastship    | 项目源码 | 文件源码
def episode(self, url, imdb, tvdb, title, premiered, season, episode):
        """Build the episode URL from a cached season lookup.

        ``url`` is a query string that must carry ``tvshowtitle``.  The
        first four-digit number in ``premiered`` is used as the year, and
        ``'<tvshowtitle> <year>: Season <season>'`` is the key passed to
        ``cache.get`` together with ``self.pidtv_tvcache``.

        Returns the cached URL with ``?episode=<n>`` appended, encoded as
        UTF-8, or ``None`` on any failure (best effort).
        """
        try:
            data = urlparse.parse_qs(url)
            # Keep only the first value of every query parameter.
            data = {key: (values[0] if values else '') for key, values in data.items()}

            year = re.findall(r'(\d{4})', premiered)[0]
            season = '%01d' % int(season)
            episode = '%01d' % int(episode)
            tvshowtitle = '%s %s: Season %s' % (data['tvshowtitle'], year, season)

            url = cache.get(self.pidtv_tvcache, 120, tvshowtitle)
            if url is None:
                return None

            url += '?episode=%01d' % int(episode)
            return url.encode('utf-8')
        except Exception:
            # Best effort: any failure means "no source"; unlike a bare
            # except this lets KeyboardInterrupt/SystemExit propagate.
            return
项目:plugin.video.exodus    作者:lastship    | 项目源码 | 文件源码
def episode(self, url, imdb, tvdb, title, premiered, season, episode):
        """Resolve a show/season to a site id and return it as a query string.

        ``url`` is a query string carrying ``headers`` and ``aliases`` (both
        evaluated as Python expressions) plus ``tvshowtitle`` or ``title``.
        The site search page is scanned for ``alias=...`` links labelled
        ``'<name> - Season <n>'``; the first one whose name passes
        ``self.matchAlias`` and whose season number equals ``season`` wins.

        Returns an urlencoded dict with ``type``, ``id``, ``episode``,
        ``season`` and ``headers``, or ``None`` on any failure (best effort).
        """
        try:
            data = urlparse.parse_qs(url)
            # Keep only the first value of every query parameter.
            data = {key: (values[0] if values else '') for key, values in data.items()}
            # SECURITY: eval() executes arbitrary code.  'headers' and
            # 'aliases' come straight from the url string, so this is only
            # safe while that string is produced by trusted code.
            headers = eval(data['headers'])
            aliases = eval(data['aliases'])
            title = data['tvshowtitle'] if 'tvshowtitle' in data else data['title']
            title = cleantitle.getsearch(title)
            query = self.search_link % (urllib.quote_plus(title))
            query = urlparse.urljoin(self.base_link, query)
            page = client.request(query, headers=headers, timeout='30', mobile=True)

            links = re.compile('alias=(.+?)\'">(.+?)</a>').findall(page)
            seasons = [(alias, re.findall(r'(.+?)\s+-\s+Season\s+(\d+)', label))
                       for alias, label in links]
            seasons = [(alias, found[0][0], found[0][1]) for alias, found in seasons if found]
            # IndexError when nothing matched is turned into None below.
            show_id = [alias for alias, name, number in seasons
                       if self.matchAlias(name, aliases) and int(season) == int(number)][0]

            url = {'type': 'tvshow', 'id': show_id, 'episode': episode,
                   'season': season, 'headers': headers}
            return urllib.urlencode(url)
        except Exception:
            # Best effort: any failure means "no source"; unlike a bare
            # except this lets KeyboardInterrupt/SystemExit propagate.
            return
项目:plugin.video.exodus    作者:lastship    | 项目源码 | 文件源码
def episode(self, url, imdb, tvdb, title, premiered, season, episode):
        """Build and verify the episode link for shows premiered before 2016.

        ``url`` is a query string that must carry ``tvshowtitle`` and
        ``year``.  Episodes whose ``premiered`` year is 2016 or later are
        rejected.  The title is slugified (non-alphanumerics become ``-``,
        lower-cased) and inserted into ``self.tvsearch_link``; the link is
        accepted only if the final URL reported by ``client.request``
        contains the show year.

        Returns the relative link, or ``None`` on any failure (best effort).
        """
        try:
            data = urlparse.parse_qs(url)
            # Keep only the first value of every query parameter.
            data = {key: (values[0] if values else '') for key, values in data.items()}

            year = re.findall(r'(\d{4})', premiered)[0]
            if int(year) >= 2016:
                return None

            slug = re.sub('[^A-Za-z0-9]', '-', data['tvshowtitle']).lower()
            url = self.tvsearch_link % (slug, data['year'], '%01d' % int(season), '%01d' % int(episode))

            final_url = client.request(urlparse.urljoin(self.base_link, url), output='geturl')
            if data['year'] not in final_url:
                return None

            return url
        except Exception:
            # Best effort: any failure means "no source"; unlike a bare
            # except this lets KeyboardInterrupt/SystemExit propagate.
            return
项目:plugin.video.exodus    作者:lastship    | 项目源码 | 文件源码
def resolve(self, url):
        """Resolve a ``'<url>|<post>|<headers>'`` string to a playable URL.

        The last two dot-separated labels of the URL's host are looked up
        in the base64-decoded ``self.b_link``; when they are not found the
        input is returned unchanged.  Otherwise the string is split on
        ``|``, the ``Referer`` header is fetched once to obtain a cookie,
        the post data is sent to the URL, and the last ``url=`` segment of
        the response that starts with ``http`` is returned.

        Returns ``None`` on any failure (best effort).
        """
        try:
            host = urlparse.urlparse(url).netloc
            host = re.compile(r'([\w]+[.][\w]+)$').findall(host)[0]

            if host not in base64.b64decode(self.b_link):
                return url

            target, post, header_part = url.split('|')
            referer = urlparse.parse_qs(header_part)['Referer'][0]

            cookie = self.request(referer, output='cookie', close=False)
            result = self.request(target, post=post, referer=referer, cookie=cookie)

            candidates = [urllib.unquote_plus(part.strip()) for part in result.split('url=')]
            candidates = [part for part in candidates if part.startswith('http')]
            # IndexError when no candidate is left is turned into None below.
            return candidates[-1]
        except Exception:
            # Best effort: any failure means "not resolvable"; unlike a bare
            # except this lets KeyboardInterrupt/SystemExit propagate.
            return
项目:plugin.video.exodus    作者:lastship    | 项目源码 | 文件源码
def episode(self, url, imdb, tvdb, title, premiered, season, episode):
        """Search the source for an episode using its absolute number.

        ``url`` is a query string carrying ``tvshowtitle``,
        ``localtvshowtitle``, ``aliases`` (evaluated as a Python expression)
        and ``year``.  The season/episode pair is converted with
        ``tvmaze.tvMaze().episodeAbsoluteNumber``.  The localized title is
        searched first; the original title is tried only when that yields
        nothing and the two titles differ.

        Returns whatever ``self.__search`` returns, or ``None`` when ``url``
        is empty or anything fails (best effort).
        """
        if not url:
            return
        try:
            data = urlparse.parse_qs(url)
            # Keep only the first value of every query parameter.
            data = {key: (values[0] if values else '') for key, values in data.items()}
            tvshowtitle = data['tvshowtitle']
            localtvshowtitle = data['localtvshowtitle']
            # SECURITY: eval() executes arbitrary code; only safe while the
            # url string is produced by trusted code.
            aliases = source_utils.aliases_to_array(eval(data['aliases']))
            year = data['year']

            episode = tvmaze.tvMaze().episodeAbsoluteNumber(tvdb, int(season), int(episode))

            url = self.__search([localtvshowtitle] + aliases, year, episode)
            if not url and tvshowtitle != localtvshowtitle:
                url = self.__search([tvshowtitle] + aliases, year, episode)
            return url
        except Exception:
            # Best effort: any failure means "no source"; unlike a bare
            # except this lets KeyboardInterrupt/SystemExit propagate.
            return
项目:plugin.video.exodus    作者:lastship    | 项目源码 | 文件源码
def episode(self, url, imdb, tvdb, title, premiered, season, episode):
        """Search the source for a TV episode by title, year, season and episode.

        ``url`` is a query string carrying ``tvshowtitle``,
        ``localtvshowtitle`` and ``aliases`` (evaluated as a Python
        expression).  The year is the first four-digit number found in
        ``premiered``.  The localized title is searched first; the original
        title is tried only when that yields nothing and the titles differ.

        Returns whatever ``self.__search`` returns, or ``None`` when ``url``
        is empty or anything fails (best effort).
        """
        if not url:
            return
        try:
            data = urlparse.parse_qs(url)
            # Keep only the first value of every query parameter.
            data = {key: (values[0] if values else '') for key, values in data.items()}
            data['year'] = re.findall(r'(\d{4})', premiered)[0]
            tvshowtitle = data['tvshowtitle']
            localtvshowtitle = data['localtvshowtitle']
            # SECURITY: eval() executes arbitrary code; only safe while the
            # url string is produced by trusted code.
            aliases = source_utils.aliases_to_array(eval(data['aliases']))

            url = self.__search([localtvshowtitle] + aliases, 'tv', data['year'], season, episode)
            if not url and tvshowtitle != localtvshowtitle:
                url = self.__search([tvshowtitle] + aliases, 'tv', data['year'], season, episode)
            return url
        except Exception:
            # Best effort: any failure means "no source"; unlike a bare
            # except this lets KeyboardInterrupt/SystemExit propagate.
            return
项目:plugin.video.exodus    作者:lastship    | 项目源码 | 文件源码
def resolve(self, url):
        """Resolve a site-relative link to the URL of the embedded stream.

        The page at ``url`` (joined onto ``self.base_link``) is expected to
        be JSON with a ``Stream`` entry holding HTML.  The ``href`` of its
        first ``<a>``, or failing that the ``src`` of its first ``<iframe>``,
        is taken.  When that value does not start with ``http`` it is parsed
        as a query string and the first parameter value starting with
        ``http`` is used instead.

        Returns the resolved URL, or ``None`` on any failure (best effort).
        """
        try:
            url = urlparse.urljoin(self.base_link, url)

            stream = json.loads(client.request(url, referer=self.base_link))['Stream']
            anchors = dom_parser.parse_dom(stream, 'a', req='href')
            frames = dom_parser.parse_dom(stream, 'iframe', req='src')
            if anchors:
                link = anchors[0].attrs['href']
            else:
                # IndexError when there is no iframe either -> None below.
                link = frames[0].attrs['src']

            if not link.startswith('http'):
                params = urlparse.parse_qs(link)
                # Only real URLs qualify.  The previous code put a
                # (key, '') tuple into the list for every non-matching
                # parameter and could return that tuple to the caller.
                link = [values[0] for values in params.values()
                        if values and values[0].startswith('http')][0]

            return link
        except Exception:
            # Best effort: any failure means "not resolvable"; unlike a bare
            # except this lets KeyboardInterrupt/SystemExit propagate.
            return
项目:plugin.video.exodus    作者:lastship    | 项目源码 | 文件源码
def episode(self, url, imdb, tvdb, title, premiered, season, episode):
        """Search the source for ``'<title> SxxEyy'``.

        ``url`` is a query string carrying ``tvshowtitle``,
        ``localtvshowtitle`` and ``aliases`` (evaluated as a Python
        expression).  The ``SxxEyy`` tag is appended to the localized title
        and to every alias.  The original title is tried only when the
        first search yields nothing and the two titles differ.

        Returns whatever ``self.__search`` returns, or ``None`` when ``url``
        is empty or anything fails (best effort).
        """
        if not url:
            return
        try:
            data = urlparse.parse_qs(url)
            # Keep only the first value of every query parameter.
            data = {key: (values[0] if values else '') for key, values in data.items()}
            tag = ' S%02dE%02d' % (int(season), int(episode))
            title = data['localtvshowtitle'] + tag
            # SECURITY: eval() executes arbitrary code; only safe while the
            # url string is produced by trusted code.
            aliases = source_utils.aliases_to_array(eval(data['aliases']))
            aliases = [alias + tag for alias in aliases]

            url = self.__search([title] + aliases)
            if not url and data['tvshowtitle'] != data['localtvshowtitle']:
                title = data['tvshowtitle'] + tag
                url = self.__search([title] + aliases)
            return url
        except Exception:
            # Best effort: any failure means "no source"; unlike a bare
            # except this lets KeyboardInterrupt/SystemExit propagate.
            return
项目:plugin.video.exodus    作者:lastship    | 项目源码 | 文件源码
def episode(self, url, imdb, tvdb, title, premiered, season, episode):
        """Search the source for an episode by title, year, season and episode.

        ``url`` is a query string carrying ``tvshowtitle``,
        ``localtvshowtitle`` and ``aliases`` (evaluated as a Python
        expression).  The year is the first four-digit number found in
        ``premiered``, falling back to the ``year`` query parameter.  The
        localized title is searched first; the original title is tried only
        when that yields nothing and the titles differ.

        Returns whatever ``self.__search`` returns, or ``None`` when ``url``
        is empty or anything fails (best effort).
        """
        if not url:
            return
        try:
            data = urlparse.parse_qs(url)
            # Keep only the first value of every query parameter.
            data = {key: (values[0] if values else '') for key, values in data.items()}
            tvshowtitle = data['tvshowtitle']
            localtvshowtitle = data['localtvshowtitle']
            # SECURITY: eval() executes arbitrary code; only safe while the
            # url string is produced by trusted code.
            aliases = source_utils.aliases_to_array(eval(data['aliases']))

            years = re.findall(r'(\d{4})', premiered)
            year = years[0] if years else data['year']

            url = self.__search([localtvshowtitle] + aliases, year, season, episode)
            if not url and tvshowtitle != localtvshowtitle:
                url = self.__search([tvshowtitle] + aliases, year, season, episode)
            return url
        except Exception:
            # Best effort: any failure means "no source"; unlike a bare
            # except this lets KeyboardInterrupt/SystemExit propagate.
            return
项目:quartz-browser    作者:ksharindam    | 项目源码 | 文件源码
def downloadVideo(self):
        """Start a download for the video shown in the current tab.

        For a URL accepted by ``validYoutubeUrl`` the ``v`` parameter is
        extracted, the available streams are listed through ``YouTube`` and
        shown in a ``YoutubeDialog``; the chosen stream is fetched and
        handed to ``handleUnsupportedContent`` with a generated file name.
        For any other page, ``self.video_URL`` is requested with
        ``self.video_page_url`` as ``Referer``.
        """
        url = unicode(self.tabWidget.currentWidget().url().toString())

        if not validYoutubeUrl(url):
            # Embedded HTML5 video
            request = QNetworkRequest(self.video_URL)
            request.setRawHeader('Referer', self.video_page_url)
            self.handleUnsupportedContent(networkmanager.get(request))
            return

        # YouTube video: always go through the mobile watch URL.
        video_id = parse_qs(urlparse(url).query)['v'][0]
        watch_url = 'https://m.youtube.com/watch?v=' + video_id
        yt = YouTube(watch_url)    # Use PyTube module for restricted videos
        videos = yt.get_videos()
        dialog = youtube_dialog.YoutubeDialog(videos, self)
        if dialog.exec_() != 1:
            return

        # NOTE(review): presumably maps Qt's auto-assigned (negative,
        # starting at -2) button ids back to a list index - confirm.
        index = abs(dialog.buttonGroup.checkedId()) - 2
        chosen = videos[index]
        reply = networkmanager.get(QNetworkRequest(QUrl.fromUserInput(chosen.url)))
        self.handleUnsupportedContent(reply, chosen.filename + '.' + chosen.extension)
项目:android3dblendermouse    作者:sketchpunk    | 项目源码 | 文件源码
def serial_class_for_url(url):
    """Split an ``alt://`` URL into the port name and the class to use.

    The only recognised option is ``class=<name>``, which names an
    attribute of the ``serial`` module and defaults to ``Serial``.

    Returns ``(netloc + path, class object)``.  Raises
    ``serial.SerialException`` for a wrong scheme or an unknown option.
    """
    parts = urlparse.urlsplit(url)
    if parts.scheme != 'alt':
        raise serial.SerialException('expected a string in the form "alt://port[?option[=value][&option[=value]]]": not starting with alt:// (%r)' % (parts.scheme,))

    class_name = 'Serial'
    try:
        # Second argument True keeps options given without a value.
        options = urlparse.parse_qs(parts.query, True)
        for option, values in options.items():
            if option != 'class':
                raise ValueError('unknown option: %r' % (option,))
            class_name = values[0]
    except ValueError as e:
        raise serial.SerialException('expected a string in the form "alt://port[?option[=value][&option[=value]]]": %s' % e)

    port_name = ''.join([parts.netloc, parts.path])
    return (port_name, getattr(serial, class_name))

# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
项目:android3dblendermouse    作者:sketchpunk    | 项目源码 | 文件源码
def from_url(self, url):
        """Extract host and port from a ``socket://`` URL string.

        The only recognised option is ``logging=<level>``, which configures
        ``self.logger`` with the level looked up in ``LOGGER_LEVELS``.

        Returns ``(host, port)``.  Raises ``SerialException`` for a wrong
        scheme, an unknown option, or a missing/out-of-range port.
        """
        parts = urlparse.urlsplit(url)
        if parts.scheme != "socket":
            raise SerialException('expected a string in the form "socket://<host>:<port>[?logging={debug|info|warning|error}]": not starting with socket:// (%r)' % (parts.scheme,))
        try:
            # process options now, directly altering self
            for option, values in urlparse.parse_qs(parts.query, True).items():
                if option == 'logging':
                    logging.basicConfig()   # XXX is that good to call it here?
                    self.logger = logging.getLogger('pySerial.socket')
                    self.logger.setLevel(LOGGER_LEVELS[values[0]])
                    self.logger.debug('enabled logging')
                else:
                    raise ValueError('unknown option: %r' % (option,))
            # get host and port
            host, port = parts.hostname, parts.port
            # A URL without an explicit port yields None.  Comparing None
            # with an int raises TypeError on Python 3, which would escape
            # the ValueError handler below, so reject it explicitly.
            if port is None or not 0 <= port < 65536:
                raise ValueError("port not in range 0...65535")
        except ValueError as e:
            raise SerialException('expected a string in the form "socket://<host>:<port>[?logging={debug|info|warning|error}]": %s' % e)
        return (host, port)

    #  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -
项目:android3dblendermouse    作者:sketchpunk    | 项目源码 | 文件源码
def from_url(self, url):
        """Apply the options of a ``loop://`` URL to this instance.

        The only recognised option is ``logging=<level>``, which configures
        ``self.logger`` with the level looked up in ``LOGGER_LEVELS``.
        Nothing is returned.  Raises ``SerialException`` for a wrong scheme
        or an unknown option.
        """
        parts = urlparse.urlsplit(url)
        if parts.scheme != "loop":
            raise SerialException('expected a string in the form "loop://[?logging={debug|info|warning|error}]": not starting with loop:// (%r)' % (parts.scheme,))
        try:
            # Options are applied straight onto self while parsing.
            options = urlparse.parse_qs(parts.query, True)
            for option, values in options.items():
                if option != 'logging':
                    raise ValueError('unknown option: %r' % (option,))
                logging.basicConfig()   # XXX is that good to call it here?
                self.logger = logging.getLogger('pySerial.loop')
                self.logger.setLevel(LOGGER_LEVELS[values[0]])
                self.logger.debug('enabled logging')
        except ValueError as e:
            raise SerialException('expected a string in the form "loop://[?logging={debug|info|warning|error}]": %s' % e)

    #  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -
项目:android3dblendermouse    作者:sketchpunk    | 项目源码 | 文件源码
def from_url(self, url):
        """Configure the spy from a ``spy://`` URL and return the wrapped port.

        Recognised options: ``file=<path>`` (write output to that file
        instead of stderr), ``color``, ``raw`` (use ``FormatRaw`` instead of
        ``FormatHexdump``) and ``all`` (sets ``self.show_all``).

        Sets ``self.formatter`` and returns ``netloc + path`` of the URL.
        Raises ``serial.SerialException`` for a wrong scheme or an unknown
        option.
        """
        parts = urlparse.urlsplit(url)
        if parts.scheme != 'spy':
            raise serial.SerialException('expected a string in the form "spy://port[?option[=value][&option[=value]]]": not starting with spy:// (%r)' % (parts.scheme,))

        # Defaults, overridden by the options below.
        formatter_class = FormatHexdump
        use_color = False
        sink = sys.stderr
        try:
            options = urlparse.parse_qs(parts.query, True)
            for option, values in options.items():
                if option == 'file':
                    sink = open(values[0], 'w')
                elif option == 'color':
                    use_color = True
                elif option == 'raw':
                    formatter_class = FormatRaw
                elif option == 'all':
                    self.show_all = True
                else:
                    raise ValueError('unknown option: %r' % (option,))
        except ValueError as e:
            raise serial.SerialException('expected a string in the form "spy://port[?option[=value][&option[=value]]]": %s' % e)

        self.formatter = formatter_class(sink, use_color)
        return ''.join([parts.netloc, parts.path])
项目:purelove    作者:hucmosin    | 项目源码 | 文件源码
def query_params(self, value=None):
        """
        Return or set a dictionary of query params

        With ``value`` given, a mutated URL carrying the urlencoded values
        is returned; otherwise the current query string is parsed (blank
        values are kept) and returned as a dict of lists.

        :param dict value: new dictionary of values
        """
        if value is not None:
            return URL._mutate(self, query=unicode_urlencode(value, doseq=True))

        raw_query = self._tuple.query
        if raw_query is None:
            raw_query = ''

        if six.PY3:
            return parse_qs(raw_query, True)

        # In Python 2.6, urlparse needs a bytestring so we encode and then
        # decode the result.
        return dict_to_unicode(parse_qs(to_utf8(raw_query), True))
项目:PythonStudyCode    作者:TongTongX    | 项目源码 | 文件源码
def test_auth_url(self):
        """``facebook.auth_url`` builds the expected OAuth dialog URL."""
        perms = ['email', 'birthday']
        redirect_url = 'https://localhost/facebook/callback/'

        expected_query_string = urlencode(
            dict(client_id=self.app_id,
                 redirect_uri=redirect_url,
                 scope=','.join(perms)))
        expected_url = 'https://www.facebook.com/dialog/oauth?' + expected_query_string
        actual_url = facebook.auth_url(self.app_id, redirect_url, perms=perms)

        # Query parameters may come out in a different order in each URL,
        # so compare the URLs piece by piece and the queries as dicts.
        expected = urlparse(expected_url)
        actual = urlparse(actual_url)
        expected_query = parse_qs(expected.query)
        actual_query = parse_qs(actual.query)

        for part in ('scheme', 'netloc', 'path', 'params'):
            self.assertEqual(getattr(actual, part), getattr(expected, part))
        self.assertEqual(actual_query, expected_query)
项目:OpenBAC    作者:PrometheanInfoSec    | 项目源码 | 文件源码
def parse(self, path):
        """Parse a ``/?key=value&...`` request path and dispatch it.

        The first two characters of ``path`` are skipped and the remainder
        is parsed as a query string.  Dispatch, in priority order:

        * ``au`` present                 -> ``self.process``
        * ``mk`` present                 -> ``self.gen``
        * ``up`` and ``passwd`` present  -> ``self.unpack``

        Before dispatching, repeated values of each parameter are joined
        with a single space and the resulting dict is printed.

        Returns the handler's result, or the string
        ``"Not able to parse input"`` when no rule matches.
        """
        parsed = parse_qs(path[2:])
        if "au" in parsed:
            handler_name = 'process'
        elif "mk" in parsed:
            handler_name = 'gen'
        elif "up" in parsed and "passwd" in parsed:
            handler_name = 'unpack'
        else:
            return "Not able to parse input"

        for key in parsed:
            parsed[key] = " ".join(parsed[key])
        # Parenthesised form prints the same on Python 2 and is valid on
        # Python 3, where the print statement is a syntax error.
        # NOTE(review): this also prints the 'passwd' value - confirm that
        # is intended.
        print(parsed)
        return getattr(self, handler_name)(parsed)
项目:microperi    作者:c0d3st0rm    | 项目源码 | 文件源码
def serial_class_for_url(url):
    """Return the port name and serial class selected by an ``alt://`` URL.

    ``class=<name>`` is the only supported option; it names an attribute
    of the ``serial`` module and defaults to ``Serial``.

    Returns a ``(netloc + path, class object)`` tuple.  Raises
    ``serial.SerialException`` when the scheme is not ``alt`` or an
    unknown option is given.
    """
    parts = urlparse.urlsplit(url)
    if parts.scheme != 'alt':
        raise serial.SerialException('expected a string in the form "alt://port[?option[=value][&option[=value]]]": not starting with alt:// (%r)' % (parts.scheme,))

    selected = 'Serial'
    try:
        # Second argument True keeps options given without a value.
        for option, values in urlparse.parse_qs(parts.query, True).items():
            if option != 'class':
                raise ValueError('unknown option: %r' % (option,))
            selected = values[0]
    except ValueError as e:
        raise serial.SerialException('expected a string in the form "alt://port[?option[=value][&option[=value]]]": %s' % e)

    serial_class = getattr(serial, selected)
    return (''.join([parts.netloc, parts.path]), serial_class)

# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
项目:microperi    作者:c0d3st0rm    | 项目源码 | 文件源码
def from_url(self, url):
        """Extract host and port from a ``socket://`` URL string.

        The only recognised option is ``logging=<level>``, which configures
        ``self.logger`` with the level looked up in ``LOGGER_LEVELS``.

        Returns ``(host, port)``.  Raises ``SerialException`` for a wrong
        scheme, an unknown option, or a missing/out-of-range port.
        """
        parts = urlparse.urlsplit(url)
        if parts.scheme != "socket":
            raise SerialException('expected a string in the form "socket://<host>:<port>[?logging={debug|info|warning|error}]": not starting with socket:// (%r)' % (parts.scheme,))
        try:
            # process options now, directly altering self
            for option, values in urlparse.parse_qs(parts.query, True).items():
                if option == 'logging':
                    logging.basicConfig()   # XXX is that good to call it here?
                    self.logger = logging.getLogger('pySerial.socket')
                    self.logger.setLevel(LOGGER_LEVELS[values[0]])
                    self.logger.debug('enabled logging')
                else:
                    raise ValueError('unknown option: %r' % (option,))
            # get host and port
            host, port = parts.hostname, parts.port
            # A URL without an explicit port yields None.  Comparing None
            # with an int raises TypeError on Python 3, which would escape
            # the ValueError handler below, so reject it explicitly.
            if port is None or not 0 <= port < 65536:
                raise ValueError("port not in range 0...65535")
        except ValueError as e:
            raise SerialException('expected a string in the form "socket://<host>:<port>[?logging={debug|info|warning|error}]": %s' % e)
        return (host, port)

    #  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -
项目:microperi    作者:c0d3st0rm    | 项目源码 | 文件源码
def from_url(self, url):
        """Read the options of a ``loop://`` URL and apply them to ``self``.

        ``logging=<level>`` is the only supported option; it sets up
        ``self.logger`` with the level looked up in ``LOGGER_LEVELS``.
        Nothing is returned.  Raises ``SerialException`` when the scheme is
        not ``loop`` or an unknown option is given.
        """
        parts = urlparse.urlsplit(url)
        if parts.scheme != "loop":
            raise SerialException('expected a string in the form "loop://[?logging={debug|info|warning|error}]": not starting with loop:// (%r)' % (parts.scheme,))
        try:
            # Options are applied straight onto self while parsing.
            for option, values in urlparse.parse_qs(parts.query, True).items():
                if option != 'logging':
                    raise ValueError('unknown option: %r' % (option,))
                level_name = values[0]
                logging.basicConfig()   # XXX is that good to call it here?
                self.logger = logging.getLogger('pySerial.loop')
                self.logger.setLevel(LOGGER_LEVELS[level_name])
                self.logger.debug('enabled logging')
        except ValueError as e:
            raise SerialException('expected a string in the form "loop://[?logging={debug|info|warning|error}]": %s' % e)

    #  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -
项目:microperi    作者:c0d3st0rm    | 项目源码 | 文件源码
def from_url(self, url):
        """Set up the spy formatter from a ``spy://`` URL.

        Supported options: ``file=<path>`` (output goes to that file
        instead of stderr), ``color``, ``raw`` (``FormatRaw`` replaces
        ``FormatHexdump``) and ``all`` (sets ``self.show_all``).

        Assigns ``self.formatter`` and returns ``netloc + path`` of the URL.
        Raises ``serial.SerialException`` when the scheme is not ``spy`` or
        an unknown option is given.
        """
        parts = urlparse.urlsplit(url)
        if parts.scheme != 'spy':
            raise serial.SerialException('expected a string in the form "spy://port[?option[=value][&option[=value]]]": not starting with spy:// (%r)' % (parts.scheme,))

        # Defaults, overridden by the options below.
        chosen_formatter = FormatHexdump
        colorize = False
        destination = sys.stderr
        try:
            for option, values in urlparse.parse_qs(parts.query, True).items():
                if option == 'file':
                    destination = open(values[0], 'w')
                elif option == 'color':
                    colorize = True
                elif option == 'raw':
                    chosen_formatter = FormatRaw
                elif option == 'all':
                    self.show_all = True
                else:
                    raise ValueError('unknown option: %r' % (option,))
        except ValueError as e:
            raise serial.SerialException('expected a string in the form "spy://port[?option[=value][&option[=value]]]": %s' % e)

        self.formatter = chosen_formatter(destination, colorize)
        return ''.join([parts.netloc, parts.path])
项目:optimove    作者:nicolasramy    | 项目源码 | 文件源码
def get_customers_by_action_callback(request):
    """Canned response for a "customers by action" request.

    Answers 200 with two customers only for RecipientGroupID=1, ActionID=2,
    Date=2015-06-24.  When both CustomerAttributes and
    CustomerAttributesDelimiter are sent they must be ``Alias;Country`` and
    ``,``; the customers then carry a ``CustomerAttribute`` value.  Every
    other combination is answered with 404.

    Returns a ``(status, headers, body)`` tuple.
    """
    params = parse_qs(urlparse(request.url).query)

    if (params['RecipientGroupID'][0] != '1' or params['ActionID'][0] != '2'
            or params['Date'][0] != '2015-06-24'):
        return 404, HEADERS['text'], 'Not Found'

    wants_attributes = 'CustomerAttributes' in params and 'CustomerAttributesDelimiter' in params
    if not wants_attributes:
        resp_body = [
            {'CustomerID': '231342'},
            {'CustomerID': '943157'}
        ]
    elif params['CustomerAttributes'][0] == 'Alias;Country' and params['CustomerAttributesDelimiter'][0] == ',':
        resp_body = [
            {'CustomerID': '231342', 'CustomerAttribute': 'BuddyZZ,UK'},
            {'CustomerID': '943157', 'CustomerAttribute': 'Pax65,DE'}
        ]
    else:
        return 404, HEADERS['text'], 'Not Found'

    return 200, HEADERS['json'], json.dumps(resp_body)
项目:optimove    作者:nicolasramy    | 项目源码 | 文件源码
def get_customer_actions_by_target_group_callback(request):
    """Canned response for a "customer actions by target group" request.

    Answers 200 with two customer actions only for TargetGroupID=2,
    Date=2015-12-24.  When both CustomerAttributes and
    CustomerAttributesDelimiter are sent they must be ``Alias;Country`` and
    ``,``; the rows then carry a ``CustomerAttribute`` value.  Every other
    combination is answered with 404.

    Returns a ``(status, headers, body)`` tuple.
    """
    params = parse_qs(urlparse(request.url).query)

    if params['TargetGroupID'][0] != '2' or params['Date'][0] != '2015-12-24':
        return 404, HEADERS['text'], 'Not Found'

    wants_attributes = 'CustomerAttributes' in params and 'CustomerAttributesDelimiter' in params
    if not wants_attributes:
        resp_body = [
            {'CustomerID': 'A1342', 'ActionID': 49, 'ChannelID': 6},
            {'CustomerID': 'G4650', 'ActionID': 49, 'ChannelID': 6}
        ]
    elif params['CustomerAttributes'][0] == 'Alias;Country' and params['CustomerAttributesDelimiter'][0] == ',':
        resp_body = [
            {'CustomerID': 'A1342', 'ActionID': 49, 'ChannelID': 6, 'CustomerAttribute': 'BuddyZZ,UK'},
            {'CustomerID': 'G4650', 'ActionID': 49, 'ChannelID': 6, 'CustomerAttribute': 'Mighty6,ES'}
        ]
    else:
        return 404, HEADERS['text'], 'Not Found'

    return 200, HEADERS['json'], json.dumps(resp_body)
项目:optimove    作者:nicolasramy    | 项目源码 | 文件源码
def get_customer_one_time_actions_by_date_callback(request):
    """Canned response for a "customer one-time actions by date" request.

    Answers 200 with two customer actions only for Date=2015-06-24.  When
    both CustomerAttributes and CustomerAttributesDelimiter are sent they
    must be ``Alias;Country`` and ``,``; the rows then carry a
    ``CustomerAttribute`` value.  Every other combination is answered with
    404.

    Returns a ``(status, headers, body)`` tuple.
    """
    params = parse_qs(urlparse(request.url).query)

    if params['Date'][0] != '2015-06-24':
        return 404, HEADERS['text'], 'Not Found'

    wants_attributes = 'CustomerAttributes' in params and 'CustomerAttributesDelimiter' in params
    if not wants_attributes:
        resp_body = [
            {'CustomerID': '8D871', 'ActionID': 19, 'ChannelID': 3},
            {'CustomerID': '8U76T', 'ActionID': 19, 'ChannelID': 3}
        ]
    elif params['CustomerAttributes'][0] == 'Alias;Country' and params['CustomerAttributesDelimiter'][0] == ',':
        resp_body = [
            {'CustomerID': '8D871', 'ActionID': 19, 'ChannelID': 3, 'CustomerAttribute': 'Yo999,UA'},
            {'CustomerID': '8U76T', 'ActionID': 19, 'ChannelID': 3, 'CustomerAttribute': 'Neto2,TR'}
        ]
    else:
        return 404, HEADERS['text'], 'Not Found'

    return 200, HEADERS['json'], json.dumps(resp_body)
项目:optimove    作者:nicolasramy    | 项目源码 | 文件源码
def get_target_group_changers_callback(request):
    """Canned response for a "target group changers" request.

    Answers 200 with two changers only for StartDate=2015-09-01,
    EndDate=2015-09-30.  When both CustomerAttributes and
    CustomerAttributesDelimiter are sent they must be ``Alias;Country`` and
    ``,``; the rows then carry a ``CustomerAttribute`` value.  Every other
    combination is answered with 404.

    Returns a ``(status, headers, body)`` tuple.
    """
    params = parse_qs(urlparse(request.url).query)

    if params['StartDate'][0] != '2015-09-01' or params['EndDate'][0] != '2015-09-30':
        return 404, HEADERS['text'], 'Not Found'

    wants_attributes = 'CustomerAttributes' in params and 'CustomerAttributesDelimiter' in params
    if not wants_attributes:
        resp_body = [
            {'CustomerID': '231342', 'InitialTargetGroupID': 4, 'FinalTargetGroupID': 12},
            {'CustomerID': '931342', 'InitialTargetGroupID': -1, 'FinalTargetGroupID': 8}
        ]
    elif params['CustomerAttributes'][0] == 'Alias;Country' and params['CustomerAttributesDelimiter'][0] == ',':
        resp_body = [
            {'CustomerID': '231342', 'InitialTargetGroupID': 4, 'FinalTargetGroupID': 12,
             'CustomerAttribute': 'BuddyZZ,UK'},
            {'CustomerID': '931342', 'InitialTargetGroupID': -1, 'FinalTargetGroupID': 8,
             'CustomerAttribute': 'Pax65,DE'}
        ]
    else:
        return 404, HEADERS['text'], 'Not Found'

    return 200, HEADERS['json'], json.dumps(resp_body)
项目:optimove    作者:nicolasramy    | 项目源码 | 文件源码
def get_customer_send_details_by_campaign_callback(request):
    """Canned response for a "customer send details by campaign" request.

    Answers 200 with two send records only for CampaignID=65874; anything
    else is answered with 404.  The records carry a ``TemplateID`` only
    when IncludeTemplateIDs is sent with the value ``True``.

    Returns a ``(status, headers, body)`` tuple.
    """
    params = parse_qs(urlparse(request.url).query)

    if params['CampaignID'][0] != '65874':
        return 404, HEADERS['text'], 'Not Found'

    with_template_ids = 'IncludeTemplateIDs' in params and params['IncludeTemplateIDs'][0] == 'True'
    if with_template_ids:
        resp_body = [
            {'CustomerID': '231342', 'ChannelID': 4, 'ScheduledTime': '2015-12-30 10:30:00',
             'SendID': 'HG65D', 'TemplateID': 12},
            {'CustomerID': '917251', 'ChannelID': 4, 'ScheduledTime': '2015-12-30 11:45:00',
             'SendID': 'HG65E', 'TemplateID': 7}
        ]
    else:
        resp_body = [
            {'CustomerID': '231342', 'ChannelID': 4, 'ScheduledTime': '2015-12-30 10:30:00', 'SendID': 'HG65D'},
            {'CustomerID': '917251', 'ChannelID': 4, 'ScheduledTime': '2015-12-30 11:45:00', 'SendID': 'HG65E'}
        ]
    return 200, HEADERS['json'], json.dumps(resp_body)
项目:optimove    作者:nicolasramy    | 项目源码 | 文件源码
def get_executed_campaign_details_callback(request):
    """Canned response for an "executed campaign details" request.

    Answers 200 with two campaign records only for Date=2015-06-19; any
    other date is answered with 404.

    Returns a ``(status, headers, body)`` tuple.
    """
    params = parse_qs(urlparse(request.url).query)

    if params['Date'][0] != '2015-06-19':
        return 404, HEADERS['text'], 'Not Found'

    successful_campaign = {
        'CampaignID': 221, 'TargetGroupID': 15, 'CampaignType': 'Test/Control', 'Duration': 7,
        'LeadTime': 3, 'Notes': '', 'IsMultiChannel': 'false', 'IsRecurrence': 'false',
        'Status': 'Successful', 'Error': ''}
    failed_campaign = {
        'CampaignID': 81, 'TargetGroupID': 40, 'CampaignType': 'Test/Control', 'Duration': 10,
        'LeadTime': 0, 'Notes': '', 'IsMultiChannel': 'true', 'IsRecurrence': 'true',
        'Status': 'Failed', 'Error': 'ESP unavailable'}
    resp_body = [successful_campaign, failed_campaign]
    return 200, HEADERS['json'], json.dumps(resp_body)
项目:optimove    作者:nicolasramy    | 项目源码 | 文件源码
def get_microsegment_changers_with_attributes_callback(request):
    """Canned response for a "microsegment changers with attributes" request.

    Answers 200 with two changers only for StartDate=2016-01-01,
    EndDate=2016-01-31, CustomerAttributes=``Alias;Country`` and
    CustomerAttributesDelimiter=``,``; anything else is answered with 404.

    Returns a ``(status, headers, body)`` tuple.
    """
    params = parse_qs(urlparse(request.url).query)

    if (params['StartDate'][0] != '2016-01-01'
            or params['EndDate'][0] != '2016-01-31'
            or params['CustomerAttributes'][0] != 'Alias;Country'
            or params['CustomerAttributesDelimiter'][0] != ','):
        return 404, HEADERS['text'], 'Not Found'

    resp_body = [
        {'CustomerID': '231342', 'InitialMicrosegmentID': 4, 'FinalMicrosegmentID': 12,
         'CustomerAttributes': 'BuddyZZ,UK'},
        {'CustomerID': '231342', 'InitialMicrosegmentID': 3, 'FinalMicrosegmentID': 67,
         'CustomerAttributes': 'Player99,US'}
    ]
    return 200, HEADERS['json'], json.dumps(resp_body)
项目:Dallinger    作者:Dallinger    | 项目源码 | 文件源码
def __init__(self, URL, assignment_id='', worker_id='', participant_id=''):
        """Store the bot's URL and identifiers.

        Any identifier passed as an empty value is taken from the query
        string parameter of the same name in ``URL``, defaulting to the
        empty string.  ``unique_id`` is ``'<worker_id>:<assignment_id>'``.
        """
        logger.info("Creating bot with URL: %s." % URL)
        self.URL = URL

        query = parse_qs(urlparse(URL).query)

        def from_query(name):
            # First value of the parameter, or '' when it is absent.
            return query.get(name, [''])[0]

        assignment_id = assignment_id or from_query('assignment_id')
        participant_id = participant_id or from_query('participant_id')
        worker_id = worker_id or from_query('worker_id')

        self.assignment_id = assignment_id
        self.participant_id = participant_id
        self.worker_id = worker_id
        self.unique_id = worker_id + ':' + assignment_id
项目:scrapyProject    作者:bedcode    | 项目源码 | 文件源码
def parse_search_page(self, response):
        """Yield the tweets of a search page, then a request for the next page.

        Items come from ``self.parse_tweets_block``.  When
        ``self.reScrollCursor`` matches the response body, a timeline
        request is yielded that reuses the ``q`` parameter of the original
        request and the matched cursor as ``max_position``; it is handled
        by ``self.parse_more_page``.
        """
        # handle current page
        for item in self.parse_tweets_block(response.body):
            yield item

        # get next page
        cursor_match = self.reScrollCursor.search(response.body)
        if cursor_match:
            request_query = urlparse.urlparse(response.request.url).query
            query = urlparse.parse_qs(request_query)['q'][0]
            scroll_cursor = cursor_match.group(1)
            url = 'https://twitter.com/i/search/timeline?q=%s&' \
                  'include_available_features=1&include_entities=1&max_position=%s' % \
                  (urllib.quote_plus(query), scroll_cursor)
            yield http.Request(url, callback=self.parse_more_page)

        # TODO: also request the refresh page, using the cursor matched by
        # self.reRefreshCursor in the same way as the scroll cursor above.
项目:WebHackSHL    作者:SecHackLabs    | 项目源码 | 文件源码
def _parse_get(self, all_injectable = False):

        params_dict_list = urlparse.parse_qs(urlparse.urlsplit(self.url).query)

        for param, value_list in params_dict_list.items():

            self.get_params[param] = value_list

            if self.tag in param:
                self.injs.append({
                    'field' : 'GET',
                    'part' : 'param',
                    'param': param
                })

            for idx, value in enumerate(value_list):
                if self.tag in value or all_injectable:
                    self.injs.append({
                        'field' : 'GET',
                        'part': 'value',
                        'param': param,
                        'value' : value,
                        'idx' : idx
                    })
项目:youtube_downloader    作者:aksinghdce    | 项目源码 | 文件源码
def extract_video_id(url):
    """ Extract the video id from a url, return video id as str.

    Accepts a bare 11-character id, a YouTube URL carrying the id in its
    ``v`` query parameter, or a youtu.be short link whose last path segment
    is the id. The scheme may be omitted.

    Raises ValueError when no 11-character id can be found; the message
    quotes the input exactly as the caller supplied it.
    """
    idregx = re.compile(r'[\w-]{11}$')
    given = str(url)

    if idregx.match(given):
        return given # ID of video

    # Without a scheme, urlparse only recognises the host when the string
    # starts with '//', so add that prefix for parsing purposes only.
    target = given if '://' in given else '//' + given
    parsedurl = urlparse(target)
    if parsedurl.netloc in ('youtube.com', 'www.youtube.com', 'm.youtube.com', 'gaming.youtube.com'):
        query = parse_qs(parsedurl.query)
        if 'v' in query and idregx.match(query['v'][0]):
            return query['v'][0]
    elif parsedurl.netloc in ('youtu.be', 'www.youtu.be'):
        vidid = parsedurl.path.split('/')[-1]
        if idregx.match(vidid):
            return vidid

    # Report the caller's own input, not the '//'-prefixed parsing copy.
    err = "Need 11 character video id or the URL of the video. Got %s"
    raise ValueError(err % given)
项目:youtube_downloader    作者:aksinghdce    | 项目源码 | 文件源码
def parseqs(data):
    """ parse_qs, return unicode.

    ``data`` that is already of type ``uni`` is parsed directly. Otherwise
    it is decoded as UTF-8: before parsing when ``pyver`` is 3, or key by
    key and value by value after parsing on other versions.

    NOTE(review): ``uni`` and ``pyver`` are module-level names; presumably
    the text type and the major Python version - confirm at their definition.
    """
    if isinstance(data, uni):
        return parse_qs(data)

    if pyver == 3:
        return parse_qs(data.decode("utf8"))

    # Byte-string input on an older interpreter: parse first, then decode
    # every key and every value, building the result once the loop is done.
    out = {}
    for k, v in parse_qs(data).items():
        out[k.decode("utf8")] = [x.decode("utf8") for x in v]

    return out
项目:youtube_downloader    作者:aksinghdce    | 项目源码 | 文件源码
def extract_playlist_id(playlist_url):
    """Extract a playlist id from a bare id or a YouTube URL.

    Returns the id as a string, or None when the input is neither a bare
    playlist id nor a YouTube URL with a valid ``list`` query parameter.
    The scheme may be omitted from the URL.
    """
    # Normal playlists start with PL, Mixes start with RD + first video ID,
    # Liked videos start with LL, Uploads start with UU,
    # Favorites lists start with FL
    idregx = re.compile(r'((?:RD|PL|LL|UU|FL)[-_0-9a-zA-Z]+)$')

    if idregx.match(playlist_url):
        return playlist_url # already a bare playlist ID

    # Without a scheme, urlparse only recognises the host when the string
    # starts with '//'.
    if '://' not in playlist_url:
        playlist_url = '//' + playlist_url
    parsedurl = urlparse(playlist_url)

    # Same hosts as extract_video_id accepts, so mobile links work too.
    if parsedurl.netloc in ('youtube.com', 'www.youtube.com', 'm.youtube.com', 'gaming.youtube.com'):
        query = parse_qs(parsedurl.query)
        if 'list' in query and idregx.match(query['list'][0]):
            return query['list'][0]

    return None
项目:uicourses_v2    作者:sumerinlan    | 项目源码 | 文件源码
def do_POST(self):
        """Check the posted credentials and pass the form fields to ``dispatch``.

        The body is parsed as multipart or URL-encoded form data according to
        the Content-Type header; any other (or missing) content type yields
        no fields. Requests lacking ``Username``/``Password`` or failing
        ``validate_id`` are answered through ``self.exit_on_error``. On
        success ``{'Status': 'OK'}`` is written back.
        """
        # http://stackoverflow.com/questions/4233218/python-basehttprequesthandler-post-variables
        # A request without a Content-Type header is treated as having no
        # form fields instead of failing while the header is parsed.
        ctype, pdict = cgi.parse_header(self.headers.get('content-type', ''))
        if ctype == 'multipart/form-data':
            postvars = cgi.parse_multipart(self.rfile, pdict)
        elif ctype == 'application/x-www-form-urlencoded':
            # A missing Content-Length is read as an empty body.
            length = int(self.headers.get('content-length', 0))
            # NOTE(review): cgi.parse_qs is a deprecated alias of the
            # urlparse / urllib.parse parse_qs function.
            postvars = cgi.parse_qs(self.rfile.read(length), keep_blank_values=1)
        else:
            postvars = {}

        if 'Username' not in postvars or 'Password' not in postvars:
            log('E', 'vali.', 'No credentials.')
            self.exit_on_error('No credentials.')
            return
        if not validate_id(postvars['Username'][0], postvars['Password'][0]):
            log('E', 'vali.', 'Wrong credentials.')
            self.exit_on_error('Wrong credentials.')
            return

        try:
            dispatch(postvars)
            self.write_response({'Status': 'OK'})
        except Exception:
            # Report handler failures to the client, but let SystemExit and
            # KeyboardInterrupt propagate so the server can still be stopped.
            log('E', 'hand.', 'Handler throws an exception.')
            self.exit_on_error('Handler throws and exception.')
项目:devops-playground    作者:jerrywardlow    | 项目源码 | 文件源码
def content():
    """Return the ``states`` values parsed from the current request's raw body."""
    form = urlparse.parse_qs(app.current_request.raw_body)
    # An absent "states" field yields an empty list.
    states = form.get('states', [])
    return {'states': states}
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def parse_qs(qs, keep_blank_values=0, strict_parsing=0):
    """Parse a query given as a string argument.

    Deprecated shim: issues a PendingDeprecationWarning attributed to the
    caller, then forwards all arguments to ``urlparse.parse_qs``.
    """
    warn(
        "cgi.parse_qs is deprecated, use urlparse.parse_qs instead",
        category=PendingDeprecationWarning,
        stacklevel=2,
    )
    return urlparse.parse_qs(
        qs,
        keep_blank_values=keep_blank_values,
        strict_parsing=strict_parsing,
    )
项目:cbas    作者:ImmobilienScout24    | 项目源码 | 文件源码
def test_obtain_access_token(self, rmock):
        """obtain_access_token returns the token and posts the expected form."""
        # Any POST is answered with a JSON body carrying the token.
        rmock.post(requests_mock.ANY, text='{"access_token": "ANY_TOKEN"}')
        config = Mock()
        config.username = "ANY_USERNAME"
        config.auth_host = "ANY_URL.example"

        token = obtain_access_token(config, 'ANY_PASSWORD')

        self.assertEqual('ANY_TOKEN', token)
        # The first recorded request carries the form that was sent.
        posted_form = parse_qs(rmock.request_history[0].text)
        self.assertEqual(
            posted_form,
            {
                u'username': [u'ANY_USERNAME'],
                u'password': [u'ANY_PASSWORD'],
                u'client_id': [u'jumpauth'],
                u'grant_type': [u'password'],
            },
        )
项目:plugin.video.exodus    作者:lastship    | 项目源码 | 文件源码
def parse(url):
    """Unwrap a redirect-style URL on a best-effort basis.

    The URL is first passed through ``client.replaceHTMLCodes``. Then, if
    its query string has a ``u`` parameter, that parameter's first value
    replaces the URL; the same is then done for ``q``. Any step that fails
    is skipped and the URL is left as it was.
    """
    try:
        url = client.replaceHTMLCodes(url)
    except Exception:
        pass

    # 'u' is unwrapped before 'q', so a 'q' nested inside 'u' is followed too.
    for key in ('u', 'q'):
        try:
            url = urlparse.parse_qs(urlparse.urlparse(url).query)[key][0]
        except Exception:
            # Parameter absent or input not parseable: keep the current URL.
            pass
    return url
项目:plugin.video.exodus    作者:lastship    | 项目源码 | 文件源码
def episode(self, url, imdb, tvdb, title, premiered, season, episode):
        """Build the site-relative URL of one episode.

        ``url`` is a URL-encoded query string whose ``url`` field holds the
        show's address; the show identifier is taken from the fifth
        '/'-separated piece of that address. Returns the path as UTF-8
        encoded text, or None if any step fails.
        """
        try:
            data = urlparse.parse_qs(url)
            # parse_qs yields a list per key; keep only the first value.
            data = dict((key, values[0] if values else '') for key, values in data.items())

            show = data['url'].split('/')[4]
            r = urlparse.urljoin(self.base_link, self.episode_link % (show, season, episode))
            # Drop the scheme-and-host prefix, keeping the path onwards.
            url = re.findall('(?://.+?|)(/.+)', r)[0]
            url = client.replaceHTMLCodes(url)
            url = url.encode('utf-8')
            return url
        except Exception:
            # Best effort: an unusable input simply yields no URL.
            return None
项目:plugin.video.exodus    作者:lastship    | 项目源码 | 文件源码
def episode(self, url, imdb, tvdb, title, premiered, season, episode):
        """Add the episode details to the show's query string.

        ``url`` is a URL-encoded query string. Its fields are kept (first
        value of each) and ``title``, ``premiered``, ``season`` and
        ``episode`` are set on top. Returns the re-encoded query string, or
        None when ``url`` is None or it cannot be processed.
        """
        if url is None:
            return None
        try:
            fields = urlparse.parse_qs(url)
            # parse_qs yields a list per key; keep only the first value.
            fields = dict((key, values[0] if values else '') for key, values in fields.items())
            fields['title'] = title
            fields['premiered'] = premiered
            fields['season'] = season
            fields['episode'] = episode
            return urllib.urlencode(fields)
        except Exception:
            # Best effort: an unusable input simply yields no URL.
            return None
项目:plugin.video.exodus    作者:lastship    | 项目源码 | 文件源码
def episode(self, url, imdb, tvdb, title, premiered, season, episode):
        """Add the episode details to the show's query string.

        ``url`` is a URL-encoded query string. Its fields are kept (first
        value of each) and ``title``, ``premiered``, ``season`` and
        ``episode`` are set on top. Returns the re-encoded query string, or
        None when ``url`` is None or it cannot be processed.
        """
        if url is None:
            return None
        try:
            fields = urlparse.parse_qs(url)
            # parse_qs yields a list per key; keep only the first value.
            fields = dict((key, values[0] if values else '') for key, values in fields.items())
            fields['title'] = title
            fields['premiered'] = premiered
            fields['season'] = season
            fields['episode'] = episode
            return urllib.urlencode(fields)
        except Exception:
            # Best effort: an unusable input simply yields no URL.
            return None
项目:plugin.video.exodus    作者:lastship    | 项目源码 | 文件源码
def episode(self, url, imdb, tvdb, title, premiered, season, episode):
        """Add the episode details to the show's query string.

        ``url`` is a URL-encoded query string. Its fields are kept (first
        value of each) and ``title``, ``premiered``, ``season`` and
        ``episode`` are set on top. Returns the re-encoded query string, or
        None when ``url`` is None or it cannot be processed.
        """
        if url is None:
            return None
        try:
            fields = urlparse.parse_qs(url)
            # parse_qs yields a list per key; keep only the first value.
            fields = dict((key, values[0] if values else '') for key, values in fields.items())
            fields['title'] = title
            fields['premiered'] = premiered
            fields['season'] = season
            fields['episode'] = episode
            return urllib.urlencode(fields)
        except Exception:
            # Best effort: an unusable input simply yields no URL.
            return None
项目:plugin.video.exodus    作者:lastship    | 项目源码 | 文件源码
def episode(self, url, imdb, tvdb, title, premiered, season, episode):
        """Add the episode details to the show's query string.

        ``url`` is a URL-encoded query string. Its fields are kept (first
        value of each) and ``title``, ``premiered``, ``season`` and
        ``episode`` are set on top. Returns the re-encoded query string, or
        None when ``url`` is None or it cannot be processed.
        """
        if url is None:
            return None
        try:
            fields = urlparse.parse_qs(url)
            # parse_qs yields a list per key; keep only the first value.
            fields = dict((key, values[0] if values else '') for key, values in fields.items())
            fields['title'] = title
            fields['premiered'] = premiered
            fields['season'] = season
            fields['episode'] = episode
            return urllib.urlencode(fields)
        except Exception:
            # Best effort: an unusable input simply yields no URL.
            return None
项目:plugin.video.exodus    作者:lastship    | 项目源码 | 文件源码
def episode(self, url, imdb, tvdb, title, premiered, season, episode):
        """Encode the episode lookup fields as a query string.

        ``url`` is a URL-encoded query string from which only ``year`` is
        read. Returns the encoded ``imdb``, ``title``, ``year``, ``season``
        and ``episode`` fields, or None when ``url`` has no ``year`` field or
        cannot be processed.
        """
        try:
            # parse_qs yields a list per key; the first value is the year.
            years = urlparse.parse_qs(url)['year']
            year = years[0] if years else ''
            return urllib.urlencode({'imdb': imdb, 'title': title, 'year': year, 'season': season, 'episode': episode})
        except Exception:
            # Best effort: an unusable input simply yields no URL.
            return None