我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用csv.writer()。
def setup_output_writers(parent_dir, fold_number):
    """
    Prepare the output directory of one fold and open CSV writers inside it.

    :param str parent_dir: directory under which the fold directory lives
    :param int fold_number: number used to build the ``Fold<N>`` directory name
    :return: writers for the train and the validation relevance files
    :rtype: tuple(csv.writer,csv.writer)
    """
    output_dir = path.join(parent_dir, "Fold%d" % fold_number)
    if path.isdir(output_dir):
        LOGGER.warn("Path <<%s>> already exists, files may be overwritten" % output_dir)
    else:
        LOGGER.debug("Creating output for fold %d at the location: %s" % (fold_number, output_dir))
        makedirs(output_dir)

    def _open_writer(filename):
        # The opened handle is only reachable through the returned writer;
        # nothing in this function closes it.
        handle = smart_file_open(path.join(output_dir, filename), 'w')
        return csv.writer(handle, dialect=csv.excel, delimiter=',')

    train_writer = _open_writer(TRAIN_RELEVANCE_FILENAME)
    validation_writer = _open_writer(VALIDATION_RELEVANCE_FILENAME)
    return train_writer, validation_writer
def csv_safe_unicode(row, ignoreErrors=True):
    """
    Return *row* with every string in the type ``csv.writer`` accepts.

    Python 2's csv.writer cannot handle unicode strings and Python 3's cannot
    handle byte strings, so the "wrong" string type for the running
    interpreter is converted using UTF-8.

    :param row: an array which could contain mixed types.
    :param ignoreErrors: if True, conversion errors are ignored; if False
        they raise.
    :returns: the original array when nothing needs converting, otherwise a
        new list with the offending strings converted.
    """
    # ``str`` is the bytes type only on Python 2.
    if str == six.binary_type:
        wrong_type, method_name = six.text_type, 'encode'
    else:
        wrong_type, method_name = six.binary_type, 'decode'

    # Non-string values (ints, floats, ...) never need conversion, so a row
    # without any wrong-typed string is returned untouched.
    if not any(isinstance(value, wrong_type) for value in row):
        return row

    error_mode = 'ignore' if ignoreErrors else 'strict'
    converted = []
    for value in row:
        if isinstance(value, wrong_type):
            value = getattr(value, method_name)('utf8', error_mode)
        converted.append(value)
    return converted
def save_gem_class_csv(self, base_dir):
    """Write ``gem_classification.csv`` under *base_dir* from ``self.result``.

    The header holds the two genome names; each following row holds a
    barcode, its two counts and its call, with the generic genome
    placeholders in the call replaced by the actual genome names.
    """
    csv_file_path = os.path.join(base_dir, 'gem_classification.csv')
    cr_utils.makedirs(os.path.dirname(csv_file_path), allow_existing=True)

    with open(csv_file_path, 'wb') as f:
        writer = csv.writer(f)
        result = self.result
        genome0 = result['genome0']
        genome1 = result['genome1']
        writer.writerow(['barcode', genome0, genome1, 'call'])

        for idx in xrange(len(result['barcode'])):
            call = (result['call'][idx]
                    .replace(cr_constants.GEM_CLASS_GENOME0, genome0)
                    .replace(cr_constants.GEM_CLASS_GENOME1, genome1))
            writer.writerow([
                result['barcode'][idx],
                result['count0'][idx],
                result['count1'][idx],
                call,
            ])
def build_gtf(self):
    """Copy ``self.in_gtf_fn`` to ``self.out_gtf_fn``, dropping filtered rows.

    Comment rows are always kept. Any other row is dropped when one of its
    properties has a key listed in ``self.attributes`` whose value is not
    among the values allowed for that key.
    """
    print("Writing new genes GTF file (may take 10 minutes for a 1GB input GTF file)...")
    with open(self.out_gtf_fn, 'wb') as f:
        writer = csv.writer(f, delimiter='\t', quoting=csv.QUOTE_NONE, quotechar='')
        for row, is_comment, properties in self.gtf_reader_iter(self.in_gtf_fn):
            if not is_comment:
                rejected = any(
                    key in self.attributes and value not in self.attributes[key]
                    for key, value in properties.iteritems()
                )
                if rejected:
                    continue
            writer.writerow(row)
    print("...done\n")
def build_metrics_summary_csv(filename, sample_properties, sample_data, pipeline):
    """Write a two-row CSV (metric names, metric values) for *pipeline*.

    The metrics come from the rows of the tables produced by
    ``build_tables``; when a metric name occurs more than once, the first
    value seen is kept. Nothing is written when no tables are produced.
    """
    metrics, alarms, charts, all_prefixes = get_constants_for_pipeline(pipeline)
    tables, _ = build_tables(sample_properties, metrics, alarms, sample_data,
                             all_prefixes=all_prefixes)
    if not tables:
        sys.stderr.write("No metrics tables were generated, skipping CSV generation.\n")
        return

    def _unwrap(cell):
        # A cell that is exactly a dict carries its payload under 'v'.
        return cell['v'] if type(cell) == dict else cell

    csv_metrics = collections.OrderedDict()
    for table in tables:
        if not table:
            continue
        for raw_metric, _, raw_value in table['rows']:
            metric = _unwrap(raw_metric)
            value = _unwrap(raw_value)
            # setdefault keeps the first value recorded for a metric
            csv_metrics.setdefault(metric, value)

    with open(filename, 'wb') as f:
        writer = csv.writer(f)
        writer.writerow(csv_metrics.keys())
        writer.writerow(csv_metrics.values())
