我们从Python开源项目中，提取了以下48个代码示例，用于说明如何使用gym.spaces.Tuple()。
def __init__(self, ball_speed=4, bat_speed=4, max_num_rounds=20):
    """Set up a double-player Pong environment.

    Both the observation space and the action space are two-element
    ``spaces.Tuple`` instances (presumably one entry per player -- confirm
    against the step logic, which is not visible here).

    :param ball_speed: Forwarded unchanged to ``PongGame``.
    :param bat_speed: Forwarded unchanged to ``PongGame``.
    :param max_num_rounds: Forwarded unchanged to ``PongGame``.
    """
    screen_size = (160, 210)  # (width, height)
    width, height = screen_size
    frame_shape = (height, width, 3)

    # Two separately constructed image boxes and two 3-way discrete actions.
    self.observation_space = spaces.Tuple(
        [spaces.Box(low=0, high=255, shape=frame_shape) for _ in range(2)])
    self.action_space = spaces.Tuple(
        [spaces.Discrete(3) for _ in range(2)])

    pygame.init()
    self._surface = pygame.Surface(screen_size)
    self._viewer = None
    self._game = PongGame(
        has_double_players=True,
        window_size=screen_size,
        ball_speed=ball_speed,
        bat_speed=bat_speed,
        max_num_rounds=max_num_rounds)
def __init__(self, natural=False):
    """Initialize the environment and start the first game.

    :param natural: Accepted but not used by this initializer.
    """
    # Each entry is (low, high, shape). Constants are stored as arrays of
    # length 1 because, per the original author, plain values caused errors.
    action_bounds = (
        (-5.0, 0.0, 1),       # learning rate
        (-7.0, -2.0, 1),      # decay
        (-5.0, 0.0, 1),       # momentum
        (2, 8, 1),            # batch size
        (-6.0, 1.0, 1),       # l1 reg
        (-6.0, 1.0, 1),       # l2 reg
        (0.0, 1.0, (5, 2)),   # convolutional layer parameters
        (0.0, 1.0, (2, 2)),   # fully connected layer parameters
    )
    self.action_space = spaces.Tuple(tuple(
        spaces.Box(low, high, shape) for low, high, shape in action_bounds))

    # Observation features, in order (per the original comment): num of
    # instances, num of labels, validation accuracy after training with the
    # given parameters.
    # NOTE(review): three features are listed but the box has size 2 -- confirm.
    self.observation_space = spaces.Box(-1e5, 1e5, 2)

    # Start the first game
    self._reset()
def __init__(self, natural=False):
    """Initialize the environment and start the first game.

    :param natural: Accepted but not used by this initializer.
    """
    # Each entry is (low, high, shape). Constants are stored as arrays of
    # length 1 because, per the original author, plain values caused errors.
    action_bounds = (
        (-5.0, 0.0, 1),    # learning rate
        (-7.0, -2.0, 1),   # decay
        (-5.0, 0.0, 1),    # momentum
        (2, 8, 1),         # batch size
        (-6.0, 1.0, 1),    # l1 reg
        (-6.0, 1.0, 1),    # l2 reg
    )
    self.action_space = spaces.Tuple(tuple(
        spaces.Box(low, high, shape) for low, high, shape in action_bounds))

    # Observation features, in order (per the original comment): num of
    # instances, num of labels, number of filters in part A / B of the
    # neural net, num of neurons in the output layer, validation accuracy
    # after training with the given parameters.
    self.observation_space = spaces.Box(-1e5, 1e5, 6)

    # Start the first game
    self._reset()
