我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用copy.copy()。
def _parseCache(self, instring, loc, doActions=True, callPreParse=True):
    """Memoizing front end for ``_parseNoCache``.

    The cache key is ``(self, instring, loc, callPreParse, doActions)``.
    All cache access happens while holding
    ``ParserElement.packrat_cache_lock``.

    - Cache hit: the hit counter is incremented; a cached exception is
      re-raised, otherwise ``(value[0], copy of value[1])`` is returned.
    - Cache miss: the miss counter is incremented and ``_parseNoCache`` is
      called.  On success a copy of the results is cached and the original
      value is returned.  On ``ParseBaseException`` a fresh exception built
      from the class and ``args`` is cached and the original is re-raised.
    """
    HIT, MISS = 0, 1
    key = (self, instring, loc, callPreParse, doActions)
    with ParserElement.packrat_cache_lock:
        cache = ParserElement.packrat_cache
        cached = cache.get(key)

        if cached is not cache.not_in_cache:
            ParserElement.packrat_cache_stats[HIT] += 1
            if isinstance(cached, Exception):
                raise cached
            return (cached[0], cached[1].copy())

        ParserElement.packrat_cache_stats[MISS] += 1
        try:
            result = self._parseNoCache(instring, loc, doActions, callPreParse)
        except ParseBaseException as pe:
            # cache a copy of the exception, without the traceback
            cache.set(key, pe.__class__(*pe.args))
            raise
        cache.set(key, (result[0], result[1].copy()))
        return result
def __call__(self, name=None):
    """
    Call-syntax shortcut for C{L{setResultsName}}.

    With a C{name}, the call is forwarded to C{setResultsName(name)}
    (documented upstream as using C{listAllMatches=False}, or C{True} when
    C{name} ends with a C{'*'} character).  Without a C{name}, the result
    is the same as calling C{L{copy}}.

    Example::
        # these are equivalent
        userdata = Word(alphas).setResultsName("name") + Word(nums+"-").setResultsName("socsecno")
        userdata = Word(alphas)("name") + Word(nums+"-")("socsecno")
    """
    if name is None:
        return self.copy()
    return self.setResultsName(name)
def postParse(self, instring, loc, tokenlist):
    """Register each non-empty token group in ``tokenlist`` under a key.

    The key is the group's first element (stringified and stripped when it
    is an ``int``).  The stored value, wrapped in
    ``_ParseResultsWithOffset`` together with the group's index, is:

    - ``""`` when the group holds only the key,
    - the second element when the group has exactly two elements and the
      second is not a ``ParseResults``,
    - otherwise a copy of the group with the key removed, unwrapped to its
      single element when it has exactly one element and no named keys.

    Returns ``[tokenlist]`` when ``self.resultsName`` is truthy, otherwise
    ``tokenlist`` itself.
    """
    for position, group in enumerate(tokenlist):
        size = len(group)
        if size == 0:
            continue

        key = group[0]
        if isinstance(key, int):
            key = _ustr(group[0]).strip()

        if size == 1:
            entry = ""
        elif size == 2 and not isinstance(group[1], ParseResults):
            entry = group[1]
        else:
            remainder = group.copy()
            del remainder[0]
            keep_whole = len(remainder) != 1 or (
                isinstance(remainder, ParseResults) and remainder.haskeys()
            )
            entry = remainder if keep_whole else remainder[0]

        tokenlist[key] = _ParseResultsWithOffset(entry, position)

    return [tokenlist] if self.resultsName else tokenlist
def locatedExpr(expr):
    """
    Wrap C{expr} so that its match is grouped together with the locations
    where it starts and ends in the input string.

    The returned group carries these results names:
     - locn_start = location where matched expression begins
     - locn_end = location where matched expression ends
     - value = the actual parsed results

    Be careful if the input text contains C{<TAB>} characters, you may want
    to call C{L{ParserElement.parseWithTabs}}

    Example::
        wd = Word(alphas)
        for match in locatedExpr(wd).searchString("ljsdf123lksdjjf123lkkjj1222"):
            print(match)
    prints::
        [[0, 'ljsdf', 5]]
        [[8, 'lksdjjf', 15]]
        [[18, 'lkkjj', 23]]
    """
    # An empty match whose parse action yields the current location.
    locator = Empty().setParseAction(lambda s, l, t: l)
    start = locator("locn_start")
    value = expr("value")
    # The end marker is a copy with leaveWhitespace() applied.
    end = locator.copy().leaveWhitespace()("locn_end")
    return Group(start + value + end)

