langchain_core.utils.iter
.Tee¶
- class langchain_core.utils.iter.Tee(iterable: Iterator[T], n: int = 2, *, lock: Optional[ContextManager[Any]] = None)[source]¶
创建``n``个单独的异步迭代器,遍历``iterable``
这将一个单一的``iterable``拆分为多个迭代器,每个迭代器按相同顺序提供相同的项。 所有子迭代器可以独立前进,但共享来自``iterable``的相同项–当最先进的迭代器检索到一个项时, 它会被缓冲,直到最不先进的迭代器也将其产出。
tee``是惰性的,并且可以处理无限的``iterable
,只要所有迭代器都前进。async def derivative(sensor_data): previous, current = a.tee(sensor_data, n=2) await a.anext(previous) # 推进一个迭代器 return a.map(operator.sub, previous, current)
与
itertools.tee()
不同,tee()
返回一个自定义类型而不是tuple
。 像元组一样,它可以被索引、迭代和解包以获取子迭代器。此外,它的aclose()
方法 立即关闭所有子迭代器,并且可以在async with
上下文中使用以达到相同效果。如果``iterable``是一个在其他地方读取的迭代器,``tee``将*不会*提供这些项。另外,``tee``必须在内部缓冲每个项,直到最后一个迭代器产出它; 如果最先进和最不先进的迭代器之间的数据差异很大,使用
list
更有效(但不是惰性)。如果底层的可迭代对象是并发安全的(
anext
可以同时等待),则生成的迭代器也是并发安全的。否则, 如果只有一个单一的“最先进”迭代器,那么迭代器是安全的。为了强制顺序使用anext
,提供一个``lock`` - 例如在asyncio
应用程序中的一个asyncio.Lock
实例 - 访问将自动同步。Methods
__init__
(iterable[, n, lock])close
()- Parameters
iterable (Iterator[T]) –
n (int) –
lock (Optional[ContextManager[Any]]) –