Write the result of an SQL query to a file with Apache Flink

Question


I have the following task:

  1. Create a job that runs an SQL query against a Hive table;
  2. Run this job on a remote Flink cluster;
  3. Collect the result of this job in a file (preferably on HDFS).

Note

Because this job has to run on a remote Flink cluster, I cannot use TableEnvironment in the simple way. This problem is described in this ticket: https://issues.apache.org/jira/browse/FLINK-18095. For the current solution I use the advice from http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Table-Environment-for-Remote-Execution-td35691.html.

Code

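// Imports assume Flink 1.11 with the Blink planner and flink-connector-hive on the classpath
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.FunctionCatalog;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.module.ModuleManager;
import org.apache.flink.table.planner.delegation.BatchExecutor;
import org.apache.flink.table.planner.delegation.BatchPlanner;

EnvironmentSettings batchSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
// create remote env
StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.createRemoteEnvironment("localhost", 8081, "/path/to/my/jar");
// create StreamTableEnvironment
TableConfig tableConfig = new TableConfig();
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
CatalogManager catalogManager = CatalogManager.newBuilder()
        .classLoader(classLoader)
        .config(tableConfig.getConfiguration())
        .defaultCatalog(
                batchSettings.getBuiltInCatalogName(),
                new GenericInMemoryCatalog(
                        batchSettings.getBuiltInCatalogName(),
                        batchSettings.getBuiltInDatabaseName()))
        .executionConfig(streamExecutionEnvironment.getConfig())
        .build();
ModuleManager moduleManager = new ModuleManager();
BatchExecutor batchExecutor = new BatchExecutor(streamExecutionEnvironment);
FunctionCatalog functionCatalog = new FunctionCatalog(tableConfig, catalogManager, moduleManager);
StreamTableEnvironmentImpl tableEnv = new StreamTableEnvironmentImpl(
        catalogManager,
        moduleManager,
        functionCatalog,
        tableConfig,
        streamExecutionEnvironment,
        new BatchPlanner(batchExecutor, tableConfig, functionCatalog, catalogManager),
        batchExecutor,
        false); // isStreamingMode = false: run as a batch job
// configure HiveCatalog
String name = "myhive";
String defaultDatabase = "default";
String hiveConfDir = "/path/to/hive/conf"; // a local path
HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
tableEnv.registerCatalog("myhive", hive);
tableEnv.useCatalog("myhive");
// request to Hive
Table table = tableEnv.sqlQuery("select * from myhive.`default`.test");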

Question

At this step I can call the table.execute() method and then get a CloseableIterator from its collect() method. However, my query may return a large number of rows, so it would be much better to write the result to a file (ORC on HDFS).

How can I achieve this?
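The client-side route described in the question amounts to draining an iterator. As a minimal sketch, assuming a plain java.util.Iterator stands in for Flink's CloseableIterator<Row> and a local text file stands in for ORC on HDFS, the rows can at least be written out one at a time rather than buffered in a list. Every row still travels through the single client process, which is why a sink that runs on the cluster scales better. The class and method names (DrainToFile, drainToFile) are hypothetical.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Iterator;
import java.util.List;

public class DrainToFile {

    // Writes each element of the iterator to the target file as one line and
    // returns the number of lines written. Only the writer's buffer is held in
    // memory, never the whole result.
    static <T> long drainToFile(Iterator<T> rows, Path target) throws IOException {
        long count = 0;
        try (BufferedWriter out = Files.newBufferedWriter(target, StandardCharsets.UTF_8)) {
            while (rows.hasNext()) {
                out.write(String.valueOf(rows.next()));
                out.newLine();
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) throws IOException {
        // Stand-in rows; with Flink these would come from table.execute().collect().
        Iterator<String> rows = List.of("1,a", "2,b", "3,c").iterator();
        Path target = Files.createTempFile("query-result", ".txt");
        long written = drainToFile(rows, target);
        System.out.println(written + " rows written to " + target);
    }
}
```

Because Flink's CloseableIterator extends both Iterator and AutoCloseable, it could be passed in directly, ideally from a try-with-resources block so it is closed when the rows are consumed.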

Answer 1

Score: 1

Table.execute().collect() returns the result of the query to the client side for interactive use. In your case, you can instead declare a table backed by the filesystem connector and use INSERT INTO to write the query result to files. For example:

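// create a filesystem table; its columns must match the schema of the query result,
// and 'path' is the directory into which the sink writes its files
tableEnvironment.executeSql("CREATE TABLE MyUserTable (\n" +
	"  column_name1 INT,\n" +
	"  column_name2 STRING,\n" +
	"  ..." +
	" \n" +
	") WITH (\n" +
	"  'connector' = 'filesystem',\n" +
	"  'path' = 'hdfs://path/to/your/file',\n" +
	"  'format' = 'orc' \n" +
	")");

// submit the job; executeSql() returns a TableResult once the INSERT job has been submitted
tableEnvironment.executeSql("insert into MyUserTable select * from myhive.`default`.test");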

See more about the filesystem connector: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html
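If the column list should be generated rather than typed by hand, a small helper can assemble the same CREATE TABLE and INSERT INTO statements as strings, which are then what would be passed to executeSql(). This is a sketch only: FilesystemSinkDdl, createTable and insertFrom are hypothetical names, not part of Flink, and the columns used below are the placeholders from the answer, not the real schema of myhive.`default`.test. Column names are backtick-quoted, which Flink SQL accepts for identifiers.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

public class FilesystemSinkDdl {

    // Builds a CREATE TABLE statement for the filesystem connector from an
    // ordered map of column name -> SQL type (LinkedHashMap keeps the order).
    static String createTable(String table, Map<String, String> columns, String path, String format) {
        StringJoiner cols = new StringJoiner(",\n", "(\n", "\n)");
        columns.forEach((name, type) -> cols.add("  `" + name + "` " + type));
        return "CREATE TABLE " + table + " " + cols + " WITH (\n"
                + "  'connector' = 'filesystem',\n"
                + "  'path' = '" + path + "',\n"
                + "  'format' = '" + format + "'\n"
                + ")";
    }

    // Builds the INSERT INTO ... SELECT statement that copies a query result into the sink table.
    static String insertFrom(String sinkTable, String query) {
        return "INSERT INTO " + sinkTable + " " + query;
    }

    public static void main(String[] args) {
        Map<String, String> cols = new LinkedHashMap<>();
        cols.put("column_name1", "INT");
        cols.put("column_name2", "STRING");
        System.out.println(createTable("MyUserTable", cols, "hdfs://path/to/your/file", "orc"));
        System.out.println(insertFrom("MyUserTable", "SELECT * FROM myhive.`default`.test"));
    }
}
```

With the environment from the question, the two strings would be run one after the other through tableEnv.executeSql(...). Flink 1.11 also provides Table.executeInsert("MyUserTable"), which writes the Table returned by sqlQuery() into the sink table without a separate INSERT string.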

huangapple
  • Posted on 7 September 2020 at 23:05:12
  • When reposting, please keep the link to this article: https://go.coder-hub.com/63780140.html