def is_compound(space):
    """
    Tell whether ``space`` is a compound space.

    Compound spaces are non-scalar `Box` spaces, `MultiDiscrete`,
    `MultiBinary` and `Tuple` spaces. A `Tuple` wrapping a single,
    non-compound subspace is still reported as compound.

    :param gym.Space space: The space to inspect.
    :return bool: True if the space is compound, False otherwise.
    :raises TypeError: If the space is no `gym.Space`.
    :raises NotImplementedError: If the space type is not one of the above.
    """
    assert_space(space)

    if isinstance(space, spaces.Discrete):
        return False

    if isinstance(space, spaces.Box):
        # Only a Box whose shape is exactly (1,) counts as scalar.
        is_scalar = len(space.shape) == 1 and space.shape[0] == 1
        return not is_scalar

    always_compound = (spaces.MultiDiscrete, spaces.MultiBinary, spaces.Tuple)
    if isinstance(space, always_compound):
        return True

    raise NotImplementedError(
        "Unknown space {} of type {} supplied".format(space, type(space)))
def num_discrete_actions(space):
    """
    Return the number of available actions of a discrete space as a tuple.

    :param gym.Space space: The discrete space which to inspect.
    :return tuple: Tuple of integers containing the number of discrete actions.
    :raises TypeError: If the space is no `gym.Space` or is not discrete.
    :raises NotImplementedError: If the space is discrete but of a type that
        is not handled below.
    """
    assert_space(space)
    if not is_discrete(space):
        raise TypeError("Space {} is not discrete".format(space))

    if isinstance(space, spaces.Discrete):
        return (space.n,)

    if isinstance(space, spaces.MultiDiscrete):
        # space.high is an inclusive bound, hence the +1.
        counts = space.high - space.low + 1
        return tuple(counts)

    if isinstance(space, spaces.MultiBinary):
        # Every binary component offers exactly two choices.
        return (2,) * space.n

    raise NotImplementedError(  # pragma: no cover
        "Unknown space {} of type {} supplied".format(space, type(space)))
def __init__(self, dim=(14, 9)):
    """Create the world and declare its observation and action spaces.

    :param dim: Two-element size of the world grid. Its product is stored
        as ``self.size`` and its smaller entry as
        ``self.max_blocks_per_turn``.
    """
    self.dim = dim
    self.size = dim[0] * dim[1]
    self.max_blocks_per_turn = min(dim)
    self.target_difficulty = None
    self.target_pos = None

    # Observe the world
    self.observation_space = spaces.Tuple((
        spaces.Box(0, num_block_type, shape=dim),
        spaces.Box(np.array([0, 0]), np.array(dim)),
        spaces.Discrete(num_directions),
        # ``(1)`` is just the integer 1; pass a real one-element tuple so
        # that gym releases which iterate over ``shape`` accept it too.
        spaces.Box(0, 1, shape=(1,)),
    ))

    # Actions allow the world to be populated.
    self.action_space = spaces.Discrete(num_actions)
def __init__(self):
    """Declare the spaces and the fixed musical settings of the environment.

    The observation is a single ``Discrete(NUM_CLASSES)`` value; the action
    is a tuple of ``NUM_CLASSES`` binary choices.
    """
    self.observation_space = spaces.Discrete(NUM_CLASSES)

    binary_choices = []
    for _ in range(NUM_CLASSES):
        binary_choices.append(spaces.Discrete(2))
    self.action_space = spaces.Tuple(tuple(binary_choices))

    # Total number of notes
    self.num_notes = 32
    self.key = C_MAJOR_KEY
def __init__(self, natural=False):
    """Create the blackjack environment and start the first game.

    :param natural: Flag to pay out 1.5 on a "natural" blackjack win, like
        casino rules.
        Ref: http://www.bicyclecards.com/how-to-play/blackjack/
    """
    self.action_space = spaces.Discrete(2)

    # Observation: three discrete components of sizes 32, 11 and 2.
    component_sizes = (32, 11, 2)
    self.observation_space = spaces.Tuple(
        tuple(spaces.Discrete(size) for size in component_sizes))
    self._seed()

    self.natural = natural

    # Start the first game
    self._reset()

    # The original comment here was cut off after "Number of"; presumably
    # this is the number of actions (it matches Discrete(2) above).
    self.nA = 2