def dump_to_csv(self, output_csv, input_fields, write_header=True, top_level=False, mode='a', encoding='utf-8', compression=None):
    """Write the tweets from ``self.get_iterator()`` to a CSV file.

    :param output_csv: path of the file to write
    :param input_fields: column names; also the fields extracted per tweet
    :param write_header: write *input_fields* as the first row when true
    :param top_level: when true, read each field with ``tweet.get(field)``;
        otherwise delegate extraction to ``TweetParser``
    :param mode: file mode; converted with ``binary_mode`` when compressing
    :param encoding: NOTE(review): accepted but not used by this method
    :param compression: ``'bz2'``, ``'gzip'`` or ``None`` for a plain file
    """
    if compression == 'bz2':
        filehandle = bz2.open(output_csv, binary_mode(mode))
    elif compression == 'gzip':
        filehandle = gzip.open(output_csv, binary_mode(mode))
    else:
        filehandle = open(output_csv, mode)

    # The context manager guarantees the handle is closed even when a row
    # fails to serialize; previously an exception leaked the open file.
    with filehandle:
        writer = csv.writer(filehandle)
        if write_header:
            writer.writerow(input_fields)
        tweet_parser = TweetParser()

        for tweet in self.get_iterator():
            if top_level:
                ret_values = [tweet.get(field) for field in input_fields]
            else:
                ret = tweet_parser.parse_columns_from_tweet(tweet, input_fields)
                # each entry is a (column, value) pair; only values are written
                ret_values = [col_val[1] for col_val in ret]
            writer.writerow(ret_values)
def __init__(self, ofile, maxresultrows=None): self._maxresultrows = 50000 if maxresultrows is None else maxresultrows self._ofile = ofile self._fieldnames = None self._buffer = StringIO() self._writer = csv.writer(self._buffer, dialect=CsvDialect) self._writerow = self._writer.writerow self._finished = False self._flushed = False self._inspector = OrderedDict() self._chunk_count = 0 self._record_count = 0 self._total_record_count = 0L
def QA_SU_save_account_to_csv(message, path=os.getcwd()):
    """Write the account trade history of a backtest message to a CSV file.

    Each history row is written with the cash and assets values of the same
    index appended. Only indices present in history, cash and assets are
    written.

    :param message: backtest message; reads ``message['header']['cookie']``
        and ``message['body']['account']`` (``history``, ``cash``, ``assets``)
    :param path: prefix of the output file name.
        NOTE(review): the prefix is concatenated as-is (no path separator is
        inserted) and the default is evaluated once, at definition time.
    """
    file_name = '{}backtest-ca&history-{}.csv'.format(
        path, str(message['header']['cookie']))
    with open(file_name, 'w', newline='') as handle:
        csvwriter = csv.writer(handle)
        csvwriter.writerow(['date', 'code', 'price', 'towards', 'amount',
                            'order_id', 'trade_id', 'commission_fee', 'cash', 'assets'])
        account = message['body']['account']
        history = account.get('history') or ()
        # zip stops at the shortest sequence, which replaces the former
        # bare ``except: pass`` around out-of-range indexing. Rows are built
        # as copies so the caller's history is no longer mutated (repeated
        # calls used to append cash/assets again and again).
        for trade, cash, assets in zip(history, account['cash'], account['assets']):
            csvwriter.writerow(list(trade) + [cash, assets])
def QA_SU_save_pnl_to_csv(detail, cookie):
    """Write the PnL detail table to ``backtest-pnl--<cookie>.csv``.

    :param detail: table exposing ``columns`` and ``values``
        (assumes a pandas DataFrame -- TODO confirm against callers)
    :param cookie: identifier embedded in the output file name
    """
    file_name = 'backtest-pnl--' + str(cookie) + '.csv'
    with open(file_name, 'w', newline='') as handle:
        csvwriter = csv.writer(handle)
        csvwriter.writerow(detail.columns)
        # Iterating a DataFrame directly yields its column labels, so the
        # previous ``for item in detail`` wrote the labels (split into
        # characters) instead of the data. Write the actual rows.
        csvwriter.writerows(detail.values.tolist())

# Account fields of the backtest message (kept from the original note):
# 'cash': message['body']['account']['cash'],
# 'hold': message['body']['account']['hold'],
# 'history': message['body']['account']['history'],
# 'assets': message['body']['account']['assets'],
# 'detail': message['body']['account']['detail']
def convert_hypnograms(datadir):
    """
    Convert every ``.hyp`` file in *datadir* into a ``.csv`` next to it.

    This is a hack: the hypnogram file is read as raw bytes and parsed from
    the textual repr of those bytes, splitting on the ``Sleep_stage_``
    marker. Each stage character is repeated once per 30 units of the
    number parsed after it, and written one stage per record.
    """
    print('Converting hypnograms')
    hyp_names = [name for name in os.listdir(datadir) if name.endswith('.hyp')]
    for name in hyp_names:
        hyp_path = os.path.join(datadir, name)
        stages = []
        with open(hyp_path, mode='rb') as source:  # binary mode is required
            segments = str(source.read()).split('Sleep_stage_')[1:]
            for segment in segments:
                stage = segment[0]
                # The number sits after the 12th character and ends at the
                # first backslash of the bytes repr (the original author was
                # unsure whether this also works on linux).
                duration_text = segment.split('\\')[0][12:]
                repeat = int(duration_text) // 30
                stages.extend(stage * repeat)
        with open(hyp_path[:-4] + '.csv', "w") as target:
            csv.writer(target, lineterminator='\r').writerows(stages)
