SORACOM Beam から Ruby で AWS IoT の Device Shadow を更新する

 前回 Raspberry Pi を SORACOM Beam 経由で AWS IoT に接続できるところまで確認したので、今回は AWS IoT の Shadow を使って状態を管理するところまでやってみたいと思います。

AWS IoT メッセージブローカーに Pub/Sub する

 まずは AWS IoT メッセージブローカーを MQTT Broker として単純に Pub/Sub してみたいと思います。

 今回は Ruby の MQTT クライアントとして ruby-mqtt を使ってみます。

github.com

 Gemfile に下記を追加して bundle install しておきます。

gem 'mqtt'

 Subscriber 側の実装は下記のようにしました。 SORACOM Beam に接続し、 topic/sample というトピックに Subscribe してメッセージを待ち受けます。

require 'bundler/setup'
require 'mqtt'

BEAM_URL = 'beam.soracom.io'
TOPIC = 'topic/sample'

MQTT::Client.connect(host: BEAM_URL) do |client|
  client.subscribe(TOPIC)
  puts "Subscribed to the topic: #{TOPIC}"

  client.get do |topic, message|
    puts "#{topic}: #{message}"
  end
end

 SORACOM Beam を使っているので認証情報等を書く必要がなく、とてもスッキリ書けます。

 次に Publisher 側です。 topic/sample というトピックにメッセージを一つ Publish して終了します。

require 'bundler/setup'
require 'mqtt'

BEAM_URL = 'beam.soracom.io'
TOPIC = 'topic/sample'

MQTT::Client.connect(host: BEAM_URL) do |client|
  now = Time.now
  client.publish(TOPIC, "Test from publisher.rb: #{now}")
  puts "Published to the topic '#{TOPIC}': #{now}"
end

 では動作を確認してみます。まずは Subscriber を起動してメッセージを待ち受けます。

pi@raspberrypi:~/aws_iot $ bundle exec ruby subscriber.rb  
Subscribed to the topic: topic/sample                           

 そして Publisher を起動してメッセージを Publish します。下記の例では二回起動して二回メッセージを Publish しています。

pi@raspberrypi:~/aws_iot $ bundle exec ruby publisher.rb 
Published to the topic 'topic/sample': 2017-07-12 23:17:33 +0000
pi@raspberrypi:~/aws_iot $ 
pi@raspberrypi:~/aws_iot $ bundle exec ruby publisher.rb 
Published to the topic 'topic/sample': 2017-07-12 23:17:48 +0000

 すると先ほど待ち受け状態にした Subscriber 側で下記のように受け取ったメッセージが表示されます。

pi@raspberrypi:~/aws_iot $ bundle exec ruby subscriber.rb  
Subscribed to the topic: topic/sample                           
topic/sample: Test from publisher.rb: 2017-07-12 23:17:33 +0000 
topic/sample: Test from publisher.rb: 2017-07-12 23:17:48 +0000 

Device Shadow を更新して内容を表示する

 では続いて AWS IoT の Device Shadow を使ってみます。Device Shadow は AWS IoT でモノ(Thing)の状態を保存するために使用されるJSONドキュメントで、Thing Shadows サービスによって管理され、参照、更新することができます。

docs.aws.amazon.com

 Thing Shadows サービスでは MQTT トピックを使用してメッセージがやりとりされます。まずはシンプルに Device Shadow の更新を行い、その内容を受信して表示してみます。Thing Shadows のトピック名のベースは $aws/things/モノの名前/shadow となります。今回はモノの名前を raspberry_pi としていますので、 $aws/things/raspberry_pi/shadow となります。

 Subscriber は下記のようにしました。 Device Shadow に対する更新が正常に行われると、Thing Shadows では $awsthings/raspberry_pi/shadow/update/accepted というトピックにメッセージが Publish されますので、このトピックに Subscribe しておきます。

require 'bundler/setup'
require 'mqtt'
require 'json'

BEAM_URL = 'beam.soracom.io'
TOPIC = '$aws/things/raspberry_pi/shadow/update/accepted'

MQTT::Client.connect(host: BEAM_URL) do |client|
  client.subscribe(TOPIC)
  puts "Subscribed to the topic: #{TOPIC}"

  client.get do |topic, json|
    puts "#{topic}: #{JSON.parse(json)}"
  end
end

 次に Publisher 側です。Device Shadow の状態を更新するには $aws/things/raspberry_pi/shadow/update というトピックにメッセージを Publish します。

require 'bundler/setup'
require 'mqtt'
require 'json'

BEAM_URL = 'beam.soracom.io'
TOPIC = '$aws/things/raspberry_pi/shadow/update'

MQTT::Client.connect(host: BEAM_URL) do |client|
  statement = {
    state: {
      desired: {
        power: 'on'
      }
    }
  }

  client.publish(TOPIC, statement.to_json)
  puts "Published to the topic: '#{TOPIC}'"
end

 では動作確認です。まずは Subscriber を起動して、メッセージを待ち受けます。

