博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
一个模拟的负载均衡系统的实现
阅读量:3940 次
发布时间:2019-05-24

本文共 15060 字,大约阅读时间需要 50 分钟。

文章目录

一、标题

一个模拟的负载均衡系统的实现

二、题目

为了构建可伸缩的,高可用的网络服务,很多大型网站都采用了负载均衡技术。

利用负载均衡技术,可以将多台廉价的、低性能的服务器,组合成一台性能强劲的,高可用的虚拟服务器。

负载均衡的常见实现方式大致如下:

将网络服务的地址(如公网IP地址、tcp套接字等)部署在负载均衡器上,而不是真实的服务器上;

将负载均衡器作为网络服务的总入口,接收用户的所有访问请求;

负载均衡器接收到用户的访问请求后,将访问请求按照一定的策略分发给某一台真实的服务器进行处理;

真实服务器,对访问请求进行处理后,将处理结果发送给负载均衡器;

负载均衡器接收到真实服务器的处理结果,将他发送给用户。

下图展示了一个负载均衡系统的组网结构,图中包含了1台负载均衡器,3台真实的服务器。公网IP配置在负载均衡器上,负载均衡器与真实服务器之间,则通过私网地址进行通讯。

在这里插入图片描述

图1 负载均衡组网结构

有关负载均衡的技术原理及更多详细信息,请上网查阅相关资料或阅读相关书籍。

本题的任务,是在PC机上实现一个模拟的负载均衡系统。

他包含如下3个可执行程序:

服务端 (server.exe)――辅助程序,通过UDP端口,提供时间查询服务。

负载均衡器(LB.exe) ――核心程序,用于实现负载均衡功能。

客户端 (client.exe)――辅助程序,通过UDP端口,访问时间查询服务。

3个程序的协作关系如下图所示,其中客户端与服务端程序,需要起多个进程。

在这里插入图片描述

图 2 系统协作
上图中,每一个方框表示一个进程。每一个进程拥有一个唯一的id(注意,这是由用户配置的id,并非操作系统为进程分配的pid),进程之间一律通过UDP协议进行通信。系统运行起来之后,客户端通过UDP协议向负载均衡器发送“时间请求”消息,负载均衡器通过UDP协议将消息分发给某个服务端进行处理。服务端返回“时间应答”消息给负载均衡器,负载均衡器将“时间应答”消息返回给客户端。

为了简化实现,本模拟系统中,所有消息,都采用如下结构体进行封装。通过msg_type字段的值,来区分不同类型的消息。

typedef struct{
/* 消息的发送进程是谁,就填谁的id */ unsigned src_id;/* 消息的接收进程是谁,就填谁的id */ unsigned dst_id;/* 发送“时间请求”消息时填写,回复“时间应答”消息时,其值要与请求消息保持一致。 */ unsigned usr_id;/* 消息类型:0, 时间请求;1, 时间答应;2, 心跳请求;3, 心跳应答 */ unsigned msg_type;/* 服务端回复“时间应答”消息时,在data中填入当前时间的字符串,形式如“2013-06-20 13:56:28”即可 */ char data[32];} t_msg;

三、功能需求

a)服务端程序

服务端程序,需要起多个进程,每个进程拥有一个唯一的id,绑定到一个唯一的UDP端口上。每个服务端进程通过自己绑定的UDP端口接收“时间请求”消息,如果消息中的dst_id等于自己的id,就向对端发送“时间应答”消息。否则,就丢弃此消息。

每个服务端进程的id、udp端口号,可以通过命令行参数传入,可以通过配置文件配置,也可以在进程运行时指定。三种方式,只要支持任意一种就行了。

服务端程序,需要具备一个调试开关。在运行过程中,可以打开/关闭调试开关。当调试开关打开后,服务端进程需要将自己接收/发送的每一个消息,都实时显示给用户看。

服务端程序,需要具备统计功能。在运行过程中,可以随时可看,每个服务端进程接收了多少条消息(正确的多少条,错误的多少条),应答了多少条消息。

b)负载均衡程序

