您的位置:首页 > 技术中心 > 其他 >

Python如何实现关键路径和七格图计算

时间:2023-04-26 21:08

1.主程序

主程序主要实现了一个Project类,其中包含了计算关键路径和七格图的方法。具体实现方式如下:

1. 定义了一个Activity类,包含了活动的id、名称、持续时间和紧前任务列表等属性。

2. 定义了一个Project类,包含了活动列表、项目持续时间、日志等属性,以及计算关键路径、计算七格图、计算总浮动时间、计算自由浮动时间等方法。

3. 从JSON文件中读取活动信息,并创建Project对象并添加活动。

4. 调用Project对象的calculate方法,计算每个活动的最早开始时间、最晚开始时间等数据。

5. 调用Project对象的calculate_critical_path方法,计算关键路径。

6. 调用Project对象的calculate_project_duration方法,计算项目总工期。

7. 使用Jinja2模板引擎生成项目的活动清单,并将关键路径

import json  from datetime import datetime  from typing import List    import graphviz  from jinja2 import Template    from activity import Activity      class Project:      def __init__(self):          self.activities: List[Activity] = []          self.duration = 0          self.logger = []        def log(self, log: str) -> None:          self.logger.append(log)        def add_activity(self, activity: Activity) -> None:          """          添加一个活动到项目中          :param          activity: 待添加的活动          """        # 将活动添加到项目中          self.activities.append(activity)        def calculate(self) -> None:          """ 计算整个项目的关键信息          :return: None          """        self.calculate_successor()          self._calculate_forward_pass()  # 计算正推法          self._calculate_backward_pass()  # 计算倒推法          self._calculate_total_floats()  # 计算总浮动时间          self._calculate_free_floats()  # 计算自由浮动时间        def calculate_successor(self) -> None:          self.log("开始计算紧后活动")          for act in self.activities:              for pred in act.predecessors:                  for act_inner in self.activities:                      if act_inner.id == pred:                          act_inner.successors.append(act.id)        def _calculate_forward_pass(self) -> None:          self.log("## 开始正推法计算")          # 进入 while 循环,只有当所有活动的最早开始时间和最早完成时间都已经计算出来时,才会退出循环          while not self._is_forward_pass_calculated():              # 遍历每个活动              for activity in self.activities:                  # 如果活动的最早开始时间已经被计算过,则跳过                  if activity.est is not None:                      continue                  # 如果活动没有前置活动, 则从1开始计算最早开始时间和最早结束时间                  if not activity.predecessors:                      activity.est = 1                      activity.eft = activity.est + activity.duration - 1                      self.log(                          f"活动 {activity.name} 没有紧前活动,设定最早开始时间为1, 并根据工期计算最早结束时间为{activity.eft}")                  else:                      # 计算当前活动的所有前置活动的最早完成时间                      predecessors_eft = [act.eft for act in self.activities if                                          act.id in activity.predecessors and act.eft is not None]                      # 如果当前活动的所有前置活动的最早完成时间都已经计算出来,则计算当前活动的最早开始时间和最早完成时间                      if len(predecessors_eft) == len(activity.predecessors):                          activity.est = max(predecessors_eft) + 1                          activity.eft = activity.est + activity.duration - 1                          self.log(                              f"活动 {activity.name} 紧前活动已完成正推法计算, 开始日期按最早开始时间里面最大的," +                              f"设定为{activity.est}并根据工期计算最早结束时间为{activity.eft}")                            # 更新项目总持续时间为最大最早完成时间          self.duration = max([act.eft for act in self.activities])        def _calculate_backward_pass(self) -> None:          """ 计算倒推法          :return: None          """          self.log("## 开始倒推法计算")  # 输出提示信息          # 进入 while 循环,只有当所有活动的最晚开始时间和最晚完成时间都已经计算出来时,才会退出循环          while not self._is_backward_pass_calculated():              # 遍历每个活动              for act in reversed(self.activities):                  # 如果活动的最晚开始时间已经被计算过,则跳过                  if act.lft is not None:                      continue                  # 如果活动没有后继活动, 则从总持续时间开始计算最晚开始时间和最晚结束时间                  if not act.successors:                      act.lft = self.duration                      act.lst = act.lft - act.duration + 1                      self.log(f"活动 {act.name} 没有紧后活动,按照正推工期设定最晚结束时间为{act.lft}," +                               f"并根据工期计算最晚开始时间为{act.lst}")                  else:                      # 计算当前活动的所有后继活动的最晚开始时间                      successors_lst = self._calculate_lst(act)                      # 如果当前活动的所有后继活动的最晚开始时间都已经计算出来,则计算当前活动的最晚开始时间和最晚完成时间                      if len(successors_lst) == len(act.successors):                          act.lft = min(successors_lst) - 1                          act.lst = act.lft - act.duration + 1                          self.log(f"活动 {act.name} 紧后活动计算完成,按照倒推工期设定最晚结束时间为{act.lft}," +                                   f"并根据工期计算最晚开始时间为{act.lst}")          # 更新项目总持续时间为最大最晚完成时间          self.duration = max([act.lft for act in self.activities])        def _calculate_lst(self, activity: Activity) -> List[int]:          """计算某一活动的所有最晚开始时间          :param activity: 活动对象          :return: 最晚开始时间列表          """        rst = []  # 初始化结果列表          for act in activity.successors:  # 遍历该活动的后继活动              for act2 in self.activities:  # 遍历所有活动                  if act2.id == act and act2.lst is not None:  # 如果找到了该后继活动且其最晚开始时间不为空                      rst.append(act2.lst)  # 将最晚开始时间加入结果列表          return rst  # 返回结果列表        def _is_forward_pass_calculated(self) -> bool:          """ 判断整个项目正推法计算已经完成          :return: 若已计算正向传递则返回True,否则返回False          """          for act in self.activities:  # 遍历所有活动              if act.est is None or act.eft is None:  # 如果该活动的最早开始时间或最早完成时间为空                  return False  # 则返回False,表示还未计算正向传递          return True  # 如果所有活动的最早开始时间和最早完成时间都已计算,则返回True,表示已计算正向传递        def _is_backward_pass_calculated(self) -> bool:          """ 判断整个项目倒推法计算已经完成          :return: 若已计算倒推法则返回True,否则返回False          """        for act in self.activities:  # 遍历所有活动              if act.lst is None or act.lft is None:  # 如果该活动的最晚开始时间或最晚完成时间为空                  return False  # 则返回False,表示还未计算倒推法          return True  # 如果所有活动的最晚开始时间和最晚完成时间都已计算,则返回True,表示已计算倒推法        def _calculate_total_floats(self) -> None:          """ 计算所有活动的总浮动时间          :return: None           """          self.log(f"## 开始计算项目所有活动的总浮动时间")          for act in self.activities:  # 遍历所有活动              if act.est is not None and act.lst is not None:  # 如果该活动的最早开始时间和最晚开始时间都已计算                  act.tf = act.lst - act.est  # 则计算该活动的总浮动时间                  self.log(f"计算{act.name}的总浮动时间" + f"最晚开始时间{act.lst} - 最早开始时间{act.est} = {act.tf}", )              else:  # 如果该活动的最早开始时间或最晚开始时间为空                  act.tf = None  # 则将该活动的总浮动时间设为None        def _calculate_free_floats(self) -> None:          """ 计算所有活动的自由浮动时间          :return: None          """        self.log(f"## 开始计算项目所有活动的自由浮动时间")  # 输出提示信息          for act in self.activities:  # 遍历所有活动              if act.tf == 0:  # 如果该活动的总浮动时间为0                  self.log(f"计算{act.name}的自由浮动时间" + f"因为{act.name}的总浮动时间为0,自由浮动时间为0")  # 输出提示信息                  act.ff = 0  # 则将该活动的自由浮动时间设为0              elif act.tf > 0:  # 如果该活动的总浮动时间大于0                  self.log(f"计算{act.name}的自由浮动时间")  # 输出提示信息                  self.log(f"- {act.name}的总浮动时间{act.tf} > 0,")  # 输出提示信息                  tmp = []  # 初始化临时列表                  for act2 in self.activities:  # 遍历所有活动                      if act2.id in act.successors:  # 如果该活动是该活动的紧后活动                          self.log(f"- {act.name}的紧后活动{act2.name}的自由浮动动时间为{act2.tf}")  # 输出提示信息                          tmp.append(act2.tf)  # 将该紧后活动的自由浮动时间加入临时列表                  if len(tmp) != 0:  # 如果临时列表不为空                      act.ff = act.tf - max(tmp)  # 则计算该活动的自由浮动时间                      if act.ff < 0:                          act.ff = 0                      self.log(f"- 用活动自己的总浮动{act.tf}减去多个紧后活动总浮动的最大值{max(tmp)} = {act.ff}")                  else:  # 如果临时列表为空                      act.ff = act.tf  # 则将该活动的自由浮动时间设为总浮动时间        def calculate_critical_path(self) -> List[Activity]:          """ 计算整个项目的关键路径          :return: 整个项目的关键路径          """        ctc_path = []  # 初始化关键路径列表          for act in self.activities:  # 遍历所有活动              if act.tf == 0:  # 如果该活动的总浮动时间为0                  ctc_path.append(act)  # 则将该活动加入关键路径列表          return ctc_path  # 返回关键路径列表        def calculate_project_duration(self) -> int:          """ 计算整个项目的持续时间          :return: 整个项目的持续时间          """        return max(activity.eft for activity in self.activities)  # 返回所有活动的最早完成时间中的最大值,即整个项目的持续时间      # 从JSON文件中读取活动信息  with open('activities.json', 'r', encoding='utf-8') as f:      activities_data = json.load(f)    # 创建Project对象并添加活动  project = Project()  for activity_data in activities_data:      activity = Activity(          activity_data['id'],          activity_data['name'],          activity_data['duration'],          activity_data['predecessors']      )    project.add_activity(activity)    # 计算每个活动的最早开始时间、最晚开始时间等数据  project.calculate()    # 计算关键路径和项目总工期  critical_path = project.calculate_critical_path()  project_duration = project.calculate_project_duration()    # 生成项目的活动清单  with open('template.html', 'r', encoding='utf-8') as f:      template = Template(f.read())  html = template.render(      activities=project.activities,      critical_path=critical_path,      project_duration=project_duration,      log=project.logger  )    # 生成项目进度网络图  aon_graph = graphviz.Digraph(format='png', graph_attr={'rankdir': 'LR'})  for activity in project.activities:      aon_graph.node(str(activity.id), activity.name)      for predecessor in activity.predecessors:          aon_graph.edge(str(predecessor), str(activity.id))    timestamp = datetime.now().strftime('%Y%m%d%H%M%S')    aon_filename = f"aon_{timestamp}"    aon_graph.render(aon_filename)    # 将项目进度网络图插入到HTML文件中  aon_image = f'<img src="{aon_filename}.png" alt="Precedence Diagramming Method: AON">'  html = html.replace('<p>Precedence Diagramming Method: AON: <br/>[image]</p>',                      '<p>紧前关系绘图法: AON: <br/>' + aon_image + '</p>')    filename = datetime.now().strftime('%Y%m%d%H%M%S') + '.html'  with open(filename, 'w', encoding='utf-8') as f:      f.write(html)

