我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用os.path.getsize()。
def dirsize(src):
    """
    Return the combined size, in bytes, of the regular files found
    directly inside the directory ``src``.

    Sub-directories are not descended into; only entries for which
    ``isfile`` is true are counted.

    Returns 0 when ``src`` is not a directory, and None when the
    directory cannot be listed or one of its files cannot be sized.
    """
    from os.path import join

    if not isdir(src):
        # Nothing to measure
        return 0

    try:
        # Build full paths instead of changing into ``src``: the result is
        # the same, but the process-wide working directory is never touched.
        entries = (join(src, name) for name in listdir(src))
        return sum(getsize(entry) for entry in entries if isfile(entry))

    except (OSError, IOError):
        return None
def open(self):
    """
    Prepare the internal structure of the reader.

    NB : this has to be called before any data is extracted from a file.
    """
    # Release a handle left over from a previous call, if any.
    if self.file:
        self.file.close()

    try:
        self.file = open(self.path, 'rb')
    except Exception as e:
        raise Exception("python couldn't open file %s : %s" % (self.path, e))

    # File-system metadata, looked up through the name of the open handle.
    opened_name = self.file.name
    self.file_size = path.getsize(opened_name)
    self.creation_date = datetime.fromtimestamp(path.getctime(opened_name))
    self.modification_date = datetime.fromtimestamp(path.getmtime(opened_name))

    # Format-specific helpers supplied by the object itself.
    self.nomenclature = self.get_nomenclature()
    self.factory = self.get_factory()
    self.layout = self.create_layout()
def __mmap_ncs_packet_headers(self, filename):
    """
    Memory map a Neuralynx .ncs file for extraction of the data packet
    headers.

    The first 16384 bytes of the file are skipped and the remainder is
    viewed as rows of 261 little-endian uint32 words. Reading a standard
    dtype improves speed, but the timestamps then have to be rebuilt from
    two 32-bit words (columns 0 and 1, low word first).

    Returns a tuple ``(timestamps, header_u4)`` where ``header_u4`` is
    columns 2-4 of every row, or None when the file is not larger than
    16384 bytes.
    """
    ncs_path = self.sessiondir + sep + filename
    filesize = getsize(ncs_path)  # in bytes
    if filesize <= 16384:
        return None

    # Floor division keeps the row count an integer; a float shape
    # (true division on Python 3) is rejected by np.memmap.
    n_records = (filesize - 16384) // 4 // 261
    data = np.memmap(ncs_path, dtype='<u4', shape=(n_records, 261),
                     mode='r', offset=16384)

    ts = data[:, 0:2]
    multi = np.repeat(np.array([1, 2 ** 32], ndmin=2), len(data), axis=0)
    # timestamps = data[:,0] + (data[:,1] * 2**32)
    timestamps = np.sum(ts * multi, axis=1)
    header_u4 = data[:, 2:5]

    return timestamps, header_u4
def __mmap_ncs_packet_timestamps(self, filename):
    """
    Memory map a Neuralynx .ncs file for extraction of the data packet
    timestamps.

    Everything after the first 16384 bytes is viewed as rows of 261
    little-endian uint32 words; each timestamp is rebuilt from the first
    two words of a row (low word first).

    Returns the array of timestamps, or None when the file is not larger
    than 16384 bytes.
    """
    ncs_path = self.sessiondir + sep + filename
    n_bytes = getsize(ncs_path)  # in bytes
    if n_bytes <= 16384:
        return None

    n_rows = (n_bytes - 16384) // (4 * 261)
    packets = np.memmap(ncs_path, dtype='<u4', shape=(n_rows, 261),
                        mode='r', offset=16384)

    words = packets[:, 0:2]
    weights = np.repeat(np.array([1, 2 ** 32], ndmin=2), len(packets), axis=0)
    # i.e. packets[:, 0] + packets[:, 1] * 2**32
    return np.sum(words * weights, axis=1)
def __mmap_nev_file(self, filename):
    """
    Memory map a Neuralynx .nev file.

    The first 16384 bytes are skipped; the rest is exposed as a
    structured array with one element per record, using the field layout
    declared below.

    Returns the memory map, or None when the file is not larger than
    16384 bytes.
    """
    nev_dtype = np.dtype([
        ('reserved', '<i2'),
        ('system_id', '<i2'),
        ('data_size', '<i2'),
        ('timestamp', '<u8'),
        ('event_id', '<i2'),
        ('ttl_input', '<i2'),
        ('crc_check', '<i2'),
        ('dummy1', '<i2'),
        ('dummy2', '<i2'),
        ('extra', '<i4', (8,)),
        # 'S128' is the same fixed-width bytes type as the old 'a128'
        # spelling, which NumPy 2 deprecates.
        ('event_string', 'S128'),
    ])

    nev_path = self.sessiondir + sep + filename
    if getsize(nev_path) > 16384:
        return np.memmap(nev_path, dtype=nev_dtype, mode='r', offset=16384)
    else:
        return None
