我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用socket.setdefaulttimeout()。
def get_ips_from_url(url):
    """
    Resolve the host part of a URL to its IP addresses.

    :param str url: The url to resolve
    :rtype: list
    :return: the addresses reported by ``socket.gethostbyname_ex`` for the
        URL's hostname; ``None`` when the URL has no hostname or when
        parsing/resolution fails
    """
    try:
        hostname = urlparse(url).hostname
        if not hostname:
            return None
        # NOTE: this changes the process-wide default socket timeout and
        # does not restore it.
        socket.setdefaulttimeout(5)
        _canonical, _aliases, addresses = socket.gethostbyname_ex(hostname)
        return addresses
    except (ValueError, socket.error, socket.gaierror, socket.herror, socket.timeout):
        # Unparseable URL or failed lookup: report "nothing resolved".
        return None
def Scrape(url):
    """Download ``url`` and save the parsed HTML under ``./Output``.

    The page is fetched with a custom User-Agent, parsed with
    BeautifulSoup's ``html.parser`` and written to
    ``<cwd>/Output/Scraped-<token>.html`` where ``<token>`` is the URL with
    every non-word character removed.

    :param url: address of the page to fetch
    :raises: whatever ``urllib.request.urlopen`` or ``open`` raise; nothing
        is caught here
    """
    # Process-wide default; applies to sockets created from now on.
    timeout = 10
    socket.setdefaulttimeout(timeout)

    # Collecting html content.
    headers = {'User-Agent': 'TorScrapper - Onion scrapper | github.com/ConanKapoor/TorScrapper.git' }
    req = urllib.request.Request(url, None, headers)
    # The context manager closes the connection even if read() fails.
    with urllib.request.urlopen(req) as response:
        raw_html = response.read()

    # Using BeautifulSoup to parse html object response.
    page = BeautifulSoup(raw_html, 'html.parser')

    # Saving output
    token = re.sub(r'[^\w]', '', url)
    name = os.path.abspath("") + '/Output/Scraped-' + token + '.html'
    # 'with' guarantees the handle is released if write() raises.
    with open(name, 'w') as output_file:
        output_file.write(str(page))

# Taking input.
def run(self):
    """Scan (host, port) pairs taken from ``self.inq`` forever.

    For every pair a TCP connection is attempted; the tuple
    ``(host, port, 'OPEN')`` or ``(host, port, 'CLOSED')`` is then put on
    ``self.outq``. The loop never terminates on its own.
    """
    # loop over the "in" queue and get a new port and scan it
    socket.setdefaulttimeout(TIMEOUT)
    while 1:
        host, port = self.inq.get()
        sd = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            # connect to the given host:port
            sd.connect((host, port))
        except socket.error:
            # set the CLOSED flag
            state = 'CLOSED'
        else:
            # set the OPEN flag
            state = 'OPEN'
        finally:
            # Always release the descriptor, whatever the outcome, so a
            # long scan cannot run out of file descriptors.
            sd.close()
        self.outq.put((host, port, state))
def _do_trakt_auth_post(self, url, data):
    """POST ``data`` to ``url`` with the session's bearer token.

    :param url: endpoint to post to
    :param data: request body, handed to ``urllib2.Request`` unchanged
    :return: the raw response body
    :raises urllib2.HTTPError: logged with its reason, then re-raised
    """
    try:
        auth_headers = {
            'Content-Type': 'application/json',
            'Authorization': 'Bearer ' + self.get_session(),
            'trakt-api-version': '2',
            'trakt-api-key': self.CLIENT_ID
        }
        # timeout in seconds (process-wide default for new sockets)
        socket.setdefaulttimeout(5)
        body = urllib2.urlopen(urllib2.Request(url, data, auth_headers)).read()
        self.logger.info('Response: {0}'.format(body))
        return body
    except urllib2.HTTPError as e:
        self.logger.error('Unable to submit post data {url} - {error}'.format(url=url, error=e.reason))
        raise
