我们从Python开源项目中,提取了以下32个代码示例,用于说明如何使用sensor_msgs.msg.PointCloud2()。
def xyz_array_to_pointcloud2(points, stamp=None, frame_id=None): ''' Create a sensor_msgs.PointCloud2 from an array of points. ''' msg = PointCloud2() if stamp: msg.header.stamp = stamp if frame_id: msg.header.frame_id = frame_id if len(points.shape) == 3: msg.height = points.shape[1] msg.width = points.shape[0] else: msg.height = 1 msg.width = len(points) msg.fields = [ PointField('x', 0, PointField.FLOAT32, 1), PointField('y', 4, PointField.FLOAT32, 1), PointField('z', 8, PointField.FLOAT32, 1)] msg.is_bigendian = False msg.point_step = 12 msg.row_step = 12*points.shape[0] msg.is_dense = int(np.isfinite(points).all()) msg.data = np.asarray(points, np.float32).tostring() return msg
def xyzrgb_array_to_pointcloud2(points, colors, stamp=None, frame_id=None, seq=None): ''' Create a sensor_msgs.PointCloud2 from an array of points. ''' msg = PointCloud2() assert(points.shape == colors.shape) buf = [] if stamp: msg.header.stamp = stamp if frame_id: msg.header.frame_id = frame_id if seq: msg.header.seq = seq if len(points.shape) == 3: msg.height = points.shape[1] msg.width = points.shape[0] else: N = len(points) xyzrgb = np.array(np.hstack([points, colors]), dtype=np.float32) msg.height = 1 msg.width = N msg.fields = [ PointField('x', 0, PointField.FLOAT32, 1), PointField('y', 4, PointField.FLOAT32, 1), PointField('z', 8, PointField.FLOAT32, 1), PointField('r', 12, PointField.FLOAT32, 1), PointField('g', 16, PointField.FLOAT32, 1), PointField('b', 20, PointField.FLOAT32, 1) ] msg.is_bigendian = False msg.point_step = 24 msg.row_step = msg.point_step * N msg.is_dense = True; msg.data = xyzrgb.tostring() return msg
def ros_to_pcl(ros_cloud): """ Converts a ROS PointCloud2 message to a pcl PointXYZRGB Args: ros_cloud (PointCloud2): ROS PointCloud2 message Returns: pcl.PointCloud_PointXYZRGB: PCL XYZRGB point cloud """ points_list = [] for data in pc2.read_points(ros_cloud, skip_nans=True): points_list.append([data[0], data[1], data[2], data[3]]) pcl_data = pcl.PointCloud_PointXYZRGB() pcl_data.from_list(points_list) return pcl_data
def array_to_pointcloud2(cloud_arr, stamp=None, frame_id=None): '''Converts a numpy record array to a sensor_msgs.msg.PointCloud2. ''' # make it 2d (even if height will be 1) cloud_arr = np.atleast_2d(cloud_arr) cloud_msg = PointCloud2() if stamp is not None: cloud_msg.header.stamp = stamp if frame_id is not None: cloud_msg.header.frame_id = frame_id cloud_msg.height = cloud_arr.shape[0] cloud_msg.width = cloud_arr.shape[1] cloud_msg.fields = dtype_to_fields(cloud_arr.dtype) cloud_msg.is_bigendian = False # assumption cloud_msg.point_step = cloud_arr.dtype.itemsize cloud_msg.row_step = cloud_msg.point_step*cloud_arr.shape[1] cloud_msg.is_dense = all([np.isfinite(cloud_arr[fname]).all() for fname in cloud_arr.dtype.names]) cloud_msg.data = cloud_arr.tostring() return cloud_msg
def start_processing(self): """ Start processing data """ if self.disabled: rospy.loginfo("Processing started") self.disabled = False if self.sub_left is None: self.sub_left = rospy.Subscriber( "/multisense/camera/left/image_color", Image, self.left_color_detection) rospy.sleep(0.1) if self.sub_right is None: self.sub_right = rospy.Subscriber( "/multisense/camera/right/image_color", Image, self.right_color_detection) rospy.sleep(0.1) if self.sub_cloud is None: self.sub_cloud = rospy.Subscriber( "/multisense/image_points2_color", PointCloud2, self.stereo_cloud)
def pointcloud2_to_array(cloud_msg, squeeze=True): ''' Converts a rospy PointCloud2 message to a numpy recordarray Reshapes the returned array to have shape (height, width), even if the height is 1. The reason for using np.fromstring rather than struct.unpack is speed... especially for large point clouds, this will be <much> faster. ''' # construct a numpy record type equivalent to the point type of this cloud dtype_list = fields_to_dtype(cloud_msg.fields, cloud_msg.point_step) # parse the cloud into an array cloud_arr = np.fromstring(cloud_msg.data, dtype_list) # remove the dummy fields that were added cloud_arr = cloud_arr[ [fname for fname, _type in dtype_list if not (fname[:len(DUMMY_FIELD_PREFIX)] == DUMMY_FIELD_PREFIX)]] if squeeze and cloud_msg.height == 1: return np.reshape(cloud_arr, (cloud_msg.width,)) else: return np.reshape(cloud_arr, (cloud_msg.height, cloud_msg.width))
def pointcloud2_to_array(cloud_msg): ''' Converts a rospy PointCloud2 message to a numpy recordarray Assumes all fields 32 bit floats, and there is no padding. ''' dtype_list = [(f.name, np.float32) for f in cloud_msg.fields] cloud_arr = np.fromstring(cloud_msg.data, dtype_list) return np.reshape(cloud_arr, (cloud_msg.height, cloud_msg.width))
def pc_map_pub(self, ns): if ns not in self.pc_map: self.pc_map[ns] = rospy.Publisher(ns, sensor_msg.PointCloud2, latch=False, queue_size=10) return self.pc_map[ns] # Helper functions
def process_radar_tracks(msg): assert msg._md5sum == '6a2de2f790cb8bb0e149d45d297462f8' tracks = RadarObservation.from_msg(msg) num_tracks = len(tracks) acc=[] cloud = np.zeros([num_tracks, 3], dtype=np.float32) for i, track in enumerate(tracks): cloud[i] = [track.x, track.y, track.z] - np.array(RADAR_TO_LIDAR) if np.abs(track.y) < 2: #acc.append(track.x) #acc.append(track.power) print track.vx*3.7, track.vy*3.7 #print x, y, z #print acc #msg.header.stamp header = Header() header.stamp = msg.header.stamp header.frame_id = 'velodyne' cloud_msg = pc2.create_cloud_xyz32(header, cloud) cloud_msg.width = 1 cloud_msg.height = num_tracks rospy.Publisher('radar_points', PointCloud2, queue_size=1).publish(cloud_msg)
def listen(): global pub, data_list, rospy rospy.init_node('listen', anonymous=True) pub = rospy.Publisher('camera/depth/points_replay', PointCloud2, queue_size=10) rospy.Subscriber("points_photo_trigger", std_msgs.msg.Int32, callback_photo)
def callback_photo(data) : global pub,data_list, sub sub = rospy.Subscriber("camera/depth/points", PointCloud2, callback_listen)
def capture_sample(): """ Captures a PointCloud2 using the sensor stick RGBD camera Args: None Returns: PointCloud2: a single point cloud from the RGBD camrea """ get_model_state_prox = rospy.ServiceProxy('gazebo/get_model_state',GetModelState) model_state = get_model_state_prox('training_model','world') set_model_state_prox = rospy.ServiceProxy('gazebo/set_model_state', SetModelState) roll = random.uniform(0, 2*math.pi) pitch = random.uniform(0, 2*math.pi) yaw = random.uniform(0, 2*math.pi) quaternion = tf.transformations.quaternion_from_euler(roll, pitch, yaw) model_state.pose.orientation.x = quaternion[0] model_state.pose.orientation.y = quaternion[1] model_state.pose.orientation.z = quaternion[2] model_state.pose.orientation.w = quaternion[3] sms_req = SetModelStateRequest() sms_req.model_state.pose = model_state.pose sms_req.model_state.twist = model_state.twist sms_req.model_state.model_name = 'training_model' sms_req.model_state.reference_frame = 'world' set_model_state_prox(sms_req) return rospy.wait_for_message('/sensor_stick/point_cloud', PointCloud2)
def start_processing(self): """ Start processing data """ rospy.loginfo("Processing started") self.image_sub_left = rospy.Subscriber( "/multisense/camera/left/image_raw", Image, self.color_detection) self.image_sub_cloud = rospy.Subscriber( "/multisense/image_points2_color", PointCloud2, self.cloud)
def __init__(self): self.point_cloud_sub = rospy.Subscriber('/camera/depth_registered/points', PointCloud2, self.point_cloud_callback) self.br = tf2_ros.TransformBroadcaster() circle_rgb = [110, 70, 90] triangle_rgb = [75, 120, 60] square_rgb = [30, 90, 155] rectangle_rgb = [165, 175, 90] hexagon_rgb = [120, 50, 50] self.shapes_rgb = [circle_rgb, triangle_rgb, square_rgb, rectangle_rgb, hexagon_rgb] self.shape_names = ['circle', 'triangle', 'square', 'rectangle', 'hexagon']
def add_subscriber(self, pc2_callback): rospy.Subscriber('/velodyne_points', PointCloud2, pc2_callback)
def __init__(self): rospy.init_node('EvaDetectionPipeline', anonymous=True) rospy.Subscriber('/velodyne_points', PointCloud2, self.pointcloud_received) self.pipeline = dp.DetectionPipeline(enable_birdseye = True, enable_camera = False, enable_kalman = False)
def startlistening(self): rospy.init_node('tracker', anonymous=True) rospy.Subscriber('/image_raw', Image, self.handle_image_msg) # for frame number rospy.Subscriber('/velodyne_points', PointCloud2, self.handle_lidar_msg) # for timing data rospy.Subscriber("/bbox", MarkerArray, self.handle_bbox_msg) print 'tracker node initialzed' rospy.spin()
def start(self): """ Start the sensor """ # initialize subscribers self._pointcloud_sub = rospy.Subscriber('/%s/depth/points' %(self.frame), PointCloud2, self._pointcloud_callback) self._camera_info_sub = rospy.Subscriber('/%s/left/camera_info' %(self.frame), CameraInfo, self._camera_info_callback) while self._camera_intr is None: time.sleep(0.1) self._running = True
def pc2_parse(data): """ PointCloud2 parser pc2.read_points returns a generator of (x,y,z) tuples """ gen = pc2.read_points(data, skip_nans=True, field_names=("x","y","z")) return list(gen)
def test_roundtrip(self): points_arr = self.makeArray(100) cloud_msg = ros_numpy.msgify(PointCloud2, points_arr) new_points_arr = ros_numpy.numpify(cloud_msg) np.testing.assert_equal(points_arr, new_points_arr)
def test_roundtrip_numpy(self): points_arr = self.makeArray(100) cloud_msg = ros_numpy.msgify(numpy_msg(PointCloud2), points_arr) new_points_arr = ros_numpy.numpify(cloud_msg) np.testing.assert_equal(points_arr, new_points_arr)
def pcl_to_ros(pcl_array): """ Converts a ROS PointCloud2 message to a pcl PointXYZRGB Args: pcl_array (PointCloud_PointXYZRGB): A PCL XYZRGB point cloud Returns: PointCloud2: A ROS point cloud """ ros_msg = PointCloud2() ros_msg.header.stamp = rospy.Time.now() ros_msg.header.frame_id = "world" ros_msg.height = 1 ros_msg.width = pcl_array.size ros_msg.fields.append(PointField( name="x", offset=0, datatype=PointField.FLOAT32, count=1)) ros_msg.fields.append(PointField( name="y", offset=4, datatype=PointField.FLOAT32, count=1)) ros_msg.fields.append(PointField( name="z", offset=8, datatype=PointField.FLOAT32, count=1)) ros_msg.fields.append(PointField( name="rgb", offset=16, datatype=PointField.FLOAT32, count=1)) ros_msg.is_bigendian = False ros_msg.point_step = 32 ros_msg.row_step = ros_msg.point_step * ros_msg.width * ros_msg.height ros_msg.is_dense = False buffer = [] for data in pcl_array: s = struct.pack('>f', data[3]) i = struct.unpack('>l', s)[0] pack = ctypes.c_uint32(i).value r = (pack & 0x00FF0000) >> 16 g = (pack & 0x0000FF00) >> 8 b = (pack & 0x000000FF) buffer.append(struct.pack('ffffBBBBIII', data[0], data[1], data[2], 1.0, b, g, r, 0, 0, 0, 0)) ros_msg.data = "".join(buffer) return ros_msg
def pcl_to_ros(pcl_array): """ Converts a pcl PointXYZRGB to a ROS PointCloud2 message Args: pcl_array (PointCloud_PointXYZRGB): A PCL XYZRGB point cloud Returns: PointCloud2: A ROS point cloud """ ros_msg = PointCloud2() ros_msg.header.stamp = rospy.Time.now() ros_msg.header.frame_id = "world" ros_msg.height = 1 ros_msg.width = pcl_array.size ros_msg.fields.append(PointField( name="x", offset=0, datatype=PointField.FLOAT32, count=1)) ros_msg.fields.append(PointField( name="y", offset=4, datatype=PointField.FLOAT32, count=1)) ros_msg.fields.append(PointField( name="z", offset=8, datatype=PointField.FLOAT32, count=1)) ros_msg.fields.append(PointField( name="rgb", offset=16, datatype=PointField.FLOAT32, count=1)) ros_msg.is_bigendian = False ros_msg.point_step = 32 ros_msg.row_step = ros_msg.point_step * ros_msg.width * ros_msg.height ros_msg.is_dense = False buffer = [] for data in pcl_array: s = struct.pack('>f', data[3]) i = struct.unpack('>l', s)[0] pack = ctypes.c_uint32(i).value r = (pack & 0x00FF0000) >> 16 g = (pack & 0x0000FF00) >> 8 b = (pack & 0x000000FF) buffer.append(struct.pack('ffffBBBBIII', data[0], data[1], data[2], 1.0, b, g, r, 0, 0, 0, 0)) ros_msg.data = "".join(buffer) return ros_msg
def __init__(self): rospy.init_node("follower") # Set the shutdown function (stop the robot) rospy.on_shutdown(self.shutdown) # The dimensions (in meters) of the box in which we will search # for the person (blob). These are given in camera coordinates # where x is left/right,y is up/down and z is depth (forward/backward) self.min_x = rospy.get_param("~min_x", -0.2) self.max_x = rospy.get_param("~max_x", 0.2) self.min_y = rospy.get_param("~min_y", -0.3) self.max_y = rospy.get_param("~max_y", 0.5) self.max_z = rospy.get_param("~max_z", 1.2) # The goal distance (in meters) to keep between the robot and the person self.goal_z = rospy.get_param("~goal_z", 0.6) # How far away from the goal distance (in meters) before the robot reacts self.z_threshold = rospy.get_param("~z_threshold", 0.05) # How far away from being centered (x displacement) on the person # before the robot reacts self.x_threshold = rospy.get_param("~x_threshold", 0.05) # How much do we weight the goal distance (z) when making a movement self.z_scale = rospy.get_param("~z_scale", 1.0) # How much do we weight left/right displacement of the person when making a movement self.x_scale = rospy.get_param("~x_scale", 2.5) # The maximum rotation speed in radians per second self.max_angular_speed = rospy.get_param("~max_angular_speed", 2.0) # The minimum rotation speed in radians per second self.min_angular_speed = rospy.get_param("~min_angular_speed", 0.0) # The max linear speed in meters per second self.max_linear_speed = rospy.get_param("~max_linear_speed", 0.3) # The minimum linear speed in meters per second self.min_linear_speed = rospy.get_param("~min_linear_speed", 0.1) # Slow down factor when stopping self.slow_down_factor = rospy.get_param("~slow_down_factor", 0.8) # Initialize the movement command self.move_cmd = Twist() # Publisher to control the robot's movement self.cmd_vel_pub = rospy.Publisher('cmd_vel', Twist, queue_size=5) # Subscribe to the point cloud self.depth_subscriber = rospy.Subscriber('point_cloud', PointCloud2, self.set_cmd_vel, queue_size=1) rospy.loginfo("Subscribing to point cloud...") # Wait for the pointcloud topic to become available rospy.wait_for_message('point_cloud', PointCloud2) rospy.loginfo("Ready to follow!")
def __init__(self): rospy.init_node("follower") # Set the shutdown function (stop the robot) rospy.on_shutdown(self.shutdown) # The goal distance (in meters) to keep between the robot and the person self.goal_z = rospy.get_param("~goal_z", 0.6) # How far away from the goal distance (in meters) before the robot reacts self.z_threshold = rospy.get_param("~z_threshold", 0.05) # How far away from being centered (x displacement) on the person # before the robot reacts self.x_threshold = rospy.get_param("~x_threshold", 0.05) # How much do we weight the goal distance (z) when making a movement self.z_scale = rospy.get_param("~z_scale", 1.0) # How much do we weight x-displacement of the person when making a movement self.x_scale = rospy.get_param("~x_scale", 2.5) # The maximum rotation speed in radians per second self.max_angular_speed = rospy.get_param("~max_angular_speed", 2.0) # The minimum rotation speed in radians per second self.min_angular_speed = rospy.get_param("~min_angular_speed", 0.0) # The max linear speed in meters per second self.max_linear_speed = rospy.get_param("~max_linear_speed", 0.3) # The minimum linear speed in meters per second self.min_linear_speed = rospy.get_param("~min_linear_speed", 0.1) # Slow down factor when stopping self.slow_down_factor = rospy.get_param("~slow_down_factor", 0.8) # Initialize the movement command self.move_cmd = Twist() # Publisher to control the robot's movement self.cmd_vel_pub = rospy.Publisher('/cmd_vel', Twist, queue_size=5) # Subscribe to the point cloud self.depth_subscriber = rospy.Subscriber('point_cloud', PointCloud2, self.set_cmd_vel, queue_size=1) rospy.loginfo("Subscribing to point cloud...") # Wait for the pointcloud topic to become available rospy.wait_for_message('point_cloud', PointCloud2) rospy.loginfo("Ready to follow!")