我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用time.split()。
def parse_qs(qs, keep_blank_values=0, strict_parsing=0, unquote=unquote):
    """like cgi.parse_qs, only with custom unquote function"""
    result = {}
    # Query pairs may be separated by either '&' or ';'.
    fields = []
    for group in qs.split("&"):
        fields.extend(group.split(";"))
    for field in fields:
        try:
            name, value = field.split("=", 1)
        except ValueError:
            if strict_parsing:
                raise
            continue
        if not (value or keep_blank_values):
            continue
        name = unquote(name.replace("+", " "))
        value = unquote(value.replace("+", " "))
        result.setdefault(name, []).append(value)
    return result
def lineReceived(self, line):
    """Handle one line of an HTTP response being parsed.

    The first line is the status line (version, status code, optional
    message); subsequent non-empty lines are headers; the first empty
    line ends the header section and switches to raw (body) mode.
    """
    if self.firstLine:
        self.firstLine = 0
        l = line.split(None, 2)
        version = l[0]
        status = l[1]
        try:
            message = l[2]
        except IndexError:
            # sometimes there is no message
            message = ""
        self.handleStatus(version, status, message)
        return
    if line:
        # "Name: value" header; only the first ':' separates name from value.
        key, val = line.split(':', 1)
        val = val.lstrip()
        self.handleHeader(key, val)
        if key.lower() == 'content-length':
            self.length = int(val)
    else:
        # Blank line: headers are done, start buffering the body.
        self.__buffer = StringIO()
        self.handleEndHeaders()
        self.setRawMode()
def _authorize(self):
    # Authorization, (mostly) per the RFC
    # Parses an HTTP Basic "Authorization" header into self.user /
    # self.password; on any parse failure both are reset to "".
    try:
        authh = self.getHeader("Authorization")
        if not authh:
            self.user = self.password = ''
            return
        bas, upw = authh.split()
        if bas.lower() != "basic":
            # Only the Basic scheme is supported.
            raise ValueError
        upw = base64.decodestring(upw)
        self.user, self.password = upw.split(':', 1)
    except (binascii.Error, ValueError):
        # Malformed header or bad base64: treat as anonymous.
        self.user = self.password = ""
    except:
        # NOTE(review): bare except also swallows SystemExit/KeyboardInterrupt;
        # kept as-is since the original deliberately logs and degrades.
        log.err()
        self.user = self.password = ""
def getMostValueableStockList(self):
    '''Fetch all pages of the "most valuable stocks" ranking and build a
    list of MostValueableCompanyInfo records.  (Original docstring was
    garbled mojibake; presumably Chinese for "most valuable stock list".)'''
    page = 1
    cList = []
    while True:
        res = getHtmlFromUrl(mostValueableStockUrl % page)
        if res:
            page += 1
        companyListObj = getJsonObj(res)
        if companyListObj:
            list = companyListObj['Results']
            if list and len(list):
                for item in list:
                    # Each result row is a comma-separated record; field
                    # meanings inferred from the positional use below.
                    stockInfo = item.split(',')
                    # Percentage fields carry a trailing "(...)" suffix that is cut off.
                    jzcsyl = str(float(stockInfo[5].split('(')[0]) * 100) + '%'
                    fhlrzzl = str(float(stockInfo[3].split('(')[0]) * 100) + '%'
                    orgCount = stockInfo[4].split('(')[0]
                    # Market cap converted to units of 100 million.
                    sz = str(int(float(stockInfo[6])/10000/10000))
                    cinfo = MostValueableCompanyInfo(stockInfo[1],stockInfo[2],jzcsyl,fhlrzzl,orgCount,sz)
                    cList.append(cinfo)
                if len(list) < pageSize:
                    break
                # Stop once we have walked past the reported page count.
                if int(companyListObj['PageCount']) < page:
                    break
            else:
                break
        else:
            break
    return cList
