我们从 Python 开源项目中提取了以下 49 个代码示例，用于说明如何使用 praw.Reddit()。
def login():
    """Build the Reddit client, retrying every five seconds until it succeeds.

    The process exits with status 1 when ``praw.errors.InvalidUserPass`` is
    raised. Any other exception is logged and the attempt is repeated.

    Returns:
        The ``praw.Reddit`` instance built from the module-level credentials.
    """
    # NOTE(review): ``praw.errors`` is the PRAW 3 exception module -- confirm
    # it exists in the PRAW version this snippet runs against.
    while True:
        try:
            reddit = praw.Reddit(
                user_agent=credsUserAgent,
                client_id=credsClientID,
                client_secret=credsClientSecret,
                username=credsUserName,
                password=credsPassword,
            )
            print_and_log("Logged in")
            return reddit
        except praw.errors.InvalidUserPass:
            print_and_log("Wrong username or password", error=True)
            exit(1)
        except Exception as err:
            # Log the failure and wait before the next attempt.
            print_and_log(str(err), error=True)
            time.sleep(5)
def read_login_info():
    """Build a Reddit client from the ``API_INFO`` section of ../data/config.ini.

    Returns:
        The ``praw.Reddit`` instance created from the configured credentials.
    """
    parser = ConfigParser.ConfigParser()
    parser.read('../data/config.ini')

    # (praw keyword, config option) pairs, read in this order.
    option_map = (
        ('client_id', 'id'),
        ('client_secret', 'secret'),
        ('password', 'password'),
        ('user_agent', 'user_agent'),
        ('username', 'username'),
    )
    credentials = {}
    for keyword, option in option_map:
        credentials[keyword] = parser.get('API_INFO', option)

    # If there is no API key info in the config file, warn the user.
    ensure_valid_config(credentials['client_id'])

    return praw.Reddit(**credentials)
def build_api():
    """Create the module-level ``redditapi`` client from the stored API keys.

    Returns:
        True when the client was built; False when a required key is missing
        or building the client raised an exception.
    """
    global redditapi

    keys = datatools.get_data()["discord"]["keys"]
    required = (
        "reddit_api_user_agent",
        "reddit_api_client_id",
        "reddit_api_client_secret",
    )
    if any(name not in keys for name in required):
        logger.warning("For gamedeals to work, please make sure \"reddit_api_user_agent\", " +
                       "\"reddit_api_client_id\", and \"reddit_api_client_secret\" are all added as API keys")
        # Stop here instead of letting the lookup below fail with a KeyError
        # that would be reported as a connection problem.
        return False

    logger.debug("Building Reddit API")
    try:
        redditapi = praw.Reddit(
            user_agent=keys["reddit_api_user_agent"],
            client_id=keys["reddit_api_client_id"],
            client_secret=keys["reddit_api_client_secret"])
    except Exception:
        # ``Exception`` (not a bare except) so KeyboardInterrupt/SystemExit
        # still propagate.
        logger.warning("Error connecting to Reddit API, Reddit won't be available")
        return False
    else:
        logger.debug("Build successfull")
        return True
def init():
    """Create the shared handles used by the bot and return them in a dict.

    Returns:
        dict with the keys ``'db'`` (authenticated Mongo database),
        ``'reddit'`` (``praw.Reddit`` instance), ``'oauth'``
        (``PrawOAuth2Mini`` helper) and ``'goose'`` (a ``Goose()`` instance).
    """
    mongo_client = MongoClient(secret.mongo_url, secret.mongo_port)
    database = mongo_client[secret.mongo_db]
    database.authenticate(secret.mongo_user, urllib.quote_plus(secret.mongo_pass))

    reddit = praw.Reddit(user_agent="Samachar Bot for /r/india by /u/sallurocks")
    oauth_scopes = {u'edit', u'submit', u'read', u'privatemessages', u'identity', u'history'}
    helper = PrawOAuth2Mini(
        reddit,
        app_key=secret.news_app_key,
        app_secret=secret.news_app_secret,
        access_token=secret.news_access_token,
        refresh_token=secret.news_refresh_token,
        scopes=oauth_scopes,
    )

    return {'db': database, 'reddit': reddit, 'oauth': helper, 'goose': Goose()}
def login(self, configuration):
    """Log in with the configured credentials and store the client on ``self``.

    Args:
        configuration: object providing ``get_credentials()`` (a mapping with
            ``client_id``, ``client_secret``, ``username`` and ``password``)
            and ``get_user_agent()``.

    Returns:
        True once ``self.reddit`` has been set.

    Raises:
        Exception: with the message ``"LoginException"`` when the logged-in
            user does not compare equal to the configured username.
    """
    creds = configuration.get_credentials()
    agent = configuration.get_user_agent()
    client = praw.Reddit(
        client_id=creds["client_id"],
        client_secret=creds["client_secret"],
        username=creds["username"],
        password=creds["password"],
        user_agent=agent,
    )

    if not (client.user.me() == creds["username"]):
        raise Exception("LoginException")

    self.reddit = client
    return True
