我们从Python开源项目中,提取了以下45个代码示例,用于说明如何使用csv.DictWriter()。
def jsondict2csv(json_file, csv_file):
    """Convert a file of JSON objects (one per line) into a ';'-separated CSV.

    The CSV columns are the union of every key seen in the input, ordered
    with ``natural_key``.  An ``IOError`` raised while reading or writing is
    reported on stdout instead of being propagated.

    :param json_file: path of the input file, one JSON object per line
    :param csv_file: path of the CSV file to create
    """
    records = []
    seen_keys = set()
    try:
        with open(json_file, 'r') as source:
            for raw_line in source:
                record = json.loads(raw_line)
                seen_keys.update(record.keys())
                records.append(record)
        columns = sorted(seen_keys, key=natural_key)
        with open(csv_file, 'w') as target:
            writer = csv.DictWriter(target, columns, delimiter=';',
                                    lineterminator='\n')
            writer.writeheader()
            writer.writerows(records)
    except IOError:
        print('could not convert to csv-file. ')
def save_users_and_groups_to_csv(user_data, csv_output_filepath):
    """
    Creates a CSV file with exported user data.

    Rows are written in sorted key order.  The ``groups`` column holds every
    other element of the record's ``groups`` list (indices 0, 2, 4, ...)
    joined with ", ".  The records in *user_data* are not modified.

    :param user_data: mapping of e-mail address -> dict of user fields; each
        dict must contain a ``groups`` list
    :param csv_output_filepath: The output file to save, resolved against the
        current working directory
    :return: None
    """
    full_output_path = os.path.join(os.getcwd(), csv_output_filepath)
    fields = ['email', 'lastname', 'firstname', 'groups']
    # Text mode with newline='' is what the csv module expects on Python 3;
    # binary mode makes every write fail with a TypeError.
    with open(full_output_path, 'w', newline='') as f:
        w = csv.DictWriter(f, fields)
        w.writeheader()
        for email, details in sorted(user_data.items()):
            row = {'email': email}
            row.update(details)
            # Build the joined value on the row copy so the caller's list
            # is left untouched.
            row['groups'] = ", ".join(details['groups'][0::2])
            w.writerow(row)
def _make_csv_writer(self):
    """Open a new timestamped CSV log file and emit its header row.

    The header is rendered through an in-memory buffer by a ``DictWriter``,
    copied into the freshly opened file, and the buffer is then reset.  A
    first row carrying ``self.vin`` under the ``vid`` column is written last.

    :return: None
    """
    self._buffer = StringIO()
    self._bytes_written = 0

    stamp = datetime.now()
    # NOTE(review): the pattern has no ``{}`` placeholder, so the value
    # returned by make_random(6) never ends up in the file name -- confirm
    # whether a random suffix was intended.
    file_name = stamp.strftime(
        '%Y%m%d_%H%M%S.csv'.format(self.make_random(6)))
    self._out_csv = open(self.log_folder + '/' + file_name, 'w')
    logging.warning("Writing to {} ({} bytes)".format(self._out_csv.name,
                                                      self.max_bytes))

    self._out_writer = csv.DictWriter(self._buffer,
                                      fieldnames=self.fieldnames,
                                      restval=None)
    self._out_writer.writeheader()
    header_text = self._buffer.getvalue()
    self._out_csv.write(header_text)
    self._reset_buffer()

    self.writerow({'vid': self.vin})
def write(self, data):
    """Append the rows in *data* to the CSV file at ``self._filename``.

    Does nothing for empty *data*.  A header row (the keys of the first
    record) is written only when ``self.is_empty()`` reports an empty file.
    Rows are passed through ``self._encode_data`` before being written.
    """
    if not data:
        return

    needs_header = bool(self.is_empty())
    # Text append mode on Python 3, binary append mode otherwise.
    open_mode = 'a+t' if six.PY3 else 'a+b'

    with open(self._filename, open_mode) as handle:
        writer = csv.DictWriter(handle, data[0].keys())
        if needs_header:
            writer.writeheader()
        writer.writerows(self._encode_data(data))
def backup(self):
    """Write timestamped backup copies of the data file and metadata file.

    Both backups share one suffix, derived (via ``filename_iso_time``) from
    the newer of the two files' modification times.  The data backup is
    regenerated from ``self.data`` in ``self.order``; the metadata backup is
    produced by ``self.metadata.write``.
    """
    data_mtime = self.data_file.stat().st_mtime
    meta_file = Path(self.bits["metadata"])
    meta_mtime = meta_file.stat().st_mtime
    suffix = filename_iso_time(max(data_mtime, meta_mtime))

    def stamped(path):
        # <stem>-<suffix><extension>, next to the original file.
        return path.with_name("{}-{}{}".format(path.stem, suffix, path.suffix))

    backup_data = stamped(self.data_file)
    backup_meta = stamped(meta_file)

    columns = ['location', 'lyric', 'mark', 'track-change', "chord-change",
               "chord-selection", "note"]
    with backup_data.open("w", newline='') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=columns,
                                extrasaction='ignore',
                                dialect=ImportCsvDialect)
        writer.writeheader()
        writer.writerows(self.data[location] for location in self.order)

    with backup_meta.open('w') as meta:
        self.metadata.write(meta)
def run(argv):
    """Build the tab-separated genes file unless it already exists.

    When the genes file is missing, the Gencode GTF is downloaded if it is
    not present yet, genes are extracted and de-duplicated (first with
    ``dedup_ensg``, then ``dedup_symbol``), and the result is written without
    a header row.  *argv* is not used.
    """
    gencode_filepath = get_generated_path(
        'sites/genes/gencode-{}.gtf.gz'.format(genes_version))
    genes_filepath = common_filepaths['genes']

    if os.path.exists(genes_filepath):
        print("gencode is at {!r}".format(genes_filepath))
        return

    print('genes-{}.bed will be stored at {!r}'.format(genes_version,
                                                       genes_filepath))
    if not os.path.exists(gencode_filepath):
        make_basedir(gencode_filepath)
        wget.download(
            url="ftp://ftp.sanger.ac.uk/pub/gencode/Gencode_human/release_25/GRCh37_mapping/gencode.v25lift37.annotation.gtf.gz",
            out=gencode_filepath
        )
        print('')

    genes = dedup_symbol(dedup_ensg(get_all_genes(gencode_filepath)))

    make_basedir(genes_filepath)
    columns = 'chrom start end symbol ensg'.split()
    with open(genes_filepath, 'w') as f:
        # Rows only: no header line is written to this file.
        csv.DictWriter(f, delimiter='\t', fieldnames=columns,
                       lineterminator='\n').writerows(genes)
