# dialects/sqlite/aiosqlite.py
# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
# <see AUTHORS file>
#
# This module is part of SQLAlchemy and is released under
# the MIT License: https://www.opensource.org/licenses/mit-license.php


r"""

.. dialect:: sqlite+aiosqlite
    :name: aiosqlite
    :dbapi: aiosqlite
    :connectstring: sqlite+aiosqlite:///file_path
    :url: https://pypi.org/project/aiosqlite/

The aiosqlite dialect provides support for the SQLAlchemy asyncio interface
running on top of pysqlite.

aiosqlite is a wrapper around pysqlite that uses a background thread for
each connection.   It does not actually use non-blocking IO, as SQLite
databases are not socket-based.  However it does provide a working asyncio
interface that's useful for testing and prototyping purposes.

Using a special asyncio mediation layer, the aiosqlite dialect is usable
as the backend for the :ref:`SQLAlchemy asyncio <asyncio_toplevel>`
extension package.

This dialect should normally be used only with the
:func:`_asyncio.create_async_engine` engine creation function::

    from sqlalchemy.ext.asyncio import create_async_engine

    engine = create_async_engine("sqlite+aiosqlite:///filename")

The URL passes through all arguments to the ``pysqlite`` driver, so all
connection arguments are the same as they are for that of :ref:`pysqlite`.

.. _aiosqlite_udfs:

User-Defined Functions
----------------------

aiosqlite extends pysqlite to support async, so we can create our own user-defined functions (UDFs)
in Python and use them directly in SQLite queries as described here: :ref:`pysqlite_udfs`.

.. _aiosqlite_serializable:

Serializable isolation / Savepoints / Transactional DDL (asyncio version)
-------------------------------------------------------------------------

A newly revised version of this important section is now available
at the top level of the SQLAlchemy SQLite documentation, in the section
:ref:`sqlite_transactions`.


.. _aiosqlite_pooling:

Pooling Behavior
----------------

The SQLAlchemy ``aiosqlite`` DBAPI establishes the connection pool differently
based on the kind of SQLite database that's requested:

* When a ``:memory:`` SQLite database is specified, the dialect by default
  will use :class:`.StaticPool`. This pool maintains a single
  connection, so that all access to the engine
  use the same ``:memory:`` database.
* When a file-based database is specified, the dialect will use
  :class:`.AsyncAdaptedQueuePool` as the source of connections.

  .. versionchanged:: 2.0.38

    SQLite file database engines now use :class:`.AsyncAdaptedQueuePool` by default.
    Previously, :class:`.NullPool` were used.  The :class:`.NullPool` class
    may be used by specifying it via the
    :paramref:`_sa.create_engine.poolclass` parameter.

"""  # noqa
from __future__ import annotations

import asyncio
from collections import deque
from functools import partial
from types import ModuleType
from typing import Any
from typing import cast
from typing import Deque
from typing import Iterator
from typing import NoReturn
from typing import Optional
from typing import Sequence
from typing import TYPE_CHECKING
from typing import Union

from .base import SQLiteExecutionContext
from .pysqlite import SQLiteDialect_pysqlite
from ... import pool
from ... import util
from ...connectors.asyncio import AsyncAdapt_dbapi_module
from ...engine import AdaptedConnection
from ...util.concurrency import await_fallback
from ...util.concurrency import await_only

if TYPE_CHECKING:
    from ...connectors.asyncio import AsyncIODBAPIConnection
    from ...connectors.asyncio import AsyncIODBAPICursor
    from ...engine.interfaces import _DBAPICursorDescription
    from ...engine.interfaces import _DBAPIMultiExecuteParams
    from ...engine.interfaces import _DBAPISingleExecuteParams
    from ...engine.interfaces import DBAPIConnection
    from ...engine.interfaces import DBAPICursor
    from ...engine.interfaces import DBAPIModule
    from ...engine.url import URL
    from ...pool.base import PoolProxiedConnection