def top_scorers(self, league_url, league_name):
    """Download ``league_url`` and print up to 15 top-scorer rows.

    Each row shows the rank, player, team and goal count taken from the
    ``player large-link``, ``team large-link`` and ``number goals`` table
    cells of the page. Printing stops at the first rank for which any of the
    three cells is missing.

    Args:
        league_url: URL of the page to scrape.
        league_name: league name shown in the heading line.
    """
    cprint("\nRetrieving Top Scorers for " + league_name + "...", 'yellow')
    cprint("Rank Player Team Goals", 'red', attrs=['bold', 'underline'])
    r = requests.get(league_url)
    soup = BeautifulSoup(r.content, "lxml")

    # Query each column once instead of re-scanning the document per row.
    players = soup.find_all("td", {"class": "player large-link"})
    teams = soup.find_all("td", {"class": "team large-link"})
    goals = soup.find_all("td", {"class": "number goals"})

    # zip() stops at the shortest column, which replaces the old
    # "catch IndexError and give up" loop exit.
    rows = zip(players[:15], teams[:15], goals[:15])
    for rank, (player, team, goal) in enumerate(rows, 1):
        topscorer = "{:<20}".format(player.text), space, "{:<15}".format(team.text), space, goal.text
        cprint(" " + str(rank) + ". " + str("".join(topscorer)), 'blue')
# Scrapes subreddit in Reddit to display URLs to live streams (when available) based on user input
def test():
    """Run the author pipeline on a small sample.

    Uses 5 top posts, 20 comments and 50 authors, then writes the result with
    ``write_authors_to_file``.
    """
    posts = get_top_posts(limit=5)
    comments = get_top_comments(posts, limit=20)
    ranked_authors = calculate_karma(extract_authors(comments), limit=50)
    write_authors_to_file(ranked_authors)  # Don't accidentally override author's file!!

# def try_until_works(function):
#     while True:
#         try:
#             finction()
#             break
#         except urllib2.HTTPError, e:
#             if e.code in [429, 500, 502, 503, 504]:
#                 print "Reddit is down (error %s), sleeping..." % e.code
#                 time.sleep(60)
#                 pass
#             else:
#                 raise
#         except Exception, e:
#             print "couldn't Reddit: %s" % str(e)
#             raise
def configure_tor(config):
    """
    Pick the subreddit the bot works on, depending on debug mode.

    A dedicated Subreddit object is not strictly required; it only makes
    later lines shorter to type.

    :param config: the global config object; ``config.r`` is the active
        Reddit object and ``config.debug_mode`` selects the subreddit.
    :return: the Subreddit object for 'ModsOfToR' in debug mode, otherwise
        for 'transcribersofreddit' (normal operation, the primary subreddit).
    """
    subreddit_name = 'ModsOfToR' if config.debug_mode else 'transcribersofreddit'
    return config.r.subreddit(subreddit_name)
def get_subreddit_content(subreddit_name, error_msg):
    """Return the URL of a random submission from ``subreddit_name``.

    Args:
        subreddit_name: name of the subreddit to pick a submission from.
        error_msg: value returned when PRAW raises a non-client exception.

    Returns:
        The submission URL on success, ``None`` when PRAW raises
        ``ClientException``, or ``error_msg`` for any other PRAW exception.
    """
    try:
        reddit_inst = praw.Reddit(client_id=client_id,
                                  client_secret=client_secret,
                                  password=password,
                                  user_agent='contentscript by /u/pimpdaddyballer',
                                  username=username)
        target_subreddit = reddit_inst.subreddit(subreddit_name)
        return target_subreddit.random().url
    except praw.exceptions.ClientException as e:
        print(e)
        return None
    except praw.exceptions.PRAWException as e:
        # ``praw.exceptions`` itself is a module; naming it in an except
        # clause raises TypeError instead of catching anything, so catch the
        # package's base exception class.
        print(e)
        return error_msg
def __init__(self, subreddit=None, multireddit=None, days=2):
    """Set up empty result lists, the date window and the subreddit handle.

    Args:
        subreddit: name of a single subreddit (takes precedence when truthy).
        multireddit: arguments unpacked into ``Reddit.multireddit``.
        days: size of the date window in days.

    Raises:
        ValueError: when neither ``subreddit`` nor ``multireddit`` is given.
    """
    self.submissions = []
    self.comments = []

    # Only consider posts at least 2 hours old and less than <days + 1> days old.
    current_ts = calendar.timegm(time.gmtime())
    self.max_date_thread = current_ts - HOUR_IN_SECONDS * 2
    self.min_date_thread = current_ts - DAY_IN_SECONDS * (days + 1)
    self.min_date = current_ts - DAY_IN_SECONDS * days

    self.reddit = Reddit(check_for_updates=False, user_agent=AGENT)

    if not subreddit and not multireddit:
        raise ValueError('Specify subreddit or multireddit')
    self.subreddit = (
        self.reddit.subreddit(subreddit)
        if subreddit
        else self.reddit.multireddit(*multireddit)
    )
