How can we read historical data using Databricks from Kinesis or Kafka by specifying a starting and ending timestamp?


Question

Let's say I'd like to read data that arrived in the period between 8 March 2023 and 14 March 2023.

Is there a way to define an ending position along with initialPosition in the snippet below?

    spark.readStream \
      .format("kinesis") \
      .option("streamName", kinesisStreamName) \
      .option("region", kinesisRegion) \
      .option("initialPosition", '{"at_timestamp": "03/08/2023 00:00:00 PDT", "format": "MM/dd/yyyy HH:mm:ss ZZZ"}') \
      .option("awsAccessKey", awsAccessKeyId) \
      .option("awsSecretKey", awsSecretKey) \
      .load()

Answer 1

Score: 2

I think what you are looking for is batch processing rather than stream processing, since what you describe is essentially a backfill job.

Unfortunately, you can't set an endPosition-style option on a Spark Structured Streaming source for Kafka or Kinesis.

Some suggestions:

1- If you have the option of switching from Kinesis to Kafka, you can use a batch read with spark.read.format("kafka") instead of spark.readStream.format("kafka"), and then use the parameters below (there is a short sketch of this after the suggestions).

    .option("startingOffsets", start_offset) \
    .option("endingOffsets", end_offset) \

2- If Kinesis is required, you can deliver the stream's data to an S3 path (for example with Kinesis Data Firehose) and then consume those data files with Spark, applying a start/end WHERE condition (also sketched after the suggestions). I would recommend the AWS Glue pushdown predicate feature so you don't read all of the data.
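
To make suggestion 1 concrete, here is a minimal PySpark sketch of a bounded batch read from Kafka. The bootstrap-servers variable, topic name, partition, and offset values are placeholders for your own environment, not anything from the question:

    # Batch (non-streaming) read from Kafka, bounded by explicit offsets.
    # kafkaBootstrapServers and "my-topic" are hypothetical placeholders.
    from pyspark.sql import functions as F

    start_offset = '{"my-topic": {"0": 12345}}'   # per-partition offsets, or "earliest"
    end_offset   = '{"my-topic": {"0": 67890}}'   # per-partition offsets, or "latest"

    df = (spark.read
          .format("kafka")
          .option("kafka.bootstrap.servers", kafkaBootstrapServers)
          .option("subscribe", "my-topic")
          .option("startingOffsets", start_offset)
          .option("endingOffsets", end_offset)
          .load())

    # On Spark 3.x you can instead bound the read by time, using epoch
    # milliseconds per partition (the values below are only examples):
    #   .option("startingOffsetsByTimestamp", '{"my-topic": {"0": 1678262400000}}')
    #   .option("endingOffsetsByTimestamp",   '{"my-topic": {"0": 1678780800000}}')

    # Kafka's key/value columns are binary, so cast before further processing.
    df = df.select("timestamp", F.col("value").cast("string").alias("value"))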
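
And for suggestion 2, a minimal sketch of reading the landed files from S3 with a time-range filter. The bucket/prefix, file format, and event_time column are assumptions about how the stream is delivered to S3, so adjust them to your own layout:

    # Read Kinesis data that has been delivered to S3 (e.g. by Kinesis Data Firehose)
    # and keep only the 8-14 March 2023 window.
    # "s3://my-bucket/kinesis-landing/" and "event_time" are hypothetical placeholders.
    from pyspark.sql import functions as F

    df = (spark.read
          .format("json")                          # or "parquet", depending on the delivery format
          .load("s3://my-bucket/kinesis-landing/"))

    backfill = df.where(
        (F.col("event_time") >= "2023-03-08 00:00:00") &
        (F.col("event_time") < "2023-03-15 00:00:00")
    )

    # If the data is catalogued in AWS Glue and partitioned by date, a pushdown
    # predicate on the partition columns avoids scanning files outside the window.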

Thanks.