# convenience constants for positional expressions
def convertToDate(fmt="%Y-%m-%d"):
    """
    Build a parse action that turns a parsed date string into a Python
    datetime.date.

    Params -
     - fmt - format to be passed to datetime.strptime (default=C{"%Y-%m-%d"})

    The returned action converts the first token; a C{ValueError} from
    C{strptime} is re-raised as a C{ParseException} carrying its message.

    Example::
        date_expr = pyparsing_common.iso8601_date.copy()
        date_expr.setParseAction(pyparsing_common.convertToDate())
        print(date_expr.parseString("1999-12-31"))
    prints::
        [datetime.date(1999, 12, 31)]
    """
    def cvt_fn(s, l, t):
        try:
            parsed = datetime.strptime(t[0], fmt)
        except ValueError as ve:
            raise ParseException(s, l, str(ve))
        return parsed.date()

    return cvt_fn
def convertToDatetime(fmt="%Y-%m-%dT%H:%M:%S.%f"):
    """
    Build a parse action that turns a parsed datetime string into a Python
    datetime.datetime.

    Params -
     - fmt - format to be passed to datetime.strptime (default=C{"%Y-%m-%dT%H:%M:%S.%f"})

    The returned action converts the first token; a C{ValueError} from
    C{strptime} is re-raised as a C{ParseException} carrying its message.

    Example::
        dt_expr = pyparsing_common.iso8601_datetime.copy()
        dt_expr.setParseAction(pyparsing_common.convertToDatetime())
        print(dt_expr.parseString("1999-12-31T23:59:59.999"))
    prints::
        [datetime.datetime(1999, 12, 31, 23, 59, 59, 999000)]
    """
    def cvt_fn(s, l, t):
        try:
            parsed = datetime.strptime(t[0], fmt)
        except ValueError as ve:
            raise ParseException(s, l, str(ve))
        return parsed

    return cvt_fn
def _apply_pax_info(self, pax_headers, encoding, errors):
    """Overwrite attributes with values from a pax extended/global header.

    - ``GNU.sparse.name`` sets ``self.path``.
    - ``GNU.sparse.size`` and ``GNU.sparse.realsize`` both set ``self.size``
      (converted with ``int``).
    - Any keyword in ``PAX_FIELDS`` is set as an attribute of the same
      name; numeric fields go through their ``PAX_NUMBER_FIELDS`` converter
      (falling back to 0 on ``ValueError``) and ``path`` loses trailing
      slashes.

    Finally a copy of ``pax_headers`` is stored as ``self.pax_headers``.
    ``encoding`` and ``errors`` are accepted but not used here.
    """
    for key, raw in pax_headers.items():
        if key == "GNU.sparse.name":
            self.path = raw
            continue
        if key in ("GNU.sparse.size", "GNU.sparse.realsize"):
            self.size = int(raw)
            continue
        if key not in PAX_FIELDS:
            continue

        field_value = raw
        if key in PAX_NUMBER_FIELDS:
            try:
                field_value = PAX_NUMBER_FIELDS[key](field_value)
            except ValueError:
                field_value = 0
        if key == "path":
            field_value = field_value.rstrip("/")
        setattr(self, key, field_value)

    self.pax_headers = pax_headers.copy()
