Unable to write to Redshift via PySpark

Question

I am trying to write to Redshift via PySpark. My Spark version is 3.2.0, using Scala version 2.12.15.

I am trying to write following the guide here. I have also tried writing via `aws_iam_role`, as explained in the link, but it resulted in the same error. All my dependencies match Scala 2.12, which is the version my Spark is using.

Environment
Spark 3.2
Scala 2.12.15
PySpark 3.2.3
Java 11
Ubuntu 22.04 LTS
Python 3.8

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('abc')\
    .config("spark.jars.packages","com.eclipsesource.minimal-json:minimal-json:0.9.5,com.amazon.redshift:redshift-jdbc42:2.1.0.12,com.google.guava:guava:31.1-jre,com.amazonaws:aws-java-sdk-s3:1.12.437,org.apache.spark:spark-avro_2.12:3.3.2,io.github.spark-redshift-community:spark-redshift_2.12:5.1.0,org.apache.hadoop:hadoop-aws:3.2.2,com.google.guava:failureaccess:1.0")\
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.hadoop.fs.s3a.access.key", "etc") \
    .config("spark.hadoop.fs.s3a.secret.key", "etc") \
    .config('spark.hadoop.fs.s3a.aws.credentials.provider', 'org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider')\
    .getOrCreate()

df=spark.read.option("header",True) \
.csv("demo.csv")

df.write \
  .format("io.github.spark_redshift_community.spark.redshift") \
  .option("url", "jdbc:redshift:iam://host:5439/dev?user=user&password=pass") \
  .option("dbtable", "demo") \
  .option("forward_spark_s3_credentials","True") \
  .option("tempdir", "s3a://mubucket/folder") \
  .mode("append") \
  .save()

It throws this error:

23/03/30 18:51:47 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
23/03/30 18:51:50 WARN Utils$: The S3 bucket demo does not have an object lifecycle configuration to ensure cleanup of temporary files. Consider configuring `tempdir` to point to a bucket with an object lifecycle policy that automatically deletes files after an expiration period. For more information, see https://docs.aws.amazon.com/AmazonS3/latest/dev/object-lifecycle-mgmt.html
23/03/30 18:51:51 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.
23/03/30 18:51:53 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.
23/03/30 18:51:53 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.
23/03/30 18:51:54 ERROR Utils: Aborting task
java.lang.NoSuchMethodError: 'scala.Function1 org.apache.spark.sql.execution.datasources.DataSourceUtils$.createDateRebaseFuncInWrite(scala.Enumeration$Value, java.lang.String)'

Credentials were removed for posting. With the same credentials I am able to create databases and tables, and they also have full access to S3, including creating files.

I was trying to write to Redshift via Spark. I followed the guide step by step but was unable to write. I tried multiple times with the different methods provided in the manual, but all resulted in the same error. This is the manual.
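A `NoSuchMethodError` on an internal Spark class such as `DataSourceUtils` usually means some jar on the classpath was compiled against a different Spark version than the one running. In the package list above, `org.apache.spark:spark-avro_2.12:3.3.2` targets Spark 3.3 while the runtime is Spark 3.2, so it is one plausible suspect; the `spark-redshift` connector version is worth checking against its own compatibility notes as well. The sketch below is not from the original post, and the helper name `mismatched_spark_modules` is hypothetical. It flags any `org.apache.spark` module in a `spark.jars.packages` string whose version differs from the running Spark version (inside a live session you would pass `spark.version`; the example uses the PySpark version from the environment list).

```python
def mismatched_spark_modules(packages: str, spark_version: str) -> list:
    """Return org.apache.spark coordinates whose version differs from spark_version.

    `packages` is a comma-separated list of groupId:artifactId:version
    coordinates, the format used by the spark.jars.packages setting.
    """
    mismatched = []
    for coord in packages.split(","):
        coord = coord.strip()
        parts = coord.split(":")
        if len(parts) != 3:
            continue  # skip anything that is not groupId:artifactId:version
        group, _artifact, version = parts
        # Spark's own modules (e.g. spark-avro) are released with the same version as Spark
        if group == "org.apache.spark" and version != spark_version:
            mismatched.append(coord)
    return mismatched


# The package list from the question
packages = (
    "com.eclipsesource.minimal-json:minimal-json:0.9.5,"
    "com.amazon.redshift:redshift-jdbc42:2.1.0.12,"
    "com.google.guava:guava:31.1-jre,"
    "com.amazonaws:aws-java-sdk-s3:1.12.437,"
    "org.apache.spark:spark-avro_2.12:3.3.2,"
    "io.github.spark-redshift-community:spark-redshift_2.12:5.1.0,"
    "org.apache.hadoop:hadoop-aws:3.2.2,"
    "com.google.guava:failureaccess:1.0"
)
print(mismatched_spark_modules(packages, "3.2.3"))
# ['org.apache.spark:spark-avro_2.12:3.3.2']
```

The check only covers Spark's own modules. Third-party connectors such as `spark-redshift` use their own version numbers, so their supported Spark versions have to be looked up separately.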

Answer 1

Score: 0

It seems this isn't working. For now, I have created a custom workaround that writes the DataFrame to S3 as Parquet via Spark and then runs a COPY command on the database. I have also opened an issue on GitHub about this; you can view it here.

from pyspark.sql import SparkSession
import psycopg2
import boto3


def query_redshift(current_query, fetch, url):
    # `url` is a libpq/psycopg2 connection string, not a JDBC URL
    conn = psycopg2.connect(url)
    conn.autocommit = True
    try:
        with conn.cursor() as cursor:
            cursor.execute(current_query)
            if fetch == 1:
                return cursor.fetchall()
    finally:
        # Close the connection on every path, including the fetch/return path
        conn.close()


def write_to_redshift(df, folder, arn, tablename, jdbc_url, bucket,
                      aws_access_key_id, aws_secret_access_key):
    # Note: `arn` is not used; the COPY below authorizes with the access keys
    staging = f"s3://{bucket}/{folder}"
    # Spark writes through the s3a:// connector; Redshift COPY expects s3:// URIs
    s3a = staging.replace("s3://", "s3a://")
    # Overwrite leftovers from an earlier run that failed before cleanup
    df.write.mode("overwrite").parquet(s3a)

    query = f"""
COPY {tablename}
FROM '{staging}'
CREDENTIALS 'aws_access_key_id={aws_access_key_id};aws_secret_access_key={aws_secret_access_key}'
FORMAT AS PARQUET;
"""

    try:
        # Don't print `query`: it contains the secret access key
        query_redshift(query, 0, jdbc_url)
        print("S3 to Redshift transfer successful")
    except Exception as e:
        print(str(e))
    finally:
        # Delete the staged Parquet files whether or not COPY succeeded
        s3 = boto3.resource('s3',
                            aws_access_key_id=aws_access_key_id,
                            aws_secret_access_key=aws_secret_access_key)
        deleted = s3.Bucket(bucket).objects.filter(Prefix=folder + "/").delete()
        print(deleted)


def main():
    aws_access_key_id = 'etc'
    aws_secret_access_key = 'etc'
    spark = SparkSession.builder.appName('abc') \
        .config("spark.jars.packages", "com.amazon.redshift:redshift-jdbc42:2.1.0.12,com.google.guava:guava:31.1-jre,org.apache.hadoop:hadoop-aws:3.2.2") \
        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
        .config("spark.hadoop.fs.s3a.access.key", aws_access_key_id) \
        .config("spark.hadoop.fs.s3a.secret.key", aws_secret_access_key) \
        .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider") \
        .getOrCreate()

    # Replace with whatever DataFrame you have
    df = spark.read.option("header", True).csv("demo.csv")
    df.show()

    tablename = 'public.demo'
    iam_role = ""
    bucket_name = 'bucket'

    # libpq/psycopg2 connection string (connect_timeout is in seconds)
    jdbc = "host='host' port='5439' dbname='dev' user='user' password='pass' connect_timeout=30000"
    folder = "cache8"

    write_to_redshift(df, folder, iam_role, tablename, jdbc, bucket_name,
                      aws_access_key_id, aws_secret_access_key)


if __name__ == "__main__":
    main()

This writes your DataFrame to S3 as Parquet, runs a COPY command on your database to load that data, and then deletes the staged files from the bucket.
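The answer's `write_to_redshift` takes an `arn` argument but never uses it, and the question mentions trying `aws_iam_role`. A minimal sketch, assuming an IAM role that is associated with the Redshift cluster and allowed to read the staging bucket, of how the COPY statement could use Redshift's `IAM_ROLE` clause when a role ARN is available and fall back to the key-based `CREDENTIALS` string otherwise. The helper name `build_copy_query` and the example ARN are hypothetical placeholders, not part of the original answer.

```python
from typing import Optional


def build_copy_query(
    tablename: str,
    staging: str,
    iam_role: Optional[str] = None,
    access_key: Optional[str] = None,
    secret_key: Optional[str] = None,
) -> str:
    """Build a Redshift COPY statement that loads Parquet files from an s3:// prefix."""
    if iam_role:
        # Redshift assumes this role to read the files; no keys appear in the SQL text
        auth = f"IAM_ROLE '{iam_role}'"
    elif access_key and secret_key:
        # Key-based authorization, as in the answer's original query
        auth = (
            f"CREDENTIALS 'aws_access_key_id={access_key};"
            f"aws_secret_access_key={secret_key}'"
        )
    else:
        raise ValueError("provide either iam_role or both access_key and secret_key")
    return f"COPY {tablename}\nFROM '{staging}'\n{auth}\nFORMAT AS PARQUET;"


# Hypothetical role ARN, for illustration only
print(build_copy_query("public.demo", "s3://bucket/cache8",
                       iam_role="arn:aws:iam::123456789012:role/demo"))
```

Using a role keeps the secret key out of the SQL text, so the statement can be logged safely; the Spark side still needs its own S3 credentials to write the staged Parquet files.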

huangapple
  • Posted on March 31, 2023 at 03:01:11
  • If reposting, please keep the link to this article: https://go.coder-hub.com/75892033.html