def rinexobs(obsfn, writeh5=None, maxtimes=None):
    """
    Load RINEX observation data from a raw text file or from HDF5.

    Parameters:
        obsfn: path of the input file; ``~`` is expanded. An extension
            ending in "o"/"O" is read as raw RINEX text, ".h5" is read
            back with ``read_hdf`` under the key 'OBS'.
        writeh5: when true and the input was raw text, the parsed data is
            also saved next to the input as ``<stem>.h5``.
        maxtimes: accepted but not used by this function.

    Returns the loaded data object.

    Raises ValueError when the extension is neither of the two supported
    kinds (including a missing extension).
    """
    fn = expanduser(obsfn)
    stem, ext = splitext(fn)
    ext = ext.lower()

    # endswith() also copes with an empty extension, unlike ext[-1]
    if ext.endswith('o'):  # raw text file
        with open(fn, 'r') as f:
            t = time.time()
            lines = f.read().splitlines(True)
            lines.append('')
            header, version, headlines, obstimes, sats, svset = scan(lines)
            print('{} is a RINEX {} file, {} kB.'.format(obsfn, version, getsize(fn) / 1000.0))
            data = processBlocks(lines, header, obstimes, svset, headlines, sats)
            print("finished in {0:.2f} seconds".format(time.time() - t))
        # %% save to disk (optional)
        if writeh5:
            h5fn = stem + '.h5'
            print('saving OBS data to {}'.format(h5fn))
            data.to_hdf(h5fn, key='OBS', mode='a', complevel=6, append=False)
    elif ext == '.h5':
        data = read_hdf(fn, key='OBS')
        # The original message formatted an undefined name (``blocks``);
        # report the source file instead, which is always available.
        print('loaded OBS data from {}'.format(fn))
    else:
        raise ValueError('unsupported RINEX observation file extension: {!r}'.format(obsfn))

    return data


# this will scan the document for the header info and for the line on
# which each block starts
def test_write_gff_file(self, seqprop_with_i, tmpdir):
    """Write the features to a GFF file and check they are then tied to that file"""
    gff_name = 'test_seqprop_with_i_write_gff_file.gff'
    outpath = tmpdir.join(gff_name).strpath
    seqprop_with_i.write_gff_file(outfile=outpath, force_rerun=True)

    # The file must be on disk and non-empty
    assert op.exists(outpath)
    assert op.getsize(outpath) > 0

    # The path attributes must point at the file just written
    assert seqprop_with_i.feature_path == outpath
    assert seqprop_with_i.feature_file == gff_name
    assert seqprop_with_i.feature_dir == tmpdir

    # Assigning features directly must now be refused
    with pytest.raises(ValueError):
        seqprop_with_i.features = ['NOFEATURES']
def clear_cache(force = False):
    """
    Reset the icon cache folder when it has grown past 5 MB, or
    unconditionally when ``force`` is true, and make sure the folder
    exists afterwards.
    """
    from os.path import getsize, join, isfile, exists
    from os import makedirs, listdir
    from sublime import cache_path
    from shutil import rmtree

    # Where the icons are cached
    icon_path = join(cache_path(), "GutterColor")

    # Upper bound on the space the cache may occupy
    limit = 5242880  # 5 MB

    if exists(icon_path):
        # Only plain files directly inside the folder are counted
        size = 0
        for entry in listdir(icon_path):
            candidate = join(icon_path, entry)
            if isfile(candidate):
                size += getsize(candidate)

        if force or (size > limit):
            rmtree(icon_path)

    # (Re)create the folder if it is missing or was just removed
    if not exists(icon_path):
        makedirs(icon_path)
def test_open(self):
    """Round-trip a file through the SSH store: put, read back, overwrite."""
    store = self.create_ssh_store()
    target_filename = 'sample_text_file1.txt'

    with open(self.sample_text_file1, 'rb') as source:
        length = store.put(target_filename, source)
    self.assertEqual(length, getsize(self.sample_text_file1))
    self.assertTrue(exists(join(self.temp_path, target_filename)))

    # Reading
    with store.open(target_filename, mode='rb') as stored_file:
        with open(self.sample_text_file1, mode='rb') as original_file:
            self.assertEqual(stored_file.read(), original_file.read())

    # Writing
    new_content = b'Some Binary Data'
    with store.open(target_filename, mode='wb') as stored_file:
        stored_file.write(new_content)

    with store.open(target_filename, mode='rb') as stored_file:
        self.assertEqual(stored_file.read(), new_content)
def test_open(self):
    """Round-trip a file through the file-system store: put, read back, overwrite."""
    store = FileSystemStore(self.temp_path, self.base_url)
    target_filename = 'test_open/sample_text_file1.txt'

    with open(self.sample_text_file1, 'rb') as source:
        length = store.put(target_filename, source)
    self.assertEqual(length, getsize(self.sample_text_file1))
    self.assertTrue(exists(join(self.temp_path, target_filename)))

    # Reading
    with store.open(target_filename, mode='rb') as stored_file:
        with open(self.sample_text_file1, mode='rb') as original_file:
            self.assertEqual(stored_file.read(), original_file.read())

    # Writing
    new_content = b'Some Binary Data'
    with store.open(target_filename, mode='wb') as stored_file:
        stored_file.write(new_content)

    with store.open(target_filename, mode='rb') as stored_file:
        self.assertEqual(stored_file.read(), new_content)
