我们从Python开源项目中，提取了以下49个代码示例，用于说明如何使用types.StringTypes。
def _send_data_part(data, connection):
    """Send one piece of data over ``connection``.

    ``data`` is handled according to its kind:

    * a string is sent unchanged;
    * an object with a ``read`` method is streamed in chunks of up to
      100000 units until ``read`` returns ``''``;
    * anything else is sent as ``str(data)``.
    """
    if isinstance(data, types.StringTypes):
        connection.send(data)
        return
    if not hasattr(data, 'read'):
        # Neither a string nor file-like: fall back to its string form.
        connection.send(str(data))
        return
    # File-like object: the '' sentinel marks the end of the stream.
    for chunk in iter(lambda: data.read(100000), ''):
        connection.send(chunk)
    return
def _setTarget(self, address, prefix=None, filters=None):
    """Add (i.e. subscribe) a new OSCTarget, or update an existing one.

      - address ((host, port) tuple): IP-address & UDP-port
      - prefix (string): The OSC-address prefix prepended to the address of
        each OSCMessage sent to this OSCTarget (optional)
      - filters (string or dict): a filter string (parsed with
        parseFilterStr) or a dict with {addr:bool} entries (optional)

    Raises TypeError if 'filters' is neither a string nor a dict.
    """
    if address not in self.targets:
        self.targets[address] = ["", {}]

    if prefix is not None:
        if len(prefix):
            # make sure prefix starts with ONE '/', and does not end with '/'
            prefix = '/' + prefix.strip('/')

        self.targets[address][0] = prefix

    if filters is not None:
        # isinstance (not an exact type match) so that str/dict subclasses
        # such as OrderedDict are accepted as well.
        if isinstance(filters, types.StringTypes):
            (_, filters) = parseFilterStr(filters)
        elif not isinstance(filters, dict):
            raise TypeError("'filters' argument must be a dict with {addr:bool} entries")

        self._updateFilters(self.targets[address][1], filters)
def setOSCTarget(self, address, prefix=None, filters=None):
    """Add (i.e. subscribe) a new OSCTarget, or change the prefix for an existing OSCTarget.

    The 'address' argument can be
      - a (host, port) tuple: the target server address & UDP-port, or
      - a 'host' (string): the host is looked up with _searchHostAddr.

      - prefix (string): The OSC-address prefix prepended to the address of
        each OSCMessage sent to this OSCTarget (optional)
      - filters: passed through unchanged to _setTarget (optional)

    Raises TypeError if 'address' is neither a string nor a tuple.
    """
    if isinstance(address, types.StringTypes):
        address = self._searchHostAddr(address)

    elif isinstance(address, tuple):
        (host, port) = address[:2]
        try:
            host = socket.gethostbyname(host)
        except (socket.error, TypeError, ValueError):
            # Best effort: keep the host as given when it cannot be resolved.
            # Deliberately narrower than a bare except, so KeyboardInterrupt
            # and SystemExit are no longer swallowed.
            pass

        address = (host, port)
    else:
        raise TypeError("'address' argument must be a (host, port) tuple or a 'host' string")

    self._setTarget(address, prefix, filters)
def delOSCTarget(self, address, prefix=None):
    """Delete the specified OSCTarget from the Client's dict.

    The 'address' argument can be a (host, port) tuple, or a hostname
    (which is resolved to a subscribed address with _searchHostAddr).
    If the 'prefix' argument is given, the Target is only deleted if the
    address and prefix match.
    """
    # isinstance rather than an exact type comparison, so subclasses of
    # str / tuple (e.g. namedtuples) are handled too.
    if isinstance(address, types.StringTypes):
        address = self._searchHostAddr(address)

    if isinstance(address, tuple):
        (host, port) = address[:2]
        try:
            host = socket.gethostbyname(host)
        except socket.error:
            # Unresolvable host: keep it as given.
            pass
        address = (host, port)

        self._delTarget(address, prefix)