def parse_time(date, time):
    """Combine a textual date and an "HH:MM" time into a datetime.

    ``date`` is either one of two literal words meaning today/yesterday
    (the literals below are mojibake of the original non-ASCII words) or
    a "day month [year]" string matched against the module-level
    ``months`` name list.  Relies on module globals ``timezone`` and
    ``months``.
    """
    current_time = datetime.datetime.now(timezone)
    if date == '???????':
        # "today" (garbled literal preserved byte-for-byte)
        date = current_time.date()
    elif date == '?????':
        # "yesterday"
        date = (current_time - datetime.timedelta(days=1)).date()
    else:
        date = date.split(' ')
        day = int(date[0])
        if len(date) == 3:
            year = int(date[2])
        else:
            # Year omitted: assume the current year.
            year = current_time.year
        month = date[1].lower()
        for i, name in enumerate(months):
            if name.startswith(month):
                month = i + 1
                break
        date = datetime.date(year=year, day=day, month=month)
    time = [int(i) for i in time.split(':')]
    time = datetime.time(hour=time[0], minute=time[1])
    return datetime.datetime.combine(date=date, time=time)
def extract_consume_per_person(file_name, consume_dict):
    """Fill consume_dict with each person's average daily POS spending.

    Each input line is ``id$record$record...`` where every record is a
    comma-separated list of quoted fields: category, _, _, timestamp,
    amount.  Only records whose category equals the (mojibake) literal
    "POS??" contribute to the total; the total is divided by the number
    of distinct active dates.

    Fixes vs. original: the file handle is now closed (``with``), lines
    with no records no longer raise ZeroDivisionError, and locals no
    longer shadow the builtins ``id`` and ``time``.
    """
    with open(file_name) as infile:
        for line in infile:
            temps = line.strip("\r\n").split("$")
            person_id = temps[0]
            total_amount = 0.0
            active_dates = set()
            for record_text in temps[1:]:
                records = record_text.split(",")
                cate = records[0].strip("\"")
                amount = float(records[4].strip("\""))
                timestamp = records[3].strip("\"")
                # Date part only ("YYYY-MM-DD HH:MM" -> "YYYY-MM-DD").
                active_dates.add(timestamp.split(" ")[0])
                if cate == "POS??":
                    total_amount += amount
            if active_dates:
                consume_dict[person_id] = float(total_amount) / len(active_dates)
def extract_rank_feature(file_name, final_rank, score_dict, if_train):
    """Write one JSON-ish rank-feature line per student id to
    ../original_data/rank_feature_{train,test}.txt.

    ``final_rank`` and ``score_dict`` map id -> rank / score; missing ids
    get the sentinel -999.  Fixes vs. original: Python-3 ``print()`` and
    ``in`` instead of the Py2-only ``print id`` / ``dict.has_key``, and
    both file handles are closed via ``with``.
    """
    if if_train:
        out_path = "../original_data/rank_feature_train.txt"
    else:
        out_path = "../original_data/rank_feature_test.txt"
    with open(out_path, 'w') as w, open(file_name) as infile:
        for line in infile:
            if if_train:
                # Training lines are "id,label,..."; only the id is needed.
                stu_id = line.strip().split(",")[0]
            else:
                stu_id = line.strip()
            print(stu_id)
            w.write("{")
            w.write('"stuId": ' + stu_id + ", ")
            if stu_id in score_dict and stu_id in final_rank:
                w.write('"rank_in_faculty":' + str(final_rank[stu_id]) + "," +
                        '"rank_score_consume":' + str(final_rank[stu_id] * score_dict[stu_id]) + "} \n")
            else:
                # -999 is the project's "missing value" sentinel.
                w.write('"rank_in_faculty":' + str(final_rank.get(stu_id, -999)) + "," +
                        '"rank_score_consume":' + str(-999) + "} \n")
