Skip to main content

通过检索增强生成进行群聊

在 Colab 中打开 在 GitHub 上打开

AutoGen 支持由 LLMs、工具或人类驱动的可对话代理,通过自动聊天共同执行任务。该框架允许通过多代理对话进行工具使用和人类参与。有关此功能的文档,请参阅此处

要求

此笔记本需要一些额外的依赖项,可以通过 pip 安装:

pip install pyautogen[retrievechat]

有关更多信息,请参阅安装指南

设置 API 端点

config_list_from_json 函数从环境变量或 json 文件中加载配置列表。

import chromadb
from typing_extensions import Annotated

import autogen
from autogen import AssistantAgent
from autogen.agentchat.contrib.retrieve_user_proxy_agent import RetrieveUserProxyAgent

config_list = autogen.config_list_from_json("OAI_CONFIG_LIST")

print("LLM models: ", [config_list[i]["model"] for i in range(len(config_list))])
LLM models:  ['gpt4-1106-preview', 'gpt-35-turbo', 'gpt-35-turbo-0613']
tip

了解有关为代理配置 LLM 的更多信息,请参阅此处

构建代理

def termination_msg(x):
return isinstance(x, dict) and "TERMINATE" == str(x.get("content", ""))[-9:].upper()


llm_config = {"config_list": config_list, "timeout": 60, "temperature": 0.8, "seed": 1234}

boss = autogen.UserProxyAgent(
name="Boss",
is_termination_msg=termination_msg,
human_input_mode="NEVER",
code_execution_config=False, # we don't want to execute code in this case.
default_auto_reply="Reply `TERMINATE` if the task is done.",
description="The boss who ask questions and give tasks.",
)

boss_aid = RetrieveUserProxyAgent(
name="Boss_Assistant",
is_termination_msg=termination_msg,
human_input_mode="NEVER",
default_auto_reply="Reply `TERMINATE` if the task is done.",
max_consecutive_auto_reply=3,
retrieve_config={
"task": "code",
"docs_path": "https://raw.githubusercontent.com/microsoft/FLAML/main/website/docs/Examples/Integrate%20-%20Spark.md",
"chunk_token_size": 1000,
"model": config_list[0]["model"],
"collection_name": "groupchat",
"get_or_create": True,
},
code_execution_config=False, # we don't want to execute code in this case.
description="Assistant who has extra content retrieval power for solving difficult problems.",
)

coder = AssistantAgent(
name="Senior_Python_Engineer",
is_termination_msg=termination_msg,
system_message="You are a senior python engineer, you provide python code to answer questions. Reply `TERMINATE` in the end when everything is done.",
llm_config=llm_config,
description="Senior Python Engineer who can write code to solve problems and answer questions.",
)

pm = autogen.AssistantAgent(
name="Product_Manager",
is_termination_msg=termination_msg,
system_message="You are a product manager. Reply `TERMINATE` in the end when everything is done.",
llm_config=llm_config,
description="Product Manager who can design and plan the project.",
)

reviewer = autogen.AssistantAgent(
name="Code_Reviewer",
is_termination_msg=termination_msg,
system_message="You are a code reviewer. Reply `TERMINATE` in the end when everything is done.",
llm_config=llm_config,
description="Code Reviewer who can review the code.",
)

PROBLEM = "How to use spark for parallel training in FLAML? Give me sample code."


def _reset_agents():
boss.reset()
boss_aid.reset()
coder.reset()
pm.reset()
reviewer.reset()


def rag_chat():
_reset_agents()
groupchat = autogen.GroupChat(
agents=[boss_aid, pm, coder, reviewer], messages=[], max_round=12, speaker_selection_method="round_robin"
)
manager = autogen.GroupChatManager(groupchat=groupchat, llm_config=llm_config)

# Start chatting with boss_aid as this is the user proxy agent.
boss_aid.initiate_chat(
manager,
message=boss_aid.message_generator,
problem=PROBLEM,
n_results=3,
)


def norag_chat():
_reset_agents()
groupchat = autogen.GroupChat(
agents=[boss, pm, coder, reviewer],
messages=[],
max_round=12,
speaker_selection_method="auto",
allow_repeat_speaker=False,
)
manager = autogen.GroupChatManager(groupchat=groupchat, llm_config=llm_config)

# Start chatting with the boss as this is the user proxy agent.
boss.initiate_chat(
manager,
message=PROBLEM,
)


def call_rag_chat():
_reset_agents()

# In this case, we will have multiple user proxy agents and we don't initiate the chat
# with RAG user proxy agent.
# In order to use RAG user proxy agent, we need to wrap RAG agents in a function and call
# it from other agents.
def retrieve_content(
message: Annotated[
str,
"Refined message which keeps the original meaning and can be used to retrieve content for code generation and question answering.",
],
n_results: Annotated[int, "number of results"] = 3,
) -> str:
boss_aid.n_results = n_results # Set the number of results to be retrieved.
# Check if we need to update the context.
update_context_case1, update_context_case2 = boss_aid._check_update_context(message)
if (update_context_case1 or update_context_case2) and boss_aid.update_context:
boss_aid.problem = message if not hasattr(boss_aid, "problem") else boss_aid.problem
_, ret_msg = boss_aid._generate_retrieve_user_reply(message)
else:
_context = {"problem": message, "n_results": n_results}
ret_msg = boss_aid.message_generator(boss_aid, None, _context)
return ret_msg if ret_msg else message

boss_aid.human_input_mode = "NEVER" # Disable human input for boss_aid since it only retrieves content.

for caller in [pm, coder, reviewer]:
d_retrieve_content = caller.register_for_llm(
description="retrieve content for code generation and question answering.", api_style="function"
)(retrieve_content)

for executor in [boss, pm]:
executor.register_for_execution()(d_retrieve_content)

groupchat = autogen.GroupChat(
agents=[boss, pm, coder, reviewer],
messages=[],
max_round=12,
speaker_selection_method="round_robin",
allow_repeat_speaker=False,
)

manager = autogen.GroupChatManager(groupchat=groupchat, llm_config=llm_config)

# Start chatting with the boss as this is the user proxy agent.
boss.initiate_chat(
manager,
message=PROBLEM,
)
!pip install flaml pyspark joblib-spark

