我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用typing.cast()。
def discardSomeCard(self) -> bool:
    """Pick one card from the hand by ``maybeValue`` and discard it.

    The hand is scanned left to right. While the currently selected card has
    no ``maybeValue`` the selection simply moves on to the card being
    visited; once the selected card has one, it is only replaced by a card
    whose ``maybeValue`` is strictly greater.

    Returns:
        Always ``True``.
    """
    chosen: int = 0
    for position, deck_index in enumerate(self.hand):
        candidate = cast(CardKnowledge, self.game.deck[deck_index])
        incumbent = cast(CardKnowledge, self.game.deck[self.hand[chosen]])
        if incumbent.maybeValue is None:
            # Nothing known about the current pick: move on to this slot.
            chosen = position
            continue
        if candidate.maybeValue is None:
            continue
        if candidate.maybeValue > incumbent.maybeValue:
            chosen = position
    self.discard_card(chosen)
    return True
def require_bool(value: Optional[Union[bool, str, int]], convert: bool=False, allow_none: bool=False) -> Any:
    """Validate that a value is a boolean, optionally converting it.

    Used when dealing with http input data.

    Args:
        value: The incoming value.
        convert: When true, the values ``None``, ``0``, ``'0'``, ``'false'``
            and ``'False'`` become ``False`` and ``1``, ``'1'``, ``'true'``
            and ``'True'`` become ``True``.
        allow_none: When true, ``None`` is returned unchanged.

    Returns:
        The (possibly converted) boolean, or ``None`` if that was allowed.

    Raises:
        InvalidData: If the value is not a bool and conversion is disabled
            or the value is not one of the recognised spellings.
    """
    if allow_none and value is None:
        return value
    if isinstance(value, bool):
        return cast(bool, value)
    if not convert:
        raise InvalidData()
    if value in [None, 0, '0', 'false', 'False']:
        return cast(bool, False)
    if value in [1, '1', 'true', 'True']:
        return cast(bool, True)
    raise InvalidData('value was %s(%s), expected bool' % (type(value), value))
def require_dict(value: Optional[Dict[Any, Any]], key_type: Any=None, value_type: Any=None, allow_none: bool=False) -> Any:
    """Make sure a value is a Dict[key_type, value_type].

    Used when dealing with http input data.

    Args:
        value: The incoming value.
        key_type: If given, the exact type every key must have.
        value_type: If given, the exact type every value must have.
        allow_none: When true, ``None`` is returned unchanged.

    Returns:
        The validated dict, or ``None`` if that was allowed.

    Raises:
        InvalidData: If the value is not exactly a ``dict`` or a key/value
            has the wrong type.
    """
    if value is None and allow_none:
        return value
    # Exact type comparison is deliberate: subclasses of dict are rejected.
    if type(value) != dict:
        raise InvalidData('value was %s(%s), expected dict' % (type(value), value))
    value = cast(Dict, value)
    if key_type or value_type:
        for k, v in value.items():
            if key_type and type(k) != key_type:
                raise InvalidData('dict key was %s(%s), expected %s' % (type(k), k, key_type))
            if value_type and type(v) != value_type:
                # Report the expected *value* type (this used to print key_type).
                raise InvalidData('dict value was %s(%s), expected %s' % (type(v), v, value_type))
    return value
async def _get_monitor_metadata(self, dbcon: DBConnection) -> Optional[Dict[int, Dict[str, str]]]:
    """Collect metadata for the monitors selected by the current request.

    Metadata is only loaded when the ``include_metadata`` request parameter
    converts to a true value. The query string decides which monitors are
    covered, checked in this order: ``id``, ``meta_key`` (together with
    ``meta_value``), ``monitor_group_id``. If none of them is present,
    metadata for every ``active_monitor`` object is fetched.

    The body awaits database helpers, so this must be a coroutine function.

    Args:
        dbcon: Database connection handed to the metadata helpers.

    Returns:
        ``None`` when metadata was not requested, otherwise a dict mapping
        each object id to a ``{key: value}`` dict of its metadata.
    """
    include_metadata = require_bool(
        get_request_param(self.request, 'include_metadata', error_if_missing=False),
        convert=True) or False
    if not include_metadata:
        return None
    query = self.request.rel_url.query
    if 'id' in query:
        metadata_models = await metadata.get_metadata_for_object(
            dbcon, 'active_monitor', require_int(cast(str, get_request_param(self.request, 'id'))))
    elif 'meta_key' in query:
        meta_key = require_str(get_request_param(self.request, 'meta_key'))
        meta_value = require_str(get_request_param(self.request, 'meta_value'))
        metadata_models = await metadata.get_metadata_for_object_metadata(
            dbcon, meta_key, meta_value, 'active_monitor', 'active_monitors')
    elif 'monitor_group_id' in query:
        metadata_models = await monitor_group.get_active_monitor_metadata_for_monitor_group(
            dbcon, require_int(cast(str, get_request_param(self.request, 'monitor_group_id'))))
    else:
        metadata_models = await metadata.get_metadata_for_object_type(dbcon, 'active_monitor')
    # Group the flat list of metadata rows by the object they belong to.
    metadata_dict = {}  # type: Dict[int, Dict[str, str]]
    for metadata_model in metadata_models:
        if metadata_model.object_id not in metadata_dict:
            metadata_dict[metadata_model.object_id] = {}
        metadata_dict[metadata_model.object_id][metadata_model.key] = metadata_model.value
    return metadata_dict