def copyfileobj(src, dst, length=None, exception=OSError, bufsize=None):
    """Copy ``length`` bytes from file object ``src`` to file object ``dst``.

    With ``length=None`` the entire content is copied via
    ``shutil.copyfileobj``; with ``length == 0`` nothing happens.
    Otherwise data is moved in chunks of ``bufsize`` (16 KiB when falsy),
    and ``exception("unexpected end of data")`` is raised as soon as a read
    returns fewer bytes than requested.
    """
    chunk_size = bufsize or 16 * 1024
    if length == 0:
        return
    if length is None:
        shutil.copyfileobj(src, dst, chunk_size)
        return

    def _transfer(count):
        # Move exactly `count` bytes or fail on a short read.
        data = src.read(count)
        if len(data) < count:
            raise exception("unexpected end of data")
        dst.write(data)

    full_chunks, tail = divmod(length, chunk_size)
    for _ in range(full_chunks):
        _transfer(chunk_size)
    if tail != 0:
        _transfer(tail)
    return
def makelink(self, tarinfo, targetpath):
    """Create the link described by ``tarinfo`` at ``targetpath``.

    A symbolic link is made for symlink members.  For other members a hard
    link to ``tarinfo._link_target`` is made when that path exists;
    otherwise the referenced member is extracted in its place.  If link
    creation raises ``symlink_exception``, the referenced member is
    extracted instead, and a ``KeyError`` during that fallback becomes
    ``ExtractError``.
    """
    try:
        # For systems that support symbolic and hard links.
        if tarinfo.issym():
            os.symlink(tarinfo.linkname, targetpath)
        elif os.path.exists(tarinfo._link_target):
            # See extract().
            os.link(tarinfo._link_target, targetpath)
        else:
            self._extract_member(self._find_link_target(tarinfo), targetpath)
    except symlink_exception:
        try:
            linked_member = self._find_link_target(tarinfo)
            self._extract_member(linked_member, targetpath)
        except KeyError:
            raise ExtractError("unable to resolve link inside archive")
def install_user_service(service_file, socket_file):
    """
    Copy a service file into the xinetd service directory and restart
    xinetd.

    Does nothing when ``service_file`` is None.  Otherwise the file is
    copied (with metadata, via ``shutil.copy2``) to
    ``XINETD_SERVICE_PATH/<basename>`` and ``service xinetd restart`` is run
    with a 60 second timeout.

    Args:
        service_file: The path to the service file to install
        socket_file: Accepted for interface compatibility; not used by this
            function.
    """
    if service_file is None:
        return

    unit_name = os.path.basename(service_file)
    logger.debug("...Installing user service '%s'.", unit_name)

    # copy service file
    destination = os.path.join(XINETD_SERVICE_PATH, unit_name)
    shutil.copy2(service_file, destination)

    restart_command = ["service", "xinetd", "restart"]
    execute(restart_command, timeout=60)
def sri(self):
    """Return the stream SRI for this object.

    When ``self._sri`` is set, a shallow copy of it is returned.  Otherwise
    a default ``_BULKIO.StreamSRI`` is built from the current state:

    - keywords come from ``self._SRIKeywords`` (best effort: conversion
      stops at the first failure and keeps what was converted so far),
    - ``xdelta`` is ``1 / self._sampleRate`` when the rate is positive,
    - ``mode`` is 1 when ``self._complexData`` is true,
    - ``xstart`` is ``self._startTime`` when it is non-negative.
    """
    # Identity check: `!= None` would be fooled by a custom __ne__/__eq__.
    if self._sri is not None:
        return _copy.copy(self._sri)

    # make a default sri from the current state
    keywords = []
    try:
        for key in self._SRIKeywords:
            keywords.append(_CF.DataType(key._name, _getAnyValue(key)))
    except Exception:
        # Deliberate best effort, but no longer swallows KeyboardInterrupt
        # or SystemExit the way a bare `except:` did.
        pass

    candidateSri = _BULKIO.StreamSRI(1, 0.0, 1, 0, self._subsize, 0.0, 0, 0, 0,
                                     "defaultStreamID", self._blocking, keywords)
    if self._sampleRate > 0.0:
        candidateSri.xdelta = 1.0 / float(self._sampleRate)
    if self._complexData and self._complexData == True:
        candidateSri.mode = 1
    if self._startTime >= 0.0:
        candidateSri.xstart = self._startTime
    return candidateSri
