Python rospy 模块,get_param() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用rospy.get_param()

项目:geom_rcnn    作者:asbroad    | 项目源码 | 文件源码
def __init__(self):

        self.run_recognition = rospy.get_param('/rgb_object_detection/run_recognition')
        self.model_filename = rospy.get_param('/rgb_object_detection/model_file')
        self.weights_filename = rospy.get_param('/rgb_object_detection/weights_file')
        self.categories_filename = rospy.get_param('/rgb_object_detection/category_file')
        self.verbose = rospy.get_param('/rgb_object_detection/verbose', False)

        self.bridge = CvBridge()
        self.image_sub = rospy.Subscriber('/camera/rgb/image_color', Image, self.img_cb)
        self.patches_sub = rospy.Subscriber('/candidate_regions_depth', PolygonStamped, self.patches_cb)
        self.detection_pub = rospy.Publisher('/detections', Detection, queue_size=1)
        # you can read this value off of your sensor from the '/camera/depth_registered/camera_info' topic
        self.P = np.array([[525.0, 0.0, 319.5, 0.0], [0.0, 525.0, 239.5, 0.0], [0.0, 0.0, 1.0, 0.0]])

        if self.run_recognition:
            self.cnn = CNN('', self.model_filename, self.weights_filename, self.categories_filename, '', 0, 0, self.verbose)
            self.cnn.load_model()
项目:mhri    作者:mhri    | 项目源码 | 文件源码
def __init__(self):
        rospy.init_node('multiplexer_node', anonymous=False)

        rospy.loginfo('\033[93m[%s]\033[0m wait for bringup social_events_memory...'%rospy.get_name())
        rospy.wait_for_service('social_events_memory/read_data')
        self.rd_event_mem = rospy.ServiceProxy('social_events_memory/read_data', ReadData)
        rospy.Subscriber('forwarding_event', ForwardingEvent, self.handle_social_events)
        self.pub_rasing_event = rospy.Publisher('raising_events', RaisingEvents, queue_size=10)

        self.events_queue = Queue.Queue()
        self.recognized_words_queue = Queue.Queue()

        event_period = rospy.get_param('~event_period', 0.5)
        rospy.Timer(rospy.Duration(event_period), self.handle_trigger_events)

        rospy.loginfo('\033[93m[%s]\033[0m initialized...'%rospy.get_name())
        rospy.spin()
项目:overhead_mobile_tracker    作者:NU-MSR    | 项目源码 | 文件源码
def __init__(self):
        # first let's load all parameters:
        self.frame_id = rospy.get_param("~frame_id", "odom_meas")
        self.camera_frame_id = rospy.get_param("~camera_frame_id", "overhead_cam_frame")
        self.base_frame_id = rospy.get_param("~base_frame_id", "base_meas")
        self.x0 = rospy.get_param("~x0", 0.0)
        self.y0 = rospy.get_param("~y0", 0.0)
        self.th0 = rospy.get_param("~th0", 0.0)
        self.pubstate = rospy.get_param("~pubstate", True)
        self.marker_id = rospy.get_param("~marker_id", 12)

        # setup other required vars:
        self.odom_offset = odom_conversions.numpy_to_odom(np.array([self.x0, self.y0, self.th0]),
                                                          self.frame_id)
        self.path_list = deque([], maxlen=PATH_LEN)

        # now let's create publishers, listeners, and subscribers
        self.br = tf.TransformBroadcaster()
        self.listener = tf.TransformListener()
        self.meas_pub = rospy.Publisher("meas_pose", Odometry, queue_size=5)
        self.path_pub = rospy.Publisher("meas_path", Path, queue_size=1)
        self.alvar_sub = rospy.Subscriber("ar_pose_marker", AlvarMarkers, self.alvarcb)
        self.publish_serv = rospy.Service("publish_bool", SetBool, self.pub_bool_srv_cb)
        self.offset_serv = rospy.Service("set_offset", SetOdomOffset, self.offset_srv_cb)
        return
项目:cozmo_driver    作者:OTL    | 项目源码 | 文件源码
def __init__(self):
        """
        Create HeadLiftJoy object.

        """
        # params
        self._head_axes = rospy.get_param('~head_axes', 3)
        self._lift_axes = rospy.get_param('~lift_axes', 3)
        self._head_button = rospy.get_param('~head_button', 4)
        self._lift_button = rospy.get_param('~lift_button', 5)

        # pubs
        self._head_pub = rospy.Publisher('head_angle', Float64, queue_size=1)
        self._lift_pub = rospy.Publisher('lift_height', Float64, queue_size=1)

        # subs
        self._joy_sub = rospy.Subscriber('joy', Joy, self._joy_cb, queue_size=1)
项目:uwb_tracker_ros    作者:eth-ait    | 项目源码 | 文件源码
def _read_unit_offsets(self):
        if not rospy.has_param('~num_of_units'):
            rospy.logwarn("No unit offset parameters found!")
        num_of_units = rospy.get_param('~num_of_units', 0)
        self._unit_offsets = np.zeros((num_of_units, 3))
        self._unit_coefficients = np.zeros((num_of_units, 2))
        for i in xrange(num_of_units):
            unit_params = rospy.get_param('~unit_{}'.format(i))
            x = unit_params['x']
            y = unit_params['y']
            z = unit_params['z']
            self._unit_offsets[i, :] = [x, y, z]
            p0 = unit_params['p0']
            p1 = unit_params['p1']
            self._unit_coefficients[i, :] = [p0, p1]
        rospy.loginfo("Unit offsets: {}".format(self._unit_offsets))
        rospy.loginfo("Unit coefficients: {}".format(self._unit_coefficients))
项目:mav_rtk_gps    作者:ethz-asl    | 项目源码 | 文件源码
def get_topic_service_names(self):
        topic_names = {}
        service_names = {}

        # Topics.
        topic_names['msf_odometry'] = rospy.get_param('~msf_odometry_topic', 'msf_core/odometry')

        # Services.
        service_names['init_msf_height'] = rospy.get_param('~init_msf_height_srv',
                                                           'pose_sensor_rovio/pose_sensor/initialize_msf_height')
        service_names['init_msf_scale'] = rospy.get_param('~init_msf_scale_srv',
                                                          'pose_sensor_rovio/pose_sensor/initialize_msf_scale')

        # Check if we should add a leading namespace
        name_space = rospy.get_param('~namespace', '')
        for key, value in topic_names.iteritems():
            topic_names[key] = helpers.get_full_namespace(name_space, value)

        for key, value in service_names.iteritems():
            service_names[key] = helpers.get_full_namespace(name_space, value)

        return topic_names, service_names
项目:mav_rtk_gps    作者:ethz-asl    | 项目源码 | 文件源码
def get_topic_names(self):
        # RTK info topics
        topic_names = {}

        topic_names['piksi_receiver_state'] = rospy.get_param('~piksi_receiver_state_topic',
                                                              'piksi/debug/receiver_state')
        topic_names['piksi_uart_state'] = rospy.get_param('~piksi_uart_state_topic',
                                                          'piksi/debug/uart_state')
        topic_names['piksi_baseline_ned'] = rospy.get_param('~piksi_baseline_ned_topic',
                                                            'piksi/baseline_ned')
        topic_names['piksi_wifi_corrections'] = rospy.get_param('~piksi_num_wifi_corrections_topic',
                                                                'piksi/debug/wifi_corrections')
        topic_names['piksi_navsatfix_rtk_fix'] = rospy.get_param('~piksi_navsatfix_rtk_fix',
                                                                 'piksi/navsatfix_rtk_fix')
        topic_names['piksi_age_of_corrections'] = rospy.get_param('~piksi_age_of_corrections_topic',
                                                                  'piksi/age_of_corrections')

        # Check if we should add a leading namespace
        name_space = rospy.get_param('~namespace', '')
        for key, value in topic_names.iteritems():
            topic_names[key] = helpers.get_full_namespace(name_space, value)

        return topic_names
项目:mav_rtk_gps    作者:ethz-asl    | 项目源码 | 文件源码
def __init__(self):

        if rospy.has_param('~orientation_offset'):
            # Orientation offset as quaterion q = [x,y,z,w].
            self.orientation_offset = rospy.get_param('~orientation_offset')
        else:
            yaw_offset_deg = rospy.get_param('~yaw_offset_deg', 0.0)
            self.orientation_offset = tf.quaternion_from_euler(0.0, 0.0, math.radians(yaw_offset_deg))

        rospy.Subscriber(rospy.get_name() + "/imu_in", Imu, self.imu_callback)

        self.pub_imu_out = rospy.Publisher(rospy.get_name() + '/imu_out',
                                           Imu, queue_size=10)

        rospy.spin()
项目:cs6244_motionplanning    作者:chenmin1107    | 项目源码 | 文件源码
def init(self):
        pygame.init()
        clock = pygame.time.Clock()
        screen = pygame.display.set_mode((250, 250))

        rospy.init_node('highway_teleop')
        self.rate = rospy.Rate(rospy.get_param('~hz', 10)) 
        self.acc = rospy.get_param('~acc', 5)
        self.yaw = rospy.get_param('~yaw', 0)

        self.robot_pub = rospy.Publisher('robot_0/cmd_vel', Twist, queue_size=1)

        print "Usage: \n \
                up arrow: accelerate \n \
                down arrow: decelerate \n \
                left arrow: turn left \n \
                right arrow: turn right"
项目:cs6244_motionplanning    作者:chenmin1107    | 项目源码 | 文件源码
def init(self):
        pygame.init()
        clock = pygame.time.Clock()
        screen = pygame.display.set_mode((250, 250))

        rospy.init_node('teleop')
        self.rate = rospy.Rate(rospy.get_param('~hz', 20)) 
        self.acc = rospy.get_param('~acc', 1)
        self.yaw = rospy.get_param('~yaw', 0.25)

        self.robot_pub = rospy.Publisher('control_command', controlCommand, queue_size=1)

        self.path_record_pub = rospy.Publisher('record_state', \
                RecordState, queue_size=1)

        self.highway_game_start_pub = rospy.Publisher('highway_game_start', RecordState, queue_size=1)

        print "Usage: \n \
                up arrow: accelerate \n \
                down arrow: decelerate \n \
                left arrow: turn left \n \
                right arrow: turn right"
项目:easy_handeye    作者:IFL-CAMP    | 项目源码 | 文件源码
def from_parameters(self):
        """
        Stores this calibration as ROS parameters in the namespace of the current node.

        :rtype: None
        """
        calib_dict = {}

        root_params = ['eye_on_hand', 'tracking_base_frame']
        for rp in root_params:
            calib_dict[rp] = rospy.get_param(rp)

        if calib_dict['eye_on_hand']:
            calib_dict['robot_effector_frame'] = rospy.get_param('robot_effector_frame')
        else:
            calib_dict['robot_base_frame'] = rospy.get_param('robot_base_frame')

        transf_params = 'x', 'y', 'z', 'qx', 'qy', 'qz', 'qw'
        calib_dict['transformation'] = {}
        for tp in transf_params:
            calib_dict['transformation'][tp] = rospy.get_param('transformation/'+tp)

        self.from_dict(calib_dict)
项目:easy_handeye    作者:IFL-CAMP    | 项目源码 | 文件源码
def __init__(self):
        self.eye_on_hand = rospy.get_param('eye_on_hand', False)

        # tf names
        self.robot_base_frame = rospy.get_param('robot_base_frame', 'base_link')
        self.robot_effector_frame = rospy.get_param('robot_effector_frame', 'tool0')
        self.tracking_base_frame = rospy.get_param('tracking_base_frame', 'optical_origin')
        self.tracking_marker_frame = rospy.get_param('tracking_marker_frame', 'optical_target')

        rospy.wait_for_service(hec.GET_SAMPLE_LIST_TOPIC)
        self.get_sample_proxy = rospy.ServiceProxy(hec.GET_SAMPLE_LIST_TOPIC, hec.srv.TakeSample)
        rospy.wait_for_service(hec.TAKE_SAMPLE_TOPIC)
        self.take_sample_proxy = rospy.ServiceProxy(hec.TAKE_SAMPLE_TOPIC, hec.srv.TakeSample)
        rospy.wait_for_service(hec.REMOVE_SAMPLE_TOPIC)
        self.remove_sample_proxy = rospy.ServiceProxy(hec.REMOVE_SAMPLE_TOPIC, hec.srv.RemoveSample)
        rospy.wait_for_service(hec.COMPUTE_CALIBRATION_TOPIC)
        self.compute_calibration_proxy = rospy.ServiceProxy(hec.COMPUTE_CALIBRATION_TOPIC, hec.srv.ComputeCalibration)
        rospy.wait_for_service(hec.SAVE_CALIBRATION_TOPIC)
        self.save_calibration_proxy = rospy.ServiceProxy(hec.SAVE_CALIBRATION_TOPIC, std_srvs.srv.Empty)