首先,确保你已经设置好了 Spark 集群,并在你的环境中安装了 pysparkjoblib-spark 包。如果它们尚未安装,你可以使用 pip 进行安装:

!pip install flaml pyspark joblib-spark

下面是一个示例代码片段,演示了如何在 FLAML 中使用 Spark 进行并行训练:

from flaml import AutoML
from pyspark.sql import SparkSession
from sklearn.datasets import load_digits
from joblibspark import register_spark

# 初始化 Spark 会话
spark = SparkSession.builder \
.master("local[*]") \
.appName("FLAML_Spark_Example") \
.getOrCreate()

# 注册 joblib 的 Spark 后端
register_spark() # 这会注册用于并行处理的后端

# 加载示例数据
X, y = load_digits(return_X_y=True)

# 初始化 AutoML 实例
automl = AutoML()

# 定义 AutoML 运行的设置
settings = {
"time_budget": 60, # 总运行时间(秒)
"metric": 'accuracy', # 评估的主要指标
"task": 'classification', # 任务类型
"n_jobs": -1, # 并行运行的作业数(使用 -1 表示全部)
"estimator_list": ['lgbm', 'rf', 'xgboost'], # 要考虑的估计器列表
"log_file_name": "flaml_log.txt", # 日志文件名
}

# 使用 Spark 后端运行 AutoML 搜索
automl.fit(X_train=X, y_train=y, **settings)

# 输出最佳模型及其性能
print(f"最佳机器学习模型:{automl.model}")
print(f"最佳机器学习模型的准确率:{automl.best_loss}")

# 停止 Spark 会话
spark.stop()

这段代码演示了如何使用 Spark 进行并行训练。你需要先初始化一个 Spark 会话,然后注册 joblib 的 Spark 后端,接着加载数据并初始化 AutoML 实例。最后,通过调用 fit 方法并传入相应的参数,即可使用 Spark 后端运行 AutoML 搜索。搜索完成后,你可以输出最佳模型及其性能,并停止 Spark 会话。 joblib-spark 中的 register_spark() 函数用于将 Spark 后端与 joblib 注册,该后端在 FLAML 中用于并行训练。n_jobs=-1 参数告诉 FLAML 使用所有可用的 Spark 执行器进行并行训练。

请注意,设置 Spark 集群的实际过程可能很复杂,并可能涉及其他步骤,例如配置 Spark 工作节点、分配资源等,这超出了本代码片段的范围。

如果遇到任何问题或需要调整特定 Spark 设置的配置,请参考 Spark 和 FLAML 文档以获取更多详细信息。

在运行代码时,请确保您的 Spark 集群已正确配置并且可以从您的 Python 环境访问。如果您运行的是非本地集群,请将 .master("local[*]") 调整为指向您的 Spark 主节点的 URL。


要在 FLAML(快速轻量级自动机器学习)中使用 Spark 进行并行训练,您需要设置一个 Spark 集群,并使用 spark 后端进行 joblib,FLAML 在内部使用该后端进行并行训练。以下是如何设置和使用 Spark 与 FLAML 进行自动机器学习任务的示例:

首先,确保您已经设置好了 Spark 集群,并在您的环境中安装了 pysparkjoblib-spark 包。如果这些包尚未安装,您可以使用 pip 安装所需的包:

!pip install flaml pyspark joblib-spark

下面是一个示例代码片段,演示了如何在 Spark 中使用 FLAML 进行并行训练:

from flaml import AutoML
from pyspark.sql import SparkSession
from sklearn.datasets import load_digits
from joblibspark import register_spark

# 初始化 Spark 会话
spark = SparkSession.builder \
.master("local[*]") \
.appName("FLAML_Spark_Example") \
.getOrCreate()

# 注册 joblib spark 后端
register_spark() # 这将注册用于并行处理的后端

# 加载示例数据
X, y = load_digits(return_X_y=True)

# 初始化 AutoML 实例
automl = AutoML()

# 定义 AutoML 运行的设置
settings = {
"time_budget": 60, # 总运行时间(秒)
"metric": 'accuracy', # 评估的主要指标
"task": 'classification', # 任务类型
"n_jobs": -1, # 并行运行的作业数(使用 -1 表示全部)
"estimator_list": ['lgbm', 'rf', 'xgboost'], # 要考虑的估计器列表
"log_file_name": "flaml_log.txt", # 日志文件名
}

# 使用 Spark 后端运行 AutoML 搜索
automl.fit(X_train=X, y_train=y, **settings)

# 输出最佳模型及其性能
print(f"最佳机器学习模型:{automl.model}")
print(f"最佳机器学习模型的准确率:{automl.best_loss}")

# 停止 Spark 会话
spark.stop()

joblib-spark 中的 register_spark() 函数用于将 Spark 后端与 joblib 注册,用于 FLAML 中的并行训练。n_jobs=-1 参数告诉 FLAML 使用所有可用的 Spark 执行器进行并行训练。

请注意,设置 Spark 集群的实际过程可能很复杂,可能涉及配置 Spark 工作节点、分配资源等额外步骤,这超出了本代码片段的范围。

如果遇到任何问题或需要调整特定 Spark 设置的配置,请参考 Spark 和 FLAML 文档以获取更多详细信息。

在运行代码时,请确保您的 Spark 集群已正确配置,并且可以从 Python 环境访问。如果您运行的是非本地集群,请将 .master("local[*]") 调整为指向您的 Spark 主节点的 URL。


Code_Reviewer (to chat_manager):

TERMINATE



### RetrieveUserProxyAgent 获取正确的代码

由于 RetrieveUserProxyAgent 可以根据给定的文档文件执行检索增强生成,ChatGPT 可以为我们生成正确的代码!

```python
rag_chat()
# 输入 exit 以终止对话
```

``` text
2024-04-07 18:26:04,562 - autogen.agentchat.contrib.retrieve_user_proxy_agent - INFO - 使用现有的集合 `groupchat`。
```

``` text
正在尝试创建集合。
```
2024年4月7日18:26:05,485 - autogen.agentchat.contrib.retrieve_user_proxy_agent - INFO - 发现1个块。
请求的结果数量为3,大于索引中的元素数量1,更新n_results = 1
未找到模型gpt4-1106-preview。使用cl100k_base编码。
VectorDB 返回 doc_ids: [['bdfbc921']]
将文档 bdfbc921 的内容添加到上下文中。
Boss_Assistant(对 chat_manager 说):

