您的位置:首页 > 博客中心 > 数据库 >

阅读sqlmap源代码,编写burpsuite插件--sqlmapapi

时间:2022-03-15 08:49

burpsuite插件编写---sql injection

0x00 概要

在安全测试过程中,大部分人会使用burpsuite的scanner模块进行测试,可以发现一些浅显的漏洞:比如xss、sql injection、c***f、xxe、Arbitrary file existence disclosure in Act、明文传输等。
说到sql injection,测试人员都会有一种想法是否存在一款自动化工具,可以将某一网站的所有链接都去尝试一边,尽可能的发现所有的sql injection。有了这种想法后大家会去想解决方案,有一种解决方案是编写burpsuite插件。

0x01 为什么sqlmapapi只能检测get请求是否存在注入?

编写burpsuite插件检查sql injection,网上已经存在了很多代码。
技术分享图片
如图中代码一样,网上所有的资料在调用sqlmapapi进行扫描时只能扫描get请求的链接,比如: 对post提交的参数无法进行扫描,开始我一直很迷惑到底是sqlmap没有提供该方法还是前辈们没有写,所以我就查看了一下sqlmap的源代码。
调用sqlmapapi的基本过程:
技术分享图片
接下来我们看看为什么sqlmapapi不能进行post扫描:
1、首先查看sqlmapapi.py文件:

    # Start the client or the server
    if args.server is True:
        server(args.host, args.port, adapter=args.adapter, username=args.username, password=args.password)
    elif args.client is True:
        client(args.host, args.port, username=args.username, password=args.password)
    else:
        apiparser.print_help()

sqlmapapi开启服务后,我们在插件中的请求就是一个客户端,所以我们需要关注 client(args.host, args.port, username=args.username, password=args.password)
技术分享图片
2、cilent函数所在文件:
技术分享图片
3、由于在api.py中存在大量代码,逐行查找很费劲,我们直接搜索def client
技术分享图片
4、分析cilent函数

def client(host=RESTAPI_DEFAULT_ADDRESS, port=RESTAPI_DEFAULT_PORT, username=None, password=None):
    """
    REST-JSON API client
    """

    DataStore.username = username
    DataStore.password = password

    dbgMsg = "Example client access from command line:"
    dbgMsg += "\n\t$ taskid=$(curl http://%s:%d/task/new 2>1 | grep -o -I ‘[a-f0-9]\{16\}‘) && echo $taskid" % (host, port)
    dbgMsg += "\n\t$ curl -H \"Content-Type: application/json\" -X POST -d ‘{\"url\": \"http://testphp.vulnweb.com/artists.php?artist=1\"}‘ http://%s:%d/scan/$taskid/start" % (host, port)
    dbgMsg += "\n\t$ curl http://%s:%d/scan/$taskid/data" % (host, port)
    dbgMsg += "\n\t$ curl http://%s:%d/scan/$taskid/log" % (host, port)
    logger.debug(dbgMsg)

    addr = "http://%s:%d" % (host, port)
    logger.info("Starting REST-JSON API client to ‘%s‘..." % addr)

    try:
        _client(addr)
    except Exception, ex:
        if not isinstance(ex, urllib2.HTTPError) or ex.code == httplib.UNAUTHORIZED:
            errMsg = "There has been a problem while connecting to the "
            errMsg += "REST-JSON API server at ‘%s‘ " % addr
            errMsg += "(%s)" % ex
            logger.critical(errMsg)
            return

我们查看的重点代码是

    dbgMsg = "Example client access from command line:"
    dbgMsg += "\n\t$ taskid=$(curl http://%s:%d/task/new 2>1 | grep -o -I ‘[a-f0-9]\{16\}‘) && echo $taskid" % (host, port)
    dbgMsg += "\n\t$ curl -H \"Content-Type: application/json\" -X POST -d ‘{\"url\": \"http://testphp.vulnweb.com/artists.php?artist=1\"}‘ http://%s:%d/scan/$taskid/start" % (host, port)
    dbgMsg += "\n\t$ curl http://%s:%d/scan/$taskid/data" % (host, port)
    dbgMsg += "\n\t$ curl http://%s:%d/scan/$taskid/log" % (host, port)
    logger.debug(dbgMsg)