def extract_stations(self, page):
    """Parse every ``station(...)`` entry in ``page`` into Station objects.

    Each match is a comma-separated field list; a leading field 'name'
    marks a header row that is skipped.  Fix vs. original: the loop
    variable no longer shadows the builtin ``str``.
    """
    station_re = re.compile(r'station\(([^)]+)\)')
    stations = []
    for entry in station_re.findall(page):
        # Drop quotes and collapse doubled commas before splitting.
        toks = entry.replace("'", '').replace(',,', ',').split(',')
        if toks[0] == 'name':
            continue
        st = Station(
            station=toks[0],
            network=toks[1],
            dist=float(toks[2]),
            azimuth=float(toks[3]),
            channels=toks[4:-1],
            snr=float(toks[-1]))
        stations.append(st)
    return stations
def readEvent(EventPath):
    """Parse the ``*.origin`` file(s) in EventPath into a key/value dict.

    Lines look like ``key = value``; quotes are stripped from keys.
    Fixes vs. original: the file handle is closed via ``with`` and the
    bare ``except:`` (which also swallowed KeyboardInterrupt/SystemExit)
    is narrowed to ``except Exception``.
    """
    Origin = {}
    logger.info('\033[31m Parsing EventFile \033[0m \n')
    try:
        for i in os.listdir(EventPath):
            if fnmatch.fnmatch(i, '*.origin'):
                evfile = os.path.join(EventPath, i)
                with open(evfile, 'r') as fobj:
                    for line in fobj:
                        parts = line.split('=')
                        key = (parts[0].replace('\'', '')).strip()
                        value = parts[1].replace('\n', '').strip()
                        Origin[key] = value
    except Exception:
        logger.info('\033[31m File Error Exception \033[0m \n')
    return Origin
def readConfig(EventPath):
    """Parse the ``*.config`` file(s) in EventPath into a key/value dict.

    Lines starting with '#' and empty lines are skipped; the rest look
    like ``key = value``.  Fixes vs. original: file closed via ``with``
    and bare ``except:`` narrowed to ``except Exception`` (matches the
    companion readEvent).
    """
    Config = {}
    logger.info('\033[31m Parsing ConfigFile \033[0m \n')
    try:
        for i in os.listdir(EventPath):
            if fnmatch.fnmatch(i, '*.config'):
                evfile = os.path.join(EventPath, i)
                with open(evfile, 'r') as fobj:
                    for line in fobj:
                        if line[0] != '#' and len(line) > 1:
                            parts = line.split('=')
                            key = (parts[0].replace('\'', '')).strip()
                            value = parts[1].replace('\n', '').strip()
                            Config[key] = value
    except Exception:
        logger.info('\033[31m File Error Exception \033[0m \n')
    return Config
def parse_qs(qs, keep_blank_values=0, strict_parsing=0):
    """
    Like C{cgi.parse_qs}, but with support for parsing byte strings on
    Python 3.

    @type qs: C{bytes}
    """
    result = {}
    # Pairs may be delimited by either b'&' or b';'.
    fields = []
    for group in qs.split(b"&"):
        fields.extend(group.split(b";"))
    for field in fields:
        try:
            name, value = field.split(b"=", 1)
        except ValueError:
            if strict_parsing:
                raise
            continue
        if not (value or keep_blank_values):
            continue
        name = unquote(name.replace(b"+", b" "))
        value = unquote(value.replace(b"+", b" "))
        result.setdefault(name, []).append(value)
    return result
def fromChunk(data):
    """
    Convert chunk to string.

    @type data: C{bytes}

    @return: tuple of (result, remaining) - both C{bytes}.

    @raise ValueError: If the given data is not a correctly formatted
        chunked byte string.
    """
    sizeLine, remainder = data.split(b'\r\n', 1)
    chunkLength = int(sizeLine, 16)
    if chunkLength < 0:
        raise ValueError("Chunk length must be >= 0, not %d" % (chunkLength,))
    body = remainder[:chunkLength]
    # The chunk body must be terminated by CRLF.
    if remainder[chunkLength:chunkLength + 2] != b'\r\n':
        raise ValueError("chunk must end with CRLF")
    return body, remainder[chunkLength + 2:]
