我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用copy.deepcopy()。
def load_previous(self, path=None):
    """Load previous copy of config from disk.

    In normal usage you don't need to call this method directly - it
    is called automatically at object initialization.

    :param path: File path from which to load the previous config.
        If `None`, config is loaded from the default location. If
        `path` is specified, subsequent `save()` calls will write to
        the same path.
    """
    self.path = path or self.path
    with open(self.path) as f:
        self._prev_dict = json.load(f)
    # Back-fill keys from the previous snapshot without clobbering
    # anything already present; deep-copy so we never hand out live
    # references into _prev_dict.
    previous = copy.deepcopy(self._prev_dict)
    for key, value in previous.items():
        if key in self:
            continue
        self[key] = value
def handle_results_collected(self, signal, sender, results, context, **kw):
    """Signal receiver: fold a batch of `results` from `sender` into
    `self.data`, keeping only the best (largest) value per result name.

    `context` is deep-copied into each stored Result so later mutation
    of the caller's context dict cannot rewrite stored history.
    """
    name_value_pairs = list(map(self.to_name_value_pair, results))
    self.ensure_unique_names(name_value_pairs)

    def get_data(*parts):
        # Walk/create the nested dict path `parts` under self.data.
        d = self.data
        for p in parts:
            d.setdefault(p, {})
            d = d[p]
        return d

    def handle_result(name, result):
        # Keep the max value seen so far for this (sender, name) slot.
        d = get_data(sender.id_, sender.type_name)
        current = d.get(name, None)
        if current is None or current.value < result:
            # TODO: once serialization, no need to deepcopy
            d[name] = Result(value=result, context=copy.deepcopy(context))

    for name, result in name_value_pairs:
        handle_result(name, result)
def __exit__(self, exc_type, exc_val, exc_tb):
    """Context-manager exit: broadcast collected results, then re-raise
    any exception a receiver returned (send_robust swallows them).

    Receiver errors are only surfaced when the `with` body itself exited
    cleanly (exc_type is None), so an original error is never masked.
    """
    self.before_exit()
    # send_robust returns (receiver, response) pairs; a response that is
    # an exception means that receiver failed.
    signal_responses = results_collected.send_robust(
        sender=self,
        results=self.get_results_to_send(),
        context=copy.deepcopy(context.current.data))
    if exc_type is None:
        for (receiver, response) in signal_responses:
            if isinstance(response, BaseException):
                # Preserve the receiver's original traceback text in the
                # re-raised exception message.
                orig_tb = ''.join(
                    traceback.format_tb(response.__traceback__))
                error_msg = '{}{}: {}'.format(
                    orig_tb, type(response).__name__, str(response))
                if hasattr(response, 'clone_with_more_info'):
                    # Exception type knows how to embed the traceback itself.
                    new_exc = response.clone_with_more_info(
                        orig_tb=orig_tb)
                else:
                    new_exc = type(response)(error_msg)
                raise new_exc
def list_kuryr_opts():
    """Return a list of oslo_config options available in Kuryr service.

    Each element of the list is a tuple. The first element is the name of
    the group under which the list of elements in the second element will
    be registered. A group name of None corresponds to the [DEFAULT]
    group in config files.

    This function is also discoverable via the 'kuryr' entry point under
    the 'oslo_config.opts' namespace.

    The purpose of this is to allow tools like the Oslo sample config
    file generator to discover the options exposed to users by Kuryr.

    :returns: a list of (group_name, opts) tuples
    """
    # Deep-copy our own option objects so consumers cannot mutate the
    # registered originals, then append options from the libraries.
    own_opts = [(group, copy.deepcopy(opts)) for group, opts in _kuryr_k8s_opts]
    return own_opts + lib_opts.list_kuryr_opts() + _options.list_opts()
def get_item_label(self, item):
    """
    retrieve item label property

    :param item: single item in JSON format
    :return: label property
    """
    label = EMPTY_RESPONSE
    item_type = get_json_property(item, TYPE)
    if item_type == 'smartfield':
        # smartfield labels are synthesised from the condition statement
        # plus a '|'-delimited list of response labels.
        custom_response_id_to_label_map = self.audit_custom_response_id_to_label_map()
        conditional_id = get_json_property(item, 'options', 'condition')
        if conditional_id:
            # deepcopy so the '+=' below never mutates the shared map entry;
            # fall back to EMPTY_RESPONSE when the id is unknown.
            label = copy.deepcopy(smartfield_conditional_id_to_statement_map.get(conditional_id)) or EMPTY_RESPONSE
        for value in get_json_property(item, 'options', 'values'):
            label += '|'
            # Prefer standard response labels, then audit-specific custom
            # labels, else the raw value.
            if value in standard_response_id_map.keys():
                label += standard_response_id_map[value]
            elif value in custom_response_id_to_label_map.keys():
                label += custom_response_id_to_label_map[value]
            else:
                label += str(value)
        label += '|'
        return label
    else:
        # Non-smartfield items carry their label directly.
        return get_json_property(item, LABEL)
def test_file_load(self):
    """
    Load the simple filters by file
    """
    entry = copy(self.template_entry)
    # Load filter definitions from a single .nrf file.
    fb = NNTPFilterBase(paths=join(self.var_dir, 'simple.nrf'))
    # Our hash will start at 0 (Zero)
    assert len(fb._regex_hash) == 0
    # But now we meet our score
    entry['subject'] = 'A great video called "blah.avi"'
    assert fb.blacklist(**entry) == False
    entry['subject'] = 'A malicious file because it is "blah.avi.exe"'
    assert fb.blacklist(**entry) == True
    # Now load the directory; it should just find the same nrf file.
    fbd = NNTPFilterBase(paths=self.var_dir)
