我们从Python开源项目中,提取了以下43个代码示例,用于说明如何使用csv.get_dialect()。
def __call__(self, context): if isinstance(self._object, types.StringTypes): self._object = context[self._object] # get dialect object if isinstance(self._dialect, types.StringTypes): dialect = csv.get_dialect(self._dialect) if self._path.startswith("memory:"): buffer_ = StringIO.StringIO() self._write_object(buffer_, dialect) buffer_.seek(0) context[self._path[len("memory:"):]] = buffer_ else: with open(self._path, "w") as f: self._write_object(f, dialect) return self._path
def test_registry_badargs(self): self.assertRaises(TypeError, csv.list_dialects, None) self.assertRaises(TypeError, csv.get_dialect) self.assertRaises(csv.Error, csv.get_dialect, None) self.assertRaises(csv.Error, csv.get_dialect, "nonesuch") self.assertRaises(TypeError, csv.unregister_dialect) self.assertRaises(csv.Error, csv.unregister_dialect, None) self.assertRaises(csv.Error, csv.unregister_dialect, "nonesuch") self.assertRaises(TypeError, csv.register_dialect, None) self.assertRaises(TypeError, csv.register_dialect, None, None) self.assertRaises(TypeError, csv.register_dialect, "nonesuch", 0, 0) self.assertRaises(TypeError, csv.register_dialect, "nonesuch", badargument=None) self.assertRaises(TypeError, csv.register_dialect, "nonesuch", quoting=None) self.assertRaises(TypeError, csv.register_dialect, [])
def test_bad_dialect(self): # Unknown parameter self.assertRaises(TypeError, csv.reader, [], bad_attr = 0) # Bad values self.assertRaises(TypeError, csv.reader, [], delimiter = None) self.assertRaises(TypeError, csv.reader, [], quoting = -1) self.assertRaises(TypeError, csv.reader, [], quoting = 100) # See issue #22995 ## def test_copy(self): ## for name in csv.list_dialects(): ## dialect = csv.get_dialect(name) ## self.assertRaises(TypeError, copy.copy, dialect) ## def test_pickle(self): ## for name in csv.list_dialects(): ## dialect = csv.get_dialect(name) ## for proto in range(pickle.HIGHEST_PROTOCOL + 1): ## self.assertRaises(TypeError, pickle.dumps, dialect, proto)
def __init__(self, n_samples, out_file, pid, lid, keep_cols, n_retry, delimiter, dataset, pred_name, ui, file_context, fast_mode, encoding, skip_row_id, output_delimiter): self.n_samples = n_samples self.out_file = out_file self.project_id = pid self.model_id = lid self.keep_cols = keep_cols self.n_retry = n_retry self.delimiter = delimiter self.dataset = dataset self.pred_name = pred_name self.out_stream = None self._ui = ui self.file_context = file_context self.fast_mode = fast_mode self.encoding = encoding self.skip_row_id = skip_row_id self.output_delimiter = output_delimiter # dataset_dialect and writer_dialect are set by # investigate_encoding_and_dialect in utils self.dialect = csv.get_dialect('dataset_dialect') self.writer_dialect = csv.get_dialect('writer_dialect') self.scoring_succeeded = False # Removes shelves when True self.is_open = False # Removes shelves when True
def close(self): if not self.is_open: self._ui.debug('CLOSE CALLED ON CLOSED RUNCONTEXT') return self.is_open = False self._ui.debug('CLOSE CALLED ON RUNCONTEXT') self.dialect = csv.get_dialect('dataset_dialect') self.writer_dialect = csv.get_dialect('writer_dialect') values = ['delimiter', 'doublequote', 'escapechar', 'lineterminator', 'quotechar', 'quoting', 'skipinitialspace', 'strict'] self.dialect = {k: getattr(self.dialect, k) for k in values if hasattr(self.dialect, k)} self.writer_dialect = {k: getattr(self.writer_dialect, k) for k in values if hasattr(self.writer_dialect, k)} self.db.sync() self.db.close() if self.out_stream is not None: self.out_stream.close()
def __call__(self, context): # get Dialect object if isinstance(self._dialect, types.StringTypes): dialect = csv.get_dialect(self._dialect) # get content content = self._content if self._content is None: if self._path.startswith(HTTP_SCHEMA): content = requests.get(self._path).text else: with open(self._path, "r") as f: content = f.read() if isinstance(content, types.StringTypes): content = StringIO.StringIO(content) result = [] csv_reader = csv.reader(content, dialect=dialect) for row in csv_reader: self._handle_record(row, result.append) return self._handle_result(context, result)
def test_registry(self): class myexceltsv(csv.excel): delimiter = "\t" name = "myexceltsv" expected_dialects = csv.list_dialects() + [name] expected_dialects.sort() csv.register_dialect(name, myexceltsv) self.addCleanup(csv.unregister_dialect, name) self.assertEqual(csv.get_dialect(name).delimiter, '\t') got_dialects = sorted(csv.list_dialects()) self.assertEqual(expected_dialects, got_dialects)
def test_register_kwargs(self): name = 'fedcba' csv.register_dialect(name, delimiter=';') self.addCleanup(csv.unregister_dialect, name) self.assertEqual(csv.get_dialect(name).delimiter, ';') self.assertEqual([['X', 'Y', 'Z']], list(csv.reader(['X;Y;Z'], name)))
def __init__(self, filename, renmwo = True): self.filename = filename self.renmwo = "%s.~renmwo%d~" % (filename, os.getpid()) if renmwo else filename self.file = open(self.renmwo, "w") self.writer = csv.writer(self.file, dialect = csv.get_dialect("excel-tab"))
def open(self): if self.is_open: self._ui.debug('OPEN CALLED ON ALREADY OPEN RUNCONTEXT') return self.is_open = True self._ui.debug('OPEN CALLED ON RUNCONTEXT') csv.register_dialect('dataset_dialect', **self.dialect) csv.register_dialect('writer_dialect', **self.writer_dialect) self.dialect = csv.get_dialect('dataset_dialect') self.writer_dialect = csv.get_dialect('writer_dialect') self.db = shelve.open(self.file_context.file_name, writeback=True) if six.PY2: self.out_stream = open(self.out_file, 'ab') elif six.PY3: self.out_stream = open(self.out_file, 'a', newline='')
def __init__(self, fd, encoding, ui): self.fd = fd # dataset_dialect is set by investigate_encoding_and_dialect in utils self.dialect = csv.get_dialect('dataset_dialect') self.encoding = encoding self._ui = ui
def go(self): dataset_dialect = csv.get_dialect('dataset_dialect') args = ([self.batch_gen_args, SerializableDialect.from_dialect(dataset_dialect), self.queue]) self.p = multiprocessing.Process(target=self._shove, args=args, name='Shovel_Proc') self.p.start() return self.p
def test_investigate_encoding_and_dialect(): with UI(None, logging.DEBUG, stdout=False) as ui: data = 'tests/fixtures/windows_encoded.csv' encoding = investigate_encoding_and_dialect(data, None, ui) dialect = csv.get_dialect('dataset_dialect') assert encoding == 'iso-8859-2' assert dialect.lineterminator == '\r\n' assert dialect.quotechar == '"' assert dialect.delimiter == ','
def test_investigate_encoding_and_dialect_skip_dialect(): with UI(None, logging.DEBUG, stdout=False) as ui: with mock.patch('datarobot_batch_scoring.reader.csv.Sniffer') as sn: data = 'tests/fixtures/windows_encoded.csv' encoding = investigate_encoding_and_dialect(data, None, ui, fast=False, encoding='', skip_dialect=True) assert encoding == 'iso-8859-2' assert not sn.called dialect = csv.get_dialect('dataset_dialect') assert dialect.delimiter == ','
def test_investigate_encoding_and_dialect_substitute_delimiter(): with UI(None, logging.DEBUG, stdout=False) as ui: with mock.patch('datarobot_batch_scoring.reader.csv.Sniffer') as sn: data = 'tests/fixtures/windows_encoded.csv' encoding = investigate_encoding_and_dialect(data, '|', ui, fast=False, encoding='utf-8', skip_dialect=True) assert encoding == 'utf-8' # Intentionally wrong assert not sn.called dialect = csv.get_dialect('dataset_dialect') assert dialect.delimiter == '|'
def __init__(self, f, engine=None, **kwds): self.f = f if engine is not None: engine_specified = True else: engine = 'python' engine_specified = False self._engine_specified = kwds.get('engine_specified', engine_specified) if kwds.get('dialect') is not None: dialect = kwds['dialect'] if dialect in csv.list_dialects(): dialect = csv.get_dialect(dialect) kwds['delimiter'] = dialect.delimiter kwds['doublequote'] = dialect.doublequote kwds['escapechar'] = dialect.escapechar kwds['skipinitialspace'] = dialect.skipinitialspace kwds['quotechar'] = dialect.quotechar kwds['quoting'] = dialect.quoting if kwds.get('header', 'infer') == 'infer': kwds['header'] = 0 if kwds.get('names') is None else None self.orig_options = kwds # miscellanea self.engine = engine self._engine = None options = self._get_options_with_defaults(engine) self.chunksize = options.pop('chunksize', None) self.squeeze = options.pop('squeeze', False) # might mutate self.engine self.options, self.engine = self._clean_options(options, engine) if 'has_index_names' in kwds: self.options['has_index_names'] = kwds['has_index_names'] self._make_engine(self.engine)