Python csv 模块,excel() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用csv.excel()

项目:retrieve-and-rank-tuning    作者:rchaks    | 项目源码 | 文件源码
def setup_output_writers(parent_dir, fold_number):
    """
    Create the output directory for one fold and open its two CSV writers.

    The directory is ``<parent_dir>/Fold<fold_number>``. It is created when it
    does not exist; otherwise a warning is logged because files inside it may
    be overwritten.

    :param str parent_dir: file path to the parent output dir
    :param int fold_number: fold number to use in directory name
    :return: writers for the train and validation relevance files inside the fold directory
    :rtype: tuple(csv.writer,csv.writer)
    """
    output_dir = path.join(parent_dir, "Fold%d" % fold_number)
    if path.isdir(output_dir):
        # Logger.warn is a deprecated alias; warning() works on Python 2 and 3.
        LOGGER.warning("Path <<%s>> already exists, files may be overwritten", output_dir)
    else:
        LOGGER.debug("Creating output for fold %d at the location: %s", fold_number, output_dir)
        makedirs(output_dir)

    # The opened files are handed to the writers and are not closed in this function.
    train_writer = csv.writer(smart_file_open(path.join(output_dir, TRAIN_RELEVANCE_FILENAME), 'w'),
                              dialect=csv.excel, delimiter=',')
    validation_writer = csv.writer(smart_file_open(path.join(output_dir, VALIDATION_RELEVANCE_FILENAME), 'w'),
                                   dialect=csv.excel, delimiter=',')

    return train_writer, validation_writer
项目:vwoptimize    作者:denik    | 项目源码 | 文件源码
def open_anything(source, format, ignoreheader, force_unbuffered=False):
    """Open *source* and return an iterator over its records.

    :param source: passed to ``open_regular_or_compressed`` to obtain a file-like object
    :param format: ``'vw'`` returns the raw line source; ``'tsv'`` / ``'csv'``
        return a ``csv.reader`` using ``csv.excel_tab`` / ``csv.excel``
    :param ignoreheader: when true, the first row of a tsv/csv input is skipped
    :param force_unbuffered: when true, lines are pulled one ``readline()`` at a time
    :raises ValueError: if *format* is not one of the supported values
    """
    source = open_regular_or_compressed(source)

    if force_unbuffered:
        # simply disabling buffering is not enough, see this for details: http://stackoverflow.com/a/6556862
        source = iter(source.readline, '')

    if format == 'vw':
        return source

    if format == 'tsv':
        dialect = csv.excel_tab
    elif format == 'csv':
        dialect = csv.excel
    else:
        raise ValueError('format not supported: %s' % format)

    reader = csv.reader(source, dialect)
    if ignoreheader:
        # Builtin next() works on Python 2.6+ and 3; the default keeps an
        # empty input from raising StopIteration while skipping the header.
        next(reader, None)

    return reader
项目:pycsvw    作者:bloomberg    | 项目源码 | 文件源码
def read_csv(handle):
    """ Build a row reader over a CSV file
    :param handle: File-like object of the CSV file
    :return: a plain csv.reader when the PY2 flag is false, otherwise a
             generator yielding rows whose cells are decoded from UTF-8
    """
    if not PY2:
        return csv.reader(handle)

    # The helpers below follow the Python 2 unicode recipe from:
    # https://docs.python.org/2/library/csv.html#examples
    def _encoded_lines(lines):
        """ Yield each line encoded as UTF-8."""
        for text in lines:
            yield text.encode('utf-8')

    def _decoded_rows(lines, dialect=csv.excel, **kwargs):
        """ Parse UTF-8 encoded lines, then decode every cell back to unicode."""
        for record in csv.reader(_encoded_lines(lines), dialect=dialect, **kwargs):
            yield [unicode(field, 'utf-8') for field in record]

    return _decoded_rows(handle)
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_space_dialect(self):
    """Read space-delimited rows through a csv.excel subclass with quoting disabled."""
    class space(csv.excel):
        delimiter = " "
        quoting = csv.QUOTE_NONE
        escapechar = "\\"

    handle, tmp_path = tempfile.mkstemp()
    stream = os.fdopen(handle, "w+b")
    try:
        stream.write("abc def\nc1ccccc1 benzene\n")
        stream.seek(0)
        rows = csv.reader(stream, dialect=space())
        # Rows are pulled one at a time and compared in file order.
        for expected in (["abc", "def"], ["c1ccccc1", "benzene"]):
            self.assertEqual(rows.next(), expected)
    finally:
        stream.close()
        os.unlink(tmp_path)
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_int_write(self):
    """Write an integer array as one CSV row and compare the raw file content."""
    import array
    values = array.array('i', [20 - i for i in range(20)])

    handle, tmp_path = tempfile.mkstemp()
    stream = os.fdopen(handle, "w+b")
    try:
        csv.writer(stream, dialect="excel").writerow(values)
        expected = ",".join(map(str, values)) + "\r\n"
        stream.seek(0)
        self.assertEqual(stream.read(), expected)
    finally:
        stream.close()
        os.unlink(tmp_path)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_space_dialect(self):
    """Check that a space-delimited, unquoted dialect splits rows on blanks."""
    class space(csv.excel):
        delimiter = " "
        quoting = csv.QUOTE_NONE
        escapechar = "\\"

    descriptor, temp_name = tempfile.mkstemp()
    temp_file = os.fdopen(descriptor, "w+b")
    try:
        temp_file.write("abc def\nc1ccccc1 benzene\n")
        temp_file.seek(0)
        parsed = csv.reader(temp_file, dialect=space())
        first_row = parsed.next()
        self.assertEqual(first_row, ["abc", "def"])
        second_row = parsed.next()
        self.assertEqual(second_row, ["c1ccccc1", "benzene"])
    finally:
        temp_file.close()
        os.unlink(temp_name)
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda    作者:SignalMedia    | 项目源码 | 文件源码
def test_dialect(self):
    """Parse with a QUOTE_NONE excel dialect and compare against a frame
    built from unquoted data in which 'a' is replaced by '"a'."""
    raw = """\
label1,label2,label3
index1,"a,c,e
index2,b,d,f
"""

    no_quote_dialect = csv.excel()
    no_quote_dialect.quoting = csv.QUOTE_NONE
    actual = self.read_csv(StringIO(raw), dialect=no_quote_dialect)

    clean = '''\
label1,label2,label3
index1,a,c,e
index2,b,d,f
'''
    expected = self.read_csv(StringIO(clean))
    expected.replace('a', '"a', inplace=True)
    tm.assert_frame_equal(actual, expected)
