我们从Python开源项目中提取了以下50个代码示例，用于说明如何使用http.server模块。
def testInvalidServer(self):
    """Exercise the download paths against a server that does not exist."""
    archive = SimpleHttpArchive({'url': "https://127.1.2.3:7257"})
    archive.wantDownload(True)
    archive.wantUpload(True)

    # The same all-zero 20 byte build-id is used throughout.
    build_id = b'\x00'*20

    # Local
    for last_arg in (0, 1):
        archive.downloadPackage(build_id, "unused", "unused", last_arg)
    self.assertEqual(archive.downloadLocalLiveBuildId(build_id, 0), None)

    # Jenkins
    with TemporaryDirectory() as workspace:
        id_file = os.path.join(workspace, "test.buildid")
        with open(id_file, "wb") as f:
            f.write(build_id)
        script = archive.download(None, "test.buildid", "result.tgz")
        callJenkinsScript(script, workspace)
def __init__(self, job_id, job_info):
    """Build the server-side record for a submitted job.

    job_id is stored as the job's id. job_info is forwarded to the parent
    constructor and supplies the list of files to track.
    """
    super().__init__(job_info)
    self.id = job_id

    # Each timestamp takes its own clock reading; finish_time starts out
    # equal to start_time.
    self.last_dispatched = time.time()
    self.start_time = time.time()
    self.finish_time = self.start_time

    # force one chunk for process jobs
    if self.type == netrender.model.JOB_PROCESS:
        self.chunks = 1

    # Force WAITING status on creation
    self.status = netrender.model.JOB_WAITING

    # special server properties
    self.last_update = 0
    self.save_path = ""

    render_files = []
    for rfile in job_info.files:
        render_files.append(
            MRenderFile(rfile.filepath, rfile.index, rfile.start, rfile.end, rfile.signature)
        )
    self.files = render_files
def do_HEAD(self):
    """Answer a HEAD request for "/status"; any other path is ignored.

    The job is looked up from the 'job-id' header and the frame from the
    'job-frame' header (default -1). OK is sent only when both the job and
    a truthy frame entry exist; otherwise NO_CONTENT is sent.
    """
    if self.path != "/status":
        return

    job_id = self.headers.get('job-id', "")
    job_frame = int(self.headers.get('job-frame', -1))
    job = self.server.getJobID(job_id)

    # Default covers both "no such job id" and "no such frame".
    status = http.client.NO_CONTENT
    if job and job[job_frame]:
        status = http.client.OK
    self.send_head(status)

# =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
def clientScan(report = None):
    """Wait for a master server's UDP broadcast and return where it lives.

    Listens on UDP port 8000 for up to 30 seconds. The datagram payload is
    read as the master's port number in UTF-8 text; the sender's IP is used
    as its address.

    report is passed through to reporting() together with a status message.

    Returns an (address, port) tuple, or ("", 8000) when no broadcast
    arrives before the timeout.
    """
    # The context manager guarantees the socket is closed on every exit
    # path; previously it was left open.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
        try:
            s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
            s.settimeout(30)
            s.bind(('', 8000))
            buf, address = s.recvfrom(64)
        except socket.timeout:
            reporting(report, "No master server on network", IOError)
            return ("", 8000) # return default values

    address = address[0]
    port = int(str(buf, encoding='utf8'))
    reporting(report, "Master server found")
    return (address, port)
def clientVerifyVersion(conn, timeout):
    """Check that the server behind conn reports the expected VERSION.

    Sends GET /version. Returns False (after closing conn) when the reply
    status is not OK, False (leaving conn open) when the body differs from
    VERSION, and True when it matches.
    """
    with ConnectionContext(timeout):
        conn.request("GET", "/version")
    response = conn.getresponse()

    if response.status != http.client.OK:
        conn.close()
        return False

    server_version = response.read()
    if server_version == VERSION:
        return True

    print("Incorrect server version!")
    print("expected", str(VERSION, encoding='utf8'), "received", str(server_version, encoding='utf8'))
    return False