class AsyncAdapt_aiosqlite_cursor:
    # TODO: base on connectors/asyncio.py
    # see #10415

    __slots__ = (
        "_adapt_connection",
        "_connection",
        "description",
        "await_",
        "_rows",
        "arraysize",
        "rowcount",
        "lastrowid",
    )

    server_side = False

    def __init__(self, adapt_connection: AsyncAdapt_aiosqlite_connection):
        self._adapt_connection = adapt_connection
        self._connection = adapt_connection._connection
        self.await_ = adapt_connection.await_
        self.arraysize = 1
        self.rowcount = -1
        self.description: Optional[_DBAPICursorDescription] = None
        self._rows: Deque[Any] = deque()

    def close(self) -> None:
        self._rows.clear()

    def execute(
        self,
        operation: Any,
        parameters: Optional[_DBAPISingleExecuteParams] = None,
    ) -> Any:

        try:
            _cursor: AsyncIODBAPICursor = self.await_(self._connection.cursor())  # type: ignore[arg-type] # noqa: E501

            if parameters is None:
                self.await_(_cursor.execute(operation))
            else:
                self.await_(_cursor.execute(operation, parameters))

            if _cursor.description:
                self.description = _cursor.description
                self.lastrowid = self.rowcount = -1

                if not self.server_side:
                    self._rows = deque(self.await_(_cursor.fetchall()))
            else:
                self.description = None
                self.lastrowid = _cursor.lastrowid
                self.rowcount = _cursor.rowcount

            if not self.server_side:
                self.await_(_cursor.close())
            else:
                self._cursor = _cursor  # type: ignore[misc]
        except Exception as error:
            self._adapt_connection._handle_exception(error)

    def executemany(
        self,
        operation: Any,
        seq_of_parameters: _DBAPIMultiExecuteParams,
    ) -> Any:
        try:
            _cursor: AsyncIODBAPICursor = self.await_(self._connection.cursor())  # type: ignore[arg-type] # noqa: E501
            self.await_(_cursor.executemany(operation, seq_of_parameters))
            self.description = None
            self.lastrowid = _cursor.lastrowid
            self.rowcount = _cursor.rowcount
            self.await_(_cursor.close())
        except Exception as error:
            self._adapt_connection._handle_exception(error)

    def setinputsizes(self, *inputsizes: Any) -> None:
        pass

    def __iter__(self) -> Iterator[Any]:
        while self._rows:
            yield self._rows.popleft()

    def fetchone(self) -> Optional[Any]:
        if self._rows:
            return self._rows.popleft()
        else:
            return None

    def fetchmany(self, size: Optional[int] = None) -> Sequence[Any]:
        if size is None:
            size = self.arraysize

        rr = self._rows
        return [rr.popleft() for _ in range(min(size, len(rr)))]

    def fetchall(self) -> Sequence[Any]:
        retval = list(self._rows)
        self._rows.clear()
        return retval


class AsyncAdapt_aiosqlite_ss_cursor(AsyncAdapt_aiosqlite_cursor):
    # TODO: base on connectors/asyncio.py
    # see #10415
    __slots__ = "_cursor"

    server_side = True

    def __init__(self, *arg: Any, **kw: Any) -> None:
        super().__init__(*arg, **kw)
        self._cursor: Optional[AsyncIODBAPICursor] = None

    def close(self) -> None:
        if self._cursor is not None:
            self.await_(self._cursor.close())
            self._cursor = None

    def fetchone(self) -> Optional[Any]:
        assert self._cursor is not None
        return self.await_(self._cursor.fetchone())

    def fetchmany(self, size: Optional[int] = None) -> Sequence[Any]:
        assert self._cursor is not None
        if size is None:
            size = self.arraysize
        return self.await_(self._cursor.fetchmany(size=size))

    def fetchall(self) -> Sequence[Any]:
        assert self._cursor is not None
        return self.await_(self._cursor.fetchall())


class AsyncAdapt_aiosqlite_connection(AdaptedConnection):
    await_ = staticmethod(await_only)
    __slots__ = ("dbapi",)

    def __init__(self, dbapi: Any, connection: AsyncIODBAPIConnection) -> None:
        self.dbapi = dbapi
        self._connection = connection

    @property
    def isolation_level(self) -> Optional[str]:
        return cast(str, self._connection.isolation_level)

    @isolation_level.setter
    def isolation_level(self, value: Optional[str]) -> None:
        # aiosqlite's isolation_level setter works outside the Thread
        # that it's supposed to, necessitating setting check_same_thread=False.
        # for improved stability, we instead invent our own awaitable version
        # using aiosqlite's async queue directly.

        def set_iso(
            connection: AsyncAdapt_aiosqlite_connection, value: Optional[str]
        ) -> None:
            connection.isolation_level = value

        function = partial(set_iso, self._connection._conn, value)
        future = asyncio.get_event_loop().create_future()

        self._connection._tx.put_nowait((future, function))

        try:
            self.await_(future)
        except Exception as error:
            self._handle_exception(error)

    def create_function(self, *args: Any, **kw: Any) -> None:
        try:
            self.await_(self._connection.create_function(*args, **kw))
        except Exception as error:
            self._handle_exception(error)

    def cursor(self, server_side: bool = False) -> AsyncAdapt_aiosqlite_cursor:
        if server_side:
            return AsyncAdapt_aiosqlite_ss_cursor(self)
        else:
            return AsyncAdapt_aiosqlite_cursor(self)

    def execute(self, *args: Any, **kw: Any) -> Any:
        return self.await_(self._connection.execute(*args, **kw))

    def rollback(self) -> None:
        try:
            self.await_(self._connection.rollback())
        except Exception as error:
            self._handle_exception(error)

    def commit(self) -> None:
        try:
            self.await_(self._connection.commit())
        except Exception as error:
            self._handle_exception(error)

    def close(self) -> None:
        try:
            self.await_(self._connection.close())
        except ValueError:
            # this is undocumented for aiosqlite, that ValueError
            # was raised if .close() was called more than once, which is
            # both not customary for DBAPI and is also not a DBAPI.Error
            # exception. This is now fixed in aiosqlite via my PR
            # https://github.com/omnilib/aiosqlite/pull/238, so we can be
            # assured this will not become some other kind of exception,
            # since it doesn't raise anymore.

            pass
        except Exception as error:
            self._handle_exception(error)

    def _handle_exception(self, error: Exception) -> NoReturn:
        if (
            isinstance(error, ValueError)
            and error.args[0] == "no active connection"
        ):
            raise self.dbapi.sqlite.OperationalError(
                "no active connection"
            ) from error
        else:
            raise error


