我们从 Python 开源项目中提取了以下 36 个代码示例,用于说明如何使用 time.ticks_ms()。
def __init__(self, name, pin, on_change=None):
    """Set up a DS18X20 temperature device on a one-wire bus.

    Registers the device through ``Device.__init__``, scans the bus on
    ``pin`` for sensor ROMs and starts a first temperature conversion.

    Args:
        name: Device name, forwarded to ``Device.__init__``.
        pin: Pin handed to ``onewire.OneWire`` and ``Device.__init__``.
        on_change: Optional callback, forwarded to ``Device.__init__``.
    """
    # The driver modules are imported only when a device is created.
    import onewire
    import ds18x20

    gc.collect()
    Device.__init__(self, name, pin, on_change=on_change)

    bus = onewire.OneWire(pin)
    self.ds = ds18x20.DS18X20(bus)
    self.roms = self.ds.scan()

    # Record the tick at which the conversion below is requested.
    self.lasttime = time.ticks_ms()
    self.ds.convert_temp()

    self.temp_list = None  # nothing has been read back yet
    self.getters[""] = self.value
def templog(sleep=True):
    """Log voltage and temperature to MQTT.

    Reads the sensors, prints and publishes the values and, when ``sleep``
    is true, passes the elapsed run time in milliseconds to ``deepsleep``.
    The board is reset when connecting or publishing fails.

    Args:
        sleep: When true, call ``deepsleep`` with the elapsed milliseconds.
    """
    start = time.ticks_ms()

    # get sensor values
    values = read_voltage()
    values.update(read_temps())
    print(values)

    # send values over MQTT if connected; a failed connect or a failed
    # publish both end in a reboot
    if not wait_connect() or not mqtt_send(values):
        machine.reset()

    if sleep:
        # ticks_diff(newer, older) yields a positive elapsed time; the
        # arguments were previously passed the other way round.
        delta = time.ticks_diff(time.ticks_ms(), start)
        deepsleep(delta)
def start_server():
    """Run the synchronous web server on port 80.

    Accepts connections in an endless loop through ``accept_conn`` and calls
    ``periodic_tasks`` whenever more than 8000 ms have passed since the last
    call. The listening socket is closed when the loop is left by an
    exception, including a failure during socket setup.
    """
    print("Starting web server")
    sock = socket.socket()
    try:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind(('', 80))
        sock.listen(1)
        sock.settimeout(5.0)
        print("Synchronous web server running...")
        gc.collect()
        print("gc.mem_free=", gc.mem_free())

        t0 = time.ticks_ms()
        while True:
            accept_conn(sock)
            now = time.ticks_ms()
            # ticks_ms() wraps around, so the interval is taken with
            # ticks_diff() rather than plain subtraction.
            if time.ticks_diff(now, t0) > 8000:
                periodic_tasks()
                t0 = now
    finally:
        sock.close()
def _read_timeout(cnt, timeout_ms=2000): time_support = "ticks_ms" in dir(time) s_time = time.ticks_ms() if time_support else 0 data = sys.stdin.read(cnt) if len(data) != cnt or (time_support and time.ticks_diff(time.ticks_ms(), s_time) > timeout_ms): return None return data
def v2_read_handler():
    """Write the millisecond tick counter, as whole seconds, to virtual pin 2."""
    # This widget will show some time in seconds.
    seconds = time.ticks_ms() // 1000
    blynk.virtual_write(2, seconds)


# Start Blynk (this call should never return)
def my_user_task():
    """Print a marker and write the tick counter in seconds to virtual pin 2."""
    # Only non-blocking operations belong in this task.
    print('Action')
    seconds = time.ticks_ms() // 1000
    blynk.virtual_write(2, seconds)
