Files
FTL/test/api/test_api.py
T
Dominik bb89fbfe7b test: add comprehensive API endpoint tests, fix double-free in printFTLenv
Add pytest tests for all previously untested GET API endpoints:
dns/blocking, domains (all type/kind combinations and single lookup),
groups, stats/summary, stats/top_domains, stats/top_clients,
stats/upstreams, stats/query_types, stats/recent_blocked,
stats/database (error handling), dhcp/leases, endpoints, info/ftl,
info/login, info/version, info/messages, info/client, info/database,
info/system, network/devices, network/interfaces, logs (dnsmasq, ftl,
webserver), and padd.

All assertions use exact expected values derived from the deterministic
BATS DNS query seeding (137 total queries, 49 blocked, 47 forwarded,
41 cached, 11 active clients, 8 gravity domains).  On failure, the
full JSON response is dumped to /tmp/ftl_test_*.json for easy
inspection.

Fix double-free bug in printFTLenv() (src/config/env.c): when
printFTLenv() was called more than once (e.g. after config reload
triggered by the CLI password test), it would free item->error a
second time because neither the pointer nor the error_allocated flag
was reset after the first free.  This produced "Trying to free NULL
pointer in printFTLenv()" warnings.  Fix: set item->error = NULL and
item->error_allocated = false after freeing.

Files modified:
  src/config/env.c      — reset error/error_allocated after free
  test/api/test_api.py  — add 34 new endpoint tests (22 -> 56 total)

Signed-off-by: Dominik <dl6er@dl6er.de>
2026-03-25 13:29:17 +01:00

770 lines
30 KiB
Python

"""
Pi-hole FTL API integration tests — stats, lists, search, history,
config validation (API-side), HTTP errors, and Lua server pages.
These tests replace the equivalent curl-based BATS tests with native
Python assertions against a live FTL instance.
Usage:
pytest test/api/test_api.py -v
"""
import json
import pytest
FTL_URL = "http://127.0.0.1"
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _j(response, dump=None):
    """Decode a response body as JSON and drop its ``took`` entry.

    The ``took`` key is removed (when present) because its value differs
    between requests.  When *dump* is truthy, the decoded payload is also
    written to ``/tmp/ftl_test_<dump>.json`` for later inspection; an
    ``OSError`` while writing is ignored.  The decoded payload is returned
    in every case.
    """
    payload = response.json()
    payload.pop("took", None)
    if not dump:
        return payload

    target = f"/tmp/ftl_test_{dump}.json"
    try:
        with open(target, "w") as handle:
            json.dump(payload, handle, indent=2)
    except OSError:
        # Dumping is a debugging aid only; never fail a test because of it.
        pass
    return payload
def set_config(api_session, dotted_key, value):
    """Change a single FTL config item through the API.

    The dotted key is split on ``.``; its segments form both the request
    path and the nesting of the JSON body.  For example
    ``set_config(s, "webserver.serve_all", True)`` issues
    ``PATCH /api/config/webserver/serve_all`` with the body
    ``{"config": {"webserver": {"serve_all": true}}}``.

    Asserts that the server answers with HTTP 200 and returns the
    response object.
    """
    segments = dotted_key.split(".")
    url = f"{FTL_URL}/api/config/" + "/".join(segments)

    # Wrap the value from the innermost key outwards.
    nested = value
    for segment in segments[::-1]:
        nested = {segment: nested}
    body = {"config": nested}

    response = api_session.patch(url, json=body, timeout=20)
    assert response.status_code == 200, \
        f"Failed to set {dotted_key}: {response.status_code} {response.text}"
    return response
# ---------------------------------------------------------------------------
# HTTP error responses
# ---------------------------------------------------------------------------
class TestHTTPErrors:
    """Error responses for paths the web server does not know."""

    def test_api_404_returns_json(self, api_session):
        """An unknown API path yields a JSON ``not_found`` error object."""
        response = api_session.get(f"{FTL_URL}/api/undefined", timeout=5)
        data = _j(response)
        expected = {
            "key": "not_found",
            "message": "Not found",
            "hint": "/api/undefined",
        }
        assert data["error"] == expected, json.dumps(data, indent=2)

    def test_non_admin_path_returns_404(self, api_session):
        """A HEAD request to a path outside /admin is answered with 404."""
        status = api_session.head(f"{FTL_URL}/undefined", timeout=5).status_code
        assert status == 404
