mirror of
https://github.com/home-assistant/core.git
synced 2026-04-17 23:53:49 +01:00
Improve ProxmoxVE permissions validation (#164770)
This commit is contained in:
@@ -265,7 +265,8 @@ class ProxmoxNodeButtonEntity(ProxmoxNodeEntity, ProxmoxBaseButton):
|
||||
|
||||
async def _async_press_call(self) -> None:
|
||||
"""Execute the node button action via executor."""
|
||||
if not is_granted(self.coordinator.permissions, p_type="nodes"):
|
||||
node_id = self._node_data.node["node"]
|
||||
if not is_granted(self.coordinator.permissions, p_type="nodes", p_id=node_id):
|
||||
raise ServiceValidationError(
|
||||
translation_domain=DOMAIN,
|
||||
translation_key="no_permission_node_power",
|
||||
@@ -273,7 +274,7 @@ class ProxmoxNodeButtonEntity(ProxmoxNodeEntity, ProxmoxBaseButton):
|
||||
await self.hass.async_add_executor_job(
|
||||
self.entity_description.press_action,
|
||||
self.coordinator,
|
||||
self._node_data.node["node"],
|
||||
node_id,
|
||||
)
|
||||
|
||||
|
||||
@@ -284,7 +285,8 @@ class ProxmoxVMButtonEntity(ProxmoxVMEntity, ProxmoxBaseButton):
|
||||
|
||||
async def _async_press_call(self) -> None:
|
||||
"""Execute the VM button action via executor."""
|
||||
if not is_granted(self.coordinator.permissions, p_type="vms"):
|
||||
vmid = self.vm_data["vmid"]
|
||||
if not is_granted(self.coordinator.permissions, p_type="vms", p_id=vmid):
|
||||
raise ServiceValidationError(
|
||||
translation_domain=DOMAIN,
|
||||
translation_key="no_permission_vm_lxc_power",
|
||||
@@ -293,7 +295,7 @@ class ProxmoxVMButtonEntity(ProxmoxVMEntity, ProxmoxBaseButton):
|
||||
self.entity_description.press_action,
|
||||
self.coordinator,
|
||||
self._node_name,
|
||||
self.vm_data["vmid"],
|
||||
vmid,
|
||||
)
|
||||
|
||||
|
||||
@@ -304,8 +306,9 @@ class ProxmoxContainerButtonEntity(ProxmoxContainerEntity, ProxmoxBaseButton):
|
||||
|
||||
async def _async_press_call(self) -> None:
|
||||
"""Execute the container button action via executor."""
|
||||
vmid = self.container_data["vmid"]
|
||||
# Container power actions fall under vms
|
||||
if not is_granted(self.coordinator.permissions, p_type="vms"):
|
||||
if not is_granted(self.coordinator.permissions, p_type="vms", p_id=vmid):
|
||||
raise ServiceValidationError(
|
||||
translation_domain=DOMAIN,
|
||||
translation_key="no_permission_vm_lxc_power",
|
||||
@@ -314,5 +317,5 @@ class ProxmoxContainerButtonEntity(ProxmoxContainerEntity, ProxmoxBaseButton):
|
||||
self.entity_description.press_action,
|
||||
self.coordinator,
|
||||
self._node_name,
|
||||
self.container_data["vmid"],
|
||||
vmid,
|
||||
)
|
||||
|
||||
@@ -6,8 +6,13 @@ from .const import PERM_POWER
|
||||
def is_granted(
|
||||
permissions: dict[str, dict[str, int]],
|
||||
p_type: str = "vms",
|
||||
p_id: str | int | None = None, # can be str for nodes
|
||||
permission: str = PERM_POWER,
|
||||
) -> bool:
|
||||
"""Validate user permissions for the given type and permission."""
|
||||
path = f"/{p_type}"
|
||||
return permissions.get(path, {}).get(permission) == 1
|
||||
paths = [f"/{p_type}/{p_id}", f"/{p_type}", "/"]
|
||||
for path in paths:
|
||||
value = permissions.get(path, {}).get(permission)
|
||||
if value is not None:
|
||||
return value == 1
|
||||
return False
|
||||
|
||||
@@ -25,16 +25,15 @@ AUDIT_PERMISSIONS = {
|
||||
}
|
||||
|
||||
POWER_PERMISSIONS = {
|
||||
"/": {"VM.PowerMgmt": 1},
|
||||
"/nodes": {"VM.PowerMgmt": 1},
|
||||
"/vms": {"VM.PowerMgmt": 1},
|
||||
"/": {
|
||||
"VM.PowerMgmt": 1,
|
||||
},
|
||||
"/vms/101": {"VM.PowerMgmt": 0},
|
||||
}
|
||||
|
||||
MERGED_PERMISSIONS = {
|
||||
key: value | POWER_PERMISSIONS.get(key, {})
|
||||
for key, value in AUDIT_PERMISSIONS.items()
|
||||
key: {**AUDIT_PERMISSIONS.get(key, {}), **POWER_PERMISSIONS.get(key, {})}
|
||||
for key in set(AUDIT_PERMISSIONS) | set(POWER_PERMISSIONS)
|
||||
}
|
||||
|
||||
|
||||
|
||||
@@ -75,7 +75,7 @@ def mock_proxmox_client():
|
||||
"access_ticket.json", DOMAIN
|
||||
)
|
||||
|
||||
# Default to PVEUser privileges
|
||||
# Default privileges as defined
|
||||
mock_instance.access.permissions.get.return_value = MERGED_PERMISSIONS
|
||||
|
||||
# Make a separate mock for the qemu and lxc endpoints
|
||||
|
||||
@@ -330,7 +330,7 @@ async def test_node_buttons_permission_denied_for_auditor_role(
|
||||
entity_id: str,
|
||||
translation_key: str,
|
||||
) -> None:
|
||||
"""Test that buttons are missing when only Audit permissions exist."""
|
||||
"""Test that buttons are raising accordingly for Auditor permissions."""
|
||||
mock_proxmox_client.access.permissions.get.return_value = AUDIT_PERMISSIONS
|
||||
|
||||
await setup_integration(hass, mock_config_entry)
|
||||
@@ -343,3 +343,21 @@ async def test_node_buttons_permission_denied_for_auditor_role(
|
||||
blocking=True,
|
||||
)
|
||||
assert exc_info.value.translation_key == translation_key
|
||||
|
||||
|
||||
async def test_vm_buttons_denied_for_specific_vm(
|
||||
hass: HomeAssistant,
|
||||
mock_proxmox_client: MagicMock,
|
||||
mock_config_entry: MockConfigEntry,
|
||||
) -> None:
|
||||
"""Test that button only works on actual permissions."""
|
||||
await setup_integration(hass, mock_config_entry)
|
||||
mock_proxmox_client._node_mock.qemu(101)
|
||||
|
||||
with pytest.raises(ServiceValidationError):
|
||||
await hass.services.async_call(
|
||||
BUTTON_DOMAIN,
|
||||
SERVICE_PRESS,
|
||||
{ATTR_ENTITY_ID: "button.vm_db_start"},
|
||||
blocking=True,
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user