我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 typing.Tuple。
def get_diff_with_color(expected: str, ans: str) -> Tuple[str, str]:
    """Build coloured renderings of a character-level diff of two strings.

    The strings are compared element by element with ``difflib.Differ``.
    Entries prefixed with ``-`` are appended (in green) to the first result,
    entries prefixed with ``+`` are appended (in red) to the second result,
    and every other entry is appended uncoloured to both.

    :param expected: the reference string
    :param ans: the string to compare against the reference
    :return: ``(expected_with_mistake, ans_with_mistake)``
    """
    expected_parts = []
    ans_parts = []
    for entry in difflib.Differ().compare(expected, ans):
        # The last character of a diff entry is the compared character itself.
        marker, char = entry[:1], entry[-1]
        if marker == "+":
            ans_parts.append(colored(char, "red"))
        elif marker == "-":
            expected_parts.append(colored(char, "green"))
        else:
            expected_parts.append(char)
            ans_parts.append(char)
    return "".join(expected_parts), "".join(ans_parts)
def get_processor_instance(format_: str, custom_inbuffer: InBuffer=None,
                           custom_outbuffer: OutBuffer=None) -> Tuple[Any, Any]:
    """
    Build a processor for ``format_`` from the ``ProcessorConfigs`` mapping.

    The configuration entry supplies the processor class and the default
    buffers. A truthy ``custom_inbuffer`` / ``custom_outbuffer`` replaces the
    configured buffer, which is mainly useful for unit testing.

    :raises RequestInstantiationException: no configuration exists for ``format_``
    :return: ``(processor instance, input buffer)``
    """
    config = ProcessorConfigs.get(format_)
    if not config:
        raise RequestInstantiationException('No RequestProcessor found for format %s' % format_)

    # Falsy custom buffers fall back to the configured defaults.
    inbuffer = custom_inbuffer or config['inbuffer']
    outbuffer = custom_outbuffer or config['outbuffer']
    processor = config['class'](outbuffer)  # type: ignore
    return processor, inbuffer
def __init__(
        self,
        description: str = None,
        pre_hooks: (List, Tuple) = None,
        post_hooks: (List, Tuple) = None
):
    """Initialise the instance with a description and optional hook collections.

    All result/request related attributes start as ``None``; ``meta`` starts
    as an empty dict.
    """
    # Attributes that only get a ``None`` placeholder here.
    for attribute in ('result', 'total', 'success', 'errors', 'params', 'output',
                      'pagination', 'limit', 'offset', 'app', 'settings'):
        setattr(self, attribute, None)

    self.description = description
    self.pre_hooks = pre_hooks
    self.post_hooks = post_hooks
    self.meta = {}
def _load_word_freq(self, threshold: int) -> Tuple[Dict[str, int], int]:
    """Read word frequencies from the file at ``self.rnnlm_model_path``.

    Every line must have the form ``<word> <freq>`` with exactly one space.
    Words whose frequency exceeds ``threshold`` are stored with that
    frequency; every other line adds one to the ``'<unk/>'`` entry.

    :param threshold: frequencies must be strictly greater than this to be kept
    :return: ``(word -> frequency mapping, number of lines read)``
    """
    frequencies = {}
    line_count = 0
    with open(self.rnnlm_model_path, mode='r') as model_file:
        for line_count, entry in enumerate(model_file, 1):
            word, raw_freq = entry.split(' ')
            count = int(raw_freq)
            if count > threshold:
                frequencies[word] = count
            else:
                frequencies['<unk/>'] = frequencies.get('<unk/>', 0) + 1
    return (frequencies, line_count)
def extra_penalty(self, style, complexity):
    # type: (Style, int) -> Tuple[int, int]
    """Return an adjusted ``(complexity, penalty)`` pair for ``style``.

    Trying longer and longer column limits without getting better results
    should be penalized to speed up the search: each option that exceeds its
    standard value adds the excess to the penalty.
    """
    baseline = {'ColumnLimit': 80, 'MaxEmptyLinesToKeep': 2}
    penalty = 0
    for name, standard in baseline.items():
        current = style.get(name, standard)
        if current is not None and current > standard:
            penalty += current - standard

    if style.get('BreakBeforeBraces') != 'Custom':
        return complexity, penalty

    # Rate a commonly known brace breaking style better than an equally
    # performing custom style (+1 penalty). We would prefer an equally
    # performing style even if we had to add another 12 options.
    return complexity + 12, penalty + 1
def mget(self, keys):
    # type: (List[str]) -> List[Optional[bytes]]
    """Fetch the values for ``keys`` in two steps.

    The parent class ``mget`` is asked for a content key per requested key;
    the content keys that are not ``None`` are then looked up in
    ``self.kvstore``. The result has one entry per key in request order, with
    ``None`` wherever the parent lookup returned ``None``.
    """
    if not keys:
        return []

    hits = []
    misses = []  # type: List[Tuple[int, Optional[bytes]]]
    contentkeys = super(DedupKeyValueStore, self).mget(keys)
    for position, contentkey in enumerate(contentkeys):
        if contentkey is None:
            misses.append((position, None))
            continue
        hits.append((position, unistr(binary_type(contentkey))))

    if not hits:
        return [None] * len(misses)

    positions, existing_keys = zip(*hits)
    existing_values = self.kvstore.mget(existing_keys)
    # Sorting by position restores the order of the original request.
    merged = sorted(misses + list(zip(positions, existing_values)))
    return [value for _, value in merged]
def split_reffiles(references, filenames):
    # type: (bool, List[str]) -> Tuple[List[str], List[str]]
    """Separate an interleaved file list into files and reference files.

    With ``references`` true, ``[file1, reffile1, file2, reffile2]`` becomes
    ``([file1, file2], [reffile1, reffile2])``.
    With ``references`` false the pair ``(filenames, filenames)`` is returned.
    """
    if not references:
        return filenames, filenames

    assert len(filenames) % 2 == 0
    pairs = list(grouper(2, filenames))
    files = [filename for filename, _ in pairs]
    refs = [reffilename for _, reffilename in pairs]
    return files, refs

