我们从Python开源项目中,提取了以下46个代码示例,用于说明如何使用six.text_type()。
def is_crm_leader(resource, retry=False): """ Returns True if the charm calling this is the elected corosync leader, as returned by calling the external "crm" command. We allow this operation to be retried to avoid the possibility of getting a false negative. See LP #1396246 for more info. """ if resource == DC_RESOURCE_NAME: return is_crm_dc() cmd = ['crm', 'resource', 'show', resource] try: status = subprocess.check_output(cmd, stderr=subprocess.STDOUT) if not isinstance(status, six.text_type): status = six.text_type(status, "utf-8") except subprocess.CalledProcessError: status = None if status and get_unit_hostname() in status: return True if status and "resource %s is NOT running" % (resource) in status: raise CRMResourceNotFound("CRM resource %s not found" % (resource)) return False
def bool_from_string(value): """Interpret string value as boolean. Returns True if value translates to True otherwise False. """ if isinstance(value, six.string_types): value = six.text_type(value) else: msg = "Unable to interpret non-string value '%s' as boolean" % (value) raise ValueError(msg) value = value.strip().lower() if value in ['y', 'yes', 'true', 't', 'on']: return True elif value in ['n', 'no', 'false', 'f', 'off']: return False msg = "Unable to interpret string value '%s' as boolean" % (value) raise ValueError(msg)
def is_crm_dc(): """ Determine leadership by querying the pacemaker Designated Controller """ cmd = ['crm', 'status'] try: status = subprocess.check_output(cmd, stderr=subprocess.STDOUT) if not isinstance(status, six.text_type): status = six.text_type(status, "utf-8") except subprocess.CalledProcessError as ex: raise CRMDCNotFound(str(ex)) current_dc = '' for line in status.split('\n'): if line.startswith('Current DC'): # Current DC: juju-lytrusty-machine-2 (168108163) - partition with quorum current_dc = line.split(':')[1].split()[0] if current_dc == get_unit_hostname(): return True elif current_dc == 'NONE': raise CRMDCNotFound('Current DC: NONE') return False
def Parse(text, message, ignore_unknown_fields=False): """Parses a JSON representation of a protocol message into a message. Args: text: Message JSON representation. message: A protocol buffer message to merge into. ignore_unknown_fields: If True, do not raise errors for unknown fields. Returns: The same message passed as argument. Raises:: ParseError: On JSON parsing problems. """ if not isinstance(text, six.text_type): text = text.decode('utf-8') try: if sys.version_info < (2, 7): # object_pair_hook is not supported before python2.7 js = json.loads(text) else: js = json.loads(text, object_pairs_hook=_DuplicateChecker) except ValueError as e: raise ParseError('Failed to load JSON: {0}.'.format(str(e))) return ParseDict(js, message, ignore_unknown_fields)
def _ConvertInteger(value): """Convert an integer. Args: value: A scalar value to convert. Returns: The integer value. Raises: ParseError: If an integer couldn't be consumed. """ if isinstance(value, float) and not value.is_integer(): raise ParseError('Couldn\'t parse integer: {0}.'.format(value)) if isinstance(value, six.text_type) and value.find(' ') != -1: raise ParseError('Couldn\'t parse integer: "{0}".'.format(value)) return int(value)
def deploy(self, task): """Perform a deployment to a node.""" manager_utils.node_power_action(task, states.REBOOT) if CONF.ansible.use_ramdisk_callback: return states.DEPLOYWAIT node = task.node ip_addr = _get_node_ip_dhcp(task) try: self._ansible_deploy(task, ip_addr) except Exception as e: error = _('Deploy failed for node %(node)s: ' 'Error: %(exc)s') % {'node': node.uuid, 'exc': six.text_type(e)} LOG.exception(error) deploy_utils.set_failed_state(task, error, collect_logs=False) else: self.reboot_to_instance(task) return states.DEPLOYDONE
def test__parse_root_device_hints_fail_advanced(self): hints = {"wwn": "s!= fake wwn", "size": ">= 12345", "name": "<or> spam <or> ham", "rotational": True} expected = {"wwn": "s!= fake%20wwn", "name": "<or> spam <or> ham", "size": ">= 12345"} props = self.node.properties props['root_device'] = hints self.node.properties = props self.node.save() with task_manager.acquire(self.context, self.node.uuid) as task: exc = self.assertRaises( exception.InvalidParameterValue, ansible_deploy._parse_root_device_hints, task.node) for key, value in expected.items(): self.assertIn(six.text_type(key), six.text_type(exc)) self.assertIn(six.text_type(value), six.text_type(exc))
def _params_to_urlencoded(params): """ Returns a application/x-www-form-urlencoded ``str`` representing the key/value pairs in ``params``. Keys are values are ``str()``'d before calling ``urllib.urlencode``, with the exception of unicode objects which are utf8-encoded. """ def encode(o): if isinstance(o, six.binary_type): return o else: if isinstance(o, six.text_type): return o.encode('utf-8') else: return str(o).encode('utf-8') utf8_params = {encode(k): encode(v) for k, v in six.iteritems(params)} return url_encode(utf8_params)
def test_unregistered_event(self): project = self.project # force creation url = '/plugins/github/organizations/{}/webhook/'.format( project.organization.id, ) secret = 'b3002c3e321d4b7880360d397db2ccfd' OrganizationOption.objects.set_value( organization=project.organization, key='github:webhook_secret', value=secret, ) response = self.client.post( path=url, data=PUSH_EVENT_EXAMPLE, content_type='application/json', HTTP_X_GITHUB_EVENT='UnregisteredEvent', HTTP_X_HUB_SIGNATURE='sha1=98196e70369945ffa6b248cf70f7dc5e46dff241', HTTP_X_GITHUB_DELIVERY=six.text_type(uuid4()) ) assert response.status_code == 204
def test_invalid_signature_event(self): project = self.project # force creation url = '/plugins/github/organizations/{}/webhook/'.format( project.organization.id, ) secret = '2d7565c3537847b789d6995dca8d9f84' OrganizationOption.objects.set_value( organization=project.organization, key='github:webhook_secret', value=secret, ) response = self.client.post( path=url, data=PUSH_EVENT_EXAMPLE, content_type='application/json', HTTP_X_GITHUB_EVENT='push', HTTP_X_HUB_SIGNATURE='sha1=33521abeaaf9a57c2abf486e0ccd54d23cf36fec', HTTP_X_GITHUB_DELIVERY=six.text_type(uuid4()) ) assert response.status_code == 401
def test_simple(self): url = '/plugins/github/installations/webhook/' response = self.client.post( path=url, data=INSTALLATION_EVENT_EXAMPLE, content_type='application/json', HTTP_X_GITHUB_EVENT='installation', HTTP_X_HUB_SIGNATURE='sha1=348e46312df2901e8cb945616ee84ce30d9987c9', HTTP_X_GITHUB_DELIVERY=six.text_type(uuid4()) ) assert response.status_code == 204 assert Integration.objects.filter( provider='github_apps', external_id=2, name='octocat', ).exists()
def validate_config(self, organization, config, actor=None): """
if config['foo'] and not config['bar']: raise PluginError('You cannot configure foo with bar') return config ``` """ if config.get('name'): client = self.get_client(actor) try: repo = client.get_repo(config['name']) except Exception as e: self.raise_error(e) else: config['external_id'] = six.text_type(repo['id']) return config
```
def validate_config(self, organization, config, actor=None): if config.get('url'): client = self.get_client(actor) # parse out the repo name and the instance parts = urlparse(config['url']) instance = parts.netloc name = parts.path.rsplit('_git/', 1)[-1] project = config.get('project') or name try: repo = client.get_repo(instance, name, project) except Exception as e: self.raise_error(e, identity=client.auth) config.update({ 'instance': instance, 'project': project, 'name': repo['name'], 'external_id': six.text_type(repo['id']), 'url': repo['_links']['web']['href'], }) return config
def link_issue(self, request, group, form_data, **kwargs): comment = form_data.get('comment') if not comment: return _url = '%s/%s/comments' % (self.build_api_url(group, 'stories'), form_data['issue_id']) try: req = self.make_api_request(group.project, _url, json_data={"text": comment}) body = safe_urlread(req) except requests.RequestException as e: msg = six.text_type(e) raise PluginError('Error communicating with Pivotal: %s' % (msg, )) try: json_resp = json.loads(body) except ValueError as e: msg = six.text_type(e) raise PluginError('Error communicating with Pivotal: %s' % (msg, )) if req.status_code > 399: raise PluginError(json_resp['error'])
def get_event_from_url_params(self, group_id, event_id=None, slug_vars=None): if event_id is not None: try: event = Event.objects.get(pk=int(event_id)) except (ValueError, Event.DoesNotExist): return None group = event.group if six.text_type(group.id) != group_id: return None else: try: group = Group.objects.get(pk=int(group_id)) except (ValueError, Group.DoesNotExist): return None event = group.get_latest_event() event = self._ensure_and_bind_event(event) if event is None: return None if slug_vars is not None: if slug_vars['org_slug'] != group.organization.slug or \ slug_vars['proj_slug'] != group.project.slug: return None return event
def get(self, request, project, *args, **kwargs): queryset = project.hipchat_tenant_set.select_related('auth_user') return self.respond( [ { 'id': t.id, 'room': { 'id': t.room_id, 'name': t.room_name, 'owner': { 'id': t.room_owner_id, 'name': t.room_owner_name, }, 'homepage': t.homepage, }, 'authUser': { 'id': six.text_type(t.auth_user.id), 'name': t.auth_user.get_display_name(), 'username': t.auth_user.username, 'email': t.auth_user.email, } if t.auth_user else None, } for t in queryset ] )
def raise_error(self, exc, identity=None): if isinstance(exc, ApiUnauthorized): six.reraise( InvalidIdentity, InvalidIdentity(self.message_from_error(exc), identity=identity), sys.exc_info()[2] ) elif isinstance(exc, ApiError): six.reraise( PluginError, PluginError(self.message_from_error(exc)), sys.exc_info()[2] ) elif isinstance(exc, PluginError): raise else: self.logger.exception(six.text_type(exc)) six.reraise( PluginError, PluginError(self.message_from_error(exc)), sys.exc_info()[2] )
def csv_safe_unicode(row, ignoreErrors=True): """ Given an array of values, make sure all strings are unicode in Python 3 and str in Python 2. The csv.writer in Python 2 does not handle unicode strings and in Python 3 it does not handle byte strings. :param row: an array which could contain mixed types. :param ignoreErrors: if True, convert what is possible and ignore errors. If false, conversion errors will raise an exception. :returns: either the original array if safe, or an array with all byte strings converted to unicode in Python 3 or str in Python 2. """ # This is unicode in Python 2 and bytes in Python 3 wrong_type = six.text_type if str == six.binary_type else six.binary_type # If all of the data is not the wrong type, just return it as is. This # allows non-string types, such as integers and floats. if not any(isinstance(value, wrong_type) for value in row): return row # Convert the wrong type of string to the type needed for this version of # Python. For Python 2 unicode gets encoded to str (bytes). For Python 3 # bytes get decoded to str (unicode). func = 'encode' if str == six.binary_type else 'decode' row = [getattr(value, func)('utf8', 'ignore' if ignoreErrors else 'strict') if isinstance(value, wrong_type) else value for value in row] return row
def tokenize_text(string): """ Tokenize input text to paragraphs, sentences and words. Tokenization to paragraphs is done using simple Newline algorithm For sentences and words tokenizers above are used :param string: Text to tokenize :type string: str or unicode :return: text, tokenized into paragraphs, sentences and words :rtype: list of list of list of words """ string = six.text_type(string) rez = [] for part in string.split('\n'): par = [] for sent in tokenize_sents(part): par.append(tokenize_words(sent)) if par: rez.append(par) return rez
def add(self, offset, entity_type, entity_value): """ Adds a found entity to list. The `offset` parameter is used as key, so only one entity can start at the given offset. :param offset: Offset in extracted text where the found entity starts. :param entity_type: The type of entity that is found. :param entity_value: The found entity. :type start: ``int`` :type entity_type: ``unicode`` :type entity_value: ``unicode`` """ self.obj[offset] = { 'type': entity_type, 'value': entity_value, 'entity_id': re.sub(r'\s', '_', entity_value).lower() }
def set_id(self, data): """ Set ID of document. To allow multiple files with the same path, the digest of supplied data (e.g. first 4KB) is appended to the doc ID. :param data: Data to append to docuemnt path. :type data: Anything digestable by hashlib. """ digest = hashlib.md5() digest.update(self.path.encode('utf-8')) if isinstance(data, unicode): data = data.encode('utf-8') digest.update(data) self.docid = digest.hexdigest()
def _from_bytes(value): """Converts bytes to a string value, if necessary. Args: value: The string/bytes value to be converted. Returns: The original value converted to unicode (if bytes) or as passed in if it started out as unicode. Raises: ValueError if the value could not be converted to unicode. """ result = (value.decode('utf-8') if isinstance(value, six.binary_type) else value) if isinstance(result, six.text_type): return result else: raise ValueError('%r could not be converted to unicode' % (value,))
def request(self, uri, method='GET', body=None, headers=None, redirections=1, connection_type=None): resp, content = self._iterable.pop(0) if content == 'echo_request_headers': content = headers elif content == 'echo_request_headers_as_json': content = json.dumps(headers) elif content == 'echo_request_body': if hasattr(body, 'read'): content = body.read() else: content = body elif content == 'echo_request_uri': content = uri if isinstance(content, six.text_type): content = content.encode('utf-8') return httplib2.Response(resp), content
def _build_query(self, params): """Builds a query string. Args: params: dict, the query parameters Returns: The query parameters properly encoded into an HTTP URI query string. """ if self.alt_param is not None: params.update({'alt': self.alt_param}) astuples = [] for key, value in six.iteritems(params): if type(value) == type([]): for x in value: x = x.encode('utf-8') astuples.append((key, x)) else: if isinstance(value, six.text_type) and callable(value.encode): value = value.encode('utf-8') astuples.append((key, value)) return '?' + urlencode(astuples)
def exeute(self, method, value): str_convert_type = ( six.text_type, ipaddress.IPv4Address, ipaddress.IPv6Address) try: result = getattr( self.typeclass(value, self.strict_level), method)() if method == "validate": result = "NOP [#f1]_" elif isinstance(result, str_convert_type): result = '``"{}"``'.format(result) except TypeError: result = "E [#f2]_" except typepy.TypeConversionError: result = "E [#f3]_" return result
def prep_blob(self, blob): """Cleanup input.""" # remove empty lines if type(blob) == list: blob = [line for line in blob if line.strip() != ''] if len(blob) == 1: blob = blob[0].replace('\\n', '\n').split('\n') # Split by line if type(blob) == str or type(blob) == six.text_type: lines = blob.split('\n') elif type(blob) == list: if len(blob) == 1: lines = blob[0].split('\n') else: lines = [line.rstrip() for line in blob] else: message = "Unknown input format" log.debug("%s - '%s", message, blob) raise ParseException(message) return lines
def set_session_data(self, item_dict): """ Set a dictionary of items directly into the Django session. """ # Cookies don't work unless we visit a page first if not self._have_visited_page(): self.get_url('django_functest.emptypage') session = self._get_session() for name, value in item_dict.items(): session[name] = text_type(value) session.save() s2 = self._get_session() if all(s2.get(name) == text_type(value) for name, value in item_dict.items()): return raise RuntimeError("Session not saved correctly")