项目:retrieve-and-rank-tuning    作者:rchaks    | 项目源码 | 文件源码
def setup_train_and_test_writer(output_dir):
    """
    Ensure the output directory exists and open the train and validation CSV writers in it.

    The directory is created when missing; otherwise a warning is logged
    because files inside it may be overwritten.

    :param str output_dir: file path to the output dir
    :return: writers for the train and validation relevance files inside output_dir
    :rtype: tuple(csv.writer,csv.writer)
    """
    if path.isdir(output_dir):
        # Logger.warn is a deprecated alias; warning() works on Python 2 and 3.
        LOGGER.warning("Path <<%s>> already exists, files may be overwritten", output_dir)
    else:
        makedirs(output_dir)

    # The opened files are handed to the writers and are not closed in this function.
    train_writer = csv.writer(smart_file_open(path.join(output_dir, TRAIN_RELEVANCE_FILENAME), 'w'),
                              dialect=csv.excel, delimiter=',')
    validation_writer = csv.writer(smart_file_open(path.join(output_dir, VALIDATION_RELEVANCE_FILENAME), 'w'),
                                   dialect=csv.excel, delimiter=',')

    return train_writer, validation_writer
项目:GAMADV-XTD    作者:taers232c    | 项目源码 | 文件源码
def __init__(self, f, fieldnames=None, encoding=UTF8, **kwds):
    """Build a csv.reader over *f* and determine the field names.

    When *fieldnames* is not supplied, the first row is read as the header
    and a leading UTF-8 BOM is stripped from its first cell. A csv.Error or
    an exhausted input leaves the field names empty; a LookupError is
    reported through Cmd.Backup() and usageErrorExit().
    """
    self.encoding = encoding
    try:
        # Recode only when the input is not already in the UTF8 encoding.
        source = f if self.encoding == UTF8 else UTF8Recoder(f, encoding)
        self.reader = csv.reader(source, dialect=csv.excel, **kwds)
        if fieldnames:
            self.fieldnames = fieldnames
        else:
            self.fieldnames = self.reader.next()
            header = self.fieldnames
            if header and header[0].startswith(codecs.BOM_UTF8):
                header[0] = header[0].replace(codecs.BOM_UTF8, u'', 1)
    except (csv.Error, StopIteration):
        self.fieldnames = []
    except LookupError as e:
        Cmd.Backup()
        usageErrorExit(e)
    self.numfields = len(self.fieldnames)