def print_as_csv(phenolist):
    """Print a list of dicts to stdout as CSV.

    Columns are the sorted union of all keys.  Values are flattened on a
    deep copy, so *phenolist* is left unchanged:

    * ints, floats and strings are written as they are;
    * a non-empty list of strings, none of which contains ``|``, is joined
      with ``|``;
    * anything else is written as ``json:`` followed by its JSON encoding.
    """
    rows = copy.deepcopy(phenolist)
    columns = sorted({column for row in rows for column in row})
    writer = csv.DictWriter(sys.stdout, columns)
    writer.writeheader()
    for row in rows:
        for key in row:
            value = row[key]
            if isinstance(value, (int, float)) or isinstance(value, str):
                continue
            joinable = (
                isinstance(value, list)
                and len(value) > 0
                and all(isinstance(item, str) for item in value)
                and all('|' not in item for item in value)
            )
            if joinable:
                row[key] = '|'.join(value)
            else:
                row[key] = 'json:' + json.dumps(value)
        writer.writerow(row)
def write_members(self):
    """Write one ``<group email>-membership.csv`` file per group.

    Files are created under ``self.path``; when ``self.datestamp`` is set the
    file name is first passed through ``self.append_datestamp``.
    """
    columns = ['kind', 'id', 'email', 'role', 'type', 'status', 'etag']
    for group in itervalues(self.groups):
        filename = group['email'] + '-membership.csv'
        if self.datestamp:
            filename = self.append_datestamp(filename)
        path = os.path.join(self.path, filename)
        logger.debug('Writing %s...', path)
        with open(path, 'w') as csvfile:
            member_writer = csv.DictWriter(csvfile, fieldnames=columns)
            member_writer.writeheader()
            member_writer.writerows(group['members'])
def write_feed(file_obj):
    """Write the feed to *file_obj* as a tab-delimited table.

    A header row built from ``ATTRIBUTES`` comes first, followed by one row
    per item returned by ``get_feed_items()``; each row is produced by
    ``item_attributes``.
    """
    feed_writer = csv.DictWriter(file_obj, ATTRIBUTES, dialect=csv.excel_tab)
    feed_writer.writeheader()

    # Lookups shared by every item.
    categories = Category.objects.all()
    discounts = Sale.objects.all().prefetch_related('products', 'categories')
    attributes_dict = dict(
        (attribute.slug, attribute.pk)
        for attribute in ProductAttribute.objects.all())
    attribute_values_dict = dict(
        (smart_text(choice.pk), smart_text(choice))
        for choice in AttributeChoiceValue.objects.all())
    category_paths = {}
    current_site = Site.objects.get_current()

    feed_writer.writerows(
        item_attributes(item, categories, category_paths, current_site,
                        discounts, attributes_dict, attribute_values_dict)
        for item in get_feed_items())
def export_speakers_csv(speakers_csv, guidebook_csv):
    """Copy speakers from *speakers_csv* into a five-column guidebook CSV.

    Only ``Name`` and ``Biography`` are read from the input; the biography is
    written to the description column and the remaining columns are left
    empty.

    :param speakers_csv: readable file-like object holding the speakers CSV
    :param guidebook_csv: writable file-like object receiving the output
    """
    name_col = "Name"
    subtitle_col = "Sub-Title (i.e. Location, Table/Booth, or Title/Sponsorship Level)"
    description_col = "Description (Optional)"
    room_col = "Location/Room"
    image_col = "Image (Optional)"

    writer = csv.DictWriter(
        guidebook_csv,
        fieldnames=[name_col, subtitle_col, description_col, room_col, image_col],
    )
    writer.writeheader()
    for speaker in csv.DictReader(speakers_csv):
        row = {
            name_col: speaker['Name'],
            subtitle_col: "",
            description_col: speaker['Biography'],
            room_col: "",
            image_col: "",  # TODO
        }
        writer.writerow(row)
def main():
    """Rewrite the CSV named by ``sys.argv[1]`` with gold columns filled in.

    Every row gets ``<MATCH_RESULT>_gold_reason`` and ``<MATCH_RESULT>_gold``
    set, using the test data loaded from ``./test-data.edited`` (looked up by
    the row's ``url1``).  The result is written to ``<input>.fixed.csv`` with
    the input's column names.
    """
    tests = {entry[0]: entry[1] for entry in load_test_data('./test-data.edited')}

    import sys, csv
    from collections import OrderedDict

    source_path = sys.argv[1]
    with open(source_path) as reader, open(sys.argv[1] + '.fixed.csv', 'w') as writer:
        rows = csv.DictReader(reader)
        ordered_fieldnames = OrderedDict([(name, '') for name in rows.fieldnames])
        dw = csv.DictWriter(writer, fieldnames=ordered_fieldnames)
        dw.writeheader()
        reason_column = '%s_gold_reason' % MATCH_RESULT
        gold_column = '%s_gold' % MATCH_RESULT
        for row in rows:
            row[reason_column] = reason % tests[row['url1']]
            row[gold_column] = pos_res
            dw.writerow(row)
