AWS Greengrass のデバイスコードを Python版 SDK で実装してみる

f:id:akanuma-hiroaki:20180105082050p:plain:right

 前回は AWS Greengrass の公式ドキュメントで紹介されているロボットアームのシナリオをそのまま動かしてみましたが、手順をそのままトレースしただけだったので、内容の理解のためにも Python 版の SDK を使ってデバイス用コードを実装してみました。

 具体的には、グループやサブスクリプション等の設定は前回のものをそのまま使用し、 RobotArm_Thing と Switch_Thing で動かすプログラムを、 AWS IoT C++ Device SDK のサンプルとして提供されていたものと同じものを AWS IoT Device SDK for Python を使用して実装しています。

github.com

サブスクリプションの設定内容

 コードの処理内容をイメージしやすくするためにも、コードの中身の前に、サブスクリプションの内容について振り返ってみたいと思います。サブスクリプションの内容が理解できると、各デバイスや Greengrass Core、クラウド上の AWS IoT サービスなどがどのように連携しているかイメージしやすくなるかと思います。前回作成したサブスクリプションの内容は下記の通りですが、それぞれの設定がどういった内容になるのか改めて整理してみます。

$ aws greengrass get-subscription-definition-version --subscription-definition-id 2eb1f6e4-a960-4796-8e0d-e67adbdeabd7 --subscription-definition-version-id c75cc6e2-af57-48fb-8f6a-42e9a5f74d10
{
    "Definition": {
        "Subscriptions": [
            {
                "Source": "arn:aws:iot:ap-northeast-1:XXXXXXXXXXXX:thing/Switch_Thing", 
                "Target": "GGShadowService", 
                "Id": "1", 
                "Subject": "$aws/things/RobotArm_Thing/shadow/update"
            }, 
            {
                "Source": "GGShadowService", 
                "Target": "arn:aws:iot:ap-northeast-1:XXXXXXXXXXXX:thing/RobotArm_Thing", 
                "Id": "10", 
                "Subject": "$aws/things/RobotArm_Thing/shadow/update/accepted"
            }, 
            {
                "Source": "GGShadowService", 
                "Target": "arn:aws:iot:ap-northeast-1:XXXXXXXXXXXX:thing/RobotArm_Thing", 
                "Id": "11", 
                "Subject": "$aws/things/RobotArm_Thing/shadow/update/rejected"
            }, 
            {
                "Source": "arn:aws:iot:ap-northeast-1:XXXXXXXXXXXX:thing/RobotArm_Thing", 
                "Target": "arn:aws:lambda:ap-northeast-1:XXXXXXXXXXXX:function:uptimeLambda:storyLineUptime", 
                "Id": "2", 
                "Subject": "/topic/state"
            }, 
            {
                "Source": "arn:aws:iot:ap-northeast-1:XXXXXXXXXXXX:thing/RobotArm_Thing", 
                "Target": "GGShadowService", 
                "Id": "3", 
                "Subject": "$aws/things/RobotArm_Thing/shadow/update"
            }, 
            {
                "Source": "GGShadowService", 
                "Target": "arn:aws:iot:ap-northeast-1:XXXXXXXXXXXX:thing/RobotArm_Thing", 
                "Id": "4", 
                "Subject": "$aws/things/RobotArm_Thing/shadow/update/delta"
            }, 
            {
                "Source": "arn:aws:lambda:ap-northeast-1:XXXXXXXXXXXX:function:uptimeLambda:storyLineUptime", 
                "Target": "cloud", 
                "Id": "5", 
                "Subject": "/topic/metering"
            }, 
            {
                "Source": "arn:aws:lambda:ap-northeast-1:XXXXXXXXXXXX:function:messageLambda:storyLineMessage", 
                "Target": "GGShadowService", 
                "Id": "6", 
                "Subject": "$aws/things/RobotArm_Thing/shadow/update"
            }, 
            {
                "Source": "cloud", 
                "Target": "arn:aws:lambda:ap-northeast-1:XXXXXXXXXXXX:function:messageLambda:storyLineMessage", 
                "Id": "7", 
                "Subject": "/topic/update"
            }, 
            {
                "Source": "GGShadowService", 
                "Target": "arn:aws:iot:ap-northeast-1:XXXXXXXXXXXX:thing/Switch_Thing", 
                "Id": "8", 
                "Subject": "$aws/things/RobotArm_Thing/shadow/update/accepted"
            }, 
            {
                "Source": "GGShadowService", 
                "Target": "arn:aws:iot:ap-northeast-1:XXXXXXXXXXXX:thing/Switch_Thing", 
                "Id": "9", 
                "Subject": "$aws/things/RobotArm_Thing/shadow/update/rejected"
            }
        ]
    }, 
    "Version": "c75cc6e2-af57-48fb-8f6a-42e9a5f74d10", 
    "CreationTimestamp": "2017-12-29T13:14:34.243Z", 
    "Id": "2eb1f6e4-a960-4796-8e0d-e67adbdeabd7", 
    "Arn": "arn:aws:greengrass:ap-northeast-1:XXXXXXXXXXXX:/greengrass/definition/subscriptions/2eb1f6e4-a960-4796-8e0d-e67adbdeabd7/versions/c75cc6e2-af57-48fb-8f6a-42e9a5f74d10"
}

 上記設定について Id 順に私なりに理解している内容を書いていきたいと思います。