def parse_args():
    """Parse the command-line options, defaulting each one to its ``config`` entry.

    :returns: the namespace produced by ``ArgumentParser.parse_args()``
    """
    parser = argparse.ArgumentParser(description="Daily Reddit Wallpaper")

    # (flags, add_argument keyword arguments), registered in this order.
    option_specs = (
        (("-s", "--subreddit"),
         dict(type=str, default=config["subreddit"],
              help="Example: art, getmotivated, wallpapers, ...")),
        (("-t", "--time"),
         dict(type=str, default=config["time"],
              help="Example: new, hour, day, week, month, year")),
        (("-n", "--nsfw"),
         dict(action='store_true', default=config["nsfw"],
              help="Enables NSFW tagged posts.")),
        (("-d", "--display"),
         dict(type=int, default=config["display"],
              help="Desktop display number on OS X (0: all displays, 1: main display, etc")),
        (("-o", "--output"),
         dict(type=str, default=config["output"],
              help="Set the outputfolder in the home directory to save the Wallpapers to.")),
    )
    for flags, options in option_specs:
        parser.add_argument(*flags, **options)

    return parser.parse_args()
def main():
    """Fetch the 10 newest posts and send a message for each one kept.

    The posts are passed through the cached storage's ``get_diff`` (presumably
    to drop posts already seen -- verify against the cache module); a message
    is only sent when ``create_post_message`` returns something truthy.
    """
    print("Loading cache")
    seen_posts = cache.load_cached_storage(cache_file, default_size=30)

    print("Connecting to reddit")
    client = praw.Reddit(user_agent=user_agent)
    sub = client.get_subreddit(subreddit, fetch=False)

    print("Getting new")
    unseen = seen_posts.get_diff(sub.get_new(limit=10))

    print("New posts:")
    print(unseen)
    for post in unseen:
        msg = create_post_message(post)
        if not msg:
            continue
        print()
        print(msg)
        send_message(msg)
def play_dice(wager_amount):
    """Resolve an all-or-nothing die roll wager.

    A roll above 2 wins ``wager_amount * pay_table[roll]``; anything else
    loses and pays 0.

    Returns:
        dict with ``die_roll_result``, ``outcome`` and ``winnings``.
    """
    die_result = roll_die()
    if die_result > 2:
        verdict, payout = SG_Repository.WagerOutcome.WIN, wager_amount * pay_table[die_result]
    else:
        verdict, payout = SG_Repository.WagerOutcome.LOSE, 0

    wager_result = {
        'die_roll_result': die_result,
        'outcome': verdict,
        'winnings': payout,
    }
    print("All-or-nothing die roll wager result:")
    pprint.pprint(wager_result)
    return wager_result
# create our Reddit instance
def play_6_9(wager_amount):
    """Resolve a two-dice wager that wins on a total of 6, 9 or 3.

    A win pays ``wager_amount * payout_factor``; a loss pays 0.

    Returns:
        dict with ``roll_1``, ``roll_2``, ``outcome``, ``winnings`` and
        ``roll_total``.
    """
    # NOTE(review): the name says 6/9 but a total of 3 also wins -- confirm
    # this is intended.
    roll_1 = roll_die()
    roll_2 = roll_die()
    total = roll_1 + roll_2

    if total in (6, 9, 3):
        verdict, payout = SG_Repository.WagerOutcome.WIN, wager_amount * payout_factor
    else:
        verdict, payout = SG_Repository.WagerOutcome.LOSE, 0

    wager_result = {
        'roll_1': roll_1,
        'roll_2': roll_2,
        'outcome': verdict,
        'winnings': payout,
        'roll_total': total,
    }
    print("6-9 wager result:")
    pprint.pprint(wager_result)
    return wager_result
# create our Reddit instance
def play_coin_toss(wager_amount):
    """Resolve a coin toss wager: "HEADS" pays double, anything else pays 0.

    Returns:
        dict with ``toss_result``, ``outcome`` and ``winnings``.
    """
    coin_result = flip_coin()
    if coin_result == "HEADS":
        verdict, payout = SG_Repository.WagerOutcome.WIN, wager_amount * 2
    else:
        verdict, payout = SG_Repository.WagerOutcome.LOSE, 0

    wager_result = {
        'toss_result': coin_result,
        'outcome': verdict,
        'winnings': payout,
    }
    print("Coin toss wager result:")
    pprint.pprint(wager_result)
    return wager_result
