Transform - Custom Code - runs great w 2 filters but want 3+ filters?
Question
In AWS Glue Studio, I created a "Transform - Custom code" node to filter records based on (eventually) many columns, keeping only the records that fail validation on any column, so the log can be reviewed to identify the invalid values.
The following code works to filter based on two columns:
def MyTransform(glueContext, dfc) -> DynamicFrameCollection:
    df = dfc.select(list(dfc.keys())[0]).toDF()
    from pyspark.sql import functions as sf
    # Rule 1: keep records whose Cardholder_ID falls below the cutoff, and flag them.
    dfFirstRule = df.filter(df['Cardholder_ID'] < 3911539932589)
    dfFirstRule = dfFirstRule.withColumn('invalid_cardholder_id', sf.lit('true'))
    # Rule 2: keep records dated before 2023-01-01, and flag them.
    dfSecondRule = df.filter(df['Patient_Effective_Date'] < 20230101)
    dfSecondRule = dfSecondRule.withColumn('invalid_patient_effective_date', sf.lit('true'))
    # Combine the violations from both rules; a record missing the other
    # rule's flag column gets null there.
    outputDf = dfFirstRule.unionByName(dfSecondRule, allowMissingColumns=True)
    output = DynamicFrame.fromDF(outputDf, glueContext, "output")
    return DynamicFrameCollection({"out0": output}, glueContext)
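For reference, here is a minimal, self-contained sketch (plain PySpark outside of Glue; the two sample rows are hypothetical) of what unionByName(..., allowMissingColumns=True) produces: a record that violates both rules appears twice, once per rule, with null in the flag column the other rule added.

from pyspark.sql import SparkSession, functions as sf

spark = SparkSession.builder.getOrCreate()

# Hypothetical sample data: the first row violates both rules,
# the second row passes both.
df = spark.createDataFrame(
    [(1, 20221231), (5000000000000, 20230105)],
    ['Cardholder_ID', 'Patient_Effective_Date'],
)

first = (df.filter(df['Cardholder_ID'] < 3911539932589)
         .withColumn('invalid_cardholder_id', sf.lit('true')))
second = (df.filter(df['Patient_Effective_Date'] < 20230101)
          .withColumn('invalid_patient_effective_date', sf.lit('true')))

# The violating row shows up twice: once flagged invalid_cardholder_id,
# once flagged invalid_patient_effective_date, with null in the other column.
first.unionByName(second, allowMissingColumns=True).show()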
If I want to add another rule (and, later, many more), how can I restructure this so that it handles more than two rules elegantly and efficiently?
Answer 1
Score: 0
Did it!
def MyTransform(glueContext, dfc) -> DynamicFrameCollection:
    import operator as op
    from pyspark.sql import functions as sf
    # Each rule names the column to check, the comparison that marks a record
    # invalid, and the flag column added to records that match.
    rules = [
        {'column': 'Cardholder_ID', 'operator': '<', 'value': 3911539932589, 'errors': 'invalid_cardholder_id'},
        {'column': 'Patient_Effective_Date', 'operator': '<', 'value': 20230101, 'errors': 'invalid_patient_effective_date'},
        {'column': 'Patient_Term_Reason', 'operator': '<', 'value': 1, 'errors': 'invalid_patient_term_reason'},
        {'column': 'Patient_Term_Date', 'operator': '<', 'value': 20230101, 'errors': 'invalid_patient_term_date'}
        # more rules here
    ]
    # Map each rule's operator string to the matching comparison function,
    # so the 'operator' field is honored rather than hard-coding <.
    ops = {'<': op.lt, '<=': op.le, '>': op.gt, '>=': op.ge, '==': op.eq, '!=': op.ne}
    df = dfc.select(list(dfc.keys())[0]).toDF()
    outputDf = None
    for rule in rules:
        # Keep only the records that violate this rule, and flag them.
        filteredDf = df.filter(ops[rule['operator']](df[rule['column']], rule['value']))
        filteredDf = filteredDf.withColumn(rule['errors'], sf.lit('true'))
        if outputDf is None:
            outputDf = filteredDf
        else:
            outputDf = outputDf.unionByName(filteredDf, allowMissingColumns=True)
    output = DynamicFrame.fromDF(outputDf, glueContext, "output")
    return DynamicFrameCollection({"out0": output}, glueContext)
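A note on the design: this loop scans the source once per rule and unions the results, so a record that violates several rules appears on several rows, each carrying one flag. If one row per invalid record is preferred, the same rules list can drive when() flags in a single pass instead. A minimal sketch under the same Glue Studio assumptions as above (the function name MyTransformSinglePass is illustrative, and the rules list is unchanged):

def MyTransformSinglePass(glueContext, dfc) -> DynamicFrameCollection:
    import operator as op
    from pyspark.sql import functions as sf
    rules = [
        {'column': 'Cardholder_ID', 'operator': '<', 'value': 3911539932589, 'errors': 'invalid_cardholder_id'},
        {'column': 'Patient_Effective_Date', 'operator': '<', 'value': 20230101, 'errors': 'invalid_patient_effective_date'},
        {'column': 'Patient_Term_Reason', 'operator': '<', 'value': 1, 'errors': 'invalid_patient_term_reason'},
        {'column': 'Patient_Term_Date', 'operator': '<', 'value': 20230101, 'errors': 'invalid_patient_term_date'}
    ]
    ops = {'<': op.lt, '<=': op.le, '>': op.gt, '>=': op.ge}
    df = dfc.select(list(dfc.keys())[0]).toDF()
    # Single pass: add every flag column, 'true' where the rule is
    # violated and null otherwise.
    for rule in rules:
        condition = ops[rule['operator']](df[rule['column']], rule['value'])
        df = df.withColumn(rule['errors'], sf.when(condition, sf.lit('true')))
    # Keep only records with at least one non-null flag.
    anyViolation = None
    for rule in rules:
        flag = sf.col(rule['errors']).isNotNull()
        anyViolation = flag if anyViolation is None else (anyViolation | flag)
    outputDf = df.filter(anyViolation)
    output = DynamicFrame.fromDF(outputDf, glueContext, "output")
    return DynamicFrameCollection({"out0": output}, glueContext)

Which version is preferable depends on how the log is consumed: the per-rule union gives one row per violation, while the single pass gives one row per bad record carrying all of its flags.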
Comments