我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用time.sleep_ms()。
def do_connect(ssid, pwd, TYPE, hard_reset=True): interface = network.WLAN(TYPE) # Stage zero if credential are null disconnect if not pwd or not ssid : print('Disconnecting ', TYPE) interface.active(False) return None if TYPE == network.AP_IF: interface.active(True) time.sleep_ms(200) interface.config(essid=ssid, password=pwd) return interface if hard_reset: interface.active(True) interface.connect(ssid, pwd) # Stage one check for default connection print('Connecting') for t in range(120): time.sleep_ms(250) if interface.isconnected(): print('Yes! Connected') return interface if t == 60 and not hard_reset: # if still not connected interface.active(True) interface.connect(ssid, pwd) # No way we are not connected print('Cant connect', ssid) return None #---------------------------------------------------------------- # MAIN PROGRAM STARTS HERE
def deauth(_ap,_client,type,reason): # 0 - 1 type, subtype c0: deauth (a0: disassociate) # 2 - 3 duration (SDK takes care of that) # 4 - 9 reciever (target) # 10 - 15 source (ap) # 16 - 21 BSSID (ap) # 22 - 23 fragment & squence number # 24 - 25 reason code (1 = unspecified reason) packet=bytearray([0xC0,0x00,0x00,0x00,0xBB,0xBB,0xBB,0xBB,0xBB,0xBB,0xCC, 0xCC, 0xCC, 0xCC, 0xCC, 0xCC,0xCC, 0xCC, 0xCC, 0xCC, 0xCC, 0xCC,0x00, 0x00,0x01, 0x00]) for i in range(0,6): packet[4 + i] =_client[i] packet[10 + i] = packet[16 + i] =_ap[i] #set type packet[0] = type; packet[24] = reason result=sta_if.send_pkt_freedom(packet) if result==0: time.sleep_ms(1) return True else: return False
def read_temp(): # the device is on GPIO12 dat = machine.Pin(5) # create the onewire object ds = ds18x20.DS18X20(onewire.OneWire(dat)) # scan for devices on the bus roms = ds.scan() # print('found devices:', roms) # print('temperature:', end=' ') ds.convert_temp() time.sleep_ms(750) temp = ds.read_temp(roms[0]) # print(temp) return temp
def active(self, value=None): if value is None: return self._active value = bool(value) if self._active == value: return self._active = value enable = self._register8(_REGISTER_ENABLE) if value: self._register8(_REGISTER_ENABLE, enable | _ENABLE_PON) time.sleep_ms(3) self._register8(_REGISTER_ENABLE, enable | _ENABLE_PON | _ENABLE_AEN) else: self._register8(_REGISTER_ENABLE, enable & ~(_ENABLE_PON | _ENABLE_AEN))
def loop(): show_temp=True while True: g=get() display.fill(0) display.text("Weather in",0,0) display.text(g['name'],0,8) if left_button.value()==0 or right_button.value()==0: show_temp=not show_temp if show_temp: celsius=g['main']['temp']-273.15 display.text("Temp/C: %.1f"%celsius,0,24) display.text("Temp/F: %.1f"%(celsius*1.8+32),0,32) else: # humidity display.text("Humidity: %d%%"%g['main']['humidity'],0,24) if lower_button.value()==0: break display.show() time.sleep_ms(500)
def __init__(self, i2c, i2c_addr, num_lines, num_columns): self.i2c = i2c self.i2c_addr = i2c_addr self.i2c.writeto(self.i2c_addr, bytes([0])) sleep_ms(20) # Allow LCD time to powerup # Send reset 3 times self.hal_write_init_nibble(self.LCD_FUNCTION_RESET) sleep_ms(5) # need to delay at least 4.1 msec self.hal_write_init_nibble(self.LCD_FUNCTION_RESET) sleep_ms(1) self.hal_write_init_nibble(self.LCD_FUNCTION_RESET) sleep_ms(1) # Put LCD into 4 bit mode self.hal_write_init_nibble(self.LCD_FUNCTION) sleep_ms(1) LcdApi.__init__(self, num_lines, num_columns) cmd = self.LCD_FUNCTION if num_lines > 1: cmd |= self.LCD_FUNCTION_2LINES self.hal_write_command(cmd)
def hal_write_command(self, cmd): """Writes a command to the LCD. Data is latched on the falling edge of E. """ byte=((self.backlight << SHIFT_BACKLIGHT) | (((cmd >> 4) & 0x0f) << SHIFT_DATA)) self.i2c.writeto(self.i2c_addr, bytes([byte | MASK_E])) self.i2c.writeto(self.i2c_addr, bytes([byte])) byte=((self.backlight << SHIFT_BACKLIGHT) | ((cmd & 0x0f) << SHIFT_DATA)) self.i2c.writeto(self.i2c_addr, bytes([byte | MASK_E])) self.i2c.writeto(self.i2c_addr, bytes([byte])) if cmd <= 3: # The home and clear commands require a worst # case sleep_ms of 4.1 msec sleep_ms(5)
def readDS18x20(): from machine import Pin import onewire, ds18x20 OneWirePin = 0 # the device is on GPIO12 dat = Pin(OneWirePin) # create the onewire object ds = ds18x20.DS18X20(onewire.OneWire(dat)) # scan for devices on the bus roms = ds.scan() ds.convert_temp() time.sleep_ms(750) values = [] for rom in roms: values.append(ds.read_temp(rom)) print(values) return values
def get_sensor_avg(self, samples, softstart=100): '''Return the average readings from the sensors over the given number of samples. Discard the first softstart samples to give things time to settle.''' sample = self.read_sensors() counters = [0] * 7 for i in range(samples + softstart): # the sleep here is to ensure we read a new sample # each time time.sleep_ms(2) sample = self.read_sensors() if i < softstart: continue for j, val in enumerate(sample): counters[j] += val return [x//samples for x in counters]
def measure(self): buf = self.buf address = self.address # wake sensor try: self.i2c.writeto(address, b'') except OSError: pass # read 4 registers starting at offset 0x00 self.i2c.writeto(address, b'\x03\x00\x04') # wait at least 1.5ms time.sleep_ms(2) # read data self.i2c.readfrom_mem_into(address, 0, buf) crc = ustruct.unpack('<H', bytearray(buf[-2:]))[0] if (crc != self.crc16(buf[:-2])): raise Exception("checksum error")
def measure(self): buf = self.buf address = self.address # wake sensor try: self.i2c.writeto(address, b'') except OSError: pass # read 4 registers starting at offset 0x00 self.i2c.writeto(address, b'\x03\x00\x04') # wait at least 1.5ms time.sleep_ms(2) # read data self.i2c.readfrom_mem_into(address, 0, buf) # debug print print(ustruct.unpack('BBBBBBBB', buf)) crc = ustruct.unpack('<H', bytearray(buf[-2:]))[0] if (crc != self.crc16(buf[:-2])): raise Exception("checksum error")
def measure(self): self.wakeup() # request measure wbuf = bytearray( 3 ) wbuf[0] = AM2315_READREG wbuf[1] = 0x00 # Start at adress 0x00 wbuf[2] = 0x04 # Request 4 byte data self.i2c.writeto( self.addr, wbuf ) # b'\x03\x00\x04' ) # wait 1.5+ ms before reading time.sleep_ms( 2 ) # Request 8 bytes from sensor #rbuf = bytearray( 8 ) self.i2c.readfrom_mem_into(self.addr, 0, self.rbuf) # Read from Reg 0 return self.check_response()
def status(self): ow = onewire.OneWire(Pin(2)) ds = ds18x20.DS18X20(ow) roms = ds.scan() if not len(roms): print("Nao encontrei um dispositivo onewire.") return None ds.convert_temp() time.sleep_ms(750) temp = 0 for i in range(10): for rom in roms: temp += ds.read_temp(rom) return temp / 10 # envia a temperatura para o broker
def _init__(self, cspin): self.cspin = Pin(cspin, mode=Pin.OUT) self.cspin.value(1) self.x = 0 self.y = 0 self.z = 0 self.t = 0 self.spi = SPI(0, mode=SPI.MASTER, baudrate=1000000, polarity=0, phase=0, firstbit=SPI.MSB) time.sleep_ms(1000) self.SPIwriteOneRegister(0x1F, 0x52) #RESET time.sleep_ms(10) self.DEVICEID = self.SPIreadOneRegister(0x00) self.DEVID_MST = self.SPIreadOneRegister(0x01) self.PART_ID = self.SPIreadOneRegister(0x02) self.REV_ID = self.SPIreadOneRegister(0x03) self.STATUS = self.getStatus()
def __init__(self, csPIN, ISRPin, ActivityThreshold=70, ActivityTime=5, InactivityThreshold=1000, InactivityTime=5): self.steps = 0 ADXL362.__init__(self, csPIN) self.isrpin = Pin(ISRPin, mode=Pin.IN) self.isrpin.callback(trigger=Pin.IRQ_RISING, handler=self._isr_handler) self.setupDCActivityInterrupt(ActivityThreshold, ActivityTime) self.setupDCInactivityInterrupt(InactivityThreshold, InactivityTime) #ACTIVITY/INACTIVITY CONTROL REGISTER self.SPIwriteOneRegister(0x27, 0B111111) #B111111 Link/Loop:11 (Loop Mode), Inactive Ref Mode:1,Inactive Enable (under threshold):1,Active Ref Mode:1, Active Enable (over threshold):1 #MAP INTERRUPT 1 REGISTER. self.SPIwriteOneRegister(0x2A,0B10010000) #B00010000 0=HIGH on ISR,AWAKE:0,INACT:0,ACT:1,FIFO_OVERRUN:0,FIFO_WATERMARK:0,FIFO_READY:0,DATA_READY:0 #POWER CONTROL REGISTER self.SPIwriteOneRegister(0x2D, 0B1110) #B1011 Wake-up:1,Autosleep:0,measure:11=Reserved (Maybe works better) self.beginMeasure() time.sleep_ms(100)
def send_cmd(self, cmd_request, response_size=6, read_delay_ms=100): """ Send a command to the sensor and read (optionally) the response The responsed data is validated by CRC """ try: self.i2c.start(); self.i2c.writeto(self.i2c_addr, cmd_request); if not response_size: self.i2c.stop(); return time.sleep_ms(read_delay_ms) data = self.i2c.readfrom(self.i2c_addr, response_size) self.i2c.stop(); for i in range(response_size//3): if not self._check_crc(data[i*3:(i+1)*3]): # pos 2 and 5 are CRC raise SHT30Error(SHT30Error.CRC_ERROR) if data == bytearray(response_size): raise SHT30Error(SHT30Error.DATA_ERROR) return data except OSError as ex: if 'I2C' in ex.args[0]: raise SHT30Error(SHT30Error.BUS_ERROR) raise ex
def run(func): result = func() for wait in result: time.sleep_ms(wait)
def _send(self, data, send_anyway=False): if self._tx_count < MAX_MSG_PER_SEC or send_anyway: retries = 0 while retries <= MAX_TX_RETRIES: try: self.conn.send(data) self._tx_count += 1 break except socket.error as er: if er.args[0] != EAGAIN: raise else: time.sleep_ms(RE_TX_DELAY) retries += 1
def read(self, channel): self._write_register(_REGISTER_CONFIG, _CQUE_NONE | _CLAT_NONLAT | _CPOL_ACTVLOW | _CMODE_TRAD | _DR_1600SPS | _MODE_SINGLE | _OS_SINGLE | _GAINS[self.gain] | _CHANNELS[channel]) while not self._read_register(_REGISTER_CONFIG) & _OS_NOTBUSY: time.sleep_ms(1) return self._read_register(_REGISTER_CONVERT)
def diff(self, channel1, channel2): self._write_register(_REGISTER_CONFIG, _CQUE_NONE | _CLAT_NONLAT | _CPOL_ACTVLOW | _CMODE_TRAD | _DR_1600SPS | _MODE_SINGLE | _OS_SINGLE | _GAINS[self.gain] | _DIFFS[(channel1, channel2)]) while not self._read_register(_REGISTER_CONFIG) & _OS_NOTBUSY: time.sleep_ms(1) return self._read_register(_REGISTER_CONVERT)
def blink(n): for i in range(n): led.value(0) time.sleep_ms(100) led.value(1) time.sleep_ms(100) #if machine.reset_cause() == machine.SOFT_RESET: # print ("Soft reset, doing nothing") # print('Connected!! network config:', wifi.wlan.ifconfig())
def vpage(self,delay=10): show = False self.write_cmd(SET_START_LINE | 0) for x in range(self.height-1): self.write_cmd(SET_START_LINE | x) if not show: self.show() show = True time.sleep_ms(delay) self.fill(0)
def reset(self, res): res.value(1) time.sleep_ms(1) res.value(0) time.sleep_ms(20) res.value(1) time.sleep_ms(20)
def calibrate(self, samples=300): max_val = 0 for _ in range(samples): val = self.pin() if val > max_val: max_val = val time.sleep_ms(10) self.threshold = max_val * 1.2
def __init__(self, i2c=None, sda='P22', scl='P21'): if i2c is not None: self.i2c = i2c else: self.i2c = I2C(0, mode=I2C.MASTER, pins=(sda, scl)) self.sda = sda self.scl = scl self.clk_cal_factor = 1 self.reg = bytearray(6) self.wake_int = False self.wake_int_pin = False self.wake_int_pin_rising_edge = True try: self.read_fw_version() except Exception: time.sleep_ms(2) try: # init the ADC for the battery measurements self.poke_memory(ANSELC_ADDR, 1 << 2) self.poke_memory(ADCON0_ADDR, (0x06 << _ADCON0_CHS_POSN) | _ADCON0_ADON_MASK) self.poke_memory(ADCON1_ADDR, (0x06 << _ADCON1_ADCS_POSN)) # enable the pull-up on RA3 self.poke_memory(WPUA_ADDR, (1 << 3)) # make RC5 an input self.set_bits_in_memory(TRISC_ADDR, 1 << 5) # set RC6 and RC7 as outputs and enable power to the sensors and the GPS self.mask_bits_in_memory(TRISC_ADDR, ~(1 << 6)) self.mask_bits_in_memory(TRISC_ADDR, ~(1 << 7)) if self.read_fw_version() < 6: raise ValueError('Firmware out of date') except Exception: raise Exception('Board not detected')
def activity(self): if not self.debounced: time.sleep_ms(self.act_dur) self.debounced = True if self.int_pin(): return True return False
def pulse(led, delay): for i in range(200): led.duty(int(math.sin(i/10*math.pi)*500 + 500)) time.sleep_ms(delay)
def pulse(led, delay): for i in range(20): led.duty(int(math.sin(i/10*math.pi)*500 + 500)) time.sleep_ms(delay)
def sleep_ms(self, mseconds): try: time.sleep_ms(mseconds) except AttributeError: machine.delay(mseconds)
def power_off(self): self.clear() self.command([0x20, 0x08]) # 0x20 - basic instruction set # 0x08 - set display to blank (doesn't delete contents) self.sleep_ms(10) if self.pwr: self.pwr.value(0) # turn off power
def beacon(): fader.stop() for repeat in range(0, 10): for pos in range(0, NEOPIXEL_COUNT): for i in range(0, NEOPIXEL_COUNT): if i == pos: np[i] = (249, 72, 119) else: np[i] = (0, 0, 0) np.write() time.sleep_ms(100) fader.start()
def read(self, raw=False): was_active = self.active() self.active(True) while not self._valid(): time.sleep_ms(int(self._integration_time + 0.9)) data = tuple(self._register16(register) for register in ( _REGISTER_RDATA, _REGISTER_GDATA, _REGISTER_BDATA, _REGISTER_CDATA, )) self.active(was_active) if raw: return data return self._temperature_and_lux(data)
def poweron(self): self.res(1) time.sleep_ms(1) self.res(0) time.sleep_ms(10) self.res(1)
def run(updates=5, sleepms=1, poll_rate_inputs=4, poll_rate_network=10): # updates: send out a status every this amount of seconds. # If 0, never send time based status updates only when change happened. # sleepms: going o sleep for how long between each loop run # poll_rate_network: how often to evaluate incoming network commands # poll_rate_inputs: how often to evaluate inputs _init_mqtt() if updates == 0: overflow = 1000 else: overflow = updates * 1000 overflow *= poll_rate_network * poll_rate_inputs / sleepms overflow = int(overflow) # print("Overflow:",overflow) counter = 0 while True: if counter % poll_rate_network == 0: _poll_subscripton() if counter % poll_rate_inputs == 0: device_list = [] for d in _devlist.values(): if d.update(): device_list.append(d) if len(device_list) > 0: _publish_status(device_list) if updates != 0 and counter % (updates * 1000) == 0: print("Publishing full update.") _publish_status(ignore_time=True) # execute things on timestack now = time.ticks_ms() for id in list(_timestack): t, cb = _timestack[id] if time.ticks_diff(now, t) >= 0: del (_timestack[id]) cb(id) time.sleep_ms(sleepms) counter += 1 if counter >= overflow: counter = 0
def acquire_out_buffer(self): while self.out_buffer_lock == True: time.sleep_ms(1) # Wait for release self.irqstate=machine.disable_irq() if self.out_buffer_lock == True: # TODO: check if this locking is enough machine.enable_irq(self.irqstate) return False self.out_buffer_lock=True return True
def sleep_ms(t): time.sleep(t/1000)
def receive(self, request=0, timeoutms=None): # receive into network buffer, # fill buffer once and decrypt # if request>0 wait blocking for request number of bytes (if timeout # given interrupt after timeoutms ms) # timeout==None: block # timeout==0: return immediately after trying to read something from # network buffer # timeout>0: try for time specified to read something before returning # this function always returns a pointer to the buffer and # number of bytes read (could be 0) data = self.netbuf_in data_mv = self.netbuf_in_mv readbytes = 0 start_t = ticks_ms() while readbytes < self.netbuf_size: try: if self.sock_read(data_mv[readbytes:readbytes+1]): readbytes += 1 # result was not 0 or none else: if readbytes >= request: break # break if not blocking to request size except OSError as e: if len(e.args) > 0 \ and (e.args[0] == errno.EAGAIN or e.args[0] == errno.ETIMEDOUT): if readbytes >= request: break # break if not blocking to request size else: raise if timeoutms is not None \ and ticks_diff(ticks_ms(), start_t) >= timeoutms: break sleep_ms(1) # prevent busy waiting if readbytes>0: self.crypt_in.decrypt(data,length=readbytes) return data,readbytes
def sub_cb(topic, msg): if lcd.cursor_y * lcd.num_columns + lcd.cursor_x + len(str(msg)) > (lcd.num_lines * lcd.num_columns): lcd.clear() lcd.move_to(0,0) lcd.putstr("Puffer[" + str(topic, "utf-8").split("/")[-1] + "]:" + str(msg, "utf-8") + "\n") time.sleep_ms(3000)
def main(): c.set_callback(sub_cb) c.connect() #c.subscribe(CLIENT_ID + "/display") c.subscribe("esp8266-35cacf00/temperature/0") c.subscribe("esp8266-35cacf00/temperature/1") c.subscribe("esp8266-35cacf00/temperature/2") while True: c.check_msg() time.sleep_ms(500) c.disconnect()
def read_temps(): """Read DS18B20's.""" dat = machine.Pin(PIN_1W) ds = ds18x20.DS18X20(onewire.OneWire(dat)) ds.convert_temp() time.sleep_ms(750) return {rom_to_hex(rom): ds.read_temp(rom) for rom in ds.scan()}
def _read(self): was_active = self.active() self.active(True) if not was_active: # if the sensor was off, wait for measurement time.sleep_ms(_INTEGRATION_TIME[self._integration_time][1]) broadband = self._register16(_REGISTER_CHANNEL0) ir = self._register16(_REGISTER_CHANNEL1) self.active(was_active) return broadband, ir
def power_off(self): self.clear() self.set_power(self.POWER_DOWN) self.sleep_ms(10) if self.pwr: self.pwr.value(0) # turn off power
def led_state(): p2 = Pin(2, Pin.OUT) p2.value(0) time.sleep_ms(500) p2.value(1) time.sleep_ms(500) p2.value(0) time.sleep_ms(500) p2.value(1)