我们从Python开源项目中，提取了以下48个代码示例，用于说明如何使用platform.node()。
def __init__(self, base_dir, config, backup_time, seed_uri, argv=None):
    """Initialise the state record describing one backup run.

    Fills ``self.state`` with the backup name, method, path, command line,
    dumped configuration, version details, host facts and seed details,
    then calls ``self.init()``.

    :param base_dir: directory handed to ``StateBase`` and stored as the path
    :param config: configuration object; ``backup.method``, ``dump()``,
        ``version`` and ``git_commit`` are read from it
    :param backup_time: value stored as the backup's name
    :param seed_uri: object providing ``str()`` and ``replset``
    :param argv: optional command line to record with the state
    """
    StateBase.__init__(self, base_dir, config)
    self.base_dir = base_dir

    state = self.state
    state['backup'] = True
    state['completed'] = False
    state['name'] = backup_time
    state['method'] = config.backup.method
    state['path'] = base_dir
    state['cmdline'] = argv
    state['config'] = config.dump()
    state['version'] = config.version
    state['git_commit'] = config.git_commit

    # Facts about the machine and interpreter performing the backup.
    python_info = {
        'build': platform.python_build(),
        'version': platform.python_version(),
    }
    state['host'] = {
        'hostname': platform.node(),
        'uname': platform.uname(),
        'python': python_info,
    }
    state['seed'] = {
        'uri': seed_uri.str(),
        'replset': seed_uri.replset,
    }
    self.init()
def get_platform_info(self):
    """Return information about the computer where the fuzzer is running.

    :return: dict with the keys ``node_name``, ``os_release``,
        ``os_version``, ``machine`` and ``processor``; ``None`` if
        collecting them failed (an alert is emitted through
        ``self.ae.m_alert`` in that case).
    """
    try:
        node_properties = {
            'node_name': platform.node(),
            'os_release': platform.release(),
            'os_version': platform.version(),
            'machine': platform.machine(),
            'processor': platform.processor(),
        }
    except Exception:
        # Only ordinary errors are reported; KeyboardInterrupt and
        # SystemExit are allowed to propagate instead of being swallowed.
        self.ae.m_alert('[x] Error getting platform information')
        return None
    return node_properties
def render(self):
    """Build the view for the system-information widget.

    The view holds one box containing a table with the OS summary,
    hostname, configured host address and uptime.

    :return: the populated ``View``
    """
    view = View('none')
    box = BoxElement(title='widget.info.header', icon='server', padding=False)

    # "<system> <release> (<architecture bits>)"
    os_summary = '%s %s (%s)' % (platform.system(),
                                 platform.release(),
                                 platform.architecture()[0])
    rows = [
        ('widget.info.os', os_summary),
        ('widget.info.hostname', platform.node()),
        ('widget.info.ipaddress', prism.settings.PRISM_CONFIG['host']),
        ('widget.info.uptime', self.get_uptime()),
    ]
    table = TableElement(content=rows)

    box.add(table)
    view.add(box)
    return view
def parent(self, vcdir, abortOnError=True):
    """Return the single parent changeset id of the hg working copy.

    :param vcdir: repository directory passed to ``hg -R``
    :param abortOnError: when true, ``abort`` is called on failure or
        when hg does not report exactly one parent
    :return: the parent node id, or ``None`` when it cannot be determined
    """
    self.check_for_hg()
    # 'run' is not used because this can be called very early,
    # before _opts is set.
    command = ['hg', '-R', vcdir, 'parents', '--template', '{node}\n']
    try:
        parents = subprocess.check_output(command).rstrip('\n').split('\n')
        if len(parents) == 1:
            return parents[0]
        if abortOnError:
            abort('hg parents returned {} parents (expected 1)'.format(len(parents)))
        return None
    except subprocess.CalledProcessError:
        if abortOnError:
            abort('hg parents failed')
        return None
def latest(self, vcdir, rev1, rev2, abortOnError=True):
    """Return the single head of the combined ancestry of two revisions.

    :param vcdir: repository directory passed to ``hg -R``
    :param rev1: first revision
    :param rev2: second revision
    :param abortOnError: when true, ``abort`` is called on failure or
        when hg does not report exactly one candidate
    :return: the node id reported by hg, or ``None``
    """
    # Equivalent command line:
    #   hg log -r 'heads(ancestors(REV1) or ancestors(REV2))' --template '{node}\n'
    self.check_for_hg()
    try:
        ancestor_union = ' or '.join(
            'ancestors({})'.format(rev) for rev in (rev1, rev2))
        revset = 'heads({})'.format(ancestor_union)
        command = ['hg', '-R', vcdir, 'log', '-r', revset, '--template', '{node}\n']
        candidates = subprocess.check_output(command).rstrip('\n').split('\n')
        if len(candidates) == 1:
            return candidates[0]
        if abortOnError:
            abort('hg log returned {} possible latest (expected 1)'.format(len(candidates)))
        return None
    except subprocess.CalledProcessError:
        if abortOnError:
            abort('latest failed')
        return None
