HCY Blog

AIOps的Python实战学习

Python学习

根据实际项目对python知识补漏的学习笔记

目录

AIOps Python实战学习

main.py

  1. 导入 & 项目根目录

pandas用来读csv;

OrderedDict(来自 typing,不过更常用的是 collections.OrderedDict)用来构造有序字典;

project_root 获取当前文件所在目录,用于拼接 input/output/ 的路径。

掌握点:

  • 模块导入:import ...,from ... import ...
  • os.path用法(dirname,abspath,join)
  1. 终止条件
max_messages_termination = MaxMessageTermination(max_messages=20)
termination = max_messages_termination

这两行是为 GraphFlow / GroupChat 设置一个终止条件:

对话最多 20 条消息,超过就停。

  1. 主函数 main()(异步)
async def main():
    input_path = os.path.join(project_root, 'input', 'input_timestamp.csv')
    df_input_timestamp = pd.read_csv(input_path, encoding='utf-8')
    
    for index, row in df_input_timestamp.iterrows():

        if index < 32:
            continue

这是一个 async def,说明里面会有 await 调用(例如调用 LLM 的异步接口); pd.read_csv 把 CSV 读成一个 DataFramedf_input_timestamp.iterrows() 逐行遍历: index 是行号; row 是这一行的数据(Series)。 if index < 32: continue:跳过前 32 行,只处理后面的 case。

掌握点:

  • for index, row in df.iterrows() 的用法;
  • continue 的含义(跳过本次循环剩余部分)。
  1. 从每一行取字段
        print(">>" * 100)
        print(f"index: {index}")

        start_timestamp = row['start_timestamp']
        end_timestamp = row['end_timestamp']
        start_time_hour = row['start_time_hour']
        uuid = row['uuid']
  • 使用 row['列名'] 取出这一行的几个关键字段;
  • f"index: {index}"f-string,格式化输出。
  1. 日志 refinement + 日志 LLM 提炼
        refined_logs = log_refinement(start_time_hour, start_timestamp, end_timestamp)
        if refined_logs is not None:
            print('//' * 20)
            refined_logs = await logs_agent.run(task=f"请提炼出以下日志中对故障诊断最关键、最有价值的日志:\n{refined_logs}")
            refined_logs = refined_logs.messages[-1].content
        else:
            refined_logs = None
        print('logs refinement completed!')
  • log_refinement(...) 返回某个时间窗口的日志内容(已经过预筛);

如果不是 None:

使用 `logs_agent.run(task=...)` 让日志 Agent 调用 LLM,拿到一个对话结果对象(里面有 messages 列表)。

`refined_logs.messages[-1].content` 取最后一条消息(LLM 的最终回答)的文本内容。

否则就保持 None。

掌握点

  • await 的含义(协程调用);
  • 对象属性访问:foo.bar.baz;
  • 列表索引 [-1] 表示最后一个元素。
  1. trace refinement + Agent 提炼
        refined_traces, trace_unique_dict, status_combinations_csv = trace_refinement(start_time_hour, start_timestamp, end_timestamp)
        if refined_traces is not None or status_combinations_csv is not None:
            print('//' * 20)
            refined_traces = await traces_agent.run(task=f"请提炼出以下trace中对故障诊断最关键、最有价值的traces:\n{refined_traces}\n{status_combinations_csv}")
            refined_traces = refined_traces.messages[-1].content
        else:
            refined_traces = None
        print('traces refinement completed!')

这里 trace_refinement 返回三个东西:

  • refined_traces:预处理后的 trace;
  • trace_unique_dict:可能是去重后的 trace 信息(这里没再用);
  • status_combinations_csv:某些状态组合的信息(转成文本?)。 如果两者之一不为空,就拼接起来扔给 traces_agent 处理。
  1. metrics refinement + Agent 提炼

  1. 构造多模态分析 prompt
        multimodal_prompt = get_multimodal_analysis_prompt(
            log_data=refined_logs ,
            trace_data=refined_traces ,
            metric_data=refined_metrics
        )