def export_to_csv(request, variants):
    """Build a downloadable export of *variants* when the request asks for one.

    The ``export`` GET parameter selects the format: ``csv`` (comma
    separated) or ``txt`` (tab separated, unquoted).

    :param request: request whose ``GET`` mapping is consulted
    :param variants: iterable of variant records to export
    :returns: the response carrying the export, or ``None`` when no export
        (or an unsupported format) was requested
    """
    export = request.GET.get('export', '')
    if export == 'csv':
        response = HttpResponse(content_type='text/csv')
        response['Content-Disposition'] = 'attachment; filename=export.csv'
        writer = csv.writer(response)
    elif export == 'txt':
        response = HttpResponse(content_type='text/plain')
        response['Content-Disposition'] = 'attachment; filename=export.txt'
        writer = csv.writer(response, delimiter='\t', quoting=csv.QUOTE_NONE)
    else:
        # Previously an unknown format fell through with ``writer`` and
        # ``response`` unbound and crashed; treat it like "no export".
        return None

    # NOTE(review): the header lists more columns than each data row
    # provides -- confirm which side is authoritative.
    writer.writerow(['Individual', 'Index', 'Pos_index', 'Chr', 'Pos', 'Variant_id', 'Ref', 'Alt', 'Qual', 'Filter', 'Info', 'Format', 'Genotype_col', 'Genotype', 'Read_depth', 'Gene', 'Mutation_type', 'Vartype', 'Genomes1k_maf', 'Dbsnp_maf', 'Esp_maf', 'Dbsnp_build', 'Sift', 'Sift_pred', 'Polyphen2', 'Polyphen2_pred', 'Condel', 'Condel_pred', 'DANN', 'CADD', 'Is_at_omim', 'Is_at_hgmd', 'Hgmd_entries', 'Effect', 'Impact', 'Func_class', 'Codon_change', 'Aa_change', 'Aa_len', 'Gene_name', 'Biotype', 'Gene_coding', 'Transcript_id', 'Exon_rank', 'Genotype_number', 'Allele', 'Gene', 'Feature', 'Feature_type', 'Consequence', 'Cdna_position', 'Cds_position', 'Protein_position', 'Amino_acids', 'Codons', 'Existing_variation', 'Distance', 'Strand', 'Symbol', 'Symbol_source', 'Sift', 'Polyphen', 'Condel'])
    for variant in variants:
        # SECURITY NOTE: pickle.loads executes arbitrary code if the stored
        # ``info`` blob is not trusted -- confirm where it comes from.
        writer.writerow([variant.individual, variant.index, variant.pos_index, variant.chr, variant.pos, variant.variant_id, variant.ref, variant.alt, variant.qual, variant.filter, pickle.loads(variant.info), variant.format, variant.genotype_col, variant.genotype, variant.read_depth, variant.gene, variant.mutation_type, variant.vartype, variant.genomes1k_maf, variant.dbsnp_maf, variant.esp_maf, variant.dbsnp_build, variant.sift, variant.sift_pred, variant.polyphen2, variant.polyphen2_pred, variant.condel, variant.condel_pred, variant.dann, variant.cadd, variant.is_at_omim, variant.is_at_hgmd, variant.hgmd_entries])
    return response
def writerow(self, row):
    """Write row.

    Dict cells are serialized with ``json.dumps``; every other cell is
    converted to unicode and UTF-8 encoded. The row is written to the
    intermediate queue, re-encoded with ``self.encoder`` and sent to
    ``self.stream``.
    """
    encoded_cells = [
        json.dumps(cell) if type(cell) == dict else unicode(cell).encode("utf-8")
        for cell in row
    ]
    self.writer.writerow(encoded_cells)

    # Take the UTF-8 text out of the queue, re-encode it into the target
    # encoding and push it to the target stream.
    utf8_text = self.queue.getvalue().decode("utf-8")
    self.stream.write(self.encoder.encode(utf8_text))

    # empty the queue for the next row
    self.queue.truncate(0)
def get(self, request, *args, **kwargs):
    """Return the queryset rendered as a CSV attachment.

    Optionally emits a leading ``sep=`` line (``self.specify_separator``)
    and a header row (``self.header``) before one row per object.
    """
    queryset = self.get_queryset()
    field_names = self.get_fields(queryset)

    response = HttpResponse(content_type='text/csv')
    filename = self.get_filename(queryset)
    response['Content-Disposition'] = 'attachment; filename="{}.csv"'.format(filename)

    writer = csv.writer(response, **self.get_csv_writer_fmtparams())

    if self.specify_separator:
        dialect = writer.dialect
        response.write('sep={}{}'.format(dialect.delimiter, dialect.lineterminator))

    if self.header:
        header_row = [
            self.get_header_name(queryset.model, field_name)
            for field_name in list(field_names)
        ]
        writer.writerow(header_row)

    for obj in queryset:
        values = [self.get_field_value(obj, field) for field in field_names]
        writer.writerow(values)

    return response
def _write_csv_output(note_phrase_matches, output_filename):
    """Write one CSV row per entry of *note_phrase_matches*.

    A row is the list returned by ``rpdr_note.get_keys()`` with the
    extracted value of the first phrase match appended (``None`` when the
    entry has no phrase matches).
    """
    rows = []
    for note_match in note_phrase_matches:
        row = note_match.rpdr_note.get_keys()
        matches = note_match.phrase_matches
        # only the first match contributes a value
        row.append(matches[0].extracted_value if matches else None)
        rows.append(row)

    with open(output_filename, 'wb') as output_file:
        csv.writer(output_file).writerows(rows)
