Python signal 模块,pause() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用signal.pause()

项目:esys-pbi    作者:fsxfreak    | 项目源码 | 文件源码
def begin(queue, event=None):
  """Start the module and block the calling thread until it should stop.

  Installs the SIGINT/SIGTERM handlers (plus a console control handler on
  Windows), loads and starts the module and reports 'CONNECTED' on `queue`.

  Args:
    queue: object with a `put` method; it receives 'CONNECTED' and is handed
      to `load` and `stop`.
    event: optional object with an `is_set()` method. It is polled once per
      second on platforms that lack `signal.pause()`. When it is None the
      function keeps waiting until a handler ends the process.
  """
  signal.signal(signal.SIGINT, sigint_handler)
  signal.signal(signal.SIGTERM, sigterm_handler)

  if sys.platform == 'win32':
    win.SetConsoleCtrlHandler(win_handler,1)

  load(queue)
  queue.put('CONNECTED')
  start()

  # Probe for signal.pause() instead of catching AttributeError: handlers run
  # inside pause(), so an AttributeError raised by one of them must not be
  # mistaken for "pause() is missing on this platform".
  if hasattr(signal, 'pause'):
    # linux signal handling
    while True:
      signal.pause()

  # signal.pause() not implemented on windows
  while event is None or not event.is_set():
    time.sleep(1)
  print('event was set in bci, stopping')
  stop(queue)
项目:esys-pbi    作者:fsxfreak    | 项目源码 | 文件源码
def begin(queue, event=None):
  """Start the module and block the calling thread until it should stop.

  Args:
    queue: passed through to `load`.
    event: stop request. An object with an `is_set()` method is polled once
      per second; any other value is interpreted by its truthiness, as
      before.
  """
  signal.signal(signal.SIGINT, sigint_handler)
  signal.signal(signal.SIGTERM, sigterm_handler)

  load(queue)
  start()

  # Probe for signal.pause() instead of catching AttributeError, so that an
  # AttributeError raised by a signal handler is not silently swallowed.
  if hasattr(signal, 'pause'):
    while True:
      signal.pause()

  def _stop_requested():
    # `not event` never consulted the event's state: an Event object is
    # always truthy, so the wait was skipped entirely.
    is_set = getattr(event, 'is_set', None)
    return is_set() if callable(is_set) else bool(event)

  # signal.pause() not implemented on windows
  while not _stop_requested():
    time.sleep(1)

  print('event was set, stopping')
  stop()
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_signals(self):
        """Verify that SIGUSR1 and SIGUSR2 were each recorded exactly once.

        Each signal's ``signal_blackboard`` entry must show one trip that is
        attributed to the thread running this test (``thread.get_ident()``).
        NOTE(review): the signalling thread, the handlers that fill
        ``signal_blackboard`` and ``signalled_all`` are set up outside this
        block.
        """
        signalled_all.acquire()
        self.spawnSignallingThread()
        signalled_all.acquire()
        # the signals that we asked the kernel to send
        # will come back, but we don't know when.
        # (it might even be after the thread exits
        # and might be out of order.)  If we haven't seen
        # the signals yet, send yet another signal and
        # wait for it to return.
        if signal_blackboard[signal.SIGUSR1]['tripped'] == 0 \
           or signal_blackboard[signal.SIGUSR2]['tripped'] == 0:
            signal.alarm(1)
            signal.pause()
            # Cancel the alarm in case pause() was woken by another signal.
            signal.alarm(0)

        self.assertEqual( signal_blackboard[signal.SIGUSR1]['tripped'], 1)
        self.assertEqual( signal_blackboard[signal.SIGUSR1]['tripped_by'],
                           thread.get_ident())
        self.assertEqual( signal_blackboard[signal.SIGUSR2]['tripped'], 1)
        self.assertEqual( signal_blackboard[signal.SIGUSR2]['tripped_by'],
                           thread.get_ident())
        signalled_all.release()
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_signals(self):
        """Verify that SIGUSR1 and SIGUSR2 were each recorded exactly once.

        Each signal's ``signal_blackboard`` entry must show one trip that is
        attributed to the thread running this test (``thread.get_ident()``).
        NOTE(review): the signalling thread, the handlers that fill
        ``signal_blackboard`` and ``signalled_all`` are set up outside this
        block.
        """
        signalled_all.acquire()
        self.spawnSignallingThread()
        signalled_all.acquire()
        # the signals that we asked the kernel to send
        # will come back, but we don't know when.
        # (it might even be after the thread exits
        # and might be out of order.)  If we haven't seen
        # the signals yet, send yet another signal and
        # wait for it to return.
        if signal_blackboard[signal.SIGUSR1]['tripped'] == 0 \
           or signal_blackboard[signal.SIGUSR2]['tripped'] == 0:
            signal.alarm(1)
            signal.pause()
            # Cancel the alarm in case pause() was woken by another signal.
            signal.alarm(0)

        self.assertEqual( signal_blackboard[signal.SIGUSR1]['tripped'], 1)
        self.assertEqual( signal_blackboard[signal.SIGUSR1]['tripped_by'],
                           thread.get_ident())
        self.assertEqual( signal_blackboard[signal.SIGUSR2]['tripped'], 1)
        self.assertEqual( signal_blackboard[signal.SIGUSR2]['tripped_by'],
                           thread.get_ident())
        signalled_all.release()
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test_signals(self):
        """Verify that SIGUSR1 and SIGUSR2 were each recorded exactly once.

        Each signal's ``signal_blackboard`` entry must show one trip that is
        attributed to the thread running this test (``thread.get_ident()``).
        NOTE(review): the signalling thread, the handlers that fill
        ``signal_blackboard`` and ``signalled_all`` are set up outside this
        block.
        """
        signalled_all.acquire()
        self.spawnSignallingThread()
        signalled_all.acquire()
        # the signals that we asked the kernel to send
        # will come back, but we don't know when.
        # (it might even be after the thread exits
        # and might be out of order.)  If we haven't seen
        # the signals yet, send yet another signal and
        # wait for it to return.
        if signal_blackboard[signal.SIGUSR1]['tripped'] == 0 \
           or signal_blackboard[signal.SIGUSR2]['tripped'] == 0:
            signal.alarm(1)
            signal.pause()
            # Cancel the alarm in case pause() was woken by another signal.
            signal.alarm(0)

        self.assertEqual( signal_blackboard[signal.SIGUSR1]['tripped'], 1)
        self.assertEqual( signal_blackboard[signal.SIGUSR1]['tripped_by'],
                           thread.get_ident())
        self.assertEqual( signal_blackboard[signal.SIGUSR2]['tripped'], 1)
        self.assertEqual( signal_blackboard[signal.SIGUSR2]['tripped_by'],
                           thread.get_ident())
        signalled_all.release()
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def test_signals(self):
        """Verify that SIGUSR1 and SIGUSR2 were each recorded exactly once.

        Each signal's ``signal_blackboard`` entry must show one trip that is
        attributed to the thread running this test (``thread.get_ident()``).
        NOTE(review): the signalling thread, the handlers that fill
        ``signal_blackboard`` and ``signalled_all`` are set up outside this
        block.
        """
        signalled_all.acquire()
        self.spawnSignallingThread()
        signalled_all.acquire()
        # the signals that we asked the kernel to send
        # will come back, but we don't know when.
        # (it might even be after the thread exits
        # and might be out of order.)  If we haven't seen
        # the signals yet, send yet another signal and
        # wait for it to return.
        if signal_blackboard[signal.SIGUSR1]['tripped'] == 0 \
           or signal_blackboard[signal.SIGUSR2]['tripped'] == 0:
            signal.alarm(1)
            signal.pause()
            # Cancel the alarm in case pause() was woken by another signal.
            signal.alarm(0)

        self.assertEqual( signal_blackboard[signal.SIGUSR1]['tripped'], 1)
        self.assertEqual( signal_blackboard[signal.SIGUSR1]['tripped_by'],
                           thread.get_ident())
        self.assertEqual( signal_blackboard[signal.SIGUSR2]['tripped'], 1)
        self.assertEqual( signal_blackboard[signal.SIGUSR2]['tripped_by'],
                           thread.get_ident())
        signalled_all.release()