# create our Reddit instance
def play_3_6_9(wager_amount):
    """Resolve a two-dice wager that wins on a total of 6, 9 or 3.

    A win pays ``wager_amount * payout_factor``; a loss pays 0. Nothing is
    printed.

    Returns:
        dict with ``roll_1``, ``roll_2``, ``outcome``, ``winnings`` and
        ``roll_total``.
    """
    roll_1 = roll_die()
    roll_2 = roll_die()
    total = roll_1 + roll_2

    if total in (6, 9, 3):
        verdict, payout = SG_Repository.WagerOutcome.WIN, wager_amount * payout_factor
    else:
        verdict, payout = SG_Repository.WagerOutcome.LOSE, 0

    wager_result = {
        'roll_1': roll_1,
        'roll_2': roll_2,
        'outcome': verdict,
        'winnings': payout,
        'roll_total': total,
    }
    # print("6-9 wager result:")
    # pprint.pprint(wager_result)
    return wager_result
# create our Reddit instance
def init(logger, responseWhitelist, user):
    """Create the module-level Reddit client for the praw.ini site ``user``.

    Also stores ``logger`` and ``responseWhitelist`` in the module globals
    ``log`` and ``whitelist`` and records the account name in
    ``globals.ACCOUNT_NAME``.

    Args:
        logger: logger stored as the module-level ``log``.
        responseWhitelist: value stored as the module-level ``whitelist``.
        user: praw.ini section name passed to ``praw.Reddit``.

    Returns:
        True on success, False when ``user`` has no section in praw.ini.
    """
    global reddit
    global log
    global whitelist

    # Bind the logger first: the error path below uses ``log`` and would
    # otherwise run before it has been assigned by this function.
    log = logger

    try:
        reddit = praw.Reddit(
            user,
            user_agent=globals.USER_AGENT)
    except configparser.NoSectionError:
        log.error("User "+user+" not in praw.ini, aborting")
        return False

    globals.ACCOUNT_NAME = str(reddit.user.me())
    whitelist = responseWhitelist
    log.info("Logged into reddit as /u/" + globals.ACCOUNT_NAME)
    return True
def _authenticate(self):
    """Post the login form, keep the response cookies and build the PRAW client.

    The PRAW client (``self._reddit``) is only created when
    ``self._client_id`` is truthy.
    """
    login_response = requests.post(
        'https://www.reddit.com/api/login',
        {'user': self._username, 'passwd': self._password},
        headers=_FAKE_HEADERS,
    )
    self._cookies = login_response.cookies

    if not self._client_id:
        return

    self._reddit = praw.Reddit(
        user_agent=USER_AGENT,
        client_id=self._client_id,
        client_secret=self._client_secret,
        username=self._username,
        password=self._password,
    )
def authenticate(request, username=None, code=None):
    """Return the ``RedditUser`` for ``username``, creating one from ``code``.

    Args:
        request: unused here.
        username: username looked up in ``RedditUser``.
        code: authorization code; when truthy and no user exists, it is
            exchanged through ``reddit.auth.authorize`` and a new user saved.

    Returns:
        The existing or newly created ``RedditUser``, or ``None`` when no
        user exists and no ``code`` was supplied.
    """
    try:
        return RedditUser.objects.get(username=username)
    except RedditUser.DoesNotExist:
        if not code:
            return None

        print('Creating new RedditUser')
        with open('secret.json', 'r') as f:
            app_secret = json.load(f)

        client = praw.Reddit(
            client_id=app_secret['client_id'],
            client_secret=app_secret['client_secret'],
            redirect_uri='http://localhost:8000/callback',
            user_agent='Plan-Reddit by /u/SkullTech101',
        )
        token = client.auth.authorize(code)

        new_user = RedditUser(username=str(client.user.me()), token=token)
        new_user.save()
        return new_user
def auth_reddit():
    """Create the module-level ``reddit`` client and refresh its OAuth access.

    Uses the module-level ``reddit_app_*`` settings for the user agent, app
    info and refresh token.
    """
    global reddit

    print('Attempting to authenticate with reddit...')
    reddit = praw.Reddit(reddit_app_ua)
    reddit.set_oauth_app_info(
        reddit_app_id,
        reddit_app_secret,
        reddit_app_uri,
    )
    reddit.refresh_access_information(reddit_app_refresh)
    print('Success')