def getOSCTarget(self, address):
    """Return the OSCTarget matching the given address.

    'address' can be a (host, port) tuple, or a 'host' (string), in which
    case the first matching OSCTarget (as found by _searchHostAddr) is used.

    Returns a ((host, port), [prefix, filters]) tuple, where host is replaced
    by its reverse-resolved name when that lookup succeeds.
    Returns (None, ['',{}]) if the address is not found.
    """
    if isinstance(address, types.StringTypes):
        address = self._searchHostAddr(address)

    if not isinstance(address, tuple):
        # Without a (host, port) pair there is nothing to resolve; the
        # original fell through here with 'host'/'port' unbound.
        return (None, ['', {}])

    (host, port) = address[:2]
    try:
        host = socket.gethostbyname(host)
    except socket.error:
        pass
    address = (host, port)

    try:
        known = address in self.targets
    except TypeError:
        # Unhashable host/port can never be a key.
        known = False
    if not known:
        return (None, ['', {}])

    try:
        (host, _, _) = socket.gethostbyaddr(host)
    except socket.error:
        # Reverse lookup is cosmetic only; keep the resolved address.
        pass

    return ((host, port), self.targets[address])
def _build_selector(self, selector, cmd):
    """Extend ``self._selector`` with ``selector`` and return the component.

    A non-string ``selector`` is called with ``self`` and its ``_selector``
    is adopted as-is; ``self`` is returned.

    A string ``selector`` is first expanded: if it starts with the name of an
    entry in ``self.components``, that name is replaced by the component's
    ``selector`` and the component is instantiated on ``self``.  The expanded
    selector is then either appended to the existing chain as
    ``.cmd('...')`` or used to start a new chain.

    Returns ``self`` or the instantiated component.
    """
    if not isinstance(selector, StringTypes):
        self._selector = selector(self)._selector
        return self

    expanded = selector
    owner = self
    for prefix, component_cls in self.components.items():
        if not expanded.startswith(prefix):
            continue
        expanded = component_cls.selector + expanded[len(prefix):]
        owner = component_cls(self)
        break  # only the first matching component is applied

    if self._selector:
        self._selector += ".%s('%s')" % (cmd, expanded)
    elif cmd:
        self._selector = "$(NODE).%s('%s')" % (cmd, expanded)
    else:
        self._selector = "$('%s')" % expanded
    return owner
def getValueStrings(val, blnUgly=True):
    """Render ``val`` as a token for the SQL value lists built by joinWithComma.

    Numbers (float, int, long, bool) become ``#num#<value>#num#`` and strings
    are wrapped in double quotes.  Any other type is expected to have been
    filtered out by the caller; if one arrives anyway:

    * ``blnUgly == True`` (the default): an error text naming the type is
      returned;
    * otherwise ``None`` is returned, which keeps the output tidy but will
      probably produce a wrong SQL INSERT statement.
    """
    number_types = (types.FloatType, types.IntType, types.LongType, types.BooleanType)
    string_types = (types.StringType, types.StringTypes)

    if isinstance(val, number_types):
        return '#num#' + str(val) + '#num#'
    if isinstance(val, string_types):
        quote = '"'
        return quote + val + quote
    if blnUgly == True:
        return "Error: nonconvertable value passed - value type: %s" % type(val)
    return None
def scopes_to_string(scopes):
    """Return the scopes as one space-separated string.

    Args:
        scopes: a string, or an iterable of strings.

    Returns:
        ``scopes`` itself when it already is a string; otherwise the
        individual scopes joined with single spaces.
    """
    return scopes if isinstance(scopes, types.StringTypes) else ' '.join(scopes)
def is_ip4(ip):
    """Return True if ``ip`` is a dotted-quad IPv4 address string.

    ``ip`` must be a string made of exactly four dot-separated groups of
    digits, each with a value in 0..255.  Anything else, including
    non-string input, yields False.
    """
    if not isinstance(ip, types.StringTypes):
        return False

    parts = ip.split('.')
    if len(parts) != 4:
        return False

    for part in parts:
        if not part.isdigit():
            return False
        try:
            value = int(part)
        except ValueError:
            # isdigit() accepts characters such as superscript digits that
            # int() cannot convert; treat them as invalid instead of raising.
            return False
        if value > 255:
            return False

    return True
def parse_lock_data(data_str):
    """Parse a string generated by lock_data().

    The string is split on '-' into node id, ip and process id; missing
    fields are None and fields beyond the third are ignored.  The process id
    is converted to int when it consists of digits only, otherwise it is
    reported as None.

    Returns a dict with the keys 'node_id', 'ip' and 'process_id'.
    """
    fields = data_str.split('-')
    fields.extend([None] * 3)  # pad so that three fields always exist
    node_id, ip, process_id = fields[:3]

    is_numeric = type(process_id) in types.StringTypes and process_id.isdigit()
    return {
        'node_id': node_id,
        'ip': ip,
        'process_id': int(process_id) if is_numeric else None,
    }
