ロジスティック回帰のROC曲線のコード

 今回は下記書籍のロジスティック回帰のROC曲線のコードを Ruby で実装してみたいと思います。

ITエンジニアのための機械学習理論入門

www.amazon.co.jp

 サンプルコードはこちらで公開されています。

github.com

データセットの更新

 Python版では計算結果の確率をデータセットに反映する際に、下記のように pandas.DataFrame.ix メソッドを使って対象を特定し、値を代入しています。

p = 1.0/(1.0+np.exp(-a))
training_set.ix[index, 'probability'] = p

pandas.DataFrame.ix — pandas 0.19.2 documentation

 Ruby版ではデータの取り出しと代入を一度では行えなさそうだったので、 Daru::DataFrame#row で Vector オブジェクトを取得し、値を更新した上で Daru::DataFrame#set_row_at で更新するやり方にしてみました。

p = 1.0 / (1.0 + Math.exp(-a))
v = train_set.row[index]
v[:probability] = p
train_set.set_row_at([index], v)

Method: Daru::DataFrame#row — Documentation for daru (0.1.4.1)

Method: Daru::DataFrame#set_row_at — Documentation for daru (0.1.4.1)

データセットのソート

 Python版では pandas.DataFrame.sort_values メソッドを使ってデータセットを確率の高い順にソートしています。

result = training_set.sort_values(by=['probability'], ascending=[False])

pandas.DataFrame.sort_values — pandas 0.19.2 documentation

 Ruby版では Daru::DataFrame#sort メソッドを使ってソートしました。

sorted_train_set = train_set.sort([:probability], ascending: false)

Method: Daru::DataFrame#sort — Documentation for daru (0.1.4.1)

Class: Daru::DataFrame — Documentation for daru (0.1.4.1)

Indexの振り直し

 Python版ではソート後のデータセットのインデックスを pandas.DataFrame.reset_index メソッドで振り直しています。

return result.reset_index()

pandas.DataFrame.reset_index — pandas 0.19.2 documentation

 Ruby版では単純にインデックスを振り直すやり方がみつからなかったので、Indexに入れる値を別の列のデータとして追加した後、 Daru::DataFrame#reindex でその列をインデックスとして使うように変更しました。

Method: Daru::DataFrame#set_index — Documentation for daru (0.1.4.1)

sorted_train_set[:index] = sorted_train_set.size.times.to_a
sorted_train_set.set_index(:index)

スクリプト全体

 上記以外のデータセットの用意やロジスティック回帰の実施部分などは前回とほぼ同じです。ここまでの内容を踏まえて、コード全体としては下記のように実装しました。

# ロジスティック回帰のROC曲線

require 'daru'
require 'nyaplot'
require 'numo/narray'

Variances = [50, 150] # 両クラス共通の分散(2種類の分散で計算を実施)

def normal_rand(mu = 0, sigma = 1.0)
  random = Random.new
  (Math.sqrt(-2 * Math.log(random.rand)) * Math.sin(2 * Math::PI * random.rand) * sigma) + mu
end

# データセット {x_n,y_n,type_n} を用意
def prepare_dataset(variance)
  n1 = 80
  n2 = 200
  mu1 = [9, 9]
  mu2 = [-3, -3]

  sigma = Math.sqrt(variance)

  df1 = n1.times.map do
    [normal_rand(mu1[0], sigma), normal_rand(mu1[1], sigma)]
  end
  df1 = df1.transpose
  df1 = Daru::DataFrame.new(x: df1[0], y: df1[1], type: Array.new(n1).fill(1))

  df2 = n2.times.map do
    [normal_rand(mu2[0], sigma), normal_rand(mu2[1], sigma)]
  end
  df2 = df2.transpose
  df2 = Daru::DataFrame.new(x: df2[0], y: df2[1], type: Array.new(n2).fill(0))

  df = df1.concat(df2)
  df = df.reindex(Daru::Index.new(df.index.to_a.shuffle))
  df[:index] = (n1 + n2).times.to_a
  df.set_index(:index)
end

# ロジスティック回帰
def run_logistic(train_set, plot)
  w = Numo::NArray[[0], [0.1], [0.1]]
  phi = train_set[:x, :y]
  phi[:bias] = Array.new(phi.size).fill(1)
  t = train_set[:type]
  t = t.to_matrix

  # 最大100回のIterationを実施
  100.times do
    # IRLS法によるパラメータの修正
    y = []
    phi.each_row do |line|
      a = Vector[*line.to_a].dot(w)
      y << (1.0 / (1.0 + Math.exp(-a)))
    end
    r = Matrix.diagonal(*(Numo::NArray[*y] * (1 - Numo::NArray[*y]).to_a))
    y = Numo::NArray[y].transpose
    tmp1 = Matrix[*Numo::NArray[*phi.to_matrix.transpose.to_a].dot(Numo::NArray[*r.to_a]).dot(Numo::NArray[*phi.to_matrix.to_a]).to_a].inverse
    tmp2 = Numo::NArray[*phi.to_matrix.transpose.to_a].dot(y - Numo::NArray[*t.transpose.to_a])
    w_new = w - Numo::NArray[*tmp1.to_a].dot(tmp2)

    # パラメータの変化が 0.1% 未満になったら終了
    if (w_new - w).transpose.dot(w_new - w).flatten[0] < (0.001 * (w.transpose.dot(w))).flatten[0]
      w = w_new
      break
    end
  end

  # 分類誤差の計算と確率付きデータの用意
  d0 = w[0]
  dx = w[1]
  dy = w[2]
  err = 0
  train_set[:probability] = Array.new(train_set.size).fill(0.0)

  train_set.each_row_with_index do |line, index|
    a = Vector[1, line.x, line.y].dot(w)
    p = 1.0 / (1.0 + Math.exp(-a))
    v = train_set.row[index]
    v[:probability] = p
    train_set.set_row_at([index], v)
    if (p - 0.5) * (line[:type] * 2 - 1) < 0
      err += 1
    end
  end

  err_rate = err * 100 / train_set.size

  # 境界線(P=0.5)を表示
  xmin = train_set.x.min - 5
  xmax = train_set.x.max + 5
  linex = Numo::NArray[*(xmin.to_i-5..xmax.to_i+4).to_a]
  liney = -linex * dx / dy - d0 / dy
  label = "ERR %.2f%%" % err_rate

  line_err = plot.add(:line, linex.cast_to(Numo::Int64).to_a, liney.cast_to(Numo::Int64).to_a)
  line_err.title(label)
  line_err.color('blue')

  # 確率付きデータを返却
  sorted_train_set = train_set.sort([:probability], ascending: false)
  sorted_train_set[:index] = sorted_train_set.size.times.to_a
  sorted_train_set.set_index(:index)
end

def run_simulation(variance, plot)
  train_set = prepare_dataset(variance)
  train_set1 = train_set.filter_rows {|row| row[:type] == 1 }
  train_set2 = train_set.filter_rows {|row| row[:type] == 0 }
  ymin = train_set.y.min - 5
  xmin = train_set.x.min - 5
  ymax = train_set.y.max + 10
  xmax = train_set.x.max + 10
  plot.configure do
    x_label("Variance: #{variance}")
    y_label('')
    xrange([xmin, xmax])
    yrange([ymin, ymax])
    legend(true)
    height(300)
    width(490)
  end

  scatter_true = plot.add_with_df(train_set1.to_nyaplotdf, :scatter, :x, :y)
  scatter_true.color('green')
  scatter_true.title('1')
  scatter_false = plot.add_with_df(train_set2.to_nyaplotdf, :scatter, :x, :y)
  scatter_false.color('orange')
  scatter_false.title('0')

  run_logistic(train_set, plot)
end

def draw_roc(result, plot)
  positives = result.filter_rows {|row| row[:type] == 1 }.size
  negatives = result.filter_rows {|row| row[:type] == 0 }.size
  tp = result.size.times.map { 0.0 }
  fp = result.size.times.map { 0.0 }
  result.each_row_with_index do |line, index|
    result.size.times do |c|
      if index < c
        if line[:type] == 1
          tp[c] += 1
        else
          fp[c] += 1
        end
      end
    end
  end
  
  tp_rate = Numo::NArray[*tp] / positives
  fp_rate = Numo::NArray[*fp] / negatives

  plot.configure do
    x_label('False positive rate')
    y_label('True positive rate')
    xrange([0, 1])
    yrange([0, 1])
    height(300)
    width(400)
  end

  plot.add(:line, fp_rate, tp_rate)