# ----------------------------------------------------------------------
# Functions to convert ANSI text into HTML.
def update_evaluations(formatter,  # type: CodeFormatter
                       evaluations,  # type: List[AttemptResult]
                       finished_styles,  # type: List[AttemptResult]
                       bestdist  # type: Sequence[int]
                       ):
    # type: (...) -> Tuple[bool, bool, Sequence[int]]
    """Process the smallest entry of the ``evaluations`` heap.

    Returns ``(finished, nested_round, bestdist)``:

    * If the popped attempt beats ``bestdist`` (or there is none yet) it is
      pushed back and its distance becomes the new best distance.
    * Otherwise the attempt is moved to ``finished_styles``. If the formatter
      has no nested derivations for it, ``finished`` is True. If it has,
      ``evaluations`` is replaced in place by seed attempts for every nested
      style, ``nested_round`` is True and the best distance is reset to None.
    """
    candidate = heapq.heappop(evaluations)

    improved = bestdist is None or (
        distquality(candidate.distance) < distquality(bestdist))
    if improved:
        heapq.heappush(evaluations, candidate)
        return False, False, candidate.distance

    # We found a style that could no longer be improved by adding a single
    # option value.
    heapq.heappush(finished_styles, candidate)
    nested_styles = formatter.nested_derivations(candidate.formatstyle)
    if not nested_styles:
        # This formatstyle does not unlock more options.
        return True, False, bestdist

    # Restart the optimization from scratch with the attempt augmented with
    # every nested option as seed styles.
    seed_distance = (HUGE_DISTANCE, HUGE_DISTANCE, HUGE_DISTANCE, HUGE_DISTANCE)
    evaluations[:] = [AttemptResult(seed_distance, nested) for nested in nested_styles]
    return False, True, None
def avg_linelength_diffs(diffargs):
    # type: (List[Tuple[str, bytes]]) -> Iterable[int]
    """Yield the nudged absolute line length difference for each pair.

    For every ``(filename1, content2)`` pair the average line length of the
    cached file and of ``content2`` is computed (0.0 when there are no lines)
    and the absolute difference, scaled by 10000 and truncated to int, is
    yielded.
    """
    for filename1, content2 in diffargs:
        lines1 = get_num_lines(filename1)
        size1 = len(get_cached_file(filename1))
        average1 = float(size1) / lines1 if lines1 > 0 else 0.0

        lines2 = count_content_lines(content2)
        size2 = len(content2)
        average2 = float(size2) / lines2 if lines2 > 0 else 0.0

        yield int(abs(10000.0 * (average1 - average2)))
def unified_diff(filename, content2=None):
    # type: (str, Optional[bytes]) -> Tuple[int, Iterable[str]]
    """Compute a unified diff between the file ``filename`` and ``content2``.

    When ``content2`` is None the second content is read from standard input
    and the diff is also written out, e.g. from the command line:

      echo 123 > d.txt ; echo 456 | ./whatstyle.py --stdindiff d.txt

    We get this result:
    ---
    +++
    @@ -1 +1 @@
    -123
    +456

    Returns the ``(exit_code, diff)`` pair from ``compute_unified_diff``.
    """
    # NOTE(review): in the stdin case the diff is joined for output before it
    # is returned; if it is a one-shot iterator the caller receives it
    # already consumed - confirm this is intended.
    from_stdin = content2 is None
    if from_stdin:
        # Read binary input stream
        econtent2 = bytestr(rawstream(sys.stdin).read())
    else:
        econtent2 = content2

    exit_code, diff = compute_unified_diff(filename, econtent2, lineterm='')
    if from_stdin:
        write('\n'.join(diff))
    return exit_code, diff
def compute_unified_diff(filename, content2, **kwargs):
    # type: (str, bytes, **Any) -> Tuple[int, Iterable[str]]
    """Diff the cached content of ``filename`` against ``content2``.

    Extra keyword arguments are forwarded to ``difflib.unified_diff``; unless
    the caller passes ``n``, zero context lines are used.

    Returns ``(OK, diff)`` on success. If anything fails while reading or
    decoding the contents, ``(ERROR, ())`` is returned instead of raising.
    """
    diff = ()  # type: Iterable[str]
    exit_code = ERROR
    kw = kwargs.copy()
    if 'n' not in kwargs:
        # zero context lines
        kw['n'] = 0
    try:
        content1 = get_cached_file(filename)
        if PY3:
            c1 = unistr(content1)
            c2 = unistr(content2)
        else:
            c1 = content1
            c2 = content2
        diff = difflib.unified_diff(c1.splitlines(True), c2.splitlines(True), **kw)
        exit_code = OK
    except Exception:
        # Deliberate best effort: failures are reported through the ERROR
        # exit code. Unlike a ``return`` inside ``finally``, this no longer
        # swallows KeyboardInterrupt / SystemExit.
        pass
    return exit_code, diff

# ---------------------------------------------------------------------
# Spare the user from specifying a formatter by finding a suitable one.
def bin_stats(predictions: tf.Tensor, labels: tf.Tensor) -> Tuple[tf.Tensor, tf.Tensor, tf.Tensor]:
    """
    Calculate f1, precision and recall from binary classification expected and predicted values.

    Both inputs are cast to int32 and the counts are summed along axis 1.

    :param predictions: 2-d tensor (batch, predictions) of predicted 0/1 classes
    :param labels: 2-d tensor (batch, labels) of expected 0/1 classes
    :return: a tuple of batched (f1, precision and recall) values
    """
    def count_per_row(mask):
        # Number of True entries per batch row.
        return tf.reduce_sum(tf.cast(mask, tf.int32), axis=1)

    predicted = tf.cast(predictions, tf.int32)
    expected = tf.cast(labels, tf.int32)

    true_positives = tf.reduce_sum((predicted * expected), axis=1)
    false_positives = count_per_row(tf.greater(predicted, expected))
    false_negatives = count_per_row(tf.greater(expected, predicted))

    recall = true_positives / (true_positives + false_negatives)
    precision = true_positives / (true_positives + false_positives)
    # Harmonic mean of precision and recall.
    f1_score = 2 / (1 / precision + 1 / recall)
    return f1_score, precision, recall