这个函数应该是把三个模态的信息按照某个模板组合成一个大的任务描述,给 orchestrator agent用。

  1. 构建 Agent 对话图(Graph)
        builder = DiGraphBuilder()
        builder.add_node(orchestration_agent)
        builder.add_node(ad_agent).add_node(ft_agent).add_node(rcl_agent)
        builder.add_node(reflection_agent).add_node(summarization_agent)

        builder.add_edge(orchestration_agent, ad_agent)
        builder.add_edge(orchestration_agent, ft_agent).add_edge(ad_agent, ft_agent)
        builder.add_edge(orchestration_agent, rcl_agent).add_edge(ft_agent, rcl_agent)
        builder.add_edge(ad_agent, reflection_agent).add_edge(ft_agent, reflection_agent).add_edge(rcl_agent, reflection_agent)
        builder.add_edge(reflection_agent, orchestration_agent, condition=lambda msg: "APPROVE" not in msg.to_model_text())
        builder.add_edge(reflection_agent, summarization_agent, condition=lambda msg: "APPROVE" in msg.to_model_text())

        builder.set_entry_point(orchestration_agent)
        graph = builder.build()

这里使用 DiGraphBuilder 构造一个有向图:

  • 节点是不同的 agent
  • 边表示“谁把消息发给谁”;
  • 最后两条边用 condition 决定从 reflection 分流到 orchestrationsummarization
    • 如果消息里不含 "APPROVE":回到 orchestrator 继续迭代;
    • 如果含 "APPROVE":进入 summarization,总结最终答案。

掌握点

  • lambda 表达式 lambda msg: ...
  • 链式调用 builder.add_edge(...).add_edge(...) 是什么含义(返回 builder 自己)。
  1. 创建 GraphFlow 团队并运行
        team = GraphFlow(
            participants = [orchestration_agent, ad_agent, ft_agent, rcl_agent, reflection_agent, summarization_agent],
            graph = graph,
            termination_condition=termination,
        )
        await team.reset()

        # await Console(team.run_stream(task=f"{multimodal_prompt}"))
        respose = await team.run(task=f"{multimodal_prompt}")
  • GraphFlow 用刚才的图 + agent 列表构建一个团队;
  • await team.reset() 重置对话状态;
  • team.run(task=...) 运行任务一次,并得到一个包含消息历史的结果对象。 (注:respose 这个变量名有点小拼写错误,正常应该是 response,但不影响运行。)
  1. 从结果里抓 JSON,解析并写文件
        result = re.search(r'(\{.*\})', respose.messages[-1].content, re.DOTALL)
        if result:
            result = result.group(1)
        else:
            result = None
    
        json_result = json.loads(result)
        result_data = OrderedDict()
        result_data["component"] = json_result.get("component", "")
        result_data["uuid"] = uuid
        result_data["reason"] = json_result.get("reason", "")
        result_data["reasoning_trace"] = json_result.get("reasoning_trace", [])

        result_list_path = os.path.join(project_root, 'output', 'results_list.json')
        with open(result_list_path, 'a', encoding='utf-8') as f:
            json.dump(result_data, f)   
            f.write('\n')   
        print(f"第{index+1}条数据处理完成")
  • respose.messages[-1].content:取最后一条消息文本;
  • re.search(r'(\{.*\})', ..., re.DOTALL)
    • 用正则在这段文本里找一个“从第一个 { 到最后一个 } 的大块字符串”;
    • re.DOTALL 让 . 能匹配换行符。
  • json.loads(result):把 JSON 字符串变成 Python 字典;
  • OrderedDict():按顺序插入字段;
  • 写入文件时用 ‘a’ 模式,表示“追加写入”,每一行写一个 JSON + 换行。

掌握点

  • 正则表达式 re.search 的基本用法;
  • json.loads / json.dump;
  • with open(…) as f: 的上下文管理器语法。
  1. 程序入口
if __name__ == "__main__":
    asyncio.run(main())

这段是 Python 脚本的常见写法:

- 只有当这个文件被“直接运行”时才会执行 `main()`;
- 如果被别的文件 `import` 时,这一块不会执行。

asyncio.run(main()) 负责跑异步的主函数。

List 1

  1. 基本语法与数据结构
  • for 循环:特别是 for index, row in df.iterrows():
  • if/else + continue
  • 列表索引、字典取值(dict.get())
  1. 文件与路径操作
  • os.path.join, os.path.dirname, os.path.abspath
  • with open(path, ‘a’, encoding=‘utf-8’) as f: 写文件
  1. pandas
  • pd.read_csv 读表
  • df.iterrows() 遍历
  • 用 row[‘col’] 访问字段
  1. 异步与协程(了解用法即可,不必深挖原理)
  • async def / await
  • asyncio.run(main())
  1. JSON 与正则
  • json.loads / json.dump 基本用法
  • re.search(pattern, text, flags):找出匹配结果,用 .group(1) 取括号里的子串
  1. 面向对象的简单用法(通过 Agent 对象理解)
  • 对象的方法调用:logs_agent.run(…)
  • 对象属性:respose.messages[-1].content