Ractor の Enumerable 化?(Ruby)

オブジェクトを順に Ractor に流し込んで、(無限連鎖の)Enumerable(親クラス)として取り出すというものです。
 
こんな感じです。
ractor_enum.rb

class RactorEnum
  class End end
  
  def initialize
    @pipe = Ractor.new do
      loop do
        e = Ractor.receive
        if e.instance_of?(RactorEnum::End)
          Ractor.current.close_outgoing
        else
          Ractor.yield e
        end
      end
    end
  end
  attr_reader :pipe

  def each
    loop { yield @pipe.take }
  end
  
  include Enumerable
end

loop do ~ endRactor::ClosedErrorを捕捉するようです。
 
re = RactorEnum.newしてre.pipeに流し込み、Enumerable として取り出します。

re = RactorEnum.new
re.pipe << :Hello
re.pipe << :World!
re.take(2)    #=>[:Hello, :World!]

無限連鎖なので、re.mapとかするとフリーズします。lazy化するとよいかも知れません。

re = RactorEnum.new
20.times { re.pipe << rand(97..122) }
re.lazy.map(&:chr).first(8).join    #=>"mnflrutp"

 
あるいはRactorEnum::End.newで流し込みの終わりを指定します。

re = RactorEnum.new
20.times { re.pipe << rand(97..122) }
re.pipe << RactorEnum::End.new

re.map(&:chr).join    #=>"ccwbjfecegtyjwszeued"

 
Ractor を使えば、後から流し込むこともできます。

re = RactorEnum.new

Ractor.new(re) do |re|
  re.each { puts _1 }
end

re.pipe << 1
sleep(1)
re.pipe << 2
sleep(1)    #これがないと"2"を表示する前に終わってしまうことがある

"1" が表示され、1秒後に "2" が表示されます。
 
重い処理を分散して実行し、流し込んだ順に Enumerable として取り出します。

re = RactorEnum.new

#3つの分散処理
3.times do |i|
  Ractor.new(re, i) do |re, i|
    #何か重い処理
    10.times do
      sleep(rand(2.0..3.0))
      re.pipe << rand(i*10..(i+1)*10-1)
    end
  end
end

re.each_slice(3).with_index(1) do |ary, i|
  p ary.map { _1 + 1000 }
  re.pipe.close_outgoing if i == 4
end

出力例。3つずつ流し込まれた時点で出力します。

[1012, 1026, 1000]
[1004, 1011, 1029]
[1013, 1002, 1024]
[1009, 1015, 1020]

 
適当に数を20個流し込んで、素数があったところで切って出力します。

require "prime"

re = RactorEnum.new

#サーバー
Ractor.new(re) do |re|
  20.times do
    sleep(rand)
    re.pipe << rand(2..30)
  end
  re.pipe << RactorEnum::End.new
end

re.slice_after(&:prime?).each do |ary|
  p ary
end

出力例。

[5]
[14, 29]
[13]
[10, 28, 19]
[15, 30, 3]
[15, 13]
[5]
[23]
[6, 3]
[13]
[15, 8, 28]    #最後は素数で終わるとは限らない

 
分散処理の例。

require "prime"

N = 100

re1 = RactorEnum.new
re2 = RactorEnum.new

3.times.map do
  Ractor.new(re1, re2) do |re1, re2|
    re1.each { |n| re2.pipe << [n, n.prime?] }
  end
end

(1..N).each { |i| re1.pipe << i }
re1.pipe << RactorEnum::End.new

p re2.take(N).sort_by { |n, b| n }

 
素数があったらその次の 5つを出力する、というのを続ける。RactorEnum を Enumerator に変換しています。

require "prime"

re = RactorEnum.new

#サーバー
Ractor.new(re) do |re|
  (2..40).each do |i|
    sleep(rand)
    re.pipe << i
  end
  re.pipe << RactorEnum::End.new
end

enum = re.to_enum

loop do
  n = enum.next
  if n.prime?
    p enum.take(5)
  end
end

出力。

