How to do word count on a Dataset where one column is of array<string> type in Java Spark?
Question
I have a Dataset, two columns:
queryTermCounts	        queryTerms
[1, 1, 1, 1, 1, 1...]	[hello, world, spark, hello, hello...]
[1, 1, 1, 1, 1, 1...]	[eat, fish, cat, cat, fish...]
[1, 1, 1, 1, 1, 1...]	[today, sun, sun, cloud, hello...]
queryTerms contains the words of the document after word segmentation, and queryTermCounts holds the number of occurrences of each corresponding word in queryTerms, but currently they are all 1.
I want to count the words in queryTerms again and put the corresponding counts into queryTermCounts, so the result looks like:
queryTermCounts	        queryTerms
[3, 1, 1 ...]	    [hello, world, spark ...]
[2, 2, 1 ...]	    [fish, cat, eat ...]
[2, 1, 1, 1 ...]	[sun, today, cloud, hello...]
I tried:
Dataset<Row> words = data.select(functions.explode(functions.split(data.col("queryTerms"), ",\\s*")).as("word"));
Dataset<Row> wordCounts = words.join(input, functions.array_contains(functions.split(input.col("queryTerms"), ",\\s*"), words.col("word")))
                .groupBy("word")
                .agg(functions.sum("queryTermCounts").as("count"));
It works correctly on String columns, but doesn't work on the array<string> type. Can anyone help me?
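For reference: functions.split applies only to string columns, which is why this works when queryTerms is a comma-separated String but fails once the column is already array<string>; on an array column the explode can be applied directly. A minimal sketch, assuming data holds queryTerms as array<string>:

// Minimal sketch: with an array<string> column, explode directly; no split() needed.
Dataset<Row> words = data.select(functions.explode(data.col("queryTerms")).as("word"));
Dataset<Row> wordCounts = words.groupBy("word").count();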
Answer 1
Score: 1
Update - Using just one column queryTerms as discussed in comments:
The Window is there to make sure both lists stay in the same order, since collect_list doesn't guarantee ordering.
Java:
WindowSpec w = Window.orderBy(functions.col("queryTerms")).rowsBetween(Window.unboundedPreceding(), Window.unboundedFollowing());
df.select("queryTerms")
  .withColumn("queryTerms", functions.explode(functions.col("queryTerms")))
  .groupBy("queryTerms")
  .agg(functions.count("queryTerms").alias("queryTermCounts"))
  .select(functions.collect_list("queryTerms").over(w).alias("queryTerms"), functions.collect_list("queryTermCounts").over(w).alias("queryTermCounts"))
  .select(functions.first("queryTerms").alias("queryTerms"), functions.first("queryTermCounts").alias("queryTermCounts"))
  .show(false);
PySpark:
w = Window.orderBy(F.col("queryTerms")).rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df.select("queryTerms")\
 .withColumn("queryTerms", F.explode("queryTerms"))\
 .groupBy("queryTerms")\
 .agg(F.count("queryTerms").alias("queryTermCounts"))\
 .select(F.collect_list("queryTerms").over(w).alias("queryTerms"), F.collect_list("queryTermCounts").over(w).alias("queryTermCounts"))\
 .select(F.first("queryTerms").alias("queryTerms"), F.first("queryTermCounts").alias("queryTermCounts"))\
 .show(truncate=False)
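Note that this one-column variant counts each word across the whole Dataset rather than per row: after the explode and groupBy there is one row per distinct word, the unpartitioned window w hands every row the complete lists, and the final first() aggregation collapses everything into a single row.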
Update - In Java:
df
    .withColumn("asStruct", functions.arrays_zip(functions.col("queryTermCounts"), functions.col("queryTerms")))
    .withColumn("asStruct", functions.explode(functions.col("asStruct")))
    .select(functions.col("asStruct.*"))
    .groupBy("queryTerms")
    .agg(functions.sum("queryTermCounts").alias("queryTermCounts"))
    .withColumn("queryTermCounts", functions.collect_list("queryTermCounts").over(Window.orderBy("queryTerms")))
    .withColumn("queryTerms", functions.collect_list("queryTerms").over(Window.orderBy("queryTerms")))
    .withColumn("mx", functions.max(functions.size(functions.col("queryTerms"))).over(Window.orderBy(functions.lit("dummy"))))
    .filter(functions.size(functions.col("queryTerms")).equalTo(functions.col("mx")))
    .drop("mx")
    .show(false);
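In this variant the collect_list windows over Window.orderBy("queryTerms") have no explicit frame, so they use the default running frame (unbounded preceding to current row) and the lists grow row by row; mx is the size of the longest, i.e. complete, list computed over one global window, and the filter keeps only the row carrying the full lists.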
In PySpark this would work (the Java equivalent is in the update above):
df.withColumn('asStruct', F.arrays_zip('queryTermCounts', 'queryTerms'))\
.withColumn('asStruct', F.explode('asStruct'))\
.select(F.col('asStruct.*'))\
.groupBy("queryTerms")\
.agg(F.sum("queryTermCounts").alias("queryTermCounts"))\
.withColumn("queryTermCounts", F.collect_list("queryTermCounts").over(Window.orderBy("queryTerms")))\
.withColumn("queryTerms", F.collect_list("queryTerms").over(Window.orderBy("queryTerms")))\
.withColumn("mx", F.max(F.size("queryTerms")).over(Window.orderBy(F.lit("dummy"))))\
.filter(F.size("queryTerms")==F.col("mx"))\
.drop("mx")\
.show(truncate=False)
Answer 2
Score: 1
I only used array functions and a map to get the result.
PySpark
from pyspark.sql import functions as f

data = [
    [['hello', 'world', 'spark', 'hello', 'hello']],
    [['eat', 'fish', 'cat', 'cat', 'fish']],
    [['today', 'sun', 'sun', 'cloud', 'hello']]
]
df = spark.createDataFrame(data, ['queryTerms'])
df.withColumn('countMap', f.aggregate(
    'queryTerms',
    f.create_map().cast('map<string, int>'),
    lambda acc, w: f.map_concat(f.map_filter(acc, lambda k, v: k != w), f.create_map(w, f.coalesce(acc[w] + f.lit(1), f.lit(1)))))
  ) \
  .withColumn('queryTerms', f.map_keys('countMap')) \
  .withColumn('queryTermsCount', f.map_values('countMap')) \
  .drop('countMap') \
  .show(truncate=False)
+--------------------------+---------------+
|queryTerms                |queryTermsCount|
+--------------------------+---------------+
|[world, spark, hello]     |[1, 1, 3]      |
|[eat, cat, fish]          |[1, 2, 2]      |
|[today, sun, cloud, hello]|[1, 2, 1, 1]   |
+--------------------------+---------------+
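Since the question asked for Java, a possible Java rendering of the same map-based aggregation goes through Spark SQL's higher-order functions via functions.expr. A sketch, assuming df has a queryTerms column of type array<string>:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.functions;

// Sketch: build a word -> count map per row with the SQL aggregate()
// higher-order function, then split the map back into two array columns.
Dataset<Row> result = df
    .withColumn("countMap", functions.expr(
        "aggregate(queryTerms, cast(map() as map<string,int>), " +
        "(acc, w) -> map_concat(" +
        "    map_filter(acc, (k, v) -> k != w), " +
        "    map(w, coalesce(acc[w] + 1, 1))))"))
    .withColumn("queryTerms", functions.map_keys(functions.col("countMap")))
    .withColumn("queryTermsCount", functions.map_values(functions.col("countMap")))
    .drop("countMap");
result.show(false);

Given the sample data above, this should produce the same queryTerms/queryTermsCount pairs as the PySpark version.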