def test_scoring_image_files(self):
    """
    Test that we correctly score image files
    """
    sf = NNTPSimpleFilter()
    entry = copy(self.template_entry)

    # Every recognised image extension should earn the same score.
    score = 15
    image_extensions = ('jpg', 'jpeg', 'gif', 'png', 'bmp')
    for ext in image_extensions:
        entry['subject'] = 'What.A.Great.Image (1/1) ' +\
            '"what.a.great.image.%s" Yenc (1/1)' % ext
        assert sf.score(**entry) == score
def _get_tags(data):
    """Flatten the nested 'firewall' audit structure into a dict keyed by
    tag, each value a list of merged audit records.

    NOTE(review): uses dict.iteritems() — Python 2 only code.
    """
    ret = {}
    for toplist, toplevel in data.get('firewall', {}).iteritems():
        for audit_dict in toplevel:
            for audit_id, audit_data in audit_dict.iteritems():
                tags_dict = audit_data.get('data', {})
                # pop mutates tags_dict in-place; deepcopy below keeps the
                # returned records independent of the input structure.
                tag = tags_dict.pop('tag')
                if tag not in ret:
                    ret[tag] = []
                formatted_data = copy.deepcopy(tags_dict)
                formatted_data['type'] = toplist
                formatted_data['tag'] = tag
                formatted_data['module'] = 'firewall'
                # Merge the remaining audit fields, then drop the raw
                # 'data' sub-dict that update() copied in.
                formatted_data.update(audit_data)
                formatted_data.pop('data')
                ret[tag].append(formatted_data)
    return ret
def test_update_text(self):
    """Re-importing a card whose text changed updates the stored text."""
    with open(os.path.join(self.FIXTURES_DIR, 'eyes_in_the_skies.json')) as handle:
        final_data = json.load(handle)
    original_text = final_data['RTR']['cards'][0]['originalText']
    final_text = final_data['RTR']['cards'][0]['text']

    # Deep-copy the fixture and munge it back into its pre-update state.
    original_data = copy.deepcopy(final_data)
    original_data['RTR']['cards'][0]['text'] = original_text

    # Import the original data and verify the original text landed.
    parse_data(original_data, ['RTR'])
    eyes_in_the_skies = Card.objects.first()
    self.assertEqual(eyes_in_the_skies.text, original_text)

    # Import the final, updated data; the stored text must follow.
    parse_data(final_data, ['RTR'])
    eyes_in_the_skies.refresh_from_db()
    self.assertEqual(eyes_in_the_skies.text, final_text)
def test_update_types(self):
    """Re-importing with a changed subtype replaces (and deletes) the old one."""
    with open(os.path.join(self.FIXTURES_DIR, 'jackal_pup.json')) as handle:
        final_data = json.load(handle)

    # Deep-copy the fixture and swap in the pre-update subtype.
    original_data = copy.deepcopy(final_data)
    original_subtype = 'Hound'
    original_data['TMP']['cards'][0]['subtypes'] = [original_subtype]

    # Import the original data and verify the old subtype is present.
    parse_data(original_data, ['TMP'])
    jackal_pup = Card.objects.first()
    self.assertEqual(jackal_pup.subtypes.count(), 1)
    self.assertEqual(jackal_pup.subtypes.first().name, original_subtype)

    # Import the final, updated data; the subtype must be replaced.
    parse_data(final_data, ['TMP'])
    jackal_pup.refresh_from_db()
    self.assertEqual(jackal_pup.subtypes.count(), 1)
    self.assertEqual(jackal_pup.subtypes.first().name, 'Jackal')
    # The Hound subtype has been deleted.
    self.assertFalse(CardSubtype.objects.filter(name=original_subtype).exists())
def test_update_loyalty(self):
    """
    Simulates the upgrade process from version 0.2 to version 0.4.
    """
    with open(os.path.join(self.FIXTURES_DIR, 'vraska_the_unseen.json')) as handle:
        final_data = json.load(handle)

    # Deep-copy the fixture and strip the loyalty field to mimic v0.2 data.
    original_data = copy.deepcopy(final_data)
    del original_data['RTR']['cards'][0]['loyalty']

    # Import the original data; loyalty should be absent.
    parse_data(original_data, ['RTR'])
    vraska = Card.objects.first()
    self.assertIsNone(vraska.loyalty)

    # Import the final, updated data; loyalty should now be populated.
    parse_data(final_data, ['RTR'])
    vraska.refresh_from_db()
    self.assertEqual(vraska.loyalty, 5)
def test_pytest_report_header():
    """Exercise pytest_report_header() under each tool-availability flag
    combination, restoring the module-level `settings` afterwards.
    """
    # Snapshot so mutations below can be undone at the end.
    old_settings = deepcopy(settings)
    settings['USE_IMAGEMAGICK'] = True
    settings['USE_PERCEPTUALDIFF'] = True
    assert 'ImageMagick' in pytest_report_header(None)
    settings['USE_IMAGEMAGICK'] = False
    assert 'perceptualdiff' in pytest_report_header(None)
    settings['USE_PERCEPTUALDIFF'] = False
    # With neither tool enabled the header function is expected to raise.
    with pytest.raises(Exception) as e:
        pytest_report_header(None)
    # Restore the shared settings dict for other tests.
    settings.update(old_settings)
