Source code for salvius.speech
import subprocess
from multiprocessing import Queue

try:
    from queue import Empty, Full
except ImportError:  # Python 2 names the module ``Queue``
    from Queue import Empty, Full

import speech_recognition
from zorg.adaptor import Adaptor
from zorg.driver import Driver
class SpeechRecognition(Adaptor):
    """
    Adaptor that listens to a microphone in the background and places the
    text recognized from each captured phrase on a bounded queue.

    Recognized options:
        recognizer_function: name of the ``speech_recognition.Recognizer``
            method used to turn audio into text. Defaults to
            ``"recognize_sphinx"``.
    """

    def __init__(self, options):
        super(SpeechRecognition, self).__init__(options)

        self.recognizer = speech_recognition.Recognizer()
        self.microphone = speech_recognition.Microphone()

        # Allow different speech recognition methods to be selected
        # See https://pypi.python.org/pypi/SpeechRecognition/
        self.recognizer_function = options.get(
            "recognizer_function", "recognize_sphinx"
        )

        # Bounded so that unread results cannot pile up without limit.
        self.queue = Queue(maxsize=7)

        # Set by `connect` to the function that stops background listening.
        self.stop_listening = None

    def start_listening(self):
        """
        Start the listening process.

        Returns whatever ``Recognizer.listen_in_background`` returns; the
        ``connect`` method stores it as the function that stops listening.
        """
        return self.recognizer.listen_in_background(
            self.microphone,
            self.callback
        )

    def connect(self):
        """
        Run ``jack_control start``, calibrate the recognizer against ambient
        noise and begin listening in the background.
        """
        subprocess.call(["jack_control", "start"])

        # we only need to calibrate once, before we start listening
        with self.microphone as source:
            self.recognizer.adjust_for_ambient_noise(source)

        # Start listening in the background
        self.stop_listening = self.start_listening()
        # `stop_listening` is now a function that stops background listening

    def disconnect(self):
        """
        Stop background listening (if it was started) and run
        ``jack_control stop``.
        """
        if self.stop_listening:
            self.stop_listening()

        subprocess.call(["jack_control", "stop"])

    def callback(self, recognizer, audio):
        """
        Recognize a captured phrase and queue the resulting text.

        This is called from the background listening thread. A spoken
        apology or failure message is queued instead of the text when the
        audio cannot be understood or the recognition service fails.

        :param recognizer: object providing the configured recognizer method.
        :param audio: the captured audio passed to that method.
        """
        recognizer_function = getattr(recognizer, self.recognizer_function)

        # received audio data, now we'll recognize it using speech recognition
        try:
            message = recognizer_function(audio)
        except speech_recognition.UnknownValueError:
            message = "I am sorry, I could not understand that."
        except speech_recognition.RequestError as e:
            m = "My speech recognition service has failed. {0}"
            message = m.format(e)

        # Never block here: a blocking put on the full queue would stall the
        # listening thread until somebody reads a result. When nobody is
        # consuming, the newest result is dropped instead.
        try:
            self.queue.put_nowait(message)
        except Full:
            pass
class ApiDriver(Driver):
    """
    Driver exposing the connection's queue of recognized speech through the
    ``get_words`` command.
    """

    def __init__(self, options, connection):
        super(ApiDriver, self).__init__(options, connection)

        self.commands += ["get_words"]

    def start(self):
        """
        Start the driver by connecting the underlying connection.
        """
        self.connection.connect()

    def stop(self):
        """
        Stop the driver by disconnecting the underlying connection.
        """
        self.connection.disconnect()

    def get_words(self):
        """
        Return the next queued result without waiting.

        :returns: the oldest item on ``self.connection.queue``, or ``None``
            when nothing is available right now.
        """
        # Fetch in a single non-blocking step. Checking `empty()` first and
        # then calling a blocking `get()` can hang if another caller takes
        # the last item between the two calls.
        try:
            return self.connection.queue.get_nowait()
        except Empty:
            return None
class SpeechSynthesis(Driver):
    """
    Driver that speaks the connection's response to a piece of text using
    espeak, and forwards start/stop requests to the connection.
    """

    def __init__(self, options, connection):
        super(SpeechSynthesis, self).__init__(options, connection)

        self.commands += ["reply", "start", "stop"]

    def reply(self, text):
        """
        Speak the connection's response to ``text`` and wait until done.

        :param text: input passed to ``self.connection.respond``.
        :returns: the value returned by ``espeak.synth``. When it is falsy
            the method returns immediately without waiting.
        """
        import time
        from espeak import espeak
        from espeak import core as espeak_core

        # Set to True by the callback once espeak reports the end of the
        # message. A dict is used so the nested function can mutate it.
        state = {"finished": False}

        def on_synth_event(event, pos, length):
            if event == espeak_core.event_MSG_TERMINATED:
                state["finished"] = True

        # Get the robot's response to the input text
        answer = self.connection.respond(text)

        espeak.set_SynthCallback(on_synth_event)
        started = espeak.synth(answer.text)

        if not started:
            return started

        # Poll until the callback signals that the speech has stopped
        while not state["finished"]:
            time.sleep(0.05)

        return started

    def start(self):
        """
        Forward the start request to ``self.connection.start_listening``.
        """
        self.connection.start_listening()

    def stop(self):
        """
        Forward the stop request to ``self.connection.stop_listening``.
        """
        self.connection.stop_listening()