Libevent8:过滤器+zlib实现压缩文件

代码地址: http://121.4.70.4:3000/adminPyf/libevent_study.git

libevent_study/test_bufffer_filter_zlib

主要功能为 客户端向服务端传递一个文本文件

Linux windows 都测试通过的,注意下文件路径就好了

test_bufffer_filter_zlib.cpp:
#include <iostream>
#include <event2/event.h>
#include <event2/thread.h>
#include <event2/listener.h>
#include <event2/bufferevent.h>
#include <event2/buffer.h>
#include <string.h>
#ifdef _WIN32

#else
#include <signal.h>
#endif // !_WIN32

using namespace std;


int main()
{
#ifdef _WIN32
    //初始化socket库
    WSADATA wsa;
    WSAStartup(MAKEWORD(2, 2), &wsa);
#else
    if (signal(SIGPIPE, SIG_IGN) == SIG_ERR)
    { //忽略管道信号,发送数据给已关闭的socket,会飞掉!
        return 1;
    }
#endif

    //初始化libevent上下文
    event_config *conf = event_config_new();

    //windows中支持IOCP(线程池) 要查看该项是否开启成功,可以在运行程序后打开任务管理器查看线程数量
#ifdef _WIN32
    event_config_set_flag(conf, EVENT_BASE_FLAG_STARTUP_IOCP);
    evthread_use_windows_threads();
    //设置cpu数量
    SYSTEM_INFO si;
    GetSystemInfo(&si);
    event_config_set_num_cpus_hint(conf, si.dwNumberOfProcessors);
#endif

    //初始化配置上下文
    event_base *base = event_base_new_with_config(conf);

    if (!base)
    {
        cout << "event_base_new_with_config failed!" << endl;
        base = event_base_new();
        if (!base)
        {
            cerr << "event_base_new failed" << endl;
            return 0;
        }
    }
    else
    {
        void Server(event_base* base);
        Server(base);
        void Client(event_base* base);
        Client(base);


        event_base_dispatch(base);
        event_base_free(base);
        event_config_free(conf);
    }
    return 0;
}

zlib_server.cpp:
#include <iostream>
#include <event2/event.h>
#include <event2/thread.h>
#include <event2/listener.h>
#include <event2/bufferevent.h>
#include <event2/buffer.h>

#ifdef _WIN32
#include <string>
#else
#include <string.h>
#include <signal.h>
#endif // !_WIN32

using namespace std;

#define SPORT 5001
struct Status {
    bool start = false;
    FILE *fp = 0;
    string filename = "";
};
bufferevent_filter_result server_filter_in(struct evbuffer *src, struct evbuffer *dst, ev_ssize_t dst_limit,
    enum bufferevent_flush_mode mode, void *ctx)
{
    cout << "server_filter_in" << endl;

    //1.接收客户端发送的文件名
    char data[1024] = { 0 };
    int len = evbuffer_remove(src, data, sizeof(data) - 1);
    evbuffer_add(dst, data, len);
    return BEV_OK;
}
void server_read_cb(bufferevent *bev, void *arg) {

    Status *status = (Status*)arg;
    if (!status->start) {
        char data[1024] = { 0 };
        bufferevent_read(bev, data, sizeof(data) - 1);
        status->filename = data;
        string out = "out\\";
        out += data;
        status->fp = fopen(out.c_str(), "wb");
        if (!status->fp) {
            cout << "open file " << out << "faild!" << endl;
            return;
        }

        bufferevent_write(bev, "ok", 2);
        status->start = true;
        return;
    }

    //写入文件
    do {
        char data[1024] = { 0 };
        int len = bufferevent_read(bev, data, sizeof(data) - 1);
        if (len >= 0) {
            fwrite(data, 1, len, status->fp);
            fflush(status->fp);
        }
    } while (evbuffer_get_length(bufferevent_get_input(bev)) > 0);

}
void server_event_cb(bufferevent *bev, short events, void *arg) {
    cout << "server_event_cb" << events << endl;

    Status *status = (Status *)arg;
    if (events&BEV_EVENT_EOF || events & BEV_EVENT_TIMEOUT) {
        if (status->fp) {
            fclose(status->fp);
            status->fp = 0;
        }
        bufferevent_free(bev);
    }
}