def test_create_router_failure(self):
    """A bad-gateway response from the AC REST backend must surface as a
    MechanismDriverError from create_router."""
    router_info = copy.deepcopy(fake_router_object)
    router_info['router'].update({'status': 'ACTIVE', 'id': fake_router_uuid})
    context = mock.Mock(current=fake_router_object)
    # Simulated 502 from the controller.
    response = self._create_rest_response(requests.codes.bad_gateway)
    with mock.patch.object(ac_rest.RestClient, 'process_request', return_value=response):
        with mock.patch.object(L3_NAT_db_mixin, 'create_router', return_value=fake_router_db):
            acl3router = HuaweiACL3RouterPlugin()
            self.assertRaises(ml2_exc.MechanismDriverError,
                              acl3router.create_router, context, router_info)
def test_add_router_interface_key_error_exception(self):
    """add_router_interface must raise KeyError when 'port_id' is missing."""
    router_info = copy.deepcopy(fake_router_object)
    router_info['router'].update({'status': 'ACTIVE', 'id': fake_router_uuid})
    context = mock.Mock(current=fake_router_object)
    # Build the payload, then strip the required key to trigger the error.
    interface_info = {'port_id': fake_port_id}
    del interface_info['port_id']
    with mock.patch.object(L3_NAT_db_mixin, 'get_router',
                           return_value=fake_router_db), \
            mock.patch.object(L3_NAT_with_dvr_db_mixin, 'add_router_interface',
                              return_value=interface_info):
        acl3router = HuaweiACL3RouterPlugin()
        self.assertRaises(KeyError, acl3router.add_router_interface,
                          context, fake_router_db['id'], interface_info)
def get_problem_instance(pid, tid):
    """
    Returns the problem instance dictionary that can be displayed to the user.

    Args:
        pid: the problem id
        tid: the team id
    Returns:
        The problem instance
    """
    # Deep-copy so we can freely mutate the view without touching the
    # cached problem object.
    problem_view = deepcopy(get_problem(pid=pid, tid=tid))
    instance = get_instance_data(pid, tid)

    problem_view['solves'] = api.stats.get_problem_solves(pid=pid)
    # Drop the raw instance list; the merged instance fields replace it.
    del problem_view["instances"]
    problem_view.update(instance)
    return problem_view
def setType(props, prop_id, _type, typeCheck=True):
    '''
    change the property type for the element in the props sequence with id of prop_id
    This method returns a copy of props (does not modify the props input)
    '''
    if typeCheck:
        __typeCheck(props)
    # Reject unknown type names up front.
    # NOTE(review): has_key() is Python 2 only.
    if not properties.getTypeMap().has_key(_type):
        raise BadValue('Type "'+_type+'" does not exist')
    # No-op if the property already has the requested type; the original
    # sequence is returned unchanged in that case.
    if _type == getType(props, prop_id, False):
        return props
    prop_idx = __getPropIdx(props, prop_id)
    ret_props = copy.deepcopy(props)
    if props[prop_idx].value._t._k == CORBA.tk_sequence:
        # Sequence property: rebuild the TypeCode descriptor with the new
        # element type code, keeping the outer sequence structure.
        ret_props[prop_idx].value._t._d = (props[prop_idx].value._t._d[0],
                                           tvCode(_type),
                                           props[prop_idx].value._t._d[2])
    else:
        # Simple property: convert the Any value to the new type.
        ret_props[prop_idx].value = properties.to_tc_value(props[prop_idx].value, _type)
    return ret_props
def sendChangedPropertiesEvent(self):
    """Emit a property-change event for every eventable property that is
    new or whose value changed since the last event.

    Side effect: refreshes self._last_property_event_state with a deep
    copy of each current value so later in-place mutation of the live
    value cannot corrupt the comparison baseline.
    """
    eventable_ids = []
    for prop_id in self._component._props.keys():
        prop_def = self._component._props.getPropDef(prop_id)
        if prop_def.isSendEventChange():
            newValue = self._component._props[prop_id]
            try:
                oldValue = self._last_property_event_state[prop_id]
                self._last_property_event_state[prop_id] = copy.deepcopy(newValue)
            except KeyError:
                # First time we see this property: record it and event it.
                self._component._log.debug("Issuing event for the first time %s", prop_id)
                self._last_property_event_state[prop_id] = copy.deepcopy(newValue)
                eventable_ids.append(prop_id)
            else:
                # NOTE(review): the debug text suggests compareValues()
                # returns True when the values DIFFER — confirm against
                # the PropertyDef implementation.
                if prop_def.compareValues(oldValue, newValue):
                    self._component._log.debug("Issuing event for %s (%s != %s)", prop_id, oldValue, newValue)
                    eventable_ids.append(prop_id)
    self._component._log.debug("Eventing for properties %s", eventable_ids)
    self.sendPropertiesEvent(eventable_ids)
