# encoding:utf-8 import time import json import requests from models.bot import Bot from models.minimax.minimax_session import MinimaxSession from models.session_manager import SessionManager from bridge.context import Context, ContextType from bridge.reply import Reply, ReplyType from common.log import logger from config import conf, load_config from common import const # MiniMax对话模型API class MinimaxBot(Bot): def __init__(self): super().__init__() self.args = { "model": conf().get("model") or "MiniMax-M2.1", "temperature": conf().get("temperature", 0.3), "top_p": conf().get("top_p", 0.95), } # Use unified key name: minimax_api_key self.api_key = conf().get("minimax_api_key") if not self.api_key: # Fallback to old key name for backward compatibility self.api_key = conf().get("Minimax_api_key") if self.api_key: logger.warning("[MINIMAX] 'Minimax_api_key' is deprecated, please use 'minimax_api_key' instead") # REST API endpoint # Use Chinese endpoint by default, users can override in config # International users should set: "minimax_api_base": "https://api.minimax.io/v1" self.api_base = conf().get("minimax_api_base", "https://api.minimaxi.com/v1") self.sessions = SessionManager(MinimaxSession, model=const.MiniMax) def reply(self, query, context: Context = None) -> Reply: # acquire reply content logger.info("[MINIMAX] query={}".format(query)) if context.type == ContextType.TEXT: session_id = context["session_id"] reply = None clear_memory_commands = conf().get("clear_memory_commands", ["#清除记忆"]) if query in clear_memory_commands: self.sessions.clear_session(session_id) reply = Reply(ReplyType.INFO, "记忆已清除") elif query == "#清除所有": self.sessions.clear_all_session() reply = Reply(ReplyType.INFO, "所有人记忆已清除") elif query == "#更新配置": load_config() reply = Reply(ReplyType.INFO, "配置已更新") if reply: return reply session = self.sessions.session_query(query, session_id) logger.debug("[MINIMAX] session query={}".format(session)) model = context.get("Minimax_model") new_args = self.args.copy() if model: new_args["model"] = model reply_content = self.reply_text(session, args=new_args) logger.debug( "[MINIMAX] new_query={}, session_id={}, reply_cont={}, completion_tokens={}".format( session.messages, session_id, reply_content["content"], reply_content["completion_tokens"], ) ) if reply_content["completion_tokens"] == 0 and len(reply_content["content"]) > 0: reply = Reply(ReplyType.ERROR, reply_content["content"]) elif reply_content["completion_tokens"] > 0: self.sessions.session_reply(reply_content["content"], session_id, reply_content["total_tokens"]) reply = Reply(ReplyType.TEXT, reply_content["content"]) else: reply = Reply(ReplyType.ERROR, reply_content["content"]) logger.debug("[MINIMAX] reply {} used 0 tokens.".format(reply_content)) return reply else: reply = Reply(ReplyType.ERROR, "Bot不支持处理{}类型的消息".format(context.type)) return reply def reply_text(self, session: MinimaxSession, args=None, retry_count=0) -> dict: """ Call MiniMax API to get the answer using REST API :param session: a conversation session :param args: request arguments :param retry_count: retry count :return: {} """ try: if args is None: args = self.args # Build request headers = { "Content-Type": "application/json", "Authorization": f"Bearer {self.api_key}" } request_body = { "model": args.get("model", self.args["model"]), "messages": session.messages, "temperature": args.get("temperature", self.args["temperature"]), "top_p": args.get("top_p", self.args["top_p"]), } url = f"{self.api_base}/chat/completions" logger.debug(f"[MINIMAX] Calling {url} with model={request_body['model']}") response = requests.post(url, headers=headers, json=request_body, timeout=60) if response.status_code == 200: result = response.json() content = result["choices"][0]["message"]["content"] total_tokens = result["usage"]["total_tokens"] completion_tokens = result["usage"]["completion_tokens"] logger.debug(f"[MINIMAX] reply_text: content_length={len(content)}, tokens={total_tokens}") return { "total_tokens": total_tokens, "completion_tokens": completion_tokens, "content": content, } else: error_msg = response.text logger.error(f"[MINIMAX] API error: status={response.status_code}, msg={error_msg}") # Parse error for better messages result = {"completion_tokens": 0, "content": "我现在有点累了,等会再来吧"} need_retry = False if response.status_code >= 500: logger.warning(f"[MINIMAX] Server error, retry={retry_count}") need_retry = retry_count < 2 elif response.status_code == 401: result["content"] = "授权失败,请检查API Key是否正确" need_retry = False elif response.status_code == 429: result["content"] = "请求过于频繁,请稍后再试" need_retry = retry_count < 2 else: need_retry = False if need_retry: time.sleep(3) return self.reply_text(session, args, retry_count + 1) else: return result except requests.exceptions.Timeout: logger.error("[MINIMAX] Request timeout") need_retry = retry_count < 2 result = {"completion_tokens": 0, "content": "请求超时,请稍后再试"} if need_retry: time.sleep(3) return self.reply_text(session, args, retry_count + 1) else: return result except Exception as e: logger.error(f"[MINIMAX] reply_text error: {e}") import traceback logger.error(traceback.format_exc()) need_retry = retry_count < 2 result = {"completion_tokens": 0, "content": "我现在有点累了,等会再来吧"} if need_retry: time.sleep(3) return self.reply_text(session, args, retry_count + 1) else: return result def call_with_tools(self, messages, tools=None, stream=False, **kwargs): """ Call MiniMax API with tool support for agent integration This method handles: 1. Format conversion (Claude format → OpenAI format) 2. System prompt injection 3. API calling with REST API 4. Interleaved Thinking support (reasoning_split=True) Args: messages: List of messages (may be in Claude format from agent) tools: List of tool definitions (may be in Claude format from agent) stream: Whether to use streaming **kwargs: Additional parameters (max_tokens, temperature, system, etc.) Returns: Formatted response or generator for streaming """ try: # Convert messages from Claude format to OpenAI format converted_messages = self._convert_messages_to_openai_format(messages) # Extract and inject system prompt if provided system_prompt = kwargs.pop("system", None) if system_prompt: # Add system message at the beginning converted_messages.insert(0, {"role": "system", "content": system_prompt}) # Convert tools from Claude format to OpenAI format converted_tools = None if tools: converted_tools = self._convert_tools_to_openai_format(tools) # Prepare API parameters model = kwargs.pop("model", None) or self.args["model"] max_tokens = kwargs.pop("max_tokens", 4096) temperature = kwargs.pop("temperature", self.args["temperature"]) # Build request body request_body = { "model": model, "messages": converted_messages, "max_tokens": max_tokens, "temperature": temperature, "stream": stream, } # Add tools if provided if converted_tools: request_body["tools"] = converted_tools # Add reasoning_split=True for better thinking control (M2.1 feature) # This separates thinking content into reasoning_details field request_body["reasoning_split"] = True logger.debug(f"[MINIMAX] API call: model={model}, tools={len(converted_tools) if converted_tools else 0}, stream={stream}") # Check if we should show thinking process show_thinking = kwargs.pop("show_thinking", conf().get("minimax_show_thinking", False)) if stream: return self._handle_stream_response(request_body, show_thinking=show_thinking) else: return self._handle_sync_response(request_body) except Exception as e: logger.error(f"[MINIMAX] call_with_tools error: {e}") import traceback logger.error(traceback.format_exc()) def error_generator(): yield {"error": True, "message": str(e), "status_code": 500} return error_generator() def _convert_messages_to_openai_format(self, messages): """ Convert messages from Claude format to OpenAI format Claude format: - role: "user" | "assistant" - content: string | list of content blocks OpenAI format: - role: "user" | "assistant" | "tool" - content: string - tool_calls: list (for assistant) - tool_call_id: string (for tool results) """ converted = [] for msg in messages: role = msg.get("role") content = msg.get("content") if role == "user": # Handle user message if isinstance(content, list): # Extract text from content blocks text_parts = [] tool_result = None for block in content: if isinstance(block, dict): if block.get("type") == "text": text_parts.append(block.get("text", "")) elif block.get("type") == "tool_result": # Tool result should be a separate message with role="tool" tool_result = { "role": "tool", "tool_call_id": block.get("tool_use_id"), "content": str(block.get("content", "")) } if text_parts: converted.append({ "role": "user", "content": "\n".join(text_parts) }) if tool_result: converted.append(tool_result) else: # Simple text content converted.append({ "role": "user", "content": str(content) }) elif role == "assistant": # Handle assistant message openai_msg = {"role": "assistant"} if isinstance(content, list): # Parse content blocks text_parts = [] tool_calls = [] for block in content: if isinstance(block, dict): if block.get("type") == "text": text_parts.append(block.get("text", "")) elif block.get("type") == "tool_use": # Convert to OpenAI tool_calls format tool_calls.append({ "id": block.get("id"), "type": "function", "function": { "name": block.get("name"), "arguments": json.dumps(block.get("input", {})) } }) # Set content (can be empty if only tool calls) if text_parts: openai_msg["content"] = "\n".join(text_parts) elif not tool_calls: openai_msg["content"] = "" # Set tool_calls if tool_calls: openai_msg["tool_calls"] = tool_calls # When tool_calls exist and content is empty, set to None if not text_parts: openai_msg["content"] = None else: # Simple text content openai_msg["content"] = str(content) if content else "" converted.append(openai_msg) return converted def _convert_tools_to_openai_format(self, tools): """ Convert tools from Claude format to OpenAI format Claude format: { "name": "tool_name", "description": "description", "input_schema": {...} } OpenAI format: { "type": "function", "function": { "name": "tool_name", "description": "description", "parameters": {...} } } """ converted = [] for tool in tools: converted.append({ "type": "function", "function": { "name": tool.get("name"), "description": tool.get("description"), "parameters": tool.get("input_schema", {}) } }) return converted def _handle_sync_response(self, request_body): """Handle synchronous API response""" try: headers = { "Content-Type": "application/json", "Authorization": f"Bearer {self.api_key}" } # Remove stream from body for sync request request_body.pop("stream", None) url = f"{self.api_base}/chat/completions" response = requests.post(url, headers=headers, json=request_body, timeout=60) if response.status_code != 200: error_msg = response.text logger.error(f"[MINIMAX] API error: status={response.status_code}, msg={error_msg}") yield {"error": True, "message": error_msg, "status_code": response.status_code} return result = response.json() message = result["choices"][0]["message"] finish_reason = result["choices"][0]["finish_reason"] # Build response in Claude-like format response_data = { "role": "assistant", "content": [] } # Add reasoning_details (thinking) if present if "reasoning_details" in message: for reasoning in message["reasoning_details"]: if "text" in reasoning: response_data["content"].append({ "type": "thinking", "thinking": reasoning["text"] }) # Add text content if present if message.get("content"): response_data["content"].append({ "type": "text", "text": message["content"] }) # Add tool calls if present if message.get("tool_calls"): for tool_call in message["tool_calls"]: response_data["content"].append({ "type": "tool_use", "id": tool_call["id"], "name": tool_call["function"]["name"], "input": json.loads(tool_call["function"]["arguments"]) }) # Set stop_reason if finish_reason == "tool_calls": response_data["stop_reason"] = "tool_use" elif finish_reason == "stop": response_data["stop_reason"] = "end_turn" else: response_data["stop_reason"] = finish_reason yield response_data except requests.exceptions.Timeout: logger.error("[MINIMAX] Request timeout") yield {"error": True, "message": "Request timeout", "status_code": 500} except Exception as e: logger.error(f"[MINIMAX] sync response error: {e}") import traceback logger.error(traceback.format_exc()) yield {"error": True, "message": str(e), "status_code": 500} def _handle_stream_response(self, request_body, show_thinking=False): """Handle streaming API response Args: request_body: API request parameters show_thinking: Whether to show thinking/reasoning process to users """ try: headers = { "Content-Type": "application/json", "Authorization": f"Bearer {self.api_key}" } url = f"{self.api_base}/chat/completions" response = requests.post(url, headers=headers, json=request_body, stream=True, timeout=60) if response.status_code != 200: error_msg = response.text logger.error(f"[MINIMAX] API error: status={response.status_code}, msg={error_msg}") yield {"error": True, "message": error_msg, "status_code": response.status_code} return current_content = [] current_tool_calls = {} current_reasoning = [] finish_reason = None chunk_count = 0 # Process SSE stream for line in response.iter_lines(): if not line: continue line = line.decode('utf-8') if not line.startswith('data: '): continue data_str = line[6:] # Remove 'data: ' prefix if data_str.strip() == '[DONE]': break try: chunk = json.loads(data_str) chunk_count += 1 except json.JSONDecodeError as e: logger.warning(f"[MINIMAX] JSON decode error: {e}, data: {data_str[:100]}") continue # Check for error response (MiniMax format) if chunk.get("type") == "error" or "error" in chunk: error_data = chunk.get("error", {}) error_msg = error_data.get("message", "Unknown error") error_type = error_data.get("type", "") http_code = error_data.get("http_code", "") logger.error(f"[MINIMAX] API error: {error_msg} (type: {error_type}, code: {http_code})") yield { "error": True, "message": error_msg, "status_code": int(http_code) if http_code.isdigit() else 500 } return if not chunk.get("choices"): continue choice = chunk["choices"][0] delta = choice.get("delta", {}) # Handle reasoning_details (thinking) if "reasoning_details" in delta: for reasoning in delta["reasoning_details"]: if "text" in reasoning: reasoning_id = reasoning.get("id", "reasoning-text-1") reasoning_index = reasoning.get("index", 0) reasoning_text = reasoning["text"] # Accumulate reasoning text if reasoning_index >= len(current_reasoning): current_reasoning.append({"id": reasoning_id, "text": ""}) current_reasoning[reasoning_index]["text"] += reasoning_text # Optionally yield thinking as visible content if show_thinking: # Format thinking text for display formatted_thinking = f"💭 {reasoning_text}" # Yield as OpenAI-format content delta yield { "choices": [{ "index": 0, "delta": { "role": "assistant", "content": formatted_thinking } }] } # Handle text content if "content" in delta and delta["content"]: # Start new content block if needed if not any(block.get("type") == "text" for block in current_content): current_content.append({"type": "text", "text": ""}) # Accumulate text for block in current_content: if block.get("type") == "text": block["text"] += delta["content"] break # Yield OpenAI-format delta (for agent_stream.py compatibility) yield { "choices": [{ "index": 0, "delta": { "role": "assistant", "content": delta["content"] } }] } # Handle tool calls if "tool_calls" in delta: for tool_call_chunk in delta["tool_calls"]: index = tool_call_chunk.get("index", 0) if index not in current_tool_calls: # Start new tool call current_tool_calls[index] = { "id": tool_call_chunk.get("id", ""), "type": "tool_use", "name": tool_call_chunk.get("function", {}).get("name", ""), "input": "" } # Accumulate tool call arguments if "function" in tool_call_chunk and "arguments" in tool_call_chunk["function"]: current_tool_calls[index]["input"] += tool_call_chunk["function"]["arguments"] # Yield OpenAI-format tool call delta yield { "choices": [{ "index": 0, "delta": { "tool_calls": [tool_call_chunk] } }] } # Handle finish_reason if choice.get("finish_reason"): finish_reason = choice["finish_reason"] # Log complete reasoning_details for debugging if current_reasoning: logger.debug(f"[MINIMAX] ===== Complete Reasoning Details =====") for i, reasoning in enumerate(current_reasoning): reasoning_text = reasoning.get("text", "") logger.debug(f"[MINIMAX] Reasoning {i+1} (length={len(reasoning_text)}):") logger.debug(f"[MINIMAX] {reasoning_text}") logger.debug(f"[MINIMAX] ===== End Reasoning Details =====") # Yield final chunk with finish_reason (OpenAI format) yield { "choices": [{ "index": 0, "delta": {}, "finish_reason": finish_reason }] } except requests.exceptions.Timeout: logger.error("[MINIMAX] Request timeout") yield {"error": True, "message": "Request timeout", "status_code": 500} except Exception as e: logger.error(f"[MINIMAX] stream response error: {e}") import traceback logger.error(traceback.format_exc()) yield {"error": True, "message": str(e), "status_code": 500}