Datastax Java Driver Custom Retry Policy

Question


I have written a custom retry policy class to which I can pass the number of retries the driver will perform in onWriteTimeout/onUnavailable/onReadTimeout.

public class CustomRetryPolicy implements RetryPolicy {

  private static final Logger LOG = LoggerFactory.getLogger(CustomRetryPolicy.class);

  @VisibleForTesting
  public static final String RETRYING_ON_READ_TIMEOUT =
      "[{}] Retrying on read timeout on same host (consistency: {}, required responses: {}, "
          + "received responses: {}, data retrieved: {}, retries: {})";

  @VisibleForTesting
  public static final String RETRYING_ON_WRITE_TIMEOUT =
      "[{}] Retrying on write timeout on same host (consistency: {}, write type: {}, "
          + "required acknowledgments: {}, received acknowledgments: {}, retries: {})";

  @VisibleForTesting
  public static final String RETRYING_ON_UNAVAILABLE =
      "[{}] Retrying on unavailable exception on next host (consistency: {}, "
          + "required replica: {}, alive replica: {}, retries: {})";

  @VisibleForTesting
  public static final String RETRYING_ON_ABORTED =
      "[{}] Retrying on aborted request on next host (retries: {})";

  @VisibleForTesting
  public static final String RETRYING_ON_ERROR =
      "[{}] Retrying on node error on next host (retries: {})";

  private static final String LOG_PREFIX = "DATASTORE-CASSANDRA";

  private final int readAttempts;
  private final int writeAttempts;
  private final int unavailableAttempts;

  public CustomRetryPolicy(int readAttempts, int writeAttempts, int unavailableAttempts) {
    this.readAttempts = readAttempts;
    this.writeAttempts = writeAttempts;
    this.unavailableAttempts = unavailableAttempts;
  }

  @Override
  public RetryDecision onReadTimeout(Request request, ConsistencyLevel cl, int blockFor,
      int received, boolean dataPresent, int retryCount) {
    RetryDecision decision = (retryCount < readAttempts && received >= blockFor && !dataPresent)
        ? RetryDecision.RETRY_SAME
        : RetryDecision.RETHROW;

    if (decision == RetryDecision.RETRY_SAME && LOG.isTraceEnabled()) {
      LOG.trace(RETRYING_ON_READ_TIMEOUT, LOG_PREFIX, cl, blockFor, received, false, retryCount);
    }

    return decision;
  }

  @Override
  public RetryDecision onWriteTimeout(Request request, ConsistencyLevel cl, WriteType writeType,
      int blockFor, int received, int retryCount) {
    RetryDecision decision = (retryCount < writeAttempts && writeType == DefaultWriteType.BATCH_LOG)
        ? RetryDecision.RETRY_SAME
        : RetryDecision.RETHROW;

    if (decision == RetryDecision.RETRY_SAME && LOG.isTraceEnabled()) {
      LOG.trace(RETRYING_ON_WRITE_TIMEOUT, LOG_PREFIX, cl, writeType, blockFor, received,
          retryCount);
    }
    return decision;
  }

  @Override
  public RetryDecision onUnavailable(Request request, ConsistencyLevel cl, int required, int alive,
      int retryCount) {
    RetryDecision decision =
        (retryCount < unavailableAttempts) ? RetryDecision.RETRY_NEXT : RetryDecision.RETHROW;

    if (decision == RetryDecision.RETRY_NEXT && LOG.isTraceEnabled()) {
      LOG.trace(RETRYING_ON_UNAVAILABLE, LOG_PREFIX, cl, required, alive, retryCount);
    }

    return decision;
  }

  @Override
  public RetryDecision onRequestAborted(Request request, Throwable error, int retryCount) {
    RetryDecision decision =
        (error instanceof ClosedConnectionException || error instanceof HeartbeatException)
            ? RetryDecision.RETRY_NEXT
            : RetryDecision.RETHROW;

    if (decision == RetryDecision.RETRY_NEXT && LOG.isTraceEnabled()) {
      LOG.trace(RETRYING_ON_ABORTED, LOG_PREFIX, retryCount, error);
    }

    return decision;
  }

  @Override
  public RetryDecision onErrorResponse(Request request, CoordinatorException error,
      int retryCount) {
    RetryDecision decision =
        (error instanceof ReadFailureException || error instanceof WriteFailureException)
            ? RetryDecision.RETHROW
            : RetryDecision.RETRY_NEXT;

    if (decision == RetryDecision.RETRY_NEXT && LOG.isTraceEnabled()) {
      LOG.trace(RETRYING_ON_ERROR, LOG_PREFIX, retryCount, error);
    }

    return decision;
  }

  @Override
  public void close() {
    // Nothing to do
  }
}

I am using DataStax Java driver 4.6.0. The problem is that I can't pass an object of this class to CqlSessionBuilder, which was possible in older versions of the driver like this:

RetryPolicy rc = new CustomRetryPolicy(3, 3, 2);
Cluster cluster = Cluster.builder().addContactPoint("192.168.0.0").withRetryPolicy(rc).build();

I have tried DriverConfigLoader, but the only option there is to pass the custom class name.

Could you please suggest a way to do this?

Answer 1

Score: 3


If you look at the implementation of DefaultRetryPolicy and the example of CustomRetryPolicy, you'll see that both receive two parameters: a context of type DriverContext and a string with the profile name. You should then be able to use the context to get the DriverConfig via a getConfig call, and then use getProfile on the config to pull the configuration values your custom policy requires. You can put your own configuration values into the configuration file and use them inside your retry policy, something like this:

datastax-java-driver {
  advanced.retry-policy {
    class = DefaultRetryPolicy
  }
  profiles {
    custom-retries {
      advanced.retry-policy {
        class = CustomRetryPolicy
        custom-policy {
           read-attempts = 3
           write-attempts = 2
           ...
        }
      }
    }
  }
}
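
To make the pattern concrete, here is a minimal, driver-free sketch of a policy resolving its limits from a named profile instead of from constructor arguments. Everything in it is an assumption for illustration: `AttemptLimits`, `fromProfile`, the nested `Map` standing in for the driver's configuration, and the fallback value of 1 are hypothetical names and choices, not driver API. In a real policy the values would come from the profile obtained through the context as described above, and the option paths would be whatever you chose in your configuration file.

```java
import java.util.Map;

/** Hypothetical holder for the limits a custom policy would read in its constructor. */
final class AttemptLimits {
  // Option names mirror the custom-policy block in the configuration above.
  static final String READ_ATTEMPTS = "read-attempts";
  static final String WRITE_ATTEMPTS = "write-attempts";
  // Assumed fallback when a profile does not define the option.
  static final int FALLBACK_ATTEMPTS = 1;

  final int readAttempts;
  final int writeAttempts;

  private AttemptLimits(int readAttempts, int writeAttempts) {
    this.readAttempts = readAttempts;
    this.writeAttempts = writeAttempts;
  }

  /** Resolves the limits for one profile; the map stands in for the driver configuration. */
  static AttemptLimits fromProfile(Map<String, Map<String, Integer>> config, String profileName) {
    Map<String, Integer> profile = config.getOrDefault(profileName, Map.of());
    return new AttemptLimits(
        profile.getOrDefault(READ_ATTEMPTS, FALLBACK_ATTEMPTS),
        profile.getOrDefault(WRITE_ATTEMPTS, FALLBACK_ATTEMPTS));
  }

  /** Same condition as the read-timeout check in the question, without driver types. */
  boolean shouldRetryRead(int retryCount, int blockFor, int received, boolean dataPresent) {
    return retryCount < readAttempts && received >= blockFor && !dataPresent;
  }
}

class RetryConfigSketch {
  public static void main(String[] args) {
    // Stand-in for the "custom-retries" profile shown in the configuration above.
    Map<String, Map<String, Integer>> config =
        Map.of("custom-retries", Map.of("read-attempts", 3, "write-attempts", 2));

    AttemptLimits limits = AttemptLimits.fromProfile(config, "custom-retries");
    System.out.println("read=" + limits.readAttempts + ", write=" + limits.writeAttempts);
    System.out.println("retry first read timeout: " + limits.shouldRetryRead(0, 2, 2, false));
  }
}
```

The point of this shape is that the policy class keeps the two-argument constructor the driver expects, while the numbers that used to be constructor arguments move into the profile.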

huangapple
  • Posted on September 8, 2020 at 16:21:20
  • When reposting, please keep the link to this article: https://go.coder-hub.com/63789917.html