def retrieve_json(self, url):
    '''
    Retrieve data from the Veneer service at the given url path.

    url: Path to required resource, relative to the root of the Veneer service.

    Returns the decoded JSON document.

    When self.protocol is 'file' the document is read from the local path
    self.prefix + url + self.data_ext; otherwise it is fetched with an HTTP
    GET against self.host / self.port.
    '''
    if PRINT_URLS:
        print("*** %s ***" % (url))

    if self.protocol == 'file':
        # Context manager so the file handle is released deterministically.
        with open(self.prefix + url + self.data_ext) as source:
            text = source.read()
    else:
        conn = hc.HTTPConnection(self.host, port=self.port)
        try:
            conn.request('GET', quote(url + self.data_ext))
            resp = conn.getresponse()
            text = resp.read().decode('utf-8')
        finally:
            # Always release the socket, even when the request fails.
            conn.close()

    # Pre-process the text before handing it to the JSON parser
    # (presumably rewrites infinity tokens -- see _replace_inf).
    text = self._replace_inf(text)

    # Parse once and reuse the result for both the debug print and the return.
    result = json.loads(text)
    if PRINT_ALL:
        print(result)
        print("")
    return result
def retrieve_csv(self, url):
    '''
    Retrieve data from the Veneer service, at the given url path, in CSV format.

    url: Path to required resource, relative to the root of the Veneer service.

    The request is sent with an "Accept: text/csv" header and the response
    body is parsed by utils.read_veneer_csv.

    NOTE: CSV responses are currently only available for time series results
    '''
    if PRINT_URLS:
        print("*** %s ***" % (url))

    target = self.base_url + quote(url + self.data_ext)
    req = Request(target, headers={"Accept": "text/csv"})
    body = urlopen(req).read()
    result = utils.read_veneer_csv(body.decode('utf-8'))

    if PRINT_ALL:
        print(result)
        print("")
    return result
def retrieve_json(self, url, **kwargs):
    """Fetch ``url`` (relative to ``self.base_url``), archive it and return the parsed JSON.

    The raw response text is passed to ``self.save_data`` under ``url`` with
    its first character removed, tagged as ``"json"``.

    url: Path of the resource; it is percent-quoted before being appended to
        ``self.base_url``.
    kwargs: Accepted for call compatibility; not used here.

    Returns the decoded JSON document, or ``None`` when the resource could not
    be fetched or decoded as UTF-8 (the failure is reported via ``self.log``).
    """
    if self.print_urls:
        print("*** %s ***" % (url))

    try:
        # Context manager closes the response as soon as the body is read.
        with urlopen(self.base_url + quote(url)) as response:
            text = response.read().decode('utf-8')
    except Exception:
        # Best-effort retrieval: report and carry on, but (unlike a bare
        # ``except:``) let KeyboardInterrupt / SystemExit propagate.
        self.log("Couldn't retrieve %s" % url)
        return None

    self.save_data(url[1:], bytes(text, 'utf-8'), "json")

    # Parse once; reuse for the optional debug print and the return value.
    parsed = json.loads(text)
    if self.print_all:
        print(parsed)
        print("")
    return parsed
def google_image(message, keywords):
    """
    Search Google Images for ``keywords`` and post one randomly chosen hit.

    If the result page yields no image URLs, a "nothing found" style message
    containing the keywords is posted instead.

    Based on https://github.com/llimllib/limbo/blob/master/limbo/plugins/image.py
    """
    searchurl = "https://www.google.com/search?tbm=isch&q={0}".format(quote(keywords))

    # An old iPhone user agent; per the original author it makes Google
    # return good results.
    useragent = "Mozilla/5.0 (iPhone; U; CPU iPhone OS 4_0 like Mac OS X; en-us) AppleWebKit/532.9 (KHTML, like Gecko) Versio n/4.0.5 Mobile/8A293 Safari/6531.22.7"

    page = requests.get(searchurl, headers={"User-agent": useragent}).text

    # Image URLs are embedded in the page as ``var u='...'`` assignments.
    images = [unescape(found) for found in re.findall(r"var u='(.*?)'", page)]

    if not images:
        botsend(message, "`{}` ???????????????".format(keywords))
        return
    botsend(message, choice(images))
