by 96bcaa76-0f27-45e7-9aa3-65c535e53a05

Home Assistant Auth

I have recently fallen in love with Authelia, and with using it together with LDAP as my sole source of authentication across all my apps. Getting AudioBookshelf (using OIDC) and Jellyfin (using LDAP directly) working was not that difficult.

However, Home Assistant seemed to be a bit more difficult. I found this post, which made me think it would be a complete dead end. I then found hass-auth-header, which seemed to work well, except that it does not seem to work with the mobile clients. I then found homeassistant-auth-authelia, which was mostly working, although it would not find my existing users and would always create a new user each time. Digging in some more, it looks like the command_line auth feature of Home Assistant was broken, so I decided to try and fix that.

This was difficult because, like many people, I only run Home Assistant in a Docker container. Changes to the underlying Python are hard to test there: any change made inside the image isn't persisted, and Home Assistant needs to be restarted after each change. I worked around that with these volume mounts:

      volumes = [
        "home-assistant:/config"
        "/home/mog/code/command_line.py:/usr/src/homeassistant/homeassistant/auth/providers/command_line.py"
        "/home/mog/code/cache:/usr/src/homeassistant/homeassistant/auth/providers/__pycache__"
      ];

These mounts allowed me to make and test changes, although I still needed to restart the system and clear out the cache each time. After a few hours I had my PR ready on GitHub.
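Clearing the mounted cache between restarts can be scripted. The following is a minimal sketch, assuming the cached bytecode files follow CPython's usual `module.<tag>.pyc` naming; `clear_stale_bytecode` is a hypothetical helper name, and the directory passed in would be the host side of the cache mount (`/home/mog/code/cache` in my case).

```python
from pathlib import Path


def clear_stale_bytecode(cache_dir: str, module: str = "command_line") -> list:
    """Delete cached .pyc files for one module and return the removed file names."""
    removed = []
    # CPython names cache files like command_line.cpython-311.pyc
    for pyc in sorted(Path(cache_dir).glob(f"{module}.*.pyc")):
        pyc.unlink()
        removed.append(pyc.name)
    return removed
```

Only the files for the named module are removed, so the cached bytecode of the other auth providers stays in place.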

So my working system now looks like this.

configuration.yaml

homeassistant:
  allowlist_external_dirs:
    - "/tmp"
  media_dirs:
    media: /config/media
  auth_providers:
    - type: command_line
      command: /config/bin/auth_authelia.sh
      args:
        ["https://auth.rldn.net", "https://hass.rldn.net", "hass", "hass_admin" ]
      meta: true
    - type: homeassistant

/config/bin/auth_authelia.sh

#! /usr/bin/env bash

## auth_authelia.sh
## Authenticate a Home Assistant user against an authelia instance

## Copyright 2023 Christian Baer
## http://github.com/chrisb86/

## Permission is hereby granted, free of charge, to any person obtaining
## a copy of this software and associated documentation files (the
## "Software"), to deal in the Software without restriction, including
## without limitation the rights to use, copy, modify, merge, publish,
## distribute, sublicense, and/or sell copies of the Software, and to
## permit persons to whom the Software is furnished to do so, subject to
## the following conditions:

## The above copyright notice and this permission notice shall be
## included in all copies or substantial portions of the Software.

## THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
## EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
## MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
## NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
## LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
## OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
## WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

## Inspired by https://kevo.io/posts/2023-01-29-authelia-home-assistant-auth/ by Kevin O'Connor

## This script expects 3 command line parameters. You can specify a fourth one.
## Usage: auth_authelia.sh AUTHELIA_DOMAIN HOMEASSISTANT_DOMAIN AUTHELIA_HOMEASSISTANT_GROUP [AUTHELIA_HOMEASSISTANT_GROUP_ADMIN]

## AUTHELIA_DOMAIN: Domain of your authelia instance 
## For example:
##  - https://sso.example.com
##  - https://example.com/auth
##  - http://example.com:8443

## HOMEASSISTANT_DOMAIN: Domain of your home assistant instance as configured in authelia
## For example:
##  - https://ha.example.com
##  - http://homeassistant.example.com:8123

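## AUTHELIA_HOMEASSISTANT_GROUP: The authelia group name for users that are allowed to access home assistant.
## As written, the script rejects every login when this parameter is empty.
## For example:
##  - homeassistant_users
##  - home_automation

## AUTHELIA_HOMEASSISTANT_GROUP_ADMIN (optional): The authelia group name for users that become home assistant administrators
## For example:
##  - hass_admin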

## The variables ${username} and ${password} will be set to the environment by home assistant


## Populate variables from command line
AUTHELIA_DOMAIN="${1}"
HOME_ASSISTANT_DOMAIN="${2}"
AUTHELIA_HOME_ASSISTANT_GROUP="${3}"
AUTHELIA_HOME_ASSISTANT_GROUP_ADMIN="${4}"



## Usernames should be validated using a regular expression to be of
## a known format. Special characters will be escaped anyway, but it is
## generally not recommended to allow more than necessary.
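## Inside a bracket expression "|" is a literal character, not a separator,
## so list the allowed characters directly and put "-" last.
USERNAME_PATTERN='^[a-zA-Z0-9_.-]+$'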

## Temporary file path for storing authelia headers
TMP_FILE_NAME="./tmp_curl_${username}_$(date +%s)"

## Log messages to stderr.
log() {
  echo "$1" >&2
}

err=0
group_permissions=false

## Check username and password are present and not malformed.
if [ -z "$username" ] || [ -z "$password" ]; then
  log "Need username and password environment variables."
  err=1
elif [ ! -z "$USERNAME_PATTERN" ]; then
  username_match=$(echo "$username" | sed -r "s/$USERNAME_PATTERN/x/")
  if [ "$username_match" != "x" ]; then
    log "Username '$username' has an invalid format."
    err=1
  fi
fi

[ $err -ne 0 ] && exit 2

## Authenticate with authelia and dump headers to temporary file
curl --silent \
  --request GET \
  --header "X-Original-URL: ${HOME_ASSISTANT_DOMAIN}" \
  --basic --user "${username}:${password}" \
  -D "${TMP_FILE_NAME}" \
  "${AUTHELIA_DOMAIN}/api/verify?auth=basic"

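## Default Home Assistant group; raised to system-admin for admin group members
maybe_admin="system-users"
## Extract user name and groups from temporary file.
## Header names are matched case-insensitively and the trailing CR is stripped.
homeassistant_name=$(grep -i '^remote-name:' "${TMP_FILE_NAME}" | cut -d ' ' -f 2- | tr -d '\r')
homeassistant_user=$(grep -i '^remote-user:' "${TMP_FILE_NAME}" | cut -d ' ' -f 2- | tr -d '\r')
homeassistant_groups=$(grep -i '^remote-groups:' "${TMP_FILE_NAME}" | cut -d ' ' -f 2- | tr -d '\r')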
 
## Delete temporary file
rm "${TMP_FILE_NAME}"

## Check if user name is set. Otherwise exit because we're not authenticated
if [ -z "${homeassistant_user}" ]; then
    log "Could not authenticate with server."
    exit 3
else
  ## Check if home assistant group is specified
  if [ -n "${AUTHELIA_HOME_ASSISTANT_GROUP}" ]; then
    ## Members of the user group may log in
    if [[ $homeassistant_groups == *"${AUTHELIA_HOME_ASSISTANT_GROUP}"* ]]; then
      group_permissions=true
    fi
    ## Members of the admin group may log in and become administrators.
    ## Test the admin variable here: an empty pattern would match every user.
    if [ -n "${AUTHELIA_HOME_ASSISTANT_GROUP_ADMIN}" ]; then
      if [[ $homeassistant_groups == *"${AUTHELIA_HOME_ASSISTANT_GROUP_ADMIN}"* ]]; then
        group_permissions=true
        maybe_admin="system-admin"
      fi
    fi
  fi

  ## If group permissions are granted, echo the user name as expected by home assistant
  if [ "${group_permissions}" == true ]; then
     echo "name = ${homeassistant_user}"
     echo "fullname = ${homeassistant_name}"
     echo "group = ${maybe_admin}"
  else
    ## Otherwise exit
    log "User has no permissions to access Home Assistant. Check group membership."
    exit 4
  fi
fi
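Because the command_line provider only hands the script `username` and `password` as environment variables, then reads the exit code and the `key = value` lines on stdout, the script can be tried without restarting Home Assistant. Below is a minimal sketch of such a harness. `run_auth_command` and `parse_meta` are hypothetical helper names, not part of Home Assistant; the parsing simply mirrors the loop in command_line.py.

```python
import subprocess

# Keys the provider accepts from the command's stdout.
ALLOWED_META_KEYS = ("name", "fullname", "group", "local_only")


def run_auth_command(command, args, username, password):
    """Run the command with credentials in the environment; return (exit code, stdout)."""
    result = subprocess.run(
        [command, *args],
        env={"username": username, "password": password},
        stdout=subprocess.PIPE,
        check=False,  # a non-zero exit code means "login rejected", not a crash
    )
    return result.returncode, result.stdout


def parse_meta(stdout):
    """Turn 'key = value' lines into a dict, ignoring comments and unknown keys."""
    meta = {}
    for raw_line in stdout.splitlines():
        try:
            line = raw_line.decode().lstrip()
        except ValueError:
            continue  # skip lines that are not valid text
        if line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        key, value = key.strip(), value.strip()
        if key in ALLOWED_META_KEYS:
            meta[key] = value
    return meta
```

Calling `run_auth_command("/config/bin/auth_authelia.sh", [...], user, password)` with the same four arguments as in configuration.yaml should return exit code 0 plus the three metadata lines for a permitted user, and one of the script's non-zero codes (2, 3 or 4) otherwise.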

/usr/src/homeassistant/homeassistant/auth/providers/command_line.py

"""Auth provider that validates credentials via an external command."""
from __future__ import annotations

import asyncio
from collections.abc import Mapping
import logging
import os
from typing import Any, cast

import voluptuous as vol

from homeassistant.components import person
from homeassistant.const import CONF_COMMAND
from homeassistant.core import async_get_hass, callback
from homeassistant.data_entry_flow import FlowResult
from homeassistant.exceptions import HomeAssistantError

from ..models import Credentials, UserMeta
from . import AUTH_PROVIDER_SCHEMA, AUTH_PROVIDERS, AuthProvider, LoginFlow

CONF_ARGS = "args"
CONF_META = "meta"

CONFIG_SCHEMA = AUTH_PROVIDER_SCHEMA.extend(
    {
        vol.Required(CONF_COMMAND): vol.All(
            str, os.path.normpath, msg="must be an absolute path"
        ),
        vol.Optional(CONF_ARGS, default=None): vol.Any(vol.DefaultTo(list), [str]),
        vol.Optional(CONF_META, default=False): bool,
    },
    extra=vol.PREVENT_EXTRA,
)

_LOGGER = logging.getLogger(__name__)


class InvalidAuthError(HomeAssistantError):
    """Raised when authentication with given credentials fails."""


@AUTH_PROVIDERS.register("command_line")
class CommandLineAuthProvider(AuthProvider):
    """Auth provider validating credentials by calling a command."""

    DEFAULT_TITLE = "Command Line Authentication"

    # which keys to accept from a program's stdout
    ALLOWED_META_KEYS = (
        "name",
        "fullname",
        "group",
        "local_only",
    )

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Extend parent's __init__.

        Adds self._user_meta dictionary to hold the user-specific
        attributes provided by external programs.
        """
        super().__init__(*args, **kwargs)
        self._user_meta: dict[str, dict[str, Any]] = {}

    async def async_login_flow(self, context: dict[str, Any] | None) -> LoginFlow:
        """Return a flow to login."""
        return CommandLineLoginFlow(self)

    async def async_validate_login(self, username: str, password: str) -> None:
        """Validate a username and password."""
        env = {"username": username, "password": password}
        try:
            process = await asyncio.create_subprocess_exec(
                self.config[CONF_COMMAND],
                *self.config[CONF_ARGS],
                env=env,
                stdout=asyncio.subprocess.PIPE if self.config[CONF_META] else None,
                close_fds=False,  # required for posix_spawn
            )
            stdout, _ = await process.communicate()
        except OSError as err:
            # happens when command doesn't exist or permission is denied
            _LOGGER.error("Error while authenticating %r: %s", username, err)
            raise InvalidAuthError from err

        if process.returncode != 0:
            _LOGGER.error(
                "User %r failed to authenticate, command exited with code %d",
                username,
                process.returncode,
            )
            raise InvalidAuthError

        if self.config[CONF_META]:
            meta: dict[str, str] = {}
            for _line in stdout.splitlines():
                try:
                    line = _line.decode().lstrip()
                except ValueError:
                    # malformed line
                    continue
                if line.startswith("#") or "=" not in line:
                    continue
                key, _, value = line.partition("=")
                key = key.strip()
                value = value.strip()
                if key in self.ALLOWED_META_KEYS:
                    meta[key] = value
            self._user_meta[username] = meta

    async def async_get_or_create_credentials(
        self, flow_result: Mapping[str, str]
    ) -> Credentials:
        """Get credentials based on the flow result."""
        username = flow_result["username"].strip().casefold()

        users = await self.store.async_get_users()
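        for user in users:
            # Skip unnamed users; otherwise they would match any username.
            if not user.name or user.name.strip().casefold() != username:
                continue

            if not user.is_active:
                continue

            # Reuse credentials this provider already holds for the user.
            for credential in await self.async_credentials():
                cred_username = credential.data["username"]
                if cred_username and cred_username.strip().casefold() == username:
                    return credential

            # Existing user without credentials here: create and link them.
            cred = self.async_create_credentials({"username": username})
            await self.store.async_link_user(user, cred)
            return cred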

        # No matching user exists: create one through the built-in
        # "homeassistant" provider and link the new credentials to it.
        hass = async_get_hass()
        meta = self._user_meta.get(flow_result["username"], {})

        provider = _async_get_hass_provider(hass)
        await provider.async_initialize()

        # Only pass a group when the script reported one; [None] would not
        # name a valid group.
        group = meta.get("group")
        user = await hass.auth.async_create_user(
            flow_result["username"], group_ids=[group] if group else None
        )
        cred = await provider.async_get_or_create_credentials(
            {"username": flow_result["username"]}
        )

        # Fall back to the login name when the script gave no full name.
        pretty_name = meta.get("fullname") or flow_result["username"]
        await provider.data.async_save()
        await hass.auth.async_link_user(user, cred)
        if "person" in hass.config.components:
            await person.async_create_person(hass, pretty_name, user_id=user.id)
        return cred

    async def async_user_meta_for_credentials(
        self, credentials: Credentials
    ) -> UserMeta:
        """Return extra user metadata for credentials.

        Currently, supports name, group and local_only.
        """
        username = credentials.data["username"]
        meta = self._user_meta.get(username, {})
        return UserMeta(
            name=username,
            is_active=True,
            group=meta.get("group"),
            local_only=meta.get("local_only") == "true",
        )


class CommandLineLoginFlow(LoginFlow):
    """Handler for the login flow."""

    async def async_step_init(
        self, user_input: dict[str, str] | None = None
    ) -> FlowResult:
        """Handle the step of the form."""
        errors = {}

        if user_input is not None:
            user_input["username"] = user_input["username"].strip()
            try:
                await cast(
                    CommandLineAuthProvider, self._auth_provider
                ).async_validate_login(user_input["username"], user_input["password"])
            except InvalidAuthError:
                errors["base"] = "invalid_auth"

            if not errors:
                user_input.pop("password")
                return await self.async_finish(user_input)

        return self.async_show_form(
            step_id="init",
            data_schema=vol.Schema(
                {
                    vol.Required("username"): str,
                    vol.Required("password"): str,
                }
            ),
            errors=errors,
        )


@callback
def _async_get_hass_provider(hass):
    """Get the Home Assistant auth provider."""
    for prv in hass.auth.auth_providers:
        if prv.type == "homeassistant":
            return prv

    raise RuntimeError("No Home Assistant provider found")
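
The part of this change that stops a new user being created on every login is the lookup at the top of async_get_or_create_credentials: the login name is compared case-insensitively against the names of existing, active users. A minimal sketch of that idea, detached from Home Assistant, is shown here. `FakeUser` and `find_existing_user` are hypothetical names for illustration, and skipping users without a name is an assumption about the intended behavior.

```python
from dataclasses import dataclass
from typing import Iterable, Optional


@dataclass
class FakeUser:
    """Hypothetical stand-in for a Home Assistant user record."""

    name: Optional[str]
    is_active: bool = True


def find_existing_user(users: Iterable[FakeUser], username: str) -> Optional[FakeUser]:
    """Return the first active user whose name matches, ignoring case and padding."""
    wanted = username.strip().casefold()
    for user in users:
        # Unnamed or deactivated users are never candidates.
        if not user.name or not user.is_active:
            continue
        if user.name.strip().casefold() == wanted:
            return user
    return None
```

When this returns a user, credentials get linked to that user; only when it returns nothing does a new user (and optionally a person) need to be created.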