Set up language and model selection

This commit is contained in:
Chidi Williams 2022-09-26 15:04:09 +01:00
commit dad03d8fe7
3 changed files with 107 additions and 58 deletions

View file

@ -1,6 +1,6 @@
# Buzz
Buzz transcribes audio from your computer's microphones to text using OpenAI's [Whisper](https://github.com/openai/whisper).
Buzz transcribes audio from your computer's microphones to text using OpenAI's [Whisper](https://github.com/openai/whisper). Buzz works by splitting audio recordings into chunks and transcribing the chunks to text using Whisper.
## Requirements

132
gui.py
View file

@ -1,5 +1,5 @@
import enum
from typing import List, Tuple
from typing import List, Optional, Tuple
import pyaudio
import whisper
@ -18,7 +18,7 @@ class Label(QLabel):
class AudioDevicesComboBox(QComboBox):
"""AudioDevicesComboBox is a combo box for selecting audio devices."""
"""AudioDevicesComboBox displays a list of available audio input devices"""
deviceChanged = pyqtSignal(int)
def __init__(self, *args) -> None:
@ -43,35 +43,56 @@ class AudioDevicesComboBox(QComboBox):
def on_index_changed(self, index: int):
self.deviceChanged.emit(self.audio_devices[index][0])
def get_default_device_id(self):
return self.audio_devices[0][0]
class LanguagesComboBox(QComboBox):
"""LanguagesComboBox displays a list of languages available to use with Whisper"""
languageChanged = pyqtSignal(str)
def __init__(self, *args) -> None:
def __init__(self, default_language: str, *args) -> None:
super().__init__(*args)
self.languages = {'none': 'Detect language', **tokenizer.LANGUAGES}
self.languages = {'': 'Detect language', **tokenizer.LANGUAGES}
self.addItems(map(lambda lang: lang.title(), self.languages.values()))
self.currentIndexChanged.connect(self.on_index_changed)
self.setCurrentText(self.languages.get(default_language, '').title())
def on_index_changed(self, index: int):
key = list(self.languages.values())[index]
self.languageChanged.emit(
self.languages.get(key) if key != 'none' else None)
key = list(self.languages.keys())[index]
self.languageChanged.emit(key)
class ModelsComboBox(QComboBox):
"""ModelsComboBox displays the list of available Whisper models for selection
"""
modelChanged = pyqtSignal(str)
"""ModelsComboBox displays the list of available Whisper models for selection"""
modelNameChanged = pyqtSignal(str)
def __init__(self, default_model_name: str, *args) -> None:
super().__init__(*args)
self.models = whisper.available_models()
self.addItems(map(self.label, self.models))
self.currentIndexChanged.connect(self.on_index_changed)
self.setCurrentText(default_model_name)
def on_index_changed(self, index: int):
self.modelNameChanged.emit(self.models[index])
def label(self, model_name: str):
name, lang = (model_name.split('.') + [None])[:2]
if lang:
return "%s (%s)" % (name.title(), lang.upper())
return name.title()
class TextDisplayBox(QTextEdit):
"""TextDisplayBox is a read-only textbox"""
def __init__(self, *args) -> None:
super().__init__(*args)
self.models = whisper.available_models()
self.addItems(self.models)
self.currentIndexChanged.connect(self.on_index_changed)
def on_index_changed(self, index: int):
self.modelChanged.emit(self.models[index])
self.setReadOnly(True)
self.setPlaceholderText('Click Record to begin...')
self.setStyleSheet(
'QTextEdit { padding-left: 5; padding-top: 5; padding-bottom: 5; padding-right: 5; background-color: #151515; border-radius: 6; background-color: #1e1e1e; }')
class RecordButton(QPushButton):
@ -88,7 +109,7 @@ class RecordButton(QPushButton):
self.statusChanged.connect(self.on_status_changed)
def on_click_record(self):
current_status: self.Status
current_status: RecordButton.Status
if self.current_status == self.Status.RECORDING:
current_status = self.Status.STOPPED
else:
@ -113,9 +134,10 @@ class TranscriberWorker(QObject):
text = pyqtSignal(str)
finished = pyqtSignal()
def __init__(self, input_device_index: int = None, *args) -> None:
def __init__(self, model_name: str, language: Optional[str], input_device_index: Optional[int], *args) -> None:
super().__init__(*args)
self.transcriber = Transcriber(text_callback=self.on_next_text)
self.transcriber = Transcriber(
model_name=model_name, language=language, text_callback=self.on_next_text)
self.input_device_index = input_device_index
def run(self):
@ -132,15 +154,14 @@ class TranscriberWorker(QObject):
class Application(QApplication):
current_status = RecordButton.Status.STOPPED
selected_device_id: int = None
thread: Optional[QThread] = None
selected_model_name = 'tiny'
selected_language = 'en'
selected_device_id: int
def __init__(self) -> None:
super().__init__([])
self.setStyleSheet("""QComboBox {
color: #eee;
}""")
self.window = QWidget()
self.window.setFixedSize(400, 400)
@ -150,37 +171,41 @@ class Application(QApplication):
self.audio_devices_combo_box = AudioDevicesComboBox()
self.audio_devices_combo_box.deviceChanged.connect(
self.on_device_changed)
self.selected_device_id = self.audio_devices_combo_box.get_default_device_id()
self.record_button = RecordButton()
self.record_button.statusChanged.connect(self.on_status_changed)
record_button = RecordButton()
record_button.statusChanged.connect(self.on_status_changed)
self.text_box = self.text_box()
self.text_box = TextDisplayBox()
models_combo_box = ModelsComboBox(
default_model_name=self.selected_model_name)
models_combo_box.modelNameChanged.connect(self.on_model_changed)
languages_combo_box = LanguagesComboBox(
default_language=self.selected_language)
languages_combo_box.languageChanged.connect(self.on_language_changed)
layout.addWidget(Label('Model:'), 0, 0, 1, 3)
layout.addWidget(ModelsComboBox(), 0, 3, 1, 9)
layout.addWidget(models_combo_box, 0, 3, 1, 9)
layout.addWidget(Label('Language:'), 1, 0, 1, 3)
layout.addWidget(LanguagesComboBox(), 1, 3, 1, 9)
layout.addWidget(languages_combo_box, 1, 3, 1, 9)
layout.addWidget(Label('Microphone:'), 2, 0, 1, 3)
layout.addWidget(self.audio_devices_combo_box, 2, 3, 1, 9)
layout.addWidget(self.record_button, 3, 9, 1, 3)
layout.addWidget(record_button, 3, 9, 1, 3)
layout.addWidget(self.text_box, 4, 0, 1, 12)
self.window.show()
def text_box(self):
box = QTextEdit()
box.setReadOnly(True)
box.setPlaceholderText('Click Record to begin...')
box.setStyleSheet(
'QTextEdit { padding-left:10; padding-top:10; padding-bottom:10; padding-right:10; background-color: #151515; border-radius: 6; }')
return box
# TODO: might be great to send when the text has been updated rather than appending
def on_next_text(self, text: str):
self.text_box.append(text)
self.text_box.moveCursor(QTextCursor.MoveOperation.End)
self.text_box.insertPlainText(text)
self.text_box.moveCursor(QTextCursor.MoveOperation.End)
def on_device_changed(self, device_id: int):
self.selected_device_id = device_id
@ -193,26 +218,47 @@ class Application(QApplication):
self.audio_devices_combo_box.setDisabled(False)
self.stop_recording()
def on_model_changed(self, model_name: str):
self.selected_model_name = model_name
def on_language_changed(self, language: str):
self.selected_language = language
def start_recording(self):
# Clear text box placeholder
# Clear text box placeholder because the first chunk takes a while to process
self.text_box.setPlaceholderText('')
# Thread needs to be attached to app object to live after end of method
# Transcribing the recording chunks is a blocking
# process, so we handle this in a new thread
# Wait for previous thread to complete in case stop_recording isn't yet done
if self.thread != None:
self.thread.wait()
self.thread = QThread()
self.transcriber_worker = TranscriberWorker(
input_device_index=self.selected_device_id)
input_device_index=self.selected_device_id,
model_name=self.selected_model_name,
language=self.selected_language if self.selected_language != '' else None,
)
self.transcriber_worker.moveToThread(self.thread)
# Connect worker and thread such that the worker runs once
# the thread starts and the thread quits once the worker finishes
self.thread.started.connect(self.transcriber_worker.run)
self.transcriber_worker.finished.connect(self.thread.quit)
self.transcriber_worker.finished.connect(
self.transcriber_worker.deleteLater)
self.thread.finished.connect(self.thread.deleteLater)
self.thread.finished.connect(self.clean_up_thread)
self.transcriber_worker.text.connect(self.on_next_text)
self.thread.start()
def clean_up_thread(self):
self.thread.deleteLater()
self.thread = None
def stop_recording(self):
self.transcriber_worker.stop_recording()

View file

@ -1,10 +1,10 @@
import logging
import os
import platform
import sys
import tempfile
import wave
from datetime import datetime
from typing import Callable
from typing import Callable, Optional
import pyaudio
import whisper
@ -15,12 +15,15 @@ os.environ["PATH"] += os.pathsep + "/usr/local/bin"
class Transcriber:
"""Transcriber records audio from a system microphone and transcribes it into text using Whisper."""
# Number of times the queue is greater than the frames_per_chunk
# after which the transcriber will stop queueing new frames
chunk_drop_factor = 5
def __init__(self, model_name="tiny", language=None, text_callback: Callable[[str], None] = print) -> None:
def __init__(self, model_name: str, language: Optional[str], text_callback: Callable[[str], None]) -> None:
self.pyaudio = pyaudio.PyAudio()
self.model_name = model_name
self.model = whisper.load_model(model_name)
self.stream = None
self.frames = []
@ -29,8 +32,9 @@ class Transcriber:
self.language = language
def start_recording(self, frames_per_buffer=1024, sample_format=pyaudio.paInt16,
channels=1, rate=44100, chunk_duration=4, input_device_index=None):
logging.debug("Recording...")
channels=1, rate=44100, chunk_duration=5, input_device_index: Optional[int] = None):
logging.debug("Recording with language \"%s\", model \"%s\"" %
(self.language, self.model_name))
self.stream = self.pyaudio.open(format=sample_format,
channels=channels,
rate=rate,
@ -77,7 +81,7 @@ class Transcriber:
except KeyboardInterrupt as e:
self.stop_recording()
os.remove(chunk_path)
raise e
sys.exit(0)
def stream_callback(self, in_data, frame_count, time_info, status):
# Append new frame only if the queue is not larger than the chunk drop factor
@ -86,11 +90,12 @@ class Transcriber:
return in_data, pyaudio.paContinue
def stop_recording(self):
logging.debug("Ending recording...")
self.stopped = True
self.stream.stop_stream()
self.stream.close()
self.pyaudio.terminate()
if self.stream != None:
logging.debug("Ending recording...")
self.stopped = True
self.stream.stop_stream()
self.stream.close()
self.pyaudio.terminate()
def write_chunk(self, path, channels, rate, frames):
logging.debug('Writing chunk to path: %s' % path)
@ -103,11 +108,9 @@ class Transcriber:
wavefile.close()
return path
def chunk_path(self):
def chunk_path(self) -> str:
"""Returns the path where a chunk should be saved using the
system's temp directory and a unique filename.
"""
chunk_id = "clip-%s.wav" % (datetime.utcnow().strftime('%Y%m%d%H%M%S'))
return os.path.join(tempfile.gettempdir(), chunk_id)
# https://stackoverflow.com/a/43418319/9830227
def tmp_dir(self):
# return tempfile.gettempdir()
return "/tmp" if platform.system() == "Darwin" else tempfile.gettempdir()