# connects to imgur with pyimgur
def __init__(self, *, subreddits, iniSite='bot', newLimit=25, sleep=30, connectAttempts=1,
             scopes=('submit', 'privatemessages', 'read', 'identity'), dbName='praww.db'):
    """Create an instance of Reddit. Does not yet connect.

    :param subreddits: list of subreddits to read
    :param iniSite: see PRAW config docs using praw.ini (default: bot)
    :param newLimit: number of entries to read (default: 25)
    :param sleep: read reddit every n seconds (default: 30)
    :param connectAttempts: attempt initial connection n times and sleep
        2^n sec between attempts (default: 1)
    :param scopes: required scopes
    :param dbName: name of file of seen-things db
    """
    # Register the SIGTERM handler before anything else is set up.
    self.killed = False
    signal.signal(signal.SIGTERM, self.__catchKill)

    # Constructor arguments.
    self.__subreddits = '+'.join(subreddits)
    self.iniSite, self.newLimit = iniSite, newLimit
    self.sleep, self.connectAttempts = sleep, connectAttempts
    self.scopes, self.dbName = scopes, dbName

    # Loop bookkeeping.
    self.rateSleep = 0
    self.roundStart = 0
    # restart after 15 min of consecutive fails
    self.__failLimit = 15*60 // max(sleep, 1)

    # Listeners start empty; use the with() setters to install them.
    self.__commentListener = None
    self.__submissionListener = None
    self.__mentionListener = None
    self.__pmListener = None
def __connect(self):
    """Create ``self.r`` from praw.ini and verify it holds the required scopes.

    On success ``self.me`` is set to the logged-in user. A
    ``prawcore.exceptions.RequestException`` is retried up to
    ``self.connectAttempts`` times, sleeping ``2 ** attempt`` seconds (in
    one-second steps, so a kill request is noticed) between attempts.

    Raises:
        Exception: ``('reddit init missing scope', scope)`` when a required
            scope is not granted; ``'failed to connect'`` when the last
            attempt fails; ``'killed'`` when ``self.killed`` is set while
            waiting.
    """
    connectTry = 1
    while True:
        try:
            log.debug("connect() creating reddit adapter")
            self.r = praw.Reddit(self.iniSite)

            # connect and check if instance has required scopes; look the
            # granted scopes up once instead of on every loop iteration
            granted = self.r.auth.scopes()
            for scope in self.scopes:
                if scope not in granted:
                    raise Exception('reddit init missing scope', scope)

            self.me = self.r.user.me()
            log.debug('connect() logged into reddit as: %s', self.me)
            return
        except prawcore.exceptions.RequestException as e:
            log.exception('connect() failed to send request')
            if connectTry >= self.connectAttempts:
                log.error('connect() connection attempt %s failed', connectTry)
                # Keep the original error attached as the cause.
                raise Exception('failed to connect') from e

            # NOTE(review): assumes ``log`` is a standard-library logger,
            # where ``warn`` is a deprecated alias of ``warning``.
            log.warning('connect() connection attempt %s failed', connectTry)

            # sleep up to 2^try sec before failing (3 trys = 6s)
            for _ in range(2 ** connectTry):
                if self.killed:
                    raise Exception('killed')
                time.sleep(1)
            connectTry += 1
def __init__(self, cache_size=500, maxcache_day=1, popular_type="Daily", timeout=20):
    """
    Set up the RedditMemes source and build its cache when needed.

    Args:
        cache_size (int): Number of memes stored as cache
        maxcache_day (int): Number of days until the cache expires
        popular_type: not used in this method
        timeout: not used in this method
    """
    super(RedditMemes, self).__init__(
        "https://www.reddit.com/r/memes/", cache_size, maxcache_day)
    self._origin = Origins.REDDITMEMES

    # Client ID and user agent requested by Reddit API; they live in the
    # config.ini next to this module.
    module_dir = os.path.dirname(os.path.realpath(__file__))
    settings = configparser.ConfigParser()
    settings.read(os.path.join(module_dir, 'config.ini'))
    reddit_section = settings['Reddit']

    self._client_id = reddit_section['ClientID']
    secret = reddit_section['ClientSecret']
    # An empty secret in the config is passed to PRAW as None.
    self._client_secret = None if secret == '' else secret
    self._user_agent = reddit_section['UserAgent'].format(sys.platform)

    # Generate a Reddit instance
    self._reddit = praw.Reddit(
        client_id=self._client_id,
        client_secret=self._client_secret,
        user_agent=self._user_agent,
    )

    if self._no_cache() or self._cache_expired():
        self._build_cache()
def get_memes(self, num):
    """
    Return ``num`` memes from the 'memes' subreddit.

    The cache is rebuilt when missing or expired, otherwise refreshed. When
    the cache holds at least ``num`` memes they are popped from it; otherwise
    the ``num`` hottest submissions are fetched directly.
    """
    if self._no_cache() or self._cache_expired():
        self._build_cache()
    else:
        self._update_with_cache()

    if self._cache_size >= num:
        return self._pop_memes(num)

    # Haven't found a way to get memes
    # in a specific range using PRAW
    hot_submissions = self._reddit.subreddit('memes').hot(limit=num)
    # Build one Meme per submission from its url and title.
    return [
        Meme(post.url, datetime.datetime.now(),
             title=post.title, origin=Origins.REDDITMEMES)
        for post in hot_submissions
    ]