项目:elephant-sms-ews    作者:stultus    | 项目源码 | 文件源码
def export_csv(modeladmin, request, queryset):
    """Return the selected objects as a CSV attachment.

    Writes a UTF-8 BOM, one header row and one row per object in *queryset*
    (pk, created_at, location, message, informer name) into an HttpResponse
    with a ``text/csv`` content type.
    """
    import csv
    from django.utils.encoding import smart_str
    response = HttpResponse(content_type='text/csv')
    response['Content-Disposition'] = 'attachment; filename=elehphant_sightings.csv'
    writer = csv.writer(response, csv.excel)
    response.write(u'\ufeff'.encode('utf8')) # BOM (optional...Excel needs it to open UTF-8 file properly)
    # The header must have exactly one title per value written below. Each row
    # carries a single location value, so a single "Location" title is used;
    # separate "Latitude"/"Longitude" titles shifted Message and Informer
    # under the wrong headings.
    writer.writerow([
        smart_str(u"ID"),
        smart_str(u"Reported At"),
        smart_str(u"Location"),
        smart_str(u"Message"),
        smart_str(u"Informer"),
    ])
    for obj in queryset:
        writer.writerow([
            smart_str(obj.pk),
            smart_str(str(obj.created_at)),
            smart_str(obj.location),
            smart_str(obj.message),
            smart_str(obj.informer.name),
        ])
    return response
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_dialect_apply(self):
        class testA(csv.excel):
            delimiter = "\t"
        class testB(csv.excel):
            delimiter = ":"
        class testC(csv.excel):
            delimiter = "|"
        class testUni(csv.excel):
            delimiter = "\u039B"

        csv.register_dialect('testC', testC)
        try:
            self.compare_dialect_123("1,2,3\r\n")
            self.compare_dialect_123("1\t2\t3\r\n", testA)
            self.compare_dialect_123("1:2:3\r\n", dialect=testB())
            self.compare_dialect_123("1|2|3\r\n", dialect='testC')
            self.compare_dialect_123("1;2;3\r\n", dialect=testA,
                                     delimiter=';')
            self.compare_dialect_123("1\u039B2\u039B3\r\n",
                                     dialect=testUni)

        finally:
            csv.unregister_dialect('testC')
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test_space_dialect(self):
    """Round-trip two space-separated lines through a temp file and a custom dialect."""
    class space(csv.excel):
        delimiter = " "
        quoting = csv.QUOTE_NONE
        escapechar = "\\"

    fd_number, file_name = tempfile.mkstemp()
    scratch = os.fdopen(fd_number, "w+b")
    try:
        scratch.write("abc def\nc1ccccc1 benzene\n")
        scratch.seek(0)
        row_reader = csv.reader(scratch, dialect=space())
        for wanted in (["abc", "def"], ["c1ccccc1", "benzene"]):
            self.assertEqual(row_reader.next(), wanted)
    finally:
        scratch.close()
        os.unlink(file_name)
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test_int_write(self):
    """Check that an array of ints is written as one comma-separated line ending in CRLF."""
    import array
    numbers = array.array('i', [20 - i for i in range(20)])

    descriptor, temp_name = tempfile.mkstemp()
    temp_file = os.fdopen(descriptor, "w+b")
    try:
        row_writer = csv.writer(temp_file, dialect="excel")
        row_writer.writerow(numbers)
        expected = ",".join(str(number) for number in numbers) + "\r\n"
        temp_file.seek(0)
        self.assertEqual(temp_file.read(), expected)
    finally:
        temp_file.close()
        os.unlink(temp_name)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_dialect_apply(self):
        class testA(csv.excel):
            delimiter = "\t"
        class testB(csv.excel):
            delimiter = ":"
        class testC(csv.excel):
            delimiter = "|"
        class testUni(csv.excel):
            delimiter = "\u039B"

        csv.register_dialect('testC', testC)
        try:
            self.compare_dialect_123("1,2,3\r\n")
            self.compare_dialect_123("1\t2\t3\r\n", testA)
            self.compare_dialect_123("1:2:3\r\n", dialect=testB())
            self.compare_dialect_123("1|2|3\r\n", dialect='testC')
            self.compare_dialect_123("1;2;3\r\n", dialect=testA,
                                     delimiter=';')
            self.compare_dialect_123("1\u039B2\u039B3\r\n",
                                     dialect=testUni)

        finally:
            csv.unregister_dialect('testC')
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def test_space_dialect(self):
    """Parse two blank-separated lines using a dialect derived from csv.excel."""
    class space(csv.excel):
        delimiter = " "
        quoting = csv.QUOTE_NONE
        escapechar = "\\"

    os_handle, path_on_disk = tempfile.mkstemp()
    backing_file = os.fdopen(os_handle, "w+b")
    try:
        backing_file.write("abc def\nc1ccccc1 benzene\n")
        backing_file.seek(0)
        records = csv.reader(backing_file, dialect=space())
        head = records.next()
        self.assertEqual(head, ["abc", "def"])
        tail = records.next()
        self.assertEqual(tail, ["c1ccccc1", "benzene"])
    finally:
        backing_file.close()
        os.unlink(path_on_disk)
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def test_int_write(self):
    """Write array('i') values with the "excel" dialect and verify the stored text."""
    import array
    countdown = array.array('i', [20 - i for i in range(20)])

    os_handle, path_on_disk = tempfile.mkstemp()
    backing_file = os.fdopen(os_handle, "w+b")
    try:
        csv.writer(backing_file, dialect="excel").writerow(countdown)
        expected = ",".join(map(str, countdown)) + "\r\n"
        backing_file.seek(0)
        stored = backing_file.read()
        self.assertEqual(stored, expected)
    finally:
        backing_file.close()
        os.unlink(path_on_disk)