def writexml(self, writer, indent="", addindent="", newl=""):
    """Serialise this element, its attributes and its children as XML.

    Attributes are written in sorted name order. When the only child is a
    text node and the owner document's ``padTextNodeWithoutSiblings`` flag
    is false, the text is written inline without indentation or newline
    padding.

    :param writer: object with a ``write`` method
    :param indent: indentation for this element
    :param addindent: extra indentation added for each child level
    :param newl: newline string written after tags
    """
    writer.write(indent + "<" + self.tagName)

    attrs = self._get_attributes()
    # sorted() accepts both the list (Python 2) and the view (Python 3)
    # that keys() may return; calling .sort() on the result does not.
    for a_name in sorted(attrs.keys()):
        writer.write(" %s=\"" % a_name)
        xml.dom.minidom._write_data(writer, attrs[a_name].value)
        writer.write("\"")

    if self.childNodes:
        if not self.ownerDocument.padTextNodeWithoutSiblings and len(self.childNodes) == 1 and isinstance(self.childNodes[0], xml.dom.minidom.Text):
            # if the only child of an Element node is a Text node, then the
            # text is printed without any indentation or new line padding
            writer.write(">")
            self.childNodes[0].writexml(writer)
            writer.write("</%s>%s" % (self.tagName, newl))
        else:
            writer.write(">%s" % (newl))
            for node in self.childNodes:
                node.writexml(writer, indent + addindent, addindent, newl)
            writer.write("%s</%s>%s" % (indent, self.tagName, newl))
    else:
        writer.write("/>%s" % (newl))
def __init__(self, qualname, node, comm=None):
    """Create the benchmark record for one test node.

    :param qualname: dotted qualified name of the test
    :param node: test node providing ``name`` and ``originalname``
    :param comm: optional communicator object, stored as-is
    """
    self.qualname = qualname
    # NOTE: node.name still includes any parametrization ID
    self.testname = node.name

    # The output file uses the name without parametrization IDs, falling
    # back to the full test name when no original name is recorded.
    original = node.originalname
    self.original_testname = self.testname if original is None else original

    # name of the file this function should be written to
    parent_parts = self.qualname.split('.')[:-1]
    self.filename = '.'.join(parent_parts + [self.original_testname])

    self.comm = comm

    # benchmarks are stored here
    self.benchmark = {
        'filename': self.filename,
        'testname': self.testname,
        'tags': [],
    }

    # meta-data is stored here
    self.attrs = {}
def _hostname_linux(self):
    """Return the system hostname.

    Starts from ``platform.node()`` and replaces it with the output of
    ``hostname -f`` when that command exits with status 0. If running the
    command raises, the socket module is asked for the name instead; if
    that fails too, the initial value is kept.
    """
    # Default value found using platform
    name = platform.node()
    try:
        # Try to get hostname (FQDN) using 'hostname -f'
        rc, out, _err = exec_command([which('hostname'), '-f'])
        if rc == 0:
            name = out.encode('utf-8').strip()
    except Exception:
        try:
            # Try to get hostname (FQDN) using socket module
            name = socket.gethostbyaddr(socket.gethostname())[0]
            name = name.strip()
        except Exception:
            pass
    return name
def get_auth_token(cls, login, password, prompt=None):
    """Create an authorization token for the given account.

    :param login: account login
    :param password: account password
    :param prompt: callable used to ask for the two-factor code; it is
        called with the prompt text when a 2FA code is requested
    :return: the token string of the created authorization
    :raises ResourceExistsError: when the service answers with status 422
    """
    import platform

    if cls.fqdn == GITHUB_COM_FQDN:
        gh = github3.GitHub()
    else:
        gh = github3.GitHubEnterprise()
    gh.login(login, password,
             two_factor_callback=lambda: prompt('2FA code> '))

    # The note records which machine the token was created on.
    token_note = 'git-repo2 token used on {}'.format(platform.node())
    try:
        auth = gh.authorize(
            login,
            password,
            scopes=['repo', 'delete_repo', 'gist'],
            note=token_note,
            note_url='https://github.com/guyzmo/git-repo',
        )
        return auth.token
    except github3.models.GitHubError as err:
        already_exists = len(err.args) > 0 and err.args[0].status_code == 422
        if already_exists:
            raise ResourceExistsError("A token already exist for this machine on your github account.")
        raise err
def test_uname_win32_ARCHITEW6432(self):
    """Check which environment variable platform.uname() uses for the machine."""
    # Issue 7860: make sure we get architecture from the correct variable
    # on 64 bit Windows: if PROCESSOR_ARCHITEW6432 exists we should be
    # using it, per
    # http://blogs.msdn.com/david.wang/archive/2006/03/26/HOWTO-Detect-Process-Bitness.aspx
    def fresh_machine():
        # Clear the cached result so uname() is computed again.
        platform._uname_cache = None
        _system, _node, _release, _version, machine, _processor = platform.uname()
        return machine

    try:
        with support.EnvironmentVarGuard() as environ:
            if 'PROCESSOR_ARCHITEW6432' in environ:
                del environ['PROCESSOR_ARCHITEW6432']
            environ['PROCESSOR_ARCHITECTURE'] = 'foo'
            self.assertEqual(fresh_machine(), 'foo')

            environ['PROCESSOR_ARCHITEW6432'] = 'bar'
            self.assertEqual(fresh_machine(), 'bar')
    finally:
        # Do not leave a result computed from the patched environment cached.
        platform._uname_cache = None
