The following 49 code examples, extracted from open source Python projects, illustrate how to use csv.DictReader().
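Before the project examples, here is a minimal usage sketch (the file name people.csv and the name/age columns are hypothetical, chosen purely for illustration): csv.DictReader treats the first row of the file as the header and yields every following row as a dictionary keyed by those column names.

import csv

# Minimal sketch with a hypothetical people.csv containing "name" and "age" columns.
# DictReader reads the header row and yields each data row as a dict keyed by it.
with open('people.csv', newline='') as f:
    reader = csv.DictReader(f)
    for row in reader:
        print(row['name'], row['age'])
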
def generate_info():
    tickets_archive_path = ROOT_DIR_PATH.joinpath('tickets.zip')
    ensure_data_file(tickets_archive_path, DATA_FILE_INFO['TICKETS_URL'])
    with zipfile.ZipFile(str(tickets_archive_path)) as zf:
        for name in zf.namelist():
            stem, ext = os.path.splitext(name)
            if ext != '.csv':
                continue
            with zf.open(name) as f:
                # Zipfile only opens file in binary mode, but csv only accepts
                # text files, so we need to wrap this.
                # See <https://stackoverflow.com/questions/5627954>.
                textfile = io.TextIOWrapper(f, encoding='utf8', newline='')
                for row in csv.DictReader(textfile):
                    yield Registration(row)

def get_credential_report(iam_client):
    resp1 = iam_client.generate_credential_report()
    if resp1['State'] == 'COMPLETE':
        try:
            response = iam_client.get_credential_report()
            credential_report_csv = response['Content']
            # print(credential_report_csv)
            reader = csv.DictReader(credential_report_csv.splitlines())
            # print(reader.fieldnames)
            credential_report = []
            for row in reader:
                credential_report.append(row)
            return(credential_report)
        except ClientError as e:
            print("Unknown error getting Report: " + e.message)
    else:
        sleep(2)
        return get_credential_report(iam_client)

# Query the account's password policy for the password age. Return that number of days

def get_iam_credential_report(self):
    report = None
    while report == None:
        try:
            report = self.iam_client.get_credential_report()
        except botocore.exceptions.ClientError as e:
            if 'ReportNotPresent' in e.message:
                self.iam_client.generate_credential_report()
            else:
                raise e
            time.sleep(5)
    document = StringIO.StringIO(report['Content'])
    reader = csv.DictReader(document)
    report_rows = []
    for row in reader:
        report_rows.append(row)
    return report_rows

def main():
    if len(sys.argv) < 2:
        sys.stderr.write("USAGE: %s measurement\n" % sys.argv[0])
        sys.exit(1)
    path = sys.argv[1]
    with open(os.path.join(path, "metadata.json")) as f:
        metadata = json.load(f)
    start = date(metadata["start"][:-1])
    end = date(metadata["end"][:-1])
    print('open measurement "%s" from "%s" to "%s"' % (metadata["name"], start, end))
    for service in metadata["services"]:
        print('open service "%s"' % service["name"])
        with open(os.path.join(path, service["filename"])) as csvfile:
            r = csv.DictReader(csvfile, dialect=csv.excel_tab)
            for row in r:
                print(row["time"])

def insertar(archivo):
    """ Verify that the CSV file exists """
    if not os.path.isfile(archivo):
        raise Exception("No existe el archivo {}".format(archivo))
    """ Insert the CSV file's records into the database """
    contador = 0
    with basededatos.inegi() as bd:
        with open(archivo, newline='') as contenedor:
            lector = csv.DictReader(contenedor)
            for renglon in lector:
                codigo = renglon['Código'].strip()
                titulo = renglon['Título'].strip()
                descripcion = renglon['Descripción'].strip()
                bd.cursor.execute("""
                    INSERT INTO scian_sectores (codigo, titulo, descripcion)
                    VALUES (%s, %s, %s)
                    """, (codigo, titulo, descripcion,))
                contador = contador + 1
    print(" Se insertaron {} sectores.".format(contador))

def insertar(archivo):
    """ Verify that the CSV file exists """
    if not os.path.isfile(archivo):
        raise Exception("No existe el archivo {}".format(archivo))
    """ Insert the CSV file's records into the database """
    contador = 0
    with basededatos.inegi() as bd:
        with open(archivo, newline='') as contenedor:
            lector = csv.DictReader(contenedor)
            for renglon in lector:
                codigo = renglon['Código'].strip()
                titulo = renglon['Título'].strip()
                descripcion = renglon['Descripción'].strip()
                bd.cursor.execute("""
                    INSERT INTO scian_subramas (rama, codigo, titulo, descripcion)
                    VALUES (%s, %s, %s, %s)
                    """, (scian3ramas.consultar_codigo(codigo[:4]), codigo, titulo, descripcion,))
                contador = contador + 1
    print(" Se insertaron {} subramas.".format(contador))

def insertar(archivo):
    """ Verify that the CSV file exists """
    if not os.path.isfile(archivo):
        raise Exception("No existe el archivo {}".format(archivo))
    """ Insert the CSV file's records into the database """
    contador = 0
    with basededatos.inegi() as bd:
        with open(archivo, newline='') as contenedor:
            lector = csv.DictReader(contenedor)
            for renglon in lector:
                codigo = renglon['Código'].strip()
                titulo = renglon['Título'].strip()
                descripcion = renglon['Descripción'].strip()
                bd.cursor.execute("""
                    INSERT INTO scian_ramas (subsector, codigo, titulo, descripcion)
                    VALUES (%s, %s, %s, %s)
                    """, (scian2subsectores.consultar_codigo(codigo[:3]), codigo, titulo, descripcion,))
                contador = contador + 1
    print(" Se insertaron {} ramas.".format(contador))

def proc(csv_na, con):
    dicts = []
    for i in range(0, len(con)):
        dicts.append(dict())
    sum = 0
    f = csv.DictReader(open(csv_na))
    for rec in f:
        rec['single'] = '1'
        #print(csv_na,rec['clickTime'])
        label = int(rec['label'])
        for i in range(0, len(con)):
            k = rec[con[i][0]] + '#' + rec[con[i][1]]
            if dicts[i].__contains__(k):
                dicts[i][k] = np.add(dicts[i][k], [label, 1])
            else:
                dicts[i][k] = [label, 1]
        sum += 1
    return dicts, sum

def get_iterator(self):
    tweet_parser = TweetParser()
    if self.compression == 'bz2':
        self.mode = binary_mode(self.mode)
        csv_handle = bz2.open(self.filepath, self.mode, encoding=self.encoding)
    elif self.compression == 'gzip':
        self.mode = binary_mode(self.mode)
        csv_handle = gzip.open(self.filepath, self.mode, encoding=self.encoding)
    else:
        csv_handle = open(self.filepath, self.mode, encoding=self.encoding)

    for count, tweet in enumerate(csv.DictReader(csv_handle)):
        if self.limit < count+1 and self.limit != 0:
            csv_handle.close()
            return
        elif tweet_parser.tweet_passes_filter(self.filter, tweet) \
                and tweet_parser.tweet_passes_custom_filter_list(self.custom_filters, tweet):
            if self.should_strip:
                yield tweet_parser.strip_tweet(self.keep_fields, tweet)
            else:
                yield dict(tweet)
    csv_handle.close()

def post(inx, k=3):
    # NOTE: Python 2 code (uses the print statement).
    out = inx.replace('.csv', '_sub.csv')
    #if os.path.exists(out):
    #    return
    fo = open(out, 'w')
    last = ''
    pred = {}
    for c, row in enumerate(csv.DictReader(open(inx))):
        if last != '' and row['orderid'] != last:
            pred = ','.join(sort_value(pred)[:3])
            fo.write('%s,%s\n' % (last, pred))
            pred = {}
        yp = float(row['prob'])
        pred[row['candidate_loc']] = yp
        last = row['orderid']
        if c % 10000000 == 0 and c > 0:
            print c  #, 'log loss', score/(c+1), 'm12 apk', apks/dc
    pred = ','.join(sort_value(pred)[:k])
    fo.write('%s,%s\n' % (row['orderid'], pred))
    fo.close()

def post_ffm(inx):
    # NOTE: Python 2 code (uses the print statement).
    out = inx.replace('.csv', '_sub.csv')
    idx = "comps/mobike/sol_carl/data/va_20-24.id"
    last = ''
    pred = {}
    f = open(inx)
    fo = open(out, 'w')
    for c, row in enumerate(csv.DictReader(open(idx))):
        line = f.readline()
        row['prob'] = line.strip()
        if last != '' and row['orderid'] != last:
            pred = ','.join(sort_value(pred)[:3])
            fo.write('%s,%s\n' % (last, pred))
            pred = {}
        yp = float(row['prob'])
        pred[row['candidate_loc']] = yp
        last = row['orderid']
        if c % 10000000 == 0 and c > 0:
            print c  #, 'log loss', score/(c+1), 'm12 apk', apks/dc
    pred = ','.join(sort_value(pred)[:3])
    fo.write('%s,%s\n' % (row['orderid'], pred))
    fo.close()
    f.close()

def sample(name, ratio=0.05):
    oname = name.replace('.csv', '_sample.csv')
    if os.path.exists(oname):
        return
    num = int(1/ratio)
    fo = open(oname, 'w')
    f = open(name)
    fo.write(f.readline())
    dic = {}
    for row in csv.DictReader(open('comps/mobike/sol_carl/data/va_label.csv')):
        dic[row['orderid']] = row['geohashed_end_loc']
    for c, line in enumerate(f):
        xx = line.split(',')
        orderid, loc, label = 0, 1, 2
        idx = hash(xx[orderid]) % 100000
        if idx % num == 0:  #random()<ratio:
            xx[label] = str(int(xx[loc] == dic[xx[orderid]]))
            line = ",".join(xx)
            fo.write(line)
        if c % 10000000 == 0:
            print(name, c)
    f.close()
    fo.close()

def build_hash_to_coord(paths):
    if os.path.exists("comps/mobike/sol_carl/data/h2c.p") and os.path.exists("comps/mobike/sol_carl/data/c2h.p"):
        return
    h2c, c2h = {}, {}
    for path in paths:
        for c, row in enumerate(csv.DictReader(open(path))):
            for tag in ["geohashed_end_loc", "geohashed_start_loc"]:
                if tag not in row:
                    continue
                h = row[tag]
                if h not in h2c:
                    coord = str_coord(decode(h))
                    h2c[h] = coord
                    #lat,lon = int(lat+0.5),int(lon+0.5)
                    if coord not in c2h:
                        c2h[coord] = set()
                    c2h[coord].add(h)
            if c > 0 and c % 100000 == 0:
                print(path, c)
    print(len(h2c), len(c2h))
    pickle.dump(h2c, open("comps/mobike/sol_carl/data/h2c.p", "wb"))
    pickle.dump(c2h, open("comps/mobike/sol_carl/data/c2h.p", "wb"))

def mean_target_rate(name, out, idcol, ycol):
    if os.path.exists(out):
        return pickle.load(open(out, 'rb'))
    yc, cc = defaultdict(float), defaultdict(float)
    for c, row in enumerate(csv.DictReader(open(name))):
        y = float(row[ycol])
        for i in row:
            if i in [idcol, ycol]:
                continue
            v = "%s-%s" % (i, row[i])
            yc[v] += y
            cc[v] += 1.0
        if c > 0 and c % 100000 == 0:
            print("rows %d len_cc %d" % (c, len(cc)))
    for i in yc:
        yc[i] = yc[i]/cc[i]
    pickle.dump(yc, open(out, 'wb'))
    return yc

def get_csv_reader(input):
    # csv package does not support unicode
    input = str(input)

    # Special case: detect single-column files.
    # This check assumes that our only valid delimiters are commas and tabs.
    firstLine = input.split('\n')[0]
    if not ('\t' in firstLine or ',' in firstLine) \
            or len(input.splitlines()) == 1:
        dialect = 'excel'
    else:
        # Take a data sample to determine dialect, but
        # don't include incomplete last line
        sample = ''
        sampleSize = 0
        while len(sample) == 0:
            sampleSize += 5000
            sample = '\n'.join(input[:sampleSize].splitlines()[:-1])
        dialect = csv.Sniffer().sniff(sample)
        dialect.skipinitialspace = True
    return csv.DictReader(input.splitlines(), dialect=dialect)

def loadRecord(line):
    """ Parse a single CSV line """
    input_line = StringIO.StringIO(line)
    #row=unicodecsv.reader(input_line, encoding="utf-8")
    #return row.next()
    #reader=csv.DictReader(input_line,fieldnames=["id","qid1","qid2","question1","question2","is_duplicate"])
    reader = csv.reader(input_line)
    return reader.next()
    #data=[]
    #for row in reader:
    #    print row
    #    data.append([unicode(cell,"utf-8") for cell in row])
    #return data[0]
    #return reader.next()

#raw_data=sc.textFile(train_file_path).map(loadRecord)
#print raw_data.take(10)

def handle(self, *args, **options):
    job_uuid = options['job_uuid']
    csv_filename = options['csv_file']
    key_column = options['key_column']
    skip_columns = options['skip_columns']
    skip_columns = skip_columns.split(',') if skip_columns is not None else []
    skip_columns.append('id')
    try:
        job = AnalysisJob.objects.get(pk=job_uuid)
    except (AnalysisJob.DoesNotExist, ValueError, KeyError):
        print('WARNING: Tried to update overall_scores for invalid job {} '
              'from file {}'.format(job_uuid, csv_filename))
        return
    with open(csv_filename, 'r') as csv_file:
        reader = csv.DictReader(csv_file)
        results = {}
        for row in reader:
            key_column_value = row.pop(key_column)
            metric = self.clean_metric_dict(row.copy(), skip_columns=skip_columns)
            results[key_column_value] = metric
        job.overall_scores = results
        job.save()
    self.stdout.write('{}: Loaded overall_scores from {}'.format(job, csv_filename))

def scan(self):
    self.import_file = self.config["instance"]["import-file"]
    self.bits = self.import_lister.get(self.import_file)
    self.data_file = Path(self.bits["metadata"]).with_suffix(".data")
    self.metadata = ConfigParser(inline_comment_prefixes=None)
    self.metadata.read(str(self.bits["metadata"]))
    if self.data_file.exists():
        with self.data_file.open(newline="") as csvfile:
            data_reader = csv.DictReader(csvfile, dialect=ImportCsvDialect)
            for row in data_reader:
                location = float(row["location"])
                row["location"] = location
                self.data[location] = row
    if len(self.data) == 0:
        self.add_row(0.0, mark="START")
        self.add_row(self.bits["length_secs"], mark="END")
    self.update_order()
    self.clean()

def _():
    """ load language file into language table """
    import os
    import csv
    f_name = os.path.join(
        request.folder,
        os.path.join('private', 'language-codes.csv'))
    with open(f_name) as lang_codes:
        reader = csv.DictReader(lang_codes)
        for row in reader:
            db.languages.insert(
                language_tag=row['alpha2'],
                english_name=row['English']
            )

def export_speakers_csv(speakers_csv, guidebook_csv):
    speakers_reader = csv.DictReader(speakers_csv)
    writer = csv.DictWriter(guidebook_csv, fieldnames=[
        "Name",
        "Sub-Title (i.e. Location, Table/Booth, or Title/Sponsorship Level)",
        "Description (Optional)",
        "Location/Room",
        "Image (Optional)",
    ])
    writer.writeheader()
    for speaker in speakers_reader:
        writer.writerow({
            "Name": speaker['Name'],
            "Sub-Title (i.e. Location, Table/Booth, or Title/Sponsorship Level)": "",
            "Description (Optional)": speaker['Biography'],
            "Location/Room": "",
            "Image (Optional)": "",  # TODO
        })

def main():
    tests = load_test_data('./test-data.edited')
    tests = dict([(f[0], f[1]) for f in tests])

    import sys, csv
    from collections import OrderedDict
    with open(sys.argv[1]) as reader, open(sys.argv[1] + '.fixed.csv', 'w') as writer:
        rows = csv.DictReader(reader)
        ordered_fieldnames = OrderedDict([(f, '') for f in rows.fieldnames])
        dw = csv.DictWriter(writer, fieldnames=ordered_fieldnames)
        dw.writeheader()
        for row in rows:
            row['%s_gold_reason' % MATCH_RESULT] = reason % tests[row['url1']]
            row['%s_gold' % MATCH_RESULT] = pos_res
            dw.writerow(row)
    pass

def load_metadata(md_path):
    data = []
    with open(md_path, 'r') as f:
        reader = csv.DictReader(f)
        for row in reader:
            # convert str to float
            row['l'] = float(row['l'])
            row['w'] = float(row['w'])
            row['h'] = float(row['h'])
            if 'rear_gps_l' in row.keys():
                # release 3 format
                row['rear_gps_l'] = float(row['rear_gps_l'])
                row['rear_gps_w'] = float(row['rear_gps_w'])
                row['rear_gps_h'] = float(row['rear_gps_h'])
            else:
                # release2 format
                row['rear_gps_l'] = float(row['gps_l'])
                row['rear_gps_w'] = float(row['gps_w'])
                row['rear_gps_h'] = float(row['gps_h'])
            data.append(row)
    return data

def process_radar_csv_file(filename):
    # NOTE: Python 2 code (uses the print statement).
    with open(filename) as csvfile:
        reader = csv.DictReader(csvfile)
        csv_rows = [row for row in reader]

    print "%s radar records" % len(csv_rows)

    n_limit_rows = 1000000
    radar_obss = []
    for i, row in enumerate(csv_rows):
        if i > n_limit_rows - 1:
            break
        time = float(row['timestamp'])
        x, y, z, vx, vy = float(row['x']), float(row['y']), float(row['z']), float(row['vx']), float(row['vy'])
        obs = RadarObservation(time, x, y, z, vx, vy)
        #print obs
        radar_obss.append(obs)
    return radar_obss

def normalize_data():
    data = []
    with open(folder + filename, 'rb') as csvfile:
        spamreader = csv.DictReader(csvfile)
        for row in spamreader:
            for key in keys_to_remove:
                del row[key]
            row['Sex'] = gender_to_number[row['Sex']]
            row['Embarked'] = port_to_number[row['Embarked']]
            row['Age'] = 0 if row['Age'] == "" else float(row['Age'])
            row['Parch'] = 0 if row['Parch'] == "" else int(row['Parch'])
            row['Pclass'] = 3 if row['Pclass'] == "" else int(row['Pclass'])
            row['Survived'] = int(row['Survived'])
            row['SibSp'] = 0 if row['SibSp'] == "" else int(row['SibSp'])
            row['Cabin'] = 0 if row['Cabin'] == "" else 1
            data.append(row)
    return data

def __init__(self, file_path, type='text', **kwargs):
    self._file_path = file_path
    self._type = type
    self._kwargs = kwargs
    self._file_handler = open(file_path, 'r')

    if type == 'json_line':
        # pre-compile json path, raise exception if not exists
        self._id_path_parser = parse(kwargs['id_path'])
    elif type == 'csv':
        self._id_column = kwargs['id_column']  # raise exception if not exists
        delimiter = kwargs['delimiter'] if 'delimiter' in kwargs else ','
        quote_char = kwargs['quote_char'] if 'quote_char' in kwargs else '"'
        quoting = kwargs['quoting'] if 'quoting' in kwargs else csv.QUOTE_MINIMAL
        column_names = kwargs['column_names'] if 'column_names' in kwargs else None
        self._csv_reader = csv.DictReader(
            self._file_handler, delimiter=delimiter, quotechar=quote_char,
            quoting=quoting, fieldnames=column_names)
    else:  # text
        self._id_prefix = hashlib.md5(file_path).hexdigest()[:6]

def read_data(user_id=None, email=None): filename = "data.csv" with open(filename, "r") as csvfile: reader = csv.DictReader(csvfile) items = [] unknown_user_id = None unknown_email = None for row in reader: if user_id is not None: if int(user_id) == int(row.get("id")): return row else: unknown_user_id = user_id if email is not None: if email == row.get("email"): return row else: unknown_email = email if unknown_user_id is not None: return "User id {user_id} not found".format(user_id=user_id) if unknown_email is not None: return "Email {email} not found".format(email=email) return None
def read_data(user_id=None, email=None):
    filename = file_item_path
    with open(filename, "r") as csvfile:
        reader = csv.DictReader(csvfile)
        items = []
        unknown_user_id = None
        unknown_email = None
        for row in reader:
            if user_id is not None:
                if int(user_id) == int(row.get("id")):
                    return row
                else:
                    unknown_user_id = user_id
            if email is not None:
                if email == row.get("email"):
                    return row
                else:
                    unknown_email = email
        if unknown_user_id is not None:
            return "User id {user_id} not found".format(user_id=user_id)
        if unknown_email is not None:
            return "Email {email} not found".format(email=email)
    return None

def get_user_data(self, user_id=None, email=None):
    filename = file_item_path
    with open(filename, "r") as csvfile:
        reader = csv.DictReader(csvfile)
        items = []
        unknown_user_id = None
        unknown_email = None
        for row in reader:
            if user_id is not None:
                if int(user_id) == int(row.get("id")):
                    return row
                else:
                    unknown_user_id = user_id
            if email is not None:
                if email == row.get("email"):
                    return row
                else:
                    unknown_email = email
        if unknown_user_id is not None:
            print("User id {user_id} not found".format(user_id=user_id))
        if unknown_email is not None:
            print("Email {email} not found".format(email=email))
    return None

def sample_to_run_data_mapping(samples_dir):
    '''
    return dict
    each key is string "sample_id"
    each value is a list of tuples ("library", "barcode")
    '''
    runs_file = samples_dir + "runs.tsv"
    sr_mapping = {}
    with open(runs_file) as tsv:
        for row in csv.DictReader(tsv, delimiter="\t"):
            sample = row["sample_id"]
            rb_pair = (row["run_name"], row["barcode_id"])
            if sample not in sr_mapping:
                sr_mapping[sample] = []
            sr_mapping[sample].append(rb_pair)
    return sr_mapping

def sample_to_metadata_mapping(samples_dir):
    '''
    return dict
    each key is string "sample_id"
    each value is a list of metadata ordered as
    ["strain", "sample_id", "collect_date", "country", "division", "location"]
    '''
    metadata_file = samples_dir + "samples.tsv"
    sm_mapping = {}
    with open(metadata_file) as tsv:
        for row in csv.DictReader(tsv, delimiter="\t"):
            sample = row["sample_id"]
            metadata = [row["strain"], row["sample_id"], row["collection_date"],
                        row["country"], row["division"], row["location"]]
            sm_mapping[sample] = metadata
    return sm_mapping

def get_gtfs_infos(gtfs):
    gtfs_infos = {}
    gtfs_infos["stop_points_count"] = 0
    gtfs_infos["stop_areas_count"] = 0
    gtfs_infos["routes_count"] = 0
    with zipfile.ZipFile(gtfs) as zf:
        reader = csv.DictReader(zf.open("stops.txt"))
        for r in reader:
            if r["location_type"] == "1":
                gtfs_infos["stop_areas_count"] += 1
            else:
                gtfs_infos["stop_points_count"] += 1
        reader = csv.DictReader(zf.open("routes.txt"))
        for r in reader:
            gtfs_infos["routes_count"] += 1
    return gtfs_infos

def get_filters(filepath):
    """Extract the filters from the file with description of filters in ENA
    as a dictionary with the key being the filter id and the value a dictionary
    with related results, type of filter, filter description

    filepath: path with csv with filter description
    """
    filters = {}
    with open(filepath, "r") as f:
        reader = csv.DictReader(f, delimiter=';')
        for row in reader:
            filter_id = row["Filter Column"]
            filters.setdefault(filter_id, {})
            filters[filter_id]["results"] = row["Result"].split(", ")
            filters[filter_id]["type"] = row["Type"]
            filters[filter_id]["description"] = ''.join(row["Description"])
    return filters

def summary_table_to_bed_long(sample_summary_table, output_file, filename_suffix='long', min_frequency=1):
    '''
    Write out the low frequency variants
    NOTE: See 'check_for_IGV_long_regions_snapshot' function in run_parser.py
    UPDATE: Naima wants long snapshots for ALL variants from now on.
    '''
    import csv
    print('Find low frequency variants...')
    print('input file: {0}'.format(sample_summary_table))
    print('output file: {0}'.format(output_file))
    with open(sample_summary_table, 'r') as tsvin, open(output_file, 'w') as bedout:
        reader = csv.DictReader(tsvin, delimiter='\t')
        writer = csv.writer(bedout, delimiter='\t')
        for row in reader:
            if float(row['Frequency']) < min_frequency:
                print(row['Frequency'])
                filename = make_snapshot_filename(summary_dict=row, filename_suffix=filename_suffix)
                entry = [row['Chrom'], row['Position'], row['Position'], filename]
                print(entry)
                writer.writerow(entry)

def process_file(infile, outfile, precision=1, format='csv', pages=None):
    reader = csv.DictReader(infile)
    char_height_dict = get_chars_hashed_by_yoffset(reader, precision, pages=pages)
    # page numbers come back as strings
    #pages_to_read = ['1']
    words_by_array = coalesce_into_words(char_height_dict)
    word_list = merge_word_arrays(words_by_array)
    if format == 'csv':
        to_csv(word_list, outfile)
    elif format == 'json':
        to_json(word_list, outfile)
    return 1

def read(self, tsv_file):
    """
    Reads the rows from the designated file using the configured fields.

    Arguments:
        tsv_file: a file-like object to read the data from

    Returns:
        records(list): a list of the records cast to read_as_cls
    """
    file_reader = csv.DictReader(
        tsv_file,
        **PEARSON_DIALECT_OPTIONS
    )

    valid_rows, invalid_rows = [], []
    for row in file_reader:
        try:
            valid_rows.append(self.map_row(row))
        except InvalidTsvRowException:
            invalid_rows.append(row)
    return (valid_rows, invalid_rows)

def parse_exam_grade_adjustments(self, csv_reader):
    """
    Parses all rows of grade adjustment info from a CSV and yields each ProctoredExamGrade
    object with its associated grade adjustment row from the CSV

    Args:
        csv_reader (csv.DictReader): A DictReader instance

    Returns:
        tuple(ProctoredExamGrade, RowProps): A tuple containing a ProctoredExamGrade
            and its associated parsed CSV row
    """
    parsed_row_dict = {}
    for row in csv_reader:
        parsed_row = self.parse_and_validate_row(row)
        parsed_row_dict[parsed_row.exam_grade_id] = parsed_row
    exam_grade_query = ProctoredExamGrade.objects.filter(id__in=parsed_row_dict.keys())
    if exam_grade_query.count() < len(parsed_row_dict):
        bad_exam_grade_ids = set(parsed_row_dict.keys()) - set(exam_grade_query.values_list('id', flat=True))
        raise ParsingError(
            'Some exam grade IDs do not match any ProctoredExamGrade records: {}'.format(bad_exam_grade_ids)
        )
    for exam_grade in exam_grade_query.all():
        yield exam_grade, parsed_row_dict[exam_grade.id]

def load_dataset(filename):
    data = None
    try:
        with open(filename, encoding=ENCODING) as fh:
            reader = csv.DictReader(fh)
            if reader.fieldnames != FIELDNAMES:
                print(reader.fieldnames)
                print(FIELDNAMES)
                error = 'ERROR: Incorrect headers in: {}'.format(filename)
                raise FNCException(error)
            else:
                data = list(reader)

        if data is None:
            error = 'ERROR: No data found in: {}'.format(filename)
            raise FNCException(error)
    except FileNotFoundError:
        error = "ERROR: Could not find file: {}".format(filename)
        raise FNCException(error)
    return data

def get_arp_table():
    """
    Get ARP table from /proc/net/arp
    """
    with open('/proc/net/arp') as arpt:
        names = [
            'IP address', 'HW type', 'Flags', 'HW address', 'Mask', 'Device'
        ]  # arp 1.88, net-tools 1.60
        reader = csv.DictReader(
            arpt, fieldnames=names, skipinitialspace=True, delimiter=' ')
        next(reader)  # Skip header.
        return [block for block in reader]

def _get_records(self):
    with tf.gfile.Open(self._labels_filename) as label_file:
        csv_reader = csv.DictReader(label_file, fieldnames=self._columns)
        images_gt_boxes = {}
        first = True
        for csv_line in csv_reader:
            if first and self._with_header:
                first = False
                continue
            csv_line = dict(csv_line)
            label_dict = self._normalize_csv_line(csv_line)
            image_id = label_dict.pop('image_id')
            images_gt_boxes.setdefault(image_id, []).append(label_dict)
    return images_gt_boxes

def populate():
    with open('sample-user-patches.csv') as csvfile:
        rows = csv.DictReader(csvfile)
        for row in rows:
            person = Person.objects.get(id=row['person_id'])
            patch = IssueResolverPatch.objects.create(
                content_object=person,
                jurisdiction_id=row['jurisdiction_id'],
                status=row['status'],
                old_value=row['old_value'],
                new_value=row['new_value'],
                category=row['category'],
                alert=row['alert'],
                note=row['note'],
                source=row['source'],
                reporter_name=row['reporter_name'],
                reporter_email=row['reporter_email'],
                applied_by=row['applied_by']
            )
            patch.save()

def save_companies(self):
    """
    Receives the path to the dataset file and creates a Company object for
    each row of each file. It creates the related activities when needed.
    """
    skip = ('main_activity', 'secondary_activity')
    keys = tuple(f.name for f in Company._meta.fields if f not in skip)
    with lzma.open(self.path, mode='rt', encoding='utf-8') as file_handler:
        for row in csv.DictReader(file_handler):
            main, secondary = self.save_activities(row)
            filtered = {k: v for k, v in row.items() if k in keys}
            obj = Company.objects.create(**self.serialize(filtered))
            for activity in main:
                obj.main_activity.add(activity)
            for activity in secondary:
                obj.secondary_activity.add(activity)
            obj.save()
            self.count += 1
            self.print_count(Company, count=self.count)

def weeks_of_tickets(datetime, tzutc, AnalyzedAgileTicket):
    """A bunch of tickets."""
    from dateutil.parser import parse
    parsed = []
    default = datetime(1979, 8, 15, 0, 0, 0, tzinfo=tzutc)
    current_path = path.dirname(path.abspath(__file__))
    csv_file = path.join(current_path, 'data', 'weeks_of_tickets.csv')
    count = 1
    for row in csv.DictReader(open(csv_file, 'r')):
        t = AnalyzedAgileTicket(
            key="FOO-{}".format(count),
            committed=dict(state="committed", entered_at=parse(row['committed'], default=default)),
            started=dict(state="started", entered_at=parse(row['started'], default=default)),
            ended=dict(state="ended", entered_at=parse(row['ended'], default=default))
        )
        parsed.append(t)
        count += 1
    return parsed

def read_and_wrangle(src, dest):
    wf = dest.open('w')
    wcsv = csv.DictWriter(wf, fieldnames=FINAL_HEADERS)
    wcsv.writeheader()
    # only 2011.csv has windows-1252 instead of ascii encoding,
    # but we open all files as windows-1252 just to be safe
    with src.open("r", encoding='windows-1252') as rf:
        records = csv.DictReader(rf)
        for i, row in enumerate(records):
            row = strip_record(row)
            newrow = wrangle_record(row)
            wcsv.writerow(newrow)
            # a little status checker
            if i % 10000 == 1:
                print("...wrote row #", i)
    # done writing file
    print("Wrangled", i, "rows and saved to", dest)
    wf.close()