Python std_msgs.msg 模块,ColorRGBA() 实例源码

我们从Python开源项目中,提取了以下13个代码示例,用于说明如何使用std_msgs.msg.ColorRGBA()

项目:TA_example_labs    作者:mit-racecar    | 项目源码 | 文件源码
def marker_from_circle(circle, index=0, linewidth=0.1, color=ColorRGBA(1, 0, 0, 1), z=0., lifetime=10.0):
    marker = Marker()
    marker.header = make_header("/map")

    marker.ns = "Markers_NS"
    marker.id = index
    marker.type = Marker.CYLINDER
    marker.action = 0 # action=0 add/modify object
    # marker.color.r = 1.0
    # marker.color.g = 0.0
    # marker.color.b = 0.0
    # marker.color.a = 0.4
    marker.color = color
    marker.lifetime = rospy.Duration.from_sec(lifetime)

    marker.pose = Pose()
    marker.pose.position.z = z
    marker.pose.position.x = circle.center[0]
    marker.pose.position.y = circle.center[1]

    marker.scale = Vector3(circle.radius*2.0, circle.radius*2.0, 0)
    return marker
项目:TA_example_labs    作者:mit-racecar    | 项目源码 | 文件源码
def marker_from_point_radius(point, radius, index=0, linewidth=0.1, color=ColorRGBA(1, 0, 0, 1), z=0., lifetime=10.0):
    marker = Marker()
    marker.header = make_header("/map")

    marker.ns = "Speed_NS"
    marker.id = index
    marker.type = Marker.CYLINDER
    marker.action = 0 # action=0 add/modify object
    # marker.color.r = 1.0
    # marker.color.g = 0.0
    # marker.color.b = 0.0
    # marker.color.a = 0.4
    marker.color = color
    marker.lifetime = rospy.Duration.from_sec(lifetime)

    marker.pose = Pose()
    marker.pose.position.z = z
    marker.pose.position.x = point[0]
    marker.pose.position.y = point[1]

    marker.scale = Vector3(radius*2.0, radius*2.0, 0.001)
    return marker
项目:TA_example_labs    作者:mit-racecar    | 项目源码 | 文件源码
def publish_speeds(self, duration=0.0, scale=0.7):
        should_publish = len(self.speed_profile) > 1
        if self.visualize and self.speed_pub.get_num_connections() > 0:
            if self.dirty():
                self.make_np_array()
            markers = [marker_clear_all("/map")]
            normed_speeds = np.array(self.speed_profile) / np.max(self.speed_profile)
            last_speed = 0.0
            for i, speed in enumerate(normed_speeds):
                if speed >= last_speed * 0.99:
                    color = ColorRGBA(0, 1, 0, 0.8)
                else:
                    color = ColorRGBA(1, 0, 0, 0.8)
                last_speed = speed
                markers.append(marker_from_point_radius(self.np_points[i,:], np.power(speed, 0.8) * scale,
                    index=i, linewidth=0.05, color=color, lifetime=duration))

            marker_array = MarkerArray(markers=markers)
            self.speed_pub.publish(marker_array)
项目:pybot    作者:spillai    | 项目源码 | 文件源码
def publish_cloud_markers(pub_ns, _arr, c='b', stamp=None, flip_rb=False, frame_id='map'): 
    """
    Publish point cloud on:
    pub_ns: Namespace on which the cloud will be published
    arr: numpy array (N x 3) for point cloud data
    c: Option 1: 'c', 'b', 'r' etc colors accepted by matplotlibs color
       Option 2: float ranging from 0 to 1 via matplotlib's jet colormap
       Option 3: numpy array (N x 3) with r,g,b vals ranging from 0 to 1
    s: supported only by matplotlib plotting
    alpha: supported only by matplotlib plotting
    """
    arr, carr = copy_pointcloud_data(_arr, c, flip_rb=flip_rb)

    marker = vis_msg.Marker(type=vis_msg.Marker.SPHERE_LIST, ns=pub_ns, action=vis_msg.Marker.ADD)
    marker.header.frame_id = frame_id
    marker.header.stamp = stamp if stamp is not None else rospy.Time.now()
    marker.scale.x = 0.01
    marker.scale.y = 0.01

    # XYZ
    inds, = np.where(~np.isnan(arr).any(axis=1))
    marker.points = [geom_msg.Point(arr[j,0], arr[j,1], arr[j,2]) for j in inds]

    # RGB (optionally alpha)
    N, D = arr.shape[:2]
    if D == 3: 
        marker.colors = [std_msg.ColorRGBA(carr[j,0], carr[j,1], carr[j,2], 1.0) 
                         for j in inds]
    elif D == 4: 
        marker.colors = [std_msg.ColorRGBA(carr[j,0], carr[j,1], carr[j,2], carr[j,3])
                         for j in inds]

    marker.lifetime = rospy.Duration()
    _publish_marker(marker)
    print 'Publishing marker', N
