The following code examples, extracted from open source Python projects, illustrate how to use pika.BasicProperties().
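For orientation, here is a minimal, self-contained sketch of the pattern most of the examples below share: build a pika.BasicProperties object (delivery_mode=2 for a persistent message, plus optional fields such as content_type and correlation_id) and pass it to basic_publish. This is only an illustrative sketch, assuming a RabbitMQ broker on localhost; the queue name 'example_queue' is made up for the example.

import json
import uuid

import pika

# Sketch only: assumes a broker on localhost; 'example_queue' is a hypothetical queue name.
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='example_queue', durable=True)

properties = pika.BasicProperties(
    delivery_mode=2,                   # persistent message
    content_type='application/json',
    correlation_id=str(uuid.uuid4()),  # useful for matching RPC-style replies
)
channel.basic_publish(exchange='',
                      routing_key='example_queue',
                      body=json.dumps({'hello': 'world'}),
                      properties=properties)
connection.close()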
def publish(self, item, priority=0, retry=2):
    body = json.dumps(item)
    try:
        self._channel.basic_publish(exchange=u'',
                                    routing_key=self._queue_name,
                                    body=body,
                                    properties=pika.BasicProperties(
                                        delivery_mode=2,
                                        priority=priority
                                    ))
    except exceptions.ConnectionClosed as err:
        if retry <= 0:
            raise err
        self.open()
        self.publish(item, priority, retry - 1)
def properties(self) -> BasicProperties:
    """ Build :class:`pika.BasicProperties` object """
    return BasicProperties(
        content_type=self.content_type,
        content_encoding=self.content_encoding,
        headers=self.headers,
        delivery_mode=self.delivery_mode,
        priority=self.priority,
        correlation_id=self.correlation_id,
        reply_to=self.reply_to,
        expiration=str(convert_timestamp(self.expiration * 1000)) if self.expiration else None,
        message_id=self.message_id,
        timestamp=self.timestamp,
        type=self.type,
        user_id=self.user_id,
        app_id=self.app_id
    )
def send(self, message):
    self.response = None
    self.corr_id = str(uuid.uuid4())
    self.channel.basic_publish(exchange='',
                               routing_key='eagle',
                               properties=pika.BasicProperties(
                                   reply_to=self.callback_queue,
                                   correlation_id=self.corr_id,
                               ),
                               body=message)
    for i in xrange(self.timeout):
        if self.response is None:
            self.connection.process_data_events()
        else:
            break
        time.sleep(1)
    return self.response
def process_rss(rss_result, message_body, redis_conn, message_queue):
    for result in rss_result:
        page_url = _convert_url(result.url, message_body['website'])
        in_database = _check_redis(page_url, redis_conn)
        message_body['title'] = result.title
        message_body['date'] = result.date
        message_body['url'] = page_url
        to_send = json.dumps(message_body)
        if not in_database:
            message_queue.basic_publish(exchange='',
                                        routing_key='scraper_queue',
                                        body=to_send,
                                        properties=pika.BasicProperties(
                                            delivery_mode=2,))
            # Set the value within redis to expire in 3 days
            redis_conn.setex(page_url, 259200, 1)
        else:
            pass
def on_mutation_request(self, ch, method, props, body):
    """Callback for messages in the 'rpc_mutations_queue'
    They say: "Hey, do you have a mutation for me?"
    """
    # This is the "remote procedure"
    # being called and returning a value
    mutation_obj = self.get_mutation()
    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(
                         correlation_id=props.correlation_id),
                     body=mutation_obj.serialize_me())
    ch.basic_ack(delivery_tag=method.delivery_tag)
def on_evaluation_request(self, ch, method, props, body):
    """Callback for messages in the 'rpc_evaluations_queue'
    They say: "Hey, here are the execution results"
    """
    # This is the "remote procedure"
    # being called and returning a value
    ev_mutation_object = pickle.loads(body)
    self.process_execution_results(ev_mutation_object)
    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(
                         correlation_id=props.correlation_id),
                     body='EVALUATION RECEIVED')
    ch.basic_ack(delivery_tag=method.delivery_tag)
def poll_mutation_queue(self):
    """
    In this paradigm calling means pushing our message to the queue
    (the callback will take care of it) and waiting for the response
    to process it.
    @returns: string, serialized MutationObject (only attributes)
    """
    self.response = None
    self.corr_id = str(uuid.uuid4())
    self.channel.basic_publish(exchange='',  # default exchange
                               routing_key='rpc_mutations_queue',
                               properties=pika.BasicProperties(
                                   reply_to=self.callback_queue,
                                   correlation_id=self.corr_id),
                               body='POLL MUTATION QUEUE')
    self.ae.m_info("[x] Sent mutation queue poll")
    while self.response is None:
        # Waiting for a response
        self.connection.process_data_events()
    return self.response
def send_evaluation(self, mutation_object):
    """
    In this paradigm calling means pushing our message to the queue
    (the callback will take care of it) and waiting for the response
    to process it.
    @returns: string, serialized MutationObject (only attributes)
    """
    self.response = None
    self.corr_id = str(uuid.uuid4())
    self.channel.basic_publish(exchange='',  # default exchange
                               routing_key='rpc_evaluations_queue',
                               properties=pika.BasicProperties(
                                   reply_to=self.callback_queue,
                                   correlation_id=self.corr_id),
                               # This should be a serialized
                               # evaluation object
                               body=mutation_object.serialize_me())
    self.ae.m_info("[x] Sent evaluation")
    while self.response is None:
        # Waiting for a response
        self.connection.process_data_events()
    return self.response
def call(self, n):
    self.response = None
    self.corr_id = str(uuid.uuid4())
    basicProperties = pika.BasicProperties(
        reply_to=self.callback_queue,
        correlation_id=self.corr_id,
    )
    self.channel.basic_publish(exchange='',
                               routing_key=QUEUE,
                               properties=basicProperties,
                               body=str(n))
    while self.response is None:
        self.connection.process_data_events()
    return self.response
def on_request(ch, method, props, body):
    global REQ_COUNT
    REQ_COUNT += 1
    print(" [x] Listening ... Request Number: %i" % REQ_COUNT)
    body = json.load(StringIO(body))
    operator = body['operator']
    values = body['data']
    print(" [.] mathOps(%s)" % operator)
    response = json.dumps(mathOps(values, operator=operator), separators=(',', ':'))
    print(" Output: %s\n" % response)
    basicProperties = pika.BasicProperties(correlation_id=props.correlation_id)
    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=basicProperties,
                     body=str(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)
def run(self, function, args=None, kwargs=None, retry_policy=None, callback=None):
    with self._pool.acquire() as cxn:
        cxn.channel.basic_publish(
            body=self.serialize({
                "function": function,
                "parameters": {
                    "args": args or tuple(),
                    "kwargs": kwargs or {},
                    "retry_policy": retry_policy,
                    "callback": callback
                }
            }),
            exchange='',
            routing_key=self.queue_name,
            properties=pika.BasicProperties(
                delivery_mode=2,
            )
        )
        self.log(logging.DEBUG, "Task received : {}".format(function))
def request(self, n):
    corr_id = str(uuid.uuid4())
    self.response[corr_id] = None
    # the response to this request will be matched by correlation_id
    self.channel.basic_publish(exchange='',
                               routing_key='compute_queue',
                               properties=pika.BasicProperties(
                                   reply_to=self.callback_queue,
                                   correlation_id=corr_id,
                               ),
                               body=str(n))
    # wait for the response
    while self.response[corr_id] is None:
        self.connection.process_data_events()
    return int(self.response[corr_id])
def publish(self, message):
    self._message_number_out += 1
    amqp_message_update_meta(message, self.get_meta())
    amqp_msg = amqp_message_encode(message)
    log.debug("Publish message #%s, AMQP message: %s" % (self._message_number_out, amqp_msg))
    properties = BasicProperties(
        app_id=self.app_id,
        content_type='application/json',
        content_encoding='utf-8',
        delivery_mode=2,  # persistent
    )
    try:
        yield self._channel.basic_publish(
            self.exchange_name,
            self.queue_out_routing_key,
            amqp_msg,
            properties=properties,
        )
    except ChannelClosed:
        self.retry_channel()
        self._cached_messages.append(message)
    except AMQPError:
        self.retry_connect()
        self._cached_messages.append(message)
def callback(self, ch, method, properties, body):
    """
    Callback that handles a task message received from RabbitMQ.
    :param ch: the channel (same as self.channel)
    :param method: delivery metadata
    :param properties: properties sent by the requester (reply_to, correlation_id)
    :param body: the message body (the command to execute)
    :return:
    """
    before = time.monotonic()
    # run the command in a separate thread so a timeout can be enforced
    exec_cmd = threading.Thread(target=self.exec_call, args=(body,))
    exec_cmd.start()
    exec_cmd.join(self.timeout)
    after = time.monotonic()
    # if the command ran longer than the timeout, report a timeout instead of its output
    if (after - before) > self.timeout:
        self.response = bytes("command exec timeout", "utf8")
    print(" [*] Got a task {}".format(str(body, "utf8")))
    message = {"host": self.id, "data": self.response}
    ch.basic_publish(exchange="",
                     routing_key=properties.reply_to,
                     properties=pika.BasicProperties(
                         correlation_id=properties.correlation_id,),
                     body=bytes(str(message), "utf-8"))
    ch.basic_ack(delivery_tag=method.delivery_tag)
def message_worker(self):
    while 1:
        try:
            record, routing_key = self.queue.get()
            if not self.connection or self.connection.is_closed or not self.channel or self.channel.is_closed:
                self.open_connection()
            self.channel.basic_publish(
                exchange=self.exchange,
                routing_key=routing_key,
                body=self.format(record),
                properties=pika.BasicProperties(
                    delivery_mode=2
                )
            )
        except Exception:
            self.channel, self.connection = None, None
            self.handleError(record)
        finally:
            self.queue.task_done()
            if self.close_after_emit:
                self.close_connection()
def whois_push(**whois_recv_info):
    global channel_whois
    # print 'whois push:', whois_recv_info
    result = ''
    try:
        result = json.dumps(whois_recv_info)
    except UnicodeDecodeError:
        for key in whois_recv_info.keys():
            if type(whois_recv_info[key]) == str:
                whois_recv_info[key] = whois_recv_info[key].decode('latin-1').encode("utf-8")
        result = json.dumps(whois_recv_info)
    if result != '':
        channel_whois.basic_publish(
            exchange='',
            routing_key='whois_queue',
            body=json.dumps(result),
            properties=pika.BasicProperties(
                delivery_mode=2)
        )
def send_task(self):
    while True:
        if self.send_queue.empty() and self.handle_stoping:
            self.send_stop_evt.set()
            return
        if not self.send_queue.empty():
            callinfo = self.send_queue.get_nowait()
            # publish the RPC request, passing the reply queue via `reply_to`
            self._channel.basic_publish(exchange=self.Exchange,
                                        routing_key=self.Queue,
                                        properties=pika.BasicProperties(
                                            reply_to=self.callback_queue,
                                        ),
                                        body=callinfo.body)
        gevent.sleep(0)
def publish_submission_message(challenge_id, phase_id, submission_id):
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
    channel = connection.channel()
    channel.exchange_declare(exchange='evalai_submissions', type='topic')
    # though the worker creates the queue (queue creation is idempotent too),
    # declare it here again so that messages don't get missed;
    # later on we can apply a check on queue message length to raise an alert,
    # this way we will be notified whether the worker is up or not
    channel.queue_declare(queue='submission_task_queue', durable=True)
    message = {
        'challenge_id': challenge_id,
        'phase_id': phase_id,
        'submission_id': submission_id
    }
    channel.basic_publish(exchange='evalai_submissions',
                          routing_key='submission.*.*',
                          body=json.dumps(message),
                          properties=pika.BasicProperties(delivery_mode=2))  # make message persistent
    print(" [x] Sent %r" % message)
    connection.close()
def start_consuming(self):
    """Exchange, channel, consumer ready to start listening"""
    # send rpc request
    self.worker_id = None
    self.correlation_id = uuid.uuid4().hex
    self._channel.basic_publish(
        exchange=self.exchange,
        routing_key='%s.worker.%s' % (self.key, self.worker_type),
        properties=pika.BasicProperties(
            reply_to=self.queue,
            correlation_id=self.correlation_id,
            content_type='application/json',
        ),
        body=json.dumps(self.worker_kwargs),
    )
    log.info("%s: sent RPC request, will wait for response.", self.lbl)
    super(_HubTornadoConsumer, self).start_consuming()
def send_to_worker(self, action, msg=''):
    if not self.consumer.worker_id:
        raise Exception("Routing key not yet received in RPC response.")
    routing_key = '%s.%s' % (self.consumer.worker_id, action)
    if isinstance(msg, basestring):
        self.consumer._channel.basic_publish(exchange=self.exchange,
                                             routing_key=routing_key,
                                             body=msg)
    else:
        self.consumer._channel.basic_publish(
            exchange=self.exchange,
            routing_key=routing_key,
            properties=pika.BasicProperties(
                content_type='application/json',
            ),
            body=json.dumps(msg),
        )
def _publish(self, exchange_name, queue_name, body, priority, retry):
    try:
        self._channel.basic_publish(exchange=exchange_name,
                                    routing_key=queue_name,
                                    body=body,
                                    properties=pika.BasicProperties(
                                        delivery_mode=2,
                                        priority=priority
                                    ))
    except exceptions.ConnectionClosed as err:
        if retry <= 0:
            raise err
        self.open()
        self._publish(exchange_name, queue_name, body, priority, retry - 1)
def _send_signal(self, signal_name, args):
    # Send a signal on the exchange
    body = {'signal_name': signal_name, 'args': args}
    body = serializer.dumps(body)
    b = False
    while not b:
        try:
            self.channel.basic_publish(
                exchange='rebus_signals',
                routing_key='',
                body=body,
                properties=pika.BasicProperties(delivery_mode=2,))
            b = True
        except pika.exceptions.ConnectionClosed:
            log.info("Disconnected (in _send_signal). "
                     "Trying to reconnect...")
            self._reconnect()
            time.sleep(0.5)
    # TODO Check if the key is valid
def publish_message(queue_name, message):
    message_body = json.dumps(message)
    message_id = message['message_id']
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    channel.queue_declare(queue=queue_name, durable=True)
    channel.basic_publish(exchange='',
                          routing_key=queue_name,
                          body=message_body,
                          properties=pika.BasicProperties(
                              delivery_mode=2,  # make message persistent
                          ))
    log_msg = "Published : [queue_name={}] [comment_id={}] [username={}] [comment_body={}]".format(
        queue_name, message['comment_id'], message['username'], message['comment_body'])
    logger.log_info_message(message_id, LogUtilityConstants.message_published_event,
                            'sub_monitor', log_msg)
    connection.close()
def publish(queue_name, body='Hello World!', exchange=''):
    """Publish the content or message to the queue.

    pika.BasicProperties(delivery_mode=2) will make the message persistent.

    Args:
        queue_name: the queue's name
        body (str): the content to publish
        exchange:
    """
    channel = get_channel(queue_name)
    if channel:
        channel.basic_publish(exchange=exchange,
                              routing_key=queue_name,
                              body=body,
                              properties=pika.BasicProperties(delivery_mode=2))
        # logger.info('ramq publish queue_name: ' + str(queue_name) + ' ,body: \n' + str(body) + '\n')
        # print(" [x] Sent " + body)
def put_queue_list(self, queue_name=None, message_list=None):
    """Publish a list of messages to the queue"""
    if not queue_name:
        return None
    try:
        if not message_list:
            return None
        if isinstance(message_list, dict):
            message_list = [message_list]
        self.__connect()
        self.channel.queue_declare(queue=queue_name, durable=True)
        for message in message_list:
            message = json.dumps(message)
            self.channel.basic_publish(
                exchange='',
                routing_key=queue_name,
                body=message,
                properties=pika.BasicProperties(delivery_mode=2, ))
        self.connection.close()
    except Exception as e:
        print e
        return None
def call(self, submit_id, result_path, data_path, judge_path):
    rpc_body = encode(submit_id, result_path, data_path, judge_path)
    for i in range(5):
        try:
            app.logger.info("try!! %s %s %s %s" % (submit_id, result_path, data_path, judge_path))
            self.channel.basic_publish(exchange='',
                                       routing_key=self.ch,
                                       properties=pika.BasicProperties(
                                           delivery_mode=2,
                                       ),
                                       body=rpc_body)
            return
        except pika.exceptions.ConnectionClosed:
            self.connection = pika.BlockingConnection(pika.ConnectionParameters(
                host='localhost'))
            self.channel = self.connection.channel()
            self.channel.queue_declare(queue=self.ch, durable=True)
    app.logger.info("local!! %s %s %s %s" % (submit_id, result_path, data_path, judge_path))
    # convert to local judge. that's a sync way!
    from .sandbox_server import SandBoxService
    SandBoxService.local_exec(submit_id, result_path, data_path, judge_path)
def publish(self, payload):
    with rmq_pool.acquire() as cxn:
        try:
            cxn.channel.queue_declare(queue=QUEUE_NAME, auto_delete=True)
            cxn.channel.basic_publish(
                body=json.dumps(payload),
                exchange='',
                routing_key=QUEUE_NAME,
                properties=pika.BasicProperties(
                    content_type='plain/text'
                )
            )
            subscriber_id = payload['subscriber']['_id']
            logger.info(f"Queue.publish published: {subscriber_id}")
        except Exception as e:
            logger.error(f"Queue.publish exception: {e}")
def sendRegisterMessage(server, routingKeys):
    exchangeName = "qos.service"
    queueName = "heartbeatService"
    msgHeaders = {"__TypeId__": "com.tecomgroup.qos.communication.message.ServerStarted"}
    msgBody = {"originName": None, "serverName": ""}
    serverConfig = server.getConfigObject()
    errors = []
    mqConf = getMqConf(serverConfig['mq'], server.name, errors)
    # raise an exception only if all mq's are down, so message sending is impossible
    if mqConf is None:
        raise Exception("sendRegisterMessage error: " + str(errors))
    connection = pika.BlockingConnection(pika.URLParameters(mqConf['amqpUrl']))
    channel = connection.channel()
    channel.exchange_declare(exchange=exchangeName, exchange_type='topic', durable=True)
    channel.queue_declare(queue=queueName, durable=True, arguments={'x-message-ttl': 1800000})
    channel.queue_bind(queue=queueName, exchange=exchangeName, routing_key="server.agent.register")
    for key in routingKeys:
        channel.basic_publish(
            exchange=exchangeName,
            routing_key=key,
            properties=pika.BasicProperties(
                delivery_mode=2,  # make message persistent
                content_type='application/json',
                content_encoding='UTF-8',
                priority=0,
                expiration="86400000",
                headers=msgHeaders),
            body=json.dumps(msgBody).encode('UTF-8')
        )
    connection.close()
def send_message_to_queue(message):
    global corr_id
    global response
    global connection
    global channel
    global callback_queue
    response = None
    connection = pika.BlockingConnection(pika.ConnectionParameters(host="37.187.22.103",
                                                                   port=2765,
                                                                   heartbeat_interval=30))
    channel = connection.channel()
    result = channel.queue_declare(exclusive=True)
    callback_queue = result.method.queue
    channel.basic_consume(on_response, no_ack=True, queue=callback_queue)
    corr_id = str(uuid.uuid4())
    channel.basic_publish(
        exchange='',
        routing_key="rpc_queue",
        properties=pika.BasicProperties(
            reply_to=callback_queue,
            correlation_id=corr_id),
        body=message)
    print(" [x] Sent data to RabbitMQ")
    while response is None:
        connection.process_data_events()
    print(" [x] Get response from RabbitMQ")
    return str(response)
def send_message_to_queue(message):
    global corr_id
    global response
    global connection
    global channel
    global callback_queue
    response = None
    connection = pika.BlockingConnection(pika.ConnectionParameters(host=rabbitmq,
                                                                   port=int(rabbitmq_port),
                                                                   heartbeat_interval=30))
    channel = connection.channel()
    result = channel.queue_declare(exclusive=True)
    callback_queue = result.method.queue
    channel.basic_consume(on_response, no_ack=True, queue=callback_queue)
    corr_id = str(uuid.uuid4())
    channel.basic_publish(
        exchange='',
        routing_key="rpc_queue",
        properties=pika.BasicProperties(
            reply_to=callback_queue,
            correlation_id=corr_id),
        body=message)
    print(" [x] Sent data to RabbitMQ")
    while response is None:
        connection.process_data_events()
    print(" [x] Get response from RabbitMQ")
    print("response: " + str(response))
    return response
def on_request(self, ch, method, props, body):
    response = self.receive(body)
    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id=props.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)
def call(self, n):
    self.response = None
    self.corr_id = str(uuid.uuid4())
    self.channel.basic_publish(exchange='',
                               routing_key='rpc_queue',
                               properties=pika.BasicProperties(
                                   reply_to=self.callback_queue,
                                   correlation_id=self.corr_id,
                               ),
                               body=str(n))
    while self.response is None:
        self.connection.process_data_events()
    return int(self.response)
def on_request(ch, method, props, body):
    n = int(body)
    print(" [.] fib(%s)" % n)
    response = fib(n)
    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id=props.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)
def request(ch, method, props, body):
    print(" [.] increase(%s)" % (body,))
    response = increase(int(body))
    # echo the request's correlation_id back in the reply properties
    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id=props.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)
def request(self, n):
    self.response = None
    # publish the request, telling the server which queue to reply to
    self.channel.basic_publish(exchange='',
                               routing_key='compute_queue',
                               properties=pika.BasicProperties(
                                   reply_to=self.callback_queue,
                               ),
                               body=str(n))
    # wait for the response
    while self.response is None:
        self.connection.process_data_events()
    return int(self.response)
def publish_message(self):
    """If the class is not stopping, publish a message to RabbitMQ,
    appending a list of deliveries with the message number that was sent.
    This list will be used to check for delivery confirmations in the
    on_delivery_confirmations method.

    Once the message has been sent, schedule another message to be sent.
    The main reason I put scheduling in was just so you can get a good idea
    of how the process is flowing by slowing down and speeding up the
    delivery intervals by changing the PUBLISH_INTERVAL constant in the class.
    """
    if self._stopping:
        return

    # check that the acquisition service is running ...
    # if not self._winservice.isRunning():
    #     LOGGER.info('Win Service is not running...')
    #     print 'Win Service is not running...'
    #     return

    message = self.get_message_from_selected_data()
    # print "***************************************"
    # print json.dumps(message, ensure_ascii=False)
    # print "***************************************"
    properties = pika.BasicProperties(app_id='myrabbit_py-publisher',
                                      content_type='application/json')
    self._channel.basic_publish(self.EXCHANGE, self.ROUTING_KEY,
                                json.dumps(message, ensure_ascii=False),
                                properties)
    self._message_number += 1
    self._deliveries.append(self._message_number)
    logger.info('Published message # %i', self._message_number)
    self.schedule_next_message()
def send(self, body, **kw):
    return self.channel.basic_publish(self.exchange,
                                      self.routing_key,
                                      json.dumps(body),
                                      properties=pika.BasicProperties(
                                          delivery_mode=2,  # make message persistent
                                      ),
                                      **kw)
def push_message(self, msg):
    self._check_connection()
    if self.channel.basic_publish(exchange=self.exchange,
                                  routing_key=self.exchange + '-' + self.queue_name,
                                  body=msg,
                                  properties=pika.BasicProperties(
                                      delivery_mode=2)):
        print('message sent')
    else:
        print('ERROR: message failed to send')
def enqueue_flight_snippet(flight_snippet):
    """Add items from the flight_dictionary to the rabbitmq queue as json strings"""
    channel.basic_publish(exchange='',
                          routing_key=queue_name,
                          body=flight_snippet,
                          properties=pika.BasicProperties(delivery_mode=2)  # make message persistent
                          )

# this function reads a dictionary of a flight snapshot and returns a different and easier to work with dictionary
def produce(self, message):
    """Publish a message to add inside the queue.

    Args:
        message: object to add inside the queue.
    """
    self.channel.basic_publish(exchange='',
                               routing_key='dazzar_jobs',
                               body=pickle.dumps(message),
                               properties=pika.BasicProperties(
                                   delivery_mode=2,  # make message persistent
                               ))
def send(self, n, routing):
    self.channel.basic_publish(exchange='',
                               routing_key=routing,
                               properties=pika.BasicProperties(
                                   delivery_mode=2,),
                               body=json.dumps(n))
def start(self, cmd, routing_key="remote.call"):
    self.response = []  # collect all responses in a list
    self.correlation_id = str(uuid.uuid4())
    self.log.info("exec command {}".format(cmd))
    self.log.debug("routing key {}".format(routing_key))
    # only consumers bound with this routing key will receive the command
    self.channel.basic_publish(exchange=self.exchange,
                               routing_key=routing_key,
                               properties=pika.BasicProperties(
                                   reply_to=self.queue_name,
                                   correlation_id=self.correlation_id
                               ),
                               body=cmd)
    before = time.monotonic()  # record the start time
    after_len = 0  # number of responses seen so far
    while True:
        if len(self.response) != after_len:
            # new responses arrived; remember how many we have now
            before_len = len(self.response)
        else:
            before_len = after_len
        # sleep briefly and process pending events; if no new response arrives
        # in that window, assume every reply has been received
        time.sleep(0.4)
        self.connection.process_data_events()
        if len(self.response) == before_len and before_len:
            break
        after = time.monotonic()
        # stop waiting once the overall timeout is exceeded
        if (after - before) > self.timeout:
            break
    return self.response  # return all collected responses
def call(self, n):
    self.response = None
    self.corr_id = str(uuid.uuid4())
    self.channel.basic_publish(exchange="",
                               routing_key="rpc_queue",
                               properties=pika.BasicProperties(
                                   reply_to=self.callback_queue,
                                   correlation_id=self.corr_id,
                               ),
                               body=str(n))
    while self.response is None:
        self.connection.process_data_events()
    return int(self.response)