linux libevent linux库编译怎样管理多个连接

他的最新文章
他的热门文章
您举报文章:
举报原因:
原文地址:
原因补充:
(最多只允许输入30个字)博客访问: 2793731
博文数量: 824
博客积分: 13014
博客等级: 上将
技术积分: 10321
注册时间:
认证徽章:
IT168企业级官微
微信号:IT168qiye
系统架构师大会
微信号:SACC2013
分类: LINUX
一 线程的初始化
&&&& 在进行事件驱动时,每个线程需建立自己的事件根基。由于libevent未提供线程之间通信的方式,我们采用管道来进行线程的通信。同时为方便主线程分配线程,我们还需保留各个线程的id号。因此我们采用如下结构来保留每个线程的有关信息。
typedef&struct&{
&&&&pthread_t&thread_&&&&&&&&//线程ID
&&&&struct&event_base&*base;&&&//事件根基
&&&&struct&event&notify_
&&&&int&notify_receive_&&&
&&&&int&notify_send_&&&&&
&&&&CQ&new_conn_&&&&//连接队列
}&LIBEVENT_THREAD;
2 初始化线程
我们先介绍如何为每个线程创建自己的事件根基。正如前面介绍的我们需自己建立管道来进行线程通信,因此在线程根基初始化后,我们为为管道的读添加事件对象。注意:在libevent中,线程的初始化需在事件根基初始化之后(即event_init之后)。
static&void&setup_thread(LIBEVENT_THREAD&*me)&{
&&&&if&(!&me->base)&
&&&&&&&&me->base&=&event_init();
&&&&&& event_set(&me->notify_event,&me->notify_receive_fd,
&&&&&&&&&&&&&&EV_READ&|&EV_PERSIST,&thread_libevent_process,&me);
&&&& event_base_set(me->base,&&me->notify_event);
&&&& event_add(&me->notify_event,&0)&==&-1)
&&&&cq_init(&me->new_conn_queue);
& 将主线程存储于结构体的第0个对象中,然后为每个线程创建管道并创建事件根基
threads&=&malloc(sizeof(LIBEVENT_THREAD)&*&nthreads);
threads[0].base&=&main_
threads[0].thread_id&=&pthread_self();
for&(i&=&0;&i&<&&i++)&{
int&fds[2];
threads[i].notify_receive_fd&=&fds[0];
threads[i].notify_send_fd&=&fds[1];
setup_thread(&threads[i]);
接下来从主线程中创建线程,线程创建成功后激活该线程的事件根基,进入事件循环。
for&(i&=&1;&i&<&&i++)&{
&&&&pthread_t&
&&&&pthread_attr_t&
&&&pthread_attr_init(&attr);
&&&&&&pthread_create(&thread,&&attr,&worker_libevent,&&threads[i])
二 分发连接
&&& 现在我们有了多个线程和事件根基,那么我们应该如何将主线程接收到的连接分发给各个线程并将其激活呢?本例子中线程的选择采用最简单的轮询方式。
我们需要对accept_handle函数进行修改。在线程池模型中,我们使用子线程代替主线程来创建缓冲区事件对象,主线程只是对激活选中的线程。我们通过向管道写入一个空字节来激活该管道的读请求。
thread += 1;
item->sfd = sfd
cq_push(&threads[thread].new_conn_queue, item);
write(threads[thread].notify_send_fd, "", 1)
三 处理请求
管道的读请求就绪后,回调函数thread_libevent_process被调用,此时就进入到了线程中,后面所有的调度都将在该线程中进行。
首先从管道中读取1个字节,然后创建事件缓冲区对象来实际处理请求。
static void thread_libevent_process(int fd, short which, void *arg) {
char buf[1];
read(fd, buf, 1)
&&& 到这里我们的程序就可以使用线程池和libevent事件驱动来协同工作了。在实际的服务中,我们通常要将服务作为守护进程来运行,那么在linux中,如何编写守护进程呢?后面在继续学习如何编写linux守护进程。
阅读(7244) | 评论(0) | 转发(0) |
相关热门文章
给主人留下些什么吧!~~
请登录后评论。查看:1791|回复:2
助理工程师
大家好,最近小弟所在公司要我做一个任务,用来测试公司主要的服务器的承载能力,已经快结束了,不过还有待优化.
我用了epoll和Libevent两种方法,epoll的就不贴出来了(还没有完善).
想优化一下Libevent方法来实现的程序,还请各位指点
任务概述 (“我”代表客户端):
1. 首先创建N个(N可以用config库来配置)BufferEvent,每一个BufferEvent绑定一个Socket,连到服务器的相同端口
2. 每个BufferEvent有一个 “读Callback” 和一个 “事件Callback”
3. 当 事件 BEV_EVENT_CONNECTED 发生时(表示连接socket成功),在 “事件Callback” 中发送 APPLICATION_HELLO包,服务器收到这个包之后,会立即发还一个 指令 (20几个字节的数据)
4. 当服务器发送 指令之后,收到指令(用 “读Callback” 判断)后,接连顺序发送三个包:&&APPLICATION_COMMAND_RESULT, APPLICATION_GIS_DATA和APPLICATION_END
5. 一旦服务器收到 APPLICATION_END 这个包,服务器会关闭(close)他那端的socket,然后我会接收到 事件 BEV_EVENT_EOF,我就调用 bufferevent_free() 来释放资源,并关闭客户端socket.
6. 之后,重新创建socket,再重新创建BufferEvent绑定Socket,连接服务器,每个BufferEvent有一个 读Callback 和一个 事件Callback
7. 对每一个设备来说,第3步到第6步会一直循环执行.&&以期达到模拟N台设备同时且一直向服务器发送四个包(APPLICATION_HELLO,APPLICATION_COMMAND_RESULT, APPLICATION_GIS_DATA和APPLICATION_END),以测试服务器能够承载多少设备同时向其发送数据.
附上主体部分代码:#include &event2/event.h&
#include &event2/bufferevent.h&
#include &event2/buffer.h&
struct MyPckt {
&&struct KprotoPckt KprotoP
&&evutil_socket_t& &socket_
&&uint32_t& && && &
}__attribute__ ((packed));
uint8_t main(){
&&// 测试运行时间
&&runtime.tv_sec = DeviceR
&&runtime.tv_usec = 0;
&&struct hostent *
&&struct bufferevent *bev[SimulationNumberOfDevices];
&&// Main structure array
&&struct MyPckt MyPckt[SimulationNumberOfDevices];
&&// 创建N个socket
&&for(i=0; i&SimulationNumberOfD i++){
& & if((MyPckt[i].socket_fd = socket(AF_INET, SOCK_STREAM, 0)) == -1){
& && &printf(&Could not create socket for device %d\n&, i);
& && &perror(&Can not create socket&);
& & evutil_make_socket_nonblocking(MyPckt[i].socket_fd);&&// 设置socket为非阻塞
&&// 创建一个event_base
&&evbase = event_base_new();
&&if(!evbase){
& & perror(&Could not open event base!&);
&&// 服务器设置
&&if((he=gethostbyname(TargetServer))==NULL){
& & fprintf(stderr, &Can't resolve \&%s\&.\n&, TargetServer);
&&memcpy(&mySockAddr_in.sin_addr, he-&h_addr_list[0], he-&h_length);
&&mySockAddr_in.sin_family = AF_INET;
&&mySockAddr_in.sin_port = htons(TargetPort);
&&// 主体FOR循环
&&for(i=0; i&SimulationNumberOfD i++){
& & // 声称设备的序列号
& & DeviceMakeNumber(MyPckt[i].KprotoPckt.KprotoPcktHeader.Serial,DeviceVariant,DeviceBaseNumber,i);
& & MyPckt[i].KprotoPckt.Pad=0;
& & MyPckt[i].i =
& & // 基本的设置,callback设置
& & bev[i] = bufferevent_socket_new(evbase, MyPckt[i].socket_fd, BEV_OPT_CLOSE_ON_FREE);
& & bufferevent_setcb(bev[i], buff_input_cb, NULL, buff_ev_cb, &MyPckt[i]);
& & bufferevent_setwatermark(bev[i], EV_READ, DeviceReadLowWaterMark, DeviceReadHighWaterMark);
& & bufferevent_enable(bev[i], EV_READ | EV_WRITE);
& & //bufferevent_enable(bev[i], EV_READ | EV_WRITE | EV_PERSIST);
& & // 将每个bufferevent连接到服务器
& & if(bufferevent_socket_connect(bev[i],(struct sockaddr *)&mySockAddr_in, sizeof(struct sockaddr_in)) & 0) {
& && &bufferevent_free(bev[i]);
& && &printf(&error connecting buffer event %d\n&, i);
&&printf(&FOR 循环结束 开始测试服务器!!\n&);
&&event_base_loopexit(evbase, &runtime);
&&event_base_dispatch(evbase);
&&printf(&测试停止!!\n&);
/**\brief Libevent read callback
void buff_input_cb(struct bufferevent *bev, void *ctx){
&&uint32_t deviceNb = (*(struct MyPckt *)ctx).i;
&&/** 发送3个包 :
& &* APPLICATION_COMMAND_RESULT
& &* APPLICATION_GIS_DATA
& &* APPLICATION_END
&&// 发送 APPLICATION_COMMAND_RESULT 包
&&if(KpManagement(bev, KP_COMMAND_RESULT, (struct MyPckt *)ctx)){
& & // 发送 APPLICATION_GIS_DATA 包,其内包含十多个地理位置GPS数据
& & if(KpManagement(bev, KP_GIS_DATA, (struct MyPckt *)ctx)){
& && &// 发送 APPLICATION_END 包
& && &if(!KpManagement(bev, KP_END, (struct MyPckt *)ctx)){
& && &&&printf(&Can not send KP_END for device %d\n&, deviceNb);
& && &printf(&Can not send KP_GIS_DATA for device %d\n&, deviceNb);
& & printf(&Can not send KP_COMMAND_RESULT for device %d\n&, deviceNb);
/**\brief Libevent event callback
void buff_ev_cb(struct bufferevent *bev, short events, void *ctx){
&&uint32_t deviceNb = (*(struct MyPckt *)ctx).i;
&&if(events & BEV_EVENT_CONNECTED){
& & //printf(&服务器已连接 设备号 %d\n&, deviceNb);
& & // 发送 APPLICATION_HELLO包
& & if(!KpManagement(bev, KP_HELLO, (struct MyPckt *)ctx)){
& && &printf(&Can not send KP_HELLO for device %d\n&, deviceNb);
&&// 如果服务器关闭了那端的socket
&&else if(events & BEV_EVENT_EOF){
& & bufferevent_free(bev);
& & // 重新创建 socket,将其绑定到 eventbuffer上
& & if(((*(struct MyPckt *)ctx).socket_fd=socket(AF_INET, SOCK_STREAM, 0))==-1)
& && &printf(&Could not re-create socket for device %d\n&, deviceNb);
& & evutil_make_socket_nonblocking((*(struct MyPckt *)ctx).socket_fd);
& & bev = bufferevent_socket_new(evbase, (*(struct MyPckt *)ctx).socket_fd, BEV_OPT_CLOSE_ON_FREE);
& & bufferevent_setcb(bev, buff_input_cb, NULL, buff_ev_cb, (struct MyPckt *)ctx);
& & bufferevent_setwatermark(bev, EV_READ, DeviceReadLowWaterMark, DeviceReadHighWaterMark);
& & bufferevent_enable(bev, EV_READ | EV_WRITE);
& & //bufferevent_enable(bev, EV_READ | EV_WRITE | EV_PERSIST);
& & // 重新连接 bufferevent 到服务器
& & if(bufferevent_socket_connect(bev,(struct sockaddr *)&mySockAddr_in, sizeof(struct sockaddr_in)) & 0) {
& && &bufferevent_free(bev);
& && &printf(&error connecting buffer event %d\n&, deviceNb);
&&else if(events & BEV_EVENT_ERROR){
& & bufferevent_free(bev);
& & printf(&Connect server error for device %d, fd %d\n&, deviceNb, (*(struct MyPckt *)ctx).socket_fd);
& & perror(&BEV_EVENT_ERROR&);
[i][i][i][i][i][i][i][i][i][i][i][i][i][i]目前有时会出现 Connection reset by peer 错误,应该就是说 “我的新的socket还没生成,就又被用来连接服务器了”.
所以目前我的程序一开始(假如我设定N为1000)是可以有1000个左右socket连接上的,但是其实并不能并发执行发送包的任务,而且运行了几十秒之后,连接的socket数目会降低到几十个.
我的初衷是想让N个设备并行执行发送四个包的任务,并且一直保持N个设备同时运行(虽然数目会变动).
1. 怎么解决Connection reset by peer 错误呢?
2. 我的目的能够用Libevent达到吗?
3. 我觉得程序肯定还有改进的余地,有没有好的建议?
4. 不知道用多线程可以实现我的测试目的吗?多线程的版本我也写了,不过目前没办法并发运行,就是只有一个线程在运行,其他在等待. 怎么能够实现多线程一起发送呢?
万分感谢!
版规,发帖可获2无忧币
本帖最后由 frogoscar 于
23:32 编辑
没搞过这个
不知道这个会不会对你有帮助
助理工程师
引用:原帖由 向立天 于
10:42 发表
没搞过这个
不知道这个会不会对你有帮助
http://www.360doc.com/content/13/76.shtml 谢谢
这个我看过啦,已经
版规,回帖可获2无忧币他的最新文章
他的热门文章
您举报文章:
举报原因:
原文地址:
原因补充:
(最多只允许输入30个字)新手园地& & & 硬件问题Linux系统管理Linux网络问题Linux环境编程Linux桌面系统国产LinuxBSD& & & BSD文档中心AIX& & & 新手入门& & & AIX文档中心& & & 资源下载& & & Power高级应用& & & IBM存储AS400Solaris& & & Solaris文档中心HP-UX& & & HP文档中心SCO UNIX& & & SCO文档中心互操作专区IRIXTru64 UNIXMac OS X门户网站运维集群和高可用服务器应用监控和防护虚拟化技术架构设计行业应用和管理服务器及硬件技术& & & 服务器资源下载云计算& & & 云计算文档中心& & & 云计算业界& & & 云计算资源下载存储备份& & & 存储文档中心& & & 存储业界& & & 存储资源下载& & & Symantec技术交流区安全技术网络技术& & & 网络技术文档中心C/C++& & & GUI编程& & & Functional编程内核源码& & & 内核问题移动开发& & & 移动开发技术资料ShellPerlJava& & & Java文档中心PHP& & & php文档中心Python& & & Python文档中心RubyCPU与编译器嵌入式开发驱动开发Web开发VoIP开发技术MySQL& & & MySQL文档中心SybaseOraclePostgreSQLDB2Informix数据仓库与数据挖掘NoSQL技术IT业界新闻与评论IT职业生涯& & & 猎头招聘IT图书与评论& & & CU技术图书大系& & & Linux书友会二手交易下载共享Linux文档专区IT培训与认证& & & 培训交流& & & 认证培训清茶斋投资理财运动地带快乐数码摄影& & & 摄影器材& & & 摄影比赛专区IT爱车族旅游天下站务交流版主会议室博客SNS站务交流区CU活动专区& & & Power活动专区& & & 拍卖交流区频道交流区
稍有积蓄, 积分 240, 距离下一级还需 260 积分
论坛徽章:0
有几个疑问
1 纯粹用libevent开发服务器,一个线程,是不是达不到高并发。
2 多线程的话,一个线程一个连接,如果再配合libevent怎么去做,感觉理解起来很费劲。是在每个线程里增加event事件?
3 线程池和epool哪个效率更高?
稍有积蓄, 积分 240, 距离下一级还需 260 积分
论坛徽章:0
哦,看了sky的帖子,大概知道了一些,未必需要每个连接一个线程,可以一个线程处理n个连接。多谢sky
白手起家, 积分 69, 距离下一级还需 131 积分
论坛徽章:0
1、可以,在主线程就是libevent的线程里派发网络事件,收包切割后丢到其他线程来处理。
2、你缺乏这方面的概念,建议多看看。
3、这两东东牛头不矛盾,可以混用的。
丰衣足食, 积分 709, 距离下一级还需 291 积分
论坛徽章:4
我一直不喜欢在linux下做多线程,原因太多了,但是又需要实现多线程的功能,最后找到了一个大神器libpth,实现进程内代码切换
算是个用户级线程吧,性能好多了。
巨富豪门, 积分 26638, 距离下一级还需 13362 积分
论坛徽章:14
weishuo1999 发表于
我一直不喜欢在linux下做多线程,原因太多了,但是又需要实现多线程的功能,最后找到了一个大神器libpth,实 ...
linux的多线程,还可以吧。就是线程调度需要3-7微秒,太慢了,严重影响并发能力。
丰衣足食, 积分 709, 距离下一级还需 291 积分
论坛徽章:4
不喜欢线程的原因有以下几点
1.调试复杂,当然大牛不在乎这个问题,但是实际项目组的人不可能人人是大牛
2.调度并没有多大优势,线程每秒调度在几十w吧,进程也能达到十几w
3.最怕就是某个开发一下子在自己的进程里面,开了几千个线程,外面看不出来,把机器坑死了
4.线程不如进程可靠,一个线程除了bug,整个进程都挂了,进程相对独立,容灾好一些
PS:对于大牛来说这都不是事回复
巨富豪门, 积分 22811, 距离下一级还需 17189 积分
论坛徽章:15
weishuo1999 发表于
不喜欢线程的原因有以下几点
1.调试复杂,当然大牛不在乎这个问题,但是实际项目组的人不可能人人是大牛
很有道理,有同感,不过这个神器没用过,有机会研究看看。
稍有积蓄, 积分 240, 距离下一级还需 260 积分
论坛徽章:0
我怎么听说libevent更难 比多线程还难
家境小康, 积分 1877, 距离下一级还需 123 积分
论坛徽章:1
&呵呵....&
1、你能测试出线程的调度时间???
2、测量到调度时间的单位为:微秒 ??? 如果你是用gettimeofday,time等函数测试的话,那么不可能精确到微妙的。 即使是rdtsc指令,也有误差。
& & google :CLOCK_TICK_RATE,应该是这个,不太记得了。
3、3-7微妙,就对高并发量影响这么大了? 那你还能写代码吗? 随便的一个系统调用都是这个时间的几十、几百倍以上。

我要回帖

更多关于 libevent linux库编译 的文章

 

随机推荐