pi@raspberrypi:~/aws_iot $ bundle exec ruby shadow_subscriber.rb                                                  
Subscribed to the topic: $aws/things/raspberry_pi/shadow/update/accepted                                               

 そして Publisher を起動して Device Shadow の更新を行います。

pi@raspberrypi:~/aws_iot $ bundle exec ruby shadow_publisher.rb 
Published to the topic: '$aws/things/raspberry_pi/shadow/update'
pi@raspberrypi:~/aws_iot $ 
pi@raspberrypi:~/aws_iot $ bundle exec ruby shadow_publisher.rb 
Published to the topic: '$aws/things/raspberry_pi/shadow/update'

 すると下記のように Subscriber 側で Device Shadow の更新内容が表示されます。実際に Publish した内容以外に metadate, version, timestamp が追加されています。

pi@raspberrypi:~/aws_iot $ bundle exec ruby shadow_subscriber.rb                                                  
Subscribed to the topic: $aws/things/raspberry_pi/shadow/update/accepted                                               
$aws/things/raspberry_pi/shadow/update/accepted: {"state"=>{"desired"=>{"power"=>"on"}}, "metadata"=>{"desired"=>{"power"=>{"timestamp"=>1499902096}}}, "version"=>8, "timestamp"=>1499902096}                                                
$aws/things/raspberry_pi/shadow/update/accepted: {"state"=>{"desired"=>{"power"=>"on"}}, "metadata"=>{"desired"=>{"power"=>{"timestamp"=>1499902128}}}, "version"=>9, "timestamp"=>1499902128}                                                

Device Shadow の更新差分を表示する

 先ほどの例では Device Shadow の更新のために Publish された情報をそのまま受信して表示していましたが、今度は Device Shadow の状態に変更があった場合だけその差分を表示するようにしてみます。

 Device Shadow にはモノの実際の状態を示す reported セクションと、アプリケーション側で望む状態である desired というセクションが含まれます。Device Shadow に対する更新が行われた時に、 reported セクションと desired セクションに差異が発生した場合には $aws/things/raspberry_pi/shadow/update/delta というトピックにメッセージが Publish されますので、このトピックに対して Subscribe しておけば差分が発生したことを検知できます。

 また、差分を検知するためにはまずモノの状態を reported セクションで登録しておく必要がありますので、下記の Subscriber の実装の中ではまず起動時にモノの状態を Publish してから、 delta トピックに Subscribe しています。

require 'bundler/setup'
require 'mqtt'
require 'json'

BEAM_URL = 'beam.soracom.io'
TOPIC = '$aws/things/raspberry_pi/shadow/update'
DELTA_TOPIC = "#{TOPIC}/delta"

MQTT::Client.connect(host: BEAM_URL) do |client|
  initial_statement = {
    state: {
      reported: {
        power: 'off'
      }
    }
  }

  client.publish(TOPIC, initial_statement.to_json)
  puts "Published initial statement."

  client.subscribe(DELTA_TOPIC)
  puts "Subscribed to the topic: #{DELTA_TOPIC}"

  client.get do |topic, json|
    puts "#{topic}: #{JSON.parse(json)}"
  end
end

 そして Publisher 側では desired セクションでモノの状態を Publish します。下記の例では Publish を4回行い、power の状態を on -> off -> off -> on と変更しています。

require 'bundler/setup'
require 'mqtt'
require 'json'

BEAM_URL = 'beam.soracom.io'
TOPIC = '$aws/things/raspberry_pi/shadow/update'

def statement(power)
  {
    state: {
      desired: {
        power: power
      }
    }
  }
end

def publish(client, power)
  client.publish(TOPIC, statement(power).to_json)
  puts "Published to the topic: '#{TOPIC}'. power: #{power}"
end

MQTT::Client.connect(host: BEAM_URL) do |client|
  power = 'on'
  publish(client, power)

  sleep(3)

  power = 'off'
  publish(client, power)

  sleep(3)

  publish(client, power)

  sleep(3)

  power = 'on'
  publish(client, power)
end

 では動作確認です。まず Subscriber を起動してメッセージを待ち受けます。

pi@raspberrypi:~/aws_iot $ bundle exec ruby shadow_subscriber.rb                                                  
Published initial statement.                                                                                           
Subscribed to the topic: $aws/things/raspberry_pi/shadow/update/delta                                                  

 そして Publisher を起動して Device Shadow を更新します。

pi@raspberrypi:~/aws_iot $ bundle exec ruby shadow_publisher.rb 
Published to the topic: '$aws/things/raspberry_pi/shadow/update'. power: on
Published to the topic: '$aws/things/raspberry_pi/shadow/update'. power: off
Published to the topic: '$aws/things/raspberry_pi/shadow/update'. power: off
Published to the topic: '$aws/things/raspberry_pi/shadow/update'. power: on

 すると Subscriber 側でメッセージが受信されます。Publisher からはメッセージが4回 Publish されていますが、Subscriber 側では2件しか受信されていません。