项目:torchsample    作者:ncullen93    | 项目源码 | 文件源码
def on_train_end(self, logs=None):
    """Rewrite ``self.file``, replacing this model's row with ``self.row_dict``.

    Every row of the existing file is copied to a temporary file; rows whose
    'model' cell equals ``self.row_dict['model']`` are replaced by
    ``self.row_dict``. The temporary file is then moved over ``self.file``.
    Columns are 'model' followed by ``self.keys`` without
    'has_validation_data'. *logs* is not used.
    """
    reject_keys = {'has_validation_data'}
    # Reader and writer share the same columns, so build the list once.
    fieldnames = ['model'] + [k for k in self.keys if k not in reject_keys]
    row_dict = self.row_dict

    class CustomDialect(csv.excel):
        delimiter = self.sep

    # The temp file is created only after the source opened successfully, so a
    # missing source file no longer leaves an orphaned, unclosed temp file.
    with open(self.file, 'r') as csv_file, \
            NamedTemporaryFile(delete=False, mode='w') as temp_file:
        reader = csv.DictReader(csv_file, fieldnames=fieldnames, dialect=CustomDialect)
        writer = csv.DictWriter(temp_file, fieldnames=fieldnames, dialect=CustomDialect)
        # The header line is read as an ordinary row and copied through unchanged
        # unless its 'model' cell happens to match.
        for row in reader:
            if row['model'] == row_dict['model']:
                writer.writerow(row_dict)
            else:
                writer.writerow(row)
    shutil.move(temp_file.name, self.file)
项目:GAMADV-X    作者:taers232c    | 项目源码 | 文件源码
def __init__(self, f, fieldnames=None, encoding=UTF8, **kwds):
    """Create the underlying csv.reader and settle the header.

    Field names come from *fieldnames* when given, otherwise from the first
    row (with a leading UTF-8 BOM removed from its first cell). They fall
    back to an empty list on csv.Error or when the input has no rows. A
    LookupError triggers Cmd.Backup() followed by usageErrorExit().
    """
    self.encoding = encoding
    try:
        if self.encoding != UTF8:
            rows_source = UTF8Recoder(f, encoding)
        else:
            rows_source = f
        self.reader = csv.reader(rows_source, dialect=csv.excel, **kwds)
        if fieldnames:
            self.fieldnames = fieldnames
        else:
            self.fieldnames = self.reader.next()
            names = self.fieldnames
            if names and names[0].startswith(codecs.BOM_UTF8):
                names[0] = names[0].replace(codecs.BOM_UTF8, u'', 1)
    except (csv.Error, StopIteration):
        self.fieldnames = []
    except LookupError as e:
        Cmd.Backup()
        usageErrorExit(e)
    self.numfields = len(self.fieldnames)
项目:keras    作者:NVIDIA    | 项目源码 | 文件源码
def on_epoch_end(self, epoch, logs=None):
    """Append one CSV row holding *epoch* and the values in *logs*.

    On the first call (``self.writer`` unset) the sorted keys of *logs* are
    stored in ``self.keys``, a DictWriter using ``self.sep`` as delimiter is
    created on ``self.csv_file``, and the header is written when
    ``self.append_header`` is true. The file is flushed after every row.
    """
    logs = logs or {}

    def handle_value(k):
        # Strings are Iterable too; without this guard a value such as 'abc'
        # would be written as "[a, b, c]" instead of as-is.
        if isinstance(k, (str, bytes)):
            return k
        is_zero_dim_ndarray = isinstance(k, np.ndarray) and k.ndim == 0
        if isinstance(k, Iterable) and not is_zero_dim_ndarray:
            return '"[%s]"' % (', '.join(map(str, k)))
        return k

    if not self.writer:
        self.keys = sorted(logs.keys())

        class CustomDialect(csv.excel):
            delimiter = self.sep

        self.writer = csv.DictWriter(self.csv_file,
                                     fieldnames=['epoch'] + self.keys, dialect=CustomDialect)
        if self.append_header:
            self.writer.writeheader()

    row_dict = OrderedDict({'epoch': epoch})
    row_dict.update((key, handle_value(logs[key])) for key in self.keys)
    self.writer.writerow(row_dict)
    self.csv_file.flush()
项目:faster-rcnn-scenarios    作者:djdam    | 项目源码 | 文件源码
def write_csv(output_filename, dict_list, delimiter, verbose=False):
    """Write a list of dicts to a delimited text file.

    The keys of the first dict become the header row. Nothing is written
    when *dict_list* is empty.

    :param output_filename: path of the file to create or overwrite
    :param dict_list: rows to write, one dict per row
    :param delimiter: field separator character
    :param verbose: when true, print what was (or was not) written
    """
    if not dict_list:
        if verbose:
            print('Not writing %s; no lines to write' % output_filename)
        return

    with open(output_filename, 'w') as f:
        # The delimiter is passed as a per-writer override. Assigning it to
        # csv.excel.delimiter would change the shared dialect class for every
        # other user of csv.excel in the process.
        dict_writer = csv.DictWriter(f, fieldnames=list(dict_list[0]),
                                     dialect=csv.excel, delimiter=delimiter)
        dict_writer.writeheader()
        dict_writer.writerows(dict_list)
    if verbose:
        # print() with a single argument behaves the same on Python 2 and 3.
        print('Wrote %s' % output_filename)
