Databricks Pyspark: How to get the list of tables in external MySQL and create data frame?

Question
Working in Azure Databricks, Pyspark.
I need to connect to an external MySQL server's ABC database and copy all the tables under that database into Azure Databricks.
I wrote the code below, but it only returns the number of tables in that database.
pip install pymysql

import pymysql
con = pymysql.connect(host='XXXXX', password='XXXXX', port=3306, user='my_username', charset='utf8mb4', database='ABC')
cursor = con.cursor()
cursor.execute('show tables')
It only returns Output: 41.
How can I get the list of tables in that database?
How can I create data frames for these 41 tables in batches?
I hope to get the list of table names under the ABC database, and to create data frames for these 41 tables in batches and store them in the Databricks dev database.
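For context on the bare 41: in the Python DB-API, `cursor.execute()` does not return the result rows (pymysql returns a row count from it); the rows only come back from `cursor.fetchall()`. A minimal, self-contained sketch of that pattern, using the stdlib sqlite3 module in place of pymysql (the two table names are made up for illustration):

```python
import sqlite3

# In-memory database standing in for the MySQL server;
# the two table names below are made up for illustration
con = sqlite3.connect(":memory:")
cur = con.cursor()
cur.execute("CREATE TABLE orders (id INTEGER)")
cur.execute("CREATE TABLE customers (id INTEGER)")

# Executing the query alone is not enough -- fetchall() returns the rows
cur.execute("SELECT name FROM sqlite_master WHERE type='table'")
table_names = [row[0] for row in cur.fetchall()]
print(sorted(table_names))  # ['customers', 'orders']
con.close()
```

The same execute-then-fetchall shape applies to pymysql's `SHOW TABLES`.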
Answer 1
Score: 0
I do this and it works fine for me:
from pyspark.sql import SparkSession
import pymysql

# Establish a connection to the MySQL server and retrieve the table names
connection = pymysql.connect(
    host='XXXXX',
    port=3306,
    user='my_username',
    password='XXXXX',
    charset='utf8mb4',
    database='ABC'
)
cursor = connection.cursor()

# execute() only returns a row count; fetchall() returns the rows themselves
cursor.execute('SHOW TABLES')
table_names = [table[0] for table in cursor.fetchall()]
cursor.close()
connection.close()

spark = SparkSession.builder \
    .appName("MySQL to DataFrame") \
    .getOrCreate()

databricks_database = "dev"

# Iterate over the table names and create a data frame per table
for table_name in table_names:
    # dbtable must be a table name (or a parenthesized subquery), not a bare SELECT
    df = spark.read.format("jdbc") \
        .option("url", "jdbc:mysql://XXXXX:3306/ABC") \
        .option("user", "my_username") \
        .option("password", "XXXXX") \
        .option("dbtable", table_name) \
        .option("driver", "com.mysql.cj.jdbc.Driver") \
        .load()

    # Save the data frame as a table in the Databricks dev database
    df.write.mode("overwrite").saveAsTable(f"{databricks_database}.{table_name}")
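If "by batch" means handling the 41 tables in fixed-size groups rather than strictly one at a time, the name list from SHOW TABLES can be split into chunks first and the JDBC read loop run per chunk. A small sketch of just the chunking step (the batch size of 10 and the placeholder table names are arbitrary assumptions):

```python
def chunked(items, size):
    """Yield successive fixed-size batches from a list."""
    for start in range(0, len(items), size):
        yield items[start:start + size]

# Placeholder names standing in for the 41 tables returned by SHOW TABLES
table_names = [f"table_{i}" for i in range(41)]

batches = list(chunked(table_names, 10))
print(len(batches))      # 5 batches: four of 10 tables and one of 1
print(len(batches[-1]))  # 1
```

Each batch can then be passed through the read-and-save loop above.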