def __call__(self, context):
    """Write the configured object as CSV and return the target path.

    * If ``self._object`` is a string it is treated as a key and replaced by
      ``context[key]`` (the resolved object is stored back on the instance).
    * ``self._dialect`` may be a registered dialect name or a dialect object.
    * A path starting with ``memory:`` writes into an in-memory buffer which
      is rewound and stored in ``context`` under the rest of the path;
      any other path is opened as a file for writing.

    Returns ``self._path``.
    """
    if isinstance(self._object, types.StringTypes):
        self._object = context[self._object]

    # get dialect object
    if isinstance(self._dialect, types.StringTypes):
        dialect = csv.get_dialect(self._dialect)
    else:
        # Already a dialect object; previously this case left ``dialect``
        # unbound and failed with a NameError further down.
        dialect = self._dialect

    if self._path.startswith("memory:"):
        buffer_ = StringIO.StringIO()
        self._write_object(buffer_, dialect)
        buffer_.seek(0)
        context[self._path[len("memory:"):]] = buffer_
    else:
        with open(self._path, "w") as f:
            self._write_object(f, dialect)
    return self._path
def validate_config(self, config):
    """Validate every ``smtp_*`` section of ``config`` and build its SMTPConfig.

    Does nothing if validation already ran.  For each section returned by
    ``config.prefix("smtp_")`` every rule in ``SMTPManager.config_rules`` is
    applied to the corresponding item: the value is validated, a missing
    value is replaced by the rule's default, and a string ``port`` is
    converted to int.  The resulting SMTPConfig is stored in
    ``self._all_smtp_config`` under the section name.
    """
    if self._validated:
        return

    for section in config.prefix("smtp_"):
        items = config[section]
        for rule in SMTPManager.config_rules:
            value = items.get(rule.name)
            rule.validate(value)
            if value is None:
                items[rule.name] = rule.default
            if rule.name == "port" and isinstance(value, types.StringTypes):
                items["port"] = int(value)
        self._all_smtp_config[section] = SMTPConfig(**items)

    self._validated = True
def __init__(self, attachment_file, mime_type, attachment_filename=None):
    """
    Derive the attachment's file name when none is given explicitly.

    :param attachment_file: a path string or a file object; any other type
        is rejected when ``attachment_filename`` is omitted
    :param mime_type: MIME type of the attachment (not used by the code
        visible here)
    :param attachment_filename: explicit file name; when None the base name
        of the path / of the file object's ``name`` is used

    NOTE(review): the original docstring was mis-encoded; it mentioned
    file/StringIO objects and ``application/octet-stream`` -- confirm the
    intended wording against the upstream source.
    """
    if attachment_filename is None:
        if isinstance(attachment_file, types.StringTypes):
            source_path = attachment_file
        elif isinstance(attachment_file, types.FileType):
            source_path = attachment_file.name
        else:
            raise InvalidArgumentException(
                u"????attachement_filename?????????")
        # Keep only the last path component.
        self._attachment_filename = os.path.split(source_path)[1]
def _build_query(self, engine, session, query_items):
    """Build the query for ``query_items`` from the configured clauses.

    Starting from ``session.query(*query_items)`` the optional filter,
    order-by, group-by and parameters stored on the instance are applied in
    that order.  A string filter is wrapped with ``text``; a string group-by
    is resolved through ``self.automap`` and the result is stored back on
    the instance.

    Returns the resulting query object.
    """
    query = session.query(*query_items)

    condition = self._query
    if condition is not None:
        if isinstance(condition, types.StringTypes):
            condition = text(condition)
        query = query.filter(condition)

    if self._order_by is not None:
        query = query.order_by(self._order_by)

    if self._group_by is not None:
        if isinstance(self._group_by, types.StringTypes):
            # Resolve the name once; later calls reuse the mapped value.
            self._group_by = self.automap(engine, self._group_by)
        query = query.group_by(self._group_by)

    if self._params is not None:
        query = query.params(**self._params)

    return query