def parseContentRange(header):
    """
    Parse a content-range header into (start, end, realLength).

    realLength might be None if real length is not known ('*').

    @raise ValueError: for a range unit other than "bytes" or a
        malformed header.
    """
    kind, other = header.strip().split()
    if kind.lower() != "bytes":
        # Bug fix: the original left the %r placeholder unformatted, so
        # the offending unit never appeared in the error message.
        raise ValueError("a range of type %r is not supported" % (kind,))
    startend, realLength = other.split("/")
    start, end = map(int, startend.split("-"))
    if realLength == "*":
        realLength = None
    else:
        realLength = int(realLength)
    return (start, end, realLength)
def parseCookies(self):
    """
    Parse cookie headers.

    This method is not intended for users.
    """
    headerValues = self.requestHeaders.getRawHeaders(b"cookie")
    if headerValues is None:
        return
    for headerValue in headerValues:
        if not headerValue:
            continue
        # Cookies within one header are separated by b';'.
        for piece in headerValue.split(b';'):
            piece = piece.lstrip()
            try:
                name, value = piece.split(b'=', 1)
            except ValueError:
                # Fragments without '=' are silently ignored.
                continue
            self.received_cookies[name] = value
def getRequestHostname(self):
    """
    Get the hostname that the user passed in to the request.

    This will either use the Host: header (if it is available) or the
    host we are listening on if the header is unavailable.

    @returns: the requested hostname
    @rtype: C{bytes}
    """
    hostHeader = self.getHeader(b'host')
    if not hostHeader:
        # No Host header: fall back to the address we are listening on.
        return networkString(self.getHost().host)
    # Strip any ":port" suffix.
    return hostHeader.split(b':', 1)[0]
def _authorize(self):
    # Authorization, (mostly) per the RFC
    # Bytes variant of Basic-auth parsing: fills self.user/self.password
    # from the "Authorization" header, resetting both to "" on failure.
    try:
        authh = self.getHeader(b"Authorization")
        if not authh:
            self.user = self.password = ''
            return
        bas, upw = authh.split()
        if bas.lower() != b"basic":
            # Only the Basic scheme is supported.
            raise ValueError()
        upw = base64.decodestring(upw)
        self.user, self.password = upw.split(b':', 1)
    except (binascii.Error, ValueError):
        # Malformed header or bad base64: treat as anonymous.
        self.user = self.password = ""
    except:
        # NOTE(review): bare except kept as in the original; it logs any
        # other failure and degrades to anonymous.
        log.err()
        self.user = self.password = ""
def _srtToAtoms(self, srtText):
    """Split SRT subtitle text into atoms of {'start', 'end', 'text'}.

    Each blank-line-separated block is expected to be::

        [index]
        HH:MM:SS,mmm --> HH:MM:SS,mmm
        text lines...

    The numeric index line is optional; timestamps are converted via
    self._srtTc2ms and text cleaned via self._srtClearText.
    """
    subAtoms = []
    # Normalise line endings, then split into subtitle blocks.
    srtText = srtText.replace('\r\n', '\n').split('\n\n')
    line = 0
    for idx in range(len(srtText)):
        line += 1
        st = srtText[idx].split('\n')
        if len(st)>=2:
            try:
                try:
                    # If the first line is a numeric counter, the timing
                    # line is st[1]; otherwise it is st[0].
                    tmp = int(st[0].strip())
                    i = 1
                except Exception:
                    if '' == st[0]:
                        i = 1
                    else:
                        i = 0
                if len(st)<(i+2):
                    # Not enough lines for timing + text: skip block.
                    continue
                split = st[i].split(' --> ')
                subAtoms.append( { 'start':self._srtTc2ms(split[0].strip()), 'end':self._srtTc2ms(split[1].strip()), 'text':self._srtClearText('\n'.join(j for j in st[i+1:len(st)])) } )
            except Exception:
                # Malformed block: log its number and keep going.
                printExc("Line number [%d]" % line)
    return subAtoms
