Spring WebFlux consumer to sink

Question

Here is a simple Spring Boot application:

@SpringBootApplication
@RestController
public class ReactiveApplication {
    
    static Flux<String> fluxString;
    static volatile Queue<String> queue = new ConcurrentLinkedQueueProxy();
    
    private static class ConcurrentLinkedQueueProxy extends ConcurrentLinkedQueue<String> {
        private static final long serialVersionUID = 1L;

        @Override
        public boolean add(String e) {
            synchronized (this) {
                notify();
            }
            return super.add(e);
        }
        
        @Override
        public String poll() {
            synchronized (this) {
                if(isEmpty()) {
                    try {
                        wait();
                    } catch (InterruptedException ex) {}
                }
            }
            return super.peek() == null ? "" : super.poll();
        }
    }
    
    static Consumer<String> consumer = str -> queue.add(str);

    public static void main(String[] args) throws InterruptedException {
        SpringApplication.run(ReactiveApplication.class, args);
    }
    
    static {
        for(int i = 0; i < 10; i++)
            queue.add("testData " + i + " ");
    }
    
    @GetMapping(value = "/", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
    public Flux<String> home() {
        
        Scheduler sch = Schedulers.newParallel("parallel-sch", 1);
        List<String> list = new ArrayList<>(queue);
        queue.removeAll(queue);
        
        fluxString = Flux.<String>create(sink -> {
            sink.onRequest(n -> {
                for(int i = 0; i < n; i++) {
                    sink.next(queue.poll());
                }
            }).onCancel(() -> sch.dispose());
        }).log().subscribeOn(sch).mergeWith(Flux.<String>fromIterable(list));
        
        return fluxString;
        
    }
    
    @GetMapping("/add")
    public String add(@RequestParam String s) {
        consumer.accept(s);
        return s;
    }
}

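So basically this application creates a stream of Strings. Visiting / grabs all the strings currently in the queue and then merges in anything added afterwards through the /add resource (ignore the "Safe Methods Must be Idempotent" issue).

What I find strange is that when I move public static void main(...) to line 1, the application starts to misbehave and adding new values through /add no longer has any effect. I think something interesting must be going on that makes the application misbehave. Any explanation?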
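A possible source of the erratic behaviour that does not depend on where main is declared (this is an assumption, not something confirmed in the thread): the proxy queue calls notify() before the element is actually added, and poll() guards wait() with an if rather than a loop. A consumer can then wake up, find the queue still empty and emit "", or check isEmpty() just before the element lands and sleep until the next add. The following is a minimal JDK-only sketch of the usual guarded-wait pattern; the class name GuardedQueue and the demo() helper are hypothetical and only serve as illustration.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical stand-in for the question's ConcurrentLinkedQueueProxy.
public class GuardedQueue {
    private final Queue<String> items = new ArrayDeque<>();

    // Insert and notify under the same lock, so a woken consumer always sees the element.
    public synchronized void add(String e) {
        items.add(e);
        notifyAll();
    }

    // Block until an element is available; the while loop re-checks after every wakeup.
    public synchronized String take() throws InterruptedException {
        while (items.isEmpty()) {
            wait();
        }
        return items.remove();
    }

    // Demo helper: start a consumer, then produce one value and return what the consumer received.
    public static String demo() {
        GuardedQueue q = new GuardedQueue();
        String[] received = new String[1];
        Thread consumer = new Thread(() -> {
            try {
                received[0] = q.take();
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        q.add("hello");
        try {
            consumer.join(); // join() makes the consumer's write to received[0] visible here
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for consumer", ex);
        }
        return received[0];
    }
}
```

Because the element is added while the lock is held, it does not matter whether the consumer starts waiting before or after add() runs: it either sees the element on its first check or is woken once the element is already in place.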

Answer 1

Score: 0

I ended up using this, which works great:

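import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

@SpringBootApplication
@RestController
public class ReactiveApplication {

    // Bounded queue: put() blocks when it is full, take() blocks when it is empty.
    private static final BlockingQueue<String> queue = new ArrayBlockingQueue<>(1000);

    private static final Consumer<String> consumer = str -> {
        try { queue.put(str); }
        catch (InterruptedException e) { Thread.currentThread().interrupt(); }
    };

    static {
        for (int i = 0; i < 10; i++) queue.add("testData " + i + " ");
    }

    public static void main(String[] args) {
        SpringApplication.run(ReactiveApplication.class, args);
    }

    @GetMapping(value = "/", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
    public Flux<String> home() {

        // Dedicated thread per request, because take() blocks while the queue is empty.
        final Scheduler sch = Schedulers.newSingle("async-flux");

        return Flux.<String>generate(sink -> {
            try { sink.next(queue.take()); }
            catch (InterruptedException e) {
                // generate() expects a signal on every invocation, so end the stream.
                Thread.currentThread().interrupt();
                sink.complete();
            }
        }).log().subscribeOn(sch)
          .doFinally(signal -> sch.dispose()); // release the thread once the stream ends or is cancelled

    }

    @GetMapping("/add")
    public String add(@RequestParam String s) {
        consumer.accept(s);
        return s;
    }

}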
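The approach above leans on BlockingQueue to do the waiting and signalling that the hand-written wait()/notify() code attempted. As a rough illustration of that hand-off, here is a JDK-only sketch without Spring or Reactor; the class name BlockingHandoff and the method handOff are hypothetical. A consumer thread calls take() in a loop, much like the generate callback does, while the caller produces values with put().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BlockingHandoff {

    // Consume exactly `count` items on a separate thread while the caller produces them.
    public static List<String> handOff(int count) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1000);
        List<String> received = new ArrayList<>();

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    received.add(queue.take()); // blocks while the queue is empty
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        try {
            for (int i = 0; i < count; i++) {
                queue.put("testData " + i); // blocks only if the queue is full
            }
            consumer.join(); // after join(), the consumer's writes to `received` are visible
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted during hand-off", e);
        }
        return received;
    }
}
```

Since take() parks the calling thread, running it on its own scheduler, as the answer does, keeps that blocking away from the server's request-handling threads. Note that with a single shared queue, several concurrent clients would compete for the same elements rather than each receiving every value.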

huangapple
  • Posted on May 3, 2020, 21:47:50
  • When reposting, please keep the link to this article: https://go.coder-hub.com/61575514.html