2.活动类

程序名:activity.py

class Activity:      """      活动类,用于表示项目中的一个活动。        Attributes:        id (int): 活动的唯一标识符。          name (str): 活动的名称。          duration (int): 活动的持续时间。          predecessors (List[int]): 活动的前置活动列表,存储前置活动的id。          est (int): 活动的最早开始时间。          lst (int): 活动的最晚开始时间。          eft (int): 活动的最早完成时间。          lft (int): 活动的最晚完成时间。          tf (int): 活动的总浮动时间。          ff (int): 活动的自由浮动时间。          successors (List[int]): 活动的后继活动列表,存储后继活动的Activity对象。      """      def __init__(self, id: int, name: str, duration: int, predecessors: List[int]):          """          初始化活动对象。            Args:            id (int): 活动的唯一标识符。              name (str): 活动的名称。              duration (int): 活动的持续时间。              predecessors (List[int]): 活动的前置活动列表,存储前置活动的id。          """        self.id = id          self.name = name          self.duration = duration          self.predecessors = predecessors          self.est = None          self.lst = None          self.eft = None          self.lft = None          self.tf = None          self.ff = None          self.successors = []        def __str__(self):          return f"id: {self.id}, name: {self.name}, est: {self.est}, lst: {self.lst}, eft: {self.eft}, lft: {self.lft},"          + f"successors: {self.successors}"

