dataflow python - KeyError even though no input to the step and the key exists for matched cases

Question

I'm writing a data pipeline that extracts four types of JSON messages, each with a different attribute structure, and writes them to BigQuery.

My pipeline:

data = (pipeline
        | "Read from Pub/Sub" >> io.ReadFromPubSub(subscription=known_args.input_subscription)
        )

# first branch - push message to Datastore
p1 = (data
      | "Add publish time" >> ParDo(AddTimestamp())
      | "Create Entity" >> Map(EntityWrapper(known_args.kind).make_entity)
      | "Write to Datastore" >> WriteToDatastore(DATASTORE_PROJECT)
      )

# second branch - write data to Bigquery
p2 = (data
      | "classify request depending on type" >> ParDo(ClassifyReq()).with_outputs('label', 'ads', 'website', 'post')
      )

for type in ['label', 'ads', 'website', 'post']:
    result = (p2[type]
              | "extract req: " + type >> ParDo(ExtractReq(type))  # step label name must be unique
              | "Write to Bigquery: " + type >> io.WriteToBigQuery(
                  known_args.output_table,
                  schema=known_args.output_schema,
                  write_disposition=io.BigQueryDisposition.WRITE_APPEND,
              )
              )

My functions:

import json

from apache_beam import DoFn
from apache_beam.pvalue import TaggedOutput


class ClassifyReq(DoFn):
    """
    A transform that classifies the type of a Pub/Sub message and outputs the element with a tag.
    """
    def process(self, element):
        type = ''
        req_str = str(element)
        req = json.loads(element.decode('utf-8'))

        # Classification is based on substrings of the raw message, not on parsed keys
        if 'inbox_labels' in req_str:
            type = 'label'
        elif 'ad_id' in req_str:
            type = 'ads'
        elif ('postback' in req_str or 'referral' in req_str) and 'ref' in req_str:
            type = 'website'
        elif ('feed' in req_str) and ('comment' in req_str):  # only comment, no reaction_type
            data = req['entry'][0]['changes'][0]['value']
            from_id = data['from']['id']
            if from_id not in page_data.keys():  # only update not from page admin
                type = 'post'

        yield TaggedOutput(type, element)

class ExtractReq(DoFn):
    """
    A transform that extracts an element and maps its data to the Bigquery table
    depending on the tag given by the prior step.
    """
    def __init__(self, type):
        self.type = type

    def process(self, element):
        req = json.loads(element.decode("utf-8"))
        page_id = req['entry'][0]['id']

        if self.type == 'label':
            unix_datetime = req['entry'][0]['time']
            data = req['entry'][0]['changes'][0]['value']
            customer_id = data['user']['id']
            label = data['label']['page_label_name']
            label_action = data['action']
            row = {'page_id': page_id,
                   'type': self.type,
                   'customer_id': customer_id,
                   'unix_datetime': unix_datetime,
                   'ad_data': None,
                   'ad_type': None,
                   'ad_title': None,
                   'post_id': None,
                   'post_link': None,
                   'label': label,
                   'action': label_action
                   }

        elif self.type == 'ads':
            data = req['entry'][0]['messaging'][0]
            # timestamp has 13 digits so cut out the millisecond unit
            unix_datetime = int(str(req['entry'][0]['time'])[:-3])
            if isinstance(data, str):
                data = json.loads(data)  # was json.loads(str), which passes the built-in type
            customer_id = data['sender']['id']
            ad_id = data['referral']['ad_id']
            ad_title = data['referral']['ads_context_data']['ad_title']
            ad_type = data['referral']['type']
            post_id = data['referral']['ads_context_data']['post_id']
            row = {'page_id': page_id,
                   'type': self.type,
                   'customer_id': customer_id,
                   'unix_datetime': unix_datetime,
                   'ad_data': ad_id,
                   'ad_type': ad_type,
                   'ad_title': ad_title,
                   'post_id': post_id,
                   'post_link': None,
                   'label': None,
                   'action': None
                   }

        elif self.type == 'website':
            data = req['entry'][0]['messaging'][0]
            # timestamp has 13 digits so cut out the millisecond unit
            unix_datetime = int(str(req['entry'][0]['time'])[:-3])
            if isinstance(data, str):
                data = json.loads(data)  # was json.loads(str), which passes the built-in type
            customer_id = data['sender']['id']
            ref = data['referral']['ref']
            row = {'page_id': page_id,
                   'type': self.type,
                   'customer_id': customer_id,
                   'unix_datetime': unix_datetime,
                   'ad_data': ref,
                   'ad_type': None,
                   'ad_title': None,
                   'post_id': None,
                   'post_link': None,
                   'label': None,
                   'action': None
                   }

        elif self.type == 'post':
            data = req['entry'][0]['changes'][0]['value']
            from_id = data['from']['id']
            link = data['post']['permalink_url']
            post_id = data['post_id']
            unix_datetime = data['created_time']
            verb = data['verb']
            row = {'page_id': page_id,
                   'type': self.type,
                   'customer_id': from_id,
                   'unix_datetime': unix_datetime,
                   'ad_data': None,
                   'ad_type': None,
                   'ad_title': None,
                   'post_id': post_id,
                   'post_link': link,
                   'label': None,
                   'action': verb
                   }

        return [row]

The errors mostly occur with the types 'website' and 'post'. For example:

KeyError: "post [while running 'extract req: post-ptransform-217']"

KeyError: "referral [while running 'extract req: website-ptransform-228']"

I manually tested some requests of each type and they worked fine, so I believe the key really exists for each type. Moreover, I've checked that sometimes the first step produces no 'website'-tagged output, yet the step 'extract req: website' still raises the error. I also tried checking if isinstance(data, str): and calling json.loads(data) in that case, but that didn't help.
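
The symptom can be reproduced outside Dataflow. The sketch below reuses the substring test from the 'website' branch of ClassifyReq on a made-up payload (the payload contents are an assumption for illustration, not a real webhook request). It shows that the substring check can pass while the parsed message has no 'referral' key, which is the condition under which data['referral'] raises KeyError.

```python
import json


def looks_like_website(raw: bytes) -> bool:
    """Same substring test as the 'website' branch of ClassifyReq."""
    req_str = str(raw)
    return ('postback' in req_str or 'referral' in req_str) and 'ref' in req_str


# Hypothetical payload: a postback whose text contains "ref",
# but with no 'referral' object anywhere in the message.
payload = json.dumps({
    "entry": [{
        "id": "1",
        "time": 1677214629000,
        "messaging": [{
            "sender": {"id": "42"},
            "postback": {"payload": "refund_request"},
        }],
    }]
}).encode("utf-8")

# The substring test classifies this message as 'website' ...
classified = looks_like_website(payload)

# ... but the key that ExtractReq reads is not in the parsed message.
data = json.loads(payload.decode("utf-8"))["entry"][0]["messaging"][0]
has_key = "referral" in data

print(classified, has_key)
```

Note that 'ref' is itself a substring of 'referral', so the second half of that condition adds no constraint whenever 'referral' appears anywhere in the message.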

Answer 1

Score: 0

After struggling for many days, I found a way to debug my code: writing the raw request string to BigQuery as an additional column. Because there are so many kinds of unwanted webhook requests, I had missed some rules in the classification step, so some requests passed classification without containing the keys I wanted to extract.
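
One way to apply this idea is sketched below in plain Python, so it can be tried without a pipeline. It is not the answerer's actual code: get_path and extract_website are hypothetical helpers, and raw_request is an assumed name for the extra debug column. The extraction checks the parsed structure rather than substrings and, when the expected key is missing, returns a row that carries the raw request instead of raising KeyError.

```python
import json


def get_path(obj, *path):
    """Walk nested dicts/lists; return None when any step is missing."""
    for step in path:
        try:
            obj = obj[step]
        except (KeyError, IndexError, TypeError):
            return None
    return obj


def extract_website(raw: bytes) -> dict:
    """Build a 'website' row, or a debug row that carries the raw request."""
    text = raw.decode("utf-8")
    req = json.loads(text)
    ref = get_path(req, "entry", 0, "messaging", 0, "referral", "ref")
    if ref is None:
        # 'raw_request' is a hypothetical extra column used only for debugging
        return {"type": "unmatched", "ad_data": None, "raw_request": text}
    return {"type": "website", "ad_data": ref, "raw_request": None}


# Made-up payloads for illustration only
good = json.dumps(
    {"entry": [{"messaging": [{"referral": {"ref": "campaign_1"}}]}]}
).encode("utf-8")
bad = json.dumps(
    {"entry": [{"messaging": [{"postback": {"payload": "refund"}}]}]}
).encode("utf-8")

good_row = extract_website(good)
bad_row = extract_website(bad)
print(good_row)
print(bad_row)
```

Inside a DoFn, the same function could be called from process, with the unmatched rows written to a separate table or column so that the missed classification rules can be found by inspecting the stored request strings.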

huangapple
  • Posted on February 24, 2023, 12:57:09
  • When reposting, please retain the link to this article: https://go.coder-hub.com/75552768.html