Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
CC = g++
CC = g++ -std=c++11
FLAG = -g
INCLUDE =
LIBDIR =
LIB =
LIB = -lpthread
BIN =
TARGET = websocketserver
SRCS = base64.cpp sha1.cpp network_interface.cpp debug_log.cpp websocket_handler.cpp \
websocket_request.cpp main.cpp
websocket_request.cpp main.cpp broadcast_queue.cpp websocket_respond.cpp

$(TARGET):$(SRCS:.cpp=.o)
$(CC) $(FLAG) $(LIBDIR) $(LIB) -o $@ $^
Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# websocket
C++解析websocket协议
28 changes: 28 additions & 0 deletions broadcast_queue.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
//
// Created by wangyaofu on 2018/6/3.
//

#include "broadcast_queue.h"

BroadcastMsgQueue::BroadcastMsgQueue() {

}

BroadcastMsgQueue::~BroadcastMsgQueue() {

}

void BroadcastMsgQueue::addMsg(BroadcastMsg& msg) {
std::lock_guard<std::mutex> lk(msg_mutex_);
msg_queue_.push(msg);
}

int BroadcastMsgQueue::getMsg(BroadcastMsg& broadcastMsg) {
std::lock_guard<std::mutex> lk(msg_mutex_);
if (msg_queue_.size() <= 0) {
return 0;
}
broadcastMsg = msg_queue_.front();
msg_queue_.pop();
return 1;
}
34 changes: 34 additions & 0 deletions broadcast_queue.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
//
// Created by wangyaofu on 2018/6/3.
//

#ifndef WEBSOCKET_MASTER_BROADCAST_QUEUE_H
#define WEBSOCKET_MASTER_BROADCAST_QUEUE_H

#include "singleton.h"
#include <memory>
#include <mutex>
#include <queue>

struct BroadcastMsg {
int type;
std::string data;
};

class BroadcastMsgQueue {
public:
BroadcastMsgQueue();
~BroadcastMsgQueue();
void addMsg(BroadcastMsg& msg);
int getMsg(BroadcastMsg& broadcastMsg);
size_t queueSize() {
return msg_queue_.size();
}
private:
std::queue<BroadcastMsg> msg_queue_;
std::mutex msg_mutex_;
};

#define BROAD_QUEUE_PTR Singleton<BroadcastMsgQueue>::Instance()

#endif //WEBSOCKET_MASTER_BROADCAST_QUEUE_H
68 changes: 54 additions & 14 deletions network_interface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@
#include <map>
#include "debug_log.h"
#include "network_interface.h"
#include "broadcast_queue.h"
#include "websocket_respond.h"

Network_Interface *Network_Interface::m_network_interface = NULL;

Network_Interface::Network_Interface():
NetworkInterface::NetworkInterface():
epollfd_(0),
listenfd_(0),
websocket_handler_map_()
Expand All @@ -22,11 +23,11 @@ Network_Interface::Network_Interface():
exit(1);
}