在 dbgMsg += "\n\t$ curl -H \"Content-Type: application/json\" -X POST -d ‘{\"url\": \"\"}‘ " % (host, port)这一行代码表示开启一个任务扫描,Content-Type: application/json为请求头的部分;http://%s:%d/scan/$taskid/start 开启的具体那个任务;-X POST -d ‘{\"url\": \"http://testphp.vulnweb.com/artists.php?artist=1\"}‘ post请求参数,其中只有url,没有关于post请求的data参数,因此sqlmapapi只能进行get请求的sql injection 检测。

0x02继承IHttpListener接口(方法一),存在缺陷

先上源代码:

from burp import IBurpExtender
from burp import IHttpListener
from java.io import PrintWriter
import re
import urllib
import urllib2
import time
import json
from threading import Thread
import requests

class BurpExtender(IBurpExtender, IHttpListener):

    #
    #implement IBurpExtender
    #
    def    registerExtenderCallbacks(self, callbacks):
        # keep a reference to our callbacks object
        self._callbacks = callbacks

        # set our extension name
        callbacks.setExtensionName("fanyingjie")

        # obtain our output stream
        self._stdout = PrintWriter(callbacks.getStdout(), True)

        self._helpers  = callbacks.getHelpers()

        # register ourselves as an
        callbacks.registerHttpListener(self)

    def processHttpMessage(self,toolFlag,messageIsRequest, messageInfo):
        if(messageIsRequest):
            a=self._helpers.analyzeRequest(messageInfo)
            method=a.getMethod()
            url=str(a.getUrl())
            if(("?" in url) and (method=="GET")):
                self._stdout.println("start")
                t=AutoSqli(target=url,stdout=self._stdout,method=method)
                t.run()

class AutoSqli(Thread):
    def __init__(self,target,stdout,method):
        self.server="http://192.168.159.134:8775"
        self.taskid = ‘‘
        self.target=target
        self.method=method
        self._stdout=stdout
        self.start_time = time.time()

    def task_new(self):
        self.taskid = json.loads(urllib2.urlopen(self.server + ‘/task/new‘).read())[‘taskid‘]
        self._stdout.println(‘Created new task: ‘ + self.taskid )
        if len(self.taskid) > 0:
            return True
        return False

    def task_delete(self):
        if json.loads(urllib2.urlopen(self.server + ‘/task/‘ + self.taskid + ‘/delete‘).read())[‘success‘]:
            self._stdout.println(‘[%s] Deleted task‘ % (self.taskid))
            return True
        return False

    def scan_start(self):
        headers = {‘Content-Type‘: ‘application/json‘}
        payload = {‘url‘:self.target}
        url = self.server + ‘/scan/‘ + self.taskid + ‘/start‘
        #t = json.loads(requests.post(url, data=json.dumps(payload), headers=headers).text)

        req=urllib2.Request(url,data=json.dumps(payload),headers=headers)
        t=json.loads(urllib2.urlopen(req).read())
        self._stdout.println("start "+ self.taskid)

        if len(str(t[‘engineid‘])) > 0 and t[‘success‘]:
            return True
        return False

    def scan_status(self):
        status = json.loads(urllib2.urlopen(self.server + ‘/scan/‘ + self.taskid + ‘/status‘).read())[‘status‘]
        if status == ‘running‘:
            return ‘running‘
        if status == ‘terminated‘:
            return ‘terminated‘
        return "error"

    def scan_data(self):

        data = json.loads(urllib2.urlopen(self.server + ‘/scan/‘ + self.taskid + ‘/data‘).read())[‘data‘]
        if len(data) == 0:
            self._stdout.println(‘not injection:\t‘ + self.target)
            return False
        else:
            self._stdout.println(‘injection:\t‘ + self.target)
            return True

    def scan_kill(self):
        json.loads(rurllib2.urlopen(self.server + ‘/scan/‘ + self.taskid + ‘/kill‘).read())[‘success‘]
        self._stdout.println("%s kill")%(self.taskid)

    def scan_stop(self):
        json.loads(urllib2.urlopen(self.server + ‘/scan/‘ + self.taskid + ‘/stop‘).read())[‘success‘]
        self._stdout.println("%s stop")%(self.taskid)

    def run(self):
        try:
            if not self.task_new():
                return False
            if not self.scan_start():
                return False
            while True:
                if self.scan_status() == ‘running‘:
                    time.sleep(10)
                elif self.scan_status() == ‘terminated‘:
                    break
                else:
                    break
                #print self.target + ":\t" + str(time.time() - self.start_time)
                if time.time() - self.start_time > 500:
                    self.scan_stop()
                    self.scan_kill()
                    break
            self.scan_data()
            #self.task_delete()

        except Exception as e:
            pass

