How to do word count in a Dataset where one column is of array<string> type in Java Spark?


Question

I have a Dataset with two columns:

queryTermCounts	        queryTerms
[1, 1, 1, 1, 1, 1...]	[hello, world, spark, hello, hello...]
[1, 1, 1, 1, 1, 1...]	[eat, fish, cat, cat, fish...]
[1, 1, 1, 1, 1, 1...]	[today, sun, sun, cloud, hello...]

queryTerms holds the words of the document after word segmentation, and queryTermCounts is meant to hold the number of occurrences of each word in queryTerms, but currently every count is 1.

I want to re-count the words in queryTerms and put the corresponding counts into queryTermCounts, so the result looks like:

queryTermCounts	        queryTerms
[3, 1, 1 ...]	    [hello, world, spark ...]
[2, 2, 1 ...]	    [fish, cat, eat ...]
[2, 1, 1, 1 ...]	[sun, today, cloud, hello...]

I tried to use:

Dataset<Row> words = data.select(functions.explode(functions.split(data.col("queryTerms"), ",\\s*")).as("word"));

Dataset<Row> wordCounts = words.join(input, functions.array_contains(functions.split(input.col("queryTerms"), ",\\s*"), words.col("word")))
                .groupBy("word")
                .agg(functions.sum("queryTermCounts").as("count"));

It works correctly on String, but doesn't work on array<string> type. Can anyone help me?
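
For reference, a minimal sketch of how the sample input above could be built in Java (assumes an existing SparkSession named spark; the values are just the example rows):

import java.util.Arrays;
import java.util.List;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

// Schema: an array<int> counts column and an array<string> terms column
StructType schema = new StructType()
        .add("queryTermCounts", DataTypes.createArrayType(DataTypes.IntegerType))
        .add("queryTerms", DataTypes.createArrayType(DataTypes.StringType));

// Array columns can be filled from java.util.List values
List<Row> rows = Arrays.asList(
        RowFactory.create(Arrays.asList(1, 1, 1, 1, 1), Arrays.asList("hello", "world", "spark", "hello", "hello")),
        RowFactory.create(Arrays.asList(1, 1, 1, 1, 1), Arrays.asList("eat", "fish", "cat", "cat", "fish")),
        RowFactory.create(Arrays.asList(1, 1, 1, 1, 1), Arrays.asList("today", "sun", "sun", "cloud", "hello")));

Dataset<Row> data = spark.createDataFrame(rows, schema);  // data.col("queryTerms") is array<string>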

Answer 1

Score: 1

Update - Using just one column, queryTerms, as discussed in the comments:

The window is there to make sure both lists are in the same order, as collect_list doesn't guarantee ordering.

Java:

WindowSpec w = Window.orderBy(functions.col("queryTerms")).rowsBetween(Window.unboundedPreceding(), Window.unboundedFollowing());

df.select("queryTerms")
  .withColumn("queryTerms", functions.explode(functions.col("queryTerms")))
  .groupBy("queryTerms")
  .agg(functions.count("queryTerms").alias("queryTermCounts"))
  .select(functions.collect_list("queryTerms").over(w).alias("queryTerms"), functions.collect_list("queryTermCounts").over(w).alias("queryTermCounts"))
  .select(functions.first("queryTerms").alias("queryTerms"), functions.first("queryTermCounts").alias("queryTermCounts"))
  .show(false);
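
(The Java snippets in this answer assume imports along these lines:)

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;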

PySpark:

w = Window.orderBy(F.col("queryTerms")).rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

df.select("queryTerms")\
 .withColumn("queryTerms", F.explode("queryTerms"))\
 .groupBy("queryTerms")\
 .agg(F.count("queryTerms").alias("queryTermCounts"))\
 .select(F.collect_list("queryTerms").over(w).alias("queryTerms"), F.collect_list("queryTermCounts").over(w).alias("queryTermCounts"))\
 .select(F.first("queryTerms").alias("queryTerms"), F.first("queryTermCounts").alias("queryTermCounts"))\
 .show(truncate=False)

Update - In Java:

df
    .withColumn("asStruct", functions.arrays_zip(functions.col("queryTermCounts"), functions.col("queryTerms")))
    .withColumn("asStruct", functions.explode(functions.col("asStruct")))
    .select(functions.col("asStruct.*"))
    .groupBy("queryTerms")
    .agg(functions.sum("queryTermCounts").alias("queryTermCounts"))
    .withColumn("queryTermCounts", functions.collect_list("queryTermCounts").over(Window.orderBy("queryTerms")))
    .withColumn("queryTerms", functions.collect_list("queryTerms").over(Window.orderBy("queryTerms")))
    .withColumn("mx", functions.max(functions.size(functions.col("queryTerms"))).over(Window.orderBy(functions.lit("dummy"))))
    .filter(functions.size(functions.col("queryTerms")).equalTo(functions.col("mx")))
    .drop("mx")
    .show(false);

In PySpark this would work (the Java equivalent is in the update above):

df.withColumn('asStruct', F.arrays_zip('queryTermCounts', 'queryTerms'))\
.withColumn('asStruct', F.explode('asStruct'))\
.select(F.col('asStruct.*'))\
.groupBy("queryTerms")\
.agg(F.sum("queryTermCounts").alias("queryTermCounts"))\
.withColumn("queryTermCounts", F.collect_list("queryTermCounts").over(Window.orderBy("queryTerms")))\
.withColumn("queryTerms", F.collect_list("queryTerms").over(Window.orderBy("queryTerms")))\
.withColumn("mx", F.max(F.size("queryTerms")).over(Window.orderBy(F.lit("dummy"))))\
.filter(F.size("queryTerms")==F.col("mx"))\
.drop("mx")\
.show(truncate=False)


Answer 2

Score: 1

I only used the array and map functions to get the result.

PySpark

from pyspark.sql import functions as f

# assumes an existing SparkSession named spark
data = [
    [['hello', 'world', 'spark', 'hello', 'hello']],
    [['eat', 'fish', 'cat', 'cat', 'fish']],
    [['today', 'sun', 'sun', 'cloud', 'hello']]
]

df = spark.createDataFrame(data, ['queryTerms'])

df.withColumn('countMap', f.aggregate(
    'queryTerms',
    f.create_map().cast('map<string, int>'),
    lambda acc, w: f.map_concat(f.map_filter(acc, lambda k, v: k != w), f.create_map(w, f.coalesce(acc[w] + f.lit(1), f.lit(1)))))
  ) \
  .withColumn('queryTerms', f.map_keys('countMap')) \
  .withColumn('queryTermsCount', f.map_values('countMap')) \
  .drop('countMap') \
  .show(truncate=False)

+--------------------------+---------------+
|queryTerms                |queryTermsCount|
+--------------------------+---------------+
|[world, spark, hello]     |[1, 1, 3]      |
|[eat, cat, fish]          |[1, 2, 2]      |
|[today, sun, cloud, hello]|[1, 2, 1, 1]   |
+--------------------------+---------------+
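
A rough Java sketch of the same approach, written with functions.expr and SQL higher-order functions so that no Java/Scala lambda interop is needed (assumes Spark 3.x, where map_filter is available, and a Dataset<Row> df with an array<string> column queryTerms):

// Sketch only: per-row word counting via SQL higher-order functions.
Dataset<Row> result = df
    .withColumn("countMap", functions.expr(
        "aggregate(queryTerms, cast(map() as map<string,int>), " +
        " (acc, w) -> map_concat(map_filter(acc, (k, v) -> k != w), " +
        "                        map(w, coalesce(acc[w] + 1, 1))))"))
    .withColumn("queryTerms", functions.map_keys(functions.col("countMap")))
    .withColumn("queryTermsCount", functions.map_values(functions.col("countMap")))
    .drop("countMap");

result.show(false);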