项目:keras_superpixel_pooling    作者:parag2489    | 项目源码 | 文件源码
def on_epoch_end(self, epoch, logs=None):
    """Write the metrics of one epoch as a CSV row.

    The writer is created lazily on the first call: the sorted keys of
    *logs* become the columns after 'epoch', ``self.sep`` is the delimiter,
    and a header row is emitted when ``self.append_header`` is true.
    ``self.csv_file`` is flushed after each row.
    """
    logs = logs or {}

    def handle_value(k):
        # A string is Iterable as well; return it untouched so it is not
        # split into characters and rendered as "[a, b, c]".
        if isinstance(k, (str, bytes)):
            return k
        is_zero_dim_ndarray = isinstance(k, np.ndarray) and k.ndim == 0
        if isinstance(k, Iterable) and not is_zero_dim_ndarray:
            return '"[%s]"' % (', '.join(map(str, k)))
        return k

    if not self.writer:
        self.keys = sorted(logs.keys())

        class CustomDialect(csv.excel):
            delimiter = self.sep

        self.writer = csv.DictWriter(self.csv_file,
                                     fieldnames=['epoch'] + self.keys, dialect=CustomDialect)
        if self.append_header:
            self.writer.writeheader()

    row_dict = OrderedDict({'epoch': epoch})
    row_dict.update((key, handle_value(logs[key])) for key in self.keys)
    self.writer.writerow(row_dict)
    self.csv_file.flush()
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_dialect_apply(self):
        class testA(csv.excel):
            delimiter = "\t"
        class testB(csv.excel):
            delimiter = ":"
        class testC(csv.excel):
            delimiter = "|"
        class testUni(csv.excel):
            delimiter = "\u039B"

        csv.register_dialect('testC', testC)
        try:
            self.compare_dialect_123("1,2,3\r\n")
            self.compare_dialect_123("1\t2\t3\r\n", testA)
            self.compare_dialect_123("1:2:3\r\n", dialect=testB())
            self.compare_dialect_123("1|2|3\r\n", dialect='testC')
            self.compare_dialect_123("1;2;3\r\n", dialect=testA,
                                     delimiter=';')
            self.compare_dialect_123("1\u039B2\u039B3\r\n",
                                     dialect=testUni)

        finally:
            csv.unregister_dialect('testC')
项目:tv-ad-classification    作者:Abhinav95    | 项目源码 | 文件源码
def write_csv(output_filename, dict_list, delimiter, verbose=False):
    """Write a CSV file from a list of dicts.

    The header row is taken from the keys of the first dict. When
    *dict_list* is empty no file is written.

    :param output_filename: path of the file to create or overwrite
    :param dict_list: rows to write, one dict per row
    :param delimiter: field separator character
    :param verbose: when true, print a short status message
    """
    if not dict_list:
        if verbose:
            print('Not writing %s; no lines to write' % output_filename)
        return

    with open(output_filename, 'w') as f:
        # Override the delimiter for this writer only. Setting
        # csv.excel.delimiter would silently change the shared dialect class
        # for all later csv.excel users.
        dict_writer = csv.DictWriter(f, fieldnames=list(dict_list[0]),
                                     dialect=csv.excel, delimiter=delimiter)
        dict_writer.writeheader()
        dict_writer.writerows(dict_list)
    if verbose:
        # Function-call form so the module parses on Python 3 as well.
        print('Wrote %s' % output_filename)
项目:AmericansStopSoros    作者:HelloKitty    | 项目源码 | 文件源码
def __init__(self, f, dialect=csv.excel, encoding="utf-8-sig", **kwds):
    """Wrap *f* in a UTF8Recoder for *encoding* and build a csv.reader over it."""
    recoded = UTF8Recoder(f, encoding)
    self.reader = csv.reader(recoded, dialect=dialect, **kwds)
项目:AmericansStopSoros    作者:HelloKitty    | 项目源码 | 文件源码
def __init__(self, f, dialect=csv.excel, encoding="utf-8-sig", **kwds):
    """Set up a csv.writer that writes into an in-memory queue, plus the
    target stream *f* and an incremental encoder for *encoding*."""
    self.stream = f
    staging = cStringIO.StringIO()
    self.queue = staging
    self.writer = csv.writer(staging, dialect=dialect, **kwds)
    encoder_factory = codecs.getincrementalencoder(encoding)
    self.encoder = encoder_factory()
项目:cbapi-python    作者:carbonblack    | 项目源码 | 文件源码
def __init__(self, filename, dialect=csv.excel,
             encoding="utf-8", **kw):
    """Store the file name, dialect, encoding and extra keyword options
    on the instance without opening anything."""
    self.filename, self.dialect = filename, dialect
    self.encoding, self.kw = encoding, kw
