Python csv 模块,Sniffer() 实例源码

我们从Python开源项目中,提取了以下48个代码示例,用于说明如何使用csv.Sniffer()

项目:dynamicpricing    作者:marcelja    | 项目源码 | 文件源码
def append_by_csvs(self, market_situations_path, buy_offer_path, csv_merchant_id=None):
    """Load market situations and buy offers from two CSV files and append them.

    Each file may or may not carry a header row; when the sniffer finds no
    header, the project's known column order is supplied explicitly.
    """
    with open(market_situations_path, 'r') as csvfile:
        sample = csvfile.read(16384)
        csvfile.seek(0)
        # None lets DictReader take the first row as the header.
        fieldnames = None if csv.Sniffer().has_header(sample) else get_market_situation_fieldnames()
        for record in csv.DictReader(csvfile, fieldnames=fieldnames):
            self.append_marketplace_situations(record, csv_merchant_id)
    self.update_timestamps()
    with open(buy_offer_path, 'r') as csvfile:
        sample = csvfile.read(16384)
        csvfile.seek(0)
        fieldnames = None if csv.Sniffer().has_header(sample) else get_buy_offer_fieldnames()
        for record in csv.DictReader(csvfile, fieldnames=fieldnames):
            self.append_sales(record)
    self.print_info()
项目:girder_worker    作者:girder    | 项目源码 | 文件源码
def get_csv_reader(input):
    """Return a csv.DictReader over *input*, guessing the csv dialect."""

    # csv package does not support unicode
    input = str(input)

    # Special case: single-column or single-line input — sniffing is
    # unreliable there, so fall back to the default Excel dialect.
    # This check assumes that our only valid delimiters are commas and tabs.
    first_line = input.split('\n')[0]
    single_column = not ('\t' in first_line or ',' in first_line)
    if single_column or len(input.splitlines()) == 1:
        dialect = 'excel'
    else:
        # Grow the sample until it contains at least one complete line,
        # always discarding the (possibly incomplete) final line.
        sample = ''
        size = 0
        while not sample:
            size += 5000
            sample = '\n'.join(input[:size].splitlines()[:-1])
        dialect = csv.Sniffer().sniff(sample)
        dialect.skipinitialspace = True
    return csv.DictReader(input.splitlines(), dialect=dialect)
项目:pheweb    作者:statgen    | 项目源码 | 文件源码
def import_phenolist(filepath, has_header):
    # Return a list-of-dicts with the original column names, or integers if none.
    # Tries, in order: xlsx (openpyxl), json, then sniffed csv.
    # It'd be great to use pandas for this.
    if not os.path.exists(filepath):
        raise PheWebError("ERROR: unable to import {!r} because it doesn't exist".format(filepath))
    # 1. try openpyxl.
    phenos = _import_phenolist_xlsx(filepath, has_header)
    if phenos is not None:
        return phenos
    with read_maybe_gzip(filepath) as f:
        # 2. try json.load(f)
        try:
            return json.load(f)
        except ValueError:
            # Not valid json; only fatal when the extension promised json.
            if filepath.endswith('.json'):
                raise PheWebError("The filepath {!r} ends with '.json' but reading it as json failed.".format(filepath))
        # 3. try csv.reader() with csv.Sniffer().sniff()
        f.seek(0)  # rewind after the failed json parse
        phenos = _import_phenolist_csv(f, has_header)
        if phenos is not None:
            return phenos
        raise PheWebError("I couldn't figure out how to open the file {!r}, sorry.".format(filepath))
项目:planet-b-saleor    作者:planet-b    | 项目源码 | 文件源码
def test_write_feed(product_in_stock, monkeypatch):
    """The exported feed must be tab-separated, headed, and carry the
    Google-required product fields."""
    out = StringIO()
    write_feed(out)
    out.seek(0)
    content = out.getvalue()
    sniffed = csv.Sniffer().sniff(content)
    # The feed must match the excel-tab dialect exactly.
    assert sniffed.delimiter == csv.excel_tab.delimiter
    assert sniffed.quotechar == csv.excel_tab.quotechar
    assert sniffed.escapechar == csv.excel_tab.escapechar
    assert csv.Sniffer().has_header(content)
    rows = list(csv.reader(out, dialect=csv.excel_tab))
    assert len(rows) == 2
    header = rows[0]
    for required in ('id', 'title', 'link', 'image_link',
                     'availability', 'price', 'condition'):
        assert required in header
项目:paragraph2vec    作者:thunlp    | 项目源码 | 文件源码
def __init__(self, fname, labels):
    """
    Initialize the corpus from a file.
    `labels` = are class labels present in the input file? => skip the first column

    """
    logger.info("loading corpus from %s" % fname)
    self.fname = fname
    self.length = None  # number of documents; computed lazily elsewhere
    self.labels = labels

    # Load the first few lines to guess the CSV dialect.  Use `with` so the
    # handle is closed promptly (the original `open()` was never closed and
    # relied on garbage collection).
    with open(self.fname) as fin:
        head = ''.join(itertools.islice(fin, 5))
    self.headers = csv.Sniffer().has_header(head)
    self.dialect = csv.Sniffer().sniff(head)
    logger.info("sniffed CSV delimiter=%r, headers=%s" % (self.dialect.delimiter, self.headers))
