我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用urllib2.urlopen()。
def add_user_devices(self, serial): # (url, access_token, api_token) = self.get_api_conf() api_url = self.url + "/api/v1/user/devices" token = self.access_token + " " + self.api_token data = {'serial': serial} request = urllib2.Request(api_url, json.dumps(data)) request.add_header('Authorization', token) request.add_header('Content-Type', 'application/json') try: urllib2.urlopen(request) except Exception, e: print e.code print e.read() # ?????????
def download_file_insecure(url, target): """ Use Python to download the file, even though it cannot authenticate the connection. """ try: from urllib.request import urlopen except ImportError: from urllib2 import urlopen src = dst = None try: src = urlopen(url) # Read/write all in one block, so we don't create a corrupt file # if the download is interrupted. data = src.read() dst = open(target, "wb") dst.write(data) finally: if src: src.close() if dst: dst.close()
def remove_devices_user(self, device_list): # (url, access_token, api_token) = self.get_api_conf("conf/stf.conf", "renguoliang") for device in device_list: serial = device["serial"] api_url = self.url + "/api/v1/user/devices/%s" % serial print api_url token = self.access_token + " " + self.api_token request = urllib2.Request(api_url) request.add_header('Authorization', token) request.get_method = lambda: 'DELETE' try: urllib2.urlopen(request) except Exception, e: print e.code print e.read() # ?????????
def get(self, url, proxy=None): if proxy: proxy = urllib2.ProxyHandler({'http': proxy}) opener = urllib2.build_opener(proxy) urllib2.install_opener(opener) try: response = urllib2.urlopen(url) except HTTPError, e: resp = e.read() self.status_code = e.code except URLError, e: resp = e.read() self.status_code = e.code else: self.status_code = response.code resp = response.read() return resp
def run(self): data = self.getData() value = { data: { "type": self.data_type } } json_data = json.dumps(value) post_data = json_data.encode('utf-8') headers = {'Content-Type': 'application/json'} try: request = urllib2.Request('{}/hippocampe/api/v1.0/{}'.format(self.url, self.service), post_data, headers) response = urllib2.urlopen(request) report = json.loads(response.read()) self.report(report) except urllib2.HTTPError: self.error("Hippocampe: " + str(sys.exc_info()[1])) except urllib2.URLError: self.error("Hippocampe: service is not available") except Exception as e: self.unexpectedError(e)
def getMessagePayload(self): self.logger.debug("Preparing client->device message payload") salon = -127 try: salon = read_temp() except Exception as e: self.logger.error("error reading local temp") self.logger.exception(e) piwnica = -127 relay = 0 try: os.system("sudo ifconfig eth0 192.168.1.101 netmask 255.255.255.0") txt = urllib2.urlopen(relay1_addr).read() lines = string.split(txt, '\n') piwnica = float(lines[1]) relay = int(lines[0]) except Exception as e: self.logger.error("error reading data from {0}".format(relay1_addr)) self.logger.exception(e) payloadDict = {"values":{}} payloadDict["values"]["relay"] = relay if salon > -127: payloadDict["values"]["salon"] = salon if piwnica > -127: payloadDict["values"]["piwnica"] = piwnica payload = json.dumps(payloadDict) return payload
def downloadFilesSave(links, fileFormat): # main function if (links == 'EMPTY'): # if links list is empty return ' NO LINKS FOUND !' for link in links: name = random.randint(0, 10000001) if (name in os.listdir(os.getcwd())): # random name to files name = random.randint(0, 10000001) if (format not in ['zip', 'png', 'jpg', 'jpeg', 'tiff', 'bmp', 'svg', 'gif']): try: saveFile=open(str(name)+'.' + fileFormat, 'w') saveFile.write(urllib2.urlopen(link).read()) saveFile.close() except urllib2.URLError: pass else: try: saveFile=open(str(name)+'.' + fileFormat, 'wb') saveFile.write(urllib2.urlopen(link).read()) saveFile.close() except urllib2.URLError: pass return ' {} DOWNLOADS SUCCESSFULL YET !'.format(len(os.listdir(os.getcwd())))
def get_system_status(): session_attributes = {} card_title = "BART System Status" reprompt_text = "" should_end_session = False response = urllib2.urlopen(API_BASE + "/status") bart_system_status = json.load(response) speech_output = "There are currently " + bart_system_status["traincount"] + " trains operating. " if len(bart_system_status["message"]) > 0: speech_output += bart_system_status["message"] else: speech_output += "The trains are running normally." return build_response(session_attributes, build_speechlet_response( card_title, speech_output, reprompt_text, should_end_session))
def test_start_object(self): server = PJFServer(configuration=PJFConfiguration(Namespace(ports={"servers": {"HTTP_PORT": 8080, "HTTPS_PORT": 8443}}, html=False, level=6, command=["radamsa"], stdin=True, json={"a": "test"}, indent=True, strong_fuzz=False, url_encode=False, parameters=[], notify=False, debug=False, content_type="text/plain", utf8=False, nologo=True))) server.run() json_http = urllib2.urlopen("http://127.0.0.1:8080").read() try: import requests requests.packages.urllib3.disable_warnings() json_https = requests.get('https://127.0.0.1:8443', verify=False).content self.assertTrue(json_https) except ImportError: pass self.assertTrue(json_http) server.stop()
def getRelsFromURIMSinWARC(warc): urims = getURIMsFromTimeMapInWARC(warc) startReplay(warc) # Get Link header values for each memento linkHeaders = [] for urim in urims: linkHeaders.append(urllib2.urlopen(urim).info().getheader('Link')) stopReplay() relsForURIMs = [] for linkHeader in linkHeaders: relForURIM = ipwbTest.extractRelationEntriesFromLinkTimeMap(linkHeader) relsForURIMs.append(relForURIM) stopReplay() return relsForURIMs
def send_result(email, result, title, urn): """ Args: email (str): address to send the results result (obj): results to send title (str): urn (str): uniform resource name Returns: str: response from endpoint """ url = 'https://mongoaud.it/results' headers = {'Content-type': 'application/json', 'Accept': 'application/json'} values = {'email': email, 'result': result, 'title': title, 'urn': urn, 'date': get_date()} try: req = urllib2.Request(url, json.dumps(values), headers) response = urllib2.urlopen(req) return response.read() except (urllib2.HTTPError, urllib2.URLError) as exc: return "Sadly enough, we are having technical difficulties at the moment, " \ "please try again later.\n\n%s" % str(exc)
def check_version(version): # if application is binary then check for latest version if getattr(sys, 'frozen', False): try: url = "https://api.github.com/repos/stampery/mongoaudit/releases/latest" req = urllib2.urlopen(url) releases = json.loads(req.read()) latest = releases["tag_name"] if version < latest: print("mongoaudit version " + version) print("There's a new version " + latest) _upgrade(releases) except (urllib2.HTTPError, urllib2.URLError): print("Couldn't check for upgrades") except os.error: print("Couldn't write mongoaudit binary")
def download_lyrics(artist, url): print url time.sleep(random() + 2) page = urllib2.urlopen(url).read() soup = BeautifulSoup(page, 'html.parser') # Get the song title song_title = soup.find('title').get_text().split(' - ')[1].lower().replace('/', ' ').replace(' ', '_') # Get the lyrics div lyrics = soup.findAll('div', {'class': ''}) for i in lyrics: lyrics = i.get_text().strip() if len(lyrics) > 10: with open('artists/' + artist + '/' + song_title + '.txt', 'wb') as w: cleaned_lyrics = lyrics.replace('\r\n', ' *BREAK* ').replace('\n', ' *BREAK* ').replace(' ', ' ') w.write(cleaned_lyrics.encode('utf-8'))
def download_songs(url): time.sleep(random.random() * 0.5) try: page = urllib2.urlopen(url).read() soup = BeautifulSoup(page, 'html.parser') # Get the artist name artist_name = soup.findAll('h1')[0].get_text()[:-7].lower().replace(' ', '_') # Store all songs for a given artist with open('artist_data/'+artist_name+'.txt', 'wb') as w: for song in soup.findAll('a', {'target': '_blank'}): if 'lyrics/' in song['href']: song_url = song['href'][1:].strip() w.write(song_url + '\n') except urllib2.HTTPError: print '404 not found'
def TestSite(url): protocheck(url) print "Trying: " + url try: urllib2.urlopen(url, timeout=3) except urllib2.HTTPError, e: if e.code == 405: print url + " found!" print "Now the brute force will begin! >:)" if e.code == 404: printout(str(e), YELLOW) print " - XMLRPC has been moved, removed, or blocked" sys.exit() except urllib2.URLError, g: printout("Could not identify XMLRPC. Please verify the domain.\n", YELLOW) sys.exit() except socket.timeout as e: print type(e) printout("The socket timed out, try it again.", YELLOW) sys.exit()
def paste(self): """Create a paste and return the paste id.""" data = json.dumps({ 'description': 'Werkzeug Internal Server Error', 'public': False, 'files': { 'traceback.txt': { 'content': self.plaintext } } }).encode('utf-8') try: from urllib2 import urlopen except ImportError: from urllib.request import urlopen rv = urlopen('https://api.github.com/gists', data=data) resp = json.loads(rv.read().decode('utf-8')) rv.close() return { 'url': resp['html_url'], 'id': resp['id'] }
def update(self, docs, commitwithin=None): """Post list of docs to Solr, return URL and status. Opptionall tell Solr to "commitwithin" that many milliseconds.""" url = self.url + '/update' add_xml = etree.Element('add') if commitwithin is not None: add_xml.set('commitWithin', str(commitwithin)) for doc in docs: xdoc = etree.SubElement(add_xml, 'doc') for key, value in doc.iteritems(): if value: field = etree.Element('field', name=key) field.text = (value if isinstance(value, unicode) else str(value)) xdoc.append(field) request = urllib2.Request(url) request.add_header('Content-Type', 'text/xml; charset=utf-8') request.add_data(etree.tostring(add_xml, pretty_print=True)) response = urllib2.urlopen(request).read() status = etree.XML(response).findtext('lst/int') return url, status
def post(self): site = GetSite() browser = detect(self.request) member = CheckAuth(self) l10n = GetMessages(self, member, site) if member: image = self.request.argument['image'][0] if image is not None: import urllib, urllib2 parameters = urllib.urlencode(dict(member_id=member.num, image=image)) try: f = urllib2.urlopen('http://daydream/upload', parameters) data = f.read() f.close() except: self.session = Session() self.session['message'] = '?????? 1M' self.redirect('/images') else: self.redirect('/signin')
def _html_link_return(self, url, tag, key, value, deeper=False, second=False): """ Returns links :param url: URL to filter :param key: Name of key to search in tag :param tag: Name of value to find :param value: Name of the value expected in tag """ if url[0] == '/': url = '{0}{1}'.format(self.url, url) r = urllib2.Request(url) response = urllib2.urlopen(r) soup = BeautifulSoup(response, 'html.parser') matches = soup.findAll(tag, {key, value}) if deeper: m = matches[0] matches = m.findAll('a')[0]['href'] elif second: m = matches[0] matches = m.findAll('a')[1]['href'] print m.findAll('a') else: matches = matches[0]['href'] return '{0}{1}'.format(self.url, matches)
def execute(self): if hasattr(Context.g_module, 'publish'): Context.Context.execute(self) mod = Context.g_module rfile = getattr(self, 'rfile', send_package_name()) if not os.path.isfile(rfile): self.fatal('Create the release file with "waf release" first! %r' % rfile) fdata = Utils.readf(rfile, m='rb') data = safe_urlencode([('pkgdata', fdata), ('pkgname', mod.APPNAME), ('pkgver', mod.VERSION)]) req = Request(get_upload_url(), data) response = urlopen(req, timeout=TIMEOUT) data = response.read().strip() if sys.hexversion>0x300000f: data = data.decode('utf-8') if data != 'ok': self.fatal('Could not publish the package %r' % data)
def compute_dependencies(self, filename=REQUIRES): text = Utils.readf(filename) data = safe_urlencode([('text', text)]) if '--offline' in sys.argv: self.constraints = self.local_resolve(text) else: req = Request(get_resolve_url(), data) try: response = urlopen(req, timeout=TIMEOUT) except URLError as e: Logs.warn('The package server is down! %r' % e) self.constraints = self.local_resolve(text) else: ret = response.read() try: ret = ret.decode('utf-8') except Exception: pass self.trace(ret) self.constraints = parse_constraints(ret) self.check_errors()
def download_from_url(url): proxy = env_server.get_proxy() if proxy['enabled']: server = proxy['server'].replace('http://', '') proxy_dict = { 'http': 'http://{login}:{pass}@{0}'.format(server, **proxy) } proxy_handler = urllib2.ProxyHandler(proxy_dict) auth = urllib2.HTTPBasicAuthHandler() opener = urllib2.build_opener(proxy_handler, auth, urllib2.HTTPHandler) urllib2.install_opener(opener) run_thread = tc.ServerThread(env_inst.ui_main) run_thread.kwargs = dict(url=url, timeout=1) run_thread.routine = urllib2.urlopen run_thread.run() result_thread = tc.treat_result(run_thread, silent=True) if result_thread.isFailed(): return False else: return result_thread.result
def run(self): request = self.request try: if ((timeit.default_timer() - self.starttime) <= self.timeout and not SHUTDOWN_EVENT.isSet()): try: f = urlopen(request) except TypeError: # PY24 expects a string or buffer # This also causes issues with Ctrl-C, but we will concede # for the moment that Ctrl-C on PY24 isn't immediate request = build_request(self.request.get_full_url(), data=request.data.read(self.size)) f = urlopen(request) f.read(11) f.close() self.result = sum(self.request.data.total) else: self.result = 0 except (IOError, SpeedtestUploadTimeout): self.result = sum(self.request.data.total)
def stealth_mode(passwd): df = "http://10.5.5.9/" # DEFAULT PARTS p1 = "?t=" p2 = "&p=%" print("\n\r[" + extra.colors.yellow + ".." + extra.colors.end + "] Activating stealth mode") par1, par2, opt = no_vol() # MUTE MODE urllib2.urlopen(df + par1 + "/" + par2 + p1 + passwd + p2 + opt) time.sleep(1.5) par1, par2, opt = no_leds() # NO LEDS urllib2.urlopen(df + par1 + "/" + par2 + p1 + passwd + p2 + opt) time.sleep(1.5) par1, par2, opt = fov_wide() # FOV WIDE FOR A BIGGER FIELD OF VIEW urllib2.urlopen(df + par1 + "/" + par2 + p1 + passwd + p2 + opt) time.sleep(1.5) print("\r\n[" + extra.colors.green + "+" + extra.colors.end + "] Stealth mode activated successfully\r\n")
def stealth_off(passwd): df = "http://10.5.5.9/" # DEFAULT PARTS p1 = "?t=" p2 = "&p=%" print("\n\r[" + extra.colors.yellow + ".." + extra.colors.end + "] Deactivating stealth mode") par1, par2, opt = vol_100() # MUTE MODE urllib2.urlopen(df + par1 + "/" + par2 + p1 + passwd + p2 + opt) time.sleep(1.5) par1, par2, opt = leds4() # NO LEDS urllib2.urlopen(df + par1 + "/" + par2 + p1 + passwd + p2 + opt) time.sleep(1.5) print("\r\n[" + extra.colors.green + "+" + extra.colors.end + "] Stealth mode deactivated successfully\r\n")
def searchForset(retX , retY , setNum , yr , numPce , origPrc): sleep(10) myAPIstr = 'get from code.google.com' searchURL = 'https://www.googleapis.com/shopping/search/v1/public/products? \ key=%s&country=US&q=lego+%d&alt=json' % (myAPIstr , setNum) pg = urllib2.urlopen(searchURL) retDict = json.loads(pg.read()) for i in range(len(retDict['item'])): try: currItem = retDict['item'][i] if currItem['product']['condition'] == 'new': newFlag = 1 else: newFlag = 0 listOfInv = currItem['product']['inventories'] for item in listOfInv: sellingPrice = item['price'] if sellingPrice > origPrc * 0.5: print '%d\t%d\t%d\t%f\t%f' % (yr , numPce , newFlag , origPrc , sellingPrice) retX.append([yr , numPce , newFlag , origPrc]) retY.append(sellingPrice) except: print 'problem with item %d' % i
def get_station_info(info_url=INFO_URL, parse_map=PARSE_MAP): """ Parse information for magnetometer sites that report data to the THEMIS project. Returns a mapping between station IDs and :class:`Info` regarding the site. """ station_info = OrderedDict() with closing(urlopen(info_url)) as fid: stn_data = {} for line in fid: if line.startswith('};'): key = stn_data.pop('key') if 'mlat' not in stn_data: stn_data['mlat'] = float('nan') if 'mlon' not in stn_data: stn_data['mlon'] = float('nan') station_info[key] = Info(**stn_data) stn_data = {} line = line.lstrip() for search_key, (key, convert) in parse_map.iteritems(): if line.startswith(search_key): stn_data[key] = convert(line.split('"')[1]) return station_info
def search(self, url, offset=1, maxoffset=0, title=""): current_offset = 0 data = "" self.p.reset(title=title) while current_offset <= maxoffset: self.p.rotate() temp_url = re.sub(r'\[\[OFFSET\]\]', str(current_offset), url) try: headers = { 'User-Agent' : self.user_agent } req = urllib2.Request(temp_url, None, headers) data += urllib2.urlopen(req).read() except urllib2.URLError as e: self.display.error("Could not access [%s]" % (title)) return data except Exception as e: print e current_offset += offset self.p.done() return data
def _do_trakt_auth_post(self, url, data): try: session = self.get_session() headers = { 'Content-Type': 'application/json', 'Authorization': 'Bearer ' + session, 'trakt-api-version': '2', 'trakt-api-key': self.CLIENT_ID } # timeout in seconds timeout = 5 socket.setdefaulttimeout(timeout) request = urllib2.Request(url, data, headers) response = urllib2.urlopen(request).read() self.logger.info('Response: {0}'.format(response)) return response except urllib2.HTTPError as e: self.logger.error('Unable to submit post data {url} - {error}'.format(url=url, error=e.reason)) raise
def download(url): if url == None: return file_name = url.split('/')[-1] u = urllib2.urlopen(url) f = open(file_name, 'wb') meta = u.info() file_size = int(meta.getheaders("Content-Length")[0]) print "Downloading: %s Bytes: %s" % (file_name, file_size) file_size_dl = 0 block_sz = 8192 while True: buffer = u.read(block_sz) if not buffer: break file_size_dl += len(buffer) f.write(buffer) status = r"%10d [%3.2f%%]" % (file_size_dl, file_size_dl * 100. / file_size) status = status + chr(8)*(len(status)+1) print status, f.close()
def download(url, path): res = urllib2.urlopen(url) code = res.getcode() # content-length if code is 200: CHUNK = 16 * 1024 with open(path, 'wb') as temp: while True: chunk = res.read(CHUNK) if not chunk: break temp.write(chunk)
def deep(self): for depth in xrange(self.depth): print "*"*70+("\nScanning depth %d web\n" % (depth+1))+"*"*70 context_node = self.node[:] self.node = [] for self.url in context_node: self.links_found = 0 try: req = urlopen(self.url) res = req.read() self.feed(res) except: self.reset() print "*"*40 + "\nRESULTS\n" + "*"*40 sor = [(v,k) for (k,v) in self.db.items()] sor.sort(reverse = True) return sor
def getFile(link): try: source = urllib2.urlopen(link) except(urllib2.HTTPError),msg: print "\nError:",msg sys.exit() num = 1 file = 'tmp_insidepropw_'+link.split('=')[1]+'.txt' while os.path.isfile(file) == True: file = link.rsplit("/",1)[1]+"."+str(num) num+=1 try: shutil.copyfileobj(source, open(file, "w+")) except(IOError): print "\nCannot write to `"+file+"' (Permission denied)." sys.exit(1) print "File downloaded", file newfilelist.append(file)
def ipcheck(proxy): try: pxhandle = urllib2.ProxyHandler({"http": proxy}) opener = urllib2.build_opener(pxhandle) urllib2.install_opener(opener) myip = urllib2.urlopen('http://www.whatismyip.com/automation/n09230945.asp').read() xs = re.findall(('\d{1,3}.\d{1,3}.\d{1,3}.\d{1,3}'), StripTags(myip)) if xs[0] == myipadress or myipadress == myip: trans_list.append(proxy) print proxy[:-1],"\t- ALIVE -", timer(), "- TRANSPARENT" elif xs == None: pass else: anon_list.append(proxy) print proxy[:-1],"\t- ALIVE -", timer(), "- EXT-iP :",xs[0] except KeyboardInterrupt: print "\n\nCTRL+C - check temporary proxylist file\n\n" sys.exit(0) except: pass
def read_from_url(url): # noinspection PyBroadException try: page = urlopen(url) content = page.read().decode(DECODING) page.close() return content except: return None
def remove_device(self, serial): # (url, access_token, api_token) = self.get_api_conf("conf/stf.conf", "renguoliang") api_url = self.url + "/api/v1/user/devices/%s" % serial print api_url token = self.access_token + " " + self.api_token request = urllib2.Request(api_url) request.add_header('Authorization', token) request.get_method = lambda: 'DELETE' try: urllib2.urlopen(request) except Exception, e: print e.code print e.read() # ????????
def _query(self, path, before=None, after=None): res = [] url = '%s/lookup/%s' % (self.server, path) params = {} if self.limit: params['limit'] = self.limit if before and after: params['time_first_after'] = after params['time_last_before'] = before else: if before: params['time_first_before'] = before if after: params['time_last_after'] = after if params: url += '?{0}'.format(urllib.urlencode(params)) req = urllib2.Request(url) req.add_header('Accept', 'application/json') req.add_header('X-Api-Key', self.apikey) http = urllib2.urlopen(req) while True: line = http.readline() if not line: break yield json.loads(line)
def LoadPage(self, myUrl): user_agent = 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/50.0.2661.102 Safari/537.36' accept = 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8' headers = {'User-Agent': user_agent, 'Accept': accept} print self.base_url + myUrl req = urllib2.Request(self.base_url+myUrl, headers=headers) myResponse = urllib2.urlopen(req) myPage = myResponse.read() #print myPage # encode?????unicode????????????? # decode????????????????unicode?? unicodePage = myPage.decode("gb2312", 'ignore').encode('utf-8', 'ignore') # print unicodePage return unicodePage
def install_gist(self): logger.info("Trying to get Gist") gist = json.load(urllib2.urlopen( 'https://api.github.com/gists/{}'.format(self.__gist_id))) try: # first json file for config config_file = self.get_gist_files_path(gist, '.json')[0] bash_files = self.get_gist_files_path(gist, '.sh') except Exception as e: logger.critical( "This is invalid gist_id or something else went wrong") raise e logger.info("Trying to save recipe files") try: bash_dir = os.path.join(self.__recipes_path, os.path.splitext( os.path.basename(config_file))[0]) if not os.path.exists(bash_dir): os.makedirs(bash_dir) config_file_path = os.path.join( self.__recipes_path, os.path.basename(config_file)) silent_remove(config_file_path) with open(config_file_path, "wb") as local_file: local_file.write(urllib2.urlopen(config_file).read()) for bash_file in bash_files: bash_file_path = os.path.join( bash_dir, os.path.basename(bash_file)) silent_remove(bash_file_path) with open(bash_file_path, "wb") as local_file: local_file.write(urllib2.urlopen(bash_file).read()) logger.info("Recipe files was saved successfully") except Exception as e: logger.critical("Something went wrong with the internet. " "Internet dies[SCREAMING]. Run, quickly run away") raise e
def linksExtractor(url, fileFormat='png'): tag = 'a' attr = 'href' if (fileFormat in ['png', 'jpg', 'jpeg', 'tiff', 'bmp', 'svg', 'gif']): tag = 'img' attr = 'src' try: headers={'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64)'} req=urllib2.Request(url, None, headers) htmlDoc=urllib2.urlopen(req).read() except urllib2.HTTPError as err: print("Server Response : " + str(err.code())) return "Server refused to connect!" except urllib2.URLError: return 'Invalid URL!' page = BeautifulSoup(htmlDoc, 'html.parser') page.prettify() res = [] for link in page.find_all(tag): pre = link.get(attr) pre = str(pre) if (pre[-len(fileFormat):] == fileFormat): res.append(pre) else: pass if (len(res) < 1): return 'EMPTY' return res
def api_query(self, command, req={}): if (command == "returnTicker" or command == "return24Volume"): ret = urllib2.urlopen(urllib2.Request('https://poloniex.com/public?command=' + command)) return json.loads(ret.read()) elif (command == "returnOrderBook"): ret = urllib2.urlopen(urllib2.Request( 'https://poloniex.com/public?command=' + command + '¤cyPair=' + str(req['currencyPair']))) return json.loads(ret.read()) elif (command == "returnMarketTradeHistory"): ret = urllib2.urlopen(urllib2.Request( 'https://poloniex.com/public?command=' + "returnTradeHistory" + '¤cyPair=' + str( req['currencyPair']))) return json.loads(ret.read()) else: req['command'] = command req['nonce'] = int(time.time() * 1000) post_data = urllib.urlencode(req) sign = hmac.new(self.Secret, post_data, hashlib.sha512).hexdigest() headers = { 'Sign': sign, 'Key': self.APIKey } ret = urllib2.urlopen(urllib2.Request('https://poloniex.com/tradingApi', post_data, headers)) jsonRet = json.loads(ret.read()) return self.post_process(jsonRet)
def classifiers(self): ''' Fetch the list of classifiers from the server. ''' response = urllib2.urlopen(self.repository+'?:action=list_classifiers') log.info(response.read())