# Source code for langchain_core.utils.aiter

"""
Adapted from
https://github.com/maxfischer2781/asyncstdlib/blob/master/asyncstdlib/itertools.py
MIT License
"""

from collections import deque
from typing import (
    Any,
    AsyncContextManager,
    AsyncGenerator,
    AsyncIterator,
    Awaitable,
    Callable,
    Deque,
    Generic,
    Iterator,
    List,
    Optional,
    Tuple,
    TypeVar,
    Union,
    cast,
    overload,
)

# Type variable for the item type carried by the async iterators in this module.
T = TypeVar("T")

# Sentinel default for py_anext(): it is compared by identity, so "no default
# supplied" can be told apart from any real value, including None.
_no_default = object()


# See https://github.com/python/cpython/blob/main/Lib/test/test_asyncgen.py#L54
# Before Python 3.10 the builtin anext() was not available.
def py_anext(
    iterator: AsyncIterator[T], default: Union[T, Any] = _no_default
) -> Awaitable[Union[T, None, Any]]:
    """Pure-Python implementation of ``anext()`` for testing purposes.

    Closely matches the builtin ``anext()`` C implementation. Can be used to
    compare the builtin implementation of the inner coroutine machinery with
    the C implementation of ``__anext__()`` and ``send()`` or ``throw()`` on
    the returned generator.

    Args:
        iterator: The async iterator to advance by one item.
        default: Value to return once the iterator is exhausted. When omitted,
            ``StopAsyncIteration`` propagates to the caller instead.

    Returns:
        An awaitable resolving to the next item, or to ``default`` if one was
        supplied and the iterator is exhausted.

    Raises:
        TypeError: If the type of ``iterator`` does not define ``__anext__``.
    """
    try:
        # Look the method up on the type, not the instance, mirroring how
        # special methods are resolved.
        anext_method = cast(
            Callable[[AsyncIterator[T]], Awaitable[T]], type(iterator).__anext__
        )
    except AttributeError:
        # The AttributeError is only how the probe is implemented, so it is
        # suppressed rather than chained as context of the TypeError.
        raise TypeError(f"{iterator!r} is not an async iterator") from None

    if default is _no_default:
        # No default: hand back the raw awaitable so StopAsyncIteration
        # reaches the caller untouched.
        return anext_method(iterator)

    async def anext_impl() -> Union[T, Any]:
        try:
            # The C code is way more low-level than this, as it implements
            # all methods of the iterator protocol. In this implementation
            # we're relying on higher-level coroutine concepts, but that's
            # exactly what we want -- crosstest pure-Python high-level
            # implementation and low-level C anext() iterators.
            return await anext_method(iterator)
        except StopAsyncIteration:
            return default

    return anext_impl()
class NoLock:
    """Dummy lock that provides the proper interface but no protection."""

    async def __aenter__(self) -> None:
        # Nothing to acquire.
        return None

    async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> bool:
        # Nothing to release; False means an exception raised inside the
        # ``async with`` body is never suppressed.
        return False
async def tee_peer(
    iterator: AsyncIterator[T],
    # the buffer specific to this peer
    buffer: Deque[T],
    # the buffers of all peers, including our own
    peers: List[Deque[T]],
    lock: AsyncContextManager[Any],
) -> AsyncGenerator[T, None]:
    """An individual iterator of a :py:func:`~.tee`.

    Args:
        iterator: The shared source iterator all peers pull from.
        buffer: The queue of pending items for this peer only.
        peers: The queues of every live peer, including ``buffer``.
        lock: Async context manager held while pulling from ``iterator`` and
            while deregistering this peer.

    Yields:
        The items of ``iterator``, in order.
    """
    try:
        while True:
            if not buffer:
                async with lock:
                    # A peer may have filled our buffer while we waited for
                    # the lock; loop again so the item is yielded from it.
                    if buffer:
                        continue
                    try:
                        fetched = await iterator.__anext__()
                    except StopAsyncIteration:
                        break
                    # Fan the item out to every buffer, ours included. We read
                    # our copy back from the buffer rather than yielding it
                    # directly, which keeps the ordering right when peers
                    # fetch concurrently and have already buffered an item.
                    for queue in peers:
                        queue.append(fetched)
            yield buffer.popleft()
    finally:
        async with lock:
            # This peer is done: drop its buffer. The match is by identity,
            # because deques with equal contents compare equal.
            own_index = next(
                (pos for pos, queue in enumerate(peers) if queue is buffer),
                None,
            )
            if own_index is not None:
                del peers[own_index]
            # The last peer to finish tries to close the source iterator.
            if not peers:
                if hasattr(iterator, "aclose"):
                    await iterator.aclose()