def repeat_space(space, n):
    """Return a ``spaces.Tuple`` made of ``n`` references to ``space``.

    The same ``space`` object is reused for every slot; it is not copied.
    """
    slots = [space] * n
    return spaces.Tuple(slots)
def __init__(self, board_size=(5, 9), wind_proba=0.2, stay_wind=True):
    """Store the board settings, reset the game and declare the spaces.

    :param board_size: Board dimensions; entries 0 and 1 size the two
        discrete observation components.
    :param wind_proba: Stored as ``self.wind_proba``.
    :param stay_wind: Stored as ``self.stay_wind``.
    """
    self.board_size = board_size
    self.wind_proba = wind_proba
    self.stay_wind = stay_wind

    # Seeding and the first reset happen before the spaces are declared,
    # exactly as in the original ordering.
    self._seed()
    self._reset()

    self.action_space = spaces.Discrete(len(ACTION_MEANING))
    per_axis = tuple(spaces.Discrete(board_size[axis]) for axis in (0, 1))
    self.observation_space = spaces.Tuple(per_axis)
    self.window = None
def __init__(self, base=10, chars=False, starting_min_length=2):
    """
    base: Number of distinct characters.
    chars: If True, use uppercase alphabet. Otherwise, digits. Only affects
           rendering.
    starting_min_length: Minimum input string length. Ramps up as episodes
                         are consistently solved.
    """
    self.base = base
    # Keep track of this many past episodes
    self.last = 10
    # Cumulative reward earned this episode
    self.episode_total_reward = None
    # Running tally of reward shortfalls. e.g. if there were 10 points to
    # earn and we got 8, we'd append -2
    AlgorithmicEnv.reward_shortfalls = []

    # Symbol table used for rendering, followed by a trailing blank entry.
    to_symbol = (lambda i: chr(ord('A') + i)) if chars else str
    self.charmap = [to_symbol(i) for i in range(base)] + [' ']

    # TODO: Not clear why this is a class variable rather than instance.
    # Could lead to some spooky action at a distance if someone is working
    # with multiple algorithmic envs at once. Also makes testing tricky.
    AlgorithmicEnv.min_length = starting_min_length

    # Three sub-actions:
    #   1. Move read head left or right (or up/down)
    #   2. Write or not
    #   3. Which character to write. (Ignored if should_write=0)
    sub_actions = [
        Discrete(len(self.MOVEMENTS)),
        Discrete(2),
        Discrete(self.base),
    ]
    self.action_space = Tuple(sub_actions)

    # Can see just what is on the input tape (one of n characters, or nothing)
    self.observation_space = Discrete(self.base + 1)

    self._seed()
    self.reset()
def __init__(self):
    """Wrap the parent's observation space together with a binary flag.

    After the parent initializer has run, the observation space becomes
    ``Tuple((Discrete(2), <parent observation space>))``.
    """
    super(OffSwitchCartpoleEnv, self).__init__()

    switch_space = spaces.Discrete(2)
    inherited_space = self.observation_space
    self.observation_space = spaces.Tuple((switch_space, inherited_space))

    self.left_threshold_crossed = False
    # number of episodes in which the cart crossed the left/right threshold
    # (first).
    self.num_crosses = [0.0, 0.0]
def __init__(self):
    """Wrap the parent's observation space together with a binary flag.

    After the parent initializer has run, the observation space becomes
    ``Tuple((Discrete(2), <parent observation space>))``.
    """
    super(OffSwitchCartpoleProbEnv, self).__init__()

    switch_space = spaces.Discrete(2)
    inherited_space = self.observation_space
    self.observation_space = spaces.Tuple((switch_space, inherited_space))

    self.threshold_crossed = False
    # number of episodes in which the cart crossed the left/right threshold
    # (first).
    self.num_crosses = [0.0, 0.0]
