We have extracted the following 39 code examples from open-source Python projects to illustrate how to use std_msgs.msg.Header().
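Before the project examples, here is a minimal, self-contained sketch of the pattern they all share: construct a Header, fill in stamp and frame_id, and attach it to a stamped message before publishing. The node and topic names below are illustrative, and the sketch assumes a running ROS master.

# Minimal sketch (illustrative node/topic names; assumes a running ROS master).
import rospy
from std_msgs.msg import Header
from geometry_msgs.msg import Point, PointStamped

rospy.init_node('header_example')
pub = rospy.Publisher('example_point', PointStamped, queue_size=1)
rospy.sleep(0.5)  # give the publisher a moment to register with the master

header = Header()
header.stamp = rospy.Time.now()   # time the data was captured
header.frame_id = 'base'          # coordinate frame the data is expressed in

pub.publish(PointStamped(header=header, point=Point(1.0, 0.0, 0.0)))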
def ik_solve(limb, pos, orient):
    ns = "ExternalTools/" + limb + "/PositionKinematicsNode/IKService"
    iksvc = rospy.ServiceProxy(ns, SolvePositionIK)
    ikreq = SolvePositionIKRequest()

    hdr = Header(stamp=rospy.Time.now(), frame_id='base')
    poses = {
        str(limb): PoseStamped(header=hdr,
                               pose=Pose(position=pos, orientation=orient))}

    ikreq.pose_stamp.append(poses[limb])
    try:
        rospy.wait_for_service(ns, 5.0)
        resp = iksvc(ikreq)
    except (rospy.ServiceException, rospy.ROSException), e:
        rospy.logerr("Service call failed: %s" % (e,))
        return 1
    if resp.isValid[0]:
        limb_joints = dict(zip(resp.joints[0].name, resp.joints[0].position))
        return limb_joints
    else:
        return Errors.RaiseNotReachable(pos)
def __init__(self):
    # init code
    rospy.init_node("robotGame")
    self.currentDist = 1
    self.previousDist = 1
    self.reached = False
    self.tf = TransformListener()
    self.left_joint_names = ['yumi_joint_1_l', 'yumi_joint_2_l', 'yumi_joint_3_l', 'yumi_joint_4_l',
                             'yumi_joint_5_l', 'yumi_joint_6_l', 'yumi_joint_7_l', 'gripper_l_joint']
    self.right_joint_names = ['yumi_joint_1_r', 'yumi_joint_2_r', 'yumi_joint_3_r', 'yumi_joint_4_r',
                              'yumi_joint_5_r', 'yumi_joint_6_r', 'yumi_joint_7_r', 'gripper_r_joint']
    self.left_positions = [-2.01081820427463881, 1.4283937236421274, -1.3593418228836045, -0.19315625641494183,
                           1.7016501799872579, 0.6573540231496411, 3.404315594906305, 0.0]
    self.right_positions = [0.01081820427463881, 2.4283937236421274, 0.3593418228836045, -0.19315625641494183,
                            1.7016501799872579, 0.6573540231496411, 3.404315594906305, 1.8145107750466565]
    self.rjv = []
    self.ljv = []
    self.pub = rospy.Publisher('my_joint_states', JointState, queue_size=1)
    self.js = JointState()
    self.js.header = Header()
    self.js.name = self.left_joint_names + self.right_joint_names
    self.js.velocity = []
    self.js.effort = []
    self.sub = rospy.Subscriber('/joint_states', JointState, self.jsCB)
    self.destPos = np.random.uniform(0, 0.25, size=(3))
    self.reset()
def publish_interframe_motion(self, last_features, new_features, status, err):
    self.good_old = last_features[(status == 1) & (err < 12.0)]
    self.good_new = new_features[(status == 1) & (err < 12.0)]

    # TODO: clean up these features before publishing
    self.pub_keypoint_motion.publish(
        header=Header(
            stamp=rospy.Time.now(),  # TODO: use camera image time
            frame_id='tango_camera_2d'
        ),
        prev_x=self.good_old[:, 0],
        prev_y=self.good_old[:, 1],
        cur_x=self.good_new[:, 0],
        cur_y=self.good_new[:, 1]
    )
def __get_point_msg(cls, data, frame):
    """
    Deserializes a location to a message of type PointStamped.

    Args:
        data: A dictionary containing the location.
        frame: Frame for the point.

    Returns:
        A message of type PointStamped with the location.
    """
    header = Header()
    header.stamp = rospy.get_rostime()
    header.frame_id = frame

    msg = GeoPointStamped()
    msg.header = header
    msg.position.latitude = data["latitude"]
    msg.position.longitude = data["longitude"]

    return msg
def compute_fk_client(self, group, joint_values, links):
    rospy.wait_for_service('compute_fk')
    try:
        compute_fk = rospy.ServiceProxy('compute_fk', GetPositionFK)

        header = Header()
        header.stamp = rospy.Time.now()
        header.frame_id = group.get_pose_reference_frame()

        rs = RobotState()
        rs.joint_state.header = header
        rs.joint_state.name = group.get_active_joints()
        rs.joint_state.position = joint_values

        res = compute_fk(header, links, rs)

        return res.pose_stamped
    except rospy.ServiceException, e:
        print "Service call failed: %s" % e
def process_radar_tracks(msg):
    assert msg._md5sum == '6a2de2f790cb8bb0e149d45d297462f8'

    tracks = RadarObservation.from_msg(msg)
    num_tracks = len(tracks)

    acc = []
    cloud = np.zeros([num_tracks, 3], dtype=np.float32)
    for i, track in enumerate(tracks):
        cloud[i] = [track.x, track.y, track.z] - np.array(RADAR_TO_LIDAR)

        if np.abs(track.y) < 2:
            #acc.append(track.x)
            #acc.append(track.power)
            print track.vx*3.7, track.vy*3.7

    #print x, y, z
    #print acc

    #msg.header.stamp
    header = Header()
    header.stamp = msg.header.stamp
    header.frame_id = 'velodyne'
    cloud_msg = pc2.create_cloud_xyz32(header, cloud)
    cloud_msg.width = 1
    cloud_msg.height = num_tracks

    rospy.Publisher('radar_points', PointCloud2, queue_size=1).publish(cloud_msg)
def publish():
    header = Header()
    header.stamp = rospy.Time.now()
    header.frame_id = 'map'

    for role, robot_id in WorldModel.assignments.items():
        if robot_id is not None:
            command = WorldModel.commands[role]

            if command.velocity_control_enable:
                msg = TwistStamped()
                msg.header = header
                msg.twist = command.target_velocity
                pubs_velocity[robot_id].publish(msg)
            else:
                send_pose = WorldModel.tf_listener.transformPose("map", command.target_pose)
                send_pose.header.stamp = rospy.Time.now()
                pubs_position[robot_id].publish(send_pose)

            pubs_kick_velocity[robot_id].publish(Float32(command.kick_power))

            status = AIStatus()
            # TODO(Asit) use navigation_enable instead avoidBall.
            status.avoidBall = command.navigation_enable
            status.do_chip = command.chip_enable
            status.dribble_power = command.dribble_power
            pubs_ai_status[robot_id].publish(status)

            command.reset_adjustments()
def test_import_absolute_msg(self):
    print_importers()

    # Verify that files exists and are importable
    import std_msgs.msg as std_msgs

    self.assert_std_message_classes(std_msgs.Bool, std_msgs.Header)
def test_import_class_from_absolute_msg(self):
    """Verify that"""
    print_importers()

    # Verify that files exists and are importable
    from std_msgs.msg import Bool, Header

    self.assert_std_message_classes(Bool, Header)
def test_double_import_uses_cache(self):
    print_importers()

    import std_msgs.msg as std_msgs

    self.assert_std_message_classes(std_msgs.Bool, std_msgs.Header)

    import std_msgs.msg as std_msgs2

    self.assertTrue(std_msgs == std_msgs2)
def test_import_absolute_msg(self):
    print_importers()

    # Verify that files exists and are importable
    import std_msgs.msg as std_msgs
    print_importers_of(std_msgs)

    self.assert_std_message_classes(std_msgs.Bool, std_msgs.Header)
def _vector_to(self, vector, to='base'):
    h = Header()
    h.stamp = rospy.Time(0)
    h.frame_id = '{}_gripper'.format(self.limb_name)
    v = Vector3(*vector)
    v_base = self.tl.transformVector3(to, Vector3Stamped(h, v)).vector
    v_cartesian = [v_base.x, v_base.y, v_base.z, 0, 0, 0]
    return v_cartesian
def pick(self, pose, direction=(0, 0, 1), distance=0.1):
    pregrasp_pose = self.translate(pose, direction, distance)
    while True:
        try:
            #stamped_pose = PoseStamped( Header(0, rospy.Time(0), n.robot.base), pose)
            self.move_ik(pregrasp_pose)
            break
        except AttributeError:
            print("can't find valid pose for gripper cause I'm dumb")
            return False
    rospy.sleep(0.5)
    # We want to block end effector opening so that the next
    # movement happens with the gripper fully opened.
    #self.gripper.open()
    while True:
        #rospy.loginfo("Going down to pick (at {})".format(self.tip.max()))
        if(not self.sensors_updated()):
            return
        if self.tip.max() > 2000:
            break
        else:
            scaled_direction = (di / 100 for di in direction)
            #print("Scaled direction: ", scaled_direction)
            v_cartesian = self._vector_to(scaled_direction)
            v_cartesian[2] = -.01
            #print("cartesian: ", v_cartesian)
            v_joint = self.compute_joint_velocities(v_cartesian)
            #print("joint : ", v_joint)
            self.limb.set_joint_velocities(v_joint)
    rospy.loginfo('Went down!')
def _transform_vector(self, v, mode='speed', to='base'):
    h = Header()
    h.stamp = rospy.Time(0)
    h.frame_id = '{}_gripper'.format(self.bx.limb_name)
    v = Vector3(*v)
    v_base = self.tl.transformVector3(to, Vector3Stamped(h, v)).vector
    if mode == 'speed':
        v_cartesian = np.array([0, 0, 0, v_base.x, v_base.y, v_base.z])
    elif mode == 'direction':
        v_cartesian = np.array([v_base.x, v_base.y, v_base.z, 0, 0, 0])
    return v_cartesian
def move_calib_position(self):
    # move arm to the calibration position
    self.calib_pose = PoseStamped(
        Header(0, rospy.Time(0), self.robot.base),
        Pose(Point(0.338520675898, -0.175860479474, 0.0356775075197),
             Quaternion(-0.0183493755758, 0.708424150944,
                        0.704712092876, 0.0343413949013)))
    self.robot.move_ik(self.calib_pose)
def move_ik(self, stamped_pose):
    """Take a PoseStamped and move the arm there."""
    action_address = ('/' + self.robot_type + '_driver/pose_action/tool_pose')
    self.client = actionlib.SimpleActionClient(
        action_address,
        kinova_msgs.msg.ArmPoseAction)
    if not isinstance(stamped_pose, PoseStamped):
        raise TypeError("No duck typing here? :(")
    pose = stamped_pose.pose
    position, orientation = pose.position, pose.orientation
    # Send a cartesian goal to the action server. Adapted from kinova_demo
    rospy.loginfo("Waiting for SimpleAction server")
    self.client.wait_for_server()
    goal = kinova_msgs.msg.ArmPoseGoal()
    goal.pose.header = Header(frame_id=(self.robot_type + '_link_base'))
    goal.pose.pose.position = position
    goal.pose.pose.orientation = orientation
    rospy.loginfo("Sending goal")
    print goal
    self.client.send_goal(goal)
    # rospy.loginfo("Waiting for up to {} s for result".format(t))
    if self.client.wait_for_result(rospy.Duration(100)):
        rospy.loginfo("Action succeeded")
        print self.client.get_result()
        return self.client.get_result()
    else:
        self.client.cancel_all_goals()
        rospy.loginfo(' the cartesian action timed-out')
        return None
def make_header(frame_id, stamp=None):
    ''' Creates a Header object for stamped ROS objects '''
    if stamp == None:
        stamp = rospy.Time.now()
    header = Header()
    header.stamp = stamp
    header.frame_id = frame_id
    return header
def __get_flyzone(cls, data, frame):
    """
    Deserializes flight boundary data into a FlyZoneArray message.

    Args:
        data: List of dictionaries.
        frame: Frame id for the boundaries.

    Returns:
        A FlyzoneArray message type which contains an array of FlyZone
        messages, which contains a polygon for the boundary, a max
        altitude and a min altitude.
    """
    # Generate header for all zones.
    header = Header()
    header.stamp = rospy.get_rostime()
    header.frame_id = frame

    flyzones = FlyZoneArray()
    for zone in data:
        flyzone = FlyZone()
        flyzone.zone.header = header

        flyzone.max_alt = feet_to_meters(zone["altitude_msl_max"])
        flyzone.min_alt = feet_to_meters(zone["altitude_msl_min"])

        # Change boundary points to ros message of type polygon.
        for waypoint in zone["boundary_pts"]:
            point = GeoPoint()
            point.latitude = waypoint["latitude"]
            point.longitude = waypoint["longitude"]
            flyzone.zone.polygon.points.append(point)

        flyzones.flyzones.append(flyzone)

    return flyzones
def __get_waypoints(cls, data, frame):
    """
    Deserializes a list of waypoints into a marker message.

    Args:
        data: List of dictionaries corresponding to waypoints.
        frame: Frame of the markers.

    Returns:
        A marker message of type Points, with a list of points in order
        corresponding to each waypoint.
    """
    header = Header()
    header.stamp = rospy.get_rostime()
    header.frame_id = frame

    waypoints = WayPoints()
    waypoints.header = header

    # Ensure there is no rotation by setting w to 1.
    for point in data:
        waypoint = GeoPoint()

        altitude = feet_to_meters(point["altitude_msl"])

        waypoint.latitude = point["latitude"]
        waypoint.longitude = point["longitude"]
        waypoint.altitude = altitude

        waypoints.waypoints.append(waypoint)

    return waypoints
def __get_search_grid(cls, data, frame):
    """
    Deserializes the search grid into a polygon message.

    Args:
        data: List of dictionaries corresponding to the search grid points.
        frame: Frame for the polygon.

    Returns:
        Message of type PolygonStamped with the bounds of the search grid.
    """
    header = Header()
    header.stamp = rospy.get_rostime()
    header.frame_id = frame

    search_grid = GeoPolygonStamped()
    search_grid.header = header

    for point in data:
        boundary_pnt = GeoPoint()

        altitude = feet_to_meters(point["altitude_msl"])

        boundary_pnt.latitude = point["latitude"]
        boundary_pnt.longitude = point["longitude"]
        boundary_pnt.altitude = altitude

        search_grid.polygon.points.append(boundary_pnt)

    return search_grid
def baxter_ik_move(self, rpy_pose):
    # quaternion_pose = conversions.list_to_pose_stamped(rpy_pose, "base")
    quaternion_pose = convert_pose_to_msg(rpy_pose, "base")

    node = "ExternalTools/" + self.limb + "/PositionKinematicsNode/IKService"
    ik_service = rospy.ServiceProxy(node, SolvePositionIK)
    ik_request = SolvePositionIKRequest()
    hdr = Header(stamp=rospy.Time.now(), frame_id="base")

    ik_request.pose_stamp.append(quaternion_pose)
    try:
        rospy.wait_for_service(node, 5.0)
        ik_response = ik_service(ik_request)
    except (rospy.ServiceException, rospy.ROSException), error_message:
        rospy.logerr("Service request failed: %r" % (error_message,))
        print "ERROR - baxter_ik_move - Failed to append pose"
        return 1

    if (ik_response.isValid[0]):
        # convert response to joint position control dictionary
        limb_joints = dict(zip(ik_response.joints[0].name, ik_response.joints[0].position))
        # move limb
        self.limb_interface.move_to_joint_positions(limb_joints)
    else:
        print "ERROR - baxter_ik_move - No valid joint configuration found"
        return 2

    self.getPose()  # Store current pose into self.pose
    print "Move Executed"
    return -1

# Because we can't get full moveit_commander on the raspberry pi due to memory
# limitations, rewritten implementation of the required conversion function
def forward_kinematic(self, joint_state, base='base', links=None):
    def compute_fk_client():
        try:
            header = Header()
            header.stamp = rospy.Time.now()
            header.frame_id = self.prefix + '/base'
            rs = RobotState()
            rs.joint_state = joint_state
            res = self.compute_fk(header, links, rs)
            return res.pose_stamped
        except rospy.ServiceException, e:
            print "Service call failed: %s" % e
            # in case of troubles return 0 pose stamped
            return []

    if links is None:
        links = self.end_effectors['whole_body']
    if type(links) is not list:
        if links == "all":
            links = self.get_link_names('whole_body')
        else:
            links = [links]
    # check that the base is in links
    if base != 'base' and base not in links:
        links.append(base)
    pose_stamped_list = compute_fk_client()
    if not pose_stamped_list:
        return {}
    # transform it in a dict of poses
    pose_dict = {}
    if base != 'base':
        tr_base = transformations.pose_to_list(pose_stamped_list[links.index(base)].pose)
        inv_base = transformations.inverse_transform(tr_base)
        for i in range(len(links)):
            if links[i] != base:
                tr = transformations.pose_to_list(pose_stamped_list[i].pose)
                pose_dict[links[i]] = transformations.multiply_transform(inv_base, tr)
    else:
        for i in range(len(links)):
            pose_dict[links[i]] = transformations.pose_to_list(pose_stamped_list[i].pose)
    return pose_dict
def get_ik_joints(target_x, target_y, initial_z, target_z, target_theta, n_steps):
    ns = "ExternalTools/left/PositionKinematicsNode/IKService"
    iksvc = rospy.ServiceProxy(ns, SolvePositionIK)
    ikreq = SolvePositionIKRequest(seed_mode=SolvePositionIKRequest.SEED_CURRENT)
    hdr = Header(stamp=rospy.Time.now(), frame_id='base')
    qx = np.sin(target_theta * 0.5)
    qy = np.cos(target_theta * 0.5)
    for z in np.arange(initial_z, target_z, (target_z - initial_z) / n_steps):
        pose = PoseStamped(
            header=hdr,
            pose=Pose(
                position=Point(
                    x=target_x,
                    y=target_y,
                    z=z,
                ),
                orientation=Quaternion(
                    x=qx,
                    y=qy,
                    z=0.,
                    w=0.,
                ),
            ),
        )
        ikreq.pose_stamp.append(pose)

    try:
        rospy.wait_for_service(ns, 5.0)
        resp = iksvc(ikreq)
    except (rospy.ServiceException, rospy.ROSException), e:
        rospy.logerr("Service call failed: %s" % (e,))
        return 1

    return [j for v, j in zip(resp.isValid, resp.joints) if v]
def make_header(frame_id, stamp=None):
    if stamp == None:
        stamp = rospy.Time.now()
    header = Header()
    header.stamp = stamp
    header.frame_id = frame_id
    return header
def rosmsg(self):
    """:obj:`sensor_msgs.CameraInfo` : Returns ROS CameraInfo msg
    """
    msg_header = Header()
    msg_header.frame_id = self._frame

    msg_roi = RegionOfInterest()
    msg_roi.x_offset = 0
    msg_roi.y_offset = 0
    msg_roi.height = 0
    msg_roi.width = 0
    msg_roi.do_rectify = 0

    msg = CameraInfo()
    msg.header = msg_header
    msg.height = self._height
    msg.width = self._width
    msg.distortion_model = 'plumb_bob'
    msg.D = [0.0, 0.0, 0.0, 0.0, 0.0]
    msg.K = [self._fx, 0.0, self._cx, 0.0, self._fy, self._cy, 0.0, 0.0, 1.0]
    msg.R = [1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0]
    msg.P = [self._fx, 0.0, self._cx, 0.0, 0.0, self._fx, self._cy, 0.0, 0.0, 0.0, 1.0, 0.0]
    msg.binning_x = 0
    msg.binning_y = 0
    msg.roi = msg_roi
    return msg
def handle_radar_msg(msg, dont_fuse):
    assert msg._md5sum == '6a2de2f790cb8bb0e149d45d297462f8'

    publish_rviz_topics = True

    with g_fusion_lock:
        # do we have any estimation?
        if g_fusion.last_state_mean is not None:
            pose = g_fusion.lidar_observation_function(g_fusion.last_state_mean)

            observations = RadarObservation.from_msg(msg, RADAR_TO_LIDAR, 0.9115)

            # find nearest observation to current object position estimation
            distance_threshold = 4.4
            nearest = None
            nearest_dist = 1e9
            for o in observations:
                dist = [o.x - pose[0], o.y - pose[1], o.z - pose[2]]
                dist = np.sqrt(np.array(dist).dot(dist))

                if dist < nearest_dist and dist < distance_threshold:
                    nearest_dist = dist
                    nearest = o

            if nearest is not None:
                # FUSION
                if not dont_fuse:
                    g_fusion.filter(nearest)

                if publish_rviz_topics:
                    header = Header()
                    header.frame_id = '/velodyne'
                    header.stamp = rospy.Time.now()
                    point = np.array([[nearest.x, nearest.y, nearest.z]], dtype=np.float32)
                    rospy.Publisher(name='obj_radar_nearest',
                                    data_class=PointCloud2,
                                    queue_size=1).publish(pc2.create_cloud_xyz32(header, point))

                    #pose = g_fusion.lidar_observation_function(g_fusion.last_state_mean)

                    #br = ros_tf.TransformBroadcaster()
                    #br.sendTransform(tuple(pose), (0,0,0,1), rospy.Time.now(), 'car_fuse_centroid', 'velodyne')

#                     if last_known_box_size is not None:
#                         # bounding box
#                         marker = Marker()
#                         marker.header.frame_id = "car_fuse_centroid"
#                         marker.header.stamp = rospy.Time.now()
#                         marker.type = Marker.CUBE
#                         marker.action = Marker.ADD
#                         marker.scale.x = last_known_box_size[2]
#                         marker.scale.y = last_known_box_size[1]
#                         marker.scale.z = last_known_box_size[0]
#                         marker.color = ColorRGBA(r=1., g=1., b=0., a=0.5)
#                         marker.lifetime = rospy.Duration()
#                         pub = rospy.Publisher("car_fuse_bbox", Marker, queue_size=10)
#                         pub.publish(marker)
def main_fcn():
    pub = rospy.Publisher('joint_states', JointState, queue_size=10)
    pub2 = rospy.Publisher('read_angles', Int32, queue_size=10)
    pub3 = rospy.Publisher('uarm_status', String, queue_size=100)

    rospy.init_node('display', anonymous=True)
    rate = rospy.Rate(20)

    joint_state_send = JointState()
    joint_state_send.header = Header()

    joint_state_send.name = ['base_to_base_rot', 'base_rot_to_link_1', 'link_1_to_link_2',
                             'link_2_to_link_3', 'link_3_to_link_end']
    joint_state_send.name = joint_state_send.name + ['link_1_to_add_1', 'link_2_to_add_4',
                                                     'link_3_to_add_2', 'base_rot_to_add_3', 'link_2_to_add_5']

    angle = {}
    trigger = 1

    while not rospy.is_shutdown():
        joint_state_send.header.stamp = rospy.Time.now()
        pub2.publish(1)

        if trigger == 1:
            pub3.publish("detach")
            time.sleep(1)
            trigger = 0

        angle['s1'] = rospy.get_param('servo_1') * math.pi / 180
        angle['s2'] = rospy.get_param('servo_2') * math.pi / 180
        angle['s3'] = rospy.get_param('servo_3') * math.pi / 180
        angle['s4'] = rospy.get_param('servo_4') * math.pi / 180

        joint_state_send.position = [angle['s1'], angle['s2'], -angle['s2'] - angle['s3'], angle['s3'], angle['s4']]
        joint_state_send.position = joint_state_send.position + [-angle['s2'] - angle['s3'], angle['s3'], PI/2 - angle['s3']]
        joint_state_send.position = joint_state_send.position + [angle['s2'] - PI, angle['s2'] + angle['s3'] - PI/2]

        joint_state_send.velocity = [0]
        joint_state_send.effort = []

        pub.publish(joint_state_send)
        rate.sleep()
def move_cup(self):
    self.gripper.close()
    rotate = long(random.randrange(20, 180)) * (3.14156) / (180)
    print("rotate: " + str(rotate))
    t = self.tl.getLatestCommonTime("/root", "/right_gripper")
    cur_position, quaternion = self.tl.lookupTransform("/root", "/right_gripper", t)
    quaternion = [1, 0, 0, 0]

    requrd_rot = (0, 0, rotate)  # in radians
    requrd_trans = (0, 0, 0)
    rotated_pose = self.getOffsetPoses(cur_position, quaternion, requrd_rot, requrd_trans)
    trans_5 = tuple(rotated_pose[:3])
    quat_5 = tuple(rotated_pose[3:])
    br = tf.TransformBroadcaster()
    br.sendTransform(trans_5, quat_5, rospy.Time.now(), "pick_pose", "/root")

    pick_pose = PoseStamped(Header(0, rospy.Time(0), n.robot.base),
                            Pose(Point(*trans_5), Quaternion(*quat_5)))
    self.move_ik(pick_pose)
    self.gripper.open()

    t = self.tl.getLatestCommonTime("/root", "/right_gripper")
    cur_position, quaternion = self.tl.lookupTransform("/root", "/right_gripper", t)
    cur_position = list(cur_position)
    cur_position[2] = cur_position[2] + 0.08
    pose = Pose(Point(*cur_position), Quaternion(*quaternion))
    while True:
        try:
            stamped_pose = PoseStamped(Header(0, rospy.Time(0), n.robot.base), pose)
            self.move_ik(stamped_pose)
            break
        except AttributeError:
            print("can't find valid pose for gripper cause I'm dumb")
            return False
    rospy.sleep(2)
    home = PoseStamped(
        Header(0, rospy.Time(0), n.robot.base),
        Pose(Point(0.60558, -0.24686, 0.093535),
             Quaternion(0.99897, -0.034828, 0.027217, 0.010557)))
    self.move_ik(home)
    rospy.sleep(2)
def control_from_sensor_values(self):
    log_values = np.log(self.bx.inside)
    # Match which side is which. Ideally, if the sign of the diff
    # matches whether the gripper needs to move towards the
    # positive or negative part of the y axis in left_gripper.
    # That is, we need left side - right side (using left/right
    # like in l_gripper_{l, r}_finger_tip tfs)
    # We want to take log(values) for a better behaved controller
    inside_diff = log_values[7:] - log_values[:7]
    scalar_diff = sum(inside_diff) / len(inside_diff)

    # Take negative for one of the sides, so that angles should
    # match for a parallel object in the gripper
    l_angle, _ = np.polyfit(np.arange(7), log_values[:7], 1)
    r_angle, _ = np.polyfit(np.arange(7), -log_values[7:], 1)
    rospy.loginfo('Angle computed from l: {}'.format(np.rad2deg(l_angle)))
    rospy.loginfo('Angle computed from r: {}'.format(np.rad2deg(r_angle)))
    avg_angle = np.arctan((l_angle + r_angle) / 2.0)

    # Let's get a frame by the middle of the end effector
    # p = Point(0, 0, -0.05)
    p = (0, 0, -0.05)
    # Of course, tf uses (w, x, y, z), Quaternion uses x, y, z,
    # w. However, tf.TransformBroadcaster().sendTransform uses the
    # tf order.
    # q = Quaternion(q[1], q[2], q[3], q[0])
    q = tf.transformations.quaternion_about_axis(avg_angle, (1, 0, 0))
    # We had a lot of trouble sending a transform (p, q) from
    # left_gripper, and then asking for a point in the last frame
    # in the tf coordinate. Timing issues that I couldn't
    # solve. Instead, do it manually here:
    m1 = tf.transformations.quaternion_matrix(q)
    m1[:3, 3] = p
    p = (0, scalar_diff / 100, 0.05)
    m2 = np.eye(4)
    m2[:3, 3] = p
    m = m2.dot(m1)

    # Extract pose now
    p = Point(*m[:3, 3])
    q = Quaternion(*tf.transformations.quaternion_from_matrix(m))
    time = rospy.Time(0)
    h = Header()
    h.frame_id = 'left_gripper'
    h.stamp = time
    pose = Pose(p, q)
    new_endpose = self.tl.transformPose('base', PoseStamped(h, pose))
    self.bx.move_ik(new_endpose)
def __init__(self, robot, *robotargs):
    super(PickAndPlaceNode, self).__init__('pp_node')
    rospy.loginfo("PickAndPlaceNode")
    _post_perceive_trans = defaultdict(lambda: self._pick)
    _post_perceive_trans.update({'c': self._calibrate})
    _preplace = defaultdict(lambda: self._preplace)
    self.transition_table = {
        # If calibration has already happened, allow skipping it
        'initial': {'c': self._calibrate, 'q': self._perceive, 's': self._preplace},
        'calibrate': {'q': self._perceive, 'c': self._calibrate},
        'perceive': {'p': self._post_perceive, 'q': self._perceive,
                     's': self._stop_perceive, 'c': self._calibrate},
        'post_perceive': _post_perceive_trans,
        'postpick': {'1': self._level, '2': self._level, '9': self._level},
        'level': _preplace,
        'preplace': {'s': self._place},
        'place': {'q': self._perceive, 'c': self._calibrate}
    }
    rospy.loginfo("PickAndPlaceNode1")
    if callable(robot):
        self.robot = robot(*robotargs)
    else:
        self.robot = robot
    self.robot.level = 1
    rospy.loginfo("PickAndPlaceNode2")
    # Hardcoded place for now
    self.place_pose = PoseStamped(
        Header(0, rospy.Time(0), self.robot.base),
        Pose(Point(0.526025806, 0.4780144, -0.161326153),
             Quaternion(1, 0, 0, 0)))
    self.tl = tf.TransformListener()
    self.num_objects = 0
    # Would this work too? Both tf and tf2 have (c) 2008...
    # self.tf2 = tf2_ros.TransformListener()
    self.n_objects_sub = rospy.Subscriber("/num_objects", Int64,
                                          self.update_num_objects,
                                          queue_size=1)
    self.perception_pub = rospy.Publisher("/perception/enabled",
                                          Bool,
                                          queue_size=1)
    self.alignment_pub = rospy.Publisher("/alignment/doit",
                                         Bool,
                                         queue_size=1)
    self.br = tf.TransformBroadcaster()
    rospy.loginfo("PickAndPlaceNode3")
    self.int_marker_server = InteractiveMarkerServer('int_markers')
    # Dict to map imarker names and their updated poses
    self.int_markers = {}

    # Ideally this call would be in a Factory/Metaclass/Parent
    self.show_options()
    self.perceive = False
    # self.robot.home()
    # self.move_calib_position()
def proc_imu(quat1, acc, gyro):
    # New info:
    # https://github.com/thalmiclabs/myo-bluetooth/blob/master/myohw.h#L292-L295
    # Scale values for unpacking IMU data
    # define MYOHW_ORIENTATION_SCALE   16384.0f  # See myohw_imu_data_t::orientation
    # define MYOHW_ACCELEROMETER_SCALE 2048.0f   # See myohw_imu_data_t::accelerometer
    # define MYOHW_GYROSCOPE_SCALE     16.0f     # See myohw_imu_data_t::gyroscope
    if not any(quat1):
        # If it's all 0's means we got no data, don't do anything
        return
    h = Header()
    h.stamp = rospy.Time.now()
    # frame_id is node name without /
    h.frame_id = rospy.get_name()[1:]
    # We currently don't know the covariance of the sensors with each other
    cov = [0, 0, 0, 0, 0, 0, 0, 0, 0]
    quat = Quaternion(quat1[0] / 16384.0,
                      quat1[1] / 16384.0,
                      quat1[2] / 16384.0,
                      quat1[3] / 16384.0)
    # Normalize the quaternion and accelerometer values
    quatNorm = sqrt(quat.x * quat.x + quat.y * quat.y +
                    quat.z * quat.z + quat.w * quat.w)
    normQuat = Quaternion(quat.x / quatNorm,
                          quat.y / quatNorm,
                          quat.z / quatNorm,
                          quat.w / quatNorm)
    normAcc = Vector3(acc[0] / 2048.0,
                      acc[1] / 2048.0,
                      acc[2] / 2048.0)
    normGyro = Vector3(gyro[0] / 16.0, gyro[1] / 16.0, gyro[2] / 16.0)
    imu = Imu(h, normQuat, cov, normGyro, cov, normAcc, cov)
    imuPub.publish(imu)
    roll, pitch, yaw = euler_from_quaternion([normQuat.x,
                                              normQuat.y,
                                              normQuat.z,
                                              normQuat.w])
    oriPub.publish(Vector3(roll, pitch, yaw))
    oriDegPub.publish(Vector3(degrees(roll), degrees(pitch), degrees(yaw)))
    posePub.publish(PoseStamped(h, Pose(Point(0.0, 0.0, 0.0), normQuat)))

# Package the arm and x-axis direction into an Arm message
def from_dict(cls, data, frame, lifetime):
    """Deserializes obstacle data into two MarkerArrays.

    Args:
        data: A dictionary.
        frame: Frame ID of every Marker.
        lifetime: Lifetime of every Marker in seconds.

    Returns:
        A (MarkerArray, MarkerArray) tuple. The first is of moving
        obstacles, and the latter is of stationary obstacles.
    """
    # Generate base header.
    header = Header()
    header.stamp = rospy.get_rostime()
    header.frame_id = frame

    # Parse moving obstacles, and populate markers with spheres.
    moving_obstacles = GeoSphereArrayStamped()
    moving_obstacles.header = header

    if "moving_obstacles" in data:
        for obj in data["moving_obstacles"]:
            # Moving obstacles are spheres.
            obstacle = GeoSphere()

            # Set scale as radius.
            obstacle.radius = feet_to_meters(obj["sphere_radius"])

            obstacle.center.latitude = obj["latitude"]
            obstacle.center.longitude = obj["longitude"]
            obstacle.center.altitude = feet_to_meters(obj["altitude_msl"])

            moving_obstacles.spheres.append(obstacle)

    # Parse stationary obstacles, and populate markers with cylinders.
    stationary_obstacles = GeoCylinderArrayStamped()
    stationary_obstacles.header = header

    if "stationary_obstacles" in data:
        for obj in data["stationary_obstacles"]:
            # Stationary obstacles are cylinders.
            obstacle = GeoCylinder()

            # Set scale to define size.
            obstacle.radius = feet_to_meters(obj["cylinder_radius"])
            obstacle.height = feet_to_meters(obj["cylinder_height"])

            obstacle.center.latitude = obj["latitude"]
            obstacle.center.longitude = obj["longitude"]

            stationary_obstacles.cylinders.append(obstacle)

    return (moving_obstacles, stationary_obstacles)
def get_ik_joints_linear(initial_x, initial_y, initial_z, initial_w2,
                         target_x, target_y, target_z, target_w2, n_steps):
    ns = "ExternalTools/left/PositionKinematicsNode/IKService"
    iksvc = rospy.ServiceProxy(ns, SolvePositionIK)
    ikreq = SolvePositionIKRequest(seed_mode=SolvePositionIKRequest.SEED_CURRENT)
    hdr = Header(stamp=rospy.Time.now(), frame_id='base')

    # current_pose = arm.endpoint_pose()
    x0 = initial_x
    y0 = initial_y
    z0 = initial_z

    # linear interpolate between current pose and target pose
    for i in xrange(n_steps):
        t = (i + 1) * 1. / n_steps
        x = (1. - t) * x0 + t * target_x
        y = (1. - t) * y0 + t * target_y
        z = (1. - t) * z0 + t * target_z
        pose = PoseStamped(
            header=hdr,
            pose=Pose(
                position=Point(
                    x=x,
                    y=y,
                    z=z,
                ),
                # endeffector pointing down
                orientation=Quaternion(
                    x=1.,
                    y=0.,
                    z=0.,
                    w=0.,
                ),
            ),
        )
        ikreq.pose_stamp.append(pose)

    try:
        rospy.wait_for_service(ns, 5.0)
        resp = iksvc(ikreq)
    except (rospy.ServiceException, rospy.ROSException), e:
        rospy.logerr("Service call failed: %s" % (e,))
        return []

    js = []
    # control w2 separately from other joints
    for i, (v, j) in enumerate(zip(resp.isValid, resp.joints)):
        t = (i + 1) * 1. / n_steps
        if v:
            w2 = (1. - t) * initial_w2 + t * target_w2
            j.position = j.position[:-1] + (w2,)
            js.append(j)
    return js