def switch(condition, then_expression, else_expression):
    '''Selects one of two expressions based on a scalar condition
    (int or bool).

    Note that both `then_expression` and `else_expression`
    should be symbolic tensors of the *same shape*.

    The result is given the static shape of `then_expression`.

    # Arguments
        condition: scalar tensor.
        then_expression: TensorFlow operation.
        else_expression: TensorFlow operation.
    '''
    static_shape = copy.copy(then_expression.get_shape())
    predicate = tf.cast(condition, 'bool')
    result = tf.cond(predicate,
                     lambda: then_expression,
                     lambda: else_expression)
    result.set_shape(static_shape)
    return result

# Extras
def __repr__(self):
    """
    Return a summary of this algorithm's main settings for interactive
    inspection.

    The summary lists the class name, capital_base, sim_params,
    initialized, slippage, commission, blotter and recorded_vars; all but
    the class name, capital_base and initialized are rendered with repr().

    N.B. this does not yet represent a string that can be used
    to instantiate an exact copy of an algorithm.

    However, it is getting close, and provides some value as something
    that can be inspected interactively.
    """
    # NOTE(review): the template literal is kept exactly as it appears in
    # this source; its original line layout could not be confirmed from
    # here.  It also has no comma after the capital_base field, unlike the
    # other fields -- confirm whether that is intended.
    return """ {class_name}( capital_base={capital_base} sim_params={sim_params}, initialized={initialized}, slippage={slippage}, commission={commission}, blotter={blotter}, recorded_vars={recorded_vars}) """.strip().format(class_name=self.__class__.__name__,
           capital_base=self.capital_base,
           sim_params=repr(self.sim_params),
           initialized=self.initialized,
           slippage=repr(self.slippage),
           commission=repr(self.commission),
           blotter=repr(self.blotter),
           recorded_vars=repr(self.recorded_vars))
def copyfileobj(src, dst, length=None):
    """Copy ``length`` bytes from file object ``src`` to file object ``dst``.

    With ``length=None`` the entire content is copied via
    ``shutil.copyfileobj``; with ``length == 0`` nothing happens.
    Otherwise data is moved in 16 KiB blocks and
    ``IOError("end of file reached")`` is raised as soon as a read returns
    fewer bytes than requested.
    """
    if length == 0:
        return
    if length is None:
        shutil.copyfileobj(src, dst)
        return

    BUFSIZE = 16 * 1024
    blocks, remainder = divmod(length, BUFSIZE)
    # `range` instead of the Python 2-only `xrange`: works on both major
    # versions and matches the newer variant of this helper.
    for _ in range(blocks):
        buf = src.read(BUFSIZE)
        if len(buf) < BUFSIZE:
            raise IOError("end of file reached")
        dst.write(buf)

    if remainder != 0:
        buf = src.read(remainder)
        if len(buf) < remainder:
            raise IOError("end of file reached")
        dst.write(buf)
    return
def _apply_pax_info(self, pax_headers, encoding, errors):
    """Overwrite attributes with values from a pax extended/global header.

    Only keywords listed in ``PAX_FIELDS`` are applied.  ``path`` loses
    trailing slashes; numeric fields go through their
    ``PAX_NUMBER_FIELDS`` converter (falling back to 0 on ``ValueError``);
    every other field is passed through ``uts(value, encoding, errors)``.
    Finally a copy of ``pax_headers`` is stored as ``self.pax_headers``.
    """
    for field, raw in pax_headers.iteritems():
        if field in PAX_FIELDS:
            converted = raw.rstrip("/") if field == "path" else raw

            if field in PAX_NUMBER_FIELDS:
                try:
                    converted = PAX_NUMBER_FIELDS[field](converted)
                except ValueError:
                    converted = 0
            else:
                converted = uts(converted, encoding, errors)

            setattr(self, field, converted)

    self.pax_headers = pax_headers.copy()