class Tee(Generic[T]):
    """Create ``n`` separate asynchronous iterators over ``iterable``.

    This splits a single ``iterable`` into multiple iterators, each providing
    the same items in the same order. All child iterators may advance
    separately but share the same items from ``iterable`` -- when the most
    advanced iterator retrieves an item, it is buffered until the least
    advanced iterator has yielded it as well.
    A ``tee`` works lazily and can handle an infinite ``iterable``, provided
    that all iterators advance.

    .. code-block:: python3

        async def derivative(sensor_data):
            previous, current = a.tee(sensor_data, n=2)
            await a.anext(previous)  # advance one iterator
            return a.map(operator.sub, previous, current)

    Unlike :py:func:`itertools.tee`, :py:func:`~.tee` returns a custom type
    instead of a :py:class:`tuple`. Like a tuple, it can be indexed, iterated
    and unpacked to get the child iterators. In addition, its
    :py:meth:`~.tee.aclose` method immediately closes all children, and it can
    be used in an ``async with`` context for the same effect.

    If ``iterable`` is an iterator and read elsewhere, ``tee`` will *not*
    provide these items. Also, ``tee`` must internally buffer each item until
    the last iterator has yielded it; if the most and least advanced iterators
    differ by a lot of data, using a :py:class:`list` is more efficient (but
    not lazy).

    If the underlying iterable is concurrency safe (``anext`` may be awaited
    concurrently) the resulting iterators are concurrency safe as well.
    Otherwise, the iterators are safe if there is only ever one single
    "most advanced" iterator. To enforce sequential use of ``anext``, provide
    a ``lock`` - e.g. an :py:class:`asyncio.Lock` instance in an
    :py:mod:`asyncio` application - and access is automatically synchronised.
    """

    def __init__(
        self,
        iterable: AsyncIterator[T],
        n: int = 2,
        *,
        lock: Optional[AsyncContextManager[Any]] = None,
    ):
        """Set up ``n`` child iterators over ``iterable``.

        Args:
            iterable: The source to split.
            n: Number of child iterators to create.
            lock: Optional async context manager passed to every child; when
                omitted each child gets its own ``NoLock``.
        """
        self._iterator = iterable.__aiter__()  # before 3.10 aiter() doesn't exist
        self._buffers: List[Deque[T]] = [deque() for _ in range(n)]
        children = []
        for buffer in self._buffers:
            peer_lock = NoLock() if lock is None else lock
            children.append(
                tee_peer(
                    iterator=self._iterator,
                    buffer=buffer,
                    peers=self._buffers,
                    lock=peer_lock,
                )
            )
        self._children = tuple(children)

    def __len__(self) -> int:
        """Return the number of child iterators."""
        return len(self._children)

    @overload
    def __getitem__(self, item: int) -> AsyncIterator[T]:
        ...

    @overload
    def __getitem__(self, item: slice) -> Tuple[AsyncIterator[T], ...]:
        ...

    def __getitem__(
        self, item: Union[int, slice]
    ) -> Union[AsyncIterator[T], Tuple[AsyncIterator[T], ...]]:
        """Return the child at index ``item``, or a tuple of children for a slice."""
        return self._children[item]

    def __iter__(self) -> Iterator[AsyncIterator[T]]:
        """Iterate over the child iterators, in creation order."""
        for child in self._children:
            yield child

    async def __aenter__(self) -> "Tee[T]":
        """Enter the ``async with`` block, returning this instance."""
        return self

    async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> bool:
        """Close all children on exit; exceptions are never suppressed."""
        await self.aclose()
        return False

    async def aclose(self) -> None:
        """Call ``aclose()`` on every child iterator, one after another."""
        for child in self._children:
            await child.aclose()
# Alias exposing the Tee class under the name ``atee``.
atee = Tee