def test(host):
    """Fetch ``host`` and report over IRC whether the page matches ``MATCH``.

    ``http://`` is prepended when missing. A match is announced on ``CHAN``
    and appended to ``foundsqli.txt``; a miss is announced only in verbose
    mode. Socket-level failures are reported on the channel, any other
    error is ignored.

    :param host: URL or bare host name to fetch
    """
    socket.setdefaulttimeout(5)
    if int(verbose) == 1:
        s.send("PRIVMSG %s :%s%s\r\n" % (CHAN, "[+] Testing:", host))
    try:
        if host[:7] != "http://":
            host = "http://"+host
        source = urllib2.urlopen(host).read()
        if re.search(MATCH, source):
            s.send("PRIVMSG %s :%s%s\r\n" % (CHAN, "[!] Found:", host))
            # 'with' closes the log even if the write fails.
            with open("foundsqli.txt", "a") as found_log:
                found_log.write("\n[!] Found: "+host)
        elif int(verbose) == 1:
            s.send("PRIVMSG %s :%s%s\r\n" % (CHAN, "[-] Not Vuln:", host))
    except (socket.gaierror, socket.timeout, socket.error) as msg:
        s.send("PRIVMSG %s :%s%s @ %s\r\n" % (CHAN, "[-] Error: ",msg, host))
    except Exception:
        # Best effort: an unreachable or malformed target must not stop
        # the caller; KeyboardInterrupt/SystemExit still propagate.
        pass
def __setHTTPTimeout():
    """
    Normalise ``conf.timeout`` and install it as the default socket timeout.

    A configured value is converted to float and raised to 3.0 seconds when
    it is smaller (with a warning); when nothing is configured 10.0 seconds
    is used.
    """
    requested = conf.timeout

    if requested:
        logger.debug("setting the HTTP timeout")

        requested = float(requested)
        if requested < 3.0:
            logger.warn("the minimum HTTP timeout is 3 seconds, sqlmap "
                        "will going to reset it")
            requested = 3.0
    else:
        requested = 10.0

    conf.timeout = requested
    socket.setdefaulttimeout(conf.timeout)
def check(ip, port, timeout):
    """Probe a service by sending ``envi`` and inspecting the reply.

    :param ip: target address
    :param port: target port (converted with ``int``)
    :param timeout: default socket timeout, in seconds, set process-wide
    :return: ``u"Zookeeper Unauthorized access"`` when the reply contains
        ``'Environment'``; ``None`` otherwise or on any error
    """
    s = None
    try:
        socket.setdefaulttimeout(timeout)
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.connect((ip, int(port)))
        # Alternative commands noted by the original author:
        # dump, reqs, ruok, stat
        flag = "envi"
        s.send(flag)
        data = s.recv(1024)
        print(data)
        if 'Environment' in data:
            return u"Zookeeper Unauthorized access"
    except Exception:
        # Best-effort probe: an unreachable or unexpected peer simply
        # means "nothing found".
        pass
    finally:
        # Close on every path, not only on success.
        if s is not None:
            s.close()
def checkTraficLight(self):
    """Timer callback: unhook itself, restart the timer, then show the disclaimer.

    In this version ``default`` is always true, so only the disclaimer
    branch can run; the default socket timeout is set to 3 seconds and
    immediately put back.
    """
    self.activityTimer.callback.remove(self.checkTraficLight)
    self.activityTimer.start(100, False)

    from urllib import urlopen
    import socket
    import os

    saved_timeout = socket.getdefaulttimeout()
    socket.setdefaulttimeout(3)
    message, picon, default = "", None, True
    socket.setdefaulttimeout(saved_timeout)

    if not default:
        message += "\n" + _("Do you want to update your receiver?")
        self.session.openWithCallback(self.startActualUpdate, MessageBox, message, default = default, picon = picon)
        return
    self.showDisclaimer()