void server_listen_cb(struct evconnlistener *e, evutil_socket_t s, struct sockaddr *a, int socklen, void *arg)
{
    std::cout << "server_listen_cb" << std::endl;
    event_base *base = (event_base *)arg;
    // 1.创建bufferevent 用于通信
    bufferevent *bev = bufferevent_socket_new(base, s, BEV_OPT_CLOSE_ON_FREE);
    // 2.添加过滤 并设置输入回调
    Status *status = new Status;
    bufferevent *bev_filter = bufferevent_filter_new(bev,
        server_filter_in,//输入过滤函数
        0, //输出过滤
        BEV_OPT_CLOSE_ON_FREE, //关闭filter同时关闭bufferevent
        0, //清理回调
        status); //传递参数
    // 3.设置回调  读取 事件(处理连接断开) 
    bufferevent_setcb(bev_filter, server_read_cb, 0, server_event_cb, status);
    bufferevent_enable(bev_filter, EV_READ | EV_WRITE);
}


void Server(event_base* base) {
    cout << "begin Server" << endl;

    sockaddr_in sin;
    memset(&sin, 0, sizeof(sin));
    sin.sin_family = AF_INET;
    sin.sin_port = htons(SPORT);

    evconnlistener *ev = evconnlistener_new_bind(base,                                      //libevent上下文
        server_listen_cb,                                   //接收到连接的回调函数
        base,                                       //回调函数获取的参数(根据业务来)
        LEV_OPT_REUSEABLE | LEV_OPT_CLOSE_ON_FREE, //地址重用,evconnlistener关闭同时关闭socket
        10,                                     //连接队列大小,对应listen函数参数
        (sockaddr *)&sin,                           //绑定的地址和端口
        sizeof(sin));
}

zlib_client.cpp:

#include <iostream>
#include <event2/event.h>
#include <event2/thread.h>
#include <event2/listener.h>
#include <event2/bufferevent.h>
#include <event2/buffer.h>
#include <string.h>
#include <zlib.h>
#ifdef _WIN32

#else
#include <signal.h>
#endif // !_WIN32

using namespace std;

#define FILEPATH "001.txt"
#define SPORT 5001
struct ClientStatus {
    FILE *fp = 0;
    bool end = false;
    bool startSend = false;
    z_stream *z_output = 0;
    ~ClientStatus() {
        delete z_output;
        fclose(fp);
        fp = 0;
        z_output = 0;
    }
};
bufferevent_filter_result clinet_filter_out(struct evbuffer *src, struct evbuffer *dst, ev_ssize_t dst_limit,
    enum bufferevent_flush_mode mode, void *ctx)
{
    ClientStatus *clientStatus = (ClientStatus * )ctx;

    if (!clientStatus->startSend) {
        char data[1024] = { 0 };
        int len = evbuffer_remove(src, data, sizeof(data) - 1);
        evbuffer_add(dst, data, len);
        return BEV_OK;
    }
    //开始压缩文件
    //1.取出buffer中的数据的引用
    evbuffer_iovec v_in[1] = {0};
    int n = evbuffer_peek(src, -1, 0, v_in, 1);
    if (n <= 0) {
        //没有数据
        return BEV_NEED_MORE; 
    }
    z_stream *p = clientStatus->z_output;
    if (!p) {
        return BEV_ERROR;
    }
    //输入数据大小
    p->avail_in = v_in[0].iov_len;
    //输入数据地址
    p->next_in = (Byte*)v_in[0].iov_base;

    //申请输出空间大小
    evbuffer_iovec v_out[1] = { 0 };
     n = evbuffer_reserve_space(dst, 4096, v_out, 1);
    if (n < 0)
        return BEV_ERROR;

    //zlib输出空间大小
    p->avail_out = v_out[0].iov_len;
    //zlib输出空间地址
    p->next_out = (Byte*)v_out[0].iov_base;

    //zlib 压缩
    int re = deflate(p, Z_SYNC_FLUSH);
    if (re != Z_OK) {
        cerr << "deflate falid" << endl;
    }

    //压缩用了多少数据
    //p->avail_in 未处理数据大小
    int nread = v_in[0].iov_len - p->avail_in;
    //压缩后数据大小
    //v_out[0].iov_len  剩余空间大小
    int nwrite = v_out[0].iov_len - p->avail_out;

    //移除 source evbuffer中数据
    evbuffer_drain(src, nread);

