langchain_core.utils.iter.Tee

class langchain_core.utils.iter.Tee(iterable: Iterator[T], n: int = 2, *, lock: Optional[ContextManager[Any]] = None)[source]

创建``n``个单独的异步迭代器,遍历``iterable``

这将一个单一的``iterable``拆分为多个迭代器,每个迭代器按相同顺序提供相同的项。 所有子迭代器可以独立前进,但共享来自``iterable``的相同项–当最先进的迭代器检索到一个项时, 它会被缓冲,直到最不先进的迭代器也将其产出。 tee``是惰性的,并且可以处理无限的``iterable,只要所有迭代器都前进。

async def derivative(sensor_data):
    previous, current = a.tee(sensor_data, n=2)
    await a.anext(previous)  # 推进一个迭代器
    return a.map(operator.sub, previous, current)

itertools.tee() 不同,tee() 返回一个自定义类型而不是 tuple。 像元组一样,它可以被索引、迭代和解包以获取子迭代器。此外,它的 aclose() 方法 立即关闭所有子迭代器,并且可以在 async with 上下文中使用以达到相同效果。

如果``iterable``是一个在其他地方读取的迭代器,``tee``将*不会*提供这些项。另外,``tee``必须在内部缓冲每个项,直到最后一个迭代器产出它; 如果最先进和最不先进的迭代器之间的数据差异很大,使用 list 更有效(但不是惰性)。

如果底层的可迭代对象是并发安全的(anext 可以同时等待),则生成的迭代器也是并发安全的。否则, 如果只有一个单一的“最先进”迭代器,那么迭代器是安全的。为了强制顺序使用 anext,提供一个``lock`` - 例如在 asyncio 应用程序中的一个 asyncio.Lock 实例 - 访问将自动同步。

Methods

__init__(iterable[, n, lock])

close()

Parameters
  • iterable (Iterator[T]) –

  • n (int) –

  • lock (Optional[ContextManager[Any]]) –

__init__(iterable: Iterator[T], n: int = 2, *, lock: Optional[ContextManager[Any]] = None)[source]
Parameters
  • iterable (Iterator[T]) –

  • n (int) –

  • lock (Optional[ContextManager[Any]]) –

close() None[source]
Return type

None