前回は Voice Kit に Web カメラを接続して、音声で写真を撮れるようにしてみましたが、カメラにどんな映像が写っているかは写真を撮ってみないとわからなかったので、今回はカメラからの映像をデスクトップに映しつつ、写真を撮れるようにしてみました。デバイスの接続は前回と同様で、コードのみの変更です。
OpenCV での動画撮影のサンプル
OpenCV での動画の撮影の仕方については下記リンク先のチュートリアルを参考にさせていただきました。
動画を扱う — OpenCV-Python Tutorials 1 documentation
基本的な処理は VideoCapture クラスの read() メソッドで読み込んだ画像を cv2.imshow() メソッドで表示するという処理を無限ループで繰り返しています。
実装する内容
上記チュートリアルの内容を前回のコードをベースに組み込んで行きます。実装する処理としては、スクリプトを起動したら Web カメラの映像をデスクトップに表示開始し、 "OK Google. Take a picture." と発話したらカウントダウンして写真を撮影し、その画像を数秒表示したらまた Web カメラの映像を開始するというものです。
ここで問題になるのは、上記サンプルの通り、映像の表示は無限ループで行われているため、このまま前回のコードの処理の一部に入れてしまうとそこで処理がブロックされ、発話に対する処理がブロックされてしまいます。なので今回は Thread を使い、映像の表示と発話に対する処理をそれぞれ独立した Thread で行うようにしました。 Thread を使用する例は Voice Kit のガイドでも解説されていて、 hotword によるリクエストと、キット上部のボタンによるリクエストの両方を同時に待ち受けるためのサンプルが提供されています。
映像表示用 Thread
まず映像表示用 Thread で実行する処理は下記のようなメソッドとして実装します。基本的な処理はチュートリアルの通りで無限ループで画像の読み込みと表示を繰り返していますが、写真の撮影と処理の終了を判断するためのトリガーとして self.take_photo と self.stop というフラグを使用しています。このフラグは発話に対する処理を行う Thread から変更することを想定していて、 self.take_photo が True になったらその時に読み込んでいる画像をファイルに保存し、5秒間表示をキープした後にフラグを False に戻し、無限ループを続行します。
また、プログラムの終了時には発話処理 Thread が終了してもそのままだと映像表示用 Thread は動き続けたままになるため、 発話処理 Thread の終了処理の中で self.stop を True に変更し、映像表示用 Thread で self.stop が True になっていたら無限ループを抜けて処理を終了するようにしています。
def _show_video(self): cap = cv2.VideoCapture(0) while(True): ret, frame = cap.read() cv2.imshow('frame', frame) if self.take_photo: now = datetime.now() filename = 'capture_%s.jpg' % now.strftime('%Y%m%d_%H%M%S') if cv2.imwrite(filename, frame): self.print_and_say('I took nice one.') print('Captured Image: %s' % filename) time.sleep(5) else: self.print_and_say("I couldn't take a picture.") self.take_photo = False cv2.waitKey(1) if self.stop: break cap.release() cv2.destroyAllWindows() print('Stop showing video.')
そしてコンストラクタで上記メソッドを使用する Thread を生成します。
self._show_video_task = threading.Thread(target = self._show_video)
発話処理用 Thread
発話に対する処理も今回は Thread で行うように変更しています。下記のようなメソッドにして Thread で実行します。
def _run_event(self): with Assistant(self.credentials) as assistant: self._assistant = assistant for event in assistant.start(): self._process_event(event)
発話内容が "Take a picture." だった場合の処理からは画像のキャプチャ用処理は削除し、フラグの変更だけを行なっています。
self.take_photo = True
また、発話内容が "Goodbye." だった場合の終了処理にフラグの変更を追加しています。
self.stop = True
そして映像表示用 Thread と同様に、コンストラクタで Thread を生成します。
self._run_event_task = threading.Thread(target = self._run_event)
Thread の処理開始
生成した Thread の start() メソッドを実行することで、処理が開始されます。今回は下記のように各 Thread の処理を開始しています。
def start(self): self._show_video_task.start() self._run_event_task.start()
コード全体
今回実装したコードの全体は下記の通りになります。前回のコードから単純に LED を操作するだけの部分は削除しました。
#!/usr/bin/env python3 import sys import time import cv2 import threading from datetime import datetime import aiy.assistant.auth_helpers import aiy.audio import aiy.voicehat from google.assistant.library import Assistant from google.assistant.library.event import EventType import RPi.GPIO as GPIO class MyAssistant: GPIO_LED_GREEN = 2 GPIO_LED_YELLOW = 3 def __init__(self): self.credentials = aiy.assistant.auth_helpers.get_assistant_credentials() self.status_ui = aiy.voicehat.get_status_ui() GPIO.setmode(GPIO.BCM) GPIO.setup(MyAssistant.GPIO_LED_GREEN, GPIO.OUT) GPIO.setup(MyAssistant.GPIO_LED_YELLOW, GPIO.OUT) self._run_event_task = threading.Thread(target = self._run_event) self._show_video_task = threading.Thread(target = self._show_video) self.take_photo = False self.stop = False def print_and_say(self, text): print(text) aiy.audio.say(text) def start(self): self._show_video_task.start() self._run_event_task.start() def _show_video(self): cap = cv2.VideoCapture(0) while(True): ret, frame = cap.read() cv2.imshow('frame', frame) if self.take_photo: now = datetime.now() filename = 'capture_%s.jpg' % now.strftime('%Y%m%d_%H%M%S') if cv2.imwrite(filename, frame): self.print_and_say('I took nice one.') print('Captured Image: %s' % filename) time.sleep(5) else: self.print_and_say("I couldn't take a picture.") self.take_photo = False cv2.waitKey(1) if self.stop: break cap.release() cv2.destroyAllWindows() print('Stop showing video.') def _run_event(self): with Assistant(self.credentials) as assistant: self._assistant = assistant for event in assistant.start(): self._process_event(event) def _process_event(self, event): if event.type == EventType.ON_START_FINISHED: self.status_ui.status('ready') aiy.audio.say("OK, I'm ready.") if sys.stdout.isatty(): print('Say "OK, Google" then speak.') elif event.type == EventType.ON_RECOGNIZING_SPEECH_FINISHED and event.args: print('You said: %s' % event.args['text']) text = event.args['text'].lower() if text == 'take a picture': self._assistant.stop_conversation() GPIO.output(MyAssistant.GPIO_LED_GREEN, GPIO.LOW) GPIO.output(MyAssistant.GPIO_LED_YELLOW, GPIO.LOW) aiy.audio.say('OK, 3') GPIO.output(MyAssistant.GPIO_LED_GREEN, GPIO.HIGH) time.sleep(1) aiy.audio.say('2') GPIO.output(MyAssistant.GPIO_LED_GREEN, GPIO.LOW) GPIO.output(MyAssistant.GPIO_LED_YELLOW, GPIO.HIGH) time.sleep(1) aiy.audio.say('1') GPIO.output(MyAssistant.GPIO_LED_GREEN, GPIO.HIGH) time.sleep(1) self.take_photo = True GPIO.output(MyAssistant.GPIO_LED_GREEN, GPIO.LOW) GPIO.output(MyAssistant.GPIO_LED_YELLOW, GPIO.LOW) elif text == 'goodbye': self.status_ui.status('stopping') self._assistant.stop_conversation() self.stop = True aiy.audio.say('Goodbye. See you again.') print('Stop processing event.') sys.exit() elif event.type == EventType.ON_CONVERSATION_TURN_STARTED: self.status_ui.status('listening') elif event.type == EventType.ON_END_OF_UTTERANCE: self.status_ui.status('thinking') elif event.type == EventType.ON_CONVERSATION_TURN_FINISHED: self.status_ui.status('ready') elif event.type == EventType.ON_ASSISTANT_ERROR and event.args and event.args['is_fatal']: sys.exit(1) def main(self): self.status_ui.status('starting') self.start() if __name__ == '__main__': sample = MyAssistant() sample.main()
動作確認
上記スクリプトを実行した様子は下記の動画のようになります。ひとまず想定したように動作しているようです。
まとめ
各スレッド間の連携はフラグを用いただけの簡単な実装ですし色々と適当な実装ですが、ひとまず映像を確認しながら写真を撮ることができるようになりました。発話内容に対する処理の方は別スレッドにせずにメインスレッドのままにして、映像表示の方だけ別スレッドにする形でも良かったかもしれませんがまぁ今回は良しということで。今後はこれにさらに Motion Detection やその他のトリガーの追加も試してみたいと思います。