def _sort_dataset_by_padding(dataset: Dataset,
                             sorting_keys: List[Tuple[str, str]],  # pylint: disable=invalid-sequence-index
                             padding_noise: float = 0.0) -> Dataset:
    """
    Returns a new ``Dataset`` whose ``Instances`` are ordered by their padding lengths.

    ``sorting_keys`` is a list of ``(field_name, padding_key)`` tuples; the lengths
    they select are compared in the order given.  When ``padding_noise`` is positive,
    every field's lengths are first passed through ``add_noise_to_dict_values``.
    """
    keyed_instances = []
    for instance in dataset.instances:
        lengths = cast(Dict[str, Dict[str, float]], instance.get_padding_lengths())
        if padding_noise > 0.0:
            lengths = {field_name: add_noise_to_dict_values(field_lengths, padding_noise)
                       for field_name, field_lengths in lengths.items()}
        sort_key = [lengths[field_name][padding_key]
                    for (field_name, padding_key) in sorting_keys]
        keyed_instances.append((sort_key, instance))
    # Only the list of lengths takes part in the comparison, never the instance.
    ordered = sorted(keyed_instances, key=lambda pair: pair[0])
    return Dataset([pair[-1] for pair in ordered])
def get_remaining_shared_breaks_this_week(group_members: Set[User]) -> List[Break]:
    """
    Finds this week's remaining common breaks between a group of users
    """
    # mypy treats `List` as invariant: a `List[B]` is not accepted where a
    # `List[A]` is expected, even if B is a subclass of A. The value is
    # therefore cast on the way into `get_this_weeks_events` ...
    # FIXME: Get rid of these casts once the typing of the helpers allows it
    shared = get_shared_breaks(group_members)
    now = datetime.now(BRISBANE_TIME_ZONE)
    remaining = get_this_weeks_events(now, cast(List[Event_], shared))
    # ... and cast back on the way out.
    return cast(List[Break], remaining)


# FIXME: Make 'request_status' an enum: https://docs.python.org/3/library/enum.html
def create_file_in_config_folder(self, filename: str, mode: int=None) -> TextIO:
    """
    :param filename: the name of the file in the generated config folder
    :param mode: pass an ``int`` here if you want to modify the files mode (will be umasked);
                 a falsy value (``None`` or ``0``) leaves the mode untouched
    :return: an open file descriptor (``TextIO``) object that the *caller must call `.close()` on*
    :raises InvalidArgumentException: if ``filename`` names an existing file as seen from the
                                      current working directory
    """
    if os.path.isfile(filename):
        raise InvalidArgumentException("Call create_file_in_config_folder with a filename, not a path")

    self.ensure_config_folder()
    path = os.path.join(self.configfolder, filename)
    f = cast(TextIO, io.open(path, mode="wt", encoding="utf-8"))

    if mode:
        try:
            os.chmod(path, get_umasked_mode(mode))
        except Exception:
            # The caller never receives the handle on failure, so close it
            # here instead of leaking the open file.
            f.close()
            raise

    return f
def validate_args(self, args: configargparse.Namespace) -> None:
    """Validate the parsed command-line arguments for this store.

    Runs the shared Aptly argument validation, checks that the selected
    version action is supported by the Debian version parser, warns about
    ``-distribution`` in the publish options and, when the Aptly wrapper is
    requested, checks that the vault wrapper script exists and is
    executable before remembering its path on ``self.aptly_wrapper_cmd``.

    :raises ErrorMessage: on an unsupported version action or an unusable
                          vault wrapper
    """
    _aptly_args.validate_shared_args(args)

    from gopythongo.versioners import get_version_parsers
    debian_parser = cast(DebianVersionParser, get_version_parsers()["debian"])  # type: DebianVersionParser
    action_supported = args.version_action in debian_parser.supported_actions
    if not action_supported:
        raise ErrorMessage("Version Action is set to '%s', but you chose the Aptly Store which relies on Debian "
                           "version strings. Unfortunately the Debian Versioner does not support the '%s' action. "
                           "It only supports: %s." %
                           (highlight(args.version_action), highlight(args.version_action),
                            highlight(", ".join(debian_parser.supported_actions))))

    if "-distribution" in args.aptly_publish_opts:
        print_warning("You are using %s in your Aptly Store options. You should use the %s GoPythonGo argument "
                      "instead, since using -distribution in the aptly command line is invalid when GoPythonGo "
                      "tries to update a published repo." %
                      (highlight("-distribution"), highlight("--aptly-distribution")))

    if not args.use_aptly_wrapper:
        return

    wrapper_cmd = create_script_path(the_context.gopythongo_path, "vaultwrapper")
    if not (os.path.exists(wrapper_cmd) and os.access(wrapper_cmd, os.X_OK)):
        raise ErrorMessage("%s can either not be found or is not executable. The vault wrapper seems to "
                           "be unavailable." % wrapper_cmd)
    self.aptly_wrapper_cmd = wrapper_cmd
def _get_commands(dist  # type: setuptools.dist.Distribution
                  ):
    # type: (...) -> typing.Dict[str, typing.Set[str]]
    """Collect the commands defined by a distribution's Python files.

    Every ``.py`` file reported by ``setuptools.findall()`` whose package
    name is listed in ``dist.packages`` is parsed, and the commands found at
    module, class and function level are merged into one mapping.

    Args:
        dist: The Distribution to search for docopt-compatible docstrings
            that can be used to generate command entry points.

    Returns:
        A dictionary containing a mapping of primary commands to sets of
        subcommands.
    """
    commands = {}  # type: typing.Dict[str, typing.Set[str]]
    for file_name in setuptools.findall():
        if os.path.splitext(file_name)[1].lower() != '.py':
            continue
        if _get_package_name(file_name) not in dist.packages:
            continue
        with open(file_name) as py_file:
            module = typing.cast(ast.Module, ast.parse(py_file.read()))
        module_name = _get_module_name(file_name)
        # Same extraction order as before: module, then class, then function.
        _append_commands(commands, module_name, _get_module_commands(module))
        _append_commands(commands, module_name, _get_class_commands(module))
        _append_commands(commands, module_name, _get_function_commands(module))
    return commands
