mirror of
https://github.com/chidiwilliams/buzz.git
synced 2026-03-14 22:55:46 +01:00
87 lines
3.2 KiB
Python
87 lines
3.2 KiB
Python
import enum
|
|
import logging
|
|
import os
|
|
import queue
|
|
from threading import Thread
|
|
from typing import Callable, Optional
|
|
|
|
import numpy as np
|
|
import sounddevice
|
|
import whisper
|
|
|
|
# When the app is opened as a .app from Finder, the path doesn't contain /usr/local/bin
# which breaks the call to run `ffmpeg`. This sets the path manually to fix that.
# PATH may be missing entirely from the environment, and the directory may already be
# listed, so read it defensively and only append when it is not present yet.
_current_path = os.environ.get("PATH", "")
if "/usr/local/bin" not in _current_path.split(os.pathsep):
    # filter(None, ...) avoids a leading separator when PATH was empty or unset.
    os.environ["PATH"] = os.pathsep.join(
        filter(None, (_current_path, "/usr/local/bin")))
|
|
|
|
|
|
class Transcriber:
    """Transcriber records audio from a system microphone and transcribes it into text using Whisper.

    Audio blocks delivered by the input stream are placed on a bounded queue
    (blocks are dropped when it is full). A worker thread takes blocks off the
    queue, runs the Whisper model on each one and hands the resulting text to
    ``text_callback``.

    Attributes:
        current_thread: Worker thread running ``process_queue``, or ``None``
            when no recording is active.
        current_stream: The open ``sounddevice.InputStream``, or ``None``.
        is_running: ``True`` while the worker loop should keep going.
    """

    class Task(enum.Enum):
        """Operation requested from the Whisper model for each audio block."""

        TRANSLATE = "translate"
        TRANSCRIBE = "transcribe"

    # Class-level default so the flag is readable before recording ever starts.
    is_running = False
    # Maximum number of pending audio blocks; further blocks are dropped.
    MAX_QUEUE_SIZE = 10
    # Seconds the worker waits for a block before re-checking `is_running`.
    _QUEUE_POLL_SECONDS = 0.1

    def __init__(self, model_name: str, language: Optional[str],
                 text_callback: Callable[[str], None], task: Task) -> None:
        """Load the Whisper model and prepare an empty block queue.

        Args:
            model_name: Name passed to ``whisper.load_model``.
            language: Language passed through to ``model.transcribe``; may be ``None``.
            text_callback: Called with the text of every processed block.
            task: Whether to transcribe or translate the audio.
        """
        self.model_name = model_name
        self.model = whisper.load_model(model_name)
        # Initialise both handles so stop_recording() is safe before start_recording().
        self.current_thread: Optional[Thread] = None
        self.current_stream: Optional[sounddevice.InputStream] = None
        self.text_callback = text_callback
        self.language = language
        self.task = task
        self.queue: queue.Queue[np.ndarray] = queue.Queue(
            Transcriber.MAX_QUEUE_SIZE,
        )

    def start_recording(self, block_duration=10, input_device_index: Optional[int] = None):
        """Open the input stream and start the worker thread.

        Args:
            block_duration: Length in seconds of each audio block delivered to
                ``stream_callback``.
            input_device_index: Device passed to ``sounddevice.InputStream``;
                ``None`` leaves the choice to sounddevice.
        """
        logging.debug(
            'Recording... language: "%s", model: "%s", task: "%s", device: "%s", block duration: "%s"',
            self.language, self.model_name, self.task, input_device_index, block_duration)

        sample_rate = whisper.audio.SAMPLE_RATE
        self.current_stream = sounddevice.InputStream(
            samplerate=sample_rate,
            # The block size is a frame count, so keep it integral even when
            # a fractional duration is given.
            blocksize=int(block_duration * sample_rate),
            device=input_device_index, dtype="float32",
            channels=1, callback=self.stream_callback)
        self.current_stream.start()

        self.is_running = True

        self.current_thread = Thread(target=self.process_queue)
        self.current_thread.start()

    def process_queue(self):
        """Worker loop: transcribe queued blocks until ``is_running`` is cleared."""
        while self.is_running:
            try:
                # Wait with a timeout instead of spinning on a non-blocking get,
                # so an idle worker does not burn a CPU core yet still notices
                # `is_running` being cleared.
                block = self.queue.get(timeout=self._QUEUE_POLL_SECONDS)
            except queue.Empty:
                continue

            logging.debug(
                'Processing next frame. Current queue size: %d', self.queue.qsize())
            # Whisper takes the task as a plain string ("transcribe" or
            # "translate"), so pass the enum's value. Task(...) accepts either
            # a Task member or its string value.
            result = self.model.transcribe(
                audio=block, language=self.language,
                task=Transcriber.Task(self.task).value)
            text = result["text"]
            logging.debug('Received next result: "%s"', text)
            self.text_callback(text)

    def stream_callback(self, in_data, frame_count, time_info, status):
        """Receive one audio block from the input stream and enqueue it.

        The block is flattened to one dimension. If the queue is already full
        the block is dropped.
        """
        # flatten() always copies. A view from ravel() would alias the buffer
        # handed to the callback, which the audio backend may reuse once this
        # call returns (NOTE(review): sounddevice's examples copy for this reason).
        samples = in_data.flatten()
        try:
            self.queue.put(samples, block=False)
        except queue.Full:
            return

    def stop_recording(self):
        """Close the stream, discard pending blocks and wait for the worker to exit."""
        if self.current_stream is not None:
            self.current_stream.close()
            self.current_stream = None
            logging.debug('Closed recording stream')

        self.is_running = False
        self._drain_queue()

        if self.current_thread is not None:
            logging.debug('Waiting for processing thread to terminate')
            self.current_thread.join()
            self.current_thread = None
            logging.debug('Processing thread terminated')

    def _drain_queue(self):
        """Discard all pending blocks through the queue's own locked API."""
        while True:
            try:
                self.queue.get_nowait()
            except queue.Empty:
                return
|