Flink:将流与静态列表进行左连接

huangapple go评论76阅读模式
英文:

Flink: Left joining a stream with a static list

问题

我想将Attempts的流式数据加入到一个静态的被封锁邮箱列表中并且按照IP对结果进行分组以便稍后统计相关统计数据结果应该按照每10秒钟后的30分钟滑动窗口进行传递以下是我尝试实现此目标的几种方式之一

override fun performQuery(): Table {
    val query = "SELECT ip, " +
        "COUNT(CASE WHEN success IS false THEN 1 END) AS fails, " +
        "COUNT(CASE WHEN success IS true THEN 1 END) AS successes, " +
        "COUNT(DISTINCT id) accounts, " +
        "COUNT(CASE WHEN id = 0 THEN 1 END) AS non_existing_accounts, " +
        "COUNT(CASE WHEN blockedEmail IS NOT NULL THEN 1 END) AS blocked_accounts " +
        "FROM Attempts " +
        "LEFT JOIN LATERAL TABLE(blockedEmailsList()) AS T(blockedEmail) ON TRUE " +
        "WHERE Attempts.email <> '' AND Attempts.createdAt < CURRENT_TIMESTAMP " +
        "GROUP BY HOP(Attempts.createdAt, INTERVAL '10' SECOND, INTERVAL '30' MINUTE), ip"

    return runQuery(query)
        .select("ip, accounts, fails, successes, non_existing_accounts, blocked_accounts")
}

这段代码使用了下面的用户自定义表函数,该函数已经在我的 tableEnv 中注册为 blockedEmailsList

public class BlockedEmailsList extends TableFunction<Row> {
    private Collection<String> emails;

    public BlockedEmailsList(Collection<String> emails) {
        this.emails = emails;
    }

    public Row read(String email) {
        return Row.of(email);
    }

    public void eval() {
        this.emails.forEach(email -> collect(read(email)));
    }
}

然而,它返回了下面的错误信息:

Caused by: org.apache.flink.table.api.TableException: Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before.

如果我按照建议将 created_at 转换为 TIMESTAMP,则会得到以下错误:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Window can only be defined over a time attribute column.

我在Stack Overflow上找到了与这些异常相关的其他问题,但它们涉及流和时间表,没有一个解决了将流与静态列表进行连接的情况。

有什么想法吗?

编辑: 看起来Flink项目中有一个针对我的用例的未解决问题:https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API

所以,我也接受解决方法的建议。


<details>
<summary>英文:</summary>

I want to join a streaming of Attempts to a static list of blocked emails and group the result by IP, so I can later count a pack of relevant stats. The result should be delivered as a sliding window of 30 minutes after each 10 seconds. Below is one of several ways I have tried to achieve this:

override fun performQuery(): Table {
val query = "SELECT ip, " +
"COUNT(CASE WHEN success IS false THEN 1 END) AS fails, " +
"COUNT(CASE WHEN success IS true THEN 1 END) AS successes, " +
"COUNT(DISTINCT id) accounts, " +
"COUNT(CASE WHEN id = 0 THEN 1 END) AS non_existing_accounts, " +
"COUNT(CASE WHEN blockedEmail IS NOT NULL THEN 1 END) AS blocked_accounts " +
"FROM Attempts " +
"LEFT JOIN LATERAL TABLE(blockedEmailsList()) AS T(blockedEmail) ON TRUE " +
"WHERE Attempts.email <> '' AND Attempts.createdAt < CURRENT_TIMESTAMP " +
"GROUP BY HOP(Attempts.createdAt, INTERVAL '10' SECOND, INTERVAL '30' MINUTE), ip"

return runQuery(query)
    .select(&quot;ip, accounts, fails, successes, non_existing_accounts, blocked_accounts&quot;)

}

This uses the User-Defined Table Function below, which is already registered in my `tableEnv` as `blockedEmailsList`:

public class BlockedEmailsList extends TableFunction<Row> {
private Collection<String> emails;

public BlockedEmailsList(Collection&lt;String&gt; emails) {
    this.emails = emails;
}

public Row read(String email) {
    return Row.of(email);
}

public void eval() {
    this.emails.forEach(email -&gt; collect(read(email)));
}

}


However, it returns the error below:

Caused by: org.apache.flink.table.api.TableException: Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before.

If I do as it suggests and cast the `created_at` to `TIMESTAMP`, I get this instead:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Window can only be defined over a time attribute column.

I have found other questions here on Stack Overflow related to theses exceptions, but they involve streams and temporal tables and none of them solves a case of joining a stream to a static list.

Any ideas?

**EDIT:** Looks like there is an open issue in the Flink project for my use case: https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API

So, I&#39;m also accepting workaround suggestions.

</details>


# 答案1
**得分**: 1

Caused by: org.apache.flink.table.api.TableException: 行时间属性不得出现在常规连接的输入行中。作为一种解决方法,您可以在输入表的时间属性之前将其转换为时间戳。

原因是侧向表函数是 Flink 的常规连接,而常规连接将发送空值,例如:

    左表:(K0,A),右表(K1,T1)  => 发送    (K0,A,NULL,NULL)
    左表:        ,右表(K0,T2)  => 撤消    (K0,A,NULL,NULL)  
                                      发送    (K0,A,K0,T2)

因此,连接后来自输入流的时间属性将丢失。

