Python urllib module: urllib.request example source code

The following 50 code examples, extracted from open-source Python projects, illustrate how to use the urllib.request module.
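
Before the project examples, here is a minimal, self-contained sketch of the two patterns that recur throughout this page: a plain GET through urllib.request.urlopen, and a POST built from urllib.request.Request with an encoded body and explicit headers. The httpbin.org URLs are placeholders for illustration only and do not come from any of the projects below.

import json
import urllib.parse
import urllib.request

# GET: urlopen returns a response object whose read() yields bytes.
with urllib.request.urlopen("https://httpbin.org/get", timeout=10) as resp:
    print(json.loads(resp.read().decode("utf-8")))

# POST: build a Request with url-encoded bytes, headers and an explicit method.
post_data = urllib.parse.urlencode({"key": "value"}).encode("utf-8")
req = urllib.request.Request(
    "https://httpbin.org/post",
    data=post_data,
    headers={"User-Agent": "Mozilla/5.0"},
    method="POST",
)
with urllib.request.urlopen(req, timeout=10) as resp:
    print(resp.status, resp.read().decode("utf-8"))
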

Project: docklet    Author: unias
def dorequest(url, data = "", method = 'GET'):
    try: 
        if method == 'GET':
            response = urllib.request.urlopen(url, timeout=10).read()
        else:
            # use PUT/DELETE/POST, data should be encoded in ascii/bytes 
            request = urllib.request.Request(url, data = data.encode('ascii'), method = method)
            response = urllib.request.urlopen(request, timeout=10).read()
    # etcd may return json result with response http error code
    # http error code will raise exception in urlopen
    # catch the HTTPError and get the json result
    except urllib.error.HTTPError as e:
        # e.fp must be read() in this except block.
        # the e will be deleted and e.fp will be closed after this block
        response = e.fp.read()
    # response is encoded in bytes. 
    # recoded in utf-8 and loaded in json
    result = json.loads(str(response, encoding='utf-8'))
    return result


# client to use etcd
# not all etcd APIs are implemented, only the ones this project needs
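
The etcd client class that these comments introduce is not included in the excerpt. As a rough illustration of how the dorequest helper above might be wrapped, here is a minimal hedged sketch; the class name, the method names, and the /v2/keys path follow etcd's public v2 HTTP API but are assumptions, not docklet's actual code.

class EtcdClient:
    # Hypothetical wrapper around the dorequest helper defined above;
    # not docklet's real client.
    def __init__(self, server):
        self.base = "http://" + server + "/v2/keys"

    def getkey(self, key):
        # key should start with '/'; returns the JSON result parsed by dorequest
        return dorequest(self.base + key)

    def setkey(self, key, value, ttl=None):
        # etcd v2 expects a form-encoded body; dorequest sends it as ASCII bytes
        data = "value=" + value + ("&ttl=" + str(ttl) if ttl else "")
        return dorequest(self.base + key, data=data, method='PUT')
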
Project: picoCTF    Author: picoCTF
def _validate_captcha(data):
    """
    Validates a captcha with google's reCAPTCHA.

    Args:
        data: the posted form data
    """

    settings = api.config.get_settings()["captcha"]

    post_data = urllib.parse.urlencode({
        "secret": api.config.reCAPTCHA_private_key,
        "response": data["g-recaptcha-response"],
        "remoteip": flask.request.remote_addr
    }).encode("utf-8")

    request = urllib.request.Request(api.config.captcha_url, post_data, method='POST')
    response = urllib.request.urlopen(request).read().decode("utf-8")
    parsed_response = json.loads(response)
    return parsed_response['success'] == True
Project: facebook-scraper    Author: bacilo
def create_request_object(rel_url, req_type, req_to, job_id):
        """
        Creates request strings to use for batch_requests,
        based on rel_url

        type: can be used to determine the type of request when reading
            the response
        to: can be used to link certain attributes (like 'reactions')
            to the post they belong
        """
        # print(rel_url)
        return {
            'req_type': req_type,
            'req_to': req_to,
            'job_id': job_id,
            'req': {
                "method": "GET",
                "relative_url": "{}".format(rel_url)
                }
            }
Project: facebook-scraper    Author: bacilo
def create_post_request(self, post_id, job_id):
        """
        Creates a request string for a post to use in
        batch_requests based on post_id
        Note: could add limit as well?
        """
        return self.create_request_object((
            '{}?fields={},{},{},{}').format(
                post_id,
                self.str_reactions_query(),
                self.str_comments_query(),
                self.str_sharedposts_query(),
                self.str_attachments_query()),
                                          req_type='post',
                                          req_to='',
                                          job_id=job_id)

    # @staticmethod
    # def encode_batch(batch):
    #     """
    #     URL encodes the batch to prepare it for a graph API request
    #     """
    #     _json = json.dumps(batch)
    #     _url = urllib.parse.urlparse(_json)
    #     return _url
Project: facebook-scraper    Author: bacilo
def extend_token(self):
        """
        Extends access token and replaces the previously used one
        Prints error message if API Key or API Secret not found

        TODO: Replace also config file once that file is defined
        TODO: Additional checks on the response
        """
        if not self.api_key or not self.api_secret:
            logging.error('No API Key and/or API Secret defined')
            return None

        resp = self.request(
            req='oauth/access_token?grant_type=fb_exchange_token&client_id={}'
            '&client_secret={}&fb_exchange_token={}'.format(
                self.api_key, self.api_secret, self.access_token))
        msg = json.loads(resp.read().decode('utf-8'))
        self.access_token = msg['access_token']
        logging.info('Extended Access Token: \n%s', self.access_token)
        return self.access_token
Project: scientific-paper-summarisation    Author: EdCo95
def getJournalURL(jname):
# get journal URL given the journal name for retrieving article PIIs
    urlstr = "http://api.elsevier.com/sitemap/page/sitemap/" + jname[0].lower() + ".html"
    retl = ""
    with urllib.request.urlopen(urlstr) as url:
        response = url.read()
        linkcnt = 0
        for link in BeautifulSoup(response, parse_only=SoupStrainer("a")):
            if linkcnt == 0:
                linkcnt += 1
                continue
            if link.has_attr("href"):
                if link.text.lower() == jname.lower():
                    #print(link["href"])
                    retl = link["href"]
                    break
            linkcnt += 1
    return retl
Project: pythonUnifiAPI    Author: delian
def __init__(self, username=None, password=None, version=None, debug=None,
                 requesttype=None, baseurl=None, site=None):
        if username:
            self.username = username
        if password:
            self.password = password
        if version:
            self.version = version
        if debug:
            self.debug = debug
        if requesttype:
            self.requesttype = requesttype
        if baseurl:
            self.baseurl = baseurl
        if site:
            self.site = site

        ssl._create_default_https_context = ssl._create_unverified_context # This is the way to allow unverified SSL
        self.cj = http.cookiejar.CookieJar()
        opener = urllib.request.build_opener(urllib.request.HTTPHandler(debuglevel=1 if self.debug else 0),
                                             urllib.request.HTTPSHandler(debuglevel=1 if self.debug else 0),
                                             urllib.request.HTTPCookieProcessor(self.cj))
        opener.addheaders = [('User-agent', 'Mozilla/5.0')]
        urllib.request.install_opener(opener)
Project: pythonUnifiAPI    Author: delian
def __init__(self, username=None, password=None, debug=None,
                 requesttype=None, baseurl=None):
        if username:
            self.username = username
        if password:
            self.password = password
        if debug:
            self.debug = debug
        if requesttype:
            self.requesttype = requesttype
        if baseurl:
            self.baseurl = baseurl

        ssl._create_default_https_context = ssl._create_unverified_context # This is the way to allow unverified SSL
        self.cj = http.cookiejar.CookieJar()
        opener = urllib.request.build_opener(urllib.request.HTTPHandler(debuglevel=1 if self.debug else 0),
                                             urllib.request.HTTPSHandler(debuglevel=1 if self.debug else 0),
                                             urllib.request.HTTPCookieProcessor(self.cj))
        opener.addheaders = [('User-agent', 'Mozilla/5.0')]
        urllib.request.install_opener(opener)
Project: pythonUnifiAPI    Author: delian
def request(self, url, data=None, headers=None, method='POST', baseurl = None):
        # req = None
        headers = headers or {
            'Content-type': 'application/json',
            'Referer': 'https://account.ubnt.com/login?redirect=https%3A%2F%2Funifi.ubnt.com',
            'Origin': 'https://account.ubnt.com',
            'dnt': 1
        }
        if not baseurl:
            baseurl = self.baseurl
        self.log('Request to %s with data %s' % (baseurl + url, data))
        if data:
            req = urllib.request.Request(url=baseurl + url, data=json.dumps(data).encode("utf8"), headers=headers, method=method)
        else:
            req = urllib.request.Request(url=baseurl + url, headers=headers, method='GET')
        return urllib.request.urlopen(req)
Project: Blender-WMO-import-export-scripts    Author: WowDevTools
def get_api_raw(self, url):
        request = urllib.request.Request(self._api_url + url)
        try:
            result = urllib.request.urlopen(request)
        except urllib.error.HTTPError as e:
            self._error = "HTTP error"
            self._error_msg = str(e.code)
            self._update_ready = None
        except urllib.error.URLError as e:
            self._error = "URL error, check internet connection"
            self._error_msg = str(e.reason)
            self._update_ready = None
            return None
        else:
            result_string = result.read()
            result.close()
            return result_string.decode()
        # if we didn't get here, return or raise something else


    # result of all api calls, decoded into json format
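
The comment above refers to a companion method that is not part of this excerpt. A minimal hedged sketch of what such a wrapper might look like (the name get_api and the error handling are assumptions, not necessarily the add-on's real method):

def get_api(self, url):
    # Hypothetical companion to get_api_raw: decode the raw string into JSON.
    # Assumes `import json` at module level.
    raw = self.get_api_raw(url)
    if raw is None:
        return None
    return json.loads(raw)
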
Project: transfer    Author: viur-framework
def onSkeyAvailable(self, request=None ):
        """
            New SKey became available
        """
        self.isRequesting = False
        try:
            skey = NetworkService.decode( request )
        except:
            SecurityTokenProvider.errorCount += 1
            self.isRequesting = False
            return
        if SecurityTokenProvider.errorCount>0:
            SecurityTokenProvider.errorCount = 0
        self.isRequesting = False
        if not skey:
            return
        try:
            self.queue.put( (skey,time.time()), False )
        except QFull:
            print( "Err: Queue FULL" )
Project: transfer    Author: viur-framework
def onFinished(self ):

        self.hasFinished = True
        if self.request.error()==self.request.NoError:
            self.requestSucceeded.emit( self )
        else:
            try:
                errorDescr = NetworkErrorDescrs[ self.request.error() ]
            except: #Unknown error 
                errorDescr = None
            if errorDescr:
                QtGui.QMessageBox.warning( None, "Networkrequest Failed", "The request to \"%s\" failed with: %s" % (self.url, errorDescr) )
            self.requestFailed.emit( self, self.request.error() )
        self.finished.emit( self )
        self.logger.debug("Request finished: %s", str(self) )
        self.logger.debug("Remaining requests: %s",  len(NetworkService.currentRequests) )
Project: holcrawl    Author: shaypal5
def _get_movie_url_by_name(movie_name, year=None):
    query = SEARCH_URL.format(movie_name=_parse_name_for_search(movie_name))
    request = urllib.request.Request(query, headers=_HEADERS)
    search_res = bs(urllib.request.urlopen(request), "html.parser")
    results = search_res.find_all("li", {"class": "result"})
    correct_result = None
    for result in results:
        title = result.find_all(
            "h3", {"class": "product_title"})[0].contents[0].contents[0]
        title_match = title.strip().lower() == movie_name.strip().lower()
        if year is None and title_match:
            correct_result = result
        else:
            year_match = str(year) in str(result)
            if title_match and year_match:
                correct_result = result
    movie_url_suffix = correct_result.find_all("a")[0]['href']
    return METACRITIC_URL + movie_url_suffix


# === critics reviews page ===
Project: holcrawl    Author: shaypal5
def _get_critics_reviews_props(movie_url):
    critics_url = movie_url + CRITICS_REVIEWS_URL_SUFFIX
    critics_request = urllib.request.Request(critics_url, headers=_HEADERS)
    critics_page = bs(urllib.request.urlopen(critics_request), "html.parser")
    critics_props = {}
    critics_props['metascore'] = int(critics_page.find_all(
        "span", {"class": SCORE_CLASSES})[0].contents[0])
    critic_reviews = []
    for review in critics_page.find_all("div", {"class": "review"}):
        try:
            critic_reviews.append(_get_critic_review_props(review))
        except Exception:
            continue
    critics_props['pro_critic_reviews'] = critic_reviews
    return critics_props


# === user reviews page ===
Project: holcrawl    Author: shaypal5
def _get_user_reviews_from_page(users_page):
    review_elements = users_page.find_all("div", {"class": "review"})
    user_reviews = []
    for review in review_elements:
        try:
            user_reviews.append(_get_user_review_props(review))
        except Exception:
            continue
    # print("Extracted {} reviews.".format(len(user_reviews)))
    nexts = users_page.find_all("a", {"class": "action", "rel": "next"})
    if len(nexts) > 0:
        next_url = METACRITIC_URL + nexts[0]['href']
        next_request = urllib.request.Request(next_url, headers=_HEADERS)
        next_page = bs(urllib.request.urlopen(next_request), "html.parser")
        user_reviews += _get_user_reviews_from_page(next_page)
    return user_reviews
Project: holcrawl    Author: shaypal5
def _get_user_reviews_props(movie_url):
    users_url = movie_url + USERS_REVIEWS_URL_SUFFIX
    users_request = urllib.request.Request(users_url, headers=_HEADERS)
    users_page = bs(urllib.request.urlopen(users_request), "html.parser")
    users_props = {}
    users_props['movie_name'] = users_page.find_all(
        "meta", {"property": "og:title"})[0]['content']
    user_score = float(users_page.find_all(
        "span", {"class": USER_SCORE_CLASSES})[0].contents[0])
    users_props['avg_user_score'] = user_score
    for rating in ['positive', 'mixed', 'negative']:
        users_props['{}_rating_frequency'.format(
            rating)] = _get_user_rating_freq(users_page, rating)
    users_props['user_reviews'] = _get_user_reviews_from_page(users_page)
    return users_props


# === metacritic crawling ===
Project: GANGogh    Author: rkjones4
def soupit(j,genre):
    try:
        url = "https://www.wikiart.org/en/paintings-by-genre/"+ genre+ "/" + str(j)
        html = urllib.request.urlopen(url)
        soup =  BeautifulSoup(html)
        found = False
        urls = []
        for i in str(soup.findAll()).split():
            if i == 'data':
                found = True
            if found == True:
                if '}];' in i:
                    break;
                if 'https' in i:
                    web = "http" + i[6:-2]
                    urls.append(web)
                    j = j+1
        return urls
    except Exception as e:
        print('Failed to find the following genre page combo: '+genre+str(j))


# Given a URL for an image, download and save it, recovering information about the painting in the saved file name; how much is recovered depends on the length of file.split('/') (i.e. how much information the URL path provides)
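
The download-and-save helper that this comment describes is not included in the excerpt. A minimal hedged sketch of the idea (fetch an image with urllib.request and build the file name from the URL path) could look like the following; the function name save_painting and the naming scheme are illustrative assumptions, not the project's code.

import os
import urllib.request

def save_painting(url, out_dir="images"):
    # Hypothetical helper, not part of GANGogh: download one image and keep
    # whatever information the URL path carries in the saved file name.
    os.makedirs(out_dir, exist_ok=True)
    parts = url.split('/')
    name = "_".join(parts[-2:])   # more path segments means more information
    path = os.path.join(out_dir, name)
    urllib.request.urlretrieve(url, path)   # download and save the image
    return path
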
Project: TorrentBro    Author: subins2000
def files(self):
        if not self._files:
            path = '/ajax_details_filelist.php'
            url = self.url.path(path).query_param('id', self.id)

            request = urllib.request.Request(
                url, headers={'User-Agent': "Magic Browser"})
            response = urllib.request.urlopen(request).read()

            root = html.document_fromstring(response)

            rows = root.findall('.//tr')

            if len(rows) == 1 and rows[0].find('td').get('colspan') == str(2):
                self._files = {}
            else:
                for row in rows:
                    name, size = [unicode(v.text_content())
                              for v in row.findall('.//td')]
                    self._files[name] = size.replace('\xa0', ' ')
        return self._files
Project: pipresenter    Author: Turakar
def connect(username, password):
    global token, userid, files

    token = None
    userid = None
    files = None

    token_req = urllib.request.Request(base_url + token_url % (urllib.parse.quote(username, safe=""), 
                                                                urllib.parse.quote(password, safe="")))
    with urllib.request.urlopen(token_req) as response:
        result = json.loads(response.readall().decode("utf-8"))
        if "errorcode" in result:
            raise Exception(result["errorcode"])
        token = result["token"]

    siteinfo = call_wsfunction("moodle_webservice_get_siteinfo")
    userid = siteinfo["userid"]

    try:
        os.makedirs(download_folder)
    except OSError as exc:
        if exc.errno == errno.EEXIST and os.path.isdir(download_folder):
            pass
        else:
            raise
Project: azure-search-ta    Author: yokawasa
def textanalyze(self,index_name, analyzer, text):
        # Create JSON string for request body
        reqobject={}
        reqobject['text'] = text
        reqobject['analyzer'] = analyzer
        io=StringIO()
        json.dump(reqobject, io)
        req_body = io.getvalue()
        # HTTP request to Azure search REST API
        conn = httplib.HTTPSConnection(self.__api_url)
        conn.request("POST",
                u"/indexes/{0}/analyze?api-version={1}".format(index_name, _AZURE_SEARCH_API_VERSION),
                req_body, self.headers)
        response = conn.getresponse()
        #print "status:", response.status, response.reason
        data = (response.read()).decode('utf-8')
        #print("data:{}".format(data))
        conn.close()
        return data
Project: picoCTF    Author: royragsdale
def _validate_captcha(data):
    """
    Validates a captcha with google's reCAPTCHA.

    Args:
        data: the posted form data
    """

    settings = api.config.get_settings()["captcha"]

    post_data = urllib.parse.urlencode({
        "secret": api.config.reCAPTCHA_private_key,
        "response": data["g-recaptcha-response"],
        "remoteip": flask.request.remote_addr
    }).encode("utf-8")

    request = urllib.request.Request(api.config.captcha_url, post_data, method='POST')
    response = urllib.request.urlopen(request).read().decode("utf-8")
    parsed_response = json.loads(response)
    return parsed_response['success'] == True
Project: gui_tool    Author: UAVCAN
def _do_pip_check():
    request = urllib.request.Request('https://api.github.com/repos/UAVCAN/gui_tool/tags',
                                     headers={
                                         'Accept': 'application/vnd.github.v3+json',
                                     })
    with urllib.request.urlopen(request) as response:
        data = response.read()

    data = json.loads(data.decode('utf8'), encoding='utf8')

    newest_tag_name = data[0]['name']
    logger.debug('Newest tag: %r', newest_tag_name)

    match = re.match(r'^.*?(\d{1,3})\.(\d{1,3})', newest_tag_name)

    version_tuple = int(match.group(1)), int(match.group(2))
    logger.debug('Parsed version tuple: %r', version_tuple)

    if _version_tuple_to_int(version_tuple) > _version_tuple_to_int(__version__):
        git_url = 'https://github.com/UAVCAN/gui_tool'
        return 'pip3 install --upgrade git+<a href="{0}">{0}</a>@{1}'.format(git_url, newest_tag_name)


# noinspection PyBroadException
Project: zimfarm    Author: openzim
def put_status(self):
        host = os.getenv('HOST')
        url = "https://{host}/api/task/{id}".format(host=host, id=self.request.id)
        payload = {
            'status': self.status,
            'steps': self.steps,
            'file_name': self.zim_file_name,
            'time_stamp': {
                'started': self.start_time,
                'ended': self.ended_time
            }
        }
        headers = {
            'Content-Type': 'application/json; charset=utf-8',
            'token': self.token
        }
        request = urllib.request.Request(url, json.dumps(payload, cls=JSONEncoder).encode('utf-8'),
                                         headers, method='PUT')
        try:
            with urllib.request.urlopen(request) as response:
                code = response.code
        except HTTPError as error:
            code = error.code
Project: PEBA    Author: dtag-dev-sec
def fixUrl(destinationPort, transport, url, peerType):
    """
        fixes the URL (original request string)
    """
    transportProtocol = ""
    if transport.lower() in "udp" or transport.lower() in "tcp":
        transportProtocol="/"+transport

    if ("honeytrap" in peerType):
        return "Attack on port " + str(destinationPort) + transportProtocol

    # prepared dionaea to output additional information in ticker
    if ("Dionaea" in peerType):
        return "Attack on port " + str(destinationPort)+ transportProtocol

    return url
Project: IM_Climate    Author: IMDProjects
def _call_ACIS(self, kwargs, **moreKwargs):
        '''
        Core method for calling the ACIS services.

        Returns python dictionary by de-serializing json response
        '''
        #self._formatInputDict(**kwargs)
        kwargs.update(moreKwargs)
        self._input_dict = self._stripNoneValues(kwargs)
        self.url = self.baseURL + self.webServiceSource
        if pyVersion == 2:      #python 2.x
            params = urllib.urlencode({'params':json.dumps(self._input_dict)})
            request = urllib2.Request(self.url, params, {'Accept':'application/json'})
            response = urllib2.urlopen(request)
            jsonData = response.read()
        elif pyVersion == 3:    #python 3.x
            params = urllib.parse.urlencode({'params':json.dumps(self._input_dict)})
            params = params.encode('utf-8')
            req = urllib.request.urlopen(self.url, data = params)
            jsonData = req.read().decode()
        return json.loads(jsonData)
Project: postcards    Author: abertschi
def _fetch_img_urls(self, keyword, safe_search=False):
        # bing img search, https://gist.github.com/stephenhouser/c5e2b921c3770ed47eb3b75efbc94799

        url = self._get_bing_url(keyword, safe_search=safe_search)
        self.logger.debug('search url {}'.format(url))

        header = {
            'User-Agent': "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) "
                          "Chrome/43.0.2357.134 Safari/537.36"}

        soup = BeautifulSoup(urllib.request.urlopen(urllib.request.Request(url, headers=header)), 'html.parser')
        imgs = []  # contains the link for Large original images, type of  image
        for a in soup.find_all("a", {"class": "iusc"}):
            mad = json.loads(a["mad"])
            turl = mad["turl"]
            m = json.loads(a["m"])
            murl = m["murl"]

            image_name = urllib.parse.urlsplit(murl).path.split("/")[-1]
            imgs.append((image_name, turl, murl))

        return imgs
Project: progrobot    Author: petr-kalinin
def search(query):
    request = {
        'order': 'desc',
        'sort': 'relevance',
        'q': query,
        'answers': 1,
        'site': 'stackoverflow',
        'filter': 'withbody',
        'pagesize': 1
        }
    response = send_request("search/advanced", request)
    question = response["items"][0]
    answer = get_answer(question)

    return (question["link"] + "\n\n"
            + "<b>" + question["title"] + "</b>\n\n" 
            + format_user_data(question) + "\n\n"
            + "<b>Answer:</b>\n\n"
            + format_user_data(answer) +"\n\n"
            + "? StackOverflow users, CC-BY-SA 3.0")
Project: danmu-bilibili    Author: saberxxy
def getDanmu(cid):
    if not cid:
        return "?????"
    try:
        cid_url = "http://comment.bilibili.com/%s.xml" % cid
        danmu_xml = urllib.request.urlopen(cid_url).read()
        xml = zlib.decompressobj(-zlib.MAX_WBITS).decompress(danmu_xml).decode()  # decompress the raw deflate stream and decode to text

        return xml  # return the danmu XML text
    except Exception:
        pass




# parse the danmu XML and extract the comment content
Project: danmu-bilibili    Author: saberxxy
def camouflageBrowser(url):
    myHeaders = [
                  "Mozilla/5.0 (Windows NT 6.1; rv:31.0) Gecko/20100101 Firefox/31.0",
                  "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/40.0.2214.111 Safari/537.36",
                  "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:37.0) Gecko/20100101 Firefox/37.0",
                  "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:30.0) Gecko/20100101 Firefox/30.0",
                  "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/536.11 (KHTML, like Gecko) Chrome/20.0.1132.11 TaoBrowser/3.5 Safari/536.11"
                 ]  # pool of browser User-Agent strings
    try:
        content = urllib.request.urlopen(url).read()
        return content
    except Exception:
        pass


# handle multi-part (分P) video pages
Project: xgovctf    Author: alphagov
def _validate_captcha(data):
    """
    Validates a captcha with google's reCAPTCHA.

    Args:
        data: the posted form data
    """

    post_data = urllib.parse.urlencode({
        "secret": api.config.reCAPTCHA_private_key,
        "response": data["g-recaptcha-response"],
        "remoteip": flask.request.remote_addr
    }).encode("utf-8")

    request = urllib.request.Request(api.config.captcha_url, post_data, method='POST')
    response = urllib.request.urlopen(request).read().decode("utf-8")
    parsed_response = json.loads(response)
    return parsed_response['success'] == True
Project: supotato    Author: everettjf
def force_update_db():
    dbdir = '/var/tmp/supotato'
    dbpath = '/var/tmp/supotato/supotato.db'
    if os.path.exists(dbpath):
        os.remove(dbpath)

    if not os.path.exists(dbdir):
        os.mkdir(dbdir)

    import urllib.request
    url = "https://everettjf.github.io/app/supotato/supotato.db"

    print('CocoaPods Index Database not exist , will download from ' + url)
    print('Please wait for a moment (about 5 MB file will be downloaded)')

    # Download the file from `url` and save it locally under `file_name`:
    with urllib.request.urlopen(url) as response, open(dbpath, 'wb') as out_file:
        data = response.read() # a `bytes` object
        out_file.write(data)

    print('Download completed')
    print('Update succeed')
Project: B.E.N.J.I.    Author: the-ethan-hunt
def OnClicked(self):
        r = sr.Recognizer()
        with sr.Microphone() as source:
            speak.say('Hey I am Listening ')
            speak.runAndWait()
            audio = r.listen(source)
        try:
            put=r.recognize_google(audio)
            self.displayText(put)
            self.textBox.insert('1.2',put)
            self.textBox.delete('1.2',tk.END)
            events(self,put)
        except sr.UnknownValueError:
            self.displayText("Could not understand audio")
        except sr.RequestError as e:
            self.displayText("Could not request results; {0}".format(e))
Project: B.E.N.J.I.    Author: the-ethan-hunt
def OnClicked(self):
        r = sr.Recognizer()
        with sr.Microphone() as source:
            speak.say('Hey I am Listening ')
            speak.runAndWait()
            audio = r.listen(source)
        try:
            put=r.recognize_google(audio)

            self.displayText(put)
            self.textBox.insert('1.2',put)
            put=put.lower()
            put = put.strip()
            #put = re.sub(r'[?|$|.|!]', r'', put)
            link=put.split()
            events(self,put,link)
        except sr.UnknownValueError:
            self.displayText("Could not understand audio")
        except sr.RequestError as e:
            self.displayText("Could not request results; {0}".format(e))
Project: Securitas    Author: ArrenH
def setUp(self):
        # the URLs for now which will have the WSDL files and the XSD file
        #urlparse.urljoin('file:', urllib.pathname2url(os.path.abspath("service.xml")))
        import urllib
        import os
        from urllib.parse import urlparse
        from urllib.request import pathname2url

        query_services_url = urllib.parse.urljoin('file:', pathname2url(os.path.abspath('../wsdl_files/vipuserservices-query-1.7.wsdl')))
        userservices_url = urllib.parse.urljoin('file:', pathname2url(os.path.abspath('../wsdl_files/vipuserservices-auth-1.7.wsdl')))

        # initializing the Suds clients for each url, with the client certificate youll have in the same dir as this file
        query_services_client = Client(query_services_url,
                                       transport=HTTPSClientCertTransport('vip_certificate.crt', 'vip_certificate.crt'))
        user_services_client = Client(userservices_url,
                                      transport=HTTPSClientCertTransport('vip_certificate.crt', 'vip_certificate.crt'))

        self.test_user_services_object = SymantecUserServices(user_services_client)
Project: Securitas    Author: ArrenH
def test_poll_in_Push(self, mock):
        reply = {"requestId": "ac123", "status": "6040", "statusMessage": "Mobile push request sent",
                 "pushDetail": {"pushCredentialId": "133709001", "pushSent": True},
                 "transactionId": "RealTransactionId",
                 "authContext": {"params": {"Key": "authLevel.level", "Value": 10}}}

        mock.SymantecServices.authenticateUserWithPushThenPolling.return_value = Mock()
        mock.authenticateUserWithPushThenPolling.return_value.hash.return_value = reply

        response = symantec_package.lib.allServices.SymantecServices.authenticateUserWithNothing()
        self.assertTrue(response.hash() != reply)

        response = symantec_package.lib.allServices.SymantecServices.authenticateUserWithPushThenPolling("Parameters Here!")

        self.assertTrue((response.hash()) == reply)

        self.assertTrue(response.hash()["status"] == "6040")
        self.assertTrue(response.hash()['requestId'] == "ac123")
        self.assertTrue(response.hash()['statusMessage'] == "Mobile push request sent")
        self.assertTrue(response.hash()["pushDetail"]['pushCredentialId'] == "133709001")
        self.assertTrue(response.hash()["pushDetail"]['pushSent'] is True)
        self.assertTrue(response.hash()['transactionId'] == "RealTransactionId")
        self.assertTrue(response.hash()['authContext']['params']['Key'] == "authLevel.level")
        self.assertTrue(response.hash()['authContext']['params']['Value'] == 10)
        pass
Project: Securitas    Author: ArrenH
def setUp(self):
        # the URLs for now which will have the WSDL files and the XSD file
        import urllib
        import os
        from urllib.parse import urlparse
        from urllib.request import pathname2url

        managementservices_url = urllib.parse.urljoin('file:', pathname2url(
            os.path.abspath('../wsdl_files/vipuserservices-mgmt-1.7.wsdl')))
        # managementservices_url = 'http://webdev.cse.msu.edu/~huynhall/vipuserservices-mgmt-1.7.wsdl'

        # initializing the Suds clients for each url, with the client certificate youll have in the same dir as this file
        self.management_client = Client(managementservices_url,
                                        transport=HTTPSClientCertTransport('vip_certificate.crt',
                                                                           'vip_certificate.crt'))

        self.test_management_services_object = SymantecManagementServices(self.management_client)
        pass
Project: Securitas    Author: ArrenH
def test_mock_create_user(self, mock_managementservices):
        reply = {'requestId': 'create_123', 'status': '0000', 'statusMessage': 'Success'}
        # Configure the mock to return a response with an OK status code. Also, the mock should have
        # a `json()` method that returns a list of todos.
        mock_managementservices.createUser.return_value = Mock()
        mock_managementservices.createUser.return_value.json.return_value = reply

        # Call the service, which will send a request to the server.
        response = symantec_package.lib.managementService.SymantecManagementServices.createUser("create_123",
                                                                                                "new_user3")

        print(response.json())
        # If the request is sent successfully, then I expect a response to be returned.
        self.assertTrue((response.json()) == reply)
        self.assertTrue((response.json()["status"] == "0000"))
        pass
Project: Securitas    Author: ArrenH
def test_mock_delete_user(self, mock_managementservices):
        reply = {'requestId': 'delete_123', 'status': '0000', 'statusMessage': 'Success'}

        # Configure the mock to return a response with an OK status code. Also, the mock should have
        # a `json()` method that returns a list of todos.
        mock_managementservices.deleteUser.return_value = Mock()
        mock_managementservices.deleteUser.return_value.json.return_value = reply

        # Call the service, which will send a request to the server.
        response = symantec_package.lib.managementService.SymantecManagementServices.deleteUser("delete_123",
                                                                                                "new_user3")

        print(response.json())
        # If the request is sent successfully, then I expect a response to be returned.
        self.assertTrue((response.json()) == reply)
        self.assertTrue((response.json()["status"] == "0000"))
        pass
Project: Securitas    Author: ArrenH
def test_mock_add_STANDARDOTP_credential(self, mock_managementservices):
        reply = {'statusMessage': "Success", 'requestId': 'add_otp_cred', 'status': '0000'}

        # Configure the mock to return a response with an OK status code. Also, the mock should have
        # a `json()` method that returns a list of todos.
        mock_managementservices.addCredentialOtp.return_value = Mock()
        mock_managementservices.addCredentialOtp.return_value.json.return_value = reply

        response = symantec_package.lib.managementService.SymantecManagementServices.addCredentialOtp("add_otp_cred", "new_user3",
                                                                               "", "STANDARD_OTP", \
                                                                               "678066")  # change with what's on your device
        print(response.json())
        # If the request is sent successfully, then I expect a response to be returned.
        self.assertTrue((response.json()) == reply)
        self.assertTrue((response.json()["status"] == "0000"))
        pass
Project: Securitas    Author: ArrenH
def test_mock_update_STANDARDOTP_credential(self, mock_managementservices):
        reply = {'statusMessage': 'Success', 'requestId': 'update_123', 'status': '0000'}

        # Configure the mock to return a response with an OK status code. Also, the mock should have
        # a `json()` method that returns a list of todos.
        mock_managementservices.updateCredential.return_value = Mock()
        mock_managementservices.updateCredential.return_value.json.return_value = reply

        response = symantec_package.lib.managementService.SymantecManagementServices.updateCredential("update_123", "gabe_phone",
                                                                                                      "", "STANDARD_OTP",
                                                                                                      "My personal cell phone")
        print(response.json())
        # If the request is sent successfully, then I expect a response to be returned.
        self.assertTrue((response.json()) == reply)
        self.assertTrue((response.json()["status"] == "0000"))
        pass
Project: Securitas    Author: ArrenH
def test_mock_setTemporaryPasswordSMSDelivery(self, mock_managementservices):
        reply = {'status': '0000', 'requestId': 'setTempPWD', 'statusMessage': 'Success', 'temporaryPassword': '998241'}

        # Configure the mock to return a response with an OK status code. Also, the mock should have
        # a `json()` method that returns a list of todos.
        mock_managementservices.setTemporaryPasswordSMSDelivery.return_value = Mock()
        mock_managementservices.setTemporaryPasswordSMSDelivery.return_value.json.return_value = reply

        response = symantec_package.lib.managementService.SymantecManagementServices.setTemporaryPasswordSMSDelivery("setTempPWD",
                                                                                                      "gabe_phone",
                                                                                                      "12313608781",
                                                                                                      "17879481605")
        print(response.json())
        # If the request is sent successfully, then I expect a response to be returned.
        self.assertTrue((response.json()) == reply)
        self.assertTrue((response.json()["status"] == "0000"))
        pass
Project: sonos    Author: gerard33
def sendMessage(self, data, method, url):
        conn = http.client.HTTPConnection(Parameters["Address"] + ":1400")
        headers = {"Content-Type": 'text/xml; charset="utf-8"', "SOAPACTION": method}
        conn.request("POST", url, data, headers)
        response = conn.getresponse()
        conn.close()

        if response.status == 200:
            data = response.read().decode("utf-8")
            LogMessage(str(data))
            self.parseMessage(data)
        else:
            Domoticz.Error("Unexpected response status received in function sendMessage (" + str(response.status) + ", " + str(response.reason) + "). \
                            The following command is sent: " + str(method) + ", " + str(url))
        return

    # Process message from Sonos
Project: olami-api-quickstart-python-samples    Author: olami-developers
def getRecognitionResult(self, apiName, seqValue):
        query = self.getBasicQueryString(apiName, seqValue) + "&stop=1"

        '''Request speech recognition service by HTTP GET'''
        url = str(self.apiBaseUrl) + "?" + str(query)
        req = urllib.request.Request(url,headers = {'Cookie': self.cookies})
        with urllib.request.urlopen(req) as f:
            getResponse = f.read().decode()

        '''Now you can check the status here.'''
        print("Sending 'GET' request to URL : " + self.apiBaseUrl)
        print("get parameters : " + str(query))
        print("Response Code : " + str(f.getcode()))

        '''Get the response'''
        return str(getResponse)
Project: MailFail    Author: m0rtem
def load_url(url, timeout):
    # Build URL query to email signup page
    urlquery = "http://" + url + "/m-users-a-email_list-job-add-email-" + targetEmail + "-source-2.htm"
    print_out(Style.BRIGHT + Fore.WHITE + "Sending request to: " + url)
    # Build the request
    req = urllib.request.Request(
        urlquery, 
        data=None, 
        headers={
            'User-Agent': random.choice(useragents),
            'Host': url
        }
    )
    # Send
    try:
        f = urllib.request.urlopen(req)
        print_out(Style.BRIGHT + Fore.GREEN + "Successfully sent!")
        f.close()
    except urllib.error.URLError as e:
        print_out(Style.BRIGHT + Fore.RED + e.reason)
Project: mfnf-pdf-export    Author: Lodifice
def retry(max_retries):
    """
    Retry a function `max_retries` times.
    taken from https://stackoverflow.com/questions/23892210/python-catch-timeout-and-repeat-request.
    """
    def retry(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            num_retries = 0
            while num_retries <= max_retries:
                try:
                    ret = func(*args, **kwargs)
                    break
                except HTTPError:
                    if num_retries == max_retries:
                        raise
                    num_retries += 1
                    time.sleep(1)
            return ret
        return wrapper
    return retry
Project: gaffer-tools    Author: gchq
def execute_get(self, operation, headers={}):
        url = self._host + operation.get_url()
        headers['Content-Type'] = 'application/json;charset=utf-8'
        request = urllib.request.Request(url, headers=headers)

        try:
            response = self._opener.open(request)
        except urllib.error.HTTPError as error:
            error_body = error.read().decode('utf-8')
            new_error_string = ('HTTP error ' +
                                str(error.code) + ' ' +
                                error.reason + ': ' +
                                error_body)
            raise ConnectionError(new_error_string)

        return response.read().decode('utf-8')
Project: gaffer-tools    Author: gchq
def is_operation_supported(self, operation=None, headers={}):
        url = self._host + '/graph/operations/' + operation.get_operation()
        headers['Content-Type'] = 'application/json;charset=utf-8'

        request = urllib.request.Request(url, headers=headers)

        try:
            response = self._opener.open(request)
        except urllib.error.HTTPError as error:
            error_body = error.read().decode('utf-8')
            new_error_string = ('HTTP error ' +
                                str(error.code) + ' ' +
                                error.reason + ': ' +
                                error_body)
            raise ConnectionError(new_error_string)

        response_text = response.read().decode('utf-8')

        return response_text
Project: tzgzbot    Author: mapcolabora
def paradasBiziCercanas(bot,update,longitud,latitud,estado,numposte):
    DISTANCIA = '350'  # Distance in metres from the submitted position, kept as a string to avoid conversions later. By contract the nearest station must be at most 300 metres away, so using 350 guarantees there is at least one other station.
    url='http://www.zaragoza.es/api/recurso/urbanismo-infraestructuras/estacion-bicicleta.json?rf=html&results_only=false&srsname=wgs84&point='+longitud+','+latitud+'&distance='+DISTANCIA
    try:
        h = urllib.request.urlopen(url)
    except Exception as e:
        bot.sendMessage(chat_id=update.message.chat_id, text='??<b>Error</b>??\nImposible contactar con el servicio del Ayuntamiento.', parse_mode='HTML')
    #BIZI
    jsonleidobizi = json.loads(str(h.read().decode('utf-8')))
    h.close()
    nElementosbizi = jsonleidobizi["totalCount"]
    textobizi = ''
    if nElementosbizi==0:
        textobizi='No hay estaciones BiZi a '+DISTANCIA+' metros de la ubicación\n\n'
    else:
        for i in range(nElementosbizi):
            if(jsonleidobizi["result"][i]["id"]!=numposte):  # don't show the station the request was made from
                textobizi = textobizi + '/bizi '+ jsonleidobizi["result"][i]["id"] + '\n' + jsonleidobizi["result"][i]["title"] + '\n\n'
    bot.sendMessage(chat_id=update.message.chat_id, text='<b>'+estado+', quizás te interesen otras estaciones cercanas (a &lt'+DISTANCIA+'m):</b>\n'+textobizi, parse_mode='HTML', disable_web_page_preview=True)
Project: bpy_lambda    Author: bcongdon
def get_api_raw(self, url):
        request = urllib.request.Request(self._api_url + url)
        try:
            result = urllib.request.urlopen(request)
        except urllib.error.HTTPError as e:
            self._error = "HTTP error"
            self._error_msg = str(e.code)
            self._update_ready = None 
        except urllib.error.URLError as e:
            self._error = "URL error, check internet connection"
            self._error_msg = str(e.reason)
            self._update_ready = None 
            return None
        else:
            result_string = result.read()
            result.close()
            return result_string.decode()
        # if we didn't get here, return or raise something else


    # result of all api calls, decoded into json format
Project: kaleidoscope    Author: blenderskool
def get_api_raw(self, url):
        request = urllib.request.Request(self._api_url + url)
        try:
            result = urllib.request.urlopen(request)
        except urllib.error.HTTPError as e:
            self._error = "HTTP error"
            if str(e.code) == '404':
                self._error_msg = "404 - repository not found, verify register settings"
            else:
                self._error_msg = "Response: "+str(e.code)
            self._update_ready = None 
        except urllib.error.URLError as e:
            self._error = "URL error, check internet connection"
            self._error_msg = str(e.reason)
            self._update_ready = None 
            return None
        else:
            result_string = result.read()
            result.close()
            return result_string.decode()
        # if we didn't get here, return or raise something else


    # result of all api calls, decoded into json format