我们从Python开源项目中,提取了以下19个代码示例,用于说明如何使用sensor_msgs.msg.Imu()。
def publish_info(self, imu): self.imu_msg = Imu() self.imu_msg.linear_acceleration.x = imu.linear_acceleration_x self.imu_msg.linear_acceleration.y = imu.linear_acceleration_y self.imu_msg.linear_acceleration.z = imu.linear_acceleration_z self.imu_msg.angular_velocity.x = imu.angular_velocity_x self.imu_msg.angular_velocity.y = imu.angular_velocity_y self.imu_msg.angular_velocity.z = imu.angular_velocity_z self.imu_msg.orientation.x = imu.orientation_x self.imu_msg.orientation.y = imu.orientation_y self.imu_msg.orientation.z = imu.orientation_z self.imu_msg.orientation.w = imu.orientation_w self.imu_msg.header.stamp = rospy.Time.now() self.imu_msg.header.frame_id = self.frame_name self.imu_msg.header.seq = self.seq self.pub_imu.publish(self.imu_msg) self.seq += 1
def __init__(self): if rospy.has_param('~orientation_offset'): # Orientation offset as quaterion q = [x,y,z,w]. self.orientation_offset = rospy.get_param('~orientation_offset') else: yaw_offset_deg = rospy.get_param('~yaw_offset_deg', 0.0) self.orientation_offset = tf.quaternion_from_euler(0.0, 0.0, math.radians(yaw_offset_deg)) rospy.Subscriber(rospy.get_name() + "/imu_in", Imu, self.imu_callback) self.pub_imu_out = rospy.Publisher(rospy.get_name() + '/imu_out', Imu, queue_size=10) rospy.spin()
def __init__(self): # Read Settings self.read_settings() # Init other variables self._num_magnetometer_reads = 0 self._latest_bearings = np.zeros(shape = (self._number_samples_average, 1)) self._received_enough_samples = False # Subscribe to magnetometer topic rospy.Subscriber("magnetic_field", Vector3Stamped, self.magnetic_field_callback) # Publishers self._pub_bearing_raw = rospy.Publisher(rospy.get_name() + '/bearing_raw_deg', Float64, queue_size = 10) self._pub_bearing_avg = rospy.Publisher(rospy.get_name() + '/bearing_avg_deg', Float64, queue_size = 10) self._pub_imu_bearing_avg = rospy.Publisher(rospy.get_name() + '/imu_bearing_avg', Imu, queue_size = 10) if self._verbose: self._pub_mag_corrected = rospy.Publisher(rospy.get_name() + '/mag_corrected', Vector3Stamped, queue_size = 10) rospy.spin()
def __init__(self): self.petrone = Petrone() # subscriber self.sub_flight = rospy.Subscriber('cmd_fly', Int8, self.cb_fly, queue_size=1) self.sub_cmd = rospy.Subscriber('cmd_vel_raw', Twist, self.cb_cmd, queue_size=1) self.sub_color = rospy.Subscriber('led_color', String, self.cb_color, queue_size=1) # publisher self.pub_battery = rospy.Publisher('battery', Float32, queue_size=1) self.pub_status_flight = rospy.Publisher('status/flight', Int8, queue_size=1) self.pub_status_system = rospy.Publisher('status/system', Int8, queue_size=1) self.pub_imu = rospy.Publisher('imu', Imu, queue_size=1) # cache self.is_disconnected = True self.last_values = defaultdict(lambda: 0) # flight parameters self.twist = Twist() self.twist_at = 0
def __init__(self): self.rate = rospy.get_param('~rate', 100.0) self.imu_name = rospy.get_param('~imu_name', 'imu_steer') self.topic_name = rospy.get_param('~topic_name', 'topic_name') self.roll = 0.0 self.pitch = 0.0 self.yaw = 0.0 self.pub_imu_roll_msg = Float32() self.pub_imu_pitch_msg = Float32() self.pub_imu_yaw_msg = Float32() self.pub_imu_roll = rospy.Publisher('/' + self.imu_name +'/roll', Float32, queue_size=1) self.pub_imu_pitch = rospy.Publisher('/' + self.imu_name +'/pitch', Float32, queue_size=1) self.pub_imu_yaw = rospy.Publisher('/' + self.imu_name +'/yaw', Float32, queue_size=1) self.sub = rospy.Subscriber(self.topic_name, Imu, self.process_imu_message, queue_size=1) rate = rospy.Rate(self.rate) while not rospy.is_shutdown(): rate.sleep()
def yaw_to_imu(yaw): # WARNING: we assume zero roll and zero pitch! q_avg = tf.quaternion_from_euler(0.0, 0.0, yaw); imu_msg = Imu() imu_msg.orientation.w = q_avg[3] imu_msg.orientation.x = q_avg[0] imu_msg.orientation.y = q_avg[1] imu_msg.orientation.z = q_avg[2] pub_imu.publish(imu_msg)
def __init__(self): self.lock = threading.Lock() self.sub_imu = rospy.Subscriber('imu', Imu, self.imu_cb) self.last_imu_angle = 0 self.imu_angle = 0 while not rospy.is_shutdown(): rospy.sleep(0.3) rospy.loginfo("imu_angle:"+str((self.imu_angle)*180/3.1415926))
def __init__(self): self.lock = threading.Lock() self.sub_imu = rospy.Subscriber('imu', Imu, self.imu_cb) self.last_imu_angle = 0 self.imu_angle = 0 while not rospy.is_shutdown(): self.last_imu_angle = self.imu_angle rospy.sleep(10) rospy.loginfo("imu_drift:"+str((self.imu_angle-self.last_imu_angle)*180/3.1415926))
def __init__(self): self.pub = rospy.Publisher(cmd_vel,Twist,queue_size =1) self.subscriber = rospy.Subscriber(Imu_topic,Imu,self.callback)
def subscribe(self): self.subscribeAcc = rospy.Subscriber("/robot/accelerometer/left_accelerometer/state",Imu, self.callback) self.subscribeEff = rospy.Subscriber("/robot/joint_states", JointState, self.callback2) rospy.spin()
def handle_acc(self, msg): # Check header of Imu msg t = msg.header.stamp.secs acc = (msg.linear_acceleration.x, msg.linear_acceleration.y, msg.linear_acceleration.z) # TODO check how uniform t is, if not investigate correct # filtering option self.acc_t.append(t) self.acc.append(acc)
def proc_emg(emg, moving): # create an array of ints for emg data # and moving data emgPub.publish(EmgArray(emg, moving)) # Package the IMU data into an Imu message
def __init__(self, use_acc_only): self._init_obs() config = copy.deepcopy(HYPERPARAMS) self._hyperparams = config self._use_acc_only = use_acc_only if self._hyperparams['constraint']: import superball_kinematic_tool as skt self._constraint = skt.KinematicMotorConstraints( self._hyperparams['constraint_file'], **self._hyperparams['constraint_params'] ) rospy.init_node('superball_agent', disable_signals=True, log_level=rospy.DEBUG) self._obs_update_lock = threading.Lock() self._motor_pos_sub = rospy.Subscriber( '/ranging_data_matlab', Float32MultiArray, callback=self._motor_pos_cb, queue_size=1 ) self._imu_sub = [] for i in range(12): self._imu_sub.append( rospy.Subscriber( SUPERBALL_IMU_TOPICS[i], Imu, callback=self._imu_cb, queue_size=1 ) ) self._ctrl_pub = rospy.Publisher('/superball/control', String, queue_size=1) self._timestep_pub = rospy.Publisher('/superball/timestep', UInt16, queue_size=1) self._init_motor_pubs() self._run_sim = False self._sim_thread = threading.Thread(target=self._continue_simulation) self._sim_thread.daemon = True self._sim_thread.start() self._action_rate = rospy.Rate(10)
def __init__(self): self.rate = rospy.get_param('~rate', 100.0) self.topic_name_angle = rospy.get_param('~topic_name_angle', 'topic_name_angle') self.topic_name_reference = rospy.get_param('~topic_name_reference', 'topic_name_reference') self.imu_name = rospy.get_param('~imu_name', 'imu_steer') self.roll_steer = 0.0 self.pitch_steer = 0.0 self.yaw_steer = 0.0 self.roll_frame = 0.0 self.pitch_frame = 0.0 self.yaw_frame = 0.0 self.enable_reference = False self.enable_angle = False self.pub_imu_roll_msg = Float32() self.pub_imu_pitch_msg = Float32() self.pub_imu_yaw_msg = Float32() self.pub_imu_roll = rospy.Publisher('/' + self.imu_name +'/roll', Float32, queue_size=1) self.pub_imu_pitch = rospy.Publisher('/' + self.imu_name +'/pitch', Float32, queue_size=1) self.pub_imu_yaw = rospy.Publisher('/' + self.imu_name +'/yaw', Float32, queue_size=1) self.sub = rospy.Subscriber(self.topic_name_angle, Imu, self.process_imu_message_angle, queue_size=1) self.sub = rospy.Subscriber(self.topic_name_reference, Imu, self.process_imu_message_reference, queue_size=1) rate = rospy.Rate(self.rate) while not rospy.is_shutdown(): rate.sleep()
def pubeuler(): rospy.init_node('pub_euler') pubimu = rospy.Publisher('euler_imu', PoseStamped,queue_size = 10) pubodom = rospy.Publisher('euler_odom', PoseStamped,queue_size = 10) pubfiltered = rospy.Publisher('euler_filtered', PoseStamped,queue_size = 10) pubmeas = rospy.Publisher('euler_meas',PoseStamped,queue_size = 10) rospy.Subscriber('/imu',Imu,callbackimu, callback_args=pubimu) rospy.Subscriber('/odom',Odometry,callbackodom, callback_args=pubodom) rospy.Subscriber('/odometry/filtered',Odometry,callbackfiltered, callback_args=pubfiltered) rospy.Subscriber('/meas_pose',Odometry,callbackmeas,callback_args=pubmeas) rospy.spin()
def magnetic_field_callback(self, magnetometer_msg): # Correct magnetic filed raw_mag = np.array([magnetometer_msg.vector.x, magnetometer_msg.vector.y, magnetometer_msg.vector.z]) # corrected_mag = compensation * (raw_mag - offset) corrected_mag = np.dot(self._calibration_compensation, raw_mag - self._calibration_offset) # compute yaw angle using corrected magnetometer measurements # and ASSUMING ZERO pitch and roll of the magnetic sensor! # adapted from # https://github.com/KristofRobot/razor_imu_9dof/blob/indigo-devel/src/Razor_AHRS/Compass.ino corrected_mag = corrected_mag / np.linalg.norm(corrected_mag) mag_bearing = math.atan2(corrected_mag[1], -corrected_mag[0]) # add declination and constant bearing offset mag_bearing = mag_bearing + self._declination + self._bearing_offset # publish unfiltered bearing, degrees only for debug purposes self._pub_bearing_raw.publish(Float64(math.degrees(mag_bearing))) # compute mean self._latest_bearings[self._num_magnetometer_reads] = mag_bearing self._num_magnetometer_reads += 1 if self._num_magnetometer_reads >= self._number_samples_average: self._num_magnetometer_reads = 0 # delete oldest samples self._received_enough_samples = True if self._received_enough_samples: bearing_avg = self.angular_mean(self._latest_bearings) else: # not enough samples, use latest value bearing_avg = mag_bearing # WARNING: we assume zero roll and zero pitch! q_avg = tf.quaternion_from_euler(0.0, 0.0, bearing_avg); imu_msg = Imu() imu_msg.orientation.w = q_avg[3] imu_msg.orientation.x = q_avg[0] imu_msg.orientation.y = q_avg[1] imu_msg.orientation.z = q_avg[2] self._pub_bearing_avg.publish(Float64(math.degrees(bearing_avg))) self._pub_imu_bearing_avg.publish(imu_msg) # debug if self._verbose: rospy.loginfo("bearing_avg : " + str(math.degrees(bearing_avg)) + " deg") mag_corrected_msg = magnetometer_msg mag_corrected_msg.vector.x = corrected_mag[0] mag_corrected_msg.vector.y = corrected_mag[1] mag_corrected_msg.vector.z = corrected_mag[2] self._pub_mag_corrected.publish(mag_corrected_msg)
def __init__(self): self.recording = False self.names = ('sair', 'sail', 'fai', 'faii') self.data = {n: [] for n in self.names} # TODO hardcoded for left arm # self.acc_sub = rospy.Subscriber( # '/robot/accelerometer/left_accelerometer/state', # Imu, # self.handle_acc) self.sensor_sub = rospy.Subscriber( '/sensor_values', Int32MultiArray, self.handle_sensor) # self.kb_sub = rospy.Subscriber('/keyboard/keyup', # Key, # self.keyboard_cb, queue_size=10) # Visualising the below pulished signals in rqt_plot is recommended self.sai_pub = rospy.Publisher( '/finger_sensor/sai', FingerSAI, queue_size=5) self.fai_pub = rospy.Publisher( '/finger_sensor/fai', FingerFAI, queue_size=5) self.faii_pub = rospy.Publisher( '/finger_sensor/faii', Float64, queue_size=5) self.sensor_values = [] self.sensor_values = deque(maxlen=80) self.acc_t = deque(maxlen=400) self.acc = deque(maxlen=400) # 0.66pi rad/sample (cutoff frequency over nyquist frequency # (ie, half the sampling frequency)). For the wrist, 33 Hz / # (100 Hz/ 2). TODO Double check arguments self.b1, self.a1 = signal.butter(1, 0.66, 'high', analog=False) # 0.5p rad/sample. For the tactile sensor, 5 Hz / (20 Hz / 2). self.b, self.a = signal.butter(1, 0.5, 'high', analog=False)
def proc_imu(quat1, acc, gyro): # New info: # https://github.com/thalmiclabs/myo-bluetooth/blob/master/myohw.h#L292-L295 # Scale values for unpacking IMU data # define MYOHW_ORIENTATION_SCALE 16384.0f # See myohw_imu_data_t::orientation # define MYOHW_ACCELEROMETER_SCALE 2048.0f # See myohw_imu_data_t::accelerometer # define MYOHW_GYROSCOPE_SCALE 16.0f # See myohw_imu_data_t::gyroscope if not any(quat1): # If it's all 0's means we got no data, don't do anything return h = Header() h.stamp = rospy.Time.now() # frame_id is node name without / h.frame_id = rospy.get_name()[1:] # We currently don't know the covariance of the sensors with each other cov = [0, 0, 0, 0, 0, 0, 0, 0, 0] quat = Quaternion(quat1[0] / 16384.0, quat1[1] / 16384.0, quat1[2] / 16384.0, quat1[3] / 16384.0) # Normalize the quaternion and accelerometer values quatNorm = sqrt(quat.x * quat.x + quat.y * quat.y + quat.z * quat.z + quat.w * quat.w) normQuat = Quaternion(quat.x / quatNorm, quat.y / quatNorm, quat.z / quatNorm, quat.w / quatNorm) normAcc = Vector3(acc[0] / 2048.0, acc[1] / 2048.0, acc[2] / 2048.0) normGyro = Vector3(gyro[0] / 16.0, gyro[1] / 16.0, gyro[2] / 16.0) imu = Imu(h, normQuat, cov, normGyro, cov, normAcc, cov) imuPub.publish(imu) roll, pitch, yaw = euler_from_quaternion([normQuat.x, normQuat.y, normQuat.z, normQuat.w]) oriPub.publish(Vector3(roll, pitch, yaw)) oriDegPub.publish(Vector3(degrees(roll), degrees(pitch), degrees(yaw))) posePub.publish( PoseStamped(h,Pose(Point(0.0,0.0,0.0),normQuat)) ) # Package the arm and x-axis direction into an Arm message
def __init__(self): self.degrees2rad = math.pi/180.0 # Get values from launch file self.rate = rospy.get_param('~rate', 80.0) # the rate at which to publish the transform # Static transform between sensor and fixed frame: x, y, z, roll, pitch, yaw # <rosparam param="static_transform">[0, 0, 0, 0, 0, 0]</rosparam> self.static_transform = rospy.get_param('~static_transform', [0, 0, 0, 0, 0, 0]) self.serial_port = rospy.get_param('~serial_port', "/dev/ttyUSB0") self.topic_name = rospy.get_param('~topic_name', "/imu") self.fixed_frame = rospy.get_param('~fixed_frame', "world") self.frame_name = rospy.get_param('~frame_name', "imu") self.publish_transform = rospy.get_param('~publish_transform', False) self.imu = ImuDriver(serial_port=self.serial_port) self.imu.init_imu() # Create a publisher for imu message self.pub_imu = rospy.Publisher(self.topic_name, Imu, queue_size=1) self.odomBroadcaster_imu = TransformBroadcaster() self.imu_msg = Imu() self.imu_msg.orientation_covariance[0] = -1 self.imu_msg.angular_velocity_covariance[0] = -1 self.imu_msg.linear_acceleration_covariance[0] = -1 self.current_time = rospy.get_time() self.last_time = rospy.get_time() self.seq = 0 rospy.on_shutdown(self.shutdown_node) rate = rospy.Rate(self.rate) rospy.loginfo("Ready for publishing imu:" + self.serial_port) # Main while loop. while not rospy.is_shutdown(): self.current_time = rospy.get_time() self.imu.read() if self.publish_transform: quaternion = self.imu.quaternion_from_euler(self.static_transform[3]*self.degrees2rad, self.static_transform[4]*self.degrees2rad, self.static_transform[5]*self.degrees2rad) # send static transformation tf between imu and fixed frame self.odomBroadcaster_imu.sendTransform( (self.static_transform[0], self.static_transform[1], self.static_transform[2]), (quaternion[0], quaternion[1], quaternion[2], quaternion[3]), rospy.Time.now(), self.frame_name, self.fixed_frame ) # publish imu message self.publish_info(imu=self.imu) rate.sleep()