Is there something like an Iterator, but with functions like Streams?

Question

So basically what I am trying to do is the following:

  1. Load a batch of data from the database
  2. Map that data (Object[] query results) to a class representing the data in a readable format
  3. Write it to a file
  4. Repeat until the query returns no more results

I listed the structures I am familiar with that seem to fit the need, and why they don't fit:

  • Iterator → has no option to map and filter without calling next()
    • I need to define the map function in a subclass without actually having the data (similar to a stream), so that I can pass the "stream" up to a calling class and only call next() there, which then applies all the map functions
  • Stream → all data needs to be available before mapping and filtering is possible
  • Observable → sends data as soon as it becomes available, but I need to process it synchronously

To give a better feeling for what I am trying to do, I made a small example:

// Disclaimer: "Something" is the structure I am not sure about yet.
// It could be an Iterator or something else that fits (that's the question)
public class Orchestrator {
    @Inject
    private DataGetter dataGetter;

    public void doWork() {
        FileWriter writer = new FileWriter("filename");

        // Write the formatted data to the file
        dataGetter.getData()
            .forEach(data -> writer.writeToFile(data));
    }
}

public class FileWriter {
    public void writeToFile(List<Thing> data) {
        // Write to file
    }
}

public class DataGetter {
    @Inject
    private ThingDao thingDao;

    public Something<List<Thing>> getData() {

        // Map the data to the correct format and return it
        return thingDao.getThings()
            .map(partialResult -> /* map to object */);
    }
}

public class ThingDao {

    public Something<List<Object[]>> getThings() {
        Query q = ...;
        // Don't know what to return here
    }
}

What I have got so far:

I tried to start from an Iterator as the base, because it's the only option that really fulfils my memory requirements. I then added some methods for mapping and looping over the data. It's not a very robust design though, and it turned out to be harder than I thought, so I wanted to know whether something already exists that does what I need.

public class QIterator<E> implements Iterator<List<E>> {
	public static final String QUERY_OFFSET = "queryOffset";
	public static final String QUERY_LIMIT = "queryLimit";

	private Query query;

	private long lastResultIndex = 0;
	private long batchSize;

	private Function<List<Object>, List<E>> mapper;

	public QIterator(Query query, long batchSize) {
		this.query = query;
		this.batchSize = batchSize;
	}

	public QIterator(Query query, long batchSize, Function<List<Object>, List<E>> mapper) {
		this(query, batchSize);
		this.mapper = mapper;
	}

	@Override
	public boolean hasNext() {
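		// Note: this assumes every full batch is followed by another fetch; if the
		// final batch is exactly batchSize rows, lastResultIndex stays a multiple
		// of batchSize and hasNext() never turns false (a known weakness of this draft).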
		return lastResultIndex % batchSize == 0;
	}

	@Override
	public List<E> next() {
        query.setParameter(QIterator.QUERY_OFFSET, lastResultIndex);
        query.setParameter(QIterator.QUERY_LIMIT, batchSize);

        List<Object> result = (List<Object>) query.getResultList(); // unchecked
        lastResultIndex += result.size();

        List<E> mappedResult;
        if (mapper != null) {
            mappedResult = mapper.apply(result);
        } else {
            mappedResult = (List<E>) result; // unchecked
        }

        return mappedResult;
	}

	public <R> QIterator<R> map(Function<List<E>, List<R>> appendingMapper) {
		return new QIterator<>(query, batchSize, (data) -> {
			if (this.mapper != null) {
				return appendingMapper.apply(this.mapper.apply(data));
			} else {
				return appendingMapper.apply((List<E>) data);
			}
		});
	}

	public void forEach(BiConsumer<List<E>, Integer> consumer) {
		for (int i = 0; this.hasNext(); i++) {
			consumer.accept(this.next(), i);
		}
	}
}

This works so far, but it has some unchecked assignments which I do not really like. I would also like the ability to "append" one QIterator to another, which is not hard by itself, but it should also take into account the maps that follow after the append; a lazy concatenation sketch follows below.
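A minimal sketch of what such a lazy concatenation could look like (a hypothetical helper, not part of the current code):

public static <E> Iterator<List<E>> concat(Iterator<List<E>> first,
                                           Iterator<List<E>> second) {
    // Hypothetical sketch: lazily concatenate two batch iterators. Any map
    // attached after the concat would wrap this combined iterator, so it
    // applies to the batches of both sources.
    return new Iterator<List<E>>() {
        @Override
        public boolean hasNext() {
            return first.hasNext() || second.hasNext();
        }

        @Override
        public List<E> next() {
            return first.hasNext() ? first.next() : second.next();
        }
    };
}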


Answer 1

Score: 2

Assume you have a DAO that provides data in a paginated manner, e.g. by applying LIMIT and OFFSET clauses to the underlying SQL. Such a DAO class would have a method that takes those values as arguments, i.e. the method would conform to the following functional interface:

@FunctionalInterface
public interface PagedDao<T> {
    List<T> getData(int offset, int limit);
}

E.g. calling getData(0, 20) would return the first 20 rows (page 1), and calling getData(60, 20) would return the 20 rows of page 4. If the method returns fewer than 20 rows, it means we got the last page. Asking for data beyond the last row returns an empty list.
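For instance (a hypothetical adapter, not part of the original answer), a JPA-based DAO like the ThingDao from the question could conform to this interface by using setFirstResult and setMaxResults instead of hand-managed query parameters:

// Hypothetical JPA-based implementation of PagedDao<Thing>; assumes an
// injected EntityManager and a Thing entity. A stable ORDER BY matters here,
// otherwise pages may overlap or skip rows.
public List<Thing> getThings(int offset, int limit) {
    return entityManager
            .createQuery("SELECT t FROM Thing t ORDER BY t.id", Thing.class)
            .setFirstResult(offset)
            .setMaxResults(limit)
            .getResultList();
}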

For the demo below, we can mock such a DAO class:

public class MockDao {
    private final int rowCount;

    public MockDao(int rowCount) {
        this.rowCount = rowCount;
    }

    public List<SimpleRow> getSimpleRows(int offset, int limit) {
        System.out.println("DEBUG: getData(" + offset + ", " + limit + ")");
        if (offset < 0 || limit <= 0)
            throw new IllegalArgumentException();
        List<SimpleRow> data = new ArrayList<>();
        for (int i = 0, rowNo = offset + 1; i < limit && rowNo <= this.rowCount; i++, rowNo++)
            data.add(new SimpleRow("Row #" + rowNo));
        System.out.println("DEBUG:   data = " + data);
        return data;
    }
}

public class SimpleRow {
    private final String data;

    public SimpleRow(String data) {
        this.data = data;
    }

    @Override
    public String toString() {
        return "Row[data=" + this.data + "]";
    }
}

If you then want to generate a stream of rows from that method, streaming all rows in blocks of a certain size, we need a Spliterator, so that we can create the stream with StreamSupport.stream(Spliterator<T> spliterator, boolean parallel).

Here is an implementation of such a Spliterator:

public class PagedDaoSpliterator<T> implements Spliterator<T> {
    private final PagedDao<T> dao;
    private final int blockSize;
    private int nextOffset;
    private List<T> data;
    private int dataIdx;

    public PagedDaoSpliterator(PagedDao<T> dao, int blockSize) {
        if (blockSize <= 0)
            throw new IllegalArgumentException();
        this.dao = Objects.requireNonNull(dao);
        this.blockSize = blockSize;
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action) {
        if (this.data == null) {
            if (this.nextOffset == -1 /*At end*/)
                return false; // Already at end
            this.data = this.dao.getData(this.nextOffset, this.blockSize);
            this.dataIdx = 0;
            if (this.data.size() < this.blockSize)
                this.nextOffset = -1 /*At end, after this data*/;
            else
                this.nextOffset += data.size();
            if (this.data.isEmpty()) {
                this.data = null;
                return false; // At end
            }
        }
        action.accept(this.data.get(this.dataIdx++));
        if (this.dataIdx == this.data.size())
            this.data = null;
        return true;
    }

    @Override
    public Spliterator<T> trySplit() {
        return null; // Parallel processing not supported
    }

    @Override
    public long estimateSize() {
        return Long.MAX_VALUE; // Unknown
    }

    @Override
    public int characteristics() {
        return ORDERED | NONNULL;
    }
}

We can now test that using the mock DAO above:

MockDao dao = new MockDao(13);
Stream<SimpleRow> stream = StreamSupport.stream(
        new PagedDaoSpliterator<>(dao::getSimpleRows, 5), /*parallel*/ false);
stream.forEach(System.out::println);

Output

DEBUG: getData(0, 5)
DEBUG:   data = [Row[data=Row #1], Row[data=Row #2], Row[data=Row #3], Row[data=Row #4], Row[data=Row #5]]
Row[data=Row #1]
Row[data=Row #2]
Row[data=Row #3]
Row[data=Row #4]
Row[data=Row #5]
DEBUG: getData(5, 5)
DEBUG:   data = [Row[data=Row #6], Row[data=Row #7], Row[data=Row #8], Row[data=Row #9], Row[data=Row #10]]
Row[data=Row #6]
Row[data=Row #7]
Row[data=Row #8]
Row[data=Row #9]
Row[data=Row #10]
DEBUG: getData(10, 5)
DEBUG:   data = [Row[data=Row #11], Row[data=Row #12], Row[data=Row #13]]
Row[data=Row #11]
Row[data=Row #12]
Row[data=Row #13]

As can be seen, we get 13 rows of data, retrieved from the database in blocks of 5 rows.

The data is not retrieved from the database until it is actually needed, which keeps the memory footprint low; how low depends on the block size and on the stream operations not caching the data.
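To make that laziness visible (a small add-on to the demo above, not from the original answer), a short-circuiting operation such as limit() stops pulling from the spliterator early, so later blocks are never fetched:

MockDao dao = new MockDao(13);
Stream<SimpleRow> stream = StreamSupport.stream(
        new PagedDaoSpliterator<>(dao::getSimpleRows, 5), /*parallel*/ false);

// Only 7 rows are pulled, so the DEBUG output shows getData(0, 5) and
// getData(5, 5); the third block at offset 10 is never requested.
stream.limit(7).forEach(System.out::println);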


Answer 2

Score: 1

You can do it in a single stream pipeline, as follows:

Statement stmt = con.createStatement();
ResultSet rs = stmt.executeQuery(queryThatReturnsAllRowsOrdered);
Stream.generate(() -> {
        try {
            return rs.next() ? map(rs) : null; // null signals that all rows are read
        } catch (SQLException e) {
            throw new RuntimeException(e); // rs.next() throws a checked SQLException
        }
    })
    .takeWhile(Objects::nonNull)
    .filter(<some predicate>)
    .forEach(<some operation>);

This starts processing as soon as the first row is returned by the query, and continues in parallel with the database until all rows have been read.

This approach holds only one row in memory at a time and minimises the load on the database by running just one query.
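Whether the driver really streams one row at a time is driver-specific, though: many JDBC drivers buffer the whole result set unless a fetch size is hinted, and PostgreSQL, for example, additionally requires auto-commit to be off for cursor-based fetching (a sketch, using standard JDBC calls):

con.setAutoCommit(false); // needed by some drivers (e.g. PostgreSQL) for cursor-based fetching
Statement stmt = con.createStatement();
stmt.setFetchSize(100);   // hint: stream rows from the server in batches of 100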

ResultSet进行映射比从Object[]进行映射更加简单自然,因为你可以通过名称访问列,并使用正确类型的值,例如:

MyDao map(ResultSet rs) {
    try {
        String someStr = rs.getString("COLUMN_X");
        int someInt = rs.getInt("COLUMN_Y");
        return new MyDao(someStr, someInt);
    } catch (SQLException e) {
        throw new RuntimeException(e);
    }
}
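One caveat about the snippet above: the Statement and ResultSet are never closed. A minimal sketch (assuming a java.sql.Connection named con, as before) that adds resource handling:

// try-with-resources guarantees the Statement and its ResultSet are closed
// after the stream has been consumed, even if an exception is thrown mid-stream.
try (Statement stmt = con.createStatement();
     ResultSet rs = stmt.executeQuery(queryThatReturnsAllRowsOrdered)) {
    // ... build and consume the stream exactly as shown above ...
}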

huangapple
  • Posted on 2020-08-25 17:38:41
  • Please keep this link when reposting: https://go.coder-hub.com/63576056.html