def google_map(message, keywords):
    """
    Post a Google static-map image URL for ``keywords``, followed by an
    attachment linking to the same query on maps.google.com.

    Based on https://github.com/llimllib/limbo/blob/master/limbo/plugins/map.py
    """
    query = quote(keywords)

    # Author's notes: Slack seems to ignore the size param. The location is
    # passed as a ``markers`` value rather than a ``center`` parameter so that
    # Google picks a reasonable zoom level by itself.
    template = "https://maps.googleapis.com/maps/api/staticmap?size=800x400&markers={0}&maptype={1}"
    botsend(message, template.format(query, 'roadmap'))

    link_attachment = {
        'pretext': '<http://maps.google.com/maps?q={}|????????>'.format(query),
        'mrkdwn_in': ["pretext"],
    }
    botwebapi(message, [link_attachment])
def generate_search_url(song, viewsort=False):
    """
    Build the YouTube results-page URL for ``song``.

    song: Search text; it is percent-quoted before being placed in the URL.
    viewsort: When true, the plain ``?q=`` URL is produced; otherwise the URL
        additionally carries the ``sp=EgIQAQ%253D%253D`` parameter.
    """
    # quote() percent-encodes special characters in the search text.
    encoded = quote(song)

    if viewsort:
        template = u"https://www.youtube.com/results?q={0}"
    else:
        template = u"https://www.youtube.com/results?sp=EgIQAQ%253D%253D&q={0}"
    return template.format(encoded)
def retrieve_resource(self, url, ext):
    """Download ``url`` (relative to ``self.base_url``) and archive the raw bytes.

    The bytes are handed to ``self.save_data`` in binary mode, named after
    ``url`` with its first character removed and tagged with ``ext``.
    """
    if self.print_urls:
        print("*** %s ***" % (url))

    content = urlopen(self.base_url + quote(url)).read()
    self.save_data(url[1:], content, ext, mode="b")

# Process Run list and results
def youtube(keyword=None):
    """Open YouTube via ``web.open``.

    Args:
        keyword (optional): Search word. When omitted, a fixed video URL is
            opened; otherwise the search-results URL for the keyword is
            opened after being quoted with ``RESERVED`` as the safe set.
    """
    if keyword is not None:
        search_url = 'https://www.youtube.com/results?search_query={}'.format(keyword)
        web.open(quote(search_url, RESERVED))
        return

    web.open('https://www.youtube.com/watch?v=L_mBVT2jBFw')
def get_user(self, username):
    """Return the Base64 encoding of the URL-quoted ``username``.

    ``base64.b64encode`` is used instead of ``base64.encodestring``: the
    latter was removed in Python 3.9 and inserts a newline every 76 output
    characters, so long usernames ended up with embedded line breaks. For
    short inputs the result is identical to the previous
    ``encodestring(...)[:-1]``.

    On Python 3 a text string is returned; on Python 2 the native ``str``
    produced by ``b64encode`` is returned.
    """
    username_ = urllib2.quote(username)
    if six.PY3:
        # b64encode works on bytes; quote() output is ASCII so the round
        # trip through encode/decode is lossless.
        return base64.b64encode(username_.encode()).decode('utf-8')
    return base64.b64encode(username_)
def parse_videos(url):
    """Submit ``url`` to the yipeiwu.com video resolver and scrape the reply.

    The quoted URL is sent as the ``url`` form field. Returns a
    ``(name, result)`` tuple, where ``name`` is the first match of the
    module-level ``_re_name`` pattern and ``result`` is every match of ``_re``
    in the returned HTML, or ``False`` when no name is found.
    """
    form_data = b'url=' + urllib.quote(url).encode('ascii')
    response = urllib.urlopen('http://yipeiwu.com/getvideo.html', data=form_data)
    html = response.read().decode('utf-8')

    names = _re_name.findall(html)
    if not names:
        return False
    return names[0], _re.findall(html)
def play_genres(self, genre_list, player_id=None):
    """Adds then plays a random mix of albums of specified genres.

    The playlist is cleared and shuffle mode 1 is set, one ``addalbum``
    command is queued per non-empty genre (URL-quoted), and finally
    ``play 2`` is issued. Every command is prefixed with ``player_id``, or
    with ``self.cur_player_id`` when none is given, and the whole batch is
    sent through ``self._request``, whose result is returned.
    """
    commands = ["playlist clear", "playlist shuffle 1"]
    for genre in genre_list or []:
        if genre:
            commands.append("playlist addalbum %s * *" % urllib.quote(genre))
    commands.append("play 2")

    pid = player_id or self.cur_player_id
    batch = []
    for com in commands:
        batch.append("%s %s" % (pid, com))
    return self._request(batch)
def playlist_play(self, path, player_id=None):
    """Play song / playlist immediately.

    ``path`` is URL-quoted and sent as a ``playlist play`` command through
    ``self.player_request`` for the given ``player_id``.
    """
    command = "playlist play %s" % (urllib.quote(path))
    self.player_request(command, player_id=player_id)
