我们从 Python 开源项目中提取了以下 49 个代码示例,用于说明如何使用 csv.excel_tab(它是一个方言类,作为 dialect 参数传入,而不是被调用的函数)。
def open_anything(source, format, ignoreheader, force_unbuffered=False):
    """Open *source* and return an iterable of rows in the requested format.

    Parameters:
        source: path or stream handed to open_regular_or_compressed().
        format: 'vw' (raw lines), 'tsv' (tab-separated) or 'csv'.
        ignoreheader: when true, skip the first row of tsv/csv input.
        force_unbuffered: read line-by-line to defeat stream buffering.

    Returns:
        The raw line iterator for 'vw', otherwise a csv reader.

    Raises:
        ValueError: if *format* is not one of the supported values.
    """
    source = open_regular_or_compressed(source)
    if force_unbuffered:
        # simply disabling buffering is not enough, see this for details:
        # http://stackoverflow.com/a/6556862
        source = iter(source.readline, '')
    if format == 'vw':
        return source
    if format == 'tsv':
        reader = csv.reader(source, csv.excel_tab)
        if ignoreheader:
            # BUG FIX: reader.next() is Python-2-only; the next() builtin
            # works on both Python 2.6+ and Python 3.
            next(reader)
    elif format == 'csv':
        reader = csv.reader(source, csv.excel)
        if ignoreheader:
            next(reader)
    else:
        raise ValueError('format not supported: %s' % format)
    return reader
def main():
    """CLI entry point: open the measurement directory given as argv[1].

    Reads metadata.json for the measurement's name and time range, then
    streams every service's tab-separated CSV and prints its "time" column.
    """
    if len(sys.argv) < 2:
        sys.stderr.write("USAGE: %s measurement\n" % sys.argv[0])
        sys.exit(1)
    path = sys.argv[1]
    with open(os.path.join(path, "metadata.json")) as f:
        metadata = json.load(f)
    start = date(metadata["start"][:-1])
    # BUG FIX: the end of the range was parsed from metadata["start"] too.
    end = date(metadata["end"][:-1])
    # BUG FIX: print() received a format string plus loose arguments, so the
    # %s placeholders were never interpolated; interpolate explicitly.
    print('open measurement "%s" from "%s" to "%s"'
          % (metadata["name"], start, end))
    for service in metadata["services"]:
        print('open service "%s"' % service["name"])
        with open(os.path.join(path, service["filename"])) as csvfile:
            r = csv.DictReader(csvfile, dialect=csv.excel_tab)
            for row in r:
                print(row["time"])
def test_write_feed(product_in_stock, monkeypatch):
    """The generated feed must be tab-delimited and carry Google's columns."""
    out = StringIO()
    write_feed(out)
    out.seek(0)
    content = out.getvalue()
    sniffed = csv.Sniffer().sniff(content)
    # The sniffed dialect must match excel_tab on every distinguishing field.
    for attr in ('delimiter', 'quotechar', 'escapechar'):
        assert getattr(sniffed, attr) == getattr(csv.excel_tab, attr)
    assert csv.Sniffer().has_header(content)
    rows = list(csv.reader(out, dialect=csv.excel_tab))
    assert len(rows) == 2
    header = rows[0]
    # Fields Google Merchant requires in every product feed.
    for required in ('id', 'title', 'link', 'image_link',
                     'availability', 'price', 'condition'):
        assert required in header
def write_feed(file_obj):
    """Serialize the product feed as tab-separated rows into *file_obj*."""
    writer = csv.DictWriter(file_obj, ATTRIBUTES, dialect=csv.excel_tab)
    writer.writeheader()
    # Pre-fetch everything item_attributes() needs so it is not queried
    # once per feed item.
    categories = Category.objects.all()
    discounts = Sale.objects.all().prefetch_related('products', 'categories')
    attr_pk_by_slug = {a.slug: a.pk for a in ProductAttribute.objects.all()}
    attr_value_names = {
        smart_text(a.pk): smart_text(a)
        for a in AttributeChoiceValue.objects.all()
    }
    path_cache = {}
    current_site = Site.objects.get_current()
    for item in get_feed_items():
        writer.writerow(item_attributes(
            item, categories, path_cache, current_site,
            discounts, attr_pk_by_slug, attr_value_names))
def main():
    """CLI: read a netlist s-expression (argv[1]) and a pcb file (argv[2]),
    write a BOM CSV to argv[3] and a tab-separated XYRS placement file to
    argv[4].
    """
    from sys import argv, stdout
    from csv import DictWriter, excel_tab
    # BUG FIX: every file handle here was opened without ever being closed;
    # for the writers that risks losing buffered rows on exit. Use context
    # managers throughout.
    with open(argv[1]) as nf:
        net_data = car(sexp.load(nf))
    with open(argv[2]) as pf:
        pcb_data = car(sexp.load(pf))
    comps = comps_from_netlist(net_data)
    bom = bom_from_comps(comps)
    with open(argv[3], 'w') as bof:
        bom_writer = DictWriter(bof, fieldnames=_bom_fields,
                                extrasaction='ignore')
        bom_writer.writeheader()
        for b in bom:
            bom_writer.writerow(b)
    xyrs = do_xyrs(pcb_data, comps)
    with open(argv[4], 'w') as xof:
        xyrs_writer = DictWriter(xof, fieldnames=_xyrs_fields,
                                 dialect=excel_tab, extrasaction='ignore')
        xyrs_writer.writeheader()
        for r in xyrs:
            xyrs_writer.writerow(r)
