Unable to write to redshift via PySpark

Question
I am trying to write to Redshift via PySpark. My Spark version is 3.2.0, using Scala version 2.12.15.
I am trying to write as guided here. I have also tried writing via aws_iam_role as explained in the link, but it resulted in the same error. All my dependencies match Scala version 2.12, which is what my Spark build uses.
Environment
Spark 3.2
Scala 2.12.15
Pyspark 3.2.3
Java 11
Ubuntu 22.04 LTS
Python 3.8
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('abc') \
    .config("spark.jars.packages", "com.eclipsesource.minimal-json:minimal-json:0.9.5,com.amazon.redshift:redshift-jdbc42:2.1.0.12,com.google.guava:guava:31.1-jre,com.amazonaws:aws-java-sdk-s3:1.12.437,org.apache.spark:spark-avro_2.12:3.3.2,io.github.spark-redshift-community:spark-redshift_2.12:5.1.0,org.apache.hadoop:hadoop-aws:3.2.2,com.google.guava:failureaccess:1.0") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.hadoop.fs.s3a.access.key", "etc") \
    .config("spark.hadoop.fs.s3a.secret.key", "etc") \
    .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider") \
    .getOrCreate()
df = spark.read.option("header", True) \
    .csv("demo.csv")

df.write \
    .format("io.github.spark_redshift_community.spark.redshift") \
    .option("url", "jdbc:redshift:iam://host:5439/dev?user=user&password=pass") \
    .option("dbtable", "demo") \
    .option("forward_spark_s3_credentials", "True") \
    .option("tempdir", "s3a://mubucket/folder") \
    .mode("append") \
    .save()
It throws this error:
23/03/30 18:51:47 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
23/03/30 18:51:50 WARN Utils$: The S3 bucket demo does not have an object lifecycle configuration to ensure cleanup of temporary files. Consider configuring `tempdir` to point to a bucket with an object lifecycle policy that automatically deletes files after an expiration period. For more information, see https://docs.aws.amazon.com/AmazonS3/latest/dev/object-lifecycle-mgmt.html
23/03/30 18:51:51 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.
23/03/30 18:51:53 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.
23/03/30 18:51:53 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.
23/03/30 18:51:54 ERROR Utils: Aborting task
java.lang.NoSuchMethodError: 'scala.Function1 org.apache.spark.sql.execution.datasources.DataSourceUtils$.createDateRebaseFuncInWrite(scala.Enumeration$Value, java.lang.String)'
Credentials were removed for posting. With the same credentials I am able to create databases/tables, and the same credentials can also create files on S3 and have full access.
I was trying to write to Redshift via Spark. I followed the guide up to that point but was unable to write. I tried multiple times with the different methods provided in the manual, but all resulted in the same error. This is the manual.
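A `NoSuchMethodError` like the one above usually means a dependency was compiled against a different Spark version than the one running: here `org.apache.spark:spark-avro_2.12:3.3.2` is built for Spark 3.3, while the runtime is Spark 3.2.0, so `DataSourceUtils.createDateRebaseFuncInWrite` has a different signature at runtime. A minimal sketch of the kind of sanity check that catches this (the helper function is hypothetical, the package string is the one from the question):

```python
# Hypothetical helper: flag org.apache.spark artifacts in the
# spark.jars.packages string whose major.minor version does not
# match the Spark version actually running.

packages = ("com.eclipsesource.minimal-json:minimal-json:0.9.5,"
            "com.amazon.redshift:redshift-jdbc42:2.1.0.12,"
            "org.apache.spark:spark-avro_2.12:3.3.2,"
            "io.github.spark-redshift-community:spark-redshift_2.12:5.1.0,"
            "org.apache.hadoop:hadoop-aws:3.2.2")

def mismatched_spark_artifacts(packages: str, spark_version: str) -> list:
    """Return Maven coordinates of org.apache.spark artifacts whose
    version does not start with the running Spark's major.minor."""
    major_minor = ".".join(spark_version.split(".")[:2])
    bad = []
    for coord in packages.split(","):
        group, artifact, version = coord.split(":")
        if group == "org.apache.spark" and not version.startswith(major_minor):
            bad.append(coord)
    return bad

print(mismatched_spark_artifacts(packages, "3.2.0"))
# flags org.apache.spark:spark-avro_2.12:3.3.2 on Spark 3.2.0
```

On Spark 3.2.0 this flags `spark-avro_2.12:3.3.2`; pinning `spark-avro` to a 3.2.x release (matching the running Spark) would be the first thing to try.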
Answer 1

Score: 0
It seems this isn't working. For now, I have created a custom solution that writes to S3 via Spark as Parquet and runs a COPY command on the database. I have also opened an issue on GitHub about this. You can view it here.
from pyspark.sql import SparkSession
import psycopg2
import boto3


def query_redshift(current_query, fetch, url):
    # Execute a statement against Redshift over a plain psycopg2 connection.
    conn = psycopg2.connect(url)
    conn.autocommit = True
    cursor = conn.cursor()
    try:
        cursor.execute(current_query)
        if fetch == 1:
            return cursor.fetchall()
    finally:
        cursor.close()
        conn.close()
    print("S3 to Redshift Transfer Successful")


def write_to_redshift(df, folder, arn, tablename, jdbc_url, bucket,
                      aws_access_key_id, aws_secret_access_key):
    # Stage the dataframe in S3 as Parquet, COPY it into Redshift,
    # then delete the staged files from the bucket.
    staging = "s3://" + bucket + "/" + folder
    s3a = staging.replace("s3://", "s3a://")
    df.write.parquet(s3a)
    query = f"""
    COPY {tablename}
    FROM '{staging}'
    CREDENTIALS 'aws_access_key_id={aws_access_key_id};aws_secret_access_key={aws_secret_access_key}'
    FORMAT AS PARQUET;
    """
    try:
        print(query)
        query_redshift(query, 0, jdbc_url)
    except Exception as e:
        print(str(e))
    finally:
        s3 = boto3.resource('s3',
                            aws_access_key_id=aws_access_key_id,
                            aws_secret_access_key=aws_secret_access_key)
        staging_bucket = s3.Bucket(bucket)
        delete = staging_bucket.objects.filter(Prefix=folder + "/").delete()
        print(delete)


def main():
    aws_access_key_id = 'etc'
    aws_secret_access_key = 'etc'
    spark = SparkSession.builder.appName('abc') \
        .config("spark.jars.packages", "com.amazon.redshift:redshift-jdbc42:2.1.0.12,com.google.guava:guava:31.1-jre,org.apache.hadoop:hadoop-aws:3.2.2") \
        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
        .config("spark.hadoop.fs.s3a.access.key", aws_access_key_id) \
        .config("spark.hadoop.fs.s3a.secret.key", aws_secret_access_key) \
        .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider") \
        .getOrCreate()

    df = spark.read.option("header", True) \
        .csv("demo.csv")  # replace with whatever dataframe you have
    df.show()

    tablename = 'public.demo'
    iam_role = ""  # unused here; COPY authenticates with the access keys above
    bucket_name = 'bucket'
    # psycopg2 connection string (keyword/value form, not a JDBC URL)
    jdbc = "host='host' port='5439' dbname='dev' user='user' password='pass' connect_timeout=30000"
    folder = "cache8"
    write_to_redshift(df, folder, iam_role, tablename, jdbc, bucket_name,
                      aws_access_key_id, aws_secret_access_key)


main()
This writes your dataframe to S3 as Parquet, then runs a COPY command on your database from that data, and finally deletes it from the bucket.
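Instead of embedding access keys in the COPY statement, Redshift's COPY also accepts an `IAM_ROLE 'arn'` clause, which avoids putting secrets in query text (and in logs, since the code above prints the query). A small sketch of building the statement either way; the role ARN below is a placeholder, not a real role:

```python
# Build a Redshift COPY statement with either an IAM role or static keys.
# Sketch only; table, bucket, and ARN values are placeholders.

def build_copy(tablename, staging, iam_role=None,
               access_key=None, secret_key=None):
    if iam_role:
        creds = f"IAM_ROLE '{iam_role}'"
    else:
        creds = (f"CREDENTIALS 'aws_access_key_id={access_key};"
                 f"aws_secret_access_key={secret_key}'")
    return f"COPY {tablename} FROM '{staging}' {creds} FORMAT AS PARQUET;"

print(build_copy("public.demo", "s3://bucket/cache8",
                 iam_role="arn:aws:iam::123456789012:role/redshift-copy"))
```

With a role attached to the cluster, the unused `iam_role` parameter in `write_to_redshift` above could drive this branch instead of the key-based `CREDENTIALS` clause.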