mirror of
https://github.com/chidiwilliams/buzz.git
synced 2026-03-14 14:45:46 +01:00
Adding tests
This commit is contained in:
parent
7258290128
commit
49eaa87d4a
2 changed files with 286 additions and 6 deletions
|
|
@ -730,12 +730,16 @@ class RecordingTranscriberWidget(QWidget):
|
|||
"""Append a new column to a single-row CSV export file, applying max_entries limit."""
|
||||
existing_columns = []
|
||||
if os.path.isfile(file_path):
|
||||
raw = RecordingTranscriberWidget.read_export_file(file_path)
|
||||
if raw.strip():
|
||||
reader = csv.reader(io.StringIO(raw))
|
||||
for row in reader:
|
||||
existing_columns = row
|
||||
break
|
||||
try:
|
||||
with open(file_path, "r", encoding="utf-8-sig") as f:
|
||||
raw = f.read()
|
||||
if raw.strip():
|
||||
reader = csv.reader(io.StringIO(raw))
|
||||
for row in reader:
|
||||
existing_columns = row
|
||||
break
|
||||
except OSError:
|
||||
pass
|
||||
existing_columns.append(text)
|
||||
if max_entries > 0:
|
||||
existing_columns = existing_columns[-max_entries:]
|
||||
|
|
|
|||
|
|
@ -890,6 +890,56 @@ class TestOnTranscriptionOptionsChanged:
|
|||
|
||||
|
||||
|
||||
def _model_loaded_ctx(qtbot, enable_llm_translation=False):
|
||||
from buzz.transcriber.transcriber import TranscriptionOptions
|
||||
ctx = _widget_ctx(qtbot)
|
||||
widget = ctx.__enter__.__self__ if hasattr(ctx, '__enter__') else None
|
||||
|
||||
class _Ctx:
|
||||
def __enter__(self_inner):
|
||||
self_inner.widget = ctx.__enter__()
|
||||
self_inner.widget.transcription_options = TranscriptionOptions(
|
||||
enable_llm_translation=enable_llm_translation
|
||||
)
|
||||
return self_inner.widget
|
||||
|
||||
def __exit__(self_inner, *args):
|
||||
return ctx.__exit__(*args)
|
||||
|
||||
return _Ctx()
|
||||
|
||||
|
||||
class TestTranslatorSetup:
|
||||
@pytest.mark.timeout(60)
|
||||
def test_translator_created_when_llm_enabled(self, qtbot):
|
||||
with _model_loaded_ctx(qtbot, enable_llm_translation=True) as widget, \
|
||||
patch("buzz.widgets.recording_transcriber_widget.Translator") as MockTranslator, \
|
||||
patch("buzz.widgets.recording_transcriber_widget.RecordingTranscriber"), \
|
||||
patch("buzz.widgets.recording_transcriber_widget.QThread"):
|
||||
widget.on_model_loaded("/fake/model/path")
|
||||
MockTranslator.assert_called_once()
|
||||
|
||||
@pytest.mark.timeout(60)
|
||||
def test_translator_not_created_when_llm_disabled(self, qtbot):
|
||||
with _model_loaded_ctx(qtbot, enable_llm_translation=False) as widget, \
|
||||
patch("buzz.widgets.recording_transcriber_widget.Translator") as MockTranslator, \
|
||||
patch("buzz.widgets.recording_transcriber_widget.RecordingTranscriber"), \
|
||||
patch("buzz.widgets.recording_transcriber_widget.QThread"):
|
||||
widget.on_model_loaded("/fake/model/path")
|
||||
MockTranslator.assert_not_called()
|
||||
|
||||
@pytest.mark.timeout(60)
|
||||
def test_translator_translation_signal_connected_to_on_next_translation(self, qtbot):
|
||||
with _model_loaded_ctx(qtbot, enable_llm_translation=True) as widget, \
|
||||
patch("buzz.widgets.recording_transcriber_widget.Translator") as MockTranslator, \
|
||||
patch("buzz.widgets.recording_transcriber_widget.RecordingTranscriber"), \
|
||||
patch("buzz.widgets.recording_transcriber_widget.QThread"):
|
||||
mock_translator_instance = MagicMock()
|
||||
MockTranslator.return_value = mock_translator_instance
|
||||
widget.on_model_loaded("/fake/model/path")
|
||||
mock_translator_instance.translation.connect.assert_called_with(widget.on_next_translation)
|
||||
|
||||
|
||||
class TestOnDeviceChanged:
|
||||
@pytest.mark.timeout(60)
|
||||
def test_no_new_listener_started_when_device_is_none(self, qtbot):
|
||||
|
|
@ -1040,6 +1090,107 @@ class TestOnNextTranscriptionExport:
|
|||
finally:
|
||||
os.unlink(export_path)
|
||||
|
||||
@pytest.mark.timeout(60)
|
||||
def test_append_above_csv_prepends_new_column(self, qtbot):
|
||||
import csv
|
||||
with _widget_ctx(qtbot) as widget, tempfile.NamedTemporaryFile(
|
||||
suffix=".csv", delete=False, mode="w"
|
||||
) as f:
|
||||
export_path = f.name
|
||||
|
||||
try:
|
||||
widget.transcriber_mode = RecordingTranscriberMode.APPEND_ABOVE
|
||||
widget.export_enabled = True
|
||||
widget.transcript_export_file = export_path
|
||||
widget.settings.set_value(
|
||||
Settings.Key.RECORDING_TRANSCRIBER_EXPORT_FILE_TYPE, "csv"
|
||||
)
|
||||
widget.on_next_transcription("first")
|
||||
widget.on_next_transcription("second")
|
||||
|
||||
with open(export_path, newline="") as f:
|
||||
rows = list(csv.reader(f))
|
||||
assert len(rows) == 1
|
||||
assert rows[0][0] == "second"
|
||||
assert rows[0][1] == "first"
|
||||
finally:
|
||||
os.unlink(export_path)
|
||||
|
||||
@pytest.mark.timeout(60)
|
||||
def test_append_above_csv_respects_max_entries(self, qtbot):
|
||||
import csv
|
||||
with _widget_ctx(qtbot) as widget, tempfile.NamedTemporaryFile(
|
||||
suffix=".csv", delete=False, mode="w"
|
||||
) as f:
|
||||
export_path = f.name
|
||||
|
||||
try:
|
||||
widget.transcriber_mode = RecordingTranscriberMode.APPEND_ABOVE
|
||||
widget.export_enabled = True
|
||||
widget.transcript_export_file = export_path
|
||||
widget.settings.set_value(
|
||||
Settings.Key.RECORDING_TRANSCRIBER_EXPORT_FILE_TYPE, "csv"
|
||||
)
|
||||
widget.settings.set_value(
|
||||
Settings.Key.RECORDING_TRANSCRIBER_EXPORT_MAX_ENTRIES, 2
|
||||
)
|
||||
widget.on_next_transcription("first")
|
||||
widget.on_next_transcription("second")
|
||||
widget.on_next_transcription("third")
|
||||
|
||||
with open(export_path, newline="") as f:
|
||||
rows = list(csv.reader(f))
|
||||
assert len(rows) == 1
|
||||
assert len(rows[0]) == 2
|
||||
assert rows[0][0] == "third"
|
||||
assert rows[0][1] == "second"
|
||||
finally:
|
||||
os.unlink(export_path)
|
||||
|
||||
|
||||
|
||||
class TestUploadToServer:
|
||||
@pytest.mark.timeout(60)
|
||||
def test_transcript_uploaded_when_upload_url_set(self, qtbot):
|
||||
with _widget_ctx(qtbot) as widget, \
|
||||
patch("buzz.widgets.recording_transcriber_widget.requests.post") as mock_post:
|
||||
widget.upload_url = "http://example.com/upload"
|
||||
widget.on_next_transcription("hello upload")
|
||||
mock_post.assert_called_once_with(
|
||||
url="http://example.com/upload",
|
||||
json={"kind": "transcript", "text": "hello upload"},
|
||||
headers={"Content-Type": "application/json"},
|
||||
timeout=15,
|
||||
)
|
||||
|
||||
@pytest.mark.timeout(60)
|
||||
def test_transcript_not_uploaded_when_upload_url_empty(self, qtbot):
|
||||
with _widget_ctx(qtbot) as widget, \
|
||||
patch("buzz.widgets.recording_transcriber_widget.requests.post") as mock_post:
|
||||
widget.upload_url = ""
|
||||
widget.on_next_transcription("no upload")
|
||||
mock_post.assert_not_called()
|
||||
|
||||
@pytest.mark.timeout(60)
|
||||
def test_transcript_upload_failure_does_not_raise(self, qtbot):
|
||||
with _widget_ctx(qtbot) as widget, \
|
||||
patch("buzz.widgets.recording_transcriber_widget.requests.post",
|
||||
side_effect=Exception("connection error")):
|
||||
widget.upload_url = "http://example.com/upload"
|
||||
widget.on_next_transcription("hello") # should not raise
|
||||
|
||||
@pytest.mark.timeout(60)
|
||||
def test_translation_uploaded_when_upload_url_set(self, qtbot):
|
||||
with _widget_ctx(qtbot) as widget, \
|
||||
patch("buzz.widgets.recording_transcriber_widget.requests.post") as mock_post:
|
||||
widget.upload_url = "http://example.com/upload"
|
||||
widget.on_next_translation("bonjour")
|
||||
mock_post.assert_called_once_with(
|
||||
url="http://example.com/upload",
|
||||
json={"kind": "translation", "text": "bonjour"},
|
||||
headers={"Content-Type": "application/json"},
|
||||
timeout=15,
|
||||
)
|
||||
|
||||
|
||||
class TestOnNextTranslation:
|
||||
|
|
@ -1176,6 +1327,131 @@ class TestExportFileHelpers:
|
|||
assert result == ""
|
||||
|
||||
|
||||
class TestWriteCsvExport:
|
||||
def test_creates_csv_with_single_entry(self, tmp_path):
|
||||
path = str(tmp_path / "out.csv")
|
||||
RecordingTranscriberWidget.write_csv_export(path, "hello", 0)
|
||||
with open(path, encoding="utf-8-sig") as f:
|
||||
content = f.read()
|
||||
assert "hello" in content
|
||||
|
||||
def test_appends_column_to_existing_csv(self, tmp_path):
|
||||
import csv
|
||||
path = str(tmp_path / "out.csv")
|
||||
RecordingTranscriberWidget.write_csv_export(path, "first", 0)
|
||||
RecordingTranscriberWidget.write_csv_export(path, "second", 0)
|
||||
with open(path, encoding="utf-8-sig") as f:
|
||||
rows = list(csv.reader(f))
|
||||
assert rows[0] == ["first", "second"]
|
||||
|
||||
def test_max_entries_limits_columns(self, tmp_path):
|
||||
import csv
|
||||
path = str(tmp_path / "out.csv")
|
||||
for word in ["a", "b", "c", "d"]:
|
||||
RecordingTranscriberWidget.write_csv_export(path, word, max_entries=3)
|
||||
with open(path, encoding="utf-8-sig") as f:
|
||||
rows = list(csv.reader(f))
|
||||
assert rows[0] == ["b", "c", "d"]
|
||||
|
||||
def test_max_entries_zero_means_no_limit(self, tmp_path):
|
||||
import csv
|
||||
path = str(tmp_path / "out.csv")
|
||||
for i in range(10):
|
||||
RecordingTranscriberWidget.write_csv_export(path, str(i), max_entries=0)
|
||||
with open(path, encoding="utf-8-sig") as f:
|
||||
rows = list(csv.reader(f))
|
||||
assert len(rows[0]) == 10
|
||||
|
||||
def test_handles_empty_existing_file(self, tmp_path):
|
||||
import csv
|
||||
path = str(tmp_path / "out.csv")
|
||||
with open(path, "w") as f:
|
||||
f.write("")
|
||||
RecordingTranscriberWidget.write_csv_export(path, "entry", 0)
|
||||
with open(path, encoding="utf-8-sig") as f:
|
||||
rows = list(csv.reader(f))
|
||||
assert rows[0] == ["entry"]
|
||||
|
||||
def test_retries_on_permission_error(self, tmp_path):
|
||||
path = str(tmp_path / "out.csv")
|
||||
call_count = [0]
|
||||
original_open = open
|
||||
|
||||
def flaky_open(p, mode="r", **kwargs):
|
||||
if p == path and "w" in mode:
|
||||
call_count[0] += 1
|
||||
if call_count[0] < 3:
|
||||
raise PermissionError("locked")
|
||||
return original_open(p, mode, **kwargs)
|
||||
|
||||
with patch("builtins.open", side_effect=flaky_open), \
|
||||
patch("time.sleep"):
|
||||
RecordingTranscriberWidget.write_csv_export(path, "data", 0)
|
||||
|
||||
assert call_count[0] == 3
|
||||
|
||||
|
||||
class TestWriteTxtExport:
|
||||
def test_append_mode_adds_text(self, tmp_path):
|
||||
path = str(tmp_path / "out.txt")
|
||||
RecordingTranscriberWidget.write_txt_export(path, "line1", "a", 0, "\n")
|
||||
RecordingTranscriberWidget.write_txt_export(path, "line2", "a", 0, "\n")
|
||||
with open(path) as f:
|
||||
content = f.read()
|
||||
assert content == "line1\nline2\n"
|
||||
|
||||
def test_append_mode_max_entries_trims_oldest(self, tmp_path):
|
||||
path = str(tmp_path / "out.txt")
|
||||
for word in ["a", "b", "c", "d"]:
|
||||
RecordingTranscriberWidget.write_txt_export(path, word, "a", max_entries=3, line_separator="\n")
|
||||
with open(path) as f:
|
||||
content = f.read()
|
||||
parts = [p for p in content.split("\n") if p]
|
||||
assert parts == ["b", "c", "d"]
|
||||
|
||||
def test_prepend_mode_puts_text_first(self, tmp_path):
|
||||
path = str(tmp_path / "out.txt")
|
||||
RecordingTranscriberWidget.write_txt_export(path, "first", "a", 0, "\n")
|
||||
RecordingTranscriberWidget.write_txt_export(path, "second", "prepend", 0, "\n")
|
||||
with open(path) as f:
|
||||
content = f.read()
|
||||
parts = [p for p in content.split("\n") if p]
|
||||
assert parts[0] == "second"
|
||||
assert parts[1] == "first"
|
||||
|
||||
def test_prepend_mode_max_entries_trims_oldest(self, tmp_path):
|
||||
path = str(tmp_path / "out.txt")
|
||||
RecordingTranscriberWidget.write_txt_export(path, "old1", "a", 0, "\n")
|
||||
RecordingTranscriberWidget.write_txt_export(path, "old2", "a", 0, "\n")
|
||||
RecordingTranscriberWidget.write_txt_export(path, "new", "prepend", max_entries=2, line_separator="\n")
|
||||
with open(path) as f:
|
||||
content = f.read()
|
||||
parts = [p for p in content.split("\n") if p]
|
||||
assert parts == ["new", "old1"]
|
||||
|
||||
def test_write_mode_overwrites(self, tmp_path):
|
||||
path = str(tmp_path / "out.txt")
|
||||
RecordingTranscriberWidget.write_txt_export(path, "old", "w", 0, "\n")
|
||||
RecordingTranscriberWidget.write_txt_export(path, "new", "w", 0, "\n")
|
||||
with open(path) as f:
|
||||
content = f.read()
|
||||
assert content == "new"
|
||||
|
||||
def test_prepend_on_nonexistent_file(self, tmp_path):
|
||||
path = str(tmp_path / "out.txt")
|
||||
RecordingTranscriberWidget.write_txt_export(path, "only", "prepend", 0, "\n")
|
||||
with open(path) as f:
|
||||
content = f.read()
|
||||
assert "only" in content
|
||||
|
||||
def test_append_max_entries_zero_means_no_limit(self, tmp_path):
|
||||
path = str(tmp_path / "out.txt")
|
||||
for i in range(10):
|
||||
RecordingTranscriberWidget.write_txt_export(path, str(i), "a", max_entries=0, line_separator="\n")
|
||||
with open(path) as f:
|
||||
parts = [p for p in f.read().split("\n") if p]
|
||||
assert len(parts) == 10
|
||||
|
||||
|
||||
class TestPresentationTranslationSync:
|
||||
@pytest.mark.timeout(60)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue