我们从 Python 开源项目中提取了以下 19 个代码示例，用于说明如何使用 django.conf.settings.DATA_DIR。
def handle(self, *args, **options):
    """Export salary records of people with a known department to CSV.

    Writes ``processed_berkeley_faculty.csv`` inside ``settings.DATA_DIR``,
    emitting one row per salary record. People whose directory record has no
    department object are excluded, and a person without any salary record
    produces no row.
    """
    processed_data_path = os.path.join(
        settings.DATA_DIR, 'processed_berkeley_faculty.csv')
    fieldnames = ['first', 'last', 'department', 'year', 'title_category',
                  'title', 'gross_salary']

    # newline='' is required by the csv module so that it controls the line
    # terminator itself (otherwise "\r\n" platforms get an extra "\r").
    with open(processed_data_path, 'w', newline='') as processed_data_file:
        writer = csv.DictWriter(processed_data_file, fieldnames=fieldnames)
        writer.writeheader()

        people = Person.objects.exclude(
            directory_record__department_obj=None)
        for person in people:
            # The per-person columns are filled once; the per-record columns
            # are overwritten for each salary record before the row is written.
            row = {
                'first': person.first,
                'last': person.last,
                'department':
                    person.directory_record.department_obj.canonical,
            }
            for salaryrecord in person.salaryrecord_set.all():
                row['year'] = salaryrecord.year
                row['title_category'] = salaryrecord.title_category
                row['title'] = salaryrecord.title
                row['gross_salary'] = salaryrecord.gross
                writer.writerow(row)
def remove_research_files(sender, instance, using, **kwargs):
    """Dispose of the on-disk folder that belongs to ``instance``.

    The signature looks like a Django signal receiver; ``sender``, ``using``
    and ``kwargs`` are accepted but not used.

    * If ``instance.sealed`` is true, a new sealed ``Research`` row copying the
      researcher, timestamps and script name is saved, and the instance's
      folder is renamed to ``<DATA_DIR>/research/<backup id>``.
    * Otherwise the folder is deleted on a best-effort basis: failures are
      logged and never propagated to the caller.
    """
    if instance.sealed:
        backup = Research()
        backup.researcher = instance.researcher
        backup.status = 5  # NOTE(review): meaning of status 5 is defined elsewhere
        backup.sealed = True
        backup.start_time = instance.start_time
        backup.finish_time = instance.finish_time
        backup.script_name = instance.script_name
        # Saving first gives the backup the id used for the new folder name.
        backup.save()
        backup_folder = os.path.join(
            settings.DATA_DIR, "research", str(backup.id))
        os.rename(instance.get_folder(), backup_folder)
        return

    try:
        rmtree(instance.get_folder())
    except Exception:
        # Deliberately best-effort, but do not swallow KeyboardInterrupt /
        # SystemExit and leave a trace of what went wrong.
        import logging
        logging.getLogger(__name__).warning(
            "Could not remove research folder", exc_info=True)
def handle(self, *args, **kwargs):
    """Dump ``settings.CONFIG`` as block-style YAML to ``config.yaml``.

    The file is (re)created inside ``settings.DATA_DIR``.
    """
    target_path = join(settings.DATA_DIR, 'config.yaml')
    dump_options = dict(width=80, indent=4, default_flow_style=False)
    with open(target_path, 'w') as stream:
        yaml.dump(settings.CONFIG, stream, **dump_options)
def build_titles(self):
    """Load the titles that should be kept, keyed by database title.

    Reads ``directory/titles.csv`` under ``settings.DATA_DIR`` and keeps only
    the rows whose ``keep`` column equals ``'1'``.

    Returns:
        dict mapping each kept row's ``db_title`` to the full CSV row. When a
        ``db_title`` occurs more than once, the last matching row wins.
    """
    titles_file_path = os.path.join(
        settings.DATA_DIR, 'directory', 'titles.csv')
    # newline='' lets the csv module handle newlines embedded in quoted
    # fields correctly.
    with open(titles_file_path, 'r', newline='') as titles_file:
        return {
            row['db_title']: row
            for row in csv.DictReader(titles_file)
            if row['keep'] == '1'
        }
