代码地址: http://121.4.70.4:3000/adminPyf/libevent_study.git
libevent_study/test_bufffer_filter_zlib
主要功能为 客户端向服务端传递一个文本文件
Linux windows 都测试通过的,注意下文件路径就好了
test_bufffer_filter_zlib.cpp:
#include <iostream>
#include <event2/event.h>
#include <event2/thread.h>
#include <event2/listener.h>
#include <event2/bufferevent.h>
#include <event2/buffer.h>
#include <string.h>
#ifdef _WIN32
#else
#include <signal.h>
#endif // !_WIN32
using namespace std;
int main()
{
#ifdef _WIN32
//初始化socket库
WSADATA wsa;
WSAStartup(MAKEWORD(2, 2), &wsa);
#else
if (signal(SIGPIPE, SIG_IGN) == SIG_ERR)
{ //忽略管道信号,发送数据给已关闭的socket,会飞掉!
return 1;
}
#endif
//初始化libevent上下文
event_config *conf = event_config_new();
//windows中支持IOCP(线程池) 要查看该项是否开启成功,可以在运行程序后打开任务管理器查看线程数量
#ifdef _WIN32
event_config_set_flag(conf, EVENT_BASE_FLAG_STARTUP_IOCP);
evthread_use_windows_threads();
//设置cpu数量
SYSTEM_INFO si;
GetSystemInfo(&si);
event_config_set_num_cpus_hint(conf, si.dwNumberOfProcessors);
#endif
//初始化配置上下文
event_base *base = event_base_new_with_config(conf);
if (!base)
{
cout << "event_base_new_with_config failed!" << endl;
base = event_base_new();
if (!base)
{
cerr << "event_base_new failed" << endl;
return 0;
}
}
else
{
void Server(event_base* base);
Server(base);
void Client(event_base* base);
Client(base);
event_base_dispatch(base);
event_base_free(base);
event_config_free(conf);
}
return 0;
}
zlib_server.cpp:
#include <iostream>
#include <event2/event.h>
#include <event2/thread.h>
#include <event2/listener.h>
#include <event2/bufferevent.h>
#include <event2/buffer.h>
#ifdef _WIN32
#include <string>
#else
#include <string.h>
#include <signal.h>
#endif // !_WIN32
using namespace std;
#define SPORT 5001
struct Status {
bool start = false;
FILE *fp = 0;
string filename = "";
};
bufferevent_filter_result server_filter_in(struct evbuffer *src, struct evbuffer *dst, ev_ssize_t dst_limit,
enum bufferevent_flush_mode mode, void *ctx)
{
cout << "server_filter_in" << endl;
//1.接收客户端发送的文件名
char data[1024] = { 0 };
int len = evbuffer_remove(src, data, sizeof(data) - 1);
evbuffer_add(dst, data, len);
return BEV_OK;
}
void server_read_cb(bufferevent *bev, void *arg) {
Status *status = (Status*)arg;
if (!status->start) {
char data[1024] = { 0 };
bufferevent_read(bev, data, sizeof(data) - 1);
status->filename = data;
string out = "out\\";
out += data;
status->fp = fopen(out.c_str(), "wb");
if (!status->fp) {
cout << "open file " << out << "faild!" << endl;
return;
}
bufferevent_write(bev, "ok", 2);
status->start = true;
return;
}
//写入文件
do {
char data[1024] = { 0 };
int len = bufferevent_read(bev, data, sizeof(data) - 1);
if (len >= 0) {
fwrite(data, 1, len, status->fp);
fflush(status->fp);
}
} while (evbuffer_get_length(bufferevent_get_input(bev)) > 0);
}
void server_event_cb(bufferevent *bev, short events, void *arg) {
cout << "server_event_cb" << events << endl;
Status *status = (Status *)arg;
if (events&BEV_EVENT_EOF || events & BEV_EVENT_TIMEOUT) {
if (status->fp) {
fclose(status->fp);
status->fp = 0;
}
bufferevent_free(bev);
}
}
void server_listen_cb(struct evconnlistener *e, evutil_socket_t s, struct sockaddr *a, int socklen, void *arg)
{
std::cout << "server_listen_cb" << std::endl;
event_base *base = (event_base *)arg;
// 1.创建bufferevent 用于通信
bufferevent *bev = bufferevent_socket_new(base, s, BEV_OPT_CLOSE_ON_FREE);
// 2.添加过滤 并设置输入回调
Status *status = new Status;
bufferevent *bev_filter = bufferevent_filter_new(bev,
server_filter_in,//输入过滤函数
0, //输出过滤
BEV_OPT_CLOSE_ON_FREE, //关闭filter同时关闭bufferevent
0, //清理回调
status); //传递参数
// 3.设置回调 读取 事件(处理连接断开)
bufferevent_setcb(bev_filter, server_read_cb, 0, server_event_cb, status);
bufferevent_enable(bev_filter, EV_READ | EV_WRITE);
}
void Server(event_base* base) {
cout << "begin Server" << endl;
sockaddr_in sin;
memset(&sin, 0, sizeof(sin));
sin.sin_family = AF_INET;
sin.sin_port = htons(SPORT);
evconnlistener *ev = evconnlistener_new_bind(base, //libevent上下文
server_listen_cb, //接收到连接的回调函数
base, //回调函数获取的参数(根据业务来)
LEV_OPT_REUSEABLE | LEV_OPT_CLOSE_ON_FREE, //地址重用,evconnlistener关闭同时关闭socket
10, //连接队列大小,对应listen函数参数
(sockaddr *)&sin, //绑定的地址和端口
sizeof(sin));
}
zlib_client.cpp:
#include <iostream>
#include <event2/event.h>
#include <event2/thread.h>
#include <event2/listener.h>
#include <event2/bufferevent.h>
#include <event2/buffer.h>
#include <string.h>
#include <zlib.h>
#ifdef _WIN32
#else
#include <signal.h>
#endif // !_WIN32
using namespace std;
#define FILEPATH "001.txt"
#define SPORT 5001
struct ClientStatus {
FILE *fp = 0;
bool end = false;
bool startSend = false;
z_stream *z_output = 0;
~ClientStatus() {
delete z_output;
fclose(fp);
fp = 0;
z_output = 0;
}
};
bufferevent_filter_result clinet_filter_out(struct evbuffer *src, struct evbuffer *dst, ev_ssize_t dst_limit,
enum bufferevent_flush_mode mode, void *ctx)
{
ClientStatus *clientStatus = (ClientStatus * )ctx;
if (!clientStatus->startSend) {
char data[1024] = { 0 };
int len = evbuffer_remove(src, data, sizeof(data) - 1);
evbuffer_add(dst, data, len);
return BEV_OK;
}
//开始压缩文件
//1.取出buffer中的数据的引用
evbuffer_iovec v_in[1] = {0};
int n = evbuffer_peek(src, -1, 0, v_in, 1);
if (n <= 0) {
//没有数据
return BEV_NEED_MORE;
}
z_stream *p = clientStatus->z_output;
if (!p) {
return BEV_ERROR;
}
//输入数据大小
p->avail_in = v_in[0].iov_len;
//输入数据地址
p->next_in = (Byte*)v_in[0].iov_base;
//申请输出空间大小
evbuffer_iovec v_out[1] = { 0 };
n = evbuffer_reserve_space(dst, 4096, v_out, 1);
if (n < 0)
return BEV_ERROR;
//zlib输出空间大小
p->avail_out = v_out[0].iov_len;
//zlib输出空间地址
p->next_out = (Byte*)v_out[0].iov_base;
//zlib 压缩
int re = deflate(p, Z_SYNC_FLUSH);
if (re != Z_OK) {
cerr << "deflate falid" << endl;
}
//压缩用了多少数据
//p->avail_in 未处理数据大小
int nread = v_in[0].iov_len - p->avail_in;
//压缩后数据大小
//v_out[0].iov_len 剩余空间大小
int nwrite = v_out[0].iov_len - p->avail_out;
//移除 source evbuffer中数据
evbuffer_drain(src, nread);
//传入des evbuffer
v_out[0].iov_len = nwrite;
evbuffer_commit_space(dst, v_out, 1);
cout << "nread = " << nread << " nwirte=" << nwrite << endl;
}
void clinet_read_cb(bufferevent *bev, void *arg) {
ClientStatus *clientStatus = (ClientStatus *)arg;
//002 接收服务端发送的OK消息
char data[1024] = { 0 };
int len = bufferevent_read(bev, data, sizeof(data) - 1);
if (strcmp(data, "ok") == 0) {
cout << data << endl;
clientStatus->startSend = true;
//开始发送文件
bufferevent_trigger(bev, EV_WRITE, 0);
}
else {
bufferevent_free(bev);
}
cout << "clinet_read_cb" << endl;
}
void clinet_write_cb(bufferevent *bev, void *arg) {
ClientStatus *clientStatus = (ClientStatus *)arg;
FILE *fp = clientStatus->fp;
//判断什么时候清理资源
if (clientStatus->end) {
//判断缓冲是否有数据 如果有就刷新
//获取过滤器绑定的buffer
bufferevent * be = bufferevent_get_underlying(bev);
evbuffer *evb = bufferevent_get_output(be);
int len = evbuffer_get_length(evb);
if (len <= 0) {
bufferevent_free(bev);
delete clientStatus;
return;
}
bufferevent_flush(bev, EV_WRITE, BEV_FINISHED);
return;
}
if (!fp) {
cout << "open file " << FILEPATH << "faild!" << endl;
return;
}
char data[1024] = { 0 };
int len = fread(data, 1, sizeof(data), fp);
if (len <= 0) {
clientStatus->end = true;
bufferevent_flush(bev, EV_WRITE, BEV_FINISHED);
return;
}
bufferevent_write(bev, data, len);
}
void clinet_event_cb(bufferevent *bev, short events, void *arg) {
cout << "clinet_event_cb" << endl;
}
void client_event_cb(struct bufferevent *bev, short what, void *ctx) {
cout << "client_event_cb" << what << endl;
if (what & BEV_EVENT_CONNECTED) {
cout << "BEV_EVENT_CONNECTED" << endl;
//001 发送文件名
bufferevent_write(bev, FILEPATH,strlen(FILEPATH));
FILE *fp = fopen(FILEPATH, "rb");
if (!fp) {
cout << "open file " << FILEPATH << "faild!" << endl;
}
ClientStatus *clientStatus = new ClientStatus;
clientStatus->fp = fp;
//初始化zlib上下文
clientStatus->z_output = new z_stream();
deflateInit(clientStatus->z_output, Z_DEFAULT_COMPRESSION);
//创建输出过滤
bufferevent *bev_filter = bufferevent_filter_new(bev,
0,//输入过滤函数
clinet_filter_out, //输出过滤
BEV_OPT_CLOSE_ON_FREE | BEV_OPT_DEFER_CALLBACKS, //关闭filter同时关闭bufferevent 延时调用
0, //清理回调
clientStatus);
bufferevent_setcb(bev_filter, clinet_read_cb, clinet_write_cb, clinet_event_cb, clientStatus);
bufferevent_enable(bev_filter, EV_READ | EV_WRITE);
}
}
void Client(event_base* base) {
cout << "begin Client" << endl;
//连接服务器
sockaddr_in sin;
memset(&sin, 0, sizeof(sin));
sin.sin_family = AF_INET;
sin.sin_port = htons(SPORT);
evutil_inet_pton(AF_INET,"127.0.0.1",&sin.sin_addr.s_addr);
bufferevent* bev = bufferevent_socket_new(base, -1, BEV_OPT_CLOSE_ON_FREE);
//只绑定连接事件回调,用于确认连接成功
bufferevent_enable(bev, EV_READ | EV_WRITE);
bufferevent_setcb(bev, 0, 0, client_event_cb, 0);
bufferevent_socket_connect(bev, (sockaddr*)&sin, sizeof(sin));
}
Libevent8:过滤器+zlib实现压缩文件