def write_data_to_csv(csv_name, people, countries):
    """
    Write one CSV row per person with a column per country.

    Args:
        csv_name (str): Name of the file to write results to.
        people (list): Objects with a ``name`` attribute and, optionally,
            one attribute per country; a missing attribute is written as 0.
        countries (list): List of strings that represent countries.
    """
    with open(csv_name, 'w') as outfile:
        writer = csv.writer(outfile)
        writer.writerow(['name'] + countries)
        for person in people:
            row = [person.name]
            row.extend(getattr(person, country, 0) for country in countries)
            writer.writerow(row)
def sensor_live(self):
    """Sample the sensor repeatedly, log each reading to CSV and plot it.

    Every cycle wakes the sensor, waits, reads it, and puts it back to
    sleep. A successful reading is appended to ``/home/pi/data.csv`` with a
    timestamp and added to the two plotted series.
    """
    times, series_a, series_b = [], [], []
    for elapsed in range(0, 330, 30):  # change time interval here, if required
        self.sensor_wake()
        time.sleep(10)
        reading = self.sensor_read()
        if reading is not None:
            times.append(elapsed)
            series_a.append(reading[0])
            series_b.append(reading[1])
            with open('/home/pi/data.csv', 'ab') as csvfile:
                log = csv.writer(csvfile, delimiter=';', quotechar='"',
                                 quoting=csv.QUOTE_MINIMAL)
                stamp = datetime.datetime.now().replace(microsecond=0).isoformat().replace('T', ' ')
                log.writerow([stamp, reading[0], reading[1]])
            # the with-block has already closed the file at this point
            self.ax.plot(times, series_a, 'r-x')
            self.ax.plot(times, series_b, 'b-x')
            self.canvas.draw()
        self.sensor_sleep()
        time.sleep(20)
def list_to_csv(directory_and_filename, list):
    """Write the rows of *list* to ``<directory_and_filename>.csv``.

    :param directory_and_filename: target path, with or without the
        ``.csv`` suffix
    :param list: iterable of rows (name kept for caller compatibility even
        though it shadows the builtin)

    A row that raises ``UnicodeEncodeError`` is retried with its unicode
    cells encoded as UTF-8.
    """
    if directory_and_filename[-4:] == '.csv':
        directory_and_filename = directory_and_filename[:-4]
    with open(directory_and_filename + '.csv', 'wb') as csvfile:
        spamwriter = csv.writer(csvfile, quoting=csv.QUOTE_MINIMAL)
        for row in list:
            try:
                spamwriter.writerow(row)
            except UnicodeEncodeError:
                encoded_row = [
                    element.encode('utf-8') if type(element) is unicode else element
                    for element in row
                ]
                # The re-encoded row used to be built and then discarded,
                # silently dropping the row from the output.
                spamwriter.writerow(encoded_row)
def generate_csv(domains, file_name):
    """Write one row per domain to *file_name*, using CSV_HEADERS as columns.

    :param domains: iterable of objects whose ``generate_results()`` returns
        a mapping keyed by the column headers
    :param file_name: path of the CSV file to create
    """
    # ``with`` closes the file even if a domain fails to produce results;
    # the explicit open()/close() pair leaked the handle in that case.
    with open(file_name, 'w') as output:
        writer = csv.writer(output)
        # First row should always be the headers
        writer.writerow(CSV_HEADERS)
        for domain in domains:
            # Keys of the results dict are the column headers.
            results = domain.generate_results()
            writer.writerow([results[column] for column in CSV_HEADERS])
def result_write_to_csv(devices_info_list):
    """
    Write network devices information to ``network_devices.csv``.

    Args:
        devices_info_list (list[NetworkDeviceDTO]): Lists of network device
            info instances.
    """
    # The file used to be opened inline and never closed, so flushing
    # depended on garbage collection; the context manager closes it.
    with open("network_devices.csv", "w+") as handle:
        writer = csv.writer(handle)
        writer.writerow(["Device Name", "IP Address", "MAC Address", "IOS/Firmware", "Platform",
                         "Serial Number", "Devcie Role", "Device Family"])
        for device_info in devices_info_list:
            writer.writerow([device_info.hostname, device_info.managementIpAddress,
                             device_info.macAddress, device_info.softwareVersion,
                             device_info.platformId, device_info.serialNumber,
                             device_info.role, device_info.family])
def write_snp_summary(self, file="snp_summary.csv", summary_parameters=None, sort=False):
    """Write a per-SNP summary table to ``<out_path>/<project>_<file>``.

    :param file: file name suffix appended to the project name
    :param summary_parameters: keys read from each SNP's data; defaults to
        maf, hwe, rep and call_rate
    :param sort: when true, rows are sorted in descending order by the
        parameter columns
    """
    if summary_parameters is None:
        summary_parameters = ["maf", "hwe", "rep", "call_rate"]

    out_file = os.path.join(self.out_path, self.attributes["project"] + "_" + file)

    rows = []
    for snp, data in self.data.items():
        rows.append([snp] + [data[parameter] for parameter in summary_parameters])

    if sort:
        # sort on every column after the id column
        sort_key = operator.itemgetter(*range(1, len(summary_parameters) + 1))
        rows.sort(key=sort_key, reverse=True)

    table = [["id"] + summary_parameters] + rows

    with open(out_file, "w") as snp_summary:
        csv.writer(snp_summary).writerows(table)