项目:KiField    作者:xesscorp    | 项目源码 | 文件源码
def csvfile_to_wb(csv_filename):
    '''Open a CSV file and return a tuple (openpyxl workbook, sniffed csv dialect).'''

    logger.log(
        DEBUG_DETAILED,
        'Converting CSV file {} into an XLSX workbook.'.format(csv_filename))

    with open(csv_filename) as csv_file:
        # Sniff the dialect from the entire file contents, then rewind.
        dialect = csv.Sniffer().sniff(csv_file.read())
        if USING_PYTHON2:
            # Python 2's csv module chokes on unicode dialect attributes;
            # coerce every unicode attribute of the dialect to bytes.
            for attr in dir(dialect):
                a = getattr(dialect, attr)
                if type(a) == unicode:
                    setattr(dialect, attr, bytes(a))
        csv_file.seek(0)
        reader = csv.reader(csv_file, dialect)
        wb = pyxl.Workbook()
        ws = wb.active
        # Copy each csv cell into the worksheet (1-based row/column indices);
        # empty cells are skipped so they stay unset in the workbook.
        for row_index, row in enumerate(reader, 1):
            for column_index, cell in enumerate(row, 1):
                if cell not in ('', None):
                    ws.cell(row=row_index, column=column_index).value = cell
    return (wb, dialect)
项目:map-of-innovation    作者:AnanseGroup    | 项目源码 | 文件源码
def addfromcsv(self):
    """Bulk-load spaces from the merge CSV into Redis.

    Each csv row is stored as a Redis hash keyed by the space name
    (spaces stripped) plus the current timestamp, flagged as
    archived=False / verified=True.

    Returns {'success': 'true'} when the import completes.
    """
    if os.environ.get("REDIS_URL"):
        redis_url = os.environ.get("REDIS_URL")
    else:
        redis_url = "localhost"
    r_server = redis.from_url(redis_url)
    # NOTE: 'rb' + csv.DictReader is Python 2 style — TODO confirm if this
    # ever needs to run on Python 3 (which requires text mode).
    with open('mapofinnovation/public/spaces_ready_for_merge.csv', 'rb') as csv_file:
        dialect = csv.Sniffer().sniff(csv_file.read(), delimiters=',')
        csv_file.seek(0)
        csv_reader = csv.DictReader(csv_file, dialect=dialect)
        for row in csv_reader:
            # Bug fix: the original had this loop body dedented outside the
            # `for` statement, which is a syntax error; re-indented so every
            # row is actually processed.
            key = row['name'] + str(datetime.now())
            row.update({'archived': False})
            row.update({'verified': True})
            r_server.hmset(re.sub(' ', '', key), row)
    return {'success': 'true'}
项目:hakkuframework    作者:4shadoww    | 项目源码 | 文件源码
def from_csv(fp, field_names = None, **kwargs):
    """Build a PrettyTable from an open csv file object *fp*.

    The csv dialect is sniffed from the first KiB.  If *field_names* is
    given it is used as the header; otherwise the first csv row is.
    Extra keyword arguments are forwarded to the PrettyTable constructor.
    """
    sample = fp.read(1024)
    fp.seek(0)
    reader = csv.reader(fp, csv.Sniffer().sniff(sample))

    table = PrettyTable(**kwargs)
    if field_names:
        table.field_names = field_names
    else:
        # Consume the header row (py3k flag picks the iterator protocol).
        header = next(reader) if py3k else reader.next()
        table.field_names = [cell.strip() for cell in header]

    for record in reader:
        table.add_row([cell.strip() for cell in record])

    return table
项目:spartacus    作者:wind39    | 项目源码 | 文件源码
def Open(self):
    """Open self.v_filename for reading.

    For 'csv' files the dialect is sniffed from the first KiB and a
    csv.DictReader is stored in self.v_object; 'xlsx' files are opened
    read-only through openpyxl.  Sets self.v_open = True on success.

    Raises Spartacus.Utils.Exception on any failure (missing file,
    missing csv header, unsupported extension, or any wrapped error).
    """
    try:
        if not os.path.isfile(self.v_filename):
            raise Spartacus.Utils.Exception('File {0} does not exist or is not a file.'.format(self.v_filename))
        if self.v_extension == 'csv':
            self.v_file = open(self.v_filename, encoding=self.v_encoding)
            # Sniff the header/dialect from a small sample, then rewind.
            v_sample = self.v_file.read(1024)
            self.v_file.seek(0)
            v_sniffer = csv.Sniffer()
            if not v_sniffer.has_header(v_sample):
                raise Spartacus.Utils.Exception('CSV file {0} does not have a header.'.format(self.v_filename))
            v_dialect = v_sniffer.sniff(v_sample)
            # Positional args: fieldnames=self.v_header, restkey=None, restval=None.
            self.v_object = csv.DictReader(self.v_file, self.v_header, None, None, v_dialect)
            self.v_open = True
        elif self.v_extension == 'xlsx':
            self.v_object = openpyxl.load_workbook(self.v_filename, read_only=True)
            self.v_open = True
        else:
            raise Spartacus.Utils.Exception('File extension "{0}" not supported.'.format(self.v_extension))
    except Spartacus.Utils.Exception as exc:
        # Re-raise the project's own exception type untouched...
        raise exc
    except Exception as exc:
        # ...and wrap everything else in it.
        raise Spartacus.Utils.Exception(str(exc))
