The following 50 code examples, extracted from open-source Python projects, illustrate how to use gym.spaces.Discrete().
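Before the extracted examples, here is a minimal sketch of the core Discrete API; the variable names are illustrative only and not taken from any project below:

from gym import spaces

# Discrete(n) represents the integer set {0, 1, ..., n-1}.
action_space = spaces.Discrete(3)

print(action_space.n)                  # 3: number of available actions
action = action_space.sample()         # uniformly random action in {0, 1, 2}
assert action_space.contains(action)   # membership test used by many wrappers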
def __init__(self, ball_speed=4, bat_speed=4, max_num_rounds=20):
    SCREEN_WIDTH, SCREEN_HEIGHT = 160, 210
    self.observation_space = spaces.Tuple([
        spaces.Box(low=0, high=255, shape=(SCREEN_HEIGHT, SCREEN_WIDTH, 3)),
        spaces.Box(low=0, high=255, shape=(SCREEN_HEIGHT, SCREEN_WIDTH, 3))
    ])
    self.action_space = spaces.Tuple(
        [spaces.Discrete(3), spaces.Discrete(3)])

    pygame.init()
    self._surface = pygame.Surface((SCREEN_WIDTH, SCREEN_HEIGHT))
    self._viewer = None
    self._game = PongGame(
        has_double_players=True,
        window_size=(SCREEN_WIDTH, SCREEN_HEIGHT),
        ball_speed=ball_speed,
        bat_speed=bat_speed,
        max_num_rounds=max_num_rounds)
def __init__(self, env, gym_core_id=None):
    super(GymCoreAction, self).__init__(env)
    if gym_core_id is None:
        # self.spec is None while inside of the make, so we need
        # to pass gym_core_id in explicitly there. This case will
        # be hit when instantiating by hand.
        gym_core_id = self.spec._kwargs['gym_core_id']
    spec = gym.spec(gym_core_id)
    raw_action_space = gym_core_action_space(gym_core_id)
    self._actions = raw_action_space.actions
    self.action_space = gym_spaces.Discrete(len(self._actions))

    if spec._entry_point.startswith('gym.envs.atari:'):
        self.key_state = translator.AtariKeyState(gym.make(gym_core_id))
    else:
        self.key_state = None
def configureActions(self, discrete_actions):
    # true if action space is discrete; 3 values: no push, left, right
    # false if action space is continuous; fx, both in (-action_force, action_force)
    self.discrete_actions = discrete_actions

    # 3 discrete actions: no push, left, right
    # 1 continuous action element: fx
    if self.discrete_actions:
        self.action_space = spaces.Discrete(3)
    else:
        self.action_space = spaces.Box(-1.0, 1.0, shape=(1, 1))

    # Our observations can be within this box
    float_max = np.finfo(np.float32).max
    self.observation_space = gym.spaces.Box(-float_max, float_max, self.state_shape)
def configureActions(self, discrete_actions):
    # true if action space is discrete; 5 values: no push, left, right, up & down
    # false if action space is continuous; fx, fy both in (-action_force, action_force)
    self.discrete_actions = discrete_actions

    # 5 discrete actions: no push, left, right, up, down
    # 2 continuous action elements: fx & fy
    if self.discrete_actions:
        self.action_space = spaces.Discrete(5)
    else:
        self.action_space = spaces.Box(-1.0, 1.0, shape=(2,))

    # Our observations can be within this box
    float_max = np.finfo(np.float32).max
    self.observation_space = gym.spaces.Box(-float_max, float_max, self.state_shape)
def configureActions(self, discrete_actions):
    # if it is possible to switch actions, do this here
    # true if action space is discrete
    # false if action space is continuous
    self.discrete_actions = discrete_actions

    # if self.discrete_actions:
    #     self.action_space = spaces.Discrete(3)
    # else:
    #     self.action_space = spaces.Box(-1.0, 1.0, shape=(1, 1))
    #
    # # Our observations can be within this box
    # float_max = np.finfo(np.float32).max
    # self.observation_space = gym.spaces.Box(-float_max, float_max, self.state_shape)
def __init__(self, maxUmbralAstral):
    # Print debug
    self.debug = False

    # Outer bound for Astral Fire and Umbral Ice
    BLM.MAXUMBRALASTRAL = maxUmbralAstral

    # Available buffs
    self.BUFFS = []

    # Maximum time available
    self.MAXTIME = 45

    self.HELPER = BLM.Helper()

    # Available abilities
    self.ABILITIES = [
        BLM.Ability("Blizzard 1", 180, 6, 2.5, 2.49, self.HELPER.UmbralIceIncrease, BLM.DamageType.Ice, self.HELPER),  # 480
        BLM.Ability("Fire 1", 180, 15, 2.5, 2.49, self.HELPER.AstralFireIncrease, BLM.DamageType.Fire, self.HELPER),   # 1200
        BLM.Ability("Transpose", 0, 0, 0.75, 12.9, self.HELPER.SwapAstralUmbral, BLM.DamageType.Neither, self.HELPER),
        BLM.Ability("Fire 3", 240, 30, 3.5, 2.5, self.HELPER.AstralFireMax, BLM.DamageType.Fire, self.HELPER),         # 2400
        BLM.Ability("Blizzard 3", 240, 18, 3.5, 2.5, self.HELPER.UmbralIceMax, BLM.DamageType.Ice, self.HELPER),       # 2400
        BLM.Ability("Fire 4", 260, 15, 2.8, 2.5, None, BLM.DamageType.Fire, self.HELPER)]                              # 2400

    # State including ability cooldowns, buff time remaining, mana, and Astral/Umbral
    self.initialState = np.array([0] * (len(self.ABILITIES) + len(self.BUFFS)) + [BLM.MAXMANA] + [0])

    self.state = self._reset()

    # What the learner can pick between
    self.action_space = spaces.Discrete(len(self.ABILITIES))

    # What the learner can see to make a choice (cooldowns and buffs)
    self.observation_space = spaces.MultiDiscrete(
        [[0, 180]] * (len(self.ABILITIES) + len(self.BUFFS)) + [[0, BLM.MAXMANA]] + [[-3, 3]])
def __init__(self, env, monitor_path, video=True, **usercfg):
    super(CEM, self).__init__(**usercfg)
    self.env = wrappers.Monitor(env, monitor_path, force=True, video_callable=(None if video else False))
    self.config.update(dict(
        num_steps=env.spec.tags.get("wrapper_config.TimeLimit.max_episode_steps"),  # maximum length of an episode
        n_iter=100,      # number of iterations of CEM
        batch_size=25,   # number of samples per batch
        elite_frac=0.2   # fraction of samples used as elite set
    ))
    self.config.update(usercfg)

    if isinstance(env.action_space, Discrete):
        self.dim_theta = (env.observation_space.shape[0] + 1) * env.action_space.n
    elif isinstance(env.action_space, Box):
        self.dim_theta = (env.observation_space.shape[0] + 1) * env.action_space.shape[0]
    else:
        raise NotImplementedError

    # Initialize mean and standard deviation
    self.theta_mean = np.zeros(self.dim_theta)
    self.theta_std = np.ones(self.dim_theta)
def __init__(self):
    self._seed()
    self.viewer = None

    self.world = Box2D.b2World()
    self.moon = None
    self.lander = None
    self.particles = []

    self.prev_reward = None

    high = np.array([np.inf] * 8)  # useful range is -1 .. +1, but spikes can be higher
    self.observation_space = spaces.Box(-high, high)

    if self.continuous:
        # Action is two floats [main engine, left-right engines].
        # Main engine: -1..0 off, 0..+1 throttle from 50% to 100% power. Engine can't work with less than 50% power.
        # Left-right: -1.0..-0.5 fire left engine, +0.5..+1.0 fire right engine, -0.5..0.5 off
        self.action_space = spaces.Box(-1, +1, (2,))
    else:
        # Nop, fire left engine, main engine, right engine
        self.action_space = spaces.Discrete(4)

    self._reset()
def __init__(self):
    self.min_position = -1.2
    self.max_position = 0.6
    self.max_speed = 0.07
    self.goal_position = 0.5

    self.low = np.array([self.min_position, -self.max_speed])
    self.high = np.array([self.max_position, self.max_speed])

    self.viewer = None

    self.action_space = spaces.Discrete(3)
    self.observation_space = spaces.Box(self.low, self.high)

    self._seed()
    self.reset()
def __init__(self, *args, **kwargs):
    super(TestConverters, self).__init__(*args, **kwargs)
    self.space_d = spaces.Discrete(4)
    self.gym_out_d = 2
    self.rf_out_d = [0, 0, 1, 0]
    self.space_c = spaces.Box(-1, 1, [2, 4])
    self.gym_out_c = np.random.uniform(low=-1, high=1, size=(2, 4))
    self.rf_out_c = self.gym_out_c
    self.space_b = spaces.MultiBinary(4)
    self.gym_out_b = [0, 1, 0, 1]
    self.rf_out_b = [[1, 0], [0, 1], [1, 0], [0, 1]]
    self.space_t = spaces.Tuple((self.space_d, self.space_c, self.space_b,
                                 spaces.Tuple((self.space_d, self.space_c))))
    self.gym_out_t = tuple([self.gym_out_d, self.gym_out_c, self.gym_out_b,
                            tuple([self.gym_out_d, self.gym_out_c])])
    self.rf_out_t = tuple([self.rf_out_d, self.rf_out_c, self.rf_out_b,
                           tuple([self.rf_out_d, self.rf_out_c])])
def __init__(self, ball_speed=4, bat_speed=4, max_num_rounds=20):
    SCREEN_WIDTH, SCREEN_HEIGHT = 160, 210
    self.observation_space = spaces.Box(
        low=0, high=255, shape=(SCREEN_HEIGHT, SCREEN_WIDTH, 3))
    self.action_space = spaces.Discrete(3)

    pygame.init()
    self._surface = pygame.Surface((SCREEN_WIDTH, SCREEN_HEIGHT))
    self._viewer = None
    self._game = PongGame(
        has_double_players=False,
        window_size=(SCREEN_WIDTH, SCREEN_HEIGHT),
        ball_speed=ball_speed,
        bat_speed=bat_speed,
        max_num_rounds=max_num_rounds)
def __init__(self, action_space, observation_space, q_init=0.0,
             learning_rate=0.1, discount=1.0, epsilon=0.05):
    if not isinstance(action_space, spaces.Discrete):
        raise TypeError("Action space type should be Discrete.")
    if not isinstance(observation_space, spaces.Discrete):
        raise TypeError("Observation space type should be Discrete.")

    self._action_space = action_space
    self._learning_rate = learning_rate
    self._discount = discount
    self._epsilon = epsilon
    self._q = defaultdict(lambda: q_init * np.ones(action_space.n))
def __init__(self, action_space, observation_space, batch_size=128, learning_rate=1e-3, discount=1.0, epsilon=0.05): if not isinstance(action_space, spaces.Discrete): raise TypeError("Action space type should be Discrete.") self._action_space = action_space self._batch_size = batch_size self._discount = discount self._epsilon = epsilon self._q_network = ConvNet( num_channel_input=observation_space.shape[0], num_output=action_space.n) self._optimizer = optim.RMSprop( self._q_network.parameters(), lr=learning_rate) self._memory = ReplayMemory(100000)
def __init__(self, action_space, observation_space, batch_size=128,
             learning_rate=1e-3, discount=1.0, epsilon=0.05):
    if not isinstance(action_space, spaces.Discrete):
        raise TypeError("Action space type should be Discrete.")

    self._action_space = action_space
    self._batch_size = batch_size
    self._discount = discount
    self._epsilon = epsilon

    self._q_network = FCNet(
        input_size=reduce(lambda x, y: x * y, observation_space.shape),
        output_size=action_space.n)
    self._optimizer = optim.RMSprop(
        self._q_network.parameters(), lr=learning_rate)
    self._memory = ReplayMemory(100000)
def is_compound(space):
    """ Checks whether a space is a compound space. These are non-scalar `Box` spaces,
        `MultiDiscrete`, `MultiBinary` and `Tuple` spaces (a `Tuple` space with a single,
        non-compound subspace is still considered compound).
        :raises TypeError: If the space is no `gym.Space`.
    """
    assert_space(space)

    if isinstance(space, spaces.Discrete):
        return False
    elif isinstance(space, spaces.Box):
        return len(space.shape) != 1 or space.shape[0] != 1
    elif isinstance(space, (spaces.MultiDiscrete, spaces.MultiBinary)):
        return True
    elif isinstance(space, spaces.Tuple):
        return True

    raise NotImplementedError("Unknown space {} of type {} supplied".format(space, type(space)))
def num_discrete_actions(space):
    """ For a discrete space, gets the number of available actions as a tuple.
        :param gym.Space space: The discrete space to inspect.
        :return tuple: Tuple of integers containing the number of discrete actions.
        :raises TypeError: If the space is no `gym.Space`.
    """
    assert_space(space)

    if not is_discrete(space):
        raise TypeError("Space {} is not discrete".format(space))

    if isinstance(space, spaces.Discrete):
        return tuple((space.n,))
    elif isinstance(space, spaces.MultiDiscrete):
        # add +1 here as space.high is an inclusive bound
        return tuple(space.high - space.low + 1)
    elif isinstance(space, spaces.MultiBinary):
        return (2,) * space.n

    raise NotImplementedError("Unknown space {} of type {} supplied".format(space, type(space)))  # pragma: no cover
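A quick illustration of the helper above; this is a hedged sketch that assumes the module's assert_space and is_discrete helpers are in scope:

from gym import spaces

# Scalar Discrete space: one entry holding the number of actions.
assert num_discrete_actions(spaces.Discrete(4)) == (4,)

# MultiBinary(n): n independent binary choices, so n entries of 2.
assert num_discrete_actions(spaces.MultiBinary(3)) == (2, 2, 2)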
def get_actions(game_or_env):
    if isinstance(game_or_env, str):
        env = gym.make(game_or_env)
    else:
        env = game_or_env

    if isinstance(env.action_space, Discrete):
        num_actions = env.action_space.n
    elif isinstance(env.action_space, Box):
        num_actions = np.prod(env.action_space.shape)
    else:
        raise Exception('Unsupported Action Space \'{}\''.format(
            type(env.action_space).__name__))

    indices = range(num_actions)
    if env.spec.id in ['Pong-v0', 'Breakout-v0']:
        # Gym currently specifies 6 actions for Pong and Breakout when only 3 are needed
        # TODO: patch the environments instead
        num_actions = 3
        indices = [1, 2, 3]

    return num_actions, env.action_space, indices
def __init__(self, env, actrep=4, memlen=4, w=84, h=84, random_start=30):
    print('Creating wrapper around Gym Environment')
    self.env = env
    self.memlen = memlen
    self.W = w
    self.H = h
    self.actrep = actrep
    self.random_start = random_start

    if not isinstance(self.env.action_space, spaces.Discrete):
        raise ValueError("Unsupported environment's (%s) action space. Expected: %s, Got: %s."
                         % (self.env.spec.id, self.env.action_space, spaces.Discrete))
    self.action_space = list(range(self.env.action_space.n))
    self.action_size = len(self.action_space)
    self.stacked_s = None

    for key in __custom_actions__:
        if key == self.env.spec.id:
            self.set_custom_actions(__custom_actions__[key])
            break

    print('Environment: %s. Action space: %s' % (self.env.spec.id, self.action_space))
def __init__(self, config=ENV_CONFIG):
    self.config = config

    if config["discrete_actions"]:
        self.action_space = Discrete(10)
    else:
        self.action_space = Box(-1.0, 1.0, shape=(3,))
    if config["use_depth_camera"]:
        self.observation_space = Box(
            -1.0, 1.0, shape=(config["x_res"], config["y_res"], 1))
    else:
        self.observation_space = Box(
            0.0, 255.0, shape=(config["x_res"], config["y_res"], 3))

    self._spec = lambda: None
    self._spec.id = "Carla-v0"

    self.server_port = None
    self.server_process = None
    self.client = None
    self.num_steps = 0
    self.prev_measurement = None
def configure(self, actions, frame_size, *, raw_array=False, max_step=-1):
    '''
    Usage:
        self.super()._configure(actions, frame_size)
    '''
    self.frame_size = frame_size
    self.raw_array = raw_array

    self.image = Image.new('RGB', self.frame_size, 'black')
    self.draw = ImageDraw.Draw(self.image)

    self.max_step = max_step
    self.step_cnt = 0

    self.actions = actions
    self.action_space = spaces.Discrete(len(actions))
    self.observation_space = spaces.Box(0., 255., (*self.frame_size, 3))

    self.__configured = True
def __init__(self, dim=(14, 9)):
    self.dim = dim
    self.size = dim[0] * dim[1]
    self.max_blocks_per_turn = min(dim)
    self.target_difficulty = None
    self.target_pos = None

    # Observe the world
    self.observation_space = spaces.Tuple((
        spaces.Box(0, num_block_type, shape=dim),
        spaces.Box(np.array([0, 0]), np.array(dim)),
        spaces.Discrete(num_directions),
        spaces.Box(0, 1, shape=(1,))
    ))

    # Actions allow the world to be populated.
    self.action_space = spaces.Discrete(num_actions)
def __init__(self):
    self._seed()
    self.viewer = None

    self.world = Box2D.b2World((0, 0))
    self.moon = None
    self.robots = []

    self.prev_reward = None

    high = np.array([np.inf] * 8)  # useful range is -1 .. +1, but spikes can be higher
    self.observation_space = spaces.Box(-high, high)

    if self.continuous:
        # Action is two floats [main engine, left-right engines].
        # Up-down: -1.0..-0.5 fire down engine, +0.5..+1.0 fire up engine, -0.5..0.5 off
        # Left-right: -1.0..-0.5 fire left engine, +0.5..+1.0 fire right engine, -0.5..0.5 off
        self.action_space = spaces.Box(-1, +1, (2,))
    else:
        # Nop, fire left engine, up engine, right engine, down engine
        self.action_space = spaces.Discrete(5)

    self.hard_reset()
def __init__(self):
    self.min_position = -1.2
    self.max_position = 0.6
    self.max_speed = 0.07
    self.goal_position = 0.5
    self.init_red = 0.0025

    self.low = np.array([self.min_position, -self.max_speed])
    self.high = np.array([self.max_position, self.max_speed])

    self.viewer = None

    self.pro_action_space = spaces.Discrete(3)
    # Adversarial space is continuous on gravity here
    grav_change_abs = np.array([0.0025])
    self.adv_action_space = spaces.Box(-grav_change_abs, grav_change_abs)
    self.observation_space = spaces.Box(self.low, self.high)

    self._seed()
    self.reset()
def __init__(self, room_length=3, num_rooms_per_side=2):
    assert room_length % 2 == 1, "room_length must be odd"
    assert room_length >= 3, "room_length must be at least 3"
    assert num_rooms_per_side >= 1, "must have at least 1 room"
    self.room_length = room_length
    self.num_rooms_per_side = num_rooms_per_side

    # 0 = up, 1 = right, 2 = down, 3 = left
    self.action_space = spaces.Discrete(4)
    self.max_pos = room_length * num_rooms_per_side - 1
    obs_space = (self.max_pos + 1, self.max_pos + 1, 1)
    self.observation_space = spaces.Box(low=0, high=1, shape=obs_space)
    self.goal_reward = 1
    self.goal_state = [self.max_pos, self.max_pos]
    self._obs = np.zeros(obs_space)
    self._reset()
def is_discrete(env):
    return isinstance(env.action_space, Discrete)
def __init__(self, env):
    self.env = env
    if isinstance(env.observation_space, Discrete):
        self.state_size = 1
    else:
        self.state_size = numel(env.observation_space.shape)

    if isinstance(self.env.action_space, Discrete):
        self.is_discrete = True
        self.action_size = env.action_space.n
        self.actions = np.arange(self.action_size)
    else:
        self.is_discrete = False
        self.action_size = numel(env.action_space.sample())
def __init__(self, env, keys):
    super(DiscreteToFixedKeysVNCActions, self).__init__(env)
    self._keys = keys
    self._generate_actions()
    self.action_space = spaces.Discrete(len(self._actions))
def __init__(self, alpha=0.02, show_number=False):
    self.action_space = spaces.Discrete(NUM_LOC)
    self.observation_space = spaces.Discrete(NUM_LOC)
    self.alpha = alpha
    self.set_start_mark('O')
    self.show_number = show_number
    self._seed()
    self._reset()
def __init__(self):
    # Angle at which to fail the episode
    self.theta_threshold_radians = 12 * 2 * math.pi / 360
    self.x_threshold = 2.4

    # Initializing the course: a predefined oval course
    # ToDo
    Rad = 190.0
    Poly = 16
    self.Course = Walls(240, 50, 640 - (50 + Rad), 50)
    for i in range(1, Poly):
        self.Course.addPoint(Rad * math.cos(-np.pi / 2.0 + np.pi * i / Poly) + 640 - (50 + Rad),
                             Rad * math.sin(-np.pi / 2.0 + np.pi * i / Poly) + 50 + Rad)
    self.Course.addPoint(240, 50 + Rad * 2)
    for i in range(1, Poly):
        self.Course.addPoint(Rad * math.cos(np.pi / 2.0 + np.pi * i / Poly) + (50 + Rad),
                             Rad * math.sin(np.pi / 2.0 + np.pi * i / Poly) + 50 + Rad)
    self.Course.addPoint(240, 50)

    # Outer boundary box
    self.BBox = Walls(640, 479, 0, 479)
    self.BBox.addPoint(0, 0)
    self.BBox.addPoint(640, 0)
    self.BBox.addPoint(640, 479)

    # Mono sensor line follower
    self.A = Agent((640, 480), 240, 49)

    # Action space: left wheel speed, right wheel speed
    # Observation space: detect line (True, False)
    self.action_space = spaces.Box(np.array([-1., -1.]), np.array([+1., +1.]))
    self.observation_space = spaces.Discrete(1)

    self._seed()
    self.reset()
    self.viewer = None

    self.steps_beyond_done = None

    self._configure()
def __init__(self, gravity=9.8, masscart=1.0, masspole=0.1, length=.5, force_mag=10.0):
    self.gravity = gravity
    self.masscart = masscart
    self.masspole = masspole
    self.total_mass = (self.masspole + self.masscart)
    self.length = length  # actually half the pole's length
    self.polemass_length = (self.masspole * self.length)
    self.force_mag = force_mag
    self.tau = 0.02  # seconds between state updates

    # Angle at which to fail the episode
    self.theta_threshold_radians = 12 * 2 * math.pi / 360
    self.x_threshold = 2.4

    # Angle limit set to 2 * theta_threshold_radians so failing observation is still within bounds
    high = np.array([
        self.x_threshold * 2,
        np.finfo(np.float32).max,
        self.theta_threshold_radians * 2,
        np.finfo(np.float32).max])

    self.action_space = spaces.Discrete(2)
    self.observation_space = spaces.Box(-high, high)

    self._seed()
    self.viewer = None
    self.state = None

    self.steps_beyond_done = None
def __init__(self):
    pygame.init()
    self._seed()
    self.viewer = None

    self.world = Box2D.b2World()
    self.sea_surface = None
    self.falcon_rocket = None
    self.floating_drone_ship = None
    self.particles = []

    self.prev_reward = None

    high = np.array([np.inf] * 8)  # useful range is -1 .. +1, but spikes can be higher
    self.observation_space = spaces.Box(-high, high)

    if self.continuous:
        # Action is two floats [main engine, left-right engines].
        # Main engine: -1..0 off, 0..+1 throttle from 50% to 100% power. Engine can't work with less than 50% power.
        # Left-right: -1.0..-0.5 fire left engine, +0.5..+1.0 fire right engine, -0.5..0.5 off
        self.action_space = spaces.Box(-1, +1, (2,))
    else:
        # Nop, fire left engine, main engine, right engine
        self.action_space = spaces.Discrete(4)

    self._reset()
def __init__(self):
    self.observation_space = spaces.Discrete(NUM_CLASSES)
    self.action_space = spaces.Tuple(
        tuple(spaces.Discrete(2) for _ in range(NUM_CLASSES))
    )

    # Total number of notes
    self.num_notes = 32
    self.key = C_MAJOR_KEY
def __init__(self):
    super().__init__()
    self.lua_interface_path = os.path.join(package_directory, '../lua/soccer.lua')
    self.rom_file_path = os.path.join(package_directory, '../roms/soccer.nes')
    self.actions = [
        'R', 'UR', 'DR', 'B',
        'URB', 'DRB', 'RB', 'AB',
        'RAB', 'URAB', 'DRAB'
    ]
    self.action_space = spaces.Discrete(len(self.actions))
def __init__(self, **kwargs):
    utils.EzPickle.__init__(self)
    self.curr_seed = 0
    self.screen = np.zeros((SCREEN_HEIGHT, SCREEN_WIDTH, 3), dtype=np.uint8)
    self.closed = False
    self.can_send_command = True
    self.command_cond = Condition()
    self.viewer = None
    self.reward = 0
    episode_time_length_secs = 7
    frame_skip = 5
    fps = 60
    self.episode_length = episode_time_length_secs * fps / frame_skip

    self.actions = [
        'U', 'D', 'L', 'R',
        'UR', 'DR', 'URA', 'DRB',
        'A', 'B', 'RB', 'RA']
    self.action_space = spaces.Discrete(len(self.actions))
    self.frame = 0

    # for communication with emulator
    self.pipe_in = None
    self.pipe_out = None
    self.thread_incoming = None

    self.rom_file_path = None
    self.lua_interface_path = None
    self.emulator_started = False

## ---------- gym.Env methods -------------
def __init__(self, nS, nA, P, isd):
    self.P = P
    self.isd = isd
    self.lastaction = None  # for rendering
    self.nS = nS
    self.nA = nA

    self.action_space = spaces.Discrete(self.nA)
    self.observation_space = spaces.Discrete(self.nS)

    self._seed()
    self._reset()
def __init__(self, natural=False):
    self.action_space = spaces.Discrete(2)
    self.observation_space = spaces.Tuple((
        spaces.Discrete(32),
        spaces.Discrete(11),
        spaces.Discrete(2)))
    self._seed()

    # Flag to payout 1.5 on a "natural" blackjack win, like casino rules
    # Ref: http://www.bicyclecards.com/how-to-play/blackjack/
    self.natural = natural
    # Start the first game
    self._reset()

    # Number of actions
    self.nA = 2
def __init__(self):
    super(ConstantEnv, self).__init__()
    self.action_space = spaces.Discrete(2)
    self.observation_space = spaces.Discrete(2)
def __init__(self):
    super(RandomInputConstantGoalEnv, self).__init__()
    self.action_space = spaces.Discrete(2)
    self.observation_space = spaces.Discrete(2)
def __init__(self):
    super(DependentEnv, self).__init__()
    self.action_space = spaces.Discrete(2)
    self.observation_space = spaces.Discrete(2)
def space_shape(space):
    """Return the shape of the tensor expected for a given space."""
    if isinstance(space, spaces.Discrete):
        return [space.n]
    else:
        return space.shape
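A brief hedged check of the helper above, assuming the classic gym Box(low, high, shape) constructor:

from gym import spaces

assert space_shape(spaces.Discrete(5)) == [5]                 # length of a one-hot encoding
assert space_shape(spaces.Box(-1.0, 1.0, (3, 4))) == (3, 4)   # Box forwards its own shape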
def __init__(self, env, keys):
    super(DiscreteToFixedKeysVNCActions, self).__init__(env)
    self._keys = keys
    self._generate_actions()
    self.action_space = spaces.Discrete(len(self._actions))
    self.key_state = FixedKeyState(keys)
def __init__(self, env, actions):
    super().__init__(env)
    acsp = self.env.action_space
    assert isinstance(acsp, Box), "action space not continuous"

    self.actions = np.array(actions)
    assert self.actions.shape[1:] == acsp.shape, "shape of actions does not match action space"

    self.action_space = Discrete(self.actions.shape[0])
def __init__(self, sha256list, random_sample=True, maxturns=3, output_path='evaded/blackbox/', cache=False):
    self.cache = cache
    self.available_sha256 = sha256list
    self.action_space = spaces.Discrete(len(ACTION_LOOKUP))
    self.maxturns = maxturns
    self.feature_extractor = pefeatures.PEFeatureExtractor()
    self.random_sample = random_sample
    self.sample_iteration_index = 0
    self.output_path = os.path.join(
        os.path.dirname(
            os.path.dirname(
                os.path.dirname(
                    os.path.abspath(__file__)))),
        output_path)
    if not os.path.exists(self.output_path):
        os.makedirs(self.output_path)
    self.history = OrderedDict()
    self.samples = {}
    if self.cache:
        for sha256 in self.available_sha256:
            try:
                self.samples[sha256] = interface.fetch_file(sha256)
            except interface.FileRetrievalFailure:
                print("failed fetching file")
                continue  # try a new sha256...this one can't be retrieved from storage
    self._reset()
def __init__(self, sha256list, random_sample=True, maxturns=3, output_path='evaded/score/', cache=False):
    self.cache = cache
    self.available_sha256 = sha256list
    self.action_space = spaces.Discrete(len(ACTION_LOOKUP))
    self.maxturns = maxturns
    self.feature_extractor = pefeatures.PEFeatureExtractor()
    self.random_sample = random_sample
    self.sample_iteration_index = 0
    self.output_path = os.path.join(
        os.path.dirname(
            os.path.dirname(
                os.path.dirname(
                    os.path.abspath(__file__)))),
        output_path)
    if not os.path.exists(self.output_path):
        os.makedirs(self.output_path)
    self.history = OrderedDict()
    self.samples = {}
    if self.cache:
        for sha256 in self.available_sha256:
            try:
                self.bytez = interface.fetch_file(sha256)
            except interface.FileRetrievalFailure:
                print("failed fetching file")
                continue  # try a new sha256...this one can't be retrieved from storage
    self._reset()  # self.original_score, self.bytez and self.observation_space get set here