{
    "Source": "arn:aws:iot:ap-northeast-1:XXXXXXXXXXXX:thing/Switch_Thing", 
    "Target": "GGShadowService", 
    "Id": "1", 
    "Subject": "$aws/things/RobotArm_Thing/shadow/update"
}, 

 Id: 1 は Switch_Thing が RobotArm_Thing の Device Shadow の状態を更新する際の設定です。 Switch_Thing から Greengrass Core デバイス の Subject に示されるトピックに publish されると、 Target である Greengrass の Device Shadow の Service に publish された内容が送信されます。

{
    "Source": "arn:aws:iot:ap-northeast-1:XXXXXXXXXXXX:thing/RobotArm_Thing", 
    "Target": "arn:aws:lambda:ap-northeast-1:XXXXXXXXXXXX:function:uptimeLambda:storyLineUptime", 
    "Id": "2", 
    "Subject": "/topic/state"
}, 

 Id: 2 は RobotArm_Thing が MQTTブローカー(Greengrass Core) に Publish した場合の設定です。 Subject に示されるトピックに publish されると、 Target に指定されている Lambda Function に publish された内容が送信され、 Greengrass Core デバイス上で実行されます。

{
    "Source": "arn:aws:iot:ap-northeast-1:XXXXXXXXXXXX:thing/RobotArm_Thing", 
    "Target": "GGShadowService", 
    "Id": "3", 
    "Subject": "$aws/things/RobotArm_Thing/shadow/update"
}, 

 Id: 3 は RobotArm_Thing が自身の Device Shadow の状態を更新する際の設定です。 Source が RobotArm_Thing 自身になったという以外は Id: 1 と同様です。

{
    "Source": "GGShadowService", 
    "Target": "arn:aws:iot:ap-northeast-1:XXXXXXXXXXXX:thing/RobotArm_Thing", 
    "Id": "4", 
    "Subject": "$aws/things/RobotArm_Thing/shadow/update/delta"
}, 

 Id: 4 は Greengrass Core の Device Shadow の Service が delta トピックに publish した場合の設定です。 RobotArm_Thing の Device Shadow の desired ステータスが更新され、 reported ステータスと差異が発生するとその差異が Subject に示される delta トピックに Publish され、 Target である RobotArm_Thing に送信されます。