项目:slidoc    作者:mitotic    | 项目源码 | 文件源码
def restoreSheet(sheetName, filepath, csvfile, overwrite=None):
    # Restore sheet from backup CSV file.
    # Returns '' on success, or an error-message string on failure.
    # (Python 2 code: note the `except Exception, excp` syntax below.)
    try:
        ##dialect = csv.Sniffer().sniff(csvfile.read(1024))
        ##csvfile.seek(0)
        reader = csv.reader(csvfile, delimiter=',')  # Ignore dialect for now
        rows = [row for row in reader]
        if not rows:
            raise Exception('No rows in CSV file %s for sheet %s' % (filepath, sheetName))

        # First row is the header; the remaining rows are the sheet data.
        sdproxy.importSheet(sheetName, rows[0], rows[1:], overwrite=overwrite)
        return ''

    except Exception, excp:
        if Options['debug']:
            import traceback
            traceback.print_exc()
        return 'Error in restoreSheet: '+str(excp)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_sniff(self):
    """The sniffer must detect delimiter, quotechar and skipinitialspace."""
    sniffer = csv.Sniffer()
    for sample, delim, quote, skip in (
            (self.sample1, ",", '"', True),
            (self.sample2, ":", "'", False)):
        dialect = sniffer.sniff(sample)
        self.assertEqual(dialect.delimiter, delim)
        self.assertEqual(dialect.quotechar, quote)
        self.assertEqual(dialect.skipinitialspace, skip)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_delimiters(self):
    sniffer = csv.Sniffer()
    # All three lines of sample3 are identical, so any of its characters
    # could plausibly have been guessed as the delimiter; only membership
    # can be asserted.
    dialect = sniffer.sniff(self.sample3)
    self.assertIn(dialect.delimiter, self.sample3)
    # (sample, restricted delimiter set, expected delimiter, expected quotechar)
    checks = [
        (self.sample3, "?,", "?", None),
        (self.sample3, "/,", "/", None),
        (self.sample4, None, ";", None),
        (self.sample5, None, "\t", None),
        (self.sample6, None, "|", None),
        (self.sample7, None, "|", "'"),
    ]
    for sample, restrict, want_delim, want_quote in checks:
        dialect = sniffer.sniff(sample, delimiters=restrict)
        self.assertEqual(dialect.delimiter, want_delim)
        if want_quote is not None:
            self.assertEqual(dialect.quotechar, want_quote)
项目:topical_word_embeddings    作者:thunlp    | 项目源码 | 文件源码
def __init__(self, fname, labels):
    """
    Initialize the corpus from a file.
    `labels` = are class labels present in the input file? => skip the first column

    """
    logger.info("loading corpus from %s" % fname)
    self.fname = fname
    self.length = None  # number of documents; computed lazily elsewhere
    self.labels = labels

    # Load the first few lines to guess the CSV dialect.  Use `with` so the
    # handle is closed promptly (the original `open()` was never closed and
    # relied on garbage collection).
    with open(self.fname) as fin:
        head = ''.join(itertools.islice(fin, 5))
    self.headers = csv.Sniffer().has_header(head)
    self.dialect = csv.Sniffer().sniff(head)
    logger.info("sniffed CSV delimiter=%r, headers=%s" % (self.dialect.delimiter, self.headers))
项目:topical_word_embeddings    作者:thunlp    | 项目源码 | 文件源码
def __init__(self, fname, labels):
    """
    Initialize the corpus from a file.
    `labels` = are class labels present in the input file? => skip the first column

    """
    logger.info("loading corpus from %s" % fname)
    self.fname = fname
    self.length = None  # number of documents; computed lazily elsewhere
    self.labels = labels

    # Load the first few lines to guess the CSV dialect.  Use `with` so the
    # handle is closed promptly (the original `open()` was never closed and
    # relied on garbage collection).
    with open(self.fname) as fin:
        head = ''.join(itertools.islice(fin, 5))
    self.headers = csv.Sniffer().has_header(head)
    self.dialect = csv.Sniffer().sniff(head)
    logger.info("sniffed CSV delimiter=%r, headers=%s" % (self.dialect.delimiter, self.headers))
项目:topical_word_embeddings    作者:thunlp    | 项目源码 | 文件源码
def __init__(self, fname, labels):
    """
    Initialize the corpus from a file.
    `labels` = are class labels present in the input file? => skip the first column

    """
    logger.info("loading corpus from %s" % fname)
    self.fname = fname
    self.length = None  # number of documents; computed lazily elsewhere
    self.labels = labels

    # Load the first few lines to guess the CSV dialect.  Use `with` so the
    # handle is closed promptly (the original `open()` was never closed and
    # relied on garbage collection).
    with open(self.fname) as fin:
        head = ''.join(itertools.islice(fin, 5))
    self.headers = csv.Sniffer().has_header(head)
    self.dialect = csv.Sniffer().sniff(head)
    logger.info("sniffed CSV delimiter=%r, headers=%s" % (self.dialect.delimiter, self.headers))
项目:pysos    作者:dagnelies    | 项目源码 | 文件源码
def csv2sos(path, keys=None, encoding=None, dialect=None):
    """Convert a csv file at *path* into a '<path>.sos' file with one
    index-prefixed JSON object per line.

    :param path: csv file to convert
    :param keys: unused; kept for backward compatibility of the signature
    :param encoding: text encoding of the csv file; auto-detected when falsy
    :param dialect: csv dialect; sniffed from the first MiB when falsy
    """
    if not encoding:
        encoding = detectEncoding(path)
        print('Detected encoding: %s' % encoding)

    # `with` guarantees both handles are closed even when parsing fails
    # (the original leaked both files on any exception).
    with open(path, 'rt', encoding=encoding) as csvfile, \
         open(path + '.sos', 'wt', encoding='utf8') as sosfile:
        if not dialect:
            dialect = csv.Sniffer().sniff(csvfile.read(1024*1024), delimiters=[';','\t',','])
            print('Detected csv dialect: %s' % dialect)

        csvfile.seek(0)
        reader = csv.DictReader(csvfile, dialect=dialect)
        for i, row in enumerate(reader):
            sosfile.write(str(i) + '\t' + json.dumps(row, ensure_ascii=False) + '\n')
            # Progress report every 100k rows (i is 0-based, hence i + 1).
            if (i + 1) % 100000 == 0:
                print("%10d items converted" % (i + 1))
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_delimiters(self):
    sniffer = csv.Sniffer()
    # All three lines of sample3 are identical, so any of its characters
    # could plausibly have been guessed as the delimiter; only membership
    # can be asserted.
    dialect = sniffer.sniff(self.sample3)
    self.assertIn(dialect.delimiter, self.sample3)
    # (sample, restricted delimiter set, expected delimiter, expected quotechar)
    checks = [
        (self.sample3, "?,", "?", None),
        (self.sample3, "/,", "/", None),
        (self.sample4, None, ";", None),
        (self.sample5, None, "\t", None),
        (self.sample6, None, "|", None),
        (self.sample7, None, "|", "'"),
        (self.sample8, None, "+", None),
        (self.sample9, None, "+", "'"),
    ]
    for sample, restrict, want_delim, want_quote in checks:
        dialect = sniffer.sniff(sample, delimiters=restrict)
        self.assertEqual(dialect.delimiter, want_delim)
        if want_quote is not None:
            self.assertEqual(dialect.quotechar, want_quote)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_delimiters(self):
    sniffer = csv.Sniffer()
    # All three lines of sample3 are identical, so any of its characters
    # could plausibly have been guessed as the delimiter; only membership
    # can be asserted.
    dialect = sniffer.sniff(self.sample3)
    self.assertIn(dialect.delimiter, self.sample3)
    # (sample, restricted delimiter set, expected delimiter, expected quotechar)
    checks = [
        (self.sample3, "?,", "?", None),
        (self.sample3, "/,", "/", None),
        (self.sample4, None, ";", None),
        (self.sample5, None, "\t", None),
        (self.sample6, None, "|", None),
        (self.sample7, None, "|", "'"),
        (self.sample8, None, "+", None),
        (self.sample9, None, "+", "'"),
    ]
    for sample, restrict, want_delim, want_quote in checks:
        dialect = sniffer.sniff(sample, delimiters=restrict)
        self.assertEqual(dialect.delimiter, want_delim)
        if want_quote is not None:
            self.assertEqual(dialect.quotechar, want_quote)
