Python urllib 模块,urlencode() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用urllib.urlencode()

项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def request_encode_url(self, method, url, fields=None, headers=None,
                           **urlopen_kw):
        """
        Send a request through :meth:`urlopen`, carrying ``fields`` in the
        query string of ``url``. Suited to verbs such as GET, HEAD and DELETE.

        :param method: HTTP verb, handed to :meth:`urlopen` unchanged.
        :param url: Target URL; ``'?'`` plus the encoded ``fields`` is
            appended when ``fields`` is truthy.
        :param fields: Optional query parameters, encoded with ``urlencode``.
        :param headers: Request headers; ``self.headers`` is used when None.
        :param urlopen_kw: Extra keyword arguments forwarded to
            :meth:`urlopen`.
        :return: Whatever :meth:`urlopen` returns.
        """
        effective_headers = self.headers if headers is None else headers
        call_kwargs = dict(headers=effective_headers, **urlopen_kw)

        # Falsy ``fields`` (None, {}, []) leave the URL untouched.
        target = (url + '?' + urlencode(fields)) if fields else url

        return self.urlopen(method, target, **call_kwargs)
项目:scibot    作者:SciCrunch    | 项目源码 | 文件源码
def existing_tags(target_uri, h):
    """
    Collect the identifier tags already attached to annotations of a page.

    Queries the annotation API through ``h`` (up to 200 rows for
    ``target_uri`` restricted to ``h.group`` and ``h.username``) and scans
    every tag of every returned row.

    :param target_uri: URI whose annotations are searched.
    :param h: API helper providing ``group``, ``username``,
        ``query_url_template`` and ``authenticated_api_query``.
    :return: ``(tags, unresolved_exacts)`` where ``tags`` maps each
        ``RRID:``/``PMID:``/``DOI:`` tag to the id of the row carrying it and
        ``unresolved_exacts`` maps the exact selector text of rows tagged
        ``RRIDCUR:Unresolved`` to the row id.
    """
    search = {
        'limit':200,
        'uri':target_uri,
        'group':h.group,
        'user':h.username,
    }
    query_url = h.query_url_template.format(query=urlencode(search, True))
    reply = h.authenticated_api_query(query_url)

    identifier_prefixes = ('RRID:', 'PMID:', 'DOI:')
    tags = {}
    unresolved_exacts = {}
    for row in reply['rows']:
        row_id = row['id']
        for tag in row['tags']:
            if tag.startswith(identifier_prefixes):
                tags[tag] = row_id
            elif tag == 'RRIDCUR:Unresolved':
                # Key unresolved curation notes by the exact quoted text of
                # the first selector of the first target.
                exact = row['target'][0]['selector'][0]['exact']
                unresolved_exacts[exact] = row_id
    return tags, unresolved_exacts
项目:IotCenter    作者:panjanek    | 项目源码 | 文件源码
def get(self, file):
        """
        Serve one file from ``self.path`` to an authenticated client.

        Unauthenticated requests are redirected to ``/login`` with the
        current URI as ``returnUrl``. Requests whose name contains ``..`` or
        is an absolute path are ignored without writing a body; a missing
        file yields a 404.

        :param file: File name relative to ``self.path`` (taken from the URL).
        """
        if not self.isAuthenticated():
            self.redirect("/login?"+urllib.urlencode({"returnUrl":self.request.uri}))
            return

        # Traversal guard. os.path.join() discards self.path when the second
        # component is absolute, so absolute names must be rejected as well.
        if file.find("..") > -1 or os.path.isabs(file):
            return

        fullPath = os.path.join(self.path, file)
        if not os.path.exists(fullPath):
            self.set_status(404)
            self.write("404 Not Found")
            return

        ext = file.split('.')[-1]
        if ext in ("jpg", "jpeg"):
            # "image/jpg" is not a registered media type; both extensions
            # map to image/jpeg.
            contentType = "image/jpeg"
        elif ext == "bmp":
            contentType = "image/bmp"
        else:
            contentType = "application/octet-stream"

        self.logger.debug("serving file {0}".format(fullPath))
        # Distinct name for the handle so the ``file`` argument is not shadowed.
        with open(fullPath, mode='rb') as handle:
            fileData = handle.read()
        self.set_header("Content-Type", contentType)
        self.write(fileData)
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def request_encode_url(self, method, url, fields=None, headers=None,
                           **urlopen_kw):
        """
        Perform a request with :meth:`urlopen`, placing ``fields`` in the
        URL's query string. Meant for methods like GET, HEAD and DELETE.

        :param method: HTTP method name passed straight to :meth:`urlopen`.
        :param url: URL to request.
        :param fields: Optional mapping/sequence of query parameters; when
            truthy it is url-encoded and appended after a ``'?'``.
        :param headers: Headers to send; defaults to ``self.headers``.
        :param urlopen_kw: Additional keyword arguments for :meth:`urlopen`.
        :return: The result of :meth:`urlopen`.
        """
        if headers is None:
            headers = self.headers

        if fields:
            query = urlencode(fields)
            url = url + '?' + query

        options = {'headers': headers}
        options.update(urlopen_kw)
        return self.urlopen(method, url, **options)
项目:DropboxConnect    作者:raguay    | 项目源码 | 文件源码
def _params_to_urlencoded(params):
    """
    Return an application/x-www-form-urlencoded ``str`` built from the
    key/value pairs in ``params``.

    Keys and values are converted to UTF-8 bytes before being handed to
    ``url_encode``: byte strings pass through untouched, text is
    utf8-encoded, and anything else is ``str()``'d first.
    """
    def encode(o):
        if isinstance(o, six.binary_type):
            return o
        text = o if isinstance(o, six.text_type) else str(o)
        return text.encode('utf-8')

    utf8_params = dict(
        (encode(key), encode(value)) for key, value in six.iteritems(params)
    )
    return url_encode(utf8_params)
项目:HeaTDV4A    作者:HeaTTheatR    | 项目源码 | 文件源码
def get_cookie(site, params, agent):
        """
        POST ``params`` to ``site`` and return the cookie the server sets.

        type params: list;
        param params: POST data, passed to ``urllib.urlencode``;

        Returns the whitespace-split ``Set-Cookie`` header as a list, or
        ``False`` if the request fails or the header is absent.
        """

        payload = urllib.urlencode(params)

        try:
            opener = urllib.FancyURLopener()
            opener.addheaders = [("User-agent", agent)]
            response_headers = opener.open(site, payload).info()
            return response_headers["Set-Cookie"].split()
        except Exception:
            return False
项目:HeaTDV4A    作者:HeaTTheatR    | 项目源码 | 文件源码
def get_cookie(site, params, agent):
    """
    Send a POST request and return the cookie set by the response.

    type site: str
    param site: URL the POST request is sent to

    type params: list
    param params: POST data, encoded with ``urllib.urlencode``

    type agent: str
    param agent: value sent as the ``User-agent`` header

    Returns the ``Set-Cookie`` header split on whitespace, or ``False`` when
    the request raises or no such header is present.

    """

    encoded_params = urllib.urlencode(params)

    try:
        opener = urllib.FancyURLopener()
        opener.addheaders = [("User-agent", agent)]
        reply = opener.open(site, encoded_params)
        return reply.info()["Set-Cookie"].split()
    except Exception:
        return False