def __init__(self, root, depth, losses, extra, **kwargs):
    """Set up loss tracking and open the local JSON log files.

    Two files are opened for writing, ``<root>_<depth>_loss.json`` and
    ``<root>_<depth>_loss_batch.json``; each starts with ``[`` followed by
    the JSON dump of ``extra``.

    :param root: prefix of the log file names
    :param depth: second component of the log file names
    :param losses: loss container; a deep copy is kept for batch losses
    :param extra: JSON-serialisable metadata written at the top of each file
    :param kwargs: accepted but not used here
    """
    self.losses = losses
    self.losses_batch = copy.deepcopy(losses)
    self.extra = extra

    # The original also carried disabled (commented-out) code that wrote
    # copies named after platform.node() to a shared directory; only the
    # local files are written.
    self.f_epoch_local = open("{0}_{1}_loss.json".format(root, depth), 'w')
    self.f_batch_local = open("{0}_{1}_loss_batch.json".format(root, depth), 'w')
    for handle in (self.f_epoch_local, self.f_batch_local):
        handle.write('[' + json.dumps(self.extra))

    flush_file(self.f_batch_local)
    flush_file(self.f_epoch_local)
def home():
    """Render the system-information page, or the login page if logged out.

    Collects the current time, machine name, mycroft-core version, the
    installed skills and the tail of the skills log. Any ordinary error
    while collecting is printed and the page is rendered with whatever was
    gathered up to that point.

    :return: the rendered template
    """
    if not session.get('logged_in'):
        return render_template('login.html')

    sys_data = {"current_time": '', "machine_name": ''}
    mycroft_skills = ""
    try:
        sys_data['current_time'] = subprocess.check_output(['date'], shell=True)
        sys_data['machine_name'] = platform.node()
        sys_data['mycroft_version'] = subprocess.check_output(['dpkg -s mycroft-core | grep Version'], shell=True)
        mycroft_skills = sorted(os.listdir('/opt/mycroft/skills'))
        sys_data['skills_log'] = subprocess.check_output(['tail -n 10 /var/log/mycroft-skills.log'], shell=True)
        print(type(sys_data['skills_log']))
        # The result is not used by the template; the call is kept so its
        # errors are still reported by the handler below.
        disk_usage_list()
    except Exception as ex:
        print(ex)

    # Returning here rather than from a ``finally`` clause lets
    # KeyboardInterrupt/SystemExit propagate instead of being discarded.
    return render_template("index.html",
                           title='Mark 1 - System Information',
                           sys_data=sys_data,
                           name=login['username'],
                           mycroft_skills=mycroft_skills)
def index():
    """Render the system-information page.

    Collects the current time, machine name, CPU details, disk usage and
    the running process list. Any ordinary error while collecting is
    printed; values that could not be collected are passed to the template
    as ``None``.

    :return: the rendered template
    """
    sys_data = {"current_time": '', "machine_name": ''}
    # Pre-bind everything the template receives so that an early failure
    # cannot leave these names undefined at render time.
    cpu_genric_info = None
    disk_usage_info = None
    running_process_info = None
    try:
        sys_data['current_time'] = datetime.now().strftime("%d-%b-%Y , %I : %M : %S %p")
        sys_data['machine_name'] = platform.node()
        cpu_genric_info = cpu_generic_details()
        disk_usage_info = disk_usage_list()
        running_process_info = running_process_list()
    except Exception as ex:
        print(ex)

    # Returning here rather than from a ``finally`` clause lets
    # KeyboardInterrupt/SystemExit propagate instead of being discarded.
    return render_template("index.html",
                           title='Mark 1 - System Information',
                           sys_data=sys_data,
                           cpu_genric_info=cpu_genric_info,
                           disk_usage_info=disk_usage_info,
                           running_process_info=running_process_info)
def main():
    """Wait for a python process to finish, then send an email notification.

    Command-line options: ``--email`` (required), ``--process`` (required),
    ``--title`` and ``--content``. The process table is polled every 300
    seconds until no line of ``ps -ef | grep <process>`` contains
    ``python <process>``; then ``send_email`` is called.
    """
    parser = argparse.ArgumentParser(description='email notification')
    parser.add_argument('--email', '-e', help='email address', required=True)
    parser.add_argument('--process', '-p', help='process', required=True)
    parser.add_argument('--title', '-t', help='email title',
                        default='Your process is done')
    parser.add_argument('--content', '-c', help='email content')
    args = parser.parse_args()

    host = platform.node()
    process = args.process
    email = args.email
    title = args.title
    content = args.content if args.content else '{} is done'.format(process)

    ps_command = 'ps -ef | grep %s' % process
    marker = 'python %s' % process

    print(commands.getoutput(ps_command).split('\n'))
    # Re-query the process table on every pass.
    while any(marker in line
              for line in commands.getoutput(ps_command).split('\n')):
        print('sleep ...')
        sleep(300)
    send_email(title, host, email, content)