def setup(self):
    """Wrap the connection in TLS when the server has a key, then run base setup.

    When self.server.sslkey is set, the request socket is wrapped server-side
    using the server's key and certificate. If self.server.sslca is also set,
    clients must present a certificate that validates against that CA.
    """
    # SSLify..
    if self.server.sslkey:
        import ssl

        # ssl.wrap_socket() was removed in Python 3.12; SSLContext is the
        # supported way to get the same behaviour.
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        context.load_cert_chain(certfile=self.server.sslcrt, keyfile=self.server.sslkey)

        # If they specify a CA key, require valid client certs
        sslca = self.server.sslca
        if sslca:
            context.verify_mode = ssl.CERT_REQUIRED
            context.load_verify_locations(cafile=sslca)
        else:
            context.verify_mode = ssl.CERT_NONE

        self.request = context.wrap_socket(self.request, server_side=True)

    http.server.BaseHTTPRequestHandler.setup(self)
def handleLogin(self, objname, authinfo):
    """Handle a login request for the shared object objname.

    Without an auth module on the server, a plain OK is sent. With one,
    authinfo is checked via authCobraUser(): failure sends UNAUTHORIZED;
    success creates a session, stores (authinfo, timestamp) in
    self.server.sessions and returns the key in a SessionId cookie.

    Raises Exception when objname is not a shared object.
    """
    if not self.server.getSharedObject(objname):
        raise Exception('Shared object does not exists!')

    if not self.server.authmod:
        self.send_response(http.client.OK)
        self.end_headers()
        return

    if not authinfo or not self.server.authmod.authCobraUser(authinfo):
        self.send_response(http.client.UNAUTHORIZED)
        self.end_headers()
        return

    # b64encode() returns bytes. Decode so the cookie carries the key itself
    # (not its "b'...'" repr) and the sessions dict is keyed by the same str
    # a client will send back.
    sesskey = base64.b64encode(os.urandom(32)).decode('ascii')
    self.server.sessions[sesskey] = (authinfo, time.time())

    c = http.cookies.SimpleCookie()
    c['SessionId'] = sesskey  # set morsel

    self.send_response(http.client.OK)
    self.send_header('Set-Cookie', c['SessionId'].output(header=''))
    self.send_header("Content-type", "text/html")
    self.end_headers()
def shareObject(self, obj, name=None, doref=False):
    """
    Share an object in this cobra server under the given name.

    This implementation neither generates random names nor does reference
    counting, so name is required and doref must be left false.

    Returns: name

    Raises: Exception if name is None or doref is true. Nothing is shared
    in either case.
    """
    # Validate everything first so a rejected call leaves self.shared
    # untouched.
    if name is None:
        raise Exception("You must specify an object name to share!")
    if doref:
        raise Exception("CobraHttp doesnt do refcnt")

    self.shared[name] = obj
    return name
def quick_api(api_key, secret_key, port=8000):
    """
    Get access to the linkedin api quickly when working from the interpreter.

    This creates an http server and waits for a request, so it shouldn't be
    used in real production code - it's just a helper for debugging.

    The usage is basically:
        api = quick_api(KEY, SECRET)

    A URL is printed to the screen; open it and allow the access. Once that
    is done, the method returns the api object.

    port is the local port the helper waits on, and it is also used in the
    redirect URL handed to LinkedIn (default 8000).
    """
    # Build the redirect URL from port so a non-default port is honoured;
    # it was previously fixed at 8000 regardless of the argument.
    redirect_uri = 'http://localhost:{0}/'.format(port)
    auth = LinkedInAuthentication(api_key, secret_key, redirect_uri,
                                  list(PERMISSIONS.enums.values()))
    app = LinkedInApplication(authentication=auth)
    print(auth.authorization_url)
    _wait_for_user_to_enter_browser(app, port)
    return app
