Subscribed unsubscribe Subscribe Subscribe

Amazon Dash Button を押したら Slack にポストする(RxRuby版)

IoT Ruby Tool Slack Bot Rx

 前回の投稿で、 Amazon Dash Button を押したら Slack にポストする記事を書いたのですが、Facebookでシェアしたところ、「イベントハンドリングのところで、RxRubyあたりを使うとクールに実装できそうです。」というコメントをいただいたので、RxRubyを使う版を試してみました。

RxRuby

 RxRuby は Reactive Extensions の Ruby用ライブラリです。最終の更新からしばらく経っているようですが、使わせてもらうことにしました。

github.com

 ググってみたところ、RxRubyについてはあまり記事は多くなかったのですが、こちらのブログを参考にさせていただきました。

qiita.com

 Reactive Extensions については私も今回初めて調べたので、詳しいことは語れませんが、GoFのデザインパターンのオブザーバーパターンを表したもので、非同期にイベント駆動で処理を行うためのモデルと解釈しています。詳しい内容についてはReactive Extensionsの公式ページを参照していただくと良いかと思います。

ReactiveX

Dash Button のMACアドレス確認

 やろうとしていることは前回記事と同じですが、RxRubyを使うようにコードを変更してみました。

capture_rx.rb

require 'packetfu'
require 'rx'
require './ouis.rb'
include PacketFu

class Rx::BehaviorSubject
  public :check_unsubscribed
end

def get_capture(iface)
  subject = Rx::Subject.new
  subject.select {|pkt| dash_packet?(pkt) }.subscribe(
    lambda {|pkt| capture(pkt) },
    lambda {|err| puts "Error: #{err}" },
    lambda { puts 'Completed.' }
  )

  cap = Capture.new(iface: iface, start: true)
  cap.stream.each do |pkt|
    subject.on_next pkt
  end
end

def dash_packet?(pkt)
  return false unless EthPacket.can_parse?(pkt)
  get_vendor_name(EthHeader.str2mac(EthPacket.parse(pkt).eth_src)).downcase.include?('amazon')
end

def capture(pkt)
  time = Time.now.strftime("%Y-%m-%d %H:%M:%S.%6N")

  if UDPPacket.can_parse?(pkt)
    packet = UDPPacket.parse(pkt)
    src_ip = IPHeader.octet_array(packet.ip_src).join('.')
    dst_ip = IPHeader.octet_array(packet.ip_dst).join('.')
    protocol = 'udp'
  elsif ARPPacket.can_parse?(pkt)
    packet = ARPPacket.parse(pkt)
    src_ip = packet.arp_saddr_ip
    dst_ip = packet.arp_daddr_ip
    protocol = 'arp'
  else
    return
  end

  src_mac, dst_mac, vendor_name = get_common_values(packet)
  output(time, src_mac, dst_mac, src_ip, dst_ip, protocol, vendor_name)
end

def output(time, src_mac, dst_mac, src_ip, dst_ip, protocol, vendor_name)
  puts "time:#{time}, src_mac:#{src_mac}, dst_mac:#{dst_mac}, src_ip:#{src_ip}, dst_ip:#{dst_ip}, protocol:#{protocol}, vendor:#{vendor_name}"
end

def get_common_values(packet)
  src_mac = EthHeader.str2mac(packet.eth_src)
  dst_mac = EthHeader.str2mac(packet.eth_dst)
  vendor_name = get_vendor_name(src_mac)
  return src_mac, dst_mac, vendor_name
end

def get_vendor_name(mac)
  return '' if mac.nil?
  oui = mac.split(':').slice(0, 3).join('-')
  OUIS[oui.upcase]
end

if $0 == __FILE__
  iface = ARGV[0]
  puts "Capturing for interface: #{iface}"
  get_capture(iface)
end

 キャプチャしたパケットをRxRubyで作ったストリームに放り込み、selectでダッシュボタンからのパケットのみフィルタリングしています。パケットの内容を出力する処理は subscribe で渡す lambda の中に定義しました。

Dash Button からの通信を検知して Slack に投稿する

 こちらもやろうとしてることは前回と同じですが、RxRubyを使うように変更しています。

post_to_slack_rx.rb

require 'packetfu'
require 'open3'
require 'json'
require 'rx'
include PacketFu

FILTER = nil
MAC_OF_DASH = 'Dash Button の MACアドレス'
SLACK_API_URL = 'Slack Incoming Webhook URL'
INTERVAL_SECONDS = 2

class Rx::BehaviorSubject
  public :check_unsubscribed
end

def get_capture(iface)
  subject = Rx::Subject.new
  subject.select {|pkt| target_dash_pushed?(pkt) }.debounce(INTERVAL_SECONDS).subscribe(
    lambda {|pkt| post_to_slack },
    lambda {|err| puts "Error: #{err}" },
    lambda { puts 'Completed.' }
  )

  cap = Capture.new(iface: iface, filter: FILTER, start: true)
  cap.stream.each do |pkt|
    subject.on_next pkt
  end
end

def target_dash_pushed?(pkt)
  return false unless EthPacket.can_parse?(pkt)
  EthHeader.str2mac(EthPacket.parse(pkt).eth_src) == MAC_OF_DASH
end

def post_to_slack
  api_url = SLACK_API_URL
  payload = {
    channel:    '#akanuma_private',
    username:   'dash',
    icon_emoji: ':squirrel:',
    text:       'Hello World from Dash Button!!'
  }
  command = "curl -X POST --data-urlencode 'payload=#{payload.to_json}' #{api_url}"
  puts command
  output, std_error, status = Open3.capture3(command)
  puts output
  puts std_error
  puts status

  t_stamp = Time.now.strftime("%Y-%m-%d %H:%M:%S.%6N")
  puts "#{t_stamp} Posted to Slack."
end

if $0 == __FILE__
  iface = ARGV[0]
  puts "Capturing for interface: #{iface}"
  get_capture(iface)
end

 キャプチャしたパケットをRxRubyで作ったストリームに放り込み、selectでターゲットのダッシュボタンからのパケットのみフィルタリングしています。そして debounce で指定した時間(今回は2秒間)よりも短い間隔のパケットをフィルターしています。前回のコードだと最後の処理の時間を記録してその差分をチェックする部分を自前で用意していましたが、 debounce を利用することでシンプルに実装することができました。

 まだまだ Reactive Extensions は使い始めたばかりですが、うまく使えばかなりシンプルに処理を実装することができそうなので、今後使い方を身につけていきたいと思います。