项目:python_gray_8_9_11_12    作者:3xp10it    | 项目源码 | 文件源码
def do_everything(self):
    """Answer a request: update the pause flag, then send the selected page.

    The request path is searched for the substrings "pause" and "resume" to
    set or clear ``self.session.pause_flag``. A 200 response with an HTML
    content type is sent, followed by the crash view, the pcap view or the
    index page, chosen by the path.
    """
    requested = self.path

    # "resume" is checked last, so it wins when both words are present.
    if "pause" in requested:
        self.session.pause_flag = True
    if "resume" in requested:
        self.session.pause_flag = False

    self.send_response(200)
    self.send_header('Content-type', 'text/html')
    self.end_headers()

    if "view_crash" in requested:
        page = self.view_crash(requested)
    elif "view_pcap" in requested:
        page = self.view_pcap(requested)
    else:
        page = self.view_index()

    self.wfile.write(page)
项目:aiogrpc    作者:hubo1016    | 项目源码 | 文件源码
def main(listen_addrs=None):
    """Run the server until a QuitException interrupts the wait loop.

    Args:
        listen_addrs: list of "host:port" strings passed to ``create_server``.
            Defaults to ``['127.0.0.1:9901']``. A fresh list is built on each
            call so the default can never be shared or mutated across calls.
    """
    import signal
    import time

    if listen_addrs is None:
        listen_addrs = ['127.0.0.1:9901']

    s = create_server(listen_addrs)
    print("Server created on", listen_addrs)
    s.start()
    print("Server started")
    # Remember the previous handlers so they can be restored on the way out.
    old1 = signal.signal(signal.SIGINT, signal_handler)
    old2 = signal.signal(signal.SIGTERM, signal_handler)
    # signal.pause is not valid in windows
    try:
        while True:
            time.sleep(3600 * 24)
    except QuitException:
        print("Quit server")
        # Wait until the server reports that shutdown has finished.
        shutdown_event = s.stop(5)
        shutdown_event.wait()
    finally:
        signal.signal(signal.SIGINT, old1)
        signal.signal(signal.SIGTERM, old2)
项目:picraftzero    作者:WayneKeenan    | 项目源码 | 文件源码
def stop(self):
    """Stop all animation and drive the output to its stored value.

    Returns:
        Always True.
    """
    if self.fading:
        self.fader.stop()
        self.fading = False

    if self.pulsing:
        self.pulsing = False
        self.pulser.pause()

    if self.blinking:
        self.blinking = False

    # Fully on when the stored value is truthy, otherwise fully off.
    self.duty_cycle(100 if self._value else 0)

    return True
项目:esys-pbi    作者:fsxfreak    | 项目源码 | 文件源码
def main():
  """Install the signal handlers, start the module and wait for a signal."""
  for signum, handler in ((signal.SIGINT, sigint_handler),
                          (signal.SIGTERM, sigterm_handler)):
    signal.signal(signum, handler)

  load(queue=None)
  start()

  # Sleep until a signal is delivered.
  signal.pause()
项目:esys-pbi    作者:fsxfreak    | 项目源码 | 文件源码
def main():
  """Install the signal handlers, announce readiness, start and wait."""
  for signum, handler in ((signal.SIGINT, sigint_handler),
                          (signal.SIGTERM, sigterm_handler)):
    signal.signal(signum, handler)

  load(queue=None)

  # Required message for subprocess comms
  print('CONNECTED')

  start()

  # Sleep until a signal is delivered.
  signal.pause()