def local_thumb(self, bitstream, mimetype):
    """Serve a thumbnail from a one-shot local HTTP server and return its URL.

    bitstream and mimetype are stored on the ImageRequestHandler class as
    content and content_type. The server handles a single request in a
    separate thread and listens on an OS-assigned port.
    """
    # Connecting a UDP socket makes the OS pick the outgoing interface, whose
    # address is then read back. The with-block closes the socket even if
    # connect() fails.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
        probe.connect(('8.8.8.8', 53))
        webserver_ip = probe.getsockname()[0]

    req_handler = local_server.ImageRequestHandler
    req_handler.content_type = mimetype
    req_handler.content = bitstream

    # create a webserver to handle a single request; port 0 asks the OS for
    # a free port
    port = 0
    self.imageserver = http.server.HTTPServer((webserver_ip, port), req_handler)
    self.imagethread = threading.Thread(target=self.imageserver.handle_request)
    self.imagethread.start()

    url = "http://%s:%s" % (webserver_ip, str(self.imageserver.server_port))
    return url
def local_sub(self, filename, mimetype):
    """Serve a local subtitle file from a one-shot HTTP server.

    Returns the URL of the file, or None when filename is not an existing
    file. The server handles a single request in a separate thread and
    listens on an OS-assigned port. mimetype is not used here.
    """
    if not os.path.isfile(filename):
        return None
    filename = os.path.abspath(filename)

    # Connecting a UDP socket makes the OS pick the outgoing interface, whose
    # address is then read back. The with-block closes the socket even if
    # connect() fails.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
        probe.connect(('8.8.8.8', 53))
        webserver_ip = probe.getsockname()[0]

    req_handler = local_server.SubtitleRequestHandler

    # create a webserver to handle a single request; port 0 asks the OS for
    # a free port
    port = 0
    self.subtitleserver = http.server.HTTPServer((webserver_ip, port), req_handler)
    self.subtitlethread = threading.Thread(target=self.subtitleserver.handle_request)
    self.subtitlethread.start()

    url = "http://%s:%s%s" % (webserver_ip, str(self.subtitleserver.server_port), quote_plus(filename, "/"))
    return url
def __init__(self, handler_class, port, ip='', keyfile=None, certfile=None):
    """Initialize new TcpIpHttpEndpoint object

    The endpoint id is "https://ip:port" when both certfile and keyfile are
    given and "http://ip:port" otherwise.

    Args:
        handler_class (obj): a request handler class that will be handling
            requests received by internal httpd server
        port (int): tcp port that httpd server will listen on
        ip (str): ip address that httpd server will listen on, by default
            listen on all addresses
        keyfile: stored in the endpoint context under 'keyfile'
        certfile: stored in the endpoint context under 'certfile'
    """
    has_tls_material = certfile is not None and keyfile is not None
    scheme = "https" if has_tls_material else "http"
    super().__init__("{}://{}:{}".format(scheme, ip, port))

    data = self._context.data
    data['listen_ip'] = ip
    data['listen_port'] = port
    data['certfile'] = certfile
    data['keyfile'] = keyfile

    self._handler_class = handler_class
    self.__setup_httpd_thread(ip, port)
def __init__(self, handler_class, path, keyfile=None, certfile=None):
    """Initialize new UnixSocketHTTPEndpoint object

    The endpoint id is "https://path" when both certfile and keyfile are
    given and "http://path" otherwise.

    Args:
        handler_class (obj): a request handler class that will be handling
            requests received by internal httpd server
        path (str): Unix socket path, that internal httpd server will
            listen on
        keyfile: stored in the endpoint context under 'keyfile'
        certfile: stored in the endpoint context under 'certfile'
    """
    has_tls_material = certfile is not None and keyfile is not None
    scheme = "https" if has_tls_material else "http"
    super().__init__("{}://{}".format(scheme, path))

    data = self._context.data
    data['socket_path'] = path
    data['certfile'] = certfile
    data['keyfile'] = keyfile

    self._handler_class = handler_class
    self.__cleanup_stale_socket(path)
    self.__setup_httpd_thread(path)
