"""Smart timer for Raspberry Pi.

Press the push button (BCM pin 17) to capture speech from the microphone,
send it to the Azure speech-to-text REST API, forward the recognized text to
a locally hosted text-to-timer function, start the requested timer, and
announce timer events through the speaker via Azure text-to-speech.
"""
import io
import json
import threading
import time
import wave

import pyaudio
import requests
import RPi.GPIO as GPIO

# Azure Cognitive Services Speech resource settings.
# Replace the placeholders with your API key, resource location
# (e.g. 'westus2') and recognition language (e.g. 'en-US') before running.
speech_api_key = '<key>'
location = '<location>'
language = '<language>'

# Use Broadcom (BCM) pin numbering for the GPIO header.
GPIO.setmode(GPIO.BCM)

# BCM pin the push button is wired to (reads 1/high when pressed).
pin_number = 17

# Configure the button pin as an input.
GPIO.setup(pin_number, GPIO.IN)

# Single PyAudio instance shared by every capture/playback stream below.
audio = pyaudio.PyAudio()

# ALSA card indexes for the microphone and speaker — adjust for your hardware.
microphone_card_number = 3
speaker_card_number = 3

rate = 48000  # sample rate in Hz (48 kHz)


def get_access_token():
    """Exchange the subscription key for a short-lived bearer token used by
    the speech-to-text and text-to-speech endpoints."""
    headers = {
        'Ocp-Apim-Subscription-Key': speech_api_key
    }

    token_endpoint = f'https://{location}.api.cognitive.microsoft.com/sts/v1.0/issuetoken'
    response = requests.post(token_endpoint, headers=headers)
    return str(response.text)


def convert_speech_to_text(buffer):
    """Send a WAV buffer to the Azure speech-to-text REST API.

    Returns the recognized text, or '' when recognition failed or the
    response payload was not the expected shape.
    """
    url = f'https://{location}.stt.speech.microsoft.com/speech/recognition/conversation/cognitiveservices/v1'

    headers = {
        'Authorization': 'Bearer ' + get_access_token(),
        'Content-Type': f'audio/wav; codecs=audio/pcm; samplerate={rate}',
        'Accept': 'application/json;text/xml'
    }

    params = {
        'language': language
    }

    response = requests.post(url, headers=headers, params=params, data=buffer)
    response_json = response.json()

    # Use .get so an error payload yields '' instead of a KeyError, and
    # always return a string (the original could fall through to None).
    if response_json.get('RecognitionStatus') == 'Success':
        return response_json.get('DisplayText', '')
    return ''


def capture_audio(value):
    """Record from the microphone while ``value`` is 0, stopping as soon as
    the button pin goes high. Returns an in-memory WAV buffer.

    :param value: button level read by the caller (0 = start recording).
    """
    stream = audio.open(format=pyaudio.paInt16,
                        rate=rate,
                        channels=2,
                        input_device_index=microphone_card_number,
                        input=True,
                        frames_per_buffer=4096)

    frames = []

    while value == 0:
        frames.append(stream.read(4096))
        # Stop recording as soon as the button is pressed (pin goes high).
        if GPIO.input(pin_number) == 1:
            break

    stream.stop_stream()
    stream.close()

    wav_buffer = io.BytesIO()
    with wave.open(wav_buffer, 'wb') as wavefile:
        # The stream above captures 2 interleaved channels, so label the WAV
        # accordingly (the original wrote 1, mislabelling the PCM data).
        wavefile.setnchannels(2)
        wavefile.setsampwidth(audio.get_sample_size(pyaudio.paInt16))
        wavefile.setframerate(rate)
        wavefile.writeframes(b''.join(frames))
    wav_buffer.seek(0)
    return wav_buffer


def play_audio(buffer):
    """Play a WAV buffer through the speaker device."""
    stream = audio.open(format=pyaudio.paInt16,
                        rate=rate,
                        channels=2,
                        output_device_index=speaker_card_number,
                        output=True)

    with wave.open(buffer, 'rb') as wf:
        data = wf.readframes(4096)
        while len(data) > 0:
            stream.write(data)
            data = wf.readframes(4096)

    # Stop before closing, matching play_speech (the original leaked the
    # running stream by closing without stopping).
    stream.stop_stream()
    stream.close()