项目:BackManager    作者:linuxyan    | 项目源码 | 文件源码
def request_api(params_dict):
    """
    POST ``params_dict`` as a form to the configured API endpoint.

    The target is taken from the module-level ``api_url``, ``api_port`` and
    ``api_path``; the connection uses a 5 second timeout and is always
    closed.

    :param params_dict: Mapping of form fields to send.
    :return: The evaluated response body with ``'status'`` set to the HTTP
        status on a 200 reply; ``{'status': 500, 'auth': 'failed'}`` on any
        other status or on any exception (which is printed).
    """
    httpClient = None
    try:
        params = urllib.urlencode(params_dict)
        headers = {"Content-type": "application/x-www-form-urlencoded", "Accept": "text/plain"}
        httpClient = httplib.HTTPConnection(api_url, int(api_port), timeout=5)
        httpClient.request("POST", api_path, params, headers)
        response = httpClient.getresponse()
        status = response.status
        if str(status) != '200':
            return {'status':500,'auth':'failed'}
        # SECURITY NOTE(review): eval() executes whatever the remote server
        # sends back. Kept to preserve behaviour; prefer ast.literal_eval or
        # json.loads once the response format is confirmed.
        data = eval(response.read())
        data['status'] = status
        return data
    except Exception as e:
        # ``as`` form and parenthesised print are valid on Python 2.6+ and 3.
        print(e)
        return {'status':500,'auth':'failed'}
    finally:
        if httpClient:
            httpClient.close()
项目:plugin.video.exodus    作者:lastship    | 项目源码 | 文件源码
def request(self, endpoint, query = None):
        """
        Call an API endpoint and return the decoded JSON response.

        :param endpoint: Value substituted into the first slot of
            ``self.api_url``.
        :param query: Optional parameters; when not None they are url-encoded
            and substituted (with a leading ``'?'``) into the second slot.
        :return: The parsed JSON document, or ``{}`` when anything fails.
        """
        try:
            # Encode the query, if there is one...
            if query is not None:
                query = '?' + urllib.urlencode(query)
            else:
                query = ''

            # Build the request URL
            request = self.api_url % (endpoint, query)

            # Send the request through the cache layer; the 24 is passed to
            # cache.get unchanged (presumably a lifetime in hours — confirm).
            response = cache.get(client.request, 24, request)

            # Return the result as a dictionary
            return json.loads(response)
        except Exception:
            # Best effort: callers get an empty result on failure, but
            # KeyboardInterrupt/SystemExit are no longer swallowed.
            return {}
项目:plugin.video.exodus    作者:lastship    | 项目源码 | 文件源码
def googlepass(url):
    """
    Resolve a Google-hosted link to its final URL.

    ``url`` may carry request headers after a ``'|'`` as a query string.
    The part before the first ``'|'`` (with backslashes removed) is requested
    with ``output='geturl'``; the scheme of the result is forced to https
    when it contains ``requiressl=yes`` and to http otherwise. The headers,
    if any, are re-appended after a ``'|'``.

    Returns the resulting string, or None on any error.
    """
    try:
        try:
            header_part = url.rsplit('|', 1)[1]
            headers = dict(urlparse.parse_qsl(header_part))
        except:
            # No '|' suffix (or unparsable): request without extra headers.
            headers = None

        bare = url.split('|')[0].replace('\\', '')
        resolved = client.request(bare, headers=headers, output='geturl')

        if 'requiressl=yes' in resolved:
            resolved = resolved.replace('http://', 'https://')
        else:
            resolved = resolved.replace('https://', 'http://')

        if headers:
            resolved += '|%s' % urllib.urlencode(headers)
        return resolved
    except:
        return
项目:plugin.video.exodus    作者:lastship    | 项目源码 | 文件源码
def yandex(url):
    """
    Resolve a Yandex Disk page to the direct file URL.

    Fetches the page with its cookie, extracts the ``sk`` and ``id`` values
    from the page source, and posts them with a random client id to the
    ``do-get-resource-url`` model endpoint.

    Returns ``models[0].data.file`` from the JSON reply, or None on any
    error.
    """
    try:
        cookie = client.request(url, output='cookie')

        page = client.request(url, cookie=cookie)
        # Replace every run of non-ASCII characters with a single space.
        page = re.sub(r'[^\x00-\x7F]+', ' ', page)

        sk = re.findall('"sk"\s*:\s*"([^"]+)', page)[0]
        idstring = re.findall('"id"\s*:\s*"([^"]+)', page)[0]
        idclient = binascii.b2a_hex(os.urandom(16))

        form = urllib.urlencode({'idClient': idclient, 'version': '3.9.2', 'sk': sk, '_model.0': 'do-get-resource-url', 'id.0': idstring})

        reply = client.request('https://yadi.sk/models/?_m=do-get-resource-url', post=form, cookie=cookie)
        return json.loads(reply)['models'][0]['data']['file']
    except:
        return
项目:plugin.video.exodus    作者:lastship    | 项目源码 | 文件源码
def tvshow(self, imdb, tvdb, tvshowtitle, localtvshowtitle, aliases, year):
        """
        Look up a show through the site's AJAX search and return its slug.

        The search title is normalised with ``cleantitle.getsearch`` and
        posted to ``self.search_link``; the first result whose cleaned
        ``post_title`` equals it wins.

        :return: The fifth ``'/'``-separated component of the matching
            ``post_link``, utf-8 encoded; None when nothing matches or on any
            error.
        """
        try:
            wanted = cleantitle.getsearch(tvshowtitle)
            form = urllib.urlencode({'action': 'ajaxy_sf', 'sf_value': wanted, 'search': 'false'})
            endpoint = urlparse.urljoin(self.base_link, self.search_link)
            reply = client.request(endpoint, post=form, XHR=True)
            for entry in json.loads(reply)['diziler'][0]['all']:
                candidate = cleantitle.get(entry['post_title'])
                if wanted != candidate:
                    continue
                slug = entry['post_link'].split('/')[4]
                return slug.encode('utf-8')
        except:
            return
项目:plugin.video.exodus    作者:lastship    | 项目源码 | 文件源码
def movie(self, imdb, title, localtitle, aliases, year):
        """
        Search the site for ``title`` and return an encoded descriptor.

        The search page is requested several times to collect cookies, then
        fetched with those cookies; the first link inside a ``div.title``
        element whose cleaned text contains the cleaned ``title`` is used.

        :return: A url-encoded string holding ``imdb``, ``title``, ``year``,
            the matched ``url`` and the request ``headers``; None on any
            error (including no match).
        """
        try:
            url = urlparse.urljoin(self.base_link, self.search_link % urllib.quote_plus(title))
            headers = {'Referer': url, 'User-Agent':'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.81 Safari/537.36'}
            # Cookie handshake: two cookie-only requests, concatenated.
            cookie = client.request(url, headers=headers, output='cookie')
            cookie += client.request(url, headers=headers, cookie=cookie, output='cookie')
            client.request(url, headers=headers, cookie=cookie)
            # NOTE(review): ``headers`` is built above without a 'Cookie' key,
            # so this only works if client.request() adds one to the dict it
            # is given — confirm; otherwise this raises KeyError and the
            # function always returns None.
            cookie += '; '+ headers['Cookie']
            headers = {'Referer': url, 'Cookie': cookie, 'User-Agent':'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.81 Safari/537.36'}
            r = client.request(url, headers=headers)
            r = client.parseDOM(r, 'div', attrs={'class': 'title'})
            # Pair each anchor's href with its inner text, per title div.
            r = [zip(client.parseDOM(i, 'a', ret='href'), client.parseDOM(i, 'a')) for i in r]
            # Keep the first (href, text) pair of each div; relies on zip()
            # returning a list (Python 2).
            r = [i[0] for i in r]
            # First href whose link text contains the wanted title.
            r = [i[0] for i in r if (cleantitle.get(title) in cleantitle.get(i[1]))][0]
            url = {'imdb': imdb, 'title': title, 'year': year, 'url': r, 'headers': headers}
            url = urllib.urlencode(url)
            return url
        except:
            return