项目:code    作者:ActiveState    | 项目源码 | 文件源码
def process(options, args):
    '''
    Save air quality data in a csv_file

    Downloads the page, extracts the sensor rows plus the page date and time,
    and writes them to ``options.csv_file`` so that the new rows come first,
    followed by the data rows of any existing file. The output is built in
    ``<csv_file>~`` and renamed over the target at the end. *args* is not used.
    '''
    url = "http://www.seremisaludrm.cl/sitio/pag/aire/indexjs3aireindices-prueba.asp"
    sock = urllib.urlopen(url)
    try:
        htmlSource = sock.read()
    finally:
        # Close the connection even when the read fails.
        sock.close()
    csv_file = options.csv_file
    tmp_file = csv_file + '~'
    # Append sensors data
    air_data = extract_data(htmlSource)
    encabezado = next(air_data)
    encabezado.append('DATE')
    encabezado.append('TIME')
    date = extract_date(htmlSource)
    time = extract_time(htmlSource)

    f_in = open(csv_file, "rb") if os.path.exists(csv_file) else None
    try:
        f_out = open(tmp_file, 'wb')
        try:
            Writer = csv.writer(f_out, dialect=csv.excel)
            # TODO check encabezado == old header
            Writer.writerow(encabezado)
            for row in air_data:
                row.append(date)
                row.append(time)
                Writer.writerow(row)
            if f_in is not None:
                Reader = csv.reader(f_in, dialect=csv.excel)
                next(Reader)  # skip the old header row
                for row in Reader:
                    Writer.writerow(row)
        finally:
            f_out.close()
    finally:
        if f_in is not None:
            f_in.close()
    # The original file is replaced only after the new one was written completely.
    if f_in is not None:
        os.remove(csv_file)
    os.rename(tmp_file, csv_file)
项目:FRG-Crowdsourcing    作者:97amarnathk    | 项目源码 | 文件源码
def unicode_csv_reader(unicode_csv_data, dialect=csv.excel, **kwargs):
    """Yield CSV rows as lists of unicode cells."""
    # This code is taken from http://docs.python.org/library/csv.html#examples
    # The csv module works on byte strings here, so the input is passed
    # through utf_8_encoder first and each parsed cell is decoded afterwards.
    encoded_lines = utf_8_encoder(unicode_csv_data)
    for record in csv.reader(encoded_lines, dialect=dialect, **kwargs):
        yield [unicode(field, 'utf-8') for field in record]
项目:FRG-Crowdsourcing    作者:97amarnathk    | 项目源码 | 文件源码
def __init__(self, f, dialect=csv.excel, encoding="utf-8", **kwds):
    """Init method."""
    self.stream = f
    # Rows are written to an in-memory queue rather than directly to the stream
    self.queue = cStringIO.StringIO()
    self.writer = csv.writer(self.queue, dialect=dialect, **kwds)
    new_encoder = codecs.getincrementalencoder(encoding)
    self.encoder = new_encoder()
项目:us_zipcodes_congress    作者:OpenSourceActivismTech    | 项目源码 | 文件源码
def csv_reader_converter(utf8_data, dialect=csv.excel, **kwargs):
    """Yield each parsed CSV row with its cells converted via unicode(cell, 'latin-1')."""
    for record in csv.reader(utf8_data, dialect=dialect, **kwargs):
        decoded = []
        for field in record:
            decoded.append(unicode(field, 'latin-1'))
        yield decoded
项目:sqlakeyset    作者:djrobstep    | 项目源码 | 文件源码
def __init__(self, f, dialect=csv.excel, **kwds):
    """Create a csv.reader that reads from *f* through a UTF8Recoder."""
    recoded_input = UTF8Recoder(f)
    self.reader = csv.reader(recoded_input, dialect=dialect, **kwds)
项目:sqlakeyset    作者:djrobstep    | 项目源码 | 文件源码
def __init__(self, f, dialect=csv.excel, **kwds):
    """Keep *f* as the target stream and create a csv.writer bound to a
    fresh in-memory BytesIO queue."""
    self.stream = f
    pending = io.BytesIO()
    self.queue = pending
    self.writer = csv.writer(pending, dialect=dialect, **kwds)
项目:pfb-network-connectivity    作者:azavea    | 项目源码 | 文件源码
def handle(self, *args, **options):
    """Export the latest completed analysis jobs to a CSV file and upload it to S3.

    One row is written per job; the columns are the concatenation of the
    columns returned by every callable in EXPORTS for the first job. The
    temporary working directory is always removed afterwards.
    """
    tmpdir = tempfile.mkdtemp()

    try:
        complete_jobs = AnalysisJob.objects.all().filter(status=AnalysisJob.Status.COMPLETE)
        latest_jobs = AnalysisJobFilterSet().filter_latest(complete_jobs, 'latest', True)

        tmp_csv_filename = os.path.join(tmpdir, 'results.csv')
        with open(tmp_csv_filename, 'w') as csv_file:
            writer = None
            fieldnames = []

            for job in latest_jobs:
                # Columns are collected and the header written only for the first job.
                is_first_job = writer is None
                row_data = {}
                for export in EXPORTS:
                    columns, values = export(job)
                    if is_first_job:
                        fieldnames = fieldnames + columns
                    row_data.update(zip(columns, values))
                if is_first_job:
                    writer = csv.DictWriter(csv_file,
                                            fieldnames=fieldnames,
                                            dialect=csv.excel,
                                            quoting=csv.QUOTE_MINIMAL)
                    writer.writeheader()
                writer.writerow(row_data)

        s3_client = boto3.client('s3')
        timestamp = datetime.utcnow().strftime('%Y-%m-%dT%H%M')
        s3_key = 'analysis-spreadsheets/results-{}.csv'.format(timestamp)
        bucket = settings.AWS_STORAGE_BUCKET_NAME
        s3_client.upload_file(tmp_csv_filename, bucket, s3_key)
        logger.info('File uploaded to: s3://{}/{}'
                    .format(settings.AWS_STORAGE_BUCKET_NAME, s3_key))
    finally:
        shutil.rmtree(tmpdir)
