我们从Python开源项目中,提取了以下23个代码示例,用于说明如何使用gym.spaces.MultiDiscrete()。
def __init__(self, maxUmbralAstral):
    """Build the ability table, the initial state and the gym spaces.

    Args:
        maxUmbralAstral: Outer bound for Astral Fire and Umbral Ice. It is
            stored on the ``BLM`` class itself (``BLM.MAXUMBRALASTRAL``),
            not on the instance.
    """
    # Debug printing is disabled by default.
    self.debug = False

    # Class-level assignment: the bound is visible to every BLM user.
    BLM.MAXUMBRALASTRAL = maxUmbralAstral

    # Available buffs (none) and the maximum time available.
    self.BUFFS = []
    self.MAXTIME = 45

    helper = BLM.Helper()
    self.HELPER = helper

    # Short aliases keep the ability table readable.
    fire = BLM.DamageType.Fire
    ice = BLM.DamageType.Ice
    neither = BLM.DamageType.Neither

    # Available abilities; the position in this list is the action index.
    self.ABILITIES = [
        BLM.Ability("Blizzard 1", 180, 6, 2.5, 2.49, helper.UmbralIceIncrease, ice, helper),
        BLM.Ability("Fire 1", 180, 15, 2.5, 2.49, helper.AstralFireIncrease, fire, helper),
        BLM.Ability("Transpose", 0, 0, 0.75, 12.9, helper.SwapAstralUmbral, neither, helper),
        BLM.Ability("Fire 3", 240, 30, 3.5, 2.5, helper.AstralFireMax, fire, helper),
        BLM.Ability("Blizzard 3", 240, 18, 3.5, 2.5, helper.UmbralIceMax, ice, helper),
        BLM.Ability("Fire 4", 260, 15, 2.8, 2.5, None, fire, helper),
    ]

    # One state slot per ability cooldown and per buff timer, followed by
    # the mana slot and the Astral/Umbral slot.
    timer_slots = len(self.ABILITIES) + len(self.BUFFS)
    self.initialState = np.array([0] * timer_slots + [BLM.MAXMANA] + [0])
    self.state = self._reset()

    # What the learner can pick between: one discrete action per ability.
    self.action_space = spaces.Discrete(len(self.ABILITIES))

    # What the learner can see to make a choice: a [min, max] pair per slot.
    observation_bounds = [[0, 180]] * timer_slots
    observation_bounds = observation_bounds + [[0, BLM.MAXMANA]] + [[-3, 3]]
    self.observation_space = spaces.MultiDiscrete(observation_bounds)
def is_compound(space):
    """
    Tell whether `space` is a compound space.

    `Discrete` spaces are never compound. A `Box` is compound unless its
    shape is exactly `(1,)`. `MultiDiscrete`, `MultiBinary` and `Tuple`
    spaces are always compound (a `Tuple` holding a single, non-compound
    subspace still counts as compound).

    :param gym.Space space: The space to inspect.
    :return bool: `True` if the space is compound.
    :raises TypeError: If the space is no `gym.Space`.
    :raises NotImplementedError: If the space type is not recognised.
    """
    assert_space(space)

    if isinstance(space, spaces.Discrete):
        return False

    if isinstance(space, spaces.Box):
        shape = space.shape
        is_scalar_box = len(shape) == 1 and shape[0] == 1
        return not is_scalar_box

    always_compound = (spaces.MultiDiscrete, spaces.MultiBinary, spaces.Tuple)
    if isinstance(space, always_compound):
        return True

    raise NotImplementedError("Unknown space {} of type {} supplied".format(space, type(space)))
def num_discrete_actions(space):
    """
    Return the number of available actions of a discrete space as a tuple.

    :param gym.Space space: The discrete space to inspect.
    :return tuple: One entry per discrete sub-action, each holding the
                   number of choices for that sub-action.
    :raises TypeError: If the space is no `gym.Space` or is not discrete.
    :raises NotImplementedError: If the space is discrete but of a type
                                 not handled here.
    """
    assert_space(space)

    if not is_discrete(space):
        raise TypeError("Space {} is not discrete".format(space))

    if isinstance(space, spaces.Discrete):
        return (space.n,)

    if isinstance(space, spaces.MultiDiscrete):
        # `high` is an inclusive bound, hence the +1.
        choices_per_slot = space.high - space.low + 1
        return tuple(choices_per_slot)

    if isinstance(space, spaces.MultiBinary):
        # Every binary slot offers exactly two choices.
        return (2,) * space.n

    raise NotImplementedError("Unknown space {} of type {} supplied".format(space, type(space)))  # pragma: no cover
