我们从Python开源项目中，提取了以下50个代码示例，用于说明如何使用io.UnsupportedOperation()。
def super_len(o):
    """Return a best-effort length for ``o``, or None when none is found.

    Probes, in order: ``len(o)``, an ``o.len`` attribute, the size reported
    by ``os.fstat`` for ``o.fileno()``, and ``len(o.getvalue())``.
    """
    if hasattr(o, '__len__'):
        return len(o)

    if hasattr(o, 'len'):
        return o.len

    has_descriptor = hasattr(o, 'fileno')
    if has_descriptor:
        try:
            descriptor = o.fileno()
        except io.UnsupportedOperation:
            # The object exposes fileno() but cannot provide a descriptor;
            # fall through to the getvalue() probe.
            has_descriptor = False
    if has_descriptor:
        return os.fstat(descriptor).st_size

    if hasattr(o, 'getvalue'):
        # e.g. BytesIO, cStringIO.StringIO
        return len(o.getvalue())

    return None
def chunks(self, chunk_size=None):
    """
    Yield the file's content in pieces of at most ``chunk_size`` bytes.

    A falsy ``chunk_size`` is replaced by ``self.DEFAULT_CHUNK_SIZE``
    (``UploadedFile.DEFAULT_CHUNK_SIZE``). The file is rewound first when
    it supports seeking; iteration stops at the first falsy read result.
    """
    chunk_size = chunk_size or self.DEFAULT_CHUNK_SIZE

    # Rewinding is best-effort: objects without seek(), or whose seek()
    # is unsupported, are read from wherever they currently are.
    try:
        self.seek(0)
    except (AttributeError, UnsupportedOperation):
        pass

    data = self.read(chunk_size)
    while data:
        yield data
        data = self.read(chunk_size)
def super_len(o):
    """Work out the length of ``o`` using the first probe that applies.

    The probes are tried in a fixed order: ``__len__``, a ``len``
    attribute, the ``os.fstat`` size of ``o.fileno()``, then
    ``getvalue()``. Returns None when no probe yields a length.
    """
    for attribute in ('__len__', 'len', 'fileno', 'getvalue'):
        if not hasattr(o, attribute):
            continue

        if attribute == '__len__':
            return len(o)

        if attribute == 'len':
            return o.len

        if attribute == 'fileno':
            try:
                fileno = o.fileno()
            except io.UnsupportedOperation:
                # No usable descriptor; move on to the next probe.
                continue
            return os.fstat(fileno).st_size

        # Only 'getvalue' remains: e.g. BytesIO, cStringIO.StringIO
        return len(o.getvalue())

    return None
def _is_daemon():
    """Return True when the process appears to run in the background.

    The process group for a foreground process matches the process group
    of the controlling terminal. A mismatch, a stdout without a file
    descriptor, or an ``ENOTTY`` failure is treated as "daemon".
    http://www.gnu.org/software/bash/manual/bashref.html#Job-Control-Basics

    Raises:
        OSError: any failure other than ``errno.ENOTTY`` is re-raised.
    """
    try:
        own_group = os.getpgrp()
        terminal_group = os.tcgetpgrp(sys.stdout.fileno())
    except io.UnsupportedOperation:
        # Could not get the fileno for stdout, so we must be a daemon.
        return True
    except OSError as err:
        if err.errno != errno.ENOTTY:
            raise
        # Assume we are a daemon because there is no terminal.
        return True
    return own_group != terminal_group
def total_len(o):
    """Return the total length of ``o``, or None if it cannot be found.

    Checks ``__len__``, then a ``len`` attribute; otherwise tries the
    ``os.fstat`` size of the object's descriptor and finally the length
    of ``o.getvalue()``.
    """
    if hasattr(o, '__len__'):
        result = len(o)
    elif hasattr(o, 'len'):
        result = o.len
    else:
        result = None
        resolved = False

        if hasattr(o, 'fileno'):
            try:
                fd = o.fileno()
            except io.UnsupportedOperation:
                # fileno() exists but is unsupported; try getvalue() next.
                pass
            else:
                result = os.fstat(fd).st_size
                resolved = True

        if not resolved and hasattr(o, 'getvalue'):
            # e.g. BytesIO, cStringIO.StringIO
            result = len(o.getvalue())

    return result