def ipkgCallback(self, event, param):
    """Handle an ipkg event and record whether updates were found.

    Only ``IpkgComponent.EVENT_DONE`` is acted on. ``param`` is not used.
    """
    if event == IpkgComponent.EVENT_DONE:
        if self.updating:
            # First completion: the update step is done, now ask for the
            # list of upgradable packages.
            self.updating = False
            self.ipkg.startCmd(IpkgComponent.CMD_UPGRADE_LIST)
        elif self.ipkg.currentCommand == IpkgComponent.CMD_UPGRADE_LIST:
            self.total_packages = len(self.ipkg.getFetchedList())
            print ('[OnlineVersionCheck] %s Updates available' % self.total_packages)
            if self.total_packages:
                from urllib import urlopen
                import socket
                # Use a short default timeout for the status fetch only;
                # the previous value is put back afterwards.
                currentTimeoutDefault = socket.getdefaulttimeout()
                socket.setdefaulttimeout(3)
                config.softwareupdate.updatefound.setValue(True)
                try:
                    config.softwareupdate.updateisunstable.setValue(urlopen("http://odisealinux.com/feeds/" + getImageVersion() + "/status").read())
                except:
                    # Any failure to fetch the status stores 1 instead.
                    config.softwareupdate.updateisunstable.setValue(1)
                socket.setdefaulttimeout(currentTimeoutDefault)
            else:
                config.softwareupdate.updatefound.setValue(False)
        else:
            config.softwareupdate.updatefound.setValue(False)
    pass
def build_http():
    """Create an ``httplib2.Http`` object with a timeout.

    The process-wide default socket timeout is used when one has been set
    (call ``socket.setdefaulttimeout(timeout_in_sec)`` beforehand to
    override); otherwise ``DEFAULT_HTTP_TIMEOUT_SEC`` applies.

    Returns:
      A httplib2.Http object, which is used to make http requests.
    """
    http_timeout = socket.getdefaulttimeout()
    if http_timeout is None:
        http_timeout = DEFAULT_HTTP_TIMEOUT_SEC
    return httplib2.Http(timeout=http_timeout)
def get_browser(self):
    '''Build a mechanize.Browser set up from the framework's global options.

    Applies user-agent, HTTP debug output (verbosity >= 2), proxy, disabled
    robots.txt handling, and the default socket timeout.
    '''
    options = self._global_options
    br = mechanize.Browser()
    # set the user-agent header
    br.addheaders = [('User-agent', options['user-agent'])]
    # set debug options
    if options['verbosity'] >= 2:
        for enable_debug in (br.set_debug_http, br.set_debug_redirects, br.set_debug_responses):
            enable_debug(True)
    # set proxy (same proxy for both schemes)
    proxy = options['proxy']
    if proxy:
        br.set_proxies({'http': proxy, 'https': proxy})
    # additional settings
    br.set_handle_robots(False)
    # set timeout (process-wide default for new sockets)
    socket.setdefaulttimeout(options['timeout'])
    return br
def single_host(ip):
    """Probe every port in ``PORTS`` on ``ip`` and return a text report.

    :param ip: IPv4 address string, validated with ``socket.inet_aton``
    :return: one ``'{port}/tcp {state}'`` line per port (state is ``open``
        or ``closed``) joined by newlines, or
        ``'Error: Invalid IP address.'`` when ``ip`` is not accepted
    """
    try:
        socket.inet_aton(ip)
    except socket.error:
        return 'Error: Invalid IP address.'

    # The default timeout only applies to sockets created afterwards, so it
    # must be set before the first socket is created.
    socket.setdefaulttimeout(0.5)

    lines = []
    for p in PORTS:
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            # connect_ex returns 0 on success and an errno otherwise.
            c = s.connect_ex((ip, p))
        finally:
            s.close()
        state = 'open' if not c else 'closed'
        lines.append('{:>5}/tcp {:>7}'.format(p, state))
    return '\n'.join(lines)
