1
0
mirror of https://github.com/home-assistant/core.git synced 2026-02-15 07:36:16 +00:00

Add Ghost integration (#162041)

Co-authored-by: Joost Lekkerkerker <joostlek@outlook.com>
This commit is contained in:
John O'Nolan
2026-02-06 14:47:53 +04:00
committed by GitHub
parent 0456eb54ee
commit 2d308aaa20
22 changed files with 2364 additions and 0 deletions

View File

@@ -221,6 +221,7 @@ homeassistant.components.generic_hygrostat.*
homeassistant.components.generic_thermostat.*
homeassistant.components.geo_location.*
homeassistant.components.geocaching.*
homeassistant.components.ghost.*
homeassistant.components.gios.*
homeassistant.components.github.*
homeassistant.components.glances.*

2
CODEOWNERS generated
View File

@@ -595,6 +595,8 @@ build.json @home-assistant/supervisor
/tests/components/geonetnz_quakes/ @exxamalte
/homeassistant/components/geonetnz_volcano/ @exxamalte
/tests/components/geonetnz_volcano/ @exxamalte
/homeassistant/components/ghost/ @johnonolan
/tests/components/ghost/ @johnonolan
/homeassistant/components/gios/ @bieniu
/tests/components/gios/ @bieniu
/homeassistant/components/github/ @timmo001 @ludeeus

View File

@@ -0,0 +1,52 @@
"""The Ghost integration."""
from __future__ import annotations
from dataclasses import dataclass
import logging
from aioghost import GhostAdminAPI
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import Platform
from homeassistant.core import HomeAssistant
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from .const import CONF_ADMIN_API_KEY, CONF_API_URL, DOMAIN as DOMAIN
from .coordinator import GhostDataUpdateCoordinator
_LOGGER = logging.getLogger(__name__)
PLATFORMS: list[Platform] = [Platform.SENSOR]
type GhostConfigEntry = ConfigEntry[GhostRuntimeData]
@dataclass
class GhostRuntimeData:
"""Runtime data for Ghost integration."""
coordinator: GhostDataUpdateCoordinator
api: GhostAdminAPI
async def async_setup_entry(hass: HomeAssistant, entry: GhostConfigEntry) -> bool:
"""Set up Ghost from a config entry."""
api_url = entry.data[CONF_API_URL]
admin_api_key = entry.data[CONF_ADMIN_API_KEY]
api = GhostAdminAPI(api_url, admin_api_key, session=async_get_clientsession(hass))
coordinator = GhostDataUpdateCoordinator(hass, api, entry)
await coordinator.async_config_entry_first_refresh()
entry.runtime_data = GhostRuntimeData(coordinator=coordinator, api=api)
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
return True
async def async_unload_entry(hass: HomeAssistant, entry: GhostConfigEntry) -> bool:
"""Unload a config entry."""
return await hass.config_entries.async_unload_platforms(entry, PLATFORMS)

View File

@@ -0,0 +1,101 @@
"""Config flow for Ghost integration."""
from __future__ import annotations
import logging
from typing import Any
from aioghost import GhostAdminAPI
from aioghost.exceptions import GhostAuthError, GhostError
import voluptuous as vol
from homeassistant.config_entries import ConfigFlow, ConfigFlowResult
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from .const import CONF_ADMIN_API_KEY, CONF_API_URL, DOMAIN
_LOGGER = logging.getLogger(__name__)
STEP_USER_DATA_SCHEMA = vol.Schema(
{
vol.Required(CONF_API_URL): str,
vol.Required(CONF_ADMIN_API_KEY): str,
}
)
class GhostConfigFlow(ConfigFlow, domain=DOMAIN):
"""Handle a config flow for Ghost."""
VERSION = 1
async def async_step_user(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Handle the initial step."""
errors: dict[str, str] = {}
if user_input is not None:
api_url = user_input[CONF_API_URL].rstrip("/")
admin_api_key = user_input[CONF_ADMIN_API_KEY]
if ":" not in admin_api_key:
errors["base"] = "invalid_api_key"
else:
result = await self._validate_and_create(api_url, admin_api_key, errors)
if result:
return result
return self.async_show_form(
step_id="user",
data_schema=STEP_USER_DATA_SCHEMA,
errors=errors,
description_placeholders={
"docs_url": "https://account.ghost.org/?r=settings/integrations/new"
},
)
async def _validate_credentials(
self, api_url: str, admin_api_key: str
) -> dict[str, Any]:
"""Validate credentials against the Ghost API.
Returns site data on success. Raises GhostAuthError or GhostError on failure.
"""
api = GhostAdminAPI(
api_url, admin_api_key, session=async_get_clientsession(self.hass)
)
return await api.get_site()
async def _validate_and_create(
self,
api_url: str,
admin_api_key: str,
errors: dict[str, str],
) -> ConfigFlowResult | None:
"""Validate credentials and create entry."""
try:
site = await self._validate_credentials(api_url, admin_api_key)
except GhostAuthError:
errors["base"] = "invalid_auth"
return None
except GhostError:
errors["base"] = "cannot_connect"
return None
except Exception:
_LOGGER.exception("Unexpected error during Ghost setup")
errors["base"] = "unknown"
return None
site_title = site["title"]
await self.async_set_unique_id(site["uuid"])
self._abort_if_unique_id_configured()
return self.async_create_entry(
title=site_title,
data={
CONF_API_URL: api_url,
CONF_ADMIN_API_KEY: admin_api_key,
},
)

View File

@@ -0,0 +1,13 @@
"""Constants for the Ghost integration."""
DOMAIN: str = "ghost"
CONF_ADMIN_API_KEY: str = "admin_api_key"
CONF_API_URL: str = "api_url"
DEFAULT_SCAN_INTERVAL: int = 300 # 5 minutes
# Device info.
CURRENCY: str = "USD"
MANUFACTURER: str = "Ghost Foundation"
MODEL: str = "Ghost"

View File

@@ -0,0 +1,104 @@
"""DataUpdateCoordinator for Ghost."""
from __future__ import annotations
import asyncio
from dataclasses import dataclass
from datetime import timedelta
import logging
from typing import TYPE_CHECKING, Any
from aioghost import GhostAdminAPI
from aioghost.exceptions import GhostAuthError, GhostError
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryAuthFailed
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from .const import DEFAULT_SCAN_INTERVAL, DOMAIN
if TYPE_CHECKING:
from . import GhostConfigEntry
_LOGGER = logging.getLogger(__name__)
@dataclass
class GhostData:
"""Data returned by the Ghost coordinator."""
site: dict[str, Any]
posts: dict[str, Any]
members: dict[str, Any]
latest_post: dict[str, Any] | None
latest_email: dict[str, Any] | None
activitypub: dict[str, Any]
mrr: dict[str, Any]
arr: dict[str, Any]
comments: int
newsletters: dict[str, dict[str, Any]]
class GhostDataUpdateCoordinator(DataUpdateCoordinator[GhostData]):
"""Class to manage fetching Ghost data."""
config_entry: GhostConfigEntry
def __init__(
self,
hass: HomeAssistant,
api: GhostAdminAPI,
config_entry: GhostConfigEntry,
) -> None:
"""Initialize the coordinator."""
self.api = api
super().__init__(
hass,
_LOGGER,
name=f"Ghost ({config_entry.title})",
update_interval=timedelta(seconds=DEFAULT_SCAN_INTERVAL),
config_entry=config_entry,
)
async def _async_update_data(self) -> GhostData:
"""Fetch data from Ghost API."""
try:
(site, posts, members, latest_post, latest_email) = await asyncio.gather(
self.api.get_site(),
self.api.get_posts_count(),
self.api.get_members_count(),
self.api.get_latest_post(),
self.api.get_latest_email(),
)
(activitypub, mrr, arr, comments, newsletters) = await asyncio.gather(
self.api.get_activitypub_stats(),
self.api.get_mrr(),
self.api.get_arr(),
self.api.get_comments_count(),
self.api.get_newsletters(),
)
except GhostAuthError as err:
raise ConfigEntryAuthFailed(
translation_domain=DOMAIN,
translation_key="invalid_api_key",
) from err
except GhostError as err:
raise UpdateFailed(
translation_domain=DOMAIN,
translation_key="api_error",
translation_placeholders={"error": str(err)},
) from err
return GhostData(
site=site,
posts=posts,
members=members,
latest_post=latest_post,
latest_email=latest_email,
activitypub=activitypub,
mrr=mrr,
arr=arr,
comments=comments,
newsletters={n["id"]: n for n in newsletters if "id" in n},
)

View File

@@ -0,0 +1,66 @@
{
"entity": {
"sensor": {
"arr": {
"default": "mdi:cash-multiple"
},
"comped_members": {
"default": "mdi:account-star"
},
"draft_posts": {
"default": "mdi:file-edit-outline"
},
"free_members": {
"default": "mdi:account-outline"
},
"latest_email": {
"default": "mdi:email-newsletter"
},
"latest_email_click_rate": {
"default": "mdi:cursor-default-click-outline"
},
"latest_email_clicked": {
"default": "mdi:cursor-default-click"
},
"latest_email_open_rate": {
"default": "mdi:email-open-outline"
},
"latest_email_opened": {
"default": "mdi:email-open"
},
"latest_email_sent": {
"default": "mdi:send"
},
"latest_post": {
"default": "mdi:newspaper"
},
"mrr": {
"default": "mdi:cash-multiple"
},
"newsletter_subscribers": {
"default": "mdi:email-newsletter"
},
"paid_members": {
"default": "mdi:account-cash"
},
"published_posts": {
"default": "mdi:post"
},
"scheduled_posts": {
"default": "mdi:clock-outline"
},
"socialweb_followers": {
"default": "mdi:account-multiple"
},
"socialweb_following": {
"default": "mdi:account-multiple-outline"
},
"total_comments": {
"default": "mdi:comment-multiple"
},
"total_members": {
"default": "mdi:account-group"
}
}
}
}

View File

@@ -0,0 +1,12 @@
{
"domain": "ghost",
"name": "Ghost",
"codeowners": ["@johnonolan"],
"config_flow": true,
"documentation": "https://www.home-assistant.io/integrations/ghost",
"integration_type": "service",
"iot_class": "cloud_polling",
"loggers": ["aioghost"],
"quality_scale": "bronze",
"requirements": ["aioghost==0.4.0"]
}

View File

@@ -0,0 +1,82 @@
rules:
# Bronze
action-setup:
status: exempt
comment: Integration does not provide service actions.
appropriate-polling: done
brands: done
common-modules: done
config-flow-test-coverage: done
config-flow: done
dependency-transparency: done
docs-actions:
status: exempt
comment: Integration does not provide service actions.
docs-high-level-description: done
docs-installation-instructions: done
docs-removal-instructions: done
entity-event-setup:
status: exempt
comment: Integration does not subscribe to entity events.
entity-unique-id: done
has-entity-name: done
runtime-data: done
test-before-configure: done
test-before-setup: done
unique-config-entry: done
# Silver
action-exceptions:
status: exempt
comment: Integration does not provide service actions.
config-entry-unloading: done
docs-configuration-parameters:
status: exempt
comment: Integration has no configuration options beyond initial setup.
docs-installation-parameters: done
entity-unavailable: done
integration-owner: done
log-when-unavailable: done
parallel-updates: done
reauthentication-flow: todo
test-coverage: done
# Gold
devices: done
diagnostics: todo
discovery-update-info:
status: exempt
comment: Cloud service integration, not discoverable.
discovery:
status: exempt
comment: Cloud service integration, not discoverable.
docs-data-update: done
docs-examples: done
docs-known-limitations: done
docs-supported-devices: done
docs-supported-functions: done
docs-troubleshooting: done
docs-use-cases: done
dynamic-devices:
status: done
comment: New newsletters entities are automatically created
entity-category: done
entity-device-class: done
entity-disabled-by-default:
status: exempt
comment: All sensors provide useful data; none are noisy or low-value.
entity-translations: done
exception-translations: done
icon-translations: done
reconfiguration-flow: todo
repair-issues:
status: exempt
comment: No repair scenarios identified for this integration.
stale-devices:
status: todo
comment: Remove newsletter entities when newsletter is removed
# Platinum
async-dependency: done
inject-websession: done
strict-typing: done

View File

@@ -0,0 +1,323 @@
"""Sensor platform for Ghost."""
from __future__ import annotations
from collections.abc import Callable
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any
from homeassistant.components.sensor import (
SensorDeviceClass,
SensorEntity,
SensorEntityDescription,
SensorStateClass,
)
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers.device_registry import DeviceEntryType, DeviceInfo
from homeassistant.helpers.entity_platform import AddConfigEntryEntitiesCallback
from homeassistant.helpers.update_coordinator import CoordinatorEntity
from .const import CURRENCY, DOMAIN, MANUFACTURER, MODEL
from .coordinator import GhostData, GhostDataUpdateCoordinator
if TYPE_CHECKING:
from . import GhostConfigEntry
# Coordinator handles batching, no limit needed.
PARALLEL_UPDATES = 0
def _get_currency_value(currency_data: dict[str, Any]) -> int | None:
"""Extract the first currency value from a currency dict."""
if not currency_data:
return None
first_value = next(iter(currency_data.values()), None)
if first_value is None:
return None
return int(first_value)
@dataclass(frozen=True, kw_only=True)
class GhostSensorEntityDescription(SensorEntityDescription):
"""Describes a Ghost sensor entity."""
value_fn: Callable[[GhostData], str | int | None]
SENSORS: tuple[GhostSensorEntityDescription, ...] = (
# Core member metrics
GhostSensorEntityDescription(
key="total_members",
translation_key="total_members",
state_class=SensorStateClass.TOTAL,
value_fn=lambda data: data.members.get("total", 0),
),
GhostSensorEntityDescription(
key="paid_members",
translation_key="paid_members",
state_class=SensorStateClass.TOTAL,
value_fn=lambda data: data.members.get("paid", 0),
),
GhostSensorEntityDescription(
key="free_members",
translation_key="free_members",
state_class=SensorStateClass.TOTAL,
value_fn=lambda data: data.members.get("free", 0),
),
GhostSensorEntityDescription(
key="comped_members",
translation_key="comped_members",
state_class=SensorStateClass.TOTAL,
value_fn=lambda data: data.members.get("comped", 0),
),
# Post metrics
GhostSensorEntityDescription(
key="published_posts",
translation_key="published_posts",
state_class=SensorStateClass.TOTAL,
value_fn=lambda data: data.posts.get("published", 0),
),
GhostSensorEntityDescription(
key="draft_posts",
translation_key="draft_posts",
state_class=SensorStateClass.TOTAL,
value_fn=lambda data: data.posts.get("drafts", 0),
),
GhostSensorEntityDescription(
key="scheduled_posts",
translation_key="scheduled_posts",
state_class=SensorStateClass.TOTAL,
value_fn=lambda data: data.posts.get("scheduled", 0),
),
GhostSensorEntityDescription(
key="latest_post",
translation_key="latest_post",
value_fn=lambda data: (
data.latest_post.get("title") if data.latest_post else None
),
),
# Email metrics
GhostSensorEntityDescription(
key="latest_email",
translation_key="latest_email",
value_fn=lambda data: (
data.latest_email.get("title") if data.latest_email else None
),
),
GhostSensorEntityDescription(
key="latest_email_sent",
translation_key="latest_email_sent",
state_class=SensorStateClass.TOTAL,
value_fn=lambda data: (
data.latest_email.get("email_count") if data.latest_email else None
),
),
GhostSensorEntityDescription(
key="latest_email_opened",
translation_key="latest_email_opened",
state_class=SensorStateClass.TOTAL,
value_fn=lambda data: (
data.latest_email.get("opened_count") if data.latest_email else None
),
),
GhostSensorEntityDescription(
key="latest_email_open_rate",
translation_key="latest_email_open_rate",
native_unit_of_measurement="%",
value_fn=lambda data: (
data.latest_email.get("open_rate") if data.latest_email else None
),
),
GhostSensorEntityDescription(
key="latest_email_clicked",
translation_key="latest_email_clicked",
state_class=SensorStateClass.TOTAL,
value_fn=lambda data: (
data.latest_email.get("clicked_count") if data.latest_email else None
),
),
GhostSensorEntityDescription(
key="latest_email_click_rate",
translation_key="latest_email_click_rate",
native_unit_of_measurement="%",
value_fn=lambda data: (
data.latest_email.get("click_rate") if data.latest_email else None
),
),
# Social/ActivityPub metrics
GhostSensorEntityDescription(
key="socialweb_followers",
translation_key="socialweb_followers",
state_class=SensorStateClass.TOTAL,
value_fn=lambda data: data.activitypub.get("followers", 0),
),
GhostSensorEntityDescription(
key="socialweb_following",
translation_key="socialweb_following",
state_class=SensorStateClass.TOTAL,
value_fn=lambda data: data.activitypub.get("following", 0),
),
# Engagement metrics
GhostSensorEntityDescription(
key="total_comments",
translation_key="total_comments",
state_class=SensorStateClass.TOTAL,
value_fn=lambda data: data.comments,
),
)
REVENUE_SENSORS: tuple[GhostSensorEntityDescription, ...] = (
GhostSensorEntityDescription(
key="mrr",
translation_key="mrr",
device_class=SensorDeviceClass.MONETARY,
state_class=SensorStateClass.TOTAL,
native_unit_of_measurement=CURRENCY,
suggested_display_precision=0,
value_fn=lambda data: _get_currency_value(data.mrr),
),
GhostSensorEntityDescription(
key="arr",
translation_key="arr",
device_class=SensorDeviceClass.MONETARY,
state_class=SensorStateClass.TOTAL,
native_unit_of_measurement=CURRENCY,
suggested_display_precision=0,
value_fn=lambda data: _get_currency_value(data.arr),
),
)
async def async_setup_entry(
hass: HomeAssistant,
entry: GhostConfigEntry,
async_add_entities: AddConfigEntryEntitiesCallback,
) -> None:
"""Set up Ghost sensors based on a config entry."""
coordinator = entry.runtime_data.coordinator
entities: list[GhostSensorEntity | GhostNewsletterSensorEntity] = [
GhostSensorEntity(coordinator, description, entry) for description in SENSORS
]
# Add revenue sensors only when Stripe is linked.
if coordinator.data.mrr:
entities.extend(
GhostSensorEntity(coordinator, description, entry)
for description in REVENUE_SENSORS
)
async_add_entities(entities)
newsletter_added: set[str] = set()
@callback
def _async_add_newsletter_entities() -> None:
"""Add newsletter entities when new newsletters appear."""
nonlocal newsletter_added
new_newsletters = {
newsletter_id
for newsletter_id, newsletter in coordinator.data.newsletters.items()
if newsletter.get("status") == "active"
} - newsletter_added
if not new_newsletters:
return
async_add_entities(
GhostNewsletterSensorEntity(
coordinator,
entry,
newsletter_id,
coordinator.data.newsletters[newsletter_id].get("name", "Newsletter"),
)
for newsletter_id in new_newsletters
)
newsletter_added |= new_newsletters
_async_add_newsletter_entities()
entry.async_on_unload(
coordinator.async_add_listener(_async_add_newsletter_entities)
)
class GhostSensorEntity(CoordinatorEntity[GhostDataUpdateCoordinator], SensorEntity):
"""Representation of a Ghost sensor."""
entity_description: GhostSensorEntityDescription
_attr_has_entity_name = True
def __init__(
self,
coordinator: GhostDataUpdateCoordinator,
description: GhostSensorEntityDescription,
entry: GhostConfigEntry,
) -> None:
"""Initialize the sensor."""
super().__init__(coordinator)
self.entity_description = description
self._attr_unique_id = f"{entry.unique_id}_{description.key}"
self._attr_device_info = DeviceInfo(
entry_type=DeviceEntryType.SERVICE,
identifiers={(DOMAIN, entry.entry_id)},
manufacturer=MANUFACTURER,
model=MODEL,
configuration_url=coordinator.api.api_url,
)
@property
def native_value(self) -> str | int | None:
"""Return the state of the sensor."""
return self.entity_description.value_fn(self.coordinator.data)
class GhostNewsletterSensorEntity(
CoordinatorEntity[GhostDataUpdateCoordinator], SensorEntity
):
"""Representation of a Ghost newsletter subscriber sensor."""
_attr_has_entity_name = True
_attr_translation_key = "newsletter_subscribers"
_attr_state_class = SensorStateClass.TOTAL
def __init__(
self,
coordinator: GhostDataUpdateCoordinator,
entry: GhostConfigEntry,
newsletter_id: str,
newsletter_name: str,
) -> None:
"""Initialize the newsletter sensor."""
super().__init__(coordinator)
self._newsletter_id = newsletter_id
self._newsletter_name = newsletter_name
self._attr_unique_id = f"{entry.unique_id}_newsletter_{newsletter_id}"
self._attr_translation_placeholders = {"newsletter_name": newsletter_name}
self._attr_device_info = DeviceInfo(
entry_type=DeviceEntryType.SERVICE,
identifiers={(DOMAIN, entry.entry_id)},
manufacturer=MANUFACTURER,
model=MODEL,
configuration_url=coordinator.api.api_url,
)
def _get_newsletter_by_id(self) -> dict[str, Any] | None:
"""Get newsletter data by ID."""
return self.coordinator.data.newsletters.get(self._newsletter_id)
@property
def available(self) -> bool:
"""Return True if the entity is available."""
if not super().available or self.coordinator.data is None:
return False
return self._newsletter_id in self.coordinator.data.newsletters
@property
def native_value(self) -> int | None:
"""Return the subscriber count for this newsletter."""
if newsletter := self._get_newsletter_by_id():
count: int = newsletter.get("count", {}).get("members", 0)
return count
return None

View File

@@ -0,0 +1,105 @@
{
"config": {
"abort": {
"already_configured": "This Ghost site is already configured."
},
"error": {
"cannot_connect": "Failed to connect to Ghost. Please check your URL.",
"invalid_api_key": "Invalid API key. Please check your credentials.",
"invalid_auth": "[%key:component::ghost::config::error::invalid_api_key%]",
"unknown": "An unexpected error occurred."
},
"step": {
"user": {
"data": {
"admin_api_key": "Admin API key",
"api_url": "API URL"
},
"data_description": {
"admin_api_key": "The Admin API key for your Ghost integration",
"api_url": "The API URL for your Ghost integration"
},
"description": "[Create a custom integration]({docs_url}) to get your API URL and Admin API key.",
"title": "Connect to Ghost"
}
}
},
"entity": {
"sensor": {
"arr": {
"name": "ARR"
},
"comped_members": {
"name": "Comped members"
},
"draft_posts": {
"name": "Draft posts"
},
"free_members": {
"name": "Free members"
},
"latest_email": {
"name": "Latest email"
},
"latest_email_click_rate": {
"name": "Latest email click rate"
},
"latest_email_clicked": {
"name": "Latest email clicked"
},
"latest_email_open_rate": {
"name": "Latest email open rate"
},
"latest_email_opened": {
"name": "Latest email opened"
},
"latest_email_sent": {
"name": "Latest email sent"
},
"latest_post": {
"name": "Latest post"
},
"mrr": {
"name": "MRR"
},
"newsletter_subscribers": {
"name": "{newsletter_name} subscribers"
},
"paid_members": {
"name": "Paid members"
},
"published_posts": {
"name": "Published posts"
},
"scheduled_posts": {
"name": "Scheduled posts"
},
"socialweb_followers": {
"name": "SocialWeb followers",
"unit_of_measurement": "followers"
},
"socialweb_following": {
"name": "SocialWeb following",
"unit_of_measurement": "accounts"
},
"total_comments": {
"name": "Total comments",
"unit_of_measurement": "comments"
},
"total_members": {
"name": "Total members"
}
}
},
"exceptions": {
"api_error": {
"message": "Error communicating with Ghost API: {error}"
},
"cannot_connect": {
"message": "Failed to connect to Ghost: {error}"
},
"invalid_api_key": {
"message": "Invalid API key"
}
}
}

View File

@@ -250,6 +250,7 @@ FLOWS = {
"geofency",
"geonetnz_quakes",
"geonetnz_volcano",
"ghost",
"gios",
"github",
"glances",

View File

@@ -2370,6 +2370,12 @@
}
}
},
"ghost": {
"name": "Ghost",
"integration_type": "service",
"config_flow": true,
"iot_class": "cloud_polling"
},
"gios": {
"name": "GIO\u015a",
"integration_type": "service",

10
mypy.ini generated
View File

@@ -1966,6 +1966,16 @@ disallow_untyped_defs = true
warn_return_any = true
warn_unreachable = true
[mypy-homeassistant.components.ghost.*]
check_untyped_defs = true
disallow_incomplete_defs = true
disallow_subclassing_any = true
disallow_untyped_calls = true
disallow_untyped_decorators = true
disallow_untyped_defs = true
warn_return_any = true
warn_unreachable = true
[mypy-homeassistant.components.gios.*]
check_untyped_defs = true
disallow_incomplete_defs = true

3
requirements_all.txt generated
View File

@@ -265,6 +265,9 @@ aioflo==2021.11.0
# homeassistant.components.yi
aioftp==0.21.3
# homeassistant.components.ghost
aioghost==0.4.0
# homeassistant.components.github
aiogithubapi==24.6.0

View File

@@ -253,6 +253,9 @@ aiofiles==24.1.0
# homeassistant.components.flo
aioflo==2021.11.0
# homeassistant.components.ghost
aioghost==0.4.0
# homeassistant.components.github
aiogithubapi==24.6.0

View File

@@ -0,0 +1,13 @@
"""Tests for the Ghost integration."""
from homeassistant.core import HomeAssistant
from tests.common import MockConfigEntry
async def setup_integration(hass: HomeAssistant, config_entry: MockConfigEntry) -> None:
"""Set up the Ghost integration for tests."""
config_entry.add_to_hass(hass)
await hass.config_entries.async_setup(config_entry.entry_id)
await hass.async_block_till_done()

View File

@@ -0,0 +1,109 @@
"""Fixtures for Ghost integration tests."""
from collections.abc import Generator
from unittest.mock import AsyncMock, patch
import pytest
from homeassistant.components.ghost.const import (
CONF_ADMIN_API_KEY,
CONF_API_URL,
DOMAIN,
)
from tests.common import MockConfigEntry
API_URL = "https://test.ghost.io"
API_KEY = "650b7a9f8e8c1234567890ab:1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef"
SITE_UUID = "a1b2c3d4-e5f6-7890-abcd-ef1234567890"
SITE_DATA = {"title": "Test Ghost", "url": API_URL, "uuid": SITE_UUID}
POSTS_DATA = {"published": 42, "drafts": 5, "scheduled": 2}
MEMBERS_DATA = {"total": 1000, "paid": 100, "free": 850, "comped": 50}
LATEST_POST_DATA = {
"title": "Latest Post",
"slug": "latest-post",
"url": f"{API_URL}/latest-post/",
"published_at": "2026-01-15T10:00:00Z",
}
LATEST_EMAIL_DATA = {
"title": "Newsletter #1",
"subject": "Newsletter #1",
"email_count": 500,
"delivered_count": 490,
"opened_count": 200,
"clicked_count": 50,
"failed_count": 10,
"open_rate": 40,
"click_rate": 10,
"submitted_at": "2026-01-15T10:00:00Z",
}
ACTIVITYPUB_DATA = {"followers": 150, "following": 25}
MRR_DATA = {"usd": 5000}
ARR_DATA = {"usd": 60000}
COMMENTS_COUNT = 156
NEWSLETTERS_DATA = [
{
"id": "nl1",
"name": "Weekly",
"status": "active",
"count": {"members": 800},
},
{
"id": "nl2",
"name": "Archive",
"status": "archived",
"count": {"members": 200},
},
]
@pytest.fixture
def mock_config_entry() -> MockConfigEntry:
"""Return a mock config entry."""
return MockConfigEntry(
domain=DOMAIN,
title="Test Ghost",
data={
CONF_API_URL: API_URL,
CONF_ADMIN_API_KEY: API_KEY,
},
unique_id=SITE_UUID,
)
@pytest.fixture
def mock_ghost_api() -> Generator[AsyncMock]:
"""Mock the GhostAdminAPI."""
with (
patch(
"homeassistant.components.ghost.GhostAdminAPI", autospec=True
) as mock_api_class,
patch(
"homeassistant.components.ghost.config_flow.GhostAdminAPI",
new=mock_api_class,
),
):
mock_api = mock_api_class.return_value
mock_api.api_url = API_URL
mock_api.get_site.return_value = SITE_DATA
mock_api.get_posts_count.return_value = POSTS_DATA
mock_api.get_members_count.return_value = MEMBERS_DATA
mock_api.get_latest_post.return_value = LATEST_POST_DATA
mock_api.get_latest_email.return_value = LATEST_EMAIL_DATA
mock_api.get_activitypub_stats.return_value = ACTIVITYPUB_DATA
mock_api.get_mrr.return_value = MRR_DATA
mock_api.get_arr.return_value = ARR_DATA
mock_api.get_comments_count.return_value = COMMENTS_COUNT
mock_api.get_newsletters.return_value = NEWSLETTERS_DATA
mock_api.close.return_value = None
yield mock_api
@pytest.fixture
def mock_setup_entry() -> Generator[AsyncMock]:
"""Mock setup entry."""
with patch(
"homeassistant.components.ghost.async_setup_entry",
return_value=True,
) as mock_setup:
yield mock_setup

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,140 @@
"""Tests for Ghost config flow."""
from unittest.mock import AsyncMock
from aioghost.exceptions import GhostAuthError, GhostConnectionError
import pytest
from homeassistant.components.ghost.const import (
CONF_ADMIN_API_KEY,
CONF_API_URL,
DOMAIN,
)
from homeassistant.config_entries import SOURCE_USER
from homeassistant.core import HomeAssistant
from homeassistant.data_entry_flow import FlowResultType
from .conftest import API_KEY, API_URL, SITE_UUID
@pytest.mark.usefixtures("mock_setup_entry")
async def test_form_user(hass: HomeAssistant, mock_ghost_api: AsyncMock) -> None:
"""Test the user config flow."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}
)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "user"
assert result["errors"] == {}
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
CONF_API_URL: API_URL,
CONF_ADMIN_API_KEY: API_KEY,
},
)
assert result["type"] is FlowResultType.CREATE_ENTRY
assert result["title"] == "Test Ghost"
assert result["result"].unique_id == SITE_UUID
assert result["data"] == {
CONF_API_URL: API_URL,
CONF_ADMIN_API_KEY: API_KEY,
}
async def test_form_invalid_api_key_format(hass: HomeAssistant) -> None:
"""Test error on invalid API key format."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}
)
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
CONF_API_URL: API_URL,
CONF_ADMIN_API_KEY: "invalid-no-colon",
},
)
assert result["type"] is FlowResultType.FORM
assert result["errors"] == {"base": "invalid_api_key"}
@pytest.mark.usefixtures("mock_setup_entry")
async def test_form_already_configured(
hass: HomeAssistant, mock_config_entry, mock_ghost_api: AsyncMock
) -> None:
"""Test error when already configured."""
# Add existing entry to hass
mock_config_entry.add_to_hass(hass)
# Try to configure a second entry with same URL
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}
)
assert result["type"] is FlowResultType.FORM
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
CONF_API_URL: API_URL,
CONF_ADMIN_API_KEY: API_KEY,
},
)
assert result["type"] is FlowResultType.ABORT
assert result["reason"] == "already_configured"
@pytest.mark.parametrize(
("side_effect", "error_key"),
[
(GhostAuthError("Invalid API key"), "invalid_auth"),
(GhostConnectionError("Connection failed"), "cannot_connect"),
(RuntimeError("Unexpected"), "unknown"),
],
)
@pytest.mark.usefixtures("mock_setup_entry")
async def test_form_errors_can_recover(
hass: HomeAssistant,
mock_ghost_api: AsyncMock,
side_effect: Exception,
error_key: str,
) -> None:
"""Test errors and recovery during setup."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}
)
mock_ghost_api.get_site.side_effect = side_effect
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
CONF_API_URL: API_URL,
CONF_ADMIN_API_KEY: API_KEY,
},
)
assert result["type"] is FlowResultType.FORM
assert result["errors"] == {"base": error_key}
mock_ghost_api.get_site.side_effect = None
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
CONF_API_URL: API_URL,
CONF_ADMIN_API_KEY: API_KEY,
},
)
assert result["type"] is FlowResultType.CREATE_ENTRY
assert result["title"] == "Test Ghost"
assert result["result"].unique_id == SITE_UUID
assert result["data"] == {
CONF_API_URL: API_URL,
CONF_ADMIN_API_KEY: API_KEY,
}