项目:intera_sdk    作者:RethinkRobotics    | 项目源码 | 文件源码
def wait(self):
        """
        Waits for and verifies trajectory execution result
        """
        #create a timeout for our trajectory execution
        #total time trajectory expected for trajectory execution plus a buffer
        last_time = self.goal.trajectory.points[-1].time_from_start.to_sec()
        time_buffer = rospy.get_param(self._param_ns + 'goal_time', 0.0) + 1.5
        timeout = rospy.Duration(self._slow_move_offset +
                                 last_time +
                                 time_buffer)

        finish = self._limb_client.wait_for_result(timeout)
        result = (self._limb_client.get_result().error_code == 0)

        #verify result
        if all([finish, result]):
            return True
        else:
            msg = ("Trajectory action failed or did not finish before "
                   "timeout/interrupt.")
            rospy.logwarn(msg)
            return False
项目:intera_sdk    作者:RethinkRobotics    | 项目源码 | 文件源码
def get_camera_details(self):
        """
        Return the details of the cameras.

        @rtype: list [str]
        @return: ordered list of camera details
        """
        camera_dict = dict()
        camera_config_param = "/robot_config/camera_config"
        try:
            camera_dict = rospy.get_param(camera_config_param)
        except KeyError:
            rospy.logerr("RobotParam:get_camera_details cannot detect any "
                "cameras under the parameter {0}".format(camera_config_param))
        except (socket.error, socket.gaierror):
            self._log_networking_error()
        return  camera_dict
项目:intera_sdk    作者:RethinkRobotics    | 项目源码 | 文件源码
def get_robot_assemblies(self):
        """
        Return the names of the robot's assemblies from ROS parameter.

        @rtype: list [str]
        @return: ordered list of assembly names
                 (e.g. right, left, torso, head). on networked robot
        """
        assemblies = list()
        try:
            assemblies = rospy.get_param("/robot_config/assembly_names")
        except KeyError:
            rospy.logerr("RobotParam:get_robot_assemblies cannot detect assembly names"
                         " under param /robot_config/assembly_names")
        except (socket.error, socket.gaierror):
            self._log_networking_error()
        return assemblies
项目:intera_sdk    作者:RethinkRobotics    | 项目源码 | 文件源码
def get_joint_names(self, limb_name):
        """
        Return the names of the joints for the specified
        limb from ROS parameter.

        @type  limb_name: str
        @param limb_name: name of the limb for which to retrieve joint names

        @rtype: list [str]
        @return: ordered list of joint names from proximal to distal
                 (i.e. shoulder to wrist). joint names for limb
        """
        joint_names = list()
        try:
            joint_names = rospy.get_param(
                            "robot_config/{0}_config/joint_names".format(limb_name))
        except KeyError:
            rospy.logerr(("RobotParam:get_joint_names cannot detect joint_names for"
                          " arm \"{0}\"").format(limb_name))
        except (socket.error, socket.gaierror):
            self._log_networking_error()
        return joint_names
项目:intera_sdk    作者:RethinkRobotics    | 项目源码 | 文件源码
def move_to_neutral(self, timeout=15.0, speed=0.3):
        """
        Command the Limb joints to a predefined set of "neutral" joint angles.
        From rosparam named_poses/<limb>/poses/neutral.

        @type timeout: float
        @param timeout: seconds to wait for move to finish [15]
        @type speed: float
        @param speed: ratio of maximum joint speed for execution
                      default= 0.3; range= [0.0-1.0]
        """
        try:
            neutral_pose = rospy.get_param("named_poses/{0}/poses/neutral".format(self.name))
        except KeyError:
            rospy.logerr(("Get neutral pose failed, arm: \"{0}\".").format(self.name))
            return
        angles = dict(zip(self.joint_names(), neutral_pose))
        self.set_joint_position_speed(speed)
        return self.move_to_joint_positions(angles, timeout)
项目:CON-SAI    作者:SSL-Roots    | 项目源码 | 文件源码
def __init__(self):
        # initialize a socket
        self.host = rospy.get_param('~server_address', '127.0.0.1')
        self.port = rospy.get_param('~server_port', 20011)
        self.friend_color = rospy.get_param('/friend_color', 'yellow')

        rospy.loginfo('server address is set to [' + self.host         + ']')
        rospy.loginfo('server port is set to ['    + str(self.port)    + ']')
        rospy.loginfo('team color is set to ['     + self.friend_color + ']')

        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

        # make subscribers
        self.subscribers = []
        for i in xrange(12):
            topic = "/robot_" + str(i) + "/robot_commands"
            self.subscribers.append(
                    rospy.Subscriber(
                        topic,
                        robot_commands,
                        self.sendCommands,
                        callback_args=i))
项目:mhri    作者:mhri    | 项目源码 | 文件源码
def __init__(self):
        rospy.init_node('fake_renderer', anonymous=True)

        try:
            topic_name = rospy.get_param('~action_name')
        except KeyError as e:
            print('[ERROR] Needed parameter for (topic_name)...')
            quit()

        if 'render_gesture' in rospy.get_name():
            self.GetInstalledGesturesService = rospy.Service(
                "get_installed_gestures",
                GetInstalledGestures,
                self.handle_get_installed_gestures
            )

            self.motion_list = {
                'neutral': ['neutral_motion1'],
                'encourge': ['encourge_motion1'],
                'attention': ['attention_motion1'],
                'consolation': ['consolation_motion1'],
                'greeting': ['greeting_motion1'],
                'waiting': ['waiting_motion1'],
                'advice': ['advice_motion1'],
                'praise': ['praise_motion1'],
                'command': ['command_motion1'],
            }

        self.server = actionlib.SimpleActionServer(
            topic_name, RenderItemAction, self.execute_callback, False)
        self.server.start()

        rospy.loginfo('[%s] initialized...' % rospy.get_name())
        rospy.spin()
