"""
Microsoft Teams platform adapter for Hermes Agent.

Uses the microsoft-teams-apps SDK for authentication and activity processing.
Runs an aiohttp webhook server to receive messages from Teams.
Proactive messaging (send, typing) uses the SDK's App.send() method.

Requires:
    pip install microsoft-teams-apps aiohttp
    TEAMS_CLIENT_ID, TEAMS_CLIENT_SECRET, and TEAMS_TENANT_ID env vars

Configuration in config.yaml:
    platforms:
      teams:
        enabled: true
        extra:
          client_id: "your-client-id"      # or TEAMS_CLIENT_ID env var
          client_secret: "your-secret"      # or TEAMS_CLIENT_SECRET env var
          tenant_id: "your-tenant-id"       # or TEAMS_TENANT_ID env var
          port: 3978                        # or TEAMS_PORT env var
"""

from __future__ import annotations

import asyncio
import html
import json
import logging
import os
from typing import Any, Dict, Optional
from urllib.parse import quote

# httpx is imported lazily — only the ``_write_summary_via_incoming_webhook``
# code path actually constructs an ``AsyncClient``. Top-level import here
# pulled in the entire httpx + httpcore stack (~37 ms, ~15 MB) on every
# process that triggered plugin discovery, even ones that never instantiate
# the Teams adapter. ``from __future__ import annotations`` above keeps the
# ``httpx.AsyncBaseTransport`` parameter annotation valid as a string at
# runtime; nothing in the codebase calls ``typing.get_type_hints()`` on
# this class so the annotation never has to resolve to a real symbol.

try:
    from aiohttp import web

    AIOHTTP_AVAILABLE = True
except ImportError:
    AIOHTTP_AVAILABLE = False
    web = None  # type: ignore[assignment]

try:
    from microsoft_teams.apps import App, ActivityContext
    from microsoft_teams.common.http.client import ClientOptions
    from microsoft_teams.api import MessageActivity, ConversationReference
    from microsoft_teams.api.activities.typing import TypingActivityInput
    from microsoft_teams.api.activities.invoke.adaptive_card import AdaptiveCardInvokeActivity
    from microsoft_teams.api.models.adaptive_card import (
        AdaptiveCardActionCardResponse,
        AdaptiveCardActionMessageResponse,
    )
    from microsoft_teams.api.models.invoke_response import InvokeResponse, AdaptiveCardInvokeResponse
    from microsoft_teams.apps.http.adapter import (
        HttpMethod,
        HttpRequest,
        HttpResponse,
        HttpRouteHandler,
    )
    from microsoft_teams.cards import AdaptiveCard, ExecuteAction, TextBlock

    TEAMS_SDK_AVAILABLE = True
except ImportError:
    TEAMS_SDK_AVAILABLE = False
    ClientOptions = None  # type: ignore[assignment,misc]
    App = None  # type: ignore[assignment,misc]
    ActivityContext = None  # type: ignore[assignment,misc]
    MessageActivity = None  # type: ignore[assignment,misc]
    ConversationReference = None  # type: ignore[assignment,misc]
    TypingActivityInput = None  # type: ignore[assignment,misc]
    AdaptiveCardInvokeActivity = None  # type: ignore[assignment,misc]
    AdaptiveCardActionCardResponse = None  # type: ignore[assignment,misc]
    AdaptiveCardActionMessageResponse = None  # type: ignore[assignment,misc]
    AdaptiveCardInvokeResponse = None  # type: ignore[assignment,misc,union-attr]
    InvokeResponse = None  # type: ignore[assignment,misc]
    HttpMethod = str  # type: ignore[assignment,misc]
    HttpRequest = None  # type: ignore[assignment,misc]
    HttpResponse = None  # type: ignore[assignment,misc]
    HttpRouteHandler = None  # type: ignore[assignment,misc]
    AdaptiveCard = None  # type: ignore[assignment,misc]
    ExecuteAction = None  # type: ignore[assignment,misc]
    TextBlock = None  # type: ignore[assignment,misc]

from gateway.config import Platform, PlatformConfig
from gateway.platforms.helpers import MessageDeduplicator
from gateway.platforms.base import (
    BasePlatformAdapter,
    MessageEvent,
    MessageType,
    SendResult,
    cache_image_from_url,
)

logger = logging.getLogger(__name__)

_DEFAULT_PORT = 3978
_WEBHOOK_PATH = "/api/messages"


def _parse_bool(value: Any, *, default: bool = False) -> bool:
    if isinstance(value, bool):
        return value
    if isinstance(value, str):
        normalized = value.strip().lower()
        if normalized in {"1", "true", "yes", "on"}:
            return True
        if normalized in {"0", "false", "no", "off"}:
            return False
    return default


def _coerce_port(value: Any, *, default: int = _DEFAULT_PORT) -> int:
    try:
        return int(value)
    except (TypeError, ValueError):
        return default


class _StaticAccessTokenProvider:
    """Minimal token-provider shim so outbound Graph delivery can reuse the shared client."""

    def __init__(self, access_token: str):
        self._access_token = str(access_token or "").strip()

    async def get_access_token(self, *, force_refresh: bool = False) -> str:
        del force_refresh
        if not self._access_token:
            raise ValueError("TEAMS_GRAPH_ACCESS_TOKEN is required for graph delivery mode.")
        return self._access_token

    def clear_cache(self) -> None:
        return None