View File

@@ -0,0 +1,47 @@
"""Tests for Ghost integration setup."""
from unittest.mock import AsyncMock
from aioghost.exceptions import GhostAuthError, GhostConnectionError
import pytest
from homeassistant.config_entries import ConfigEntryState
from homeassistant.core import HomeAssistant
from . import setup_integration
@pytest.mark.parametrize(
("side_effect", "expected_state"),
[
(GhostAuthError("Invalid API key"), ConfigEntryState.SETUP_ERROR),
(GhostConnectionError("Connection failed"), ConfigEntryState.SETUP_RETRY),
],
)
async def test_setup_entry_errors(
hass: HomeAssistant,
mock_ghost_api: AsyncMock,
mock_config_entry,
side_effect: Exception,
expected_state: ConfigEntryState,
) -> None:
"""Test setup errors."""
mock_ghost_api.get_site.side_effect = side_effect
await setup_integration(hass, mock_config_entry)
assert mock_config_entry.state is expected_state
async def test_unload_entry(
hass: HomeAssistant, mock_ghost_api: AsyncMock, mock_config_entry
) -> None:
"""Test unloading config entry."""
await setup_integration(hass, mock_config_entry)
assert mock_config_entry.state is ConfigEntryState.LOADED
await hass.config_entries.async_unload(mock_config_entry.entry_id)
await hass.async_block_till_done()
assert mock_config_entry.state is ConfigEntryState.NOT_LOADED