负载均衡程序,只要启动一个进程即可。此进程拥有一个唯一的id,绑定到两个不同的UDP端口上。一个UDP端口(下文称为client_udp_port)用于收发客户端的消息,一个UDP端口(下文称为server_udp_port)用于收发服务端的消息。

负载均衡进程的id是多少,绑定的两个udp端口号是多少,支持多少个服务端,每个服务端的id、udp端口各是多少,均通过配置文件进行配置的。负载均衡进程启动时读入这些信息,运行过程中,不会改变。

对于客户端有多少个,每个客户端的id是多少,UDP端口号是多少,负载均衡进程对这些信息是一无所知的,也是无法预测的。

负载均衡进程通过client_udp_port接收到客户端的“时间请求”消息后,如果消息中的dst_id不等于自己的id,就丢弃此消息。否则,就按照轮转算法选出一个服务端,将时间请求消息中的dst_id改成此服务端的id后,将消息通过server_udp_port分发给该服务端处理。

负载均衡进程通过server_udp_port接收到客户端的“时间应答”消息后,将消息中的src_id改成自己的id,然后将消息通过client_udp_port发送给消息中的dst_id所指示的客户端。

负载均衡程序,需要具备一个调试开关。在运行过程中,可以打开/关闭调试开关。当调试开关打开后,程序需要将自己接收/发送的每一个消息,都实时显示给用户看。

负载均衡程序,需要具备统计功能。在运行过程中,可以随时可看,本进程从客户端接收了多少条消息(正确的多少条,错误的多少条),向客户端发送了多少条消息,从服务端接收了多少条消息(正确的多少条,错误的多少条),向服务端发送了多少条消息。

负载均衡程序,还需要具备日志功能。在运行过程中,如果出现异常事件(如UDP接收、发送失败等),需要记录日志,供后续分析查看。日志中,尽当尽可能包含详细的信息,如异常事件发生的时间、事件描述、事件原因等。

c)客户端程序

客户端程序,需要起多个进程,每个进程拥有一个id,一个usr_id,绑定到一个默认分配的UDP端口上。每个客户端进程启动后,通过自己绑定的UDP端口向负载均衡器发送n条“时间请求”消息,并接收相应的时间应答消息。时间请求消息中的src_id填写自己的id,usr_id填写自己的usr_id,dst_id填写负载均衡器进程的id。

每个客户端进程的id、usr_id,发送的消息条数n,可以通过命令行参数传入,可以通过配置文件配置,也可以在进程启动时指定。三种方式,只要支持任意一种就行了。

注意,不同客户端进程的id可以相同、但usr_id不可以相同。

客户端进程,在运行过程中,需要将自己接收/发送的每一个消息,都实时显示给用户看。客户端进程完成自己的任务后,显示一下相关的统计信息,即可退出。统计信息包括本进程发送了多少条消息,接收了多少条消息(正确的多少条,错误的多少条)。

四、代码实现

服务器端

