我们从Python开源项目中,提取了以下21个代码示例,用于说明如何使用rospy.get_name()。
def __init__(self): rospy.init_node('multiplexer_node', anonymous=False) rospy.loginfo('\033[93m[%s]\033[0m wait for bringup social_events_memory...'%rospy.get_name()) rospy.wait_for_service('social_events_memory/read_data') self.rd_event_mem = rospy.ServiceProxy('social_events_memory/read_data', ReadData) rospy.Subscriber('forwarding_event', ForwardingEvent, self.handle_social_events) self.pub_rasing_event = rospy.Publisher('raising_events', RaisingEvents, queue_size=10) self.events_queue = Queue.Queue() self.recognized_words_queue = Queue.Queue() event_period = rospy.get_param('~event_period', 0.5) rospy.Timer(rospy.Duration(event_period), self.handle_trigger_events) rospy.loginfo('\033[93m[%s]\033[0m initialized...'%rospy.get_name()) rospy.spin()
def save_to_memory(self, perception_name, data={}): if data == {}: rospy.logwarn('Empty data inserted...') return for item in data.keys(): if not item in self.conf_data[perception_name]['data'].keys(): rospy.logwarn('[%s -- wrong data inserted [%s]...'%(perception_name, item)) return srv_req = WriteDataRequest() srv_req.perception_name = perception_name srv_req.data = json.dumps(data) srv_req.by = rospy.get_name() target_memory = self.conf_data[perception_name]['target_memory'] try: rospy.wait_for_service('/%s/write_data'%target_memory) self.dict_srv_wr[target_memory](srv_req) except rospy.ServiceException as e: pass
def __init__(self): rospy.init_node('gaze', anonymous=False) self.lock = threading.RLock() with self.lock: self.current_state = GazeState.IDLE self.last_state = self.current_state # Initialize Variables self.glance_timeout = 0 self.glance_timecount = 0 self.glance_played = False self.idle_timeout = 0 self.idle_timecount = 0 self.idle_played = False rospy.loginfo('\033[92m[%s]\033[0m waiting for bringup social_mind...'%rospy.get_name()) rospy.wait_for_service('environmental_memory/read_data') rospy.wait_for_service('social_events_memory/read_data') self.rd_memory = {} self.rd_memory['environmental_memory'] = rospy.ServiceProxy('environmental_memory/read_data', ReadData) self.rd_memory['social_events_memory'] = rospy.ServiceProxy('social_events_memory/read_data', ReadData) rospy.Subscriber('raising_events', RaisingEvents, self.handle_raising_events) rospy.Subscriber('gaze_focusing', String, self.handle_gaze_focusing) self.pub_gaze_cmd = rospy.Publisher('gaze_command', GazeCommand, queue_size=10) self.pub_viz_gaze_cmd = rospy.Publisher('visualization_gaze_cmd', PointStamped, queue_size=10) rospy.Timer(rospy.Duration(GAZE_CONTROLLER_PERIOD), self.handle_gaze_controller) rospy.loginfo('\033[92m[%s]\033[0m initialized...'%rospy.get_name()) rospy.spin()
def __init__(self): rospy.loginfo("Initializing AudioFilePlayer...") self.current_playing_process = None self.afp_as = SimpleActionServer(rospy.get_name(), AudioFilePlayAction, self.as_cb, auto_start=False) self.afp_sub = rospy.Subscriber('~play', String, self.topic_cb, queue_size=1) # By default this node plays files using the aplay command # Feel free to use any other command or flags # by using the params provided self.command = rospy.get_param('~/command', 'play') self.flags = rospy.get_param('~/flags', '') self.feedback_rate = rospy.get_param('~/feedback_rate', 10) self.afp_as.start() # Needs to be done after start self.afp_as.register_preempt_callback(self.as_preempt_cb) rospy.loginfo( "Done, playing files from action server or topic interface.")
def __init__(self, node_name): rospy.init_node(node_name, anonymous=False) try: conf_file = rospy.get_param('~config_file') except KeyError as e: rospy.logerr('You should set the parameter for perception config file...') quit() with open(os.path.abspath(conf_file)) as f: self.conf_data = yaml.load(f.read()) rospy.loginfo('loading perception config file... %d perception(s) exists...'%len(self.conf_data.keys())) for item in self.conf_data.keys(): rospy.loginfo('\033[92m - %s: %d event(s) and %d data(s).\033[0m'%(item, len(self.conf_data[item]['events']), len(self.conf_data[item]['data']))) self.dict_srv_wr = {} self.dict_srv_rd = {} for item in self.conf_data.keys(): if self.conf_data[item].has_key('target_memory'): memory_name = self.conf_data[item]['target_memory'] rospy.loginfo('\033[94m[%s]\033[0m wait for bringup %s...'%(rospy.get_name(), memory_name)) rospy.wait_for_service('/%s/write_data'%memory_name) self.dict_srv_wr[memory_name] = rospy.ServiceProxy('/%s/write_data'%memory_name, WriteData) self.dict_srv_rd[memory_name] = rospy.ServiceProxy('/%s/read_data'%memory_name, ReadData) self.register_data_to_memory(memory_name, item, self.conf_data[item]['data']) self.is_enable_perception = True rospy.Subscriber('%s/start'%rospy.get_name(), Empty, self.handle_start_perception) rospy.Subscriber('%s/stop'%rospy.get_name(), Empty, self.handle_stop_perception) self.pub_event = rospy.Publisher('forwarding_event', ForwardingEvent, queue_size=10) rospy.loginfo('\033[94m[%s]\033[0m initialize perception_base done...'%rospy.get_name())
def handle_start_perception(self, msg): self.is_enable_perception = True rospy.loginfo('%s is enabled...'%rospy.get_name())
def handle_stop_perception(self, msg): self.is_enable_perception = False rospy.loginfo('%s is disabled...'%rospy.get_name())
def preempt_callback(self): rospy.logwarn('\033[94m[%s]\033[0m rendering preempted.'%rospy.get_name()) for k in self.is_rendering.keys(): if self.is_rendering[k]: self.render_client[k].cancel_all_goals()
def __init__(self): rospy.init_node('memory_node', anonymous=False) while not rospy.is_shutdown(): try: port = rospy.get_param('/social_mind/port') host = rospy.get_param('/social_mind/host') break except KeyError as e: rospy.loginfo('[%s] wait for bringup social_mind node...'%rospy.get_name()) rospy.sleep(1.0) continue try: self.client = pymongo.MongoClient(host, port, serverSelectionTimeoutMS=2000) self.client.is_mongos # Wait for connection self.db = self.client[rospy.get_param('~name_data_group')] # self.db.set_profiling_level(0, 400) # Performance Setting: Set the database’s profiling level to 0, and slow_ms is 400ms. self.collections = {} self.data_template = {} except pymongo.errors.ServerSelectionTimeoutError, e: rospy.logerr('Error: %s'%e.message) quit() except KeyError, e: quit() self.srv_read_data = rospy.Service('%s/read_data'%rospy.get_name(), ReadData, self.handle_read_data) self.srv_write_data = rospy.Service('%s/write_data'%rospy.get_name(), WriteData, self.handle_write_data) self.srv_register_data = rospy.Service('%s/register_data'%rospy.get_name(), RegisterData, self.handle_register_data) self.srv_get_data_list = rospy.Service('%s/get_data_list'%rospy.get_name(), GetDataList, self.handle_get_data_list) self.wait_event_server = actionlib.SimpleActionServer('%s/wait_event'%rospy.get_name(), WaitEventAction, self.handle_wait_event, auto_start=False) self.wait_event_server.start() rospy.loginfo('[%s] initialzed and ready to use...'%rospy.get_name())
def handle_write_data(self, req): recv_data = json.loads(req.data) write_data = recv_data.copy() write_data['time'] = datetime.datetime.fromtimestamp(rospy.get_time()) write_data['by'] = req.by if rospy.get_name() == '/environmental_memory' and req.perception_name != 'objects_info': self.collections[req.perception_name].update_one( {'name':write_data['name']}, {'$set': write_data}, upsert=True) else: self.collections[req.perception_name].insert_one(write_data) return WriteDataResponse(True)
def render_active(self): rospy.loginfo('\033[91m[%s]\033[0m scene rendering started...'%rospy.get_name()) self.is_rendering = True self.pub_stop_speech_recognizer.publish() self.pub_start_robot_speech.publish()
def render_feedback(self, feedback): rospy.loginfo('\033[91m[%s]\033[0m scene rendering feedback...'%rospy.get_name())
def render_done(self, state, result): rospy.loginfo('\033[91m[%s]\033[0m scene rendering done...'%rospy.get_name()) self.is_rendering = False # rospy.sleep(0.4) # while self.is_speaking_started and not rospy.is_shutdown(): # pass rospy.sleep(0.5) self.pub_start_speech_recognizer.publish() self.pub_stop_robot_speech.publish()
def setupSerial(devName, baudrate): serialDev = serial.Serial(port=devName, baudrate=baudrate, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE) if serialDev is None: raise RuntimeError('[%s]: Serial port %s not found!\n' % (rospy.get_name(), devDame)) serialDev.write_timeout = .05 serialDev.timeout= 0.05 serialDev.flushOutput() serialDev.flushInput() return serialDev
def ROS_Subscribe(self): def handler(event): if event._connection_header["callerid"] == rospy.get_name(): # The message came from canros so ignore return uavcan_msg = canros.copy_ros_uavcan(self.UAVCAN_Type(), event) uavcan_node.broadcast(uavcan_msg, priority=uavcan.TRANSFER_PRIORITY_LOWEST) self.Subscriber(callback=handler)
def __init__(self, node, uavcan_type, callback): def cb_wrapper(ros_event): if ros_event._connection_header["callerid"] != rospy.get_name(): callback(uavcan_node._Event(node, uavcan_type, ros_event)) sub = Message(uavcan_type.full_name).Subscriber(callback=cb_wrapper) self.remove = sub.unregister # Mimics a service handler. Used in add_handler.
def __init__(self): local_uri = rosgraph.get_master_uri() foreign_uri = rospy.get_param('~foreign_master', None) if not foreign_uri: raise Exception('foreign_master URI not specified') local_pubs = rospy.get_param('~local_pubs', []) foreign_pubs = rospy.get_param('~foreign_pubs', []) self._local_services = rospy.get_param('~local_services', []) self._foreign_services = rospy.get_param('~foreign_services', []) self.foreign_master = rosgraph.Master(rospy.get_name(), master_uri=foreign_uri) r = rospy.Rate(1) while not self.is_master_up(self.foreign_master) and not rospy.is_shutdown(): rospy.logdebug('Waiting for foreign master to come up...') r.sleep() self._local_manager = None self._foreign_manager = None if not rospy.is_shutdown(): self._local_manager = _RemoteManager(local_uri, self._local_publisher_update) self._foreign_manager = _RemoteManager(foreign_uri, self._foreign_publisher_update) for t in local_pubs: self._local_manager.subscribe(t) for t in foreign_pubs: self._foreign_manager.subscribe(t)
def __init__(self, limb, reconfig_server, rate=100.0, mode='position_w_id', interpolation='minjerk'): self._dyn = reconfig_server self._ns = 'robot/limb/' + limb self._fjt_ns = self._ns + '/follow_joint_trajectory' self._server = actionlib.SimpleActionServer( self._fjt_ns, FollowJointTrajectoryAction, execute_cb=self._on_trajectory_action, auto_start=False) self._action_name = rospy.get_name() self._limb = intera_interface.Limb(limb, synchronous_pub=True) self._enable = intera_interface.RobotEnable() self._name = limb self._interpolation = interpolation self._cuff = intera_interface.Cuff(limb=limb) # Verify joint control mode self._mode = mode if (self._mode != 'position' and self._mode != 'position_w_id' and self._mode != 'velocity'): rospy.logerr("%s: Action Server Creation Failed - " "Provided Invalid Joint Control Mode '%s' (Options: " "'position_w_id', 'position', 'velocity')" % (self._action_name, self._mode,)) return self._server.start() self._alive = True # Action Feedback/Result self._fdbk = FollowJointTrajectoryFeedback() self._result = FollowJointTrajectoryResult() # Controller parameters from arguments, messages, and dynamic # reconfigure self._control_rate = rate # Hz self._control_joints = [] self._pid_gains = {'kp': dict(), 'ki': dict(), 'kd': dict()} self._goal_time = 0.0 self._stopped_velocity = 0.0 self._goal_error = dict() self._path_thresh = dict() # Create our PID controllers self._pid = dict() for joint in self._limb.joint_names(): self._pid[joint] = intera_control.PID() # Create our spline coefficients self._coeff = [None] * len(self._limb.joint_names()) # Set joint state publishing to specified control rate self._pub_rate = rospy.Publisher( '/robot/joint_state_publish_rate', UInt16, queue_size=10) self._pub_rate.publish(self._control_rate)
def __init__(self): rospy.init_node('motion_renderer', anonymous=False) self.server = actionlib.SimpleActionServer('render_scene', RenderSceneAction, self.execute_callback, False) self.server.register_preempt_callback(self.preempt_callback) self.server.start() rospy.loginfo('\033[94m[%s]\033[0m wait for render item...'%rospy.get_name()) try: rospy.wait_for_service('get_installed_gestures') except rospy.exceptions.ROSException as e: rospy.logerr(e) quit() self.gazefocus_pub = rospy.Publisher('gaze_focusing', String, queue_size=10) self.render_client['say'] = actionlib.SimpleActionClient('render_speech', RenderItemAction) self.render_client['say'].wait_for_server() self.render_client['sm'] = actionlib.SimpleActionClient('render_gesture', RenderItemAction) self.render_client['sm'].wait_for_server() self.get_motion = rospy.ServiceProxy('get_installed_gestures', GetInstalledGestures) json_data = self.get_motion() self.motion_tag = json.loads(json_data.gestures) rospy.loginfo('\033[94m[%s]\033[0m success to get motion_tag from gesture server' % rospy.get_name()) self.render_client['expression'] = actionlib.SimpleActionClient('render_facial_expression', RenderItemAction) self.render_client['expression'].wait_for_server() self.render_client['sound'] = actionlib.SimpleActionClient('render_sound', RenderItemAction) self.render_client['sound'].wait_for_server() # Status flags self.is_rendering['say'] = False self.is_rendering['sm'] = False self.is_rendering['expression'] = False self.is_rendering['sound'] = False self.return_to_last_expression = False # Register callback functions. self.cb_start['say'] = self.handle_render_say_start self.cb_done['say'] = self.handle_render_say_done self.cb_start['sm'] = self.handle_render_sm_start self.cb_done['sm'] = self.handle_render_sm_done self.cb_start['expression'] = self.handle_render_expression_start self.cb_done['expression'] = self.handle_render_expression_done self.cb_start['sound'] = self.handle_render_sound_start self.cb_done['sound'] = self.handle_render_sound_done rospy.loginfo("\033[94m[%s]\033[0m initialized."%rospy.get_name()) rospy.spin()
def proc_imu(quat1, acc, gyro): # New info: # https://github.com/thalmiclabs/myo-bluetooth/blob/master/myohw.h#L292-L295 # Scale values for unpacking IMU data # define MYOHW_ORIENTATION_SCALE 16384.0f # See myohw_imu_data_t::orientation # define MYOHW_ACCELEROMETER_SCALE 2048.0f # See myohw_imu_data_t::accelerometer # define MYOHW_GYROSCOPE_SCALE 16.0f # See myohw_imu_data_t::gyroscope if not any(quat1): # If it's all 0's means we got no data, don't do anything return h = Header() h.stamp = rospy.Time.now() # frame_id is node name without / h.frame_id = rospy.get_name()[1:] # We currently don't know the covariance of the sensors with each other cov = [0, 0, 0, 0, 0, 0, 0, 0, 0] quat = Quaternion(quat1[0] / 16384.0, quat1[1] / 16384.0, quat1[2] / 16384.0, quat1[3] / 16384.0) # Normalize the quaternion and accelerometer values quatNorm = sqrt(quat.x * quat.x + quat.y * quat.y + quat.z * quat.z + quat.w * quat.w) normQuat = Quaternion(quat.x / quatNorm, quat.y / quatNorm, quat.z / quatNorm, quat.w / quatNorm) normAcc = Vector3(acc[0] / 2048.0, acc[1] / 2048.0, acc[2] / 2048.0) normGyro = Vector3(gyro[0] / 16.0, gyro[1] / 16.0, gyro[2] / 16.0) imu = Imu(h, normQuat, cov, normGyro, cov, normAcc, cov) imuPub.publish(imu) roll, pitch, yaw = euler_from_quaternion([normQuat.x, normQuat.y, normQuat.z, normQuat.w]) oriPub.publish(Vector3(roll, pitch, yaw)) oriDegPub.publish(Vector3(degrees(roll), degrees(pitch), degrees(yaw))) posePub.publish( PoseStamped(h,Pose(Point(0.0,0.0,0.0),normQuat)) ) # Package the arm and x-axis direction into an Arm message