我们从Python开源项目中,提取了以下13个代码示例,用于说明如何使用std_msgs.msg.ColorRGBA()。
def marker_from_circle(circle, index=0, linewidth=0.1, color=ColorRGBA(1, 0, 0, 1), z=0., lifetime=10.0): marker = Marker() marker.header = make_header("/map") marker.ns = "Markers_NS" marker.id = index marker.type = Marker.CYLINDER marker.action = 0 # action=0 add/modify object # marker.color.r = 1.0 # marker.color.g = 0.0 # marker.color.b = 0.0 # marker.color.a = 0.4 marker.color = color marker.lifetime = rospy.Duration.from_sec(lifetime) marker.pose = Pose() marker.pose.position.z = z marker.pose.position.x = circle.center[0] marker.pose.position.y = circle.center[1] marker.scale = Vector3(circle.radius*2.0, circle.radius*2.0, 0) return marker
def marker_from_point_radius(point, radius, index=0, linewidth=0.1, color=ColorRGBA(1, 0, 0, 1), z=0., lifetime=10.0): marker = Marker() marker.header = make_header("/map") marker.ns = "Speed_NS" marker.id = index marker.type = Marker.CYLINDER marker.action = 0 # action=0 add/modify object # marker.color.r = 1.0 # marker.color.g = 0.0 # marker.color.b = 0.0 # marker.color.a = 0.4 marker.color = color marker.lifetime = rospy.Duration.from_sec(lifetime) marker.pose = Pose() marker.pose.position.z = z marker.pose.position.x = point[0] marker.pose.position.y = point[1] marker.scale = Vector3(radius*2.0, radius*2.0, 0.001) return marker
def publish_speeds(self, duration=0.0, scale=0.7): should_publish = len(self.speed_profile) > 1 if self.visualize and self.speed_pub.get_num_connections() > 0: if self.dirty(): self.make_np_array() markers = [marker_clear_all("/map")] normed_speeds = np.array(self.speed_profile) / np.max(self.speed_profile) last_speed = 0.0 for i, speed in enumerate(normed_speeds): if speed >= last_speed * 0.99: color = ColorRGBA(0, 1, 0, 0.8) else: color = ColorRGBA(1, 0, 0, 0.8) last_speed = speed markers.append(marker_from_point_radius(self.np_points[i,:], np.power(speed, 0.8) * scale, index=i, linewidth=0.05, color=color, lifetime=duration)) marker_array = MarkerArray(markers=markers) self.speed_pub.publish(marker_array)
def publish_cloud_markers(pub_ns, _arr, c='b', stamp=None, flip_rb=False, frame_id='map'): """ Publish point cloud on: pub_ns: Namespace on which the cloud will be published arr: numpy array (N x 3) for point cloud data c: Option 1: 'c', 'b', 'r' etc colors accepted by matplotlibs color Option 2: float ranging from 0 to 1 via matplotlib's jet colormap Option 3: numpy array (N x 3) with r,g,b vals ranging from 0 to 1 s: supported only by matplotlib plotting alpha: supported only by matplotlib plotting """ arr, carr = copy_pointcloud_data(_arr, c, flip_rb=flip_rb) marker = vis_msg.Marker(type=vis_msg.Marker.SPHERE_LIST, ns=pub_ns, action=vis_msg.Marker.ADD) marker.header.frame_id = frame_id marker.header.stamp = stamp if stamp is not None else rospy.Time.now() marker.scale.x = 0.01 marker.scale.y = 0.01 # XYZ inds, = np.where(~np.isnan(arr).any(axis=1)) marker.points = [geom_msg.Point(arr[j,0], arr[j,1], arr[j,2]) for j in inds] # RGB (optionally alpha) N, D = arr.shape[:2] if D == 3: marker.colors = [std_msg.ColorRGBA(carr[j,0], carr[j,1], carr[j,2], 1.0) for j in inds] elif D == 4: marker.colors = [std_msg.ColorRGBA(carr[j,0], carr[j,1], carr[j,2], carr[j,3]) for j in inds] marker.lifetime = rospy.Duration() _publish_marker(marker) print 'Publishing marker', N
def publish_line_segments(pub_ns, _arr1, _arr2, c='b', stamp=None, frame_id='camera', flip_rb=False, size=0.002): """ Publish point cloud on: pub_ns: Namespace on which the cloud will be published arr: numpy array (N x 3) for point cloud data c: Option 1: 'c', 'b', 'r' etc colors accepted by matplotlibs color Option 2: float ranging from 0 to 1 via matplotlib's jet colormap Option 3: numpy array (N x 3) with r,g,b vals ranging from 0 to 1 s: supported only by matplotlib plotting alpha: supported only by matplotlib plotting """ arr1, carr = copy_pointcloud_data(_arr1, c, flip_rb=flip_rb) arr2 = (deepcopy(_arr2)).reshape(-1,3) marker = vis_msg.Marker(type=vis_msg.Marker.LINE_LIST, ns=pub_ns, action=vis_msg.Marker.ADD) if not arr1.shape == arr2.shape: raise AssertionError marker.header.frame_id = frame_id marker.header.stamp = stamp if stamp is not None else rospy.Time.now() marker.scale.x = size marker.scale.y = size marker.color.b = 1.0 marker.color.a = 1.0 marker.pose.position = geom_msg.Point(0,0,0) marker.pose.orientation = geom_msg.Quaternion(0,0,0,1) # Handle 3D data: [ndarray or list of ndarrays] arr12 = np.hstack([arr1, arr2]) inds, = np.where(~np.isnan(arr12).any(axis=1)) marker.points = [] for j in inds: marker.points.extend([geom_msg.Point(arr1[j,0], arr1[j,1], arr1[j,2]), geom_msg.Point(arr2[j,0], arr2[j,1], arr2[j,2])]) # RGB (optionally alpha) marker.colors = [std_msg.ColorRGBA(carr[j,0], carr[j,1], carr[j,2], 1.0) for j in inds] marker.lifetime = rospy.Duration() _publish_marker(marker)
def publish_octomap(pub_ns, arr, carr, size=0.1, stamp=None, frame_id='map', flip_rb=False): """ Publish cubes list: """ marker = vis_msg.Marker(type=vis_msg.Marker.CUBE_LIST, ns=pub_ns, action=vis_msg.Marker.ADD) marker.header.frame_id = frame_id marker.header.stamp = stamp if stamp is not None else rospy.Time.now() # Point width, and height marker.scale.x = size marker.scale.y = size marker.scale.z = size N, D = arr.shape # XYZ inds, = np.where(~np.isnan(arr).any(axis=1)) marker.points = [geom_msg.Point(arr[j,0], arr[j,1], arr[j,2]) for j in inds] # RGB (optionally alpha) rax, bax = 0, 2 # carr = carr.astype(np.float32) * 1.0 / 255 if flip_rb: rax, bax = 2, 0 if D == 3: marker.colors = [std_msg.ColorRGBA(carr[j,rax], carr[j,1], carr[j,bax], 1.0) for j in inds] elif D == 4: marker.colors = [std_msg.ColorRGBA(carr[j,rax], carr[j,1], carr[j,bax], carr[j,3]) for j in inds] marker.lifetime = rospy.Duration() _publish_marker(marker)
def handle_image_msg(msg): assert msg._type == 'sensor_msgs/Image' with g_fusion_lock: g_fusion.filter(EmptyObservation(msg.header.stamp.to_sec())) if g_fusion.last_state_mean is not None: pose = g_fusion.lidar_observation_function(g_fusion.last_state_mean) yaw_q = ros_tf.transformations.quaternion_from_euler(0, 0, pose[3]) br = ros_tf.TransformBroadcaster() br.sendTransform(tuple(pose[:3]), tuple(yaw_q), rospy.Time.now(), 'obj_fuse_centroid', 'velodyne') if last_known_box_size is not None: # bounding box marker = Marker() marker.header.frame_id = "obj_fuse_centroid" marker.header.stamp = rospy.Time.now() marker.type = Marker.CUBE marker.action = Marker.ADD marker.scale.x = last_known_box_size[2] marker.scale.y = last_known_box_size[1] marker.scale.z = last_known_box_size[0] marker.color = ColorRGBA(r=0., g=1., b=0., a=0.5) marker.lifetime = rospy.Duration() pub = rospy.Publisher("obj_fuse_bbox", Marker, queue_size=10) pub.publish(marker)
def set_color(self, rgba, link="link", ambient_coeff=0.7, specular_coeff=0.7): rgba = list(rgba) if len(rgba) is 3: rgba +=[self.color_range] r,g,b,a = tuple(rgba) a = self.color_range if a is None else a message = MaterialColor() message.color_type = [COLOR_TYPE['diffuse'], COLOR_TYPE['ambient'], COLOR_TYPE['specular']] message.color.append(ColorRGBA(r/self.color_range, g/self.color_range, b/self.color_range, a/self.color_range)) message.color.append(ColorRGBA(0.7*r/self.color_range, 0.7*g/self.color_range, 0.7*b/self.color_range, a/self.color_range)) message.color.append(ColorRGBA(1-specular_coeff*(1-r/self.color_range), 1-specular_coeff*(1-g/self.color_range), 1-specular_coeff*(1-b/self.color_range), a/self.color_range)) self.color_publishers[link].publish(message)
def set_color(self, color): if len(color) is 3: color += [self.color_range] previous_color = self.color self.color = ColorRGBA(color[0]/self.color_range, color[1]/self.color_range, color[2]/self.color_range, color[3]/self.color_range) if self.color != previous_color and self._on : self._send_color_cmd(self.color)
def publish_pose_list(pub_ns, _poses, texts=[], stamp=None, size=0.05, frame_id='camera', seq=1): """ Publish Pose List on: pub_channel: Channel on which the cloud will be published """ poses = deepcopy(_poses) if not len(poses): return arrs = np.vstack([pose.matrix[:3,:3].T.reshape((1,9)) for pose in poses]) * size arrX = np.vstack([pose.tvec.reshape((1,3)) for pose in poses]) arrx, arry, arrz = arrs[:,0:3], arrs[:,3:6], arrs[:,6:9] # Point width, and height N = len(poses) markers = vis_msg.Marker(type=vis_msg.Marker.LINE_LIST, ns=pub_ns, action=vis_msg.Marker.ADD) markers.header.frame_id = frame_id markers.header.stamp = stamp if stamp is not None else rospy.Time.now() markers.header.seq = seq markers.scale.x = size/20 # 0.01 markers.scale.y = size/20 # 0.01 markers.color.a = 1.0 markers.pose.position = geom_msg.Point(0,0,0) markers.pose.orientation = geom_msg.Quaternion(0,0,0,1) markers.points = [] markers.lifetime = rospy.Duration() for j in range(N): markers.points.extend([geom_msg.Point(arrX[j,0], arrX[j,1], arrX[j,2]), geom_msg.Point(arrX[j,0] + arrx[j,0], arrX[j,1] + arrx[j,1], arrX[j,2] + arrx[j,2])]) markers.colors.extend([std_msg.ColorRGBA(1.0, 0.0, 0.0, 1.0), std_msg.ColorRGBA(1.0, 0.0, 0.0, 1.0)]) markers.points.extend([geom_msg.Point(arrX[j,0], arrX[j,1], arrX[j,2]), geom_msg.Point(arrX[j,0] + arry[j,0], arrX[j,1] + arry[j,1], arrX[j,2] + arry[j,2])]) markers.colors.extend([std_msg.ColorRGBA(0.0, 1.0, 0.0, 1.0), std_msg.ColorRGBA(0.0, 1.0, 0.0, 1.0)]) markers.points.extend([geom_msg.Point(arrX[j,0], arrX[j,1], arrX[j,2]), geom_msg.Point(arrX[j,0] + arrz[j,0], arrX[j,1] + arrz[j,1], arrX[j,2] + arrz[j,2])]) markers.colors.extend([std_msg.ColorRGBA(0.0, 0.0, 1.0, 1.0), std_msg.ColorRGBA(0.0, 0.0, 1.0, 1.0)]) _publish_marker(markers)
def handle_radar_msg(msg, dont_fuse): assert msg._md5sum == '6a2de2f790cb8bb0e149d45d297462f8' publish_rviz_topics = True with g_fusion_lock: # do we have any estimation? if g_fusion.last_state_mean is not None: pose = g_fusion.lidar_observation_function(g_fusion.last_state_mean) observations = RadarObservation.from_msg(msg, RADAR_TO_LIDAR, 0.9115) # find nearest observation to current object position estimation distance_threshold = 4.4 nearest = None nearest_dist = 1e9 for o in observations: dist = [o.x - pose[0], o.y - pose[1], o.z - pose[2]] dist = np.sqrt(np.array(dist).dot(dist)) if dist < nearest_dist and dist < distance_threshold: nearest_dist = dist nearest = o if nearest is not None: # FUSION if not dont_fuse: g_fusion.filter(nearest) if publish_rviz_topics: header = Header() header.frame_id = '/velodyne' header.stamp = rospy.Time.now() point = np.array([[nearest.x, nearest.y, nearest.z]], dtype=np.float32) rospy.Publisher(name='obj_radar_nearest', data_class=PointCloud2, queue_size=1).publish(pc2.create_cloud_xyz32(header, point)) #pose = g_fusion.lidar_observation_function(g_fusion.last_state_mean) #br = ros_tf.TransformBroadcaster() #br.sendTransform(tuple(pose), (0,0,0,1), rospy.Time.now(), 'car_fuse_centroid', 'velodyne') # if last_known_box_size is not None: # # bounding box # marker = Marker() # marker.header.frame_id = "car_fuse_centroid" # marker.header.stamp = rospy.Time.now() # marker.type = Marker.CUBE # marker.action = Marker.ADD # marker.scale.x = last_known_box_size[2] # marker.scale.y = last_known_box_size[1] # marker.scale.z = last_known_box_size[0] # marker.color = ColorRGBA(r=1., g=1., b=0., a=0.5) # marker.lifetime = rospy.Duration() # pub = rospy.Publisher("car_fuse_bbox", Marker, queue_size=10) # pub.publish(marker)
def run(self): while self.is_looping(): navigation_tf_msg = TFMessage() try: motion_to_robot = almath.Pose2D(self.motion.getRobotPosition(True)) localization = self.navigation.getRobotPositionInMap() exploration_path = self.navigation.getExplorationPath() except Exception as e: navigation_tf_msg.transforms.append(self.get_navigation_tf(almath.Pose2D())) self.publishers["map_tf"].publish(navigation_tf_msg) self.rate.sleep() continue if len(localization) > 0 and len(localization[0]) == 3: navigation_to_robot = almath.Pose2D(localization[0][0], localization[0][1], localization[0][2]) navigation_to_motion = navigation_to_robot * almath.pinv(motion_to_robot) navigation_tf_msg.transforms.append(self.get_navigation_tf(navigation_to_motion)) self.publishers["map_tf"].publish(navigation_tf_msg) if len(localization) > 0: if self.publishers["uncertainty"].get_num_connections() > 0: uncertainty = Marker() uncertainty.header.frame_id = "/base_footprint" uncertainty.header.stamp = rospy.Time.now() uncertainty.type = Marker.CYLINDER uncertainty.scale = Vector3(localization[1][0], localization[1][1], 0.2) uncertainty.pose.position = Point(0, 0, 0) uncertainty.pose.orientation = self.get_ros_quaternion( almath.Quaternion_fromAngleAndAxisRotation(0, 0, 0, 1)) uncertainty.color = ColorRGBA(1, 0.5, 0.5, 0.5) self.publishers["uncertainty"].publish(uncertainty) if self.publishers["map"].get_num_connections() > 0: aggregated_map = None try: aggregated_map = self.navigation.getMetricalMap() except Exception as e: print("error " + str(e)) if aggregated_map is not None: map_marker = OccupancyGrid() map_marker.header.stamp = rospy.Time.now() map_marker.header.frame_id = "/map" map_marker.info.resolution = aggregated_map[0] map_marker.info.width = aggregated_map[1] map_marker.info.height = aggregated_map[2] map_marker.info.origin.orientation = self.get_ros_quaternion( almath.Quaternion_fromAngleAndAxisRotation(-1.57, 0, 0, 1)) map_marker.info.origin.position.x = aggregated_map[3][0] map_marker.info.origin.position.y = aggregated_map[3][1] map_marker.data = aggregated_map[4] self.publishers["map"].publish(map_marker) if self.publishers["exploration_path"].get_num_connections() > 0: path = Path() path.header.stamp = rospy.Time.now() path.header.frame_id = "/map" for node in exploration_path: current_node = PoseStamped() current_node.pose.position.x = node[0] current_node.pose.position.y = node[1] path.poses.append(current_node) self.publishers["exploration_path"].publish(path) self.rate.sleep()
def visualize(self): if self.circle_pub.get_num_connections() > 0 and self.circle_path: print "CIRCLE PUB" # root = self.space_explorer.tree_root # markers = [utils.marker_clear_all("/map")] # explored = self.non_overlapping_paths(root) # if type(explored[0]) == list: # explored = reduce(lambda x,y: x+y, explored) # print "explored: ", len(explored) # markers += [utils.marker_from_circle(circle, index=i, linewidth=0.01, color=ColorRGBA(0, 1, 0, 0.1), lifetime=1.0) \ # for i, circle in enumerate(explored)] # marker_array = MarkerArray(markers=markers) # self.circle_pub.publish(marker_array) # print len(self.circle_path.states) # marker = utils.marker_from_circles(self.circle_path.states, 1, "Markers_NS2", "/map", 3.0, [1.,0.,0.]) # self.circle_pub.publish(marker) markers = [utils.marker_clear_all("/map")] markers += [utils.marker_from_circle(circle, index=i, linewidth=0.05, color=ColorRGBA(1, 0, 0, 0.4), \ lifetime=4.0) for i, circle in enumerate(self.circle_path.states)] # print len(markers) marker_array = MarkerArray(markers=markers) self.circle_pub.publish(marker_array) if self.should_refine_trajectory and self.fast_circle_pub.get_num_connections() > 0 and self.fast_path: print "FAST CIRCLE PUB" # root = self.path_planner.tree_root # markers = [utils.marker_clear_all("/map")] # explored = self.non_overlapping_paths(root) # if type(explored[0]) == list: # explored = reduce(lambda x,y: x+y, explored) # print "explored: ", len(explored) # markers += [utils.marker_from_circle(circle, index=i, linewidth=0.01, color=ColorRGBA(0, 0, 1, 0.1), lifetime=1.0) \ # for i, circle in enumerate(explored)] markers = [utils.marker_clear_all("/map")] markers += [utils.marker_from_circle(circle, index=i, linewidth=0.05, color=ColorRGBA(1, 0, 0, 0.4), \ lifetime=4.0) for i, circle in enumerate(self.fast_path.states)] marker_array = MarkerArray(markers=markers) self.fast_circle_pub.publish(marker_array) if self.show_exploration_buffer and self.exploration_pub.get_num_connections() > 0: print "Publishing exploration grid" self.exploration_pub.publish(self.map.get_exploration_occupancy_grid()) # plt.imshow(self.map.exploration_buffer) # plt.imshow(self.map.permissible_region) # plt.show() print "visualize"