def __init__(self, env):
    """Wrap a Gym environment and set up the space/sample converters.

    Args:
        env: Either a Gym environment instance or a string id, in which
            case the environment is created with ``gym.make``.

    Raises:
        ValueError: If the environment's action space is ``MultiDiscrete``.
    """
    # Accept an environment id as a convenience.
    if isinstance(env, six.string_types):
        env = gym.make(env)
    super(GymWrapper, self).__init__(env)

    if isinstance(env.action_space, spaces.MultiDiscrete):
        raise ValueError("Gym environments with MultiDiscrete spaces aren't supported yet.")

    # Converted spaces exposed by the wrapper.
    self.observation_space = _to_rf_space(self.env.observation_space)
    self.action_space = _to_rf_space(self.env.action_space)

    # Sample converters, built from the converted spaces above.
    self._obs_to_rf = _make_gym2rf_converter(self.observation_space)
    self._action_to_rf = _make_gym2rf_converter(self.action_space)
    self._action_to_gym = _make_rf2gym_converter(self.action_space)

    # Forward the global seed to the environment when there is one.
    # NOTE(review): a seed of 0 is falsy and is therefore not forwarded;
    # confirm whether that is intended.
    seed = reinforceflow.get_random_seed()
    if not seed:
        return
    if hasattr(self.env, 'seed'):
        self.env.seed(seed)
def _make_rf2gym_converter(space):
    """Build a function that maps space samples ReinforceFlow -> Gym.

    Discrete samples are reduced with ``np.argmax``, MultiBinary samples
    are reduced element-wise, Box samples are passed through unchanged and
    Tuple samples are converted member by member.

    Raises:
        ValueError: If the space type is not supported.
    """
    # TODO: add spaces.MultiDiscrete support.
    if isinstance(space, spaces.Discrete):
        def discrete_to_gym(sample):
            return np.argmax(sample)
        return discrete_to_gym

    if isinstance(space, spaces.MultiBinary):
        def multibinary_to_gym(sample):
            return tuple(np.argmax(entry) for entry in sample)
        return multibinary_to_gym

    if isinstance(space, spaces.Box):
        def box_to_gym(sample):
            return sample
        return box_to_gym

    if isinstance(space, spaces.Tuple):
        # One converter per member space, applied positionally.
        member_converters = [_make_rf2gym_converter(member) for member in space.spaces]

        def tuple_to_gym(sample):
            return tuple(convert(part) for part, convert in zip(sample, member_converters))
        return tuple_to_gym

    raise ValueError("Unsupported space %s." % space)
def _take_action(self, actions):
    """Translate actions into command strings and send them to the agent host.

    Each action space in ``self.action_spaces`` is paired with its command
    names in ``self.action_names`` and with the matching entry of
    ``actions``. Every command is logged at debug level before it is sent.
    """
    # A single action space is not wrapped in a Tuple, so wrap its action.
    if len(self.action_spaces) == 1:
        actions = [actions]

    for spc, cmds, acts in zip(self.action_spaces, self.action_names, actions):
        if isinstance(spc, spaces.Discrete):
            # The action is an index into the list of complete commands.
            logger.debug(cmds[acts])
            self.agent_host.sendCommand(cmds[acts])
        elif isinstance(spc, (spaces.Box, spaces.MultiDiscrete)):
            # One value per command: send "<command> <value>" for each pair.
            for cmd, val in zip(cmds, acts):
                command = cmd + " " + str(val)
                logger.debug(command)
                self.agent_host.sendCommand(command)
        else:
            logger.warn("Unknown action space for %s, ignoring." % cmds)