class AsyncAdaptFallback_aiosqlite_connection(AsyncAdapt_aiosqlite_connection):
    __slots__ = ()

    await_ = staticmethod(await_fallback)


class AsyncAdapt_aiosqlite_dbapi(AsyncAdapt_dbapi_module):
    def __init__(self, aiosqlite: ModuleType, sqlite: ModuleType):
        self.aiosqlite = aiosqlite
        self.sqlite = sqlite
        self.paramstyle = "qmark"
        self._init_dbapi_attributes()

    def _init_dbapi_attributes(self) -> None:
        for name in (
            "DatabaseError",
            "Error",
            "IntegrityError",
            "NotSupportedError",
            "OperationalError",
            "ProgrammingError",
            "sqlite_version",
            "sqlite_version_info",
        ):
            setattr(self, name, getattr(self.aiosqlite, name))

        for name in ("PARSE_COLNAMES", "PARSE_DECLTYPES"):
            setattr(self, name, getattr(self.sqlite, name))

        for name in ("Binary",):
            setattr(self, name, getattr(self.sqlite, name))

    def connect(self, *arg: Any, **kw: Any) -> AsyncAdapt_aiosqlite_connection:
        async_fallback = kw.pop("async_fallback", False)

        creator_fn = kw.pop("async_creator_fn", None)
        if creator_fn:
            connection = creator_fn(*arg, **kw)
        else:
            connection = self.aiosqlite.connect(*arg, **kw)
            # it's a Thread.   you'll thank us later
            connection.daemon = True

        if util.asbool(async_fallback):
            return AsyncAdaptFallback_aiosqlite_connection(
                self,
                await_fallback(connection),
            )
        else:
            return AsyncAdapt_aiosqlite_connection(
                self,
                await_only(connection),
            )


class SQLiteExecutionContext_aiosqlite(SQLiteExecutionContext):
    def create_server_side_cursor(self) -> DBAPICursor:
        return self._dbapi_connection.cursor(server_side=True)


class SQLiteDialect_aiosqlite(SQLiteDialect_pysqlite):
    driver = "aiosqlite"
    supports_statement_cache = True

    is_async = True

    supports_server_side_cursors = True

    execution_ctx_cls = SQLiteExecutionContext_aiosqlite

    @classmethod
    def import_dbapi(cls) -> AsyncAdapt_aiosqlite_dbapi:
        return AsyncAdapt_aiosqlite_dbapi(
            __import__("aiosqlite"), __import__("sqlite3")
        )

    @classmethod
    def get_pool_class(cls, url: URL) -> type[pool.Pool]:
        if cls._is_url_file_db(url):
            return pool.AsyncAdaptedQueuePool
        else:
            return pool.StaticPool

    def is_disconnect(
        self,
        e: DBAPIModule.Error,
        connection: Optional[Union[PoolProxiedConnection, DBAPIConnection]],
        cursor: Optional[DBAPICursor],
    ) -> bool:
        self.dbapi = cast("DBAPIModule", self.dbapi)
        if isinstance(
            e, self.dbapi.OperationalError
        ) and "no active connection" in str(e):
            return True

        return super().is_disconnect(e, connection, cursor)

    def get_driver_connection(
        self, connection: DBAPIConnection
    ) -> AsyncIODBAPIConnection:
        return connection._connection  # type: ignore[no-any-return]


dialect = SQLiteDialect_aiosqlite