项目:plugin.video.exodus    作者:lastship    | 项目源码 | 文件源码
def episode(self, url, imdb, tvdb, title, premiered, season, episode):
        """
        Find the season entry for a show and return an encoded descriptor.

        ``url`` is a query string carrying at least ``headers``, ``aliases``
        and ``tvshowtitle`` or ``title``. The site search is queried for the
        title and results of the form ``<name> - Season <n>`` are matched
        against the aliases and ``season``.

        :return: A url-encoded string with ``type``, ``id``, ``episode``,
            ``season`` and ``headers``; None on any error (including no
            match).
        """
        try:
            data = urlparse.parse_qs(url)
            # Flatten parse_qs lists to their first value ('' when empty).
            data = dict([(i, data[i][0]) if data[i] else (i, '') for i in data])
            # SECURITY NOTE(review): eval() on values carried in the url
            # string executes arbitrary expressions if that string is ever
            # attacker-controlled.
            headers = eval(data['headers'])
            aliases = eval(data['aliases'])
            title = data['tvshowtitle'] if 'tvshowtitle' in data else data['title']
            title = cleantitle.getsearch(title)
            query = self.search_link % (urllib.quote_plus(title))
            query = urlparse.urljoin(self.base_link, query)
            r = client.request(query, headers=headers, timeout='30', mobile=True)
            # (alias id, link text) pairs from the result markup.
            match = re.compile('alias=(.+?)\'">(.+?)</a>').findall(r)
            # Split the link text into (show name, season number).
            r = [(i[0], re.findall('(.+?)\s+-\s+Season\s+(\d+)', i[1])) for i in match]
            # Drop results whose text did not match the "- Season N" pattern.
            r = [(i[0], i[1][0][0], i[1][0][1]) for i in r if len(i[1]) > 0]
            # First id whose name matches an alias and whose season matches.
            r = [i[0] for i in r if self.matchAlias(i[1], aliases) and int(season) == int(i[2])][0]
            url = {'type': 'tvshow', 'id': r, 'episode': episode, 'season': season, 'headers': headers}
            url = urllib.urlencode(url)
            return url
        except:
            return
项目:plugin.video.exodus    作者:lastship    | 项目源码 | 文件源码
def __search(self, title, localtitle, year, content_type):
        """
        Search the site's JSON endpoint and return the path of the best match.

        Results of type ``content_type`` are kept when their year is
        ``year`` +/- 1 (or ``'0'``) and either the original title matches the
        cleaned ``title`` or the title matches the cleaned ``localtitle``.

        :return: The path part of the matched URL, utf-8 encoded; None on any
            error (including no match).
        """
        try:
            t = cleantitle.get(title)
            tq = cleantitle.get(localtitle)
            y = ['%s' % str(year), '%s' % str(int(year) + 1), '%s' % str(int(year) - 1), '0']

            query = urlparse.urljoin(self.base_link, self.search_link)

            # Encode the search term directly. The previous form,
            # urlencode({'k': "%s"}) % tq, produced 'k=%25s' and then used it
            # as a format string, i.e. the term was right-justified in a
            # 25-character field and never url-encoded.
            post = urllib.urlencode({'k': tq})
            r = client.request(query, post=post)
            r = json.loads(r)

            r = [i.get('result') for i in r if i.get('type', '').encode('utf-8') == content_type]
            # (url, original title, title, production year, start date)
            r = [(i.get('url'), i.get('originalTitle'), i.get('title'), i.get('anneeProduction', 0), i.get('dateStart', 0)) for i in r]
            # Strip markup from both titles; fall back to the first 4-digit
            # number of the start date when the production year is missing.
            r = [(i[0], re.sub('<.+?>|</.+?>', '', i[1] if i[1] else ''), re.sub('<.+?>|</.+?>', '', i[2] if i[2] else ''), i[3] if i[3] else re.findall('(\d{4})', i[4])[0]) for i in r if i[3] or i[4]]
            r = sorted(r, key=lambda i: int(i[3]), reverse=True)  # with year > no year
            r = [i[0] for i in r if i[3] in y and (t.lower() == cleantitle.get(i[1].lower()) or tq.lower() == cleantitle.query(i[2].lower()))][0]

            # Keep only the path, dropping any scheme-relative host prefix.
            url = re.findall('(?://.+?|)(/.+)', r)[0]
            url = client.replaceHTMLCodes(url)
            url = url.encode('utf-8')

            return url
        except:
            return
项目:plugin.video.exodus    作者:lastship    | 项目源码 | 文件源码
def episode(self, url, imdb, tvdb, title, premiered, season, episode):
        """
        Resolve the season page of a show and return an encoded descriptor.

        ``url`` is a query string carrying ``tvshowtitle``,
        ``localtvshowtitle``, ``aliases`` and ``year``. The local title is
        searched first; the original title is tried only when that fails and
        the two differ.

        :return: A url-encoded string with the domain-stripped ``url`` and
            ``episode``; None when ``url`` is empty, nothing is found, or on
            any error.
        """
        try:
            if not url:
                return

            parsed = urlparse.parse_qs(url)
            # Flatten parse_qs lists to their first value ('' when empty).
            data = dict((key, parsed[key][0] if parsed[key] else '') for key in parsed)
            tvshowtitle = data['tvshowtitle']
            localtvshowtitle = data['localtvshowtitle']
            # SECURITY NOTE(review): eval() on a value carried in the url
            # string.
            aliases = source_utils.aliases_to_array(eval(data['aliases']))
            year = data['year']

            # NOTE(review): ``aliases`` has already been passed through
            # aliases_to_array above and is converted again here — confirm
            # this double conversion is intended.
            found = self.__search([localtvshowtitle] + source_utils.aliases_to_array(aliases), year, season)
            if not found and tvshowtitle != localtvshowtitle:
                found = self.__search([tvshowtitle] + source_utils.aliases_to_array(aliases), year, season)

            if found:
                return urllib.urlencode({'url': source_utils.strip_domain(found), 'episode': episode})
        except:
            return
项目:plugin.video.exodus    作者:lastship    | 项目源码 | 文件源码
def __search(self, titles, year, content):
        """
        Search the site for any of ``titles`` and return the matching path.

        Three requests are chained through their cookies: the year page, the
        search (posting the first title), and the content-type page whose
        table rows are then parsed.

        :param titles: Candidate titles; the first one is the search term.
        :param year: Year used for ``self.year_link`` and compared with the
            second table cell of each row.
        :param content: Value substituted into ``self.type_link``.
        :return: The domain-stripped href of the first matching row; None on
            any error (including no match).
        """
        try:
            t = [cleantitle.get(i) for i in set(titles) if i]

            c = client.request(urlparse.urljoin(self.base_link, self.year_link % int(year)), output='cookie')

            p = urllib.urlencode({'search': cleantitle.query(titles[0])})
            c = client.request(urlparse.urljoin(self.base_link, self.search_link), cookie=c, post=p, output='cookie')
            r = client.request(urlparse.urljoin(self.base_link, self.type_link % content), cookie=c, post=p)

            r = dom_parser.parse_dom(r, 'div', attrs={'id': 'content'})
            r = dom_parser.parse_dom(r, 'tr')
            r = [dom_parser.parse_dom(i, 'td') for i in r]
            r = [dom_parser.parse_dom(i, 'a', req='href') for i in r]
            # (href, first anchor content, second anchor content) per row.
            r = [(i[0].attrs['href'], i[0].content, i[1].content) for i in r if i]
            # Try to split "Title <i>(Alt title)<" out of the first anchor.
            r = [(i[0], i[1], re.findall('(.+?)\s<i>\((.+?)\)<', i[1]), i[2]) for i in r]
            # (href, title, alternative title or '', second anchor content)
            r = [(i[0], i[2][0][0] if len(i[2]) > 0 else i[1], i[2][0][1] if len(i[2]) > 0 else '', i[3]) for i in r]
            # First row where either title is wanted and the last field
            # equals ``year`` (compared without type conversion).
            r = [i[0] for i in r if (cleantitle.get(i[1]) in t or cleantitle.get(i[2]) in t) and i[3] == year][0]

            return source_utils.strip_domain(r)
        except:
            return