# ---------------------------------------------------------------------------
# Config validation via API (type-based)
# ---------------------------------------------------------------------------
class TestConfigValidationAPIType:
    """Config PATCH requests that are rejected because of the value's type."""

    def test_blockESNI_rejects_float(self, api_session):
        """A float for ``dns.blockESNI`` is reported as not being a bool."""
        body = {"config": {"dns": {"blockESNI": 15.5}}}
        reply = api_session.patch(f"{FTL_URL}/api/config", json=body, timeout=20)
        data = _j(reply)
        expected = {
            "key": "bad_request",
            "message": "Config item is invalid",
            "hint": "dns.blockESNI: not of type bool",
        }
        assert data["error"] == expected, json.dumps(data, indent=2)

    def test_piholePTR_rejects_invalid_option(self, api_session):
        """An unknown string for ``dns.piholePTR`` is an invalid option."""
        body = {"config": {"dns": {"piholePTR": "something_else"}}}
        reply = api_session.patch(f"{FTL_URL}/api/config", json=body, timeout=20)
        data = _j(reply)
        expected = {
            "key": "bad_request",
            "message": "Config item is invalid",
            "hint": "dns.piholePTR: invalid option",
        }
        assert data["error"] == expected, json.dumps(data, indent=2)
# ---------------------------------------------------------------------------
# Config validation via API (validator-based)
# ---------------------------------------------------------------------------
class TestConfigValidationAPIValidator:
    """Config PATCH requests that are rejected by a config item's validator."""

    def test_files_pcap_rejects_invalid_path(self, api_session):
        """``files.pcap`` refuses a string that is not a valid file path."""
        body = {"config": {"files": {"pcap": "%gh4b"}}}
        reply = api_session.patch(f"{FTL_URL}/api/config", json=body, timeout=20)
        data = _j(reply)
        expected = {
            "key": "bad_request",
            "message": "Config item validation failed",
            "hint": 'files.pcap: not a valid file path ("%gh4b")',
        }
        assert data["error"] == expected, json.dumps(data, indent=2)

    def test_cnameRecords_rejects_too_few_elements(self, api_session):
        """A CNAME record with a single element has too few elements."""
        body = {"config": {"dns": {"cnameRecords": ["a"]}}}
        reply = api_session.patch(f"{FTL_URL}/api/config", json=body, timeout=20)
        data = _j(reply)
        expected = {
            "key": "bad_request",
            "message": "Config item validation failed",
            "hint": "dns.cnameRecords[0]: not a valid CNAME definition (too few elements)",
        }
        assert data["error"] == expected, json.dumps(data, indent=2)

    def test_cnameRecords_rejects_empty_string_position(self, api_session):
        """An empty field inside the second record is reported with its position."""
        body = {"config": {"dns": {"cnameRecords": ["a,b,c", "a,b,c,,c"]}}}
        reply = api_session.patch(f"{FTL_URL}/api/config", json=body, timeout=20)
        data = _j(reply)
        expected = {
            "key": "bad_request",
            "message": "Config item validation failed",
            "hint": "dns.cnameRecords[1]: contains an empty string at position 3",
        }
        assert data["error"] == expected, json.dumps(data, indent=2)

    def test_cnameRecords_rejects_non_string_element(self, api_session):
        """A non-string array member makes the whole array invalid."""
        body = {"config": {"dns": {"cnameRecords": ["a,b,c", "a,b,c", 5]}}}
        reply = api_session.patch(f"{FTL_URL}/api/config", json=body, timeout=20)
        data = _j(reply)
        expected = {
            "key": "bad_request",
            "message": "Config item is invalid",
            "hint": "dns.cnameRecords: array has invalid elements",
        }
        assert data["error"] == expected, json.dumps(data, indent=2)
# ---------------------------------------------------------------------------
# Envvar-protected config: cannot change via API
# ---------------------------------------------------------------------------
class TestEnvvarProtectedConfig:
    """Config items fixed through environment variables are read-only via the API."""

    def test_api_rejects_envvar_override(self, api_session):
        """API cannot change misc.nice when set via FTLCONF_misc_nice."""
        body = {"config": {"misc": {"nice": -12}}}
        reply = api_session.patch(
            f"{FTL_URL}/api/config/misc/nice", json=body, timeout=20)
        data = _j(reply)
        expected = {
            "key": "bad_request",
            "message": "Config items set via environment variables cannot be changed via the API",
            "hint": "misc.nice",
        }
        assert data["error"] == expected, json.dumps(data, indent=2)