def handle(self, *args, **options):
    """Filter ``merged.csv`` down to Berkeley rows with a known title.

    Rows are copied from ``merged.csv`` to ``berkeley_faculty.csv`` (both in
    ``settings.DATA_DIR``) when their ``location`` contains "BERKELEY"
    (case-insensitive) and their ``title`` is present in the dictionary
    returned by ``self.build_titles()``. Kept rows are annotated with the
    title's ``type``, ``qualifier`` and ``year_code``.
    """
    titles_dictionary = self.build_titles()
    clean_file_path = os.path.join(settings.DATA_DIR, 'berkeley_faculty.csv')

    # newline='' on both files: the csv module must own newline handling.
    with open(clean_file_path, 'w', newline='') as clean_file:
        writer = csv.DictWriter(
            clean_file, fieldnames=self.get_fieldnames())
        writer.writeheader()

        merged_file_path = os.path.join(settings.DATA_DIR, 'merged.csv')
        with open(merged_file_path, 'r', newline='') as raw_file:
            for row in csv.DictReader(raw_file):
                # Only Berkeley records
                if 'BERKELEY' not in row['location'].upper():
                    continue

                # Only positions we care about
                category = titles_dictionary.get(row['title'])
                if not category:
                    continue

                row['title_category'] = category['type']
                row['title_qualifier'] = category['qualifier']
                row['title_year_code'] = category['year_code']
                writer.writerow(row)
def handle(self, *args, **options):
    """Reload ``SalaryRecord`` from ``berkeley_faculty.csv``.

    All existing ``SalaryRecord`` rows are deleted first. Every model field
    except ``id`` is then mapped to a CSV column of the same name and the
    file is loaded through ``CopyMapping``.
    """
    SalaryRecord.objects.all().delete()

    csv_path = os.path.join(settings.DATA_DIR, 'berkeley_faculty.csv')
    all_names = (model_field.name
                 for model_field in SalaryRecord._meta.get_fields())
    # Identity mapping: model field name -> CSV header of the same name.
    mapping = {name: name for name in all_names if name != 'id'}

    loader = CopyMapping(SalaryRecord, csv_path, mapping)
    loader.save()
def build_cache(self):
    """Load the directory lookup cache from disk.

    Reads ``directory/directory.csv`` under ``settings.DATA_DIR``.

    Returns:
        dict of ``{searched_name: row}``. When a ``searched_name`` occurs more
        than once, the last row wins.
    """
    cache_path = os.path.join(
        settings.DATA_DIR, 'directory', 'directory.csv')
    # newline='' lets the csv module handle newlines embedded in quoted
    # fields correctly.
    with open(cache_path, 'r', newline='') as cache_file:
        return {
            row['searched_name']: row
            for row in csv.DictReader(cache_file)
        }
def handle(self, *args, **options):
    """Wipe and reload members, restaurants and meals from JSON files.

    Deletes every ``Eat``, ``Restaurant`` and ``CongressMember`` row, then
    reads ``geocodes.json`` and ``data.json`` from ``settings.DATA_DIR``.
    Each entry of ``data.json`` is unpacked into seven values (member name,
    party, restaurant name, address, kind, price, memo); ``kind`` and ``memo``
    are read but not stored. ``geocodes.json`` is indexed by address and a
    falsy entry results in ``lat``/``lng`` of ``None``.
    """
    print('Delete Existing Data.')
    Eat.objects.all().delete()
    Restaurant.objects.all().delete()
    CongressMember.objects.all().delete()

    print('Load Data.')
    geocodes_file_path = path.join(settings.DATA_DIR, 'geocodes.json')
    data_file_path = path.join(settings.DATA_DIR, 'data.json')
    # Context managers close the files deterministically instead of leaking
    # the handles until garbage collection.
    with open(geocodes_file_path) as geocodes_file:
        geocodes = json.load(geocodes_file)
    with open(data_file_path) as data_file:
        data = json.load(data_file)

    print('Save Objects to Database.')
    for man_name, party, restaurant_name, address, kind, price, memo in data:
        # A missing address key still raises KeyError, as before.
        if geocodes[address]:
            lat, lng = geocodes[address]
        else:
            lat, lng = None, None

        member, _ = CongressMember.objects.get_or_create(
            name=man_name,
            party=party,
        )
        restaurant, _ = Restaurant.objects.get_or_create(
            name=restaurant_name,
            address=address,
            lat=lat,
            lng=lng,
        )
        eat = Eat(
            member=member,
            restaurant=restaurant,
            price=price,
        )
        eat.save()
