|
| 1 | +# encoding:utf-8 |
| 2 | + |
| 3 | +import json |
| 4 | +import os |
| 5 | +from bridge.bridge import Bridge |
| 6 | +from bridge.context import ContextType |
| 7 | +from bridge.reply import Reply, ReplyType |
| 8 | +import plugins |
| 9 | +from plugins import * |
| 10 | +from common.log import logger |
| 11 | + |
| 12 | + |
| 13 | +class RolePlay(): |
| 14 | + def __init__(self, bot, sessionid, desc, wrapper=None): |
| 15 | + self.bot = bot |
| 16 | + self.sessionid = sessionid |
| 17 | + bot.sessions.clear_session(sessionid) |
| 18 | + bot.sessions.build_session(sessionid, desc) |
| 19 | + self.wrapper = wrapper or "%s" # 用于包装用户输入 |
| 20 | + |
| 21 | + def reset(self): |
| 22 | + self.bot.sessions.clear_session(self.sessionid) |
| 23 | + |
| 24 | + def action(self, user_action): |
| 25 | + prompt = self.wrapper % user_action |
| 26 | + return prompt |
| 27 | + |
| 28 | +@plugins.register(name="Role", desc="为你的Bot设置预设角色", version="1.0", author="lanvent", desire_priority= 0) |
| 29 | +class Role(Plugin): |
| 30 | + def __init__(self): |
| 31 | + super().__init__() |
| 32 | + curdir = os.path.dirname(__file__) |
| 33 | + config_path = os.path.join(curdir, "roles.json") |
| 34 | + try: |
| 35 | + with open(config_path, "r", encoding="utf-8") as f: |
| 36 | + config = json.load(f) |
| 37 | + self.roles = {role["title"].lower(): role for role in config["roles"]} |
| 38 | + if len(self.roles) == 0: |
| 39 | + raise Exception("no role found") |
| 40 | + self.handlers[Event.ON_HANDLE_CONTEXT] = self.on_handle_context |
| 41 | + self.roleplays = {} |
| 42 | + logger.info("[Role] inited") |
| 43 | + except FileNotFoundError: |
| 44 | + logger.error(f"[Role] init failed, {config_path} not found") |
| 45 | + except Exception as e: |
| 46 | + logger.error("[Role] init failed, exception: %s" % e) |
| 47 | + |
| 48 | + def get_role(self, name, find_closest=True): |
| 49 | + name = name.lower() |
| 50 | + found_role = None |
| 51 | + if name in self.roles: |
| 52 | + found_role = name |
| 53 | + elif find_closest: |
| 54 | + import difflib |
| 55 | + |
| 56 | + def str_simularity(a, b): |
| 57 | + return difflib.SequenceMatcher(None, a, b).ratio() |
| 58 | + max_sim = 0.0 |
| 59 | + max_role = None |
| 60 | + for role in self.roles: |
| 61 | + sim = str_simularity(name, role) |
| 62 | + if sim >= max_sim: |
| 63 | + max_sim = sim |
| 64 | + max_role = role |
| 65 | + found_role = max_role |
| 66 | + return found_role |
| 67 | + |
| 68 | + def on_handle_context(self, e_context: EventContext): |
| 69 | + |
| 70 | + if e_context['context'].type != ContextType.TEXT: |
| 71 | + return |
| 72 | + bottype = Bridge().get_bot_type("chat") |
| 73 | + if bottype != "chatGPT": |
| 74 | + return |
| 75 | + bot = Bridge().get_bot("chat") |
| 76 | + content = e_context['context'].content[:] |
| 77 | + clist = e_context['context'].content.split(maxsplit=1) |
| 78 | + desckey = None |
| 79 | + sessionid = e_context['context']['session_id'] |
| 80 | + if clist[0] == "$停止扮演": |
| 81 | + if sessionid in self.roleplays: |
| 82 | + self.roleplays[sessionid].reset() |
| 83 | + del self.roleplays[sessionid] |
| 84 | + reply = Reply(ReplyType.INFO, "角色扮演结束!") |
| 85 | + e_context['reply'] = reply |
| 86 | + e_context.action = EventAction.BREAK_PASS |
| 87 | + return |
| 88 | + elif clist[0] == "$角色": |
| 89 | + desckey = "descn" |
| 90 | + elif clist[0].lower() == "$role": |
| 91 | + desckey = "description" |
| 92 | + elif sessionid not in self.roleplays: |
| 93 | + return |
| 94 | + logger.debug("[Role] on_handle_context. content: %s" % content) |
| 95 | + if desckey is not None: |
| 96 | + if len(clist) == 1 or (len(clist) > 1 and clist[1].lower() in ["help", "帮助"]): |
| 97 | + reply = Reply(ReplyType.INFO, self.get_help_text()) |
| 98 | + e_context['reply'] = reply |
| 99 | + e_context.action = EventAction.BREAK_PASS |
| 100 | + return |
| 101 | + role = self.get_role(clist[1]) |
| 102 | + if role is None: |
| 103 | + reply = Reply(ReplyType.ERROR, "角色不存在") |
| 104 | + e_context['reply'] = reply |
| 105 | + e_context.action = EventAction.BREAK_PASS |
| 106 | + return |
| 107 | + else: |
| 108 | + self.roleplays[sessionid] = RolePlay(bot, sessionid, self.roles[role][desckey],self.roles[role].get("wrapper","%s")) |
| 109 | + reply = Reply(ReplyType.INFO, f"角色设定为 {role} :\n"+self.roles[role][desckey]) |
| 110 | + e_context['reply'] = reply |
| 111 | + e_context.action = EventAction.BREAK_PASS |
| 112 | + else: |
| 113 | + prompt = self.roleplays[sessionid].action(content) |
| 114 | + e_context['context'].type = ContextType.TEXT |
| 115 | + e_context['context'].content = prompt |
| 116 | + e_context.action = EventAction.CONTINUE |
| 117 | + |
| 118 | + def get_help_text(self): |
| 119 | + help_text = "输入\"$角色 (角色名)\"或\"$role (角色名)\"为我设定角色吧,#reset 可以清除设定的角色。\n目前可用角色列表:\n" |
| 120 | + for role in self.roles: |
| 121 | + help_text += f"[{role}]: {self.roles[role]['remark']}\n" |
| 122 | + return help_text |
0 commit comments