The following 50 code examples, extracted from open-source Python projects, illustrate how to use gzip.open().
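Before the project examples, here is a minimal sketch of the two most common gzip.open() patterns, writing and then reading compressed text; the file name example.txt.gz is only a placeholder for illustration.

import gzip

# Write text to a gzip-compressed file; 'wt' handles str -> bytes encoding.
with gzip.open('example.txt.gz', 'wt', encoding='utf-8') as f:
    f.write('hello gzip\n')

# Read it back; 'rt' yields str, while the default 'rb' yields bytes.
with gzip.open('example.txt.gz', 'rt', encoding='utf-8') as f:
    print(f.read())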
def uploadFile(current_user):
    format = "%Y-%m-%dT%H:%M:%S"
    now = datetime.datetime.utcnow().strftime(format)
    try:
        file = request.files['file']
    except:
        file = None
    try:
        url = request.form['url']
    except:
        url = None
    if file and allowed_file(file.filename):
        filename = now + '_' + str(current_user) + '_' + file.filename
        filename = secure_filename(filename)
        file.save(os.path.join(app.config['UPLOAD_FOLDER'], filename))
        file_uploaded = True
    elif url:
        file = urllib.urlopen(url)
        filename = url.split('/')[-1]
        filename = now + '_' + str(current_user) + '_' + filename
        filename = secure_filename(filename)
        if file and allowed_file(filename):
            open(os.path.join(app.config['UPLOAD_FOLDER'], filename), 'wb').write(file.read())
            file_uploaded = True
    else:
        filename = None
        file_uploaded = False
    return file_uploaded, filename
def extract_images(filename):
    """Extract the images into a 4D uint8 numpy array [index, y, x, depth]."""
    print('Extracting', filename)
    with gzip.open(filename) as bytestream:
        magic = _read32(bytestream)
        if magic != 2051:
            raise ValueError(
                'Invalid magic number %d in MNIST image file: %s' %
                (magic, filename))
        num_images = _read32(bytestream)
        rows = _read32(bytestream)
        cols = _read32(bytestream)
        buf = bytestream.read(rows * cols * num_images)
        data = numpy.frombuffer(buf, dtype=numpy.uint8)
        data = data.reshape(num_images, rows, cols, 1)
        return data
def lcdict_to_pickle(lcdict, outfile=None):
    '''This just writes the lcdict to a pickle.

    If outfile is None, then will try to get the name from the
    lcdict['objectid'] and write to <objectid>-hptxtlc.pkl. If that fails,
    will write to a file named hptxtlc.pkl'.

    '''

    if not outfile and lcdict['objectid']:
        outfile = '%s-hplc.pkl' % lcdict['objectid']
    elif not outfile and not lcdict['objectid']:
        outfile = 'hplc.pkl'

    with open(outfile, 'wb') as outfd:
        pickle.dump(lcdict, outfd, protocol=pickle.HIGHEST_PROTOCOL)

    if os.path.exists(outfile):
        LOGINFO('lcdict for object: %s -> %s OK' %
                (lcdict['objectid'], outfile))
        return outfile
    else:
        LOGERROR('could not make a pickle for this lcdict!')
        return None
def setUp(self):
    tempFile = tempfile.NamedTemporaryFile()
    self.fileServerDir = tempFile.name
    tempFile.close()
    os.mkdir(self.fileServerDir)
    os.environ['PYUPDATER_FILESERVER_DIR'] = self.fileServerDir
    privateKey = ed25519.SigningKey(PRIVATE_KEY.encode('utf-8'),
                                    encoding='base64')
    signature = privateKey.sign(six.b(json.dumps(VERSIONS, sort_keys=True)),
                                encoding='base64').decode()
    VERSIONS['signature'] = signature
    keysFilePath = os.path.join(self.fileServerDir, 'keys.gz')
    with gzip.open(keysFilePath, 'wb') as keysFile:
        keysFile.write(json.dumps(KEYS, sort_keys=True))
    versionsFilePath = os.path.join(self.fileServerDir, 'versions.gz')
    with gzip.open(versionsFilePath, 'wb') as versionsFile:
        versionsFile.write(json.dumps(VERSIONS, sort_keys=True))
    os.environ['WXUPDATEDEMO_TESTING'] = 'True'
    from wxupdatedemo.config import CLIENT_CONFIG
    self.clientConfig = CLIENT_CONFIG
    self.clientConfig.PUBLIC_KEY = PUBLIC_KEY
def setUp(self):
    tempFile = tempfile.NamedTemporaryFile()
    self.fileServerDir = tempFile.name
    tempFile.close()
    os.mkdir(self.fileServerDir)
    os.environ['PYUPDATER_FILESERVER_DIR'] = self.fileServerDir
    privateKey = ed25519.SigningKey(PRIVATE_KEY.encode('utf-8'),
                                    encoding='base64')
    signature = privateKey.sign(six.b(json.dumps(VERSIONS, sort_keys=True)),
                                encoding='base64').decode()
    VERSIONS['signature'] = signature
    keysFilePath = os.path.join(self.fileServerDir, 'keys.gz')
    with gzip.open(keysFilePath, 'wb') as keysFile:
        keysFile.write(json.dumps(KEYS, sort_keys=True))
    versionsFilePath = os.path.join(self.fileServerDir, 'versions.gz')
    with gzip.open(versionsFilePath, 'wb') as versionsFile:
        versionsFile.write(json.dumps(VERSIONS, sort_keys=True))
    os.environ['WXUPDATEDEMO_TESTING'] = 'True'
    from wxupdatedemo.config import CLIENT_CONFIG
    self.clientConfig = CLIENT_CONFIG
    self.clientConfig.PUBLIC_KEY = PUBLIC_KEY
    self.clientConfig.APP_NAME = APP_NAME
def main():
    args = get_args()
    logging.basicConfig(
        format='%(asctime)s %(message)s',
        filename=os.path.join(args.outdir, "NanoQC.log"),
        level=logging.INFO)
    logging.info("NanoQC started.")
    sizeRange = length_histogram(
        fqin=gzip.open(args.fastq, 'rt'),
        name=os.path.join(args.outdir, "SequenceLengthDistribution.png"))
    fq = get_bin(gzip.open(args.fastq, 'rt'), sizeRange)
    logging.info("Using {} reads for plotting".format(len(fq)))
    fqbin = [dat[0] for dat in fq]
    qualbin = [dat[1] for dat in fq]
    logging.info("Creating plots...")
    per_base_sequence_content_and_quality(fqbin, qualbin, args.outdir, args.format)
    logging.info("per base sequence content and quality completed.")
    logging.info("Finished!")
def __init__(self, source, source_dict, batch_size=128, maxlen=100,
             minlen=0, n_words_source=-1):
    if source.endswith('.gz'):
        self.source = gzip.open(source, 'r')
    else:
        self.source = open(source, 'r')
    self.source_dict = {'1': 1, '0': 0, 0: 3}
    self.batch_size = batch_size
    self.maxlen = maxlen
    self.minlen = 10
    self.n_words_source = n_words_source
    self.end_of_data = False
def export(metadata, start, end, container_image_pattern):
    queries = []
    metadata["start"] = start.isoformat() + "Z"
    metadata["end"] = end.isoformat() + "Z"
    metadata["services"] = []
    ts = datetime.utcnow().strftime("%Y%m%d%H%M%S-")
    path = os.path.join(metadata["metrics_export"], ts + metadata["measurement_name"])
    if not os.path.isdir(path):
        os.makedirs(path)
    for app in APPS:
        metadata["services"].append(dump_app(app, path, start, end, container_image_pattern))
    with open(os.path.join(path, "metadata.json"), "w+") as f:
        json.dump(metadata, f, cls=Encoder, sort_keys=True, indent=4)
        f.flush()
def check_fastq(fastq):
    # Check if fastq is readable
    if not os.access(fastq, os.R_OK):
        martian.exit("Do not have file read permission for FASTQ file: %s" % fastq)

    # Check if fastq is gzipped
    is_gzip_fastq = True
    try:
        with gzip.open(fastq) as f:
            f.read(1)
    except:
        is_gzip_fastq = False

    if is_gzip_fastq and not fastq.endswith(cr_constants.GZIP_SUFFIX):
        martian.exit("Input FASTQ file is gzipped but filename does not have %s suffix: %s" % (fastq, cr_constants.GZIP_SUFFIX))

    if not is_gzip_fastq and fastq.endswith(cr_constants.GZIP_SUFFIX):
        martian.exit("Input FASTQ file is not gzipped but filename has %s suffix: %s" % (fastq, cr_constants.GZIP_SUFFIX))
def get_run_data(fn):
    """ Parse flowcell + lane from the first FASTQ record.
    NOTE: we don't check whether there are multiple FC / lanes in this file.
    NOTE: taken from longranger/mro/stages/reads/setup_chunks
    """
    if fn[-2:] == 'gz':
        reader = gzip.open(fn)
    else:
        reader = open(fn, 'r')

    gen = read_generator_fastq(reader)

    try:
        (name, seq, qual) = gen.next()
        (flowcell, lane) = re.split(':', name)[2:4]
        return (flowcell, lane)
    except StopIteration:
        # empty fastq
        raise ValueError('Could not extract flowcell and lane from FASTQ file. File is empty: %s' % fn)
def load_primary_contigs(reference_path):
    '''Load set of primary contigs for variant and SV calling from reference_path.
    If no primary_contigs.txt file is specified, return all contigs.
    If reference_path is a known 10x reference genome and has no primary_contigs.txt,
    filter the known bad contigs
    '''

    if not reference_path is None and os.path.exists(get_primary_contigs(reference_path)):
        # If we have a primary_contigs.txt file, use it
        with open(get_primary_contigs(reference_path), 'r') as f:
            primary_contigs = set([line.strip() for line in f.readlines()])
    else:
        # Default is to include all contigs
        # Otherwise implement the old contig filters
        ref = open_reference(reference_path)
        primary_contigs = set(ref.keys())

        if is_tenx(reference_path):
            primary_contigs = set(chrom for chrom in primary_contigs
                                  if not ('random' in chrom or 'U' in chrom or 'hap' in chrom or chrom == 'hs37d5'))

    return primary_contigs
def load_fastq(filename):
    reads = []
    if get_compression_type(filename) == 'gz':
        open_func = gzip.open
    else:  # plain text
        open_func = open
    with open_func(filename, 'rb') as fastq:
        for line in fastq:
            stripped_line = line.strip()
            if len(stripped_line) == 0:
                continue
            if not stripped_line.startswith(b'@'):
                continue
            name = stripped_line[1:].split()[0]
            sequence = next(fastq).strip()
            _ = next(fastq)
            qualities = next(fastq).strip()
            reads.append((name, sequence, qualities))
    return reads
def get_compression_type(filename):
    """
    Attempts to guess the compression (if any) on a file using the first few bytes.
    http://stackoverflow.com/questions/13044562
    """
    magic_dict = {'gz': (b'\x1f', b'\x8b', b'\x08'),
                  'bz2': (b'\x42', b'\x5a', b'\x68'),
                  'zip': (b'\x50', b'\x4b', b'\x03', b'\x04')}
    max_len = max(len(x) for x in magic_dict)

    unknown_file = open(filename, 'rb')
    file_start = unknown_file.read(max_len)
    unknown_file.close()
    compression_type = 'plain'
    for file_type, magic_bytes in magic_dict.items():
        if file_start.startswith(magic_bytes):
            compression_type = file_type
    if compression_type == 'bz2':
        sys.exit('Error: cannot use bzip2 format - use gzip instead')
    if compression_type == 'zip':
        sys.exit('Error: cannot use zip format - use gzip instead')
    return compression_type
def _make_writer(self):
    """
    :return:
    """
    self._buffer = StringIO()
    self._bytes_written = 0
    now = datetime.now()
    self.fname = self.log_folder + '/' + now.strftime('%Y%m%d_%H%M%S_{}.json'.format(self.make_random(6)))
    self.fname = str(pathlib.Path(self.fname))
    self._out_fh = open(self.fname, 'w')
    self.write_pid()
    logging.warning("Writing to {} ({} bytes)".format(self._out_fh.name, self.max_bytes))
    # compress any old files still lying around
    for fname in glob(self.log_folder + "/*.json"):
        if fname != self.fname:
            self._compress(fname)
def test_save_svgz_filename():
    import gzip
    qr = segno.make_qr('test')
    f = tempfile.NamedTemporaryFile('wb', suffix='.svgz', delete=False)
    f.close()
    qr.save(f.name)
    f = open(f.name, mode='rb')
    expected = b'\x1f\x8b\x08'  # gzip magic number
    val = f.read(len(expected))
    f.close()
    f = gzip.open(f.name)
    try:
        content = f.read(6)
    finally:
        f.close()
    os.unlink(f.name)
    assert expected == val
    assert b'<?xml ' == content
def QuASAR_rep_wrapper(outdir,parameters,samplename1,samplename2,running_mode):
    script_comparison_file=outdir+'/scripts/QuASAR-Rep/'+samplename1+'.vs.'+samplename2+'/'+samplename1+'.vs.'+samplename2+'.QuASAR-Rep.sh'
    subp.check_output(['bash','-c','mkdir -p '+os.path.dirname(script_comparison_file)])
    script_comparison=open(script_comparison_file,'w')
    script_comparison.write("#!/bin/sh"+'\n')
    script_comparison.write('. '+bashrc_file+'\n')
    outpath=outdir+'/results/reproducibility/'+samplename1+'.vs.'+samplename2+'/QuASAR-Rep/'+samplename1+'.vs.'+samplename2+'.QuASAR-Rep.scores.txt'
    subp.check_output(['bash','-c','mkdir -p '+os.path.dirname(outpath)])
    quasar_data=outdir+'/data/forQuASAR'
    quasar_transform1=quasar_data+'/'+samplename1+'.quasar_transform'
    quasar_transform2=quasar_data+'/'+samplename2+'.quasar_transform'
    script_comparison.write('${mypython} '+os.path.dirname(os.path.dirname(os.path.abspath(os.path.dirname(os.path.realpath(__file__)))))+"/hifive/bin/find_quasar_replicate_score"+' '+quasar_transform1+' '+quasar_transform2+' '+outpath+'\n')
    script_comparison.write('${mypython} '+os.path.abspath(os.path.dirname(os.path.realpath(__file__)))+"/plot_quasar_scatter.py"+' '+quasar_transform1+' '+quasar_transform2+' '+outpath+'\n')
    #split the scores by chromosomes
    script_comparison.write('${mypython} '+os.path.abspath(os.path.dirname(os.path.realpath(__file__)))+"/quasar_split_by_chromosomes.py"+' '+outpath+'\n')
    script_comparison.close()
    run_script(script_comparison_file,running_mode)
def HiCSpector_wrapper(outdir,parameters,concise_analysis,samplename1,samplename2,chromo,running_mode,f1,f2,nodefile):
    script_comparison_file=outdir+'/scripts/HiC-spector/'+samplename1+'.'+samplename2+'/'+chromo+'.'+samplename1+'.'+samplename2+'.sh'
    subp.check_output(['bash','-c','mkdir -p '+os.path.dirname(script_comparison_file)])
    script_comparison=open(script_comparison_file,'w')
    script_comparison.write("#!/bin/sh"+'\n')
    script_comparison.write('. '+bashrc_file+'\n')
    if os.path.isfile(f1) and os.path.getsize(f1)>20:
        if os.path.isfile(f2) and os.path.getsize(f2)>20:
            outpath=outdir+'/results/reproducibility/'+samplename1+'.vs.'+samplename2+'/HiC-Spector/'+chromo+'.'+samplename1+'.vs.'+samplename2+'.scores.txt'
            subp.check_output(['bash','-c','mkdir -p '+os.path.dirname(outpath)])
            script_comparison.write("$mypython -W ignore "+os.path.abspath(os.path.dirname(os.path.dirname(os.path.realpath(__file__))))+"/reproducibility_analysis/hic-spector_wrapper.py --m1 "+f1+" --m2 "+f2+" --out "+outpath+".printout --node_file "+nodefile+" --num_evec "+parameters['HiC-Spector']['n']+"\n")
            script_comparison.write("cat "+outpath+".printout | tail -n1 | cut -f2 | awk '{print \""+samplename1+"\\t"+samplename2+"\\t\"$3}' > "+outpath+'\n')
            script_comparison.write("rm "+outpath+".printout"+'\n')
            script_comparison.close()
            run_script(script_comparison_file,running_mode)
def GenomeDISCO_wrapper(outdir,parameters,concise_analysis,samplename1,samplename2,chromo,running_mode,f1,f2,nodefile):
    script_comparison_file=outdir+'/scripts/GenomeDISCO/'+samplename1+'.'+samplename2+'/'+chromo+'.'+samplename1+'.'+samplename2+'.sh'
    subp.check_output(['bash','-c','mkdir -p '+os.path.dirname(script_comparison_file)])
    script_comparison=open(script_comparison_file,'w')
    script_comparison.write("#!/bin/sh"+'\n')
    script_comparison.write('. '+bashrc_file+'\n')
    if os.path.isfile(f1) and os.path.getsize(f1)>20:
        if os.path.isfile(f2) and os.path.getsize(f2)>20:
            concise_analysis_text=''
            if concise_analysis:
                concise_analysis_text=' --concise_analysis'
            #get the sample that goes for subsampling
            subsampling=parameters['GenomeDISCO']['subsampling']
            if parameters['GenomeDISCO']['subsampling']!='NA' and parameters['GenomeDISCO']['subsampling']!='lowest':
                subsampling_sample=parameters['GenomeDISCO']['subsampling']
                subsampling=outdir+'/data/edges/'+subsampling_sample+'/'+subsampling_sample+'.'+chromo+'.gz'
            outpath=outdir+'/results/reproducibility/'+samplename1+'.vs.'+samplename2+'/GenomeDISCO/'
            subp.check_output(['bash','-c','mkdir -p '+outpath])
            script_comparison.write("$mypython -W ignore "+os.path.abspath(os.path.dirname(os.path.dirname(os.path.realpath(__file__)))+"/genomedisco/compute_reproducibility.py")+" --m1 "+f1+" --m2 "+f2+" --m1name "+samplename1+" --m2name "+samplename2+" --node_file "+nodefile+" --outdir "+outpath+" --outpref "+chromo+" --m_subsample "+subsampling+" --approximation 10000000 --norm "+parameters['GenomeDISCO']['norm']+" --method RandomWalks "+" --tmin "+parameters['GenomeDISCO']['tmin']+" --tmax "+parameters['GenomeDISCO']['tmax']+concise_analysis_text+'\n')
            script_comparison.close()
            run_script(script_comparison_file,running_mode)
def construct_csr_matrix_from_data_and_nodes(f,nodes,blacklisted_nodes,remove_diag=True):
    print "GenomeDISCO | "+strftime("%c")+" | processing: Loading interaction data from "+f

    total_nodes=len(nodes.keys())
    i=[]
    j=[]
    v=[]
    #print strftime("%c")
    c=0
    for line in gzip.open(f):
        items=line.strip().split('\t')
        n1,n2,val=nodes[items[0]]['idx'],nodes[items[1]]['idx'],float(items[2])
        i.append(n1)
        j.append(n2)
        v.append(val)
        c+=1
    csr_m=csr_matrix( (v,(i,j)), shape=(total_nodes,total_nodes),dtype=float)
    if remove_diag:
        csr_m.setdiag(0)
    return filter_nodes(csr_m,blacklisted_nodes)
def dump_to_csv(self, output_csv, input_fields, write_header=True, top_level=False,
                mode='a', encoding='utf-8', compression=None):
    if compression == 'bz2':
        mode = binary_mode(mode)
        filehandle = bz2.open(output_csv, mode)
    elif compression == 'gzip':
        mode = binary_mode(mode)
        filehandle = gzip.open(output_csv, mode)
    else:
        filehandle = open(output_csv, mode)

    writer = csv.writer(filehandle)
    if write_header:
        writer.writerow(input_fields)

    tweet_parser = TweetParser()
    for tweet in self.get_iterator():
        if top_level:
            ret = list(zip(input_fields, [tweet.get(field) for field in input_fields]))
        else:
            ret = tweet_parser.parse_columns_from_tweet(tweet, input_fields)
        ret_values = [col_val[1] for col_val in ret]
        writer.writerow(ret_values)
    filehandle.close()
def get_iterator(self):
    tweet_parser = TweetParser()
    if self.compression == 'bz2':
        self.mode = binary_mode(self.mode)
        csv_handle = bz2.open(self.filepath, self.mode, encoding=self.encoding)
    elif self.compression == 'gzip':
        self.mode = binary_mode(self.mode)
        csv_handle = gzip.open(self.filepath, self.mode, encoding=self.encoding)
    else:
        csv_handle = open(self.filepath, self.mode, encoding=self.encoding)

    for count, tweet in enumerate(csv.DictReader(csv_handle)):
        if self.limit < count+1 and self.limit != 0:
            csv_handle.close()
            return
        elif tweet_parser.tweet_passes_filter(self.filter, tweet) \
                and tweet_parser.tweet_passes_custom_filter_list(self.custom_filters, tweet):
            if self.should_strip:
                yield tweet_parser.strip_tweet(self.keep_fields, tweet)
            else:
                yield dict(tweet)
    csv_handle.close()
def setup(self, config):
    """
    Load name model (word list) and compile regexes for stop characters.

    :param config: Configuration object.
    :type config: ``dict``
    """
    reference_model = os.path.join(
        config[helper.CODE_ROOT], config[helper.NAME_MODEL])

    self.stopper = regex.compile(('(%s)' % '|'.join([
        'and', 'or', 'og', 'eller', r'\?', '&', '<', '>', '@', ':', ';',
        '/', r'\(', r'\)', 'i', 'of', 'from', 'to', r'\n', '!'])),
        regex.I | regex.MULTILINE)
    self.semistop = regex.compile(
        ('(%s)' % '|'.join([','])), regex.I | regex.MULTILINE)
    self.size_probability = [0.000, 0.000, 0.435, 0.489, 0.472, 0.004, 0.000]
    self.threshold = 0.25
    self.candidates = defaultdict(int)

    with gzip.open(reference_model, 'rb') as inp:
        self.model = json.loads(inp.read().decode('utf-8'))

    self.tokenizer = regex.compile(r'\w{2,20}')
def next_batch(self, batch_size):
    assert self.train_mode or self.validation_mode, "Please set mode, train, validation or test. e.g. DataLoad.train()"
    idx_next_batch = [(self.current_idx + i) % len(self.p_imgs) for i in range(self.batch_size)]
    patient_img_next_batch = [self.p_imgs[idx] for idx in idx_next_batch]
    batch_image = []
    batch_mask = []
    for image in patient_img_next_batch:
        fi = gzip.open(self.data_path + image, 'rb')
        img = pickle.load(fi)
        img = np.expand_dims(img, axis=2)
        batch_image.append(img)
        fi.close()
        fm = gzip.open(self.mask_path + image, 'rb')
        mask = pickle.load(fm)
        fm.close()
        mask_binary_class = np.zeros([mask.shape[0], mask.shape[1], 2])
        mask_binary_class[:,:,0][mask == 0] = 1
        mask_binary_class[:,:,1][mask == 1] = 1
        batch_mask.append(mask_binary_class)
    self.current_idx = (self.current_idx + batch_size) % len(self.p_imgs)
    batched_image = np.stack(batch_image)
    batched_mask = np.stack(batch_mask)
    return batched_image, batched_mask
def read_fakelc(fakelcfile):
    '''
    This just reads a pickled fake LC.

    '''

    try:
        with open(fakelcfile, 'rb') as infd:
            lcdict = pickle.load(infd)
    except UnicodeDecodeError:
        with open(fakelcfile, 'rb') as infd:
            lcdict = pickle.load(infd, encoding='latin1')

    return lcdict


#######################
## UTILITY FUNCTIONS ##
#######################
def read_pklc(lcfile):
    '''
    This just reads a pickle.

    '''

    try:
        with open(lcfile, 'rb') as infd:
            lcdict = pickle.load(infd)
    except UnicodeDecodeError:
        with open(lcfile, 'rb') as infd:
            lcdict = pickle.load(infd, encoding='latin1')

    return lcdict


# these translate filter operators given as strings to Python operators
def writetoHTML(html_file, defaultInfo):
    html_handle = open(html_file, 'w')
    current_dir = os.path.dirname(__file__)
    with open(current_dir + '/lib/template.html') as report:
        for line in report:
            line = line.strip()
            print(line, file=html_handle)
            try:
                start_index = line.index("^^")
                stop_index = line.index("$$")
                if (line[start_index+2: stop_index] == 'defaultInfo'):
                    print(defaultInfo, file=html_handle)
                else:
                    file_path = current_dir + '/lib' + line[start_index+2: stop_index]
                    with open(file_path) as fh:
                        for subline in fh:
                            subline = subline.strip()
                            print(subline, file=html_handle)
            except ValueError:
                pass
    html_handle.close()
    print("HTML report successfully saved to " + html_file)
def get_targetids(filter_seq_ids, target_seq_ids):
    target_ids = univset()
    if filter_seq_ids:
        target_ids = univset()
        filter_ids = []
        with open(filter_seq_ids) as fh:
            for line in fh:
                line = line.strip()
                line = line.lstrip('>')
                filter_ids.append(line)
        target_ids = target_ids - set(filter_ids)
    elif target_seq_ids:
        target_ids = []
        with open(target_seq_ids) as fh:
            for line in fh:
                line = line.strip()
                line = line.lstrip('>')
                target_ids.append(line)
        target_ids = set(target_ids)
    return target_ids
def copy(self):
    "Copy the file to the local directory"
    fpi= open(self.filename, "rb")
    fpo_filename= os.path.join(
        self.destination, os.path.basename(self.filename))
    try:
        fpo= open(fpo_filename, "r+b")
    except IOError, exc:
        if exc.errno == errno.ENOENT:
            fpo= open(fpo_filename, "wb")
        else:
            raise
    try:
        self.phase_copy(fpi, fpo, self.phase1, self.phase2)
        self.phase_copy(fpi, fpo, self.phase2, self.phase3)
    finally:
        self.record_state()
def load(batch_size, test_batch_size, n_labelled=None):
    filepath = '/tmp/mnist.pkl.gz'
    url = 'http://www.iro.umontreal.ca/~lisa/deep/data/mnist/mnist.pkl.gz'

    if not os.path.isfile(filepath):
        print "Couldn't find MNIST dataset in /tmp, downloading..."
        urllib.urlretrieve(url, filepath)

    with gzip.open('/tmp/mnist.pkl.gz', 'rb') as f:
        train_data, dev_data, test_data = pickle.load(f)

    return (
        mnist_generator(train_data, batch_size, n_labelled),
        mnist_generator(dev_data, test_batch_size, n_labelled),
        mnist_generator(test_data, test_batch_size, n_labelled)
    )
def extract_images(filename):
    """Extract the images into a 4D uint8 numpy array [index, y, x, depth]."""
    log = logger.get()
    log.info('Extracting {}'.format(filename))
    with gzip.open(filename) as bytestream:
        magic = _read32(bytestream)
        if magic != 2051:
            raise ValueError(
                'Invalid magic number %d in MNIST image file: %s' %
                (magic, filename))
        num_images = _read32(bytestream)
        rows = _read32(bytestream)
        cols = _read32(bytestream)
        buf = bytestream.read(rows * cols * num_images)
        data = numpy.frombuffer(buf, dtype=numpy.uint8)
        data = data.reshape(num_images, rows, cols, 1)
        return data
def extract_labels(filename, one_hot=False):
    """Extract the labels into a 1D uint8 numpy array [index]."""
    log = logger.get()
    log.info('Extracting {}'.format(filename))
    with gzip.open(filename) as bytestream:
        magic = _read32(bytestream)
        if magic != 2049:
            raise ValueError(
                'Invalid magic number %d in MNIST label file: %s' %
                (magic, filename))
        num_items = _read32(bytestream)
        buf = bytestream.read(num_items)
        labels = numpy.frombuffer(buf, dtype=numpy.uint8)
        if one_hot:
            return dense_to_one_hot(labels)
        return labels
def download_annotated(request, individual_id):
    individual = get_object_or_404(Individual, pk=individual_id)
    filepath = os.path.dirname(str(individual.vcf_file.name))
    filename = os.path.basename(str(individual.vcf_file.name))

    # path = settings.MEDIA_ROOT
    # if filename.endswith('vcf.zip'):
    #     basename = filename.split('.vcf.zip')[0]
    # else:
    basename = filename.split('.vcf')[0]

    fullpath = '%s/annotation.final.vcf.zip' % (filepath)

    vcffile = open(fullpath, 'rb')
    response = HttpResponse(vcffile, content_type='application/x-zip-compressed')
    # response['Content-Encoding'] = 'gzip'
    response['Content-Disposition'] = 'attachment; filename=%s.annotated.mendelmd.vcf.zip' % basename
    response['Content-Length'] = os.path.getsize(fullpath)
    return response
def request_file(link, outfile, force_rerun_flag=False):
    """Download a file given a URL if the outfile does not exist already.

    Args:
        link (str): Link to download file.
        outfile (str): Path to output file, will make a new file if it does not exist.
            Will not download if it does exist, unless force_rerun_flag is True.
        force_rerun_flag (bool): Flag to force re-downloading of the file if it exists already.

    Returns:
        str: Path to downloaded file.

    """
    if force_rerun(flag=force_rerun_flag, outfile=outfile):
        req = requests.get(link)
        if req.status_code == 200:
            with open(outfile, 'w') as f:
                f.write(req.text)
            log.debug('Loaded and saved {} to {}'.format(link, outfile))
        else:
            log.error('{}: request error {}'.format(link, req.status_code))
    return outfile
def write_torque_script(command, outfile, walltime, queue, name, out, err, print_exec=True):

    with open(outfile, 'w') as script:
        script.write('#PBS -l walltime={}\n'.format(walltime))
        script.write('#PBS -q regular\n')
        script.write('#PBS -N {}\n'.format(name))
        script.write('#PBS -o {}.o\n'.format(out))
        script.write('#PBS -e {}.e\n'.format(err))
        script.write('cd ${PBS_O_WORKDIR}\n')
        script.write(command)

    os.chmod(outfile, 0o755)

    if print_exec:
        print('qsub {};'.format(outfile))

    return outfile
def save(self, filename):
    """
    Save the state of this network to a pickle file on disk.

    :param filename: Save the parameters of this network to a pickle file at the
        named path. If this name ends in ".gz" then the output will automatically
        be gzipped; otherwise the output will be a "raw" pickle.

    :return: None
    """
    state = dict([('class', self.__class__.__name__), ('network', self.__str__())])
    for layer in self.layers:
        key = '{}-values'.format(layer.layerNum)
        state[key] = [p.get_value() for p in layer.params]
    opener = gzip.open if filename.lower().endswith('.gz') else open
    handle = opener(filename, 'wb')
    cPickle.dump(state, handle, -1)
    handle.close()
    print 'Saved model parameter to {}'.format(filename)
def append(self, text):
    try:
        from Naked.toolshed.system import file_exists
        if not file_exists(self.filepath):  # confirm that file exists, if not raise IOError (assuming that developer expected existing file if using append)
            raise IOError("The file specified for the text append does not exist (Naked.toolshed.file.py:append).")
        with open(self.filepath, 'a') as appender:
            appender.write(text)
    except UnicodeEncodeError as ue:
        self.append_utf8(text)  # try writing as utf-8
    except Exception as e:
        if DEBUG_FLAG:
            sys.stderr.write("Naked Framework Error: Unable to append text to the file with the append() method (Naked.toolshed.file.py).")
        raise e

#------------------------------------------------------------------------------
# [ append_utf8 method ]
#   Text writer that appends text to existing file with utf-8 encoding
#   Tests: test_IO.py :: test_file_utf8_readwrite_append
#------------------------------------------------------------------------------
def append_utf8(self, text):
    try:
        from Naked.toolshed.system import file_exists
        if not file_exists(self.filepath):
            raise IOError("The file specified for the text append does not exist (Naked.toolshed.file.py:append_utf8).")
        import codecs
        import unicodedata
        norm_text = unicodedata.normalize('NFKD', text)  # NKFD normalization of the unicode data before write
        with codecs.open(self.filepath, mode='a', encoding="utf_8") as appender:
            appender.write(norm_text)
    except Exception as e:
        if DEBUG_FLAG:
            sys.stderr.write("Naked Framework Error: Unable to append text to the file with the append_utf8 method (Naked.toolshed.file.py).")
        raise e

#------------------------------------------------------------------------------
# [ gzip method (writer) ]
#   writes data to gzip compressed file
#   Note: adds .gz extension to filename if user did not specify it in the FileWriter class constructor
#   Note: uses compresslevel = 6 as default to balance speed and compression level
#         (which in general is not significantly less than 9)
#   Tests: test_IO.py :: test_file_gzip_ascii_readwrite, test_file_gzip_utf8_readwrite,
#          test_file_gzip_utf8_readwrite_explicit_decode
#------------------------------------------------------------------------------
def gzip(self, text, compression_level=6):
    try:
        import gzip
        if not self.filepath.endswith(".gz"):
            self.filepath = self.filepath + ".gz"
        with gzip.open(self.filepath, 'wb', compresslevel=compression_level) as gzip_writer:
            gzip_writer.write(text)
    except UnicodeEncodeError as ue:
        import unicodedata
        norm_text = unicodedata.normalize('NFKD', text)  # NKFD normalization of the unicode data before write
        import codecs
        binary_data = codecs.encode(norm_text, "utf_8")
        with gzip.open(self.filepath, 'wb', compresslevel=compression_level) as gzip_writer:
            gzip_writer.write(binary_data)
    except Exception as e:
        if DEBUG_FLAG:
            sys.stderr.write("Naked Framework Error: unable to gzip compress the file with the gzip method (Naked.toolshed.file.py).")
        raise e

#------------------------------------------------------------------------------
# [ write method ]
#   Universal text file writer that writes by system default or utf-8 encoded
#   unicode if it throws UnicodeEncodeError
#   Tests: test_IO.py :: test_file_ascii_readwrite, test_file_ascii_readwrite_missing_file,
#          test_file_utf8_write_raises_unicodeerror
#------------------------------------------------------------------------------
def write(self, text):
    try:
        with open(self.filepath, 'wt') as writer:
            writer.write(text)
    except UnicodeEncodeError as ue:
        self.write_utf8(text)  # attempt to write with utf-8 encoding
    except Exception as e:
        if DEBUG_FLAG:
            sys.stderr.write("Naked Framework Error: Unable to write to requested file with the write() method (Naked.toolshed.file.py).")
        raise e

#------------------------------------------------------------------------------
# [ write_as method ]
#   text file writer that uses developer specified text encoding
#   Tests: test_IO.py :: test_file_utf8_readas_writeas
#------------------------------------------------------------------------------
def write_as(self, text, the_encoding=""):
    try:
        if the_encoding == "":  # if the developer did not include the encoding type, raise an exception
            raise RuntimeError("The text encoding was not specified as an argument to the write_as() method (Naked.toolshed.file.py:write_as).")
        import codecs
        with codecs.open(self.filepath, encoding=the_encoding, mode='w') as f:
            f.write(text)
    except Exception as e:
        if DEBUG_FLAG:
            sys.stderr.write("Naked Framework Error: unable to write file with the specified encoding using the write_as() method (Naked.toolshed.file.py).")
        raise e

#------------------------------------------------------------------------------
# [ write_bin method ]
#   binary data file writer
#   Tests: test_IO.py :: test_file_bin_readwrite
#------------------------------------------------------------------------------
def safe_write(self, text):
    import os.path
    if not os.path.exists(self.filepath):  # if the file does not exist, then can write
        try:
            with open(self.filepath, 'wt') as writer:
                writer.write(text)
            return True
        except UnicodeEncodeError as ue:
            self.write_utf8(text)
            return True
        except Exception as e:
            if DEBUG_FLAG:
                sys.stderr.write("Naked Framework Error: Unable to write to requested file with the safe_write() method (Naked.toolshed.file.py).")
            raise e
    else:
        return False  # if file exists, do not write and return False

#------------------------------------------------------------------------------
# [ safe_write_bin method ]
#   Binary data file writer that will NOT overwrite existing file at the requested filepath
#   returns boolean indicator for success of write based upon test for existence of file
#   (False = write failed because file exists)
#------------------------------------------------------------------------------
def safe_write_bin(self, file_data):
    try:
        import os.path
        if not os.path.exists(self.filepath):
            with open(self.filepath, 'wb') as writer:
                writer.write(file_data)
            return True
        else:
            return False
    except Exception as e:
        if DEBUG_FLAG:
            sys.stderr.write("Naked Framework Error: Unable to write to requested file with the safe_write_bin() method (Naked.toolshed.file.py).")
        raise e

#------------------------------------------------------------------------------
# [ write_utf8 method ]
#   Text file writer with explicit UTF-8 text encoding
#   uses filepath from class constructor
#   requires text to be passed as a method parameter
#   Tests: test_IO.py :: test_file_utf8_readwrite, test_file_utf8_readwrite_raises_unicodeerror
#------------------------------------------------------------------------------
def write_utf8(self, text):
    try:
        import codecs
        f = codecs.open(self.filepath, encoding='utf_8', mode='w')
    except IOError as ioe:
        if DEBUG_FLAG:
            sys.stderr.write("Naked Framework Error: Unable to open file for write with the write_utf8() method (Naked.toolshed.file.py).")
        raise ioe
    try:
        import unicodedata
        norm_text = unicodedata.normalize('NFKD', text)  # NKFD normalization of the unicode data before write
        f.write(norm_text)
    except Exception as e:
        if DEBUG_FLAG:
            sys.stderr.write("Naked Framework Error: Unable to write UTF-8 encoded text to file with the write_utf8() method (Naked.toolshed.file.py).")
        raise e
    finally:
        f.close()

#------------------------------------------------------------------------------
# [ FileReader class ]
#   reads data from local files
#   filename assigned in constructor (inherited from IO class interface)
#------------------------------------------------------------------------------
def read_bin(self):
    try:
        with open(self.filepath, 'rb') as bin_reader:
            data = bin_reader.read()
            return data
    except Exception as e:
        if DEBUG_FLAG:
            sys.stderr.write("Naked Framework Error: Unable to read the binary data from the file with the read_bin method (Naked.toolshed.file.py).")
        raise e

#------------------------------------------------------------------------------
# [ read_as method ] (string with developer specified text encoding)
#   Text file reader with developer specified text encoding
#   returns file contents in developer specified text encoding
#   Tests: test_IO.py :: test_file_utf8_readas_writeas, test_file_readas_missing_file
#------------------------------------------------------------------------------
def read_as(self, the_encoding):
    try:
        if the_encoding == "":
            raise RuntimeError("The text file encoding was not specified as an argument to the read_as method (Naked.toolshed.file.py:read_as).")
        import codecs
        with codecs.open(self.filepath, encoding=the_encoding, mode='r') as f:
            data = f.read()
        return data
    except Exception as e:
        if DEBUG_FLAG:
            sys.stderr.write("Naked Framework Error: Unable to read the file with the developer specified text encoding with the read_as method (Naked.toolshed.file.py).")
        raise e

#------------------------------------------------------------------------------
# [ readlines method ] (list of strings)
#   Read text from file line by line, uses utf8 encoding by default
#   returns list of utf8 encoded file lines as strings
#   Tests: test_IO.py :: test_file_readlines, test_file_readlines_missing_file
#------------------------------------------------------------------------------
def readlines_utf8(self):
    try:
        import codecs
        with codecs.open(self.filepath, encoding='utf-8', mode='r') as uni_reader:
            modified_text_list = []
            for line in uni_reader:
                import unicodedata
                norm_line = unicodedata.normalize('NFKD', line)  # NKFD normalization of the unicode data before use
                modified_text_list.append(norm_line)
            return modified_text_list
    except Exception as e:
        if DEBUG_FLAG:
            sys.stderr.write("Naked Framework Error: unable to read lines in the unicode file with the readlines_utf8 method (Naked.toolshed.file.py)")
        raise e

#------------------------------------------------------------------------------
# [ read_gzip ] (byte string)
#   reads data from a gzip compressed file
#   returns the decompressed binary data from the file
#   Note: if decompressing unicode file, set encoding="utf-8"
#   Tests: test_IO.py :: test_file_gzip_ascii_readwrite, test_file_gzip_utf8_readwrite,
#          test_file_read_gzip_missing_file
#------------------------------------------------------------------------------
def read_utf8(self):
    try:
        import codecs
        f = codecs.open(self.filepath, encoding='utf_8', mode='r')
    except IOError as ioe:
        if DEBUG_FLAG:
            sys.stderr.write("Naked Framework Error: Unable to open file for read with read_utf8() method (Naked.toolshed.file.py).")
        raise ioe
    try:
        textstring = f.read()
        import unicodedata
        norm_text = unicodedata.normalize('NFKD', textstring)  # NKFD normalization of the unicode data before returns
        return norm_text
    except Exception as e:
        if DEBUG_FLAG:
            sys.stderr.write("Naked Framework Error: Unable to read the file with UTF-8 encoding using the read_utf8() method (Naked.toolshed.file.py).")
        raise e
    finally:
        f.close()