mirror of
https://github.com/abraunegg/onedrive
synced 2026-03-14 14:35:46 +01:00
Update PR
* Update PR
This commit is contained in:
parent
4ceec373e9
commit
c71469c792
15 changed files with 751 additions and 745 deletions
|
|
@ -1,73 +1,141 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
from framework.base import E2ETestCase
|
||||
from framework.context import E2EContext
|
||||
from framework.manifest import build_manifest, write_manifest
|
||||
from framework.result import TestResult
|
||||
from testcases.wave1_common import Wave1TestCaseBase
|
||||
from framework.utils import command_to_string, reset_directory, run_command, write_text_file
|
||||
|
||||
|
||||
class TestCase0003DryRunValidation(Wave1TestCaseBase):
|
||||
class TestCase0003DryRunValidation(E2ETestCase):
|
||||
case_id = "0003"
|
||||
name = "dry-run validation"
|
||||
description = "Validate that --dry-run performs no local or remote changes"
|
||||
description = "Validate that --dry-run performs no changes locally or remotely"
|
||||
|
||||
def run(self, context):
|
||||
case_work_dir, case_log_dir, case_state_dir = self._initialise_case_dirs(context)
|
||||
def _root_name(self, context: E2EContext) -> str:
|
||||
return f"ZZ_E2E_TC0003_{context.run_id}_{os.getpid()}"
|
||||
|
||||
def _write_config(self, config_path: Path) -> None:
|
||||
write_text_file(config_path, "# tc0003 config\nbypass_data_preservation = \"true\"\n")
|
||||
|
||||
def _bootstrap_confdir(self, context: E2EContext, confdir: Path) -> Path:
|
||||
copied_refresh_token = context.bootstrap_config_dir(confdir)
|
||||
self._write_config(confdir / "config")
|
||||
return copied_refresh_token
|
||||
|
||||
def _create_local_fixture(self, sync_root: Path, root_name: str) -> None:
|
||||
reset_directory(sync_root)
|
||||
write_text_file(sync_root / root_name / "Upload" / "file1.txt", "tc0003 file1\n")
|
||||
write_text_file(sync_root / root_name / "Upload" / "file2.bin", "tc0003 file2\n")
|
||||
write_text_file(sync_root / root_name / "Notes" / "draft.md", "# tc0003\n")
|
||||
|
||||
def run(self, context: E2EContext) -> TestResult:
|
||||
case_work_dir = context.work_root / "tc0003"
|
||||
case_log_dir = context.logs_dir / "tc0003"
|
||||
state_dir = context.state_dir / "tc0003"
|
||||
reset_directory(case_work_dir)
|
||||
reset_directory(case_log_dir)
|
||||
reset_directory(state_dir)
|
||||
context.ensure_refresh_token_available()
|
||||
|
||||
sync_root = case_work_dir / "syncroot"
|
||||
seed_confdir = case_work_dir / "conf-seed"
|
||||
verify_root = case_work_dir / "verifyroot"
|
||||
verify_confdir = case_work_dir / "conf-verify"
|
||||
root_name = self._root_name(context)
|
||||
artifacts = []
|
||||
self._create_local_fixture(sync_root, root_name)
|
||||
copied_refresh_token = self._bootstrap_confdir(context, seed_confdir)
|
||||
self._bootstrap_confdir(context, verify_confdir)
|
||||
|
||||
seed_root = case_work_dir / "seed-syncroot"
|
||||
seed_root.mkdir(parents=True, exist_ok=True)
|
||||
self._create_text_file(seed_root / root_name / "Remote" / "online.txt", "online baseline\n")
|
||||
self._create_text_file(seed_root / root_name / "Remote" / "keep.txt", "keep baseline\n")
|
||||
self._create_binary_file(seed_root / root_name / "Data" / "payload.bin", 64 * 1024)
|
||||
before_manifest = build_manifest(sync_root)
|
||||
before_manifest_file = state_dir / "before_manifest.txt"
|
||||
after_manifest_file = state_dir / "after_manifest.txt"
|
||||
remote_manifest_file = state_dir / "remote_verify_manifest.txt"
|
||||
metadata_file = state_dir / "metadata.txt"
|
||||
stdout_file = case_log_dir / "seed_stdout.log"
|
||||
stderr_file = case_log_dir / "seed_stderr.log"
|
||||
verify_stdout = case_log_dir / "verify_stdout.log"
|
||||
verify_stderr = case_log_dir / "verify_stderr.log"
|
||||
write_manifest(before_manifest_file, before_manifest)
|
||||
|
||||
seed_config_dir = self._new_config_dir(context, case_work_dir, "seed")
|
||||
config_path = self._write_config(seed_config_dir)
|
||||
artifacts.append(str(config_path))
|
||||
seed_result = self._run_onedrive(context, sync_root=seed_root, config_dir=seed_config_dir)
|
||||
artifacts.extend(self._write_command_artifacts(result=seed_result, log_dir=case_log_dir, state_dir=case_state_dir, phase_name="seed"))
|
||||
artifacts.extend(self._write_manifests(seed_root, case_state_dir, "seed_local"))
|
||||
if seed_result.returncode != 0:
|
||||
return TestResult.fail_result(self.case_id, self.name, f"Remote seed failed with status {seed_result.returncode}", artifacts, {"phase": "seed"})
|
||||
command = [
|
||||
context.onedrive_bin,
|
||||
"--sync",
|
||||
"--verbose",
|
||||
"--dry-run",
|
||||
"--resync",
|
||||
"--resync-auth",
|
||||
"--syncdir",
|
||||
str(sync_root),
|
||||
"--confdir",
|
||||
str(seed_confdir),
|
||||
]
|
||||
context.log(f"Executing Test Case {self.case_id}: {command_to_string(command)}")
|
||||
result = run_command(command, cwd=context.repo_root)
|
||||
write_text_file(stdout_file, result.stdout)
|
||||
write_text_file(stderr_file, result.stderr)
|
||||
|
||||
dry_root = case_work_dir / "dryrun-syncroot"
|
||||
dry_root.mkdir(parents=True, exist_ok=True)
|
||||
self._create_text_file(dry_root / root_name / "LocalOnly" / "draft.txt", "local only\n")
|
||||
self._create_text_file(dry_root / root_name / "Remote" / "keep.txt", "locally modified but should not upload\n")
|
||||
pre_snapshot = self._snapshot_files(dry_root)
|
||||
artifacts.append(self._write_json_artifact(case_state_dir / "pre_snapshot.json", pre_snapshot))
|
||||
after_manifest = build_manifest(sync_root)
|
||||
write_manifest(after_manifest_file, after_manifest)
|
||||
|
||||
dry_config_dir = self._new_config_dir(context, case_work_dir, "dryrun")
|
||||
config_path = self._write_config(dry_config_dir)
|
||||
artifacts.append(str(config_path))
|
||||
dry_result = self._run_onedrive(context, sync_root=dry_root, config_dir=dry_config_dir, extra_args=["--dry-run"])
|
||||
artifacts.extend(self._write_command_artifacts(result=dry_result, log_dir=case_log_dir, state_dir=case_state_dir, phase_name="dry_run"))
|
||||
post_snapshot = self._snapshot_files(dry_root)
|
||||
artifacts.append(self._write_json_artifact(case_state_dir / "post_snapshot.json", post_snapshot))
|
||||
verify_command = [
|
||||
context.onedrive_bin,
|
||||
"--sync",
|
||||
"--verbose",
|
||||
"--download-only",
|
||||
"--resync",
|
||||
"--resync-auth",
|
||||
"--syncdir",
|
||||
str(verify_root),
|
||||
"--confdir",
|
||||
str(verify_confdir),
|
||||
]
|
||||
verify_result = run_command(verify_command, cwd=context.repo_root)
|
||||
write_text_file(verify_stdout, verify_result.stdout)
|
||||
write_text_file(verify_stderr, verify_result.stderr)
|
||||
remote_manifest = build_manifest(verify_root)
|
||||
write_manifest(remote_manifest_file, remote_manifest)
|
||||
|
||||
if dry_result.returncode != 0:
|
||||
return TestResult.fail_result(self.case_id, self.name, f"Dry-run exited with status {dry_result.returncode}", artifacts, {"phase": "dry-run"})
|
||||
if pre_snapshot != post_snapshot:
|
||||
return TestResult.fail_result(self.case_id, self.name, "Local filesystem changed during --dry-run", artifacts, {"phase": "dry-run"})
|
||||
metadata_lines = [
|
||||
f"case_id={self.case_id}",
|
||||
f"name={self.name}",
|
||||
f"root_name={root_name}",
|
||||
f"copied_refresh_token={copied_refresh_token}",
|
||||
f"command={command_to_string(command)}",
|
||||
f"returncode={result.returncode}",
|
||||
f"verify_command={command_to_string(verify_command)}",
|
||||
f"verify_returncode={verify_result.returncode}",
|
||||
]
|
||||
write_text_file(metadata_file, "\n".join(metadata_lines) + "\n")
|
||||
|
||||
verify_root, verify_result, verify_artifacts = self._download_remote_scope(context, case_work_dir, root_name, "remote")
|
||||
artifacts.extend(verify_artifacts)
|
||||
artifacts.extend(self._write_command_artifacts(result=verify_result, log_dir=case_log_dir, state_dir=case_state_dir, phase_name="verify_remote"))
|
||||
artifacts.extend(self._write_manifests(verify_root, case_state_dir, "verify_remote"))
|
||||
if verify_result.returncode != 0:
|
||||
return TestResult.fail_result(self.case_id, self.name, f"Remote verification download failed with status {verify_result.returncode}", artifacts)
|
||||
|
||||
downloaded = set(self._snapshot_files(verify_root).keys())
|
||||
expected_present = {
|
||||
f"{root_name}/Remote",
|
||||
f"{root_name}/Remote/online.txt",
|
||||
f"{root_name}/Remote/keep.txt",
|
||||
f"{root_name}/Data",
|
||||
f"{root_name}/Data/payload.bin",
|
||||
artifacts = [
|
||||
str(stdout_file),
|
||||
str(stderr_file),
|
||||
str(verify_stdout),
|
||||
str(verify_stderr),
|
||||
str(before_manifest_file),
|
||||
str(after_manifest_file),
|
||||
str(remote_manifest_file),
|
||||
str(metadata_file),
|
||||
]
|
||||
details = {
|
||||
"command": command,
|
||||
"returncode": result.returncode,
|
||||
"verify_command": verify_command,
|
||||
"verify_returncode": verify_result.returncode,
|
||||
"root_name": root_name,
|
||||
}
|
||||
unexpected_absent = sorted(expected_present - downloaded)
|
||||
if unexpected_absent:
|
||||
return TestResult.fail_result(self.case_id, self.name, "Remote baseline changed after --dry-run", artifacts, {"missing": unexpected_absent})
|
||||
if f"{root_name}/LocalOnly/draft.txt" in downloaded:
|
||||
return TestResult.fail_result(self.case_id, self.name, "Local-only file was uploaded during --dry-run", artifacts)
|
||||
|
||||
return TestResult.pass_result(self.case_id, self.name, artifacts, {"root_name": root_name})
|
||||
if result.returncode != 0:
|
||||
return TestResult.fail_result(self.case_id, self.name, f"Remote seed failed with status {result.returncode}", artifacts, details)
|
||||
if verify_result.returncode != 0:
|
||||
return TestResult.fail_result(self.case_id, self.name, f"Remote verification failed with status {verify_result.returncode}", artifacts, details)
|
||||
if before_manifest != after_manifest:
|
||||
return TestResult.fail_result(self.case_id, self.name, "Local filesystem changed during --dry-run", artifacts, details)
|
||||
if any(entry == root_name or entry.startswith(root_name + "/") for entry in remote_manifest):
|
||||
return TestResult.fail_result(self.case_id, self.name, f"Dry-run unexpectedly synchronised remote content: {root_name}", artifacts, details)
|
||||
|
||||
return TestResult.pass_result(self.case_id, self.name, artifacts, details)
|
||||
|
|
|
|||
|
|
@ -1,52 +1,129 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
from framework.base import E2ETestCase
|
||||
from framework.context import E2EContext
|
||||
from framework.manifest import build_manifest, write_manifest
|
||||
from framework.result import TestResult
|
||||
from testcases.wave1_common import Wave1TestCaseBase
|
||||
from framework.utils import command_to_string, reset_directory, run_command, write_text_file
|
||||
|
||||
|
||||
class TestCase0004SingleDirectorySync(Wave1TestCaseBase):
|
||||
class TestCase0004SingleDirectorySync(E2ETestCase):
|
||||
case_id = "0004"
|
||||
name = "single-directory synchronisation"
|
||||
description = "Validate that only the nominated subtree is synchronised"
|
||||
|
||||
def run(self, context):
|
||||
case_work_dir, case_log_dir, case_state_dir = self._initialise_case_dirs(context)
|
||||
root_name = self._root_name(context)
|
||||
artifacts = []
|
||||
def _write_config(self, config_path: Path) -> None:
|
||||
write_text_file(config_path, "# tc0004 config\nbypass_data_preservation = \"true\"\n")
|
||||
|
||||
def run(self, context: E2EContext) -> TestResult:
|
||||
case_work_dir = context.work_root / "tc0004"
|
||||
case_log_dir = context.logs_dir / "tc0004"
|
||||
state_dir = context.state_dir / "tc0004"
|
||||
reset_directory(case_work_dir)
|
||||
reset_directory(case_log_dir)
|
||||
reset_directory(state_dir)
|
||||
context.ensure_refresh_token_available()
|
||||
|
||||
sync_root = case_work_dir / "syncroot"
|
||||
sync_root.mkdir(parents=True, exist_ok=True)
|
||||
self._create_text_file(sync_root / root_name / "Scoped" / "include.txt", "scoped file\n")
|
||||
self._create_text_file(sync_root / root_name / "Scoped" / "Nested" / "deep.txt", "nested scoped\n")
|
||||
self._create_text_file(sync_root / root_name / "Unscoped" / "exclude.txt", "should stay local only\n")
|
||||
confdir = case_work_dir / "conf-main"
|
||||
verify_root = case_work_dir / "verifyroot"
|
||||
verify_confdir = case_work_dir / "conf-verify"
|
||||
|
||||
config_dir = self._new_config_dir(context, case_work_dir, "main")
|
||||
config_path = self._write_config(config_dir)
|
||||
artifacts.append(str(config_path))
|
||||
result = self._run_onedrive(context, sync_root=sync_root, config_dir=config_dir, extra_args=["--single-directory", f"{root_name}/Scoped"])
|
||||
artifacts.extend(self._write_command_artifacts(result=result, log_dir=case_log_dir, state_dir=case_state_dir, phase_name="single_directory"))
|
||||
artifacts.extend(self._write_manifests(sync_root, case_state_dir, "local_after"))
|
||||
if result.returncode != 0:
|
||||
return TestResult.fail_result(self.case_id, self.name, f"--single-directory sync failed with status {result.returncode}", artifacts)
|
||||
target_dir = f"ZZ_E2E_TC0004_TARGET_{context.run_id}_{os.getpid()}"
|
||||
other_dir = f"ZZ_E2E_TC0004_OTHER_{context.run_id}_{os.getpid()}"
|
||||
|
||||
verify_root, verify_result, verify_artifacts = self._download_remote_scope(context, case_work_dir, root_name, "remote")
|
||||
artifacts.extend(verify_artifacts)
|
||||
artifacts.extend(self._write_command_artifacts(result=verify_result, log_dir=case_log_dir, state_dir=case_state_dir, phase_name="verify_remote"))
|
||||
artifacts.extend(self._write_manifests(verify_root, case_state_dir, "remote_manifest"))
|
||||
if verify_result.returncode != 0:
|
||||
return TestResult.fail_result(self.case_id, self.name, f"Remote verification failed with status {verify_result.returncode}", artifacts)
|
||||
write_text_file(sync_root / target_dir / "keep.txt", "target\n")
|
||||
write_text_file(sync_root / target_dir / "nested" / "inside.md", "nested\n")
|
||||
write_text_file(sync_root / other_dir / "skip.txt", "other\n")
|
||||
|
||||
snapshot = self._snapshot_files(verify_root)
|
||||
required = {
|
||||
f"{root_name}/Scoped",
|
||||
f"{root_name}/Scoped/include.txt",
|
||||
f"{root_name}/Scoped/Nested",
|
||||
f"{root_name}/Scoped/Nested/deep.txt",
|
||||
context.bootstrap_config_dir(confdir)
|
||||
self._write_config(confdir / "config")
|
||||
context.bootstrap_config_dir(verify_confdir)
|
||||
self._write_config(verify_confdir / "config")
|
||||
|
||||
stdout_file = case_log_dir / "single_directory_stdout.log"
|
||||
stderr_file = case_log_dir / "single_directory_stderr.log"
|
||||
verify_stdout = case_log_dir / "verify_stdout.log"
|
||||
verify_stderr = case_log_dir / "verify_stderr.log"
|
||||
local_manifest_file = state_dir / "local_after_manifest.txt"
|
||||
remote_manifest_file = state_dir / "remote_verify_manifest.txt"
|
||||
metadata_file = state_dir / "single_directory_metadata.txt"
|
||||
|
||||
command = [
|
||||
context.onedrive_bin,
|
||||
"--sync",
|
||||
"--verbose",
|
||||
"--resync",
|
||||
"--resync-auth",
|
||||
"--single-directory",
|
||||
target_dir,
|
||||
"--syncdir",
|
||||
str(sync_root),
|
||||
"--confdir",
|
||||
str(confdir),
|
||||
]
|
||||
context.log(f"Executing Test Case {self.case_id}: {command_to_string(command)}")
|
||||
result = run_command(command, cwd=context.repo_root)
|
||||
write_text_file(stdout_file, result.stdout)
|
||||
write_text_file(stderr_file, result.stderr)
|
||||
write_manifest(local_manifest_file, build_manifest(sync_root))
|
||||
|
||||
verify_command = [
|
||||
context.onedrive_bin,
|
||||
"--sync",
|
||||
"--verbose",
|
||||
"--download-only",
|
||||
"--resync",
|
||||
"--resync-auth",
|
||||
"--syncdir",
|
||||
str(verify_root),
|
||||
"--confdir",
|
||||
str(verify_confdir),
|
||||
]
|
||||
verify_result = run_command(verify_command, cwd=context.repo_root)
|
||||
write_text_file(verify_stdout, verify_result.stdout)
|
||||
write_text_file(verify_stderr, verify_result.stderr)
|
||||
remote_manifest = build_manifest(verify_root)
|
||||
write_manifest(remote_manifest_file, remote_manifest)
|
||||
|
||||
metadata = [
|
||||
f"case_id={self.case_id}",
|
||||
f"target_dir={target_dir}",
|
||||
f"other_dir={other_dir}",
|
||||
f"command={command_to_string(command)}",
|
||||
f"returncode={result.returncode}",
|
||||
f"verify_command={command_to_string(verify_command)}",
|
||||
f"verify_returncode={verify_result.returncode}",
|
||||
]
|
||||
write_text_file(metadata_file, "\n".join(metadata) + "\n")
|
||||
|
||||
artifacts = [
|
||||
str(stdout_file),
|
||||
str(stderr_file),
|
||||
str(verify_stdout),
|
||||
str(verify_stderr),
|
||||
str(local_manifest_file),
|
||||
str(remote_manifest_file),
|
||||
str(metadata_file),
|
||||
]
|
||||
details = {
|
||||
"command": command,
|
||||
"returncode": result.returncode,
|
||||
"verify_returncode": verify_result.returncode,
|
||||
"target_dir": target_dir,
|
||||
"other_dir": other_dir,
|
||||
}
|
||||
missing = sorted(required - set(snapshot.keys()))
|
||||
if missing:
|
||||
return TestResult.fail_result(self.case_id, self.name, "Scoped content was not uploaded as expected", artifacts, {"missing": missing})
|
||||
if f"{root_name}/Unscoped/exclude.txt" in snapshot:
|
||||
return TestResult.fail_result(self.case_id, self.name, "Unscoped content was unexpectedly synchronised", artifacts)
|
||||
|
||||
return TestResult.pass_result(self.case_id, self.name, artifacts, {"root_name": root_name})
|
||||
if result.returncode != 0:
|
||||
return TestResult.fail_result(self.case_id, self.name, f"--single-directory sync failed with status {result.returncode}", artifacts, details)
|
||||
if verify_result.returncode != 0:
|
||||
return TestResult.fail_result(self.case_id, self.name, f"Remote verification failed with status {verify_result.returncode}", artifacts, details)
|
||||
if not any(e == target_dir or e.startswith(target_dir + "/") for e in remote_manifest):
|
||||
return TestResult.fail_result(self.case_id, self.name, f"Target directory was not synchronised: {target_dir}", artifacts, details)
|
||||
if any(e == other_dir or e.startswith(other_dir + "/") for e in remote_manifest):
|
||||
return TestResult.fail_result(self.case_id, self.name, f"Non-target directory was unexpectedly synchronised: {other_dir}", artifacts, details)
|
||||
|
||||
return TestResult.pass_result(self.case_id, self.name, artifacts, details)
|
||||
|
|
|
|||
|
|
@ -1,58 +1,103 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
from framework.base import E2ETestCase
|
||||
from framework.context import E2EContext
|
||||
from framework.manifest import build_manifest, write_manifest
|
||||
from framework.result import TestResult
|
||||
from testcases.wave1_common import Wave1TestCaseBase
|
||||
from framework.utils import command_to_string, reset_directory, run_command, write_text_file
|
||||
|
||||
|
||||
class TestCase0005ForceSyncOverride(Wave1TestCaseBase):
|
||||
class TestCase0005ForceSyncOverride(E2ETestCase):
|
||||
case_id = "0005"
|
||||
name = "force-sync override"
|
||||
description = "Validate that --force-sync overrides skip_dir when using --single-directory"
|
||||
description = "Validate that --force-sync overrides skip_dir for blocked single-directory sync"
|
||||
|
||||
def run(self, context):
|
||||
case_work_dir, case_log_dir, case_state_dir = self._initialise_case_dirs(context)
|
||||
root_name = self._root_name(context)
|
||||
artifacts = []
|
||||
def _write_config(self, config_path: Path, blocked_dir: str) -> None:
|
||||
write_text_file(config_path, f"# tc0005 config\nbypass_data_preservation = \"true\"\nskip_dir = \"{blocked_dir}\"\n")
|
||||
|
||||
seed_root = case_work_dir / "seed-syncroot"
|
||||
seed_root.mkdir(parents=True, exist_ok=True)
|
||||
self._create_text_file(seed_root / root_name / "Blocked" / "blocked.txt", "blocked remote file\n")
|
||||
seed_conf = self._new_config_dir(context, case_work_dir, "seed")
|
||||
config_path = self._write_config(seed_conf)
|
||||
artifacts.append(str(config_path))
|
||||
seed_result = self._run_onedrive(context, sync_root=seed_root, config_dir=seed_conf)
|
||||
artifacts.extend(self._write_command_artifacts(result=seed_result, log_dir=case_log_dir, state_dir=case_state_dir, phase_name="seed"))
|
||||
if seed_result.returncode != 0:
|
||||
return TestResult.fail_result(self.case_id, self.name, f"Remote seed failed with status {seed_result.returncode}", artifacts)
|
||||
def run(self, context: E2EContext) -> TestResult:
|
||||
case_work_dir = context.work_root / "tc0005"
|
||||
case_log_dir = context.logs_dir / "tc0005"
|
||||
state_dir = context.state_dir / "tc0005"
|
||||
reset_directory(case_work_dir)
|
||||
reset_directory(case_log_dir)
|
||||
reset_directory(state_dir)
|
||||
context.ensure_refresh_token_available()
|
||||
|
||||
no_force_root = case_work_dir / "no-force-syncroot"
|
||||
no_force_root.mkdir(parents=True, exist_ok=True)
|
||||
no_force_conf = self._new_config_dir(context, case_work_dir, "no-force")
|
||||
config_path = self._write_config(no_force_conf, extra_lines=['skip_dir = "Blocked"'])
|
||||
artifacts.append(str(config_path))
|
||||
no_force_result = self._run_onedrive(context, sync_root=no_force_root, config_dir=no_force_conf, extra_args=["--download-only", "--single-directory", f"{root_name}/Blocked"])
|
||||
artifacts.extend(self._write_command_artifacts(result=no_force_result, log_dir=case_log_dir, state_dir=case_state_dir, phase_name="no_force"))
|
||||
if no_force_result.returncode != 0:
|
||||
return TestResult.fail_result(self.case_id, self.name, f"Blocked single-directory sync without --force-sync failed with status {no_force_result.returncode}", artifacts)
|
||||
if (no_force_root / root_name / "Blocked" / "blocked.txt").exists():
|
||||
return TestResult.fail_result(self.case_id, self.name, "Blocked content was downloaded without --force-sync", artifacts)
|
||||
sync_root = case_work_dir / "syncroot"
|
||||
confdir = case_work_dir / "conf-seed"
|
||||
verify_root = case_work_dir / "verifyroot"
|
||||
verify_confdir = case_work_dir / "conf-verify"
|
||||
|
||||
force_root = case_work_dir / "force-syncroot"
|
||||
force_root.mkdir(parents=True, exist_ok=True)
|
||||
force_conf = self._new_config_dir(context, case_work_dir, "force")
|
||||
config_path = self._write_config(force_conf, extra_lines=['skip_dir = "Blocked"'])
|
||||
artifacts.append(str(config_path))
|
||||
force_result = self._run_onedrive(
|
||||
context,
|
||||
sync_root=force_root,
|
||||
config_dir=force_conf,
|
||||
extra_args=["--download-only", "--single-directory", f"{root_name}/Blocked", "--force-sync"],
|
||||
input_text="Y\n",
|
||||
)
|
||||
artifacts.extend(self._write_command_artifacts(result=force_result, log_dir=case_log_dir, state_dir=case_state_dir, phase_name="force"))
|
||||
artifacts.extend(self._write_manifests(force_root, case_state_dir, "force_manifest"))
|
||||
if force_result.returncode != 0:
|
||||
return TestResult.fail_result(self.case_id, self.name, f"Blocked single-directory sync with --force-sync failed with status {force_result.returncode}", artifacts)
|
||||
if not (force_root / root_name / "Blocked" / "blocked.txt").exists():
|
||||
return TestResult.fail_result(self.case_id, self.name, "Blocked content was not downloaded with --force-sync", artifacts)
|
||||
return TestResult.pass_result(self.case_id, self.name, artifacts, {"root_name": root_name})
|
||||
blocked_dir = f"ZZ_E2E_TC0005_BLOCKED_{context.run_id}_{os.getpid()}"
|
||||
write_text_file(sync_root / blocked_dir / "allowed_via_force.txt", "force\n")
|
||||
|
||||
context.bootstrap_config_dir(confdir)
|
||||
self._write_config(confdir / "config", blocked_dir)
|
||||
context.bootstrap_config_dir(verify_confdir)
|
||||
write_text_file(verify_confdir / "config", "# tc0005 verify\nbypass_data_preservation = \"true\"\n")
|
||||
|
||||
stdout_file = case_log_dir / "seed_stdout.log"
|
||||
stderr_file = case_log_dir / "seed_stderr.log"
|
||||
verify_stdout = case_log_dir / "verify_stdout.log"
|
||||
verify_stderr = case_log_dir / "verify_stderr.log"
|
||||
remote_manifest_file = state_dir / "remote_verify_manifest.txt"
|
||||
metadata_file = state_dir / "seed_metadata.txt"
|
||||
|
||||
command = [
|
||||
context.onedrive_bin,
|
||||
"--sync",
|
||||
"--verbose",
|
||||
"--resync",
|
||||
"--resync-auth",
|
||||
"--single-directory",
|
||||
blocked_dir,
|
||||
"--force-sync",
|
||||
"--syncdir",
|
||||
str(sync_root),
|
||||
"--confdir",
|
||||
str(confdir),
|
||||
]
|
||||
result = run_command(command, cwd=context.repo_root, input_text="Y\n")
|
||||
write_text_file(stdout_file, result.stdout)
|
||||
write_text_file(stderr_file, result.stderr)
|
||||
|
||||
verify_command = [
|
||||
context.onedrive_bin,
|
||||
"--sync",
|
||||
"--verbose",
|
||||
"--download-only",
|
||||
"--resync",
|
||||
"--resync-auth",
|
||||
"--syncdir",
|
||||
str(verify_root),
|
||||
"--confdir",
|
||||
str(verify_confdir),
|
||||
]
|
||||
verify_result = run_command(verify_command, cwd=context.repo_root)
|
||||
write_text_file(verify_stdout, verify_result.stdout)
|
||||
write_text_file(verify_stderr, verify_result.stderr)
|
||||
remote_manifest = build_manifest(verify_root)
|
||||
write_manifest(remote_manifest_file, remote_manifest)
|
||||
|
||||
write_text_file(metadata_file, "\n".join([
|
||||
f"blocked_dir={blocked_dir}",
|
||||
f"command={command_to_string(command)}",
|
||||
f"returncode={result.returncode}",
|
||||
f"verify_returncode={verify_result.returncode}",
|
||||
]) + "\n")
|
||||
|
||||
artifacts = [str(stdout_file), str(stderr_file), str(verify_stdout), str(verify_stderr), str(remote_manifest_file), str(metadata_file)]
|
||||
details = {"command": command, "returncode": result.returncode, "verify_returncode": verify_result.returncode, "blocked_dir": blocked_dir}
|
||||
|
||||
if result.returncode != 0:
|
||||
return TestResult.fail_result(self.case_id, self.name, f"Blocked single-directory sync with --force-sync failed with status {result.returncode}", artifacts, details)
|
||||
if verify_result.returncode != 0:
|
||||
return TestResult.fail_result(self.case_id, self.name, f"Remote verification failed with status {verify_result.returncode}", artifacts, details)
|
||||
if f"{blocked_dir}/allowed_via_force.txt" not in remote_manifest:
|
||||
return TestResult.fail_result(self.case_id, self.name, f"--force-sync did not synchronise blocked path: {blocked_dir}/allowed_via_force.txt", artifacts, details)
|
||||
|
||||
return TestResult.pass_result(self.case_id, self.name, artifacts, details)
|
||||
|
|
|
|||
|
|
@ -1,51 +1,43 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
from framework.base import E2ETestCase
|
||||
from framework.context import E2EContext
|
||||
from framework.manifest import build_manifest, write_manifest
|
||||
from framework.result import TestResult
|
||||
from testcases.wave1_common import Wave1TestCaseBase
|
||||
from framework.utils import command_to_string, reset_directory, run_command, write_text_file
|
||||
|
||||
|
||||
class TestCase0006DownloadOnly(Wave1TestCaseBase):
|
||||
class TestCase0006DownloadOnly(E2ETestCase):
|
||||
case_id = "0006"
|
||||
name = "download-only behaviour"
|
||||
description = "Validate that remote content downloads locally and local-only content is not uploaded"
|
||||
description = "Validate that download-only populates local content from remote data"
|
||||
|
||||
def run(self, context):
|
||||
case_work_dir, case_log_dir, case_state_dir = self._initialise_case_dirs(context)
|
||||
root_name = self._root_name(context)
|
||||
artifacts = []
|
||||
seed_root = case_work_dir / "seed-syncroot"
|
||||
seed_root.mkdir(parents=True, exist_ok=True)
|
||||
self._create_text_file(seed_root / root_name / "Remote" / "download_me.txt", "remote file\n")
|
||||
seed_conf = self._new_config_dir(context, case_work_dir, "seed")
|
||||
config_path = self._write_config(seed_conf)
|
||||
artifacts.append(str(config_path))
|
||||
seed_result = self._run_onedrive(context, sync_root=seed_root, config_dir=seed_conf)
|
||||
artifacts.extend(self._write_command_artifacts(result=seed_result, log_dir=case_log_dir, state_dir=case_state_dir, phase_name="seed"))
|
||||
if seed_result.returncode != 0:
|
||||
return TestResult.fail_result(self.case_id, self.name, f"Remote seed failed with status {seed_result.returncode}", artifacts)
|
||||
def _write_config(self, config_path: Path) -> None:
|
||||
write_text_file(config_path, "# tc0006 config\nbypass_data_preservation = \"true\"\n")
|
||||
|
||||
sync_root = case_work_dir / "download-syncroot"
|
||||
sync_root.mkdir(parents=True, exist_ok=True)
|
||||
self._create_text_file(sync_root / root_name / "LocalOnly" / "stay_local.txt", "must not upload\n")
|
||||
conf_dir = self._new_config_dir(context, case_work_dir, "download")
|
||||
config_path = self._write_config(conf_dir)
|
||||
artifacts.append(str(config_path))
|
||||
result = self._run_onedrive(context, sync_root=sync_root, config_dir=conf_dir, extra_args=["--download-only"])
|
||||
artifacts.extend(self._write_command_artifacts(result=result, log_dir=case_log_dir, state_dir=case_state_dir, phase_name="download_only"))
|
||||
artifacts.extend(self._write_manifests(sync_root, case_state_dir, "local_after"))
|
||||
if result.returncode != 0:
|
||||
return TestResult.fail_result(self.case_id, self.name, f"--download-only failed with status {result.returncode}", artifacts)
|
||||
if not (sync_root / root_name / "Remote" / "download_me.txt").exists():
|
||||
return TestResult.fail_result(self.case_id, self.name, "Remote file was not downloaded locally", artifacts)
|
||||
if not (sync_root / root_name / "LocalOnly" / "stay_local.txt").exists():
|
||||
return TestResult.fail_result(self.case_id, self.name, "Local-only file should remain present locally", artifacts)
|
||||
|
||||
verify_root, verify_result, verify_artifacts = self._download_remote_scope(context, case_work_dir, root_name, "verify_remote")
|
||||
artifacts.extend(verify_artifacts)
|
||||
artifacts.extend(self._write_command_artifacts(result=verify_result, log_dir=case_log_dir, state_dir=case_state_dir, phase_name="verify_remote"))
|
||||
if verify_result.returncode != 0:
|
||||
return TestResult.fail_result(self.case_id, self.name, f"Remote verification failed with status {verify_result.returncode}", artifacts)
|
||||
verify_snapshot = self._snapshot_files(verify_root)
|
||||
if f"{root_name}/LocalOnly/stay_local.txt" in verify_snapshot:
|
||||
return TestResult.fail_result(self.case_id, self.name, "Local-only file was uploaded during --download-only", artifacts)
|
||||
return TestResult.pass_result(self.case_id, self.name, artifacts, {"root_name": root_name})
|
||||
def run(self, context: E2EContext) -> TestResult:
|
||||
case_work_dir = context.work_root / "tc0006"; case_log_dir = context.logs_dir / "tc0006"; state_dir = context.state_dir / "tc0006"
|
||||
reset_directory(case_work_dir); reset_directory(case_log_dir); reset_directory(state_dir); context.ensure_refresh_token_available()
|
||||
seed_root = case_work_dir / "seedroot"; seed_conf = case_work_dir / "conf-seed"; download_root = case_work_dir / "downloadroot"; download_conf = case_work_dir / "conf-download"; root_name = f"ZZ_E2E_TC0006_{context.run_id}_{os.getpid()}"
|
||||
write_text_file(seed_root / root_name / "remote.txt", "remote\n"); write_text_file(seed_root / root_name / "subdir" / "nested.txt", "nested\n")
|
||||
context.bootstrap_config_dir(seed_conf); self._write_config(seed_conf / "config")
|
||||
context.bootstrap_config_dir(download_conf); self._write_config(download_conf / "config")
|
||||
seed_stdout = case_log_dir / "seed_stdout.log"; seed_stderr = case_log_dir / "seed_stderr.log"; dl_stdout = case_log_dir / "download_stdout.log"; dl_stderr = case_log_dir / "download_stderr.log"; local_manifest_file = state_dir / "download_manifest.txt"; metadata_file = state_dir / "seed_metadata.txt"
|
||||
seed_command = [context.onedrive_bin, "--sync", "--verbose", "--resync", "--resync-auth", "--syncdir", str(seed_root), "--confdir", str(seed_conf)]
|
||||
seed_result = run_command(seed_command, cwd=context.repo_root)
|
||||
write_text_file(seed_stdout, seed_result.stdout); write_text_file(seed_stderr, seed_result.stderr)
|
||||
download_command = [context.onedrive_bin, "--sync", "--verbose", "--download-only", "--resync", "--resync-auth", "--syncdir", str(download_root), "--confdir", str(download_conf)]
|
||||
download_result = run_command(download_command, cwd=context.repo_root)
|
||||
write_text_file(dl_stdout, download_result.stdout); write_text_file(dl_stderr, download_result.stderr); local_manifest = build_manifest(download_root); write_manifest(local_manifest_file, local_manifest)
|
||||
write_text_file(metadata_file, "\n".join([f"root_name={root_name}", f"seed_command={command_to_string(seed_command)}", f"seed_returncode={seed_result.returncode}", f"download_command={command_to_string(download_command)}", f"download_returncode={download_result.returncode}"]) + "\n")
|
||||
artifacts = [str(seed_stdout), str(seed_stderr), str(dl_stdout), str(dl_stderr), str(local_manifest_file), str(metadata_file)]
|
||||
details = {"seed_returncode": seed_result.returncode, "download_returncode": download_result.returncode, "root_name": root_name}
|
||||
if seed_result.returncode != 0: return TestResult.fail_result(self.case_id, self.name, f"Remote seed failed with status {seed_result.returncode}", artifacts, details)
|
||||
if download_result.returncode != 0: return TestResult.fail_result(self.case_id, self.name, f"--download-only failed with status {download_result.returncode}", artifacts, details)
|
||||
wanted = [root_name, f"{root_name}/remote.txt", f"{root_name}/subdir", f"{root_name}/subdir/nested.txt"]
|
||||
missing = [w for w in wanted if w not in local_manifest]
|
||||
if missing: return TestResult.fail_result(self.case_id, self.name, "Downloaded manifest missing expected content: " + ", ".join(missing), artifacts, details)
|
||||
return TestResult.pass_result(self.case_id, self.name, artifacts, details)
|
||||
|
|
|
|||
|
|
@ -1,48 +1,43 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
from framework.base import E2ETestCase
|
||||
from framework.context import E2EContext
|
||||
from framework.manifest import build_manifest, write_manifest
|
||||
from framework.result import TestResult
|
||||
from testcases.wave1_common import Wave1TestCaseBase
|
||||
from framework.utils import command_to_string, reset_directory, run_command, write_text_file
|
||||
|
||||
|
||||
class TestCase0007DownloadOnlyCleanupLocalFiles(Wave1TestCaseBase):
|
||||
class TestCase0007DownloadOnlyCleanupLocalFiles(E2ETestCase):
|
||||
case_id = "0007"
|
||||
name = "download-only cleanup-local-files"
|
||||
description = "Validate that stale local files are removed when cleanup_local_files is enabled"
|
||||
description = "Validate that cleanup_local_files removes stale local content in download-only mode"
|
||||
|
||||
def run(self, context):
|
||||
case_work_dir, case_log_dir, case_state_dir = self._initialise_case_dirs(context)
|
||||
root_name = self._root_name(context)
|
||||
artifacts = []
|
||||
seed_root = case_work_dir / "seed-syncroot"
|
||||
seed_root.mkdir(parents=True, exist_ok=True)
|
||||
self._create_text_file(seed_root / root_name / "Keep" / "keep.txt", "keep\n")
|
||||
seed_conf = self._new_config_dir(context, case_work_dir, "seed")
|
||||
config_path = self._write_config(seed_conf)
|
||||
artifacts.append(str(config_path))
|
||||
seed_result = self._run_onedrive(context, sync_root=seed_root, config_dir=seed_conf)
|
||||
artifacts.extend(self._write_command_artifacts(result=seed_result, log_dir=case_log_dir, state_dir=case_state_dir, phase_name="seed"))
|
||||
if seed_result.returncode != 0:
|
||||
return TestResult.fail_result(self.case_id, self.name, f"Remote seed failed with status {seed_result.returncode}", artifacts)
|
||||
def _write_config(self, config_path: Path) -> None:
|
||||
write_text_file(config_path, "# tc0007 config\nbypass_data_preservation = \"true\"\n")
|
||||
|
||||
sync_root = case_work_dir / "cleanup-syncroot"
|
||||
sync_root.mkdir(parents=True, exist_ok=True)
|
||||
self._create_text_file(sync_root / root_name / "Keep" / "keep.txt", "local keep placeholder\n")
|
||||
self._create_text_file(sync_root / root_name / "Obsolete" / "old.txt", "obsolete\n")
|
||||
conf_dir = self._new_config_dir(context, case_work_dir, "cleanup")
|
||||
config_path = self._write_config(conf_dir, extra_lines=['cleanup_local_files = "true"'])
|
||||
artifacts.append(str(config_path))
|
||||
result = self._run_onedrive(
|
||||
context,
|
||||
sync_root=sync_root,
|
||||
config_dir=conf_dir,
|
||||
extra_args=["--download-only", "--single-directory", root_name],
|
||||
)
|
||||
artifacts.extend(self._write_command_artifacts(result=result, log_dir=case_log_dir, state_dir=case_state_dir, phase_name="cleanup_download_only"))
|
||||
artifacts.extend(self._write_manifests(sync_root, case_state_dir, "local_after"))
|
||||
if result.returncode != 0:
|
||||
return TestResult.fail_result(self.case_id, self.name, f"Cleanup validation failed with status {result.returncode}", artifacts)
|
||||
if not (sync_root / root_name / "Keep" / "keep.txt").exists():
|
||||
return TestResult.fail_result(self.case_id, self.name, "Expected retained file is missing after cleanup", artifacts)
|
||||
if (sync_root / root_name / "Obsolete" / "old.txt").exists():
|
||||
return TestResult.fail_result(self.case_id, self.name, "Stale local file still exists after cleanup_local_files processing", artifacts)
|
||||
return TestResult.pass_result(self.case_id, self.name, artifacts, {"root_name": root_name})
|
||||
def run(self, context: E2EContext) -> TestResult:
|
||||
case_work_dir = context.work_root / "tc0007"; case_log_dir = context.logs_dir / "tc0007"; state_dir = context.state_dir / "tc0007"
|
||||
reset_directory(case_work_dir); reset_directory(case_log_dir); reset_directory(state_dir); context.ensure_refresh_token_available()
|
||||
sync_root = case_work_dir / "syncroot"; seed_conf = case_work_dir / "conf-seed"; cleanup_conf = case_work_dir / "conf-cleanup"; root_name = f"ZZ_E2E_TC0007_{context.run_id}_{os.getpid()}"
|
||||
write_text_file(sync_root / root_name / "keep.txt", "keep\n")
|
||||
context.bootstrap_config_dir(seed_conf); self._write_config(seed_conf / "config")
|
||||
context.bootstrap_config_dir(cleanup_conf); self._write_config(cleanup_conf / "config")
|
||||
seed_stdout = case_log_dir / "seed_stdout.log"; seed_stderr = case_log_dir / "seed_stderr.log"; cleanup_stdout = case_log_dir / "cleanup_stdout.log"; cleanup_stderr = case_log_dir / "cleanup_stderr.log"; post_manifest_file = state_dir / "post_cleanup_manifest.txt"; metadata_file = state_dir / "seed_metadata.txt"
|
||||
seed_command = [context.onedrive_bin, "--sync", "--verbose", "--resync", "--resync-auth", "--syncdir", str(sync_root), "--confdir", str(seed_conf)]
|
||||
seed_result = run_command(seed_command, cwd=context.repo_root)
|
||||
write_text_file(seed_stdout, seed_result.stdout); write_text_file(seed_stderr, seed_result.stderr)
|
||||
stale = sync_root / root_name / "stale-local.txt"; write_text_file(stale, "stale\n")
|
||||
cleanup_command = [context.onedrive_bin, "--sync", "--verbose", "--download-only", "--cleanup-local-files", "--resync", "--resync-auth", "--syncdir", str(sync_root), "--confdir", str(cleanup_conf)]
|
||||
cleanup_result = run_command(cleanup_command, cwd=context.repo_root)
|
||||
write_text_file(cleanup_stdout, cleanup_result.stdout); write_text_file(cleanup_stderr, cleanup_result.stderr); post_manifest = build_manifest(sync_root); write_manifest(post_manifest_file, post_manifest)
|
||||
write_text_file(metadata_file, "\n".join([f"root_name={root_name}", f"seed_returncode={seed_result.returncode}", f"cleanup_returncode={cleanup_result.returncode}"]) + "\n")
|
||||
artifacts = [str(seed_stdout), str(seed_stderr), str(cleanup_stdout), str(cleanup_stderr), str(post_manifest_file), str(metadata_file)]
|
||||
details = {"seed_returncode": seed_result.returncode, "cleanup_returncode": cleanup_result.returncode, "root_name": root_name}
|
||||
if seed_result.returncode != 0: return TestResult.fail_result(self.case_id, self.name, f"Remote seed failed with status {seed_result.returncode}", artifacts, details)
|
||||
if cleanup_result.returncode != 0: return TestResult.fail_result(self.case_id, self.name, f"cleanup_local_files processing failed with status {cleanup_result.returncode}", artifacts, details)
|
||||
if stale.exists() or f"{root_name}/stale-local.txt" in post_manifest: return TestResult.fail_result(self.case_id, self.name, "Stale local file still exists after cleanup_local_files processing", artifacts, details)
|
||||
if f"{root_name}/keep.txt" not in post_manifest: return TestResult.fail_result(self.case_id, self.name, "Expected remote-backed file missing after cleanup_local_files processing", artifacts, details)
|
||||
return TestResult.pass_result(self.case_id, self.name, artifacts, details)
|
||||
|
|
|
|||
|
|
@ -1,38 +1,41 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
from framework.base import E2ETestCase
|
||||
from framework.context import E2EContext
|
||||
from framework.manifest import build_manifest, write_manifest
|
||||
from framework.result import TestResult
|
||||
from testcases.wave1_common import Wave1TestCaseBase
|
||||
from framework.utils import command_to_string, reset_directory, run_command, write_text_file
|
||||
|
||||
|
||||
class TestCase0008UploadOnly(Wave1TestCaseBase):
|
||||
class TestCase0008UploadOnly(E2ETestCase):
|
||||
case_id = "0008"
|
||||
name = "upload-only behaviour"
|
||||
description = "Validate that local content is uploaded when using --upload-only"
|
||||
description = "Validate that upload-only pushes local content remotely"
|
||||
|
||||
def run(self, context):
|
||||
case_work_dir, case_log_dir, case_state_dir = self._initialise_case_dirs(context)
|
||||
root_name = self._root_name(context)
|
||||
artifacts = []
|
||||
sync_root = case_work_dir / "upload-syncroot"
|
||||
sync_root.mkdir(parents=True, exist_ok=True)
|
||||
self._create_text_file(sync_root / root_name / "Upload" / "file.txt", "upload me\n")
|
||||
self._create_binary_file(sync_root / root_name / "Upload" / "blob.bin", 70 * 1024)
|
||||
conf_dir = self._new_config_dir(context, case_work_dir, "upload")
|
||||
config_path = self._write_config(conf_dir)
|
||||
artifacts.append(str(config_path))
|
||||
result = self._run_onedrive(context, sync_root=sync_root, config_dir=conf_dir, extra_args=["--upload-only"])
|
||||
artifacts.extend(self._write_command_artifacts(result=result, log_dir=case_log_dir, state_dir=case_state_dir, phase_name="upload_only"))
|
||||
if result.returncode != 0:
|
||||
return TestResult.fail_result(self.case_id, self.name, f"--upload-only failed with status {result.returncode}", artifacts)
|
||||
verify_root, verify_result, verify_artifacts = self._download_remote_scope(context, case_work_dir, root_name, "verify_remote")
|
||||
artifacts.extend(verify_artifacts)
|
||||
artifacts.extend(self._write_command_artifacts(result=verify_result, log_dir=case_log_dir, state_dir=case_state_dir, phase_name="verify_remote"))
|
||||
artifacts.extend(self._write_manifests(verify_root, case_state_dir, "remote_manifest"))
|
||||
if verify_result.returncode != 0:
|
||||
return TestResult.fail_result(self.case_id, self.name, f"Remote verification failed with status {verify_result.returncode}", artifacts)
|
||||
verify_snapshot = self._snapshot_files(verify_root)
|
||||
expected = {f"{root_name}/Upload/file.txt", f"{root_name}/Upload/blob.bin"}
|
||||
missing = sorted(expected - set(verify_snapshot.keys()))
|
||||
if missing:
|
||||
return TestResult.fail_result(self.case_id, self.name, "Uploaded files were not present remotely", artifacts, {"missing": missing})
|
||||
return TestResult.pass_result(self.case_id, self.name, artifacts, {"root_name": root_name})
|
||||
def _write_config(self, config_path: Path) -> None:
|
||||
write_text_file(config_path, "# tc0008 config\nbypass_data_preservation = \"true\"\n")
|
||||
|
||||
def run(self, context: E2EContext) -> TestResult:
|
||||
case_work_dir = context.work_root / "tc0008"; case_log_dir = context.logs_dir / "tc0008"; state_dir = context.state_dir / "tc0008"
|
||||
reset_directory(case_work_dir); reset_directory(case_log_dir); reset_directory(state_dir); context.ensure_refresh_token_available()
|
||||
upload_root = case_work_dir / "uploadroot"; upload_conf = case_work_dir / "conf-upload"; verify_root = case_work_dir / "verifyroot"; verify_conf = case_work_dir / "conf-verify"; root_name = f"ZZ_E2E_TC0008_{context.run_id}_{os.getpid()}"
|
||||
write_text_file(upload_root / root_name / "upload.txt", "upload only\n")
|
||||
context.bootstrap_config_dir(upload_conf); self._write_config(upload_conf / "config")
|
||||
context.bootstrap_config_dir(verify_conf); self._write_config(verify_conf / "config")
|
||||
stdout_file = case_log_dir / "upload_only_stdout.log"; stderr_file = case_log_dir / "upload_only_stderr.log"; verify_stdout = case_log_dir / "verify_stdout.log"; verify_stderr = case_log_dir / "verify_stderr.log"; remote_manifest_file = state_dir / "remote_verify_manifest.txt"; metadata_file = state_dir / "upload_metadata.txt"
|
||||
command = [context.onedrive_bin, "--sync", "--verbose", "--upload-only", "--resync", "--resync-auth", "--syncdir", str(upload_root), "--confdir", str(upload_conf)]
|
||||
result = run_command(command, cwd=context.repo_root)
|
||||
write_text_file(stdout_file, result.stdout); write_text_file(stderr_file, result.stderr)
|
||||
verify_command = [context.onedrive_bin, "--sync", "--verbose", "--download-only", "--resync", "--resync-auth", "--syncdir", str(verify_root), "--confdir", str(verify_conf)]
|
||||
verify_result = run_command(verify_command, cwd=context.repo_root)
|
||||
write_text_file(verify_stdout, verify_result.stdout); write_text_file(verify_stderr, verify_result.stderr); remote_manifest = build_manifest(verify_root); write_manifest(remote_manifest_file, remote_manifest)
|
||||
write_text_file(metadata_file, "\n".join([f"root_name={root_name}", f"returncode={result.returncode}", f"verify_returncode={verify_result.returncode}"]) + "\n")
|
||||
artifacts = [str(stdout_file), str(stderr_file), str(verify_stdout), str(verify_stderr), str(remote_manifest_file), str(metadata_file)]
|
||||
details = {"returncode": result.returncode, "verify_returncode": verify_result.returncode, "root_name": root_name}
|
||||
if result.returncode != 0: return TestResult.fail_result(self.case_id, self.name, f"--upload-only failed with status {result.returncode}", artifacts, details)
|
||||
if verify_result.returncode != 0: return TestResult.fail_result(self.case_id, self.name, f"Remote verification failed with status {verify_result.returncode}", artifacts, details)
|
||||
if f"{root_name}/upload.txt" not in remote_manifest: return TestResult.fail_result(self.case_id, self.name, f"Upload-only did not synchronise expected remote file: {root_name}/upload.txt", artifacts, details)
|
||||
return TestResult.pass_result(self.case_id, self.name, artifacts, details)
|
||||
|
|
|
|||
|
|
@ -1,48 +1,47 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
from framework.base import E2ETestCase
|
||||
from framework.context import E2EContext
|
||||
from framework.manifest import build_manifest, write_manifest
|
||||
from framework.result import TestResult
|
||||
from testcases.wave1_common import Wave1TestCaseBase
|
||||
from framework.utils import command_to_string, reset_directory, run_command, write_text_file
|
||||
|
||||
|
||||
class TestCase0009UploadOnlyNoRemoteDelete(Wave1TestCaseBase):
|
||||
class TestCase0009UploadOnlyNoRemoteDelete(E2ETestCase):
|
||||
case_id = "0009"
|
||||
name = "upload-only no-remote-delete"
|
||||
description = "Validate that remote data is retained when local content is absent and no_remote_delete is enabled"
|
||||
description = "Validate that no_remote_delete preserves remote content in upload-only mode"
|
||||
|
||||
def run(self, context):
|
||||
case_work_dir, case_log_dir, case_state_dir = self._initialise_case_dirs(context)
|
||||
root_name = self._root_name(context)
|
||||
artifacts = []
|
||||
seed_root = case_work_dir / "seed-syncroot"
|
||||
seed_root.mkdir(parents=True, exist_ok=True)
|
||||
self._create_text_file(seed_root / root_name / "RemoteKeep" / "preserve.txt", "preserve remotely\n")
|
||||
seed_conf = self._new_config_dir(context, case_work_dir, "seed")
|
||||
config_path = self._write_config(seed_conf)
|
||||
artifacts.append(str(config_path))
|
||||
seed_result = self._run_onedrive(context, sync_root=seed_root, config_dir=seed_conf)
|
||||
artifacts.extend(self._write_command_artifacts(result=seed_result, log_dir=case_log_dir, state_dir=case_state_dir, phase_name="seed"))
|
||||
if seed_result.returncode != 0:
|
||||
return TestResult.fail_result(self.case_id, self.name, f"Remote seed failed with status {seed_result.returncode}", artifacts)
|
||||
def _write_config(self, config_path: Path) -> None:
|
||||
write_text_file(config_path, "# tc0009 config\nbypass_data_preservation = \"true\"\n")
|
||||
|
||||
sync_root = case_work_dir / "upload-syncroot"
|
||||
sync_root.mkdir(parents=True, exist_ok=True)
|
||||
self._create_text_file(sync_root / root_name / "LocalUpload" / "new.txt", "new upload\n")
|
||||
conf_dir = self._new_config_dir(context, case_work_dir, "upload")
|
||||
config_path = self._write_config(conf_dir, extra_lines=['no_remote_delete = "true"'])
|
||||
artifacts.append(str(config_path))
|
||||
result = self._run_onedrive(context, sync_root=sync_root, config_dir=conf_dir, extra_args=["--upload-only"])
|
||||
artifacts.extend(self._write_command_artifacts(result=result, log_dir=case_log_dir, state_dir=case_state_dir, phase_name="upload_only_no_remote_delete"))
|
||||
if result.returncode != 0:
|
||||
return TestResult.fail_result(self.case_id, self.name, f"--upload-only --no-remote-delete failed with status {result.returncode}", artifacts)
|
||||
verify_root, verify_result, verify_artifacts = self._download_remote_scope(context, case_work_dir, root_name, "verify_remote")
|
||||
artifacts.extend(verify_artifacts)
|
||||
artifacts.extend(self._write_command_artifacts(result=verify_result, log_dir=case_log_dir, state_dir=case_state_dir, phase_name="verify_remote"))
|
||||
artifacts.extend(self._write_manifests(verify_root, case_state_dir, "remote_manifest"))
|
||||
if verify_result.returncode != 0:
|
||||
return TestResult.fail_result(self.case_id, self.name, f"Remote verification failed with status {verify_result.returncode}", artifacts)
|
||||
verify_snapshot = self._snapshot_files(verify_root)
|
||||
expected = {f"{root_name}/RemoteKeep/preserve.txt", f"{root_name}/LocalUpload/new.txt"}
|
||||
missing = sorted(expected - set(verify_snapshot.keys()))
|
||||
if missing:
|
||||
return TestResult.fail_result(self.case_id, self.name, "Remote content was deleted or not uploaded as expected", artifacts, {"missing": missing})
|
||||
return TestResult.pass_result(self.case_id, self.name, artifacts, {"root_name": root_name})
|
||||
def run(self, context: E2EContext) -> TestResult:
|
||||
case_work_dir = context.work_root / "tc0009"; case_log_dir = context.logs_dir / "tc0009"; state_dir = context.state_dir / "tc0009"
|
||||
reset_directory(case_work_dir); reset_directory(case_log_dir); reset_directory(state_dir); context.ensure_refresh_token_available()
|
||||
sync_root = case_work_dir / "syncroot"; seed_conf = case_work_dir / "conf-seed"; upload_conf = case_work_dir / "conf-upload"; verify_root = case_work_dir / "verifyroot"; verify_conf = case_work_dir / "conf-verify"; root_name = f"ZZ_E2E_TC0009_{context.run_id}_{os.getpid()}"
|
||||
keep_file = sync_root / root_name / "keep.txt"; write_text_file(keep_file, "keep remote\n")
|
||||
context.bootstrap_config_dir(seed_conf); self._write_config(seed_conf / "config")
|
||||
context.bootstrap_config_dir(upload_conf); self._write_config(upload_conf / "config")
|
||||
context.bootstrap_config_dir(verify_conf); self._write_config(verify_conf / "config")
|
||||
seed_stdout = case_log_dir / "seed_stdout.log"; seed_stderr = case_log_dir / "seed_stderr.log"; upload_stdout = case_log_dir / "upload_only_stdout.log"; upload_stderr = case_log_dir / "upload_only_stderr.log"; verify_stdout = case_log_dir / "verify_stdout.log"; verify_stderr = case_log_dir / "verify_stderr.log"; remote_manifest_file = state_dir / "remote_verify_manifest.txt"; metadata_file = state_dir / "seed_metadata.txt"
|
||||
seed_command = [context.onedrive_bin, "--sync", "--verbose", "--resync", "--resync-auth", "--syncdir", str(sync_root), "--confdir", str(seed_conf)]
|
||||
seed_result = run_command(seed_command, cwd=context.repo_root)
|
||||
write_text_file(seed_stdout, seed_result.stdout); write_text_file(seed_stderr, seed_result.stderr)
|
||||
if keep_file.exists(): keep_file.unlink()
|
||||
upload_command = [context.onedrive_bin, "--sync", "--verbose", "--upload-only", "--no-remote-delete", "--resync", "--resync-auth", "--syncdir", str(sync_root), "--confdir", str(upload_conf)]
|
||||
upload_result = run_command(upload_command, cwd=context.repo_root)
|
||||
write_text_file(upload_stdout, upload_result.stdout); write_text_file(upload_stderr, upload_result.stderr)
|
||||
verify_command = [context.onedrive_bin, "--sync", "--verbose", "--download-only", "--resync", "--resync-auth", "--syncdir", str(verify_root), "--confdir", str(verify_conf)]
|
||||
verify_result = run_command(verify_command, cwd=context.repo_root)
|
||||
write_text_file(verify_stdout, verify_result.stdout); write_text_file(verify_stderr, verify_result.stderr); remote_manifest = build_manifest(verify_root); write_manifest(remote_manifest_file, remote_manifest)
|
||||
write_text_file(metadata_file, "\n".join([f"root_name={root_name}", f"seed_returncode={seed_result.returncode}", f"upload_returncode={upload_result.returncode}", f"verify_returncode={verify_result.returncode}"]) + "\n")
|
||||
artifacts = [str(seed_stdout), str(seed_stderr), str(upload_stdout), str(upload_stderr), str(verify_stdout), str(verify_stderr), str(remote_manifest_file), str(metadata_file)]
|
||||
details = {"seed_returncode": seed_result.returncode, "upload_returncode": upload_result.returncode, "verify_returncode": verify_result.returncode, "root_name": root_name}
|
||||
if seed_result.returncode != 0: return TestResult.fail_result(self.case_id, self.name, f"Remote seed failed with status {seed_result.returncode}", artifacts, details)
|
||||
if upload_result.returncode != 0: return TestResult.fail_result(self.case_id, self.name, f"--upload-only --no-remote-delete failed with status {upload_result.returncode}", artifacts, details)
|
||||
if verify_result.returncode != 0: return TestResult.fail_result(self.case_id, self.name, f"Remote verification failed with status {verify_result.returncode}", artifacts, details)
|
||||
if f"{root_name}/keep.txt" not in remote_manifest: return TestResult.fail_result(self.case_id, self.name, f"Remote file was unexpectedly deleted despite --no-remote-delete: {root_name}/keep.txt", artifacts, details)
|
||||
return TestResult.pass_result(self.case_id, self.name, artifacts, details)
|
||||
|
|
|
|||
|
|
@ -1,37 +1,42 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
from framework.base import E2ETestCase
|
||||
from framework.context import E2EContext
|
||||
from framework.manifest import build_manifest, write_manifest
|
||||
from framework.result import TestResult
|
||||
from testcases.wave1_common import Wave1TestCaseBase
|
||||
from framework.utils import command_to_string, reset_directory, run_command, write_text_file
|
||||
|
||||
|
||||
class TestCase0010UploadOnlyRemoveSourceFiles(Wave1TestCaseBase):
|
||||
class TestCase0010UploadOnlyRemoveSourceFiles(E2ETestCase):
|
||||
case_id = "0010"
|
||||
name = "upload-only remove-source-files"
|
||||
description = "Validate that local files are removed after successful upload when remove_source_files is enabled"
|
||||
description = "Validate that remove_source_files removes local files after upload-only succeeds"
|
||||
|
||||
def run(self, context):
|
||||
case_work_dir, case_log_dir, case_state_dir = self._initialise_case_dirs(context)
|
||||
root_name = self._root_name(context)
|
||||
artifacts = []
|
||||
sync_root = case_work_dir / "upload-syncroot"
|
||||
sync_root.mkdir(parents=True, exist_ok=True)
|
||||
source_file = sync_root / root_name / "Source" / "upload_and_remove.txt"
|
||||
self._create_text_file(source_file, "remove after upload\n")
|
||||
conf_dir = self._new_config_dir(context, case_work_dir, "upload")
|
||||
config_path = self._write_config(conf_dir, extra_lines=['remove_source_files = "true"'])
|
||||
artifacts.append(str(config_path))
|
||||
result = self._run_onedrive(context, sync_root=sync_root, config_dir=conf_dir, extra_args=["--upload-only"])
|
||||
artifacts.extend(self._write_command_artifacts(result=result, log_dir=case_log_dir, state_dir=case_state_dir, phase_name="upload_only_remove_source"))
|
||||
artifacts.extend(self._write_manifests(sync_root, case_state_dir, "local_after"))
|
||||
if result.returncode != 0:
|
||||
return TestResult.fail_result(self.case_id, self.name, f"--upload-only with remove_source_files failed with status {result.returncode}", artifacts)
|
||||
if source_file.exists():
|
||||
return TestResult.fail_result(self.case_id, self.name, "Source file still exists locally after upload", artifacts)
|
||||
verify_root, verify_result, verify_artifacts = self._download_remote_scope(context, case_work_dir, root_name, "verify_remote")
|
||||
artifacts.extend(verify_artifacts)
|
||||
artifacts.extend(self._write_command_artifacts(result=verify_result, log_dir=case_log_dir, state_dir=case_state_dir, phase_name="verify_remote"))
|
||||
if verify_result.returncode != 0:
|
||||
return TestResult.fail_result(self.case_id, self.name, f"Remote verification failed with status {verify_result.returncode}", artifacts)
|
||||
if not (verify_root / root_name / "Source" / "upload_and_remove.txt").exists():
|
||||
return TestResult.fail_result(self.case_id, self.name, "Uploaded file was not present remotely after local removal", artifacts)
|
||||
return TestResult.pass_result(self.case_id, self.name, artifacts, {"root_name": root_name})
|
||||
def _write_config(self, config_path: Path) -> None:
|
||||
write_text_file(config_path, "# tc0010 config\nbypass_data_preservation = \"true\"\n")
|
||||
|
||||
def run(self, context: E2EContext) -> TestResult:
|
||||
case_work_dir = context.work_root / "tc0010"; case_log_dir = context.logs_dir / "tc0010"; state_dir = context.state_dir / "tc0010"
|
||||
reset_directory(case_work_dir); reset_directory(case_log_dir); reset_directory(state_dir); context.ensure_refresh_token_available()
|
||||
sync_root = case_work_dir / "syncroot"; upload_conf = case_work_dir / "conf-upload"; verify_root = case_work_dir / "verifyroot"; verify_conf = case_work_dir / "conf-verify"; root_name = f"ZZ_E2E_TC0010_{context.run_id}_{os.getpid()}"
|
||||
source_file = sync_root / root_name / "source.txt"; write_text_file(source_file, "remove after upload\n")
|
||||
context.bootstrap_config_dir(upload_conf); self._write_config(upload_conf / "config")
|
||||
context.bootstrap_config_dir(verify_conf); self._write_config(verify_conf / "config")
|
||||
stdout_file = case_log_dir / "upload_only_remove_source_stdout.log"; stderr_file = case_log_dir / "upload_only_remove_source_stderr.log"; verify_stdout = case_log_dir / "verify_stdout.log"; verify_stderr = case_log_dir / "verify_stderr.log"; post_manifest_file = state_dir / "post_upload_manifest.txt"; remote_manifest_file = state_dir / "remote_verify_manifest.txt"; metadata_file = state_dir / "upload_metadata.txt"
|
||||
command = [context.onedrive_bin, "--sync", "--verbose", "--upload-only", "--remove-source-files", "--resync", "--resync-auth", "--syncdir", str(sync_root), "--confdir", str(upload_conf)]
|
||||
result = run_command(command, cwd=context.repo_root)
|
||||
write_text_file(stdout_file, result.stdout); write_text_file(stderr_file, result.stderr); post_manifest = build_manifest(sync_root); write_manifest(post_manifest_file, post_manifest)
|
||||
verify_command = [context.onedrive_bin, "--sync", "--verbose", "--download-only", "--resync", "--resync-auth", "--syncdir", str(verify_root), "--confdir", str(verify_conf)]
|
||||
verify_result = run_command(verify_command, cwd=context.repo_root)
|
||||
write_text_file(verify_stdout, verify_result.stdout); write_text_file(verify_stderr, verify_result.stderr); remote_manifest = build_manifest(verify_root); write_manifest(remote_manifest_file, remote_manifest)
|
||||
write_text_file(metadata_file, "\n".join([f"root_name={root_name}", f"returncode={result.returncode}", f"verify_returncode={verify_result.returncode}"]) + "\n")
|
||||
artifacts = [str(stdout_file), str(stderr_file), str(verify_stdout), str(verify_stderr), str(post_manifest_file), str(remote_manifest_file), str(metadata_file)]
|
||||
details = {"returncode": result.returncode, "verify_returncode": verify_result.returncode, "root_name": root_name}
|
||||
if result.returncode != 0: return TestResult.fail_result(self.case_id, self.name, f"--upload-only with remove_source_files failed with status {result.returncode}", artifacts, details)
|
||||
if verify_result.returncode != 0: return TestResult.fail_result(self.case_id, self.name, f"Remote verification failed with status {verify_result.returncode}", artifacts, details)
|
||||
if source_file.exists() or f"{root_name}/source.txt" in post_manifest: return TestResult.fail_result(self.case_id, self.name, "Local source file still exists after remove_source_files processing", artifacts, details)
|
||||
if f"{root_name}/source.txt" not in remote_manifest: return TestResult.fail_result(self.case_id, self.name, f"Remote file missing after upload-only remove_source_files: {root_name}/source.txt", artifacts, details)
|
||||
return TestResult.pass_result(self.case_id, self.name, artifacts, details)
|
||||
|
|
|
|||
|
|
@ -1,43 +1,43 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
from framework.base import E2ETestCase
|
||||
from framework.context import E2EContext
|
||||
from framework.manifest import build_manifest, write_manifest
|
||||
from framework.result import TestResult
|
||||
from testcases.wave1_common import Wave1TestCaseBase
|
||||
from framework.utils import command_to_string, reset_directory, run_command, write_text_file
|
||||
|
||||
|
||||
class TestCase0011SkipFileValidation(Wave1TestCaseBase):
|
||||
class TestCase0011SkipFileValidation(E2ETestCase):
|
||||
case_id = "0011"
|
||||
name = "skip_file validation"
|
||||
description = "Validate that skip_file patterns exclude matching files from synchronisation"
|
||||
description = "Validate that skip_file patterns prevent matching files from synchronising"
|
||||
|
||||
def run(self, context):
|
||||
case_work_dir, case_log_dir, case_state_dir = self._initialise_case_dirs(context)
|
||||
root_name = self._root_name(context)
|
||||
artifacts = []
|
||||
sync_root = case_work_dir / "syncroot"
|
||||
sync_root.mkdir(parents=True, exist_ok=True)
|
||||
self._create_text_file(sync_root / root_name / "keep.txt", "keep me\n")
|
||||
self._create_text_file(sync_root / root_name / "ignore.tmp", "temp\n")
|
||||
self._create_text_file(sync_root / root_name / "editor.swp", "swap\n")
|
||||
self._create_text_file(sync_root / root_name / "Nested" / "keep.md", "nested keep\n")
|
||||
conf_dir = self._new_config_dir(context, case_work_dir, "main")
|
||||
config_path = self._write_config(conf_dir, extra_lines=['skip_file = "*.tmp|*.swp"'])
|
||||
artifacts.append(str(config_path))
|
||||
result = self._run_onedrive(context, sync_root=sync_root, config_dir=conf_dir)
|
||||
artifacts.extend(self._write_command_artifacts(result=result, log_dir=case_log_dir, state_dir=case_state_dir, phase_name="skip_file"))
|
||||
if result.returncode != 0:
|
||||
return TestResult.fail_result(self.case_id, self.name, f"skip_file validation failed with status {result.returncode}", artifacts)
|
||||
verify_root, verify_result, verify_artifacts = self._download_remote_scope(context, case_work_dir, root_name, "verify_remote")
|
||||
artifacts.extend(verify_artifacts)
|
||||
artifacts.extend(self._write_command_artifacts(result=verify_result, log_dir=case_log_dir, state_dir=case_state_dir, phase_name="verify_remote"))
|
||||
artifacts.extend(self._write_manifests(verify_root, case_state_dir, "remote_manifest"))
|
||||
if verify_result.returncode != 0:
|
||||
return TestResult.fail_result(self.case_id, self.name, f"Remote verification failed with status {verify_result.returncode}", artifacts)
|
||||
snapshot = self._snapshot_files(verify_root)
|
||||
expected = {f"{root_name}/keep.txt", f"{root_name}/Nested/keep.md"}
|
||||
missing = sorted(expected - set(snapshot.keys()))
|
||||
if missing:
|
||||
return TestResult.fail_result(self.case_id, self.name, "Expected non-skipped files are missing remotely", artifacts, {"missing": missing})
|
||||
present = sorted(path for path in [f"{root_name}/ignore.tmp", f"{root_name}/editor.swp"] if path in snapshot)
|
||||
if present:
|
||||
return TestResult.fail_result(self.case_id, self.name, "skip_file patterns did not exclude all matching files", artifacts, {"present": present})
|
||||
return TestResult.pass_result(self.case_id, self.name, artifacts, {"root_name": root_name})
|
||||
def _write_config(self, config_path: Path) -> None:
|
||||
write_text_file(config_path, "# tc0011 config\nbypass_data_preservation = \"true\"\nskip_file = \"*.tmp|*.swp\"\n")
|
||||
|
||||
def run(self, context: E2EContext) -> TestResult:
|
||||
case_work_dir = context.work_root / "tc0011"; case_log_dir = context.logs_dir / "tc0011"; state_dir = context.state_dir / "tc0011"
|
||||
reset_directory(case_work_dir); reset_directory(case_log_dir); reset_directory(state_dir); context.ensure_refresh_token_available()
|
||||
sync_root = case_work_dir / "syncroot"; confdir = case_work_dir / "conf-main"; verify_root = case_work_dir / "verifyroot"; verify_conf = case_work_dir / "conf-verify"; root_name = f"ZZ_E2E_TC0011_{context.run_id}_{os.getpid()}"
|
||||
write_text_file(sync_root / root_name / "keep.txt", "keep\n"); write_text_file(sync_root / root_name / "skip.tmp", "skip\n"); write_text_file(sync_root / root_name / "editor.swp", "swap\n")
|
||||
context.bootstrap_config_dir(confdir); self._write_config(confdir / "config")
|
||||
context.bootstrap_config_dir(verify_conf); write_text_file(verify_conf / "config", "# tc0011 verify\nbypass_data_preservation = \"true\"\n")
|
||||
stdout_file = case_log_dir / "skip_file_stdout.log"; stderr_file = case_log_dir / "skip_file_stderr.log"; verify_stdout = case_log_dir / "verify_stdout.log"; verify_stderr = case_log_dir / "verify_stderr.log"; remote_manifest_file = state_dir / "remote_verify_manifest.txt"; metadata_file = state_dir / "metadata.txt"
|
||||
command = [context.onedrive_bin, "--sync", "--verbose", "--resync", "--resync-auth", "--syncdir", str(sync_root), "--confdir", str(confdir)]
|
||||
result = run_command(command, cwd=context.repo_root)
|
||||
write_text_file(stdout_file, result.stdout); write_text_file(stderr_file, result.stderr)
|
||||
verify_command = [context.onedrive_bin, "--sync", "--verbose", "--download-only", "--resync", "--resync-auth", "--syncdir", str(verify_root), "--confdir", str(verify_conf)]
|
||||
verify_result = run_command(verify_command, cwd=context.repo_root)
|
||||
write_text_file(verify_stdout, verify_result.stdout); write_text_file(verify_stderr, verify_result.stderr); remote_manifest = build_manifest(verify_root); write_manifest(remote_manifest_file, remote_manifest)
|
||||
write_text_file(metadata_file, f"root_name={root_name}\nreturncode={result.returncode}\nverify_returncode={verify_result.returncode}\n")
|
||||
artifacts = [str(stdout_file), str(stderr_file), str(verify_stdout), str(verify_stderr), str(remote_manifest_file), str(metadata_file)]
|
||||
details = {"returncode": result.returncode, "verify_returncode": verify_result.returncode, "root_name": root_name}
|
||||
if result.returncode != 0: return TestResult.fail_result(self.case_id, self.name, f"skip_file validation failed with status {result.returncode}", artifacts, details)
|
||||
if verify_result.returncode != 0: return TestResult.fail_result(self.case_id, self.name, f"Remote verification failed with status {verify_result.returncode}", artifacts, details)
|
||||
if f"{root_name}/keep.txt" not in remote_manifest: return TestResult.fail_result(self.case_id, self.name, f"Expected non-skipped file missing remotely: {root_name}/keep.txt", artifacts, details)
|
||||
for unwanted in [f"{root_name}/skip.tmp", f"{root_name}/editor.swp"]:
|
||||
if unwanted in remote_manifest: return TestResult.fail_result(self.case_id, self.name, f"skip_file pattern failed, file was synchronised: {unwanted}", artifacts, details)
|
||||
return TestResult.pass_result(self.case_id, self.name, artifacts, details)
|
||||
|
|
|
|||
|
|
@ -1,70 +1,75 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
from framework.base import E2ETestCase
|
||||
from framework.context import E2EContext
|
||||
from framework.manifest import build_manifest, write_manifest
|
||||
from framework.result import TestResult
|
||||
from testcases.wave1_common import Wave1TestCaseBase
|
||||
from framework.utils import command_to_string, reset_directory, run_command, write_text_file
|
||||
|
||||
|
||||
class TestCase0012SkipDirValidation(Wave1TestCaseBase):
|
||||
class TestCase0012SkipDirValidation(E2ETestCase):
|
||||
case_id = "0012"
|
||||
name = "skip_dir validation"
|
||||
description = "Validate loose and strict skip_dir matching behaviour"
|
||||
description = "Validate skip_dir loose matching and skip_dir_strict_match behaviour"
|
||||
|
||||
def run(self, context):
|
||||
case_work_dir, case_log_dir, case_state_dir = self._initialise_case_dirs(context)
|
||||
root_name = self._root_name(context)
|
||||
artifacts = []
|
||||
failures = []
|
||||
def _write_config(self, config_path: Path, skip_dir_value: str, strict: bool) -> None:
|
||||
lines = ["# tc0012 config", "bypass_data_preservation = \"true\"", f"skip_dir = \"{skip_dir_value}\""]
|
||||
if strict:
|
||||
lines.append("skip_dir_strict_match = \"true\"")
|
||||
write_text_file(config_path, "\n".join(lines) + "\n")
|
||||
|
||||
loose_root = case_work_dir / "loose-syncroot"
|
||||
loose_root.mkdir(parents=True, exist_ok=True)
|
||||
self._create_text_file(loose_root / root_name / "project" / "build" / "out.bin", "skip me\n")
|
||||
self._create_text_file(loose_root / root_name / "build" / "root.bin", "skip me too\n")
|
||||
self._create_text_file(loose_root / root_name / "project" / "src" / "app.txt", "keep me\n")
|
||||
loose_conf = self._new_config_dir(context, case_work_dir, "loose")
|
||||
config_path = self._write_config(loose_conf, extra_lines=['skip_dir = "build"', 'skip_dir_strict_match = "false"'])
|
||||
artifacts.append(str(config_path))
|
||||
loose_result = self._run_onedrive(context, sync_root=loose_root, config_dir=loose_conf)
|
||||
artifacts.extend(self._write_command_artifacts(result=loose_result, log_dir=case_log_dir, state_dir=case_state_dir, phase_name="loose_match"))
|
||||
if loose_result.returncode != 0:
|
||||
return TestResult.fail_result(self.case_id, self.name, f"Loose skip_dir scenario failed with status {loose_result.returncode}", artifacts)
|
||||
verify_root, verify_result, verify_artifacts = self._download_remote_scope(context, case_work_dir, root_name, "loose_remote")
|
||||
artifacts.extend(verify_artifacts)
|
||||
artifacts.extend(self._write_command_artifacts(result=verify_result, log_dir=case_log_dir, state_dir=case_state_dir, phase_name="loose_verify"))
|
||||
if verify_result.returncode != 0:
|
||||
return TestResult.fail_result(self.case_id, self.name, f"Loose skip_dir verification failed with status {verify_result.returncode}", artifacts)
|
||||
loose_snapshot = self._snapshot_files(verify_root)
|
||||
if f"{root_name}/project/src/app.txt" not in loose_snapshot:
|
||||
failures.append("Loose matching did not retain non-build content")
|
||||
for forbidden in [f"{root_name}/project/build/out.bin", f"{root_name}/build/root.bin"]:
|
||||
if forbidden in loose_snapshot:
|
||||
failures.append(f"Loose matching did not exclude {forbidden}")
|
||||
def _run_loose(self, context: E2EContext, case_log_dir: Path, all_artifacts: list[str], failures: list[str]) -> None:
|
||||
scenario_root = context.work_root / "tc0012" / "loose_match"; scenario_state = context.state_dir / "tc0012" / "loose_match"
|
||||
reset_directory(scenario_root); reset_directory(scenario_state)
|
||||
sync_root = scenario_root / "syncroot"; confdir = scenario_root / "conf-loose"; verify_root = scenario_root / "verifyroot"; verify_conf = scenario_root / "conf-verify-loose"
|
||||
root = f"ZZ_E2E_TC0012_LOOSE_{context.run_id}_{os.getpid()}"
|
||||
write_text_file(sync_root / root / "Cache" / "top.txt", "skip top\n")
|
||||
write_text_file(sync_root / root / "App" / "Cache" / "nested.txt", "skip nested\n")
|
||||
write_text_file(sync_root / root / "Keep" / "ok.txt", "ok\n")
|
||||
context.bootstrap_config_dir(confdir); self._write_config(confdir / "config", "Cache", False)
|
||||
context.bootstrap_config_dir(verify_conf); write_text_file(verify_conf / "config", "# verify\nbypass_data_preservation = \"true\"\n")
|
||||
stdout_file = case_log_dir / "loose_match_stdout.log"; stderr_file = case_log_dir / "loose_match_stderr.log"; verify_stdout = case_log_dir / "loose_match_verify_stdout.log"; verify_stderr = case_log_dir / "loose_match_verify_stderr.log"; manifest_file = scenario_state / "remote_verify_manifest.txt"
|
||||
result = run_command([context.onedrive_bin, "--sync", "--verbose", "--resync", "--resync-auth", "--syncdir", str(sync_root), "--confdir", str(confdir)], cwd=context.repo_root)
|
||||
write_text_file(stdout_file, result.stdout); write_text_file(stderr_file, result.stderr)
|
||||
verify_result = run_command([context.onedrive_bin, "--sync", "--verbose", "--download-only", "--resync", "--resync-auth", "--syncdir", str(verify_root), "--confdir", str(verify_conf)], cwd=context.repo_root)
|
||||
write_text_file(verify_stdout, verify_result.stdout); write_text_file(verify_stderr, verify_result.stderr); manifest = build_manifest(verify_root); write_manifest(manifest_file, manifest)
|
||||
all_artifacts.extend([str(stdout_file), str(stderr_file), str(verify_stdout), str(verify_stderr), str(manifest_file)])
|
||||
if result.returncode != 0: failures.append(f"Loose skip_dir scenario failed with status {result.returncode}"); return
|
||||
if verify_result.returncode != 0: failures.append(f"Loose skip_dir verification failed with status {verify_result.returncode}"); return
|
||||
if f"{root}/Keep/ok.txt" not in manifest: failures.append("Loose skip_dir scenario did not synchronise expected non-skipped content")
|
||||
for unwanted in [f"{root}/Cache/top.txt", f"{root}/App/Cache/nested.txt"]:
|
||||
if unwanted in manifest: failures.append(f"Loose skip_dir scenario unexpectedly synchronised skipped directory content: {unwanted}")
|
||||
|
||||
strict_scope = f"{root_name}_STRICT"
|
||||
strict_root = case_work_dir / "strict-syncroot"
|
||||
strict_root.mkdir(parents=True, exist_ok=True)
|
||||
self._create_text_file(strict_root / strict_scope / "project" / "build" / "skip.bin", "skip strict\n")
|
||||
self._create_text_file(strict_root / strict_scope / "other" / "build" / "keep.bin", "keep strict\n")
|
||||
self._create_text_file(strict_root / strict_scope / "other" / "src" / "keep.txt", "keep strict txt\n")
|
||||
strict_conf = self._new_config_dir(context, case_work_dir, "strict")
|
||||
config_path = self._write_config(strict_conf, extra_lines=[f'skip_dir = "{strict_scope}/project/build"', 'skip_dir_strict_match = "true"'])
|
||||
artifacts.append(str(config_path))
|
||||
strict_result = self._run_onedrive(context, sync_root=strict_root, config_dir=strict_conf)
|
||||
artifacts.extend(self._write_command_artifacts(result=strict_result, log_dir=case_log_dir, state_dir=case_state_dir, phase_name="strict_match"))
|
||||
if strict_result.returncode != 0:
|
||||
return TestResult.fail_result(self.case_id, self.name, f"Strict skip_dir scenario failed with status {strict_result.returncode}", artifacts)
|
||||
strict_verify_root, strict_verify_result, strict_verify_artifacts = self._download_remote_scope(context, case_work_dir, strict_scope, "strict_remote")
|
||||
artifacts.extend(strict_verify_artifacts)
|
||||
artifacts.extend(self._write_command_artifacts(result=strict_verify_result, log_dir=case_log_dir, state_dir=case_state_dir, phase_name="strict_verify"))
|
||||
if strict_verify_result.returncode != 0:
|
||||
return TestResult.fail_result(self.case_id, self.name, f"Strict skip_dir verification failed with status {strict_verify_result.returncode}", artifacts)
|
||||
strict_snapshot = self._snapshot_files(strict_verify_root)
|
||||
if f"{strict_scope}/project/build/skip.bin" in strict_snapshot:
|
||||
failures.append("Strict matching did not exclude the targeted full path")
|
||||
for required in [f"{strict_scope}/other/build/keep.bin", f"{strict_scope}/other/src/keep.txt"]:
|
||||
if required not in strict_snapshot:
|
||||
failures.append(f"Strict matching excluded unexpected content: {required}")
|
||||
artifacts.extend(self._write_manifests(verify_root, case_state_dir, "loose_manifest"))
|
||||
artifacts.extend(self._write_manifests(strict_verify_root, case_state_dir, "strict_manifest"))
|
||||
if failures:
|
||||
return TestResult.fail_result(self.case_id, self.name, "; ".join(failures), artifacts, {"failure_count": len(failures)})
|
||||
return TestResult.pass_result(self.case_id, self.name, artifacts, {"root_name": root_name, "strict_scope": strict_scope})
|
||||
def _run_strict(self, context: E2EContext, case_log_dir: Path, all_artifacts: list[str], failures: list[str]) -> None:
|
||||
scenario_root = context.work_root / "tc0012" / "strict_match"; scenario_state = context.state_dir / "tc0012" / "strict_match"
|
||||
reset_directory(scenario_root); reset_directory(scenario_state)
|
||||
sync_root = scenario_root / "syncroot"; confdir = scenario_root / "conf-strict"; verify_root = scenario_root / "verifyroot"; verify_conf = scenario_root / "conf-verify-strict"
|
||||
root = f"ZZ_E2E_TC0012_STRICT_{context.run_id}_{os.getpid()}"
|
||||
write_text_file(sync_root / root / "Cache" / "top.txt", "top should remain\n")
|
||||
write_text_file(sync_root / root / "App" / "Cache" / "nested.txt", "nested should skip\n")
|
||||
write_text_file(sync_root / root / "Keep" / "ok.txt", "ok\n")
|
||||
context.bootstrap_config_dir(confdir); self._write_config(confdir / "config", f"{root}/App/Cache", True)
|
||||
context.bootstrap_config_dir(verify_conf); write_text_file(verify_conf / "config", "# verify\nbypass_data_preservation = \"true\"\n")
|
||||
stdout_file = case_log_dir / "strict_match_stdout.log"; stderr_file = case_log_dir / "strict_match_stderr.log"; verify_stdout = case_log_dir / "strict_match_verify_stdout.log"; verify_stderr = case_log_dir / "strict_match_verify_stderr.log"; manifest_file = scenario_state / "remote_verify_manifest.txt"
|
||||
result = run_command([context.onedrive_bin, "--sync", "--verbose", "--resync", "--resync-auth", "--syncdir", str(sync_root), "--confdir", str(confdir)], cwd=context.repo_root)
|
||||
write_text_file(stdout_file, result.stdout); write_text_file(stderr_file, result.stderr)
|
||||
verify_result = run_command([context.onedrive_bin, "--sync", "--verbose", "--download-only", "--resync", "--resync-auth", "--syncdir", str(verify_root), "--confdir", str(verify_conf)], cwd=context.repo_root)
|
||||
write_text_file(verify_stdout, verify_result.stdout); write_text_file(verify_stderr, verify_result.stderr); manifest = build_manifest(verify_root); write_manifest(manifest_file, manifest)
|
||||
all_artifacts.extend([str(stdout_file), str(stderr_file), str(verify_stdout), str(verify_stderr), str(manifest_file)])
|
||||
if result.returncode != 0: failures.append(f"Strict skip_dir scenario failed with status {result.returncode}"); return
|
||||
if verify_result.returncode != 0: failures.append(f"Strict skip_dir verification failed with status {verify_result.returncode}"); return
|
||||
if f"{root}/Keep/ok.txt" not in manifest: failures.append("Strict skip_dir scenario did not synchronise expected non-skipped content")
|
||||
if f"{root}/Cache/top.txt" not in manifest: failures.append("Strict skip_dir scenario incorrectly skipped top-level Cache directory")
|
||||
if f"{root}/App/Cache/nested.txt" in manifest: failures.append("Strict skip_dir scenario unexpectedly synchronised strict-matched directory content")
|
||||
|
||||
def run(self, context: E2EContext) -> TestResult:
|
||||
case_log_dir = context.logs_dir / "tc0012"; reset_directory(case_log_dir); context.ensure_refresh_token_available()
|
||||
all_artifacts = []; failures = []
|
||||
self._run_loose(context, case_log_dir, all_artifacts, failures)
|
||||
self._run_strict(context, case_log_dir, all_artifacts, failures)
|
||||
details = {"failures": failures}
|
||||
if failures: return TestResult.fail_result(self.case_id, self.name, "; ".join(failures), all_artifacts, details)
|
||||
return TestResult.pass_result(self.case_id, self.name, all_artifacts, details)
|
||||
|
|
|
|||
|
|
@ -1,41 +1,43 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
from framework.base import E2ETestCase
|
||||
from framework.context import E2EContext
|
||||
from framework.manifest import build_manifest, write_manifest
|
||||
from framework.result import TestResult
|
||||
from testcases.wave1_common import Wave1TestCaseBase
|
||||
from framework.utils import command_to_string, reset_directory, run_command, write_text_file
|
||||
|
||||
|
||||
class TestCase0013SkipDotfilesValidation(Wave1TestCaseBase):
|
||||
class TestCase0013SkipDotfilesValidation(E2ETestCase):
|
||||
case_id = "0013"
|
||||
name = "skip_dotfiles validation"
|
||||
description = "Validate that dotfiles and dot-directories are excluded when skip_dotfiles is enabled"
|
||||
description = "Validate that skip_dotfiles prevents dotfiles and dot-directories from synchronising"
|
||||
|
||||
def run(self, context):
|
||||
case_work_dir, case_log_dir, case_state_dir = self._initialise_case_dirs(context)
|
||||
root_name = self._root_name(context)
|
||||
artifacts = []
|
||||
sync_root = case_work_dir / "syncroot"
|
||||
sync_root.mkdir(parents=True, exist_ok=True)
|
||||
self._create_text_file(sync_root / root_name / ".hidden.txt", "hidden\n")
|
||||
self._create_text_file(sync_root / root_name / ".dotdir" / "inside.txt", "inside dotdir\n")
|
||||
self._create_text_file(sync_root / root_name / "visible.txt", "visible\n")
|
||||
self._create_text_file(sync_root / root_name / "normal" / "keep.md", "normal keep\n")
|
||||
conf_dir = self._new_config_dir(context, case_work_dir, "main")
|
||||
config_path = self._write_config(conf_dir, extra_lines=['skip_dotfiles = "true"'])
|
||||
artifacts.append(str(config_path))
|
||||
result = self._run_onedrive(context, sync_root=sync_root, config_dir=conf_dir, extra_args=["--single-directory", root_name])
|
||||
artifacts.extend(self._write_command_artifacts(result=result, log_dir=case_log_dir, state_dir=case_state_dir, phase_name="skip_dotfiles"))
|
||||
if result.returncode != 0:
|
||||
return TestResult.fail_result(self.case_id, self.name, f"skip_dotfiles validation failed with status {result.returncode}", artifacts)
|
||||
verify_root, verify_result, verify_artifacts = self._download_remote_scope(context, case_work_dir, root_name, "verify_remote")
|
||||
artifacts.extend(verify_artifacts)
|
||||
artifacts.extend(self._write_command_artifacts(result=verify_result, log_dir=case_log_dir, state_dir=case_state_dir, phase_name="verify_remote"))
|
||||
if verify_result.returncode != 0:
|
||||
return TestResult.fail_result(self.case_id, self.name, f"Remote verification failed with status {verify_result.returncode}", artifacts)
|
||||
snapshot = self._snapshot_files(verify_root)
|
||||
for required in [f"{root_name}/visible.txt", f"{root_name}/normal/keep.md"]:
|
||||
if required not in snapshot:
|
||||
return TestResult.fail_result(self.case_id, self.name, f"Expected visible content missing remotely: {required}", artifacts)
|
||||
for forbidden in [f"{root_name}/.hidden.txt", f"{root_name}/.dotdir/inside.txt"]:
|
||||
if forbidden in snapshot:
|
||||
return TestResult.fail_result(self.case_id, self.name, f"Dotfile content was unexpectedly synchronised: {forbidden}", artifacts)
|
||||
return TestResult.pass_result(self.case_id, self.name, artifacts, {"root_name": root_name})
|
||||
def _write_config(self, config_path: Path) -> None:
|
||||
write_text_file(config_path, "# tc0013 config\nbypass_data_preservation = \"true\"\nskip_dotfiles = \"true\"\n")
|
||||
|
||||
def run(self, context: E2EContext) -> TestResult:
|
||||
case_work_dir = context.work_root / "tc0013"; case_log_dir = context.logs_dir / "tc0013"; state_dir = context.state_dir / "tc0013"
|
||||
reset_directory(case_work_dir); reset_directory(case_log_dir); reset_directory(state_dir); context.ensure_refresh_token_available()
|
||||
sync_root = case_work_dir / "syncroot"; confdir = case_work_dir / "conf-main"; verify_root = case_work_dir / "verifyroot"; verify_conf = case_work_dir / "conf-verify"; root_name = f"ZZ_E2E_TC0013_{context.run_id}_{os.getpid()}"
|
||||
write_text_file(sync_root / root_name / "visible.txt", "visible\n"); write_text_file(sync_root / root_name / ".hidden.txt", "hidden\n"); write_text_file(sync_root / root_name / ".dotdir" / "inside.txt", "inside\n")
|
||||
context.bootstrap_config_dir(confdir); self._write_config(confdir / "config")
|
||||
context.bootstrap_config_dir(verify_conf); write_text_file(verify_conf / "config", "# verify\nbypass_data_preservation = \"true\"\n")
|
||||
stdout_file = case_log_dir / "skip_dotfiles_stdout.log"; stderr_file = case_log_dir / "skip_dotfiles_stderr.log"; verify_stdout = case_log_dir / "verify_stdout.log"; verify_stderr = case_log_dir / "verify_stderr.log"; remote_manifest_file = state_dir / "remote_verify_manifest.txt"; metadata_file = state_dir / "metadata.txt"
|
||||
command = [context.onedrive_bin, "--sync", "--verbose", "--resync", "--resync-auth", "--syncdir", str(sync_root), "--confdir", str(confdir)]
|
||||
result = run_command(command, cwd=context.repo_root)
|
||||
write_text_file(stdout_file, result.stdout); write_text_file(stderr_file, result.stderr)
|
||||
verify_command = [context.onedrive_bin, "--sync", "--verbose", "--download-only", "--resync", "--resync-auth", "--syncdir", str(verify_root), "--confdir", str(verify_conf)]
|
||||
verify_result = run_command(verify_command, cwd=context.repo_root)
|
||||
write_text_file(verify_stdout, verify_result.stdout); write_text_file(verify_stderr, verify_result.stderr); remote_manifest = build_manifest(verify_root); write_manifest(remote_manifest_file, remote_manifest)
|
||||
write_text_file(metadata_file, f"root_name={root_name}\nreturncode={result.returncode}\nverify_returncode={verify_result.returncode}\n")
|
||||
artifacts = [str(stdout_file), str(stderr_file), str(verify_stdout), str(verify_stderr), str(remote_manifest_file), str(metadata_file)]
|
||||
details = {"returncode": result.returncode, "verify_returncode": verify_result.returncode, "root_name": root_name}
|
||||
if result.returncode != 0: return TestResult.fail_result(self.case_id, self.name, f"skip_dotfiles validation failed with status {result.returncode}", artifacts, details)
|
||||
if verify_result.returncode != 0: return TestResult.fail_result(self.case_id, self.name, f"Remote verification failed with status {verify_result.returncode}", artifacts, details)
|
||||
if f"{root_name}/visible.txt" not in remote_manifest: return TestResult.fail_result(self.case_id, self.name, "Visible file missing after skip_dotfiles processing", artifacts, details)
|
||||
for unwanted in [f"{root_name}/.hidden.txt", f"{root_name}/.dotdir", f"{root_name}/.dotdir/inside.txt"]:
|
||||
if unwanted in remote_manifest: return TestResult.fail_result(self.case_id, self.name, f"Dotfile content was unexpectedly synchronised: {unwanted}", artifacts, details)
|
||||
return TestResult.pass_result(self.case_id, self.name, artifacts, details)
|
||||
|
|
|
|||
|
|
@ -1,37 +1,43 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
from framework.base import E2ETestCase
|
||||
from framework.context import E2EContext
|
||||
from framework.manifest import build_manifest, write_manifest
|
||||
from framework.result import TestResult
|
||||
from testcases.wave1_common import Wave1TestCaseBase
|
||||
from framework.utils import command_to_string, reset_directory, run_command, write_text_file
|
||||
|
||||
|
||||
class TestCase0014SkipSizeValidation(Wave1TestCaseBase):
|
||||
class TestCase0014SkipSizeValidation(E2ETestCase):
|
||||
case_id = "0014"
|
||||
name = "skip_size validation"
|
||||
description = "Validate that files above the configured size threshold are excluded from synchronisation"
|
||||
description = "Validate that skip_size prevents oversized files from synchronising"
|
||||
|
||||
def run(self, context):
|
||||
case_work_dir, case_log_dir, case_state_dir = self._initialise_case_dirs(context)
|
||||
root_name = self._root_name(context)
|
||||
artifacts = []
|
||||
sync_root = case_work_dir / "syncroot"
|
||||
sync_root.mkdir(parents=True, exist_ok=True)
|
||||
self._create_binary_file(sync_root / root_name / "small.bin", 128 * 1024)
|
||||
self._create_binary_file(sync_root / root_name / "large.bin", 2 * 1024 * 1024)
|
||||
conf_dir = self._new_config_dir(context, case_work_dir, "main")
|
||||
config_path = self._write_config(conf_dir, extra_lines=['skip_size = "1"'])
|
||||
artifacts.append(str(config_path))
|
||||
result = self._run_onedrive(context, sync_root=sync_root, config_dir=conf_dir, extra_args=["--single-directory", root_name])
|
||||
artifacts.extend(self._write_command_artifacts(result=result, log_dir=case_log_dir, state_dir=case_state_dir, phase_name="skip_size"))
|
||||
if result.returncode != 0:
|
||||
return TestResult.fail_result(self.case_id, self.name, f"skip_size validation failed with status {result.returncode}", artifacts)
|
||||
verify_root, verify_result, verify_artifacts = self._download_remote_scope(context, case_work_dir, root_name, "verify_remote")
|
||||
artifacts.extend(verify_artifacts)
|
||||
artifacts.extend(self._write_command_artifacts(result=verify_result, log_dir=case_log_dir, state_dir=case_state_dir, phase_name="verify_remote"))
|
||||
if verify_result.returncode != 0:
|
||||
return TestResult.fail_result(self.case_id, self.name, f"Remote verification failed with status {verify_result.returncode}", artifacts)
|
||||
snapshot = self._snapshot_files(verify_root)
|
||||
if f"{root_name}/small.bin" not in snapshot:
|
||||
return TestResult.fail_result(self.case_id, self.name, "Small file is missing remotely", artifacts)
|
||||
if f"{root_name}/large.bin" in snapshot:
|
||||
return TestResult.fail_result(self.case_id, self.name, "Large file exceeded skip_size threshold but was synchronised", artifacts)
|
||||
return TestResult.pass_result(self.case_id, self.name, artifacts, {"root_name": root_name})
|
||||
def _write_config(self, config_path: Path) -> None:
|
||||
write_text_file(config_path, "# tc0014 config\nbypass_data_preservation = \"true\"\nskip_size = \"1\"\n")
|
||||
|
||||
def run(self, context: E2EContext) -> TestResult:
|
||||
case_work_dir = context.work_root / "tc0014"; case_log_dir = context.logs_dir / "tc0014"; state_dir = context.state_dir / "tc0014"
|
||||
reset_directory(case_work_dir); reset_directory(case_log_dir); reset_directory(state_dir); context.ensure_refresh_token_available()
|
||||
sync_root = case_work_dir / "syncroot"; confdir = case_work_dir / "conf-main"; verify_root = case_work_dir / "verifyroot"; verify_conf = case_work_dir / "conf-verify"; root_name = f"ZZ_E2E_TC0014_{context.run_id}_{os.getpid()}"
|
||||
write_text_file(sync_root / root_name / "small.bin", "a" * 16384)
|
||||
big_path = sync_root / root_name / "large.bin"; big_path.parent.mkdir(parents=True, exist_ok=True); big_path.write_bytes(b"B" * (2 * 1024 * 1024))
|
||||
context.bootstrap_config_dir(confdir); self._write_config(confdir / "config")
|
||||
context.bootstrap_config_dir(verify_conf); write_text_file(verify_conf / "config", "# verify\nbypass_data_preservation = \"true\"\n")
|
||||
stdout_file = case_log_dir / "skip_size_stdout.log"; stderr_file = case_log_dir / "skip_size_stderr.log"; verify_stdout = case_log_dir / "verify_stdout.log"; verify_stderr = case_log_dir / "verify_stderr.log"; remote_manifest_file = state_dir / "remote_verify_manifest.txt"; metadata_file = state_dir / "metadata.txt"
|
||||
command = [context.onedrive_bin, "--sync", "--verbose", "--resync", "--resync-auth", "--syncdir", str(sync_root), "--confdir", str(confdir)]
|
||||
result = run_command(command, cwd=context.repo_root)
|
||||
write_text_file(stdout_file, result.stdout); write_text_file(stderr_file, result.stderr)
|
||||
verify_command = [context.onedrive_bin, "--sync", "--verbose", "--download-only", "--resync", "--resync-auth", "--syncdir", str(verify_root), "--confdir", str(verify_conf)]
|
||||
verify_result = run_command(verify_command, cwd=context.repo_root)
|
||||
write_text_file(verify_stdout, verify_result.stdout); write_text_file(verify_stderr, verify_result.stderr); remote_manifest = build_manifest(verify_root); write_manifest(remote_manifest_file, remote_manifest)
|
||||
write_text_file(metadata_file, f"root_name={root_name}\nlarge_size={big_path.stat().st_size}\nreturncode={result.returncode}\nverify_returncode={verify_result.returncode}\n")
|
||||
artifacts = [str(stdout_file), str(stderr_file), str(verify_stdout), str(verify_stderr), str(remote_manifest_file), str(metadata_file)]
|
||||
details = {"returncode": result.returncode, "verify_returncode": verify_result.returncode, "root_name": root_name, "large_size": big_path.stat().st_size}
|
||||
if result.returncode != 0: return TestResult.fail_result(self.case_id, self.name, f"skip_size validation failed with status {result.returncode}", artifacts, details)
|
||||
if verify_result.returncode != 0: return TestResult.fail_result(self.case_id, self.name, f"Remote verification failed with status {verify_result.returncode}", artifacts, details)
|
||||
if f"{root_name}/small.bin" not in remote_manifest: return TestResult.fail_result(self.case_id, self.name, "Small file missing after skip_size processing", artifacts, details)
|
||||
if f"{root_name}/large.bin" in remote_manifest: return TestResult.fail_result(self.case_id, self.name, "Large file exceeded skip_size threshold but was synchronised", artifacts, details)
|
||||
return TestResult.pass_result(self.case_id, self.name, artifacts, details)
|
||||
|
|
|
|||
|
|
@ -1,43 +1,42 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
from framework.base import E2ETestCase
|
||||
from framework.context import E2EContext
|
||||
from framework.manifest import build_manifest, write_manifest
|
||||
from framework.result import TestResult
|
||||
from testcases.wave1_common import Wave1TestCaseBase
|
||||
from framework.utils import command_to_string, reset_directory, run_command, write_text_file
|
||||
|
||||
|
||||
class TestCase0015SkipSymlinksValidation(Wave1TestCaseBase):
|
||||
class TestCase0015SkipSymlinksValidation(E2ETestCase):
|
||||
case_id = "0015"
|
||||
name = "skip_symlinks validation"
|
||||
description = "Validate that symbolic links are excluded when skip_symlinks is enabled"
|
||||
description = "Validate that skip_symlinks prevents symbolic links from synchronising"
|
||||
|
||||
def run(self, context):
|
||||
case_work_dir, case_log_dir, case_state_dir = self._initialise_case_dirs(context)
|
||||
root_name = self._root_name(context)
|
||||
artifacts = []
|
||||
sync_root = case_work_dir / "syncroot"
|
||||
sync_root.mkdir(parents=True, exist_ok=True)
|
||||
target_file = sync_root / root_name / "real.txt"
|
||||
self._create_text_file(target_file, "real content\n")
|
||||
symlink_path = sync_root / root_name / "real-link.txt"
|
||||
symlink_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
os.symlink("real.txt", symlink_path)
|
||||
conf_dir = self._new_config_dir(context, case_work_dir, "main")
|
||||
config_path = self._write_config(conf_dir, extra_lines=['skip_symlinks = "true"'])
|
||||
artifacts.append(str(config_path))
|
||||
artifacts.append(self._write_json_artifact(case_state_dir / "local_snapshot_pre.json", self._snapshot_files(sync_root)))
|
||||
result = self._run_onedrive(context, sync_root=sync_root, config_dir=conf_dir)
|
||||
artifacts.extend(self._write_command_artifacts(result=result, log_dir=case_log_dir, state_dir=case_state_dir, phase_name="skip_symlinks"))
|
||||
if result.returncode != 0:
|
||||
return TestResult.fail_result(self.case_id, self.name, f"skip_symlinks validation failed with status {result.returncode}", artifacts)
|
||||
verify_root, verify_result, verify_artifacts = self._download_remote_scope(context, case_work_dir, root_name, "verify_remote")
|
||||
artifacts.extend(verify_artifacts)
|
||||
artifacts.extend(self._write_command_artifacts(result=verify_result, log_dir=case_log_dir, state_dir=case_state_dir, phase_name="verify_remote"))
|
||||
if verify_result.returncode != 0:
|
||||
return TestResult.fail_result(self.case_id, self.name, f"Remote verification failed with status {verify_result.returncode}", artifacts)
|
||||
snapshot = self._snapshot_files(verify_root)
|
||||
if f"{root_name}/real.txt" not in snapshot:
|
||||
return TestResult.fail_result(self.case_id, self.name, "Real file is missing remotely", artifacts)
|
||||
if f"{root_name}/real-link.txt" in snapshot:
|
||||
return TestResult.fail_result(self.case_id, self.name, "Symbolic link was unexpectedly synchronised", artifacts)
|
||||
return TestResult.pass_result(self.case_id, self.name, artifacts, {"root_name": root_name})
|
||||
def _write_config(self, config_path: Path) -> None:
|
||||
write_text_file(config_path, "# tc0015 config\nbypass_data_preservation = \"true\"\nskip_symlinks = \"true\"\n")
|
||||
|
||||
def run(self, context: E2EContext) -> TestResult:
|
||||
case_work_dir = context.work_root / "tc0015"; case_log_dir = context.logs_dir / "tc0015"; state_dir = context.state_dir / "tc0015"
|
||||
reset_directory(case_work_dir); reset_directory(case_log_dir); reset_directory(state_dir); context.ensure_refresh_token_available()
|
||||
sync_root = case_work_dir / "syncroot"; confdir = case_work_dir / "conf-main"; verify_root = case_work_dir / "verifyroot"; verify_conf = case_work_dir / "conf-verify"; root_name = f"ZZ_E2E_TC0015_{context.run_id}_{os.getpid()}"
|
||||
target = sync_root / root_name / "real.txt"; write_text_file(target, "real\n"); link = sync_root / root_name / "linked.txt"; link.parent.mkdir(parents=True, exist_ok=True); link.symlink_to(target.name)
|
||||
context.bootstrap_config_dir(confdir); self._write_config(confdir / "config")
|
||||
context.bootstrap_config_dir(verify_conf); write_text_file(verify_conf / "config", "# verify\nbypass_data_preservation = \"true\"\n")
|
||||
stdout_file = case_log_dir / "skip_symlinks_stdout.log"; stderr_file = case_log_dir / "skip_symlinks_stderr.log"; verify_stdout = case_log_dir / "verify_stdout.log"; verify_stderr = case_log_dir / "verify_stderr.log"; remote_manifest_file = state_dir / "remote_verify_manifest.txt"; metadata_file = state_dir / "metadata.txt"
|
||||
command = [context.onedrive_bin, "--sync", "--verbose", "--resync", "--resync-auth", "--syncdir", str(sync_root), "--confdir", str(confdir)]
|
||||
result = run_command(command, cwd=context.repo_root)
|
||||
write_text_file(stdout_file, result.stdout); write_text_file(stderr_file, result.stderr)
|
||||
verify_command = [context.onedrive_bin, "--sync", "--verbose", "--download-only", "--resync", "--resync-auth", "--syncdir", str(verify_root), "--confdir", str(verify_conf)]
|
||||
verify_result = run_command(verify_command, cwd=context.repo_root)
|
||||
write_text_file(verify_stdout, verify_result.stdout); write_text_file(verify_stderr, verify_result.stderr); remote_manifest = build_manifest(verify_root); write_manifest(remote_manifest_file, remote_manifest)
|
||||
write_text_file(metadata_file, f"root_name={root_name}\nreturncode={result.returncode}\nverify_returncode={verify_result.returncode}\n")
|
||||
artifacts = [str(stdout_file), str(stderr_file), str(verify_stdout), str(verify_stderr), str(remote_manifest_file), str(metadata_file)]
|
||||
details = {"returncode": result.returncode, "verify_returncode": verify_result.returncode, "root_name": root_name}
|
||||
if result.returncode != 0: return TestResult.fail_result(self.case_id, self.name, f"skip_symlinks validation failed with status {result.returncode}", artifacts, details)
|
||||
if verify_result.returncode != 0: return TestResult.fail_result(self.case_id, self.name, f"Remote verification failed with status {verify_result.returncode}", artifacts, details)
|
||||
if f"{root_name}/real.txt" not in remote_manifest: return TestResult.fail_result(self.case_id, self.name, "Regular file missing after skip_symlinks processing", artifacts, details)
|
||||
if f"{root_name}/linked.txt" in remote_manifest: return TestResult.fail_result(self.case_id, self.name, "Symbolic link was unexpectedly synchronised", artifacts, details)
|
||||
return TestResult.pass_result(self.case_id, self.name, artifacts, details)
|
||||
|
|
|
|||
|
|
@ -1,39 +1,43 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
from framework.base import E2ETestCase
|
||||
from framework.context import E2EContext
|
||||
from framework.manifest import build_manifest, write_manifest
|
||||
from framework.result import TestResult
|
||||
from testcases.wave1_common import Wave1TestCaseBase
|
||||
from framework.utils import command_to_string, reset_directory, run_command, write_text_file
|
||||
|
||||
|
||||
class TestCase0016CheckNosyncValidation(Wave1TestCaseBase):
|
||||
class TestCase0016CheckNosyncValidation(E2ETestCase):
|
||||
case_id = "0016"
|
||||
name = "check_nosync validation"
|
||||
description = "Validate that local directories containing .nosync are excluded when check_nosync is enabled"
|
||||
description = "Validate that check_nosync prevents directories containing .nosync from synchronising"
|
||||
|
||||
def run(self, context):
|
||||
case_work_dir, case_log_dir, case_state_dir = self._initialise_case_dirs(context)
|
||||
root_name = self._root_name(context)
|
||||
artifacts = []
|
||||
sync_root = case_work_dir / "syncroot"
|
||||
sync_root.mkdir(parents=True, exist_ok=True)
|
||||
self._create_text_file(sync_root / root_name / "Blocked" / ".nosync", "marker\n")
|
||||
self._create_text_file(sync_root / root_name / "Blocked" / "blocked.txt", "blocked\n")
|
||||
self._create_text_file(sync_root / root_name / "Allowed" / "allowed.txt", "allowed\n")
|
||||
conf_dir = self._new_config_dir(context, case_work_dir, "main")
|
||||
config_path = self._write_config(conf_dir, extra_lines=['check_nosync = "true"'])
|
||||
artifacts.append(str(config_path))
|
||||
result = self._run_onedrive(context, sync_root=sync_root, config_dir=conf_dir)
|
||||
artifacts.extend(self._write_command_artifacts(result=result, log_dir=case_log_dir, state_dir=case_state_dir, phase_name="check_nosync"))
|
||||
if result.returncode != 0:
|
||||
return TestResult.fail_result(self.case_id, self.name, f"check_nosync validation failed with status {result.returncode}", artifacts)
|
||||
verify_root, verify_result, verify_artifacts = self._download_remote_scope(context, case_work_dir, root_name, "verify_remote")
|
||||
artifacts.extend(verify_artifacts)
|
||||
artifacts.extend(self._write_command_artifacts(result=verify_result, log_dir=case_log_dir, state_dir=case_state_dir, phase_name="verify_remote"))
|
||||
if verify_result.returncode != 0:
|
||||
return TestResult.fail_result(self.case_id, self.name, f"Remote verification failed with status {verify_result.returncode}", artifacts)
|
||||
snapshot = self._snapshot_files(verify_root)
|
||||
if f"{root_name}/Allowed/allowed.txt" not in snapshot:
|
||||
return TestResult.fail_result(self.case_id, self.name, "Allowed content is missing remotely", artifacts)
|
||||
for forbidden in [f"{root_name}/Blocked/blocked.txt", f"{root_name}/Blocked/.nosync"]:
|
||||
if forbidden in snapshot:
|
||||
return TestResult.fail_result(self.case_id, self.name, f".nosync-protected content was unexpectedly synchronised: {forbidden}", artifacts)
|
||||
return TestResult.pass_result(self.case_id, self.name, artifacts, {"root_name": root_name})
|
||||
def _write_config(self, config_path: Path) -> None:
|
||||
write_text_file(config_path, "# tc0016 config\nbypass_data_preservation = \"true\"\ncheck_nosync = \"true\"\n")
|
||||
|
||||
def run(self, context: E2EContext) -> TestResult:
|
||||
case_work_dir = context.work_root / "tc0016"; case_log_dir = context.logs_dir / "tc0016"; state_dir = context.state_dir / "tc0016"
|
||||
reset_directory(case_work_dir); reset_directory(case_log_dir); reset_directory(state_dir); context.ensure_refresh_token_available()
|
||||
sync_root = case_work_dir / "syncroot"; confdir = case_work_dir / "conf-main"; verify_root = case_work_dir / "verifyroot"; verify_conf = case_work_dir / "conf-verify"; root_name = f"ZZ_E2E_TC0016_{context.run_id}_{os.getpid()}"
|
||||
write_text_file(sync_root / root_name / "Allowed" / "ok.txt", "ok\n"); write_text_file(sync_root / root_name / "Blocked" / ".nosync", ""); write_text_file(sync_root / root_name / "Blocked" / "blocked.txt", "blocked\n")
|
||||
context.bootstrap_config_dir(confdir); self._write_config(confdir / "config")
|
||||
context.bootstrap_config_dir(verify_conf); write_text_file(verify_conf / "config", "# verify\nbypass_data_preservation = \"true\"\n")
|
||||
stdout_file = case_log_dir / "check_nosync_stdout.log"; stderr_file = case_log_dir / "check_nosync_stderr.log"; verify_stdout = case_log_dir / "verify_stdout.log"; verify_stderr = case_log_dir / "verify_stderr.log"; remote_manifest_file = state_dir / "remote_verify_manifest.txt"; metadata_file = state_dir / "metadata.txt"
|
||||
command = [context.onedrive_bin, "--sync", "--verbose", "--resync", "--resync-auth", "--syncdir", str(sync_root), "--confdir", str(confdir)]
|
||||
result = run_command(command, cwd=context.repo_root)
|
||||
write_text_file(stdout_file, result.stdout); write_text_file(stderr_file, result.stderr)
|
||||
verify_command = [context.onedrive_bin, "--sync", "--verbose", "--download-only", "--resync", "--resync-auth", "--syncdir", str(verify_root), "--confdir", str(verify_conf)]
|
||||
verify_result = run_command(verify_command, cwd=context.repo_root)
|
||||
write_text_file(verify_stdout, verify_result.stdout); write_text_file(verify_stderr, verify_result.stderr); remote_manifest = build_manifest(verify_root); write_manifest(remote_manifest_file, remote_manifest)
|
||||
write_text_file(metadata_file, f"root_name={root_name}\nreturncode={result.returncode}\nverify_returncode={verify_result.returncode}\n")
|
||||
artifacts = [str(stdout_file), str(stderr_file), str(verify_stdout), str(verify_stderr), str(remote_manifest_file), str(metadata_file)]
|
||||
details = {"returncode": result.returncode, "verify_returncode": verify_result.returncode, "root_name": root_name}
|
||||
if result.returncode != 0: return TestResult.fail_result(self.case_id, self.name, f"check_nosync validation failed with status {result.returncode}", artifacts, details)
|
||||
if verify_result.returncode != 0: return TestResult.fail_result(self.case_id, self.name, f"Remote verification failed with status {verify_result.returncode}", artifacts, details)
|
||||
if f"{root_name}/Allowed/ok.txt" not in remote_manifest: return TestResult.fail_result(self.case_id, self.name, "Allowed content missing after check_nosync processing", artifacts, details)
|
||||
for unwanted in [f"{root_name}/Blocked", f"{root_name}/Blocked/.nosync", f"{root_name}/Blocked/blocked.txt"]:
|
||||
if unwanted in remote_manifest: return TestResult.fail_result(self.case_id, self.name, f".nosync directory content was unexpectedly synchronised: {unwanted}", artifacts, details)
|
||||
return TestResult.pass_result(self.case_id, self.name, artifacts, details)
|
||||
|
|
|
|||
|
|
@ -1,194 +0,0 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import hashlib
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
from pathlib import Path
|
||||
from typing import Iterable
|
||||
|
||||
from framework.base import E2ETestCase
|
||||
from framework.context import E2EContext
|
||||
from framework.manifest import build_manifest, write_manifest
|
||||
from framework.utils import (
|
||||
command_to_string,
|
||||
reset_directory,
|
||||
run_command,
|
||||
write_text_file,
|
||||
)
|
||||
|
||||
CONFIG_FILE_NAME = "config"
|
||||
|
||||
|
||||
class Wave1TestCaseBase(E2ETestCase):
|
||||
"""
|
||||
Shared helper base for Wave 1 E2E test cases.
|
||||
|
||||
Important design rule: Wave 1 test cases must not use sync_list.
|
||||
TC0002 is the sole owner of sync_list validation.
|
||||
"""
|
||||
|
||||
def _safe_run_id(self, context: E2EContext) -> str:
|
||||
value = re.sub(r"[^A-Za-z0-9]+", "_", context.run_id).strip("_").lower()
|
||||
return value or "run"
|
||||
|
||||
def _root_name(self, context: E2EContext) -> str:
|
||||
return f"ZZ_E2E_TC{self.case_id}_{self._safe_run_id(context)}"
|
||||
|
||||
def _initialise_case_dirs(self, context: E2EContext) -> tuple[Path, Path, Path]:
|
||||
case_work_dir = context.work_root / f"tc{self.case_id}"
|
||||
case_log_dir = context.logs_dir / f"tc{self.case_id}"
|
||||
case_state_dir = context.state_dir / f"tc{self.case_id}"
|
||||
reset_directory(case_work_dir)
|
||||
reset_directory(case_log_dir)
|
||||
reset_directory(case_state_dir)
|
||||
return case_work_dir, case_log_dir, case_state_dir
|
||||
|
||||
def _new_config_dir(self, context: E2EContext, case_work_dir: Path, name: str) -> Path:
|
||||
config_dir = case_work_dir / f"conf-{name}"
|
||||
reset_directory(config_dir)
|
||||
context.bootstrap_config_dir(config_dir)
|
||||
return config_dir
|
||||
|
||||
def _write_config(
|
||||
self,
|
||||
config_dir: Path,
|
||||
*,
|
||||
extra_lines: Iterable[str] | None = None,
|
||||
) -> Path:
|
||||
config_path = config_dir / CONFIG_FILE_NAME
|
||||
|
||||
lines = [
|
||||
f"# tc{self.case_id} generated config",
|
||||
'bypass_data_preservation = "true"',
|
||||
'monitor_interval = 5',
|
||||
]
|
||||
if extra_lines:
|
||||
lines.extend(list(extra_lines))
|
||||
write_text_file(config_path, "\n".join(lines) + "\n")
|
||||
|
||||
return config_path
|
||||
|
||||
def _run_onedrive(
|
||||
self,
|
||||
context: E2EContext,
|
||||
*,
|
||||
sync_root: Path,
|
||||
config_dir: Path,
|
||||
extra_args: list[str] | None = None,
|
||||
use_resync: bool = True,
|
||||
use_resync_auth: bool = True,
|
||||
input_text: str | None = None,
|
||||
):
|
||||
command = [context.onedrive_bin, "--sync", "--verbose"]
|
||||
if use_resync:
|
||||
command.append("--resync")
|
||||
if use_resync_auth:
|
||||
command.append("--resync-auth")
|
||||
command.extend(["--syncdir", str(sync_root), "--confdir", str(config_dir)])
|
||||
if extra_args:
|
||||
command.extend(extra_args)
|
||||
|
||||
context.log(f"Executing Test Case {self.case_id}: {command_to_string(command)}")
|
||||
return run_command(command, cwd=context.repo_root, input_text=input_text)
|
||||
|
||||
def _write_command_artifacts(
|
||||
self,
|
||||
*,
|
||||
result,
|
||||
log_dir: Path,
|
||||
state_dir: Path,
|
||||
phase_name: str,
|
||||
extra_metadata: dict[str, str | int | bool] | None = None,
|
||||
) -> list[str]:
|
||||
stdout_file = log_dir / f"{phase_name}_stdout.log"
|
||||
stderr_file = log_dir / f"{phase_name}_stderr.log"
|
||||
metadata_file = state_dir / f"{phase_name}_metadata.txt"
|
||||
|
||||
write_text_file(stdout_file, result.stdout)
|
||||
write_text_file(stderr_file, result.stderr)
|
||||
|
||||
metadata = {
|
||||
"phase": phase_name,
|
||||
"command": command_to_string(result.command),
|
||||
"returncode": result.returncode,
|
||||
}
|
||||
if extra_metadata:
|
||||
metadata.update(extra_metadata)
|
||||
|
||||
lines = [f"{key}={value}" for key, value in metadata.items()]
|
||||
write_text_file(metadata_file, "\n".join(lines) + "\n")
|
||||
|
||||
return [str(stdout_file), str(stderr_file), str(metadata_file)]
|
||||
|
||||
def _write_manifests(self, root: Path, state_dir: Path, prefix: str) -> list[str]:
|
||||
manifest_file = state_dir / f"{prefix}_manifest.txt"
|
||||
write_manifest(manifest_file, build_manifest(root))
|
||||
return [str(manifest_file)]
|
||||
|
||||
def _write_json_artifact(self, path: Path, payload: object) -> str:
|
||||
write_text_file(path, json.dumps(payload, indent=2, sort_keys=True) + "\n")
|
||||
return str(path)
|
||||
|
||||
def _create_text_file(self, path: Path, content: str) -> None:
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
path.write_text(content, encoding="utf-8")
|
||||
|
||||
def _create_binary_file(self, path: Path, size_bytes: int) -> None:
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
chunk = os.urandom(min(size_bytes, 1024 * 1024))
|
||||
with path.open("wb") as fp:
|
||||
remaining = size_bytes
|
||||
while remaining > 0:
|
||||
to_write = chunk[: min(len(chunk), remaining)]
|
||||
fp.write(to_write)
|
||||
remaining -= len(to_write)
|
||||
|
||||
def _snapshot_files(self, root: Path) -> dict[str, str]:
|
||||
result: dict[str, str] = {}
|
||||
if not root.exists():
|
||||
return result
|
||||
|
||||
for path in sorted(root.rglob("*")):
|
||||
rel = path.relative_to(root).as_posix()
|
||||
if path.is_symlink():
|
||||
result[rel] = f"symlink->{os.readlink(path)}"
|
||||
continue
|
||||
if path.is_dir():
|
||||
result[rel] = "dir"
|
||||
continue
|
||||
hasher = hashlib.sha256()
|
||||
with path.open("rb") as fp:
|
||||
while True:
|
||||
chunk = fp.read(8192)
|
||||
if not chunk:
|
||||
break
|
||||
hasher.update(chunk)
|
||||
result[rel] = hasher.hexdigest()
|
||||
return result
|
||||
|
||||
def _download_remote_scope(
|
||||
self,
|
||||
context: E2EContext,
|
||||
case_work_dir: Path,
|
||||
scope_root: str,
|
||||
name: str,
|
||||
*,
|
||||
extra_config_lines: Iterable[str] | None = None,
|
||||
extra_args: list[str] | None = None,
|
||||
) -> tuple[Path, object, list[str]]:
|
||||
verify_root = case_work_dir / f"verify-{name}"
|
||||
reset_directory(verify_root)
|
||||
config_dir = self._new_config_dir(context, case_work_dir, f"verify-{name}")
|
||||
config_path = self._write_config(
|
||||
config_dir,
|
||||
extra_lines=extra_config_lines,
|
||||
)
|
||||
result = self._run_onedrive(
|
||||
context,
|
||||
sync_root=verify_root,
|
||||
config_dir=config_dir,
|
||||
extra_args=["--download-only", "--single-directory", scope_root] + (extra_args or []),
|
||||
)
|
||||
artifacts = [str(config_path)]
|
||||
return verify_root, result, artifacts
|
||||
Loading…
Add table
Add a link
Reference in a new issue