def __call__(self, context):
    """Execute the configured SQL on a session of the named engine.

    A string statement matching ``_SELECT_STATEMENT_REGEX`` is delegated to
    ``self._execute_select_statement`` and its result is returned.

    Otherwise the statement(s) are executed and committed, and the session is
    always closed afterwards:

    * a string statement is executed with ``self._params``;
    * a sequence of statements is executed one by one, each paired with the
      parameters at the same index when ``self._params`` is a sequence too.
    """
    session = _engine_manager.engine(self._engine_name).session()

    statement_is_text = isinstance(self._sql, types.StringTypes)
    if statement_is_text and _SELECT_STATEMENT_REGEX.search(self._sql):
        return self._execute_select_statement(session, context)

    try:
        if statement_is_text:
            session.execute(self._sql, self._params)
        elif isinstance(self._sql, SequenceCollectionType):
            if isinstance(self._params, SequenceCollectionType):
                for position, statement in enumerate(self._sql):
                    session.execute(statement, self._params[position])
            else:
                for statement in self._sql:
                    session.execute(statement)
        session.commit()
    finally:
        session.close()
def _handle_titles(self, titles, auto_title_name):
    """Normalise ``titles`` into a sequence of Title objects.

    The type of the first element decides how ``titles`` is read:

    * Title objects: ``titles`` is returned unchanged;
    * strings with ``auto_title_name`` set: each string becomes
      ``Title("field_<index>", string)``;
    * strings otherwise: the flat sequence is read as alternating
      (first argument, second argument) pairs, one Title per pair.

    Raises InvalidTypeException for any other element type.
    """
    head = titles[0]
    if isinstance(head, Title):
        return titles
    if not isinstance(head, types.StringTypes):
        raise InvalidTypeException(u"title???????????Title??")

    if auto_title_name:
        return [Title("field_%d" % position, label)
                for position, label in enumerate(titles)]

    evens = titles[::2]
    odds = titles[1::2]
    return [Title(*pair) for pair in zip(evens, odds)]
def __init__(self, url, fileOrName, method='GET', postdata=None, headers=None, agent="Twisted client", supportPartial=0):
    """Set up a download of ``url`` into a file name or file-like object.

    :param fileOrName: a path string, or an object to use as the output file
    :param supportPartial: when true and ``fileOrName`` names an existing,
        non-empty file, a ``range`` header asking for the remaining bytes is
        added and ``requestedPartial`` is set to the current file size

    Note: when a ``headers`` dict is passed in and a partial download is
    requested, the ``range`` entry is written into that same dict.
    The remaining arguments are forwarded to ``HTTPClientFactory.__init__``.
    """
    self.requestedPartial = 0
    if isinstance(fileOrName, types.StringTypes):
        self.fileName = fileOrName
        self.file = None
        if supportPartial and os.path.exists(self.fileName):
            fileLength = os.path.getsize(self.fileName)
            if fileLength:
                self.requestedPartial = fileLength
                # Identity test: '== None' would call a custom __eq__.
                if headers is None:
                    headers = {}
                headers["range"] = "bytes=%d-" % fileLength
    else:
        self.file = fileOrName
    HTTPClientFactory.__init__(self, url, method=method, postdata=postdata, headers=headers, agent=agent)
    self.deferred = defer.Deferred()
    self.waiting = 1
def _render_without_enums(self, rendered_by_wraper):
    """Substitute every non-enum placeholder in ``rendered_by_wraper``.

    ``self._replace_table_without_enum`` maps each placeholder text to a tag
    object.  The tag's ``value`` (converted with ``str`` if it is not a
    string) replaces the placeholder; it is passed through ``self._wrap``
    first when ``self._wraper_flag[tag]`` is true.

    Returns the text with all placeholders replaced.
    """
    for placeholder, tag in self._replace_table_without_enum.items():
        target = tag.value
        if not isinstance(target, types.StringTypes):
            target = str(target)

        if self._wraper_flag[tag]:
            target = self._wrap(target)
        rendered_by_wraper = rendered_by_wraper.replace(placeholder, target)

    return rendered_by_wraper

#----------------------------------------------------------------------
def fmtstr(self, *fmt):
    """Join the arguments into one utf8 byte string, each followed by '.'.

    String arguments are used as they are and every other argument is
    represented by its ``repr``.  Unicode text is encoded as utf8 before
    being appended.  With no arguments the result is the empty string.
    """
    encoding = 'utf8'
    pieces = []
    for item in fmt:
        text = item if isinstance(item, types.StringTypes) else repr(item)
        if isinstance(text, type(u'')):
            text = text.encode(encoding)
        pieces.append(text)
        pieces.append('.')
    # Single join instead of repeated '+=' on a variable named 'str'.
    return ''.join(pieces)
