PySpark multi join on column names as values

Question
I have a dataset A:
ACCDES | ACIDYR | ACLAS | BMOP |
---|---|---|---|
RA | TIX | 123221 | TA |
RA | TIX | 123221 | TA |
KE | TIX | 123221 | TA |
KE | TIX | 123221 | TA |
KE | REP | 987898 | TA |
KE | REP | 987898 | TA |
ON | REP | 987898 | TA |
ON | REP | 987898 | TA |
ON | MOS | 987898 | TA |
ON | MOS | 6756 | DE |
RA | MOS | 6756 | DE |
I need to enhance this dataset with multiple sdl_id columns from a second dataset B:
domain_name | sdl_id | domain_code |
---|---|---|
ACCDES | 100012 | RA |
ACCDES | 100014 | KE |
ACCDES | 100015 | ON |
ACCDES | 100017 | BE |
ACCDES | 100018 | LO |
ACCDES | 100019 | TE |
ACCDES | 1005313 | NA |
ACCDES | 1005314 | KA |
ACIDYR | 1005316 | TIX |
ACIDYR | 1005317 | REP |
ACIDYR | 1005318 | MOS |
ACIDYR | 1005319 | JIS |
ACIDYR | 1005320 | DEF |
ACIDYR | 1005321 | LIP |
ACIDYR | 1005324 | KER |
ACIDYR | 1005325 | NOS |
ACLAS | 1006537 | 123221 |
ACLAS | 1006538 | 987898 |
ACLAS | 1007631 | 6756 |
BMOP | 1009015 | TA |
BMOP | 1009016 | DE |
I need to join them with the following condition:
B.domain_name = {A_col_name} && A.{A_col_name} == B.domain_code
and take the sdl_id column prefixed with the corresponding A column name (e.g. ACCDES_sdl_id). So the result will look like:
ACCDES | ACIDYR | ACLAS | BMOP | ACCDES_sdl_id | ACIDYR_sdl_id | ACLAS_sdl_id | BMOP_sdl_id |
---|---|---|---|---|---|---|---|
RA | TIX | 123221 | TA | 100012 | 1005316 | 1006537 | 1009015 |
RA | TIX | 123221 | TA | 100012 | 1005316 | 1006537 | 1009015 |
KE | TIX | 123221 | TA | 100014 | 1005316 | 1006537 | 1009015 |
KE | TIX | 123221 | TA | 100014 | 1005316 | 1006537 | 1009015 |
KE | REP | 987898 | TA | 100014 | 1005317 | 1006538 | 1009015 |
KE | REP | 987898 | TA | 100014 | 1005317 | 1006538 | 1009015 |
ON | REP | 987898 | TA | 100015 | 1005317 | 1006538 | 1009015 |
ON | REP | 987898 | TA | 100015 | 1005317 | 1006538 | 1009015 |
ON | MOS | 987898 | TA | 100015 | 1005318 | 1006538 | 1009015 |
ON | MOS | 6756 | DE | 100015 | 1005318 | 1007631 | 1009016 |
RA | MOS | 6756 | DE | 100012 | 1005318 | 1007631 | 1009016 |
My first thought was to loop over the columns of A and join B like this:
for c in A.columns:
    A = A.join(B.filter(col("domain_name") == c), col(c) == col("domain_code"), "left") \
        .select(A["*"], B["sdl_id"].alias(c + '_sdl_id'))
return A
but I am getting error of column sdl_id is ambiguous. I guess there must be some more sophisticated method of joining without looping. Thanks!
Answer 1
Score: 1
You have to drop the duplicated columns after the join.
from pyspark.sql import functions as f

df = spark.read.csv('test1.csv', sep='\t', header=True)   # dataset A
df2 = spark.read.csv('test2.csv', sep='\t', header=True)  # dataset B
cols = df.columns

def join(df, c):
    # Join B only where domain_name matches the current column name,
    # rename sdl_id to its prefixed name, and drop B's key columns
    # right away so the next iteration sees no duplicate names.
    return df \
        .join(df2, (f.col('domain_name') == f.lit(c)) & (f.col('domain_code') == f.col(c)), 'left') \
        .withColumnRenamed('sdl_id', c + '_sdl_id') \
        .drop('domain_name', 'domain_code')

for c in cols:
    df = join(df, c)

df.show(truncate=False)
+------+------+------+----+-------------+-------------+------------+-----------+
|ACCDES|ACIDYR|ACLAS |BMOP|ACCDES_sdl_id|ACIDYR_sdl_id|ACLAS_sdl_id|BMOP_sdl_id|
+------+------+------+----+-------------+-------------+------------+-----------+
|RA |TIX |123221|TA |100012 |1005316 |1006537 |1009015 |
|RA |TIX |123221|TA |100012 |1005316 |1006537 |1009015 |
|KE |TIX |123221|TA |100014 |1005316 |1006537 |1009015 |
|KE |TIX |123221|TA |100014 |1005316 |1006537 |1009015 |
|KE |REP |987898|TA |100014 |1005317 |1006538 |1009015 |
|KE |REP |987898|TA |100014 |1005317 |1006538 |1009015 |
|ON |REP |987898|TA |100015 |1005317 |1006538 |1009015 |
|ON |REP |987898|TA |100015 |1005317 |1006538 |1009015 |
|ON |MOS |987898|TA |100015 |1005318 |1006538 |1009015 |
|ON |MOS |6756 |DE |100015 |1005318 |1007631 |1009016 |
|RA |MOS |6756 |DE |100012 |1005318 |1007631 |1009016 |
+------+------+------+----+-------------+-------------+------------+-----------+
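A loop-free alternative, since B is just a (domain_name, domain_code) → sdl_id lookup: unpivot A into one (column name, value) pair per cell, join B a single time, and pivot the matched ids back into one column per A column. A minimal sketch, assuming df (dataset A) and df2 (dataset B) are freshly read as above and that (domain_name, domain_code) is unique in B; the _row_id column is a helper introduced here only so duplicate rows of A survive the round trip:

from pyspark.sql import functions as f

# _row_id is a hypothetical helper so duplicate rows of A stay distinct.
a = df.withColumn('_row_id', f.monotonically_increasing_id())

# Unpivot A: one (_row_id, domain_name, domain_code) row per cell.
n = len(df.columns)
pairs = ', '.join(f"'{c}', {c}" for c in df.columns)
long = a.selectExpr('_row_id', f"stack({n}, {pairs}) as (domain_name, domain_code)")

# Join B once, then pivot the matched ids back into one column per A column.
ids = long.join(df2, ['domain_name', 'domain_code'], 'left') \
    .groupBy('_row_id') \
    .pivot('domain_name', df.columns) \
    .agg(f.first('sdl_id'))

# Prefix the pivoted columns and reattach them to A.
for c in df.columns:
    ids = ids.withColumnRenamed(c, c + '_sdl_id')
a.join(ids, '_row_id', 'left').drop('_row_id').show(truncate=False)

With only four lookup columns the looped joins above are perfectly fine; this version mainly pays off when A has many columns, because B is joined once instead of once per column.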