项目:uwb_tracker_ros    作者:bennihepp    | 项目源码 | 文件源码
def _read_unit_offsets(self):
        if not rospy.has_param('~num_of_units'):
            rospy.logwarn("No unit offset parameters found!")
        num_of_units = rospy.get_param('~num_of_units', 0)
        self._unit_offsets = np.zeros((num_of_units, 3))
        self._unit_coefficients = np.zeros((num_of_units, 2))
        for i in xrange(num_of_units):
            unit_params = rospy.get_param('~unit_{}'.format(i))
            x = unit_params['x']
            y = unit_params['y']
            z = unit_params['z']
            self._unit_offsets[i, :] = [x, y, z]
            p0 = unit_params['p0']
            p1 = unit_params['p1']
            self._unit_coefficients[i, :] = [p0, p1]
        rospy.loginfo("Unit offsets: {}".format(self._unit_offsets))
        rospy.loginfo("Unit coefficients: {}".format(self._unit_coefficients))
项目:lqRRT    作者:jnez71    | 项目源码 | 文件源码
def __init__(self, image_path=None):
        height = int(rospy.get_param("~grid_height", 800))
        width = int(rospy.get_param("~grid_width", 800))
        resolution = rospy.get_param("~grid_resolution", .25)
        ogrid_topic = rospy.get_param("/lqrrt_node/ogrid_topic", "/ogrid")

        self.grid_drawer = DrawGrid(height, width, image_path)
        self.ogrid_pub = rospy.Publisher(ogrid_topic, OccupancyGrid, queue_size=1)

        m = MapMetaData()
        m.resolution = resolution
        m.width = width
        m.height = height
        pos = np.array([-width * resolution / 2, -height * resolution / 2, 0])
        quat = np.array([0, 0, 0, 1])
        m.origin = Pose()
        m.origin.position.x, m.origin.position.y = pos[:2]
        self.map_meta_data = m

        rospy.Timer(rospy.Duration(1), self.pub_grid)
项目:openag_brain    作者:OpenAgInitiative    | 项目源码 | 文件源码
def save_recipe_dp(self, variable):
        """
        Save the recipe start/end to the env. data pt. DB, so we can restart
        the recipe if necessary.
        """
        doc = EnvironmentalDataPoint({
            "environment": self.environment,
            "variable": variable,
            "is_desired": True,
            "value": rospy.get_param(params.CURRENT_RECIPE),
            "timestamp": rospy.get_time()
        })
        doc_id = gen_doc_id(rospy.get_time())
        self.env_data_db[doc_id] = doc


#------------------------------------------------------------------------------
# Our ROS node main entry point.  Starts up the node and then waits forever.
项目:openag_brain    作者:OpenAgInitiative    | 项目源码 | 文件源码
def test_valid_variables(self):
        ENVIRONMENTAL_VARIABLES = frozenset(
            VariableInfo.from_dict(d)
            for d in rospy.get_param("/var_types/environment_variables").itervalues())

        RECIPE_VARIABLES = frozenset(
            VariableInfo.from_dict(d)
            for d in rospy.get_param("/var_types/recipe_variables").itervalues())

        VALID_VARIABLES = ENVIRONMENTAL_VARIABLES.union(RECIPE_VARIABLES)
        print(VALID_VARIABLES)
        assert len(VALID_VARIABLES) == 19


        # This builds a dictionary of publisher instances using a
        # "dictionary comprehension" (syntactic sugar for building dictionaries).
        # The constant has to be declared here because get_message_class
        # needs to be called after the node is initialized.
        PUBLISHERS = {
            variable.name: rospy.Publisher(
                "{}/desired".format(variable.name),
                get_message_class(variable.type),
                queue_size=10)
            for variable in VALID_VARIABLES
        }
项目:needybot-speech    作者:needybot    | 项目源码 | 文件源码
def test_method_generate_digest(self):
        key = 'hello_world'
        entry = rospy.get_param(self.server.speech_key_to_param(key))
        parsed = self.server.parse_template(entry['template'], entry['params'])
        digest_a = self.server.generate_digest(key, parsed)

        self.assertIsNotNone(digest_a)
        self.assertIsInstance(digest_a, str)

        parsed = self.server.parse_template(
            entry['template'],
            _.assign(entry['params'], {'name': 'Needy'}))
        digest_b = self.server.generate_digest(key, parsed)

        self.assertIsNotNone(digest_b, 'is none')
        self.assertIsInstance(digest_b, str, 'is not a string')
        self.assertNotEqual(digest_a, digest_b,
                            'equal digests with diff params')
项目:needybot-speech    作者:needybot    | 项目源码 | 文件源码
def test_method_generate_cache_payload(self):
        digest = self.server.generate_digest(
            'hello_world',
            'Hello, world! My name is a needy robot.')
        expected = {
            'key': 'hello_world',
            'hash': digest,
            'file': os.path.join(
                self.server._cache_dir,
                '{}.{}'.format(digest, self.server.voice.codec)),
            'effects': ' '.join(self.server.effects),
            'voice': {
                'name': self.server.voice.voice_name,
                'speech_rate': self.server.voice.speech_rate,
                'codec': self.server.voice.codec
            },
            'template': rospy.get_param(
                self.server.speech_key_to_param('hello_world'))['template'],
            'params': {
                'name': 'a needy robot'
            }
        }
        payload = self.server.generate_cache_payload(self.action)
        self.assertIsNotNone(payload, 'no payload created')
        self.assertEqual(payload, expected, 'generated payload is incorrect')