def test_use_cache(): """With cached files this should come back in under a second""" # Generate cached files cmd_list = [NETMIKO_GREP] + ['interface', 'all'] subprocess_handler(cmd_list) cmd_list = [NETMIKO_GREP] + ['--use-cache', '--display-runtime', 'interface', 'all'] (output, std_err) = subprocess_handler(cmd_list) match = re.search(r"Total time: (0:.*)", output) time = match.group(1) _, _, seconds = time.split(":") seconds = float(seconds) assert seconds <= 1 assert 'pynet_rtr1.txt:interface FastEthernet0' in output
def test_display_failed():
    """Verify failed devices are showing"""
    cmd_list = [NETMIKO_GREP] + ['interface', 'all']
    (output, std_err) = subprocess_handler(cmd_list)
    assert "Failed devices" in output
    # Everything after the "Failed devices:" banner is one device per line.
    failed_devices = output.split("Failed devices:")[1]
    failed_devices = failed_devices.strip().split("\n")
    failed_devices = [x.strip() for x in failed_devices]
    # The fixture inventory contains exactly these two intentionally-bad hosts.
    assert len(failed_devices) == 2
    assert "bad_device" in failed_devices
    assert "bad_port" in failed_devices
def fromChunk(data):
    """Convert chunk to string.

    @returns: tuple (result, remaining), may raise ValueError.

    Fixes vs. original: the Python-2-only ``raise ValueError, "..."``
    statement (a SyntaxError on Python 3) is replaced with the call
    form, and ``not a == b`` is written as ``a != b``.
    """
    prefix, rest = data.split('\r\n', 1)
    length = int(prefix, 16)
    if length < 0:
        raise ValueError("Chunk length must be >= 0, not %d" % (length,))
    # The chunk body must be terminated by CRLF.
    if rest[length:length + 2] != '\r\n':
        raise ValueError("chunk must end with CRLF")
    return rest[:length], rest[length + 2:]
def parseContentRange(header):
    """Parse a content-range header into (start, end, realLength).

    realLength might be None if real length is not known ('*').

    Fixes vs. original: the Python-2-only ``raise E, "msg"`` syntax is
    replaced with the call form, and the %r placeholder is actually
    interpolated with the offending range unit.
    """
    kind, other = header.strip().split()
    if kind.lower() != "bytes":
        raise ValueError("a range of type %r is not supported" % (kind,))
    startend, realLength = other.split("/")
    start, end = map(int, startend.split("-"))
    if realLength == "*":
        realLength = None
    else:
        realLength = int(realLength)
    return (start, end, realLength)
def setLastModified(self, when): """Set the X{Last-Modified} time for the response to this request. If I am called more than once, I ignore attempts to set Last-Modified earlier, only replacing the Last-Modified time if it is to a later value. If I am a conditional request, I may modify my response code to L{NOT_MODIFIED} if appropriate for the time given. @param when: The last time the resource being returned was modified, in seconds since the epoch. @type when: number @return: If I am a X{If-Modified-Since} conditional request and the time given is not newer than the condition, I return L{http.CACHED<CACHED>} to indicate that you should write no body. Otherwise, I return a false value. """ # time.time() may be a float, but the HTTP-date strings are # only good for whole seconds. when = long(math.ceil(when)) if (not self.lastModified) or (self.lastModified < when): self.lastModified = when modified_since = self.getHeader('if-modified-since') if modified_since: modified_since = stringToDatetime(modified_since.split(';', 1)[0]) if modified_since >= when: self.setResponseCode(NOT_MODIFIED) return CACHED return None
def setETag(self, etag):
    """Set an X{entity tag} for the outgoing response.

    That's \"entity tag\" as in the HTTP/1.1 X{ETag} header, \"used
    for comparing two or more entities from the same requested
    resource.\"

    If I am a conditional request, I may modify my response code
    to L{NOT_MODIFIED} or L{PRECONDITION_FAILED}, if appropriate
    for the tag given.

    @param etag: The entity tag for the resource being returned.
    @type etag: string

    @return: If I am a X{If-None-Match} conditional request and
        the tag matches one in the request, I return
        L{http.CACHED<CACHED>} to indicate that you should write
        no body.  Otherwise, I return a false value.
    """
    if etag:
        self.etag = etag
    tags = self.getHeader("if-none-match")
    if tags:
        tags = tags.split()
        if (etag in tags) or ('*' in tags):
            # Safe methods get 304; others get 412 per the RFC.
            self.setResponseCode(((self.method in ("HEAD", "GET"))
                                  and NOT_MODIFIED)
                                 or PRECONDITION_FAILED)
            return CACHED
    return None