def __setup_httpd_thread(self, socket_path):
    """Create the internal HTTPd server and the thread that will run it.

    The thread is created here but not started.

    Args:
        socket_path (str): Unix socket path, that internal httpd server
            will listen on
    """
    self._httpd = UnixSocketStatefulHTTPServer(
        self._context, socket_path, self._handler_class)

    thread_name = "UnixSocketHttpdThread-{}".format(self.id)
    self._httpd_thread = threading.Thread(
        target=self._httpd.serve_forever, name=thread_name)

    # nginx spawns worker processes as 'nobody/nogroup', so we need to
    # make the socket available to it.
    os.chmod(socket_path, 0o777)
def start_server(self):
    """
    Start background daemon to serve our events.

    The server runs in a daemon thread on MemoryStorage.SERVER_HOST and
    MemoryStorage.SERVER_PORT. Exceptions while serving are logged rather
    than propagated.
    """
    handler_class = self.create_handler()

    class BackgroundServer(threading.Thread):
        def run(self):
            logger.info("Starting background server for monitor on port %d", MemoryStorage.SERVER_PORT)
            try:
                address = (MemoryStorage.SERVER_HOST, MemoryStorage.SERVER_PORT)
                _ThreadingSimpleServer(address, handler_class).serve_forever()
            except Exception as exc:
                logger.info("Background server stopped: %s", str(exc))

    try:
        BackgroundServer(daemon=True).start()
    except RuntimeError:
        logger.warning("Failed to start monitor server:", exc_info=True)
def test_run():
    """Emit a series of monitor events for manual inspection.

    Registers a MemoryStorage dispatcher, then opens one outer monitor and
    one nested monitor per (schema, table) pair, sleeping briefly in each.
    One nested monitor raises on purpose. Finally waits for the user to
    press return.
    """
    Monitor.environment = "test"  # type: ignore
    memory = MemoryStorage()
    MonitorPayload.dispatchers.append(memory)

    schema_names = ["auburn", "burgundy", "cardinal", "flame", "fuchsia"]
    table_names = ["apple", "banana", "cantaloupe", "durian", "fig"]
    index = {"current": 0, "final": len(schema_names) * len(table_names)}

    host = MemoryStorage.SERVER_HOST or "localhost"
    print("Creating events ... follow along at http://{0}:{1}/".format(host, MemoryStorage.SERVER_PORT))

    with Monitor("color.fruit", "test", index=dict(current=1, final=1, name="outer")):
        pairs = itertools.product(schema_names, table_names)
        for position, names in enumerate(pairs, start=1):
            try:
                with Monitor('.'.join(names), "test", index=dict(index, current=position)):
                    time.sleep(random.uniform(0.5, 2.0))
                    # Create an error on one "table" so that highlighting of errors can be tested:
                    if position == 10:
                        raise RuntimeError("An error occurred!")
            except RuntimeError:
                pass

    input("Press return (or Ctrl-c) to stop server\n")
def internal_server_error(error):
    """Log the current traceback and return the JSON body for a 500 reply.

    error is accepted but not inspected; the traceback comes from
    traceback.format_exc().
    """
    logger.debug("An internel server error occured")
    logger.error(traceback.format_exc())

    payload = {
        'success': 'false',
        'message': '500 Internal Server Error',
        'Unauthorized': 'True',
    }
    return json.dumps(payload)
def setUp(self):
    """Start a threaded HTTP server on an OS-assigned localhost port.

    The handler comes from createHttpHandler(self.repo.name). The bound
    address is kept in self.ip and self.port, and the server runs in a
    daemon thread.
    """
    super().setUp()

    handler = createHttpHandler(self.repo.name)
    self.httpd = socketserver.ThreadingTCPServer(("localhost", 0), handler)
    self.ip, self.port = self.httpd.server_address

    self.server = threading.Thread(target=self.httpd.serve_forever, daemon=True)
    self.server.start()