{
    "Source": "arn:aws:lambda:ap-northeast-1:XXXXXXXXXXXX:function:uptimeLambda:storyLineUptime", 
    "Target": "cloud", 
    "Id": "5", 
    "Subject": "/topic/metering"
}, 

 Id: 5 は Greengrass Core デバイス上で稼働する Lambda Function からクラウド上の AWS IoT サービスにメッセージを送信する場合の設定です。 Source に示されている Lambda Function から Subject に示されているトピックに publish すると、 AWS IoT サービスの同様のトピックにその内容が publish されます。

{
    "Source": "arn:aws:lambda:ap-northeast-1:XXXXXXXXXXXX:function:messageLambda:storyLineMessage", 
    "Target": "GGShadowService", 
    "Id": "6", 
    "Subject": "$aws/things/RobotArm_Thing/shadow/update"
}, 

 Id: 6 は Greengrass Core デバイス上で稼働する Lambda Function から Greengrass Core の Device Shadow のサービスにメッセージを publish する際の設定です。 Source に示されている Lambda Function から RobotArm_Thing の Device Shadow への更新がリクエストされると Subject に示されるトピックに更新用のメッセージが publish され、 Target に示されている Greengrass Core の Device Shadow のサービスへ送信されます。

{
    "Source": "cloud", 
    "Target": "arn:aws:lambda:ap-northeast-1:XXXXXXXXXXXX:function:messageLambda:storyLineMessage", 
    "Id": "7", 
    "Subject": "/topic/update"
}, 

 Id: 7 はクラウド上の AWS IoT のコンソールからメッセージを publish した場合の設定です。コンソールから Subject に示されているトピックに publish されると、Target に示されている Greengrass Core デバイス上で稼働している Lambda Function にメッセージが送信されます。

{
    "Source": "GGShadowService", 
    "Target": "arn:aws:iot:ap-northeast-1:XXXXXXXXXXXX:thing/Switch_Thing", 
    "Id": "8", 
    "Subject": "$aws/things/RobotArm_Thing/shadow/update/accepted"
}, 

 Id: 8 は Switch_Thing から RobotArm_Thing の Device Shadow の更新が成功した場合の設定です。 更新が成功すると Source に示されている Greengrass Core の Device Shadow サービスから Subject に示されている accepted トピックにメッセージが publish され、その内容が Switch_Thing に送信されます。

{
    "Source": "GGShadowService", 
    "Target": "arn:aws:iot:ap-northeast-1:XXXXXXXXXXXX:thing/Switch_Thing", 
    "Id": "9", 
    "Subject": "$aws/things/RobotArm_Thing/shadow/update/rejected"
}

 Id: 9 は Id: 8 とは逆に Device Shadow の更新が失敗した場合の設定です。更新が失敗すると Source に示されている Greengrass Core の Device Shadow サービスから Subject に示されている rejected トピックにメッセージが publish され、その内容が Switch_Thing に送信されます。

{
    "Source": "GGShadowService", 
    "Target": "arn:aws:iot:ap-northeast-1:XXXXXXXXXXXX:thing/RobotArm_Thing", 
    "Id": "10", 
    "Subject": "$aws/things/RobotArm_Thing/shadow/update/accepted"
}, 

 Id: 10 は Id: 8 と同様に RobotArm_Thing の Device Shadow の更新に成功した場合の設定ですが、送信先を RobotArm_Thing にしています。

{
    "Source": "GGShadowService", 
    "Target": "arn:aws:iot:ap-northeast-1:XXXXXXXXXXXX:thing/RobotArm_Thing", 
    "Id": "11", 
    "Subject": "$aws/things/RobotArm_Thing/shadow/update/rejected"
}, 

 Id: 11 は Id: 9 と同様に RobotArm_Thing の Device Shadow の更新に失敗した場合の設定ですが、送信先を RobotArm_Thing にしています。

RobotArm_Thing のデバイスコード

 デバイスコードで実行している処理の概要は、下記のような流れになります。

  • DiscoveryInfoProvider で自身が属する Greengrass Group の Core デバイスの情報を取得する

  • Greengrass Core に接続する

  • Device Shadow や MQTTクライアントのインスタンスを取得する

  • Device Shadow の更新や MQTTブローカーへのメッセージの publish など任意の処理を行う

 上記の流れは RobotArm_Thing でも Switch_Thing でも同様です。まずは RobotArm_Thing のデバイスコードから見ていきます。ひとまずコード全体を掲載します。わかりやすくするためにエラーハンドリングなどの処理は省略しています。

