获取Azure Monitor的特定警报,使用Python。

huangapple go评论64阅读模式
英文:

Get specific alert from Azure Monitor using Python

问题

Sure, here's the translation of the non-code parts:

我想要使用Python(通过Azure函数)从Azure Monitor获取特定的警报。
Azure Monitor将触发每个事件的Azure函数。

目前,我正在使用azure.mgmt.alertsmanagement.operations模块中的get_all,这允许我获取所有警报。
我已经测试过get_by_id,但我被迫指定警报ID,而我希望能自动获取它。

请查看以下由Azure Monitor使用get_all发送的JSON:

正如您所看到的,我正在通过if alert.name == "alert_rule_name" 进行过滤,但这不是我正在寻找的(我得到的是事件列表)。

是否有一种方法可以在Azure Monitor调用我的函数时从负载中获取警报ID?
这是为了使用此ID获取特定的警报(事件)。

提前感谢您的帮助。

英文:

I want to get a specific alert from Azure Monitor using python (through an Azure function).
The Azure Monitor will triggered the az function for each Event.

Currently I'm using get_all from azure.mgmt.alertsmanagement.operations module, this allows me to get all alerts.
Also already tested get_by_id but I was obliged to specify the alert_id while I'm looking to get it automatically.

import logging
import urllib3
import os
import json
import requests
from azure.identity import ClientSecretCredential
from azure.mgmt.alertsmanagement import AlertsManagementClient


subscription_id =""
client_id =""
client_secret =""
tenant_id = ""

credential = ClientSecretCredential(
      tenant_id=tenant_id, 
      client_id=client_id, 
      client_secret=client_secret
)

print("===Auth Azure Monitor===")
client = AlertsManagementClient(
    credential,
    subscription_id
)

print("=== Get alert event from Az Monitor & Post it to monitoring platform === ")
headers = {'Authorization': f'authtoken {token}'}

for alert in client.alerts.get_all():
    if alert.name == "alert_rule_name" :
        attributes = {'CLASS': 'EVENT',
                      'severity': 'CRITICAL',
                      'msg': alert.name,
                      'lastModifiedDateTime': json.dumps(alert.properties.essentials.last_modified_date_time, indent=4, sort_keys=True, default=str)
                      }
        payload = [{'eventSourceHostName': alert.properties.essentials.target_resource_name, 'attributes': attributes}]
        print("JSON_PAYLOAD :", payload)
## Some code here to push the Alert to a monitoring platform ..

Please, find below the json sent by Azure Monitor with get_all :

