您的位置:首页 > 技术中心 > 其他 >

如何使用Python实现视频去重的小工具

时间:2023-04-19 17:36

同级目录下新建dup_video

import jsonimport osimport shutilimport cv2import imagehashfrom PIL import Imagefrom loguru import loggerfrom PySimpleGUI import popup_get_folderclass VideoDuplicate(object):    '''    返回整个视频的图片指纹列表    从1秒开始,每3秒抽帧,计算一张图像指纹    '''    def __init__(self):        self._over_length_video: list = []        self._no_video: list = []    def _video_hash(self, video_path) -> list:        '''        @param video_path -> 视频绝对路径;        '''        hash_arr = []        cap = cv2.VideoCapture(video_path)  ##打开视频文件        logger.info(f'开始抽帧【{video_path}】')        n_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))  # 视频的帧数        logger.warning(f'视频帧数:{n_frames}')        fps = cap.get(cv2.CAP_PROP_FPS)  # 视频的帧率        logger.warning(f'视频帧率:{fps}')        dur = n_frames / fps * 1000  # 视频大致总长度        cap_set = 1000        logger.warning(f'视频大约总长:{dur / 1000}')        if dur // 1000 > 11:            logger.error(f'视频时长超出规定范围【6~10】;当前时长:【{dur // 1000}】;跳过该视频;')            self._over_length_video.append(video_path)            return []        while cap_set < dur:  # 从3秒开始,每60秒抽帧,计算图像指纹。总长度-3s,是因为有的时候计算出来的长度不准。            cap.set(cv2.CAP_PROP_POS_MSEC, cap_set)            logger.debug(f'开始提取:【{cap_set // 1000}】/s的图片;')            # 返回该时间点的,图像(numpy数组),及读取是否成功            success, image_np = cap.read()            if success:                img = Image.fromarray(cv2.cvtColor(image_np, cv2.COLOR_BGR2RGB))  # 转成cv图像格式                h = str(imagehash.dhash(img))                logger.success(f'【{cap_set}/s图像指纹:【{h}】')                hash_arr.append(h)  # 图像指纹            else:                logger.error(str(cap_set / 1000))            cap_set += 1000 * 2        cap.release()  # 释放视频        return hash_arr    def start(self, base_dir):        '''        @param base_dir -> 主文件路径;        '''        data: list = []        for video in os.listdir(base_dir):            logger.debug(f'-' * 80)            name, ext = os.path.splitext(video)            if ext not in ('.mp4', '.MP4'):                logger.error(f'视频文件格式不符;【{video}】;执行跳过;')                continue            abs_video_path = os.path.join(base_dir, video)            video_hash_list = self._video_hash(abs_video_path)            if video_hash_list:                data.append({'video_abs_path': abs_video_path, 'hash': video_hash_list})        self._write_log(data)        return data    @staticmethod    def _write_log(data: list) -> None:        '''视频哈希后的值写入日志文件'''        with open(f'log.txt', 'w+', encoding='utf-8') as f:            f.write(json.dumps(data))    def __call__(self, base_dir, *args, **kwargs):        self.start(base_dir)        logger.debug(f'-----------------------------------开始比对关键帧差值感知余弦算法-----------------------------')        with open('log.txt') as f:            data = json.loads(f.read())            for i in range(0, len(data) - 1):                for j in range(i + 1, len(data)):                    if data[i]['hash'] == data[j]['hash']:                        _, filename = os.path.split(data[i]['video_abs_path'])                        logger.error(f'移动文件:【{filename}】')                        shutil.move(                            os.path.join(base_dir, filename),                            os.path.join(os.path.join(os.getcwd(), 'dup_video'), filename)                        )        logger.warning('---------------------超长视频----------------------')        for i in self._over_length_video:            _, name = os.path.split(i)            logger.error(name)def main():    path = popup_get_folder('请选择[视频去重]文件夹')    v = VideoDuplicate()    v(path)if __name__ == '__main__':    main()

方法补充

python+opencv抽取视频帧并去重