def prepare(self):
    """Load ground-truth labels, optionally append horizontally flipped
    copies, shuffle, and cache on self.gt_labels.

    NOTE(review): uses xrange — Python 2 only code.
    """
    gt_labels = self.load_labels()
    if self.flipped:
        print('Appending horizontally-flipped training examples ...')
        gt_labels_cp = copy.deepcopy(gt_labels)
        for idx in range(len(gt_labels_cp)):
            gt_labels_cp[idx]['flipped'] = True
            # Mirror the label grid along the horizontal axis.
            gt_labels_cp[idx]['label'] = gt_labels_cp[idx]['label'][:, ::-1, :]
            for i in xrange(self.cell_size):
                for j in xrange(self.cell_size):
                    # Cell contains an object (confidence flag == 1):
                    # mirror its x-coordinate as well.
                    if gt_labels_cp[idx]['label'][i, j, 0] == 1:
                        gt_labels_cp[idx]['label'][i, j, 1] = \
                            self.image_size - 1 - gt_labels_cp[idx]['label'][i, j, 1]
        gt_labels += gt_labels_cp
    np.random.shuffle(gt_labels)
    self.gt_labels = gt_labels
    return gt_labels
def clustering_plot_func(chart, sample_properties, sample_data, plot_func, args=None, kwargs=None):
    """Build one chart per clustering in the sample's analysis.

    :param chart: base chart dict; deep-copied for each clustering.
    :param sample_properties: sample metadata; must contain 'genomes'.
    :param sample_data: holds .analysis and .original_cluster_sizes.
    :param plot_func: callable producing a chart dict (or None to skip).
    :param args: extra positional args for plot_func (default: none).
    :param kwargs: extra keyword args for plot_func (default: none).
    :returns: list of chart dicts, or None for multi-genome samples or
        when no analysis is available.

    Bugfix: the defaults were the mutable `args=[]` / `kwargs={}`; since
    this function writes into `kwargs`, state leaked between calls (and
    into a caller-supplied dict).  Sentinels + a local copy fix both.
    """
    if args is None:
        args = []
    # Copy so neither the shared default nor a caller's dict is mutated.
    kwargs = {} if kwargs is None else dict(kwargs)
    # Clustering plots are only produced for single-genome samples.
    if len(sample_properties['genomes']) > 1:
        return None
    analysis = sample_data.analysis
    if analysis is None:
        return None
    new_charts = []
    for clustering_key, clustering in analysis.clusterings.iteritems():
        kwargs['clustering'] = clustering
        kwargs['original_cluster_sizes'] = sample_data.original_cluster_sizes[clustering_key]
        kwargs['diff_expr'] = analysis.differential_expression[clustering_key]
        new_chart = plot_func(copy.deepcopy(chart), *args, **kwargs)
        if new_chart is not None:
            new_chart['filters'] = {ws_gex_constants.CLUSTERS_FILTER_TITLE: clustering.description}
            new_charts.append(new_chart)
    return new_charts
def build_charts(sample_properties, chart_dicts, sample_data, module=None):
    """Instantiate chart dicts by dispatching each one's 'function' name
    to a callable looked up in `module` (if given) then this module.

    Returns (charts, filters).
    NOTE(review): `module` is looked up with .get(), so callers appear to
    pass a dict-like namespace (e.g. another module's globals()) — confirm.
    """
    modules = [module, globals()] if module else [globals()]
    filters = make_chart_filters(sample_properties, sample_data.analysis)
    charts = []
    for chart_dict in chart_dicts:
        # Deep-copy before popping keys so the caller's template survives.
        chart_dict = copy.deepcopy(chart_dict)
        function = chart_dict.pop('function')
        # First namespace that knows the function wins.
        # (The loop variable shadows the `module` parameter, which is no
        # longer needed at this point.)
        for module in modules:
            f = module.get(function)
            if f is not None:
                break
        kwargs = chart_dict.pop('kwargs', {})
        new_chart_obj = f(chart_dict, sample_properties, sample_data, **kwargs)
        if new_chart_obj is None:
            continue
        # A chart function may return a single chart or a list of charts.
        new_charts = new_chart_obj if isinstance(new_chart_obj, list) else [new_chart_obj]
        charts.extend(new_charts)
    return charts, filters
def build_web_summary_json(sample_properties, sample_data, pipeline):
    """Assemble the web-summary view dict for `pipeline`: sample
    properties plus tables, alarms, charts, filters and template info.

    Empty sections are omitted from the returned view.
    """
    # Start from a private copy of the sample properties.
    view = copy.deepcopy(sample_properties)
    metrics, alarms, charts, all_prefixes = get_constants_for_pipeline(pipeline)
    # build_tables may also rewrite the alarm list.
    tables, alarms = build_tables(sample_properties, metrics, alarms, sample_data,
                                  all_prefixes=all_prefixes)
    if tables:
        view['tables'] = tables
    if alarms:
        view['alarms'] = alarms
    charts, filters = build_charts(sample_properties, charts, sample_data=sample_data)
    if charts:
        view['charts'] = charts
    if filters:
        view['filters'] = filters
    # Selected metrics that the web summary template needs
    info = build_info_dict(sample_properties, sample_data, pipeline)
    if info:
        view['info'] = info
    return view
def collapse_recursive(self, buf, level):
    """Merge `buf` into the buffer at `level`; if the next level already
    holds data, keep merging upward recursively.

    Recursion stops as soon as the merged result can be stored into an
    empty slot at level+1 (deep-copied so the temporary is not shared).
    """
    # print 'level:\t', level
    merged = self.collapse(self.buffer[level], buf)
    # Grow the buffer ladder when we are at the top.
    if level + 1 >= len(self.buffer):
        self.buffer.append([])
        self.b += 1
    using_tmp_merge = True
    if len(self.buffer[level + 1]) == 0:
        # Empty slot: park the merged result here and stop.
        self.buffer[level + 1] = copy.deepcopy(merged)
        using_tmp_merge = False
    if not using_tmp_merge:
        return
    # Slot occupied: carry the merge one level higher.
    self.collapse_recursive(merged, level + 1)