def __init__(self, stdin=None):
    """Store the input stream, falling back to ``sys.stdin``.

    Raises:
        AssertionError: the stream does not report itself as a TTY.
        io.UnsupportedOperation: the stream has no usable ``fileno()``.
    """
    self.stdin = stdin or sys.stdin

    # The input object should be a TTY.
    assert self.stdin.isatty()

    # Test whether the given input object has a file descriptor.
    # (Idle reports stdin to be a TTY, but fileno() is not implemented.)
    try:
        # This should not raise, but can return 0.
        self.stdin.fileno()
    except io.UnsupportedOperation:
        running_in_idle = 'idlelib.run' in sys.modules
        if running_in_idle:
            message = ('Stdin is not a terminal. '
                       'Running from Idle is not supported.')
        else:
            message = 'Stdin is not a terminal.'
        raise io.UnsupportedOperation(message)
def deal_with_empty_file_stream(data):
    """Replace a stream that has nothing left to read with ``""``.

    Objects lacking ``fileno`` or ``tell`` are returned unchanged. For
    streams, the remaining length is the total size minus the current
    position; when it is zero an empty string is returned, otherwise the
    stream itself is returned.

    Streams without an OS-level descriptor (e.g. ``BytesIO``) are measured
    by seeking to the end and back. If the remaining length cannot be
    determined at all, the stream is returned untouched rather than being
    discarded.
    """
    if not (hasattr(data, 'fileno') and hasattr(data, 'tell')):
        return data

    try:
        total_length = os.fstat(data.fileno()).st_size
        current_position = data.tell()
    except io.UnsupportedOperation:
        # No real descriptor (or tell() unsupported): measure by seeking
        # instead of assuming the stream is empty, which would silently
        # drop unread in-memory data.
        try:
            current_position = data.tell()
            data.seek(0, 2)
            total_length = data.tell()
            data.seek(current_position)
        except (AttributeError, IOError):
            # Unknown remaining length: keep the caller's data.
            return data

    if total_length - current_position == 0:
        return ""
    return data
def super_len(o):
    """Return the number of bytes still to be read from ``o``.

    The total length comes from the first probe that applies: ``len(o)``,
    ``o.len``, ``len(o.getvalue())``, or the ``os.fstat`` size of
    ``o.fileno()``. The current ``o.tell()`` position, when available, is
    subtracted and the result is clamped at zero.

    A ``FileModeWarning`` is issued when the size came from ``fstat`` and
    ``o.mode`` does not contain ``'b'``.

    If ``o.tell()`` raises ``OSError`` (e.g. the stream is not seekable),
    the position is taken to be the total length, so 0 is returned.
    """
    total_length = 0
    current_position = 0

    if hasattr(o, '__len__'):
        total_length = len(o)

    elif hasattr(o, 'len'):
        total_length = o.len

    elif hasattr(o, 'getvalue'):
        # e.g. BytesIO, cStringIO.StringIO
        total_length = len(o.getvalue())

    elif hasattr(o, 'fileno'):
        try:
            fileno = o.fileno()
        except io.UnsupportedOperation:
            pass
        else:
            total_length = os.fstat(fileno).st_size

            # Having used fstat to determine the file length, we need to
            # confirm that this file was opened up in binary mode.
            if 'b' not in o.mode:
                warnings.warn((
                    "Requests has determined the content-length for this "
                    "request using the binary size of the file: however, the "
                    "file has been opened in text mode (i.e. without the 'b' "
                    "flag in the mode). This may lead to an incorrect "
                    "content-length. In Requests 3.0, support will be removed "
                    "for files in text mode."),
                    FileModeWarning
                )

    if hasattr(o, 'tell'):
        try:
            current_position = o.tell()
        except (OSError, IOError):
            # The position is unknowable (e.g. a pipe or other special
            # descriptor); report zero remaining instead of crashing.
            current_position = total_length

    return max(0, total_length - current_position)
def seek(self, position: int, whence: int = 0) -> None:
    """
    Move the file cursor to ``position``.

    This implementation performs no seek and always raises
    :exc:`NotImplementedError`.

    .. note::

        The :exc:`io.UnsupportedOperation` will be raised if the
        underlying file-object is not :meth:`.seekable`.

    :param position: the position to seek on.
    :param whence: optional whence argument.
    """
    message = 'Seek operation is not supported by this object: %r' % self
    raise NotImplementedError(message)  # pragma: no cover
def read(self, size=None):
    """Reject the call: reading is not supported by this object.

    Raises:
        io.UnsupportedOperation: always, with ``'read'`` as its argument.
    """
    operation = 'read'
    raise io.UnsupportedOperation(operation)
def readinto(self, buf):
    """Reject the call: reading into a buffer is not supported here.

    Raises:
        io.UnsupportedOperation: always, with ``'readinto'`` as its
            argument; ``buf`` is never touched.
    """
    operation = 'readinto'
    raise io.UnsupportedOperation(operation)