def expect_exact(self, pattern_list, timeout = -1, searchwindowsize = -1):
    """Like expect(), but match plain strings instead of compiled regular
    expressions.

    'pattern_list' may be a single string, TIMEOUT or EOF (each is wrapped
    in a one-element list), or a list or other sequence of those.

    This call might be faster than expect() for two reasons: string
    searching is faster than RE matching, and it is possible to limit the
    search to just the end of the input buffer.  It is also useful when you
    don't want to worry about escaping regular expression characters in the
    text you want to match.
    """
    is_single = type(pattern_list) in types.StringTypes or pattern_list in (TIMEOUT, EOF)
    if is_single:
        pattern_list = [pattern_list]
    searcher = searcher_string(pattern_list)
    return self.expect_loop(searcher, timeout, searchwindowsize)
def __init__(self, depends_on, encoder=ENC_BITS_DEFAULT, fuzzable=True, name=None):
    '''
    :param depends_on: (name of) field we depend on
    :type encoder: :class:`~kitty.model.low_level.encoder.BitsEncoder`
    :param encoder: encoder for the field
    :param fuzzable: is container fuzzable
    :param name: (unique) name of the container
    :raises: ``KittyException`` if depends_on is neither a string nor a BaseField
    '''
    self._rendered_field = None
    self.dependency_type = Calculated.VALUE_BASED
    super(Calculated, self).__init__(value=self.__class__._default_value_, encoder=encoder, fuzzable=fuzzable, name=name)

    given_by_name = isinstance(depends_on, types.StringTypes)
    if not given_by_name and not isinstance(depends_on, BaseField):
        raise KittyException('depends_on parameter (%s) is neither a string nor a valid field' % depends_on)

    # Exactly one of the two is set: the name (field looked up later) or
    # the field object itself.
    self._field_name = depends_on if given_by_name else None
    self._field = None if given_by_name else depends_on
def __init__(self, depends_on, func, encoder=ENC_STR_DEFAULT, fuzzable=False, name=None):
    '''
    :param depends_on: (name of) field we depend on
    :param func: function for processing of the dependant data. func(str)->str
    :type encoder: :class:`~kitty.model.low_level.encoder.StrEncoder`
    :param encoder: encoder for the field (default: ENC_STR_DEFAULT)
    :param fuzzable: is container fuzzable
    :param name: (unique) name of the container
    :raises: ``KittyException`` if func('') fails or does not return a string
    '''
    try:
        # Probe the callable once with an empty string to check its contract.
        res = func('')
        kassert.is_of_types(res, types.StringTypes)
    except Exception:
        # 'Exception' rather than a bare except, so KeyboardInterrupt and
        # SystemExit are not turned into a KittyException.
        raise KittyException('func should be func(str)->str')
    self._func = func
    super(CalculatedStr, self).__init__(depends_on=depends_on, encoder=encoder, fuzzable=fuzzable, name=name)
def __init__(self, value, num_bits=1, fuzzable=True, name=None):
    '''
    :param value: value to mutate (str)
    :param num_bits: number of consecutive bits to flip (invert)
    :param fuzzable: is field fuzzable (default: True)
    :param name: name of the object (default: None)

    :raises: ``KittyException`` if num_bits is bigger than the value length in bits
    :raises: ``KittyException`` if num_bits is not positive
    '''
    kassert.is_of_types(value, types.StringTypes)

    bit_count = len(value) * 8
    if bit_count < num_bits:
        raise KittyException('len of value in bits(%d) < num_bits(%d)' % (bit_count, num_bits))
    if num_bits <= 0:
        raise KittyException('num_bits(%d) <= 0' % (num_bits))

    super(BitFlip, self).__init__(value=Bits(bytes=value), encoder=ENC_BITS_DEFAULT, fuzzable=fuzzable, name=name)
    self._data_len = bit_count
    self._num_bits = num_bits
    # One mutation per possible start position of the flipped window.
    self._num_mutations = bit_count - (num_bits - 1)