项目:plugin.video.exodus    作者:lastship    | 项目源码 | 文件源码
def resolve(self, url):
        """
        Turn a sequence of stream ids into a playable URL.

        Each id is posted to ``self.stream_link``; replies with
        ``error == '0'`` and a ``url`` field contribute that URL.

        :param url: Iterable of stream ids.
        :return: The single URL when exactly one was collected, otherwise
            ``'stack://'`` followed by the URLs joined with ``' , '``; None
            on any error.
        """
        try:
            streams = []

            for stream_id in url:
                endpoint = urlparse.urljoin(self.base_link, self.stream_link % stream_id)
                form = urllib.urlencode({'streamID': stream_id})
                reply = json.loads(client.request(endpoint, XHR=True, post=form))
                if 'error' in reply and reply['error'] == '0' and 'url' in reply:
                    streams.append(reply['url'])

            if len(streams) == 1:
                return streams[0]
            return 'stack://' + ' , '.join(streams)
        except:
            return
项目:plugin.video.exodus    作者:lastship    | 项目源码 | 文件源码
def episode(self, url, imdb, tvdb, title, premiered, season, episode):
        """
        Resolve the season page of a show and return an encoded descriptor.

        ``url`` is a query string carrying ``tvshowtitle``,
        ``localtvshowtitle``, ``aliases`` and ``year``. The year is the first
        four-digit number in ``premiered`` when there is one, otherwise the
        carried ``year``. The local title is searched first, then the
        original title if it differs.

        :return: A url-encoded string with ``url`` and ``episode``; None when
            ``url`` is empty, nothing is found, or on any error.
        """
        try:
            if not url:
                return

            parsed = urlparse.parse_qs(url)
            # Flatten parse_qs lists to their first value ('' when empty).
            data = dict((key, parsed[key][0] if parsed[key] else '') for key in parsed)
            tvshowtitle = data['tvshowtitle']
            localtvshowtitle = data['localtvshowtitle']
            # SECURITY NOTE(review): eval() on a value carried in the url
            # string.
            aliases = source_utils.aliases_to_array(eval(data['aliases']))

            years = re.findall('(\d{4})', premiered)
            year = years[0] if years else data['year']

            found = self.__search([localtvshowtitle] + aliases, year, season)
            if not found and tvshowtitle != localtvshowtitle:
                found = self.__search([tvshowtitle] + aliases,year, season)
            if not found:
                return

            return urllib.urlencode({'url': found, 'episode': episode})
        except:
            return
项目:plugin.video.exodus    作者:lastship    | 项目源码 | 文件源码
def moonwalk(link, ref, season, episode):
    """
    Collect stream URLs for a moonwalk player link.

    When both ``season`` and ``episode`` are truthy they are merged into the
    link's query string (commas left unescaped). Every translator variant of
    the link is then resolved; if none is reported the link itself is used.

    :return: List of whatever ``__get_moonwalk`` yields for each variant;
        ``[]`` on any error.
    """
    try:
        if season and episode:
            params = dict(urlparse.parse_qsl(urlparse.urlsplit(link).query))
            params.update({'season': season, 'episode': episode})
            encoded = (urllib.urlencode(params)).replace('%2C', ',')
            # Swap the old query string for the rebuilt one.
            link = link.replace('?' + urlparse.urlparse(link).query, '') + '?' + encoded

        variants = __get_moonwalk_translators(link, ref) or [(link, '')]

        urls = []
        for variant in variants:
            urls.extend(__get_moonwalk(variant[0], ref, info=variant[1]))
        return urls
    except:
        return []
项目:Cortex-Analyzers    作者:CERT-BDF    | 项目源码 | 文件源码
def build_url(self):
        """
        Assemble ``self.url`` from the base URL, the optional domain name,
        the service name and the url-encoded ``self.options``.

        A blank (whitespace-only) ``self.domain_name`` is left out of the
        path.
        """
        if self.domain_name.strip() == '':
            path_prefix = '/'
        else:
            path_prefix = '/' + self.domain_name + '/'

        query_string = urllib.urlencode(self.options)
        self.url = self.configuration.base_url + path_prefix + self.service_name + '?' + query_string
项目:Cortex-Analyzers    作者:CERT-BDF    | 项目源码 | 文件源码
def _query(self, path, before=None, after=None):
        """
        Stream the records of a ``/lookup/<path>`` query.

        :param path: Lookup path appended to ``self.server``.
        :param before: Optional upper time bound.
        :param after: Optional lower time bound.
        :return: Generator yielding one decoded JSON object per response
            line.

        When both bounds are given the query asks for records entirely inside
        the window (``time_first_after`` / ``time_last_before``); with a
        single bound it uses ``time_first_before`` or ``time_last_after``.
        ``self.limit`` is sent as ``limit`` when truthy.
        """
        url = '%s/lookup/%s' % (self.server, path)

        params = {}
        if self.limit:
            params['limit'] = self.limit
        if before and after:
            params['time_first_after'] = after
            params['time_last_before'] = before
        else:
            if before:
                params['time_first_before'] = before
            if after:
                params['time_last_after'] = after
        if params:
            url += '?{0}'.format(urllib.urlencode(params))

        req = urllib2.Request(url)
        req.add_header('Accept', 'application/json')
        req.add_header('X-Api-Key', self.apikey)
        http = urllib2.urlopen(req)
        try:
            while True:
                line = http.readline()
                if not line:
                    break
                yield json.loads(line)
        finally:
            # Release the connection even if the consumer stops early or a
            # line fails to parse.
            http.close()
项目:plugin.video.swisscom_tv_unofficial    作者:kungpfui    | 项目源码 | 文件源码
def build_url(query):
    """Return the plugin's own URL (``sys.argv[0]``) with ``query`` url-encoded after a ``'?'``."""
    base = sys.argv[0]
    encoded = urllib.urlencode(query)
    return base + '?' + encoded
项目:cbapi-examples    作者:cbcommunity    | 项目源码 | 文件源码
def printWatchlistHits(serverurl, watchlistid, watchlisttype, rows):
    """
    Print the hits of a watchlist over the last 1440 minutes.

    :param serverurl: Base URL of the server.
    :param watchlistid: Integer id, used to build the ``watchlist_<id>`` key.
    :param watchlisttype: ``'modules'`` (binary search) or ``'events'``
        (process search); anything else returns after printing ``rows``.
    :param rows: Number of rows requested; also printed first.

    Uses the module-level ``cb`` client. Pretty-prints the decoded JSON reply
    followed by the number of results.
    """
    global cb
    pp = pprint.PrettyPrinter(indent=2)

    print(rows)

    getparams = {"cb.urlver": 1,
                "watchlist_%d" % watchlistid : "*",
                "rows": rows }

    # watchlist type -> (time-window query key, URL template)
    if watchlisttype == 'modules':
        window_key, template = "cb.q.server_added_timestamp", "%s/api/v1/binary?%s"
    elif watchlisttype == 'events':
        window_key, template = "cb.q.start", "%s/api/v1/process?%s"
    else:
        return

    getparams[window_key] = "-1440m"
    r = cb.cbapi_get(template % (serverurl, urllib.urlencode(getparams)))
    parsedjson = json.loads(r.text)
    pp.pprint(parsedjson)

    print("")
    print("Total Number of results returned: %d" % len(parsedjson['results']))
    print("")