def _check_edges(self, edges: List[Tuple[str, str]]) -> None:
    """Validate ``(parent, child)`` edges against the configuration space.

    :raises ValueError: if a child or parent name is not a known
        hyperparameter, or if adding the edges to a temporary DAG creates a
        cycle.
    """
    known = self._hyperparameters
    for parent_node, child_node in edges:
        # check if both nodes are already inserted into the graph
        if child_node not in known:
            raise ValueError("Child hyperparameter '%s' not in configuration "
                             "space." % child_node)
        if parent_node not in known:
            raise ValueError("Parent hyperparameter '%s' not in configuration "
                             "space." % parent_node)

    # TODO: recursively check everything which is inside the conditions,
    # this means we have to recursively traverse the condition
    tmp_dag = self._create_tmp_dag()
    for parent_node, child_node in edges:
        tmp_dag.add_edge(parent_node, child_node)

    if ConfigSpace.nx.is_directed_acyclic_graph(tmp_dag):
        return

    # Sort inside and across cycles so the reported list is ordered.
    cycles = sorted(
        sorted(cycle) for cycle in ConfigSpace.nx.simple_cycles(tmp_dag)
    )  # type: List[List[str]]
    raise ValueError("Hyperparameter configuration contains a "
                     "cycle %s" % str(cycles))
def randomly_grouped_by(key_from_example: Callable[[LabeledExample], Any], training_share: float = .9) -> Callable[
    [List[LabeledExample]], Tuple[List[LabeledExample], List[LabeledExample]]]:
    """Create a splitter that assigns whole groups to training or test.

    Examples are grouped by ``key_from_example``; a random sample of
    ``int(training_share * number_of_groups)`` group keys forms the training
    set and all examples of the remaining groups form the test set.

    :param key_from_example: maps an example to its group key
    :param training_share: fraction of groups used for training
    :return: function mapping examples to ``(training_examples, test_examples)``
    """
    def split(examples: List[LabeledExample]) -> Tuple[List[LabeledExample], List[LabeledExample]]:
        examples_by_directory = group(examples, key=key_from_example)
        # random.sample requires a sequence; passing a dict key view raises
        # TypeError on Python 3.11+, so materialise the keys first.
        directories = list(examples_by_directory.keys())

        # split must be the same every time:
        random.seed(42)
        keys = set(random.sample(directories, int(training_share * len(directories))))

        training_examples = [example for example in examples if key_from_example(example) in keys]
        test_examples = [example for example in examples if key_from_example(example) not in keys]
        return training_examples, test_examples

    return split
def _transform(self, source_type: Type[S], target_type: Type[T]) -> Tuple[Callable[[S], T], int]:
    """Build a transformer from ``source_type`` to ``target_type``.

    The cheapest path through ``self._type_graph`` (edge weight ``"cost"``) is
    looked up and the transformers along it are chained.

    :raises NoConversionError: if the graph lookup raises ``KeyError`` or
        ``NetworkXNoPath``
    :return: ``(callable, total cost)``; ``(_identity, 0)`` when the path
        contains no edges
    """
    names = dict(source_type=source_type.__name__, target_type=target_type.__name__)
    try:
        LOGGER.info('Searching type graph for shortest path from "{source_type}" to "{target_type}"'.format(**names))
        path = dijkstra_path(self._type_graph, source=source_type, target=target_type, weight="cost")
        LOGGER.info('Found a path from "{source_type}" to "{target_type}"'.format(**names))
    except (KeyError, NetworkXNoPath):
        raise NoConversionError(
            "Pipeline can't convert \"{source_type}\" to \"{target_type}\"".format(
                source_type=source_type, target_type=target_type))

    LOGGER.info('Building transformer chain from "{source_type}" to "{target_type}"'.format(**names))
    chain = []
    cost = 0
    for source, target in _pairwise(path):
        transformer = self._type_graph.adj[source][target][_TRANSFORMER]
        chain.append((transformer, target))
        cost += transformer.cost
    LOGGER.info('Built transformer chain from "{source_type}" to "{target_type}"'.format(**names))

    if chain:
        return partial(_transform, transformer_chain=chain), cost
    return _identity, 0
def _best_transform_from(self, source_type: Type[S], target_types: Iterable[Type]) -> Tuple[Callable[[S], Any], Type, int]:
    """Pick the cheapest conversion from ``source_type`` to any target type.

    Targets for which ``self._transform`` raises ``NoConversionError`` are
    skipped. Only conversions cheaper than ``_MAX_TRANSFORM_COST`` qualify;
    on equal cost the earlier target wins.

    :raises NoConversionError: if no target qualifies
    :return: ``(transform, chosen target type, cost)``
    """
    best = None
    to_type = None
    best_cost = _MAX_TRANSFORM_COST
    for candidate in target_types:
        try:
            transform, cost = self._transform(source_type, candidate)
        except NoConversionError:
            continue
        if cost < best_cost:
            best, best_cost, to_type = transform, cost, candidate

    if best is None:
        raise NoConversionError(
            "Pipeline can't convert \"{source_type}\" to any of \"{target_types}\"".format(
                source_type=source_type, target_types=target_types))
    return best, to_type, best_cost
def _split_text_to_lines_and_columns(
        cls, text) -> Iterable[Tuple[int, int, str]]:
    """Yield ``(line_number, column, chunk)`` triples for ``text``.

    Each line (numbered from 1) produces two 40-character wide chunks:
    column 1 holds characters 0-39 and column 2 characters 40-79, both padded
    with spaces on the right.

    Because this is a generator, the checks run when iteration starts.

    :raises ValueError: more than ``cls._MAX_LINES`` lines, or a line longer
        than ``cls._MAX_LINE_LENGTH`` characters
    """
    lines = text.splitlines()
    line_count = len(lines)
    if line_count > cls._MAX_LINES:
        raise ValueError(
            'Max {} specification lines allowed, got {}'
            .format(cls._MAX_LINES, line_count))

    for line_number, line_text in enumerate(lines, 1):
        length = len(line_text)
        if length > cls._MAX_LINE_LENGTH:
            raise ValueError(
                'Specification lines must be max {} chars long, '
                'got {}: {!r}'
                .format(cls._MAX_LINE_LENGTH, length, line_text))
        for column, start in ((1, 0), (2, 40)):
            yield (line_number, column, '{:40}'.format(line_text[start:start + 40]))