在本插件中继承了IHttpListener接口,继承该接口后,每点击一次链接就会执行一次插件,只有当插件完成执行完成后,才能进行下一步:
技术分享图片
访问链接后,浏览器一直等待回应,继承IHttpListener接口的方法很影响测试效率,不过优点是可以检查出存在注入的链接
接下来对该代码进行简单的讲解一下:

        if(messageIsRequest):  #当包是请求包时执行sql injection检查
            a=self._helpers.analyzeRequest(messageInfo)  #这是burp提供的一个函数,可以从请求包中获取到url method header等
            method=a.getMethod()
            url=str(a.getUrl())
            if(("?" in url) and (method=="GET")): #当请求是get请求和链接中存在参数时进行sql injection 检查
                self._stdout.println("start")
                t=AutoSqli(target=url,stdout=self._stdout,method=method)
                t.run()
class AutoSqli(Thread): 
    def __init__(self,target,stdout,method):
        self.server="http://192.168.159.134:8775"#开启sqlmapapi服务的ip
        self.taskid = ‘‘
        self.target=target
        self.method=method
        self._stdout=stdout
        self.start_time = time.time()

    def task_new(self):#创建一个新的任务,并获取taskid
        self.taskid = json.loads(urllib2.urlopen(self.server + ‘/task/new‘).read())[‘taskid‘]
        self._stdout.println(‘Created new task: ‘ + self.taskid )
        if len(self.taskid) > 0:
            return True
        return False

    def task_delete(self):#通过taskid删除某一个任务
        if json.loads(urllib2.urlopen(self.server + ‘/task/‘ + self.taskid + ‘/delete‘).read())[‘success‘]:
            self._stdout.println(‘[%s] Deleted task‘ % (self.taskid))
            return True
        return False

    def scan_start(self):#开始一个扫描,传入需要扫描的地址
        headers = {‘Content-Type‘: ‘application/json‘}
        payload = {‘url‘:self.target}
        url = self.server + ‘/scan/‘ + self.taskid + ‘/start‘
        #t = json.loads(requests.post(url, data=json.dumps(payload), headers=headers).text)

        req=urllib2.Request(url,data=json.dumps(payload),headers=headers)
        t=json.loads(urllib2.urlopen(req).read())
        self._stdout.println("start "+ self.taskid)

        if len(str(t[‘engineid‘])) > 0 and t[‘success‘]:
            return True
        return False

    def scan_status(self):#查看是否扫描完成,通过status判断,terminated是扫描完成
        status = json.loads(urllib2.urlopen(self.server + ‘/scan/‘ + self.taskid + ‘/status‘).read())[‘status‘]
        if status == ‘running‘:
            return ‘running‘
        if status == ‘terminated‘:
            return ‘terminated‘
        return "error"

    def scan_data(self):#获取扫描完成的结果,如果data有值表名存在注入,否则不存在

        data = json.loads(urllib2.urlopen(self.server + ‘/scan/‘ + self.taskid + ‘/data‘).read())[‘data‘]
        if len(data) == 0:
            self._stdout.println(‘not injection:\t‘ + self.target)
            return False
        else:
            self._stdout.println(‘injection:\t‘ + self.target)
            return True

    def scan_kill(self):
        json.loads(rurllib2.urlopen(self.server + ‘/scan/‘ + self.taskid + ‘/kill‘).read())[‘success‘]
        self._stdout.println("%s kill")%(self.taskid)

    def scan_stop(self):
        json.loads(urllib2.urlopen(self.server + ‘/scan/‘ + self.taskid + ‘/stop‘).read())[‘success‘]
        self._stdout.println("%s stop")%(self.taskid)

    def run(self):
        try:
            if not self.task_new():
                return False
            if not self.scan_start():
                return False
            while True:
                if self.scan_status() == ‘running‘:
                    time.sleep(10)
                elif self.scan_status() == ‘terminated‘:
                    break
                else:
                    break
                #print self.target + ":\t" + str(time.time() - self.start_time)
                if time.time() - self.start_time > 500:
                    self.scan_stop()
                    self.scan_kill()
                    break
            self.scan_data()
            #self.task_delete()

        except Exception as e:
            pass

在下一篇文章中,我会记录一下第二种方法,可以在不影响测试效率的情况下,依然可以有效的检查出注入;在设置一些sqlmapapi参数的前提下,比较一下sqlmapapi和burpsuite scanner模块那个更有效率。

热门排行

今日推荐

热门手游