def _writeToCSV(self):
    '''
    INFO
    ----
    Writes the 2-dimensional list ``self._dataAsList`` to the CSV file at
    ``self._filePathAndName``. Values are comma-delimited and every
    non-numeric value is quoted. When there is no data, no file is created.

    RETURNS
    -------
    None
    '''
    if not self._dataAsList:
        return
    with open(self._filePathAndName, 'w') as csvFile:
        csv.writer(
            csvFile,
            lineterminator='\n',
            quoting=csv.QUOTE_NONNUMERIC,
        ).writerows(self._dataAsList)
    # leaving the with-block closes the file
def export_data(self):
    """Stop collection, join the worker threads and dump samples to CSV.

    The output goes to the first ``data/pupil/data-<i>.csv`` that does not
    exist yet. Each row holds the signal type, the literal ``msg``, the
    timestamp and then the sample's data values.
    """
    self.running = False
    self.pupil_thread.join(5)
    self.lsl_thread.join(5)
    print('Joined threads, now outputting pupil data.')

    # find the first unused file index
    index = 0
    while os.path.exists("data/pupil/data-%s.csv" % index):
        index += 1

    header = ('Signal Type', 'Msg', 'Time', 'Channel 1', 'Channel 2', 'Channel 3',
              'Channel 4', 'Channel 5', 'Channel 6', 'Channel 7', 'Channel 8')
    with open('data/pupil/data-%s.csv' % index, 'w+') as f:
        writer = csv.writer(f)
        writer.writerow(header)
        for signal_type, timestamp, datas in self.samples:
            row = [signal_type, 'msg', timestamp]
            row.extend(datas)
            writer.writerow(tuple(row))
def export_annotations(self, export_range, export_dir):
    """Export the annotations within *export_range* to ``annotations.csv``.

    Annotations are de-duplicated by their ``index`` (the last one seen for
    an index wins) and written in ascending index order under *export_dir*.
    """
    if not self.annotations:
        logger.warning('No annotations in this recording nothing to export')
        return

    # De-duplicate by index: later annotations replace earlier ones.
    unique_by_index = {}
    for annotation in chain(*self.annotations_by_frame[export_range]):
        unique_by_index[annotation['index']] = annotation
    annotations_in_section = sorted(unique_by_index.values(), key=lambda a: a['index'])

    target = os.path.join(export_dir, 'annotations.csv')
    with open(target, 'w', encoding='utf-8', newline='') as csvfile:
        csv_writer = csv.writer(csvfile)
        csv_writer.writerow(self.csv_representation_keys())
        for annotation in annotations_in_section:
            csv_writer.writerow(self.csv_representation_for_annotations(annotation))
        logger.info("Created 'annotations.csv' file.")
def write_key_value_file(csvfile, dictionary, append=False):
    """Write the items of a dictionary to a writable file as CSV rows.

    Args:
        csvfile (FILE): Writable file
        dictionary (dict): Dictionary containing key-value pairs
        append (bool, optional): When False, a ``key,value`` header row is
            written first

    Returns:
        None: No return
    """
    writer = csv.writer(csvfile, delimiter=',')
    rows = [] if append else [['key', 'value']]
    rows.extend([key, val] for key, val in dictionary.items())
    writer.writerows(rows)
def _drop_answer_id_col_from_feature_file(self, train_file_location):
    """Return the path of a copy of the training file without its second column.

    A previously generated copy is re-used when it exists. Otherwise the
    copy is written to a temporary file first and then moved into place.
    """
    file_without_aid = insert_modifier_in_filename(train_file_location, 'no_aid')

    if path.isfile(file_without_aid):
        self.logger.info('Found a previously generated version of the training file without answer id column, '
                         're-using it: %s' % file_without_aid)
        return file_without_aid

    self.logger.info('Generating a version of the feature file without answer id (which is what ranker'
                     ' training expects')
    temp_file = get_temp_file(file_without_aid)
    with smart_file_open(temp_file, 'w') as outfile:
        writer = csv.writer(outfile)
        with smart_file_open(train_file_location) as infile:
            # keep the first column, drop the second, keep the rest
            writer.writerows(row[:1] + row[2:] for row in csv.reader(infile))
    move(temp_file, file_without_aid)
    self.logger.info('Done generating file: %s' % file_without_aid)
    return file_without_aid
def setup_train_and_test_writer(output_dir):
    """
    Make sure *output_dir* exists and open CSV writers inside it.

    :param str output_dir: file path to the output dir
    :return: writers for the train and the validation relevance files
    :rtype: tuple(csv.writer,csv.writer)
    """
    if path.isdir(output_dir):
        LOGGER.warn("Path <<%s>> already exists, files may be overwritten" % output_dir)
    else:
        makedirs(output_dir)

    writers = []
    for filename in (TRAIN_RELEVANCE_FILENAME, VALIDATION_RELEVANCE_FILENAME):
        # The handle stays open; it is only reachable through the writer.
        handle = smart_file_open(path.join(output_dir, filename), 'w')
        writers.append(csv.writer(handle, dialect=csv.excel, delimiter=','))

    train_writer, validation_writer = writers
    return train_writer, validation_writer