你是一个增强型的检索编码助手。你根据自己的知识和用户提供的上下文来回答用户的问题。
如果你无法根据当前上下文回答问题,你应该回复“UPDATE CONTEXT”。
对于代码生成,你必须遵守以下规则:
规则 1. 你不得安装任何包,因为所需的所有包都已经安装好了。
规则 2. 你必须按照以下格式编写你的代码:
```language
# your code
```

用户的问题是:如何在 FLAML 中使用 Spark 进行并行训练?给我一个示例代码。

上下文是:# 集成 - Spark

FLAML 已经集成了 Spark 用于分布式训练。Spark 集成主要有两个方面:

- 使用 Spark ML 估计器进行自动机器学习。
- 使用 Spark 运行并行的 Spark 作业进行训练。

## Spark ML 估计器

FLAML 集成了基于 Spark ML 模型的估计器。这些模型使用 Spark 并行训练,因此我们称之为 Spark 估计器。要使用这些模型,首先需要按照所需的格式组织数据。

### 数据

对于 Spark 估计器,AutoML 只能使用 Spark 数据。FLAML 在 `flaml.automl.spark.utils` 模块中提供了一个方便的函数 `to_pandas_on_spark`,用于将数据转换为 pandas-on-spark (`pyspark.pandas`) 数据帧/系列,这是 Spark 估计器所需的。

