Python3开发--19--Docker与python的碰撞结合

xiaoxiao2022-06-11  27

一、Docker安全

# --------------------------------------------------------- # 主扫描任务 # --------------------------------------------------------- @dockerSec.task(bind=True) def scan_task(self, ImageName): pull = "docker pull %s" % ImageName os.system(pull) analyze = "anchore analyze --image %s --imagetype base" % ImageName os.system(analyze) cve_scan = "anchore --json query --image %s cve-scan all" % ImageName cve = os.popen(cve_scan).readlines() # 删除不需要的键值,请直接删除相关源代码 return cve parser = reqparse.RequestParser() parser.add_argument('imagename', type=unicode, required=True, help='imagename', location='json') class ScanTask(Resource): def get(self): pass def post(self): args = parser.parse_args() ImageName = args['imagename'] try: scan_start_time = time.strftime("%Y-%m-%d %H:%M:%S") group([scan_task.s(ImageName)]) jobA = scan_task.apply_async(args=[ImageName]) print jobA.id while True: status = scan_task.AsyncResult(jobA.id) if status.state == 'SUCCESS': scan_end_time = time.strftime("%Y-%m-%d %H:%M:%S") result = status.result break time.sleep(3) except: abort(500, message="Image {} scan failed".format(ImageName)) return scan_start_time, '', scan_end_time, '', result # --------------------------------------------------------- # 停止扫描任务(完成) # --------------------------------------------------------- @dockerSec.task def stop_task(task_id): current_app.control.revoke(task_id, terminate=True, signal='SIGKILL') #from celery import app #app.control.revoke(task_id, terminate=True, signal='SIGKILL') class StopTask(Resource): def get(self, task_id): try: group([stop_task.s(task_id)]) stop_task.apply_async(args=[task_id]) except: abort(500, message="Task Stop Failed") return {'status': 'Task Stop Success'}, 200 if __name__ == '__main__': ''' api.add_resource(AllScanList, '/allscanlist') api.add_resource(SingleScanTask, '/singlescantask/<string:task_id>') ''' api.add_resource(ScanTask, '/scantask') # post方法 api.add_resource(StopTask, '/stoptask/<string:task_id>') # get方法 manager = Manager(app) manager.run()

二、Docker漏洞扫描引擎

readme.md

============================================================== 自动安装: 1、执行install.sh,自动安装所需环境(一般只执行一次即可) 注:环境安装正常情况下非常耗时 2、执行start_up.sh,启动服务(每次启动进程都要执行) 3、访问接口即可 ============================================================== 手动安装: 安装运行所需的python库: pip2 install flask pip2 install flask_script pip2 install flask_restful pip2 install celery 安装Docker: sudo yum install -y yum-utils device-mapper-persistent-data lvm2 sudo yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo #shell脚本中默认是aliyun的,如需添加官方的为这个:https://download.docker.com/linux/centos/docker-ce.repo sudo yum makecache fast sudo yum -y install docker 安装Anchore: sudo yum install epel-release sudo yum install rpm-python pip2 install docker-compose sudo yum install dpkg pip2 install anchore \导入CVE指纹库: export PATH=~/.local/bin:$PATH anchore feeds list anchore feeds sync 安装redis: yum install redis ============================================================== 引擎功能: 1、获取Docker中的镜像列表,并展示给前端页面,前端页面选择要扫描哪一个镜像 2、拉取要扫描的镜像(可以并发) 3、扫描镜像并生成报告到数据库,扫描过程中可以停止任务(不是暂停,也不能删除)(可以并发) 4、从数据库中读取扫描报告 ============================================================== 模块说明: ScanTask.py : 扫描并在本地生成.txt格式的报告 Report.py: 调用ScanTask.py完成扫描,并读取本地.txt格式报告到数据库,随后删除本地报告 BaseTask.py : 获取镜像列表并展示给前端,让前端做选择 拉取要选择的镜像 停止扫描任务 ScanAPI.py: API封装。所有API都在这里 database.py: 读取配置文件及连接数据库 model.py: 将数据写入数据库 config: 配置文件 ============================================================== 接口说明: 1、获取镜像列表并展示 :/getscanlist 请求URL如:http://127.0.0.1:5000/getscanlist 获取docker镜像库中的镜像列表 获取镜像列表成功返回:镜像列表 获取镜像列表失败返回:"获取镜像列表失败",状态码:200 2、之后前端做选择,选择结果形成get请求形式:/image?image=所选择的镜像名 请求URL如:http://127.0.0.1:5000/image?image=nginx 表明前端选择的镜像是nginx 如果该过程请求正常:nginx将自动赋值给变量ImageName 如果该过程爆出异常返回:"选择镜像出现异常",状态码:200 3、拉取前端选择的镜像:/pullimage 请求URL如:http://127.0.0.1:5000/pullimage 从镜像库中拉取刚才选择的镜像。本例中为nginx 该过程正常返回:"镜像拉取成功",状态码:200 该过程异常返回:"镜像拉取失败",状态码:200 4、停止扫描任务:/stoptask 请求URL如:http://127.0.0.1:5000/stoptask 停止扫描任务,本例中为停止nginx的扫描任务 如果扫描任务停止正常返回: "扫描任务已停止",状态码:200 如果扫描任务停止异常返回: "扫描任务停止出现异常",状态码:200 5、开始扫描并自动写入报告到数据库:/scantask 请求URL如:http://127.0.0.1:5000/scantask 开始扫描并自动写入报告到数据库。本例中为扫描nginx镜像并将扫描报告写入数据库 扫描正常结束返回:"扫描完成,请查看报告",状态码:200 扫描出现异常返回:"扫描失败,请查看原因",状态码:200 6、从数据库中读取报告:/report/所选择的镜像名 请求URL如:http://127.0.0.1:5000/report/nginx 从数据库中提取所有同一镜像名的报告。本例中为读取所有镜像名为nginx的报告。 因为扫描任务名为所选择的镜像名,当出现一模一样的镜像名扫描了两次时,就会同时读取本次扫描报告和历史扫描报告。 读取正常返回:扫描报告内容 读取异常返回:404,message="Image nginx dosn't scan" //其中nginx是动态的

