Ractor の Enumerable 化?(Ruby)

オブジェクトを順に Ractor に流し込んで、(無限連鎖の)Enumerable(親クラス)として取り出すというものです。
 
こんな感じです。
ractor_enum.rb

class RactorEnum
  class End end
  
  def initialize
    @pipe = Ractor.new do
      loop do
        e = Ractor.receive
        if e.instance_of?(RactorEnum::End)
          Ractor.current.close_outgoing
        else
          Ractor.yield e
        end
      end
    end
  end
  attr_reader :pipe

  def each
    loop { yield @pipe.take }
  end
  
  include Enumerable
end

loop do ~ endRactor::ClosedErrorを捕捉するようです。
 
re = RactorEnum.newしてre.pipeに流し込み、Enumerable として取り出します。

re = RactorEnum.new
re.pipe << :Hello
re.pipe << :World!
re.take(2)    #=>[:Hello, :World!]

無限連鎖なので、re.mapとかするとフリーズします。lazy化するとよいかも知れません。

re = RactorEnum.new
20.times { re.pipe << rand(97..122) }
re.lazy.map(&:chr).first(8).join    #=>"mnflrutp"

 
あるいはRactorEnum::End.newで流し込みの終わりを指定します。

re = RactorEnum.new
20.times { re.pipe << rand(97..122) }
re.pipe << RactorEnum::End.new

re.map(&:chr).join    #=>"ccwbjfecegtyjwszeued"

 
Ractor を使えば、後から流し込むこともできます。

re = RactorEnum.new

Ractor.new(re) do |re|
  re.each { puts _1 }
end

re.pipe << 1
sleep(1)
re.pipe << 2
sleep(1)    #これがないと"2"を表示する前に終わってしまうことがある

"1" が表示され、1秒後に "2" が表示されます。
 
重い処理を分散して実行し、流し込んだ順に Enumerable として取り出します。

re = RactorEnum.new

#3つの分散処理
3.times do |i|
  Ractor.new(re, i) do |re, i|
    #何か重い処理
    10.times do
      sleep(rand(2.0..3.0))
      re.pipe << rand(i*10..(i+1)*10-1)
    end
  end
end

re.each_slice(3).with_index(1) do |ary, i|
  p ary.map { _1 + 1000 }
  re.pipe.close_outgoing if i == 4
end

出力例。3つずつ流し込まれた時点で出力します。

[1012, 1026, 1000]
[1004, 1011, 1029]
[1013, 1002, 1024]
[1009, 1015, 1020]

 
適当に数を20個流し込んで、素数があったところで切って出力します。

require "prime"

re = RactorEnum.new

#サーバー
Ractor.new(re) do |re|
  20.times do
    sleep(rand)
    re.pipe << rand(2..30)
  end
  re.pipe << RactorEnum::End.new
end

re.slice_after(&:prime?).each do |ary|
  p ary
end

出力例。

[5]
[14, 29]
[13]
[10, 28, 19]
[15, 30, 3]
[15, 13]
[5]
[23]
[6, 3]
[13]
[15, 8, 28]    #最後は素数で終わるとは限らない

 
分散処理の例。

require "prime"

N = 100

re1 = RactorEnum.new
re2 = RactorEnum.new

3.times.map do
  Ractor.new(re1, re2) do |re1, re2|
    re1.each { |n| re2.pipe << [n, n.prime?] }
  end
end

(1..N).each { |i| re1.pipe << i }
re1.pipe << RactorEnum::End.new

p re2.take(N).sort_by { |n, b| n }

 
素数があったらその次の 5つを出力する、というのを続ける。RactorEnum を Enumerator に変換しています。

require "prime"

re = RactorEnum.new

#サーバー
Ractor.new(re) do |re|
  (2..40).each do |i|
    sleep(rand)
    re.pipe << i
  end
  re.pipe << RactorEnum::End.new
end

enum = re.to_enum

loop do
  n = enum.next
  if n.prime?
    p enum.take(5)
  end
end

出力。

[3, 4, 5, 6, 7]
[12, 13, 14, 15, 16]
[18, 19, 20, 21, 22]
[24, 25, 26, 27, 28]
[30, 31, 32, 33, 34]
[38, 39, 40]

 

Ractor#takeがあるのでこんなことをしてもあまり意味はないけれど、まあ敢て Enumerable で取り出したかったら…。