def filter_all_or_404(model: t.Type[Y], *criteria: t.Any) -> t.Sequence[Y]:
    """Return every object of ``model`` that matches the given criteria.

    This is a typed wrapper around :py:func:`_filter_or_404` called with its
    second argument set to ``True``.

    .. note::
        ``Y`` is bound to :py:class:`psef.models.Base`, so it should be a
        SQLAlchemy model.

    :param model: The object to get.
    :param criteria: The criteria to filter with.
    :returns: The requested objects.

    :raises APIException: If no object with the given id could be found.
        (OBJECT_ID_NOT_FOUND)
    """
    matches = _filter_or_404(model, True, criteria)
    return t.cast(t.Sequence[Y], matches)
def filter_single_or_404(model: t.Type[Y], *criteria: t.Any) -> Y:
    """Return one object of ``model`` matching the criteria, or raise.

    This is a typed wrapper around :py:func:`_filter_or_404` called with its
    second argument set to ``False``.

    .. note::
        ``Y`` is bound to :py:class:`psef.models.Base`, so it should be a
        SQLAlchemy model.

    :param model: The object to get.
    :param criteria: The criteria to filter with.
    :returns: The requested object.

    :raises APIException: If no object with the given id could be found.
        (OBJECT_ID_NOT_FOUND)
    """
    match = _filter_or_404(model, False, criteria)
    return t.cast(Y, match)
async def gen_update_pent_dynamic(
    context: PentContext,
    obj_id: UUID,
    pent_cls_name: str,
    data_cls_name: str,
    payload_cls_name: str,
    data: PentMutationData
) -> PentMutationPayload:
    """Update a pent whose classes are looked up by name at call time.

    The data, pent and payload classes are resolved through
    ``context.cls_from_name``. ``data`` is checked against the resolved data
    class before ``update_pent`` is awaited, which is why this must be a
    coroutine function.

    Returns:
        An instance of the resolved payload class built from the updated pent.
    """
    data_cls = context.cls_from_name(data_cls_name)
    check.isinst(data, data_cls)

    pent_cls = context.cls_from_name(pent_cls_name)
    payload_cls = context.cls_from_name(payload_cls_name)

    pent = await update_pent(context, pent_cls, obj_id, data)
    return cast(PentMutationPayload, payload_cls(pent))
async def gen_operation(self, graphql_text: str, operation: str, *args: GraphQLArg) -> dict:
    """Build a GraphQL operation around ``graphql_text`` and execute it.

    Each argument contributes a ``$name: type`` declaration to the operation
    header and a ``name -> value`` entry to the variables dict. The query is
    run with ``exec_in_mem_graphql``, which is awaited, so this must be a
    coroutine function.

    Args:
        graphql_text: The selection placed between the braces.
        operation: The operation keyword placed before the argument list.
        *args: Arguments that unpack to ``(name, arg_type, value)`` and
            expose ``.name`` and ``.value``.

    Returns:
        ``result.data`` of the execution. If ``result.errors`` is truthy,
        ``_process_error`` is called with the result first.
    """
    # NOTE(review): with no args the header becomes "<operation> () " —
    # confirm that an empty argument list is acceptable to the executor.
    arg_strings = [
        "${name}: {arg_type}".format(name=name, arg_type=arg_type)
        for name, arg_type, _value in args
    ]
    arg_list = ', '.join(arg_strings)

    full_query = (
        '{operation} ({arg_list}) '.format(arg_list=arg_list, operation=operation)
        + '{' + graphql_text + '}'
    )

    arg_dict = {arg.name: arg.value for arg in args}
    result = await (
        exec_in_mem_graphql(
            self.graphql_schema, self.context, full_query, self.root_value, arg_dict
        )
    )
    if result.errors:
        _process_error(result)
    return cast(dict, result.data)
def render(self, grid: Grid, **kwargs: Any) -> None:
    """Print the grid as box-drawing text.

    The first line is the top border. After it, every row contributes two
    lines: the cell contents separated by east walls, and the south walls
    joined by junction glyphs. ``kwargs`` is accepted but not used.
    """
    horizontal_wall = "\u2501"
    vertical_wall = "\u2503"
    wall_span = horizontal_wall * 3

    pieces = [self.JUNCTIONS[12]]
    for column in range(grid.columns - 1):
        top_cell = cast(Cell, grid.cell_at(row=0, column=column))
        pieces.append(wall_span + self.get_topmost_junction(top_cell))
    pieces.append(wall_span + self.JUNCTIONS[10] + "\n")

    for row in grid.each_row():
        top_line = vertical_wall
        bottom_line = self.get_leftmost_junction(row[0])
        for cell in row:
            body = grid.contents_of(cell)
            if cell.linked_to(cell.east):
                east_boundary = " "
            else:
                east_boundary = vertical_wall
            top_line += body + east_boundary
            # NOTE(review): the open south passage is a single space while the
            # wall is three glyphs wide — confirm against the original source.
            if cell.linked_to(cell.south):
                south_boundary = " "
            else:
                south_boundary = wall_span
            bottom_line += south_boundary + self.get_south_east_junction(cell)
        pieces.append(top_line + "\n")
        pieces.append(bottom_line + "\n")

    print("".join(pieces))
def feed_dict(self, dataset: Dataset, train: bool = False) -> FeedDict:
    """Build the feed dict for one dataset.

    The train-mode placeholder is always fed. When the dataset provides the
    ``self.data_id`` series, its sentences are converted by the vocabulary
    and the transposed vectors and paddings are fed as targets and weights.
    """
    feed = {}  # type: FeedDict

    series = dataset.get_series(self.data_id, allow_none=True)
    sentences = cast(Iterable[List[str]], series)

    feed[self.train_mode] = train
    if sentences is None:
        return feed

    vectors, paddings = self.vocabulary.sentences_to_tensor(
        list(sentences), pad_to_max_len=False, train_mode=train)
    feed[self.train_targets] = vectors.T
    feed[self.train_weights] = paddings.T
    return feed
