Files
FTL/test/api/test_api.py
DL6ER 20164bff26 tests: resolve iCloud Private Relay zones from local pdns
The API query-count assertions depend on resolving mask.icloud.com, whose
mask.icloud.com -> mask.apple-dns.net CNAME chain was recursed to the public
internet. dnsmasq fires extra DNSKEY validation queries depending on whether
Apple currently DNSSEC-signs icloud.com / apple-dns.net, and Apple toggles this
over time. The runtime DS-probing workaround in conftest.py could not reliably
model dnsmasq's behaviour (e.g. when Apple returns SERVFAIL on DS), so the suite
went flaky again.

Serve the icloud.com and apple-dns.net zones from the local authoritative
PowerDNS server instead, so the chain resolves hermetically and the query counts
are deterministic regardless of Apple's upstream DNSSEC posture. The DS-probing
fixture is dropped and the expected counters become fixed constants again.

Signed-off-by: DL6ER <dl6er@dl6er.de>
2026-06-24 06:25:39 +00:00

1148 lines
46 KiB
Python

"""
Pi-hole FTL API integration tests — stats, lists, search, history,
config validation (API-side), HTTP errors, and Lua server pages.
These tests replace the equivalent curl-based BATS tests with native
Python assertions against a live FTL instance.
Usage:
pytest test/api/test_api.py -v
"""
import json
import pytest
FTL_URL = "http://127.0.0.1"
# ---------------------------------------------------------------------------
# Expected query counters
# ---------------------------------------------------------------------------
# The bats test suite queries mask.icloud.com via an allowlisted client, which
# forwards the query upstream. The mask.icloud.com -> mask.apple-dns.net CNAME
# chain is served by the local (unsigned) PowerDNS authoritative server (see
# test/pdns/setup.sh and recursor.conf) instead of being recursed to the public
# internet. This keeps the resolution hermetic and the query counts below
# deterministic — previously they drifted by +2 whenever Apple toggled DNSSEC
# signing on these zones, which made the suite flaky (see issue #2908 / PR
# #2845). If you add or remove queries in test_suite.bats, update these.
TOTAL = 137
FORWARDED = 47
DNSKEY = 9
TOP_DOMAIN = "."
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _j(response, dump=None):
"""Return parsed JSON, stripping the volatile ``took`` field.
If *dump* is given, write the full response to
``/tmp/ftl_test_<dump>.json`` (best-effort, ignored on failure)
so the expected values can be inspected after a test run.
"""
data = response.json()
data.pop("took", None)
if dump:
try:
with open(f"/tmp/ftl_test_{dump}.json", "w") as f:
json.dump(data, f, indent=2)
except OSError:
pass
return data
def set_config(api_session, dotted_key, value):
"""Set a FTL config item via the API.
Builds the nested JSON payload from a dotted key, e.g.
``set_config(s, "webserver.serve_all", True)`` sends
``PATCH /api/config/webserver/serve_all``
with ``{"config": {"webserver": {"serve_all": true}}}``.
"""
parts = dotted_key.split(".")
api_path = f"{FTL_URL}/api/config/" + "/".join(parts)
payload = value
for part in reversed(parts):
payload = {part: payload}
payload = {"config": payload}
r = api_session.patch(api_path, json=payload, timeout=20)
assert r.status_code == 200, \
f"Failed to set {dotted_key}: {r.status_code} {r.text}"
return r
# ---------------------------------------------------------------------------
# HTTP error responses
# ---------------------------------------------------------------------------
class TestHTTPErrors:
def test_api_404_returns_json(self, api_session):
"""HTTP server responds with JSON error 404 to unknown API path."""
data = _j(api_session.get(f"{FTL_URL}/api/undefined", timeout=5))
assert data["error"] == {
"key": "not_found",
"message": "Not found",
"hint": "/api/undefined",
}, json.dumps(data, indent=2)
def test_non_admin_path_returns_404(self, api_session):
"""HTTP server responds with 404 to path outside /admin."""
r = api_session.head(f"{FTL_URL}/undefined", timeout=5)
assert r.status_code == 404
# ---------------------------------------------------------------------------
# Config validation via API (type-based)
# ---------------------------------------------------------------------------
class TestConfigValidationAPIType:
def test_blockESNI_rejects_float(self, api_session):
data = _j(api_session.patch(f"{FTL_URL}/api/config",
json={"config": {"dns": {"blockESNI": 15.5}}}, timeout=20))
assert data["error"] == {
"key": "bad_request",
"message": "Config item is invalid",
"hint": "dns.blockESNI: not of type bool",
}, json.dumps(data, indent=2)
def test_piholePTR_rejects_invalid_option(self, api_session):
data = _j(api_session.patch(f"{FTL_URL}/api/config",
json={"config": {"dns": {"piholePTR": "something_else"}}}, timeout=20))
assert data["error"] == {
"key": "bad_request",
"message": "Config item is invalid",
"hint": "dns.piholePTR: invalid option",
}, json.dumps(data, indent=2)
# ---------------------------------------------------------------------------
# Config validation via API (validator-based)
# ---------------------------------------------------------------------------
class TestConfigValidationAPIValidator:
def test_files_pcap_rejects_invalid_path(self, api_session):
data = _j(api_session.patch(f"{FTL_URL}/api/config",
json={"config": {"files": {"pcap": "%gh4b"}}}, timeout=20))
assert data["error"] == {
"key": "bad_request",
"message": "Config item validation failed",
"hint": 'files.pcap: not a valid file path ("%gh4b")',
}, json.dumps(data, indent=2)
def test_cnameRecords_rejects_too_few_elements(self, api_session):
data = _j(api_session.patch(f"{FTL_URL}/api/config",
json={"config": {"dns": {"cnameRecords": ["a"]}}}, timeout=20))
assert data["error"] == {
"key": "bad_request",
"message": "Config item validation failed",
"hint": "dns.cnameRecords[0]: not a valid CNAME definition (too few elements)",
}, json.dumps(data, indent=2)
def test_cnameRecords_rejects_empty_string_position(self, api_session):
data = _j(api_session.patch(f"{FTL_URL}/api/config",
json={"config": {"dns": {"cnameRecords": ["a,b,c", "a,b,c,,c"]}}}, timeout=20))
assert data["error"] == {
"key": "bad_request",
"message": "Config item validation failed",
"hint": "dns.cnameRecords[1]: contains an empty string at position 3",
}, json.dumps(data, indent=2)
def test_cnameRecords_rejects_non_string_element(self, api_session):
data = _j(api_session.patch(f"{FTL_URL}/api/config",
json={"config": {"dns": {"cnameRecords": ["a,b,c", "a,b,c", 5]}}}, timeout=20))
assert data["error"] == {
"key": "bad_request",
"message": "Config item is invalid",
"hint": "dns.cnameRecords: array has invalid elements",
}, json.dumps(data, indent=2)
# ---------------------------------------------------------------------------
# Envvar-protected config: cannot change via API
# ---------------------------------------------------------------------------
class TestEnvvarProtectedConfig:
def test_api_rejects_envvar_override(self, api_session):
"""API cannot change misc.nice when set via FTLCONF_misc_nice."""
data = _j(api_session.patch(f"{FTL_URL}/api/config/misc/nice",
json={"config": {"misc": {"nice": -12}}}, timeout=20))
assert data["error"] == {
"key": "bad_request",
"message": "Config items set via environment variables cannot be changed via the API",
"hint": "misc.nice",
}, json.dumps(data, indent=2)
# ---------------------------------------------------------------------------
# Domain search
# ---------------------------------------------------------------------------
class TestDomainSearch:
def test_nonexistent_domain(self, api_session):
data = _j(api_session.get(f"{FTL_URL}/api/search/non.existent", timeout=5))
search = data["search"]
assert search["domains"] == []
assert search["gravity"] == []
assert search["results"] == {
"domains": {"exact": 0, "regex": 0},
"gravity": {"allow": 0, "block": 0},
"total": 0,
}, json.dumps(data, indent=2)
assert search["parameters"] == {
"N": 20,
"partial": False,
"domain": "non.existent",
"debug": False,
}, json.dumps(data, indent=2)
def test_antigravity_domain(self, api_session):
data = _j(api_session.get(f"{FTL_URL}/api/search/antigravity.ftl", timeout=5))
search = data["search"]
assert search["results"] == {
"domains": {"exact": 0, "regex": 0},
"gravity": {"allow": 2, "block": 1},
"total": 3,
}, json.dumps(data, indent=2)
assert search["domains"] == []
gravity = search["gravity"]
assert len(gravity) == 3, json.dumps(gravity, indent=2)
# Block list match
g0 = gravity[0]
assert g0["domain"] == "antigravity.ftl"
assert g0["type"] == "block"
assert g0["address"] == "https://pi-hole.net/block.txt"
assert g0["comment"] == "Fake block-list"
assert g0["enabled"] is True
assert g0["id"] == 1
assert g0["number"] == 2000
assert g0["invalid_domains"] == 2
assert g0["groups"] == [0, 2]
# Allow list match (exact domain)
g1 = gravity[1]
assert g1["domain"] == "antigravity.ftl"
assert g1["type"] == "allow"
assert g1["address"] == "https://pi-hole.net/allow.txt"
assert g1["comment"] == "Fake allow-list"
assert g1["id"] == 2
assert g1["groups"] == [0]
# Allow list match (ABP-style antigravity entry)
g2 = gravity[2]
assert g2["domain"] == "@@||antigravity.ftl^"
assert g2["type"] == "allow"
assert g2["id"] == 2
def test_punycode_normalization(self, api_session):
"""Internationalized domain names should be normalized to punycode."""
data = _j(api_session.get(f"{FTL_URL}/api/search/\u00e4BC.com",
params={"debug": "true"}, timeout=5))
assert data["search"]["debug"]["punycode"] == "xn--bc-uia.com", \
json.dumps(data, indent=2)
assert data["search"]["results"]["total"] == 0
def test_punycode_domain_accepted(self, api_session):
"""Punycode domains (e.g. emoji IDNs) must not be rejected by the API.
Regression test for https://github.com/pi-hole/FTL/issues/2837
libidn2 rejects punycode for characters not in IDNA2008 (e.g. emoji),
but the ASCII punycode form is a perfectly valid DNS name.
xn--4ca0bs45142c.com is the punycode encoding of äöü😀.com.
"""
data = _j(api_session.get(f"{FTL_URL}/api/search/xn--4ca0bs45142c.com",
params={"debug": "true"}, timeout=5))
assert data["search"]["debug"]["punycode"] == "xn--4ca0bs45142c.com", \
json.dumps(data, indent=2)
# The domain does not exist in gravity, so total should be 0
assert data["search"]["results"]["total"] == 0
def test_partial_matching(self, api_session):
"""Partial matching returns substring hits in gravity."""
data = _j(api_session.get(f"{FTL_URL}/api/search/gravity",
params={"partial": "true"}, timeout=5))
search = data["search"]
assert search["parameters"]["partial"] is True
assert search["results"]["total"] > 0, \
f"Expected partial matches for 'gravity':\n{json.dumps(data, indent=2)}"
# ---------------------------------------------------------------------------
# History
# ---------------------------------------------------------------------------
class TestHistory:
def test_history_returns_24h(self, api_session):
data = _j(api_session.get(f"{FTL_URL}/api/history", timeout=5))
assert len(data["history"]) == 145, \
f"Expected 145 history entries (24h in 10-min slots), got {len(data['history'])}"
# Verify each slot has the expected structure
slot = data["history"][0]
for key in ("timestamp", "total", "cached", "blocked", "forwarded"):
assert key in slot, f"Missing key '{key}' in history slot: {slot}"
def test_history_clients_returns_24h(self, api_session):
data = _j(api_session.get(f"{FTL_URL}/api/history/clients", timeout=5))
assert len(data["history"]) == 145, \
f"Expected 145 history entries, got {len(data['history'])}"
assert "clients" in data, f"Missing 'clients' key:\n{json.dumps(data, indent=2)}"
# ---------------------------------------------------------------------------
# Lists
# ---------------------------------------------------------------------------
class TestLists:
def test_block_lists_only(self, api_session):
lists = _j(api_session.get(f"{FTL_URL}/api/lists?type=block", timeout=5))["lists"]
assert len(lists) == 1, f"Expected 1 block list:\n{json.dumps(lists, indent=2)}"
bl = lists[0]
assert bl["type"] == "block"
assert bl["address"] == "https://pi-hole.net/block.txt"
assert bl["comment"] == "Fake block-list"
assert bl["enabled"] is True
assert bl["id"] == 1
assert bl["number"] == 2000
assert bl["invalid_domains"] == 2
assert bl["abp_entries"] == 0
assert bl["status"] == 1
assert bl["groups"] == [0, 2]
def test_allow_lists_only(self, api_session):
lists = _j(api_session.get(f"{FTL_URL}/api/lists?type=allow", timeout=5))["lists"]
assert len(lists) == 1, f"Expected 1 allow list:\n{json.dumps(lists, indent=2)}"
al = lists[0]
assert al["type"] == "allow"
assert al["address"] == "https://pi-hole.net/allow.txt"
assert al["comment"] == "Fake allow-list"
assert al["enabled"] is True
assert al["id"] == 2
assert al["number"] == 2000
assert al["invalid_domains"] == 2
assert al["abp_entries"] == 0
assert al["status"] == 1
assert al["groups"] == [0]
def test_all_lists_includes_both_types(self, api_session):
lists = _j(api_session.get(f"{FTL_URL}/api/lists", timeout=5))["lists"]
assert len(lists) == 2, f"Expected 2 lists:\n{json.dumps(lists, indent=2)}"
types = {lst["type"] for lst in lists}
assert types == {"block", "allow"}
# ---------------------------------------------------------------------------
# Queries
# ---------------------------------------------------------------------------
class TestQueries:
def test_no_unknown_reply(self, api_session):
data = _j(api_session.get(f"{FTL_URL}/api/queries?reply=UNKNOWN", timeout=5))
assert data["queries"] == []
assert data["recordsFiltered"] == 0, json.dumps(data, indent=2)
def test_no_unknown_status(self, api_session):
data = _j(api_session.get(f"{FTL_URL}/api/queries?status=UNKNOWN", timeout=5))
assert data["queries"] == []
assert data["recordsFiltered"] == 0, json.dumps(data, indent=2)
# ---------------------------------------------------------------------------
# Lua server pages
# ---------------------------------------------------------------------------
class TestLuaServerPages:
def test_lua_page_outside_admin_not_served_by_default(self, api_session):
"""Lua server page outside /admin is not served when serve_all is off."""
set_config(api_session, "webserver.serve_all", False)
r = api_session.head(f"{FTL_URL}/broken_lua", timeout=5)
assert r.status_code == 404
def test_lua_page_generates_proper_backtrace(self, api_session):
"""Lua server page generates proper backtrace on error."""
set_config(api_session, "webserver.serve_all", True)
r = api_session.get(f"{FTL_URL}/broken_lua", timeout=5)
lines = r.text.splitlines()
assert lines[0] == "Hello, world 1!", f"Unexpected response:\n{r.text}"
assert lines[1] == "Hello, world 2!"
assert 'Cannot include [/var/www/html/does_not_exist.lp]: not found' in lines[2]
assert lines[3] == "stack traceback:"
def test_lua_page_outside_webhome_served_without_login(self, api_session):
"""After serve_all is enabled, Lua pages are served without login."""
r = api_session.get(f"{FTL_URL}/broken_lua", timeout=5)
lines = r.text.splitlines()
assert lines[0] == "Hello, world 1!", f"Unexpected response:\n{r.text}"
# ---------------------------------------------------------------------------
# DNS blocking status
# ---------------------------------------------------------------------------
class TestDNSBlocking:
def test_blocking_enabled(self, api_session):
data = _j(api_session.get(f"{FTL_URL}/api/dns/blocking", timeout=5))
assert data["blocking"] == "enabled"
assert data["timer"] is None
# ---------------------------------------------------------------------------
# Domains
# ---------------------------------------------------------------------------
class TestDomains:
def test_allow_exact(self, api_session):
data = _j(api_session.get(f"{FTL_URL}/api/domains/allow/exact", timeout=5))
domains = data["domains"]
names = [d["domain"] for d in domains]
assert "allowed.ftl" in names, json.dumps(domains, indent=2)
assert "regex1.ftl" in names
assert "mask.icloud.com" in names
for d in domains:
assert d["type"] == "allow"
assert d["kind"] == "exact"
def test_allow_regex(self, api_session):
data = _j(api_session.get(f"{FTL_URL}/api/domains/allow/regex", timeout=5))
domains = data["domains"]
assert len(domains) == 2, json.dumps(domains, indent=2)
assert domains[0] == {
"domain": "regex2", "unicode": "regex2",
"type": "allow", "kind": "regex", "comment": "",
"groups": [0], "enabled": True,
"id": 3, "date_added": 1559928803, "date_modified": 1559928803,
}
assert domains[1]["domain"] == "^gravity-allowed"
assert domains[1]["id"] == 4
def test_deny_exact(self, api_session):
data = _j(api_session.get(f"{FTL_URL}/api/domains/deny/exact", timeout=5))
domains = data["domains"]
names = [d["domain"] for d in domains]
assert "denied.ftl" in names, json.dumps(domains, indent=2)
assert "blacklisted-group-disabled.com" in names
for d in domains:
assert d["type"] == "deny"
assert d["kind"] == "exact"
def test_deny_regex(self, api_session):
data = _j(api_session.get(f"{FTL_URL}/api/domains/deny/regex", timeout=5))
domains = data["domains"]
assert len(domains) == 11, \
f"Expected 11 deny regex, got {len(domains)}:\n{json.dumps(domains, indent=2)}"
assert domains[0]["domain"] == "regex[0-9].ftl"
assert domains[0]["id"] == 6
assert domains[0]["groups"] == [0, 2]
for d in domains:
assert d["type"] == "deny"
assert d["kind"] == "regex"
def test_all_domains(self, api_session):
data = _j(api_session.get(f"{FTL_URL}/api/domains", timeout=5))
domains = data["domains"]
types = {d["type"] for d in domains}
kinds = {d["kind"] for d in domains}
assert types == {"allow", "deny"}
assert kinds == {"exact", "regex"}
def test_single_domain_lookup(self, api_session):
data = _j(api_session.get(f"{FTL_URL}/api/domains/deny/exact/denied.ftl", timeout=5))
domains = data["domains"]
assert len(domains) == 1, json.dumps(domains, indent=2)
assert domains[0]["domain"] == "denied.ftl"
assert domains[0]["comment"] == "Migrated from /etc/pihole/blacklist.txt"
# ---------------------------------------------------------------------------
# Groups
# ---------------------------------------------------------------------------
class TestGroups:
def test_all_groups(self, api_session):
data = _j(api_session.get(f"{FTL_URL}/api/groups", timeout=5))
groups = data["groups"]
assert len(groups) == 6, json.dumps(groups, indent=2)
names = {g["name"] for g in groups}
assert "Default" in names
assert "Test group" in names
assert "Second test group" in names
default = next(g for g in groups if g["name"] == "Default")
assert default["id"] == 0
assert default["enabled"] is True
assert default["comment"] == "The default group"
disabled = next(g for g in groups if g["name"] == "Test group")
assert disabled["id"] == 1
assert disabled["enabled"] is False
def test_single_group_lookup(self, api_session):
data = _j(api_session.get(f"{FTL_URL}/api/groups/Default", timeout=5))
groups = data["groups"]
assert len(groups) == 1, json.dumps(groups, indent=2)
assert groups[0]["name"] == "Default"
assert groups[0]["id"] == 0
# ---------------------------------------------------------------------------
# Stats summary
# ---------------------------------------------------------------------------
class TestStatsSummary:
def test_summary_structure(self, api_session):
data = _j(api_session.get(f"{FTL_URL}/api/stats/summary", timeout=5), dump="stats_summary")
q = data["queries"]
assert q["total"] == TOTAL, json.dumps(data, indent=2)
assert q["blocked"] == 49
assert q["forwarded"] == FORWARDED
assert q["cached"] == 41
assert q["unique_domains"] == 77
assert q["status"]["UNKNOWN"] == 0
assert q["status"]["GRAVITY"] == 7
assert q["status"]["FORWARDED"] == FORWARDED
assert q["status"]["CACHE"] == 41
assert q["status"]["REGEX"] == 21
assert q["status"]["DENYLIST"] == 4
assert q["status"]["SPECIAL_DOMAIN"] == 2
assert q["types"]["A"] == 69
assert q["types"]["AAAA"] == 19
assert data["clients"]["active"] == 11
assert data["clients"]["total"] == 11
assert data["gravity"]["domains_being_blocked"] == 8
# ---------------------------------------------------------------------------
# Stats: top domains
# ---------------------------------------------------------------------------
class TestStatsTopDomains:
def test_top_domains_sorted_descending(self, api_session):
data = _j(api_session.get(f"{FTL_URL}/api/stats/top_domains", timeout=5), dump="top_domains")
domains = data["domains"]
assert len(domains) > 0
counts = [d["count"] for d in domains]
assert counts == sorted(counts, reverse=True), \
f"Not sorted descending: {counts}"
assert data["total_queries"] == TOTAL
assert data["blocked_queries"] == 49
def test_top_domains_blocked(self, api_session):
data = _j(api_session.get(f"{FTL_URL}/api/stats/top_domains?blocked=true", timeout=5))
domains = data["domains"]
names = [d["domain"] for d in domains]
assert "gravity.ftl" in names, json.dumps(domains, indent=2)
counts = [d["count"] for d in domains]
assert counts == sorted(counts, reverse=True)
def test_top_domains_permitted_excludes_gravity(self, api_session):
data = _j(api_session.get(f"{FTL_URL}/api/stats/top_domains?blocked=false", timeout=5))
names = [d["domain"] for d in data["domains"]]
assert "gravity.ftl" not in names, \
f"gravity.ftl should not be in permitted domains:\n{json.dumps(data, indent=2)}"
# ---------------------------------------------------------------------------
# Stats: top clients
# ---------------------------------------------------------------------------
class TestStatsTopClients:
def test_top_clients_sorted_descending(self, api_session):
data = _j(api_session.get(f"{FTL_URL}/api/stats/top_clients", timeout=5), dump="top_clients")
clients = data["clients"]
assert len(clients) > 0
assert clients[0]["ip"] == "127.0.0.1"
counts = [c["count"] for c in clients]
assert counts == sorted(counts, reverse=True), \
f"Not sorted descending: {counts}"
assert data["total_queries"] == TOTAL
# ---------------------------------------------------------------------------
# Stats: upstreams
# ---------------------------------------------------------------------------
class TestStatsUpstreams:
def test_upstreams(self, api_session):
data = _j(api_session.get(f"{FTL_URL}/api/stats/upstreams", timeout=5), dump="upstreams")
upstreams = data["upstreams"]
assert len(upstreams) == 4, json.dumps(upstreams, indent=2)
assert data["total_queries"] == TOTAL
assert data["forwarded_queries"] == FORWARDED
blocklist = next(u for u in upstreams if u["ip"] == "blocklist")
assert blocklist["count"] == 49
assert blocklist["port"] == -1
cache = next(u for u in upstreams if u["ip"] == "cache")
assert cache["count"] == 41
assert cache["port"] == -1
# ---------------------------------------------------------------------------
# Stats: query types
# ---------------------------------------------------------------------------
class TestStatsQueryTypes:
def test_query_types(self, api_session):
data = _j(api_session.get(f"{FTL_URL}/api/stats/query_types", timeout=5), dump="query_types")
assert data["types"] == {
"A": 69, "AAAA": 19, "ANY": 3, "SRV": 1, "SOA": 0,
"PTR": 8, "TXT": 10, "NAPTR": 1, "MX": 1, "DS": 7,
"RRSIG": 0, "DNSKEY": DNSKEY, "NS": 0, "SVCB": 3, "HTTPS": 3,
"OTHER": 1,
}, json.dumps(data, indent=2)
# ---------------------------------------------------------------------------
# Stats: recent blocked
# ---------------------------------------------------------------------------
class TestStatsRecentBlocked:
def test_recent_blocked(self, api_session):
data = _j(api_session.get(f"{FTL_URL}/api/stats/recent_blocked", timeout=5))
assert "denied.ftl" in data["blocked"], json.dumps(data, indent=2)
# ---------------------------------------------------------------------------
# Stats: database endpoints (require from/until parameters)
# ---------------------------------------------------------------------------
class TestStatsDatabase:
def test_database_endpoints_require_time_range(self, api_session):
"""Database stats endpoints return 400 without from/until."""
for endpoint in ("query_types", "summary", "top_clients",
"top_domains", "upstreams"):
data = _j(api_session.get(
f"{FTL_URL}/api/stats/database/{endpoint}", timeout=5))
assert data["error"]["key"] == "bad_request", \
f"/api/stats/database/{endpoint}: {json.dumps(data, indent=2)}"
assert "from" in data["error"]["message"]
assert "until" in data["error"]["message"]
def test_database_summary_with_range(self, api_session):
data = _j(api_session.get(
f"{FTL_URL}/api/stats/database/summary?from=1&until=9999999999",
timeout=5))
for key in ("sum_queries", "sum_blocked", "percent_blocked",
"total_clients"):
assert key in data, f"Missing key '{key}' in database summary"
def test_database_top_domains_with_range(self, api_session):
data = _j(api_session.get(
f"{FTL_URL}/api/stats/database/top_domains?from=1&until=9999999999",
timeout=5))
assert "domains" in data
assert isinstance(data["domains"], list)
assert "total_queries" in data
def test_database_top_clients_with_range(self, api_session):
data = _j(api_session.get(
f"{FTL_URL}/api/stats/database/top_clients?from=1&until=9999999999",
timeout=5))
assert "clients" in data
assert isinstance(data["clients"], list)
assert "total_queries" in data
def test_database_upstreams_with_range(self, api_session):
data = _j(api_session.get(
f"{FTL_URL}/api/stats/database/upstreams?from=1&until=9999999999",
timeout=5))
assert "upstreams" in data
assert isinstance(data["upstreams"], list)
def test_database_query_types_with_range(self, api_session):
data = _j(api_session.get(
f"{FTL_URL}/api/stats/database/query_types?from=1&until=9999999999",
timeout=5))
assert "types" in data
assert isinstance(data["types"], dict)
# ---------------------------------------------------------------------------
# DHCP leases
# ---------------------------------------------------------------------------
class TestDHCPLeases:
def test_no_leases(self, api_session):
data = _j(api_session.get(f"{FTL_URL}/api/dhcp/leases", timeout=5))
assert data["leases"] == []
# ---------------------------------------------------------------------------
# Endpoints listing
# ---------------------------------------------------------------------------
class TestEndpoints:
def test_endpoints_has_all_methods(self, api_session):
data = _j(api_session.get(f"{FTL_URL}/api/endpoints", timeout=5))
eps = data["endpoints"]
for method in ("get", "post", "put", "patch", "delete"):
assert method in eps, f"Missing method '{method}':\n{json.dumps(eps.keys(), indent=2)}"
# GET should have the most endpoints
assert len(eps["get"]) > 20
# ---------------------------------------------------------------------------
# Info endpoints
# ---------------------------------------------------------------------------
class TestInfo:
def test_info_ftl(self, api_session):
data = _j(api_session.get(f"{FTL_URL}/api/info/ftl", timeout=5), dump="info_ftl")
ftl = data["ftl"]
db = ftl["database"]
assert db["gravity"] == 8, json.dumps(db, indent=2)
assert db["groups"] == 5
assert db["lists"] == 2
assert db["clients"] == 5
assert db["domains"]["allowed"] == {"total": 3, "enabled": 3}
assert db["domains"]["denied"] == {"total": 2, "enabled": 2}
assert db["regex"]["allowed"] == {"total": 2, "enabled": 2}
assert db["regex"]["denied"] == {"total": 11, "enabled": 11}
assert ftl["privacy_level"] == 0
assert ftl["clients"]["total"] == 11
assert ftl["clients"]["active"] == 11
def test_info_login(self, api_session):
data = _j(api_session.get(f"{FTL_URL}/api/info/login", timeout=5))
assert data["dns"] is True
assert data["https_port"] == 443
def test_info_version(self, api_session):
data = _j(api_session.get(f"{FTL_URL}/api/info/version", timeout=5))
v = data["version"]
assert "ftl" in v
assert "local" in v["ftl"]
assert v["ftl"]["local"]["version"].startswith("v")
assert "hash" in v["ftl"]["local"]
def test_info_messages(self, api_session):
data = _j(api_session.get(f"{FTL_URL}/api/info/messages", timeout=5))
assert "messages" in data
assert isinstance(data["messages"], list)
def test_info_messages_count(self, api_session):
data = _j(api_session.get(f"{FTL_URL}/api/info/messages/count", timeout=5))
assert "count" in data
assert isinstance(data["count"], int)
def test_info_client(self, api_session):
data = _j(api_session.get(f"{FTL_URL}/api/info/client", timeout=5))
assert data["remote_addr"] == "127.0.0.1"
assert data["http_version"] == "1.1"
assert data["method"] == "GET"
def test_info_database(self, api_session):
data = _j(api_session.get(f"{FTL_URL}/api/info/database", timeout=5))
assert data["type"] == "Regular file"
assert data["mode"] == "rw-r-----"
assert data["owner"]["user"]["name"] == "pihole"
assert data["owner"]["group"]["name"] == "pihole"
assert data["queries"] > 0
assert data["sqlite_version"].startswith("3.")
def test_info_system(self, api_session):
data = _j(api_session.get(f"{FTL_URL}/api/info/system", timeout=5))
s = data["system"]
assert "uptime" in s
assert s["memory"]["ram"]["total"] > 0
assert s["cpu"]["nprocs"] > 0
assert "ftl" in s
assert "%mem" in s["ftl"]
assert "%cpu" in s["ftl"]
# ---------------------------------------------------------------------------
# Auth (read-only)
# ---------------------------------------------------------------------------
class TestAuthReadOnly:
def test_totp_suggestion(self, api_session):
"""GET /api/auth/totp returns TOTP credential suggestions."""
data = _j(api_session.get(f"{FTL_URL}/api/auth/totp", timeout=5))
totp = data["totp"]
assert isinstance(totp["secret"], str)
assert len(totp["secret"]) > 0
assert totp["digits"] == 6
assert totp["period"] == 30
assert "algorithm" in totp
assert isinstance(totp["codes"], list)
# ---------------------------------------------------------------------------
# Network
# ---------------------------------------------------------------------------
class TestNetwork:
def test_network_devices(self, api_session):
data = _j(api_session.get(f"{FTL_URL}/api/network/devices", timeout=5))
devices = data["devices"]
hwaddrs = [d["hwaddr"] for d in devices]
assert "aa:bb:cc:dd:ee:ff" in hwaddrs, json.dumps(hwaddrs, indent=2)
assert "ip-127.0.0.1" in hwaddrs
def test_network_interfaces(self, api_session):
data = _j(api_session.get(f"{FTL_URL}/api/network/interfaces", timeout=5))
ifaces = data["interfaces"]
names = [i["name"] for i in ifaces]
assert "lo" in names, json.dumps(names, indent=2)
# ---------------------------------------------------------------------------
# Logs
# ---------------------------------------------------------------------------
class TestLogs:
def test_dnsmasq_log(self, api_session):
data = _j(api_session.get(f"{FTL_URL}/api/logs/dnsmasq", timeout=5))
assert len(data["log"]) > 0
entry = data["log"][0]
assert "timestamp" in entry
assert "message" in entry
def test_ftl_log(self, api_session):
data = _j(api_session.get(f"{FTL_URL}/api/logs/ftl", timeout=5))
assert len(data["log"]) > 0
entry = data["log"][0]
assert "timestamp" in entry
assert "message" in entry
assert "prio" in entry
def test_webserver_log(self, api_session):
data = _j(api_session.get(f"{FTL_URL}/api/logs/webserver", timeout=5))
assert len(data["log"]) > 0
# ---------------------------------------------------------------------------
# PADD
# ---------------------------------------------------------------------------
class TestPADD:
def test_padd(self, api_session):
data = _j(api_session.get(f"{FTL_URL}/api/padd", timeout=5), dump="padd")
assert data["blocking"] == "enabled"
assert data["gravity_size"] == 8
assert data["active_clients"] == 11
assert data["top_domain"] == TOP_DOMAIN
assert data["top_blocked"] == "gravity.ftl"
assert data["top_client"] == "127.0.0.1"
q = data["queries"]
assert q["total"] == TOTAL, json.dumps(data, indent=2)
assert q["blocked"] == 49
cache = data["cache"]
assert cache["size"] == 10000
# ---------------------------------------------------------------------------
# Clients
# ---------------------------------------------------------------------------
class TestClients:
def test_all_clients(self, api_session):
data = _j(api_session.get(f"{FTL_URL}/api/clients", timeout=5))
clients = data["clients"]
assert len(clients) == 5, \
f"Expected 5 clients:\n{json.dumps(clients, indent=2)}"
names = [c["client"] for c in clients]
assert "127.0.0.1" in names
assert "127.0.0.2" in names
assert "aa:bb:cc:dd:ee:ff" in names
assert ":enp0s123" in names
def test_single_client_lookup(self, api_session):
data = _j(api_session.get(f"{FTL_URL}/api/clients/127.0.0.1", timeout=5))
clients = data["clients"]
assert len(clients) == 1, json.dumps(clients, indent=2)
c = clients[0]
assert c["client"] == "127.0.0.1"
assert c["groups"] == [0]
def test_client_suggestions(self, api_session):
data = _j(api_session.get(f"{FTL_URL}/api/clients/_suggestions", timeout=5))
assert "clients" in data
assert isinstance(data["clients"], list)
# ---------------------------------------------------------------------------
# Config (GET)
# ---------------------------------------------------------------------------
class TestConfigGet:
def test_full_config(self, api_session):
data = _j(api_session.get(f"{FTL_URL}/api/config", timeout=5))
config = data["config"]
assert "dns" in config
assert "webserver" in config
assert "misc" in config
assert "debug" in config
assert "database" in config
assert config["dns"]["CNAMEdeepInspect"] is True
assert config["dns"]["blockESNI"] is True
def test_config_element(self, api_session):
data = _j(api_session.get(f"{FTL_URL}/api/config/dns/upstreams", timeout=5))
config = data["config"]
upstreams = config["dns"]["upstreams"]
assert isinstance(upstreams, list)
assert len(upstreams) > 0
# ---------------------------------------------------------------------------
# Network (additional)
# ---------------------------------------------------------------------------
class TestNetworkAdditional:
def test_network_gateway(self, api_session):
data = _j(api_session.get(f"{FTL_URL}/api/network/gateway", timeout=5))
assert "gateway" in data
assert isinstance(data["gateway"], list)
def test_network_routes(self, api_session):
data = _j(api_session.get(f"{FTL_URL}/api/network/routes", timeout=5))
assert "routes" in data
assert isinstance(data["routes"], list)
# ---------------------------------------------------------------------------
# Info (additional)
# ---------------------------------------------------------------------------
class TestInfoAdditional:
def test_info_host(self, api_session):
data = _j(api_session.get(f"{FTL_URL}/api/info/host", timeout=5))
host = data["host"]
uname = host["uname"]
assert "sysname" in uname
assert "nodename" in uname
assert "release" in uname
assert "machine" in uname
def test_info_sensors(self, api_session):
data = _j(api_session.get(f"{FTL_URL}/api/info/sensors", timeout=5))
sensors = data["sensors"]
assert "list" in sensors
assert isinstance(sensors["list"], list)
assert "unit" in sensors
def test_info_metrics(self, api_session):
data = _j(api_session.get(f"{FTL_URL}/api/info/metrics", timeout=5))
m = data["metrics"]
dns = m["dns"]
assert dns["cache"]["size"] > 0
assert "replies" in dns
assert dns["replies"]["sum"] > 0
assert "dhcp" in m
# ---------------------------------------------------------------------------
# Queries (additional)
# ---------------------------------------------------------------------------
class TestQueriesAdditional:
def test_queries_default(self, api_session):
"""Default query (no filters) returns up to 100 results."""
data = _j(api_session.get(f"{FTL_URL}/api/queries", timeout=5))
assert "queries" in data
queries = data["queries"]
assert isinstance(queries, list)
assert len(queries) > 0
assert data["recordsTotal"] == TOTAL
# Check structure of a query entry
q = queries[0]
assert "id" in q
assert "time" in q
assert "type" in q
assert "domain" in q
assert "status" in q
assert "client" in q
assert "ip" in q["client"]
assert "reply" in q
assert "type" in q["reply"]
def test_queries_with_length(self, api_session):
data = _j(api_session.get(f"{FTL_URL}/api/queries?length=5", timeout=5))
assert len(data["queries"]) == 5
def test_queries_filter_by_type(self, api_session):
data = _j(api_session.get(f"{FTL_URL}/api/queries?type=AAAA", timeout=5))
for q in data["queries"]:
assert q["type"] == "AAAA", \
f"Expected type AAAA, got {q['type']}"
def test_queries_filter_by_status(self, api_session):
data = _j(api_session.get(f"{FTL_URL}/api/queries?status=GRAVITY", timeout=5))
assert data["recordsFiltered"] > 0
for q in data["queries"]:
assert q["status"] == "GRAVITY"
def test_queries_filter_by_domain(self, api_session):
data = _j(api_session.get(f"{FTL_URL}/api/queries?domain=gravity.ftl", timeout=5))
assert data["recordsFiltered"] > 0
for q in data["queries"]:
assert q["domain"] == "gravity.ftl"
def test_queries_filter_by_client_ip(self, api_session):
data = _j(api_session.get(f"{FTL_URL}/api/queries?client_ip=127.0.0.1", timeout=5))
assert data["recordsFiltered"] > 0
for q in data["queries"]:
assert q["client"]["ip"] == "127.0.0.1"
def test_queries_filter_by_upstream_blocklist(self, api_session):
"""upstream=blocklist is a pseudo-upstream that matches all blocked queries."""
data = _j(api_session.get(f"{FTL_URL}/api/queries?upstream=blocklist", timeout=5))
assert data["recordsFiltered"] > 0
blocked_statuses = {"GRAVITY", "REGEX", "DENYLIST", "SPECIAL_DOMAIN",
"GRAVITY_CNAME", "REGEX_CNAME", "DENYLIST_CNAME",
"EXTERNAL_BLOCKED_IP", "EXTERNAL_BLOCKED_NULL",
"EXTERNAL_BLOCKED_NXRA", "EXTERNAL_BLOCKED_EDE15",
"DBBUSY"}
for q in data["queries"]:
assert q["status"] in blocked_statuses, \
f"Expected blocked status, got {q['status']}"
def test_queries_filter_by_upstream_address(self, api_session):
"""Filtering by an actual upstream address."""
data = _j(api_session.get(
f"{FTL_URL}/api/queries?upstream=127.0.0.1%235555", timeout=5))
assert data["recordsFiltered"] > 0
for q in data["queries"]:
assert q["upstream"] == "127.0.0.1#5555"
def test_queries_cursor_pagination(self, api_session):
"""Cursor + start offset returns non-overlapping pages."""
page1 = _j(api_session.get(f"{FTL_URL}/api/queries?length=5", timeout=5))
assert len(page1["queries"]) == 5
cursor = page1["cursor"]
assert isinstance(cursor, int)
# Page 2: same cursor, offset by start=5
page2 = _j(api_session.get(
f"{FTL_URL}/api/queries?length=5&cursor={cursor}&start=5", timeout=5))
assert len(page2["queries"]) == 5
ids1 = {q["id"] for q in page1["queries"]}
ids2 = {q["id"] for q in page2["queries"]}
assert ids1.isdisjoint(ids2), \
f"Pages overlap: {ids1 & ids2}"
def test_queries_suggestions(self, api_session):
data = _j(api_session.get(f"{FTL_URL}/api/queries/suggestions", timeout=5))
s = data["suggestions"]
assert "domain" in s
assert "client_ip" in s
assert "type" in s
assert "status" in s
assert "reply" in s
assert isinstance(s["domain"], list)
assert len(s["domain"]) > 0
assert "127.0.0.1" in s["client_ip"]
assert "A" in s["type"]
assert "AAAA" in s["type"]
# ---------------------------------------------------------------------------
# History (additional -- database endpoints)
# ---------------------------------------------------------------------------
class TestHistoryDatabase:
def test_history_database_requires_params(self, api_session):
"""Database history endpoints return 400 without from/until."""
data = _j(api_session.get(f"{FTL_URL}/api/history/database", timeout=5))
assert data["error"]["key"] == "bad_request"
def test_history_database_with_range(self, api_session):
data = _j(api_session.get(
f"{FTL_URL}/api/history/database?from=0&until=9999999999", timeout=5))
assert "history" in data
assert isinstance(data["history"], list)
def test_history_database_clients_requires_params(self, api_session):
data = _j(api_session.get(f"{FTL_URL}/api/history/database/clients", timeout=5))
assert data["error"]["key"] == "bad_request"
def test_history_database_clients_with_range(self, api_session):
data = _j(api_session.get(
f"{FTL_URL}/api/history/database/clients?from=1&until=9999999999", timeout=5))
assert "history" in data
assert "clients" in data
# ---------------------------------------------------------------------------
# NTP server (protocol-level, not HTTP)
# ---------------------------------------------------------------------------
class TestNTP:
def test_ntp_server_responds(self, api_session):
"""FTL's built-in NTP server returns a valid NTPv4 response."""
import socket
import struct
# NTP v3 client request
request = b'\x1b' + 47 * b'\0'
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.settimeout(2.0)
try:
sock.sendto(request, ('127.0.0.1', 123))
data, _ = sock.recvfrom(1024)
finally:
sock.close()
assert len(data) == 48, f"Expected 48-byte NTP packet, got {len(data)}"
# LI/VN/Mode byte: mode should be 4 (server)
mode = data[0] & 0x7
version = (data[0] >> 3) & 0x7
assert mode == 4, f"Expected NTP mode 4 (server), got {mode}"
assert version == 4, f"Expected NTPv4, got v{version}"
# Transmit timestamp (bytes 40-47): seconds since 1900-01-01
# should be close to current time (within 2 seconds)
import time
ntp_epoch_offset = 2208988800 # seconds between 1900 and 1970
tx_seconds = struct.unpack('!I', data[40:44])[0]
now_ntp = int(time.time()) + ntp_epoch_offset
drift = abs(tx_seconds - now_ntp)
assert drift <= 2, \
f"NTP transmit timestamp off by {drift}s (expected ≤2s)"