项目:esys-pbi    作者:fsxfreak    | 项目源码 | 文件源码
def begin(queue):
  """Load and start the module, report 'CONNECTED' and wait for a signal.

  Args:
    queue: object with a `put` method; it is also passed to `load`.
  """
  signal.signal(signal.SIGTERM, sigterm_handler)

  load(queue)
  queue.put('CONNECTED')
  start()

  # Sleep until a signal is delivered.
  signal.pause()
项目:esys-pbi    作者:fsxfreak    | 项目源码 | 文件源码
def begin(queue, event=None):
  """Start the graphing utility, wait for a stop request, then stop it once.

  Where `signal.pause()` exists the wait ends when a signal has been handled;
  elsewhere `event` is polled once per second.

  Args:
    queue: passed through to `load`.
    event: optional object with an `is_set()` method, used only on platforms
      without `signal.pause()`. When it is None the function keeps waiting
      until a handler ends the process.
  """
  signal.signal(signal.SIGINT, sigint_handler)
  signal.signal(signal.SIGTERM, sigterm_handler)

  if sys.platform == 'win32':
    win.SetConsoleCtrlHandler(win_handler, 1)

  load(queue)
  start()

  # The wait used to sit inside `while True`, so after the first stop request
  # the loop came straight back, printed and called stop() again, forever.
  # Wait exactly once and stop exactly once instead.
  if hasattr(signal, 'pause'):
    signal.pause()
  else:
    # signal.pause() not implemented on windows
    while event is None or not event.is_set():
      time.sleep(1)

  print('event was set in graphing utility, stopping')
  stop()
项目:esys-pbi    作者:fsxfreak    | 项目源码 | 文件源码
def _graph_lsl(self):
  """Pull samples from the 'bci' stream and redraw the plot while running.

  The stream is resolved by name, then samples are appended to
  `self.SecondTimes` / `self.ProcessedSig` and the line is redrawn after
  every 20th sample. Once `self.count` reaches 399 the oldest point is
  dropped for each new one.

  Raises:
    ValueError: when no stream named 'bci' could be resolved.
  """
  print('checking if stream has be initialized')
  self.streams = resolve_byprop('name', 'bci', timeout=2.5)
  try:
    self.inlet = StreamInlet(self.streams[0])
  except IndexError:
    raise ValueError('Make sure stream name=bci is opened first.')

  while self.running:
    # initial run
    self.sample, self.timestamp = self.inlet.pull_sample(timeout=5)
    if self.sample is None:
      # Nothing arrived before the timeout; indexing None below would raise.
      continue

    # time correction to sync to local_clock()
    try:
      if self.timestamp is not None:
        self.timestamp = self.timestamp + self.inlet.time_correction(timeout=5)
    except TimeoutError:
      pass

    self.SecondTimes.append(self.sample[1])     # add time stamps to array 'timeValSeconds'
    self.ProcessedSig.append(self.sample[0])    # add processed signal values to 'processedSig'

    self.count = self.count + 1

    # every 20 samples (ie ~ 0.10 s) is when plot updates
    if (self.count % 20 == 0) and (self.count != 0):
      self.lineHandle[0].set_ydata(self.ProcessedSig)
      self.lineHandle[0].set_xdata(self.SecondTimes)
      plt.xlim(self.SecondTimes[0], self.SecondTimes[-1])
      plt.ylim(0, 10)
      plt.pause(0.01)

    if self.count >= 399:
      self.ProcessedSig.pop(0)
      self.SecondTimes.pop(0)

  plt.pause(0.01)
  print('closing graphing utility')
  self.inlet.close_stream()
项目:esys-pbi    作者:fsxfreak    | 项目源码 | 文件源码
def main():
  """Install the handlers, start the module, wait for a signal, then stop."""
  signal.signal(signal.SIGINT, sigint_handler)
  signal.signal(signal.SIGTERM, sigterm_handler)
  load(queue=None)
  start()

  # Probe for signal.pause() rather than catching AttributeError: handlers
  # run inside pause(), and an AttributeError raised by one of them used to
  # be mistaken for a missing pause() and parked the process in the sleep
  # loop below.
  if hasattr(signal, 'pause'):
    signal.pause()
  else:
    # No signal.pause() on this platform: idle until a handler ends the wait.
    while True:
      time.sleep(1)

  stop()
项目:esys-pbi    作者:fsxfreak    | 项目源码 | 文件源码
def main():
  """Install the handlers, start the module, wait for a signal, then stop."""
  signal.signal(signal.SIGINT, sigint_handler)
  signal.signal(signal.SIGTERM, sigterm_handler)
  load(queue=None)
  start()

  # Probe for signal.pause() rather than catching AttributeError: handlers
  # run inside pause(), and an AttributeError raised by one of them used to
  # be mistaken for a missing pause() and parked the process in the sleep
  # loop below.
  if hasattr(signal, 'pause'):
    signal.pause()
  else:
    # No signal.pause() on this platform: idle until a handler ends the wait.
    while True:
      time.sleep(1)

  stop()
项目:pymotw3    作者:reingart    | 项目源码 | 文件源码
def wait_for_signal():
    """Announce the calling thread's name, then block until a signal arrives."""
    # current_thread() replaces the deprecated camelCase alias currentThread().
    print('Waiting for signal in',
          threading.current_thread().name)
    signal.pause()
    print('Done waiting')

# Start a thread that will not receive the signal
项目:argosd    作者:danielkoster    | 项目源码 | 文件源码
def run(self):
    """Bring up every component, then park the main thread on a signal."""
    announce = logging.info

    announce('ArgosD starting')
    self._create_database()

    announce('Starting taskscheduler')
    self.taskscheduler.run()

    announce('Starting taskrunner')
    self.taskrunner.run()

    announce('Starting API')
    self.api.run()

    # The bot is optional and only started when a token is configured.
    if settings.TELEGRAM_BOT_TOKEN:
        announce('Starting telegrambot')
        self.bot.run()

    # Stop everything when a SIGTERM is received
    signal.signal(signal.SIGTERM, self._handle_signal)

    announce('ArgosD running')

    # Wait for a signal. This keeps the main thread alive, which is needed
    # to properly process any signals.
    signal.pause()
