We extracted the following 39 code examples from open source Python projects to illustrate how to use csv.field_size_limit().
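csv.field_size_limit() returns the parser's current maximum field size and, when called with an argument, also sets a new limit (the default is 131072 bytes). Before the extracted examples, here is a minimal sketch of the pattern most of them share: raise the limit before reading a file that contains very large fields, falling back to smaller values if the platform rejects sys.maxsize. The file name 'big_fields.csv' is a hypothetical placeholder used only for illustration.

import csv
import sys

# Raise the per-field limit; on some platforms (e.g. 64-bit Windows)
# sys.maxsize does not fit in a C long and raises OverflowError,
# so keep dividing until a value is accepted.
limit = sys.maxsize
while True:
    try:
        csv.field_size_limit(limit)
        break
    except OverflowError:
        limit = int(limit / 10)

# 'big_fields.csv' is a hypothetical input file used only for illustration.
with open('big_fields.csv', newline='') as f:
    for row in csv.reader(f):
        print(len(row))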
def test_read_bigfield(self):
    # This exercises the buffer realloc functionality and field size
    # limits.
    limit = csv.field_size_limit()
    try:
        size = 50000
        bigstring = 'X' * size
        bigline = '%s,%s' % (bigstring, bigstring)
        self._read_test([bigline], [[bigstring, bigstring]])
        csv.field_size_limit(size)
        self._read_test([bigline], [[bigstring, bigstring]])
        self.assertEqual(csv.field_size_limit(), size)
        csv.field_size_limit(size-1)
        self.assertRaises(csv.Error, self._read_test, [bigline], [])
        self.assertRaises(TypeError, csv.field_size_limit, None)
        self.assertRaises(TypeError, csv.field_size_limit, 1, None)
    finally:
        csv.field_size_limit(limit)
def Records(self):
    """Reads the CSV data file and generates row records.

    Yields:
      Lists of strings

    Raises:
      ResumeError: If the progress database and data file indicate a
        different number of rows.
    """
    csv_file = self.openfile(self.csv_filename, 'rb')
    reader = self.create_csv_reader(csv_file, skipinitialspace=True)
    try:
        for record in reader:
            yield record
    except csv.Error, e:
        if e.args and e.args[0].startswith('field larger than field limit'):
            raise FieldSizeLimitError(csv.field_size_limit())
        else:
            raise
def main(filename):
    """
    Will split the big input file of 1000 users into 1000 files of 1 user.
    """
    with open(filename) as origfile:
        dir = os.path.dirname(filename)
        csv_reader = csv.reader(origfile, delimiter='\t')

        # Fixes a bug:
        # http://stackoverflow.com/questions/15063936/csv-error-field-larger-than-field-limit-131072
        csv.field_size_limit(sys.maxsize)

        lastuser = None
        for row in csv_reader:
            if lastuser != row[0]:
                print(row[0])
                lastuser = row[0]
            with open(os.path.join(dir, "split", lastuser + ".tsv"), "a") as f:
                f.write("{}\n".format("\t".join(row)))
def getEdges(docTypes):
    import csv
    csv.field_size_limit(2147483647)
    for docType in docTypes:
        print(docType)
        with open("../output/edgelists/{}-edgelist.csv".format(docType.lower()), "r") as csvfile:
            datareader = csv.reader(csvfile)
            count = 0
            for row in datareader:
                if row[9].lower() in docTypes:
                    yield (row[0], row[2])
                    count += 1
                elif count < 2:
                    continue
                else:
                    return
def read_file(self, filename):
    # self.extractor.debug = True
    csv.field_size_limit(sys.maxsize)
    with open(filename, 'r') as csvfile:
        first = True
        for row in csv.reader(csvfile, delimiter=',', quotechar='"'):
            if first:
                first = False
            else:
                post = {'id': row[0],
                        'url': row[1],
                        'web_entity_id': row[2],
                        'web_entity': row[3],
                        'text': row[4]}
                self.process_post(post)
    print('main edges created: %s' % self.main_edges)
    print('extra edges created: %s' % self.extra_edges)
    print('ignored edges: %s' % self.ignored)
def _get_city_db():
    # Allow arbitrarily large fields when loading the cities database.
    csv.field_size_limit(sys.maxsize)
    cities_file = os.path.join(os.path.dirname(__file__), 'cities.txt')
    with open(cities_file, 'rt') as f:
        r = csv.reader(f, delimiter='\t')
        city_db = list(r)
    return city_db
def run():
    if len(sys.argv) != 3:  # Exception handling on starting program
        print('Usage: "shaman-trainer <code_bunch.csv> <result.json>"')
        sys.exit(-1)

    # Args
    codebunch_file = sys.argv[1]
    result_file = sys.argv[2]

    if not os.path.isfile(codebunch_file):  # Exception handling of <code bunch> file
        print('"%s" is not a file' % codebunch_file)
        sys.exit(-1)

    # Read CSV file
    csv.field_size_limit(sys.maxsize)  # Set CSV limit to sys.maxsize

    filedata = []
    print('Load CSV file')
    with open(codebunch_file) as csvfile:
        reader = csv.reader(csvfile, delimiter=',')
        for row in reader:
            filedata.append(row)

    # Fetch keyword data
    trained_data = {}
    trained_data['keywords'] = fetch_keywords(filedata)
    trained_data['patterns'] = match_patterns(filedata)

    # Save result
    with open(result_file, 'w') as file:
        file.write(json.dumps(trained_data))
    print('Trained result is saved at "%s"' % result_file)
def main(dataset='proton-beam-xml'):
    csv.field_size_limit(430000)
    global mat, rel, turk_dic
    if dataset == 'proton-beam-xml':
        pub_dic_tmp = get_pub_dic_xml()
        # pub_dic_items are already sorted by key
        [rec_nums, texts] = zip(*pub_dic.items())
        rel = get_relevant()
    else:
        pub_dic_tmp = get_pub_dic_csv(dataset)
        #[rec_nums, texts] = zip(*pub_dic.items())
        (turk_dic_tmp, rel_dic_tmp) = get_turk_data(dataset)
        texts = []
        pub_dic = {}; turk_dic = {}; rel_dic = {}
        for i in sorted(pub_dic_tmp.keys()):
            if pub_dic_tmp.has_key(i) and turk_dic_tmp.has_key(i) and rel_dic_tmp.has_key(i):
                texts.append(pub_dic_tmp[i])
                pub_dic[i] = pub_dic_tmp[i]
                turk_dic[i] = turk_dic_tmp[i]
                rel_dic[i] = rel_dic_tmp[i]
            #else:
            #    if pub_dic.has_key(i): pub_dic.pop(i)
            #    if turk_dic.has_key(i): turk_dic.pop(i)
            #    if rel_dic.has_key(i): rel_dic.pop(i)
        (_, rel) = zip(*sorted(rel_dic.items()))
        rel = map(int, rel)

    vectorizer = TfidfVectorizer()
    #save_texts = texts
    mat = vectorizer.fit_transform(texts)
    return (pub_dic, texts)
def parse_csv(self, doc, delim=','):
    """
    Csv reader
    =====
    Function to read in a csv file

    Parameters
    -----
    doc : str
        The name of the csv file

    Returns
    -----
    lines : list of lists
        Each list corresponds to the cell values of a row
    """
    csv.field_size_limit(sys.maxsize)
    try:
        lines = []
        with open(doc, 'r', encoding='utf-8') as csvfile:
            csv_reader = csv.reader(csvfile, delimiter=delim)
            for line in csv_reader:
                lines.append(line)
    except:
        lines = []
        csvfile = open(doc, 'r', encoding='utf-8')
        csv_reader = csv.reader(line.replace('\0', '') for line in csvfile.readlines())
        for line in csv_reader:
            lines.append(line)
    return lines
def __init__(self, test_daemon, netem_master, csv_file_path, algorithm):
    self.test_daemon = test_daemon
    # csv.field_size_limit(500 * 1024 * 1024)
    self.csv_writer = csv.writer(open(csv_file_path, 'w', newline=''))
    self.netem_master = netem_master
    self.algorithm = algorithm

# calls run function on test_daemon and saves results to csv
def Load(self, kind, data):
    """Parses CSV data, uses a Loader to convert to entities, and stores them.

    On error, fails fast. Returns a "bad request" HTTP response code and
    includes the traceback in the output.

    Args:
      kind: a string containing the entity kind that this loader handles
      data: a string containing the CSV data to load

    Returns:
      tuple (response code, output) where:
        response code: integer HTTP response code to return
        output: string containing the HTTP response body
    """
    data = data.encode('utf-8')
    Validate(kind, basestring)
    Validate(data, basestring)
    output = []

    try:
        loader = Loader.RegisteredLoaders()[kind]
    except KeyError:
        output.append('Error: no Loader defined for kind %s.' % kind)
        return (httplib.BAD_REQUEST, ''.join(output))

    buffer = StringIO.StringIO(data)
    reader = csv.reader(buffer, skipinitialspace=True)

    try:
        csv.field_size_limit(800000)
    except AttributeError:
        pass

    return self.LoadEntities(self.IterRows(reader), loader)
def __init__(self, limit):
    self.message = """
A field in your CSV input file has exceeded the current limit of %d.

You can raise this limit by adding the following lines to your config file:

import csv
csv.field_size_limit(new_limit)

where new_limit is a number larger than the size in bytes of the largest
field in your CSV.
""" % limit
    Error.__init__(self, self.message)
def __set_max_csv_length(self):
    maxInt = sys.maxsize
    decrement = True
    while decrement:
        # decrease the maxInt value by factor 10
        # as long as the OverflowError occurs.
        decrement = False
        try:
            csv.field_size_limit(maxInt)
        except OverflowError:
            maxInt = int(maxInt/10)
            decrement = True
def all_fb_data(combined_ids, filename='local_data/FacebookCachedObjectEvent.csv'):
    # Raise the field limit so rows holding large cached JSON blobs can be read.
    csv.field_size_limit(1000000000)
    for row in csv.reader(open(filename)):
        source_id, row_id, row_type = row[0].split('.')
        if source_id == "701004" and row_type == 'OBJ_EVENT' and (not combined_ids or row_id in combined_ids):
            fb_event = json.loads(row[1])
            if fb_event and not fb_event.get('deleted') and not fb_event.get('empty') and fb_events.is_public(fb_event):
                yield row_id, fb_event
def set_csv_field_size():
    # Shrink the limit by factors of 10 until the C layer accepts it
    # (sys.maxsize can overflow on some platforms).
    maxInt = sys.maxsize
    decrement = True
    while decrement:
        decrement = False
        try:
            csv.field_size_limit(maxInt)
        except OverflowError:
            maxInt = int(maxInt / 10)
            decrement = True
    return maxInt
def ContentGenerator(csv_file,
                     batch_size,
                     create_csv_reader=csv.reader,
                     create_csv_writer=csv.writer):
    """Retrieves CSV data up to a batch size at a time.

    Args:
      csv_file: A file-like object for reading CSV data.
      batch_size: Maximum number of CSV rows to yield on each iteration.
      create_csv_reader, create_csv_writer: Used for dependency injection.

    Yields:
      Tuple (entity_count, csv_content) where:
        entity_count: Number of entities contained in the csv_content. Will be
          less than or equal to the batch_size and greater than 0.
        csv_content: String containing the CSV content containing the next
          entity_count entities.
    """
    try:
        csv.field_size_limit(800000)
    except AttributeError:
        pass

    reader = create_csv_reader(csv_file, skipinitialspace=True)
    exhausted = False
    while not exhausted:
        rows_written = 0
        content = StringIO.StringIO()
        writer = create_csv_writer(content)
        try:
            for i in xrange(batch_size):
                row = reader.next()
                writer.writerow(row)
                rows_written += 1
        except StopIteration:
            exhausted = True
        if rows_written > 0:
            yield rows_written, content.getvalue()
def init_csv_reader():
    # Hack
    csv_max = sys.maxsize
    overflow = True
    while overflow:
        overflow = False
        try:
            csv.field_size_limit(csv_max)
        except OverflowError:
            overflow = True
            csv_max = int(csv_max/16)
def __iter__(self):
    """Iterate over all of the lines in the file"""
    import csv

    try:
        # For: _csv.Error: field larger than field limit (131072)
        if os.name == 'nt':
            # Using sys.maxsize throws an Overflow error on Windows 64-bit platforms since internal
            # representation of 'int'/'long' on Win64 is only 32-bit wide. Ideally limit on Win64
            # should not exceed ((2**31)-1) as long as internal representation uses 'int' and/or 'long'
            csv.field_size_limit((2**31)-1)
        else:
            csv.field_size_limit(sys.maxsize)
    except OverflowError as e:
        # skip setting the limit for now
        pass

    self.start()

    try:
        # Python 3.6 considers None to mean 'utf8', but Python 3.5 considers it to be 'ascii'
        encoding = self.url.encoding or 'utf8'
        with open(self.url.path, encoding=encoding) as f:
            yield from csv.reader(f, delimiter=self.delimiter)
    except UnicodeError as e:
        raise

    self.finish()
def really_big_fields_enabled(self):
    # Temporarily raise the field size limit, then restore the previous value.
    old_limit = csv.field_size_limit()
    csv.field_size_limit(2 ** 28)
    yield
    csv.field_size_limit(old_limit)
def test_with_bunch(filename):
    """ Test shaman with code bunch and show statistics """
    if not os.path.exists(filename):
        print('File not exists: ' + filename)
        sys.exit(-1)

    # Read CSV file
    print('Load CSV file')
    csv.field_size_limit(sys.maxsize)  # Set CSV limit to sys.maxsize

    filedata = []
    with open(filename) as csvfile:
        reader = csv.reader(csvfile, delimiter=',')
        for row in reader:
            filedata.append(row)

    detector = shaman.Shaman.default()
    correct = 0
    totals = len(filedata)
    results = {}

    print('Start testing')
    for index, (language, code) in enumerate(filedata):
        print('Testing %s/%s ' % (index, len(filedata)), end="\r")

        if language not in shaman.SUPPORTING_LANGUAGES:
            totals -= 1
            continue

        try:
            glang = detector.detect(code)[0][0]
        except IndexError:
            glang = None

        if language not in results:
            results[language] = [0, 0, 0]

        if glang == language:
            correct += 1
            results[language][0] += 1

        results[language][1] += 1
        results[language][2] = results[language][0] / results[language][1]

    print("------------------------------------------------")
    print("Accuracy: %.2lf%% (Correct: %d / Valid Data: %d)" % (correct/totals*100, correct, totals))
    print("------------------------------------------------")

    results = sorted(results.items(), key=lambda x: x[1][0], reverse=True)
    for lang, l in results:
        print("%s: %.2lf%% (%s/%s)" % (lang, l[2] * 100, l[0], l[1]))