stateful service with lazy sequences and SSE — how to distribute with fault tolerance?

I wrote a web service to generate estimates of Pi, using lazy sequences in Clojure and various infinite series formulae (Euler, Leibniz). The Clojure service sends these estimates over a Server-Sent Events channel. Currently an HTML/JS view uses Vue.js to consume the SSE events and display them.

It works pretty well as a single-node service, as long as the connection for the SSE channel isn't closed. But as of now it doesn't persist or back up the state of the reductions (the position in the infinite series) to recover from a failure if the connection is closed or the service dies. Also, since the state is held in the service's local memory (in the Clojure sequence value), there is no horizontal scalability, as there would be if the long-term state lived in Redis, for example. Just adding new nodes won't offer a way to actually divide the work; each node would duplicate the same series. Offloading long-term state to Redis is the kind of setup I'm used to with stateless web services, to streamline a horizontal-scaling and fault-tolerance strategy.
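A minimal sketch of that recovery idea (not from the post; `resume-reductions` is a hypothetical helper): the only state a node actually needs is the current term index and the running sum, a small map that could be checkpointed to an external store like Redis every k terms and reloaded after a failure, instead of recomputing from term 0:

```clojure
;; Hypothetical sketch: the whole series state is the map {:n index, :sum running-sum}.
;; Persisting this map externally (e.g. to Redis) after every k terms would
;; let a restarted node resume rather than recompute from the beginning.
(defn leibniz-term [n]
  (let [oddnum     (+ (* 2.0 n) 1.0)
        signfactor (- 1 (* 2 (mod n 2)))]
    (/ (* 4.0 signfactor) oddnum)))

(defn resume-reductions
  "Continue the running sums of term-fn from a saved checkpoint.
  The first element emitted is the checkpointed sum itself."
  [term-fn {:keys [n sum]}]
  (reductions + sum (map term-fn (iterate inc n))))

;; Starting from scratch is just the checkpoint {:n 0 :sum 0.0}:
;; (take 3 (resume-reductions leibniz-term {:n 0 :sum 0.0}))
```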

In this stateful case, I'm at a loss as to how to scale the Clojure service with a distributed, multi-node solution that could process series terms in parallel. Maybe there could be a dispatching "master" service that delegates sequence ranges to different nodes, receives the results from the nodes concurrently (via Redis pub/sub), aggregates them mathematically, and yields a resulting SSE stream for the view? In that case, the master service would generate range bounds from an infinite series of numbers spaced about a thousand apart, which the parallel nodes could use to initialize finite (though likely still lazy) Clojure sequences. Surely I'd also need to mark which sequence ranges are complete as results come in, with a retry strategy for node failure during the processing of a range.
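The range-splitting idea above can be sketched with plain, non-distributed functions (`partial-sum` and `estimate` are hypothetical names; the series term is copied from the service code below). Because addition is associative and commutative, the master can add per-range partial sums in any order; for the Euler series, the final `Math/sqrt` would be applied only after aggregation:

```clojure
;; Hypothetical sketch of splitting an infinite series into finite ranges
;; and aggregating the per-range partial sums.
(defn leibniz-term [n]
  (let [oddnum     (+ (* 2.0 n) 1.0)
        signfactor (- 1 (* 2 (mod n 2)))]
    (/ (* 4.0 signfactor) oddnum)))

(defn partial-sum
  "Worker-side: sum the series terms with indices start (inclusive)
  through end (exclusive)."
  [term-fn start end]
  (reduce + (map term-fn (range start end))))

(defn estimate
  "Master-side: aggregate the partial sums of n-batches batches of size
  batch-sz. The order of aggregation does not matter."
  [term-fn batch-sz n-batches]
  (reduce +
          (map #(partial-sum term-fn (* % batch-sz) (* (inc %) batch-sz))
               (range n-batches))))

;; (estimate leibniz-term 1000 100) sums the first 100,000 Leibniz terms.
```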

I am studying Kubernetes Stateful Sets to become familiar with deployment patterns for stateful services, though I haven't yet encountered a pattern or solution that fits this specific problem. Were this a stateless service, the Kubernetes solution would be kind of obvious, but a stateful approach leaves me with a blank slate in the Kubernetes environment.

Can anyone point me in a good direction for the architecture here? Assuming I do want to keep the state of series terms encapsulated in Clojure lazy sequences (that is, in local service memory), am I on the right track in my strategy to divide the work?

Here is the relevant code for the single-node Clojure service:

(ns server-sent-events.service
  (:require [io.pedestal.http :as http]
            [io.pedestal.http.sse :as sse]
            [io.pedestal.http.route :as route]
            [io.pedestal.http.route.definition :refer [defroutes]]
            [ring.util.response :as ring-resp]
            [clojure.core.async :as async]))

(defn seq-of-terms
  [func]
  (map func (iterate (partial + 1) 0)))

(defn euler-term [n]
  (let [current (+ n 1)]
    (/ 6.0 (* current current))))

; The following returns a lazy list representing iterable sums that estimate pi
; according to the Euler series for increasing amounts of terms in the series.
; Sample usage: (take 100 euler-reductions)
(def euler-reductions
  (map (fn [sum] (Math/sqrt sum)) (reductions + (seq-of-terms euler-term))))

(defn leibniz-term [n] ; starts at zero
  (let [oddnum     (+ (* 2.0 n) 1.0)
        signfactor (- 1 (* 2 (mod n 2)))]
    (/ (* 4.0 signfactor) oddnum)))

; The following returns a lazy list representing iterable sums that estimate pi
; according to the Leibniz series for increasing amounts of terms in the series.
; Sample usage: (take 100 leibniz-reductions)
(def leibniz-reductions (reductions + (seq-of-terms leibniz-term)))

(defn send-result
  [event-ch count-num rdcts]
  (doseq [item rdcts]
    ;; we must use a naive throttle here to prevent an overflow on the
    ;; core.async CSP channel, event-ch
    (Thread/sleep 150)
    (async/put! event-ch (str item))))

(defn sse-euler-stream-ready
  "Start to send estimates to the client according to the Euler series"
  [event-ch ctx]
  ;; The context is passed into this function.
  (let [{:keys [request response-channel]} ctx
        lazy-list euler-reductions]
    (send-result event-ch 10 lazy-list)))

(defn sse-leibniz-stream-ready
  "Start to send estimates to the client according to the Leibniz series"
  [event-ch ctx]
  (let [{:keys [request response-channel]} ctx
        lazy-list leibniz-reductions]
    (send-result event-ch 10 lazy-list)))


;; Wire root URL to sse event stream
;; with custom event-id setting
(defroutes routes
  [[["/" {:get [::send-result-euler (sse/start-event-stream sse-euler-stream-ready)]}
     ["/euler" {:get [::send-result
                      (sse/start-event-stream sse-euler-stream-ready)]}]
     ["/leibniz" {:get [::send-result-leibniz
                        (sse/start-event-stream sse-leibniz-stream-ready)]}]]]])

(def url-for (route/url-for-routes routes))

(def service {:env :prod
              ::http/routes routes
              ;; Root for resource interceptor that is available by default.
              ::http/resource-path "/public"
              ;; Either :jetty or :tomcat (see comments in project.clj
              ;; to enable Tomcat)
              ::http/type :jetty
              ::http/port 8080
              ;;::http/allowed-origins ["http://127.0.0.1:8081"]
              })

Full code is at <https://github.com/wclark-aburra-code/pi-service>. Inline Vue.js code included, which consumes the SSE stream.

Answer 1

Score: 1

If it is just for scaling, I don't think you need to persist anything. All you need is a dispatching "master" (which can potentially be the client itself) to request the chunked sequences from multiple backends and reassemble them to deliver in the right order.

Using core.async, a dispatching master can be implemented like this:

;; assumes (require '[clojure.core.async :as async]
;;                  '[clojure.tools.logging :as log])
(let [batch-ch (async/chan)
      out-ch   (async/chan)]

  ;; request 100 batches (or infinite)
  (async/onto-chan batch-ch (range 100))
  ;; consume the result by pushing it back to the sse channel
  (async/go-loop []
    (when-let [res (async/<! out-ch)]
      (log/info ::result res)
      (recur)))

  ;;
  ;; take each batch number from batch-ch and dispatch it to the backend
  ;; in parallel. You would also add an exception handler in here.
  ;;
  (async/pipeline-async
   ;; parallelism
   32
   ;; output
   out-ch
   ;; invoke backend service, this should return immediately
   (fn [batch ch]
     (let [batch-sz 1000]
       (async/go
         (let [start (* batch batch-sz)
               end   (-> batch inc (* batch-sz))]
           (log/info ::fetching-from-service start end)
           ;; simulate a slow service
           (async/<! (async/timeout 1000))
           ;; push the result back to the pipeline and close the channel
           ;; (here I just return the term itself)
           (async/onto-chan ch (range start end))))))
   ;; input
   batch-ch))
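For completeness, a worker-side counterpart to the sketch above could replace the simulated `(async/timeout 1000)` call: each backend computes its range's partial sum and returns it tagged with the range bounds, so the master can mark ranges complete and retry missing ones. This is a hypothetical sketch (`process-batch` is not part of the answer):

```clojure
;; Hypothetical worker-side function for the dispatching-master sketch.
(require '[clojure.core.async :as async])

(defn leibniz-term [n]
  (let [oddnum     (+ (* 2.0 n) 1.0)
        signfactor (- 1 (* 2 (mod n 2)))]
    (/ (* 4.0 signfactor) oddnum)))

(defn process-batch
  "Compute the partial sum for term indices [start, end) and put a
  result tagged with its range on ch, closing ch as pipeline-async
  expects of its async function."
  [term-fn start end ch]
  (async/go
    (let [sum (reduce + (map term-fn (range start end)))]
      (async/>! ch {:start start :end end :sum sum})
      (async/close! ch))))
```

Tagging each result with `:start` and `:end` is what lets the master record which ranges have completed and re-dispatch any range whose result never arrives.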

Posted by huangapple on 2020-01-04 02:01:38.
Please retain this link when reposting: https://go.coder-hub.com/59583242.html