# ---------------------------------------------------------------------------
# Domain search
# ---------------------------------------------------------------------------
class TestDomainSearch:
    """Checks of the ``/api/search/<domain>`` endpoint."""

    def test_nonexistent_domain(self, api_session):
        """A domain that is on no list returns empty matches and zero counters."""
        data = _j(api_session.get(f"{FTL_URL}/api/search/non.existent", timeout=5))
        found = data["search"]
        no_results = {
            "domains": {"exact": 0, "regex": 0},
            "gravity": {"allow": 0, "block": 0},
            "total": 0,
        }
        echoed_parameters = {
            "N": 20,
            "partial": False,
            "domain": "non.existent",
            "debug": False,
        }
        assert found["domains"] == []
        assert found["gravity"] == []
        assert found["results"] == no_results, json.dumps(data, indent=2)
        assert found["parameters"] == echoed_parameters, json.dumps(data, indent=2)

    def test_antigravity_domain(self, api_session):
        """``antigravity.ftl`` matches one block entry and two allow entries."""
        data = _j(api_session.get(f"{FTL_URL}/api/search/antigravity.ftl", timeout=5))
        found = data["search"]
        expected_results = {
            "domains": {"exact": 0, "regex": 0},
            "gravity": {"allow": 2, "block": 1},
            "total": 3,
        }
        assert found["results"] == expected_results, json.dumps(data, indent=2)
        assert found["domains"] == []

        matches = found["gravity"]
        assert len(matches) == 3, json.dumps(matches, indent=2)
        block_hit, allow_hit, abp_hit = matches

        # Block list match
        assert block_hit["domain"] == "antigravity.ftl"
        assert block_hit["type"] == "block"
        assert block_hit["address"] == "https://pi-hole.net/block.txt"
        assert block_hit["comment"] == "Fake block-list"
        assert block_hit["enabled"] is True
        assert block_hit["id"] == 1
        assert block_hit["number"] == 2000
        assert block_hit["invalid_domains"] == 2
        assert block_hit["groups"] == [0, 2]

        # Allow list match (exact domain)
        assert allow_hit["domain"] == "antigravity.ftl"
        assert allow_hit["type"] == "allow"
        assert allow_hit["address"] == "https://pi-hole.net/allow.txt"
        assert allow_hit["comment"] == "Fake allow-list"
        assert allow_hit["id"] == 2
        assert allow_hit["groups"] == [0]

        # Allow list match (ABP-style antigravity entry)
        assert abp_hit["domain"] == "@@||antigravity.ftl^"
        assert abp_hit["type"] == "allow"
        assert abp_hit["id"] == 2

    def test_punycode_normalization(self, api_session):
        """Internationalized domain names should be normalized to punycode."""
        reply = api_session.get(
            f"{FTL_URL}/api/search/\u00e4BC.com",
            params={"debug": "true"},
            timeout=5,
        )
        data = _j(reply)
        found = data["search"]
        assert found["debug"]["punycode"] == "xn--bc-uia.com", \
            json.dumps(data, indent=2)
        assert found["results"]["total"] == 0
# ---------------------------------------------------------------------------
# History
# ---------------------------------------------------------------------------
class TestHistory:
    """Checks of ``/api/history`` and ``/api/history/clients``."""

    def test_history_returns_24h(self, api_session):
        """The history has 145 slots and the first slot carries all counters."""
        data = _j(api_session.get(f"{FTL_URL}/api/history", timeout=5))
        slots = data["history"]
        assert len(slots) == 145, \
            f"Expected 145 history entries (24h in 10-min slots), got {len(slots)}"
        # Only the first slot is inspected for the expected structure
        first = slots[0]
        for key in ("timestamp", "total", "cached", "blocked", "forwarded"):
            assert key in first, f"Missing key '{key}' in history slot: {first}"

    def test_history_clients_returns_24h(self, api_session):
        """The per-client history has 145 slots and a ``clients`` key."""
        data = _j(api_session.get(f"{FTL_URL}/api/history/clients", timeout=5))
        slots = data["history"]
        assert len(slots) == 145, \
            f"Expected 145 history entries, got {len(slots)}"
        assert "clients" in data, f"Missing 'clients' key:\n{json.dumps(data, indent=2)}"
# ---------------------------------------------------------------------------
# Lists
# ---------------------------------------------------------------------------
class TestLists:
    """Checks of the ``/api/lists`` endpoint with and without a type filter."""

    def test_block_lists_only(self, api_session):
        """``?type=block`` returns exactly the one fake block list."""
        reply = api_session.get(f"{FTL_URL}/api/lists?type=block", timeout=5)
        found = _j(reply)["lists"]
        assert len(found) == 1, f"Expected 1 block list:\n{json.dumps(found, indent=2)}"
        entry = found[0]
        assert entry["type"] == "block"
        assert entry["address"] == "https://pi-hole.net/block.txt"
        assert entry["comment"] == "Fake block-list"
        assert entry["enabled"] is True
        assert entry["id"] == 1
        assert entry["number"] == 2000
        assert entry["invalid_domains"] == 2
        assert entry["abp_entries"] == 0
        assert entry["status"] == 1
        assert entry["groups"] == [0, 2]

    def test_allow_lists_only(self, api_session):
        """``?type=allow`` returns exactly the one fake allow list."""
        reply = api_session.get(f"{FTL_URL}/api/lists?type=allow", timeout=5)
        found = _j(reply)["lists"]
        assert len(found) == 1, f"Expected 1 allow list:\n{json.dumps(found, indent=2)}"
        entry = found[0]
        assert entry["type"] == "allow"
        assert entry["address"] == "https://pi-hole.net/allow.txt"
        assert entry["comment"] == "Fake allow-list"
        assert entry["enabled"] is True
        assert entry["id"] == 2
        assert entry["number"] == 2000
        assert entry["invalid_domains"] == 2
        assert entry["abp_entries"] == 0
        assert entry["status"] == 1
        assert entry["groups"] == [0]

    def test_all_lists_includes_both_types(self, api_session):
        """Without a filter, two lists come back: one block and one allow."""
        reply = api_session.get(f"{FTL_URL}/api/lists", timeout=5)
        found = _j(reply)["lists"]
        assert len(found) == 2, f"Expected 2 lists:\n{json.dumps(found, indent=2)}"
        seen_types = {entry["type"] for entry in found}
        assert seen_types == {"block", "allow"}
