在每个Apache Spark工作节点上创建一个Java HBase客户端实例。

huangapple go评论84阅读模式
英文:

Creating an instance of java hbase client on each Apache Spark worker node

问题

与 Spark Structured Streaming 一起工作。

我正在处理一段代码,其中我需要对数据执行许多查找操作。这些查找操作非常复杂,很难转化为连接操作。

例如,在表 B 中查找字段 A 并获取一个值,如果找到,在另一个表中查找该值。如果未在表 B 中找到,在表 D 中查找另一个值 C,依此类推。

我使用 HBase 编写了这些查找操作,它在功能上运行良好。
我为每个查找操作编写了 UDF,例如一个非常简单的示例可能是:

val someColFunc = udf((code: String) => {
    val value = HbaseObject.table.getRow("lookupTable", code, "cf", "value1")
    if (value != null)
        Bytes.toString(value)
    else
        null
})

由于 Java HBase 客户端不可序列化,我通过以下方式传递 Hbase 对象:

object HbaseObject {
    val table = new HbaseUtilities(zkUrl)
}

HbaseUtilities 是我编写的一个类,用于简化查找操作。它只是创建一个 Java HBase 客户端,并为我需要的获取命令提供了包装。

这使我的代码变得非常慢,这也是可以接受的。令我困惑的是,增加或减少执行器或内核的数量对我的代码速度没有任何影响。无论是 1 个执行器还是 30 个执行器,它的运行速度都完全相同。这使我相信存在缺乏并行性。因此,所有的工作节点可能都在共享同一个 HBase 对象。是否有一种方法可以在每个工作节点在开始执行之前实例化一个这样的对象?
我已经尝试过使用懒惰的 lazy val,但没有任何效果。

我甚至尝试过创建一个共享的单例,就像这里所示:https://www.nicolaferraro.me/2016/02/22/using-non-serializable-objects-in-apache-spark/,这对我解决了一些问题,但并没有解决并行性丧失的问题。

我知道可能有更好的方法来解决这个问题,我非常欢迎所有的建议,但现在我受到了一些限制和紧迫的时间表所困扰。

英文:

Working with Spark Structured Streaming.

I am working on a code where I need to do a lot of lookups on data. Lookups are very complex and just don't translate too well to joins.

e.g. look up field A in Table B and fetch a value, if found lookup that values in another table. if not found lookup some other value C in table D and then so on and so forth.

I managed to write these lookups using HBase and it works fine, functionally.
I wrote udfs for each of these lookups e.g. a very simple one might be:

val someColFunc= udf( (code:String) =>
		{
			val value = HbaseObject.table.getRow("lookupTable", code, "cf", "value1")
			if (value != null)
				Bytes.toString(value)
			else
				null
		}
	)

Since java hbase client is non serializable. I am passing Hbase object like this

object HbaseObject {
 val table = new HbaseUtilities(zkUrl)
}

HbaseUtilities is a class I wrote to simplify lookups. It just creates a java HBase client and provides a wrapper for the kind of get commands I need.

This is rendering my code too slow, which too, is alright. What's puzzling me, is that increasing or decreasing the number of executors or cores is having no effect on the speed of my code. be it 1 executor or 30 it's running at the exact same rate. Which makes me believe there is lack of parallelism. So all my workers must be sharing the same Hbase object. Is their a way I can instantiate one such object on each worker before they start executing?
I have already tried using lazy val, it's not having any effect

I have even tried creating a sharedSingleton as shown here https://www.nicolaferraro.me/2016/02/22/using-non-serializable-objects-in-apache-spark/, which solved some problems for me but not the loss of parallelism.

I know there might be better ways to solve the problem and all suggestions are very welcome but right now I'm caught in a few constraints and a tight timeline.

答案1

得分: 1

你需要在执行程序中创建所有不可序列化的对象。
你可以使用foreachPartitionmapPartitions在每个执行程序中创建连接。

类似于以下内容(我正在使用HBase客户端2.0.0):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Get, Put, Result}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}

df.foreachPartition(
partition => {
  //为每个执行程序创建连接和表
  val config: Configuration = HBaseConfiguration.create()
  config.set("hbase.zookeeper.quorum", "zk url")
  val connection: Connection = ConnectionFactory.createConnection(config)
  val table = connection.getTable(TableName.valueOf("tableName"))
  partition.map(
    record => {
      val byteKey = Bytes.toBytes(record.getString(0))
      val get = new Get(byteKey)
      val result = table.get(get)
      //在每条记录上执行您的逻辑
    }
  ).toList
  table.close()
  connection.close()
}
)

df是您想要查找的每条记录的数据框架。

您可以为每个执行程序创建所需的表格,而无需担心不可序列化问题。您可以将其放在类似于HbaseUtilities的类中,以在foreach/mapPartitions内部仅创建一个新实例。

英文:

You need to create all non serializable objects in the executor.
you can use foreachPartition or mapPartitions to create a connection in each executor.

Something similar to this (i'm using hbase client 2.0.0):

 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Get, Put, Result}
 import org.apache.hadoop.hbase.util.Bytes
 import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}