def get_folder(self):
    """Return this model's data folder, creating it if necessary.

    The folder is
    ``<DATA_DIR>/datasets/<dataset.text_id>/models/<text_id>``.

    Raises:
        OSError: if the directory cannot be created (including when a
            non-directory already occupies the path).
    """
    path = os.path.join(
        settings.DATA_DIR, "datasets", self.dataset.text_id,
        "models", self.text_id)
    # exist_ok avoids the race between checking for the folder and creating
    # it when several processes ask for it at once.
    os.makedirs(path, exist_ok=True)
    return path
def get_visual_folder(self):
    """Return this model's ``visual`` folder, creating it if necessary.

    The folder is
    ``<DATA_DIR>/datasets/<dataset.text_id>/models/<text_id>/visual``.

    Raises:
        OSError: if the directory cannot be created (including when a
            non-directory already occupies the path).
    """
    path = os.path.join(
        settings.DATA_DIR, "datasets", self.dataset.text_id,
        "models", self.text_id, "visual")
    # exist_ok avoids the race between checking for the folder and creating
    # it when several processes ask for it at once.
    os.makedirs(path, exist_ok=True)
    return path
def get_dist_folder(self):
    """Return this model's ``dist`` folder, creating it if necessary.

    The folder is
    ``<DATA_DIR>/datasets/<dataset.text_id>/models/<text_id>/dist``.

    Raises:
        OSError: if the directory cannot be created (including when a
            non-directory already occupies the path).
    """
    path = os.path.join(
        settings.DATA_DIR, "datasets", self.dataset.text_id,
        "models", self.text_id, "dist")
    # exist_ok avoids the race between checking for the folder and creating
    # it when several processes ask for it at once.
    os.makedirs(path, exist_ok=True)
    return path
def get_folder(self):
    """Return the working folder for this object, creating it if necessary.

    When ``self.type`` is ``"segmentation"`` the folder is
    ``<DATA_DIR>/datasets/<dataset.text_id>/segmentation``; for every other
    type it is ``<DATA_DIR>/assessment/<id>``.

    Raises:
        OSError: if the directory cannot be created (including when a
            non-directory already occupies the path).
    """
    if self.type == "segmentation":
        path = os.path.join(
            settings.DATA_DIR, "datasets", self.dataset.text_id,
            "segmentation")
    else:
        path = os.path.join(settings.DATA_DIR, "assessment", str(self.id))
    # exist_ok avoids the race between checking for the folder and creating
    # it when several processes ask for it at once.
    os.makedirs(path, exist_ok=True)
    return path
def read_log(self):
    """Return the contents of this dataset's ``log.txt``.

    The log is read from ``<DATA_DIR>/datasets/<text_id>/log.txt``. If the
    file cannot be located or read for any ordinary reason, a fixed
    placeholder message is returned instead of raising.
    """
    try:
        log_file_name = os.path.join(
            settings.DATA_DIR, "datasets", self.text_id, "log.txt")
        with open(log_file_name, "r") as f:
            return f.read()
    except Exception:
        # Best-effort read; unlike BaseException this no longer swallows
        # KeyboardInterrupt or SystemExit.
        return "Datased is reloading"
def get_folder(self):
    """Return this dataset's folder, creating it if necessary.

    The folder is ``<DATA_DIR>/datasets/<text_id>``.

    Raises:
        OSError: if the directory cannot be created (including when a
            non-directory already occupies the path).
    """
    path = os.path.join(settings.DATA_DIR, "datasets", self.text_id)
    # exist_ok avoids the race between checking for the folder and creating
    # it when several processes ask for it at once.
    os.makedirs(path, exist_ok=True)
    return path
def get_folder(self):
    """Return this research's folder, creating it if necessary.

    The folder is ``<DATA_DIR>/research/<id>``.

    Raises:
        OSError: if the directory cannot be created (including when a
            non-directory already occupies the path).
    """
    path = os.path.join(settings.DATA_DIR, "research", str(self.id))
    # exist_ok avoids the race between checking for the folder and creating
    # it when several processes ask for it at once.
    os.makedirs(path, exist_ok=True)
    return path
def get_pic_folder(self):
    """Return this research's ``pic`` folder, creating it if necessary.

    The folder is ``<DATA_DIR>/research/<id>/pic``.

    Raises:
        OSError: if the directory cannot be created (including when a
            non-directory already occupies the path).
    """
    path = os.path.join(settings.DATA_DIR, "research", str(self.id), "pic")
    # exist_ok avoids the race between checking for the folder and creating
    # it when several processes ask for it at once.
    os.makedirs(path, exist_ok=True)
    return path