def type_check(tags):
    """Perform a type check on a list of tags.

    ``tags`` may be a list or tuple of tags, None (treated as an empty list)
    or a single tag (wrapped in a list).

    Returns a ``(tags, single, valid)`` tuple: the normalised tags, whether a
    single tag was passed, and whether every tag is a string.
    """
    if isinstance(tags, (list, tuple)):
        single = False
    elif tags is None:
        # Identity test: '== None' would invoke a custom __eq__ on tags.
        tags = []
        single = False
    else:
        tags = [tags]
        single = True

    valid = all(isinstance(t, types.StringTypes) for t in tags)
    return tags, single, valid
def __init__(self, collection_names, collection_labels=None, file_names=None, field_names=None):
    """Load one or more collections and determine what they have in common.

    ``collection_names`` is a single name or a sequence of names;
    ``collection_labels`` defaults to one None label per name.  Each
    (name, label) pair is loaded as a Collection with the given
    ``file_names`` and ``field_names``.  The sorted ids of the galaxies and
    the property names shared by all collections are stored, and a summary
    line is printed.
    """
    # load the collections
    if isinstance(collection_names, types.StringTypes):
        collection_names = [collection_names]
    if collection_labels is None:
        collection_labels = [None] * len(collection_names)
    loaded = []
    for name, label in zip(collection_names, collection_labels):
        loaded.append(Collection(name, label, file_names, field_names))
    self._collections = loaded

    # find the set of common galaxies and the set of common properties
    first, rest = self._collections[0], self._collections[1:]
    common_ids = first.galaxy_ids()
    for collection in rest:
        common_ids = common_ids & collection.galaxy_ids()
    common_props = first.property_names()
    for collection in rest:
        common_props = common_props & collection.property_names()
    self._ids = sorted(common_ids)
    self._props = common_props

    # print the number of common galaxies
    summary = "Loaded a set of {} collections with {} common galaxies and {} common properties"
    print(summary.format(len(self._collections), len(self._ids), len(self._props)))

## This function returns a two-dimensional array with the values of the specified property for all common galaxies
# in all collections of the set. The index on the first axis iterates over the collections, the index on the last
# axis iterates over the galaxies, in order of increasing galaxy id.
def p_prop(self, p):
    '''prop : prop_name operator prop_value '''
    # NOTE(review): this looks like a PLY (yacc) production -- if so, the
    # docstring above is the grammar rule read at runtime and must not be
    # edited. p[1], p[2], p[3] are the name, operator and value tokens.
    if p[1].value.upper() in self.INT_TYPE_PROPNAMES:
        # Integer-typed property: the "~=" operator is reported as an error.
        if p[2].value == '~=':
            self._error('"%s"???????"~="???'%(p[1].value), p[2], p[2].lexpos)
        # Coerce the value to int in place; report an error if it cannot be
        # converted.
        if not isinstance(p[3].value, types.IntType):
            try:
                p[3].value = int(p[3].value)
            except ValueError:
                self._error('"%s"??????"%s"??????int??'%(p[1].value, type(p[3].value)), p[3], p[3].lexpos)
        # MAXDEPTH additionally has to be strictly positive.
        if p[1].value.upper() == 'MAXDEPTH':
            if p[3].value <= 0:
                self._error("MaxDepth?????>0", p[3], p[3].lexpos)
    elif p[2].value == '~=':
        # For every other property, "~=" is only accepted with a string value.
        if not isinstance(p[3].value, types.StringTypes):
            self._error('???"~="?????"%s"?????'%(type(p[3].value)), p[2], p[2].lexpos)
    # Result of the production: the property built from the three tokens.
    p[0] = UIObjectProperty(p[1], p[2], p[3])
def matching_files(root_path, file_patterns, test_file_path=None):
    """
    Return all files below the root path whose name matches a pattern.

    :param root_path: directory to search; the current directory when empty
    :param file_patterns: one fnmatch pattern or an iterable of patterns
    :param test_file_path: optional text that must occur in a directory's
        path for its files to be considered (a trailing "/" is ignored)
    :return: frozenset of matching paths, each built as directory + '/' + name
    """
    if isinstance(file_patterns, types.StringTypes):
        file_patterns = [file_patterns]

    if not test_file_path:
        test_file_path = ""
    elif test_file_path[-1:] == "/":
        test_file_path = test_file_path[:-1]

    if not root_path:
        root_path = os.path.abspath(os.curdir)

    found = set()
    for dirpath, _, filenames in os.walk(root_path):
        if test_file_path not in dirpath:
            continue
        for filename in filenames:
            if any(fnmatch.fnmatch(filename, pattern) for pattern in file_patterns):
                found.add(dirpath + '/' + filename)

    return frozenset(found)