#include 
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
using namespace std;#define IPSTR "127.0.0.1"//定义一个通用的数据结构struct msg{ int src_id; //发送者的id int des_id; //接受者的id int usr_id; //消息的类型, int msg_type; //数据部分 char data[32];};class CServer{ public: //创建对象的时候必须给定id 和端口号 CServer(int i,int p):id(i),port(p){ } //将服务器的id 和 port 写入配置文件 void InitCnf(); //初始化套接字,创建套接字并绑定端口 int InitSock(); //保存日志文件 void CreatLog(msg &data); //实时显示用户的请求信息与服务器的发送信息 void display(msg &data); //处理socket端的程序 void dealSock(int sockfd, bool &flag); //接受负载均衡器发来的数据 bool request(int sockfd, msg &data, struct sockaddr_in &caddr); //当接受到正确的信息后,服务器向负载均衡器回复数据包 void response(int sockfd, msg &data, struct sockaddr_in &addr); //调试器开关 通过监控键盘终端的文件描述符,输入数据为debug 时开关打开 void debug(bool &flag); //拼接回复的信息 void creatData(msg &data); //负责总的调用过程 void mainfun();private: int id; //记录服务器的id int port; //记录服务器的端口};void CServer::dealSock(int sockfd, bool &flag){ msg data; struct sockaddr_in addr; if(request(sockfd, data, addr)) { if(flag) { display(data); //接受数据放入日志文件 CreatLog(data); creatData(data); //回复数据加入日志文件 CreatLog(data); display(data); } else { CreatLog(data); creatData(data); //回复数据加入日志文件 CreatLog(data); } response(sockfd, data, addr); }}//在这之前首先初始化配置文件,创建一个对象void CServer::mainfun(){ fd_set fdset; FD_ZERO( &fdset ); int fd = 0; int sockfd = InitSock(); FD_SET(fd, &fdset); FD_SET(sockfd, &fdset); //默认不开启调试器 bool flag = false; while(1) { int n = select(sockfd+1, &fdset, NULL, NULL, NULL); assert(n != -1); if(FD_ISSET(fd, &fdset)) { debug(flag); } if(FD_ISSET(sockfd, &fdset)) { //处理sock的数据 dealSock(sockfd, flag); } FD_ZERO( &fdset ); FD_SET(fd, &fdset); FD_SET(sockfd, &fdset); }}//通过终端控制调试器void CServer::debug(bool &flag){ char buff[128] = { 0}; int n = read(0, buff, 127); if(strncmp(buff, "debug", 5) == 0) { //进入调试状态 cout << "===============进入debug模式============" << endl; flag = true; } else if(strncmp(buff, "off", 3) == 0) { //关闭调试状态 cout << "==============退出debug模式============="<< endl; flag = false; }}//拼接回复信息void CServer::creatData(msg &data){ data.src_id = data.des_id ^ data.src_id; data.des_id = data.des_id ^ data.src_id; data.src_id = data.des_id ^ data.src_id; data.msg_type += 1; //设置为回复类型 strcpy(data.data,"ok");}void CServer::InitCnf(){ FILE *fp = fopen("server.cnf","a"); char buff[128] = { 0}; sprintf(buff,"%d,%d \n",id,port); //将服务器的id 和 port 写入配置文件 fwrite(buff, strlen(buff), 1, fp); fclose(fp);}//初始化套接字, 返回一个创建好的套接字int CServer::InitSock(){ int sockfd = socket(AF_INET, SOCK_DGRAM, 0); assert( sockfd != -1 ); struct sockaddr_in addr; memset(&addr, 0, sizeof(addr)); addr.sin_family = AF_INET; addr.sin_port = htons(port); addr.sin_addr.s_addr = inet_addr(IPSTR); //绑定端口 int res = bind(sockfd, (struct sockaddr*) &addr, sizeof(addr)); assert(res != -1); //返回创建的套接字 return sockfd;}//保存到本地的log.log中void CServer::CreatLog(msg &data){ //保存日志 传入一个msg 格式的数据,保存到本地文档 FILE *fp = fopen("log.log","a"); assert( fp != NULL ); char buff[128] = { 0}; sprintf(buff,"发送方id:%d,接受方id:%d,客户端usr_id:%d,请求方式:%d,数据:%s\n", data.src_id, data.des_id, data.usr_id, data.msg_type,data.data); fwrite(buff, strlen(buff), 1, fp); fclose(fp);}void CServer::display(msg &data){ cout << "发送方id:" << data.src_id << "接收方id:" << data.des_id << "数据包类型:" << data.msg_type; cout << "客户端usr_id:" << data.usr_id << "数据:" << data.data << endl; cout << endl;}//接受负载均衡器发来的消息,只接受发给本地id 的数据包//接受加过滤的功能//如果是发送给自己的数据包,则接受,返回true;//否则返回false 丢弃数据包bool CServer::request(int sockfd, msg &data, struct sockaddr_in &caddr){ socklen_t len = sizeof(caddr); //接受数据 recvfrom(sockfd, &data, sizeof(data), 0, (struct sockaddr*) &caddr, &len); if(data.des_id == id) { return true; } return false;}//服务器向客户端回复数据void CServer::response(int sockfd, msg &data, struct sockaddr_in &addr){ sendto(sockfd, &data, sizeof(data), 0, (struct sockaddr*) &addr, sizeof(addr));}int main(int argc, char *argv[]){ //接受输入的id 和 port int id,port; if(argc < 3) { cout << "参数错误" << endl; return 1; } id = strtol(argv[1],NULL,10); port = strtol(argv[2],NULL,10); //创建一个服务器对象 CServer ser(id,port); ser.InitCnf(); ser.mainfun(); return 0;}