def _encrypt_key(
        self,
        key: bytes,
        pubkey: bytes = None
) -> Tuple[bytes, bytes]:
    """
    Encrypts the `key` provided for the provided `pubkey` using the ECIES
    schema. If no `pubkey` is provided, it uses `self.pub_key`.

    A symmetric key is encapsulated for `pubkey` and then used to encrypt
    `key`.

    :param key: Key to encrypt
    :param pubkey: Public Key to encrypt the `key` for

    :return (encrypted key, encapsulated ECIES key)
    """
    pubkey = pubkey or self.pub_key

    # Fixed typo: the API function is ``ecies_encapsulate``.
    symm_key, enc_symm_key = API.ecies_encapsulate(pubkey)
    enc_key = API.symm_encrypt(symm_key, key)
    return (enc_key, enc_symm_key)
def gen_path_keys(
        self,
        path: bytes
) -> List[Tuple[bytes, bytes]]:
    """
    Derives one key pair for every subpath of `path`.

    :param path: Path to derive key(s) from

    :return: List of (private, public) path key tuples, in subpath order
    """
    def derive(subpath):
        path_priv, path_pub = self._derive_path_key(subpath)
        return (path_priv, path_pub)

    return [derive(subpath) for subpath in self._split_path(path)]
def encrypt(
        self,
        data: bytes,
        pubkey: bytes = None
) -> Tuple[bytes, bytes]:
    """
    Encrypts data with Public key encryption. If no `pubkey` is provided,
    it uses `self.pub_key`.

    :param data: Data to encrypt
    :param pubkey: public key to encrypt for

    :return: (Encrypted data, serialized encapsulated key)
    """
    recipient = pubkey or self.pub_key

    symm_key, encapsulated = API.ecies_encapsulate(recipient)
    ciphertext = API.symm_encrypt(symm_key, data)
    serialized_key = API.elliptic_curve.serialize(encapsulated.ekey)
    return (ciphertext, serialized_key)
def decrypt(
        self,
        enc_data: Tuple[bytes, bytes],
        privkey: bytes = None
) -> bytes:
    """
    Decrypts data using ECIES PKE. If no `privkey` is provided, it uses
    `self.priv_key`.

    :param enc_data: Tuple: (encrypted data, ECIES encapsulated key)
    :param privkey: Private key to decapsulate with

    :return: Decrypted data
    """
    privkey = privkey or self.priv_key
    ciphertext, serialized_key = enc_data

    # Rebuild the encapsulated key object from its serialized form.
    ekey = API.elliptic_curve.deserialize(API.PRE.ecgroup, serialized_key)
    encapsulated = API.umbral.EncryptedKey(ekey=ekey, re_id=None)

    symm_key = API.ecies_decapsulate(privkey, encapsulated)
    return API.symm_decrypt(symm_key, ciphertext)
def _ecies_gen_ephemeral_key(
        recp_pubkey: Union[bytes, elliptic_curve.ec_element]
) -> Tuple[bytes, Tuple[bytes, bytes]]:
    """
    Generates an ephemeral private key and encrypts it for `recp_pubkey`.

    A symmetric key is encapsulated for the recipient and used to encrypt the
    freshly generated ephemeral private key.

    :param recp_pubkey: Recipient's pubkey

    :return: Tuple of the eph_privkey, and a tuple of the encrypted symmetric
             key, and encrypted ephemeral privkey
    """
    symm_key, enc_symm_key = API.ecies_encapsulate(recp_pubkey)
    eph_privkey = API.ecies_gen_priv()
    encrypted_pair = (enc_symm_key, API.symm_encrypt(symm_key, eph_privkey))
    return (eph_privkey, encrypted_pair)
def ecdsa_priv2pub(
        privkey: bytes,
        to_bytes: bool = True
) -> Union[bytes, Tuple[int]]:
    """
    Returns the public component of an ECDSA private key.

    :param privkey: Private key as an int or bytestring
    :param to_bytes: Serialize to bytes or not?

    :return: Byte encoded pubkey (prefixed with the keypair and public key
             marker bytes) or the unserialized result of `privtopub`
    """
    pubkey = privtopub(privkey)
    if not to_bytes:
        return pubkey
    return SIG_KEYPAIR_BYTE + PUB_KEY_BYTE + ecdsa_pub2bytes(pubkey)
def ecdsa_verify(
        v: int,
        r: int,
        s: int,
        msghash: bytes,
        pubkey: Union[bytes, Tuple[int, int]]
) -> bool:
    """
    Takes a v, r, s, a pubkey, and a hash of a message to verify via ECDSA.

    The public key is recovered from the signature and compared with
    `pubkey`; a bytes `pubkey` is decoded with `ecdsa_bytes2pub` first.

    :param v: V of sig
    :param r: R of sig
    :param s: S of sig
    :param msghash: The hashed message to verify
    :param pubkey: Pubkey to validate signature for (bytes or decoded form)

    :rtype: Boolean
    :return: Is the signature valid or not?
    """
    # isinstance (rather than an exact type comparison) also decodes
    # instances of bytes subclasses.
    if isinstance(pubkey, bytes):
        pubkey = ecdsa_bytes2pub(pubkey)

    verify_sig = ecdsa_raw_recover(msghash, (v, r, s))
    # TODO: Should this equality test be done better?
    return verify_sig == pubkey