项目:isar    作者:ilbers    | 项目源码 | 文件源码
def handle_noargs(self, **options):
    """Run a build now, then again each time a handled signal wakes pause()."""
    def _wake_up(signum, frame):
        # Nothing to do: the handler exists only so SIGUSR1 ends pause().
        return None

    self.runbuild()

    signal.signal(signal.SIGUSR1, _wake_up)

    while True:
        signal.pause()
        self.runbuild()
项目:orangepi-radio    作者:thk4711    | 项目源码 | 文件源码
def signal_term_handler(signal, frame):
    """Signal handler: print a farewell message and exit with status 0."""
    print('Exiting ...')
    raise SystemExit(0)

#-----------------------------------------------------------------#
#              main program                                       #
#-----------------------------------------------------------------#
#signal.pause()
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_signals(self):
        """Verify that SIGUSR1 and SIGUSR2 were each recorded exactly once.

        Each signal's ``signal_blackboard`` entry must show one trip that is
        attributed to the thread running this test (``thread.get_ident()``).
        NOTE(review): the signalling thread, the handlers that fill
        ``signal_blackboard`` and ``signalled_all`` are set up outside this
        block.
        """
        # Test signal handling semantics of threads.
        # We spawn a thread, have the thread send two signals, and
        # wait for it to finish. Check that we got both signals
        # and that they were run by the main thread.
        signalled_all.acquire()
        self.spawnSignallingThread()
        signalled_all.acquire()
        # the signals that we asked the kernel to send
        # will come back, but we don't know when.
        # (it might even be after the thread exits
        # and might be out of order.)  If we haven't seen
        # the signals yet, send yet another signal and
        # wait for it to return.
        if signal_blackboard[signal.SIGUSR1]['tripped'] == 0 \
           or signal_blackboard[signal.SIGUSR2]['tripped'] == 0:
            signal.alarm(1)
            signal.pause()
            # Cancel the alarm in case pause() was woken by another signal.
            signal.alarm(0)

        self.assertEqual( signal_blackboard[signal.SIGUSR1]['tripped'], 1)
        self.assertEqual( signal_blackboard[signal.SIGUSR1]['tripped_by'],
                           thread.get_ident())
        self.assertEqual( signal_blackboard[signal.SIGUSR2]['tripped'], 1)
        self.assertEqual( signal_blackboard[signal.SIGUSR2]['tripped_by'],
                           thread.get_ident())
        signalled_all.release()
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_itimer_real(self):
        """Arm ITIMER_REAL for one second, pause, and check ``hndl_called``.

        NOTE(review): ``hndl_called`` is presumably set by a signal handler
        installed outside this block; confirm against the test's setUp.
        """
        self.itimer = signal.ITIMER_REAL
        # No interval is given, so the timer fires once after 1.0 second.
        signal.setitimer(self.itimer, 1.0)
        if support.verbose:
            print("\ncall pause()...")
        # Sleep until a signal is delivered.
        signal.pause()

        self.assertEqual(self.hndl_called, True)

    # Issue 3864, unknown if this affects earlier versions of freebsd also
项目:RocketTM    作者:kianxineki    | 项目源码 | 文件源码
def main():
    """Start one Worker per configured slot, wait for a signal, then join.

    SIGINT is ignored while the workers are being created and the previous
    handler is put back before the main process starts waiting.
    """
    # start basicevents
    previous_sigint = signal.getsignal(signal.SIGINT)
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    run(finish_tasks.is_set)

    workers = []
    for queue in settings.queues:
        for _ in range(queue['concurrency']):
            worker = Worker(event_kill, ip=settings.RABBITMQ_IP, **queue)
            logging.info("start process worker: %s queue: %s" % (worker,
                                                                 queue))
            workers.append(worker)
            worker.start()

    signal.signal(signal.SIGINT, previous_sigint)

    try:
        signal.pause()
    except:
        # Deliberately broad: whatever interrupts the wait triggers shutdown.
        print("init stop")
        event_kill.set()

    for worker in workers:
        worker.join()
    print("finish tasks")
    finish_tasks.set()
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_itimer_real(self):
        """Arm ITIMER_REAL for one second, pause, and check ``hndl_called``.

        NOTE(review): ``hndl_called`` is presumably set by a signal handler
        installed outside this block; confirm against the test's setUp.
        """
        self.itimer = signal.ITIMER_REAL
        # No interval is given, so the timer fires once after 1.0 second.
        signal.setitimer(self.itimer, 1.0)
        if test_support.verbose:
            print("\ncall pause()...")
        # Sleep until a signal is delivered.
        signal.pause()

        self.assertEqual(self.hndl_called, True)

    # Issue 3864. Unknown if this affects earlier versions of freebsd also.
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_itimer_real(self):
        """Arm ITIMER_REAL for one second, pause, and check ``hndl_called``.

        NOTE(review): ``hndl_called`` is presumably set by a signal handler
        installed outside this block; confirm against the test's setUp.
        """
        self.itimer = signal.ITIMER_REAL
        # No interval is given, so the timer fires once after 1.0 second.
        signal.setitimer(self.itimer, 1.0)
        if test_support.verbose:
            print("\ncall pause()...")
        # Sleep until a signal is delivered.
        signal.pause()

        self.assertEqual(self.hndl_called, True)

    # Issue 3864. Unknown if this affects earlier versions of freebsd also.
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_signals(self):
        """Verify that SIGUSR1 and SIGUSR2 were each recorded exactly once.

        Each signal's ``signal_blackboard`` entry must show one trip that is
        attributed to the thread running this test (``thread.get_ident()``).
        NOTE(review): the signalling thread, the handlers that fill
        ``signal_blackboard`` and ``signalled_all`` are set up outside this
        block.
        """
        # Test signal handling semantics of threads.
        # We spawn a thread, have the thread send two signals, and
        # wait for it to finish. Check that we got both signals
        # and that they were run by the main thread.
        signalled_all.acquire()
        self.spawnSignallingThread()
        signalled_all.acquire()
        # the signals that we asked the kernel to send
        # will come back, but we don't know when.
        # (it might even be after the thread exits
        # and might be out of order.)  If we haven't seen
        # the signals yet, send yet another signal and
        # wait for it to return.
        if signal_blackboard[signal.SIGUSR1]['tripped'] == 0 \
           or signal_blackboard[signal.SIGUSR2]['tripped'] == 0:
            signal.alarm(1)
            signal.pause()
            # Cancel the alarm in case pause() was woken by another signal.
            signal.alarm(0)

        self.assertEqual( signal_blackboard[signal.SIGUSR1]['tripped'], 1)
        self.assertEqual( signal_blackboard[signal.SIGUSR1]['tripped_by'],
                           thread.get_ident())
        self.assertEqual( signal_blackboard[signal.SIGUSR2]['tripped'], 1)
        self.assertEqual( signal_blackboard[signal.SIGUSR2]['tripped_by'],
                           thread.get_ident())
        signalled_all.release()
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_itimer_real(self):
        """Arm ITIMER_REAL for one second, pause, and check ``hndl_called``.

        NOTE(review): ``hndl_called`` is presumably set by a signal handler
        installed outside this block; confirm against the test's setUp.
        """
        self.itimer = signal.ITIMER_REAL
        # No interval is given, so the timer fires once after 1.0 second.
        signal.setitimer(self.itimer, 1.0)
        # Sleep until a signal is delivered.
        signal.pause()
        self.assertEqual(self.hndl_called, True)

    # Issue 3864, unknown if this affects earlier versions of freebsd also
