Python rospy 模块,set_param() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用rospy.set_param()

项目:easy_handeye    作者:IFL-CAMP    | 项目源码 | 文件源码
def to_parameters(self):
        """
        Fetches a calibration from ROS parameters in the namespace of the current node into this object.

        :rtype: None
        """
        calib_dict = self.to_dict()

        root_params = ['eye_on_hand', 'tracking_base_frame']
        if calib_dict['eye_on_hand']:
            root_params.append('robot_effector_frame')
        else:
            root_params.append('robot_base_frame')

        for rp in root_params:
            rospy.set_param(rp, calib_dict[rp])

        transf_params = 'x', 'y', 'z', 'qx', 'qy', 'qz', 'qw'

        for tp in transf_params:
            rospy.set_param('transformation/'+tp, calib_dict['transformation'][tp])
项目:piksi_ros    作者:uscresl    | 项目源码 | 文件源码
def _copy_to_parameter_server(self):
        for parents, param in self._extract_param_tree(self.type.config_description):
            rospy.set_param(self._get_param_name(parents, param['name']), self.config[self._get_config_name(parents,param['name'])])
项目:piksi_ros    作者:uscresl    | 项目源码 | 文件源码
def update_dr_param(self, section, name, value):
        #print 'set %s:%s to %s' % (section, name, value)
        #print '~dynamic_reconfigure/piksi__%s__%s' % (section, name), value
        rospy.set_param('~dynamic_reconfigure/piksi__%s__%s' % (section, name), value)
项目:piksi_ros    作者:uscresl    | 项目源码 | 文件源码
def callback_sbp_settings_read_by_index_resp(self, msg, **metadata):
        p = msg.setting.split('\0')
        try:
            i = self.piksi_settings_info[p[0]][p[1]]
            p[2] = i['parser'](p[2])
        except:
            pass

        rospy.set_param('~piksi_original_settings/%s/%s' % (p[0],p[1]), p[2])
        self.piksi_settings[p[0]][p[1]] = p[2]
        self.update_dr_param(p[0], p[1], p[2])


        self.settings_index += 1
        self.piksi_framer(MsgSettingsReadByIndexReq(index=self.settings_index))
项目:drivebot    作者:matpalm    | 项目源码 | 文件源码
def emit(n, v):
    print datetime.datetime.now().strftime('%c'), v
    rospy.set_param(o.param, v)
项目:openag_brain    作者:OpenAgInitiative    | 项目源码 | 文件源码
def set_recipe(self, recipe, start_time):
        """
        Set the currently running recipe... this is the CouchDB recipe document.
        """
        with self.lock:
            if self.__recipe is not None:
                raise RecipeRunningError("Recipe is already running")
            # Set recipe and time
            self.__recipe = recipe
            self.__start_time = start_time
            if self.__start_time is None:
                self.__start_time = rospy.get_time()
            rospy.set_param(params.CURRENT_RECIPE, recipe["_id"])
            rospy.set_param(params.CURRENT_RECIPE_START, self.__start_time  )
        return self
项目:openag_brain    作者:OpenAgInitiative    | 项目源码 | 文件源码
def clear_recipe(self):
        with self.lock:
            if self.__recipe is None:
                raise RecipeIdleError("No recipe is running")
            # Clear recipe and time
            rospy.set_param(params.CURRENT_RECIPE, "")
            rospy.set_param(params.CURRENT_RECIPE_START, 0)
            self.__recipe = None
            self.__start_time = None
        return self
项目:openag_brain    作者:OpenAgInitiative    | 项目源码 | 文件源码
def register_services(self):
        """Register services for instance"""
        rospy.Service(services.START_RECIPE, StartRecipe,
            self.start_recipe_service)
        rospy.Service(services.STOP_RECIPE, Empty, self.stop_recipe_service)
        rospy.set_param(
            params.SUPPORTED_RECIPE_FORMATS,
            ','.join(RECIPE_INTERPRETERS.keys())
        )
        return self
项目:openag_brain    作者:OpenAgInitiative    | 项目源码 | 文件源码
def set_param(param_name):
    """
    POST /api/<version>/param/<param_name> {"value": "x"}

    POST to the ROS Parameter Server. Value should be in the value field
    of the request body.
    """
    if not "value" in request.values:
        return error("No value supplied")
    rospy.set_param(param_name, request.values["value"])
    return "", 204
项目:crazyswarm    作者:USC-ACTLab    | 项目源码 | 文件源码
def setParam(self, name, value):
        rospy.set_param(self.prefix + "/" + name, value)
        self.updateParamsService(0, [name])
