Databricks PySpark: How to get the list of tables in an external MySQL database and create DataFrames?

Question

I am working in Azure Databricks with PySpark.
I need to connect to the ABC database on an external MySQL server and copy all of its tables to Azure Databricks.
I wrote the code below, but it only returns the number of tables in that database.

%pip install pymysql

import pymysql

con = pymysql.connect(host='XXXXX', password='XXXXX', port=3306, user='my_username', charset='utf8mb4', database='ABC')

cursor = con.cursor()

cursor.execute('show tables')

It only returns the output: 41

How can I get the list of tables in that database?
How can I create DataFrames for these 41 tables in a batch?

I hope to get the list of table names under the ABC database, then create DataFrames for these 41 tables in a batch and store them in the Databricks dev database.

Answer 1

Score: 0

I do this and it works fine for me:

from pyspark.sql import SparkSession
import pymysql

# Establish a connection to the MySQL server and retrieve the table names
connection = pymysql.connect(
    host='XXXXX',
    port=3306,
    user='my_username',
    password='XXXXX',
    charset='utf8mb4',
    database='ABC'
)

cursor = connection.cursor()

# Retrieve table names
cursor.execute('SHOW TABLES')
table_names = [table[0] for table in cursor.fetchall()]

cursor.close()
connection.close()

spark = SparkSession.builder \
    .appName("MySQL to DataFrame") \
    .getOrCreate()

databricks_database = "dev"

# Iterate over table names and create data frames
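for table_name in table_names:
    query = f"SELECT * FROM ABC.{table_name}"
    # A SELECT statement goes in the "query" option; "dbtable" expects a table name
    # or a parenthesized subquery with an alias.
    # "com.mysql.jdbc.Driver" is the legacy class name; Connector/J 8+ uses "com.mysql.cj.jdbc.Driver".
    df = spark.read.format("jdbc").option("url", "jdbc:mysql://XXXXX:3306/ABC") \
        .option("user", "my_username") \
        .option("password", "XXXXX") \
        .option("query", query) \
        .option("driver", "com.mysql.jdbc.Driver") \
        .load()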

    # Save data frame as a table in the Databricks database
    df.write.mode("overwrite").saveAsTable(f"{databricks_database}.{table_name}")
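
For what it's worth, the reason the code in the question prints 41 is that a PyMySQL cursor's execute() returns the number of rows, while the rows themselves come from fetchall(). Below is a rough sketch of the same approach split into small helpers, so that one failing table does not abort the whole batch. The names fetch_table_names, copy_tables and make_spark_copier are made up for illustration and are not part of PySpark or PyMySQL. The sketch assumes a DB-API cursor and an existing SparkSession are passed in, and that the MySQL JDBC driver is available on the cluster.

```python
from typing import Callable, Dict, Iterable, List, Tuple


def fetch_table_names(cursor) -> List[str]:
    """Return table names from a DB-API cursor (for example a PyMySQL cursor)."""
    # execute() on a PyMySQL cursor returns the row count, not the rows.
    cursor.execute("SHOW TABLES")
    # fetchall() returns one tuple per row; the table name is the first column.
    return [row[0] for row in cursor.fetchall()]


def copy_tables(
    table_names: Iterable[str],
    copy_one: Callable[[str], None],
) -> Tuple[List[str], Dict[str, str]]:
    """Run copy_one for every table and collect failures instead of stopping early."""
    copied: List[str] = []
    failed: Dict[str, str] = {}
    for name in table_names:
        try:
            copy_one(name)
            copied.append(name)
        except Exception as exc:  # keep going so one bad table does not abort the batch
            failed[name] = str(exc)
    return copied, failed


def make_spark_copier(
    spark, jdbc_url: str, user: str, password: str, target_db: str
) -> Callable[[str], None]:
    """Build a copy_one function around an existing SparkSession (hypothetical helper)."""

    def copy_one(table_name: str) -> None:
        df = (
            spark.read.format("jdbc")
            .option("url", jdbc_url)
            .option("user", user)
            .option("password", password)
            .option("dbtable", table_name)  # a plain table name is valid for dbtable
            .load()
        )
        # Overwrite the target table so the batch can be re-run safely.
        df.write.mode("overwrite").saveAsTable(f"{target_db}.{table_name}")

    return copy_one
```

In a notebook this could be called as copied, failed = copy_tables(fetch_table_names(cursor), make_spark_copier(spark, "jdbc:mysql://XXXXX:3306/ABC", "my_username", "XXXXX", "dev")), and then failed can be inspected for the tables that need another look.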

huangapple
  • Posted on June 12, 2023 at 14:46:59
  • When reposting, please keep the link to this article: https://go.coder-hub.com/76454167.html