def addService(self, service):
    """
    Attach ``service`` as the source of this object and copy its
    description, creation time, name, channel, path, size and length
    into the corresponding attributes.
    """
    from os import path
    from enigma import eServiceCenter, iServiceInformation
    from ServiceReference import ServiceReference
    from time import localtime, time

    self.source = service
    info = eServiceCenter.getInstance().info(service)

    # Description falls back to "" when there is no info object or no text
    if info:
        sDescr = info.getInfoString(service, iServiceInformation.sDescription) or ""
    else:
        sDescr = ""
    self.DVBdescr = sDescr

    # NOTE(review): from here on ``info`` is dereferenced without the
    # guard used above - confirm it can never be None at this point.
    sTimeCreate = info.getInfo(service, iServiceInformation.sTimeCreate)
    if sTimeCreate > 1:
        self.timeCreate = localtime(sTimeCreate)

    serviceref = ServiceReference(info.getInfoString(service, iServiceInformation.sServiceref))

    # Name falls back to "Title" + description when unavailable or empty
    name = info.getName(service) if info else None
    if not name:
        name = "Title" + sDescr
    self.DVBname = name
    self.DVBchannel = serviceref.getServiceName()

    self.inputfile = service.getPath()
    self.filesize = path.getsize(self.inputfile)
    self.estimatedDiskspace = self.filesize
    self.length = info.getLength(service)
def shall_skip(module, opts):
    # type: (unicode, Any) -> bool
    """Tell whether the module file at path ``module`` should be skipped."""
    module_exists = path.exists(module)

    # a missing file is skipped unless implicit namespaces are in use
    if not (opts.implicit_namespaces or module_exists):
        return True

    # a file holding nothing (or just \n or \r\n) is skipped
    if module_exists and path.getsize(module) <= 2:
        return True

    # a "private" name is skipped unless private modules were requested
    filename = path.basename(module)
    is_private = filename != '__init__.py' and filename.startswith('_')
    return is_private and not opts.includeprivate
def get_sha1_by_slice(file_name, slice_size):
    """
    Get the SHA array expected by the Qcloud slice upload interface.

    The file is read in consecutive slices of at most ``slice_size``
    bytes. One entry is produced per slice; its "datasha" is the value
    returned by ``Sha1Hash.inner_digest()`` after that slice has been
    fed in, except for the last entry, whose "datasha" is replaced by the
    final ``hexdigest()``.

    :param file_name: local file path
    :param slice_size: slice size in bytes
    :return: sha array like
        [{"offset": 0, "datalen": 1024, "datasha": "aaa"}, {}, {}];
        an empty list for an empty file
    """
    from os import path

    with open(file_name, 'rb') as f:
        result = []
        file_size = path.getsize(file_name)
        sha1_obj = Sha1Hash()

        for current_offset in range(0, file_size, slice_size):
            data_length = min(slice_size, file_size - current_offset)
            sha1_obj.update(f.read(data_length))
            sha1_val = sha1_obj.inner_digest()
            result.append({"offset": current_offset,
                           "datalen": data_length,
                           "datasha": sha1_val})

        # An empty file produces no slices, so there is no last entry to
        # patch; indexing result[-1] unconditionally would raise IndexError.
        if result:
            result[-1]['datasha'] = sha1_obj.hexdigest()

        return result
def tqdm_open(filename, encoding='utf8'):
    """
    Open ``filename`` as text and yield a line iterator that reports read
    progress, in bytes, through a tqdm bar.

    The bar's total is the file size in bytes, so progress is measured on
    the underlying binary stream rather than on decoded characters (which
    under-count multi-byte text). Updates are batched in steps of at
    least 1 MiB; the remainder is reported once the iteration ends.

    NOTE(review): this generator yields exactly once inside a ``with``
    block, so it is presumably wrapped with ``contextlib.contextmanager``
    where it is defined - the decorator is not visible here.
    """
    total = getsize(filename)

    def wrapped_line_iterator(fd):
        with tqdm(total=total, unit="B", unit_scale=True,
                  desc=basename(filename), miniters=1) as pb:
            reported = 0
            for line in fd:
                # Bytes handed to the text layer so far
                consumed = fd.buffer.tell()
                if consumed - reported >= 1024 * 1024:
                    pb.update(consumed - reported)
                    reported = consumed
                yield line
            pb.update(fd.buffer.tell() - reported)

    with open(filename, encoding=encoding) as fd:
        yield wrapped_line_iterator(fd)
def hashsize(path):
    '''
    Build the 'hash size' integrity string for the file at `path`:
    its SHA-1 hex digest and its size in bytes, separated by one space.
    Used to check the integrity of downloads.

    Example of a result:
      'fbb498a1d3a3a47c8c1ad5425deb46b635fac2eb 2006'
    '''
    size = getsize(path)
    digest = sha1()
    with open(path, 'rb') as source:
        # feed the hash in 64k blocks so that big files never have
        # to fit in memory at once
        for block in iter(lambda: source.read(64*1024), b''):
            digest.update(block)
    return '%s %s' % (digest.hexdigest(), size)
def timeit(parser_class, file_name, message, **kwargs):
    """
    Parse one file from the ``test_inputs`` folder with a parser built
    from the ``rhapsody.pg`` grammar, printing the file size, the elapsed
    time and the resulting throughput.

    Extra keyword arguments are passed on to ``parser_class``.
    """
    print(message, 'File:', file_name)

    here = dirname(__file__)
    file_name = join(here, 'test_inputs', file_name)
    size_kb = getsize(file_name)/1000
    print('File size: {:.2f}'.format(size_kb), 'KB')

    grammar = Grammar.from_file(join(here, 'rhapsody.pg'))
    parser = parser_class(grammar, **kwargs)

    # Reading the file is part of the timed section
    t_start = time.time()
    with open(file_name) as f:
        parser.parse(f.read())
    elapsed = time.time() - t_start

    print('Elapsed time: {:.2f}'.format(elapsed), 'sec')
    print('Speed = {:.2f}'.format(size_kb/elapsed), 'KB/sec\n')
