Skip to content

Commit b0896a7

Browse files
committed
newtensor:的小bug
1 parent f7319de commit b0896a7

12 files changed

Lines changed: 471 additions & 84 deletions

File tree

excuter/common/src/client/udpserver.cpp

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ namespace client
1616
close(sockfd);
1717
}
1818
}
19-
void udpserver::start()
19+
void udpserver::start(queue<deepx::op::Op> &queue)
2020
{
2121
// 创建UDP套接字
2222
if ((sockfd = socket(AF_INET, SOCK_DGRAM, 0)) < 0)
@@ -26,8 +26,7 @@ namespace client
2626
}
2727

2828
memset(&servaddr, 0, sizeof(servaddr));
29-
memset(&cliaddr, 0, sizeof(cliaddr));
30-
29+
3130
// 绑定IP和端口
3231
servaddr.sin_family = AF_INET; // IPv4
3332
servaddr.sin_addr.s_addr = INADDR_ANY;
@@ -42,7 +41,7 @@ namespace client
4241
while (true)
4342
{
4443
len = sizeof(cliaddr);
45-
n = recvfrom(sockfd, (char *)buffer, sizeof(buffer), 0, (struct sockaddr *)&cliaddr, &len);
44+
n = recvfrom(sockfd, buffer, sizeof(buffer), 0, (struct sockaddr *)&cliaddr, &len);
4645
buffer[n] = '\0';
4746

4847
// 新增换行拆分逻辑
@@ -51,13 +50,17 @@ namespace client
5150
while (getline(ss, line)) {
5251
if (!line.empty()) {
5352
cout << "~" << line << endl;
54-
char *IR = const_cast<char *>(line.c_str());
55-
string strresp=func(IR);
56-
sendto(sockfd, strresp.c_str(), strresp.size(), 0,
57-
(const struct sockaddr*)&cliaddr, len);
53+
deepx::op::Op op;
54+
op.recv_at = chrono::system_clock::now();
55+
op.load(line);
56+
queue.push(op);
5857
}
5958
}
6059
}
6160
close(sockfd);
6261
}
62+
void udpserver::resp(string str){
63+
sendto(sockfd, str.c_str(), str.size(), 0, // 改为sendto
64+
(const struct sockaddr *)&cliaddr, sizeof(cliaddr));
65+
}
6366
}

excuter/common/src/client/udpserver.hpp

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,25 +7,28 @@
77
#include <sys/un.h>
88
#include <unistd.h>
99
#include <functional>
10-
10+
#include "deepx/op/op.hpp"
11+
#include <queue>
12+
1113
namespace client{
14+
using namespace std;
1215
class udpserver
1316
{
1417
private:
1518
int port;
1619
int sockfd;
17-
struct sockaddr_in servaddr, cliaddr;
20+
struct sockaddr_in servaddr,cliaddr;
1821
char buffer[1024];
1922
socklen_t len;
2023
ssize_t n;
2124
public:
2225
udpserver(int port);
2326
~udpserver();
24-
void start();
25-
using handlefunc = std::function<std::string(const char *buffer)>;
27+
void start(queue<deepx::op::Op> &tasks);
28+
using handlefunc = std::function<void(const char *buffer)>;
2629
handlefunc func;
30+
void resp(string str);
2731
};
28-
2932
}
3033

3134
#endif
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
#ifndef __WORKER_HPP__
2+
#define __WORKER_HPP__
3+
4+
5+
namespace client{
6+
7+
}
8+
#endif

excuter/common/src/deepx/op/op.cpp

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,7 @@ namespace deepx::op
55
//与deepx/front/py/deepx/nn/deepxir.py对应
66

