dataflow python - KeyError even though no input to the step and the key exists for matched cases
Question
I'm writing a data pipeline that extracts four types of JSON messages, each with a different attribute structure, and writes them to BigQuery.
My pipeline:
data = (pipeline
    | "Read from Pub/Sub" >> io.ReadFromPubSub(subscription=known_args.input_subscription)
)
# first branch - push message to Datastore
p1 = (data
    | "Add publish time" >> ParDo(AddTimestamp())
    | "Create Entity" >> Map(EntityWrapper(known_args.kind).make_entity)
    | "Write to Datastore" >> WriteToDatastore(DATASTORE_PROJECT)
)
# second branch - write data to Bigquery
p2 = (data
    | "classify request depending on type" >> ParDo(ClassifyReq()).with_outputs('label', 'ads', 'website', 'post')
)
for type in ['label', 'ads', 'website', 'post']:
    result = (p2[type]
        | "extract req: " + type >> ParDo(ExtractReq(type))  # step label name must be unique
        | "Write to Bigquery: " + type >> io.WriteToBigQuery(
            known_args.output_table,
            schema=known_args.output_schema,
            write_disposition=io.BigQueryDisposition.WRITE_APPEND,
        )
    )
My functions:
import json

from apache_beam import DoFn
from apache_beam.pvalue import TaggedOutput


class ClassifyReq(DoFn):
    """
    A transform that classifies the type of each Pub/Sub message and outputs the element with a tag.
    """
    def process(self, element):
        type = ''
        req_str = str(element)
        req = json.loads(element.decode('utf-8'))
        if 'inbox_labels' in req_str:
            type = 'label'
        elif 'ad_id' in req_str:
            type = 'ads'
        elif ('postback' in req_str or 'referral' in req_str) and 'ref' in req_str:
            type = 'website'
        elif ('feed' in req_str) and ('comment' in req_str):  # only comment, no reaction_type
            data = req['entry'][0]['changes'][0]['value']
            from_id = data['from']['id']
            if from_id not in page_data.keys():  # only updates not from the page admin (page_data is defined elsewhere)
                type = 'post'
        # note: a message that matches no rule is still emitted, with an empty tag
        yield TaggedOutput(type, element)
class ExtractReq(DoFn):
    """
    A transform that extracts fields from an element and maps them to a BigQuery row,
    depending on the tag given by the prior step.
    """
    def __init__(self, type):
        self.type = type

    def process(self, element):
        req = json.loads(element.decode("utf-8"))
        page_id = req['entry'][0]['id']
        if self.type == 'label':
            unix_datetime = req['entry'][0]['time']
            data = req['entry'][0]['changes'][0]['value']
            customer_id = data['user']['id']
            label = data['label']['page_label_name']
            label_action = data['action']
            row = {'page_id': page_id,
                   'type': self.type,
                   'customer_id': customer_id,
                   'unix_datetime': unix_datetime,
                   'ad_data': None,
                   'ad_type': None,
                   'ad_title': None,
                   'post_id': None,
                   'post_link': None,
                   'label': label,
                   'action': label_action
                   }
        elif self.type == 'ads':
            data = req['entry'][0]['messaging'][0]
            # decode first, so the lookups below always see a dict
            if isinstance(data, str):
                data = json.loads(data)
            customer_id = data['sender']['id']
            # timestamp has 13 digits, so cut off the millisecond part
            unix_datetime = int(str(req['entry'][0]['time'])[:-3])
            ad_id = data['referral']['ad_id']
            ad_title = data['referral']['ads_context_data']['ad_title']
            ad_type = data['referral']['type']
            post_id = data['referral']['ads_context_data']['post_id']
            row = {'page_id': page_id,
                   'type': self.type,
                   'customer_id': customer_id,
                   'unix_datetime': unix_datetime,
                   'ad_data': ad_id,
                   'ad_type': ad_type,
                   'ad_title': ad_title,
                   'post_id': post_id,
                   'post_link': None,
                   'label': None,
                   'action': None
                   }
        elif self.type == 'website':
            data = req['entry'][0]['messaging'][0]
            # timestamp has 13 digits, so cut off the millisecond part
            unix_datetime = int(str(req['entry'][0]['time'])[:-3])
            if isinstance(data, str):
                data = json.loads(data)
            customer_id = data['sender']['id']
            ref = data['referral']['ref']
            row = {'page_id': page_id,
                   'type': self.type,
                   'customer_id': customer_id,
                   'unix_datetime': unix_datetime,
                   'ad_data': ref,
                   'ad_type': None,
                   'ad_title': None,
                   'post_id': None,
                   'post_link': None,
                   'label': None,
                   'action': None
                   }
        elif self.type == 'post':
            data = req['entry'][0]['changes'][0]['value']
            from_id = data['from']['id']
            link = data['post']['permalink_url']
            post_id = data['post_id']
            unix_datetime = data['created_time']
            verb = data['verb']
            row = {'page_id': page_id,
                   'type': self.type,
                   'customer_id': from_id,
                   'unix_datetime': unix_datetime,
                   'ad_data': None,
                   'ad_type': None,
                   'ad_title': None,
                   'post_id': post_id,
                   'post_link': link,
                   'label': None,
                   'action': verb
                   }
        return [row]
The errors mostly occur with the types 'website' and 'post'. For example:
KeyError: "post [while running 'extract req: post-ptransform-217']"
KeyError: "referral [while running 'extract req: website-ptransform-228']"
I manually tested some requests of each type and they worked fine, so I believe the key really exists for each type. Moreover, I've checked that sometimes the first step produces no website-typed output, yet the step extract req: website still raises the error. I also tried checking if isinstance(data, str): and calling json.loads(data) when it is a string, but that doesn't help.
Answer 1
Score: 0
After struggling for many days, I found a way to debug my code: writing the raw request string to an extra column in BigQuery. Because there are so many kinds of unwanted webhook requests, I had missed some classification rules, so some requests passed classification without containing the keys I wanted to extract.
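For illustration, here is a minimal sketch of both ideas in plain Python, so it can be tried without a running pipeline. It assumes the payload shape implied by ExtractReq in the question (dicts under entry[0]['messaging'][0] and entry[0]['changes'][0]['value']). The names classify, extract_or_debug and admin_ids are hypothetical helpers, not part of Beam or of the original code. The idea is to classify on the parsed keys that the extraction step will actually read, rather than on substrings, and to keep the raw request next to any failure so it can be inspected later.

```python
import json


def classify(req, admin_ids=frozenset()):
    """Return 'label', 'ads', 'website', 'post', or None, based on parsed keys.

    Assumption: `req` is the decoded webhook payload (a dict) and nested
    items are dicts, as ExtractReq in the question expects.
    """
    entry = (req.get('entry') or [{}])[0]
    messaging = (entry.get('messaging') or [{}])[0]
    value = (entry.get('changes') or [{}])[0].get('value') or {}
    referral = messaging.get('referral') or {}

    if 'label' in value and 'user' in value:
        return 'label'
    if 'ad_id' in referral:
        return 'ads'
    if 'ref' in referral:
        return 'website'
    if 'post' in value and 'post_id' in value and 'from' in value:
        if value['from'].get('id') not in admin_ids:
            return 'post'
    return None  # unwanted request: do not send it to any extraction step


def extract_or_debug(raw_bytes, extract):
    """Run extract(req); on failure return a debug row that keeps the raw payload."""
    text = raw_bytes.decode('utf-8', errors='replace')
    try:
        return 'ok', extract(json.loads(text))
    except (KeyError, IndexError, TypeError, ValueError) as exc:
        return 'failed', {'error': repr(exc), 'raw_request': text}


# A postback whose payload merely contains the substring "ref": the substring
# rule in the question tags it 'website', although it has no 'referral' key.
postback = {'entry': [{'id': '1', 'time': 1600000000000,
                       'messaging': [{'sender': {'id': '42'},
                                      'postback': {'payload': 'ref_menu'}}]}]}
print(classify(postback))  # None
```

In a Beam DoFn, the 'failed' rows could be yielded under an extra tagged output and written to a separate debug table, which is one way to get the "raw request as a column" view described above.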