项目:needybot-speech    作者:needybot    | 项目源码 | 文件源码
def warm_cache(self):
        """
        Collects all dialog in your manifest and stores them in the cache
        directory as well as creating entries in the cache manifest.
        """
        rospy.loginfo('Warming speech cache...')
        m = rospy.get_param('/needybot/speech/dialog')
        for k, v in m.iteritems():
            goal = SpeechGoal(key=k)
            gen = self.generate_cache_payload(goal)
            payload = self.fetch_from_cache(k, gen.get('hash'))
            if payload is None:
                payload = gen
                if self.validate_payload(payload):
                    self.add_to_cache(payload)
                    self.generate_raw_speech(payload)
                else:
                    raise TypeError('Invalid payload send to speech server')
        rospy.loginfo('Speech cache is ready.')
        self.warmed_pub.publish()
项目:AutonomousParking    作者:jovanduy    | 项目源码 | 文件源码
def __init__(self):
        """Constructor for the class
        initialize topic subscription and 
        instance variables
        """
        self.r = rospy.Rate(5)
        self.publisher = rospy.Publisher('/cmd_vel', Twist, queue_size=10)
        rospy.Subscriber('/scan', LaserScan, self.process_scan)
        # user chooses which parking mode to be performed
        self.is_parallel = rospy.get_param('~parallel', False)

        # Instance Variables
        self.timestamp1 = None
        self.timestamp2 = None
        self.dist2Neato = None
        self.dist2Wall = None
        self.widthOfSpot = None
        self.twist = None
        self.radius = None
        # Adjusment to be made before moving along the arc
        self.adjustment = 0 
        self.isAligned = False
项目:needybot-core    作者:needybot    | 项目源码 | 文件源码
def __init__(self, name, blackboard):

        """
        Tree task responsible for spawning new tasks. Once a task has a status of completed of failure
        this branch of the tree is executed. The timer waits for 5 seconds before sending the final 
        complete signal to the running task and spawning a new one.

        Args:
            name(str):              Name of this behavior tree task for identification.
            blackboard(Blackboard): Behavior tree blackboard that stores global vars.
        """

        super(Spawner, self).__init__(name, blackboard)
        self.timer = None
        self.spawning = False
        self.spawn_delay = rospy.get_param("~spawn-delay", 0.0)
项目:srcsim2017    作者:ZarjRobotics    | 项目源码 | 文件源码
def __init__(self, frame, transform_module=None):
        rospy.loginfo("Zarj eyes initialization begin")
        self.bridge = CvBridge()

        if transform_module is not None:
            self.tf = transform_module
        else:
            self.tf = tfzarj.TfZarj(rospy.get_param('/ihmc_ros/robot_name'))
            self.tf.init_transforms()

        self.detection_requests = []
        self.frame_id = frame

        self.image_sub_left = None
        self.image_sub_cloud = None

        rospy.loginfo("Zarj eyes initialization finished")
项目:srcsim2017    作者:ZarjRobotics    | 项目源码 | 文件源码
def lookupFeet(name, tfBuffer):
    global lf_start_position
    global lf_start_orientation
    global rf_start_position
    global rf_start_orientation

    lfp = "/ihmc_ros/{0}/left_foot_frame_name".format(name)
    rfp = "/ihmc_ros/{0}/right_foot_frame_name".format(name)

    if rospy.has_param(rfp) and rospy.has_param(lfp):
        lfname = rospy.get_param(lfp)
        rfname = rospy.get_param(rfp)

        leftFootWorld = tfBuffer.lookup_transform('world', lfname, rospy.Time())
        lf_start_orientation = leftFootWorld.transform.rotation
        lf_start_position = leftFootWorld.transform.translation

        rightFootWorld = tfBuffer.lookup_transform('world', rfname, rospy.Time())
        rf_start_orientation = rightFootWorld.transform.rotation
        rf_start_position = rightFootWorld.transform.translation
项目:srcsim2017    作者:ZarjRobotics    | 项目源码 | 文件源码
def lookupFeet(name, tfBuffer):
    global lf_start_position
    global lf_start_orientation
    global rf_start_position
    global rf_start_orientation

    lfp = "/ihmc_ros/{0}/left_foot_frame_name".format(name)
    rfp = "/ihmc_ros/{0}/right_foot_frame_name".format(name)

    if rospy.has_param(rfp) and rospy.has_param(lfp):
        lfname = rospy.get_param(lfp)
        rfname = rospy.get_param(rfp)

        leftFootWorld = tfBuffer.lookup_transform('world', lfname, rospy.Time())
        lf_start_orientation = leftFootWorld.transform.rotation
        lf_start_position = leftFootWorld.transform.translation

        rightFootWorld = tfBuffer.lookup_transform('world', rfname, rospy.Time())
        rf_start_orientation = rightFootWorld.transform.rotation
        rf_start_position = rightFootWorld.transform.translation
项目:srcsim2017    作者:ZarjRobotics    | 项目源码 | 文件源码
def lookupFeet(name, tfBuffer):
    global lf_start_position
    global lf_start_orientation
    global rf_start_position
    global rf_start_orientation

    lfp = "/ihmc_ros/{0}/left_foot_frame_name".format(name)
    rfp = "/ihmc_ros/{0}/right_foot_frame_name".format(name)

    if rospy.has_param(rfp) and rospy.has_param(lfp):
        lfname = rospy.get_param(lfp)
        rfname = rospy.get_param(rfp)

        leftFootWorld = tfBuffer.lookup_transform('world', lfname, rospy.Time())
        lf_start_orientation = leftFootWorld.transform.rotation
        lf_start_position = leftFootWorld.transform.translation

        rightFootWorld = tfBuffer.lookup_transform('world', rfname, rospy.Time())
        rf_start_orientation = rightFootWorld.transform.rotation
        rf_start_position = rightFootWorld.transform.translation