install.sh

#!/bin/bash echo "安装运行所需的python库" pip2 install flask pip2 install flask_script pip2 install flask_restful pip2 install celery echo "安装Docker" sudo yum install -y yum-utils device-mapper-persistent-data lvm2 sudo yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo sudo yum makecache fast sudo yum -y install docker echo "安装Anchore" sudo yum install epel-release sudo yum install rpm-python pip2 install docker-compose sudo yum install dpkg pip2 install anchore echo "导入CVE指纹库" export PATH=~/.local/bin:$PATH anchore feeds list anchore feeds sync echo "安装redis" yum install redis

start_up.sh

#!/bin/bash echo "启动Anchore服务" export PATH=~/.local/bin:$PATH echo "启动docker服务" sudo systemctl start docker echo "启动redis服务" redis-server echo "启动多进程并行" celery -A ScanAPI.dockerSec worker -l INFO echo "启动API服务" python ScanAPI.py runserver

database.py

from sqlalchemy import create_engine from sqlalchemy.orm import scoped_session, sessionmaker from sqlalchemy.ext.declarative import declarative_base import ConfigParser import os config = ConfigParser.ConfigParser() config.read("%s/config" % os.path.dirname(os.path.abspath(__file__))) engine = create_engine(config.get('database', 'url'), convert_unicode=True) db_session = scoped_session(sessionmaker(autocommit=False, autoflush=False, bind=engine)) Base = declarative_base() Base.query = db_session.query_property() def init_db(): import model Base.metadata.drop_all(bind=engine) Base.metadata.create_all(bind=engine)

module.py

#! /usr/bin/env python #-*- coding:utf-8 -*- from database import Base from sqlalchemy import Column, Integer, String, Text from flask_restful import fields class DockerScan(Base): resource_fields = { 'id': fields.Integer, 'scan_name': fields.String, 'scan_time': fields.String, 'scan_result': fields.String, } __tablename__ = 'DockerScan' id = Column(Integer, primary_key=True) scan_name = Column(String(50)) scan_time = Column(String(50)) scan_result = Column(Text()) def __init__(self, scan_name="", scan_time="", scan_result=""): self.scan_name = scan_name self.scan_time = scan_time self.scan_result = scan_result

BaseTask.py