客户端

#include 
#include
#include
#include
#include
#include
#include
#include
#include
#include
using namespace std;#define PORT 8888#define IPSTR "127.0.0.1"#define ID 1 //负载均衡器的id#define REQUEST 0 //数据包的类型//定义结构体typedef struct{ int src_id; int des_id; int usr_id; int msg_type; char data[32];}msg;class CClient{ public: //构建对象必须指定id 和 usr_id CClient(int i,int u_id):id(i),usr_id(u_id){ } //初始化一个套接字 int initSock(struct sockaddr_in &addr); //发送数据 void sendMsg(int sockfd, struct sockaddr_in &addr, msg &data); //接受数据 bool recvMsg(int sockfd, struct sockaddr_in &addr, msg &data); //显示数据 void display(msg &data); //显示统计结果 void displayRes(); //构造发送的数据 void creatData(msg &data); //主控函数 void mainfun(int n);private: int id; int usr_id; struct Count { Count(int r = 0, int s = 0, int e = 0):recv(r),send(s),error(e){ } int recv; //接受的个数 int send; //发送的数据包的个数 int error; //错误的个数 }; Count cou;};//主控函数void CClient::mainfun(int n){ struct sockaddr_in addr; int sockfd = initSock(addr); fd_set fdset; FD_ZERO( &fdset ); FD_SET(sockfd, &fdset); msg data; for(int i = 0; i < n; i++) { creatData(data); display(data); sendMsg(sockfd, addr, data); //设置2秒的超时时间 struct timeval tv = { 2,0}; int n = select(sockfd+1, &fdset, NULL, NULL, &tv); if( n > 0 ) { recvMsg(sockfd, addr, data); display(data); } FD_ZERO( &fdset ); FD_SET(sockfd, &fdset); }}void CClient::creatData(msg &data){ data.src_id = id; //负载均衡器使用的服务器 data.des_id = ID; data.usr_id = usr_id; data.msg_type = REQUEST; strcpy(data.data,"time");}//返回文件描述符 并设置addrint CClient::initSock(struct sockaddr_in &addr){ int sockfd = socket(AF_INET, SOCK_DGRAM, 0); assert(sockfd != -1); //连接负载均衡服务器 memset(&addr, 0, sizeof(addr)); addr.sin_family = AF_INET; addr.sin_port = htons(PORT); addr.sin_addr.s_addr = inet_addr(IPSTR); return sockfd;}//显示数据void CClient::display(msg &data){ char buff[128] = { 0}; sprintf(buff, "发送方id:%d, 接受方id:%d, 客户端usr_id:%d, 数据包类型: %d, 数据:%s" , data.src_id, data.des_id, data.usr_id, data.msg_type, data.data); cout << buff << endl;}//发送消息void CClient::sendMsg(int sockfd, struct sockaddr_in &addr, msg &data){ sendto(sockfd, &data, sizeof(data), 0, (struct sockaddr*) &addr, sizeof(addr)); cou.send++;}//接收消息出判断是否是自己的数据bool CClient::recvMsg(int sockfd, struct sockaddr_in &addr, msg &data){ socklen_t len = sizeof(addr); recvfrom(sockfd, &data, sizeof(data), 0, (struct sockaddr*) &addr, &len); cou.recv++; if(data.des_id == id) { return true; } return false;}int main(int argc, char *argv[]){ if(argc < 4) { cout << "参数错误" << endl; return -1; } int id = strtol(argv[1],NULL,10); int usr_id = strtol(argv[2], NULL, 10); int n = strtol(argv[3], NULL, 10); CClient cli(id,usr_id); cli.mainfun(n); return 0;}

负载均衡器

