The following 50 code examples, extracted from open-source Python projects, illustrate how to use csv.reader().
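Before the project-specific examples, here is a minimal sketch of the common pattern they all build on; the file name 'example.csv' and its contents are hypothetical placeholders:

import csv

# Minimal csv.reader() sketch: open the file with newline='' (as the csv docs
# recommend), wrap it in a reader, and iterate; each row is a list of strings.
with open('example.csv', newline='') as f:
    reader = csv.reader(f, delimiter=',')
    header = next(reader, None)   # optionally consume the header row
    for row in reader:
        print(header, row)

Most of the examples below vary only the delimiter, the handling of the header row, and how each parsed row is turned into an application-specific structure.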
def harvest(self, limit=None, offset=None):
    """
    Harvest the data from the file
    :param offset: Integer offset for the row - starts from 0
    :param limit: Integer limit of the rows to iterate over - starts from 0
    :return: list of dicts containing CAS number and IUPAC name
    """
    response = []
    for i, row in enumerate(list(self.reader)[offset:]):
        if limit:
            if i == limit:
                break
        cas = row[0].split(' ', 1)[0]
        cut_start_iupac = str(row[0].split('(', 1)[1])
        iupac = cut_start_iupac.rsplit(')', 1)[0]
        response.append({"CAS": cas, "IUPAC": iupac})
    return response
def add_afsc(dict, fname):
    """
    Add AFSCs from given filename into the dictionary.
    :param dict: empty dictionary
    :param fname: CSV file using '#' as delimiter
    """
    with open(CSV_FOLDER + fname, newline='') as f:
        reader = csv.reader(f, delimiter='#')
        for row in reader:
            base_afsc = row[0]
            job_title = row[1]
            afsc_dict = {"base_afsc": base_afsc,
                         "job_title": job_title,
                         "shreds": {},
                         "link": ""}
            dict[base_afsc] = afsc_dict
def add_shreds(afsc_dict, fname):
    """
    Add shreds from given filename into the dictionary.
    :param dict: either enlisted_dict or officer_dict
    :param fname: CSV file using '#' as delimiter
    """
    with open(CSV_FOLDER + fname, newline='') as f:
        reader = csv.reader(f, delimiter=',')
        for row in reader:
            base_afsc = row[0]
            shred_char = row[1]
            shred_title = row[2]
            # if shred AFSC not in base afsc, skip it
            try:
                afsc_dict[base_afsc]["shreds"][shred_char] = shred_title
            except KeyError:
                pass
def process_desired_state(server_state, desired_state):
    """
    Processes the user provided input file and determines any actions that need to happen
    because the server state is different than the desired state
    :param server_state: The list of all users and their associated groups in the server
    :param desired_state: The input provided by the user
    :return: None
    """
    group_id = []
    with open(desired_state) as csvDataFile:
        csvReader = csv.reader(csvDataFile)
        next(csvReader, None)
        for email, lastname, firstname, groups in csvReader:
            group_list = []
            if groups != "":
                group_list = groups.split(",")
                group_list = [group.strip(' ') for group in group_list]
            if email not in [user for user in server_state]:
                actions[email] = {'action': 'add',
                                  'groups': group_list,
                                  'user_id': '',
                                  'user_data': {'firstname': firstname,
                                                'lastname': lastname,
                                                'email': email,
                                                'reset_password_required': True}
                                  }
            else:
                group_names_server = server_state[email]['groups'][0::2]
                group_names_desired = groups.split(',')
                group_names_desired = [group.strip(' ') for group in group_names_desired]
                group_diff = [i for i in group_names_desired if i not in group_names_server]
                if group_diff != [] and group_diff != ['']:
                    actions[email] = {'action': 'add to group',
                                      'groups': group_diff,
                                      'user_id': server_state[email]['user_id']
                                      }
def extractRows(fileName):
    fileName = 'results/cvpr_db_results/' + fileName + '.csv'
    with open(fileName, 'r') as csvfile:
        lines = csv.reader(csvfile)
        for row in lines:
            if row[0] != "name":
                retval = [int(x) if x != "" else -100 for x in row[1:-2]]
            else:
                nameRow = row[1:-2]
            if row[0] == "causalgrammar":
                causalRow = retval
            elif row[0] == "origdata":
                origRow = retval
            elif row[0] == "random":
                randomRow = retval
    return {"nameRow": nameRow, "causalRow": causalRow, "origRow": origRow, "randomRow": randomRow}
def _get_external_data(url):
    result = {}
    try:
        # urlopen might fail if it runs into redirections,
        # because of Python issue #13696. Fixed in locators
        # using a custom redirect handler.
        resp = urlopen(url)
        headers = resp.info()
        ct = headers.get('Content-Type')
        if not ct.startswith('application/json'):
            logger.debug('Unexpected response for JSON request: %s', ct)
        else:
            reader = codecs.getreader('utf-8')(resp)
            #data = reader.read().decode('utf-8')
            #result = json.loads(data)
            result = json.load(reader)
    except Exception as e:
        logger.exception('Failed to get external data for %s: %s', url, e)
    return result
def reader(self, stream, context):
    """
    Read lines from a subprocess' output stream and either pass to a progress
    callable (if specified) or write progress information to sys.stderr.
    """
    progress = self.progress
    verbose = self.verbose
    while True:
        s = stream.readline()
        if not s:
            break
        if progress is not None:
            progress(s, context)
        else:
            if not verbose:
                sys.stderr.write('.')
            else:
                sys.stderr.write(s.decode('utf-8'))
            sys.stderr.flush()
    stream.close()
def loadData(bfile, extractSim, phenoFile, missingPhenotype='-9', loadSNPs=False, standardize=True):
    bed = Bed(bfile)
    if (extractSim is not None):
        f = open(extractSim)
        csvReader = csv.reader(f)
        extractSnpsSet = set([])
        for l in csvReader:
            extractSnpsSet.add(l[0])
        f.close()
        keepSnpsInds = [i for i in xrange(bed.sid.shape[0]) if bed.sid[i] in extractSnpsSet]
        bed = bed[:, keepSnpsInds]
    phe = None
    if (phenoFile is not None):
        bed, phe = loadPheno(bed, phenoFile, missingPhenotype)
    if (loadSNPs):
        bed = bed.read()
        if (standardize):
            bed = bed.standardize()
    return bed, phe
def prepare_data(imagery_path, train_file_path, split_points):
    # Read tiff image
    image_tuple = read_image(imagery_path)
    # Read samples
    original_data_list = []
    csv_reader = csv.reader(open(train_file_path, encoding='utf-8'))
    for row in csv_reader:
        original_data_list.append(row)
    original_data_array = np.array(original_data_list)
    # Split training data into variables and labels
    x_s = original_data_array[:, split_points[0]:split_points[1]]
    y_s = original_data_array[:, split_points[1]]
    return x_s, y_s, image_tuple


# Read image
def read_csv_rows(path):
    """
    Extract the rows from the CSV at the specified path.
    Will throw an error if the file doesn't exist.
    :type path: string
    :rtype: list[list[string]]
    """
    with open(path, 'rU') as infile:
        reader = csv.reader(infile, delimiter=',')
        rows = [row for row in reader]
    # eliminate trailing cols that have no entries (CSI-215)
    for idx, row in enumerate(rows):
        clipIndex = 0
        for col in row[::-1]:
            if not col:
                clipIndex -= 1
            else:
                break
        if clipIndex < 0:
            rows[idx] = rows[idx][:clipIndex]
    return rows
def load_solar_data():
    with open('solar label.csv', 'r') as csvfile:
        reader = csv.reader(csvfile)
        rows = [row for row in reader]
        labels = np.array(rows, dtype=int)
    print(shape(labels))

    with open('solar.csv', 'r') as csvfile:
        reader = csv.reader(csvfile)
        rows = [row for row in reader]
        rows = np.array(rows, dtype=float)
    rows = rows[:104832, :]
    print(shape(rows))
    trX = np.reshape(rows.T, (-1, 576))
    print(shape(trX))
    m = np.ndarray.max(rows)
    print("maximum value of solar power", m)
    trY = np.tile(labels, (32, 1))
    trX = trX / m
    return trX, trY
def read_from_csv(filename, column_names):
    data = []
    with open(filename) as csv_file:
        reader = csv.reader(csv_file)
        is_header_row = True
        for row in reader:
            if is_header_row:
                for col in row:
                    data.append([])
                is_header_row = False
            else:
                colnum = 0
                for col in row:
                    data[colnum].append(float(col))
                    colnum += 1
    return data
def collect_moves(self, reader, name):
    Moves = namedtuple('Moves', ['pokemon', 'gen', 'color', 'moves', 'versions'])
    if name.split('-')[-1].isdigit():
        for row in reader:
            if name == row[0]:
                pokemon = name.split('-')[0].title()
                generation, color = switcher[row[1]], int(ast.literal_eval(row[2]))
                moves, versions = ast.literal_eval(row[3]), ast.literal_eval(row[4])
                return Moves(pokemon, generation, color, moves, versions)
    else:
        for row in reader:
            if name in row[0]:
                pokemon = name.title()
                generation, color = switcher[row[1]], int(ast.literal_eval(row[2]))
                moves, versions = ast.literal_eval(row[3]), ast.literal_eval(row[4])
                return Moves(pokemon, generation, color, moves, versions)
def _records_protocol_v1(self, ifile):
    reader = csv.reader(ifile, dialect=CsvDialect)
    try:
        fieldnames = reader.next()
    except StopIteration:
        return
    mv_fieldnames = {name: name[len('__mv_'):] for name in fieldnames if name.startswith('__mv_')}
    if len(mv_fieldnames) == 0:
        for values in reader:
            yield OrderedDict(izip(fieldnames, values))
        return
    for values in reader:
        record = OrderedDict()
        for fieldname, value in izip(fieldnames, values):
            if fieldname.startswith('__mv_'):
                if len(value) > 0:
                    record[mv_fieldnames[fieldname]] = self._decode_list(value)
            elif fieldname not in record:
                record[fieldname] = value
        yield record
def main():
    indexfilepath = r'C:\Users\Paul Bilokon\Documents\dev\alexandria\bilokon-msc\dissertation\code\winbugs\svl2\dataset-1\coda-index.txt'
    chainfilepath = r'C:\Users\Paul Bilokon\Documents\dev\alexandria\bilokon-msc\dissertation\code\winbugs\svl2\dataset-1\coda-for-chain-1.txt'
    index = readindexfile(indexfilepath)
    print(index)
    data = []
    indexoffset = None
    with open(chainfilepath, 'rt') as chainfile:
        reader = csv.reader(chainfile, delimiter='\t')
        for row in reader:
            index, value = int(row[0]), float(row[1])
            if not data:
                indexoffset = index
            print(index, indexoffset)
            assert index == len(data) + indexoffset
            data.append(value)
    print(data)
def _get_external_data(url):
    result = {}
    try:
        # urlopen might fail if it runs into redirections,
        # because of Python issue #13696. Fixed in locators
        # using a custom redirect handler.
        resp = urlopen(url)
        headers = resp.info()
        if headers.get('Content-Type') != 'application/json':
            logger.debug('Unexpected response for JSON request')
        else:
            reader = codecs.getreader('utf-8')(resp)
            #data = reader.read().decode('utf-8')
            #result = json.loads(data)
            result = json.load(reader)
    except Exception as e:
        logger.exception('Failed to get external data for %s: %s', url, e)
    return result
def loadRecord(line):
    """
    Parse a single CSV line
    """
    input_line = StringIO.StringIO(line)
    #row = unicodecsv.reader(input_line, encoding="utf-8")
    #return row.next()
    #reader = csv.DictReader(input_line, fieldnames=["id", "qid1", "qid2", "question1", "question2", "is_duplicate"])
    reader = csv.reader(input_line)
    return reader.next()
    #data = []
    #for row in reader:
    #    print row
    #    data.append([unicode(cell, "utf-8") for cell in row])
    #return data[0]
    #return reader.next()

#raw_data = sc.textFile(train_file_path).map(loadRecord)
#print raw_data.take(10)
def __init__(self, bot):
    self.bot = bot
    # Add commands as random subcommands
    for name, command in inspect.getmembers(self):
        if isinstance(command, commands.Command) and command.parent is None and name != "random":
            self.bot.add_command(command)
            self.random.add_command(command)
    # Add fact subcommands as subcommands of corresponding commands
    for command, parent in ((self.fact_cat, self.cat), (self.fact_date, self.date), (self.fact_number, self.number)):
        utilities.add_as_subcommand(self, command, parent, "fact")
    # Add random subcommands as subcommands of corresponding commands
    self.random_subcommands = ((self.color, "Resources.color"), (self.giphy, "Resources.giphy"),
                               (self.map, "Resources.map"), (self.streetview, "Resources.streetview"),
                               (self.uesp, "Search.uesp"), (self.wikipedia, "Search.wikipedia"),
                               (self.xkcd, "Resources.xkcd"))
    for command, parent_name in self.random_subcommands:
        utilities.add_as_subcommand(self, command, parent_name, "random")
    # Import jokes
    self.jokes = []
    try:
        with open("data/jokes.csv", newline="") as jokes_file:
            jokes_reader = csv.reader(jokes_file)
            for row in jokes_reader:
                self.jokes.append(row[0])
    except FileNotFoundError:
        pass
def main():
    print()
    csvnames = sys.argv[1:]
    columns_named = False
    scores = []
    for csvname in sys.argv[1:]:
        print("Reading scores from " + csvname + " .")
        with open(csvname, 'r') as csvfile:
            score_reader = csv.reader(csvfile)
            for row in score_reader:
                print(', '.join(row))
                if 'Winner' in row[0]:
                    continue
                if 'Index' in row[0] and columns_named:
                    continue
                else:
                    scores.append(row)
                    columns_named = True
        csvfile.close()
    winner = metarank(scores)
def read_turk_dic_proton():
    """
    dic of abs_id to answers to (wid, q3, q4) for each worker
    """
    f = open("data/proton-beam-RawTurkResults.csv")
    first_line = f.readline()
    csv_reader = csv.reader(f)
    turk_dic = {}
    for row in csv_reader:
        (AssignmentId, WorkerId, HITId, AcceptTime, SubmitTime, ApprovalTime, TimeToComplete,
         AbstractId, Question1, Question2, Question3, Question4, Relevant) = tuple(row)
        AbstractId = int(AbstractId)
        if AbstractId not in turk_dic:
            turk_dic[AbstractId] = []
        turk_dic[AbstractId].append((Question3, Question4))
    return turk_dic
def get_pub_dic_csv(dataset):
    filename = "data/" + dataset + "-text.csv"
    f = open(filename)
    f.readline()
    csv_reader = csv.reader(f)

    # Create dic of: id -> text features
    pub_dic = {}
    for row in csv_reader:
        if dataset.startswith("RCT"):
            (abstract_id, abstract, title) = tuple(row)[0:3]
        else:
            (abstract_id, title, publisher, abstract) = tuple(row)[0:4]
        abstract_id = int(abstract_id)
        text = title + abstract
        pub_dic[abstract_id] = text
    return pub_dic
def load(corpus_csv_file: Path, sampled_training_example_count: Optional[int] = None) -> 'Corpus':
    import csv
    with corpus_csv_file.open(encoding='utf8') as opened_csv:
        reader = csv.reader(opened_csv, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)

        def to_absolute(audio_file_path: Path) -> Path:
            return audio_file_path if audio_file_path.is_absolute() else Path(
                corpus_csv_file.parent) / audio_file_path

        examples = [
            (LabeledExampleFromFile(
                audio_file=to_absolute(Path(audio_file_path)),
                id=id,
                label=label,
                positional_label=None if positional_label == "" else PositionalLabel.deserialize(positional_label)),
             Phase[phase])
            for id, audio_file_path, label, phase, positional_label in reader]

        return Corpus(training_examples=[e for e, phase in examples if phase == Phase.training],
                      test_examples=[e for e, phase in examples if phase == Phase.test],
                      sampled_training_example_count=sampled_training_example_count)
def test_UnicodeWriter(self):
    """Test UnicodeWriter class works."""
    tmp = tempfile.NamedTemporaryFile()
    uw = util.UnicodeWriter(tmp)
    fake_csv = ['one, two, three, {"i": 1}']
    for row in csv.reader(fake_csv):
        # change it for a dict
        row[3] = dict(i=1)
        uw.writerow(row)
    tmp.seek(0)
    err_msg = "It should be the same CSV content"
    with open(tmp.name, 'rb') as f:
        reader = csv.reader(f)
        for row in reader:
            for item in row:
                assert item in fake_csv[0], err_msg
def uninstallation_paths(dist):
    """
    Yield all the uninstallation paths for dist based on RECORD-without-.pyc

    Yield paths to all the files in RECORD. For each .py file in RECORD, add
    the .pyc in the same directory.

    UninstallPathSet.add() takes care of the __pycache__ .pyc.
    """
    from pip.utils import FakeFile  # circular import
    r = csv.reader(FakeFile(dist.get_metadata_lines('RECORD')))
    for row in r:
        path = os.path.join(dist.location, row[0])
        yield path
        if path.endswith('.py'):
            dn, fn = os.path.split(path)
            base = fn[:-3]
            path = os.path.join(dn, base + '.pyc')
            yield path
def _insert_zone_names(self):
    """
    Args:
        _zonetree (str): set in __init__

    Returns:
        List of dicts (str: str) devices by zone
    """
    self._zone_name = None
    self._zonetree_io = StringIO(self._zonetree)
    self._zonetree_csv = csv.reader(self._zonetree_io, delimiter=',')
    self._zonetree_lod = []

    for self._row in self._zonetree_csv:
        if self._row[0] == '1':
            self._zone_name = self._row[1]
            if self._zone_name == 'Undefined':
                self._zone_name = ''
            continue
        for self._dev in self._devtree:
            if self._dev['ds_id'] == self._row[2]:
                self._dev['zone_name'] = self._zone_name
    return self._devtree
def meraki_ap_fw_rules_import_csv(filename):
    fw_ap_import_csv_list = []
    try:
        with open(filename) as csvfile:
            fw_ap_import_csv = csv.reader(csvfile, delimiter=',')
            key = 0
            for item in fw_ap_import_csv:
                if key != 0:
                    print(item)
                    fw_ap_import_csv_list.append(item)
                    if len(item) < 5:
                        key += 1
                        print("")
                        sys.exit("- Error: Row %s isn't correct, exiting... -" % key)
                key += 1
            return fw_ap_import_csv_list
    except FileNotFoundError:
        sys.exit("- Error: CSV file not found %s, exiting... -" % filename)
def readAdults(file_name):
    adult_file = open(file_name, 'r')
    adults_csv = csv.reader(adult_file, delimiter=',')
    # ignore the headers
    next(adults_csv, None)
    # create the dict we need
    adults = {}
    for adult_idx in adults_csv:
        adult = adults[int(adult_idx[0])] = {}
        adult['age'] = int(adult_idx[1])
        adult['workclass'] = adult_idx[2]
        adult['native-country'] = adult_idx[3]
        adult['sex'] = adult_idx[4]
        adult['race'] = adult_idx[5]
        adult['marital-status'] = adult_idx[6]
    adult_file.close()
    return adults
def test_serial():
    assert s.serialize_value(None) == 'x'
    assert s.serialize_value(True) == 'true'
    assert s.serialize_value(False) == 'false'
    assert s.serialize_value(5) == 'i:5'
    assert s.serialize_value(5.0) == 'f:5.0'
    assert s.serialize_value(decimal.Decimal('5.5')) == 'n:5.5'
    assert s.serialize_value('abc') == 's:abc'
    assert s.serialize_value(b'abc') == 'b:YWJj'
    assert s.serialize_value(b'abc') == 'b:YWJj'
    assert s.serialize_value(datetime.date(2007, 12, 5)) == 'd:2007-12-05'
    assert s.serialize_value(datetime.datetime(2007, 12, 5, 12, 30, 30, tzinfo=utc)) \
        == 'dt:2007-12-05 12:30:30+00:00'
    assert s.serialize_value(datetime.time(12, 34, 56)) == 't:12:34:56'
    with raises(NotImplementedError):
        s.serialize_value(csv.reader)
def get_countries_from_csv(filepath):
    """
    Process a csv containing country data.

    Args:
        filepath (str): A path to a csv file.

    Returns:
        list: sanitized country names as strings.

    Raises:
        OSError: If file does not exist at the filepath.
    """
    if not path.exists(filepath):
        raise OSError('Path to file: "{}" does not exist.'.format(filepath))
    with open(filepath, 'r') as csv_file:
        reader = csv.reader(csv_file)
        return [sanitize_string(''.join(row)) for row in reader]
def build_list_of_people_from_csv(filename, countries):
    people = []
    with open(filename, 'r') as opened_file:
        reader = csv.reader(opened_file)
        for row in reader:
            excluded_countries = []
            if len(row) > 1:
                excluded_countries = [
                    sanitize_string(name) for name in row[1].split(',')
                ]
            people.append(
                Person(
                    name=row[0],
                    countries=countries,
                    excluded_countries=excluded_countries
                )
            )
    return people