The following 48 code examples, extracted from open-source Python projects, illustrate how to use csv.Sniffer().
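Before the project code, here is a minimal standalone sketch (my own, not taken from any project below) of the two Sniffer methods these examples rely on: sniff(), which guesses a csv.Dialect from a text sample, and has_header(), which guesses whether the first row is a header.

import csv

sample = "name,age\nalice,30\nbob,25\n"
sniffer = csv.Sniffer()

dialect = sniffer.sniff(sample)      # guess delimiter, quotechar, etc.
print(dialect.delimiter)             # ','
print(sniffer.has_header(sample))    # True: 'age' is text, the values below it are numeric
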
def append_by_csvs(self, market_situations_path, buy_offer_path, csv_merchant_id=None):
    with open(market_situations_path, 'r') as csvfile:
        has_header = csv.Sniffer().has_header(csvfile.read(16384))
        csvfile.seek(0)
        if has_header:
            situation_data = csv.DictReader(csvfile)
        else:
            situation_data = csv.DictReader(csvfile, fieldnames=get_market_situation_fieldnames())
        for line in situation_data:
            self.append_marketplace_situations(line, csv_merchant_id)
    self.update_timestamps()
    with open(buy_offer_path, 'r') as csvfile:
        has_header = csv.Sniffer().has_header(csvfile.read(16384))
        csvfile.seek(0)
        if has_header:
            buy_offer_data = csv.DictReader(csvfile)
        else:
            buy_offer_data = csv.DictReader(csvfile, fieldnames=get_buy_offer_fieldnames())
        for line in buy_offer_data:
            self.append_sales(line)
    self.print_info()

def get_csv_reader(input):
    # csv package does not support unicode
    input = str(input)
    # Special case: detect single-column files.
    # This check assumes that our only valid delimiters are commas and tabs.
    firstLine = input.split('\n')[0]
    if not ('\t' in firstLine or ',' in firstLine) \
            or len(input.splitlines()) == 1:
        dialect = 'excel'
    else:
        # Take a data sample to determine dialect, but
        # don't include incomplete last line
        sample = ''
        sampleSize = 0
        while len(sample) == 0:
            sampleSize += 5000
            sample = '\n'.join(input[:sampleSize].splitlines()[:-1])
        dialect = csv.Sniffer().sniff(sample)
        dialect.skipinitialspace = True
    return csv.DictReader(input.splitlines(), dialect=dialect)

def import_phenolist(filepath, has_header):
    # Return a list-of-dicts with the original column names, or integers if none.
    # It'd be great to use pandas for this.
    if not os.path.exists(filepath):
        raise PheWebError("ERROR: unable to import {!r} because it doesn't exist".format(filepath))
    # 1. try openpyxl.
    phenos = _import_phenolist_xlsx(filepath, has_header)
    if phenos is not None:
        return phenos
    with read_maybe_gzip(filepath) as f:
        # 2. try json.load(f)
        try:
            return json.load(f)
        except ValueError:
            if filepath.endswith('.json'):
                raise PheWebError("The filepath {!r} ends with '.json' but reading it as json failed.".format(filepath))
        # 3. try csv.reader() with csv.Sniffer().sniff()
        f.seek(0)
        phenos = _import_phenolist_csv(f, has_header)
        if phenos is not None:
            return phenos
    raise PheWebError("I couldn't figure out how to open the file {!r}, sorry.".format(filepath))

def test_write_feed(product_in_stock, monkeypatch):
    buffer = StringIO()
    write_feed(buffer)
    buffer.seek(0)
    dialect = csv.Sniffer().sniff(buffer.getvalue())
    assert dialect.delimiter == csv.excel_tab.delimiter
    assert dialect.quotechar == csv.excel_tab.quotechar
    assert dialect.escapechar == csv.excel_tab.escapechar
    assert csv.Sniffer().has_header(buffer.getvalue())
    lines = [line for line in csv.reader(buffer, dialect=csv.excel_tab)]
    assert len(lines) == 2
    header = lines[0]
    google_required_fields = ['id', 'title', 'link', 'image_link',
                              'availability', 'price', 'condition']
    for field in google_required_fields:
        assert field in header

def __init__(self, fname, labels):
    """
    Initialize the corpus from a file.

    `labels` = are class labels present in the input file? => skip the first column
    """
    logger.info("loading corpus from %s" % fname)
    self.fname = fname
    self.length = None
    self.labels = labels

    # load the first few lines, to guess the CSV dialect
    head = ''.join(itertools.islice(open(self.fname), 5))
    self.headers = csv.Sniffer().has_header(head)
    self.dialect = csv.Sniffer().sniff(head)
    logger.info("sniffed CSV delimiter=%r, headers=%s" % (self.dialect.delimiter, self.headers))

def csvfile_to_wb(csv_filename):
    '''Open a CSV file and return an openpyxl workbook.'''
    logger.log(
        DEBUG_DETAILED,
        'Converting CSV file {} into an XLSX workbook.'.format(csv_filename))
    with open(csv_filename) as csv_file:
        dialect = csv.Sniffer().sniff(csv_file.read())
        if USING_PYTHON2:
            for attr in dir(dialect):
                a = getattr(dialect, attr)
                if type(a) == unicode:
                    setattr(dialect, attr, bytes(a))
        csv_file.seek(0)
        reader = csv.reader(csv_file, dialect)
        wb = pyxl.Workbook()
        ws = wb.active
        for row_index, row in enumerate(reader, 1):
            for column_index, cell in enumerate(row, 1):
                if cell not in ('', None):
                    ws.cell(row=row_index, column=column_index).value = cell
    return (wb, dialect)

def addfromcsv(self):
    if os.environ.get("REDIS_URL"):
        redis_url = os.environ.get("REDIS_URL")
    else:
        redis_url = "localhost"
    r_server = redis.from_url(redis_url)
    with open('mapofinnovation/public/spaces_ready_for_merge.csv', 'rb') as csv_file:
        dialect = csv.Sniffer().sniff(csv_file.read(), delimiters=',')
        csv_file.seek(0)
        csv_reader = csv.DictReader(csv_file, dialect=dialect)
        for row in csv_reader:
            key = row['name'] + str(datetime.now())
            row.update({'archived': False})
            row.update({'verified': True})
            r_server.hmset(re.sub(' ', '', key), row)
    return {'success': 'true'}

def from_csv(fp, field_names=None, **kwargs):
    dialect = csv.Sniffer().sniff(fp.read(1024))
    fp.seek(0)
    reader = csv.reader(fp, dialect)
    table = PrettyTable(**kwargs)
    if field_names:
        table.field_names = field_names
    else:
        if py3k:
            table.field_names = [x.strip() for x in next(reader)]
        else:
            table.field_names = [x.strip() for x in reader.next()]
    for row in reader:
        table.add_row([x.strip() for x in row])
    return table

def Open(self):
    try:
        if not os.path.isfile(self.v_filename):
            raise Spartacus.Utils.Exception('File {0} does not exist or is not a file.'.format(self.v_filename))
        if self.v_extension == 'csv':
            self.v_file = open(self.v_filename, encoding=self.v_encoding)
            v_sample = self.v_file.read(1024)
            self.v_file.seek(0)
            v_sniffer = csv.Sniffer()
            if not v_sniffer.has_header(v_sample):
                raise Spartacus.Utils.Exception('CSV file {0} does not have a header.'.format(self.v_filename))
            v_dialect = v_sniffer.sniff(v_sample)
            self.v_object = csv.DictReader(self.v_file, self.v_header, None, None, v_dialect)
            self.v_open = True
        elif self.v_extension == 'xlsx':
            self.v_object = openpyxl.load_workbook(self.v_filename, read_only=True)
            self.v_open = True
        else:
            raise Spartacus.Utils.Exception('File extension "{0}" not supported.'.format(self.v_extension))
    except Spartacus.Utils.Exception as exc:
        raise exc
    except Exception as exc:
        raise Spartacus.Utils.Exception(str(exc))

def restoreSheet(sheetName, filepath, csvfile, overwrite=None):
    # Restore sheet from backup CSV file
    try:
        ##dialect = csv.Sniffer().sniff(csvfile.read(1024))
        ##csvfile.seek(0)
        reader = csv.reader(csvfile, delimiter=',')  # Ignore dialect for now
        rows = [row for row in reader]
        if not rows:
            raise Exception('No rows in CSV file %s for sheet %s' % (filepath, sheetName))
        sdproxy.importSheet(sheetName, rows[0], rows[1:], overwrite=overwrite)
        return ''
    except Exception, excp:
        if Options['debug']:
            import traceback
            traceback.print_exc()
        return 'Error in restoreSheet: ' + str(excp)

def test_sniff(self):
    sniffer = csv.Sniffer()
    dialect = sniffer.sniff(self.sample1)
    self.assertEqual(dialect.delimiter, ",")
    self.assertEqual(dialect.quotechar, '"')
    self.assertEqual(dialect.skipinitialspace, True)

    dialect = sniffer.sniff(self.sample2)
    self.assertEqual(dialect.delimiter, ":")
    self.assertEqual(dialect.quotechar, "'")
    self.assertEqual(dialect.skipinitialspace, False)

def test_delimiters(self):
    sniffer = csv.Sniffer()
    dialect = sniffer.sniff(self.sample3)
    # given that all three lines in sample3 are equal,
    # I think that any character could have been 'guessed' as the
    # delimiter, depending on dictionary order
    self.assertIn(dialect.delimiter, self.sample3)
    dialect = sniffer.sniff(self.sample3, delimiters="?,")
    self.assertEqual(dialect.delimiter, "?")
    dialect = sniffer.sniff(self.sample3, delimiters="/,")
    self.assertEqual(dialect.delimiter, "/")
    dialect = sniffer.sniff(self.sample4)
    self.assertEqual(dialect.delimiter, ";")
    dialect = sniffer.sniff(self.sample5)
    self.assertEqual(dialect.delimiter, "\t")
    dialect = sniffer.sniff(self.sample6)
    self.assertEqual(dialect.delimiter, "|")
    dialect = sniffer.sniff(self.sample7)
    self.assertEqual(dialect.delimiter, "|")
    self.assertEqual(dialect.quotechar, "'")

def csv2sos(path, keys=None, encoding=None, dialect=None):
    if not encoding:
        encoding = detectEncoding(path)
        print('Detected encoding: %s' % encoding)
    csvfile = open(path, 'rt', encoding=encoding)
    sosfile = open(path + '.sos', 'wt', encoding='utf8')
    if not dialect:
        dialect = csv.Sniffer().sniff(csvfile.read(1024*1024), delimiters=[';', '\t', ','])
        print('Detected csv dialect: %s' % dialect)
        csvfile.seek(0)
    reader = csv.DictReader(csvfile, dialect=dialect)
    i = 0
    for row in reader:
        sosfile.write(str(i) + '\t' + json.dumps(row, ensure_ascii=False) + '\n')
        i += 1
        if i % 100000 == 0:
            print("%10d items converted" % i)
    csvfile.close()
    sosfile.close()

def test_delimiters(self):
    sniffer = csv.Sniffer()
    dialect = sniffer.sniff(self.sample3)
    # given that all three lines in sample3 are equal,
    # I think that any character could have been 'guessed' as the
    # delimiter, depending on dictionary order
    self.assertIn(dialect.delimiter, self.sample3)
    dialect = sniffer.sniff(self.sample3, delimiters="?,")
    self.assertEqual(dialect.delimiter, "?")
    dialect = sniffer.sniff(self.sample3, delimiters="/,")
    self.assertEqual(dialect.delimiter, "/")
    dialect = sniffer.sniff(self.sample4)
    self.assertEqual(dialect.delimiter, ";")
    dialect = sniffer.sniff(self.sample5)
    self.assertEqual(dialect.delimiter, "\t")
    dialect = sniffer.sniff(self.sample6)
    self.assertEqual(dialect.delimiter, "|")
    dialect = sniffer.sniff(self.sample7)
    self.assertEqual(dialect.delimiter, "|")
    self.assertEqual(dialect.quotechar, "'")
    dialect = sniffer.sniff(self.sample8)
    self.assertEqual(dialect.delimiter, '+')
    dialect = sniffer.sniff(self.sample9)
    self.assertEqual(dialect.delimiter, '+')
    self.assertEqual(dialect.quotechar, "'")

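A note on the `delimiters` argument exercised in the two tests above: it restricts the set of candidate characters the sniffer will consider. A minimal standalone sketch with made-up data (the `sample` string is mine, not from the test suite):

import csv

sample = "a?b?c\n1?2?3\n"                              # hypothetical '?'-separated data
dialect = csv.Sniffer().sniff(sample, delimiters="?,")
print(dialect.delimiter)                               # '?'
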
def parseDelimiter(self, f):
    infile = open(f)
    lines = infile.readlines()
    infile.close()
    sniffer = csv.Sniffer()
    text = sniffer.sniff(lines[0])
    return text.delimiter

def load_input_data(points):
    """Creates DictReader from *.csv file.

    :param points (file object): *.csv file with 'lon' (required),
        'lat' (required), 'name' (optional) columns.

    Returns:
        data (csv.DictReader)
    """
    dialect = csv.Sniffer().sniff(points.read())
    points.seek(0)
    data = csv.DictReader(points, dialect=dialect)
    return data

def loadDataset(fileName):
    with open(fileName, 'rU') as trainingInput:
        # detect the "dialect" of this type of csv file
        try:
            dialect = csv.Sniffer().sniff(trainingInput.read(1024))
        except:
            # if we fail to detect the dialect, default to Microsoft Excel
            dialect = 'excel'
        trainingInput.seek(0)
        trainingRows = csv.reader(trainingInput, dialect)
        allTweets = []
        allTweetSentiments = []
        entireDataset = []
        for row in trainingRows:
            # csv only gives us an iterable, not the data itself
            entireDataset.append(row)
    return entireDataset

def process_coordinates(self, processor_handler, output):
    in_cols = self.in_cols
    out_cols = self.out_cols
    with open(self.datafile.name) as file_in:
        # detect delimiter
        dialect = csv.Sniffer().sniff(file_in.read(2048), delimiters=',\t')
        file_in.seek(0)
        reader = csv.reader(file_in, dialect)
        writer = csv.writer(output, dialect)
        for row in reader:
            new_row = row[:]  # copy
            try:
                x = float(row[in_cols[0]])
                y = float(row[in_cols[1]])
            except:
                # go to next row if the values are not numbers
                writer.writerow(new_row)
                continue
            # translate
            x, y = processor_handler(x, y)
            new_row[out_cols[0]] = int(x)
            new_row[out_cols[1]] = int(y)
            writer.writerow(new_row)

def generate_rows(f):
    sniffer = csv.Sniffer()
    dialect = sniffer.sniff(f.readline())
    f.seek(0)
    reader = csv.reader(f, dialect)
    for line in reader:
        yield line

def load_data(year):
    '''
    Load data into memory cache
    '''
    year = str(year)
    if year in CACHE:
        return True
    data_file = os.path.join(
        os.path.dirname(__file__),
        'data',
        '{}.csv'.format(year)
    )
    if not os.path.isfile(data_file):
        return False
    CACHE[year] = {}
    with io.open(data_file, encoding='utf-8') as rf:
        # Detect CSV header line
        has_header = csv.Sniffer().has_header(rf.read(1024))
        rf.seek(0)
        reader = csv.DictReader(rf, DATA_FIELDS)
        if has_header:
            next(reader)
        for data_line in reader:
            day = clean_up_dict(data_line)
            # Convert into `int` type so we don't need to parse it afterwards
            dt = datetime.strptime(day['date'], '%Y-%m-%d')
            day['year'] = dt.year
            day['month'] = dt.month
            day['day'] = dt.day
            day['isholiday'] = bool(int(day['isholiday']))
            day['isworkday'] = bool(int(day['isworkday']))
            CACHE[year][day.pop('date')] = day
    return True

def _import_phenolist_csv(f, has_header):
    # Note: If a csv (1) contains commas in quoted cells and (2) doesn't have any line that starts with a quoted cell,
    #       then sometimes this makes very bad choices.
    #       In particular, if all lines have the same number of some other character (even a letter),
    #       that character might become the delimiter.
    try:
        dialect = csv.Sniffer().sniff(f.read(4096))
    except Exception as exc:
        raise PheWebError("Sniffing csv format failed. Check that your csv file is well-formed. If it is, try delimiting with tabs or semicolons.") from exc
    if dialect.delimiter in string.ascii_letters or dialect.delimiter in string.digits:
        raise PheWebError("Our csv sniffer decided that {!r} looks like the most likely delimiter in your csv file, but that's crazy.".format(dialect.delimiter))
    f.seek(0)
    try:
        rows = list(csv.reader(f, dialect))
    except ValueError:
        return None
    num_cols = len(rows[0])
    if has_header:
        fieldnames, rows = rows[0], rows[1:]
        if any(fieldname is None or fieldname == '' for fieldname in fieldnames):
            if has_header == 'augment':
                fieldnames = [i if (fieldname is None or fieldname == '') else fieldname
                              for i, fieldname in enumerate(fieldnames)]
            else:
                raise PheWebError('bad csv header')
        assert len(set(fieldnames)) == len(fieldnames)
    else:
        fieldnames = list(range(num_cols))
    return [{fieldnames[i]: row[i] for i in range(num_cols)} for row in rows]

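The failure mode described in the comment at the top of _import_phenolist_csv is easy to reproduce, which is why the function rejects letter and digit delimiters. A contrived sketch (the data is mine, not PheWeb's): with no commas or tabs anywhere, the only characters that occur a consistent number of times per line are letters, so the sniffer settles on one of them.

import csv

# 'l' occurs exactly once on every line, so it looks "delimiter-like"
sample = "apple\nangle\namble\n"
dialect = csv.Sniffer().sniff(sample)
print(repr(dialect.delimiter))   # a letter ('l' on CPython's sniffer), not a real separator
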
def read_key_value_file(csvfile):
    """Reads CSV file, parses content into dict

    Args:
        csvfile (FILE): Readable file

    Returns:
        DICT: Dictionary containing file content
    """
    kvstore = {}  # init key value store
    first_line = csvfile.readline()
    if 'key' not in first_line or 'value' not in first_line:
        csvfile.seek(0)  # Seek to start if first_line is not a header
    dialect = csv.Sniffer().sniff(first_line, delimiters=',\t')
    reader = csv.reader(csvfile, dialect)  # create reader
    for row in reader:
        kvstore[row[0]] = row[1]
    return kvstore

def csv_col_current(pl, segment_info, display_name='auto', name_format=' ({column_name:.15})'):
    '''Display CSV column number and column name

    Requires filetype to be set to ``csv``.

    :param bool or str display_name:
        May be ``True``, ``False`` and ``"auto"``. In the first case the value
        from the first row will always be displayed. In the second case it
        will never be displayed. In this last case ``csv.Sniffer().has_header()``
        will be used to detect whether the current file contains a header in
        the first row.
    :param str name_format:
        String used to format column name (in case ``display_name`` is set to
        ``True`` or ``"auto"``). Accepts ``column_name`` keyword argument.

    Highlight groups used: ``csv:column_number`` or ``csv``, ``csv:column_name`` or ``csv``.
    '''
    if vim_getbufoption(segment_info, 'filetype') != 'csv':
        return None
    line, col = segment_info['window'].cursor
    column_number, column_name = process_csv_buffer(pl, segment_info['buffer'], line, col, display_name)
    if not column_number:
        return None
    return [{
        'contents': column_number,
        'highlight_groups': ['csv:column_number', 'csv'],
    }] + ([{
        'contents': name_format.format(column_name=column_name),
        'highlight_groups': ['csv:column_name', 'csv'],
    }] if column_name else [])

def attach_file(self, filename, text=None, tabular=False, syntax='auto', fileinfo=False):
    attachment = Attachment()
    if tabular:
        syntax = None
    (mime, _) = mimetypes.guess_type(filename)
    attachment.title = os.path.basename(filename)
    if text is None:
        with open(filename, 'rUb') as f:
            text = f.read().decode('utf-8')
    if tabular:
        csvfile = StringIO(text.strip())
        if tabular == 'sniff':
            dialect = csv.Sniffer().sniff(text)
        else:
            dialect = tabular
        text = md_table(csv.reader(csvfile, dialect))
    elif syntax == 'auto':
        syntax = detect_syntax(attachment.title, mime)
    if syntax is not None:
        text = md_code(text, syntax)
    attachment.text = text
    if fileinfo:
        statinfo = os.stat(filename)
        attachment.add_field('Size', sizeof_fmt(statinfo.st_size), True)
        attachment.add_field('Mime', mime, True)
    self.attachments.append(attachment)
    return attachment

def send(channel, message='', filename=False, url=None, username=None,
         icon=None, syntax='auto', tabular=False, fileinfo=False,
         just_return=False, config_section='DEFAULT',
         config_name='mattersend', config_file=None):
    msg = Message(channel, url, username, icon, config_section,
                  config_name, config_file)

    if filename:
        if syntax == 'none':
            syntax = None
        msg.attach_file(filename, None, tabular, syntax, fileinfo)
    else:
        if tabular:
            syntax = None
            csvfile = StringIO(message.strip())
            if tabular == 'sniff':
                dialect = csv.Sniffer().sniff(message)
            else:
                dialect = tabular
            message = md_table(csv.reader(csvfile, dialect))
        elif syntax in ('auto', 'none'):
            syntax = None
        if syntax is not None:
            message = md_code(message, syntax)
        msg.text = message

    if just_return:
        payload = msg.get_payload()
        return "POST {}\n{}".format(msg.url, payload)

    msg.send()

def loadMatrix(filepath):
    """
    use pandas to load the csv file into the dataframe, using a header if appropriate
    """
    with open(filepath, 'rbU') as csvfile:
        snippet = csvfile.read(2048)
        sniffer = csv.Sniffer()
        dialect = sniffer.sniff(snippet)
        if sniffer.has_header(snippet):
            df = pd.read_csv(filepath, dialect=dialect)
        else:
            df = pd.read_csv(filepath, dialect=dialect, header=None)
    return df

def sniff_file(csvfile):
    chunk = "\n".join([x for _, x in zip(range(50), strip_comments(csvfile))])
    try:
        return csv.Sniffer().sniff(chunk, "\t|, ")
    except:
        return None

def test_has_header(self):
    sniffer = csv.Sniffer()
    self.assertEqual(sniffer.has_header(self.sample1), False)
    self.assertEqual(sniffer.has_header(self.header + self.sample1), True)

def test_doublequote(self):
    sniffer = csv.Sniffer()
    dialect = sniffer.sniff(self.header)
    self.assertFalse(dialect.doublequote)
    dialect = sniffer.sniff(self.sample2)
    self.assertTrue(dialect.doublequote)

def get_delimiter(path):
    with open(path, 'rb') as csvfile:
        return csv.Sniffer().sniff(csvfile.read(), delimiters=';,').delimiter

def test_has_header(self):
    sniffer = csv.Sniffer()
    self.assertEqual(sniffer.has_header(self.sample1), False)
    self.assertEqual(sniffer.has_header(self.header1 + self.sample1), True)