-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathweb.py
More file actions
96 lines (74 loc) · 2.18 KB
/
web.py
File metadata and controls
96 lines (74 loc) · 2.18 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
import os
import ujson
from flask import Flask, request
from multiprocessing import Process
from conf import Cfg
from datasets import Datasets
from storage.lmdbStorage import LmdbStorage
app = Flask(__name__)
cfg = Cfg()
db = LmdbStorage(cfg)
db.load()
d = Datasets(cfg, db)
@app.route("/")
def main():
data = {}
keys = ["name", "tags", "paths"]
for k, v in db.data.items():
data[k] = {key: v[key] for key in keys}
return ujson.dumps(data)
@app.route("/detail/<id>")
def detail(id):
data = db.get(id)
if "usages" in data:
data["usages"] = sorted(data["usages"], key=lambda x: x["timestamp"],
reverse=True)
if "changelog" in data:
data["changelog"] = sorted(data["changelog"], key=lambda x: x[0][3],
reverse=True)
if "characteristics" in data:
data["characteristics"] = sorted(data["characteristics"].items(),
key=lambda x: x[0])
for i, (k, v) in enumerate(data["characteristics"]):
data["characteristics"][i] = (
k, sorted(v.items(), key=lambda x: x[0]))
return ujson.dumps(data)
@app.route("/use/<id>", methods=['POST'])
def use(id):
data = db.get(id)
usage = request.json
if "action" not in usage:
usage["action"] = "read"
d.use(id, usage)
return ujson.dumps(data)
@app.route("/update/<id>", methods=['POST'])
def update(id):
# todo log only which things changed
data = request.json
stored = db.get(id)
d.log_change(id, [[None, stored, data, time.time()]])
db.update(id, data)
return '', 200
@app.route("/reload")
def reload():
# todo way to update periodically
db.load()
return '', 200
@app.route("/scan")
def scan():
p = Process(target=_scan_proc)
p.start()
return '', 200
def _scan_proc():
# this func is called from another process (passing objects may be bad idea)
cfg = Cfg()
db = LmdbStorage(cfg)
db.load()
d = Datasets(cfg, db)
d.scan(cfg["datasets"])
db.load()
@app.route("/new")
def new():
return ujson.dumps(d.generate())
if __name__ == "__main__":
app.run(host="0.0.0.0")