我们从Python开源项目中,提取了以下8个代码示例,用于说明如何使用rospy.DEBUG。
def setUp(self): self.namespace = rospy.get_namespace() rospy.logdebug("Initializing test_publish_to_topics in namespace:" + self.namespace) self.variables = [("air_flush_on", 1), ("air_temperature", 23), ("light_intensity_blue", 1), ("light_intensity_red", 1), ("light_intensity_white", 1), ("nutrient_flora_duo_a", 5), ("nutrient_flora_duo_b", 5), ("water_potential_hydrogen", 6) ] # self.topic_ending = ["raw", "measured", "commanded", "desired"] rospy.init_node(NAME, log_level=rospy.DEBUG)
def __init__(self, hyperparams): config = copy.deepcopy(agent_superball) config.update(hyperparams) Agent.__init__(self, config) self._sensor_types = set(self.x_data_types + self.obs_data_types) self.x0 = None self._sensor_readings = {} self._prev_sensor_readings = {} self._prev2_sensor_readings = {} if self._hyperparams['constraint']: import superball_kinematic_tool as skt self._constraint = skt.KinematicMotorConstraints( self._hyperparams['constraint_file'], **self._hyperparams['constraint_params'] ) rospy.init_node('superball_agent', disable_signals=True, log_level=rospy.DEBUG) self._state_update = False self._state_update_cv = threading.Condition() if 'state_estimator' in self._hyperparams and self._hyperparams['state_estimator']: self._state_sub = rospy.Subscriber( '/superball/state', SUPERballStateArray, callback=self._handle_state_msg, queue_size=1 ) else: self._state_sub = rospy.Subscriber( '/superball/state_sim', SUPERballStateArray, callback=self._handle_state_msg, queue_size=1 ) self._ctrl_pub = rospy.Publisher('/superball/control', String, queue_size=1) self._timestep_pub = rospy.Publisher('/superball/timestep', UInt16, queue_size=1) self._init_motor_pubs() self._compute_rel_pos = True self.reset(0)
def __init__(self, use_acc_only): self._init_obs() config = copy.deepcopy(HYPERPARAMS) self._hyperparams = config self._use_acc_only = use_acc_only if self._hyperparams['constraint']: import superball_kinematic_tool as skt self._constraint = skt.KinematicMotorConstraints( self._hyperparams['constraint_file'], **self._hyperparams['constraint_params'] ) rospy.init_node('superball_agent', disable_signals=True, log_level=rospy.DEBUG) self._obs_update_lock = threading.Lock() self._motor_pos_sub = rospy.Subscriber( '/ranging_data_matlab', Float32MultiArray, callback=self._motor_pos_cb, queue_size=1 ) self._imu_sub = [] for i in range(12): self._imu_sub.append( rospy.Subscriber( SUPERBALL_IMU_TOPICS[i], Imu, callback=self._imu_cb, queue_size=1 ) ) self._ctrl_pub = rospy.Publisher('/superball/control', String, queue_size=1) self._timestep_pub = rospy.Publisher('/superball/timestep', UInt16, queue_size=1) self._init_motor_pubs() self._run_sim = False self._sim_thread = threading.Thread(target=self._continue_simulation) self._sim_thread.daemon = True self._sim_thread.start() self._action_rate = rospy.Rate(10)
def __init__(self): super(RosCommunicationProxy, self).__init__('RosCommunicationProxy') self.log_level_map = {LOG_LEVEL_DEBUG: rospy.DEBUG, LOG_LEVEL_INFO: rospy.INFO, LOG_LEVEL_WARN: rospy.WARN, LOG_LEVEL_ERROR: rospy.ERROR, LOG_LEVEL_FATAL: rospy.FATAL}
def debug(self, msg): if LOG_LEVEL_DEBUG >= self.log_level: print(self._get_formatted_msg(msg, 'DEBUG'), file=sys.stdout)
def __init__(self): self.lock = threading.Lock() rospy.init_node("roboclaw_node",log_level=rospy.DEBUG) rospy.on_shutdown(self.shutdown) rospy.loginfo("Connecting to roboclaw") self.wheels_speeds_pub = rospy.Publisher('/motors/commanded_speeds', Wheels_speeds, queue_size=1) self.motors_currents_pub = rospy.Publisher('/motors/read_currents', Motors_currents, queue_size=1) self.address = int(rospy.get_param("~address", "128")) if self.address > 0x87 or self.address < 0x80: rospy.logfatal("Address out of range") rospy.signal_shutdown("Address out of range") # TODO need someway to check if address is correct self.MAX_ABS_LINEAR_SPEED = float(rospy.get_param("~max_abs_linear_speed", "1.0")) self.MAX_ABS_ANGULAR_SPEED = float(rospy.get_param("~max_abs_angular_speed", "1.0")) self.TICKS_PER_METER = float(rospy.get_param("~ticks_per_meter", "4342.2")) self.BASE_WIDTH = float(rospy.get_param("~base_width", "0.315")) self.ACC_LIM = float(rospy.get_param("~acc_lim", "0.1")) self.encodm = EncoderOdom(self.TICKS_PER_METER, self.BASE_WIDTH) self.last_set_speed_time = rospy.get_rostime() rospy.Subscriber("cmd_vel", Twist, self.cmd_vel_callback, queue_size=1) rospy.sleep(1) rospy.logdebug("address %d", self.address) rospy.logdebug("max_abs_linear_speed %f", self.MAX_ABS_LINEAR_SPEED) rospy.logdebug("max_abs_angular_speed %f", self.MAX_ABS_ANGULAR_SPEED) rospy.logdebug("ticks_per_meter %f", self.TICKS_PER_METER) rospy.logdebug("base_width %f", self.BASE_WIDTH)
def main(): """SDK Gripper Button Control Example Connects cuff buttons to gripper open/close commands: 'Circle' Button - open gripper 'Dash' Button - close gripper Cuff 'Squeeze' - turn on Nav lights Run this example in the background or in another terminal to be able to easily control the grippers by hand while using the robot. Can be run in parallel with other code. """ rp = RobotParams() valid_limbs = rp.get_limb_names() if not valid_limbs: rp.log_message(("Cannot detect any limb parameters on this robot. " "Exiting."), "ERROR") return if len(valid_limbs) > 1: valid_limbs.append("all_limbs") arg_fmt = argparse.RawDescriptionHelpFormatter parser = argparse.ArgumentParser(formatter_class=arg_fmt, description=main.__doc__) parser.add_argument('-g', '--gripper', dest='gripper', default=valid_limbs[0], choices=[valid_limbs], help='gripper limb to control (default: both)') parser.add_argument('-n', '--no-lights', dest='lights', action='store_false', help='do not trigger lights on cuff grasp') parser.add_argument('-v', '--verbose', dest='verbosity', action='store_const', const=rospy.DEBUG, default=rospy.INFO, help='print debug statements') args = parser.parse_args(rospy.myargv()[1:]) rospy.init_node('sdk_gripper_cuff_control_{0}'.format(args.gripper), log_level=args.verbosity) arms = (args.gripper,) if args.gripper != 'all_limbs' else valid_limbs[:-1] grip_ctrls = [GripperConnect(arm, args.lights) for arm in arms] print("Press cuff buttons for gripper control. Spinning...") rospy.spin() print("Gripper Button Control Finished.") return 0
def __init__(self): rospy.init_node('Arduino', log_level=rospy.DEBUG) # Cleanup when termniating the node rospy.on_shutdown(self.shutdown) self.port = rospy.get_param("~port", "/dev/ttyACM0") self.baud = int(rospy.get_param("~baud", 57600)) self.timeout = rospy.get_param("~timeout", 0.5) self.base_frame = rospy.get_param("~base_frame", 'base_link') # Overall loop rate: should be faster than fastest sensor rate self.rate = int(rospy.get_param("~rate", 50)) r = rospy.Rate(self.rate) # Rate at which summary SensorState message is published. Individual sensors publish # at their own rates. self.sensorstate_rate = int(rospy.get_param("~sensorstate_rate", 10)) self.use_base_controller = rospy.get_param("~use_base_controller", False) # Set up the time for publishing the next SensorState message now = rospy.Time.now() self.t_delta_sensors = rospy.Duration(1.0 / self.sensorstate_rate) self.t_next_sensors = now + self.t_delta_sensors # Initialize a Twist message self.cmd_vel = Twist() # A cmd_vel publisher so we can stop the robot when shutting down self.cmd_vel_pub = rospy.Publisher('cmd_vel', Twist, queue_size=5) # Initialize the controlller self.controller = Arduino(self.port, self.baud, self.timeout) # Make the connection self.controller.connect() rospy.loginfo("Connected to Arduino on port " + self.port + " at " + str(self.baud) + " baud") # Reserve a thread lock mutex = thread.allocate_lock() # Initialize the base controller if used if self.use_base_controller: self.myBaseController = BaseController(self.controller, self.base_frame) # Start polling the sensors and base controller while not rospy.is_shutdown(): if self.use_base_controller: mutex.acquire() self.myBaseController.poll() mutex.release() r.sleep()