def __recv_msg_compat(sock,size,timeout):
    """Receive exactly ``size`` bytes from ``sock`` by repeated ``recv`` calls.

    Compatibility implementation for non-MSG_WAITALL / M2Crypto sockets.
    ``timeout`` is accepted but not used here.

    Raises ConnectionClosedError when ``recv`` returns nothing before
    ``size`` bytes arrived; the error's ``partialMsg`` attribute holds what
    was received so far.
    """
    received = 0
    pieces = []
    # Receive chunks of max. 60kb size:
    # (rather arbitrary limit, but it avoids memory/buffer problems on certain OSes -- VAX/VMS, Windows)
    while received < size:
        chunk = sock.recv(min(60000, size - received))
        if chunk:
            pieces.append(chunk)
            received += len(chunk)
            continue
        # m2crypto ssl sockets (recognised by their 'pending' attribute)
        # have problems with a defaulttimeout
        if hasattr(sock, 'pending') and socket.getdefaulttimeout() is not None:
            raise ConnectionClosedError("m2crypto SSL can't be used when socket.setdefaulttimeout() has been set")
        err = ConnectionClosedError('connection lost')
        err.partialMsg = ''.join(pieces)  # store the message that was received until now
        raise err
    return ''.join(pieces)

# Send a message over a socket. Raises ConnectionClosedError if the msg
# couldn't be sent (the connection has probably been lost then).
# We need this because 'send' isn't guaranteed to send all desired
# bytes in one call, for instance, when network load is high.
def TTSBaidu(self, tid, txt, lan, spd):
    '''
    Fetch the speech for ``txt`` and store it in ``self.results[tid]``.

    tid: key under which the fetched content is stored
    txt: the TTS text
    lan: language, 'en' for English or 'zh' for Chinese
    spd: the reading speed

    The request is retried until it succeeds; URL errors and socket
    timeouts are printed and trigger another attempt.
    '''
    socket.setdefaulttimeout(34.0)
    # Retry in a loop rather than by recursion: a long outage would
    # otherwise exhaust the interpreter's recursion limit.
    while True:
        try:
            ttsUrl = genTTSUrl(lan ,txt, spd)
            c = getpage(ttsUrl)
        except urllib.error.URLError as e:
            print("error:URLError ",e," we will try again...tid:",tid)
        except socket.timeout:
            print("error: TTSBaidu time out!, we will try again...tid:",tid )
        else:
            self.results[tid]=c
            return
def workFunc(task):
    """Download ``task['url']`` into the file named by ``task['filename']``.

    Spaces in the file name are replaced by underscores.

    :param task: mapping with at least ``'url'`` and ``'filename'``
    :return: ``(True, task)`` when the response status is OK and the body
        was written; ``(False, task)`` when the request fails or returns
        another status
    """
    try:
        socket.setdefaulttimeout(10)
        i = requests.get(task['url'])
    except Exception:
        # Any request failure is reported as a failed task; interpreter
        # exits (KeyboardInterrupt, SystemExit) still propagate.
        return((False, task))
    if i.status_code != requests.codes.ok:
        return((False, task))
    filename = '{0}'.format(task['filename'].replace(' ', '_'))
    f = iopen(filename, 'wb')
    try:
        f.write(i.content)
    finally:
        # Release the handle even if the write fails.
        f.close()
    return((True, task))

# read in tasking for files not already downloaded
def run():
    """Send ``HEAD /index.html`` to the configured target and print the headers.

    Reads ``variables['timeout'][0]`` and ``variables['target'][0]``.

    :return: the list of response headers on success; a ``ModuleError`` for
        an invalid timeout, a name-resolution failure or a timeout; the
        plain string ``"invalid url"`` for an invalid URL
    """
    try:
        socket.setdefaulttimeout(float(variables['timeout'][0]))
    except ValueError:
        printError('invalid timeout')
        return ModuleError("invalid timeout")

    conn = None
    try:
        conn = http.client.HTTPConnection(variables['target'][0])
        conn.request("HEAD","/index.html")
        res = conn.getresponse()
        results = res.getheaders()
        print('')
        for item in results:
            print(colors.yellow+item[0], item[1]+colors.end)
        print('')
        return results
    except http.client.InvalidURL:
        printError('invalid url')
        # NOTE(review): a bare string, unlike the ModuleError returned by
        # the other failure paths; kept because callers may rely on it.
        return ("invalid url")
    except socket.gaierror:
        printError('name or service not known')
        return ModuleError("name or service not known")
    except socket.timeout:
        printError('timeout')
        return ModuleError("timeout")
    finally:
        # Close the connection on success and on every failure path.
        if conn is not None:
            conn.close()
