Spark ETL Large data transfer - how to parallelize

Question

I am looking to move a large amount of data from one database to another, and I have seen that Spark is a good tool for doing this. I am trying to understand the process and the ideas behind Spark's big-data ETLs. I would also appreciate it if someone could explain how Spark goes about parallelizing (or splitting) the data into the various jobs that it spawns. My main aim is to move the data from BigQuery to Amazon Keyspaces, and the data is around 40 GB.

I am putting down here the understanding I have already gathered from online sources.

This is the code to read the data from BigQuery.

from pyspark.sql import SparkSession

spark = SparkSession \
  .builder \
  .master('yarn') \
  .appName('spark-bigquery-ks') \
  .getOrCreate()

spark.conf.set("credentialsFile", "./cred.json")

# Load data from BigQuery.
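# Note: 'table' and 'query' appear to be alternative ways of specifying the
# source in the spark-bigquery connector; if the 'query' option is used, the
# connector also expects viewsEnabled=true and a materializationDataset, so
# normally only one of the two options below would be kept.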
df = spark.read.format('bigquery') \
  .option('parentProject','project-id') \
  .option('dataset', 'mydataset') \
  .option('table', 'mytable') \
  .option('query', 'SELECT * from mytable LIMIT 10') \
  .load()


print(df.head())
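
From what I understand so far, Spark splits whatever it reads into partitions and runs one task per partition, so the parallelism of the job roughly follows the partition count. A quick way to inspect (and adjust) that seems to be something like the snippet below; the 200 is just an illustrative number, not a recommendation:

# Number of partitions (and therefore parallel tasks) produced by the read.
print(df.rdd.getNumPartitions())

# Optionally repartition before heavy transformations or the write stage so the
# work is spread more evenly across executors; 200 is only an example value.
df = df.repartition(200)
print(df.rdd.getNumPartitions())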

Now I need to figure out the best way to transform the data (which is super easy, and I can do that myself), but my most important question is about batching and handling such a large data set. Are there any considerations I need to keep in mind when moving data that won't fit in memory to Keyspaces?

from ssl import SSLContext, CERT_REQUIRED, PROTOCOL_TLSv1_2
import boto3
from boto3 import Session
from cassandra_sigv4.auth import AuthProvider, Authenticator, SigV4AuthProvider
from cassandra.cluster import Cluster, ExecutionProfile, EXEC_PROFILE_DEFAULT


ssl_context = SSLContext(PROTOCOL_TLSv1_2)
ssl_context.load_verify_locations('./AmazonRootCA1.pem')
ssl_context.verify_mode = CERT_REQUIRED
boto_session = boto3.Session(aws_access_key_id="accesstoken",
                             aws_secret_access_key="accesskey",
                             aws_session_token="token",
                             region_name="region")
auth_provider = SigV4AuthProvider(boto_session)
cluster = Cluster(['cassandra.region.amazonaws.com'], ssl_context=ssl_context, auth_provider=auth_provider,
                  port=9142)
session = cluster.connect()

And finally, pushing the data to Keyspaces would look something like this:

# Write data to Amazon Keyspaces, looping row by row over a pandas DataFrame
# on the driver (pdf is assumed to be something like df.toPandas()).
for index, row in pdf.iterrows():
    keyfamilyid = row["keyfamilyid"]
    recommendedfamilyid = row["recommendedfamilyid"]
    rank = row["rank"]
    chi = row["chi"]
    recommendationtype = row["recommendationtype"]
    title = row["title"]
    location = row["location"]
    typepriority = row["typepriority"]
    customerid = row["customerid"]
    insert_query = f"INSERT INTO {keyspace_name}.{table_name} (keyfamilyid, recommendedfamilyid, rank, chi, recommendationtype, title, location, typepriority, customerid) VALUES ('{keyfamilyid}', '{recommendedfamilyid}', {rank}, {chi}, '{recommendationtype}', '{title}', '{location}', '{typepriority}', '{customerid}')"
    try:
        # 'session' is the Keyspaces session created above.
        session.execute(insert_query)
    except Exception as e:
        # The Cassandra driver raises its own exceptions, not boto3's ClientError.
        print(f"Error writing data for row {index}: {e}")

Answer 1

Score: 2


You can use the Spark Cassandra Connector to copy data to Amazon Keyspaces. In the following example I write to Keyspaces from S3, but BigQuery would be similar. The most important things when writing to Keyspaces are the following.

  1. Prewarm the tables using provisioned capacity mode. Provision a high amount of capacity to ensure the table has enough resources for the initial write rate (see the CQL sketch right after this list).

  2. Shuffle the data coming from the source (BigQuery), since it will most likely be exported in sorted order. When inserting into NoSQL you should write in a random access pattern.
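
For step 1, one way to put the table into provisioned capacity mode before the bulk load is through Keyspaces' custom table properties. The following is a sketch in Python, reusing a SigV4-authenticated driver session like the one in the question; the keyspace/table names and capacity units are placeholders you would size for your own write rate:

# Pre-provision capacity on the target table before the load (values are placeholders).
provision_cql = """
ALTER TABLE mykeyspace.mytable
WITH custom_properties = {
  'capacity_mode': {
    'throughput_mode': 'PROVISIONED',
    'read_capacity_units': 1000,
    'write_capacity_units': 5000
  }
}
"""
session.execute(provision_cql)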

object GlueApp {

  def main(sysArgs: Array[String]) {

  val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME", "KEYSPACE_NAME", "TABLE_NAME", "DRIVER_CONF", "FORMAT", "S3_URI").toArray)

  val driverConfFileName = args("DRIVER_CONF")

  val conf = new SparkConf()
      .setAll(
        Seq(
          ("spark.task.maxFailures", "10"),
          ("spark.cassandra.connection.config.profile.path", driverConfFileName),
          ("spark.cassandra.query.retry.count", "1000"),
          ("spark.cassandra.output.consistency.level", "LOCAL_QUORUM"),
          ("spark.cassandra.sql.inClauseToJoinConversionThreshold", "0"),
          ("spark.cassandra.sql.inClauseToFullScanConversionThreshold", "0"),
          ("spark.cassandra.concurrent.reads", "512"),
          ("spark.cassandra.output.concurrent.writes", "15"),
          ("spark.cassandra.output.batch.grouping.key", "none"),
          ("spark.cassandra.output.batch.size.rows", "1")
        ))

    val spark: SparkContext = new SparkContext(conf)
    val glueContext: GlueContext = new GlueContext(spark)
    val sparkSession: SparkSession = glueContext.getSparkSession

    import sparkSession.implicits._

    Job.init(args("JOB_NAME"), glueContext, args.asJava)
    
    val logger = new GlueLogger
    
    //validation steps for peers and partitioner 
    val connector = CassandraConnector.apply(conf);
    val session = connector.openSession();
    val peersCount = session.execute("SELECT * FROM system.peers").all().size()
    
    val partitioner = session.execute("SELECT partitioner from system.local").one().getString("partitioner")
    
    logger.info("Total number of seeds:" + peersCount);
    logger.info("Configured partitioner:" + partitioner);
    
    if(peersCount == 0){
       throw new Exception("No system peers found. Check required permissions to read from the system.peers table. If using VPCE check permissions for describing VPCE endpoints. https://docs.aws.amazon.com/keyspaces/latest/devguide/vpc-endpoints.html")
    }
    
    if(partitioner.equals("com.amazonaws.cassandra.DefaultPartitioner")){
        throw new Exception("Sark requires the use of RandomPartitioner or Murmur3Partitioner. See Working with partioners in Amazon Keyspaces documentation. https://docs.aws.amazon.com/keyspaces/latest/devguide/working-with-partitioners.html")
    }
    
    val tableName = args("TABLE_NAME")
    val keyspaceName = args("KEYSPACE_NAME")
    val backupFormat = args("FORMAT")
    val s3bucketBackupsLocation = args("S3_URI")	


    val orderedData = sparkSession.read.format(backupFormat).load(s3bucketBackupsLocation)

    // You want to randomize the data before loading to maximize table throughput and avoid WriteThrottleEvents.
    // Data exported from another database or Cassandra may be ordered by primary key.
    // With Amazon Keyspaces you want to load data in a random way to use all available resources.
    // The following command will randomize the data.
    val shuffledData = orderedData.orderBy(rand())

    shuffledData.write.format("org.apache.spark.sql.cassandra").mode("append").option("keyspace", keyspaceName).option("table", tableName).save()

   Job.commit()
  }
}
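
Since the question is written in PySpark rather than Scala, the equivalent shuffle-and-write would look roughly like the sketch below. It assumes the Spark Cassandra Connector is on the classpath and configured on the same SparkSession that performs the BigQuery read; df is the DataFrame loaded from BigQuery in the question, and the keyspace/table names are placeholders:

from pyspark.sql.functions import rand

# Connector settings mirroring the Scala example above; they would be set on the
# SparkSession (or passed via --conf) before the job runs:
#   spark.cassandra.connection.config.profile.path = <driver config profile>
#   spark.cassandra.output.consistency.level       = LOCAL_QUORUM
#   spark.cassandra.output.batch.grouping.key      = none
#   spark.cassandra.output.batch.size.rows         = 1

# Randomize row order so writes are spread across Keyspaces partitions,
# then append through the connector.
shuffled = df.orderBy(rand())

shuffled.write \
    .format("org.apache.spark.sql.cassandra") \
    .mode("append") \
    .option("keyspace", "mykeyspace") \
    .option("table", "mytable") \
    .save()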