def feed_dict(self, dataset: Dataset, train: bool = False) -> FeedDict:
    """Build the feed dict for one dataset.

    When the dataset provides the ``self.data_id`` series, its sentences are
    converted by the vocabulary (limited by ``self.max_output_len``) and the
    first label tensor is fed to the first ground-truth input. The
    train-mode placeholder is always fed.
    """
    series = dataset.get_series(self.data_id, allow_none=True)
    sentences = cast(Iterable[List[str]], series)
    sentences_list = None if sentences is None else list(sentences)

    feed = {}  # type: FeedDict
    if sentences is not None:
        label_tensors, _ = self.vocabulary.sentences_to_tensor(
            sentences_list, self.max_output_len)

        # pylint: disable=unsubscriptable-object
        feed[self.gt_inputs[0]] = label_tensors[0]
        # pylint: enable=unsubscriptable-object

    feed[self.train_mode] = train
    return feed
def _get_series_outputs(series_config: SeriesConfig) -> Dict[str, str]:
    """Get paths to series outputs from the dataset keyword argument specs.

    Output file for a series named 'xxx' is specified by parameter
    's_xxx_out'. Keys are recognised with the ``SERIES_OUTPUT`` pattern and
    the series name is its first group.

    Arguments:
        series_config: A dictionary containing the dataset keyword
           argument specs.

    Returns:
        A dictionary which maps series names to the paths for their output
        files.

    Raises:
        ValueError: If an output path is not a string.
    """
    outputs = {}
    for key, value in series_config.items():
        matcher = SERIES_OUTPUT.match(key)
        if not matcher:
            continue
        name = matcher.group(1)
        if isinstance(value, str):
            outputs[name] = cast(str, value)
            continue
        raise ValueError(
            "Output path for '{}' series must be a string, was {}.".
            format(name, type(value)))
    return outputs
def _preprocessed_datasets(
        dataset: Dataset,
        series_config: SeriesConfig) -> None:
    """Apply dataset-level preprocessing.

    For every config key matched by ``PREPROCESSED_SERIES`` the configured
    preprocessor is applied. On a ``Dataset`` the result is materialised and
    added as a new series; on a ``LazyDataset`` the preprocessor is only
    registered in ``preprocess_series``.
    """
    # Snapshot of the keys, taken before any preprocessor runs.
    for key in list(series_config.keys()):
        matched = PREPROCESSED_SERIES.match(key)
        if not matched:
            continue
        name = matched.group(1)
        preprocessor = cast(DatasetPreprocess, series_config[key])

        # NOTE(review): if LazyDataset derives from Dataset, the second
        # branch can never be reached — confirm the class hierarchy.
        if isinstance(dataset, Dataset):
            new_series = list(preprocessor(dataset))
            dataset.add_series(name, new_series)
        elif isinstance(dataset, LazyDataset):
            dataset.preprocess_series[name] = (None, preprocessor)
def __init__(self,
             output_series: str,
             encoder: Stateful,
             used_session: int = 0) -> None:
    """Initialize the representation runner.

    Args:
        output_series: Name of the output series with vectors.
        encoder: Used encoder. It must also be a ``ModelPart``.
        used_session: Id of the TensorFlow session used in case of model
            ensembles.

    Raises:
        TypeError: If the encoder is not an instance of ``ModelPart``.
    """
    check_argument_types()

    encoder_is_model_part = isinstance(encoder, ModelPart)
    if not encoder_is_model_part:
        raise TypeError("The encoder of the representation runner has to "
                        "be an instance of 'ModelPart'")

    model_part = cast(ModelPart, encoder)
    BaseRunner.__init__(self, output_series, model_part)

    self._used_session = used_session  # type: int
    self._encoded = encoder.output  # type: tf.Tensor


# pylint: disable=unused-argument
def _handle_haves(self, message_id: MessageType, payload: memoryview):
    """Record which pieces the peer announces via "have" / "bitfield".

    A ``have`` payload is unpacked as one big-endian unsigned 32-bit piece
    index. A ``bitfield`` payload is length-checked and read as a big-endian
    bit array; every set bit below the piece count marks the peer as owner
    of that piece. Other message ids are ignored.

    Raises:
        ValueError: If a bit beyond the piece count is set in a bitfield.
    """
    if message_id == MessageType.have:
        (index,) = struct.unpack('!I', cast(bytes, payload))
        self._mark_as_owner(index)
        return

    if message_id == MessageType.bitfield:
        piece_count = self._download_info.piece_count
        expected_len = int(ceil(piece_count / 8))
        PeerTCPClient._check_payload_len(message_id, payload, expected_len)

        bits = bitarray(endian='big')
        bits.frombytes(payload.tobytes())
        for piece_index in range(piece_count):
            if bits[piece_index]:
                self._mark_as_owner(piece_index)
        # The padding bits after the last piece are checked only after the
        # owners have been marked, as before.
        if any(bits[spare] for spare in range(piece_count, len(bits))):
            raise ValueError('Spare bits in "bitfield" message must be zero')

    # if self._download_info.complete and self.is_seed():
    #     raise SeedError('A seed is disconnected because a download is complete')