def __init__(self):
    """Build the environment around an inner ``CartPoleEnv``.

    Observations are those of the inner cart-pole. An action is the
    cart-pole action followed by ``NUM_PREDICTED_OBSERVATIONS`` entries that
    each live in the cart-pole observation space.
    """
    super(PredictObsCartpoleEnv, self).__init__()
    self.cartpole = CartPoleEnv()
    self.observation_space = self.cartpole.observation_space

    control_part = (self.cartpole.action_space,)
    prediction_part = (self.cartpole.observation_space,) * (NUM_PREDICTED_OBSERVATIONS)
    self.action_space = spaces.Tuple(control_part + prediction_part)
def __init__(self, natural=False):
    """Create the blackjack environment and start the first game.

    :param natural: Flag to pay out 1.5 on a "natural" blackjack win, like
        casino rules.
        Ref: http://www.bicyclecards.com/how-to-play/blackjack/
    """
    self.action_space = spaces.Discrete(2)

    # Observation: three discrete components of sizes 32, 11 and 2.
    component_sizes = (32, 11, 2)
    self.observation_space = spaces.Tuple(
        tuple(spaces.Discrete(size) for size in component_sizes))
    self._seed()

    self.natural = natural

    # Start the first game
    self._reset()
def __init__(self, initialWealth=25.0, edge=0.6, maxWealth=250.0, maxRounds=300):
    """Configure the coin-flip betting game and start it.

    :param initialWealth: Starting wealth; also stored for later use.
    :param edge: Stored as ``self.edge``.
    :param maxWealth: Upper bound of the wealth box and of the reward range.
    :param maxRounds: Stored as ``self.maxRounds``; sizes the discrete
        round component.
    """
    # betting in penny increments
    penny_bets = int(maxWealth * 100)
    self.action_space = spaces.Discrete(penny_bets)

    wealth_space = spaces.Box(0, maxWealth, [1])    # original note: (w,b)
    rounds_space = spaces.Discrete(maxRounds + 1)
    self.observation_space = spaces.Tuple((wealth_space, rounds_space))

    self.reward_range = (0, maxWealth)
    self.edge = edge
    self.wealth = initialWealth
    self.initialWealth = initialWealth
    self.maxRounds = maxRounds
    self.maxWealth = maxWealth
    self._seed()
    self._reset()