    //传入des evbuffer
    v_out[0].iov_len = nwrite;
    evbuffer_commit_space(dst, v_out, 1);
    cout << "nread = " << nread << " nwirte=" << nwrite << endl;

}
void clinet_read_cb(bufferevent *bev, void *arg) {
    ClientStatus *clientStatus = (ClientStatus *)arg;
    //002 接收服务端发送的OK消息
    char data[1024] = { 0 };
    int len = bufferevent_read(bev, data, sizeof(data) - 1);
    if (strcmp(data, "ok") == 0) {
        cout << data << endl;
        clientStatus->startSend = true;
        //开始发送文件
        bufferevent_trigger(bev, EV_WRITE, 0);
    }
    else {
        bufferevent_free(bev);
    }

    cout << "clinet_read_cb" << endl;
}

void clinet_write_cb(bufferevent *bev, void *arg) {
    ClientStatus *clientStatus = (ClientStatus *)arg;
    FILE *fp = clientStatus->fp;
    //判断什么时候清理资源
    if (clientStatus->end) {
        //判断缓冲是否有数据 如果有就刷新
        //获取过滤器绑定的buffer
        bufferevent * be = bufferevent_get_underlying(bev);
        evbuffer *evb = bufferevent_get_output(be);
        int len = evbuffer_get_length(evb);
        if (len <= 0) {
            bufferevent_free(bev);
            delete clientStatus;
            return;
        }

        bufferevent_flush(bev, EV_WRITE, BEV_FINISHED);
        return;
    }
    if (!fp) {
        cout << "open file " << FILEPATH << "faild!" << endl;
        return;
    }
    char data[1024] = { 0 };
    int len = fread(data, 1, sizeof(data), fp);
    if (len <= 0) {
        clientStatus->end = true;
        bufferevent_flush(bev, EV_WRITE, BEV_FINISHED);
        return;
    }
    bufferevent_write(bev, data, len);

}
void clinet_event_cb(bufferevent *bev, short events, void *arg) {
    cout << "clinet_event_cb" << endl;
}

void client_event_cb(struct bufferevent *bev, short what, void *ctx) {
    cout << "client_event_cb" << what << endl;
    if (what & BEV_EVENT_CONNECTED) {
        cout << "BEV_EVENT_CONNECTED" << endl;
        //001 发送文件名
        bufferevent_write(bev, FILEPATH,strlen(FILEPATH));

        FILE *fp = fopen(FILEPATH, "rb");
        if (!fp) {
            cout << "open file " << FILEPATH << "faild!" << endl;
        }
        ClientStatus *clientStatus = new ClientStatus;
        clientStatus->fp = fp;

        //初始化zlib上下文
        clientStatus->z_output = new z_stream();
        deflateInit(clientStatus->z_output, Z_DEFAULT_COMPRESSION);

        //创建输出过滤
        bufferevent *bev_filter = bufferevent_filter_new(bev,
            0,//输入过滤函数
            clinet_filter_out, //输出过滤
            BEV_OPT_CLOSE_ON_FREE | BEV_OPT_DEFER_CALLBACKS, //关闭filter同时关闭bufferevent  延时调用
            0, //清理回调
            clientStatus);


        bufferevent_setcb(bev_filter, clinet_read_cb, clinet_write_cb, clinet_event_cb, clientStatus);
        bufferevent_enable(bev_filter, EV_READ | EV_WRITE);
    }
}
void Client(event_base* base) {
    cout << "begin Client" << endl;
    //连接服务器
    sockaddr_in sin;
    memset(&sin, 0, sizeof(sin));
    sin.sin_family = AF_INET;
    sin.sin_port = htons(SPORT);
    evutil_inet_pton(AF_INET,"127.0.0.1",&sin.sin_addr.s_addr);
    bufferevent* bev = bufferevent_socket_new(base, -1, BEV_OPT_CLOSE_ON_FREE);

    //只绑定连接事件回调,用于确认连接成功
    bufferevent_enable(bev, EV_READ | EV_WRITE);
    bufferevent_setcb(bev, 0, 0, client_event_cb, 0);

    bufferevent_socket_connect(bev, (sockaddr*)&sin, sizeof(sin));


}
Libevent8:过滤器+zlib实现压缩文件

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

滚动到顶部