项目:My-Web-Server-Framework-With-Python2.7    作者:syjsu    | 项目源码 | 文件源码
def from_csv(fp, field_names = None, **kwargs):
    """Build a PrettyTable from an open csv file object *fp*.

    The csv dialect is sniffed from the first KiB.  If *field_names* is
    given it is used as the header; otherwise the first csv row is.
    Extra keyword arguments are forwarded to the PrettyTable constructor.
    """
    sample = fp.read(1024)
    fp.seek(0)
    reader = csv.reader(fp, csv.Sniffer().sniff(sample))

    table = PrettyTable(**kwargs)
    if field_names:
        table.field_names = field_names
    else:
        # Consume the header row (py3k flag picks the iterator protocol).
        header = next(reader) if py3k else reader.next()
        table.field_names = [cell.strip() for cell in header]

    for record in reader:
        table.add_row([cell.strip() for cell in record])

    return table
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_delimiters(self):
    sniffer = csv.Sniffer()
    # All three lines of sample3 are identical, so any of its characters
    # could plausibly have been guessed as the delimiter; only membership
    # can be asserted.
    dialect = sniffer.sniff(self.sample3)
    self.assertIn(dialect.delimiter, self.sample3)
    # (sample, restricted delimiter set, expected delimiter, expected quotechar)
    checks = [
        (self.sample3, "?,", "?", None),
        (self.sample3, "/,", "/", None),
        (self.sample4, None, ";", None),
        (self.sample5, None, "\t", None),
        (self.sample6, None, "|", None),
        (self.sample7, None, "|", "'"),
        (self.sample8, None, "+", None),
        (self.sample9, None, "+", "'"),
    ]
    for sample, restrict, want_delim, want_quote in checks:
        dialect = sniffer.sniff(sample, delimiters=restrict)
        self.assertEqual(dialect.delimiter, want_delim)
        if want_quote is not None:
            self.assertEqual(dialect.quotechar, want_quote)
项目:PocHunter    作者:DavexPro    | 项目源码 | 文件源码
def from_csv(fp, field_names = None, **kwargs):
    """Build a PrettyTable from an open csv file object *fp*.

    The csv dialect is sniffed from the first KiB.  If *field_names* is
    given it is used as the header; otherwise the first csv row is.
    Extra keyword arguments are forwarded to the PrettyTable constructor.
    """
    sample = fp.read(1024)
    fp.seek(0)
    reader = csv.reader(fp, csv.Sniffer().sniff(sample))

    table = PrettyTable(**kwargs)
    if field_names:
        table.field_names = field_names
    else:
        # Consume the header row (py3k flag picks the iterator protocol).
        header = next(reader) if py3k else reader.next()
        table.field_names = [cell.strip() for cell in header]

    for record in reader:
        table.add_row([cell.strip() for cell in record])

    return table
项目:dfViewer    作者:sterry24    | 项目源码 | 文件源码
def parseDelimiter(self, f):
    """Return the delimiter character sniffed from the first line of file *f*.

    :param f: path of the text file to inspect
    :raises csv.Error: if the sniffer cannot determine a delimiter
    """
    # Read only the first line instead of loading the whole file into
    # memory (the original read every line just to use lines[0]), and use
    # `with` so the handle is always closed.
    with open(f) as infile:
        first_line = infile.readline()
    dialect = csv.Sniffer().sniff(first_line)
    return dialect.delimiter



###############################################################################
# The following are GUI shortcut tools
###############################################################################
    ## Create an action for GUIs
