How to do word count on a Dataset where one column is of array&lt;string&gt; type in Java Spark?
Question
I have a Dataset with two columns:
queryTermCounts queryTerms
[1, 1, 1, 1, 1, 1...] [hello, world, spark, hello, hello...]
[1, 1, 1, 1, 1, 1...] [eat, fish, cat, cat, fish...]
[1, 1, 1, 1, 1, 1...] [today, sun, sun, cloud, hello...]
queryTerms contains the words of a document after word segmentation, and queryTermCounts holds the number of occurrences of each word in queryTerms, but every entry is 1.
I want to count the words in queryTerms again, so that each distinct word appears once and its count is stored at the corresponding position in queryTermCounts, like this:
queryTermCounts queryTerms
[3, 1, 1 ...] [hello, world, spark ...]
[2, 2, 1 ...] [fish, cat, eat ...]
[2, 1, 1, 1 ...] [sun, today, cloud, hello...]
I tried to use:
Dataset<Row> words = data.select(functions.explode(functions.split(data.col("queryTerms"), ",\\s*")).as("word"));
Dataset<Row> wordCounts = words.join(input, functions.array_contains(functions.split(input.col("queryTerms"), ",\\s*"), words.col("word")))
.groupBy("word")
.agg(functions.sum("queryTermCounts").as("count"));
It works correctly on a String column, but doesn't work on an array&lt;string&gt; column. Can anyone help me?
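One possible reason the attempt above works on a plain String column but fails on array&lt;string&gt; is that functions.split expects a string column, while queryTerms here is already an array and can be exploded directly. A minimal sketch under that assumption (not from the original post):
// queryTerms is already array<string>, so explode it directly instead of calling split()
Dataset<Row> words = data.select(functions.explode(data.col("queryTerms")).as("word"));
From there the grouping and counting can proceed as in the answers below.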
Answer 1
Score: 1
Update - Using just the queryTerms column, as discussed in the comments:
The Window is there to make sure both lists end up in the same order, since collect_list doesn't guarantee ordering.
Java:
WindowSpec w = Window.orderBy(functions.col("queryTerms")).rowsBetween(Window.unboundedPreceding(), Window.unboundedFollowing());
df.select("queryTerms")
.withColumn("queryTerms", functions.explode(functions.col("queryTerms")))
.groupBy("queryTerms")
.agg(functions.count("queryTerms").alias("queryTermCounts"))
.select(functions.collect_list("queryTerms").over(w).alias("queryTerms"), functions.collect_list("queryTermCounts").over(w).alias("queryTermCounts"))
.select(functions.first("queryTerms").alias("queryTerms"), functions.first("queryTermCounts").alias("queryTermCounts"))
.show(false);
PySpark:
w = Window.orderBy(F.col("queryTerms")).rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df.select("queryTerms")\
.withColumn("queryTerms", F.explode("queryTerms"))\
.groupBy("queryTerms")\
.agg(F.count("queryTerms").alias("queryTermCounts"))\
.select(F.collect_list("queryTerms").over(w).alias("queryTerms"), F.collect_list("queryTermCounts").over(w).alias("queryTermCounts"))\
.select(F.first("queryTerms").alias("queryTerms"), F.first("queryTermCounts").alias("queryTermCounts"))\
.show(truncate=False)
Update - In Java:
df
.withColumn("asStruct", functions.arrays_zip(functions.col("queryTermCounts"), functions.col("queryTerms")))
.withColumn("asStruct", functions.explode(functions.col("asStruct")))
.select(functions.col("asStruct.*"))
.groupBy("queryTerms")
.agg(functions.sum("queryTermCounts").alias("queryTermCounts"))
.withColumn("queryTermCounts", functions.collect_list("queryTermCounts").over(Window.orderBy("queryTerms")))
.withColumn("queryTerms", functions.collect_list("queryTerms").over(Window.orderBy("queryTerms")))
.withColumn("mx", functions.max(functions.size(functions.col("queryTerms"))).over(Window.orderBy(functions.lit("dummy"))))
.filter(functions.size(functions.col("queryTerms")).equalTo(functions.col("mx")))
.drop("mx")
.show(false);
In PySpark this would work (the Java equivalent is in the update above):
df.withColumn('asStruct', F.arrays_zip('queryTermCounts', 'queryTerms'))\
.withColumn('asStruct', F.explode('asStruct'))\
.select(F.col('asStruct.*'))\
.groupBy("queryTerms")\
.agg(F.sum("queryTermCounts").alias("queryTermCounts"))\
.withColumn("queryTermCounts", F.collect_list("queryTermCounts").over(Window.orderBy("queryTerms")))\
.withColumn("queryTerms", F.collect_list("queryTerms").over(Window.orderBy("queryTerms")))\
.withColumn("mx", F.max(F.size("queryTerms")).over(Window.orderBy(F.lit("dummy"))))\
.filter(F.size("queryTerms")==F.col("mx"))\
.drop("mx")\
.show(truncate=False)
Answer 2
Score: 1
I have only used the array functions and map to get the result.
PySpark
from pyspark.sql import SparkSession, functions as f

# f.aggregate below needs Spark 3.1+
spark = SparkSession.builder.getOrCreate()

data = [
[['hello', 'world', 'spark', 'hello', 'hello']],
[['eat', 'fish', 'cat', 'cat', 'fish']],
[['today', 'sun', 'sun', 'cloud', 'hello']]
]
df = spark.createDataFrame(data, ['queryTerms'])
df.withColumn('countMap', f.aggregate(
'queryTerms',
f.create_map().cast('map<string, int>'),
lambda acc, w: f.map_concat(f.map_filter(acc, lambda k, v: k != w), f.create_map(w, f.coalesce(acc[w] + f.lit(1), f.lit(1)))))
) \
.withColumn('queryTerms', f.map_keys('countMap')) \
.withColumn('queryTermsCount', f.map_values('countMap')) \
.drop('countMap') \
.show(truncate=False)
+--------------------------+---------------+
|queryTerms |queryTermsCount|
+--------------------------+---------------+
|[world, spark, hello] |[1, 1, 3] |
|[eat, cat, fish] |[1, 2, 2] |
|[today, sun, cloud, hello]|[1, 2, 1, 1] |
+--------------------------+---------------+
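The question asks for Java. A minimal Java sketch of the same map-based idea, going through functions.expr so the SQL higher-order functions aggregate, map_filter and map_concat (available in Spark 3.x) can be used with the same lambda syntax; this is an adaptation, not part of the original answer:
// Sketch assuming Spark 3.x and that df has an array<string> column queryTerms, as above.
// Build a word -> count map per row, then split it back into two arrays.
Dataset<Row> result = df
    .withColumn("countMap", functions.expr(
        "aggregate(queryTerms, cast(map() as map<string,int>), "
            + "(acc, w) -> map_concat(map_filter(acc, (k, v) -> k != w), map(w, coalesce(acc[w] + 1, 1))))"))
    .withColumn("queryTerms", functions.map_keys(functions.col("countMap")))
    .withColumn("queryTermsCount", functions.map_values(functions.col("countMap")))
    .drop("countMap");
result.show(false);
The per-row result should match the table above.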