def run_server(host, port, priv_key, cert, i2pseeds_file):
    """Start HTTPS server

    Serves ReseedHandler on (host, port) over TLS using the private key file
    priv_key and certificate file cert. i2pseeds_file is stored on the
    handler class. Blocks until interrupted; Ctrl-C exits the process via
    SystemExit.
    """
    Handler = ReseedHandler
    Handler.i2pseeds_file = i2pseeds_file

    httpd = http.server.HTTPServer((host, int(port)), Handler)
    try:
        # ssl.wrap_socket() was removed in Python 3.12, and pinning
        # PROTOCOL_TLSv1 forced an obsolete protocol version. A server
        # context negotiates the best version both sides support.
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        context.load_cert_chain(certfile=cert, keyfile=priv_key)
        httpd.socket = context.wrap_socket(httpd.socket, server_side=True)

        httpd.serve_forever()
    except KeyboardInterrupt:
        # Same effect as the site-provided exit(), without depending on it.
        raise SystemExit
    finally:
        # Release the listening socket on every exit path.
        httpd.server_close()
def rsyncwebsite(options):
    """Copy the built HTML tree to the website server with rsync over ssh.

    The source is <options.sphinx.builddir>/html; the destination is
    options.website.server:options.website.server_path.
    """
    os.environ['RSYNC_RSH'] = '/usr/bin/ssh'
    src_path = path(options.sphinx.builddir) / 'html'
    command = '(cd %s; rsync --archive --delete --verbose . %s:%s)' % (
        src_path, options.website.server, options.website.server_path)
    sh(command)
def test(options):
    """Serve the built HTML directory locally over HTTP on port 8080.

    Changes the working directory to <options.sphinx.builddir>/html and then
    blocks in serve_forever().
    """
    html_dir = path(options.sphinx.builddir) / 'html'
    os.chdir(html_dir)

    httpd = http.server.HTTPServer(('', 8080), http.server.SimpleHTTPRequestHandler)
    httpd.serve_forever()
def deploy(options):
    """Rebuild and copy website files to the remote server.

    Runs each step in order with the same options object.
    """
    steps = (
        html_clean,     # Rebuild
        sdist,          # Copy the sdist into the html output directory.
        buildsitemap,   # Rebuild the site-map
        rsyncwebsite,   # Install
        notify_google,  # Update Google
    )
    for step in steps:
        step(options)
def execute(self, context):
    """Open the server's web page in a browser when the server is reachable.

    Always returns {'FINISHED'}.
    """
    netsettings = context.scene.network_render

    # open connection to make sure server exists
    conn = clientConnection(netsettings, report = self.report)
    if conn:
        conn.close()
        scheme = "https" if netsettings.use_ssl else "http"
        webbrowser.open("%s://%s:%i" % (scheme, netsettings.server_address, netsettings.server_port))

    return {'FINISHED'}
def web_server(port):
    """Run a CGI-enabled HTTP server on all interfaces at port.

    Sets cgi_directories on http.server.CGIHTTPRequestHandler itself to
    ["/cgi-bin"] and then blocks in serve_forever().
    """
    # For Python 2.7.x use BaseHTTPServer.HTTPServer and
    # CGIHTTPServer.CGIHTTPRequestHandler instead of the two names below.
    handler_class = http.server.CGIHTTPRequestHandler
    handler_class.cgi_directories = ["/cgi-bin", ]

    httpd = http.server.HTTPServer(("", port), handler_class)
    print("Starting web server with CGI support on port: %s ..." % port)
    httpd.serve_forever()
def from_import(module_name, *symbol_names, **kwargs):
    """
    Import symbol_names from module_name and return them.

    Example use:
        >>> HTTPConnection = from_import('http.client', 'HTTPConnection')
        >>> HTTPServer = from_import('http.server', 'HTTPServer')
        >>> urlopen, urlparse = from_import('urllib.request', 'urlopen', 'urlparse')

    Equivalent to this on Py3:

        >>> from module_name import symbol_names[0], symbol_names[1], ...

    and this on Py2:

        >>> from future.moves.module_name import symbol_names[0], ...

    or, when called with backport=True:

        >>> from future.backports.module_name import symbol_names[0], ...

    except that it also handles dotted module names such as ``http.client``.

    Returns the single object when exactly one symbol name is given,
    otherwise a list of objects in the order requested.
    """
    if PY3:
        # Import the named module itself. __import__('a.b') returns the
        # top-level package 'a', so the symbols were never looked up.
        target = module_name
    else:
        prefix = 'future.backports' if kwargs.get('backport') else 'future.moves'
        target = prefix + '.' + module_name

    module = importlib.import_module(target)
    output = [getattr(module, name) for name in symbol_names]
    if len(output) == 1:
        return output[0]
    return output
