我有一个覆盆子pizero W,它通过GPIO引脚连接到流量计,通过USB连接到条形码扫描仪。我有python脚本,当检测到GPIO输入时,该脚本使用回调函数来发出警报。该python脚本需要在pizero上连续运行,以便识别何时激活流量计并处理输入。
问题是我还有一个通过USB连接到pizero的条形码扫描仪。我希望pizero也可以识别何时扫描条形码并处理该输入。
然后,pizero应该发送一条消息,其中包含来自流量计的信息和来自条形码扫描仪的信息。
有没有办法在相同的python脚本中执行此操作?我如何才能同时从两个输入进行pizero侦听和处理?将其分成两个不同的脚本是否更容易实现,如果可以,我是否可以同时运行它们,并以某种方式统一它们在第三个连续运行的脚本中提供的信息?
谢谢!
每个注释的一些说明(感谢您的输入):
GPIO 17
该脚本需要在系统启动时运行。我会看一下,systemctl直到被提及之前我才听说过。
systemctl
当未连接流量计时,Pi通常会将正在扫描的条形码识别为键盘输入(即,一系列数字后跟换行符)。
当我发送包含流量计和条形码信息的消息时,我需要从python发送一个JSON对象,该对象既包含信息,又包含接收信息的时间戳。
该JSON对象将通过wifi发送到与pizero相同的家庭网络上具有静态ip的树莓派服务器。raspberry pi服务器可以访问Django数据库,该数据库应将JSON对象信息合并到数据库中。
更新的答案
我为条形码阅读器添加了一些代码。我这样做是为了使条形码读取器花费可变的时间,最多花费5秒来读取数据,而流量计花费恒定的0.5秒,因此您可以看到不同的线程以彼此独立的速率进行进程。
#!/usr/bin/env python3 from threading import Lock import threading import time from random import seed from random import random # Dummy function to read SPI as I don't have anything attached def readSPI(): # Take 0.5s to read time.sleep(0.5) readSPI.static += 1 return readSPI.static readSPI.static=0 class FlowMeter(threading.Thread): def __init__(self): super(FlowMeter, self).__init__() # Create a mutex self.mutex = Lock() self.currentReading = 0 def run(self): # Continuously read flowmeter and safely update self.currentReading while True: value = readSPI() self.mutex.acquire() self.currentReading = value self.mutex.release() def read(self): # Main calls this to get latest reading, we just grab it from internal variable self.mutex.acquire() value = self.currentReading self.mutex.release() return value # Dummy function to read Barcode as I don't have anything attached def readBarcode(): # Take variable time, 0..5 seconds, to read time.sleep(random()*5) result = "BC" + str(int(random()*1000)) return result class BarcodeReader(threading.Thread): def __init__(self): super(BarcodeReader, self).__init__() # Create a mutex self.mutex = Lock() self.currentReading = 0 def run(self): # Continuously read barcode and safely update self.currentReading while True: value = readBarcode() self.mutex.acquire() self.currentReading = value self.mutex.release() def read(self): # Main calls this to get latest reading, we just grab it from internal variable self.mutex.acquire() value = self.currentReading self.mutex.release() return value if __name__ == '__main__': # Generate repeatable random numbers seed(42) # Instantiate and start flow meter manager thread fmThread = FlowMeter() fmThread.daemon = True fmThread.start() # Instantiate and start barcode reader thread bcThread = BarcodeReader() bcThread.daemon = True bcThread.start() # Now you can do other things in main, but always get access to latest readings for i in range(20): fmReading = fmThread.read() bcReading = bcThread.read() print(f"Main: i = {i} FlowMeter reading = {fmReading}, Barcode={bcReading}") time.sleep(1)
样本输出
Main: i = 0 FlowMeter reading = 0, Barcode=0 Main: i = 1 FlowMeter reading = 1, Barcode=0 Main: i = 2 FlowMeter reading = 3, Barcode=0 Main: i = 3 FlowMeter reading = 5, Barcode=0 Main: i = 4 FlowMeter reading = 7, Barcode=BC25 Main: i = 5 FlowMeter reading = 9, Barcode=BC223 Main: i = 6 FlowMeter reading = 11, Barcode=BC223 Main: i = 7 FlowMeter reading = 13, Barcode=BC223 Main: i = 8 FlowMeter reading = 15, Barcode=BC223 Main: i = 9 FlowMeter reading = 17, Barcode=BC676 Main: i = 10 FlowMeter reading = 19, Barcode=BC676 Main: i = 11 FlowMeter reading = 21, Barcode=BC676 Main: i = 12 FlowMeter reading = 23, Barcode=BC676 Main: i = 13 FlowMeter reading = 25, Barcode=BC86 Main: i = 14 FlowMeter reading = 27, Barcode=BC86 Main: i = 15 FlowMeter reading = 29, Barcode=BC29 Main: i = 16 FlowMeter reading = 31, Barcode=BC505 Main: i = 17 FlowMeter reading = 33, Barcode=BC198 Main: i = 18 FlowMeter reading = 35, Barcode=BC198 Main: i = 19 FlowMeter reading = 37, Barcode=BC198
原始答案
我建议您研究一下systemd并systemctl在每次系统启动时启动您的应用程序-这里的示例。
systemd
至于同时监视两件事,我建议您使用Python的 线程 模块。这是一个简单的示例,我创建了一个子类threading,通过不断读取该对象并将其当前值保存在主程序可以随时读取的变量中来管理您的流量计。您可以启动另一个类似的程序来管理您的条形码读取器,然后并行运行它们。我不想这样做,并且会使您的代码倍增。
threading
#!/usr/bin/env python3 from threading import Lock import threading import time # Dummy function to read SPI as I don't have anything attached def readSPI(): readSPI.static += 1 return readSPI.static readSPI.static=0 class FlowMeter(threading.Thread): def __init__(self): super(FlowMeter, self).__init__() # Create a mutex self.mutex = Lock() self.currentReading = 0 def run(self): # Continuously read flowmeter and safely update self.currentReading while True: value = readSPI() self.mutex.acquire() self.currentReading = value self.mutex.release() time.sleep(0.01) def read(self): # Main calls this to get latest reading, we just grab it from internal variable self.mutex.acquire() value = self.currentReading self.mutex.release() return value if __name__ == '__main__': # Instantiate and start flow meter manager thread fmThread = FlowMeter() fmThread.start() # Now you can do other things in main, but always get access to latest reading for i in range(100000): fmReading = fmThread.read() print(f"Main: i = {i} FlowMeter reading = {fmReading}") time.sleep(1)
您可以考虑使用logging来协调和统一调试和日志记录消息-参见此处。
logging
您可以看一下events让其他线程知道某些事情达到临界水平时需要做的事情- 例如此处。
events