def fix_space_foreign_keys(apps, schema_editor):
    """Re-assign ``space`` on map objects by matching them to source JSON files.

    Takes the ``(apps, schema_editor)`` arguments of a migration function;
    ``schema_editor`` is not used.

    For each of the ``Area``, ``Stair``, ``LineObstacle`` and ``Obstacle``
    models of the ``mapdata`` app:

    1. Load every ``*.json`` file from the model's directory inside each
       package directory under ``<DATA_DIR>/map``.
    2. Match every database object to the first loaded JSON object whose
       geometry is almost equal to its own.
    3. For each matched JSON object, collect the spaces on its level(s) that
       intersect its geometry and assign them, in order, to the matched
       database objects.
    """
    models = ('Area', 'Stair', 'LineObstacle', 'Obstacle')
    Space = apps.get_model('mapdata', 'Space')

    MAP_PATH = os.path.join(settings.DATA_DIR, 'map')
    PACKAGE_PATHS = [os.path.join(MAP_PATH, dirname)
                     for dirname in os.listdir(MAP_PATH)]
    PACKAGE_PATHS = [path for path in PACKAGE_PATHS if os.path.isdir(path)]

    # Spaces grouped by "<section name>_<level>".
    spaces = {}
    for space in Space.objects.all():
        spaces.setdefault(
            space.section.name + '_' + space.level, []).append(space)

    for model_name in models:
        model = apps.get_model('mapdata', model_name)

        orig_dir_names = [model._meta.default_related_name.lower()]
        if model.__name__ == 'Area':
            orig_dir_names = ['arealocations', 'stuffedareas']

        # Load the JSON objects for this model.
        orig_objects = []
        for package_path in PACKAGE_PATHS:
            for orig_dir_name in orig_dir_names:
                dir_name = os.path.join(package_path, orig_dir_name)
                if not os.path.isdir(dir_name):
                    continue

                for filename in os.listdir(dir_name):
                    abs_filename = os.path.join(dir_name, filename)
                    if (not filename.endswith('.json')
                            or not os.path.isfile(abs_filename)):
                        continue

                    # Close each file promptly instead of leaking one handle
                    # per JSON file.
                    with open(abs_filename) as json_file:
                        obj = json.load(json_file)
                    obj['name'] = filename[:-5]  # strip ".json"
                    obj['geometry'] = shape(obj['geometry'])
                    orig_objects.append(obj)

        # Index of JSON object -> database objects with that geometry.
        matches = {}
        ordering = 'id' if hasattr(model, 'id') else 'locationslug_ptr_id'
        for obj in model.objects.all().order_by(ordering):
            for i, orig_obj in enumerate(orig_objects):
                if obj.geometry.almost_equals(orig_obj['geometry']):
                    matches.setdefault(i, []).append(obj)
                    break

        for orig_i, objects in matches.items():
            orig_obj = orig_objects[orig_i]

            if '-' in orig_obj['level']:
                # "a-b" levels use the upper part of a and lower part of b.
                splitted = orig_obj['level'].split('-')
                possible_spaces = (spaces[splitted[0] + '_upper']
                                   + spaces[splitted[1] + '_lower'])
            else:
                possible_spaces = spaces[orig_obj['level'] + '_']

            possible_spaces = [
                space for space in possible_spaces
                if space.geometry.intersects(orig_obj['geometry'])
            ]

            # Only when there are more candidate spaces than objects does the
            # order matter: prefer the spaces with the largest overlap. With
            # an equal or smaller number of spaces the order is left as is.
            if len(objects) < len(possible_spaces):
                possible_spaces.sort(
                    key=lambda space: space.geometry.intersection(
                        orig_obj['geometry']).area,
                    reverse=True)

            # Pair objects with spaces; surplus objects or spaces are ignored.
            for obj, space in zip(objects, possible_spaces):
                obj.space = space
                obj.save()