项目:IotCenter    作者:panjanek    | 项目源码 | 文件源码
def get(self):
        """
        Relay the local MJPG video stream to an authenticated client.

        Sends the multipart response headers, opens a TCP connection to
        ``127.0.0.1:self.localVideoPort``, issues a GET, consumes the
        upstream response headers, then hands streaming over to
        ``self.streamVideo`` on a new thread. Unauthenticated requests are
        redirected to ``/login``.
        """
        if not self.isAuthenticated():
            self.redirect("/login?"+urllib.urlencode({"returnUrl":self.request.uri}))
            return

        self.logger.info("Attempting to stream video from 127.0.0.1:{0}".format(self.localVideoPort))
        self.clear()
        self.set_status(200)
        response_headers = (
            ('Connection', 'close'),
            ('Max-Age', '0'),
            ('Expires', '0'),
            ('Cache-Control', 'no-cache, private'),
            ('Pragma', 'no-cache'),
            ('Content-type','multipart/x-mixed-replace; boundary=--BoundaryString'),
        )
        for name, value in response_headers:
            self.set_header(name, value)
        self.flush()

        self.sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
        self.sock.connect(('127.0.0.1', self.localVideoPort))
        self.sock.sendall("GET http://127.0.0.1:{0}/ HTTP/1.1\r\nHost: 127.0.0.1:{0}\r\n\r\n".format(self.localVideoPort))

        # Skip the upstream response headers: read until an empty line.
        while True:
            line = self.readLine()
            if not len(line) > 0:
                break
            self.logger.debug("header line from video server: {0}".format(line))

        # Keep the request open; the worker thread writes the frames.
        self.logger.info("Starting serving mjpg stream")
        self._auto_finish = False
        threading.Thread(target = self.streamVideo).start()
项目:IotCenter    作者:panjanek    | 项目源码 | 文件源码
def get(self):
        """Render the device list for an authenticated client; otherwise redirect to ``/login``."""
        if not self.isAuthenticated():
            self.redirect("/login?"+urllib.urlencode({"returnUrl":self.request.uri}))
            return
        self.render("views/devices.html", devices=self.iotManager.getAllDevices())
项目:IotCenter    作者:panjanek    | 项目源码 | 文件源码
def get(self, deviceIdHex):
        """
        Render the page of one device for an authenticated client.

        :param deviceIdHex: Device id passed to ``iotManager.getDevice``.

        The ``images`` request argument (default ``"6"``) is escaped and
        converted to an int. An unknown device is only logged as a warning;
        unauthenticated requests are redirected to ``/login``.
        """
        if not self.isAuthenticated():
            self.redirect("/login?"+urllib.urlencode({"returnUrl":self.request.uri}))
            return

        requested = self.get_argument("images", "6")
        imagesCount = int(tornado.escape.xhtml_escape(requested))
        deviceModel = self.iotManager.getDevice(deviceIdHex, imagesCount)
        if not deviceModel:
            self.logger.warning("device {0} not found".format(deviceIdHex))
            return
        self.render("views/device.html", device = deviceModel, imagesCount=imagesCount)
项目:IotCenter    作者:panjanek    | 项目源码 | 文件源码
def get(self):
        """
        Render the sensor history page for an authenticated client.

        Request arguments: ``fromTime`` (default: two days ago) and
        ``toTime`` (default: tomorrow) as ``YYYY-MM-DD``, ``aggregation``
        (default ``"minutes"``), plus one ``<deviceId>.<valueId>`` argument
        per sensor to chart. A chart is shown when at least one sensor is
        selected; sensors absent from every returned record are dropped from
        the chart. Unauthenticated requests are redirected to ``/login``.
        """
        if not self.isAuthenticated():
            self.redirect("/login?"+urllib.urlencode({"returnUrl":self.request.uri}))
            return

        defaultFrom = (datetime.datetime.now() - datetime.timedelta(days=2)).strftime('%Y-%m-%d')
        fromTime = tornado.escape.xhtml_escape(self.get_argument("fromTime", defaultFrom))
        defaultTo = (datetime.datetime.now() + datetime.timedelta(days=1)).strftime('%Y-%m-%d')
        toTime = tornado.escape.xhtml_escape(self.get_argument("toTime", defaultTo))
        aggregation = tornado.escape.xhtml_escape(self.get_argument("aggregation", "minutes"))

        sensors = []
        chartSensors = []
        for deviceId, conf in self.iotManager.deviceConfig.items():
            if "values" not in conf:
                continue
            for valueId, varConf in conf["values"].items():
                parameterName = "{0}.{1}".format(deviceId, valueId)
                selected = self.get_argument(parameterName, default=None)
                sensorObj = SensorFilter(deviceId, conf["name"], valueId, varConf.get("label", valueId), varConf.get("type", "number"), selected)
                sensors.append(sensorObj)
                if selected:
                    chartSensors.append(sensorObj)
        # A chart is shown exactly when some sensor was selected.
        showChart = bool(chartSensors)

        # Parsed unconditionally, so malformed dates raise even without a chart.
        fromTimeParsed = datetime.datetime.strptime(fromTime, '%Y-%m-%d')
        toTimeParsed = datetime.datetime.strptime(toTime, '%Y-%m-%d')

        chartData = []
        if showChart:
            self.logger.debug("Showing chart for period {0} - {1} aggregated to {2} for sensors {3}".format(fromTimeParsed, toTimeParsed, aggregation, chartSensors))
            chartData = self.iotManager.database.getChartData(chartSensors, fromTimeParsed, toTimeParsed, aggregation)
            # Keep only sensors that occur in at least one record.
            chartSensors = [
                sensor for sensor in chartSensors
                if any(sensor.fullId in record for record in chartData)
            ]
        self.render("views/history.html", sensors=sensors, fromTime=fromTime, toTime=toTime, aggregation=aggregation, showChart=showChart, chartData=chartData, chartSensors=chartSensors)
项目:WikiLoves    作者:wmes    | 项目源码 | 文件源码
def get_global_usage (api_url, query_string_dict, title_list) :
    """
    Count, per wiki, how the given files are used in articles.

    :param api_url: API endpoint the query is POSTed to.
    :param query_string_dict: Base query parameters. Its ``"titles"`` entry
        is overwritten in place with the ``'|'``-joined, utf-8 encoded
        ``title_list`` (the caller's dict is modified).
    :param title_list: Page titles to look up.
    :return: ``{"image": {wiki: n}, "article": {wiki: n}}`` where
        ``article`` counts individual usages in namespaces ``'0'`` and
        ``'104'`` and ``image`` counts, per wiki, the pages that have at
        least one such usage.

    The endpoint and parameters now come from the arguments; previously the
    arguments were ignored in favour of the module-level ``API_BASE_URL`` and
    ``API_QUERY_STRING``.
    """
    usage_dict_ = {"image": {}, "article": {}}
    article_usage = usage_dict_["article"]
    image_usage = usage_dict_["image"]

    query_string_dict["titles"] = u'|'.join(title_list).encode('utf-8')
    f = urlopen(api_url, urlencode(query_string_dict))
    try:
        response_dict = json.loads(f.read())
    finally:
        f.close()

    for page in response_dict["query"]["pages"].values():
        # Pages without a 'globalusage' entry contribute nothing.
        wikis_using_page = set()
        for item in page.get(u'globalusage', ()):
            if item[u'ns'] in (u'0', u'104'):
                wiki = item[u'wiki']
                article_usage[wiki] = article_usage.get(wiki, 0) + 1
                wikis_using_page.add(wiki)
        for wiki in wikis_using_page:
            image_usage[wiki] = image_usage.get(wiki, 0) + 1
    return usage_dict_