def getRequestHostname(self):
    """Get the hostname that the user passed in to the request.

    This will either use the Host: header (if it is available) or the
    host we are listening on if the header is unavailable.
    """
    hostname = self.getHeader('host')
    if not hostname:
        # No Host header: resolve the name of the listening address.
        hostname = socket.gethostbyaddr(self.getHost()[1])[0]
    # Strip any ":port" suffix.
    return hostname.split(':')[0]
def headerReceived(self, line):
    """Do pre-processing (for content-length) and store this header away.
    """
    # Only the first ':' separates the name from the value.
    header, data = line.split(':', 1)
    header = header.lower()
    data = data.strip()
    if header == 'content-length':
        self.length = int(data)
    # Attach the header to the request currently being parsed.
    reqHeaders = self.requests[-1].received_headers
    reqHeaders[header] = data
    if len(reqHeaders) > self.maxHeaders:
        # Too many headers: reject the request and drop the connection.
        self.transport.write("HTTP/1.1 400 Bad Request\r\n\r\n")
        self.transport.loseConnection()
def checkPersistence(self, request, version):
    """Check if the channel should close or not."""
    header = request.getHeader('connection')
    if header:
        tokens = [tok.lower() for tok in header.split(' ')]
    else:
        tokens = []
    # HTTP/1.0 keep-alive support is deliberately disabled (pipelining
    # cannot be handled safely without chunked encoding), so anything
    # other than HTTP/1.1 closes the connection.
    if version != "HTTP/1.1":
        return 0
    if 'close' in tokens:
        request.setHeader('connection', 'close')
        return 0
    return 1
def getJsonObj(obj):
    """Extract the JSON object from a JSONP-style response.

    The response looks like ``var xyz="{...}"``; everything before the
    first ``={`` is discarded and the brace is re-attached.  Returns
    None for empty input or input that looks like an HTML error page.
    """
    if not obj:
        return None
    if hasHTML(obj):
        return None
    # NOTE(review): raises IndexError if '={' is absent — callers appear
    # to rely on hasHTML() filtering such responses first; confirm.
    newobj = "{" + obj.split('={')[1]
    return simplejson.loads(newobj)
def getJsonObj2(obj):
    """Extract the JSON payload following a ``data:`` marker.

    Returns the parsed object, or None for empty/HTML input or when no
    ``data:...\"]`` fragment is found.  Fixes vs. original: the typo
    ``partern`` and the local shadowing the builtin ``list``.
    """
    if not obj:
        return None
    if hasHTML(obj):
        return None
    pattern = re.compile("data:.*?\"]")
    matches = re.findall(pattern, obj)
    if matches and len(matches) > 0:
        payload = matches[0].split(':')[1]
        return simplejson.loads(payload)
    else:
        return None
def getJsonObj3(obj):
    """Extract the JSON array following a ``data:[`` marker.

    Splits on ``:[`` and re-attaches the opening bracket before parsing.
    Fixes vs. original: the typo ``partern`` and the local shadowing the
    builtin ``list``.
    """
    if not obj:
        return None
    if hasHTML(obj):
        return None
    pattern = re.compile("data:.*?\"]")
    matches = re.findall(pattern, obj)
    if matches and len(matches) > 0:
        payload = matches[0].split(':[')[1]
        return simplejson.loads('[' + payload)
    else:
        return None