def total_len(o):
    """Return the total length of ``o``, or None when it cannot be found.

    Probes, in order: ``len(o)``, ``o.len``, the ``os.fstat`` size of
    ``o.fileno()``, ``len(o.getvalue())`` and finally a seek to the end of
    the stream (restoring the original position afterwards).

    Objects that offer none of these, such as plain iterators, yield None
    instead of raising ``AttributeError``.
    """
    # Stolen from requests_toolbelt and modified
    if hasattr(o, '__len__'):
        return len(o)

    if hasattr(o, 'len'):
        return o.len

    if hasattr(o, 'fileno'):
        try:
            fileno = o.fileno()
        except io.UnsupportedOperation:
            pass
        else:
            return os.fstat(fileno).st_size

    if hasattr(o, 'getvalue'):
        # e.g. BytesIO, cStringIO.StringIO
        return len(o.getvalue())

    try:
        current_pos = o.tell()
        length = o.seek(0, 2)
        o.seek(current_pos, 0)
        return length
    except (AttributeError, IOError):
        # No tell()/seek() at all, or the stream refuses to seek:
        # the length is unknown.
        return None
def test_writing_to_stdout_is_diverted_if_broken(self, sys):
    """A stdout whose fileno() is unsupported gets swapped for a substitute."""
    # NOTE(review): `sys` is presumably a mock of the sys module injected
    # by a patch decorator outside this view - confirm.
    sys.stdout.fileno.side_effect = io.UnsupportedOperation

    def fake_call(**kwargs):
        assert kwargs['stdin'] is None
        redirected = kwargs['stdout']
        assert redirected is not None
        assert redirected is not sys.stdout

    wrapped = utils.wrap_subprocess_call(fake_call)
    wrapped(stdin=None, stdout=None)
def test_writing_to_stdout_is_not_diverted_if_wrapstdout_is_false(self, sys):
    """With ``wrap_stdout=False`` the callee still sees ``stdout=None``."""
    # NOTE(review): `sys` is presumably a mock of the sys module injected
    # by a patch decorator outside this view - confirm.
    sys.stdout.fileno.side_effect = io.UnsupportedOperation

    def fake_call(**kwargs):
        assert kwargs['stdin'] is None
        assert kwargs['stdout'] is None

    wrapped = utils.wrap_subprocess_call(fake_call, wrap_stdout=False)
    wrapped(stdin=None, stdout=None)
def test_writing_to_stderr_is_diverted_if_broken(self, sys):
    """A stderr whose fileno() is unsupported gets swapped for a substitute."""
    # NOTE(review): `sys` is presumably a mock of the sys module injected
    # by a patch decorator outside this view - confirm.
    def sample(**kwargs):
        assert kwargs['stderr'] is not None
        # The substitute must differ from the broken stderr itself; the
        # earlier comparison against sys.stdout checked the wrong stream.
        assert kwargs['stderr'] is not sys.stderr

    sys.stderr.fileno.side_effect = io.UnsupportedOperation
    utils.wrap_subprocess_call(sample)(stderr=None)
def test_reading_from_stdin_is_not_diverted(self, sys):
    """An unsupported stdin fileno() leaves stdin and stdout as None."""
    # NOTE(review): `sys` is presumably a mock of the sys module injected
    # by a patch decorator outside this view - confirm.
    sys.stdin.fileno.side_effect = io.UnsupportedOperation

    def fake_call(**kwargs):
        assert kwargs['stdin'] is None
        assert kwargs['stdout'] is None

    wrapped = utils.wrap_subprocess_call(fake_call)
    wrapped(stdin=None, stdout=None)
def test_arbitrary_outputs_are_not_replaced_even_if_stdout_is_broken(self, sys):
    """A caller-supplied stdout object is passed through untouched."""
    # NOTE(review): `sys` is presumably a mock of the sys module injected
    # by a patch decorator outside this view - confirm.
    output = io.StringIO()
    sys.stdout.fileno.side_effect = io.UnsupportedOperation

    def fake_call(**kwargs):
        assert kwargs['stdin'] is None
        assert kwargs['stdout'] is output

    wrapped = utils.wrap_subprocess_call(fake_call)
    wrapped(stdin=None, stdout=output)
def test_writing_to_broken_stderr_is_output_after_returning(self, sys):
    """Text written to the substitute stderr is replayed once the call returns."""
    # NOTE(review): `sys` is presumably a mock of the sys module injected
    # by a patch decorator outside this view - confirm.
    sys.stderr.fileno.side_effect = io.UnsupportedOperation

    def fake_call(**kwargs):
        kwargs['stderr'].write('Output')
        # The output isn't written until we return
        sys.stderr.write.assert_not_called()

    wrapped = utils.wrap_subprocess_call(fake_call)
    wrapped(stderr=None)

    sys.stderr.write.assert_called_once_with('Output')