def dump(self, fname):
    """Append the chat records not dumped so far to the CSV file *fname*.

    Columns are ``Datetime, Question, Answer, Rate`` followed by any other
    keys of the first record; keys outside that set are ignored.  The header
    is written only when nothing has been dumped before (``cursor == 0``).
    Missing parent directories are created.

    :param fname: path of the CSV file to append to
    :return: True when records were written, False when there was nothing
        new to dump
    """
    if self.cursor >= len(self.record):
        logger.warn("Nothing to dump")
        return False

    header = ['Datetime', 'Question', 'Answer', 'Rate']
    for k in self.record[0].keys():
        if k not in header:
            header.append(k)

    dirname = os.path.dirname(fname)
    # A bare file name has an empty dirname, and os.makedirs('') raises.
    if dirname and not os.path.isdir(dirname):
        os.makedirs(dirname)

    with open(fname, 'a') as f:
        writer = csv.DictWriter(f, header, extrasaction='ignore')
        if self.cursor == 0:
            writer.writeheader()
        writer.writerows(self.record[self.cursor:])
    self.cursor = len(self.record)
    logger.warn("Dumpped chat history to {}".format(fname))
    return True
def save_to_tsv(out_file, geocoding, out_path=''):
    """Write the locations of the first geocoding result to ``<out_file>.tsv``.

    :param out_file: output file name without extension
    :param geocoding: sequence whose first element is an ``(id, locations)``
        pair; ``locations`` is an iterable of dicts keyed by the geonames
        field names listed below
    :param out_path: directory in which the file is created
    :return: the file name with its ``.tsv`` extension (without *out_path*)
    :raises Exception: any error is logged and re-raised unchanged
    """
    try:
        out_file_with_extension = '{}.tsv'.format(out_file)
        target = os.path.realpath(os.path.join(out_path, out_file_with_extension))
        # The context manager closes the file; no explicit close() needed.
        with open(target, 'w+', newline='') as csvfile:
            fieldnames = ['geonameId', 'name', 'toponymName', 'fcl', 'fcode',
                          'fcodeName', 'fclName', 'lat', 'lng',
                          'adminCode1', 'adminName1',
                          'adminCode2', 'adminName2',
                          'adminCode3', 'adminName3',
                          'adminCode4', 'adminName4',
                          'countryName', 'population', 'continentCode',
                          'countryCode',
                          ]
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames,
                                    delimiter='\t', quotechar='"',
                                    quoting=csv.QUOTE_MINIMAL)
            writer.writeheader()
            # Only the locations are used; the id half of the pair is not.
            _result_id, locations = geocoding[0]
            for data in locations:
                writer.writerow(data)
        return out_file_with_extension
    except Exception as e:
        logger.error('Error while writing tsv file {}'.format(e))
        raise
def dump_dict_table(filepath, table, fields=None):
    """Write a table of dict rows to *filepath* as tab-separated text.

    :param filepath: path of the file to create
    :param table: list of row dicts, or a dict of row dicts (its rows are
        then written in sorted key order)
    :param fields: column names; defaults to the keys of the first row.  An
        empty table always yields an empty header.
    """
    if isinstance(table, dict):
        table = [table[key] for key in sorted(table)]

    if table == []:
        fields = []
    elif fields is None:
        fields = list(table[0].keys())

    # ``with`` guarantees the file is closed even if a row fails to write.
    with open(filepath, 'w') as fid:
        outdict = csv.DictWriter(fid, dialect='excel-tab',
                                 lineterminator='\n', fieldnames=fields)
        # Header row: every field name mapped to itself.
        outdict.writerow(dict(zip(fields, fields)))
        outdict.writerows(table)
def on_epoch_end(self, epoch, logs=None):
    """Append one CSV row holding *epoch* and the values in *logs*.

    On the first call the column set is fixed to ``epoch`` plus the sorted
    keys of *logs*, the ``DictWriter`` is created, and the header is written
    when ``self.append_header`` is true.  The file is flushed after each row.

    :param epoch: value written to the ``epoch`` column
    :param logs: mapping of metric name -> value; must contain every key
        seen on the first call
    """
    # Avoid a shared mutable default argument.
    logs = logs or {}

    def handle_value(k):
        # Strings are iterable but must be written verbatim, not split
        # into characters.
        if isinstance(k, str):
            return k
        is_zero_dim_ndarray = isinstance(k, np.ndarray) and k.ndim == 0
        if isinstance(k, Iterable) and not is_zero_dim_ndarray:
            return '"[%s]"' % (', '.join(map(str, k)))
        return k

    if not self.writer:
        self.keys = sorted(logs.keys())
        self.writer = csv.DictWriter(self.csv_file,
                                     fieldnames=['epoch'] + self.keys)
        if self.append_header:
            self.writer.writeheader()

    row_dict = OrderedDict({'epoch': epoch})
    row_dict.update((key, handle_value(logs[key])) for key in self.keys)
    self.writer.writerow(row_dict)
    self.csv_file.flush()
def dump_tabular(*args, **kwargs):
    """Emit the accumulated tabular entries, then clear them.

    The table is either printed through ``table_printer`` (when
    ``_log_tabular_only`` is set) or logged line by line, and is also
    appended as one row to every registered CSV file.

    :param write_header: keyword only.  True forces a header row; None (the
        default) writes it once per file; any other falsy value suppresses it.
    Remaining positional and keyword arguments are forwarded to ``log``.
    """
    wh = kwargs.pop("write_header", None)
    if not len(_tabular) > 0:
        return

    if _log_tabular_only:
        table_printer.print_tabular(_tabular)
    else:
        for line in tabulate(_tabular).split('\n'):
            log(line, *args, **kwargs)

    tabular_dict = dict(_tabular)
    # Also write to the csv files
    # This assumes that the keys in each iteration won't change!
    for tabular_fd in list(_tabular_fds.values()):
        writer = csv.DictWriter(tabular_fd,
                                fieldnames=list(tabular_dict.keys()))
        header_pending = wh is None and tabular_fd not in _tabular_header_written
        if wh or header_pending:
            writer.writeheader()
            _tabular_header_written.add(tabular_fd)
        writer.writerow(tabular_dict)
        tabular_fd.flush()

    del _tabular[:]
def start_up(self, env, mod_config):
    """Register the POST_BAR listener and open ``portfolio.csv`` for appending.

    The CSV lives in ``mod_config.output_path``.  The header row is written
    only when the file did not exist before this call.
    """
    self._env = env
    self._mod_config = mod_config
    env.event_bus.add_listener(EVENT.POST_BAR, self._output_feeds)

    filename = os.path.join(mod_config.output_path, "portfolio.csv")
    # Must be checked before open(): append mode creates the file.
    new_file = not os.path.exists(filename)

    self.csv_file = open(filename, 'a')
    self.csv_writer = csv.DictWriter(
        self.csv_file,
        ["datetime", "portfolio_value", "market_value", "total_returns"])
    if new_file:
        self.csv_writer.writeheader()