async def _handle_requests(self, message_id: MessageType, payload: memoryview):
    """Handle a "request" or "cancel" message from the peer.

    The payload is unpacked as three big-endian unsigned 32-bit integers
    (piece index, begin, length) and range-checked. For a request the block
    is sent and the connection drained, unless we are choking the peer, the
    peer is not interested, or the piece is not downloaded. The send is
    awaited, so this must be a coroutine function.

    Raises:
        ValueError: If a request asks for more than
            ``PeerTCPClient.MAX_REQUEST_LENGTH`` bytes.
    """
    piece_index, begin, length = struct.unpack('!3I', cast(bytes, payload))
    request = BlockRequest(piece_index, begin, length)
    self._check_position_range(request)

    if message_id == MessageType.request:
        if length > PeerTCPClient.MAX_REQUEST_LENGTH:
            raise ValueError('Requested {} bytes, but the current policy allows to accept requests '
                             'of not more than {} bytes'.format(length, PeerTCPClient.MAX_REQUEST_LENGTH))
        if (self._am_choking or not self._peer_interested or
                not self._download_info.pieces[piece_index].downloaded):
            # If peer isn't interested but requesting, their peer_interested flag wasn't considered
            # when selecting who to unchoke, so we may be not ready to upload to them.
            # If requested piece is not downloaded yet, we shouldn't disconnect because our piece_downloaded flag
            # could be removed because of file corruption.
            return

        await self._send_block(request)
        await self.drain()
    elif message_id == MessageType.cancel:
        # Now we answer to a request immediately or reject and forget it,
        # so there's no need to handle cancel messages
        pass
def add(self, labels: LabelsType, value: NumericValueType) -> None:
    ''' Add will add the given value to the counter.

    A missing entry for ``labels`` counts as 0.

    :raises: ValueError if the value is negative.
             Counters can only increase.
    '''
    increment = cast(Union[float, int], value)  # typing check, no runtime behaviour.
    if increment < 0:
        raise ValueError("Counters can't decrease")

    try:
        existing = self.get_value(labels)
    except KeyError:
        existing = 0

    total = cast(Union[float, int], existing) + increment  # typing check, no runtime behaviour.
    self.set_value(labels, total)
def add(self, labels: LabelsType, value: NumericValueType) -> None:
    ''' Add adds a single observation to the summary.

    The quantile estimator for ``labels`` is created and stored on first
    use.

    :raises: TypeError if the value's type is not exactly int or float.
    '''
    observation = cast(Union[float, int], value)  # typing check, no runtime behaviour.
    if type(observation) not in (float, int):
        raise TypeError("Summary only works with digits (int, float)")

    try:
        estimator = self.get_value(labels)
    except KeyError:
        # Initialize quantile estimator
        estimator = quantile.Estimator(*self.invariants)
        self.set_value(labels, estimator)

    estimator.observe(float(observation))  # type: ignore

# https://prometheus.io/docs/instrumenting/writing_clientlibs/#summary
# A summary MUST have the ``observe`` methods
def get(self, labels: LabelsType) -> Dict[Union[float, str], NumericValueType]:
    ''' Get gets a dict of values, containing the sum, count and percentiles,
    matching an arbitrary group of labels.

    :raises: KeyError if an item with matching labels is not present.
    '''
    estimator = cast(Any, self.get_value(labels))  # typing check, no runtime behaviour.
    snapshot = {}  # type: Dict[Union[float, str], NumericValueType]

    # One entry per configured invariant (default to 0.50, 0.90 and 0.99)
    for invariant in estimator._invariants:  # type: ignore
        rank = invariant._quantile
        snapshot[rank] = estimator.query(rank)  # type: ignore

    # Sum and count go in last
    snapshot[self.SUM_KEY] = estimator._sum  # type: ignore
    snapshot[self.COUNT_KEY] = estimator._observations  # type: ignore
    return snapshot
def get(self, labels: LabelsType) -> Dict[Union[float, str], NumericValueType]:
    ''' Get gets a dict of values, containing the sum, count and buckets,
    matching an arbitrary group of labels.

    :raises: KeyError if an item with matching labels is not present.
    '''
    stored = cast(histogram.Histogram, self.get_value(labels))  # typing check, no runtime behaviour.

    # Bucket upper bounds (floats) map to their cumulative counts.
    snapshot = dict(stored.buckets.items())  # type: Dict[Union[float, str], NumericValueType]

    # Sum and count go in last
    snapshot[self.SUM_KEY] = stored.sum
    snapshot[self.COUNT_KEY] = stored.observations
    return snapshot
def handle(self, config, out, force, wait, post_hook, **options):
    """Fetch the server's PKI and store the certificates.

    With ``wait`` set, the server is re-fetched every five seconds for as
    long as its CSR is pending. Unless ``force`` is set, the tag obtained
    from ``config`` is sent along so that an unchanged PKI can be reported
    as not modified. One status line is printed to ``self.stdout``.
    """
    server_id, auth_token = self._require_credentials(config)

    server = Server.retrieve(server_id, auth_token)
    while wait and server.csr_pending:
        time.sleep(5)
        server = Server.retrieve(server_id, auth_token)

    tag = None if force else self._get_tag(config)
    result = server.get_pki(tag)

    if result is PKI.NOT_MODIFIED:
        print("Not modified. Pass -f to download anyway.", file=self.stdout)
        return

    pki = cast(PKI, result)
    if pki.entity is None:
        print("No certificate available. Request one with req.", file=self.stdout)
        return

    self._handle_pki(result, config, out, post_hook)
    print("Certificates saved to {}.".format(out), file=self.stdout)
