跳到主要内容

如何使用函数自动化任务(S3 存储桶示例)

nbviewer

这段代码演示了如何与ChatGPT函数交互,执行与Amazon S3存储桶相关的任务。该笔记本涵盖了S3存储桶关键功能,如运行简单的列出命令、在所有存储桶中搜索特定文件、将文件上传到存储桶以及从存储桶下载文件。OpenAI Chat API能够理解用户的指令,生成自然语言响应,并根据用户的输入提取适当的函数调用。

要求: 要运行该笔记本,需要生成具有S3存储桶写入权限的AWS访问密钥,并将它们存储在本地环境文件中,与Openai密钥放在一起。“.env”文件格式如下:

AWS_ACCESS_KEY_ID=<your-key>
AWS_SECRET_ACCESS_KEY=<your-key>
OPENAI_API_KEY=<your-key>
! pip install openai
! pip install boto3
! pip install tenacity
! pip install python-dotenv

from openai import OpenAI
import json
import boto3
import os
import datetime
from urllib.request import urlretrieve

# 加载环境变量
from dotenv import load_dotenv
load_dotenv()

True

初始设置

OpenAI.api_key = os.environ.get("OPENAI_API_KEY")
GPT_MODEL = "gpt-3.5-turbo"

# 可选 - 如果你在加载环境文件时遇到问题,可以使用以下代码设置 AWS 值。
# os.environ['AWS_ACCESS_KEY_ID'] = ''
# os.environ['AWS_SECRET_ACCESS_KEY'] = ''

# 创建S3客户端
s3_client = boto3.client('s3')

# 创建OpenAI客户端
client = OpenAI()

实用工具

为了将用户的问题或命令与适当的函数连接起来,我们需要向ChatGPT提供必要的函数细节和预期参数。

# 功能字典用于传递GPT模型执行S3操作的详细信息
functions = [
{
"type": "function",
"function":{
"name": "list_buckets",
"description": "List all available S3 buckets",
"parameters": {
"type": "object",
"properties": {}
}
}
},
{
"type": "function",
"function":{
"name": "list_objects",
"description": "List the objects or files inside a given S3 bucket",
"parameters": {
"type": "object",
"properties": {
"bucket": {"type": "string", "description": "The name of the S3 bucket"},
"prefix": {"type": "string", "description": "The folder path in the S3 bucket"},
},
"required": ["bucket"],
},
}
},
{
"type": "function",
"function":{
"name": "download_file",
"description": "Download a specific file from an S3 bucket to a local distribution folder.",
"parameters": {
"type": "object",
"properties": {
"bucket": {"type": "string", "description": "The name of the S3 bucket"},
"key": {"type": "string", "description": "The path to the file inside the bucket"},
"directory": {"type": "string", "description": "The local destination directory to download the file, should be specificed by the user."},
},
"required": ["bucket", "key", "directory"],
}
}
},
{
"type": "function",
"function":{
"name": "upload_file",
"description": "Upload a file to an S3 bucket",
"parameters": {
"type": "object",
"properties": {
"source": {"type": "string", "description": "The local source path or remote URL"},
"bucket": {"type": "string", "description": "The name of the S3 bucket"},
"key": {"type": "string", "description": "The path to the file inside the bucket"},
"is_remote_url": {"type": "boolean", "description": "Is the provided source a URL (True) or local path (False)"},
},
"required": ["source", "bucket", "key", "is_remote_url"],
}
}
},
{
"type": "function",
"function":{
"name": "search_s3_objects",
"description": "Search for a specific file name inside an S3 bucket",
"parameters": {
"type": "object",
"properties": {
"search_name": {"type": "string", "description": "The name of the file you want to search for"},
"bucket": {"type": "string", "description": "The name of the S3 bucket"},
"prefix": {"type": "string", "description": "The folder path in the S3 bucket"},
"exact_match": {"type": "boolean", "description": "Set exact_match to True if the search should match the exact file name. Set exact_match to False to compare part of the file name string (the file contains)"}
},
"required": ["search_name"],
},
}
}
]

创建用于与S3服务交互的辅助函数,例如列出存储桶、列出对象、下载和上传文件,以及搜索特定文件。

def datetime_converter(obj):
if isinstance(obj, datetime.datetime):
return obj.isoformat()
raise TypeError(f"Object of type {obj.__class__.__name__} is not JSON serializable")

def list_buckets():
response = s3_client.list_buckets()
return json.dumps(response['Buckets'], default=datetime_converter)

def list_objects(bucket, prefix=''):
response = s3_client.list_objects_v2(Bucket=bucket, Prefix=prefix)
return json.dumps(response.get('Contents', []), default=datetime_converter)

def download_file(bucket, key, directory):

filename = os.path.basename(key)

# 解析目标路径至正确的文件位置
destination = os.path.join(directory, filename)

s3_client.download_file(bucket, key, destination)
return json.dumps({"status": "success", "bucket": bucket, "key": key, "destination": destination})

