我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用rospy.set_param()。
def to_parameters(self): """ Fetches a calibration from ROS parameters in the namespace of the current node into this object. :rtype: None """ calib_dict = self.to_dict() root_params = ['eye_on_hand', 'tracking_base_frame'] if calib_dict['eye_on_hand']: root_params.append('robot_effector_frame') else: root_params.append('robot_base_frame') for rp in root_params: rospy.set_param(rp, calib_dict[rp]) transf_params = 'x', 'y', 'z', 'qx', 'qy', 'qz', 'qw' for tp in transf_params: rospy.set_param('transformation/'+tp, calib_dict['transformation'][tp])
def _copy_to_parameter_server(self): for parents, param in self._extract_param_tree(self.type.config_description): rospy.set_param(self._get_param_name(parents, param['name']), self.config[self._get_config_name(parents,param['name'])])
def update_dr_param(self, section, name, value): #print 'set %s:%s to %s' % (section, name, value) #print '~dynamic_reconfigure/piksi__%s__%s' % (section, name), value rospy.set_param('~dynamic_reconfigure/piksi__%s__%s' % (section, name), value)
def callback_sbp_settings_read_by_index_resp(self, msg, **metadata): p = msg.setting.split('\0') try: i = self.piksi_settings_info[p[0]][p[1]] p[2] = i['parser'](p[2]) except: pass rospy.set_param('~piksi_original_settings/%s/%s' % (p[0],p[1]), p[2]) self.piksi_settings[p[0]][p[1]] = p[2] self.update_dr_param(p[0], p[1], p[2]) self.settings_index += 1 self.piksi_framer(MsgSettingsReadByIndexReq(index=self.settings_index))
def emit(n, v): print datetime.datetime.now().strftime('%c'), v rospy.set_param(o.param, v)
def set_recipe(self, recipe, start_time): """ Set the currently running recipe... this is the CouchDB recipe document. """ with self.lock: if self.__recipe is not None: raise RecipeRunningError("Recipe is already running") # Set recipe and time self.__recipe = recipe self.__start_time = start_time if self.__start_time is None: self.__start_time = rospy.get_time() rospy.set_param(params.CURRENT_RECIPE, recipe["_id"]) rospy.set_param(params.CURRENT_RECIPE_START, self.__start_time ) return self
def clear_recipe(self): with self.lock: if self.__recipe is None: raise RecipeIdleError("No recipe is running") # Clear recipe and time rospy.set_param(params.CURRENT_RECIPE, "") rospy.set_param(params.CURRENT_RECIPE_START, 0) self.__recipe = None self.__start_time = None return self
def register_services(self): """Register services for instance""" rospy.Service(services.START_RECIPE, StartRecipe, self.start_recipe_service) rospy.Service(services.STOP_RECIPE, Empty, self.stop_recipe_service) rospy.set_param( params.SUPPORTED_RECIPE_FORMATS, ','.join(RECIPE_INTERPRETERS.keys()) ) return self
def set_param(param_name): """ POST /api/<version>/param/<param_name> {"value": "x"} POST to the ROS Parameter Server. Value should be in the value field of the request body. """ if not "value" in request.values: return error("No value supplied") rospy.set_param(param_name, request.values["value"]) return "", 204
def setParam(self, name, value): rospy.set_param(self.prefix + "/" + name, value) self.updateParamsService(0, [name])
def setParam(self, name, value, group = 0): rospy.set_param("/cfgroup" + str(group) + "/" + name, value) self.updateParamsService(group, [name])
def set_param(self, param_name, val): # Set private var value setattr(self, '_{}'.format(param_name), val) if val == None: full_param_name = '/needybot/blackboard/{}'.format(param_name) if rospy.has_param(full_param_name): rospy.delete_param(full_param_name) self.publish("delete_{}".format(param_name)) else: rospy.set_param('/needybot/blackboard/{}'.format(param_name), val) self.publish("set_{}".format(param_name))
def priority_task(self, val): self.set_param('priority_task', val)
def sequence_tasks(self, val): self.set_param('sequence_tasks', val)
def simulate(self, val): self.set_param('simulate', val)
def step_success(self, val): self.set_param('step_success', val)
def readCurrentCoords(): cc = uarm.get_position() print 'Current location is x: %2.2fcm, y: %2.2fcm, z: %2.2fcm.' %(float(cc[0]), float(cc[1]), float(cc[2])) rospy.set_param('current_x', cc[0]) rospy.set_param('current_y', float(cc[1])) rospy.set_param('current_z', float(cc[2])) # Read current Angles function
def readCurrentAngles(): ra = {} ra['s1'] = uarm.get_servo_angle(0) ra['s2'] = uarm.get_servo_angle(1) ra['s3'] = uarm.get_servo_angle(2) ra['s4'] = uarm.get_servo_angle(3) print 'Four Servo Angles: %2.2f, %2.2f, %2.2f and %2.2f degrees.' %(ra['s1'], ra['s2'],ra['s3'],ra['s4']) rospy.set_param('servo_1',ra['s1']) rospy.set_param('servo_2',ra['s2']) rospy.set_param('servo_3',ra['s3']) rospy.set_param('servo_4',ra['s4']) # Read stopper function
def readStopperStatus(): val = uarm.get_tip_sensor() if val == True: print 'Stopper is actived' rospy.set_param('stopper_status','HIGH') else: print 'Stopper is not actived' rospy.set_param('stopper_status','LOW') # Write single angle
def __init__(self, outside_ros=False): self.rospack = RosPack() self.num_workers = 0 self.outside_ros = rospy.get_param('/use_sim_time', outside_ros) # True if work manager <-> controller comm must use ZMQ self.file_experiment = join(self.rospack.get_path('apex_playground'), 'config', 'experiment.json') with open(self.file_experiment) as f: experiment = self.check(json.load(f)) rospy.set_param('/work', experiment) self.lock_experiment = RLock() self.failing_workers = {} # association {num_worker: remaining attempts before blacklist}
def _cb_get_work(self, worker): with self.lock_experiment: experiment = rospy.get_param('/work') disabled_workers = rospy.get_param("/experiment/disabled_workers", []) if worker not in disabled_workers: for task in range(len(experiment)): for trial in range(experiment[task]['num_trials']): status = experiment[task]['progress'][trial]['status'] resuming = status =='taken' and experiment[task]['progress'][trial]['worker'] == worker # If this work is open or taken and the same worker is requesting some work, distribute it! if status == 'open' or resuming: if self.is_completed(task, trial, experiment): experiment[task]['progress'][trial]['status'] = 'complete' self.num_workers -= 1 rospy.set_param('/work', experiment) else: # This task needs work, distribute it to the worker experiment[task]['progress'][trial]['status'] = 'taken' experiment[task]['progress'][trial]['worker'] = worker rospy.set_param('/work', experiment) self.num_workers += 1 if resuming: rospy.logwarn("Resuming worker {} {} from iteration {}/{}".format(worker, experiment[task]['method'], experiment[task]['progress'][trial]['iteration'], experiment[task]['num_iterations'])) else: rospy.logwarn("Distributing {} iterations {} trial {} to worker {}".format(experiment[task]['num_iterations'], experiment[task]['method'], trial, worker)) return dict(method=experiment[task]['method'], iteration=experiment[task]['progress'][trial]['iteration'], num_iterations=experiment[task]['num_iterations'], task=task, trial=trial, work_available=True) else: pass #rospy.logwarn("Worker {} requested work but it is blacklisted".format(worker)) return dict(work_available=False)
def _cb_add_work(self, request): with self.lock_experiment: experiment = rospy.get_param('/work') task = { "num_iterations": request.num_iterations, "num_trials": request.num_trials, "method": request.method } experiment.append(task) rospy.set_param('/work', self.check(experiment)) return AddWorkResponse(task=len(experiment)-1)
def load_params_from_yaml(fp): from yaml import load with open(fp, 'r') as infile: yaml_data = load(infile) for param in yaml_data: print "param:", param, ":", yaml_data[param] rospy.set_param("~"+param, yaml_data[param]) # this function can be used to generate flame graphs easily
def reset(self, condition=0, bottom_face=None, start_motor_positions=None): if bottom_face is None: bottom_face = 0 rospy.set_param('/bottom_face', bottom_face + 1) self._ctrl_pub.publish(String('reset')) if start_motor_positions is None: self._set_motor_positions(np.ones(12) * 0.95) else: self._set_motor_positions(np.array(start_motor_positions)) time.sleep(0.5) for _ in range(30): self._advance_simulation()
def reset(self, bottom_face=0): rospy.set_param('/bottom_face', bottom_face + 1) self._ctrl_pub.publish(String('reset'))
def define(self): self.say=rospy.Publisher('speak_string', String, queue_size=1) if not rospy.has_param('~words'): rospy.set_param('~words','???????') if not rospy.has_param('~SpeakerSubTopic'): rospy.set_param('~SpeakerSubTopic', 'stop_flag') self.words=rospy.get_param('~words') self.topic=rospy.get_param('~SpeakerSubTopic')
def engine_setting(self, name = 'baidu') rospy.loginfo('currently just support baidu and google') rospy.set_param('~engine_name', name) #speaker_settings
def Gender_setting(self, data): rospy.loginfo('please input man/women') if data in self.gender_lib: rospy.set_param('~Gender', data)
def language_setting(self, data): rospy.loginfo('please input zh (Chinese), ct (Cantonese), en ?English?') if data in self.language_lib: rospy.set_param('~LAN', data)
def speed_setting(self, data): rospy.loginfo('speak speed range is 0 ~ 9') if data in self.level: rospy.set_param('~SPEED', data) else: rospy.loginfo('currently not support this speak speed level')
def volume_setting(self, data): rospy.loginfo('intonation range is 0 ~ 9') if data in self.level: rospy.set_param('~VOL', data) else: rospy.loginfo('currently not support this intonation level')
def format_setting(self, data): rospy.set_param('~FORMAT', data)
def response_sensitivity(self, data): rospy.set_param('~ResponseSensitivity', data)
def workspace_setting(self, data): rospy.set_param('~WorkSpaces', data) #engine setting
def CTP_setting(self, data): if data == 1: rospy.set_param('~CTP', data) else: rospy.loginfo('currently not support this customer terminal type')
def Api_Key_setting(self, data): rospy.set_param('~Api_Key', data)
def Secrect_Key_setting(self, data): rospy.set_param('~Secrect_Key', data)
def Grant_type_setting(self, data): rospy.set_param('~Grant_type', data)
def Token_url_setting(self, data): rospy.set_param('~Token_url', data)
def Speeker_url_setting(self, data): rospy.set_param('~Speeker_url', data)
def sample_rate_setting(self, data): rospy.loginfo('please input sampling rate') if data in self.sample_rate_range: rospy.set_param('~REG_SAMPLING_RATE', data) else: rospy.loginfo('data out of range')
def upper_level_setting(self, data): rospy.loginfo('please input upper level of sampling') rospy.set_param('~REG_UPPER_LEVEL', data)
def lower_level_setting(self, data): rospy.loginfo('please input lower level of sampling') rospy.set_param('~REG_LOWER_LEVEL', data)
def count_number_setting(self, data): rospy.loginfo('please input how much number of data higher than lower level counted before recording') rospy.set_param('~REG_COUNT_NUM:', data)
def save_length_setting(self, data): rospy.loginfo('please input recording uni length of voice data') rospy.set_param('~REG_SAVE_LENGTH:', data)
def no_words_setting(self, data): rospy.loginfo('please input the period of stop recording if there is no words coming') rospy.set_param('~REG_NO_WORDS', data)
def Api_Key_setting(self, data): rospy.loginfo('please input a Baidu API KEY') rospy.set_param('~REG_Api_Key', data)
def Secrect_Key_setting(self, data): rospy.loginfo('please input a Baidu Secrect Key') rospy.set_param('~REG_secrect_Key', data)
def Grant_type_setting(self, data): rospy.loginfo('please input Baidu Grant type') rospy.set_param('~REG_Grant_type')
def Token_url_setting(self, data): rospy.loginfo('please input Baidu Token url') rospy.set_param('~REG_Token_url', data)