项目:pybot    作者:spillai    | 项目源码 | 文件源码
def publish_line_segments(pub_ns, _arr1, _arr2, c='b', stamp=None, frame_id='camera', flip_rb=False, size=0.002): 
    """
    Publish point cloud on:
    pub_ns: Namespace on which the cloud will be published
    arr: numpy array (N x 3) for point cloud data
    c: Option 1: 'c', 'b', 'r' etc colors accepted by matplotlibs color
       Option 2: float ranging from 0 to 1 via matplotlib's jet colormap
       Option 3: numpy array (N x 3) with r,g,b vals ranging from 0 to 1
    s: supported only by matplotlib plotting
    alpha: supported only by matplotlib plotting
    """
    arr1, carr = copy_pointcloud_data(_arr1, c, flip_rb=flip_rb)
    arr2 = (deepcopy(_arr2)).reshape(-1,3)

    marker = vis_msg.Marker(type=vis_msg.Marker.LINE_LIST, ns=pub_ns, action=vis_msg.Marker.ADD)
    if not arr1.shape == arr2.shape: raise AssertionError    

    marker.header.frame_id = frame_id
    marker.header.stamp = stamp if stamp is not None else rospy.Time.now()
    marker.scale.x = size
    marker.scale.y = size
    marker.color.b = 1.0
    marker.color.a = 1.0
    marker.pose.position = geom_msg.Point(0,0,0)
    marker.pose.orientation = geom_msg.Quaternion(0,0,0,1)

    # Handle 3D data: [ndarray or list of ndarrays]
    arr12 = np.hstack([arr1, arr2])
    inds, = np.where(~np.isnan(arr12).any(axis=1))

    marker.points = []
    for j in inds: 
        marker.points.extend([geom_msg.Point(arr1[j,0], arr1[j,1], arr1[j,2]), 
                              geom_msg.Point(arr2[j,0], arr2[j,1], arr2[j,2])])

    # RGB (optionally alpha)
    marker.colors = [std_msg.ColorRGBA(carr[j,0], carr[j,1], carr[j,2], 1.0) 
                     for j in inds]

    marker.lifetime = rospy.Duration()
    _publish_marker(marker)
项目:pybot    作者:spillai    | 项目源码 | 文件源码
def publish_octomap(pub_ns, arr, carr, size=0.1, stamp=None, frame_id='map', flip_rb=False): 
    """
    Publish cubes list:
    """
    marker = vis_msg.Marker(type=vis_msg.Marker.CUBE_LIST, ns=pub_ns, action=vis_msg.Marker.ADD)

    marker.header.frame_id = frame_id
    marker.header.stamp = stamp if stamp is not None else rospy.Time.now()

    # Point width, and height
    marker.scale.x = size
    marker.scale.y = size
    marker.scale.z = size

    N, D = arr.shape

    # XYZ
    inds, = np.where(~np.isnan(arr).any(axis=1))
    marker.points = [geom_msg.Point(arr[j,0], arr[j,1], arr[j,2]) for j in inds]

    # RGB (optionally alpha)
    rax, bax = 0, 2
    # carr = carr.astype(np.float32) * 1.0 / 255
    if flip_rb: rax, bax = 2, 0
    if D == 3: 
        marker.colors = [std_msg.ColorRGBA(carr[j,rax], carr[j,1], carr[j,bax], 1.0) 
                         for j in inds]
    elif D == 4: 
        marker.colors = [std_msg.ColorRGBA(carr[j,rax], carr[j,1], carr[j,bax], carr[j,3])
                         for j in inds]

    marker.lifetime = rospy.Duration()
    _publish_marker(marker)
