create new spark column based on dictionary values

## Question
Say I have a Spark DataFrame in this format:

| currency | value |
| -------- | ----- |
| USD      | 1.00  |
| EUR      | 2.00  |

and a dictionary with the current currency exchange rates (e.g. `{'EUR': 1.00, 'USD': 0.90}`).

I want to add another column, say `value_eur`, based on that dictionary. How would I go about doing that?

I tried the following:

```python
raw_df.withColumn("value_eur", raw_df.value * currency_exchanges[raw_df.currency])
```

but it raises an error:

```
TypeError: unhashable type: 'Column'
```
# Answer 1

**Score**: 0
See the implementation below.

**Input DF** -

```python
from pyspark.sql.types import *

schema = StructType([
    StructField("currency", StringType(), True),
    StructField("value", DoubleType(), True)
])
data = [("USD", 1.00), ("EUR", 2.00)]
raw_df = spark.createDataFrame(data, schema)
```

**Required Output** -

```python
from pyspark.sql.functions import *

currency_exchanges = {'EUR': 1.00, 'USD': 0.90}
currency_df = spark.createDataFrame(list(currency_exchanges.items()), ['currency', 'exchange_rate'])

result_df = raw_df.join(broadcast(currency_df), 'currency', 'left') \
    .withColumn('value_eur', when(col('currency') == 'EUR', col('value')).otherwise(col('value') * col('exchange_rate'))) \
    .drop('exchange_rate')

result_df.show()
```

```
+--------+-----+---------+
|currency|value|value_eur|
+--------+-----+---------+
|     USD|  1.0|      0.9|
|     EUR|  2.0|      2.0|
+--------+-----+---------+
```

Here I've created another `currency_df` from the dictionary and joined `raw_df` with `currency_df` for the required use case.