class TeamsSummaryWriter:
    """Pipeline-facing Teams outbound delivery surface.

    This stays inside the existing Teams platform plugin so the meeting-pipeline
    PR can reuse one Teams integration surface instead of introducing a second
    adapter elsewhere in the gateway core.
    """

    def __init__(
        self,
        platform_config: PlatformConfig | None = None,
        *,
        graph_client: Any | None = None,
        transport: httpx.AsyncBaseTransport | None = None,
    ) -> None:
        self._platform_config = platform_config
        self._graph_client = graph_client
        self._transport = transport

    async def write_summary(
        self,
        payload: Any,
        config: dict[str, Any] | None,
        existing_record: Optional[dict[str, Any]] = None,
    ) -> dict[str, Any]:
        merged = self._resolve_delivery_config(config)
        if existing_record and not _parse_bool(merged.get("force_resend"), default=False):
            return dict(existing_record)

        mode = str(merged.get("delivery_mode") or merged.get("mode") or "").strip().lower()
        if not mode:
            if merged.get("incoming_webhook_url"):
                mode = "incoming_webhook"
            elif merged.get("chat_id") or (
                merged.get("team_id") and merged.get("channel_id")
            ):
                mode = "graph"
        if mode == "incoming_webhook":
            return await self._write_summary_via_incoming_webhook(payload, merged)
        if mode == "graph":
            return await self._write_summary_via_graph(payload, merged)
        raise ValueError(
            "Teams delivery_mode must be 'incoming_webhook' or 'graph'."
        )

    def _resolve_delivery_config(self, config: dict[str, Any] | None) -> dict[str, Any]:
        merged: dict[str, Any] = {}
        platform_cfg = self._platform_config
        if platform_cfg is not None:
            merged.update(dict(platform_cfg.extra or {}))
            if platform_cfg.token and "access_token" not in merged:
                merged["access_token"] = platform_cfg.token
            if platform_cfg.home_channel:
                merged.setdefault("channel_id", platform_cfg.home_channel.chat_id)
        merged.update(dict(config or {}))

        env_defaults = {
            "delivery_mode": os.getenv("TEAMS_DELIVERY_MODE", ""),
            "incoming_webhook_url": os.getenv("TEAMS_INCOMING_WEBHOOK_URL", ""),
            "access_token": os.getenv("TEAMS_GRAPH_ACCESS_TOKEN", ""),
            "team_id": os.getenv("TEAMS_TEAM_ID", ""),
            "channel_id": os.getenv("TEAMS_CHANNEL_ID", ""),
            "chat_id": os.getenv("TEAMS_CHAT_ID", ""),
        }
        for key, value in env_defaults.items():
            if value and not merged.get(key):
                merged[key] = value
        return merged

    async def _write_summary_via_incoming_webhook(
        self,
        payload: Any,
        config: dict[str, Any],
    ) -> dict[str, Any]:
        # Lazy import — see module-level note. The teams plugin loads on
        # every CLI invocation as a side effect of plugin discovery, but
        # 99% of those processes never reach this method.
        import httpx
        webhook_url = str(config.get("incoming_webhook_url") or "").strip()
        if not webhook_url:
            raise ValueError("TEAMS_INCOMING_WEBHOOK_URL is required for incoming_webhook mode.")
        body = {"text": self._render_summary_markdown(payload)}
        async with httpx.AsyncClient(timeout=20.0, transport=self._transport) as client:
            response = await client.post(webhook_url, json=body)
            response.raise_for_status()
        return {
            "delivery_mode": "incoming_webhook",
            "webhook_url": webhook_url,
            "status_code": response.status_code,
            "delivered": True,
        }

    async def _write_summary_via_graph(
        self,
        payload: Any,
        config: dict[str, Any],
    ) -> dict[str, Any]:
        graph_client = self._build_graph_client(config)
        chat_id = str(config.get("chat_id") or "").strip()
        if chat_id:
            path = f"/chats/{quote(chat_id, safe='')}/messages"
            response = await graph_client.post_json(
                path,
                json_body={"body": {"contentType": "html", "content": self._render_summary_html(payload)}},
            )
            return {
                "delivery_mode": "graph",
                "target_type": "chat",
                "chat_id": chat_id,
                "message_id": (response or {}).get("id"),
                "web_url": (response or {}).get("webUrl"),
            }

        team_id = str(config.get("team_id") or "").strip()
        channel_id = str(config.get("channel_id") or "").strip()
        if not team_id or not channel_id:
            raise ValueError(
                "Graph delivery mode requires chat_id, or both team_id and channel_id."
            )
        path = (
            f"/teams/{quote(team_id, safe='')}/channels/"
            f"{quote(channel_id, safe='')}/messages"
        )
        response = await graph_client.post_json(
            path,
            json_body={"body": {"contentType": "html", "content": self._render_summary_html(payload)}},
        )
        return {
            "delivery_mode": "graph",
            "target_type": "channel",
            "team_id": team_id,
            "channel_id": channel_id,
            "message_id": (response or {}).get("id"),
            "web_url": (response or {}).get("webUrl"),
        }

    def _build_graph_client(self, config: dict[str, Any]) -> Any:
        if self._graph_client is not None:
            return self._graph_client

        from tools.microsoft_graph_auth import MicrosoftGraphTokenProvider
        from tools.microsoft_graph_client import MicrosoftGraphClient

        access_token = str(config.get("access_token") or "").strip()
        if access_token:
            return MicrosoftGraphClient(
                _StaticAccessTokenProvider(access_token),
                transport=self._transport,
            )
        return MicrosoftGraphClient(
            MicrosoftGraphTokenProvider.from_env(),
            transport=self._transport,
        )

    def _render_summary_markdown(self, payload: Any) -> str:
        lines = [
            f"**{self._title(payload)}**",
            "",
            f"Summary: {self._text(getattr(payload, 'summary', None), 'No summary available.')}",
            "",
            "Key decisions:",
            *self._bullet_lines(getattr(payload, "key_decisions", None)),
            "",
            "Action items:",
            *self._bullet_lines(getattr(payload, "action_items", None)),
            "",
            "Risks:",
            *self._bullet_lines(getattr(payload, "risks", None)),
        ]
        return "\n".join(lines)

    def _render_summary_html(self, payload: Any) -> str:
        sections = [
            ("Summary", [self._text(getattr(payload, "summary", None), "No summary available.")]),
            ("Key decisions", list(getattr(payload, "key_decisions", None) or [])),
            ("Action items", list(getattr(payload, "action_items", None) or [])),
            ("Risks", list(getattr(payload, "risks", None) or [])),
        ]
        blocks = [f"<h2>{html.escape(self._title(payload))}</h2>"]
        for heading, items in sections:
            blocks.append(f"<h3>{html.escape(heading)}</h3>")
            if len(items) == 1 and heading == "Summary":
                blocks.append(f"<p>{html.escape(str(items[0]))}</p>")
                continue
            if items:
                rendered = "".join(f"<li>{html.escape(str(item))}</li>" for item in items if str(item).strip())
                blocks.append(rendered and f"<ul>{rendered}</ul>" or "<p>None</p>")
            else:
                blocks.append("<p>None</p>")
        return "".join(blocks)

    @staticmethod
    def _title(payload: Any) -> str:
        title = getattr(payload, "title", None)
        if title:
            return str(title)
        meeting_ref = getattr(payload, "meeting_ref", None)
        meeting_id = getattr(meeting_ref, "meeting_id", None) if meeting_ref else None
        return f"Meeting {meeting_id or 'summary'}"

    @staticmethod
    def _text(value: Any, default: str) -> str:
        text = str(value or "").strip()
        return text or default

    @classmethod
    def _bullet_lines(cls, values: Any) -> list[str]:
        items = [str(item).strip() for item in (values or []) if str(item).strip()]
        return [f"- {item}" for item in items] or ["- None"]


