如何利用sqlmapapi发起扫描
时间:2023-05-21 09:24
sqlmap可谓是sql注入探测的神器,但是利用sqlmap测试SQL注入的效率很低,每一个url都需要手动测试。sqlmap的开发者新加了sqlmapapi.py,可以直接通过接口调用来操作,简化了sqlmap命令执行方式。 sqlmap api分为服务端和客户端,sqlmap api有两种模式,一种是基于HTTP协议的接口模式,一种是基于命令行的接口模式。 sqlmap源码下载地址:https://github.com/sqlmapproject/sqlmap/ python sqlmapapi.py -h 在使用API服务之前,需要先启动API服务器,无论是基于HTTP协议还是基于命令行的接口模式。要开启API服务端,只需运行以下命令:python sqlmapapi.py -s 命令成功后,在命令行中会返回一些信息。以下命令大概的意思是api服务端在本地8775端口上运行,admin token为c6bbb0c1f86b7d7bc2ed6ce3e3bbdcb5等 但是通过上面的这种方式开启api服务端有一个缺点,当服务端和客户端不是一台主机会连接不上,因此如果要解决这个问题,可以通过输入以下命令来开启api服务端:python sqlmapapi.py -s -H "0.0.0.0" -p 8775 命令成功后,远程客户端就可以通过指定远程主机IP和端口来连接到API服务端。 python sqlmapapi.py -c 如果是客户端和服务端不是同一台计算机的话,输入以下命令: python sqlmapapi.py -c -H "192.168.1.101" -p 8775 3.3.1.new命令 new -u "url" 实例:new -u "http://www.baidu.com" 虽然我们仅仅只指定了-u参数,但是从返回的信息中可以看出,输入new命令后,首先先请求了/task/new,来创建一个新的taskid,后又发起了一个请求去开始任务,因此可以发现该模式实质也是基于HTTP协议的。 3.3.2. status 命令 获取该任务的扫描状态,若返回内容中的status字段为terminated,说明扫描完成,若返回内容中的status字段为run,说明扫描还在进行中。下图是扫描完成的截图: 3.3.3. data 命令 如果返回的数据中data字段非空,则可以得出注入已经成功。这个例子展示了一个含有 SQL 注入的返回内容,其中包含数据库类型、payload 和注入参数等信息。 简单介绍下sqlmapapi.py的h基于http接口调用模式的主要函数,进入到lib/utils/api.py的server类,可以发现通过向server提交数据进行与服务的交互。 一共分为3种类型。 Users' methods 用户方法 Admin function 管理函数 sqlmap core interact functions 核心交互函数 可以提交数据的种类如下: @get("/task/new") @get("/task/delete") @get("/option/list") @post("/option/get") @post("/option/set") @post("/scan/start") @get("/scan/stop") @get("/scan/kill") @get("/scan/status") @get("/scan/data") @get("/scan/log") @get("/download/") @get("/admin/list") @get("/admin//flush") 从sqlmapapi.py文件分析,提取调用关系。这些操作可以充分满足我们的测试需求,因此可以利用它们来进行批量操作。 sqlmapapi.py很方便的提供了http请求入口,但是使用起来只能得到最终是否注入的结果,每个接口进行注入扫描时具体发起了什么样的请求,多少个请求就难以得到,下面分享一下个人梳理sqlmap源码后记录的流程图。从图中可以定位到具体发起payload级别的请求位置,从而想要获取发起了什么样的请求,多少个请求,只需要在此处增加自定义代码即可。 以上就是如何利用sqlmapapi发起扫描的详细内容,更多请关注Gxl网其它相关文章!一、查看帮助
二、开启api服务端
三、基于命令行的接口模式
3.1、开启客户端,发起注入命令
3.2、help命令,获取所有命令
help 显示帮助信息new ARGS 开启一个新的扫描任务use TASKID 切换taskid data 获取当前任务返回的数据log 获取当前任务的扫描日志status 获取当前任务的扫描状态option OPTION 获取当前任务的选项options 获取当前任务的所有配置信息stop 停止当前任务 kill 杀死当前任务list 显示所有任务列表flush 清空所有任务exit 退出客户端
3.3、检测注入
四、基于HTTP协议的接口模式
4.1、用户方法
@get("/task/new")def task_new(): """ Create a new task """ taskid = encodeHex(os.urandom(8), binary=False) remote_addr = request.remote_addr DataStore.tasks[taskid] = Task(taskid, remote_addr) logger.debug("Created new task: '%s'" % taskid) return jsonize({"success": True, "taskid": taskid})
@get("/task/<taskid>/delete")def task_delete(taskid): """ Delete an existing task """ if taskid in DataStore.tasks: DataStore.tasks.pop(taskid) logger.debug("(%s) Deleted task" % taskid) return jsonize({"success": True}) else: response.status = 404 logger.warning("[%s] Non-existing task ID provided to task_delete()" % taskid) return jsonize({"success": False, "message": "Non-existing task ID"})
4.2、核心交互函数
@post("/option/<taskid>/set")def option_set(taskid): """ Set value of option(s) for a certain task ID """ if taskid not in DataStore.tasks: logger.warning("[%s] Invalid task ID provided to option_set()" % taskid) return jsonize({"success": False, "message": "Invalid task ID"}) if request.json is None: logger.warning("[%s] Invalid JSON options provided to option_set()" % taskid) return jsonize({"success": False, "message": "Invalid JSON options"}) for option, value in request.json.items(): DataStore.tasks[taskid].set_option(option, value) logger.debug("(%s) Requested to set options" % taskid) return jsonize({"success": True})
@post("/scan/<taskid>/start")def scan_start(taskid): """ Launch a scan """ if taskid not in DataStore.tasks: logger.warning("[%s] Invalid task ID provided to scan_start()" % taskid) return jsonize({"success": False, "message": "Invalid task ID"}) if request.json is None: logger.warning("[%s] Invalid JSON options provided to scan_start()" % taskid) return jsonize({"success": False, "message": "Invalid JSON options"}) # Initialize sqlmap engine's options with user's provided options, if any for option, value in request.json.items(): DataStore.tasks[taskid].set_option(option, value) # Launch sqlmap engine in a separate process DataStore.tasks[taskid].engine_start() logger.debug("(%s) Started scan" % taskid) return jsonize({"success": True, "engineid": DataStore.tasks[taskid].engine_get_id()})
@get("/scan/<taskid>/stop")def scan_stop(taskid): """ Stop a scan """ if (taskid not in DataStore.tasks or DataStore.tasks[taskid].engine_process() is None or DataStore.tasks[taskid].engine_has_terminated()): logger.warning("[%s] Invalid task ID provided to scan_stop()" % taskid) return jsonize({"success": False, "message": "Invalid task ID"}) DataStore.tasks[taskid].engine_stop() logger.debug("(%s) Stopped scan" % taskid) return jsonize({"success": True})
@get("/scan/<taskid>/kill")def scan_kill(taskid): """ Kill a scan """ if (taskid not in DataStore.tasks or DataStore.tasks[taskid].engine_process() is None or DataStore.tasks[taskid].engine_has_terminated()): logger.warning("[%s] Invalid task ID provided to scan_kill()" % taskid) return jsonize({"success": False, "message": "Invalid task ID"}) DataStore.tasks[taskid].engine_kill() logger.debug("(%s) Killed scan" % taskid) return jsonize({"success": True})
@get("/scan/<taskid>/status")def scan_status(taskid): """ Returns status of a scan """ if taskid not in DataStore.tasks: logger.warning("[%s] Invalid task ID provided to scan_status()" % taskid) return jsonize({"success": False, "message": "Invalid task ID"}) if DataStore.tasks[taskid].engine_process() is None: status = "not running" else: status = "terminated" if DataStore.tasks[taskid].engine_has_terminated() is True else "running" logger.debug("(%s) Retrieved scan status" % taskid) return jsonize({ "success": True, "status": status, "returncode": DataStore.tasks[taskid].engine_get_returncode() })
@get("/scan/<taskid>/data")def scan_data(taskid): """ Retrieve the data of a scan """ json_data_message = list() json_errors_message = list() if taskid not in DataStore.tasks: logger.warning("[%s] Invalid task ID provided to scan_data()" % taskid) return jsonize({"success": False, "message": "Invalid task ID"}) # Read all data from the IPC database for the taskid for status, content_type, value in DataStore.current_db.execute("SELECT status, content_type, value FROM data WHERE taskid = ? ORDER BY id ASC", (taskid,)): json_data_message.append({"status": status, "type": content_type, "value": dejsonize(value)}) # Read all error messages from the IPC database for error in DataStore.current_db.execute("SELECT error FROM errors WHERE taskid = ? ORDER BY id ASC", (taskid,)): json_errors_message.append(error) logger.debug("(%s) Retrieved scan data and error messages" % taskid) return jsonize({"success": True, "data": json_data_message, "error": json_errors_message})
4.3、管理函数
@get("/admin/list")@get("/admin/<token>/list")def task_list(token=None): """ Pull task list """ tasks = {} for key in DataStore.tasks: if is_admin(token) or DataStore.tasks[key].remote_addr == request.remote_addr: tasks[key] = dejsonize(scan_status(key))["status"] logger.debug("(%s) Listed task pool (%s)" % (token, "admin" if is_admin(token) else request.remote_addr)) return jsonize({"success": True, "tasks": tasks, "tasks_num": len(tasks)})
@get("/admin/flush")@get("/admin/<token>/flush")def task_flush(token=None): """ Flush task spool (delete all tasks) """ for key in list(DataStore.tasks): if is_admin(token) or DataStore.tasks[key].remote_addr == request.remote_addr: DataStore.tasks[key].engine_kill() del DataStore.tasks[key] logger.debug("(%s) Flushed task pool (%s)" % (token, "admin" if is_admin(token) else request.remote_addr)) return jsonize({"success": True})
五、利用sqlmapapi发起扫描
六、sql注入自动化实现流程