M5Stack で MicroPython のスレッドを使う

 前回は M5Stack でテキストを簡易的にスクロール表示させる処理を実装してみましたが、画面の下端にテキストをスクロール表示させつつ、残りの部分に何かを表示するにはスレッドを使った処理が必要かと思ったので、今回は M5Stack の _thread モジュールを使った処理を実装してみました。

 _thread モジュールについては下記サイトを参考にさせていただきました。

qiita.com

 また、 M5Stack の github リポジトリにもサンプルが公開されていました。

github.com

サンプル実装

 まずは _thread モジュールがちゃんと使えることを確認するために、ごく簡単なサンプルを実装してみます。下記のコードではテキストを表示する2つのスレッドを生成し、違う間隔でテキストの表示を行います。 _thread.start_new_thread() でメソッドを指定してスレッドを生成しています。

from m5stack import lcd

import time
import _thread

lcd.clear()
lcd.setCursor(0, 0)
lcd.setColor(lcd.WHITE)

def hello():
    while True:
        time.sleep(3)
        lcd.println("Hello World! from: {}".format(_thread.getSelfName()))

def goodby():
    while True:
        time.sleep(5)
        lcd.println("Goodby! from: {}".format(_thread.getSelfName()))
    
_thread.start_new_thread('hello', hello, ())
_thread.start_new_thread('goodby', goodby, ())

 これを実行すると下記の動画のようになります。とりあえず各スレッドでの処理が行われていることが確認できます。

天気情報+アバター表示

 それでは次は前回の天気情報のスクロール表示にアバター表示を組み合わせて、天気情報をしゃべっているような表示を実装してみたいと思います。M5Stack でのアバター表示は @meganetaaan さんが m5stack-avatar を公開されていますので、 Arduino 環境であればこちらを使うのが良いかと思います。

github.com

 検索してみた限りではまだ MicroPython 版のアバター表示ライブラリは内容でしたので、自前で簡易に表示させてみたいと思います。画面イメージは下記画像の通りです。

f:id:akanuma-hiroaki:20181013223503j:plain

 まずはコード全体を掲載しておきます。

from m5stack import lcd

import random
import time
import ujson
import urequests
import _thread

class Weather:
    def __init__(self):
        self.base_url    = 'http://api.openweathermap.org/data/2.5/weather?q={},jp&appid={}'
        self.api_key     = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'
        self.prefectures = ['Tokyo', 'Saitama', 'Nagoya']

    def get_weather(self, prefecture):
        response      = urequests.get(self.base_url.format(prefecture, self.api_key))
        json          = response.json()
        main          = json['main']
        self.temp     = main['temp']
        self.pressure = main['pressure']
        self.humidity = main['humidity']
        self.temp_min = main['temp_min']
        self.temp_max = main['temp_max']

    def text(self, prefecture):
        return "[{}] temp: {} pressure: {} humidity: {} temp_min: {} temp_max: {}".format(
            prefecture, self.temp, self.pressure, self.humidity, self.temp_min, self.temp_max
        )