def test_uname_win32_ARCHITEW6432(self):
    """Check which environment variable platform.uname() uses for the machine."""
    # Issue 7860: make sure we get architecture from the correct variable
    # on 64 bit Windows: if PROCESSOR_ARCHITEW6432 exists we should be
    # using it, per
    # http://blogs.msdn.com/david.wang/archive/2006/03/26/HOWTO-Detect-Process-Bitness.aspx
    def fresh_machine():
        # Clear the cached result so uname() is computed again.
        platform._uname_cache = None
        _system, _node, _release, _version, machine, _processor = platform.uname()
        return machine

    try:
        with test_support.EnvironmentVarGuard() as environ:
            if 'PROCESSOR_ARCHITEW6432' in environ:
                del environ['PROCESSOR_ARCHITEW6432']
            environ['PROCESSOR_ARCHITECTURE'] = 'foo'
            self.assertEqual(fresh_machine(), 'foo')

            environ['PROCESSOR_ARCHITEW6432'] = 'bar'
            self.assertEqual(fresh_machine(), 'bar')
    finally:
        # Do not leave a result computed from the patched environment cached.
        platform._uname_cache = None
def log_error(step, content):
    '''
    This function logs a page for error.
    It is called whenever an unhandled exception will occur.

    `step`: The step in login process, it could be normal_login,
    step_two_login or select_alternate. It makes it easy to identify the
    file in logs.
    `content`: content to log.

    Returns a tuple ``(file_name, hostname)``: the name of the file written
    inside ``log_dir`` and the hostname of this machine.
    '''
    file_name = step + "- " + time.strftime("%d-%m-%Y %H-%M-%S") + ".html"
    # The context manager closes the file even when write() raises.
    with open(log_dir + file_name, 'w') as f:
        f.write(content)
    # hostname of the machine where the py-google-auth is running
    hostname = platform.node()
    return file_name, hostname
def test_task_complete(mocked_plugin, tpool, configure_model, freeze_time):
    """A submitted task is recorded as completed once the plugin stops being alive."""
    tsk = create_task()
    polled = {"a": False}

    def side_effect(*args, **kwargs):
        # Report "alive" on the first poll only.
        seen_before = polled["a"]
        polled["a"] = True
        return not seen_before

    mocked_plugin.alive.side_effect = side_effect

    tpool.submit(tsk)
    assert tsk._id in tpool.data

    # Give the pool time to process the task before reloading it.
    time.sleep(2)
    tsk.refresh()

    assert tsk.executor_host == platform.node()
    assert tsk.executor_pid == 100
    assert tsk.time_completed == int(freeze_time.return_value)
    assert not tsk.time_failed
    assert not tsk.time_cancelled
    assert not tpool.global_stop_event.is_set()
    assert tsk._id not in tpool.data
def test_task_cancelled(mocked_plugin, tpool, configure_model, freeze_time):
    """A task cancelled right after submission is recorded as cancelled."""
    tsk = create_task()
    polled = {"a": False}

    def side_effect(*args, **kwargs):
        # Report "alive" on the first poll only.
        seen_before = polled["a"]
        polled["a"] = True
        return not seen_before

    mocked_plugin.alive.side_effect = side_effect

    tpool.submit(tsk)
    tpool.cancel(tsk._id)

    # Give the pool time to process the task before reloading it.
    time.sleep(2)
    tsk.refresh()

    assert tsk.executor_host == platform.node()
    assert tsk.executor_pid == 100
    assert tsk.time_cancelled == int(freeze_time.return_value)
    assert not tsk.time_completed
    assert not tsk.time_failed
    assert not tpool.global_stop_event.is_set()
    assert tsk._id not in tpool.data
def test_task_failed_exit(mocked_plugin, tpool, configure_model, freeze_time):
    """A task whose plugin return code is EX_SOFTWARE is recorded as failed."""
    tsk = create_task()
    polled = {"a": False}

    def side_effect(*args, **kwargs):
        # Report "alive" on the first poll only.
        seen_before = polled["a"]
        polled["a"] = True
        return not seen_before

    mocked_plugin.alive.side_effect = side_effect
    mocked_plugin.returncode = os.EX_SOFTWARE

    tpool.submit(tsk)

    # Give the pool time to process the task before reloading it.
    time.sleep(2)
    tsk.refresh()

    assert tsk.executor_host == platform.node()
    assert tsk.executor_pid == 100
    assert tsk.time_failed == int(freeze_time.return_value)
    assert not tsk.time_cancelled
    assert not tsk.time_completed
    assert not tpool.global_stop_event.is_set()
    assert tsk._id not in tpool.data