def read_csv_to_dict(source, encoding='utf-8', dialect=csv.excel_tab):
    """Yield one dict per data row of an encoded CSV byte string.

    Parameters:
        source: raw CSV contents as bytes.
        encoding: codec used to decode *source* (default UTF-8).
        dialect: csv dialect; defaults to tab-separated (excel_tab).

    The first non-empty row is taken as the header; subsequent rows are
    zipped against it. If the input is empty (or contains only empty
    rows) nothing is yielded.
    """
    # Decode once and feed the lines straight to csv.reader. keepends=True
    # preserves line terminators so quoted embedded newlines still parse.
    lines = source.decode(encoding).splitlines(True)
    reader = csv.reader(lines, dialect=dialect)
    # BUG FIX: the original used reader.next() (Python-2-only) and raised
    # StopIteration on empty input — a RuntimeError inside a generator
    # since PEP 479. next(reader, None) skips leading empty rows safely
    # and makes the None check meaningful.
    header = next(reader, None)
    while header is not None and not header:
        header = next(reader, None)
    if header is None:
        return
    for row in reader:
        yield dict(zip(header, row))
def tab(self, output):
    """Write *output* rows to self.outfile as Excel-style tab-delimited text."""
    import csv
    csv.writer(self.outfile, dialect=csv.excel_tab).writerows(output)
def _create_writer(self, filedir, file, inreader: csv.DictReader) -> RF2DictWriter:
    """Return an RF2 dict writer for *file*, mirrored under the output tree.

    The target directory is *filedir* with the input root swapped for the
    output root. A header row is written only when the target file is
    created fresh (first run, or the file does not exist yet); otherwise
    rows are appended.
    """
    target_dir = filedir.replace(self._indir, self._outdir)
    os.makedirs(target_dir, exist_ok=True)
    target_path = os.path.join(target_dir, file)
    fresh = self._init or not os.path.exists(target_path)
    writer = RF2DictWriter(open(target_path, 'w' if fresh else 'a'),
                           fieldnames=inreader.fieldnames,
                           dialect=csv.excel_tab)
    if fresh:
        writer.writeheader()
    return writer
def test_outputs_tab(self):
    """tab() must emit exactly what csv.writer with excel_tab produces."""
    expected = StringIO()
    csv.writer(expected, dialect=csv.excel_tab).writerows(self.output_data)
    expected.seek(0)
    self.of.tab(self.output_data)
    self.outfile.seek(0)
    self.assertEqual(self.outfile.read(), expected.read())
def dump_app(app_args, path, begin, now, container_image_pattern = ""):
    """Dump all InfluxDB measurements for one app into <path>/<app.filename>.

    app_args: string "<app name>:<table 1>:<table 2>:..." — the first
        element names the app (docker container metrics are always pulled
        for it), any further elements are additional InfluxDB tables.
    path: directory that receives the gzipped tab-separated output file.
    begin, now: time-range bounds substituted into each query by scroll().
    container_image_pattern: formatted into the app's container-image regex.

    Returns the app description object produced by dump_column_names().
    """
    # the app_args argument is supplied in the format
    # <app name>:<additional influxdb table 1>:<additional influxdb table 2>:...
    app_args = app_args.split(":")
    # get the tag (keys and values) and fields from the docker measurements and
    # any additional tables
    app = dump_column_names(app_args)
    # build queries
    queries = []
    # always extract docker metrics (here referred to as 'system metrics')
    for system in SYSTEM_METRICS:
        pattern = CONTAINER_IMAGE_PATTERNS[app.name].format(container_image_pattern)
        q = """select * from "docker_container_{}" where container_name =~ /{}/ and container_image =~ /{}/ and time > '%s' and time < '%s' """.format(system, app.name, pattern)
        queries.append(scroll(q, begin, now))
    if len(app_args) > 1:
        # app-specific tables were requested after the app name
        for app_arg in app_args[1:]:
            q = "select * from \"{}\" where time > '%s' and time < '%s'".format(app_arg)
            print(q)
            queries.append(scroll(q, begin, now, prefix = app.name))
    path = os.path.join(path, app.filename)
    with gzip.open(path, "wb") as f:
        columns = app.fields + app.tags + ["time"]
        writer = csv.DictWriter(f, fieldnames=columns, dialect=csv.excel_tab, extrasaction='ignore')
        writer.writeheader()
        # scroll() yields (key, row) pairs; heapq.merge interleaves all query
        # streams by key — presumably a timestamp, so the output is written in
        # time order. NOTE(review): confirm scroll()'s sort key.
        for _, row in heapq.merge(*queries):
            writer.writerow(row)
    return app

# in general, all apps have docker metrics for them, so we retrieve the
# metrics stored in 'docker_container_*' tables by default. we save these
# metrics under an app name equal to the first string in a sequence of strings
# separated by a ':'. app-specific metrics for such app names are gathered from
# the tables specified in subsequent strings.