import time
import uuid
import json
import logging
from AWSIoTPythonSDK.core.greengrass.discovery.providers import DiscoveryInfoProvider
from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient
from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTShadowClient
from AWSIoTPythonSDK.exception.AWSIoTExceptions import subscribeQueueDisabledException

class RobotArm:
    ENDPOINT = 'greengrass.iot.ap-northeast-1.amazonaws.com'
    ROOT_CA_PATH = 'certs/robotArm/root-ca.pem'
    CERTIFICATE_PATH = 'certs/robotArm/c5e6d39f7b-certificate.pem.crt'
    PRIVATE_KEY_PATH = 'certs/robotArm/c5e6d39f7b-private.pem.key'
    THING_NAME = 'RobotArm_Thing'
    CLIENT_ID = 'RobotArm_Thing'
    GROUP_CA_PATH = './groupCA/'
    METERING_TOPIC = '/topic/state'

    def __init__(self):
        self.discoveryInfoProvider = DiscoveryInfoProvider()
        self.discoveryInfoProvider.configureEndpoint(self.ENDPOINT)
        self.discoveryInfoProvider.configureCredentials(self.ROOT_CA_PATH, self.CERTIFICATE_PATH, self.PRIVATE_KEY_PATH)
        self.discoveryInfoProvider.configureTimeout(10)

        logger = logging.getLogger('AWSIoTPythonSDK.core')
        logger.setLevel(logging.DEBUG)
        streamHandler = logging.StreamHandler()
        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        streamHandler.setFormatter(formatter)
        logger.addHandler(streamHandler)

    def get_ggc_info(self):
        discoveryInfo = self.discoveryInfoProvider.discover(self.THING_NAME)
        caList = discoveryInfo.getAllCas()
        coreList = discoveryInfo.getAllCores()

        groupId, ca = caList[0]
        coreInfo = coreList[0]
        return (groupId, ca, coreInfo)

    def write_ca_file(self, groupId, ca):
        groupCAPath = self.GROUP_CA_PATH + groupId + '_CA_' + str(uuid.uuid4()) + '.crt'
        groupCAFile = open(groupCAPath, 'w')
        groupCAFile.write(ca)
        groupCAFile.close()
        return groupCAPath

    def connect_to_shadow_service(self, groupCAPath, coreInfo):
        shadowClient = AWSIoTMQTTShadowClient(self.CLIENT_ID)
        shadowClient.configureCredentials(groupCAPath, self.PRIVATE_KEY_PATH, self.CERTIFICATE_PATH)

        connectivityInfo = coreInfo.connectivityInfoList[0]
        ggcHost = connectivityInfo.host
        ggcPort = connectivityInfo.port

        shadowClient.configureEndpoint(ggcHost, ggcPort)
        shadowClient.connect()
        return shadowClient

    def get_mqtt_client(self, shadowClient):
        return shadowClient.getMQTTConnection()

    def get_device_shadow(self, shadowClient):
        return shadowClient.createShadowHandlerWithName(self.CLIENT_ID, True)

    def shadow_update_callback(self, payload, responseStatus, token):
        reportedState = json.loads(payload)['state']['reported']['myState']
        self.publish_mqtt_async(self.mqttClient, reportedState)

    def shadow_delta_callback(self, payload, responseStatus, token):
        desiredState = json.loads(payload)['state']['myState']
        self.publish_shadow_state(self.deviceShadow, desiredState)

    def publish_shadow_state(self, deviceShadow, state):
        reportedState = { 'state': { 'reported': { 'myState': state } } }
        print('Sending State -------\n%s' % reportedState)
        deviceShadow.shadowUpdate(json.dumps(reportedState), self.shadow_update_callback, 5)

    def publish_mqtt_async(self, mqttClient, state):
        payload = { 'state': state }
        mqttClient.publish(self.METERING_TOPIC, json.dumps(payload), 0)

    def wait_for_update_shadow(self, deviceShadow):
        deviceShadow.shadowRegisterDeltaCallback(self.shadow_delta_callback)

    def execute(self):
        groupId, ca, coreInfo = self.get_ggc_info()
        groupCAPath = self.write_ca_file(groupId, ca)

        shadowClient = self.connect_to_shadow_service(groupCAPath, coreInfo)

        self.deviceShadow = self.get_device_shadow(shadowClient)
        self.mqttClient = self.get_mqtt_client(shadowClient)

        self.publish_shadow_state(self.deviceShadow, 'off')

        self.wait_for_update_shadow(self.deviceShadow)
        print('Waiting for an update!')

        while True:
            time.sleep(1)