end

fig = Nyaplot::Frame.new

Variances.each do |variance|
  plot = Nyaplot::Plot.new
  result = run_simulation(variance, plot)
  fig.add(plot)

  plot = Nyaplot::Plot.new
  draw_roc(result, plot)
  fig.add(plot)
end

fig.show

 これを Jupyter Notebook 上で実行すると下記のようなグラフが表示されます。

f:id:akanuma-hiroaki:20170130074649p:plain

 コードは下記にも公開してあります。

github.com

ロジスティック回帰とパーセプトロンの比較コード

 今回は下記書籍のロジスティック回帰とパーセプトロンの比較のコードを Ruby で実装してみたいと思います。

ITエンジニアのための機械学習理論入門 https://www.amazon.co.jp/IT-ebook/dp/B016Q22IX2/

 サンプルコードはこちらで公開されています。

github.com

ロジスティック回帰のコード

 データセットの用意やパーセプトロンのアルゴリズムの実装については前回までと基本的な内容は同じです。ロジスティック回帰のメソッドは下記のように実装しました。

def run_logistic(train_set, plot)
  w = Numo::NArray[[0], [0.1], [0.1]]
  phi = train_set[:x, :y]
  phi[:bias] = Array.new(phi.size).fill(1)
  t = train_set[:type]
  t = t.to_matrix

  # 最大30回のIterationを実施
  30.times do
    # IRLS法によるパラメータの修正
    y = []
    phi.each_row do |line|
      a = Vector[*line.to_a].dot(w)
      y << (1.0 / (1.0 + Math.exp(-a)))
    end
    r = Matrix.diagonal(*(Numo::NArray[*y] * (1 - Numo::NArray[*y]).to_a))
    y = Numo::NArray[y].transpose
    tmp1 = Matrix[*Numo::NArray[*phi.to_matrix.transpose.to_a].dot(Numo::NArray[*r.to_a]).dot(Numo::NArray[*phi.to_matrix.to_a]).to_a].inverse
    tmp2 = Numo::NArray[*phi.to_matrix.transpose.to_a].dot(y - Numo::NArray[*t.transpose.to_a])
    w_new = w - Numo::NArray[*tmp1.to_a].dot(tmp2)

    # パラメータの変化が 0.1% 未満になったら終了
    if (w_new - w).transpose.dot(w_new - w).flatten[0] < (0.001 * (w.transpose.dot(w))).flatten[0]
      w = w_new
      break
    end
  end

  # 分類誤差の計算
  w0 = w[0]
  w1 = w[1]
  w2 = w[2]
  err = 0
  train_set.each_row do |point|
    x = point.x
    y = point.y
    type = point[:type]
    type = type * 2 - 1
    if type * (w0 + w1*x + w2*y) < 0
      err += 1
    end
  end

  err_rate = err * 100 / train_set.size

  # 結果を表示
  xmin = train_set.x.min - 5
  xmax = train_set.x.max + 10
  linex = Numo::NArray[*(xmin.to_i-5..xmax.to_i+4).to_a]
  liney = -linex * w1 / w2 - w0 / w2
  label = "ERR %.2f%%" % err_rate

  line_err = plot.add(:line, linex.cast_to(Numo::Int64).to_a, liney.cast_to(Numo::Int64).to_a)
  line_err.title(label)
  line_err.color('blue')
end

内積の計算

 python では内積の計算を numpy.dot メソッドで行なっています。

numpy.dot — NumPy v1.12 Manual

高速数値計算ライブラリ「Numpy」覚書き - Pashango’s Blog

 ruby では Daru::DataFrame にはそのまま内積を計算する処理はなさそうだったので、Vectorに変換した上で dot メソッドを使いました。

instance method Vector#dot (Ruby 2.4.0)

Ruby2.2 ではアレが死ぬほど使いやすくなるの! - Qiita

a = Vector[*line.to_a].dot(w)

入力配列を対角要素とする配列

 python では numpy.diag メソッドで、入力配列を対角要素とする配列を取得しています。

numpy.diag — NumPy v1.10 Manual

r = np.diag(y*(1-y)) 

 ruby では Matrix.diagonal メソッドを使用しました。

singleton method Matrix.diagonal (Ruby 2.4.0)

r = Matrix.diagonal(*(Numo::NArray[*y] * (1 - Numo::NArray[*y]).to_a))

パラメータの計算

 python での実装は下記のようになっています。

tmp1 = np.linalg.inv(np.dot(np.dot(phi.T, r),phi))
tmp2 = np.dot(phi.T, (y-t))
w_new = w - np.dot(tmp1, tmp2)

 内積の計算用の dot メソッドや、転置行列を求める T メソッド、逆行列を求める numpy.linalog.inv メッソドなどが使われています。

 ruby で同様の処理を実装しようとしたところ、 Daru::DataFrame、 Numo::NArray、 Matrix それぞれで持っているメソッドを使うための変換処理でだいぶ煩雑になってしまいました。我ながらなんともイケてないコードです。前回までの実装もそうですが、Daru::DataFrame、Numo::NArray、Ruby標準のMatrix/Vector のどれか一つで基本的に処理することができればもっとすっきりかけるしやりようはあるのだろうと思っているのですが、まだその辺の使い方がよくわかっていません。元のpythonのコードの流れをそのままトレースするのではなく、そもそもどんな処理なのかをちゃんと理解して書く必要があるというのを改めて感じました。

tmp1 = Matrix[*Numo::NArray[*phi.to_matrix.transpose.to_a].dot(Numo::NArray[*r.to_a]).dot(Numo::NArray[*phi.to_matrix.to_a]).to_a].inverse
tmp2 = Numo::NArray[*phi.to_matrix.transpose.to_a].dot(y - Numo::NArray[*t.transpose.to_a])
w_new = w - Numo::NArray[*tmp1.to_a].dot(tmp2)

 それ以降の処理についてはパーセプトロンの場合とあまり変わらないので割愛します。

スクリプト全体

 コード全体では下記のように実装しました。

# ロジスティック回帰とパーセプトロンの比較

require 'daru'
require 'nyaplot'
require 'numo/narray'

Variances = [5, 10, 30, 50] # 両クラス共通の分散(4種類の分散で計算を実施)

class Array
  def mean
    self.inject(:+) / self.size.to_f
  end
end

def normal_rand(mu = 0, sigma = 1.0)
  random = Random.new
  (Math.sqrt(-2 * Math.log(random.rand)) * Math.sin(2 * Math::PI * random.rand) * sigma) + mu
end

# データセット {x_n,y_n,type_n} を用意
def prepare_dataset(variance)
  n1 = 10
  n2 = 20
  mu1 = [7, 7]
  mu2 = [-3, -3]

  sigma = Math.sqrt(variance)

  df1 = n1.times.map do
    [normal_rand(mu1[0], sigma), normal_rand(mu1[1], sigma)]
  end
  df1 = df1.transpose
  df1 = Daru::DataFrame.new(x: df1[0], y: df1[1], type: Array.new(n1).fill(1))

  df2 = n2.times.map do
    [normal_rand(mu2[0], sigma), normal_rand(mu2[1], sigma)]
  end
  df2 = df2.transpose
  df2 = Daru::DataFrame.new(x: df2[0], y: df2[1], type: Array.new(n2).fill(0))

  df = df1.concat(df2)
  df = df.reindex(Daru::Index.new(df.index.to_a.shuffle))
  df[:index] = (n1 + n2).times.to_a
  df.set_index(:index)
end