项目:cu-perception-manipulation-stack    作者:correlllab    | 项目源码 | 文件源码
def wait(self):
        """
        Waits for and verifies trajectory execution result
        """
        #create a timeout for our trajectory execution
        #total time trajectory expected for trajectory execution plus a buffer
        last_time = self._r_goal.trajectory.points[-1].time_from_start.to_sec()
        time_buffer = rospy.get_param(self._param_ns + 'goal_time', 0.0) + 1.5
        timeout = rospy.Duration(self._slow_move_offset +
                                 last_time +
                                 time_buffer)

        l_finish = self._left_client.wait_for_result(timeout)
        r_finish = self._right_client.wait_for_result(timeout)
        l_result = (self._left_client.get_result().error_code == 0)
        r_result = (self._right_client.get_result().error_code == 0)

        #verify result
        if all([l_finish, r_finish, l_result, r_result]):
            return True
        else:
            msg = ("Trajectory action failed or did not finish before "
                   "timeout/interrupt.")
            rospy.logwarn(msg)
            return False
项目:telegram_robot    作者:uts-magic-lab    | 项目源码 | 文件源码
def __init__(self):
        token = rospy.get_param('/telegram/token', None)
        if token is None:
            rospy.logerr("No token found in /telegram/token param server.")
            exit(0)
        else:
            rospy.loginfo("Got telegram bot token from param server.")

        # Set CvBridge
        self.bridge = CvBridge()

        # Create the EventHandler and pass it your bot's token.
        updater = Updater(token)

        # Get the dispatcher to register handlers
        dp = updater.dispatcher

        # on noncommand i.e message - echo the message on Telegram
        dp.add_handler(MessageHandler(Filters.text, self.pub_received))

        # log all errors
        dp.add_error_handler(self.error)

        # Start the Bot
        updater.start_polling()
项目:telegram_robot    作者:uts-magic-lab    | 项目源码 | 文件源码
def __init__(self):
        self.base_pub = rospy.Publisher("/base_controller/command", Twist,
                                        queue_size=1)
        token = rospy.get_param('/telegram/token', None)

        # Create the Updater and pass it your bot's token.
        updater = Updater(token)

        # Add command and error handlers
        updater.dispatcher.add_handler(CommandHandler('start', self.start))
        updater.dispatcher.add_handler(CommandHandler('help', self.help))
        updater.dispatcher.add_handler(MessageHandler(Filters.text, self.echo))
        updater.dispatcher.add_error_handler(self.error)

        # Start the Bot
        updater.start_polling()
项目:APEX    作者:ymollard    | 项目源码 | 文件源码
def __init__(self):
        self.rospack = RosPack()
        with open(join(self.rospack.get_path('apex_playground'), 'config', 'torso.json')) as f:
            self.params = json.load(f)

        self.publish_rate = rospy.Rate(self.params['publish_rate'])
        self.demo = rospy.get_param('demo_mode')

        # Protected resources
        self.in_rest_pose = False
        self.robot_lock = RLock()

        # Used services
        self.torso = TorsoServices(self.params['robot_name'])

        # Proposed services
        self.reset_srv_name = 'torso/reset'
        self.reset_srv = None
项目:APEX    作者:ymollard    | 项目源码 | 文件源码
def __init__(self):
        self.rospack = RosPack()
        with open(join(self.rospack.get_path('apex_playground'), 'config', 'general.json')) as f:
            self.params = json.load(f)

        with open(join(self.rospack.get_path('apex_playground'), 'config', 'torso.json')) as f:
            self.torso_params = json.load(f)

        self.outside_ros = rospy.get_param('/use_sim_time', False)  # True if work manager <-> controller comm must use ZMQ
        id = search(r"(\d+)", rospy.get_namespace())
        self.worker_id = 0 if id is None else int(id.groups()[0])  # TODO string worker ID
        self.work = WorkManager(self.worker_id, self.outside_ros)
        self.torso = Torso()
        self.ergo = Ergo()
        self.learning = Learning()
        self.perception = Perception()
        self.recorder = Recorder()
        self.demonstrate = ""  # Skill (Target space for Produce) or empty string if not running assessment

        # Served services
        self.service_name_demonstrate = "controller/assess"
        rospy.Service(self.service_name_demonstrate, Assess, self.cb_assess)

        rospy.loginfo('Controller fully started!')
项目:APEX    作者:ymollard    | 项目源码 | 文件源码
def __init__(self):
        self.rospack = RosPack()
        self.port = rospy.get_param('ui_port', 80 if os.getuid() == 0 else 5000)
        self.web_app_root = join(self.rospack.get_path('apex_playground'), 'webapp', 'static')
        self.app = Flask(__name__, static_url_path='', static_folder=self.web_app_root)
        self.cors = CORS(self.app, resources={r'/api/*': {'origins': '*'}})
        self.services = UserServices()
        self.window_length = 500
        self.display_point_interval = 1

        self.app.route('/')(self.root)
        self.app.route('/api/interests', methods=['GET'])(self.experiment_status)
        self.app.route('/api/focus', methods=['POST'])(self.update_focus)
        self.app.route('/api/time-travel', methods=['POST'])(self.time_travel)
        self.app.route('/api/reset', methods=['POST'])(self.reset)
        self.app.route('/api/assessment', methods=['POST'])(self.update_assessment)
项目:overhead_mobile_tracker    作者:NU-MSR    | 项目源码 | 文件源码
def __init__(self):
        self.marker_id = rospy.get_param("~marker_id", 12)
        self.frame_id = rospy.get_param("~frame_id", "odom_meas")
        self.camera_frame_id = rospy.get_param("~camera_frame_id", "overhead_cam_frame")
        self.count = rospy.get_param("~count", 50)

        # local vars:
        self.calibrate_flag = False
        self.calibrated = False
        self.calibrate_count = 0
        self.kb = kbhit.KBHit()
        self.trans_arr = np.zeros((self.count, 3))
        self.quat_arr = np.zeros((self.count, 4))
        self.trans_ave = np.zeros(3)
        self.quat_ave = np.zeros(3)

        # now create subscribers, timers, and publishers
        self.br = tf.TransformBroadcaster()
        self.kb_timer = rospy.Timer(rospy.Duration(0.1), self.keycb)
        rospy.on_shutdown(self.kb.set_normal_term)
        self.alvar_sub = rospy.Subscriber("ar_pose_marker", AlvarMarkers, self.alvarcb)
        return
