# Source code for salvius.speech

from zorg.adaptor import Adaptor
from zorg.driver import Driver
from multiprocessing import Queue
import speech_recognition
import subprocess

class SpeechRecognition(Adaptor):
    """
    Adaptor that listens on a microphone in the background and queues
    the text produced by a speech recognizer.

    Each recognition result (or a human-readable failure message when
    recognition fails) is placed on ``self.queue``.
    """

    def __init__(self, options):
        """
        Create the recognizer, the microphone and the result queue.

        :param options: Adaptor options. The optional key
            ``"recognizer_function"`` holds the name of the recognizer
            method used to turn audio into text; it defaults to
            ``"recognize_sphinx"``.
        """
        super(SpeechRecognition, self).__init__(options)

        self.recognizer = speech_recognition.Recognizer()
        self.microphone = speech_recognition.Microphone()

        # Allow different speech recognition methods to be selected.
        # The name stored here is resolved with getattr() on the
        # recognizer each time `callback` runs.
        self.recognizer_function = options.get(
            "recognizer_function", "recognize_sphinx"
        )

        # Bounded queue of recognition results read by consumers.
        self.queue = Queue(maxsize=7)

        # Set by `connect` to the value returned by `start_listening`.
        self.stop_listening = None

    def start_listening(self):
        """
        Start the listening process.

        :returns: Whatever ``listen_in_background`` returns; `connect`
            stores it and `disconnect` calls it to stop listening.
        """
        return self.recognizer.listen_in_background(
            self.microphone,
            self.callback
        )

    def connect(self):
        """
        Start JACK, calibrate for ambient noise and begin listening.
        """
        # NOTE(review): the source this was recovered from had lost the
        # start of this statement; `subprocess.call(` is reconstructed
        # from the remaining `[...])` fragment - confirm.
        subprocess.call(["jack_control", "start"])

        # we only need to calibrate once, before we start listening
        with self.microphone as source:
            self.recognizer.adjust_for_ambient_noise(source)

        # Start listening in the background
        self.stop_listening = self.start_listening()
        # `stop_listening` is now a function that stops background listening

    def disconnect(self):
        """
        Stop background listening (if it was started) and stop JACK.
        """
        if self.stop_listening:
            self.stop_listening()

        # NOTE(review): reconstructed call, see the note in `connect`.
        subprocess.call(["jack_control", "stop"])

    def callback(self, recognizer, audio):
        """
        Recognize a chunk of audio and queue the outcome.

        :param recognizer: Object providing the method named by
            ``self.recognizer_function``.
        :param audio: Audio data passed to that recognizer method.
        """
        recognizer_function = getattr(recognizer, self.recognizer_function)

        # this is called from the background thread

        # received audio data, now we'll recognize it using speech recognition
        try:
            result = recognizer_function(audio)
            self.queue.put(result)
        except speech_recognition.UnknownValueError:
            self.queue.put("I am sorry, I could not understand that.")
        except speech_recognition.RequestError as e:
            m = "My speech recognition service has failed. {0}"
            self.queue.put(m.format(e))
class ApiDriver(Driver):
    """
    Driver exposing the words queued by its connection through the
    ``get_words`` command.
    """

    def __init__(self, options, connection):
        """
        Initialise the driver and register the ``get_words`` command.

        :param options: Passed through to the base driver.
        :param connection: Passed through to the base driver; its
            ``connect``, ``disconnect`` and ``queue`` members are used
            by the methods below.
        """
        super(ApiDriver, self).__init__(options, connection)

        self.commands += ["get_words"]

    def start(self):
        """Open the underlying connection."""
        self.connection.connect()

    def stop(self):
        """Close the underlying connection."""
        self.connection.disconnect()

    def get_words(self):
        """
        Fetch the next queued item from the connection.

        :returns: ``None`` when the connection's queue reports that it
            is empty, otherwise the next item taken from the queue.
        """
        pending = self.connection.queue
        return None if pending.empty() else pending.get()
class SpeechSynthesis(Driver):
    """
    Driver that speaks the connection's response to a piece of text
    using espeak, and forwards start/stop to the connection.
    """

    def __init__(self, options, connection):
        """
        Initialise the driver and register its commands.

        :param options: Passed through to the base driver.
        :param connection: Passed through to the base driver; its
            ``respond``, ``start_listening`` and ``stop_listening``
            members are used by the methods below.
        """
        super(SpeechSynthesis, self).__init__(options, connection)

        self.commands += ["reply", "start", "stop"]

    def reply(self, text):
        """
        Speak the connection's response to ``text`` and wait until done.

        :param text: Input text handed to ``self.connection.respond``.
        :returns: The value returned by ``espeak.synth``.
        """
        # espeak is imported here rather than at module level, so it is
        # only required when a reply is actually spoken.
        import time
        from espeak import espeak
        from espeak import core as espeak_core

        # Get the robot's response to the input text
        response = self.connection.respond(text)

        # Mutable holder so the nested callback can signal completion.
        state = {"finished": False}

        def synth_callback(event, pos, length):
            if event == espeak_core.event_MSG_TERMINATED:
                state["finished"] = True

        espeak.set_SynthCallback(synth_callback)
        call_result = espeak.synth(response.text)

        # Wait for the speech to stop; skipped entirely when the synth
        # call returned a falsy result.
        if call_result:
            while not state["finished"]:
                time.sleep(0.05)

        return call_result

    def start(self):
        """Ask the connection to start listening."""
        self.connection.start_listening()

    def stop(self):
        """Ask the connection to stop listening."""
        self.connection.stop_listening()