Network_Interface::~Network_Interface(){
NetworkInterface::~NetworkInterface(){

}

int Network_Interface::init(){
int NetworkInterface::init(){
listenfd_ = socket(AF_INET, SOCK_STREAM, 0);
if(listenfd_ == -1){
DEBUG_LOG("创建套接字失败!");
Expand All @@ -37,6 +38,8 @@ int Network_Interface::init(){
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
server_addr.sin_port = htons(PORT);
int opt = 1;
setsockopt(listenfd_, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
if(-1 == bind(listenfd_, (struct sockaddr *)(&server_addr), sizeof(server_addr))){
DEBUG_LOG("绑定套接字失败!");
return -1;
Expand All @@ -52,7 +55,7 @@ int Network_Interface::init(){
return 0;
}

int Network_Interface::epoll_loop(){
int NetworkInterface::epoll_loop(){
struct sockaddr_in client_addr;
socklen_t clilen;
int nfds = 0;
Expand Down Expand Up @@ -80,25 +83,34 @@ int Network_Interface::epoll_loop(){
}
}
}
comsume_msg();

}

return 0;
}

int Network_Interface::set_noblock(int fd){
int NetworkInterface::comsume_msg() {
int size = BROAD_QUEUE_PTR->queueSize();
Websocket_Respond respond;

while (size-- > 0) {
BroadcastMsg msg;
BROAD_QUEUE_PTR->getMsg(msg);
char buf[BUFFLEN] = {0};
int len = respond.makeFrame(msg.data.c_str(), msg.data.length(), buf);
broadcast(buf, len);
}
}

int NetworkInterface::set_noblock(int fd){
int flags;
if ((flags = fcntl(fd, F_GETFL, 0)) == -1)
flags = 0;
return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}

Network_Interface *Network_Interface::get_share_network_interface(){
if(m_network_interface == NULL)
m_network_interface = new Network_Interface();
return m_network_interface;
}

void Network_Interface::ctl_event(int fd, bool flag){
void NetworkInterface::ctl_event(int fd, bool flag){
struct epoll_event ev;
ev.data.fd = fd;
ev.events = flag ? EPOLLIN : 0;
Expand All @@ -117,6 +129,34 @@ void Network_Interface::ctl_event(int fd, bool flag){
}
}

void Network_Interface::run(){
void NetworkInterface::run(){
epoll_loop();
}

int NetworkInterface::broadcast(const char *data, int len) {
if (data == NULL || len <= 0) {
return 0;
}
WEB_SOCKET_HANDLER_MAP::iterator iter = websocket_handler_map_.begin();
for (; iter != websocket_handler_map_.end(); ++iter) {
if (iter->first == listenfd_) {
continue;
}
if (singleSend(iter->first, data, len) < 0) {
//ctl_event(iter->first, false);
}
}
return len;
}

int NetworkInterface::singleSend(int fd, const char *data, int len) {
int n = 0, sendSize = 0;
while (sendSize < len) {
if ((n = write(fd, data+sendSize, len-sendSize)) <= 0) {
DEBUG_LOG("write packet error:(%s)", strerror(errno));
return -1;
}
sendSize += n;
}
return sendSize;
}
18 changes: 12 additions & 6 deletions network_interface.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#define __NETWORK_INTERFACE__

#include "websocket_handler.h"
#include "singleton.h"

#define PORT 9000
#define TIMEWAIT 100
Expand All @@ -10,24 +11,29 @@

typedef std::map<int, Websocket_Handler *> WEB_SOCKET_HANDLER_MAP;

class Network_Interface {
class NetworkInterface {
public:
NetworkInterface();
~NetworkInterface();

private:
Network_Interface();
~Network_Interface();
int init();
int epoll_loop();
int set_noblock(int fd);
void ctl_event(int fd, bool flag);

int comsume_msg();

public:
void run();
static Network_Interface *get_share_network_interface();
int broadcast(const char* data, int len);
int singleSend(int fd, const char* data, int len);
private:
int epollfd_;
int listenfd_;
WEB_SOCKET_HANDLER_MAP websocket_handler_map_;
static Network_Interface *m_network_interface;
};

#define NETWORK_INTERFACE Network_Interface::get_share_network_interface()
#define NETWORK_INTERFACE Singleton<NetworkInterface>::Instance()

#endif
61 changes: 61 additions & 0 deletions singleton.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
** Copyright (C) 2014 Wang Yaofu
** All rights reserved.
**
**Author:Wang Yaofu voipman@qq.com
**Description: The implement file of class Singleton.
*/
#ifndef _COMMON_SINGLETONG_H_
#define _COMMON_SINGLETONG_H_

//
// singleton class
// Note: template class must
// provide default constructor.
//
#include <cstddef>
#include <cstdlib>
#include <pthread.h>
#include "uncopyable.h"

template<class T>
class Singleton : public Uncopyable {
public:
// global access point
static T *Instance() {
if (mInstance == NULL) {
pthread_once(&mPonce, &Singleton::Init);
}
return mInstance;
}

static T& InstanceRef() {
if (mInstance == NULL) {
pthread_once(&mPonce, &Singleton::Init);
}
return *mInstance;
}

private:
static void Init() {
mInstance = new T;
::atexit(Singleton::Destroy);
}
static void Destroy() {
if (mInstance != NULL) {
delete mInstance;
mInstance = NULL;
}
}

private:
static pthread_once_t mPonce;
// instance for T
static T* volatile mInstance;
};
template<typename T>
pthread_once_t Singleton<T>::mPonce = PTHREAD_ONCE_INIT;

template<typename T>
T* volatile Singleton<T>::mInstance = NULL;
#endif // _COMMON_SINGLETONG_H_
22 changes: 22 additions & 0 deletions uncopyable.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
** Copyright (C) 2014 Wang Yaofu
** All rights reserved.
**
**Author:Wang Yaofu voipman@qq.com
**Description: The header file of class Uncopyable.
*/

#ifndef _COMMON_UNCOPYABLE_H
#define _COMMON_UNCOPYABLE_H

class Uncopyable {
protected:
Uncopyable() {}
~Uncopyable() {}

private:
Uncopyable(const Uncopyable& o);
Uncopyable& operator=(const Uncopyable& o);
};

#endif // _COMMON_UNCOPYABLE_H
11 changes: 11 additions & 0 deletions websocket_handler.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include <unistd.h>
#include "websocket_handler.h"
#include "broadcast_queue.h"

Websocket_Handler::Websocket_Handler(int fd):
buff_(),
Expand All @@ -11,6 +12,9 @@ Websocket_Handler::Websocket_Handler(int fd):
}

Websocket_Handler::~Websocket_Handler(){
if (NULL != request_) {
delete request_;
}
}

int Websocket_Handler::process(){
Expand All @@ -19,6 +23,13 @@ int Websocket_Handler::process(){
}
request_->fetch_websocket_info(buff_);
request_->print();
std::string payload;
BroadcastMsg broadcastMsg;
broadcastMsg.type = 1;
if (request_->get_payload(broadcastMsg.data) > 0) {
BROAD_QUEUE_PTR->addMsg(broadcastMsg);
}
request_->reset();
memset(buff_, 0, sizeof(buff_));
return 0;
}
Expand Down
8 changes: 7 additions & 1 deletion websocket_request.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ void Websocket_Request::print(){
"PAYLOAD: %s",
fin_, opcode_, mask_, payload_length_, payload_);

reset();
}

void Websocket_Request::reset(){
Expand All @@ -45,6 +44,13 @@ void Websocket_Request::reset(){
memset(payload_, 0, sizeof(payload_));
}

int Websocket_Request::get_payload(std::string& payload) {
if (payload_length_ > 0) {
payload.assign(payload_, payload_length_);
}
return payload_length_;
}

int Websocket_Request::fetch_fin(char *msg, int &pos){
fin_ = (unsigned char)msg[pos] >> 7;
return 0;
Expand Down
Loading