def package_node(root=None, name=None):
    '''Package a root file tree into a gzipped tar archive.

    The archive is created in a new temporary directory with ``tar
    --one-file-system``, excluding the archive itself.

    :param root: the root of the node to package, default is /
    :param name: the name for the image. If not specified, the machine's
        network name (``platform.node()``) is used
    :return: path of the archive, ``<tmpdir>/<name>.tgz``
    '''
    name = platform.node() if name is None else name
    root = "/" if root is None else root

    tmpdir = tempfile.mkdtemp()
    image = "%s/%s.tgz" % (tmpdir, name)
    print("Preparing to package root %s into %s" % (root, name))

    cmd = ["tar", "--one-file-system", "-czvSf", image, root,
           "--exclude", image]
    run_command(cmd)
    return image
def get_sys_info():
    """Return a text summary of this host.

    The summary lists machine, version, platform, node, uname, system and
    processor from the ``platform`` module, followed by the host name and
    FQDN reported by ``socket``.
    """
    template = """ Machine: {}\nVersion: {} Platform: {}\nNode: {}\nUname: {}\nSystem: {} Processor: {}\n\nHost Name: {}\nFQDN: {}\n """
    # Values are listed in the same order as the placeholders above.
    facts = (
        platform.machine(),
        platform.version(),
        platform.platform(),
        platform.node(),
        platform.uname(),
        platform.system(),
        platform.processor(),
        socket.gethostname(),
        socket.getfqdn(),
    )
    return template.format(*facts)
def _set_spec(self):
    """Fill in the container spec from the instance options and return it.

    Sets the root path (writable), hostname, working directory,
    environment and command arguments, and replaces every ``hostID`` in
    the uid/gid mappings with the values from ``Config``.
    """
    spec = self._container_specjson
    root, process, linux = spec["root"], spec["process"], spec["linux"]

    root["path"] = os.path.realpath(self.container_root)
    root["readonly"] = False

    # A requested hostname is only used when it contains a dot;
    # otherwise the name of the current machine is used.
    requested_host = self.opt["hostname"]
    spec["hostname"] = requested_host if "." in requested_host else platform.node()

    if self.opt["cwd"]:
        process["cwd"] = self.opt["cwd"]

    process["env"] = env_list = []
    for env_str in self.opt["env"]:
        env_var, value = env_str.split("=", 1)
        # entries with an empty variable name are dropped
        if env_var:
            env_list.append("%s=%s" % (env_var, value))

    for mapping_key, config_attr in (("uidMappings", "uid"),
                                     ("gidMappings", "gid")):
        for idmap in linux[mapping_key]:
            if "hostID" in idmap:
                idmap["hostID"] = getattr(Config, config_attr)

    process["args"] = self._remove_quotes(self.opt["cmd"])
    return spec
def __init__(self):
    """Initialise the environment descriptor with its default values.

    Starts in ``'standalone'`` mode with ``status`` false, then calls
    ``self.get_current_path()`` and records the platform name and the
    network name of this machine.
    """
    # modes this object recognises
    self.modes = ['maya', 'houdini', '3dsmax', 'nuke', 'standalone']
    self.status = False
    self.current_mode = 'standalone'

    # Reset first, then call get_current_path().
    self.current_path = None
    self.get_current_path()

    self.platform, self.node = platform.system(), platform.node()
def build_payload(data, request, response):
    """Assemble the record describing one request/response pair.

    Starts from the request and response facts (node, content type, client
    IP, method, query parameters, path, status, UTC time), adds the
    resolved view name and the user id when available, merges ``data`` on
    top, and finally drops the ``start_time``, ``end_time`` and
    ``db_record_stacks`` keys.

    :param data: extra values merged into the payload
    :param request: request object
    :param response: response object
    :return: the payload dict
    """
    payload = {
        'node': platform.node(),
        'content-type': response.get('content-type'),
        'ip': get_ip(request),
        'method': request.method,
        'params': dict(request.GET),
        'path': request.path,
        'status': response.status_code,
        'time': datetime.utcnow(),
    }

    resolver_match = getattr(request, 'resolver_match', None)
    if resolver_match:
        payload['view'] = resolver_match.view_name

    user = getattr(request, 'user', None)
    if user:
        payload['user_id'] = user.id

    payload.update(data)

    # These keys are never part of the returned payload.
    for transient_key in ('start_time', 'end_time', 'db_record_stacks'):
        payload.pop(transient_key, None)
    return payload
def get_information():
    """Return a banner-framed text report about the system and application.

    One line each for system, machine, node, release, version and platform,
    plus the application name and version from ``comun``.
    """
    banner = '#####################################################\n'
    parts = [
        banner,
        'System: %s\n' % platform.system(),
        'Machine: %s\n' % platform.machine(),
        'Node: %s\n' % platform.node(),
        'Release: %s\n' % platform.release(),
        'Version: %s\n' % platform.version(),
        'Platform: %s\n' % platform.platform(),
        '%s version: %s\n' % (comun.APPNAME, comun.VERSION),
        banner,
    ]
    return ''.join(parts)
def hostname():
    """Return this machine's network name as reported by ``platform.node()``."""
    node_name = platform.node()
    return node_name
