"""
改编自
https://github.com/maxfischer2781/asyncstdlib/blob/master/asyncstdlib/itertools.py
MIT许可证
"""
from collections import deque
from typing import (
Any,
AsyncContextManager,
AsyncGenerator,
AsyncIterator,
Awaitable,
Callable,
Deque,
Generic,
Iterator,
List,
Optional,
Tuple,
TypeVar,
Union,
cast,
overload,
)
# Generic type of the items produced by the async iterators in this module.
T = TypeVar("T")
# Sentinel meaning "no default supplied" for py_anext(); it is compared by
# identity, so None remains usable as an explicit default value.
_no_default = object()
# py_anext() below follows the pure-Python anext() found at:
# https://github.com/python/cpython/blob/main/Lib/test/test_asyncgen.py#L54
# Before Python 3.10, the builtin anext() was not available.
def py_anext(
    iterator: AsyncIterator[T], default: Union[T, Any] = _no_default
) -> Awaitable[Union[T, None, Any]]:
    """Pure-Python implementation of ``anext()`` for testing purposes.

    Closely matches the builtin ``anext()`` C implementation. Can be used to
    compare the built-in implementation of the inner coroutine machinery to
    the C implementation of ``__anext__()`` and ``send()`` or ``throw()`` on
    the returned generator.

    Args:
        iterator: The async iterator to advance. ``__anext__`` is looked up on
            ``type(iterator)``, not on the instance.
        default: Value to produce when ``iterator`` raises
            ``StopAsyncIteration``. When omitted, that exception propagates
            to whoever awaits the result.

    Returns:
        Without ``default``: whatever ``type(iterator).__anext__(iterator)``
        returns, passed through unchanged. With ``default``: a coroutine that
        awaits the next item and falls back to ``default`` on exhaustion.

    Raises:
        TypeError: If ``type(iterator)`` has no ``__anext__`` attribute.
    """
    try:
        anext_method = cast(
            Callable[[AsyncIterator[T]], Awaitable[T]], type(iterator).__anext__
        )
    except AttributeError:
        # The AttributeError is an implementation detail of the lookup;
        # suppress it so callers only see the TypeError.
        raise TypeError(f"{iterator!r} is not an async iterator") from None

    if default is _no_default:
        # No fallback requested: hand back the awaitable untouched.
        return anext_method(iterator)

    async def anext_impl() -> Union[T, Any]:
        try:
            # The C code is way more low-level than this, as it implements
            # all methods of the iterator protocol. In this implementation
            # we're relying on higher-level coroutine concepts, but that's
            # exactly what we want -- crosstest pure-Python high-level
            # implementation and low-level C anext() iterators.
            return await anext_method(iterator)
        except StopAsyncIteration:
            return default

    return anext_impl()
class NoLock:
    """Dummy lock that provides the proper interface but no protection.

    Usable wherever an async context manager is expected as a lock: entering
    and exiting never wait and never block other users.
    """

    async def __aenter__(self) -> None:
        """Enter the context without acquiring anything."""
        return None

    async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> bool:
        """Leave the context; return ``False`` so exceptions are not suppressed."""
        return False
async def tee_peer(
    iterator: AsyncIterator[T],
    # the buffer specific to this peer
    buffer: Deque[T],
    # the buffers of all peers, including our own
    peers: List[Deque[T]],
    lock: AsyncContextManager[Any],
) -> AsyncGenerator[T, None]:
    """An individual iterator of a :py:func:`~.tee`.

    Yields items from this peer's ``buffer``. Whenever the buffer is empty,
    one item is fetched from the shared ``iterator`` (while holding ``lock``)
    and appended to the buffer of every peer, this one included.

    Args:
        iterator: The shared source iterator all peers read from.
        buffer: The queue of items not yet yielded by this peer.
        peers: The buffers of all live peers; must contain ``buffer``. This
            list is mutated: ``buffer`` is removed when this peer finishes.
        lock: Async context manager held while fetching from ``iterator`` and
            while unregistering this peer.

    Yields:
        The items of ``iterator``, in order.
    """
    try:
        while True:
            if not buffer:
                async with lock:
                    # Another peer produced an item while we were waiting for the lock.
                    # Proceed with the next loop iteration to yield the item.
                    if buffer:
                        continue
                    try:
                        item = await iterator.__anext__()
                    except StopAsyncIteration:
                        break
                    else:
                        # Append to all buffers, including our own. We'll fetch our
                        # item from the buffer again, instead of yielding it directly.
                        # This ensures the proper item ordering if any of our peers
                        # are fetching items concurrently. They may have buffered their
                        # item already.
                        for peer_buffer in peers:
                            peer_buffer.append(item)
            yield buffer.popleft()
    finally:
        async with lock:
            # This peer is done – remove its buffer. The search is by identity
            # on purpose: deques compare equal by content, so an equality
            # based ``peers.remove(buffer)`` could drop another peer's buffer.
            for idx, peer_buffer in enumerate(peers):  # pragma: no branch
                if peer_buffer is buffer:
                    peers.pop(idx)
                    break
            # if we are the last peer, try and close the iterator
            if not peers and hasattr(iterator, "aclose"):
                await iterator.aclose()
