您的位置:首页 > 技术中心 > 运维 >

Python怎么实时获取任务请求对应的Nginx日志

时间:2023-05-17 16:52

需求描述

项目需求测试过程中,需要向Nginx服务器发送一些用例请求,然后查看对应的Nginx日志,判断是否存在特征内容,来判断任务是否执行成功。为了提升效率,需要将这一过程实现自动化。

实践环境

Python 3.6.5

代码设计与实现

#!/usr/bin/env python# -*- coding:utf-8 -*-"""@CreateTime: 2021/06/26 9:05@Author : shouke"""import timeimport threadingimport subprocessfrom collections import dequedef collect_nginx_log():    global nginx_log_queue    global is_tasks_compete    global task_status    args = "tail -0f /usr/local/openresty/nginx/logs/access.log"    while task_status != "req_log_got":        with subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, universal_newlines = True) as proc:            log_for_req = ""            outs, errs = "", ""            try:                outs, errs = proc.communicate(timeout=2)            except subprocess.TimeoutExpired:                print("获取nginx日志超时,正在重试")                proc.kill()                try:                    outs, errs = proc.communicate(timeout=5)                except subprocess.TimeoutExpired:                    print("获取nginx日志超时,再次超时,停止重试")                    break            finally:                for line in outs.split(""):                    flag = ""client_ip":"10.118.0.77"" # 特征                    if flag in line: # 查找包含特征内容的日志                        log_for_req += line                if task_status == "req_finished":                    nginx_log_queue.append(log_for_req)                    task_status = "req_log_got"def run_tasks(task_list):    """    运行任务    :param task_list 任务列表    """    global nginx_log_queue    global is_tasks_compete    global task_status    for task in task_list:        thread = threading.Thread(target=collect_nginx_log,                                    name="collect_nginx_log")        thread.start()        time.sleep(1) # 执行任务前,让收集日志线程先做好准备        print("正在执行任务:%s" % task.get("name"))        # 执行Nginx任务请求        # ...        task_status = "req_finished"        time_to_wait = 0.1        while task_status != "req_log_got": # 请求触发的nginx日志收集未完成            time.sleep(time_to_wait)            time_to_wait += 0.01        else:# 获取到用例请求触发的nginx日志            if nginx_log_queue:                nginx_log = nginx_log_queue.popleft()                task_status = "req_ready"                # 解析日志                # do something here                # ...            else:                print("存储请求日志的队列为空")                # do something here                # ...if __name__ == "__main__":    nginx_log_queue = deque()    is_tasks_compete = False # 所有任务是否执行完成    task_status = "req_ready" # req_ready,req_finished,req_log_got  # 存放执行次任务任务的一些状态    print("###########################任务开始###########################")    tast_list = [{"name":"test_task", "other":"..."}]    run_tasks(tast_list)    is_tasks_compete = True    current_active_thread_num = len(threading.enumerate())    while current_active_thread_num != 1:        time.sleep(2)        current_active_thread_num = len(threading.enumerate())    print("###########################任务完成###########################")

注意:

1、上述代码为啥不一步到位,直接 tail -0f /usr/local/openresty/nginx/logs/access.log | grep "特征内容"呢?这是因为这样做无法获取到Nginx的日志

2、实践时发现,第一次执行proc.communicate(timeout=2)获取日志时,总是无法获取,会超时,需要二次获取,并且timeout设置太小时(实践时尝试过设置为1秒),也会导致第二次执行时无法获取Nginx日志。

以上就是Python怎么实时获取任务请求对应的Nginx日志的详细内容,更多请关注Gxl网其它相关文章!

热门排行

今日推荐

热门手游