我们从Python开源项目中,提取了以下33个代码示例,用于说明如何使用geometry_msgs.msg.PointStamped()。
def __init__(self): rospy.init_node('gaze', anonymous=False) self.lock = threading.RLock() with self.lock: self.current_state = GazeState.IDLE self.last_state = self.current_state # Initialize Variables self.glance_timeout = 0 self.glance_timecount = 0 self.glance_played = False self.idle_timeout = 0 self.idle_timecount = 0 self.idle_played = False rospy.loginfo('\033[92m[%s]\033[0m waiting for bringup social_mind...'%rospy.get_name()) rospy.wait_for_service('environmental_memory/read_data') rospy.wait_for_service('social_events_memory/read_data') self.rd_memory = {} self.rd_memory['environmental_memory'] = rospy.ServiceProxy('environmental_memory/read_data', ReadData) self.rd_memory['social_events_memory'] = rospy.ServiceProxy('social_events_memory/read_data', ReadData) rospy.Subscriber('raising_events', RaisingEvents, self.handle_raising_events) rospy.Subscriber('gaze_focusing', String, self.handle_gaze_focusing) self.pub_gaze_cmd = rospy.Publisher('gaze_command', GazeCommand, queue_size=10) self.pub_viz_gaze_cmd = rospy.Publisher('visualization_gaze_cmd', PointStamped, queue_size=10) rospy.Timer(rospy.Duration(GAZE_CONTROLLER_PERIOD), self.handle_gaze_controller) rospy.loginfo('\033[92m[%s]\033[0m initialized...'%rospy.get_name()) rospy.spin()
def lookAt(self, pos, sim=False): goal = PointHeadGoal() point = PointStamped() point.header.frame_id = self.frame point.point.x = pos[0] point.point.y = pos[1] point.point.z = pos[2] goal.target = point # Point using kinect frame goal.pointing_frame = 'head_mount_kinect_rgb_link' if sim: goal.pointing_frame = 'high_def_frame' goal.pointing_axis.x = 1 goal.pointing_axis.y = 0 goal.pointing_axis.z = 0 goal.min_duration = rospy.Duration(1.0) goal.max_velocity = 1.0 self.pointHeadClient.send_goal_and_wait(goal)
def find_button(macro): macro.fc.send_stereo_camera() image, details = macro.fc.zarj.eyes.get_cloud_image_with_details(macro.fc.cloud) things = Things(image, details, 2) if things.array_button is not None: point = PointStamped() point.header = macro.fc.cloud.header det = things.array_button.computed_center point.point.x = det[0] point.point.y = det[1] point.point.z = det[2] button = macro.fc.zarj.transform.tf_buffer.transform(point, macro.fc.zarj.walk.lfname) log("button found at: {}/{}/{}".format(button.point.x, button.point.y, button.point.z)) macro.fc.points[0] = [ button.point.x, button.point.y, button.point.z ] macro.fc.points[1] = None return True return False
def find_plug(macro): macro.fc.send_stereo_camera() image, details = macro.fc.zarj.eyes.get_cloud_image_with_details(macro.fc.cloud) things = Things(image, details, 2) if things.power_plug: point = PointStamped() point.header = macro.fc.cloud.header det = details[things.power_plug.position[1], things.power_plug.position[0]] point.point.x = det[0] point.point.y = det[1] point.point.z = det[2] plug = macro.fc.zarj.transform.tf_buffer.transform(point, macro.fc.zarj.walk.lfname) log("plug at: {}/{}/{}".format(plug.point.x, plug.point.y, plug.point.z)) macro.fc.points[0] = [plug.point.x, plug.point.y, plug.point.z] macro.fc.points[1] = None return True return False
def find_repair_button(macro): image, details = macro.fc.zarj.eyes.get_cloud_image_with_details(macro.fc.cloud) things = Things(image, details, 3) if things.repair_button is not None and things.repair_button.computed_center is not None: point = PointStamped() point.header = macro.fc.cloud.header det = things.repair_button.computed_center point.point.x = det[0] point.point.y = det[1] point.point.z = det[2] button = macro.fc.zarj.transform.tf_buffer.transform(point, macro.fc.zarj.walk.lfname) log("button found at: {}/{}/{}".format(button.point.x, button.point.y, button.point.z)) return button.point return None
def _ensure_leak(self): """ If we do not have a leak detected assume our hand is in the right place""" global LAST_LEAK, LEAK_SIDE if LAST_LEAK is None: palm = self.fc.zarj.hands.get_current_hand_center_transform( 'left', 'world') LAST_LEAK = PointStamped() LAST_LEAK.header.frame_id = 'world' LAST_LEAK.point = palm.translation log("No leak previously detected.. Assuming left hand is correct") log('Leak at {}'.format(LAST_LEAK)) current = self.fc.zarj.hands.get_joint_states('left') LEAK_SIDE = current[0]['leftShoulderPitch'][0] > 0 log('Leak is behind us: '.format(LEAK_SIDE))
def __init__(self): rospy.Subscriber('/mavros/local_position/pose', PoseStamped, self.pose_callback, queue_size=1) rospy.Subscriber('/marker_data', MarkerArray, self.marker_callback, queue_size=1) self.vision_position_pub = rospy.Publisher('/mavros/vision_pose/pose', PoseStamped, queue_size=1) self.offset_pub = rospy.Publisher('~offset', PointStamped, queue_size=1) self.position_fcu_pub = rospy.Publisher('~position/fcu', PoseStamped, queue_size=1) self.pose_pub = rospy.Publisher('~position/marker_map', PoseStamped, queue_size=1) self.vision_position_message = PoseStamped() self.position_fcu_message = PoseStamped() self.position_fcu_message.header.frame_id = 'vision_fcu' self.pose_message = PoseStamped() self.pose_message.header.frame_id = 'marker_map' self.marker_position_offset = PointStamped()
def subscribePoint(): rospy.Subscriber('/clicked_point', PointStamped, printclickpoint)
def build_point(self, x, y, z, header): """ Take an X,Y,Z and convert it into a head point """ point = PointStamped() point.header = header point.point.x = x point.point.y = y point.point.z = z head_pnt = self.tf.tf_buffer.transform(point, self.frame_id) head_pnt.header.seq = header.seq return head_pnt
def _find_stair_base(self, cloud, distance, center, width): """ Try to find the base of the stairs really exactly """ _, details = self.zarj.eyes.get_cloud_image_with_details(cloud) occ = 0 pnt = None while pnt is None: if not np.isnan(details[distance][center-occ][0]): pnt = details[distance][center-occ] break if not np.isnan(details[distance][center+occ][0]): pnt = details[distance][center+occ] break occ += 1 if occ >= width/6: occ = 0 distance -= 1 if pnt is not None: point = PointStamped() point.header = cloud.header point.point.x = pnt[0] point.point.y = pnt[1] point.point.z = pnt[2] pnt = self.zarj.transform.tf_buffer.transform(point, self.zarj.walk.lfname) return pnt.point.x log("ERROR: Could not find stair base") return None
def find_button(self): cloud = self.zarj.eyes.get_stereo_cloud() image, details = self.zarj.eyes.get_cloud_image_with_details(cloud) things = Things(image, details, 2, True) if things.array_button is not None and things.array_button.computed_center is not None: button = PointStamped() button.header = cloud.header button.point.x = things.array_button.computed_center[0] button.point.y = things.array_button.computed_center[1] button.point.z = things.array_button.computed_center[2] button_in_foot = self.zarj.transform.tf_buffer.transform(button, self.zarj.walk.lfname) p = button_in_foot.point print "JPW sez button is {}/{}/{}".format(p.x, p.y, p.z)
def handle_mouse(self, event, x, y, flags, param): if event == cv2.EVENT_LBUTTONUP: point = PointStamped() point.header = self.cloud.header point.point.x = self.details[y, x][0] point.point.y = self.details[y, x][1] point.point.z = self.details[y, x][2] print x, y, self.details[y,x] result = self.zarj.transform.tf_buffer.transform(point, 'rightIndexFingerPitch1Link') print result
def find_choke(macro): macro.fc.send_stereo_camera() image, details = macro.fc.zarj.eyes.get_cloud_image_with_details(macro.fc.cloud) things = Things(image, details, 2) if things.choke_inner: point = PointStamped() point.header = macro.fc.cloud.header det = details[things.choke_inner.position[1], things.choke_inner.position[0]] point.point.x = det[0] point.point.y = det[1] point.point.z = det[2] inner = macro.fc.zarj.transform.tf_buffer.transform(point, macro.fc.zarj.walk.lfname) log("inner choke found at: {}/{}/{}".format(inner.point.x, inner.point.y, inner.point.z)) if things.choke_outer: point = PointStamped() point.header = macro.fc.cloud.header det = details[things.choke_outer.position[1], things.choke_outer.position[0]] point.point.x = det[0] point.point.y = det[1] point.point.z = det[2] outer = macro.fc.zarj.transform.tf_buffer.transform(point, macro.fc.zarj.walk.lfname) log("outer choke found at: {}/{}/{}".format(outer.point.x, outer.point.y, outer.point.z)) macro.fc.points[0] = [outer.point.x, outer.point.y, outer.point.z] macro.fc.points[1] = [inner.point.x, inner.point.y, inner.point.z] return True return False
def start(self, _=None): """ Start the macro """ global LAST_LEAK self.leak = None self.best_leak = 0.01 sub = rospy.Subscriber("/task3/checkpoint5/leak", Leak, self._leak_update) self.fc.zarj.neck.neck_control([0.35, 1.0, 0], True) self._check_distance() found, stage = self._full_cycle(0) if self.stop: return while found is not None and not found: log('Leak not found at : {}'.format(stage)) found, stage = self._full_cycle(stage) if self.stop: break if found: self.fc.process_get_palm_msg(ZarjGetPalmCommand('left')) joint_values = self.fc.zarj.hands.get_joint_values('left') ack = ZarjGetArmJointsResponse('left', joint_values) self.fc.zarj_comm.push_message(ack) log('leak found! stage {}'.format(stage)) palm_transform = self.fc.zarj.hands.get_current_hand_center_transform( 'left', 'world') LAST_LEAK = PointStamped() LAST_LEAK.header.frame_id = 'world' LAST_LEAK.point = palm_transform.translation log('Leak at {}'.format(LAST_LEAK)) log('Arm and palm positions updated') sub.unregister()
def clicked_pose(self, msg): ''' Receive pose messages from RViz and initialize the particle distribution in response. ''' if isinstance(msg, PointStamped): self.initialize_global() elif isinstance(msg, PoseWithCovarianceStamped): self.initialize_particles_pose(msg.pose.pose)
def __init__(self): rospy.init_node('tag_position_publisher', anonymous=True) self.rate = rospy.Rate(LocalizationNode.EVERY_SECOND) self.basestations = [] self.extractParams() self.frame = '/target_' + str(self.tag) self.ekf = ROSEKF(self.tag, self.basestations, selectBestPositions, Poly3(self.measurement_model_coeffs), self.var_z, Poly5(self.measurement_weight_model_coeffs), self.max_selection, self.debug) self.publisher = rospy.Publisher(self.frame, PointStamped, queue_size=10)
def createCommunicators(self): self.measurement_requester = rospy.Publisher('measurements_request', String, queue_size=10) self.measurement_reciver = rospy.Subscriber('measurements', MeasurementList, self.receiveMeasurements) self.position_publisher = rospy.Publisher('/target_' + str(self.tag), PointStamped, queue_size=10)
def publishPosition(self): msg = PointStamped() msg.header.stamp = rospy.Time.now() msg.header.frame_id = '/target_' + str(self.tag) msg.point = Point(self.estimated_position[0], self.estimated_position[1], 0) self.position_publisher.publish(msg) self.publishCovarianceMatrix()
def __init__(self, address, x, y): self.address = address self.position = np.array([x, y]) self.frame = "basestation" self.station_id = address.split('.', 4)[-1] self.custom_frame = self.frame + '_' + self.station_id self.publisher = rospy.Publisher(self.frame, PointStamped, queue_size=10)
def publishPosition(self): msg = PointStamped() msg.header.stamp = rospy.Time.now() msg.header.frame_id = self.frame msg.point = Point(self.position[0], self.position[1], 0) self.publisher.publish(msg)
def __init__(self, address, x, y): self.address = address self.position = np.array([x, y]) self.frame = "basestation" self.station_id = address.split('.', 4)[-1] self.namespace = self.frame + '_' + self.station_id self.publisher = rospy.Publisher(self.frame, PointStamped, queue_size=10)
def __init__(self): self.save_path = os.path.join(rospy.get_param("~save_path"), time.strftime("%Y-%m-%d-%H-%M-%S") + ".traj") self.trajectory = LineTrajectory("/built_trajectory") # receive either goal points or clicked points self.publish_point_sub = rospy.Subscriber("/clicked_point", PointStamped, self.clicked_point_callback, queue_size=1) self.nav_goal_sub = rospy.Subscriber("/move_base_simple/goal", PoseStamped, self.clicked_point_callback, queue_size=1) # save the built trajectory on shutdown rospy.on_shutdown(self.saveTrajectory)
def clicked_point_callback(self, msg): if isinstance(msg, PointStamped): self.trajectory.addPoint(msg.point) elif isinstance(msg, PoseStamped): self.trajectory.addPoint(msg.pose.position) # publish visualization of the path being built self.trajectory.publish_viz()
def __init__(self): self.rate = rospy.get_param("~rate", 20.0) self.period = 1.0 / self.rate # angular mode maps angular z directly to steering angle # (adjusted appropriately) # non-angular mode is somewhat suspect, but it turns # a linear y into a command to turn just so that the # achieved linear x and y match the desired, though # the vehicle has to turn to do so. # Non-angular mode is not yet supported. self.angular_mode = rospy.get_param("~angular_mode", True) # Not used yet # self.tf_buffer = tf2_ros.Buffer() # self.tf = tf2_ros.TransformListener(self.tf_buffer) # broadcast odometry self.br = tf2_ros.TransformBroadcaster() self.ts = TransformStamped() self.ts.header.frame_id = "map" self.ts.child_frame_id = "base_link" self.ts.transform.rotation.w = 1.0 self.angle = 0 # The cmd_vel is assumed to be in the base_link frame centered # on the middle of the two driven wheels # This is half the distance between the two drive wheels self.base_radius = rospy.get_param("~base_radius", 0.06) self.wheel_radius = rospy.get_param("~wheel_radius", 0.04) self.left_wheel_joint = rospy.get_param("~left_wheel_joint", "wheel_front_left_joint") self.right_wheel_joint = rospy.get_param("~right_wheel_joint", "wheel_front_right_joint") self.point_pub = rospy.Publisher("cmd_vel_spin_center", PointStamped, queue_size=1) self.joint_pub = {} self.joint_pub['left'] = rospy.Publisher("front_left/joint_state", JointState, queue_size=1) self.joint_pub['right'] = rospy.Publisher("front_right/joint_state", JointState, queue_size=1) # TODO(lucasw) is there a way to get TwistStamped out of standard # move_base publishers? # TODO(lucasw) make this more generic, read in a list of any number of wheels # the requirement is that that all have to be aligned, and also need # a set spin center. self.ind = {} self.joint_state = {} self.joint_state['left'] = JointState() self.joint_state['left'].name.append(self.left_wheel_joint) self.joint_state['left'].position.append(0.0) self.joint_state['left'].velocity.append(0.0) self.joint_state['right'] = JointState() self.joint_state['right'].name.append(self.right_wheel_joint) self.joint_state['right'].position.append(0.0) self.joint_state['right'].velocity.append(0.0) self.cmd_vel = Twist() rospy.Subscriber("cmd_vel", Twist, self.cmd_vel_callback, queue_size=2) self.timer = rospy.Timer(rospy.Duration(self.period), self.update)
def detection(self, hsv_image): """Check for detection in the image """ mask = cv2.inRange(hsv_image, self.color_low, self.color_high) if self.baseline_cnt > 0: nmask = cv2.bitwise_not(mask) if self.baseline is None: rospy.loginfo("getting baseline for {}".format(self.name)) self.baseline = nmask else: self.baseline = cv2.bitwise_or(nmask, self.baseline) mask = cv2.bitwise_and(mask, self.baseline) count = cv2.countNonZero(mask) + self.baseline_fuzzy self.low_count = max(self.low_count, count) self.high_count = self.low_count + self.baseline_fuzzy self.baseline_cnt -= 1 return elif self.baseline is not None: mask = cv2.bitwise_and(mask, self.baseline) count = cv2.countNonZero(mask) if count > self.low_count and self.active is None: self.active = rospy.get_rostime() rospy.loginfo("{} ACTIVE ({})".format(self.name, count)) self.cloud.reset() if self.callbacks[0] is not None: self.callbacks[0](self.name) elif self.active is not None and count < self.high_count: rospy.loginfo("{} GONE ({})".format(self.name, count)) self.cloud.reset() self.active = None if self.callbacks[2] is not None: self.published = False self.report_count = 0 if self.callbacks[1] is not None: self.callbacks[1](self.name) # DEBUGGING to see what the masked image for the request is if self.debug: cv2.namedWindow(self.name, cv2.WINDOW_NORMAL) if self.baseline is not None: cv2.namedWindow(self.name+'_baseline', cv2.WINDOW_NORMAL) cv2.imshow(self.name+'_baseline', self.baseline) cv2.imshow(self.name, mask) cv2.waitKey(1) # to to see if we notify the location callback if self.is_active() and self.report_count > self.min_reports: now = rospy.get_rostime() if (self.active + self.stablize_time) < now: self.published = True point = PointStamped() center = self.cloud.find_center() point.header.seq = 1 point.header.stamp = now point.header.frame_id = self.frame_id point.point.x = center[0] point.point.y = center[1] point.point.z = center[2] if self.callbacks[2] is not None: self.callbacks[2](self.name, point)
def process_palm_msg(self, msg): if math.isnan(msg.x) or math.isnan(msg.y) or math.isnan(msg.z): self.oclog("Warning - NAN position found. skipping"); return point_vector = Vector3(msg.x, msg.y, msg.z) if not msg.relative: point = PointStamped() point.point = point_vector if self.cloud is None: point.header.seq = 1 point.header.stamp = rospy.get_rostime() - rospy.Duration(0.1) else: point.header = deepcopy(self.cloud.header) if msg.use_left_foot: point.header.frame_id = self.zarj.walk.lfname desired_frame = self.zarj.transform.lookup_transform('world', point.header.frame_id, point.header.stamp) result = self.zarj.transform.tf_buffer.transform(point, 'world') if math.isnan(msg.yaw) or math.isnan(msg.pitch) or math.isnan(msg.roll): current = self.zarj.hands.get_current_hand_center_transform(msg.sidename) rotation = current.rotation else: desired_rotation = quaternion_multiply( msg_q_to_q(desired_frame.transform.rotation), yaw_pitch_roll_to_q([msg.yaw, msg.pitch, msg.roll]) ) rotation = q_to_msg_q(desired_rotation) self.zarj.hands.move_hand_center_abs(msg.sidename, result.point, rotation) self.oclog("Palm move to {}, {}, {}; yaw {}, pitch {}, roll {} commanded".format( fmt(msg.x), fmt(msg.y), fmt(msg.z), fmt(msg.yaw), fmt(msg.pitch), fmt(msg.roll))) else: if math.isnan(msg.yaw): msg.yaw = 0.0 if math.isnan(msg.pitch): msg.pitch = 0.0 if math.isnan(msg.roll): msg.roll = 0.0 self.zarj.hands.move_with_yaw_pitch_roll(msg.sidename, point_vector, [ msg.yaw, msg.pitch, msg.roll ]) self.oclog("Relative palm move to {}, {}, {} commanded; yaw {}, pitch {}, roll {}".format( fmt(msg.x), fmt(msg.y), fmt(msg.z), fmt(msg.yaw), fmt(msg.pitch), fmt(msg.roll)))
def _find_control(self, tag, mask, details, header, factor, other_one=None): """ Find a control """ corners = [None, None] indexes = mask.nonzero() points = [] indexes2 = [] for i in range(len(indexes[1])): pnt = details[indexes[0][i], indexes[1][i]] if pnt[2] > DISH_MAXIMUM: print 'rejecting point, too distant ', pnt[2] mask[indexes[0][i], indexes[1][i]] = 0 continue if other_one: dist = np.sqrt((other_one[0].point.y - pnt[0])**2 + \ (other_one[0].point.z - pnt[1])**2 +\ (other_one[0].point.x - pnt[2])**2) if dist > 1.0: print 'rejecting point, too far away from other one', dist mask[indexes[0][i], indexes[1][i]] = 0 continue points.append(details[indexes[0][i], indexes[1][i]]) indexes2.append((indexes[0][i], indexes[1][i])) points = np.array(points) if len(points) < 3: log("Error: only {} {} points found.".format(len(points), tag)) raise ZarjConfused('Error: only {} {} points found.'.format( len(points), tag)) mean = np.mean(points, axis=0)[2] stdv = np.std(points, axis=0)[2] for i, pnt in enumerate(points): if abs(pnt[2] - mean) < factor * stdv: _add_bounding_point(pnt, corners) else: mask[indexes[0][i], indexes[1][i]] = 0 point = PointStamped() point.header = header point.point.x = corners[0][0] point.point.y = corners[0][1] point.point.z = corners[0][2] cor1 = self.fc.zarj.transform.tf_buffer.transform(point, 'pelvis') point = PointStamped() point.header = header point.point.x = corners[1][0] point.point.y = corners[1][1] point.point.z = corners[1][2] cor2 = self.fc.zarj.transform.tf_buffer.transform(point, 'pelvis') log("{}, {} {}".format(tag, cor1, cor2)) return (cor1, cor2)
def _find_feature(self, tag, low, high): """ Find a feature on the door""" # we only want the center of the image cloud = self.fc.zarj.eyes.get_stereo_cloud() image, details = self.fc.zarj.eyes.get_cloud_image_with_details(cloud) shape = image.shape image = image[0:shape[0], shape[1]/3:2*shape[1]/3] details = details[0:shape[0], shape[1]/3:2*shape[1]/3] colors = cv2.cvtColor(image, cv2.COLOR_BGR2HSV) image_dump = os.environ.get("ZARJ_IMAGE_DUMP") mask = cv2.inRange(colors, low, high) if image_dump is not None: image_idx = rospy.get_time() cv2.imwrite(image_dump+'/door_{}.png'.format(image_idx), image) cv2.imwrite(image_dump+'/door_{}_{}.png'.format(tag, image_idx), mask) indexes = mask.nonzero() points = [] for i in range(len(indexes[1])): pnt = details[indexes[0][i], indexes[1][i]] if pnt[2] > 2.0: print 'Discard non {} point, too far away'.format(tag) continue points.append(details[indexes[0][i], indexes[1][i]]) points = np.array(points) if len(points) < 10: log("Failed to find {} in the door".format(tag)) return None avg = np.mean(points, axis=0) point = PointStamped() point.header = cloud.header point.point.x = avg[0] point.point.y = avg[1] point.point.z = avg[2] final = self.fc.zarj.transform.tf_buffer.transform(point, 'pelvis') log('{} located at about {}'.format(tag, final.point)) return final.point.y
def __init__(self): self.rate = rospy.get_param("~rate", 20.0) self.period = 1.0 / self.rate # angular mode maps angular z directly to steering angle # (adjusted appropriately) # non-angular mode is somewhat suspect, but it turns # a linear y into a command to turn just so that the # achieved linear x and y match the desired, though # the vehicle has to turn to do so. self.angular_mode = rospy.get_param("~angular_mode", True) self.tf_buffer = tf2_ros.Buffer() self.tf = tf2_ros.TransformListener(self.tf_buffer) self.steer_link = rospy.get_param("~steer_link", "lead_steer") self.steer_joint = rospy.get_param("~steer_joint", "lead_steer_joint") # +/- this angle self.min_steer_angle = rospy.get_param("~min_steer_angle", -0.7) self.max_steer_angle = rospy.get_param("~max_steer_angle", 0.7) self.wheel_joint = rospy.get_param("~wheel_joint", "wheel_lead_axle") self.wheel_radius = rospy.get_param("~wheel_radius", 0.15) # the spin center is always on the fixed axle y axis of the fixed axle, # it is assume zero rotation on the steer_joint puts the steering # at zero rotation with respect to fixed axle x axis (or xz plane) self.fixed_axle_link = rospy.get_param("~fixed_axle_link", "back_axle") self.point_pub = rospy.Publisher("cmd_vel_spin_center", PointStamped, queue_size=1) self.steer_pub = rospy.Publisher("steer_joint_states", JointState, queue_size=1) # TODO(lucasw) is there a way to get TwistStamped out of standard # move_base publishers? self.joint_state = JointState() self.joint_state.name.append(self.steer_joint) self.joint_state.position.append(0.0) self.joint_state.velocity.append(0.0) self.joint_state.name.append(self.wheel_joint) self.joint_state.position.append(0.0) self.joint_state.velocity.append(0.0) self.cmd_vel = Twist() rospy.Subscriber("cmd_vel", Twist, self.cmd_vel_callback, queue_size=2) self.timer = rospy.Timer(rospy.Duration(self.period), self.update)