项目:dabdabrevolution    作者:harryparkdotio    | 项目源码 | 文件源码
def build(self, _name, *anons, **query):
        """
        Build a URL by filling the wildcards of the rule named ``_name``.

        Positional values are stored as ``anon0``, ``anon1``, ... ; keyword
        values fill the named wildcards. Arguments left over after filling
        are appended as a query string.

        Raises ``RouteBuildError`` when the route is unknown or a wildcard
        has no value.
        """
        builder = self.builder.get(_name)
        if not builder:
            raise RouteBuildError("No route with that name.", _name)
        try:
            query.update(('anon%d' % i, value) for i, value in enumerate(anons))
            pieces = []
            for name, piece in builder:
                # Named entries are formatters consuming one argument;
                # unnamed entries are literal path fragments.
                pieces.append(piece(query.pop(name)) if name else piece)
            url = ''.join(pieces)
            if query:
                return url + '?' + urlencode(query)
            return url
        except KeyError as E:
            raise RouteBuildError('Missing URL argument: %r' % E.args[0])
项目:tornado-ssdb-project    作者:ego008    | 项目源码 | 文件源码
def send_email(to, subject, html):
    """
    Send an HTML e-mail through the Mailgun HTTP API.

    Generator-style coroutine: it yields the pending fetch of the POST
    request. ``unicode`` arguments are utf-8 encoded first. An ``HTTPError``
    is logged together with the response body (when available) and is not
    re-raised.

    :param to: Recipient address.
    :param subject: Subject line.
    :param html: HTML body.
    """
    def _utf8(value):
        return value.encode('utf-8') if isinstance(value, unicode) else value

    to, subject, html = _utf8(to), _utf8(subject), _utf8(html)

    form = urlencode({
        'from': CONFIG.EMAIL_SENDER,
        'to': to,
        'subject': subject,
        'html': html
    })
    request = HTTPRequest(
        url=_MAILGUN_API_URL,
        method='POST',
        auth_username='api',
        auth_password=CONFIG.MAILGUN_API_KEY,
        body=form
    )
    client = AsyncHTTPClient()
    try:
        yield client.fetch(request)
    except HTTPError as e:
        # The error may carry no response object at all.
        try:
            response = e.response.body
        except AttributeError:
            response = None
        logging.exception('failed to send email:\nto: %s\nsubject: %s\nhtml: %s\nresponse: %s', to, subject, html, response)
项目:easydo-ui    作者:easydo-cn    | 项目源码 | 文件源码
def push_state(self, request, title, url=''):
        """
        Queue client-side script that updates the page title / history state.

        :param request: Current request; consulted for ``is_mobile()``, the
            ``kss`` header and ``getURL()``.
        :param title: New document title; passed through
            ``self._escape_value`` before being embedded in script.
        :param url: URL to push; defaults to the unquoted request URL with
            the form appended as a query string.

        Without a ``kss`` request header only ``document.title`` is set. When
        the form already contains ``back`` nothing further is queued.
        """
        if request.is_mobile():
            # FIXME hack: presumably works around mobile webviews not picking
            # up document.title changes by loading and removing a hidden
            # iframe (original comment was mis-encoded) — confirm.
            script = '''
                (function(){
                    var $body = $('body');
                    var $iframe = $('<iframe src="/@@/img/favicon.ico" style="display:none;"></iframe>').on('load', function() {
                        setTimeout(function() {
                            $iframe.off('load').remove()
                        }, 0)
                    }).appendTo($body);
                })();
            '''
            self._append_script(script, False)

        title = self._escape_value(title)
        # Requests without the 'kss' header only get the title update, no
        # pushState (presumably these are non-AJAX requests — confirm).
        if not request.headers.has_key('kss'):
            self._append_script('document.title=%s' % title, False)
            return

        # NOTE(review): reads self.request.form rather than the ``request``
        # argument — confirm they are the same object.
        form = self.request.form
        # A form already marked with 'back' is not pushed again; otherwise
        # mark it (presumably so history navigation does not re-push).
        if form.has_key('back'):
            return
        else:
            form['back'] = True

        kss = request.getURL()
        # ``form`` always contains 'back' at this point, so it is never empty.
        if form:
            kss += '?%s' % urllib.urlencode(form)

        data = json.dumps({'form':form, 'url':kss})
        if not url:
            url = urllib.unquote(kss)

        # NOTE(review): ``url`` is interpolated into a single-quoted JS
        # string without escaping.
        script = "History.trigger=false;History.pushState(%s, %s, '%s')" % (data, title, url)
        self._append_script(script, False)
项目:BitBot    作者:crack00r    | 项目源码 | 文件源码
def api_query(self, command, req={}):
        """
        Call the Poloniex API and return the decoded JSON reply.

        Public commands (``returnTicker``, ``return24Volume``,
        ``returnOrderBook``, ``returnMarketTradeHistory``) are plain GET
        requests; the last two read ``req['currencyPair']``, and
        ``returnMarketTradeHistory`` is sent as ``returnTradeHistory``. Every
        other command is POSTed to the trading API, signed with HMAC-SHA512
        using ``self.Secret`` and identified by ``self.APIKey``; its reply is
        passed through ``self.post_process``.

        :param command: API command name.
        :param req: Extra request parameters. It is never modified: the
            trading branch works on a copy, so neither the shared default
            dict nor a caller-supplied dict picks up ``command``/``nonce``.
        """
        public_prefix = 'https://poloniex.com/public?command='

        if (command == "returnTicker" or command == "return24Volume"):
            public_url = public_prefix + command
        elif (command == "returnOrderBook"):
            public_url = public_prefix + command + '&currencyPair=' + str(req['currencyPair'])
        elif (command == "returnMarketTradeHistory"):
            public_url = public_prefix + "returnTradeHistory" + '&currencyPair=' + str(req['currencyPair'])
        else:
            public_url = None

        if public_url is not None:
            ret = urllib2.urlopen(urllib2.Request(public_url))
            return json.loads(ret.read())

        payload = dict(req)
        payload['command'] = command
        # Millisecond timestamp used as the request nonce.
        payload['nonce'] = int(time.time() * 1000)
        post_data = urllib.urlencode(payload)

        sign = hmac.new(self.Secret, post_data, hashlib.sha512).hexdigest()
        headers = {
            'Sign': sign,
            'Key': self.APIKey
        }

        ret = urllib2.urlopen(urllib2.Request('https://poloniex.com/tradingApi', post_data, headers))
        jsonRet = json.loads(ret.read())
        return self.post_process(jsonRet)
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def emit(self, record):
        """
        Emit a record.

        The record, mapped through ``self.mapLogRecord``, is sent to the web
        server as a percent-encoded dictionary: in the URL for GET, in the
        body for POST. Any failure other than KeyboardInterrupt/SystemExit is
        routed to ``self.handleError``.
        """
        try:
            import httplib
            import urllib
            host = self.host
            connection = httplib.HTTP(host)
            url = self.url
            data = urllib.urlencode(self.mapLogRecord(record))
            if self.method == "GET":
                # Extend an existing query string, or start one.
                sep = '&' if url.find('?') >= 0 else '?'
                url = url + "%c%s" % (sep, data)
            connection.putrequest(self.method, url)
            # support multiple hosts on one IP address...
            # the Host header carries the name without the optional :port
            host = host.partition(":")[0]
            connection.putheader("Host", host)
            if self.method == "POST":
                connection.putheader("Content-type",
                                     "application/x-www-form-urlencoded")
                connection.putheader("Content-length", str(len(data)))
            body = data if self.method == "POST" else None
            connection.endheaders(body)
            connection.getreply()    # the reply itself is not used
        except (KeyboardInterrupt, SystemExit):
            raise
        except:
            self.handleError(record)