在您的情况下,您不需要一个 TableFunction,您可以使用一个 Scalar Function,例如:

    public static class BlockedEmailFunction extends ScalarFunction {
        private static List<String> blockedEmails = ...;
        public Boolean eval(String email) {
           return blockedEmails.contains(attempt.getEmail());
        }
    }

    // 注册函数
    env.createTemporarySystemFunction("blockedEmailFunction", BlockedEmailFunction.class);

    // 在 SQL 中调用已注册的函数并执行您期望的窗口操作
    env.sqlQuery("SELECT blockedEmailFunction(email) as status, ip, createdAt FROM Attempts");

<details>
<summary>英文:</summary>

    Caused by: org.apache.flink.table.api.TableException: Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before.

The reason is lateral table function is a Flink regular join, and a regular join  will send null value, for example

    left:(K0, A), right(K1, T1)  =&gt; send    (K0, A, NULL, NULL)
    left:         , right(K0, T2) =&gt; retract (K0, A, NULL, NULL )  
                                       send   (K0, A, K0, T2)

and thus the time attribute from input stream will loss after join.

In your case, you do not need a TableFunction, you can use a Scalar Function
like: 

     public static class BlockedEmailFunction extends ScalarFunction {
         private static List&lt;String&gt; blockedEmails = ...;
         public Boolean eval(String email) {
            return blockedEmails.contains(attempt.getEmail());
         }
     }
 

    // register function
    env.createTemporarySystemFunction(&quot;blockedEmailFunction&quot;, BlockedEmailFunction.class);
    
    // call registered function in SQL and do window operation as your expected
    env.sqlQuery(&quot;SELECT blockedEmailFunction(email) as status, ip, createdAt FROM Attempts&quot;);
     



</details>



# 答案2
**得分**: 0

我成功地实现了一个解决方案,解决了我的问题!

与其将流式尝试(Attempts)与静态邮件列表进行连接,我事先将每个尝试(Attempt)映射到一个新的尝试,其中添加了一个`blockedEmail`属性。如果静态列表`blockedEmails`包含当前尝试的电子邮件,我会将其`blockedEmail`属性设置为`true`。

```java
DataStream<Attempt> attemptsStream = sourceApi.<Attempt>startStream().map(new MapFunction<Attempt, Attempt>() {
    @Override
    public Attempt map(Attempt attempt) throws Exception {
        if (blockedEmails.contains(attempt.getEmail())) {
            attempt.setBlockedEmail(true);
        }
        return attempt;
    }
});

静态列表blockedEmails的类型为HashSet,因此查找的复杂度为O(1)。

最后,分组查询被调整为:

override fun performQuery(): Table {
    val query = "SELECT ip, " +
        "COUNT(CASE WHEN success IS false THEN 1 END) AS fails, " +
        "COUNT(CASE WHEN success IS true THEN 1 END) AS successes, " +
        "COUNT(DISTINCT id) accounts, " +
        "COUNT(CASE WHEN id = 0 THEN 1 END) AS non_existing_accounts, " +
        "COUNT(CASE WHEN blockedEmail IS true THEN 1 END) AS blocked_accounts " +
        "FROM Attempts " +
        "WHERE Attempts.email <> '' " +
        "GROUP BY HOP(Attempts.createdAt, INTERVAL '10' SECOND, INTERVAL '30' MINUTE), ip"

    return runQuery(query)
        .select("ip, accounts, fails, successes, non_existing_accounts, blocked_accounts")
}

到目前为止,流和静态列表之间的连接问题似乎尚未解决,但在我的情况下,上述解决方案已经很好地解决了这个问题。

英文:

I managed to implement a workaround which solved my problem!

Instead of joining streamed Attempts with the static list of emails, I mapped beforehand each Attempt to a new one with an added blockedEmail attribute. If the static list blockedEmails contains the current Attempt email, I set its blockedEmail attribute to true.

DataStream&lt;Attempt&gt; attemptsStream = sourceApi.&lt;Attempt&gt;startStream().map(new MapFunction&lt;Attempt, Attempt&gt;() {
    @Override
    public Attempt map(Attempt attempt) throws Exception {
        if (blockedEmails.contains(attempt.getEmail())) {
            attempt.setBlockedEmail(true);
        }
        return attempt;
    }
});

The static list blockedEmails is of type HashSet, so a lookup would be O(1).

Finally, the grouping query was adjusted to:

override fun performQuery(): Table {
    val query = &quot;SELECT ip, &quot; +
        &quot;COUNT(CASE WHEN success IS false THEN 1 END) AS fails, &quot; +
        &quot;COUNT(CASE WHEN success IS true THEN 1 END) AS successes, &quot; +
        &quot;COUNT(DISTINCT id) accounts, &quot; +
        &quot;COUNT(CASE WHEN id = 0 THEN 1 END) AS non_existing_accounts, &quot; +
        &quot;COUNT(CASE WHEN blockedEmail IS true THEN 1 END) AS blocked_accounts &quot; +
        &quot;FROM Attempts &quot; +
        &quot;WHERE Attempts.email &lt;&gt; &#39;&#39; &quot; +
        &quot;GROUP BY HOP(Attempts.createdAt, INTERVAL &#39;10&#39; SECOND, INTERVAL &#39;30&#39; MINUTE), ip&quot;

    return runQuery(query)
        .select(&quot;ip, accounts, fails, successes, non_existing_accounts, blocked_accounts&quot;)
}

So far, the joining problem between streams and static lists seems unresolved yet, but in my case the above workaround solution solved it fine.

huangapple
  • 本文由 发表于 2020年4月9日 07:25:33
  • 转载请务必保留本文链接:https://go.coder-hub.com/61111592.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定