def playlist_resume(self, name, resume=True, wipe=False, player_id=None):
    """Send a ``playlist resume`` command for the playlist called ``name``.

    name: Playlist name; URL-quoted before sending.
    resume: When true (default) ``noplay:0`` is sent, otherwise ``noplay:1``.
    wipe: Sent as ``wipePlaylist:1`` when true, ``wipePlaylist:0`` otherwise.
    player_id: Forwarded to ``self.player_request``.

    The request is issued with ``wait=False``.
    """
    quoted_name = urllib.quote(name)
    noplay_flag = int(not resume)
    wipe_flag = int(wipe)
    cmd = "playlist resume %s noplay:%d wipePlaylist:%d" % (
        quoted_name, noplay_flag, wipe_flag)
    self.player_request(cmd, wait=False, player_id=player_id)
def google(message, keywords):
    """
    Search Google for ``keywords`` and post the URL of the first result.

    Nothing is done when ``keywords`` is exactly ``'help'``. When the result
    page contains no ``<h3 class="r">`` entries, a "nothing found" style
    message containing the keywords is posted and the function returns.

    If the first entry carries no usable link (Google answered the query
    directly), the entry's text is returned instead of being posted.

    Based on https://github.com/llimllib/limbo/blob/master/limbo/plugins/google.py
    """
    if keywords == 'help':
        return

    query = quote(keywords)
    url = "https://encrypted.google.com/search?q={0}".format(query)
    soup = BeautifulSoup(requests.get(url).text, "html.parser")

    answer = soup.findAll("h3", attrs={"class": "r"})
    if not answer:
        botsend(message, "`{}` ???????????????".format(keywords))
        # Stop here: without this return the code below indexed the empty
        # result list and raised IndexError a second time inside the handler.
        return

    try:
        # The href looks like "...=<target>&..."; keep what sits between the
        # first '=' and the following '&'.
        _, url = answer[0].a['href'].split('=', 1)
        url, _ = url.split('&', 1)
    except (IndexError, KeyError, TypeError, ValueError):
        # In this case there is a first answer without a link, which is a
        # google response! Let's grab it and display it to the user.
        # A missing <a> tag surfaces as TypeError, a missing href as KeyError
        # and an unexpected href shape as ValueError.
        return ' '.join(answer[0].stripped_strings)
    else:
        botsend(message, unquote(url))
def xml_set_cdata(_node, _value, _lowercase=False):
    """Helper to set character data in an XML tree.

    Appends a new ``Text`` child holding ``_value`` to ``_node``. Nothing is
    appended when ``_value`` is ``None`` or the empty string.

    _node: Node that receives the new text child.
    _value: Data to store. String values are percent-quoted first; other
        values are stored unchanged.
    _lowercase: When true, the stored data is lower-cased (the value must
        then provide ``.lower()``).
    """
    if _value is None or _value == "":
        return

    # The original test was ``_value is str``, which compares the value with
    # the type object itself and is never true for an actual string, so the
    # quoting below was dead code. NOTE(review): on Python 2 ``unicode``
    # values are not covered by this check.
    if isinstance(_value, str):
        _value = quote(_value)

    sec = Text()
    # Force lowercase when requested.
    sec.data = _value.lower() if _lowercase else _value
    _node.appendChild(sec)
def iriToUri(iri):
    """Return ``iri`` with its query component percent-quoted.

    The IRI is split with ``urlparse``; only the query component (index 4 of
    the parsed tuple) is passed through ``quote``; every other component is
    reassembled unchanged with ``urlunparse``.

    NOTE(review): ``quote`` is applied to the query string as a whole, so
    ``=`` and ``&`` separators are escaped as well -- confirm this is wanted.
    """
    query_index = 4
    components = list(urlparse(iri))
    components[query_index] = quote(components[query_index])
    return urlunparse(components)
def _get_from_api(self, lang="en"):
    """Look up ``self.word`` in the Oxford Dictionaries API.

    The word is lower-cased, spaces are replaced by underscores and the
    result is URL-quoted to form the entry id.

    lang: Language segment of the request path (default ``"en"``).

    Returns the ``"results"`` member of the decoded JSON response.
    """
    # NOTE(review): the API credentials are hard-coded in source; consider
    # loading them from configuration instead.
    headers = {"app_id": "45aecf84", "app_key": "bb36fd6a1259e5baf8df6110a2f7fc8f"}

    word_id = urllib2.quote(self.word.lower().replace(" ", "_"))
    endpoint = "/".join(
        ["https://od-api.oxforddictionaries.com/api/v1", "entries", lang, word_id])

    api_request = urllib2.Request(endpoint, headers=headers)
    body = urllib2.urlopen(api_request).read()
    return json.loads(body)["results"]
