我们从Python开源项目中,提取了以下13个代码示例,用于说明如何使用sensor_msgs.msg.LaserScan()。
def build_laser_scan(self, ranges): result = LaserScan() result.header.stamp = rospy.Time.now() result.angle_min = -almath.PI result.angle_max = almath.PI if len(ranges[1]) > 0: result.angle_increment = (result.angle_max - result.angle_min) / len(ranges[1]) result.range_min = 0.0 result.range_max = ranges[0] for range_it in ranges[1]: result.ranges.append(range_it[1]) return result # do it!
def __init__(self): """Constructor for the class initialize topic subscription and instance variables """ self.r = rospy.Rate(5) self.publisher = rospy.Publisher('/cmd_vel', Twist, queue_size=10) rospy.Subscriber('/scan', LaserScan, self.process_scan) # user chooses which parking mode to be performed self.is_parallel = rospy.get_param('~parallel', False) # Instance Variables self.timestamp1 = None self.timestamp2 = None self.dist2Neato = None self.dist2Wall = None self.widthOfSpot = None self.twist = None self.radius = None # Adjusment to be made before moving along the arc self.adjustment = 0 self.isAligned = False
def __init__(self): #constants for racecar speed and angle calculations self.pSpeed = 0.3 #tweak self.pAngle = 1 #positive charge behind racecar to give it a "kick" (forward vector) self.propelling_charge = 4.5 #more constants self.charge = 0.01 self.safety_threshold = 0.3 self.speeds = [1] #Creates a list of speeds self.stuck_time = 0 self.stuck = False self.stuck_threshold = 2 rospy.init_node("explorer") self.sub = rospy.Subscriber('scan', LaserScan, self.laser_callback) self.pub = rospy.Publisher('/vesc/ackermann_cmd_mux/input/navigation', AckermannDriveStamped) rospy.on_shutdown(self.shutdown)
def run(self): """ Run our code """ rospy.loginfo("Start laser test code") #self.blank_image() #for x in range(0, 99): # self.add_point(x * 1.0, x * 1.0, 10) #cv2.imshow("Hackme", self.img) #cv2.waitKey(0) self.joint_subscriber = rospy.Subscriber("/multisense/joint_states", JointState, self.recv_joint_state) self.scan_subscriber = rospy.Subscriber("/multisense/lidar_scan", LaserScan, self.recv_scan) self.spindle = rospy.Publisher("/multisense/set_spindle_speed", Float64, queue_size=10) self.set_x_range(-1.0, 1.0) self.set_y_range(-1.0, 1.0) self.set_z_range(0.0, 2.0) rospy.sleep(0.1) rospy.loginfo("Setting spindle going") self.spindle.publish(Float64(1.0)) #self.zarj.zps.look_down() rospy.loginfo("Spin forever, hopefully receiving data...") while True: rospy.sleep(1.0) #print "Got cloud {}x{}".format(resp.cloud.height, resp.cloud.width) #img = self.create_image_from_cloud(resp.cloud.points) #cv2.destroyAllWindows() #print "New image" #cv2.imshow("My image", img) #cv2.waitKey(1) #print resp #cv_image = self.bridge.imgmsg_to_cv2(resp.cloud, "bgr8") #cv2.imshow("Point cloud", cv_image)
def laser_listener(): rospy.init_node('laser_listener', anonymous=True) rospy.Subscriber("/scan", LaserScan,callback) rospy.spin()
def publish_scan(self, angles, ranges): # publish the given angels and ranges as a laser scan message ls = LaserScan() ls.header = Utils.make_header("laser", stamp=self.last_stamp) ls.angle_min = np.min(angles) ls.angle_max = np.max(angles) ls.angle_increment = np.abs(angles[0] - angles[1]) ls.range_min = 0 ls.range_max = np.max(ranges) ls.ranges = ranges self.pub_fake_scan.publish(ls)
def setup(): """ Setup ROS node. Create listener and talker threads """ global pub_cmd_vel global srv_reset_positions print("\n Initiating ROS Node") rospy.init_node(NODE_NAME, anonymous=True) # ToDo: Change to 'odom' in ROS launch for Giraff: rospy.Subscriber('odom_giraff', Odometry, callback_odom) rospy.Subscriber('laser_scan', LaserScan, callback_base_scan) pub_cmd_vel = rospy.Publisher( 'cmd_vel', Twist, queue_size=1, tcp_nodelay=True) srv_reset_positions = rospy.ServiceProxy('reset_positions', Empty) thread_listener = Thread(target=listener, args=[]) thread_talker = Thread(target=talker, args=[]) thread_listener.start() thread_talker.start() rospy.loginfo("Node Initiated: %s", NODE_NAME) return
def __init__(self): self.received_data = None self.parsed_data = None self.angles = None self.bins = None self.averages = None self.sub = rospy.Subscriber("/scan", LaserScan, self.lidarCB, queue_size=1) self.pub = rospy.Publisher("/vesc/low_level/ackermann_cmd_mux/input/safety",\ AckermannDriveStamped, queue_size =1 ) self.thread = Thread(target=self.drive) self.thread.start() rospy.loginfo("safety node initialized")
def publish_scan(self, angles, ranges): ls = LaserScan() ls.header = Utils.make_header("laser", stamp=self.last_stamp) ls.angle_min = np.min(angles) ls.angle_max = np.max(angles) ls.angle_increment = np.abs(angles[0] - angles[1]) ls.range_min = 0 ls.range_max = np.max(ranges) ls.ranges = ranges self.pub_fake_scan.publish(ls)
def __init__(self): #constants for racecar speed and angle calculations self.pSpeed = 2 #tweak self.pAngle = .7 #positive charge behind racecar to give it a "kick" (forward vector) self.propelling_charge = 5 #more constants self.charge = 0.01 self.safety_threshold = 0.3 self.speeds = [1] #Creates a list of speeds self.stuck_time = 0 self.stuck = False self.stuck_threshold = 2 self.e1=0 rospy.init_node("explorer") self.sub = rospy.Subscriber('scan', LaserScan, self.laser_callback) self.pub = rospy.Publisher('/vesc/ackermann_cmd_mux/input/navigation', AckermannDriveStamped) rospy.on_shutdown(self.shutdown)
def __init__(self): super(Robot, self).__init__() rospy.init_node('betelgeuse-ai') self.speed = 0.0 self.turn = 0.0 self.drift = 0.0 self.drive_pub = rospy.Publisher("/ai/ai/drive_command", KamaroDriveCommand, queue_size=1) self.lidar_front_sub = rospy.Subscriber("/lidar_filtered/lidar_front_scan_filtered", LaserScan, self._on_lidar_front) self.lidar_back_sub = rospy.Subscriber("/lidar_filtered/lidar_back_scan_filtered", LaserScan, self._on_lidar_back) self.odom_sub = rospy.Subscriber("/odom/fused", Odometry, self._on_odometry)
def __init__(self, max_random_start, observation_dims, data_format, display, use_cumulated_reward): self.max_random_start = max_random_start self.action_size = 28 self.use_cumulated_reward = use_cumulated_reward self.display = display self.data_format = data_format self.observation_dims = observation_dims self.screen = np.zeros((self.observation_dims[0],self.observation_dims[1]), np.uint8) #self.dirToTarget = 0 self.robotPose = Pose() self.goalPose = Pose() self.robotRot = 0.0 self.prevDist = 0.0 self.boom = False self.numWins = 0 self.ep_reward = 0.0 self.terminal = 0 self.sendTerminal = 0 self.readyForNewData = True rospy.wait_for_service('reset_positions') self.resetStage = rospy.ServiceProxy('reset_positions', EmptySrv) # Subscribers: # rospy.Subscriber('base_scan', LaserScan, self.scanCB,queue_size=1) # rospy.Subscriber('base_pose_ground_truth', Odometry, self.poseUpdateCB, queue_size=1) # trying time sync: sub_scan_ = message_filters.Subscriber('base_scan', LaserScan) sub_pose_ = message_filters.Subscriber('base_pose_ground_truth', Odometry) ts = message_filters.TimeSynchronizer( [sub_scan_, sub_pose_], 1) ts.registerCallback( self.syncedCB) # publishers: self.pub_vel_ = rospy.Publisher('cmd_vel', Twist, queue_size = 1) self.pub_rew_ = rospy.Publisher("lastreward",Float64,queue_size=10) self.pub_goal_ = rospy.Publisher("goal", Pose, queue_size = 15)
def __init__(self, direction): if direction not in [RIGHT, LEFT]: rospy.loginfo("incorect %s wall selected. choose left or right") rospy.signal_shutdown() self.direction = direction if SHOW_VIS: self.viz = DynamicPlot() self.viz.initialize() self.pub = rospy.Publisher("/vesc/high_level/ackermann_cmd_mux/input/nav_0",\ AckermannDriveStamped, queue_size =1 ) self.sub = rospy.Subscriber("/scan", LaserScan, self.lidarCB, queue_size=1) if PUBLISH_LINE: self.line_pub = rospy.Publisher("/viz/line_fit", PolygonStamped, queue_size =1 ) # computed control instructions self.control = None self.steering_hist = CircularArray(HISTORY_SIZE) # containers for laser scanner related data self.data = None self.xs = None self.ys = None self.m = 0 self.c = 0 # flag to indicate the first laser scan has been received self.received_data = False # cached constants self.min_angle = None self.max_angle = None self.direction_muliplier = 0 self.laser_angles = None self.drive_thread = Thread(target=self.apply_control) self.drive_thread.start() if SHOW_VIS: while not rospy.is_shutdown(): self.viz_loop() rospy.sleep(0.1)