def defaults_hostname(self):
    """Default the model's hostname to the short name of this machine.

    Does nothing when ``self.model.hostname`` is already set; otherwise
    stores the part of ``platform.node()`` before the first dot.
    """
    if self.model.hostname is not None:
        return
    short_name, _sep, _domain = platform.node().partition('.')
    self.model.hostname = short_name
def execute(self, args, console):
    """Print interpreter and host platform details, one ``label: value`` per line.

    :param args: not used by this command
    :param console: not used by this command
    """
    # A single pre-formatted string passed to print() produces the same
    # text on Python 2 and Python 3.
    print('%s: %s' % ('architecture', platform.architecture()[0]))
    queries = (
        ('java_ver', platform.java_ver),
        ('node', platform.node),
        ('processor', platform.processor),
        ('python_compiler', platform.python_compiler),
        ('python_implementation', platform.python_implementation),
        ('python_version', platform.python_version),
        ('release', platform.release),
    )
    # Each value is queried right before its line is printed.
    for label, query in queries:
        print('%s: %s' % (label, query()))
def machineInfo():
    """Return a dict describing the interpreter, OS and environment.

    Keys: ``pythonVersion``, ``node``, ``OSRelease``, ``OSVersion``,
    ``processor``, ``machineType``, ``env`` (the ``os.environ`` object)
    and ``syspaths`` (the ``sys.path`` list).
    """
    machineDict = dict(
        pythonVersion=sys.version,
        node=platform.node(),
        OSRelease=platform.release(),
        OSVersion=platform.platform(),
        processor=platform.processor(),
        machineType=platform.machine(),
        env=os.environ,
        syspaths=sys.path,
    )
    return machineDict
def UpdateRecord(rinex, path):
    """Re-check one archived RINEX record and update or remove it.

    The file is read again. When ``verify_rinex_date_multiday`` returns a
    false value, the matching rows are deleted from ``gamit_soln``,
    ``ppp_soln`` and ``rinex`` inside one transaction and a message is
    returned; otherwise the record's ``Completion`` is updated.

    :param rinex: record with ``NetworkCode``, ``StationCode``,
        ``ObservationYear`` and ``ObservationDOY``
    :param path: path handed to ``pyRinex.ReadRinex``
    :return: a message string when rows were deleted, the formatted
        traceback plus record description when an exception occurred,
        otherwise ``None``
    """
    def describe():
        # "<net>.<stn> <year> <doy> using node <host>"
        return rinex['NetworkCode'] + '.' + rinex['StationCode'] + ' ' + str(rinex['ObservationYear']) + ' ' + str(rinex['ObservationDOY']) + ' using node ' + platform.node()

    try:
        cnn = dbConnection.Cnn('gnss_data.cfg')
        Config = pyOptions.ReadOptions('gnss_data.cfg')

        rnxobj = pyRinex.ReadRinex(rinex['NetworkCode'], rinex['StationCode'], path)
        date = pyDate.Date(year=rinex['ObservationYear'], doy=rinex['ObservationDOY'])

        if verify_rinex_date_multiday(date, rnxobj, Config):
            cnn.update('rinex', rinex, Completion=rnxobj.completion)
        else:
            cnn.begin_transac()
            record_key = (rinex['NetworkCode'], rinex['StationCode'],
                          rinex['ObservationYear'], rinex['ObservationDOY'])
            # propagate the deletes
            cnn.query(
                'DELETE FROM gamit_soln WHERE "NetworkCode" = \'%s\' AND "StationCode" = \'%s\' AND "Year" = %i AND "DOY" = %i'
                % record_key)
            cnn.query(
                'DELETE FROM ppp_soln WHERE "NetworkCode" = \'%s\' AND "StationCode" = \'%s\' AND "Year" = %i AND "DOY" = %i'
                % record_key)
            cnn.query(
                'DELETE FROM rinex WHERE "NetworkCode" = \'%s\' AND "StationCode" = \'%s\' AND "ObservationYear" = %i AND "ObservationDOY" = %i'
                % record_key)
            cnn.commit_transac()

            return 'Multiday rinex file moved out of the archive: ' + describe()
    except Exception:
        return traceback.format_exc() + ' processing rinex: ' + describe()
def __init__(self, **kwargs):
    """Create an event record with default fields, then apply overrides.

    Default fields: ``EventDate`` (now), ``EventType`` (``'info'``),
    ``NetworkCode``, ``StationCode``, ``Year``, ``DOY``, ``Description``,
    ``node`` (this machine's name), ``stack`` and ``module`` (the caller,
    as ``<module name>.<function name>`` when the module can be resolved).

    :param kwargs: values for existing fields only
    :raises Exception: when a keyword is not one of the existing fields
    """
    dict.__init__(self)

    self['EventDate'] = datetime.datetime.now()
    self['EventType'] = 'info'
    self['NetworkCode'] = None
    self['StationCode'] = None
    self['Year'] = None
    self['DOY'] = None
    self['Description'] = ''
    self['node'] = platform.node()
    self['stack'] = None

    # inspect.stack() is expensive, so walk it once and reuse the
    # caller's frame record (index 1; index 0 is this method).
    caller = inspect.stack()[1]
    module = inspect.getmodule(caller[0])
    if module is None:
        self['module'] = caller[3]  # only the calling function's name
    else:
        self['module'] = module.__name__ + '.' + caller[3]

    # initialize the dictionary based on the input
    for key in kwargs:
        if key not in self:
            raise Exception('Provided key not in list of valid fields.')
        self[key] = kwargs[key]

    if self['EventType'] == 'error':
        # keep the traceback up to just before this call
        self['stack'] = ''.join(traceback.format_stack()[0:-1])
    else:
        self['stack'] = None