def __init__(self, initialWealth=25.0, edgePriorAlpha=7, edgePriorBeta=3,
             maxWealthAlpha=5.0, maxWealthM=200.0, maxRoundsMean=300.0,
             maxRoundsSD=25.0, reseed=True):
    """Draw this game's parameters from their priors and set up the spaces.

    The hyperparameters are stored on the instance so that they can be
    passed back into ``__init__()`` during resets, letting the same
    hyperparameters govern the next game's parameters.

    :param initialWealth: Starting wealth (converted to ``float``).
    :param edgePriorAlpha: First argument of the beta draw for the edge.
    :param edgePriorBeta: Second argument of the beta draw for the edge.
    :param maxWealthAlpha: First argument of the ``genpareto`` draw for the
        wealth cap.
    :param maxWealthM: Second argument of the ``genpareto`` draw.
    :param maxRoundsMean: Mean of the normal draw for the round limit.
    :param maxRoundsSD: Standard deviation of that normal draw.
    :param reseed: When true, or when the instance has no ``np_random``
        attribute yet, ``self._seed()`` is called at the end.
    """
    # TODO: this is boilerplate, is there any more elegant way to do this?
    self.initialWealth = float(initialWealth)
    self.edgePriorAlpha = edgePriorAlpha
    self.edgePriorBeta = edgePriorBeta
    self.maxWealthAlpha = maxWealthAlpha
    self.maxWealthM = maxWealthM
    self.maxRoundsMean = maxRoundsMean
    self.maxRoundsSD = maxRoundsSD

    # Draw this game's set of parameters. The three draws share
    # prng.np_random, so their order is kept: edge, wealth cap, round limit.
    sampled_edge = prng.np_random.beta(edgePriorAlpha, edgePriorBeta)
    wealth_cap = round(
        genpareto.rvs(maxWealthAlpha, maxWealthM, random_state=prng.np_random))
    round_limit = int(round(prng.np_random.normal(maxRoundsMean, maxRoundsSD)))

    # Sufficient statistic for the Pareto distribution on the wealth cap:
    # alpha doesn't update, but x_m does, and simply is the highest wealth
    # count we've seen to date.
    self.maxEverWealth = float(self.initialWealth)
    # For the coinflip edge, it is total wins/losses.
    self.wins = 0
    self.losses = 0
    # For the number of rounds, we need to remember how many rounds we've
    # played.
    self.roundsElapsed = 0

    # The rest proceeds as in the plain game.
    self.action_space = spaces.Discrete(int(wealth_cap * 100))
    self.observation_space = spaces.Tuple((
        spaces.Box(0, wealth_cap, shape=[1]),   # current wealth
        spaces.Discrete(round_limit + 1),       # rounds elapsed
        spaces.Discrete(round_limit + 1),       # wins
        spaces.Discrete(round_limit + 1),       # losses
        spaces.Box(0, wealth_cap, [1]),         # maximum observed wealth
    ))
    self.reward_range = (0, wealth_cap)
    self.edge = sampled_edge
    self.wealth = self.initialWealth
    self.maxRounds = round_limit
    self.rounds = self.maxRounds
    self.maxWealth = wealth_cap

    needs_seed = reseed or not hasattr(self, 'np_random')
    if needs_seed:
        self._seed()
def __init__(self, spec):
    """Store ``spec`` and build the combined tuple space.

    :param spec: Iterable of two-element pairs. The second element of each
        pair must expose a ``space`` attribute; the first is ignored here.
    """
    self.spec = spec
    member_spaces = []
    for _, converter in spec:
        member_spaces.append(converter.space)
    self.space = spaces.Tuple(member_spaces)
def __init__(self, conv, permutation):
    """Store the arguments and build a tuple space of repeated ``conv.space``.

    :param conv: Object exposing a ``space`` attribute.
    :param permutation: Iterable; the tuple space gets one ``conv.space``
        entry per element of it.
    """
    self.conv = conv
    self.permutation = permutation
    repeated = []
    for _ in permutation:
        repeated.append(conv.space)
    self.space = spaces.Tuple(repeated)
def reshape(self, new_shape):
    """Reject reshaping; each sub-space has to be reshaped on its own.

    :param new_shape: Ignored.
    :raises NotImplementedError: Always.
    """
    message = "Use reshape separately for each space in Tuple."
    raise NotImplementedError(message)
def __init__(self, env, obs_stack):
    """Wrap ``env`` so that its observation space is widened for stacking.

    The last dimension of the observation shape is multiplied by
    ``obs_stack``.

    :param env: Environment to wrap.
    :param obs_stack: Number of stacked observations; must be greater than 1.
    :raises AssertionError: If ``obs_stack`` is not greater than 1, or the
        observation space is a ``Tuple``.
    """
    super(ObservationStackWrap, self).__init__(env=env)
    assert obs_stack > 1, "Observation stack length must be higher than 1."
    assert not isinstance(self.observation_space, Tuple),\
        "Observation stack is not compatible with Tuple spaces."
    self._obs_stack_len = obs_stack or 1

    # This is the wrapped env's own space object, not a copy.
    self.observation_space = self.env.observation_space
    base_shape = self.observation_space.shape
    stacked_shape = tuple(base_shape[:-1]) + (base_shape[-1] * obs_stack,)
    # NOTE(review): the return value of reshape() is discarded, so this
    # presumably relies on an in-place reshape -- confirm.
    self.observation_space.reshape(stacked_shape)
    self._obs_stack = None