def assertMultiLineEqual(self, first, second, msg=None):
    """Assert that two multi-line strings are equal.

    Both arguments must be strings.  When they differ,
    ``self.failureException`` is raised with a line-by-line ``difflib.ndiff``
    listing, preceded by ``msg`` and a colon when ``msg`` is given.
    """
    assert isinstance(first, types.StringTypes), (
        'First argument is not a string: %r' % (first,))
    assert isinstance(second, types.StringTypes), (
        'Second argument is not a string: %r' % (second,))

    if first == second:
        return

    pieces = [msg, ':\n'] if msg else ['\n']
    diff = difflib.ndiff(first.splitlines(True), second.splitlines(True))
    for diff_line in diff:
        pieces.append(diff_line)
        # Input without a final newline yields an unterminated diff line.
        if not diff_line.endswith('\n'):
            pieces.append('\n')
    raise self.failureException(''.join(pieces))
def validate_request(request):
    """Check the basic structure of an RPC request.

    ``request`` must be a dict for which ``get_version`` reports a version,
    with a non-empty string ``method`` and list, dict or tuple ``params``.
    A missing ``params`` entry is added to ``request`` as an empty list.

    Returns True for a valid request, otherwise a ``Fault`` with code -32600
    (carrying the request id once it is known).
    """
    if not isinstance(request, dict):
        return Fault(
            -32600, 'Request must be {}, not %s.' % type(request)
        )

    rpcid = request.get('id', None)
    version = get_version(request)
    if not version:
        return Fault(-32600, 'Request %s invalid.' % request, rpcid=rpcid)

    request.setdefault('params', [])
    method = request.get('method', None)
    params = request.get('params')

    # isinstance instead of exact type matches, so subclasses (for example
    # an OrderedDict of params) are accepted as well.
    if not method or not isinstance(method, types.StringTypes) or \
            not isinstance(params, (list, dict, tuple)):
        return Fault(
            -32600, 'Invalid request parameters or method.', rpcid=rpcid
        )
    return True
def __init__(self, name, attribute, value, fuzz_attribute=False, fuzz_value=True):
    '''
    :param name: name of the block
    :param attribute: attribute
    :type value: str/unicode/int
    :param value: value of the attribute
    :param fuzz_attribute: should we fuzz the attribute field (default: False)
    :param fuzz_value: should we fuzz the value field (default: True)
    '''
    _check_type(attribute, StringTypes, 'attribute')
    _check_type(value, StringTypes + (IntType, ), 'value')

    value_name = _valuename(name)
    if not isinstance(value, StringTypes):
        # Integer value: rendered through the decimal encoder.
        value_field = SInt32(value, encoder=ENC_INT_DEC, fuzzable=fuzz_value, name=value_name)
    else:
        value_field = String(value, name=value_name, fuzzable=fuzz_value)

    # Field order: attribute, '=', opening quote, value, closing quote.
    attribute_field = String(attribute, fuzzable=fuzz_attribute, name='%s_attribute' % name)
    fields = [attribute_field, Static('='), Static('"'), value_field, Static('"')]
    super(XmlAttribute, self).__init__(fields, name=name)
def _PrepareCommand(command):
    """Transform a command to list format.

    A tuple or list is copied into a new list.  A string is split with
    ``shlex.split`` (comments enabled).  Any other type raises
    ``error.SDKError``.
    """
    if isinstance(command, (tuple, list)):
        return list(command)

    if not isinstance(command, types.StringTypes):
        raise error.SDKError(
            'Command [{cmd}] must be a string, list or tuple.'.format(cmd=command))

    # Need to account for the fact that shlex escapes backslashes when parsing
    # in Posix mode.
    if _IsOnWindows():
        command = command.replace(os.sep, os.sep + os.sep)
    return shlex.split(command, comments=True)


# TODO(magimaster): Windows.
# TODO(magimaster): Verify that things are cleaned up if something here fails.
def similarity(self, v1, v2):
    """Return cosine similarity of given words or vectors.

    If v1/v2 is a string, the corresponding word vector is looked up with
    word_to_unit_vector(); any other value is treated as a vector and
    divided by its norm.

    This is not a particularly efficient function. Instead of many
    invocations, consider word_similarity() or direct computation.
    """
    unit_vectors = []
    for item in (v1, v2):
        if isinstance(item, StringTypes):
            unit = self.word_to_unit_vector(item)
        else:
            unit = item / numpy.linalg.norm(item)  # costly but safe
        unit_vectors.append(unit)
    first, second = unit_vectors
    return numpy.dot(first, second)
