The following 50 code examples, extracted from open-source Python projects, illustrate how to use csv.QUOTE_NONNUMERIC.
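Before the project examples, here is a minimal, self-contained sketch (standard library only) of the behavior the snippets below rely on: with quoting=csv.QUOTE_NONNUMERIC, the writer quotes every non-numeric field and leaves numbers bare, while the reader converts every unquoted field to float.

import csv
import io

# Writer side: strings are quoted, int/float values are written bare.
buf = io.StringIO()
writer = csv.writer(buf, quoting=csv.QUOTE_NONNUMERIC)
writer.writerow(['label', 3, 4.5])
print(buf.getvalue())   # '"label",3,4.5\r\n'

# Reader side: every unquoted field comes back as a float, so reading
# a bare non-numeric field raises ValueError.
buf.seek(0)
reader = csv.reader(buf, quoting=csv.QUOTE_NONNUMERIC)
print(next(reader))     # ['label', 3.0, 4.5]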
def _writeToCSV(self):
    '''
    INFO
    ----
    Writes a 2-dimensional list to a CSV text file, comma-delimiting
    values. If there is no data, then there is no attempt to create a file.

    RETURNS
    -------
    None
    '''
    if self._dataAsList:
        with open(self._filePathAndName, 'w') as csvFile:
            writer = csv.writer(csvFile,
                                lineterminator='\n',
                                quoting=csv.QUOTE_NONNUMERIC)
            writer.writerows(self._dataAsList)
def clear(filename):
    rows = []
    with open(filename, 'rb') as csvfile:
        reader = csv.reader(csvfile, delimiter=';')
        rows = [row for row in reader]
    with open(filename, 'wb') as csvfile:
        writer = csv.writer(csvfile, delimiter=';', quotechar='"',
                            skipinitialspace=True,
                            quoting=csv.QUOTE_NONNUMERIC)  # QUOTE_MINIMAL vs QUOTE_NONNUMERIC
        for row in rows:
            if not row[0].startswith("#"):
                # strip stray spaces from the third field
                row[2] = row[2].strip(' ')
            writer.writerow(row)
def get_csv_stats(stats, recorded_cols=None):
    """
    Create a CSV buffer from the stats DataFrame.

    Parameters
    ----------
    stats: list[Object]
    recorded_cols: list[str]

    Returns
    -------
    bytes
    """
    df, columns = prepare_stats(stats, recorded_cols=recorded_cols)
    return df.to_csv(
        None,
        columns=columns,
        # encoding='utf-8',
        quoting=csv.QUOTE_NONNUMERIC
    ).encode()
def getFeatures(self):
    csv_files = self.experiment.getFeaturesFilesFullpaths()
    features_names = []
    features = None
    for csv_file in csv_files:
        with open(csv_file, 'r') as f:
            header = f.readline().strip('\n').split(',')[1:]
            features_names += header
            current_features = list(list(rec) for rec in
                                    csv.reader(f, quoting=csv.QUOTE_NONNUMERIC))
        if features is None:
            features = [l[1:] for l in current_features]
        else:
            features = [f1 + f2[1:] for f1, f2 in zip(features, current_features)]
    features = np.array(features)
    return features, features_names
def query_to_csv(cursor, cols, outname):
    """
    Turns the query into a CSV file for the GPA calculation.

    :param cursor: the database cursor
    :type cursor: DictCursor
    :param cols: the comma-separated header names
    :type cols: str
    :param outname: the CSV output filename
    :type outname: str
    """
    logger.info("Generating CSV: {0}".format(outname))
    with open(outname, 'w') as outfile:
        writer = csv.writer(outfile, quoting=csv.QUOTE_NONNUMERIC)
        writer.writerow(cols.split(","))
        for row in cursor.fetchall():
            writer.writerow(row)
        outfile.flush()
    logger.info("Generated CSV ({0}) exists: {1}".format(outname, os.path.isfile(outname)))
def writerow(self, row):
    def _check_as_is(x):
        return (self.quoting == csv.QUOTE_NONNUMERIC and
                is_number(x)) or isinstance(x, str)

    row = [x if _check_as_is(x)
           else pprint_thing(x).encode("utf-8") for x in row]
    self.writer.writerow([s for s in row])

    # Fetch UTF-8 output from the queue ...
    data = self.queue.getvalue()
    data = data.decode("utf-8")
    # ... and re-encode it into the target encoding
    data = self.encoder.encode(data)
    # write to the target stream
    self.stream.write(data)
    # empty queue
    self.queue.truncate(0)
def writerows(self, rows):
    def _check_as_is(x):
        return (self.quoting == csv.QUOTE_NONNUMERIC and
                is_number(x)) or isinstance(x, str)

    for i, row in enumerate(rows):
        rows[i] = [x if _check_as_is(x)
                   else pprint_thing(x).encode("utf-8") for x in row]
    self.writer.writerows([[s for s in row] for row in rows])

    # Fetch UTF-8 output from the queue ...
    data = self.queue.getvalue()
    data = data.decode("utf-8")
    # ... and re-encode it into the target encoding
    data = self.encoder.encode(data)
    # write to the target stream
    self.stream.write(data)
    # empty queue
    self.queue.truncate(0)
def save_csv(matrix, output_directory, output_file_name):
    """
    Saves the input matrix as a CSV file.

    Args:
        matrix(list): an array containing data to be saved
        output_directory(str): location to save the file
        output_file_name(str): name of the csv file to be saved

    Returns:
        None
    """
    if config.DEBUGGER:
        print "Generating", (output_file_name + ".csv")
    check_if_dir_exists(output_directory)  # create output directory if it doesn't exist
    output_file = output_directory + "/" + output_file_name + ".csv"
    with open(output_file, 'wb') as myfile:
        wr = csv.writer(myfile, quoting=csv.QUOTE_NONNUMERIC)
        if matrix is not None:
            for col in matrix:
                wr.writerow(col)
def writecsv(estimates, filepath, append=False):
    """
    Write a CSV file of estimates.
    If append is True, I do NOT overwrite an existing file, but append to it!
    """
    if append:
        f = open(filepath, "a")
    else:
        f = open(filepath, "w")
    writer = csv.writer(f, delimiter=',', quotechar='"',
                        quoting=csv.QUOTE_NONNUMERIC)
    for est in estimates:
        #est.td = "%.5f" % (est.td)
        #est.tderr = "%.5f" % (est.tderr)
        #est.ms = "%.5f" % (est.ms)
        #est.timetaken = "%.5f" % (est.timetaken)
        writer.writerow(est.aslist())
    f.close()
    print "Wrote %i estimates into %s" % (len(estimates), filepath)
def build_graph_for_file(file_path, dir_name, name):
    data = open(file_path, 'r')
    G = nx.DiGraph()
    rows = csv.reader(data, quoting=csv.QUOTE_NONNUMERIC)
    next(rows)  # skip the header
    for row in rows:
        row_fil = list(filter(lambda x: type(x) is float, row))
        if G.has_node(row_fil[0]) is not True:
            G.add_node(row_fil[0], market_id=row_fil[1])
        if G.has_node(row_fil[2]) is not True:
            G.add_node(row_fil[2], market_id=row_fil[3])
        if G.has_edge(row_fil[0], row_fil[2]):
            old = G.get_edge_data(row_fil[0], row_fil[2])
            G.add_edge(row_fil[0], row_fil[2],
                       num_of_people=old['num_of_people'] + row_fil[4],
                       total_price=old['total_price'] + row_fil[5])
        else:
            G.add_edge(row_fil[0], row_fil[2],
                       num_of_people=row_fil[4],
                       total_price=row_fil[5])
    output_file_path = ('graphs/' + name + '.gexf')
    nx.write_gexf(G, output_file_path)
def render(self, form_data: dict):
    output = io.StringIO()
    writer = csv.writer(output, quoting=csv.QUOTE_NONNUMERIC, delimiter=",")
    tz = pytz.timezone(self.event.settings.timezone)

    headers = [
        _('Order code'), _('Order date'), _('Invoices'),
        _('SEPA export date'), _('Payment amount')
    ]
    writer.writerow(headers)

    qs = SepaExportOrder.objects.filter(export__event=self.event).order_by(
        'export__datetime'
    ).select_related('export', 'order').prefetch_related('order__invoices')
    for seo in qs:
        row = [
            seo.order.code,
            seo.order.datetime.astimezone(tz).strftime('%Y-%m-%d'),
            ', '.join([i.number for i in seo.order.invoices.all()]),
            seo.export.datetime.astimezone(tz).strftime('%Y-%m-%d %H:%M:%S'),
            seo.amount,
        ]
        writer.writerow(row)

    return 'sepaexports.csv', 'text/csv', output.getvalue().encode("utf-8")
def _build_file(self):
    self.total_rows = 0

    # Get recent tweets from dril and add them to the new file
    for status in tweepy.Cursor(api.user_timeline, 'dril', since_id=self.since).items():
        self.total_rows += self._process_status(status)

    # Put the content of the old file in the new file
    # This is kind of messy
    try:
        # Open things for reading and writing
        readFile = open('data/dril.csv', 'rt', encoding='utf-8')
        writeFile = open('data/new.csv', 'at', encoding='utf-8')
        read = reader(readFile)
        write = writer(writeFile, delimiter=',', quoting=QUOTE_NONNUMERIC)

        for row in read:
            write.writerow([int(row[0]), row[1]])
            self.total_rows += 1
    except IOError:
        print('Failed to open file (1) [okay if this is the first time running]')

    # Rename the new file to be the old file
    os.rename('data/new.csv', 'data/dril.csv')
def make_csv(edata, verbose=None):
    fieldnames = [
        "id", 'doc_number', 'doc_date', 'doc_v_date', "trans_date",
        "amount", "payer_edrpou", "payer_name", 'payer_account',
        "payer_mfo", "payer_bank", "recipt_edrpou", "recipt_name",
        'recipt_account', "recipt_mfo", "recipt_bank", "region_id",
        'doc_add_attr', "payment_details",
    ]
    try:
        with open('edata.csv', 'w') as csvfile:
            writer = csv.DictWriter(
                csvfile,
                fieldnames=fieldnames,
                delimiter=';',
                quoting=csv.QUOTE_NONNUMERIC
            )
            writer.writeheader()
            for row in edata:
                writer.writerow(row)
        if verbose:
            sys.stdout.write("{} records written\n".format(len(edata)))
    except:
        raise
def to_csv(self, fileobj, header=False, totals=False, delimiter=';',
           quotechar='"', quoting=csv.QUOTE_NONNUMERIC, escapechar='',
           extra_rows=None, **kwargs):
    writer = csv.writer(fileobj, delimiter=str(delimiter),
                        quotechar=str(quotechar), quoting=quoting,
                        escapechar=str(escapechar), **kwargs)
    if extra_rows is not None:
        writer.writerows(extra_rows)
    if header:
        writer.writerow([name.encode(settings.DEFAULT_CHARSET)
                         for name, _ in self.get_fields()])
    for record in self.iter_results():
        writer.writerow([elem.encode(settings.DEFAULT_CHARSET)
                         if isinstance(elem, unicode) else elem
                         for elem in record])
    if totals and self.get_has_totals():
        writer.writerow(self.totals)
def run(self):
    output = open(self.output, 'w', newline="\n", encoding="utf-8")
    threads = requests.get(self.uri).json()
    if 'error' in threads:
        print(threads)
        return
    fieldnames = ['from_id', 'from', 'time', 'message',
                  'attachments', 'shares', 'url']
    self.writer = csv.DictWriter(output, dialect='excel',
                                 fieldnames=fieldnames,
                                 extrasaction='ignore',
                                 quoting=csv.QUOTE_NONNUMERIC)
    self.writer.writerow(dict((n, n) for n in fieldnames))
    self.scrape_thread_list(threads, 5)
    output.close()
def __init__(self, outputFile):
    self.writer = csv.writer(
        outputFile,
        quoting=csv.QUOTE_NONNUMERIC,
    )
    self.group_name = ''
def insert(self, table, columns, types, values, primary_key_index=[],
           is_orreplace=False, is_commit=True):
    """
    Insert into the table.

    :param table: Table name
    :param columns: Column array
    :param types: Type array
    :param values: Value array
    :param primary_key_index: An array of indices of primary keys in columns,
                              e.g. [0] means the first column is the primary key
    :param is_orreplace: Indicate if the query is "INSERT OR REPLACE"
    """
    ret = True
    file_path = os.path.join(self.file_directory, table + ".csv")
    if len(columns) != len(values):
        return False
    self.lock.acquire()
    if not os.path.isfile(file_path):
        ret = False
    else:
        with open(file_path, "a+") as csvfile:
            writer = csv.writer(csvfile, lineterminator='\n',
                                quotechar='\"',
                                quoting=csv.QUOTE_NONNUMERIC)
            writer.writerow(values)
    self.lock.release()
    if not ret:
        raise Exception("File (%s) has not been created." % file_path)
    return True
def csvWrite(filePath, data):
    with open(filePath, 'a') as fd:
        writer = csv.writer(fd, delimiter=',', quoting=csv.QUOTE_NONNUMERIC)
        writer.writerow(data)
def get_csv(self):
    csvout = StringIO()
    csvwriter = csv.writer(csvout, dialect='excel',
                           quoting=csv.QUOTE_NONNUMERIC)
    csvwriter.writerow(self.column_names)
    for row in self.rows:
        row_formatted = []
        for cell in row:
            if isinstance(cell, datetime.datetime):
                cell = cell.strftime('%Y-%m-%d %H:%M')
            elif isinstance(cell, (int, long)):
                cell = str(cell)
            elif isinstance(cell, (list, tuple)):
                cell = str(cell)
            elif cell is None:
                cell = ''
            else:
                cell = cell.encode('utf8')
            row_formatted.append(cell)
        try:
            csvwriter.writerow(row_formatted)
        except Exception, e:
            raise Exception("%s: %s, %s" % (e, row, row_formatted))
    csvout.seek(0)
    return csvout.read()
def test_write_quoting(self):
    self._write_test(['a', 1, 'p,q'], 'a,1,"p,q"')
    self.assertRaises(csv.Error,
                      self._write_test,
                      ['a', 1, 'p,q'], 'a,1,p,q',
                      quoting=csv.QUOTE_NONE)
    self._write_test(['a', 1, 'p,q'], 'a,1,"p,q"',
                     quoting=csv.QUOTE_MINIMAL)
    self._write_test(['a', 1, 'p,q'], '"a",1,"p,q"',
                     quoting=csv.QUOTE_NONNUMERIC)
    self._write_test(['a', 1, 'p,q'], '"a","1","p,q"',
                     quoting=csv.QUOTE_ALL)
    self._write_test(['a\nb', 1], '"a\nb","1"',
                     quoting=csv.QUOTE_ALL)
def test_read_quoting(self):
    self._read_test(['1,",3,",5'], [['1', ',3,', '5']])
    self._read_test(['1,",3,",5'], [['1', '"', '3', '"', '5']],
                    quotechar=None, escapechar='\\')
    self._read_test(['1,",3,",5'], [['1', '"', '3', '"', '5']],
                    quoting=csv.QUOTE_NONE, escapechar='\\')
    # will this fail where locale uses comma for decimals?
    self._read_test([',3,"5",7.3, 9'], [['', 3, '5', 7.3, 9]],
                    quoting=csv.QUOTE_NONNUMERIC)
    self._read_test(['"a\nb", 7'], [['a\nb', ' 7']])
    self.assertRaises(ValueError, self._read_test,
                      ['abc,3'], [[]],
                      quoting=csv.QUOTE_NONNUMERIC)
def test_write_quoting(self):
    self._write_test(['a', 1, 'p,q'], 'a,1,"p,q"')
    self._write_error_test(csv.Error, ['a', 1, 'p,q'],
                           quoting=csv.QUOTE_NONE)
    self._write_test(['a', 1, 'p,q'], 'a,1,"p,q"',
                     quoting=csv.QUOTE_MINIMAL)
    self._write_test(['a', 1, 'p,q'], '"a",1,"p,q"',
                     quoting=csv.QUOTE_NONNUMERIC)
    self._write_test(['a', 1, 'p,q'], '"a","1","p,q"',
                     quoting=csv.QUOTE_ALL)
    self._write_test(['a\nb', 1], '"a\nb","1"',
                     quoting=csv.QUOTE_ALL)
def test_write_float(self):
    # Issue 13573: loss of precision because csv.writer
    # uses str() for floats instead of repr()
    orig_row = [1.234567890123, 1.0/7.0, 'abc']
    f = StringIO()
    c = csv.writer(f, quoting=csv.QUOTE_NONNUMERIC)
    c.writerow(orig_row)
    f.seek(0)
    c = csv.reader(f, quoting=csv.QUOTE_NONNUMERIC)
    new_row = next(c)
    self.assertEqual(orig_row, new_row)
def run_experiment(experiment):
    '''
    This function is the main function.
    It takes a string 'experiment' that drives the naming of the files:
    - it generates the filenames for the datasources, schemas, and recipes
    - it sends a randomized version of the data to an S3 bucket
    - it creates the train, valid, and test datasources
    - it creates the model
    - it creates the evaluation
    '''
    # start by generating all the filenames for the current experiment
    filenames = generate_filenames(experiment)
    # load the data from local, shuffle, and save back to the original file
    df = pd.read_csv(filepath + filenames['main'])
    df = df.reindex(np.random.permutation(df.index))
    df.to_csv(filepath + filenames['main'],
              quoting=csv.QUOTE_NONNUMERIC, index=False)
    # send the original file to S3
    os.system("aws s3 cp %s%s %s " % (filepath, filenames['main'], s3_path))
    # write cli JSON
    create_dsrc("train", 0, 60)
    create_dsrc("valid", 60, 80)
    create_dsrc("test", 80, 100)
    create_model()
    create_eval("valid")
    create_eval("test")

# ----------------------------------------------------------------------------
# datasource, model, evaluation and batch prediction functions
# These functions all do the same thing:
# 1. write the JSON parameters to a JSON-formatted file in the json_path folder
# 2. execute the AWS CLI command that will create the object: datasource, model, ...
# ----------------------------------------------------------------------------
def _quotestr_to_constants(_str):
    if _str is None:
        return csv.QUOTE_NONE
    elif _str.upper() == "MINIMAL":
        return csv.QUOTE_MINIMAL
    elif _str.upper() == "ALL":
        return csv.QUOTE_ALL
    elif _str.upper() == "NONNUMERIC":
        return csv.QUOTE_NONNUMERIC
    else:
        raise Exception("Error in _quotestr_to_constants: " +
                        str(_str) + " is an invalid quotestr.")
def to_csv(df):
    data = df.to_csv(header=False, index=False, encoding='utf-8',
                     quoting=csv.QUOTE_NONNUMERIC, escapechar='\\')
    if PY3:
        return data.encode('utf-8')
    else:
        return data
def test_to_csv_quoting(self):
    df = DataFrame({'A': [1, 2, 3], 'B': ['foo', 'bar', 'baz']})

    buf = StringIO()
    df.to_csv(buf, index=False, quoting=csv.QUOTE_NONNUMERIC)

    result = buf.getvalue()
    expected = ('"A","B"\n'
                '1,"foo"\n'
                '2,"bar"\n'
                '3,"baz"\n')
    self.assertEqual(result, expected)

    # quoting windows line terminators, presents with encoding?
    # #3503
    text = 'a,b,c\n1,"test \r\n",3\n'
    df = pd.read_csv(StringIO(text))
    buf = StringIO()
    df.to_csv(buf, encoding='utf-8', index=False)
    self.assertEqual(buf.getvalue(), text)

    # testing if quoting parameter is passed through with multi-indexes
    # related to issue #7791
    df = pd.DataFrame({'a': [1, 2], 'b': [3, 4], 'c': [5, 6]})
    df = df.set_index(['a', 'b'])
    expected = '"a","b","c"\n"1","3","5"\n"2","4","6"\n'
    self.assertEqual(df.to_csv(quoting=csv.QUOTE_ALL), expected)
def test_to_csv_unicodewriter_quoting(self):
    df = DataFrame({'A': [1, 2, 3], 'B': ['foo', 'bar', 'baz']})

    buf = StringIO()
    df.to_csv(buf, index=False, quoting=csv.QUOTE_NONNUMERIC,
              encoding='utf-8')

    result = buf.getvalue()
    expected = ('"A","B"\n'
                '1,"foo"\n'
                '2,"bar"\n'
                '3,"baz"\n')
    self.assertEqual(result, expected)
def _stringify(s, encoding, errors):
    if s is None:
        return ''
    if isinstance(s, unicode):
        return s.encode(encoding, errors)
    elif isinstance(s, (int, float)):
        pass  # let csv.QUOTE_NONNUMERIC do its thing
    elif not isinstance(s, str):
        s = str(s)
    return s
def pandasWriteCallback(data, outputFile, **kw):
    """Write callback for Pandas data frames"""
    # import only when required
    global pandas
    import pandas  # pylint: disable=F0401
    if not isinstance(data, pandas.DataFrame):
        raise TypeError("pandas.DataFrame expected as first argument")
    data.to_csv(outputFile, header=False, index=False,
                quoting=csv.QUOTE_NONNUMERIC, **kw)
def __init__(self, logPath, logFileName, serialPort, fieldnames):
    self.logPath = logPath
    self.logFileName = logFileName
    # open the output file (mode 'w' truncates; the header is written fresh)
    self.out = open(os.path.join(logPath, logFileName), 'w')
    self.fieldnames = fieldnames
    self.writer = csv.DictWriter(self.out, fieldnames=fieldnames,
                                 dialect='excel',
                                 quoting=csv.QUOTE_NONNUMERIC)
    self.writer.writeheader()
    self.serialPort = serialPort
    self._connect_arduino()
def printFreqDistCSV(dist, filename=''):
    n_samples = len(dist.keys())
    n_repeating_samples = sum([1 for (k, v) in dist.items() if v > 1])
    n_outcomes = dist._N
    print '%-12s %-12s %-12s' % ('Samples', 'RepSamples', 'Outcomes')
    print '%12d %12d %12d' % (n_samples, n_repeating_samples, n_outcomes)
    if len(filename) > 0 and '_' != filename[0]:
        with open(filename, 'w') as fcsv:
            distwriter = csv.writer(fcsv, delimiter=',', quotechar='"',
                                    quoting=csv.QUOTE_NONNUMERIC)
            for (key, value) in dist.items():
                distwriter.writerow([key, value])
                #print key, '\t,\t', dist[key]
def oldStats2CSV(in_file, fileprefix=''):
    if fileprefix == '':
        # strip the literal '_stats.txt' suffix
        # (rstrip('_stats.txt') would strip a character set, not the suffix)
        fileprefix = in_file[:-len('_stats.txt')]
    fp = open(in_file, 'r')
    fq = open(fileprefix + '_statsnew.txt', 'w')
    line = ''
    line_start = 0
    line_count = 20
    line_end = line_start + line_count
    for line_num in range(line_start, line_end):
        # write statistics
        line = fp.readline()
        fq.write(line)
    for section in [1, 2, 3]:
        line_start = line_end
        line_count = 2
        line_end = line_start + line_count
        for line_num in range(line_start, line_end):
            line = fp.readline()
            fq.write(line)
        line_start = line_end
        line_count = [int(l) for l in line.split() if l.isdigit()][0]
        line_end = line_start + line_count
        fr = open(fileprefix + '_%dgrams.csv' % section, 'w')
        fwrt = csv.writer(fr, delimiter=',', quotechar='"',
                          quoting=csv.QUOTE_NONNUMERIC)
        for line_num in range(line_start, line_end):
            # write the n-grams for this section
            line = fp.readline()
            row = line.split('\t,\t')
            row[0] = row[0].strip()
            row[1] = int(row[1])
            fwrt.writerow(row)
        fr.close()
    fp.close()
    fq.close()
def write_sql_to_file(self, sql, bind, last_id_name=None, with_header=True,
                      delimiter=',', quotechar='"',
                      quoting=csv.QUOTE_NONNUMERIC):
    conn = self.connection
    cur = conn.cursor(pymysql.cursors.SSCursor)
    cur.execute(sql, bind)
    fields_name = []
    schemas = []
    count = 0
    last_id_column = None
    last_id = None
    sql_type = []
    description = cur.description
    for column in description:
        fields_name.append(column[0])
        sql_type.append(column[1])
        schemas.append(SchemaField(column[0], self.get_type(column[1])))
        if column[0] == last_id_name:
            last_id_column = count
        count += 1
    ofile = open(self.full_path_name(), 'wb')
    csv_writer = csv.writer(ofile, delimiter=delimiter,
                            quotechar=quotechar, quoting=quoting)
    if with_header:
        csv_writer.writerow(fields_name)
    while True:
        results = cur.fetchmany(size=1000)
        if len(results) != 0:
            for result in results:
                data = self.process_binary(result, sql_type, description)
                csv_writer.writerow(data)
                if last_id_column is not None:
                    last_id = result[last_id_column]
        else:
            break
    cur.close()
    ofile.close()
    return schemas, last_id
def _merge_data(appId, ids, group_by, ext):
    eStatAPI._['appId'] = appId
    aggregate = request.args.get('aggregate') if request.args.get(
        'aggregate') is not None else ''
    data = eStatAPI.merge_data(ids, group_by, aggregate)
    eStatAPI.path['tmp_merge'] = eStatAPI.path['tmp'] + '.'.join(
        [eStatAPI._['appId'],
         ''.join([l for l in random.choice(eStatAPI.random_str)]),
         'csv'])
    data.to_csv(eStatAPI.path['tmp_merge'],
                quoting=csv.QUOTE_NONNUMERIC, index=None)
    tmp_csv = eStatAPI.cmd_line(eStatAPI.build_cmd(
        ['cat', eStatAPI.path['tmp_merge']]))
    eStatAPI.cmd_line(eStatAPI.build_cmd(['rm', eStatAPI.path['tmp_merge']]))
    return eStatAPI.response(eStatAPI.get_output(tmp_csv, ext), ext)