# ロジスティック回帰
def run_logistic(train_set, plot)
  w = Numo::NArray[[0], [0.1], [0.1]]
  phi = train_set[:x, :y]
  phi[:bias] = Array.new(phi.size).fill(1)
  t = train_set[:type]
  t = t.to_matrix

  # 最大30回のIterationを実施
  30.times do
    # IRLS法によるパラメータの修正
    y = []
    phi.each_row do |line|
      a = Vector[*line.to_a].dot(w)
      y << (1.0 / (1.0 + Math.exp(-a)))
    end
    r = Matrix.diagonal(*(Numo::NArray[*y] * (1 - Numo::NArray[*y]).to_a))
    y = Numo::NArray[y].transpose
    tmp1 = Matrix[*Numo::NArray[*phi.to_matrix.transpose.to_a].dot(Numo::NArray[*r.to_a]).dot(Numo::NArray[*phi.to_matrix.to_a]).to_a].inverse
    tmp2 = Numo::NArray[*phi.to_matrix.transpose.to_a].dot(y - Numo::NArray[*t.transpose.to_a])
    w_new = w - Numo::NArray[*tmp1.to_a].dot(tmp2)

    # パラメータの変化が 0.1% 未満になったら終了
    if (w_new - w).transpose.dot(w_new - w).flatten[0] < (0.001 * (w.transpose.dot(w))).flatten[0]
      w = w_new
      break
    end
  end

  # 分類誤差の計算
  w0 = w[0]
  w1 = w[1]
  w2 = w[2]
  err = 0
  train_set.each_row do |point|
    x = point.x
    y = point.y
    type = point[:type]
    type = type * 2 - 1
    if type * (w0 + w1*x + w2*y) < 0
      err += 1
    end
  end

  err_rate = err * 100 / train_set.size

  # 結果を表示
  xmin = train_set.x.min - 5
  xmax = train_set.x.max + 10
  linex = Numo::NArray[*(xmin.to_i-5..xmax.to_i+4).to_a]
  liney = -linex * w1 / w2 - w0 / w2
  label = "ERR %.2f%%" % err_rate

  line_err = plot.add(:line, linex.cast_to(Numo::Int64).to_a, liney.cast_to(Numo::Int64).to_a)
  line_err.title(label)
  line_err.color('blue')
end

# パーセプトロン
def run_perceptron(train_set, plot)
  w0 = 0.0
  w1 = 0.0
  w2 = 0.0
  bias = 0.5 * (train_set.x.mean + train_set.y.mean)

  # Iterationを30回実施
  30.times do
    # 確率的勾配降下法によるパラメータの修正
    train_set.each_row do |point|
      x = point.x
      y = point.y
      type = point[:type]
      type = type * 2 - 1
      if type * (w0*bias + w1*x + w2*y) <= 0
        w0 += type * bias
        w1 += type * x
        w2 += type * y
      end
    end
  end

  # 分類誤差の計算
  err = 0
  train_set.each_row do |point|
    x = point.x
    y = point.y
    type = point[:type]
    type = type * 2 - 1
    if type * (w0*bias + w1*x + w2*y) <= 0
      err += 1
    end
  end

  err_rate = err * 100 / train_set.size

  # 結果を表示
  xmin = train_set.x.min - 5
  xmax = train_set.x.max + 10
  linex = Numo::NArray[*(xmin.to_i-5..xmax.to_i+4).to_a]
  liney = -linex * w1 / w2 - bias * w0 / w2
  label = "ERR %.2f%%" % err_rate

  line_err = plot.add(:line, linex.cast_to(Numo::Int64).to_a, liney.cast_to(Numo::Int64).to_a)
  line_err.title(label)
  line_err.color('red')
end

def run_simulation(variance, plot)
  train_set = prepare_dataset(variance)
  train_set1 = train_set.filter_rows {|row| row[:type] == 1 }
  train_set2 = train_set.filter_rows {|row| row[:type] == 0 }
  ymin = train_set.y.min - 5
  xmin = train_set.x.min - 5
  ymax = train_set.y.max + 10
  xmax = train_set.x.max + 10
  plot.configure do
    x_label("Variance: #{variance}")
    y_label('')
    xrange([xmin, xmax])
    yrange([ymin, ymax])
    legend(true)
    height(300)
    width(490)
  end

  scatter_true = plot.add_with_df(train_set1.to_nyaplotdf, :scatter, :x, :y)
  scatter_true.color('green')
  scatter_true.title('1')
  scatter_false = plot.add_with_df(train_set2.to_nyaplotdf, :scatter, :x, :y)
  scatter_false.color('orange')
  scatter_false.title('0')

  run_logistic(train_set, plot)
  run_perceptron(train_set, plot)
end

fig = Nyaplot::Frame.new

Variances.each do |variance|
  plot = Nyaplot::Plot.new
  run_simulation(variance, plot)
  fig.add(plot)
end

fig.show

 これを Jupyter Notebook 上で実行すると下記のようなグラフが表示されます。

f:id:akanuma-hiroaki:20170126073424p:plain

f:id:akanuma-hiroaki:20170126073442p:plain

 python版と比べるとロジスティック回帰の優位性が見えにくい結果になっている気がするので、どこか実装に間違いがある可能性もあるかなと思っていますが、ひとまず近い形のグラフの表示までは行けた形です。もし、ここが違っている、とお気付きの方はコメントいただけると嬉しいです。

 コードは下記にも公開しました。

github.com

パーセプトロンによる二項分類のコード

 今回は下記書籍のパーセプトロンによる二項分類のコードを Ruby で実装してみたいと思います。

ITエンジニアのための機械学習理論入門
https://www.amazon.co.jp/IT-ebook/dp/B016Q22IX2/www.amazon.co.jp

 サンプルコードはこちらで公開されています。

github.com

データセットの用意

 サンプルコードでは下記のようにデータセットを用意するためのコードが実装されています。

N1 = 20         # クラス t=+1 のデータ数
Mu1 = [15,10]   # クラス t=+1 の中心座標

N2 = 30         # クラス t=-1 のデータ数
Mu2 = [0,0]     # クラス t=-1 の中心座標

Variances = [15,30] # 両クラス共通の分散(2種類の分散で計算を実施)

# データセット {x_n,y_n,type_n} を用意
def prepare_dataset(variance):
    cov1 = np.array([[variance,0],[0,variance]])
    cov2 = np.array([[variance,0],[0,variance]])

    df1 = DataFrame(multivariate_normal(Mu1,cov1,N1),columns=['x','y'])
    df1['type'] = 1
    df2 = DataFrame(multivariate_normal(Mu2,cov2,N2),columns=['x','y'])
    df2['type'] = -1 
    df = pd.concat([df1,df2],ignore_index=True)
    df = df.reindex(np.random.permutation(df.index)).reset_index(drop=True)
    return df

 このコードの目的としては、クラス t = +1 のデータと t = -1 のデータが混在したデータセットを返すことで、 クラス t = +1 は {x, y} = {15, 10} を中心とし、クラス t = -1 は {x, y} = {0, 0} を中心とした、 分散15もしくは30の場合の二次元正規分布乱数のセットです。

 処理の内容としては、 multivariate_normal メソッドで二次元正規分布乱数の配列を生成して それぞれのクラス用のDataFrameを用意し、結合してランダムにシャッフルしてインデックスを振り直しています。

 multivariate_normal は平均の配列と共分散行列を引数に取り、多次元正規分布乱数を生成してくれます。

numpy.random.multivariate_normal — NumPy v1.11 Manual

 こちらも参考にしました。

3次元正規分布を Axes3D で描画-python | コード7区

 今回のケースでは平均の配列としてそれぞれのクラスの x, y の中心座標値を渡しており、 共分散行列としてはどちらも共通で [[variance,0],[0,variance]] という 2 x 2 行列を渡しています。 この場合対角成分であるvarianceはそれぞれxの分散、yの分散ということになり、 また、xyの共分散は0ということになりますのでxとyの間に相関はないということを表しているという理解です。

 最初は multivariate_normal に該当する処理をrubyで探したものの、ライブラリ等は見つからず、 自前実装にも multivariate_normal の内容の理解が足りなかったのですが、今回のケースであれば、 xyに相関はなく、xとyで独立して正規分布乱数を生成すれば問題ないのではないかと思ったので、 下記のようにデータセット生成処理を実装しました。

N1 = 20
Mu1 = [15, 10]

N2 = 30
Mu2 = [0, 0]

def normal_rand(mu = 0, sigma = 1.0)
  random = Random.new
  (Math.sqrt(-2 * Math.log(random.rand)) * Math.sin(2 * Math::PI * random.rand) * sigma) + mu
end

