我们从Python开源项目中,提取了以下39个代码示例,用于说明如何使用BaseHTTPServer.BaseHTTPRequestHandler.__init__()。
def __init__(self, cmd_queue, commands, config, sessions, *args, **kwargs): """ Constructor. """ # Commands queue. self.cmd_queue = cmd_queue # Commands array in shared memory. self.commands = commands # Sessions array in shared memory. self.sessions = sessions # Configuration instance. self.config = config # HTTP server version. self.server_version = "temboard-agent/0.0.1" # HTTP request method self.http_method = None # HTTP query. self.query = None # HTTP POST content in json format. self.post_json = None # Call HTTP request handler constructor. BaseHTTPRequestHandler.__init__(self, *args, **kwargs)
def __init__(self, device, conf): super(EspSender, self).__init__() self.daemon = True self.conf = conf self.debugAck = self.conf.ESP_FORCE_ACK_DEBUG self.out_queue = Queue.Queue() self.signal_queue = Queue.Queue() self.device = device self.ip = str(self.conf.ipNetwork[device]) self.last_send = -1 self.pin_values = {} print "#" + str(device) + " UDP to " + str(self.ip)
def __init__(self, device, conf): super(EspHandler, self).__init__() self.daemon = True self.conf = conf self.sender = None self.device = None self.in_queue = None self.pin_last_command = {} if (device > 0) and (self.conf.ipNetwork is not None) and (device < self.conf.ipNetwork.size): self.device = device self.sender = EspSender(device, self.conf) self.sender.start() self.in_queue = Queue.Queue() print "#" + str(device) + " Handler to " + str(self.getIp()) else: print "#" + str(device) + " Invalid device or net: " + str(self.conf.ipNetwork)
def __init__(self, *args, **kwargs): self.tls = threading.local() self.version_table = {10: 'HTTP/1.0', 11: 'HTTP/1.1'} self.http_request = HTTPRequest() # Address to time series backend backend_host, backend_port = self.backend_address self.backend_netloc = "{}:{}".format(backend_host, backend_port) self.path = None self.connection = None self.rfile = None self.wfile = None self.close_connection = 0 BaseHTTPRequestHandler.__init__(self, *args, **kwargs)
def __init__(self, request, client_address, server): BaseHTTPRequestHandler.__init__(self, request, client_address, server)
def __init__(self, *args, **kwargs): self.tls = threading.local() self.tls.conns = {} BaseHTTPRequestHandler.__init__(self, *args, **kwargs)
def __init__(self, resources, *args, **kwargs): """ :arg resources: A dict of resource paths pointing to content bytes """ self.resources = resources BaseHTTPRequestHandler.__init__(self, *args, **kwargs)
def __init__(self, config, duration, errors, *args, **kwargs): self._config = config self._duration = duration self._errors = errors BaseHTTPRequestHandler.__init__(self, *args, **kwargs)
def __init__(self, server_address, RequestHandlerClass, thread = {}): HTTPServer.__init__(self, server_address, RequestHandlerClass) self.thread = thread
def __init__(self, name=None): Thread.__init__(self, name=name) self.daemon = True self.event = Event() self.degree = 90 self.direction = None self.servo = ServoSG90() self.channel = {'left':0, 'right':1, 'middle':2, 'claw':3} self.name = name self.servo.set_pwm_freq(60) self.servo.set_degree(self.channel[name], 90)
def __init__(self, request, client_address, server): self.tls = threading.local() self.tls.conns = {} BaseHTTPRequestHandler.__init__(self, request, client_address, server) # noinspection PyShadowingBuiltins
def __init__(self, conf): super(EspNetHandler, self).__init__() self.daemon = True self.conf = conf self.debugAck = self.conf.ESP_FORCE_ACK_DEBUG self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self.sock.bind((conf.ESP_LISTEN_IP, self.conf.ESP_LISTEN_PORT)) print("Listening ESP on " + self.conf.ESP_LISTEN_IP + ":" + str(conf.ESP_LISTEN_PORT) + ", sending to :" + str(conf.ESP_PORT))
def MakeHandlerClass(base_url): #This class will handles any incoming request from #the JMX service # Needed during installation of the JAR class CustomHandler(BaseHTTPRequestHandler): def __init__(self, *args, **kwargs): self._base_url = base_url BaseHTTPRequestHandler.__init__(self, *args, **kwargs) #Handler for the GET requests def do_GET(self): if self.path=="/": mlet_code = '<html><mlet code="de.siberas.lab.SiberasPayload" archive="siberas_mlet.jar" name="Siberas:name=payload,id=1" codebase="' + self._base_url + '"></mlet></html>' self.send_response(200) self.send_header('Pragma', 'no-cache') self.end_headers() self.wfile.write(mlet_code) elif self.path=="/siberas_mlet.jar": f = open("./payloads/siberas_mlet.jar") self.send_response(200) self.send_header('Content-type', 'application/jar') self.end_headers() self.wfile.write(f.read()) f.close() else: self.send_error(404, 'File not found: ' + self.path) # # except IOError: # self.send_error(404,'File Not Found: %s' % self.path) return CustomHandler ### /INSTALL MODE ### ### UNINSTALL MODE ###
def __init__(self,PORT): self.port = PORT self.logger = Logger('server.py') self.run()
def __init__(self, request, client_address, server): self.sqlite = sqlite3.connect(DB_CONFIG['SQLITE']) self.table_name = 'proxy' BaseHTTPRequestHandler.__init__(self, request, client_address, server)
def __init__(self, request, client_address, server): self.timeout = 1 BaseHTTPRequestHandler.__init__(self, request, client_address, server)
def __init__(self, configfile, pidfile): self.configfile = configfile self.receiver = None self.config = None Daemon.__init__(self, pidfile)
def __init__(self, port): self.port = int(port) self.run()
def __init__(self, request, client_address, server): try: self.sqlite = DatabaseObject(DB_CONFIG['SQLITE']) self.table_name = 'proxy' except Exception, e: self.sqlite = '' logger.error('SQLite error: %s', e) BaseHTTPRequestHandler.__init__(self, request, client_address, server)
def __init__(self, appRequestHandler, *args, **kwargs): self.handleRequest = appRequestHandler BaseHTTPRequestHandler.__init__(self, *args, **kwargs)
def __init__(self, config): super(AgentApi, self).__init__(config) self._enabled = config.fetch('agent', 'enabled', bool) self.host = config.fetch('agent', 'host') self.port = config.fetch('agent', 'port', int)
def __init__(self, config): def handler(*args): AgentApiHandler(config, *args) server = HTTPServer((config.host, config.port), handler) server.serve_forever()
def __init__(self, config, *args): self.sender = config.sender BaseHTTPRequestHandler.__init__(self, *args)
def __init__(self, filelike, blksize=8192): self.filelike = filelike self.blksize = blksize if hasattr(filelike,'close'): self.close = filelike.close
def __init__(self,headers): if not isinstance(headers, list): raise TypeError("Headers must be a list of name/value tuples") self._headers = headers
def __init__(self, stdin, stdout, stderr, environ, multithread=True, multiprocess=False): self.stdin = stdin self.stdout = stdout self.stderr = stderr self.base_env = environ self.wsgi_multithread = multithread self.wsgi_multiprocess = multiprocess
def __init__(self, *args, **kwargs): from google.appengine._internal.django.conf import settings self.admin_media_prefix = settings.ADMIN_MEDIA_PREFIX # We set self.path to avoid crashes in log_message() on unsupported # requests (like "OPTIONS"). self.path = '' self.style = color_style() BaseHTTPRequestHandler.__init__(self, *args, **kwargs)
def __init__(self, application, media_dir=None): from google.appengine._internal.django.conf import settings self.application = application if not media_dir: import django self.media_dir = os.path.join(django.__path__[0], 'contrib', 'admin', 'media') else: self.media_dir = media_dir self.media_url = settings.ADMIN_MEDIA_PREFIX
def __init__(self, callback, handlerClass, port = 8881, address='0.0.0.0'): threading.Thread.__init__(self) self._stop = False self._port = port self._addr = address self._handlerClass = handlerClass self._handlerArgs = callback; def handler(*args): return self._handlerClass(self._handlerArgs, *args) self._httpd = HTTPServer((self._addr, self._port), handler)
def __init__(self, callback, *args): self._callback = callback BaseHTTPRequestHandler.__init__(self, *args)
def __init__(self, interface, target, gateway): print banner print self.interface = interface print "[+] Interface: {}".format(self.interface) def nic_ip(interface): try: s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) return socket.inet_ntoa(fcntl.ioctl( s.fileno(), 0x8915, struct.pack('256s', interface[:15]) )[20:24]) except IOError: print "[!] Select a valid network interface, exiting ..." exit(0) def nic_mac(interface): s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) info = fcntl.ioctl(s.fileno(), 0x8927, struct.pack('256s', interface[:15])) return ''.join(['%02x:' % ord(char) for char in info[18:24]])[:-1] self.hostIP = nic_ip(self.interface) print "[+] This host IP Address: {}".format(self.hostIP) self.hostMAC = nic_mac(self.interface) print "[+] This host MAC Address: {}".format(self.hostMAC) def resolve_mac(ip): try: conf.verb = 0 ans, unans = srp(Ether(dst="ff:ff:ff:ff:ff:ff")/ARP(op="who-has", pdst=ip), timeout=2) for snd, rcv in ans: return str(rcv[Ether].src) except socket.gaierror: print "[!] Select a valid IP Address as target/gateway, exiting ..." exit(0) self.targetIP = target print "[+] Target IP Address: {}".format(self.targetIP) self.targetMAC = resolve_mac(self.targetIP) print "[+] Target MAC Address: {}".format(self.targetMAC) self.gatewayIP = gateway print "[+] Gateway IP Address: {}".format(self.gatewayIP) self.gatewayMAC = resolve_mac(self.gatewayIP) print "[+] Gateway MAC Address: {}".format(self.gatewayMAC) #if not self.targetMAC or not self.gatewayMAC: # print "[!] Failed to resolve MAC Address, check if IP Address is online, exiting ..." #exit(0) animation = "|/-\\" for i in range(15): time.sleep(0.1) sys.stdout.write("\r" + "[" + animation[i % len(animation)] + "]" + " Loading SSL Kill ...") sys.stdout.flush() self.ArpPoisoner() sys.stdout.write("\n[+] ARP Poisoner thread loaded") self.DnsPoisoner() print "\n[+] DNS Poisoner thread loaded" if debug: print "\n[+]Debugger is on!" else: print "\n[-]Debugger is off!"
def __init__(self): self.param = { "SCRATCH_IP": "127.0.0.1", "SCRATCH_PORT": 58266, "ESP_PORT": 9876, "ESP_LISTEN_IP": "", "ESP_LISTEN_PORT": 9877, "ESP_BUFFER": 1024, "ESP_PRECISION": 0.001, "ESP_ACK_TIMEOUT": 0.200, "ESP_POLL_INTERVAL": 0.500, "ESP_POLL_CMD": "poll", "ESP_FORCE_ACK_DEBUG": False, "ESP_POLL_NL": 64, } self.const = { # Pin modes. # except from UNAVAILABLE taken from Firmata.h "UNAVAILABLE": -1, "INPUT": 0, # as defined in wiring.h "OUTPUT": 1, # as defined in wiring.h "ANALOG": 2, # analog pin in analogInput mode "PWM": 3, # digital pin in PWM output mode "SERVO": 4, # digital pin in SERVO mode # Pin digit values "LOW": 0, "HIGH": 1, # Response to Scratch "RESPONSE_OK": "okay" } self.name2id = {} self.id2name = {} self.name_value_re = re.compile(r"^name ([\w-]*)|;name ([\w-]*)") self.processOptions() self.code_to_mode = { self.UNAVAILABLE: "UNAVAILABLE", self.INPUT: "Digital%20Input", self.OUTPUT: "Digital%20Output", self.ANALOG: "Analog%20Input", self.PWM: "Analog%20Output(PWM)", self.SERVO: "Servo" } self.mode_to_code = {} for c, m in self.code_to_mode.iteritems(): self.mode_to_code[m] = c self.ipNetwork = None self.espHandlersByDevice = {} self.espHandlersByIp = {} self.lastCmdTs = -1
def __init__(self, config): """ Read timeout and database configurations from config object """ self.config = config self.buffer = {} self.addtimes = {} self.active_keys = [] self.lock = Lock() self.timeout = 600 if self.config.has_section("CacheConfig"): if self.config.has_option("CacheConfig", "timeout"): self.timeout = self.config.getint("CacheConfig", "timeout") self.oldest = None self.batch_ready = Queue() self.send_in_progress = False self.heads = {"Content-Type" : "application/octet-stream"} self.dbs = {} self.db_hosts = {} self.def_db_host = { "hostname" : "localhost", "port" : 8086, "username" : "testuser", "password" : "testpass", "maxsend" : 1000, "batch" : 10, "maxcache" : 2000, "timeout" : 10, "maxdbs" : 1000, "activedbs" : [], "exclude" : "", "create_db" : False} self.read_db("AdminDB") if self.config.has_section("SplitConfig"): if self.config.has_option("SplitConfig", "dbentries"): l_dbentries = re.split("\s*,\s*", self.config.get("SplitConfig", "dbentries")) for db in l_dbentries: splitdb = "SplitDB-%s" % db self.read_db(splitdb)