def _print_feature_vectors_and_check_for_correct_answers(writer, rnr_search_results, qid, correct_ans_lookup):
    """
    Write each search result as ``[qid] + row + [label]`` and count the hits.

    :param csv.writer writer: destination for the feature vectors
    :param list(list(str)) rnr_search_results: one feature row per result
    :param str qid: the qid to print at the start of each feature vector
    :param dict(str,int) correct_ans_lookup: label lookup for correct answer ids
    :return: num_possible_correct, num_correct_answers_in_search_results
    :rtype: tuple(int,int)
    """
    num_possible_correct = len(correct_ans_lookup)
    hits = 0
    for row in rnr_search_results:
        doc_id = row[_ANS_ID_COL].strip()
        if doc_id in correct_ans_lookup:
            label = correct_ans_lookup[doc_id]
            hits += 1
        else:
            # results that are not known correct answers are labelled 0
            label = 0
        writer.writerow([qid] + row + [label])
    return num_possible_correct, hits
def scaneventresultexport(self, id, type, dialect="excel"):
    """Return the events of one scan as CSV text and set download headers.

    Rows whose type column is ``ROOT`` are skipped, and ``<SFURL>`` tags are
    stripped from the data column.
    """
    dbh = SpiderFootDb(self.config)
    data = dbh.scanResultEvent(id, type)

    fileobj = StringIO()
    parser = csv.writer(fileobj, dialect=dialect)
    parser.writerow(["Updated", "Type", "Module", "Source", "F/P", "Data"])

    for row in data:
        if row[4] != "ROOT":
            lastseen = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(row[0]))
            datafield = str(row[1]).replace("<SFURL>", "").replace("</SFURL>", "")
            parser.writerow([lastseen, str(row[4]), str(row[3]), str(row[2]), row[13], datafield])

    headers = cherrypy.response.headers
    headers['Content-Disposition'] = "attachment; filename=SpiderFoot.csv"
    headers['Content-Type'] = "application/csv"
    headers['Pragma'] = "no-cache"
    return fileobj.getvalue()
def scaneventresultexportmulti(self, ids, dialect="excel"):
    """Return the events of several scans as CSV text and set download headers.

    *ids* is a comma-separated list of scan ids. Each output row starts with
    the first field of the scan instance that the event row refers to.
    Rows whose type column is ``ROOT`` are skipped.
    """
    dbh = SpiderFootDb(self.config)
    scaninfo = dict()
    data = list()
    for scan_id in ids.split(','):
        scaninfo[scan_id] = dbh.scanInstanceGet(scan_id)
        data = data + dbh.scanResultEvent(scan_id)

    fileobj = StringIO()
    parser = csv.writer(fileobj, dialect=dialect)
    parser.writerow(["Scan Name", "Updated", "Type", "Module", "Source", "F/P", "Data"])

    for row in data:
        if row[4] != "ROOT":
            lastseen = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(row[0]))
            datafield = str(row[1]).replace("<SFURL>", "").replace("</SFURL>", "")
            parser.writerow([scaninfo[row[12]][0], lastseen, str(row[4]), str(row[3]),
                             str(row[2]), row[13], datafield])

    headers = cherrypy.response.headers
    headers['Content-Disposition'] = "attachment; filename=SpiderFoot.csv"
    headers['Content-Type'] = "application/csv"
    headers['Pragma'] = "no-cache"
    return fileobj.getvalue()
def write_gs_param_results_to_file(trained_gs, most_recent_filename):
    """Append the grid-search outcome to ``pipeline_grid_search_results.csv``.

    The most recent result is first written through
    ``write_most_recent_gs_result_to_file``. A header row is added only
    when the cumulative results file does not exist yet.
    """
    timestamp_time = datetime.datetime.now()
    write_most_recent_gs_result_to_file(trained_gs, most_recent_filename, timestamp_time)

    result_row = [
        timestamp_time,
        trained_gs.scorer_,
        trained_gs.best_score_,
        trained_gs.grid_scores_,
    ]

    file_name = 'pipeline_grid_search_results.csv'
    # must be checked before open(): append mode creates the file
    needs_header = not os.path.isfile(file_name)

    with open(file_name, 'a') as results_file:
        writer = csv.writer(results_file, dialect='excel')
        if needs_header:
            writer.writerow(['timestamp', 'scorer', 'best_score', 'all_grid_scores'])
        writer.writerow(result_row)
def schedule():
    """Schedule or unschedule a job.

    Reads ``Name``, ``Timer`` and ``Action`` from the JSON request body and
    rewrites the SCHEDULE file: any existing line for the same name and
    action is dropped, and a new line is added unless ``Timer`` is falsy
    (0 means unschedule).

    TODO add a lock

    :returns: JSON string ``{"Err": ""}``
    """
    # Read and parse the body once; it used to be read and parsed three
    # times, which only works if the body can be re-read on each access.
    payload = jsonloads(request.body.read())
    name = payload['Name']
    timer = payload['Timer']
    action = payload['Action']

    entries = []
    if timer:  # 0 means unschedule!
        entries.append((name, action, timer))

    if os.path.exists(SCHEDULE):
        with open(SCHEDULE) as f:
            for n, a, t in csv.reader(f):
                # skip the line we want to write
                if n == name and a == action:
                    continue
                entries.append((n, a, t))

    os.makedirs(dirname(SCHEDULE), exist_ok=True)
    with open(SCHEDULE, 'w') as f:
        # one writer for the whole file instead of one per line
        csv.writer(f).writerows(entries)
    return json.dumps({'Err': ''})
def create_HexGrid_csv(self, hexgrid):
    """Write *hexgrid* rows to ``HexGrid-Manhattan.csv`` under a latitude/longitude header."""
    with open('HexGrid-Manhattan.csv', 'w') as out:
        grid_writer = csv.writer(out)
        grid_writer.writerow(['latitude', 'longitude'])
        grid_writer.writerows(hexgrid)