项目:crazyswarm    作者:USC-ACTLab    | 项目源码 | 文件源码
def setParam(self, name, value, group = 0):
        rospy.set_param("/cfgroup" + str(group) + "/" + name, value)
        self.updateParamsService(group, [name])
项目:needybot-core    作者:needybot    | 项目源码 | 文件源码
def set_param(self, param_name, val):
        # Set private var value
        setattr(self, '_{}'.format(param_name), val)
        if val == None:
            full_param_name = '/needybot/blackboard/{}'.format(param_name)
            if rospy.has_param(full_param_name):
                rospy.delete_param(full_param_name)
            self.publish("delete_{}".format(param_name))
        else:
            rospy.set_param('/needybot/blackboard/{}'.format(param_name), val)
            self.publish("set_{}".format(param_name))
项目:needybot-core    作者:needybot    | 项目源码 | 文件源码
def priority_task(self, val):
        self.set_param('priority_task', val)
项目:needybot-core    作者:needybot    | 项目源码 | 文件源码
def sequence_tasks(self, val):
        self.set_param('sequence_tasks', val)
项目:needybot-core    作者:needybot    | 项目源码 | 文件源码
def simulate(self, val):
        self.set_param('simulate', val)
项目:needybot-core    作者:needybot    | 项目源码 | 文件源码
def step_success(self, val):
        self.set_param('step_success', val)
项目:UArmForROS    作者:uArm-Developer    | 项目源码 | 文件源码
def readCurrentCoords():
    cc = uarm.get_position()
    print 'Current location is x: %2.2fcm, y: %2.2fcm, z: %2.2fcm.' %(float(cc[0]), float(cc[1]), float(cc[2]))

    rospy.set_param('current_x', cc[0])
    rospy.set_param('current_y', float(cc[1]))
    rospy.set_param('current_z', float(cc[2]))

# Read current Angles function
项目:UArmForROS    作者:uArm-Developer    | 项目源码 | 文件源码
def readCurrentAngles():
    ra = {}
    ra['s1'] = uarm.get_servo_angle(0)
    ra['s2'] = uarm.get_servo_angle(1)
    ra['s3'] = uarm.get_servo_angle(2)
    ra['s4'] = uarm.get_servo_angle(3)

    print 'Four Servo Angles: %2.2f, %2.2f, %2.2f and %2.2f degrees.' %(ra['s1'], ra['s2'],ra['s3'],ra['s4'])

    rospy.set_param('servo_1',ra['s1'])
    rospy.set_param('servo_2',ra['s2'])
    rospy.set_param('servo_3',ra['s3'])
    rospy.set_param('servo_4',ra['s4'])

# Read stopper function
项目:UArmForROS    作者:uArm-Developer    | 项目源码 | 文件源码
def readStopperStatus():
    val = uarm.get_tip_sensor()
    if val == True:
        print 'Stopper is actived'
        rospy.set_param('stopper_status','HIGH')
    else:
        print 'Stopper is not actived'
        rospy.set_param('stopper_status','LOW')


# Write single angle
项目:APEX    作者:ymollard    | 项目源码 | 文件源码
def __init__(self, outside_ros=False):
        self.rospack = RosPack()
        self.num_workers = 0
        self.outside_ros = rospy.get_param('/use_sim_time', outside_ros)  # True if work manager <-> controller comm must use ZMQ
        self.file_experiment = join(self.rospack.get_path('apex_playground'), 'config', 'experiment.json')

        with open(self.file_experiment) as f:
            experiment = self.check(json.load(f))
        rospy.set_param('/work', experiment)
        self.lock_experiment = RLock()
        self.failing_workers = {}  # association {num_worker: remaining attempts before blacklist}