def getLatestImageTimestamp(self):
    """Return the latest image timestamp as a formatted string.

    The timestamp is fetched over HTTP and formatted with
    ``_("%Y-%m-%d %H:%M")``. Any failure yields ``""``. The default socket
    timeout is set to 3 seconds for the duration of the call and put back
    before returning.
    """
    currentTimeoutDefault = socket.getdefaulttimeout()
    socket.setdefaulttimeout(3)
    try:
        # TODO: Use Twisted's URL fetcher, urlopen is evil. And it can
        # run in parallel to the package update.
        from time import strftime
        from datetime import datetime
        imageVersion = about.getImageTypeString().split(" ")[1]
        # Versions below 5 are rendered with one decimal, others as-is.
        imageVersion = (int(imageVersion) < 5 and "%.1f" or "%s") % int(imageVersion)
        url = "http://openpli.org/download/timestamp/%s~%s" % (HardwareInfo().get_device_model(), imageVersion)
        try:
            latestImageTimestamp = datetime.fromtimestamp(int(urlopen(url, timeout=5).read())).strftime(_("%Y-%m-%d %H:%M"))
        except:
            # OpenPli 5.0 uses python 2.7.11 and here we need to bypass the certificate check
            # NOTE(review): this retry disables TLS certificate
            # verification and runs after ANY failure of the first attempt.
            from ssl import _create_unverified_context
            latestImageTimestamp = datetime.fromtimestamp(int(urlopen(url, timeout=5, context=_create_unverified_context()).read())).strftime(_("%Y-%m-%d %H:%M"))
    except:
        # Both attempts failed (or the version/URL could not be built).
        latestImageTimestamp = ""
    socket.setdefaulttimeout(currentTimeoutDefault)
    return latestImageTimestamp
def start(self,stopEvent,port=PORT_NUMBER): global PORT_NUMBER global HOST_NAME global g_stopEvent print 'port',port,'HOST_NAME',HOST_NAME g_stopEvent = stopEvent socket.setdefaulttimeout(10) server_class = ThreadedHTTPServer #MyHandler.protocol_version = "HTTP/1.1" MyHandler.protocol_version = "HTTP/1.1" httpd = server_class((HOST_NAME, port), MyHandler) print "XBMCLocalProxy Starts - %s:%s" % (HOST_NAME, port) while(True and not stopEvent.isSet()): httpd.handle_request() httpd.server_close() print "XBMCLocalProxy Stops %s:%s" % (HOST_NAME, port)
def main():
    """Grab service banners from ports 21, 23 and 22 on 192.168.195.0-254.

    For every address/port pair a TCP connection is attempted with a one
    second timeout and up to 1024 bytes of banner are printed; failures are
    reported and the scan continues.
    """
    ports = [21,23,22]
    ips = "192.168.195."
    # Setting the default once is enough; it applies to every socket
    # created afterwards.
    socket.setdefaulttimeout(1)
    for octet in range(0,255):
        ip = ips + str(octet)
        for port in ports:
            s = None
            try:
                s = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
                s.connect((ip,port))
                output = s.recv(1024)
                # Format inside the call: applying '%' to print()'s return
                # value fails on Python 3.
                print("[+] The banner: %s for IP: %s at Port: %s" % (output,ip,port))
            except Exception:
                print("[-] Failed to Connect to %s:%s" % (ip, port))
            finally:
                # 's' is unset if socket creation itself failed.
                if s is not None:
                    s.close()