def __init__(self, url, fileOrName, method='GET', postdata=None, headers=None, agent="Twisted client", supportPartial=0, timeout=0, cookies=None, followRedirect=1, redirectLimit=20, afterFoundGet=False):
    """Set up a download of ``url`` into a file name or file-like object.

    :param fileOrName: a path string, or an object to use as the output file
    :param supportPartial: when true and ``fileOrName`` names an existing,
        non-empty file, a ``range`` header asking for the remaining bytes is
        added and ``requestedPartial`` is set to the current file size

    Note: when a ``headers`` dict is passed in and a partial download is
    requested, the ``range`` entry is written into that same dict.
    The remaining arguments are forwarded to ``HTTPClientFactory.__init__``.
    """
    self.requestedPartial = 0
    if isinstance(fileOrName, types.StringTypes):
        self.fileName = fileOrName
        self.file = None
        if supportPartial and os.path.exists(self.fileName):
            fileLength = os.path.getsize(self.fileName)
            if fileLength:
                self.requestedPartial = fileLength
                # Identity test: '== None' would call a custom __eq__.
                if headers is None:
                    headers = {}
                headers["range"] = "bytes=%d-" % fileLength
    else:
        self.file = fileOrName
    HTTPClientFactory.__init__(
        self, url, method=method, postdata=postdata, headers=headers,
        agent=agent, timeout=timeout, cookies=cookies,
        followRedirect=followRedirect, redirectLimit=redirectLimit,
        afterFoundGet=afterFoundGet)
def to_xml(self):
    """Convert CORS object into XML string representation.

    ``self.cors`` is iterated as a sequence of entries, each a sequence of
    (collection, elements_or_value) pairs.  A string value is emitted as the
    collection tag's text; otherwise each (name, value) pair is emitted as a
    nested tag.  Collection names, element names and values must not be None.
    """
    pieces = ['<' + CORS_CONFIG + '>']
    for collections in self.cors:
        pieces.append('<' + CORS + '>')
        for (collection, elements_or_value) in collections:
            assert collection is not None
            pieces.append('<' + collection + '>')
            # If collection elements has type string, append atomic value,
            # otherwise, append sequence of values in named tags.
            if isinstance(elements_or_value, types.StringTypes):
                pieces.append(elements_or_value)
            else:
                for (name, value) in elements_or_value:
                    assert name is not None
                    assert value is not None
                    pieces.append('<' + name + '>' + value + '</' + name + '>')
            pieces.append('</' + collection + '>')
        pieces.append('</' + CORS + '>')
    pieces.append('</' + CORS_CONFIG + '>')
    return ''.join(pieces)
def __sanity(self, query, sections):
    """Validate ``query`` and the optional ``sections`` and store them.

    ``query`` is an ``(addr, qtype, qclass)`` triple: ``addr`` must be a
    string, ``qtype`` a non-zero key of ``DNS_TYPE`` and ``qclass`` a
    non-zero key of ``DNS_CLASS``.  When ``sections`` is given it is
    normalised and its 'AUTHORITY' / 'ADDITIONAL' entries, which must be
    lists or tuples, are stored in ``self._sections``.

    Raises ValueError on any invalid part.
    """
    # Unused here but kept bound for the whole call -- presumably a scoped
    # debug tracer; TODO confirm before removing.
    dbg = _debug('_dnsquery::__sanity')
    addr, qtype, qclass = query
    if not isinstance(addr, types.StringTypes):
        raise ValueError('Invalid name %s' % str(addr))
    # '%s' instead of '%u': a non-numeric type/class must produce this
    # ValueError, not a TypeError from the format operation.
    if qtype == 0 or qtype not in DNS_TYPE:
        raise ValueError('Invalid type %s' % (qtype,))
    if qclass == 0 or qclass not in DNS_CLASS:
        raise ValueError('Invalid class %s' % (qclass,))
    self._query = query
    if not sections:
        return
    sections = self.__normalize(sections)
    for k in ['AUTHORITY', 'ADDITIONAL']:
        if k in sections:
            v = sections[k]
            if not isinstance(v, (list, tuple)):
                raise ValueError('%s format error' % k)
            self._sections[k] = v