def insert_warning(self, desc):
    """Record a ``'warn'`` event tagged with the caller's location.

    The tag has the form ``[<host>:<caller function>(<line>)]\\n`` so it is
    clear who is logging the message.

    :param desc: event description
    """
    # Walk the stack once; index 1 is the caller of this method.
    caller_record = inspect.stack()[1]
    line = caller_record[2]
    caller = caller_record[3]
    host = platform.node()
    module = '[%s:%s(%s)]\n' % (host, caller, str(line))
    self.insert_event_bak('warn', module, desc)
def insert_info(self, desc):
    """Record an ``'info'`` event tagged with the caller's location.

    The tag has the form ``[<host>:<caller function>(<line>)]\\n``.

    :param desc: event description
    """
    # Walk the stack once; index 1 is the caller of this method.
    caller_record = inspect.stack()[1]
    line = caller_record[2]
    caller = caller_record[3]
    host = platform.node()
    module = '[%s:%s(%s)]\n' % (host, caller, str(line))
    self.insert_event_bak('info', module, desc)
def run_test(config, args):
    """
    Run the 'test' subcommand

    Sends one test message to every configured endpoint (or only to
    ``args.endpoint_name`` when given) using the endpoint's HTTP method,
    and prints the status code, headers and body of each response.

    :param config: configuration
    :type config: :py:class:`~.Config`
    :param args: command line arguments
    :type args: :py:class:`argparse.Namespace`
    :raises Exception: for an endpoint method other than POST or GET
    """
    base_url = get_base_url(config, args)
    logger.debug('API base url: %s', base_url)

    endpoints = config.get('endpoints')
    selected = args.endpoint_name
    if selected is not None:
        endpoints = {selected: endpoints[selected]}

    dt = datetime.now().strftime('%Y-%m-%dT%H:%M:%S.%f')
    data = {
        'message': 'testing via webhook2lambda2sqs CLI',
        'version': VERSION,
        'host': node(),
        'datetime': dt
    }

    for ep in sorted(endpoints):
        url = base_url + ep + '/'
        method = endpoints[ep]['method']
        print('=> Testing endpoint %s with %s: %s' % (url, method, pformat(data)))
        if method == 'POST':
            res = requests.post(url, json=data)
        elif method == 'GET':
            res = requests.get(url, params=data)
        else:
            raise Exception('Unimplemented method: %s' % method)

        print('RESULT: HTTP %d' % res.status_code)
        for h in sorted(res.headers):
            print('%s: %s' % (h, res.headers[h]))
        print("\n%s\n" % res.content)
def init_with_context(self, context):
    """Populate the dashboard module with host facts and object counts.

    Sets ``hostname``, ``system_info``, ``arch``, ``procesor``,
    ``py_version``, ``host_count``, ``buss_count`` and ``minions_status``.

    :param context: not used by this method
    """
    upMinionCount = Host.objects.filter(minion_status=1).count()
    downMinionCount = Host.objects.filter(minion_status=0).count()

    self.hostname = platform.node()

    # platform.linux_distribution() was removed in Python 3.8; fall back
    # to an empty distribution string when it is unavailable.
    linux_distribution = getattr(platform, 'linux_distribution', None)
    distribution = ' '.join(linux_distribution()) if linux_distribution else ''
    self.system_info = '%s, %s, %s' % (
        platform.system(),
        distribution,
        platform.release())

    self.arch = ' '.join(platform.architecture())
    # The attribute name keeps its original spelling because code outside
    # this block may read it. The stray trailing comma that turned the
    # value into a 1-tuple is gone.
    self.procesor = platform.processor()
    self.py_version = platform.python_version()
    self.host_count = Host.objects.count()
    self.buss_count = Project.objects.count()
    self.minions_status = '??? %s,??? %s' % (upMinionCount, downMinionCount)
def tip(self, vcdir, abortOnError=True):
    """Return the node id of the repository tip as printed by ``hg tip``.

    :param vcdir: repository directory passed to ``hg -R``
    :param abortOnError: when true, ``abort`` is called if hg fails
    :return: the raw command output, or ``None`` when hg fails
    """
    self.check_for_hg()
    # 'run' is not used because this can be called very early,
    # before _opts is set.
    hg_tip = ['hg', 'tip', '-R', vcdir, '--template', '{node}']
    try:
        output = subprocess.check_output(hg_tip)
    except subprocess.CalledProcessError:
        if abortOnError:
            abort('hg tip failed')
        return None
    return output