def read_and_wrangle(src, dest):
    """Clean every record of the CSV at *src* and write the result to *dest*.

    Each input row goes through ``strip_record`` and ``wrangle_record`` and
    is written with the ``FINAL_HEADERS`` columns.

    :param src: object with a pathlib-style ``open`` method for the input
    :param dest: object with a pathlib-style ``open`` method for the output
    """
    rows_written = 0
    # ``with`` closes the output even when a record fails to convert.
    with dest.open('w') as wf:
        wcsv = csv.DictWriter(wf, fieldnames=FINAL_HEADERS)
        wcsv.writeheader()
        # only 2011.csv has windows-1252 instead of ascii encoding,
        # but we open all files as windows-1252 just to be safe
        with src.open("r", encoding='windows-1252') as rf:
            for i, row in enumerate(csv.DictReader(rf)):
                wcsv.writerow(wrangle_record(strip_record(row)))
                rows_written += 1
                # a little status checker
                if i % 10000 == 1:
                    print("...wrote row #", i)
    # Report the real row count; this also works for an input with no rows,
    # where the loop variable would never have been bound.
    print("Wrangled", rows_written, "rows and saved to", dest)
def write_scores(scores, output=sys.stdout, do_normalize=True, default_score=0):
    """
    Write segment scores as a ``clip,preictal`` CSV submission.

    The scores are first converted by ``scores_to_submission``, which
    receives *do_normalize* and *default_score* unchanged.

    :param scores: A list of score dictionaries, each with the keys 'clip'
                   (segment name) and 'preictal' (class probability of the
                   segment being preictal).
    :param output: The file-like object the submission is written to.
    :param do_normalize: Forwarded to ``scores_to_submission``; per the
                         original documentation, normalizes scores per subject.
    :param default_score: Forwarded to ``scores_to_submission``; per the
                          original documentation, the value used for segments
                          missing from *scores*.
    :return: None. The rows are written to *output*.
    """
    submission_rows = scores_to_submission(scores,
                                           do_normalize=do_normalize,
                                           default_score=default_score)
    submission_writer = csv.DictWriter(output, fieldnames=['clip', 'preictal'])
    submission_writer.writeheader()
    for submission_row in submission_rows:
        submission_writer.writerow(submission_row)
def write_to_csv(file, all_data):
    """Write one CSV row per bill with a column for every senator's vote.

    Columns are ``description``, ``date`` and then the senators in sorted
    order, so the layout is the same on every run.  A senator with no
    recorded vote on a bill gets ``N/A``.  Commas in the date are replaced
    by ``/``.

    :param file: path of the CSV file to create
    :param all_data: iterable of bill dicts with ``description``, ``date``
        and ``votes`` (a list of ``{'senator': ..., 'voted': ...}`` dicts)
    """
    senators = set()
    for bill in all_data:
        for vote in bill['votes']:
            senators.add(vote['senator'])

    headers = ['description', 'date']
    # Sets have no stable iteration order; sort for reproducible columns.
    headers.extend(sorted(senators))

    with open(file, 'w') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=headers, restval='N/A')
        writer.writeheader()
        for bill in all_data:
            row = {
                'description': bill['description'],
                'date': bill['date'].replace(',', '/')
            }
            row.update({b['senator']: b['voted'] for b in bill['votes']})
            writer.writerow(row)

# Save data to JSON
def __init__(self, stream, fieldnames, encoding='utf-8', **kwds):
    """Set up the underlying DictWriter for the requested encoding.

    For utf-8, ascii, latin-1 and cp1252 the DictWriter writes straight to
    *stream*.  For any other encoding rows are first rendered into an
    in-memory queue with a utf-8 encoder, and *stream* is wrapped in the
    codec's StreamWriter.

    Args:
      stream: Stream to write to.
      fieldnames: Fieldnames to pass to the DictWriter.
      encoding: Desired encoding.
      kwds: Additional arguments to pass to the DictWriter.
    """
    stream_writer_cls = codecs.getwriter(encoding)
    # Kept as a short-circuit chain so codec modules further down the list
    # are only looked up when the earlier comparisons fail.
    direct = (stream_writer_cls is encodings.utf_8.StreamWriter or
              stream_writer_cls is encodings.ascii.StreamWriter or
              stream_writer_cls is encodings.latin_1.StreamWriter or
              stream_writer_cls is encodings.cp1252.StreamWriter)

    if not direct:
        self.no_recoding = False
        self.encoder = codecs.getencoder('utf-8')
        self.queue = cStringIO.StringIO()
        self.writer = csv.DictWriter(self.queue, fieldnames, **kwds)
        self.stream = stream_writer_cls(stream)
        return

    self.no_recoding = True
    self.encoder = codecs.getencoder(encoding)
    self.writer = csv.DictWriter(stream, fieldnames, **kwds)
def __init__(self, sources, indicators, exchanges, inhablers, override_vars={}, quiet=False, *args, **kwargs):
    """Store the bot's collaborators, settings and optional CSV stats log.

    ``self.vars`` starts as a copy of ``DEFAULT_VARS`` updated with
    *override_vars*; ``self.stats`` starts as a copy of ``DEFAULT_STATS``.
    When ``CSV_FIELDS`` is non-empty, the file named by
    ``vars['output_stats_csv']`` is opened for writing and wrapped in a
    ``DictWriter`` whose columns are ``timestamp`` plus ``CSV_FIELDS``.
    Extra positional and keyword arguments are accepted but not used.
    """
    super(BaseBot, self).__init__()

    self._quiet = quiet
    self.exchanges, self.sources = exchanges, sources
    self.indicators, self.inhablers = indicators, inhablers

    self.vars = self.DEFAULT_VARS.copy()
    self.stats = self.DEFAULT_STATS.copy()
    print("Overriding: %s" % (override_vars,))
    self.vars.update(override_vars)

    if not self.CSV_FIELDS:
        return
    # Get a data log open; all bots should use one
    stats_log = open(self.vars['output_stats_csv'], 'w')
    self._csv = csv.DictWriter(stats_log, ['timestamp'] + self.CSV_FIELDS)