def getExampleFromDB(exampleName, connType, conn=False):
    """Dump the result table of one example from the database into a CSV file.

    The table name is TBLPFX plus the MD5 hex digest of the example name
    with underscores removed. Columns containing ``act_made_call`` or
    ``act_unlock`` are left out, and only rows where the third-from-last
    column is not NULL are selected.

    :param exampleName: example name; also used for the CSV file name
    :param connType: connection type passed to ``getDB`` / ``getColumns``
    :param conn: existing connection to re-use; when falsy, a connection is
        opened here and closed again before returning
    :returns: None
    """
    exampleNameForDB = exampleName.replace("_", "")
    # hashlib needs bytes; encode text so this also works where ``str`` is
    # unicode, while byte strings pass through unchanged.
    if isinstance(exampleNameForDB, bytes):
        name_bytes = exampleNameForDB
    else:
        name_bytes = exampleNameForDB.encode("utf-8")
    tableName = TBLPFX + hashlib.md5(name_bytes).hexdigest()

    leaveconn = True
    if not conn:
        leaveconn = False
        conn = getDB(connType)

    try:
        allColumns = getColumns(conn, connType, tableName, exampleNameForDB)
        if not allColumns:
            # the early return used to leak a connection opened above;
            # the ``finally`` below now closes it
            return

        selected = [
            singleColumn[0] for singleColumn in allColumns
            if "act_made_call" not in singleColumn[0] and "act_unlock" not in singleColumn[0]
        ]
        notNullColumn = allColumns[len(allColumns) - 3]  # the last data column (hopefully)
        #cursor.execute("SELECT data.* FROM {} data, cvpr2012complete tally WHERE data.name = tally.name AND data.stamp = tally.stamp and tally.hash = %s".format(tableName), m.hexdigest())
        #cursor.execute("SELECT * FROM {} WHERE {} IS NOT NULL".format(tableName, notNullColumn[0]))
        # NOTE(review): identifiers are concatenated into the statement;
        # they come from the hash and the column listing, not from callers.
        sqlStatement = ("SELECT " + ", ".join(selected) + " FROM " + tableName +
                        " WHERE " + notNullColumn[0] + " IS NOT NULL")

        cursor = conn.cursor()
        try:
            cursor.execute(sqlStatement)
            if not globalDryRun:
                csv_filename = kResultStorageFolder + exampleName + ".csv"
                print(" as {}".format(csv_filename))
                # explicit context manager instead of relying on ``del`` to
                # close the file
                with open(csv_filename, "wt") as csv_file:
                    csv_writer = csv.writer(csv_file)
                    csv_writer.writerow([i[0] for i in cursor.description])  # write headers
                    csv_writer.writerows(cursor)
        finally:
            cursor.close()
    finally:
        if not leaveconn:
            conn.close()

#def getAllExamplesFromDB()
# fileWithExampleNames = 'testingCutPoints.txt'
# f = open(fileWithExampleNames, 'r')
# for line in f:
# line = line.split(
def __init__(self, fn, **kwargs):
    """Open *fn* for writing and attach a csv writer built from ``self.defaults``.

    NOTE(review): ``kwargs`` is accepted but not used here.
    """
    stream = _csv_open(fn, 'w')
    self.stream = stream
    self.writer = csv.writer(stream, **self.defaults)
def writerow(self, row):
    """Write *row* through the wrapped csv writer.

    On Python 2 text items are UTF-8 encoded first; on Python 3 the row is
    passed through unchanged.
    """
    if sys.version_info[0] < 3:
        row = [
            item.encode('utf-8') if isinstance(item, text_type) else item
            for item in row
        ]
    self.writer.writerow(row)

#
# Configurator functionality
#
def write_record(self, bdist_dir, distinfo_dir):
    """Write the RECORD file listing every file under *bdist_dir*.

    Each row is (relative path with ``/`` separators, ``sha256=`` digest,
    size in bytes). The RECORD file itself is listed with empty hash and
    size fields.
    """
    from wheel.util import urlsafe_b64encode

    record_path = os.path.join(distinfo_dir, 'RECORD')
    record_relpath = os.path.relpath(record_path, bdist_dir)

    with open_for_csv(record_path, 'w+') as record_file:
        writer = csv.writer(record_file)
        for root, subdirs, filenames in os.walk(bdist_dir):
            # sorting in place makes os.walk descend in sorted order
            subdirs.sort()
            for filename in sorted(filenames):
                full_path = os.path.join(root, filename)
                relpath = os.path.relpath(full_path, bdist_dir)
                if relpath == record_relpath:
                    # RECORD is listed without hash and size
                    digest_field = ''
                    size = ''
                else:
                    with open(full_path, 'rb') as f:
                        data = f.read()
                    digest = hashlib.sha256(data).digest()
                    digest_field = 'sha256=' + native(urlsafe_b64encode(digest))
                    size = len(data)
                writer.writerow((relpath.replace(os.path.sep, '/'), digest_field, size))
def write_to_csv(stock_data, name):
    """
    Write stock records to ``<args.path>/<name>.csv`` with a header row.

    params:
        - stock_data(list) : list of dict objects containing stock data
        - name(str) : output file name specified by `-output` param.
    """
    target = path.join(args.path, name + '.csv')
    with open(target, 'w') as the_file:
        writer = csv.DictWriter(
            the_file,
            fieldnames=['date', 'open', 'high', 'low', 'close', 'volume'],
        )
        writer.writeheader()
        writer.writerows(stock_data)
