-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathclient_server.cc
More file actions
141 lines (125 loc) · 4.8 KB
/
client_server.cc
File metadata and controls
141 lines (125 loc) · 4.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
/*
*
* Author: Pramod Kumar(veer) & Soheil
*
* Description: Client server.cc
* It is a thread in every client process and always listen for metadatamanger permission revoke request.
* It is thread safe
*
*/
#include "commons.h"
#include "cs_mdm.h"
#include "c_mdm.h"
string ipAddress;
int port;
string client_server_ip_port ;
class client_server_service_impl : public ClientServerService::Service {
Status fileRevokePermissionRequestHandler (ServerContext* context,const FilePermissionRevokeRequest* request, FilePermissionRevokeResponse* reply) override {
#ifdef DEBUG_FLAG
cout<<"\n"<<__func__ <<":";
std::cout << "\n Got permission invoke message ";
cout<<"\n Start byte: "<<request->startbyte();
cout<<"\n End byte: "<<request->endbyte();
cout<<"\n Request id: "<<request->requestid();
cout<<"\n File name: "<<request->filename();
#endif
permission per1,per2;
int partial=0;
reply->set_requestid(request->requestid());
file_info_store *file = file_dir[request->filename()];
if (request->type() == FilePermissionRevokeRequest::WRITE) {
if(file->status!=CLOSED){
for(auto it = file->access_permission.begin(); it!=file->access_permission.end();it++) {
if((*it).type.find("w")!=string::npos){
partial=1;
pair<int,int> s_e = (*it).start_end;
if(s_e.first==request->startbyte() && s_e.second==request->endbyte()){
file->access_permission.erase(it);
}else if(s_e.first<request->startbyte() && s_e.second==request->endbyte()){
file->access_permission.erase(it);
per1.start_end=make_pair (s_e.first,request->startbyte()-1);
per1.type="rw";
file->access_permission.push_back(per1);
}else if(s_e.first==request->startbyte() && s_e.second>request->endbyte()){
file->access_permission.erase(it);
per2.start_end=make_pair (request->endbyte()+1,s_e.second);
per2.type="rw";
file->access_permission.push_back(per2);
}else{
file->access_permission.erase(it);
per1.start_end=make_pair (s_e.first,request->startbyte()-1);
per1.type="rw";
file->access_permission.push_back(per1);
per2.start_end=make_pair (request->endbyte()+1,s_e.second);
per2.type="rw";
file->access_permission.push_back(per2);
}
break;
}
}
if(partial)
reply->set_code(FilePermissionRevokeResponse::PARTIAL);
else
reply->set_code(FilePermissionRevokeResponse::WHOLE);
}else{
reply->set_code(FilePermissionRevokeResponse::WHOLE);
}
}else if (request->type() == FilePermissionRevokeRequest::READ) {
if(file->status!=CLOSED){
for(auto it = file->access_permission.begin(); it!=file->access_permission.end();it++) {
if((*it).type.find("w")!=string::npos){
pair<int,int> s_e = (*it).start_end;
partial=1;
if(s_e.first==request->startbyte() && s_e.second==request->endbyte()){
file->access_permission.erase(it);
}else if(s_e.first<request->startbyte() && s_e.second==request->endbyte()){
file->access_permission.erase(it);
per1.start_end=make_pair (s_e.first,request->startbyte()-1);
per1.type="r";
file->access_permission.push_back(per1);
}else if(s_e.first==request->startbyte() && s_e.second>request->endbyte()){
file->access_permission.erase(it);
per2.start_end=make_pair (request->endbyte()+1,s_e.second);
per2.type="r";
file->access_permission.push_back(per2);
}else{
file->access_permission.erase(it);
per1.start_end=make_pair (s_e.first,request->startbyte()-1);
per1.type="r";
file->access_permission.push_back(per1);
per2.start_end=make_pair (request->endbyte()+1,s_e.second);
per2.type="r";
file->access_permission.push_back(per2);
}
break;
}
}
if(partial)
reply->set_code(FilePermissionRevokeResponse::PARTIAL);
else
reply->set_code(FilePermissionRevokeResponse::WHOLE);
}else{
reply->set_code(FilePermissionRevokeResponse::WHOLE);
}
}else{
do{
std::this_thread::sleep_for(std::chrono::milliseconds(50));
}while(file->status==READING || file->status==WRITING);
file_dir.erase(request->filename());
}
return Status::OK;
}
};
void start_client_server(){
/* Start fileserver */
client_server_service_impl cs_server;
#ifdef DEBUG_FLAG
cout<<"\n\n"<<__func__ <<": " <<client_server_ip_port;
#endif
ServerBuilder builder;
builder.AddListeningPort(client_server_ip_port, grpc::InsecureServerCredentials());
builder.RegisterService(&cs_server);
std::unique_ptr<Server> server(builder.BuildAndStart());
std::cout <<__func__<< "\nClient Server listening on " << client_server_ip_port << std::endl;
server->Wait();
}