def ecies_ephemeral_split_rekey(
        privkey_a: Union[bytes, elliptic_curve.ec_element],
        pubkey_b: Union[bytes, elliptic_curve.ec_element],
        min_shares: int,
        total_shares: int
) -> Tuple[List[umbral.RekeyFrag], Tuple[bytes, bytes]]:
    """
    Performs a split-key re-encryption key generation where a minimum
    number of shares `min_shares` are required to reproduce a rekey.
    Will split a rekey into `total_shares`.
    This also generates an ephemeral keypair for the recipient as `pubkey_b`.

    :param privkey_a: Privkey to re-encrypt from
    :param pubkey_b: Public key to re-encrypt for (w/ ephemeral key)
    :param min_shares: Minimum shares needed to reproduce a rekey
    :param total_shares: Total shares to generate from split-rekey gen

    :return: A tuple containing a list of rekey frags, and a `PFrag` holding
             the encrypted ephemeral key data
    """
    # NOTE(review): the second element is a PFrag object, while the return
    # annotation still says Tuple[bytes, bytes] - confirm which is intended.
    eph_privkey, encrypted_parts = _internal._ecies_gen_ephemeral_key(pubkey_b)
    encrypted_key, encrypted_message = encrypted_parts

    kfrags = ecies_split_rekey(privkey_a, eph_privkey, min_shares, total_shares)
    pfrag = PFrag(ephemeral_data_as_bytes=None,
                  encrypted_key=encrypted_key,
                  encrypted_message=encrypted_message)
    return (kfrags, pfrag)
def encrypt(self, data: bytes, pubkey: bytes = None) -> Tuple[bytes, bytes]:
    """
    :data: The data to encrypt. If derived per-subpath, it's a
        symmetric key to use for block ciphers.
    :pubkey: Optional public key to encrypt for. If not given, encrypt
        for ours

    :returns: (ekey, edata) where ekey is needed for recipient to
        reconstruct a DH secret, edata is data encrypted with this DH
        secret. ekey is itself a pair ``(serialized key, None)``.
        The output should be treated as a monolithic ciphertext
        outside of this class
    """
    if pubkey is None:
        recipient = self._pub_key
    else:
        recipient = ec.deserialize(self.pre.ecgroup, pubkey)

    key, ekey = self.pre.encapsulate(recipient)
    box = SecretBox(key)
    serialized_ekey = ec.serialize(ekey.ekey)
    return ((serialized_ekey, None), box.encrypt(data))
def crop_scale(self, dimensions: Tuple[int, int]) -> 'Segment':
    """
    Work on a copy of this segment: crop it when its aspect ratio differs
    from the target and resize it when its dimensions differ.

    Returns
    -------
    A new Segment, cropped and/or scaled as necessary to reach specified dimensions
    """
    result = self.copy()
    target = Dimensions(*dimensions)

    if result.aspect_ratio != target.aspect_ratio:
        # Crop segment to match aspect ratio
        result = result.crop_to_aspect_ratio(target.aspect_ratio)

    if result.dimensions != target:
        # Resize segment to reach final dimensions
        result = result.resize(target)

    return result
def store_message_in_file(self, message: Message) -> Tuple[str, str]:
    """
    Stores a message in a json file. The filename of the file will be the
    current time. Also generates a response file location in which the
    executable may write a response into.

    The file is created in exclusive mode, so an existing file of the same
    name is never overwritten; on a name collision a new timestamp is tried.

    :param message: The message to save
    :return: The location of the stored message json file,
             the location of the response file
    """
    json_data = message.to_dict()

    while True:
        base_path = os.path.join(self.message_dir, str(time.time()))
        message_path = base_path + ".json"
        try:
            # 'x' fails if the file already exists. The earlier check tested
            # the path without the ".json" suffix and so never matched the
            # file that is actually written.
            json_file = open(message_path, 'x')
        except FileExistsError:
            continue
        with json_file:
            json.dump(json_data, json_file)
        return message_path, base_path + "-response.json"

# noinspection PyMethodMayBeStatic
async def run_plugin(executable: str, args: List[str], timeout: int) -> Tuple[str, List[str]]:
    """Run a plugin executable and parse its combined output.

    The body awaits asyncio subprocess calls, so this must be a coroutine
    function; a plain ``def`` containing ``await`` is a SyntaxError.

    :param executable: program to run
    :param args: command line arguments for the program
    :param timeout: NOTE(review): accepted but not used by this function -
        confirm whether a timeout should be enforced
    :raises NagiosError: the executable could not be found
    :raises MonitorFailedError: the exit code is not OK/WARNING/CRITICAL
        (carrying the raw output), or it is CRITICAL (carrying the parsed text)
    :return: ``(text, perf)`` as produced by ``parse_plugin_output``
    """
    run_args = [executable] + args
    try:
        proc = await asyncio.create_subprocess_exec(
            *run_args,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE)
    except FileNotFoundError:
        raise NagiosError('executable not found')

    stdout_data, stderr_data = await proc.communicate()
    std_data = stdout_data + stderr_data
    await proc.wait()

    if proc.returncode not in [STATUS_OK, STATUS_WARNING, STATUS_CRITICAL]:
        raise MonitorFailedError(std_data)
    text, perf = parse_plugin_output(std_data)
    if proc.returncode not in [STATUS_OK, STATUS_WARNING]:
        raise MonitorFailedError(text)
    return text, perf
def callAllMethods(obj: object) -> List[Tuple[str, Any]]:
    """Call every callable attribute of ``obj`` without arguments.

    ``__hash__`` is skipped. For each call:

    * a bool/int result is recorded as ``(name, result)``;
    * a str result is recorded unless it contains ``'0x'``;
    * if the call raises an ``Exception``, ``('except', name)`` is recorded
      unless the name contains ``'0x'``.

    Exceptions that do not derive from ``Exception`` (``KeyboardInterrupt``,
    ``SystemExit``) are no longer swallowed and propagate to the caller.
    """
    results = []  # type: List[Tuple[str, Any]]
    for method in dir(obj):
        if method == '__hash__':
            continue
        if not callable(getattr(obj, method)):
            continue
        try:
            res = getattr(obj, method)()
        except Exception:
            if '0x' not in method:
                results.append(('except', method))
            continue

        if isinstance(res, int):  # bool is a subclass of int
            results.append((method, res))
        elif isinstance(res, str):
            # Ignore anything with 0x in it since memory addresses change
            if '0x' not in res:
                results.append((method, res))
    return results