# ---------------------------------------------------------------------------
# Queries
# ---------------------------------------------------------------------------
class TestQueries:
    """Checks that filtering ``/api/queries`` by UNKNOWN yields nothing."""

    def test_no_unknown_reply(self, api_session):
        """No query is returned for ``reply=UNKNOWN``."""
        reply = api_session.get(f"{FTL_URL}/api/queries?reply=UNKNOWN", timeout=5)
        data = _j(reply)
        assert data["queries"] == []
        assert data["recordsFiltered"] == 0, json.dumps(data, indent=2)

    def test_no_unknown_status(self, api_session):
        """No query is returned for ``status=UNKNOWN``."""
        reply = api_session.get(f"{FTL_URL}/api/queries?status=UNKNOWN", timeout=5)
        data = _j(reply)
        assert data["queries"] == []
        assert data["recordsFiltered"] == 0, json.dumps(data, indent=2)
# ---------------------------------------------------------------------------
# Lua server pages
# ---------------------------------------------------------------------------
class TestLuaServerPages:
    """Serving of Lua server pages outside /admin, gated by ``webserver.serve_all``.

    Every test sets ``webserver.serve_all`` to the value it needs itself, so
    the tests do not depend on being run in definition order.
    """

    def test_lua_page_outside_admin_not_served_by_default(self, api_session):
        """Lua server page outside /admin is not served when serve_all is off."""
        set_config(api_session, "webserver.serve_all", False)
        r = api_session.head(f"{FTL_URL}/broken_lua", timeout=5)
        assert r.status_code == 404

    def test_lua_page_generates_proper_backtrace(self, api_session):
        """Lua server page generates proper backtrace on error."""
        set_config(api_session, "webserver.serve_all", True)
        r = api_session.get(f"{FTL_URL}/broken_lua", timeout=5)
        lines = r.text.splitlines()
        # Fail with the response text rather than an IndexError when the
        # page is shorter than the four lines inspected below.
        assert len(lines) >= 4, f"Unexpected response:\n{r.text}"
        assert lines[0] == "Hello, world 1!", f"Unexpected response:\n{r.text}"
        assert lines[1] == "Hello, world 2!"
        assert 'Cannot include [/var/www/html/does_not_exist.lp]: not found' in lines[2]
        assert lines[3] == "stack traceback:"

    def test_lua_page_outside_webhome_served_without_login(self, api_session):
        """After serve_all is enabled, Lua pages are served without login."""
        # Enable serve_all explicitly instead of relying on the state left
        # behind by a previously executed test.
        set_config(api_session, "webserver.serve_all", True)
        r = api_session.get(f"{FTL_URL}/broken_lua", timeout=5)
        lines = r.text.splitlines()
        # An empty body must fail the assertion, not raise IndexError.
        assert lines and lines[0] == "Hello, world 1!", \
            f"Unexpected response:\n{r.text}"
# ---------------------------------------------------------------------------
# DNS blocking status
# ---------------------------------------------------------------------------
class TestDNSBlocking:
    """Checks of the ``/api/dns/blocking`` status endpoint."""

    def test_blocking_enabled(self, api_session):
        """Blocking is reported as enabled and no timer is set."""
        reply = api_session.get(f"{FTL_URL}/api/dns/blocking", timeout=5)
        status = _j(reply)
        assert status["blocking"] == "enabled"
        assert status["timer"] is None
