批量消费者丢弃元素

huangapple go评论64阅读模式
英文:

Batch Consumer Dropping Elements

问题

我有一个BatchConsumer,它接受一些项目,然后调用回调函数。

public class BatchConsumer<T> {

  private List<T> storage = new ArrayList<>();

  private final Consumer<List<T>> callback;
  private final int batchSize;
  private final ExecutorService executor;

  public BatchConsumer(int batchSize, Consumer<List<T>> batchConsumer, ExecutorService executor) {
    this.batchSize = batchSize;
    this.callback = batchConsumer;
    this.executor = executor;
  }

  /**
   * 插入一个对象。当插入了 {@link BatchConsumer#batchSize} 个对象后,将在执行器内调用回调函数。
   *
   * @param object
   */
  public synchronized void insert(T object) {
    storage.add(object);

    if (storage.size() == batchSize) {
      executor.submit(
          new Runnable() {

            @Override
            public void run() {
              callback.accept(Collections.unmodifiableList(storage));
            }
          });

      storage = new ArrayList<>();
    }
  }
}

我的问题是,在batchConsumer回调函数内检查传递的storage对象时,它从未包含batchSize个元素。通常它的元素数量在应有的数量(batchSize)的 1/3 到 2/3 之间。

我认为这只是一个我忽略的简单并发问题。有人可以指点我正确的方向吗?

英文:

I have a BatchConsumer that takes a number of items then invokes a callback.

public class BatchConsumer&lt;T&gt; {

  private List&lt;T&gt; storage = new ArrayList&lt;&gt;();

  private final Consumer&lt;List&lt;T&gt;&gt; callback;
  private final int batchSize;
  private final ExecutorService executor;

  public BatchConsumer(int batchSize, Consumer&lt;List&lt;T&gt;&gt; batchConsumer, ExecutorService executor) {
    this.batchSize = batchSize;
    this.callback = batchConsumer;
    this.executor = executor;
  }

  /**
   * Inserts an object. When {@link BatchConsumer#batchSize} objects have been inserted, the
   * callback will be invoked on the batch inside the executor.
   *
   * @param object
   */
  public synchronized void insert(T object) {
    storage.add(object);

    if (storage.size() == batchSize) {
      executor.submit(
          new Runnable() {

            @Override
            public void run() {
              callback.accept(Collections.unmodifiableList(storage));
            }
          });

      storage = new ArrayList&lt;&gt;();
    }
  }
}

My problem is that when I inspect the passed storage object inside the batchConsumer callback it never has batchSize elements. Typically it has between 1/3-2/3'rds as many as it should.

I think this is a simply concurrency issue that I am missing. Could someone point me in the right direction?

答案1

得分: 1

你并没有丢弃元素,而是完全丢弃了该列表。匿名内部类正在引用外部类成员变量。您对该变量所做的任何更改都会在内部类中可见。在提交内部类之后,您更改了 storage,然后当内部类运行时,它会看到修改后的 storage 版本。

在提交之前声明一个 final 变量,将其赋值给 storage,并在内部类中使用该 final 变量,以便您可以使用所需的 storage 版本。

英文:

You are not dropping elements, you are dropping the list completely. The anonymous inner class is referencing the enclosing class member variable. Any changes you make to that variable is visible in the inner class. After you submit the inner class, you change storage, and then when the inner class runs, it sees the modified version of the storage.

Declare a final variable before submitting, assign it to storage, and use that final variable in the inner class so you can use the version of storage you need.

huangapple
  • 本文由 发表于 2020年7月28日 21:51:56
  • 转载请务必保留本文链接:https://go.coder-hub.com/63135740.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定