def __init__(self, name, horizon, gamma):
    """
    Constructor.

    Args:
        name (str): gym id of the environment;
        horizon (int): the horizon of the MDP;
        gamma (float): the discount factor.

    """
    self.__name__ = name

    # MDP creation
    self.env = gym.make(self.__name__)
    # Hack to ignore gym time limit.
    self.env._max_episode_steps = np.inf

    # MultiDiscrete spaces are rejected up front.
    assert not isinstance(self.env.observation_space, gym_spaces.MultiDiscrete)
    assert not isinstance(self.env.action_space, gym_spaces.MultiDiscrete)

    # MDP properties
    action_space = self._convert_gym_space(self.env.action_space)
    observation_space = self._convert_gym_space(self.env.observation_space)
    mdp_info = MDPInfo(observation_space, action_space, gamma, horizon)

    # Actions are converted only for Discrete action spaces.
    self._convert_action = (
        self._convert_action_function
        if isinstance(action_space, Discrete)
        else self._no_convert
    )

    # States are converted only for Discrete observation spaces whose
    # `size` has more than one entry.
    needs_state_conversion = (
        isinstance(observation_space, Discrete)
        and len(observation_space.size) > 1
    )
    self._convert_state = (
        self._convert_state_function
        if needs_state_conversion
        else self._no_convert
    )

    super(Gym, self).__init__(mdp_info)
def test_flattened_wrapper():
    """Check FlattenedObservationWrapper on a two-slot MultiDiscrete space.

    The wrapped environment is told to provide the observation (1, 1); the
    wrapper is expected to report it as 3, which must be contained in the
    wrapper's own observation space.
    """
    env = gym.make("ProvideTest-v0")
    env.observation_space = spaces.MultiDiscrete([(0, 1), (0, 1)])
    env.provide_observation = (1, 1)

    wrapper = FlattenedObservationWrapper(env)
    observation, _reward, _done, _info = wrapper.step(3)

    assert wrapper.observation_space.contains(observation)
    assert observation == 3
def __init__(self):
    """Initialise the environment's spaces, launch settings, pipes and state.

    Only attributes are set up here, apart from the closing calls to
    ``self._reset_info_vars()`` and ``self._seed()``.
    """
    utils.EzPickle.__init__(self)

    # Screen geometry and gym spaces.
    self.rom_path = ''
    self.screen_height = 224
    self.screen_width = 256
    frame_shape = (self.screen_height, self.screen_width, 3)
    self.action_space = spaces.MultiDiscrete([[0, 1]] * NUM_ACTIONS)
    self.observation_space = spaces.Box(low=0, high=255, shape=frame_shape)

    # Launch configuration.
    self.launch_vars = {}
    self.cmd_args = ['--xscale 2', '--yscale 2', '-f 0']
    self.lua_path = []
    self.subprocess = None
    self.no_render = True
    self.viewer = None

    # Pipes
    self.pipe_name = ''
    self.path_pipe_prefix = os.path.join(tempfile.gettempdir(), 'smb-fifo')
    # Input pipe (maps to fceux out-pipe and to 'in' file)
    self.path_pipe_in = ''
    # Output pipe (maps to fceux in-pipe and to 'out' file)
    self.path_pipe_out = ''
    self.pipe_out = None
    self.lock_out = Lock()
    self.disable_in_pipe = False
    self.disable_out_pipe = False
    self.launch_vars['pipe_name'] = ''
    self.launch_vars['pipe_prefix'] = self.path_pipe_prefix

    # Other vars
    # Used to indicate fceux has been launched and is running
    self.is_initialized = 0
    # Used to stop the listening thread
    self.is_exiting = 0
    # Last processed frame
    self.last_frame = 0
    # Reward for last action
    self.reward = 0
    # Total rewards for episode
    self.episode_reward = 0
    self.is_finished = False
    self.screen = np.zeros(shape=frame_shape, dtype=np.uint8)
    self.info = {}
    self.level = 0
    self._reset_info_vars()
    self.first_step = False
    self.lock = NesLock().get_lock()

    # Seeding
    self.curr_seed = 0
    self._seed()