3.任务列表JSON文件

文件名:activities.json

[    {    "id": 1,      "name": "A",      "duration": 2,      "predecessors": []    },    {      "id": 9,      "name": "A2",      "duration": 3,      "predecessors": []    },      {      "id": 10,      "name": "A3",      "duration": 2,      "predecessors": []    },    {      "id": 2,      "name": "B",      "duration": 3,      "predecessors": [        1,        9      ]    },    {      "id": 3,      "name": "C",      "duration": 4,      "predecessors": [        1      ]    },    {      "id": 4,      "name": "D",      "duration": 2,      "predecessors": [        2,10      ]    },    {      "id": 5,      "name": "E",      "duration": 3,      "predecessors": [        2      ]    },    {      "id": 6,      "name": "F",      "duration": 2,      "predecessors": [        3      ]    },    {      "id": 7,      "name": "G",      "duration": 3,      "predecessors": [        4,        5      ]    },    {      "id": 8,      "name": "H",      "duration": 2,      "predecessors": [        6,        7      ]    },    {      "id": 11,      "name": "H2",      "duration": 4,      "predecessors": [        6,        7      ]    }  ]

4.输出模板文件

<!DOCTYPE html>  <html>  <head>      <meta charset="UTF-8">      <title>PMP关键路径计算</title>      <style>        table {              border-collapse: collapse;              width: 100%;          }            th, td {              border: 1px solid black;              padding: 8px;              text-align: center;          }            th {              background-color: #4CAF50;              color: white;          }            .critical {              background-color: #ffcccc;          }      </style>  </head>  <body>  <h2>活动清单</h2>  <table>      <tr>          <th>ID</th>          <th>活动名</th>          <th>持续时间</th>          <th>紧前活动</th>          <th>紧后活动</th>          <th>最早开始时间EST</th>          <th>最早结束时间EFT</th>          <th>最晚开始时间LST</th>          <th>最晚结束时间LFT</th>          <th>总浮动时间TF</th>          <th>自由浮动时间FF</th>      </tr>      {% for activity in activities %}      <tr {% if activity in critical_path %}class="critical" {% endif %}>          <td>{{ activity.id }}</td>          <td>{{ activity.name }}</td>          <td>{{ activity.duration }}</td>          <td>            {% for predecessor in activity.predecessors %}              {% for act in activities %}              {% if act.id == predecessor %}              {{ act.name }}              {% endif %}              {% endfor %}              {% if not loop.last %}, {% endif %}              {% endfor %}          </td>          <td>            {% for successor in activity.successors %}              {% for act in activities %}                {% if act.id == successor %}              {{ act.name }}              {% endif %}              {% endfor %}              {% if not loop.last %}, {% endif %}              {% endfor %}          </td>          <td>{{ activity.est }}</td>          <td>{{ activity.eft }}</td>          <td>{{ activity.lst }}</td>          <td>{{ activity.lft }}</td>          <td>{{ activity.tf }}</td>          <td>{{ activity.ff }}</td>      </tr>      {% endfor %}  </table>  <p>关键路径是: {% for activity in critical_path %}{{ activity.name }}{% if not loop.last %} -> {% endif %}{% endfor      %}</p>  <p>项目总工期: {{ project_duration }}</p>  <p>Precedence Diagramming Method: AON: <br/>[image]</p>  <p>  <table>      <tr>          <th>执行过程</th>      </tr>      {% for i in log %}      <tr>          <td >{{i}}</td>      </tr>      {% endfor %}  </table>  </p>  </body>  </html>

以上就是Python如何实现关键路径和七格图计算的详细内容,更多请关注Gxl网其它相关文章!

热门排行

今日推荐

热门手游