我们从Python开源项目中,提取了以下9个代码示例,用于说明如何使用utils.read_file()。
def write_keywords(topK): data = utils.read_file(source_file) dic = get_keywords_year(data) with open(keywords_year_file, 'w', newline='') as csvfile: writer = csv.writer(csvfile) for year in year_range: row = [ year ] count = 0 for k, v in dic[year]: row.append(k + ':' + str(v)) count = count + 1 if count == topK: break writer.writerow(row)
def batch_url_extractor(input_path, output_path): last_id = False if os.path.isfile(output_path): last_id = get_last_written_id(output_path) f = read_file(input_path) for line_count, link in enumerate(f): user_id = link[0].strip() if last_id == user_id: last_id = False break if last_id is False: processes = Parallel(n_jobs=4)( delayed(get_twitter_account_state)(user_id) for user_id in f) processes = [x for x in processes if x is not None] # if line_count % 10000 == 0: append_list_to_csv(output_path, processes) # write_to_file(output_path, two_dimensional_list_to_string(result))
def __init__(self, config_path): contents = utils.read_file(config_path) self.config = utils.parse_yaml(contents) aws_config = self.config.get('aws', {}) region = os.environ.get('REGION') or aws_config.get('region') aws_access_key_id = os.environ.get('AWS_ACCESS_KEY_ID') aws_secret_access_key = os.environ.get('AWS_SECRET_ACCESS_KEY') subnet_ids = aws_config.get('subnet_ids') or [] security_group_ids = aws_config.get('security_group_ids') or [] role_name = os.environ.get('LAMBDA_EXECUTION_ROLE_NAME') or aws_config.get('lambda_execution_role_name') general_config = self.config.get('general', {}) timeout_time = int(os.environ.get('LAMBDA_TIMEOUT_TIME') or general_config.get('lambda_timeout_time') or 10) log.debug('region=%s, role_name=%s' % (region, role_name)) log.debug('timeout_time=%s' % timeout_time) self.awslambda = self.setup_lambda(region, role_name, timeout_time, aws_access_key_id, aws_secret_access_key, subnet_ids=subnet_ids, security_group_ids=security_group_ids) self.kinesis = self.setup_kinesis(region, aws_access_key_id, aws_secret_access_key) self.cwlogs = self.setup_cloud_watch_logs(region, aws_access_key_id, aws_secret_access_key)
def setUp(self): raw = read_file('data/test_rec.xml') self.rec = WosRecord(raw)
def write_authors(): data = utils.read_file(source_file) dic = get_authors(data) for k in dic: print(k, dic[k])
def test_speg_parser_valid(grammar_file): content = read_file(grammar_file) speg = sp.SimplePegParser() ast = speg.parse(content) last_error = speg.get_last_error() if last_error: print(last_error) assert ast
def test_speg_parser_invalid(grammar_file): content = read_file(grammar_file) speg = sp.SimplePegParser() ast = speg.parse(content) assert not ast
def read_csv(file_path): return [line.strip().split(",") for line in read_file(file_path)]
def get_last_written_id(file_path): for line in read_file(file_path): pass return line