def __init__(self, level):
    """Set up game objects, spaces and rendering settings for a level.

    Args:
        level: Level identifier, stored as ``self.level``.
    """
    self.previous_level = -1
    self.level = level

    self.game = DoomGame()
    self.loader = Loader()
    self.doom_dir = os.path.dirname(os.path.abspath(__file__))

    # 'algo' or 'human'
    self._mode = 'algo'
    # To disable double rendering in human mode
    self.no_render = False
    self.viewer = None
    # Indicates that reset() has been called
    self.is_initialized = False
    self.curr_seed = 0
    self.lock = DoomLock().get_lock()

    # Action space: 38 slots in [0, 1], 2 slots in [-10, 10] and
    # 3 slots in [-100, 100], in that order.
    unit_slots = [[0, 1]] * 38
    narrow_slots = [[-10, 10]] * 2
    wide_slots = [[-100, 100]] * 3
    self.action_space = spaces.MultiDiscrete(unit_slots + narrow_slots + wide_slots)
    self.allowed_actions = list(range(NUM_ACTIONS))

    # Screen geometry and the matching image observation space.
    self.screen_height = 480
    self.screen_width = 640
    self.screen_resolution = ScreenResolution.RES_640X480
    frame_shape = (self.screen_height, self.screen_width, 3)
    self.observation_space = spaces.Box(low=0, high=255, shape=frame_shape)

    self._seed()
    self._configure()
def make_pdtype(ac_space):
    """Pick the probability-distribution type matching an action space.

    Box -> DiagGaussianPdType (the Box must be one-dimensional),
    Discrete -> CategoricalPdType, MultiDiscrete -> MultiCategoricalPdType,
    MultiBinary -> BernoulliPdType.

    Raises:
        NotImplementedError: For any other kind of space.
    """
    from gym import spaces

    if isinstance(ac_space, spaces.Box):
        # Only flat (rank-1) Box spaces are handled.
        assert len(ac_space.shape) == 1
        return DiagGaussianPdType(ac_space.shape[0])

    if isinstance(ac_space, spaces.Discrete):
        return CategoricalPdType(ac_space.n)

    if isinstance(ac_space, spaces.MultiDiscrete):
        return MultiCategoricalPdType(ac_space.low, ac_space.high)

    if isinstance(ac_space, spaces.MultiBinary):
        return BernoulliPdType(ac_space.n)

    raise NotImplementedError
def __init__(self, env_data_filename, mode="single", max_utterance_len=20, max_game_turns=20):
    """Load the environment data and define the action/observation spaces.

    Args:
        env_data_filename: Path to a pickle file. The loaded object must
            provide the keys ``"id2token"`` and ``"env_data"``.
        mode: Stored as ``self._mode``.
        max_utterance_len: Number of token slots in an observation.
        max_game_turns: Stored as ``self._max_game_turns``.
    """
    # SECURITY NOTE: unpickling can execute arbitrary code. Only load
    # environment data files that come from a trusted source.
    with open(env_data_filename, mode="rb") as in_file:
        self._env_data = pickle.load(in_file)

    self._mode = mode
    self._max_game_turns = max_game_turns

    self._num_tokens = len(self._env_data["id2token"])
    self._num_entities = len(self._env_data["env_data"])
    self._entities = list(self._env_data["env_data"].keys())

    # One discrete action per token.
    self.action_space = spaces.Discrete(self._num_tokens)

    # One [min, max] pair per utterance position. The pair list has to be
    # repeated as a whole; repeating the inner list instead would yield a
    # single row of 2 * max_utterance_len numbers rather than
    # max_utterance_len separate pairs.
    # NOTE(review): the upper bound is inclusive, so each slot admits
    # num_tokens + 1 values; confirm the extra value is intended.
    self.observation_space = spaces.MultiDiscrete(
        [[0, self._num_tokens]] * max_utterance_len)

    self._last_entity = 0
    self._last_question = 0
    self._last_sequence = []
    self._game_turns = None
    self._game_score = 0