def _make_rf2gym_converter(space):
    """Build a function mapping space samples ReinforceFlow -> Gym.

    :param space: Gym space describing the target format.
    :return: Callable taking one sample and returning the converted sample.
    :raises ValueError: If the space type is not supported.
    """
    # TODO: add spaces.MultiDiscrete support.
    if isinstance(space, spaces.Discrete):
        # The sample is reduced to the index of its largest entry.
        return lambda sample: np.argmax(sample)

    if isinstance(space, spaces.MultiBinary):
        return lambda sample: tuple([np.argmax(part) for part in sample])

    if isinstance(space, spaces.Box):
        def passthrough(sample):
            return sample
        return passthrough

    if isinstance(space, spaces.Tuple):
        # One converter per sub-space, applied position by position.
        sub_converters = [_make_rf2gym_converter(sub) for sub in space.spaces]

        def convert_tuple(sample):
            return tuple(
                convert(part) for part, convert in zip(sample, sub_converters))
        return convert_tuple

    raise ValueError("Unsupported space %s." % space)
def __init__(self, env=None):
    """Replace the observation space with 1x42x42 boxes in [0.0, 1.0].

    A ``Tuple`` observation space is mapped to a ``Tuple`` with one such box
    per sub-space; any other space becomes a single box.

    :param env: Environment to wrap.
    """
    super(AtariRescale42x42Wrapper, self).__init__(env)

    def rescaled_box():
        return gym.spaces.Box(0.0, 1.0, [1, 42, 42])

    if not isinstance(self.observation_space, spaces.Tuple):
        self.observation_space = rescaled_box()
        return

    self.observation_space = spaces.Tuple(
        [rescaled_box() for _ in self.env.observation_space.spaces])
def _take_action(self, actions):
    """Translate ``actions`` into commands and send them to the agent host.

    :param actions: One action per entry of ``self.action_spaces``; when
        there is only one action space, the bare action itself.
    """
    # A single action space is not wrapped in a Tuple, so wrap its action.
    if len(self.action_spaces) == 1:
        actions = [actions]

    # send appropriate command for different actions
    for spc, cmds, acts in zip(self.action_spaces, self.action_names, actions):
        if isinstance(spc, spaces.Discrete):
            # The action value selects one ready-made command.
            command = cmds[acts]
            logger.debug(command)
            self.agent_host.sendCommand(command)
        elif isinstance(spc, (spaces.Box, spaces.MultiDiscrete)):
            # One "<command> <value>" message per component.
            for cmd, val in zip(cmds, acts):
                command = cmd + " " + str(val)
                logger.debug(command)
                self.agent_host.sendCommand(command)
        else:
            logger.warn("Unknown action space for %s, ignoring." % cmds)
def __init__(self):
    """Build the environment around an inner ``CartPoleEnv``.

    Observations are those of the inner cart-pole. An action is a tuple of
    ``NUM_PREDICTED_ACTIONS + 1`` cart-pole actions.
    """
    super(PredictActionsCartpoleEnv, self).__init__()
    self.cartpole = CartPoleEnv()
    self.observation_space = self.cartpole.observation_space

    repeated_actions = (self.cartpole.action_space,) * (NUM_PREDICTED_ACTIONS+1)
    self.action_space = spaces.Tuple(repeated_actions)
