Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 33 additions & 10 deletions qubesadmin/device_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,13 @@ def qbool(value):

if isinstance(value, str):
lcvalue = value.lower()
if lcvalue in ('0', 'no', 'false', 'off'):
if lcvalue in ("0", "no", "false", "off"):
return False
if lcvalue in ('1', 'yes', 'true', 'on'):
if lcvalue in ("1", "yes", "true", "on"):
return True
raise qubesadmin.exc.QubesValueError(
'Invalid literal for boolean property: {!r}'.format(value))
raise QubesValueError(
"Invalid literal for boolean property: {!r}".format(value)
)

return bool(value)

Expand Down Expand Up @@ -545,6 +546,15 @@ def __lt__(self, other):
def __repr__(self):
return f"{self.port!r}:{self.device_id}"

@property
def repr_for_qarg(self):
"""Object representation for qrexec argument"""
res = repr(self).replace(":", "+")
# replace '?' in category
unknown_dev = repr(DeviceInterface.unknown())
res = res.replace(unknown_dev, "_" * len(unknown_dev))
return res.replace("*", "_")

def __str__(self):
return f"{self.port}:{self.device_id}"

Expand Down Expand Up @@ -604,12 +614,20 @@ def _parse(
"""
if backend is None:
backend_name, identity = representation.split(sep, 1)
if backend_name == "_":
backend_name = "*"
if backend_name != "*":
backend = get_domain(backend_name)
else:
identity = representation

port_id, _, devid = identity.partition(":")
port_id, _, devid = identity.replace(sep, ":").partition(":")
if port_id == "_":
port_id = "*"
if devid == "_":
devid = "*"
unknown_dev = repr(DeviceInterface.unknown())
devid = devid.replace("_" * len(unknown_dev), unknown_dev)
return cls(
Port(backend_domain=backend, port_id=port_id, devclass=devclass),
device_id=devid or None,
Expand Down Expand Up @@ -641,7 +659,7 @@ class DeviceCategory(Enum):
"""

# pylint: disable=invalid-name
Other = ("*******", ) # also matches all devices, if used to block
Other = ("*******",) # also matches all devices, if used to block

# The following devices are used in GUI for blocks; take note when changing

Expand All @@ -651,17 +669,17 @@ class DeviceCategory(Enum):
Mouse = ("u03**02", "p0902**")
Input = ("u03****", "p09****") # HID etc.
Printer = ("u07****",)
Camera = ("p0903**", "u06****", "u0e****") # cameras and scanners
Camera = ("p0903**", "u06****", "u0e****") # cameras and scanners

Microphone = ("m******",)
Audio = ("p0403**", "p0401**", "p0408**", "u01****", "m******")
# Multimedia = Audio, Video, Displays etc.
Multimedia = ("u10****", "p03****", "p04****")
USB_Storage = ("u08****", )
Block_Storage = ("b******", )
USB_Storage = ("u08****",)
Block_Storage = ("b******",)
Storage = ("b******", "u08****", "p01****")
Bluetooth = ("ue00101", "p0d11**")
Smart_Card_Readers = ("u0b****", )
Smart_Card_Readers = ("u0b****",)

Display = ("p0300**", "p0380**") # PCI screens?
Memory = ("p05****",)
Expand Down Expand Up @@ -1285,6 +1303,11 @@ def clone(self, **kwargs):
def __repr__(self):
return f"{self.virtual_device!r}"

@property
def repr_for_qarg(self):
"""Object representation for qrexec argument"""
return self.virtual_device.repr_for_qarg

def __str__(self):
return f"{self.virtual_device}"

Expand Down
10 changes: 6 additions & 4 deletions qubesadmin/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ def _add(self, assignment: DeviceAssignment, action: str) -> None:
self._vm.qubesd_call(
None,
f"admin.vm.device.{self._class}.{action.capitalize()}",
repr(assignment),
assignment.repr_for_qarg,
assignment.serialize(),
)

Expand All @@ -179,7 +179,7 @@ def _remove(self, assignment: DeviceAssignment, action: str) -> None:
self._vm.qubesd_call(
None,
f"admin.vm.device.{self._class}.{action.capitalize()}",
repr(assignment),
assignment.repr_for_qarg,
)

def get_dedicated_devices(self) -> Iterable[DeviceAssignment]:
Expand Down Expand Up @@ -266,7 +266,9 @@ def get_exposed_devices(self) -> Iterable[DeviceInfo]:
expected_devclass=self._class,
)

