We extracted the following 50 code examples from open-source Python projects to show how urllib.urlopen() is used.
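As a quick orientation before the project code, here is a minimal sketch of the basic call pattern, assuming Python 2 (the module-level urllib.urlopen() no longer exists in Python 3, where urllib.request.urlopen() is the closest equivalent); the URL is only a placeholder and is not taken from any of the examples below.

# Minimal sketch (Python 2). example.com is a placeholder URL.
import urllib

f = urllib.urlopen("http://example.com/")
try:
    body = f.read()            # response body as a byte string
    print f.geturl()           # final URL after any redirects
    print f.info().gettype()   # Content-Type reported by the server
finally:
    f.close()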
def uploadFile(current_user):
    format = "%Y-%m-%dT%H:%M:%S"
    now = datetime.datetime.utcnow().strftime(format)
    try:
        file = request.files['file']
    except:
        file = None
    try:
        url = request.form['url']
    except:
        url = None
    if file and allowed_file(file.filename):
        filename = now + '_' + str(current_user) + '_' + file.filename
        filename = secure_filename(filename)
        file.save(os.path.join(app.config['UPLOAD_FOLDER'], filename))
        file_uploaded = True
    elif url:
        file = urllib.urlopen(url)
        filename = url.split('/')[-1]
        filename = now + '_' + str(current_user) + '_' + filename
        filename = secure_filename(filename)
        if file and allowed_file(filename):
            open(os.path.join(app.config['UPLOAD_FOLDER'], filename), 'wb').write(file.read())
            file_uploaded = True
    else:
        filename = None
        file_uploaded = False
    return file_uploaded, filename
def decrypt(hash, tipo):
    global word
    try:
        if(tipo == 0):
            url = BeautifulSoup(urllib.urlopen("https://md5.gromweb.com/?md5=" + hash), "html.parser")
        else:
            url = BeautifulSoup(urllib.urlopen("https://sha1.gromweb.com/?hash=" + hash), "html.parser")
        password = url.find("em", {"class": "long-content string"})
        password = re.sub(re.compile("<.*?>"), "", str(password)).strip()
        if str(password) == "None":
            print word+"\t\t\t\t[-] Senha nao encontrada! :-("
        else:
            print word+"\t\t\t\t[+] Senha encontrada: " + password
    except IOError:
        decryptwl(hash, tipo)
def fetch_quote(symbols, timestamp, cached_file=None):
    url = URL % '+'.join(symbols)
    if not cached_file:
        # fetch
        log('Fetching %s' % url)
        fp = urllib.urlopen(url)
        try:
            data = fp.read()
        finally:
            fp.close()
        # log result
        if LOG_DATA_FETCHED:
            log_filename = LOG_FILENAME % timestamp.replace(':', '-')
            out = open(log_filename, 'wb')
            try:
                log('Fetched %s bytes logged in %s' % (len(data), log_filename))
                out.write(data)
            finally:
                out.close()
    else:
        data = open(cached_file, 'rb').read()
    return StringIO(data)
def fetch_relative(url, proxies=None, postfetch=None):
    """postfetch is a callback that receives fetched data (as string)"""
    path = localize_path(url)
    path = normalize_path(path)
    if os.path.exists(path):
        if postfetch:
            logging.debug("reprocessing file %s" % path)
            f = open(path, "rb")
            data = f.read()
            f.close()
            postfetch(data)
        return False
    logging.debug("fetching %s" % url)
    f = urllib.urlopen(url, proxies=proxies)
    data = f.read()
    f.close()
    head, tail = os.path.split(path)
    if not os.path.exists(head):
        os.makedirs(head)
    f = open(path, "wb")
    f.write(data)
    f.close()
    if postfetch:
        postfetch(data)
    return True
def get_page(self, url):
    """ loads a webpage into a string """
    page = ''
    try:
        f = urllib.urlopen(url=url)
        page = f.read()
        f.close()
    except IOError:
        print "Error opening {}".format(url)
    except httplib.InvalidURL, e:
        print "{} caused an Invalid URL error.".format(url)
        if hasattr(e, 'reason'):
            print 'We failed to reach a server.'
            print 'Reason: ', e.reason
        elif hasattr(e, 'code'):
            print 'The server couldn\'t fulfill the request.'
            print 'Error code: ', e.code
    return page
def getEntries(person):
    """
    Fetch an Advogato member's diary and return a dictionary in the form
    { date : entry, ... }
    """
    parser = DiaryParser()
    f = urllib.urlopen("http://www.advogato.org/person/%s/diary.xml" % urllib.quote(person))
    s = f.read(8192)
    while s:
        parser.feed(s)
        s = f.read(8192)
    parser.close()
    result = {}
    for d, e in map(None, parser.dates, parser.entries):
        result[d] = e
    return result
def get_url(self, query):
    site1 = urllib.urlopen('http://www.youtube.com/results?search_query=%s' % query)
    html = site1.read()
    soup = BS(html)
    links = soup.findAll('a')
    vidlinks = [link.get('href') for link in links if link.get('href') is not None]
    vlink = [i for i in vidlinks if '/watch?v=' in i][0]
    img_link = soup.findAll('img', {'alt': 'Thumbnail', 'width': '185'})[0].get('src')
    img_url = 'http:%s' % img_link
    imagethread = threading.Thread(target=lambda: urllib.urlretrieve(img_url, 'Files\image.jpg'))
    imagethread.start()
    return vlink
def run(self):
    ind = self.qu.get()
    url = self.url + str(ind)
    soup = bs.BeautifulSoup(''.join(ul.urlopen(url).readlines()))
    bu = up.urlsplit(self.url)
    print 'started with the ', str(url).split('/')[-1],
    for i in soup.find_all(attrs={"class": "recipe-title"}):
        sp = up.urlsplit(i.a.get('href'))
        path = sp.path
        print path
        if re.search(pat, path):
            path = bu.scheme + '://' + bu.netloc + path
            filename = str(path).split('/')[-2]
            filename = op.join(op.abspath(op.curdir), filename + '.py')  # recipe will be stored in given location
            # filename = op.join(op.abspath(op.curdir), filename + '.html')
            # uncomment the above line if downloading the web page for the recipe
            print path
            self.q.put((path, filename))
            self.fetch_data()
            time.sleep(1)
    self.qu.task_done()
    self.q.join()
    print 'done with the ', str(url).split('/')[-1],
def bitcoind(self, method, params=[]):
    postdata = dumps({"method": method, 'params': params, 'id': 'jsonrpc'})
    while True:
        try:
            respdata = urllib.urlopen(self.bitcoind_url, postdata).read()
        except:
            print_log("cannot reach martexcoind...")
            self.wait_on_bitcoind()
        else:
            r = loads(respdata)
            if r['error'] is not None:
                if r['error'].get('code') == -28:
                    print_log("martexcoind still warming up...")
                    self.wait_on_bitcoind()
                    continue
                raise BaseException(r['error'])
            break
    return r.get('result')
def UploadPDB(self, structure):
    """
    Uploads a structure to the Server.
    Allowed input format: Bio.PDB Structure object, PDB formatted string
    Returns id for future services.
    """
    if isinstance(structure, Entity):  # SMCRA
        s = self._smcra_to_str(structure)
    elif isinstance(structure, str):  # String
        s = structure
    else:
        raise ValueError('Unknown format. Use SMCRA object or string.')
    u = urllib.urlopen("http://www.cmbi.ru.nl/wiwsd/rest/UploadPDB", s)
    x = xml.dom.minidom.parse(u)
    id = x.getElementsByTagName("response")[0].childNodes[0].data
    return id
def download_archive(self, src, dst):
    for x in self.env.PACKAGE_REPO:
        url = '/'.join((x, src))
        try:
            web = urlopen(url)
            try:
                if web.getcode() != 200:
                    continue
            except AttributeError:
                pass
        except Exception:
            # on python3 urlopen throws an exception
            # python 2.3 does not have getcode and throws an exception to fail
            continue
        else:
            tmp = self.root.make_node(dst)
            tmp.write(web.read())
            Logs.warn('Downloaded %s from %s' % (tmp.abspath(), url))
            break
    else:
        self.fatal('Could not get the package %s' % src)
def GetFh(self, code):  # ??????
    try:
        ret = urllib.urlopen("http://money.finance.sina.com.cn/corp/go.php/vISSUE_ShareBonus/stockid/" + code + ".phtml")
        soup = BeautifulSoup(Tools().smartCode(ret.read()), "html.parser")
        dict = {}
        for x in soup.find_all('tbody'):
            for e in str(x).split('_blank'):
                if "type=1" in e:
                    td = re.findall(r'<td>(.+?)</td>', e)
                    dict.update({td[0]: {
                        u"????".encode('gbk', 'ignore').decode('gbk'): td[0],
                        u"??".encode('gbk', 'ignore').decode('gbk'): td[1],
                        u"??".encode('gbk', 'ignore').decode('gbk'): td[2],
                        u"??".encode('gbk', 'ignore').decode('gbk'): td[3],
                        u"??".encode('gbk', 'ignore').decode('gbk'): td[4],
                        u"?????".encode('gbk', 'ignore').decode('gbk'): td[5],
                        u"?????".encode('gbk', 'ignore').decode('gbk'): td[6],
                        u"?????".encode('gbk', 'ignore').decode('gbk'): td[7]
                    }})
        return pandas.DataFrame.from_dict(dict, orient="index")
    except:
        return None
def run_wsgi(cls):
    if cls.wsgi_process != None:
        cls.make_browser()
        return
    cls.wsgi_process = Process(target=cls._run_wsgi)
    cls.wsgi_process.start()
    # Wait for it to come up
    success = False
    for i in range(10):
        try:
            if urllib.urlopen("http://localhost:%i/" % cls.port_num).getcode() == 200:
                success = True
                break
        except Exception:
            pass
        time.sleep(2)
    # Create a second app for routing etc
    cls.app = cls._make_app()
    # If we failed to run WSGI then clean-up
    if not success:
        cls.stop_wsgi()
        cls.wsgi_process = None
        raise Exception("Couldn't bring up WSGI server")
    cls.make_browser()
def deep(self):
    for depth in xrange(self.depth):
        print "*"*70 + ("\nScanning depth %d web\n" % (depth+1)) + "*"*70
        context_node = self.node[:]
        self.node = []
        for self.url in context_node:
            self.links_found = 0
            try:
                req = urlopen(self.url)
                res = req.read()
                self.feed(res)
            except:
                self.reset()
    print "*"*40 + "\nRESULTS\n" + "*"*40
    sor = [(v, k) for (k, v) in self.db.items()]
    sor.sort(reverse=True)
    return sor
def ipcheck(proxy):
    try:
        pxhandle = urllib2.ProxyHandler({"http": proxy})
        opener = urllib2.build_opener(pxhandle)
        urllib2.install_opener(opener)
        myip = urllib2.urlopen('http://www.whatismyip.com/automation/n09230945.asp').read()
        xs = re.findall(('\d{1,3}.\d{1,3}.\d{1,3}.\d{1,3}'), StripTags(myip))
        if xs[0] == myipadress or myipadress == myip:
            trans_list.append(proxy)
            print proxy[:-1], "\t- ALIVE -", timer(), "- TRANSPARENT"
        elif xs == None:
            pass
        else:
            anon_list.append(proxy)
            print proxy[:-1], "\t- ALIVE -", timer(), "- EXT-iP :", xs[0]
    except KeyboardInterrupt:
        print "\n\nCTRL+C - check temporary proxylist file\n\n"
        sys.exit(0)
    except:
        pass
def get_pypi_src_download(package):
    url = 'https://pypi.python.org/pypi/%s/json' % (package,)
    fp = urllib.urlopen(url)
    try:
        try:
            data = fp.read()
        finally:
            fp.close()
    except urllib.error:
        raise RuntimeError("Cannot determine download link for %s" % (package,))

    pkgdata = json.loads(data.decode('utf-8'))
    if 'urls' not in pkgdata:
        raise RuntimeError("Cannot determine download link for %s" % (package,))

    for info in pkgdata['urls']:
        if info['packagetype'] == 'sdist' and info['url'].endswith('tar.gz'):
            return (info.get('md5_digest'), info['url'])

    raise RuntimeError("Cannot determine download link for %s" % (package,))
def _dopost(method, auth=False, **params):
    # uncomment to check you aren't killing the flickr server
    # print "***** do post %s" % method

    params = _prepare_params(params)
    url = '%s%s/?api_key=%s%s' % \
          (HOST, API, API_KEY, _get_auth_url_suffix(method, auth, params))

    # There's no reason this can't be str(urlencode(params)). I just wanted to
    # have it the same as the rest.
    payload = '%s' % (urlencode(params))

    # another useful debug print statement
    if debug:
        print "_dopost url", url
        print "_dopost payload", payload

    return _get_data(minidom.parse(urlopen(url, payload)))
def main(url, port, apiKey):
    urlSickRage = "http://" + url + ":" + port
    backlog = urllib.urlopen(
        "{}/api/{}/?cmd=backlog".format(urlSickRage, apiKey)
    )
    jsonBacklog = json.loads(backlog.read())
    for tvshow in jsonBacklog['data']:
        indexerid = tvshow['indexerid']
        episodes = tvshow['episodes']
        for episode in episodes:
            season = episode['season']
            episodeNumber = episode['episode']
            urllib.urlopen(
                ("{}/api/{}/?cmd=episode.search&indexerid={}"
                 "&season={}&episode={}").format(
                    urlSickRage, apiKey, indexerid, season, episodeNumber,
                )
            )
def ipkgCallback(self, event, param):
    if event == IpkgComponent.EVENT_DONE:
        if self.updating:
            self.updating = False
            self.ipkg.startCmd(IpkgComponent.CMD_UPGRADE_LIST)
        elif self.ipkg.currentCommand == IpkgComponent.CMD_UPGRADE_LIST:
            self.total_packages = len(self.ipkg.getFetchedList())
            print ('[OnlineVersionCheck] %s Updates available' % self.total_packages)
            if self.total_packages:
                from urllib import urlopen
                import socket
                currentTimeoutDefault = socket.getdefaulttimeout()
                socket.setdefaulttimeout(3)
                config.softwareupdate.updatefound.setValue(True)
                try:
                    config.softwareupdate.updateisunstable.setValue(urlopen("http://odisealinux.com/feeds/" + getImageVersion() + "/status").read())
                except:
                    config.softwareupdate.updateisunstable.setValue(1)
                socket.setdefaulttimeout(currentTimeoutDefault)
            else:
                config.softwareupdate.updatefound.setValue(False)
        else:
            config.softwareupdate.updatefound.setValue(False)
    pass
def get_domain(target):
    url = "https://www.robtex.net/?dns=" + str(target) + "&rev=1"
    html = urllib.urlopen(url).read()
    soup = BeautifulSoup(html, 'html.parser')
    table = soup.findAll("td")
    table = remove_tags(str(table))
    data = table.split(",")
    for d in data:
        if len(d) > 10:
            d = d.replace(" ", "")
            d = d.replace("]", "")
            if check_domain_mongodb(target, d):
                print "[INFO]" + str(d) + " in " + str(target) + " already insert ..."
            else:
                insert_mongodb(target, d)
                print colores.verde + "[INFO]" + str(d) + " in " + str(target) + " insert ..." + colores.normal
def get_ref_microbe_taxids():
    """
    Downloads the latest bacterial genome assembly summary from the NCBI genome
    ftp site and generate a list of taxids of the bacterial reference genomes.

    :return:
    """
    import urllib
    import csv
    urlbase = 'ftp://ftp.ncbi.nlm.nih.gov'
    urlextension = '/genomes/refseq/bacteria/assembly_summary.txt'
    assembly = urllib.urlopen(urlbase + urlextension)
    datareader = csv.reader(assembly.read().splitlines(), delimiter="\t")
    taxid = []
    for row in datareader:
        if row[4] == 'reference genome':
            taxid.append(row[5])
    ts = get_timestamp()
    dump(taxid, "ref_microbe_taxids_{}.pyobj".format(ts))
    return taxid
def GetDomainLogo(self):
    """Gets Domain Logo

    This function does not make use of the Google Apps Admin Settings API,
    it does an HTTP Get of a url specific to the Google Apps domain. It is
    included for completeness sake.

    Args:
      None

    Returns:
      binary image file"""

    import urllib
    url = 'http://www.google.com/a/cpanel/' + self.domain + '/images/logo.gif'
    response = urllib.urlopen(url)
    return response.read()
def Weather(req):
    if req.get("result").get("action") != "WeatherRequest":  ## DEFINING THE PREFIX TO EXECUTE FUNCTION
        return {}

    city = req.get("result").get("parameters").get("geo-city").encode("utf8")
    clean = re.compile('ã')  ## REMOVING ERROR UTF8
    city = re.sub(clean, 'a', city)
    city = urllib.quote(city.encode("utf8"))  ## REMOVING ACCENTUATION

    result = urllib.urlopen(WeatherRequest.format(cidade=city, key=WeatherKey)).read()  ## DEFINING URL
    query = json.loads(result)  ## LOADING JSON TO SELECT SOME PARAMETERS
    main = query.get('main')

    speech = lang.WeatherMSG.format(cidade=query.get('name'), temperatura=main.get('temp') + 3)

    return {
        "speech": speech,
        "displayText": speech,
        "source": source
    }
def check_omahaproxy(channel="stable"):
    version = 0
    status_url = "http://omahaproxy.appspot.com/all?os=linux&channel=" + channel
    usock = urllib.urlopen(status_url)
    status_dump = usock.read()
    usock.close()
    status_list = StringIO.StringIO(status_dump)
    status_reader = list(csv.reader(status_list, delimiter=','))
    linux_channels = [s for s in status_reader if "linux" in s]
    linux_channel = [s for s in linux_channels if channel in s]
    version = linux_channel[0][2]
    if version == 0:
        print 'I could not find the latest %s build. Bailing out.' % channel
        sys.exit(1)
    else:
        print 'Latest Chromium Version on %s at %s is %s' % (channel, status_url, version)
        return version
def myopen_http(method, url, values):
    if not url:
        raise ValueError("cannot submit, no URL provided")
    ## FIXME: should test that it's not a relative URL or something
    try:
        from urllib import urlencode, urlopen
    except ImportError:  # Python 3
        from urllib.request import urlopen
        from urllib.parse import urlencode
    if method == 'GET':
        if '?' in url:
            url += '&'
        else:
            url += '?'
        url += urlencode(values)
        data = None
    else:
        data = urlencode(values).encode('utf-8')
    return urlopen(url, data)
def access_page(url):
    content = None
    try:
        page = urlopen(url)
        content = page.read().decode('utf-8')
    except:
        print('Unable to open or decode content: %s' % url)
        print_tb()
    return content
def fetch_public_key(repo):
    """Download RSA public key Travis will use for this repo.

    Travis API docs: http://docs.travis-ci.com/api/#repository-keys
    """
    keyurl = 'https://api.travis-ci.org/repos/{0}/key'.format(repo)
    data = json.loads(urlopen(keyurl).read().decode())
    if 'key' not in data:
        errmsg = "Could not find public key for repo: {}.\n".format(repo)
        errmsg += "Have you already added your GitHub repo to Travis?"
        raise ValueError(errmsg)
    return data['key']
def generate(url):
    parts = ['''\
"""

    webencodings.labels
    ~~~~~~~~~~~~~~~~~~~

    Map encoding labels to their name.

    :copyright: Copyright 2012 by Simon Sapin
    :license: BSD, see LICENSE for details.

"""

# XXX Do not edit!
# This file is automatically generated by mklabels.py

LABELS = {
''']
    labels = [
        (repr(assert_lower(label)).lstrip('u'),
         repr(encoding['name']).lstrip('u'))
        for category in json.loads(urlopen(url).read().decode('ascii'))
        for encoding in category['encodings']
        for label in encoding['labels']]
    max_len = max(len(label) for label, name in labels)
    parts.extend(
        '    %s:%s %s,\n' % (label, ' ' * (max_len - len(label)), name)
        for label, name in labels)
    parts.append('}')
    return ''.join(parts)
def get_global_usage(api_url, query_string_dict, title_list):
    usage_dict_ = dict()
    usage_dict_["image"] = dict()
    usage_dict_["article"] = dict()

    raw_api_query_string = unicode(u'|'.join(title_list)).encode('utf-8')
    #print raw_api_query_string
    API_QUERY_STRING["titles"] = raw_api_query_string
    f = urlopen(API_BASE_URL, urlencode(API_QUERY_STRING))
    response = f.read()
    response_dict = json.loads(response)

    for key, value in response_dict["query"]["pages"].iteritems():
        if len(value[u'globalusage']) > 0:
            #print value
            found_dict = dict()
            for item in value[u'globalusage']:
                if (item[u'ns'] == u'0') or (item[u'ns'] == u'104'):
                    if item[u'wiki'] in usage_dict_["article"]:
                        usage_dict_["article"][item[u'wiki']] += 1
                    else:
                        usage_dict_["article"][item[u'wiki']] = 1
                    found_dict[item[u'wiki']] = True
            for key, value in found_dict.iteritems():
                if key in usage_dict_["image"]:
                    usage_dict_["image"][key] += 1
                else:
                    usage_dict_["image"][key] = 1
    #print usage_dict_
    return usage_dict_
def GetHTTPFileContents(url):
    fileContents = None
    try:
        filehandle = urllib.urlopen(url)
        return filehandle.read()
    except:
        logging.warning("connection cannot be made to " + url)
        return
def prepare_input_source(source, base=""):
    """This function takes an InputSource and an optional base URL and
    returns a fully resolved InputSource object ready for reading."""

    if type(source) in _StringTypes:
        source = xmlreader.InputSource(source)
    elif hasattr(source, "read"):
        f = source
        source = xmlreader.InputSource()
        source.setByteStream(f)
        if hasattr(f, "name"):
            source.setSystemId(f.name)

    if source.getByteStream() is None:
        sysid = source.getSystemId()
        basehead = os.path.dirname(os.path.normpath(base))
        sysidfilename = os.path.join(basehead, sysid)
        if os.path.isfile(sysidfilename):
            source.setSystemId(sysidfilename)
            f = open(sysidfilename, "rb")
        else:
            source.setSystemId(urlparse.urljoin(base, sysid))
            f = urllib.urlopen(source.getSystemId())
        source.setByteStream(f)

    return source
def download(file_url, local_filename):
    web_file = urllib.urlopen(file_url)
    local_file = open(local_filename, 'w')
    local_file.write(web_file.read())
    web_file.close()
    local_file.close()
def check():
    response = urlopen('https://raw.githubusercontent.com/D4Vinci/Dr0p1t-Framework/master/core/version.txt')
    version = response.read().decode('utf-8').strip()
    f = open(os.path.join("core", "version.txt"), 'r')
    file_data = f.read().strip()
    if version != file_data:
        colored_print('\n[*] New Version available ! Visit: https://github.com/D4Vinci/Dr0p1t-Framework\n', "y")
    else:
        colored_print('[*] Your version is up-to-date ;)', "b")
def command_nick(current_buffer, args):
    pass
    # urllib.urlopen("https://%s/account/settings" % (domain))
    # browser.select_form(nr=0)
    # browser.form['username'] = args
    # reply = browser.submit()
def get_html(url):
    """Get the html """
    page = urllib.urlopen(url)
    html = page.read()
    return html