from collections import deque
from itertools import islice
from typing import (
Any,
ContextManager,
Deque,
Generator,
Generic,
Iterable,
Iterator,
List,
Optional,
Tuple,
TypeVar,
Union,
overload,
)
from typing_extensions import Literal
T = TypeVar("T")
class NoLock:
    """Dummy lock: provides the context-manager interface but no protection."""

    def __enter__(self) -> None:
        # Nothing to acquire.
        return None

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> Literal[False]:
        # Returning False never suppresses an in-flight exception.
        return False
def tee_peer(
    iterator: Iterator[T],
    # the buffer specific to this peer
    buffer: Deque[T],
    # the buffers of all peers, including our own
    peers: List[Deque[T]],
    lock: ContextManager[Any],
) -> Generator[T, None, None]:
    """One child iterator of :py:func:`~.tee`, sharing ``iterator`` with its peers."""
    try:
        while True:
            if not buffer:
                with lock:
                    # A peer may have produced an item while we waited on the
                    # lock; restart the loop so we yield from our buffer.
                    if buffer:
                        continue
                    try:
                        fresh = next(iterator)
                    except StopIteration:
                        break
                    # Append to every buffer — ours included. Yielding from our
                    # own buffer (rather than ``fresh`` directly) keeps item
                    # order correct when peers fetch concurrently, since they
                    # may already have buffered their copy.
                    for sibling in peers:
                        sibling.append(fresh)
            yield buffer.popleft()
    finally:
        with lock:
            # This peer is finished — unregister its buffer.
            for position, sibling in enumerate(peers):  # pragma: no branch
                if sibling is buffer:
                    peers.pop(position)
                    break
            # The last peer standing closes the shared iterator, if it can.
            if not peers and hasattr(iterator, "close"):
                iterator.close()
class Tee(Generic[T]):
    """Create ``n`` separate iterators over ``iterable``.

    Splits a single ``iterable`` into multiple iterators that each provide
    the same items in the same order. All children may advance
    independently but share the items of ``iterable`` — when the most
    advanced iterator retrieves an item, it is buffered until the least
    advanced iterator has yielded it as well. ``tee`` is lazy and can
    handle infinite iterables, provided all children keep advancing.

    Unlike :py:func:`itertools.tee`, :py:func:`~.tee` returns a custom
    type instead of a :py:class:`tuple`. Like a tuple, it can be indexed,
    iterated and unpacked to get the child iterators. In addition, its
    :py:meth:`close` method immediately closes all children, and it can be
    used as a ``with`` context for the same effect.

    If ``iterable`` is an iterator that is also read elsewhere, ``tee``
    will *not* provide those items. Also, ``tee`` must internally buffer
    each item until the last iterator has yielded it; if the most and
    least advanced iterators differ by a lot, using a :py:class:`list` is
    more efficient (but not lazy).

    If the underlying iterable is concurrency-safe, the resulting
    iterators are concurrency-safe as well. Otherwise they are safe as
    long as there is only a single "most advanced" iterator. To enforce
    sequential use, provide a ``lock`` — e.g. a
    :py:class:`threading.Lock` instance — and access will be synchronized
    automatically.
    """

    def __init__(
        self,
        iterable: Iterator[T],
        n: int = 2,
        *,
        lock: Optional[ContextManager[Any]] = None,
    ):
        self._iterator = iter(iterable)
        # One buffer per child; every child appends fetched items to all of them.
        self._buffers: List[Deque[T]] = [deque() for _ in range(n)]
        self._children = tuple(
            tee_peer(
                iterator=self._iterator,
                buffer=child_buffer,
                peers=self._buffers,
                # Without an explicit lock, a stateless no-op lock is used.
                lock=NoLock() if lock is None else lock,
            )
            for child_buffer in self._buffers
        )

    def __len__(self) -> int:
        return len(self._children)

    @overload
    def __getitem__(self, item: int) -> Iterator[T]:
        ...

    @overload
    def __getitem__(self, item: slice) -> Tuple[Iterator[T], ...]:
        ...

    def __getitem__(
        self, item: Union[int, slice]
    ) -> Union[Iterator[T], Tuple[Iterator[T], ...]]:
        return self._children[item]

    def __iter__(self) -> Iterator[Iterator[T]]:
        yield from self._children

    def __enter__(self) -> "Tee[T]":
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> Literal[False]:
        # Close all children on context exit; never suppress exceptions.
        self.close()
        return False

    def close(self) -> None:
        """Immediately close every child iterator."""
        for child_iterator in self._children:
            child_iterator.close()
# Why this is needed https://stackoverflow.com/a/44638570
# Module-level alias for :class:`Tee`, giving it a lowercase, function-style
# name in the spirit of :py:func:`itertools.tee`.
safetee = Tee
def batch_iterate(size: Optional[int], iterable: Iterable[T]) -> Iterator[List[T]]:
    """Utility batching function.

    Args:
        size: The size of each batch. If None, a single batch is returned.
        iterable: The iterable to batch.

    Returns:
        An iterator over the batches.
    """
    source = iter(iterable)
    while True:
        # islice(source, None) drains everything, giving one final batch.
        batch = list(islice(source, size))
        if not batch:
            break
        yield batch