Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 19 additions & 19 deletions qubes/tests/integ/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,22 +134,22 @@ def tearDown(self):

super(VmNetworkingMixin, self).tearDown()

def run_netvm_cmd(self, cmd):
try:
self.loop.run_until_complete(
self.testnetvm.run_for_stdio(cmd, user="root")
)
except subprocess.CalledProcessError as e:
self.fail(
"Command '%s' failed: %s%s"
% (cmd, e.stdout.decode(), e.stderr.decode())
)

def configure_netvm(self):
"""
:type self: qubes.tests.SystemTestCase | VMNetworkingMixin
"""

def run_netvm_cmd(cmd):
try:
self.loop.run_until_complete(
self.testnetvm.run_for_stdio(cmd, user="root")
)
except subprocess.CalledProcessError as e:
self.fail(
"Command '%s' failed: %s%s"
% (cmd, e.stdout.decode(), e.stderr.decode())
)

if not self.testnetvm.is_running():
self.loop.run_until_complete(self.testnetvm.start())
# Ensure that dnsmasq is installed:
Expand All @@ -160,28 +160,28 @@ def run_netvm_cmd(cmd):
except subprocess.CalledProcessError:
self.skipTest("dnsmasq not installed")

run_netvm_cmd("ip link add test0 type dummy")
run_netvm_cmd("ip link set test0 up")
run_netvm_cmd("ip addr add {}/24 dev test0".format(self.test_ip))
run_netvm_cmd(
self.run_netvm_cmd("ip link add test0 type dummy")
self.run_netvm_cmd("ip link set test0 up")
self.run_netvm_cmd("ip addr add {}/24 dev test0".format(self.test_ip))
self.run_netvm_cmd(
"nft add ip qubes custom-input ip daddr {} accept".format(
self.test_ip
)
)
# ignore failure
self.run_cmd(self.testnetvm, "while pkill dnsmasq; do sleep 1; done")
run_netvm_cmd(
self.run_netvm_cmd(
"dnsmasq -a {ip} -A /{name}/{ip} -i test0 -z".format(
ip=self.test_ip, name=self.test_name
)
)
run_netvm_cmd(
self.run_netvm_cmd(
"rm -f /etc/resolv.conf && echo nameserver {} > /etc/resolv.conf".format(
self.test_ip
)
)
run_netvm_cmd("systemctl try-restart systemd-resolved || :")
run_netvm_cmd("/usr/lib/qubes/qubes-setup-dnat-to-ns")
self.run_netvm_cmd("systemctl try-restart systemd-resolved || :")
self.run_netvm_cmd("/usr/lib/qubes/qubes-setup-dnat-to-ns")

def test_000_simple_networking(self):
"""
Expand Down
58 changes: 44 additions & 14 deletions qubes/tests/integ/network_ipv6.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,31 +69,37 @@ def configure_netvm(self):
self.testnetvm.features["ipv6"] = True
super(VmIPv6NetworkingMixin, self).configure_netvm()

def run_netvm_cmd(cmd):
try:
self.loop.run_until_complete(
self.testnetvm.run_for_stdio(cmd, user="root")
)
except subprocess.CalledProcessError as e:
self.fail(
"Command '%s' failed: %s%s"
% (cmd, e.stdout.decode(), e.stderr.decode())
)

run_netvm_cmd("ip addr add {}/128 dev test0".format(self.test_ip6))
run_netvm_cmd(
self.run_netvm_cmd("ip addr add {}/128 dev test0".format(
self.test_ip6))
self.run_netvm_cmd(
"nft add ip6 qubes custom-input ip6 daddr {} accept".format(
self.test_ip6
)
)
# ignore failure
self.run_cmd(self.testnetvm, "while pkill dnsmasq; do sleep 1; done")
run_netvm_cmd(
self.run_netvm_cmd(
"dnsmasq -a {ip} -A /{name}/{ip} -A /{name}/{ip6} -i test0 -z".format(
ip=self.test_ip, ip6=self.test_ip6, name=self.test_name
)
)

def setup_ipv6_dns(self, ipv6_only=False):
"""
Enable IPv6 DNS, needs configure_netvm to be called already
:return:
"""
self.run_netvm_cmd(
"echo nameserver {} {} /etc/resolv.conf".format(
self.test_ip6,
">" if ipv6_only else ">>"
)
)

self.run_netvm_cmd("systemctl try-restart systemd-resolved || :")
self.run_netvm_cmd("/usr/lib/qubes/qubes-setup-dnat-to-ns; "
"[ $? -eq 0 -o $? -eq 100 ]")

def test_500_ipv6_simple_networking(self):
"""
:type self: qubes.tests.SystemTestCase | VmIPv6NetworkingMixin
Expand Down Expand Up @@ -523,6 +529,30 @@ def test_550_ipv6_spoof_ip(self):
packets = output[line].lstrip().split()[index]
self.assertEqual(packets, "0", "Some packet hit the INPUT rule")

def test_560_ipv6_dns(self):
"""DNS over IPv6/IPv4

:return:
"""
self.setup_ipv6_dns()
self.loop.run_until_complete(self.start_vm(self.testvm1))
self.assertEqual(self.run_cmd(self.testvm1, self.ping6_ip), 0)
self.assertEqual(self.run_cmd(self.testvm1, self.ping6_name), 0)
self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0)

def test_560_ipv6_dns_only(self):
"""DNS over IPv6

:return:
"""
self.setup_ipv6_dns(ipv6_only=True)
self.loop.run_until_complete(self.start_vm(self.testvm1))
self.assertEqual(self.run_cmd(self.testvm1, self.ping6_ip), 0)
self.assertEqual(self.run_cmd(self.testvm1, self.ping6_name), 0)
self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0)

def test_710_ipv6_custom_ip_simple(self):
"""Custom AppVM IP

Expand Down