df.foreachPartition(
partition => {
  //foreach executor create the connection and the table
  val config: Configuration = HBaseConfiguration.create()
  config.set("hbase.zookeeper.quorum", "zk url")
  val connection: Connection = ConnectionFactory.createConnection(config)
  val table = connection.getTable(TableName.valueOf("tableName"))
  partition.map(
    record => {
      val byteKey = Bytes.toBytes(record.getString(0))
      val get = new Get(byteKey)
      val result = table.get(get)
      //DO YOUR LOGIC HERE FOR EACH RECORD
    }
  ).toList
  table.close()
  connection.close()
}
)

df is the dataframe for each record you want to do the lookup.

You can create as many tables you need for each executor for the same connection.

As you create all the objects in executors you don't need to deal with non serializable problems. You can have it in a class like your HbaseUtilities to be used there but you need to create a new instance only inside the foreach/map partitions

答案2

得分: 0

你可以通过使用HBase项目的主分支中的HBase-Spark Connector来实现你正在尝试的操作。出于某种原因,连接器似乎没有包含在任何官方的HBase构建中,但你可以自己构建它,它可以正常工作。只需构建该JAR文件并将其包含在你的pom.xml中。

一旦构建完成,该连接器将允许你在Worker类内部传递HBase连接对象,因此你无需担心序列化连接或构建单例等问题。

例如:

JavaSparkContext jSPContext ...; // 创建Java Spark Context
HBaseConfiguration hbConf = HBaseConfiguration.create();
hbConf.set("hbase.zookeeper.quorum", zkQuorum);
hbConf.set("hbase.zookeeper.property.clientPort", PORT_NUM);
// 这是你从Spark访问HBase的关键链接 - 每次在Spark并行性内访问HBase时都要使用它:
JavaHBaseContext hBaseContext = new JavaHBaseContext(jSPContext, hbConf);

// 创建一个RDD并将其与HBase访问并行化:
JavaRDD<String> myRDD = ... // 创建你的RDD
hBaseContext.foreachPartition(myRDD, new SparkHBaseWorker());
// 你还可以执行其他常见的Spark任务,如mapPartitions,forEach等。

// 用于RDD类型为String的foreachPartition用例的Spark Worker类可能如下所示:
class SparkHBaseWorker implements VoidFunction<Tuple2<Iterator<String>, Connection>>
{
    private static final long serialVersionUID = 1L;

    public WorkerIngest()
    {
    }

    // 将所有HBase逻辑放入此函数中:
    @Override
    public void call(Tuple2<Iterator<String>, Connection> t) throws Exception
    {			
        // 这是你的HBase连接对象:
        Connection conn = t._2();
        // 现在你可以从此Spark worker节点直接访问HBase:
        Table hbTable = conn.getTable(TableName.valueOf(MY_TABLE));
        // 现在可以对表格等进行操作。
    }
}

注意:由于代码段可能需要进一步上下文以进行准确的翻译,因此翻译结果可能不具有完全的准确性。

英文:

You can accomplish what you are trying to do by using the HBase-Spark Connector from the main branch of the HBase project. For some reason the connector doesn't seem to be included in any official HBase builds, but you can build it yourself and it works fine. Just build the jar and include it in your pom.xml.

Once built, the connector will allow you to pass the HBase Connection object inside the Worker class, so you don't have to worry about serializing the connection or building singletons/etc.

For example:

JavaSparkContext jSPContext ...; //Create Java Spark Context
HBaseConfiguration hbConf = HBaseConfiguration.create();
hbConf.set(&quot;hbase.zookeeper.quorum&quot;, zkQuorum);
hbConf.set(&quot;hbase.zookeeper.property.clientPort&quot;, PORT_NUM);
// this is your key link to HBase from Spark -- use it every time you need to access HBase inside the Spark parallelism:
JavaHBaseContext hBaseContext = new JavaHBaseContext(jSPContext, hbConf);	

// Create an RDD and parallelize it with HBase access:
JavaRDD&lt;String&gt; myRDD = ... //create your RDD
hBaseContext.foreachPartition(myRDD,  new SparkHBaseWorker());
// You can also do other usual Spark tasks, like mapPartitions, forEach, etc.

// The Spark worker class for foreachPartition use-case on RDD of type String would look something like this:
class SparkHBaseWorker implements VoidFunction&lt;Tuple2&lt;Iterator&lt;String&gt;, Connection&gt;&gt;
{
	private static final long serialVersionUID = 1L;
	
	public WorkerIngest()
	{
	}
	
// Put all your HBase logic into this function:
	@Override
	public void call(Tuple2&lt;Iterator&lt;String&gt;, Connection&gt; t) throws Exception
	{			
		// This is your HBase connection object:
		Connection conn = t._2();
		// Now you can do direct access to HBase from this Spark worker node:
		Table hbTable = conn.getTable(TableName.valueOf(MY_TABLE));
		// now do something with the table/etc.
	}
}

huangapple
  • 本文由 发表于 2020年7月26日 22:51:18
  • 转载请务必保留本文链接:https://go.coder-hub.com/63101754.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定