我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用rospy.Publisher()。
def __init__(self): self.run_recognition = rospy.get_param('/rgb_object_detection/run_recognition') self.model_filename = rospy.get_param('/rgb_object_detection/model_file') self.weights_filename = rospy.get_param('/rgb_object_detection/weights_file') self.categories_filename = rospy.get_param('/rgb_object_detection/category_file') self.verbose = rospy.get_param('/rgb_object_detection/verbose', False) self.bridge = CvBridge() self.image_sub = rospy.Subscriber('/camera/rgb/image_color', Image, self.img_cb) self.patches_sub = rospy.Subscriber('/candidate_regions_depth', PolygonStamped, self.patches_cb) self.detection_pub = rospy.Publisher('/detections', Detection, queue_size=1) # you can read this value off of your sensor from the '/camera/depth_registered/camera_info' topic self.P = np.array([[525.0, 0.0, 319.5, 0.0], [0.0, 525.0, 239.5, 0.0], [0.0, 0.0, 1.0, 0.0]]) if self.run_recognition: self.cnn = CNN('', self.model_filename, self.weights_filename, self.categories_filename, '', 0, 0, self.verbose) self.cnn.load_model()
def listener(): global odom_pub,left_motor_pub,left_motor_speed_pub,right_motor_pub,right_motor_speed_pub #subscribing to required topics rospy.Subscriber("/cmd_vel", Twist, cmd_velCb) rospy.Subscriber('imu_data',Imu,imuCb) rospy.Subscriber('north',std_msgs.msg.Float32,northCb) odom_pub = rospy.Publisher('/odom', Odometry, queue_size=10) left_motor_pub = rospy.Publisher('/left_motor', std_msgs.msg.Int32, queue_size=10) right_motor_pub = rospy.Publisher('/right_motor', std_msgs.msg.Int32, queue_size=10) #left_motor_speed_pub = rospy.Publisher('/left_motor_speed', std_msgs.msg.Int32, queue_size=10) #right_motor_speed_pub = rospy.Publisher('/right_motor_speed', std_msgs.msg.Int32, queue_size=10) rospy.spin()
def __init__(self): rospy.init_node('multiplexer_node', anonymous=False) rospy.loginfo('\033[93m[%s]\033[0m wait for bringup social_events_memory...'%rospy.get_name()) rospy.wait_for_service('social_events_memory/read_data') self.rd_event_mem = rospy.ServiceProxy('social_events_memory/read_data', ReadData) rospy.Subscriber('forwarding_event', ForwardingEvent, self.handle_social_events) self.pub_rasing_event = rospy.Publisher('raising_events', RaisingEvents, queue_size=10) self.events_queue = Queue.Queue() self.recognized_words_queue = Queue.Queue() event_period = rospy.get_param('~event_period', 0.5) rospy.Timer(rospy.Duration(event_period), self.handle_trigger_events) rospy.loginfo('\033[93m[%s]\033[0m initialized...'%rospy.get_name()) rospy.spin()
def talker(): pub = rospy.Publisher('chatter', String, queue_size=10) rospy.init_node('talker', anonymous=True) rate = rospy.Rate(10) # 10hz while not rospy.is_shutdown(): hello_str = "hello world %s" % rospy.get_time() rospy.loginfo(hello_str) pub.publish(hello_str) rate.sleep() # rate = rospy.Rate(10) # hello_str = "hello world" # rospy.loginfo(hello_str) # pub.publish(hello_str) # rospy.spin() # exit(0)
def __init__(self, group_addr, port): bind_addr = '0.0.0.0' # Create a IPv4/UDP socket self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Avoid error 'Address already in use'. self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # Construct a membership_request membership_request = socket.inet_aton(group_addr) + socket.inet_aton(bind_addr) # Send add membership request to socket self.sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, membership_request) # Bind the socket to an interfaces self.sock.bind((bind_addr, port)) # Set non-blocking receiving mode self.sock.setblocking(False) self.publisher = rospy.Publisher('/raw_referee', std_msgs.msg.String, queue_size=10)
def test_publish_to_topics(self): topic_ending = "desired" rospy.logdebug("Sleeping for 5 seconds to let ROS spin up...") rospy.sleep(5) for variable, value in self.variables: # Publish to each variable/desired topic to see if all of the # actuators come on as expected. topic_string = variable + "/" + topic_ending rospy.logdebug("Testing Publishing to " + topic_string) pub_desired = rospy.Publisher(topic_string, Float64, queue_size=10) sub_desired = rospy.Subscriber(topic_string, Float64, self.callback) rospy.sleep(2) print(self.namespace + "/" + topic_string) for _ in range(NUM_TIMES_TO_PUBLISH): pub_desired.publish(value) rospy.sleep(1) rospy.sleep(2) pub_desired.publish(0)
def setup_pubsub(self): freq_params = diagnostic_updater.FrequencyStatusParam({'min':self.diag_update_freq, 'max':self.diag_update_freq}, self.diag_freq_tolerance, self.diag_window_size) time_params = diagnostic_updater.TimeStampStatusParam(self.diag_min_delay, self.diag_max_delay) self.pub_fix = rospy.Publisher("~fix", NavSatFix, queue_size=1000) self.pub_spp_fix = diagnostic_updater.DiagnosedPublisher(rospy.Publisher("~spp_fix", NavSatFix, queue_size=1000), self.diag_updater, freq_params, time_params) self.pub_rtk_fix = diagnostic_updater.DiagnosedPublisher(rospy.Publisher("~rtk_fix", NavSatFix, queue_size=1000), self.diag_updater, freq_params, time_params) #self.pub_rtk = diagnostic_updater.DiagnosedPublisher(rospy.Publisher("~rtk_odom", Odometry, queue_size=1000), self.diag_updater, freq_params, time_params) self.pub_odom = diagnostic_updater.DiagnosedPublisher(rospy.Publisher("~odom", Odometry, queue_size=1000), self.diag_updater, freq_params, time_params) self.pub_time = diagnostic_updater.DiagnosedPublisher(rospy.Publisher("~time", TimeReference, queue_size=1000), self.diag_updater, freq_params, time_params) if self.publish_utm_rtk_tf or self.publish_rtk_child_tf: self.tf_br = tf2_ros.TransformBroadcaster() if self.publish_ephemeris: self.pub_eph = rospy.Publisher("~ephemeris", Ephemeris, queue_size=1000) if self.publish_observations: self.pub_obs = rospy.Publisher('~observations', Observations, queue_size=1000)
def __init__(self): self._read_configuration() if self.show_plots: self._setup_plots() rospy.loginfo("Receiving timestamp messages from {}".format(self.uwb_timestamps_topic)) rospy.loginfo("Publishing multi-range messages to {}".format(self.uwb_multi_range_topic)) rospy.loginfo("Publishing raw multi-range messages to {}".format(self.uwb_multi_range_raw_topic)) rospy.loginfo("Publishing multi-range-with-offsets messages to {}".format( self.uwb_multi_range_with_offsets_topic)) # ROS Publishers self.uwb_pub = rospy.Publisher(self.uwb_multi_range_topic, uwb.msg.UWBMultiRange, queue_size=1) self.uwb_raw_pub = rospy.Publisher(self.uwb_multi_range_raw_topic, uwb.msg.UWBMultiRange, queue_size=1) self.uwb_with_offsets_pub = rospy.Publisher(self.uwb_multi_range_with_offsets_topic, uwb.msg.UWBMultiRangeWithOffsets, queue_size=1) self.uwb_timestamps_sub = rospy.Subscriber(self.uwb_timestamps_topic, uwb.msg.UWBMultiRangeTimestamps, self.handle_timestamps_message) # Variables for rate display self.msg_count = 0 self.last_now = rospy.get_time()
def __init__(self): """Initialize tracker. """ self._read_configuration() self.estimates = {} self.estimate_times = {} self.ikf_prev_outlier_flags = {} self.ikf_outlier_counts = {} self.outlier_thresholds = {} rospy.loginfo("Receiving raw multi-range messages from: {}".format(self.uwb_multi_range_topic)) rospy.loginfo("Publishing tracker messages to {}".format(self.uwb_tracker_topic)) rospy.loginfo("Publishing tracker transform as {} -> {}".format(self.tracker_frame, self.target_frame)) # ROS publishers and subscribers self.tracker_frame = self.tracker_frame self.target_frame = self.target_frame self.uwb_pub = rospy.Publisher(self.uwb_tracker_topic, uwb.msg.UWBTracker, queue_size=1) self.tf_broadcaster = tf.TransformBroadcaster() self.uwb_multi_range_sub = rospy.Subscriber(self.uwb_multi_range_topic, uwb.msg.UWBMultiRangeWithOffsets, self.handle_multi_range_message)
def run(self): self.topic_type = wait_topic_ready(self.topic_name, self.url) #print str(self.topic_type)+" self.topic_type" if not self.topic_type: rospy.logerr('Type of topic %s are not equal in the remote and local sides', self.topic_name) return topic_type_module, topic_type_name = tuple(self.topic_type.split('/')) try: roslib.load_manifest(topic_type_module) msg_module = import_module(topic_type_module + '.msg') self.rostype = getattr(msg_module, topic_type_name) if self.test: self.publisher = rospy.Publisher(self.topic_name + '_rb', self.rostype, queue_size = self.queue_size) else: self.publisher = rospy.Publisher(self.topic_name, self.rostype, queue_size = self.queue_size) self.ws = websocket.WebSocketApp(self.url, on_message = self.on_message, on_error = self.on_error, on_close = self.on_close, on_open = self.on_open) rospy.loginfo('Create connection to Rosbridge server %s for subscribed topic %s successfully', self.url, self.topic_name) self.ws.run_forever() except ResourceNotFound, e: rospy.logerr('Proxy for subscribed topic %s init falied. Reason: Could not find the required resource: %s', self.topic_name, str(e)) except Exception, e: rospy.logerr('Proxy for subscribed topic %s init falied. Reason: %s', self.topic_name, str(e))
def __init__(self): if rospy.has_param('~orientation_offset'): # Orientation offset as quaterion q = [x,y,z,w]. self.orientation_offset = rospy.get_param('~orientation_offset') else: yaw_offset_deg = rospy.get_param('~yaw_offset_deg', 0.0) self.orientation_offset = tf.quaternion_from_euler(0.0, 0.0, math.radians(yaw_offset_deg)) rospy.Subscriber(rospy.get_name() + "/imu_in", Imu, self.imu_callback) self.pub_imu_out = rospy.Publisher(rospy.get_name() + '/imu_out', Imu, queue_size=10) rospy.spin()
def __init__(self): # Read Settings self.read_settings() # Init other variables self._num_magnetometer_reads = 0 self._latest_bearings = np.zeros(shape = (self._number_samples_average, 1)) self._received_enough_samples = False # Subscribe to magnetometer topic rospy.Subscriber("magnetic_field", Vector3Stamped, self.magnetic_field_callback) # Publishers self._pub_bearing_raw = rospy.Publisher(rospy.get_name() + '/bearing_raw_deg', Float64, queue_size = 10) self._pub_bearing_avg = rospy.Publisher(rospy.get_name() + '/bearing_avg_deg', Float64, queue_size = 10) self._pub_imu_bearing_avg = rospy.Publisher(rospy.get_name() + '/imu_bearing_avg', Imu, queue_size = 10) if self._verbose: self._pub_mag_corrected = rospy.Publisher(rospy.get_name() + '/mag_corrected', Vector3Stamped, queue_size = 10) rospy.spin()
def init(self): pygame.init() clock = pygame.time.Clock() screen = pygame.display.set_mode((250, 250)) rospy.init_node('highway_teleop') self.rate = rospy.Rate(rospy.get_param('~hz', 10)) self.acc = rospy.get_param('~acc', 5) self.yaw = rospy.get_param('~yaw', 0) self.robot_pub = rospy.Publisher('robot_0/cmd_vel', Twist, queue_size=1) print "Usage: \n \ up arrow: accelerate \n \ down arrow: decelerate \n \ left arrow: turn left \n \ right arrow: turn right"
def init(self): pygame.init() clock = pygame.time.Clock() screen = pygame.display.set_mode((250, 250)) rospy.init_node('teleop') self.rate = rospy.Rate(rospy.get_param('~hz', 20)) self.acc = rospy.get_param('~acc', 1) self.yaw = rospy.get_param('~yaw', 0.25) self.robot_pub = rospy.Publisher('control_command', controlCommand, queue_size=1) self.path_record_pub = rospy.Publisher('record_state', \ RecordState, queue_size=1) self.highway_game_start_pub = rospy.Publisher('highway_game_start', RecordState, queue_size=1) print "Usage: \n \ up arrow: accelerate \n \ down arrow: decelerate \n \ left arrow: turn left \n \ right arrow: turn right"
def main(args): rospy.init_node("publish_calib_file", args) files = glob.glob("left-*.png"); files.sort() print("All {} files".format(len(files))) image_pub = rospy.Publisher("image",Image, queue_size=10) bridge = CvBridge(); for f in files: if rospy.is_shutdown(): break raw_input("Publish {}?".format(f)) img = cv2.imread(f, 0) image_pub.publish(bridge.cv2_to_imgmsg(img, "mono8"))
def __init__(self): """ 'Wobbles' both arms by commanding joint velocities sinusoidally. """ self._pub_rate = rospy.Publisher('robot/joint_state_publish_rate', UInt16, queue_size=10) self._right_arm = intera_interface.limb.Limb("right") self._right_joint_names = self._right_arm.joint_names() # control parameters self._rate = 500.0 # Hz print("Getting robot state... ") self._rs = intera_interface.RobotEnable(CHECK_VERSION) self._init_state = self._rs.state().enabled print("Enabling robot... ") self._rs.enable() # set joint state publishing to 500Hz self._pub_rate.publish(self._rate)
def __init__(self): """ HomeJoints - Class that publishes on /robot/set_homing_mode to home the robot if it is not already homed. """ self._hcb_lock = threading.Lock() self._ecb_lock = threading.Lock() self._homing_state = dict() self._enable_state = False self._pub_home_joints = rospy.Publisher( '/robot/set_homing_mode', HomingCommand, latch=True, queue_size=10) self._enable_sub = rospy.Subscriber( '/robot/state', AssemblyState, self._enable_state_cb) self._homing_sub = rospy.Subscriber( '/robot/homing_states', HomingState, self._homing_state_cb)
def __init__(self): """ Constructor. """ self._state = dict() self._pub_pan = rospy.Publisher( '/robot/head/command_head_pan', HeadPanCommand, queue_size=10) state_topic = '/robot/head/head_state' self._sub_state = rospy.Subscriber( state_topic, HeadState, self._on_head_state) self._tf_listener = tf.TransformListener() intera_dataflow.wait_for( lambda: len(self._state) != 0, timeout=5.0, timeout_msg=("Failed to get current head state from %s" % (state_topic,)), )
def start(): global left_motor_pub,right_motor_pub # publishing to "turtle1/cmd_vel" to control turtle1 global pub pub = rospy.Publisher('turtle1/cmd_vel', Twist) left_motor_pub = rospy.Publisher('/left_motor', std_msgs.msg.Int32, queue_size=10) right_motor_pub = rospy.Publisher('/right_motor', std_msgs.msg.Int32, queue_size=10) # subscribed to joystick inputs on topic "joy" rospy.Subscriber("joy", Joy, callback) # starts the node rospy.init_node('Joy2Turtle') rospy.spin()
def listener(): global odom_pub,left_motor_pub,left_motor_speed_pub,right_motor_pub,right_motor_speed_pub rospy.Subscriber("/cmd_vel", Twist, callback) rospy.Subscriber('imu_data',Imu,imuCb) rospy.Subscriber('north',std_msgs.msg.Float32,northCb) odom_pub = rospy.Publisher('/odom', Odometry, queue_size=10) left_motor_pub = rospy.Publisher('/left_motor', std_msgs.msg.Int32, queue_size=10) left_motor_speed_pub = rospy.Publisher('/left_motor_speed', std_msgs.msg.Int32, queue_size=10) right_motor_pub = rospy.Publisher('/right_motor', std_msgs.msg.Int32, queue_size=10) right_motor_speed_pub = rospy.Publisher('/right_motor_speed', std_msgs.msg.Int32, queue_size=10) rospy.spin()
def listener(): global odom_pub,left_motor_pub,left_motor_speed_pub,right_motor_pub,right_motor_speed_pub rospy.Subscriber("/cmd_vel", Twist, callback) odom_pub = rospy.Publisher('/odom', Odometry, queue_size=10) left_motor_pub = rospy.Publisher('/left_motor', std_msgs.msg.Int32, queue_size=10) left_motor_speed_pub = rospy.Publisher('/left_motor_speed', std_msgs.msg.Int32, queue_size=10) right_motor_pub = rospy.Publisher('/right_motor', std_msgs.msg.Int32, queue_size=10) right_motor_speed_pub = rospy.Publisher('/right_motor_speed', std_msgs.msg.Int32, queue_size=10) rospy.spin()
def __init__(self, group_addr, port): bind_addr = '0.0.0.0' # Create a IPv4/UDP socket self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Avoid error 'Address already in use'. self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # Construct a membership_request membership_request = socket.inet_aton(group_addr) + socket.inet_aton(bind_addr) # Send add membership request to socket self.sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, membership_request) # Bind the socket to an interfaces self.sock.bind((bind_addr, port)) # Set non-blocking receiving mode self.sock.setblocking(False) self.publisher = rospy.Publisher('/raw_vision', std_msgs.msg.String, queue_size=10)
def publisher(self, *upper_args, **kwargs): if not "queue_size" in kwargs: kwargs["queue_size"] = 1 if isinstance(upper_args[0], str): topic_name, msg_type = upper_args def __decorator(func): args = [topic_name, msg_type] pub = rospy.Publisher(*args, **kwargs) def __inner(*args, **kwargs): msg = func(*args, **kwargs) pub.publish(msg) return msg return __inner return __decorator elif isinstance(upper_args[0], types.TypeType): return self.__multi_publisher(upper_args[0], **kwargs)
def robot_listener(self): ''' rospy.wait_for_service('spawn') spawner = rospy.ServiceProxy('spawn', turtlesim.srv.Spawn) spawner(4, 2, 0, 'turtle2') ''' robot_vel_pub = rospy.Publisher('robot_0/cmd_vel', geometry_msgs.msg.Twist, queue_size=1) rate = rospy.Rate(10.0) while not rospy.is_shutdown(): try: (trans,rot) = self.listener.lookupTransform('/robot_3', '/robot_0', rospy.Time(0)) except (tf.LookupException, tf.ConnectivityException, tf.ExtrapolationException): continue angular = 4 * math.atan2(trans[1], trans[0]) linear = 0.5 * math.sqrt(trans[0] ** 2 + trans[1] ** 2) cmd = geometry_msgs.msg.Twist() cmd.linear.x = linear cmd.angular.z = angular robot_vel_pub.publish(cmd) rate.sleep()
def __init__(self): rospy.init_node('gaze', anonymous=False) self.lock = threading.RLock() with self.lock: self.current_state = GazeState.IDLE self.last_state = self.current_state # Initialize Variables self.glance_timeout = 0 self.glance_timecount = 0 self.glance_played = False self.idle_timeout = 0 self.idle_timecount = 0 self.idle_played = False rospy.loginfo('\033[92m[%s]\033[0m waiting for bringup social_mind...'%rospy.get_name()) rospy.wait_for_service('environmental_memory/read_data') rospy.wait_for_service('social_events_memory/read_data') self.rd_memory = {} self.rd_memory['environmental_memory'] = rospy.ServiceProxy('environmental_memory/read_data', ReadData) self.rd_memory['social_events_memory'] = rospy.ServiceProxy('social_events_memory/read_data', ReadData) rospy.Subscriber('raising_events', RaisingEvents, self.handle_raising_events) rospy.Subscriber('gaze_focusing', String, self.handle_gaze_focusing) self.pub_gaze_cmd = rospy.Publisher('gaze_command', GazeCommand, queue_size=10) self.pub_viz_gaze_cmd = rospy.Publisher('visualization_gaze_cmd', PointStamped, queue_size=10) rospy.Timer(rospy.Duration(GAZE_CONTROLLER_PERIOD), self.handle_gaze_controller) rospy.loginfo('\033[92m[%s]\033[0m initialized...'%rospy.get_name()) rospy.spin()
def __init__(self): self.map = None self.start = None self.goal = None self.moves = [Move(0.1, 0), # forward Move(-0.1, 0), # back Move(0, 1.5708), # turn left 90 Move(0, -1.5708)] # turn right 90 self.robot = Robot(0.5, 0.5) self.is_working = False # Replace with mutex after all self.map_subscriber = rospy.Subscriber("map", OccupancyGrid, self.new_map_callback) self.start_subscriber = rospy.Subscriber("initialpose", PoseWithCovarianceStamped, self.new_start_callback) self.goal_subscriber = rospy.Subscriber("goal", PoseStamped, self.new_goal_callback) self.path_publisher = rospy.Publisher("trajectory", MarkerArray, queue_size=1) self.pose_publisher = rospy.Publisher("debug_pose", PoseStamped, queue_size=1) # what will be there. A module goes into variable. Isn't it too much memory consumption. Maybe I should assign function replan() to this variable? self.planner = planners.astar.replan
def __init__(self, image_path=None): height = int(rospy.get_param("~grid_height", 800)) width = int(rospy.get_param("~grid_width", 800)) resolution = rospy.get_param("~grid_resolution", .25) ogrid_topic = rospy.get_param("/lqrrt_node/ogrid_topic", "/ogrid") self.grid_drawer = DrawGrid(height, width, image_path) self.ogrid_pub = rospy.Publisher(ogrid_topic, OccupancyGrid, queue_size=1) m = MapMetaData() m.resolution = resolution m.width = width m.height = height pos = np.array([-width * resolution / 2, -height * resolution / 2, 0]) quat = np.array([0, 0, 0, 1]) m.origin = Pose() m.origin.position.x, m.origin.position.y = pos[:2] self.map_meta_data = m rospy.Timer(rospy.Duration(1), self.pub_grid)
def connect_topics( src_topic, dest_topic, src_topic_type, dest_topic_type, multiplier=1, deadband=0 ): rospy.loginfo("Connecting topic {} to topic {}".format( src_topic, dest_topic )) pub = rospy.Publisher(dest_topic, dest_topic_type, queue_size=10) def callback(src_item): val = src_item.data val *= multiplier if dest_topic_type == Bool: val = (val > deadband) dest_item = dest_topic_type(val) pub.publish(dest_item) sub = rospy.Subscriber(src_topic, src_topic_type, callback) return sub, pub
def test_valid_variables(self): ENVIRONMENTAL_VARIABLES = frozenset( VariableInfo.from_dict(d) for d in rospy.get_param("/var_types/environment_variables").itervalues()) RECIPE_VARIABLES = frozenset( VariableInfo.from_dict(d) for d in rospy.get_param("/var_types/recipe_variables").itervalues()) VALID_VARIABLES = ENVIRONMENTAL_VARIABLES.union(RECIPE_VARIABLES) print(VALID_VARIABLES) assert len(VALID_VARIABLES) == 19 # This builds a dictionary of publisher instances using a # "dictionary comprehension" (syntactic sugar for building dictionaries). # The constant has to be declared here because get_message_class # needs to be called after the node is initialized. PUBLISHERS = { variable.name: rospy.Publisher( "{}/desired".format(variable.name), get_message_class(variable.type), queue_size=10) for variable in VALID_VARIABLES }
def contr(keynumber): # turtlesim??topic pub = rospy.Publisher('~cmd_vel', Twist, queue_size=5) countnum = 0 if keynumber == 3: while(1): twist = Twist() twist.linear.x = 0.2 twist.linear.y = 0 twist.linear.z = 0 twist.angular.x = 0 twist.angular.y = 0 twist.angular.z = 0.14 pub.publish(twist) countnum += 1 if countnum > 100000: countnum = 0 exit(0)
def __init__(self): NaoqiNode.__init__(self, 'nao_octomap') if self.get_version() < LooseVersion('2.0'): rospy.loginfo('NAOqi version < 2.0, Octomap is not used') exit(0) proxy = self.get_proxy("ALNavigation") if proxy is None: rospy.loginfo('Could not get access to the ALNavigation proxy') exit(1) # Create ROS publisher self.pub = rospy.Publisher("octomap", Octomap, latch = True, queue_size=1) self.fps = 1 rospy.loginfo("nao_octomap initialized")
def __init__(self, node_name='naoqi_camera'): NaoqiNode.__init__(self, node_name) self.camProxy = self.get_proxy("ALVideoDevice") if self.camProxy is None: exit(1) self.nameId = None self.camera_infos = {} def returnNone(): return None self.config = defaultdict(returnNone) # ROS publishers self.pub_img_ = rospy.Publisher('~image_raw', Image, queue_size=5) self.pub_info_ = rospy.Publisher('~camera_info', CameraInfo, queue_size=5) # initialize the parameter server self.srv = Server(NaoqiCameraConfig, self.reconfigure) # initial load from param server self.init_config() # initially load configurations self.reconfigure(self.config, 0)
def __init__(self, memory_service, memory_key, latch=False, prefix='/qi/'): self.memory_service = memory_service try: self._pub = rospy.Publisher(prefix + memory_key, String, latch=latch, queue_size=1) self._sub = self.memory_service.subscriber(memory_key) self._sub.signal.connect(self._on_event) if latch: hist = self.memory_service.getEventHistory(memory_key) if len(hist) > 0: try: self._on_event(hist[-1][0]) except Exception: pass rospy.loginfo('subscribed to %s on Qi' % memory_key) except Exception as e: rospy.logwarn("Cannot set up for %s: %s" % (memory_key, str(e)))
def __init__(self): """Constructor for the class initialize topic subscription and instance variables """ self.r = rospy.Rate(5) self.publisher = rospy.Publisher('/cmd_vel', Twist, queue_size=10) rospy.Subscriber('/scan', LaserScan, self.process_scan) # user chooses which parking mode to be performed self.is_parallel = rospy.get_param('~parallel', False) # Instance Variables self.timestamp1 = None self.timestamp2 = None self.dist2Neato = None self.dist2Wall = None self.widthOfSpot = None self.twist = None self.radius = None # Adjusment to be made before moving along the arc self.adjustment = 0 self.isAligned = False
def setUp(self): self.client = UIClient() self.node = UINode() self.node.start() self.msg = None ''' self.instruct_pub = rospy.Publisher( nb_channels.Messages.instruct.value, String, queue_size=10 ) ''' self.subscribe() rospy.sleep(0.1)
def __init__(self): rospy.loginfo('[robot_move_pkg]->linear_move is initial') #??????????????? self.robot_state = robot_state.robot_position_state() self.cmd_move_pub = rospy.Publisher('/cmd_move', g_msgs.Twist, queue_size = 100) self.rate = rospy.Rate(150) #??????????? self.stop_tolerance = config.high_speed_stop_tolerance self.angular_tolerance = config.high_turn_angular_stop_tolerance #????????????? self.accurate_turn_an_angular = turn_an_angular.turn_an_angular() self.x_speed = 0.0 self.y_speed = 0.0 self.w_speed = config.high_turn_angular_speed #??????? self.linear_sp = spline_func.growth_curve() self.amend_speed = 0.12
def __init__(self): #?????????? m/s self.move_cmd_pub = rospy.Publisher('cmd_move_robot',g_msgs.Twist,queue_size=100) self.move_speed = config.go_along_circle_speed self.get_position = robot_state.robot_position_state() #????????? rad self.stop_tolerance = config.go_along_circle_angular_tolerance #???????? rospy.on_shutdown(self.brake) # ??sleep ??? ??????????? self.rate = 100.0 self.R = rospy.Rate(int(self.rate)) self.MODE = { 1:(-1, 1), 2:( 1,-1), 3:( 1, 1), 4:(-1,-1)}
def __init__(self): rospy.loginfo('[robot_move_pkg]->linear_move is initial') #??????????????? self.robot_state = robot_state.robot_position_state() self.cmd_move_pub = rospy.Publisher('/cmd_move', g_msgs.Twist, queue_size = 100) self.rate = rospy.Rate(150) #??????????? self.stop_tolerance = config.high_speed_stop_tolerance self.angular_tolerance = config.high_turn_angular_stop_tolerance #????????????? self.accurate_turn_an_angular = turn_an_angular.turn_an_angular() self.x_speed = 0.0 self.y_speed = 0.0 self.w_speed = config.high_turn_angular_speed #??????? self.linear_sp = spline_func.growth_curve() self.amend_speed = 0.18
def __init__(self): #init code rospy.init_node("robotGame") self.currentDist = 1 self.previousDist = 1 self.reached = False self.tf = TransformListener() self.left_joint_names = ['yumi_joint_1_l', 'yumi_joint_2_l', 'yumi_joint_3_l', 'yumi_joint_4_l', 'yumi_joint_5_l', 'yumi_joint_6_l', 'yumi_joint_7_l', 'gripper_l_joint'] self.right_joint_names = ['yumi_joint_1_r', 'yumi_joint_2_r', 'yumi_joint_3_r', 'yumi_joint_4_r', 'yumi_joint_5_r', 'yumi_joint_6_r', 'yumi_joint_7_r', 'gripper_r_joint'] self.left_positions = [-2.01081820427463881, 1.4283937236421274, -1.3593418228836045, -0.19315625641494183, 1.7016501799872579, 0.6573540231496411, 3.404315594906305, 0.0] self.right_positions = [0.01081820427463881, 2.4283937236421274, 0.3593418228836045, -0.19315625641494183, 1.7016501799872579, 0.6573540231496411, 3.404315594906305, 1.8145107750466565] self.rjv = [] self.ljv = [] self.pub = rospy.Publisher('my_joint_states', JointState, queue_size=1) self.js = JointState() self.js.header = Header() self.js.name = self.left_joint_names + self.right_joint_names self.js.velocity = [] self.js.effort = [] self.sub = rospy.Subscriber('/joint_states', JointState, self.jsCB) self.destPos = np.random.uniform(0,0.25, size =(3)) self.reset()
def register_extractor(self, ExtractorClass): """ :type ExtractorClass: AbstractExtractor """ name = ExtractorClass.__name__ extractor = self.extractors.pop(name, None) if extractor: extractor.finish() publisher = rospy.Publisher("/aide/rdf", RdfGraphStamped, queue_size=ExtractorClass.queue_size) extractor = ExtractorClass(publisher=publisher) self.extractors[name] = extractor loginfo(self.extractors)
def updatePoseTopic(self, next_index, wait=True): planning_group = self.planning_groups_keys[self.current_planning_group_index] topics = self.planning_groups[planning_group] if next_index >= len(topics): self.current_eef_index = 0 elif next_index < 0: self.current_eef_index = len(topics) - 1 else: self.current_eef_index = next_index next_topic = topics[self.current_eef_index] rospy.loginfo("Changed controlled end effector to " + self.planning_groups_tips[planning_group][self.current_eef_index]) self.pose_pub = rospy.Publisher(next_topic, PoseStamped, queue_size=5) if wait: self.waitForInitialPose(next_topic) self.current_pose_topic = next_topic
def pub_sub_local_legacy(self): self.pub["motor"] = rospy.Publisher("/motor", Float32MultiArray) # learning signals # FIXME: change these names to /learner/... self.pub["learn_dw"] = rospy.Publisher("/learner/dw", Float32MultiArray) self.pub["learn_w"] = rospy.Publisher("/learner/w", Float32MultiArray) self.pub["learn_perf"] = rospy.Publisher("/learner/perf", reservoir) self.pub["learn_perf_lp"] = rospy.Publisher("/learner/perf_lp", reservoir) # learning control self.sub["ctrl_eta"] = rospy.Subscriber("/learner/ctrl/eta", Float32, self.sub_cb_ctrl) self.sub["ctrl_theta"] = rospy.Subscriber("/learner/ctrl/theta", Float32, self.sub_cb_ctrl) self.sub["ctrl_target"] = rospy.Subscriber("/learner/ctrl/target", Float32, self.sub_cb_ctrl) # state self.pub["learn_x_raw"] = rospy.Publisher("/learner/x_raw", reservoir) self.pub["learn_x"] = rospy.Publisher("/learner/x", reservoir) self.pub["learn_r"] = rospy.Publisher("/learner/r", reservoir) self.pub["learn_y"] = rospy.Publisher("/learner/y", reservoir)
def execute(): # define publisher and its topic pub = rospy.Publisher('write_angles',Angles,queue_size = 10) rospy.init_node('write_angles_node',anonymous = True) rate = rospy.Rate(10) # write 4 angles if len(sys.argv) == 5: s1 = int(sys.argv[1]) s2 = int(sys.argv[2]) s3 = int(sys.argv[3]) s4 = int(sys.argv[4]) pub.publish(s1,s2,s3,s4) else: raiseError() rate.sleep() # main function