Tee#

class langchain_core.utils.aiter.Tee(iterable: AsyncIterator[T], n: int = 2, *, lock: AbstractAsyncContextManager[Any] | None = None)[source]#

创建 n 个独立的异步迭代器,用于 iterable

这将一个单一的iterable分割成多个迭代器,每个迭代器以相同的顺序提供相同的项目。 所有子迭代器可以独立前进,但共享来自iterable的相同项目——当最先进的迭代器检索一个项目时,它会被缓冲,直到最不先进的迭代器也产生了它。 一个tee是惰性的,并且可以处理无限的iterable,前提是所有迭代器都前进。

async def derivative(sensor_data):
    previous, current = a.tee(sensor_data, n=2)
    await a.anext(previous)  # advance one iterator
    return a.map(operator.sub, previous, current)

itertools.tee() 不同,tee() 返回一个自定义类型,而不是一个 tuple。与元组类似,它可以被索引、迭代和解包以获取子迭代器。此外,它的 aclose() 方法会立即关闭所有子迭代器,并且可以在 async with 上下文中使用以达到相同的效果。

如果 iterable 是一个迭代器并且在其他地方被读取,tee不会 提供这些项目。此外,tee 必须在内部缓冲每个项目,直到最后一个迭代器产生它;如果最先进和最不先进的迭代器在大多数数据上有所不同,使用 list 会更高效(但不是惰性的)。

如果底层可迭代对象是并发安全的(anext 可以并发等待),那么生成的迭代器也是并发安全的。否则,如果只有一个“最先进”的迭代器,那么迭代器是安全的。为了强制顺序使用 anext,可以提供一个 lock - 例如在 asyncio 应用程序中的 asyncio.Lock 实例 - 访问将自动同步。

方法

__init__(iterable[, n, lock])

aclose()

异步关闭所有子迭代器。

Parameters:
  • iterable (AsyncIterator[T])

  • n (整数)

  • lock (AbstractAsyncContextManager[Any] | None)

__init__(iterable: AsyncIterator[T], n: int = 2, *, lock: AbstractAsyncContextManager[Any] | None = None)[源代码]#
Parameters:
  • iterable (AsyncIterator[T])

  • n (整数)

  • lock (AbstractAsyncContextManager[Any] | None)

async aclose() None[source]#

异步关闭所有子迭代器。

Return type: