Can't achieve desired directory structure when writing data from Kafka topic to HDFS using PySpark
Question
I am reading data from a Kafka topic and storing the data to HDFS using PySpark. I am trying to achieve the following directory structure in HDFS when I run the code:
user/hadoop/OUTPUT_FINAL1/year/month/day/hour/
    csv/    (all CSV files, written as Parquet)
    images/ (all image files)
    videos/ (all video files)
However, when I run my code, the directory structure I see in HDFS is the following:
user/hadoop/OUTPUT_FINAL1/year/month/day/hour/
    csv/file_type=csv/ (CSV data) and file_type=image/ (image data)
My code is not writing the desired directory structure and it is not writing the video files.
CSV files need to be written with the same name but with the .parquet extension.
Images need to be written with the .jpg or .png extension, and likewise videos with the .mp4 or .mov extension.
Here is my code:
import logging
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import pyarrow.hdfs as hdfs
import os
# Configure logging
logging.getLogger("py4j").setLevel(logging.ERROR)
logging.getLogger("org.apache.kafka").setLevel(logging.WARN)
# Initialize SparkSession
spark = SparkSession.builder \
    .appName("KafkaToHDFS") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.0") \
    .getOrCreate()
# Kafka broker configuration
kafka_server = "sandbox-hdf.hortonworks.com:6667"
kafka_topic = "airbnbdata-final"
# Read data from Kafka topic
df = spark.read \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_server) \
    .option("subscribe", kafka_topic) \
    .load()
# Define a function to create the output directory
def create_directory(directory):
    current_time = spark.sql("SELECT current_timestamp()").collect()[0][0]
    # Extract year, month, day, and hour components
    year_str = current_time.strftime("%Y")
    month_str = current_time.strftime("%m")
    day_str = current_time.strftime("%d")
    hour_str = current_time.strftime("%H")
    fulldirectory = directory + "/" + year_str + "/" + month_str + "/" + day_str + "/" + hour_str + "/"
    return fulldirectory
# Define a function to process each batch
def process_batch(df, fulldirectory):
    schema = StructType().add("file_type", StringType()).add("file_name", StringType()).add("data", StringType())
    # Extract the key, value, and other columns
    extracted_df = df.selectExpr(
        "CAST(key AS STRING)", "CAST(value AS STRING)", "topic", "partition", "offset", "timestamp", "timestampType"
    )
    # Parse the JSON string in the value column
    extracted_df = extracted_df.withColumn("value_json", from_json(col("value"), schema))
    # Extract fields from the value_json column
    extracted_df = extracted_df.withColumn("file_type", col("value_json.file_type"))
    extracted_df = extracted_df.withColumn("file_name", col("value_json.file_name"))
    extracted_df = extracted_df.withColumn("data", col("value_json.data"))
    extracted_df.printSchema()
    return extracted_df
# Define a custom function to write each file to HDFS
def write_file(row, fulldirectory):
    fs = hdfs.connect(host="sandbox-hdp.hortonworks.com", port=8020)
    file_name = row["file_name"]
    data = row["data"]
    hdfs_path = fulldirectory + file_name
    with fs.open(hdfs_path, "wb") as f:
        f.write(data)
# Define a function to process images and videos
def process_images_and_videos(df, fulldirectory):
    # Filter the dataframe to get only images and videos
    image_df = df.filter(col("file_type").isin(["jpg", "png"]))
    video_df = df.filter(col("file_type").isin(["mp4", "mov"]))
    # Write images to HDFS
    image_df.foreach(lambda row: write_file(row, fulldirectory + "images/"))
    # Write videos to HDFS
    video_df.foreach(lambda row: write_file(row, fulldirectory + "videos/"))
# Configure the directory path and create the output directory
directory_path = "hdfs://sandbox-hdp.hortonworks.com:8020/user/hadoop/OUTPUT_FINAL1"
fulldirectory = create_directory(directory_path)
# Process the batch
processed_df = process_batch(df, fulldirectory)
# Write each batch to HDFS as Parquet
processed_df.write \
    .format("parquet") \
    .mode("append") \
    .partitionBy("file_type", "file_name") \
    .parquet(fulldirectory + "csv/")
# Process images and videos
process_images_and_videos(processed_df, fulldirectory)
# Stop the SparkSession
spark.stop()
I run my code in my Ambari sandbox environment over SSH using the following command:
spark-submit --jars /tmp/Airbnb_Data/jsr305-3.0.2.jar,/tmp/Airbnb_Data/snappy-java-1.1.8.4.jar,/tmp/Airbnb_Data/kafka-clients-3.3.2.jar,/tmp/Airbnb_Data/spark-sql-kafka-0-10_2.11-2.4.0.jar,/tmp/Airbnb_Data/commons-logging-1.1.3.jar,/tmp/Airbnb_Data/spark-streaming-kafka-0-10_2.12-3.4.0.jar,/tmp/Airbnb_Data/hadoop-client-api-3.3.4.jar,/tmp/Airbnb_Data/lz4-java-1.8.0.jar,/tmp/Airbnb_Data/hadoop-client-runtime-3.3.4.jar,/tmp/Airbnb_Data/scala-library-2.12.17.jar,/tmp/Airbnb_Data/spark-tags_2.12-3.4.0.jar,/tmp/Airbnb_Data/spark-token-provider-kafka-0-10_2.12-3.4.0.jar,/tmp/Airbnb_Data/slf4j-api-1.7.36.jar KafkaToHDFS.py
Here is my desired directory structure:
user/hadoop/OUTPUT_FINAL1/
├── year/
│   ├── month/
│   │   ├── day/
│   │   │   ├── hour/
│   │   │   │   ├── csv/
│   │   │   │   │   └── <csv parquet files>
│   │   │   │   ├── images/
│   │   │   │   │   └── <image files>
│   │   │   │   └── videos/
│   │   │   │       └── <video files>
Answer 1
Score: 1
Spark cannot write binary image or video data.
Calling pyarrow functions inside a Spark foreach does not really use Spark's filesystem writer. Plus, I believe pyarrow was released after HDP 2.6.5 support ended, so I don't think that function will work there.
Regarding the directory structure, let Spark do that for you by partitioning on individual date columns, rather than collecting the batch and extracting one element.
Then you'll have year=xxxx/month=xx/day=xx/hour=xx/file_type=xxx/ data.
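As a minimal sketch of that approach (not your exact code; it assumes the processed_df DataFrame from your question and derives the date parts from the Kafka timestamp column):
# Sketch: derive date columns from the Kafka message timestamp and let Spark
# build the directory layout via partitionBy instead of concatenating path strings.
from pyspark.sql.functions import col, year, month, dayofmonth, hour

partitioned_df = processed_df \
    .withColumn("year", year(col("timestamp"))) \
    .withColumn("month", month(col("timestamp"))) \
    .withColumn("day", dayofmonth(col("timestamp"))) \
    .withColumn("hour", hour(col("timestamp")))

partitioned_df.write \
    .mode("append") \
    .partitionBy("year", "month", "day", "hour", "file_type") \
    .parquet("hdfs://sandbox-hdp.hortonworks.com:8020/user/hadoop/OUTPUT_FINAL1")
Each partition column then becomes a year=.../month=.../day=.../hour=.../file_type=... directory that Spark creates and manages for you.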
> Images need to be written with the .jpg or .png extension, and likewise videos with .mp4 or .mov.
HDFS is not a good location for such files. Use blob storage for media assets.