def _to_rf_space(space):
    """Convert a Gym space instance into its ReinforceFlow counterpart.

    Discrete -> DiscreteOneHot, MultiBinary -> Tuple of two-way
    DiscreteOneHot spaces (one per binary slot), Box -> Continious,
    Tuple -> Tuple of recursively converted member spaces.

    Raises:
        ValueError: If the space is MultiDiscrete (not supported yet) or
            of an unknown type.
    """
    if isinstance(space, spaces.Discrete):
        return DiscreteOneHot(space.n)

    if isinstance(space, spaces.MultiDiscrete):
        # A one-hot encoding sized by `space.high` would leave the first
        # actions unused whenever space.low > 0, so this stays unsupported.
        raise ValueError("MultiDiscrete spaces aren't supported yet.")

    if isinstance(space, spaces.MultiBinary):
        # `space.n` is passed as a count elsewhere in this file, so it is
        # iterated with range(); iterating the int itself raises TypeError.
        return Tuple([DiscreteOneHot(2) for _ in range(space.n)])

    if isinstance(space, spaces.Box):
        return Continious(space.low, space.high)

    if isinstance(space, spaces.Tuple):
        # NOTE(review): Tuple is called with unpacked members here but with
        # a single list in the MultiBinary branch; confirm which calling
        # convention the ReinforceFlow Tuple expects.
        converted_spaces = [_to_rf_space(sub_space) for sub_space in space.spaces]
        return Tuple(*converted_spaces)

    raise ValueError("Unsupported space %s." % space)
def _make_gym2rf_converter(space):
    """Build a function that maps space samples Gym -> ReinforceFlow.

    Discrete samples are one-hot encoded over ``space.n`` classes,
    MultiBinary samples are one-hot encoded element-wise over two classes,
    Box samples are passed through unchanged and Tuple samples are
    converted member by member.

    Raises:
        ValueError: If the space type is not supported.
    """
    # TODO: add spaces.MultiDiscrete support.
    if isinstance(space, spaces.Discrete):
        def discrete_to_rf(sample):
            return one_hot(space.n, sample)
        return discrete_to_rf

    if isinstance(space, spaces.MultiBinary):
        def multibinary_to_rf(sample):
            return tuple(one_hot(2, entry) for entry in sample)
        return multibinary_to_rf

    if isinstance(space, spaces.Box):
        def box_to_rf(sample):
            return sample
        return box_to_rf

    if isinstance(space, spaces.Tuple):
        # One converter per member space, applied positionally.
        member_converters = [_make_gym2rf_converter(member) for member in space.spaces]

        def tuple_to_rf(sample):
            return tuple(convert(part) for part, convert in zip(sample, member_converters))
        return tuple_to_rf

    raise ValueError("Unsupported space %s." % space)
def test_flattened_wrapper():
    """Check FlattenedActionWrapper on a two-slot MultiDiscrete space.

    The wrapped environment is configured with the expectation (1, 1) and
    the wrapper is stepped with the flat action 3. Any checking of that
    expectation is left to the wrapped environment.
    """
    env = gym.make("ExpectTest-v0")
    env.action_space = spaces.MultiDiscrete([(0, 1), (0, 1)])
    env.expectation = (1, 1)

    flat_env = FlattenedActionWrapper(env)
    flat_env.step(3)
def is_discrete(space):
    """
    Tell whether `space` is discrete.

    `Discrete`, `MultiDiscrete` and `MultiBinary` spaces are discrete, a
    `Box` is not, and a `Tuple` is discrete only if every one of its
    subspaces is discrete.

    :param gym.Space space: The space to inspect.
    :return bool: `True` if the space is discrete.
    :raises TypeError: If the space is no `gym.Space`.
    :raises NotImplementedError: If the space type is not recognised.
    """
    assert_space(space)

    discrete_types = (spaces.Discrete, spaces.MultiDiscrete, spaces.MultiBinary)
    if isinstance(space, discrete_types):
        return True

    if isinstance(space, spaces.Box):
        return False

    if isinstance(space, spaces.Tuple):
        return all(is_discrete(member) for member in space.spaces)

    raise NotImplementedError("Unknown space {} of type {} supplied".format(space, type(space)))
