mirror of
https://github.com/pi-hole/FTL.git
synced 2026-07-01 08:05:44 +01:00
20164bff26
The API query-count assertions depend on resolving mask.icloud.com, whose mask.icloud.com -> mask.apple-dns.net CNAME chain was recursed to the public internet. dnsmasq fires extra DNSKEY validation queries depending on whether Apple currently DNSSEC-signs icloud.com / apple-dns.net, and Apple toggles this over time. The runtime DS-probing workaround in conftest.py could not reliably model dnsmasq's behaviour (e.g. when Apple returns SERVFAIL on DS), so the suite went flaky again. Serve the icloud.com and apple-dns.net zones from the local authoritative PowerDNS server instead, so the chain resolves hermetically and the query counts are deterministic regardless of Apple's upstream DNSSEC posture. The DS-probing fixture is dropped and the expected counters become fixed constants again. Signed-off-by: DL6ER <dl6er@dl6er.de>
1148 lines
46 KiB
Python
1148 lines
46 KiB
Python
"""
|
|
Pi-hole FTL API integration tests — stats, lists, search, history,
|
|
config validation (API-side), HTTP errors, and Lua server pages.
|
|
|
|
These tests replace the equivalent curl-based BATS tests with native
|
|
Python assertions against a live FTL instance.
|
|
|
|
Usage:
|
|
pytest test/api/test_api.py -v
|
|
"""
|
|
|
|
import json
|
|
|
|
import pytest
|
|
|
|
FTL_URL = "http://127.0.0.1"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Expected query counters
|
|
# ---------------------------------------------------------------------------
|
|
# The bats test suite queries mask.icloud.com via an allowlisted client, which
|
|
# forwards the query upstream. The mask.icloud.com -> mask.apple-dns.net CNAME
|
|
# chain is served by the local (unsigned) PowerDNS authoritative server (see
|
|
# test/pdns/setup.sh and recursor.conf) instead of being recursed to the public
|
|
# internet. This keeps the resolution hermetic and the query counts below
|
|
# deterministic — previously they drifted by +2 whenever Apple toggled DNSSEC
|
|
# signing on these zones, which made the suite flaky (see issue #2908 / PR
|
|
# #2845). If you add or remove queries in test_suite.bats, update these.
|
|
|
|
TOTAL = 137
|
|
FORWARDED = 47
|
|
DNSKEY = 9
|
|
TOP_DOMAIN = "."
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _j(response, dump=None):
|
|
"""Return parsed JSON, stripping the volatile ``took`` field.
|
|
|
|
If *dump* is given, write the full response to
|
|
``/tmp/ftl_test_<dump>.json`` (best-effort, ignored on failure)
|
|
so the expected values can be inspected after a test run.
|
|
"""
|
|
data = response.json()
|
|
data.pop("took", None)
|
|
if dump:
|
|
try:
|
|
with open(f"/tmp/ftl_test_{dump}.json", "w") as f:
|
|
json.dump(data, f, indent=2)
|
|
except OSError:
|
|
pass
|
|
return data
|
|
|
|
|
|
def set_config(api_session, dotted_key, value):
|
|
"""Set a FTL config item via the API.
|
|
|
|
Builds the nested JSON payload from a dotted key, e.g.
|
|
``set_config(s, "webserver.serve_all", True)`` sends
|
|
``PATCH /api/config/webserver/serve_all``
|
|
with ``{"config": {"webserver": {"serve_all": true}}}``.
|
|
"""
|
|
parts = dotted_key.split(".")
|
|
api_path = f"{FTL_URL}/api/config/" + "/".join(parts)
|
|
|
|
payload = value
|
|
for part in reversed(parts):
|
|
payload = {part: payload}
|
|
payload = {"config": payload}
|
|
|
|
r = api_session.patch(api_path, json=payload, timeout=20)
|
|
assert r.status_code == 200, \
|
|
f"Failed to set {dotted_key}: {r.status_code} {r.text}"
|
|
return r
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# HTTP error responses
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestHTTPErrors:
|
|
|
|
def test_api_404_returns_json(self, api_session):
|
|
"""HTTP server responds with JSON error 404 to unknown API path."""
|
|
data = _j(api_session.get(f"{FTL_URL}/api/undefined", timeout=5))
|
|
assert data["error"] == {
|
|
"key": "not_found",
|
|
"message": "Not found",
|
|
"hint": "/api/undefined",
|
|
}, json.dumps(data, indent=2)
|
|
|
|
def test_non_admin_path_returns_404(self, api_session):
|
|
"""HTTP server responds with 404 to path outside /admin."""
|
|
r = api_session.head(f"{FTL_URL}/undefined", timeout=5)
|
|
assert r.status_code == 404
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Config validation via API (type-based)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestConfigValidationAPIType:
|
|
|
|
def test_blockESNI_rejects_float(self, api_session):
|
|
data = _j(api_session.patch(f"{FTL_URL}/api/config",
|
|
json={"config": {"dns": {"blockESNI": 15.5}}}, timeout=20))
|
|
assert data["error"] == {
|
|
"key": "bad_request",
|
|
"message": "Config item is invalid",
|
|
"hint": "dns.blockESNI: not of type bool",
|
|
}, json.dumps(data, indent=2)
|
|
|
|
def test_piholePTR_rejects_invalid_option(self, api_session):
|
|
data = _j(api_session.patch(f"{FTL_URL}/api/config",
|
|
json={"config": {"dns": {"piholePTR": "something_else"}}}, timeout=20))
|
|
assert data["error"] == {
|
|
"key": "bad_request",
|
|
"message": "Config item is invalid",
|
|
"hint": "dns.piholePTR: invalid option",
|
|
}, json.dumps(data, indent=2)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Config validation via API (validator-based)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestConfigValidationAPIValidator:
|
|
|
|
def test_files_pcap_rejects_invalid_path(self, api_session):
|
|
data = _j(api_session.patch(f"{FTL_URL}/api/config",
|
|
json={"config": {"files": {"pcap": "%gh4b"}}}, timeout=20))
|
|
assert data["error"] == {
|
|
"key": "bad_request",
|
|
"message": "Config item validation failed",
|
|
"hint": 'files.pcap: not a valid file path ("%gh4b")',
|
|
}, json.dumps(data, indent=2)
|
|
|
|
def test_cnameRecords_rejects_too_few_elements(self, api_session):
|
|
data = _j(api_session.patch(f"{FTL_URL}/api/config",
|
|
json={"config": {"dns": {"cnameRecords": ["a"]}}}, timeout=20))
|
|
assert data["error"] == {
|
|
"key": "bad_request",
|
|
"message": "Config item validation failed",
|
|
"hint": "dns.cnameRecords[0]: not a valid CNAME definition (too few elements)",
|
|
}, json.dumps(data, indent=2)
|
|
|
|
def test_cnameRecords_rejects_empty_string_position(self, api_session):
|
|
data = _j(api_session.patch(f"{FTL_URL}/api/config",
|
|
json={"config": {"dns": {"cnameRecords": ["a,b,c", "a,b,c,,c"]}}}, timeout=20))
|
|
assert data["error"] == {
|
|
"key": "bad_request",
|
|
"message": "Config item validation failed",
|
|
"hint": "dns.cnameRecords[1]: contains an empty string at position 3",
|
|
}, json.dumps(data, indent=2)
|
|
|
|
def test_cnameRecords_rejects_non_string_element(self, api_session):
|
|
data = _j(api_session.patch(f"{FTL_URL}/api/config",
|
|
json={"config": {"dns": {"cnameRecords": ["a,b,c", "a,b,c", 5]}}}, timeout=20))
|
|
assert data["error"] == {
|
|
"key": "bad_request",
|
|
"message": "Config item is invalid",
|
|
"hint": "dns.cnameRecords: array has invalid elements",
|
|
}, json.dumps(data, indent=2)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Envvar-protected config: cannot change via API
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestEnvvarProtectedConfig:
|
|
|
|
def test_api_rejects_envvar_override(self, api_session):
|
|
"""API cannot change misc.nice when set via FTLCONF_misc_nice."""
|
|
data = _j(api_session.patch(f"{FTL_URL}/api/config/misc/nice",
|
|
json={"config": {"misc": {"nice": -12}}}, timeout=20))
|
|
assert data["error"] == {
|
|
"key": "bad_request",
|
|
"message": "Config items set via environment variables cannot be changed via the API",
|
|
"hint": "misc.nice",
|
|
}, json.dumps(data, indent=2)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Domain search
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestDomainSearch:
|
|
|
|
def test_nonexistent_domain(self, api_session):
|
|
data = _j(api_session.get(f"{FTL_URL}/api/search/non.existent", timeout=5))
|
|
search = data["search"]
|
|
assert search["domains"] == []
|
|
assert search["gravity"] == []
|
|
assert search["results"] == {
|
|
"domains": {"exact": 0, "regex": 0},
|
|
"gravity": {"allow": 0, "block": 0},
|
|
"total": 0,
|
|
}, json.dumps(data, indent=2)
|
|
assert search["parameters"] == {
|
|
"N": 20,
|
|
"partial": False,
|
|
"domain": "non.existent",
|
|
"debug": False,
|
|
}, json.dumps(data, indent=2)
|
|
|
|
def test_antigravity_domain(self, api_session):
|
|
data = _j(api_session.get(f"{FTL_URL}/api/search/antigravity.ftl", timeout=5))
|
|
search = data["search"]
|
|
assert search["results"] == {
|
|
"domains": {"exact": 0, "regex": 0},
|
|
"gravity": {"allow": 2, "block": 1},
|
|
"total": 3,
|
|
}, json.dumps(data, indent=2)
|
|
assert search["domains"] == []
|
|
|
|
gravity = search["gravity"]
|
|
assert len(gravity) == 3, json.dumps(gravity, indent=2)
|
|
|
|
# Block list match
|
|
g0 = gravity[0]
|
|
assert g0["domain"] == "antigravity.ftl"
|
|
assert g0["type"] == "block"
|
|
assert g0["address"] == "https://pi-hole.net/block.txt"
|
|
assert g0["comment"] == "Fake block-list"
|
|
assert g0["enabled"] is True
|
|
assert g0["id"] == 1
|
|
assert g0["number"] == 2000
|
|
assert g0["invalid_domains"] == 2
|
|
assert g0["groups"] == [0, 2]
|
|
|
|
# Allow list match (exact domain)
|
|
g1 = gravity[1]
|
|
assert g1["domain"] == "antigravity.ftl"
|
|
assert g1["type"] == "allow"
|
|
assert g1["address"] == "https://pi-hole.net/allow.txt"
|
|
assert g1["comment"] == "Fake allow-list"
|
|
assert g1["id"] == 2
|
|
assert g1["groups"] == [0]
|
|
|
|
# Allow list match (ABP-style antigravity entry)
|
|
g2 = gravity[2]
|
|
assert g2["domain"] == "@@||antigravity.ftl^"
|
|
assert g2["type"] == "allow"
|
|
assert g2["id"] == 2
|
|
|
|
def test_punycode_normalization(self, api_session):
|
|
"""Internationalized domain names should be normalized to punycode."""
|
|
data = _j(api_session.get(f"{FTL_URL}/api/search/\u00e4BC.com",
|
|
params={"debug": "true"}, timeout=5))
|
|
assert data["search"]["debug"]["punycode"] == "xn--bc-uia.com", \
|
|
json.dumps(data, indent=2)
|
|
assert data["search"]["results"]["total"] == 0
|
|
|
|
def test_punycode_domain_accepted(self, api_session):
|
|
"""Punycode domains (e.g. emoji IDNs) must not be rejected by the API.
|
|
|
|
Regression test for https://github.com/pi-hole/FTL/issues/2837
|
|
libidn2 rejects punycode for characters not in IDNA2008 (e.g. emoji),
|
|
but the ASCII punycode form is a perfectly valid DNS name.
|
|
xn--4ca0bs45142c.com is the punycode encoding of äöü😀.com.
|
|
"""
|
|
data = _j(api_session.get(f"{FTL_URL}/api/search/xn--4ca0bs45142c.com",
|
|
params={"debug": "true"}, timeout=5))
|
|
assert data["search"]["debug"]["punycode"] == "xn--4ca0bs45142c.com", \
|
|
json.dumps(data, indent=2)
|
|
# The domain does not exist in gravity, so total should be 0
|
|
assert data["search"]["results"]["total"] == 0
|
|
|
|
def test_partial_matching(self, api_session):
|
|
"""Partial matching returns substring hits in gravity."""
|
|
data = _j(api_session.get(f"{FTL_URL}/api/search/gravity",
|
|
params={"partial": "true"}, timeout=5))
|
|
search = data["search"]
|
|
assert search["parameters"]["partial"] is True
|
|
assert search["results"]["total"] > 0, \
|
|
f"Expected partial matches for 'gravity':\n{json.dumps(data, indent=2)}"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# History
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestHistory:
|
|
|
|
def test_history_returns_24h(self, api_session):
|
|
data = _j(api_session.get(f"{FTL_URL}/api/history", timeout=5))
|
|
assert len(data["history"]) == 145, \
|
|
f"Expected 145 history entries (24h in 10-min slots), got {len(data['history'])}"
|
|
# Verify each slot has the expected structure
|
|
slot = data["history"][0]
|
|
for key in ("timestamp", "total", "cached", "blocked", "forwarded"):
|
|
assert key in slot, f"Missing key '{key}' in history slot: {slot}"
|
|
|
|
def test_history_clients_returns_24h(self, api_session):
|
|
data = _j(api_session.get(f"{FTL_URL}/api/history/clients", timeout=5))
|
|
assert len(data["history"]) == 145, \
|
|
f"Expected 145 history entries, got {len(data['history'])}"
|
|
assert "clients" in data, f"Missing 'clients' key:\n{json.dumps(data, indent=2)}"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Lists
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestLists:
|
|
|
|
def test_block_lists_only(self, api_session):
|
|
lists = _j(api_session.get(f"{FTL_URL}/api/lists?type=block", timeout=5))["lists"]
|
|
assert len(lists) == 1, f"Expected 1 block list:\n{json.dumps(lists, indent=2)}"
|
|
bl = lists[0]
|
|
assert bl["type"] == "block"
|
|
assert bl["address"] == "https://pi-hole.net/block.txt"
|
|
assert bl["comment"] == "Fake block-list"
|
|
assert bl["enabled"] is True
|
|
assert bl["id"] == 1
|
|
assert bl["number"] == 2000
|
|
assert bl["invalid_domains"] == 2
|
|
assert bl["abp_entries"] == 0
|
|
assert bl["status"] == 1
|
|
assert bl["groups"] == [0, 2]
|
|
|
|
def test_allow_lists_only(self, api_session):
|
|
lists = _j(api_session.get(f"{FTL_URL}/api/lists?type=allow", timeout=5))["lists"]
|
|
assert len(lists) == 1, f"Expected 1 allow list:\n{json.dumps(lists, indent=2)}"
|
|
al = lists[0]
|
|
assert al["type"] == "allow"
|
|
assert al["address"] == "https://pi-hole.net/allow.txt"
|
|
assert al["comment"] == "Fake allow-list"
|
|
assert al["enabled"] is True
|
|
assert al["id"] == 2
|
|
assert al["number"] == 2000
|
|
assert al["invalid_domains"] == 2
|
|
assert al["abp_entries"] == 0
|
|
assert al["status"] == 1
|
|
assert al["groups"] == [0]
|
|
|
|
def test_all_lists_includes_both_types(self, api_session):
|
|
lists = _j(api_session.get(f"{FTL_URL}/api/lists", timeout=5))["lists"]
|
|
assert len(lists) == 2, f"Expected 2 lists:\n{json.dumps(lists, indent=2)}"
|
|
types = {lst["type"] for lst in lists}
|
|
assert types == {"block", "allow"}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Queries
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestQueries:
|
|
|
|
def test_no_unknown_reply(self, api_session):
|
|
data = _j(api_session.get(f"{FTL_URL}/api/queries?reply=UNKNOWN", timeout=5))
|
|
assert data["queries"] == []
|
|
assert data["recordsFiltered"] == 0, json.dumps(data, indent=2)
|
|
|
|
def test_no_unknown_status(self, api_session):
|
|
data = _j(api_session.get(f"{FTL_URL}/api/queries?status=UNKNOWN", timeout=5))
|
|
assert data["queries"] == []
|
|
assert data["recordsFiltered"] == 0, json.dumps(data, indent=2)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Lua server pages
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestLuaServerPages:
|
|
|
|
def test_lua_page_outside_admin_not_served_by_default(self, api_session):
|
|
"""Lua server page outside /admin is not served when serve_all is off."""
|
|
set_config(api_session, "webserver.serve_all", False)
|
|
r = api_session.head(f"{FTL_URL}/broken_lua", timeout=5)
|
|
assert r.status_code == 404
|
|
|
|
def test_lua_page_generates_proper_backtrace(self, api_session):
|
|
"""Lua server page generates proper backtrace on error."""
|
|
set_config(api_session, "webserver.serve_all", True)
|
|
r = api_session.get(f"{FTL_URL}/broken_lua", timeout=5)
|
|
lines = r.text.splitlines()
|
|
assert lines[0] == "Hello, world 1!", f"Unexpected response:\n{r.text}"
|
|
assert lines[1] == "Hello, world 2!"
|
|
assert 'Cannot include [/var/www/html/does_not_exist.lp]: not found' in lines[2]
|
|
assert lines[3] == "stack traceback:"
|
|
|
|
def test_lua_page_outside_webhome_served_without_login(self, api_session):
|
|
"""After serve_all is enabled, Lua pages are served without login."""
|
|
r = api_session.get(f"{FTL_URL}/broken_lua", timeout=5)
|
|
lines = r.text.splitlines()
|
|
assert lines[0] == "Hello, world 1!", f"Unexpected response:\n{r.text}"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# DNS blocking status
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestDNSBlocking:
|
|
|
|
def test_blocking_enabled(self, api_session):
|
|
data = _j(api_session.get(f"{FTL_URL}/api/dns/blocking", timeout=5))
|
|
assert data["blocking"] == "enabled"
|
|
assert data["timer"] is None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Domains
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestDomains:
|
|
|
|
def test_allow_exact(self, api_session):
|
|
data = _j(api_session.get(f"{FTL_URL}/api/domains/allow/exact", timeout=5))
|
|
domains = data["domains"]
|
|
names = [d["domain"] for d in domains]
|
|
assert "allowed.ftl" in names, json.dumps(domains, indent=2)
|
|
assert "regex1.ftl" in names
|
|
assert "mask.icloud.com" in names
|
|
for d in domains:
|
|
assert d["type"] == "allow"
|
|
assert d["kind"] == "exact"
|
|
|
|
def test_allow_regex(self, api_session):
|
|
data = _j(api_session.get(f"{FTL_URL}/api/domains/allow/regex", timeout=5))
|
|
domains = data["domains"]
|
|
assert len(domains) == 2, json.dumps(domains, indent=2)
|
|
assert domains[0] == {
|
|
"domain": "regex2", "unicode": "regex2",
|
|
"type": "allow", "kind": "regex", "comment": "",
|
|
"groups": [0], "enabled": True,
|
|
"id": 3, "date_added": 1559928803, "date_modified": 1559928803,
|
|
}
|
|
assert domains[1]["domain"] == "^gravity-allowed"
|
|
assert domains[1]["id"] == 4
|
|
|
|
def test_deny_exact(self, api_session):
|
|
data = _j(api_session.get(f"{FTL_URL}/api/domains/deny/exact", timeout=5))
|
|
domains = data["domains"]
|
|
names = [d["domain"] for d in domains]
|
|
assert "denied.ftl" in names, json.dumps(domains, indent=2)
|
|
assert "blacklisted-group-disabled.com" in names
|
|
for d in domains:
|
|
assert d["type"] == "deny"
|
|
assert d["kind"] == "exact"
|
|
|
|
def test_deny_regex(self, api_session):
|
|
data = _j(api_session.get(f"{FTL_URL}/api/domains/deny/regex", timeout=5))
|
|
domains = data["domains"]
|
|
assert len(domains) == 11, \
|
|
f"Expected 11 deny regex, got {len(domains)}:\n{json.dumps(domains, indent=2)}"
|
|
assert domains[0]["domain"] == "regex[0-9].ftl"
|
|
assert domains[0]["id"] == 6
|
|
assert domains[0]["groups"] == [0, 2]
|
|
for d in domains:
|
|
assert d["type"] == "deny"
|
|
assert d["kind"] == "regex"
|
|
|
|
def test_all_domains(self, api_session):
|
|
data = _j(api_session.get(f"{FTL_URL}/api/domains", timeout=5))
|
|
domains = data["domains"]
|
|
types = {d["type"] for d in domains}
|
|
kinds = {d["kind"] for d in domains}
|
|
assert types == {"allow", "deny"}
|
|
assert kinds == {"exact", "regex"}
|
|
|
|
def test_single_domain_lookup(self, api_session):
|
|
data = _j(api_session.get(f"{FTL_URL}/api/domains/deny/exact/denied.ftl", timeout=5))
|
|
domains = data["domains"]
|
|
assert len(domains) == 1, json.dumps(domains, indent=2)
|
|
assert domains[0]["domain"] == "denied.ftl"
|
|
assert domains[0]["comment"] == "Migrated from /etc/pihole/blacklist.txt"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Groups
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestGroups:
|
|
|
|
def test_all_groups(self, api_session):
|
|
data = _j(api_session.get(f"{FTL_URL}/api/groups", timeout=5))
|
|
groups = data["groups"]
|
|
assert len(groups) == 6, json.dumps(groups, indent=2)
|
|
names = {g["name"] for g in groups}
|
|
assert "Default" in names
|
|
assert "Test group" in names
|
|
assert "Second test group" in names
|
|
|
|
default = next(g for g in groups if g["name"] == "Default")
|
|
assert default["id"] == 0
|
|
assert default["enabled"] is True
|
|
assert default["comment"] == "The default group"
|
|
|
|
disabled = next(g for g in groups if g["name"] == "Test group")
|
|
assert disabled["id"] == 1
|
|
assert disabled["enabled"] is False
|
|
|
|
def test_single_group_lookup(self, api_session):
|
|
data = _j(api_session.get(f"{FTL_URL}/api/groups/Default", timeout=5))
|
|
groups = data["groups"]
|
|
assert len(groups) == 1, json.dumps(groups, indent=2)
|
|
assert groups[0]["name"] == "Default"
|
|
assert groups[0]["id"] == 0
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Stats summary
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestStatsSummary:
|
|
|
|
def test_summary_structure(self, api_session):
|
|
data = _j(api_session.get(f"{FTL_URL}/api/stats/summary", timeout=5), dump="stats_summary")
|
|
q = data["queries"]
|
|
assert q["total"] == TOTAL, json.dumps(data, indent=2)
|
|
assert q["blocked"] == 49
|
|
assert q["forwarded"] == FORWARDED
|
|
assert q["cached"] == 41
|
|
assert q["unique_domains"] == 77
|
|
assert q["status"]["UNKNOWN"] == 0
|
|
assert q["status"]["GRAVITY"] == 7
|
|
assert q["status"]["FORWARDED"] == FORWARDED
|
|
assert q["status"]["CACHE"] == 41
|
|
assert q["status"]["REGEX"] == 21
|
|
assert q["status"]["DENYLIST"] == 4
|
|
assert q["status"]["SPECIAL_DOMAIN"] == 2
|
|
assert q["types"]["A"] == 69
|
|
assert q["types"]["AAAA"] == 19
|
|
|
|
assert data["clients"]["active"] == 11
|
|
assert data["clients"]["total"] == 11
|
|
assert data["gravity"]["domains_being_blocked"] == 8
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Stats: top domains
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestStatsTopDomains:
|
|
|
|
def test_top_domains_sorted_descending(self, api_session):
|
|
data = _j(api_session.get(f"{FTL_URL}/api/stats/top_domains", timeout=5), dump="top_domains")
|
|
domains = data["domains"]
|
|
assert len(domains) > 0
|
|
counts = [d["count"] for d in domains]
|
|
assert counts == sorted(counts, reverse=True), \
|
|
f"Not sorted descending: {counts}"
|
|
assert data["total_queries"] == TOTAL
|
|
assert data["blocked_queries"] == 49
|
|
|
|
def test_top_domains_blocked(self, api_session):
|
|
data = _j(api_session.get(f"{FTL_URL}/api/stats/top_domains?blocked=true", timeout=5))
|
|
domains = data["domains"]
|
|
names = [d["domain"] for d in domains]
|
|
assert "gravity.ftl" in names, json.dumps(domains, indent=2)
|
|
counts = [d["count"] for d in domains]
|
|
assert counts == sorted(counts, reverse=True)
|
|
|
|
def test_top_domains_permitted_excludes_gravity(self, api_session):
|
|
data = _j(api_session.get(f"{FTL_URL}/api/stats/top_domains?blocked=false", timeout=5))
|
|
names = [d["domain"] for d in data["domains"]]
|
|
assert "gravity.ftl" not in names, \
|
|
f"gravity.ftl should not be in permitted domains:\n{json.dumps(data, indent=2)}"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Stats: top clients
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestStatsTopClients:
|
|
|
|
def test_top_clients_sorted_descending(self, api_session):
|
|
data = _j(api_session.get(f"{FTL_URL}/api/stats/top_clients", timeout=5), dump="top_clients")
|
|
clients = data["clients"]
|
|
assert len(clients) > 0
|
|
assert clients[0]["ip"] == "127.0.0.1"
|
|
counts = [c["count"] for c in clients]
|
|
assert counts == sorted(counts, reverse=True), \
|
|
f"Not sorted descending: {counts}"
|
|
assert data["total_queries"] == TOTAL
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Stats: upstreams
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestStatsUpstreams:
|
|
|
|
def test_upstreams(self, api_session):
|
|
data = _j(api_session.get(f"{FTL_URL}/api/stats/upstreams", timeout=5), dump="upstreams")
|
|
upstreams = data["upstreams"]
|
|
assert len(upstreams) == 4, json.dumps(upstreams, indent=2)
|
|
assert data["total_queries"] == TOTAL
|
|
assert data["forwarded_queries"] == FORWARDED
|
|
|
|
blocklist = next(u for u in upstreams if u["ip"] == "blocklist")
|
|
assert blocklist["count"] == 49
|
|
assert blocklist["port"] == -1
|
|
|
|
cache = next(u for u in upstreams if u["ip"] == "cache")
|
|
assert cache["count"] == 41
|
|
assert cache["port"] == -1
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Stats: query types
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestStatsQueryTypes:
|
|
|
|
def test_query_types(self, api_session):
|
|
data = _j(api_session.get(f"{FTL_URL}/api/stats/query_types", timeout=5), dump="query_types")
|
|
assert data["types"] == {
|
|
"A": 69, "AAAA": 19, "ANY": 3, "SRV": 1, "SOA": 0,
|
|
"PTR": 8, "TXT": 10, "NAPTR": 1, "MX": 1, "DS": 7,
|
|
"RRSIG": 0, "DNSKEY": DNSKEY, "NS": 0, "SVCB": 3, "HTTPS": 3,
|
|
"OTHER": 1,
|
|
}, json.dumps(data, indent=2)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Stats: recent blocked
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestStatsRecentBlocked:
|
|
|
|
def test_recent_blocked(self, api_session):
|
|
data = _j(api_session.get(f"{FTL_URL}/api/stats/recent_blocked", timeout=5))
|
|
assert "denied.ftl" in data["blocked"], json.dumps(data, indent=2)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Stats: database endpoints (require from/until parameters)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestStatsDatabase:
|
|
|
|
def test_database_endpoints_require_time_range(self, api_session):
|
|
"""Database stats endpoints return 400 without from/until."""
|
|
for endpoint in ("query_types", "summary", "top_clients",
|
|
"top_domains", "upstreams"):
|
|
data = _j(api_session.get(
|
|
f"{FTL_URL}/api/stats/database/{endpoint}", timeout=5))
|
|
assert data["error"]["key"] == "bad_request", \
|
|
f"/api/stats/database/{endpoint}: {json.dumps(data, indent=2)}"
|
|
assert "from" in data["error"]["message"]
|
|
assert "until" in data["error"]["message"]
|
|
|
|
def test_database_summary_with_range(self, api_session):
|
|
data = _j(api_session.get(
|
|
f"{FTL_URL}/api/stats/database/summary?from=1&until=9999999999",
|
|
timeout=5))
|
|
for key in ("sum_queries", "sum_blocked", "percent_blocked",
|
|
"total_clients"):
|
|
assert key in data, f"Missing key '{key}' in database summary"
|
|
|
|
def test_database_top_domains_with_range(self, api_session):
|
|
data = _j(api_session.get(
|
|
f"{FTL_URL}/api/stats/database/top_domains?from=1&until=9999999999",
|
|
timeout=5))
|
|
assert "domains" in data
|
|
assert isinstance(data["domains"], list)
|
|
assert "total_queries" in data
|
|
|
|
def test_database_top_clients_with_range(self, api_session):
|
|
data = _j(api_session.get(
|
|
f"{FTL_URL}/api/stats/database/top_clients?from=1&until=9999999999",
|
|
timeout=5))
|
|
assert "clients" in data
|
|
assert isinstance(data["clients"], list)
|
|
assert "total_queries" in data
|
|
|
|
def test_database_upstreams_with_range(self, api_session):
|
|
data = _j(api_session.get(
|
|
f"{FTL_URL}/api/stats/database/upstreams?from=1&until=9999999999",
|
|
timeout=5))
|
|
assert "upstreams" in data
|
|
assert isinstance(data["upstreams"], list)
|
|
|
|
def test_database_query_types_with_range(self, api_session):
|
|
data = _j(api_session.get(
|
|
f"{FTL_URL}/api/stats/database/query_types?from=1&until=9999999999",
|
|
timeout=5))
|
|
assert "types" in data
|
|
assert isinstance(data["types"], dict)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# DHCP leases
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestDHCPLeases:
|
|
|
|
def test_no_leases(self, api_session):
|
|
data = _j(api_session.get(f"{FTL_URL}/api/dhcp/leases", timeout=5))
|
|
assert data["leases"] == []
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Endpoints listing
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestEndpoints:
|
|
|
|
def test_endpoints_has_all_methods(self, api_session):
|
|
data = _j(api_session.get(f"{FTL_URL}/api/endpoints", timeout=5))
|
|
eps = data["endpoints"]
|
|
for method in ("get", "post", "put", "patch", "delete"):
|
|
assert method in eps, f"Missing method '{method}':\n{json.dumps(eps.keys(), indent=2)}"
|
|
# GET should have the most endpoints
|
|
assert len(eps["get"]) > 20
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Info endpoints
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestInfo:
|
|
|
|
def test_info_ftl(self, api_session):
|
|
data = _j(api_session.get(f"{FTL_URL}/api/info/ftl", timeout=5), dump="info_ftl")
|
|
ftl = data["ftl"]
|
|
db = ftl["database"]
|
|
assert db["gravity"] == 8, json.dumps(db, indent=2)
|
|
assert db["groups"] == 5
|
|
assert db["lists"] == 2
|
|
assert db["clients"] == 5
|
|
assert db["domains"]["allowed"] == {"total": 3, "enabled": 3}
|
|
assert db["domains"]["denied"] == {"total": 2, "enabled": 2}
|
|
assert db["regex"]["allowed"] == {"total": 2, "enabled": 2}
|
|
assert db["regex"]["denied"] == {"total": 11, "enabled": 11}
|
|
assert ftl["privacy_level"] == 0
|
|
assert ftl["clients"]["total"] == 11
|
|
assert ftl["clients"]["active"] == 11
|
|
|
|
def test_info_login(self, api_session):
|
|
data = _j(api_session.get(f"{FTL_URL}/api/info/login", timeout=5))
|
|
assert data["dns"] is True
|
|
assert data["https_port"] == 443
|
|
|
|
def test_info_version(self, api_session):
|
|
data = _j(api_session.get(f"{FTL_URL}/api/info/version", timeout=5))
|
|
v = data["version"]
|
|
assert "ftl" in v
|
|
assert "local" in v["ftl"]
|
|
assert v["ftl"]["local"]["version"].startswith("v")
|
|
assert "hash" in v["ftl"]["local"]
|
|
|
|
def test_info_messages(self, api_session):
|
|
data = _j(api_session.get(f"{FTL_URL}/api/info/messages", timeout=5))
|
|
assert "messages" in data
|
|
assert isinstance(data["messages"], list)
|
|
|
|
def test_info_messages_count(self, api_session):
|
|
data = _j(api_session.get(f"{FTL_URL}/api/info/messages/count", timeout=5))
|
|
assert "count" in data
|
|
assert isinstance(data["count"], int)
|
|
|
|
def test_info_client(self, api_session):
|
|
data = _j(api_session.get(f"{FTL_URL}/api/info/client", timeout=5))
|
|
assert data["remote_addr"] == "127.0.0.1"
|
|
assert data["http_version"] == "1.1"
|
|
assert data["method"] == "GET"
|
|
|
|
def test_info_database(self, api_session):
|
|
data = _j(api_session.get(f"{FTL_URL}/api/info/database", timeout=5))
|
|
assert data["type"] == "Regular file"
|
|
assert data["mode"] == "rw-r-----"
|
|
assert data["owner"]["user"]["name"] == "pihole"
|
|
assert data["owner"]["group"]["name"] == "pihole"
|
|
assert data["queries"] > 0
|
|
assert data["sqlite_version"].startswith("3.")
|
|
|
|
def test_info_system(self, api_session):
|
|
data = _j(api_session.get(f"{FTL_URL}/api/info/system", timeout=5))
|
|
s = data["system"]
|
|
assert "uptime" in s
|
|
assert s["memory"]["ram"]["total"] > 0
|
|
assert s["cpu"]["nprocs"] > 0
|
|
assert "ftl" in s
|
|
assert "%mem" in s["ftl"]
|
|
assert "%cpu" in s["ftl"]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Auth (read-only)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestAuthReadOnly:
|
|
|
|
def test_totp_suggestion(self, api_session):
|
|
"""GET /api/auth/totp returns TOTP credential suggestions."""
|
|
data = _j(api_session.get(f"{FTL_URL}/api/auth/totp", timeout=5))
|
|
totp = data["totp"]
|
|
assert isinstance(totp["secret"], str)
|
|
assert len(totp["secret"]) > 0
|
|
assert totp["digits"] == 6
|
|
assert totp["period"] == 30
|
|
assert "algorithm" in totp
|
|
assert isinstance(totp["codes"], list)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Network
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestNetwork:
|
|
|
|
def test_network_devices(self, api_session):
|
|
data = _j(api_session.get(f"{FTL_URL}/api/network/devices", timeout=5))
|
|
devices = data["devices"]
|
|
hwaddrs = [d["hwaddr"] for d in devices]
|
|
assert "aa:bb:cc:dd:ee:ff" in hwaddrs, json.dumps(hwaddrs, indent=2)
|
|
assert "ip-127.0.0.1" in hwaddrs
|
|
|
|
def test_network_interfaces(self, api_session):
|
|
data = _j(api_session.get(f"{FTL_URL}/api/network/interfaces", timeout=5))
|
|
ifaces = data["interfaces"]
|
|
names = [i["name"] for i in ifaces]
|
|
assert "lo" in names, json.dumps(names, indent=2)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Logs
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestLogs:
|
|
|
|
def test_dnsmasq_log(self, api_session):
|
|
data = _j(api_session.get(f"{FTL_URL}/api/logs/dnsmasq", timeout=5))
|
|
assert len(data["log"]) > 0
|
|
entry = data["log"][0]
|
|
assert "timestamp" in entry
|
|
assert "message" in entry
|
|
|
|
def test_ftl_log(self, api_session):
|
|
data = _j(api_session.get(f"{FTL_URL}/api/logs/ftl", timeout=5))
|
|
assert len(data["log"]) > 0
|
|
entry = data["log"][0]
|
|
assert "timestamp" in entry
|
|
assert "message" in entry
|
|
assert "prio" in entry
|
|
|
|
def test_webserver_log(self, api_session):
|
|
data = _j(api_session.get(f"{FTL_URL}/api/logs/webserver", timeout=5))
|
|
assert len(data["log"]) > 0
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# PADD
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestPADD:
|
|
|
|
def test_padd(self, api_session):
|
|
data = _j(api_session.get(f"{FTL_URL}/api/padd", timeout=5), dump="padd")
|
|
assert data["blocking"] == "enabled"
|
|
assert data["gravity_size"] == 8
|
|
assert data["active_clients"] == 11
|
|
assert data["top_domain"] == TOP_DOMAIN
|
|
assert data["top_blocked"] == "gravity.ftl"
|
|
assert data["top_client"] == "127.0.0.1"
|
|
q = data["queries"]
|
|
assert q["total"] == TOTAL, json.dumps(data, indent=2)
|
|
assert q["blocked"] == 49
|
|
cache = data["cache"]
|
|
assert cache["size"] == 10000
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Clients
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestClients:
|
|
|
|
def test_all_clients(self, api_session):
|
|
data = _j(api_session.get(f"{FTL_URL}/api/clients", timeout=5))
|
|
clients = data["clients"]
|
|
assert len(clients) == 5, \
|
|
f"Expected 5 clients:\n{json.dumps(clients, indent=2)}"
|
|
names = [c["client"] for c in clients]
|
|
assert "127.0.0.1" in names
|
|
assert "127.0.0.2" in names
|
|
assert "aa:bb:cc:dd:ee:ff" in names
|
|
assert ":enp0s123" in names
|
|
|
|
def test_single_client_lookup(self, api_session):
|
|
data = _j(api_session.get(f"{FTL_URL}/api/clients/127.0.0.1", timeout=5))
|
|
clients = data["clients"]
|
|
assert len(clients) == 1, json.dumps(clients, indent=2)
|
|
c = clients[0]
|
|
assert c["client"] == "127.0.0.1"
|
|
assert c["groups"] == [0]
|
|
|
|
def test_client_suggestions(self, api_session):
|
|
data = _j(api_session.get(f"{FTL_URL}/api/clients/_suggestions", timeout=5))
|
|
assert "clients" in data
|
|
assert isinstance(data["clients"], list)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Config (GET)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestConfigGet:
|
|
|
|
def test_full_config(self, api_session):
|
|
data = _j(api_session.get(f"{FTL_URL}/api/config", timeout=5))
|
|
config = data["config"]
|
|
assert "dns" in config
|
|
assert "webserver" in config
|
|
assert "misc" in config
|
|
assert "debug" in config
|
|
assert "database" in config
|
|
assert config["dns"]["CNAMEdeepInspect"] is True
|
|
assert config["dns"]["blockESNI"] is True
|
|
|
|
def test_config_element(self, api_session):
|
|
data = _j(api_session.get(f"{FTL_URL}/api/config/dns/upstreams", timeout=5))
|
|
config = data["config"]
|
|
upstreams = config["dns"]["upstreams"]
|
|
assert isinstance(upstreams, list)
|
|
assert len(upstreams) > 0
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Network (additional)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestNetworkAdditional:
|
|
|
|
def test_network_gateway(self, api_session):
|
|
data = _j(api_session.get(f"{FTL_URL}/api/network/gateway", timeout=5))
|
|
assert "gateway" in data
|
|
assert isinstance(data["gateway"], list)
|
|
|
|
def test_network_routes(self, api_session):
|
|
data = _j(api_session.get(f"{FTL_URL}/api/network/routes", timeout=5))
|
|
assert "routes" in data
|
|
assert isinstance(data["routes"], list)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Info (additional)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestInfoAdditional:
|
|
|
|
def test_info_host(self, api_session):
|
|
data = _j(api_session.get(f"{FTL_URL}/api/info/host", timeout=5))
|
|
host = data["host"]
|
|
uname = host["uname"]
|
|
assert "sysname" in uname
|
|
assert "nodename" in uname
|
|
assert "release" in uname
|
|
assert "machine" in uname
|
|
|
|
def test_info_sensors(self, api_session):
|
|
data = _j(api_session.get(f"{FTL_URL}/api/info/sensors", timeout=5))
|
|
sensors = data["sensors"]
|
|
assert "list" in sensors
|
|
assert isinstance(sensors["list"], list)
|
|
assert "unit" in sensors
|
|
|
|
def test_info_metrics(self, api_session):
|
|
data = _j(api_session.get(f"{FTL_URL}/api/info/metrics", timeout=5))
|
|
m = data["metrics"]
|
|
dns = m["dns"]
|
|
assert dns["cache"]["size"] > 0
|
|
assert "replies" in dns
|
|
assert dns["replies"]["sum"] > 0
|
|
assert "dhcp" in m
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Queries (additional)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestQueriesAdditional:
|
|
|
|
def test_queries_default(self, api_session):
|
|
"""Default query (no filters) returns up to 100 results."""
|
|
data = _j(api_session.get(f"{FTL_URL}/api/queries", timeout=5))
|
|
assert "queries" in data
|
|
queries = data["queries"]
|
|
assert isinstance(queries, list)
|
|
assert len(queries) > 0
|
|
assert data["recordsTotal"] == TOTAL
|
|
# Check structure of a query entry
|
|
q = queries[0]
|
|
assert "id" in q
|
|
assert "time" in q
|
|
assert "type" in q
|
|
assert "domain" in q
|
|
assert "status" in q
|
|
assert "client" in q
|
|
assert "ip" in q["client"]
|
|
assert "reply" in q
|
|
assert "type" in q["reply"]
|
|
|
|
def test_queries_with_length(self, api_session):
|
|
data = _j(api_session.get(f"{FTL_URL}/api/queries?length=5", timeout=5))
|
|
assert len(data["queries"]) == 5
|
|
|
|
def test_queries_filter_by_type(self, api_session):
|
|
data = _j(api_session.get(f"{FTL_URL}/api/queries?type=AAAA", timeout=5))
|
|
for q in data["queries"]:
|
|
assert q["type"] == "AAAA", \
|
|
f"Expected type AAAA, got {q['type']}"
|
|
|
|
def test_queries_filter_by_status(self, api_session):
|
|
data = _j(api_session.get(f"{FTL_URL}/api/queries?status=GRAVITY", timeout=5))
|
|
assert data["recordsFiltered"] > 0
|
|
for q in data["queries"]:
|
|
assert q["status"] == "GRAVITY"
|
|
|
|
def test_queries_filter_by_domain(self, api_session):
|
|
data = _j(api_session.get(f"{FTL_URL}/api/queries?domain=gravity.ftl", timeout=5))
|
|
assert data["recordsFiltered"] > 0
|
|
for q in data["queries"]:
|
|
assert q["domain"] == "gravity.ftl"
|
|
|
|
def test_queries_filter_by_client_ip(self, api_session):
|
|
data = _j(api_session.get(f"{FTL_URL}/api/queries?client_ip=127.0.0.1", timeout=5))
|
|
assert data["recordsFiltered"] > 0
|
|
for q in data["queries"]:
|
|
assert q["client"]["ip"] == "127.0.0.1"
|
|
|
|
def test_queries_filter_by_upstream_blocklist(self, api_session):
|
|
"""upstream=blocklist is a pseudo-upstream that matches all blocked queries."""
|
|
data = _j(api_session.get(f"{FTL_URL}/api/queries?upstream=blocklist", timeout=5))
|
|
assert data["recordsFiltered"] > 0
|
|
blocked_statuses = {"GRAVITY", "REGEX", "DENYLIST", "SPECIAL_DOMAIN",
|
|
"GRAVITY_CNAME", "REGEX_CNAME", "DENYLIST_CNAME",
|
|
"EXTERNAL_BLOCKED_IP", "EXTERNAL_BLOCKED_NULL",
|
|
"EXTERNAL_BLOCKED_NXRA", "EXTERNAL_BLOCKED_EDE15",
|
|
"DBBUSY"}
|
|
for q in data["queries"]:
|
|
assert q["status"] in blocked_statuses, \
|
|
f"Expected blocked status, got {q['status']}"
|
|
|
|
def test_queries_filter_by_upstream_address(self, api_session):
|
|
"""Filtering by an actual upstream address."""
|
|
data = _j(api_session.get(
|
|
f"{FTL_URL}/api/queries?upstream=127.0.0.1%235555", timeout=5))
|
|
assert data["recordsFiltered"] > 0
|
|
for q in data["queries"]:
|
|
assert q["upstream"] == "127.0.0.1#5555"
|
|
|
|
def test_queries_cursor_pagination(self, api_session):
|
|
"""Cursor + start offset returns non-overlapping pages."""
|
|
page1 = _j(api_session.get(f"{FTL_URL}/api/queries?length=5", timeout=5))
|
|
assert len(page1["queries"]) == 5
|
|
cursor = page1["cursor"]
|
|
assert isinstance(cursor, int)
|
|
|
|
# Page 2: same cursor, offset by start=5
|
|
page2 = _j(api_session.get(
|
|
f"{FTL_URL}/api/queries?length=5&cursor={cursor}&start=5", timeout=5))
|
|
assert len(page2["queries"]) == 5
|
|
|
|
ids1 = {q["id"] for q in page1["queries"]}
|
|
ids2 = {q["id"] for q in page2["queries"]}
|
|
assert ids1.isdisjoint(ids2), \
|
|
f"Pages overlap: {ids1 & ids2}"
|
|
|
|
def test_queries_suggestions(self, api_session):
|
|
data = _j(api_session.get(f"{FTL_URL}/api/queries/suggestions", timeout=5))
|
|
s = data["suggestions"]
|
|
assert "domain" in s
|
|
assert "client_ip" in s
|
|
assert "type" in s
|
|
assert "status" in s
|
|
assert "reply" in s
|
|
assert isinstance(s["domain"], list)
|
|
assert len(s["domain"]) > 0
|
|
assert "127.0.0.1" in s["client_ip"]
|
|
assert "A" in s["type"]
|
|
assert "AAAA" in s["type"]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# History (additional -- database endpoints)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestHistoryDatabase:
|
|
|
|
def test_history_database_requires_params(self, api_session):
|
|
"""Database history endpoints return 400 without from/until."""
|
|
data = _j(api_session.get(f"{FTL_URL}/api/history/database", timeout=5))
|
|
assert data["error"]["key"] == "bad_request"
|
|
|
|
def test_history_database_with_range(self, api_session):
|
|
data = _j(api_session.get(
|
|
f"{FTL_URL}/api/history/database?from=0&until=9999999999", timeout=5))
|
|
assert "history" in data
|
|
assert isinstance(data["history"], list)
|
|
|
|
def test_history_database_clients_requires_params(self, api_session):
|
|
data = _j(api_session.get(f"{FTL_URL}/api/history/database/clients", timeout=5))
|
|
assert data["error"]["key"] == "bad_request"
|
|
|
|
def test_history_database_clients_with_range(self, api_session):
|
|
data = _j(api_session.get(
|
|
f"{FTL_URL}/api/history/database/clients?from=1&until=9999999999", timeout=5))
|
|
assert "history" in data
|
|
assert "clients" in data
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# NTP server (protocol-level, not HTTP)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestNTP:
|
|
|
|
def test_ntp_server_responds(self, api_session):
|
|
"""FTL's built-in NTP server returns a valid NTPv4 response."""
|
|
import socket
|
|
import struct
|
|
|
|
# NTP v3 client request
|
|
request = b'\x1b' + 47 * b'\0'
|
|
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
sock.settimeout(2.0)
|
|
try:
|
|
sock.sendto(request, ('127.0.0.1', 123))
|
|
data, _ = sock.recvfrom(1024)
|
|
finally:
|
|
sock.close()
|
|
|
|
assert len(data) == 48, f"Expected 48-byte NTP packet, got {len(data)}"
|
|
|
|
# LI/VN/Mode byte: mode should be 4 (server)
|
|
mode = data[0] & 0x7
|
|
version = (data[0] >> 3) & 0x7
|
|
assert mode == 4, f"Expected NTP mode 4 (server), got {mode}"
|
|
assert version == 4, f"Expected NTPv4, got v{version}"
|
|
|
|
# Transmit timestamp (bytes 40-47): seconds since 1900-01-01
|
|
# should be close to current time (within 2 seconds)
|
|
import time
|
|
ntp_epoch_offset = 2208988800 # seconds between 1900 and 1970
|
|
tx_seconds = struct.unpack('!I', data[40:44])[0]
|
|
now_ntp = int(time.time()) + ntp_epoch_offset
|
|
drift = abs(tx_seconds - now_ntp)
|
|
assert drift <= 2, \
|
|
f"NTP transmit timestamp off by {drift}s (expected ≤2s)"
|