# ---------------------------------------------------------------------------
# Domains
# ---------------------------------------------------------------------------
class TestDomains:
    """Checks of ``/api/domains`` for every type/kind combination."""

    def test_allow_exact(self, api_session):
        """Exact allow entries contain the expected domains and nothing else in kind."""
        reply = api_session.get(f"{FTL_URL}/api/domains/allow/exact", timeout=5)
        entries = _j(reply)["domains"]
        listed = [entry["domain"] for entry in entries]
        assert "allowed.ftl" in listed, json.dumps(entries, indent=2)
        assert "regex1.ftl" in listed
        assert "mask.icloud.com" in listed
        for entry in entries:
            assert (entry["type"], entry["kind"]) == ("allow", "exact")

    def test_allow_regex(self, api_session):
        """There are two allow regexes; the first is compared field by field."""
        reply = api_session.get(f"{FTL_URL}/api/domains/allow/regex", timeout=5)
        entries = _j(reply)["domains"]
        assert len(entries) == 2, json.dumps(entries, indent=2)
        first, second = entries
        expected_first = {
            "domain": "regex2",
            "unicode": "regex2",
            "type": "allow",
            "kind": "regex",
            "comment": "",
            "groups": [0],
            "enabled": True,
            "id": 3,
            "date_added": 1559928803,
            "date_modified": 1559928803,
        }
        assert first == expected_first
        assert second["domain"] == "^gravity-allowed"
        assert second["id"] == 4

    def test_deny_exact(self, api_session):
        """Exact deny entries contain the expected domains and nothing else in kind."""
        reply = api_session.get(f"{FTL_URL}/api/domains/deny/exact", timeout=5)
        entries = _j(reply)["domains"]
        listed = [entry["domain"] for entry in entries]
        assert "denied.ftl" in listed, json.dumps(entries, indent=2)
        assert "blacklisted-group-disabled.com" in listed
        for entry in entries:
            assert (entry["type"], entry["kind"]) == ("deny", "exact")

    def test_deny_regex(self, api_session):
        """There are eleven deny regexes; the first one is checked in detail."""
        reply = api_session.get(f"{FTL_URL}/api/domains/deny/regex", timeout=5)
        entries = _j(reply)["domains"]
        assert len(entries) == 11, \
            f"Expected 11 deny regex, got {len(entries)}:\n{json.dumps(entries, indent=2)}"
        first = entries[0]
        assert first["domain"] == "regex[0-9].ftl"
        assert first["id"] == 6
        assert first["groups"] == [0, 2]
        for entry in entries:
            assert (entry["type"], entry["kind"]) == ("deny", "regex")

    def test_all_domains(self, api_session):
        """The unfiltered listing covers both types and both kinds."""
        reply = api_session.get(f"{FTL_URL}/api/domains", timeout=5)
        entries = _j(reply)["domains"]
        seen_types = {entry["type"] for entry in entries}
        seen_kinds = {entry["kind"] for entry in entries}
        assert seen_types == {"allow", "deny"}
        assert seen_kinds == {"exact", "regex"}

    def test_single_domain_lookup(self, api_session):
        """Looking up ``denied.ftl`` directly returns exactly that entry."""
        reply = api_session.get(
            f"{FTL_URL}/api/domains/deny/exact/denied.ftl", timeout=5)
        entries = _j(reply)["domains"]
        assert len(entries) == 1, json.dumps(entries, indent=2)
        only = entries[0]
        assert only["domain"] == "denied.ftl"
        assert only["comment"] == "Migrated from /etc/pihole/blacklist.txt"
# ---------------------------------------------------------------------------
# Groups
# ---------------------------------------------------------------------------
class TestGroups:
    """Checks of the ``/api/groups`` endpoint."""

    def test_all_groups(self, api_session):
        """Six groups exist; Default is enabled and "Test group" is disabled."""
        reply = api_session.get(f"{FTL_URL}/api/groups", timeout=5)
        found = _j(reply)["groups"]
        assert len(found) == 6, json.dumps(found, indent=2)

        group_names = {group["name"] for group in found}
        assert "Default" in group_names
        assert "Test group" in group_names
        assert "Second test group" in group_names

        default_matches = (group for group in found if group["name"] == "Default")
        default_group = next(default_matches)
        assert default_group["id"] == 0
        assert default_group["enabled"] is True
        assert default_group["comment"] == "The default group"

        test_matches = (group for group in found if group["name"] == "Test group")
        test_group = next(test_matches)
        assert test_group["id"] == 1
        assert test_group["enabled"] is False

    def test_single_group_lookup(self, api_session):
        """Looking up ``Default`` by name returns exactly that group."""
        reply = api_session.get(f"{FTL_URL}/api/groups/Default", timeout=5)
        found = _j(reply)["groups"]
        assert len(found) == 1, json.dumps(found, indent=2)
        only = found[0]
        assert only["name"] == "Default"
        assert only["id"] == 0
# ---------------------------------------------------------------------------
# Stats summary
# ---------------------------------------------------------------------------
class TestStatsSummary:
    """Checks of ``/api/stats/summary`` against fixed expected counters."""

    def test_summary_structure(self, api_session):
        """Query, status, type, client and gravity counters match exactly."""
        reply = api_session.get(f"{FTL_URL}/api/stats/summary", timeout=5)
        data = _j(reply, dump="stats_summary")
        queries = data["queries"]

        assert queries["total"] == 137, json.dumps(data, indent=2)
        assert queries["blocked"] == 49
        assert queries["forwarded"] == 47
        assert queries["cached"] == 41
        assert queries["unique_domains"] == 77

        expected_status = {
            "UNKNOWN": 0,
            "GRAVITY": 7,
            "FORWARDED": 47,
            "CACHE": 41,
            "REGEX": 21,
            "DENYLIST": 4,
            "SPECIAL_DOMAIN": 2,
        }
        for status, count in expected_status.items():
            assert queries["status"][status] == count

        expected_types = {"A": 69, "AAAA": 19}
        for qtype, count in expected_types.items():
            assert queries["types"][qtype] == count

        clients = data["clients"]
        assert clients["active"] == 11
        assert clients["total"] == 11
        assert data["gravity"]["domains_being_blocked"] == 8