def write_input_spreadsheet(data_units, outfile):
    """Write *data_units* to *outfile* as CSV under a sorted, merged header.

    The header is the union of all unit keys plus ``_golden``, plus a
    ``<name>_gold`` column for every header ending in ``chunk_NN``.

    :param data_units: list of dicts, one per row
    :param outfile: writable file-like object
    :return: 0
    """
    # Merge all the keys to prepare the CSV headers
    merged = {key for unit in data_units for key in unit.keys()}
    # Specific field for test (gold) units
    merged.add('_golden')
    columns = list(merged)
    # Add gold answer columns for each chunk
    columns += [name + '_gold' for name in columns
                if re.search('chunk_[0-9]{2}$', name)]
    columns.sort()
    logger.debug('CSV headers: %s' % columns)

    sheet = DictWriter(outfile, columns)
    sheet.writeheader()
    sheet.writerows(data_units)
    return 0
def getAvailableVolumes():
    """Export every EC2 volume in state ``available`` to ``AvailableVolumes.csv``.

    The header starts with the keys of the record that has the most keys;
    keys that only occur in other records are appended in first-seen order,
    so no record is rejected for carrying an unexpected field.  When no
    volume is available the file is created empty.
    """
    response = session.client('ec2').describe_volumes()
    AvailableVolumes = [vol for vol in response['Volumes']
                        if vol['State'] == 'available']

    fieldNames = []
    if AvailableVolumes:
        # Computed once; the first record with the maximum key count wins.
        fieldNames = list(max(AvailableVolumes, key=len))
        known = set(fieldNames)
        for aVol in AvailableVolumes:
            for key in aVol:
                if key not in known:
                    known.add(key)
                    fieldNames.append(key)

    # Text mode with newline='' is what the csv module expects on Python 3.
    with open('AvailableVolumes.csv', 'w', newline='') as fileHandler:
        if not AvailableVolumes:
            return
        writer = DictWriter(fileHandler, fieldnames=fieldNames)
        writer.writeheader()
        for aVol in AvailableVolumes:
            writer.writerow(aVol)
def Open(self):
    """Open the output target selected by ``self.v_extension``.

    * ``csv``: opens ``self.v_filename`` for writing with
      ``self.v_encoding``, wraps it in a ``DictWriter`` over
      ``self.v_header`` and writes the header row.
    * ``xlsx``: creates a write-only ``openpyxl`` workbook.

    On success ``self.v_object`` holds the writer or workbook and
    ``self.v_open`` is True.

    :raises Spartacus.Utils.Exception: for an unsupported extension, or
        wrapping any other error raised while opening
    """
    try:
        if self.v_extension == 'csv':
            self.v_file = open(self.v_filename, 'w', encoding=self.v_encoding)
            # The writer must target the handle stored on the instance; a
            # bare ``v_file`` name does not exist in this scope.
            self.v_object = csv.DictWriter(self.v_file, fieldnames=self.v_header)
            self.v_object.writeheader()
            self.v_open = True
        elif self.v_extension == 'xlsx':
            self.v_object = openpyxl.Workbook(write_only=True)
            self.v_open = True
        else:
            raise Spartacus.Utils.Exception('File extension "{0}" not supported.'.format(self.v_extension))
    except Spartacus.Utils.Exception:
        # Bare raise keeps the original traceback.
        raise
    except Exception as exc:
        raise Spartacus.Utils.Exception(str(exc)) from exc
def write_bars_to_file(bars, filename, tz):
    """Creates CSV file from list of Bar instances.

    :param bars: iterable of objects exposing ``datetime``, ``open``,
        ``high``, ``low``, ``close`` and ``volume``
    :param filename: path of the file to create; a name ending in ``.gz``
        produces gzip-compressed output
    :param tz: timezone the bar timestamps are converted to before being
        formatted as ``YYYYMMDD HHMMSS``
    :raises Exception: when *filename* already exists
    """
    import gzip

    date_format_str = "%Y%m%d %H%M%S"

    rows = [{'DateTime': bar.datetime.astimezone(tz).strftime(date_format_str),
             'Open': bar.open,
             'High': bar.high,
             'Low': bar.low,
             'Close': bar.close,
             'Volume': bar.volume,
             } for bar in bars]

    if os.path.exists(filename):
        raise Exception("File already exists!")

    # Compress in-process instead of piping through a shell command built
    # from the file name, which breaks (or worse) on names containing
    # spaces or shell metacharacters.
    opener = gzip.open if filename.endswith('.gz') else open
    with opener(filename, 'wt', newline='') as fd:
        csv_writer = csv.DictWriter(fd, ['DateTime', 'Open', 'High', 'Low', 'Close', 'Volume'])
        csv_writer.writeheader()
        csv_writer.writerows(rows)
def export_from(datadir, tofile='out.csv'):
    """Collect every ``entry.json`` below *datadir* into one CSV file.

    Entries are ordered by the integer value of ``refid``, which is written
    as the ``id`` column.  ``thumb`` (when present) becomes the ``image``
    column; otherwise ``image`` is left empty.

    :param datadir: directory tree to scan
    :param tofile: path of the CSV file to create (UTF-8)
    :raises KeyError: when an entry lacks ``refid``, ``name``, ``section``
        or ``country``
    :raises ValueError: when an entry has keys outside the CSV columns
    """
    data = []
    for root, _dirs, files in os.walk(datadir):
        for filename in files:
            if filename != 'entry.json':
                continue
            with open(os.path.join(root, filename), 'r', encoding='utf-8') as fd:
                data.append(json.load(fd))

    # A key function replaces the comparison-function form of sorted(),
    # which Python 3 no longer accepts.
    data.sort(key=lambda entry: int(entry['refid']))

    columns = ['id', 'name', 'section', 'provisional', 'country', 'url',
               'image', 'pdf']
    # Text mode with an explicit encoding replaces the per-field
    # unicode(...).encode('utf-8') dance needed by the Python 2 csv module.
    with open(tofile, 'w', encoding='utf-8', newline='') as csvfile:
        writer = csv.DictWriter(csvfile, columns)
        writer.writeheader()
        for entry in data:
            for n in ['name', 'section', 'country']:
                entry[n] = str(entry[n])
            entry['id'] = entry.pop('refid')
            entry['image'] = entry.pop('thumb', None)
            writer.writerow(entry)
