The following 50 code examples, extracted from open source Python projects, illustrate how to use csv.excel().
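Before the examples, a quick orientation: csv.excel is the dialect class describing the default properties of Excel-generated CSV files (comma delimiter, double-quote quoting, \r\n line terminator), and it is what csv.reader and csv.writer use when no dialect is given. A minimal sketch of passing it explicitly; the file name people.csv is hypothetical:

import csv

# csv.excel is the default dialect, so passing it explicitly is
# equivalent to omitting the dialect argument entirely.
# "people.csv" is a hypothetical file name used for illustration.
with open("people.csv", "w", newline="") as f:
    writer = csv.writer(f, dialect=csv.excel)  # same as csv.writer(f)
    writer.writerow(["name", "age"])
    writer.writerow(["Ada", 36])

with open("people.csv", newline="") as f:
    for row in csv.reader(f, dialect=csv.excel):
        print(row)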
def setup_output_writers(parent_dir, fold_number):
    """ Create an output directory for the fold under the provided parent dir

    :param str parent_dir: file path to the output dir
    :param int fold_number: fold number to use in directory name
    :return: writer for <outdir.name>/fold<fold_number>/train.csv and
        <outdir.name>/fold<fold_number>/validation.csv
    :rtype: tuple(csv.writer,csv.writer)
    """
    output_dir = path.join(parent_dir, "Fold%d" % fold_number)
    if not path.isdir(output_dir):
        LOGGER.debug("Creating output for fold %d at the location: %s" % (fold_number, output_dir))
        makedirs(output_dir)
    else:
        LOGGER.warn("Path <<%s>> already exists, files may be overwritten" % output_dir)

    train_writer = csv.writer(smart_file_open(path.join(output_dir, TRAIN_RELEVANCE_FILENAME), 'w'),
                              dialect=csv.excel, delimiter=',')
    validation_writer = csv.writer(smart_file_open(path.join(output_dir, VALIDATION_RELEVANCE_FILENAME), 'w'),
                                   dialect=csv.excel, delimiter=',')
    return train_writer, validation_writer
def open_anything(source, format, ignoreheader, force_unbuffered=False):
    source = open_regular_or_compressed(source)

    if force_unbuffered:
        # simply disabling buffering is not enough, see this for details:
        # http://stackoverflow.com/a/6556862
        source = iter(source.readline, '')

    if format == 'vw':
        return source

    if format == 'tsv':
        reader = csv.reader(source, csv.excel_tab)
        if ignoreheader:
            reader.next()
    elif format == 'csv':
        reader = csv.reader(source, csv.excel)
        if ignoreheader:
            reader.next()
    else:
        raise ValueError('format not supported: %s' % format)

    return reader
def read_csv(handle):
    """ Read CSV file

    :param handle: File-like object of the CSV file
    :return: csv.reader object
    """
    # These functions are to handle unicode in Python 2 as described in:
    # https://docs.python.org/2/library/csv.html#examples
    def unicode_csv_reader(unicode_csv_data, dialect=csv.excel, **kwargs):
        """ csv.py doesn't do Unicode; encode temporarily as UTF-8."""
        csv_reader = csv.reader(utf_8_encoder(unicode_csv_data),
                                dialect=dialect, **kwargs)
        for row in csv_reader:
            # decode UTF-8 back to Unicode, cell by cell:
            yield [unicode(cell, 'utf-8') for cell in row]

    def utf_8_encoder(unicode_csv_data):
        """ Encode with UTF-8."""
        for line in unicode_csv_data:
            yield line.encode('utf-8')

    return unicode_csv_reader(handle) if PY2 else csv.reader(handle)
def test_space_dialect(self):
    class space(csv.excel):
        delimiter = " "
        quoting = csv.QUOTE_NONE
        escapechar = "\\"

    fd, name = tempfile.mkstemp()
    fileobj = os.fdopen(fd, "w+b")
    try:
        fileobj.write("abc def\nc1ccccc1 benzene\n")
        fileobj.seek(0)
        rdr = csv.reader(fileobj, dialect=space())
        self.assertEqual(rdr.next(), ["abc", "def"])
        self.assertEqual(rdr.next(), ["c1ccccc1", "benzene"])
    finally:
        fileobj.close()
        os.unlink(name)
def test_int_write(self):
    import array
    contents = [(20-i) for i in range(20)]
    a = array.array('i', contents)

    fd, name = tempfile.mkstemp()
    fileobj = os.fdopen(fd, "w+b")
    try:
        writer = csv.writer(fileobj, dialect="excel")
        writer.writerow(a)
        expected = ",".join([str(i) for i in a])+"\r\n"
        fileobj.seek(0)
        self.assertEqual(fileobj.read(), expected)
    finally:
        fileobj.close()
        os.unlink(name)
def test_dialect(self):
    data = """\
label1,label2,label3
index1,"a,c,e
index2,b,d,f
"""

    dia = csv.excel()
    dia.quoting = csv.QUOTE_NONE
    df = self.read_csv(StringIO(data), dialect=dia)

    data = '''\
label1,label2,label3
index1,a,c,e
index2,b,d,f
'''
    exp = self.read_csv(StringIO(data))
    exp.replace('a', '"a', inplace=True)
    tm.assert_frame_equal(df, exp)
def setup_train_and_test_writer(output_dir):
    """ Create an output directory for the fold under the provided parent dir

    :param str output_dir: file path to the output dir
    :return: writer for <outdir.name>/train.csv and <outdir.name>/validation.csv
    :rtype: tuple(csv.writer,csv.writer)
    """
    if not path.isdir(output_dir):
        makedirs(output_dir)
    else:
        LOGGER.warn("Path <<%s>> already exists, files may be overwritten" % output_dir)

    train_writer = csv.writer(smart_file_open(path.join(output_dir, TRAIN_RELEVANCE_FILENAME), 'w'),
                              dialect=csv.excel, delimiter=',')
    validation_writer = csv.writer(smart_file_open(path.join(output_dir, VALIDATION_RELEVANCE_FILENAME), 'w'),
                                   dialect=csv.excel, delimiter=',')
    return train_writer, validation_writer
def __init__(self, f, fieldnames=None, encoding=UTF8, **kwds):
    self.encoding = encoding
    try:
        self.reader = csv.reader(UTF8Recoder(f, encoding) if self.encoding != UTF8 else f,
                                 dialect=csv.excel, **kwds)
        if not fieldnames:
            self.fieldnames = self.reader.next()
            if len(self.fieldnames) > 0 and self.fieldnames[0].startswith(codecs.BOM_UTF8):
                self.fieldnames[0] = self.fieldnames[0].replace(codecs.BOM_UTF8, u'', 1)
        else:
            self.fieldnames = fieldnames
    except (csv.Error, StopIteration):
        self.fieldnames = []
    except LookupError as e:
        Cmd.Backup()
        usageErrorExit(e)
    self.numfields = len(self.fieldnames)
def export_csv(modeladmin, request, queryset):
    import csv
    from django.utils.encoding import smart_str
    response = HttpResponse(content_type='text/csv')
    response['Content-Disposition'] = 'attachment; filename=elehphant_sightings.csv'
    writer = csv.writer(response, csv.excel)
    response.write(u'\ufeff'.encode('utf8'))  # BOM (optional...Excel needs it to open UTF-8 file properly)
    writer.writerow([
        smart_str(u"ID"),
        smart_str(u"Reported At"),
        smart_str(u"Latitude"),
        smart_str(u"Longitude"),
        smart_str(u"Message"),
        smart_str(u"Informer"),
    ])
    for obj in queryset:
        writer.writerow([
            smart_str(obj.pk),
            smart_str(str(obj.created_at)),
            smart_str(obj.location),
            smart_str(obj.message),
            smart_str(obj.informer.name),
        ])
    return response
def test_dialect_apply(self):
    class testA(csv.excel):
        delimiter = "\t"
    class testB(csv.excel):
        delimiter = ":"
    class testC(csv.excel):
        delimiter = "|"
    class testUni(csv.excel):
        delimiter = "\u039B"

    csv.register_dialect('testC', testC)
    try:
        self.compare_dialect_123("1,2,3\r\n")
        self.compare_dialect_123("1\t2\t3\r\n", testA)
        self.compare_dialect_123("1:2:3\r\n", dialect=testB())
        self.compare_dialect_123("1|2|3\r\n", dialect='testC')
        self.compare_dialect_123("1;2;3\r\n", dialect=testA, delimiter=';')
        self.compare_dialect_123("1\u039B2\u039B3\r\n", dialect=testUni)
    finally:
        csv.unregister_dialect('testC')
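The compare_dialect_123 helper is not included in the snippet above. Judging from how it is called, it writes the row [1, 2, 3] with the supplied dialect arguments and compares the raw output against the expected string; a sketch along those lines (the body is an assumption, not shown in the source):

# Assumed shape of the helper used by the test above: write [1, 2, 3]
# with the supplied dialect arguments and compare the raw file contents.
def compare_dialect_123(self, expected, *writeargs, **kwwriteargs):
    with TemporaryFile("w+", newline='', encoding="utf-8") as fileobj:
        writer = csv.writer(fileobj, *writeargs, **kwwriteargs)
        writer.writerow([1, 2, 3])
        fileobj.seek(0)
        self.assertEqual(fileobj.read(), expected)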
def on_train_end(self, logs=None):
    REJECT_KEYS = {'has_validation_data'}
    row_dict = self.row_dict

    class CustomDialect(csv.excel):
        delimiter = self.sep
    self.keys = self.keys

    temp_file = NamedTemporaryFile(delete=False, mode='w')
    with open(self.file, 'r') as csv_file, temp_file:
        reader = csv.DictReader(csv_file,
                                fieldnames=['model'] + [k for k in self.keys if k not in REJECT_KEYS],
                                dialect=CustomDialect)
        writer = csv.DictWriter(temp_file,
                                fieldnames=['model'] + [k for k in self.keys if k not in REJECT_KEYS],
                                dialect=CustomDialect)
        for row_idx, row in enumerate(reader):
            if row_idx == 0:
                # re-write header with on_train_end's metrics
                pass
            if row['model'] == self.row_dict['model']:
                writer.writerow(row_dict)
            else:
                writer.writerow(row)
    shutil.move(temp_file.name, self.file)
def on_epoch_end(self, epoch, logs=None):
    logs = logs or {}

    def handle_value(k):
        is_zero_dim_ndarray = isinstance(k, np.ndarray) and k.ndim == 0
        if isinstance(k, Iterable) and not is_zero_dim_ndarray:
            return '"[%s]"' % (', '.join(map(str, k)))
        else:
            return k

    if not self.writer:
        self.keys = sorted(logs.keys())

        class CustomDialect(csv.excel):
            delimiter = self.sep

        self.writer = csv.DictWriter(self.csv_file,
                                     fieldnames=['epoch'] + self.keys,
                                     dialect=CustomDialect)
        if self.append_header:
            self.writer.writeheader()

    row_dict = OrderedDict({'epoch': epoch})
    row_dict.update((key, handle_value(logs[key])) for key in self.keys)
    self.writer.writerow(row_dict)
    self.csv_file.flush()
def write_csv(output_filename, dict_list, delimiter, verbose=False):
    """Write a CSV file
    """
    if not dict_list:
        if verbose:
            print('Not writing %s; no lines to write' % output_filename)
        return

    # Note: this assigns onto the csv.excel class itself, changing the
    # shared dialect for the whole process, not just this writer.
    dialect = csv.excel
    dialect.delimiter = delimiter

    with open(output_filename, 'w') as f:
        dict_writer = csv.DictWriter(f, fieldnames=dict_list[0].keys(), dialect=dialect)
        dict_writer.writeheader()
        dict_writer.writerows(dict_list)
    if verbose:
        print('Wrote %s' % output_filename)
def __init__(self, f, dialect=csv.excel, encoding="utf-8-sig", **kwds):
    f = UTF8Recoder(f, encoding)
    self.reader = csv.reader(f, dialect=dialect, **kwds)
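Several of the reader examples depend on a UTF8Recoder helper that is not shown here. In the recipe from the Python 2 csv documentation, it wraps an input stream and re-encodes every line to UTF-8 so the byte-oriented csv module can parse it; a sketch of that recipe:

import codecs

# Sketch of the UTF8Recoder helper assumed by the readers above,
# following the Python 2 csv documentation recipe: it re-encodes
# each line of the input stream from `encoding` to UTF-8.
class UTF8Recoder:
    def __init__(self, f, encoding):
        self.reader = codecs.getreader(encoding)(f)

    def __iter__(self):
        return self

    def next(self):  # Python 2 iterator protocol
        return self.reader.next().encode("utf-8")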
def __init__(self, f, dialect=csv.excel, encoding="utf-8-sig", **kwds):
    self.queue = cStringIO.StringIO()
    self.writer = csv.writer(self.queue, dialect=dialect, **kwds)
    self.stream = f
    self.encoder = codecs.getincrementalencoder(encoding)()
def __init__(self, filename, dialect=csv.excel, encoding="utf-8", **kw):
    self.filename = filename
    self.dialect = dialect
    self.encoding = encoding
    self.kw = kw
def process(options, args):
    ''' Save air quality data in a csv_file '''
    url = "http://www.seremisaludrm.cl/sitio/pag/aire/indexjs3aireindices-prueba.asp"
    sock = urllib.urlopen(url)
    htmlSource = sock.read()
    sock.close()
    csv_file = options.csv_file
    csv_exists = False

    # Append sensors data
    air_data = extract_data(htmlSource)
    encabezado = air_data.next()
    encabezado.append('DATE')
    encabezado.append('TIME')
    date = extract_date(htmlSource)
    time = extract_time(htmlSource)

    if os.path.exists(csv_file):
        f_in = open(csv_file, "rb")
        Reader = csv.reader(f_in, dialect=csv.excel)
        encabezado_old = Reader.next()
        csv_exists = True

    f_out = open(csv_file+'~', 'wb')
    Writer = csv.writer(f_out, dialect=csv.excel)
    # TODO check encabezado == encabezado_old
    Writer.writerow(encabezado)
    for row in air_data:
        row.append(date)
        row.append(time)
        Writer.writerow(row)
    if csv_exists:
        for row in Reader:
            Writer.writerow(row)
        f_in.close()
        os.remove(csv_file)
    f_out.close()
    os.rename(csv_file+'~', csv_file)
def unicode_csv_reader(unicode_csv_data, dialect=csv.excel, **kwargs):
    """Unicode CSV reader."""
    # This code is taken from http://docs.python.org/library/csv.html#examples
    # csv.py doesn't do Unicode; encode temporarily as UTF-8:
    csv_reader = csv.reader(utf_8_encoder(unicode_csv_data),
                            dialect=dialect, **kwargs)
    for row in csv_reader:
        # decode UTF-8 back to Unicode, cell by cell:
        yield [unicode(cell, 'utf-8') for cell in row]
def __init__(self, f, dialect=csv.excel, encoding="utf-8", **kwds):
    """Init method."""
    # Redirect output to a queue
    self.queue = cStringIO.StringIO()
    self.writer = csv.writer(self.queue, dialect=dialect, **kwds)
    self.stream = f
    self.encoder = codecs.getincrementalencoder(encoding)()
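The writer __init__ methods above are only the first half of the classic Python 2 UnicodeWriter recipe. The companion writerow encodes each cell to UTF-8, lets csv.writer handle the quoting inside the in-memory queue, then re-encodes the result to the target encoding and writes it to the real stream; a sketch of that missing half, per the csv documentation recipe:

# Sketch of the writerow/writerows half of the Python 2 UnicodeWriter
# recipe that the __init__ methods above belong to.
def writerow(self, row):
    # Write the UTF-8 encoded row into the in-memory queue ...
    self.writer.writerow([s.encode("utf-8") for s in row])
    # ... read the csv-quoted UTF-8 data back out ...
    data = self.queue.getvalue()
    data = data.decode("utf-8")
    # ... re-encode it into the target encoding and write to the stream.
    data = self.encoder.encode(data)
    self.stream.write(data)
    # Empty the queue for the next row.
    self.queue.truncate(0)

def writerows(self, rows):
    for row in rows:
        self.writerow(row)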
def csv_reader_converter(utf8_data, dialect=csv.excel, **kwargs):
    csv_reader = csv.reader(utf8_data, dialect=dialect, **kwargs)
    for row in csv_reader:
        yield [unicode(cell, 'latin-1') for cell in row]
def __init__(self, f, dialect=csv.excel, **kwds):
    f = UTF8Recoder(f)
    self.reader = csv.reader(f, dialect=dialect, **kwds)
def __init__(self, f, dialect=csv.excel, **kwds):
    self.queue = io.BytesIO()
    self.writer = csv.writer(self.queue, dialect=dialect, **kwds)
    self.stream = f
def handle(self, *args, **options):
    tmpdir = tempfile.mkdtemp()
    try:
        queryset = AnalysisJob.objects.all().filter(status=AnalysisJob.Status.COMPLETE)
        filter_set = AnalysisJobFilterSet()
        queryset = filter_set.filter_latest(queryset, 'latest', True)

        tmp_csv_filename = os.path.join(tmpdir, 'results.csv')
        with open(tmp_csv_filename, 'w') as csv_file:
            writer = None
            fieldnames = []
            for job in queryset:
                row_data = {}
                for export in EXPORTS:
                    columns, values = export(job)
                    if writer is None:
                        fieldnames = fieldnames + columns
                    for column, value in zip(columns, values):
                        row_data[column] = value
                if writer is None:
                    writer = csv.DictWriter(csv_file, fieldnames=fieldnames,
                                            dialect=csv.excel, quoting=csv.QUOTE_MINIMAL)
                    writer.writeheader()
                writer.writerow(row_data)

        s3_client = boto3.client('s3')
        now = datetime.utcnow()
        s3_key = 'analysis-spreadsheets/results-{}.csv'.format(now.strftime('%Y-%m-%dT%H%M'))
        s3_client.upload_file(tmp_csv_filename, settings.AWS_STORAGE_BUCKET_NAME, s3_key)
        logger.info('File uploaded to: s3://{}/{}'
                    .format(settings.AWS_STORAGE_BUCKET_NAME, s3_key))
    finally:
        shutil.rmtree(tmpdir)
def get_csv(self):
    # Call the URL, get the response, parse it strictly as CSV,
    # and return the list of dictionaries
    rsp = self.client.get(self.url)
    self.assertEqual(200, rsp.status_code)
    dialect = csv.excel()
    dialect.strict = True
    reader = csv.DictReader(StringIO(rsp.content), dialect=dialect)
    result = []
    for item in reader:
        for k, v in item.iteritems():
            item[k] = v.decode('utf-8')
        result.append(item)
    return result
def __init__(self, *args, **kwargs):
    super(RankerRelevanceFileQueryStream, self).__init__(*args, **kwargs)
    dialect = csv.excel
    # The following explicit assignments shadow the dialect defaults
    # but are necessary to avoid strange behavior while called by
    # certain unit tests. Please do not delete.
    dialect.doublequote = True
    dialect.quoting = csv.QUOTE_MINIMAL
    dialect.skipinitialspace = True
    self.__reader__ = csv.reader(self.query_file, dialect=dialect)
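Note that csv.excel here is the class itself, not an instance, so the assignments above change the shared dialect for every later csv call in the process. A less invasive variant, sketched with a hypothetical RelevanceDialect name, is to subclass instead:

import csv

# Subclassing keeps the overrides local instead of mutating the
# shared csv.excel class; RelevanceDialect is a hypothetical name.
class RelevanceDialect(csv.excel):
    doublequote = True
    quoting = csv.QUOTE_MINIMAL
    skipinitialspace = True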
def __init__(self, f, delimiter=',', dialect=csv.excel, encoding="utf-8"):
    self.reader = csv.DictReader(f, delimiter=delimiter, dialect=dialect)
def __init__(self, f, dialect=csv.excel, encoding="utf-8", **kwargs):
    """
    Instantiates the UnicodeWriter instance
    :param f: File like object to write CSV data to
    :param dialect: The dialect for the CSV
    :param encoding: The CSV encoding
    :param kwargs: Keyword args
    """
    self.writer = csv.writer(f)
def __init__(self, f, dialect=csv.excel, encoding="utf-8", **kwargs):
    """
    Instantiates the UnicodeWriter instance
    :param f: File like object to write CSV data to
    :param dialect: The dialect for the CSV
    :param encoding: The CSV encoding
    :param kwargs: Keyword args
    """
    self.queue = cStringIO.StringIO()
    self.writer = csv.writer(self.queue, dialect=dialect, **kwargs)
    self.stream = f
    self.encoder = codecs.getincrementalencoder(encoding)()
def __init__(self, f, fieldnames, dialect=csv.excel, encoding="utf-8", newfile=True, **kwds):
    # Redirect output to a queue
    self.queue = cStringIO.StringIO()
    self.writer = csv.DictWriter(self.queue, fieldnames, dialect=dialect, **kwds)
    self.stream = f
    self.encoder = codecs.getincrementalencoder(encoding)()
    if newfile:
        self.writebom()
def writebom(self):
    """Write BOM, so excel can identify this as UTF8"""
    self.stream.write(u'\ufeff'.encode('utf8'))
def __init__(self, f, dialect=csv.excel, encoding="utf-8", **kwds):
    # Redirect output to a queue
    self.queue = cStringIO.StringIO()
    self.writer = csv.writer(self.queue, dialect=dialect, **kwds)
    self.stream = f
    self.encoder = codecs.getincrementalencoder(encoding)()
def __init__(self, f, dialect=csv.excel, encoding='utf-8', **kwds):
    f = CSVRecoder(f, encoding)
    self.reader = csv.reader(f, dialect=dialect, **kwds)
def test_registry(self):
    class myexceltsv(csv.excel):
        delimiter = "\t"
    name = "myexceltsv"
    expected_dialects = csv.list_dialects() + [name]
    expected_dialects.sort()
    csv.register_dialect(name, myexceltsv)
    self.addCleanup(csv.unregister_dialect, name)
    self.assertEqual(csv.get_dialect(name).delimiter, '\t')
    got_dialects = sorted(csv.list_dialects())
    self.assertEqual(expected_dialects, got_dialects)
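Once registered as above, a dialect can be referred to by name anywhere a dialect argument is accepted. A minimal usage sketch, with data.tsv as a hypothetical tab-separated input file (the standard library already ships this particular case as csv.excel_tab under the name 'excel-tab'):

import csv

# Use a registered csv.excel subclass by name; "data.tsv" and
# "my-excel-tab" are hypothetical names used for illustration.
class ExcelTSV(csv.excel):
    delimiter = "\t"

csv.register_dialect("my-excel-tab", ExcelTSV)

with open("data.tsv", newline="") as f:
    for row in csv.reader(f, dialect="my-excel-tab"):
        print(row)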
def test_space_dialect(self):
    class space(csv.excel):
        delimiter = " "
        quoting = csv.QUOTE_NONE
        escapechar = "\\"

    with TemporaryFile("w+") as fileobj:
        fileobj.write("abc def\nc1ccccc1 benzene\n")
        fileobj.seek(0)
        reader = csv.reader(fileobj, dialect=space())
        self.assertEqual(next(reader), ["abc", "def"])
        self.assertEqual(next(reader), ["c1ccccc1", "benzene"])
def test_int_write(self):
    import array
    contents = [(20-i) for i in range(20)]
    a = array.array('i', contents)
    with TemporaryFile("w+", newline='') as fileobj:
        writer = csv.writer(fileobj, dialect="excel")
        writer.writerow(a)
        expected = ",".join([str(i) for i in a])+"\r\n"
        fileobj.seek(0)
        self.assertEqual(fileobj.read(), expected)
def test_double_write(self):
    import array
    contents = [(20-i)*0.1 for i in range(20)]
    a = array.array('d', contents)
    with TemporaryFile("w+", newline='') as fileobj:
        writer = csv.writer(fileobj, dialect="excel")
        writer.writerow(a)
        expected = ",".join([str(i) for i in a])+"\r\n"
        fileobj.seek(0)
        self.assertEqual(fileobj.read(), expected)
def test_char_write(self):
    import array, string
    a = array.array('u', string.ascii_letters)
    with TemporaryFile("w+", newline='') as fileobj:
        writer = csv.writer(fileobj, dialect="excel")
        writer.writerow(a)
        expected = ",".join(a)+"\r\n"
        fileobj.seek(0)
        self.assertEqual(fileobj.read(), expected)
def __init__(self, f, dialect=csv.excel, encoding="utf-8", errors='replace', **kwds):
    # Redirect output to a queue
    self.queue = cStringIO.StringIO()
    self.writer = csv.writer(self.queue, dialect=dialect, **kwds)
    self.stream = f
    encoder_cls = codecs.getincrementalencoder(encoding)
    self.encoder = encoder_cls(errors=errors)