From 49eaa87d4ac53c9554508451226fa04fff8d4161 Mon Sep 17 00:00:00 2001 From: Raivis Dejus Date: Fri, 27 Feb 2026 12:43:37 +0200 Subject: [PATCH] Adding tests --- buzz/widgets/recording_transcriber_widget.py | 16 +- .../recording_transcriber_widget_test.py | 276 ++++++++++++++++++ 2 files changed, 286 insertions(+), 6 deletions(-) diff --git a/buzz/widgets/recording_transcriber_widget.py b/buzz/widgets/recording_transcriber_widget.py index 4011c46f..8218d163 100644 --- a/buzz/widgets/recording_transcriber_widget.py +++ b/buzz/widgets/recording_transcriber_widget.py @@ -730,12 +730,16 @@ class RecordingTranscriberWidget(QWidget): """Append a new column to a single-row CSV export file, applying max_entries limit.""" existing_columns = [] if os.path.isfile(file_path): - raw = RecordingTranscriberWidget.read_export_file(file_path) - if raw.strip(): - reader = csv.reader(io.StringIO(raw)) - for row in reader: - existing_columns = row - break + try: + with open(file_path, "r", encoding="utf-8-sig") as f: + raw = f.read() + if raw.strip(): + reader = csv.reader(io.StringIO(raw)) + for row in reader: + existing_columns = row + break + except OSError: + pass existing_columns.append(text) if max_entries > 0: existing_columns = existing_columns[-max_entries:] diff --git a/tests/widgets/recording_transcriber_widget_test.py b/tests/widgets/recording_transcriber_widget_test.py index 333c72ca..4daeb9c8 100644 --- a/tests/widgets/recording_transcriber_widget_test.py +++ b/tests/widgets/recording_transcriber_widget_test.py @@ -890,6 +890,56 @@ class TestOnTranscriptionOptionsChanged: +def _model_loaded_ctx(qtbot, enable_llm_translation=False): + from buzz.transcriber.transcriber import TranscriptionOptions + ctx = _widget_ctx(qtbot) + widget = ctx.__enter__.__self__ if hasattr(ctx, '__enter__') else None + + class _Ctx: + def __enter__(self_inner): + self_inner.widget = ctx.__enter__() + self_inner.widget.transcription_options = TranscriptionOptions( + enable_llm_translation=enable_llm_translation + ) + return self_inner.widget + + def __exit__(self_inner, *args): + return ctx.__exit__(*args) + + return _Ctx() + + +class TestTranslatorSetup: + @pytest.mark.timeout(60) + def test_translator_created_when_llm_enabled(self, qtbot): + with _model_loaded_ctx(qtbot, enable_llm_translation=True) as widget, \ + patch("buzz.widgets.recording_transcriber_widget.Translator") as MockTranslator, \ + patch("buzz.widgets.recording_transcriber_widget.RecordingTranscriber"), \ + patch("buzz.widgets.recording_transcriber_widget.QThread"): + widget.on_model_loaded("/fake/model/path") + MockTranslator.assert_called_once() + + @pytest.mark.timeout(60) + def test_translator_not_created_when_llm_disabled(self, qtbot): + with _model_loaded_ctx(qtbot, enable_llm_translation=False) as widget, \ + patch("buzz.widgets.recording_transcriber_widget.Translator") as MockTranslator, \ + patch("buzz.widgets.recording_transcriber_widget.RecordingTranscriber"), \ + patch("buzz.widgets.recording_transcriber_widget.QThread"): + widget.on_model_loaded("/fake/model/path") + MockTranslator.assert_not_called() + + @pytest.mark.timeout(60) + def test_translator_translation_signal_connected_to_on_next_translation(self, qtbot): + with _model_loaded_ctx(qtbot, enable_llm_translation=True) as widget, \ + patch("buzz.widgets.recording_transcriber_widget.Translator") as MockTranslator, \ + patch("buzz.widgets.recording_transcriber_widget.RecordingTranscriber"), \ + patch("buzz.widgets.recording_transcriber_widget.QThread"): + mock_translator_instance = MagicMock() + MockTranslator.return_value = mock_translator_instance + widget.on_model_loaded("/fake/model/path") + mock_translator_instance.translation.connect.assert_called_with(widget.on_next_translation) + + class TestOnDeviceChanged: @pytest.mark.timeout(60) def test_no_new_listener_started_when_device_is_none(self, qtbot): @@ -1040,6 +1090,107 @@ class TestOnNextTranscriptionExport: finally: os.unlink(export_path) + @pytest.mark.timeout(60) + def test_append_above_csv_prepends_new_column(self, qtbot): + import csv + with _widget_ctx(qtbot) as widget, tempfile.NamedTemporaryFile( + suffix=".csv", delete=False, mode="w" + ) as f: + export_path = f.name + + try: + widget.transcriber_mode = RecordingTranscriberMode.APPEND_ABOVE + widget.export_enabled = True + widget.transcript_export_file = export_path + widget.settings.set_value( + Settings.Key.RECORDING_TRANSCRIBER_EXPORT_FILE_TYPE, "csv" + ) + widget.on_next_transcription("first") + widget.on_next_transcription("second") + + with open(export_path, newline="") as f: + rows = list(csv.reader(f)) + assert len(rows) == 1 + assert rows[0][0] == "second" + assert rows[0][1] == "first" + finally: + os.unlink(export_path) + + @pytest.mark.timeout(60) + def test_append_above_csv_respects_max_entries(self, qtbot): + import csv + with _widget_ctx(qtbot) as widget, tempfile.NamedTemporaryFile( + suffix=".csv", delete=False, mode="w" + ) as f: + export_path = f.name + + try: + widget.transcriber_mode = RecordingTranscriberMode.APPEND_ABOVE + widget.export_enabled = True + widget.transcript_export_file = export_path + widget.settings.set_value( + Settings.Key.RECORDING_TRANSCRIBER_EXPORT_FILE_TYPE, "csv" + ) + widget.settings.set_value( + Settings.Key.RECORDING_TRANSCRIBER_EXPORT_MAX_ENTRIES, 2 + ) + widget.on_next_transcription("first") + widget.on_next_transcription("second") + widget.on_next_transcription("third") + + with open(export_path, newline="") as f: + rows = list(csv.reader(f)) + assert len(rows) == 1 + assert len(rows[0]) == 2 + assert rows[0][0] == "third" + assert rows[0][1] == "second" + finally: + os.unlink(export_path) + + + +class TestUploadToServer: + @pytest.mark.timeout(60) + def test_transcript_uploaded_when_upload_url_set(self, qtbot): + with _widget_ctx(qtbot) as widget, \ + patch("buzz.widgets.recording_transcriber_widget.requests.post") as mock_post: + widget.upload_url = "http://example.com/upload" + widget.on_next_transcription("hello upload") + mock_post.assert_called_once_with( + url="http://example.com/upload", + json={"kind": "transcript", "text": "hello upload"}, + headers={"Content-Type": "application/json"}, + timeout=15, + ) + + @pytest.mark.timeout(60) + def test_transcript_not_uploaded_when_upload_url_empty(self, qtbot): + with _widget_ctx(qtbot) as widget, \ + patch("buzz.widgets.recording_transcriber_widget.requests.post") as mock_post: + widget.upload_url = "" + widget.on_next_transcription("no upload") + mock_post.assert_not_called() + + @pytest.mark.timeout(60) + def test_transcript_upload_failure_does_not_raise(self, qtbot): + with _widget_ctx(qtbot) as widget, \ + patch("buzz.widgets.recording_transcriber_widget.requests.post", + side_effect=Exception("connection error")): + widget.upload_url = "http://example.com/upload" + widget.on_next_transcription("hello") # should not raise + + @pytest.mark.timeout(60) + def test_translation_uploaded_when_upload_url_set(self, qtbot): + with _widget_ctx(qtbot) as widget, \ + patch("buzz.widgets.recording_transcriber_widget.requests.post") as mock_post: + widget.upload_url = "http://example.com/upload" + widget.on_next_translation("bonjour") + mock_post.assert_called_once_with( + url="http://example.com/upload", + json={"kind": "translation", "text": "bonjour"}, + headers={"Content-Type": "application/json"}, + timeout=15, + ) class TestOnNextTranslation: @@ -1176,6 +1327,131 @@ class TestExportFileHelpers: assert result == "" +class TestWriteCsvExport: + def test_creates_csv_with_single_entry(self, tmp_path): + path = str(tmp_path / "out.csv") + RecordingTranscriberWidget.write_csv_export(path, "hello", 0) + with open(path, encoding="utf-8-sig") as f: + content = f.read() + assert "hello" in content + + def test_appends_column_to_existing_csv(self, tmp_path): + import csv + path = str(tmp_path / "out.csv") + RecordingTranscriberWidget.write_csv_export(path, "first", 0) + RecordingTranscriberWidget.write_csv_export(path, "second", 0) + with open(path, encoding="utf-8-sig") as f: + rows = list(csv.reader(f)) + assert rows[0] == ["first", "second"] + + def test_max_entries_limits_columns(self, tmp_path): + import csv + path = str(tmp_path / "out.csv") + for word in ["a", "b", "c", "d"]: + RecordingTranscriberWidget.write_csv_export(path, word, max_entries=3) + with open(path, encoding="utf-8-sig") as f: + rows = list(csv.reader(f)) + assert rows[0] == ["b", "c", "d"] + + def test_max_entries_zero_means_no_limit(self, tmp_path): + import csv + path = str(tmp_path / "out.csv") + for i in range(10): + RecordingTranscriberWidget.write_csv_export(path, str(i), max_entries=0) + with open(path, encoding="utf-8-sig") as f: + rows = list(csv.reader(f)) + assert len(rows[0]) == 10 + + def test_handles_empty_existing_file(self, tmp_path): + import csv + path = str(tmp_path / "out.csv") + with open(path, "w") as f: + f.write("") + RecordingTranscriberWidget.write_csv_export(path, "entry", 0) + with open(path, encoding="utf-8-sig") as f: + rows = list(csv.reader(f)) + assert rows[0] == ["entry"] + + def test_retries_on_permission_error(self, tmp_path): + path = str(tmp_path / "out.csv") + call_count = [0] + original_open = open + + def flaky_open(p, mode="r", **kwargs): + if p == path and "w" in mode: + call_count[0] += 1 + if call_count[0] < 3: + raise PermissionError("locked") + return original_open(p, mode, **kwargs) + + with patch("builtins.open", side_effect=flaky_open), \ + patch("time.sleep"): + RecordingTranscriberWidget.write_csv_export(path, "data", 0) + + assert call_count[0] == 3 + + +class TestWriteTxtExport: + def test_append_mode_adds_text(self, tmp_path): + path = str(tmp_path / "out.txt") + RecordingTranscriberWidget.write_txt_export(path, "line1", "a", 0, "\n") + RecordingTranscriberWidget.write_txt_export(path, "line2", "a", 0, "\n") + with open(path) as f: + content = f.read() + assert content == "line1\nline2\n" + + def test_append_mode_max_entries_trims_oldest(self, tmp_path): + path = str(tmp_path / "out.txt") + for word in ["a", "b", "c", "d"]: + RecordingTranscriberWidget.write_txt_export(path, word, "a", max_entries=3, line_separator="\n") + with open(path) as f: + content = f.read() + parts = [p for p in content.split("\n") if p] + assert parts == ["b", "c", "d"] + + def test_prepend_mode_puts_text_first(self, tmp_path): + path = str(tmp_path / "out.txt") + RecordingTranscriberWidget.write_txt_export(path, "first", "a", 0, "\n") + RecordingTranscriberWidget.write_txt_export(path, "second", "prepend", 0, "\n") + with open(path) as f: + content = f.read() + parts = [p for p in content.split("\n") if p] + assert parts[0] == "second" + assert parts[1] == "first" + + def test_prepend_mode_max_entries_trims_oldest(self, tmp_path): + path = str(tmp_path / "out.txt") + RecordingTranscriberWidget.write_txt_export(path, "old1", "a", 0, "\n") + RecordingTranscriberWidget.write_txt_export(path, "old2", "a", 0, "\n") + RecordingTranscriberWidget.write_txt_export(path, "new", "prepend", max_entries=2, line_separator="\n") + with open(path) as f: + content = f.read() + parts = [p for p in content.split("\n") if p] + assert parts == ["new", "old1"] + + def test_write_mode_overwrites(self, tmp_path): + path = str(tmp_path / "out.txt") + RecordingTranscriberWidget.write_txt_export(path, "old", "w", 0, "\n") + RecordingTranscriberWidget.write_txt_export(path, "new", "w", 0, "\n") + with open(path) as f: + content = f.read() + assert content == "new" + + def test_prepend_on_nonexistent_file(self, tmp_path): + path = str(tmp_path / "out.txt") + RecordingTranscriberWidget.write_txt_export(path, "only", "prepend", 0, "\n") + with open(path) as f: + content = f.read() + assert "only" in content + + def test_append_max_entries_zero_means_no_limit(self, tmp_path): + path = str(tmp_path / "out.txt") + for i in range(10): + RecordingTranscriberWidget.write_txt_export(path, str(i), "a", max_entries=0, line_separator="\n") + with open(path) as f: + parts = [p for p in f.read().split("\n") if p] + assert len(parts) == 10 + class TestPresentationTranslationSync: @pytest.mark.timeout(60)