def _deepcopy_tuple(x, memo): y = [] for a in x: y.append(deepcopy(a, memo)) d = id(x) try: return memo[d] except KeyError: pass for i in range(len(x)): if x[i] is not y[i]: y = tuple(y) break else: y = x memo[d] = y return y
def _deepcopy_inst(x, memo): if hasattr(x, '__deepcopy__'): return x.__deepcopy__(memo) if hasattr(x, '__getinitargs__'): args = x.__getinitargs__() args = deepcopy(args, memo) y = x.__class__(*args) else: y = _EmptyClass() y.__class__ = x.__class__ memo[id(x)] = y if hasattr(x, '__getstate__'): state = x.__getstate__() else: state = x.__dict__ state = deepcopy(state, memo) if hasattr(y, '__setstate__'): y.__setstate__(state) else: y.__dict__.update(state) return y
def __deepcopy__(self, memo): """Return a deep copy of a set; used by copy module.""" # This pre-creates the result and inserts it in the memo # early, in case the deep copy recurses into another reference # to this same set. A set can't be an element of itself, but # it can certainly contain an object that has a reference to # itself. from copy import deepcopy result = self.__class__() memo[id(self)] = result data = result._data value = True for elt in self: data[deepcopy(elt, memo)] = value return result # Standard set operations: union, intersection, both differences. # Each has an operator version (e.g. __or__, invoked with |) and a # method version (e.g. union). # Subtle: Each pair requires distinct code so that the outcome is # correct when the type of other isn't suitable. For example, if # we did "union = __or__" instead, then Set().union(3) would return # NotImplemented instead of raising TypeError (albeit that *why* it # raises TypeError as-is is also a bit subtle).
def _update_entire_object(self):
    """POST a new object or PUT the full current object to the API,
    wrapping the payload under self.info_key.

    Create vs. update is chosen by whether the primary key is dirty or
    the object has no unique id yet.
    """
    if self.__class__.primary_key in self._dirty_attributes.keys() or self._model_unique_id is None:
        # Create path: send a copy, optionally stripped of the primary key.
        new_object_info = deepcopy(self._info)
        try:
            if not self._new_object_needs_primary_key:
                del(new_object_info[self.__class__.primary_key])
        except Exception:
            # Key may simply be absent; best-effort removal.
            pass
        log.debug("Creating a new {0:s} object".format(self.__class__.__name__))
        ret = self._cb.api_json_request(self.__class__._new_object_http_method, self.urlobject,
                                        data={self.info_key: new_object_info})
    else:
        # Update path: send the full current info dict.
        log.debug("Updating {0:s} with unique ID {1:s}".format(self.__class__.__name__,
                                                               str(self._model_unique_id)))
        ret = self._cb.api_json_request(self.__class__._change_object_http_method,
                                        self._build_api_request_uri(),
                                        data={self.info_key: self._info})
    return self._refresh_if_needed(ret)
def _update_object(self):
    """POST a new object or PUT the current object to the API, sending
    the info dict directly (no info_key wrapper — contrast with
    _update_entire_object).
    """
    if self.__class__.primary_key in self._dirty_attributes.keys() or self._model_unique_id is None:
        # Create path: send a copy, optionally stripped of the primary key.
        new_object_info = deepcopy(self._info)
        try:
            if not self._new_object_needs_primary_key:
                del(new_object_info[self.__class__.primary_key])
        except Exception:
            # Key may simply be absent; best-effort removal.
            pass
        log.debug("Creating a new {0:s} object".format(self.__class__.__name__))
        ret = self._cb.api_json_request(self.__class__._new_object_http_method, self.urlobject,
                                        data=new_object_info)
    else:
        log.debug("Updating {0:s} with unique ID {1:s}".format(self.__class__.__name__,
                                                               str(self._model_unique_id)))
        http_method = self.__class__._change_object_http_method
        ret = self._cb.api_json_request(http_method,
                                        self._build_api_request_uri(http_method=http_method),
                                        data=self._info)
    return self._refresh_if_needed(ret)
def nested_derivations(self, style):
    # type: (Style) -> List[Style]
    """Derive new candidate styles from `style` by switching supported
    options to values that unlock nested sub-options."""
    candidate_options = [('BreakBeforeBraces', 'Custom')]
    derived = []
    for option_name, target_value in candidate_options:
        option_def = styledef_option(self.styledefinition, option_name)
        # We can only use this nested option if the clang version in use
        # supports it.
        if option_def is None:
            continue
        if target_value not in option_configs(option_def):
            continue
        # Already set to this value: nothing new to derive.
        if style.get(option_name) == target_value:
            continue
        derived_style = Style(copy.deepcopy(style))
        set_option(derived_style, option_name, target_value)
        derived.append(derived_style)
    return derived
