我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用slacker.Slacker()。
def post_message(message): if not SLACKBOT_TOKEN: print "Slackbot key not found, cannot notify slack of '%s'" % message print "Set the environment variable DRIFT_SLACKBOT_KEY to remedy." return final_message = "{}: {}".format(getpass.getuser(), message) try: from slacker import Slacker except ImportError: print "Message '{}' not posted to slack".format(message) print "Slack integration disabled. Enable slack with 'pip install slacker'" try: slack = Slacker(SLACKBOT_TOKEN) slack.chat.post_message(NOTIFICATION_CHANNEL, final_message) except Exception as e: log.warning("Cannot post '%s' to Slack: %s", final_message, e)
def get_users_info(): """SlackAPI?usersAPI???????? Slacker ? users.list API ????? - https://github.com/os/slacker - https://api.slack.com/methods/users.info :return dict: key?slack_id?value????????? """ users = {} webapi = slacker.Slacker(settings.API_TOKEN) try: for d in webapi.users.list().body['members']: users[d['id']] = d['name'] except slacker.Error: logger.error('Cannot connect to Slack') return users
def add_message_reaction(self, slackAPIToken, channel, timestamp, reaction, remove): try: if reaction == None: return reaction = reaction.strip() if len(reaction) == 0: return slackAPIConnection = Slacker(slackAPIToken) self._logger.debug("Sending Slack RTM reaction - Channel: " + channel + ", Timestamp: " + timestamp + ", Reaction: " + reaction + ", Remove: " + str(remove)) if remove: reaction_rsp = slackAPIConnection.reactions.remove(channel=channel, timestamp=timestamp, name=reaction) else: reaction_rsp = slackAPIConnection.reactions.add(channel=channel, timestamp=timestamp, name=reaction) if reaction_rsp.successful == None or reaction_rsp.successful == False: self._logger.debug("Slack RTM send reaction failed - Channel: " + channel + ", Timestamp: " + timestamp + ", Reaction: " + reaction + ", Remove: " + str(remove) + json.dumps(reaction_rsp.body)) else: self._logger.debug("Successfully sent Slack RTM reaction - Channel: " + channel + ", Timestamp: " + timestamp + ", Reaction: " + reaction + ", Remove: " + str(remove)) except Exception as e: self._logger.exception("Error sending Slack RTM reaction - Channel: " + channel + ", Timestamp: " + timestamp + ", Reaction: " + reaction + ", Remove: " + str(remove) + ", Error: " + str(e.message))
def notify(daily_file, weekly_file, stop_time, slack_api_token=None, use_channel_time=False): if slack_api_token is None: slack_api_token = SLACK_API_TOKEN slack = Slacker(slack_api_token) slack.files.upload( daily_file, channels=[SLACK_CHANNEL], title='Daily AWS Spot Price ending on {}'.format(stop_time) ) slack.files.upload( weekly_file, channels=[SLACK_CHANNEL], title='Weekly AWS Spot Price ending on {}'.format(stop_time) ) if use_channel_time: slack.chat.post_message( '#aws', '/time AWS Spot prices ending on {} UTC are available'.format(stop_time), username='AWS Bot') else: slack.chat.post_message( '#aws', 'AWS Spot prices ending on {} are available'.format(stop_time), username='AWS Bot')
def _login_oauth(self): try: oauth = Slacker.oauth.access( client_id=SLACK_CLIENT_ID, client_secret=SLACK_CLIENT_SECRET, code=flask.request.args['code'], redirect_uri=WebServer.url_for('login') ).body except Error as err: self.mongo.db.z_errors.insert_one({'_id': time.time(), 'ctx': 'oauth', 'msg': str(err)}) return WebServer._basic_page('OAuth error', 'OAuth error: ' + str(err)) token = oauth['access_token'] identity_only = oauth['scope'].count(',') == 1 return self._login_with_token(token, identity_only)
def tokens_validation(self): self.log.info('Validating tokens') for token, enc_key in self.tokens.decrypt_keys_map().items(): self.log.info('Check token %s', token) try: user_info = Slacker(token).auth.test().body SlackArchive.api_call_delay() except Error as err: self.mongo.db.z_errors.insert_one({'_id': time.time(), 'ctx': 'tokens_validation', 'msg': str(err)}) self.log.exception('Error %s for token %s', str(err), token) del self.tokens[enc_key] continue self.log.info('Valid token') self.tokens.upsert(token, user_info)
def _post(self, message): """Post message to Slack""" key = self.GetConfigEntryWithDefault("SlackAPIKey", None) if not key: return channel = self.GetConfigEntryWithDefault("SlackChannel", None) if not channel: return as_user = self.GetBooleanConfigEntryWithDefault("SlackAsUser", True) as_user = bool(as_user) from slacker import Slacker slack = Slacker(key) # Post to slack slack.chat.post_message(channel, message, as_user=as_user)
def __init__( self, actor, oid, apiToken, botToken ): self.actor = actor self.oid = oid self.apiToken = apiToken self.botToken = botToken self.slack = Slacker( self.apiToken ) self.bot = SlackClient( self.botToken ) self.botId = None self.invId = str( uuid.uuid4() ) self.taskId = 0 self.slackLinkRE = re.compile( r'<.+://.+\|(.+)>' ) self.history = { 'last_cmd' : [] } self.makeChannel( '#detects' ) if not self.bot.rtm_connect(): raise Exception( 'failed to connect bot to Slack rtm API' ) self.actor.newThread( self.botThread )
def safe_post_message(slacker, crawler, start_id, post_message, max_retries=3): """ post for my slack channel :param slacker.Slacker slacker: :param g_crawler.crawler.GCrawler crawler: :param int start_id: :param str post_message: :param int max_retries=3: """ retries = 0 while True: try: slacker.chat.post_message("#crawler", "[{}] [ID: {}] {}".format(crawler.__class__.__name__, start_id, post_message)) except HTTPError as http_err: retries += 1 if retries >= max_retries: raise Exception("Too many retries.") wait = 2 ** (retries) print("[ RETRY ] Waiting {} seconds...".format(wait)) time.sleep(wait) else: break
def get(self, request, resource=None): app_integration = get_integration_token( request.user, APP_INTEGRATION_PROVIDER_SLACK, task=request.GET.get('task') ) if app_integration and app_integration.extra: extra = json.loads(app_integration.extra) slack_client = Slacker(app_integration.token) response = None if resource == 'channels': channel_response = slack_client.channels.list(exclude_archived=True) if channel_response.successful: response = channel_response.body.get(slack_utils.KEY_CHANNELS, None) else: response = { 'team': {'name': extra.get('team_name'), 'id': extra.get('team_id', None)}, # 'incoming_webhook': {'channel': extra.get('incoming_webhook').get('channel')} } if response: return Response(response, status.HTTP_200_OK) return Response({'status': 'Failed'}, status.HTTP_400_BAD_REQUEST) return Response({'status': 'Not implemented'}, status.HTTP_501_NOT_IMPLEMENTED)
def get_and_add_users(slack_importer): # type: (Slacker) -> Dict[str, Dict[str, str]] users = slack_importer.get_slack_users_email() added_users = {} print('######### IMPORTING USERS STARTED #########\n') for user_id, user in users.items(): r = client.create_user({ 'email': user['email'], 'full_name': user['name'], 'short_name': user['name'] }) if not r.get('msg'): added_users[user_id] = user print(u"{} -> {}\nCreated\n".format(user['name'], user['email'])) else: print(u"{} -> {}\n{}\n".format(user['name'], user['email'], r.get('msg'))) print('######### IMPORTING USERS FINISHED #########\n') return added_users
def post_slack_message(message=None, channel=None, username=None, icon_emoji=None): """Format the message and post to the appropriate slack channel. Args: message (str): Message to post to slack channel (str): Desired channel. Must start with # """ LOG.debug('Slack Channel: %s\nSlack Message: %s', channel, message) slack = slacker.Slacker(SLACK_TOKEN) try: slack.chat.post_message(channel=channel, text=message, username=username, icon_emoji=icon_emoji) LOG.info('Message posted to %s', channel) except slacker.Error: LOG.info("error posted message to %s", channel)
def __init__(self, channel, *args, **kwargs): super().__init__(*args, **kwargs) self._channels = None self.slack = Slacker(self.token) self.channel = self.channels[channel]
def send_slack(self, attachments): """Sends a message to Slack""" # Create a Slack bot instance slack = Slacker(os.environ['SLACK_API_TOKEN']) # Send a message to #general channel self.log.info('sending a message to Slack') slack.chat.post_message('#general', text = ' ', username = 'timetable', icon_emoji = ':spiral_calendar_pad:', attachments = attachments)
def __init__(self, api_key, name, default_channel, default_priority='normal'): self.api_key = api_key self.name = name self.default_channel = default_channel self.slack = Slacker(self.api_key) self.default_priority = default_priority
def postMessage(message: str, channel: str=config.channel, attachments: List=[]) -> Optional[Mapping]: """ Post a message to the specified slack channel """ if secrets.slackOauth: slack = Slacker(secrets.slackOauth) if channel: resp = slack.chat.post_message(channel, message, attachments=attachments) return resp.body return None
def slackbot(text): """ Posts messages to Slack channel based on environment """ slack = Slacker(SLACK_TOKEN) username = "electionsbot" icon_url = "https://c1.staticflickr.com/6/5667/20571587160_92070a9546_b.jpg" # icon_url = "https://pixabay.com/static/uploads/photo/2013/07/13/13/41/robot-161368_960_720.png" ## set channel based on the environment: local, test, prod if mccelectionsenv == "local": channel = "#electionserverlocal" elif mccelectionsenv == "test": channel = "#electionservertest" elif mccelectionsenv == "prod": channel = "#electionserverprod" ## uses try statement in order to avoid requests package error: # Max retries exceeded with url try: slack.chat.post_message(channel, text, username=username, link_names=True, icon_url=icon_url) # return "Messaged posted: %s" % (text) except: print "WARNING: An error occured when trying to send the text to Slack." ## outputs to command line so you can follow along/log there, especially when working locally print text
def connect(self): self.__client = Slacker(self.__api_key) self.update_channels() # Send a message letting the channel know that this alarm started
def __init__(self, token, bot_icon=None, bot_emoji=None, connect=True): self.token = token self.bot_icon = bot_icon self.bot_emoji = bot_emoji self.username = None self.domain = None self.login_data = None self.websocket = None self.users = {} self.channels = {} self.connected = False self.webapi = slacker.Slacker(self.token) if connect: self.rtm_connect()
def __init__(self, token): self.token = token # Slacker is a Slack Web API Client self.web = Slacker(token) # SlackClient is a Slack Websocket RTM API Client self.rtm = SlackClient(token)
def connect(): global connected if connected: return connected = True slacker = Slacker(API_TOKEN) url = slacker.rtm.start().body["url"] ws = WebSocketApp(url, on_message = on_message, on_error = on_error, on_close = on_close) ws.on_open = on_open ws.run_forever()
def __init__(self, config, channel_name, user_name): self.slacker = Slacker(config.decrypt('SlackAPI')) self.channel_name = channel_name self.user_name = user_name self.icon = config.get('icon', '') # Bot icon URL self.botname = config.get('BotName', 'Nimbus') self.DEBUG = config.DEBUG
def __init__(self, name='secbot', token=None, websocket_delay=0.5, config_path='config.ini'): self.name = name self.mode = mode self.websocket_delay = websocket_delay self.config = configparser.ConfigParser() self.config_path = config_path self.config.read(self.config_path) if not token: token = os.environ.get('SLACK_BOT_TOKEN') if self.mode == 'slacker': self.session = Session() self.slack = Slacker(token, session=self.session) elif self.mode == 'slackclient': self.slack = SlackClient(token) self.id = self.get_id() self.at_bot = "<@{}>".format(self.id) self.handlers = self.get_handlers() self.jobs = {} self.executor = ThreadPoolExecutor() #self.executor = ProcessPoolExecutor()
def start(self): if self.mode == 'slacker': exit('Slacker mode is missing websocket for reading events... sorry!') if self.slack.rtm.connect().body.get('ok'): print("[+] SecBot connected and running!") ch_joiner = self.join_channels() while True: channel, user, ts, message, at_bot = self.parse_slack_output(self.slack.rtm.read()) if message and channel: self.executor.submit(self.handle_command, channel, user, ts, message, at_bot) #self.handle_command(channel, user, ts, message, at_bot) time.sleep(self.websocket_delay) if self.mode == 'slackclient': if self.slack.rtm_connect(no_latest=True, no_unreads=True, presence_sub=True): print("[+] SecBot connected and running!") ch_joiner = self.join_channels() while True: channel, user, ts, message, at_bot = self.parse_slack_output(self.slack.rtm_read()) if message and channel: self.executor.submit(self.handle_command, channel, user, ts, message, at_bot) #self.handle_command(channel, user, ts, message, at_bot) time.sleep(self.websocket_delay) else: print("[!] Connection failed. Invalid Slack token or bot ID?")
def __init__(self): super(Bot, self).__init__() self.name = "pythonboardingbot" self.emoji = ":robot_face:" # When we instantiate a new bot object, we can access the app # credentials we set earlier in our local development environment. self.oauth = {"client_id": settings.SLACK_CLIENT_ID, "client_secret": settings.SLACK_CLIENT_SECRET, # Scopes provide and limit permissions to what our app # can access. It's important to use the most restricted # scope that your app will need. "scope": "bot"} self.verification = settings.SLACK_VERIFICATION_TOKEN # NOTE: Python-slack requires a client connection to generate # an oauth token. We can connect to the client without authenticating # by passing an empty string as a token and then reinstantiating the # client with a valid OAuth token once we have one. self.client = SlackClient("") self.slacker = Slacker("") # We'll use this dictionary to store the state of each message object. # In a production envrionment you'll likely want to store this more # persistantly in a database. self.messages = {} self.authed_teams = {} #self.last_messages = list()
def auth(self, code): """ Authenticate with OAuth and assign correct scopes. Save a dictionary of authed team information in memory on the bot object. Parameters ---------- code : str temporary authorization code sent by Slack to be exchanged for an OAuth token """ # After the user has authorized this app for use in their Slack team, # Slack returns a temporary authorization code that we'll exchange for # an OAuth token using the oauth.access endpoint auth_response = self.client.api_call( "oauth.access", client_id=self.oauth["client_id"], client_secret=self.oauth["client_secret"], code=code ) # To keep track of authorized teams and their associated OAuth tokens, # we will save the team ID and bot tokens to the global # authed_teams object team_id = auth_response["team_id"] self.authed_teams[team_id] = {"bot_token": auth_response["bot"]["bot_access_token"]} # Then we'll reconnect to the Slack Client with the correct team's # bot token self.slacker = Slacker(self.authed_teams[team_id]["bot_token"]) team = self.find_team(team_id) if team is None: team = Team(team_id=team_id, name=auth_response["team_name"], bot_user_id=auth_response["bot"]["bot_user_id"], bot_access_token=auth_response["bot"]["bot_access_token"]) team.save()
def find_team(self, team_id): try: team = Team.objects.get(team_id=team_id) #self.client = SlackClient(team.bot_access_token) self.slacker = Slacker(team.bot_access_token) self.id = team.bot_user_id except Team.DoesNotExist: team = None return team
def _is_admin(user): """ ?????Slack?Admin??????? :param user: Slack?????ID """ slack = Slacker(settings.API_TOKEN) user_info = slack.users.info(user) return user_info.body['user']['is_admin']
def _send_password_on_dm(message, email, password): """ ?????????????? DM ??????????????????? :param email: ???????????? :param password: ????????????? """ # ??????DM??????ID??? user = message._body['user'] slack = Slacker(settings.API_TOKEN) result = slack.im.open(user) dm_channel = result.body['channel']['id'] msg = '???? `{}` ??????? `{}` ??'.format(email, password) # DM???????????????? message._client.rtm_send_message(dm_channel, msg)
def _get_user_name(user_id): """ ????? Slack ? user_id ????? username ??? Slacker ? users.list API ????? - https://github.com/os/slacker - https://api.slack.com/methods/users.info """ webapi = slacker.Slacker(settings.API_TOKEN) response = webapi.users.info(user_id) if response.body['ok']: return response.body['user']['name'] else: return ''
def random_command(message, subcommand=None): """ ??????????????????????????? - https://github.com/os/slacker - https://api.slack.com/methods/channels.info - https://api.slack.com/methods/users.getPresence - https://api.slack.com/methods/users.info """ if subcommand == 'help': botsend(message, '''- `$random`: ???????????????????????? - `$random active`: ????????active????????????????? ''') return # ??????????????? channel = message.body['channel'] webapi = slacker.Slacker(settings.API_TOKEN) cinfo = webapi.channels.info(channel) members = cinfo.body['channel']['members'] # bot ? id ??? bot_id = message._client.login_data['self']['id'] members.remove(bot_id) member_id = None while not member_id: # ?????????????????? member_id = random.choice(members) if subcommand == 'active': # active ??????????? presence ????? presence = webapi.users.get_presence(member_id) if presence.body['presence'] == 'away': members.remove(member_id) member_id = None user_info = webapi.users.info(member_id) name = user_info.body['user']['name'] botsend(message, '{} ?????????'.format(name))
def __init__(self, config): self.slack_api_token = config['slack_api_token'] self.slack_username = config['slack_username'] self.slack_channel = config['slack_channel'] self.slack_emote = config['slack_emote'] self.slack_shoutout = config['slack_shoutout'] self.slack_text_prefix = 'System Error {shoutout}: '.format( shoutout=self.slack_shoutout) self.slack = Slacker(self.slack_api_token)
def _login_with_token(self, token, identity_only): try: api_auth = Slacker(token).auth.test().body assert api_auth['team_id'] == SLACK_TEAM_ID except Error as err: self.mongo.db.z_errors.insert_one({'_id': time.time(), 'ctx': 'auth.test', 'msg': str(err)}) return WebServer._basic_page('Auth error', 'Auth error: ' + str(err)) except AssertionError: return WebServer._basic_page('Wrong team', 'Wrong team: ' + api_auth['team']) return self._login_success(token, api_auth, identity_only)
def __init__(self, mongo, ctx, tokens, api_key): self.mongo = mongo self.log = SlackArchive.get_logger() self.people = mongo_store.MongoStore(self.mongo.db.users, ctx) self.streams = mongo_store.MongoStore(self.mongo.db.streams, ctx) self.tokens = tokens self.api_handle = Slacker(api_key) self.scheduler = scheduler.Scheduler(ctx, mongo) self._setup_scheduler() self.scheduler.start()
def streams_fetch(self, token): enc_key = self.tokens.get_key_by_known_token(token) user_info = self.tokens[enc_key] if user_info['full_access']: try: self.log.info('Fetch channels for %s', user_info['login']) all_ch = Slacker(token).channels.list().body SlackArchive.api_call_delay() self.people.set_field(user_info['user'], 'channels', SlackArchive._filter_channel_ids(all_ch)) self.update_streams(all_ch) # not a duplicate op: fight races self.log.info('Fetch %s\'s private groups', user_info['login']) groups = Slacker(token).groups.list().body SlackArchive.api_call_delay() self.people.set_field(user_info['user'], 'groups', SlackArchive._filter_group_ids(groups)) self.update_streams(groups, user_info['user']) self.log.info('Fetch direct msgs for %s', user_info['login']) ims = Slacker(token).im.list().body SlackArchive.api_call_delay() self.people.set_field(user_info['user'], 'ims', SlackArchive._filter_im_ids(groups, ims)) self.update_streams(ims, user_info['user']) except Error as err: self.mongo.db.z_errors.insert_one({'_id': time.time(), 'ctx': 'channels_fetch', 'msg': str(err)}) self.log.exception('Fetch streams error %s', str(err)) # full access was revoked if str(err) == 'missing_scope': self.tokens.set_field(enc_key, 'full_access', False)
def __init__(self): # Open database connection self.db = mysql.connect(host=ROJAK_DB_HOST, port=ROJAK_DB_PORT, user=ROJAK_DB_USER, passwd=ROJAK_DB_PASS, db=ROJAK_DB_NAME) self.cursor = self.db.cursor() self.media = {} try: # Get media information from the database self.logger.info('Fetching media information') self.cursor.execute(sql_get_media, [self.name]) row = self.cursor.fetchone() self.media['id'] = row[0] self.media['last_scraped_at'] = row[1] except mysql.Error as err: self.logger.error('Unable to fetch media data: %s', err) raise NotConfigured('Unable to fetch media data: %s' % err) if ROJAK_SLACK_TOKEN != '': self.is_slack = True self.slack = Slacker(ROJAK_SLACK_TOKEN) else: self.is_slack = False self.logger.info('Post error to #rojak-pantau-errors is disabled') # Capture the signal spider_opened and spider_closed # https://doc.scrapy.org/en/latest/topics/signals.html
def slackbot(text): """ Posts messages to Slack channel based on environment """ slack = Slacker(SLACK_TOKEN) username = "DocPubBot" icon_url = "http://autobinaryrobots.com/wp-content/uploads/2016/11/robot.png" ## map environment to related Slack channel env_channels = { "local": "#docpublocal", "test": "#docpubtest", "prod": "#docpubprod" } ## set channel based on the environment channel = env_channels[DOCPUBENV] # channel = os.environ["MCCELECTIONSENV"] ## uses try statement in order to avoid requests package error: # "Max retries exceeded with url" try: slack.chat.post_message(channel, text, username=username, link_names=True, icon_url=icon_url) # return "Messaged posted: %s" % (text) except: print("WARNING: An error occured when trying to send the text to Slack.") ## outputs to command line so you can follow along/log there, especially when working locally print(text)
def main(argv=None): global VERBOSE argv = sys.argv[1:] if argv is None else argv args = parser.parse_args(argv) VERBOSE = args.verbose archives = list(args_get_archives(args)) if not archives: print "ERROR: no archives specified. Specify an archive or a config file." return 1 for a in archives: print "Processing:", a["name"] slack = Slacker(a["token"]) with SlackArchive(slack, a) as archive: needs_upgrade = archive.needs_upgrade() if needs_upgrade: print "Notice: wayslack needs to fiddle around with some symlinks." print "This will cause some non-destructive changes to the directory." res = raw_input("Continue? Y/n: ") if res and res.lower()[:1] != "y": break archive.upgrade() if needs_upgrade or args.download_everything: archive.download_all_files() archive.refresh() archive.delete_old_files(args.confirm_delete)
def __init__(self, config): self.config = config self.slack = Slacker(self.config.slackToken) self.icon = SLACK_ICON
def connect(self): self.client = Slacker(self.api_key) self.update_channels() #Set the appropriate settings for each alert
def __init__(self, *args, **kwargs): super(TeamSettingsForm, self).__init__(*args, **kwargs) slack = Slacker(kwargs['instance'].access_token) priv_ch = [(g['id'], g['name']) for g in slack.groups.list().body['groups'] if not (g['is_archived'] or g['name'].startswith('mpdm'))] pub_ch = [(c['id'], c['name']) for c in slack.channels.list().body['channels'] if not c['is_archived']] users = [(u['id'], u['profile']['real_name']) for u in slack.users.list().body['members'] if not u['deleted']] self.fields['post_channel'].widget = forms.Select(choices=tuple(pub_ch)) self.fields['approval_channel'].widget = forms.Select(choices=tuple(pub_ch + priv_ch + users))
def slacker(): """ Keep the slacker session short-lived, use 'with slacker() as slack'""" slack = Slacker(os.environ['SLACK_API_TOKEN']) yield slack
def slack_notify(text=None, channel='#test', username='???', attachments=None): token = settings.SLACK_TOKEN slack = Slacker(token) slack.chat.post_message(text=text, channel=channel, username=username, attachments=attachments)
def __init__(self, token, verbose=False, request_pause_period=0.5): self.slack = slacker.Slacker(token) self.verbose = verbose self.request_pause_period = 0.5 self._invite_link = None
def __init__(self, api_key, username, channel): """ :param api_key: string :param username: string :param channel: string """ self.username = username self.channel = channel self.slack = Slacker(api_key)
def auth_slack(): api = Slacker(bot_secret_) try: api.auth.test() except Exception as e: print('slack not authed\n', e) return api