Hive UDF in Java fails when creating a table

Question
What is the difference between these two queries:
SELECT my_fun(col_name) FROM my_table;
and
CREATE TABLE new_table AS SELECT my_fun(col_name) FROM my_table;
where my_fun is a Java UDF.
I'm asking because when I create a new table (the second query) I receive a Java error:
Failure while running task:java.lang.RuntimeException: java.lang.RuntimeException: Map operator initialization failed
...
Caused by: org.apache.hadoop.hive.ql.exec.UDFArgumentException: Unable to instantiate UDF implementation class com.company_name.examples.ExampleUDF: java.lang.NullPointerException
I found that the source of the error is this line in my Java file:
encoded = Files.readAllBytes(Paths.get(configPath));
But the question is: why does it work when no table is created, yet fail when a table is created?
Answer 1
Score: 1
The problem might be with the way you read the file. Try passing the file path as the second argument to the UDF, then read it as follows:
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashSet;
import org.apache.hadoop.hive.ql.metadata.HiveException;

private BufferedReader getReaderFor(String filePath) throws HiveException {
    try {
        Path fullFilePath = FileSystems.getDefault().getPath(filePath);
        Path fileName = fullFilePath.getFileName();
        // Try the bare file name first (the distributed cache places files
        // in the task's working directory), then fall back to the full path.
        if (Files.exists(fileName)) {
            return Files.newBufferedReader(fileName, Charset.defaultCharset());
        } else if (Files.exists(fullFilePath)) {
            return Files.newBufferedReader(fullFilePath, Charset.defaultCharset());
        } else {
            throw new HiveException("Could not find \"" + fileName + "\" or \"" + fullFilePath + "\" in intersect_file() UDF.");
        }
    } catch (IOException exception) {
        throw new HiveException(exception);
    }
}

private void loadFromFile(String filePath) throws HiveException {
    set = new HashSet<String>();
    try (BufferedReader reader = getReaderFor(filePath)) {
        String line;
        while ((line = reader.readLine()) != null) {
            set.add(line);
        }
    } catch (IOException e) {
        throw new HiveException(e);
    }
}
The full code for a different generic UDF that uses a file reader can be found here.
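For illustration, the lookup order used above can be exercised without any Hive dependencies. This is a minimal sketch with the same two-step resolution (bare file name first, since the distributed cache drops files into the task's working directory, then the full path); the class name, the temp directory, and the file name udf.conf are hypothetical:

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;

public class ReaderLookupDemo {
    // Same lookup order as getReaderFor(): first the bare file name (where
    // "ADD FILE" would place it, in the task's working directory), then the
    // full path as given by the caller.
    static Path resolve(String filePath) throws IOException {
        Path fullFilePath = FileSystems.getDefault().getPath(filePath);
        Path fileName = fullFilePath.getFileName();
        if (Files.exists(fileName)) {
            return fileName;
        } else if (Files.exists(fullFilePath)) {
            return fullFilePath;
        }
        throw new IOException("Could not find " + fileName + " or " + fullFilePath);
    }

    public static void main(String[] args) throws IOException {
        // Write a config file into a temp directory and resolve it by full path.
        Path dir = Files.createTempDirectory("udf-demo");
        Path cfg = dir.resolve("udf.conf");
        Files.write(cfg, "key=value".getBytes(StandardCharsets.UTF_8));
        System.out.println(Files.readAllLines(resolve(cfg.toString()), StandardCharsets.UTF_8).get(0));
    }
}
```

With the file shipped via ADD FILE, the first branch matches on the workers; locally, the full-path fallback matches instead.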
Answer 2
Score: 1
I think several points are unclear, so this answer is based on assumptions.
First of all, it is important to understand that Hive currently optimizes some simple queries, so depending on the size of your data, the query that works for you, SELECT my_fun(col_name) FROM my_table;, is most likely running locally on the client where you are executing the job. That is why your UDF can access your config file, which is available locally; this "execution mode" is due to the size of your data. CTAS triggers a job regardless of the input data size; that job runs distributed across the cluster, where each worker fails to access your config file.
It looks like you are trying to read your configuration file from the local file system, not from HDFS (Files.readAllBytes(Paths.get(configPath))). This means your configuration has to either be replicated to all the worker nodes or be added beforehand to the distributed cache (you can use add file for this; doc here). You can find other questions here about accessing files in the distributed cache from UDFs.
One additional problem is that you are passing the location of your config file through an environment variable, which is not propagated to the worker nodes as part of your Hive job. You should pass this configuration as a Hive config; there is an answer about accessing the Hive config from a UDF here, assuming you are extending GenericUDF.
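The environment-variable diagnosis can be reproduced without a cluster: in a worker JVM the variable is unset, System.getenv returns null, and Paths.get(null) throws exactly the NullPointerException from the stack trace. A small sketch (the variable name MY_UDF_CONFIG is hypothetical):

```java
import java.nio.file.Paths;

public class EnvVarNpeDemo {
    public static void main(String[] args) {
        // The client shell may export MY_UDF_CONFIG, but worker JVMs start
        // with their own environment, so the lookup returns null there.
        String configPath = System.getenv("MY_UDF_CONFIG");
        try {
            // With a null path this is the same failure as
            // Files.readAllBytes(Paths.get(configPath)) in the original UDF.
            Paths.get(configPath);
            System.out.println("env var was set: " + configPath);
        } catch (NullPointerException e) {
            System.out.println("NullPointerException, as seen on the worker nodes");
        }
    }
}
```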