Linux下C++轻量级Web服务器配套详解。
原文链接:
https://mp.weixin.qq.com/mp/appmsgalbum?__biz=MzAxNzU2MzcwMw==&action=getalbum&album_id=1339230165934882817&scene=173&from_msgid=2649274431&from_itemidx=1&count=3&nolastread=1#wechat_redirect
WebServer学习笔记
00 改进
- 1.配置文件的解析改用json文件,相比于原来更加简单方便。
- 2.write文件的操作由mmap改用sendfile实现真正的零拷贝。
- 3.定时器容器设计为最小堆。
- 4.如果数据库太大怎么办(使用布隆过滤器,使用多级哈希)
服务器编程基本框架
虽然服务器程序种类繁多,但其基本框架都一样,不同之处在于逻辑处理。
模块 | 功能 |
---|---|
I/O 处理单元 | 处理客户连接,读写网络数据 |
逻辑单元 | 业务进程或线程(一般一个线程去处理一个客户的请求) |
网络存储单元 | 数据库、文件或缓存 |
请求队列 | 各单元之间的通信方式 |
I/O 处理单元是服务器管理客户连接的模块。它通常要完成以下工作:等待并接受新的客户连接,接收客户数据,将服务器响应数据返回给客户端。但是数据的收发不一定在 I/O 处理单元中执行,也可能在逻辑单元中执行,具体在何处执行取决于事件处理模式。
一个逻辑单元通常是一个进程或线程。它分析并处理客户数据,然后将结果传递给 I/O 处理单元或者直接发送给客户端(具体使用哪种方式取决于事件处理模式)。服务器通常拥有多个逻辑单元,以实现对多个客户任务的并发处理。
网络存储单元可以是数据库、缓存和文件,但不是必须的。
请求队列是各单元之间的通信方式的抽象。I/O 处理单元接收到客户请求时,需要以某种方式通知一个逻辑单元来处理该请求。同样,多个逻辑单元同时访问一个存储单元时,也需要采用某种机制来协调处理竞态条件。请求队列通常被实现为池的一部分。
01 线程同步机制封装类
1.1 RAII
- RAII全称是“Resource Acquisition is Initialization”,直译过来是“资源获取即初始化”.
- 在构造函数中申请分配资源,在析构函数中释放资源。因为C++的语言机制保证了,当一个对象创建的时候,自动调用构造函数,当对象超出作用域的时候会自动调用析构函数。所以,在RAII的指导下,我们应该使用类来管理资源,将资源和对象的生命周期绑定
- RAII的核心思想是将资源或者状态与对象的生命周期绑定,通过C++的语言机制,实现资源和状态的安全管理,智能指针是RAII最好的例子
1.2 信号量
信号量是一种特殊的变量,它只能取自然数值并且只支持两种操作:等待(P)和信号(V).假设有信号量SV,对其的P、V操作如下:
信号量的取值可以是任何自然数,最常用的,最简单的信号量是二进制信号量,只有0和1两个值.
sem_init
函数用于初始化一个未命名的信号量sem_destory
函数用于销毁信号量sem_wait
函数将以原子操作方式将信号量减一,信号量为0时,sem_wait
阻塞sem_post
函数以原子操作方式将信号量加一,信号量大于0时,唤醒调用sem_post
的线程
以上,成功返回0,失败返回errno
1.3 条件变量
条件变量提供了一种线程间的通知机制,当某个共享数据达到某个值时,唤醒等待这个共享数据的线程.
pthread_cond_init
函数用于初始化条件变量pthread_cond_destory
函数销毁条件变量pthread_cond_broadcast
函数以广播的方式唤醒所有等待目标条件变量的线程pthread_cond_wait
函数用于等待目标条件变量.该函数调用时需要传入 mutex参数(加锁的互斥锁) ,函数执行时,先把调用线程放入条件变量的请求队列,然后将互斥锁mutex解锁,当函数成功返回为0时,互斥锁会再次被锁上. 也就是说函数内部会有一次解锁和加锁操作.
1.4 锁机制
- 实现多线程同步,通过锁机制,确保任一时刻只能有一个线程能进入关键代码段.
1.5 封装
- 类中主要是Linux下三种锁进行封装,将锁的创建于销毁函数封装在类的构造与析构函数中,实现RAII机制
1.6 locker.h代码
#ifndef LOCKER_H
#define LOCKER_H
#include <exception> //异常
#include <pthread.h> //线程,锁
#include <semaphore.h> //信号量
//信号量类
class sem{
public:
sem(){ //默认构造函数
if (sem_init(&m_sem, 0, 0) != 0){
throw std::exception();
}
}
sem(int num){ //含参构造函数
if (sem_init(&m_sem, 0, num) != 0){
throw std::exception();
}
}
~sem(){ //析构函数
sem_destroy(&m_sem);
}
bool wait(){ //从信号量的值减去一个“1”,但它永远会先等待该信号量为一个非零值(大于0)才开始做减法。
return sem_wait(&m_sem) == 0;
}
bool post(){ //功能:它的作用来增加信号量的值。给信号量加1
return sem_post(&m_sem) == 0;
}
private:
sem_t m_sem;
};
//互斥锁类
class locker{
public:
locker(){ //默认构造函数,初始化互斥锁
if (pthread_mutex_init(&m_mutex, NULL) != 0){
throw std::exception();
}
}
~locker(){ //默认析构函数,释放互斥锁
pthread_mutex_destroy(&m_mutex);
}
bool lock(){//加锁
return pthread_mutex_lock(&m_mutex) == 0;
}
bool unlock(){//解锁
return pthread_mutex_unlock(&m_mutex) == 0;
}
pthread_mutex_t *get(){//获取指向该锁地址的指针
return &m_mutex;
}
private:
pthread_mutex_t m_mutex;
};
//条件变量类. 条件变量不是锁,可以由某个条件阻塞/解除阻塞线程
class cond{
public:
cond(){
if (pthread_cond_init(&m_cond, NULL) != 0){
//pthread_mutex_destroy(&m_mutex);
throw std::exception();
}
}
~cond(){
pthread_cond_destroy(&m_cond);
}
bool wait(pthread_mutex_t *m_mutex){ //等待,调用了该函数,线程会阻塞.调用线程必须锁定mutex,否则会产生未定义行为。
int ret = 0;
//pthread_mutex_lock(&m_mutex);
ret = pthread_cond_wait(&m_cond, m_mutex);//当这个函数调用阻塞的时候,会对互斥锁进行解锁,当不阻塞的,继续向下执行,会重新加锁。
//pthread_mutex_unlock(&m_mutex);
return ret == 0;
}
//等待多长时间,调用了该函数,线程会阻塞.调用线程必须锁定mutex,否则会产生未定义行为。
bool timewait(pthread_mutex_t *m_mutex, struct timespec t){
int ret = 0;
//pthread_mutex_lock(&m_mutex);
ret = pthread_cond_timedwait(&m_cond, m_mutex, &t);
//pthread_mutex_unlock(&m_mutex);
return ret == 0;
}
bool signal(){//唤醒一个或者多个等待的线程
return pthread_cond_signal(&m_cond) == 0;
}
bool broadcast() {//唤醒所有的等待的线程
return pthread_cond_broadcast(&m_cond) == 0;
}
private:
//static pthread_mutex_t m_mutex;
pthread_cond_t m_cond;
};
#endif
02 半同步/半反应堆线程池
2.1 五种I/O模型
- 阻塞IO:调用者调用了某个函数,等待这个函数返回,期间什么也不做,不停的去检查这个函数有没有返回,必须等这个函数返回才能进行下一步动作
- 非阻塞IO:非阻塞等待,每隔一段时间就去检测IO事件是否就绪。没有就绪就可以做其他事。非阻塞I/O执行系统调用总是立即返回,不管时间是否已经发生,若时间没有发生,则返回-1,此时可以根据errno区分这两种情况,对于accept,recv和send,事件未发生时,errno通常被设置成eagain
- 信号驱动IO:linux用套接口进行信号驱动IO,安装一个信号处理函数,进程继续运行并不阻塞,当IO时间就绪,进程收到SIGIO信号。然后处理IO事件。
- IO复用:linux用select/poll函数实现IO复用模型,这两个函数也会使进程阻塞,但是和阻塞IO所不同的是这两个函数可以同时阻塞多个IO操作。而且可以同时对多个读操作、写操作的IO函数进行检测。知道有数据可读或可写时,才真正调用IO操作函数
- 异步IO:linux中,可以调用aio_read函数告诉内核描述字缓冲区指针和缓冲区的大小、文件偏移及通知的方式,然后立即返回,当内核将数据拷贝到缓冲区后,再通知应用程序。
注意:阻塞I/O,非阻塞I/O,信号驱动I/O和I/O复用都是同步I/O。同步I/O指内核向应用程序通知的是就绪事件,比如只通知有客户端连接,要求用户代码自行执行I/O操作,异步I/O是指内核向应用程序通知的是完成事件,比如读取客户端的数据后才通知应用程序,由内核完成I/O操作。
2.2 事件处理模式
reactor模式中,主线程(I/O处理单元)只负责监听文件描述符上是否有事件发生,有的话立即通知工作线程(逻辑单元 ),读写数据、接受新连接及处理客户请求均在工作线程中完成。通常由同步I/O实现。
proactor模式中,主线程和内核负责处理读写数据、接受新连接等I/O操作,工作线程仅负责业务逻辑,如处理客户请求。通常由异步I/O实现。
同步I/O模拟proactor模式
由于异步I/O并不成熟,实际中使用较少,这里将使用同步I/O模拟实现proactor模式。
同步I/O模型的工作流程如下(epoll_wait为例):
- 主线程往epoll内核事件表注册socket上的读就绪事件。
- 主线程调用epoll_wait等待socket上有数据可读
- 当socket上有数据可读,epoll_wait通知主线程,主线程从socket循环读取数据,直到没有更多数据可读,然后将读取到的数据封装成一个请求对象并插入请求队列。
- 睡眠在请求队列上某个工作线程被唤醒,它获得请求对象并处理客户请求,然后往epoll内核事件表中注册该socket上的写就绪事件
- 主线程调用epoll_wait等待socket可写。
- 当socket上有数据可写,epoll_wait通知主线程。主线程往socket上写入服务器处理客户请求的结果。
2.3 并发编程模式
并发编程方法的实现有多线程和多进程两种,但这里涉及的并发模式指I/O处理单元与逻辑单元的协同完成任务的方法。
- 半同步/半异步模式
- 领导者/追随者模式
2.4 半同步/半反应堆
半同步/半反应堆并发模式是半同步/半异步的变体,将半异步具体化为某种事件处理模式.
并发模式中的同步和异步
- 同步指的是程序完全按照代码序列的顺序执行
- 异步指的是程序的执行需要由系统事件驱动
半同步/半异步模式工作流程
- 同步线程用于处理客户逻辑
- 异步线程用于处理I/O事件
- 异步线程监听到客户请求后,就将其封装成请求对象并插入请求队列中
- 请求队列将通知某个工作在同步模式的工作线程来读取并处理该请求对象
半同步/半反应堆工作流程(以Proactor模式为例)
- 主线程充当异步线程,负责监听所有socket上的事件
- 若有新请求到来,主线程接收之以得到新的连接socket,然后往epoll内核事件表中注册该socket上的读写事件
- 如果连接socket上有读写事件发生,主线程从socket上接收数据,并将数据封装成请求对象插入到请求队列中
- 所有工作线程睡眠在请求队列上,当有任务到来时,通过竞争(如互斥锁)获得任务的接管权
2.4 线程池
- 空间换时间,浪费服务器的硬件资源,换取运行效率.
- 池是一组资源的集合,这组资源在服务器启动之初就被完全创建好并初始化,这称为静态资源.
- 当服务器进入正式运行阶段,开始处理客户请求的时候,如果它需要相关的资源,可以直接从池中获取,无需动态分配.
- 当服务器处理完一个客户连接后,可以把相关的资源放回池中,无需执行系统调用释放资源.
2.5 基础知识
2.5.1 静态成员变量
将类成员变量声明为static,则为静态成员变量,与一般的成员变量不同,无论建立多少对象,都只有一个静态成员变量的拷贝,静态成员变量属于一个类,所有对象共享。
静态变量在编译阶段就分配了空间,对象还没创建时就已经分配了空间,放到全局静态区。
静态成员变量
- 最好是类内声明,类外初始化(以免类名访问静态成员访问不到)。
- 无论公有,私有,静态成员都可以在类外定义,但私有成员仍有访问权限。
- 非静态成员类外不能初始化。
- 静态成员数据是共享的。
- 最好是类内声明,类外初始化(以免类名访问静态成员访问不到)。
2.5.2 静态成员函数
将类成员函数声明为static,则为静态成员函数。
静态成员函数
- 静态成员函数可以直接访问静态成员变量,不能直接访问普通成员变量,但可以通过参数传递的方式访问。
- 普通成员函数可以访问普通成员变量,也可以访问静态成员变量。
- 静态成员函数没有this指针。非静态数据成员为对象单独维护,但静态成员函数为共享函数,无法区分是哪个对象,因此不能直接访问普通变量成员,也没有this指针。
- 静态成员函数可以直接访问静态成员变量,不能直接访问普通成员变量,但可以通过参数传递的方式访问。
2.5.3 pthread_create陷阱
首先看一下该函数的函数原型。
1#include <pthread.h>
2int pthread_create (pthread_t *thread_tid, //返回新生成的线程的id
3 const pthread_attr_t *attr, //指向线程属性的指针,通常设置为NULL
4 void * (*start_routine) (void *), //处理线程函数的地址
5 void *arg); //start_routine()中的参数
函数原型中的第三个参数,为函数指针,指向处理线程函数的地址。该函数,要求为静态函数。如果处理线程函数为类成员函数时,需要将其设置为静态成员函数。
2.5.4 this指针的锅
pthread_create的函数原型中第三个参数的类型为函数指针,指向的线程处理函数参数类型为(void *)
,若线程函数为类成员函数,则this指针会作为默认的参数被传进函数中,从而和线程函数参数(void*)
不能匹配,不能通过编译。
静态成员函数就没有这个问题,里面没有this指针。
2.6 线程池分析
线程池的设计模式为半同步/半反应堆,其中反应堆具体为Proactor事件处理模式。
具体的,主线程为异步线程,负责监听文件描述符,接收socket新连接,若当前监听的socket发生了读写事件,然后将任务插入到请求队列。工作线程从请求队列中取出任务,完成读写数据的处理。
2.6.1 线程池类定义
具体定义可以看代码。需要注意,线程处理函数和运行函数设置为私有属性。
2.6.2 线程池创建与回收
构造函数中创建线程池,pthread_create函数中将类的对象作为参数传递给静态函数(worker),在静态函数中引用这个对象,并调用其动态方法(run)。
具体的,类对象传递时用this指针,传递给静态函数后,将其转换为线程池类,并调用私有成员函数run。
2.6.3 向请求队列中添加任务
通过list容器创建请求队列,向队列中添加时,通过互斥锁保证线程安全,添加完成后通过信号量提醒有任务要处理,最后注意线程同步。
2.6.4 线程处理函数
内部访问私有成员函数run,完成线程处理要求。
2.6.5run执行任务
主要实现,工作线程从请求队列中取出某个任务进行处理,注意线程同步。
template<typename T>
void threadpool<T>::run(){
while(!m_stop){
//信号量等待
m_queuestat.wait();
//被唤醒后先加互斥锁
m_queuelocker.lock();
if(m_workqueue.empty()){
m_queuelocker.unlock();
continue;
}
//从请求队列中取出第一个任务
//将任务从请求队列删除
T* request=m_workqueue.front();
m_workqueue.pop_front();
m_queuelocker.unlock();
if(!request)
continue;
//从连接池中取出一个数据库连接
request->mysql = m_connPool->GetConnection();
//process(模板类中的方法,这里是http类)进行处理
request->process();
//将数据库连接放回连接池
m_connPool->ReleaseConnection(request->mysql);
}
}
2.6.6 线程池概述
使用一个链表实现的工作队列完全解除了主线程和工作线程的耦合关系:主线程往工作队列中插入任务,工作线程通过竞争来取得任务并执行它。
- 同步I/O模拟proactor模式
- 半同步/半反应堆
该项目使用线程池(半同步半反应堆模式)并发处理用户请求,主线程负责读写,工作线程(线程池中的线程)负责处理逻辑(HTTP请求报文的解析等等)。通过之前的代码,我们将listenfd
上到达的connection
通过 accept()
接收,并返回一个新的socket文件描述符connfd
用于和用户通信,并对用户请求返回响应,同时将这个connfd
注册到内核事件表中,等用户发来请求报文。这个过程是:
通过epoll_wait
发现这个connfd
上有可读事件了(EPOLLIN
),主线程就将这个HTTP的请求报文读进这个连接socket的读缓存中users[sockfd].read()
,然后将该任务对象(指针)插入线程池的请求队列中pool->append(users + sockfd);
,线程池的实现还需要依靠锁机制以及信号量机制来实现线程同步,保证操作的原子性。
在线程池部分做几点解释,然后大家去看代码的时候就更容易看懂了:
- 所谓线程池,就是一个
pthread_t
类型的普通数组,通过pthread_create()
函数创建m_thread_number
个线程,用来执行worker()
函数以执行每个请求处理函数(HTTP请求的process
函数),通过pthread_detach()
将线程设置成脱离态(detached)后,当这一线程运行结束时,它的资源会被系统自动回收,而不再需要在其它线程中对其进行pthread_join()
操作。 - 操作工作队列一定要加锁(
locker
),因为它被所有线程共享。 - 我们用信号量来标识请求队列中的请求数,通过
m_queuestat.wait();
来等待一个请求队列中待处理的HTTP请求,然后交给线程池中的空闲线程来处理。
为什么要使用线程池?
当你需要限制你应用程序中同时运行的线程数时,线程池非常有用。因为启动一个新线程会带来性能开销,每个线程也会为其堆栈分配一些内存等。为了任务的并发执行,我们可以将这些任务任务传递到线程池,而不是为每个任务动态开启一个新的线程。
⭐线程池中的线程数量是依据什么确定的?
线程池中的线程数量最直接的限制因素是中央处理器(CPU)的处理器(processors/cores)的数量N
:如果你的CPU是4-cores的,对于CPU密集型的任务(如视频剪辑等消耗CPU计算资源的任务)来说,那线程池中的线程数量最好也设置为4(或者+1防止其他因素造成的线程阻塞);对于IO密集型的任务,一般要多于CPU的核数,因为线程间竞争的不是CPU的计算资源而是IO,IO的处理一般较慢,多于cores数的线程将为CPU争取更多的任务,不至在线程处理IO的过程造成CPU空闲导致资源浪费,公式:最佳线程数 = CPU当前可使用的Cores数 * 当前CPU的利用率 * (1 + CPU等待时间 / CPU处理时间)
(还有回答里面提到的Amdahl准则可以了解一下)
2.7 threadpool.h代码
#ifndef THREADPOOL_H
#define THREADPOOL_H
#include <list>
#include <cstdio>
#include <exception>
#include <pthread.h>
#include "../lock/locker.h"
#include "../CGImysql/sql_connection_pool.h"
// 线程池类,将它定义为模板类是为了代码复用,模板参数T是任务类
template <typename T>
class threadpool{
public:
/*thread_number是线程池中线程的数量,max_requests是请求队列中最多允许的、等待处理的请求的数量*/
threadpool(int actor_model, connection_pool *connPool, int thread_number = 8, int max_request = 10000);
~threadpool();
bool append(T *request, int state);//往工作队列添加任务
bool append_p(T *request);
private:
/*工作线程运行的函数,它不断从工作队列中取出任务并执行之*/
static void *worker(void *arg); //worker——线程执行的函数(必须是静态的函数),主要就是启动run()函数用的
void run();
private:
int m_thread_number; //线程池中的线程数量
int m_max_requests; //请求队列中允许的最大请求数
pthread_t *m_threads; //描述线程池的数组,其大小为m_thread_number
std::list<T *> m_workqueue; //请求队列
locker m_queuelocker; //保护请求队列的互斥锁
sem m_queuestat; //是否有任务需要处理
connection_pool *m_connPool;//数据库
int m_actor_model; //线程工作模式
};
//构造函数初始化。actor_model工作模式,connPool数据库连接池,thread_number线程数量,max_requests请求队列中允许的最大请求数
template <typename T>
threadpool<T>::threadpool( int actor_model, connection_pool *connPool, int thread_number, int max_requests) :
m_actor_model(actor_model),m_thread_number(thread_number),
m_max_requests(max_requests), m_threads(NULL),m_connPool(connPool){
if (thread_number <= 0 || max_requests <= 0)
throw std::exception();
m_threads = new pthread_t[m_thread_number]; //new创建的线程池的数组,到时候析构的时候要记得释放资源
if (!m_threads)
throw std::exception();
// 创建thread_number 个线程,并将他们设置为脱离线程。m_threads+i指针表示线程号
for (int i = 0; i < thread_number; ++i){
if (pthread_create(m_threads + i, NULL, worker, this) != 0){ //worker线程执行的函数必须是静态的函数,this是传给work的参数
delete[] m_threads;
throw std::exception();
}
if (pthread_detach(m_threads[i])){
delete[] m_threads;
throw std::exception();
}
}
}
//析构函数释放资源
template <typename T>
threadpool<T>::~threadpool(){
delete[] m_threads;
}
//往队列中添加任务,request需要添加的任务,state该任务的状态
template <typename T>
bool threadpool<T>::append(T *request, int state){
m_queuelocker.lock();
if (m_workqueue.size() >= m_max_requests){
m_queuelocker.unlock();
return false;
}
request->m_state = state;
m_workqueue.push_back(request);
m_queuelocker.unlock();
m_queuestat.post(); //信号量加1
return true;
}
template <typename T>
bool threadpool<T>::append_p(T *request){
m_queuelocker.lock();
if (m_workqueue.size() >= m_max_requests){
m_queuelocker.unlock();
return false;
}
m_workqueue.push_back(request);
m_queuelocker.unlock();
m_queuestat.post(); //信号量加1
return true;
}
template <typename T>
void *threadpool<T>::worker(void *arg){
threadpool *pool = (threadpool *)arg; //拿到这个threadpool类,可以操作里面的所有成员
pool->run(); //去执行任务
return pool;
}
template <typename T>
void threadpool<T>::run(){
while (true){
m_queuestat.wait(); //如果这个值大于0,说明有任务可执行,取出一个任务,信号量减1。如果值为0就会阻塞等待。
m_queuelocker.lock(); //被唤醒后先加互斥锁
if (m_workqueue.empty()){ //没有数据解锁 continue
m_queuelocker.unlock();
continue;
}
T *request = m_workqueue.front();//获取任务
m_workqueue.pop_front(); //删除队列中这个以获取的任务
m_queuelocker.unlock();
if (!request) //如果这个取出的任务为空,就continue,request就是http_conn这个类
continue;
if (1 == m_actor_model){
if (0 == request->m_state){ //m_state=0表示需要读入数据
if (request->read_once()){
request->improv = 1;
connectionRAII mysqlcon(&request->mysql, m_connPool); //从连接池中取出一个数据库连接到if{}结束就会放回去和智能指针管理类似
request->process(); //由线程池中的工作线程调用,这是处理HTTP请求的入口函数
}
else{
request->improv = 1;
request->timer_flag = 1;
}
}
else{ //m_state=1表示需要写入数据
if (request->write()){
request->improv = 1;
}
else{
request->improv = 1;
request->timer_flag = 1;
}
}
}
else{
connectionRAII mysqlcon(&request->mysql, m_connPool);//从连接池中取出一个数据库连接,将数据库连接放回连接池
request->process();
}
}
}
#endif
04 http连接处理(上)
4.1 epoll
epoll涉及的知识较多,这里仅对API和基础知识作介绍。更多资料请查阅资料,或查阅游双的Linux高性能服务器编程 第9章 I/O复用
epoll_create函数
int epoll_create(int size);
- 参数:
size : 目前没有意义了。为了兼容之前的版本。现在随便写一个数,必须大于0
- 返回值:
-1 : 失败
> 0 : 文件描述符,操作epoll实例的
epoll_ctl函数
// 对epoll实例进行管理:添加文件描述符信息,删除信息,修改信息
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
- 参数:
- epfd : epoll实例对应的文件描述符(就是这个epoll_create()的返回值)
- op : 要进行什么操作 (就是对底层的红黑树进行修改)
EPOLL_CTL_ADD: 添加
EPOLL_CTL_MOD: 修改
EPOLL_CTL_DEL: 删除
- fd : 要检测的文件描述符
- event : 告诉内核检测文件描述符什么事情
struct epoll_event {
uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */ //用户的数据信息
};
events描述事件类型,其中epoll事件类型有以下几种
EPOLLIN:表示对应的文件描述符可以读(包括对端SOCKET正常关闭)
EPOLLOUT:表示对应的文件描述符可以写
EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来)
EPOLLERR:表示对应的文件描述符发生错误
EPOLLHUP:表示对应的文件描述符被挂断;
EPOLLET:将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)而言的
EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里
epoll_wait函数 该函数用于等待所监控文件描述符上有事件的产生,返回就绪的文件描述符个数
// 检测函数
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
- 参数:
- epfd : epoll实例对应的文件描述符
- events : 传出参数,保存了发送了变化的文件描述符的信息(返回的那个双向链表,一般用数组保存)
- maxevents : 第二个参数结构体数组的大小
- timeout : 阻塞时间
- 0 : 不阻塞
- -1 : 阻塞,直到检测到fd数据发生变化,解除阻塞
- >0 : 阻塞的时长(毫秒)
- 返回值:
- 成功,返回发送变化的文件描述符的个数 > 0
- 失败 -1
4.1.1 select/poll/epoll
调用函数
- select和poll都是一个函数,epoll是一组函数
文件描述符数量
- select通过线性表描述文件描述符集合,文件描述符有上限,一般是1024,但可以修改源码,重新编译内核,不推荐
- poll是链表描述,突破了文件描述符上限,最大可以打开文件的数目
- epoll通过红黑树描述,最大可以打开文件的数目,可以通过命令ulimit -n number修改,仅对当前终端有效
- select通过线性表描述文件描述符集合,文件描述符有上限,一般是1024,但可以修改源码,重新编译内核,不推荐
将文件描述符从用户传给内核
- select和poll通过将所有文件描述符拷贝到内核态,每次调用都需要拷贝
- epoll通过epoll_create建立一棵红黑树,通过epoll_ctl将要监听的文件描述符注册到红黑树上
- select和poll通过将所有文件描述符拷贝到内核态,每次调用都需要拷贝
内核判断就绪的文件描述符
- select和poll通过遍历文件描述符集合,判断哪个文件描述符上有事件发生
- epoll_create时,内核除了帮我们在epoll文件系统里建了个红黑树用于存储以后epoll_ctl传来的fd外,还会再建立一个list链表,用于存储准备就绪的事件,当epoll_wait调用时,仅仅观察这个list链表里有没有数据即可。
- epoll是根据每个fd上面的回调函数(中断函数)判断,只有发生了事件的socket才会主动的去调用 callback函数,其他空闲状态socket则不会,若是就绪事件,插入list
- select和poll通过遍历文件描述符集合,判断哪个文件描述符上有事件发生
应用程序索引就绪文件描述符
- select/poll只返回发生了事件的文件描述符的个数,若知道是哪个发生了事件,同样需要遍历
- epoll返回的发生了事件的个数和结构体数组,结构体包含socket的信息,因此直接处理返回的数组即可
- select/poll只返回发生了事件的文件描述符的个数,若知道是哪个发生了事件,同样需要遍历
工作模式
- select和poll都只能工作在相对低效的LT模式下
- epoll则可以工作在ET高效模式,并且epoll还支持EPOLLONESHOT事件,该事件能进一步减少可读、可写和异常事件被触发的次数。
- select和poll都只能工作在相对低效的LT模式下
应用场景
- 当所有的fd都是活跃连接,使用epoll,需要建立文件系统,红黑书和链表对于此来说,效率反而不高,不如selece和poll
- 当监测的fd数目较小,且各个fd都比较活跃,建议使用select或者poll
- 当监测的fd数目非常大,成千上万,且单位时间只有其中的一部分fd处于就绪状态,这个时候使用epoll能够明显提升性能
- 当所有的fd都是活跃连接,使用epoll,需要建立文件系统,红黑书和链表对于此来说,效率反而不高,不如selece和poll
4.1.2 ET、LT、EPOLLONESHOT
LT水平触发模式
- epoll_wait检测到文件描述符有事件发生,则将其通知给应用程序,应用程序可以不立即处理该事件。
- 当下一次调用epoll_wait时,epoll_wait还会再次向应用程序报告此事件,直至被处理
- epoll_wait检测到文件描述符有事件发生,则将其通知给应用程序,应用程序可以不立即处理该事件。
ET边缘触发模式
- epoll_wait检测到文件描述符有事件发生,则将其通知给应用程序,应用程序必须立即处理该事件
- 必须要一次性将数据读取完,使用非阻塞I/O,读取到出现eagain
- epoll_wait检测到文件描述符有事件发生,则将其通知给应用程序,应用程序必须立即处理该事件
EPOLLONESHOT
- 一个线程读取某个socket上的数据后开始处理数据,在处理过程中该socket上又有新数据可读,此时另一个线程被唤醒读取,此时出现两个线程处理同一个socket
- 我们期望的是一个socket连接在任一时刻都只被一个线程处理,通过epoll_ctl对该文件描述符注册epolloneshot事件,一个线程处理socket时,其他线程将无法处理,当该线程处理完后,需要通过epoll_ctl重置epolloneshot事件
- 一个线程读取某个socket上的数据后开始处理数据,在处理过程中该socket上又有新数据可读,此时另一个线程被唤醒读取,此时出现两个线程处理同一个socket
4.2 HTTP报文格式
HTTP报文分为请求报文和响应报文两种,每种报文必须按照特有格式生成,才能被浏览器端识别。
其中,浏览器端向服务器发送的为请求报文,服务器处理后返回给浏览器端的为响应报文。
4.2.1 请求报文
HTTP请求报文由请求行(request line)、请求头部(header)、空行和请求数据四个部分组成。
其中,请求分为两种,GET和POST,具体的:
- GET
GET /562f25980001b1b106000338.jpg HTTP/1.1
Host:img.mukewang.com
User-Agent:Mozilla/5.0 (Windows NT10.0; WOw64)
ApplewebKit/537.36(KHTML,like Gecko) Chrome/51.0.2704.106 Safari/537.36
Accept :image/webp,image/*,*/*;q=0.8
Referer:http:// www .imooc.com/
Accept-Encoding:gzip, deflate,sdchAccept-Language: zh-CN, zh;q=0.8
空行
请求数据为空
- POST
POST / HTTP1.1
Host:www.wrox.com
User-Agent:Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; .NET CLR 2.0.50727; .NET CLR 3.0.04506.648; .NET CLR 3.5.21022)
Content-Type:application/x-www-form-urlencoded
Content-Length:40
Connection: Keep-Alive
空行
name=Professional%20Ajax&publisher=Wiley
请求行,用来说明请求类型,要访问的资源以及所使用的HTTP版本。
GET说明请求类型为GET,/562f25980001b1b106000338.jpg(URL)为要访问的资源,该行的最后一部分说明使用的是HTTP1.1版本。请求头部,紧接着请求行(即第一行)之后的部分,用来说明服务器要使用的附加信息。
- HOST,给出请求资源所在服务器的域名。
- User-Agent,HTTP客户端程序的信息,该信息由你发出请求使用的浏览器来定义,并且在每个请求中自动发送等。
- Accept,说明用户代理可处理的媒体类型。
- Accept-Encoding,说明用户代理支持的内容编码。
- Accept-Language,说明用户代理能够处理的自然语言集。
- Content-Type,说明实现主体的媒体类型。
- Content-Length,说明实现主体的大小。
- Connection,连接管理,可以是Keep-Alive或close。
空行,请求头部后面的空行是必须的即使第四部分的请求数据为空,也必须有空行。
请求数据也叫主体,可以添加任意的其他数据。
4.2.2 响应报文
HTTP响应也由四个部分组成,分别是:状态行、消息报头、空行和响应正文。
HTTP/1.1 200 OK
Date: Fri, 22 May 2009 06:07:21 GMT
Content-Type: text/html; charset=UTF-8
空行
<html>
<head></head>
<body>
<!--body goes here-->
</body>
</html>
- 状态行,由HTTP协议版本号, 状态码, 状态消息 三部分组成。
第一行为状态行,(HTTP/1.1)表明HTTP版本为1.1版本,状态码为200,状态消息为OK。- 消息报头,用来说明客户端要使用的一些附加信息。
第二行和第三行为消息报头,Date:生成响应的日期和时间;Content-Type:指定了MIME类型的HTML(text/html),编码类型是UTF-8。- 空行,消息报头后面的空行是必须的。
- 响应正文,服务器返回给客户端的文本信息。空行后面的html部分为响应正文。
4.3 HTTP状态码
HTTP有5种类型的状态码,具体的:
1xx:指示信息–表示请求已接收,继续处理。
2xx:成功–表示请求正常处理完毕。
- 200 OK:客户端请求被正常处理。
- 206 Partial content:客户端进行了范围请求。
- 200 OK:客户端请求被正常处理。
3xx:重定向–要完成请求必须进行更进一步的操作。
- 301 Moved Permanently:永久重定向,该资源已被永久移动到新位置,将来任何对该资源的访问都要使用本响应返回的若干个URI之一。
- 302 Found:临时重定向,请求的资源现在临时从不同的URI中获得。
- 301 Moved Permanently:永久重定向,该资源已被永久移动到新位置,将来任何对该资源的访问都要使用本响应返回的若干个URI之一。
4xx:客户端错误–请求有语法错误,服务器无法处理请求。
- 400 Bad Request:请求报文存在语法错误。
- 403 Forbidden:请求被服务器拒绝。
- 404 Not Found:请求不存在,服务器上找不到请求的资源。
- 400 Bad Request:请求报文存在语法错误。
5xx:服务器端错误–服务器处理请求出错。
- 500 Internal Server Error:服务器在执行请求时出现错误。
4.4 有限状态机
有限状态机,是一种抽象的理论模型,它能够把有限个变量描述的状态变化过程,以可构造可验证的方式呈现出来。比如,封闭的有向图。
有限状态机可以通过if-else,switch-case和函数指针来实现,从软件工程的角度看,主要是为了封装逻辑。
带有状态转移的有限状态机示例代码。
STATE_MACHINE(){
State cur_State = type_A;
while(cur_State != type_C){
Package _pack = getNewPackage();
switch(){
case type_A:
process_pkg_state_A(_pack);
cur_State = type_B;
break;
case type_B:
process_pkg_state_B(_pack);
cur_State = type_C;
break;
}
}
}
该状态机包含三种状态:type_A,type_B和type_C。其中,type_A是初始状态,type_C是结束状态。
状态机的当前状态记录在cur_State变量中,逻辑处理时,状态机先通过getNewPackage获取数据包,然后根据当前状态对数据进行处理,处理完后,状态机通过改变cur_State完成状态转移。
有限状态机一种逻辑单元内部的一种高效编程方法,在服务器编程中,服务器可以根据不同状态或者消息类型进行相应的处理逻辑,使得程序逻辑清晰易懂。
4.5 http处理流程
首先对http报文处理的流程进行简要介绍,然后具体介绍http类的定义和服务器接收http请求的具体过程。
4.5.1 http报文处理流程
- 浏览器端发出http连接请求,主线程创建http对象接收请求并将所有数据读入对应buffer,将该对象插入任务队列,工作线程从任务队列中取出一个任务进行处理。(本篇讲)
- 工作线程取出任务后,调用process_read函数,通过主、从状态机对请求报文进行解析。(中篇讲)
- 解析完之后,跳转do_request函数生成响应报文,通过process_write写入buffer,返回给浏览器端。(下篇讲)
4.5.2 http类
这一部分代码在TinyWebServer/http/http_conn.h中,主要是http类的定义。
class http_conn{
public:
static const int FILENAME_LEN = 200; //设置读取文件的名称m_real_file大小
static const int READ_BUFFER_SIZE = 2048; //设置读缓冲区m_read_buf大小
static const int WRITE_BUFFER_SIZE = 1024; //设置写缓冲区m_write_buf大小
enum METHOD{GET = 0,POST,HEAD,PUT,DELETE,TRACE,OPTIONS,CONNECT,PATH};//报文的请求方法,本项目只用到GET和POST
enum CHECK_STATE{ //主状态机的状态
CHECK_STATE_REQUESTLINE = 0, //请求行
CHECK_STATE_HEADER, //请求头
CHECK_STATE_CONTENT //消息体
};
enum HTTP_CODE{ //报文解析的结果
NO_REQUEST,
GET_REQUEST,
BAD_REQUEST,
NO_RESOURCE,
FORBIDDEN_REQUEST,
FILE_REQUEST,
INTERNAL_ERROR,
CLOSED_CONNECTION
};
enum LINE_STATUS { //从状态机的状态
LINE_OK = 0,
LINE_BAD,
LINE_OPEN
};
public:
http_conn() {}
~http_conn() {}
public:
//初始化套接字地址,函数内部会调用私有方法init
void init(int sockfd, const sockaddr_in &addr, char *, int, int, string user, string passwd, string sqlname);
//关闭http连接
void close_conn(bool real_close = true);
//各子线程通过process函数对任务进行处理,调用process_read函数和process_write函数分别完成报文解析与报文响应两个任务。
void process();
//读取浏览器端发来的全部数据
bool read_once();
//响应报文写入函数
bool write();
sockaddr_in *get_address(){ return &m_address; } //返回客户端的地址(inline)
void initmysql_result(connection_pool *connPool); //同步线程初始化数据库读取表
int timer_flag;
int improv;
private:
void init();
HTTP_CODE process_read(); //从m_read_buf读取,并处理请求报文
bool process_write(HTTP_CODE ret); //向m_write_buf写入响应报文数据
HTTP_CODE parse_request_line(char *text); //主状态机解析报文中的请求行数据
HTTP_CODE parse_headers(char *text); //主状态机解析报文中的请求头数据
HTTP_CODE parse_content(char *text); //主状态机解析报文中的请求内容
HTTP_CODE do_request(); //生成响应报文
//m_start_line是已经解析的字符
//get_line用于将指针向后偏移,指向未处理的字符
char *get_line() { return m_read_buf + m_start_line; };
//从状态机读取一行,分析是请求报文的哪一部分
LINE_STATUS parse_line();
void unmap();
//根据响应报文格式,生成对应8个部分,以下函数均由do_request调用
bool add_response(const char *format, ...);
bool add_content(const char *content);
bool add_status_line(int status, const char *title);
bool add_headers(int content_length);
bool add_content_type();
bool add_content_length(int content_length);
bool add_linger();
bool add_blank_line();
public:
static int m_epollfd; //epoll_creat()生成的epollfd
static int m_user_count; //用户数量
MYSQL *mysql;
int m_state; //读为0, 写为1
private:
int m_sockfd;
sockaddr_in m_address;
char m_read_buf[READ_BUFFER_SIZE]; //存储读取的请求报文数据
int m_read_idx; //缓冲区中m_read_buf中数据的最后一个字节的下一个位置
int m_checked_idx; //m_read_buf解析的位置m_checked_idx
int m_start_line; //m_read_buf中已经解析的字符个数
char m_write_buf[WRITE_BUFFER_SIZE]; //存储发出的响应报文数据
int m_write_idx; //指示buffer中实际使用的长度
CHECK_STATE m_check_state; //主状态机的状态
METHOD m_method; //请求方法
//以下为解析请求报文中对应的6个变量
char m_real_file[FILENAME_LEN]; //存储读取文件的名称
char *m_url;
char *m_version;
char *m_host;
int m_content_length;
bool m_linger;
char *m_file_address; //读取服务器上的文件地址
struct stat m_file_stat;
struct iovec m_iv[2]; //io向量机制iovec
int m_iv_count;
int cgi; //是否启用的POST
char *m_string; //存储请求头数据
int bytes_to_send; //剩余发送字节数
int bytes_have_send; //已发送字节数
char *doc_root;
map<string, string> m_users;
int m_TRIGMode; //0代表LT读取数据,1代表ET读取数据
int m_close_log; //是否关闭日志
char sql_user[100]; //连接数据库的参数
char sql_passwd[100];
char sql_name[100];
};
#endif
在http请求接收部分,会涉及到init和read_once函数,但init仅仅是对私有成员变量进行初始化,不用过多讲解。
这里,对read_once进行介绍。read_once读取浏览器端发送来的请求报文,直到无数据可读或对方关闭连接,读取到m_read_buffer中,并更新m_read_idx。
//循环读取客户数据,直到无数据可读或对方关闭连接
//非阻塞ET工作模式下,需要一次性将数据读完
bool http_conn::read_once(){
if (m_read_idx >= READ_BUFFER_SIZE){
return false;
}
int bytes_read = 0;
//LT读取数据,Lt模式下,读一次就好了
if (0 == m_TRIGMode){
bytes_read = recv(m_sockfd, m_read_buf + m_read_idx, READ_BUFFER_SIZE - m_read_idx, 0);
m_read_idx += bytes_read;
if (bytes_read <= 0){
return false;
}
return true;
}
//ET读数据
else{
while (true){
bytes_read = recv(m_sockfd, m_read_buf + m_read_idx, READ_BUFFER_SIZE - m_read_idx, 0);
if (bytes_read == -1){
if (errno == EAGAIN || errno == EWOULDBLOCK) //数据读完了
break;
return false; //信号为EINTR被中断
}
else if (bytes_read == 0){//客户端断开
return false;
}
m_read_idx += bytes_read;
}
return true;
}
}
4.5.3 epoll相关代码
项目中epoll相关代码部分包括非阻塞模式、内核事件表注册事件、删除事件、重置EPOLLONESHOT事件四种。
- 非阻塞模式
//对文件描述符设置非阻塞
int setnonblocking(int fd){
int old_option = fcntl(fd, F_GETFL);
int new_option = old_option | O_NONBLOCK;
fcntl(fd, F_SETFL, new_option);
return old_option;
}
- 内核事件表注册新事件,开启EPOLLONESHOT,针对客户端连接的描述符,listenfd不用开启
//将内核事件表注册读事件,fd通信的文件描述符,选择开启EPOLLONESHOT,(TRIGMode=1 ET模式)/LT模式
void addfd(int epollfd, int fd, bool one_shot, int TRIGMode){
epoll_event event;
event.data.fd = fd;
if (1 == TRIGMode) //边沿触发,接收到数据就触发
event.events = EPOLLIN | EPOLLET | EPOLLRDHUP;
else //水平触发:有数据就触发
event.events = EPOLLIN | EPOLLRDHUP;
if (one_shot) //只触发一次
event.events |= EPOLLONESHOT;
epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
setnonblocking(fd);
}
- 内核事件表删除事件
//从内核事件表删除描述符
void removefd(int epollfd, int fd){
epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, 0);
close(fd);
}
- 重置EPOLLONESHOT事件
//将事件重置为EPOLLONESHOT
void modfd(int epollfd, int fd, int ev, int TRIGMode){
epoll_event event;
event.data.fd = fd;
if (1 == TRIGMode)
event.events = ev | EPOLLET | EPOLLONESHOT | EPOLLRDHUP;
else
event.events = ev | EPOLLONESHOT | EPOLLRDHUP; //EPOLLHUP:表示对应的读事件文件描述符被挂断;
epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &event);
}
4.5.4 服务器接收http请求
浏览器端发出http连接请求,主线程创建http对象接收请求并将所有数据读入对应buffer,将该对象插入任务队列,工作线程从任务队列中取出一个任务进行处理。
//创建MAX_FD个http类对象
http_conn* users=new http_conn[MAX_FD];
//创建内核事件表
epoll_event events[MAX_EVENT_NUMBER]; //一次最大的监听数量:传出参数
epollfd = epoll_create(5);
assert(epollfd != -1);
//将listenfd放在epoll树上
addfd(epollfd, listenfd, false);
//将上述epollfd赋值给http类对象的m_epollfd属性
http_conn::m_epollfd = epollfd;
void WebServer::eventLoop(){
bool timeout = false;
bool stop_server = false;
while (!stop_server){
int number = epoll_wait(m_epollfd, events, MAX_EVENT_NUMBER, -1); //等待所监控文件描述符上有事件的产生
if (number < 0 && errno != EINTR){
LOG_ERROR("%s", "epoll failure");
break;
}
for (int i = 0; i < number; i++){
int sockfd = events[i].data.fd;
//处理新到的客户连接
if (sockfd == m_listenfd){
bool flag = dealclinetdata();
if (false == flag)
continue;
}
//处理异常事件
else if (events[i].events & (EPOLLRDHUP | EPOLLHUP | EPOLLERR)){
//服务器端关闭连接,移除对应的定时器
util_timer *timer = users_timer[sockfd].timer;
deal_timer(timer, sockfd);
}
//处理信号
else if ((sockfd == m_pipefd[0]) && (events[i].events & EPOLLIN)){
bool flag = dealwithsignal(timeout, stop_server);
if (false == flag)
LOG_ERROR("%s", "dealclientdata failure");
}
//处理客户连接上接收到的数据
else if (events[i].events & EPOLLIN){ //读事件
dealwithread(sockfd);
}
else if (events[i].events & EPOLLOUT){ //写事件
dealwithwrite(sockfd);
}
}
if (timeout){
utils.timer_handler();
LOG_INFO("%s", "timer tick");
timeout = false;
}
}
}
05 http连接处理(中)
上篇,我们对http连接的基础知识、服务器接收请求的处理流程进行了介绍,本篇将结合流程图和代码分别对状态机和服务器解析请求报文进行详解。
流程图部分,描述主、从状态机调用关系与状态转移过程。
代码部分,结合代码对http请求报文的解析进行详解。
5.1 流程图与状态机
从状态机负责读取报文的一行,主状态机负责对该行数据进行解析,主状态机内部调用从状态机,从状态机驱动主状态机。
5.1.1 主状态机
三种状态,标识解析位置。
CHECK_STATE_REQUESTLINE
,解析请求行CHECK_STATE_HEADER
,解析请求头CHECK_STATE_CONTENT
,解析消息体,仅用于解析POST请求
5.1.2 从状态机
三种状态,标识解析一行的读取状态。
LINE_OK
,完整读取一行LINE_BAD
,报文语法有误LINE_OPEN
,读取的行不完整
5.2 代码分析-http报文解析
上篇中介绍了服务器接收http请求的流程与细节,简单来讲,浏览器端发出http连接请求,服务器端主线程创建http对象接收请求并将所有数据读入对应buffer,将该对象插入任务队列后,工作线程从任务队列中取出一个任务进行处理。
各子线程通过process函数对任务进行处理,调用process_read函数和process_write函数分别完成报文解析与报文响应两个任务。
//各子线程通过process函数对任务进行处理,调用process_read函数和process_write函数分别完成报文解析与报文响应两个任务。
void http_conn::process(){
HTTP_CODE read_ret = process_read();
if (read_ret == NO_REQUEST){//NO_REQUEST,表示请求不完整,需要继续接收请求数据
modfd(m_epollfd, m_sockfd, EPOLLIN, m_TRIGMode);//注册并监听读事件
return;
}
bool write_ret = process_write(read_ret);//调用process_write完成报文响应
if (!write_ret){
close_conn();
}
modfd(m_epollfd, m_sockfd, EPOLLOUT, m_TRIGMode);//注册并监听写事件
}
本篇将对报文解析的流程和process_read函数细节进行详细介绍。
5.2.1 HTTP_CODE含义
表示HTTP请求的处理结果,在头文件中初始化了八种情形,在报文解析时只涉及到四种。
- NO_REQUEST——请求不完整,需要继续读取请求报文数据
- GET_REQUEST——获得了完整的HTTP请求
- BAD_REQUEST——HTTP请求报文有语法错误
- INTERNAL_ERROR——服务器内部错误,该结果在主状态机逻辑switch的default下,一般不会触发
enum HTTP_CODE{ //报文解析的结果
NO_REQUEST, //请求不完整,需要继续读取请求报文数据
GET_REQUEST, //获得了完整的HTTP请求
BAD_REQUEST, //HTTP请求报文有语法错误
NO_RESOURCE, //没有相应的资源
FORBIDDEN_REQUEST, //禁止访问
FILE_REQUEST, //文件请求
INTERNAL_ERROR, //服务器内部错误,该结果在主状态机逻辑switch的default下,一般不会触发
CLOSED_CONNECTION //请求不完整,需要继续读取请求报文数据
};
5.2.2 解析报文整体流程
process_read通过while循环,将主从状态机进行封装,对报文的每一行进行循环处理。
判断条件
- 主状态机转移到CHECK_STATE_CONTENT,该条件涉及解析消息体
- 从状态机转移到LINE_OK,该条件涉及解析请求行和请求头部
- 两者为或关系,当条件为真则继续循环,否则退出
- 主状态机转移到CHECK_STATE_CONTENT,该条件涉及解析消息体
循环体
- 从状态机读取数据
- 调用get_line函数,通过m_start_line将从状态机读取数据间接赋给text
- 主状态机解析text
- 从状态机读取数据
//m_start_line是已经解析的字符 //m_start_line是行在buffer中的起始位置,将该位置后面的数据赋给text
//此时从状态机已提前将一行的末尾字符\r\n变为\0\0,所以text可以直接取出完整的行进行解析
//get_line用于将指针向后偏移,指向未处理的字符
char *get_line() { return m_read_buf + m_start_line; };
http_conn::HTTP_CODE http_conn::process_read(){
//初始化从状态机状态、HTTP请求解析结果
LINE_STATUS line_status = LINE_OK;
HTTP_CODE ret = NO_REQUEST;
char *text = 0;
//这里为什么要写两个判断条件?第一个判断条件为什么这样写?
//具体的在主状态机逻辑中会讲解。
//parse_line为从状态机的具体实现
while ((m_check_state == CHECK_STATE_CONTENT && line_status == LINE_OK) || ((line_status = parse_line()) == LINE_OK)){
text = get_line();
//m_start_line是每一个数据行在m_read_buf中的起始位置
//m_checked_idx表示从状态机在m_read_buf中读取的位置
m_start_line = m_checked_idx;
LOG_INFO("%s", text);
//主状态机的三种状态转移逻辑
switch (m_check_state){
case CHECK_STATE_REQUESTLINE:
{
ret = parse_request_line(text); //解析请求行
if (ret == BAD_REQUEST)
return BAD_REQUEST;
break;
}
case CHECK_STATE_HEADER:
{
ret = parse_headers(text); //解析请求头
if (ret == BAD_REQUEST)
return BAD_REQUEST;
else if (ret == GET_REQUEST){ //完整解析GET请求后,跳转到报文响应函数
return do_request();
}
break;
}
case CHECK_STATE_CONTENT:
{
ret = parse_content(text); //解析消息体
if (ret == GET_REQUEST) //完整解析POST请求后,跳转到报文响应函数
return do_request();
line_status = LINE_OPEN; //解析完消息体即完成报文解析,避免再次进入循环,更新line_status
break;
}
default:
return INTERNAL_ERROR;
}
}
return NO_REQUEST;
}
5.2.3 从状态机逻辑
上一篇的基础知识讲解中,对于HTTP报文的讲解遗漏了一点细节,在这里作为补充。
在HTTP报文中,每一行的数据由\r\n作为结束字符,空行则是仅仅是字符\r\n。因此,可以通过查找\r\n将报文拆解成单独的行进行解析,项目中便是利用了这一点。
从状态机负责读取buffer中的数据,将每行数据末尾的\r\n置为\0\0,并更新从状态机在buffer中读取的位置m_checked_idx,以此来驱动主状态机解析。
从状态机从m_read_buf中逐字节读取,判断当前字节是否为\r
- 接下来的字符是\n,将\r\n修改成\0\0,将m_checked_idx指向下一行的开头,则返回LINE_OK
- 接下来达到了buffer末尾,表示buffer还需要继续接收,返回LINE_OPEN
- 否则,表示语法错误,返回LINE_BAD
- 接下来的字符是\n,将\r\n修改成\0\0,将m_checked_idx指向下一行的开头,则返回LINE_OK
当前字节不是\r,判断是否是\n(一般是上次读取到\r就到了buffer末尾,没有接收完整,再次接收时会出现这种情况)
- 如果前一个字符是\r,则将\r\n修改成\0\0,将m_checked_idx指向下一行的开头,则返回LINE_OK
当前字节既不是\r,也不是\n
- 表示接收不完整,需要继续接收,返回LINE_OPEN
//从状态机,用于分析出一行内容
//返回值为行的读取状态,有LINE_OK,LINE_BAD,LINE_OPEN
http_conn::LINE_STATUS http_conn::parse_line(){
//m_read_idx指向缓冲区m_read_buf的数据末尾的下一个字节
//m_checked_idx指向从状态机当前正在分析的字节
char temp;
for (; m_checked_idx < m_read_idx; ++m_checked_idx) {
temp = m_read_buf[m_checked_idx]; //temp为将要分析的字节
//如果当前是\r字符,则有可能会读取到完整行
if (temp == '\r'){//从状态机 LINE_OK-完整读取一行 LINE_BAD-报文语法有误 LINE_OPEN-读取的行不完整
if ((m_checked_idx + 1) == m_read_idx) //下一个字符达到了buffer结尾,则接收不完整,需要继续接收
return LINE_OPEN;
else if (m_read_buf[m_checked_idx + 1] == '\n'){//下一个字符是\n,将\r\n改为\0\0
m_read_buf[m_checked_idx++] = '\0';
m_read_buf[m_checked_idx++] = '\0';
return LINE_OK;
}
return LINE_BAD;//如果都不符合,则返回语法错误
}
//如果当前字符是\n,也有可能读取到完整行
//一般是上次读取到\r就到buffer末尾了,没有接收完整,再次接收时会出现这种情况
else if (temp == '\n'){
if (m_checked_idx > 1 && m_read_buf[m_checked_idx - 1] == '\r'){ //前一个字符是\r,则接收完整
m_read_buf[m_checked_idx - 1] = '\0';
m_read_buf[m_checked_idx++] = '\0';
return LINE_OK;
}
return LINE_BAD;
}
}
return LINE_OPEN; //并没有找到\r\n,需要继续接收
}
5.2.4 主状态机逻辑
主状态机初始状态是CHECK_STATE_REQUESTLINE
,通过调用从状态机来驱动主状态机,在主状态机进行解析前,从状态机已经将每一行的末尾\r\n符号改为\0\0,以便于主状态机直接取出对应字符串进行处理。
CHECK_STATE_REQUESTLINE
- 主状态机的初始状态,调用parse_request_line函数解析请求行
- 解析函数从m_read_buf中解析HTTP请求行,获得请求方法、目标URL及HTTP版本号
- 解析完成后主状态机的状态变为
CHECK_STATE_HEADER
- 主状态机的初始状态,调用parse_request_line函数解析请求行
//解析http请求行,获得请求方法,目标url及http版本号
http_conn::HTTP_CODE http_conn::parse_request_line(char *text){
//在HTTP报文中,请求行用来说明请求类型,要访问的资源以及所使用的HTTP版本,其中各个部分之间通过\t或空格分隔。
//请求行中最先含有空格和\t任一字符的位置并返回
m_url = strpbrk(text, " \t");
if (!m_url) return BAD_REQUEST;//如果没有空格或\t,则报文格式有误
*m_url++ = '\0'; //将该位置改为\0,用于将前面数据取出
//取出数据,并通过与GET和POST比较,以确定请求方式
char *method = text;
if (strcasecmp(method, "GET") == 0)
m_method = GET;
else if (strcasecmp(method, "POST") == 0){
m_method = POST;
cgi = 1;
}
else
return BAD_REQUEST;
//m_url此时跳过了第一个空格或\t字符,但不知道之后是否还有
//将m_url向后偏移,通过查找,继续跳过空格和\t字符,指向请求资源的第一个字符
m_url += strspn(m_url, " \t");
//使用与判断请求方式的相同逻辑,判断HTTP版本号
m_version = strpbrk(m_url, " \t");
if (!m_version)
return BAD_REQUEST;
*m_version++ = '\0';
m_version += strspn(m_version, " \t");
if (strcasecmp(m_version, "HTTP/1.1") != 0)//仅支持HTTP/1.1
return BAD_REQUEST;
//对请求资源前7个字符进行判断
//这里主要是有些报文的请求资源中会带有http://,这里需要对这种情况进行单独处理
if (strncasecmp(m_url, "http://", 7) == 0){
m_url += 7;
m_url = strchr(m_url, '/');
}
if (strncasecmp(m_url, "https://", 8) == 0){//同样增加https情况
m_url += 8;
m_url = strchr(m_url, '/');
}
//一般的不会带有上述两种符号,直接是单独的/或/后面带访问资源
if (!m_url || m_url[0] != '/')
return BAD_REQUEST;
//当url为/时,显示判断界面
if (strlen(m_url) == 1)
strcat(m_url, "judge.html");
//请求行处理完毕,将主状态机转移处理请求头
m_check_state = CHECK_STATE_HEADER;
return NO_REQUEST;
}
解析完请求行后,主状态机继续分析请求头。在报文中,请求头和空行的处理使用的同一个函数,这里通过判断当前的text首位是不是\0字符,若是,则表示当前处理的是空行,若不是,则表示当前处理的是请求头。
CHECK_STATE_HEADER
- 调用parse_headers函数解析请求头部信息
- 判断是空行还是请求头,若是空行,进而判断content-length是否为0,如果不是0,表明是POST请求,则状态转移到CHECK_STATE_CONTENT,否则说明是GET请求,则报文解析结束。
- 若解析的是请求头部字段,则主要分析connection字段,content-length字段,其他字段可以直接跳过,各位也可以根据需求继续分析。
- connection字段判断是keep-alive还是close,决定是长连接还是短连接
- content-length字段,这里用于读取post请求的消息体长度
- 调用parse_headers函数解析请求头部信息
//解析http请求的一个头部信息
http_conn::HTTP_CODE http_conn::parse_headers(char *text){
if (text[0] == '\0'){ //判断是空行还是请求头
if (m_content_length != 0){ //判断是GET还是POST请求
//判断是GET还是POST请求
m_check_state = CHECK_STATE_CONTENT; //POST需要跳转到消息体处理状态
return NO_REQUEST;
}
return GET_REQUEST;
}
//解析请求头部连接字段
else if (strncasecmp(text, "Connection:", 11) == 0){
text += 11;
text += strspn(text, " \t");
if (strcasecmp(text, "keep-alive") == 0){
m_linger = true;
}
}
//解析请求头部内容长度字段
else if (strncasecmp(text, "Content-length:", 15) == 0){
text += 15;
text += strspn(text, " \t");
m_content_length = atol(text);
}
//解析请求头部HOST字段
else if (strncasecmp(text, "Host:", 5) == 0){
text += 5;
text += strspn(text, " \t");
m_host = text;
}
else{
LOG_INFO("oop!unknow header: %s", text);
}
return NO_REQUEST;
}
如果仅仅是GET请求,如项目中的欢迎界面,那么主状态机只设置之前的两个状态足矣。
因为在上篇推文中我们曾说道,GET和POST请求报文的区别之一是有无消息体部分,GET请求没有消息体,当解析完空行之后,便完成了报文的解析。
但后续的登录和注册功能,为了避免将用户名和密码直接暴露在URL中,我们在项目中改用了POST请求,将用户名和密码添加在报文中作为消息体进行了封装。
为此,我们需要在解析报文的部分添加解析消息体的模块。
while ((m_check_state == CHECK_STATE_CONTENT && line_status == LINE_OK) || ((line_status = parse_line()) == LINE_OK))
那么,这里的判断条件为什么要写成这样呢?
在GET请求报文中,每一行都是\r\n作为结束,所以对报文进行拆解时,仅用从状态机的状态line_status=parse_line())==LINE_OK
语句即可。
但,在POST请求报文中,消息体的末尾没有任何字符,所以不能使用从状态机的状态,这里转而使用主状态机的状态作为循环入口条件。
那后面的&& line_status==LINE_OK
又是为什么?
解析完消息体后,报文的完整解析就完成了,但此时主状态机的状态还是CHECK_STATE_CONTENT
,也就是说,符合循环入口条件,还会再次进入循环,这并不是我们所希望的。
为此,增加了该语句,并在完成消息体解析后,将line_status变量更改为LINE_OPEN,此时可以跳出循环,完成报文解析任务。
CHECK_STATE_CONTENT
- 仅用于解析POST请求,调用parse_content函数解析消息体
- 用于保存post请求消息体,为后面的登录和注册做准备
- 仅用于解析POST请求,调用parse_content函数解析消息体
//判断http请求是否被完整读入
http_conn::HTTP_CODE http_conn::parse_content(char *text){
if (m_read_idx >= (m_content_length + m_checked_idx)){ //判断buffer中是否读取了消息体
text[m_content_length] = '\0';
//POST请求中最后为输入的用户名和密码
m_string = text;
return GET_REQUEST;
}
return NO_REQUEST;
}
06 http连接处理(下)
上一篇详解中,我们对状态机和服务器解析请求报文进行了介绍。
本篇,我们将介绍服务器如何响应请求报文,并将该报文发送给浏览器端。首先介绍一些基础API,然后结合流程图和代码对服务器响应请求报文进行详解。
基础API部分,介绍stat
、mmap
、iovec
、writev
。
流程图部分,描述服务器端响应请求报文的逻辑,各模块间的关系。
代码部分,结合代码对服务器响应请求报文进行详解。
6.1 基础API
为了更好的源码阅读体验,这里提前对代码中使用的一些API进行简要介绍,更丰富的用法可以自行查阅资料。
6.1.1 stat
stat函数用于取得指定文件的文件属性,并将文件属性存储在结构体stat里,这里仅对其中用到的成员进行介绍。
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
//获取文件属性,存储在statbuf中
int stat(const char *pathname, struct stat *statbuf);
作用:获取一个文件相关的一些信息
参数:
- pathname:操作的文件的路径
- statbuf:结构体变量,传出参数,用于保存获取到的文件的信息
返回值:
成功:返回0
失败:返回-1 设置errno
//作用:获取一个软连接文件(不包含链接的文件)相关的一些信息
int lstat(const char *pathname, struct stat *statbuf);
参数:
- pathname:操作的文件的路径
- statbuf:结构体变量,传出参数,用于保存获取到的文件的信息
返回值:
成功:返回0
失败:返回-1 设置errno
struct stat {
mode_t st_mode; /* 文件类型和权限 */
off_t st_size; /* 文件大小,字节数*/
......
};
6.1.2 mmap
用于将一个文件或其他对象映射到内存,提高文件的访问速度。
#include <sys/mman.h>
void *mmap(void *addr, size_t length, int prot, int flags,int fd, off_t offset);
- 功能:将一个文件或者设备的数据映射到内存中
- 参数:
- addr: 映射区的开始地址 指定NULL, 由内核指定
- length : 要映射的数据的长度,这个值不能为0。建议使用文件的长度。
获取文件的长度:stat lseek(如果文件太小,其大小会是分页的整数倍)
- prot : 对申请的内存映射区的操作权限
-PROT_EXEC :可执行的权限
-PROT_READ :读权限
-PROT_WRITE :写权限
-PROT_NONE :没有权限
要操作映射内存,必须要有读的权限。PROT_READ、PROT_READ|PROT_WRITE
- flags :
- MAP_SHARED : 映射区的数据会自动和磁盘文件进行同步,进程间通信,必须要设置这个选项
- MAP_PRIVATE :不同步,内存映射区的数据改变了,对原来的文件不会修改,会重新创建一个新的文件。(copy on write)
- fd: 需要映射的那个文件的文件描述符
- 通过open得到,open的是一个磁盘文件
- 注意:文件的大小不能为0,open指定的权限不能和prot参数有冲突。
prot: PROT_READ open:只读/读写
prot: PROT_READ | PROT_WRITE open:读写
- offset:偏移量,一般不用。必须指定的是4k的整数倍,0表示不偏移。
- 返回值:返回创建的内存的首地址
失败返回MAP_FAILED,(void *) -1 ——一个宏
int munmap(void *addr, size_t length);
- 功能:释放内存映射
- 参数:
- addr : 要释放的内存的首地址
- length : 要释放的内存的大小,要和mmap函数中的length参数的值一样。
6.1.3 iovec
定义了一个向量元素,通常,这个结构用作一个多元素的数组。
struct iovec {
void *iov_base; /* iov_base指向数据的地址*/
size_t iov_len; /* iov_len表示数据的长度*/
};
6.1.4 writev
writev函数用于在一次函数调用中写多个非连续缓冲区,有时也将这该函数称为聚集写。
#include <sys/uio.h>
ssize_t writev(int filedes, const struct iovec *iov, int iovcnt);
- filedes表示文件描述符
- iov为前述io向量机制结构体iovec
- iovcnt为结构体的个数
若成功则返回已写的字节数,
若出错则返回-1。
`writev`以顺序`iov[0]`,`iov[1]`至`iov[iovcnt-1]`从缓冲区中聚集输出数据到文件描述符filedes。`writev`返回输出的字节总数,通常,它应等于所有缓冲区长度之和。()
特别注意: 循环调用writev时,需要重新处理iovec中的指针和长度,该函数不会对这两个成员做任何处理。writev的返回值为已写的字节数,但这个返回值“实用性”并不高,因为参数传入的是iovec数组,计量单位是iovcnt,而不是字节数,我们仍然需要通过遍历iovec来计算新的基址,另外写入数据的“结束点”可能位于一个iovec的中间某个位置,因此需要调整临界iovec的io_base和io_len。
6.1.5 vsnprintf
vsnprintf用来将可变参数格式化输出到一个字符数组,常和va_start和va_end一起使用。
函数功能:将可变参数格式化输出到一个字符数组。
注意:
在linux环境下是:vsnprintf
但在VC6环境下是:_vsnprintf
#include <stdarg.h>
int vsnprintf(char *str, size_t size, const char *format, va_list ap);
参数:
str-保存输出字符数组的存储区。
size-存储区的大小。
format-包含格式字符串的C字符串,其格式字符串与printf中的格式相同
arg-变量参数列表,用va_list 定义。
返回值:
执行成功,返回最终生成字符串的长度,若生成字符串的长度大于size,则将字符串的前size个字符复制到str,同时将原串的长度返回(不包含终止符);执行失败,返回负值,并置errno.
#include <stdio.h>
#include <stdarg.h>
#define MAXLEN 20
char buffer[MAXLEN];
int mon_log(char* format, ...){
int i=0,j=0;
va_list vArgList;
va_start (vArgList, format);
i=_vsnprintf(buffer, MAXLEN, format, vArgList);/*把可变参数表中的数据转成字符存到buffer中,每个参数间用','隔开 */
va_end(vArgList);
printf("%s\r\n", buffer);
for(j=0;j<MAXLEN;j++) { /*打印buffer中每个字符的值 */
printf("%d ", buffer[j]);
}
printf("\r\n");
return i;
}
void main(){
int i;
for(i=0;i<MAXLEN;i++)
buffer[i]=100;
i=mon_log("%s,%d,%d,%c","abc",2,3,'4');
printf("return value1=%d\r\n\r\n",i);
i=mon_log("para1:%s,%d,%d,%c","123",2,3,'4');
printf("return value2=%d\r\n\r\n",i);
i=mon_log("%s,%d,%d,last=%c\r\n","abc",2,3,'4');
printf("return value3=%d\r\n\r\n",i);
}
//输出
abc,2,3,4
97 98 99 44 50 44 51 44 52 0 100 100 100 100 100 100 100 100 100 100
return value1=9
para1:123,2,3,4
112 97 114 97 49 58 49 50 51 44 50 44 51 44 52 0 100 100 100 100
return value2=15
abc,2,3,last=4
97 98 99 44 50 44 51 44 108 97 115 116 61 52 13 10 0 100 100 100
return value3=16
6.1.6 sendfile
#include <sys/sendfile.h>
ssize_t sendfile(int out_fd, int in_fd, off_t *offset, size_t count);
out_fd是需要输出数据的fd;
in_fd是需要获取数据的fd;
offset是拷贝数据的起始点,如果是NULL,则从头开始拷贝;
count是拷贝的字节数。
int readfd = open(argv[1], O_RDONLY);
int writefd = open(argv[2], O_WRONLY | O_CREAT);
if (sendfile(writefd, readfd, NULL, len) < 0) {
perror("sendfile() error\n");
return -1;
}
6.2 流程图
浏览器端发出HTTP请求报文,服务器端接收该报文并调用process_read
对其进行解析,根据解析结果HTTP_CODE
,进入相应的逻辑和模块。
其中,服务器子线程完成报文的解析与响应;主线程监测读写事件,调用read_once
和http_conn::write
完成数据的读取与发送。
6.2.1 HTTP_CODE含义
表示HTTP请求的处理结果,在头文件中初始化了八种情形,在报文解析与响应中只用到了七种。
NO_REQUEST
- 请求不完整,需要继续读取请求报文数据
- 跳转主线程继续监测读事件
- 请求不完整,需要继续读取请求报文数据
GET_REQUEST
- 获得了完整的HTTP请求
- 调用do_request完成请求资源映射
- 获得了完整的HTTP请求
NO_RESOURCE
- 请求资源不存在
- 跳转process_write完成响应报文
- 请求资源不存在
BAD_REQUEST
- HTTP请求报文有语法错误或请求资源为目录
- 跳转process_write完成响应报文
- HTTP请求报文有语法错误或请求资源为目录
FORBIDDEN_REQUEST
- 请求资源禁止访问,没有读取权限
- 跳转process_write完成响应报文
- 请求资源禁止访问,没有读取权限
FILE_REQUEST
- 请求资源可以正常访问
- 跳转process_write完成响应报文
- 请求资源可以正常访问
INTERNAL_ERROR
- 服务器内部错误,该结果在主状态机逻辑switch的default下,一般不会触发
6.3 代码分析
6.3.1 do_request
process_read
函数的返回值是对请求的文件分析后的结果,一部分是语法错误导致的BAD_REQUEST
,一部分是do_request
的返回结果.该函数将网站根目录和url
文件拼接,然后通过stat判断该文件属性。另外,为了提高访问速度,通过mmap进行映射,将普通文件映射到内存逻辑地址。
为了更好的理解请求资源的访问流程,这里对各种各页面跳转机制进行简要介绍。其中,浏览器网址栏中的字符,即url
,可以将其抽象成ip:port/xxx
,xxx
通过html
文件的action
属性进行设置。
m_url为请求报文中解析出的请求资源,以/开头,也就是/xxx
,项目中解析后的m_url有8种情况。
/
- GET请求,跳转到judge.html,即欢迎访问页面
/0
- POST请求,跳转到register.html,即注册页面
/1
- POST请求,跳转到log.html,即登录页面
/2CGISQL.cgi
- POST请求,进行登录校验
- 验证成功跳转到welcome.html,即资源请求成功页面
- 验证失败跳转到logError.html,即登录失败页面
- POST请求,进行登录校验
/3CGISQL.cgi
- POST请求,进行注册校验
- 注册成功跳转到log.html,即登录页面
- 注册失败跳转到registerError.html,即注册失败页面
- POST请求,进行注册校验
/5
- POST请求,跳转到picture.html,即图片请求页面
/6
- POST请求,跳转到video.html,即视频请求页面
/7
- POST请求,跳转到fans.html,即关注页面
如果大家对上述设置方式不理解,不用担心。具体的登录和注册校验功能会在第12节进行详解,到时候还会针对html进行介绍。
//root文件夹路径, //网站根目录,文件夹内存放请求的资源和跳转的html文件
char server_path[200];
getcwd(server_path, 200); //获取当前工作目录
char root[6] = "/root";
m_root = (char *)malloc(strlen(server_path) + strlen(root) + 1);
strcpy(m_root, server_path);
strcat(m_root, root);
http_conn::HTTP_CODE http_conn::do_request(){
//将初始化的m_real_file赋值为网站根目录
strcpy(m_real_file, doc_root);
int len = strlen(doc_root);
//printf("m_url:%s\n", m_url);
//找到m_url中/的位置
const char *p = strrchr(m_url, '/');
//处理cgi
if (cgi == 1 && (*(p + 1) == '2' || *(p + 1) == '3')){
//根据标志判断是登录检测还是注册检测
char flag = m_url[1];
char *m_url_real = (char *)malloc(sizeof(char) * 200);
strcpy(m_url_real, "/");
strcat(m_url_real, m_url + 2);
strncpy(m_real_file + len, m_url_real, FILENAME_LEN - len - 1);
free(m_url_real);
//将用户名和密码提取出来
//user=123&passwd=123
char name[100], password[100];
int i;
for (i = 5; m_string[i] != '&'; ++i)
name[i - 5] = m_string[i];
name[i - 5] = '\0';
int j = 0;
for (i = i + 10; m_string[i] != '\0'; ++i, ++j)
password[j] = m_string[i];
password[j] = '\0';
if (*(p + 1) == '3'){
//如果是注册,先检测数据库中是否有重名的
//没有重名的,进行增加数据
char *sql_insert = (char *)malloc(sizeof(char) * 200);
strcpy(sql_insert, "INSERT INTO user(username, passwd) VALUES(");
strcat(sql_insert, "'");
strcat(sql_insert, name);
strcat(sql_insert, "', '");
strcat(sql_insert, password);
strcat(sql_insert, "')");
if (users.find(name) == users.end()){
m_lock.lock();
int res = mysql_query(mysql, sql_insert);
users.insert(pair<string, string>(name, password)); //在程序中存储用户名和密码
m_lock.unlock();
if (!res)
strcpy(m_url, "/log.html");
else
strcpy(m_url, "/registerError.html");
}
else
strcpy(m_url, "/registerError.html");
}
//如果是登录,直接判断
//若浏览器端输入的用户名和密码在表中可以查找到,返回1,否则返回0
else if (*(p + 1) == '2'){
if (users.find(name) != users.end() && users[name] == password)
strcpy(m_url, "/welcome.html");
else
strcpy(m_url, "/logError.html");
}
}
//如果请求资源为/0,表示跳转注册界面
if (*(p + 1) == '0'){
char *m_url_real = (char *)malloc(sizeof(char) * 200);
strcpy(m_url_real, "/register.html");
//将网站目录和/register.html进行拼接,更新到m_real_file中
strncpy(m_real_file + len, m_url_real, strlen(m_url_real));
free(m_url_real);
}
//如果请求资源为/1,表示跳转登录界面
else if (*(p + 1) == '1'){
char *m_url_real = (char *)malloc(sizeof(char) * 200);
strcpy(m_url_real, "/log.html");
//将网站目录和/log.html进行拼接,更新到m_real_file中
strncpy(m_real_file + len, m_url_real, strlen(m_url_real));
free(m_url_real);
}
//如果请求资源为/5,表示访问图片界面
else if (*(p + 1) == '5'){
char *m_url_real = (char *)malloc(sizeof(char) * 200);
strcpy(m_url_real, "/picture.html");
strncpy(m_real_file + len, m_url_real, strlen(m_url_real));
free(m_url_real);
}
else if (*(p + 1) == '6'){
char *m_url_real = (char *)malloc(sizeof(char) * 200);
strcpy(m_url_real, "/video.html");
strncpy(m_real_file + len, m_url_real, strlen(m_url_real));
free(m_url_real);
}
else if (*(p + 1) == '7'){
char *m_url_real = (char *)malloc(sizeof(char) * 200);
strcpy(m_url_real, "/fans.html");
strncpy(m_real_file + len, m_url_real, strlen(m_url_real));
free(m_url_real);
}
else
//如果以上均不符合,即不是登录和注册,直接将url与网站目录拼接
//这里的情况是welcome界面,请求服务器上的一个图片
strncpy(m_real_file + len, m_url, FILENAME_LEN - len - 1);
//通过stat获取请求资源文件信息,成功则将信息更新到m_file_stat结构体
//失败返回NO_RESOURCE状态,表示资源不存在
if (stat(m_real_file, &m_file_stat) < 0) //m_file_stat是输出参数
return NO_RESOURCE;
if (!(m_file_stat.st_mode & S_IROTH))//判断文件的权限,是否可读,不可读则返回FORBIDDEN_REQUEST状态
return FORBIDDEN_REQUEST;
if (S_ISDIR(m_file_stat.st_mode))//判断文件类型,如果是目录,则返回BAD_REQUEST,表示请求报文有误
return BAD_REQUEST;
//以只读方式获取文件描述符,通过mmap将该文件映射到内存中
int fd = open(m_real_file, O_RDONLY);
m_file_address = (char *)mmap(0, m_file_stat.st_size, PROT_READ, MAP_PRIVATE, fd, 0);
close(fd); //避免文件描述符的浪费和占用
return FILE_REQUEST; //表示请求文件存在,且可以访问
}
6.3.2 process_write
根据do_request
的返回状态,服务器子线程调用process_write
向m_write_buf
中写入响应报文。
add_status_line函数,添加状态行:http/1.1 状态码 状态消息
add_headers函数添加消息报头,内部调用add_content_length和add_linger函数
- content-length记录响应报文长度,用于浏览器端判断服务器是否发送完数据
- connection记录连接状态,用于告诉浏览器端保持长连接
- content-length记录响应报文长度,用于浏览器端判断服务器是否发送完数据
add_blank_line添加空行
上述涉及的5个函数,均是内部调用add_response
函数更新m_write_idx
指针和缓冲区m_write_buf
中的内容。
bool http_conn::add_response(const char *format, ...){
if (m_write_idx >= WRITE_BUFFER_SIZE) //如果写入内容超出m_write_buf大小则报错
return false;
va_list arg_list; //定义可变参数列表
va_start(arg_list, format); //将变量arg_list初始化为传入参数
//将数据format从可变参数列表写入缓冲区写,返回写入数据的长度
int len = vsnprintf(m_write_buf + m_write_idx, WRITE_BUFFER_SIZE - 1 - m_write_idx, format, arg_list);
//如果写入的数据长度超过缓冲区剩余空间,则报错
if (len >= (WRITE_BUFFER_SIZE - 1 - m_write_idx)){
va_end(arg_list);
return false;
}
m_write_idx += len;//更新m_write_idx位置
va_end(arg_list);//清空可变参列表
LOG_INFO("request:%s", m_write_buf);
return true;
}
bool http_conn::add_status_line(int status, const char *title){//添加消息报头,具体的添加文本长度、连接状态和空行
return add_response("%s %d %s\r\n", "HTTP/1.1", status, title);
}
bool http_conn::add_headers(int content_len){//添加Content-Length,表示响应报文的长度
return add_content_length(content_len) && add_linger() && add_blank_line();
}
bool http_conn::add_content_length(int content_len){//添加文本类型,这里是html
return add_response("Content-Length:%d\r\n", content_len);
}
bool http_conn::add_content_type(){//添加文本类型,这里是html
return add_response("Content-Type:%s\r\n", "text/html");
}
bool http_conn::add_linger(){//添加连接状态,通知浏览器端是保持连接还是关闭
return add_response("Connection:%s\r\n", (m_linger == true) ? "keep-alive" : "close");
}
bool http_conn::add_blank_line(){//添加空行
return add_response("%s", "\r\n");
}
bool http_conn::add_content(const char *content){//添加文本content
return add_response("%s", content);
}
响应报文分为两种,一种是请求文件的存在,通过io
向量机制iovec
,声明两个iovec
,第一个指向m_write_buf
,第二个指向mmap
的地址m_file_address
;一种是请求出错,这时候只申请一个iovec
,指向m_write_buf
。
- iovec是一个结构体,里面有两个元素,指针成员iov_base指向一个缓冲区,这个缓冲区是存放的是writev将要发送的数据。
- 成员iov_len表示实际写入的长度
bool http_conn::process_write(HTTP_CODE ret){
switch (ret) {
case INTERNAL_ERROR://内部错误,500
{
add_status_line(500, error_500_title);//状态行
add_headers(strlen(error_500_form));//消息报头
if (!add_content(error_500_form))
return false;
break;
}
case BAD_REQUEST://报文语法有误,404
{
add_status_line(404, error_404_title);
add_headers(strlen(error_404_form));
if (!add_content(error_404_form))
return false;
break;
}
case FORBIDDEN_REQUEST://资源没有访问权限,403
{
add_status_line(403, error_403_title);
add_headers(strlen(error_403_form));
if (!add_content(error_403_form))
return false;
break;
}
case FILE_REQUEST://文件存在,200
{
add_status_line(200, ok_200_title);
if (m_file_stat.st_size != 0){//如果请求的资源存在
add_headers(m_file_stat.st_size);
m_iv[0].iov_base = m_write_buf;//第一个iovec指针指向响应报文缓冲区,长度指向m_write_idx
m_iv[0].iov_len = m_write_idx;
m_iv[1].iov_base = m_file_address;//第二个iovec指针指向mmap返回的文件指针,长度指向文件大小
m_iv[1].iov_len = m_file_stat.st_size;
m_iv_count = 2;
bytes_to_send = m_write_idx + m_file_stat.st_size;//发送的全部数据为响应报文头部信息和文件大小
return true;
}
else //如果请求的资源大小为0,则返回空白html文件
{
const char *ok_string = "<html><body></body></html>";
add_headers(strlen(ok_string));
if (!add_content(ok_string))
return false;
}
}
default:
return false;
}
//除FILE_REQUEST状态外,其余状态只申请一个iovec,指向响应报文缓冲区
m_iv[0].iov_base = m_write_buf;
m_iv[0].iov_len = m_write_idx;
m_iv_count = 1;
bytes_to_send = m_write_idx;
return true;
}
6.3.3 http_conn::write(可更新sendfile)
服务器子线程调用process_write
完成响应报文,随后注册epollout
事件。服务器主线程检测写事件,并调用http_conn::write
函数将响应报文发送给浏览器端。
该函数具体逻辑如下:
在生成响应报文时初始化byte_to_send,包括头部信息和文件数据大小。通过writev函数循环发送响应报文数据,根据返回值更新byte_have_send和iovec结构体的指针和长度,并判断响应报文整体是否发送成功。
若writev单次发送成功,更新byte_to_send和byte_have_send的大小,若响应报文整体发送成功,则取消mmap映射,并判断是否是长连接.
- 长连接重置http类实例,注册读事件,不关闭连接,
- 短连接直接关闭连接
- 长连接重置http类实例,注册读事件,不关闭连接,
若writev单次发送不成功,判断是否是写缓冲区满了。
- 若不是因为缓冲区满了而失败,取消mmap映射,关闭连接
- 若eagain则满了,更新iovec结构体的指针和长度,并注册写事件,等待下一次写事件触发(当写缓冲区从不可写变为可写,触发epollout),因此在此期间无法立即接收到同一用户的下一请求,但可以保证连接的完整性。
- 若不是因为缓冲区满了而失败,取消mmap映射,关闭连接
bool http_conn::write(){
int temp = 0;
//若要发送的数据长度为0
//表示响应报文为空,一般不会出现这种情况
if (bytes_to_send == 0) {
modfd(m_epollfd, m_sockfd, EPOLLIN, m_TRIGMode);
init();
return true;
}
while (1){
//将响应报文的状态行、消息头、空行和响应正文发送给浏览器端
temp = writev(m_sockfd, m_iv, m_iv_count);
if (temp < 0){//发送失败
if (errno == EAGAIN){ //判断缓冲区是否满了
modfd(m_epollfd, m_sockfd, EPOLLOUT, m_TRIGMode);
return true;
}
unmap();//如果发送失败,但不是缓冲区问题,取消映射
return false;
}
bytes_have_send += temp; //更新已发送字节数
bytes_to_send -= temp; //更新剩余发送字节数
if (bytes_have_send >= m_iv[0].iov_len){//第一个iovec头部信息的数据已发送完,发送第二个iovec数据
m_iv[0].iov_len = 0;
m_iv[1].iov_base = m_file_address + (bytes_have_send - m_write_idx);
m_iv[1].iov_len = bytes_to_send;
}
else {//继续发送第一个iovec头部信息的数据
m_iv[0].iov_base = m_write_buf + bytes_have_send;
m_iv[0].iov_len = m_iv[0].iov_len - bytes_have_send;
}
if (bytes_to_send <= 0){//判断条件,数据已全部发送完
unmap();
modfd(m_epollfd, m_sockfd, EPOLLIN, m_TRIGMode);//在epoll树上重置EPOLLONESHOT事件
if (m_linger){//浏览器的请求为长连接
init(); //重新初始化HTTP对象
return true;
}
else{
return false;
}
}
}
}
07 定时器处理非活动连接(上)
7.1 基础知识
非活跃
,是指客户端(这里是浏览器)与服务器端建立连接后,长时间不交换数据,一直占用服务器端的文件描述符,导致连接资源的浪费。定时事件
,是指固定一段时间之后触发某段代码,由该段代码处理一个事件,如从内核事件表删除事件,并关闭文件描述符,释放连接资源。定时器
,是指利用结构体或其他形式,将多种定时事件进行封装起来。具体的,这里只涉及一种定时事件,即定期检测非活跃连接,这里将该定时事件与连接资源封装为一个结构体定时器。定时器容器
,是指使用某种容器类数据结构,将上述多个定时器组合起来,便于对定时事件统一管理。具体的,项目中使用升序链表将所有定时器串联组织起来。
7.2 整体概述
本项目中,服务器主循环为每一个连接创建一个定时器,并对每个连接进行定时。另外,利用升序时间链表容器将所有定时器串联起来,若主循环接收到定时通知,则在链表中依次执行定时任务。
Linux
下提供了三种定时的方法:
- socket选项
SO_RECVTIMEO
和SO_SNDTIMEO
SIGALRM
信号- I/O复用系统调用的超时参数
三种方法没有一劳永逸的应用场景,也没有绝对的优劣。由于项目中使用的是SIGALRM
信号,这里仅对其进行介绍,另外两种方法可以查阅游双的Linux高性能服务器编程 第11章 定时器
。
具体的,利用alarm
函数周期性地触发SIGALRM
信号,信号处理函数利用管道通知主循环,主循环接收到该信号后对升序链表上所有定时器进行处理,若该段时间内没有交换数据,则将该连接关闭,释放所占用的资源。
从上面的简要描述中,可以看出定时器处理非活动连接模块,主要分为两部分,其一为定时方法与信号通知流程,其二为定时器及其容器设计与定时任务的处理。
7.3 本文内容
本篇将介绍定时方法与信号通知流程,具体的涉及到基础API、信号通知流程和代码实现。
基础API,描述sigaction
结构体、sigaction
函数、sigfillset
函数、SIGALRM
信号、SIGTERM
信号、alarm
函数、socketpair
函数、send
函数。
信号通知流程,介绍统一事件源和信号处理机制。
代码实现,结合代码对信号处理函数的设计与使用进行详解。
7.4 基础API
为了更好的源码阅读体验,这里提前对代码中使用的一些API进行简要介绍,更丰富的用法可以自行查阅资料。
7.4.1 sigaction结构体
struct sigaction {
void (*sa_handler)(int);
void (*sa_sigaction)(int, siginfo_t *, void *);
sigset_t sa_mask;
int sa_flags;
void (*sa_restorer)(void);
}
sa_handler是一个函数指针,指向信号处理函数
sa_sigaction同样是信号处理函数,有三个参数,可以获得关于信号更详细的信息
sa_mask用来指定在信号处理函数执行期间需要被屏蔽的信号
sa_flags用于指定信号处理的行为
- SA_RESTART,使被信号打断的系统调用自动重新发起
- SA_NOCLDSTOP,使父进程在它的子进程暂停或继续运行时不会收到 SIGCHLD 信号
- SA_NOCLDWAIT,使父进程在它的子进程退出时不会收到 SIGCHLD 信号,这时子进程如果退出也不会成为僵尸进程
- SA_NODEFER,使对信号的屏蔽无效,即在信号处理函数执行期间仍能发出这个信号
- SA_RESETHAND,信号处理之后重新设置为默认的处理方式
- SA_SIGINFO,使用 sa_sigaction 成员而不是 sa_handler 作为信号处理函数
- SA_RESTART,使被信号打断的系统调用自动重新发起
sa_restorer一般不使用
7.4.2 sigaction函数
#include <signal.h>
int sigaction(int signum, const struct sigaction *act, struct sigaction *oldact);
- signum表示操作的信号。
- act表示对信号设置新的处理方式。
- oldact表示信号原来的处理方式。一般不使用,传递NULL
- 返回值,0 表示成功,-1 表示有错误发生。
7.4.3 sigfillset函数
#include <signal.h>
int sigfillset(sigset_t *set);
用来将参数set信号集初始化,然后把所有的信号加入到此信号集里。
7.4.4 SIGALRM、SIGTERM信号
#define SIGALRM 14 //由alarm系统调用产生timer时钟信号
#define SIGTERM 15 //终端发送的终止信号
7.4.5 alarm函数
#include <unistd.h>;
unsigned int alarm(unsigned int seconds);
设置信号传送闹钟,即用来设置信号SIGALRM在经过参数seconds秒数后发送给目前的进程。如果未设置信号SIGALRM的处理函数,那么alarm()默认处理终止进程.
7.4.6 socketpair函数
在linux下,使用socketpair函数能够创建一对套接字进行通信,项目中使用管道通信。
#include <sys/types.h>
#include <sys/socket.h>
int socketpair(int domain, int type, int protocol, int sv[2]);
- domain表示协议族,PF_UNIX或者AF_UNIX
- type表示协议,可以是SOCK_STREAM或者SOCK_DGRAM,SOCK_STREAM基于TCP,SOCK_DGRAM基于UDP
- protocol表示类型,只能为0
- sv[2]表示套节字柄对,该两个句柄作用相同,均能进行读写双向操作
- 返回结果, 0为创建成功,-1为创建失败
7.4.7 send函数
#include <sys/types.h>
#include <sys/socket.h>
ssize_t send(int sockfd, const void *buf, size_t len, int flags);
当套接字发送缓冲区变满时,send通常会阻塞,除非套接字设置为非阻塞模式,当缓冲区变满时,返回EAGAIN或者EWOULDBLOCK错误,此时可以调用select函数来监视何时可以发送数据。
7.5 信号通知流程
Linux下的信号采用的异步处理机制,信号处理函数和当前进程是两条不同的执行路线。具体的,当进程收到信号时,操作系统会中断进程当前的正常流程,转而进入信号处理函数执行操作,完成后再返回中断的地方继续执行。
为避免信号竞态现象发生,信号处理期间系统不会再次触发它。所以,为确保该信号不被屏蔽太久,信号处理函数需要尽可能快地执行完毕。
一般的信号处理函数需要处理该信号对应的逻辑,当该逻辑比较复杂时,信号处理函数执行时间过长,会导致信号屏蔽太久。
这里的解决方案是,信号处理函数仅仅发送信号通知程序主循环,将信号对应的处理逻辑放在程序主循环中,由主循环执行信号对应的逻辑代码。
7.5.1 统一事件源
统一事件源,是指将信号事件与其他事件一样被处理。
具体的,信号处理函数使用管道将信号传递给主循环,信号处理函数往管道的写端写入信号值,主循环则从管道的读端读出信号值,使用I/O复用系统调用来监听管道读端的可读事件,这样信号事件与其他文件描述符都可以通过epoll来监测,从而实现统一处理。
7.5.2 信号处理机制
每个进程之中,都有存着一个表,里面存着每种信号所代表的含义,内核通过设置表项中每一个位来标识对应的信号类型。
信号的接收
- 接收信号的任务是由内核代理的,当内核接收到信号后,会将其放到对应进程的信号队列中,同时向进程发送一个中断,使其陷入内核态。注意,此时信号还只是在队列中,对进程来说暂时是不知道有信号到来的。
信号的检测
- 进程从内核态返回到用户态前进行信号检测
- 进程在内核态中,从睡眠状态被唤醒的时候进行信号检测
- 进程陷入内核态后,有两种场景会对信号进行检测:
- 当发现有新信号时,便会进入下一步,信号的处理。
- 进程从内核态返回到用户态前进行信号检测
信号的处理
- ( 内核 )信号处理函数是运行在用户态的,调用处理函数前,内核会将当前内核栈的内容备份拷贝到用户栈上,并且修改指令寄存器(eip)将其指向信号处理函数。
- ( 用户 )接下来进程返回到用户态中,执行相应的信号处理函数。
- ( 内核 )信号处理函数执行完成后,还需要返回内核态,检查是否还有其它信号未处理。
- ( 用户 )如果所有信号都处理完成,就会将内核栈恢复(从用户栈的备份拷贝回来),同时恢复指令寄存器(eip)将其指向中断前的运行位置,最后回到用户态继续执行进程。
- ( 内核 )信号处理函数是运行在用户态的,调用处理函数前,内核会将当前内核栈的内容备份拷贝到用户栈上,并且修改指令寄存器(eip)将其指向信号处理函数。
至此,一个完整的信号处理流程便结束了,如果同时有多个信号到达,上面的处理流程会在第2步和第3步骤间重复进行。
7.6 代码分析
7.6.1 信号处理函数
自定义信号处理函数,创建sigaction结构体变量,设置信号函数。
//信号处理函数
void Utils::sig_handler(int sig){
//为保证函数的可重入性,保留原来的errno
//可重入性表示中断后再次进入该函数,环境变量与之前相同,不会丢失数据
int save_errno = errno;
int msg = sig;
//将信号值从管道写端写入,传输字符类型,而非整型
send(u_pipefd[1], (char *)&msg, 1, 0);
errno = save_errno; //将原来的errno赋值为当前的errno
}
信号处理函数中仅仅通过管道发送信号值,不处理信号对应的逻辑,缩短异步执行时间,减少对主程序的影响。
//设置信号函数
void Utils::addsig(int sig, void(handler)(int), bool restart){
struct sigaction sa; //创建sigaction结构体变量
memset(&sa, '\0', sizeof(sa));
sa.sa_handler = handler; //信号处理函数中仅仅发送信号值,不做对应逻辑处理
if (restart)
sa.sa_flags |= SA_RESTART;
sigfillset(&sa.sa_mask);//将所有信号添加到信号集中
assert(sigaction(sig, &sa, NULL) != -1);//执行sigaction函数
}
项目中设置信号函数,仅关注SIGTERM和SIGALRM两个信号。
7.6.2 信号通知逻辑
创建管道,其中管道写端写入信号值,管道读端通过I/O复用系统监测读事件
设置信号处理函数SIGALRM(时间到了触发)和SIGTERM(kill会触发,Ctrl+C)
- 通过struct sigaction结构体和sigaction函数注册信号捕捉函数
- 在结构体的handler参数设置信号处理函数,具体的,从管道写端写入信号的名字
- 通过struct sigaction结构体和sigaction函数注册信号捕捉函数
利用I/O复用系统监听管道读端文件描述符的可读事件
信息值传递给主循环,主循环再根据接收到的信号值执行目标信号对应的逻辑代码
utils.init(TIMESLOT); //定时器开关初始化
//epoll创建内核事件表
epoll_event events[MAX_EVENT_NUMBER];
m_epollfd = epoll_create(5);
assert(m_epollfd != -1);
utils.addfd(m_epollfd, m_listenfd, false, m_LISTENTrigmode);
http_conn::m_epollfd = m_epollfd;
//创建管道套接字
ret = socketpair(PF_UNIX, SOCK_STREAM, 0, m_pipefd);
assert(ret != -1);
//设置管道写端为非阻塞,为啥?管道默认是阻塞的:如果管道中没有数据,read阻塞,如果管道满了,write阻塞
utils.setnonblocking(m_pipefd[1]);
//设置管道读端为ET非阻塞
utils.addfd(m_epollfd, m_pipefd[0], false, 0);
//传递给主循环的信号值,这里只关注SIGALRM (定时器)、SIGTERM (ctrl+c)和SIGPIPE(忽略管道错误)
utils.addsig(SIGPIPE, SIG_IGN);
utils.addsig(SIGALRM, utils.sig_handler, false);
utils.addsig(SIGTERM, utils.sig_handler, false);
alarm(TIMESLOT); //每隔TIMESLOT时间触发SIGALRM信号
//工具类,信号和描述符基础操作
Utils::u_pipefd = m_pipefd;
Utils::u_epollfd = m_epollfd;
//服务器接收http请求,以及处理的流程函数
void WebServer::eventLoop(){
bool timeout = false;//超时标志
bool stop_server = false;//循环条件
while (!stop_server){
int number = epoll_wait(m_epollfd, events, MAX_EVENT_NUMBER, -1); //等待所监控文件描述符上有事件的产生
if (number < 0 && errno != EINTR){
LOG_ERROR("%s", "epoll failure");
break;
}
for (int i = 0; i < number; i++){//轮询文件描述符
int sockfd = events[i].data.fd;
//处理新到的客户连接
if (sockfd == m_listenfd){
bool flag = dealclinetdata();
if (false == flag)
continue;
}
else if (events[i].events & (EPOLLRDHUP | EPOLLHUP | EPOLLERR)){
//服务器端关闭连接,移除对应的定时器
util_timer *timer = users_timer[sockfd].timer;
deal_timer(timer, sockfd);
}
//处理信号 //管道读端对应文件描述符发生读事件
else if ((sockfd == m_pipefd[0]) && (events[i].events & EPOLLIN)){
bool flag = dealwithsignal(timeout, stop_server);//在下面
if (false == flag)
LOG_ERROR("%s", "dealclientdata failure");
}
//处理客户连接上接收到的数据
else if (events[i].events & EPOLLIN){
dealwithread(sockfd);
}
else if (events[i].events & EPOLLOUT){
dealwithwrite(sockfd);
}
}
if (timeout){
utils.timer_handler();
LOG_INFO("%s", "timer tick");
timeout = false;
}
}
}
bool WebServer::dealwithsignal(bool &timeout, bool &stop_server){
int ret = 0;
int sig;
char signals[1024];
//从管道读端读出信号值,成功返回字节数,失败返回-1
//正常情况下,这里的ret返回值总是1,只有14和15两个ASCII码对应的字符
ret = recv(m_pipefd[0], signals, sizeof(signals), 0);
if (ret == -1 || ret == 0){//处理异常
return false;
}
else{
for (int i = 0; i < ret; ++i){
switch (signals[i]){ //这里面是字符
case SIGALRM: //这里是整型(14号信号)
{
timeout = true;
break;
}
case SIGTERM: //这里是整型(15号信号 Ctrl+c)
{
stop_server = true;
break;
}
}
}
}
return true;
}
为什么管道写端要非阻塞?
send是将信息发送给套接字缓冲区,如果缓冲区满了,则会阻塞,这时候会进一步增加信号处理函数的执行时间,为此,将其修改为非阻塞。
没有对非阻塞返回值处理,如果阻塞是不是意味着这一次定时事件失效了?
是的,但定时事件是非必须立即处理的事件,可以允许这样的情况发生。
管道传递的是什么类型?switch-case的变量冲突?
信号本身是整型数值,管道中传递的是ASCII码表中整型数值对应的字符。
switch的变量一般为字符或整型,当switch的变量为字符时,case中可以是字符,也可以是字符对应的ASCII码。
08 定时器处理非活动连接(下)
定时器处理非活动连接模块,主要分为两部分,其一为定时方法与信号通知流程,其二为定时器及其容器设计、定时任务的处理。
本篇对第二部分进行介绍,具体的涉及到定时器设计、容器设计、定时任务处理函数和使用定时器。
定时器设计
,将连接资源和定时事件等封装起来,具体包括连接资源、超时时间和回调函数,这里的回调函数指向定时事件。定时器容器设计
,将多个定时器串联组织起来统一处理,具体包括升序链表设计。定时任务处理函数
,该函数封装在容器类中,具体的,函数遍历升序链表容器,根据超时时间,处理对应的定时器。代码分析-使用定时器
,通过代码分析,如何在项目中使用定时器。
8.1 定时器设计
项目中将连接资源、定时事件和超时时间封装为定时器类,具体的,
- 连接资源包括客户端套接字地址、文件描述符和定时器
- 定时事件为回调函数,将其封装起来由用户自定义,这里是删除非活动socket上的注册事件,并关闭
- 定时器超时时间 = 浏览器和服务器连接时刻 + 固定时间(TIMESLOT),可以看出,定时器使用绝对时间作为超时值,这里alarm设置为5秒,连接超时为15秒。
//连接资源结构体成员需要用到定时器类
//需要前向声明
class util_timer;
//连接资源
struct client_data{
sockaddr_in address; //客户端socket地址
int sockfd; //客户端socket地址
util_timer *timer; //定时器
};
//定时器类
class util_timer{
public:
util_timer() : prev(NULL), next(NULL) {} //构造函数,做双向链表
public:
time_t expire; //超时时间
void (* cb_func)(client_data *); //回调函数
client_data *user_data; //连接资源
util_timer *prev; //前向定时器
util_timer *next; //后继定时器
};
定时事件,具体的,从内核事件表删除事件,关闭文件描述符,释放连接资源。
//定时器回调函数
void cb_func(client_data *user_data){
//删除非活动连接在socket上的注册事件
epoll_ctl(Utils::u_epollfd, EPOLL_CTL_DEL, user_data->sockfd, 0);
assert(user_data);
//关闭文件描述符
close(user_data->sockfd);
//关闭文件描述符
http_conn::m_user_count--;
}
8.2 定时器容器设计
项目中的定时器容器为带头尾结点的升序双向链表,具体的为每个连接创建一个定时器,将其添加到链表中,并按照超时时间升序排列。执行定时任务时,将到期的定时器从链表中删除。
从实现上看,主要涉及双向链表的插入,删除操作,**其中添加定时器的事件复杂度是O(n),删除定时器的事件复杂度是O(1)**。
升序双向链表主要逻辑如下,具体的,
创建头尾节点,其中头尾节点没有意义,仅仅统一方便调整
add_timer函数,将目标定时器添加到链表中,添加时按照升序添加
- 若当前链表中只有头尾节点,直接插入
- 否则,将定时器按升序插入
- 若当前链表中只有头尾节点,直接插入
adjust_timer函数,当定时任务发生变化,调整对应定时器在链表中的位置
- 客户端在设定时间内有数据收发,则当前时刻对该定时器重新设定时间,这里只是往后延长超时时间
- 被调整的目标定时器在尾部,或定时器新的超时值仍然小于下一个定时器的超时,不用调整
- 否则先将定时器从链表取出,重新插入链表
- 客户端在设定时间内有数据收发,则当前时刻对该定时器重新设定时间,这里只是往后延长超时时间
del_timer函数将超时的定时器从链表中删除
- 常规双向链表删除结点
//定时器容器类
class sort_timer_lst{
public:
sort_timer_lst():head(NULL),tail(NULL){};
//常规销毁链表
~sort_timer_lst();
//添加定时器,内部调用私有成员add_timer
void add_timer(util_timer *timer);
//调整定时器,任务发生变化时,调整定时器在链表中的位置
void adjust_timer(util_timer *timer);
//删除定时器
void del_timer(util_timer *timer);
void tick();
private:
//私有成员,被公有成员add_timer和adjust_time调用
//主要用于调整链表内部结点
void add_timer(util_timer *timer, util_timer *lst_head);
//头尾结点
util_timer *head;
util_timer *tail;
};
//常规销毁链表
sort_timer_lst::~sort_timer_lst(){
util_timer *tmp = head;
while (tmp){
head = tmp->next;
delete tmp;
tmp = head;
}
}
//添加定时器,内部调用私有成员add_timer
void sort_timer_lst::add_timer(util_timer *timer){ //添加一个节点
if (!timer){ //添加的节点为空
return;
}
if (!head){ //头节点为空,容器内没有节点,让头和尾都指向新加入的这个节点
head = tail = timer;
return;
}
if (timer->expire < head->expire){ //添加的节点截止时间还在头节点的前面,就把它放在头结点位置
timer->next = head;
head->prev = timer;
head = timer;
return;
}
add_timer(timer, head); //函数重写
}
//调整定时器,任务发生变化时,调整定时器在链表中的位置
void sort_timer_lst::adjust_timer(util_timer *timer){
if (!timer){
return;
}
util_timer *tmp = timer->next;
//被调整的定时器在链表尾部
//定时器超时值仍然小于下一个定时器超时值,不调整
if (!tmp || (timer->expire < tmp->expire)){
return;
}
//被调整定时器是链表头结点,将定时器取出,重新插入
if (timer == head){
head = head->next;
head->prev = NULL;
timer->next = NULL;
add_timer(timer, head);
}
else{ //被调整定时器在内部,将定时器取出,重新插入
timer->prev->next = timer->next;
timer->next->prev = timer->prev;
add_timer(timer, timer->next);
}
}
//删除定时器
void sort_timer_lst::del_timer(util_timer *timer){
if (!timer){
return;
}
//链表中只有一个定时器,需要删除该定时器
if ((timer == head) && (timer == tail)){
delete timer;
head = NULL;
tail = NULL;
return;
}
if (timer == head){//被删除的定时器为头结点
head = head->next;
head->prev = NULL;
delete timer;
return;
}
if (timer == tail){//被删除的定时器为尾结点
tail = tail->prev;
tail->next = NULL;
delete timer;
return;
}
//被删除的定时器在链表内部,常规链表结点删除
timer->prev->next = timer->next;
timer->next->prev = timer->prev;
delete timer;
}
//私有成员,被公有成员add_timer和adjust_time调用 //主要用于调整链表内部结点
void sort_timer_lst::add_timer(util_timer *timer, util_timer *lst_head){
util_timer *prev = lst_head;
util_timer *tmp = prev->next;
//遍历当前结点之后的链表,按照超时时间找到目标定时器对应的位置,常规双向链表插入
while (tmp){
if (timer->expire < tmp->expire){
prev->next = timer;
timer->next = tmp;
tmp->prev = timer;
timer->prev = prev;
break;
}
prev = tmp;
tmp = tmp->next;
}
if (!tmp){//遍历完发现,目标定时器需要放到尾结点处
prev->next = timer;
timer->prev = prev;
timer->next = NULL;
tail = timer;
}
}
8.3 定时任务处理函数
使用统一事件源,SIGALRM
信号每次被触发,主循环中调用一次定时任务处理函数,处理链表容器中到期的定时器。
具体的逻辑如下,
- 遍历定时器升序链表容器,从头结点开始依次处理每个定时器,直到遇到尚未到期的定时器
- 若当前时间小于定时器超时时间,跳出循环,即未找到到期的定时器
- 若当前时间大于定时器超时时间,即找到了到期的定时器,执行回调函数,然后将它从链表中删除,然后继续遍历
//定时任务处理函数
void sort_timer_lst::tick(){
if (!head){
return;
}
//获取当前时间
time_t cur = time(NULL);
util_timer *tmp = head;
//遍历定时器链表
while (tmp){
//链表容器为升序排列
//当前时间小于定时器的超时时间,后面的定时器也没有到期
if (cur < tmp->expire){
break;
}
//当前定时器到期,则调用回调函数,执行定时事件
tmp->cb_func(tmp->user_data);
//将处理后的定时器从链表容器中删除,并重置头结点
head = tmp->next;
if (head){
head->prev = NULL;
}
delete tmp;
tmp = head;
}
}
8.4 代码分析-如何使用定时器
服务器首先创建定时器容器链表,然后用统一事件源将异常事件,读写事件和信号事件统一处理,根据不同事件的对应逻辑使用定时器。
具体的,
- 浏览器与服务器连接时,创建该连接对应的定时器,并将该定时器添加到链表上
- 处理异常事件时,执行定时事件,服务器关闭连接,从链表上移除对应定时器
- 处理定时信号时,将定时标志设置为true
- 处理读事件时,若某连接上发生读事件,将对应定时器向后移动,否则,执行定时事件
- 处理写事件时,若服务器通过某连接给浏览器发送数据,将对应定时器向后移动,否则,执行定时事件
//定时处理任务,重新定时以不断触发SIGALRM信号
void Utils::timer_handler(){
m_timer_lst.tick();
alarm(m_TIMESLOT);
}
sort_timer_lst m_timer_lst; //创建定时器容器链表 Utils类中
users_timer = new client_data[MAX_FD];//定时器 //创建连接资源数组 WebServer::WebServer()中构造函数
bool timeout = false;//超时标志 WebServer::eventLoop()实现函数中
alarm(TIMESLOT); //每隔TIMESLOT时间触发SIGALRM信号 WebServer::eventListen()实现函数中
Utils::u_pipefd = m_pipefd;//工具类,信号和描述符基础操作
Utils::u_epollfd = m_epollfd;
//服务器接收http请求,以及处理的流程函数
void WebServer::eventLoop(){
bool timeout = false;//超时标志
bool stop_server = false;//循环条件
while (!stop_server){
int number = epoll_wait(m_epollfd, events, MAX_EVENT_NUMBER, -1); //等待所监控文件描述符上有事件的产生
if (number < 0 && errno != EINTR){
LOG_ERROR("%s", "epoll failure");
break;
}
for (int i = 0; i < number; i++){//轮询文件描述符
int sockfd = events[i].data.fd;
//处理新到的客户连接
if (sockfd == m_listenfd){
bool flag = dealclinetdata();
if (false == flag)
continue;
}
else if (events[i].events & (EPOLLRDHUP | EPOLLHUP | EPOLLERR)){
//服务器端关闭连接,移除对应的定时器
util_timer *timer = users_timer[sockfd].timer;
deal_timer(timer, sockfd);
}
//处理信号 //管道读端对应文件描述符发生读事件
else if ((sockfd == m_pipefd[0]) && (events[i].events & EPOLLIN)){
bool flag = dealwithsignal(timeout, stop_server); //接收到SIGALRM信号,timeout设置为True
if (false == flag)
LOG_ERROR("%s", "dealclientdata failure");
}
//处理客户连接上接收到的数据
else if (events[i].events & EPOLLIN){
dealwithread(sockfd);
}
else if (events[i].events & EPOLLOUT){
dealwithwrite(sockfd);
}
}
//处理定时器为非必须事件,收到信号并不是立马处理 //完成读写事件后,再进行处理
if (timeout){
utils.timer_handler();
LOG_INFO("%s", "timer tick");
timeout = false;
}
}
}
void WebServer::timer(int connfd, struct sockaddr_in client_address){
users[connfd].init(connfd, client_address, m_root, m_CONNTrigmode, m_close_log, m_user, m_passWord, m_databaseName);
//初始化client_data数据
//创建定时器,设置回调函数和超时时间,绑定用户数据,将定时器添加到链表中
users_timer[connfd].address = client_address;//初始化该连接对应的连接资源
users_timer[connfd].sockfd = connfd;
util_timer *timer = new util_timer; //创建定时器临时变量
timer->user_data = &users_timer[connfd];//设置定时器对应的连接资源
timer->cb_func = cb_func; //设置回调函数
time_t cur = time(NULL); //记录当前时间
timer->expire = cur + 3 * TIMESLOT; //设置绝对超时时间
users_timer[connfd].timer = timer; //创建该连接对应的定时器,初始化为前述临时变量
utils.m_timer_lst.add_timer(timer); //将该定时器添加到链表中
}
//若有数据传输,则将定时器往后延迟3个单位
//并对新的定时器在链表上的位置进行调整
void WebServer::adjust_timer(util_timer *timer){
time_t cur = time(NULL);
timer->expire = cur + 3 * TIMESLOT;
utils.m_timer_lst.adjust_timer(timer);
LOG_INFO("%s", "adjust timer once");
}
//删除定时器释放资源
void WebServer::deal_timer(util_timer *timer, int sockfd){
timer->cb_func(&users_timer[sockfd]);
if (timer){
utils.m_timer_lst.del_timer(timer);
}
LOG_INFO("close fd %d", users_timer[sockfd].sockfd);
}
//处理链接请求
bool WebServer::dealclinetdata(){
//初始化客户端连接地址
struct sockaddr_in client_address;
socklen_t client_addrlength = sizeof(client_address);
if (0 == m_LISTENTrigmode){
//该连接分配的文件描述符
int connfd = accept(m_listenfd, (struct sockaddr *)&client_address, &client_addrlength);
if (connfd < 0){
LOG_ERROR("%s:errno is:%d", "accept error", errno);
return false;
}
if (http_conn::m_user_count >= MAX_FD){
utils.show_error(connfd, "Internal server busy");
LOG_ERROR("%s", "Internal server busy");
return false;
}
timer(connfd, client_address);
}
else{
while (1){
int connfd = accept(m_listenfd, (struct sockaddr *)&client_address, &client_addrlength);
if (connfd < 0){
LOG_ERROR("%s:errno is:%d", "accept error", errno);
break;
}
if (http_conn::m_user_count >= MAX_FD){
utils.show_error(connfd, "Internal server busy");
LOG_ERROR("%s", "Internal server busy");
break;
}
timer(connfd, client_address);
}
return false;
}
return true;
}
bool WebServer::dealwithsignal(bool &timeout, bool &stop_server){
int ret = 0;
int sig;
char signals[1024];
//从管道读端读出信号值,成功返回字节数,失败返回-1
//正常情况下,这里的ret返回值总是1,只有14和15两个ASCII码对应的字符
ret = recv(m_pipefd[0], signals, sizeof(signals), 0);
if (ret == -1 || ret == 0){//处理异常
return false;
}
else{
for (int i = 0; i < ret; ++i){
switch (signals[i]){ //这里面是字符
case SIGALRM: //这里是整型(14号信号)
{
timeout = true;
break;
}
case SIGTERM: //这里是整型(15号信号)
{
stop_server = true;
break;
}
}
}
}
return true;
}
//处理客户连接上接收到的数据
void WebServer::dealwithread(int sockfd){
//创建定时器临时变量,将该连接对应的定时器取出来
util_timer *timer = users_timer[sockfd].timer;
//reactor
if (1 == m_actormodel){
if (timer){//若有数据传输,则将定时器往后延迟3个单位 //对其在链表上的位置进行调整
adjust_timer(timer); //调整定时器
}
//若监测到读事件,将该事件放入请求队列
m_pool->append(users + sockfd, 0);
while (true){
if (1 == users[sockfd].improv){ //是否处理该非活动链接
if (1 == users[sockfd].timer_flag){
deal_timer(timer, sockfd);
users[sockfd].timer_flag = 0;
}
users[sockfd].improv = 0;
break;
}
}
}
else{ //proactor
if (users[sockfd].read_once()){
LOG_INFO("deal with the client(%s)", inet_ntoa(users[sockfd].get_address()->sin_addr));
//若监测到读事件,将该事件放入请求队列
m_pool->append_p(users + sockfd);
if (timer){
adjust_timer(timer);
}
}
else{//服务器端关闭连接,移除对应的定时器
deal_timer(timer, sockfd); //处理定时器
}
}
}
void WebServer::dealwithwrite(int sockfd){
util_timer *timer = users_timer[sockfd].timer;
//reactor
if (1 == m_actormodel){
if (timer){
adjust_timer(timer);
}
m_pool->append(users + sockfd, 1);
while (true) {
if (1 == users[sockfd].improv){
if (1 == users[sockfd].timer_flag){
deal_timer(timer, sockfd);
users[sockfd].timer_flag = 0;
}
users[sockfd].improv = 0;
break;
}
}
}
else {
//proactor
if (users[sockfd].write()){
LOG_INFO("send data to the client(%s)", inet_ntoa(users[sockfd].get_address()->sin_addr));
if (timer) {
adjust_timer(timer);
}
}
else {
deal_timer(timer, sockfd);
}
}
}
09 日志系统(上)
9.1 基础知识
- **
日志
**,由服务器自动创建,并记录运行状态,错误信息,访问数据的文件。 - **
同步日志
**,日志写入函数与工作线程串行执行,由于涉及到I/O操作,当单条日志比较大的时候,同步模式会阻塞整个处理流程,服务器所能处理的并发能力将有所下降,尤其是在峰值的时候,写日志可能成为系统的瓶颈。 - **
生产者-消费者模型
**,并发编程中的经典模型。以多线程为例,为了实现线程间数据同步,生产者线程与消费者线程共享一个缓冲区,其中生产者线程往缓冲区中push消息,消费者线程从缓冲区中pop消息。 - **
阻塞队列
**,将生产者-消费者模型进行封装,使用循环数组实现队列,作为两者共享的缓冲区。 - **
异步日志
**,将所写的日志内容先存入阻塞队列,写线程从阻塞队列中取出内容,写入日志。 - **
单例模式
**,最简单也是被问到最多的设计模式之一,保证一个类只创建一个实例,同时提供全局访问的方法。
9.2 整体概述
本项目中,使用单例模式创建日志系统,对服务器运行状态、错误信息和访问数据进行记录,该系统可以实现按天分类,超行分类功能,可以根据实际情况分别使用同步和异步写入两种方式。
其中异步写入方式,将生产者-消费者模型封装为阻塞队列,创建一个写线程,工作线程将要写的内容push进队列,写线程从队列中取出内容,写入日志文件。
日志系统大致可以分成两部分,其一是单例模式与阻塞队列的定义,其二是日志类的定义与使用。
9.3 本文内容
本篇将介绍单例模式与阻塞队列的定义,具体的涉及到单例模式、生产者-消费者模型,阻塞队列的代码实现。
- 单例模式,描述懒汉与饿汉两种单例模式,并结合线程安全进行讨论。
- 生产者-消费者模型,描述条件变量,基于该同步机制实现简单的生产者-消费者模型。
- 代码实现,结合代码对阻塞队列的设计进行详解。
9.4 单例模式
单例模式作为最常用的设计模式之一,保证一个类仅有一个实例,并提供一个访问它的全局访问点,该实例被所有程序模块共享。
实现思路:私有化它的构造函数,以防止外界创建单例类的对象;使用类的私有静态指针变量指向类的唯一实例,并用一个公有的静态方法获取该实例。
单例模式有两种实现方法,分别是懒汉和饿汉模式。顾名思义,懒汉模式,即非常懒,不用的时候不去初始化,所以在第一次被使用时才进行初始化;饿汉模式,即迫不及待,在程序运行时立即初始化。
9.4.1 经典的线程安全懒汉模式
单例模式的实现思路如前述所示,其中,经典的线程安全懒汉模式,使用双检测锁模式。
class single{
private:
//私有静态指针变量指向唯一实例
static single *p;
//静态锁,是由于静态函数只能访问静态成员
static pthread_mutex_t lock;
//私有化构造函数
single(){
pthread_mutex_init(&lock,NULL);
}
~single(){}
public:
//公有静态方法获取实例
static single*getinstance();
};
pthread_mutex_t single:: lock;
single* single::p = NULL;
single* single::getinstance(){
if(NULL == p){
pthread_mutex_lock( &lock);
if (NULL == p){
p = new single;
}
pthread_mutex_unlock( &lock);
}
return p;
}
为什么要用双检测,只检测一次不行吗?
如果只检测一次,在每次调用获取实例的方法时,都需要加锁,这将严重影响程序性能。双层检测可以有效避免这种情况,仅在第一次创建单例的时候加锁,其他时候都不再符合NULL == p
的情况,直接返回已创建好的实例。
9.4.2 局部静态变量之线程安全懒汉模式
前面的双检测锁模式,写起来不太优雅,《Effective C++》(Item 04)中的提出另一种更优雅的单例模式实现,使用函数内的局部静态对象,这种方法不用加锁和解锁操作。
class single{
private:
single(){}
~single(){}
public:
static single* getinstance();
};
single* single::getinstance(){
static single obj;
return &obj;
}
这时候有人说了,这种方法不加锁会不会造成线程安全问题?
其实,C++0X以后,要求编译器保证内部静态变量的线程安全性,故C++0x之后该实现是线程安全的,C++0x之前仍需加锁,其中C++0x是C++11标准成为正式标准之前的草案临时名字。
所以,如果使用C++11之前的标准,还是需要加锁,这里同样给出加锁的版本。
class single{
private:
static pthread_mutex_t lock;
single(){
pthread_mutex_init (&lock,NULL);
}
~single()[}
public:
static single* getinstance();
};
pthread_mutex_t single:: lock;
single* single::getinstance(){
pthread_mutex_lock( &lock);
static single obj;
pthread_mutex_unlock(&lock);
return &obj;
}
9.4.3 饿汉模式
饿汉模式不需要用锁,就可以实现线程安全。原因在于,在程序运行时就定义了对象,并对其初始化。之后,不管哪个线程调用成员函数getinstance(),都只不过是返回一个对象的指针而已。所以是线程安全的,不需要在获取实例的成员函数中加锁。
class single{
private:
static single* p;
single(){}
~single(){}
public:
static single* getinstance();
};
//加载时实例化
single* single::p = new single();
single* single::getinstance(){
return p;
}
//测试方法
int main(){
single *p1 = single::getinstance();
single *p2 = single::getinstance();
if (p1 == p2)
cout << "same" << endl;
system("pause");
return 0;
}
饿汉模式虽好,但其存在隐藏的问题,在于非静态对象(函数外的static对象)在不同编译单元中的初始化顺序是未定义的。如果在初始化完成之前调用 getInstance() 方法会返回一个未定义的实例。
9.5 条件变量与生产者-消费者模型
9.5.1 条件变量API与陷阱
条件变量提供了一种线程间的通知机制,当某个共享数据达到某个值时,唤醒等待这个共享数据的线程。
基础API
pthread_cond_init
函数,用于初始化条件变量pthread_cond_destory
函数,销毁条件变量pthread_cond_broadcast
函数,以广播的方式唤醒所有等待目标条件变量的线程pthread_cond_wait
函数,用于等待目标条件变量。该函数调用时需要传入 mutex参数(加锁的互斥锁) ,函数执行时,先把调用线程放入条件变量的请求队列,然后将互斥锁mutex解锁,当函数成功返回为0时,表示重新抢到了互斥锁,互斥锁会再次被锁上, 也就是说函数内部会有一次解锁和加锁操作.
使用pthread_cond_wait方式如下:
pthread _mutex_lock( &mutex)
while(线程执行的条件是否成立){
pthread_cond_wait( &cond,&mutex);
}
pthread_mutex_unlock(&mutex);
pthread_cond_wait
执行后的内部操作分为以下几步:
- 将线程放在条件变量的请求队列后,内部解锁
- 线程等待被
pthread_cond_broadcast
信号唤醒或者pthread_cond_signal
信号唤醒,唤醒后去竞争锁 - 若竞争到互斥锁,内部再次加锁
陷阱一
使用前要加锁,为什么要加锁?
多线程访问,为了避免资源竞争,所以要加锁,使得每个线程互斥的访问公有资源。
pthread_cond_wait内部为什么要解锁?
如果while或者if判断的时候,满足执行条件,线程便会调用pthread_cond_wait
阻塞自己,此时它还在持有锁,如果他不解锁,那么其他线程将会无法访问公有资源。
具体到pthread_cond_wait
的内部实现,当pthread_cond_wait
被调用线程阻塞的时候,pthread_cond_wait会自动释放互斥锁。
为什么要把调用线程放入条件变量的请求队列后再解锁?
线程是并发执行的,如果在把调用线程A放在等待队列之前,就释放了互斥锁,这就意味着其他线程比如线程B可以获得互斥锁去访问公有资源,这时候线程A所等待的条件改变了,但是它没有被放在等待队列上,导致A忽略了等待条件被满足的信号。
倘若在线程A调用pthread_cond_wait开始,到把A放在等待队列的过程中,都持有互斥锁,其他线程无法得到互斥锁,就不能改变公有资源。
为什么最后还要加锁?
将线程放在条件变量的请求队列后,将其解锁,此时等待被唤醒,若成功竞争到互斥锁,再次加锁。
陷阱二
为什么判断线程执行的条件用while而不是if?
一般来说,在多线程资源竞争的时候,在一个使用资源的线程里面(消费者)判断资源是否可用,不可用,便调用pthread_cond_wait,在另一个线程里面(生产者)如果判断资源可用的话,则调用pthread_cond_signal发送一个资源可用信号。
在wait成功之后,资源就一定可以被使用么?答案是否定的,如果同时有两个或者两个以上的线程正在等待此资源,wait返回后,资源可能已经被使用了。
再具体点,有可能多个线程都在等待这个资源可用的信号,信号发出后只有一个资源可用,但是有A,B两个线程都在等待,B比较速度快,获得互斥锁,然后加锁,消耗资源,然后解锁,之后A获得互斥锁,但A回去发现资源已经被使用了,它便有两个选择,一个是去访问不存在的资源,另一个就是继续等待,那么继续等待下去的条件就是使用while,要不然使用if的话pthread_cond_wait
返回后,就会顺序执行下去。
所以,在这种情况下,应该使用while而不是if:
while(resource == FALSE)
pthread_cond_wait(&cond, &mutex);
如果只有一个消费者,那么使用if是可以的。
9.5.2 生产者-消费者模型
这里摘抄《Unix 环境高级编程》中第11章线程关于pthread_cond_wait的介绍中有一个生产者-消费者的例子P311,其中,process_msg相当于消费者,enqueue_msg相当于生产者,struct msg* workq作为缓冲队列。
生产者和消费者是互斥关系,两者对缓冲区访问互斥,同时生产者和消费者又是一个相互协作与同步的关系,只有生产者生产之后,消费者才能消费。
#include <pthread.h>
struct msg {
struct msg *m_next;
/*value. . .*/
};
struct msg* workq;
pthread_cond_t qready = PTHREAD_COND_INITIALIZER;
pthread_mutex_t qlock = PTHREAD_MUTEX_INITIALIZER;
void process_msg() {
struct msg* mp;
for (;;){
pthread_mutex_lock(&qlock);//这里需要用while,而不是if
while (workq == NULL){
pthread_cond_wait(&qread,&qlock);
}
mq= workq;
workq = mp->m_next;
pthread_mutex_unlock(&qlock);
/*now process the message mp */
}
void enqueue_msg(struct msg* mp) {
pthread_mutex_lock( &qlock);
mp->m_next = workq;
workq = mp;
pthread_mutex_unlock(&qlock);
/**此时另外一个线程在signal之前,执行了process_msg,刚好把mp元素拿走*/
pthread_ cond_signal( &qready);
/**此时执行signal,在pthread_cond_wait等待的线程被唤醒,
但是mp元素已经被另外一个线程拿走,所以,workq还是NULL ,因此需要继续等待*/
}
9.6 阻塞队列代码分析
阻塞队列类中封装了生产者-消费者模型,其中push成员是生产者,pop成员是消费者。
阻塞队列中,使用了循环数组实现了队列,作为两者共享缓冲区,当然了,队列也可以使用STL中的queue。
9.6.1 自定义队列
当队列为空时,从队列中获取元素的线程将会被挂起;当队列是满时,往队列里添加元素的线程将会挂起。
阻塞队列类中,有些代码比较简单,这里仅对push和pop成员进行详解。
/*************************************************************
*循环数组实现的阻塞队列,m_back = (m_back + 1) % m_max_size;
*线程安全,每个操作前都要先加互斥锁,操作完后,再解锁
**************************************************************/
#ifndef BLOCK_QUEUE_H
#define BLOCK_QUEUE_H
#include <iostream>
#include <stdlib.h>
#include <pthread.h>
#include <sys/time.h>
#include "../lock/locker.h"
using namespace std;
template <class T>
class block_queue{
public:
//初始化私有成员
block_queue(int max_size = 1000){
if (max_size <= 0)
exit(-1);
//构造函数创建循环数组
m_max_size = max_size;
m_array = new T[max_size];
m_size = 0;
m_front = -1;
m_back = -1;
}
void clear(){ //清除队列
m_mutex.lock();
m_size = 0;
m_front = -1;
m_back = -1;
m_mutex.unlock();
}
~block_queue(){ //析构
m_mutex.lock();
if (m_array != NULL)
delete [] m_array;
m_mutex.unlock();
}
//判断队列是否满了
bool full() {
m_mutex.lock();
if (m_size >= m_max_size){
m_mutex.unlock();
return true;
}
m_mutex.unlock();
return false;
}
//判断队列是否为空
bool empty() {
m_mutex.lock();
if (0 == m_size){
m_mutex.unlock();
return true;
}
m_mutex.unlock();
return false;
}
//返回队首元素
bool front(T &value) {
m_mutex.lock();
if (0 == m_size){
m_mutex.unlock();
return false;
}
value = m_array[m_front];
m_mutex.unlock();
return true;
}
//返回队尾元素
bool back(T &value) {
m_mutex.lock();
if (0 == m_size){
m_mutex.unlock();
return false;
}
value = m_array[m_back];
m_mutex.unlock();
return true;
}
//返回队列大小
int size() {
int tmp = 0;
m_mutex.lock();
tmp = m_size;
m_mutex.unlock();
return tmp;
}
//返回队列容量
int max_size(){
int tmp = 0;
m_mutex.lock();
tmp = m_max_size;
m_mutex.unlock();
return tmp;
}
//往队列添加元素,需要将所有使用队列的线程先唤醒
//当有元素push进队列,相当于生产者生产了一个元素
//若当前没有线程等待条件变量,则唤醒无意义
bool push(const T &item){
m_mutex.lock();
if (m_size >= m_max_size){
m_cond.broadcast();
m_mutex.unlock();
return false;
}
//将新增数据放在循环数组的对应位置
m_back = (m_back + 1) % m_max_size;
m_array[m_back] = item;
m_size++;
m_cond.broadcast();
m_mutex.unlock();
return true;
}
//pop时,如果当前队列没有元素,将会等待条件变量
bool pop(T &item){
m_mutex.lock();
//多个消费者的时候,这里要是用while而不是if
while (m_size <= 0){
//当重新抢到互斥锁,pthread_cond_wait返回为0
if (!m_cond.wait(m_mutex.get())){
m_mutex.unlock();
return false;
}
}
//取出队列首的元素,这里需要理解一下,使用循环数组模拟的队列
m_front = (m_front + 1) % m_max_size;
item = m_array[m_front];
m_size--;
m_mutex.unlock();
return true;
}
//增加了超时处理,在项目中没有使用到
//在pthread_cond_wait基础上增加了等待的时间,只指定时间内能抢到互斥锁即可
//其他逻辑不变
bool pop(T &item, int ms_timeout){
struct timespec t = {0, 0};
struct timeval now = {0, 0};
gettimeofday(&now, NULL);
m_mutex.lock();
if (m_size <= 0){
t.tv_sec = now.tv_sec + ms_timeout / 1000;
t.tv_nsec = (ms_timeout % 1000) * 1000;
if (!m_cond.timewait(m_mutex.get(), t)){
m_mutex.unlock();
return false;
}
}
if (m_size <= 0){
m_mutex.unlock();
return false;
}
m_front = (m_front + 1) % m_max_size;
item = m_array[m_front];
m_size--;
m_mutex.unlock();
return true;
}
private:
locker m_mutex; //锁
cond m_cond; //条件变量
T *m_array; //用数组实现阻塞队列
int m_size; //阻塞队列目前的大小
int m_max_size; //阻塞队列的容量
int m_front; //阻塞队列的队头
int m_back; //阻塞队列的队尾
};
#endif
10 日志系统(下)
10.1 本文内容
日志系统分为两部分,其一是单例模式与阻塞队列的定义,其二是日志类的定义与使用。
本篇将介绍日志类的定义与使用,具体的涉及到基础API,流程图与日志类定义,功能实现。
基础API,描述fputs,可变参数宏__VA_ARGS__,fflush
流程图与日志类定义,描述日志系统整体运行流程,介绍日志类的具体定义
功能实现,结合代码分析同步、异步写文件逻辑,分析超行、按天分文件和日志分级的具体实现
10.2 基础API
为了更好的源码阅读体验,这里对一些API用法进行介绍。
fputs
#include <stdio.h>
int fputs(const char *str, FILE *stream);
- str,一个数组,包含了要写入的以空字符终止的字符序列。
- stream,指向FILE对象的指针,该FILE对象标识了要被写入字符串的流。
可变参数宏__VA_ARGS__
__VA_ARGS__是一个可变参数的宏,定义时宏定义中参数列表的最后一个参数为省略号,在实际使用时会发现有时会加##,有时又不加。
//最简单的定义
#define my_print1(...) printf(__VA_ARGS__)
//搭配va_list的format使用
#define my_print2(format, ...) printf(format, __VA_ARGS__)
#define my_print3(format, ...) printf(format, ##__VA_ARGS__)
__VA_ARGS__宏前面加上##的作用在于,当可变参数的个数为0时,这里printf参数列表中的的##会把前面多余的”,”去掉,否则会编译出错,建议使用后面这种,使得程序更加健壮。
fflush
#include <stdio.h>
int fflush(FILE *stream);
fflush()会强迫将缓冲区内的数据写回参数stream 指定的文件中,如果参数stream 为NULL,fflush()会将所有打开的文件数据更新。
在使用多个输出函数连续进行多次输出到控制台时,有可能下一个数据再上一个数据还没输出完毕,还在输出缓冲区中时,下一个printf就把另一个数据加入输出缓冲区,结果冲掉了原来的数据,出现输出错误。
在prinf()后加上fflush(stdout); 强制马上输出到控制台,可以避免出现上述错误。
10.3 流程图与日志类定义
流程图
日志文件
- 局部变量的懒汉模式获取实例
- 生成日志文件,并判断同步和异步写入方式
- 局部变量的懒汉模式获取实例
同步
- 判断是否分文件
- 直接格式化输出内容,将信息写入日志文件
- 判断是否分文件
异步
- 判断是否分文件
- 格式化输出内容,将内容写入阻塞队列,创建一个写线程,从阻塞队列取出内容写入日志文件
- 判断是否分文件
日志类定义
通过局部变量的懒汉单例模式创建日志实例,对其进行初始化生成日志文件后,格式化输出内容,并根据不同的写入方式,完成对应逻辑,写入日志文件。
日志类包括但不限于如下方法,
- 公有的实例获取方法
- 初始化日志文件方法
- 异步日志写入方法,内部调用私有异步方法
- 内容格式化方法
- 刷新缓冲区
- …
#ifndef LOG_H
#define LOG_H
#include <stdio.h>
#include <iostream>
#include <string>
#include <stdarg.h>
#include <pthread.h>
#include "block_queue.h"
using namespace std;
class Log{
public:
//C++11以后,使用局部变量懒汉不用加锁
static Log *get_instance(){
static Log instance;
return &instance;
}
//异步写日志公有方法,调用私有方法async_write_log
static void *flush_log_thread(void *args){
Log::get_instance()->async_write_log();
}
//可选择的参数有日志文件、日志缓冲区大小、最大行数以及最长日志条队列
bool init(const char *file_name, int close_log, int log_buf_size = 8192, int split_lines = 5000000, int max_queue_size = 0);
//将输出内容按照标准格式整理
void write_log(int level, const char *format, ...);
//强制刷新缓冲区
void flush(void);
private:
Log();
virtual ~Log();
//异步写日志方法
void *async_write_log(){
string single_log;
//从阻塞队列中取出一个日志string,写入文件
while (m_log_queue->pop(single_log)){
m_mutex.lock();
fputs(single_log.c_str(), m_fp);
m_mutex.unlock();
}
}
private:
char dir_name[128]; //路径名
char log_name[128]; //log文件名
int m_split_lines; //日志最大行数
int m_log_buf_size; //日志缓冲区大小
long long m_count; //日志行数记录
int m_today; //因为按天分类,记录当前时间是那一天
FILE *m_fp; //打开log的文件指针
char *m_buf; //要输出的内容
block_queue<string> *m_log_queue; //阻塞队列
bool m_is_async; //是否异步标志位
locker m_mutex;
int m_close_log; //关闭日志
};
//这四个宏定义在其他文件中使用,主要用于不同类型的日志输出
#define LOG_DEBUG(format, ...) if(0 == m_close_log) {Log::get_instance()->write_log(0, format, ##__VA_ARGS__); Log::get_instance()->flush();}
#define LOG_INFO(format, ...) if(0 == m_close_log) {Log::get_instance()->write_log(1, format, ##__VA_ARGS__); Log::get_instance()->flush();}
#define LOG_WARN(format, ...) if(0 == m_close_log) {Log::get_instance()->write_log(2, format, ##__VA_ARGS__); Log::get_instance()->flush();}
#define LOG_ERROR(format, ...) if(0 == m_close_log) {Log::get_instance()->write_log(3, format, ##__VA_ARGS__); Log::get_instance()->flush();}
#endif
日志类中的方法都不会被其他程序直接调用,末尾的四个可变参数宏提供了其他程序的调用方法。
前述方法对日志等级进行分类,包括DEBUG,INFO,WARN和ERROR四种级别的日志。
10.4 功能实现
init函数实现日志创建、写入方式的判断。
write_log函数完成写入日志文件中的具体内容,主要实现日志分级、分文件、格式化输出内容。
生成日志文件 && 判断写入方式
通过单例模式获取唯一的日志类,调用init方法,初始化生成日志文件,服务器启动按当前时刻创建日志,前缀为时间,后缀为自定义log文件名,并记录创建日志的时间day和行数count。
写入方式通过初始化时是否设置队列大小(表示在队列中可以放几条数据)来判断,若队列大小为0,则为同步,否则为异步。
//异步需要设置阻塞队列的长度,同步不需要设置
bool Log::init(const char *file_name, int close_log, int log_buf_size, int split_lines, int max_queue_size){
//如果设置了max_queue_size,则设置为异步
if (max_queue_size >= 1){
m_is_async = true;//设置写入方式flag
m_log_queue = new block_queue<string>(max_queue_size);//创建并设置阻塞队列长度
pthread_t tid;
//flush_log_thread为回调函数,这里表示创建线程异步写日志
pthread_create(&tid, NULL, flush_log_thread, NULL);
}
//输出内容的长度
m_close_log = close_log;
m_log_buf_size = log_buf_size;
m_buf = new char[m_log_buf_size];
memset(m_buf, '\0', m_log_buf_size);
//日志的最大行数
m_split_lines = split_lines;
time_t t = time(NULL);
struct tm *sys_tm = localtime(&t);
struct tm my_tm = *sys_tm;
//从后往前找到第一个/的位置
const char *p = strrchr(file_name, '/');
char log_full_name[256] = {0};
//相当于自定义日志名
//若输入的文件名没有/,则直接将时间+文件名作为日志名
if (p == NULL){
snprintf(log_full_name, 255, "%d_%02d_%02d_%s", my_tm.tm_year + 1900, my_tm.tm_mon + 1, my_tm.tm_mday, file_name);
}
else{
//将/的位置向后移动一个位置,然后复制到logname中
//p - file_name + 1是文件所在路径文件夹的长度
//dirname相当于./
strcpy(log_name, p + 1);
strncpy(dir_name, file_name, p - file_name + 1);
//后面的参数跟format有关
snprintf(log_full_name, 255, "%s%d_%02d_%02d_%s", dir_name, my_tm.tm_year + 1900, my_tm.tm_mon + 1, my_tm.tm_mday, log_name);
}
m_today = my_tm.tm_mday;
m_fp = fopen(log_full_name, "a");
if (m_fp == NULL){
return false;
}
return true;
}
日志分级与分文件
日志分级的实现大同小异,一般的会提供五种级别,具体的,
- Debug,调试代码时的输出,在系统实际运行时,一般不使用。
- Warn,这种警告与调试时终端的warning类似,同样是调试代码时使用。
- Info,报告系统当前的状态,当前执行的流程或接收的信息等。
- Error和Fatal,输出系统的错误信息。
上述的使用方法仅仅是个人理解,在开发中具体如何选择等级因人而异。项目中给出了除Fatal外的四种分级,实际使用了Debug,Info和Error三种。
超行、按天分文件逻辑,具体的,
日志写入前会判断当前day是否为创建日志的时间,行数是否超过最大行限制
- 若为创建日志时间,写入日志,否则按当前时间创建新log,更新创建时间和行数
- 若行数超过最大行限制,在当前日志的末尾加count/max_lines为后缀创建新log
- 若为创建日志时间,写入日志,否则按当前时间创建新log,更新创建时间和行数
将系统信息格式化后输出,具体为:格式化时间 + 格式化内容
//将系统信息格式化后输出,具体为:格式化时间 + 格式化内容
void Log::write_log(int level, const char *format, ...){
struct timeval now = {0, 0};
gettimeofday(&now, NULL);
time_t t = now.tv_sec;
struct tm *sys_tm = localtime(&t);
struct tm my_tm = *sys_tm;
char s[16] = {0};
//日志分级
switch (level)
{
case 0:
strcpy(s, "[debug]:");
break;
case 1:
strcpy(s, "[info]:");
break;
case 2:
strcpy(s, "[warn]:");
break;
case 3:
strcpy(s, "[erro]:");
break;
default:
strcpy(s, "[info]:");
break;
}
//写入一个log,对m_count++, m_split_lines最大行数
m_mutex.lock();
m_count++;
//日志不是今天或写入的日志行数是最大行的倍数
//m_split_lines为最大行数
if (m_today != my_tm.tm_mday || m_count % m_split_lines == 0){ //everyday log
char new_log[256] = {0};
fflush(m_fp);
fclose(m_fp);
char tail[16] = {0};
//格式化日志名中的时间部分
snprintf(tail, 16, "%d_%02d_%02d_", my_tm.tm_year + 1900, my_tm.tm_mon + 1, my_tm.tm_mday);
//如果是时间不是今天,则创建今天的日志,更新m_today和m_count
if (m_today != my_tm.tm_mday){
snprintf(new_log, 255, "%s%s%s", dir_name, tail, log_name);
m_today = my_tm.tm_mday;
m_count = 0;
}
else{
//超过了最大行,在之前的日志名基础上加后缀, m_count/m_split_lines
snprintf(new_log, 255, "%s%s%s.%lld", dir_name, tail, log_name, m_count / m_split_lines);
}
m_fp = fopen(new_log, "a");
}
m_mutex.unlock();
va_list valst;
va_start(valst, format);
string log_str;
m_mutex.lock();
//写入的具体时间内容格式 //时间格式化,snprintf成功返回写字符的总数,其中不包括结尾的null字符
int n = snprintf(m_buf, 48, "%d-%02d-%02d %02d:%02d:%02d.%06ld %s ",
my_tm.tm_year + 1900, my_tm.tm_mon + 1, my_tm.tm_mday,
my_tm.tm_hour, my_tm.tm_min, my_tm.tm_sec, now.tv_usec, s);
//内容格式化,用于向字符串中打印数据、数据格式用户自定义,返回写入到字符数组str中的字符个数(不包含终止符)
int m = vsnprintf(m_buf + n, m_log_buf_size - 1, format, valst);
m_buf[n + m] = '\n';
m_buf[n + m + 1] = '\0';
log_str = m_buf;
m_mutex.unlock();
//若m_is_async为true表示异步,默认为同步
//若异步,则将日志信息加入阻塞队列,同步则加锁向文件中写
if (m_is_async && !m_log_queue->full()){
m_log_queue->push(log_str);
}
else{
m_mutex.lock();
fputs(log_str.c_str(), m_fp);
m_mutex.unlock();
}
va_end(valst);
}
void Log::flush(void){
m_mutex.lock();
//强制刷新写入流缓冲区
fflush(m_fp);
m_mutex.unlock();
}
11 数据库连接池
11.1 基础知识
什么是数据库连接池?
池是一组资源的集合,这组资源在服务器启动之初就被完全创建好并初始化。通俗来说,池是资源的容器,本质上是对资源的复用。
顾名思义,连接池中的资源为一组数据库连接,由程序动态地对池中的连接进行使用,释放。
当系统开始处理客户请求的时候,如果它需要相关的资源,可以直接从池中获取,无需动态分配;当服务器处理完一个客户连接后,可以把相关的资源放回池中,无需执行系统调用释放资源。
数据库访问的一般流程是什么?
当系统需要访问数据库时,先系统创建数据库连接,完成数据库操作,然后系统断开数据库连接。
为什么要创建连接池?
从一般流程中可以看出,若系统需要频繁访问数据库,则需要频繁创建和断开数据库连接,而创建数据库连接是一个很耗时的操作,也容易对数据库造成安全隐患。
在程序初始化的时候,集中创建多个数据库连接,并把他们集中管理,供程序使用,可以保证较快的数据库读写速度,更加安全可靠。
11.2 整体概述
池可以看做资源的容器,所以多种实现方法,比如数组、链表、队列等。这里,使用单例模式和链表创建数据库连接池,实现对数据库连接资源的复用。
项目中的数据库模块分为两部分,其一是数据库连接池的定义,其二是利用连接池完成登录和注册的校验功能。具体的,工作线程从数据库连接池取得一个连接,访问数据库中的数据,访问完毕后将连接交还连接池。
11.3 本文内容
本篇将介绍数据库连接池的定义,具体的涉及到单例模式创建、连接池代码实现、RAII机制释放数据库连接。
单例模式创建,结合代码描述连接池的单例实现。
连接池代码实现,结合代码对连接池的外部访问接口进行详解。
RAII机制释放数据库连接,描述连接释放的封装逻辑。
11.4 单例模式创建
使用局部静态变量懒汉模式创建连接池。
class connection_pool{
public:
//局部静态变量单例模式
static connection_pool *GetInstance();
private:
connection_pool();
~connection_pool();
}
connection _pool *connection_pool: :GetInstance(){
static connection_pool connPool;
return &connPool;
}
11.5 连接池代码实现
连接池的定义中注释比较详细,这里仅对其实现进行解析。
连接池的功能主要有:初始化,获取连接、释放连接,销毁连接池。
初始化
值得注意的是,销毁连接池没有直接被外部调用,而是通过RAII机制来完成自动释放;使用信号量实现多线程争夺连接的同步机制,这里将信号量初始化为数据库的连接总数。
connection_pool::connection_pool(){
m_CurConn = 0;
m_FreeConn = 0;
}
//RAII机制销毁连接池
connectionRAII::~connectionRAII(){
poolRAII->ReleaseConnection(conRAII);
}
//构造初始化
void connection_pool::init(string url, string User, string PassWord, string DBName, int Port, int MaxConn, int close_log){
//初始化数据库信息
m_url = url;
m_Port = Port;
m_User = User;
m_PassWord = PassWord;
m_DatabaseName = DBName;
m_close_log = close_log;
//创建MaxConn条数据库连接
for (int i = 0; i < MaxConn; i++){
MYSQL *con = NULL;
con = mysql_init(con);
if (con == NULL){
LOG_ERROR("MySQL Error");
exit(1);
}
con = mysql_real_connect(con, url.c_str(), User.c_str(), PassWord.c_str(), DBName.c_str(), Port, NULL, 0);
if (con == NULL){
LOG_ERROR("MySQL Error");
exit(1);
}
//更新连接池和空闲连接数量
connList.push_back(con);
++m_FreeConn;
}
//将信号量初始化为最大连接次数
reserve = sem(m_FreeConn);
m_MaxConn = m_FreeConn;
}
获取、释放连接
当线程数量大于数据库连接数量时,使用信号量进行同步,每次取出连接,信号量原子减1,释放连接原子加1,若连接池内没有连接了,则阻塞等待。
另外,由于多线程操作连接池,会造成竞争,这里使用互斥锁完成同步,具体的同步机制均使用lock.h中封装好的类。
//当有请求时,从数据库连接池中返回一个可用连接,更新使用和空闲连接数
MYSQL *connection_pool::GetConnection(){
MYSQL *con = NULL;
if (0 == connList.size())
return NULL;
//取出连接,信号量原子减1,为0则等待
reserve.wait();
lock.lock();
con = connList.front();
connList.pop_front();
//这里的两个变量,并没有用到,非常鸡肋...
--m_FreeConn;
++m_CurConn;
lock.unlock();
return con;
}
//释放当前使用的连接 //把该链接重新放到队列中 //释放连接原子加1
bool connection_pool::ReleaseConnection(MYSQL *con){
if (NULL == con)
return false;
lock.lock();
connList.push_back(con);
++m_FreeConn;
--m_CurConn;
lock.unlock();
reserve.post(); //释放连接原子加1
return true;
}
销毁连接池
通过迭代器遍历连接池链表,关闭对应数据库连接,清空链表并重置空闲连接和现有连接数量。
//销毁数据库连接池
void connection_pool::DestroyPool(){
lock.lock();
if (connList.size() > 0){
//通过迭代器遍历,关闭数据库连接
for (list<MYSQL *>::iterator it = connList.begin(); it != connList.end(); ++it){
MYSQL *con = *it;
mysql_close(con); //mysql的API,关闭这个链接
}
m_CurConn = 0;
m_FreeConn = 0;
connList.clear(); //清空连接池的队列
}
lock.unlock();
}
11.6 RAII机制释放数据库连接
将数据库连接的获取与释放通过RAII机制封装,避免手动释放。
定义
这里需要注意的是,在获取连接时,通过有参构造对传入的参数进行修改。其中数据库连接本身是指针类型,所以参数需要通过双指针才能对其进行修改。
class connectionRAII{
public:
//双指针对MYSQL *con修改
connectionRAII(MYSQL **con, connection_pool *connPool);
~connectionRAII();
private:
MYSQL *conRAII; //MySQL连接的指针
connection_pool *poolRAII; //连接池的指针
};
实现
不直接调用获取和释放连接的接口,将其封装起来,通过RAII机制进行获取和释放。
//双指针对MYSQL *con修改
connectionRAII::connectionRAII(MYSQL **SQL, connection_pool *connPool){
*SQL = connPool->GetConnection();
conRAII = *SQL;
poolRAII = connPool;
}
//RAII机制释放当前使用的连接
connectionRAII::~connectionRAII(){
poolRAII->ReleaseConnection(conRAII);//释放当前使用的连接 //把该链接重新放到队列中 //释放连接原子加1
}
12 注册登录
整体概述
本项目中,使用数据库连接池实现服务器访问数据库的功能,使用POST请求完成注册和登录的校验工作。
本文内容
本篇将介绍同步实现注册登录功能,具体的涉及到流程图,载入数据库表,提取用户名和密码,注册登录流程与页面跳转的的代码实现。
流程图,描述服务器从报文中提取出用户名密码,并完成注册和登录校验后,实现页面跳转的逻辑。
载入数据库表,结合代码将数据库中的数据载入到服务器中。
提取用户名和密码,结合代码对报文进行解析,提取用户名和密码。
注册登录流程,结合代码对描述服务器进行注册和登录校验的流程。
页面跳转,结合代码对页面跳转机制进行详解。
流程图
具体的,描述了GET和POST请求下的页面跳转流程。
载入数据库表
将数据库中的用户名和密码载入到服务器的map中来,map中的key为用户名,value为密码。
map<string, string> m_users;
//执行SELECT username,passwd FROM user,把结果放在map<string, string> m_users中
void http_conn::initmysql_result(connection_pool *connPool){
//先从连接池中取一个连接
MYSQL *mysql = NULL;
connectionRAII mysqlcon(&mysql, connPool);
//在user表中检索username,passwd数据,浏览器端输入
if (mysql_query(mysql, "SELECT username,passwd FROM user")){
LOG_ERROR("SELECT error:%s\n", mysql_error(mysql));
}
//从表中检索完整的结果集
MYSQL_RES *result = mysql_store_result(mysql);
//返回结果集中的列数
int num_fields = mysql_num_fields(result);
//返回所有字段结构的数组
MYSQL_FIELD *fields = mysql_fetch_fields(result);
//从结果集中获取下一行,将对应的用户名和密码,存入map中
while (MYSQL_ROW row = mysql_fetch_row(result)){
string temp1(row[0]);
string temp2(row[1]);
users[temp1] = temp2;
}
}
提取用户名和密码
服务器端解析浏览器的请求报文,当解析为POST请求时,cgi标志位设置为1,并将请求报文的消息体赋值给m_string,进而提取出用户名和密码。
//判断http请求是否被完整读入
http_conn::HTTP_CODE http_conn::parse_content(char *text){
if (m_read_idx >= (m_content_length + m_checked_idx)){
text[m_content_length] = '\0';
//POST请求中最后为输入的用户名和密码
m_string = text;
return GET_REQUEST;
}
return NO_REQUEST;
}
//根据标志判断是登录检测还是注册检测
char flag = m_url[1];
char *m_url_real = (char *)malloc(sizeof(char) * 200);
strcpy(m_url_real, "/");
strcat(m_url_real, m_url + 2);
strncpy(m_real_file + len, m_url_real, FILENAME_LEN - len - 1);
free(m_url_real);
//将用户名和密码提取出来
//user=123&password=123
char name[100], password[100];
int i;
//以&为分隔符,前面的为用户名
for (i = 5; m_string[i] != '&'; ++i)
name[i - 5] = m_string[i];
name[i - 5] = '\0';
//以&为分隔符,后面的是密码
int j = 0;
for (i = i + 10; m_string[i] != '\0'; ++i, ++j)
password[j] = m_string[i];
password[j] = '\0';
同步线程登录注册
通过m_url定位/所在位置,根据/后的第一个字符判断是登录还是注册校验。
2
- 登录校验
3
- 注册校验
根据校验结果,跳转对应页面。另外,对数据库进行操作时,需要通过锁来同步。
//处理cgi
if (*(p + 1) == '3'){
//如果是注册,先检测数据库中是否有重名的
//没有重名的,进行增加数据
char *sql_insert = (char *)malloc(sizeof(char) * 200);
strcpy(sql_insert, "INSERT INTO user(username, passwd) VALUES(");
strcat(sql_insert, "'");
strcat(sql_insert, name);
strcat(sql_insert, "', '");
strcat(sql_insert, password);
strcat(sql_insert, "')");
//判断map中能否找到重复的用户名
if (users.find(name) == users.end()){
//向数据库中插入数据时,需要通过锁来同步数据
m_lock.lock();
int res = mysql_query(mysql, sql_insert);
users.insert(pair<string, string>(name, password)); //在程序中存储用户名和密码
m_lock.unlock();
//校验成功,跳转登录页面
if (!res)
strcpy(m_url, "/log.html");
else //校验失败,跳转注册失败页面
strcpy(m_url, "/registerError.html");
}
else
strcpy(m_url, "/registerError.html");
}
//如果是登录,直接判断
//若浏览器端输入的用户名和密码在表中可以查找到,返回1,否则返回0
else if (*(p + 1) == '2'){
if (users.find(name) != users.end() && users[name] == password)
strcpy(m_url, "/welcome.html");
else
strcpy(m_url, "/logError.html");
}
页面跳转
通过m_url定位/所在位置,根据/后的第一个字符,使用分支语句实现页面跳转。具体的,
0
- 跳转注册页面,GET
1
- 跳转登录页面,GET
5
- 显示图片页面,POST
6
- 显示视频页面,POST
7
- 显示关注页面,POST
if (*(p + 1) == '0'){//如果请求资源为/0,表示跳转注册界面
char *m_url_real = (char *)malloc(sizeof(char) * 200);
strcpy(m_url_real, "/register.html");
strncpy(m_real_file + len, m_url_real, strlen(m_url_real));
free(m_url_real);
}
else if (*(p + 1) == '1'){//如果请求资源为/1,表示跳转登录界面
char *m_url_real = (char *)malloc(sizeof(char) * 200);
strcpy(m_url_real, "/log.html");
strncpy(m_real_file + len, m_url_real, strlen(m_url_real));
free(m_url_real);
}
else if (*(p + 1) == '5'){//如果请求资源为/5,表示访问图片界面
char *m_url_real = (char *)malloc(sizeof(char) * 200);
strcpy(m_url_real, "/picture.html");
strncpy(m_real_file + len, m_url_real, strlen(m_url_real));
free(m_url_real);
}
else if (*(p + 1) == '6'){
char *m_url_real = (char *)malloc(sizeof(char) * 200);
strcpy(m_url_real, "/video.html");
strncpy(m_real_file + len, m_url_real, strlen(m_url_real));
free(m_url_real);
}
else if (*(p + 1) == '7'){
char *m_url_real = (char *)malloc(sizeof(char) * 200);
strcpy(m_url_real, "/fans.html");
strncpy(m_real_file + len, m_url_real, strlen(m_url_real));
free(m_url_real);
}
else
//如果以上均不符合,即不是登录和注册,直接将url与网站目录拼接
//这里的情况是welcome界面,请求服务器上的一个图片
strncpy(m_real_file + len, m_url, FILENAME_LEN - len - 1);
13 踩坑和面试题
本文内容
本篇是项目的最终篇,将介绍踩坑与面试题两部分。
踩坑,描述做项目过程中遇到的问题与解决方案。
面试题,介绍项目相关的知识点变种和真实面试题,这里不会给出答案,具体的,可以在项目微信群中讨论。
踩坑
做项目过程中,肯定会遇到形形色色、大大小小的问题,但并不是所有问题都值得列出来探讨,这里仅列出个人认为有意义的问题。
具体的,包括大文件传输。
大文件传输
先看下之前的大文件传输,也就是游双书上的代码,发送数据只调用了writev函数,并对其返回值是否异常做了处理。
bool http_conn::write() {
int temp=0;
int bytes_have_send=0;
int bytes_to_send=m_write_idx;
if(bytes_to_send==0) {
modfd(m_epollfd,m_sockfd,EPOLLIN);
init();
return true;
}
while(1) {
temp=writev(m_sockfd,m_iv,m_iv_count);
if(temp<=-1){
if(errno==EAGAIN){
modfd(m_epollfd,m_sockfd,EPOLLOUT);
return true;
}
unmap();
return false;
}
bytes_to_send-=temp;
bytes_have_send+=temp;
if(bytes_to_send<=bytes_have_send){
unmap();
if(m_linger){
init();
modfd(m_epollfd,m_sockfd,EPOLLIN);
return true;
}
else {
modfd(m_epollfd,m_sockfd,EPOLLIN);
return false;
}
}
}
}
在实际测试中发现,当请求小文件,也就是调用一次writev函数就可以将数据全部发送出去的时候,不会报错,此时不会再次进入while循环。
一旦请求服务器文件较大文件时,需要多次调用writev函数,便会出现问题,不是文件显示不全,就是无法显示。
对数据传输过程分析后,定位到writev的m_iv结构体成员有问题,每次传输后不会自动偏移文件指针和传输长度,还会按照原有指针和原有长度发送数据。
根据前面的基础API分析,我们知道writev以顺序iov[0],iov[1]至iov[iovcnt-1]从缓冲区中聚集输出数据。项目中,申请了2个iov,其中iov[0]为存储报文状态行的缓冲区,iov[1]指向资源文件指针。
对上述代码做了修改如下:
- 由于报文消息报头较小,第一次传输后,需要更新m_iv[1].iov_base和iov_len,m_iv[0].iov_len置成0,只传输文件,不用传输响应消息头
- 每次传输后都要更新下次传输的文件起始位置和长度
更新后,大文件传输得到了解决。
bool http_conn::write(){
int temp = 0;
int newadd = 0;
if (bytes_to_send == 0){
modfd(m_epollfd, m_sockfd, EPOLLIN, m_TRIGMode);
init();
return true;
}
while (1){
temp = writev(m_sockfd, m_iv, m_iv_count);
if (temp >= 0){
bytes_have_send += temp;
newadd = bytes_have_send - m_write_idx;
}
else{
if (errno == EAGAIN) {
if (bytes_have_send >= m_iv[0].iov_len){
m_iv[0].iov_len = 0;
m_iv[1].iov_base = m_file_address + newadd;
m_iv[1].iov_len = bytes_to_send;
}
else {
m_iv[0].iov_base = m_write_buf + bytes_have_send;
m_iv[0].iov_len = m_iv[0].iov_len - bytes_have_send;
}
modfd(m_epollfd, m_sockfd, EPOLLOUT, m_TRIGMode);
return true;
}
unmap();
return false;
}
bytes_to_send -= temp;
if (bytes_to_send <= 0){
unmap();
modfd(m_epollfd, m_sockfd, EPOLLIN, m_TRIGMode);
if (m_linger){
init();
return true;
}
else {
return false;
}
}
}
}
面试题
包括项目介绍,线程池相关,并发模型相关,HTTP报文解析相关,定时器相关,日志相关,压测相关,综合能力等。
项目介绍
- 为什么要做这样一个项目?
- 介绍下你的项目
线程池相关
- 手写线程池
- 线程的同步机制有哪些?
- 线程池中的工作线程是一直等待吗?
- 你的线程池工作线程处理完一个任务后的状态是什么?
- 如果同时1000个客户端进行访问请求,线程数不多,怎么能及时响应处理每一个呢?
- 如果一个客户请求需要占用线程很久的时间,会不会影响接下来的客户请求呢,有什么好的策略呢?
并发模型相关
- 简单说一下服务器使用的并发模型?
- reactor、proactor、主从reactor模型的区别?
- 你用了epoll,说一下为什么用epoll,还有其他复用方式吗?区别是什么?
HTTP报文解析相关
- 用了状态机啊,为什么要用状态机?
- 状态机的转移图画一下
- https协议为什么安全?
- https的ssl连接过程
- GET和POST的区别
数据库登录注册相关
- 登录说一下?
- 你这个保存状态了吗?如果要保存,你会怎么做?(cookie和session)
- 登录中的用户名和密码你是load到本地,然后使用map匹配的,如果有10亿数据,即使load到本地后hash,也是很耗时的,你要怎么优化?
- 用的mysql啊,redis了解吗?用过吗?
定时器相关
- 为什么要用定时器?
- 说一下定时器的工作原理
- 双向链表啊,删除和添加的时间复杂度说一下?还可以优化吗?
- 最小堆优化?说一下时间复杂度和工作原理
日志相关
- 说下你的日志系统的运行机制?
- 为什么要异步?和同步的区别是什么?
- 现在你要监控一台服务器的状态,输出监控日志,请问如何将该日志分发到不同的机器上?(消息队列)
压测相关
- 服务器并发量测试过吗?怎么测试的?
- webbench是什么?介绍一下原理
- 测试的时候有没有遇到问题?
综合能力
- 你的项目解决了哪些其他同类项目没有解决的问题?
- 说一下前端发送请求后,服务器处理的过程,中间涉及哪些协议?