Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion Lib/pathlib/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1215,9 +1215,19 @@ def mkdir(self, mode=0o777, parents=False, exist_ok=False):
except OSError:
# Cannot rely on checking for EEXIST, since the operating system
# could give priority to other errors like EACCES or EROFS
if not exist_ok or not self.is_dir():
if not exist_ok:
raise

if not self.is_dir():
if not self.exists():
try:
os.mkdir(self, mode)
except FileExistsError:
if not self.is_dir():
raise
else:
raise

def chmod(self, mode, *, follow_symlinks=True):
"""
Change the permissions of the path, like os.chmod().
Expand Down
53 changes: 53 additions & 0 deletions Lib/test/test_pathlib/test_pathlib.py
Original file line number Diff line number Diff line change
Expand Up @@ -2495,6 +2495,59 @@ def my_mkdir(path, mode=0o777):
self.assertNotIn(str(p12), concurrently_created)
self.assertTrue(p.exists())

def test_mkdir_exist_ok_concurrent_delete(self):
# Test for TOCTOU race condition with real threading.
# One thread repeatedly creates a directory with exist_ok=True,
# another thread repeatedly deletes it. This should never raise
# FileExistsError once the directory has been deleted.
import threading
import time

p = self.cls(self.base, 'dirTOCTOU')
self.assertFalse(p.exists())

p.mkdir()
self.assertTrue(p.exists())

errors = []
stop_flag = [False]

def mkdir_thread():
while not stop_flag[0]:
try:
p.mkdir(exist_ok=True)
except FileExistsError as e:
errors.append(e)
stop_flag[0] = True
break

def rmdir_thread():
while not stop_flag[0]:
try:
if p.exists():
p.rmdir()
except OSError:
pass

t1 = threading.Thread(target=mkdir_thread)
t2 = threading.Thread(target=rmdir_thread)

try:
t1.start()
t2.start()

time.sleep(0.5)

stop_flag[0] = True
t1.join(timeout=2.0)
t2.join(timeout=2.0)

self.assertEqual(errors, [])
finally:
stop_flag[0] = True
if p.exists():
p.rmdir()

@needs_symlinks
def test_symlink_to(self):
P = self.cls(self.base)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
fix mkdir TOCTOU race condition
Loading