def get_timer_time(text):
    """Call the text-to-timer function app with the recognized text.

    Returns the requested timer length in seconds, or 0 on any non-200
    response.
    """
    # Address of the locally hosted text-to-timer function app — change this
    # to match the machine it runs on.
    url = 'http://192.168.6.109:7071/api/text-to-timer'
    body = {
        'text': text
    }

    response = requests.post(url, json=body)
    if response.status_code != 200:
        return 0

    payload = response.json()
    return payload['seconds']


def get_voice():
    """Return the short name of the first neural voice matching the
    configured language."""
    url = f'https://{location}.tts.speech.microsoft.com/cognitiveservices/voices/list'

    headers = {
        'Authorization': 'Bearer ' + get_access_token()
    }

    response = requests.get(url, headers=headers)
    voices_json = json.loads(response.text)

    first_voice = next(x for x in voices_json
                       if x['Locale'].lower() == language.lower()
                       and x['VoiceType'] == 'Neural')
    return first_voice['ShortName']


voice = get_voice()
print(f'Using voice {voice}')

# Request RIFF (WAV) output so the TTS response can be fed straight to wave.open.
playback_format = 'riff-48khz-16bit-mono-pcm'


def get_speech(text):
    """Convert text to speech with the Azure TTS REST API and return the
    audio as an in-memory WAV buffer."""
    url = f'https://{location}.tts.speech.microsoft.com/cognitiveservices/v1'

    headers = {
        'Authorization': 'Bearer ' + get_access_token(),
        'Content-Type': 'application/ssml+xml',
        'X-Microsoft-OutputFormat': playback_format
    }

    # Build the SSML request body. The markup was lost in the original file
    # (the f-strings were empty); reconstructed per the Azure TTS SSML schema.
    ssml = f'<speak version=\'1.0\' xml:lang=\'{language}\'>'
    ssml += f'<voice xml:lang=\'{language}\' name=\'{voice}\'>'
    ssml += text
    ssml += '</voice>'
    ssml += '</speak>'

    response = requests.post(url, headers=headers, data=ssml.encode('utf-8'))
    return io.BytesIO(response.content)


def play_speech(speech):
    """Play a WAV speech buffer through the speaker device."""
    with wave.open(speech, 'rb') as wave_file:
        stream = audio.open(format=audio.get_format_from_width(wave_file.getsampwidth()),
                            # NOTE(review): the original forces 2 channels
                            # instead of wave_file.getnchannels() — presumably
                            # a workaround for the playback hardware; confirm.
                            channels=2,
                            rate=wave_file.getframerate(),
                            output_device_index=speaker_card_number,
                            output=True)

        data = wave_file.readframes(4096)
        while len(data) > 0:
            stream.write(data)
            data = wave_file.readframes(4096)

        stream.stop_stream()
        stream.close()


def say(text):
    """Speak the given text (also echoed to the console)."""
    print(text)
    speech = get_speech(text)
    play_speech(speech)


def announce_timer(minutes, seconds):
    """Announce that a timer of the given length has finished."""
    announcement = 'Times up on your '
    if minutes > 0:
        announcement += f'{minutes} minute '
    if seconds > 0:
        announcement += f'{seconds} second '
    announcement += 'timer.'
    say(announcement)


def create_timer(total_seconds):
    """Start a background timer for ``total_seconds`` and announce it."""
    minutes, seconds = divmod(total_seconds, 60)
    # announce_timer fires on a worker thread when the timer elapses.
    threading.Timer(total_seconds, announce_timer, args=[minutes, seconds]).start()

    announcement = ''
    if minutes > 0:
        announcement += f'{minutes} minute '
    if seconds > 0:
        announcement += f'{seconds} second '
    announcement += 'timer started.'
    say(announcement)


def process_text(text):
    """Turn recognized speech into a timer request."""
    print(text)
    # Use the recognized text — the original passed a hard-coded debug
    # phrase here, ignoring what the user actually said.
    seconds = get_timer_time(text)
    if seconds > 0:
        create_timer(seconds)


try:
    while True:
        # Read the button level (0 = low / released, 1 = high / pressed).
        value = GPIO.input(pin_number)
        print(f"GPIO {pin_number} 的电平值为: {value}")

        # While the button is held, pause briefly before sampling again.
        while value == 1:
            time.sleep(.1)
            break

        buffer = capture_audio(value)
        text = convert_speech_to_text(buffer)
        process_text(text)
except KeyboardInterrupt:
    pass
finally:
    # Release the audio device and GPIO pins on exit.
    audio.terminate()
    GPIO.cleanup()