英文:
Spring WebFlux consumer to sink
问题
这里是一个简单的Spring Boot应用程序:
@SpringBootApplication
@RestController
public class ReactiveApplication {
static Flux<String> fluxString;
static volatile Queue<String> queue = new ConcurrentLinkedQueueProxy();
private static class ConcurrentLinkedQueueProxy extends ConcurrentLinkedQueue<String> {
private static final long serialVersionUID = 1L;
@Override
public boolean add(String e) {
synchronized (this) {
notify();
}
return super.add(e);
}
@Override
public String poll() {
synchronized (this) {
if(isEmpty()) {
try {
wait();
} catch (InterruptedException ex) {}
}
}
return super.peek() == null ? "" : super.poll();
}
}
static Consumer<String> consumer = str -> queue.add(str);
public static void main(String[] args) throws InterruptedException {
SpringApplication.run(ReactiveApplication.class, args);
}
static {
for(int i = 0; i < 10; i++)
queue.add("testData " + i + " ");
}
@GetMapping(value = "/", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
public Flux<String> home() {
Scheduler sch = Schedulers.newParallel("parallel-sch", 1);
List<String> list = new ArrayList<>(queue);
queue.removeAll(queue);
fluxString = Flux.<String>create(sink -> {
sink.onRequest(n -> {
for(int i = 0; i < n; i++) {
sink.next(queue.poll());
}
}).onCancel(() -> sch.dispose());
}).log().subscribeOn(sch).mergeWith(Flux.<String>fromIterable(list));
return fluxString;
}
@GetMapping("/add")
public String add(@RequestParam String s) {
consumer.accept(s);
return s;
}
}
因此,这个应用程序创建了一个字符串流。访问/
将获取队列中存在的所有字符串,然后将从/add
资源添加的任何内容合并在一起(忽略"Safe Methods Must be Idempotent"部分)。
我感到奇怪的是,当我将public static void main(...)
移动到第一行时,应用程序开始表现异常,而向/add
添加新值没有任何效果。我认为可能有一些有趣的事情正在发生,导致应用程序的异常行为。是否有任何解释呢?
英文:
Here is a simple spring boot application:
@SpringBootApplication
@RestController
public class ReactiveApplication {
static Flux<String> fluxString;
static volatile Queue<String> queue = new ConcurrentLinkedQueueProxy();
private static class ConcurrentLinkedQueueProxy extends ConcurrentLinkedQueue<String> {
private static final long serialVersionUID = 1L;
@Override
public boolean add(String e) {
synchronized (this) {
notify();
}
return super.add(e);
}
@Override
public String poll() {
synchronized (this) {
if(isEmpty()) {
try {
wait();
} catch (InterruptedException ex) {}
}
}
return super.peek() == null ? "" : super.poll();
}
}
static Consumer<String> consumer = str -> queue.add(str);
public static void main(String[] args) throws InterruptedException {
SpringApplication.run(ReactiveApplication.class, args);
}
static {
for(int i = 0; i < 10; i++)
queue.add("testData " + i + " ");
}
@GetMapping(value = "/", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
public Flux<String> home() {
Scheduler sch = Schedulers.newParallel("parallel-sch", 1);
List<String> list = new ArrayList<>(queue);
queue.removeAll(queue);
fluxString = Flux.<String>create(sink -> {
sink.onRequest(n -> {
for(int i = 0; i < n; i++) {
sink.next(queue.poll());
}
}).onCancel(() -> sch.dispose());
}).log().subscribeOn(sch).mergeWith(Flux.<String>fromIterable(list));
return fluxString;
}
@GetMapping("/add")
public String add( @RequestParam String s) {
consumer.accept(s);
return s;
}
}
So basically this application creates a String stream. Visiting /
will grab all the string present queue and then merge anything that is added from /add
resource(ignore the "Safe Methods Must be Idempotent" thing).
What I feel is strange is that when I move public static void main(...)
to line 1, the application starts to misbehave and adding new values to /add
doesn't have any effect. I think there must be something interesting going on that is making application misbehave. Any explaination?
答案1
得分: 0
我最终使用了这段代码,它运行得很好:
@SpringBootApplication
@RestController
public class ReactiveApplication {
private static BlockingQueue<String> queue = new ArrayBlockingQueue<>(1000);
private static Consumer<String> consumer = str -> {
try { queue.put(str); }
catch (InterruptedException e) {}
};
static {
for (int i = 0; i < 10; i++) queue.add("testData " + i + " ");
}
public static void main(String[] args) {
SpringApplication.run(ReactiveApplication.class, args);
}
@GetMapping(value = "/", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
public Flux<String> home() {
final Scheduler sch = Schedulers.newSingle("async-flux");
return Flux.<String>generate(sink -> {
try { sink.next(queue.take()); }
catch (InterruptedException e) { }
}).log().subscribeOn(sch);
}
@GetMapping("/add")
public String add(@RequestParam String s) {
consumer.accept(s);
return s;
}
}
英文:
I ended up using this which works great:
@SpringBootApplication
@RestController
public class ReactiveApplication {
private static BlockingQueue<String> queue = new ArrayBlockingQueue<>(1000);
private static Consumer<String> consumer = str -> {
try { queue.put(str); }
catch (InterruptedException e) {}
};
static {
for (int i = 0; i < 10; i++) queue.add("testData " + i + " ");
}
public static void main(String[] args) {
SpringApplication.run(ReactiveApplication.class, args);
}
@GetMapping(value = "/", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
public Flux<String> home() {
final Scheduler sch = Schedulers.newSingle("async-flux");
return Flux.<String>generate(sink -> {
try { sink.next(queue.take()); }
catch (InterruptedException e) { }
}).log().subscribeOn(sch);
}
@GetMapping("/add")
public String add(@RequestParam String s) {
consumer.accept(s);
return s;
}
}
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论