项目:pyconjp-website    作者:pyconjp    | 项目源码 | 文件源码
def get_csv(self):
    """Fetch self.url, require a 200 status, parse the body strictly as CSV
    and return the rows as a list of dicts whose values are decoded from UTF-8."""
    response = self.client.get(self.url)
    self.assertEqual(200, response.status_code)

    strict_excel = csv.excel()
    strict_excel.strict = True
    rows = csv.DictReader(StringIO(response.content), dialect=strict_excel)

    decoded_rows = []
    for record in rows:
        # Values are replaced in place; the keys of the row stay unchanged.
        for key, raw in record.iteritems():
            record[key] = raw.decode('utf-8')
        decoded_rows.append(record)
    return decoded_rows
项目:retrieve-and-rank-tuning    作者:rchaks    | 项目源码 | 文件源码
def __init__(self, *args, **kwargs):
    """Initialise the parent stream and create the CSV reader over ``self.query_file``.

    The reader uses the excel dialect with doublequote enabled, minimal
    quoting, and whitespace after a delimiter skipped.
    """
    super(RankerRelevanceFileQueryStream, self).__init__(*args, **kwargs)

    # The settings are pinned on a private subclass. Assigning them to
    # csv.excel itself would change that shared class for every other reader
    # and writer in the process (notably skipinitialspace), which produces
    # order-dependent behavior between unit tests. Please keep the values explicit.
    class _RelevanceFileDialect(csv.excel):
        doublequote = True
        quoting = csv.QUOTE_MINIMAL
        skipinitialspace = True

    self.__reader__ = csv.reader(self.query_file, dialect=_RelevanceFileDialect)
项目:python-fritzbox    作者:pamapa    | 项目源码 | 文件源码
def __init__(self, f, delimiter=',', dialect=csv.excel, encoding="utf-8"):
    """Create a csv.DictReader over *f* with the given delimiter and dialect.

    *encoding* is accepted but not used here.
    """
    reader_options = {'delimiter': delimiter, 'dialect': dialect}
    self.reader = csv.DictReader(f, **reader_options)
项目:csv_generator    作者:fatboystring    | 项目源码 | 文件源码
def __init__(self, f, dialect=csv.excel, encoding="utf-8", **kwargs):
    """
    Instantiates the UnicodeWriter instance

    :param f: File like object to write CSV data to
    :param dialect: The dialect for the CSV
    :param encoding: The CSV encoding (accepted but not used by this implementation)
    :param kwargs: Keyword args forwarded to csv.writer as formatting overrides
    """
    # Forward dialect and formatting options; they were previously dropped,
    # so callers always got the default excel dialect regardless of arguments.
    self.writer = csv.writer(f, dialect=dialect, **kwargs)
项目:csv_generator    作者:fatboystring    | 项目源码 | 文件源码
def __init__(self, f, dialect=csv.excel, encoding="utf-8", **kwargs):
    """
    Instantiates the UnicodeWriter instance

    :param f: File like object to write CSV data to
    :param dialect: The dialect for the CSV
    :param encoding: The CSV encoding
    :param kwargs: Keyword args
    """
    self.stream = f
    # The csv writer targets an in-memory queue, not the stream itself.
    row_buffer = cStringIO.StringIO()
    self.queue = row_buffer
    self.writer = csv.writer(row_buffer, dialect=dialect, **kwargs)
    encoder_class = codecs.getincrementalencoder(encoding)
    self.encoder = encoder_class()
项目:SWProxy-plugins    作者:lstern    | 项目源码 | 文件源码
def __init__(self, f, fieldnames, dialect=csv.excel, encoding="utf-8", newfile=True, **kwds):
    """Prepare a DictWriter that writes into an in-memory queue, remember the
    target stream and encoder, and write a BOM first when *newfile* is true."""
    self.stream = f
    # Redirect output to a queue
    self.queue = cStringIO.StringIO()
    self.writer = csv.DictWriter(self.queue, fieldnames, dialect=dialect, **kwds)
    encoder_factory = codecs.getincrementalencoder(encoding)
    self.encoder = encoder_factory()
    if not newfile:
        return
    self.writebom()
项目:SWProxy-plugins    作者:lstern    | 项目源码 | 文件源码
def writebom(self):
    """Write BOM, so excel can identify this as UTF8"""
    bom_bytes = u'\ufeff'.encode('utf8')
    self.stream.write(bom_bytes)
项目:acacia_main    作者:AcaciaTrading    | 项目源码 | 文件源码
def __init__(self, f, dialect=csv.excel, encoding="utf-8", **kwds):
    """Create the queue-backed csv.writer, store the output stream *f* and
    build an incremental encoder for *encoding*."""
    self.stream = f
    # Redirect output to a queue
    in_memory = cStringIO.StringIO()
    self.queue = in_memory
    self.writer = csv.writer(in_memory, dialect=dialect, **kwds)
    self.encoder = codecs.getincrementalencoder(encoding)()
