Skip to content

Span handlers

BaseSpanHandler #

Bases: BaseModel, Generic[T]

Source code in llama_index/core/instrumentation/span_handlers/base.py
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
class BaseSpanHandler(BaseModel, Generic[T]):
    """Base class that tracks open, completed and dropped spans of type ``T``.

    Subclasses implement ``new_span``, ``prepare_to_exit_span`` and
    ``prepare_to_drop_span``. This class maintains ``open_spans`` and the
    per-thread "current span" id around those hooks.
    """

    open_spans: Dict[str, T] = Field(
        default_factory=dict, description="Dictionary of open spans."
    )
    completed_spans: List[T] = Field(
        default_factory=list, description="List of completed spans."
    )
    dropped_spans: List[T] = Field(
        default_factory=list, description="List of dropped spans."
    )
    # Keyed by ``threading.get_ident()``; the value is the id of the span that
    # is current for that thread (or None).
    current_span_ids: Dict[Any, Optional[str]] = Field(
        default_factory=dict, description="Id of current spans in a given thread."
    )
    _lock: Optional[threading.Lock] = PrivateAttr()

    class Config:
        arbitrary_types_allowed = True

    def __init__(
        self,
        open_spans: Optional[Dict[str, T]] = None,
        completed_spans: Optional[List[T]] = None,
        dropped_spans: Optional[List[T]] = None,
        current_span_ids: Optional[Dict[Any, Optional[str]]] = None,
    ):
        """Initialise the handler.

        Every argument defaults to a fresh empty container; ``None`` is used
        as the sentinel so that no mutable default is shared between calls.
        """
        self._lock = None
        super().__init__(
            open_spans={} if open_spans is None else open_spans,
            completed_spans=[] if completed_spans is None else completed_spans,
            dropped_spans=[] if dropped_spans is None else dropped_spans,
            current_span_ids=(
                {} if current_span_ids is None else current_span_ids
            ),
        )

    @classmethod
    def class_name(cls) -> str:
        """Class name."""
        return "BaseSpanHandler"

    @property
    def lock(self) -> threading.Lock:
        """Lock guarding span bookkeeping; created on first access."""
        if self._lock is None:
            self._lock = threading.Lock()
        return self._lock

    @property
    def current_span_id(self) -> Optional[str]:
        """Id of the current span for the calling thread, or None if unset."""
        return self.current_span_ids.get(threading.get_ident())

    def set_current_span_id(self, value: Optional[str]) -> None:
        """Record ``value`` as the current span id for the calling thread.

        ``None`` is accepted: ``span_exit`` passes it to clear the pointer.
        """
        self.current_span_ids[threading.get_ident()] = value

    def span_enter(
        self,
        id_: str,
        bound_args: inspect.BoundArguments,
        instance: Optional[Any] = None,
        parent_id: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        """Logic for entering a span.

        Does nothing if ``id_`` is already open. Otherwise asks ``new_span``
        for a span and, if one is returned, registers it as open and makes it
        the current span of the calling thread.
        """
        if id_ in self.open_spans:
            # Re-entering an open span is silently ignored
            # (should probably raise an error here).
            return

        # TODO: thread safe?
        span = self.new_span(
            id_=id_,
            bound_args=bound_args,
            instance=instance,
            # Fall back to this thread's current span when no parent is given.
            parent_span_id=parent_id or self.current_span_id,
        )
        if span:
            with self.lock:
                self.open_spans[id_] = span
                self.set_current_span_id(id_)

    def span_exit(
        self,
        id_: str,
        bound_args: inspect.BoundArguments,
        instance: Optional[Any] = None,
        result: Optional[Any] = None,
        **kwargs: Any,
    ) -> None:
        """Logic for exiting a span.

        If ``prepare_to_exit_span`` returns a span, it is removed from
        ``open_spans``; when it was the calling thread's current span, the
        current id is moved to its parent. Once no spans remain open, the
        calling thread's current id is reset to None.
        """
        span = self.prepare_to_exit_span(
            id_=id_, bound_args=bound_args, instance=instance, result=result
        )
        if span:
            with self.lock:
                if self.current_span_id == id_:
                    self.set_current_span_id(self.open_spans[id_].parent_id)
                del self.open_spans[id_]
        if not self.open_spans:  # empty so flush
            with self.lock:
                self.set_current_span_id(None)

    def span_drop(
        self,
        id_: str,
        bound_args: inspect.BoundArguments,
        instance: Optional[Any] = None,
        err: Optional[BaseException] = None,
        **kwargs: Any,
    ) -> None:
        """Logic for dropping a span, i.e. exiting it early.

        If ``prepare_to_drop_span`` returns a span, it is removed from
        ``open_spans``; when it was the calling thread's current span, the
        current id is moved to its parent.
        """
        span = self.prepare_to_drop_span(
            id_=id_, bound_args=bound_args, instance=instance, err=err
        )
        if span:
            with self.lock:
                if self.current_span_id == id_:
                    self.set_current_span_id(self.open_spans[id_].parent_id)
                del self.open_spans[id_]

    @abstractmethod
    def new_span(
        self,
        id_: str,
        bound_args: inspect.BoundArguments,
        instance: Optional[Any] = None,
        parent_span_id: Optional[str] = None,
        **kwargs: Any,
    ) -> Optional[T]:
        """Create a span.

        Subclasses of BaseSpanHandler should create the corresponding span
        type T and return it. Only NullSpanHandler should return None here.
        """
        ...

    @abstractmethod
    def prepare_to_exit_span(
        self,
        id_: str,
        bound_args: inspect.BoundArguments,
        instance: Optional[Any] = None,
        result: Optional[Any] = None,
        **kwargs: Any,
    ) -> Optional[T]:
        """Logic to prepare to exit a span.

        Subclasses of BaseSpanHandler should return the specific span T that
        is being exited. If None is returned, the span is not actually exited.
        """
        ...

    @abstractmethod
    def prepare_to_drop_span(
        self,
        id_: str,
        bound_args: inspect.BoundArguments,
        instance: Optional[Any] = None,
        err: Optional[BaseException] = None,
        **kwargs: Any,
    ) -> Optional[T]:
        """Logic to prepare to drop a span.

        Subclasses of BaseSpanHandler should return the specific span T that
        is being dropped. If None is returned, the span is not actually
        dropped.
        """
        ...

class_name #

class_name() -> str

类名。

Source code in llama_index/core/instrumentation/span_handlers/base.py
45
46
47
def class_name(cls) -> str:
    """Return the name that identifies this handler class."""
    name = "BaseSpanHandler"
    return name

span_enter #

span_enter(
    id_: str,
    bound_args: BoundArguments,
    instance: Optional[Any] = None,
    parent_id: Optional[str] = None,
    **kwargs: Any
) -> None

进入跨度的逻辑。

Source code in llama_index/core/instrumentation/span_handlers/base.py
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
def span_enter(
    self,
    id_: str,
    bound_args: inspect.BoundArguments,
    instance: Optional[Any] = None,
    parent_id: Optional[str] = None,
    **kwargs: Any,
) -> None:
    """Logic for entering a span.

    A span id that is already open is ignored. Otherwise a span is requested
    from ``new_span``; when one is returned it is stored in ``open_spans`` and
    becomes the current span.
    """
    if id_ in self.open_spans:
        return  # should probably raise an error here

    # TODO: thread safe?
    # An explicit parent wins; otherwise nest under the current span.
    effective_parent = parent_id or self.current_span_id
    created = self.new_span(
        id_=id_,
        bound_args=bound_args,
        instance=instance,
        parent_span_id=effective_parent,
    )
    if not created:
        return

    with self.lock:
        self.open_spans[id_] = created
        self.set_current_span_id(id_)

span_exit #

span_exit(
    id_: str,
    bound_args: BoundArguments,
    instance: Optional[Any] = None,
    result: Optional[Any] = None,
    **kwargs: Any
) -> None

退出跨度的逻辑。

Source code in llama_index/core/instrumentation/span_handlers/base.py
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
def span_exit(
    self,
    id_: str,
    bound_args: inspect.BoundArguments,
    instance: Optional[Any] = None,
    result: Optional[Any] = None,
    **kwargs: Any,
) -> None:
    """Logic for exiting a span.

    When ``prepare_to_exit_span`` yields a span, that span is removed from
    ``open_spans`` and, if it was the current span, the current id moves to
    its parent. When no spans are left open the current id is reset to None.
    """
    exiting = self.prepare_to_exit_span(
        id_=id_,
        bound_args=bound_args,
        instance=instance,
        result=result,
    )

    if exiting:
        with self.lock:
            was_current = self.current_span_id == id_
            if was_current:
                parent_of_exiting = self.open_spans[id_].parent_id
                self.set_current_span_id(parent_of_exiting)
            del self.open_spans[id_]

    nothing_open = not self.open_spans
    if nothing_open:  # empty so flush
        with self.lock:
            self.set_current_span_id(None)

span_drop #

span_drop(
    id_: str,
    bound_args: BoundArguments,
    instance: Optional[Any] = None,
    err: Optional[BaseException] = None,
    **kwargs: Any
) -> None

丢弃跨度(即提前退出)的逻辑。

Source code in llama_index/core/instrumentation/span_handlers/base.py
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
def span_drop(
    self,
    id_: str,
    bound_args: inspect.BoundArguments,
    instance: Optional[Any] = None,
    err: Optional[BaseException] = None,
    **kwargs: Any,
) -> None:
    """Logic for dropping a span, i.e. exiting it early.

    When ``prepare_to_drop_span`` yields a span, that span is removed from
    ``open_spans`` and, if it was the current span, the current id moves to
    its parent. A falsy result leaves all bookkeeping untouched.
    """
    dropped = self.prepare_to_drop_span(
        id_=id_,
        bound_args=bound_args,
        instance=instance,
        err=err,
    )
    if not dropped:
        return

    with self.lock:
        was_current = self.current_span_id == id_
        if was_current:
            parent_of_dropped = self.open_spans[id_].parent_id
            self.set_current_span_id(parent_of_dropped)
        del self.open_spans[id_]

new_span abstractmethod #

new_span(
    id_: str,
    bound_args: BoundArguments,
    instance: Optional[Any] = None,
    parent_span_id: Optional[str] = None,
    **kwargs: Any
) -> Optional[T]

创建一个span。

BaseSpanHandler的子类应该创建相应的span类型T并返回它。只有NullSpanHandler应该在这里返回None。

Source code in llama_index/core/instrumentation/span_handlers/base.py
129
130
131
132
133
134
135
136
137
138
139
140
141
142
    @abstractmethod
    def new_span(
        self,
        id_: str,
        bound_args: inspect.BoundArguments,
        instance: Optional[Any] = None,
        parent_span_id: Optional[str] = None,
        **kwargs: Any,
    ) -> Optional[T]:
        """Create a span.

        Subclasses of BaseSpanHandler should create the corresponding span
        type T and return it. Only NullSpanHandler should return None here.
        """
        ...

prepare_to_exit_span abstractmethod #

prepare_to_exit_span(
    id_: str,
    bound_args: BoundArguments,
    instance: Optional[Any] = None,
    result: Optional[Any] = None,
    **kwargs: Any
) -> Optional[T]

退出跨度的准备逻辑。

BaseSpanHandler的子类应该返回要退出的特定跨度T。如果返回None,则实际上不会退出该跨度。

Source code in llama_index/core/instrumentation/span_handlers/base.py
144
145
146
147
148
149
150
151
152
153
154
155
156
157
    @abstractmethod
    def prepare_to_exit_span(
        self,
        id_: str,
        bound_args: inspect.BoundArguments,
        instance: Optional[Any] = None,
        result: Optional[Any] = None,
        **kwargs: Any,
    ) -> Optional[T]:
        """Logic to prepare to exit a span.

        Subclasses of BaseSpanHandler should return the specific span T that
        is being exited. If None is returned, the span is not actually exited.
        """
        ...

prepare_to_drop_span abstractmethod #

prepare_to_drop_span(
    id_: str,
    bound_args: BoundArguments,
    instance: Optional[Any] = None,
    err: Optional[BaseException] = None,
    **kwargs: Any
) -> Optional[T]

准备放弃跨度的逻辑。

BaseSpanHandler的子类应该返回要放弃的特定跨度T。如果返回None,则实际上不会放弃该跨度。

Source code in llama_index/core/instrumentation/span_handlers/base.py
159
160
161
162
163
164
165
166
167
168
169
170
171
172
    @abstractmethod
    def prepare_to_drop_span(
        self,
        id_: str,
        bound_args: inspect.BoundArguments,
        instance: Optional[Any] = None,
        err: Optional[BaseException] = None,
        **kwargs: Any,
    ) -> Optional[T]:
        """Logic to prepare to drop a span.

        Subclasses of BaseSpanHandler should return the specific span T that
        is being dropped. If None is returned, the span is not actually
        dropped.
        """
        ...

SimpleSpanHandler #

Bases: BaseSpanHandler[SimpleSpan]

跨度处理程序,管理SimpleSpan。

Source code in llama_index/core/instrumentation/span_handlers/simple.py
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
class SimpleSpanHandler(BaseSpanHandler[SimpleSpan]):
    """Span handler that manages SimpleSpan's."""

    @classmethod
    def class_name(cls) -> str:
        """Class name."""
        return "SimpleSpanHandler"

    def new_span(
        self,
        id_: str,
        bound_args: inspect.BoundArguments,
        instance: Optional[Any] = None,
        parent_span_id: Optional[str] = None,
        **kwargs: Any,
    ) -> SimpleSpan:
        """Create a span with the given id and parent id."""
        return SimpleSpan(id_=id_, parent_id=parent_span_id)

    def prepare_to_exit_span(
        self,
        id_: str,
        bound_args: inspect.BoundArguments,
        instance: Optional[Any] = None,
        result: Optional[Any] = None,
        **kwargs: Any,
    ) -> SimpleSpan:
        """Logic to prepare to exit a span.

        Stamps ``end_time`` and ``duration`` (in seconds) on the open span
        and appends it to ``completed_spans``.

        Raises:
            KeyError: if ``id_`` is not in ``open_spans``.
        """
        span = self.open_spans[id_]
        span = cast(SimpleSpan, span)
        span.end_time = datetime.now()
        span.duration = (span.end_time - span.start_time).total_seconds()
        with self.lock:
            self.completed_spans += [span]
        return span

    def prepare_to_drop_span(
        self,
        id_: str,
        bound_args: inspect.BoundArguments,
        instance: Optional[Any] = None,
        err: Optional[BaseException] = None,
        **kwargs: Any,
    ) -> Optional[SimpleSpan]:
        """Logic to prepare to drop a span.

        Records ``str(err)`` under the ``"error"`` key of the span's metadata
        (replacing any existing metadata) and appends the span to
        ``dropped_spans``.

        Returns:
            The dropped span, or None when ``id_`` is not currently open.
        """
        if id_ not in self.open_spans:
            return None

        with self.lock:
            span = self.open_spans[id_]
            span.metadata = {"error": str(err)}
            self.dropped_spans += [span]
        return span

    def _get_parents(self) -> List[SimpleSpan]:
        """Helper that returns all root spans (those without a parent)."""
        all_spans = self.completed_spans + self.dropped_spans
        return [s for s in all_spans if s.parent_id is None]

    def _build_tree_by_parent(
        self, parent: SimpleSpan, acc: List[SimpleSpan], spans: List[SimpleSpan]
    ) -> List[SimpleSpan]:
        """Collect ``parent``'s descendants from ``spans``, depth first.

        Returns ``acc`` followed by every descendant found; ``acc`` is
        expected to already contain ``parent``.
        """
        if not spans:
            return acc

        children = [s for s in spans if s.parent_id == parent.id_]
        if not children:
            return acc
        updated_spans = [s for s in spans if s not in children]

        children_trees = [
            self._build_tree_by_parent(
                parent=c, acc=[c], spans=[s for s in updated_spans if c != s]
            )
            for c in children
        ]

        # Flatten the per-child subtrees behind the accumulated prefix.
        return acc + [s for subtree in children_trees for s in subtree]

    def _get_trace_trees(self) -> List["Tree"]:
        """Build one ``treelib`` tree per root span.

        Raises:
            ImportError: if the optional ``treelib`` package is not installed.
        """
        try:
            from treelib import Tree
        except ImportError as e:
            raise ImportError(
                "`treelib` package is missing. Please install it by using "
                "`pip install treelib`."
            ) from e

        all_spans = self.completed_spans + self.dropped_spans
        # Iterate over a snapshot: placeholder parents are appended to
        # ``all_spans`` inside the loop.
        for s in list(all_spans):
            if s.parent_id is None:
                continue
            if not any(ns.id_ == s.parent_id for ns in all_spans):
                warnings.warn(f"Parent with id {s.parent_id} missing from spans")
                # Re-parent the orphan onto a synthetic placeholder root.
                # This mutates the stored span's ``parent_id``.
                s.parent_id += "-MISSING"
                all_spans.append(SimpleSpan(id_=s.parent_id, parent_id=None))

        # NOTE(review): ``_get_parents`` only looks at completed/dropped spans,
        # so the placeholder roots created above are not among ``parents`` and
        # orphaned subtrees do not appear in the returned trees -- confirm
        # whether that is intended.
        parents = self._get_parents()
        span_groups = []
        for p in parents:
            this_span_group = self._build_tree_by_parent(
                parent=p, acc=[p], spans=[s for s in all_spans if s != p]
            )
            sorted_span_group = sorted(this_span_group, key=lambda x: x.start_time)
            span_groups.append(sorted_span_group)

        trees = []
        tree = Tree()
        for grp in span_groups:
            for span in grp:
                if span.parent_id is None:
                    # complete old tree unless its empty (i.e., start of loop)
                    if tree.all_nodes():
                        trees.append(tree)
                        # start new tree
                        tree = Tree()

                tree.create_node(
                    tag=f"{span.id_} ({span.duration})",
                    identifier=span.id_,
                    parent=span.parent_id,
                    data=span.start_time,
                )

        trees.append(tree)
        return trees

    def print_trace_trees(self) -> None:
        """Print every trace tree, each followed by a blank line."""
        trees = self._get_trace_trees()
        for tree in trees:
            print(tree.show(stdout=False, sorting=True, key=lambda node: node.data))
            print("")

class_name #

class_name() -> str

类名。

Source code in llama_index/core/instrumentation/span_handlers/simple.py
16
17
18
def class_name(cls) -> str:
    """Return the name that identifies this handler class."""
    name = "SimpleSpanHandler"
    return name

new_span #

new_span(
    id_: str,
    bound_args: BoundArguments,
    instance: Optional[Any] = None,
    parent_span_id: Optional[str] = None,
    **kwargs: Any
) -> SimpleSpan

创建一个跨度。

Source code in llama_index/core/instrumentation/span_handlers/simple.py
20
21
22
23
24
25
26
27
28
29
def new_span(
    self,
    id_: str,
    bound_args: inspect.BoundArguments,
    instance: Optional[Any] = None,
    parent_span_id: Optional[str] = None,
    **kwargs: Any,
) -> SimpleSpan:
    """Create a SimpleSpan carrying the given id and parent id."""
    created = SimpleSpan(id_=id_, parent_id=parent_span_id)
    return created

prepare_to_exit_span #

prepare_to_exit_span(
    id_: str,
    bound_args: BoundArguments,
    instance: Optional[Any] = None,
    result: Optional[Any] = None,
    **kwargs: Any
) -> SimpleSpan

准备退出跨度的逻辑。

Source code in llama_index/core/instrumentation/span_handlers/simple.py
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
def prepare_to_exit_span(
    self,
    id_: str,
    bound_args: inspect.BoundArguments,
    instance: Optional[Any] = None,
    result: Optional[Any] = None,
    **kwargs: Any,
) -> SimpleSpan:
    """Logic to prepare to exit a span.

    Stamps the end time and the duration (in seconds) on the open span with
    id ``id_``, appends it to ``completed_spans`` and returns it.
    """
    finished = cast(SimpleSpan, self.open_spans[id_])
    finished.end_time = datetime.now()
    elapsed = finished.end_time - finished.start_time
    finished.duration = elapsed.total_seconds()
    with self.lock:
        self.completed_spans += [finished]
    return finished

prepare_to_drop_span #

prepare_to_drop_span(
    id_: str,
    bound_args: BoundArguments,
    instance: Optional[Any] = None,
    err: Optional[BaseException] = None,
    **kwargs: Any
) -> SimpleSpan

丢弃跨度的逻辑。

Source code in llama_index/core/instrumentation/span_handlers/simple.py
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
def prepare_to_drop_span(
    self,
    id_: str,
    bound_args: inspect.BoundArguments,
    instance: Optional[Any] = None,
    err: Optional[BaseException] = None,
    **kwargs: Any,
) -> Optional[SimpleSpan]:
    """Logic to prepare to drop a span.

    Records ``str(err)`` under the ``"error"`` key of the span's metadata
    (replacing any existing metadata) and appends the span to
    ``dropped_spans``.

    Returns:
        The dropped span, or None when ``id_`` is not currently open.
    """
    if id_ not in self.open_spans:
        return None

    with self.lock:
        span = self.open_spans[id_]
        span.metadata = {"error": str(err)}
        self.dropped_spans += [span]
    return span

print_trace_trees #

print_trace_trees() -> None

查看跟踪树的方法。

Source code in llama_index/core/instrumentation/span_handlers/simple.py
141
142
143
144
145
146
def print_trace_trees(self) -> None:
    """Print every trace tree, each followed by a blank line."""
    for trace_tree in self._get_trace_trees():
        rendered = trace_tree.show(
            stdout=False,
            sorting=True,
            key=lambda node: node.data,
        )
        print(rendered)
        print("")