def image_reading(path: str, resized_size: Tuple[int, int]=None, data_augmentation: bool=False,
                  padding: bool=False) -> Tuple[tf.Tensor, tf.Tensor]:
    """Read, decode and size a single-channel image.

    The second dot-separated piece of ``path`` selects the decoder: JPEG for
    ``'jpg'``, PNG otherwise. The image is optionally augmented and then
    either padded or resized to ``resized_size``.

    :return: ``(image, img_width)``
    """
    # Read image
    image_content = tf.read_file(path, name='image_reader')
    extension = tf.string_split([path], '.').values[1]
    is_jpeg = tf.equal(extension, tf.constant('jpg', dtype=tf.string))

    def decode_jpeg():
        return tf.image.decode_jpeg(image_content, channels=1,
                                    try_recover_truncated=True)  # TODO channels = 3 ?

    def decode_png():
        return tf.image.decode_png(image_content, channels=1)

    image = tf.cond(is_jpeg, true_fn=decode_jpeg, false_fn=decode_png, name='image_decoding')

    # Data augmentation
    if data_augmentation:
        image = augment_data(image)

    if padding:
        # Padding
        with tf.name_scope('padding'):
            image, img_width = padding_inputs_width(image, resized_size,
                                                    increment=CONST.DIMENSION_REDUCTION_W_POOLING)
    else:
        # Resize
        image = tf.image.resize_images(image, size=resized_size)
        img_width = tf.shape(image)[1]

    shape_check = tf.assert_equal(image.shape[:2], resized_size)
    with tf.control_dependencies([shape_check]):
        return image, img_width
def process_(child) -> Tuple[str, datetime]:
    """Convert a child node into a ``(name, value)`` pair.

    The node text is turned into an int when possible. For
    ``my_last_updated`` the value is interpreted as a timestamp; for the
    start/finish/series date fields it is parsed as ``%Y-%m-%d``, falling
    back to ``datetime.fromtimestamp(0)`` when parsing raises ValueError.
    """
    date_fields = ('my_finish_date', "my_start_date", "series_end", "series_start")

    name = child.name
    value = child.get_text()

    # Try converting text to an integer; keep the text if that fails
    try:
        value = int(value)
    except ValueError:
        pass

    if name == "my_last_updated":
        value = datetime.fromtimestamp(float(value))
    elif name in date_fields:
        try:
            value = datetime.strptime(value, "%Y-%m-%d")
        except ValueError:
            value = datetime.fromtimestamp(0)

    return name, value
def _sort_dataset_by_padding(dataset: Dataset,
                             sorting_keys: List[Tuple[str, str]],  # pylint: disable=invalid-sequence-index
                             padding_noise: float = 0.0) -> Dataset:
    """
    Returns a new ``Dataset`` whose instances are ordered by their padding
    lengths, compared in the order given by ``sorting_keys``, a list of
    ``(field_name, padding_key)`` tuples. With ``padding_noise`` above zero,
    noise is added to the lengths before they are used as sort keys.
    """
    decorated = []
    for instance in dataset.instances:
        padding_lengths = cast(Dict[str, Dict[str, float]], instance.get_padding_lengths())
        if padding_noise > 0.0:
            padding_lengths = {
                field_name: add_noise_to_dict_values(field_lengths, padding_noise)
                for field_name, field_lengths in padding_lengths.items()
            }
        sort_key = [padding_lengths[field_name][padding_key]
                    for (field_name, padding_key) in sorting_keys]
        decorated.append((sort_key, instance))

    # Compare on the length lists only, never on the instances themselves.
    decorated.sort(key=lambda pair: pair[0])
    return Dataset([pair[-1] for pair in decorated])
def text_to_instance(self,  # type: ignore
                     question_text: str,
                     passage_text: str,
                     token_spans: List[Tuple[int, int]] = None,
                     answer_texts: List[str] = None,
                     question_tokens: List[Token] = None,
                     passage_tokens: List[Token] = None) -> Instance:
    """Build a reading comprehension ``Instance`` from question and passage.

    Question or passage tokens that are missing (or empty) are produced by
    ``self._tokenizer`` from the corresponding text.
    """
    # pylint: disable=arguments-differ
    question_tokens = question_tokens or self._tokenizer.tokenize(question_text)
    passage_tokens = passage_tokens or self._tokenizer.tokenize(passage_text)
    return util.make_reading_comprehension_instance(question_tokens,
                                                    passage_tokens,
                                                    self._token_indexers,
                                                    passage_text,
                                                    token_spans,
                                                    answer_texts)
def evaluate_result(result: dict, group_order: list) -> Tuple[dict, OrderedDict]:
    """
    Evaluate and describe a complete result dictionary.

    If the result carries a falsy ``'reachable'`` entry, an
    ``UnrateableSiteEvaluation`` and an empty description are returned.
    Otherwise every group of ``group_order`` that is present in ``CHECKS`` is
    evaluated with ``evaluate_group`` and the per-group evaluations and
    descriptions are collected.

    :return: ``(site evaluation, ordered group descriptions)``
    """
    if not result.get('reachable', True):
        return UnrateableSiteEvaluation(), {}

    evaluated_groups = {}
    described_groups = OrderedDict()
    for group in group_order:
        if group in CHECKS:
            evaluation, description = evaluate_group(group, result)
            evaluated_groups[group] = evaluation
            described_groups[group] = description

    return SiteEvaluation(evaluated_groups, group_order), described_groups