def oldmain():
    """Export the data twice: as a sorted CSV and as an indented JSON file.

    Both file names embed the date returned by the respective export helper.
    """
    import json

    output_rows, DATE = export_impl()
    csv_name = 'RRID-data-%s.csv' % DATE
    with open(csv_name, 'wt') as csv_file:
        csv.writer(csv_file, lineterminator='\n').writerows(sorted(output_rows))

    output_json, DATE = export_json_impl()
    json_name = 'RRID-data-%s.json' % DATE
    with open(json_name, 'wt') as json_file:
        json.dump(output_json, json_file, sort_keys=True, indent=4)
def export(request):
    """Return the sorted export rows as a gzip-compressed CSV response."""
    print('starting csv export')
    output_rows, DATE = export_impl()

    data = StringIO()
    csv.writer(data).writerows(sorted(output_rows))
    compressed = gzip.compress(data.getvalue().encode())

    r = Response(compressed)
    r.content_type = 'text/csv'
    extra_headers = {
        'Content-Disposition': 'attachment;filename = RRID-data-%s.csv' % DATE,
        'Content-Encoding': 'gzip',
    }
    r.headers.update(extra_headers)
    return r
def csv_writer(filename):
    """Generator yielding a csv writer for *filename*.

    The file is opened when the generator is first advanced and closed when
    the generator finishes or is closed.
    """
    handle = open(filename, 'w')
    try:
        yield csv.writer(handle)
    finally:
        handle.close()
def event_to_csv(request, *args, **kwargs):
    """Return the participants of an event as a CSV attachment.

    The event is looked up by the ``pk`` keyword argument; only the event's
    admin may download the list, anyone else gets a forbidden response.
    """
    event = Event.objects.get(pk=kwargs.get('pk', None))
    if event.event_admin.id != request.user.id:
        return HttpResponseForbidden()

    participants = EventParticipant.objects.all().filter(event=event)

    response = HttpResponse(content_type='text/csv')
    response['Content-Disposition'] = f'attachment; filename="{ event.id }.csv"'

    # The header names double as the participant attribute names.
    columns = [
        'first_name', 'last_name', 'email', 'street_one', 'street_two',
        'city', 'state', 'zip_code', 'telephone_number',
    ]
    writer = csv.writer(response)
    writer.writerow(columns)
    for part in participants:
        writer.writerow([getattr(part, column) for column in columns])
    return response
def __init__(self, f, dialect=csv.excel, encoding="utf-8-sig", **kwds):
    """Wrap stream *f* with a csv writer that re-encodes its output.

    Rows are written into an in-memory queue and later re-encoded with an
    incremental encoder for *encoding*.
    """
    self.stream = f
    # the csv module writes into this queue, not into ``f`` directly
    queue = cStringIO.StringIO()
    self.queue = queue
    self.writer = csv.writer(queue, dialect=dialect, **kwds)
    encoder_factory = codecs.getincrementalencoder(encoding)
    self.encoder = encoder_factory()
def writerow(self, row):
    '''writerow(unicode) -> None

    Encode each cell of *row* as UTF-8, write the row to the intermediate
    queue, then re-encode the queued text into the target encoding and
    write it to the output stream.
    '''
    utf8_cells = [cell.encode("utf-8") for cell in row]
    self.writer.writerow(utf8_cells)

    queued_text = self.queue.getvalue().decode("utf-8")
    self.stream.write(self.encoder.encode(queued_text))

    # reset the queue for the next row
    self.queue.truncate(0)
def write_record(self, bdist_dir, distinfo_dir):
    """Write the RECORD file listing every file under *bdist_dir*.

    Each row is (relative path with ``/`` separators, ``sha256=`` digest,
    size in bytes); the RECORD file itself gets empty hash and size fields.
    """
    from .util import urlsafe_b64encode

    record_path = os.path.join(distinfo_dir, 'RECORD')
    record_relpath = os.path.relpath(record_path, bdist_dir)

    def iter_files():
        # directories and files are visited in sorted order
        for current_dir, subdirs, names in os.walk(bdist_dir):
            subdirs.sort()
            for name in sorted(names):
                yield os.path.join(current_dir, name)

    def hash_and_size(file_path):
        """Return the RECORD hash field and the byte size of *file_path*."""
        with open(file_path, 'rb') as f:
            data = f.read()
        digest = hashlib.sha256(data).digest()
        return 'sha256=' + native(urlsafe_b64encode(digest)), len(data)

    with open_for_csv(record_path, 'w+') as record_file:
        writer = csv.writer(record_file)
        for file_path in iter_files():
            relpath = os.path.relpath(file_path, bdist_dir)
            # every file is hashed except RECORD itself
            if relpath == record_relpath:
                hash_field, size = '', ''
            else:
                hash_field, size = hash_and_size(file_path)
            writer.writerow((relpath.replace(os.path.sep, '/'), hash_field, size))
def exportNoteToFile():
    """Write the module-level ``notes`` rows to ``saves/notes.csv``; returns ""."""
    rows = notes
    with open('saves/notes.csv', 'w') as notes_file:
        csv.writer(notes_file).writerows(rows)
    return ""
def saveTasks():
    """Write the module-level ``events`` rows to ``saves/task.csv``."""
    rows = events
    with open('saves/task.csv', 'w', newline = '') as task_file:
        csv.writer(task_file).writerows(rows)
    # the with-block closes the file on exit
def csv(self, output):
    """Output data as excel-compatible CSV"""
    # Imported locally under another name: this method is itself called
    # ``csv``, so the module is not reachable by that name here.
    import csv as csv_module
    csv_module.writer(self.outfile).writerows(output)