def judge_twice_url(self, save_path, save_type):
    '''
    Check whether the saved main.html looks like a redirect stub and,
    if so, download the page it points to.

    main.html is treated as a stub when either
        1. its title is "Page has moved", or
        2. the file is at most 3 KB (3072 bytes).

    NOTE(review): the original docstring was mis-encoded; this text is
    reconstructed from the code.
    '''
    page_path = pjoin(save_path, 'main.html')
    url_path = pjoin(save_path, 'url_file')

    with open(page_path, 'r') as f:
        content = f.read()

    looks_moved = (get_title(content) == 'Page has moved'
                   or getsize(page_path) <= 3072)
    if not looks_moved:
        return

    # The stored url is read here but not used further in this method
    with open(url_path, 'r') as f:
        url = f.read()

    redict_url = self.get_twice_page_url(content)
    # Only follow targets that contain 'http'
    if redict_url and redict_url.find('http') != -1:
        logger['logger_file_debug'].debug(
            "redict_url: %s" % (redict_url, ))
        self.re_download_twice_web(
            redict_url, save_path, save_type)
def as_contigset(fasta_file, xml_file):
    """
    Build a ContigSet from ``fasta_file``.

    When ``xml_file`` is None or equal to ``fasta_file`` the ContigSet is
    only constructed (empty if the FASTA is missing or has zero size).
    Otherwise any existing ``.fai`` index is removed, the ContigSet is
    built with generated indices and written to ``xml_file``; for a
    zero-size FASTA an empty ``.fai`` file is written as well.
    """
    in_place = fasta_file == xml_file or xml_file is None
    if in_place:
        has_content = op.isfile(fasta_file) and op.getsize(fasta_file) != 0
        return ContigSet(fasta_file) if has_content else ContigSet()

    file_size = op.getsize(fasta_file)

    # Start from a clean slate: drop a stale index before regenerating it
    fai_file = fasta_file + ".fai"
    if op.exists(fai_file):
        os.remove(fai_file)

    ds = ContigSet(fasta_file, generateIndices=True)
    ds.write(xml_file)

    if file_size <= 0:
        with open(fai_file, "w") as fai:
            fai.write("")

    return ds
def main():
    """
    Turn the tab-separated D-Link FTP file list into a CSV report.

    Each non-blank input line holds an FTP url, a size and a timestamp.
    For every entry the locally downloaded copy is hashed and sized, and
    one CSV row is written and echoed to stdout.
    """
    columns = ['ftp_url', 'file_size', 'file_date', 'model', 'file_sha1', 'file_md5']

    with open('dlink_ftp.dlink.eu_filelist.csv', 'w') as fout:
        cw = csv.writer(fout, dialect='excel')
        cw.writerow(columns)

        with open('dlink_ftp.dlink.eu_filelist.txt', 'r') as fin:
            for raw_line in fin:
                line = raw_line.strip()
                if not line:
                    continue

                # The listed size is parsed but replaced below by the
                # size of the local copy
                ftpurl, fsize, raw_date = line.split('\t', 2)
                fdate = datetime.fromtimestamp(float(raw_date))

                fname = 'output/D-Link/ftp.dlink.eu/' + ftpurl.split('/')[-1]
                sha1 = getFileSha1(fname)
                md5 = getFileMd5(fname)
                fsize = path.getsize(fname)
                model = get_model_from_ftp_url(ftpurl)

                cw.writerow([ftpurl, fsize, fdate, model, sha1, md5])
                print('%s,%s,%s,%s'%(ftpurl,fsize,fdate,model))
def filedata(self):
    """
    Describe every resource of this object, plus its descriptor file,
    as name / length / md5 / type records keyed by path, and return them
    together with the owner and name metadata.
    """
    entries = {}

    for resource in self:
        descriptor = resource.descriptor
        rel_path = descriptor['path']
        # Without an explicit media type, derive one from the extension
        fallback_type = 'text/'+descriptor['path'].split('.')[-1]
        entries[rel_path] = {
            'name': descriptor['name'],
            'length': getsize(resource.source),
            'md5': compute_hash(resource.source),
            'type': descriptor.get('mediatype', fallback_type),
        }

    # The descriptor file itself is listed alongside the resources
    entries[basename(self.filepath)] = {
        'name': self.name,
        'length': getsize(self.filepath),
        'md5': compute_hash(self.filepath),
        'type': 'application/octet-stream',
    }

    metadata = {
        'owner': self.user.id,
        'name': self.name
    }
    return {
        'filedata': entries,
        'metadata': metadata
    }
def TestDownload(self):
    """
    Download the speed-test file and record the measured download speed.

    The url comes from ``DownloadSpeedTestUrl`` in the cloud config when
    present, otherwise from ``defaultUrl``. On success
    ``self.downloadSpeed`` is set to the downloaded size (in units of
    ``mb``) divided by the elapsed seconds, and the file is removed.

    Returns True when a speed was measured, False otherwise. Socket
    errors are additionally reported through ``Daemon.OnFailure``.
    """
    try:
        a = datetime.now()
        info('Excuting regular download test for network speed')
        url = self.config.cloudConfig.DownloadSpeedTestUrl if 'DownloadSpeedTestUrl' in self.config.cloudConfig else defaultUrl
        debug(url + ' ' + download_path)
        request.urlretrieve(url, download_path)
        request.urlcleanup()
        b = datetime.now()
        c = b - a
        if path.exists(download_path):
            size = path.getsize(download_path)/mb
            self.downloadSpeed = size/c.total_seconds()
            remove(download_path)
            return True
        # Nothing was downloaded, so no speed could be measured
        return False
    except socket_error as serr:
        error('TestDownload:' + str(serr))
        Daemon.OnFailure('cloud', serr.errno)
        # Consistent boolean result (this branch used to return None)
        return False
    except Exception:
        exception('TestDownload Failed')
        return False