def __init__(self, source: Union['SourceTable', 'SourceQuery', 'File'], target: Union[str, Table], auto_map: bool = True, filter='', ignore_fields: List[str] = []) -> None:
    """Set up a mapping from a source to a target table.

    A ``File`` source contributes its file name; a ``SourceTable`` or
    ``SourceQuery`` is stored as ``source_table``. The target may be a table
    name or a ``Table`` whose ``name`` is used. ``ignore_fields`` is
    lower-cased into a new list (the default list is never mutated), and
    automatic mappings are created when ``auto_map`` is true.
    """
    # todo transformations
    if isinstance(source, File):
        self.file_name = source.file_name
    elif isinstance(source, (SourceTable, SourceQuery)):
        self.source_table = source

    if isinstance(target, str):
        self.sor_table = str(target)  # type: str
    else:
        self.sor_table = cast(Table, target).name

    super().__init__(source, target, filter)
    self.temp_table = self.sor_table.replace('_hstage', '') + '_temp'

    # todo keys
    self.keys = []  # type: List[str]
    lowered_fields = [field_name.lower() for field_name in ignore_fields]
    self.ignore_fields = lowered_fields
    # self.field_mappings = [] #type: List[FieldMapping]
    self.auto_map = auto_map
    if auto_map:
        self.create_auto_mappings(source, lowered_fields)
def test_10_send_response_json(self) -> None:
    """Sending ``self.data`` through the JSON processor yields exactly that dict."""
    self._restart_data('json')

    RequestProcessorJSON(self.recvbuffer)._send_response(cast(Response, self.data))

    results = self._loadResults('json')
    self.assertEqual(len(results), 1)
    self.assertDictEqual(self.data, results[0])

# process request already tested with TestPythonDriverBase
def updateEyesightCount(self) -> None:
    """Recount the cards in all players' hands into ``self.eyesightCount``.

    The table has one six-slot list per color. A card is counted under its
    ``suit``/``rank`` when both are set, otherwise under its
    ``color``/``value`` when both of those are set.
    """
    self.eyesightCount = {c: [0] * 6 for c in self.colors}
    for player in self.game.players:
        for deck_index in player.hand:
            known = cast(CardKnowledge, self.game.deck[deck_index])
            if known.suit is not None and known.rank is not None:
                self.eyesightCount[known.suit][known.rank] += 1
                continue
            if known.color is not None and known.value is not None:
                self.eyesightCount[known.color][known.value] += 1
def updateLocatedCount(self) -> bool:
    '''Recount cards in hands whose color and value are both set.

    Returns True if the new table differs from ``self.locatedCount`` (which
    is then replaced), False otherwise.'''
    tally: Dict[Color, List[int]] = {c: [0] * 6 for c in self.colors}
    for player in self.game.players:
        for deck_index in player.hand:
            known = cast(CardKnowledge, self.game.deck[deck_index])
            if known.color is None or known.value is None:
                continue
            tally[known.color][known.value] += 1

    if tally != self.locatedCount:
        self.locatedCount = tally
        return True
    return False
def handState(self, player: int, showCritical: bool=True) -> List[HandState]:
    """Classify every card in a player's hand.

    Each slot starts as ``Unclued`` and takes the first state that applies:
    ``Worthless`` (``worthless`` or ``playWorthless``), ``Playable``,
    ``Saved`` (``valuable``), ``SoonPlay`` (``clued``). For cards matching
    none of these, and only when ``showCritical`` is set and ``player`` is
    not this bot's own position, ``Critical`` / ``Critical2`` may be set.
    """
    hand = self.game.players[player].hand
    states: List[HandState] = [HandState.Unclued] * len(hand)

    for slot, deck_index in enumerate(hand):
        card = cast(CardKnowledge, self.game.deck[deck_index])
        if card.worthless is True:
            states[slot] = HandState.Worthless
        elif card.playWorthless is True:
            states[slot] = HandState.Worthless
        elif card.playable is True:
            states[slot] = HandState.Playable
        elif card.valuable is True:
            states[slot] = HandState.Saved
        elif card.clued:
            states[slot] = HandState.SoonPlay
        elif showCritical and player != self.position:
            if self.isValuable(card.suit, card.rank):
                states[slot] = HandState.Critical
            elif card.rank == Value.V2 and self.is2Valuable(card.suit):
                states[slot] = HandState.Critical2
    return states
def isCluedElsewhere(self, player: int, hand: int) -> bool:
    """Check whether another card already accounts for the given hand card.

    The reference card is the one in slot ``hand`` of ``player``'s hand; its
    ``suit`` and ``rank`` are compared against every other card held by any
    player (the reference card itself is skipped by deck position).

    Returns ``True`` as soon as a matching card is found. Otherwise the
    result is ``False``, or ``None`` when one of this bot's own clued cards
    is pinned to the color but cannot be the value (or pinned to the value
    but cannot be the color).

    NOTE(review): the annotation says ``bool`` but ``None`` can be returned;
    callers appear to rely on a three-state result — confirm before changing.
    """
    returnVal: bool = False
    cardIdx: int = self.game.players[player].hand[hand]
    handcard: CardKnowledge
    handcard = cast(CardKnowledge, self.game.deck[cardIdx])
    color: Color = handcard.suit
    value: Value = handcard.rank
    p: Player
    c: int
    card: CardKnowledge
    for p in self.game.players:
        for c in p.hand:
            card = cast(CardKnowledge, self.game.deck[c])
            # Never compare the reference card with itself.
            if card.deckPosition == handcard.deckPosition:
                continue
            if p is self:
                # Own cards: only the deduction helpers are consulted.
                if card.mustBeColor(color):
                    if card.mustBeValue(value):
                        return True
                    if card.cannotBeValue(value) and card.clued:
                        returnVal = None
                elif card.mustBeValue(value):
                    if card.cannotBeColor(color) and card.clued:
                        returnVal = None
            else:
                # Other players' cards: compare suit/rank of clued cards
                # first, then color/value.
                if (card.clued
                        and card.suit == color
                        and card.rank == value):
                    return True
                elif card.color == color and card.value == value:
                    return True
    return returnVal