#include 
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
using namespace std;#define IPSTR "127.0.0.1"typedef struct{ int src_id; int des_id; int usr_id; int msg_type; char data[32];}msg;class CBlance{ public: //读取配置文件,获取id 和 端口 void readConfig(); //通过配置文件获取服务端的信息 void InitSerInfo(); //处理来自服务器的数据 void dealServerMsg(int sockSer, int sockCli, unordered_multimap
> &cli_table); //处理来自客户端的数据 void dealClientMsg(int sockSer, int sockCli, unordered_multimap
> &cli_table); //服务器端关闭套接字 void delSerAddId(int &id,struct sockaddr_in &addr); //绑定端口,获取sock 套接字 int InitSock(int &port); //将内容写入日志文件 void writeLog(msg &data); //接收消息recvMsg bool recvMsg(int sockfd, struct sockaddr_in &addr, msg &data); //发送消息sendMsg bool sendMsg(int sockfd, struct sockaddr_in &addr, msg &data); //轮循函数,动态分配服务器 void balanceLoop(int &id, struct sockaddr_in &addr); //主调函数 void mainfun();private: int id; int serPort; int cliPort; queue
server_addr; //存放服务器的地址 queue
server_id; //存放服务器的id //bind 两个ip 一个与客户端通信,一个与服务器通信 struct Count { Count(int r = 0, int s = 0, int e = 0): recv(r),send(s),error(e) { } int recv; //接收数据包的个数 int send; //发送数据包的个数 int error; //错误数据包的个数 }; Count c;};//主调函数void CBlance::mainfun(){ readConfig(); InitSerInfo(); int sockSer = InitSock(serPort); int sockCli = InitSock(cliPort); int max = sockSer > sockCli ? sockSer:sockCli; fd_set fdset; FD_ZERO( &fdset ); FD_SET(sockSer, &fdset); FD_SET(sockCli, &fdset); // id usr_id addr unordered_multimap
> cli_table; //客户端的请求表,可重复 while( 1 ) { int n = select(max+1, &fdset, NULL, NULL, NULL); if(n > 0) { if(FD_ISSET(sockSer, &fdset)) { //接收到来自服务器的数据 dealServerMsg(sockSer, sockCli, cli_table); } if(FD_ISSET(sockCli, &fdset)) { //接收到来自客户端的数据 dealClientMsg(sockSer, sockCli, cli_table); } } FD_ZERO(&fdset); FD_SET(sockSer, &fdset); FD_SET(sockCli, &fdset); }}//处理来自服务器的数据void CBlance::dealServerMsg(int sockSer, int sockCli, unordered_multimap
> &cli_table){ struct sockaddr_in addr; msg data; if(recvMsg(sockSer, addr, data)) { //将接收到的消息写入日志文件 writeLog(data); if(data.des_id == id) { auto it = cli_table.find(data.usr_id); data.src_id = id; data.des_id = it->second.first; //目的id if(it != cli_table.end()) { sendMsg(sockCli, it->second.second, data); writeLog(data); //删除一条记录 cli_table.erase(it); } } //else 对错误数据的处理 }}//处理来自客户端的数据void CBlance::dealClientMsg(int sockSer, int sockCli, unordered_multimap
> &cli_table){ struct sockaddr_in addr; msg data; if(recvMsg(sockCli, addr, data)) { writeLog(data); if(data.des_id == id) { //将客户端的消息放入cli_table中 cli_table.insert({ data.usr_id, { data.src_id, addr} }); int ser_id; balanceLoop(ser_id,addr); data.src_id = id; data.des_id = ser_id; //向服务器转发消息 sendMsg(sockSer, addr, data); //将发送的数据写入日志文件 writeLog(data); } //else 如果直接丢弃 }}//服务器端关闭套接字void CBlance::delSerAddId(int &id,struct sockaddr_in &addr){ //删除服务器端关闭的套接字 int size = server_addr.size(); for(int i = 0; i < size; i++) { if(id != server_id.front()) { server_id.push(server_id.front()); server_addr.push(server_addr.front()); server_id.pop(); server_addr.pop(); } else { server_id.pop(); server_addr.pop(); break; } }}//轮循函数,动态分配服务器void CBlance::balanceLoop(int &id, struct sockaddr_in &addr){ addr = server_addr.front(); id = server_id.front(); //将分配的服务器放入队尾 server_addr.pop(); server_id.pop(); server_addr.push(addr); server_id.push(id);}//读入服务器的配置信息void CBlance::InitSerInfo(){ FILE *fp = fopen("server.cnf","r"); struct sockaddr_in addr; addr.sin_family = AF_INET; addr.sin_addr.s_addr = inet_addr(IPSTR); char buff[128] = { 0}; while( fgets(buff, 127, fp) > 0 ) { int id,port; sscanf(buff, "%d,%d", &id, &port); addr.sin_port = htons(port); //将服务端的地址和id 放入队列 server_addr.push(addr); server_id.push(id); } fclose(fp);}//读取配置文件中的id 和 serPort cliPortvoid CBlance::readConfig(){ FILE *fp = fopen("balance.cnf", "r"); assert(fp != NULL); char buff[128] = { 0}; if(fgets(buff, 127 ,fp) != NULL) { sscanf(buff, "%d,%d,%d", &id, &serPort, &cliPort); } fclose(fp);}//返回创建的套接字int CBlance::InitSock(int &port){ int sockfd = socket(AF_INET, SOCK_DGRAM, 0); assert(sockfd != -1); struct sockaddr_in addr; memset(&addr, 0, sizeof(addr)); addr.sin_family = AF_INET; addr.sin_port = htons(port); addr.sin_addr.s_addr = inet_addr(IPSTR); int res = bind(sockfd, (struct sockaddr*) &addr, sizeof(addr)); assert(res != -1); return sockfd;}//将数据写入配置文件中void CBlance::writeLog(msg &data){ FILE *fp = fopen("balance.log", "a"); assert(fp != NULL); char buff[128] = { 0}; sprintf(buff, "发送方id:%d,接收方id:%d,客户端usr_id:%d,数据包类型:%d,数据内容:%s\n" ,data.src_id, data.des_id, data.usr_id, data.msg_type, data.data); fwrite(buff, strlen(buff), 1, fp); fclose(fp);}//接收消息recvMsgbool CBlance::recvMsg(int sockfd, struct sockaddr_in &addr, msg &data){ socklen_t len = sizeof(addr); int n = recvfrom(sockfd, &data, sizeof(data), 0, (struct sockaddr*)&addr, &len); //对面关闭链接 if(n == 0) { return false; } //读取出错 else if(n < 0) { return false; } //正常读取 else { return true; }}//发送消息sendMsgbool CBlance::sendMsg(int sockfd, struct sockaddr_in &addr, msg &data){ int n = sendto(sockfd, &data, sizeof(data), 0, (struct sockaddr*) &addr, sizeof(addr)); //发送失败 if(n == -1) { return false; } //正常发送 else { return true; }}int main(){ CBlance cb; cb.mainfun(); return 0;}

转载地址:http://nxnwi.baihongyu.com/

你可能感兴趣的文章
git 学习笔记
查看>>
C++类中的static的用法
查看>>
vector 释放内存 swap
查看>>
在linux下新增一块硬盘的操作。(包含大于2T的硬盘在linux下挂载操作)
查看>>
在32位系统中使用fseek和lseek或fwrite、write写大文件时,最大只能写2G左右的解决办法
查看>>
整理华为C/C++编码规范
查看>>
C语言中嵌入正则表达式
查看>>
libxml2 指南(中文)
查看>>
虚拟机VMware中实现linux与windows的共享
查看>>
undefined reference问题总结
查看>>
souce insight 3.5 修改背景颜色
查看>>
Linux 关闭/开启图形界面(X-window) 命令
查看>>
debug 打印 开关 设计(for c || C++)
查看>>
vmware中虚拟机和主机ping不通的问题。
查看>>
从“冷却时间”谈产品设计
查看>>
常用shell脚本
查看>>
长网站 转换为 短网址 的原理
查看>>
基于http协议的C语言客户端代码
查看>>
我常用的makefile之产生优秀的.depend文件
查看>>
VMware无法识别USB设备的解决方法 以及 从虚拟机中断开USB设备,使其重新连接到windows主机上
查看>>