def history_add_entry(**kwargs):
    """Append one entry to the CSV history file, if a history path is set.

    ``timestamp`` is always overwritten with the current time as a string;
    any column of ``HISTORY_FIELDNAMES`` not supplied is written empty.  The
    header row is emitted only when the history file does not exist yet.
    """
    if etc.history_path is None:
        return

    # add missing fields
    kwargs["timestamp"] = "{0}".format(time.time())
    for column in HISTORY_FIELDNAMES:
        kwargs.setdefault(column, "")

    # update history file
    is_new_file = not os.path.exists(etc.history_path)
    with open(etc.history_path, 'a') as history_file:
        history = csv.DictWriter(history_file, fieldnames=HISTORY_FIELDNAMES)
        if is_new_file:
            history.writeheader()
        history.writerow(kwargs)
def save_predictions(pred, file):
    """
    Write predictions to a one-column CSV file headed ``Stance``.

    Each prediction is translated through ``label_ref_rev`` before being
    written.

    Args:
        pred: iterable of predictions (documented upstream as a numpy array
            of numeric predictions)
        file: str, filename + extension
    """
    with open(file, 'w') as out:
        stance_writer = DictWriter(out, fieldnames=['Stance'])
        stance_writer.writeheader()
        stance_writer.writerows(
            {'Stance': label_ref_rev[instance]} for instance in pred)
def write_results_at_various_thresholds(token_to_probes, check_fingerprints, increment_threshold_by=0.01):
    """Write clustering results for a sweep of thresholds to a CSV file.

    One row per threshold is written to ``jaccard_threshold_results.csv``;
    each row is whatever ``cluster_with_threshold`` returns for it.  Used to
    draw ROC curve.

    :param token_to_probes: Dictionary of token to list of probe dictionary
    :param check_fingerprints: Optional step to remove false positives.
    :param increment_threshold_by: step between consecutive thresholds
    """
    def drange(x, y, jump):
        """Yield floats from x up to y (inclusive) using Decimal steps."""
        step = decimal.Decimal(jump)
        current = x
        while current <= y:
            yield float(current)
            current += step

    columns = ["tp", "fp", "tn", "fn", "tpr", "fpr", "accuracy", "clusters",
               "macs", "median"]
    with open("jaccard_threshold_results.csv", "w") as f:
        results = csv.DictWriter(f, fieldnames=columns)
        results.writeheader()
        for threshold in drange(0, 1.01, increment_threshold_by):
            row = cluster_with_threshold(token_to_probes, threshold,
                                         check_fingerprints)
            results.writerow(row)
def run(self):
    """Compute each user's followers/following ratio and write it as CSV.

    The input holds one JSON tweet per line.  Users whose ``friends_count``
    is 0 are skipped; a user appearing several times keeps the ratio from
    the last tweet seen.  The output has the columns ``user`` and ``count``.
    """
    ratios = {}
    for raw_tweet in self.input().open('r'):
        profile = json.loads(raw_tweet)
        screen_name = profile['user']['screen_name']
        followers = int(profile['user']['followers_count'])
        following = int(profile['user']['friends_count'])
        if not following > 0:
            continue
        ratios[screen_name] = followers / float(following)

    with self.output().open('w') as fp_counts:
        out = csv.DictWriter(fp_counts, delimiter=',',
                             quoting=csv.QUOTE_MINIMAL,
                             fieldnames=['user', 'count'])
        out.writeheader()
        out.writerows({'user': name, 'count': ratio}
                      for name, ratio in ratios.items())
def extractFieldsFromDoc(self, doc, prefix=""):
    """Extract field values defined in self.columns from document into a dict required for csv.DictWriter.

    Nested documents are flattened with dotted names (``parent.child``).
    Lists are handled in two ways:

    * a list of dicts is treated as key/value sub-documents: when both
      ``params['nestedcolfield']`` and ``params['nestedvalfield']`` are set,
      each sub-document contributes the column
      ``<field>.<subdoc[nestedcolfield]>`` with value
      ``subdoc[nestedvalfield]``; sub-documents lacking either key are
      skipped;
    * any other list (including an empty one) is joined with
      ``params['listsep']``.

    Only columns listed in ``self.columns`` are returned.

    :param doc: the (sub-)document to flatten
    :param prefix: dotted prefix of the enclosing document's field name
    :return: dict of column name -> value
    """
    res = dict()
    for field, value in doc.items():
        column = prefix + field
        if isinstance(value, dict):  # subdocument
            res.update(self.extractFieldsFromDoc(value, column + "."))
        elif isinstance(value, list):
            # Test for emptiness first: value[0] on [] raises IndexError.
            if value and isinstance(value[0], dict):  # multiple nested documents
                nestedcolfield = self.params['nestedcolfield']
                nestedvalfield = self.params['nestedvalfield']
                if nestedcolfield is not None and nestedvalfield is not None:
                    for subdoc in value:
                        try:
                            nestedcol = column + "." + subdoc[nestedcolfield]
                            if nestedcol in self.columns:
                                res[nestedcol] = subdoc[nestedvalfield]
                        except KeyError:
                            # nested document lacks the column name or value field
                            pass
            elif column in self.columns:  # multivalued field
                res[column] = self.params["listsep"].join(value)
        elif column in self.columns:  # simple value
            res[column] = value
    return res
