buzz/transcriber.py
2022-09-28 00:45:49 +01:00

87 lines
3.2 KiB
Python

import enum
import logging
import os
import queue
from threading import Thread
from typing import Callable, Optional
import numpy as np
import sounddevice
import whisper
# When the app is opened as a .app from Finder, the path doesn't contain /usr/local/bin
# which breaks the call to run `ffmpeg`. This sets the path manually to fix that.
# PATH may be unset (a plain `+=` would raise KeyError) or may already list the
# directory, so only add the entry when it is actually missing.
if "/usr/local/bin" not in os.environ.get("PATH", "").split(os.pathsep):
    # filter(None, ...) drops a missing or empty PATH so no leading separator is produced.
    os.environ["PATH"] = os.pathsep.join(
        filter(None, [os.environ.get("PATH"), "/usr/local/bin"]))
class Transcriber:
    """Transcriber records audio from a system microphone and transcribes it into text using Whisper.

    Audio blocks delivered by the input stream are pushed onto a bounded queue
    and consumed by a background thread, which runs the model on each block
    and passes the resulting text to ``text_callback``.
    """

    class Task(enum.Enum):
        """Task requested from the model for each audio block."""

        TRANSLATE = "translate"
        TRANSCRIBE = "transcribe"

    # Both handles default to None so stop_recording() is safe to call before
    # start_recording(). The stream annotation is quoted so that defining the
    # class does not need to evaluate the sounddevice attribute.
    current_thread: Optional[Thread] = None
    current_stream: "Optional[sounddevice.InputStream]" = None
    is_running = False
    # Maximum number of pending audio blocks; blocks arriving beyond that are dropped.
    MAX_QUEUE_SIZE = 10
    # Seconds the worker waits for a block before re-checking `is_running`.
    QUEUE_POLL_TIMEOUT = 0.1

    def __init__(self, model_name: str, language: Optional[str],
                 text_callback: Callable[[str], None], task: Task) -> None:
        """Load the Whisper model and set up an idle transcriber.

        Args:
            model_name: Name passed to ``whisper.load_model``.
            language: Language passed to the model's ``transcribe`` call.
            text_callback: Called (from the processing thread) with the text
                of every transcribed block.
            task: Task passed to the model's ``transcribe`` call.
        """
        self.model_name = model_name
        self.model = whisper.load_model(model_name)
        self.current_stream = None
        self.current_thread = None
        self.text_callback = text_callback
        self.language = language
        self.task = task
        self.queue: queue.Queue[np.ndarray] = queue.Queue(
            Transcriber.MAX_QUEUE_SIZE,
        )

    def start_recording(self, block_duration=10, input_device_index: Optional[int] = None):
        """Open the input stream and start the background processing thread.

        Args:
            block_duration: Length of each audio block; it is multiplied by
                the Whisper sample rate to obtain the stream block size.
            input_device_index: Device passed to ``sounddevice.InputStream``;
                None is passed through unchanged.
        """
        logging.debug(
            "Recording... language: \"%s\", model: \"%s\", task: \"%s\", device: \"%s\", block duration: \"%s\"",
            self.language, self.model_name, self.task, input_device_index, block_duration)
        self.current_stream = sounddevice.InputStream(
            samplerate=whisper.audio.SAMPLE_RATE,
            # The block size is a frame count, so a fractional duration must
            # be truncated to an integer.
            blocksize=int(block_duration * whisper.audio.SAMPLE_RATE),
            device=input_device_index, dtype="float32",
            channels=1, callback=self.stream_callback)
        self.current_stream.start()

        self.is_running = True
        self.current_thread = Thread(target=self.process_queue)
        self.current_thread.start()

    def process_queue(self):
        """Transcribe queued audio blocks until `is_running` becomes false."""
        while self.is_running:
            try:
                # Wait briefly instead of spinning on a non-blocking get, so an
                # empty queue does not burn CPU while the stop flag is still
                # noticed promptly.
                block = self.queue.get(timeout=self.QUEUE_POLL_TIMEOUT)
            except queue.Empty:
                continue

            logging.debug(
                'Processing next frame. Current queue size: %d', self.queue.qsize())
            # Hand the model the enum's string value: an Enum member never
            # compares equal to a plain string such as "transcribe".
            task = self.task.value if isinstance(self.task, enum.Enum) else self.task
            result = self.model.transcribe(
                audio=block, language=self.language, task=task)
            logging.debug("Received next result: \"%s\"", result["text"])
            self.text_callback(result["text"])

    def stream_callback(self, in_data, frame_count, time_info, status):
        """Enqueue one block of recorded audio; used as the input stream callback.

        `frame_count`, `time_info` and `status` are accepted to match the
        callback signature but are not used.
        """
        # Try to enqueue the next block. If the queue is already full, drop the block.
        try:
            # flatten() always copies, so the queued block does not alias the
            # caller's buffer after this callback returns.
            self.queue.put(in_data.flatten(), block=False)
        except queue.Full:
            return

    def stop_recording(self):
        """Close the stream, discard pending audio and wait for the worker to exit.

        Safe to call when recording was never started or was already stopped.
        """
        if self.current_stream is not None:
            self.current_stream.close()
            self.current_stream = None
            logging.debug('Closed recording stream')

        self.is_running = False
        self._drain_queue()

        if self.current_thread is not None:
            logging.debug('Waiting for processing thread to terminate')
            self.current_thread.join()
            self.current_thread = None
            logging.debug('Processing thread terminated')

    def _drain_queue(self):
        """Discard every pending block through the queue's own locked API."""
        try:
            while True:
                self.queue.get_nowait()
        except queue.Empty:
            pass