前回の投稿で、 Amazon Dash Button を押したら Slack にポストする記事を書いたのですが、Facebookでシェアしたところ、「イベントハンドリングのところで、RxRubyあたりを使うとクールに実装できそうです。」というコメントをいただいたので、RxRubyを使う版を試してみました。
RxRuby
RxRuby は Reactive Extensions の Ruby用ライブラリです。最終の更新からしばらく経っているようですが、使わせてもらうことにしました。
ググってみたところ、RxRubyについてはあまり記事は多くなかったのですが、こちらのブログを参考にさせていただきました。
Reactive Extensions については私も今回初めて調べたので、詳しいことは語れませんが、GoFのデザインパターンのオブザーバーパターンを表したもので、非同期にイベント駆動で処理を行うためのモデルと解釈しています。詳しい内容についてはReactive Extensionsの公式ページを参照していただくと良いかと思います。
Dash Button のMACアドレス確認
やろうとしていることは前回記事と同じですが、RxRubyを使うようにコードを変更してみました。
capture_rx.rb
require 'packetfu' require 'rx' require './ouis.rb' include PacketFu class Rx::BehaviorSubject public :check_unsubscribed end def get_capture(iface) subject = Rx::Subject.new subject.select {|pkt| dash_packet?(pkt) }.subscribe( lambda {|pkt| capture(pkt) }, lambda {|err| puts "Error: #{err}" }, lambda { puts 'Completed.' } ) cap = Capture.new(iface: iface, start: true) cap.stream.each do |pkt| subject.on_next pkt end end def dash_packet?(pkt) return false unless EthPacket.can_parse?(pkt) get_vendor_name(EthHeader.str2mac(EthPacket.parse(pkt).eth_src)).downcase.include?('amazon') end def capture(pkt) time = Time.now.strftime("%Y-%m-%d %H:%M:%S.%6N") if UDPPacket.can_parse?(pkt) packet = UDPPacket.parse(pkt) src_ip = IPHeader.octet_array(packet.ip_src).join('.') dst_ip = IPHeader.octet_array(packet.ip_dst).join('.') protocol = 'udp' elsif ARPPacket.can_parse?(pkt) packet = ARPPacket.parse(pkt) src_ip = packet.arp_saddr_ip dst_ip = packet.arp_daddr_ip protocol = 'arp' else return end src_mac, dst_mac, vendor_name = get_common_values(packet) output(time, src_mac, dst_mac, src_ip, dst_ip, protocol, vendor_name) end def output(time, src_mac, dst_mac, src_ip, dst_ip, protocol, vendor_name) puts "time:#{time}, src_mac:#{src_mac}, dst_mac:#{dst_mac}, src_ip:#{src_ip}, dst_ip:#{dst_ip}, protocol:#{protocol}, vendor:#{vendor_name}" end def get_common_values(packet) src_mac = EthHeader.str2mac(packet.eth_src) dst_mac = EthHeader.str2mac(packet.eth_dst) vendor_name = get_vendor_name(src_mac) return src_mac, dst_mac, vendor_name end def get_vendor_name(mac) return '' if mac.nil? oui = mac.split(':').slice(0, 3).join('-') OUIS[oui.upcase] end if $0 == __FILE__ iface = ARGV[0] puts "Capturing for interface: #{iface}" get_capture(iface) end
キャプチャしたパケットをRxRubyで作ったストリームに放り込み、selectでダッシュボタンからのパケットのみフィルタリングしています。パケットの内容を出力する処理は subscribe で渡す lambda の中に定義しました。
Dash Button からの通信を検知して Slack に投稿する
こちらもやろうとしてることは前回と同じですが、RxRubyを使うように変更しています。
post_to_slack_rx.rb
require 'packetfu' require 'open3' require 'json' require 'rx' include PacketFu FILTER = nil MAC_OF_DASH = 'Dash Button の MACアドレス' SLACK_API_URL = 'Slack Incoming Webhook URL' INTERVAL_SECONDS = 2 class Rx::BehaviorSubject public :check_unsubscribed end def get_capture(iface) subject = Rx::Subject.new subject.select {|pkt| target_dash_pushed?(pkt) }.debounce(INTERVAL_SECONDS).subscribe( lambda {|pkt| post_to_slack }, lambda {|err| puts "Error: #{err}" }, lambda { puts 'Completed.' } ) cap = Capture.new(iface: iface, filter: FILTER, start: true) cap.stream.each do |pkt| subject.on_next pkt end end def target_dash_pushed?(pkt) return false unless EthPacket.can_parse?(pkt) EthHeader.str2mac(EthPacket.parse(pkt).eth_src) == MAC_OF_DASH end def post_to_slack api_url = SLACK_API_URL payload = { channel: '#akanuma_private', username: 'dash', icon_emoji: ':squirrel:', text: 'Hello World from Dash Button!!' } command = "curl -X POST --data-urlencode 'payload=#{payload.to_json}' #{api_url}" puts command output, std_error, status = Open3.capture3(command) puts output puts std_error puts status t_stamp = Time.now.strftime("%Y-%m-%d %H:%M:%S.%6N") puts "#{t_stamp} Posted to Slack." end if $0 == __FILE__ iface = ARGV[0] puts "Capturing for interface: #{iface}" get_capture(iface) end
キャプチャしたパケットをRxRubyで作ったストリームに放り込み、selectでターゲットのダッシュボタンからのパケットのみフィルタリングしています。そして debounce で指定した時間(今回は2秒間)よりも短い間隔のパケットをフィルターしています。前回のコードだと最後の処理の時間を記録してその差分をチェックする部分を自前で用意していましたが、 debounce を利用することでシンプルに実装することができました。
まだまだ Reactive Extensions は使い始めたばかりですが、うまく使えばかなりシンプルに処理を実装することができそうなので、今後使い方を身につけていきたいと思います。