View File

@@ -0,0 +1,127 @@
"""Tests for Ghost sensors."""
from datetime import timedelta
from unittest.mock import AsyncMock
from aioghost.exceptions import GhostError
from freezegun.api import FrozenDateTimeFactory
import pytest
from syrupy.assertion import SnapshotAssertion
from homeassistant.const import STATE_UNAVAILABLE
from homeassistant.core import HomeAssistant
from homeassistant.helpers import entity_registry as er
from . import setup_integration
from .conftest import NEWSLETTERS_DATA
from tests.common import MockConfigEntry, async_fire_time_changed, snapshot_platform
@pytest.mark.usefixtures("entity_registry_enabled_by_default")
async def test_sensor_entities(
hass: HomeAssistant,
snapshot: SnapshotAssertion,
entity_registry: er.EntityRegistry,
mock_ghost_api: AsyncMock,
mock_config_entry: MockConfigEntry,
) -> None:
"""Snapshot all Ghost sensor entities."""
await setup_integration(hass, mock_config_entry)
await snapshot_platform(hass, entity_registry, snapshot, mock_config_entry.entry_id)
async def test_newsletter_sensor_added_on_update(
hass: HomeAssistant,
mock_ghost_api: AsyncMock,
mock_config_entry: MockConfigEntry,
freezer: FrozenDateTimeFactory,
) -> None:
"""Test newsletter sensors are added after updates."""
await setup_integration(hass, mock_config_entry)
assert hass.states.get("sensor.test_ghost_monthly_subscribers") is None
mock_ghost_api.get_newsletters.return_value = [
*NEWSLETTERS_DATA,
{
"id": "nl3",
"name": "Monthly",
"status": "active",
"count": {"members": 300},
},
]
freezer.tick(timedelta(minutes=5))
async_fire_time_changed(hass)
await hass.async_block_till_done(wait_background_tasks=True)
state = hass.states.get("sensor.test_ghost_monthly_subscribers")
assert state is not None
assert state.state == "300"
async def test_revenue_sensors_not_created_without_stripe(
hass: HomeAssistant, mock_ghost_api: AsyncMock, mock_config_entry
) -> None:
"""Test MRR/ARR sensors are not created when Stripe is not linked."""
# Return empty MRR/ARR data (no Stripe linked)
mock_ghost_api.get_mrr.return_value = {}
mock_ghost_api.get_arr.return_value = {}
await setup_integration(hass, mock_config_entry)
assert hass.states.get("sensor.test_ghost_mrr") is None
assert hass.states.get("sensor.test_ghost_arr") is None
async def test_newsletter_sensor_not_found(
hass: HomeAssistant,
mock_ghost_api: AsyncMock,
mock_config_entry: MockConfigEntry,
freezer: FrozenDateTimeFactory,
) -> None:
"""Test newsletter sensor when newsletter is removed."""
await setup_integration(hass, mock_config_entry)
# Verify newsletter sensor exists
state = hass.states.get("sensor.test_ghost_weekly_subscribers")
assert state is not None
assert state.state == "800"
# Now return empty newsletters list
mock_ghost_api.get_newsletters.return_value = []
freezer.tick(timedelta(minutes=5))
async_fire_time_changed(hass)
await hass.async_block_till_done(wait_background_tasks=True)
# Sensor should now be unavailable (newsletter not found)
state = hass.states.get("sensor.test_ghost_weekly_subscribers")
assert state is not None
assert state.state == STATE_UNAVAILABLE
async def test_entities_unavailable_on_update_failure(
hass: HomeAssistant,
mock_ghost_api: AsyncMock,
mock_config_entry: MockConfigEntry,
freezer: FrozenDateTimeFactory,
) -> None:
"""Test entities become unavailable on update failure."""
await setup_integration(hass, mock_config_entry)
state = hass.states.get("sensor.test_ghost_total_members")
assert state is not None
assert state.state == "1000"
mock_ghost_api.get_site.side_effect = GhostError("Update failed")
freezer.tick(timedelta(minutes=5))
async_fire_time_changed(hass)
await hass.async_block_till_done(wait_background_tasks=True)
state = hass.states.get("sensor.test_ghost_total_members")
assert state is not None
assert state.state == STATE_UNAVAILABLE