import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import okhttp3.*; import okhttp3.sse.EventSource; import okhttp3.sse.EventSourceListener; import okhttp3.sse.EventSources; import org.jetbrains.annotations.Nullable; import java.io.UnsupportedEncodingException; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; /** * @author : dxw * @version : 1.0.0 * @description : * @createDate : 2024/8/26 12:17 **/ public class JavaDemo { private static final String SSE_URL = "http://192.168.2.37/v1/chat-messages"; // 替换为你的SSE端点地址 private static final String API_KEY = "app-eZmJGbEtekEkPKG6zQhgMGOQ";//api_key public static void main(String[] args) { OkHttpClient client = new OkHttpClient.Builder() .readTimeout(0, TimeUnit.MILLISECONDS) // 设置无限读取超时,因为SSE是长连接 .build(); //请求体json String json = "{\"inputs\": {},\"query\": \"中国移动\",\"response_mode\": \"streaming\",\"conversation_id\": \"\",\"user\": \"admin\"}"; RequestBody body = RequestBody.create(json,MediaType.parse("application/json; charset=utf-8")); // 请求对象 Request request = new Request.Builder() .url(SSE_URL) .header("authorization","Bearer "+API_KEY) .header("Content-Type","application/json") .header("accept","text/event-stream") .post(body) .build(); // 自定义监听器 EventSourceListener eventSourceListener = new EventSourceListener() { @Override public void onOpen(EventSource eventSource, Response response) { super.onOpen(eventSource, response); System.out.println("open"); } @Override public void onEvent(EventSource eventSource, @Nullable String id, @Nullable String type, String data) { // 接受消息 data super.onEvent(eventSource, id, type, data); // System.out.println(data); Map map = new HashMap<>(); ObjectMapper objectMapper = new ObjectMapper(); try { map = objectMapper.readValue(data,HashMap.class); } catch (JsonProcessingException e) { throw new RuntimeException(e); } if( map != null && !map.isEmpty() ) { Object event = map.get("event"); switch (event.toString()) { //workflow 开始执行 case "workflow_started": { System.out.println("----------工作流开始----------"); } break; //消息结束事件 case "message_end": { System.out.println(""); System.out.println("----------工作流结束----------"); } break; //LLM 返回文本块事件 case "message":{ Object answer = map.get("answer"); System.out.print(decodeUnicode(answer.toString())); } break; default: } } } @Override public void onClosed(EventSource eventSource) { super.onClosed(eventSource); System.out.println("closed"); } @Override public void onFailure(EventSource eventSource, @Nullable Throwable t, @Nullable Response response) { super.onFailure(eventSource, t, response); System.out.println(t.getMessage()); } }; EventSource.Factory factory = EventSources.createFactory(client); // 创建事件 EventSource eventSource = factory.newEventSource(request, eventSourceListener); } //unicode解码成utf8 private static String decodeUnicode(String unicode) { String chineseString = ""; if( unicode != null ) { try { byte[] utf8Bytes = unicode.getBytes("UTF-8"); chineseString = new String(utf8Bytes, "UTF-8"); } catch (UnsupportedEncodingException e) { throw new RuntimeException(e); } } return chineseString; } }