Amazon Keyspaces (Cassandra) query: no node was available to execute the query


Question

I'm using Amazon Keyspaces (Cassandra 3.11.2) from an Apache Flink job running on AWS EMR. The query below sometimes throws the exception shown; the same code running on AWS Lambda also fails with the same NoNodeAvailableException. What am I doing wrong?

String query = "INSERT INTO TEST (field1, field2) VALUES(?, ?)";
PreparedStatement prepared = CassandraConnector.prepare(query);
int i = 0;
BoundStatement bound = prepared.bind().setString(i++, "Field1").setString(i++, "Field2")
                    .setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
ResultSet rs = CassandraConnector.execute(bound);

com.datastax.oss.driver.api.core.NoNodeAvailableException: No node was available to execute the query
 at com.datastax.oss.driver.api.core.NoNodeAvailableException.copy(NoNodeAvailableException.java:40)
 at com.datastax.oss.driver.internal.core.util.concurrent.CompletableFutures.getUninterruptibly(CompletableFutures.java:149)
 at com.datastax.oss.driver.internal.core.cql.CqlRequestSyncProcessor.process(CqlRequestSyncProcessor.java:53)
 at com.datastax.oss.driver.internal.core.cql.CqlRequestSyncProcessor.process(CqlRequestSyncProcessor.java:30)
 at com.datastax.oss.driver.internal.core.session.DefaultSession.execute(DefaultSession.java:230)
 at com.datastax.oss.driver.api.core.cql.SyncCqlSession.execute(SyncCqlSession.java:53)
 at com.test.manager.connectors.CassandraConnector.execute(CassandraConnector.java:16)
 at com.test.repository.impl.BackupRepositoryImpl.insert(BackupRepositoryImpl.java:36)
 at com.test.service.impl.BackupServiceImpl.insert(BackupServiceImpl.java:18)
 at com.test.flink.function.AsyncBackupFunction.processMessage(AsyncBackupFunction.java:78)
 at com.test.flink.function.AsyncBackupFunction.lambda$asyncInvoke$0(AsyncBackupFunction.java:35)
 at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
 at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1596)
 at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
 at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
 at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
 at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)

This is my code:

CassandraConnector.java:
Because preparing statements is expensive, I cache them.

public class CassandraConnector {
    private static final ConcurrentHashMap<String, PreparedStatement> preparedStatementCache = new ConcurrentHashMap<String, PreparedStatement>();

    public static ResultSet execute(BoundStatement bound) {
        CqlSession session = CassandraManager.getSessionInstance();
        return session.execute(bound);
    }

    public static ResultSet execute(String query) {
        CqlSession session = CassandraManager.getSessionInstance();
        return session.execute(query);
    }

    public static PreparedStatement prepare(String query) {
        PreparedStatement result = preparedStatementCache.get(query);
        if (result == null) {
            CqlSession session = CassandraManager.getSessionInstance();
            result = session.prepare(query);
            preparedStatementCache.putIfAbsent(query, result);
        }

        return result;
    }
}

CassandraManager.java:
I use double-checked locking to keep the session object a singleton.

public class CassandraManager {
    private static final Logger logger = LoggerFactory.getLogger(CassandraManager.class);
    private static final String SSL_CASSANDRA_PASSWORD = "password";
    private static volatile CqlSession session;

    static {
        try {
            initSession();
        } catch (Exception e) {
            logger.error("Error CassandraManager getSessionInstance", e);
        }
    }

    private static void initSession() {
        List<InetSocketAddress> contactPoints = Collections.singletonList(InetSocketAddress.createUnresolved(
                "cassandra.ap-southeast-1.amazonaws.com", 9142));
        DriverConfigLoader loader = DriverConfigLoader.fromClasspath("application.conf");

        Long start = BaseHelper.getTime();
        session = CqlSession.builder().addContactPoints(contactPoints).withConfigLoader(loader)
                .withAuthCredentials(AppUtil.getProperty("cassandra.username"),
                        AppUtil.getProperty("cassandra.password"))
                .withSslContext(getSSLContext()).withLocalDatacenter("ap-southeast-1")
                .withKeyspace(AppUtil.getProperty("cassandra.keyspace")).build();
        logger.info("End connect: " + (new Date().getTime() - start));

    }

    public static CqlSession getSessionInstance() {
        if (session == null || session.isClosed()) {
            synchronized (CassandraManager.class) {
                if (session == null || session.isClosed()) {
                    initSession();
                }
            }
        }

        return session;
    }

    public static SSLContext getSSLContext() {
        InputStream in = null;
        try {
            KeyStore ks = KeyStore.getInstance("JKS");
            in = CassandraManager.class.getClassLoader().getResourceAsStream("cassandra_truststore.jks");
            ks.load(in, SSL_CASSANDRA_PASSWORD.toCharArray());
            TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
            tmf.init(ks);

            SSLContext ctx = SSLContext.getInstance("TLS");
            ctx.init(null, tmf.getTrustManagers(), null);
            return ctx;
        } catch (Exception e) {
            logger.error("Error CassandraConnector getSSLContext", e);
        } finally {
            if (in != null) {
                try {
                    in.close();
                } catch (IOException e) {
                    logger.error("", e);
                }
            }
        }

        return null;
    }
}

application.conf

datastax-java-driver {
  basic.request {
    timeout = 5 seconds
    consistency = LOCAL_ONE
  }
  advanced.connection {
    max-requests-per-connection = 1024
    pool {
      local.size = 1
      remote.size = 1
    }
  }
  advanced.reconnect-on-init = true
  advanced.reconnection-policy {
    class = ExponentialReconnectionPolicy
    base-delay = 1 second
    max-delay = 60 seconds
  }
  advanced.retry-policy {
    class = DefaultRetryPolicy
  }
  advanced.protocol {
    version = V4
  }

  advanced.heartbeat {
    interval = 30 seconds
    timeout = 1 second
  }

  advanced.session-leak.threshold = 8
  advanced.metadata.token-map.enabled = false
}

Answer 1

Score: 3


There are two scenarios in which the driver reports NoNodeAvailableException:

  1. The nodes are unresponsive/unavailable and the driver has marked all of them as down.
  2. All of the contact points provided are invalid.

If some inserts work but you eventually run into NoNodeAvailableException, that indicates to me that the nodes are getting overloaded and eventually become unresponsive, so the driver no longer picks a coordinator since they are all marked as "down".

If none of the requests work at all, it means that the contact points are unreachable or unresolvable, so the driver can't connect to the cluster. Cheers!
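
To tell these two scenarios apart, it can help to log how the driver currently classifies each node. Below is a minimal sketch against the DataStax Java driver 4.x metadata API; the helper class itself is only an illustration, not part of the original code.

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.metadata.Node;

public class NodeStateProbe {
    // Prints the driver's current view of every known node. If all nodes report
    // state=DOWN, the driver has marked them unavailable (scenario 1); if the
    // metadata only ever contains the unreachable contact points, the session
    // never connected to the cluster at all (scenario 2).
    public static void dumpNodeStates(CqlSession session) {
        for (Node node : session.getMetadata().getNodes().values()) {
            System.out.printf("endpoint=%s state=%s distance=%s open_connections=%d%n",
                    node.getEndPoint(), node.getState(), node.getDistance(),
                    node.getOpenConnections());
        }
    }
}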


Answer 2

Score: 1

NoHostAvailableException is a client-side exception thrown by the open-source driver after it has retried the available hosts. The driver encapsulates the root cause of those retries, which can be confusing.

I suggest first improving your observability by setting up these CloudWatch metrics. You can follow this prebuilt CloudFormation template to get started; it only takes a few seconds.

Here is a setup of keyspace and table metrics for Amazon Keyspaces using CloudWatch: https://github.com/aws-samples/amazon-keyspaces-cloudwatch-cloudformation-templates

You can also replace the retry policy with the examples found in this helper project. The retry policy in that project will either retry or rethrow the original exception, which removes the occurrences of NoHostAvailableException and gives you better transparency into your application. Here's the link to the GitHub repo: https://github.com/aws-samples/amazon-keyspaces-java-driver-helpers
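
As an illustration, swapping the retry policy is just a configuration change in application.conf. This is only a sketch: the fully-qualified class name below is a placeholder, and the real name comes from the helper project's README.

datastax-java-driver {
  advanced.retry-policy {
    # Placeholder class name -- substitute the Keyspaces-aware retry policy
    # class published by the amazon-keyspaces-java-driver-helpers project.
    class = com.example.keyspaces.retry.KeyspacesRetryPolicy
  }
}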

If you're using a private VPC endpoint, you will want to add the following permissions to enable more entries in the system.peers table.
Amazon Keyspaces recently announced new functionality that provides more connection points when establishing a session through a private VPC endpoint.

Here is a link about how Keyspaces now automatically optimizes client connections made through AWS PrivateLink to improve availability and read/write performance: https://aws.amazon.com/about-aws/whats-new/2021/07/amazon-keyspaces-for-apache-cassandra-now-automatically-optimi/

This link discusses using Amazon Keyspaces with interface VPC endpoints: https://docs.aws.amazon.com/keyspaces/latest/devguide/vpc-endpoints.html . To enable this new functionality you will need to grant additional permissions for DescribeNetworkInterfaces and DescribeVpcEndpoints.

{
   "Version":"2012-10-17",
   "Statement":[
      {
         "Sid":"ListVPCEndpoints",
         "Effect":"Allow",
         "Action":[
            "ec2:DescribeNetworkInterfaces",
            "ec2:DescribeVpcEndpoints"
         ],
         "Resource":"*"
      }
   ]
}

Answer 3

Score: 0

I suspect that this:

.withLocalDatacenter(AppUtil.getProperty("cassandra.localdatacenter"))

pulls back a data center name that does not match either the keyspace replication definition or the configured data center name:

nodetool status | grep Datacenter

Basically, if your connection is defined with a local data center that does not exist, the driver will still try to read from and write to replicas in that data center. This will fail, because it obviously cannot find nodes in a non-existent data center.
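
As a quick sanity check (a minimal sketch, assuming you can open a session with some working configuration, or run the same query from cqlsh), ask the endpoint which data center name it reports and compare it with the value passed to withLocalDatacenter(...). For Amazon Keyspaces this should be the AWS region, e.g. "ap-southeast-1".

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.Row;

public class DataCenterCheck {
    // Returns the data center name the connected endpoint reports about itself,
    // which must match the name given to withLocalDatacenter(...).
    public static String reportedDataCenter(CqlSession session) {
        Row row = session.execute("SELECT data_center FROM system.local").one();
        return row == null ? null : row.getString("data_center");
    }
}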

There's a similar question here: https://stackoverflow.com/questions/41879216/nohostavailable-error-in-cqlsh-console