项目:cbapi-python    作者:carbonblack    | 项目源码 | 文件源码
def printWatchlistHits(serverurl, watchlistid, watchlisttype, rows):
    """
    Print what a watchlist matched during the last 1440 minutes.

    :param serverurl: Base URL of the server.
    :param watchlistid: Integer id, used to build the ``watchlist_<id>`` key.
    :param watchlisttype: ``'modules'`` queries the binary endpoint,
        ``'events'`` the process endpoint; any other value returns after
        printing ``rows``.
    :param rows: Number of rows requested; also printed first.

    Uses the module-level ``cb`` client, pretty-prints the decoded JSON reply
    and then the number of results.
    """
    global cb
    printer = pprint.PrettyPrinter(indent=2)

    print(rows)

    getparams = {"cb.urlver": 1,
                "watchlist_%d" % watchlistid : "*",
                "rows": rows }

    if watchlisttype == 'modules':
        getparams["cb.q.server_added_timestamp"] = "-1440m"
        target = "%s/api/v1/binary?%s" % (serverurl, urllib.urlencode(getparams))
    elif watchlisttype == 'events':
        getparams["cb.q.start"] = "-1440m"
        target = "%s/api/v1/process?%s" % (serverurl, urllib.urlencode(getparams))
    else:
        return

    reply = cb.cbapi_get(target)
    parsedjson = json.loads(reply.text)
    printer.pprint(parsedjson)

    print("")
    print("Total Number of results returned: %d" % len(parsedjson['results']))
    print("")
项目:plugin.video.exodus    作者:lastship    | 项目源码 | 文件源码
def _getToken():
    """Obtain an access token from the AniList auth endpoint.

    The client credentials are form-encoded and handed to ``client.request``
    as its ``post`` payload; the reply is decoded with
    ``utils.json_loads_as_str``.

    Returns a ``(token_type, access_token)`` tuple read from the decoded reply.
    """
    # NOTE(review): the client id and secret are embedded in the source.
    credentials = {'grant_type': 'client_credentials', 'client_id': 'kodiexodus-7erse', 'client_secret': 'XelwkDEccpHX2uO8NpqIjVf6zeg'}
    form_headers = {'Content-Type': 'application/x-www-form-urlencoded'}

    payload = urllib.urlencode(credentials)
    reply = client.request('https://anilist.co/api/auth/access_token', post=payload, headers=form_headers, error=True)
    token = utils.json_loads_as_str(reply)
    return token['token_type'], token['access_token']
项目:plugin.video.exodus    作者:lastship    | 项目源码 | 文件源码
def get_plugin_url(queries):
    """Return the plugin url for ``queries``: ``<addon id>?<encoded queries>``.

    ``queries`` is url-encoded as-is.  If that raises ``UnicodeEncodeError``,
    every ``unicode`` value in ``queries`` is replaced in place by its utf-8
    encoded form and the encoding is retried.

    The addon id is ``sys.argv[0]``, or the result of ``addonId()`` when
    ``sys.argv[0]`` is empty.
    """
    try:
        encoded = urllib.urlencode(queries)
    except UnicodeEncodeError:
        # Retry with byte strings; note this rewrites the caller's mapping.
        for key, value in queries.items():
            if isinstance(value, unicode):
                queries[key] = value.encode('utf-8')
        encoded = urllib.urlencode(queries)

    base = sys.argv[0] or addonId()
    return base + '?' + encoded
项目:plugin.video.exodus    作者:lastship    | 项目源码 | 文件源码
def movie(self, imdb, title, localtitle, aliases, year):
    """Build the url-encoded query string that identifies a movie.

    The encoded fields are ``imdb``, ``title``, ``year`` and ``aliases``,
    where ``aliases`` is the given list extended with
    ``{'country': 'us', 'title': title}``.  ``localtitle`` is accepted but
    not used.

    The caller's ``aliases`` list is not modified.

    Returns the encoded string, or ``None`` if building it raises (for
    example when ``aliases`` is not a list).
    """
    try:
        # Concatenate into a new list rather than appending in place, so a
        # list that the caller reuses across calls does not keep growing.
        all_aliases = aliases + [{'country': 'us', 'title': title}]
        params = {'imdb': imdb, 'title': title, 'year': year, 'aliases': all_aliases}
        return urllib.urlencode(params)
    except Exception:
        # Best effort: ordinary failures yield None, while KeyboardInterrupt
        # and SystemExit are allowed to propagate.
        return None
项目:plugin.video.exodus    作者:lastship    | 项目源码 | 文件源码
def tvshow(self, imdb, tvdb, tvshowtitle, localtvshowtitle, aliases, year):
    """Build the url-encoded query string that identifies a TV show.

    The encoded fields are ``imdb``, ``tvdb``, ``tvshowtitle``, ``year`` and
    ``aliases``, where ``aliases`` is the given list extended with
    ``{'country': 'us', 'title': tvshowtitle}``.  ``localtvshowtitle`` is
    accepted but not used.

    The caller's ``aliases`` list is not modified.

    Returns the encoded string, or ``None`` if building it raises (for
    example when ``aliases`` is not a list).
    """
    try:
        # Concatenate into a new list rather than appending in place, so a
        # list that the caller reuses across calls does not keep growing.
        all_aliases = aliases + [{'country': 'us', 'title': tvshowtitle}]
        params = {'imdb': imdb, 'tvdb': tvdb, 'tvshowtitle': tvshowtitle, 'year': year, 'aliases': all_aliases}
        return urllib.urlencode(params)
    except Exception:
        # Best effort: ordinary failures yield None, while KeyboardInterrupt
        # and SystemExit are allowed to propagate.
        return None
项目:plugin.video.exodus    作者:lastship    | 项目源码 | 文件源码
def episode(self, url, imdb, tvdb, title, premiered, season, episode):
    """Extend a show's encoded query string with the details of one episode.

    ``url`` is parsed as a query string and the first value of every field
    is kept.  ``title``, ``premiered``, ``season`` and ``episode`` are then
    set (overriding fields of the same name) and the result is url-encoded
    again.  ``imdb`` and ``tvdb`` are accepted but not used.

    Returns the encoded string, or ``None`` when ``url`` is ``None`` or when
    parsing/encoding raises.
    """
    if url is None:
        return None
    try:
        parsed = urlparse.parse_qs(url)
        params = {key: (values[0] if values else '') for key, values in parsed.items()}
        params['title'] = title
        params['premiered'] = premiered
        params['season'] = season
        params['episode'] = episode
        return urllib.urlencode(params)
    except Exception:
        # Best effort: ordinary failures yield None, while KeyboardInterrupt
        # and SystemExit are allowed to propagate.
        return None
项目:plugin.video.exodus    作者:lastship    | 项目源码 | 文件源码
def tvshow(self, imdb, tvdb, tvshowtitle, localtvshowtitle, aliases, year):
    """Build the url-encoded query string that identifies a TV show.

    The encoded fields are ``imdb``, ``tvdb``, ``tvshowtitle`` and ``year``.
    ``localtvshowtitle`` and ``aliases`` are accepted but not used.

    Returns the encoded string, or ``None`` if encoding raises.
    """
    try:
        params = {'imdb': imdb, 'tvdb': tvdb, 'tvshowtitle': tvshowtitle, 'year': year}
        return urllib.urlencode(params)
    except Exception:
        # Best effort: ordinary failures yield None, while KeyboardInterrupt
        # and SystemExit are allowed to propagate.
        return None