def makelink(self, tarinfo, targetpath):
    """Create the link described by ``tarinfo`` at ``targetpath``.

    When ``os`` lacks ``symlink`` or ``link``, the referenced member is
    extracted instead, and a ``KeyError`` during that extraction becomes
    ``ExtractError``.  Otherwise an existing ``targetpath`` is unlinked
    before a symbolic or hard link is created; when the hard-link target
    does not exist, the referenced member is extracted in its place.
    """
    links_supported = hasattr(os, "symlink") and hasattr(os, "link")
    if not links_supported:
        try:
            self._extract_member(self._find_link_target(tarinfo), targetpath)
        except KeyError:
            raise ExtractError("unable to resolve link inside archive")
        return

    # For systems that support symbolic and hard links.
    if tarinfo.issym():
        if os.path.lexists(targetpath):
            os.unlink(targetpath)
        os.symlink(tarinfo.linkname, targetpath)
        return

    # See extract().
    if os.path.exists(tarinfo._link_target):
        if os.path.lexists(targetpath):
            os.unlink(targetpath)
        os.link(tarinfo._link_target, targetpath)
        return

    self._extract_member(self._find_link_target(tarinfo), targetpath)
def location(self):
    """Return a shallow copy of the Location where this RTI is running.
    """
    snapshot = copy.copy(self._location)
    return snapshot
def get_peers():
    """Return shallow copies of the locations of all known peers.

    The peer table is read while holding ``_Peer._lock``.  The lock is
    taken with a ``with`` block so it is released even if copying a
    location raises; the original acquire/release pair would have left it
    held in that case.  (Assumes ``_Peer._lock`` supports the context
    manager protocol, as ``threading`` locks do.)
    """
    with _Peer._lock:
        return [copy.copy(peer.location) for peer in _Peer.peers.itervalues()]
def location(self):
    """Return the Location where this task is running. Can also be used
    on remotely running tasks.

    A shallow copy of ``self._location`` is returned when it is set;
    otherwise the scheduler's location is returned.
    """
    if not self._location:
        return self._scheduler.location
    return copy.copy(self._location)
def location(self):
    """Return the Location where this channel is located. Can also be
    used on remote channels.

    A shallow copy of ``self._location`` is returned when it is set;
    otherwise the scheduler's location is returned.
    """
    if not self._location:
        return self._scheduler.location
    return copy.copy(self._location)
def location(self):
    """Return a shallow copy of the Location where this Pycos is running.
    """
    snapshot = copy.copy(self._location)
    return snapshot
def get_peers():
    """Return shallow copies of the locations of all known peers.

    The peer table is read while holding ``_Peer._lock``.  The lock is
    taken with a ``with`` block so it is released even if copying a
    location raises; the original acquire/release pair would have left it
    held in that case.  (Assumes ``_Peer._lock`` supports the context
    manager protocol, as ``threading`` locks do.)
    """
    with _Peer._lock:
        return [copy.copy(peer.location) for peer in _Peer.peers.values()]