def scantcp(threadName,rmip,r1,r2,c): try: for port in range(r1,r2): sock= socket.socket(socket.AF_INET,socket.SOCK_STREAM) #sock= socket.socket(socket.AF_INET,socket.SOCK_DGRAM) socket.setdefaulttimeout(c) result = sock.connect_ex((rmip,port)) if result==0: print "Port Open:---->\t", port,"--", data.get(port, "Not in Database") sock.close() except KeyboardInterrupt: print "You stop this " sys.exit() except socket.gaierror: print "Hostname could not be resolved" sys.exit() except socket.error: print "could not connect to server" sys.exit() shelf.close()
def check(ip, port, timeout):
    """Send two fixed binary requests and look for marker strings in the replies.

    The first request's reply must contain ``"ismaster"``; the second
    (a ``getLog`` request, judging by the payload) must contain
    ``"totalLinesWritten"``.

    :param ip: target address
    :param port: target port (converted with ``int``)
    :param timeout: default socket timeout in seconds, set process-wide
    :return: a message string when both markers are seen, else ``None``
    """
    s = None
    try:
        socket.setdefaulttimeout(timeout)
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.connect((ip, int(port)))
        data = binascii.a2b_hex(
            "3a000000a741000000000000d40700000000000061646d696e2e24636d640000000000ffffffff130000001069736d6173746572000100000000")
        s.send(data)
        result = s.recv(1024)
        if "ismaster" in result:
            getlog_data = binascii.a2b_hex(
                "480000000200000000000000d40700000000000061646d696e2e24636d6400000000000100000021000000026765744c6f670010000000737461727475705761726e696e67730000")
            s.send(getlog_data)
            result = s.recv(1024)
            if "totalLinesWritten" in result:
                return u"?????"
    except Exception:
        # Best-effort probe: any failure means "nothing found".
        pass
    finally:
        # The original never closed the socket.
        if s is not None:
            s.close()
def check(ip, port, timeout):
    """Check whether an HTTP ``PUT`` of ``/vultest.txt`` is accepted and readable.

    A raw PUT request carrying ``xxscan0`` is sent; if the reply mentions
    ``PUT`` the file is fetched back over HTTP and checked for the marker.

    :param ip: target address
    :param port: target port (an integer; it is formatted with ``%d``)
    :param timeout: socket and HTTP timeout in seconds
    :return: a message string when the marker is read back, else ``None``
    """
    try:
        socket.setdefaulttimeout(timeout)
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            s.connect((ip, port))
            flag = "PUT /vultest.txt HTTP/1.1\r\nHost: %s:%d\r\nContent-Length: 9\r\n\r\nxxscan0\r\n\r\n" % (ip, port)
            s.send(flag)
            # Give the server a moment to answer before reading.
            time.sleep(1)
            data = s.recv(1024)
        finally:
            # Close on failure as well as on success.
            s.close()
        if 'PUT' in data:
            url = 'http://' + ip + ":" + str(port) + '/vultest.txt'
            request = urllib2.Request(url)
            res_html = urllib2.urlopen(request, timeout=timeout).read(204800)
            if 'xxscan0' in res_html:
                return u"iis webdav??"
    except Exception:
        # Best-effort probe: any failure means "nothing found".
        pass
def check(ip, port, timeout):
    """Try every entry of ``PASSWORD_DIC`` for user ``root`` against the service.

    Each attempt opens a new connection, reads the server greeting, builds
    the authentication packet with ``get_scramble``/``get_auth_data`` and
    compares the reply with a fixed 11-byte success packet.

    :param ip: target address
    :param port: target port (converted with ``int``)
    :param timeout: default socket timeout in seconds, set process-wide
    :return: a message naming the user and password on success; ``None``
        when no password works or the target refuses/times out
    """
    socket.setdefaulttimeout(timeout)
    user_list = ['root']
    for user in user_list:
        for pass_ in PASSWORD_DIC:
            sock = None
            try:
                pass_ = str(pass_.replace('{user}', user))
                sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                sock.connect((ip, int(port)))
                packet = sock.recv(254)
                plugin, scramble = get_scramble(packet)
                auth_data = get_auth_data(user, pass_, scramble, plugin)
                sock.send(auth_data)
                result = sock.recv(1024)
                if result == "\x07\x00\x00\x02\x00\x00\x00\x02\x00\x00\x00":
                    return u"?????????%s????%s" % (user, pass_)
            except Exception as e:
                # Connection refused or timed out: the target is not
                # reachable, so stop instead of trying more passwords.
                if "Errno 10061" in str(e) or "timed out" in str(e):
                    return
            finally:
                # One connection per attempt; close each of them.
                if sock is not None:
                    sock.close()