# ---------------------------------------------------------------------------
# Stats: top domains
# ---------------------------------------------------------------------------
class TestStatsTopDomains:
    """Checks of ``/api/stats/top_domains`` and its ``blocked`` filter."""

    def test_top_domains_sorted_descending(self, api_session):
        """The list is non-empty, ordered by count, and reports the totals."""
        reply = api_session.get(f"{FTL_URL}/api/stats/top_domains", timeout=5)
        data = _j(reply, dump="top_domains")
        entries = data["domains"]
        assert len(entries) > 0
        hits = [entry["count"] for entry in entries]
        assert hits == sorted(hits, reverse=True), \
            f"Not sorted descending: {hits}"
        assert data["total_queries"] == 137
        assert data["blocked_queries"] == 49

    def test_top_domains_blocked(self, api_session):
        """``blocked=true`` includes gravity.ftl and is ordered by count."""
        reply = api_session.get(
            f"{FTL_URL}/api/stats/top_domains?blocked=true", timeout=5)
        entries = _j(reply)["domains"]
        listed = [entry["domain"] for entry in entries]
        assert "gravity.ftl" in listed, json.dumps(entries, indent=2)
        hits = [entry["count"] for entry in entries]
        assert hits == sorted(hits, reverse=True)

    def test_top_domains_permitted_excludes_gravity(self, api_session):
        """``blocked=false`` must not list gravity.ftl."""
        reply = api_session.get(
            f"{FTL_URL}/api/stats/top_domains?blocked=false", timeout=5)
        data = _j(reply)
        listed = [entry["domain"] for entry in data["domains"]]
        assert "gravity.ftl" not in listed, \
            f"gravity.ftl should not be in permitted domains:\n{json.dumps(data, indent=2)}"
# ---------------------------------------------------------------------------
# Stats: top clients
# ---------------------------------------------------------------------------
class TestStatsTopClients:
    """Checks of the ``/api/stats/top_clients`` endpoint."""

    def test_top_clients_sorted_descending(self, api_session):
        """127.0.0.1 leads a non-empty list that is ordered by count."""
        reply = api_session.get(f"{FTL_URL}/api/stats/top_clients", timeout=5)
        data = _j(reply, dump="top_clients")
        entries = data["clients"]
        assert len(entries) > 0
        assert entries[0]["ip"] == "127.0.0.1"
        hits = [entry["count"] for entry in entries]
        assert hits == sorted(hits, reverse=True), \
            f"Not sorted descending: {hits}"
        assert data["total_queries"] == 137
# ---------------------------------------------------------------------------
# Stats: upstreams
# ---------------------------------------------------------------------------
class TestStatsUpstreams:
    """Checks of the ``/api/stats/upstreams`` endpoint."""

    def test_upstreams(self, api_session):
        """Four upstreams are listed, including ``blocklist`` and ``cache``.

        The two pseudo-upstreams are looked up with a ``None`` default so
        that a missing entry fails the test with the full upstream list in
        the message instead of a bare ``StopIteration``.
        """
        data = _j(api_session.get(f"{FTL_URL}/api/stats/upstreams", timeout=5), dump="upstreams")
        upstreams = data["upstreams"]
        assert len(upstreams) == 4, json.dumps(upstreams, indent=2)
        assert data["total_queries"] == 137
        assert data["forwarded_queries"] == 47

        blocklist = next((u for u in upstreams if u["ip"] == "blocklist"), None)
        assert blocklist is not None, \
            f"No 'blocklist' upstream:\n{json.dumps(upstreams, indent=2)}"
        assert blocklist["count"] == 49
        assert blocklist["port"] == -1

        cache = next((u for u in upstreams if u["ip"] == "cache"), None)
        assert cache is not None, \
            f"No 'cache' upstream:\n{json.dumps(upstreams, indent=2)}"
        assert cache["count"] == 41
        assert cache["port"] == -1
# ---------------------------------------------------------------------------
# Stats: query types
# ---------------------------------------------------------------------------
class TestStatsQueryTypes:
    """Checks of the ``/api/stats/query_types`` endpoint."""

    def test_query_types(self, api_session):
        """The per-type counters match the expected table exactly."""
        reply = api_session.get(f"{FTL_URL}/api/stats/query_types", timeout=5)
        data = _j(reply, dump="query_types")
        expected = {
            "A": 69,
            "AAAA": 19,
            "ANY": 3,
            "SRV": 1,
            "SOA": 0,
            "PTR": 8,
            "TXT": 10,
            "NAPTR": 1,
            "MX": 1,
            "DS": 7,
            "RRSIG": 0,
            "DNSKEY": 9,
            "NS": 0,
            "SVCB": 3,
            "HTTPS": 3,
            "OTHER": 1,
        }
        assert data["types"] == expected, json.dumps(data, indent=2)
