From c774a09570dccc13d88fe8734dfc507f96da70a4 Mon Sep 17 00:00:00 2001 From: Lucas Hoffmann Date: Tue, 15 Jul 2025 08:30:15 +0200 Subject: [PATCH] test: include original tests for atomic_write The tests were migrated from the original tests of the atomicwrites project: https://github.com/untitaker/python-atomicwrites --- test/test_contacts.py | 117 +++++++++++++++++++++++++++++++++++++++++- 1 file changed, 116 insertions(+), 1 deletion(-) diff --git a/test/test_contacts.py b/test/test_contacts.py index 7610e46b..c1580057 100644 --- a/test/test_contacts.py +++ b/test/test_contacts.py @@ -4,10 +4,14 @@ import base64 import datetime +import errno +import os +import pathlib +import tempfile import unittest from unittest import mock -from khard.contacts import Contact, multi_property_key +from khard.contacts import Contact, atomic_write, multi_property_key class ContactFormatDateObject(unittest.TestCase): @@ -80,3 +84,114 @@ def test_all_strings_are_sorted_before_dicts(self) -> None: my_list = ["a", {"c": "d"}, "e", {"f": "g"}] my_list.sort(key=multi_property_key) # type: ignore[arg-type] self.assertEqual(my_list, ["a", "e", {"c": "d"}, {"f": "g"}]) + + +class AtomicWrite(unittest.TestCase): + """Tests for the atomic_write functionself. + + These tests have been migrated from the original atomicwrites repository. + """ + + def setUp(self) -> None: + self.t = tempfile.TemporaryDirectory() + self.tmpdir = pathlib.Path(self.t.name) + return super().setUp() + + def tearDown(self) -> None: + self.t.cleanup() + return super().tearDown() + + def assertFileContents(self, file: pathlib.Path, contents: str) -> None: + """Assert the file contents of the given file""" + with file.open() as f: + return self.assertEqual(f.read(), contents) + + @staticmethod + def write(path: pathlib.Path, contents: str) -> None: + """Write a string to a file""" + with path.open("w") as f: + f.write(contents) + + def test_atomic_write(self) -> None: + fname = self.tmpdir / 'ha' + for i in range(2): + with atomic_write(str(fname), overwrite=True) as f: + f.write('hoho') + + with self.assertRaises(OSError) as excinfo: + with atomic_write(str(fname), overwrite=False) as f: + f.write('haha') + + self.assertEqual(excinfo.exception.errno, errno.EEXIST) + + self.assertFileContents(fname, 'hoho') + self.assertEqual(len(list(self.tmpdir.iterdir())), 1) + + + def test_teardown(self) -> None: + fname = self.tmpdir / 'ha' + with self.assertRaises(AssertionError): + with atomic_write(str(fname), overwrite=True): + self.fail("This code should not be reached") + + self.assertFalse(any(self.tmpdir.iterdir())) + + + def test_replace_simultaneously_created_file(self) -> None: + fname = self.tmpdir / 'ha' + with atomic_write(str(fname), overwrite=True) as f: + f.write('hoho') + self.write(fname, 'harhar') + self.assertFileContents(fname, 'harhar') + self.assertFileContents(fname, 'hoho') + self.assertEqual(len(list(self.tmpdir.iterdir())), 1) + + + def test_dont_remove_simultaneously_created_file(self) -> None: + fname = self.tmpdir / 'ha' + with self.assertRaises(OSError) as excinfo: + with atomic_write(str(fname), overwrite=False) as f: + f.write('hoho') + self.write(fname, 'harhar') + self.assertFileContents(fname, 'harhar') + + self.assertEqual(excinfo.exception.errno, errno.EEXIST) + self.assertFileContents(fname, 'harhar') + self.assertEqual(len(list(self.tmpdir.iterdir())), 1) + + + def test_open_reraise(self) -> None: + """Verify that nested exceptions during rollback do not overwrite the + initial exception that triggered a rollback.""" + fname = self.tmpdir / 'ha' + with self.assertRaises(AssertionError): + with atomic_write(str(fname), overwrite=False): + # Mess with internals; find and remove the temp file used by + # atomic_write internally. We're testing that the initial + # AssertionError triggered below is propagated up the stack, + # not the second exception triggered during commit. + tmp = next(self.tmpdir.iterdir()) + tmp.unlink() + # Now trigger our own exception. + self.fail("Intentional failure for testing purposes") + + + def test_atomic_write_in_cwd(self) -> None: + orig_curdir = os.getcwd() + try: + os.chdir(str(self.tmpdir)) + fname = 'ha' + for i in range(2): + with atomic_write(fname, overwrite=True) as f: + f.write('hoho') + + with self.assertRaises(OSError) as excinfo: + with atomic_write(fname, overwrite=False) as f: + f.write('haha') + + self.assertEqual(excinfo.exception.errno, errno.EEXIST) + + self.assertFileContents(pathlib.Path(fname), 'hoho') + self.assertEqual(len(list(self.tmpdir.iterdir())), 1) + finally: + os.chdir(orig_curdir)