def handle(self, *args, **options):
    """Merge the yearly salary CSVs for 2006-2015 into ``merged.csv``.

    Reads ``salary/salary_<year>.csv`` for each year and writes the cleaned
    rows to ``merged.csv`` (all paths under ``settings.DATA_DIR``). Rows with
    starred names are skipped, as are pre-2013 rows whose name contains
    ``---``. For years up to 2012 the single ``name`` column is split into
    ``last`` and ``first``. Every row gets a ``year`` column, is passed
    through ``self.clean_row``, has extra first names moved to ``middle``, and
    has ``name`` rebuilt as ``"<last>, <first>"``.
    """
    merged_file_path = os.path.join(settings.DATA_DIR, 'merged.csv')

    # newline='' on every csv file: the csv module must own newline handling.
    with open(merged_file_path, 'w', newline='') as merged_file:
        writer = csv.DictWriter(
            merged_file, fieldnames=self.get_fieldnames())
        writer.writeheader()

        for year in range(2006, 2016):
            print('Processing {}'.format(year))
            raw_file_path = os.path.join(
                settings.DATA_DIR, 'salary', 'salary_{}.csv'.format(year))

            with open(raw_file_path, 'r', newline='') as raw_file:
                for row in csv.DictReader(raw_file):
                    # First name and last name are distinct fields for
                    # 2013 onwards
                    if year > 2012:
                        # Don't want starred names
                        if '*' in row['last']:
                            continue
                    else:
                        # Don't want starred names
                        if '*' in row['name'] or '---' in row['name']:
                            continue
                        names = row['name'].split(',')
                        row['last'] = names[0]
                        row['first'] = ' '.join(names[1:])

                    row['year'] = str(year)
                    row = self.clean_row(row)

                    # Attempt to deal with middle names
                    first_names = [name.replace('.', '')
                                   for name in row['first'].split(' ')]
                    if len(first_names) > 1:
                        row['first'] = first_names[0]
                        row['middle'] = ' '.join(first_names[1:])

                    row['name'] = '{}, {}'.format(row['last'], row['first'])
                    writer.writerow(row)
def reload(self):
    """Rebuild this dataset's derived data from its folder on disk.

    Steps, in order:

    1. Delete the dataset's ``Term``, ``Document``, ``Modality`` and
       ``ArtmModel`` rows.
    2. Load ``meta/meta.json`` into ``self.docs_info``; on failure log a
       warning, set ``self.time_provided`` to ``False`` and use ``{}``.
    3. Parse ``self.preprocessing_params`` as JSON (``{}`` on failure) and
       run the optional ``parse`` / ``filter`` preprocessing steps.
    4. Create batches, gather the dictionary and load the documents.
    5. Make sure the ``models`` folder exists, stamp ``creation_time``, set
       ``status`` to 0 and save.
    """
    self.prepare_log()
    self.log("Loading dataset " + self.text_id + "...")

    Term.objects.filter(dataset=self).delete()
    Document.objects.filter(dataset=self).delete()
    Modality.objects.filter(dataset=self).delete()
    # Imported here rather than at module level, as in the original code.
    from models.models import ArtmModel
    ArtmModel.objects.filter(dataset=self).delete()

    try:
        meta_file = os.path.join(self.get_folder(), "meta", "meta.json")
        with open(meta_file) as f:
            self.docs_info = json.load(f)
    except Exception as ex:
        # Still best-effort, but KeyboardInterrupt / SystemExit are no longer
        # swallowed as they were with BaseException.
        self.log("WARNING! Wasn't able to load meta.json")
        self.log(str(ex))
        self.time_provided = False
        self.docs_info = {}

    try:
        preprocessing_params = json.loads(self.preprocessing_params)
        self.log("Preprocessing params:" + str(preprocessing_params))
    except Exception:
        preprocessing_params = {}
        self.log("Warning! Failed to load preprocessing parameters.")

    # Preprocessing
    custom_vocab = False
    if "parse" in preprocessing_params:
        self.preprocess_parse(preprocessing_params["parse"])
    if "filter" in preprocessing_params:
        self.preprocess_filter(preprocessing_params["filter"])
        custom_vocab = True
    if ("custom_vocab" in preprocessing_params
            and preprocessing_params["custom_vocab"]):
        self.log("Will use custom vocab.txt")
        custom_vocab = True

    self.create_batches()
    self.gather_dictionary(custom_vocab=custom_vocab)
    self.load_documents()

    self.log("Loaded " + str(self.documents_count) + " documents.")

    # Creating folder for models; exist_ok avoids the race between checking
    # for the folder and creating it.
    model_path = os.path.join(
        settings.DATA_DIR, "datasets", self.text_id, "models")
    os.makedirs(model_path, exist_ok=True)

    self.log("Dataset " + self.text_id + " loaded.")
    self.creation_time = datetime.now()
    self.status = 0  # NOTE(review): meaning of status 0 is defined elsewhere
    self.save()