if '__main__' == __name__:
    robotArm = RobotArm()
    robotArm.execute()

 各処理について解説していきます。まずはコンストラクタで所属している Greengrass Group の情報を取得するための DiscoveryInfoProvider のインスタンスを取得して設定を行なっています。エンドポイントについてはアカウント固有の AWS IoT のエンドポイントではなく、リージョンごとに固定のエンドポイントで良いようです。

    def __init__(self):
        self.discoveryInfoProvider = DiscoveryInfoProvider()
        self.discoveryInfoProvider.configureEndpoint(self.ENDPOINT)
        self.discoveryInfoProvider.configureCredentials(self.ROOT_CA_PATH, self.CERTIFICATE_PATH, self.PRIVATE_KEY_PATH)
        self.discoveryInfoProvider.configureTimeout(10)

 そしてその DiscoveryInfoProvider インスタンスを使用して自身の Thing Name を引数にして discover メソッドを実行して Greengrass Group の情報を取得します。

    def get_ggc_info(self):
        discoveryInfo = self.discoveryInfoProvider.discover(self.THING_NAME)
        caList = discoveryInfo.getAllCas()
        coreList = discoveryInfo.getAllCores()

        groupId, ca = caList[0]
        coreInfo = coreList[0]
        return (groupId, ca, coreInfo)

 デバイスから Greengrass Core に接続するには証明書の情報が必要なので Greengrass Group の情報から取得できる証明書を保存しておきます。

    def write_ca_file(self, groupId, ca):
        groupCAPath = self.GROUP_CA_PATH + groupId + '_CA_' + str(uuid.uuid4()) + '.crt'
        groupCAFile = open(groupCAPath, 'w')
        groupCAFile.write(ca)
        groupCAFile.close()
        return groupCAPath

 そして Greengrass Core の Device Shadow サービスに接続し、 AWSIoTMQTTShadowClient のインスタンスを返します。

    def connect_to_shadow_service(self, groupCAPath, coreInfo):
        shadowClient = AWSIoTMQTTShadowClient(self.CLIENT_ID)
        shadowClient.configureCredentials(groupCAPath, self.PRIVATE_KEY_PATH, self.CERTIFICATE_PATH)

        connectivityInfo = coreInfo.connectivityInfoList[0]
        ggcHost = connectivityInfo.host
        ggcPort = connectivityInfo.port

        shadowClient.configureEndpoint(ggcHost, ggcPort)
        shadowClient.connect()
        return shadowClient

 そのインスタンスから、実際に Device Shadow を操作するために、自身の Thing Name を CLIENT_ID として deviceShadow のインスタンスを取得します。

    def get_device_shadow(self, shadowClient):
        return shadowClient.createShadowHandlerWithName(self.CLIENT_ID, True)

 さらに同じ接続を使用してプレーンな MQTT の操作を行うために AWSIoTMQTTShadowClient のインスタンスから AWSIoTMQTTClient のインスタンスを取得します。

    def get_mqtt_client(self, shadowClient):
        return shadowClient.getMQTTConnection()

 そして起動時にはまず状態を off として Device Shadow の reported を更新します。また、コールバックメソッドで現在の状態を MQTT トピックに publish しています。今回は responseStatus に関係なく処理を行っていますが、本来は responseStatus によって処理をハンドリングする必要があるかと思います。

    def publish_shadow_state(self, deviceShadow, state):
        reportedState = { 'state': { 'reported': { 'myState': state } } }
        print('Sending State -------\n%s' % reportedState)
        deviceShadow.shadowUpdate(json.dumps(reportedState), self.shadow_update_callback, 5)

    def shadow_update_callback(self, payload, responseStatus, token):
        reportedState = json.loads(payload)['state']['reported']['myState']
        self.publish_mqtt_async(self.mqttClient, reportedState)

    def publish_mqtt_async(self, mqttClient, state):
        payload = { 'state': state }
        mqttClient.publish(self.METERING_TOPIC, json.dumps(payload), 0)

 最後に Switch_Thing からの Device Shadow の更新を待ち受けるため、 delta トピックにメッセージが publish された時のコールバックメソッドを登録します。今回の処理は先ほどの publish_shadow_state を使用して、 reported のステータスを更新し、 desired ステータスに揃えているだけですが、実際はハードのスイッチの ON/OFF の処理などを行うことになるかと思います。

    def wait_for_update_shadow(self, deviceShadow):
        deviceShadow.shadowRegisterDeltaCallback(self.shadow_delta_callback)

    def shadow_delta_callback(self, payload, responseStatus, token):
        desiredState = json.loads(payload)['state']['myState']
        self.publish_shadow_state(self.deviceShadow, desiredState)

 このスクリプトを実行すると、デバッグ用の出力を除くと下記のような出力があり、 Switch_Thing からの Device Shadow の更新を待ち受けます。

