我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用getopt.error()。
def parseArgs(self, argv): try: options, args = getopt.getopt(argv[1:], 'hHvqi:', ['help','verbose','quiet','impl=']) for opt, value in options: if opt in ('-h','-H','--help'): self.usageExit() if opt in ('-q','--quiet'): self.verbosity = 0 if opt in ('-v','--verbose'): self.verbosity = 2 if opt in ('-i','--impl'): self.testLoader.impl = value if len(args) == 0 and self.defaultTest is None: self.test = self.testLoader.loadTestsFromModule(self.module) return if len(args) > 0: self.testNames = args else: self.testNames = (self.defaultTest,) self.createTests() except getopt.error, msg: self.usageExit(msg)
def _deliver(self, mailfrom, rcpttos, data): import smtplib refused = {} try: s = smtplib.SMTP() s.connect(self._remoteaddr[0], self._remoteaddr[1]) try: refused = s.sendmail(mailfrom, rcpttos, data) finally: s.quit() except smtplib.SMTPRecipientsRefused, e: print >> DEBUGSTREAM, 'got SMTPRecipientsRefused' refused = e.recipients except (socket.error, smtplib.SMTPException), e: print >> DEBUGSTREAM, 'got', e.__class__ # All recipients were refused. If the exception had an associated # error code, use it. Otherwise,fake it with a non-triggering # exception code. errcode = getattr(e, 'smtp_code', -1) errmsg = getattr(e, 'smtp_error', 'ignore') for r in rcpttos: refused[r] = (errcode, errmsg) return refused
def main(): """Small main program""" import sys, getopt try: opts, args = getopt.getopt(sys.argv[1:], 'deut') except getopt.error as msg: sys.stdout = sys.stderr print(msg) print("""usage: %s [-d|-e|-u|-t] [file|-] -d, -u: decode -e: encode (default) -t: encode and decode string 'Aladdin:open sesame'"""%sys.argv[0]) sys.exit(2) func = encode for o, a in opts: if o == '-e': func = encode if o == '-d': func = decode if o == '-u': func = decode if o == '-t': test(); return if args and args[0] != '-': with open(args[0], 'rb') as f: func(f, sys.stdout.buffer) else: func(sys.stdin.buffer, sys.stdout.buffer)
def parseArgs(self, argv): try: options, args = getopt.getopt(argv[1:], 'hHvqi:', ['help','verbose','quiet','impl=']) for opt, value in options: if opt in ('-h','-H','--help'): self.usageExit() if opt in ('-q','--quiet'): self.verbosity = 0 if opt in ('-v','--verbose'): self.verbosity = 2 if opt in ('-i','--impl'): self.impl = value if len(args) == 0 and self.defaultTest is None: self.test = self.testLoader.loadTestsFromModule(self.module) return if len(args) > 0: self.testNames = args else: self.testNames = (self.defaultTest,) self.createTests() except getopt.error, msg: self.usageExit(msg)
def xatom(self, name, *args): """Allow simple extension commands notified by server in CAPABILITY response. Assumes command is legal in current state. (typ, [data]) = <instance>.xatom(name, arg, ...) Returns response appropriate to extension command `name'. """ name = name.upper() #if not name in self.capabilities: # Let the server decide! # raise self.error('unknown extension command: %s' % name) if not name in Commands: Commands[name] = (self.state,) return self._simple_command(name, *args) # Private methods
def _get_line(self): line = self.readline() if not line: raise self.abort('socket error: EOF') # Protocol mandates all lines terminated by CRLF if not line.endswith('\r\n'): raise self.abort('socket error: unterminated line') line = line[:-2] if __debug__: if self.debug >= 4: self._mesg('< %s' % line) else: self._log('< %s' % line) return line
def main(): global verbose, filename_only try: opts, args = getopt.getopt(sys.argv[1:], "qv") except getopt.error, msg: errprint(msg) return for o, a in opts: if o == '-q': filename_only = filename_only + 1 if o == '-v': verbose = verbose + 1 if not args: errprint("Usage:", sys.argv[0], "[-v] file_or_directory ...") return for arg in args: check(arg)
def _find_grail_rc(self): import glob import pwd import socket import tempfile tempdir = os.path.join(tempfile.gettempdir(), ".grail-unix") user = pwd.getpwuid(os.getuid())[0] filename = os.path.join(tempdir, user + "-*") maybes = glob.glob(filename) if not maybes: return None s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) for fn in maybes: # need to PING each one until we find one that's live try: s.connect(fn) except socket.error: # no good; attempt to clean it out, but don't fail: try: os.unlink(fn) except IOError: pass else: return s
def redirect_internal(self, url, fp, errcode, errmsg, headers, data): if 'location' in headers: newurl = headers['location'] elif 'uri' in headers: newurl = headers['uri'] else: return void = fp.read() fp.close() # In case the server sent a relative URL, join with original: newurl = basejoin(self.type + ":" + url, newurl) # For security reasons we do not allow redirects to protocols # other than HTTP, HTTPS or FTP. newurl_lower = newurl.lower() if not (newurl_lower.startswith('http://') or newurl_lower.startswith('https://') or newurl_lower.startswith('ftp://')): raise IOError('redirect error', errcode, errmsg + " - Redirection to url '%s' is not allowed" % newurl, headers) return self.open(newurl)
def Args(self, args=[]): # process command-line options try: opts, args = getopt.getopt(args, 'hcl:', ['help', 'clear', 'lines=']) except getopt.error, msg: raise 'MyArgError' if len(args) > 1: raise 'MyArgError' self.file = args[0] for o, a in opts: if o in ('-h', '--help'): raise 'MyUsage' if o in ('-c', '--clear'): self.SetClear() if o in ('-l', '--lines'): self.lines_per_page = int(a)
def save_pickle(self, dumpfile=DUMPFILE): if not self.changed: self.note(0, "\nNo need to save checkpoint") elif not dumpfile: self.note(0, "No dumpfile, won't save checkpoint") else: self.note(0, "\nSaving checkpoint to %s ...", dumpfile) newfile = dumpfile + ".new" f = open(newfile, "wb") pickle.dump(self, f) f.close() try: os.unlink(dumpfile) except os.error: pass os.rename(newfile, dumpfile) self.note(0, "Done.") return 1
def open_file(self, url): path = urllib.url2pathname(urllib.unquote(url)) if os.path.isdir(path): if path[-1] != os.sep: url = url + '/' indexpath = os.path.join(path, "index.html") if os.path.exists(indexpath): return self.open_file(url + "index.html") try: names = os.listdir(path) except os.error, msg: raise IOError, msg, sys.exc_traceback names.sort() s = MyStringIO("file:"+url, {'content-type': 'text/html'}) s.write('<BASE HREF="file:%s">\n' % urllib.quote(os.path.join(path, ""))) for name in names: q = urllib.quote(name) s.write('<A HREF="%s">%s</A>\n' % (q, q)) s.seek(0) return s return urllib.FancyURLopener.open_file(self, url)
def main(): verbose = webchecker.VERBOSE try: opts, args = getopt.getopt(sys.argv[1:], "qv") except getopt.error, msg: print msg print "usage:", sys.argv[0], "[-qv] ... [rooturl] ..." return 2 for o, a in opts: if o == "-q": verbose = 0 if o == "-v": verbose = verbose + 1 c = Sucker() c.setflags(verbose=verbose) c.urlopener.addheaders = [ ('User-agent', 'websucker/%s' % __version__), ] for arg in args: print "Adding root", arg c.addroot(arg) print "Run..." c.run()
def makedirs(dir): if not dir: return if os.path.exists(dir): if not os.path.isdir(dir): try: os.rename(dir, dir + ".bak") os.mkdir(dir) os.rename(dir + ".bak", os.path.join(dir, "index.html")) except os.error: pass return head, tail = os.path.split(dir) if not tail: print "Huh? Don't know how to make dir", dir return makedirs(head) os.mkdir(dir, 0777)
def close(self, **kw): """(typ, [data]) = close() Close currently selected mailbox. Deleted messages are removed from writable mailbox. This is the recommended command before 'LOGOUT'.""" if self.state != 'SELECTED': raise self.error('No mailbox selected.') try: typ, dat = self._simple_command('CLOSE') finally: self.state = AUTH if __debug__: self._log(1, 'state => AUTH') self._release_state_change() return self._deliver_dat(typ, dat, kw)
def getURLContents(self, url, data=None): "Returns the contents of the given URL as an Unicode string" s = "" success = False req = Request(url, data, {'User-agent': self.useragent}) try: f = urlopen(req) s = f.read() f.close() success = True except HTTPError, e: print 'Server error: ', e.code if (self.verbose and BaseHTTPRequestHandler.responses.has_key(e.code)): title, msg = BaseHTTPRequestHandler.responses[e.code] print title + ": " + msg except URLError, e: print 'Connection error: ', e.reason dammit = UnicodeDammit(s) return (success, dammit.unicode)
def uid(self, command, *args): """Execute "command arg ..." with messages identified by UID, rather than message number. (typ, [data]) = <instance>.uid(command, arg1, arg2, ...) Returns response appropriate to 'command'. """ command = command.upper() if not command in Commands: raise self.error("Unknown IMAP4 UID command: %s" % command) if self.state not in Commands[command]: raise self.error("command %s illegal in state %s, " "only allowed in states %s" % (command, self.state, ', '.join(Commands[command]))) name = 'UID' typ, dat = self._simple_command(name, command, *args) if command in ('SEARCH', 'SORT', 'THREAD'): name = command else: name = 'FETCH' return self._untagged_response(typ, dat, name)
def test(): """Small test program""" import sys, getopt try: opts, args = getopt.getopt(sys.argv[1:], 'deut') except getopt.error, msg: sys.stdout = sys.stderr print msg print """usage: %s [-d|-e|-u|-t] [file|-] -d, -u: decode -e: encode (default) -t: encode and decode string 'Aladdin:open sesame'"""%sys.argv[0] sys.exit(2) func = encode for o, a in opts: if o == '-e': func = encode if o == '-d': func = decode if o == '-u': func = decode if o == '-t': test1(); return if args and args[0] != '-': with open(args[0], 'rb') as f: func(f, sys.stdout) else: func(sys.stdin, sys.stdout)
def _GetServiceShortName(longName): # looks up a services name # from the display name # Thanks to Andy McKay for this code. access = win32con.KEY_READ | win32con.KEY_ENUMERATE_SUB_KEYS | win32con.KEY_QUERY_VALUE hkey = win32api.RegOpenKey(win32con.HKEY_LOCAL_MACHINE, "SYSTEM\\CurrentControlSet\\Services", 0, access) num = win32api.RegQueryInfoKey(hkey)[0] longName = longName.lower() # loop through number of subkeys for x in range(0, num): # find service name, open subkey svc = win32api.RegEnumKey(hkey, x) skey = win32api.RegOpenKey(hkey, svc, 0, access) try: # find display name thisName = str(win32api.RegQueryValueEx(skey, "DisplayName")[0]) if thisName.lower() == longName: return svc except win32api.error: # in case there is no key called DisplayName pass return None # Open a service given either it's long or short name.
def RemoveService(serviceName): try: import perfmon perfmon.UnloadPerfCounterTextStrings("python.exe "+serviceName) except (ImportError, win32api.error): pass hscm = win32service.OpenSCManager(None,None,win32service.SC_MANAGER_ALL_ACCESS) try: hs = SmartOpenService(hscm, serviceName, win32service.SERVICE_ALL_ACCESS) win32service.DeleteService(hs) win32service.CloseServiceHandle(hs) finally: win32service.CloseServiceHandle(hscm) import win32evtlogutil try: win32evtlogutil.RemoveSourceFromRegistry(serviceName) except win32api.error: pass
def __FindSvcDeps(findName): if type(findName) is pywintypes.UnicodeType: findName = str(findName) dict = {} k = win32api.RegOpenKey(win32con.HKEY_LOCAL_MACHINE, "SYSTEM\\CurrentControlSet\\Services") num = 0 while 1: try: svc = win32api.RegEnumKey(k, num) except win32api.error: break num = num + 1 sk = win32api.RegOpenKey(k, svc) try: deps, typ = win32api.RegQueryValueEx(sk, "DependOnService") except win32api.error: deps = () for dep in deps: dep = dep.lower() dep_on = dict.get(dep, []) dep_on.append(svc) dict[dep]=dep_on return __ResolveDeps(findName, dict)
def GetServiceClassString(cls, argv = None): if argv is None: argv = sys.argv import pickle modName = pickle.whichmodule(cls, cls.__name__) if modName == '__main__': try: fname = win32api.GetFullPathName(argv[0]) path = os.path.split(fname)[0] # Eaaaahhhh - sometimes this will be a short filename, which causes # problems with 1.5.1 and the silly filename case rule. # Get the long name fname = os.path.join(path, win32api.FindFiles(fname)[0][8]) except win32api.error: raise error("Could not resolve the path name '%s' to a full path" % (argv[0])) modName = os.path.splitext(fname)[0] return modName + "." + cls.__name__
def RunCommand(cmd): """RunCommand Runs command specified by user @param command to execute """ logging.debug("Running cmd %s", cmd) p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True, shell=True) o, e = p.communicate() s = p.returncode if s != 0: return (s, e) return (s, o) # returns error, or None for OK # opts is dictionary of {option: value}. # for now we care about size and (maybe) policy
def validate_vsan_policy_name(policy_name, vmdk_path): """ Ensure that the policy file exists """ if not vsan_info.is_on_vsan(vmdk_path): raise ValidationError('Cannot use a VSAN policy on a non-VSAN datastore') if not vsan_policy.policy_exists(policy_name): err_msg = 'Policy {0} does not exist.'.format(policy_name) # If valid policies exist, append their names along with error message # for available policy names that can be used avail_policies = vsan_policy.get_policies() if avail_policies: avail_msg = ' Available policies are: {0}'.format(list(avail_policies.keys())) err_msg = err_msg + avail_msg raise ValidationError(err_msg)
def set_policy_to_vmdk(vmdk_path, opts, vol_name=None): """ Set VSAN policy to the vmdk object If failed, delete the vmdk file and return the error info to be displayed on client """ out = vsan_policy.set_policy_by_name(vmdk_path, opts[kv.VSAN_POLICY_NAME]) if out: # If policy is incompatible/wrong, return the error and delete the vmdk_path msg = ("Failed to create volume %s: %s" % (vol_name, out)) logging.warning(msg) error_info = err(msg) clean_err = cleanVMDK(vmdk_path=vmdk_path, vol_name=vol_name) if clean_err: logging.warning("Failed to clean %s file: %s", vmdk_path, clean_err) error_info = error_info + clean_err return error_info return None
def getVMDK(vmdk_path, vol_name, datastore): """Checks if the volume exists, and returns error if it does not""" # Note: will return more Volume info here, when Docker API actually accepts it logging.debug("getVMDK: vmdk_path=%s vol_name=%s, datastore=%s", vmdk_path, vol_name, datastore) file_exist = os.path.isfile(vmdk_path) logging.debug("getVMDK: file_exist=%d", file_exist) if not os.path.isfile(vmdk_path): return err("Volume {0} not found (file: {1})".format(vol_name, vmdk_path)) # Return volume info - volume policy, size, allocated capacity, allocation # type, creat-by, create time. try: result = vol_info(kv.getAll(vmdk_path), kv.get_vol_info(vmdk_path), datastore) except Exception as ex: logging.error("Failed to get disk details for %s (%s)" % (vmdk_path, ex)) return None return result
def GetGeneratePath(): """Returns the name of the path to generate to. Checks the directory is OK. """ assert not is_readonly, "Why do you want the genpath for a readonly store?" try: os.makedirs(win32com.__gen_path__) #os.mkdir(win32com.__gen_path__) except os.error: pass try: fname = os.path.join(win32com.__gen_path__, "__init__.py") os.stat(fname) except os.error: f = open(fname,"w") f.write('# Generated file - this directory may be deleted to reset the COM cache...\n') f.write('import win32com\n') f.write('if __path__[:-1] != win32com.__gen_path__: __path__.append(win32com.__gen_path__)\n') f.close() return win32com.__gen_path__ # # The helpers for win32com.client.Dispatch and OCX clients. #
def main(): import getopt usage = """Usage: %s [-n | -t] url -n: open new window -t: open new tab""" % sys.argv[0] try: opts, args = getopt.getopt(sys.argv[1:], 'ntd') except getopt.error, msg: print >>sys.stderr, msg print >>sys.stderr, usage sys.exit(1) new_win = 0 for o, a in opts: if o == '-n': new_win = 1 elif o == '-t': new_win = 2 if len(args) != 1: print >>sys.stderr, usage sys.exit(1) url = args[0] open(url, new_win) print "\a"
def _deliver(self, mailfrom, rcpttos, data): import smtplib refused = {} try: s = smtplib.SMTP() s.connect(self._remoteaddr[0], self._remoteaddr[1]) try: refused = s.sendmail(mailfrom, rcpttos, data) finally: s.quit() except smtplib.SMTPRecipientsRefused as e: print('got SMTPRecipientsRefused', file=DEBUGSTREAM) refused = e.recipients except (socket.error, smtplib.SMTPException) as e: print('got', e.__class__, file=DEBUGSTREAM) # All recipients were refused. If the exception had an associated # error code, use it. Otherwise,fake it with a non-triggering # exception code. errcode = getattr(e, 'smtp_code', -1) errmsg = getattr(e, 'smtp_error', 'ignore') for r in rcpttos: refused[r] = (errcode, errmsg) return refused
def replace_stdout(): """Set stdout encoder error handler to backslashreplace (as stderr error handler) to avoid UnicodeEncodeError when printing a traceback""" if os.name == "nt": # Replace sys.stdout breaks the stdout newlines on Windows: issue #8533 return import atexit stdout = sys.stdout sys.stdout = open(stdout.fileno(), 'w', encoding=stdout.encoding, errors="backslashreplace", closefd=False) def restore_stdout(): sys.stdout.close() sys.stdout = stdout atexit.register(restore_stdout)
def main(): global verbose, filename_only try: opts, args = getopt.getopt(sys.argv[1:], "qv") except getopt.error as msg: errprint(msg) return for o, a in opts: if o == '-q': filename_only = filename_only + 1 if o == '-v': verbose = verbose + 1 if not args: errprint("Usage:", sys.argv[0], "[-v] file_or_directory ...") return for arg in args: check(arg)
def main(): import getopt usage = """Usage: %s [-n | -t] url -n: open new window -t: open new tab""" % sys.argv[0] try: opts, args = getopt.getopt(sys.argv[1:], 'ntd') except getopt.error as msg: print(msg, file=sys.stderr) print(usage, file=sys.stderr) sys.exit(1) new_win = 0 for o, a in opts: if o == '-n': new_win = 1 elif o == '-t': new_win = 2 if len(args) != 1: print(usage, file=sys.stderr) sys.exit(1) url = args[0] open(url, new_win) print("\a")
def runsource(self, source): "Extend base class method: Stuff the source in the line cache first" filename = self.stuffsource(source) self.more = 0 self.save_warnings_filters = warnings.filters[:] warnings.filterwarnings(action="error", category=SyntaxWarning) # at the moment, InteractiveInterpreter expects str assert isinstance(source, str) #if isinstance(source, str): # from idlelib import IOBinding # try: # source = source.encode(IOBinding.encoding) # except UnicodeError: # self.tkconsole.resetoutput() # self.write("Unsupported characters in input\n") # return try: # InteractiveInterpreter.runsource() calls its runcode() method, # which is overridden (see below) return InteractiveInterpreter.runsource(self, source, filename) finally: if self.save_warnings_filters is not None: warnings.filters[:] = self.save_warnings_filters self.save_warnings_filters = None
def main(argv=None): if argv is None: argv = sys.argv try: try: opts, args = getopt.getopt(argv[1:], "h", ["help"]) except getopt.error, msg: raise Usage(msg) #more code, unchanged for opt, value in opts: if opt in ("-h", "--help"): print 'please input null parameter to run.' elif opt in ("-t", "--test"): print 'not used, just test.' else: start() except Usage, err: print >>sys.stderr, err.msg print >>sys.stderr, "for help use --help" return 2