[3, 4, 5, 6, 7]
[12, 13, 14, 15, 16]
[18, 19, 20, 21, 22]
[24, 25, 26, 27, 28]
[30, 31, 32, 33, 34]
[38, 39, 40]

 

Ractor#takeがあるのでこんなことをしてもあまり意味はないけれど、まあ敢て Enumerable で取り出したかったら…。

不思議な"&"(Ruby)

こんなコードが可能なのだな。
 

(2..10).map(&"5".to_i.method(:to_s))
#=>["101", "12", "11", "10", "5", "5", "5", "5", "5"]

やっていることは、"5" を 2~10 進数表記で表現するということですが…。意味はありません。

もうひとつ。

(1..5).map(&"Ruby!".method(:slice).curry(2).call(0))
#=>["R", "Ru", "Rub", "Ruby", "Ruby!"]

意味がわからないな(笑)。

Object#method とカリー化で関数型っぽく(Ruby)

Object#method はメソッドをオブジェクト化するものである。
 

Method は取り出しの対象であるメソッドがなければ作れませんが、Proc は準備なしに作れます。その点から Proc は使い捨てに向き、Method は何度も繰り返し生成する場合に向くと言えます。また内包するコードの大きさという点では Proc は小規模、Method は大規模コードに向くと言えます。

https://docs.ruby-lang.org/ja/latest/class/Method.html

 
例として、ここダイクストラ法のメソッドを使ってみる。まず、Hash で全体のグラフ構造を与える。

graph = {s: {t: 6, y: 4}, t: {x: 3, y: 2},
         x: {z: 4}, y: {z: 3, t: 1, x: 9}, z: {s: 7, x: 5}}


このグラフに対して、始点を与えてダイクストラ法を実行する関数を、Method オブジェクトから作ってみる。

dijkstra = method(:dijkstra).curry
give_shortest = dijkstra.(graph)


これで、例えば始点:s:xを与えて最短経路が求められる。

give_shortest.(:s).first    #=>{:s=>0, :t=>5, :y=>4, :z=>7, :x=>8}
give_shortest.(:x).first    #=>{:x=>0, :z=>4, :s=>11, :t=>16, :y=>15}

例えば:sを始点としたとき、:xまでの最短距離は8であるとわかる。同様に、:xを始点としたとき、:sまでの最短距離は11である。


次いで、始点と終点を与え、最短距離と最短経路を出力する関数を作ってみる。

calc_path = ->(dijkstra_func, start, goal) {
  shortest, pred = dijkstra_func.(start)
  
  route = [goal]
  route.unshift(pred[route[0]]) until route[0] == start

  [shortest[goal], route]
}.curry
shortest_path = calc_path.(give_shortest)


こんな風に使える。

shortest_path.(:s, :x)    #=>[8, [:s, :y, :t, :x]]
shortest_path.(:x, :s)    #=>[11, [:x, :z, :s]]

つまり、始点が:sで終点が:xのとき、最短経路は8で、その経路は s→y→t→x の順であるとわかる。


このように、Method オブジェクトがカリー化されたgive_shortestが、何度も使い回されていることがわかると思う。これを変えれば、同じコードで別のグラフにも簡単に対応できる。

メソッドと Proc の相互変換(Ruby)

これらができたからとて、特にうれしいことはない感じです。

メソッド→Proc。Object#method と Method#to_proc を使う。

m = 100.method(:to_s)

p m    #=>#<Method: Integer#to_s(*)>
p m.call(2)    #=>"1100100"

pr = m.to_proc

p pr    #=>#<Proc:0x000055e967ad8300 (lambda)>
p pr.call(16)    #=>"64"

 
Proc→メソッド。Proc は無名関数なので、メソッド名が必要。あとは Module#define_method を使えばよい。

pr = ->(n, base) {n.to_s(base)}

define_method(:to_string, pr)

p to_string(100, 16)    #=>"64"

最大値をもつものを集める(Ruby)

例えば都道府県名をローマ字化したものから、文字数の最大値と、その文字数をもつすべての県名を得たいとする。そのとき、こんなメソッドを作ってみるといいかも知れない。
 