$ python robot_arm.py 
Sending State -------
{'state': {'reported': {'myState': 'off'}}}
Waiting for an update!

Switch_Thing のデバイスコード

 続いて Switch_Thing のデバイスコードです。こちらもまずはコード全体を掲載します。エラーハンドリング等の処理は省略しています。

import sys
import time
import uuid
import json
import logging
from AWSIoTPythonSDK.core.greengrass.discovery.providers import DiscoveryInfoProvider
from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTShadowClient

class Switch:
    ENDPOINT = 'greengrass.iot.ap-northeast-1.amazonaws.com'
    ROOT_CA_PATH = 'certs/switch/root-ca.pem'
    CERTIFICATE_PATH = 'certs/switch/a20b621e05-certificate.pem.crt'
    PRIVATE_KEY_PATH = 'certs/switch/a20b621e05-private.pem.key'
    THING_NAME = 'Switch_Thing'
    CLIENT_ID = 'Switch_Thing'
    TARGET_THING_NAME = 'RobotArm_Thing'
    GROUP_CA_PATH = './groupCA/'

    def __init__(self):
        self.discoveryInfoProvider = DiscoveryInfoProvider()
        self.discoveryInfoProvider.configureEndpoint(self.ENDPOINT)
        self.discoveryInfoProvider.configureCredentials(self.ROOT_CA_PATH, self.CERTIFICATE_PATH, self.PRIVATE_KEY_PATH)
        self.discoveryInfoProvider.configureTimeout(10)

        logger = logging.getLogger('AWSIoTPythonSDK.core')
        logger.setLevel(logging.INFO)
        streamHandler = logging.StreamHandler()
        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        streamHandler.setFormatter(formatter)
        logger.addHandler(streamHandler)

    def get_ggc_info(self):
        discoveryInfo = self.discoveryInfoProvider.discover(self.THING_NAME)
        caList = discoveryInfo.getAllCas()
        coreList = discoveryInfo.getAllCores()

        groupId, ca = caList[0]
        coreInfo = coreList[0]
        return (groupId, ca, coreInfo)

    def write_ca_file(self, groupId, ca):
        groupCAPath = self.GROUP_CA_PATH + groupId + '_CA_' + str(uuid.uuid4()) + '.crt'
        groupCAFile = open(groupCAPath, 'w')
        groupCAFile.write(ca)
        groupCAFile.close()
        return groupCAPath

    def connect_to_shadow_service(self, groupCAPath, coreInfo):
        shadowClient = AWSIoTMQTTShadowClient(self.CLIENT_ID)
        shadowClient.configureCredentials(groupCAPath, self.PRIVATE_KEY_PATH, self.CERTIFICATE_PATH)

        connectivityInfo = coreInfo.connectivityInfoList[0]
        ggcHost = connectivityInfo.host
        ggcPort = connectivityInfo.port

        shadowClient.configureEndpoint(ggcHost, ggcPort)
        shadowClient.connect()
        return shadowClient

    def get_mqtt_client(self, shadowClient):
        return shadowClient.getMQTTConnection()

    def update_target_device_shadow(self, mqttClient, state):
        update_topic = '$aws/things/%s/shadow/update' % self.TARGET_THING_NAME
        desiredState = { 'state': { 'desired': { 'myState': state } } }
        print('Sending State -------\n%s' % desiredState)
        mqttClient.publish(update_topic, json.dumps(desiredState), 0)

    def execute(self):
        groupId, ca, coreInfo = self.get_ggc_info()
        groupCAPath = self.write_ca_file(groupId, ca)

        shadowClient = self.connect_to_shadow_service(groupCAPath, coreInfo)

        mqttClient = self.get_mqtt_client(shadowClient)

        while True:
            sys.stdout.write('Please enter 1 (turn on) or 0 (turn off) to control the robot arm, q to quit: ')
            user_input = raw_input('')

            if user_input == 'q':
                break

            if user_input == '1':
                state = 'on'
            elif user_input == '0':
                state = 'off'
            else:
                print('Invalid input.')
                continue

            self.update_target_device_shadow(mqttClient, state)