class _AiohttpBridgeAdapter:
    """HttpServerAdapter that bridges the Teams SDK into an aiohttp server.

    Without a custom adapter, ``App()`` unconditionally imports fastapi/uvicorn
    and allocates a ``FastAPI()`` instance.  This bridge captures the SDK's
    route registrations and wires them into our own aiohttp ``Application``.
    """

    def __init__(self, aiohttp_app: "web.Application"):
        self._aiohttp_app = aiohttp_app

    def register_route(self, method: "HttpMethod", path: str, handler: "HttpRouteHandler") -> None:
        """Register an SDK route handler as an aiohttp route."""

        async def _aiohttp_handler(request: "web.Request") -> "web.Response":
            body = await request.json()
            headers = dict(request.headers)
            result: "HttpResponse" = await handler(HttpRequest(body=body, headers=headers))
            status = result.get("status", 200)
            resp_body = result.get("body")
            if resp_body is not None:
                return web.Response(
                    status=status,
                    body=json.dumps(resp_body),
                    content_type="application/json",
                )
            return web.Response(status=status)

        self._aiohttp_app.router.add_route(method, path, _aiohttp_handler)

    def serve_static(self, path: str, directory: str) -> None:
        pass

    async def start(self, port: int) -> None:
        raise NotImplementedError("aiohttp server is managed by the adapter")

    async def stop(self) -> None:
        pass


def check_requirements() -> bool:
    """Return True when all Teams dependencies and credentials are present."""
    return TEAMS_SDK_AVAILABLE and AIOHTTP_AVAILABLE


def validate_config(config) -> bool:
    """Return True when the config has the minimum required credentials."""
    extra = getattr(config, "extra", {}) or {}
    client_id = os.getenv("TEAMS_CLIENT_ID") or extra.get("client_id", "")
    client_secret = os.getenv("TEAMS_CLIENT_SECRET") or extra.get("client_secret", "")
    tenant_id = os.getenv("TEAMS_TENANT_ID") or extra.get("tenant_id", "")
    return bool(client_id and client_secret and tenant_id)


def is_connected(config) -> bool:
    """Check whether Teams is configured (env or config.yaml)."""
    return validate_config(config)


def _env_enablement() -> dict | None:
    """Seed ``PlatformConfig.extra`` from env vars during gateway config load.

    Called by the platform registry's env-enablement hook BEFORE adapter
    construction, so ``gateway status`` and ``get_connected_platforms()``
    reflect env-only configuration without instantiating the Teams SDK.
    Returns ``None`` when Teams isn't minimally configured.

    The special ``home_channel`` key in the returned dict becomes a proper
    ``HomeChannel`` dataclass on the ``PlatformConfig`` via the core hook.
    """
    client_id = os.getenv("TEAMS_CLIENT_ID", "").strip()
    client_secret = os.getenv("TEAMS_CLIENT_SECRET", "").strip()
    tenant_id = os.getenv("TEAMS_TENANT_ID", "").strip()
    if not (client_id and client_secret and tenant_id):
        return None
    seed: dict = {
        "client_id": client_id,
        "client_secret": client_secret,
        "tenant_id": tenant_id,
    }
    port = os.getenv("TEAMS_PORT", "").strip()
    if port:
        try:
            seed["port"] = int(port)
        except ValueError:
            pass
    service_url = os.getenv("TEAMS_SERVICE_URL", "").strip()
    if service_url:
        seed["service_url"] = service_url
    home = os.getenv("TEAMS_HOME_CHANNEL", "").strip()
    if home:
        seed["home_channel"] = {
            "chat_id": home,
            "name": os.getenv("TEAMS_HOME_CHANNEL_NAME", "Home"),
        }
    return seed