# ---------------------------------------------------------------------------
# Stats: recent blocked
# ---------------------------------------------------------------------------
class TestStatsRecentBlocked:
    """Checks of the ``/api/stats/recent_blocked`` endpoint."""

    def test_recent_blocked(self, api_session):
        """``denied.ftl`` is among the recently blocked domains."""
        reply = api_session.get(f"{FTL_URL}/api/stats/recent_blocked", timeout=5)
        data = _j(reply)
        recently_blocked = data["blocked"]
        assert "denied.ftl" in recently_blocked, json.dumps(data, indent=2)
# ---------------------------------------------------------------------------
# Stats: database endpoints (require from/until parameters)
# ---------------------------------------------------------------------------
class TestStatsDatabase:
    """Error handling of the ``/api/stats/database/*`` endpoints."""

    def test_database_endpoints_require_time_range(self, api_session):
        """Database stats endpoints return 400 without from/until."""
        endpoints = ("query_types", "summary", "top_clients",
                     "top_domains", "upstreams")
        for endpoint in endpoints:
            url = f"{FTL_URL}/api/stats/database/{endpoint}"
            data = _j(api_session.get(url, timeout=5))
            error = data["error"]
            assert error["key"] == "bad_request", \
                f"/api/stats/database/{endpoint}: {json.dumps(data, indent=2)}"
            # The message has to name both missing parameters
            assert "from" in error["message"]
            assert "until" in error["message"]
# ---------------------------------------------------------------------------
# DHCP leases
# ---------------------------------------------------------------------------
class TestDHCPLeases:
    """Checks of the ``/api/dhcp/leases`` endpoint."""

    def test_no_leases(self, api_session):
        """The lease list is empty."""
        reply = api_session.get(f"{FTL_URL}/api/dhcp/leases", timeout=5)
        leases = _j(reply)["leases"]
        assert leases == []
# ---------------------------------------------------------------------------
# Endpoints listing
# ---------------------------------------------------------------------------
class TestEndpoints:
    """Checks of the ``/api/endpoints`` listing."""

    def test_endpoints_has_all_methods(self, api_session):
        """Every HTTP method has an entry and GET lists more than 20 endpoints."""
        data = _j(api_session.get(f"{FTL_URL}/api/endpoints", timeout=5))
        eps = data["endpoints"]
        for method in ("get", "post", "put", "patch", "delete"):
            # list(eps) rather than eps.keys(): a dict view is not JSON
            # serializable, so building the failure message would raise
            # TypeError and hide the actual assertion failure.
            assert method in eps, f"Missing method '{method}':\n{json.dumps(list(eps), indent=2)}"
        # The GET table is expected to list more than 20 endpoints
        assert len(eps["get"]) > 20