项目:flotilla-easy    作者:archieroques    | 项目源码 | 文件源码
def from_csv(fp, field_names = None, **kwargs):
    """Build a PrettyTable from an open csv file object *fp*.

    The csv dialect is sniffed from the first KiB.  If *field_names* is
    given it is used as the header; otherwise the first csv row is.
    Extra keyword arguments are forwarded to the PrettyTable constructor.
    """
    sample = fp.read(1024)
    fp.seek(0)
    reader = csv.reader(fp, csv.Sniffer().sniff(sample))

    table = PrettyTable(**kwargs)
    if field_names:
        table.field_names = field_names
    else:
        # Consume the header row (py3k flag picks the iterator protocol).
        header = next(reader) if py3k else reader.next()
        table.field_names = [cell.strip() for cell in header]

    for record in reader:
        table.add_row([cell.strip() for cell in record])

    return table
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test_delimiters(self):
    sniffer = csv.Sniffer()
    # All three lines of sample3 are identical, so any of its characters
    # could plausibly have been guessed as the delimiter; only membership
    # can be asserted.
    dialect = sniffer.sniff(self.sample3)
    self.assertIn(dialect.delimiter, self.sample3)
    # (sample, restricted delimiter set, expected delimiter, expected quotechar)
    checks = [
        (self.sample3, "?,", "?", None),
        (self.sample3, "/,", "/", None),
        (self.sample4, None, ";", None),
        (self.sample5, None, "\t", None),
        (self.sample6, None, "|", None),
        (self.sample7, None, "|", "'"),
        (self.sample8, None, "+", None),
        (self.sample9, None, "+", "'"),
    ]
    for sample, restrict, want_delim, want_quote in checks:
        dialect = sniffer.sniff(sample, delimiters=restrict)
        self.assertEqual(dialect.delimiter, want_delim)
        if want_quote is not None:
            self.assertEqual(dialect.quotechar, want_quote)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_delimiters(self):
    sniffer = csv.Sniffer()
    # All three lines of sample3 are identical, so any of its characters
    # could plausibly have been guessed as the delimiter; only membership
    # can be asserted.
    dialect = sniffer.sniff(self.sample3)
    self.assertIn(dialect.delimiter, self.sample3)
    # (sample, restricted delimiter set, expected delimiter, expected quotechar)
    checks = [
        (self.sample3, "?,", "?", None),
        (self.sample3, "/,", "/", None),
        (self.sample4, None, ";", None),
        (self.sample5, None, "\t", None),
        (self.sample6, None, "|", None),
        (self.sample7, None, "|", "'"),
        (self.sample8, None, "+", None),
        (self.sample9, None, "+", "'"),
    ]
    for sample, restrict, want_delim, want_quote in checks:
        dialect = sniffer.sniff(sample, delimiters=restrict)
        self.assertEqual(dialect.delimiter, want_delim)
        if want_quote is not None:
            self.assertEqual(dialect.quotechar, want_quote)
项目:CVProject    作者:hieuxinhe94    | 项目源码 | 文件源码
def from_csv(fp, field_names = None, **kwargs):
    """Build a PrettyTable from an open csv file object *fp*.

    The csv dialect is sniffed from the first KiB.  If *field_names* is
    given it is used as the header; otherwise the first csv row is.
    Extra keyword arguments are forwarded to the PrettyTable constructor.
    """
    sample = fp.read(1024)
    fp.seek(0)
    reader = csv.reader(fp, csv.Sniffer().sniff(sample))

    table = PrettyTable(**kwargs)
    if field_names:
        table.field_names = field_names
    else:
        # Consume the header row (py3k flag picks the iterator protocol).
        header = next(reader) if py3k else reader.next()
        table.field_names = [cell.strip() for cell in header]

    for record in reader:
        table.add_row([cell.strip() for cell in record])

    return table
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def test_delimiters(self):
    sniffer = csv.Sniffer()
    # All three lines of sample3 are identical, so any of its characters
    # could plausibly have been guessed as the delimiter; only membership
    # can be asserted.
    dialect = sniffer.sniff(self.sample3)
    self.assertIn(dialect.delimiter, self.sample3)
    # (sample, restricted delimiter set, expected delimiter, expected quotechar)
    checks = [
        (self.sample3, "?,", "?", None),
        (self.sample3, "/,", "/", None),
        (self.sample4, None, ";", None),
        (self.sample5, None, "\t", None),
        (self.sample6, None, "|", None),
        (self.sample7, None, "|", "'"),
    ]
    for sample, restrict, want_delim, want_quote in checks:
        dialect = sniffer.sniff(sample, delimiters=restrict)
        self.assertEqual(dialect.delimiter, want_delim)
        if want_quote is not None:
            self.assertEqual(dialect.quotechar, want_quote)
项目:lalascan    作者:blackye    | 项目源码 | 文件源码
def from_csv(fp, field_names = None, **kwargs):
    """Build a PrettyTable from an open csv file object *fp*.

    The csv dialect is sniffed from the first KiB.  If *field_names* is
    given it is used as the header; otherwise the first csv row is.
    Extra keyword arguments are forwarded to the PrettyTable constructor.
    """
    sample = fp.read(1024)
    fp.seek(0)
    reader = csv.reader(fp, csv.Sniffer().sniff(sample))

    table = PrettyTable(**kwargs)
    if field_names:
        table.field_names = field_names
    else:
        # Consume the header row (py3k flag picks the iterator protocol).
        header = next(reader) if py3k else reader.next()
        table.field_names = [cell.strip() for cell in header]

    for record in reader:
        table.add_row([cell.strip() for cell in record])

    return table
项目:minihydra    作者:VillanCh    | 项目源码 | 文件源码
def from_csv(fp, field_names = None, **kwargs):
    """Build a PrettyTable from an open csv file object *fp*.

    The csv dialect is sniffed from the first KiB.  If *field_names* is
    given it is used as the header; otherwise the first csv row is.
    Extra keyword arguments are forwarded to the PrettyTable constructor.
    """
    sample = fp.read(1024)
    fp.seek(0)
    reader = csv.reader(fp, csv.Sniffer().sniff(sample))

    table = PrettyTable(**kwargs)
    if field_names:
        table.field_names = field_names
    else:
        # Consume the header row (py3k flag picks the iterator protocol).
        header = next(reader) if py3k else reader.next()
        table.field_names = [cell.strip() for cell in header]

    for record in reader:
        table.add_row([cell.strip() for cell in record])

    return table