def _parse_new_results(previous_results: List[Tuple[list, dict]]) -> tuple:
    """
    Parse previous results, split into raw data, results and errors and merge
    data from multiple test suites.

    List/tuple entries are read as ``(scan_host, test, raw data, result)``;
    a raw data dict yields one record per identifier, tagged with identifier,
    scan host and test, and a result dict is merged into the combined result.
    Every other entry is collected as an error.

    :return: ``(raw, result, errors)``
    """
    raw = []
    result = {}
    errors = []
    for entry in previous_results:
        if not isinstance(entry, (list, tuple)):
            errors.append(entry)
            continue

        scan_host, test = entry[0], entry[1]
        raw_data, partial_result = entry[2], entry[3]
        if isinstance(raw_data, dict):
            # add test specifier to each raw data element
            raw.extend(
                dict(identifier=identifier, scan_host=scan_host, test=test, **raw_elem)
                for identifier, raw_elem in raw_data.items()
            )
        if isinstance(partial_result, dict):
            result.update(partial_result)
    return raw, result, errors
def get_form_and_formset(
    request: HttpRequest=None,
    extra: int=1,
    initial_form: SessionBaseForm=None,
    initial_formset=None
) -> Tuple[SessionBaseForm, Any]:
    """Create the session form and the item movement formset.

    The pair is bound to ``request.POST`` when a request is given, otherwise
    pre-filled from ``initial_form`` / ``initial_formset`` when either is
    truthy, otherwise left empty. The form uses the ``'session'`` prefix and
    the formset the ``'items'`` prefix.
    """
    ItemMovementFormSet = forms.formset_factory(ItemMovementForm, extra=extra)

    if request:
        return (SessionBaseForm(request.POST, prefix='session'),
                ItemMovementFormSet(request.POST, prefix='items'))

    if initial_form or initial_formset:
        return (SessionBaseForm(initial=initial_form, prefix='session'),
                ItemMovementFormSet(initial=initial_formset, prefix='items'))

    return SessionBaseForm(prefix='session'), ItemMovementFormSet(prefix='items')
def _split_sample(
    split: Callable[[object], bool], X: np.ndarray, y: np.ndarray
) -> Tuple[Tuple[np.ndarray, np.ndarray], Tuple[np.ndarray, np.ndarray]]:
    """
    Split X, y sample set in two with a split function

    For a ``'numerical'`` split, rows whose ``split.attribute`` column is
    below ``split.criteria`` go left. For any other split type, rows whose
    attribute value occurs in ``split.criteria`` go left.

    :return: ((X_left, y_left), (X_right, y_right))
    """
    # Compare by value: ``is`` on a string literal tests object identity and
    # only works by accident of interning.
    if split.type == 'numerical':
        left_indexes = X[:, split.attribute] < split.criteria
        right_indexes = ~left_indexes
    else:
        # get_indexer returns -1 for values that are not among the criteria.
        Z = (
            pd.Index(pd.unique(split.criteria))
            .get_indexer(X[:, split.attribute]))
        left_indexes = np.where(Z >= 0)[0]
        right_indexes = np.where(Z < 0)[0]

    left = X[left_indexes], y[left_indexes]
    right = X[right_indexes], y[right_indexes]
    return left, right
def linearize(path: List, obstacles: List[Tuple]) ->List:
    """
    Fills the gap between two non-consecutive cells.

    Whenever two successive cells differ in both coordinates, an intermediate
    cell is inserted: first choice is a step along y, second choice a step
    along x, each only if that cell is not an obstacle.

    :param path: -> List of coordinates of the path
    :param obstacles: -> List of coordinates of the obstacles
    :return: -> A linearized list (passed through ``remove_duplicates``)
    """
    # Overall travel direction, taken from the first and last cell.
    step_y = 1 if path[0][1] < path[-1][1] else -1
    step_x = 1 if path[0][0] < path[-1][0] else -1

    filled = []
    for index in range(len(path)):
        try:
            current = path[index]
            filled.append(current)
            # Raises IndexError on the last cell, which ends its processing.
            following = path[index + 1]
            if current[0] != following[0] and current[1] != following[1]:
                via_y = (current[0], current[1] + step_y)
                if via_y not in obstacles:
                    filled.append(via_y)
                else:
                    via_x = (current[0] + step_x, current[1])
                    if via_x not in obstacles:
                        filled.append(via_x)
        except IndexError:
            continue
    return list(remove_duplicates(filled))
def verif_conditions(self, entitee: Entitee, cible: Tuple[int, int]) -> bool:
    """Determine whether the spell is valid, and pay its cost if it is.

    The spell is valid when the entity has at least ``self.cost`` action
    points, the target shares a coordinate with the entity, the distance
    along the other coordinate lies within ``[min_range, max_range]`` and no
    cell returned by ``bresenham`` between the two is in the map's
    ``fullobs``. On success the cost is deducted from the entity's ``ap``.
    """
    if not (entitee.var_attributs.ap >= self.cost):
        return False

    origin = entitee.combat_coords
    if origin[0] == cible[0]:
        distance = abs(origin[1] - cible[1])
    elif origin[1] == cible[1]:
        distance = abs(origin[0] - cible[0])
    else:
        # Target is not aligned with the entity on either coordinate.
        return False

    if not (self.max_range >= distance >= self.min_range):
        return False

    for case in bresenham(entitee.combat_coords, cible):
        if case in entitee.combat.map.fullobs:
            return False

    entitee.var_attributs.ap -= self.cost
    return True
def config_per_platform(config: ConfigType, domain: str) -> Iterable[Tuple[Any, Any]]:
    """Generator to break a component config into different platforms.

    For example, will find 'switch', 'switch 2', 'switch 3', .. etc

    Yields ``(platform, item)`` for every item of every non-empty domain
    config; ``platform`` is None when the item has no ``get`` method.
    """
    for config_key in extract_domain_configs(config, domain):
        platform_config = config[config_key]
        if not platform_config:
            continue

        # A single entry is treated like a one-element list.
        items = platform_config if isinstance(platform_config, list) else [platform_config]
        for item in items:
            try:
                platform = item.get(CONF_PLATFORM)
            except AttributeError:
                platform = None
            yield platform, item
