#
# This file is part of Dragonfly.
# (c) Copyright 2007, 2008 by Christo Butcher
# Licensed under the LGPL.
#
# Dragonfly is free software: you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Dragonfly is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with Dragonfly. If not, see
# <http://www.gnu.org/licenses/>.
#
"""
SAPI 5 engine classes
============================================================================
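
These engine classes are normally obtained through Dragonfly's
``get_engine()`` function rather than instantiated directly.  A minimal
sketch, assuming a Windows system with SAPI 5 and the pywin32 package
available::

    from dragonfly import get_engine

    # "sapi5inproc" selects the in-process recognizer; "sapi5" and
    # "sapi5shared" select the shared desktop recognizer.
    engine = get_engine("sapi5inproc")
    engine.connect()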
"""
#---------------------------------------------------------------------------
import logging
import time
import os.path
import pythoncom
from datetime import datetime
from ctypes import c_int, pointer, windll, WinError, WINFUNCTYPE
from ctypes.wintypes import MSG, POINT, DWORD, HANDLE, HWND, LONG
import win32con
from six import string_types, integer_types
from win32com.client import Dispatch, getevents, constants
from win32com.client.gencache import EnsureDispatch
from dragonfly.grammar.recobs import RecognitionObserver
from dragonfly.windows.window import Window
from dragonfly.engines.base import (EngineBase, EngineError,
MimicFailure, DelegateTimerManager,
DelegateTimerManagerInterface,
GrammarWrapperBase)
from dragonfly.engines.backend_sapi5.speaker import Sapi5Speaker
from dragonfly.engines.backend_sapi5.compiler import Sapi5Compiler
from dragonfly.engines.backend_sapi5.recobs import Sapi5RecObsManager
#===========================================================================
class MimicObserver(RecognitionObserver):
_log = logging.getLogger("SAPI5 RecObs")
def __init__(self):
RecognitionObserver.__init__(self)
self.status = "none"
def on_recognition(self, words):
self._log.debug("SAPI5 RecObs on_recognition(): %r" % (words,))
self.status = "recognition: %r" % (words,)
def on_failure(self):
self._log.debug("SAPI5 RecObs on_failure()")
self.status = "failure"
#===========================================================================
class Sapi5SharedEngine(EngineBase, DelegateTimerManagerInterface):
""" Speech recognition engine back-end for SAPI 5 shared recognizer. """
_name = "sapi5shared"
recognizer_dispatch_name = "SAPI.SpSharedRecognizer"
#-----------------------------------------------------------------------
def __init__(self, retain_dir=None):
"""
:param retain_dir: Retains recognized audio and/or metadata in the
          given directory, saving audio to a ``retain_[timestamp].wav``
          file and metadata to ``retain.tsv``.
Disabled by default (``None``).
:type retain_dir: str|None
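
        A sketch of enabling audio retention when the engine is created
        through ``get_engine()``; the path below is only an example and
        should point to an existing directory::

            from dragonfly import get_engine
            engine = get_engine("sapi5inproc",
                                retain_dir=r"C:\sapi5_retained_audio")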
"""
EngineBase.__init__(self)
DelegateTimerManagerInterface.__init__(self)
EnsureDispatch(self.recognizer_dispatch_name)
EnsureDispatch("SAPI.SpVoice")
self._recognizer = None
self._compiler = None
self._speaker = None
self._recognition_observer_manager = Sapi5RecObsManager(self)
self._timer_manager = DelegateTimerManager(0.02, self)
if isinstance(retain_dir, string_types) or retain_dir is None:
self._retain_dir = retain_dir
else:
self._retain_dir = None
self._log.error("Invalid retain_dir: %r" % retain_dir)
def connect(self):
""" Connect to back-end SR engine. """
self._recognizer = Dispatch(self.recognizer_dispatch_name)
self._speaker = Sapi5Speaker()
self._compiler = Sapi5Compiler()
def disconnect(self):
""" Disconnect from back-end SR engine. """
self._recognizer = None
self._speaker = None
self._compiler = None
#-----------------------------------------------------------------------
# Methods for working with grammars.
def _load_grammar(self, grammar):
""" Load the given *grammar*. """
self._log.debug("Loading grammar %s." % grammar.name)
if not self._recognizer:
self.connect()
# Create recognition context, compile grammar, and create
# the grammar wrapper object for managing this grammar.
context = self._recognizer.CreateRecoContext()
# TODO Once audio retention is made modular, this block will need
# to be exposed as an engine option "retain_audio". Otherwise,
# as I understand it, audio retention won't work.
if self._retain_dir:
context.RetainedAudio = constants.SRAORetainAudio
handle = self._compiler.compile_grammar(grammar, context)
wrapper = GrammarWrapper(grammar, handle, context, self)
handle.State = constants.SGSEnabled
for rule in grammar.rules:
handle.CmdSetRuleState(rule.name, constants.SGDSActive)
# self.activate_grammar(grammar)
# for l in grammar.lists:
# l._update()
handle.CmdSetRuleState("_FakeRule", constants.SGDSActive)
return wrapper
def _unload_grammar(self, grammar, wrapper):
""" Unload the given *grammar*. """
try:
wrapper.handle.State = constants.SGSDisabled
except Exception as e:
self._log.exception("Failed to unload grammar %s: %s."
% (grammar, e))
def activate_grammar(self, grammar):
""" Activate the given *grammar*. """
self._log.debug("Activating grammar %s." % grammar.name)
grammar_handle = self._get_grammar_wrapper(grammar).handle
grammar_handle.State = constants.SGSEnabled
def deactivate_grammar(self, grammar):
""" Deactivate the given *grammar*. """
self._log.debug("Deactivating grammar %s." % grammar.name)
grammar_handle = self._get_grammar_wrapper(grammar).handle
grammar_handle.State = constants.SGSDisabled
def activate_rule(self, rule, grammar):
""" Activate the given *rule*. """
self._log.debug("Activating rule %s in grammar %s."
% (rule.name, grammar.name))
grammar_handle = self._get_grammar_wrapper(grammar).handle
grammar_handle.CmdSetRuleState(rule.name, constants.SGDSActive)
def deactivate_rule(self, rule, grammar):
""" Deactivate the given *rule*. """
self._log.debug("Deactivating rule %s in grammar %s."
% (rule.name, grammar.name))
grammar_handle = self._get_grammar_wrapper(grammar).handle
grammar_handle.CmdSetRuleState(rule.name, constants.SGDSInactive)
def update_list(self, lst, grammar):
grammar_handle = self._get_grammar_wrapper(grammar).handle
list_rule_name = "__list_%s" % lst.name
rule_handle = grammar_handle.Rules.FindRule(list_rule_name)
rule_handle.Clear()
src_state = rule_handle.InitialState
dst_state = None
for item in lst.get_list_items():
src_state.AddWordTransition(dst_state, item)
grammar_handle.Rules.Commit()
def set_exclusiveness(self, grammar, exclusive):
self._log.debug("Setting exclusiveness of grammar %s to %s."
% (grammar.name, exclusive))
wrapper = self._get_grammar_wrapper(grammar)
if exclusive and wrapper.handle.State != constants.SGSExclusive:
wrapper.state_before_exclusive = wrapper.handle.State
wrapper.handle.State = constants.SGSExclusive
elif not exclusive and wrapper.handle.State == constants.SGSExclusive:
assert wrapper.state_before_exclusive in (constants.SGSEnabled,
constants.SGSDisabled)
wrapper.handle.State = wrapper.state_before_exclusive
# grammar_handle.SetGrammarState(constants.SPGS_EXCLUSIVE)
#-----------------------------------------------------------------------
# Miscellaneous methods.
def mimic(self, words):
"""
Mimic a recognition of the given *words*.
.. note:: This method has a few quirks to be aware of:
#. Mimic can fail to recognize a command if the relevant grammar
is not yet active.
#. Mimic does not work reliably with the shared recognizer unless
there are one or more exclusive grammars active.
#. Mimic can **crash the process** in some circumstances, e.g.
when mimicking non-ASCII characters.
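
        A short sketch of mimicking a command phrase, assuming a grammar
        matching these words is loaded and active::

            engine.mimic(["hello", "world"])
            # A single string is used as the phrase directly:
            engine.mimic("hello world")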
"""
self._log.debug("SAPI5 mimic: %r" % (words,))
if isinstance(words, string_types):
phrase = words
else:
phrase = " ".join(words)
# Fail on empty input.
if not phrase:
raise MimicFailure("Invalid mimic input %r" % phrase)
# Register a recognition observer for checking the success of this
# mimic.
observer = MimicObserver()
observer.register()
# Emulate recognition of the phrase and wait for recognition to
# finish, timing out after 2 seconds.
self._recognizer.EmulateRecognition(phrase)
timeout = 2
NULL = c_int(win32con.NULL)
        if timeout is not None:
begin_time = time.time()
windll.user32.SetTimer(NULL, NULL, int(timeout * 1000), NULL)
message = MSG()
message_pointer = pointer(message)
while (not timeout) or (time.time() - begin_time < timeout):
if timeout:
self._log.debug("SAPI5 message loop: %s sec left"
% (timeout + begin_time - time.time()))
else:
self._log.debug("SAPI5 message loop: no timeout")
if windll.user32.GetMessageW(message_pointer, NULL, 0, 0) == 0:
msg = str(WinError())
self._log.error("GetMessageW() failed: %s" % msg)
raise EngineError("GetMessageW() failed: %s" % msg)
self._log.debug("SAPI5 message: %r" % (message.message,))
if message.message == win32con.WM_TIMER:
# A timer message means this loop has timed out.
self._log.debug("SAPI5 message loop timed out: %s sec left"
% (timeout + begin_time - time.time()))
break
else:
# Process other messages as normal.
self._log.debug("SAPI5 message translating and dispatching.")
windll.user32.TranslateMessage(message_pointer)
windll.user32.DispatchMessageW(message_pointer)
if observer.status.startswith("recognition:"):
# The previous message was a recognition which matched.
self._log.debug("SAPI5 message caused recognition.")
# Unregister the observer and check its status.
observer.unregister()
if observer.status == "failure":
raise MimicFailure("Mimic failed.")
elif observer.status == "none":
raise MimicFailure("Mimic failed, nothing happened.")
def speak(self, text):
""" Speak the given *text* using text-to-speech. """
self._speaker.speak(text)
def _get_language(self):
if not self._recognizer:
return "en"
# Get Windows language identifiers for supported languages from the
# recognizer's current status information.
languages = self._recognizer.Status.SupportedLanguages
# Lookup and return the language tag for the first supported
# language ID.
if languages:
return self._get_language_tag(languages[0])
else:
return "en"
def _do_recognition(self):
"""
Recognize speech in a loop.
This will also call any scheduled timer functions and ensure
that the correct window context is used.
"""
# Register for window change events to activate/deactivate grammars
# and rules on window changes, including window title changes. This
        # is done here because, by the time the SAPI5 'OnPhraseStart'
        # grammar callback fires, it is too late for grammar state changes
        # to affect the current utterance.
WinEventProcType = WINFUNCTYPE(None, HANDLE, DWORD, HWND, LONG,
LONG, DWORD, DWORD)
self._last_foreground_window = None
self._last_foreground_window_title = None
def callback(hWinEventHook, event, hwnd, idObject, idChild,
dwEventThread, dwmsEventTime):
window = Window.get_foreground()
            # Note: hwnd does not always match window.handle, even when the
            # foreground window has changed (and sometimes it has not).
window_changed = (
window != self._last_foreground_window or
window == self._last_foreground_window and
window.title != self._last_foreground_window_title
)
if window_changed:
self.process_grammars_context(window)
self._last_foreground_window = window
self._last_foreground_window_title = window.title
def set_hook(win_event_proc, event_type):
return windll.user32.SetWinEventHook(
event_type, event_type, 0, win_event_proc, 0, 0,
win32con.WINEVENT_OUTOFCONTEXT)
win_event_proc = WinEventProcType(callback)
windll.user32.SetWinEventHook.restype = HANDLE
events = {win32con.EVENT_SYSTEM_FOREGROUND,
win32con.EVENT_OBJECT_NAMECHANGE}
hook_ids = [set_hook(win_event_proc, event) for event in events]
# Recognize speech, call timer functions and handle window change
# events in a loop. Stop on disconnect().
self.speak('beginning loop!')
try:
while self._recognizer is not None:
pythoncom.PumpWaitingMessages()
self.call_timer_callback()
time.sleep(0.005)
finally:
# Unregister event hooks.
for hook_id in hook_ids:
windll.user32.UnhookWinEvent(hook_id)
#---------------------------------------------------------------------------
# Make the shared engine available as Sapi5Engine, for backwards
# compatibility.
Sapi5Engine = Sapi5SharedEngine
#===========================================================================
class Sapi5InProcEngine(Sapi5SharedEngine):
"""
    Speech recognition engine back-end for the SAPI 5 in-process
    recognizer.
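
    A minimal sketch of using this engine, assuming one or more grammars
    will be loaded before speech is processed::

        from dragonfly import get_engine
        engine = get_engine("sapi5inproc")
        engine.connect()
        # Load grammars here, then block and process speech until
        # disconnect() is called.
        engine.do_recognition()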
"""
_name = "sapi5inproc"
recognizer_dispatch_name = "SAPI.SpInProcRecognizer"
def connect(self, audio_source=0):
"""
Connect to the speech recognition backend.
The audio source to use for speech recognition can be
specified using the *audio_source* argument. If it is not
given, it defaults to the first audio source found.
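
        For example, to connect using a particular audio source rather
        than the first one found (the description substring below is
        only an example)::

            engine.connect(audio_source="Microphone")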
"""
Sapi5SharedEngine.connect(self)
self.select_audio_source(audio_source)
def get_audio_sources(self):
"""
Get the available audio sources.
This method returns a list of audio sources, each represented
by a 3-element tuple: the index, the description, and the COM
handle for the audio source.
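
        A sketch of listing the available sources after connecting::

            for index, description, _handle in engine.get_audio_sources():
                print(index, description)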
"""
available_sources = self._recognizer.GetAudioInputs()
audio_sources_list = []
for index, item in enumerate(collection_iter(available_sources)):
audio_sources_list.append((index, item.GetDescription(), item))
return audio_sources_list
def select_audio_source(self, audio_source):
"""
Configure the speech recognition engine to use the given
audio source.
The audio source may be specified as follows:
- As an *int* specifying the index of the audio source to use
- As a *str* containing the description of the audio source
to use, or a substring thereof
The :meth:`get_audio_sources()` method can be used to
retrieve the available sources together with their indices
and descriptions.
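
        For example (the description substring below is only an
        example)::

            engine.select_audio_source(0)             # by index
            engine.select_audio_source("Microphone")  # by description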
"""
available_sources = self._recognizer.GetAudioInputs()
if isinstance(audio_source, integer_types):
# Parameter is the index of the source to use.
if 0 <= audio_source < available_sources.Count:
selected_source = available_sources.Item(audio_source)
else:
raise EngineError("Invalid audio source index: %r"
" (%s sources available, so index must be"
" in range 0 to %s)"
% (audio_source, available_sources.Count,
available_sources.Count - 1))
elif isinstance(audio_source, string_types):
for item in collection_iter(available_sources):
if audio_source in item.GetDescription():
selected_source = item
break
else:
raise EngineError("Audio source not found: %r"
% (audio_source))
else:
raise EngineError("Invalid audio source qualifier: %r"
% (audio_source))
self._log.info("Selecting audio source: %r"
% (selected_source.GetDescription(),))
self._recognizer.AudioInput = selected_source
#---------------------------------------------------------------------------
# Utility generator function for iterating over COM collections.
def collection_iter(collection):
if not collection:
return
for index in range(0, collection.Count):
yield collection.Item(index)
#---------------------------------------------------------------------------
class GrammarWrapper(GrammarWrapperBase):
def __init__(self, grammar, handle, context, engine):
GrammarWrapperBase.__init__(self, grammar, engine)
self.handle = handle
self.context = context
self.state_before_exclusive = handle.State
# Register callback functions which will handle recognizer events.
base = getevents("SAPI.SpSharedRecoContext")
class ContextEvents(base): pass
c = ContextEvents(context)
c.OnPhraseStart = self.phrase_start_callback
c.OnRecognition = self.recognition_callback
# OnRecognitionForOtherContext is disabled because the recognition
# results given to it are not useful.
#if hasattr(grammar, "process_recognition_other"):
# c.OnRecognitionForOtherContext = self.recognition_other_callback
if hasattr(grammar, "process_recognition_failure"):
c.OnFalseRecognition = self.recognition_failure_callback
def phrase_start_callback(self, stream_number, stream_position):
window = Window.get_foreground()
self.grammar.process_begin(window.executable, window.title,
window.handle)
# FIXME Extract to an example command module using "process_recognition_other".
def _retain_audio(self, newResult, results, rule_name):
# Only write audio data and metadata if the directory exists.
retain_dir = self.engine._retain_dir
if retain_dir and not os.path.isdir(retain_dir):
self.engine._log.warning(
"Audio was not retained because '%s' was not a "
"directory" % retain_dir
)
elif retain_dir:
try:
file_stream = Dispatch("SAPI.SpFileStream")
# Note: application can also retrieve smaller portions
# of the audio stream by specifying a starting phrase
# element and phrase element length.
audio_stream = newResult.Audio()
# Make sure we have audio data, which we wouldn't from a
# mimic or if the retain flag wasn't set above.
if audio_stream:
# Write audio data.
file_stream.Format = audio_stream.Format
now = datetime.now()
filename = ("retain_%s.wav"
% now.strftime("%Y-%m-%d_%H-%M-%S_%f"))
wav_path = os.path.join(retain_dir, filename)
flags = constants.SSFMCreateForWrite
file_stream.Open(wav_path, flags)
try:
file_stream.Write(audio_stream.GetData())
finally:
file_stream.Close()
# Write metadata
words = ' '.join([r[2] for r in results])
audio_length = int(newResult.Times.Length) / 1e7
tsv_path = os.path.join(retain_dir, "retain.tsv")
with open(tsv_path, "a") as tsv_file:
tsv_file.write('\t'.join([
filename, str(audio_length),
self.grammar.name, rule_name, words
]) + '\n')
            except Exception:
self.engine._log.exception("Exception retaining audio")
def recognition_callback(self, StreamNumber, StreamPosition,
RecognitionType, Result):
try:
newResult = Dispatch(Result)
phrase_info = newResult.PhraseInfo
rule_name = phrase_info.Rule.Name
#---------------------------------------------------------------
# Build a list of rule names for each element.
# First populate it with the top level rule name.
element = phrase_info.Rule
name = element.Name
start = element.FirstElement
count = element.NumberOfElements
rule_names = [name] * count
# Walk the tree of child rules and put their names in the list.
stack = [collection_iter(phrase_info.Rule.Children)]
while stack:
try: element = next(stack[-1])
except StopIteration: stack.pop(); continue
name = element.Name
start = element.FirstElement
count = element.NumberOfElements
rule_names[start:start + count] = [name] * count
if element.Children:
stack.append(collection_iter(element.Children))
#---------------------------------------------------------------
# Prepare the words and rule names for the element parsers.
replacements = [False] * len(rule_names)
if phrase_info.Replacements:
for replacement in collection_iter(phrase_info.Replacements):
begin = replacement.FirstElement
end = begin + replacement.NumberOfElements
replacements[begin] = replacement.Text
for index in range(begin + 1, end):
replacements[index] = True
results = []
rule_set = list(set(rule_names))
elements = phrase_info.Elements
for index in range(len(rule_names)):
element = elements.Item(index)
rule_id = rule_set.index(rule_names[index])
# Map dictation rule IDs to 1M so that dragonfly recognizes
# the words as dictation.
if rule_names[index] == "dgndictation":
rule_id = 1000000
replacement = replacements[index]
info = [element.LexicalForm, rule_id,
element.DisplayText, element.DisplayAttributes,
replacement]
results.append(info)
#---------------------------------------------------------------
# Retain audio
            self._retain_audio(newResult, results, rule_name)
#---------------------------------------------------------------
# Attempt to parse the recognition.
if self.process_results(results, rule_set, newResult, True):
return
except Exception as e:
Sapi5Engine._log.error("Grammar %s: exception: %s"
% (self.grammar._name, e), exc_info=True)
#-------------------------------------------------------------------
# If this point is reached, then the recognition was not
# processed successfully.
self._log.error("Grammar %s: failed to decode recognition %r.",
self.grammar._name, [r[0] for r in results])
def recognition_failure_callback(self, StreamNumber, StreamPosition,
Result):
func = getattr(self.grammar, "process_recognition_failure", None)
self._process_grammar_callback(func, results=Dispatch(Result))