项目:func_stats    作者:rfyiamcool    | 项目源码 | 文件源码
def from_csv(fp, field_names = None, **kwargs):
    """Build a PrettyTable from an open csv file object *fp*.

    The csv dialect is sniffed from the first KiB.  If *field_names* is
    given it is used as the header; otherwise the first csv row is.
    Extra keyword arguments are forwarded to the PrettyTable constructor.
    """
    sample = fp.read(1024)
    fp.seek(0)
    reader = csv.reader(fp, csv.Sniffer().sniff(sample))

    table = PrettyTable(**kwargs)
    if field_names:
        table.field_names = field_names
    else:
        # Consume the header row (py3k flag picks the iterator protocol).
        header = next(reader) if py3k else reader.next()
        table.field_names = [cell.strip() for cell in header]

    for record in reader:
        table.add_row([cell.strip() for cell in record])

    return table
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_delimiters(self):
    sniffer = csv.Sniffer()
    # All three lines of sample3 are identical, so any of its characters
    # could plausibly have been guessed as the delimiter; only membership
    # can be asserted.
    dialect = sniffer.sniff(self.sample3)
    self.assertIn(dialect.delimiter, self.sample3)
    # (sample, restricted delimiter set, expected delimiter, expected quotechar)
    checks = [
        (self.sample3, "?,", "?", None),
        (self.sample3, "/,", "/", None),
        (self.sample4, None, ";", None),
        (self.sample5, None, "\t", None),
        (self.sample6, None, "|", None),
        (self.sample7, None, "|", "'"),
        (self.sample8, None, "+", None),
        (self.sample9, None, "+", "'"),
    ]
    for sample, restrict, want_delim, want_quote in checks:
        dialect = sniffer.sniff(sample, delimiters=restrict)
        self.assertEqual(dialect.delimiter, want_delim)
        if want_quote is not None:
            self.assertEqual(dialect.quotechar, want_quote)
项目:catchments    作者:Luqqk    | 项目源码 | 文件源码
def load_input_data(points):
    """Creates DictReader from *.csv file.

    :param points (file object):
        *.csv file with
        'lon' (required),
        'lat' (required),
        'name' (optional) columns.

    Returns:
        data (csv.DictReader)
    """
    # Sniff the dialect from the whole file, then rewind for the reader.
    sniffed = csv.Sniffer().sniff(points.read())
    points.seek(0)
    return csv.DictReader(points, dialect=sniffed)
项目:nlpSentiment    作者:ClimbsRocks    | 项目源码 | 文件源码
def loadDataset(fileName):
    """Read every row of a csv file into a list.

    The csv dialect is sniffed from the first KiB; if sniffing fails we
    default to the Microsoft Excel dialect.

    :param fileName: path of the csv file
    :return: list of rows, each row a list of strings
    """
    # newline='' is the documented way to hand a text file to the csv module;
    # it replaces the original 'rU' mode, which was removed in Python 3.11.
    # `with` also closes the handle, which the original never did.
    with open(fileName, 'r', newline='') as trainingInput:
        # detect the "dialect" of this type of csv file
        try:
            dialect = csv.Sniffer().sniff(trainingInput.read(1024))
        except csv.Error:
            # if we fail to detect the dialect, default to Microsoft Excel
            # (narrowed from a bare `except:`, which also hid KeyboardInterrupt)
            dialect = 'excel'
        trainingInput.seek(0)
        # csv only gives us an iterable, not the data itself
        return [row for row in csv.reader(trainingInput, dialect)]
项目:Snakepit    作者:K4lium    | 项目源码 | 文件源码
def from_csv(fp, field_names = None, **kwargs):
    """Build a PrettyTable from an open csv file object *fp*.

    The csv dialect is sniffed from the first KiB.  If *field_names* is
    given it is used as the header; otherwise the first csv row is.
    Extra keyword arguments are forwarded to the PrettyTable constructor.
    """
    sample = fp.read(1024)
    fp.seek(0)
    reader = csv.reader(fp, csv.Sniffer().sniff(sample))

    table = PrettyTable(**kwargs)
    if field_names:
        table.field_names = field_names
    else:
        # Consume the header row (py3k flag picks the iterator protocol).
        header = next(reader) if py3k else reader.next()
        table.field_names = [cell.strip() for cell in header]

    for record in reader:
        table.add_row([cell.strip() for cell in record])

    return table
项目:LensCalibrator    作者:1024jp    | 项目源码 | 文件源码
def process_coordinates(self, processor_handler, output):
    """Run every coordinate row of self.datafile through *processor_handler*.

    Rows whose self.in_cols cells are not both numeric (e.g. the header)
    are written to *output* unchanged; for numeric rows the translated
    coordinates are written back, truncated to int, into self.out_cols.

    :param processor_handler: callable (x, y) -> (x', y')
    :param output: writable file object receiving the transformed csv
    """
    in_cols = self.in_cols
    out_cols = self.out_cols

    with open(self.datafile.name) as file_in:
        # detect delimiter (restricted to comma or tab)
        dialect = csv.Sniffer().sniff(file_in.read(2048), delimiters=',\t')
        file_in.seek(0)

        reader = csv.reader(file_in, dialect)
        writer = csv.writer(output, dialect)

        for row in reader:
            new_row = row[:]  # copy

            try:
                x = float(row[in_cols[0]])
                y = float(row[in_cols[1]])
            except (ValueError, IndexError, TypeError):
                # Not a numeric data row: pass it through untouched and go to
                # the next row.  (Narrowed from a bare `except:`, which would
                # also have swallowed KeyboardInterrupt/SystemExit.)
                writer.writerow(new_row)
                continue

            # translate
            x, y = processor_handler(x, y)

            new_row[out_cols[0]] = int(x)
            new_row[out_cols[1]] = int(y)

            writer.writerow(new_row)
项目:code    作者:ActiveState    | 项目源码 | 文件源码
def generate_rows(f):
    """Yield parsed csv rows from *f*, sniffing the dialect from line one."""
    dialect = csv.Sniffer().sniff(f.readline())
    f.seek(0)  # rewind so the first line is parsed as data too
    for row in csv.reader(f, dialect):
        yield row
