Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitlab-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ lint:
- sudo qubes-dom0-update -y ansible python3-pytest python3-coverage perl-Digest-SHA
# install from artifacts
- find $CI_PROJECT_DIR/artifacts/repository -name '*.noarch.rpm' -exec sudo dnf install -y {} \+
# install a minimal template for tests
- qvm-template install --enablerepo=*testing debian-12-minimal
script:
# run ansible's tests
- cd /usr/share/ansible && sudo coverage run --data-file=$CI_PROJECT_DIR/.coverage --include=plugins/modules/qubesos.py,plugins/connection/qubes.py -m pytest -vvv tests/qubes/
Expand Down
24 changes: 11 additions & 13 deletions plugins/connection/qubes.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@
remote_user:
description:
- The user to execute as inside the qube.
choices:
- user
- root
default: user
vars:
- name: ansible_user
Expand Down Expand Up @@ -82,9 +85,7 @@ def __init__(self, play_context, new_stdin, *args, **kwargs):
else "user"
)

def _qubes(
self, cmd: str, in_data: bytes = None, shell: str = "qubes.VMShell"
):
def _qubes(self, cmd: str, in_data: bytes = None):
"""
Execute a command in the qube via qvm-run.

Expand All @@ -97,10 +98,12 @@ def _qubes(
if not cmd.endswith("\n"):
cmd += "\n"

local_cmd = ["qvm-run", "--pass-io", "--service"]
if self.user != "user":
local_cmd.extend(["-u", self.user])
local_cmd.extend([self._remote_vmname, shell])
local_cmd = ["qvm-run", "--pass-io", "--service", self._remote_vmname]
# The Ansible module framework catches invalid remote_user values
if self.user == "root":
local_cmd.append("qubes.VMRootShell")
else:
local_cmd.append("qubes.VMShell")
local_cmd_bytes = [
to_bytes(arg, errors="surrogate_or_strict") for arg in local_cmd
]
Expand Down Expand Up @@ -158,12 +161,7 @@ def put_file(self, in_path, out_path):
with open(in_path, "rb") as fobj:
source_data = fobj.read()

# Try using VMRootShell first; fallback to VMShell if needed.
retcode, _, _ = self._qubes(
f'cat > "{out_path}"\n', source_data, shell="qubes.VMRootShell"
)
if retcode == 127:
retcode, _, _ = self._qubes(f'cat > "{out_path}"\n', source_data)
retcode, _, _ = self._qubes(f'cat > "{out_path}"\n', source_data)
if retcode != 0:
raise RuntimeError(f"Failed to put_file to {out_path}")

Expand Down
2 changes: 2 additions & 0 deletions qubes-ansible.spec.in
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ install -m 644 plugins/modules/qubesos.py %{buildroot}%{_datadir}/ansible/plugin
install -m 644 plugins/connection/qubes.py %{buildroot}%{_datadir}/ansible/plugins/connection/qubes.py

install -m 644 tests/qubes/*.py %{buildroot}%{_datadir}/ansible/tests/qubes/
install -m 644 tests/ansible.cfg %{buildroot}%{_datadir}/ansible/tests/qubes/

%files
%doc README.md LICENSE EXAMPLES.md
Expand All @@ -49,6 +50,7 @@ install -m 644 tests/qubes/*.py %{buildroot}%{_datadir}/ansible/tests/qubes/

%files tests
%{_datadir}/ansible/tests/qubes/*.py
%{_datadir}/ansible/tests/qubes/ansible.cfg

%changelog
@CHANGELOG@
2 changes: 2 additions & 0 deletions tests/ansible.cfg
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[defaults]
stdout_callback = json
11 changes: 11 additions & 0 deletions tests/qubes/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,17 @@ def vm(qubes, request):
return qubes.domains[vmname]


@pytest.fixture(scope="function")
def minimalvm(qubes, request):
vmname = f"test-minimalvm-{uuid.uuid4().hex[:8]}"
props = {"template": "debian-12-minimal"}
core(Module({"state": "present", "name": vmname, "properties": props}))
request.node.mark_vm_created(vmname)

qubes.domains.refresh_cache(force=True)
return qubes.domains[vmname]


@pytest.fixture(scope="function")
def netvm(qubes, request):
vmname = f"test-netvm-{uuid.uuid4().hex[:8]}"
Expand Down
219 changes: 213 additions & 6 deletions tests/qubes/test_cli.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import os
import subprocess
import uuid
import json
from typing import List

import pytest
Expand All @@ -15,7 +16,7 @@ def run_playbook(tmp_path):
Helper to write a playbook and execute it with ansible-playbook.
"""

def _run(playbook_content: List[dict]):
def _run(playbook_content: List[dict], vms: List[str] = []):
# Create playbook file
pb_file = tmp_path / "playbook.yml"
import yaml
Expand All @@ -24,17 +25,20 @@ def _run(playbook_content: List[dict]):
# Run ansible-playbook
cmd = [
"ansible-playbook",
"-vvv",
"-i",
"localhost,",
f"localhost,{','.join(vms)}",
"-c",
"local",
"-M",
str(PLUGIN_PATH),
str(pb_file),
]
result = subprocess.run(
cmd, cwd=tmp_path, capture_output=True, text=True
cmd,
cwd=tmp_path,
capture_output=True,
text=True,
env={"ANSIBLE_CONFIG": Path(__file__).parent / "ansible.cfg"},
)
return result

Expand Down Expand Up @@ -112,8 +116,14 @@ def test_properties_and_tags_playbook(run_playbook, request):
assert result.returncode == 0, result.stderr