项目:APEX    作者:ymollard    | 项目源码 | 文件源码
def _cb_get_work(self, worker):
        with self.lock_experiment:
            experiment = rospy.get_param('/work')
            disabled_workers = rospy.get_param("/experiment/disabled_workers", [])
            if worker not in disabled_workers:
                for task in range(len(experiment)):
                    for trial in range(experiment[task]['num_trials']):
                        status = experiment[task]['progress'][trial]['status']
                        resuming = status =='taken' and experiment[task]['progress'][trial]['worker'] == worker
                        # If this work is open or taken and the same worker is requesting some work, distribute it!
                        if status == 'open' or resuming:
                            if self.is_completed(task, trial, experiment):
                                experiment[task]['progress'][trial]['status'] = 'complete'
                                self.num_workers -= 1
                                rospy.set_param('/work', experiment)
                            else:
                                # This task needs work, distribute it to the worker
                                experiment[task]['progress'][trial]['status'] = 'taken'
                                experiment[task]['progress'][trial]['worker'] = worker
                                rospy.set_param('/work', experiment)
                                self.num_workers += 1
                                if resuming:
                                    rospy.logwarn("Resuming worker {} {} from iteration {}/{}".format(worker, experiment[task]['method'],
                                                                                                   experiment[task]['progress'][trial]['iteration'],
                                                                                                   experiment[task]['num_iterations']))
                                else:
                                    rospy.logwarn("Distributing {} iterations {} trial {} to worker {}".format(experiment[task]['num_iterations'], experiment[task]['method'], trial, worker))
                                return dict(method=experiment[task]['method'],
                                            iteration=experiment[task]['progress'][trial]['iteration'],
                                            num_iterations=experiment[task]['num_iterations'],
                                            task=task, trial=trial, work_available=True)
            else:
                pass
                #rospy.logwarn("Worker {} requested work but it is blacklisted".format(worker))
        return dict(work_available=False)
项目:APEX    作者:ymollard    | 项目源码 | 文件源码
def _cb_add_work(self, request):
        with self.lock_experiment:
            experiment = rospy.get_param('/work')
            task = {
                    "num_iterations": request.num_iterations,
                    "num_trials": request.num_trials,
                    "method": request.method
                }
            experiment.append(task)
            rospy.set_param('/work', self.check(experiment))
            return AddWorkResponse(task=len(experiment)-1)
项目:particle_filter    作者:mit-racecar    | 项目源码 | 文件源码
def load_params_from_yaml(fp):
    from yaml import load
    with open(fp, 'r') as infile:
        yaml_data = load(infile)
        for param in yaml_data:
            print "param:", param, ":", yaml_data[param]
            rospy.set_param("~"+param, yaml_data[param])

# this function can be used to generate flame graphs easily
项目:gps_superball_public    作者:young-geng    | 项目源码 | 文件源码
def reset(self, condition=0, bottom_face=None, start_motor_positions=None):
        if bottom_face is None:
            bottom_face = 0
        rospy.set_param('/bottom_face', bottom_face + 1)
        self._ctrl_pub.publish(String('reset'))
        if start_motor_positions is None:
            self._set_motor_positions(np.ones(12) * 0.95)
        else:
            self._set_motor_positions(np.array(start_motor_positions))
        time.sleep(0.5)
        for _ in range(30):
            self._advance_simulation()
项目:gps_superball_public    作者:young-geng    | 项目源码 | 文件源码
def reset(self, bottom_face=0):
        rospy.set_param('/bottom_face', bottom_face + 1)
        self._ctrl_pub.publish(String('reset'))
项目:baidu_speech    作者:DinnerHowe    | 项目源码 | 文件源码
def define(self):
  self.say=rospy.Publisher('speak_string', String, queue_size=1)
  if not rospy.has_param('~words'):
   rospy.set_param('~words','???????')

  if not rospy.has_param('~SpeakerSubTopic'):
   rospy.set_param('~SpeakerSubTopic', 'stop_flag')

  self.words=rospy.get_param('~words')
  self.topic=rospy.get_param('~SpeakerSubTopic')
项目:baidu_speech    作者:DinnerHowe    | 项目源码 | 文件源码
def engine_setting(self, name = 'baidu')
  rospy.loginfo('currently just support baidu and google')
  rospy.set_param('~engine_name', name)

 #speaker_settings
项目:baidu_speech    作者:DinnerHowe    | 项目源码 | 文件源码
def Gender_setting(self, data):
  rospy.loginfo('please input man/women')
  if data in self.gender_lib:
   rospy.set_param('~Gender', data)
项目:baidu_speech    作者:DinnerHowe    | 项目源码 | 文件源码
def language_setting(self, data):
  rospy.loginfo('please input zh (Chinese), ct (Cantonese), en ?English?')
  if data in self.language_lib:
   rospy.set_param('~LAN', data)
项目:baidu_speech    作者:DinnerHowe    | 项目源码 | 文件源码
def speed_setting(self, data):
  rospy.loginfo('speak speed range is 0 ~ 9')
  if data in self.level:
   rospy.set_param('~SPEED', data)
  else:
   rospy.loginfo('currently not support this speak speed level')
