我们从Python开源项目中,提取了以下31个代码示例,用于说明如何使用socket.write()。
def __mod__(self, fields):
    """Reject ``%`` interpolation on ``UrlEncoded`` objects.

    Writing ``UrlEncoded("%s") % "abc"`` raises a ``TypeError`` instead of
    producing a formatted value.

    :param fields: The right-hand operand of ``%``; it is never inspected.
    :raises TypeError: Always.
    """
    message = "Cannot interpolate into a UrlEncoded object."
    raise TypeError(message)
def connect(self):
    """Returns an open connection (socket) to the Splunk instance.

    This method is used for writing bulk events to an index or similar tasks
    where the overhead of opening a connection multiple times would be
    prohibitive.

    When ``self.scheme`` is ``"https"`` the socket is wrapped in TLS through
    an ``ssl.SSLContext`` (the module-level ``ssl.wrap_socket`` was removed
    in Python 3.12). Certificate and hostname verification are disabled,
    which matches the defaults of the old ``ssl.wrap_socket(sock)`` call.

    If wrapping, hostname resolution or connecting fails, the socket is
    closed before the exception propagates to the caller.

    :returns: A socket.

    **Example**::

        import splunklib.binding as binding
        c = binding.connect(...)
        socket = c.connect()
        socket.write("POST %s HTTP/1.1\\r\\n" % "some/path/to/post/to")
        socket.write("Host: %s:%s\\r\\n" % (c.host, c.port))
        socket.write("Accept-Encoding: identity\\r\\n")
        socket.write("Authorization: %s\\r\\n" % c.token)
        socket.write("X-Splunk-Input-Mode: Streaming\\r\\n")
        socket.write("\\r\\n")
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        if self.scheme == "https":
            context = ssl.create_default_context()
            # NOTE(review): verification is turned off only to preserve the
            # behaviour of the previous ssl.wrap_socket(sock) call; consider
            # enabling it. check_hostname must be cleared before verify_mode
            # may be set to CERT_NONE.
            context.check_hostname = False
            context.verify_mode = ssl.CERT_NONE
            sock = context.wrap_socket(sock, server_hostname=self.host)
        sock.connect((socket.gethostbyname(self.host), self.port))
    except Exception:
        # Do not leak the file descriptor when the connection cannot be made.
        sock.close()
        raise
    return sock
def _write_int(value, stream):
    """Write ``value`` to ``stream`` as a packed ``"!i"`` integer.

    The ``"!i"`` struct format is a 4-byte signed integer in network
    (big-endian) byte order.

    :param value: Integer to serialise.
    :param stream: Object exposing ``write``; it receives the packed bytes.
    """
    packed = struct.pack("!i", value)
    stream.write(packed)
def main(socket):
    """Answer length-prefixed requests read from ``socket`` in an endless loop.

    Each iteration reads a length with ``_read_int``, reads that many units
    from ``socket``, passes them to ``transform``, encodes the result with
    ``.encode()`` and writes it back as a length (via ``_write_int``)
    followed by the encoded bytes, then flushes.

    The loop has no exit condition; it ends only when one of the calls
    raises.

    :param socket: Object exposing ``read``, ``write`` and ``flush``
        (presumably a file-like wrapper around a socket -- confirm against
        the caller).
    """
    while True:
        request_size = _read_int(socket)
        payload = socket.read(request_size)
        encoded = transform(payload).encode()
        _write_int(len(encoded), socket)
        socket.write(encoded)
        socket.flush()
def _apns_send(app_id, token, alert, badge=None, sound=None, category=None,
               content_available=False, action_loc_key=None, loc_key=None,
               loc_args=None, extra=None, identifier=0, expiration=None,
               priority=10, socket=None):
    """Build an APNS notification frame and write it to a push socket.

    The payload is a dict with an ``"aps"`` entry plus any keys from
    ``extra``, serialised as compact JSON with sorted keys and encoded as
    UTF-8.

    :param app_id: Passed to ``_apns_create_socket_to_push`` when no
        ``socket`` is supplied.
    :param token: Passed through to ``_apns_pack_frame``.
    :param alert: Alert value. When any of ``action_loc_key``, ``loc_key`` or
        ``loc_args`` is truthy it is replaced by a dict (``{"body": alert}``
        if ``alert`` is truthy, otherwise empty) that receives the
        localisation keys. Stored under ``aps["alert"]`` unless ``None``.
    :param badge: Stored under ``aps["badge"]`` unless ``None``.
    :param sound: Stored under ``aps["sound"]`` unless ``None``.
    :param category: Stored under ``aps["category"]`` unless ``None``.
    :param content_available: When truthy, ``aps["content-available"]`` is 1.
    :param action_loc_key: Stored as ``alert["action-loc-key"]`` when truthy.
    :param loc_key: Stored as ``alert["loc-key"]`` when truthy.
    :param loc_args: Stored as ``alert["loc-args"]`` when truthy. Defaults to
        ``None`` rather than a shared mutable list.
    :param extra: Mapping merged into the top level of the payload. Defaults
        to ``None`` (nothing merged) rather than a shared mutable dict.
    :param identifier: Passed through to ``_apns_pack_frame``.
    :param expiration: Expiration timestamp; when ``None`` the current time
        plus 2592000 seconds (30 days) is used.
    :param priority: Passed through to ``_apns_pack_frame``.
    :param socket: When truthy, the frame is written to it and nothing else
        is done. Otherwise a socket is opened with
        ``_apns_create_socket_to_push``, the frame is written, the socket is
        handed to ``_apns_check_errors`` and then closed.
    :raises APNSDataOverflow: If the encoded payload is longer than
        ``SETTINGS["APNS_MAX_NOTIFICATION_SIZE"]`` bytes.
    """
    data = {}
    aps_data = {}

    if action_loc_key or loc_key or loc_args:
        # Localised alerts must be a dict; keep the plain text as its body.
        alert = {"body": alert} if alert else {}
        if action_loc_key:
            alert["action-loc-key"] = action_loc_key
        if loc_key:
            alert["loc-key"] = loc_key
        if loc_args:
            alert["loc-args"] = loc_args

    if alert is not None:
        aps_data["alert"] = alert

    if badge is not None:
        aps_data["badge"] = badge

    if sound is not None:
        aps_data["sound"] = sound

    if category is not None:
        aps_data["category"] = category

    if content_available:
        aps_data["content-available"] = 1

    data["aps"] = aps_data
    if extra is not None:
        data.update(extra)

    # convert to json, avoiding unnecessary whitespace with separators (keys sorted for tests)
    json_data = json.dumps(data, separators=(",", ":"), sort_keys=True).encode("utf-8")

    max_size = SETTINGS["APNS_MAX_NOTIFICATION_SIZE"]
    if len(json_data) > max_size:
        raise APNSDataOverflow("Notification body cannot exceed %i bytes" % (max_size))

    # if expiration isn't specified use 1 month (30 days) from now
    if expiration is not None:
        expiration_time = expiration
    else:
        expiration_time = int(time.time()) + 2592000

    frame = _apns_pack_frame(token, json_data, identifier, expiration_time, priority)

    if socket:
        # Caller-owned socket: write only; no error check and no close here.
        socket.write(frame)
    else:
        # A distinct local name avoids rebinding the ``socket`` parameter.
        with closing(_apns_create_socket_to_push(app_id)) as push_socket:
            push_socket.write(frame)
            _apns_check_errors(push_socket)