阅读sqlmap源代码,编写burpsuite插件--sqlmapapi
时间:2022-03-15 08:49
burpsuite插件编写---sql injection
0x00 概要
在安全测试过程中,大部分人会使用burpsuite的scanner模块进行测试,可以发现一些浅显的漏洞:比如xss、sql injection、c***f、xxe、Arbitrary file existence disclosure in Act、明文传输等。
说到sql injection,测试人员都会有一种想法是否存在一款自动化工具,可以将某一网站的所有链接都去尝试一边,尽可能的发现所有的sql injection。有了这种想法后大家会去想解决方案,有一种解决方案是编写burpsuite插件。
0x01 为什么sqlmapapi只能检测get请求是否存在注入?
编写burpsuite插件检查sql injection,网上已经存在了很多代码。
如图中代码一样,网上所有的资料在调用sqlmapapi进行扫描时只能扫描get请求的链接,比如: 对post提交的参数无法进行扫描,开始我一直很迷惑到底是sqlmap没有提供该方法还是前辈们没有写,所以我就查看了一下sqlmap的源代码。
调用sqlmapapi的基本过程:
接下来我们看看为什么sqlmapapi不能进行post扫描:
1、首先查看sqlmapapi.py文件:
# Start the client or the server
if args.server is True:
server(args.host, args.port, adapter=args.adapter, username=args.username, password=args.password)
elif args.client is True:
client(args.host, args.port, username=args.username, password=args.password)
else:
apiparser.print_help()
sqlmapapi开启服务后,我们在插件中的请求就是一个客户端,所以我们需要关注 client(args.host, args.port, username=args.username, password=args.password)
2、cilent函数所在文件:
3、由于在api.py中存在大量代码,逐行查找很费劲,我们直接搜索def client
4、分析cilent函数
def client(host=RESTAPI_DEFAULT_ADDRESS, port=RESTAPI_DEFAULT_PORT, username=None, password=None):
"""
REST-JSON API client
"""
DataStore.username = username
DataStore.password = password
dbgMsg = "Example client access from command line:"
dbgMsg += "\n\t$ taskid=$(curl http://%s:%d/task/new 2>1 | grep -o -I ‘[a-f0-9]\{16\}‘) && echo $taskid" % (host, port)
dbgMsg += "\n\t$ curl -H \"Content-Type: application/json\" -X POST -d ‘{\"url\": \"http://testphp.vulnweb.com/artists.php?artist=1\"}‘ http://%s:%d/scan/$taskid/start" % (host, port)
dbgMsg += "\n\t$ curl http://%s:%d/scan/$taskid/data" % (host, port)
dbgMsg += "\n\t$ curl http://%s:%d/scan/$taskid/log" % (host, port)
logger.debug(dbgMsg)
addr = "http://%s:%d" % (host, port)
logger.info("Starting REST-JSON API client to ‘%s‘..." % addr)
try:
_client(addr)
except Exception, ex:
if not isinstance(ex, urllib2.HTTPError) or ex.code == httplib.UNAUTHORIZED:
errMsg = "There has been a problem while connecting to the "
errMsg += "REST-JSON API server at ‘%s‘ " % addr
errMsg += "(%s)" % ex
logger.critical(errMsg)
return
我们查看的重点代码是
dbgMsg = "Example client access from command line:"
dbgMsg += "\n\t$ taskid=$(curl http://%s:%d/task/new 2>1 | grep -o -I ‘[a-f0-9]\{16\}‘) && echo $taskid" % (host, port)
dbgMsg += "\n\t$ curl -H \"Content-Type: application/json\" -X POST -d ‘{\"url\": \"http://testphp.vulnweb.com/artists.php?artist=1\"}‘ http://%s:%d/scan/$taskid/start" % (host, port)
dbgMsg += "\n\t$ curl http://%s:%d/scan/$taskid/data" % (host, port)
dbgMsg += "\n\t$ curl http://%s:%d/scan/$taskid/log" % (host, port)
logger.debug(dbgMsg)
在 dbgMsg += "\n\t$ curl -H \"Content-Type: application/json\" -X POST -d ‘{\"url\": \"\"}‘ " % (host, port)这一行代码表示开启一个任务扫描,Content-Type: application/json为请求头的部分;http://%s:%d/scan/$taskid/start
开启的具体那个任务;-X POST -d ‘{\"url\": \"http://testphp.vulnweb.com/artists.php?artist=1\"}‘
post请求参数,其中只有url,没有关于post请求的data参数,因此sqlmapapi只能进行get请求的sql injection 检测。
0x02继承IHttpListener接口(方法一),存在缺陷
先上源代码:
from burp import IBurpExtender
from burp import IHttpListener
from java.io import PrintWriter
import re
import urllib
import urllib2
import time
import json
from threading import Thread
import requests
class BurpExtender(IBurpExtender, IHttpListener):
#
#implement IBurpExtender
#
def registerExtenderCallbacks(self, callbacks):
# keep a reference to our callbacks object
self._callbacks = callbacks
# set our extension name
callbacks.setExtensionName("fanyingjie")
# obtain our output stream
self._stdout = PrintWriter(callbacks.getStdout(), True)
self._helpers = callbacks.getHelpers()
# register ourselves as an
callbacks.registerHttpListener(self)
def processHttpMessage(self,toolFlag,messageIsRequest, messageInfo):
if(messageIsRequest):
a=self._helpers.analyzeRequest(messageInfo)
method=a.getMethod()
url=str(a.getUrl())
if(("?" in url) and (method=="GET")):
self._stdout.println("start")
t=AutoSqli(target=url,stdout=self._stdout,method=method)
t.run()
class AutoSqli(Thread):
def __init__(self,target,stdout,method):
self.server="http://192.168.159.134:8775"
self.taskid = ‘‘
self.target=target
self.method=method
self._stdout=stdout
self.start_time = time.time()
def task_new(self):
self.taskid = json.loads(urllib2.urlopen(self.server + ‘/task/new‘).read())[‘taskid‘]
self._stdout.println(‘Created new task: ‘ + self.taskid )
if len(self.taskid) > 0:
return True
return False
def task_delete(self):
if json.loads(urllib2.urlopen(self.server + ‘/task/‘ + self.taskid + ‘/delete‘).read())[‘success‘]:
self._stdout.println(‘[%s] Deleted task‘ % (self.taskid))
return True
return False
def scan_start(self):
headers = {‘Content-Type‘: ‘application/json‘}
payload = {‘url‘:self.target}
url = self.server + ‘/scan/‘ + self.taskid + ‘/start‘
#t = json.loads(requests.post(url, data=json.dumps(payload), headers=headers).text)
req=urllib2.Request(url,data=json.dumps(payload),headers=headers)
t=json.loads(urllib2.urlopen(req).read())
self._stdout.println("start "+ self.taskid)
if len(str(t[‘engineid‘])) > 0 and t[‘success‘]:
return True
return False
def scan_status(self):
status = json.loads(urllib2.urlopen(self.server + ‘/scan/‘ + self.taskid + ‘/status‘).read())[‘status‘]
if status == ‘running‘:
return ‘running‘
if status == ‘terminated‘:
return ‘terminated‘
return "error"
def scan_data(self):
data = json.loads(urllib2.urlopen(self.server + ‘/scan/‘ + self.taskid + ‘/data‘).read())[‘data‘]
if len(data) == 0:
self._stdout.println(‘not injection:\t‘ + self.target)
return False
else:
self._stdout.println(‘injection:\t‘ + self.target)
return True
def scan_kill(self):
json.loads(rurllib2.urlopen(self.server + ‘/scan/‘ + self.taskid + ‘/kill‘).read())[‘success‘]
self._stdout.println("%s kill")%(self.taskid)
def scan_stop(self):
json.loads(urllib2.urlopen(self.server + ‘/scan/‘ + self.taskid + ‘/stop‘).read())[‘success‘]
self._stdout.println("%s stop")%(self.taskid)
def run(self):
try:
if not self.task_new():
return False
if not self.scan_start():
return False
while True:
if self.scan_status() == ‘running‘:
time.sleep(10)
elif self.scan_status() == ‘terminated‘:
break
else:
break
#print self.target + ":\t" + str(time.time() - self.start_time)
if time.time() - self.start_time > 500:
self.scan_stop()
self.scan_kill()
break
self.scan_data()
#self.task_delete()
except Exception as e:
pass
在本插件中继承了IHttpListener接口,继承该接口后,每点击一次链接就会执行一次插件,只有当插件完成执行完成后,才能进行下一步:
访问链接后,浏览器一直等待回应,继承IHttpListener接口的方法很影响测试效率,不过优点是可以检查出存在注入的链接
接下来对该代码进行简单的讲解一下:
if(messageIsRequest): #当包是请求包时执行sql injection检查
a=self._helpers.analyzeRequest(messageInfo) #这是burp提供的一个函数,可以从请求包中获取到url method header等
method=a.getMethod()
url=str(a.getUrl())
if(("?" in url) and (method=="GET")): #当请求是get请求和链接中存在参数时进行sql injection 检查
self._stdout.println("start")
t=AutoSqli(target=url,stdout=self._stdout,method=method)
t.run()
class AutoSqli(Thread):
def __init__(self,target,stdout,method):
self.server="http://192.168.159.134:8775"#开启sqlmapapi服务的ip
self.taskid = ‘‘
self.target=target
self.method=method
self._stdout=stdout
self.start_time = time.time()
def task_new(self):#创建一个新的任务,并获取taskid
self.taskid = json.loads(urllib2.urlopen(self.server + ‘/task/new‘).read())[‘taskid‘]
self._stdout.println(‘Created new task: ‘ + self.taskid )
if len(self.taskid) > 0:
return True
return False
def task_delete(self):#通过taskid删除某一个任务
if json.loads(urllib2.urlopen(self.server + ‘/task/‘ + self.taskid + ‘/delete‘).read())[‘success‘]:
self._stdout.println(‘[%s] Deleted task‘ % (self.taskid))
return True
return False
def scan_start(self):#开始一个扫描,传入需要扫描的地址
headers = {‘Content-Type‘: ‘application/json‘}
payload = {‘url‘:self.target}
url = self.server + ‘/scan/‘ + self.taskid + ‘/start‘
#t = json.loads(requests.post(url, data=json.dumps(payload), headers=headers).text)
req=urllib2.Request(url,data=json.dumps(payload),headers=headers)
t=json.loads(urllib2.urlopen(req).read())
self._stdout.println("start "+ self.taskid)
if len(str(t[‘engineid‘])) > 0 and t[‘success‘]:
return True
return False
def scan_status(self):#查看是否扫描完成,通过status判断,terminated是扫描完成
status = json.loads(urllib2.urlopen(self.server + ‘/scan/‘ + self.taskid + ‘/status‘).read())[‘status‘]
if status == ‘running‘:
return ‘running‘
if status == ‘terminated‘:
return ‘terminated‘
return "error"
def scan_data(self):#获取扫描完成的结果,如果data有值表名存在注入,否则不存在
data = json.loads(urllib2.urlopen(self.server + ‘/scan/‘ + self.taskid + ‘/data‘).read())[‘data‘]
if len(data) == 0:
self._stdout.println(‘not injection:\t‘ + self.target)
return False
else:
self._stdout.println(‘injection:\t‘ + self.target)
return True
def scan_kill(self):
json.loads(rurllib2.urlopen(self.server + ‘/scan/‘ + self.taskid + ‘/kill‘).read())[‘success‘]
self._stdout.println("%s kill")%(self.taskid)
def scan_stop(self):
json.loads(urllib2.urlopen(self.server + ‘/scan/‘ + self.taskid + ‘/stop‘).read())[‘success‘]
self._stdout.println("%s stop")%(self.taskid)
def run(self):
try:
if not self.task_new():
return False
if not self.scan_start():
return False
while True:
if self.scan_status() == ‘running‘:
time.sleep(10)
elif self.scan_status() == ‘terminated‘:
break
else:
break
#print self.target + ":\t" + str(time.time() - self.start_time)
if time.time() - self.start_time > 500:
self.scan_stop()
self.scan_kill()
break
self.scan_data()
#self.task_delete()
except Exception as e:
pass