RsStreamer监控模块设计

0x1 需求分析

0x10 HTTP监控

RsStreamer作为P2P流媒体服务器,需要和很多并发连接的P2P播放器进行通讯,把媒体数据发送给这些P2P播放器进行播放,另外还需要和其他如认证服务器进行通讯,进行用户验证等操作.

RsStreamer在运行的过程中,很多内部状态需要及时地提供给用户,如统计有多少个用户观看了某部影片,提供某部影片流媒体服务的总带宽是多少,通过http服务的形式提供这些内部状态的监控信息,用户可以通过浏览器来直观地访问这些数据,从而了解系统运行的相关信息.

另外用户还可以通过浏览器来配置RsStreamer,如限制某个用户的带宽,对某个特定的媒体频道限制用户数目.

其他流媒体平台如helix server也提供网页形式的界面供用户查看运行状态和进行相关配置管理.

其他开源软件如nginx也有类似的功能.

0x11 数据分析

通过RsStreamer提供的http服务,把系统运行的相关信息通过RESTful接口的形式提供给数据分析系统(如elasticsearch),进行分析以后,可以对用户播放流媒体影片的行为进行数据挖掘,训练影片推荐系统,从而提供更好的用户体验.

下面先分析一下haproxy系统中监控模块是如何实现的,然后再提出RsStreamer中监控模块的设计方案.

0x2 HAProxy的实现

0x20 总体结构

haproxy中有关监控模块的总体结构图如下所示.
haproxy stats architecture

http service基于epoll对外提供http服务,接受浏览器的请求,建立和浏览器之间的tcp连接,根据运行状态生成对应的html格式数据,通过tcp把html数据发送给浏览器,浏览器接收到数据以后显示haproxy的运行状态.

0x21 代码流程

程序执行流程图如下

haproxy sequence

程序的执行从main函数开始,然后调用poller函数等待事件的触发.
有stats的请求到来,poller函数触发.
然后根据运行状态生成html格式数据,通过tcp发生给浏览器.
下面对这个流程图的执行过程进行进一步的分析.

0x22 相关代码模块分析

处理网络连接的主循环

该循环调用poller的poll函数,检测是否有socket事件发生,用户在浏览器中输入http://127.0.0.1:1080/stats, haproxy收到请求,run_poll_loop()检测到有client来连接,建立tcp连接,然后调用applet_run_active()来处理这些事件.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/* Runs the polling loop */
static void run_poll_loop()
{
while (1) {
/* Process a few tasks */
process_runnable_tasks();
/* check if we caught some signals and process them */
signal_process_queue();
/* Check if we can expire some tasks */
next = wake_expired_tasks();
/* stop when there's nothing left to do */
if (jobs == 0)
break;
/* expire immediately if events are pending */
if (fd_cache_num || tasks_run_queue || signal_queue_len || applets_active_queue)
next = now_ms;
/* The poller will ensure it returns around <next> */
cur_poller.poll(&cur_poller, next);
fd_process_cached_events();
applet_run_active();
}
}

poller的poll()函数实现

1.首先扫描fd更新列表fd_updt,找到需要处理的fd,再调用epoll_ctl()来往内核来注册epoll事件.
2.然后调用epoll_wait()等待事件的发生,事件触发以后,epoll_wait()函数返回,返回值是触发的事件数目.
3.再遍历已经触发的事件,根据是要读还是写分别调用fd_may_recv()和fd_may_send().

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
REGPRM2 static void _do_poll(struct poller *p, int exp)
{
... ...
/* first, scan the update list to find polling changes */
for (updt_idx = 0; updt_idx < fd_nbupdt; updt_idx++) {
fd = fd_updt[updt_idx];
if ((eo ^ en) & FD_EV_POLLED_RW) {
/* poll status changed */
... ...
epoll_ctl(epoll_fd, opcode, fd, &ev);
}
}
... ...
/* now let's wait for polled events */
status = epoll_wait(epoll_fd, epoll_events, global.tune.maxpollevents, wait_time);
/* process polled events */
for (count = 0; count < status; count++) {
... ...
fd = epoll_events[count].data.fd;
... ...
if (n & (FD_POLL_IN | FD_POLL_HUP | FD_POLL_ERR))
fd_may_recv(fd);
if (n & (FD_POLL_OUT | FD_POLL_ERR))
fd_may_send(fd);
}
}