def upload_file(source, bucket, key, is_remote_url=False):
if is_remote_url:
file_name = os.path.basename(source)
urlretrieve(source, file_name)
source = file_name

s3_client.upload_file(source, bucket, key)
return json.dumps({"status": "success", "source": source, "bucket": bucket, "key": key})

def search_s3_objects(search_name, bucket=None, prefix='', exact_match=True):
search_name = search_name.lower()

if bucket is None:
buckets_response = json.loads(list_buckets())
buckets = [bucket_info["Name"] for bucket_info in buckets_response]
else:
buckets = [bucket]

results = []

for bucket_name in buckets:
objects_response = json.loads(list_objects(bucket_name, prefix))
if exact_match:
bucket_results = [obj for obj in objects_response if search_name == obj['Key'].lower()]
else:
bucket_results = [obj for obj in objects_response if search_name in obj['Key'].lower()]

if bucket_results:
results.extend([{"Bucket": bucket_name, "Object": obj} for obj in bucket_results])

return json.dumps(results)

下面的字典将名称与函数连接起来,以便根据ChatGPT的响应来使用它进行执行。

available_functions = {
"list_buckets": list_buckets,
"list_objects": list_objects,
"download_file": download_file,
"upload_file": upload_file,
"search_s3_objects": search_s3_objects
}

ChatGPT

ChatGPT是一个基于GPT-3的模型,用于生成自然对话。它可以用于多种应用,如智能助手、聊天机器人、对话系统等。ChatGPT可以生成连贯、有逻辑的对话,并且可以根据上下文进行回复。

def chat_completion_request(messages, functions=None, function_call='auto', 
model_name=GPT_MODEL):

if functions is not None:
return client.chat.completions.create(
model=model_name,
messages=messages,
tools=functions,
tool_choice=function_call)
else:
return client.chat.completions.create(
model=model_name,
messages=messages)

对话流程

创建一个主函数用于聊天机器人,该函数接收用户输入,将其发送到OpenAI Chat API,接收响应,执行API生成的任何函数调用,并将最终响应返回给用户。

def run_conversation(user_input, topic="S3 bucket functions.", is_log=False):

system_message=f"Don't make assumptions about what values to plug into functions. Ask for clarification if a user request is ambiguous. If the user ask question not related to {topic} response your scope is {topic} only."

messages = [{"role": "system", "content": system_message},
{"role": "user", "content": user_input}]

# 调用模型以获取响应
response = chat_completion_request(messages, functions=functions)
response_message = response.choices[0].message

if is_log:
print(response.choices)

# 检查GPT是否想要调用一个函数。
if response_message.tool_calls:
function_name = response_message.tool_calls[0].function.name
function_args = json.loads(response_message.tool_calls[0].function.arguments)

# 调用该函数
function_response = available_functions[function_name](**function_args)

# 将回复添加到对话中
messages.append(response_message)
messages.append({
"role": "tool",
"content": function_response,
"tool_call_id": response_message.tool_calls[0].id,
})

# 再次调用模型以总结结果
second_response = chat_completion_request(messages)
final_message = second_response.choices[0].message.content
else:
final_message = response_message.content

return final_message

S3存储桶机器人测试

在以下示例中,在执行之前,请确保用您的特定值替换诸如<file_name><bucket_name><directory_path>之类的占位符。

列出和搜索

让我们首先列出所有可用的存储桶。

print(run_conversation('list my S3 buckets'))

您可以要求助手在所有存储桶中或特定存储桶中搜索特定文件名。

search_file = '<file_name>'
print(run_conversation(f'search for a file {search_file} in all buckets'))

search_word = '<file_name_part>'
bucket_name = '<bucket_name>'
print(run_conversation(f'search for a file contains {search_word} in {bucket_name}'))

模型应在系统消息中描述的参数值模糊时,澄清用户的要求。

print(run_conversation('search for a file'))

Sure, to help me find what you're looking for, could you please provide the name of the file you want to search for and the name of the S3 bucket? Also, should the search match the file name exactly, or should it also consider partial matches?

验证边缘情况

我们还指示模型拒绝不相关的任务。让我们测试一下,看看它在实际操作中是如何工作的。

# 模型不应回答与范围无关的细节。
print(run_conversation('what is the weather today'))

Apologies for the misunderstanding, but I am only able to assist with S3 bucket functions. Can you please ask a question related to S3 bucket functions?

提供的函数不仅限于仅仅检索信息。它们还可以帮助用户上传或下载文件。

下载文件

search_file = '<file_name>'
bucket_name = '<bucket_name>'
local_directory = '<directory_path>'
print(run_conversation(f'download {search_file} from {bucket_name} bucket to {local_directory} directory'))

上传文件

local_file = '<file_name>'
bucket_name = '<bucket_name>'
print(run_conversation(f'upload {local_file} to {bucket_name} bucket'))