这个实用函数接受以 `pandas.Dataframe` 或 `pyspark.sql.Dataframe` 形式的数据,并将其转换为 pandas-on-spark 数据帧。它还接受 `pandas.Series` 或 `pyspark.sql.Dataframe` 并将其转换为 [pandas-on-spark](https://spark.apache.org/docs/latest/api/python/user_guide/pandas_on_spark/index.html) 系列。如果传入 `pyspark.pandas.Dataframe`,它将不会进行任何更改。

这个函数还接受可选参数 `index_col` 和 `default_index_type`。

- `index_col` 是要用作索引的列名,默认为 None。
- `default_index_type` 是默认的索引类型,默认为 "distributed-sequence"。有关默认索引类型的更多信息,请参阅 Spark 官方 [文档](https://spark.apache.org/docs/latest/api/python/user_guide/pandas_on_spark/options.html#default-index-type)

下面是一个使用 Spark 数据的示例代码片段:

```python
import pandas as pd
from flaml.automl.spark.utils import to_pandas_on_spark

# 创建一个字典
data = {
"Square_Feet": [800, 1200, 1800, 1500, 850],
"Age_Years": [20, 15, 10, 7, 25],
"Price": [100000, 200000, 300000, 240000, 120000],
}

# 创建一个 pandas 数据帧
dataframe = pd.DataFrame(data)
label = "Price"

# 转换为 pandas-on-spark 数据帧
psdf = to_pandas_on_spark(dataframe)
要使用Spark ML模型,您需要适当地格式化数据。具体来说,使用[`VectorAssembler`](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.VectorAssembler.html)将所有特征列合并为一个向量列。

以下是如何使用它的示例:

```python
```python
from pyspark.ml.feature import VectorAssembler

columns = psdf.columns
feature_cols = [col for col in columns if col != label]
featurizer = VectorAssembler(inputCols=feature_cols, outputCol="features")
psdf = featurizer.transform(psdf.to_spark(index_col="index"))["index", "features"]
```

在进行实验时,可以像处理非 Spark 数据一样使用你的 pandas-on-spark 数据,并使用 `X_train, y_train` 或 `dataframe, label` 进行传递。

### 估计器

#### 模型列表

- `lgbm_spark`:用于微调 Spark 版本 LightGBM 模型的类,使用 [SynapseML](https://microsoft.github.io/SynapseML/docs/features/lightgbm/about/) API。

#### 用法

首先,按照前面部分所述的要求格式准备你的数据。

通过将你打算尝试的模型包含在 `estimators_list` 参数中传递给 `flaml.automl`,FLAML 将开始尝试这些模型的配置。如果你的输入是 Spark 数据,FLAML 默认还会使用带有 `_spark` 后缀的估计器,即使你没有指定它们。

下面是一个使用 SparkML 模型的示例代码片段:

```python
import flaml

# 按照之前提到的方式,将你的数据准备成 pandas-on-spark 格式

automl = flaml.AutoML()
settings = {
"time_budget": 30,
"metric": "r2",
"estimator_list": ["lgbm_spark"], # 这个设置是可选的
"task": "regression",
}

automl.fit(
dataframe=psdf,
label=label,
**settings,
)
[链接到笔记本](https://github.com/microsoft/FLAML/blob/main/notebook/automl_bankrupt_synapseml.ipynb) | [在Colab中打开](https://colab.research.google.com/github/microsoft/FLAML/blob/main/notebook/automl_bankrupt_synapseml.ipynb)

## 并行 Spark 作业

您可以在[自动机器学习](/docs/Use-Cases/Task-Oriented-AutoML#parallel-tuning)和[超参数调整](/docs/Use-Cases/Tune-User-Defined-Function#parallel-tuning)中将 Spark 激活为并行后端,方法是将 `use_spark` 设置为 `true`。FLAML 将使用 [`joblib-spark`](https://github.com/joblib/joblib-spark) 将您的作业分发到分布式 Spark 后端。

请注意,当应用于 Spark 数据的自动机器学习和调整时,不应将 `use_spark` 设置为 `true`。这是因为自动机器学习和调整中仅使用 SparkML 模型来处理 Spark 数据。由于 SparkML 模型可以并行运行,因此不需要再使用 `use_spark` 进行分发。

下面列出了所有与 Spark 相关的参数。这些参数在超参数调整和自动机器学习中都可用:

- `use_spark`:布尔值,默认为 False | 是否使用 Spark 在并行 Spark 作业中运行训练。这可以用于加速大型模型和大型数据集的训练,但会增加时间开销,从而在某些情况下减慢训练速度。当 `use_spark` 为 True 时,不支持 GPU 训练。对于 Spark 集群,默认情况下,我们将为每个执行器启动一个试验。但是,有时我们希望启动的试验数量超过执行器的数量(例如,本地模式)。在这种情况下,我们可以设置环境变量 `FLAML_MAX_CONCURRENT` 来覆盖检测到的 `num_executors`。最终的并发试验数量将是 `n_concurrent_trials` 和 `num_executors` 中的最小值。
- `n_concurrent_trials`:整数,默认为 1 | 并发试验的数量。当 n_concurrent_trials > 1 时,FLAML 执行并行调整。
- `force_cancel`:布尔值,默认为 False | 如果搜索时间超过时间预算,则是否强制取消 Spark 作业。Spark 作业包括并行调整作业和基于 Spark 的模型训练作业。

使用并行 Spark 作业的示例代码段:

```python
```python
import flaml

automl_experiment = flaml.AutoML()
automl_settings = {
"time_budget": 30,
"metric": "r2",
"task": "regression",
"n_concurrent_trials": 2,
"use_spark": True,
"force_cancel": True, # 激活 force_cancel 选项可以在超过分配的 time_budget 后立即停止 Spark 作业。
}

automl.fit(
dataframe=dataframe,
label=label,
**automl_settings,
)
```

[笔记本链接](https://github.com/microsoft/FLAML/blob/main/notebook/integrate_spark.ipynb) | [在 Colab 中打开](https://colab.research.google.com/github/microsoft/FLAML/blob/main/notebook/integrate_spark.ipynb)
```
用户的问题是:如何在FLAML中使用Spark进行并行训练?给我一个示例代码。

背景是:# 集成 - Spark

FLAML已经集成了Spark用于分布式训练。与Spark的集成有两个主要方面:

- 使用Spark ML估计器进行自动机器学习。
- 使用Spark运行并行的Spark作业进行训练。

## Spark ML估计器

FLAML集成了基于Spark ML模型的估计器。这些模型使用Spark并行训练,因此我们称之为Spark估计器。要使用这些模型,您首先需要按照所需的格式组织数据。

### 数据

对于Spark估计器,AutoML只能使用Spark数据。FLAML在`flaml.automl.spark.utils`模块中提供了一个方便的函数`to_pandas_on_spark`,用于将您的数据转换为pandas-on-spark(`pyspark.pandas`)数据帧/系列,这是Spark估计器所需的。

此实用函数接受`pandas.Dataframe`或`pyspark.sql.Dataframe`形式的数据,并将其转换为pandas-on-spark数据帧。它还接受`pandas.Series`或`pyspark.sql.Dataframe`并将其转换为[pandas-on-spark](https://spark.apache.org/docs/latest/api/python/user_guide/pandas_on_spark/index.html)系列。如果传入`pyspark.pandas.Dataframe`,它将不会进行任何更改。

此函数还接受可选参数`index_col`和`default_index_type`。

- `index_col`是要用作索引的列名,默认为None。
- `default_index_type`是默认的索引类型,默认为"distributed-sequence"。有关默认索引类型的更多信息,请参阅Spark官方[文档](https://spark.apache.org/docs/latest/api/python/user_guide/pandas_on_spark/options.html#default-index-type)

下面是一个Spark数据的示例代码片段:

```python
```python
import pandas as pd
from flaml.automl.spark.utils import to_pandas_on_spark

# 创建一个字典
data = {
"Square_Feet": [800, 1200, 1800, 1500, 850],
"Age_Years": [20, 15, 10, 7, 25],
"Price": [100000, 200000, 300000, 240000, 120000],
}

# 创建一个 pandas DataFrame
dataframe = pd.DataFrame(data)
label = "Price"

# 转换为 pandas-on-spark DataFrame
psdf = to_pandas_on_spark(dataframe)
```

要使用 Spark ML 模型,您需要适当地格式化数据。具体来说,使用 [`VectorAssembler`](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.VectorAssembler.html) 将所有特征列合并为一个向量列。

以下是如何使用的示例:

```python
from pyspark.ml.feature import VectorAssembler

columns = psdf.columns
feature_cols = [col for col in columns if col != label]
featurizer = VectorAssembler(inputCols=feature_cols, outputCol="features")
psdf = featurizer.transform(psdf.to_spark(index_col="index"))["index", "features"]
```

在进行实验时,像处理非 Spark 数据一样使用您的 pandas-on-spark 数据,并使用 `X_train, y_train` 或 `dataframe, label` 进行传递。

### 估计器

#### 模型列表

- `lgbm_spark`:用于微调 Spark 版本 LightGBM 模型的类,使用 [SynapseML](https://microsoft.github.io/SynapseML/docs/features/lightgbm/about/) API。

#### 用法

首先,按照前面部分的说明将数据准备成所需的格式。

通过将您打算尝试的模型包含在 `estimators_list` 参数中传递给 `flaml.automl`,FLAML 将开始尝试这些模型的配置。如果您的输入是 Spark 数据,FLAML 默认还会使用带有 `_spark` 后缀的估计器,即使您没有指定它们。

以下是使用 SparkML 模型的示例代码片段:

```python
import flaml

# 按照前面提到的方式将数据准备成 pandas-on-spark 格式

automl = flaml.AutoML()
settings = {
"time_budget": 30,
"metric": "r2",
"estimator_list": ["lgbm_spark"], # 此设置是可选的
"task": "regression",
}

automl.fit(
dataframe=psdf,
label=label,
**settings,
)
[链接到笔记本](https://github.com/microsoft/FLAML/blob/main/notebook/automl_bankrupt_synapseml.ipynb) | [在colab中打开](https://colab.research.google.com/github/microsoft/FLAML/blob/main/notebook/automl_bankrupt_synapseml.ipynb)

## 并行 Spark 作业

您可以在[自动机器学习](/docs/Use-Cases/Task-Oriented-AutoML#parallel-tuning)和[超参数调整](/docs/Use-Cases/Tune-User-Defined-Function#parallel-tuning)中将 Spark 激活为并行后端,方法是将 `use_spark` 设置为 `true`。FLAML 将使用 [`joblib-spark`](https://github.com/joblib/joblib-spark) 将您的作业分发到分布式 Spark 后端。

请注意,在应用于 Spark 数据的自动机器学习和调整过程中,不应将 `use_spark` 设置为 `true`。这是因为自动机器学习和调整过程中只会使用 SparkML 模型来处理 Spark 数据。由于 SparkML 模型可以并行运行,因此不需要再使用 `use_spark` 进行分发。

下面列出了所有与 Spark 相关的参数。这些参数在超参数调整和自动机器学习中都可用:

- `use_spark`:布尔值,默认为 False | 是否使用 Spark 在并行 Spark 作业中运行训练。这可以用于加速大型模型和大型数据集的训练,但会增加时间开销,从而在某些情况下减慢训练速度。当 use_spark 为 True 时,不支持 GPU 训练。对于 Spark 集群,默认情况下,我们将为每个执行器启动一个试验。然而,有时我们希望启动的试验数量超过执行器的数量(例如,本地模式)。在这种情况下,我们可以设置环境变量 `FLAML_MAX_CONCURRENT` 来覆盖检测到的 `num_executors`。最终的并发试验数量将是 `n_concurrent_trials` 和 `num_executors` 中的最小值。
- `n_concurrent_trials`:整数,默认为 1 | 并发试验的数量。当 n_concurrent_trials > 1 时,FLAML 执行并行调整。
- `force_cancel`:布尔值,默认为 False | 如果搜索时间超过时间预算,则是否强制取消 Spark 作业。Spark 作业包括并行调整作业和基于 Spark 的模型训练作业。

使用并行 Spark 作业的示例代码段:

```python
```python
import flaml

automl_experiment = flaml.AutoML()
automl_settings = {
"time_budget": 30,
"metric": "r2",
"task": "regression",
"n_concurrent_trials": 2,
"use_spark": True,
"force_cancel": True, # 激活 force_cancel 选项可以在超过分配的 time_budget 后立即停止 Spark 作业。
}

automl.fit(
dataframe=dataframe,
label=label,
**automl_settings,
)
```

[笔记本链接](https://github.com/microsoft/FLAML/blob/main/notebook/integrate_spark.ipynb) | [在 colab 中打开](https://colab.research.google.com/github/microsoft/FLAML/blob/main/notebook/integrate_spark.ipynb)



--------------------------------------------------------------------------------
产品经理(对话经理):

```python
from flaml.automl import AutoML
from flaml.automl.spark.utils import to_pandas_on_spark
from pyspark.ml.feature import VectorAssembler
import pandas as pd

# 一个包含样本数据的字典
data = {
"Square_Feet": [800, 1200, 1800, 1500, 850],
"Age_Years": [20, 15, 10, 7, 25],
"Price": [100000, 200000, 300000, 240000, 120000],
}

# 将字典转换为 pandas DataFrame
dataframe = pd.DataFrame(data)
label = "Price"

# 将 pandas DataFrame 转换为 pandas-on-spark DataFrame
psdf = to_pandas_on_spark(dataframe)

# 使用 VectorAssembler 将特征列合并为一个向量列
feature_cols = [col for col in psdf.columns if col != label]
featurizer = VectorAssembler(inputCols=feature_cols, outputCol="features")
psdf = featurizer.transform(psdf.to_spark(index_col="index"))["index", "features", label]

# 初始化 AutoML 实例
automl = AutoML()

# AutoML 设置
automl_settings = {
"time_budget": 30, # 总运行时间(秒)
"metric": "r2", # 评估指标
"task": "regression",
"n_concurrent_trials": 2, # 并发 Spark 作业的数量
"use_spark": True, # 启用 Spark 进行并行训练
"force_cancel": True, # 如果 Spark 作业超过时间预算,则强制取消
"estimator_list": ["lgbm_spark"] # 可选:指定要使用的特定估计器
}

# 使用 pandas-on-spark dataframe 运行 AutoML fit
automl.fit(
dataframe=psdf,
label=label,
**automl_settings,
)
```
终止

--------------------------------------------------------------------------------

在与另一个用户代理进行初始聊天时调用 RetrieveUserProxyAgent

有时,在群聊中可能需要在不初始化与其的聊天的情况下使用 RetrieveUserProxyAgent。在这种情况下,创建一个函数来包装 RAG 代理,并允许从其他代理调用它们变得必要。

call_rag_chat()
Boss (对 chat_manager 说):

如何在 FLAML 中使用 Spark 进行并行训练?给我一个示例代码。

--------------------------------------------------------------------------------
Product_Manager (对 chat_manager 说):

***** 建议的函数调用: retrieve_content *****
参数:
{"message":"使用 Apache Spark 在 FLAML 中进行并行训练的示例代码"}
*****************************************************

--------------------------------------------------------------------------------

>>>>>>>> 执行函数 retrieve_content...
from pyspark.sql import SparkSession
from flaml.automl.spark.utils import to_pandas_on_spark

# Create a SparkSession
spark = SparkSession.builder.getOrCreate()

# Convert pandas DataFrame to pandas-on-spark DataFrame
dataframe_spark = to_pandas_on_spark(dataframe)

# Convert pandas Series to pandas-on-spark Series
label_spark = to_pandas_on_spark(dataframe[label])

# Print the converted data
dataframe_spark.show()
label_spark.show()

Spark Parallel Training

FLAML uses Spark to run training in parallel spark jobs. To enable parallel training, you need to set the n_jobs parameter in the AutoML class to a positive integer. The n_jobs parameter specifies the number of parallel spark jobs to run.

Here is an example code snippet for parallel training using Spark:

from flaml import AutoML

# Create an AutoML instance with n_jobs=2
automl = AutoML(n_jobs=2)

# Fit the AutoML instance to the data
automl.fit(X_train, y_train)

In this example, the AutoML instance is created with n_jobs=2, which means FLAML will run two parallel spark jobs for training.

Note that you need to have a Spark cluster set up and running to use Spark parallel training with FLAML.

For more information on using Spark with FLAML, refer to the FLAML documentation.

UPDATE CONTEXT


```` text
VectorDB returns doc_ids: [['bdfbc921']]
Adding content of doc bdfbc921 to context.
Boss (to chat_manager):

***** Response from calling function (retrieve_content) *****
You're a retrieve augmented coding assistant. You answer user's questions based on your own knowledge and the
context provided by the user.
If you can't answer the question with or without the current context, you should reply exactly `UPDATE CONTEXT`.
For code generation, you must obey the following rules:
Rule 1. You MUST NOT install any packages because all the packages needed are already installed.
Rule 2. You must follow the formats below to write your code:
```language
# your code
```

User's question is: using Apache Spark for parallel training in FLAML with sample code

Context is: # Integrate - Spark

FLAML has integrated Spark for distributed training. There are two main aspects of integration with Spark:

- Use Spark ML estimators for AutoML.
- Use Spark to run training in parallel spark jobs.

## Spark ML Estimators

FLAML integrates estimators based on Spark ML models. These models are trained in parallel using Spark, so we called them Spark estimators. To use these models, you first need to organize your data in the required format.

### Data

For Spark estimators, AutoML only consumes Spark data. FLAML provides a convenient function `to_pandas_on_spark` in the `flaml.automl.spark.utils` module to convert your data into a pandas-on-spark (`pyspark.pandas`) dataframe/series, which Spark estimators require.

This utility function takes data in the form of a `pandas.Dataframe` or `pyspark.sql.Dataframe` and converts it into a pandas-on-spark dataframe. It also takes `pandas.Series` or `pyspark.sql.Dataframe` and converts it into a [pandas-on-spark](https://spark.apache.org/docs/latest/api/python/user_guide/pandas_on_spark/index.html) series. If you pass in a `pyspark.pandas.Dataframe`, it will not make any changes.

This function also accepts optional arguments `index_col` and `default_index_type`.

- `index_col` is the column name to use as the index, default is None.
- `default_index_type` is the default index type, default is "distributed-sequence". More info about default index type could be found on Spark official [documentation](https://spark.apache.org/docs/latest/api/python/user_guide/pandas_on_spark/options.html#default-index-type)

Here is an example code snippet for Spark Data:

```python
import pandas as pd
from flaml.automl.spark.utils import to_pandas_on_spark

# Creating a dictionary
data = {
"Square_Feet": [800, 1200, 1800, 1500, 850],
"Age_Years": [20, 15, 10, 7, 25],
"Price": [100000, 200000, 300000, 240000, 120000],
}

# Creating a pandas DataFrame
dataframe = pd.DataFrame(data)
label = "Price"

# Convert to pandas-on-spark dataframe
```python
psdf = to_pandas_on_spark(dataframe)
```

要使用 Spark ML 模型,您需要适当地格式化数据。具体来说,使用 [`VectorAssembler`](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.VectorAssembler.html) 将所有特征列合并为一个向量列。

以下是如何使用的示例:

```python
from pyspark.ml.feature import VectorAssembler

columns = psdf.columns
feature_cols = [col for col in columns if col != label]
featurizer = VectorAssembler(inputCols=feature_cols, outputCol="features")
psdf = featurizer.transform(psdf.to_spark(index_col="index"))["index", "features"]
```

在进行实验时,像非 Spark 数据一样使用您的 pandas-on-spark 数据,并使用 `X_train, y_train` 或 `dataframe, label` 进行传递。

### 估计器

#### 模型列表

- `lgbm_spark`:用于微调 Spark 版本 LightGBM 模型的类,使用 [SynapseML](https://microsoft.github.io/SynapseML/docs/features/lightgbm/about/) API。

#### 用法

首先,按照前一节中的说明,将数据准备成所需的格式。

通过将您打算尝试的模型包含在 `estimators_list` 参数中传递给 `flaml.automl`,FLAML 将开始尝试这些模型的配置。如果您的输入是 Spark 数据,则 FLAML 默认还会使用带有 `_spark` 后缀的估计器,即使您没有指定它们。

以下是在 AutoML 中使用 SparkML 模型的示例代码片段:

```python
import flaml

# 按照我们之前提到的方式,将数据准备成 pandas-on-spark 格式

automl = flaml.AutoML()
settings = {
"time_budget": 30,
"metric": "r2",
"estimator_list": ["lgbm_spark"], # 此设置是可选的
"task": "regression",
}

automl.fit(
dataframe=psdf,
label=label,
**settings,
)
[链接到笔记本](https://github.com/microsoft/FLAML/blob/main/notebook/automl_bankrupt_synapseml.ipynb) | [在colab中打开](https://colab.research.google.com/github/microsoft/FLAML/blob/main/notebook/automl_bankrupt_synapseml.ipynb)

## 并行 Spark 作业

您可以在[自动机器学习](/docs/Use-Cases/Task-Oriented-AutoML#parallel-tuning)和[超参数调整](/docs/Use-Cases/Tune-User-Defined-Function#parallel-tuning)中,通过将 `use_spark` 设置为 `true`,将 Spark 激活为并行后端。FLAML 将使用 [`joblib-spark`](https://github.com/joblib/joblib-spark) 将您的作业分发到分布式 Spark 后端。

请注意,当应用于 Spark 数据的自动机器学习和调整时,不应将 `use_spark` 设置为 `true`。这是因为自动机器学习和调整中仅使用 SparkML 模型来处理 Spark 数据。由于 SparkML 模型可以并行运行,因此不需要再使用 `use_spark` 进行分发。

下面列出了所有与 Spark 相关的参数。这些参数在超参数调整和自动机器学习中都可用:

- `use_spark`:布尔值,默认为 False | 是否使用 Spark 在并行 Spark 作业中运行训练。这可以用于加速大型模型和大型数据集的训练,但会增加时间开销,从而在某些情况下减慢训练速度。当 `use_spark` 为 True 时,不支持 GPU 训练。对于 Spark 集群,默认情况下,我们将为每个执行器启动一个试验。但是,有时我们希望启动的试验数量超过执行器的数量(例如,本地模式)。在这种情况下,我们可以设置环境变量 `FLAML_MAX_CONCURRENT` 来覆盖检测到的 `num_executors`。最终的并发试验数量将是 `n_concurrent_trials` 和 `num_executors` 中较小的那个。
- `n_concurrent_trials`:整数,默认为 1 | 并发试验的数量。当 n_concurrent_trials > 1 时,FLAML 执行并行调整。
- `force_cancel`:布尔值,默认为 False | 如果搜索时间超过时间预算,则是否强制取消 Spark 作业。Spark 作业包括并行调整作业和基于 Spark 的模型训练作业。

使用并行 Spark 作业的示例代码片段:

```python
import flaml

automl_experiment = flaml.AutoML()
automl_settings = {
"time_budget": 30,
"metric": "r2",
"task": "regression",
"n_concurrent_trials": 2,
"use_spark": True,
"force_cancel": True, # 激活 force_cancel 选项可以在超过分配的 time_budget 后立即停止 Spark 作业。
}

automl.fit(
dataframe=dataframe,
label=label,
**automl_settings,
)
```

[笔记本链接](https://github.com/microsoft/FLAML/blob/main/notebook/integrate_spark.ipynb) | [在 Colab 中打开](https://colab.research.google.com/github/microsoft/FLAML/blob/main/notebook/integrate_spark.ipynb)


*************************************************************

--------------------------------------------------------------------------------
Boss (to chat_manager):

***** 调用函数(retrieve_content)的响应 *****
你是一个检索增强型编码助手。你根据自己的知识和用户提供的上下文回答用户的问题。
如果你无法根据当前上下文回答问题,你应该回复 `UPDATE CONTEXT`。
对于代码生成,你必须遵守以下规则:
规则 1. 你绝对不能安装任何包,因为所需的所有包都已经安装好了。
规则 2. 你必须按照下面的格式编写你的代码:
```language
# 你的代码
用户的问题是:如何在FLAML中使用Apache Spark进行并行训练,附带示例代码。

背景是:# 集成 - Spark

FLAML已经集成了Spark用于分布式训练。Spark集成主要有两个方面:

- 使用Spark ML估计器进行自动机器学习。
- 使用Spark在并行的Spark作业中运行训练。

## Spark ML估计器

FLAML集成了基于Spark ML模型的估计器。这些模型使用Spark并行训练,因此我们称之为Spark估计器。要使用这些模型,您首先需要按照所需的格式组织数据。

### 数据

对于Spark估计器,AutoML只能使用Spark数据。FLAML在`flaml.automl.spark.utils`模块中提供了一个方便的函数`to_pandas_on_spark`,用于将数据转换为pandas-on-spark(`pyspark.pandas`)数据帧/系列,这是Spark估计器所需的。

这个实用函数接受`pandas.Dataframe`或`pyspark.sql.Dataframe`形式的数据,并将其转换为pandas-on-spark数据帧。它还接受`pandas.Series`或`pyspark.sql.Dataframe`并将其转换为[pandas-on-spark](https://spark.apache.org/docs/latest/api/python/user_guide/pandas_on_spark/index.html)系列。如果传入`pyspark.pandas.Dataframe`,它将不会进行任何更改。

该函数还接受可选参数`index_col`和`default_index_type`。

- `index_col`是要用作索引的列名,默认为None。
- `default_index_type`是默认索引类型,默认为"distributed-sequence"。有关默认索引类型的更多信息,请参阅Spark官方[文档](https://spark.apache.org/docs/latest/api/python/user_guide/pandas_on_spark/options.html#default-index-type)。

下面是一个使用Spark数据的示例代码片段:

```python
```python
import pandas as pd
from flaml.automl.spark.utils import to_pandas_on_spark

# 创建一个字典
data = {
"Square_Feet": [800, 1200, 1800, 1500, 850],
"Age_Years": [20, 15, 10, 7, 25],
"Price": [100000, 200000, 300000, 240000, 120000],
}

# 创建一个 pandas DataFrame
dataframe = pd.DataFrame(data)
label = "Price"

# 转换为 pandas-on-spark DataFrame
psdf = to_pandas_on_spark(dataframe)
```

要使用 Spark ML 模型,您需要适当地格式化数据。具体来说,使用 [`VectorAssembler`](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.VectorAssembler.html) 将所有特征列合并为一个向量列。

以下是如何使用它的示例:

```python
from pyspark.ml.feature import VectorAssembler

columns = psdf.columns
feature_cols = [col for col in columns if col != label]
featurizer = VectorAssembler(inputCols=feature_cols, outputCol="features")
psdf = featurizer.transform(psdf.to_spark(index_col="index"))["index", "features"]
```

在进行实验时,像处理非 Spark 数据一样使用您的 pandas-on-spark 数据,并使用 `X_train, y_train` 或 `dataframe, label` 进行传递。

### 估计器

#### 模型列表

- `lgbm_spark`:用于微调 Spark 版本 LightGBM 模型的类,使用 [SynapseML](https://microsoft.github.io/SynapseML/docs/features/lightgbm/about/) API。

#### 用法

首先,按照前面部分的描述,将数据准备成所需的格式。

通过将您打算尝试的模型包含在 `estimators_list` 参数中传递给 `flaml.automl`,FLAML 将开始尝试这些模型的配置。如果您的输入是 Spark 数据,则 FLAML 默认还会使用带有 `_spark` 后缀的估计器,即使您没有指定它们。

以下是使用 SparkML 模型的示例代码片段:

```python
import flaml

# 按照前面提到的方式,将数据准备成 pandas-on-spark 格式

automl = flaml.AutoML()
settings = {
"time_budget": 30,
"metric": "r2",
"estimator_list": ["lgbm_spark"], # 这个设置是可选的
"task": "regression",
}

automl.fit(
dataframe=psdf,
label=label,
**settings,
)
[链接到笔记本](https://github.com/microsoft/FLAML/blob/main/notebook/automl_bankrupt_synapseml.ipynb) | [在colab中打开](https://colab.research.google.com/github/microsoft/FLAML/blob/main/notebook/automl_bankrupt_synapseml.ipynb)

## 并行 Spark 作业

您可以在[自动机器学习](/docs/Use-Cases/Task-Oriented-AutoML#parallel-tuning)和[超参数调整](/docs/Use-Cases/Tune-User-Defined-Function#parallel-tuning)中将 Spark 激活为并行后端,方法是将 `use_spark` 设置为 `true`。FLAML 将使用 [`joblib-spark`](https://github.com/joblib/joblib-spark) 将您的作业分发到分布式 Spark 后端。

请注意,在应用于 Spark 数据的自动机器学习和调整时,不应将 `use_spark` 设置为 `true`。这是因为在自动机器学习和调整中,只会使用 SparkML 模型来处理 Spark 数据。由于 SparkML 模型可以并行运行,因此不需要再使用 `use_spark` 进行分发。

下面列出了所有与 Spark 相关的参数。这些参数在超参数调整和自动机器学习中都可用:

- `use_spark`:布尔值,默认为 False | 是否使用 Spark 在并行 Spark 作业中运行训练。这可以用于加速大型模型和大型数据集的训练,但会增加时间开销,从而在某些情况下减慢训练速度。当 `use_spark` 为 True 时,不支持 GPU 训练。对于 Spark 集群,默认情况下,我们将为每个执行器启动一个试验。然而,有时我们希望启动的试验数量超过执行器的数量(例如,本地模式)。在这种情况下,我们可以设置环境变量 `FLAML_MAX_CONCURRENT` 来覆盖检测到的 `num_executors`。最终的并发试验数量将是 `n_concurrent_trials` 和 `num_executors` 中的最小值。
- `n_concurrent_trials`:整数,默认为 1 | 并发试验的数量。当 n_concurrent_trials > 1 时,FLAML 执行并行调整。
- `force_cancel`:布尔值,默认为 False | 如果搜索时间超过时间预算,则是否强制取消 Spark 作业。Spark 作业包括并行调整作业和基于 Spark 的模型训练作业。

使用并行 Spark 作业的示例代码段:

```python
```python
import flaml

automl_experiment = flaml.AutoML()
automl_settings = {
"time_budget": 30,
"metric": "r2",
"task": "regression",
"n_concurrent_trials": 2,
"use_spark": True,
"force_cancel": True, # 激活 force_cancel 选项可以在超出分配的 time_budget 后立即停止 Spark 作业。
}

automl.fit(
dataframe=dataframe,
label=label,
**automl_settings,
)
```

[笔记本链接](https://github.com/microsoft/FLAML/blob/main/notebook/integrate_spark.ipynb) | [在 Colab 中打开](https://colab.research.google.com/github/microsoft/FLAML/blob/main/notebook/integrate_spark.ipynb)


*************************************************************

--------------------------------------------------------------------------------
产品经理(给聊天经理):

要在 FLAML 中使用 Apache Spark 进行并行训练,您可以按照以下步骤操作:

1. 确保您的数据以所需的 pandas-on-spark 格式存在。
2. 在 `estimator_list` 中包含 Spark ML 估计器。
3. 将 `use_spark` 设置为 `True` 以进行并行调优。

以下是一个示例代码,演示如何在 FLAML 中使用 Spark 进行并行训练:

```python
import flaml
from flaml.automl.spark.utils import to_pandas_on_spark
import pandas as pd
from pyspark.ml.feature import VectorAssembler

# 在 pandas DataFrame 中的示例数据
data = {
"Square_Feet": [800, 1200, 1800, 1500, 850],
"Age_Years": [20, 15, 10, 7, 25],
"Price": [100000, 200000, 300000, 240000, 120000],
}
label = "Price"

# 创建一个 pandas DataFrame
dataframe = pd.DataFrame(data)

# 转换为 pandas-on-spark DataFrame
psdf = to_pandas_on_spark(dataframe)

# 使用 VectorAssembler 准备特征
columns = psdf.columns
feature_cols = [col for col in columns if col != label]
featurizer = VectorAssembler(inputCols=feature_cols, outputCol="features")
psdf = featurizer.transform(psdf.to_spark(index_col="index"))["index", "features"]

# 初始化 AutoML
automl = flaml.AutoML()

# 配置 AutoML 的设置
settings = {
"time_budget": 30, # 时间预算(以秒为单位)
"metric": "r2",
"estimator_list": ["lgbm_spark"], # 使用 Spark ML 估计器
"task": "regression",
"n_concurrent_trials": 2, # 并行试验的数量
"use_spark": True, # 启用使用 Spark 进行并行训练
"force_cancel": True, # 如果超出时间预算,则强制取消 Spark 作业
}

# 开始训练
automl.fit(dataframe=psdf, label=label, **settings)
```
在这段代码片段中:
- `to_pandas_on_spark` 函数用于将 pandas DataFrame 转换为 pandas-on-spark DataFrame。
- `VectorAssembler` 用于将特征列转换为单个向量列。
- 创建了 `AutoML` 对象,并配置了自动机器学习运行的设置,包括将 `use_spark` 设置为 `True` 以进行并行训练。
- 调用 `fit` 方法开始自动化机器学习过程。

通过使用这些设置,FLAML 将使用 Spark 并行训练模型,从而加速大型模型和数据集的训练过程。

终止

--------------------------------------------------------------------------------
# 标题1

这是一段关于某个主题的科普文章。文章介绍了该主题的背景和相关概念,并提供了一些实例和数据来支持论点。

## 小标题1

这是第一个小标题。在这一部分,我们将讨论一些关于该主题的基本知识。我们将介绍一些术语和概念,并解释它们的含义。

例如,我们将讨论FLAC(Free Lossless Audio Codec)是一种无损音频压缩格式,它可以将音频文件压缩到较小的尺寸,同时保持音质不变。这对于音乐爱好者来说非常有用,因为他们可以在不牺牲音质的情况下节省存储空间。

## 小标题2

这是第二个小标题。在这一部分,我们将探讨一些与该主题相关的研究和发现。

例如,研究人员在一项研究中发现,JPEG(Joint Photographic Experts Group)是一种常用的图像压缩格式,它可以将图像文件压缩到较小的尺寸,同时保持图像质量。这对于网站和社交媒体平台来说非常重要,因为它们需要在保持图像质量的同时减少带宽使用。

## 小标题3

这是第三个小标题。在这一部分,我们将讨论一些与该主题相关的应用和实际应用。

例如,微软(Microsoft)开发了一种名为Microsoft Word的文字处理软件,它可以帮助用户创建和编辑文档。这对于学生和专业人士来说非常有用,因为他们可以使用这个软件来写作和组织他们的想法。

## 结论

通过这篇文章,我们希望读者能够对该主题有一个更深入的了解。我们介绍了一些基本概念、研究发现和实际应用,并提供了一些实例来支持我们的论点。希望这篇文章能够对读者有所帮助。

参考文献:
[20] 引用的论文1
[30] 引用的论文2