英文:
Concurrent-ruby async method calling problem
问题
Ruby版本:ruby-3.2.1
我有一个要求,需要读取一个包含50,000行的CSV文件,然后针对每一行依次执行7个API,以创建一个第三方应用程序中的条目。
我正在尝试使用concurrent-ruby gem的异步功能。以下是相同的示例代码。
require 'csv'
require "json"
require 'byebug'
require 'concurrent'
class Processor
include Concurrent::Async
def consume(c_row)
## 在consume方法内,按顺序调用了7个API来处理每一行
puts "processing!!!! -> #{c_row}"
sleep 1 if c_row % 2
sleep 5 if c_row % 3
end
end
class ImporterAsync
def self.perform(master_file)
begin
## CSV有50,000条记录 ##
CSV.foreach(master_file, headers: true).with_index(1) do |row|
Processor.new.async.consume(row)
end
rescue StandardError => e
puts "Main script failed: #{e.message}"
ensure
puts 'closing'
end
end
end
ImporterAsync.perform('master_file.csv')
当我使用异步关键字Processor.new.async.consume(row)
调用处理器时,代码没有响应。
但是,如果不使用异步关键字,代码可以正常执行。
Processor.new.consume(row)
。
我相信使用异步方式将有助于我的代码在这种负载下更好地执行。我也愿意听取建议。
提前感谢 - Ajith
英文:
Ruby version: ruby-3.2.1
I have a requirement of reading a csv file which contains 50k rows. Then with each row i need to execute 7 APIs one by one in the order to create an entry in a third part application.
I am trying to use async feature of concurrent-ruby gem. Below is a sample code of the same.
require 'csv'
require "json"
require 'byebug'
require 'concurrent'
class Processor
include Concurrent::Async
def consume(c_row)
## Inside consume methods 7 APIS are called in sequence for each row
puts "processing!!!! -> #{c_row}"
sleep 1 if c_row % 2
sleep 5 if c_row % 3
end
end
class ImporterAsync
def self.perform(master_file)
begin
## CSV has 50k entries ##
CSV.foreach(master_file, headers: true).with_index(1) do |row|
Processor.new.async.consume(row)
end
rescue StandardError => e
puts "Main script failed: #{e.message}"
ensure
puts 'closing'
end
end
end
ImporterAsync.perform('master_file.csv')
I am getting a no response from the code when I invoke the processor with async keyword Processor.new.async.consume(row)
But code executes fine without async keyword.
Processor.new.consume(row)
.
I believe with asynchronous way will help my code to execute better with this load. I am also open for suggestions.
Thanks in advance - Ajith
答案1
得分: 1
你的程序在创建异步线程后立即退出。你需要等待它们,否则它们会随着程序的其余部分终止。
像这样使用异步实际上会返回一个Concurrent::IVar,你可以使用它来跟踪每个线程上的工作,使用#value
。
这是你的类的修改版本,将每个IVar收集到一个数组中。在启动它们后,它会等待每一个完成。这意味着程序会在所有线程都完成之前等待。(请注意,完成也可能意味着出现异常,这并不意味着“良好”的完成。)
英文:
Your program effectively exits immediately after creating the async threads. You need to wait for them or they'll terminate with the rest of the program.
Using async like this actually returns a Concurrent::IVar that you can use to track the work in each one on #value
.
class ImporterAsync
def self.perform(master_file)
futures = []
begin
CSV.foreach(master_file, headers: true).with_index(1) do |row|
future = Processor.new.async.consume(row)
futures.push(future)
end
rescue StandardError => e
puts "Main script failed: #{e.message}"
ensure
puts 'closing'
end
futures.each do |t|
t.wait
puts "finished (#{t.value.to_s[0...40].strip}) #{Time.now}"
end
puts "ALL DONE"
end
end
This is an altered version of your class to collect each IVar up into an array. After starting them all it waits for each one to finish. This means the program will wait for ALL of them before exiting. (Note that finished can also mean an exception was raised, it doesn't imply a "good" finish.)
答案2
得分: 0
我参考了以下链接以了解发生了什么。
https://stackoverflow.com/questions/32492560/celluloid-async-inside-ruby-blocks-does-not-work
但我仍然愿意听取建议。
英文:
Actually i referred the below link to get the understanding of what is happening.
https://stackoverflow.com/questions/32492560/celluloid-async-inside-ruby-blocks-does-not-work
But i am still open for suggestions.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论