def render(self, result):
    """Render the hits of a search result as CSV text.

    Columns come from ``params['fields']`` when that list is non-empty;
    otherwise they are collected, in first-seen order, from the
    ``_source`` of every hit via ``self.columnNames``.  A header row is
    written when ``params['header']`` is true.

    :param result: search result exposing ``result["hits"]["hits"]``
    :return: the ``EQUELOutput`` object the CSV was written to
    """
    output = engine.EQUELOutput(engine.EQUELOutput.TYPE_TEXT, ["search"])
    self.columns = list()

    # First step: determine all columns that should appear in the result CSV
    if len(self.params['fields']) > 0:
        # a field whitelist was given, take it as is
        self.columns = self.params['fields']
    else:
        # pull the columns from every document of the result
        for hit in result.result["hits"]["hits"]:
            for name in self.columnNames(hit["_source"]):
                if name not in self.columns:
                    self.columns.append(name)

    import csv
    table = csv.DictWriter(output, self.columns, dialect=self.params['dialect'])
    if self.params['header']:
        table.writeheader()

    # Next: iterate over the documents and fill the CSV with data
    for hit in result.result["hits"]["hits"]:
        table.writerow(self.extractFieldsFromDoc(hit["_source"]))
    return output
def write_csv(contacts, output_file, dialect='unix', verbosity=0):
    """
    makes a csv out of the contacts and writes it to output_file (an open
    file descriptor)

    Every contact is first converted with ``make_contact_csv_compatible``;
    the header is taken from the keys of the first converted contact.
    Nothing is written when *contacts* is empty.

    :param contacts: iterable of contacts
    :param output_file: writable file object
    :param dialect: csv dialect name passed to ``csv.DictWriter``
    :param verbosity: verbosity level forwarded to the helpers
    """
    verbose_print("generating csv", verbosity, 1)
    # make contacts csv compatible
    new_contacts = [make_contact_csv_compatible(contact, verbosity)
                    for contact in contacts]
    if not new_contacts:
        # No first contact to derive the header from; indexing [0] would
        # raise IndexError.
        return

    writer = csv.DictWriter(output_file,
                            fieldnames=list(new_contacts[0].keys()),
                            dialect=dialect)
    writer.writeheader()
    writer.writerows(new_contacts)
def csv_download(request):
    """
    Build a CSV attachment listing all applications to the user's organization.

    The header is the key list of the serialized application with the most
    fields (the first such one on ties).  The download is named
    ``all_applications_to_<organization slug>_<MM-DD-YYYY>.csv``.
    """
    applications = get_all_applications_for_users_org(request.user)
    data = ApplicationCSVDownloadSerializer(applications, many=True).data

    # Finds the largest set of fields and uses it
    # There should not be a case where a smaller set of fields would have
    # a field not in a larger one.
    fields = []
    for datum in data:
        candidate = list(datum.keys())
        if len(candidate) <= len(fields):
            continue
        fields = candidate

    response = HttpResponse(content_type='text/csv')
    sheet = csv.DictWriter(response, fieldnames=fields)
    sheet.writeheader()
    sheet.writerows(data)

    file = 'all_applications_to_%s_%s.csv' % (
        request.user.profile.organization.slug,
        timezone.now().strftime('%m-%d-%Y'),
    )
    response['Content-Disposition'] = 'attachment; filename="%s"' % file
    return response
def buildSampleData(numPapers, inputDir, outputDir):
    """Copy a sample of ``PaperAuthor.csv`` plus the matching papers/authors.

    Rows of ``<inputDir>/PaperAuthor.csv`` are copied to
    ``<outputDir>/PaperAuthor.csv`` until *numPapers* distinct paper ids have
    been collected.  ``copyFile`` is then called for ``Author.csv`` and
    ``Paper.csv`` with the collected ids.

    :param numPapers: number of distinct papers to sample
    :param inputDir: directory holding the full data set
    :param outputDir: directory receiving the sample
    :return: tuple ``(papers, authors)`` of the sampled id sets
    """
    papers = set()
    authors = set()
    # Use the directories passed in -- the same ones handed to copyFile
    # below -- rather than module-level globals.
    with open(inputDir + "/PaperAuthor.csv") as infile:
        reader = csv.DictReader(infile)
        with open(outputDir + "/PaperAuthor.csv", 'w') as outfile:
            writer = csv.DictWriter(outfile, fieldnames=reader.fieldnames)
            writer.writeheader()
            for row in reader:
                # make sure to stop after numPapers
                if len(papers) >= numPapers:
                    break
                papers.add(row["PaperId"])
                authors.add(row["AuthorId"])
                writer.writerow(row)
    copyFile("Author.csv", authors, inputDir, outputDir)
    copyFile("Paper.csv", papers, inputDir, outputDir)
    return papers, authors
def init_csv_file(csv_file_param):
    """
    Write the ``^``-delimited header row to the target CSV file.

    :param csv_file_param: a file path (opened for writing) or an already
        open file-like object with a ``write`` method (normally stdout)
    :raises IOError: when the parameter is neither of the two
    :return: None -- the file opened for a path is neither closed nor
        returned here
    """
    if isinstance(csv_file_param, str):
        # The parameter is a file-path
        target = open(csv_file_param, 'w', newline = '')
    elif hasattr(csv_file_param, 'write'):
        # The parameter is already a file (normally, stdout)
        target = csv_file_param
    else:
        # Unknown
        raise IOError('[Error] Output file parameter "' + str(csv_file_param)
                      + '" unkown')

    # Write the header
    header_writer = csv.DictWriter(target, delimiter='^',
                                   fieldnames=fieldnames, dialect='unix',
                                   quoting=csv.QUOTE_NONE)
    header_writer.writeheader()
def write_object_labels_csv(file, labeled_data):
    """Write per-image category labels to a CSV file.

    The columns are ``name`` followed by every entry of
    ``object_categories``; each label is written as an int.

    :param file: path of the CSV file to create
    :param labeled_data: mapping of image name -> sequence of labels, one
        per category, in ``object_categories`` order
    """
    # write a csv file
    print('[dataset] write file %s' % file)
    with open(file, 'w') as csvfile:
        fieldnames = ['name']
        fieldnames.extend(object_categories)
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()
        for (name, labels) in labeled_data.items():
            example = {'name': name}
            # Follow the category list instead of a hard-coded count of 20.
            for i, category in enumerate(object_categories):
                example[category] = int(labels[i])
            writer.writerow(example)
    # The with-block closes the file; no explicit close() required.