applet_run_active()函数实现

从applet队列中找到需要执行的任务,调用相应的handler函数,对stats模块,其handler函数为http_stats_io_handler().

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void applet_run_active()
{
... ...
while (!LIST_ISEMPTY(&applet_cur_queue)) {
curr = LIST_ELEM(applet_cur_queue.n, typeof(curr), runq);
... ...
si_applet_cant_get(si);
si_applet_stop_put(si);
curr->applet->fct(curr);
si_applet_wake_cb(si);
channel_release_buffer(si_ic(si), &curr->buffer_wait);
... ...
}
}

stats模块在applet中的注册

把stats的处理函数http_stats_io_handler注册到applet中.
请注意关键字attribute((constructor))是gcc的关键字,表明这个函数是在main()函数执行之前被调用.

1
2
3
4
5
6
7
8
9
10
11
12
truct applet http_stats_applet = {
.obj_type = OBJ_TYPE_APPLET,
.name = "<STATS>", /* used for logging */
.fct = http_stats_io_handler,
.release = NULL,
};
__attribute__((constructor))
static void __stat_init(void)
{
cli_register_kw(&cli_kws);
}

生成html格式监控数据

生成html格式监控数据的调用堆栈如下.

1
2
3
4
5
6
7
8
9
10
11
12
(gdb) bt
#0 stats_fill_be_stats (px=px@entry=0x74fc30, flags=flags@entry=17, stats=stats@entry=0x7328e0 <stats>, len=len@entry=83)
at src/stats.c:1725
#1 0x000000000046d238 in stats_dump_be_stats (si=si@entry=0x7a1a50, px=px@entry=0x74fc30, flags=flags@entry=17)
at src/stats.c:1814
#2 0x000000000046edff in stats_dump_proxy_to_buffer (si=si@entry=0x7a1a50, px=px@entry=0x74fc30, uri=uri@entry=0x751a60)
at src/stats.c:2112
#3 0x000000000046f967 in stats_dump_stat_to_buffer (si=si@entry=0x7a1a50, uri=0x751a60) at src/stats.c:2570
#4 0x000000000046ffe7 in http_stats_io_handler (appctx=0x7a1e00) at src/stats.c:3072
#5 0x00000000004dc208 in applet_run_active () at src/applet.c:66
#6 0x000000000040a3d8 in run_poll_loop () at src/haproxy.c:2194
#7 main (argc=<optimized out>, argv=0x7fffffffddb8) at src/haproxy.c:2700

0x23 监控相关配置

在配置文件中加入下面的内容,然后启动haproxy,这样haproxy就支持状态监控了.

1
2
3
4
5
6
7
8
listen admin_stats
bind 0.0.0.0:1080
mode http #http的7层模式
option httplog #http日志格式
maxconn 10 #最大连接数
stats refresh 30s #自动刷新时间
stats uri /stats #页面url
stats auth admin:admin #设置监控页面的用户和密码:admin

0x24 浏览器运行效果如下

从图中可以看到系统运行的状态.
haproxy stats

0x3 系统设计

参考haproxy的监控模块,设计RssStreamer的监控模块如下图所示.
http service基于st对外提供http服务,st是基于epoll来实现的, 接受浏览器的请求,建立和浏览器之间的tcp连接,state monitor负责收集系统各个模块的运行状态,html generator根据运行状态生成对应的html格式数据,通过tcp把html数据发送给浏览器,浏览器接收到数据以后显示RsStreamer的运行状态.
rsstreamer stats architecture