def __init__(self, multi_discrete, options=None):
    """Build a discrete-to-multi-discrete action mapping.

    Args:
        multi_discrete: The underlying ``MultiDiscrete`` space.
        options: Selects how the mapping is built.
            ``None`` - action 0 is a NOOP (all zeros) and action ``i + 1``
            sets slot ``i`` to that slot's ``high`` value.
            ``list`` - action 0 is a NOOP and action ``i + 1`` sets slot
            ``options[i]`` to that slot's ``high`` value.
            ``dict`` - used as the mapping itself; keys must be
            0, 1, 2, ... in iteration order and every value must be
            contained in ``multi_discrete``.

    Raises:
        Error: If ``options`` is of another type, or a dict mapping has
            out-of-order keys or a value outside the underlying space.
    """
    assert isinstance(multi_discrete, MultiDiscrete)
    self.multi_discrete = multi_discrete
    self.num_discrete_space = self.multi_discrete.num_discrete_space
    slot_count = self.num_discrete_space

    if options is None or isinstance(options, list):
        # No options means "one action per slot, in order", which is the
        # list configuration applied to every slot index.
        chosen_slots = list(range(slot_count)) if options is None else options
        assert len(chosen_slots) <= slot_count
        self.n = len(chosen_slots) + 1  # +1 for NOOP at beginning
        self.mapping = {action: [0] * slot_count for action in range(self.n)}
        for offset, disc_num in enumerate(chosen_slots):
            assert disc_num < slot_count
            self.mapping[offset + 1][disc_num] = self.multi_discrete.high[disc_num]

    elif isinstance(options, dict):
        self.n = len(options.keys())
        self.mapping = options
        for position, key in enumerate(options.keys()):
            if position != key:
                raise Error(('DiscreteToMultiDiscrete must contain ordered keys. '
                             'Item {0} should have a key of "{0}", but key "{1}" found instead.').format(position, key))
            if not self.multi_discrete.contains(options[key]):
                raise Error(('DiscreteToMultiDiscrete mapping for key {0} is '
                             'not contained in the underlying MultiDiscrete action space. '
                             'Invalid mapping: {1}').format(key, options[key]))

    else:
        # Unknown parameter provided
        raise Error('DiscreteToMultiDiscrete - Invalid parameter provided.')
def __init__(self, multi_discrete, options=None):
    """Build a box view over selected slots of a MultiDiscrete space.

    Args:
        multi_discrete: The underlying ``MultiDiscrete`` space.
        options: List of slot indices to expose, in order. ``None``
            exposes every slot.

    Raises:
        Error: If ``options`` is neither ``None`` nor a list.
    """
    assert isinstance(multi_discrete, MultiDiscrete)
    self.multi_discrete = multi_discrete
    self.num_discrete_space = self.multi_discrete.num_discrete_space

    if options is None:
        # Default: expose all slots in their natural order.
        options = list(range(self.num_discrete_space))
    elif not isinstance(options, list):
        raise Error('BoxToMultiDiscrete - Invalid parameter provided.')

    assert len(options) <= self.num_discrete_space

    # Bounds of the exposed slots, in the order given by `options`.
    lows = [self.multi_discrete.low[slot] for slot in options]
    highs = [self.multi_discrete.high[slot] for slot in options]
    self.low = np.array(lows)
    self.high = np.array(highs)

    # Box index -> index of the slot in the underlying space.
    self.mapping = dict(enumerate(options))