pi@raspberrypi:~/aws_iot $ bundle exec ruby shadow_subscriber.rb                                                  
Published initial statement.                                                                                           
Subscribed to the topic: $aws/things/raspberry_pi/shadow/update/delta                                                  
$aws/things/raspberry_pi/shadow/update/delta: {"version"=>17, "timestamp"=>1499903040, "state"=>{"power"=>"on"}, "metad
ata"=>{"power"=>{"timestamp"=>1499903040}}}                                                                            
$aws/things/raspberry_pi/shadow/update/delta: {"version"=>20, "timestamp"=>1499903049, "state"=>{"power"=>"on"}, "metad
ata"=>{"power"=>{"timestamp"=>1499903049}}}                                                                            

 これは、 Subscriber 起動時に Device Shadow の reported を power: off で更新しているので、 Publisher から desired を power: off で Publish した場合は差分がないため delta にメッセージが Publish されないためです。また、差分を受け取っても特に reported の更新は行なっていないため、 reported は power: off のままになります。

Device Shadow の更新差分を受け取って reported を更新する

 では更新差分を受け取った場合はモノ(Thing)から reported を更新するようにしてみたいと思います。更新差分のメッセージを受信した場合はそのJSONをパースして、power の値を読み取り、reported の状態変更メッセージを Publish します。

require 'bundler/setup'
require 'mqtt'
require 'json'

BEAM_URL = 'beam.soracom.io'
TOPIC = '$aws/things/raspberry_pi/shadow/update'
DELTA_TOPIC = "#{TOPIC}/delta"

def statement(power)
  {
    state: {
      reported: {
        power: power
      }
    }
  }
end

MQTT::Client.connect(host: BEAM_URL) do |client|
  power = 'off'
  client.publish(TOPIC, statement(power).to_json)
  puts "Published initial statement. power: #{power}"

  client.subscribe(DELTA_TOPIC)
  puts "Subscribed to the topic: #{DELTA_TOPIC}"

  client.get do |topic, json|
    power = JSON.parse(json)['state']['power']
    client.publish(TOPIC, statement(power).to_json)
    puts "Changed power state to: #{power}"
    puts json
  end
end

 Publisher は一つ前の例とほぼ同じです。

require 'bundler/setup'
require 'mqtt'
require 'json'

BEAM_URL = 'beam.soracom.io'
TOPIC = '$aws/things/raspberry_pi/shadow/update'

def statement(power)
  {
    state: {
      desired: {
        power: power
      }
    }
  }
end

def publish(client, power)
  client.publish(TOPIC, statement(power).to_json)
  puts "Published to the topic: '#{TOPIC}'. power: #{power}"
end

MQTT::Client.connect(host: BEAM_URL) do |client|
  power = 'on'
  publish(client, power)

  sleep(3)

  power = 'off'
  publish(client, power)

  sleep(3)

  publish(client, power)

  sleep(3)

  power = 'on'
  publish(client, power)

  sleep(3)
end

 では動かしてみます。まず Subscriber を起動。

pi@raspberrypi:~/aws_iot $ bundle exec ruby shadow_subscriber.rb                                      
Published initial statement. power: off                                                                    
Subscribed to the topic: $aws/things/raspberry_pi/shadow/update/delta                                      

 そして Publisher を起動します。

pi@raspberrypi:~/aws_iot $ bundle exec ruby shadow_publisher.rb 
Published to the topic: '$aws/things/raspberry_pi/shadow/update'. power: on
Published to the topic: '$aws/things/raspberry_pi/shadow/update'. power: off
Published to the topic: '$aws/things/raspberry_pi/shadow/update'. power: off
Published to the topic: '$aws/things/raspberry_pi/shadow/update'. power: on

 すると Subscriber 側で下記のようにメッセージが受信されます。

pi@raspberrypi:~/aws_iot $ bundle exec ruby shadow_subscriber.rb                                      
Published initial statement. power: off                                                                    
Subscribed to the topic: $aws/things/raspberry_pi/shadow/update/delta                                      
Changed power state to: on                                                                                 
{"version":38,"timestamp":1499903887,"state":{"power":"on"},"metadata":{"power":{"timestamp":1499903887}}} 
Changed power state to: off                                                                                
{"version":40,"timestamp":1499903889,"state":{"power":"off"},"metadata":{"power":{"timestamp":1499903889}}}
Changed power state to: on                                                                                 
{"version":43,"timestamp":1499903895,"state":{"power":"on"},"metadata":{"power":{"timestamp":1499903895}}} 

 今回はメッセージが3回受信されています。 Subscriber 側で差分通知を受け取った時に reported を更新するようにしたので、 Publisher から二回連続で power: off が Publish された時はメッセージが受信されていませんが、それ以外は差分が通知され、メッセージが受信されています。

 これでひとまず Device Shadow を使ったモノの管理ができるようになりました。SORACOM Beam を使うことで認証情報等は Beam にオフロードしてコードをシンプルにできますし、AWS IoT からは他のAWSサービスとの連携も簡単なので、色々と試してみたいと思います。