项目:mrobot-indigo    作者:ROSClub1    | 项目源码 | 文件源码
def _init_params(self):
        self.port = rospy.get_param('~port', self.default_port)
        self.robot_type = rospy.get_param('~robot_type', 'create')
        #self.baudrate = rospy.get_param('~baudrate', self.default_baudrate)
        self.update_rate = rospy.get_param('~update_rate', self.default_update_rate)
        self.drive_mode = rospy.get_param('~drive_mode', 'twist')
        self.has_gyro = rospy.get_param('~has_gyro', True)
        self.odom_angular_scale_correction = rospy.get_param('~odom_angular_scale_correction', 1.0)
        self.odom_linear_scale_correction = rospy.get_param('~odom_linear_scale_correction', 1.0)
        self.cmd_vel_timeout = rospy.Duration(rospy.get_param('~cmd_vel_timeout', 0.6))
        self.stop_motors_on_bump = rospy.get_param('~stop_motors_on_bump', True)
        self.min_abs_yaw_vel = rospy.get_param('~min_abs_yaw_vel', None)
        self.max_abs_yaw_vel = rospy.get_param('~max_abs_yaw_vel', None)
        self.publish_tf = rospy.get_param('~publish_tf', False)
        self.odom_frame = rospy.get_param('~odom_frame', 'odom')
        self.base_frame = rospy.get_param('~base_frame', 'base_footprint')
        self.operate_mode = rospy.get_param('~operation_mode', 3)

        rospy.loginfo("serial port: %s"%(self.port))
        rospy.loginfo("update_rate: %s"%(self.update_rate))
        rospy.loginfo("drive mode: %s"%(self.drive_mode))
        rospy.loginfo("has gyro: %s"%(self.has_gyro))
项目:decawave_localization    作者:mit-drl    | 项目源码 | 文件源码
def __init__(self):
        self.frame_id = rospy.get_param("~frame_id", "map")
        cov_x = rospy.get_param("~cov_x", 0.6)
        cov_y = rospy.get_param("~cov_y", 0.6)
        cov_z = rospy.get_param("~cov_z", 0.6)
        self.cov = self.cov_matrix(cov_x, cov_y, cov_z)
        self.ps_pub = rospy.Publisher(
            POSE_TOPIC, PoseStamped, queue_size=1)
        self.ps_cov_pub = rospy.Publisher(
            POSE_COV_TOPIC, PoseWithCovarianceStamped, queue_size=1)
        self.ps_pub_3d = rospy.Publisher(
            POSE_TOPIC_3D, PoseStamped, queue_size=1)
        self.ps_cov_pub_3d = rospy.Publisher(
            POSE_COV_TOPIC_3D, PoseWithCovarianceStamped, queue_size=1)
        self.last = None
        self.listener = tf.TransformListener()
        self.tag_range_topics = rospy.get_param("~tag_range_topics")
        self.subs = list()
        self.ranges = dict()
        self.tag_pos = dict()
        self.altitude = 0.0
        self.last_3d = None
        for topic in self.tag_range_topics:
            self.subs.append(rospy.Subscriber(topic, Range, self.range_cb))
项目:cozmo_driver    作者:OTL    | 项目源码 | 文件源码
def __init__(self):
        # setup
        CozmoTeleop.settings = termios.tcgetattr(sys.stdin)
        atexit.register(self.reset_terminal)

        # vars
        self.head_angle = STD_HEAD_ANGLE
        self.lift_height = STD_LIFT_HEIGHT

        # params
        self.lin_vel = rospy.get_param('~lin_vel', 0.2)
        self.ang_vel = rospy.get_param('~ang_vel', 1.5757)

        # pubs
        self._head_pub = rospy.Publisher('head_angle', Float64, queue_size=1)
        self._lift_pub = rospy.Publisher('lift_height', Float64, queue_size=1)
        self._cmd_vel_pub = rospy.Publisher('cmd_vel', Twist, queue_size=1)
项目:xm_strategy    作者:xm-project    | 项目源码 | 文件源码
def __init__(self,dis=0.0):
        State.__init__(self, outcomes=['succeeded','preempted','aborted'],input_keys=['target'])
        rospy.on_shutdown(self.shutdown)
        self.test_distance = target.y-dis
        self.rate = 200
        self.r = rospy.Rate(self.rate)
        self.speed = rospy.get_param('~speed',0.08)
        self.tolerance = rospy.get_param('~tolerance', 0.01)
        self.cmd_vel = rospy.Publisher('/mobile_base_controller/smooth_cmd_vel',Twist,queue_size=5)
        self.base_frame = rospy.get_param('~base_frame', '/base_link')
        self.odom_frame = rospy.get_param('~odom_frame', '/odom')
        self.tf_listener = tf.TransformListener()
        rospy.sleep(1)
        self.tf_listener.waitForTransform(self.odom_frame,self.base_frame,rospy.Time(),rospy.Duration(60.0))
        #define a bianliang
        self.flag = rospy.get_param('~flag', True)
        rospy.loginfo(self.test_distance)
        #tf get position

        #publish cmd_vel
项目:xm_strategy    作者:xm-project    | 项目源码 | 文件源码
def __init__(self,angle=0):
        State.__init__(self, outcomes=['succeeded','aborted','preempted'],input_keys=["angle_in"])
        self.speed = rospy.get_param('~speed', 0.10)
        self.secretindigal = 1.0
        self.tolerance = rospy.get_param('tolerance', math.radians(5))
        self.start = True
        self.cmd_vel = rospy.Publisher('mobile_base_controller/smooth_cmd_vel', Twist, queue_size=5)
        self.base_frame = rospy.get_param('~base_frame', '/base_link')
        self.odom_frame = rospy.get_param('~odom_frame', '/odom')
        rospy.on_shutdown(self.shutdown)
        self.rate = 30
        self.start_test = True
        self.r = rospy.Rate(self.rate)

        self.tf_listener = tf.TransformListener()
        rospy.sleep(0.5)
        self.tf_listener.waitForTransform(self.odom_frame, self.base_frame, rospy.Time(), rospy.Duration(60))