#!/usr/bin/python #-*- coding:utf-8 -*- import os import sys from flask import jsonify from itertools import islice reload(sys) sys.setdefaultencoding('utf-8') # ---------------------------------------------------------- # 获取镜像列表 # ---------------------------------------------------------- def ImageTask(): lis = '' DockerList = """docker images | awk '{print $1,":",$3}'""" list = os.popen(DockerList).readlines() # ------------------------------------------------------ # 展示镜像列表(去掉标题和空行/空格) # ------------------------------------------------------ for line in islice(list,1,None): data = line.strip() if len(data) != 0: l = data.replace(' ','') # 展示举例:nginx lis = lis + l + ' ' list_image = lis.split() return jsonify(list_image) # ---------------------------------------------------------- # 拉取镜像 # ---------------------------------------------------------- def PullImage(DockerImage): DockerPull = "docker pull %s" % DockerImage os.system(DockerPull) # ---------------------------------------------------------- # 停止进程。写入了一半的文件不做删除 # ---------------------------------------------------------- def StopTask(): try: sys.exit() # 停止进程。默认不停止,须手动触发 except SystemExit, e: return 'ScanTask be Stoped' if '__name__' == '__main__': from ScanAPI import Image #循环导入,保证要在使用时再导入 ImageTask() image = Image() PullImage(image.get()) StopTask()

ScanTask.py

#!/usr/bin/python #-*- coding:utf-8 -*- import os import sys from BaseTask import * from itertools import islice reload(sys) sys.setdefaultencoding('utf-8') # ---------------------------------------------------------- # 报告处理。如果文件名称重复,则自动在后面添加“_数字”类推 # ---------------------------------------------------------- class Output(object): # 控制台内容生成txt报告 def __init__(self, check_filename = "default.log"): self.terminal = sys.stdout self.log = open(check_filename, "w") def write(self, message): self.terminal.write(message) self.log.write(message) def flush(self): # 即时更新 pass ''' # 判断文件名是否存在 def check_filename(filename): n = [2] def check_meta(file_name): file_name_new = file_name if os.path.isfile(file_name): file_name_new = file_name[:file_name.rfind('.')]+'_'+str(n[0])+file_name[file_name.rfind('.'):] n[0] += 1 if os.path.isfile(file_name_new): file_name_new = check_meta(file_name) return file_name_new return_name = check_meta(filename) print return_name return return_name ''' # ---------------------------------------------------------- # 镜像扫描 # ---------------------------------------------------------- def ScanTask(DockerImage): # name = check_filename('%s.txt'% DockerImage) sys.stdout = Output('%s.txt'% DockerImage) # print "本次扫描的镜像是: %s" % DockerImage # CVE漏洞扫描 cve_scan = "anchore query --image %s cve-scan all" % DockerImage cve = os.popen(cve_scan).readlines() print "CVE漏洞扫描结果:\n------------------------------------------" for line in islice(cve,0,None): cve_data = line.strip() if len(cve_data) != 0: print cve_data # 镜像常规分析 analysis = "anchore analyze --image %s --imagetype base" % DockerImage os.popen(analysis).readlines() analysis_scan = "anchore gate --image %s" % DockerImage analysis_result = os.popen(analysis_scan).readlines() print "\n镜像常规分析结果:\n------------------------------------------" for line in islice(analysis_result,0,None): analysis_result_data = line.strip() if len(analysis_result_data) != 0: print analysis_result_data # 扫描与纯净镜像的区别 pure_scan = "anchore query --image %s show-file-diffs base" % DockerImage pure = os.popen(pure_scan).readlines() print "\n提取与纯净镜像的对比区别:\n------------------------------------------" for line in islice(pure,0,None): pure_data = line.strip() if len(pure_data) != 0: print pure_data # 镜像特征提取 feature_scan = "anchore toolbox --image %s show" % DockerImage feature = os.popen(feature_scan).readlines() print "\n镜像特征提取结果:\n------------------------------------------" for line in islice(feature,0,None): feature_data = line.strip() if len(feature_data) != 0: print feature_data if __name__ == "__main__": from ScanAPI import Image image = Image() ScanTask(image.get())

Report.py

#!/usr/bin/python #-*- coding:utf-8 -*- import os import sys import time from database import db_session from flask_restful import marshal_with from model import DockerScan reload(sys) sys.setdefaultencoding('utf-8') # ---------------------------------------------------------- # 报告处理。 # ---------------------------------------------------------- @marshal_with(DockerScan.resource_fields) def Scan_Report(): Report_List = "python ScanTask.py" os.system(Report_List) scan_time = time.strftime("%Y-%m-%d %H:%M:%S") # scan_time是写入数据库之一 from ScanAPI import Image image = Image() scan_name = image.get() file_a = open('%s.txt' % scan_name, 'r') # 支持所有换行符 lis = '' for line in file_a: # 遍历文件,构造scan_result lis = lis + line + '\t' list1 = lis.split('\t') # 指定分隔符切片 scan_result = "".join(list1) # 转换为字符串 args = { 'scan_name': scan_name, 'scan_time': scan_time, 'scan_result': scan_result } dockerscan = DockerScan(**args) try: db_session.add(dockerscan) db_session.commit() except Exception,e: print e os.remove('%s.txt'% scan_name) if __name__ == "__main__": Scan_Report()