def cluedCard(self, color: Color, value: Value, player: Optional[int]=None, strict: bool=False, maybe: bool=False) -> Optional[int]:
    """Find the deck position of a card known as ``color``/``value``.

    Players are visited in order and the first matching card wins:

    * the player whose position equals ``player`` is skipped when
      ``strict`` is set; otherwise their cards match on ``color``/``value``
      (and, if that player is this bot and ``maybe`` is set, also on
      ``maybeColor``/``maybeValue``);
    * this bot's own cards match on ``color``/``value`` or, with ``maybe``,
      on ``maybeColor``/``maybeValue``;
    * any other player's cards match only when clued and their
      ``suit``/``rank`` equal the requested pair.

    Returns the ``deckPosition`` of the first match, or ``None``.

    NOTE(review): the nesting of the ``if p is self`` check in the first
    branch was reconstructed from a collapsed line — confirm against the
    original source.
    """
    p: Player
    c: int
    card: CardKnowledge
    for p in self.game.players:
        if player == p.position:
            if strict:
                continue
            # When it is the player, assume fully tagged cards as clued too
            for c in p.hand:
                card = cast(CardKnowledge, self.game.deck[c])
                if card.color == color and card.value == value:
                    return card.deckPosition
                if p is self:
                    if (maybe and card.maybeColor == color
                            and card.maybeValue == value):
                        return card.deckPosition
        elif p is self:
            for c in p.hand:
                card = cast(CardKnowledge, self.game.deck[c])
                if card.color == color and card.value == value:
                    return card.deckPosition
                if (maybe and card.maybeColor == color
                        and card.maybeValue == value):
                    return card.deckPosition
        else:
            for c in p.hand:
                card = cast(CardKnowledge, self.game.deck[c])
                if (card.clued
                        and card.suit == color
                        and card.rank == value):
                    return card.deckPosition
    return None
def doesCardMatchHand(self, deckIdx: int) -> bool:
    """Check whether a clued card in this bot's hand could be the given card.

    The deck card must have a known ``suit`` and ``rank`` (asserted). The
    answer is ``False`` right away when that suit is complete or the rank is
    ``Value.V5``. Otherwise each clued hand card that is not worthless, not
    excluded by ``cantBe`` and not already fully identified is examined: it
    matches when its known color equals the suit and its value could still
    be the rank, or when its known value equals the rank and its color could
    still be the suit.
    """
    deckCard: CardKnowledge
    deckCard = cast(CardKnowledge, self.game.deck[deckIdx])
    assert deckCard.suit is not None
    assert deckCard.rank is not None
    if self.colorComplete[deckCard.suit]:
        return False
    if deckCard.rank == Value.V5:
        return False
    h: int
    for h in self.hand:
        card: CardKnowledge
        card = cast(CardKnowledge, self.game.deck[h])
        if not card.clued:
            continue
        if card.worthless or card.playWorthless:
            continue
        if card.cantBe[deckCard.suit][deckCard.rank]:
            continue
        # Fully identified cards are skipped.
        if card.color is not None and card.value is not None:
            continue
        if card.color == deckCard.suit:
            maybeValue: Optional[Value] = card.maybeValue
            if maybeValue is not None:
                if maybeValue == deckCard.rank:
                    return True
                # A differing maybeValue ends the checks for this card.
                continue
            if deckCard.rank in card.possibleValues:
                return True
        if card.value == deckCard.rank:
            maybeColor: Optional[Color] = card.maybeColor
            if maybeColor is not None:
                if maybeColor == deckCard.suit:
                    return True
                # A differing maybeColor ends the checks for this card.
                continue
            if deckCard.suit in card.possibleColors:
                return True
    return False
def pleaseObserveBeforeDiscard(self, from_: int, card_index: int, deckIdx: int) -> None:
    """Mark the deck card as discarded and register it as publicly seen.

    ``from_`` and ``card_index`` are accepted but not used here.
    """
    discarded = cast(CardKnowledge, self.game.deck[deckIdx])
    discarded.state = CardState.Discard
    self.seePublicCard(discarded.suit, discarded.rank)
def srand(seed=0):
    # type: (KeyType) -> typing.Generator[int, None, None]
    """Yield an endless stream of pseudo-random integers from a seed.

    Text and byte seeds are reduced to an integer via the SHA-512 hex
    digest of the seed (text is UTF-8 encoded first); any other seed is
    handed to ``random.Random`` unchanged. Each value lies between 0 and
    ``sys.maxsize`` inclusive.
    """
    is_textual = isinstance(seed, six.string_types) or isinstance(seed, bytes)
    if is_textual:
        raw = seed.encode('utf-8') if isinstance(seed, six.text_type) else seed
        digest_hex = hashlib.sha512(raw).hexdigest()
        seed = typing.cast(int, int(digest_hex, 16))

    rng = random.Random(seed)
    while True:
        yield rng.randint(0, sys.maxsize)
