前回 Raspberry Pi を SORACOM Beam 経由で AWS IoT に接続できるところまで確認したので、今回は AWS IoT の Shadow を使って状態を管理するところまでやってみたいと思います。
AWS IoT メッセージブローカーに Pub/Sub する
まずは AWS IoT メッセージブローカーを MQTT Broker として単純に Pub/Sub してみたいと思います。
今回は Ruby の MQTT クライアントとして ruby-mqtt を使ってみます。
Gemfile に下記を追加して bundle install しておきます。
gem 'mqtt'
Subscriber 側の実装は下記のようにしました。 SORACOM Beam に接続し、 topic/sample
というトピックに Subscribe してメッセージを待ち受けます。
require 'bundler/setup' require 'mqtt' BEAM_URL = 'beam.soracom.io' TOPIC = 'topic/sample' MQTT::Client.connect(host: BEAM_URL) do |client| client.subscribe(TOPIC) puts "Subscribed to the topic: #{TOPIC}" client.get do |topic, message| puts "#{topic}: #{message}" end end
SORACOM Beam を使っているので認証情報等を書く必要がなく、とてもスッキリ書けます。
次に Publisher 側です。 topic/sample
というトピックにメッセージを一つ Publish して終了します。
require 'bundler/setup' require 'mqtt' BEAM_URL = 'beam.soracom.io' TOPIC = 'topic/sample' MQTT::Client.connect(host: BEAM_URL) do |client| now = Time.now client.publish(TOPIC, "Test from publisher.rb: #{now}") puts "Published to the topic '#{TOPIC}': #{now}" end
では動作を確認してみます。まずは Subscriber を起動してメッセージを待ち受けます。
pi@raspberrypi:~/aws_iot $ bundle exec ruby subscriber.rb
Subscribed to the topic: topic/sample
そして Publisher を起動してメッセージを Publish します。下記の例では二回起動して二回メッセージを Publish しています。
pi@raspberrypi:~/aws_iot $ bundle exec ruby publisher.rb Published to the topic 'topic/sample': 2017-07-12 23:17:33 +0000 pi@raspberrypi:~/aws_iot $ pi@raspberrypi:~/aws_iot $ bundle exec ruby publisher.rb Published to the topic 'topic/sample': 2017-07-12 23:17:48 +0000
すると先ほど待ち受け状態にした Subscriber 側で下記のように受け取ったメッセージが表示されます。
pi@raspberrypi:~/aws_iot $ bundle exec ruby subscriber.rb Subscribed to the topic: topic/sample topic/sample: Test from publisher.rb: 2017-07-12 23:17:33 +0000 topic/sample: Test from publisher.rb: 2017-07-12 23:17:48 +0000
Device Shadow を更新して内容を表示する
では続いて AWS IoT の Device Shadow を使ってみます。Device Shadow は AWS IoT でモノ(Thing)の状態を保存するために使用されるJSONドキュメントで、Thing Shadows サービスによって管理され、参照、更新することができます。
Thing Shadows サービスでは MQTT トピックを使用してメッセージがやりとりされます。まずはシンプルに Device Shadow の更新を行い、その内容を受信して表示してみます。Thing Shadows のトピック名のベースは $aws/things/モノの名前/shadow
となります。今回はモノの名前を raspberry_pi
としていますので、 $aws/things/raspberry_pi/shadow
となります。
Subscriber は下記のようにしました。 Device Shadow に対する更新が正常に行われると、Thing Shadows では $awsthings/raspberry_pi/shadow/update/accepted
というトピックにメッセージが Publish されますので、このトピックに Subscribe しておきます。
require 'bundler/setup' require 'mqtt' require 'json' BEAM_URL = 'beam.soracom.io' TOPIC = '$aws/things/raspberry_pi/shadow/update/accepted' MQTT::Client.connect(host: BEAM_URL) do |client| client.subscribe(TOPIC) puts "Subscribed to the topic: #{TOPIC}" client.get do |topic, json| puts "#{topic}: #{JSON.parse(json)}" end end
次に Publisher 側です。Device Shadow の状態を更新するには $aws/things/raspberry_pi/shadow/update
というトピックにメッセージを Publish します。
require 'bundler/setup' require 'mqtt' require 'json' BEAM_URL = 'beam.soracom.io' TOPIC = '$aws/things/raspberry_pi/shadow/update' MQTT::Client.connect(host: BEAM_URL) do |client| statement = { state: { desired: { power: 'on' } } } client.publish(TOPIC, statement.to_json) puts "Published to the topic: '#{TOPIC}'" end
では動作確認です。まずは Subscriber を起動して、メッセージを待ち受けます。
pi@raspberrypi:~/aws_iot $ bundle exec ruby shadow_subscriber.rb Subscribed to the topic: $aws/things/raspberry_pi/shadow/update/accepted
そして Publisher を起動して Device Shadow の更新を行います。
pi@raspberrypi:~/aws_iot $ bundle exec ruby shadow_publisher.rb Published to the topic: '$aws/things/raspberry_pi/shadow/update' pi@raspberrypi:~/aws_iot $ pi@raspberrypi:~/aws_iot $ bundle exec ruby shadow_publisher.rb Published to the topic: '$aws/things/raspberry_pi/shadow/update'
すると下記のように Subscriber 側で Device Shadow の更新内容が表示されます。実際に Publish した内容以外に metadate, version, timestamp が追加されています。
pi@raspberrypi:~/aws_iot $ bundle exec ruby shadow_subscriber.rb Subscribed to the topic: $aws/things/raspberry_pi/shadow/update/accepted $aws/things/raspberry_pi/shadow/update/accepted: {"state"=>{"desired"=>{"power"=>"on"}}, "metadata"=>{"desired"=>{"power"=>{"timestamp"=>1499902096}}}, "version"=>8, "timestamp"=>1499902096} $aws/things/raspberry_pi/shadow/update/accepted: {"state"=>{"desired"=>{"power"=>"on"}}, "metadata"=>{"desired"=>{"power"=>{"timestamp"=>1499902128}}}, "version"=>9, "timestamp"=>1499902128}
Device Shadow の更新差分を表示する
先ほどの例では Device Shadow の更新のために Publish された情報をそのまま受信して表示していましたが、今度は Device Shadow の状態に変更があった場合だけその差分を表示するようにしてみます。
Device Shadow にはモノの実際の状態を示す reported
セクションと、アプリケーション側で望む状態である desired
というセクションが含まれます。Device Shadow に対する更新が行われた時に、 reported セクションと desired セクションに差異が発生した場合には $aws/things/raspberry_pi/shadow/update/delta
というトピックにメッセージが Publish されますので、このトピックに対して Subscribe しておけば差分が発生したことを検知できます。
また、差分を検知するためにはまずモノの状態を reported セクションで登録しておく必要がありますので、下記の Subscriber の実装の中ではまず起動時にモノの状態を Publish してから、 delta トピックに Subscribe しています。
require 'bundler/setup' require 'mqtt' require 'json' BEAM_URL = 'beam.soracom.io' TOPIC = '$aws/things/raspberry_pi/shadow/update' DELTA_TOPIC = "#{TOPIC}/delta" MQTT::Client.connect(host: BEAM_URL) do |client| initial_statement = { state: { reported: { power: 'off' } } } client.publish(TOPIC, initial_statement.to_json) puts "Published initial statement." client.subscribe(DELTA_TOPIC) puts "Subscribed to the topic: #{DELTA_TOPIC}" client.get do |topic, json| puts "#{topic}: #{JSON.parse(json)}" end end
そして Publisher 側では desired セクションでモノの状態を Publish します。下記の例では Publish を4回行い、power の状態を on -> off -> off -> on
と変更しています。
require 'bundler/setup' require 'mqtt' require 'json' BEAM_URL = 'beam.soracom.io' TOPIC = '$aws/things/raspberry_pi/shadow/update' def statement(power) { state: { desired: { power: power } } } end def publish(client, power) client.publish(TOPIC, statement(power).to_json) puts "Published to the topic: '#{TOPIC}'. power: #{power}" end MQTT::Client.connect(host: BEAM_URL) do |client| power = 'on' publish(client, power) sleep(3) power = 'off' publish(client, power) sleep(3) publish(client, power) sleep(3) power = 'on' publish(client, power) end
では動作確認です。まず Subscriber を起動してメッセージを待ち受けます。
pi@raspberrypi:~/aws_iot $ bundle exec ruby shadow_subscriber.rb Published initial statement. Subscribed to the topic: $aws/things/raspberry_pi/shadow/update/delta
そして Publisher を起動して Device Shadow を更新します。
pi@raspberrypi:~/aws_iot $ bundle exec ruby shadow_publisher.rb Published to the topic: '$aws/things/raspberry_pi/shadow/update'. power: on Published to the topic: '$aws/things/raspberry_pi/shadow/update'. power: off Published to the topic: '$aws/things/raspberry_pi/shadow/update'. power: off Published to the topic: '$aws/things/raspberry_pi/shadow/update'. power: on
すると Subscriber 側でメッセージが受信されます。Publisher からはメッセージが4回 Publish されていますが、Subscriber 側では2件しか受信されていません。
pi@raspberrypi:~/aws_iot $ bundle exec ruby shadow_subscriber.rb Published initial statement. Subscribed to the topic: $aws/things/raspberry_pi/shadow/update/delta $aws/things/raspberry_pi/shadow/update/delta: {"version"=>17, "timestamp"=>1499903040, "state"=>{"power"=>"on"}, "metad ata"=>{"power"=>{"timestamp"=>1499903040}}} $aws/things/raspberry_pi/shadow/update/delta: {"version"=>20, "timestamp"=>1499903049, "state"=>{"power"=>"on"}, "metad ata"=>{"power"=>{"timestamp"=>1499903049}}}
これは、 Subscriber 起動時に Device Shadow の reported を power: off
で更新しているので、 Publisher から desired を power: off
で Publish した場合は差分がないため delta にメッセージが Publish されないためです。また、差分を受け取っても特に reported の更新は行なっていないため、 reported は power: off
のままになります。
Device Shadow の更新差分を受け取って reported を更新する
では更新差分を受け取った場合はモノ(Thing)から reported を更新するようにしてみたいと思います。更新差分のメッセージを受信した場合はそのJSONをパースして、power の値を読み取り、reported の状態変更メッセージを Publish します。
require 'bundler/setup' require 'mqtt' require 'json' BEAM_URL = 'beam.soracom.io' TOPIC = '$aws/things/raspberry_pi/shadow/update' DELTA_TOPIC = "#{TOPIC}/delta" def statement(power) { state: { reported: { power: power } } } end MQTT::Client.connect(host: BEAM_URL) do |client| power = 'off' client.publish(TOPIC, statement(power).to_json) puts "Published initial statement. power: #{power}" client.subscribe(DELTA_TOPIC) puts "Subscribed to the topic: #{DELTA_TOPIC}" client.get do |topic, json| power = JSON.parse(json)['state']['power'] client.publish(TOPIC, statement(power).to_json) puts "Changed power state to: #{power}" puts json end end
Publisher は一つ前の例とほぼ同じです。
require 'bundler/setup' require 'mqtt' require 'json' BEAM_URL = 'beam.soracom.io' TOPIC = '$aws/things/raspberry_pi/shadow/update' def statement(power) { state: { desired: { power: power } } } end def publish(client, power) client.publish(TOPIC, statement(power).to_json) puts "Published to the topic: '#{TOPIC}'. power: #{power}" end MQTT::Client.connect(host: BEAM_URL) do |client| power = 'on' publish(client, power) sleep(3) power = 'off' publish(client, power) sleep(3) publish(client, power) sleep(3) power = 'on' publish(client, power) sleep(3) end
では動かしてみます。まず Subscriber を起動。
pi@raspberrypi:~/aws_iot $ bundle exec ruby shadow_subscriber.rb Published initial statement. power: off Subscribed to the topic: $aws/things/raspberry_pi/shadow/update/delta
そして Publisher を起動します。
pi@raspberrypi:~/aws_iot $ bundle exec ruby shadow_publisher.rb Published to the topic: '$aws/things/raspberry_pi/shadow/update'. power: on Published to the topic: '$aws/things/raspberry_pi/shadow/update'. power: off Published to the topic: '$aws/things/raspberry_pi/shadow/update'. power: off Published to the topic: '$aws/things/raspberry_pi/shadow/update'. power: on
すると Subscriber 側で下記のようにメッセージが受信されます。
pi@raspberrypi:~/aws_iot $ bundle exec ruby shadow_subscriber.rb Published initial statement. power: off Subscribed to the topic: $aws/things/raspberry_pi/shadow/update/delta Changed power state to: on {"version":38,"timestamp":1499903887,"state":{"power":"on"},"metadata":{"power":{"timestamp":1499903887}}} Changed power state to: off {"version":40,"timestamp":1499903889,"state":{"power":"off"},"metadata":{"power":{"timestamp":1499903889}}} Changed power state to: on {"version":43,"timestamp":1499903895,"state":{"power":"on"},"metadata":{"power":{"timestamp":1499903895}}}
今回はメッセージが3回受信されています。 Subscriber 側で差分通知を受け取った時に reported を更新するようにしたので、 Publisher から二回連続で power: off
が Publish された時はメッセージが受信されていませんが、それ以外は差分が通知され、メッセージが受信されています。
これでひとまず Device Shadow を使ったモノの管理ができるようになりました。SORACOM Beam を使うことで認証情報等は Beam にオフロードしてコードをシンプルにできますし、AWS IoT からは他のAWSサービスとの連携も簡単なので、色々と試してみたいと思います。