我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用math.copysign()。
def get_cubic_root(self): # We have the equation x^2 D^2 + (1-x)^4 * C / h_min^2 # where x = sqrt(mu). # We substitute x, which is sqrt(mu), with x = y + 1. # It gives y^3 + py = q # where p = (D^2 h_min^2)/(2*C) and q = -p. # We use the Vieta's substution to compute the root. # There is only one real solution y (which is in [0, 1] ). # http://mathworld.wolfram.com/VietasSubstitution.html # eps in the numerator is to prevent momentum = 1 in case of zero gradient p = (self._dist_to_opt + eps)**2 * (self._h_min + eps)**2 / 2 / (self._grad_var + eps) w3 = (-math.sqrt(p**2 + 4.0 / 27.0 * p**3) - p) / 2.0 w = math.copysign(1.0, w3) * math.pow(math.fabs(w3), 1.0/3.0) y = w - p / 3.0 / (w + eps) x = y + 1 if DEBUG: logging.debug("p %f, den %f", p, self._grad_var + eps) logging.debug("w3 %f ", w3) logging.debug("y %f, den %f", y, w + eps) return x
def process_transactions(self, trade_event, current_orders): for order, txn in self.transact(trade_event, current_orders): if txn.type == zp.DATASOURCE_TYPE.COMMISSION: order.commission = (order.commission or 0.0) + txn.cost else: if txn.amount == 0: raise zipline.errors.TransactionWithNoAmount(txn=txn) if math.copysign(1, txn.amount) != order.direction: raise zipline.errors.TransactionWithWrongDirection( txn=txn, order=order) if abs(txn.amount) > abs(self.orders[txn.order_id].amount): raise zipline.errors.TransactionVolumeExceedsOrder( txn=txn, order=order) order.filled += txn.amount if txn.commission is not None: order.commission = ((order.commission or 0.0) + txn.commission) # mark the date of the order to match the transaction # that is filling it. order.dt = txn.dt yield txn, order
def scrollEvent(self, dx=0, dy=0): """ Generate scroll events from parametters and displacement @param int dx delta movement from last call on x axis @param int dy delta movement from last call on y axis @return float absolute distance moved this tick """ # Compute mouse mouvement from interger part of d * scale self._scr_dx += dx * self._scr_xscale self._scr_dy += dy * self._scr_yscale _syn = False if int(self._scr_dx): self.relEvent(rel=Rels.REL_HWHEEL, val=int(copysign(1, self._scr_dx))) self._scr_dx -= int(self._scr_dx) _syn = True if int(self._scr_dy): self.relEvent(rel=Rels.REL_WHEEL, val=int(copysign(1, self._scr_dy))) self._scr_dy -= int(self._scr_dy) _syn = True if _syn: self.synEvent()
def mode_ROUND(self, x, y, range): """ If input value bellow deadzone range, output value is zero If input value is above deadzone range, output value is 1 (or maximum allowed) """ if y == 0: # Small optimalization for 1D input, for example trigger if abs(x) > self.upper: return copysign(range, x) return (0 if abs(x) < self.lower else x), 0 distance = sqrt(x*x + y*y) if distance < self.lower: return 0, 0 if distance > self.upper: angle = atan2(x, y) return range * sin(angle), range * cos(angle) return x, y
def mode_LINEAR(self, x, y, range): """ Input value is scaled, so entire output range is covered by reduced input range of deadzone. """ if y == 0: # Small optimalization for 1D input, for example trigger return copysign( clamp( 0, ((x - self.lower) / (self.upper - self.lower)) * range, range), x ), 0 distance = clamp(self.lower, sqrt(x*x + y*y), self.upper) distance = (distance - self.lower) / (self.upper - self.lower) * range angle = atan2(x, y) return distance * sin(angle), distance * cos(angle)
def _subplot_scrolled(self, event): keys = event['key'] if not keys: # at least 1 step per event but a maximum of 5 step = max(1, abs(int(event['step']))) step = min(step, 5) step = copysign(step, event['step']) slice_model = self._get_slice_view(event).model() index = int(slice_model.index + step) if 0 <= index < len(slice_model): self._context.update_index_for_direction(slice_model.index_direction, index) elif keys.state(ctrl=True) or keys.state(shift=True): x = event['x'] y = event['y'] step = max(1, abs(int(event['step']))) step = copysign(step / 20.0, event['step']) slice_view = self._get_slice_view(event) if slice_view.zoom(x, y, step): self._context_changed()
def _from_float(cls, f): if isinstance(f, int): # handle integer inputs return cls(f) if not isinstance(f, float): raise TypeError("argument must be int or float.") if _math.isinf(f) or _math.isnan(f): return cls(repr(f)) if _math.copysign(1.0, f) == 1.0: sign = 0 else: sign = 1 n, d = abs(f).as_integer_ratio() #k = d.bit_length() - 1 k = _bit_length(d) - 1 result = _dec_from_triple(sign, str(n*5**k), -k) if cls is Decimal: return result else: return cls(result)
def assertFloatIdentical(self, x, y): """Fail unless floats x and y are identical, in the sense that: (1) both x and y are nans, or (2) both x and y are infinities, with the same sign, or (3) both x and y are zeros, with the same sign, or (4) x and y are both finite and nonzero, and x == y """ msg = 'floats {!r} and {!r} are not identical' if math.isnan(x) or math.isnan(y): if math.isnan(x) and math.isnan(y): return elif x == y: if x != 0.0: return # both zero; check that signs match elif math.copysign(1.0, x) == math.copysign(1.0, y): return else: msg += ': zeros have different signs' self.fail(msg.format(x, y))
def assertFloatsAreIdentical(self, x, y): """assert that floats x and y are identical, in the sense that: (1) both x and y are nans, or (2) both x and y are infinities, with the same sign, or (3) both x and y are zeros, with the same sign, or (4) x and y are both finite and nonzero, and x == y """ msg = 'floats {!r} and {!r} are not identical' if isnan(x) or isnan(y): if isnan(x) and isnan(y): return elif x == y: if x != 0.0: return # both zero; check that signs match elif copysign(1.0, x) == copysign(1.0, y): return else: msg += ': zeros have different signs' self.fail(msg.format(x, y))
def __init__(self, lattice, m, n): self.lattice = lattice self.m = m self.n = n # Chiral vector self.c = lattice.pos(m,n) self.magC = mag(self.c) # Translation vector d = gcd(2*n+m,2*m+n) self.t = lattice.pos((2*n+m)/d, -(2*m+n)/d); self.magT = mag(self.t) # Chiral rotation matrix (rotate a1 along x-axis) self.theta = acos(norm(self.c)[0]*copysign(1, self.c[1])) self.rotM = np.array([ [cos(self.theta), sin(self.theta)], [-sin(self.theta), cos(self.theta)]]).T # Calculate atoms and bonds in unit cell self._boundsErr = mag(lattice.pos(0,0,0) - lattice.pos(0,0,1)) self.indices = self._calcIndices(m, n) self.atoms = self._calcAtoms(self.indices) self.bonds = self._calcBonds(self.indices)
def start_run(self, at_home_door, distance, angular): if at_home_door == True: self.brake() self.robot_move.move_to(x=self.last_move_goal) return True if distance > (self.line_distance + self.distance_tolerance): self.vel.linear.x = self.x_speed self.vel.linear.y = math.copysign(self.y_speed, self.move_direction) elif distance < (self.line_distance - self.distance_tolerance): self.vel.linear.x = self.x_speed * -1 self.vel.linear.y = self.y_speed else: self.vel.linear.x = 0 self.vel.linear.y = self.y_speed if math.fabs(angular) < self.angular_tolerance: self.vel.angular.z = 0 elif angular > 0: self.vel.angular.z = -1 * self.z_speed else: self.vel.angular.z = self.z_speed self.cmd_move_pub.publish(self.vel) return False
def turn(self, goal_angular): # ???????????????,?????????????? # ????,???? rospy.loginfo('[robot_move_pkg]->move_in_robot.turn will turn %s'%goal_angular) current_angular = start_angular = self.robot_state.get_robot_current_w()#?????????? r = rospy.Rate(100) delta_angular = current_angular - start_angular move_velocity = g_msgs.Twist() while not rospy.is_shutdown() : if abs(delta_angular - abs(goal_angular)) < self.turn_move_stop_tolerance: break current_angular = self.robot_state.get_robot_current_w() move_velocity.angular.z = math.copysign(self.w_speed, goal_angular) delta_angular += abs(abs(current_angular) - abs(start_angular) ) start_angular = current_angular self.cmd_move_pub.publish(move_velocity) #??????????? r.sleep() self.brake()
def _drive_to_1( self, speed: int, pos_x: float, pos_y: float ) -> None: diff_x = pos_x - self._pos_x diff_y = pos_y - self._pos_y if abs(diff_x) > abs(diff_y): direct = math.degrees(math.atan(diff_y/diff_x)) else: fract = diff_x / diff_y sign = math.copysign(1, fract) direct = sign * 90 - math.degrees(math.atan(fract)) if diff_x < 0: direct += 180 self._rotate_to(speed, direct)
def drain_asset(positions, asset, quantity): """ Generic method of reducing the quantity of assets across an entire set of positions This is hard to do manually because positions can duplicates and arbitrary quantities Traverse the entire position set reducing quantities to zero until it hits the target reduction """ remaining_quantity = quantity # get a list of positions that are opposite to the quantity we are draining positions = [_ for _ in positions if _.asset == asset and copysign(1,_.quantity) == copysign(1, quantity * -1)] for position in positions: if abs(remaining_quantity) <= abs(position.quantity): # there are enough quantity in this position to complete it position.quantity += remaining_quantity remaining_quantity = 0 return remaining_quantity if abs(remaining_quantity) > abs(position.quantity): # we are going to have some left over remaining_quantity += position.quantity position.quantity = 0 return remaining_quantity
def estimate(self, quote:Quote, quantity:int = None): # quantity is only used for direction quantity = copysign(1, quantity) if quote.bid is None or quote.ask is None or quote.bid == 0.0 or quote.ask == 0.0: raise Exception( 'SlippageEstimator.estimate: Cannot estimate a price if the bid or ask are None or 0.0') if quantity is None or quantity == 0: raise Exception( 'SlippageEstimator.estimate: Must provide a signed quantity to buy or sell.') spread = (quote.ask - quote.bid) / 2 midpoint = quote.bid + spread if quantity > 0: return midpoint - spread * self.slippage else: return midpoint + spread * self.slippage
def _check_battery_state(_battery_acpi_path): """ @return LaptopChargeStatus """ o = slerp(_battery_acpi_path+'/state') batt_info = yaml.load(o) rv = LaptopChargeStatus() state = batt_info.get('charging state', 'discharging') rv.charge_state = state_to_val.get(state, 0) rv.rate = _strip_A(batt_info.get('present rate', '-1 mA')) if rv.charge_state == LaptopChargeStatus.DISCHARGING: rv.rate = math.copysign(rv.rate, -1) # Need to set discharging rate to negative rv.charge = _strip_Ah(batt_info.get('remaining capacity', '-1 mAh')) rv.voltage = _strip_V(batt_info.get('present voltage', '-1 mV')) rv.present = batt_info.get('present', False) rv.header.stamp = rospy.get_rostime() return rv
def _simulateTemps(self, delta=1): timeDiff = self.lastTempAt - time.time() self.lastTempAt = time.time() for i in range(len(self.temp)): if abs(self.temp[i] - self.targetTemp[i]) > delta: oldVal = self.temp[i] self.temp[i] += math.copysign(timeDiff * 10, self.targetTemp[i] - self.temp[i]) if math.copysign(1, self.targetTemp[i] - oldVal) != math.copysign(1, self.targetTemp[i] - self.temp[i]): self.temp[i] = self.targetTemp[i] if self.temp[i] < 0: self.temp[i] = 0 if abs(self.bedTemp - self.bedTargetTemp) > delta: oldVal = self.bedTemp self.bedTemp += math.copysign(timeDiff * 10, self.bedTargetTemp - self.bedTemp) if math.copysign(1, self.bedTargetTemp - oldVal) != math.copysign(1, self.bedTargetTemp - self.bedTemp): self.bedTemp = self.bedTargetTemp if self.bedTemp < 0: self.bedTemp = 0
def _simulateTemps(self, delta=1): timeDiff = self.lastTempAt - time.time() self.lastTempAt = time.time() for i in range(len(self.temp)): if abs(self.temp[i] - self.targetTemp[i]) > delta: oldVal = self.temp[i] self.temp[i] += math.copysign(timeDiff * 10, self.targetTemp[i] - self.temp[i]) if math.copysign(1, self.targetTemp[i] - oldVal) != math.copysign(1, self.targetTemp[i] - self.temp[i]): self.temp[i] = self.targetTemp[i] if self.temp[i] < self.ambientTemp: self.temp[i] = self.ambientTemp if abs(self.bedTemp - self.bedTargetTemp) > delta: oldVal = self.bedTemp self.bedTemp += math.copysign(timeDiff * 10, self.bedTargetTemp - self.bedTemp) if math.copysign(1, self.bedTargetTemp - oldVal) != math.copysign(1, self.bedTargetTemp - self.bedTemp): self.bedTemp = self.bedTargetTemp if self.bedTemp < self.ambientTemp: self.bedTemp = self.ambientTemp
def setf(self, value): """store /value/ into a binary format""" exponentbias = (2**self.components[1])/2 - 1 m,e = math.frexp(value) # extract components from value s = math.copysign(1.0, m) m = abs(m) # convert to integrals sf = 1 if s < 0 else 0 exponent = e + exponentbias - 1 m = m*2.0 - 1.0 # remove the implicit bit mantissa = int(m * (2**self.components[2])) # store components result = bitmap.zero result = bitmap.push( result, bitmap.new(sf,self.components[0]) ) result = bitmap.push( result, bitmap.new(exponent,self.components[1]) ) result = bitmap.push( result, bitmap.new(mantissa,self.components[2]) ) return self.__setvalue__( result[0] )
def getf(self): """convert the stored floating-point number into a python native float""" exponentbias = (2**self.components[1])/2 - 1 res = bitmap.new( self.__getvalue__(), sum(self.components) ) # extract components res,sign = bitmap.shift(res, self.components[0]) res,exponent = bitmap.shift(res, self.components[1]) res,mantissa = bitmap.shift(res, self.components[2]) if exponent > 0 and exponent < (2**self.components[2]-1): # convert to float s = -1 if sign else +1 e = exponent - exponentbias m = 1.0 + (float(mantissa) / 2**self.components[2]) # done return math.ldexp( math.copysign(m,s), e) # FIXME: this should return NaN or something Log.warn('float_t.getf : {:s} : Invalid exponent value : {:d}'.format(self.instance(), exponent)) return 0.0
def rect(r, phi): _rect_special = [ [inf+nanj, None, -inf, complex(-float("inf"), -0.0), None, inf+nanj, inf+nanj], [nan+nanj, None, None, None, None, nan+nanj, nan+nanj], [0, None, complex(-0.0, 0.0), complex(-0.0, -0.0), None, 0, 0], [0, None, complex(0.0, -0.0), 0, None, 0, 0], [nan+nanj, None, None, None, None, nan+nanj, nan+nanj], [inf+nanj, None, complex(float("inf"), -0.0), inf, None, inf+nanj, inf+nanj], [nan+nanj, nan+nanj, nan, nan, nan+nanj, nan+nanj, nan+nanj] ] if not math.isfinite(r) or not math.isfinite(phi): if math.isinf(phi) and not math.isnan(r) and r != 0: raise ValueError if math.isinf(r) and math.isfinite(phi) and phi != 0: if r > 0: return complex(math.copysign(inf, math.cos(phi)), math.copysign(inf, math.sin(phi))) return complex(-math.copysign(inf, math.cos(phi)), -math.copysign(inf, math.sin(phi))) return _rect_special[_special_type(r)][_special_type(phi)] return complex(r*math.cos(phi), r*math.sin(phi))
def segment_points(start_pose, curvature, length, delta_length): """Return points of segment, at delta_length intervals.""" l = 0.0 delta_length = copysign(delta_length, length) points = [] while abs(l) < abs(length): points.append(CurveSegment.end_pose(start_pose, curvature, l)[0:2]) l += delta_length return points # -------------------------------------------------------------------------- # Exploration of car's kinematic state space. # -------------------------------------------------------------------------- # Allowed movements. These are given as tuples: (curvature, length). # The list contains forward and backward movements.
def ventogeostrofico(lat,GradientePressione,R): if R==0.0 or GradientePressione==0.0: tws=0 else: omega=2*math.pi/(24*3600)#rotazione terrestre f=2*omega*math.sin(lat)#parametro di coriolis densitaaria=1.2#kgm/mc vale per T=20gradi R=R*1853.0#raggio di curvatura isobare espressa in m GradientePressione=GradientePressione*100.0/1853.0#Gradiente espresso in Pa/m segno=GradientePressione/math.copysign(GradientePressione,1) a=segno/R b=-f c=math.copysign(GradientePressione,1)/densitaaria discriminante=(b**2-4*a*c) if discriminante >0: tws=(-b-discriminante**0.5)/(2*a) tws=tws*3600.0/1853.0#converte tws in knts else: tws=0.0#caso del centro di alta pressione return tws
def ventogeostroficoCentroAzione(PCentroAzione,LatCentroAzione,LonCentroAzione,extension,lat,lon): #campo di pressione: P=0.5*(PCentro-1013)*cos(pi/extension*r)+(PCentro+1013)*0.5, per r<=extension, P=1013 per r >extension #r distanza dal centro d'azione e raggio curvatura isobara losso=lossodromica(LatCentroAzione,LonCentroAzione,lat,lon) r=losso[0] brg=losso[1] tws=0 twd=0 segno=0 exp=0 if r<extension: if PCentroAzione>1013: segno=1 else: segno=-1 #GradientePressione=-0.5*(PCentroAzione-1013)*math.pi/extension*math.sin(math.pi/extension*r) GradientePressione=0.5*(PCentroAzione-1013)*math.pi/extension*math.sin(math.pi/extension*r) GradientePressione=math.copysign(GradientePressione,1) tws=ventogeostrofico(LatCentroAzione,segno*GradientePressione,r) if LatCentroAzione>0: twd=riduci360(brg-segno*math.radians(100))#emisfero Nord else: twd=riduci360(brg+segno*math.radians(100))#emisfero Sud return tws,twd
def interno(poligono,pto): #poligono orientato in senso orario somma=0 for i in range(0,len(poligono)-1): v1=poligono[i] v2=poligono[i+1] rlv1=math.atan2((v1[1]-pto[1]),(v1[0]-pto[0])) if rlv1<0:rlv1=2*math.pi+rlv1 rlv2=math.atan2((v2[1]-pto[1]),(v2[0]-pto[0])) if rlv2<0:rlv2=2*math.pi+rlv2 angolo=math.copysign(rlv2-rlv1,1) if angolo>math.pi: angolo=2*math.pi-angolo somma=somma+angolo if somma<1.99*math.pi: Interno=False else: Interno=True return Interno
def loxodrome(latA, lonA, latB, lonB): # lossodromica # Rhumb line navigation # Takes two points on earth and returns: # the distance and constant (true) bearing one would need to # follow to reach it. # Doesn't function near poles: # but you shouldn't be sailing near the poles anyways! # when latB = -90: math domain error log(0) # when latA = -90 [zero divison error] # when A==B returns (0.0,0.0) # if latA == latB: if math.copysign(latA-latB, 1) <= math.radians(0.1/3600.0): q = math.cos(latA) else: Df = math.log(math.tan(latB/2+math.pi/4) / math.tan(latA/2+math.pi/4), math.e) q = (latB-latA) * 1.0/Df Distance = EARTH_RADIUS * ((latA-latB)**2+(q*(lonA-lonB))**2)**0.5 Heading = math.atan2(-q*(lonB-lonA), (latB-latA)) if Heading < 0: Heading = Heading + 2.0 * math.pi # atan2:[-pi,pi] return Distance, Heading
def print_lat(lat): # def stampalat(lat): # returns a string in the format xxDegrees,xxMinutes, N/S lat_decimal = math.copysign(math.degrees(lat), 1) # latdecimali lat_degree = int(lat_decimal) # latgradi lat_minute = (lat_decimal - lat_degree) * 60 # latprimi if lat_minute > 59.51: lat_degree = lat_degree + 1 lat_minute = 0 else: if lat_minute - int(lat_minute) > 0.51: lat_minute = int(lat_minute) + 1 else: lat_minute = int(lat_minute) if lat > 0: hemisphere = "N" # segno else: hemisphere = "S" gradi = "%2d" % lat_degree # gradi primi = "%2d" % lat_minute # primi lat = (gradi.replace(" ", "0") + u"°" + primi.replace(" ", "0") + "'" + hemisphere) return lat
def print_lon(lon): # def stampalon(lon): # returns a string in the format xxDegrees,xxMinutes, N/S lon_decimal = math.copysign(math.degrees(lon), 1) # londecimali lon_degree = int(lon_decimal) # longradi lon_minutes = (lon_decimal - lon_degree) * 60 # lonprimi if lon_minutes > 59.51: lon_degree = lon_degree + 1 lon_minutes = 0 else: if lon_minutes - int(lon_minutes) > 0.51: lon_minutes = int(lon_minutes) + 1 else: lon_minutes = int(lon_minutes) if lon > 0: hemisphere = "W" else: hemisphere = "E" degrees = "%3d" % lon_degree # gradi minutes = "%2d" % lon_minutes lon = (degrees.replace(" ", "0") + u"°" + minutes.replace(" ", "0") + "'" + hemisphere) return lon
def calcolaventogeostrofico(lat,GradientePressione,R): omega=2*math.pi/(24*3600)#rotazione terrestre f=2*omega*math.sin(lat)#parametro di coriolis densitaaria=1.2#kgm/mc vale per T=20gradi R=R*1853.0#curvatura isobare espressa in m GradientePressione=GradientePressione*100.0/1853.0#Gradiente espresso in Pa/m segno=GradientePressione/math.copysign(GradientePressione,1) a=segno/R b=-f c=math.copysign(GradientePressione,1)/densitaaria discriminante=(b**2-4*a*c) if discriminante >0: tws=(-b-discriminante**0.5)/(2*a) tws=tws*3600.0/1853.0#converte tws in knts else: tws=0.0#caso del centro di alta pressione return tws
def interno(poligono,pto): #poligono orientato in senso orario somma=0 for i in range(0,len(poligono)-1): v1=poligono[i] v2=poligono[i+1] rlv1=math.atan2((v1[1]-pto[1]),(v1[0]-pto[0])) if rlv1<0:rlv1=2*math.pi+rlv1 rlv2=math.atan2((v2[1]-pto[1]),(v2[0]-pto[0])) if rlv2<0:rlv2=2*math.pi+rlv2 angolo=math.copysign(rlv2-rlv1,1) if angolo>math.pi: angolo=2*math.pi-angolo somma=somma+angolo if somma<2*math.pi: Interno=False else: Interno=True return Interno
def get_cubic_root(self): # We have the equation x^2 D^2 + (1-x)^4 * C / h_min^2 # where x = sqrt(mu). # We substitute x, which is sqrt(mu), with x = y + 1. # It gives y^3 + py = q # where p = (D^2 h_min^2)/(2*C) and q = -p. # We use the Vieta's substution to compute the root. # There is only one real solution y (which is in [0, 1] ). # http://mathworld.wolfram.com/VietasSubstitution.html # eps in the numerator is to prevent momentum = 1 in case of zero gradient if np.isnan(self._dist_to_opt) or np.isnan(self._h_min) or np.isnan(self._grad_var) \ or np.isinf(self._dist_to_opt) or np.isinf(self._h_min) or np.isinf(self._grad_var): logging.warning("Input to cubic solver has invalid nan/inf value!") raise Exception("Input to cubic solver has invalid nan/inf value!") p = (self._dist_to_opt + eps)**2 * (self._h_min + eps)**2 / 2 / (self._grad_var + eps) w3 = (-math.sqrt(p**2 + 4.0 / 27.0 * p**3) - p) / 2.0 w = math.copysign(1.0, w3) * math.pow(math.fabs(w3), 1.0/3.0) y = w - p / 3.0 / (w + eps) x = y + 1 if self._verbose: logging.debug("p %f, denominator %f", p, self._grad_var + eps) logging.debug("w3 %f ", w3) logging.debug("y %f, denominator %f", y, w + eps) if np.isnan(x) or np.isinf(x): logging.warning("Output from cubic is invalid nan/inf value!") raise Exception("Output from cubic is invalid nan/inf value!") return x
def transact_stub(slippage, commission, event, open_orders): """ This is intended to be wrapped in a partial, so that the slippage and commission models can be enclosed. """ for order, transaction in slippage(event, open_orders): if transaction and transaction.amount != 0: direction = math.copysign(1, transaction.amount) per_share, total_commission = commission.calculate(transaction) transaction.price += per_share * direction transaction.commission = total_commission yield order, transaction
def update(self, txn): if self.sid != txn.sid: raise Exception('updating position with txn for a ' 'different sid') total_shares = self.amount + txn.amount if total_shares == 0: self.cost_basis = 0.0 else: prev_direction = copysign(1, self.amount) txn_direction = copysign(1, txn.amount) if prev_direction != txn_direction: # we're covering a short or closing a position if abs(txn.amount) > abs(self.amount): # we've closed the position and gone short # or covered the short position and gone long self.cost_basis = txn.price else: prev_cost = self.cost_basis * self.amount txn_cost = txn.amount * txn.price total_cost = prev_cost + txn_cost self.cost_basis = total_cost / total_shares # Update the last sale price if txn is # best data we have so far if self.last_sale_date is None or txn.dt > self.last_sale_date: self.last_sale_price = txn.price self.last_sale_date = txn.dt self.amount = total_shares
def __init__(self, dt, sid, amount, stop=None, limit=None, filled=0, commission=None, id=None): """ @dt - datetime.datetime that the order was placed @sid - stock sid of the order @amount - the number of shares to buy/sell a positive sign indicates a buy a negative sign indicates a sell @filled - how many shares of the order have been filled so far """ # get a string representation of the uuid. self.id = id or self.make_id() self.dt = dt self.reason = None self.created = dt self.sid = sid self.amount = amount self.filled = filled self.commission = commission self._status = ORDER_STATUS.OPEN self.stop = stop self.limit = limit self.stop_reached = False self.limit_reached = False self.direction = math.copysign(1, self.amount) self.type = zp.DATASOURCE_TYPE.ORDER
def from_float(cls, f): """Converts a float to a decimal number, exactly. Note that Decimal.from_float(0.1) is not the same as Decimal('0.1'). Since 0.1 is not exactly representable in binary floating point, the value is stored as the nearest representable value which is 0x1.999999999999ap-4. The exact equivalent of the value in decimal is 0.1000000000000000055511151231257827021181583404541015625. >>> Decimal.from_float(0.1) Decimal('0.1000000000000000055511151231257827021181583404541015625') >>> Decimal.from_float(float('nan')) Decimal('NaN') >>> Decimal.from_float(float('inf')) Decimal('Infinity') >>> Decimal.from_float(-float('inf')) Decimal('-Infinity') >>> Decimal.from_float(-0.0) Decimal('-0') """ if isinstance(f, (int, long)): # handle integer inputs return cls(f) if _math.isinf(f) or _math.isnan(f): # raises TypeError if not a float return cls(repr(f)) if _math.copysign(1.0, f) == 1.0: sign = 0 else: sign = 1 n, d = abs(f).as_integer_ratio() k = d.bit_length() - 1 result = _dec_from_triple(sign, str(n*5**k), -k) if cls is Decimal: return result else: return cls(result)
def __update(self): area = self.area if not area: self.__zero() return # Center of mass # https://en.wikipedia.org/wiki/Center_of_mass#A_continuous_volume self.meanX = meanX = self.momentX / area self.meanY = meanY = self.momentY / area # Var(X) = E[X^2] - E[X]^2 self.varianceX = varianceX = self.momentXX / area - meanX**2 self.varianceY = varianceY = self.momentYY / area - meanY**2 self.stddevX = stddevX = math.copysign(abs(varianceX)**.5, varianceX) self.stddevY = stddevY = math.copysign(abs(varianceY)**.5, varianceY) # Covariance(X,Y) = ( E[X.Y] - E[X]E[Y] ) self.covariance = covariance = self.momentXY / area - meanX*meanY # Correlation(X,Y) = Covariance(X,Y) / ( stddev(X) * stddev(Y) ) # https://en.wikipedia.org/wiki/Pearson_product-moment_correlation_coefficient correlation = covariance / (stddevX * stddevY) self.correlation = correlation if abs(correlation) > 1e-3 else 0 slant = covariance / varianceY self.slant = slant if abs(slant) > 1e-3 else 0
def copysign(x, y): """return x with the sign of y (missing from python 2.5.2)""" if sys.version_info > (2, 6): return math.copysign(x, y) return math.fabs(x) * (-1 if y < 0 or (y == 0 and 1/y < 0) else 1)
def f0(cls, x): return math.sqrt(abs(x))*math.copysign(1,x)*math.sin(1/(abs(x)+pow(10, -9)))