def metrics_from_counts(counts: List[int]) -> Tuple[float, float, float, float]:
    """
    Computes classifier metrics given counts of correct, incorrect, missing and spurious

    A small epsilon is added to every denominator so all-zero counts produce
    zeros instead of a division error.

    :param counts: A (4,) vector of (correct, incorrect, missing, spurious)
    :return: acc, recall, precision and f1
    """
    eps = 1e-16
    correct, incorrect, missing, spurious = counts

    everything = correct + incorrect + missing + spurious + eps
    expected = correct + incorrect + missing + eps
    predicted = correct + incorrect + spurious + eps

    acc = correct / everything
    recall = correct / expected
    precision = correct / predicted
    f1 = 2 * (precision * recall) / (recall + precision + eps)
    return acc, recall, precision, f1
def _batch_questions(self, questions: List[Tuple[QASetting, List[Answer]]], batch_size, is_eval: bool):
    """Optionally shuffles and batches annotated questions.

    The questions are shuffled with the module level ``_rng`` when
    ``self._shuffle(is_eval)`` is true, and batched without a random number
    generator otherwise. Override this method if you want to customize the
    batching, e.g., to do stratified sampling, sampling with replacement, etc.

    Args:
        - questions: List of (question, answers) annotations to shuffle & batch.
        - batch_size: size of the batches.
        - is_eval: Whether batches are generated for evaluation.

    Returns: Batch iterator
    """
    if self._shuffle(is_eval):
        rng = _rng
    else:
        rng = None
    return shuffle_and_batch(questions, batch_size, rng)
def train(self, optimizer,
          training_set: Iterable[Tuple[QASetting, List[Answer]]],
          batch_size: int, max_epochs=10, hooks=tuple(),
          l2=0.0, clip=None, clip_op=tf.clip_by_value, summary_writer=None, **kwargs):
    """
    This method trains the reader (and changes its state).

    Training is prepared by ``self._setup_training`` and then run by
    ``self._train_loop``; extra keyword arguments are forwarded to both.

    Args:
        optimizer: TF optimizer
        training_set: the training instances.
        batch_size: size of training batches
        max_epochs: maximum number of epochs
        hooks: TrainingHook implementations that are called after epochs and batches
        l2: whether to use l2 regularization
        clip: whether to apply gradient clipping and at which value
        clip_op: operation to perform for clipping
        summary_writer: passed through to setup and to the training loop
    """
    setup = self._setup_training(
        batch_size, clip, optimizer, training_set, summary_writer, l2, clip_op, **kwargs)
    batches, loss, min_op, summaries = setup

    self._train_loop(
        min_op, loss, batches, hooks, max_epochs, summaries, summary_writer, **kwargs)
def __init__(self, text: str, span: Tuple[int, int] = None, doc_idx: int = 0, score: float = 1.0):
    """
    Create a new answer.

    Args:
        text: The text string of the answer.
        span: For extractive QA, a span in the support documents. The pair
            `(start, end)` represents a span in the support document with
            index `doc_idx` in the ordered sequence of support documents.
            The span starts at `start` and ends at `end` (exclusive).
        doc_idx: index of document where answer was found
        score: the score a model associates with this answer.
    """
    if span is not None:
        assert len(span) == 2, "span should be (char_start, char_end) tuple"

    self.score = score
    self.span = span
    self.doc_idx = doc_idx
    self.text = text
def quadratic(a: float, b: float, c: float) -> Tuple[complex, complex]:
    ''' Compute the roots of the quadratic equation:

            ax^2 + bx + c = 0

        Written in Python as:

            a*x**2 + b*x + c == 0.0

        The roots are always returned as complex numbers, the one using the
        positive square root first.

        For example:

            >>> x1, x2 = quadratic(a=8, b=22, c=15)
            >>> x1
            (-1.25+0j)
            >>> x2
            (-1.5+0j)
            >>> 8*x1**2 + 22*x1 + 15
            0j
            >>> 8*x2**2 + 22*x2 + 15
            0j

    '''
    # Complex square root of the discriminant, so negative values work too.
    root = cmath.sqrt(b**2.0 - 4.0*a*c)
    roots = tuple((-b + signed_root) / (2.0 * a) for signed_root in (root, -root))
    return roots[0], roots[1]
def tokenize(sentences: List[str]) -> Tuple[List[int], List[List[str]]]:
    """Tokenize each sentence and join its token surfaces with spaces.

    :param sentences: sentences to tokenize
    :return: ``(lengths, texts)`` - the number of tokens per sentence and the
        space-joined token surfaces per sentence, in input order
    """
    tokenizer = Tokenizer()
    lengths, texts = [], []
    for sentence in sentences:
        surfaces = [token.surface for token in tokenizer.tokenize(sentence)]
        lengths.append(len(surfaces))
        texts.append(' '.join(surfaces))
    return lengths, texts
def option_make(optionname,  # type: AnyStr
                optiontype,  # type: AnyStr
                configs,  # type: Iterable[OptionValue]
                nestedopts=None  # type: Optional[StyleDef]
                ):
    # type: (...) -> Tuple[str, str, List[OptionValue], Optional[StyleDef]]
    """Assemble an option tuple.

    Every config value is passed through ``typeconv`` and the name and type
    through ``unistr``; ``nestedopts`` is returned unchanged.
    """
    converted = list(map(typeconv, configs))
    name = unistr(optionname)
    kind = unistr(optiontype)
    return name, kind, converted, nestedopts
def style_make(options=None):
    # type: (Union[dict, List[Tuple[str, OptionValue]], None]) -> Style
    """Create a ``Style``, optionally filled from a dict of options.

    Options are set in sorted order; dict values are converted to nested
    styles recursively.

    Raises TypeError when ``options`` is neither None nor a dict.
    """
    if options is None:
        return Style()
    if not isinstance(options, dict):
        raise TypeError('options must be a dict or None')

    style = style_make()
    for key, value in sorted(options.items()):
        nested = style_make(value) if isinstance(value, dict) else value
        set_option(style, key, nested)
    return style