def getJsonObj4(obj):
    """Extract the JSON array following a ``rank:[`` marker.

    Same shape as getJsonObj3 but keyed on ``rank:``.  Fixes vs.
    original: the typo ``partern`` and the local shadowing the builtin
    ``list``.
    """
    if not obj:
        return None
    if hasHTML(obj):
        return None
    pattern = re.compile("rank:.*?\"]")
    matches = re.findall(pattern, obj)
    if matches and len(matches) > 0:
        payload = matches[0].split(':[')[1]
        return simplejson.loads('[' + payload)
    else:
        return None
def getJsonObj5(obj):
    """Extract the JSON array following a ``"Value":[`` marker.

    Same shape as getJsonObj3 but keyed on ``"Value":``.  Fixes vs.
    original: the typo ``partern`` and the local shadowing the builtin
    ``list``.
    """
    if not obj:
        return None
    if hasHTML(obj):
        return None
    pattern = re.compile("\"Value\":.*?\"]")
    matches = re.findall(pattern, obj)
    if matches and len(matches) > 0:
        payload = matches[0].split(':[')[1]
        return simplejson.loads('[' + payload)
    else:
        return None
def getThreeDaysMaxStockList(self):
    '''Fetch the 3-day top-gainer stock list as CompanyInfo records, or
    None on any failure.  (Original docstring was garbled mojibake.)
    Fix vs. original: the local no longer shadows the builtin ``list``.'''
    res = getHtmlFromUrl(threeDaysMaxPriceUrl)
    companyListObj = getJsonObj(res)
    if companyListObj:
        results = companyListObj['Results']
        cList = []
        if results and len(results):
            for item in results:
                # Each row is comma-separated; fields 1 and 2 are
                # code and name (per CompanyInfo usage elsewhere).
                stockInfo = item.split(',')
                cinfo = CompanyInfo(stockInfo[1], stockInfo[2])
                cList.append(cinfo)
            return cList
    return None
def getFiveDaysMaxStockList(self):
    '''Fetch the 5-day top-gainer stock list as CompanyInfo records, or
    None on any failure.  (Original docstring was garbled mojibake.)
    Fix vs. original: the local no longer shadows the builtin ``list``.'''
    res = getHtmlFromUrl(fiveDaysMaxPriceUrl)
    companyListObj = getJsonObj(res)
    if companyListObj:
        results = companyListObj['Results']
        cList = []
        if results and len(results):
            for item in results:
                stockInfo = item.split(',')
                cinfo = CompanyInfo(stockInfo[1], stockInfo[2])
                cList.append(cinfo)
            return cList
    return None
def getRcommandRankList(self):
    '''Fetch the recommendation ranking list as
    CompanyRecommandRankInfo records, or None on any failure.
    (Original docstring was garbled mojibake.)
    Fix vs. original: the local no longer shadows the builtin ``list``.'''
    res = getHtmlFromUrl(reommendRankUrl)
    companyListObj = getJsonObj(res)
    if companyListObj:
        results = companyListObj['data']
        cList = []
        if results and len(results):
            for item in results:
                fields = item.split(',')
                info = CompanyRecommandRankInfo(fields[1], fields[2], fields[5], fields[6], fields[7])
                cList.append(info)
            return cList
    return None
def getStockNameFromCode(self, code):
    """Resolve a stock code to its display name via the name-lookup URL.

    The response body is ``=...,...,...,...,NAME,...;``; the fifth
    comma-separated field is the name.  Returns None when the response
    cannot be parsed.  Fix vs. original: a short field list no longer
    raises IndexError on ``ret[4]``.
    """
    res = getHtmlFromUrl(companyNameUrl % code, utf8coding=True)
    pattern = re.compile('=.*?;')
    matches = re.findall(pattern, res)
    if matches and len(matches):
        # Drop the leading '=' and trailing ';' before splitting.
        fields = (matches[0][1:-1]).split(',')
        if fields and len(fields) > 4:
            return fields[4]
    return None