{'value': [{'properties': {'essentials': {
'severity': 'Sev2', 
'signalType': 'Metric', 
'alertState': 'New', 
'monitorCondition': 'Fired', 
'monitorService': 'Platform', 
'targetResource': '/subscriptions/sub_id/resourcegroups/rsg_name/providers/microsoft.compute/virtualmachines/vm_name', 
'targetResourceName': 'vm_name', 
'targetResourceGroup': 'rsg_name', 
'targetResourceType': 'virtualmachines', 
'sourceCreatedId': '5f33r_rsg_name_microsoft.insights_metricAlerts_alert_rule_name-1899618006', 
'alertRule': '/subscriptions/sub_id/resourceGroups/rsg_name/providers/microsoft.insights/metricAlerts/alert_rule_name', 
'startDateTime': '2023-05-09T13:32:28.1880147Z', 
'lastModifiedDateTime': '2023-05-09T13:32:28.1880147Z', 
'lastModifiedUserName': 'System', 
'actionStatus': {'isSuppressed': False}, 'description': ''}
}, 
'id': '/subscriptions/sub_id/providers/Microsoft.AlertsManagement/alerts/2222-5555-88888', 
'type': 'Microsoft.AlertsManagement/alerts', 
'name': 'alert_rule_name'}, 

As you see, I'm filtering by [if alert.name == "alert_rule_name"] and this is not what I'm looking for (I got a list of Events).

Is there a way to get the alert ID from the payload when Azure Monitor call my function ?
This is to use this ID to get a specific alert (event).

Thanks in advance

答案1

得分: 1

以下是您要翻译的内容:

Is there a way to get the alert ID from the payload when Azure Monitor

您可以使用以下代码来使用Python获取负载中的警报ID。

您需要在属性中添加 alert.id 来获取您特定警报的警报ID。

代码:

import os
import json
import requests
from azure.identity import DefaultAzureCredential
from azure.mgmt.alertsmanagement import AlertsManagementClient


subscription_id = "your subscription id"
client_id = ""
client_secret = ""
tenant_id = ""

credential = ClientSecretCredential(
      tenant_id=tenant_id, 
      client_id=client_id, 
      client_secret=client_secret
)


print("===Auth Azure Monitor===")
client = AlertsManagementClient(
    credential,
    subscription_id
)
print("=== Get alert event from Az Monitor & Post it to monitoring platform === ")

for alert in client.alerts.get_all():
    if alert.name == "Backup Failure" :
        attributes = {'CLASS': 'EVENT',
                      'severity': 'CRITICAL',
                      'msg': alert.name,
                      'id': alert.id,
                      'lastModifiedDateTime': json.dumps(alert.properties.essentials.last_modified_date_time, indent=4, sort_keys=True, default=str)
                      }

        payload = [{'eventSourceHostName': alert.properties.essentials.target_resource_name,'attributes': attributes}]
        print("JSON_PAYLOAD :", payload)

输出:

===Auth Azure Monitor===
=== Get alert event from Az Monitor & Post it to monitoring platform ===
JSON_PAYLOAD : [{'eventSourceHostName': 'mm-automation-runas-account-2', 'attributes': {'CLASS': 'EVENT', 'severity': 'CRITICAL', 'msg': 'aa-test-1', 'id': '/subscriptions/bxxxxf/resourcegroups/management_migration-resources/providers/microsoft.automation/automationaccounts/mm-automation-runas-account-2/providers/Microsoft.AlertsManagement/alerts/3f481155-b808-a188-6exxxxxx', 'lastModifiedDateTime': '"2023-06-14 05:35:14.747028+00:00"'}}]
JSON_PAYLOAD : [{'eventSourceHostName': 'mm-automation-runas-account-2', 'attributes': {'CLASS': 'EVENT', 'severity': 'CRITICAL', 'msg': 'aa-test-1', 'id': '/subscriptions/bxxxxxf/resourcegroups/management_migration-resources/providers/microsoft.automation/automationaccounts/mm-automation-runas-account-2/providers/Microsoft.AlertsManagement/alerts/8cba3e70-c957-4xxxxxxxx', 'lastModifiedDateTime': '"2023-06-13 12:35:13.840749+00:00"'}}]

获取Azure Monitor的特定警报,使用Python。

英文:

> Is there a way to get the alert ID from the payload when Azure Monitor

You can use the below code to get an Alert id with payload using python.

You need to add alert.id in your attributes to get the alert id of your specific alert.

Code:

import os
import json
import requests
from azure.identity import DefaultAzureCredential
from azure.mgmt.alertsmanagement import AlertsManagementClient


subscription_id ="your subscription id"
client_id =""
client_secret =""
tenant_id = ""

credential = ClientSecretCredential(
      tenant_id=tenant_id, 
      client_id=client_id, 
      client_secret=client_secret
)


print("===Auth Azure Monitor===")
client = AlertsManagementClient(
    credential,
    subscription_id
)
print("=== Get alert event from Az Monitor & Post it to monitoring platform === ")

for alert in client.alerts.get_all():
    if alert.name == "Backup Failure" :
        attributes = {'CLASS': 'EVENT',
                      'severity': 'CRITICAL',
                      'msg': alert.name,
                      'id': alert.id,
                      'lastModifiedDateTime': json.dumps(alert.properties.essentials.last_modified_date_time, indent=4, sort_keys=True, default=str)
                      }
        
        payload = [{'eventSourceHostName': alert.properties.essentials.target_resource_name,'attributes': attributes}]
        print("JSON_PAYLOAD :", payload)

Output:

===Auth Azure Monitor===
=== Get alert event from Az Monitor & Post it to monitoring platform ===
JSON_PAYLOAD : [{'eventSourceHostName': 'mm-automation-runas-account-2', 'attributes': {'CLASS': 'EVENT', 'severity': 'CRITICAL', 'msg': 'aa-test-1', 'id': '/subscriptions/bxxxxf/resourcegroups/management_migration-resources/providers/microsoft.automation/automationaccounts/mm-automation-runas-account-2/providers/Microsoft.AlertsManagement/alerts/3f481155-b808-a188-6exxxxxx', 'lastModifiedDateTime': '"2023-06-14 05:35:14.747028+00:00"'}}]
JSON_PAYLOAD : [{'eventSourceHostName': 'mm-automation-runas-account-2', 'attributes': {'CLASS': 'EVENT', 'severity': 'CRITICAL', 'msg': 'aa-test-1', 'id': '/subscriptions/bxxxxxf/resourcegroups/management_migration-resources/providers/microsoft.automation/automationaccounts/mm-automation-runas-account-2/providers/Microsoft.AlertsManagement/alerts/8cba3e70-c957-4xxxxxxxx', 'lastModifiedDateTime': '"2023-06-13 12:35:13.840749+00:00"'}}]

获取Azure Monitor的特定警报,使用Python。

答案2

得分: 0

Azure Monitor触发以下Azure函数,仅解析一个事件并将其转发到另一个目的地,以通知支持团队:

import azure.functions as func
import os
import json
import requests
import urllib3
import logging

def main(req: func.HttpRequest) -> func.HttpResponse:
    logging.info("===== Auth Monitoring Platform =====")
    urllib3.disable_warnings()
    creds = {'username': "user", 'password': "****", 'tenantName': '*'}
    logging.debug(f'Retrieving authorization token')
    resp = requests.post(f'https://{api_url}', json=creds, verify=False)
    if resp.status_code != 200:
            logging.error(f'{resp.headers}')
            logging.error(f'{resp.text}')
            exit(1)
            
    token = resp.json()["response"]["authToken"]

    logging.info("===== Get Alert JSON & Prepare Post to Monitor Platform =====")

    req_body = req.get_json()
    
    msg_detail = {'firedDateTime' : req_body['data']['essentials']['firedDateTime'], 
                   'operator':       req_body['data']['alertContext']['condition']['allOf'][0]['operator'],
                   'threshold':      req_body['data']['alertContext']['condition']['allOf'][0]['threshold'],
                   'metricValue':    req_body['data']['alertContext']['condition']['allOf'][0]['metricValue']
                   }
    headers = {'Authorization': f'authtoken {token}'}
    attributes = {  'CLASS':           'EVENT',
                    'severity':        'CRITICAL',
                    'msg':             req_body['data']['essentials']['alertRule'],
                    'msg_detail':     mc_long_msg,
                    'object':       req_body['data']['essentials']['configurationItems'][0],
                    'object_class': req_body['data']['alertContext']['condition']['allOf'][0]['metricNamespace'],
                    'mc_parameter':    req_body['data']['alertContext']['condition']['allOf'][0]['metricName']
                 }
    
    payload = [{'eventSource': req_body['data']['essentials']['configurationItems'][0], 'attributes': attributes}]
    params = {'param1':"value1" , 'param2':"value2"}
    logging.info(f'Event header: {headers}')
    logging.info(f'Event payload: {payload}')     
    
    response = requests.post(f'https://{api_url}',
                       headers=headers,
                       params=params,
                       json=payload,
                       verify=False
                )
    if response.status_code != 200:
        logging.error(f'{response.headers}')
        logging.error(f'{response.text}')
        exit(1)
    
    logging.debug(f'{response.headers}')
    logging.debug(f'{response.text}')
    logging.info(f'Payload: {payload}')
    
    return func.HttpResponse(
        "This HTTP triggered function executed successfully.",
        status_code=200
    )
英文:

Azure Monitor trigger the below Azure Function which parse only one Event and forward it to another destination in order to notify the support team :

import azure.functions as func
import os
import json
import requests
import urllib3
import logging
def main(req: func.HttpRequest) -> func.HttpResponse:
logging.info("===== Auth Monitoring Platform =====")
urllib3.disable_warnings()
creds = {'username': "user", 'password': "****", 'tenantName': '*'}
logging.debug(f'Retrieving authorization token')
resp = requests.post(f'https://{api_url}', json=creds, verify=False)
if resp.status_code != 200:
logging.error(f'{resp.headers}')
logging.error(f'{resp.text}')
exit(1)
token = resp.json()["response"]["authToken"]
logging.info("===== Get Alert JSON & Prepare Post to Monitor Platform =====")
req_body = req.get_json()
msg_detail = {'firedDateTime' : req_body['data']['essentials']['firedDateTime'], 
'operator':       req_body['data']['alertContext']['condition']['allOf'][0]['operator'],
'threshold':      req_body['data']['alertContext']['condition']['allOf'][0]['threshold'],
'metricValue':    req_body['data']['alertContext']['condition']['allOf'][0]['metricValue']
}
headers = {'Authorization': f'authtoken {token}'}
attributes = {  'CLASS':           'EVENT',
'severity':        'CRITICAL',
'msg':             req_body['data']['essentials']['alertRule'],
'msg_detail':     mc_long_msg,
'object':       req_body['data']['essentials']['configurationItems'][0],
'object_class': req_body['data']['alertContext']['condition']['allOf'][0]['metricNamespace'],
'mc_parameter':    req_body['data']['alertContext']['condition']['allOf'][0]['metricName']
}
payload = [{'eventSource': req_body['data']['essentials']['configurationItems'][0], 'attributes': attributes}]
params = {'param1':"value1" , 'param2':"value2"}
logging.info(f'Event header: {headers}')
logging.info(f'Event payload: {payload}')     
response = requests.post(f'https://{api_url}',
headers=headers,
params=params,
json=payload,
verify=False
)
if response.status_code != 200:
logging.error(f'{response.headers}')
logging.error(f'{response.text}')
exit(1)
logging.debug(f'{response.headers}')
logging.debug(f'{response.text}')
logging.info(f'Payload: {payload}')
return func.HttpResponse(
"This HTTP triggered function executed successfully.",
status_code=200
)

huangapple
  • 本文由 发表于 2023年6月9日 14:03:02
  • 转载请务必保留本文链接:https://go.coder-hub.com/76437598.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定