def shift_dataset(m,boundarynoise):
    """Circularly shift the non-zero sub-matrix of symmetric matrix `m`
    by `boundarynoise` positions along both axes, then scatter the
    result back into a zero matrix of the original shape (symmetrized).

    NOTE(review): Python 2 print statements; debug output left in place.
    """
    if boundarynoise==0:
        return m
    # Rows (== columns, matrix assumed symmetric) with any signal.
    nonzero_rows=np.where(m.any(axis=1))[0]
    small_m=copy.deepcopy(m)
    small_m=small_m[nonzero_rows,:]
    small_m=small_m[:,nonzero_rows]
    print small_m
    print 'roll'
    small_m=np.roll(small_m,boundarynoise,axis=0)
    print small_m
    print 'roll2'
    small_m=np.roll(small_m,boundarynoise,axis=1)
    print small_m
    # Write the shifted compact matrix back to the original row/col
    # positions, mirroring across the diagonal.
    outm=np.zeros(m.shape)
    for i_idx in range(len(nonzero_rows)):
        i=nonzero_rows[i_idx]
        for j_idx in range(i_idx,len(nonzero_rows)):
            j=nonzero_rows[j_idx]
            outm[i,j]=small_m[i_idx,j_idx]
            outm[j,i]=outm[i,j]
    return outm
def test_get_pipeline_id(mock_get_properties, mock_get_details, mock_boto3):
    """Tests getting the pipeline ID from boto3"""
    # Paginated list_pipelines response: the pipeline matching the
    # generated name ("Test Pipeline") should be selected.
    test_pipelines = [{
        'pipelineIdList': [
            {"name": "Test Pipeline", "id": "1234"},
            {"name": "Other", "id": "5678"},
        ],
        "hasMoreResults": False,
    }]
    generated = {"project": "test"}
    # Copy so the shared fixture is never mutated by the code under test.
    properties = copy.deepcopy(TEST_PROPERTIES)
    mock_get_details.return_value.data = generated
    mock_get_properties.return_value = properties
    mock_boto3.return_value.get_paginator.return_value.paginate.return_value = test_pipelines
    dp = AWSDataPipeline(app='test_app', env='test_env', region='us-east-1', prop_path='other')
    dp.get_pipeline_id()
    assert dp.pipeline_id == '1234'
def _merge_mapping(a, b):
    """
    MERGE TWO MAPPINGS, a TAKES PRECEDENCE
    """
    for name, b_details in b.items():
        a_details = a[literal_field(name)]
        # Normalize: anything with sub-properties but no explicit type is
        # an object.
        if a_details.properties and not a_details.type:
            a_details.type = "object"
        if b_details.properties and not b_details.type:
            b_details.type = "object"
        if a_details:
            # Both sides know this field: reconcile the type via the
            # merge table, and recurse into structured types.
            a_details.type = _merge_type[a_details.type][b_details.type]
            if b_details.type in ES_STRUCT:
                _merge_mapping(a_details.properties, b_details.properties)
        else:
            # Only b has it: take b's definition (deep-copied).
            a[literal_field(name)] = deepcopy(b_details)
    return a
def floyd(matrix):
    """
    Floyd's algorithm, straight from a textbook.

    Floyd's algorithm transforms a weight matrix into a matrix of
    shortest paths, such that the shortest path from node M to node N
    is equal to matrix[m][n].

    The input matrix is not modified.

    :return: An array of shortest-path distance calculations.
    """
    size = len(matrix)
    # Work on a private copy; relax every pair through every pivot node.
    dist = deepcopy(matrix)
    for via in range(size):
        for src in range(size):
            for dst in range(size):
                candidate = dist[src][via] + dist[via][dst]
                if candidate < dist[src][dst]:
                    dist[src][dst] = candidate
    return dist
def configure_logging(debug=False, verbose=True, stderr=True):
    """Install the package logging configuration.

    `verbose` raises handler levels to INFO, `debug` to DEBUG (debug
    wins when both are set); the stderr handler's formatter follows the
    same precedence.  `stderr` attaches the stderr handler to the
    package logger.  LOG_CONFIG itself is never mutated.
    """
    config = copy.deepcopy(LOG_CONFIG)
    for handler in config["handlers"].values():
        if verbose:
            handler["level"] = "INFO"
        if debug:
            handler["level"] = "DEBUG"
    # debug takes precedence over verbose for the formatter choice.
    if debug:
        config["handlers"]["stderr"]["formatter"] = "debug"
    elif verbose:
        config["handlers"]["stderr"]["formatter"] = "verbose"
    if stderr:
        config["loggers"][LOG_NAMESPACE]["handlers"].append("stderr")
    logging.config.dictConfig(config)
def agent_place(self, state):
    """Transition function for the 'place' action: returns a new state
    (the input state is never mutated) where the agent either neutralizes
    the lava cell in front of it or drops a block there.
    """
    next_state = copy.deepcopy(state)
    agent = next_state.get_first_obj_of_class("agent")
    agent.set_attribute("has_block", 0)
    # Target cell is one step along the agent's facing direction.
    next_x = agent.get_attribute("x") + agent.get_attribute("dx")
    next_y = agent.get_attribute("y") + agent.get_attribute("dy")
    if self._is_lava_state_action(next_state, "forward"):
        # Find the index of the lava object at the target cell and
        # remove it (the placed block neutralizes the lava).
        lava_remove = 0
        for l in next_state.get_objects_of_class("lava"):
            if next_x == l.get_attribute("x") and next_y == l.get_attribute("y"):
                break
            lava_remove += 1
        next_state.get_objects_of_class("lava").pop(lava_remove)
    else:
        # No lava ahead: create a new block object at the target cell.
        new_block = {"x": next_x, "y": next_y}
        new_block_obj = self._make_oomdp_objs_from_list_of_dict([new_block], "block")
        next_state.get_objects_of_class("block").append(new_block_obj[0])
    return next_state