项目:karton    作者:karton    | 项目源码 | 文件源码
def main():
    """Record this process's PID in a file and idle, waking on each signal.

    SIGINT is ignored and SIGTERM is routed to ``terminate``.
    """
    own_pid = os.getpid()
    print('Started with PID %d' % own_pid)

    signal.signal(signal.SIGINT, signal.SIG_IGN)
    signal.signal(signal.SIGTERM, terminate)

    with open('/tmp/karton-session-runner.pid', 'w') as pid_file:
        pid_file.write(str(own_pid))

    # Every handled signal ends pause(); report it and go back to sleep.
    while True:
        signal.pause()
        print('Got a signal, continuing.')
项目:CodeLabs    作者:TheIoTLearningInitiative    | 项目源码 | 文件源码
def main():

    try:

        mq2 = threading.Thread(name='fMq2', target=fMq2)
        mq2.setDaemon(True)
        mq5 = threading.Thread(name='fMq5', target=fMq5)
        mq5.setDaemon(True)
        mq2.start()
        mq5.start()
        signal.pause()

    except(KeyboardInterrupt, SystemExit):

        print 'Exiting program'
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test_itimer_real(self):
        """Arm ITIMER_REAL for one second, pause, and check ``hndl_called``.

        NOTE(review): ``hndl_called`` is presumably set by a signal handler
        installed outside this block; confirm against the test's setUp.
        """
        self.itimer = signal.ITIMER_REAL
        # No interval is given, so the timer fires once after 1.0 second.
        signal.setitimer(self.itimer, 1.0)
        if test_support.verbose:
            print("\ncall pause()...")
        # Sleep until a signal is delivered.
        signal.pause()

        self.assertEqual(self.hndl_called, True)

    # Issue 3864. Unknown if this affects earlier versions of freebsd also.
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_signals(self):
        """Verify that SIGUSR1 and SIGUSR2 were each recorded exactly once.

        Each signal's ``signal_blackboard`` entry must show one trip that is
        attributed to the thread running this test (``thread.get_ident()``).
        NOTE(review): the signalling thread, the handlers that fill
        ``signal_blackboard`` and ``signalled_all`` are set up outside this
        block.
        """
        # Test signal handling semantics of threads.
        # We spawn a thread, have the thread send two signals, and
        # wait for it to finish. Check that we got both signals
        # and that they were run by the main thread.
        signalled_all.acquire()
        self.spawnSignallingThread()
        signalled_all.acquire()
        # the signals that we asked the kernel to send
        # will come back, but we don't know when.
        # (it might even be after the thread exits
        # and might be out of order.)  If we haven't seen
        # the signals yet, send yet another signal and
        # wait for it to return.
        if signal_blackboard[signal.SIGUSR1]['tripped'] == 0 \
           or signal_blackboard[signal.SIGUSR2]['tripped'] == 0:
            signal.alarm(1)
            signal.pause()
            # Cancel the alarm in case pause() was woken by another signal.
            signal.alarm(0)

        self.assertEqual( signal_blackboard[signal.SIGUSR1]['tripped'], 1)
        self.assertEqual( signal_blackboard[signal.SIGUSR1]['tripped_by'],
                           thread.get_ident())
        self.assertEqual( signal_blackboard[signal.SIGUSR2]['tripped'], 1)
        self.assertEqual( signal_blackboard[signal.SIGUSR2]['tripped_by'],
                           thread.get_ident())
        signalled_all.release()
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_itimer_real(self):
        """Arm ITIMER_REAL for one second, pause, and check ``hndl_called``.

        NOTE(review): ``hndl_called`` is presumably set by a signal handler
        installed outside this block; confirm against the test's setUp.
        """
        self.itimer = signal.ITIMER_REAL
        # No interval is given, so the timer fires once after 1.0 second.
        signal.setitimer(self.itimer, 1.0)
        # Sleep until a signal is delivered.
        signal.pause()
        self.assertEqual(self.hndl_called, True)

    # Issue 3864, unknown if this affects earlier versions of freebsd also
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def test_itimer_real(self):
        """Arm ITIMER_REAL for one second, pause, and check ``hndl_called``.

        NOTE(review): ``hndl_called`` is presumably set by a signal handler
        installed outside this block; confirm against the test's setUp.
        """
        self.itimer = signal.ITIMER_REAL
        # No interval is given, so the timer fires once after 1.0 second.
        signal.setitimer(self.itimer, 1.0)
        if test_support.verbose:
            print("\ncall pause()...")
        # Sleep until a signal is delivered.
        signal.pause()

        self.assertEqual(self.hndl_called, True)

    # Issue 3864. Unknown if this affects earlier versions of freebsd also.