项目:baidu_speech    作者:DinnerHowe    | 项目源码 | 文件源码
def volume_setting(self, data):
  rospy.loginfo('intonation range is 0 ~ 9')
  if data in self.level:
   rospy.set_param('~VOL', data)
  else:
   rospy.loginfo('currently not support this intonation level')
项目:baidu_speech    作者:DinnerHowe    | 项目源码 | 文件源码
def format_setting(self, data):
  rospy.set_param('~FORMAT', data)
项目:baidu_speech    作者:DinnerHowe    | 项目源码 | 文件源码
def response_sensitivity(self, data):
  rospy.set_param('~ResponseSensitivity', data)
项目:baidu_speech    作者:DinnerHowe    | 项目源码 | 文件源码
def workspace_setting(self, data):
  rospy.set_param('~WorkSpaces', data)

 #engine setting
项目:baidu_speech    作者:DinnerHowe    | 项目源码 | 文件源码
def CTP_setting(self, data):
  if data == 1:
   rospy.set_param('~CTP', data)
  else:
   rospy.loginfo('currently not support this customer terminal type')
项目:baidu_speech    作者:DinnerHowe    | 项目源码 | 文件源码
def Api_Key_setting(self, data):
  rospy.set_param('~Api_Key', data)
项目:baidu_speech    作者:DinnerHowe    | 项目源码 | 文件源码
def Secrect_Key_setting(self, data):
  rospy.set_param('~Secrect_Key', data)
项目:baidu_speech    作者:DinnerHowe    | 项目源码 | 文件源码
def Grant_type_setting(self, data):
  rospy.set_param('~Grant_type', data)
项目:baidu_speech    作者:DinnerHowe    | 项目源码 | 文件源码
def Token_url_setting(self, data):
  rospy.set_param('~Token_url', data)
项目:baidu_speech    作者:DinnerHowe    | 项目源码 | 文件源码
def Speeker_url_setting(self, data):
  rospy.set_param('~Speeker_url', data)
项目:baidu_speech    作者:DinnerHowe    | 项目源码 | 文件源码
def sample_rate_setting(self, data):
  rospy.loginfo('please input sampling rate')
  if data in self.sample_rate_range:
   rospy.set_param('~REG_SAMPLING_RATE', data)
  else:
   rospy.loginfo('data out of range')
项目:baidu_speech    作者:DinnerHowe    | 项目源码 | 文件源码
def upper_level_setting(self, data):
  rospy.loginfo('please input upper level of sampling')
  rospy.set_param('~REG_UPPER_LEVEL', data)
项目:baidu_speech    作者:DinnerHowe    | 项目源码 | 文件源码
def lower_level_setting(self, data):
  rospy.loginfo('please input lower level of sampling')
  rospy.set_param('~REG_LOWER_LEVEL', data)
项目:baidu_speech    作者:DinnerHowe    | 项目源码 | 文件源码
def count_number_setting(self, data):
  rospy.loginfo('please input how much number of data higher than lower level counted before recording')
  rospy.set_param('~REG_COUNT_NUM:', data)
项目:baidu_speech    作者:DinnerHowe    | 项目源码 | 文件源码
def save_length_setting(self, data):
  rospy.loginfo('please input recording uni length of voice data')
  rospy.set_param('~REG_SAVE_LENGTH:', data)
项目:baidu_speech    作者:DinnerHowe    | 项目源码 | 文件源码
def no_words_setting(self, data):
  rospy.loginfo('please input the period of stop recording if there is no words coming')
  rospy.set_param('~REG_NO_WORDS', data)
项目:baidu_speech    作者:DinnerHowe    | 项目源码 | 文件源码
def Api_Key_setting(self, data):
  rospy.loginfo('please input a Baidu API KEY')
  rospy.set_param('~REG_Api_Key', data)
项目:baidu_speech    作者:DinnerHowe    | 项目源码 | 文件源码
def Secrect_Key_setting(self, data):
  rospy.loginfo('please input a Baidu Secrect Key')
  rospy.set_param('~REG_secrect_Key', data)
项目:baidu_speech    作者:DinnerHowe    | 项目源码 | 文件源码
def Grant_type_setting(self, data):
  rospy.loginfo('please input Baidu Grant type')
  rospy.set_param('~REG_Grant_type')
项目:baidu_speech    作者:DinnerHowe    | 项目源码 | 文件源码
def Token_url_setting(self, data):
  rospy.loginfo('please input Baidu Token url')
  rospy.set_param('~REG_Token_url', data)