# Bot Framework default service URL for the global Teams endpoint.  Some
# regional/government tenants need a different host (e.g.
# ``https://smba.infra.gov.teams.microsoft.us/``) which can be supplied via
# ``TEAMS_SERVICE_URL`` or ``extra['service_url']``.
_DEFAULT_TEAMS_SERVICE_URL = "https://smba.trafficmanager.net/teams/"

# Allowlist of Bot Framework service hosts that may receive a freshly
# minted bearer token.  Operator-supplied URLs are matched against this
# allowlist to block SSRF / token-exfiltration via a tampered env var.
_ALLOWED_TEAMS_SERVICE_HOSTS = frozenset({
    "smba.trafficmanager.net",
    "smba.infra.gov.teams.microsoft.us",
})

# Conservative pattern for Bot Framework conversation IDs.  Real values
# combine digits, colons, hyphens, dots, '@', and the ``thread.skype`` /
# ``thread.tacv2`` suffixes; reject anything outside this set so a hostile
# value cannot path-traverse out of ``/v3/conversations/<id>/activities``.
import re as _re_teams
_TEAMS_CONV_ID_RE = _re_teams.compile(r"^[A-Za-z0-9:@\-_.]+$")


def _validate_teams_service_url(raw: str) -> Optional[str]:
    """Return a normalized service URL or ``None`` if it is not allowed.

    Requires ``https://`` and a host in ``_ALLOWED_TEAMS_SERVICE_HOSTS``.
    The trailing slash is added if absent so callers can append
    ``v3/conversations/...`` without double slashes.
    """
    if not raw:
        return None
    try:
        from urllib.parse import urlparse

        parsed = urlparse(raw)
    except Exception:
        return None
    if parsed.scheme != "https":
        return None
    if parsed.hostname not in _ALLOWED_TEAMS_SERVICE_HOSTS:
        return None
    normalized = raw if raw.endswith("/") else raw + "/"
    return normalized


async def _standalone_send(
    pconfig,
    chat_id: str,
    message: str,
    *,
    thread_id: Optional[str] = None,
    media_files: Optional[list] = None,
    force_document: bool = False,
) -> Dict[str, Any]:
    """Acquire a Bot Framework bearer token and POST a single message activity.

    Used by ``tools/send_message_tool._send_via_adapter`` when the gateway
    runner is not in this process (e.g. ``hermes cron`` running as a
    separate process from ``hermes gateway``).  Without this hook,
    ``deliver=teams`` cron jobs fail with ``No live adapter for platform``.

    Configuration: requires ``TEAMS_CLIENT_ID``, ``TEAMS_CLIENT_SECRET``,
    ``TEAMS_TENANT_ID``, ``TEAMS_HOME_CHANNEL`` (the conversation ID), and
    optionally ``TEAMS_SERVICE_URL`` (Bot Framework service host; must be
    a known Bot Framework endpoint, see ``_ALLOWED_TEAMS_SERVICE_HOSTS``).

    Security: ``service_url`` is validated against an allowlist of known
    Bot Framework hosts to block SSRF / token-exfiltration via a tampered
    env var.  ``chat_id`` is validated to match the documented Bot
    Framework ID character set so it cannot escape the URL path.

    ``media_files`` and ``force_document`` are accepted for signature
    parity but not implemented for the standalone path; messages with
    attachments will send as text-only.  The live adapter handles
    attachments via the SDK.
    """
    extra = getattr(pconfig, "extra", {}) or {}
    client_id = os.getenv("TEAMS_CLIENT_ID") or extra.get("client_id", "")
    client_secret = os.getenv("TEAMS_CLIENT_SECRET") or extra.get("client_secret", "")
    tenant_id = os.getenv("TEAMS_TENANT_ID") or extra.get("tenant_id", "")
    if not (client_id and client_secret and tenant_id):
        return {"error": "Teams standalone send: TEAMS_CLIENT_ID, TEAMS_CLIENT_SECRET, and TEAMS_TENANT_ID are all required"}

    raw_service_url = (
        os.getenv("TEAMS_SERVICE_URL")
        or extra.get("service_url", "")
        or _DEFAULT_TEAMS_SERVICE_URL
    )
    service_url = _validate_teams_service_url(raw_service_url)
    if service_url is None:
        return {"error": (
            f"Teams standalone send: TEAMS_SERVICE_URL host is not on the "
            f"Bot Framework allowlist; expected one of "
            f"{sorted(_ALLOWED_TEAMS_SERVICE_HOSTS)}"
        )}

    # Bot Framework conversation IDs are restricted to a known character
    # set; anything else means a tampered chat_id trying to break out of
    # the URL path.
    if not chat_id:
        return {"error": "Teams standalone send: chat_id (conversation ID) is required"}
    if not _TEAMS_CONV_ID_RE.match(chat_id):
        return {"error": "Teams standalone send: chat_id contains characters outside the Bot Framework conversation ID set"}
    if not _TEAMS_CONV_ID_RE.match(tenant_id):
        return {"error": "Teams standalone send: TEAMS_TENANT_ID contains characters outside the expected set"}

    token_url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token"
    activities_url = f"{service_url}v3/conversations/{chat_id}/activities"

    if not AIOHTTP_AVAILABLE:
        return {"error": "Teams standalone send: aiohttp not installed"}

    try:
        import aiohttp as _aiohttp

        # Per-request timeouts so a slow STS endpoint cannot starve the
        # subsequent activity POST of its budget.
        per_request_timeout = _aiohttp.ClientTimeout(total=15.0)
        async with _aiohttp.ClientSession(trust_env=True) as session:
            async with session.post(
                token_url,
                data={
                    "grant_type": "client_credentials",
                    "client_id": client_id,
                    "client_secret": client_secret,
                    "scope": "https://api.botframework.com/.default",
                },
                headers={"Content-Type": "application/x-www-form-urlencoded"},
                timeout=per_request_timeout,
            ) as token_resp:
                if token_resp.status >= 400:
                    body = await token_resp.text()
                    return {"error": f"Teams standalone send: token request failed ({token_resp.status}): {body[:300]}"}
                token_payload = await token_resp.json()
            access_token = token_payload.get("access_token")
            if not access_token:
                return {"error": "Teams standalone send: token response missing access_token"}

            activity = {
                "type": "message",
                "text": message,
                "textFormat": "markdown",
            }
            async with session.post(
                activities_url,
                json=activity,
                headers={
                    "Authorization": f"Bearer {access_token}",
                    "Content-Type": "application/json",
                },
                timeout=per_request_timeout,
            ) as send_resp:
                if send_resp.status >= 400:
                    body = await send_resp.text()
                    return {"error": f"Teams standalone send: activity post failed ({send_resp.status}): {body[:300]}"}
                send_payload = await send_resp.json()
        return {
            "success": True,
            "message_id": send_payload.get("id"),
        }
    except asyncio.CancelledError:
        raise
    except Exception as e:
        logger.debug("Teams standalone send raised", exc_info=True)
        return {"error": f"Teams standalone send failed: {e}"}