项目:gitsome    作者:donnemartin    | 项目源码 | 文件源码
def _subproc_pre():
    """Move the process into its own group and make SIGTSTP block in pause()."""
    def _suspend(signum, frame):
        # Block inside the handler until another signal is delivered.
        return signal.pause()

    os.setpgrp()
    signal.signal(signal.SIGTSTP, _suspend)
项目:sparkly    作者:Tubular    | 项目源码 | 文件源码
def __init__(self, additional_options=None):
        """Create (or reuse) a SparkContext and initialise the session on it.

        Sets ``PYSPARK_PYTHON`` and ``PYSPARK_SUBMIT_ARGS`` in the process
        environment before the context is created.

        Args:
            additional_options: extra options forwarded to
                ``self._setup_options`` when the Spark configuration is built.
        """
        os.environ['PYSPARK_PYTHON'] = sys.executable
        submit_args = [
            self._setup_repositories(),
            self._setup_packages(),
            self._setup_jars(),
            'pyspark-shell',
        ]
        # Falsy entries are dropped before the arguments are joined.
        os.environ['PYSPARK_SUBMIT_ARGS'] = ' '.join(filter(None, submit_args))

        def _create_spark_context():
            spark_conf = SparkConf()
            spark_conf.set('spark.sql.catalogImplementation', 'hive')
            spark_conf.setAll(self._setup_options(additional_options))
            return SparkContext(conf=spark_conf)

        # If we are in instant testing mode
        if InstantTesting.is_activated():
            spark_context = InstantTesting.get_context()

            # It's the first run, so we have to create the context and daemonize the process.
            if spark_context is None:
                spark_context = _create_spark_context()
                if os.fork() == 0:  # Detached process.
                    # The child sleeps here until a signal is delivered.
                    signal.pause()
                else:
                    InstantTesting.set_context(spark_context)
        else:
            spark_context = _create_spark_context()

        # Init HiveContext
        super(SparklySession, self).__init__(spark_context)
        self._setup_udfs()

        self.read_ext = SparklyReader(self)
        self.catalog_ext = SparklyCatalog(self)

        attach_writer_to_dataframe()
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_signals(self):
        """Verify that SIGUSR1 and SIGUSR2 were each recorded exactly once.

        Each signal's ``signal_blackboard`` entry must show one trip that is
        attributed to the thread running this test (``thread.get_ident()``).
        NOTE(review): the signalling thread, the handlers that fill
        ``signal_blackboard`` and ``signalled_all`` are set up outside this
        block.
        """
        # Test signal handling semantics of threads.
        # We spawn a thread, have the thread send two signals, and
        # wait for it to finish. Check that we got both signals
        # and that they were run by the main thread.
        signalled_all.acquire()
        self.spawnSignallingThread()
        signalled_all.acquire()
        # the signals that we asked the kernel to send
        # will come back, but we don't know when.
        # (it might even be after the thread exits
        # and might be out of order.)  If we haven't seen
        # the signals yet, send yet another signal and
        # wait for it to return.
        if signal_blackboard[signal.SIGUSR1]['tripped'] == 0 \
           or signal_blackboard[signal.SIGUSR2]['tripped'] == 0:
            signal.alarm(1)
            signal.pause()
            # Cancel the alarm in case pause() was woken by another signal.
            signal.alarm(0)

        self.assertEqual( signal_blackboard[signal.SIGUSR1]['tripped'], 1)
        self.assertEqual( signal_blackboard[signal.SIGUSR1]['tripped_by'],
                           thread.get_ident())
        self.assertEqual( signal_blackboard[signal.SIGUSR2]['tripped'], 1)
        self.assertEqual( signal_blackboard[signal.SIGUSR2]['tripped_by'],
                           thread.get_ident())
        signalled_all.release()
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_itimer_real(self):
        """Arm ITIMER_REAL for one second, pause, and check ``hndl_called``.

        NOTE(review): ``hndl_called`` is presumably set by a signal handler
        installed outside this block; confirm against the test's setUp.
        """
        self.itimer = signal.ITIMER_REAL
        # No interval is given, so the timer fires once after 1.0 second.
        signal.setitimer(self.itimer, 1.0)
        # Sleep until a signal is delivered.
        signal.pause()
        self.assertEqual(self.hndl_called, True)

    # Issue 3864, unknown if this affects earlier versions of freebsd also
项目:Sample-Code    作者:meigrafd    | 项目源码 | 文件源码
def Interrupt_event(pin):
    """Handle a GPIO edge event for the player buttons.

    For the ``Play`` pin the time between its rising and falling edge is
    measured and compared with ``SpecialTime``. For any other pin the levels
    of the remaining button inputs are read in turn, and the first one that
    reads HIGH selects the action.

    Args:
        pin: the GPIO pin the event was reported for.
    """
    # only for debug:
    if GPIO.input(pin) == GPIO.HIGH:
        print "rising edge on %s" % pin
    elif GPIO.input(pin) == GPIO.LOW:
        print "falling edge on %s" % pin

    if pin == Play:
        if GPIO.input(Play) == GPIO.HIGH:
            # Button pressed: remember when the press started.
            globalVar['timeTrigger'] = time()
        elif GPIO.input(Play) == GPIO.LOW:
            # Button released: replace the start time by the press duration.
            globalVar['timeTrigger'] = time() - globalVar['timeTrigger']
            if globalVar['timeTrigger'] >= SpecialTime:
                #special function: play another playlist
                print "special play"
            else: #normal play
                print "normal play"
                client.volume = 10
                client.play()

    elif GPIO.input(Pause) == GPIO.HIGH:
        client.pause()

    elif GPIO.input(Lauter) == GPIO.HIGH:
        client.volume += 2

    elif GPIO.input(Leiser) == GPIO.HIGH:
        client.volume -=2

    elif GPIO.input(Next) == GPIO.HIGH:
        client.next()

    elif GPIO.input(Prev) == GPIO.HIGH:
        client.previous()

    else:
        print "ERROR! Unknown GPIO pin triggered: %s" % pin

