オブジェクトを順に Ractor に流し込んで、(無限連鎖の)Enumerable(親クラス)として取り出すというものです。
こんな感じです。
ractor_enum.rb
class RactorEnum
class End end
def initialize
@pipe = Ractor.new do
loop do
e = Ractor.receive
if e.instance_of?(RactorEnum::End)
Ractor.current.close_outgoing
else
Ractor.yield e
end
end
end
end
attr_reader :pipe
def each
loop { yield @pipe.take }
end
include Enumerable
end
loop do ~ end
はRactor::ClosedError
を捕捉するようです。
re = RactorEnum.new
してre.pipe
に流し込み、Enumerable として取り出します。
re = RactorEnum.new
re.pipe << :Hello
re.pipe << :World!
re.take(2)
無限連鎖なので、re.map
とかするとフリーズします。lazy化するとよいかも知れません。
re = RactorEnum.new
20.times { re.pipe << rand(97..122) }
re.lazy.map(&:chr).first(8).join
あるいはRactorEnum::End.new
で流し込みの終わりを指定します。
re = RactorEnum.new
20.times { re.pipe << rand(97..122) }
re.pipe << RactorEnum::End.new
re.map(&:chr).join
Ractor を使えば、後から流し込むこともできます。
re = RactorEnum.new
Ractor.new(re) do |re|
re.each { puts _1 }
end
re.pipe << 1
sleep(1)
re.pipe << 2
sleep(1)
"1" が表示され、1秒後に "2" が表示されます。
重い処理を分散して実行し、流し込んだ順に Enumerable として取り出します。
re = RactorEnum.new
3.times do |i|
Ractor.new(re, i) do |re, i|
10.times do
sleep(rand(2.0..3.0))
re.pipe << rand(i*10..(i+1)*10-1)
end
end
end
re.each_slice(3).with_index(1) do |ary, i|
p ary.map { _1 + 1000 }
re.pipe.close_outgoing if i == 4
end
出力例。3つずつ流し込まれた時点で出力します。
[1012, 1026, 1000]
[1004, 1011, 1029]
[1013, 1002, 1024]
[1009, 1015, 1020]
適当に数を20個流し込んで、素数があったところで切って出力します。
require "prime"
re = RactorEnum.new
Ractor.new(re) do |re|
20.times do
sleep(rand)
re.pipe << rand(2..30)
end
re.pipe << RactorEnum::End.new
end
re.slice_after(&:prime?).each do |ary|
p ary
end
出力例。
[5]
[14, 29]
[13]
[10, 28, 19]
[15, 30, 3]
[15, 13]
[5]
[23]
[6, 3]
[13]
[15, 8, 28]
分散処理の例。
require "prime"
N = 100
re1 = RactorEnum.new
re2 = RactorEnum.new
3.times.map do
Ractor.new(re1, re2) do |re1, re2|
re1.each { |n| re2.pipe << [n, n.prime?] }
end
end
(1..N).each { |i| re1.pipe << i }
re1.pipe << RactorEnum::End.new
p re2.take(N).sort_by { |n, b| n }
素数があったらその次の 5つを出力する、というのを続ける。RactorEnum を Enumerator に変換しています。
require "prime"
re = RactorEnum.new
Ractor.new(re) do |re|
(2..40).each do |i|
sleep(rand)
re.pipe << i
end
re.pipe << RactorEnum::End.new
end
enum = re.to_enum
loop do
n = enum.next
if n.prime?
p enum.take(5)
end
end
出力。
[3, 4, 5, 6, 7]
[12, 13, 14, 15, 16]
[18, 19, 20, 21, 22]
[24, 25, 26, 27, 28]
[30, 31, 32, 33, 34]
[38, 39, 40]
Ractor#take
があるのでこんなことをしてもあまり意味はないけれど、まあ敢て Enumerable で取り出したかったら…。