オブジェクトを順に Ractor に流し込んで、(無限連鎖の)Enumerable(親クラス)として取り出すというものです。
こんな感じです。
ractor_enum.rb
class RactorEnum class End end def initialize @pipe = Ractor.new do loop do e = Ractor.receive if e.instance_of?(RactorEnum::End) Ractor.current.close_outgoing else Ractor.yield e end end end end attr_reader :pipe def each loop { yield @pipe.take } end include Enumerable end
loop do ~ end
はRactor::ClosedError
を捕捉するようです。
re = RactorEnum.new
してre.pipe
に流し込み、Enumerable として取り出します。
re = RactorEnum.new re.pipe << :Hello re.pipe << :World! re.take(2) #=>[:Hello, :World!]
無限連鎖なので、re.map
とかするとフリーズします。lazy化するとよいかも知れません。
re = RactorEnum.new 20.times { re.pipe << rand(97..122) } re.lazy.map(&:chr).first(8).join #=>"mnflrutp"
あるいはRactorEnum::End.new
で流し込みの終わりを指定します。
re = RactorEnum.new 20.times { re.pipe << rand(97..122) } re.pipe << RactorEnum::End.new re.map(&:chr).join #=>"ccwbjfecegtyjwszeued"
Ractor を使えば、後から流し込むこともできます。
re = RactorEnum.new Ractor.new(re) do |re| re.each { puts _1 } end re.pipe << 1 sleep(1) re.pipe << 2 sleep(1) #これがないと"2"を表示する前に終わってしまうことがある
"1" が表示され、1秒後に "2" が表示されます。
重い処理を分散して実行し、流し込んだ順に Enumerable として取り出します。
re = RactorEnum.new #3つの分散処理 3.times do |i| Ractor.new(re, i) do |re, i| #何か重い処理 10.times do sleep(rand(2.0..3.0)) re.pipe << rand(i*10..(i+1)*10-1) end end end re.each_slice(3).with_index(1) do |ary, i| p ary.map { _1 + 1000 } re.pipe.close_outgoing if i == 4 end
出力例。3つずつ流し込まれた時点で出力します。
[1012, 1026, 1000] [1004, 1011, 1029] [1013, 1002, 1024] [1009, 1015, 1020]
適当に数を20個流し込んで、素数があったところで切って出力します。
require "prime" re = RactorEnum.new #サーバー Ractor.new(re) do |re| 20.times do sleep(rand) re.pipe << rand(2..30) end re.pipe << RactorEnum::End.new end re.slice_after(&:prime?).each do |ary| p ary end
出力例。
[5] [14, 29] [13] [10, 28, 19] [15, 30, 3] [15, 13] [5] [23] [6, 3] [13] [15, 8, 28] #最後は素数で終わるとは限らない
分散処理の例。
require "prime" N = 100 re1 = RactorEnum.new re2 = RactorEnum.new 3.times.map do Ractor.new(re1, re2) do |re1, re2| re1.each { |n| re2.pipe << [n, n.prime?] } end end (1..N).each { |i| re1.pipe << i } re1.pipe << RactorEnum::End.new p re2.take(N).sort_by { |n, b| n }
素数があったらその次の 5つを出力する、というのを続ける。RactorEnum を Enumerator に変換しています。
require "prime" re = RactorEnum.new #サーバー Ractor.new(re) do |re| (2..40).each do |i| sleep(rand) re.pipe << i end re.pipe << RactorEnum::End.new end enum = re.to_enum loop do n = enum.next if n.prime? p enum.take(5) end end
出力。
[3, 4, 5, 6, 7] [12, 13, 14, 15, 16] [18, 19, 20, 21, 22] [24, 25, 26, 27, 28] [30, 31, 32, 33, 34] [38, 39, 40]
Ractor#take
があるのでこんなことをしてもあまり意味はないけれど、まあ敢て Enumerable で取り出したかったら…。