def __init__(self, multi_discrete, options=None):
    """Build a discrete-to-multi-discrete action mapping.

    The parent initialiser is called with 0 first; ``self.n`` is then set
    according to ``options``.

    Args:
        multi_discrete: The underlying ``MultiDiscrete`` space.
        options: Selects how the mapping is built.
            ``None`` - action 0 is a NOOP (all zeros) and action ``i + 1``
            sets slot ``i`` to that slot's ``high`` value.
            ``list`` - action 0 is a NOOP and action ``i + 1`` sets slot
            ``options[i]`` to that slot's ``high`` value.
            ``dict`` - used as the mapping itself; keys must be
            0, 1, 2, ... in iteration order and every value must be
            contained in ``multi_discrete``.

    Raises:
        Error: If ``options`` is of another type, or a dict mapping has
            out-of-order keys or a value outside the underlying space.
    """
    super().__init__(0)
    assert isinstance(multi_discrete, MultiDiscrete)
    self.multi_discrete = multi_discrete
    self.num_discrete_space = self.multi_discrete.num_discrete_space
    slot_count = self.num_discrete_space

    if options is None or isinstance(options, list):
        # No options means "one action per slot, in order", which is the
        # list configuration applied to every slot index.
        chosen_slots = list(range(slot_count)) if options is None else options
        assert len(chosen_slots) <= slot_count
        self.n = len(chosen_slots) + 1  # +1 for NOOP at beginning
        self.mapping = {action: [0] * slot_count for action in range(self.n)}
        for offset, disc_num in enumerate(chosen_slots):
            assert disc_num < slot_count
            self.mapping[offset + 1][disc_num] = self.multi_discrete.high[disc_num]

    elif isinstance(options, dict):
        self.n = len(list(options.keys()))
        self.mapping = options
        for position, key in enumerate(options.keys()):
            if position != key:
                raise Error(('DiscreteToMultiDiscrete must contain ordered keys. '
                             'Item {0} should have a key of "{0}", but key "{1}" found instead.').format(position, key))
            if not self.multi_discrete.contains(options[key]):
                raise Error(('DiscreteToMultiDiscrete mapping for key {0} is '
                             'not contained in the underlying MultiDiscrete action space. '
                             'Invalid mapping: {1}').format(key, options[key]))

    else:
        # Unknown parameter provided
        raise Error('DiscreteToMultiDiscrete - Invalid parameter provided.')
def _set_action(self, action, agent, action_space, time=None):
    """Write `action` into `agent.action.u` and `agent.action.c`.

    The action is first turned into a list of parts. Parts are then
    consumed from the front: one by the physical action if the agent is
    movable, one by the communication action if the agent is not silent.
    All parts must have been consumed at the end.

    Args:
        action: The action chosen for `agent`. For a MultiDiscrete
            `action_space` it is sliced into one chunk per sub-space.
        agent: The agent whose `action.u` and `action.c` are set.
        action_space: The space `action` belongs to.
        time: Not used in this method.
    """
    # Start from a zero physical (u) and communication (c) action.
    agent.action.u = np.zeros(self.world.dim_p)
    agent.action.c = np.zeros(self.world.dim_c)
    # process action
    if isinstance(action_space, spaces.MultiDiscrete):
        act = []
        # number of entries per sub-space (high is inclusive, hence +1)
        size = action_space.high - action_space.low + 1
        index = 0
        # cut the flat action into consecutive chunks, one per sub-space
        for s in size:
            act.append(action[index:(index+s)])
            index += s
        action = act
    else:
        # a single part: wrap it so it can be consumed like a list
        action = [action]
    if agent.movable:
        # physical action
        if self.discrete_action_input:
            agent.action.u = np.zeros(self.world.dim_p)
            # process discrete action: 1/2 set u[0] to -1/+1, 3/4 set u[1]
            # to -1/+1; any other value leaves u at zero
            if action[0] == 1: agent.action.u[0] = -1.0
            if action[0] == 2: agent.action.u[0] = +1.0
            if action[0] == 3: agent.action.u[1] = -1.0
            if action[0] == 4: agent.action.u[1] = +1.0
        else:
            if self.force_discrete_action:
                # keep only the largest entry, replaced by 1.0 (in place)
                d = np.argmax(action[0])
                action[0][:] = 0.0
                action[0][d] = 1.0
            if self.discrete_action_space:
                # entries 1..4 are combined pairwise into the two u components
                agent.action.u[0] += action[0][1] - action[0][2]
                agent.action.u[1] += action[0][3] - action[0][4]
            else:
                agent.action.u = action[0]
        # scale the physical action; agent.accel overrides the default 5.0
        sensitivity = 5.0
        if agent.accel is not None:
            sensitivity = agent.accel
        agent.action.u *= sensitivity
        # the physical part has been consumed
        action = action[1:]
    if not agent.silent:
        # communication action
        if self.discrete_action_input:
            # the part is an index: one-hot encode it into c
            agent.action.c = np.zeros(self.world.dim_c)
            agent.action.c[action[0]] = 1.0
        else:
            agent.action.c = action[0]
        # the communication part has been consumed
        action = action[1:]
    # make sure we used all elements of action
    assert len(action) == 0