class Face:
    def __init__(self, ww, wh, fw, fh):
        self.ww                 = ww
        self.wh                 = wh
        self.fw                 = fw
        self.fh                 = fh
        self.eye_x              = 90
        self.eye_y              = 80
        self.eye_r              = 10
        self.eye_close_x        = 70
        self.eye_close_width    = 40
        self.eye_close_height   = 5
        self.blink_term_ms      = 500
        self.mouth_x            = 135
        self.mouth_y            = 150
        self.mouth_width        = 50
        self.mouth_height       = 5
        self.mouth_close        = True
        self.mouth_close_height = 20
        
        self.spaces = ' '
        while lcd.textWidth(self.spaces) < self.ww:
            self.spaces += ' '

    def blink(self):
        while True:
            self.eye_close()
            time.sleep_ms(self.blink_term_ms)
            self.eye_open()
            time.sleep(random.randint(2, 6))

    def eye_close(self):
        lcd.circle(self.eye_x, self.eye_y, self.eye_r, lcd.BLACK, lcd.BLACK)
        lcd.circle(self.ww - self.eye_x, self.eye_y, self.eye_r, lcd.BLACK, lcd.BLACK)
        lcd.rect(self.eye_close_x, self.eye_y, self.eye_close_width, self.eye_close_height, lcd.WHITE, lcd.WHITE)
        lcd.rect(
            self.ww - self.eye_close_x - self.eye_close_width,
            self.eye_y, self.eye_close_width,
            self.eye_close_height,
            lcd.WHITE,
            lcd.WHITE
        )

    def eye_open(self):
        lcd.rect(self.eye_close_x, self.eye_y, self.eye_close_width, self.eye_close_height, lcd.BLACK, lcd.BLACK)
        lcd.rect(
            self.ww - self.eye_close_x - self.eye_close_width,
            self.eye_y,
            self.eye_close_width,
            self.eye_close_height,
            lcd.BLACK,
            lcd.BLACK
        )
        lcd.circle(self.eye_x, self.eye_y, self.eye_r, lcd.WHITE, lcd.WHITE)
        lcd.circle(self.ww - self.eye_x, self.eye_y, self.eye_r, lcd.WHITE, lcd.WHITE)
        
    def lipsync(self):
        if self.mouth_close:
            self.lip_open()
        else:
            self.lip_close()

    def lip_close(self):
        lcd.rect(
            self.mouth_x,
            self.mouth_y - (self.mouth_close_height // 2),
            self.mouth_width,
            self.mouth_height + self.mouth_close_height, 
            lcd.BLACK,
            lcd.BLACK
        )
        lcd.rect(self.mouth_x, self.mouth_y, self.mouth_width, self.mouth_height, lcd.WHITE, lcd.WHITE)
        self.mouth_close = True

    def lip_open(self):
        lcd.rect(self.mouth_x, self.mouth_y, self.mouth_width, self.mouth_height, lcd.BLACK, lcd.BLACK)
        lcd.rect(
            self.mouth_x,
            self.mouth_y - (self.mouth_close_height // 2),
            self.mouth_width,
            self.mouth_height + self.mouth_close_height,
            lcd.WHITE,
            lcd.WHITE
        )
        self.mouth_close = False

    def speak(self, text):
        lcd.textClear(0, (self.wh - self.fh) - 1, self.spaces)
        lcd.print(text, 0, (self.wh - self.fh) - 1)
        time.sleep_ms(3000)
        while lcd.textWidth(text) > 0:
            text = text[1:]
            lcd.textClear(0, (self.wh - self.fh) - 1, self.spaces)
            lcd.print(text, 0, (self.wh - self.fh) - 1)
            self.lipsync()
            time.sleep_ms(200)
        self.lip_close()

    def mouth(self):
        lcd.rect(self.mouth_x, self.mouth_y, self.mouth_width, self.mouth_height, lcd.WHITE, lcd.WHITE)
        while True:
            typ, sender, msg = _thread.getmsg()
            if msg:
                self.speak(msg)
            time.sleep_ms(200)

    def display(self):
        _thread.start_new_thread('eye', self.blink, ())
        self.mouth_thread_id = _thread.start_new_thread('mouth', self.mouth, ())
        while True:
            typ, sender, msg = _thread.getmsg()
            if msg:
                _thread.sendmsg(self.mouth_thread_id, msg)
            time.sleep_ms(200)

def display_weather(face_thread_id):
    weather = Weather()
    while True:
        for prefecture in weather.prefectures:
            weather.get_weather(prefecture)
            text = weather.text(prefecture)
            _thread.sendmsg(face_thread_id, text)
            time.sleep(30)

lcd.setCursor(0, 0)
lcd.setColor(lcd.WHITE)
lcd.font(lcd.FONT_DejaVu24)
lcd.clear()

fw, fh = lcd.fontSize()
ww, wh = lcd.winsize()

face = Face(ww, wh, fw, fh)

face_thread_id = _thread.start_new_thread('face', face.display, ())
_thread.start_new_thread('weather', display_weather, (face_thread_id,))

 Weather クラスの内容は前回と同様です。

 Face クラスでは顔の表示と瞬き表示と、口パクとともにテキストをスクロール表示する処理を行います。

 処理の流れとしては、 Face クラスのインスタンスを生成し、その display() メソッドを実行するスレッドを開始します。 display() メソッドではさらに口の動きと独立して瞬き表示を行うためのスレッドと、口の処理を行うスレッドを開始します。

 続いて天気情報を表示するためのスレッドを開始し、メソッドの引数には上記の display() メソッドを実行しているスレッドのIDを渡します。天気情報を取得してテキストを生成したら、そのスレッドIDを指定してテキストを _thread.sendmsg() メソッドでメッセージとして送信します。

 display() メソッドではメッセージを受け取ったらそのメッセージをさらに口の動きを処理しているスレッドに通知し、受け取った側の mouth() メソッドでテキストの表示と口パクの処理を行なっています。口パクの表示ではひとまず今回は1文字スクロールする度に口の開閉を切り替えるようにしてみました。

 目や口の開閉の切り替えは、調べた感じでは描画したオブジェクトを消すという処理はなさそうだったので、背景色と同じ色で描画し直すことで見えなくしてから切り替え後の状態を描画するようにしています。

 図形の表示については下記に API の説明が記載されています。

github.com

動作確認

 上記のコードを実行すると下記の動画のようになります。

 テキストの内容が天気情報なので微妙なところはありますが、それでも口パクがつくだけでアバターが喋ってるような感じに見えますね。 

まとめ

 スレッド処理はそんなに詳しいわけではないので、細かいことはあまり考慮していなかったり、まだまだ実装も適当ですが、とりあえず動かすことができました。ただ、うまく使っていかないと実装がどんどん複雑になっていってしまいそうですし、デバッグ等も難しくなりそうなので、極力シンプルに最低限で使うようにすべきかなと思います。アバター表示はそれっぽいのもができたので、もっと実用的な機能や面白い機能も追加していってアシスタントっぽくできると面白そうかなと考えています。