项目:DevOps    作者:YoLoveLife    | 项目源码 | 文件源码
def __init__(self, f, dialect=csv.excel, encoding='utf-8', **kwds):
    """Wrap *f* in a CSVRecoder for *encoding* and read it with csv.reader."""
    recoded_source = CSVRecoder(f, encoding)
    self.reader = csv.reader(recoded_source, dialect=dialect, **kwds)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_registry(self):
        class myexceltsv(csv.excel):
            delimiter = "\t"
        name = "myexceltsv"
        expected_dialects = csv.list_dialects() + [name]
        expected_dialects.sort()
        csv.register_dialect(name, myexceltsv)
        self.addCleanup(csv.unregister_dialect, name)
        self.assertEqual(csv.get_dialect(name).delimiter, '\t')
        got_dialects = sorted(csv.list_dialects())
        self.assertEqual(expected_dialects, got_dialects)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_space_dialect(self):
        class space(csv.excel):
            delimiter = " "
            quoting = csv.QUOTE_NONE
            escapechar = "\\"

        with TemporaryFile("w+") as fileobj:
            fileobj.write("abc def\nc1ccccc1 benzene\n")
            fileobj.seek(0)
            reader = csv.reader(fileobj, dialect=space())
            self.assertEqual(next(reader), ["abc", "def"])
            self.assertEqual(next(reader), ["c1ccccc1", "benzene"])
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_int_write(self):
        import array
        contents = [(20-i) for i in range(20)]
        a = array.array('i', contents)

        with TemporaryFile("w+", newline='') as fileobj:
            writer = csv.writer(fileobj, dialect="excel")
            writer.writerow(a)
            expected = ",".join([str(i) for i in a])+"\r\n"
            fileobj.seek(0)
            self.assertEqual(fileobj.read(), expected)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_double_write(self):
        import array
        contents = [(20-i)*0.1 for i in range(20)]
        a = array.array('d', contents)
        with TemporaryFile("w+", newline='') as fileobj:
            writer = csv.writer(fileobj, dialect="excel")
            writer.writerow(a)
            expected = ",".join([str(i) for i in a])+"\r\n"
            fileobj.seek(0)
            self.assertEqual(fileobj.read(), expected)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_char_write(self):
        import array, string
        a = array.array('u', string.ascii_letters)

        with TemporaryFile("w+", newline='') as fileobj:
            writer = csv.writer(fileobj, dialect="excel")
            writer.writerow(a)
            expected = ",".join(a)+"\r\n"
            fileobj.seek(0)
            self.assertEqual(fileobj.read(), expected)
项目:deep-learning-keras-projects    作者:jasmeetsb    | 项目源码 | 文件源码
def on_epoch_end(self, epoch, logs=None):
    """Log *epoch* and the entries of *logs* as one CSV row.

    The first call fixes the columns ('epoch' plus the sorted keys of
    *logs*), builds the DictWriter on ``self.csv_file`` with ``self.sep`` as
    delimiter and writes the header when ``self.append_header`` is true.
    Every call flushes ``self.csv_file`` after writing its row.
    """
    logs = logs or {}

    def handle_value(k):
        # Text values are Iterable, but must be logged verbatim rather than
        # being joined character by character into "[a, b, c]".
        if isinstance(k, (str, bytes)):
            return k
        is_zero_dim_ndarray = isinstance(k, np.ndarray) and k.ndim == 0
        if isinstance(k, Iterable) and not is_zero_dim_ndarray:
            return '"[%s]"' % (', '.join(map(str, k)))
        return k

    if not self.writer:
        self.keys = sorted(logs.keys())

        class CustomDialect(csv.excel):
            delimiter = self.sep

        self.writer = csv.DictWriter(self.csv_file,
                                     fieldnames=['epoch'] + self.keys, dialect=CustomDialect)
        if self.append_header:
            self.writer.writeheader()

    row_dict = OrderedDict({'epoch': epoch})
    row_dict.update((key, handle_value(logs[key])) for key in self.keys)
    self.writer.writerow(row_dict)
    self.csv_file.flush()
项目:Price-Comparator    作者:Thejas-1    | 项目源码 | 文件源码
def __init__(self, f, dialect=csv.excel, encoding="utf-8", errors='replace', **kwds):
    """Create the queue-backed csv.writer, keep the output stream *f*, and
    build an incremental encoder for *encoding* using the *errors* policy."""
    self.stream = f
    # Redirect output to a queue
    self.queue = cStringIO.StringIO()
    self.writer = csv.writer(self.queue, dialect=dialect, **kwds)
    self.encoder = codecs.getincrementalencoder(encoding)(errors=errors)
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_registry(self):
        class myexceltsv(csv.excel):
            delimiter = "\t"
        name = "myexceltsv"
        expected_dialects = csv.list_dialects() + [name]
        expected_dialects.sort()
        csv.register_dialect(name, myexceltsv)
        self.addCleanup(csv.unregister_dialect, name)
        self.assertEqual(csv.get_dialect(name).delimiter, '\t')
        got_dialects = sorted(csv.list_dialects())
        self.assertEqual(expected_dialects, got_dialects)