英文:
Write the result of SQL Query to file by Apache Flink
问题
以下是翻译好的内容:
我有以下任务:
- 创建一个带有 Hive 表 SQL 请求的作业;
- 在远程 Flink 集群上运行此作业;
- 将此作业的结果收集到文件中(最好使用 HDFS)。
注意
因为需要在远程 Flink 集群上运行此作业,所以我不能以简单的方式使用 TableEnvironment。这个问题在此工单中有提到:https://issues.apache.org/jira/browse/FLINK-18095。对于当前的解决方案,我使用了来自 http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Table-Environment-for-Remote-Execution-td35691.html 的建议。
代码
EnvironmentSettings batchSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
// 创建远程环境
StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.createRemoteEnvironment("localhost", 8081, "/path/to/my/jar");
// 创建 StreamTableEnvironment
TableConfig tableConfig = new TableConfig();
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
CatalogManager catalogManager = CatalogManager.newBuilder()
.classLoader(classLoader)
.config(tableConfig.getConfiguration())
.defaultCatalog(
batchSettings.getBuiltInCatalogName(),
new GenericInMemoryCatalog(
batchSettings.getBuiltInCatalogName(),
batchSettings.getBuiltInDatabaseName()))
.executionConfig(
streamExecutionEnvironment.getConfig())
.build();
ModuleManager moduleManager = new ModuleManager();
BatchExecutor batchExecutor = new BatchExecutor(streamExecutionEnvironment);
FunctionCatalog functionCatalog = new FunctionCatalog(tableConfig, catalogManager, moduleManager);
StreamTableEnvironmentImpl tableEnv = new StreamTableEnvironmentImpl(
catalogManager,
moduleManager,
functionCatalog,
tableConfig,
streamExecutionEnvironment,
new BatchPlanner(batchExecutor, tableConfig, functionCatalog, catalogManager),
batchExecutor,
false);
// 配置 HiveCatalog
String name = "myhive";
String defaultDatabase = "default";
String hiveConfDir = "/path/to/hive/conf"; // 本地路径
HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
tableEnv.registerCatalog("myhive", hive);
tableEnv.useCatalog("myhive");
// 对 Hive 发出请求
Table table = tableEnv.sqlQuery("select * from myhive.`default`.test");
问题
在这一步,我可以调用 table.execute() 方法,然后通过 collect() 方法获取 CloseableIterator。但在我的情况下,我可能会得到大量的行作为我的请求结果,最好能将其收集到文件中(在 HDFS 中使用 ORC 格式)。
我该如何实现我的目标呢?
英文:
I have the following task:
- Create a job with SQL request to Hive table;
- Run this job on remote Flink cluster;
- Collect the result of this job in file (HDFS is preferable).
Note
Because it is necessary to run this job on remote Flink cluster i can not use TableEnvironment in a simple way. This problem is mentioned in this ticket: https://issues.apache.org/jira/browse/FLINK-18095. For current solution I use adivce from http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Table-Environment-for-Remote-Execution-td35691.html.
Code
EnvironmentSettings batchSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
// create remote env
StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.createRemoteEnvironment("localhost", 8081, "/path/to/my/jar");
// create StreamTableEnvironment
TableConfig tableConfig = new TableConfig();
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
CatalogManager catalogManager = CatalogManager.newBuilder()
.classLoader(classLoader)
.config(tableConfig.getConfiguration())
.defaultCatalog(
batchSettings.getBuiltInCatalogName(),
new GenericInMemoryCatalog(
batchSettings.getBuiltInCatalogName(),
batchSettings.getBuiltInDatabaseName()))
.executionConfig(
streamExecutionEnvironment.getConfig())
.build();
ModuleManager moduleManager = new ModuleManager();
BatchExecutor batchExecutor = new BatchExecutor(streamExecutionEnvironment);
FunctionCatalog functionCatalog = new FunctionCatalog(tableConfig, catalogManager, moduleManager);
StreamTableEnvironmentImpl tableEnv = new StreamTableEnvironmentImpl(
catalogManager,
moduleManager,
functionCatalog,
tableConfig,
streamExecutionEnvironment,
new BatchPlanner(batchExecutor, tableConfig, functionCatalog, catalogManager),
batchExecutor,
false);
// configure HiveCatalog
String name = "myhive";
String defaultDatabase = "default";
String hiveConfDir = "/path/to/hive/conf"; // a local path
HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
tableEnv.registerCatalog("myhive", hive);
tableEnv.useCatalog("myhive");
// request to Hive
Table table = tableEnv.sqlQuery("select * from myhive.`default`.test");
Question
On this step I can call table.execute() method and after it get CloseableIterator by collect() method. But in my case I can get a large count of rows as a result of my request and it will be perfect to collect it into file (ORC in HDFS).
How can I reach my goal?
答案1
得分: 1
Table.execute().collect()
将视图的结果返回到客户端,以供交互使用。在您的情况下,您可以使用文件系统连接器,并使用 INSERT INTO
将视图写入文件中。例如:
// 创建一个文件系统表
tableEnvironment.executeSql("CREATE TABLE MyUserTable (\n" +
" column_name1 INT,\n" +
" column_name2 STRING,\n" +
" ... " +
" \n" +
") WITH (\n" +
" 'connector' = 'filesystem',\n" +
" 'path' = 'hdfs://path/to/your/file',\n" +
" 'format' = 'orc' \n" +
")");
// 提交作业
tableEnvironment.executeSql("insert into MyUserTable select * from myhive.`default`.test");
了解有关文件系统连接器的更多信息:https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html
英文:
Table.execute().collect()
returns the result of the view to your client side for interactive purpose. In your case, you can use the filesystem connector and use INSERT INTO
for writing the view to the file. For example:
// create a filesystem table
tableEnvironment.executeSql("CREATE TABLE MyUserTable (\n" +
" column_name1 INT,\n" +
" column_name2 STRING,\n" +
" ..." +
" \n" +
") WITH (\n" +
" 'connector' = 'filesystem',\n" +
" 'path' = 'hdfs://path/to/your/file',\n" +
" 'format' = 'orc' \n" +
")");
// submit the job
tableEnvironment.executeSql("insert into MyUserTable select * from myhive.`default`.test");
See more about the filesystem connector: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论