def fit(self, X, Y):
    """Fit one predictor per target column of ``Y``.

    Args:
        X: sample matrix, indexed as ``X[rows, :]``.
        Y: targets with a ``shape`` attribute; a 1-D vector is turned into
            a single-column matrix.

    Side effects:
        Sets ``self.n_target`` (number of columns of ``Y``) and
        ``self.n_label`` (number of distinct values in ``Y``).  If the
        number of predictors differs from ``n_target``, the list is rebuilt
        from the first predictor plus shallow copies of it.  Predictors
        that have an ``n_estimators`` attribute get ``self.n_estimators``.

    When ``self.balance`` is true, each predictor is trained on the whole
    minority class (positive means ``> 0``) plus an equally sized random
    subset of the majority class; otherwise it is trained on all samples.
    """
    if len(Y.shape) == 1:
        # Transform vector into column matrix.
        # This is NOT what we want: Y = Y.reshape( -1, 1 ), because Y.shape[1] out of range
        Y = np.array([Y]).transpose()
    self.n_target = Y.shape[1]             # Num target values = num col of Y
    # Num labels = num classes (categories of categorical var if n_target=1
    # or n_target if labels are binary)
    self.n_label = len(set(Y.ravel()))

    # Create the right number of copies of the predictor instance
    if len(self.predictors) != self.n_target:
        predictorInstance = self.predictors[0]
        self.predictors = [predictorInstance]
        for i in range(1, self.n_target):
            self.predictors.append(copy.copy(predictorInstance))

    # Fit all predictors
    for i in range(self.n_target):
        # Update the number of desired predictors
        if hasattr(self.predictors[i], 'n_estimators'):
            self.predictors[i].n_estimators = self.n_estimators

        if not self.balance:
            self.predictors[i].fit(X, Y[:, i])
            continue

        # Subsample: start from the smaller class ...
        pos = Y[:, i] > 0
        neg = Y[:, i] <= 0
        if pos.sum() < neg.sum():
            chosen, not_chosen = pos, neg
        else:
            chosen, not_chosen = neg, pos
        num = int(chosen.sum())

        # ... and add up to `num` randomly picked samples of the other one.
        # np.flatnonzero replaces the Python 2-only
        # `filter(lambda(x): ...)` / `zip(*idx)[0]` idiom and also copes
        # with an empty majority class.
        idx = np.flatnonzero(not_chosen)
        np.random.shuffle(idx)
        chosen[idx[0:min(num, len(idx))]] = True

        # Train with chosen samples
        self.predictors[i].fit(X[chosen, :], Y[chosen, i])
    return
def copy(self):
    """
    Build a new BleuScorer with the same ``n`` whose ``ctest`` and
    ``crefs`` are shallow copies of this scorer's, and whose cached score
    is reset to None.

    :return: the new BleuScorer
    """
    clone = BleuScorer(n=self.n)
    clone.ctest = copy.copy(self.ctest)
    clone.crefs = copy.copy(self.crefs)
    clone._score = None
    return clone
def store_sample(self, value):
    '''Append a Sample holding the current time and a copy of `value`
    to the trace.'''
    timestamp = now()
    snapshot = copy(value)
    self.append(Sample(timestamp, snapshot))
def calc_helix_points(turtle, rad, pitch):
    """Compute bezier control data for a helix segment of the given radius
    and pitch, oriented along the turtle's direction.

    Returns a 4-tuple: the second, third and fourth control points expressed
    relative to the first one, followed by a copy of ``turtle.dir``.
    """
    # General derivation, kept for reference:
    # alpha = radians(90)
    # pit = pitch/(2*pi)
    # a_x = rad*cos(alpha)
    # a_y = rad*sin(alpha)
    # a = pit*alpha*(rad - a_x)*(3*rad - a_x)/(a_y*(4*rad - a_x)*tan(alpha))
    # b_0 = Vector([a_x, -a_y, -alpha*pit])
    # b_1 = Vector([(4*rad - a_x)/3, -(rad - a_x)*(3*rad - a_x)/(3*a_y), -a])
    # b_2 = Vector([(4*rad - a_x)/3, (rad - a_x)*(3*rad - a_x)/(3*a_y), a])
    # b_3 = Vector([a_x, a_y, alpha*pit])
    # axis = Vector([0, 0, 1])

    # simplifies greatly for case inc_angle = 90
    handle_x = (4 * rad) / 3
    quarter_pitch = pitch / 4
    controls = [
        Vector([0, -rad, -quarter_pitch]),
        Vector([handle_x, -rad, 0]),
        Vector([handle_x, rad, 0]),
        Vector([0, rad, quarter_pitch]),
    ]

    # align helix points to turtle direction and randomize rotation around axis
    alignment = turtle.dir.to_track_quat('Z', 'Y')
    spin = rand_in_range(0, 2 * pi)
    for point in controls:
        point.rotate(Quaternion(Vector([0, 0, 1]), spin))
        point.rotate(alignment)

    origin, second, third, fourth = controls
    return second - origin, third - origin, fourth - origin, turtle.dir.copy()