英文:
Not able to push the data into Azure event hub
问题
我正在尝试将数据样本数据推送到Azure事件中心,但无法成功。
import azure.functions as func
from azure.eventhub import EventData
from azure.eventhub.aio import EventHubProducerClient
from azure.identity.aio import DefaultAzureCredential
def main(myblob: func.InputStream):
logging.info(f"Python blob trigger function processed blob \n" f"Name: {myblob.name}\n" f"Blob Size: {myblob.length} bytes")
event_hub_connection_string = "Endpoint=sb://event-hub-namespace/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=sharedaccesskey"
event_hub_name = "event_hub_name"
producer = EventHubProducerClient.from_connection_string(event_hub_connection_string, eventhub_name=event_hub_name)
event_data = EventData(b'Hello, Event Hub!')
with producer:
producer.send_batch(event_data)
我收到以下错误,并不确定是否传递了正确的连接字符串。
Result: Failure Exception: RuntimeError: There is no current event loop in thread 'ThreadPoolExecutor-1_0'. Stack: File "/azure-functions-host/workers/python/3.9/LINUX/X64/azure_functions_worker/dispatcher.py", line 479, in _handle__invocation_request call_result = await self._loop.run_in_executor( File "/usr/local/lib/python3.9/concurrent/futures/thread.py", line 58, in run result = self.fn(*self.args, **self.kwargs) File "/azure-functions-host/workers/python/3.9/LINUX/X64/azure_functions_worker/dispatcher.py", line 752, in _run_sync_func return ExtensionManager.get_sync_invocation_wrapper(context, File "/azure-functions-host/workers/python/3.9/LINUX/X64/azure_functions_worker/extension.py", line 215, in _raw_invocation_wrapper result = function(**args) File "/home/site/wwwroot/event-hub-test/__init__.py", line 15, in main producer = EventHubProducerClient.from_connection_string(event_hub_connection_string, eventhub_name=event_hub_name) File "/home/site/wwwroot/.python_packages/lib/site-packages/azure/eventhub/aio/_producer_client_async.py", line 517, in from_connection_string return cls(**constructor_args) File "/home/site/wwwroot/.python_packages/lib/site-packages/azure/eventhub/aio/_producer_client_async.py", line 181, in __init__ ALL_PARTITIONS: self._create_producer() File "/home/site/wwwroot/.python_packages/lib/site-packages/azure/eventhub/aio/_producer_client_async.py", line 354, in _create_producer handler = EventHubProducer( # type: ignore File "/home/site/wwwroot/.python_packages/lib/site-packages/azure/eventhub/aio/_producer_async.py", line 110, in __init__ self._lock = asyncio.Lock(**self._internal_kwargs) File "/usr/local/lib/python3.9/asyncio/locks.py", line 81, in __init__ self._loop = events.get_event_loop() File "/usr/local/lib/python3.9/asyncio/events.py", line 642, in get_event_loop raise RuntimeError('There is no current event loop in thread %r.'
英文:
I am trying to push the data sample data into the Azure event hub but I am not able to do so.
import azure.functions as func
from azure.eventhub import EventData
from azure.eventhub.aio import EventHubProducerClient
from azure.identity.aio import DefaultAzureCredential
def main(myblob: func.InputStream):
logging.info(f"Python blob trigger function processed blob \n" f"Name: {myblob.name}\n" f"Blob Size: {myblob.length} bytes")
event_hub_connection_string = "Endpoint=sb://event-hub-namespace/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=sharedaccesskey"
event_hub_name = "event_hub_name"
producer = EventHubProducerClient.from_connection_string(event_hub_connection_string, eventhub_name=event_hub_name)
# event_data = "this is the first message"
event_data = EventData(b'Hello, Event Hub!')
with producer:
producer.send_batch(event_data)
I am getting the below error and I am not sure If I am passing the correct connection string as well.
Result: Failure Exception: RuntimeError: There is no current event loop in thread 'ThreadPoolExecutor-1_0'. Stack: File "/azure-functions-host/workers/python/3.9/LINUX/X64/azure_functions_worker/dispatcher.py", line 479, in _handle__invocation_request call_result = await self._loop.run_in_executor( File "/usr/local/lib/python3.9/concurrent/futures/thread.py", line 58, in run result = self.fn(*self.args, **self.kwargs) File "/azure-functions-host/workers/python/3.9/LINUX/X64/azure_functions_worker/dispatcher.py", line 752, in _run_sync_func return ExtensionManager.get_sync_invocation_wrapper(context, File "/azure-functions-host/workers/python/3.9/LINUX/X64/azure_functions_worker/extension.py", line 215, in _raw_invocation_wrapper result = function(**args) File "/home/site/wwwroot/event-hub-test/__init__.py", line 15, in main producer = EventHubProducerClient.from_connection_string(event_hub_connection_string, eventhub_name=event_hub_name) File "/home/site/wwwroot/.python_packages/lib/site-packages/azure/eventhub/aio/_producer_client_async.py", line 517, in from_connection_string return cls(**constructor_args) File "/home/site/wwwroot/.python_packages/lib/site-packages/azure/eventhub/aio/_producer_client_async.py", line 181, in __init__ ALL_PARTITIONS: self._create_producer() File "/home/site/wwwroot/.python_packages/lib/site-packages/azure/eventhub/aio/_producer_client_async.py", line 354, in _create_producer handler = EventHubProducer( # type: ignore File "/home/site/wwwroot/.python_packages/lib/site-packages/azure/eventhub/aio/_producer_async.py", line 110, in __init__ self._lock = asyncio.Lock(**self._internal_kwargs) File "/usr/local/lib/python3.9/asyncio/locks.py", line 81, in __init__ self._loop = events.get_event_loop() File "/usr/local/lib/python3.9/asyncio/events.py", line 642, in get_event_loop raise RuntimeError('There is no current event loop in thread %r.'
答案1
得分: 1
这是因为导入模块时引入了异步版本的客户端,而 Azure 函数是以同步方式运行的。
请进行以下更改,然后尝试重新运行该函数:
import azure.functions as func
from azure.eventhub.aio import EventHubProducerClient # 引入异步版本的 EventHubProducerClient
from azure.eventhub import EventData
from azure.identity import DefaultAzureCredential
# ... (其余代码不变)
您可以在我们的存储库中找到更多示例 这里。
英文:
This is happening because the imports are bringing in the async version of the clients and the azure function is running in sync.
Please make the following changes and try running the function again
import azure.functions as func
from azure.eventhub import EventData
from azure.eventhub import EventHubProducerClient
from azure.identity import DefaultAzureCredential
def main(myblob: func.InputStream):
logging.info(f"Python blob trigger function processed blob \n" f"Name: {myblob.name}\n" f"Blob Size: {myblob.length} bytes")
event_hub_connection_string = "Endpoint=sb://event-hub-namespace/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=sharedaccesskey"
event_hub_name = "event_hub_name"
producer = EventHubProducerClient.from_connection_string(event_hub_connection_string, eventhub_name=event_hub_name)
# event_data = "this is the first message"
event_data = EventData(b'Hello, Event Hub!')
with producer:
producer.send_batch(event_data)
you can find more samples in our repo here
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论