''' 工作流应用流式输出API的调用示例 ''' import requests import json # 使用requests 接收流式信息 def api_call(base_url: str, api_key: str, inputs: dict, query : str): bearer = 'Bearer ' + api_key headers = { 'authorization': bearer, 'Content-Type': 'application/json' } req_data = { "inputs": inputs, "query": query, "response_mode": "streaming", "user": "abc-123" # user填写用于区分不同调用方,每个调用方一个唯一user名称为好 } json_req_data = json.dumps(req_data) print('###################开始调用工作流应用###################') response = requests.post(base_url, data=json_req_data, headers=headers, stream=True) # 用于记录会话id conversation_id = None print('###################工作流应用的流式输出###################') # 获取chunk response # chunk_size设置为None,同时requests中stream=True,则不管多大的chunk都放进来 for chunk in response.iter_content(chunk_size=None, decode_unicode=True): orig_text = str(chunk).strip() if (not str(orig_text).startswith('data: ')) :# 判断是否流式输出消息格式 continue new_text = orig_text[6:] if not (new_text.find('"event": "workflow_started"')>=0 or new_text.find('"event": "message"')>=0 or new_text.find('"event": "workflow_finished"')>=0): # 属于工作流中间节点的输出消息,不处理 continue #print( new_text) resp_data_dict = json.loads(new_text) resp_event_name = resp_data_dict['event'] if resp_event_name == 'workflow_started': # 表示工作流应用开始执行,记录下conversation_id备用 conversation_id = resp_data_dict['conversation_id'] elif resp_event_name == 'message': # 表示工作流应用输出的内容,需要获取和输出 resp_text = resp_data_dict['answer'] print(resp_text, end="") elif resp_event_name == 'workflow_finished': # 表示工作流应用执行完成 print("") print('###################工作流应用的流式输出结束###################') break # 调用测试 if __name__ == '__main__': base_url = 'http://192.168.2.37/v1/chat-messages' api_key = 'app-eZmJGbEtekEkPKG6zQhgMGOQ' # 应用的输入变量和值 inputs = {} # 对应用的提问 query = '查一下广西柳工' # api调用,流式 api_call(base_url, api_key, inputs, query)