Source code for langchain_core.utils.iter

from collections import deque
from itertools import islice
from typing import (
    Any,
    ContextManager,
    Deque,
    Generator,
    Generic,
    Iterable,
    Iterator,
    List,
    Optional,
    Tuple,
    TypeVar,
    Union,
    overload,
)

from typing_extensions import Literal

T = TypeVar("T")


class NoLock:
    """Dummy lock that provides the proper interface but no protection."""

    def __enter__(self) -> None:
        # Nothing is acquired, so there is nothing to hand to the ``with`` body.
        return None

    def __exit__(
        self,
        exc_type: Any,
        exc_val: Any,
        exc_tb: Any,
    ) -> Literal[False]:
        # Returning False means exceptions raised inside the ``with`` body
        # are never suppressed.
        return False
def tee_peer(
    iterator: Iterator[T],
    buffer: Deque[T],
    peers: List[Deque[T]],
    lock: ContextManager[Any],
) -> Generator[T, None, None]:
    """An individual iterator of a :py:func:`~.tee`.

    Args:
        iterator: The shared source iterator all peers read from.
        buffer: The buffer specific to this peer.
        peers: The buffers of all peers, including our own.
        lock: Context manager entered around every read from ``iterator``
            and around the cleanup when this peer finishes.

    Yields:
        The items of ``iterator``, in order.
    """
    try:
        while True:
            if not buffer:
                with lock:
                    # Another peer may have produced an item while we were
                    # waiting for the lock; only fetch if we are still empty.
                    if not buffer:
                        try:
                            fetched = next(iterator)
                        except StopIteration:
                            return
                        # Append to all buffers, including our own. We fetch
                        # our item from the buffer again below instead of
                        # yielding it directly. This ensures the proper item
                        # ordering if any of our peers are fetching items
                        # concurrently; they may have buffered theirs already.
                        for queue in peers:
                            queue.append(fetched)
            yield buffer.popleft()
    finally:
        with lock:
            # This peer is done - remove its buffer. The lookup is by identity
            # because deques compare by content and several peers may hold
            # equal buffers.
            own_index = next(
                (idx for idx, queue in enumerate(peers) if queue is buffer),
                None,
            )
            if own_index is not None:
                peers.pop(own_index)
            # If we are the last peer, try and close the iterator.
            if not peers and hasattr(iterator, "close"):
                iterator.close()
class Tee(Generic[T]):
    """Create ``n`` separate iterators over ``iterable``.

    This splits a single ``iterable`` into multiple iterators, each providing
    the same items in the same order. All child iterators may advance
    separately but share the same items from ``iterable`` -- when the most
    advanced iterator retrieves an item, it is buffered until the least
    advanced iterator has yielded it as well.
    A ``Tee`` works lazily and can handle an infinite ``iterable``, provided
    that all iterators advance.

    .. code-block:: python3

        def derivative(sensor_data):
            previous, current = Tee(sensor_data, n=2)
            next(previous)  # advance one iterator
            return map(operator.sub, previous, current)

    Unlike :py:func:`itertools.tee`, :py:class:`Tee` is a custom type instead
    of a :py:class:`tuple`. Like a tuple, it can be indexed, iterated and
    unpacked to get the child iterators. In addition, its :py:meth:`close`
    method closes all children, and it can be used in a ``with`` context for
    the same effect.

    If ``iterable`` is an iterator and read elsewhere, ``Tee`` will *not*
    provide these items. Also, ``Tee`` must internally buffer each item until
    the last iterator has yielded it; if the most and least advanced iterator
    differ by most data, using a :py:class:`list` is more efficient (but not
    lazy).

    If the underlying iterable is concurrency safe (``next`` may be called
    concurrently) the resulting iterators are concurrency safe as well.
    Otherwise, the iterators are safe if there is only ever one single
    "most advanced" iterator. To enforce sequential use of ``next``, provide
    a ``lock`` - e.g. a :py:class:`threading.Lock` instance in a threaded
    application - and access is automatically synchronised.
    """

    def __init__(
        self,
        iterable: Iterable[T],
        n: int = 2,
        *,
        lock: Optional[ContextManager[Any]] = None,
    ) -> None:
        """Create ``n`` child iterators over ``iterable``.

        Args:
            iterable: The source of items. ``iter()`` is applied to it, so any
                iterable is accepted, not only iterators.
            n: The number of child iterators to create.
            lock: Optional context manager that every child enters around
                reads from the shared source. Defaults to a ``NoLock``, i.e.
                no synchronisation.
        """
        self._iterator = iter(iterable)
        # One buffer per child; the list itself is shared with every child so
        # each of them can push a freshly read item to all its peers.
        self._buffers: List[Deque[T]] = [deque() for _ in range(n)]
        peer_lock = lock if lock is not None else NoLock()
        self._children = tuple(
            tee_peer(
                iterator=self._iterator,
                buffer=buffer,
                peers=self._buffers,
                lock=peer_lock,
            )
            for buffer in self._buffers
        )

    def __len__(self) -> int:
        """Return the number of child iterators."""
        return len(self._children)

    @overload
    def __getitem__(self, item: int) -> Iterator[T]: ...

    @overload
    def __getitem__(self, item: slice) -> Tuple[Iterator[T], ...]: ...

    def __getitem__(
        self, item: Union[int, slice]
    ) -> Union[Iterator[T], Tuple[Iterator[T], ...]]:
        """Return one child for an index, or a tuple of children for a slice."""
        return self._children[item]

    def __iter__(self) -> Iterator[Iterator[T]]:
        """Iterate over the child iterators, which also enables unpacking."""
        yield from self._children

    def __enter__(self) -> "Tee[T]":
        """Return this instance; the children are closed on exit."""
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> Literal[False]:
        """Close all children; exceptions from the body are not suppressed."""
        self.close()
        return False

    def close(self) -> None:
        """Close every child iterator."""
        # NOTE: closing a generator that was never started does not execute
        # its body, so a child that never yielded an item skips the cleanup
        # in ``tee_peer`` (buffer removal, closing the source iterator).
        for child in self._children:
            child.close()
# Why this is needed https://stackoverflow.com/a/44638570
# ``safetee`` is a module-level alias bound to the ``Tee`` class.
safetee = Tee
def batch_iterate(size: Optional[int], iterable: Iterable[T]) -> Iterator[List[T]]:
    """Utility batching function.

    Args:
        size: The size of the batch. If None, returns a single batch.
        iterable: The iterable to batch.

    Returns:
        An iterator over the batches. Each batch is a list; the last one may
        be shorter than ``size``. Nothing is yielded for an empty iterable.
    """
    source = iter(iterable)

    def take_batch() -> List[T]:
        # ``islice`` with a stop of None consumes everything that is left.
        return list(islice(source, size))

    # Two-argument ``iter`` keeps calling ``take_batch`` and stops as soon as
    # it returns an empty list, i.e. once the source is exhausted.
    yield from iter(take_batch, [])