项目:xm_strategy    作者:xm-project    | 项目源码 | 文件源码
def __init__(self, angle=0):
        State.__init__(self, outcomes=['succeeded', 'aborted', 'preempted'], input_keys=["angle_in"])
        self.speed = rospy.get_param('~speed', 0.03)
        self.secretindigal = 1.0
        self.tolerance = rospy.get_param('tolerance', math.radians(5))
        self.start = True
        self.cmd_vel = rospy.Publisher('mobile_base_controller/smooth_cmd_vel', Twist, queue_size=5)
        self.base_frame = rospy.get_param('~base_frame', '/base_link')
        self.odom_frame = rospy.get_param('~odom_frame', '/odom')
        rospy.on_shutdown(self.shutdown)
        self.rate = 30
        self.start_test = True
        self.r = rospy.Rate(self.rate)
        self.angle=angle
        self.tf_listener = tf.TransformListener()
        rospy.sleep(0.5)
        self.tf_listener.waitForTransform(self.odom_frame, self.base_frame, rospy.Time(), rospy.Duration(60))
项目:xm_strategy    作者:xm-project    | 项目源码 | 文件源码
def __init__(self, angle=0):
        State.__init__(self, outcomes=['succeeded', 'aborted', 'preempted'])
        self.speed = rospy.get_param('~speed', 0.03)
        self.secretindigal = 1.0
        self.tolerance = rospy.get_param('tolerance', math.radians(5))
        self.start = True
        self.cmd_vel = rospy.Publisher('mobile_base_controller/smooth_cmd_vel', Twist, queue_size=5)
        self.base_frame = rospy.get_param('~base_frame', '/base_link')
        self.odom_frame = rospy.get_param('~odom_frame', '/odom')
        rospy.on_shutdown(self.shutdown)
        self.rate = 30
        self.start_test = True
        self.r = rospy.Rate(self.rate)
        self.angle=angle
        self.tf_listener = tf.TransformListener()
        rospy.sleep(0.5)
        self.tf_listener.waitForTransform(self.odom_frame, self.base_frame, rospy.Time(), rospy.Duration(60))
项目:xm_strategy    作者:xm-project    | 项目源码 | 文件源码
def __init__(self,dis=0):
        State.__init__(self, outcomes=['succeeded','preempted','aborted'])
        rospy.on_shutdown(self.shutdown)
        self.test_distance = dis
        self.rate = 100
        self.r = rospy.Rate(self.rate)
        self.speed = rospy.get_param('~speed',0.08)
        self.tolerance = rospy.get_param('~tolerance', 0.01)
        self.cmd_vel = rospy.Publisher('/mobile_base_controller/smooth_cmd_vel',Twist,queue_size=5)
        self.base_frame = rospy.get_param('~base_frame', '/base_link')
        self.odom_frame = rospy.get_param('~odom_frame', '/odom')
        self.tf_listener = tf.TransformListener()
        rospy.sleep(1)
        self.tf_listener.waitForTransform(self.odom_frame,self.base_frame,rospy.Time(),rospy.Duration(60.0))
        self.flag = rospy.get_param('~flag', True)
        #tf get position
        self.position = Point()
        self.position = self.get_position()
        self.y_start = self.position.y
        self.x_start = self.position.x
        #publish cmd_vel
项目:xm_strategy    作者:xm-project    | 项目源码 | 文件源码
def __init__(self,dis=0.0):
        State.__init__(self, outcomes=['succeeded','preempted','aborted'])
        rospy.on_shutdown(self.shutdown)
        self.test_distance = target.y-0.0
        self.rate = 100
        self.r = rospy.Rate(self.rate)
        self.speed = rospy.get_param('~speed',0.08)
        self.tolerance = rospy.get_param('~tolerance', 0.01)
        self.cmd_vel = rospy.Publisher('/mobile_base_controller/smooth_cmd_vel',Twist,queue_size=5)
        self.base_frame = rospy.get_param('~base_frame', '/base_link')
        self.odom_frame = rospy.get_param('~odom_frame', '/odom')
        self.tf_listener = tf.TransformListener()
        rospy.sleep(1)
        self.tf_listener.waitForTransform(self.odom_frame,self.base_frame,rospy.Time(),rospy.Duration(60.0))
        #define a bianliang
        self.flag = rospy.get_param('~flag', True)
        rospy.loginfo(self.test_distance)
        #tf get position

        #publish cmd_vel
项目:xm_strategy    作者:xm-project    | 项目源码 | 文件源码
def __init__(self,angle=0):
        State.__init__(self, outcomes=['succeeded','aborted','preempted'])
        self.speed = rospy.get_param('~speed', 0.20)
        self.secretindigal = 1.0
        self.tolerance = rospy.get_param('tolerance', math.radians(5))
        self.start = True
        self.cmd_vel = rospy.Publisher('mobile_base_controller/smooth_cmd_vel', Twist, queue_size=5)
        self.base_frame = rospy.get_param('~base_frame', '/base_link')
        self.odom_frame = rospy.get_param('~odom_frame', '/odom')
        rospy.on_shutdown(self.shutdown)
        self.rate = 30
        self.start_test = True
        self.r = rospy.Rate(self.rate)
        self.angle = angle
        self.tf_listener = tf.TransformListener()
        rospy.sleep(0.5)
        self.tf_listener.waitForTransform(self.odom_frame, self.base_frame, rospy.Time(), rospy.Duration(60))
项目:xm_strategy    作者:xm-project    | 项目源码 | 文件源码
def __init__(self,angle=0):
        State.__init__(self, outcomes=['succeeded','aborted','preempted'])
        self.speed = rospy.get_param('~speed', 0.20)
        self.secretindigal = 1.0
        self.tolerance = rospy.get_param('tolerance', math.radians(5))
        self.start = True
        self.cmd_vel = rospy.Publisher('mobile_base_controller/smooth_cmd_vel', Twist, queue_size=5)
        self.base_frame = rospy.get_param('~base_frame', '/base_link')
        self.odom_frame = rospy.get_param('~odom_frame', '/odom')
        rospy.on_shutdown(self.shutdown)
        self.rate = 30
        self.start_test = True
        self.r = rospy.Rate(self.rate)
        self.angle = angle
        self.tf_listener = tf.TransformListener()
        rospy.sleep(0.5)
        self.tf_listener.waitForTransform(self.odom_frame, self.base_frame, rospy.Time(), rospy.Duration(60))