def sleep_from_until(start, delay):
    """Call ``idle_func`` until ``delay`` ms have passed since ``start``.

    Args:
        start: Tick value (as returned by ``time.ticks_ms``) to count from.
        delay: Number of milliseconds to wait, measured from ``start``.

    Returns:
        The tick value ``delay`` ms after ``start``, usable as the next
        ``start``.
    """
    # ticks_diff(newer, older): with the arguments the other way round the
    # difference is never positive and the loop would not terminate.
    while time.ticks_diff(time.ticks_ms(), start) < delay:
        idle_func()
    # ticks_add keeps the result inside the wrapping tick range.
    return time.ticks_add(start, delay)
def _run_task(self): if self._task: c_millis = time.ticks_ms() if c_millis - self._task_millis >= self._task_period: self._task_millis += self._task_period self._task()
def v0_read_handler():
    """Write the tick counter in whole seconds to virtual pin 0 and log it."""
    # The value only reaches the widget through an explicit virtual write.
    seconds = time.ticks_ms() // 1000
    blynk.virtual_write(0, seconds)
    print('v0')


# register the virtual pin
def sleep_from_until(start, delay):
    """Idle the machine until ``delay`` ms have passed since ``start``.

    Args:
        start: Tick value (as returned by ``time.ticks_ms``) to count from.
        delay: Number of milliseconds to wait, measured from ``start``.

    Returns:
        The tick value ``delay`` ms after ``start``, usable as the next
        ``start``.
    """
    while time.ticks_diff(time.ticks_ms(), start) < delay:
        machine.idle()
    # ticks_add keeps the result inside the wrapping tick range, which a
    # plain ``start + delay`` does not.
    return time.ticks_add(start, delay)
def mensagem_recebida(topico, payload):
    """Handle a received message by flashing the red and blue LEDs.

    For ``tempo_sirene`` milliseconds the two LEDs alternate with a 500 ms
    cycle. Neither argument is inspected.

    Args:
        topico: Topic of the received message (unused).
        payload: Payload of the received message (unused).
    """
    print('Alerta!')
    # Remember when the message started
    comeco = time.ticks_ms()
    # Until the siren duration is reached, alternate the red and blue LEDs
    while time.ticks_diff(time.ticks_ms(), comeco) < tempo_sirene:
        # One sample per pass keeps both LEDs exactly complementary.
        fase = time.ticks_ms() % 500
        led_vermelho.value(fase > 250)
        led_azul.value(fase <= 250)