项目:tea    作者:antorsae    | 项目源码 | 文件源码
def handle_image_msg(msg):
    assert msg._type == 'sensor_msgs/Image'

    with g_fusion_lock:
        g_fusion.filter(EmptyObservation(msg.header.stamp.to_sec()))

        if g_fusion.last_state_mean is not None:
            pose = g_fusion.lidar_observation_function(g_fusion.last_state_mean)

            yaw_q = ros_tf.transformations.quaternion_from_euler(0, 0, pose[3])
            br = ros_tf.TransformBroadcaster()
            br.sendTransform(tuple(pose[:3]), tuple(yaw_q), rospy.Time.now(), 'obj_fuse_centroid', 'velodyne')

            if last_known_box_size is not None:
                # bounding box
                marker = Marker()
                marker.header.frame_id = "obj_fuse_centroid"
                marker.header.stamp = rospy.Time.now()
                marker.type = Marker.CUBE
                marker.action = Marker.ADD
                marker.scale.x = last_known_box_size[2]
                marker.scale.y = last_known_box_size[1]
                marker.scale.z = last_known_box_size[0]
                marker.color = ColorRGBA(r=0., g=1., b=0., a=0.5)
                marker.lifetime = rospy.Duration()
                pub = rospy.Publisher("obj_fuse_bbox", Marker, queue_size=10)
                pub.publish(marker)
项目:arm_scenario_simulator    作者:cmaestre    | 项目源码 | 文件源码
def set_color(self, rgba, link="link", ambient_coeff=0.7, specular_coeff=0.7):
        rgba = list(rgba)
        if len(rgba) is 3: rgba +=[self.color_range]
        r,g,b,a = tuple(rgba)
        a = self.color_range if a is None else a
        message = MaterialColor()
        message.color_type = [COLOR_TYPE['diffuse'], COLOR_TYPE['ambient'], COLOR_TYPE['specular']]
        message.color.append(ColorRGBA(r/self.color_range, g/self.color_range, b/self.color_range, a/self.color_range))
        message.color.append(ColorRGBA(0.7*r/self.color_range, 0.7*g/self.color_range, 0.7*b/self.color_range, a/self.color_range))
        message.color.append(ColorRGBA(1-specular_coeff*(1-r/self.color_range), 1-specular_coeff*(1-g/self.color_range), 1-specular_coeff*(1-b/self.color_range), a/self.color_range))
        self.color_publishers[link].publish(message)
项目:arm_scenario_simulator    作者:cmaestre    | 项目源码 | 文件源码
def set_color(self, color):
        if len(color) is 3: color += [self.color_range]
        previous_color = self.color
        self.color = ColorRGBA(color[0]/self.color_range, color[1]/self.color_range, color[2]/self.color_range, color[3]/self.color_range)
        if self.color != previous_color and self._on : self._send_color_cmd(self.color)
项目:pybot    作者:spillai    | 项目源码 | 文件源码
def publish_pose_list(pub_ns, _poses, texts=[], stamp=None, size=0.05, frame_id='camera', seq=1):
    """
    Publish Pose List on:
    pub_channel: Channel on which the cloud will be published
    """
    poses = deepcopy(_poses)
    if not len(poses): return 
    arrs = np.vstack([pose.matrix[:3,:3].T.reshape((1,9)) for pose in poses]) * size
    arrX = np.vstack([pose.tvec.reshape((1,3)) for pose in poses])
    arrx, arry, arrz = arrs[:,0:3], arrs[:,3:6], arrs[:,6:9]

    # Point width, and height
    N = len(poses)

    markers = vis_msg.Marker(type=vis_msg.Marker.LINE_LIST, ns=pub_ns, action=vis_msg.Marker.ADD)

    markers.header.frame_id = frame_id
    markers.header.stamp = stamp if stamp is not None else rospy.Time.now()
    markers.header.seq = seq
    markers.scale.x = size/20 # 0.01
    markers.scale.y = size/20 # 0.01

    markers.color.a = 1.0

    markers.pose.position = geom_msg.Point(0,0,0)
    markers.pose.orientation = geom_msg.Quaternion(0,0,0,1)

    markers.points = []
    markers.lifetime = rospy.Duration()        

    for j in range(N): 
        markers.points.extend([geom_msg.Point(arrX[j,0], arrX[j,1], arrX[j,2]), 
                                       geom_msg.Point(arrX[j,0] + arrx[j,0], 
                                                       arrX[j,1] + arrx[j,1], 
                                                       arrX[j,2] + arrx[j,2])])
        markers.colors.extend([std_msg.ColorRGBA(1.0, 0.0, 0.0, 1.0), std_msg.ColorRGBA(1.0, 0.0, 0.0, 1.0)])

        markers.points.extend([geom_msg.Point(arrX[j,0], arrX[j,1], arrX[j,2]), 
                               geom_msg.Point(arrX[j,0] + arry[j,0], 
                                               arrX[j,1] + arry[j,1], 
                                               arrX[j,2] + arry[j,2])])
        markers.colors.extend([std_msg.ColorRGBA(0.0, 1.0, 0.0, 1.0), std_msg.ColorRGBA(0.0, 1.0, 0.0, 1.0)])


        markers.points.extend([geom_msg.Point(arrX[j,0], arrX[j,1], arrX[j,2]), 
                               geom_msg.Point(arrX[j,0] + arrz[j,0], 
                                               arrX[j,1] + arrz[j,1], 
                                               arrX[j,2] + arrz[j,2])])
        markers.colors.extend([std_msg.ColorRGBA(0.0, 0.0, 1.0, 1.0), std_msg.ColorRGBA(0.0, 0.0, 1.0, 1.0)])

    _publish_marker(markers)
