Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 24 additions & 3 deletions src/azure-cli/azure/cli/command_modules/acs/custom.py
Original file line number Diff line number Diff line change
Expand Up @@ -2050,11 +2050,32 @@ def merge_kubernetes_configurations(existing_file, addition_file, replace, conte
existing_file_perms,
)

# Refuse to write through a symlink
if os.path.islink(existing_file):
raise CLIError(
'Kubeconfig path "{}" is a symbolic link. '
'Refusing to write to prevent symlink-following attacks.'.format(existing_file)
)

# Atomic write: write to a temp file in the same directory, then replace
parent_dir = os.path.dirname(existing_file) or '.'
tmp_fd, tmp_path = tempfile.mkstemp(dir=parent_dir)
try:
with open(existing_file, 'w+') as stream:
with os.fdopen(tmp_fd, 'w') as stream:
yaml.safe_dump(existing, stream, default_flow_style=False)
except OSError as ex:
if getattr(ex, 'errno', 0) in (errno.EACCES, errno.EPERM, errno.EROFS):
# Preserve existing file permissions if available, otherwise default to 0600
if os.path.exists(existing_file):
existing_mode = stat.S_IMODE(os.stat(existing_file).st_mode)
os.chmod(tmp_path, existing_mode)
else:
os.chmod(tmp_path, 0o600)
os.replace(tmp_path, existing_file)
except Exception as ex: # pylint: disable=broad-except
try:
os.unlink(tmp_path)
except OSError:
pass
if isinstance(ex, OSError) and getattr(ex, 'errno', 0) in (errno.EACCES, errno.EPERM, errno.EROFS):
raise FileOperationError(
'Permission denied when trying to write to {}. '
'Please ensure you have write access to this file, or specify a different file path '
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,41 @@ def test_merge_credentials_already_present(self):
self.assertEqual(merged['users'], expected_users)
self.assertEqual(merged['current-context'], obj2['current-context'])

@unittest.skipIf(os.name == 'nt', 'Symlink test not applicable on Windows')
def test_merge_credentials_rejects_symlink(self):
# Create a real kubeconfig file and a symlink pointing to it
target = tempfile.NamedTemporaryFile(delete=False, suffix='.kubeconfig')
target.close()
with open(target.name, 'w') as f:
yaml.safe_dump({'clusters': [], 'contexts': [], 'users': [],
'current-context': '', 'kind': 'Config'}, f)
self.addCleanup(os.remove, target.name)

symlink_path = target.name + '.link'
os.symlink(target.name, symlink_path)
self.addCleanup(lambda: os.remove(symlink_path) if os.path.islink(symlink_path) else None)

addition = tempfile.NamedTemporaryFile(delete=False)
addition.close()
obj = {
'clusters': [{'cluster': {'server': 'https://test'}, 'name': 'c1'}],
'contexts': [{'context': {'cluster': 'c1', 'user': 'u1'}, 'name': 'ctx1'}],
'users': [{'name': 'u1', 'user': {'token': 'tok'}}],
'current-context': 'ctx1',
}
with open(addition.name, 'w') as f:
yaml.safe_dump(obj, f)
self.addCleanup(os.remove, addition.name)

# Should raise CLIError when existing_file is a symlink
with self.assertRaises(CLIError):
merge_kubernetes_configurations(symlink_path, addition.name, False)

# Verify the symlink target was not modified
with open(target.name, 'r') as f:
content = yaml.safe_load(f)
self.assertEqual(content['clusters'], [])

@mock.patch('azure.cli.command_modules.acs.addonconfiguration.get_rg_location', return_value='eastus')
@mock.patch('azure.cli.command_modules.acs.addonconfiguration.get_resource_groups_client', autospec=True)
@mock.patch('azure.cli.command_modules.acs.addonconfiguration.get_resources_client', autospec=True)
Expand Down
Loading