The following 50 code examples, extracted from open-source Python projects, illustrate how to use csv.QUOTE_MINIMAL.
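Before the project examples, here is a minimal sketch (with illustrative values, writing to an in-memory buffer rather than a file) of what QUOTE_MINIMAL actually does: the writer quotes a field only when it contains the delimiter, the quotechar, or a line-break character.

import csv
import io

# Illustrative rows chosen to show when QUOTE_MINIMAL adds quotes.
buffer = io.StringIO()
writer = csv.writer(buffer, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
writer.writerow(['plain', 'has,comma', 'has "quotes"'])

# Only the fields containing the delimiter or the quote character are quoted
# (the embedded quote is doubled because doublequote defaults to True):
#   plain,"has,comma","has ""quotes"""
print(buffer.getvalue())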
def load(corpus_csv_file: Path, sampled_training_example_count: Optional[int] = None) -> 'Corpus':
    import csv
    with corpus_csv_file.open(encoding='utf8') as opened_csv:
        reader = csv.reader(opened_csv, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)

        def to_absolute(audio_file_path: Path) -> Path:
            return audio_file_path if audio_file_path.is_absolute() else \
                Path(corpus_csv_file.parent) / audio_file_path

        examples = [
            (LabeledExampleFromFile(
                audio_file=to_absolute(Path(audio_file_path)),
                id=id,
                label=label,
                positional_label=None if positional_label == "" else PositionalLabel.deserialize(positional_label)),
             Phase[phase])
            for id, audio_file_path, label, phase, positional_label in reader]

        return Corpus(training_examples=[e for e, phase in examples if phase == Phase.training],
                      test_examples=[e for e, phase in examples if phase == Phase.test],
                      sampled_training_example_count=sampled_training_example_count)
def sensor_live(self):
    x = []
    y1 = []
    y2 = []
    for i in range(0, 330, 30):  # change time interval here, if required
        self.sensor_wake()
        time.sleep(10)
        pm = self.sensor_read()
        if pm is not None:
            x.append(i)
            y1.append(pm[0])
            y2.append(pm[1])
            with open('/home/pi/data.csv', 'ab') as csvfile:
                file = csv.writer(csvfile, delimiter=';', quotechar='"', quoting=csv.QUOTE_MINIMAL)
                file.writerow([datetime.datetime.now().replace(microsecond=0).isoformat().replace('T', ' '),
                               pm[0], pm[1]])
            line1, = self.ax.plot(x, y1, 'r-x')
            line2, = self.ax.plot(x, y2, 'b-x')
            self.canvas.draw()
        self.sensor_sleep()
        time.sleep(20)
def list_to_csv(directory_and_filename, list):
    if directory_and_filename[-4:] == '.csv':
        directory_and_filename = directory_and_filename[:-4]
    with open(directory_and_filename + '.csv', 'wb') as csvfile:
        spamwriter = csv.writer(csvfile, quoting=csv.QUOTE_MINIMAL)
        for row in list:
            try:
                spamwriter.writerow(row)
            except UnicodeEncodeError:
                # Re-encode unicode elements as UTF-8, then write the repaired row
                new_row = []
                for element in row:
                    if type(element) is unicode:
                        new_row.append(element.encode('utf-8'))
                    else:
                        new_row.append(element)
                spamwriter.writerow(new_row)
def __init__(self, file_path, type='text', **kwargs):
    self._file_path = file_path
    self._type = type
    self._kwargs = kwargs
    self._file_handler = open(file_path, 'r')

    if type == 'json_line':
        # pre-compile json path, raise exception if not exists
        self._id_path_parser = parse(kwargs['id_path'])
    elif type == 'csv':
        self._id_column = kwargs['id_column']  # raise exception if not exists
        delimiter = kwargs['delimiter'] if 'delimiter' in kwargs else ','
        quote_char = kwargs['quote_char'] if 'quote_char' in kwargs else '"'
        quoting = kwargs['quoting'] if 'quoting' in kwargs else csv.QUOTE_MINIMAL
        column_names = kwargs['column_names'] if 'column_names' in kwargs else None
        self._csv_reader = csv.DictReader(
            self._file_handler, delimiter=delimiter, quotechar=quote_char,
            quoting=quoting, fieldnames=column_names)
    else:  # text
        self._id_prefix = hashlib.md5(file_path).hexdigest()[:6]
def save_to_tsv(out_file, geocoding, out_path=''):
    try:
        out_file_with_extension = '{}.tsv'.format(out_file)
        with open(os.path.realpath(os.path.join(out_path, out_file_with_extension)),
                  'w+', newline='') as csvfile:
            fieldnames = ['geonameId', 'name', 'toponymName', 'fcl', 'fcode', 'fcodeName',
                          'fclName', 'lat', 'lng', 'adminCode1', 'adminName1', 'adminCode2',
                          'adminName2', 'adminCode3', 'adminName3', 'adminCode4', 'adminName4',
                          'countryName', 'population', 'continentCode', 'countryCode']
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames, delimiter='\t',
                                    quotechar='"', quoting=csv.QUOTE_MINIMAL)
            writer.writeheader()
            id, locations = geocoding[0]
            for data in locations:
                writer.writerow(data)
        return out_file_with_extension
    except Exception as e:
        logger.error('Error while writing tsv file {}'.format(e))
        raise
def render(self, data, media_type=None, renderer_context=None):
    sbuf = StringIO()
    writer = csv.writer(sbuf, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
    writer.writerow((
        'Vendor',
        'Kit',
        'Subkit',
        'Version',
        'Barcode',
        'Index Sequence',
        'Full Sequence',
    ))
    for result in data:
        writer.writerow((
            result['vendor'],
            result['kit'],
            result['subkit'],
            result['version'],
            result['barcode'],
            result['index_sequence'],
            result['full_sequence']
        ))
    return sbuf.getvalue().encode(self.charset)
def save_db_objects(db_engine, db_objects):
    """Saves a collection of SQLAlchemy model objects to the database using a COPY command

    Args:
        db_engine (sqlalchemy.engine)
        db_objects (list) SQLAlchemy model objects, corresponding to a valid table
    """
    with tempfile.TemporaryFile(mode='w+') as f:
        writer = csv.writer(f, quoting=csv.QUOTE_MINIMAL)
        for db_object in db_objects:
            writer.writerow([
                getattr(db_object, col.name)
                for col in db_object.__table__.columns
            ])
        f.seek(0)
        postgres_copy.copy_from(f, type(db_objects[0]), db_engine, format='csv')
def writeDataToFile(self):
    end_time = 0
    with open(self.datafilePath, 'a', newline='') as csvfile:
        spamwriter = csv.writer(csvfile, delimiter=',', quotechar='|', quoting=csv.QUOTE_MINIMAL)
        if self.newfile:
            spamwriter.writerow(self.header)
        for dr in self.data:
            row = []
            end_time = dr
            for v in self.data[dr]:
                row.append(self.data[dr][v])
            spamwriter.writerow(row)
    with open(self.timefilePath, "w") as ft:
        ft.write("%d\n" % end_time)
    self.data.clear()
    print(datetime.datetime.utcnow())
    print("data written")
def run(self):
    users = {}
    for tweet_str in self.input().open('r'):
        tweet = json.loads(tweet_str)
        user = tweet['user']['screen_name']
        followers = int(tweet['user']['followers_count'])
        following = int(tweet['user']['friends_count'])
        if following > 0:
            r = followers / float(following)
            users[user] = r
    with self.output().open('w') as fp_counts:
        writer = csv.DictWriter(fp_counts, delimiter=',', quoting=csv.QUOTE_MINIMAL,
                                fieldnames=['user', 'count'])
        writer.writeheader()
        for user, r in users.items():
            writer.writerow({'user': user, 'count': r})
def _test_default_attrs(self, ctor, *args):
    obj = ctor(*args)
    # Check defaults
    self.assertEqual(obj.dialect.delimiter, ',')
    self.assertEqual(obj.dialect.doublequote, True)
    self.assertEqual(obj.dialect.escapechar, None)
    self.assertEqual(obj.dialect.lineterminator, "\r\n")
    self.assertEqual(obj.dialect.quotechar, '"')
    self.assertEqual(obj.dialect.quoting, csv.QUOTE_MINIMAL)
    self.assertEqual(obj.dialect.skipinitialspace, False)
    self.assertEqual(obj.dialect.strict, False)
    # Try deleting or changing attributes (they are read-only)
    self.assertRaises(AttributeError, delattr, obj.dialect, 'delimiter')
    self.assertRaises(AttributeError, setattr, obj.dialect, 'delimiter', ':')
    self.assertRaises(AttributeError, delattr, obj.dialect, 'quoting')
    self.assertRaises(AttributeError, setattr, obj.dialect, 'quoting', None)
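As the assertions above suggest, QUOTE_MINIMAL is the default quoting mode of the built-in excel dialect, so a plain csv.writer(...) behaves identically to one constructed with an explicit quoting=csv.QUOTE_MINIMAL. A quick self-contained check (buffer names are illustrative):

import csv
import io

default_buf, explicit_buf = io.StringIO(), io.StringIO()
csv.writer(default_buf).writerow(['a', 'b,c'])                              # default (excel) dialect
csv.writer(explicit_buf, quoting=csv.QUOTE_MINIMAL).writerow(['a', 'b,c'])  # explicit QUOTE_MINIMAL
assert default_buf.getvalue() == explicit_buf.getvalue() == 'a,"b,c"\r\n'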
def test_write_escape(self):
    self._write_test(['a', 1, 'p,q'], 'a,1,"p,q"',
                     escapechar='\\')
    self.assertRaises(csv.Error, self._write_test,
                      ['a', 1, 'p,"q"'], 'a,1,"p,\\"q\\""',
                      escapechar=None, doublequote=False)
    self._write_test(['a', 1, 'p,"q"'], 'a,1,"p,\\"q\\""',
                     escapechar='\\', doublequote=False)
    self._write_test(['"'], '""""',
                     escapechar='\\', quoting=csv.QUOTE_MINIMAL)
    self._write_test(['"'], '\\"',
                     escapechar='\\', quoting=csv.QUOTE_MINIMAL,
                     doublequote=False)
    self._write_test(['"'], '\\"',
                     escapechar='\\', quoting=csv.QUOTE_NONE)
    self._write_test(['a', 1, 'p,q'], 'a,1,p\\,q',
                     escapechar='\\', quoting=csv.QUOTE_NONE)
def save_order_list(self, tday):
    orders = self.id2order.keys()
    if len(self.id2order) > 1:
        orders.sort()
    order_list = [self.id2order[key] for key in orders]
    filename = self.file_prefix + 'order_' + tday.strftime('%y%m%d') + '.csv'
    with open(filename, 'wb') as log_file:
        file_writer = csv.writer(log_file, delimiter=',', quotechar='|', quoting=csv.QUOTE_MINIMAL)
        file_writer.writerow(['order_ref', 'local_id', 'sysID', 'inst', 'volume',
                              'filledvolume', 'filledprice', 'filledorders',
                              'action_type', 'direction', 'price_type', 'limitprice',
                              'order_time', 'status', 'order_class', 'trade_ref'])
        for iorder in order_list:
            forders = [str(key) + ':' + '_'.join([str(s) for s in iorder.filled_orders[key]])
                       for key in iorder.filled_orders if len(str(key)) > 0]
            filled_str = '|'.join(forders)
            file_writer.writerow([iorder.order_ref, iorder.local_id, iorder.sys_id,
                                  iorder.instrument, iorder.volume, iorder.filled_volume,
                                  iorder.filled_price, filled_str, iorder.action_type,
                                  iorder.direction, iorder.price_type, iorder.limit_price,
                                  iorder.start_tick, iorder.status, iorder.type,
                                  iorder.trade_ref])
def _test_default_attrs(self, ctor, *args):
    obj = ctor(*args)
    # Check defaults
    self.assertEqual(obj.dialect.delimiter, ',')
    self.assertEqual(obj.dialect.doublequote, True)
    self.assertEqual(obj.dialect.escapechar, None)
    self.assertEqual(obj.dialect.lineterminator, "\r\n")
    self.assertEqual(obj.dialect.quotechar, '"')
    self.assertEqual(obj.dialect.quoting, csv.QUOTE_MINIMAL)
    self.assertEqual(obj.dialect.skipinitialspace, False)
    self.assertEqual(obj.dialect.strict, False)
    # Try deleting or changing attributes (they are read-only)
    self.assertRaises(TypeError, delattr, obj.dialect, 'delimiter')
    self.assertRaises(TypeError, setattr, obj.dialect, 'delimiter', ':')
    self.assertRaises(AttributeError, delattr, obj.dialect, 'quoting')
    self.assertRaises(AttributeError, setattr, obj.dialect, 'quoting', None)
def test_write_escape(self):
    self._write_test(['a', 1, 'p,q'], 'a,1,"p,q"',
                     escapechar='\\')
    self._write_error_test(csv.Error, ['a', 1, 'p,"q"'],
                           escapechar=None, doublequote=False)
    self._write_test(['a', 1, 'p,"q"'], 'a,1,"p,\\"q\\""',
                     escapechar='\\', doublequote=False)
    self._write_test(['"'], '""""',
                     escapechar='\\', quoting=csv.QUOTE_MINIMAL)
    self._write_test(['"'], '\\"',
                     escapechar='\\', quoting=csv.QUOTE_MINIMAL,
                     doublequote=False)
    self._write_test(['"'], '\\"',
                     escapechar='\\', quoting=csv.QUOTE_NONE)
    self._write_test(['a', 1, 'p,q'], 'a,1,p\\,q',
                     escapechar='\\', quoting=csv.QUOTE_NONE)
def create_memcsv(self):
    output = io.StringIO()
    writer = csv.writer(output, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
    writer.writerow(self.data_headers)
    for row in self.data:
        writer.writerow(row)
    self.data = output

    if self.data_subcat is not None:
        output1 = io.StringIO()
        writer = csv.writer(output1, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
        writer.writerow(self.data_subcat_headers)
        for row in self.data_subcat:
            writer.writerow(row)
        self.data_subcat = output1
def monte_carlo_export(self, nonterminal, filename, samplesscalar=1):
    """Return a tab-separated list of productions, duplicates removed.

    One thing I need to change is to output the set of markup in a nicer fashion.
    """
    expansion = collections.Counter(sorted(self.monte_carlo_expand(nonterminal, samplesscalar)))
    with open(filename, 'a') as csvfile:
        row_writer = csv.writer(csvfile, delimiter='\t', quotechar='|', quoting=csv.QUOTE_MINIMAL)
        prob_range = 0
        for deriv in expansion:
            rng_interval = float(expansion[deriv]) / sum(expansion.values())
            rng_max = prob_range + rng_interval
            temp_prob = [prob_range, rng_max]
            row_writer.writerow([nonterminal, str(deriv.expansion),
                                 '^'.join(str(annotation) for annotation in list(deriv.markup)),
                                 [prob_range, rng_max]])
            prob_range += rng_interval
def exhaustively_and_nonprobabilistically_export(self, nonterminal, filename):
    """Append to a tab-separated file lines specifying each of the templated lines of dialogue
    that may be yielded by each of the possible terminal expansions of nonterminal.
    """
    all_possible_expansions_of_this_symbol = (
        self.exhaustively_and_nonprobabilistically_expand(nonterminal=nonterminal)
    )
    with open(filename, 'a') as export_tsv_file:
        row_writer = csv.writer(
            export_tsv_file, delimiter='\t', quotechar='|', quoting=csv.QUOTE_MINIMAL
        )
        for intermediate_derivation_object in all_possible_expansions_of_this_symbol:
            row_writer.writerow(
                [nonterminal,
                 str(intermediate_derivation_object.expansion),
                 '^'.join(str(annotation) for annotation in list(intermediate_derivation_object.markup)),
                 ['N/A', 'N/A']]  # We write 'N/A' here to indicate that we did not expand probabilistically
            )
def export_line_by_line(logger, csv_path):
    conn = PostgresDb('workshop', 'postgres', 'db', 'postgres')
    query = "SELECT data->'location' as location, \
             data->>'uri' as uri, \
             data->>'num_requests' as num_requests, \
             data->>'bytes' as bytes, \
             data->>'ip' as ip \
             FROM events WHERE \
             data->>'location'='MAD50' and data->>'num_requests'='1'"
    records = conn.execute(query)
    with open(csv_path, 'w') as f:
        writer = csv.writer(f, delimiter=';', quotechar='|', quoting=csv.QUOTE_MINIMAL)
        writer.writerow(['location', 'uri', 'num_requests', 'bytes', 'ip'])
        for idx, row in enumerate(records):
            writer.writerow(row)
            if idx % 1000 == 0:
                print('1000 inserted: {}'.format(idx))
    logger.info('Done writing')
def cpuinfo():
    men = psutil.virtual_memory()
    menab = men.available / (1024 * 1024)
    menta = men.total / (1024 * 1024)
    menus = men.used / (1024 * 1024)
    disk = psutil.disk_usage('/')
    diskta = disk.total / (1024 * 1024 * 1024)
    diskus = disk.used / (1024 * 1024 * 1024)
    diskfr = disk.free / (1024 * 1024 * 1024)
    time = datetime.datetime.now()
    with open('eggs.csv', 'a') as csvfile:
        spamwriter = csv.writer(csvfile, delimiter=' ', quotechar='|', quoting=csv.QUOTE_MINIMAL)
        spamwriter.writerow(["Current time: %s" % time])
        spamwriter.writerow(["CPU usage: %d%s" % (psutil.cpu_percent(interval=1), "%")])
        spamwriter.writerow(["Total memory: %sMB" % int(menta)])
        spamwriter.writerow(["Used memory: %sMB" % int(menus)])
        spamwriter.writerow(["Available memory: %sMB" % int(menab)])
        spamwriter.writerow(["Total disk space: %sG" % int(diskta)])
        spamwriter.writerow(["Used disk space: %sG" % int(diskus)])
        spamwriter.writerow(["Free disk space: %sG" % int(diskfr)])
    with open('eggs.csv', 'r') as csvfile:
        spamreader = csv.reader(csvfile, delimiter=' ', quotechar='|')
        for row in spamreader:
            print(row)
def _test_default_attrs(self, ctor, *args):
    obj = ctor(*args)
    # Check defaults
    self.assertEqual(obj.dialect.delimiter, ',')
    self.assertEqual(obj.dialect.doublequote, True)
    self.assertEqual(obj.dialect.escapechar, None)
    self.assertEqual(obj.dialect.lineterminator, "\r\n")
    self.assertEqual(obj.dialect.quotechar, '"')
    self.assertEqual(obj.dialect.quoting, csv.QUOTE_MINIMAL)
    self.assertEqual(obj.dialect.skipinitialspace, False)
    self.assertEqual(obj.dialect.strict, False)
    # Try deleting or changing attributes (they are read-only)
    self.assertRaises((TypeError, AttributeError), delattr, obj.dialect, 'delimiter')
    self.assertRaises((TypeError, AttributeError), setattr, obj.dialect, 'delimiter', ':')
    self.assertRaises(AttributeError, delattr, obj.dialect, 'quoting')
    # PyPy gets a TypeError instead of an AttributeError
    self.assertRaises((AttributeError, TypeError), setattr, obj.dialect, 'quoting', None)
def write_data_file(self, output_file_path):
    """Write our course info file information out

    output_file_path: path to the output file
    """
    with open(output_file_path, "w") as f_out:
        f_out.write("Assignments\n")
        for assign in self.assignments:
            f_out.write(assign.name + "," + assign.internal_name + "," + str(assign.id))
            f_out.write("\n")
        f_out.write("Roster\n")
        csvwriter = csv.writer(f_out, quoting=csv.QUOTE_MINIMAL)
        for student in self.roster.students_no_errors:
            csvwriter.writerow([student.first_name,
                                student.last_name,
                                student.email,
                                student.glid])
def callback(data): print("callback called...") global i global vc rospy.loginfo("I heard %s",data) with open('dataset.csv', 'a') as csvfile: spamwriter = csv.writer(csvfile, delimiter=' ', quotechar='|', quoting=csv.QUOTE_MINIMAL) spamwriter.writerow([data.angular.z]) if vc.isOpened(): # try to get the first frame rval, frame = vc.read() else: rval = False rval, frame = vc.read() cv2.imwrite("images/image_" + str(i) + ".png", frame) i += 1
def dl_and_parse(s3, headers, keylist, bucket):
    for key in keylist:
        with TemporaryFile() as fp:
            try:
                s3.Bucket(bucket).download_fileobj(key, fp)
            except ClientError as e:
                logger.error('Unable to download s3://{}/{}'.format(bucket, key))
                logger.debug('Received error: {}'.format(e))
                sys.exit(5)
            fp.seek(0)
            with TextIOWrapper(GzipFile(fileobj=fp, mode='r')) as f:
                try:
                    reader = csv.DictReader(f, fieldnames=headers, delimiter=',',
                                            quoting=csv.QUOTE_MINIMAL)
                    for row in reader:
                        yield row
                except csv.Error as e:
                    logger.error("Unable to read CSV at line {}".format(reader.line_num))
                    logger.debug('Received error: {}'.format(e))
                    sys.exit(3)

# main parser
def to_string(table):
    '''Return table as a StringIO object for writing via copy()'''
    string = StringIO()
    writer = csv.writer(string, delimiter=",", quoting=csv.QUOTE_MINIMAL)
    dict_encoder = json.JSONEncoder()
    jsonb_cols = set([i for i, j in enumerate(table.col_types) if j == 'jsonb'])
    datetime_cols = set([i for i, j in enumerate(table.col_types) if j == 'datetime'])
    for row in table:
        for i in jsonb_cols:
            row[i] = dict_encoder.encode(row[i])
        for i in datetime_cols:
            row[i] = psycopg2.extensions.adapt(row[i])  # adapt the value, not the column index
        writer.writerow(row)
    string.seek(0)
    return string
def create_csv(csv_filename, header_list, sql_query_string):
    """Transfer records from database query to a new .csv file.

    Keyword arguments:
    csv_filename -- String with a '.csv' suffix
    header_list -- List of strings (as CSV header names)
    sql_query_string -- Passed to match header_list format
    """
    with open(csv_filename, 'w', newline='\n') as csvfile:
        csv_writer = csv.writer(csvfile, quotechar="'", quoting=csv.QUOTE_MINIMAL)
        csv_writer.writerow(header_list)
        CUR.execute(sql_query_string)
        records = CUR.fetchall()
        CONN.commit()
        for entry in records:
            csv_writer.writerow(entry)
def write_csv_rows(rows, path):
    """
    Write CSV rows in a standard format.

    :type rows: list[list[string]]
    :type path: string
    """
    with open(path, 'w') as outfile:
        writer = csv.writer(outfile, delimiter=',', quoting=csv.QUOTE_MINIMAL)
        writer.writerows(rows)
def run_query(source, fn, rows=200):
    print('writing data for %s to %s' % (source, fn))
    query['query']['function_score']['query']['bool']['must'][0]['term']['analysis.source'] = source
    emails = set()
    batches = 0
    print(json.dumps(query))
    total = None
    with open(fn, 'w', newline='') as outfile:
        writer = csv.writer(outfile, delimiter='\t', quoting=csv.QUOTE_MINIMAL)
        writer.writerow(['email', 'name', 'date', 'comment', 'url'])
        while len(emails) < rows and batches < 10:
            offset = batches * 100
            if total and offset > total:
                break
            resp = es.search(index='fcc-comments', body=query, size=100, from_=offset)
            if batches == 0:
                total = resp['hits']['total']
                print('\t%s matches' % (total))
            else:
                print('\tbatch %s: have %s' % (batches + 1, len(emails)))
            batches += 1
            for doc in resp['hits']['hits']:
                if len(emails) == rows:
                    break
                data = doc['_source']
                if data['contact_email'] in emails:
                    continue
                emails.add(data['contact_email'])
                writer.writerow([data['contact_email'],
                                 data['filers'][0]['name'],
                                 data['date_received'],
                                 data['text_data'],
                                 'https://www.fcc.gov/ecfs/filing/%s' % doc['_id']])
def validate_to_csv(model_name: str, last_epoch: int,
                    configuration: Configuration = Configuration.german(),
                    step_count=10, first_epoch: int = 0,
                    csv_directory: Path = configuration.default_data_directories.test_results_directory
                    ) -> List[Tuple[int, ExpectationsVsPredictionsInGroupedBatches]]:
    step_size = (last_epoch - first_epoch) / (step_count - 1)
    epochs = distinct(list(int(first_epoch + index * step_size) for index in range(step_count)))
    log("Testing model {} on epochs {}.".format(model_name, epochs))
    model = configuration.load_model(model_name, last_epoch,
                                     allowed_characters_for_loaded_model=configuration.allowed_characters,
                                     use_kenlm=True,
                                     language_model_name_extension="-incl-trans")

    def get_result(epoch: int) -> ExpectationsVsPredictionsInGroupedBatches:
        log("Testing epoch {}.".format(epoch))
        model.load_weights(
            allowed_characters_for_loaded_model=configuration.allowed_characters,
            load_model_from_directory=configuration.directories.nets_base_directory / model_name,
            load_epoch=epoch)
        return configuration.test_model_grouped_by_loaded_corpus_name(model)

    results_with_epochs = []
    csv_file = csv_directory / "{}.csv".format(model_name + "-incl-trans")
    import csv
    with csv_file.open('w', encoding='utf8') as opened_csv:
        writer = csv.writer(opened_csv, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
        for epoch in epochs:
            result = get_result(epoch)
            results_with_epochs.append((epoch, result))
            writer.writerow((epoch, result.average_loss,
                             result.average_letter_error_rate, result.average_word_error_rate,
                             result.average_letter_error_count, result.average_word_error_count))
    return results_with_epochs
def summarize_to_csv(self, summary_csv_file: Path) -> None:
    import csv
    with summary_csv_file.open('w', encoding='utf8') as csv_summary_file:
        writer = csv.writer(csv_summary_file, delimiter=',', quotechar='"',
                            quoting=csv.QUOTE_MINIMAL)
        for row in self.csv_rows():
            writer.writerow(row)
def save(self, corpus_csv_file: Path, use_relative_audio_file_paths: bool = True):
    import csv
    with corpus_csv_file.open('w', encoding='utf8') as opened_csv:
        writer = csv.writer(opened_csv, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
        examples_and_phase = [(e, Phase.training) for e in self.training_examples] + \
                             [(e, Phase.test) for e in self.test_examples]
        for e, phase in examples_and_phase:
            writer.writerow(
                (e.id,
                 str(e.audio_file.relative_to(corpus_csv_file.parent)
                     if use_relative_audio_file_paths else e.audio_file),
                 e.label,
                 phase.value,
                 e.positional_label.serialize() if e.positional_label else ""))
def csv_writer(filename, data, fields=None, delimiter=',', quoting=csv.QUOTE_MINIMAL):
    log.info('writing', filename)
    rows = 0
    with open(filename, 'w') as f:
        if not fields:
            fields = data[0].keys()
        writer = csv.DictWriter(f, fieldnames=fields, delimiter=delimiter, quoting=quoting)
        writer.writeheader()
        for row in data:
            writer.writerow(row)
            rows += 1
    log.info('wrote %d rows' % rows)
def load_csv_columns(filename, column_names=None, skip=0, delimiter=',', quoting=csv.QUOTE_MINIMAL):
    r = []
    log.info('opening', filename)
    with open(filename, 'r') as f:
        data_file = csv_reader_converter(f, delimiter=delimiter, quoting=quoting)
        for i in range(skip):
            next(data_file)
        headers = next(data_file, None)
        # parse the headers
        columns = {}
        for (i, h) in enumerate(headers):
            h = h.strip()
            if (not column_names) or h in column_names:
                columns[i] = h
        log.info("headers", headers)
        log.info("columns", column_names)
        for line in data_file:
            d = {}
            if not line:
                continue
            for (column, index) in columns.items():
                if column_names:
                    rename = column_names[index]
                else:
                    rename = headers[column]
                value = line[column].strip()
                d[rename] = value
            r.append(d)
    log.info('read %d lines' % len(r))
    return r
def __init__(self, file, include_default_phase: bool = True):
    writer = csv.writer(file, delimiter='\t', quoting=csv.QUOTE_MINIMAL)
    writer.writerow(['name', 'phase', 'slope', 'intercept', 'N0', 'SNR', 'rank'])
    self._writer = writer
    self._file = file
    self._include_default_phase = include_default_phase
def run():
    note_files = ['promrep/scripts/data/MRR3ShortNotesV2.csv',
                  'promrep/scripts/data/MRR3LongNotesV2.csv']
    source = SecondarySource.objects.get(abbrev_name='Broughton MRR3')
    for ifname in note_files:
        print 'Will read notes from file', ifname, '\n\n'
        ifile = codecs.open(ifname, 'r', encoding='latin1')
        log_fname = os.path.splitext(os.path.basename(ifname))[0] + '.log'
        with open(log_fname, 'wb') as log_file:
            spamwriter = csv.writer(
                log_file, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
            spamwriter.writerow(('id', 'note'))
            for i, line in enumerate(ifile):
                note_text = line.encode('utf-8').replace("**", ";").strip('"')
                print str(i) + ":", note_text
                note = PostAssertionNote.objects.create(
                    text=note_text, secondary_source=source)
                spamwriter.writerow((note.id, note_text[0:20]))
def handle(self, *args, **options):
    tmpdir = tempfile.mkdtemp()
    try:
        queryset = AnalysisJob.objects.all().filter(status=AnalysisJob.Status.COMPLETE)
        filter_set = AnalysisJobFilterSet()
        queryset = filter_set.filter_latest(queryset, 'latest', True)

        tmp_csv_filename = os.path.join(tmpdir, 'results.csv')
        with open(tmp_csv_filename, 'w') as csv_file:
            writer = None
            fieldnames = []
            for job in queryset:
                row_data = {}
                for export in EXPORTS:
                    columns, values = export(job)
                    if writer is None:
                        fieldnames = fieldnames + columns
                    for column, value in zip(columns, values):
                        row_data[column] = value
                if writer is None:
                    writer = csv.DictWriter(csv_file, fieldnames=fieldnames,
                                            dialect=csv.excel, quoting=csv.QUOTE_MINIMAL)
                    writer.writeheader()
                writer.writerow(row_data)

        s3_client = boto3.client('s3')
        now = datetime.utcnow()
        s3_key = 'analysis-spreadsheets/results-{}.csv'.format(now.strftime('%Y-%m-%dT%H%M'))
        s3_client.upload_file(tmp_csv_filename, settings.AWS_STORAGE_BUCKET_NAME, s3_key)
        logger.info('File uploaded to: s3://{}/{}'
                    .format(settings.AWS_STORAGE_BUCKET_NAME, s3_key))
    finally:
        shutil.rmtree(tmpdir)
def add_row_to_csv(directory, filename, row, columns_to_skip):
    row = ['' for i in range(columns_to_skip)] + row
    with open(directory + '/' + filename + '.csv', 'a') as csvfile:
        spamwriter = csv.writer(csvfile, quoting=csv.QUOTE_MINIMAL)
        spamwriter.writerow(row)
def __init__(self, *args, **kwargs):
    super(RankerRelevanceFileQueryStream, self).__init__(*args, **kwargs)
    dialect = csv.excel
    # The following explicit assignments shadow the dialect defaults
    # but are necessary to avoid strange behavior while called by
    # certain unit tests. Please do not delete.
    dialect.doublequote = True
    dialect.quoting = csv.QUOTE_MINIMAL
    dialect.skipinitialspace = True
    self.__reader__ = csv.reader(self.query_file, dialect=dialect)