def __init__(self,
             worlds_pickle_filename=os.path.join(os.path.dirname(__file__), "assets", "worlds_640x480_v0.pkl"),
             world_idx=0,
             initial_position = np.array([-20.0, -20.0]),
             destination = np.array([520.0, 400.0]),
             max_observation_range = 100.0,
             destination_tolerance_range=20.0,
             add_self_position_to_observation=False,
             add_goal_position_to_observation=False):
    """Load a world and declare the action and observation boxes.

    :param worlds_pickle_filename: File read by ``EnvironmentCollection.read``.
    :param world_idx: Index into the loaded ``map_collection``.
    :param initial_position: Start position; ``self.state`` starts as a copy.
    :param destination: Passed to ``self.set_destination``.
    :param max_observation_range: Upper bound of every beam reading.
    :param destination_tolerance_range: Stored on the instance.
    :param add_self_position_to_observation: When true, the observation box
        gets two extra x/y entries bounded by +/-10000.
    :param add_goal_position_to_observation: Same, a second pair of entries.
    """
    worlds = EnvironmentCollection()
    worlds.read(worlds_pickle_filename)
    self.world = worlds.map_collection[world_idx]

    self.set_destination(destination)
    assert self.destination is not None

    self.init_position = initial_position
    self.state = self.init_position.copy()

    self.max_observation_range = max_observation_range
    self.destination_tolerance_range = destination_tolerance_range
    self.viewer = None
    self.num_beams = 16
    self.max_speed = 5
    self.add_self_position_to_observation = add_self_position_to_observation
    self.add_goal_position_to_observation = add_goal_position_to_observation

    # Action: one 2-element box bounded by [0, max_speed] and [0, 2*pi].
    # The original author left a Tuple of two 1-element boxes commented out.
    action_low = np.array([0.0, 0.0])
    action_high = np.array([self.max_speed, 2*pi])
    self.action_space = Box(action_low, action_high)

    # Observation: one reading per beam, plus optional x and y coords.
    obs_low = [-1.0] * self.num_beams
    obs_high = [self.max_observation_range] * self.num_beams
    for with_xy in (add_self_position_to_observation,
                    add_goal_position_to_observation):
        if with_xy:
            obs_low.extend([-10000., -10000.])
            obs_high.extend([10000., 10000.])

    self.observation_space = Box(np.array(obs_low), np.array(obs_high))
    self.observation = []
def _render(self, mode='human', close=False):
    """Write a textual view of the current state and return the stream.

    :param mode: With 'ansi' the text goes to a fresh ``StringIO``; with any
        other value it goes to ``sys.stdout``.
    :param close: When true, nothing is rendered and ``None`` is returned.
    :return: The stream that was written to, or ``None`` when ``close`` is set.
    """
    if close:
        # Nothing interesting to close
        return
    outfile = StringIO() if mode == 'ansi' else sys.stdout
    inp = "Total length of input instance: %d, step: %d\n" % (self.input_width, self.time)
    outfile.write(inp)
    # Note: x is unpacked here but not used in the rest of this method.
    x, y, action = self.read_head_position, self.write_head_position, self.last_action
    if action is not None:
        inp_act, out_act, pred = action
    # Separator as wide as the header line (minus its newline).
    outfile.write("=" * (len(inp) - 1) + "\n")
    y_str = "Output Tape : "
    target_str = "Targets : "
    if action is not None:
        pred_str = self.charmap[pred]
    x_str = self._render_observation()
    # The range extends two positions beyond both ends of the target.
    for i in range(-2, len(self.target) + 2):
        target_str += self._get_str_target(i)
        if i < y - 1:
            y_str += self._get_str_target(i)
        elif i == (y - 1):
            if action is not None and out_act == 1:
                # Show the written prediction: green if it equals the
                # target at this position, red otherwise.
                color = 'green' if pred == self.target[i] else 'red'
                y_str += colorize(pred_str, color, highlight=True)
            else:
                y_str += self._get_str_target(i)
    outfile.write(x_str)
    outfile.write(y_str + "\n")
    outfile.write(target_str + "\n\n")
    if action is not None:
        outfile.write("Current reward : %.3f\n" % self.last_reward)
        outfile.write("Cumulative reward : %.3f\n" % self.episode_total_reward)
        move = self.MOVEMENTS[inp_act]
        outfile.write("Action : Tuple(move over input: %s,\n" % move)
        # Turn the 0/1 write flag into a boolean for display.
        out_act = out_act == 1
        outfile.write(" write to the output tape: %s,\n" % out_act)
        outfile.write(" prediction: %s)\n" % pred_str)
    else:
        # No action yet: emit five newlines instead of the action summary.
        outfile.write("\n" * 5)
    return outfile
