我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用sensor_msgs.msg.JointState()。
def get(self, x_des, seed=None, bounds=()): """ Get the IK by minimization :param x_des: desired task space pose [[x, y, z], [x, y, z, w]] :param seed: RobotState message :param bounds:[(min, max), (min, max), (min, max), ... for each joint] :return: (bool, joints) """ if isinstance(seed, RobotState): seed = seed.joint_state elif not isinstance(seed, JointState) and seed is not None: raise TypeError('ros.IK.get only accepts RS or JS, got {}'.format(type(seed))) seed = [seed.position[seed.name.index(joint)] for joint in self._ik.joints] if seed is not None else () result = self._ik.get(x_des, seed, bounds) return result[0], JointState(name=self._ik.joints, position=list(result[1]))
def add_viapoint(self, t, obsys, sigmay=1e-6): """ Add a viapoint i.e. an observation at a specific time :param t: Time of observation :param obsys: RobotState observed at the time :param sigmay: :return: """ if isinstance(obsys, RobotState): obsys = obsys.joint_state elif not isinstance(obsys, JointState): raise TypeError("ros.ProMP.add_viapoint only accepts RS or JS, got {}".format(type(obsys))) try: positions = [obsys.position[obsys.name.index(joint)] for joint in self.joint_names] # Make sure joints are in right order except KeyError as e: raise KeyError("Joint {} provided as viapoint is unknown to the demonstrations".format(e)) else: self.promp.add_viapoint(t, map(float, positions), sigmay)
def __init__(self): #init code rospy.init_node("robotGame") self.currentDist = 1 self.previousDist = 1 self.reached = False self.tf = TransformListener() self.left_joint_names = ['yumi_joint_1_l', 'yumi_joint_2_l', 'yumi_joint_3_l', 'yumi_joint_4_l', 'yumi_joint_5_l', 'yumi_joint_6_l', 'yumi_joint_7_l', 'gripper_l_joint'] self.right_joint_names = ['yumi_joint_1_r', 'yumi_joint_2_r', 'yumi_joint_3_r', 'yumi_joint_4_r', 'yumi_joint_5_r', 'yumi_joint_6_r', 'yumi_joint_7_r', 'gripper_r_joint'] self.left_positions = [-2.01081820427463881, 1.4283937236421274, -1.3593418228836045, -0.19315625641494183, 1.7016501799872579, 0.6573540231496411, 3.404315594906305, 0.0] self.right_positions = [0.01081820427463881, 2.4283937236421274, 0.3593418228836045, -0.19315625641494183, 1.7016501799872579, 0.6573540231496411, 3.404315594906305, 1.8145107750466565] self.rjv = [] self.ljv = [] self.pub = rospy.Publisher('my_joint_states', JointState, queue_size=1) self.js = JointState() self.js.header = Header() self.js.name = self.left_joint_names + self.right_joint_names self.js.velocity = [] self.js.effort = [] self.sub = rospy.Subscriber('/joint_states', JointState, self.jsCB) self.destPos = np.random.uniform(0,0.25, size =(3)) self.reset()
def main(): rospy.init_node('issue_com') pub = rospy.Publisher(POS_COM_TOPIC, PositionCommand, queue_size=10) test_pub = rospy.Publisher(TEST_TOPIC, Empty, queue_size=10) sub = rospy.Subscriber(POS_COM_TOPIC, PositionCommand, listen) #sub = rospy.Subscriber('/joint_states', JointState, listen) pc = PositionCommand() pc.mode = JOINT_SPACE #pc.arm = PositionCommand.LEFT_ARM pc.arm = 1#PositionCommand.RIGHT_ARM pc.data = np.zeros(7) r = rospy.Rate(1) #while not rospy.is_shutdown(): # pub.publish(pc) # r.sleep() # print 'published!' r.sleep() test_pub.publish(Empty()) pub.publish(pc)
def __init__(self): self.topics = { "ball": {"topic": "environment/ball", "sub": None, "data": CircularState(), "type": CircularState}, "light": {"topic": "environment/light", "sub": None, "data": UInt8(), "type": UInt8}, "sound": {"topic": "environment/sound", "sub": None, "data": Float32(), "type": Float32}, "ergo_state": {"topic": "ergo/state", "sub": None, "data": CircularState(), "type": CircularState}, "joy1": {"topic": "sensors/joystick/1", "sub": None, "data": Joy(), "type": Joy}, "joy2": {"topic": "sensors/joystick/2", "sub": None, "data": Joy(), "type": Joy}, "torso_l_j": {"topic": "{}/joint_state".format("poppy_torso"), "sub": None, "data": JointState(), "type": JointState}, "torso_l_eef": {"topic": "{}/left_arm/end_effector_pose".format("poppy_torso"), "sub": None, "data": PoseStamped(), "type": PoseStamped}, "torso_r_eef": {"topic": "{}/right_arm/end_effector_pose".format("poppy_torso"), "sub": None, "data": PoseStamped(), "type": PoseStamped} } self.topics["ball"]["sub"] = rospy.Subscriber(self.topics["ball"]["topic"], self.topics["ball"]["type"], self.cb_ball) self.topics["light"]["sub"] = rospy.Subscriber(self.topics["light"]["topic"], self.topics["light"]["type"], self.cb_light) self.topics["sound"]["sub"] = rospy.Subscriber(self.topics["sound"]["topic"], self.topics["sound"]["type"], self.cb_sound) self.topics["ergo_state"]["sub"] = rospy.Subscriber(self.topics["ergo_state"]["topic"], self.topics["ergo_state"]["type"], self.cb_ergo) self.topics["joy1"]["sub"] = rospy.Subscriber(self.topics["joy1"]["topic"], self.topics["joy1"]["type"], self.cb_joy1) self.topics["joy2"]["sub"] = rospy.Subscriber(self.topics["joy2"]["topic"], self.topics["joy2"]["type"], self.cb_joy2) self.topics["torso_l_j"]["sub"] = rospy.Subscriber(self.topics["torso_l_j"]["topic"], self.topics["torso_l_j"]["type"], self.cb_torso_l_j) self.topics["torso_l_eef"]["sub"] = rospy.Subscriber(self.topics["torso_l_eef"]["topic"], self.topics["torso_l_eef"]["type"], self.cb_torso_l_eef) self.topics["torso_r_eef"]["sub"] = rospy.Subscriber(self.topics["torso_r_eef"]["topic"], self.topics["torso_r_eef"]["type"], self.cb_torso_r_eef)
def _init_pubsub(self): self.joint_states_pub = rospy.Publisher('joint_states', JointState) self.odom_pub = rospy.Publisher('odom', Odometry) self.sensor_state_pub = rospy.Publisher('~sensor_state', TurtlebotSensorState) self.operating_mode_srv = rospy.Service('~set_operation_mode', SetTurtlebotMode, self.set_operation_mode) self.digital_output_srv = rospy.Service('~set_digital_outputs', SetDigitalOutputs, self.set_digital_outputs) if self.drive_mode == 'twist': self.cmd_vel_sub = rospy.Subscriber('cmd_vel', Twist, self.cmd_vel) self.drive_cmd = self.robot.direct_drive elif self.drive_mode == 'drive': self.cmd_vel_sub = rospy.Subscriber('cmd_vel', Drive, self.cmd_vel) self.drive_cmd = self.robot.drive elif self.drive_mode == 'turtle': self.cmd_vel_sub = rospy.Subscriber('cmd_vel', Turtle, self.cmd_vel) self.drive_cmd = self.robot.direct_drive else: rospy.logerr("unknown drive mode :%s"%(self.drive_mode)) self.transform_broadcaster = None if self.publish_tf: self.transform_broadcaster = tf.TransformBroadcaster()
def nearest_neighbour(self, fk_dict, scale_orient=40): def query_database(link, fk): x_vector = deepcopy(fk[0]) if self.with_orient: x_vector += (np.array(fk[1]) / scale_orient).tolist() res = self.trees[link]['tree'].query(x_vector) joints = self.trees[link]['data']['data'][res[1]][:-7] joint_names = self.trees[link]['data']['names'][:-7] return joints.tolist(), joint_names.tolist() state = JointState() for key, value in fk_dict.iteritems(): joints, joint_names = query_database(key, value) state.position += joints state.name += joint_names return state
def getAnglesFromDict(d):# NOTE Fails currently if dict is None """ Converts a dictionary to a angles of a JointState() :param d (dict): The dictionary to be converted :return (sensor_msgs.msg.JointState): The angles """ # print datapath if d is None: rospy.logerr("Could not get angles for non existing dictionary") return None js=JointState() for n,v in d.items(): js.name.append(n.encode()) js.position.append(v) return js
def timer_callback(self, event): msg = JointState() self.mutex.acquire() if time.time() - self.last_command_time < 0.5: dq = cartesian_control(self.joint_transforms, self.x_current, self.x_target, False, self.q_current, self.q0_desired) msg.velocity = dq elif time.time() - self.last_red_command_time < 0.5: dq = cartesian_control(self.joint_transforms, self.x_current, self.x_current, True, self.q_current, self.q0_desired) msg.velocity = dq else: msg.velocity = numpy.zeros(7) self.mutex.release() self.pub_vel.publish(msg)
def set_start_state(self, msg): """ Specify a start state for the group. Parameters ---------- msg : moveit_msgs/RobotState Examples -------- >>> from moveit_msgs.msg import RobotState >>> from sensor_msgs.msg import JointState >>> joint_state = JointState() >>> joint_state.header = Header() >>> joint_state.header.stamp = rospy.Time.now() >>> joint_state.name = ['joint_a', 'joint_b'] >>> joint_state.position = [0.17, 0.34] >>> moveit_robot_state = RobotState() >>> moveit_robot_state.joint_state = joint_state >>> group.set_start_state(moveit_robot_state) """ self._g.set_start_state(conversions.msg_to_string(msg))
def go(self, joints = None, wait = True): """ Set the target of the group and then move the group to the specified target """ if type(joints) is bool: wait = joints joints = None elif type(joints) is JointState: self.set_joint_value_target(joints) elif type(joints) is Pose: self.set_pose_target(joints) elif not joints == None: try: self.set_joint_value_target(self.get_remembered_joint_values()[joints]) except: self.set_joint_value_target(joints) if wait: return self._g.move() else: return self._g.async_move()
def __init__(self, server): self.server = server self.mutex = Lock() # Publisher to send commands self.pub_command = rospy.Publisher("/motion_planning_goal", geometry_msgs.msg.Transform, queue_size=1) self.listener = tf.TransformListener() # Subscribes to information about what the current joint values are. rospy.Subscriber("/joint_states", sensor_msgs.msg.JointState, self.joint_states_callback) # Publisher to set robot position self.pub_reset = rospy.Publisher("/joint_command", JointState, queue_size=1) rospy.sleep(0.5) # Wait for validity check service rospy.wait_for_service("check_state_validity") self.state_valid_service = rospy.ServiceProxy('check_state_validity', moveit_msgs.srv.GetStateValidity) self.reset_robot()
def js_cb(self, a): rospy.loginfo('Received array') js = JointState() js.name = ['head_pan', 'right_s0', 'right_s1', 'right_e0', 'right_e1', 'right_w0', 'right_w1', 'right_w2', 'left_s0', 'left_s1', 'left_e0', 'left_e1', 'left_w0', 'left_w1', 'left_w2'] jList = a.data jMatrix = np.reshape(jList,(jList.shape[0]/15,15)) i = 0 for pos in jMatrix: rospy.loginfo(pos) js.position = pos gsvr = GetStateValidityRequest() rs = RobotState() rs.joint_state = js gsvr.robot_state = rs gsvr.group_name = "both_arms" resp = self.coll_client(gsvr) if (resp.valid == False): rospy.loginfo('false') rospy.loginfo(i) self.js_pub.publish(0) return i = i + 1 self.js_pub.publish(1) rospy.loginfo('true') return
def to_joint_state(state): if isinstance(state, RobotState): state = state.joint_state elif not isinstance(state, JointState): raise TypeError("ROSBridge.to_joint_trajectory only accepts RT or JT, got {}".format(type(trajectory))) return state
def get(self, state): """ Get the FK :param state: RobotState message to get the forward kinematics for :return: [[x, y, z], [x, y, z, w]] """ if isinstance(state, RobotState): state = state.joint_state elif not isinstance(state, JointState): raise TypeError('ros.FK.get only accepts RS or JS, got {}'.format(type(state))) return self._fk.get([state.position[state.name.index(joint)] for joint in self.joints])
def set_start(self, obsy, sigmay=1e-6): if isinstance(obsy, RobotState): obsy = obsy.joint_state elif not isinstance(obsy, JointState): raise TypeError("ros.ProMP.set_start only accepts RS or JS, got {}".format(type(obsy))) try: positions = [obsy.position[obsy.name.index(joint)] for joint in self.joint_names] # Make sure joints are in right order except KeyError as e: raise KeyError("Joint {} provided as start state is unknown to the demonstrations".format(e)) else: self.promp.set_start(map(float, positions), sigmay)
def __init__(self): self.joint_name = rospy.get_param("~joint_name") self.command_pub = rospy.Publisher("command", Float64, queue_size=1) self.joint_sub = rospy.Subscriber("joint_state", JointState, self.joint_state_callback, queue_size=1)
def __init__(self): self.joint_name = rospy.get_param("~joint_name") self.joint_state = JointState() self.joint_state.name.append(self.joint_name) self.joint_state.position.append(0.0) self.joint_state.velocity.append(0.0) self.joint_pub = rospy.Publisher("joint_states", JointState, queue_size=1) self.command_sub = rospy.Subscriber("command", Float64, self.command_callback, queue_size=1)
def fk_service_client(limb = "right"): ns = "ExternalTools/" + limb + "/PositionKinematicsNode/FKService" fksvc = rospy.ServiceProxy(ns, SolvePositionFK) fkreq = SolvePositionFKRequest() joints = JointState() joints.name = ['right_j0', 'right_j1', 'right_j2', 'right_j3', 'right_j4', 'right_j5', 'right_j6'] joints.position = [0.763331, 0.415979, -1.728629, 1.482985, -1.135621, -1.674347, -0.496337] # Add desired pose for forward kinematics fkreq.configuration.append(joints) # Request forward kinematics from base to "right_hand" link fkreq.tip_names.append('right_hand') try: rospy.wait_for_service(ns, 5.0) resp = fksvc(fkreq) except (rospy.ServiceException, rospy.ROSException), e: rospy.logerr("Service call failed: %s" % (e,)) return False # Check if result valid if (resp.isValid[0]): rospy.loginfo("SUCCESS - Valid Cartesian Solution Found") rospy.loginfo("\nFK Cartesian Solution:\n") rospy.loginfo("------------------") rospy.loginfo("Response Message:\n%s", resp) else: rospy.logerr("INVALID JOINTS - No Cartesian Solution Found.") return False return True
def check_get_states_init(self): if self.hand_state_subscriber is None: self.hand_state_subscriber = rospy.Subscriber("/joint_states", JointState, self.recv_joint_state) rospy.sleep(0.1)
def run(self): """ Run our code """ rospy.loginfo("Start laser test code") #self.blank_image() #for x in range(0, 99): # self.add_point(x * 1.0, x * 1.0, 10) #cv2.imshow("Hackme", self.img) #cv2.waitKey(0) self.joint_subscriber = rospy.Subscriber("/multisense/joint_states", JointState, self.recv_joint_state) self.scan_subscriber = rospy.Subscriber("/multisense/lidar_scan", LaserScan, self.recv_scan) self.spindle = rospy.Publisher("/multisense/set_spindle_speed", Float64, queue_size=10) self.set_x_range(-1.0, 1.0) self.set_y_range(-1.0, 1.0) self.set_z_range(0.0, 2.0) rospy.sleep(0.1) rospy.loginfo("Setting spindle going") self.spindle.publish(Float64(1.0)) #self.zarj.zps.look_down() rospy.loginfo("Spin forever, hopefully receiving data...") while True: rospy.sleep(1.0) #print "Got cloud {}x{}".format(resp.cloud.height, resp.cloud.width) #img = self.create_image_from_cloud(resp.cloud.points) #cv2.destroyAllWindows() #print "New image" #cv2.imshow("My image", img) #cv2.waitKey(1) #print resp #cv_image = self.bridge.imgmsg_to_cv2(resp.cloud, "bgr8") #cv2.imshow("Point cloud", cv_image)
def get_endeffector_pos(self, pos_only=True): """ :param pos_only: only return postion :return: """ fkreq = SolvePositionFKRequest() joints = JointState() joints.name = self.ctrl.limb.joint_names() joints.position = [self.ctrl.limb.joint_angle(j) for j in joints.name] # Add desired pose for forward kinematics fkreq.configuration.append(joints) fkreq.tip_names.append('right_hand') try: rospy.wait_for_service(self.name_of_service, 5) resp = self.fksvc(fkreq) except (rospy.ServiceException, rospy.ROSException), e: rospy.logerr("Service call failed: %s" % (e,)) return False pos = np.array([resp.pose_stamp[0].pose.position.x, resp.pose_stamp[0].pose.position.y, resp.pose_stamp[0].pose.position.z]) return pos
def move_with_impedance(self, des_joint_angles): """ non-blocking """ js = JointState() js.name = self.ctrl.limb.joint_names() js.position = [des_joint_angles[n] for n in js.name] self.imp_ctrl_publisher.publish(js)
def fk_service_client(limb = "right"): ns = "ExternalTools/" + limb + "/PositionKinematicsNode/FKService" fksvc = rospy.ServiceProxy(ns, SolvePositionFK) fkreq = SolvePositionFKRequest() joints = JointState() joints.name = ['right_j0', 'right_j1', 'right_j2', 'right_j3', 'right_j4', 'right_j5', 'right_j6'] joints.position = [0.763331, 0.415979, -1.728629, 1.482985, -1.135621, -1.674347, -0.496337] # Add desired pose for forward kinematics fkreq.configuration.append(joints) # Request forward kinematics from base to "right_hand" link fkreq.tip_names.append('right_hand') try: rospy.wait_for_service(ns, 5.0) resp = fksvc(fkreq) except (rospy.ServiceException, rospy.ROSException), e: rospy.logerr("Service call failed: %s" % (e,)) return False # Check if result valid if (resp.isValid[0]): rospy.loginfo("SUCCESS - Valid Cartesian Solution Found") rospy.loginfo("\nFK Cartesian Solution:\n") rospy.loginfo("------------------") rospy.loginfo("Response Message:\n%s", resp) else: rospy.loginfo("INVALID JOINTS - No Cartesian Solution Found.") return True
def joint_state_from_cmd(cmd): js = JointState() js.name = cmd.keys() js.position = cmd.values() return js
def __init__(self, limb = "right"): # control parameters self._rate = 1000.0 # Hz self._missed_cmds = 20.0 # Missed cycles before triggering timeout # create our limb instance self._limb = intera_interface.Limb(limb) # initialize parameters self._springs = dict() self._damping = dict() self._des_angles = dict() # create cuff disable publisher cuff_ns = 'robot/limb/' + limb + '/suppress_cuff_interaction' self._pub_cuff_disable = rospy.Publisher(cuff_ns, Empty, queue_size=1) # verify robot is enabled print("Getting robot state... ") self._rs = intera_interface.RobotEnable(CHECK_VERSION) self._init_state = self._rs.state().enabled print("Enabling robot... ") self._rs.enable() print("Running. Ctrl-c to quit") rospy.Subscriber("desired_joint_pos", JointState, self._set_des_pos) rospy.Subscriber("release_spring", Float32, self._release) rospy.Subscriber("imp_ctrl_active", Int64, self._imp_ctrl_active) self.max_stiffness = 20 self.time_to_maxstiffness = .3 ######### 0.68 self.t_release = rospy.get_time() self._imp_ctrl_is_active = True for joint in self._limb.joint_names(): self._springs[joint] = 30 self._damping[joint] = 4
def subscribe(self): self.subscribeAcc = rospy.Subscriber("/robot/accelerometer/left_accelerometer/state",Imu, self.callback) self.subscribeEff = rospy.Subscriber("/robot/joint_states", JointState, self.callback2) rospy.spin()
def main(): rospy.init_node('issue_com') pub = rospy.Publisher(TRIAL_COM_TOPIC, TrialCommand, queue_size=10) test_pub = rospy.Publisher(TEST_TOPIC, Empty, queue_size=10) sub = rospy.Subscriber(POS_COM_TOPIC, TrialCommand, listen) sub2 = rospy.Subscriber(RESULT_TOPIC, SampleResult, listen_report) #sub = rospy.Subscriber('/joint_states', JointState, listen) tc = TrialCommand() T = 1 tc.controller = get_lin_gauss_test(T=T) tc.T = T tc.frequency = 20.0 # NOTE: ordering of datatypes in state is determined by the order here tc.state_datatypes = [JOINT_ANGLES, JOINT_VELOCITIES] tc.obs_datatypes = tc.state_datatypes tc.ee_points = EE_SITES.reshape(EE_SITES.size).tolist() r = rospy.Rate(1) #while not rospy.is_shutdown(): # pub.publish(pc) # r.sleep() # print 'published!' r.sleep() test_pub.publish(Empty()) pub.publish(tc) rospy.spin()
def __init__(self): self.rospack = RosPack() with open(join(self.rospack.get_path('apex_playground'), 'config', 'ergo.json')) as f: self.params = json.load(f) self.button = Button(self.params) self.rate = rospy.Rate(self.params['publish_rate']) self.button.switch_led(False) # Service callers self.robot_reach_srv_name = '{}/reach'.format(self.params['robot_name']) self.robot_compliant_srv_name = '{}/set_compliant'.format(self.params['robot_name']) rospy.loginfo("Ergo node is waiting for poppy controllers...") rospy.wait_for_service(self.robot_reach_srv_name) rospy.wait_for_service(self.robot_compliant_srv_name) self.reach_proxy = rospy.ServiceProxy(self.robot_reach_srv_name, ReachTarget) self.compliant_proxy = rospy.ServiceProxy(self.robot_compliant_srv_name, SetCompliant) rospy.loginfo("Controllers connected!") self.state_pub = rospy.Publisher('ergo/state', CircularState, queue_size=1) self.button_pub = rospy.Publisher('sensors/buttons/help', Bool, queue_size=1) self.goals = [] self.goal = 0. self.joy_x = 0. self.joy_y = 0. self.motion_started_joy = 0. self.js = JointState() rospy.Subscriber('sensors/joystick/{}'.format(self.params["control_joystick_id"]), Joy, self.cb_joy) rospy.Subscriber('{}/joint_state'.format(self.params['robot_name']), JointState, self.cb_js) rospy.Subscriber('sensors/button_leds/pause', Bool, self.cb_bt_led) self.t = rospy.Time.now() self.srv_reset = None self.extended = False self.standby = False self.last_activity = rospy.Time.now() self.delta_t = rospy.Time.now()
def reach(self, target, duration): js = JointState() js.name = target.keys() js.position = target.values() self.reach_proxy(ReachTargetRequest(target=js, duration=rospy.Duration(duration)))
def ros_to_list(joints): assert isinstance(joints, JointState) return joints.position
def list_to_ros(joints): return JointState(position=list(joints))
def reach(self, positions, duration): self.reach_proxy(ReachTargetRequest(target=JointState(name=positions.keys(), position=positions.values()), duration=rospy.Duration(duration)))
def main(): def listener(): rospy.init_node('listener', anonymous=True) # defines anonymous listener node rospy.Subscriber('joint_states',JointState,callback) rospy.spin() # spin() simply keeps python from exiting until this node is stopped def callback(msg): # callback is executed when a message is published to 'joint_states' pos = [1,1,1,1,1,1,1,1,1,1] # creates an array of 10 to store joint_states for i in range (0,10): # for all servos ... pos[i]=msg.position[i+14] #stores joint states to pos[] while bypassing the initial 14 passive revolute joints pos[i]=converter(pos[i]) #converts servo posisitons into commands for the AR10 hand print int(pos[i]) hand.move(0,int(pos[0])) #sends commands to the AR10 hand.move(1,int(pos[1])) hand.move(2,int(pos[2])) hand.move(3,int(pos[3])) hand.move(4,int(pos[4])) hand.move(5,int(pos[5])) hand.move(6,int(pos[6])) hand.move(7,int(pos[7])) hand.move(8,int(pos[8])) hand.move(9,int(pos[9])) return pos def converter(pos): #Converter that is executed when called in callback command_value = (((pos*-3500)/0.02)+8000) return command_value hand = ar10() # create hand object listener() # start subscriber
def __init__(self): self.joint_state = JointState() self.arm_map = dict() self.joint_pub = rospy.Publisher("/as_arm/set_joints_states", JointState, queue_size=1) self.pose_sub = rospy.Subscriber("/as_arm/joint_states", JointState, callback=self.callback_joint, queue_size=1) # end effector service self.check_collision_client = rospy.ServiceProxy('/check_collision', CheckCollisionValid) self.tf_listener = tf.TransformListener()
def __init__(self): self.joint_names = ['base_joint', 'shoulder_joint', 'elbow_joint', 'wrist_flex_joint', 'wrist_rot_joint', 'finger_joint1', 'finger_joint2'] # subscriber for setting joints position self.joint_state = JointState() self.states_pub = rospy.Publisher("/joint_states", JointState, queue_size=1) print "======joint range:" for name in self.joint_names[:5]: v = self.JointMap[name] v['range'][0] *= DEG_TO_RAD v['range'][1] *= DEG_TO_RAD print '\t', name, v['range']
def __init__(self): self.joint_names = ['base_joint', 'shoulder_joint', 'elbow_joint', 'wrist_flex_joint', 'wrist_rot_joint', ] # subscriber for setting joints position self.joint_state = JointState() self.states_pub = rospy.Publisher("/as_arm/set_joints_states", JointState, queue_size=1) print "======joint range:" for name in self.joint_names[:5]: v = self.JointMap[name] v['range'][0] *= DEG_TO_RAD v['range'][1] *= DEG_TO_RAD print '\t', name, v['range']
def __init__(self): self.joint_names = ['base_joint', 'shoulder_joint', 'elbow_joint', 'wrist_flex_joint', 'wrist_rot_joint', 'finger_joint1', 'finger_joint2'] # subscriber for setting joints position self.states_sub = rospy.Subscriber("/as_arm/set_joints_states", JointState, self.callback, queue_size=2) # a list of publisher self.joint_pub = dict() self.joint_pose = dict() for idx, name in enumerate(self.joint_names): pub = rospy.Publisher("/as_arm/joint%d_position_controller/command" % (idx + 1), Float64, queue_size=3) self.joint_pub[name] = pub self.joint_pose[name] = Float64()
def __init__(self): self.joint_names = ['base_joint', 'shoulder_joint', 'elbow_joint', 'wrist_flex_joint', 'wrist_rot_joint', ] # subscriber for setting joints position self.joint_state = JointState() self.states_pub = rospy.Publisher("/joint_states", JointState, queue_size=1) self.pose_sub = rospy.Subscriber("/as_arm/joint_states", JointState, callback=self.callback_joint, queue_size=1) print "======joint range:" for name in self.joint_names[:5]: v = self.JointMap[name] v['range'][0] *= DEG_TO_RAD v['range'][1] *= DEG_TO_RAD print '\t', name, v['range']
def __init__(self): self.odom = Odometry() self.odom_received = False self.odom_sub = rospy.Subscriber('state', Odometry, self.OdomCallBack) self.thruster_received = False self.thruster_sub = rospy.Subscriber('thruster_command', JointState, self.ThrusterCallBack)
def __init__(self, description='human_description', prefix='human', control=False): self.description = description self.robot_commander = RobotCommander(description) if control: self.joint_publisher = rospy.Publisher('/human/set_joint_values', JointState, queue_size=1) self.groups = {} self.groups['head'] = MoveGroupCommander('Head', description) self.groups['right_arm'] = MoveGroupCommander('RightArm', description) self.groups['left_arm'] = MoveGroupCommander('LeftArm', description) self.groups['right_leg'] = MoveGroupCommander('RightLeg', description) self.groups['left_leg'] = MoveGroupCommander('LeftLeg', description) self.groups['upper_body'] = MoveGroupCommander('UpperBody', description) self.groups['lower_body'] = MoveGroupCommander('LowerBody', description) self.groups['whole_body'] = MoveGroupCommander('WholeBody', description) # initialize end-effectors dict self.end_effectors = {} # fill both dict for key, value in self.groups.iteritems(): self.end_effectors[key] = value.get_end_effector_link() # add the list of end-effectors for bodies self.end_effectors['upper_body'] = [self.end_effectors['head'], self.end_effectors['right_arm'], self.end_effectors['left_arm']] self.end_effectors['lower_body'] = [self.end_effectors['right_leg'], self.end_effectors['left_leg']] self.end_effectors['whole_body'] = self.end_effectors['upper_body'] + self.end_effectors['lower_body'] self.prefix = prefix self.urdf_reader = URDFReader() rospy.wait_for_service('compute_fk') self.compute_fk = rospy.ServiceProxy('compute_fk', GetPositionFK) self.current_state = self.get_initial_state()
def inverse_kinematic(self, desired_poses, fixed_joints={}, tolerance=0.001, group_names='whole_body', seed=None): def compute_ik_client(): rospy.wait_for_service('compute_human_ik') try: compute_ik = rospy.ServiceProxy('compute_human_ik', GetHumanIK) res = compute_ik(poses, fixed_joint_state, tolerance, group_names, seed) return res.joint_state except rospy.ServiceException, e: print "Service call failed: %s" % e # in case of failure return T pose return self.get_initial_state() if seed is None: seed = self.get_current_state() if group_names is not list: group_names = [group_names] # convert the desired poses to PoseStamped poses = [] for key, value in desired_poses.iteritems(): pose = transformations.list_to_pose(value) pose.header.frame_id = key poses.append(pose) # convert the fixed joints to joint state fixed_joint_state = JointState() for key, value in fixed_joints.iteritems(): fixed_joint_state.name += [key] fixed_joint_state.position += [value] return compute_ik_client()
def jacobian(self, group_name, joint_state, use_quaternion=False, link=None, ref_point=None): def compute_jacobian_srv(): rospy.wait_for_service('compute_jacobian') try: compute_jac = rospy.ServiceProxy('compute_jacobian', GetJacobian) js = JointState() js.name = self.get_joint_names(group_name) js.position = self.extract_group_joints(group_name, joint_state) p = Point(x=ref_point[0], y=ref_point[1], z=ref_point[2]) # call the service res = compute_jac(group_name, link, js, p, use_quaternion) # reorganize the jacobian jac_array = np.array(res.jacobian).reshape((res.nb_rows, res.nb_cols)) # reorder the jacobian wrt to the joint state ordered_jac = np.zeros((len(jac_array), len(joint_state.name))) for i, n in enumerate(js.name): ordered_jac[:, joint_state.name.index(n)] = jac_array[:, i] return ordered_jac except rospy.ServiceException, e: print "Service call failed: %s" % e # compute the jacobian only for chains # if group_name not in ['right_arm', 'left_arm', 'head', 'right_leg', 'left_leg']: # rospy.logerr('The Jacobian matrix can only be computed on kinematic chains') # return [] # assign values if link is None: link = self.end_effectors[group_name] if ref_point is None: ref_point = [0, 0, 0] # return the jacobian return compute_jacobian_srv()
def getDictFromAngles(angles): """ Converts angles of a JointState() to a dictionary :param angles (sensor_msgs.msg.JointState): angles to be converted :return (dict): dictionary with angles """ d=dict(zip(angles.name,angles.position)) return collections.OrderedDict(sorted( d.items() ))
def getStrFromAngles(angles): """ Converts all angles of a JointState() to a printable string :param angles (sensor_msgs.msg.JointState): JointState() angles to be converted :return (string): string of the angles """ d=getDictFromAngles(angles) return str( dict(d.items()))