#------------------------------------------------------------------------
项目:Sample-Code    作者:meigrafd    | 项目源码 | 文件源码
def main():
    """Watch the button on pin 21 until the program is interrupted."""
    try:
        with Button(21, hold_time=0) as button:
            button.when_held = lambda: count_time(button)
            # Keep the script alive; the callback does the work.
            signal.pause()
    except KeyboardInterrupt:
        # Ctrl+C is the expected way to leave; exit quietly.
        pass
项目:Sample-Code    作者:meigrafd    | 项目源码 | 文件源码
def main():
    """Register the rising-edge callback on ``PIR`` and wait for signals."""
    try:
        GPIO.add_event_detect(PIR, GPIO.RISING, callback=interrupt_event, bouncetime=100)
        #keep script running
        signal.pause()
    except (KeyboardInterrupt, SystemExit):
        print "Quit"
项目:Sample-Code    作者:meigrafd    | 项目源码 | 文件源码
def Interrupt_event(pin):
    """Handle a GPIO event by reading the button inputs and driving the player.

    The button levels are read in turn and the first one that reads HIGH
    selects the action. Volume changes move in steps of 5 and are clamped to
    the range 0..100.

    Args:
        pin: the GPIO pin the event was reported for; only used in the error
            message when no known button reads HIGH.
    """
    _ping(client)
    PlayStat = client.status()
    if GPIO.input(Play) == GPIO.HIGH:
        # Play toggles: start "MyRadio" when idle, stop and clear when playing.
        if PlayStat['state'] == "stop" or PlayStat['state'] == "pause":
            client.load("MyRadio")
            client.play()
        elif PlayStat['state'] == "play":
            client.stop()
            client.clear()
    elif GPIO.input(Lauter) == GPIO.HIGH:
        vol = int(PlayStat['volume'])
        if vol >= 95:
            client.setvol(100)
        else:
            SetVol = vol + 5
            client.setvol(SetVol) 
    elif GPIO.input(Leiser) == GPIO.HIGH:
        vol = int(PlayStat['volume'])
        if vol <= 5:
            client.setvol(0)
        else:
            SetVol = vol - 5
            client.setvol(SetVol) 
    elif GPIO.input(Next) == GPIO.HIGH:
        client.next()
        client.play()
    elif GPIO.input(Prev) == GPIO.HIGH:
        client.previous()
        client.play()
    else:
        print "ERROR! Unknown GPIO pin triggered: %s" % pin

#------------------------------------------------------------------------
项目:Sample-Code    作者:meigrafd    | 项目源码 | 文件源码
def main():
    """Register the rising-edge callback on ``PIR_PIN`` and wait for signals."""
    try:
        GPIO.add_event_detect(PIR_PIN, GPIO.RISING, callback=interrupt_event, bouncetime=100)
        #keep script running
        signal.pause()
    except KeyboardInterrupt: # does not work if it runs in background.
        print "Quit"
项目:Sample-Code    作者:meigrafd    | 项目源码 | 文件源码
def main():
    """Register the callback for both edges on ``PIR_PIN`` and wait for signals."""
    try:
        GPIO.add_event_detect(PIR_PIN, GPIO.BOTH, callback=interrupt_event, bouncetime=100)
        #keep script running
        signal.pause()
    except KeyboardInterrupt: # does not work if it runs in background.
        print "\nQuit"
项目:python_gray_8_9_11_12    作者:3xp10it    | 项目源码 | 文件源码
def pause(self):
    '''
    Block for as long as the pause flag is raised, checking it once per second.
    '''
    while self.pause_flag:
        time.sleep(1)


    ####################################################################################################################
项目:restfulpy    作者:Carrene    | 项目源码 | 文件源码
def launch(self):
    """Start the daemon worker threads and wait for a termination signal.

    The status set defaults to ``{'new'}`` and a gap given on the command
    line overrides the configured one before the threads are created.
    """
    from restfulpy.taskqueue import worker

    for signum in (signal.SIGINT, signal.SIGTERM):
        signal.signal(signum, self.kill_signal_handler)

    args = self.args
    if not args.status:
        args.status = {'new'}

    if args.gap is not None:
        settings.worker.merge({'gap': args.gap})

    print('The following task types would be processed with gap of %ds:' % settings.worker.gap)
    print('Tracking task status(es): %s' % ','.join(args.status))

    number_of_threads = args.number_of_threads or settings.worker.number_of_threads
    for index in range(number_of_threads):
        worker_thread = threading.Thread(
            target=worker,
            name='restfulpy-worker-thread-%s' % index,
            daemon=True,
            kwargs=dict(statuses=args.status, filters=args.filter),
        )
        worker_thread.start()

    print('Worker started with %d threads' % number_of_threads)
    print('Press Ctrl+C to terminate worker')
    # The workers are daemon threads; keep the main thread alive for them.
    signal.pause()

    # noinspection PyUnusedLocal