# データセット {x_n,y_n,type_n} を用意
def prepare_dataset(variance)
  sigma = Math.sqrt(variance)

  df1 = N1.times.map do
    [normal_rand(Mu1[0], sigma), normal_rand(Mu1[1], sigma)]
  end
  df1 = df1.transpose
  df1 = Daru::DataFrame.new(x: df1[0], y: df1[1], type: Array.new(N1).fill(1))

  df2 = N2.times.map do
    [normal_rand(Mu2[0], sigma), normal_rand(Mu2[1], sigma)]
  end
  df2 = df2.transpose
  df2 = Daru::DataFrame.new(x: df2[0], y: df2[1], type: Array.new(N2).fill(-1))

  df = df1.concat(df2)
  df = df.reindex(Daru::Index.new(df.index.to_a.shuffle))
  df[:index] = (N1 + N2).times.to_a
  df.set_index(:index)
end

 normal_rand メソッドは前回まででも実装した内容と同じで、正規分布の乱数を生成します。引数としては平均と標準偏差を取ります。 prepare_database メソッドに渡される variance は分散なので、平方根を求めて偏差 sigma として normal_rand に渡しています。

Perceptronのアルゴリズム(確率的勾配降下法)を実行

 該当メソッドのコードとしては下記のように実装しました。

まずはデータセットのプロット部分。

# Perceptronのアルゴリズム(確率的勾配降下法)を実行
def run_simulation(variance)
  data_graph = Nyaplot::Plot.new
  param_graph = Nyaplot::Plot.new

  train_set = prepare_dataset(variance)
  train_set1 = train_set.filter_rows {|row| row[:type] == 1 }
  train_set2 = train_set.filter_rows {|row| row[:type] == -1 }
  ymin = train_set.y.min - 5
  xmin = train_set.x.min - 5
  ymax = train_set.y.max + 10
  xmax = train_set.x.max + 10
  data_graph.configure do
    x_label('')
    y_label('')
    xrange([xmin, xmax])
    yrange([ymin, ymax])
    legend(true)
    height(300)
    width(490)
  end

  scatter_true = data_graph.add_with_df(train_set1.to_nyaplotdf, :scatter, :x, :y)
  scatter_true.color('green')
  scatter_true.title('1')
  scatter_false = data_graph.add_with_df(train_set2.to_nyaplotdf, :scatter, :x, :y)
  scatter_false.color('orange')
  scatter_false.title('-1')

 データセットから filter_rows メソッドでクラス t = 1 のセットとクラス t = -1 のセットに分け、 それぞれのセットのデータを一つの図にプロットしています。

 次に確率的勾配降下法による処理で30回のIterationを回して各パラメータを決定している部分です。

  # パラメータの初期値とbias項の設定
  w0 = 0.0
  w1 = 0.0
  w2 = 0.0
  bias = 0.5 * (train_set.x.mean + train_set.y.mean)
    
  # Iterationを30回実施
  paramhist = Daru::DataFrame.new(w0: [w0], w1: [w1], w2: [w2])
  10.times do
    train_set.each_row do |point|
      x = point.x
      y = point.y
      type = point[:type]
      if type * (w0*bias + w1*x + w2*y) <= 0
        w0 += type * bias
        w1 += type * x
        w2 += type * y
      end
    end
    paramhist.add_row(Daru::Vector.new([w0, w1, w2], index: [:w0, :w1, :w2]))
  end

 各パラメータの初期値を入れたDataFrameを用意しておき、Iterationごとにadd_rowで各パラメータ値をVectorとして追加しています。

 そして次にその結果について判定誤差の割合を計算します。

  # 判定誤差の計算
  err = 0
  train_set.each_row do |point|
    x = point.x
    y = point.y
    type = point[:type]
    if type * (w0*bias + w1*x + w2*y) <= 0
      err += 1
    end
  end
  
  err_rate = err * 100 / train_set.size

 決定されたパラメータでデータセットの各値に対して判定を行い、エラーになった数の割合を計算しています。

 そして最後にここまでの結果をグラフに表示します。

  # 結果の表示
  linex = Numo::NArray[*(xmin.to_i-5..xmax.to_i+4).to_a]
  liney = -linex * w1 / w2 - bias * w0 / w2
  label = "ERR %.2f%%" % err_rate
  line_err = data_graph.add(:line, linex.cast_to(Numo::Int64).to_a, liney.cast_to(Numo::Int64).to_a)
  line_err.title(label)
  line_err.color('red')
  line_w0 = param_graph.add(:line, paramhist.index.to_a, paramhist.w0)
  line_w0.title('w0')
  line_w0.color('blue')
  line_w1 = param_graph.add(:line, paramhist.index.to_a, paramhist.w1)
  line_w1.title('w1')
  line_w1.color('green')
  line_w2 = param_graph.add(:line, paramhist.index.to_a, paramhist.w2)
  line_w2.title('w2')
  line_w2.color('red')
  
  param_graph.configure do
    x_label('')
    y_label('')
    legend(true)
    height(300)
    width(490)
  end

  [data_graph, param_graph]
end

 元の python のコードでは、 linex を numpy.arange メソッドを使って numpy.ndarray クラスのオブジェクトとして取得しています。 numpy.ndarray は 多次元配列を扱うためのクラスです。

numpy.ndarray — NumPy v1.12 Manual

 ruby では同様のことを行うために、下記サイトを参考にして NArray を使用しました。

Numerical Ruby NArray

github.com

 linex を NArray のオブジェクトとして生成し、各パラメータ値による計算を行い、liney を求めています。

 NArrayではオブジェクトが生成されるときに各値のデータ型は自動的に判定されます。今回のケースでは Numo::Int32 として判定されたのですが、そのオブジェクトを to_a で通常の配列に変換すると正しく変換されなかったため、一旦 Numo::Int64 に変換した上で配列に変換しました。

irb(main):024:0* Numo::NArray[-3, -2, -1, 0, 1, 2, 3]
=> Numo::Int32#shape=[7]
[-3, -2, -1, 0, 1, 2, 3]
irb(main):025:0> Numo::NArray[-3, -2, -1, 0, 1, 2, 3].to_a
=> [4294967293, 4294967294, 4294967295, 0, 1, 2, 3]
irb(main):026:0> Numo::NArray[-3, -2, -1, 0, 1, 2, 3].cast_to(Numo::Int64).to_a
=> [-3, -2, -1, 0, 1, 2, 3]

スクリプト全体

 スクリプト全体としては下記のようになります。

require 'daru'
require 'nyaplot'
require 'numo/narray'

N1 = 20
Mu1 = [15, 10]

N2 = 30
Mu2 = [0, 0]

Variances = [15, 30]

class Array
  def mean
    self.inject(:+) / self.size.to_f
  end
end

def normal_rand(mu = 0, sigma = 1.0)
  random = Random.new
  (Math.sqrt(-2 * Math.log(random.rand)) * Math.sin(2 * Math::PI * random.rand) * sigma) + mu
end

# データセット {x_n,y_n,type_n} を用意
def prepare_dataset(variance)
  sigma = Math.sqrt(variance)

  df1 = N1.times.map do
    [normal_rand(Mu1[0], sigma), normal_rand(Mu1[1], sigma)]
  end
  df1 = df1.transpose
  df1 = Daru::DataFrame.new(x: df1[0], y: df1[1], type: Array.new(N1).fill(1))

  df2 = N2.times.map do
    [normal_rand(Mu2[0], sigma), normal_rand(Mu2[1], sigma)]
  end
  df2 = df2.transpose
  df2 = Daru::DataFrame.new(x: df2[0], y: df2[1], type: Array.new(N2).fill(-1))

  df = df1.concat(df2)
  df = df.reindex(Daru::Index.new(df.index.to_a.shuffle))
  df[:index] = (N1 + N2).times.to_a
  df.set_index(:index)
end