def configure_logging(logtype: str, logfilename: Optional[str]=None, debug_logging: bool=False,
                      rotate_length: int=1000000, max_rotated_files: int=250) -> None:
    """Configure the module-level ``irisett`` logger.

    Args:
        logtype: One of ``'stdout'``, ``'syslog'`` or ``'file'``.
        logfilename: Target file for the ``'file'`` log type.
        debug_logging: Log at DEBUG instead of INFO level.
        rotate_length: Maximum size in bytes before the log file is
            rotated; ``None`` falls back to 1000000.
        max_rotated_files: Number of rotated files to keep; ``None`` falls
            back to 250.

    Raises:
        errors.IrisettError: If ``logtype`` is not a known log type.

    Each call adds one more handler to the logger.
    """
    global logger
    level = logging.INFO
    if debug_logging:
        level = logging.DEBUG
    if logtype not in ['stdout', 'syslog', 'file']:
        raise errors.IrisettError('invalid logtype name %s' % logtype)
    if rotate_length is None:
        rotate_length = 1000000
    if max_rotated_files is None:
        max_rotated_files = 250
    logger = logging.getLogger('irisett')
    logger.setLevel(level)

    if logtype == 'stdout':
        handler = logging.StreamHandler()  # type: Any
        handler.setLevel(level)
        formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    elif logtype == 'syslog':
        handler = logging.handlers.SysLogHandler(address='/dev/log')
        handler.setLevel(level)
        formatter = logging.Formatter('%(name)s - %(levelname)s - %(message)s')
    else:  # == file
        logfilename = cast(str, logfilename)
        logpath = os.path.split(logfilename)[0]
        # A bare file name has an empty directory part; os.makedirs('')
        # would raise, so only create the directory when there is one.
        if logpath:
            os.makedirs(logpath, exist_ok=True)
        handler = logging.handlers.RotatingFileHandler(logfilename, maxBytes=rotate_length,
                                                       backupCount=max_rotated_files)
        handler.setLevel(level)
        formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    handler.setFormatter(formatter)
    logger.addHandler(handler)
def require_list(value: Optional[List[Any]], item_type: Any=None, allow_none: bool=False) -> Any:
    """Validate that a value is a List[item_type].

    Used when dealing with http input data.

    Args:
        value: The incoming value.
        item_type: If given, the exact type every item must have.
        allow_none: When true, ``None`` is returned unchanged.

    Returns:
        The validated list, or ``None`` if that was allowed.

    Raises:
        InvalidData: If the value is not exactly a ``list`` or an item has
            the wrong type.
    """
    if allow_none and value is None:
        return value
    if type(value) != list:
        raise InvalidData('value was %s, expected list' % type(value))
    items = cast(List, value)
    if not item_type:
        return items
    for item in items:
        if type(item) != item_type:
            raise InvalidData('list item was %s, expected %s' % (type(item), item_type))
    return items
def require_int(value: Optional[Union[SupportsInt, str, bytes]], allow_none: bool=False) -> Any:
    """Make sure a value is an int.

    Used when dealing with http input data.

    Args:
        value: The incoming value; anything ``int()`` accepts is converted.
        allow_none: When true, ``None`` is returned unchanged.

    Returns:
        The converted int, or ``None`` if that was allowed.

    Raises:
        InvalidData: If the value cannot be converted to an int.
    """
    if value is None and allow_none:
        return value
    value = cast(Union[SupportsInt, str, bytes], value)
    try:
        value = int(value)
    except (ValueError, TypeError):
        # The message used to say "expected list", copied from require_list.
        raise InvalidData('value was %s(%s), expected int' % (type(value), value))
    return value
async def update_monitor(self) -> web.Response:
    """Apply the updates in the request body to the requested monitor.

    The JSON body may contain ``args`` (a dict), ``checks_enabled`` and
    ``alerts_enabled`` (bools); only the keys present are applied. The body
    is read and the monitor updated with ``await``, so this must be a
    coroutine function.

    Returns:
        A JSON response containing ``True``.
    """
    request_data = await self.request.json()
    monitor = self._get_request_monitor(self.request)
    if 'args' in request_data:
        args = cast(Dict[str, str], require_dict(request_data['args']))
        await monitor.update_args(args)
    if 'checks_enabled' in request_data:
        await monitor.set_checks_enabled_status(cast(bool, require_bool(request_data['checks_enabled'])))
    if 'alerts_enabled' in request_data:
        await monitor.set_alerts_enabled_status(cast(bool, require_bool(request_data['alerts_enabled'])))
    return web.json_response(True)
def _get_request_monitor(self, request: web.Request) -> ActiveMonitor:
    """Look up the monitor named by the request's ``id`` parameter.

    Raises:
        errors.NotFound: If the manager has no (truthy) monitor stored
            under that id.
    """
    raw_id = cast(str, get_request_param(request, 'id'))
    monitor_id = require_int(raw_id)
    known_monitors = request.app['active_monitor_manager'].monitors
    found = known_monitors.get(monitor_id, None)
    if found:
        return found
    raise errors.NotFound()
async def get(self) -> web.Response:
    """Return the contacts of the monitor given by ``monitor_id``.

    With ``include_all`` in the query string the contacts come from
    ``get_all_contacts_for_active_monitor``, otherwise from
    ``get_contacts_for_active_monitor``. Both lookups are awaited, so this
    must be a coroutine function.

    Returns:
        A JSON response with the contacts converted by
        ``object_models.list_asdict``.
    """
    monitor_id = cast(int, require_int(get_request_param(self.request, 'monitor_id')))
    if 'include_all' in self.request.rel_url.query:
        contacts = await get_all_contacts_for_active_monitor(self.request.app['dbcon'], monitor_id)
    else:
        # NOTE(review): this branch converts with asdict() and the result is
        # converted again with list_asdict() below — confirm this is intended.
        contacts = object_models.asdict(
            await get_contacts_for_active_monitor(self.request.app['dbcon'], monitor_id)
        )
    ret = object_models.list_asdict(contacts)
    return web.json_response(ret)
async def post(self) -> web.Response:
    """Attach a contact to an active monitor.

    ``contact_id`` and ``monitor_id`` are read from the JSON body and
    validated with ``require_int``. The body is read and the link stored
    with ``await``, so this must be a coroutine function.

    Returns:
        A JSON response containing ``True``.
    """
    request_data = await self.request.json()
    await add_contact_to_active_monitor(
        self.request.app['dbcon'],
        cast(int, require_int(request_data.get('contact_id'))),
        cast(int, require_int(request_data.get('monitor_id'))))
    return web.json_response(True)