项目:tea    作者:antorsae    | 项目源码 | 文件源码
def handle_radar_msg(msg, dont_fuse):
    assert msg._md5sum == '6a2de2f790cb8bb0e149d45d297462f8'

    publish_rviz_topics = True

    with g_fusion_lock:
        # do we have any estimation?
        if g_fusion.last_state_mean is not None:
            pose = g_fusion.lidar_observation_function(g_fusion.last_state_mean)

            observations = RadarObservation.from_msg(msg, RADAR_TO_LIDAR, 0.9115)

            # find nearest observation to current object position estimation
            distance_threshold = 4.4
            nearest = None
            nearest_dist = 1e9
            for o in observations:
                dist = [o.x - pose[0], o.y - pose[1], o.z - pose[2]]
                dist = np.sqrt(np.array(dist).dot(dist))

                if dist < nearest_dist and dist < distance_threshold:
                    nearest_dist = dist
                    nearest = o

            if nearest is not None:
                # FUSION
                if not dont_fuse:
                    g_fusion.filter(nearest)

                if publish_rviz_topics:
                    header = Header()
                    header.frame_id = '/velodyne'
                    header.stamp = rospy.Time.now()
                    point = np.array([[nearest.x, nearest.y, nearest.z]], dtype=np.float32)
                    rospy.Publisher(name='obj_radar_nearest',
                      data_class=PointCloud2,
                      queue_size=1).publish(pc2.create_cloud_xyz32(header, point))

                    #pose = g_fusion.lidar_observation_function(g_fusion.last_state_mean)

                    #br = ros_tf.TransformBroadcaster()
                    #br.sendTransform(tuple(pose), (0,0,0,1), rospy.Time.now(), 'car_fuse_centroid', 'velodyne')

#                     if last_known_box_size is not None:
#                         # bounding box
#                         marker = Marker()
#                         marker.header.frame_id = "car_fuse_centroid"
#                         marker.header.stamp = rospy.Time.now()
#                         marker.type = Marker.CUBE
#                         marker.action = Marker.ADD
#                         marker.scale.x = last_known_box_size[2]
#                         marker.scale.y = last_known_box_size[1]
#                         marker.scale.z = last_known_box_size[0]
#                         marker.color = ColorRGBA(r=1., g=1., b=0., a=0.5)
#                         marker.lifetime = rospy.Duration()
#                         pub = rospy.Publisher("car_fuse_bbox", Marker, queue_size=10)
#                         pub.publish(marker)
项目:spqrel_tools    作者:LCAS    | 项目源码 | 文件源码
def run(self):
        while self.is_looping():
            navigation_tf_msg = TFMessage()
            try:
                motion_to_robot = almath.Pose2D(self.motion.getRobotPosition(True))
                localization = self.navigation.getRobotPositionInMap()
                exploration_path = self.navigation.getExplorationPath()
            except Exception as e:
                navigation_tf_msg.transforms.append(self.get_navigation_tf(almath.Pose2D()))
                self.publishers["map_tf"].publish(navigation_tf_msg)
                self.rate.sleep()
                continue
            if len(localization) > 0 and len(localization[0]) == 3:
                navigation_to_robot = almath.Pose2D(localization[0][0], localization[0][1], localization[0][2])
                navigation_to_motion = navigation_to_robot * almath.pinv(motion_to_robot)
                navigation_tf_msg.transforms.append(self.get_navigation_tf(navigation_to_motion))
            self.publishers["map_tf"].publish(navigation_tf_msg)
            if len(localization) > 0:
                if self.publishers["uncertainty"].get_num_connections() > 0:
                    uncertainty = Marker()
                    uncertainty.header.frame_id = "/base_footprint"
                    uncertainty.header.stamp = rospy.Time.now()
                    uncertainty.type = Marker.CYLINDER
                    uncertainty.scale = Vector3(localization[1][0], localization[1][1], 0.2)
                    uncertainty.pose.position = Point(0, 0, 0)
                    uncertainty.pose.orientation = self.get_ros_quaternion(
                        almath.Quaternion_fromAngleAndAxisRotation(0, 0, 0, 1))
                    uncertainty.color = ColorRGBA(1, 0.5, 0.5, 0.5)
                    self.publishers["uncertainty"].publish(uncertainty)
            if self.publishers["map"].get_num_connections() > 0:
                aggregated_map = None
                try:
                    aggregated_map = self.navigation.getMetricalMap()
                except Exception as e:
                    print("error " + str(e))
                if aggregated_map is not None:
                    map_marker = OccupancyGrid()
                    map_marker.header.stamp = rospy.Time.now()
                    map_marker.header.frame_id = "/map"
                    map_marker.info.resolution = aggregated_map[0]
                    map_marker.info.width = aggregated_map[1]
                    map_marker.info.height = aggregated_map[2]
                    map_marker.info.origin.orientation = self.get_ros_quaternion(
                        almath.Quaternion_fromAngleAndAxisRotation(-1.57, 0, 0, 1))
                    map_marker.info.origin.position.x = aggregated_map[3][0]
                    map_marker.info.origin.position.y = aggregated_map[3][1]
                    map_marker.data = aggregated_map[4]
                    self.publishers["map"].publish(map_marker)
            if self.publishers["exploration_path"].get_num_connections() > 0:
                path = Path()
                path.header.stamp = rospy.Time.now()
                path.header.frame_id = "/map"
                for node in exploration_path:
                    current_node = PoseStamped()
                    current_node.pose.position.x = node[0]
                    current_node.pose.position.y = node[1]
                    path.poses.append(current_node)
                self.publishers["exploration_path"].publish(path)
            self.rate.sleep()