77
// 新格式示例:mul@float32 a(a_grad) b(b_grad) -> a(a_grad) //id=1 create_time=1714512000 send_time=1714512000 recv_time=1714512000
8-
void Op::load(const char* str) {
9-
string input(str);
10-
8+
void Op::load(const string &input) {
119
// 分割元数据部分
1210
size_t meta_pos = input.find("//");
1311
string body = input.substr(0, meta_pos);

excuter/common/src/deepx/op/op.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ namespace deepx::op
5454
throw std::runtime_error("backward not implemented");
5555
}
5656

57-
void load(const char* str) ;
57+
void load(const string &str) ;
5858
void init(const string &opname,
5959
const string &dtype,
6060
const vector<string> &args,
Lines changed: 41 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
#include <mutex>
2+
#include <thread>
23

34
#include <deepx/tensorfunc/init.hpp>
45
#include "deepx/op/op.hpp"
56
#include "deepx/op/opfactory.hpp"
67
#include "deepx/mem/mem.hpp"
78
#include "client/udpserver.hpp"
8-
9+
910
using namespace deepx::tensorfunc;
1011
using namespace deepx::mem;
1112

@@ -18,38 +19,45 @@ int main()
1819
deepx::op::OpFactory opfactory;
1920
register_all(opfactory);
2021

21-
server.func = [&mem, &opfactory, &memmutex](const char *buffer)->std::string
22-
{
23-
24-
25-
deepx::op::Op op;
26-
op.recv_at = chrono::system_clock::now();
27-
op.load(buffer);
28-
std::string resp=to_string(op.id);
29-
resp+=" recv_at:";
30-
resp+=to_string(op.recv_at.time_since_epoch().count());
31-
if (opfactory.ops.find(op.name)==opfactory.ops.end()){
32-
cout<<"<op> "<<op.name<<" not found"<<endl;
33-
resp+="error op not found";
34-
}
35-
auto &type_map = opfactory.ops.find(op.name)->second;
36-
if (type_map.find(op.dtype)==type_map.end()){
37-
cout<<"<op>"<<op.name<<" "<<op.dtype<<" not found"<<endl;
38-
resp+="error dtype not found";
39-
}
40-
auto src = type_map.find(op.dtype)->second;
41-
42-
(*src).init(op.name, op.dtype, op.args, op.returns, op.grad, op.args_grad, op.returns_grad);
43-
memmutex.lock();
44-
if (op.grad) {
45-
(*src).backward(mem);
46-
}else {
47-
(*src).forward(mem);
22+
queue<deepx::op::Op> tasks;
23+
// 启动一个新线程来运行UDP服务器
24+
std::thread server_thread([&server, &tasks]() {
25+
server.start(tasks);
26+
});
27+
// 分离线程,让它在后台运行
28+
server_thread.detach();
29+
30+
while (true) {
31+
if (!tasks.empty()) {
32+
deepx::op::Op op = tasks.front();
33+
tasks.pop();
34+
35+
std::string resp=to_string(op.id);
36+
resp+="recv_at:";
37+
resp+=to_string(op.recv_at.time_since_epoch().count());
38+
if (opfactory.ops.find(op.name)==opfactory.ops.end()){
39+
cout<<"<op> "<<op.name<<" not found"<<endl;
40+
resp+="error op not found";
41+
}
42+
auto &type_map = opfactory.ops.find(op.name)->second;
43+
if (type_map.find(op.dtype)==type_map.end()){
44+
cout<<"<op>"<<op.name<<" "<<op.dtype<<" not found"<<endl;
45+
resp+="error dtype not found";
46+
}
47+
auto src = type_map.find(op.dtype)->second;
48+
49+
(*src).init(op.name, op.dtype, op.args, op.returns, op.grad, op.args_grad, op.returns_grad);
50+
memmutex.lock();
51+
if (op.grad) {
52+
(*src).backward(mem);
53+
}else {
54+
(*src).forward(mem);
55+
}
56+
memmutex.unlock();
57+
resp+=" success";
58+
server.resp(resp);
4859
}
49-
memmutex.unlock();
50-
resp+=" success";
51-
return resp;
52-
};
53-
server.start();
60+
}
61+
5462
return 0;
5563
}

excuter/op-mem-ompsimd/src/deepx/op/new.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ namespace deepx::op{
2525
vector<int> shape=mem.getvector<int32_t>(this->args[0]);
2626
Tensor<T> t=tensorfunc::New<T>(shape);
2727
mem.addtensor(name,t);
28-
}else if (this->args.size()>1){
28+
}else{
2929
vector<int> shape;
3030
for (int i = 0; i < this->args.size(); i++) {
3131
shape.push_back(atoi(this->args[i].c_str()));

front/py/deepx/scheduler/client/allclient.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,4 +12,3 @@ def send(ir:DeepxIR) -> Optional[dict]:
1212
_id_counter=_id_counter+1
1313
ir._id=_id_counter
1414
resp=default_client.send(str(ir))
15-
print(resp)
Lines changed: 23 additions & 0 deletions
Loading
Lines changed: 96 additions & 0 deletions
Loading

0 commit comments

Comments
 (0)