def agent_pickup(self, state):
    '''
    Args:
        state (TaxiState)

    Returns a new state (deep copy; the input is not mutated) in which
    the agent picks up a passenger at its own location, if it is not
    already carrying one.
    '''
    next_state = copy.deepcopy(state)
    agent = next_state.get_first_obj_of_class("agent")
    # update = False
    if agent.get_attribute("has_passenger") == 0:
        # If the agent does not have a passenger.
        # NOTE(review): no break after a pickup — if several passengers
        # share the agent's cell, each gets in_taxi set. Confirm whether
        # co-located passengers can occur in this domain.
        for i, passenger in enumerate(next_state.get_objects_of_class("passenger")):
            if agent.get_attribute("x") == passenger.get_attribute("x") and \
                    agent.get_attribute("y") == passenger.get_attribute("y"):
                # Pick up passenger at agent location.
                agent.set_attribute("has_passenger", 1)
                passenger.set_attribute("in_taxi", 1)
    return next_state
def agent_dropoff(self, state):
    '''
    Args:
        state (TaxiState)

    Returns:
        (TaxiState)

    New state (deep copy; input untouched) where the carried passenger,
    if any, is dropped off at the agent's current location.
    '''
    next_state = copy.deepcopy(state)

    # Get Agent, Walls, Passengers.
    agent = next_state.get_first_obj_of_class("agent")
    # agent = OOMDPObject(attributes=agent_att, name="agent")
    passengers = next_state.get_objects_of_class("passenger")

    if agent.get_attribute("has_passenger") == 1:
        # Update if the agent has a passenger.
        for i, passenger in enumerate(passengers):
            if passenger.get_attribute("in_taxi") == 1:
                # Drop off the passenger.
                passengers[i].set_attribute("in_taxi", 0)
                agent.set_attribute("has_passenger", 0)
    return next_state
def sample_models(self, nsamples):
    """Randomly sample `nsamples` fully-specified models from the search
    space rooted at self.b_search.

    Returns:
        (samples, choice_hists): the specified model objects and, for
        each, the list of random choice indices that produced it.

    NOTE(review): uses xrange — Python 2 only code.
    """
    b = self.b_search
    samples = []
    choice_hists = []
    for _ in xrange(nsamples):
        # Fresh copy of the search root so each sample is independent.
        bk = copy.deepcopy(b)
        bk.initialize(self.in_d, Scope())
        hist = []
        # Keep making random choices until the model is fully specified.
        while( not bk.is_specified() ):
            name, vals = bk.get_choices()
            #print(name, vals)
            assert len(vals) > 1
            choice_i = np.random.randint(0, len(vals))
            bk.choose(choice_i)
            hist.append(choice_i)
        # keep the sampled model once specified.
        samples.append(bk)
        choice_hists.append(hist)
    return (samples, choice_hists)
def resource_map():
    '''
    Dynamically generate a map of resources that will be managed for a
    single hook execution.

    Returns a fresh deep copy of BASE_RESOURCE_MAP so hook code can
    mutate the result without corrupting the shared template.
    '''
    # Fix: the original assigned the copy to a local named `resource_map`,
    # shadowing this function's own name; return it directly instead.
    return deepcopy(BASE_RESOURCE_MAP)
def points(self):
    """List[tuple(int, int)]: List of all points to define regions around."""
    # The backing list is mutable: hand back a deep copy so callers
    # cannot alter our internal state through the returned value.
    snapshot = copy.deepcopy(self._points)
    return snapshot
def _load(self):
    """Load cached settings from JSON file `self._filepath`."""
    # Suppress save() side effects while we bulk-populate ourselves.
    self._nosave = True
    d = {}
    # NOTE(review): json.load's `encoding` kwarg and the 'rb' mode are
    # Python 2 era; the kwarg was removed in Python 3.9.
    with open(self._filepath, 'rb') as file_obj:
        for key, value in json.load(file_obj, encoding='utf-8').items():
            d[key] = value
    self.update(d)
    # Keep a deep-copied snapshot to detect later modifications.
    self._original = deepcopy(d)
    self._nosave = False
def _pop_token(self, lineno: int, token_value: str) -> Token: tokensline = self._lines[lineno - 1] # Pop the first token with the same name in the same line for t in tokensline: if t.name != 'STRING': line_value = t.value else: if t.value[0] == 'f' and t.value[1] in ('"', "'"): # fstring: token identify as STRING but they parse into the AST as a # collection of nodes so the token_value is different. To find the # real token position we'll search inside the fstring token value. tok_subpos = t.value.find(token_value) if tok_subpos != -1: # We don't remove the fstring token from the line in this case; other # nodes could match different parts of it newtok = deepcopy(t) newtok.start.col = t.start.col + tok_subpos return newtok raise TokenNotFoundException("Could not find token '{}' inside f-string '{}'" .format(token_value, t.value)) else: # normal string; they include the single or double quotes so we liteval line_value = literal_eval(t.value) if str(line_value) == str(token_value): tokensline.remove(t) return t raise TokenNotFoundException("Token named '{}' not found in line {}" .format(token_value, lineno))
def __deepcopy__(self, memo):
    """Custom deep copy: duplicates `floor` and `content`, but skips the
    'move' and MEMORY entries of `content` entirely (the copy does not
    carry them).
    """
    cls = self.__class__
    # Bypass __init__; register in memo before copying children so
    # self-references terminate.
    result = cls.__new__(cls)
    memo[id(self)] = result
    result.floor = deepcopy(self.floor, memo)
    if self.content == None:
        result.content = None
    else:
        result.content = dict()
        for k, v in self.content.items():
            # Deliberately excluded keys are not copied at all.
            if k not in ['move', MEMORY]:
                result.content[k] = deepcopy(v, memo)
    return result
