langchain_core.utils.aiter
.Tee¶
- class langchain_core.utils.aiter.Tee(iterable: AsyncIterator[T], n: int = 2, *, lock: Optional[AsyncContextManager[Any]] = None)[source]¶
创建``n``个单独的异步迭代器,遍历``iterable``。
这将一个单一的``iterable``拆分为多个迭代器,每个迭代器以相同的顺序提供相同的项。 所有子迭代器可以独立前进,但共享来自``iterable``的相同项–当最先进的迭代器检索到一个项时, 它会被缓冲,直到最不先进的迭代器也将其产出。
tee``是惰性的,并且可以处理无限的``iterable
,只要所有迭代器都前进。async def derivative(sensor_data): previous, current = a.tee(sensor_data, n=2) await a.anext(previous) # 前进一个迭代器 return a.map(operator.sub, previous, current)
与
itertools.tee()
不同,tee()
返回一个自定义类型而不是tuple
。 像元组一样,它可以被索引、迭代和解包以获取子迭代器。此外,它的aclose()
方法 会立即关闭所有子迭代器,并且可以在``async with``上下文中使用以达到相同效果。如果``iterable``是一个迭代器并且在其他地方读取,``tee``将*不会*提供这些项。另外,``tee``必须在内部缓冲每个项,直到最后一个迭代器产出它; 如果最先进和最不先进的迭代器之间的数据差异很大,使用
list
更有效(但不是惰性)。如果底层的可迭代对象是并发安全的(
anext``可以同时被等待),那么生成的迭代器也是并发安全的。否则, 如果只有一个单一的“最先进”迭代器,那么迭代器是安全的。为了强制顺序使用``anext
,提供一个``lock`` - 例如在asyncio
应用程序中提供一个asyncio.Lock
实例 - 访问会自动同步。Methods
__init__
(iterable[, n, lock])aclose
()- Parameters
iterable (AsyncIterator[T]) –
n (int) –
lock (Optional[AsyncContextManager[Any]]) –