项目:mobot    作者:JokerQyou    | 项目源码 | 文件源码
def load_data(year):
    '''
    Load data into memory cache
    '''
    year = str(year)
    if year in CACHE:
        return True  # already loaded

    data_file = os.path.join(
        os.path.dirname(__file__), 'data', '{}.csv'.format(year)
    )
    if not os.path.isfile(data_file):
        return False  # no data shipped for that year

    CACHE[year] = {}
    with io.open(data_file, encoding='utf-8') as rf:
        # Skip the first row only when the sniffer says it is a header.
        header_present = csv.Sniffer().has_header(rf.read(1024))
        rf.seek(0)

        reader = csv.DictReader(rf, DATA_FIELDS)
        if header_present:
            next(reader)

        for raw in reader:
            entry = clean_up_dict(raw)
            # Pre-parse the date into int fields so lookups never re-parse.
            parsed = datetime.strptime(entry['date'], '%Y-%m-%d')
            entry['year'] = parsed.year
            entry['month'] = parsed.month
            entry['day'] = parsed.day
            entry['isholiday'] = bool(int(entry['isholiday']))
            entry['isworkday'] = bool(int(entry['isworkday']))
            CACHE[year][entry.pop('date')] = entry

    return True
项目:pheweb    作者:statgen    | 项目源码 | 文件源码
def _import_phenolist_csv(f, has_header):
    # Note: If a csv (1) contains commas in quoted cells and (2) doesn't have any line that starts with a quoted cell,
    #       then sometimes this makes very bad choices.
    #       In particular, if all lines have the same number of some other character (even a letter), that character might become the delimeter.
    try:
        dialect = csv.Sniffer().sniff(f.read(4096))
    except Exception as exc:
        raise PheWebError("Sniffing csv format failed.  Check that your csv file is well-formed.  If it is, try delimiting with tabs or semicolons.") from exc
    if dialect.delimiter in string.ascii_letters or dialect.delimiter in string.digits:
        raise PheWebError("Our csv sniffer decided that {!r} looks like the most likely delimiter in your csv file, but that's crazy.")
    f.seek(0)
    try:
        rows = list(csv.reader(f, dialect))
    except ValueError:
        return None
    num_cols = len(rows[0])
    if has_header:
        fieldnames, rows = rows[0], rows[1:]
        if any(fieldname is None or fieldname == '' for fieldname in fieldnames):
            if has_header == 'augment':
                fieldnames = [i if fieldname is None else fieldname for i, fieldname in enumerate(fieldnames)]
            else:
                raise PheWebError('bad csv header')
        assert len(set(fieldnames)) == len(fieldnames)
    else:
        fieldnames = list(range(num_cols))
    return [{fieldnames[i]: row[i] for i in range(num_cols)} for row in rows]
项目:esys-pbi    作者:fsxfreak    | 项目源码 | 文件源码
def read_key_value_file(csvfile):
    """Parse an open CSV file into a plain dict.

    If the first line looks like a ``key``/``value`` header row it is
    consumed; otherwise the file is rewound so that line is treated as data.
    Later occurrences of a key overwrite earlier ones.

    Args:
        csvfile (FILE): Readable file

    Returns:
        DICT: Dictionary containing file content
    """
    header_line = csvfile.readline()
    # Rewind unless the first line is a key/value header row.
    if not ('key' in header_line and 'value' in header_line):
        csvfile.seek(0)
    # Sniff the delimiter (comma or tab) from the first line only.
    dialect = csv.Sniffer().sniff(header_line, delimiters=',\t')
    return {record[0]: record[1] for record in csv.reader(csvfile, dialect)}
项目:centos-base-consul    作者:zeroc0d3lab    | 项目源码 | 文件源码
def csv_col_current(pl, segment_info, display_name='auto', name_format=' ({column_name:.15})'):
    '''Display CSV column number and column name

    Requires filetype to be set to ``csv``.

    :param bool or str display_name:
        May be ``True``, ``False`` and ``"auto"``. In the first case the value 
        from the first row will always be displayed. In the second case it will 
        never be displayed. In the last case ``csv.Sniffer().has_header()`` will 
        be used to detect whether the current file contains a header row.
    :param str name_format:
        String used to format column name (in case ``display_name`` is set to 
        ``True`` or ``"auto"``). Accepts ``column_name`` keyword argument.

    Highlight groups used: ``csv:column_number`` or ``csv``, ``csv:column_name`` or ``csv``.
    '''
    # Only meaningful inside buffers whose filetype is csv.
    if vim_getbufoption(segment_info, 'filetype') != 'csv':
        return None
    cursor_line, cursor_col = segment_info['window'].cursor
    col_number, col_name = process_csv_buffer(
        pl, segment_info['buffer'], cursor_line, cursor_col, display_name)
    if not col_number:
        return None
    segments = [{
        'contents': col_number,
        'highlight_groups': ['csv:column_number', 'csv'],
    }]
    # Append the (possibly truncated) column name segment only when known.
    if col_name:
        segments.append({
            'contents': name_format.format(column_name=col_name),
            'highlight_groups': ['csv:column_name', 'csv'],
        })
    return segments