def rotator(source, dest):
    """
    Rotate the log file ``source``.

    Files larger than 100 MB are deleted outright; anything else is
    archived into the bzip2-compressed tar ``dest`` and then deleted.
    Afterwards, old ``myDevices.log*`` backups older than a week are
    removed.

    Any failure is reported on stdout instead of being raised.
    """
    try:
        # print('Log rotator, pid:' + str(getpid()))
        size = path.getsize(source)
        if size > 100000000:
            # files larger than 100MB will be deleted
            remove(source)
        else:
            # The context manager closes the archive even if add() fails
            with tarfile.open(dest, "w:bz2") as tar:
                tar.add(source)
            remove(source)
        # Remove old myDevices.log backups if they are older than a week. This code can be removed
        # in later versions if myDevices.log files have been replaced with cayenne.log.
        for old_file in iglob('/var/log/myDevices/myDevices.log*'):
            if path.getmtime(old_file) + 604800 < time.time():
                remove(old_file)
    except Exception as ex:
        print('Log rotator failed with: ' + str(ex))
def addService(self, service):
    """
    Attach ``service`` as the source of this object and copy its
    description, creation time, name, channel, path, size and length
    into the corresponding attributes.
    """
    from os import path
    from enigma import eServiceCenter, iServiceInformation
    from ServiceReference import ServiceReference
    from time import localtime

    self.source = service
    info = eServiceCenter.getInstance().info(service)

    # Description falls back to "" when there is no info object or no text
    if info:
        sDescr = info.getInfoString(service, iServiceInformation.sDescription) or ""
    else:
        sDescr = ""
    self.DVBdescr = sDescr

    # NOTE(review): from here on ``info`` is dereferenced without the
    # guard used above - confirm it can never be None at this point.
    sTimeCreate = info.getInfo(service, iServiceInformation.sTimeCreate)
    if sTimeCreate > 1:
        self.timeCreate = localtime(sTimeCreate)

    serviceref = ServiceReference(info.getInfoString(service, iServiceInformation.sServiceref))

    # Name falls back to "Title" + description when unavailable or empty
    name = info.getName(service) if info else None
    if not name:
        name = "Title" + sDescr
    self.DVBname = name
    self.DVBchannel = serviceref.getServiceName()

    self.inputfile = service.getPath()
    self.filesize = path.getsize(self.inputfile)
    self.estimatedDiskspace = self.filesize
    self.length = info.getLength(service)
def locate(self, spin, request):
    """
    Serve the static file named by the last component of the request
    path, if such a file exists in the application's static directory.
    """
    app = spin.app
    path = join(app.app_dir, app.static_dir, basename(request.path))

    if not isfile(path):
        return

    # Where we are going to serve files.
    # I might spawn an event like FILE_NOT_FOUND.
    # So, users could use it to send appropriate answers.
    type_file, encoding = guess_type(path)
    content_type = type_file or 'application/octet-stream'

    spin.add_header(('Content-Type', content_type),
                    ('Content-Length', getsize(path)))
    spin.send_headers()

    xmap(spin, OPEN_FILE_ERR, lambda con, err: lose(con))
    drop(spin, path)
def upload_image(self, filename, album_uri, caption=None, title=None, keywords=None):
    """
    Post the file ``filename`` to ``UPLOAD_URL`` for the album
    ``album_uri`` and return whatever ``raw_post`` returns.

    ``caption``, ``title`` and ``keywords`` are sent as extra headers
    only when they are not None.
    """
    headers = {
        'User-Agent': self.__user_agent,
        'X-Smug-ResponseType': 'JSON',
        'X-Smug-Version': 'v2',
        'Content-Type': guess_type(filename)[0],
        'X-Smug-AlbumUri': album_uri,
        'X-Smug-FileName': filename,
        'Content-Length': path.getsize(filename),
    }

    # Optional metadata headers
    optional = (
        ('X-Smug-Caption', caption),
        ('X-Smug-Title', title),
        ('X-Smug-Keywords', keywords),
    )
    for header_name, value in optional:
        if value is not None:
            headers[header_name] = value

    with open(filename, "rb") as image:
        payload = image.read()

    return self.raw_post(self.UPLOAD_URL, data=payload, headers=headers)
def traverse_dir(path):
    """
    Walk the tree below ``path`` and return ``(file_dict, dir_dict)``.

    ``file_dict`` maps every file path to its size in bytes and
    ``dir_dict`` maps every directory path to 0. Each visited path is
    printed, along with a progress line whenever the running counter
    reaches a multiple of 200.
    """
    file_dict = {}
    dir_dict = {}
    count = 1

    for root, dirs, files in walk(path):
        # Directories of a level are recorded before its files
        for names, table in ((dirs, dir_dict), (files, file_dict)):
            for name in names:
                abs_p = join(root, name)
                table[abs_p] = getsize(abs_p) if table is file_dict else 0
                print(abs_p)
                count += 1
                if count % 200 == 0:
                    print('%s files scanned' % count)

    return file_dict, dir_dict