def release_version(self, snapshotSuffix='dev'):
    """Return the release version of this suite, computing it on first use.

    Sources are tried in order: the suite's declared ``version`` property,
    the version derived from VC tags, and finally a generated
    ``unknown-<host>-<timestamp>`` string. The result is cached in
    ``self._releaseVersion``.

    :param snapshotSuffix: suffix forwarded to the VC tag lookup
    """
    if not self._releaseVersion:
        self._releaseVersion = (
            self._get_early_suite_dict_property('version')
            or self.vc.release_version_from_tags(self.vc_dir, self.name, snapshotSuffix=snapshotSuffix)
            or 'unknown-{0}-{1}'.format(platform.node(), time.strftime('%Y-%m-%d_%H-%M-%S_%Z'))
        )
    return self._releaseVersion
def system(inp):
    """system -- Retrieves information about the host system."""
    host_name = platform.node()
    os_name = platform.platform()
    implementation = platform.python_implementation()
    version = platform.python_version()
    arch = '-'.join(platform.architecture())
    cpu = platform.machine()

    # The first field of /proc/uptime is parsed as the uptime in seconds.
    with open('/proc/uptime', 'r') as f:
        first_field = f.readline().split()[0]
    uptime = str(timedelta(seconds=float(first_field)))

    template = ("Hostname: \x02{}\x02, Operating System: \x02{}\x02, Python "
                "Version: \x02{} {}\x02, Architecture: \x02{}\x02, CPU: \x02{}"
                "\x02, Uptime: \x02{}\x02")
    return template.format(host_name, os_name, implementation, version,
                           arch, cpu, uptime)
def get_admin_command():  # no cov
    """Return the command prefix used to run something as an administrator.

    On Windows this is a ``runas /user:<domain>\\<account>`` command;
    elsewhere it is ``sudo -H`` with ``--user=<name>`` appended when the
    ``_DEFAULT_ADMIN_`` environment variable is set and non-empty.
    """
    if not ON_WINDOWS:
        # Should we indeed use -H here?
        admin = os.environ.get('_DEFAULT_ADMIN_', '')
        command = ['sudo', '-H']
        if admin:
            command.append('--user={}'.format(admin))
        return command

    # Prefer the machine name; fall back to USERDOMAIN when it is empty.
    domain = platform.node() or os.environ.get('USERDOMAIN', '')
    account = os.environ.get('_DEFAULT_ADMIN_', 'Administrator')
    return ['runas', r'/user:{}\{}'.format(domain, account)]
def process_send_data(socket, context):
    """
    Send all memory, cpu, disk, network data of computer to server(master node)

    Loops until interrupted. Each pass samples CPU usage (a one second
    blocking sample), memory, disk usage of ``/`` and the process list,
    then publishes them as JSON on the ``status`` channel. On
    ``KeyboardInterrupt`` or ``SystemExit`` the socket is closed, the
    context is terminated and the function returns.

    :param socket: socket object providing ``send_multipart`` and ``close``
    :param context: context object providing ``term``
    """
    while True:
        try:
            cpu_percent = psutil.cpu_percent(interval=1, percpu=True)
            memory_info = psutil.virtual_memory()
            disk_info = psutil.disk_usage('/')
            pids_computer = psutil.pids()

            info_to_send = json.dumps({
                "computer_utc_clock": str(datetime.datetime.utcnow()),
                "computer_clock": str(datetime.datetime.now()),
                "hostname": platform.node(),
                "mac_address": mac_address(),
                "ipv4_interfaces": internet_addresses(),
                "cpu": {
                    "percent_used": cpu_percent
                },
                "memory": {
                    "total_bytes": memory_info.total,
                    "total_bytes_used": memory_info.used,
                    "percent_used": memory_info.percent
                },
                "disk": {
                    "total_bytes": disk_info.total,
                    "total_bytes_used": disk_info.used,
                    "total_bytes_free": disk_info.free,
                    "percent_used": disk_info.percent
                },
                "process": pids_active(pids_computer)
            }).encode()

            # send json data in the channel 'status', although is not necessary to send.
            socket.send_multipart([b"status", info_to_send])
        except (KeyboardInterrupt, SystemExit):
            socket.close()
            context.term()
            # Stop here: the socket is closed, so another pass of the loop
            # would try to send on it.
            break
def start(ip_address='127.0.0.1', port=5555):
    """
    Connect to master node and each one second send data.

    Creates a PUB socket connected to ``tcp://<ip_address>:<port>`` with a
    send high-water mark of 1, installs the signal handling through
    ``handler_signal_keyboard`` and enters the sending loop.

    :param ip_address: address of the master node
    :param port: TCP port of the master node
    """
    context = zmq.Context()
    publisher = context.socket(zmq.PUB)

    endpoint = "tcp://%s:%s" % (ip_address, port)
    publisher.connect(endpoint)
    publisher.sndhwm = 1

    handler_signal_keyboard(publisher, context)
    process_send_data(publisher, context)