# Perceptronのアルゴリズム(確率的勾配降下法)を実行
def run_simulation(variance)
  data_graph = Nyaplot::Plot.new
  param_graph = Nyaplot::Plot.new

  train_set = prepare_dataset(variance)
  train_set1 = train_set.filter_rows {|row| row[:type] == 1 }
  train_set2 = train_set.filter_rows {|row| row[:type] == -1 }
  ymin = train_set.y.min - 5
  xmin = train_set.x.min - 5
  ymax = train_set.y.max + 10
  xmax = train_set.x.max + 10
  data_graph.configure do
    x_label('')
    y_label('')
    xrange([xmin, xmax])
    yrange([ymin, ymax])
    legend(true)
    height(300)
    width(490)
  end

  scatter_true = data_graph.add_with_df(train_set1.to_nyaplotdf, :scatter, :x, :y)
  scatter_true.color('green')
  scatter_true.title('1')
  scatter_false = data_graph.add_with_df(train_set2.to_nyaplotdf, :scatter, :x, :y)
  scatter_false.color('orange')
  scatter_false.title('-1')
  
  # パラメータの初期値とbias項の設定
  w0 = 0.0
  w1 = 0.0
  w2 = 0.0
  bias = 0.5 * (train_set.x.mean + train_set.y.mean)
    
  # Iterationを30回実施
  paramhist = Daru::DataFrame.new(w0: [w0], w1: [w1], w2: [w2])
  30.times do
    train_set.each_row do |point|
      x = point.x
      y = point.y
      type = point[:type]
      if type * (w0*bias + w1*x + w2*y) <= 0
        w0 += type * bias
        w1 += type * x
        w2 += type * y
      end
    end
    paramhist.add_row(Daru::Vector.new([w0, w1, w2], index: [:w0, :w1, :w2]))
  end
  
  # 判定誤差の計算
  err = 0
  train_set.each_row do |point|
    x = point.x
    y = point.y
    type = point[:type]
    if type * (w0*bias + w1*x + w2*y) <= 0
      err += 1
    end
  end
  
  err_rate = err * 100 / train_set.size
  
  # 結果の表示
  linex = Numo::NArray[*(xmin.to_i-5..xmax.to_i+4).to_a]
  liney = -linex * w1 / w2 - bias * w0 / w2
  label = "ERR %.2f%%" % err_rate
  line_err = data_graph.add(:line, linex.cast_to(Numo::Int64).to_a, liney.cast_to(Numo::Int64).to_a)
  line_err.title(label)
  line_err.color('red')
  line_w0 = param_graph.add(:line, paramhist.index.to_a, paramhist.w0)
  line_w0.title('w0')
  line_w0.color('blue')
  line_w1 = param_graph.add(:line, paramhist.index.to_a, paramhist.w1)
  line_w1.title('w1')
  line_w1.color('green')
  line_w2 = param_graph.add(:line, paramhist.index.to_a, paramhist.w2)
  line_w2.title('w2')
  line_w2.color('red')
  
  param_graph.configure do
    x_label('')
    y_label('')
    legend(true)
    height(300)
    width(490)
  end

  [data_graph, param_graph]
end

fig = Nyaplot::Frame.new

Variances.each do |variance|
  data_graph, param_graph = run_simulation(variance)
  fig.add(data_graph)
  fig.add(param_graph)
end

fig.show

 これを Jupyter Notebook 上で実行すると下記のようなグラフが表示されます。

f:id:akanuma-hiroaki:20170118083001p:plain

 書籍のサンプルと近い形になっているのであっていそうな気はしていますが、今回は特に理解が怪しいので、間違いなどありましたらご指摘いただければと思います。

 コードは下記にも公開しました。

github.com

推定量の一致性と不偏性の確認コード

 今回は下記書籍の推定量の一致性と不偏性の確認のサンプルコードをRubyで実装してみたいと思います。

ITエンジニアのための機械学習理論入門 | 中井悦司 | 工学 | Kindleストア | Amazon

 サンプルコードはこちらで公開されています。

github.com

算術平均メソッド

 RubyではArrayの平均を計算するメソッドが用意されていないので、配列内の値を合計したものを要素数で割って平均を取得するのですが、今回は平均を使う回数が多かったので、Arrayクラスのメソッドとして実装しました。

class Array
  def mean
    self.inject(:+) / self.size.to_f
  end
end

 Arrayの中身が数値であること前提の実装ですが、今回の用途としてはこれで十分なので最低限の実装にしてあります。

配列データの取り出し

 Pythonのコードでは下記のように実装されています。

raw_linex[0:-1:50]

 これは、下記のような指定になっています。

配列名[始点インデックス:終端インデックス:ステップ]

 インデックスにマイナスの値を指定すると配列末尾からの指定になりますので、「raw_linexという配列の先頭から末尾までを50ステップごとに取り出す」という指定になります。

 Ruby では上記のようなシンタックスは用意されていないようだったので、下記サイトを参考に、Array#values_at を使用して実装しました。

stackoverflow.com

def thin_out_data(array)
  array.values_at(*(0...array.size-1).step(50))
end

 Rangeとして配列の先頭から末尾のインデックスになるように指定し、step で50おきに取り出し、アスタリスク付きでメソッドに渡すことでそれを展開した形で渡しています。

スクリプト全体

 ここまでの内容を踏まえて、スクリプト全体としては下記のように実装しました。

# 推定量の一致性と不偏性の確認

require 'nyaplot'

class Array
  def mean
    self.inject(:+) / self.size.to_f
  end
end

def normal_rand(mu = 0, sigma = 1.0)
  random = Random.new
  (Math.sqrt(-2 * Math.log(random.rand)) * Math.sin(2 * Math::PI * random.rand) * sigma) + mu
end

def thin_out_data(array)
  array.values_at(*(0...array.size-1).step(50))
end

def draw_plot(linex1, liney1, linex2, liney2, label, ylim)
  plot = Nyaplot::Plot.new
  df = Nyaplot::DataFrame.new(x: linex1, y: liney1)
  scatter = plot.add_with_df(df, :scatter, :x, :y)
  scatter.color('blue')
  scatter.title('data')
  line = plot.add(:line, linex2, liney2)
  line.color('red')
  line.title('mean')

  plot.configure do
    x_label(label)
    y_label('')
    xrange([linex1.min, linex1.max + 1])
    yrange(ylim)
    legend(true)
  end
end

mean_linex = []
mean_mu = []
mean_s2 = []
mean_u2 = []
raw_linex = []
raw_mu = []
raw_s2 = []
raw_u2 = []

(2..50).each do |n| # 観測データ数Nを変化させて実行
  2000.times do # 特定のNについて2000回の推定を繰り返す
    ds = n.times.map { normal_rand }
    raw_mu << ds.mean
    sum_of_squares = ds.inject(0) {|sum, i| sum + (i - ds.mean) ** 2 }
    var = sum_of_squares / ds.size.to_f
    raw_s2 << var
    raw_u2 << var * n / (n - 1)
    raw_linex << n
  end

  mean_mu << raw_mu.mean # 標本平均の平均
  mean_s2 << raw_s2.mean # 標本分散の平均
  mean_u2 << raw_u2.mean # 不偏分散の平均
  mean_linex << n
end

# プロットデータを40個に間引きする
raw_linex = thin_out_data(raw_linex)
raw_mu = thin_out_data(raw_mu)
raw_s2 = thin_out_data(raw_s2)
raw_u2 = thin_out_data(raw_u2)

fig = Nyaplot::Frame.new

# 標本平均の結果表示
plot = draw_plot(raw_linex, raw_mu, mean_linex, mean_mu, 'Sample mean', [-1.5, 1.5])
fig.add(plot)

# 標本分散の結果表示
plot = draw_plot(raw_linex, raw_s2, mean_linex, mean_s2, 'Sample variance', [-0.5, 3.0])
fig.add(plot)

# 不偏分散の結果表示
plot = draw_plot(raw_linex, raw_u2, mean_linex, mean_u2, 'Unbiased variance', [-0.5, 3.0])
fig.add(plot)

fig.show

 これを Jupyter Notebook 上で実行すると下記のようなグラフが描画されます。

f:id:akanuma-hiroaki:20170106072922p:plain

f:id:akanuma-hiroaki:20170106072939p:plain

f:id:akanuma-hiroaki:20170106072953p:plain

 コードは下記にも公開してあります。

github.com

最尤推定による正規分布の推定コード

 今回は下記書籍の最尤推定による正規分布の推定のサンプルコードをRubyで実装してみたいと思います。

ITエンジニアのための機械学習理論入門

 サンプルコードはこちらで公開されています。

github.com

算術平均

 python では numpy.mean を使って配列に含まれる値の算術平均を取得できます。

numpy.mean
https://docs.scipy.org/doc/numpy/reference/generated/numpy.mean.html

>>> array
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
>>> numpy.mean(array)
5.5

 ruby では同様のメソッドがみつからなかったので、配列内の値を合計したものを要素数で割って平均を取得します。

