How to read a Parquet file from S3 without Spark? (Java)

Question

Currently, I am using the Apache ParquetReader to read local Parquet files, which looks something like this:

  ParquetReader<GenericData.Record> reader = null;
  Path path = new Path("userdata1.parquet");
  try {
      reader = AvroParquetReader.<GenericData.Record>builder(path).withConf(new Configuration()).build();
      GenericData.Record record;
      while ((record = reader.read()) != null) {
          System.out.println(record);
      }
  } finally {
      if (reader != null) {
          reader.close();
      }
  }

However, I am trying to access a Parquet file on S3 without downloading it first. Is there a way to parse an InputStream directly with the Parquet reader?

Answer 1

Score: 6

Yes, the latest versions of Hadoop include support for the S3 filesystem. Use the s3a client from the hadoop-aws library to access the S3 filesystem directly.

The HadoopInputFile path should be constructed as s3a://bucket-name/prefix/key, with the authentication credentials (access key and secret key) supplied through the following properties (see the sketch at the end of this answer):

  • fs.s3a.access.key
  • fs.s3a.secret.key

You will also need these dependent libraries:

  • hadoop-common JAR
  • aws-java-sdk-bundle JAR

Read more: Relevant configuration properties
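
Tying these pieces together, here is a minimal sketch of the setup described above. The class name, bucket, key, and credential values are placeholders; the later answers show essentially the same approach end to end:

  import org.apache.avro.generic.GenericRecord;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.parquet.avro.AvroParquetReader;
  import org.apache.parquet.hadoop.ParquetReader;
  import org.apache.parquet.hadoop.util.HadoopInputFile;
  import org.apache.parquet.io.InputFile;
  import java.io.IOException;

  public class S3ParquetReadSketch {
      public static void main(String[] args) throws IOException {
          // Supply credentials through the fs.s3a.* properties listed above
          // (placeholder values; other s3a credential providers also work).
          Configuration conf = new Configuration();
          conf.set("fs.s3a.access.key", "YOUR_ACCESS_KEY");
          conf.set("fs.s3a.secret.key", "YOUR_SECRET_KEY");

          // s3a://bucket-name/prefix/key, resolved by the s3a client from hadoop-aws.
          Path path = new Path("s3a://bucket-name/prefix/key");
          InputFile file = HadoopInputFile.fromPath(path, conf);

          // ParquetReader is Closeable, so try-with-resources closes it for us;
          // records are read from S3 without downloading the file to local disk first.
          try (ParquetReader<GenericRecord> reader =
                   AvroParquetReader.<GenericRecord>builder(file).build()) {
              GenericRecord record;
              while ((record = reader.read()) != null) {
                  System.out.println(record);
              }
          }
      }
  }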

Answer 2

Score: 2

I got it working with the following dependencies:

  compile 'org.slf4j:slf4j-api:1.7.5'
  compile 'org.slf4j:slf4j-log4j12:1.7.5'
  compile 'org.apache.parquet:parquet-avro:1.12.0'
  compile 'org.apache.avro:avro:1.10.2'
  compile 'com.google.guava:guava:11.0.2'
  compile 'org.apache.hadoop:hadoop-client:2.4.0'
  compile 'org.apache.hadoop:hadoop-aws:3.3.0'
  compile 'org.apache.hadoop:hadoop-common:3.3.0'
  compile 'com.amazonaws:aws-java-sdk-core:1.11.563'
  compile 'com.amazonaws:aws-java-sdk-s3:1.11.563'

Example:

  Path path = new Path("s3a://yours3path");
  Configuration conf = new Configuration();
  conf.set("fs.s3a.access.key", "KEY");
  conf.set("fs.s3a.secret.key", "SECRET");
  conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
  conf.setBoolean("fs.s3a.path.style.access", true);
  conf.setBoolean(org.apache.parquet.avro.AvroReadSupport.READ_INT96_AS_FIXED, true);
  InputFile file = HadoopInputFile.fromPath(path, conf);
  ParquetReader<GenericRecord> reader = AvroParquetReader.<GenericRecord>builder(file).build();
  GenericRecord record;
  while ((record = reader.read()) != null) {
      System.out.println(record);
  }

Answer 3

Score: 1

Just adding on top of @franklinsijo's answer: for those new to S3, note that the access key and secret key are set on the Hadoop Configuration. Here is a snippet of code that might be useful:

  public static void main(String[] args) throws IOException {
      String PATH_SCHEMA = "s3a://xxx/xxxx/userdata1.parquet";
      Path path = new Path(PATH_SCHEMA);
      Configuration conf = new Configuration();
      conf.set("fs.s3a.access.key", "xxxxx");
      conf.set("fs.s3a.secret.key", "xxxxx");
      InputFile file = HadoopInputFile.fromPath(path, conf);
      ParquetReader<GenericRecord> reader = AvroParquetReader.<GenericRecord>builder(file).build();
      GenericRecord record;
      while ((record = reader.read()) != null) {
          System.out.println(record.toString());
      }
  }

My imports:

  import org.apache.avro.generic.GenericRecord;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.parquet.hadoop.ParquetReader;
  import org.apache.parquet.avro.AvroParquetReader;
  import org.apache.parquet.hadoop.util.HadoopInputFile;
  import org.apache.parquet.io.InputFile;
  import java.io.IOException;
  import org.apache.hadoop.fs.Path;