项目:mattersend    作者:mtorromeo    | 项目源码 | 文件源码
def attach_file(self, filename, text=None, tabular=False, syntax='auto', fileinfo=False):
        """Attach a file's content to the message.

        :param filename: path to the file; its basename becomes the title.
        :param text: pre-read content; if ``None`` the file is read and
            decoded as UTF-8.
        :param tabular: falsy for plain text, ``'sniff'`` to auto-detect the
            csv dialect, or a dialect to render the content as a markdown table.
        :param syntax: ``'auto'`` to guess highlighting from name/mime,
            ``None`` to disable; ignored when *tabular* is set.
        :param fileinfo: if true, add size/mime fields to the attachment.
        :returns: the created Attachment (already appended to self.attachments).
        """
        attachment = Attachment()

        if tabular:
            # Tabular content becomes a markdown table, never a code fence.
            syntax = None

        (mime, _) = mimetypes.guess_type(filename)
        attachment.title = os.path.basename(filename)

        if text is None:
            # Bug fix: the old mode 'rUb' is invalid in Python 3 (the 'U'
            # flag was deprecated and removed in 3.11, and cannot be combined
            # with 'b').  We decode UTF-8 ourselves, so plain binary is right.
            with open(filename, 'rb') as f:
                text = f.read().decode('utf-8')

        if tabular:
            csvfile = StringIO(text.strip())

            if tabular == 'sniff':
                dialect = csv.Sniffer().sniff(text)
            else:
                dialect = tabular

            text = md_table(csv.reader(csvfile, dialect))

        elif syntax == 'auto':
            syntax = detect_syntax(attachment.title, mime)

        if syntax is not None:
            text = md_code(text, syntax)

        attachment.text = text

        if fileinfo:
            statinfo = os.stat(filename)
            attachment.add_field('Size', sizeof_fmt(statinfo.st_size), True)
            attachment.add_field('Mime', mime, True)

        self.attachments.append(attachment)
        return attachment
项目:mattersend    作者:mtorromeo    | 项目源码 | 文件源码
def send(channel, message='', filename=False, url=None, username=None,
         icon=None, syntax='auto', tabular=False, fileinfo=False,
         just_return=False, config_section='DEFAULT',
         config_name='mattersend', config_file=None):
    """Compose a Mattermost message and deliver it (or return its payload).

    Either attaches *filename* or sends *message* as the body; tabular
    content is rendered as a markdown table, otherwise optional syntax
    highlighting is applied.  When *just_return* is true the rendered POST
    payload is returned as a string instead of being sent.
    """
    msg = Message(channel, url, username, icon, config_section,
                  config_name, config_file)

    if filename:
        # 'none' means "no highlighting" for file attachments.
        msg.attach_file(filename, None, tabular,
                        None if syntax == 'none' else syntax, fileinfo)
    else:
        if tabular:
            # Tabular content becomes a markdown table, never a code fence.
            syntax = None
            dialect = (csv.Sniffer().sniff(message) if tabular == 'sniff'
                       else tabular)
            message = md_table(csv.reader(StringIO(message.strip()), dialect))
        elif syntax in ('auto', 'none'):
            syntax = None

        if syntax is not None:
            message = md_code(message, syntax)

    msg.text = message

    if just_return:
        return "POST {}\n{}".format(msg.url, msg.get_payload())

    msg.send()
项目:bedrock-core    作者:Bedrock-py    | 项目源码 | 文件源码
def loadMatrix(filepath):
    """
    use pandas to load the csv file into the dataframe,
    using a header if appropriate
    """
    # Bug fix: open in text mode.  The old 'rbU' mode is invalid in Python 3
    # (the 'U' flag was removed in 3.11), and csv.Sniffer().sniff() requires
    # a str sample, not bytes.  newline='' is the documented way to open
    # files for the csv module.
    with open(filepath, 'r', newline='') as csvfile:
        snippet = csvfile.read(2048)
        sniffer = csv.Sniffer()
        dialect = sniffer.sniff(snippet)
    if sniffer.has_header(snippet):
        df = pd.read_csv(filepath, dialect=dialect)
    else:
        # No header row detected: let pandas number the columns.
        df = pd.read_csv(filepath, dialect=dialect, header=None)

    return df
项目:encore    作者:statgen    | 项目源码 | 文件源码
def sniff_file(csvfile):
    """Guess the csv dialect of ``csvfile`` from its first 50 non-comment lines.

    :param csvfile: iterable of lines (comments are removed by strip_comments).
    :returns: the sniffed dialect, or ``None`` when no delimiter among
        tab/pipe/comma/space can be determined.
    """
    # Sample at most 50 (comment-stripped) lines so large files stay cheap.
    chunk = "\n".join([x for _, x in zip(range(50), strip_comments(csvfile))])
    try:
        return csv.Sniffer().sniff(chunk, "\t|, ")
    except csv.Error:
        # Narrowed from a bare `except:` -- sniff() signals failure to find a
        # delimiter by raising csv.Error; anything else should propagate.
        return None
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_has_header(self):
        # The bare data sample must not be detected as having a header;
        # prepending the header row must flip the detection.
        sniffer = csv.Sniffer()
        self.assertFalse(sniffer.has_header(self.sample1))
        self.assertTrue(sniffer.has_header(self.header + self.sample1))
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_doublequote(self):
        # The header sample must sniff as not doublequoted, while sample2
        # must sniff as doublequoted.
        sniffer = csv.Sniffer()
        self.assertEqual(sniffer.sniff(self.header).doublequote, False)
        self.assertEqual(sniffer.sniff(self.sample2).doublequote, True)
项目:MENGEL    作者:CodeSpaceHQ    | 项目源码 | 文件源码
def get_delimiter(path):
    """Return the delimiter (';' or ',') used by the csv file at *path*.

    Bug fix: the file must be opened in text mode -- csv.Sniffer().sniff()
    requires a str sample, so the old 'rb' open handed it bytes and raised a
    TypeError on Python 3.  newline='' is the documented open mode for the
    csv module.
    """
    with open(path, 'r', newline='') as csvfile:
        return csv.Sniffer().sniff(csvfile.read(), delimiters=';,').delimiter


# Gets the ratio of missing values to existing values in a dataframe. Either operates on rows or columns, depending
# on input.
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_has_header(self):
        # The bare data sample must not look like it has a header; with
        # header1 prepended it must.
        sniffer = csv.Sniffer()
        self.assertFalse(sniffer.has_header(self.sample1))
        self.assertTrue(sniffer.has_header(self.header1 + self.sample1))