irb(main):020:0* array                              
=> [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
irb(main):021:0> array.inject(:+) / array.size.to_f
=> 5.5

分散

 python で分散を算出するには numpy.var を使用します。

numpy.var
https://docs.scipy.org/doc/numpy/reference/generated/numpy.var.html

>>> array
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
>>> numpy.var(array)
8.25                

 こちらも ruby ではメソッドがみつからなかったので、下記サイトを参考にさせていただきました。

Rubyで標本の分散・標準偏差・変動係数算出まで | WEBサービス創造記

 平均、偏差平方和を算出し、要素数で割って分散を算出しています。

irb(main):044:0* array
=> [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
irb(main):045:0> mean = array.inject(:+) / array.size.to_f                         
=> 5.5
irb(main):046:0> sum_of_squares = array.inject(0) {|sum, i| sum + (i - mean) ** 2} 
=> 82.5
irb(main):047:0> sum_of_squares / array.size.to_f                                  
=> 8.25

等間隔の数値配列生成

 ある一定範囲内で、指定したステップごとの数値の配列の取得は、 python では numpy.arrange を利用します。

numpy.arange
https://docs.scipy.org/doc/numpy/reference/generated/numpy.arange.html

>>> numpy.arange(-10, 10, 2)                             
array([-10,  -8,  -6,  -4,  -2,   0,   2,   4,   6,   8])

 ruby では範囲オブジェクトと step メソッドを利用して生成します。

irb(main):059:0* s = 2
=> 2
irb(main):060:0> (-10..10-s).step(s).to_a
=> [-10, -8, -6, -4, -2, 0, 2, 4, 6, 8]

確率密度関数

 python では確率密度関数の利用は scipy.stats.norm から行います。

scipy.stats.norm
https://docs.scipy.org/doc/scipy-0.16.1/reference/generated/scipy.stats.norm.html

>>> from scipy.stats import norm
>>> orig = norm(loc=0, scale=1)
>>> orig                                                              
<scipy.stats._distn_infrastructure.rv_frozen object at 0x7f095ae1b950>
>>> linex = numpy.arange(-10, 10, 2)                     
>>> linex                                                
array([-10,  -8,  -6,  -4,  -2,   0,   2,   4,   6,   8])
>>> orig.pdf(linex)                                         
array([  7.69459863e-23,   5.05227108e-15,   6.07588285e-09,
         1.33830226e-04,   5.39909665e-02,   3.98942280e-01,
         5.39909665e-02,   1.33830226e-04,   6.07588285e-09,
         5.05227108e-15])                                   

 ruby では rubystats という gem があったので、そちらを使ってみます。

github.com

Distribution::Normal というのもあるのですが、こちらは平均と標準偏差がそれぞれ 0, 1 のケースにしか適用できなかったので、 rubystats を使うことにしました。

Distribution http://www.rubydoc.info/github/sciruby/distribution

irb(main):002:0* require 'rubystats/normal_distribution'
=> true
irb(main):003:0> norm = Rubystats::NormalDistribution.new(0, 1)
=> #<Rubystats::NormalDistribution:0x007f6dfa935930 @mean=0, @stdev=1, @variance=1, @pdf_denominator=2.5066282746310007, @cdf_denominator=1.4142135623730951, @use_last=nil>
irb(main):004:0> s = 2
=> 2
irb(main):005:0> linex = (-10..10-s).step(s).to_a
=> [-10, -8, -6, -4, -2, 0, 2, 4, 6, 8]
irb(main):006:0> linex.map {|x| norm.pdf(x) }
=> [7.694598626706419e-23, 5.052271083536892e-15, 6.075882849823285e-09, 0.00013383022576488534, 0.05399096651318805, 0.39894228040143265, 0.05399096651318805, 0.00013383022576488534, 6.075882849823285e-09, 5.052271083536892e-15]

スクリプト全体

 ここまでの内容を踏まえて、スクリプト全体としては下記のように実装しました。

require 'nyaplot'
require 'rubystats/normal_distribution'

def normal_rand(mu = 0, sigma = 1.0)
  random = Random.new
  (Math.sqrt(-2 * Math.log(random.rand)) * Math.sin(2 * Math::PI * random.rand) * sigma) + mu
end

fig = Nyaplot::Frame.new

[2,4,10,100].each do |datapoints|
  ds = datapoints.times.map { normal_rand }

  mu = ds.inject(:+) / ds.size.to_f
  sum_of_squares = ds.inject(0) {|sum, i| sum + (i - mu) ** 2} 
  var = sum_of_squares / ds.size.to_f
  sigma = Math.sqrt(var)

  plot = Nyaplot::Plot.new

  s = 0.1
  linex = (-10..10-s).step(s).to_a
  
  # 真の曲線を表示
  orig = Rubystats::NormalDistribution.new(0, 1)  
  orig_pdfs = linex.map {|x| orig.pdf(x) }
  line = plot.add(:line, linex, orig_pdfs)
  line.color('green')
  line.title('Original')

  # 推定した曲線を表示
  est = Rubystats::NormalDistribution.new(mu, Math.sqrt(sigma))
  est_pdfs = linex.map {|x| est.pdf(x)}
  line = plot.add(:line, linex, est_pdfs)
  line.color('red')
  line.title("Sigma=#{sprintf("%.2f", sigma)}")
  
  # サンプルの表示
  df = Nyaplot::DataFrame.new({x: ds,y: ds.map {|x| orig.pdf(x)}})
  scatter = plot.add_with_df(df, :scatter, :x, :y)
  scatter.color('blue')
  scatter.title('Sample')

  fig.add(plot)
  
  plot.configure do
    x_label("N=#{datapoints}")
    y_label('')
    xrange([-4, 4])
    legend(true)
    height(300)
    width(490)
  end
end

fig.show

 実行すると Jupyter Notebook で実行すると下記のようなグラフが表示されます。

f:id:akanuma-hiroaki:20161230132326p:plain

 コードは下記にも公開しています。

github.com

Amazon Dash Button を押したら Slack にポストする(RxRuby版)

 前回の投稿で、 Amazon Dash Button を押したら Slack にポストする記事を書いたのですが、Facebookでシェアしたところ、「イベントハンドリングのところで、RxRubyあたりを使うとクールに実装できそうです。」というコメントをいただいたので、RxRubyを使う版を試してみました。

RxRuby

 RxRuby は Reactive Extensions の Ruby用ライブラリです。最終の更新からしばらく経っているようですが、使わせてもらうことにしました。

github.com

 ググってみたところ、RxRubyについてはあまり記事は多くなかったのですが、こちらのブログを参考にさせていただきました。

qiita.com

 Reactive Extensions については私も今回初めて調べたので、詳しいことは語れませんが、GoFのデザインパターンのオブザーバーパターンを表したもので、非同期にイベント駆動で処理を行うためのモデルと解釈しています。詳しい内容についてはReactive Extensionsの公式ページを参照していただくと良いかと思います。

ReactiveX

Dash Button のMACアドレス確認

 やろうとしていることは前回記事と同じですが、RxRubyを使うようにコードを変更してみました。

capture_rx.rb

require 'packetfu'
require 'rx'
require './ouis.rb'
include PacketFu

class Rx::BehaviorSubject
  public :check_unsubscribed
end

def get_capture(iface)
  subject = Rx::Subject.new
  subject.select {|pkt| dash_packet?(pkt) }.subscribe(
    lambda {|pkt| capture(pkt) },
    lambda {|err| puts "Error: #{err}" },
    lambda { puts 'Completed.' }
  )

  cap = Capture.new(iface: iface, start: true)
  cap.stream.each do |pkt|
    subject.on_next pkt
  end
end

def dash_packet?(pkt)
  return false unless EthPacket.can_parse?(pkt)
  get_vendor_name(EthHeader.str2mac(EthPacket.parse(pkt).eth_src)).downcase.include?('amazon')
end

def capture(pkt)
  time = Time.now.strftime("%Y-%m-%d %H:%M:%S.%6N")

  if UDPPacket.can_parse?(pkt)
    packet = UDPPacket.parse(pkt)
    src_ip = IPHeader.octet_array(packet.ip_src).join('.')
    dst_ip = IPHeader.octet_array(packet.ip_dst).join('.')
    protocol = 'udp'
  elsif ARPPacket.can_parse?(pkt)
    packet = ARPPacket.parse(pkt)
    src_ip = packet.arp_saddr_ip
    dst_ip = packet.arp_daddr_ip
    protocol = 'arp'
  else
    return
  end

  src_mac, dst_mac, vendor_name = get_common_values(packet)
  output(time, src_mac, dst_mac, src_ip, dst_ip, protocol, vendor_name)
end

def output(time, src_mac, dst_mac, src_ip, dst_ip, protocol, vendor_name)
  puts "time:#{time}, src_mac:#{src_mac}, dst_mac:#{dst_mac}, src_ip:#{src_ip}, dst_ip:#{dst_ip}, protocol:#{protocol}, vendor:#{vendor_name}"
end

def get_common_values(packet)
  src_mac = EthHeader.str2mac(packet.eth_src)
  dst_mac = EthHeader.str2mac(packet.eth_dst)
  vendor_name = get_vendor_name(src_mac)
  return src_mac, dst_mac, vendor_name
end

def get_vendor_name(mac)
  return '' if mac.nil?
  oui = mac.split(':').slice(0, 3).join('-')
  OUIS[oui.upcase]
end

if $0 == __FILE__
  iface = ARGV[0]
  puts "Capturing for interface: #{iface}"
  get_capture(iface)
end

 キャプチャしたパケットをRxRubyで作ったストリームに放り込み、selectでダッシュボタンからのパケットのみフィルタリングしています。パケットの内容を出力する処理は subscribe で渡す lambda の中に定義しました。

Dash Button からの通信を検知して Slack に投稿する

 こちらもやろうとしてることは前回と同じですが、RxRubyを使うように変更しています。

post_to_slack_rx.rb

require 'packetfu'
require 'open3'
require 'json'
require 'rx'
include PacketFu

FILTER = nil
MAC_OF_DASH = 'Dash Button の MACアドレス'
SLACK_API_URL = 'Slack Incoming Webhook URL'
INTERVAL_SECONDS = 2

class Rx::BehaviorSubject
  public :check_unsubscribed
end

def get_capture(iface)
  subject = Rx::Subject.new
  subject.select {|pkt| target_dash_pushed?(pkt) }.debounce(INTERVAL_SECONDS).subscribe(
    lambda {|pkt| post_to_slack },
    lambda {|err| puts "Error: #{err}" },
    lambda { puts 'Completed.' }
  )

  cap = Capture.new(iface: iface, filter: FILTER, start: true)
  cap.stream.each do |pkt|
    subject.on_next pkt
  end
end

def target_dash_pushed?(pkt)
  return false unless EthPacket.can_parse?(pkt)
  EthHeader.str2mac(EthPacket.parse(pkt).eth_src) == MAC_OF_DASH
end

def post_to_slack
  api_url = SLACK_API_URL
  payload = {
    channel:    '#akanuma_private',
    username:   'dash',
    icon_emoji: ':squirrel:',
    text:       'Hello World from Dash Button!!'
  }
  command = "curl -X POST --data-urlencode 'payload=#{payload.to_json}' #{api_url}"
  puts command
  output, std_error, status = Open3.capture3(command)
  puts output
  puts std_error
  puts status

  t_stamp = Time.now.strftime("%Y-%m-%d %H:%M:%S.%6N")
  puts "#{t_stamp} Posted to Slack."
end

if $0 == __FILE__
  iface = ARGV[0]
  puts "Capturing for interface: #{iface}"
  get_capture(iface)
end

 キャプチャしたパケットをRxRubyで作ったストリームに放り込み、selectでターゲットのダッシュボタンからのパケットのみフィルタリングしています。そして debounce で指定した時間(今回は2秒間)よりも短い間隔のパケットをフィルターしています。前回のコードだと最後の処理の時間を記録してその差分をチェックする部分を自前で用意していましたが、 debounce を利用することでシンプルに実装することができました。

 まだまだ Reactive Extensions は使い始めたばかりですが、うまく使えばかなりシンプルに処理を実装することができそうなので、今後使い方を身につけていきたいと思います。

Amazon Dash Button を押したら Slack にポストする

 12月5日に日本でも Amazon Dash Button の提供が始まりましたが、すぐに下記の記事を投稿されている方がいて、面白そうだったので私もやってみました。

qiita.com

f:id:akanuma-hiroaki:20161217181325j:plain:w300:left

 他にもすでに同様のことを行われている記事はあるのですが、パケットキャプチャ部分を Ruby で書いている記事は見つからなかったので、 Ruby で書いてみました。ボタンの設定は上記ページなどですでに詳しく紹介されていて、同様に行いましたのでここでは割愛します。

 ちなみに本当は AWS IoT Button を使ってみたいのですが、まだ日本では発売されていないので、ひとまず Dash Button で我慢します。

AWS IoT ボタン - AWS IoT | AWS

処理の流れ

 先ほどの記事でも紹介されていますが、 Dash Button が押された時の処理の流れは下記のようになります。

  1. 電源ON(通常時はOFFになっている)
  2. 設定されたWi-Fiネットワークに接続
  3. IP重複検知のために ARP Probe もしくは UDP Broadcast 送信
  4. Amazonへの購入処理実行
  5. 電源OFF

 試しにボタンのIPアドレスにpingしてみたところ、ボタンを押すと電源が入ってpingが返ってくるようになり、処理が終わるとまた電源がOFFになってpingが返らなくなりました。

 $ ping 192.168.10.10
PING 192.168.10.10 (192.168.10.10): 56 data bytes
Request timeout for icmp_seq 0
Request timeout for icmp_seq 1
Request timeout for icmp_seq 2
Request timeout for icmp_seq 3
ping: sendto: No route to host
Request timeout for icmp_seq 4
ping: sendto: Host is down
Request timeout for icmp_seq 5
64 bytes from 192.168.10.10: icmp_seq=0 ttl=64 time=6763.680 ms
64 bytes from 192.168.10.10: icmp_seq=1 ttl=64 time=5758.602 ms
64 bytes from 192.168.10.10: icmp_seq=2 ttl=64 time=4753.781 ms
64 bytes from 192.168.10.10: icmp_seq=3 ttl=64 time=3752.679 ms
64 bytes from 192.168.10.10: icmp_seq=4 ttl=64 time=2749.437 ms
64 bytes from 192.168.10.10: icmp_seq=7 ttl=64 time=4.412 ms
64 bytes from 192.168.10.10: icmp_seq=8 ttl=64 time=6.484 ms
64 bytes from 192.168.10.10: icmp_seq=9 ttl=64 time=2.491 ms
64 bytes from 192.168.10.10: icmp_seq=10 ttl=64 time=3.172 ms
Request timeout for icmp_seq 15
Request timeout for icmp_seq 16
Request timeout for icmp_seq 17
Request timeout for icmp_seq 18

 Dash Button では AWS IoT Button のような設定の自由度はなく、購入処理のための通信を検知して処理を行う必要があります。そこで大きく分けて下記2つの内容を実装します。

  1. Dash Button の ARP Probe もしくは UDP Broadcast をキャプチャしてMACアドレスを確認する

  2. Dash Button のMACアドレスからの通信を検知したら目的の処理を行う

Dash Button のMACアドレス確認

 Ruby でのパケットキャプチャ用ライブラリを探したところ、 PacketFu というライブラリがあったのでこちらを使用してみます。

github.com

 実装コードは下記の通りです。

capture.rb

require 'packetfu'
require './ouis.rb'
include PacketFu

def get_capture(iface)
  cap = Capture.new(iface: iface, start: true)
  cap.stream.each do |pkt|
    time = Time.now.strftime("%Y-%m-%d %H:%M:%S.%6N")

    if UDPPacket.can_parse?(pkt)
      udp_packet = UDPPacket.parse(pkt)
      src_ip = IPHeader.octet_array(udp_packet.ip_src).join('.')
      dst_ip = IPHeader.octet_array(udp_packet.ip_dst).join('.')
      src_port = udp_packet.udp_src
      dst_port = udp_packet.udp_dst
      src_mac, dst_mac, vendor_name = get_common_values(udp_packet)
      puts "time:#{time}, src_mac:#{src_mac}, dst_mac:#{dst_mac}, src_ip:#{src_ip}, dst_ip:#{dst_ip}, src_port:#{src_port}, dst_port:#{dst_port}, protocol:udp, vendor: #{vendor_name}"
      next
    end

    if ARPPacket.can_parse?(pkt)
      arp_packet = ARPPacket.parse(pkt)
      src_ip = arp_packet.arp_saddr_ip
      dst_ip = arp_packet.arp_daddr_ip
      src_mac, dst_mac, vendor_name = get_common_values(arp_packet)
      puts "time:#{time}, src_mac:#{src_mac}, dst_mac:#{dst_mac}, src_ip:#{src_ip}, dst_ip:#{dst_ip}, protocol:arp, vendor: #{vendor_name}"
    end
  end
end

def get_common_values(packet)
  src_mac = EthHeader.str2mac(packet.eth_src)
  dst_mac = EthHeader.str2mac(packet.eth_dst)
  vendor_name = get_vendor_name(src_mac)
  return src_mac, dst_mac, vendor_name
end

def get_vendor_name(mac)
  oui = mac.split(':').slice(0, 3).join('-')
  OUIS[oui.upcase]
end

if $0 == __FILE__
  iface = ARGV[0]
  puts "Capturing for interface: #{iface}"
  get_capture(iface)
end

 キャプチャしたパケットが ARPパケットもしくはUDPパケットであれば、パケットを解析して必要な情報を抜き出して画面に出力しています。

 また、Dash Button 以外からのパケットの情報も出力されるため、MACアドレスのベンダーコードからどのベンダーの端末からのパケットかも出すようにしています。あらかじめ IEEE のサイトからベンダーコードのリストを取得し、hash 形式で ouis.rb に保持し、 require して使っています。

IEEEのベンダーコードリスト(重いです)
http://standards.ieee.org/regauth/oui/oui_public.txt

ouis.rb

# http://standards.ieee.org/regauth/oui/oui_public.txt をもとに作成
OUIS = {
  'E0-43-DB' => "Shenzhen ViewAt Technology Co.,Ltd. ",
  '24-05-F5' => "Integrated Device Technology (Malaysia) Sdn. Bhd.",
  '2C-30-33' => "NETGEAR",
  '3C-D9-2B' => "Hewlett Packard",
  '9C-8E-99' => "Hewlett Packard",
  'B4-99-BA' => "Hewlett Packard",
  '1C-C1-DE' => "Hewlett Packard",
  '3C-35-56' => "Cognitec Systems GmbH",
〜〜〜以下略〜〜〜

 そしてスクリプトを実行して Dash Button を押すと下記のような出力が確認できました。

$ sudo ruby capture.rb en0 | grep -i amazon
/Users/akanuma/workspace/dash_button/ouis.rb:8296: warning: key "00-01-C8" is duplicated and overwritten on line 8309
/Users/akanuma/workspace/dash_button/ouis.rb:12779: warning: key "08-00-30" is duplicated and overwritten on line 17381
/Users/akanuma/workspace/dash_button/ouis.rb:12779: warning: key "08-00-30" is duplicated and overwritten on line 22049
time:2016-12-17 20:52:06.677957, src_mac:88:71:e5:f0:89:71, dst_mac:ff:ff:ff:ff:ff:ff, src_ip:0.0.0.0, dst_ip:255.255.255.255, src_port:68, dst_port:67, protocol:udp, vendor: Amazon Technologies Inc.
time:2016-12-17 20:52:06.689735, src_mac:88:71:e5:f0:89:71, dst_mac:ff:ff:ff:ff:ff:ff, src_ip:192.168.10.10, dst_ip:192.168.10.1, protocol:arp, vendor: Amazon Technologies Inc.

 ベンダーコードのリストで同一のベンダーコードが微妙に異なるベンダー名表記で複数回登録されているために warning が出ていますが、処理には影響ないので今回は無視します。ベンダー名が Amazon Technologies Inc. となっているのが Dash Button です。 src_mac として出力している内容が Dash Button のMACアドレスになりますので、このMACアドレスからの通信を検知して処理を行うスクリプトを実装します。

Dash Button からの通信を検知して Slack に投稿する

 通信を検知するための処理は基本的にMACアドレスを確認するためのスクリプトと同様になります。パケットの送信元MACアドレスが Dash Button のものだったらSlackに投稿する処理を実行します。

post_to_slack.rb

require 'packetfu'
require 'open3'
require 'json'
include PacketFu

FILTER = nil
MAC_OF_DASH = '88:71:e5:f0:89:71'
SLACK_API_URL = 'Slack の Incoming Webhook URL'
INTERVAL_MICRO_SECONDS = 2_000_000

@last_processed_time = 0

def get_capture(iface)
  cap = Capture.new(iface: iface, filter: FILTER, start: true)
  cap.stream.each do |pkt|
    next if !ARPPacket.can_parse?(pkt) && !UDPPacket.can_parse?(pkt)

    packet = Packet.parse(pkt)
    next if EthHeader.str2mac(packet.eth_src) != MAC_OF_DASH
    next if !past_since_last_processed?

    post_to_slack
    @last_processed_time = current_unixtime_with_micro_seconds

    t_stamp = Time.now.strftime("%Y-%m-%d %H:%M:%S.%6N")
    puts "#{t_stamp} Posted to Slack."
  end
end

def current_unixtime_with_micro_seconds
  now = Time.now
  "#{now.to_i}#{now.usec}".to_i
end

def past_since_last_processed?
  current_unixtime_with_micro_seconds - @last_processed_time >= INTERVAL_MICRO_SECONDS
end

def post_to_slack
  api_url = SLACK_API_URL
  payload = {
    channel:    '#akanuma_private',
    username:   'dash',
    icon_emoji: ':squirrel:',
    text:       'Hello World from Dash Button!!'
  }
  command = "curl -X POST --data-urlencode 'payload=#{payload.to_json}' #{api_url}"
  puts command
  output, std_error, status = Open3.capture3(command)
  puts output
  puts std_error
  puts status
end

if $0 == __FILE__
  iface = ARGV[0]
  puts "Capturing for interface: #{iface}"
  get_capture(iface)
end

 色々試してみたところ、 自宅ではボタンを押すと ARP と UDP のパケットが一度ずつ飛ぶのですが、会社のネットワークで試すと ARP や UDP のパケットが飛ぶタイミングを特定しきれず、ボタンを押した時に ARP が飛んだり飛ばなかったり、 UDP が複数回飛んだりするので、とりあえずの策として、ARPとUDP両方を検知して、前回の実行から2,000,000マイクロ秒(2秒)以内だったら実行しないようにしました。他の方の記事ではあまり回数がブレるというのはなさそうに見えるので、もう少し調べてみたいところです。この辺り詳しい方いたら教えていただけると嬉しいです。

 Slackへの投稿は Open3 を使って Incoming Webhook のURLにポストしているだけのシンプルな内容です。これを実行すると下記のようにSlackに投稿されます。

f:id:akanuma-hiroaki:20161217212950p:plain

 今回はSlackに投稿しましたが、この内容さえ変えればボタンが押された時の処理は自由に変更できるので、アイディア次第で色々面白いことができそうに思っています。社内で使うちょっとしたツールなど色々試してみたいと思います。

LAN内にサーバが必要

 今回はとりあえず動かしてみたということで、自分のMac上でスクリプトを動かしましたが、常時使うためのツールを作るのであれば、LAN内にサーバを用意する必要があります。これが AWS IoT Button であれば直接AWSのサービスと連携できると思うので、早く日本でも使えるようになってほしいですね。

Vagrant で稼働させたVM上で Dash Button のパケットをキャプチャできなかった

 今回試す環境を最初は Vagrant でパブリックネットワーク設定のVMを作ってVM上で動かしていたのですが、VM上では Dash Button からのパケットが検知できませんでした。たまに検知できるときもあるんですが、検知できない場合が多いです。ただ、 冒頭で紹介した記事で紹介されている node.js でのやり方だとちゃんと検知できていますし、 tcpdump でパケットを見てみると、Macのローカル上でもVM上でも同様に検知できているので、PacketFuで解析している部分が何か違うのかなと思い色々調べてみましたが、特定できませんでした。VM上へはMacのネットワークインタフェースを経由して到達していると思うので、経由したIFの情報を送信元として認識しているような感じではあるのですが、この辺りも何かわかる方いたら情報いただけると嬉しいです。

今回のコード

今回のコードはこちらに公開しました。

github.com