def __deepcopy__(self, memo): cls = self.__class__ result = cls.__new__(cls) memo[id(self)] = result for k, v in self.__dict__.items(): if k != 'ai': # Don't copy the ai setattr(result, k, deepcopy(v, memo)) return result
def log_state(state):
    """Optionally display `state` (pausing for Enter) and/or record a
    snapshot of it into the replay log, per the global `config` flags.
    """
    if config.interactive:
        print(state)
        # Wait for the user before advancing.
        input()
    if config.save_replay:
        # Deep-copy so later mutations of `state` don't rewrite history.
        replay['states'].append(copy.deepcopy(state))
def __init__(self, data_, n_clusters): self.position = [] if not self.position: for i in range(len(data_)): self.position.append(random.randrange(n_clusters)) print(self.position) self.velocity = [0 for clusterPoint in self.position] self.S = [0 for clusterPoint in self.position] self.best = deepcopy(self.position) self.bestfit = 0
def test_blacklist_bad_files(self):
    """
    Blacklist testing of bad files
    """
    sf = NNTPSimpleFilter()
    entry = copy(self.template_entry)

    # hash table always starts empty and is populated on demand
    assert len(sf._regex_hash) == 0

    # Test against bad file extensions:
    # every extension below must be blacklisted.
    for e in [
        'exe', 'pif', 'application', 'gadget', 'msi', 'msp', 'com',
        'scr', 'hta', 'cpl', 'msc', 'jar', 'bat', 'vb', 'vbs',
        # Encrypted VBE Script file
        'vbe',
        # Javascript (Windows can execute these outside of browsers)
        # so treat it as bad
        'js', 'jse',
        # Windows Script File
        'ws', 'wsf',
        # Windows PowerShell Scripts
        'ps1', 'ps1xml', 'ps2', 'ps2xml', 'psc1', 'psc2',
        # Monad Scripts (later renamed to Powershell)
        'msh', 'msh1', 'msh1xml', 'msh2', 'msh2xml',
        # Windows Explorer Command file
        'scf',
        # A link to a program on your computer (usually
        # populated with some malicious content)
        'lnk',
        # A text file used by AutoRun
        'inf',
        # A windows registry file
        'reg',
    ]:
        entry['subject'] = 'What.A.Great.Show (1/1) ' +\
            '"what.a.great.show.%s" Yenc (1/1)' % e
        assert sf.blacklist(**entry) == True
def test_scoring_video_files(self):
    """
    Test that we correctly score video files
    """
    sf = NNTPSimpleFilter()
    entry = copy(self.template_entry)

    # Expected Score
    score = 25

    # Test against video files:
    for e in [
            'avi', 'mpeg', 'mpg', 'mp4', 'mov',
            'mkv', 'asf', 'ogg', 'iso', 'rm',
    ]:
        entry['subject'] = 'What.A.Great.Show (1/1) ' +\
            '"what.a.great.show.%s" Yenc (1/1)' % e
        assert sf.score(**entry) == score

    # now test that we can support .??? extensions after
    # the initial one
    # NOTE(review): placed after the loop, so `e` is the last extension
    # ('rm') — confirm whether this was meant to run for every extension.
    for i in range(1000):
        entry['subject'] = 'What.A.Great.Show (1/1) ' +\
            '"what.a.great.show.%s.%.3d" Yenc (1/1)' % (e, i)
        assert sf.score(**entry) == score
def test_scoring_compressed_files(self):
    """
    Test that we correctly score compressed files
    """
    sf = NNTPSimpleFilter()
    entry = copy(self.template_entry)

    # Expected Score
    score = 25

    # Test against video files:
    for e in ['rar', '7z', 'zip', 'tgz', 'tar.gz']:
        entry['subject'] = 'What.A.Great.Archive (1/1) ' +\
            '"what.a.great.archive.%s" Yenc (1/1)' % e
        assert sf.score(**entry) == score

    # now test that we can support .??? extensions after
    # the initial one
    # NOTE(review): placed after the loop, so `e` is the last extension
    # ('tar.gz') — confirm whether this was meant to run for every one.
    for i in range(1000):
        entry['subject'] = 'What.A.Great.Archive (1/1) ' +\
            '"what.a.great.archive.%s.%.3d" Yenc (1/1)' % (e, i)
        assert sf.score(**entry) == score

    # Test Sub Rar and Zip files (R?? and Z??)
    for e in ['r', 'z']:
        for i in range(100):
            entry['subject'] = 'What.A.Great.Archive (1/1) ' +\
                '"what.a.great.archive.%s%.2d" Yenc (1/1)' % (e, i)
            assert sf.score(**entry) == score
            # Numbered sub-extensions after the split part keep the score.
            for ii in range(1000):
                entry['subject'] = 'What.A.Great.Archive (1/1) ' +\
                    '"what.a.great.archive.%s%.2d.%.3d" Yenc (1/1)' % (
                        e, i, ii)
                assert sf.score(**entry) == score