我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用contextlib.suppress()。
def create_notification(**kwargs): """Notify signal receiver.""" # make fresh copy and retain kwargs params = kwargs.copy() del params['signal'] del params['sender'] with suppress(KeyError): del params['silent'] silent = kwargs.get('silent', False) # If it's a silent notification create the notification but don't save it if silent: notification = Notification(**params) else: notification = Notification.objects.create(**params) # send via custom adapters for adapter_path in getattr(settings, 'NOTIFICATION_ADAPTERS', []): adapter = import_attr(adapter_path) adapter(**kwargs).notify() if getattr(settings, 'NOTIFICATIONS_WEBSOCKET', False): send_to_queue(notification)
def value_set(self, colname, key, filter=None, sort=True, flat=False, **kwargs): """Return a set of all values in a key""" if filter is not None: data = self.get_collection(colname).find(filter, **kwargs) else: data = self.get_collection(colname).find(**kwargs) values = [item.get(key) for item in data if item.get(key) is not None] if flat is True: values = list(itertools.chain(*values)) with suppress(TypeError): values = list(set(values)) return sorted(values) if sort is True else values
def __delitem__(self, key): key = key.casefold() if key == 'targetname': with suppress(KeyError): self.map.by_target[ self.keys.get('targetname', None) ].remove(self) self.map.by_target[None].add(self) if key == 'classname': with suppress(KeyError): self.map.by_class[ self.keys.get('classname', None) ].remove(self) self.map.by_class[None].add(self) for k in self.keys: if k.casefold() == key: del self.keys[k] break
def future_with_timeout(loop, timeout, future=None): loop = loop or asyncio.get_event_loop() f = future or create_future(loop=loop) def on_timeout(): if f.done(): return f.set_exception(TimeoutError) if timeout: handler = loop.call_later(timeout, on_timeout) def on_result(*_): with suppress(Exception): handler.cancel() f.add_done_callback(on_result) return f
def unfreeze(self, user=None): from pretalx.schedule.models import TalkSlot if not self.version: raise Exception('Cannot unfreeze schedule version: not released yet.') # collect all talks, which have been added since this schedule (#72) submission_ids = self.talks.all().values_list('submission_id', flat=True) talks = self.event.wip_schedule.talks \ .exclude(submission_id__in=submission_ids) \ .union(self.talks.all()) wip_schedule = Schedule.objects.create(event=self.event) new_talks = [] for talk in talks: new_talks.append(talk.copy_to_schedule(wip_schedule, save=False)) TalkSlot.objects.bulk_create(new_talks) self.event.wip_schedule.talks.all().delete() self.event.wip_schedule.delete() with suppress(AttributeError): del wip_schedule.event.wip_schedule return self, wip_schedule
def sort_queryset(self, qs): sort_key = self.request.GET.get('sort') if sort_key: plain_key = sort_key[1:] if sort_key.startswith('-') else sort_key reverse = not (plain_key == sort_key) if plain_key in self.sortable_fields: is_text = False with suppress(FieldDoesNotExist): is_text = isinstance(qs.model._meta.get_field(plain_key), CharField) if is_text: # TODO: this only sorts direct lookups case insensitively # A sorting field like 'speaker__name' will not be found qs = qs.annotate(key=Lower(plain_key)).order_by('-key' if reverse else 'key') else: qs = qs.order_by(sort_key) return qs
def _select_locale(self, request): supported = request.event.locales if (hasattr(request, 'event') and request.event) else settings.LANGUAGES language = ( self._language_from_user(request, supported) or self._language_from_cookie(request, supported) or self._language_from_browser(request, supported) ) if hasattr(request, 'event') and request.event: language = language or request.event.locale translation.activate(language) request.LANGUAGE_CODE = translation.get_language() with suppress(pytz.UnknownTimeZoneError): if request.user.is_authenticated: tzname = request.user.timezone elif hasattr(request, 'event') and request.event: tzname = request.event.timezone else: tzname = settings.TIME_ZONE timezone.activate(pytz.timezone(tzname)) request.timezone = tzname
def parse_json(self): data = self._get_config_data() try: json_data = json.loads(data.value) # pylint: disable=no-member except ValueError: raise InvokeError('Failed to parse JSON data') # simple ordered dict command_execution_pairs = [ ('ab', ApacheBenchExecution), ('nginx', NginxExecution), ('netperf', NetperfExecution), ('iperf', IperfExecution), ] for command_type, exec_class in command_execution_pairs: with suppress(KeyError): return exec_class(json_data[command_type], self) raise InvokeError('No command found to parse')
def pytest_report_teststatus(self, report): outcome = yield if report.when == 'call': if report.passed: result = self.RESULT_PASSED elif report.skipped: result = self.RESULT_SKIPPED else: # if report.failed: result = self.RESULT_FAILED status, letter, msg = self.get_result(result) if report.passed: # print return result instead of 'PASSED' with suppress(AttributeError): msg = report.retval outcome.result = status, letter, msg
def _show_feedback_label(self, message, seconds=None): """Display a message in lbl_feedback, which then times out after some number of seconds. Use after() to schedule a callback to hide the feedback message. This works better than using threads, which can cause problems in Tk. """ if seconds is None: seconds = CONFIG['MESSAGE_DURATION'] # cancel any existing callback to clear the feedback # label. this prevents flickering and inconsistent # timing during rapid input. with contextlib.suppress(AttributeError): self.root.after_cancel(self.clear_feedback) logger.debug('Label feedback: "{}"'.format(message)) self.feedback.set(message) self.clear_feedback = self.root.after( 1000 * seconds, lambda: self.feedback.set("") )
def on_guild_join(self, guild): """Send the bot introduction message when invited.""" self.logger.info('New guild: ' + guild.name) if self.selfbot: return try: await self.send_message(guild.default_channel, join_msg) except discord.Forbidden: satisfied = False c_count = 0 try_channels = list(guild.channels) channel_count = len(try_channels) - 1 while not satisfied: with suppress(discord.Forbidden, discord.HTTPException): await self.send_message(try_channels[c_count], join_msg) satisfied = True if c_count > channel_count: self.logger.warning('Couldn\'t announce join to guild ' + guild.name) satisfied = True c_count += 1
def from_code(cls, *lines, index=0): line = lines[index].strip() match = (cls.RE_ENV_SET.match(line) or cls.RE_QUOTED_CAPITALS.search(line)) if not match: log.debug("Skipped line %s without variable: %r", index + 1, line) return None name = match.group('name') context = line with suppress(IndexError): if context.endswith('{'): context += lines[index + 1].strip() context += lines[index + 2].strip() variable = cls(name, context=context) log.info("Loaded variable: %s", variable) return variable
def run(self): self._interaction = asyncio.ensure_future(self._game_screen.interact(timeout=None, delete_after=False)) self._runner = asyncio.ensure_future(self.run_loop()) # await self._game_screen.wait_until_ready() try: return await self._runner finally: # For some reason having all these games hanging around causes lag. # Until I properly make a delete_after on the paginator I'll have to # settle with this hack. async def task(): await asyncio.sleep(30) with contextlib.suppress(discord.HTTPException): await self._game_screen._message.delete() self.ctx.bot.loop.create_task(task()) self._interaction.cancel()
def _inrole(ctx, *roles, members, final='and'): joined_roles = human_join(map(str, roles), final=final) truncated_title = truncate(f'Members in {pluralize(role=len(roles))} {joined_roles}', 256, '...') total_color = map(sum, zip(*(role.colour.to_rgb() for role in roles))) average_color = discord.Colour.from_rgb(*map(round, (c / len(roles) for c in total_color))) if members: entries = sorted(map(str, members)) # Make the author's name bold (assuming they have that role). # We have to do it after the list was built, otherwise the author's name # would be at the top. with contextlib.suppress(ValueError): index = entries.index(str(ctx.author)) entries[index] = f'**{entries[index]}**' else: entries = ('There are no members :(', ) pages = ListPaginator(ctx, entries, colour=average_color, title=truncated_title) await pages.interact()
def _get_afk_embed(self, member): message = self.afks.get(member.id) if message is None: return None avatar = member.avatar_url colour = await user_color(member) title = f"{member.display_name} is AFK" embed = (discord.Embed(description=message, colour=colour) .set_author(name=title, icon_url=avatar) .set_footer(text=f"ID: {member.id}") ) with contextlib.suppress(IndexError): embed.timestamp = self.user_message_queues[member.id][-1] return embed
def on_voice_state_update(self, before, after): if before == self._client.user: if after.voice.voice_channel != self._voice_channel: log.warning('Client was disconnected from the voice channel') await self.connect_voice() return # joining if before.voice.voice_channel != self._voice_channel and after.voice.voice_channel == self._voice_channel: with suppress(database.bot.IgnoredUserError): if await self._database.interaction_check(int(after.id)): await self._send_welcome_message(after) await self._users.add_listener(int(after.id), direct=False) # leaving elif before.voice.voice_channel == self._voice_channel and after.voice.voice_channel != self._voice_channel: try: await self._users.remove_listener(int(after.id), direct=False) except ValueError: log.warning('Tried to remove {0} (ID: {0.id}) from listeners but the user was not listed'.format(after))
def backup(self): """Backup files from source directory to destination.""" logzero.logfile("directory_backup_%s.log" % str(d.now().strftime(self._f2))) sep = 50*'-' log.info("#%s" % sep) log.info("The script name is %s" % os.path.basename(__file__)) log.info("The date and time is currently %s" % str(d.now().strftime(self._f1))) with contextlib.suppress(OSError): # TODO Add SameFileError from shutil # TODO Add threading log.error(OSError) copytree(self.sourcedir, self.backupdir) log.info('%s has been backed up.' % self.sourcedir)
def __init__(self, attack=False, interface=False, stdout=None, stderr=None, **kwargs): self.stdout = stdout self.stderr = stderr if not stdout: self.stdout = DEVNULL if not stderr: self.stderr = DEVNULL self.interface = interface if attack not in self._allowed_attacks: raise WrongArgument self.attack = attack extra = tuple() with suppress(AttributeError): extra = getattr(self, "_allowed_arguments_{}".format(attack)) if extra: self._allowed_arguments += extra self._allowed_arguments.append((attack, False)) kwargs[attack] = True super(self.__class__, self).__init__(**kwargs)
def _process_rule(self, rule): path = {} view_func = self.app.view_functions[rule.endpoint] schema = self._extract_schema(view_func) if schema: path['parameters'] = [{ 'in': 'body', 'name': 'payload', 'schema': schema }] with suppress(AttributeError): path['responses'] = view_func.responses add_optional(path, 'description', self._extract_description(view_func)) add_optional( path, 'deprecated', getattr(view_func, 'deprecated', None)) with suppress(AttributeError): path['tags'] = sorted(view_func.tags) return path
def serve_many(workers=1): # thank you sanic workers = min(workers, multiprocessing.cpu_count()) event = multiprocessing.Event() signal(SIGINT, lambda *_: event.set()) signal(SIGTERM, lambda *_: event.set()) processes = [] kwargs = dict(reuse_port=True) for _ in range(workers): # noinspection PyArgumentList process = multiprocessing.Process(target=serve, kwargs=kwargs, daemon=True) process.start() print('Started subprocess:', process.name, process.pid) processes.append(process) with contextlib.suppress(Exception): while not event.is_set(): time.sleep(0.5) [process.terminate() for process in processes] [process.join() for process in processes]
def run(coro, loop=None): async def main_task(): pycurl_task = aio.ensure_future(curl_loop()) try: r = await coro finally: pycurl_task.cancel() with suppress(aio.CancelledError): await pycurl_task return r, pycurl_task if loop is None: loop = uvloop.new_event_loop() # loop = aio.get_event_loop() aio.set_event_loop(loop) loop.set_exception_handler(exception_handler) r, _ = loop.run_until_complete(main_task()) return r
def redbaron_pyfor_to_vhdl(red_node): def modify_for(red_node): # if for range contains call to 'range' -> skip with suppress(Exception): if red_node.target('call')[0].previous.value == 'range': return red_node frange = red_node.target ite = red_node.iterator red_node(ite.__class__.__name__, value=ite.value) \ .map(lambda x: x.replace(f'{frange}[_i_]')) red_node.iterator = '_i_' return red_node fors = red_node.find_all('for') for x in fors: modify_for(x) return red_node
def flush_pipeline(func): """ For inputs: adds 'x.get_delay()' dummy samples, to flush out pipeline values For outputs: removes the first 'x.get_delay()' samples, as these are initial pipeline values""" @wraps(func) def flush_pipeline_wrap(self, *args, **kwargs): delay = 0 with suppress(AttributeError): # no get_delay() delay = self.model._delay if delay == 0: return func(self, *args, **kwargs) args = list(args) for i in range(delay): args.append(args[0]) ret = func(self, *args, **kwargs) ret = ret[delay:] return ret return flush_pipeline_wrap
def field_kwargs(self): kwargs = {'label': capfirst(' '.join(self.property.key.split('_'))), 'help_text': self.property.doc} with suppress(AttributeError): enum_class = self.column.type.enum_class if enum_class is not None: kwargs['enum_class'] = self.column.type.enum_class else: kwargs['choices'] = self.column.type.enums with suppress(AttributeError): kwargs['max_digits'] = self.column.type.precision with suppress(AttributeError): kwargs['decimal_places'] = self.column.type.scale with suppress(AttributeError): kwargs['max_length'] = self.column.type.length kwargs['required'] = not self.column.nullable kwargs['allow_null'] = self.column.nullable return kwargs
def __exit__(self: 'FileChange', *exc) -> None: """Clean up after change -- both success and failure.""" if any(exc): # Failure -- rollback if self.__valid_destination: if self.__file_change: # Ignore missing file on deletion with suppress(FileNotFoundError): self.new_path.unlink() if self.__valid_source: if self.__file_change: self.tmp_path.rename(self.old_path) if self.__store_change: self.source[self.old_file.mod.id] = self.old_file else: # Success -- change metadata if self.__valid_destination: self.destination[self.new_file.mod.id] = self.new_file # Clean temporary file if self.__valid_source and self.__file_change: self.tmp_path.unlink()
def save(self, **kwargs): new = self.pk is None if not new and (self.was_processed or not self.processed): raise TypeError super().save(**kwargs) with suppress(FileExistsError): os.mkdir(os.path.dirname(self._changed_geometries_filename())) from c3nav.mapdata.utils.cache.changes import changed_geometries pickle.dump(changed_geometries, open(self._changed_geometries_filename(), 'wb')) if new: transaction.on_commit( lambda: cache.set('mapdata:last_update', self.to_tuple, 300) ) if settings.HAS_CELERY: transaction.on_commit( lambda: process_map_updates.delay() )
def clean_cut_polygon(polygon: Polygon) -> Polygon: interiors = [] interiors.extend(cut_ring(polygon.exterior)) exteriors = [(i, ring) for (i, ring) in enumerate(interiors) if ring.is_ccw] with suppress(AttributeError): delattr(polygon, 'c3nav_cache') if len(exteriors) != 1: raise ValueError('Invalid cut polygon!') exterior = interiors[exteriors[0][0]] interiors.pop(exteriors[0][0]) for ring in polygon.interiors: interiors.extend(cut_ring(ring)) return Polygon(exterior, interiors)
def shutdown(self, timeout=15.0): """Worker process is about to exit, we need cleanup everything and stop accepting requests. It is especially important for keep-alive connections.""" if self._request_handler is None: return self._closing = True if self._request_count > 1 and not self._reading_request: # force-close idle keep-alive connections self._request_handler.cancel() elif timeout: canceller = self._loop.call_later(timeout, self._request_handler.cancel) with suppress(asyncio.CancelledError): yield from self._request_handler canceller.cancel() else: self._request_handler.cancel()
def upsert_comment_chain(mongo, identifier, recursive=False): """ Upsert given comments and its parent(s). Args: mongo: mongodb instance identifier: Post identifier recursive: (Defaults to False). If True, recursively update all parent comments, incl. root post. """ with suppress(PostDoesNotExist): p = Post(identifier) if p.is_comment(): mongo.Comments.update({'identifier': p.identifier}, p.export(), upsert=True) parent_identifier = '@%s/%s' % (p.parent_author, p.parent_permlink) if recursive: upsert_comment_chain(mongo, parent_identifier, recursive) else: upsert_comment(mongo, parent_identifier) else: return mongo.Posts.update({'identifier': p.identifier}, p.export(), upsert=True)
def load(self, arg): '''Always load all proxies, we'll be able to prune and rearrange later ''' self.master_plist=OrderedDict() if isinstance(arg, str): # File name lg.info('Loading {}'.format(arg)) c=Counter() with suppress(FileNotFoundError), open(arg) as f: rd=reader(f, delimiter='\t') for row in rd: # (proxy, status) status=row[1] if len(row) > 1 else '' c.update([status]) p=proxy(row[0], status) self.master_plist[p.p]=p lg.info('Loaded {} {}'.format(arg, c)) else: # Iterable of proxies for i in arg: p=proxy(i, 'G') # Mark as good self.master_plist[p.p]=p lg.info('Loaded {} proxies from iterable'.format(len(self.master_plist)))
def ensure_latest_consul_release(host=None, port=22): """ Make sure the latest consul release is downloaded on the remote machine :param str host: hostname or ip of the remote machine, or None for the local machine :param int port: port to use to connect to the remote machine over ssh :return None: """ log.info("Ensuring consul release {} is on disk " "on the remote machine".format(CONSUL_RELEASE.split('/')[-1])) with suppress(FileNotFoundError): # remove any previously existing zip in case we tried # before but the zip was corrupted remove(CONSUL_RELEASE.split('/')[-1]) wget( CONSUL_RELEASE, host=host, port=port, failure_message="Failed to retrieve {}".format( CONSUL_RELEASE.split('/')[-1] ) )
def run_app(app, port, loop): handler = app.make_handler(access_log=None, loop=loop) loop.run_until_complete(app.startup()) server = loop.run_until_complete(loop.create_server(handler, HOST, port)) try: loop.run_forever() except KeyboardInterrupt: # pragma: no branch pass finally: logger.info('shutting down server...') server.close() loop.run_until_complete(server.wait_closed()) loop.run_until_complete(app.shutdown()) with contextlib.suppress(asyncio.TimeoutError, KeyboardInterrupt): loop.run_until_complete(handler.shutdown(0.1)) with contextlib.suppress(asyncio.TimeoutError, KeyboardInterrupt): loop.run_until_complete(app.cleanup()) with contextlib.suppress(KeyboardInterrupt): loop.close()
def loop(): loop = asyncio.new_event_loop() asyncio.set_event_loop(None) executor = concurrent.futures.ThreadPoolExecutor() loop.set_default_executor(executor) yield loop if loop.is_closed: pending = asyncio.Task.all_tasks(loop=loop) for task in pending: task.cancel() # Now we should await task to execute it's cancellation. # Cancelled task raises asyncio.CancelledError that we can suppress with suppress(asyncio.CancelledError): loop.run_until_complete(task) executor.shutdown() loop.close() gc.collect() asyncio.set_event_loop(None)
def apply(self): """Creates or deletes the autostart file based upon current state""" if not self.props.sensitive: # We never got the current state return if self.props.active and not self._was_enabled: logging.info('Creating autostart file') source = Gio.File.new_for_uri('resource:///se/tingping/Trg/se.tingping.Trg.service.desktop') if hasattr(source, 'copy_async'): # TODO: Fix upstream in GLib source.copy_async(self.autostart_file, Gio.FileCopyFlags.NONE, GLib.PRIORITY_DEFAULT) else: with suppress(GLib.Error): source.copy(self.autostart_file, Gio.FileCopyFlags.NONE) elif not self.props.active and self._was_enabled: logging.info('Deleting autostart file') self.autostart_file.delete_async(GLib.PRIORITY_DEFAULT)
def data_mapper(mapper: Callable[[types.DataType], types.DataType]) -> Callable[[types.ListenerType], types.ListenerType]: ''' Apply a mapper to the listener's `data` argument. ''' def decorator(listener: types.ListenerType) -> types.ListenerType: @functools.wraps(listener) def wrapped(dtype: types.DiffType, index: int, data: types.DataType) -> None: listener(dtype, index, mapper(data)) with contextlib.suppress(AttributeError): wrapped.finalize_batch = listener.finalize_batch return wrapped return decorator
def insert_reviews(neighborhood, db): listings_cursor = db['listings_' + neighborhood].find({"reviews_count": {"$gt": 0}}) listings = [listing for listing in listings_cursor] collection = db['reviews_' + neighborhood] for listing in listings: reviews_cursor = db.reviews_en.find({'listing_id':listing['_id']}) reviews = [review for review in reviews_cursor] if reviews: with suppress(Exception): collection.insert_many(reviews, ordered=True) # try: # collection.insert_many(reviews, ordered=True) # except: # print(reviews) # raise
def remove(self, node): self._nodes.remove(node) for n in iter(self._nodes): with contextlib.suppress(KeyError): n.edges.remove(node)
def _get_name_from_func(func, other): if not isinstance(func, types.LambdaType): with contextlib.suppress(AttributeError): return func.__qualname__ with contextlib.suppress(AttributeError): return func.__name__ return other
def resolve_self_references(self, rules): """Resolves `$self` references to actual application name in security group rules.""" with suppress(KeyError): rule = rules.pop('$self') rules[self.app_name] = rule return rules
def __getattr__(self, name): if name != '__setstate__': # this avoids an infinite loop when pickle looks for the # __setstate__ attribute before the object is initialized with suppress(KeyError): return self._mapping[name] raise AttributeError("%r object has no attribute %r" % (type(self).__name__, name))
def remove_project(self): self.close_project() with contextlib.suppress(FileNotFoundError): self.project_path.unlink()
def cleanup(ctx): pv = Povray(ctx.config.povray.parent_dir) with contextlib.suppress(FileNotFoundError): pv.shelve_db_path.unlink() pv.scitools_udb_path.unlink()
def byro_information(request): ctx = {'config': Configuration.get_solo()} try: ctx['url_name'] = resolve(request.path_info).url_name except Http404: ctx['url_name'] = '' if settings.DEBUG: ctx['development_warning'] = True with suppress(Exception): import subprocess ctx['byro_version'] = subprocess.check_output(['git', 'describe', '--always']) return ctx
def delete(self, sender): if self.control.filename != '': (name, _) = os.path.splitext(os.path.basename(self.control.filename)) try: console.alert('Delete', name, button1='Ok') except KeyboardInterrupt: return os.remove(self.control.filename) with contextlib.suppress(FileNotFoundError): os.remove(self.control.filename[:-4]+'.json') self.control.set_default_map() else: self.control.quit_map(sender)
def match(v1, v2, nomatch=-1, incomparables=None, start=0): """ Return a vector of the positions of (first) matches of its first argument in its second. Parameters ---------- v1: array-like the values to be matched v2: array-like the values to be matched against nomatch: int the value to be returned in the case when no match is found. incomparables: array-like a list of values that cannot be matched. Any value in v1 matching a value in this list is assigned the nomatch value. start: int type of indexing to use. Most likely 0 or 1 """ lookup = {} for i, x in enumerate(v2): if x not in lookup: lookup[x] = i lst = [nomatch] * len(v1) skip = set(incomparables) if incomparables else set() for i, x in enumerate(v1): if x in skip: continue with suppress(KeyError): lst[i] = lookup[x] + start return lst
def suppress(*exceptions): """ Return a context manager that suppresses any of the specified exceptions if they occur in the body of a with statement and then resumes execution with the first statement following the end of the with statement. """ try: yield except exceptions: pass
def get_valid_kwargs(func, potential_kwargs): """ Return valid kwargs to function func """ kwargs = {} for name in get_kwarg_names(func): with suppress(KeyError): kwargs[name] = potential_kwargs[name] return kwargs
def init_dbs(sqls): # graceful initialization tries to create new tables as a test to see if this is a new DB or not for s in sqls: with suppress(sqlite3.OperationalError): cursor.execute(s)
def __setitem__(self, key, val): """Allow using [] syntax to save a keyvalue. - It is case-insensitive, so it will overwrite a key which only differs by case. """ if isinstance(val, bool): val = '1' if val else '0' key_fold = key.casefold() for k in self.keys: if k.casefold() == key_fold: # Check case-insensitively for this key first orig_val = self.keys.get(k) self.keys[k] = str(val) break else: orig_val = self.keys.get(key) self.keys[key] = str(val) # Update the by_class/target dicts with our new value if key_fold == 'classname': with suppress(KeyError): self.map.by_class[orig_val].remove(self) self.map.by_class[val].add(self) elif key_fold == 'targetname': with suppress(KeyError): self.map.by_target[orig_val].remove(self) self.map.by_target[val].add(self)
def ignore_exceptions_2(): data = None import contextlib with contextlib.suppress(FileNotFoundError, PermissionError): with open(DATA_FILE) as f: data = f.read() return data