def get_sd_notify_func()

in github-runner-ami/packer/files/runner-supervisor.py [0:0]


def get_sd_notify_func() -> Callable[[str], None]:
    """Return a callable that sends status strings to the systemd notify socket.

    The socket address is read from the ``NOTIFY_SOCKET`` environment variable
    (see http://www.freedesktop.org/software/systemd/man/sd_notify.html).
    When the variable is unset or empty, the returned callable accepts a
    status and does nothing. Otherwise a Unix datagram socket is connected
    once, during this call, and each invocation of the returned callable
    sends the UTF-8 encoded status over that socket.
    """
    target = os.getenv('NOTIFY_SOCKET')

    if target:
        # A leading '@' is swapped for a NUL byte before connecting (the
        # sd_notify convention for a Linux abstract-namespace socket).
        if target.startswith('@'):
            target = '\0' + target[1:]

        channel = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
        channel.connect(target)

        def notify(status: str):
            # The connected socket is captured by this closure and reused
            # for every notification.
            channel.sendall(status.encode('utf-8'))

        return notify

    # No notify socket configured: hand back a no-op with the same signature.
    return lambda status: None