module ExEnumerable
  refine Enumerable do
    def max_select
      pool = []
      max_num = -Float::INFINITY
      each do |i|
        n = yield(i)
        if n > max_ num
          max_ num = n
          pool = [i]
        elsif n == max_ num
          pool << i
        end
      end
      [max_ num, pool]
    end
  end

  [Array, Enumerator, Hash].each {|mdl| mdl.include Enumerable}
end

Enumerable#max_selectは、ブロックの返り値の最大値(max_num)を求めて、そのような最大値になるようなものをレシーバーから集め(pool)、[max_num, pool]を返す。

これを使って、最初の課題を解いてみる。

require "open-uri"
using ExEnumerable

url = "https://gist.githubusercontent.com/koseki/38926/raw/671d5279db1e5cb2c137465e22424c6ba27f4524/todouhuken.txt"
prefectures = URI.open(url).each_line.map {|l| l.chomp.split.last}
prefectures.max_select(&:size)
#=>[9, ["fukushima", "yamanashi", "hiroshima", "yamaguchi", "tokushima", "kagoshima"]]

9文字が最大値だとわかる。県名のリストも返ってくる。


※参考
Ruby で関数型プログラミングっぽく(コピペ) + Haskell 版 - Camera Obscura

Enumerator::Lazy でエラトステネスの篩(Ruby)

Enumerator::Lazy は無限数列を扱うことができるが、それを使って「エラトステネスの篩」をちょっとおもしろく実装できることに気づいた。何はともあれコードである。

lazy_prime.rb

prime_seq = Enumerator.new do |y|
  sieve = 2.step.lazy
  loop do
    a = sieve.first
    y << a
    sieve = sieve.reject {|x| (x % a).zero?}
  end
end

prime_seq.take(10)    #=>[2, 3, 5, 7, 11, 13, 17, 19, 23, 29]

 
Enumerator::Lazy はsieve = 2.step.lazyのところで生成している。これの先頭aは必ず素数なので、素数列として Enumerator で取り出す(y << a)。そして、sieveからaの倍数をrejectですべて篩い落とし(Lazy だから可能なのである)、そうされたものを新しいsieveとする。以下、繰り返し、というわけである。

これは子供の頃に紙と鉛筆でやってみた「エラトステネスの篩」のやり方そのものである。小さい方から素数に丸でも打っておいて、その倍数を斜線で消していく…。実装としては、素直といえば素直だ。

しかし、これ、めちゃめちゃに遅いのはすぐわかる。素数ひとつ得るたびに、Enumerator::Lazy オブジェクトを作り直しているのだから、どれほど遅いかというと、例えば最初の 500個の素数を得るのに、この方法だとわたしの環境でなんと 2.8秒ほどもかかる。標準添付ライブラリの prime を使うと 0.0002秒ほどだから、比較にもならない。

でもまあ、ちょっとおもしろかったので書いてみた。


なお、単なる Enumerator ももちろん無限数列を扱うことができるが、mapselect、それにここで使ったrejectなどの結果が有限数列(Array)になってしまう。なので、Enumerator::Lazy を使ったのである。為念。

MP3ファイルの分割(ffmpeg, Ruby)

ffmpegRuby を使って、MP3ファイルを分割します。

こんな感じ。
cut_mp3.rb

file = DATA.gets.chomp
ts = DATA.gets.split.map {|t| t.split(":").map(&:to_i)}

Dir.chdir(File.dirname(file))
bname = File.basename(file, ".mp3")
ts.map {_1 * 3600 + _2 * 60 + _3}.each_cons(2).with_index(1) do |(s, e), i|
  `ffmpeg -i "#{file}" -ss #{s} -t #{e - s} "#{bname + ("_%02d" % i)}.mp3"`
end


__END__
/home/***/Music/TAPEMP3/Bach.mp3
00:00:00 00:18:20 00:36:57 00:56:20 01:14:23 01:38:45 01:58:20

これだと、パス /home/***/Music/TAPEMP3/Bach.mp3 を6ファイルに分割します。最初のファイルは 00:00:00~00:18:20 まで、2番目のファイルは 00:18:20~00:36:57 までで、最後のファイルは 01:38:45~01:58:20 までとなります。