def do_show_bulletin_attachment(request, bulletin_id, localfile, remotefile):
    #pylint: disable=unused-argument
    """
    Stream ``localfile`` back to the client as an attachment of the
    bulletin ``bulletin_id``.

    Raises Http404 when the bulletin is not allowed, the file does not
    exist, or it cannot be read. Non-image files are sent with a
    Content-Disposition header naming ``remotefile`` (when given).
    """
    if not cavedb.perms.is_bulletin_allowed(bulletin_id):
        raise Http404

    if not isfile(localfile):
        raise Http404

    # Unknown types are served as opaque binary data
    mimetype = guess_type(localfile)[0] or "application/octet-stream"

    try:
        wrapper = FileWrapper(open(localfile, 'rb'))
        response = FileResponse(wrapper, content_type=mimetype)

        is_image = mimetype.startswith('image')
        if remotefile and not is_image:
            response['Content-Disposition'] = 'attachment; filename=' + remotefile

        response['Content-Length'] = getsize(localfile)
    except IOError:
        print('Cannot find %s\n' % (localfile), file=sys.stderr)
        raise Http404

    return response
def getfilesize(filename, ratio=None):
    """
    Return the size of ``filename`` in bytes; for ``.gz`` files, the
    uncompressed size.

    For gzip files the size is taken from the last four bytes of the file
    (ISIZE), which only holds the uncompressed size modulo 2 ** 32. When
    ``ratio`` (expected compressed/uncompressed ratio) is given, multiples
    of 2 ** 32 are added until the size is at least ``rawsize / ratio``,
    and a warning is logged if the estimate exceeds 2 ** 32.
    """
    rawsize = op.getsize(filename)
    if not filename.endswith(".gz"):
        return rawsize

    import struct

    # The context manager closes the handle even if seek/read fails
    with open(filename, 'rb') as fo:
        fo.seek(-4, 2)
        r = fo.read()
    size = struct.unpack('<I', r)[0]

    # This is only ISIZE, which is the UNCOMPRESSED modulo 2 ** 32
    if ratio is None:
        return size

    # Heuristic
    heuristicsize = rawsize / ratio
    while size < heuristicsize:
        size += 2 ** 32
    if size > 2 ** 32:
        # logging.warn() is a deprecated alias that newer Pythons dropped
        logging.warning(
            "Gzip file estimated uncompressed size: {0}.".format(size))

    return size