def init_reddit(self):
    """Create the PRAW client and the two subreddit handles used by the bot.

    Credentials and the main subreddit name come from ``self.reddit``.
    """
    settings = self.reddit

    # Reddit client built from the stored credentials.
    self.r = praw.Reddit(
        user_agent=settings.user_agent,
        client_id=settings.client_id,
        client_secret=settings.secret,
        username=settings.user_name,
        password=settings.password,
    )

    # Handle for the configured subreddit (the original comment calls it r/all).
    self.r_all = self.r.subreddit(self.reddit.subreddit)

    # Handle for r/R_I_P.
    self.r_R_I_P = self.r.subreddit('R_I_P')
def __init__(self, sparcli):
    """Store the bot and create a Reddit client from the 'Reddit' tokens.

    Args:
        sparcli: the bot instance this object is attached to.
    """
    self.sparcli = sparcli

    # If the tokens are not set up properly, this command will fail right here
    reddit_tokens = getTokens()['Reddit']
    self.reddit = praw.Reddit(
        client_id=reddit_tokens['ID'],
        client_secret=reddit_tokens['Secret'],
        user_agent=reddit_tokens['User Agent'],
    )

    # Short-hands for looking up a redditor or a subreddit by name.
    self.getRedditor = lambda name: self.reddit.redditor(name)
    self.getSubreddit = lambda name: self.reddit.subreddit(name)
    self.redditIcon = 'http://rawapk.com/wp-content/uploads/2016/04/Reddit-The-Official-App-Icon.png'
async def on_command(self, command, ctx):
    """Send a typing indicator when a command from the 'Reddit' cog is invoked.

    Declared ``async`` because the body awaits ``send_typing``; ``await`` is a
    syntax error inside a plain ``def``.

    Args:
        command: the invoked command; only its ``cog_name`` is inspected.
        ctx: invocation context; the typing indicator goes to
            ``ctx.message.channel``.
    """
    if command.cog_name == 'Reddit':
        await self.sparcli.send_typing(ctx.message.channel)
def setup(bot):
    """Create the ``Reddit`` cog for ``bot`` and register it on the bot."""
    cog = Reddit(bot)
    bot.add_cog(cog)
def authenticate():
    """
    Create the Reddit client from the 'TsubasaRedditBot' praw.ini site.

    :return: Instance of Reddit
    """
    print('Authenticating')
    client = praw.Reddit('TsubasaRedditBot')
    # Looking up the current user shows which account the client uses.
    current_user = client.user.me()
    print("Authenticated as {}".format(current_user))
    return client
def login():
    """Return a ``praw.Reddit`` client built from the ``config`` credentials."""
    credentials = dict(
        username=config.reddit_username,
        password=config.reddit_password,
        client_id=config.reddit_client_id,
        client_secret=config.reddit_client_secret,
        user_agent='agent',
    )
    return praw.Reddit(**credentials)
def _connect_reddit():
    """Return a ``praw.Reddit`` client built from the module-level ``_config``.

    Returns:
        The client, or ``None`` (after reporting through ``error``) when no
        config has been loaded.
    """
    if _config is not None:
        return praw.Reddit(
            client_id=_config.r_oauth_key,
            client_secret=_config.r_oauth_secret,
            username=_config.r_username,
            password=_config.r_password,
            user_agent=_config.useragent,
            check_for_updates=False,
        )

    error("Can't connect to reddit without a config")
    return None
def __init__(self):
    """Create the Reddit client from the 'CryoChecker' site and open its subreddit."""
    self.r = praw.Reddit('CryoChecker', user_agent="CryoChecker 1.0")
    self.subreddit = self.r.subreddit("thecryopodtohell")
def authenticate():
    """Create the Reddit client from the module-level credentials and log the account.

    Returns:
        The ``praw.Reddit`` instance.
    """
    log("Authenticating...")
    client = praw.Reddit(
        username=USERNAME,
        password=PASSWORD,
        client_id=CLIENT_ID,
        client_secret=CLIENT_SECRET,
        user_agent="StandardsBot v1.2 for reddit. /u/Nokeo08",
    )
    current_user = client.user.me()
    log("Authenticated as {}".format(current_user))
    return client
def loginReddit():
    """Create the module-level Reddit client ``r`` from the ``obot`` credentials."""
    global r

    credentials = dict(
        client_id=obot.client_id,
        client_secret=obot.client_secret,
        user_agent=obot.user_agent,
        username=obot.username,
        password=obot.password,
    )
    r = praw.Reddit(**credentials)