if '__main__' == __name__:
    switch = Switch()
    switch.execute()

 RobotArm_Thing のデバイスコードと違って自身の Device Shadow を扱う必要がないので deviceShadow インスタンスは取得していませんが、それ以外は AWSIoTMQTTClient の取得までは同様の処理になります。

 while ループの中ではユーザからの入力を待ち受け、 on(1) か off(0) の入力があると RobotArm_Thing の Device Shadow の update トピックに desired ステータスを publish します。

    def update_target_device_shadow(self, mqttClient, state):
        update_topic = '$aws/things/%s/shadow/update' % self.TARGET_THING_NAME
        desiredState = { 'state': { 'desired': { 'myState': state } } }
        print('Sending State -------\n%s' % desiredState)
        mqttClient.publish(update_topic, json.dumps(desiredState), 0)

 スクリプトを実行すると、デバッグ用の出力以外では下記のような形で入力を待ち受け、入力があると RobotArm_Thing の Device Shadow が更新されます。

$ python switch.py 
Please enter 1 (turn on) or 0 (turn off) to control the robot arm, q to quit: 1
Sending State -------
{'state': {'desired': {'myState': 'on'}}}

 これで前回実行したロボットアームのシナリオと同様の動作をする処理を Python SDK で実装することができました。

まとめ

 デバイスコードの処理内容は一度わかってしまえば複雑ではないですし、 Python だとよりシンプルにかけて良い感じでしたが、実際はエラーハンドリングの処理を随所に入れていく必要があるかと思いますので、ネットワークの状態など様々な状態を考慮したエラーハンドリングは結構大変になりそうな気がします。また、各デバイスも起動時には自身が所属する Greengass Group の情報を取得するためにはクラウドにアクセスする必要がありますので、完全にローカルのみで動作させるというわけには行かなそうです。 Core デバイスの IP と Port を決め打ちにしてしまえば起動時からローカルのみでも行けそうではありますが、それだと逆にクラウドからは接続情報などを反映させることができなくなるので、微妙なところではあります。

 今回のコードはこちらにも公開しました。

github.com