我们从Python开源项目中，提取了以下2个代码示例，用于说明如何使用gevent.subprocess模块。
def evaluate_output(task_name, input_path, output_path):
    """
    Given an input of a task, evaluate the correctness of the output

    Runs the checker executable registered for the task with the absolute
    paths of the input and output files as its two arguments.

    :param task_name: Name of the task
    :param input_path: Path to the user's input file
    :param output_path: Path to the user's output file
    :return: The stdout of the checker
    :raises: Any exception raised while resolving the command or running
        the checker is logged and then re-raised unchanged; a non-zero
        checker exit surfaces as ``CalledProcessError``.
    """
    error_template = ("Error while evaluating output %s "
                      "for task %s, with input %s: %s")
    try:
        # call the checker and store the output
        # (the command is built inside the try so that lookup failures are
        # logged as well)
        output = gevent.subprocess.check_output([
            ContestManager.tasks[task_name]["checker"],
            StorageManager.get_absolute_path(input_path),
            StorageManager.get_absolute_path(output_path)
        ])
    except gevent.subprocess.CalledProcessError as exc:
        # The checker ran but exited with a non-zero status.
        Logger.error(
            "TASK",
            error_template %
            (output_path, task_name, input_path, traceback.format_exc())
        )
        # check_output attaches the exit status and the captured stdout to
        # the exception; log them so a failing checker can be diagnosed.
        # stderr is not captured by this call, so it is not available here.
        Logger.error(
            "TASK",
            "Checker for task %s exited with status %s, stdout: %r" %
            (task_name, exc.returncode, exc.output)
        )
        raise
    except:
        # Deliberately catch-all: the error is only logged and immediately
        # re-raised, so nothing is swallowed.
        Logger.error(
            "TASK",
            error_template %
            (output_path, task_name, input_path, traceback.format_exc())
        )
        raise

    Logger.info("TASK", "Evaluated output %s for task %s, with input %s" %
                (output_path, task_name, input_path))
    return output
def test_drivenmatlabagent(volttron_instance1):
    """Integration test for the fake driven Matlab agent.

    Steps performed:

    1. Run ``config_builder.py`` in ``scripts/scalability-testing`` to write
       the master driver configuration, and require exit status 0.
    2. Install and start the ActuatorAgent, the FakeDrivenMatlabAgent and
       the MasterDriverAgent, building one agent from the wrapper after
       each installation.
    3. After a 5 second pause, read two points through the
       ``platform.actuator`` ``get_point`` RPC and assert each equals 1.

    :param volttron_instance1: platform fixture; this test uses its ``env``
        attribute and its ``install_agent`` / ``build_agent`` methods.

    NOTE(review): ``config_wh`` is not defined in this block; it is assumed
    to be a module-level name -- confirm.
    """
    print("** Setting up test_drivenagent module **")

    wrapper = volttron_instance1

    # Write config files for master driver
    process = Popen(['python', 'config_builder.py',
                     '--count=1',
                     '--publish-only-depth-all',
                     '--campus=fakecampus',
                     '--building=fakebuilding',
                     '--interval=5',
                     '--config-dir=../../applications/pnnl/FakeDrivenMatlabAgent/tests',
                     'fake',
                     '../../applications/pnnl/FakeDrivenMatlabAgent/tests/test_fake.csv',
                     'null'],
                    env=volttron_instance1.env,
                    cwd='scripts/scalability-testing',
                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    # Both streams are pipes, so a plain wait() can deadlock once the child
    # fills a pipe buffer that nobody reads. communicate() drains the pipes
    # while waiting for the process to exit.
    _, stderr_data = process.communicate()
    result = process.returncode
    print(result)
    assert result == 0, "config_builder.py failed: %r" % (stderr_data,)

    # Actuator Agent
    agent_uuid = volttron_instance1.install_agent(
        agent_dir="services/core/ActuatorAgent",
        config_file="services/core/ActuatorAgent/actuator-deploy.service",
        start=True)
    print("agent id: ", agent_uuid)
    assert agent_uuid
    # Not used afterwards; the build_agent() call is kept as in the original.
    actuator_agent = wrapper.build_agent()

    # Driven Matlab Agent
    agent_uuid = volttron_instance1.install_agent(
        agent_dir="applications/pnnl/FakeDrivenMatlabAgent",
        config_file=config_wh,
        start=True)
    print("agent id: ", agent_uuid)
    assert agent_uuid
    # This agent issues the RPC calls below.
    driven_agent = wrapper.build_agent()

    # Fake Master Driver
    agent_uuid = volttron_instance1.install_agent(
        agent_dir="services/core/MasterDriverAgent",
        config_file="applications/pnnl/FakeDrivenMatlabAgent/tests/master-driver.agent",
        start=True)
    print("agent id: ", agent_uuid)
    assert agent_uuid
    # Not used afterwards; the build_agent() call is kept as in the original.
    driver_agent = wrapper.build_agent()

    # Presumably gives the started agents time to come up before the points
    # are read -- TODO confirm the 5 second interval is sufficient.
    gevent.sleep(5)

    path = 'fakecampus/fakebuilding/fakedriver0/HPWH_Phy0_PowerState'
    value = driven_agent.vip.rpc.call('platform.actuator', 'get_point', path).get()
    print('The set point value is '+str(value))
    assert value == 1

    path = 'fakecampus/fakebuilding/fakedriver0/ERWH_Phy0_ValveState'
    value = driven_agent.vip.rpc.call('platform.actuator', 'get_point', path).get()
    print('The set point value is '+str(value))
    assert value == 1