在尝试了一些失败的实验后,我试图使用python和其余API将消息发送到Azure Event Hub,我发现了有效的代码(请参阅下文),但是我希望能够选择将事件发送到哪个分区。
是否可以使用rest API,如果应该,应该怎么做?
#!/user/bin/python import json from datetime import datetime from multiprocessing import Pool # from azure.servicebus import _service_bus_error_handler from azure.servicebus.servicebusservice import ServiceBusService, ServiceBusSASAuthentication from azure.http import ( HTTPRequest, HTTPError ) from azure.http.httpclient import _HTTPClient EVENT_HUB_HOST = "mysecrethub.servicebus.windows.net" EVENT_HUB_NAME = "secerthub-name" KEYNAME = "senderkey" # needs to be loaded from ENV KEYVALUE = "keyvalue" # needs to be loaded from ENV EXTRA_HEADERS = [] NUM_OF_PARTITIONS = 16 class EventHubClient(object): def __init__(self, host, hubname, keyname, keyvalue): self._host = host self._hub = hubname self._keyname = keyname self._key = keyvalue def sendMessage(self, body, partition=None, additional_headers=None): eventHubHost = self._host httpclient = _HTTPClient(service_instance=self) sasKeyName = self._keyname sasKeyValue = self._key authentication = ServiceBusSASAuthentication(sasKeyName, sasKeyValue) request = HTTPRequest() request.method = "POST" request.host = eventHubHost request.protocol_override = "https" request.path = "/%s/messages?api-version=2014-01" % (self._hub) request.body = body request.headers.append(('Content-Type', 'application/atom+xml;type=entry;charset=utf-8')) if additional_headers is not None: for item in additional_headers: request.headers.append(item) if partition is not None: value = json.dumps({'PartitionKey': partition}) request.headers.append(('BrokerProperties', value)) authentication.sign_request(request, httpclient) request.headers.append(('Content-Length', str(len(request.body)))) status = 0 try: resp = httpclient.perform_request(request) status = resp.status except HTTPError as ex: status = ex.status # print request.headers return status def prepare_message(appid, sessionid, partitionKey=None, SessionEllapsed=None, DeviceOs=None): message = {"Name": "MonitorEvent"} Attributes = {"AppId": appid, "SessionStarted": "".join(str(datetime.now())[:-3])} if SessionEllapsed is not None: Attributes['SessionEllapsed'] = SessionEllapsed if DeviceOs is not None: Attributes['DeviceOs'] = DeviceOs if partitionKey is not None: message["PartitionKey"] = str(partitionKey) message["PartitionId"] = str(partitionKey) Attributes['ItemId'] = partitionKey message['Attributes'] = Attributes return json.dumps(message) def send_monitoring_event(partition): hubClient = EventHubClient(EVENT_HUB_HOST, EVENT_HUB_NAME, KEYNAME, KEYVALUE) appid = 1 sendertime = datetime.now().strftime('%Y%M%d-%H%M%S') message = prepare_message(appid, sendertime, partitionKey=partition, SessionEllapsed=1, DeviceOs='Monitor' + str(partition)) # print message hubStatus = hubClient.sendMessage(message, partition=None, additional_headers=EXTRA_HEADERS) # return the HTTP status to the caller return hubStatus def main(): pool = Pool(processes=NUM_OF_PARTITIONS) print pool.map(send_monitoring_event, range(NUM_OF_PARTITIONS)) if __name__ == '__main__': main()
在“事件中心REST API”文档“发送事件”部分https://msdn.microsoft.com/zh- cn/library/azure/dn790664.aspx中,您不能使用请求URI https:// {serviceNamespace} .servicebus.windows.net / {eventHubPath} / messages以选择将事件发送到哪个分区。
您应该使用请求URI https:// {serviceNamespace} .servicebus.windows.net / {eventHubPath} / publishers / {deviceId} / messages。属性{deviceId}是用于对设备进行分组/分区的分区键-无论是地理位置,设备类型,版本,租户等。
但是分区数必须是2到32之间的数字。因此,如果需要使用32个以上的分区,建议将密钥放入事件数据中。
最好的祝福。