# Connect to the MQTT broker
def go_sleep(sleep_minutes):
    """Perform deepsleep to save energy.

    Arms RTC alarm 0 as the deep-sleep wake source and enters deep sleep.
    The time already spent awake (``ticks_ms()``) is subtracted from the
    requested period.

    Args:
        sleep_minutes: Requested wake-up period in minutes.
    """
    rtc = machine.RTC()
    rtc.irq(trigger=rtc.ALARM0, wake=machine.DEEPSLEEP)
    # ticks_ms is used to make wake up period more consistent
    sleep_seconds = int(sleep_minutes * 60) - (ticks_ms() // 1000)
    # Having been awake longer than the period would give a zero or negative
    # alarm time; always sleep for at least one second instead.
    sleep_seconds = max(sleep_seconds, 1)
    rtc.alarm(rtc.ALARM0, sleep_seconds * 1000)
    print(str(sleep_seconds // 60) + ":" + str(sleep_seconds % 60) + " minutes deep sleep NOW!")
    machine.deepsleep()
def _trigger_next_turn(self):
    """Stamp the start of a turn and send the next queued angle, if any.

    The first entry of ``self.angle_list`` is written with
    ``self.write_angle`` and then removed from the list.
    """
    self.turn_start = time.ticks_ms()
    if not self.angle_list:
        return
    next_angle = self.angle_list[0]
    self.write_angle(next_angle)
    del self.angle_list[0]
def _update(self): if self.turn_start is not None: # turn in process current=time.ticks_ms() if time.ticks_diff(current, self.turn_start) >= self.turn_time_ms: if len(self.angle_list) > 0: self._trigger_next_turn() else: self._release()
def do_later(time_delta_s, callback, id=None):
    """Register ``callback`` in ``_timestack`` with a due tick in the future.

    Args:
        time_delta_s: Delay in seconds; converted to whole milliseconds.
        callback: Object stored together with the due tick.
        id: Key for the entry. When ``None``, a key is derived by hashing
            the callback's hash together with the due tick.

    Returns:
        The key under which the entry was stored.
    """
    delay_ms = int(time_delta_s * 1000)
    due = time.ticks_add(time.ticks_ms(), delay_ms)
    if id is None:
        id = hash((hash(callback), due))
    _timestack[id] = (due, callback)
    return id
def run(updates=5, sleepms=1, poll_rate_inputs=4, poll_rate_network=10):
    """Run the main loop: poll the network and devices, publish, run timers.

    Calls ``_init_mqtt()`` once and then loops forever (there is no exit
    from the ``while True`` loop).

    Args:
        updates: Per the original comment, send out a status every this
            amount of seconds; if 0, never send time based status updates,
            only when a change happened. In the code a full update is
            published whenever the loop counter is a multiple of
            ``updates * 1000``.
        sleepms: How long to sleep (``time.sleep_ms``) between loop runs.
            NOTE(review): used as a divisor below, so 0 would raise.
        poll_rate_inputs: Devices are updated every this many loop runs.
        poll_rate_network: ``_poll_subscripton()`` is called every this many
            loop runs.
    """
    _init_mqtt()
    if updates == 0:
        overflow = 1000
    else:
        overflow = updates * 1000
    # The loop counter is reset to 0 once it reaches this value.
    overflow *= poll_rate_network * poll_rate_inputs / sleepms
    overflow = int(overflow)
    # print("Overflow:",overflow)
    counter = 0
    while True:
        if counter % poll_rate_network == 0:
            _poll_subscripton()
        if counter % poll_rate_inputs == 0:
            # Collect the devices whose update() returned a true value.
            device_list = []
            for d in _devlist.values():
                if d.update():
                    device_list.append(d)
            if len(device_list) > 0:
                _publish_status(device_list)
        if updates != 0 and counter % (updates * 1000) == 0:
            print("Publishing full update.")
            _publish_status(ignore_time=True)
        # execute things on timestack
        now = time.ticks_ms()
        # Iterate over a copy of the keys because entries are deleted below.
        for id in list(_timestack):
            t, cb = _timestack[id]
            if time.ticks_diff(now, t) >= 0:
                # Due: remove the entry first, then call back with its id.
                del (_timestack[id])
                cb(id)
        time.sleep_ms(sleepms)
        counter += 1
        if counter >= overflow:
            counter = 0
def __init__(self, name, pin, dht_dev, delay, on_change=None,report_change=True):
    """Set up a DHT-based device exposing temperature and humidity getters.

    Stores the sensor object and delay, triggers a first measurement and
    then registers the device through ``Device.__init__``.

    Args:
        name: Device name, forwarded to ``Device.__init__``.
        pin: Pin, forwarded to ``Device.__init__``.
        dht_dev: Sensor object; its ``measure()`` method is called here.
        delay: Stored as ``self.delay``.
        on_change: Optional callback, forwarded to ``Device.__init__``.
        report_change: Forwarded to ``Device.__init__``.
    """
    self.delay = delay
    import dht

    self.dht = dht_dev
    # Record the tick at which the first measurement is taken.
    self.lasttime = time.ticks_ms()
    dht_dev.measure()

    value_getters = {
        "temperature": self.temperature,
        "humidity": self.humidity,
    }
    Device.__init__(
        self,
        name,
        pin,
        on_change=on_change,
        getters=value_getters,
        report_change=report_change,
    )
def time_controlled_measure(self):
    """Read all sensors and start a new conversion once the delay is over.

    When more than ``DS18X20.MEASURE_DELAY`` ms have passed since
    ``self.lasttime``, every ROM in ``self.roms`` is read, a new conversion
    is started and ``self.lasttime`` is updated. ``self.temp_list`` becomes
    the list of readings, or the bare reading when there is exactly one.
    """
    newtime = time.ticks_ms()
    # ticks_diff handles the wrap-around of ticks_ms(); a negative result is
    # still treated as "long enough ago".
    elapsed = time.ticks_diff(newtime, self.lasttime)
    if elapsed < 0 or elapsed > DS18X20.MEASURE_DELAY:
        readings = [self.ds.read_temp(rom) for rom in self.roms]
        # A single sensor is exposed as a plain value instead of a list.
        self.temp_list = readings[0] if len(readings) == 1 else readings
        self.ds.convert_temp()
        self.lasttime = newtime
def _update(self): t = time.ticks_ms() #if self.suspend_start is not None \ # and time.ticks_diff(t,self.suspend_start) <= self.suspend_time: # print("Bus access suspended.") if self.suspend_start is None \ or time.ticks_diff(t,self.suspend_start) > self.suspend_time: self.suspend_start = None # done waiting try: if self.msgq is not None: self.pin.writeto(self.addr, self.msgq) self.msgq = None s = self.pin.readfrom(self.addr,self.BUFFER_SIZE) except: print("Trouble accessing i2c.") else: if s[0]==255 and s[1]==255: # illegal read print("Got garbage, waiting 1s before accessing bus again.") print("data:", s) self.suspend_time=1000 # take a break self.suspend_start = time.ticks_ms(); else: count = s[0]*256+s[1] if self.count != count: l = s[2]; self.suspend_time = s[3]; # scale timer if self.suspend_time & 128: self.suspend_time = (self.suspend_time & 127) * 100 if self.suspend_time>5000: # should not happen print("Garbage time -> waiting 1s before accessing bus again.") print("data:",s) self.suspend_time=1000 if self.suspend_time > 0: self.suspend_start = time.ticks_ms(); print("Bus suspend for",self.suspend_time,"ms requested.") if l <= self.BUFFER_SIZE: # discard if too big self.count = count self.current_value = s[4:4+l]
def _update(self):
    """Take a new measurement once ``INTERVAL`` ms have passed.

    The stored distance is replaced only when the new reading differs from
    it by at least ``self.precision``; the measurement time is recorded
    after every measurement.
    """
    elapsed = ticks_diff(ticks_ms(), self._last_measured)
    if elapsed <= self.INTERVAL:
        return

    reading = self._measure()
    deviation = abs(reading - self._distance)
    if deviation >= self.precision:
        self._distance = reading
    self._last_measured = ticks_ms()
def __init__(self, device, command_str):
    """Initialise the playback state for a whitespace-separated command string.

    Args:
        device: Stored as ``self.device``.
        command_str: Command string; split on whitespace into
            ``self.program``.
    """
    self.device = device
    # The command string is split only once (it used to be split twice).
    self.program = command_str.split()
    self.playing = True
    self.step = 0
    self.fade_goals = {}
    self.length = 0
    self.starttime = 0
    self.delta = 0
    self.lasttime = time.ticks_ms()
    self.effect = None
def _send(self):
    """Send the filled part of the output buffer and mark it as empty.

    Passes ``self.out_buffer`` with ``length=self.out_fill`` to
    ``self.cs.send``, resets ``self.out_fill`` and records the send time.
    """
    # TODO: does this need to be unblocking?
    # debug: dp.println("s {},{},{}".format(self.out_fill,int(self.out_buffer[0]),int(self.out_buffer[1])))
    pending = self.out_fill
    self.cs.send(self.out_buffer, length=pending)
    self.out_fill = 0
    self.out_last_sent = time.ticks_ms()
def flush(self):
    """Send buffered output once it has waited longer than ``INTERVAL`` ms.

    With an empty buffer only the last-sent timestamp is reset. Otherwise,
    when more than ``self.INTERVAL`` ms have passed since the last send, the
    output buffer is acquired, sent and released again.
    """
    t = time.ticks_ms()
    if self.out_fill == 0:
        # reset time, if there is nothing to send
        self.out_last_sent = t
    elif time.ticks_diff(t, self.out_last_sent) > self.INTERVAL:
        self.acquire_out_buffer()
        try:
            self._send()
        finally:
            # Release the buffer even when sending raises, so a failed send
            # cannot leave it acquired.
            self.release_out_buffer()
def ticks_ms():
    """Return ``time.time()`` converted to whole milliseconds as an int."""
    millis = time.time() * 1000
    return int(millis)
def receive(self, request=0, timeoutms=None):
    """Receive into the network buffer and decrypt what was read.

    The buffer is filled one byte at a time through ``self.sock_read`` until
    it is full, the request is satisfied and nothing more is available, or
    the timeout has elapsed.

    Args:
        request: If > 0, keep waiting until at least this many bytes were
            read (unless the timeout interrupts first).
        timeoutms: ``None`` blocks; ``0`` returns immediately after trying
            to read something from the network buffer; a value > 0 tries for
            that many milliseconds before returning.

    Returns:
        A tuple ``(buffer, number_of_bytes_read)``; the count may be 0.

    Raises:
        OSError: Any ``OSError`` from the read other than ``EAGAIN`` or
            ``ETIMEDOUT`` is re-raised.
    """
    data = self.netbuf_in
    data_mv = self.netbuf_in_mv
    readbytes = 0
    start_t = ticks_ms()
    while readbytes < self.netbuf_size:
        try:
            # Read a single byte into the next free slot of the buffer.
            if self.sock_read(data_mv[readbytes:readbytes+1]):
                readbytes += 1  # result was not 0 or none
            else:
                if readbytes >= request:
                    break  # break if not blocking to request size
        except OSError as e:
            # EAGAIN / ETIMEDOUT are handled like "nothing read".
            if len(e.args) > 0 \
                    and (e.args[0] == errno.EAGAIN or e.args[0] == errno.ETIMEDOUT):
                if readbytes >= request:
                    break  # break if not blocking to request size
            else:
                raise
        if timeoutms is not None \
                and ticks_diff(ticks_ms(), start_t) >= timeoutms:
            break
        sleep_ms(1)  # prevent busy waiting
    # Decrypt in place only the part of the buffer that was filled.
    if readbytes>0: self.crypt_in.decrypt(data,length=readbytes)
    return data,readbytes
def time_controlled_measure():
    """Trigger ``d.measure()`` once ``MEASURE_DELAY`` ms have passed.

    Updates the module-level ``_lasttime`` whenever a measurement is taken.
    """
    global _lasttime
    newtime = time.ticks_ms()
    # ticks_diff handles the wrap-around of ticks_ms(); a negative result is
    # still treated as "long enough ago".
    elapsed = time.ticks_diff(newtime, _lasttime)
    if elapsed < 0 or elapsed > MEASURE_DELAY:
        d.measure()
        _lasttime = newtime
def makegauge(self):
    """Generator refreshing the raw measurements.

    Each cycle writes 0x2E to register 0xF4, waits 5 ms and reads two bytes
    from 0xF6 into ``self.UT_raw``; then writes
    ``0x34 + (oversample_setting << 6)`` to 0xF4, waits for a delay chosen
    by ``self.oversample_setting`` and reads 0xF6..0xF8 into the
    ``MSB_raw`` / ``LSB_raw`` / ``XLSB_raw`` attributes.

    Yields:
        ``None`` while waiting or after a failed read, ``True`` at the end
        of each cycle.
    """
    delays = (5, 8, 14, 25)  # wait in ms, indexed by oversample_setting
    while True:
        self._bmp_i2c.writeto_mem(self._bmp_addr, 0xF4, bytearray([0x2E]))
        t_start = time.ticks_ms()
        # ticks_diff keeps the wait correct across a ticks_ms() wrap-around.
        while time.ticks_diff(time.ticks_ms(), t_start) <= 5:  # 5 ms delay
            yield None
        try:
            self.UT_raw = self._bmp_i2c.readfrom_mem(self._bmp_addr, 0xF6, 2)
        except Exception:
            # Best effort: report "not ready" and carry on with the cycle.
            yield None

        self._bmp_i2c.writeto_mem(
            self._bmp_addr, 0xF4,
            bytearray([0x34+(self.oversample_setting << 6)]))
        t_pressure_ready = delays[self.oversample_setting]
        t_start = time.ticks_ms()
        while time.ticks_diff(time.ticks_ms(), t_start) <= t_pressure_ready:
            yield None
        try:
            self.MSB_raw = self._bmp_i2c.readfrom_mem(self._bmp_addr, 0xF6, 1)
            self.LSB_raw = self._bmp_i2c.readfrom_mem(self._bmp_addr, 0xF7, 1)
            self.XLSB_raw = self._bmp_i2c.readfrom_mem(self._bmp_addr, 0xF8, 1)
        except Exception:
            yield None
        yield True
def isr(self, pin):
    """Flag a gyro reset request, ignoring calls within 10 ms of the last one.

    Args:
        pin: Not used by this handler.
    """
    # debounce: only react when at least 10 ms passed since the last request
    since_last = time.ticks_diff(time.ticks_ms(), self.last_isr)
    if since_last >= 10:
        print('! reset gyro request')
        self.flag_reset_gyro = True
        self.last_isr = time.ticks_ms()
def serve(self):
    """Read the MPU position in an endless loop and send it over the socket.

    Each pass handles a pending gyro reset request and reads the position.
    The values are sent as JSON to ``('192.168.4.2', 8000)`` when
    ``self.write_interval`` ms have passed since the last send, and garbage
    is collected when ``self.gc_interval`` ms have passed since the last
    collection.
    """
    print('starting mpu server on port {}'.format(self.port))

    lastgc = lastsent = lastread = time.ticks_ms()

    while True:
        now = time.ticks_ms()
        write_dt = time.ticks_diff(now, lastsent)
        read_dt = time.ticks_diff(now, lastread)
        gc_dt = time.ticks_diff(now, lastgc)

        # Sleep so that reads are at least 1 ms apart.
        time.sleep_ms(max(0, 1-read_dt))

        if self.flag_reset_gyro:
            self.mpu.filter.reset_gyro()
            self.flag_reset_gyro = False

        values = self.mpu.read_position()
        lastread = now

        if write_dt >= self.write_interval:
            lastsent = time.ticks_ms()
            self.sock.sendto(tojson(values), ('192.168.4.2', 8000))

        if gc_dt >= self.gc_interval:
            gc.collect()
            # Restart the interval; without this the collector would run on
            # every pass once the first interval had elapsed.
            lastgc = time.ticks_ms()
def input(self, vals):
    """Fold one sensor sample into the accelerometer, gyro and filtered positions.

    Args:
        vals: Sequence of readings; ``vals[0:3]`` is used as accelerometer
            data and ``vals[4:7]`` as gyroscope data.
    """
    stamp = time.ticks_ms()

    # split the sample into its accelerometer and gyroscope parts
    accel_sample = vals[0:3]
    gyro_rates = vals[4:7]

    # accelerometer reading converted to a position (degrees, per the
    # original comment)
    self.accel_pos = self.calculate_accel_pos(*accel_sample)

    # First sample: there is no previous timestamp to integrate from, so
    # the accelerometer position is taken over for all estimates.
    if self.last == 0:
        self.filter_pos = self.gyro_pos = self.accel_pos
        self.last = stamp
        return

    # Seconds since the previous sample; needed because the gyroscope
    # readings are rates (degrees/second, per the original comment).
    elapsed_s = time.ticks_diff(stamp, self.last)/1000
    self.last = stamp

    # integrate the gyroscope rates over the elapsed time
    gyro_delta = [rate * elapsed_s for rate in gyro_rates]
    self.gyro_pos = [pos + step for pos, step in zip(self.gyro_pos, gyro_delta)]

    # Weighted blend of the gyro-advanced estimate and the accelerometer
    # position, for index 0 (pitch) and index 1 (roll).
    for axis in (0, 1):
        self.filter_pos[axis] = (
            self.gyro_weight * (self.filter_pos[axis] + gyro_delta[axis]) +
            (1-self.gyro_weight) * self.accel_pos[axis])
def waiting(self):
    """Wait until 5000 ms have elapsed, then set the sleep flag.

    The elapsed time is re-measured and followed by a 5 ms sleep on every
    pass of the loop. Afterwards ``self.DO_SLEEP`` is set to ``True`` and a
    message is printed.
    """
    started = time.ticks_ms()
    elapsed = time.ticks_diff(time.ticks_ms(), started)
    while elapsed < 5000:
        elapsed = time.ticks_diff(time.ticks_ms(), started)
        time.sleep_ms(5)

    self.DO_SLEEP = True
    print("Ordem para dormir")
def do_get(clisock, uri, content_length):
    """Write an HTTP 200 response with an HTML status page to ``clisock``.

    The page contains the local time, the uptime derived from
    ``time.ticks_ms()``, flash id and size, the Python implementation and
    platform, the unique id, and a table with one row per network interface
    (station and access point).

    Args:
        clisock: Object whose ``write`` method accepts bytes.
        uri: Not used by this handler.
        content_length: Not used by this handler.
    """
    # NOTE(review): this relies on bytes.format(), which CPython's bytes
    # type does not have -- presumably the code targets MicroPython only.
    clisock.write(b'HTTP/1.0 200 OK\r\n'
                  b'Content-type: text/html; charset=utf-8\r\n'
                  b'\r\n')
    clisock.write(b'<!DOCTYPE html><html><head><title>Current time</title></head>')
    clisock.write(b'<body>The current time is: ')
    timestr = '{}-{:02d}-{:02d} {:02d}:{:02d}:{:02d}'.format(*time.localtime())
    clisock.write(timestr.encode('ascii'))
    del timestr

    # Pure integer arithmetic: float division followed by int() can round
    # large tick values on builds with single-precision floats.
    uptime_m, uptime_s = divmod(time.ticks_ms() // 1000, 60)
    uptime_h, uptime_m = divmod(uptime_m, 60)
    clisock.write(b'<p>Uptime: {:02d}h {:02d}:{:02d}'.format(
        uptime_h, uptime_m, uptime_s))

    clisock.write(b'<p>Flash ID: {:x}'.format(esp.flash_id()))
    clisock.write(b'<p>Flash size: {:d}'.format(esp.flash_size()))
    clisock.write(b'<p>Python: {:s} on {:s} '.format(str(sys.implementation), sys.platform))
    clisock.write(b'<p>Unique ID: ')
    for b in machine.unique_id():
        clisock.write(b'{:02x}'.format(b))

    clisock.write(b'\n<h2>Network interfaces</h2>\n')
    clisock.write(b'\n<table><tr><th>mac<th>active</th><th>connected</th><th>IP</th><th>Gateway</th>')
    for i in network.STA_IF, network.AP_IF:
        wlan = network.WLAN(i)
        # Show MAC address.
        clisock.write(b'<tr>')
        clisock.write(b'<td>')
        for b in wlan.config('mac'):
            clisock.write(b'{:02X}'.format(b))
        clisock.write(b'<td>{:}</td>'.format(wlan.active()))
        clisock.write(b'<td>{:}</td>'.format(wlan.isconnected()))
        ifconfig = wlan.ifconfig()  # ip, netmask, gateway, dns
        ifconfig = (ifconfig[0], ifconfig[2])  # ip, gw
        for item in ifconfig:
            clisock.write(b'<td>{:}</td>'.format(item))
    clisock.write(b'\n</table>\n')