我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用urllib.request()。
def dorequest(url, data = "", method = 'GET'): try: if method == 'GET': response = urllib.request.urlopen(url, timeout=10).read() else: # use PUT/DELETE/POST, data should be encoded in ascii/bytes request = urllib.request.Request(url, data = data.encode('ascii'), method = method) response = urllib.request.urlopen(request, timeout=10).read() # etcd may return json result with response http error code # http error code will raise exception in urlopen # catch the HTTPError and get the json result except urllib.error.HTTPError as e: # e.fp must be read() in this except block. # the e will be deleted and e.fp will be closed after this block response = e.fp.read() # response is encoded in bytes. # recoded in utf-8 and loaded in json result = json.loads(str(response, encoding='utf-8')) return result # client to use etcd # not all APIs are implemented below. just implement what we want
def _validate_captcha(data): """ Validates a captcha with google's reCAPTCHA. Args: data: the posted form data """ settings = api.config.get_settings()["captcha"] post_data = urllib.parse.urlencode({ "secret": api.config.reCAPTCHA_private_key, "response": data["g-recaptcha-response"], "remoteip": flask.request.remote_addr }).encode("utf-8") request = urllib.request.Request(api.config.captcha_url, post_data, method='POST') response = urllib.request.urlopen(request).read().decode("utf-8") parsed_response = json.loads(response) return parsed_response['success'] == True
def create_request_object(rel_url, req_type, req_to, job_id): """ Creates request strings to use for batch_requests, based on rel_url type: can be used to determine the type of request when reading the response to: can be used to link certain attributes (like 'reactions') to the post they belong """ # print(rel_url) return { 'req_type': req_type, 'req_to': req_to, 'job_id': job_id, 'req': { "method": "GET", "relative_url": "{}".format(rel_url) } }
def create_post_request(self, post_id, job_id): """ Creates a request string for a post to use in batch_requests based on post_id Note: could add limit as well? """ return self.create_request_object(( '{}?fields={},{},{},{}').format( post_id, self.str_reactions_query(), self.str_comments_query(), self.str_sharedposts_query(), self.str_attachments_query()), req_type='post', req_to='', job_id=job_id) # @staticmethod # def encode_batch(batch): # """ # URL encodes the batch to prepare it for a graph API request # """ # _json = json.dumps(batch) # _url = urllib.parse.urlparse(_json) # return _url
def extend_token(self): """ Extends access token and replaces the previously used one Prints error message if API Key or API Secret not found TODO: Replace also config file once that file is defined TODO: Additional checks on the response """ if not self.api_key or not self.api_secret: logging.error('No API Key and/or API Secret defined') return None resp = self.request( req='oauth/access_token?grant_type=fb_exchange_token&client_id={}' '&client_secret={}&fb_exchange_token={}'.format( self.api_key, self.api_secret, self.access_token)) msg = json.loads(resp.read().decode('utf-8')) self.access_token = msg['access_token'] logging.info('Extended Access Token: \n%s', self.access_token) return self.access_token
def getJournalURL(jname): # get journal URL given the journal name for retrieving article PIIs urlstr = "http://api.elsevier.com/sitemap/page/sitemap/" + jname[0].lower() + ".html" retl = "" with urllib.request.urlopen(urlstr) as url: response = url.read() linkcnt = 0 for link in BeautifulSoup(response, parse_only=SoupStrainer("a")): if linkcnt == 0: linkcnt += 1 continue if link.has_attr("href"): if link.text.lower() == jname.lower(): #print(link["href"]) retl = link["href"] break linkcnt += 1 return retl
def __init__(self, username=None, password=None, version=None, debug=None, requesttype=None, baseurl=None, site=None): if username: self.username = username if password: self.password = password if version: self.version = version if debug: self.debug = debug if requesttype: self.requesttype = requesttype if baseurl: self.baseurl = baseurl if site: self.site = site ssl._create_default_https_context = ssl._create_unverified_context # This is the way to allow unverified SSL self.cj = http.cookiejar.CookieJar() opener = urllib.request.build_opener(urllib.request.HTTPHandler(debuglevel=1 if self.debug else 0), urllib.request.HTTPSHandler(debuglevel=1 if self.debug else 0), urllib.request.HTTPCookieProcessor(self.cj)) opener.addheaders = [('User-agent', 'Mozilla/5.0')] urllib.request.install_opener(opener)
def __init__(self, username=None, password=None, debug=None, requesttype=None, baseurl=None): if username: self.username = username if password: self.password = password if debug: self.debug = debug if requesttype: self.requesttype = requesttype if baseurl: self.baseurl = baseurl ssl._create_default_https_context = ssl._create_unverified_context # This is the way to allow unverified SSL self.cj = http.cookiejar.CookieJar() opener = urllib.request.build_opener(urllib.request.HTTPHandler(debuglevel=1 if self.debug else 0), urllib.request.HTTPSHandler(debuglevel=1 if self.debug else 0), urllib.request.HTTPCookieProcessor(self.cj)) opener.addheaders = [('User-agent', 'Mozilla/5.0')] urllib.request.install_opener(opener)
def request(self, url, data=None, headers=None, method='POST', baseurl = None): # req = None headers = headers or { 'Content-type': 'application/json', 'Referer': 'https://account.ubnt.com/login?redirect=https%3A%2F%2Funifi.ubnt.com', 'Origin': 'https://account.ubnt.com', 'dnt': 1 } if not baseurl: baseurl = self.baseurl self.log('Request to %s with data %s' % (baseurl + url, data)) if data: req = urllib.request.Request(url=baseurl + url, data=json.dumps(data).encode("utf8"), headers=headers, method=method) else: req = urllib.request.Request(url=baseurl + url, headers=headers, method='GET') return urllib.request.urlopen(req)
def get_api_raw(self, url): request = urllib.request.Request(self._api_url + url) try: result = urllib.request.urlopen(request) except urllib.error.HTTPError as e: self._error = "HTTP error" self._error_msg = str(e.code) self._update_ready = None except urllib.error.URLError as e: self._error = "URL error, check internet connection" self._error_msg = str(e.reason) self._update_ready = None return None else: result_string = result.read() result.close() return result_string.decode() # if we didn't get here, return or raise something else # result of all api calls, decoded into json format
def onSkeyAvailable(self, request=None ): """ New SKey got avaiable """ self.isRequesting = False try: skey = NetworkService.decode( request ) except: SecurityTokenProvider.errorCount += 1 self.isRequesting = False return if SecurityTokenProvider.errorCount>0: SecurityTokenProvider.errorCount = 0 self.isRequesting = False if not skey: return try: self.queue.put( (skey,time.time()), False ) except QFull: print( "Err: Queue FULL" )
def onFinished(self ): self.hasFinished = True if self.request.error()==self.request.NoError: self.requestSucceeded.emit( self ) else: try: errorDescr = NetworkErrorDescrs[ self.request.error() ] except: #Unknown error errorDescr = None if errorDescr: QtGui.QMessageBox.warning( None, "Networkrequest Failed", "The request to \"%s\" failed with: %s" % (self.url, errorDescr) ) self.requestFailed.emit( self, self.request.error() ) self.finished.emit( self ) self.logger.debug("Request finished: %s", str(self) ) self.logger.debug("Remaining requests: %s", len(NetworkService.currentRequests) )
def _get_movie_url_by_name(movie_name, year=None): query = SEARCH_URL.format(movie_name=_parse_name_for_search(movie_name)) request = urllib.request.Request(query, headers=_HEADERS) search_res = bs(urllib.request.urlopen(request), "html.parser") results = search_res.find_all("li", {"class": "result"}) correct_result = None for result in results: title = result.find_all( "h3", {"class": "product_title"})[0].contents[0].contents[0] title_match = title.strip().lower() == movie_name.strip().lower() if year is None and title_match: correct_result = result else: year_match = str(year) in str(result) if title_match and year_match: correct_result = result movie_url_suffix = correct_result.find_all("a")[0]['href'] return METACRITIC_URL + movie_url_suffix # === critics reviews page ===
def _get_critics_reviews_props(movie_url): critics_url = movie_url + CRITICS_REVIEWS_URL_SUFFIX critics_request = urllib.request.Request(critics_url, headers=_HEADERS) critics_page = bs(urllib.request.urlopen(critics_request), "html.parser") critics_props = {} critics_props['metascore'] = int(critics_page.find_all( "span", {"class": SCORE_CLASSES})[0].contents[0]) critic_reviews = [] for review in critics_page.find_all("div", {"class": "review"}): try: critic_reviews.append(_get_critic_review_props(review)) except Exception: continue critics_props['pro_critic_reviews'] = critic_reviews return critics_props # === user reviews page ===
def _get_user_reviews_from_page(users_page): review_elements = users_page.find_all("div", {"class": "review"}) user_reviews = [] for review in review_elements: try: user_reviews.append(_get_user_review_props(review)) except Exception: continue # print("Extracted {} reviews.".format(len(user_reviews))) nexts = users_page.find_all("a", {"class": "action", "rel": "next"}) if len(nexts) > 0: next_url = METACRITIC_URL + nexts[0]['href'] next_request = urllib.request.Request(next_url, headers=_HEADERS) next_page = bs(urllib.request.urlopen(next_request), "html.parser") user_reviews += _get_user_reviews_from_page(next_page) return user_reviews
def _get_user_reviews_props(movie_url): users_url = movie_url + USERS_REVIEWS_URL_SUFFIX users_request = urllib.request.Request(users_url, headers=_HEADERS) users_page = bs(urllib.request.urlopen(users_request), "html.parser") users_props = {} users_props['movie_name'] = users_page.find_all( "meta", {"property": "og:title"})[0]['content'] user_score = float(users_page.find_all( "span", {"class": USER_SCORE_CLASSES})[0].contents[0]) users_props['avg_user_score'] = user_score for rating in ['positive', 'mixed', 'negative']: users_props['{}_rating_frequency'.format( rating)] = _get_user_rating_freq(users_page, rating) users_props['user_reviews'] = _get_user_reviews_from_page(users_page) return users_props # === metacritic crawling ===
def soupit(j,genre): try: url = "https://www.wikiart.org/en/paintings-by-genre/"+ genre+ "/" + str(j) html = urllib.request.urlopen(url) soup = BeautifulSoup(html) found = False urls = [] for i in str(soup.findAll()).split(): if i == 'data': found = True if found == True: if '}];' in i: break; if 'https' in i: web = "http" + i[6:-2] urls.append(web) j = j+1 return urls except Exception as e: print('Failed to find the following genre page combo: '+genre+str(j)) #Given a url for an image, we download and save the image while also recovering information about the painting in the saved name depending on the length of the file.split('/') information (which corresponds to how much information is available)
def files(self): if not self._files: path = '/ajax_details_filelist.php' url = self.url.path(path).query_param('id', self.id) request = urllib.request.Request( url, headers={'User-Agent': "Magic Browser"}) response = urllib.request.urlopen(request).read() root = html.document_fromstring(response) rows = root.findall('.//tr') if len(rows) == 1 and rows[0].find('td').get('colspan') == str(2): self._files = {} else: for row in rows: name, size = [unicode(v.text_content()) for v in row.findall('.//td')] self._files[name] = size.replace('\xa0', ' ') return self._files
def connect(username, password): global token, userid, files token = None userid = None files = None token_req = urllib.request.Request(base_url + token_url % (urllib.parse.quote(username, safe=""), urllib.parse.quote(password, safe=""))) with urllib.request.urlopen(token_req) as response: result = json.loads(response.readall().decode("utf-8")) if "errorcode" in result: raise Exception(result["errorcode"]) token = result["token"] siteinfo = call_wsfunction("moodle_webservice_get_siteinfo") userid = siteinfo["userid"] try: os.makedirs(download_folder) except OSError as exc: if exc.errno == errno.EEXIST and os.path.isdir(download_folder): pass else: raise
def textanalyze(self,index_name, analyzer, text): # Create JSON string for request body reqobject={} reqobject['text'] = text reqobject['analyzer'] = analyzer io=StringIO() json.dump(reqobject, io) req_body = io.getvalue() # HTTP request to Azure search REST API conn = httplib.HTTPSConnection(self.__api_url) conn.request("POST", u"/indexes/{0}/analyze?api-version={1}".format(index_name, _AZURE_SEARCH_API_VERSION), req_body, self.headers) response = conn.getresponse() #print "status:", response.status, response.reason data = (response.read()).decode('utf-8') #print("data:{}".format(data)) conn.close() return data
def _do_pip_check(): request = urllib.request.Request('https://api.github.com/repos/UAVCAN/gui_tool/tags', headers={ 'Accept': 'application/vnd.github.v3+json', }) with urllib.request.urlopen(request) as response: data = response.read() data = json.loads(data.decode('utf8'), encoding='utf8') newest_tag_name = data[0]['name'] logger.debug('Newest tag: %r', newest_tag_name) match = re.match(r'^.*?(\d{1,3})\.(\d{1,3})', newest_tag_name) version_tuple = int(match.group(1)), int(match.group(2)) logger.debug('Parsed version tuple: %r', version_tuple) if _version_tuple_to_int(version_tuple) > _version_tuple_to_int(__version__): git_url = 'https://github.com/UAVCAN/gui_tool' return 'pip3 install --upgrade git+<a href="{0}">{0}</a>@{1}'.format(git_url, newest_tag_name) # noinspection PyBroadException
def put_status(self): host = os.getenv('HOST') url = "https://{host}/api/task/{id}".format(host=host, id=self.request.id) payload = { 'status': self.status, 'steps': self.steps, 'file_name': self.zim_file_name, 'time_stamp': { 'started': self.start_time, 'ended': self.ended_time } } headers = { 'Content-Type': 'application/json; charset=utf-8', 'token': self.token } request = urllib.request.Request(url, json.dumps(payload, cls=JSONEncoder).encode('utf-8'), headers, method='PUT') try: with urllib.request.urlopen(request) as response: code = response.code except HTTPError as error: code = error.code
def fixUrl(destinationPort, transport, url, peerType): """ fixes the URL (original request string) """ transportProtocol = "" if transport.lower() in "udp" or transport.lower() in "tcp": transportProtocol="/"+transport if ("honeytrap" in peerType): return "Attack on port " + str(destinationPort) + transportProtocol # prepared dionaea to output additional information in ticker if ("Dionaea" in peerType): return "Attack on port " + str(destinationPort)+ transportProtocol return url
def _call_ACIS(self, kwargs, **moreKwargs): ''' Core method for calling the ACIS services. Returns python dictionary by de-serializing json response ''' #self._formatInputDict(**kwargs) kwargs.update(moreKwargs) self._input_dict = self._stripNoneValues(kwargs) self.url = self.baseURL + self.webServiceSource if pyVersion == 2: #python 2.x params = urllib.urlencode({'params':json.dumps(self._input_dict)}) request = urllib2.Request(self.url, params, {'Accept':'application/json'}) response = urllib2.urlopen(request) jsonData = response.read() elif pyVersion == 3: #python 3.x params = urllib.parse.urlencode({'params':json.dumps(self._input_dict)}) params = params.encode('utf-8') req = urllib.request.urlopen(self.url, data = params) jsonData = req.read().decode() return json.loads(jsonData)
def _fetch_img_urls(self, keyword, safe_search=False): # bing img search, https://gist.github.com/stephenhouser/c5e2b921c3770ed47eb3b75efbc94799 url = self._get_bing_url(keyword, safe_search=safe_search) self.logger.debug('search url {}'.format(url)) header = { 'User-Agent': "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) " "Chrome/43.0.2357.134 Safari/537.36"} soup = BeautifulSoup(urllib.request.urlopen(urllib.request.Request(url, headers=header)), 'html.parser') imgs = [] # contains the link for Large original images, type of image for a in soup.find_all("a", {"class": "iusc"}): mad = json.loads(a["mad"]) turl = mad["turl"] m = json.loads(a["m"]) murl = m["murl"] image_name = urllib.parse.urlsplit(murl).path.split("/")[-1] imgs.append((image_name, turl, murl)) return imgs
def search(query): request = { 'order': 'desc', 'sort': 'relevance', 'q': query, 'answers': 1, 'site': 'stackoverflow', 'filter': 'withbody', 'pagesize': 1 } response = send_request("search/advanced", request) question = response["items"][0] answer = get_answer(question) return (question["link"] + "\n\n" + "<b>" + question["title"] + "</b>\n\n" + format_user_data(question) + "\n\n" + "<b>Answer:</b>\n\n" + format_user_data(answer) +"\n\n" + "? StackOverflow users, CC-BY-SA 3.0")
def getDanmu(cid): if not cid: return "?????" try: cid_url = "http://comment.bilibili.com/%s.xml" % cid danmu_xml = urllib.request.urlopen(cid_url).read() xml = zlib.decompressobj(-zlib.MAX_WBITS).decompress(danmu_xml).decode() # ???????? return xml # ????? except Exception: pass # ??xml??????????????
def camouflageBrowser(url): myHeaders = [ "Mozilla/5.0 (Windows NT 6.1; rv:31.0) Gecko/20100101 Firefox/31.0", "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/40.0.2214.111 Safari/537.36", "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:37.0) Gecko/20100101 Firefox/37.0", "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:30.0) Gecko/20100101 Firefox/30.0", "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/536.11 (KHTML, like Gecko) Chrome/20.0.1132.11 TaoBrowser/3.5 Safari/536.11" ] # ????? try: content = urllib.request.urlopen(url).read() return content except Exception: pass # ?P??????
def _validate_captcha(data): """ Validates a captcha with google's reCAPTCHA. Args: data: the posted form data """ post_data = urllib.parse.urlencode({ "secret": api.config.reCAPTCHA_private_key, "response": data["g-recaptcha-response"], "remoteip": flask.request.remote_addr }).encode("utf-8") request = urllib.request.Request(api.config.captcha_url, post_data, method='POST') response = urllib.request.urlopen(request).read().decode("utf-8") parsed_response = json.loads(response) return parsed_response['success'] == True
def force_update_db(): dbdir = '/var/tmp/supotato' dbpath = '/var/tmp/supotato/supotato.db' if os.path.exists(dbpath): os.remove(dbpath) if not os.path.exists(dbdir): os.mkdir(dbdir) import urllib.request url = "https://everettjf.github.io/app/supotato/supotato.db" print('CocoaPods Index Database not exist , will download from ' + url) print('Please wait for a moment (about 5 MB file will be downloaded)') # Download the file from `url` and save it locally under `file_name`: with urllib.request.urlopen(url) as response, open(dbpath, 'wb') as out_file: data = response.read() # a `bytes` object out_file.write(data) print('Download completed') print('Update succeed')
def OnClicked(self): r = sr.Recognizer() with sr.Microphone() as source: speak.say('Hey I am Listening ') speak.runAndWait() audio = r.listen(source) try: put=r.recognize_google(audio) self.displayText(put) self.textBox.insert('1.2',put) self.textBox.delete('1.2',tk.END) events(self,put) except sr.UnknownValueError: self.displayText("Could not understand audio") except sr.RequestError as e: self.displayText("Could not request results; {0}".format(e))
def OnClicked(self): r = sr.Recognizer() with sr.Microphone() as source: speak.say('Hey I am Listening ') speak.runAndWait() audio = r.listen(source) try: put=r.recognize_google(audio) self.displayText(put) self.textBox.insert('1.2',put) put=put.lower() put = put.strip() #put = re.sub(r'[?|$|.|!]', r'', put) link=put.split() events(self,put,link) except sr.UnknownValueError: self.displayText("Could not understand audio") except sr.RequestError as e: self.displayText("Could not request results; {0}".format(e))
def setUp(self): # the URLs for now which will have the WSDL files and the XSD file #urlparse.urljoin('file:', urllib.pathname2url(os.path.abspath("service.xml"))) import urllib import os from urllib.parse import urlparse from urllib.request import pathname2url query_services_url = urllib.parse.urljoin('file:', pathname2url(os.path.abspath('../wsdl_files/vipuserservices-query-1.7.wsdl'))) userservices_url = urllib.parse.urljoin('file:', pathname2url(os.path.abspath('../wsdl_files/vipuserservices-auth-1.7.wsdl'))) # initializing the Suds clients for each url, with the client certificate youll have in the same dir as this file query_services_client = Client(query_services_url, transport=HTTPSClientCertTransport('vip_certificate.crt', 'vip_certificate.crt')) user_services_client = Client(userservices_url, transport=HTTPSClientCertTransport('vip_certificate.crt', 'vip_certificate.crt')) self.test_user_services_object = SymantecUserServices(user_services_client)
def test_poll_in_Push(self, mock): reply = {"requestId": "ac123", "status": "6040", "statusMessage": "Mobile push request sent", "pushDetail": {"pushCredentialId": "133709001", "pushSent": True}, "transactionId": "RealTransactionId", "authContext": {"params": {"Key": "authLevel.level", "Value": 10}}} mock.SymantecServices.authenticateUserWithPushThenPolling.return_value = Mock() mock.authenticateUserWithPushThenPolling.return_value.hash.return_value = reply response = symantec_package.lib.allServices.SymantecServices.authenticateUserWithNothing() self.assertTrue(response.hash() != reply) response = symantec_package.lib.allServices.SymantecServices.authenticateUserWithPushThenPolling("Parameters Here!") self.assertTrue((response.hash()) == reply) self.assertTrue(response.hash()["status"] == "6040") self.assertTrue(response.hash()['requestId'] == "ac123") self.assertTrue(response.hash()['statusMessage'] == "Mobile push request sent") self.assertTrue(response.hash()["pushDetail"]['pushCredentialId'] == "133709001") self.assertTrue(response.hash()["pushDetail"]['pushSent'] is True) self.assertTrue(response.hash()['transactionId'] == "RealTransactionId") self.assertTrue(response.hash()['authContext']['params']['Key'] == "authLevel.level") self.assertTrue(response.hash()['authContext']['params']['Value'] == 10) pass
def setUp(self): # the URLs for now which will have the WSDL files and the XSD file import urllib import os from urllib.parse import urlparse from urllib.request import pathname2url managementservices_url = urllib.parse.urljoin('file:', pathname2url( os.path.abspath('../wsdl_files/vipuserservices-mgmt-1.7.wsdl'))) # managementservices_url = 'http://webdev.cse.msu.edu/~huynhall/vipuserservices-mgmt-1.7.wsdl' # initializing the Suds clients for each url, with the client certificate youll have in the same dir as this file self.management_client = Client(managementservices_url, transport=HTTPSClientCertTransport('vip_certificate.crt', 'vip_certificate.crt')) self.test_management_services_object = SymantecManagementServices(self.management_client) pass
def test_mock_create_user(self, mock_managementservices): reply = {'requestId': 'create_123', 'status': '0000', 'statusMessage': 'Success'} # Configure the mock to return a response with an OK status code. Also, the mock should have # a `json()` method that returns a list of todos. mock_managementservices.createUser.return_value = Mock() mock_managementservices.createUser.return_value.json.return_value = reply # Call the service, which will send a request to the server. response = symantec_package.lib.managementService.SymantecManagementServices.createUser("create_123", "new_user3") print(response.json()) # If the request is sent successfully, then I expect a response to be returned. self.assertTrue((response.json()) == reply) self.assertTrue((response.json()["status"] == "0000")) pass
def test_mock_delete_user(self, mock_managementservices): reply = {'requestId': 'delete_123', 'status': '0000', 'statusMessage': 'Success'} # Configure the mock to return a response with an OK status code. Also, the mock should have # a `json()` method that returns a list of todos. mock_managementservices.deleteUser.return_value = Mock() mock_managementservices.deleteUser.return_value.json.return_value = reply # Call the service, which will send a request to the server. response = symantec_package.lib.managementService.SymantecManagementServices.deleteUser("delete_123", "new_user3") print(response.json()) # If the request is sent successfully, then I expect a response to be returned. self.assertTrue((response.json()) == reply) self.assertTrue((response.json()["status"] == "0000")) pass
def test_mock_add_STANDARDOTP_credential(self, mock_managementservices): reply = {'statusMessage': "Success", 'requestId': 'add_otp_cred', 'status': '0000'} # Configure the mock to return a response with an OK status code. Also, the mock should have # a `json()` method that returns a list of todos. mock_managementservices.addCredentialOtp.return_value = Mock() mock_managementservices.addCredentialOtp.return_value.json.return_value = reply response = symantec_package.lib.managementService.SymantecManagementServices.addCredentialOtp("add_otp_cred", "new_user3", "", "STANDARD_OTP", \ "678066") # change with what's on your device print(response.json()) # If the request is sent successfully, then I expect a response to be returned. self.assertTrue((response.json()) == reply) self.assertTrue((response.json()["status"] == "0000")) pass
def test_mock_update_STANDARDOTP_credential(self, mock_managementservices): reply = {'statusMessage': 'Success', 'requestId': 'update_123', 'status': '0000'} # Configure the mock to return a response with an OK status code. Also, the mock should have # a `json()` method that returns a list of todos. mock_managementservices.updateCredential.return_value = Mock() mock_managementservices.updateCredential.return_value.json.return_value = reply response = symantec_package.lib.managementService.SymantecManagementServices.updateCredential("update_123", "gabe_phone", "", "STANDARD_OTP", "My personal cell phone") print(response.json()) # If the request is sent successfully, then I expect a response to be returned. self.assertTrue((response.json()) == reply) self.assertTrue((response.json()["status"] == "0000")) pass
def test_mock_setTemporaryPasswordSMSDelivery(self, mock_managementservices): reply = {'status': '0000', 'requestId': 'setTempPWD', 'statusMessage': 'Success', 'temporaryPassword': '998241'} # Configure the mock to return a response with an OK status code. Also, the mock should have # a `json()` method that returns a list of todos. mock_managementservices.setTemporaryPasswordSMSDelivery.return_value = Mock() mock_managementservices.setTemporaryPasswordSMSDelivery.return_value.json.return_value = reply response = symantec_package.lib.managementService.SymantecManagementServices.setTemporaryPasswordSMSDelivery("setTempPWD", "gabe_phone", "12313608781", "17879481605") print(response.json()) # If the request is sent successfully, then I expect a response to be returned. self.assertTrue((response.json()) == reply) self.assertTrue((response.json()["status"] == "0000")) pass
def sendMessage(self, data, method, url): conn = http.client.HTTPConnection(Parameters["Address"] + ":1400") headers = {"Content-Type": 'text/xml; charset="utf-8"', "SOAPACTION": method} conn.request("POST", url, data, headers) response = conn.getresponse() conn.close() if response.status == 200: data = response.read().decode("utf-8") LogMessage(str(data)) self.parseMessage(data) else: Domoticz.Error("Unexpected response status received in function sendMessage (" + str(response.status) + ", " + str(response.reason) + "). \ The following command is sent: " + str(method) + ", " + str(url)) return # Process message from Sonos
def getRecognitionResult(self, apiName, seqValue): query = self.getBasicQueryString(apiName, seqValue) + "&stop=1" '''Request speech recognition service by HTTP GET''' url = str(self.apiBaseUrl) + "?" + str(query) req = urllib.request.Request(url,headers = {'Cookie': self.cookies}) with urllib.request.urlopen(req) as f: getResponse = f.read().decode() '''Now you can check the status here.''' print("Sending 'GET' request to URL : " + self.apiBaseUrl) print("get parameters : " + str(query)) print("Response Code : " + str(f.getcode())) '''Get the response''' return str(getResponse)
def load_url(url, timeout): # Build URL query to email signup page urlquery = "http://" + url + "/m-users-a-email_list-job-add-email-" + targetEmail + "-source-2.htm" print_out(Style.BRIGHT + Fore.WHITE + "Sending request to: " + url) # Build the request req = urllib.request.Request( urlquery, data=None, headers={ 'User-Agent': random.choice(useragents), 'Host': url } ) # Send try: f = urllib.request.urlopen(req) print_out(Style.BRIGHT + Fore.GREEN + "Successfully sent!") f.close() except urllib.error.URLError as e: print_out(Style.BRIGHT + Fore.RED + e.reason)
def retry(max_retries): """ Retry a function `max_retries` times. taken from https://stackoverflow.com/questions/23892210/python-catch-timeout-and-repeat-request. """ def retry(func): @wraps(func) def wrapper(*args, **kwargs): num_retries = 0 while num_retries <= max_retries: try: ret = func(*args, **kwargs) break except HTTPError: if num_retries == max_retries: raise num_retries += 1 time.sleep(1) return ret return wrapper return retry
def execute_get(self, operation, headers={}): url = self._host + operation.get_url() headers['Content-Type'] = 'application/json;charset=utf-8' request = urllib.request.Request(url, headers=headers) try: response = self._opener.open(request) except urllib.error.HTTPError as error: error_body = error.read().decode('utf-8') new_error_string = ('HTTP error ' + str(error.code) + ' ' + error.reason + ': ' + error_body) raise ConnectionError(new_error_string) return response.read().decode('utf-8')
def is_operation_supported(self, operation=None, headers={}): url = self._host + '/graph/operations/' + operation.get_operation() headers['Content-Type'] = 'application/json;charset=utf-8' request = urllib.request.Request(url, headers=headers) try: response = self._opener.open(request) except urllib.error.HTTPError as error: error_body = error.read().decode('utf-8') new_error_string = ('HTTP error ' + str(error.code) + ' ' + error.reason + ': ' + error_body) raise ConnectionError(new_error_string) response_text = response.read().decode('utf-8') return response_text
def paradasBiziCercanas(bot,update,longitud,latitud,estado,numposte): DISTANCIA = '350'#Distancia en metros desde la posición enviada, lo ponemos como string para evitarnos conversiones luego. Por contrato la más cercana tiene que estar como mucho a 300 metros, así que al poner 350 nos aseguramos de que haya al menos otra url='http://www.zaragoza.es/api/recurso/urbanismo-infraestructuras/estacion-bicicleta.json?rf=html&results_only=false&srsname=wgs84&point='+longitud+','+latitud+'&distance='+DISTANCIA try: h = urllib.request.urlopen(url) except Exception as e: bot.sendMessage(chat_id=update.message.chat_id, text='??<b>Error</b>??\nImposible contactar con el servicio del Ayuntamiento.', parse_mode='HTML') #BIZI jsonleidobizi = json.loads(str(h.read().decode('utf-8'))) h.close() nElementosbizi = jsonleidobizi["totalCount"] textobizi = '' if nElementosbizi==0: textobizi='No hay estaciones BiZi a '+DISTANCIA+' metros de la ubicación\n\n' else: for i in range(nElementosbizi): if(jsonleidobizi["result"][i]["id"]!=numposte):#no enseñar la parada desde la que se invoca textobizi = textobizi + '/bizi '+ jsonleidobizi["result"][i]["id"] + '\n' + jsonleidobizi["result"][i]["title"] + '\n\n' bot.sendMessage(chat_id=update.message.chat_id, text='<b>'+estado+', quizás te interesen otras estaciones cercanas (a <'+DISTANCIA+'m):</b>\n'+textobizi, parse_mode='HTML', disable_web_page_preview=True)
def get_api_raw(self, url): request = urllib.request.Request(self._api_url + url) try: result = urllib.request.urlopen(request) except urllib.error.HTTPError as e: self._error = "HTTP error" if str(e.code) == '404': self._error_msg = "404 - repository not found, verify register settings" else: self._error_msg = "Response: "+str(e.code) self._update_ready = None except urllib.error.URLError as e: self._error = "URL error, check internet connection" self._error_msg = str(e.reason) self._update_ready = None return None else: result_string = result.read() result.close() return result_string.decode() # if we didn't get here, return or raise something else # result of all api calls, decoded into json format