Tee#
- class langchain_core.utils.iter.Tee(iterable: Iterator[T], n: int = 2, *, lock: AbstractContextManager[Any] | None = None)[源代码]#
创建
n
个独立的异步迭代器,用于iterable
这将一个单一的
iterable
分割成多个迭代器,每个迭代器以相同的顺序提供相同的项目。 所有子迭代器可以独立前进,但共享来自iterable
的相同项目——当最先进的迭代器检索一个项目时,它会被缓冲,直到最不先进的迭代器也产生了它。 一个tee
是惰性的,并且可以处理无限的iterable
,前提是所有迭代器都前进。async def derivative(sensor_data): previous, current = a.tee(sensor_data, n=2) await a.anext(previous) # advance one iterator return a.map(operator.sub, previous, current)
与
itertools.tee()
不同,tee()
返回一个自定义类型,而不是一个tuple
。与元组类似,它可以被索引、迭代和解包以获取子迭代器。此外,它的aclose()
方法会立即关闭所有子迭代器,并且可以在async with
上下文中使用以达到相同的效果。如果
iterable
是一个迭代器并且在其他地方被读取,tee
将 不会 提供这些项目。此外,tee
必须在内部缓冲每个项目,直到最后一个迭代器产生它;如果最先进和最不先进的迭代器在大多数数据上有所不同,使用list
会更高效(但不是惰性的)。如果底层可迭代对象是并发安全的(
anext
可以并发等待),那么生成的迭代器也是并发安全的。否则,如果只有一个“最先进”的迭代器,那么迭代器是安全的。为了强制顺序使用anext
,可以提供一个lock
- 例如在asyncio
应用程序中的asyncio.Lock
实例 - 访问将自动同步。创建一个新的
tee
。- Parameters:
iterable (Iterator[T]) – 要分割的可迭代对象。
n (int) – 要创建的迭代器数量。默认为2。
lock (AbstractContextManager[Any] | None) – 用于同步访问共享缓冲区的锁。默认为None。
方法