def check(ip, port, timeout):
    """Check whether a ``PUT`` through ``/fileserver/`` lands under ``/styles/``.

    A raw PUT with a random file name and the body ``xxscan0`` is sent;
    the file is then requested from ``/styles/<name>.txt`` and checked for
    the marker.

    :param ip: target address
    :param port: target port
    :param timeout: socket and HTTP timeout in seconds
    :return: a message string ending with the file URL when the marker is
        read back, else ``None``
    """
    try:
        socket.setdefaulttimeout(timeout)
        filename = None
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            s.connect((ip, port))
            filename = random_str(6)
            flag = "PUT /fileserver/sex../../..\\styles/%s.txt HTTP/1.0\r\nContent-Length: 9\r\n\r\nxxscan0\r\n\r\n"%(filename)
            s.send(flag)
            # Give the server a moment to answer before reading.
            time.sleep(1)
            s.recv(1024)
        finally:
            # Close on failure as well as on success.
            s.close()
        url = 'http://' + ip + ":" + str(port) + '/styles/%s.txt'%(filename)
        res_html = urllib2.urlopen(url, timeout=timeout).read(1024)
        if 'xxscan0' in res_html:
            return u"???????????" + url
    except Exception:
        # Best-effort probe: any failure means "nothing found".
        pass
def _send_and_receive(ip, port, command):
    """Send ``command`` over a fresh TCP connection and return up to 1024 bytes of reply."""
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        s.connect((ip, int(port)))
        s.send(command)
        return s.recv(1024)
    finally:
        s.close()


def check(ip, port, timeout):
    """Probe a service with ``INFO`` and, if it asks for authentication, try passwords.

    :param ip: target address
    :param port: target port (converted with ``int``)
    :param timeout: default socket timeout in seconds, set process-wide
    :return: a message string when ``INFO`` is answered with
        ``redis_version`` or when an ``AUTH`` attempt from ``PASSWORD_DIC``
        is answered with ``+OK``; ``None`` otherwise or on any error
    """
    try:
        socket.setdefaulttimeout(timeout)
        result = _send_and_receive(ip, port, "INFO\r\n")
        if "redis_version" in result:
            return u"?????"
        elif "Authentication" in result:
            for pass_ in PASSWORD_DIC:
                # Each attempt uses its own connection, closed by the
                # helper; the original leaked every one of them.
                result = _send_and_receive(ip, port, "AUTH %s\r\n" % (pass_))
                if '+OK' in result:
                    return u"?????????%s" % (pass_)
    except Exception:
        # Best-effort probe: any failure means "nothing found".
        pass
def audit(arg):
    """Look for a reachable ``/_plugin/<name>/`` page on port 9200 of ``arg``.

    Each plugin name is requested over a raw socket; the first one whose
    reply starts with ``HTTP/1.0 200 OK`` is handed to ``grab`` and the
    search stops. Any error ends the search silently.

    :param arg: host name or address to probe
    """
    port = 9200
    host = arg
    pluginList = ['test','kopf', 'HQ', 'marvel', 'bigdesk' ,'head' ]
    try:
        socket.setdefaulttimeout(3)
        for plugin in pluginList:
            s = socket.socket()
            try:
                s.connect((host,port))
                s.send("GET /_plugin/%s/ HTTP/1.0\n"
                       "Host: %s\n\n" % (plugin, host))
                status = s.recv(16)
                if ("HTTP/1.0 200 OK" in status):
                    grab(plugin,host,port)
                    break
            finally:
                # Close every probe socket, not just the last one.
                s.close()
    except Exception:
        # As before, the first failure aborts the whole scan.
        pass