项目:kobo    作者:release-engineering    | 项目源码 | 文件源码
def wait(self, subtasks=None):
        """Wait until subtasks finish.

        subtasks = None - wait for all subtasks
        subtasks = [task_id list] - wait for selected subtasks

        Returns the list of finished subtasks reported by the last status
        check. Calls ``self.fail()`` when any finished subtask is not in the
        CLOSED state.

        Raises RuntimeError when called on a foreground task.
        """

        if self.foreground:
            # wait would call signal.pause() in the *main* worker thread and lock program forever
            raise RuntimeError("Foreground tasks can't wait on subtasks.")

        if subtasks is not None:
            subtasks = force_list(subtasks)

        self.hub.worker.wait(self.task_id, subtasks)

        finished = []
        while True:
            (finished, unfinished) = self.hub.worker.check_wait(self.task_id)

            if len(unfinished) == 0:
                # all done
                break

            # sleep
            signal.pause()
            # wake up on signal to check the status

        # remove finished subtasks from the list, check results
        fail = False
        for i in finished:
            state = self.hub.worker.get_task(i)
            if state['state'] != TASK_STATES['CLOSED']:
                fail = True
            self._subtask_list.remove(i)

        if fail:
            print("Failing because of at least one subtask hasn't closed properly.")
            self.fail()

        return finished
项目:picraftzero    作者:WayneKeenan    | 项目源码 | 文件源码
def pause(self):
        """Mark this object as paused by setting ``_paused`` to True."""
        self._paused = True
项目:picraftzero    作者:WayneKeenan    | 项目源码 | 文件源码
def pause():
    """Block the calling thread until a signal is delivered."""
    signal.pause()
项目:esys-pbi    作者:fsxfreak    | 项目源码 | 文件源码
def _graph_lsl(self):
  """Pull samples from the inlet and redraw the plot while running.

  Channel 3 of every sample is scaled by 1/1000 and plotted as a magnitude.
  The running minimum and maximum of that value set the y-axis range, and the
  line is redrawn after every 20th sample. Once `self.sampleCount` reaches
  511 the oldest point is dropped for each new one.
  """
  while self.running:
    # initial run
    self.sample, self.timestamp = self.inlet.pull_sample(timeout=5)
    if self.sample is None:
      # Nothing arrived before the timeout; indexing None below would raise.
      continue

    # time correction to sync to local_clock()
    try:
      if self.timestamp is not None:
        self.timestamp = self.timestamp + self.inlet.time_correction(timeout=5)
    except TimeoutError:
      pass

    self.SecondTimes.append(self.timestamp)                   # add time stamps to array 'timeValSeconds'
    self.ProcessedSig.append(abs(self.sample[3])/1000)        # add processed signal values to 'processedSig'

    # Track the extremes seen so far; they drive the y-axis limits below.
    if(abs(self.sample[3]/1000) > self.maximum):
      self.maximum = abs(self.sample[3]/1000)
    if(abs(self.sample[3]/1000) < self.minimum):
      self.minimum = abs(self.sample[3]/1000)

    self.sampleCount = self.sampleCount + 1
    self.count = self.count + 1

    # every 20 samples is when plot updates. Change the sample number
    # (ie 20) to modify frequency at which plot updates
    if((self.count % 20 == 0) and (self.count != 0)):
      self.count = -1
      self.lineHandle[0].set_ydata(self.ProcessedSig)
      self.lineHandle[0].set_xdata(self.SecondTimes)
      plt.xlim(self.SecondTimes[0], self.SecondTimes[-1])
      plt.ylim(self.minimum - 0.75, self.maximum + 0.75)
      plt.pause(0.01)

    # shows up to 2 seconds of data (512 samples = 2s of data given a
    # 256 Hz sampling freq by the BCI)
    if(self.sampleCount >= 511):
      self.ProcessedSig.pop(0)
      self.SecondTimes.pop(0)

  plt.pause(0.01)
  print('closing graphing utility')
  self.inlet.close_stream()
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def run_test(self):
        """Send this process HUP, USR1, USR2 and ALRM and check each reaction.

        HUP must run handler A only, USR1 must surface as ``HandlerBCalled``,
        USR2 is ignored, and the alarm must interrupt ``signal.pause()`` with
        ``KeyboardInterrupt``.
        """
        # Install handlers. This function runs in a sub-process, so we
        # don't worry about re-setting the default handlers.
        signal.signal(signal.SIGHUP, self.handlerA)
        signal.signal(signal.SIGUSR1, self.handlerB)
        signal.signal(signal.SIGUSR2, signal.SIG_IGN)
        signal.signal(signal.SIGALRM, signal.default_int_handler)

        # Variables the signals will modify:
        self.a_called = False
        self.b_called = False

        # Let the sub-processes know who to send signals to.
        pid = os.getpid()

        child = ignoring_eintr(subprocess.Popen, ['kill', '-HUP', str(pid)])
        if child:
            self.wait(child)
            if not self.a_called:
                time.sleep(1)  # Give the signal time to be delivered.
        self.assertTrue(self.a_called)
        self.assertFalse(self.b_called)
        self.a_called = False

        # Make sure the signal isn't delivered while the previous
        # Popen object is being destroyed, because __del__ swallows
        # exceptions.
        del child
        try:
            child = subprocess.Popen(['kill', '-USR1', str(pid)])
            # This wait should be interrupted by the signal's exception.
            self.wait(child)
            time.sleep(1)  # Give the signal time to be delivered.
            self.fail('HandlerBCalled exception not raised')
        except HandlerBCalled:
            self.assertTrue(self.b_called)
            self.assertFalse(self.a_called)

        child = ignoring_eintr(subprocess.Popen, ['kill', '-USR2', str(pid)])
        if child:
            self.wait(child)  # Nothing should happen.

        try:
            signal.alarm(1)
            # The race condition in pause doesn't matter in this case,
            # since alarm is going to raise a KeyboardInterrupt, which
            # will skip the call.
            signal.pause()
            # But if another signal arrives before the alarm, pause
            # may return early.
            time.sleep(1)
        except KeyboardInterrupt:
            pass
        except:
            self.fail("Some other exception woke us from pause: %s" %
                      traceback.format_exc())
        else:
            self.fail("pause returned of its own accord, and the signal"
                      " didn't arrive after another second.")

    # Issue 3864, unknown if this affects earlier versions of freebsd also