我们从Python开源项目中,提取了以下19个代码示例,用于说明如何使用geometry_msgs.msg.PoseWithCovarianceStamped()。
def subscribePose(): rospy.Subscriber('/initialpose', PoseWithCovarianceStamped, getPoseData) # rospy.Subscriber('/move_base_simple/goal', PoseStamped, getPoseData) global background
def __init__(self): self.map = None self.start = None self.goal = None self.moves = [Move(0.1, 0), # forward Move(-0.1, 0), # back Move(0, 1.5708), # turn left 90 Move(0, -1.5708)] # turn right 90 self.robot = Robot(0.5, 0.5) self.is_working = False # Replace with mutex after all self.map_subscriber = rospy.Subscriber("map", OccupancyGrid, self.new_map_callback) self.start_subscriber = rospy.Subscriber("initialpose", PoseWithCovarianceStamped, self.new_start_callback) self.goal_subscriber = rospy.Subscriber("goal", PoseStamped, self.new_goal_callback) self.path_publisher = rospy.Publisher("trajectory", MarkerArray, queue_size=1) self.pose_publisher = rospy.Publisher("debug_pose", PoseStamped, queue_size=1) # what will be there. A module goes into variable. Isn't it too much memory consumption. Maybe I should assign function replan() to this variable? self.planner = planners.astar.replan
def create_pose(pose_obj): pose_stamped = PoseWithCovarianceStamped() pose_stamped.header.frame_id = 'map' pose_stamped.header.stamp = rospy.Time.now() pose = pose_obj.get("pose") position = pose.get("position") orientation = pose.get("orientation") covariance = pose_obj.get("covariance") pose_stamped.pose.pose.position.x = position.get("x") pose_stamped.pose.pose.position.y = position.get("y") pose_stamped.pose.pose.position.z = position.get("z") pose_stamped.pose.pose.orientation.y = orientation.get("x") pose_stamped.pose.pose.orientation.x = orientation.get("y") pose_stamped.pose.pose.orientation.z = orientation.get("z") pose_stamped.pose.pose.orientation.w = orientation.get("w") pose_stamped.pose.covariance = covariance return pose_stamped
def __init__(self): self.frame_id = rospy.get_param("~frame_id", "map") cov_x = rospy.get_param("~cov_x", 0.6) cov_y = rospy.get_param("~cov_y", 0.6) cov_z = rospy.get_param("~cov_z", 0.6) self.cov = self.cov_matrix(cov_x, cov_y, cov_z) self.ps_pub = rospy.Publisher( POSE_TOPIC, PoseStamped, queue_size=1) self.ps_cov_pub = rospy.Publisher( POSE_COV_TOPIC, PoseWithCovarianceStamped, queue_size=1) self.ps_pub_3d = rospy.Publisher( POSE_TOPIC_3D, PoseStamped, queue_size=1) self.ps_cov_pub_3d = rospy.Publisher( POSE_COV_TOPIC_3D, PoseWithCovarianceStamped, queue_size=1) self.last = None self.listener = tf.TransformListener() self.tag_range_topics = rospy.get_param("~tag_range_topics") self.subs = list() self.ranges = dict() self.tag_pos = dict() self.altitude = 0.0 self.last_3d = None for topic in self.tag_range_topics: self.subs.append(rospy.Subscriber(topic, Range, self.range_cb))
def publish_position(self, pos, ps_pub, ps_cov_pub, cov): x, y = pos[0], pos[1] if len(pos) > 2: z = pos[2] else: z = 0 ps = PoseStamped() ps_cov = PoseWithCovarianceStamped() ps.pose.position.x = x ps.pose.position.y = y ps.pose.position.z = z ps.header.frame_id = self.frame_id ps.header.stamp = rospy.get_rostime() ps_cov.header = ps.header ps_cov.pose.pose = ps.pose ps_cov.pose.covariance = cov ps_pub.publish(ps) ps_cov_pub.publish(ps_cov)
def PoseCallback(posedata): # PoseWithCovarianceStamped data from amcl_pose global robot_pose # [time, [x,y,yaw]] header = posedata.header pose = posedata.pose if (not robot_pose[0]) or (header.stamp > robot_pose[0]): # more recent pose data received robot_pose[0] = header.stamp # TODO: maybe add covariance check here? print('robot position update!') euler = euler_from_quaternion([pose.pose.orientation.x, pose.pose.orientation.y, pose.pose.orientation.z, pose.pose.orientation.w]) #roll, pitch, yaw robot_pose[1] = [pose.pose.position.x, pose.pose.position.y, euler[2]] # in radians return robot_pose #========================================
def SendInitialPose(InitialPosePublisher, initial_pose, time_stamp): # goal: [x, y, yaw] InitialPoseMsg = PoseWithCovarianceStamped() #InitialPoseMsg.header.seq = 0 InitialPoseMsg.header.stamp = time_stamp InitialPoseMsg.header.frame_id = 'map' InitialPoseMsg.pose.pose.position.x = initial_pose[0] InitialPoseMsg.pose.pose.position.y = initial_pose[1] #InitialPoseMsg.pose.position.z = 0.0 quaternion = quaternion_from_euler(0, 0, initial_pose[2]) InitialPoseMsg.pose.pose.orientation.x = quaternion[0] InitialPoseMsg.pose.pose.orientation.y = quaternion[1] InitialPoseMsg.pose.pose.orientation.z = quaternion[2] InitialPoseMsg.pose.pose.orientation.w = quaternion[3] InitialPosePublisher.publish(InitialPoseMsg) #========================================
def main(): trans= 0 rot = 0 rospy.init_node('odom_sync') listener = tf.TransformListener() pubfiltered = rospy.Publisher('set_pose',PoseWithCovarianceStamped,queue_size = 10) pubodom = rospy.Publisher('mobile_base/commands/reset_odometry', Empty, queue_size = 10) serv = rospy.ServiceProxy("set_offset",SetOdomOffset) rospy.Subscriber('/command_input',UInt8,callback,callback_args=(pubodom,pubfiltered,listener,trans,rot,serv)) rospy.sleep(1) print "done sleeping" while not rospy.is_shutdown(): try: (trans,rot) = listener.lookupTransform('/base_meas', '/odom_meas',rospy.Time(0)) except: continue rospy.spin()
def __init__(self): self.lock = threading.Lock() self.sub_imu = rospy.Subscriber('robot_pose_ekf/odom_combined', PoseWithCovarianceStamped, self.imu_cb) self.last_imu_angle = 0 self.imu_angle = 0 while not rospy.is_shutdown(): rospy.sleep(0.1) rospy.loginfo("imu_angle:"+str((self.imu_angle)*180/3.1415926))
def __init__(self): # Give the node a name rospy.init_node('odom_ekf', anonymous=False) # Publisher of type nav_msgs/Odometry self.ekf_pub = rospy.Publisher('output', Odometry, queue_size=5) # Wait for the /odom_combined topic to become available rospy.wait_for_message('input', PoseWithCovarianceStamped) # Subscribe to the /odom_combined topic rospy.Subscriber('input', PoseWithCovarianceStamped, self.pub_ekf_odom) rospy.loginfo("Publishing combined odometry on /odom_ekf")
def execute(self, userdata): global waypoints self.initialize_path_queue() self.path_ready = False # Start thread to listen for when the path is ready (this function will end then) def wait_for_path_ready(): """thread worker function""" data = rospy.wait_for_message('/path_ready', Empty) rospy.loginfo('Recieved path READY message') self.path_ready = True ready_thread = threading.Thread(target=wait_for_path_ready) ready_thread.start() topic = "/initialpose" rospy.loginfo("Waiting to recieve waypoints via Pose msg on topic %s" % topic) rospy.loginfo("To start following waypoints: 'rostopic pub /path_ready std_msgs/Empty -1'") # Wait for published waypoints while not self.path_ready: try: pose = rospy.wait_for_message(topic, PoseWithCovarianceStamped, timeout=1) except rospy.ROSException as e: if 'timeout exceeded' in e.message: continue # no new waypoint within timeout, looping... else: raise e rospy.loginfo("Recieved new waypoint") waypoints.append(pose) # publish waypoint queue as pose array so that you can see them in rviz, etc. self.poseArray_publisher.publish(convert_PoseWithCovArray_to_PoseArray(waypoints)) # Path is ready! return success and move on to the next state (FOLLOW_PATH) return 'success'
def clicked_pose(self, msg): ''' Receive pose messages from RViz and initialize the particle distribution in response. ''' if isinstance(msg, PointStamped): self.initialize_global() elif isinstance(msg, PoseWithCovarianceStamped): self.initialize_particles_pose(msg.pose.pose)
def ekf_pub(self, ranges, vel_data, yaw, alt): z = np.array([]) new_pose = Odometry() ps_cov = PoseWithCovarianceStamped() for tag_name in self.tag_order: measurement = ranges[tag_name] z = np.append(z, measurement) if self.last_time is None: self.last_time = rospy.Time.now().to_sec() else: dt = rospy.Time.now().to_sec() - self.last_time self.predict(dt) self.update(z, vel_data, yaw, alt) self.last_time = rospy.Time.now().to_sec() new_pose.header.stamp = rospy.get_rostime() new_pose.header.frame_id = self.frame_id new_pose.pose.pose.position.x = self.x[0] new_pose.pose.pose.position.y = self.x[1] new_pose.pose.pose.position.z = self.x[2] cov = self.P.flatten().tolist() new_pose.pose.covariance = cov new_pose.twist.twist.linear.x = self.x[3] new_pose.twist.twist.linear.y = self.x[4] new_pose.twist.twist.linear.z = self.x[5] return new_pose # @n.publisher(EKF_COV_TOPIC, PoseWithCovarianceStamped) # def cov_pub(self, ps_cov): # return ps_cov
def __init__(self): rospy.Subscriber('/amcl_pose', PoseWithCovarianceStamped, self.monitor_pose)
def PoseCallback(posedata): # PoseWithCovarianceStamped data from amcl_pose global robot_pose # [time, [x,y,yaw]] header = posedata.header pose = posedata.pose if (not robot_pose[0]) or (header.stamp > robot_pose[0]): # more recent pose data received robot_pose[0] = header.stamp # TODO: maybe add covariance check here? print('robot position update!') euler = euler_from_quaternion([pose.pose.orientation.x, pose.pose.orientation.y, pose.pose.orientation.z, pose.pose.orientation.w]) #roll, pitch, yaw robot_pose[1] = [pose.pose.position.x, pose.pose.position.y, euler[2]] # in radians return robot_pose