def file_manager(request):
    """
    Admin-only view of the upload directory.

    A POST stores the uploaded 'file' under a slugified name (any failure
    is reported as PermissionDenied); every request then renders the list
    of files in the upload directory with their modification time and
    size in whole kilobytes.
    """
    def slugify(text):
        import re
        return re.sub(r'[ /"#!:]+', '_', text)

    if not is_admin_or_root(request.user):
        raise PermissionDenied

    if request.method == 'POST':
        try:
            file = request.FILES['file']
            save_uploaded_file_to(file, settings.UPLOAD_DIR, filename=slugify(file.name))
        except Exception as e:
            raise PermissionDenied(repr(e))

    # Describe every plain file found in the upload directory
    file_list = []
    for entry in listdir(settings.UPLOAD_DIR):
        if not path.isfile(path.join(settings.UPLOAD_DIR, entry)):
            continue
        modified = datetime.fromtimestamp(path.getmtime(path.join(settings.UPLOAD_DIR, entry)))
        file_list.append({
            'name': entry,
            'modified_time': modified.strftime(settings.DATETIME_FORMAT_TEMPLATE),
            'size': str(path.getsize(path.join(settings.UPLOAD_DIR, entry)) // 1024) + "K"
        })

    return render(request, 'filemanager.jinja2', context={
        'file_list': file_list
    })
def dir_size(dir):
    """
    Calculate the size of the files under a dir, based on the os module
    example.

    A plain file yields its own size and a symlink yields the length of
    its target path. For a directory, the sizes of all files in the tree
    are summed, symlinked files again counting as the length of their
    target path.
    """
    # It's really hard to give an approximate value for package's
    # installed size. Gettin a sum of all files' sizes if far from
    # being true. Using 'du' command (like Debian does) can be a
    # better solution :(.
    # Not really, du calculates size on disk, this is much better -- exa
    from os.path import getsize, islink, isdir, exists
    join = join_path

    if exists(dir) and (not isdir(dir) and not islink(dir)):
        # so, this is not a directory but file..
        return getsize(dir)

    # len() already returns an integer that grows as needed; the explicit
    # long() wrapper only exists on Python 2.
    if islink(dir):
        return len(os.readlink(dir))

    def sizes():
        for root, dirs, files in os.walk(dir):
            paths = [join(root, name) for name in files]
            yield sum(getsize(p) for p in paths if not islink(p))
            yield sum(len(os.readlink(p)) for p in paths if islink(p))

    return sum(sizes())
def __init__(self, inputCsv, resolver):
    """
    Load and validate the input CSV table.

    Parameters:
        inputCsv: path of the CSV file; its first row holds the column
            names.
        resolver: passed on to ``_resolveInputs``.

    Raises:
        ValueError: the input file does not exist.
        TableValidationError: the file starts with a UTF-8 byte order
            mark, or it cannot be read/parsed.
    """
    if not op.isfile(inputCsv):
        raise ValueError("Missing input file: %s" % inputCsv)

    # Peek at the first bytes only; read() returns fewer bytes for short
    # files, and the handle is closed as soon as the peek is done.
    with open(inputCsv, 'rb') as fh:
        raw = fh.read(32)
    if raw.startswith(codecs.BOM_UTF8):
        raise TableValidationError("Input CSV file is in UTF-8 format. Please convert to ASCII or remove Byte Order Mark (BOM)")

    try:
        with open(inputCsv) as f:
            cr = csv.reader(f)
            allRows = list(cr)
            columnNames, rows = \
                allRows[0], allRows[1:]
            self.tbl = eztable.Table(columnNames, rows)
    except Exception:
        raise TableValidationError("Input CSV file can't be read/parsed:" + str(sys.exc_info()[0]))

    self._validateTable()
    self._resolveInputs(resolver)
def _download_wrapped_file(download):
    """
    Build the HTTP response that serves the file behind ``download`` as
    an attachment, or a "not found" response when the file is missing or
    is a symlink.
    """
    download_path = download.abspath

    # We do not allow symlinks as downloads for security reasons
    servable = path.exists(download_path) and not path.islink(download_path)
    if not servable:
        return HttpResponse("Download not found", status=HTTP_NOT_FOUND)

    served_name = DOWNLOAD_FNAME_TEMLATE.format(
        filename=path.basename(download_path),
        download_pk=download.pk,
        problem_slug=download.problem.slug
    )

    wrapper = FileWrapper(open(download_path, "rb"))
    response = HttpResponse(wrapper, content_type='application/force-download')
    response['Content-Disposition'] = 'attachment; filename="{}"'.format(served_name)
    response['Content-Length'] = path.getsize(download_path)
    return response
def compile(self, nb_procs):
    """
    Run ``make`` in the temporary directory and report the outcome.

    The target is "lsa.mpi" when ``nb_procs`` is greater than 1 and
    "lsa" otherwise. The output streams of the command are redirected to
    ``out_optim_comp`` / ``err_optim_comp`` in the temporary directory.

    Returns ``self.OPTIM_FAILURE`` when make exits with a non-zero status
    or wrote anything to its error log, ``self.OPTIM_SUCCESS`` otherwise.
    """
    target = "lsa.mpi" if nb_procs > 1 else "lsa"

    tmp_dir = self.getTempDirectory()
    cmd_comp = "make -f %sMakefile -C %s %s 1>/dev/null" % (
        tmp_dir, tmp_dir, target)
    out_path = "%sout_optim_comp" % tmp_dir
    err_path = "%serr_optim_comp" % tmp_dir

    # Close both log files once the command has finished instead of
    # leaving the handles open.
    with open(out_path, "w") as out_log, open(err_path, "w") as err_log:
        res_comp = call(cmd_comp,
                        stdout=out_log,
                        stderr=err_log,
                        shell=True, preexec_fn=setpgrp, close_fds=True)

    if res_comp != 0 or getsize(err_path) > 0:
        return self.OPTIM_FAILURE
    else:
        return self.OPTIM_SUCCESS
def Cleanup(delete_dir, delete_threshold, freeup_amount):
    """
    Delete the oldest files of ``delete_dir`` when free space runs low.

    Nothing happens unless ``FreeSpaceMB(delete_dir)`` is below
    ``delete_threshold``. Otherwise regular files (symlinks excluded) are
    removed in order of ascending modification time until at least
    ``freeup_amount`` megabytes have been freed or no files are left.
    """
    free_space = FreeSpaceMB(delete_dir)
    if free_space >= delete_threshold:
        return

    candidates = (join(delete_dir, x) for x in listdir(delete_dir))
    files = [f for f in candidates if isfile(f) and not islink(f)]
    # Sort files ascending based on their modification time.
    files.sort(key=getmtime)

    freed = 0.0
    # Delete enough files to free up enough space that matches freeup_amount
    for f in files:
        # Size of file in MB. Float division on purpose: with integer
        # division every file below 1 MB would count as 0 and the loop
        # would never reach freeup_amount.
        f_size = getsize(f) / 1024.0 / 1024.0
        remove(f)
        # Same output as the former print statement, valid on 2 and 3
        print("Deleted  %s" % f)
        freed = freed + f_size
        if freed >= freeup_amount:
            break
def __len__(self):
    """
    Returns the length of the content

    Without a filepath the length is measured on the stream itself (0 if
    there is no stream either), leaving the stream position where it
    was. With a filepath, a dirty stream is flushed first and the size of
    the file on disk is returned.
    """
    if not self.filepath:
        # If there is no filepath, then we're probably dealing with a
        # stream in memory like a StringIO or BytesIO stream.
        if self.stream:
            # Remember where we are
            ptr = self.stream.tell()

            # Advance to the end of the file and get our length. tell()
            # is used because not every stream type returns the new
            # position from seek().
            self.stream.seek(0, SEEK_END)
            length = self.stream.tell()

            if length != ptr:
                # Return our pointer
                self.stream.seek(ptr, SEEK_SET)

        else:
            # No Stream or Filepath; nothing has been initialized
            # yet at all so just return 0
            length = 0

    else:
        if self.stream and self._dirty is True:
            self.stream.flush()
            self._dirty = False

        # Get the size
        length = getsize(self.filepath)

    return length
def __init__(self, filename, counter):
    """
    Set up the watcher for ``filename``.

    If the file already exists it is opened and ``offset`` is set to its
    current size; otherwise ``fd`` stays None and ``offset`` stays 0.
    """
    watched = path.abspath(filename)
    self.filename = watched

    work_queue = Queue()
    self.queue = work_queue
    self.check_chain = CheckerChain(work_queue, counter)
    self.observer = Observer()

    # Defaults for a file that does not exist yet
    self.fd, self.offset = None, 0
    if not path.isfile(watched):
        return

    self.fd = open(watched)
    self.offset = path.getsize(watched)
def on_moved(self, event):
    """
    React to a move event involving the watched file.

    When the watched file is moved away its handle is closed; when
    something is moved onto the watched path, that file is opened and
    ``offset`` is set to its current size.
    """
    if path.abspath(event.src_path) == self.filename:
        # fd is None when the file had not been opened yet
        if self.fd is not None:
            self.fd.close()

    if path.abspath(event.dest_path) == self.filename and path.isfile(self.filename):
        self.fd = open(self.filename)
        self.offset = path.getsize(self.filename)
def on_created(self, event):
    """
    React to the creation of the watched file: open it and set
    ``offset`` to its current size.

    A handle that is still held from an earlier incarnation of the file
    is closed first so it is not leaked.
    """
    if path.abspath(event.src_path) == self.filename and path.isfile(self.filename):
        if self.fd is not None:
            self.fd.close()
        self.fd = open(self.filename)
        self.offset = path.getsize(self.filename)
def valid_file_exists(file):
    """Tell whether ``file`` exists as a regular file and is "valid",
    i.e. its size is greater than 0 (a size of 0 probably means the
    download failed due to an RPC error)"""
    if not path.isfile(file):
        return False
    return path.getsize(file) > 0

#endregion
def needs_download(url, filepath):
    """
    Tell whether ``url`` has to be (re)downloaded to ``filepath``.

    True when the local file does not exist, or when the Content-Length
    reported by a HEAD request is larger than the local file's size.
    """
    if not op.exists(filepath):
        return True

    response = requests.head(url)
    remote_size = int(response.headers['Content-Length'])
    local_size = op.getsize(filepath)
    return remote_size > local_size
def __mmap_nse_packets(self, filename):
    """
    Memory map a Neuralynx .nse file and extract its data packets.

    Everything after the first 16384 bytes is viewed as rows of 56
    little-endian uint16 words. Reading a standard dtype improves speed,
    but the wider fields have to be rebuilt from consecutive 16-bit
    words (low word first).

    Returns ``(timestamps, channel_id, cell_number, features,
    data_points)``, or None when the file is not larger than 16384 bytes:
        timestamps   uint64, from words 0-3
        channel_id   uint32, from words 4-5
        cell_number  uint32, from words 6-7
        features     int32 array with 8 rows, from word pairs 8-23
        data_points  int16 copy of words 24-55 of every row
    """
    nse_path = self.sessiondir + sep + filename
    filesize = getsize(nse_path)  # in bytes
    if filesize <= 16384:
        return None

    # Floor division keeps the row count an integer; a float shape
    # (true division on Python 3) is rejected by np.memmap.
    n_records = (filesize - 16384) // 2 // 56
    data = np.memmap(nse_path, dtype='<u2', shape=(n_records, 56),
                     mode='r', offset=16384)

    # reconstructing original data
    # Each word is widened before shifting: scaling the uint16 columns
    # directly by 2**16 or more does not fit the uint16 dtype.
    u8 = np.uint64
    u4 = np.uint32

    # first 4 ints -> timestamp in microsec
    timestamps = (data[:, 0].astype(u8)
                  | data[:, 1].astype(u8) << u8(16)
                  | data[:, 2].astype(u8) << u8(32)
                  | data[:, 3].astype(u8) << u8(48))
    channel_id = data[:, 4].astype(u4) | data[:, 5].astype(u4) << u4(16)
    cell_number = data[:, 6].astype(u4) | data[:, 7].astype(u4) << u4(16)

    features = [data[:, p].astype(u4) | data[:, p + 1].astype(u4) << u4(16)
                for p in range(8, 23, 2)]
    features = np.array(features, dtype='i4')

    data_points = data[:, 24:56].astype('i2')
    del data

    return timestamps, channel_id, cell_number, features, data_points
def __mmap_ncs_data(self, filename):
    """
    Memory map a Neuralynx .ncs file for extraction of the sample data.

    Everything after the first 16384 bytes is viewed as records of 522
    int16 values; the first 10 values of every record (the packet header)
    are cut off.

    Returns the remaining columns as a 2-D view, or None when the file is
    not larger than 16384 bytes.
    """
    ncs_path = self.sessiondir + sep + filename
    if getsize(ncs_path) <= 16384:
        return None

    record_dtype = np.dtype(('i2', (522)))
    records = np.memmap(ncs_path, dtype=record_dtype, mode='r', offset=16384)

    # drop the data packet header words of every record
    return records[:, 10:]
def __mmap_ntt_file(self, filename):
    """
    Memory map a Neuralynx .ntt file.

    The first 16384 bytes are skipped; the rest is exposed as a
    structured array with one element per record, using the field layout
    declared below.

    Returns the memory map, or None when the file is not larger than
    16384 bytes.
    """
    record_layout = [
        ('timestamp', '<u8'),
        ('sc_number', '<u4'),
        ('cell_number', '<u4'),
        ('params', '<u4', (8,)),
        ('data', '<i2', (32, 4)),
    ]
    ntt_dtype = np.dtype(record_layout)

    ntt_path = self.sessiondir + sep + filename
    if getsize(ntt_path) <= 16384:
        return None

    return np.memmap(ntt_path, dtype=ntt_dtype, mode='r', offset=16384)