def main():
    """Print the translated records as a TSV table, then summary statistics.

    The records are printed sorted by ``success``; the probability lines
    that follow are computed over all records by the ``p_*`` helpers.
    """
    columns = ['level', 'bd', 'nabd', 'naaw', 'success', 'duration']
    table = csv.DictWriter(sys.stdout, delimiter='\t', fieldnames=columns)
    table.writeheader()

    records = [translate_raw_record(raw) for raw in load_records()]
    table.writerows(sorted(records, key=itemgetter('success')))

    print('p(success) = {}'.format(p_success(records)))
    print('p(bd==c) = {}'.format(p_breakdown('c', records)))
    print('p(success and bd=c) = {}'.format(p_success_and_breakdown('c', records)))

    for breakdown in ['', 'ag', 'c', 'pd']:
        probability = p_success_given_breakdown(breakdown, records)
        print('p(success | bd={}) = {}'.format(breakdown, probability))

    for nabd in range(4):
        probability = p_success_given_nabd(nabd, records)
        print('p(success | nabd={}) = {}'.format(nabd, probability))

    for naaw in range(6):
        probability = p_success_given_naaw(naaw, records)
        print('p(success | naaw={}) = {}'.format(naaw, probability))
def run(self, args):
    """Merge grades from a header-less CSV into a blackboard export.

    ``args.csci`` has no header; its columns are ``unixname`` followed by
    ``args.columns``.  Every row of ``args.blackboard`` is updated with the
    grades of the student named in its ``Username`` column and written to
    ``args.output`` under the blackboard header.

    :raises KeyError: when a blackboard user has no entry in ``args.csci``
    """
    grades_by_student = {}
    grade_rows = csv.DictReader(args.csci,
                                fieldnames=['unixname'] + args.columns)
    for grade_row in grade_rows:
        student = grade_row.pop('unixname')
        grades_by_student[student] = grade_row

    blackboard = csv.DictReader(args.blackboard)
    header = blackboard.fieldnames
    # there's some weird unicode bs in the first column of a blackboard file
    # this grabs just the actual column
    header[0] = header[0][2:-1]

    merged = csv.DictWriter(args.output, fieldnames=header)
    merged.writeheader()
    for row in blackboard:
        row.update(grades_by_student[row['Username']])
        merged.writerow(row)
def write_output(interval):
    """Write a per-volume size and data-reduction report to ``report_file``.

    For every volume in ``allvolumes`` the historical space data for
    *interval* is fetched from ``array`` and one CSV row is written with the
    values taken from the first and last historical samples.

    :param interval: history window; also used in the column titles
    """
    name_col = 'Volume Name'
    cur_dr_col = 'Current Data Reduction'
    start_dr_col = 'Data Reduction ' + interval
    cur_size_col = 'Current Size(GB)'
    start_size_col = 'Size ' + interval + ' Ago(GB)'
    growth_col = interval + ' Growth(GB)'

    with open(report_file, 'w') as csvfile:
        report = csv.DictWriter(csvfile, fieldnames=[
            name_col, cur_dr_col, start_dr_col,
            cur_size_col, start_size_col, growth_col])
        report.writeheader()
        print('Parsing volume data.')

        # Loop through all volumes to get historical space data
        for currentvol in allvolumes:
            history = array.get_volume(currentvol['name'], space='True',
                                       historical=interval)
            first, last = history[0], history[len(history) - 1]
            # NOTE(review): "current" data reduction comes from the first
            # sample but "current" size from the last one -- confirm which
            # end of the history is the most recent.
            volcurdr = round(first['data_reduction'], 2)
            volstartdr = round(last['data_reduction'], 2)
            volstartsize = round(first['volumes'] / 1024 / 1024 / 1024, 2)
            volcursize = round(last['volumes'] / 1024 / 1024 / 1024, 2)
            volsizedif = round(volcursize - volstartsize, 2)
            report.writerow({
                name_col: first['name'],
                cur_dr_col: volcurdr,
                start_dr_col: volstartdr,
                cur_size_col: volcursize,
                start_size_col: volstartsize,
                growth_col: volsizedif,
            })
def test_write_fields_not_in_fieldnames(self):
    """writerow() must reject keys absent from fieldnames and name them."""
    handle, path = tempfile.mkstemp()
    fileobj = os.fdopen(handle, "w+b")
    try:
        writer = csv.DictWriter(fileobj, fieldnames=["f1", "f2", "f3"])
        # Of special note is the non-string key (issue 19449)
        bad_row = {"f4": 10, "f2": "spam", 1: "abc"}
        with self.assertRaises(ValueError) as cx:
            writer.writerow(bad_row)
        message = str(cx.exception)
        self.assertIn("fieldnames", message)
        self.assertIn("'f4'", message)
        self.assertNotIn("'f2'", message)
        self.assertIn("1", message)
    finally:
        fileobj.close()
        os.unlink(path)
def __init__(self, optimizer, obj_func, pop_size=1, threshold=None, max_iter=10000, out='result', logging=False):
    """Store the run configuration and, when logging, start ``log.csv``.

    With *logging* enabled the directory *out* is created if missing and
    ``<out>/log.csv`` is (re)written with just the header row.  When
    *threshold* is None it defaults to 1e-6 if ``optimizer.w_func.min`` is
    truthy and to 1e+6 otherwise.
    """
    self.opt = optimizer
    self.obj_func = obj_func
    self.pop_size = pop_size
    self.threshold = threshold
    self.max_iter = max_iter
    self.min = optimizer.w_func.min
    self.out = out
    self.logging = logging

    if self.logging:
        if not os.path.isdir(out):
            os.makedirs(out)
        with open(out+'/log.csv', 'w') as log_file:
            self.header = (['Generation', 'BestEval']
                           + self.opt.generate_header()
                           + self.opt.target.generate_header())
            csv.DictWriter(log_file, fieldnames=self.header).writeheader()

    if self.threshold is None:
        self.threshold = 1e-6 if self.min else 1e+6