我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用rospy.get_param()。
def __init__(self): self.run_recognition = rospy.get_param('/rgb_object_detection/run_recognition') self.model_filename = rospy.get_param('/rgb_object_detection/model_file') self.weights_filename = rospy.get_param('/rgb_object_detection/weights_file') self.categories_filename = rospy.get_param('/rgb_object_detection/category_file') self.verbose = rospy.get_param('/rgb_object_detection/verbose', False) self.bridge = CvBridge() self.image_sub = rospy.Subscriber('/camera/rgb/image_color', Image, self.img_cb) self.patches_sub = rospy.Subscriber('/candidate_regions_depth', PolygonStamped, self.patches_cb) self.detection_pub = rospy.Publisher('/detections', Detection, queue_size=1) # you can read this value off of your sensor from the '/camera/depth_registered/camera_info' topic self.P = np.array([[525.0, 0.0, 319.5, 0.0], [0.0, 525.0, 239.5, 0.0], [0.0, 0.0, 1.0, 0.0]]) if self.run_recognition: self.cnn = CNN('', self.model_filename, self.weights_filename, self.categories_filename, '', 0, 0, self.verbose) self.cnn.load_model()
def __init__(self): rospy.init_node('multiplexer_node', anonymous=False) rospy.loginfo('\033[93m[%s]\033[0m wait for bringup social_events_memory...'%rospy.get_name()) rospy.wait_for_service('social_events_memory/read_data') self.rd_event_mem = rospy.ServiceProxy('social_events_memory/read_data', ReadData) rospy.Subscriber('forwarding_event', ForwardingEvent, self.handle_social_events) self.pub_rasing_event = rospy.Publisher('raising_events', RaisingEvents, queue_size=10) self.events_queue = Queue.Queue() self.recognized_words_queue = Queue.Queue() event_period = rospy.get_param('~event_period', 0.5) rospy.Timer(rospy.Duration(event_period), self.handle_trigger_events) rospy.loginfo('\033[93m[%s]\033[0m initialized...'%rospy.get_name()) rospy.spin()
def __init__(self): # first let's load all parameters: self.frame_id = rospy.get_param("~frame_id", "odom_meas") self.camera_frame_id = rospy.get_param("~camera_frame_id", "overhead_cam_frame") self.base_frame_id = rospy.get_param("~base_frame_id", "base_meas") self.x0 = rospy.get_param("~x0", 0.0) self.y0 = rospy.get_param("~y0", 0.0) self.th0 = rospy.get_param("~th0", 0.0) self.pubstate = rospy.get_param("~pubstate", True) self.marker_id = rospy.get_param("~marker_id", 12) # setup other required vars: self.odom_offset = odom_conversions.numpy_to_odom(np.array([self.x0, self.y0, self.th0]), self.frame_id) self.path_list = deque([], maxlen=PATH_LEN) # now let's create publishers, listeners, and subscribers self.br = tf.TransformBroadcaster() self.listener = tf.TransformListener() self.meas_pub = rospy.Publisher("meas_pose", Odometry, queue_size=5) self.path_pub = rospy.Publisher("meas_path", Path, queue_size=1) self.alvar_sub = rospy.Subscriber("ar_pose_marker", AlvarMarkers, self.alvarcb) self.publish_serv = rospy.Service("publish_bool", SetBool, self.pub_bool_srv_cb) self.offset_serv = rospy.Service("set_offset", SetOdomOffset, self.offset_srv_cb) return
def __init__(self): """ Create HeadLiftJoy object. """ # params self._head_axes = rospy.get_param('~head_axes', 3) self._lift_axes = rospy.get_param('~lift_axes', 3) self._head_button = rospy.get_param('~head_button', 4) self._lift_button = rospy.get_param('~lift_button', 5) # pubs self._head_pub = rospy.Publisher('head_angle', Float64, queue_size=1) self._lift_pub = rospy.Publisher('lift_height', Float64, queue_size=1) # subs self._joy_sub = rospy.Subscriber('joy', Joy, self._joy_cb, queue_size=1)
def _read_unit_offsets(self): if not rospy.has_param('~num_of_units'): rospy.logwarn("No unit offset parameters found!") num_of_units = rospy.get_param('~num_of_units', 0) self._unit_offsets = np.zeros((num_of_units, 3)) self._unit_coefficients = np.zeros((num_of_units, 2)) for i in xrange(num_of_units): unit_params = rospy.get_param('~unit_{}'.format(i)) x = unit_params['x'] y = unit_params['y'] z = unit_params['z'] self._unit_offsets[i, :] = [x, y, z] p0 = unit_params['p0'] p1 = unit_params['p1'] self._unit_coefficients[i, :] = [p0, p1] rospy.loginfo("Unit offsets: {}".format(self._unit_offsets)) rospy.loginfo("Unit coefficients: {}".format(self._unit_coefficients))
def get_topic_service_names(self): topic_names = {} service_names = {} # Topics. topic_names['msf_odometry'] = rospy.get_param('~msf_odometry_topic', 'msf_core/odometry') # Services. service_names['init_msf_height'] = rospy.get_param('~init_msf_height_srv', 'pose_sensor_rovio/pose_sensor/initialize_msf_height') service_names['init_msf_scale'] = rospy.get_param('~init_msf_scale_srv', 'pose_sensor_rovio/pose_sensor/initialize_msf_scale') # Check if we should add a leading namespace name_space = rospy.get_param('~namespace', '') for key, value in topic_names.iteritems(): topic_names[key] = helpers.get_full_namespace(name_space, value) for key, value in service_names.iteritems(): service_names[key] = helpers.get_full_namespace(name_space, value) return topic_names, service_names
def get_topic_names(self): # RTK info topics topic_names = {} topic_names['piksi_receiver_state'] = rospy.get_param('~piksi_receiver_state_topic', 'piksi/debug/receiver_state') topic_names['piksi_uart_state'] = rospy.get_param('~piksi_uart_state_topic', 'piksi/debug/uart_state') topic_names['piksi_baseline_ned'] = rospy.get_param('~piksi_baseline_ned_topic', 'piksi/baseline_ned') topic_names['piksi_wifi_corrections'] = rospy.get_param('~piksi_num_wifi_corrections_topic', 'piksi/debug/wifi_corrections') topic_names['piksi_navsatfix_rtk_fix'] = rospy.get_param('~piksi_navsatfix_rtk_fix', 'piksi/navsatfix_rtk_fix') topic_names['piksi_age_of_corrections'] = rospy.get_param('~piksi_age_of_corrections_topic', 'piksi/age_of_corrections') # Check if we should add a leading namespace name_space = rospy.get_param('~namespace', '') for key, value in topic_names.iteritems(): topic_names[key] = helpers.get_full_namespace(name_space, value) return topic_names
def __init__(self): if rospy.has_param('~orientation_offset'): # Orientation offset as quaterion q = [x,y,z,w]. self.orientation_offset = rospy.get_param('~orientation_offset') else: yaw_offset_deg = rospy.get_param('~yaw_offset_deg', 0.0) self.orientation_offset = tf.quaternion_from_euler(0.0, 0.0, math.radians(yaw_offset_deg)) rospy.Subscriber(rospy.get_name() + "/imu_in", Imu, self.imu_callback) self.pub_imu_out = rospy.Publisher(rospy.get_name() + '/imu_out', Imu, queue_size=10) rospy.spin()
def init(self): pygame.init() clock = pygame.time.Clock() screen = pygame.display.set_mode((250, 250)) rospy.init_node('highway_teleop') self.rate = rospy.Rate(rospy.get_param('~hz', 10)) self.acc = rospy.get_param('~acc', 5) self.yaw = rospy.get_param('~yaw', 0) self.robot_pub = rospy.Publisher('robot_0/cmd_vel', Twist, queue_size=1) print "Usage: \n \ up arrow: accelerate \n \ down arrow: decelerate \n \ left arrow: turn left \n \ right arrow: turn right"
def init(self): pygame.init() clock = pygame.time.Clock() screen = pygame.display.set_mode((250, 250)) rospy.init_node('teleop') self.rate = rospy.Rate(rospy.get_param('~hz', 20)) self.acc = rospy.get_param('~acc', 1) self.yaw = rospy.get_param('~yaw', 0.25) self.robot_pub = rospy.Publisher('control_command', controlCommand, queue_size=1) self.path_record_pub = rospy.Publisher('record_state', \ RecordState, queue_size=1) self.highway_game_start_pub = rospy.Publisher('highway_game_start', RecordState, queue_size=1) print "Usage: \n \ up arrow: accelerate \n \ down arrow: decelerate \n \ left arrow: turn left \n \ right arrow: turn right"
def from_parameters(self): """ Stores this calibration as ROS parameters in the namespace of the current node. :rtype: None """ calib_dict = {} root_params = ['eye_on_hand', 'tracking_base_frame'] for rp in root_params: calib_dict[rp] = rospy.get_param(rp) if calib_dict['eye_on_hand']: calib_dict['robot_effector_frame'] = rospy.get_param('robot_effector_frame') else: calib_dict['robot_base_frame'] = rospy.get_param('robot_base_frame') transf_params = 'x', 'y', 'z', 'qx', 'qy', 'qz', 'qw' calib_dict['transformation'] = {} for tp in transf_params: calib_dict['transformation'][tp] = rospy.get_param('transformation/'+tp) self.from_dict(calib_dict)
def __init__(self): self.eye_on_hand = rospy.get_param('eye_on_hand', False) # tf names self.robot_base_frame = rospy.get_param('robot_base_frame', 'base_link') self.robot_effector_frame = rospy.get_param('robot_effector_frame', 'tool0') self.tracking_base_frame = rospy.get_param('tracking_base_frame', 'optical_origin') self.tracking_marker_frame = rospy.get_param('tracking_marker_frame', 'optical_target') rospy.wait_for_service(hec.GET_SAMPLE_LIST_TOPIC) self.get_sample_proxy = rospy.ServiceProxy(hec.GET_SAMPLE_LIST_TOPIC, hec.srv.TakeSample) rospy.wait_for_service(hec.TAKE_SAMPLE_TOPIC) self.take_sample_proxy = rospy.ServiceProxy(hec.TAKE_SAMPLE_TOPIC, hec.srv.TakeSample) rospy.wait_for_service(hec.REMOVE_SAMPLE_TOPIC) self.remove_sample_proxy = rospy.ServiceProxy(hec.REMOVE_SAMPLE_TOPIC, hec.srv.RemoveSample) rospy.wait_for_service(hec.COMPUTE_CALIBRATION_TOPIC) self.compute_calibration_proxy = rospy.ServiceProxy(hec.COMPUTE_CALIBRATION_TOPIC, hec.srv.ComputeCalibration) rospy.wait_for_service(hec.SAVE_CALIBRATION_TOPIC) self.save_calibration_proxy = rospy.ServiceProxy(hec.SAVE_CALIBRATION_TOPIC, std_srvs.srv.Empty)
def wait(self): """ Waits for and verifies trajectory execution result """ #create a timeout for our trajectory execution #total time trajectory expected for trajectory execution plus a buffer last_time = self.goal.trajectory.points[-1].time_from_start.to_sec() time_buffer = rospy.get_param(self._param_ns + 'goal_time', 0.0) + 1.5 timeout = rospy.Duration(self._slow_move_offset + last_time + time_buffer) finish = self._limb_client.wait_for_result(timeout) result = (self._limb_client.get_result().error_code == 0) #verify result if all([finish, result]): return True else: msg = ("Trajectory action failed or did not finish before " "timeout/interrupt.") rospy.logwarn(msg) return False
def get_camera_details(self): """ Return the details of the cameras. @rtype: list [str] @return: ordered list of camera details """ camera_dict = dict() camera_config_param = "/robot_config/camera_config" try: camera_dict = rospy.get_param(camera_config_param) except KeyError: rospy.logerr("RobotParam:get_camera_details cannot detect any " "cameras under the parameter {0}".format(camera_config_param)) except (socket.error, socket.gaierror): self._log_networking_error() return camera_dict
def get_robot_assemblies(self): """ Return the names of the robot's assemblies from ROS parameter. @rtype: list [str] @return: ordered list of assembly names (e.g. right, left, torso, head). on networked robot """ assemblies = list() try: assemblies = rospy.get_param("/robot_config/assembly_names") except KeyError: rospy.logerr("RobotParam:get_robot_assemblies cannot detect assembly names" " under param /robot_config/assembly_names") except (socket.error, socket.gaierror): self._log_networking_error() return assemblies
def get_joint_names(self, limb_name): """ Return the names of the joints for the specified limb from ROS parameter. @type limb_name: str @param limb_name: name of the limb for which to retrieve joint names @rtype: list [str] @return: ordered list of joint names from proximal to distal (i.e. shoulder to wrist). joint names for limb """ joint_names = list() try: joint_names = rospy.get_param( "robot_config/{0}_config/joint_names".format(limb_name)) except KeyError: rospy.logerr(("RobotParam:get_joint_names cannot detect joint_names for" " arm \"{0}\"").format(limb_name)) except (socket.error, socket.gaierror): self._log_networking_error() return joint_names
def move_to_neutral(self, timeout=15.0, speed=0.3): """ Command the Limb joints to a predefined set of "neutral" joint angles. From rosparam named_poses/<limb>/poses/neutral. @type timeout: float @param timeout: seconds to wait for move to finish [15] @type speed: float @param speed: ratio of maximum joint speed for execution default= 0.3; range= [0.0-1.0] """ try: neutral_pose = rospy.get_param("named_poses/{0}/poses/neutral".format(self.name)) except KeyError: rospy.logerr(("Get neutral pose failed, arm: \"{0}\".").format(self.name)) return angles = dict(zip(self.joint_names(), neutral_pose)) self.set_joint_position_speed(speed) return self.move_to_joint_positions(angles, timeout)
def __init__(self): # initialize a socket self.host = rospy.get_param('~server_address', '127.0.0.1') self.port = rospy.get_param('~server_port', 20011) self.friend_color = rospy.get_param('/friend_color', 'yellow') rospy.loginfo('server address is set to [' + self.host + ']') rospy.loginfo('server port is set to [' + str(self.port) + ']') rospy.loginfo('team color is set to [' + self.friend_color + ']') self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # make subscribers self.subscribers = [] for i in xrange(12): topic = "/robot_" + str(i) + "/robot_commands" self.subscribers.append( rospy.Subscriber( topic, robot_commands, self.sendCommands, callback_args=i))
def __init__(self): rospy.init_node('fake_renderer', anonymous=True) try: topic_name = rospy.get_param('~action_name') except KeyError as e: print('[ERROR] Needed parameter for (topic_name)...') quit() if 'render_gesture' in rospy.get_name(): self.GetInstalledGesturesService = rospy.Service( "get_installed_gestures", GetInstalledGestures, self.handle_get_installed_gestures ) self.motion_list = { 'neutral': ['neutral_motion1'], 'encourge': ['encourge_motion1'], 'attention': ['attention_motion1'], 'consolation': ['consolation_motion1'], 'greeting': ['greeting_motion1'], 'waiting': ['waiting_motion1'], 'advice': ['advice_motion1'], 'praise': ['praise_motion1'], 'command': ['command_motion1'], } self.server = actionlib.SimpleActionServer( topic_name, RenderItemAction, self.execute_callback, False) self.server.start() rospy.loginfo('[%s] initialized...' % rospy.get_name()) rospy.spin()
def __init__(self, image_path=None): height = int(rospy.get_param("~grid_height", 800)) width = int(rospy.get_param("~grid_width", 800)) resolution = rospy.get_param("~grid_resolution", .25) ogrid_topic = rospy.get_param("/lqrrt_node/ogrid_topic", "/ogrid") self.grid_drawer = DrawGrid(height, width, image_path) self.ogrid_pub = rospy.Publisher(ogrid_topic, OccupancyGrid, queue_size=1) m = MapMetaData() m.resolution = resolution m.width = width m.height = height pos = np.array([-width * resolution / 2, -height * resolution / 2, 0]) quat = np.array([0, 0, 0, 1]) m.origin = Pose() m.origin.position.x, m.origin.position.y = pos[:2] self.map_meta_data = m rospy.Timer(rospy.Duration(1), self.pub_grid)
def save_recipe_dp(self, variable): """ Save the recipe start/end to the env. data pt. DB, so we can restart the recipe if necessary. """ doc = EnvironmentalDataPoint({ "environment": self.environment, "variable": variable, "is_desired": True, "value": rospy.get_param(params.CURRENT_RECIPE), "timestamp": rospy.get_time() }) doc_id = gen_doc_id(rospy.get_time()) self.env_data_db[doc_id] = doc #------------------------------------------------------------------------------ # Our ROS node main entry point. Starts up the node and then waits forever.
def test_valid_variables(self): ENVIRONMENTAL_VARIABLES = frozenset( VariableInfo.from_dict(d) for d in rospy.get_param("/var_types/environment_variables").itervalues()) RECIPE_VARIABLES = frozenset( VariableInfo.from_dict(d) for d in rospy.get_param("/var_types/recipe_variables").itervalues()) VALID_VARIABLES = ENVIRONMENTAL_VARIABLES.union(RECIPE_VARIABLES) print(VALID_VARIABLES) assert len(VALID_VARIABLES) == 19 # This builds a dictionary of publisher instances using a # "dictionary comprehension" (syntactic sugar for building dictionaries). # The constant has to be declared here because get_message_class # needs to be called after the node is initialized. PUBLISHERS = { variable.name: rospy.Publisher( "{}/desired".format(variable.name), get_message_class(variable.type), queue_size=10) for variable in VALID_VARIABLES }
def test_method_generate_digest(self): key = 'hello_world' entry = rospy.get_param(self.server.speech_key_to_param(key)) parsed = self.server.parse_template(entry['template'], entry['params']) digest_a = self.server.generate_digest(key, parsed) self.assertIsNotNone(digest_a) self.assertIsInstance(digest_a, str) parsed = self.server.parse_template( entry['template'], _.assign(entry['params'], {'name': 'Needy'})) digest_b = self.server.generate_digest(key, parsed) self.assertIsNotNone(digest_b, 'is none') self.assertIsInstance(digest_b, str, 'is not a string') self.assertNotEqual(digest_a, digest_b, 'equal digests with diff params')
def test_method_generate_cache_payload(self): digest = self.server.generate_digest( 'hello_world', 'Hello, world! My name is a needy robot.') expected = { 'key': 'hello_world', 'hash': digest, 'file': os.path.join( self.server._cache_dir, '{}.{}'.format(digest, self.server.voice.codec)), 'effects': ' '.join(self.server.effects), 'voice': { 'name': self.server.voice.voice_name, 'speech_rate': self.server.voice.speech_rate, 'codec': self.server.voice.codec }, 'template': rospy.get_param( self.server.speech_key_to_param('hello_world'))['template'], 'params': { 'name': 'a needy robot' } } payload = self.server.generate_cache_payload(self.action) self.assertIsNotNone(payload, 'no payload created') self.assertEqual(payload, expected, 'generated payload is incorrect')
def warm_cache(self): """ Collects all dialog in your manifest and stores them in the cache directory as well as creating entries in the cache manifest. """ rospy.loginfo('Warming speech cache...') m = rospy.get_param('/needybot/speech/dialog') for k, v in m.iteritems(): goal = SpeechGoal(key=k) gen = self.generate_cache_payload(goal) payload = self.fetch_from_cache(k, gen.get('hash')) if payload is None: payload = gen if self.validate_payload(payload): self.add_to_cache(payload) self.generate_raw_speech(payload) else: raise TypeError('Invalid payload send to speech server') rospy.loginfo('Speech cache is ready.') self.warmed_pub.publish()
def __init__(self): """Constructor for the class initialize topic subscription and instance variables """ self.r = rospy.Rate(5) self.publisher = rospy.Publisher('/cmd_vel', Twist, queue_size=10) rospy.Subscriber('/scan', LaserScan, self.process_scan) # user chooses which parking mode to be performed self.is_parallel = rospy.get_param('~parallel', False) # Instance Variables self.timestamp1 = None self.timestamp2 = None self.dist2Neato = None self.dist2Wall = None self.widthOfSpot = None self.twist = None self.radius = None # Adjusment to be made before moving along the arc self.adjustment = 0 self.isAligned = False
def __init__(self, name, blackboard): """ Tree task responsible for spawning new tasks. Once a task has a status of completed of failure this branch of the tree is executed. The timer waits for 5 seconds before sending the final complete signal to the running task and spawning a new one. Args: name(str): Name of this behavior tree task for identification. blackboard(Blackboard): Behavior tree blackboard that stores global vars. """ super(Spawner, self).__init__(name, blackboard) self.timer = None self.spawning = False self.spawn_delay = rospy.get_param("~spawn-delay", 0.0)
def __init__(self, frame, transform_module=None): rospy.loginfo("Zarj eyes initialization begin") self.bridge = CvBridge() if transform_module is not None: self.tf = transform_module else: self.tf = tfzarj.TfZarj(rospy.get_param('/ihmc_ros/robot_name')) self.tf.init_transforms() self.detection_requests = [] self.frame_id = frame self.image_sub_left = None self.image_sub_cloud = None rospy.loginfo("Zarj eyes initialization finished")
def lookupFeet(name, tfBuffer): global lf_start_position global lf_start_orientation global rf_start_position global rf_start_orientation lfp = "/ihmc_ros/{0}/left_foot_frame_name".format(name) rfp = "/ihmc_ros/{0}/right_foot_frame_name".format(name) if rospy.has_param(rfp) and rospy.has_param(lfp): lfname = rospy.get_param(lfp) rfname = rospy.get_param(rfp) leftFootWorld = tfBuffer.lookup_transform('world', lfname, rospy.Time()) lf_start_orientation = leftFootWorld.transform.rotation lf_start_position = leftFootWorld.transform.translation rightFootWorld = tfBuffer.lookup_transform('world', rfname, rospy.Time()) rf_start_orientation = rightFootWorld.transform.rotation rf_start_position = rightFootWorld.transform.translation
def wait(self): """ Waits for and verifies trajectory execution result """ #create a timeout for our trajectory execution #total time trajectory expected for trajectory execution plus a buffer last_time = self._r_goal.trajectory.points[-1].time_from_start.to_sec() time_buffer = rospy.get_param(self._param_ns + 'goal_time', 0.0) + 1.5 timeout = rospy.Duration(self._slow_move_offset + last_time + time_buffer) l_finish = self._left_client.wait_for_result(timeout) r_finish = self._right_client.wait_for_result(timeout) l_result = (self._left_client.get_result().error_code == 0) r_result = (self._right_client.get_result().error_code == 0) #verify result if all([l_finish, r_finish, l_result, r_result]): return True else: msg = ("Trajectory action failed or did not finish before " "timeout/interrupt.") rospy.logwarn(msg) return False
def __init__(self): token = rospy.get_param('/telegram/token', None) if token is None: rospy.logerr("No token found in /telegram/token param server.") exit(0) else: rospy.loginfo("Got telegram bot token from param server.") # Set CvBridge self.bridge = CvBridge() # Create the EventHandler and pass it your bot's token. updater = Updater(token) # Get the dispatcher to register handlers dp = updater.dispatcher # on noncommand i.e message - echo the message on Telegram dp.add_handler(MessageHandler(Filters.text, self.pub_received)) # log all errors dp.add_error_handler(self.error) # Start the Bot updater.start_polling()
def __init__(self): self.base_pub = rospy.Publisher("/base_controller/command", Twist, queue_size=1) token = rospy.get_param('/telegram/token', None) # Create the Updater and pass it your bot's token. updater = Updater(token) # Add command and error handlers updater.dispatcher.add_handler(CommandHandler('start', self.start)) updater.dispatcher.add_handler(CommandHandler('help', self.help)) updater.dispatcher.add_handler(MessageHandler(Filters.text, self.echo)) updater.dispatcher.add_error_handler(self.error) # Start the Bot updater.start_polling()
def __init__(self): self.rospack = RosPack() with open(join(self.rospack.get_path('apex_playground'), 'config', 'torso.json')) as f: self.params = json.load(f) self.publish_rate = rospy.Rate(self.params['publish_rate']) self.demo = rospy.get_param('demo_mode') # Protected resources self.in_rest_pose = False self.robot_lock = RLock() # Used services self.torso = TorsoServices(self.params['robot_name']) # Proposed services self.reset_srv_name = 'torso/reset' self.reset_srv = None
def __init__(self): self.rospack = RosPack() with open(join(self.rospack.get_path('apex_playground'), 'config', 'general.json')) as f: self.params = json.load(f) with open(join(self.rospack.get_path('apex_playground'), 'config', 'torso.json')) as f: self.torso_params = json.load(f) self.outside_ros = rospy.get_param('/use_sim_time', False) # True if work manager <-> controller comm must use ZMQ id = search(r"(\d+)", rospy.get_namespace()) self.worker_id = 0 if id is None else int(id.groups()[0]) # TODO string worker ID self.work = WorkManager(self.worker_id, self.outside_ros) self.torso = Torso() self.ergo = Ergo() self.learning = Learning() self.perception = Perception() self.recorder = Recorder() self.demonstrate = "" # Skill (Target space for Produce) or empty string if not running assessment # Served services self.service_name_demonstrate = "controller/assess" rospy.Service(self.service_name_demonstrate, Assess, self.cb_assess) rospy.loginfo('Controller fully started!')
def __init__(self): self.rospack = RosPack() self.port = rospy.get_param('ui_port', 80 if os.getuid() == 0 else 5000) self.web_app_root = join(self.rospack.get_path('apex_playground'), 'webapp', 'static') self.app = Flask(__name__, static_url_path='', static_folder=self.web_app_root) self.cors = CORS(self.app, resources={r'/api/*': {'origins': '*'}}) self.services = UserServices() self.window_length = 500 self.display_point_interval = 1 self.app.route('/')(self.root) self.app.route('/api/interests', methods=['GET'])(self.experiment_status) self.app.route('/api/focus', methods=['POST'])(self.update_focus) self.app.route('/api/time-travel', methods=['POST'])(self.time_travel) self.app.route('/api/reset', methods=['POST'])(self.reset) self.app.route('/api/assessment', methods=['POST'])(self.update_assessment)
def __init__(self): self.marker_id = rospy.get_param("~marker_id", 12) self.frame_id = rospy.get_param("~frame_id", "odom_meas") self.camera_frame_id = rospy.get_param("~camera_frame_id", "overhead_cam_frame") self.count = rospy.get_param("~count", 50) # local vars: self.calibrate_flag = False self.calibrated = False self.calibrate_count = 0 self.kb = kbhit.KBHit() self.trans_arr = np.zeros((self.count, 3)) self.quat_arr = np.zeros((self.count, 4)) self.trans_ave = np.zeros(3) self.quat_ave = np.zeros(3) # now create subscribers, timers, and publishers self.br = tf.TransformBroadcaster() self.kb_timer = rospy.Timer(rospy.Duration(0.1), self.keycb) rospy.on_shutdown(self.kb.set_normal_term) self.alvar_sub = rospy.Subscriber("ar_pose_marker", AlvarMarkers, self.alvarcb) return
def _init_params(self): self.port = rospy.get_param('~port', self.default_port) self.robot_type = rospy.get_param('~robot_type', 'create') #self.baudrate = rospy.get_param('~baudrate', self.default_baudrate) self.update_rate = rospy.get_param('~update_rate', self.default_update_rate) self.drive_mode = rospy.get_param('~drive_mode', 'twist') self.has_gyro = rospy.get_param('~has_gyro', True) self.odom_angular_scale_correction = rospy.get_param('~odom_angular_scale_correction', 1.0) self.odom_linear_scale_correction = rospy.get_param('~odom_linear_scale_correction', 1.0) self.cmd_vel_timeout = rospy.Duration(rospy.get_param('~cmd_vel_timeout', 0.6)) self.stop_motors_on_bump = rospy.get_param('~stop_motors_on_bump', True) self.min_abs_yaw_vel = rospy.get_param('~min_abs_yaw_vel', None) self.max_abs_yaw_vel = rospy.get_param('~max_abs_yaw_vel', None) self.publish_tf = rospy.get_param('~publish_tf', False) self.odom_frame = rospy.get_param('~odom_frame', 'odom') self.base_frame = rospy.get_param('~base_frame', 'base_footprint') self.operate_mode = rospy.get_param('~operation_mode', 3) rospy.loginfo("serial port: %s"%(self.port)) rospy.loginfo("update_rate: %s"%(self.update_rate)) rospy.loginfo("drive mode: %s"%(self.drive_mode)) rospy.loginfo("has gyro: %s"%(self.has_gyro))
def __init__(self): self.frame_id = rospy.get_param("~frame_id", "map") cov_x = rospy.get_param("~cov_x", 0.6) cov_y = rospy.get_param("~cov_y", 0.6) cov_z = rospy.get_param("~cov_z", 0.6) self.cov = self.cov_matrix(cov_x, cov_y, cov_z) self.ps_pub = rospy.Publisher( POSE_TOPIC, PoseStamped, queue_size=1) self.ps_cov_pub = rospy.Publisher( POSE_COV_TOPIC, PoseWithCovarianceStamped, queue_size=1) self.ps_pub_3d = rospy.Publisher( POSE_TOPIC_3D, PoseStamped, queue_size=1) self.ps_cov_pub_3d = rospy.Publisher( POSE_COV_TOPIC_3D, PoseWithCovarianceStamped, queue_size=1) self.last = None self.listener = tf.TransformListener() self.tag_range_topics = rospy.get_param("~tag_range_topics") self.subs = list() self.ranges = dict() self.tag_pos = dict() self.altitude = 0.0 self.last_3d = None for topic in self.tag_range_topics: self.subs.append(rospy.Subscriber(topic, Range, self.range_cb))
def __init__(self): # setup CozmoTeleop.settings = termios.tcgetattr(sys.stdin) atexit.register(self.reset_terminal) # vars self.head_angle = STD_HEAD_ANGLE self.lift_height = STD_LIFT_HEIGHT # params self.lin_vel = rospy.get_param('~lin_vel', 0.2) self.ang_vel = rospy.get_param('~ang_vel', 1.5757) # pubs self._head_pub = rospy.Publisher('head_angle', Float64, queue_size=1) self._lift_pub = rospy.Publisher('lift_height', Float64, queue_size=1) self._cmd_vel_pub = rospy.Publisher('cmd_vel', Twist, queue_size=1)
def __init__(self,dis=0.0): State.__init__(self, outcomes=['succeeded','preempted','aborted'],input_keys=['target']) rospy.on_shutdown(self.shutdown) self.test_distance = target.y-dis self.rate = 200 self.r = rospy.Rate(self.rate) self.speed = rospy.get_param('~speed',0.08) self.tolerance = rospy.get_param('~tolerance', 0.01) self.cmd_vel = rospy.Publisher('/mobile_base_controller/smooth_cmd_vel',Twist,queue_size=5) self.base_frame = rospy.get_param('~base_frame', '/base_link') self.odom_frame = rospy.get_param('~odom_frame', '/odom') self.tf_listener = tf.TransformListener() rospy.sleep(1) self.tf_listener.waitForTransform(self.odom_frame,self.base_frame,rospy.Time(),rospy.Duration(60.0)) #define a bianliang self.flag = rospy.get_param('~flag', True) rospy.loginfo(self.test_distance) #tf get position #publish cmd_vel
def __init__(self,angle=0): State.__init__(self, outcomes=['succeeded','aborted','preempted'],input_keys=["angle_in"]) self.speed = rospy.get_param('~speed', 0.10) self.secretindigal = 1.0 self.tolerance = rospy.get_param('tolerance', math.radians(5)) self.start = True self.cmd_vel = rospy.Publisher('mobile_base_controller/smooth_cmd_vel', Twist, queue_size=5) self.base_frame = rospy.get_param('~base_frame', '/base_link') self.odom_frame = rospy.get_param('~odom_frame', '/odom') rospy.on_shutdown(self.shutdown) self.rate = 30 self.start_test = True self.r = rospy.Rate(self.rate) self.tf_listener = tf.TransformListener() rospy.sleep(0.5) self.tf_listener.waitForTransform(self.odom_frame, self.base_frame, rospy.Time(), rospy.Duration(60))
def __init__(self, angle=0): State.__init__(self, outcomes=['succeeded', 'aborted', 'preempted'], input_keys=["angle_in"]) self.speed = rospy.get_param('~speed', 0.03) self.secretindigal = 1.0 self.tolerance = rospy.get_param('tolerance', math.radians(5)) self.start = True self.cmd_vel = rospy.Publisher('mobile_base_controller/smooth_cmd_vel', Twist, queue_size=5) self.base_frame = rospy.get_param('~base_frame', '/base_link') self.odom_frame = rospy.get_param('~odom_frame', '/odom') rospy.on_shutdown(self.shutdown) self.rate = 30 self.start_test = True self.r = rospy.Rate(self.rate) self.angle=angle self.tf_listener = tf.TransformListener() rospy.sleep(0.5) self.tf_listener.waitForTransform(self.odom_frame, self.base_frame, rospy.Time(), rospy.Duration(60))
def __init__(self, angle=0): State.__init__(self, outcomes=['succeeded', 'aborted', 'preempted']) self.speed = rospy.get_param('~speed', 0.03) self.secretindigal = 1.0 self.tolerance = rospy.get_param('tolerance', math.radians(5)) self.start = True self.cmd_vel = rospy.Publisher('mobile_base_controller/smooth_cmd_vel', Twist, queue_size=5) self.base_frame = rospy.get_param('~base_frame', '/base_link') self.odom_frame = rospy.get_param('~odom_frame', '/odom') rospy.on_shutdown(self.shutdown) self.rate = 30 self.start_test = True self.r = rospy.Rate(self.rate) self.angle=angle self.tf_listener = tf.TransformListener() rospy.sleep(0.5) self.tf_listener.waitForTransform(self.odom_frame, self.base_frame, rospy.Time(), rospy.Duration(60))
def __init__(self,dis=0): State.__init__(self, outcomes=['succeeded','preempted','aborted']) rospy.on_shutdown(self.shutdown) self.test_distance = dis self.rate = 100 self.r = rospy.Rate(self.rate) self.speed = rospy.get_param('~speed',0.08) self.tolerance = rospy.get_param('~tolerance', 0.01) self.cmd_vel = rospy.Publisher('/mobile_base_controller/smooth_cmd_vel',Twist,queue_size=5) self.base_frame = rospy.get_param('~base_frame', '/base_link') self.odom_frame = rospy.get_param('~odom_frame', '/odom') self.tf_listener = tf.TransformListener() rospy.sleep(1) self.tf_listener.waitForTransform(self.odom_frame,self.base_frame,rospy.Time(),rospy.Duration(60.0)) self.flag = rospy.get_param('~flag', True) #tf get position self.position = Point() self.position = self.get_position() self.y_start = self.position.y self.x_start = self.position.x #publish cmd_vel
def __init__(self,dis=0.0): State.__init__(self, outcomes=['succeeded','preempted','aborted']) rospy.on_shutdown(self.shutdown) self.test_distance = target.y-0.0 self.rate = 100 self.r = rospy.Rate(self.rate) self.speed = rospy.get_param('~speed',0.08) self.tolerance = rospy.get_param('~tolerance', 0.01) self.cmd_vel = rospy.Publisher('/mobile_base_controller/smooth_cmd_vel',Twist,queue_size=5) self.base_frame = rospy.get_param('~base_frame', '/base_link') self.odom_frame = rospy.get_param('~odom_frame', '/odom') self.tf_listener = tf.TransformListener() rospy.sleep(1) self.tf_listener.waitForTransform(self.odom_frame,self.base_frame,rospy.Time(),rospy.Duration(60.0)) #define a bianliang self.flag = rospy.get_param('~flag', True) rospy.loginfo(self.test_distance) #tf get position #publish cmd_vel
def __init__(self,angle=0): State.__init__(self, outcomes=['succeeded','aborted','preempted']) self.speed = rospy.get_param('~speed', 0.20) self.secretindigal = 1.0 self.tolerance = rospy.get_param('tolerance', math.radians(5)) self.start = True self.cmd_vel = rospy.Publisher('mobile_base_controller/smooth_cmd_vel', Twist, queue_size=5) self.base_frame = rospy.get_param('~base_frame', '/base_link') self.odom_frame = rospy.get_param('~odom_frame', '/odom') rospy.on_shutdown(self.shutdown) self.rate = 30 self.start_test = True self.r = rospy.Rate(self.rate) self.angle = angle self.tf_listener = tf.TransformListener() rospy.sleep(0.5) self.tf_listener.waitForTransform(self.odom_frame, self.base_frame, rospy.Time(), rospy.Duration(60))