项目:plugin.video.exodus    作者:lastship    | 项目源码 | 文件源码
def episode(self, url, imdb, tvdb, title, premiered, season, episode):
    """Extend a show's encoded query string with the details of one episode.

    ``url`` is parsed as a query string and the first value of every field
    is kept.  ``title``, ``premiered``, ``season`` and ``episode`` are then
    set (overriding fields of the same name) and the result is url-encoded
    again.  ``imdb`` and ``tvdb`` are accepted but not used.

    Returns the encoded string, or ``None`` when ``url`` is ``None`` or when
    parsing/encoding raises.
    """
    if url is None:
        return None

    try:
        fields = urlparse.parse_qs(url)
        params = {name: (values[0] if values else '') for name, values in fields.items()}
        params['title'] = title
        params['premiered'] = premiered
        params['season'] = season
        params['episode'] = episode
        return urllib.urlencode(params)
    except Exception:
        # Best effort: ordinary failures yield None, while KeyboardInterrupt
        # and SystemExit are allowed to propagate.
        return None
项目:plugin.video.exodus    作者:lastship    | 项目源码 | 文件源码
def tvshow(self, imdb, tvdb, tvshowtitle, localtvshowtitle, aliases, year):
    """Encode the identifying details of a TV show as url parameters.

    Arguments:
        imdb: imdb id of the show.
        tvdb: tvdb id of the show.
        tvshowtitle: name of the show.
        localtvshowtitle: regional title of the show (not used here).
        aliases: alternative titles (not used here).
        year: year the show was released.

    Returns:
        The url-encoded ``imdb``/``tvdb``/``tvshowtitle``/``year`` pairs as
        a single string, or ``None`` if encoding raises an ``Exception``.
    """
    try:
        return urllib.urlencode({
            'imdb': imdb,
            'tvdb': tvdb,
            'tvshowtitle': tvshowtitle,
            'year': year
        })
    except Exception:
        return None
项目:plugin.video.exodus    作者:lastship    | 项目源码 | 文件源码
def movie(self, imdb, title, localtitle, aliases, year):
    """Build the url-encoded query string that identifies a movie.

    The encoded fields are ``imdb``, ``title`` and ``year``.  ``localtitle``
    and ``aliases`` are accepted but not used.

    Returns the encoded string, or ``None`` if encoding raises.
    """
    try:
        params = {'imdb': imdb, 'title': title, 'year': year}
        return urllib.urlencode(params)
    except Exception:
        # Best effort: ordinary failures yield None, while KeyboardInterrupt
        # and SystemExit are allowed to propagate.
        return None
项目:plugin.video.exodus    作者:lastship    | 项目源码 | 文件源码
def movie(self, imdb, title, localtitle, aliases, year):
    """Return the movie's ``imdb``, ``title`` and ``year`` as url parameters.

    ``localtitle`` and ``aliases`` are part of the signature but are not
    used by this implementation.

    Returns the url-encoded string, or ``None`` if encoding raises.
    """
    try:
        query = {'imdb': imdb, 'title': title, 'year': year}
        return urllib.urlencode(query)
    except Exception:
        # Best effort: ordinary failures yield None, while KeyboardInterrupt
        # and SystemExit are allowed to propagate.
        return None
项目:plugin.video.exodus    作者:lastship    | 项目源码 | 文件源码
def movie(self, imdb, title, localtitle, aliases, year):
    """Return the movie's identifying details as url parameters.

    Encodes ``imdb``, ``title``, ``year`` and ``aliases``; the encoded
    ``aliases`` value is the given list followed by
    ``{'country': 'us', 'title': title}``.  ``localtitle`` is accepted but
    not used.

    The list passed in as ``aliases`` is left unchanged.

    Returns the url-encoded string, or ``None`` if building it raises (for
    example when ``aliases`` is not a list).
    """
    try:
        # Build a new list instead of appending to the argument, so repeated
        # calls with the same list do not accumulate duplicate entries.
        extended = aliases + [{'country': 'us', 'title': title}]
        query = {'imdb': imdb, 'title': title, 'year': year, 'aliases': extended}
        return urllib.urlencode(query)
    except Exception:
        # Best effort: ordinary failures yield None, while KeyboardInterrupt
        # and SystemExit are allowed to propagate.
        return None
项目:plugin.video.exodus    作者:lastship    | 项目源码 | 文件源码
def tvshow(self, imdb, tvdb, tvshowtitle, localtvshowtitle, aliases, year):
    """Return the TV show's identifying details as url parameters.

    Encodes ``imdb``, ``tvdb``, ``tvshowtitle``, ``year`` and ``aliases``;
    the encoded ``aliases`` value is the given list followed by
    ``{'country': 'us', 'title': tvshowtitle}``.  ``localtvshowtitle`` is
    accepted but not used.

    The list passed in as ``aliases`` is left unchanged.

    Returns the url-encoded string, or ``None`` if building it raises (for
    example when ``aliases`` is not a list).
    """
    try:
        # Build a new list instead of appending to the argument, so repeated
        # calls with the same list do not accumulate duplicate entries.
        extended = aliases + [{'country': 'us', 'title': tvshowtitle}]
        query = {'imdb': imdb, 'tvdb': tvdb, 'tvshowtitle': tvshowtitle, 'year': year, 'aliases': extended}
        return urllib.urlencode(query)
    except Exception:
        # Best effort: ordinary failures yield None, while KeyboardInterrupt
        # and SystemExit are allowed to propagate.
        return None
项目:plugin.video.exodus    作者:lastship    | 项目源码 | 文件源码
def movie(self, imdb, title, localtitle, aliases, year):
    """Url-encode the ``imdb``, ``title`` and ``year`` of a movie.

    ``localtitle`` and ``aliases`` are accepted for signature compatibility
    with the other providers in this file but are not read.

    Returns the encoded string, or ``None`` if encoding raises.
    """
    try:
        details = {'imdb': imdb, 'title': title, 'year': year}
        return urllib.urlencode(details)
    except Exception:
        # Best effort: ordinary failures yield None, while KeyboardInterrupt
        # and SystemExit are allowed to propagate.
        return None
项目:plugin.video.exodus    作者:lastship    | 项目源码 | 文件源码
def tvshow(self, imdb, tvdb, tvshowtitle, localtvshowtitle, aliases, year):
    """Url-encode the ``imdb``, ``tvdb``, ``tvshowtitle`` and ``year`` of a show.

    ``localtvshowtitle`` and ``aliases`` are accepted but are not read.

    Returns the encoded string, or ``None`` if encoding raises.
    """
    try:
        details = {'imdb': imdb, 'tvdb': tvdb, 'tvshowtitle': tvshowtitle, 'year': year}
        return urllib.urlencode(details)
    except Exception:
        # Best effort: ordinary failures yield None, while KeyboardInterrupt
        # and SystemExit are allowed to propagate.
        return None
项目:plugin.video.exodus    作者:lastship    | 项目源码 | 文件源码
def episode(self, url, imdb, tvdb, title, premiered, season, episode):
    """Add one episode's details to a show's encoded query string.

    ``url`` is parsed as a query string, keeping the first value of each
    field.  ``title``, ``premiered``, ``season`` and ``episode`` are then
    stored under those names (replacing any existing fields so named) and
    the mapping is url-encoded again.  ``imdb`` and ``tvdb`` are accepted
    but not used.

    Returns the encoded string, or ``None`` when ``url`` is ``None`` or when
    parsing/encoding raises.
    """
    if url is None:
        return None

    try:
        show = urlparse.parse_qs(url)
        details = {field: (found[0] if found else '') for field, found in show.items()}
        details['title'] = title
        details['premiered'] = premiered
        details['season'] = season
        details['episode'] = episode
        return urllib.urlencode(details)
    except Exception:
        # Best effort: ordinary failures yield None, while KeyboardInterrupt
        # and SystemExit are allowed to propagate.
        return None