def quote_base64_encode(text):
    """
    Percent-quote ``text``, Base64-encode the quoted form (as UTF-8 bytes),
    then percent-quote the Base64 output and return it.
    """
    escaped = quote(text)
    encoded = base64.b64encode(bytearray(escaped, 'utf-8'))
    # Second quoting pass escapes Base64 characters such as '=' and '+'.
    return quote(encoded)
def escape(s):
    """Percent-quote ``s`` with ``~`` as the only extra safe character.

    Unlike the default ``quote`` call, ``/`` is escaped too.
    """
    unescaped_chars = "~"
    return quote(s, safe=unescaped_chars)
def _parse_object_name(object_name): if isinstance(object_name, list): object_name = quote(('/'.join(object_name))) return object_name
def generate_link(user):
    """Return the Codewars profile URL for ``user``.

    The user name is percent-quoted so that spaces and other special
    characters are safe inside the URL path. The link includes the
    ``/users/`` path segment, matching the other ``generate_link``
    definition in this file; it was previously missing here.
    """
    return "http://www.codewars.com/users/{user}".format(user=request.quote(user))
def generate_link(user):
    """Return the Codewars profile URL for ``user``.

    The user name is percent-quoted before being placed in the path, e.g.
    ``generate_link("matt c")`` gives
    ``"http://www.codewars.com/users/matt%20c"``.
    """
    encoded_user = request.quote(user)
    profile_url = "http://www.codewars.com/users/{user}"
    return profile_url.format(user=encoded_user)
def __yahoo_request(query):
    """Run a YQL query against the Yahoo public endpoint.

    The query text is percent-quoted and sent with ``format=json`` and the
    datatables.org community-table environment. See
    http://goo.gl/8AROUD for more information on YQL.

    Returns the ``['query']['results']`` member of the decoded JSON reply.
    """
    endpoint = ''.join([
        'https://query.yahooapis.com/v1/public/yql?q=',
        quote(query),
        '&format=json&env=store://datatables.org/alltableswithkeys',
    ])
    raw = urlopen(endpoint).read()
    decoded = json.loads(raw.decode('utf-8'))
    return decoded['query']['results']
def request_quotes(tickers_list, selected_columns=None):
    """Request Yahoo Finance recent quotes.

    Returns quotes information from YQL. The columns to be requested are
    listed at selected_columns. Check `here <http://goo.gl/8AROUD>`_ for more
    information on YQL.

    Example::

        request_quotes(['AAPL'], ['Name', 'PreviousClose'])
        # -> [{'PreviousClose': '95.60', 'Name': 'Apple Inc.'}]

    :param tickers_list: List of tickers that will be returned.
    :type tickers_list: list of strings
    :param selected_columns: List of columns to be returned, defaults to
        ``['*']`` when omitted or ``None``.
    :type selected_columns: list of strings, optional
    :returns: Requested quotes, always as a list (a single quote is wrapped
        in a one-element list).
    :rtype: list
    :raises RequestError: when the service returns no results.

    Both arguments are checked with ``__validate_list``, which may raise
    for invalid input.
    """
    # A fresh list per call instead of a shared mutable default argument.
    if selected_columns is None:
        selected_columns = ['*']

    __validate_list(tickers_list)
    __validate_list(selected_columns)

    # NOTE(review): column and ticker values are interpolated into the YQL
    # text as-is; callers must not pass untrusted input here.
    query = 'select {cols} from yahoo.finance.quotes where symbol in ({vals})'
    query = query.format(
        cols=', '.join(selected_columns),
        vals=', '.join('"{0}"'.format(s) for s in tickers_list)
    )

    response = __yahoo_request(query)
    if not response:
        raise RequestError('Unable to process the request. Check if the ' +
                           'columns selected are valid.')

    # A single ticker comes back as a bare object; normalise to a list.
    quotes = response['quote']
    if not isinstance(quotes, list):
        return [quotes]
    return quotes
def getStationsFromName(self, userInput):
    """
    Ask the service for station suggestions matching the user's input.

    The input is percent-quoted and sent as the ``query`` parameter of the
    ``/stations`` resource through ``self.reqhandler.sendrequest``, whose
    result is returned.
    """
    encoded_input = urlqt(userInput)
    resource = "/stations?query=" + encoded_input
    return self.reqhandler.sendrequest(resource)