class Tee(Generic[T]):
    """Create ``n`` separate asynchronous iterators over ``iterable``.

    This splits a single ``iterable`` into multiple iterators, each providing
    the same items in the same order.
    All child iterators may advance separately but share the same items
    from ``iterable`` -- when the most advanced iterator retrieves an item,
    it is buffered until the least advanced iterator has yielded it as well.
    A ``tee`` works lazily and can handle an infinite ``iterable``, provided
    that all iterators advance.

    .. code-block:: python3

        async def derivative(sensor_data):
            previous, current = a.tee(sensor_data, n=2)
            await a.anext(previous)  # advance one iterator
            return a.map(operator.sub, previous, current)

    Unlike :py:func:`itertools.tee`, :py:func:`~.tee` returns a custom type instead
    of a :py:class:`tuple`. Like a tuple, it can be indexed, iterated and unpacked
    to get the child iterators. In addition, its :py:meth:`~.tee.aclose` method
    immediately closes all children, and it can be used in an ``async with`` context
    for the same effect.

    If ``iterable`` is an iterator and read elsewhere, ``tee`` will *not*
    provide these items. Also, ``tee`` must internally buffer each item until the
    last iterator has yielded it; if the most and least advanced iterator differ
    by a lot of data, using a :py:class:`list` is more efficient (but not lazy).

    If the underlying iterable is concurrency safe (``anext`` may be awaited
    concurrently) the resulting iterators are concurrency safe as well. Otherwise,
    the iterators are safe if there is only ever one single "most advanced" iterator.
    To enforce sequential use of ``anext``, provide a ``lock``
    - e.g. an :py:class:`asyncio.Lock` instance in an :py:mod:`asyncio` application -
    and access is automatically synchronised.
    """

    def __init__(
        self,
        iterable: AsyncIterator[T],
        n: int = 2,
        *,
        lock: Optional[AsyncContextManager[Any]] = None,
    ) -> None:
        """Set up ``n`` child iterators sharing ``iterable``.

        Args:
            iterable: The source to split; its ``__aiter__()`` is called once.
            n: Number of child iterators to create.
            lock: Optional async context manager passed to every child to
                serialise access to the source. Defaults to a ``NoLock``.
        """
        self._iterator = iterable.__aiter__()  # before 3.10 aiter() doesn't exist
        self._buffers: List[Deque[T]] = [deque() for _ in range(n)]
        # Resolve the fallback once so that all children share one lock object,
        # exactly as they do when the caller supplies a lock.
        peer_lock: AsyncContextManager[Any] = lock if lock is not None else NoLock()
        self._children: Tuple[AsyncGenerator[T, None], ...] = tuple(
            tee_peer(
                iterator=self._iterator,
                buffer=buffer,
                peers=self._buffers,
                lock=peer_lock,
            )
            for buffer in self._buffers
        )

    def __len__(self) -> int:
        """Return the number of child iterators."""
        return len(self._children)

    @overload
    def __getitem__(self, item: int) -> AsyncIterator[T]:
        ...

    @overload
    def __getitem__(self, item: slice) -> Tuple[AsyncIterator[T], ...]:
        ...

    def __getitem__(
        self, item: Union[int, slice]
    ) -> Union[AsyncIterator[T], Tuple[AsyncIterator[T], ...]]:
        """Return the child at index ``item``, or a tuple of children for a slice."""
        return self._children[item]

    def __iter__(self) -> Iterator[AsyncIterator[T]]:
        """Iterate over the child iterators, enabling tuple-style unpacking."""
        yield from self._children

    async def __aenter__(self) -> "Tee[T]":
        """Enter an ``async with`` block, returning this instance."""
        return self

    async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> bool:
        """Close all children on exit; exceptions are not suppressed."""
        await self.aclose()
        return False

    async def aclose(self) -> None:
        """Close every child iterator, one after the other."""
        for child in self._children:
            await child.aclose()
# Alias: the Tee class is also available under the name ``atee``.
atee = Tee