-
Notifications
You must be signed in to change notification settings - Fork 17
Expand file tree
/
Copy pathmanager.py
More file actions
149 lines (119 loc) · 5.07 KB
/
manager.py
File metadata and controls
149 lines (119 loc) · 5.07 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
#!/usr/bin/python3
from datetime import datetime
import logging
import operator
import os
import re
import threading
import time
import psutil
import random
import readline # For nice CLI
import subprocess
import sys
from pathlib import Path, PureWindowsPath
# Plotman libraries
import job
import plot_util
import archive # for get_archdir_freebytes(). TODO: move to avoid import loop
# Constants
MIN = 60 # Seconds
HR = 3600 # Seconds
MAX_AGE = 1000_000_000 # Arbitrary large number of seconds
def dstdirs_to_furthest_phase(all_jobs):
'''Return a map from dst dir to a phase tuple for the most progressed job
that is emitting to that dst dir.'''
result = {}
for j in all_jobs:
if not j.dstdir in result.keys() or result[j.dstdir] < j.progress():
result[j.dstdir] = j.progress()
return result
def dstdirs_to_youngest_phase(all_jobs):
'''Return a map from dst dir to a phase tuple for the least progressed job
that is emitting to that dst dir.'''
result = {}
for j in all_jobs:
if not j.dstdir in result.keys() or result[j.dstdir] > j.progress():
result[j.dstdir] = j.progress()
return result
def phases_permit_new_job(phases, sched_cfg):
'''Scheduling logic: return True if it's OK to start a new job on a tmp dir
with existing jobs in the provided phases.'''
if len(phases) == 0:
return True
milestone_1 = ( sched_cfg['tmpdir_stagger_phase_major'],
sched_cfg['tmpdir_stagger_phase_minor'] )
# milestone_2 = (4, 0)
if len([p for p in phases if p < milestone_1]) > 0:
return False
# if len([p for p in phases if milestone_1 <= p and p <milestone_2]) > 1:
# return False
# No more than 3 jobs total on the tmpdir
if len(phases) >= sched_cfg['tmpdir_max_jobs']:
return False
return True
def maybe_start_new_plot(dir_cfg, sched_cfg, plotting_cfg):
jobs = job.Job.get_running_jobs(dir_cfg['log'])
wait_reason = None # If we don't start a job this iteration, this says why.
youngest_job_age = min(jobs, key=job.Job.get_time_wall).get_time_wall() if jobs else MAX_AGE
global_stagger = int(sched_cfg['global_stagger_m'] * MIN)
if (youngest_job_age < global_stagger):
wait_reason = 'stagger (%ds/%ds)' % (
youngest_job_age, global_stagger)
else:
tmp_to_all_phases = [ (d, job.job_phases_for_tmpdir(d, jobs))
for d in dir_cfg['tmp'] ]
eligible = [ (d, phases) for (d, phases) in tmp_to_all_phases
if phases_permit_new_job(phases, sched_cfg) ]
rankable = [ (d, phases[0]) if phases else (d, (999, 999))
for (d, phases) in eligible ]
if not eligible:
wait_reason = 'no eligible tempdirs'
else:
# Plot to oldest tmpdir.
tmpdir = max(rankable, key=operator.itemgetter(1))[0]
# Select the dst dir least recently selected
dir2ph = dstdirs_to_youngest_phase(jobs)
unused_dirs = [d for d in dir_cfg['dst'] if d not in dir2ph.keys()]
dstdir = ''
if unused_dirs:
dstdir = random.choice(unused_dirs)
else:
dstdir = max(dir2ph, key=dir2ph.get)
logpath = Path(dir_cfg['log'])
logfile = PureWindowsPath(logpath / datetime.now().strftime('%Y-%m-%d-%H.%M.%S.log'))
#logfile = os.path.join(Path(dir_cfg['log']),)
print(logfile)
plot_args = [r'C:\Users\Wofl\AppData\Local\Chia-Blockchain\app-1.0.3\resources\app.asar.unpacked\daemon\chia2.exe','plots', 'create',
'-k', str(plotting_cfg['k']),
'-r', str(plotting_cfg['n_threads']),
'-u', str(plotting_cfg['n_buckets']),
'-b', str(plotting_cfg['job_buffer']),
'-t', tmpdir,
'-d', dstdir ]
if 'e' in plotting_cfg and plotting_cfg['e']:
plot_args.append('-e')
if 'tmp2' in dir_cfg:
plot_args.append('-2')
plot_args.append(dir_cfg['tmp2'])
# logfile = repr(logfile)
logmsg = ('Starting plot job: %s ; logging to %s' % (' '.join(plot_args), logfile))
#print(logfile)
#print(logmsg)
# start_new_sessions to make the job independent of this controlling tty.
p = subprocess.Popen(plot_args,
stdout=open(logfile, 'w'),
stderr=subprocess.STDOUT,
start_new_session=True)
psutil.Process(p.pid) #.nice(ABOVE_NORMAL_PRIORITY_CLASS)
# print( psutil.Process(p.pid).cmdline())
# x = psutil.Process(p.pid)
# x.nice(15)
return (True, logmsg)
return (False, wait_reason)
def select_jobs_by_partial_id(jobs, partial_id):
selected = []
for j in jobs:
if j.plot_id.startswith(partial_id):
selected.append(j)
return selected