def parse_reddit_comments():
    """Yield ``(body, year)`` for the owner's comments accepted by ``is_valid_message``.

    ``body`` is the UTF-8 encoded, stripped comment text; ``year`` is taken
    from ``datetime.fromtimestamp(comment.created)``.
    """
    import sys
    import praw
    import tools.settings
    from datetime import datetime

    # NOTE(review): ``import tools.settings`` binds ``tools``, not
    # ``settings`` -- presumably ``settings`` is a module-level name; confirm.
    owner = settings.getOwner()
    client = praw.Reddit(settings.getRedditToken())
    redditor = client.get_redditor(owner)

    for comment in redditor.get_comments(limit=None):
        text = comment.body.encode('utf-8').strip()
        if not is_valid_message(text):
            continue
        yield text, datetime.fromtimestamp(comment.created).year
def reportKarma(username):
    """Return a one-line karma summary for ``username``.

    Args:
        username: Reddit user name to look up.

    Returns:
        A message linking to the user's profile with comment and link karma,
        or a fallback message when the lookup raises.
    """
    import praw

    r = praw.Reddit(settings.getRedditToken())
    try:
        user = r.get_redditor(username)
        message = "<https://www.reddit.com/user/{}|{}> has {} comment karma and {} link karma.".format(
            username, username, user.comment_karma, user.link_karma)
    except Exception:
        # ``Exception`` (not a bare except) so KeyboardInterrupt/SystemExit
        # still propagate. The message previously read "does not exit".
        return "User _{}_ does not exist (or Reddit is down).".format(username)
    return message
def authenticate():
    """Create the Reddit client from the 'animal-facts-bot' site and print the account.

    Returns:
        The ``praw.Reddit`` instance.
    """
    print('Authenticating...\n')
    client = praw.Reddit('animal-facts-bot', user_agent='/u/AnimalFactsBot')
    current_user = client.user.me()
    print('Authenticated as {}\n'.format(current_user))
    return client
def __init__(self, bot):
    """Create the PRAW client from the bot config and schedule the feed task.

    Args:
        bot: the bot instance; its ``cfg['credentials']['reddit']`` mapping
            is unpacked into ``praw.Reddit`` and its event loop runs
            ``post_to_feeds``.
    """
    super().__init__(bot)
    logger.info('Reddit cog created!')

    reddit_credentials = bot.cfg['credentials']['reddit']
    self.praw = praw.Reddit(**reddit_credentials)

    self.update_interval, self.fuzz_interval = UPDATE_INTERVAL, FUZZ_INTERVAL

    # Start the background feed updater on the bot's event loop.
    self.feed_task = bot.loop.create_task(self.post_to_feeds())
def __unload(self):
    """Log the unload and cancel the background feed task."""
    logger.info('Reddit cog unloading, cancelling feed task...')
    task = self.feed_task
    task.cancel()
async def post_to_feeds(self):
    """Update every row of ``reddit_feeds`` in an endless loop.

    Declared ``async`` because the body awaits; ``await`` is a syntax error
    inside a plain ``def``. Each round first sleeps ``self.update_interval``
    seconds, then fetches all feeds and updates them one by one with a short
    randomised pause before each.
    """
    # guilds aren't available until the bot is ready, and this task begins before the bot
    # is ready. so let's wait for it to be ready before updating feeds
    await self.bot.wait_until_ready()

    logger.debug('Bot is ready, proceeding to Reddit feed update loop...')

    while True:
        # sleep up here so we don't update immediately
        # wait the main sleep interval
        await asyncio.sleep(self.update_interval)

        logger.debug('Going to update all feeds...')

        # fetch all feeds, and update them all
        feeds = await self.bot.pgpool.fetch('SELECT * FROM reddit_feeds')

        # enumerate through all feeds
        for idx, feed in enumerate(feeds):
            logger.debug('Updating feed {}/{}!'.format(idx + 1, len(feeds)))
            # pause for fuzz_interval plus up to one random second to prevent
            # rate limiting (doesn't really help but w/e)
            await asyncio.sleep(random.random() + self.fuzz_interval)
            # update the feed
            await self.update_feed(feed)

        logger.debug('Updated.')
def reddit(self, ctx):
    """
    This command group contains all commands related to Reddit feeds.

    Feeds will be updated every 30 minutes. Both self and link posts will be posted to the channel. NSFW posts will
    only be posted if the channel that the bot is posting in is NSFW. Stickied posts are never posted.
    """
    # NOTE(review): presumably registered as a command group by a decorator
    # that is not visible here, with this docstring shown as help text --
    # confirm before rewording it. The body is intentionally empty.
    pass
def setup(bot):
    """Register the ``Reddit`` cog when Reddit credentials are configured.

    When ``bot.cfg['credentials']`` has no ``'reddit'`` entry a warning is
    logged and nothing is added.
    """
    if 'reddit' in bot.cfg['credentials']:
        bot.add_cog(Reddit(bot))
    else:
        logger.warning('Not adding Reddit cog, not present in configuration!')
        return
