How can we read historical data using Databricks from Kinesis or Kafka by specifying starting and ending timestamps?
Question
Let's say I'd like to read the data that arrived in the period from 8 Mar 2023 to 14 Mar 2023.
Is there a way to define an ending position along with initialPosition in the query below?
spark.readStream.format("kinesis") \
    .option("streamName", kinesisStreamName) \
    .option("region", kinesisRegion) \
    .option("initialPosition", '{"at_timestamp": "03/08/2023 00:00:00 PDT", "format": "MM/dd/yyyy HH:mm:ss ZZZ"}') \
    .option("awsAccessKey", awsAccessKeyId) \
    .option("awsSecretKey", awsSecretKey) \
    .load()
Answer 1
Score: 2
I think what you are looking for is batch processing, not stream processing, since what you describe is essentially a backfill job.
Unfortunately, there is no end-position option you can set on a Spark Structured Streaming source for Kafka or Kinesis.
Some suggestions:
1- If you have the option of switching from Kinesis to Kafka, you can use a batch read with spark.read.format("kafka") instead of spark.readStream.format("kafka"), and bound it with the parameters below (see the first sketch at the end of this answer).
.option("startingOffsets", start_offset) \
.option("endingOffsets", end_offset) \
2- If Kinesis usage is required, you can land the Kinesis stream's data in an S3 path and then consume those data files with Spark, using a start/end condition in a where clause. (I would recommend a push-down predicate, such as AWS Glue's push_down_predicate, so you don't read all the data; see the second sketch at the end of this answer.)
Thanks.
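Below is a minimal sketch of suggestion 1: a bounded batch read from Kafka. The broker address, topic name, per-partition offset JSON, and the timestamp filter are placeholders rather than values from the question; recent Spark versions also document startingOffsetsByTimestamp / endingOffsetsByTimestamp if bounding by time is more convenient than by offset.

# Sketch of suggestion 1: a bounded *batch* read from Kafka instead of a stream.
# The broker, topic, and offset JSON are placeholders -- adjust them to your setup.
# Per-partition offsets are given as JSON: {"topic": {"partition": offset}}.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("kafka-backfill").getOrCreate()

start_offset = '{"events": {"0": 120000}}'   # first offset to read, per partition
end_offset = '{"events": {"0": 250000}}'     # offset to stop at, per partition

df = (spark.read                             # batch read, not readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker1:9092")   # placeholder broker
      .option("subscribe", "events")                       # placeholder topic
      .option("startingOffsets", start_offset)
      .option("endingOffsets", end_offset)
      .load())

# Kafka records arrive as binary key/value plus metadata such as `timestamp`;
# cast the payload and, if needed, filter again on the record timestamp.
result = (df.selectExpr("CAST(value AS STRING) AS value", "timestamp")
          .where(col("timestamp").between("2023-03-08", "2023-03-15")))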
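And a minimal sketch of suggestion 2, assuming the Kinesis stream's records are already being delivered to S3 (for example via Kinesis Data Firehose). The bucket path, the event_time column, and the year/month/day partition layout are assumptions; match them to however your delivery stream actually writes the data.

# Sketch of suggestion 2: read the files the Kinesis stream has landed in S3 and
# keep only the 8-14 March 2023 window. Paths and column names are assumptions.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("kinesis-s3-backfill").getOrCreate()

# Plain filter on an assumed event_time column carried in each record.
df = (spark.read
      .json("s3://my-bucket/kinesis-landing/")             # placeholder path
      .where(col("event_time").between("2023-03-08 00:00:00",
                                       "2023-03-14 23:59:59")))

# If the files are laid out under year=/month=/day= partitions, filtering on the
# partition columns lets Spark prune whole prefixes instead of scanning everything,
# which is the same idea as the Glue push-down predicate mentioned above.
df_pruned = (spark.read
             .parquet("s3://my-bucket/kinesis-landing-parquet/")   # placeholder path
             .where("year = 2023 AND month = 3 AND day BETWEEN 8 AND 14"))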