def getGoodFundList(self):
    """Fetch the "good funds" listing and return FundDetailModel records.

    Each entry in the response's 'datas' array is a comma-separated
    record; the positional indices below match FundDetailModel's
    constructor order (presumably code, name, nav, returns, etc. —
    field meanings not verifiable from here).
    """
    fl = []
    res = getHtmlFromUrl(goodFundUrl)
    fundList = getJsonObj7(res)
    if fundList:
        for item in fundList['datas']:
            fundArray = item.split(',')
            m = FundDetailModel(fundArray[0],fundArray[1],fundArray[3], fundArray[5],fundArray[6],fundArray[7],fundArray[8],fundArray[4],fundArray[11])
            fl.append(m)
    return fl
def getStockPriceEachMonth(self,code,onlyYears=False):
    '''Fetch monthly price history for ``code`` as StockEachMonthInfo
    records; with onlyYears=True keep only December rows (i.e. one data
    point per year).  (Original docstring was garbled mojibake.)'''
    res = getHtmlFromUrl(companyStockPriceEachMonth % (code+getMarketId(code)))
    if not res:
        return None
    if hasHTML(res):
        return None
    # The payload is wrapped as ({"name":...}) — capture the whole blob.
    partern = re.compile("\({\"name\":.*?}\)")
    rel = re.findall(partern, res)
    mList = []
    if rel and len(rel) > 0:
        s = rel[0]
        # Strip the surrounding parentheses before parsing.
        m = simplejson.loads(s[1:-1])
        if m:
            plist = m['data']
            if plist and len(plist):
                for i in plist:
                    parray = i.split(',')
                    # parray[0] is a "YYYY-MM..." date string.
                    time = parray[0]
                    if onlyYears:
                        # Keep only month 12 rows (year-end prices).
                        if time and getFloatFromString(time.split('-')[1]) == 12:
                            pmodel = StockEachMonthInfo(code,m['name'],parray[0],parray[1],parray[2],parray[3],parray[4])
                            mList.append(pmodel)
                        else:
                            continue
                    else:
                        pmodel = StockEachMonthInfo(code, m['name'], parray[0], parray[1], parray[2], parray[3],parray[4])
                        mList.append(pmodel)
    else:
        return None
    if mList and len(mList) > 0:
        return mList
    else:
        return None
def getWeekKLineForCode(self,code):
    """Fetch weekly K-line data for ``code``.

    Returns a CompanyKLineDataDetail wrapping one
    CompanyKLineDetailDataList per well-formed row (rows with fewer
    than 8 comma-separated fields are skipped), or None when the fetch
    or JSON extraction fails.
    """
    dataList = []
    url = weekKLineUrl % (code + getMarketId(code))
    res = getHtmlFromUrl(url)
    if not res:
        return None
    obj = getJsonObj6(res)
    if not obj:
        return None
    priceList = obj['data']
    for data in priceList:
        dList = data.split(',')
        if len(dList) < 8:
            # Malformed row: skip rather than index out of range.
            continue
        d = CompanyKLineDetailDataList(dList[0],dList[1],dList[2],dList[3],dList[4],dList[5],dList[6],dList[7])
        dataList.append(d)
    detail = CompanyKLineDataDetail(obj['code'],obj['name'],dataList)
    return detail
def get_year(time):
    """Return the year component of a 'YYYY-MM-DD'-style string as int.

    (The original trailing comment was garbled mojibake; it appears to
    note that the data starts from 2010 — not verifiable from here.)
    """
    year_text, _, _ = time.partition('-')
    return int(year_text)
def fix_text(text):
    """Collapse all runs of whitespace in *text* to single spaces.

    Accepts None (returns '') and objects exposing a ``.text``
    attribute (e.g. parsed markup nodes), whose text is used instead.
    """
    if text is None:
        return ''
    # Unwrap node-like objects carrying their content in .text.
    text = getattr(text, 'text', text)
    return ' '.join(text.split())