def connect_reddit(self):
    """
    Returns a praw connection object

    When ``self.conf.reddit.last_refresh`` is less than 3000 seconds old the
    existing ``self.reddit`` is returned. Otherwise an access token is
    requested with the password grant, retrying every 10 seconds until the
    token endpoint answers with status 200.
    """
    refresh_window = int(1 * 3000)
    if hasattr(self.conf.reddit, 'last_refresh') and self.conf.reddit.last_refresh + refresh_window > int(time.mktime(time.gmtime())):
        lg.debug("< CointipBot::connect_reddit(): to connect to reddit(): DONE (skipping)")
        return self.reddit

    lg.debug('CointipBot::connect_reddit(): connecting to Reddit via OAuth2...')

    basic_auth = requests.auth.HTTPBasicAuth(self.conf.reddit.auth.id, self.conf.reddit.auth.secret)
    token_request = {
        "grant_type": "password",
        "username": self.conf.reddit.auth.user,
        "password": self.conf.reddit.auth.password,
    }

    conn = praw.Reddit(user_agent = self.conf.reddit.auth.user, api_request_delay=1.0)
    conn.set_oauth_app_info(client_id=self.conf.reddit.auth.id,
                            client_secret=self.conf.reddit.auth.secret,
                            redirect_uri=self.conf.reddit.auth.redirect)

    while True:
        response = requests.post("https://ssl.reddit.com/api/v1/access_token", auth=basic_auth, data=token_request)
        token_info = json.loads(response.content)
        lg.debug(token_info)

        if response.status_code != 200:
            # Token endpoint refused the request: wait and try again.
            print("Sleeping...")
            time.sleep(10)
            continue

        self.conf.reddit.access_token = token_info['access_token']
        conn.set_access_credentials(set(['edit','identity','privatemessages','read','submit','vote', 'creddits']),token_info['access_token'])
        print("Access Granted")
        self.conf.reddit.last_refresh = int(time.mktime(time.gmtime()))
        lg.info("CointipBot::connect_reddit(): logged in to Reddit as %s", self.conf.reddit.auth.user)
        return conn
    # conn.login(self.conf.reddit.auth.user, self.conf.reddit.auth.password)
def setup():
    """Return a ``praw.Reddit`` client built from three ``config`` entries.

    Reads ``client_id``, ``client_secret`` and ``user_agent`` from the
    module-level ``config`` mapping.
    """
    credentials = {}
    for key in ('client_id', 'client_secret', 'user_agent'):
        credentials[key] = config[key]
    return praw.Reddit(**credentials)
def __init__(self, stream=True):
    """Create the PRAW client from ``helpers.reddit_auth()`` and store ``stream``.

    Args:
        stream: stored unchanged on ``self.stream``.
    """
    credentials = helpers.reddit_auth()
    self.auth = credentials
    self.api = praw.Reddit(**credentials)
    self.stream = stream
def send_private_message(self, title, content):
    """Send a private message to ``self.username`` from the bot account.

    Args:
        title: subject of the message.
        content: body of the message.
    """
    client = praw.Reddit(config.bot_name)
    recipient = client.redditor(self.username)
    recipient.message(title, content)
def __init__(self):
    """Create the Reddit client from the praw.ini site named by ``config.bot_name``."""
    site_name = config.bot_name
    self.reddit = praw.Reddit(site_name)
def authenticate_api():
    """Return a ``praw.Reddit`` client built from the ``REDDIT`` settings mapping."""
    return praw.Reddit(
        client_id=REDDIT['CLIENT_ID'],
        client_secret=REDDIT['CLIENT_SECRET'],
        user_agent=REDDIT['USER_AGENT'],
    )
def get_reddit_submissions(subreddit):
    """Publish new submissions of ``subreddit`` to Kafka and append them to a file.

    Each formatted submission is sent to the ``data`` topic as UTF-8 JSON and
    appended as one line to ``test.jsonl``. The loop stops once more than
    1000 submissions have been handled. Any exception is written to
    ``Errors.txt`` instead of being raised.

    Args:
        subreddit: name of the subreddit to read.

    Returns:
        The ``subreddit`` argument, unchanged.
    """
    # Connect to Kafka
    producer = KafkaProducer(bootstrap_servers='kafka:9092')

    # Reddit API
    reddit = authenticate_api()

    try:
        # ``handled`` is the number of submissions already sent.
        for handled, submission in enumerate(reddit.subreddit(subreddit).new()):
            payload = format_submission(submission)
            if handled > 1000:
                break

            producer.send('data', json.dumps(payload).encode('utf-8'))
            print(handled + 1)

            with open('test.jsonl', 'a') as out_file:
                out_file.write(json.dumps(payload)+'\n')

        # Flush kafka producer
        producer.flush()
    except Exception as e:
        with open('Errors.txt', 'a') as err_file:
            err_file.write(str(type(e))+'\n')
            err_file.write(str(e)+'\n')
        # Flush kafka producer
        producer.flush()

    return subreddit