From 180550b8f073cd5ee0eaade3b0be3fc5940abd04 Mon Sep 17 00:00:00 2001 From: binary-husky Date: Wed, 30 Oct 2024 13:37:35 +0000 Subject: [PATCH] upgrade auto comment --- check_proxy.py | 97 ++++++++++++++++--- crazy_functional.py | 2 + crazy_functions/SourceCode_Comment.py | 40 ++++++-- crazy_functions/SourceCode_Comment_Wrap.py | 36 +++++++ .../agent_fns/python_comment_agent.py | 68 ++++++++++++- crazy_functions/ast_fns/comment_remove.py | 80 ++++++++------- 6 files changed, 261 insertions(+), 62 deletions(-) create mode 100644 crazy_functions/SourceCode_Comment_Wrap.py diff --git a/check_proxy.py b/check_proxy.py index b5ee17d30a..6124a6efcb 100644 --- a/check_proxy.py +++ b/check_proxy.py @@ -1,24 +1,36 @@ from loguru import logger def check_proxy(proxies, return_ip=False): + """ + 检查代理配置并返回结果。 + + Args: + proxies (dict): 包含http和https代理配置的字典。 + return_ip (bool, optional): 是否返回代理的IP地址。默认为False。 + + Returns: + str or None: 检查的结果信息或代理的IP地址(如果`return_ip`为True)。 + """ import requests proxies_https = proxies['https'] if proxies is not None else '无' ip = None try: - response = requests.get("https://ipapi.co/json/", proxies=proxies, timeout=4) + response = requests.get("https://ipapi.co/json/", proxies=proxies, timeout=4) # ⭐ 执行GET请求以获取代理信息 data = response.json() if 'country_name' in data: country = data['country_name'] result = f"代理配置 {proxies_https}, 代理所在地:{country}" - if 'ip' in data: ip = data['ip'] + if 'ip' in data: + ip = data['ip'] elif 'error' in data: - alternative, ip = _check_with_backup_source(proxies) + alternative, ip = _check_with_backup_source(proxies) # ⭐ 调用备用方法检查代理配置 if alternative is None: result = f"代理配置 {proxies_https}, 代理所在地:未知,IP查询频率受限" else: result = f"代理配置 {proxies_https}, 代理所在地:{alternative}" else: result = f"代理配置 {proxies_https}, 代理数据解析失败:{data}" + if not return_ip: logger.warning(result) return result @@ -33,17 +45,33 @@ def check_proxy(proxies, return_ip=False): return ip def _check_with_backup_source(proxies): + """ + 通过备份源检查代理,并获取相应信息。 + + Args: + proxies (dict): 包含代理信息的字典。 + + Returns: + tuple: 代理信息(geo)和IP地址(ip)的元组。 + """ import random, string, requests random_string = ''.join(random.choices(string.ascii_letters + string.digits, k=32)) try: - res_json = requests.get(f"http://{random_string}.edns.ip-api.com/json", proxies=proxies, timeout=4).json() + res_json = requests.get(f"http://{random_string}.edns.ip-api.com/json", proxies=proxies, timeout=4).json() # ⭐ 执行代理检查和备份源请求 return res_json['dns']['geo'], res_json['dns']['ip'] except: return None, None def backup_and_download(current_version, remote_version): """ - 一键更新协议:备份和下载 + 一键更新协议:备份当前版本,下载远程版本并解压缩。 + + Args: + current_version (str): 当前版本号。 + remote_version (str): 远程版本号。 + + Returns: + str: 新版本目录的路径。 """ from toolbox import get_conf import shutil @@ -60,7 +88,7 @@ def backup_and_download(current_version, remote_version): proxies = get_conf('proxies') try: r = requests.get('https://github.com/binary-husky/chatgpt_academic/archive/refs/heads/master.zip', proxies=proxies, stream=True) except: r = requests.get('https://public.agent-matrix.com/publish/master.zip', proxies=proxies, stream=True) - zip_file_path = backup_dir+'/master.zip' + zip_file_path = backup_dir+'/master.zip' # ⭐ 保存备份文件的路径 with open(zip_file_path, 'wb+') as f: f.write(r.content) dst_path = new_version_dir @@ -76,6 +104,17 @@ def backup_and_download(current_version, remote_version): def patch_and_restart(path): """ 一键更新协议:覆盖和重启 + + Args: + path (str): 新版本代码所在的路径 + + 注意事项: + 如果您的程序没有使用config_private.py私密配置文件,则会将config.py重命名为config_private.py以避免配置丢失。 + + 更新流程: + - 复制最新版本代码到当前目录 + - 更新pip包依赖 + - 如果更新失败,则提示手动安装依赖库并重启 """ from distutils import dir_util import shutil @@ -84,32 +123,43 @@ def patch_and_restart(path): import time import glob from shared_utils.colorful import log亮黄, log亮绿, log亮红 - # if not using config_private, move origin config.py as config_private.py + if not os.path.exists('config_private.py'): log亮黄('由于您没有设置config_private.py私密配置,现将您的现有配置移动至config_private.py以防止配置丢失,', '另外您可以随时在history子文件夹下找回旧版的程序。') shutil.copyfile('config.py', 'config_private.py') + path_new_version = glob.glob(path + '/*-master')[0] - dir_util.copy_tree(path_new_version, './') + dir_util.copy_tree(path_new_version, './') # ⭐ 将最新版本代码复制到当前目录 + log亮绿('代码已经更新,即将更新pip包依赖……') for i in reversed(range(5)): time.sleep(1); log亮绿(i) + try: import subprocess subprocess.check_call([sys.executable, '-m', 'pip', 'install', '-r', 'requirements.txt']) except: log亮红('pip包依赖安装出现问题,需要手动安装新增的依赖库 `python -m pip install -r requirements.txt`,然后在用常规的`python main.py`的方式启动。') + log亮绿('更新完成,您可以随时在history子文件夹下找回旧版的程序,5s之后重启') log亮红('假如重启失败,您可能需要手动安装新增的依赖库 `python -m pip install -r requirements.txt`,然后在用常规的`python main.py`的方式启动。') log亮绿(' ------------------------------ -----------------------------------') + for i in reversed(range(8)): time.sleep(1); log亮绿(i) - os.execl(sys.executable, sys.executable, *sys.argv) + os.execl(sys.executable, sys.executable, *sys.argv) # 重启程序 def get_current_version(): + """ + 获取当前的版本号。 + + Returns: + str: 当前的版本号。如果无法获取版本号,则返回空字符串。 + """ import json try: with open('./version', 'r', encoding='utf8') as f: - current_version = json.loads(f.read())['version'] + current_version = json.loads(f.read())['version'] # ⭐ 从读取的json数据中提取版本号 except: current_version = "" return current_version @@ -118,6 +168,12 @@ def get_current_version(): def auto_update(raise_error=False): """ 一键更新协议:查询版本和用户意见 + + Args: + raise_error (bool, optional): 是否在出错时抛出错误。默认为 False。 + + Returns: + None """ try: from toolbox import get_conf @@ -137,13 +193,13 @@ def auto_update(raise_error=False): current_version = json.loads(current_version)['version'] if (remote_version - current_version) >= 0.01-1e-5: from shared_utils.colorful import log亮黄 - log亮黄(f'\n新版本可用。新版本:{remote_version},当前版本:{current_version}。{new_feature}') + log亮黄(f'\n新版本可用。新版本:{remote_version},当前版本:{current_version}。{new_feature}') # ⭐ 在控制台打印新版本信息 logger.info('(1)Github更新地址:\nhttps://github.com/binary-husky/chatgpt_academic\n') user_instruction = input('(2)是否一键更新代码(Y+回车=确认,输入其他/无输入+回车=不更新)?') if user_instruction in ['Y', 'y']: - path = backup_and_download(current_version, remote_version) + path = backup_and_download(current_version, remote_version) # ⭐ 备份并下载文件 try: - patch_and_restart(path) + patch_and_restart(path) # ⭐ 执行覆盖并重启操作 except: msg = '更新失败。' if raise_error: @@ -163,6 +219,9 @@ def auto_update(raise_error=False): logger.info(msg) def warm_up_modules(): + """ + 预热模块,加载特定模块并执行预热操作。 + """ logger.info('正在执行一些模块的预热 ...') from toolbox import ProxyNetworkActivate from request_llms.bridge_all import model_info @@ -173,6 +232,16 @@ def warm_up_modules(): enc.encode("模块预热", disallowed_special=()) def warm_up_vectordb(): + """ + 执行一些模块的预热操作。 + + 本函数主要用于执行一些模块的预热操作,确保在后续的流程中能够顺利运行。 + + ⭐ 关键作用:预热模块 + + Returns: + None + """ logger.info('正在执行一些模块的预热 ...') from toolbox import ProxyNetworkActivate with ProxyNetworkActivate("Warmup_Modules"): @@ -185,4 +254,4 @@ def warm_up_vectordb(): os.environ['no_proxy'] = '*' # 避免代理网络产生意外污染 from toolbox import get_conf proxies = get_conf('proxies') - check_proxy(proxies) + check_proxy(proxies) \ No newline at end of file diff --git a/crazy_functional.py b/crazy_functional.py index de07c1bb12..92bc2842cd 100644 --- a/crazy_functional.py +++ b/crazy_functional.py @@ -49,6 +49,7 @@ def get_crazy_functions(): from crazy_functions.Image_Generate import 图片生成_DALLE2, 图片生成_DALLE3, 图片修改_DALLE2 from crazy_functions.Image_Generate_Wrap import ImageGen_Wrap from crazy_functions.SourceCode_Comment import 注释Python项目 + from crazy_functions.SourceCode_Comment_Wrap import SourceCodeComment_Wrap function_plugins = { "虚空终端": { @@ -71,6 +72,7 @@ def get_crazy_functions(): "AsButton": False, "Info": "上传一系列python源文件(或者压缩包), 为这些代码添加docstring | 输入参数为路径", "Function": HotReload(注释Python项目), + "Class": SourceCodeComment_Wrap, }, "载入对话历史存档(先上传存档或输入路径)": { "Group": "对话", diff --git a/crazy_functions/SourceCode_Comment.py b/crazy_functions/SourceCode_Comment.py index c45da83b4f..9d9969ab95 100644 --- a/crazy_functions/SourceCode_Comment.py +++ b/crazy_functions/SourceCode_Comment.py @@ -6,9 +6,11 @@ from crazy_functions.crazy_utils import request_gpt_model_in_new_thread_with_ui_alive from crazy_functions.agent_fns.python_comment_agent import PythonCodeComment from crazy_functions.diagram_fns.file_tree import FileNode +from crazy_functions.agent_fns.watchdog import WatchDog from shared_utils.advanced_markdown_format import markdown_convertion_for_file from loguru import logger + def 注释源代码(file_manifest, project_folder, llm_kwargs, plugin_kwargs, chatbot, history, system_prompt): summary_batch_isolation = True @@ -25,12 +27,13 @@ def 注释源代码(file_manifest, project_folder, llm_kwargs, plugin_kwargs, ch file_tree_struct.add_file(file_path, file_path) # <第一步,逐个文件分析,多线程> + lang = "" if not plugin_kwargs["use_chinese"] else " (you must use Chinese)" for index, fp in enumerate(file_manifest): # 读取文件 with open(fp, 'r', encoding='utf-8', errors='replace') as f: file_content = f.read() prefix = "" - i_say = prefix + f'Please conclude the following source code at {os.path.relpath(fp, project_folder)} with only one sentence, the code is:\n```{file_content}```' + i_say = prefix + f'Please conclude the following source code at {os.path.relpath(fp, project_folder)} with only one sentence{lang}, the code is:\n```{file_content}```' i_say_show_user = prefix + f'[{index+1}/{len(file_manifest)}] 请用一句话对下面的程序文件做一个整体概述: {fp}' # 装载请求内容 MAX_TOKEN_SINGLE_FILE = 2560 @@ -38,7 +41,7 @@ def 注释源代码(file_manifest, project_folder, llm_kwargs, plugin_kwargs, ch inputs_array.append(i_say) inputs_show_user_array.append(i_say_show_user) history_array.append([]) - sys_prompt_array.append("You are a software architecture analyst analyzing a source code project. Do not dig into details, tell me what the code is doing in general. Your answer must be short, simple and clear.") + sys_prompt_array.append(f"You are a software architecture analyst analyzing a source code project. Do not dig into details, tell me what the code is doing in general. Your answer must be short, simple and clear{lang}.") # 文件读取完成,对每一个源代码文件,生成一个请求线程,发送到大模型进行分析 gpt_response_collection = yield from request_gpt_model_multi_threads_with_very_awesome_ui_and_high_efficiency( inputs_array = inputs_array, @@ -51,10 +54,20 @@ def 注释源代码(file_manifest, project_folder, llm_kwargs, plugin_kwargs, ch ) # <第二步,逐个文件分析,生成带注释文件> + tasks = ["" for _ in range(len(file_manifest))] + def bark_fn(tasks): + for i in range(len(tasks)): tasks[i] = "watchdog is dead" + wd = WatchDog(timeout=10, bark_fn=lambda: bark_fn(tasks), interval=3, msg="ThreadWatcher timeout") + wd.begin_watch() from concurrent.futures import ThreadPoolExecutor executor = ThreadPoolExecutor(max_workers=get_conf('DEFAULT_WORKER_NUM')) - def _task_multi_threading(i_say, gpt_say, fp, file_tree_struct): - pcc = PythonCodeComment(llm_kwargs, language='English') + def _task_multi_threading(i_say, gpt_say, fp, file_tree_struct, index): + language = 'Chinese' if plugin_kwargs["use_chinese"] else 'English' + def observe_window_update(x): + if tasks[index] == "watchdog is dead": + raise TimeoutError("ThreadWatcher: watchdog is dead") + tasks[index] = x + pcc = PythonCodeComment(llm_kwargs, plugin_kwargs, language=language, observe_window_update=observe_window_update) pcc.read_file(path=fp, brief=gpt_say) revised_path, revised_content = pcc.begin_comment_source_code(None, None) file_tree_struct.manifest[fp].revised_path = revised_path @@ -66,7 +79,8 @@ def _task_multi_threading(i_say, gpt_say, fp, file_tree_struct): with open("crazy_functions/agent_fns/python_comment_compare.html", 'r', encoding='utf-8') as f: html_template = f.read() warp = lambda x: "```python\n\n" + x + "\n\n```" - from themes.theme import advanced_css + from themes.theme import load_dynamic_theme + _, advanced_css, _, _ = load_dynamic_theme("Default") html_template = html_template.replace("ADVANCED_CSS", advanced_css) html_template = html_template.replace("REPLACE_CODE_FILE_LEFT", pcc.get_markdown_block_in_html(markdown_convertion_for_file(warp(pcc.original_content)))) html_template = html_template.replace("REPLACE_CODE_FILE_RIGHT", pcc.get_markdown_block_in_html(markdown_convertion_for_file(warp(revised_content)))) @@ -74,17 +88,21 @@ def _task_multi_threading(i_say, gpt_say, fp, file_tree_struct): file_tree_struct.manifest[fp].compare_html = compare_html_path with open(compare_html_path, 'w', encoding='utf-8') as f: f.write(html_template) - # print('done 1') + tasks[index] = "" chatbot.append([None, f"正在处理:"]) futures = [] + index = 0 for i_say, gpt_say, fp in zip(gpt_response_collection[0::2], gpt_response_collection[1::2], file_manifest): - future = executor.submit(_task_multi_threading, i_say, gpt_say, fp, file_tree_struct) + future = executor.submit(_task_multi_threading, i_say, gpt_say, fp, file_tree_struct, index) + index += 1 futures.append(future) + # <第三步,等待任务完成> cnt = 0 while True: cnt += 1 + wd.feed() time.sleep(3) worker_done = [h.done() for h in futures] remain = len(worker_done) - sum(worker_done) @@ -100,10 +118,11 @@ def _task_multi_threading(i_say, gpt_say, fp, file_tree_struct): file_links = generate_file_link(preview_html_list) yield from update_ui_lastest_msg( - f"剩余源文件数量: {remain}.\n\n" + - f"已完成的文件: {sum(worker_done)}.\n\n" + + f"当前任务:
{'
'.join(tasks)}.
" + + f"剩余源文件数量: {remain}.
" + + f"已完成的文件: {sum(worker_done)}.
" + file_links + - "\n\n" + + "
" + ''.join(['.']*(cnt % 10 + 1) ), chatbot=chatbot, history=history, delay=0) yield from update_ui(chatbot=chatbot, history=[]) # 刷新界面 @@ -124,6 +143,7 @@ def _task_multi_threading(i_say, gpt_say, fp, file_tree_struct): @CatchException def 注释Python项目(txt, llm_kwargs, plugin_kwargs, chatbot, history, system_prompt, user_request): history = [] # 清空历史,以免输入溢出 + plugin_kwargs["use_chinese"] = plugin_kwargs.get("use_chinese", False) import glob, os if os.path.exists(txt): project_folder = txt diff --git a/crazy_functions/SourceCode_Comment_Wrap.py b/crazy_functions/SourceCode_Comment_Wrap.py new file mode 100644 index 0000000000..b742552667 --- /dev/null +++ b/crazy_functions/SourceCode_Comment_Wrap.py @@ -0,0 +1,36 @@ + +from toolbox import get_conf, update_ui +from crazy_functions.plugin_template.plugin_class_template import GptAcademicPluginTemplate, ArgProperty +from crazy_functions.SourceCode_Comment import 注释Python项目 + +class SourceCodeComment_Wrap(GptAcademicPluginTemplate): + def __init__(self): + """ + 请注意`execute`会执行在不同的线程中,因此您在定义和使用类变量时,应当慎之又慎! + """ + pass + + def define_arg_selection_menu(self): + """ + 定义插件的二级选项菜单 + """ + gui_definition = { + "main_input": + ArgProperty(title="路径", description="程序路径(上传文件后自动填写)", default_value="", type="string").model_dump_json(), # 主输入,自动从输入框同步 + "use_chinese": + ArgProperty(title="注释语言", options=["英文", "中文"], default_value="英文", description="无", type="dropdown").model_dump_json(), + # "use_emoji": + # ArgProperty(title="在注释中使用emoji", options=["禁止", "允许"], default_value="禁止", description="无", type="dropdown").model_dump_json(), + } + return gui_definition + + def execute(txt, llm_kwargs, plugin_kwargs, chatbot, history, system_prompt, user_request): + """ + 执行插件 + """ + if plugin_kwargs["use_chinese"] == "中文": + plugin_kwargs["use_chinese"] = True + else: + plugin_kwargs["use_chinese"] = False + + yield from 注释Python项目(txt, llm_kwargs, plugin_kwargs, chatbot, history, system_prompt, user_request) diff --git a/crazy_functions/agent_fns/python_comment_agent.py b/crazy_functions/agent_fns/python_comment_agent.py index dd4b6ce856..9f19b17e40 100644 --- a/crazy_functions/agent_fns/python_comment_agent.py +++ b/crazy_functions/agent_fns/python_comment_agent.py @@ -68,6 +68,7 @@ 1. You must NOT modify the indent of code. 2. You are NOT authorized to change or translate non-comment code, and you are NOT authorized to add empty lines either, toggle qu. 3. Use {LANG} to add comments and docstrings. Do NOT translate Chinese that is already in the code. +4. Besides adding a docstring, use the ⭐ symbol to annotate the most core and important line of code within the function, explaining its role. ------------------ Example ------------------ INPUT: @@ -116,10 +117,66 @@ def zip_result(folder): ''' +revise_funtion_prompt_chinese = ''' +您需要阅读以下代码,并根据以下说明修订源代码({FILE_BASENAME}): +1. 如果源代码中包含函数的话, 你应该分析给定函数实现了什么功能 +2. 如果源代码中包含函数的话, 你需要为函数添加docstring, docstring必须使用中文 + +请注意: +1. 你不得修改代码的缩进 +2. 你无权更改或翻译代码中的非注释部分,也不允许添加空行 +3. 使用 {LANG} 添加注释和文档字符串。不要翻译代码中已有的中文 +4. 除了添加docstring之外, 使用⭐符号给该函数中最核心、最重要的一行代码添加注释,并说明其作用 + +------------------ 示例 ------------------ +INPUT: +``` +L0000 | +L0001 |def zip_result(folder): +L0002 | t = gen_time_str() +L0003 | zip_folder(folder, get_log_folder(), f"result.zip") +L0004 | return os.path.join(get_log_folder(), f"result.zip") +L0005 | +L0006 | +``` + +OUTPUT: + + +该函数用于压缩指定文件夹,并返回生成的`zip`文件的路径。 + + +``` +def zip_result(folder): + """ + 该函数将指定的文件夹压缩成ZIP文件, 并将其存储在日志文件夹中。 + + 输入参数: + folder (str): 需要压缩的文件夹的路径。 + 返回值: + str: 日志文件夹中创建的ZIP文件的路径。 + """ + t = gen_time_str() + zip_folder(folder, get_log_folder(), f"result.zip") # ⭐ 执行文件夹的压缩 + return os.path.join(get_log_folder(), f"result.zip") +``` + +------------------ End of Example ------------------ + + +------------------ the real INPUT you need to process NOW ({FILE_BASENAME}) ------------------ +``` +{THE_CODE} +``` +{INDENT_REMINDER} +{BRIEF_REMINDER} +{HINT_REMINDER} +''' + class PythonCodeComment(): - def __init__(self, llm_kwargs, language) -> None: + def __init__(self, llm_kwargs, plugin_kwargs, language, observe_window_update) -> None: self.original_content = "" self.full_context = [] self.full_context_with_line_no = [] @@ -127,7 +184,13 @@ def __init__(self, llm_kwargs, language) -> None: self.page_limit = 100 # 100 lines of code each page self.ignore_limit = 20 self.llm_kwargs = llm_kwargs + self.plugin_kwargs = plugin_kwargs self.language = language + self.observe_window_update = observe_window_update + if self.language == "chinese": + self.core_prompt = revise_funtion_prompt_chinese + else: + self.core_prompt = revise_funtion_prompt self.path = None self.file_basename = None self.file_brief = "" @@ -258,7 +321,7 @@ def tag_code(self, fn, hint): hint_reminder = "" if hint is None else f"(Reminder: do not ignore or modify code such as `{hint}`, provide complete code in the OUTPUT.)" self.llm_kwargs['temperature'] = 0 result = predict_no_ui_long_connection( - inputs=revise_funtion_prompt.format( + inputs=self.core_prompt.format( LANG=self.language, FILE_BASENAME=self.file_basename, THE_CODE=code, @@ -348,6 +411,7 @@ def begin_comment_source_code(self, chatbot=None, history=None): try: # yield from update_ui_lastest_msg(f"({self.file_basename}) 正在读取下一段代码片段:\n", chatbot=chatbot, history=history, delay=0) next_batch, line_no_start, line_no_end = self.get_next_batch() + self.observe_window_update(f"正在处理{self.file_basename} - {line_no_start}/{len(self.full_context)}\n") # yield from update_ui_lastest_msg(f"({self.file_basename}) 处理代码片段:\n\n{next_batch}", chatbot=chatbot, history=history, delay=0) hint = None diff --git a/crazy_functions/ast_fns/comment_remove.py b/crazy_functions/ast_fns/comment_remove.py index 1c482afd17..b37c90e0c1 100644 --- a/crazy_functions/ast_fns/comment_remove.py +++ b/crazy_functions/ast_fns/comment_remove.py @@ -1,39 +1,47 @@ -import ast - -class CommentRemover(ast.NodeTransformer): - def visit_FunctionDef(self, node): - # 移除函数的文档字符串 - if (node.body and isinstance(node.body[0], ast.Expr) and - isinstance(node.body[0].value, ast.Str)): - node.body = node.body[1:] - self.generic_visit(node) - return node - - def visit_ClassDef(self, node): - # 移除类的文档字符串 - if (node.body and isinstance(node.body[0], ast.Expr) and - isinstance(node.body[0].value, ast.Str)): - node.body = node.body[1:] - self.generic_visit(node) - return node - - def visit_Module(self, node): - # 移除模块的文档字符串 - if (node.body and isinstance(node.body[0], ast.Expr) and - isinstance(node.body[0].value, ast.Str)): - node.body = node.body[1:] - self.generic_visit(node) - return node - - -def remove_python_comments(source_code): - # 解析源代码为 AST - tree = ast.parse(source_code) - # 移除注释 - transformer = CommentRemover() - tree = transformer.visit(tree) - # 将处理后的 AST 转换回源代码 - return ast.unparse(tree) +import token +import tokenize +import copy +import io + + +def remove_python_comments(input_source: str) -> str: + source_flag = copy.copy(input_source) + source = io.StringIO(input_source) + ls = input_source.split('\n') + prev_toktype = token.INDENT + readline = source.readline + + def get_char_index(lineno, col): + # find the index of the char in the source code + if lineno == 1: + return len('\n'.join(ls[:(lineno-1)])) + col + else: + return len('\n'.join(ls[:(lineno-1)])) + col + 1 + + def replace_char_between(start_lineno, start_col, end_lineno, end_col, source, replace_char, ls): + # replace char between start_lineno, start_col and end_lineno, end_col with replace_char, but keep '\n' and ' ' + b = get_char_index(start_lineno, start_col) + e = get_char_index(end_lineno, end_col) + for i in range(b, e): + if source[i] == '\n': + source = source[:i] + '\n' + source[i+1:] + elif source[i] == ' ': + source = source[:i] + ' ' + source[i+1:] + else: + source = source[:i] + replace_char + source[i+1:] + return source + + tokgen = tokenize.generate_tokens(readline) + for toktype, ttext, (slineno, scol), (elineno, ecol), ltext in tokgen: + if toktype == token.STRING and (prev_toktype == token.INDENT): + source_flag = replace_char_between(slineno, scol, elineno, ecol, source_flag, ' ', ls) + elif toktype == token.STRING and (prev_toktype == token.NEWLINE): + source_flag = replace_char_between(slineno, scol, elineno, ecol, source_flag, ' ', ls) + elif toktype == tokenize.COMMENT: + source_flag = replace_char_between(slineno, scol, elineno, ecol, source_flag, ' ', ls) + prev_toktype = toktype + return source_flag + # 示例使用 if __name__ == "__main__":