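There are many live-streaming platforms these days, and plenty of people are surely already scraping data from them. I won't cover any deep techniques (I don't know them anyway), but I can roughly describe my approach and how I implemented it. This is a simple system that covers all the work on both the front end and the back end. It's ugly, so please bear with it…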
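System architecture
Implementation details
Pitfalls

Notes:
- bilibili danmaku crawler: a crawler that scrapes the danmaku (bullet comments) from live rooms.
- Data processing service: queries the data in Redis in a suitable way, computes the target metrics, and publishes them through Redis's messaging service.
- Redis: the database used in this project, providing both storage and messaging.
- Interface: visualizes the target data dynamically.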
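bilibili danmaku crawler

Built on aiohttp, it handles a large number of concurrent connections asynchronously and works very well. I also plan to support adding and removing tasks dynamically later on. Main code: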
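```python
import asyncio
import os
import queue

import redis
from apscheduler.schedulers.asyncio import AsyncIOScheduler

# Client class adapted from the bilibili_danmu project on GitHub;
# the module name is an assumption, adjust it to your own layout.
from bilibiliClient import bilibiliClient


async def fetch_all_danmu(roomlist):
    tasks = []
    for i in roomlist:
        danmuji = bilibiliClient()
        r = redis.Redis(host='172.17.0.2', port=6379, db=11)
        q = queue.Queue()
        # One connection task per room; r is the Redis client the danmaku are written to
        task = asyncio.ensure_future(danmuji.connectServer(r, q, roomid=i, w=True))
        tasks.append(task)
        # Heartbeat loop that keeps this room's connection alive
        task = asyncio.ensure_future(danmuji.HeartbeatLoop())
        tasks.append(task)
        print("this is room ", i)
    # _ = await asyncio.gather(*tasks)


async def print_hello(i):
    await asyncio.sleep(10)
    print("this is ", i)


if __name__ == '__main__':
    scheduler = AsyncIOScheduler()
    # scheduler.add_job(fetch_all, args=[URL_LIST], trigger='interval', seconds=2)
    lists = ["865138", "5269"]  # live room IDs to watch
    # No trigger given, so the job runs once, right after the scheduler starts
    scheduler.add_job(fetch_all_danmu, args=[lists])
    # scheduler.add_job(print_hello, args=[1])
    scheduler.start()
    print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))

    # Execution will block here until Ctrl+C (Ctrl+Break on Windows) is pressed.
    try:
        asyncio.get_event_loop().run_forever()
    except (KeyboardInterrupt, SystemExit):
        pass
```

I haven't posted the crawler internals here, since there are plenty of approaches online; mine is based on bilibili_danmu on GitHub.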
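The plan to add and remove rooms dynamically could be handled with plain asyncio by keeping one Task per room and cancelling it when the room is removed. The following is a minimal sketch under that assumption, not the author's code: `RoomManager` and `watch_room` are hypothetical names, and `watch_room` only sleeps in a loop where the real client would run `connectServer` and `HeartbeatLoop`.

```python
import asyncio


async def watch_room(roomid, interval=0.01):
    """Hypothetical stand-in for one room's connectServer + HeartbeatLoop pair."""
    beats = 0
    try:
        while True:
            await asyncio.sleep(interval)  # pretend to send a heartbeat / read a packet
            beats += 1
    except asyncio.CancelledError:
        print(f"room {roomid} stopped after {beats} beats")
        raise  # let the task end in the cancelled state


class RoomManager:
    """Keeps one asyncio.Task per room so rooms can be added or removed at runtime."""

    def __init__(self):
        self.tasks = {}

    def add(self, roomid):
        # Must be called while the event loop is running
        if roomid not in self.tasks:
            self.tasks[roomid] = asyncio.ensure_future(watch_room(roomid))

    async def remove(self, roomid):
        task = self.tasks.pop(roomid, None)
        if task is not None:
            task.cancel()
            # Wait for the task's cleanup; the CancelledError is returned, not raised
            await asyncio.gather(task, return_exceptions=True)

    def rooms(self):
        return sorted(self.tasks)


async def main():
    manager = RoomManager()
    for roomid in ["865138", "5269"]:
        manager.add(roomid)
    await asyncio.sleep(0.05)
    await manager.remove("865138")
    remaining = manager.rooms()
    await manager.remove("5269")
    return remaining, manager.rooms()


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Cancelling and then awaiting the task gives the room coroutine a chance to run its cleanup (closing the connection, in a real client) before the manager drops it.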
Data processing service

Reads data from Redis, using redis_timeseries as the storage tool for time-series data.
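```shell
pip install redis_timeseries
```

The tool's storage scheme is actually quite simple: listing the keys shows that it just wraps each data item together with its timestamp. The key structure is: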
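```python
':'.join([self.base_key, granularity, str(timestamp_key), str(key)])
```

So once you have defined the data layout you want, you can also query it with Redis's native commands. The key definition used here is "roomid{0}:{1}".format(roomid, data).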
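To query these keys with Redis's own commands, it helps to see how a full key is assembled. This is a minimal sketch assuming the timestamp_key is the timestamp rounded down to the start of its bucket (check the redis_timeseries source for the exact rule). `bucket_start`, `build_key` and `room_key` are hypothetical helpers; `lyt` and `1second` are the base key and granularity used in the publishing code of this post. The resulting glob pattern is what you would pass to SCAN, for example `r.scan_iter(match=PATTERN)` in redis-py.

```python
from fnmatch import fnmatchcase


def bucket_start(timestamp, duration):
    """Round a Unix timestamp down to the start of its bucket (assumed bucketing rule)."""
    return int(timestamp // duration * duration)


def room_key(roomid, data):
    """The post's key definition; 'data' could be e.g. a user name (assumption)."""
    return "roomid{0}:{1}".format(roomid, data)


def build_key(base_key, granularity, duration, timestamp, key):
    """Join in the order quoted above: base_key:granularity:timestamp_key:key."""
    timestamp_key = bucket_start(timestamp, duration)
    return ':'.join([base_key, granularity, str(timestamp_key), str(key)])


# Glob pattern for a native SCAN ... MATCH over every 1-second bucket of room 5269
PATTERN = "lyt:1second:*:roomid5269:*"

if __name__ == "__main__":
    full_key = build_key("lyt", "1second", 1, 1700000000.7, room_key(5269, "some_user"))
    print(full_key)                        # lyt:1second:1700000000:roomid5269:some_user
    print(fnmatchcase(full_key, PATTERN))  # True
```

fnmatch is used here only to check the pattern offline; for patterns that use nothing but `*` it matches the same keys as Redis's glob.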
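```python
import json
import time

import redis
import redis_timeseries

data = redis.Redis(host='172.17.0.2', port=6379, db=11)


def mess_pub():
    # One-second buckets, each kept for one day
    my_granularities = {
        '1second': {'ttl': redis_timeseries.days(1), 'duration': redis_timeseries.seconds(1)},
    }
    ts = redis_timeseries.TimeSeries(data, base_key="lyt", granularities=my_granularities)
    while True:
        # Active users: keys of room 5269 seen in the last 10 one-second buckets
        count = len(ts.scan_keys("1second", 10, "roomid5269:*"))
        # Viewer count: suffix of the last watcher key in the last 60 buckets (0 if none yet)
        watcher_keys = ts.scan_keys('1second', 60, 'roomwatcher5269:*')
        wcount = watcher_keys[-1].split(":")[-1] if watcher_keys else 0
        print("pub count and wcount :{0},{1}".format(count, wcount))
        # Serialize to a JSON string; redis-py encodes it to bytes when publishing
        s = json.dumps({"count": float(count), "wcount": float(wcount)})
        data.publish('summary', s)
        time.sleep(2)


if __name__ == '__main__':
    mess_pub()
```

Data storage service: as mentioned above, I ultimately chose redis_timeseries as the storage tool; it supports time-series data well.
Message publishing service: uses Redis's native publish/subscribe feature.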
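The two numbers mess_pub publishes can be reproduced without a Redis server. The sketch below works on plain lists of key names and assumes, as the original code implies, that the viewer count is stored as the last colon-separated field of a roomwatcher key; it also deduplicates the danmaku keys defensively in case the same key shows up twice. `active_count`, `latest_watchers` and `summary_message` are hypothetical names.

```python
import json


def active_count(keys, roomid):
    """Count distinct 'roomid{roomid}:...' keys, i.e. the 'count' field of the summary."""
    prefix = "roomid{0}:".format(roomid)
    return len({k for k in keys if k.startswith(prefix)})


def latest_watchers(keys, roomid):
    """Viewer count taken from the last 'roomwatcher{roomid}:<n>' key, or 0 if none."""
    prefix = "roomwatcher{0}:".format(roomid)
    matches = [k for k in keys if k.startswith(prefix)]
    return float(matches[-1].split(":")[-1]) if matches else 0.0


def summary_message(danmu_keys, watcher_keys, roomid):
    """Build the same JSON shape that mess_pub publishes on the 'summary' channel."""
    return json.dumps({"count": float(active_count(danmu_keys, roomid)),
                       "wcount": latest_watchers(watcher_keys, roomid)})


if __name__ == "__main__":
    danmu = ["roomid5269:alice", "roomid5269:bob", "roomid5269:alice"]
    watchers = ["roomwatcher5269:1200", "roomwatcher5269:1250"]
    print(summary_message(danmu, watchers, 5269))  # {"count": 2.0, "wcount": 1250.0}
```

Because the page only reads the count and wcount fields, any producer that emits this JSON shape on the summary channel would drive the chart the same way.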
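The interface is built with Flask + ECharts. This part has quite a lot of code, and the focus is on the visualization. I'm not much of a front-end developer, so it's ugly. HTML code: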
```html
<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8">
    <title>Proudly presented by ECharts</title>
    {% for jsfile_name in script_list %}
    <script src="{{host}}/{{jsfile_name}}.js"></script>
    {% endfor %}
    <script>
        // Receive the summaries pushed by the Flask /stream endpoint (server-sent events)
        var source = new EventSource("{{ url_for('live.stream') }}");
        source.onmessage = function (event) {
            var summary = JSON.parse(event.data);
            // Stored as "count wcount"; the chart below reads this field every 2 seconds
            document.getElementById("text").value = summary.count.toString() + " " + summary.wcount.toString();
        };
    </script>
</head>
<body>
<form>
    <input type="text" id="text" size="35" />
    <a id="update">0</a>
    <div style="width: 800px;height: 100px;"><textarea id="info" style="width: 700px;height: 90px;"></textarea></div>
    <a>bilibili live room: viewers vs. active users (please don't read anything into it)</a>
    <div id="main" style="width: 800px;height:300px;"></div>
    <script>
        function create_graph() {
            // "count wcount" placeholder until the first message arrives
            document.getElementById("text").value = "0 0";
            var myChart = echarts.init(document.getElementById('main'));

            // Placeholder point one minute after the previous one (value fixed at 0)
            function randomData() {
                now = new Date(+now + oneMin);
                value = value + Math.random() * 21 - 10;
                return {
                    name: now.toString(),
                    value: [time_str(now), Math.round(0)]
                };
            }

            var data = [];   // viewers
            var data1 = [];  // active users
            var t = new Date();
            // Months are 0-based in the Date constructor, so no +1 here (today at midnight)
            var now = +new Date(t.getFullYear(), t.getMonth(), t.getDate(), 0, 0, 0);
            var oneDay = 24 * 3600 * 1000;
            var oneMin = 60 * 1000;
            var value = Math.random() * 1000;
            for (var i = 0; i < 60; i++) {
                data.push(randomData());
                data1.push(randomData());
            }

            var option = {
                title: { text: '' },
                legend: {  // series names shown above the chart
                    show: true,
                    data: ['Viewers', 'Active users']
                },
                tooltip: {
                    trigger: 'axis',
                    formatter: function (params) {
                        // Show the hovered values in the textarea instead of the tooltip box
                        document.getElementById("info").value = "Viewers: " + params[0].value[1] + "\n" + "Active users: " + params[1].value[1];
                        {# var date = new Date(params.name);#}
                        {# return "Viewers: " + params[0].value + "\n" + "Active users: " + params[1].value;#}
                    },
                    axisPointer: { animation: false }
                },
                xAxis: {
                    type: 'time',
                    splitLine: { show: false }
                },
                yAxis: {
                    type: 'value',
                    boundaryGap: [0, '100%'],
                    splitLine: { show: false }
                },
                series: [{
                    name: 'Viewers',
                    type: 'line',
                    showSymbol: false,
                    hoverAnimation: false,
                    data: data
                }, {
                    name: 'Active users',
                    type: 'line',
                    showSymbol: false,
                    hoverAnimation: false,
                    data: data1
                }]
            };
            myChart.setOption(option);

            // Every 2 seconds, slide both series one step and append the latest values
            window.line = window.setInterval(function () {
                var value = document.getElementById("text").value.toString().split(" ");
                now = new Date(+now + oneMin);
                data.shift();
                data.push({
                    name: now.toString(),
                    value: [time_str(now), Math.round(value[1])]  // wcount -> viewers
                });
                data1.shift();
                data1.push({
                    name: now.toString(),
                    value: [time_str(now), Math.round(value[0])]  // count -> active users
                });
                myChart.setOption({
                    series: [{ data: data }, { data: data1 }]
                });
            }, 2000);
        }

        // Format a Date as "Y/M/D h:m" for the time axis
        function time_str(now) {
            var Y = now.getFullYear() + '/';
            var M = (now.getMonth() + 1) + '/';
            var D = now.getDate() + " ";
            var h = now.getHours() + ":";
            var m = now.getMinutes();
            return Y + M + D + h + m;
        }

        create_graph();
    </script>
</form>
</body>
</html>
```

The page receives the messages from Redis through an EventSource handler (server-sent events); the Flask code behind it is:
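```python
import json

import flask
import redis

# Assumed setup: the same Redis instance as the other services, and a blueprint
# named 'live' to match url_for('live.stream') in the template
red = redis.Redis(host='172.17.0.2', port=6379, db=11)
live = flask.Blueprint('live', __name__)


def event_stream():
    pubsub = red.pubsub()
    pubsub.subscribe('summary')
    for message in pubsub.listen():
        # Skip the subscribe confirmation; only forward real published messages
        if message['type'] == 'message':
            mess = json.loads(message['data'].decode("utf-8"))
            # One server-sent event: a "data:" line followed by a blank line
            yield 'data:{0}\n\n'.format(json.dumps(mess))


@live.route('/stream')
def stream():
    return flask.Response(event_stream(), mimetype="text/event-stream")
```

Pitfalls:
The crawler took a long time to tune its parameters. Simple as the whole pipeline is, it still ate up a full day; wasted… …a lot. I don't feel like writing any more.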
That's all…