def _to_rf_space(space):
    """Convert a Gym space instance into its ReinforceFlow counterpart.

    :param space: Gym space to convert.
    :return: ``DiscreteOneHot`` for ``Discrete``, ``Continious`` for ``Box``,
        and a ReinforceFlow ``Tuple`` for ``MultiBinary`` and ``Tuple``
        spaces (sub-spaces are converted recursively).
    :raises ValueError: For ``MultiDiscrete`` (not supported yet) and for
        any other unsupported space type.
    """
    if isinstance(space, spaces.Discrete):
        return DiscreteOneHot(space.n)

    if isinstance(space, spaces.MultiDiscrete):
        # Building one DiscreteOneHot(n) per entry of space.high was
        # considered, but space.low > 0 would lead to unused first n actions.
        raise ValueError("MultiDiscrete spaces aren't supported yet.")

    if isinstance(space, spaces.MultiBinary):
        # space.n is the number of binary components (an int), so it has to
        # be wrapped in range(); iterating the int itself raises TypeError.
        return Tuple([DiscreteOneHot(2) for _ in range(space.n)])

    if isinstance(space, spaces.Box):
        return Continious(space.low, space.high)

    if isinstance(space, spaces.Tuple):
        converted_spaces = [_to_rf_space(sub_space) for sub_space in space.spaces]
        # NOTE(review): this branch unpacks the sub-spaces into Tuple(...)
        # while the MultiBinary branch passes a single list; confirm which
        # call convention the ReinforceFlow Tuple expects.
        return Tuple(*converted_spaces)

    raise ValueError("Unsupported space %s." % space)
def _make_gym2rf_converter(space):
    """Build a function mapping space samples Gym -> ReinforceFlow.

    :param space: Gym space the samples come from.
    :return: Callable taking one sample and returning the converted sample.
    :raises ValueError: If the space type is not supported.
    """
    # TODO: add spaces.MultiDiscrete support.
    if isinstance(space, spaces.Discrete):
        return lambda sample: one_hot(space.n, sample)

    if isinstance(space, spaces.MultiBinary):
        # Each binary component is encoded through one_hot(2, ...).
        return lambda sample: tuple([one_hot(2, bit) for bit in sample])

    if isinstance(space, spaces.Box):
        def passthrough(sample):
            return sample
        return passthrough

    if isinstance(space, spaces.Tuple):
        # One converter per sub-space, applied position by position.
        sub_converters = [_make_gym2rf_converter(sub) for sub in space.spaces]

        def convert_tuple(sample):
            return tuple(
                convert(part) for part, convert in zip(sample, sub_converters))
        return convert_tuple

    raise ValueError("Unsupported space %s." % space)
def is_discrete(space):
    """
    Tell whether ``space`` is discrete.

    `Discrete`, `MultiDiscrete` and `MultiBinary` spaces are discrete, a
    `Box` is not, and a `Tuple` is discrete only if every one of its
    subspaces is.

    :param gym.Space space: The space to inspect.
    :return bool: True if the space is discrete, False otherwise.
    :raises TypeError: If the space is no `gym.Space`.
    :raises NotImplementedError: If the space type is not one of the above.
    """
    assert_space(space)

    discrete_types = (spaces.Discrete, spaces.MultiDiscrete, spaces.MultiBinary)
    if isinstance(space, discrete_types):
        return True
    if isinstance(space, spaces.Box):
        return False
    if isinstance(space, spaces.Tuple):
        # Recurse into the members; one continuous member is enough to fail.
        return all(is_discrete(member) for member in space.spaces)

    raise NotImplementedError(
        "Unknown space {} of type {} supplied".format(space, type(space)))