def webServer(config):
    """Serve the 'generated/' directory over HTTP for local site testing.

    The port is read from config['ETC']['server-port'] (default 8080) and
    the address from config['ETC']['server-ip'] (default '127.0.0.1').
    Changes the working directory to 'generated/' and blocks in
    serve_forever(). SIGINT and interpreter exit both run closeServer().

    Returns 'port already taken' when creating or running the server raises
    OSError.
    """
    os.chdir('generated/')

    # Catch Exception rather than everything, so SystemExit and
    # KeyboardInterrupt are no longer swallowed while reading the config.
    try:
        PORT = int(config['ETC']['server-port'])
    except Exception:
        print('Port not specified in config, defaulting to 8080')
        PORT = 8080

    try:
        ADDR = config['ETC']['server-ip']
    except Exception:
        print('Address not specified in config, defaulting to 127.0.0.1')
        ADDR = '127.0.0.1'

    Handler = http.server.SimpleHTTPRequestHandler

    try:
        with socketserver.TCPServer((ADDR, PORT), Handler) as httpd:
            def closeServer():
                os.chdir('../')
                print('Closed server')
                httpd.server_close()
                sys.exit(0)

            # The first parameter is named signum so it does not shadow the
            # signal module inside the handler.
            def server_signal_handler(signum, frame):
                closeServer()

            signal.signal(signal.SIGINT, server_signal_handler)

            print(''' NOTICE: This server is meant primarily for site testing purposes. For increased security, speed, and for advanced features such as https, consider using a full web server like Caddy, Nginx, or Apache ''')
            print('Serving at port', PORT, 'on', ADDR)

            atexit.register(closeServer)
            httpd.serve_forever()
    except OSError:
        return 'port already taken'