def wrap_subprocess_call(func, wrap_stdout=True):
    """Wrap ``func`` so it works when stdout/stderr have no file descriptor.

    When ``stdout`` (only if ``wrap_stdout`` is true) or ``stderr`` is
    missing or None in the call's keyword arguments, and the matching
    ``sys`` stream raises ``io.UnsupportedOperation`` from ``fileno()``,
    a temporary file is passed to ``func`` instead. After ``func`` returns
    or raises, the captured content is written to the matching ``sys``
    stream and the temporary file is closed.

    The temporary files are opened in text mode so that what is read back
    can be written to the text-based ``sys`` streams.

    Args:
        func: callable accepting ``stdout``/``stderr`` keyword arguments.
        wrap_stdout: set to False to leave ``stdout`` alone.

    Returns:
        The wrapping callable, which returns whatever ``func`` returns.
    """
    @functools.wraps(func)
    def wrapper(*popenargs, **kwargs):
        out = kwargs.get('stdout', None)
        err = kwargs.get('stderr', None)
        replayed = []  # names of the sys streams that were substituted

        if out is None and wrap_stdout:
            try:
                sys.stdout.fileno()
            except io.UnsupportedOperation:
                kwargs['stdout'] = tempfile.NamedTemporaryFile(mode='w+')
                replayed.append('stdout')

        if err is None:
            try:
                sys.stderr.fileno()
            except io.UnsupportedOperation:
                kwargs['stderr'] = tempfile.NamedTemporaryFile(mode='w+')
                replayed.append('stderr')

        try:
            return func(*popenargs, **kwargs)
        finally:
            for stream_name in replayed:
                temp_file = kwargs[stream_name]
                try:
                    temp_file.seek(0)
                    # Look the stream up now, as the replay target is
                    # whatever sys holds at this point.
                    getattr(sys, stream_name).write(temp_file.read())
                finally:
                    # Closing also removes the named temporary file.
                    temp_file.close()

    return wrapper
def open_stream(self, file_stream, **keywords):
    """Open an ods file stream, buffering it in memory when it cannot seek."""
    def buffered_copy(stream):
        # Read everything into a BytesIO so later seeks succeed.
        return BytesIO(stream.read())

    if not hasattr(file_stream, 'seek'):
        # python 2
        # Hei zipfile in odfpy would do a seek
        # but stream from urlib cannot do seek
        file_stream = buffered_copy(file_stream)

    try:
        file_stream.seek(0)
    except UnsupportedOperation:
        # python 3
        file_stream = buffered_copy(file_stream)

    BookReader.open_stream(self, file_stream, **keywords)
    self._load_from_memory()
def has_fileno(obj):
    """Tell whether ``obj.fileno()`` exists and can be called successfully.

    Returns False when the attribute is missing or when the call raises
    ``AttributeError``, ``IOError`` or ``io.UnsupportedOperation``.
    """
    if hasattr(obj, "fileno"):
        # check BytesIO case and maybe others
        try:
            obj.fileno()
        except (AttributeError, IOError, io.UnsupportedOperation):
            pass
        else:
            return True
    return False
def __read(self, n):
    """Read via ``_read(self._fileno, n)``, waiting on the hub and retrying.

    Raises:
        UnsupportedOperation: ``self._readable`` is false.
        IOError, OSError: the failure's first argument is not listed in
            ``ignored_errors``.
    """
    if not self._readable:
        raise UnsupportedOperation('read')

    while True:
        try:
            return _read(self._fileno, n)
        except (IOError, OSError) as ex:
            if ex.args[0] in ignored_errors:
                # Ignorable failure: wait for the read event, then retry.
                self.hub.wait(self._read_event)
            else:
                raise
def write(self, b):
    """Write via ``_write(self._fileno, b)``, waiting on the hub and retrying.

    Raises:
        UnsupportedOperation: ``self._writable`` is false.
        IOError, OSError: the failure's first argument is not listed in
            ``ignored_errors``.
    """
    if not self._writable:
        raise UnsupportedOperation('write')

    while True:
        try:
            return _write(self._fileno, b)
        except (IOError, OSError) as ex:
            if ex.args[0] in ignored_errors:
                # Ignorable failure: wait for the write event, then retry.
                self.hub.wait(self._write_event)
            else:
                raise