def update_assignment(self, device: Port, required: AssignmentMode):
def update_assignment(
self, device: VirtualDevice, required: AssignmentMode
):
"""
Update assignment of already attached device.

Expand All @@ -279,7 +281,7 @@ def update_assignment(self, device: Port, required: AssignmentMode):
self._vm.qubesd_call(
None,
"admin.vm.device.{}.Set.assignment".format(self._class),
repr(device),
device.repr_for_qarg,
required.value.encode("utf-8"),
)
self._assignment_cache = None
Expand Down
7 changes: 7 additions & 0 deletions qubesadmin/tests/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

# pylint: disable=missing-docstring

import string
import subprocess
import traceback
import unittest
Expand All @@ -29,6 +30,7 @@
import qubesadmin
import qubesadmin.app

QREXEC_ALLOWED_CHARS = string.ascii_letters + string.digits + "_-+."

class TestVM(object):
def __init__(self, name, **kwargs):
Expand Down Expand Up @@ -162,6 +164,9 @@ def __init__(self):

def qubesd_call(self, dest, method, arg=None, payload=None,
payload_stream=None):
if arg:
assert all(c in QREXEC_ALLOWED_CHARS for c in arg), \
f"forbidden char in service arg '{arg}"
if payload_stream:
payload = (payload or b'') + payload_stream.read()
call_key = (dest, method, arg, payload)
Expand All @@ -178,6 +183,8 @@ def qubesd_call(self, dest, method, arg=None, payload=None,

def run_service(self, dest, service, **kwargs):
# pylint: disable=arguments-differ
assert all(c in QREXEC_ALLOWED_CHARS for c in service), \
f"forbidden char in service '{service}"
self.service_calls.append((dest, service, kwargs))
call_key = (dest, service)
# TODO: consider it as a future extension, as a replacement for
Expand Down
8 changes: 4 additions & 4 deletions qubesadmin/tests/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -751,13 +751,13 @@ def test_043_clone_devices(self):
b"backend_domain='test-vm3' mode='required'\n")

self.app.expected_calls[
('new-name', 'admin.vm.device.pci.Assign', 'test-vm2+dev1:*',
('new-name', 'admin.vm.device.pci.Assign', 'test-vm2+dev1+_',
b"device_id='*' port_id='dev1' "
b"devclass='pci' backend_domain='test-vm2' mode='required' "
b"frontend_domain='new-name' _ro='yes'")] = b'0\0'

self.app.expected_calls[
('new-name', 'admin.vm.device.pci.Assign', 'test-vm3+dev2:*',
('new-name', 'admin.vm.device.pci.Assign', 'test-vm3+dev2+_',
b"device_id='*' port_id='dev2' "
b"devclass='pci' backend_domain='test-vm3' mode='required' "
b"frontend_domain='new-name'")] = b'0\0'
Expand Down Expand Up @@ -798,13 +798,13 @@ def test_044_clone_devices_fail(self):
b"backend_domain='test-vm3' mode='required'\n")

self.app.expected_calls[
('new-name', 'admin.vm.device.pci.Assign', 'test-vm2+dev1:*',
('new-name', 'admin.vm.device.pci.Assign', 'test-vm2+dev1+_',
b"device_id='*' port_id='dev1' "
b"devclass='pci' backend_domain='test-vm2' mode='required' "
b"frontend_domain='new-name' _ro='yes'")] = b'0\0'

self.app.expected_calls[
('new-name', 'admin.vm.device.pci.Assign', 'test-vm3+dev2:*',
('new-name', 'admin.vm.device.pci.Assign', 'test-vm3+dev2+_',
b"device_id='*' port_id='dev2' "
b"devclass='pci' backend_domain='test-vm3' mode='required' "
b"frontend_domain='new-name'")] = \
Expand Down
2 changes: 1 addition & 1 deletion qubesadmin/tests/backup/backupcompatibility.py
Original file line number Diff line number Diff line change
Expand Up @@ -1480,7 +1480,7 @@ def setup_expected_calls(self, parsed_qubes_xml, templates_map=None):
f"frontend_domain='{name}'".encode())
self.app.expected_calls[
(name, 'admin.vm.device.{}.Assign'.format(bus),
'{}+{}:*'.format(backend_domain, port_id),
'{}+{}+_'.format(backend_domain, port_id),
encoded_options)] = b'0\0'

for feature, value in vm['features'].items():
Expand Down
16 changes: 8 additions & 8 deletions qubesadmin/tests/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ def test_011_getitem_missing(self):

def test_020_attach(self):
self.app.expected_calls[
('test-vm', 'admin.vm.device.test.Attach', 'test-vm2+dev1:*',
('test-vm', 'admin.vm.device.test.Attach', 'test-vm2+dev1+_',
b"device_id='*' port_id='dev1' devclass='test' "
b"backend_domain='test-vm2' mode='manual' "
b"frontend_domain='test-vm'")] = \
Expand All @@ -129,7 +129,7 @@ def test_020_attach(self):

def test_021_attach_options(self):
self.app.expected_calls[
('test-vm', 'admin.vm.device.test.Attach', 'test-vm2+dev1:*',
('test-vm', 'admin.vm.device.test.Attach', 'test-vm2+dev1+_',
b"device_id='*' port_id='dev1' devclass='test' "
b"backend_domain='test-vm2' mode='manual' "
b"frontend_domain='test-vm' _ro='True' "
Expand All @@ -143,7 +143,7 @@ def test_021_attach_options(self):

def test_022_attach_required(self):
self.app.expected_calls[
('test-vm', 'admin.vm.device.test.Attach', 'test-vm2+dev1:*',
('test-vm', 'admin.vm.device.test.Attach', 'test-vm2+dev1+_',
b"device_id='*' port_id='dev1' devclass='test' "
b"backend_domain='test-vm2' mode='required' "
b"frontend_domain='test-vm'")] = b'0\0'
Expand All @@ -155,7 +155,7 @@ def test_022_attach_required(self):

def test_023_attach_required_options(self):
self.app.expected_calls[
('test-vm', 'admin.vm.device.test.Attach', 'test-vm2+dev1:*',
('test-vm', 'admin.vm.device.test.Attach', 'test-vm2+dev1+_',
b"device_id='*' port_id='dev1' devclass='test' "
b"backend_domain='test-vm2' mode='required' "
b"frontend_domain='test-vm' _ro='True'")] = b'0\0'
Expand All @@ -168,7 +168,7 @@ def test_023_attach_required_options(self):

def test_030_detach(self):
self.app.expected_calls[
('test-vm', 'admin.vm.device.test.Detach', 'test-vm2+dev1:*',
('test-vm', 'admin.vm.device.test.Detach', 'test-vm2+dev1+_',
None)] = b'0\0'
assign = DeviceAssignment.new(
self.app.domains['test-vm2'], 'dev1', devclass='test')
Expand Down Expand Up @@ -333,7 +333,7 @@ def test_060_attached(self):
def test_070_update_assignment_required(self):
self.app.expected_calls[
('test-vm', 'admin.vm.device.test.Set.assignment',
'test-vm2+dev1:*', b'required')] = b'0\0'
'test-vm2+dev1+_', b'required')] = b'0\0'
dev = DeviceAssignment.new(
self.app.domains['test-vm2'], devclass='test', port_id='dev1')
self.vm.devices['test'].update_assignment(dev, AssignmentMode.REQUIRED)
Expand All @@ -342,7 +342,7 @@ def test_070_update_assignment_required(self):
def test_071_update_assignment_ask(self):
self.app.expected_calls[
('test-vm', 'admin.vm.device.test.Set.assignment',
'test-vm2+dev1:*', b'ask-to-attach')] = b'0\0'
'test-vm2+dev1+_', b'ask-to-attach')] = b'0\0'
dev = DeviceAssignment.new(
self.app.domains['test-vm2'], devclass='test', port_id='dev1')
self.vm.devices['test'].update_assignment(dev, AssignmentMode.ASK)
Expand All @@ -351,7 +351,7 @@ def test_071_update_assignment_ask(self):
def test_072_update_assignment_auto(self):
self.app.expected_calls[
('test-vm', 'admin.vm.device.test.Set.assignment',
'test-vm2+dev1:*', b'auto-attach')] = b'0\0'
'test-vm2+dev1+_', b'auto-attach')] = b'0\0'
dev = DeviceAssignment.new(
self.app.domains['test-vm2'], devclass='test', port_id='dev1')
self.vm.devices['test'].update_assignment(dev, AssignmentMode.AUTO)
Expand Down
Loading