def tcp_keepalive(server, transport):
    """Turn on SO_KEEPALIVE for the socket behind transport.

    The socket is obtained with transport.get_extra_info('socket'). server
    is not used.
    """
    transport.get_extra_info('socket').setsockopt(
        socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
def tcp_keepalive(server, transport):  # pragma: no cover
    """Do nothing; both arguments are ignored."""
    return None
def test_simple_HTTP(self) :
    """Import a module from a package served by a local HTTP server.

    The server serves the current directory. The port is taken from
    sys.argv[1] when that is an integer; otherwise the OS assigns a free
    one. The server is shut down when the test ends.
    """
    # ============== Setting up an HTTP server in current directory
    try :
        PORT = int(sys.argv[1])
    except (IndexError, ValueError) :
        # Port 0 lets the OS choose a free port; a random port could
        # collide with one already in use.
        PORT = 0

    try :
        # Python 2 module names
        Handler = SimpleHTTPServer.SimpleHTTPRequestHandler
        httpd = SocketServer.TCPServer(("", PORT), Handler)
    except (NameError, AttributeError) :
        # Python 3 module names
        Handler = http.server.SimpleHTTPRequestHandler
        httpd = socketserver.TCPServer(("", PORT), Handler)

    # Read back the port actually bound (matters when PORT was 0).
    PORT = httpd.server_address[1]
    print ("Serving at port %d" % PORT)

    http_thread = Thread( target = httpd.serve_forever, )
    http_thread.daemon = True

    # ============== Starting the HTTP server
    http_thread.start()

    try :
        # ============== Wait until HTTP server is ready
        sleep(1)

        with remote_repo(['test_package'], base_url = 'http://localhost:%d/' % PORT) :
            from test_package import module1
            # If this point is reached then module1 was imported successfully!
            self.assertTrue(module1.dummy_str)
    finally :
        # Stop the serving thread and release the listening socket.
        httpd.shutdown()
        httpd.server_close()
def __init__(self, config, handler_class):
    """Create the webhook server thread.

    The thread is named 'Webhook' and is not a daemon. A WebhookHTTPServer
    is created on all interfaces at config.port; when config.use_https is
    set its socket is wrapped in TLS using config.https_keyfile and
    config.https_certfile. init_props() is then called on the server.
    """
    super().__init__(name='Webhook', daemon=False)
    self.config = config

    # The server is built the same way for both schemes.
    self.server = WebhookHTTPServer(('', self.config.port), handler_class)

    if self.config.use_https:
        # ssl.wrap_socket() was removed in Python 3.12; use an SSLContext.
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        context.load_cert_chain(certfile=config.https_certfile,
                                keyfile=config.https_keyfile)
        # NOTE(review): the wrapped object is self.server.socket.socket, as
        # before - confirm that WebhookHTTPServer exposes that attribute.
        self.server.socket = context.wrap_socket(self.server.socket.socket,
                                                 server_side=True)

    self.server.init_props()
def webhook_url(self):
    """Return the webhook URL, building and caching it on first use.

    The host is config.host, or the result of resolve_public_ip() when that
    is empty. The URL has the form scheme://host:port/session_token and is
    cached in self._webhook_url.
    """
    if hasattr(self, '_webhook_url'):
        return self._webhook_url

    self.hostname = self.config.host if self.config.host != '' else resolve_public_ip()
    scheme = 'https' if self.config.use_https else 'http'
    self._webhook_url = '%s://%s:%d/%s' % (
        scheme, self.hostname, self.server.server_port, self.server.session_token)
    return self._webhook_url
def set_worker(self, worker):
    """Store worker on the server as its worker_thread attribute."""
    server = self.server
    server.worker_thread = worker
def stop(self):
    """Call shutdown() on the underlying server."""
    server = self.server
    server.shutdown()
def run(self):
    """Serve requests until the server stops, logging start and stop."""
    url = self.webhook_url
    logging.info('Webhook server listening on %s.', url)
    self.server.serve_forever()
    logging.info('Webhook server stopped.')
def get_voters(amount: hug.types.number = 20):
    """Return voters from voters.json, ordered by value, highest first.

    The first line of the file is parsed as a JSON object. When amount is
    positive only that many entries are returned; otherwise all of them.

    Returns a dict, or None when reading or parsing fails (the failure is
    logged).
    """
    try:
        raw = linecache.getline("/root/workspace/Chaos/server/voters.json", 1)
        voters = sorted(json.loads(raw).items(), key=lambda x: x[1], reverse=True)
        if amount > 0:
            voters = voters[:amount]
        return dict(voters)
    except Exception:
        # Exception rather than a bare except, so SystemExit and
        # KeyboardInterrupt are no longer swallowed.
        __log.exception("Failed to read voters!")
        return None
def get_meritocracy(amount: hug.types.number = 20):
    """Return the meritocracy list read from meritocracy.json.

    The first line of the file is parsed as JSON. When amount is positive
    only the first amount entries are returned; otherwise all of them.

    Returns the parsed (possibly truncated) value, or None when reading or
    parsing fails (the failure is logged).
    """
    try:
        raw = linecache.getline("/root/workspace/Chaos/server/meritocracy.json", 1)
        meritocracy = json.loads(raw)
        if amount > 0:
            meritocracy = meritocracy[:amount]
        return meritocracy
    except Exception:
        # Exception rather than a bare except, so SystemExit and
        # KeyboardInterrupt are no longer swallowed.
        __log.exception("Failed to read meritocracy!")
        return None
def server(username, filename, data, parsedQuery):
    """Answer a request for server information.

    data selects what is reported: "sfs_version" gives VERSION and
    "python_version" gives sys.version. Anything else, or empty data, gives
    an error response. username, filename and parsedQuery are not used.
    """
    # Ignore filename
    if not data:
        return Responder.error("no server information requested")
    if data == "sfs_version":
        return Responder.writeResult(VERSION)
    if data == "python_version":
        return Responder.writeResult(sys.version)
    return Responder.error("invalid server information request")
def test_can_import_several(self):
    """
    Import several modules in a row and check each one is not None.

    This test failed in v0.12-pre if e.g.
    future/standard_library/email/header.py contained:

        from future import standard_library
        standard_library.remove_hooks()
    """
    import future.moves.urllib.parse as urllib_parse
    import future.moves.urllib.request as urllib_request
    import http.server

    imported = (urllib_parse, urllib_request, http.server)
    for module in imported:
        self.assertTrue(module is not None)
def test_other_http_imports(self):
    """Import http and three of its submodules; any failure raises."""
    import http
    import http.server
    import http.cookies
    import http.cookiejar

    # Reaching this line means every import above succeeded.
    self.assertTrue(True)
def __init__(self, *args, **kwargs):
    """Register the cobra message handlers, then run the base handler init.

    self.handlers maps each cobra message name to the bound method that
    handles it. All arguments are forwarded to BaseHTTPRequestHandler.
    """
    handlers = {}
    handlers['__cobra_hello'] = self.handleHello
    handlers['__cobra_getattr'] = self.handleGetAttr
    handlers['__cobra_setattr'] = self.handleSetAttr
    handlers['__cobra_login'] = self.handleLogin
    self.handlers = handlers

    http.server.BaseHTTPRequestHandler.__init__(self, *args, **kwargs)
def do_POST(self):
    """Read the posted body, decode it, and hand it to handleClient().

    The body is Content-Length bytes of URL-quoted JSON; the decoded value
    is passed on as the body keyword argument.
    """
    length = int(self.headers['Content-Length'])
    raw = self.rfile.read(length)

    # rfile yields bytes. Decode before unquoting, because
    # urllib.parse.unquote() only accepts bytes from Python 3.9 on.
    text = urllib.parse.unquote(raw.decode('utf-8'))

    # this is a json posted data list (args, kwargs)
    body = json.loads(text)
    self.handleClient(body=body)
def handleHello(self, objname):
    '''
    Hello messages are used to get the initial cache of
    method names for the newly connected object.

    Replies OK with a JSON object that maps the name of each attribute that
    is a method or builtin method to true.
    '''
    if verbose: print("GOT A HELLO")

    obj = self.server.getSharedObject(objname)
    method_types = (types.MethodType, types.BuiltinMethodType)
    ret = {name: True for name in dir(obj)
           if isinstance(getattr(obj, name), method_types)}

    self.send_response(http.client.OK)
    self.end_headers()
    # wfile is a binary stream, so the JSON text must be encoded first.
    self.wfile.write(json.dumps(ret).encode('utf-8'))
    return
def handleSetAttr(self, objname, name, value):
    """Set attribute name to value on the shared object objname.

    When self.server.attr is false the request is refused with FORBIDDEN and
    a JSON-encoded reason; otherwise the attribute is set and OK is sent.
    """
    if not self.server.attr:
        self.send_response(http.client.FORBIDDEN)
        self.end_headers()
        excinfo = "__setattr__ disabled"
        # wfile is a binary stream, so the JSON text must be encoded first.
        self.wfile.write(json.dumps(excinfo).encode('utf-8'))
        return

    obj = self.server.getSharedObject(objname)
    # Log the arguments actually received; the name referenced here before
    # (urlparams) is not defined in this method.
    if verbose: print("SETTING ATTRIBUTE:", [name, value])
    setattr(obj, name, value)

    self.send_response(http.client.OK)
    self.end_headers()
def setAuthModule(self, authmod):
    '''
    Enable an authentication module for this server
    ( all connections *must* be authenticated through the authmod )

    NOTE: See cobra.auth.* for various auth module implementations

    Example:
        import cobra.auth.shadow as c_a_shadow
        authmod = c_a_shadow.ShadowFileAuth('passwdfile.txt')
        cdaemon = CobraDaemon()
        cdaemon.setAuthModule(authmod)
    '''
    self.authmod = authmod