The following 50 code examples, extracted from open-source Python projects, illustrate how to use typing.Text.
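For context: ``typing.Text`` is an alias for ``str`` on Python 3 (and for ``unicode`` on Python 2), so it mostly appears in code that must type-check under both interpreters. A minimal sketch before the real examples (the ``greet`` functions are illustrative, not taken from the projects below):

from typing import Text

def greet(name):
    # type: (Text) -> Text
    """Python 2/3-compatible type comment: Text means ``unicode``
    on Python 2 and ``str`` on Python 3."""
    return u'Hello, ' + name

def greet_py3(name: Text) -> Text:
    """The same contract, written as a Python 3 annotation."""
    return 'Hello, ' + name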
def __init__(self, capture_exc_info=False):
    # type: (bool) -> None
    """
    :param capture_exc_info:
        Whether to capture `sys.exc_info` when handling an
        exception.

        This is turned off by default to reduce memory usage, but
        it is useful in certain cases (e.g., if you want to send
        exceptions to a logger that expects exc_info).

        Regardless, you can still check ``self.has_exceptions`` to
        see if an exception occurred.
    """
    super(MemoryHandler, self).__init__()

    self.messages = OrderedDict()  # type: Union[OrderedDict, Dict[Text, List[FilterMessage]]]
    self.has_exceptions = False
    self.capture_exc_info = capture_exc_info
    self.exc_info = []  # type: List[Tuple[type, Exception, TracebackType]]
def get_errors(self, with_context=False):
    # type: (bool) -> Dict[Text, List[Dict[Text, Text]]]
    """
    Returns a dict of error messages generated by the Filter, in a
    format suitable for inclusion in e.g., an API 400 response
    payload.

    :param with_context:
        Whether to include the context object in the result (for
        debugging purposes).

        Note: context is usually not safe to expose to end users!
    """
    return {
        key: [m.as_dict(with_context) for m in messages]
        for key, messages in iteritems(self.filter_messages)
    }
def is_filter_type(target):
    # type: (Any) -> Union[bool, Text]
    """
    Returns whether the specified object can be registered as a
    filter.

    :return:
        Returns ``True`` if the object is a filter.
        Otherwise, returns a string indicating why it is not valid.
    """
    if not is_class(target):
        return 'not a class'

    if not issubclass(target, BaseFilter):
        return 'does not extend BaseFilter'

    if is_abstract(target):
        return 'abstract class'

    return True
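A quick usage sketch based on the checks above (the ``NotAFilter`` class and the string argument are hypothetical, not from the source project):

class NotAFilter(object):
    pass

is_filter_type(NotAFilter)  # -> 'does not extend BaseFilter'
is_filter_type('hello')     # -> 'not a class'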
def __init__(self, pattern, keys=None):
    # type: (Union[Text, regex._pattern_type, re._pattern_type], Optional[Sequence[Text]]) -> None
    """
    :param pattern:
        Regex used to split incoming string values.

        IMPORTANT: If you specify your own compiled regex, be sure
        to add the ``UNICODE`` flag for Unicode support!

    :param keys:
        If set, the resulting list will be converted into an
        OrderedDict, using the specified keys.

        IMPORTANT: If ``keys`` is set, the split value's length
        must be less than or equal to ``len(keys)``.
    """
    super(Split, self).__init__()

    self.regex = (
        pattern
        if isinstance(pattern, (regex._pattern_type, re._pattern_type))
        else regex.compile(pattern, regex.UNICODE)
    )

    self.keys = keys
def __init__(self, encoding='utf-8', normalize=False):
    # type: (Text, bool) -> None
    """
    :param encoding:
        Used to decode non-unicode values.

    :param normalize:
        Whether to normalize the unicode value before converting
        back into bytes:

        - Convert to NFC form.
        - Remove non-printable characters.
        - Convert all line endings to unix-style ('\n').

        Note that ``normalize`` is ``False`` by default for
        :py:class:`ByteString`, but ``True`` by default for
        :py:class:`Unicode`.
    """
    super(ByteString, self).__init__(encoding, normalize)
def add_route(self, command, adapter):
    # type: (Text, AdapterSpec) -> RoutingWrapper
    """
    Adds a route to the wrapper.

    :param command:
        The name of the command to route (e.g., "attachToTangle").

    :param adapter:
        The adapter object or URI to route requests to.
    """
    if not isinstance(adapter, BaseAdapter):
        try:
            adapter = self.adapter_aliases[adapter]
        except KeyError:
            self.adapter_aliases[adapter] = adapter = resolve_adapter(adapter)

    self.routes[command] = adapter
    return self
def normalise_response_json(self, data):
    # type: (Dict[Text, Any]) -> List[Dict[Text, Any]]
    """Transform data to wit.ai format."""
    entities = {}
    for entity in data["entities"]:
        entities[entity["entity"]] = {
            "confidence": None,
            "type": "value",
            "value": entity["value"],
            "start": entity["start"],
            "end": entity["end"]
        }
    return [
        {
            "_text": data["text"],
            "confidence": data["intent"]['confidence'],
            "intent": data["intent"]['name'],
            "entities": entities
        }
    ]
def normalise_response_json(self, data):
    # type: (Dict[Text, Any]) -> Dict[Text, Any]
    """Transform data to luis.ai format."""
    top_intent = self._top_intent(data)
    ranking = self._ranking(data)
    return {
        "query": data["text"],
        "topScoringIntent": top_intent,
        "intents": ranking,
        "entities": [
            {
                "entity": e["value"],
                "type": e["entity"],
                "startIndex": None,
                "endIndex": None,
                "score": None
            }
            for e in data["entities"]
        ] if "entities" in data else []
    }
def normalise_request_json(self, data):
    # type: (Dict[Text, Any]) -> Dict[Text, Any]
    _data = {}
    _data["text"] = data["q"][0] if type(data["q"]) == list else data["q"]

    if not data.get("project"):
        _data["project"] = "default"
    elif type(data["project"]) == list:
        _data["project"] = data["project"][0]
    else:
        _data["project"] = data["project"]

    if data.get("model"):
        _data["model"] = data["model"][0] if type(data["model"]) == list else data["model"]

    _data['time'] = data["time"] if "time" in data else None
    return _data
def guess_format(files):
    # type: (List[Text]) -> Text
    """Given a set of files, tries to guess which data format is used."""
    for filename in files:
        with io.open(filename, encoding="utf-8-sig") as f:
            raw_data = ""
            try:
                raw_data = f.read()
                file_data = json.loads(raw_data)
                if "data" in file_data and type(file_data.get("data")) is list:
                    return WIT_FILE_FORMAT
                elif "luis_schema_version" in file_data:
                    return LUIS_FILE_FORMAT
                elif "supportedLanguages" in file_data:
                    return DIALOGFLOW_FILE_FORMAT
                elif "rasa_nlu_data" in file_data:
                    return RASA_FILE_FORMAT
            except ValueError:
                if "## intent:" in raw_data:
                    return MARKDOWN_FILE_FORMAT

    return UNK_FILE_FORMAT
def get_component_class(component_name):
    # type: (Text) -> Optional[Type[Component]]
    """Resolve a component name to a registered component class."""
    if component_name not in registered_components:
        try:
            return utils.class_from_module_path(component_name)
        except Exception:
            raise Exception(
                "Failed to find component class for '{}'. Unknown "
                "component name. Check your configured pipeline and make "
                "sure the mentioned component is not misspelled. If you "
                "are creating your own component, make sure it is either "
                "listed as part of the `component_classes` in "
                "`rasa_nlu.registry.py` or is a proper name of a class "
                "in a module.".format(component_name))
    return registered_components[component_name]
def validate_requirements(component_names,
                          dev_requirements_file="alt_requirements/requirements_dev.txt"):
    # type: (List[Text], Text) -> None
    """Ensures that all required python packages are installed to
    instantiate and use the passed components."""
    from rasa_nlu import registry

    # Validate that all required packages are installed
    failed_imports = set()
    for component_name in component_names:
        component_class = registry.get_component_class(component_name)
        failed_imports.update(
            find_unavailable_packages(component_class.required_packages()))

    if failed_imports:  # pragma: no cover
        # if available, use the development file to figure out the
        # correct version numbers for each requirement
        all_requirements = _read_dev_requirements(dev_requirements_file)
        if all_requirements:
            missing_requirements = [r
                                    for i in failed_imports
                                    for r in all_requirements[i]]
            raise Exception("Not all required packages are installed. "
                            "Failed to find the following imports {}. "
                            "To use this pipeline, you need to install the "
                            "missing dependencies, e.g. by running:\n\t"
                            "> pip install {}".format(
                                    ", ".join(failed_imports),
                                    " ".join(missing_requirements)))
        else:
            raise Exception("Not all required packages are installed. "
                            "To use this pipeline, you need to install the "
                            "missing dependencies. "
                            "Please install {}".format(", ".join(failed_imports)))
def validate_arguments(pipeline, context, allow_empty_pipeline=False):
    # type: (List[Component], Dict[Text, Any], bool) -> None
    """Validates a pipeline before it is run. Ensures that all
    arguments are present to train the pipeline."""

    # Ensure the pipeline is not empty
    if not allow_empty_pipeline and len(pipeline) == 0:
        raise ValueError("Can not train an empty pipeline. "
                         "Make sure to specify a proper pipeline in the "
                         "configuration using the `pipeline` key. "
                         "The `backend` configuration key is "
                         "NOT supported anymore.")

    provided_properties = set(context.keys())
    for component in pipeline:
        for r in component.requires:
            if r not in provided_properties:
                raise Exception("Failed to validate at component '{}'. "
                                "Missing property: '{}'".format(
                                        component.name, r))
        provided_properties.update(component.provides)
def load_component(self, component_name, model_dir, model_metadata, **context):
    # type: (Text, Text, Metadata, **Any) -> Component
    """Tries to retrieve a component from the cache, calls
    `load` to create a new component."""
    from rasa_nlu import registry
    from rasa_nlu.model import Metadata

    try:
        cached_component, cache_key = self.__get_cached_component(
                component_name, model_metadata)
        component = registry.load_component_by_name(
                component_name, model_dir, model_metadata,
                cached_component, **context)
        if not cached_component:
            # If the component wasn't in the cache,
            # let us add it if possible
            self.__add_to_cache(component, cache_key)
        return component
    except MissingArgumentError as e:  # pragma: no cover
        raise Exception("Failed to load component '{}'. "
                        "{}".format(component_name, e))
def parse(self, text, time=None):
    # type: (Text, Optional[Any]) -> Dict[Text, Any]
    """Parse the input text, classify it and return pipeline result.

    The pipeline result usually contains intent and entities."""

    if not text:
        # Not all components are able to handle empty strings. So we need
        # to prevent that... This default return will not contain all
        # output attributes of all components, but in the end, no one
        # should pass an empty string in the first place.
        output = self.default_output_attributes()
        output["text"] = ""
        return output

    message = Message(text, self.default_output_attributes(), time=time)

    for component in self.pipeline:
        component.process(message, **self.context)

    output = self.default_output_attributes()
    output.update(message.as_dict(only_output_properties=True))
    return output
def do_train(config,  # type: RasaNLUConfig
             component_builder=None  # type: Optional[ComponentBuilder]
             ):
    # type: (...) -> Tuple[Trainer, Interpreter, Text]
    """Loads the trainer and the data and runs the training of the model."""

    # Ensure we are training a model that we can save in the end
    # WARN: there is still a race condition if a model with the same name is
    # trained in another subprocess
    trainer = Trainer(config, component_builder)
    persistor = create_persistor(config)
    training_data = load_data(config['data'], config['language'])
    interpreter = trainer.train(training_data)
    persisted_path = trainer.persist(config['path'], persistor,
                                     config['project'],
                                     config['fixed_model_name'])
    return trainer, interpreter, persisted_path
def load(cls,
         model_dir=None,  # type: Text
         model_metadata=None,  # type: Metadata
         cached_component=None,  # type: Optional[DucklingExtractor]
         **kwargs  # type: **Any
         ):
    # type: (...) -> DucklingExtractor

    persisted = os.path.join(model_dir,
                             model_metadata.get("ner_duckling_persisted"))
    if cached_component:
        duckling = cached_component.duckling
    else:
        language = model_metadata.get("language")
        duckling = cls.create_duckling_wrapper(language)

    if os.path.isfile(persisted):
        with io.open(persisted, encoding='utf-8') as f:
            persisted_data = json.loads(f.read())
            return DucklingExtractor(duckling, persisted_data["dimensions"])
    return DucklingExtractor(duckling)
def load(cls,
         model_dir=None,  # type: Text
         model_metadata=None,  # type: Metadata
         cached_component=None,  # type: Optional[DucklingHTTPExtractor]
         **kwargs  # type: **Any
         ):
    # type: (...) -> DucklingHTTPExtractor

    persisted = os.path.join(model_dir, model_metadata.get(cls.name))
    config = kwargs.get("config", {})
    dimensions = None

    if os.path.isfile(persisted):
        with io.open(persisted, encoding='utf-8') as f:
            persisted_data = simplejson.loads(f.read())
            dimensions = persisted_data["dimensions"]

    return DucklingHTTPExtractor(config.get("duckling_http_url"),
                                 model_metadata.get("language"),
                                 dimensions)
def load(cls,
         model_dir,  # type: Text
         model_metadata,  # type: Metadata
         cached_component,  # type: Optional[CRFEntityExtractor]
         **kwargs  # type: **Any
         ):
    # type: (...) -> CRFEntityExtractor
    from sklearn.externals import joblib

    if model_dir and model_metadata.get("entity_extractor_crf"):
        meta = model_metadata.get("entity_extractor_crf")
        ent_tagger = joblib.load(os.path.join(model_dir, meta["model_file"]))
        return CRFEntityExtractor(ent_tagger=ent_tagger,
                                  entity_crf_features=meta['crf_features'],
                                  entity_crf_BILOU_flag=meta['BILOU_flag'])
    else:
        return CRFEntityExtractor()
def persist(self, model_dir):
    # type: (Text) -> Dict[Text, Any]
    """Persist this model into the passed directory.

    Returns the metadata necessary to load the model again."""
    from sklearn.externals import joblib

    if self.ent_tagger:
        model_file_name = os.path.join(model_dir, "crf_model.pkl")
        joblib.dump(self.ent_tagger, model_file_name)
        return {"entity_extractor_crf": {"model_file": "crf_model.pkl",
                                         "crf_features": self.crf_features,
                                         "BILOU_flag": self.BILOU_flag,
                                         "version": 1}}
    else:
        return {"entity_extractor_crf": None}
def _from_json_to_crf(self, message, entity_offsets):
    # type: (Message, List[Tuple[int, int, Text]]) -> List[Tuple[Text, Text, Text, Text]]
    """Takes the json examples and switches them to a format which
    crfsuite likes."""
    from spacy.gold import GoldParse

    doc = message.get("spacy_doc")
    gold = GoldParse(doc, entities=entity_offsets)
    ents = [l[5] for l in gold.orig_annot]
    if '-' in ents:
        logger.warn("Misaligned entity annotation in sentence '{}'. "
                    "Make sure the start and end values of the annotated "
                    "training examples end at token boundaries "
                    "(e.g. don't include trailing whitespaces).".format(doc.text))
    if not self.BILOU_flag:
        for i, entity in enumerate(ents):
            if entity.startswith('B-') or \
                    entity.startswith('I-') or \
                    entity.startswith('U-') or \
                    entity.startswith('L-'):
                ents[i] = entity[2:]  # removes the BILOU tags
    return self._from_text_to_crf(message, ents)
def __init__(self, name=None):
    # type: (Optional[Text]) -> None
    super(TestFilterBravo, self).__init__()

    self.name = name
def __init__(self, message, context, exc_info=None):
    # type: (Text, dict, Text) -> None
    """
    :param exc_info:
        Exception traceback (if applicable).
    """
    super(FilterMessage, self).__init__()

    self.message = message
    self.context = context
    self.code = context.get('code') or message
    self.exc_info = exc_info
def errors(self):
    # type: () -> Dict[Text, List[Dict[Text, Text]]]
    """
    Returns a dict of error messages generated by the Filter, in a
    format suitable for inclusion in e.g., an API 400 response
    payload.

    E.g.::

        {
            'authToken': [
                {
                    'code': 'not_found',
                    'message': 'No AuthToken found matching this value.',
                },
            ],
            'data.foobar': [
                {
                    'code': 'unexpected',
                    'message': 'Unexpected key "foobar".',
                },
            ],
            # etc.
        }
    """
    return self.get_errors()
def filter_messages(self):
    # type: () -> Dict[Text, List[FilterMessage]]
    """
    Returns the raw FilterMessages that were generated by the Filter.
    """
    self.full_clean()
    return self._handler.messages
def __getattr__(self, item):
    # type: (Text) -> Type[BaseFilter]
    return self[item]
def _get_cache(self):
    # type: () -> Dict[Text, Type[BaseFilter]]
    if self._cache is None:
        self._cache = {}

        try:
            for target in iter_entry_points(self.group):  # type: EntryPoint
                filter_ = target.load()

                ift_result = is_filter_type(filter_)

                if ift_result is True:
                    logger.debug(
                        'Registering extension filter '
                        '{cls.__module__}.{cls.__name__} as {name}.'.format(
                            cls=filter_,
                            name=target.name,
                        ),
                    )

                    self._cache[target.name] = filter_
                else:
                    logger.debug(
                        'Using legacy extension loader for '
                        '{target.name} ({reason}).'.format(
                            reason=ift_result,
                            target=target,
                        ),
                    )

                    self._cache.update(iter_filters_in(filter_))
        except DeprecationWarning:
            # The user has ``simplefilter('error')`` set; reset the
            # cache so that the next time we try to load extension
            # filters, we don't miss anything.
            self._cache = None
            raise

    # noinspection PyTypeChecker
    return self._cache
def __init__(
        self,
        to_nearest=1,
        rounding=ROUND_HALF_UP,
        result_type=DecimalType,
):
    # type: (Union[int, Text, DecimalType], Text, type) -> None
    """
    :param to_nearest:
        The value that the filter should round to.

        E.g., ``Round(1)`` rounds to the nearest whole number.

        If you want to round to a float value, it is recommended
        that you provide it as a string or Decimal, to avoid
        floating point problems.

    :param rounding:
        Controls how to round values.

    :param result_type:
        The type of result to return.
    """
    super(Round, self).__init__()

    self.to_nearest = DecimalType(to_nearest)

    # Rounding to negative values isn't supported.
    # I'm not even sure if that concept is valid.
    Min(DecimalType('0')).apply(self.to_nearest)

    self.result_type = result_type
    self.rounding = rounding
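Filters in this library are applied with ``apply`` (the same method used for the ``Min`` check above). A minimal usage sketch, assuming the surrounding filters package behaves as shown; the input value is illustrative:

from decimal import Decimal

# Round to the nearest 0.01; passing the target as a string avoids
# floating-point representation issues, per the docstring above.
Round(to_nearest='0.01').apply('3.14159')  # -> Decimal('3.14')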
def ip_type(self):
    # type: () -> Text
    """
    Returns the IP address versions that this Filter accepts.
    """
    return '/'.join(filter(None, [
        'IPv4' if self.ipv4 else None,
        'IPv6' if self.ipv6 else None,
    ]))
def __init__(self, decoder=json.loads):
    # type: (Callable[[Text], Any]) -> None
    super(JsonDecode, self).__init__()
    self.decoder = decoder
def _apply(self, value):
    value = self._filter(value, Type(text_type))  # type: Text

    if self._has_errors:
        return None

    try:
        # :see: http://stackoverflow.com/a/6921760
        return self.decoder(value, object_pairs_hook=OrderedDict)
    except ValueError:
        return self._invalid_value(value, self.CODE_INVALID, exc_info=True)
def __init__(
        self,
        max_bytes,
        truncate=True,
        prefix='',
        encoding='utf-8',
):
    # type: (int, bool, Text, Text) -> None
    """
    :param max_bytes:
        Max number of bytes to allow.

    :param truncate:
        Whether to truncate values that are too long.

        Set this to ``False`` to save system resources when you
        know that you will reject values that are too long.

    :param prefix:
        Prefix to apply to truncated values.

        Ignored when ``truncate`` is ``False``.

    :param encoding:
        The character encoding to check against.

        Note: This filter is optimized for UTF-8.
    """
    super(MaxBytes, self).__init__()

    self.encoding = encoding
    self.max_bytes = max_bytes
    self.prefix = prefix
    self.truncate = truncate
def __init__(self, leading=r'[\p{C}\s]+', trailing=r'[\p{C}\s]+'):
    # type: (Text, Text) -> None
    """
    :param leading:
        Regex to match at the start of the string.

    :param trailing:
        Regex to match at the end of the string.
    """
    super(Strip, self).__init__()

    if leading:
        self.leading = regex.compile(
            r'^{pattern}'.format(pattern=leading),
            regex.UNICODE,
        )
    else:
        self.leading = None

    if trailing:
        self.trailing = regex.compile(
            r'{pattern}$'.format(pattern=trailing),
            regex.UNICODE,
        )
    else:
        self.trailing = None
def __init__(self, encoding='utf-8', normalize=True):
    # type: (Text, bool) -> None
    """
    :param encoding:
        Used to decode non-unicode values.

    :param normalize:
        Whether to normalize the resulting value:

        - Convert to NFC form.
        - Remove non-printable characters.
        - Convert all line endings to unix-style ('\n').
    """
    super(Unicode, self).__init__()

    self.encoding = encoding
    self.normalize = normalize

    if self.normalize:
        #
        # Compile the regex that we will use to remove non-
        # printables from the resulting unicode.
        # http://www.regular-expressions.info/unicode.html#category
        #
        # Note: using a double negative so that we can exclude
        # newlines, which are technically considered control chars.
        # http://stackoverflow.com/a/3469155
        #
        self.npr = regex.compile(r'[^\P{C}\s]+', regex.UNICODE)
def _apply(self, value):
    decoded = super(ByteString, self)._apply(value)  # type: Text

    #
    # No need to catch UnicodeEncodeErrors here; UTF-8 can handle
    # any unicode value.
    #
    # Technically, we could get this error if we encounter a code
    # point beyond U+10FFFF (the highest valid code point in the
    # Unicode standard).
    #
    # However, it's not possible to create a `unicode` object with
    # an invalid code point, so we wouldn't even be able to get
    # this far if the incoming value contained a character that
    # can't be represented using UTF-8.
    #
    # Note that in some versions of Python, it is possible (albeit
    # really difficult) to trick Python into creating unicode
    # objects with invalid code points, but it generally requires
    # using specific codecs that aren't UTF-8.
    #
    # Example of exploit and release notes from the Python release
    # (2.7.6) that fixes the issue:
    #
    # - https://gist.github.com/rspeer/7559750
    # - https://hg.python.org/cpython/raw-file/99d03261c1ba/Misc/NEWS
    #
    # Normally we return ``None`` if we get any errors, but in this
    # case, we'll let the superclass method decide.
    return decoded if self._has_errors else decoded.encode('utf-8')
def __init__(self, region: Text, key: Text):
    self._region = region
    self._key = key
def _format_api_base(self) -> Text:
    return 'https://{region}.api.cognitive.microsoft.com'.format(
        region=self._region)
def _format_headers(self, kv: Iterable[Header]) -> Dict[Text, Text]:
    headers = {self._auth_keyname: self._key}
    for key, value in kv:
        headers[key] = value
    return headers
def _get_json(self, url: Text, **kwargs) -> Dict:
    return self._make_json_request('get', url, **kwargs)
def _post_json(self, url: Text, **kwargs) -> Dict:
    return self._make_json_request('post', url, **kwargs)
def _auth_keyname(self) -> Text:
    raise NotImplementedError
def _format_projects_endpoint(self) -> Text:
    return '{base}/customvision/v1.0/Training/projects'.format(
        base=self._format_api_base())
def _format_new_project_endpoint(self, project_name: Text) -> Text:
    query = (('name', project_name),
             ('description', ''),
             ('classifier', 'MultiLabel'),
             ('useNegativeSet', 'true'))
    return '{base}?{query}'.format(
        base=self._format_projects_endpoint(),
        query='&'.join('{}={}'.format(*kv) for kv in query))
def _format_project_endpoint(self, project_id: Text) -> Text:
    return '{base}/{project_id}'.format(
        base=self._format_projects_endpoint(),
        project_id=project_id)
def _format_tags_endpoint(self, project_id: Text) -> Text:
    return '{base}/tags'.format(
        base=self._format_project_endpoint(project_id))
def _format_training_endpoint(self, project_id: Text) -> Text:
    return '{base}/train'.format(
        base=self._format_project_endpoint(project_id))
def _format_image_url(self, project_id: Text, tags: Iterable[Tag]) -> Text:
    return '{base}/images/image?tagIds={tagIds}'.format(
        base=self._format_project_endpoint(project_id),
        tagIds='&tagIds='.join(tag.Id for tag in tags))
def _fetch_project_tags(self, project_id: Text) -> Iterable[Tag]:
    url = self._format_tags_endpoint(project_id)
    response = self._get_json(url)
    return [create(Tag, _) for _ in response['Tags']]
def _fetch_tags_for_names(self, project_id: Text,
                          names: Iterable[Text]) -> Iterable[Tag]:
    all_tags = {tag.Name: tag for tag in self._fetch_project_tags(project_id)}
    return [all_tags[name] for name in names]
def create_project(self, project_name: Text) -> Project:
    url = self._format_new_project_endpoint(project_name)
    response = self._post_json(url, headers=[('Content-Length', '0')])
    return create(Project, response)