def check(host,port):
    """Send a SOCKS5 greeting and report which auth method the server picks.

    The greeting offers two methods, ``0x00`` and ``0x02``.

    :param host: target address
    :param port: target port
    :return: ``True`` when the server selects method ``0x02``; ``False``
        for any other reply, for a non-SOCKS5 answer, or on any error.
        Method ``0x00`` additionally triggers ``security_warning``.
    """
    s = None
    try:
        # Set before the socket is created; the default does not affect
        # sockets that already exist.
        socket.setdefaulttimeout(20)
        s = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
        s.connect((host,port))
        # version 5, two methods offered: 0x00 and 0x02
        payload1 = '\x05\x02\x00\x02'
        s.send(payload1)
        data1 = s.recv(1024)
        if data1[0] != '\x05':
            # Not a SOCKS5 reply.
            return False
        if data1[1] == '\x00':
            security_warning('socks5: %s:%s => NO AUTHENTICATION REQUIRED'%(host,str(port)))
            return False
        elif data1[1] == '\x02':
            # Server selected method 0x02.
            return True
        else:
            return False
    except Exception:
        return False
    finally:
        # Close on every path; 's' is unset if creation itself failed.
        if s is not None:
            s.close()
def initialisation(host,port):
    '''
    Exchange the ``@RSYNCD:`` greeting with ``host:port`` and query the server.

    :param host: target address
    :param port: target port
    :return: ``(True, header, version, rsynclist)`` when the greeting is
        answered and ``ClientQuery`` succeeds, where ``header`` and
        ``version`` are the first two fields unpacked from the reply;
        ``(False, "port not open")`` otherwise
    '''
    flag = False
    reply = None
    rsynclist = None
    rsync = {"MagicHeader":"@RSYNCD:","HeaderVersion":" 30.0"}
    payload = struct.pack("!8s5ss",rsync["MagicHeader"],rsync["HeaderVersion"],"\n")  # init
    # Set before the socket is created; the default does not affect
    # sockets that already exist.
    socket.setdefaulttimeout(20)
    s = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
    try:
        s.connect((host,port))
        s.send(payload)
        data = s.recv(1024)
        reply = struct.unpack('!8s5ss',data)
        if len(reply) == 3:
            rsynclist = ClientQuery(s)
            # Only report success once the query has returned, so the
            # return below never reads an unset 'rsynclist'.
            flag = True
    except Exception:
        pass
    finally:
        s.close()
    if flag:
        return True,reply[0],reply[1],rsynclist
    return False,"port not open"
def socket_timeout(seconds=15):
    """Generator that applies a default socket timeout around one ``yield``.

    The default timeout in force on entry is put back when the generator
    finishes, is closed, or has an exception thrown into it. Presumably
    wrapped with ``contextlib.contextmanager`` by a decorator outside this
    excerpt -- confirm at the definition site.

    :param seconds: default timeout to apply while suspended at the yield
    """
    previous = socket.getdefaulttimeout()
    socket.setdefaulttimeout(seconds)
    try:
        yield
    finally:
        socket.setdefaulttimeout(previous)
def socket_timeout(timeout=15):
    """Decorator factory: run the decorated function under a default socket timeout.

    The default timeout in force before the call is restored afterwards,
    whether the function returns or raises.

    :param timeout: default socket timeout, in seconds, for the call
    :return: a decorator preserving the wrapped function's metadata
    """
    import functools

    def _decorate(func):
        # wraps() keeps __name__, __doc__ etc. of the decorated function.
        @functools.wraps(func)
        def _with_timeout(*args, **kwargs):
            old_timeout = socket.getdefaulttimeout()
            socket.setdefaulttimeout(timeout)
            try:
                return func(*args, **kwargs)
            finally:
                socket.setdefaulttimeout(old_timeout)
        return _with_timeout
    return _decorate
def setUpClass(cls):
    """Class-level setup: default socket timeout and, optionally, Selenium.

    When ``WITH_SELENIUM`` is set in the environment a WebDriver is stored
    in ``cls.selenium``; with ``SELENIUM_HEADLESS`` also set, a 1280x720
    Xvfb display is started first and kept in ``cls.vdisplay``.
    """
    socket.setdefaulttimeout(60)
    env = os.environ
    if env.get('WITH_SELENIUM', False):
        time.sleep(1)
        # Start a virtual display server for running the tests headless.
        if env.get('SELENIUM_HEADLESS', False):
            display = xvfbwrapper.Xvfb(width=1280, height=720)
            cls.vdisplay = display
            display.start()
        cls.selenium = WebDriver()
    super(SeleniumTestCase, cls).setUpClass()