# Ensure properties and tags were applied
assert "changed=" in result.stdout
assert "tag1" in result.stdout and "tag2" in result.stdout
run_output = json.loads(result.stdout)
assert run_output["plays"][0]["tasks"][1]["hosts"]["localhost"][
"changed"
], result.stdout
# Tags don't appear in qubes status output
# assert (
# "tag1" in run_output["plays"][0]["tasks"][2]["hosts"]["localhost"]["status"]
# ), result.stdout


def test_inventory_playbook(run_playbook, tmp_path, qubes):
Expand Down Expand Up @@ -144,3 +154,200 @@ def test_inventory_playbook(run_playbook, tmp_path, qubes):
for vm in qubes.domains.values():
if vm.name != "dom0" and vm.klass == "AppVM":
assert vm.name in content


def test_vm_connection(vm, run_playbook):
play_attrs = {
"hosts": vm.name,
"gather_facts": False,
"connection": "qubes",
}

default_user_playbook = [
{
**play_attrs,
"tasks": [
{
"name": "Default VM user is 'user'",
"ansible.builtin.command": "whoami",
"register": "default_result",
"failed_when": "default_result.stdout != 'user'",
},
],
},
]

default_user_result = run_playbook(default_user_playbook, vms=[vm.name])
assert default_user_result.returncode == 0, default_user_result.stdout

connect_user_playbook = [
{
**play_attrs,
"remote_user": "user",
"tasks": [
{
"name": "VM user with 'remote_user: user' is 'user'",
"ansible.builtin.command": "whoami",
"register": "user_result",
"failed_when": "user_result.stdout != 'user'",
},
],
},
]

connect_user_result = run_playbook(connect_user_playbook, vms=[vm.name])
assert connect_user_result.returncode == 0, connect_user_result.stdout

connect_root_playbook = [
{
**play_attrs,
"remote_user": "root",
"tasks": [
{
"name": "VM user with 'remote_user: root' is 'root'",
"ansible.builtin.command": "whoami",
"register": "root_result",
"failed_when": "root_result.stdout != 'root'",
},
],
},
]

connect_root_result = run_playbook(connect_root_playbook, vms=[vm.name])
assert connect_root_result.returncode == 0, connect_root_result.stdout

become_playbook = [
{
**play_attrs,
"become": True,
"tasks": [
{
"name": "VM user with 'become: true' is 'root'",
"ansible.builtin.command": "whoami",
"register": "become_result",
"failed_when": "become_result.stdout != 'root'",
},
],
},
]

become_result = run_playbook(become_playbook, vms=[vm.name])
assert become_result.returncode == 0, become_result.returncode

invalid_user = "somebody"
invalid_user_playbook = [
{
**play_attrs,
"remote_user": invalid_user,
"tasks": [
{
"name": "No-op",
"ansible.builtin.command": "true",
},
],
},
]

invalid_user_result = run_playbook(invalid_user_playbook, vms=[vm.name])
assert invalid_user_result.returncode == 2, invalid_user_result.stdout

invalid_user_output = json.loads(invalid_user_result.stdout)
assert (
invalid_user_output["plays"][0]["tasks"][0]["hosts"][vm.name]["msg"]
== f'Invalid value "{invalid_user}" for configuration option "plugin_type: connection plugin: qubes setting: remote_user ", valid values are: user, root'
), invalid_user_result.stdout


def test_minimalvm_connection(minimalvm, run_playbook):
play_attrs = {
"hosts": minimalvm.name,
"gather_facts": False,
"connection": "qubes",
}

default_user_playbook = [
{
**play_attrs,
"tasks": [
{
"name": "Default minimal VM user is 'user'",
"ansible.builtin.command": "whoami",
"register": "default_result",
"failed_when": "default_result.stdout != 'user'",
},
],
},
]

default_user_result = run_playbook(
default_user_playbook, vms=[minimalvm.name]
)
assert default_user_result.returncode == 0, default_user_result.stdout

connect_user_playbook = [
{
**play_attrs,
"remote_user": "user",
"tasks": [
{
"name": "Minimal VM user with 'remote_user: user' is 'user'",
"ansible.builtin.command": "whoami",
"register": "user_result",
"failed_when": "user_result.stdout != 'user'",
},
],
},
]

connect_user_result = run_playbook(
connect_user_playbook, vms=[minimalvm.name]
)
assert connect_user_result.returncode == 0, connect_user_result.stdout

connect_root_playbook = [
{
**play_attrs,
"remote_user": "root",
"tasks": [
{
"name": "Minimal VM user with 'remote_user: root' is 'root'",
"ansible.builtin.command": "whoami",
"register": "root_result",
"failed_when": "root_result.stdout != 'root'",
},
],
},
]

connect_root_result = run_playbook(
connect_root_playbook, vms=[minimalvm.name]
)
assert connect_root_result.returncode == 0, connect_root_result.stdout

become_playbook = [
{
**play_attrs,
"become": True,
"tasks": [
{
"name": "No-op",
"ansible.builtin.command": "true",
},
],
},
]

become_result = run_playbook(become_playbook, vms=[minimalvm.name])
# Playbook should fail because "become" isn't possibile on unmodified minimal vms.
assert become_result.returncode == 2, become_result.stdout

become_output = json.loads(become_result.stdout)
become_module_result = become_output["plays"][0]["tasks"][0]["hosts"][
minimalvm.name
]
assert become_module_result["failed"], become_result.stdout
assert become_module_result["rc"] == 1, become_result.stdout
assert (
become_module_result["module_stderr"].rstrip()
== "sudo: a password is required"
), become_result.stdout