def main():
    """
    The entry point for the app. Called when music-scraper is typed in terminal.

    Initialises curses, collects the search text through the GUI, points
    MusicSpider at a Google search for that text, then runs the crawl while a
    GUIThread is active. Once the crawl call returns, a status line is drawn
    unless the GUI has already been stopped.

    Returns early, after printing a message, when the terminal is smaller
    than 80 columns or 5 lines.
    """
    # initscr() is called first so that curses.COLS / curses.LINES are set.
    curses.initscr()
    if curses.COLS < 80 or curses.LINES < 5:
        # Restore the terminal before printing the plain-text error.
        curses.endwin()
        print('Terminal\'s dimensions are too small')
        return

    # Crawler with logging disabled.
    process = CrawlerProcess({'LOG_ENABLED': False})

    def gui_input(screen):
        # Callback for curses.wrapper: stores the screen and display
        # attributes on the shared GUI class, then reads the user's input.
        GUI.screen = screen
        curses.start_color()
        GUI.screen.keypad(1)
        # Colour pair 1 (black on cyan) is used for highlighted text.
        curses.init_pair(1, curses.COLOR_BLACK, curses.COLOR_CYAN)
        GUI.high_light_text = curses.color_pair(1)
        GUI.normal_text = curses.A_NORMAL
        # Full-terminal window anchored at the top-left corner.
        GUI.box = curses.newwin(curses.LINES, curses.COLS, 0, 0)
        GUI.message = GUI.get_input()

    curses.wrapper(gui_input)

    # Percent-quote the entered text so it can be used as the search query.
    s = request.quote(GUI.message)

    MusicSpider.start_urls = [
        "http://www.google.com/search?q=" + s,
    ]
    process.crawl(MusicSpider)

    # The GUI thread is started before process.start().
    # NOTE(review): presumably process.start() blocks until crawling ends --
    # confirm against CrawlerProcess.
    thread = GUIThread(process, start_gui)
    thread.start()
    process.start()

    if not GUI.gui_stopped:
        if len(GUI.strings) == 0:
            # Nothing was collected: clear the window and say so.
            GUI.box.erase()
            GUI.box.addstr(1, 1, "No Results Found... Try with Some other keywords.", GUI.high_light_text)
            GUI.add_bottom_menus()
            GUI.screen.refresh()
            GUI.box.refresh()
        else:
            # Results exist: show the completion notice on the second-to-last line.
            GUI.box.addstr(curses.LINES - 2, 1, "Completed Scraping !!", GUI.high_light_text)
            GUI.add_bottom_menus()
            GUI.screen.refresh()
            GUI.box.refresh()
def send_to_es(self, path, method="GET", payload=None):
    """Send a SigV4-signed request to Amazon Elasticsearch Service.

    Responses with a 5xx status are retried with exponential backoff, up to
    ``self.cfg["es_max_retry"]`` attempts in total.

    Args:
        path (str): path to send to ES; a leading "/" is added when missing
            and the path is percent-quoted.
        method (str, optional): HTTP method, default: GET
        payload (dict, optional): additional payload used during POST or
            PUT; defaults to an empty dict.

    Returns:
        dict: JSON answer converted to a dict. ``None`` only when
        ``es_max_retry`` allows no attempt at all.

    Raises:
        ES_Exception: on any non-2xx, non-5xx response, or with the last 5xx
            response once all attempts are used up (previously this case
            silently returned ``None``).
    """
    # Fresh dict per call instead of a shared mutable default argument.
    if payload is None:
        payload = {}
    if not path.startswith("/"):
        path = "/" + path

    endpoint = self.cfg["es_endpoint"]
    # The signing region is taken from the second dot-separated label of
    # the endpoint host name.
    es_region = endpoint.split(".")[1]
    url = "https://%s%s?pretty&format=json" % (endpoint, quote(path))
    max_attempts = int(self.cfg["es_max_retry"])

    last_error = None
    for attempt in range(max_attempts):
        if attempt > 0:
            # Exponential backoff: 0.2s, 0.4s, 0.8s, ...
            time.sleep((2 ** attempt) * .1)

        # Build and sign a new request for every attempt.
        req = AWSRequest(
            method=method,
            url=url,
            data=payload,
            headers={'Host': endpoint})
        credential_resolver = create_credential_resolver(get_session())
        credentials = credential_resolver.load_credentials()
        SigV4Auth(credentials, 'es', es_region).add_auth(req)

        res = Session().send(req.prepare())
        if 200 <= res.status_code <= 299:
            return json.loads(res.content)

        error = ES_Exception(res.status_code, res.content)
        if not 500 <= res.status_code <= 599:
            # Client-side or unexpected status: retrying will not help.
            raise error
        # Server-side error: candidate for retry.
        last_error = error

    if last_error is not None:
        # Every attempt ended in a 5xx response; report it to the caller.
        raise last_error
    return None