import os import sysimport cv2import globimport jsonimport numpy as npimport skimagefrom skimage import metricsimport hashlibprint(skimage.__version__)def load_json(json_file):    with open(json_file) as fp:        data = json.load(fp)    return data['outputs']def ssim_dis(im1, im2):    ssim = metrics.structural_similarity(im1, im2, data_range=255, multichannel=True)    return ssim# cv2def isdarkOrBright(grayImg, thre_dark=10, thre_bright=230):    mean = np.mean(grayImg)    if mean < thre_dark or mean > thre_bright:        return True     else:        return Falsedef get_file_md5(file_name):    """    caculate md5    : param file_name    : return md5    """    m = hashlib.md5()    with open(file_name, 'rb') as fobj:        while True:            data = fobj.read(4096)            if not data:                break            m.update(data)    return m.hexdigest()def extract_frame(video_path, save_dir, prefix, ssim_thre=0.90):    count = 0    md5set = {}    last_frame = None    cap = cv2.VideoCapture(video_path)    fps = cap.get(cv2.CAP_PROP_FPS)    index = 0    tmp_frames = []    while cap.isOpened():        frameState, frame = cap.read()        if not frameState or frame is None:            break        grayImg = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)        # if isdarkOrBright(grayImg):        #     index += 1        #     continue        assert cv2.imwrite('tmp.jpg', frame, [cv2.IMWRITE_JPEG_QUALITY, 100])        md5 = get_file_md5('tmp.jpg')        if md5 in md5set:            md5set[md5] += 1            index += 1            continue        md5set[md5] = 1                save_path = os.path.join(save_dir, prefix+'_'+str(index).rjust(4, '0')+'.jpg')        if last_frame is None:            if cv2.imwrite(save_path, frame, [cv2.IMWRITE_JPEG_QUALITY, 100]):                count += 1                last_frame = frame                tmp_frames.append(frame)        else:            dis = ssim_dis(last_frame, frame)            if dis <= ssim_thre:                save_frame = tmp_frames[len(tmp_frames)//2]                if cv2.imwrite(save_path, save_frame, [cv2.IMWRITE_JPEG_QUALITY, 100]):                    count += 1                    last_frame = frame                    tmp_frames = [frame]            else:                tmp_frames.append(frame)        index += 1    cap.release()    return count                if __name__ == '__main__':    import sys    video_path = "videos/***.mp4"    video_name = video_path.split("/")[-1]    prefix = video_name[:-4]    save_imgs_dir = prefix    if not os.path.exists(save_imgs_dir):        os.mkdir(save_imgs_dir)    N = extract_frame(video_path, save_imgs_dir, prefix)    print(video_name, N)

对图片,视频,文件进行去重

import osfrom tkinter import *from tkinter import messageboximport tkinter.filedialogroot=Tk()root.title("筛选重复的视频和照片")root.geometry("500x500+500+200")def wbb():      a=[]      c={}      filename=tkinter.filedialog.askopenfilenames()                  for i in filename:            with open(i,'rb') as f:                  a.append(f.read())      for j in range(len(a)):            c[a[j]]=filename[j]      filename1=tkinter.filedialog.askdirectory()           if filename1!="":            p=1            lb1.config(text=filename1+"下的文件为:")            for h in c:                k=c[h].split(".")[-1]                with open(filename1+"/"+str(p)+"."+k,'wb') as f:                      f.write(h)                p=p+1                  for g in os.listdir(filename1):                  txt.insert(END,g+'
')                        else:            messagebox.showinfo("提示",message ='请选择路径')frame1=Frame(root,relief=RAISED)frame1.place(relx=0.0)frame2=Frame(root,relief=GROOVE)frame2.place(relx=0.5)lb1=Label(frame1,text="等等下面会有变化?",font=('华文新魏',13))lb1.pack(fill=X)    txt=Text(frame1,width=30,height=50,font=('华文新魏',10))txt.pack(fill=X)        lb=Label(frame2,text="点我选择要进行筛选的文件:",font=('华文新魏',10))lb.pack(fill=X)                                          btn=Button(frame2,text="请选择要进行筛选的文件",fg='black',relief="raised",bd="9",command=wbb)btn.pack(fill=X)root.mainloop()

效果图

以上就是如何使用Python实现视频去重的小工具的详细内容,更多请关注Gxl网其它相关文章!

热门排行

今日推荐

热门手游