项目:TA_example_labs    作者:mit-racecar    | 项目源码 | 文件源码
def visualize(self):
        if self.circle_pub.get_num_connections() > 0 and self.circle_path:
            print "CIRCLE PUB"
            # root = self.space_explorer.tree_root
            # markers = [utils.marker_clear_all("/map")]
            # explored = self.non_overlapping_paths(root)
            # if type(explored[0]) == list:
            #   explored = reduce(lambda x,y: x+y, explored)
            # print "explored: ", len(explored)
            # markers += [utils.marker_from_circle(circle, index=i, linewidth=0.01, color=ColorRGBA(0, 1, 0, 0.1), lifetime=1.0) \
            #             for i, circle in enumerate(explored)]

            # marker_array = MarkerArray(markers=markers)
            # self.circle_pub.publish(marker_array)
            # print len(self.circle_path.states)
            # marker = utils.marker_from_circles(self.circle_path.states, 1, "Markers_NS2", "/map", 3.0, [1.,0.,0.])
            # self.circle_pub.publish(marker)

            markers = [utils.marker_clear_all("/map")]
            markers += [utils.marker_from_circle(circle, index=i, linewidth=0.05, color=ColorRGBA(1, 0, 0, 0.4), \
                        lifetime=4.0)
                        for i, circle in enumerate(self.circle_path.states)]
            # print len(markers)
            marker_array = MarkerArray(markers=markers)
            self.circle_pub.publish(marker_array)

        if self.should_refine_trajectory and self.fast_circle_pub.get_num_connections() > 0 and self.fast_path:
            print "FAST CIRCLE PUB"
            # root = self.path_planner.tree_root
            # markers = [utils.marker_clear_all("/map")]
            # explored = self.non_overlapping_paths(root)
            # if type(explored[0]) == list:
            #   explored = reduce(lambda x,y: x+y, explored)
            # print "explored: ", len(explored)
            # markers += [utils.marker_from_circle(circle, index=i, linewidth=0.01, color=ColorRGBA(0, 0, 1, 0.1), lifetime=1.0) \
            #             for i, circle in enumerate(explored)]
            markers = [utils.marker_clear_all("/map")]
            markers += [utils.marker_from_circle(circle, index=i, linewidth=0.05, color=ColorRGBA(1, 0, 0, 0.4), \
                        lifetime=4.0)
                        for i, circle in enumerate(self.fast_path.states)]

            marker_array = MarkerArray(markers=markers)
            self.fast_circle_pub.publish(marker_array)

        if self.show_exploration_buffer and self.exploration_pub.get_num_connections() > 0:
            print "Publishing exploration grid"
            self.exploration_pub.publish(self.map.get_exploration_occupancy_grid())
        # plt.imshow(self.map.exploration_buffer)
        # plt.imshow(self.map.permissible_region)
        # plt.show()
        print "visualize"