# Keep the old name as an alias so existing test imports don't break.
check_teams_requirements = check_requirements


class TeamsAdapter(BasePlatformAdapter):
    """Microsoft Teams adapter using the microsoft-teams-apps SDK."""

    MAX_MESSAGE_LENGTH = 28000  # Teams text message limit (~28 KB)

    def __init__(self, config: PlatformConfig):
        super().__init__(config, Platform("teams"))
        extra = config.extra or {}
        self._client_id = extra.get("client_id") or os.getenv("TEAMS_CLIENT_ID", "")
        self._client_secret = extra.get("client_secret") or os.getenv("TEAMS_CLIENT_SECRET", "")
        self._tenant_id = extra.get("tenant_id") or os.getenv("TEAMS_TENANT_ID", "")
        self._port = _coerce_port(
            extra.get("port") or os.getenv("TEAMS_PORT", str(_DEFAULT_PORT))
        )
        self._app: Optional["App"] = None
        self._runner: Optional["web.AppRunner"] = None
        self._dedup = MessageDeduplicator(max_size=1000)
        # Maps chat_id → ConversationReference captured from incoming messages.
        # Used to send cards with the correct conversation type (personal/group/channel).
        self._conv_refs: Dict[str, Any] = {}

    async def connect(self) -> bool:
        if not TEAMS_SDK_AVAILABLE:
            self._set_fatal_error(
                "MISSING_SDK",
                "microsoft-teams-apps not installed. Run: pip install microsoft-teams-apps",
                retryable=False,
            )
            return False

        if not AIOHTTP_AVAILABLE:
            self._set_fatal_error(
                "MISSING_SDK",
                "aiohttp not installed. Run: pip install aiohttp",
                retryable=False,
            )
            return False

        if not self._client_id or not self._client_secret or not self._tenant_id:
            self._set_fatal_error(
                "MISSING_CREDENTIALS",
                "TEAMS_CLIENT_ID, TEAMS_CLIENT_SECRET, and TEAMS_TENANT_ID are all required",
                retryable=False,
            )
            return False

        try:
            # Set up aiohttp app first — the bridge adapter wires SDK routes into it
            aiohttp_app = web.Application()
            aiohttp_app.router.add_get("/health", lambda _: web.Response(text="ok"))

            self._app = App(
                client_id=self._client_id,
                client_secret=self._client_secret,
                tenant_id=self._tenant_id,
                http_server_adapter=_AiohttpBridgeAdapter(aiohttp_app),
                client=ClientOptions(headers={"User-Agent": "Hermes"}),
            )

            # Register message handler before initialize()
            @self._app.on_message
            async def _handle_message(ctx: ActivityContext[MessageActivity]):
                await self._on_message(ctx)

            @self._app.on_card_action
            async def _handle_card_action(
                ctx: ActivityContext[AdaptiveCardInvokeActivity],
            ) -> InvokeResponse[AdaptiveCardActionMessageResponse]:
                return await self._on_card_action(ctx)

            # initialize() calls register_route() on the bridge, which adds
            # POST /api/messages to aiohttp_app automatically
            await self._app.initialize()

            self._runner = web.AppRunner(aiohttp_app)
            await self._runner.setup()
            site = web.TCPSite(self._runner, "0.0.0.0", self._port)
            await site.start()

            self._running = True
            self._mark_connected()
            logger.info(
                "[teams] Webhook server listening on 0.0.0.0:%d%s",
                self._port,
                _WEBHOOK_PATH,
            )
            return True

        except Exception as e:
            self._set_fatal_error(
                "CONNECT_FAILED",
                f"Teams connection failed: {e}",
                retryable=True,
            )
            logger.error("[teams] Failed to connect: %s", e)
            return False

    async def disconnect(self) -> None:
        self._running = False
        if self._runner:
            await self._runner.cleanup()
            self._runner = None
        self._app = None
        self._mark_disconnected()
        logger.info("[teams] Disconnected")

    async def _on_message(self, ctx: ActivityContext[MessageActivity]) -> None:
        """Process an incoming Teams message and dispatch to the gateway."""
        activity = ctx.activity

        # Self-message filter
        bot_id = self._app.id if self._app else None
        if bot_id and getattr(activity.from_, "id", None) == bot_id:
            return

        # Deduplication
        msg_id = getattr(activity, "id", None)
        if msg_id and self._dedup.is_duplicate(msg_id):
            return

        # Cache the conversation reference for proactive sends (approval cards, etc.)
        conv_id = getattr(activity.conversation, "id", None)
        if conv_id:
            self._conv_refs[conv_id] = ctx.conversation_ref

        # Extract text — strip bot @mentions
        text = ""
        if hasattr(activity, "text") and activity.text:
            text = activity.text
        # Strip <at>BotName</at> HTML tags that Teams prepends for @mentions
        if "<at>" in text:
            import re
            text = re.sub(r"<at>[^<]*</at>\s*", "", text).strip()

        # Determine chat type from conversation
        conv = activity.conversation
        conv_type = getattr(conv, "conversation_type", None) or ""
        if conv_type == "personal":
            chat_type = "dm"
        elif conv_type == "groupChat":
            chat_type = "group"
        elif conv_type == "channel":
            chat_type = "channel"
        else:
            chat_type = "dm"

        # Build source
        from_account = activity.from_
        user_id = getattr(from_account, "aad_object_id", None) or getattr(from_account, "id", "")
        user_name = getattr(from_account, "name", None) or ""

        source = self.build_source(
            chat_id=conv.id,
            chat_name=getattr(conv, "name", None) or "",
            chat_type=chat_type,
            user_id=str(user_id),
            user_name=user_name,
            guild_id=getattr(conv, "tenant_id", None) or self._tenant_id,
        )

        # Handle image attachments
        media_urls = []
        media_types = []
        for att in getattr(activity, "attachments", None) or []:
            content_url = getattr(att, "content_url", None)
            content_type = getattr(att, "content_type", None) or ""
            if content_url and content_type.startswith("image/"):
                try:
                    cached = await cache_image_from_url(content_url)
                    if cached:
                        media_urls.append(cached)
                        media_types.append(content_type)
                except Exception as e:
                    logger.warning("[teams] Failed to cache image attachment: %s", e)

        msg_type = MessageType.PHOTO if media_urls else MessageType.TEXT

        event = MessageEvent(
            text=text,
            source=source,
            message_type=msg_type,
            media_urls=media_urls,
            media_types=media_types,
            message_id=msg_id,
        )
        await self.handle_message(event)

    async def _send_card(self, chat_id: str, card: "AdaptiveCard") -> "Any":
        """Send an AdaptiveCard, using a stored ConversationReference when available."""
        from microsoft_teams.api import MessageActivityInput

        conv_ref = self._conv_refs.get(chat_id)
        if conv_ref and self._app:
            activity = MessageActivityInput().add_card(card)
            return await self._app.activity_sender.send(activity, conv_ref)
        elif self._app:
            return await self._app.send(chat_id, card)
        return None

    async def _on_card_action(
        self, ctx: "ActivityContext[AdaptiveCardInvokeActivity]"
    ) -> "InvokeResponse[AdaptiveCardActionMessageResponse]":
        """Handle an Adaptive Card Action.Execute button click."""
        from tools.approval import resolve_gateway_approval, has_blocking_approval

        action = ctx.activity.value.action
        data = action.data or {}
        hermes_action = data.get("hermes_action", "")
        session_key = data.get("session_key", "")

        if not hermes_action or not session_key:
            return InvokeResponse(
                status=200,
                body=AdaptiveCardActionMessageResponse(value="Unknown action."),
            )

        # Only authorized users may click approval buttons.
        # Default-deny: require either TEAMS_ALLOWED_USERS or an explicit
        # TEAMS_ALLOW_ALL_USERS=true opt-in. Without one of these set, the
        # bot silently treated every clicker as authorized — meaning any
        # Teams user who could message the bot could approve dangerous commands.
        allowed_csv = os.getenv("TEAMS_ALLOWED_USERS", "").strip()
        allow_all = os.getenv("TEAMS_ALLOW_ALL_USERS", "").strip().lower() in {"1", "true", "yes"}

        if not allow_all:
            if not allowed_csv:
                logger.warning(
                    "[teams] card action rejected: TEAMS_ALLOWED_USERS not configured "
                    "and TEAMS_ALLOW_ALL_USERS not set — default deny"
                )
                return InvokeResponse(
                    status=200,
                    body=AdaptiveCardActionMessageResponse(
                        value="⛔ Approval buttons require TEAMS_ALLOWED_USERS to be configured."
                    ),
                )
            from_account = ctx.activity.from_
            clicker_id = getattr(from_account, "aad_object_id", None) or getattr(from_account, "id", "")
            allowed_ids = {uid.strip() for uid in allowed_csv.split(",") if uid.strip()}
            if "*" not in allowed_ids and clicker_id not in allowed_ids:
                logger.warning("[teams] Unauthorized card action by %s — ignoring", clicker_id)
                return InvokeResponse(
                    status=200,
                    body=AdaptiveCardActionMessageResponse(value="⛔ Not authorized."),
                )

        choice_map = {
            "approve_once": "once",
            "approve_session": "session",
            "approve_always": "always",
            "deny": "deny",
        }
        choice = choice_map.get(hermes_action)
        if not choice:
            return InvokeResponse(
                status=200,
                body=AdaptiveCardActionMessageResponse(value="Unknown action."),
            )

        if not has_blocking_approval(session_key):
            return InvokeResponse(
                status=200,
                body=AdaptiveCardActionCardResponse(
                    value=AdaptiveCard()
                    .with_version("1.4")
                    .with_body([TextBlock(text="⚠️ Approval already resolved or expired.", wrap=True)])
                ),
            )

        resolve_gateway_approval(session_key, choice)

        label_map = {
            "once": "✅ Allowed (once)",
            "session": "✅ Allowed (session)",
            "always": "✅ Always allowed",
            "deny": "❌ Denied",
        }
        cmd = data.get("cmd", "")
        desc = data.get("desc", "")
        body = []
        if cmd:
            body.append(TextBlock(text="⚠️ Command Approval Required", wrap=True, weight="Bolder"))
            body.append(TextBlock(text=f"```\n{cmd}\n```", wrap=True))
        if desc:
            body.append(TextBlock(text=f"Reason: {desc}", wrap=True, isSubtle=True))
        body.append(TextBlock(text=label_map[choice], wrap=True, weight="Bolder"))

        return InvokeResponse(
            status=200,
            body=AdaptiveCardActionCardResponse(
                value=AdaptiveCard().with_version("1.4").with_body(body)
            ),
        )

    async def send_exec_approval(
        self,
        chat_id: str,
        command: str,
        session_key: str,
        description: str = "dangerous command",
        metadata: Optional[Dict[str, Any]] = None,
    ) -> SendResult:
        """Send an Adaptive Card approval prompt with Allow/Deny buttons."""
        if not self._app:
            return SendResult(success=False, error="Teams app not initialized")

        cmd_preview = command[:2000] + "..." if len(command) > 2000 else command
        # Truncated for button data payload — just enough to reconstruct the card body.
        btn_data_base = {
            "session_key": session_key,
            "cmd": command[:200] + "..." if len(command) > 200 else command,
            "desc": description,
        }

        card = (
            AdaptiveCard()
            .with_version("1.4")
            .with_body([
                TextBlock(text="⚠️ Command Approval Required", wrap=True, weight="Bolder"),
                TextBlock(text=f"```\n{cmd_preview}\n```", wrap=True),
                TextBlock(text=f"Reason: {description}", wrap=True, isSubtle=True),
            ])
            .with_actions([
                ExecuteAction(
                    title="Allow Once",
                    verb="hermes_approve",
                    data={**btn_data_base, "hermes_action": "approve_once"},
                    style="positive",
                ),
                ExecuteAction(
                    title="Allow Session",
                    verb="hermes_approve",
                    data={**btn_data_base, "hermes_action": "approve_session"},
                ),
                ExecuteAction(
                    title="Always Allow",
                    verb="hermes_approve",
                    data={**btn_data_base, "hermes_action": "approve_always"},
                ),
                ExecuteAction(
                    title="Deny",
                    verb="hermes_approve",
                    data={**btn_data_base, "hermes_action": "deny"},
                    style="destructive",
                ),
            ])
        )

        try:
            result = await self._send_card(chat_id, card)
            message_id = getattr(result, "id", None) if result else None
            return SendResult(success=True, message_id=message_id)
        except Exception as e:
            logger.error("[teams] send_exec_approval failed: %s", e, exc_info=True)
            return SendResult(success=False, error=str(e), retryable=True)

    async def send(
        self,
        chat_id: str,
        content: str,
        reply_to: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> SendResult:
        if not self._app:
            return SendResult(success=False, error="Teams app not initialized")

        formatted = self.format_message(content)
        chunks = self.truncate_message(formatted)
        last_message_id = None

        for chunk in chunks:
            try:
                if reply_to and reply_to.isdigit() and reply_to != "0":
                    try:
                        result = await self._app.reply(chat_id, reply_to, chunk)
                    except Exception as reply_err:
                        # Group chats 400 on threaded sends; the Teams SDK
                        # doesn't expose typed HTTP errors, so fall back on
                        # any exception and log for diagnostics.
                        logger.debug(
                            "Teams reply() failed, falling back to flat send: %s",
                            reply_err,
                        )
                        result = await self._app.send(chat_id, chunk)
                else:
                    result = await self._app.send(chat_id, chunk)
                last_message_id = getattr(result, "id", None)
            except Exception as e:
                return SendResult(success=False, error=str(e), retryable=True)

        return SendResult(success=True, message_id=last_message_id)

    async def send_typing(self, chat_id: str, metadata: Optional[Dict[str, Any]] = None) -> None:
        if not self._app:
            return
        try:
            await self._app.send(chat_id, TypingActivityInput())
        except Exception:
            pass

    async def send_image(
        self,
        chat_id: str,
        image_url: str,
        caption: Optional[str] = None,
        reply_to: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> SendResult:
        if not self._app:
            return SendResult(success=False, error="Teams app not initialized")

        try:
            import base64
            import mimetypes
            from microsoft_teams.api import Attachment, MessageActivityInput

            if image_url.startswith("http://") or image_url.startswith("https://"):
                content_url = image_url
                mime_type = "image/png"
            else:
                # Local path — encode as base64 data URI
                path = image_url.removeprefix("file://")
                mime_type = mimetypes.guess_type(path)[0] or "image/png"
                with open(path, "rb") as f:
                    content_url = f"data:{mime_type};base64,{base64.b64encode(f.read()).decode()}"

            attachment = Attachment(content_type=mime_type, content_url=content_url)
            activity = MessageActivityInput().add_attachments(attachment)
            if caption:
                activity = activity.add_text(caption)

            conv_ref = self._conv_refs.get(chat_id)
            if conv_ref:
                result = await self._app.activity_sender.send(activity, conv_ref)
            else:
                result = await self._app.send(chat_id, activity)

            return SendResult(success=True, message_id=getattr(result, "id", None))
        except Exception as e:
            logger.error("[teams] send_image failed: %s", e, exc_info=True)
            return SendResult(success=False, error=str(e), retryable=True)

    async def send_image_file(
        self,
        chat_id: str,
        image_path: str,
        caption: Optional[str] = None,
        reply_to: Optional[str] = None,
        **kwargs,
    ) -> SendResult:
        return await self.send_image(
            chat_id=chat_id,
            image_url=image_path,
            caption=caption,
            reply_to=reply_to,
        )

    async def get_chat_info(self, chat_id: str) -> dict:
        return {"name": chat_id, "type": "unknown", "chat_id": chat_id}


# ── Interactive setup ─────────────────────────────────────────────────────────

def interactive_setup() -> None:
    """Guide the user through Teams setup using the Teams CLI."""
    from hermes_cli.config import (
        get_env_value,
        save_env_value,
    )
    from hermes_cli.cli_output import (
        prompt,
        prompt_yes_no,
        print_info,
        print_success,
        print_warning,
    )

    existing_id = get_env_value("TEAMS_CLIENT_ID")
    if existing_id:
        print_info(f"Teams: already configured (app ID: {existing_id})")
        if not prompt_yes_no("Reconfigure Teams?", False):
            return

    print_info("You'll need the Teams CLI. If you haven't already:")
    print_info("  npm install -g @microsoft/teams.cli@preview")
    print_info("  teams login")
    print()
    print_info("Then expose port 3978 publicly (devtunnel / ngrok / cloudflared),")
    print_info("and create your bot:")
    print_info("  teams app create --name \"Hermes\" --endpoint \"https://<tunnel>/api/messages\"")
    print()
    print_info("The CLI will print CLIENT_ID, CLIENT_SECRET, and TENANT_ID. Paste them below.")
    print()

    client_id = prompt("Client ID", default=existing_id or "")
    if not client_id:
        print_warning("Client ID is required — skipping Teams setup")
        return
    save_env_value("TEAMS_CLIENT_ID", client_id.strip())

    client_secret = prompt("Client secret", default=get_env_value("TEAMS_CLIENT_SECRET") or "", password=True)
    if not client_secret:
        print_warning("Client secret is required — skipping Teams setup")
        return
    save_env_value("TEAMS_CLIENT_SECRET", client_secret.strip())

    tenant_id = prompt("Tenant ID", default=get_env_value("TEAMS_TENANT_ID") or "")
    if not tenant_id:
        print_warning("Tenant ID is required — skipping Teams setup")
        return
    save_env_value("TEAMS_TENANT_ID", tenant_id.strip())

    print()
    print_info("To find your AAD object ID for the allowlist: teams status --verbose")
    if prompt_yes_no("Restrict access to specific users? (recommended)", True):
        allowed = prompt(
            "Allowed AAD object IDs (comma-separated)",
            default=get_env_value("TEAMS_ALLOWED_USERS") or "",
        )
        if allowed:
            save_env_value("TEAMS_ALLOWED_USERS", allowed.replace(" ", ""))
            print_success("Allowlist configured")
        else:
            save_env_value("TEAMS_ALLOWED_USERS", "")
    else:
        save_env_value("TEAMS_ALLOW_ALL_USERS", "true")
        print_warning("⚠️  Open access — anyone who can message the bot can command it.")

    print()
    print_success("Teams configuration saved to ~/.hermes/.env")
    print_info("Install the app in Teams:  teams app install --id <teamsAppId>")
    print_info("Restart the gateway:       hermes gateway restart")


# ── Plugin entry point ────────────────────────────────────────────────────────

def register(ctx) -> None:
    """Plugin entry point — called by the Hermes plugin system."""
    ctx.register_platform(
        name="teams",
        label="Microsoft Teams",
        adapter_factory=lambda cfg: TeamsAdapter(cfg),
        check_fn=check_requirements,
        validate_config=validate_config,
        is_connected=is_connected,
        required_env=["TEAMS_CLIENT_ID", "TEAMS_CLIENT_SECRET", "TEAMS_TENANT_ID"],
        install_hint="pip install microsoft-teams-apps aiohttp",
        setup_fn=interactive_setup,
        # Env-driven auto-configuration — seeds PlatformConfig.extra with
        # client_id/secret/tenant + port + home_channel so env-only setups
        # show up in gateway status without instantiating the Teams SDK.
        env_enablement_fn=_env_enablement,
        # Cron home-channel delivery support.  Lets deliver=teams cron
        # jobs route to the configured Teams chat/channel without editing
        # cron/scheduler.py's hardcoded sets.
        cron_deliver_env_var="TEAMS_HOME_CHANNEL",
        # Out-of-process cron delivery via Bot Framework REST.  Without
        # this hook, deliver=teams cron jobs fail with "No live adapter"
        # when cron runs separately from the gateway.
        standalone_sender_fn=_standalone_send,
        # Auth env vars for _is_user_authorized() integration
        allowed_users_env="TEAMS_ALLOWED_USERS",
        allow_all_env="TEAMS_ALLOW_ALL_USERS",
        # Teams supports up to ~28 KB per message
        max_message_length=28000,
        # Display
        emoji="💼",
        allow_update_command=True,
        # LLM guidance
        platform_hint=(
            "You are chatting via Microsoft Teams. Teams renders a subset of "
            "markdown — bold (**text**), italic (*text*), and inline code "
            "(`code`) work, but complex tables or raw HTML do not. Keep "
            "responses clear and professional."
        ),
    )