# ---------------------------------------------------------------------------
# Info endpoints
# ---------------------------------------------------------------------------
class TestInfo:
    """Checks of the ``/api/info/*`` endpoints."""

    def test_info_ftl(self, api_session):
        """Database counters, privacy level and client counts match exactly."""
        reply = api_session.get(f"{FTL_URL}/api/info/ftl", timeout=5)
        info = _j(reply, dump="info_ftl")["ftl"]
        database = info["database"]

        assert database["gravity"] == 8, json.dumps(database, indent=2)
        assert database["groups"] == 5
        assert database["lists"] == 2
        assert database["clients"] == 5

        exact = database["domains"]
        assert exact["allowed"] == {"total": 3, "enabled": 3}
        assert exact["denied"] == {"total": 2, "enabled": 2}

        regex = database["regex"]
        assert regex["allowed"] == {"total": 2, "enabled": 2}
        assert regex["denied"] == {"total": 11, "enabled": 11}

        assert info["privacy_level"] == 0
        assert info["clients"]["total"] == 11
        assert info["clients"]["active"] == 11

    def test_info_login(self, api_session):
        """Login info reports ``dns`` as true and HTTPS port 443."""
        reply = api_session.get(f"{FTL_URL}/api/info/login", timeout=5)
        login = _j(reply)
        assert login["dns"] is True
        assert login["https_port"] == 443

    def test_info_version(self, api_session):
        """The local FTL version starts with ``v`` and carries a hash."""
        reply = api_session.get(f"{FTL_URL}/api/info/version", timeout=5)
        versions = _j(reply)["version"]
        assert "ftl" in versions
        assert "local" in versions["ftl"]
        local = versions["ftl"]["local"]
        assert local["version"].startswith("v")
        assert "hash" in local

    def test_info_messages(self, api_session):
        """``messages`` is present and is a list."""
        reply = api_session.get(f"{FTL_URL}/api/info/messages", timeout=5)
        payload = _j(reply)
        assert "messages" in payload
        assert isinstance(payload["messages"], list)

    def test_info_messages_count(self, api_session):
        """``count`` is present and is an integer."""
        reply = api_session.get(f"{FTL_URL}/api/info/messages/count", timeout=5)
        payload = _j(reply)
        assert "count" in payload
        assert isinstance(payload["count"], int)

    def test_info_client(self, api_session):
        """The request is seen as an HTTP/1.1 GET from 127.0.0.1."""
        reply = api_session.get(f"{FTL_URL}/api/info/client", timeout=5)
        client = _j(reply)
        assert client["remote_addr"] == "127.0.0.1"
        assert client["http_version"] == "1.1"
        assert client["method"] == "GET"

    def test_info_database(self, api_session):
        """File type, mode, ownership, query count and SQLite version are checked."""
        reply = api_session.get(f"{FTL_URL}/api/info/database", timeout=5)
        database = _j(reply)
        assert database["type"] == "Regular file"
        assert database["mode"] == "rw-r-----"
        owner = database["owner"]
        assert owner["user"]["name"] == "pihole"
        assert owner["group"]["name"] == "pihole"
        assert database["queries"] > 0
        assert database["sqlite_version"].startswith("3.")

    def test_info_system(self, api_session):
        """System info exposes uptime, RAM, CPU count and FTL resource usage."""
        reply = api_session.get(f"{FTL_URL}/api/info/system", timeout=5)
        system = _j(reply)["system"]
        assert "uptime" in system
        assert system["memory"]["ram"]["total"] > 0
        assert system["cpu"]["nprocs"] > 0
        assert "ftl" in system
        usage = system["ftl"]
        assert "%mem" in usage
        assert "%cpu" in usage
# ---------------------------------------------------------------------------
# Network
# ---------------------------------------------------------------------------
class TestNetwork:
    """Checks of the ``/api/network/*`` endpoints."""

    def test_network_devices(self, api_session):
        """Both expected hardware addresses appear in the device list."""
        reply = api_session.get(f"{FTL_URL}/api/network/devices", timeout=5)
        known_hwaddrs = [device["hwaddr"] for device in _j(reply)["devices"]]
        assert "aa:bb:cc:dd:ee:ff" in known_hwaddrs, json.dumps(known_hwaddrs, indent=2)
        assert "ip-127.0.0.1" in known_hwaddrs

    def test_network_interfaces(self, api_session):
        """The ``lo`` interface is listed."""
        reply = api_session.get(f"{FTL_URL}/api/network/interfaces", timeout=5)
        iface_names = [iface["name"] for iface in _j(reply)["interfaces"]]
        assert "lo" in iface_names, json.dumps(iface_names, indent=2)
# ---------------------------------------------------------------------------
# Logs
# ---------------------------------------------------------------------------
class TestLogs:
    """Checks of the ``/api/logs/*`` endpoints."""

    def test_dnsmasq_log(self, api_session):
        """The dnsmasq log is non-empty; its first entry has timestamp and message."""
        reply = api_session.get(f"{FTL_URL}/api/logs/dnsmasq", timeout=5)
        log = _j(reply)["log"]
        assert len(log) > 0
        first = log[0]
        assert "timestamp" in first
        assert "message" in first

    def test_ftl_log(self, api_session):
        """The FTL log is non-empty; its first entry also carries ``prio``."""
        reply = api_session.get(f"{FTL_URL}/api/logs/ftl", timeout=5)
        log = _j(reply)["log"]
        assert len(log) > 0
        first = log[0]
        assert "timestamp" in first
        assert "message" in first
        assert "prio" in first

    def test_webserver_log(self, api_session):
        """The webserver log is non-empty."""
        reply = api_session.get(f"{FTL_URL}/api/logs/webserver", timeout=5)
        log = _j(reply)["log"]
        assert len(log) > 0
# ---------------------------------------------------------------------------
# PADD
# ---------------------------------------------------------------------------
class TestPADD:
    """Checks of the ``/api/padd`` summary endpoint."""

    def test_padd(self, api_session):
        """Blocking state, top entries, query counters and cache size match."""
        reply = api_session.get(f"{FTL_URL}/api/padd", timeout=5)
        data = _j(reply, dump="padd")

        assert data["blocking"] == "enabled"
        assert data["gravity_size"] == 8
        assert data["active_clients"] == 11
        assert data["top_domain"] == "."
        assert data["top_blocked"] == "gravity.ftl"
        assert data["top_client"] == "127.0.0.1"

        queries = data["queries"]
        assert queries["total"] == 137, json.dumps(data, indent=2)
        assert queries["blocked"] == 49

        assert data["cache"]["size"] == 10000