ScanAPI.py 主引擎-API封装功能

#!/usr/bin/python #-*- coding:utf-8 -*- from BaseTask import StopTask from Report import Scan_Report import sys from celery import Celery from celery import group from flask_script import Manager from flask import Flask, jsonify, request from flask_restful import Resource, Api, abort from flask_restful import marshal_with from model import DockerScan from database import db_session from sqlalchemy import and_ reload(sys) sys.setdefaultencoding('utf-8') app = Flask(__name__) api = Api(app) er = 'redis://127.0.0.1:6379/5' end = 'redis://127.0.0.1:6379/6' dockerSec = Celery('ScanAPI', broker=er, backend=end) # 并行任务设定 @dockerSec.task def pull_image(): from BaseTask import PullImage image = Image() try: PullImage(image.get()) # 传入参数,拉取镜像 except: return {'status': '拉取镜像失败'}, 200 else: return {'status': '拉取镜像成功'}, 200 @dockerSec.task def scan_task(): try: Scan_Report() # 扫描并生成报告 except: return {'status': '扫描失败,请查看原因'}, 200 else: return {'status': '扫描完成,请查看报告'}, 200 @dockerSec.task def stop_task(): try: StopTask() except: return {'status': '扫描任务停止出现异常'}, 200 else: return {'status': '扫描任务已停止'}, 200 # 停止任务 # 获取镜像列表(完成) class GetScanList(Resource): # 无需并行 def get(self): from BaseTask import ImageTask try: return ImageTask() except: return {'status': '获取镜像列表失败'}, 200 # 前端选择 class Image(Resource): #选择镜像 def get(self): try: ImageName = request.args.get('image') # "nginx" except: return {'status': '选择镜像出现异常'}, 200 else: return ImageName # 拉取选定的镜像(完成) class pullImage(Resource): def get(self): jobB = group([pull_image.s()]) # group为一组,该组任务并发执行 jobB.apply_async() # apply_async是进程池 return jobB # 停止扫描任务(完成) class stopTask(Resource): def get(self): jobC = group([stop_task.s()]) # group为一组,该组任务并发执行 jobC.apply_async() return jobC # 镜像扫描开始(完成) class scanTask(Resource): def get(self): jobD = group([scan_task.s()]) # 扫描,并同时将报告写入数据库 jobD.apply_async() return jobD # 报告(完成) class report(Resource): @marshal_with(DockerScan.resource_fields) def get(self, ScanName): scanName = db_session.query(DockerScan).filter(and_(DockerScan.scan_name == ScanName)).all() if scanName: return scanName else: abort(404, message="Image {} doesn't scan".format(ScanName)) # 主页。不用每次都看代码再输入地址测试(完成) class index(Resource): def get(self): list_a = """该页面为主页。请键入具体接口地址: ----------------- /getscanlist //展示镜像列表 /image?image=选定镜像名 //选择了要扫描的镜像 /pullimage //拉取镜像 /stoptask //停止扫描 /scantask //开始扫描并生成报告 /report/所选择的镜像名 //查看扫描报告 """ list_b = list_a.split('\n') return jsonify(list_b) if __name__ == '__main__': # app.run(debug=True) api.add_resource(GetScanList, '/getscanlist') api.add_resource(pullImage, '/pullimage') api.add_resource(stopTask, '/stoptask') api.add_resource(scanTask, '/scantask') api.add_resource(report, '/report/<string:ScanName>') api.add_resource(Image, '/image') # 浏览器输入的地址是要和类中的操作做拼接的,所以比这个要复杂 api.add_resource(index, '/') manager = Manager(app) manager.run()

config

[database] url: mysql+pymysql://root:root123456@localhost:3306/DockerScan?charset=utf8 [celery] broker_url: redis://127.0.0.1:6379 随 亦 认证博客专家 安全博客专家 甲方安全部负责人,坐标杭州,欢迎猎挖。擅长安全架构、web渗透、移动安全、代码审计、隐私合规、安全开发、安全运营
转载请注明原文地址: https://www.6miu.com/read-4930181.html

最新回复(0)