Transform - Custom Code - runs great with 2 filters, but what about 3+ filters?

Question

In AWS Glue Studio, I created a "Transform - Custom code" node to filter records based on (eventually) many columns. It should keep only the records that fail validation on at least one column, so that the log can be inspected to identify the invalid values.

The following code works, but it filters on only two columns:

def MyTransform(glueContext, dfc) -> DynamicFrameCollection:
    df = dfc.select(list(dfc.keys())[0]).toDF()
    from pyspark.sql import functions as sf

    dfFirstRule = df.filter((df['Cardholder_ID'] < 3911539932589))
    dfFirstRule = dfFirstRule.withColumn('invalid_cardholder_id', sf.lit('true'))

    dfSecondRule = df.filter((df['Patient_Effective_Date'] < 20230101))
    dfSecondRule = dfSecondRule.withColumn('invalid_patient_effective_date', sf.lit('true'))

    outputDf = dfFirstRule.unionByName(dfSecondRule, allowMissingColumns=True)

    output = DynamicFrame.fromDF(outputDf, glueContext, "output")
    return (DynamicFrameCollection({"out0": output}, glueContext))

If I want to add another rule (one for now, many more later), how can I restructure this so that it handles more than two rules more elegantly and efficiently?
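For readers unfamiliar with the result shape: a union of per-rule filters keeps every row from both sides, so a record that breaks both rules appears twice, and a flag column that exists on only one side is filled with null on the other. The following is a rough plain-Python model of that behavior, not Spark code; `union_by_name` and the sample rows are made up for illustration.

```python
def union_by_name(left, right):
    """Plain-Python model (hypothetical helper) of a by-name union that
    allows missing columns: keep all rows, fill absent columns with None."""
    # Collect column names in first-seen order across both inputs.
    columns = list(dict.fromkeys(k for row in left + right for k in row))
    return [{c: row.get(c) for c in columns} for row in left + right]

# Made-up sample rows, for illustration only.
rows = [
    {'Cardholder_ID': 1, 'Patient_Effective_Date': 20220101},              # breaks both rules
    {'Cardholder_ID': 3911539932590, 'Patient_Effective_Date': 20221231},  # breaks the second rule
    {'Cardholder_ID': 3911539932590, 'Patient_Effective_Date': 20230201},  # valid
]

first = [{**r, 'invalid_cardholder_id': 'true'}
         for r in rows if r['Cardholder_ID'] < 3911539932589]
second = [{**r, 'invalid_patient_effective_date': 'true'}
          for r in rows if r['Patient_Effective_Date'] < 20230101]

output = union_by_name(first, second)
```

Here `output` holds three rows for two invalid records, because the first record is emitted once per rule it violates.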

Answer 1

Score: 0

Did it!

def MyTransform(glueContext, dfc) -> DynamicFrameCollection:
    import operator
    from pyspark.sql import functions as sf

    # One entry per validation rule: a row is invalid when
    # <column> <operator> <value> holds; 'errors' names the flag column.
    rules = [
        {'column': 'Cardholder_ID', 'operator': '<', 'value': 3911539932589, 'errors': 'invalid_cardholder_id'},
        {'column': 'Patient_Effective_Date', 'operator': '<', 'value': 20230101, 'errors': 'invalid_patient_effective_date'},
        {'column': 'Patient_Term_Reason', 'operator': '<', 'value': 1, 'errors': 'invalid_patient_term_reason'},
        {'column': 'Patient_Term_Date', 'operator': '<', 'value': 20230101, 'errors': 'invalid_patient_term_date'}
        # more rules here
    ]
    # Map each supported 'operator' string to its comparison function,
    # so the 'operator' field is actually honored instead of hard-coding '<'.
    ops = {'<': operator.lt, '<=': operator.le, '>': operator.gt, '>=': operator.ge}

    df = dfc.select(list(dfc.keys())[0]).toDF()
    outputDf = None
    for rule in rules:
        # Keep only the rows that violate this rule and flag them.
        condition = ops[rule['operator']](df[rule['column']], rule['value'])
        filteredDf = df.filter(condition).withColumn(rule['errors'], sf.lit('true'))
        if outputDf is None:
            outputDf = filteredDf
        else:
            outputDf = outputDf.unionByName(filteredDf, allowMissingColumns=True)
    output = DynamicFrame.fromDF(outputDf, glueContext, "output")
    return DynamicFrameCollection({"out0": output}, glueContext)
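
One thing to be aware of with the union-per-rule approach is that a record breaking several rules is emitted once per broken rule, and the data is filtered once per rule. If one output row per invalid record is preferred, the rule table can instead be evaluated in a single pass. The sketch below shows that idea in plain Python on lists of dicts, not on Spark DataFrames; `OPS`, `flag_violations`, and the sample rows are hypothetical names and data introduced only for illustration.

```python
import operator

# Hypothetical operator table: maps a rule's 'operator' string to a function.
OPS = {'<': operator.lt, '<=': operator.le, '>': operator.gt,
       '>=': operator.ge, '==': operator.eq, '!=': operator.ne}

def flag_violations(rows, rules):
    """Return only the rows that break at least one rule, each exactly once,
    with one 'true' flag per broken rule."""
    flagged = []
    for row in rows:
        # Evaluate every rule against this row and collect the flags it earns.
        flags = {
            rule['errors']: 'true'
            for rule in rules
            if OPS[rule['operator']](row[rule['column']], rule['value'])
        }
        if flags:  # keep the row only if some rule was violated
            flagged.append({**row, **flags})
    return flagged

rules = [
    {'column': 'Cardholder_ID', 'operator': '<', 'value': 3911539932589,
     'errors': 'invalid_cardholder_id'},
    {'column': 'Patient_Effective_Date', 'operator': '<', 'value': 20230101,
     'errors': 'invalid_patient_effective_date'},
]

# Made-up sample rows, for illustration only.
rows = [
    {'Cardholder_ID': 1, 'Patient_Effective_Date': 20220101},              # breaks both rules
    {'Cardholder_ID': 3911539932590, 'Patient_Effective_Date': 20230201},  # valid
    {'Cardholder_ID': 3911539932590, 'Patient_Effective_Date': 20221231},  # breaks the second rule
]

result = flag_violations(rows, rules)
```

In Spark, the same idea could probably be expressed by adding one flag column per rule with `sf.when(condition, sf.lit('true'))` and then filtering once on the OR of all conditions. That variant is an untested suggestion, not something taken from the original answer.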

huangapple
  • Posted on April 19, 2023, 22:52:56
  • When reposting, please keep the link to this article: https://go.coder-hub.com/76055943.html