-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathProcessImpl.java
More file actions
291 lines (258 loc) · 9.46 KB
/
ProcessImpl.java
File metadata and controls
291 lines (258 loc) · 9.46 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
package edu.bits.dc;
import java.rmi.Naming;
import java.rmi.RemoteException;
import java.rmi.registry.LocateRegistry;
import java.rmi.server.UnicastRemoteObject;
import java.util.Date;
/**
* This is an implementation class for the processes in a distributed environment using Java RMI.
*/
public class ProcessImpl extends UnicastRemoteObject implements Process {
// Constants to use in program.
private static final long serialVersionUID = 1L;
private static final int PORT = 1099;
private static final String URI = "rmi://localhost:"+ PORT +"/";
private static final String SIMULATOR = "Simulator";
private static final java.util.Random RANDOM = new java.util.Random();
/**
* Boolean flag to maintain the process status.
*/
private static boolean ALIVE = true;
/**
* Boolean flag to determine the leader process.
*/
private static boolean LEADER = false;
/**
* Randomly generated logical clock value.
*/
private static int LOGICAL_CLOCK = RANDOM.nextInt(1000);
/**
* Unique process name within a distributed environment.
*/
private static String PROCESS_NAME;
/**
* Constructor to initialize a process with a unique name.
* @param pName
* @throws RemoteException
*/
public ProcessImpl(String pName) throws RemoteException {
super();
PROCESS_NAME = pName;
}
@Override
public boolean isAlive() throws RemoteException {
return ALIVE;
}
@Override
public boolean isLeader() throws RemoteException {
return LEADER;
}
@Override
public int getLogicalClock() throws RemoteException {
return LOGICAL_CLOCK;
}
@Override
public String getProcessName() throws RemoteException {
return PROCESS_NAME;
}
@Override
public boolean inquiry(Message message) throws RemoteException {
return LOGICAL_CLOCK < message.getLogicalClock() ? true : false;
}
@Override
public boolean victory(Message message) throws RemoteException {
if(LOGICAL_CLOCK < message.getLogicalClock()) {
LEADER = false; /* respect other leader and step down */
return true;
} else {
return false;
}
}
/**
* Main method to start the simulation.
* Usage:
* To start the simulator: java -jar leader.jar
* To start the process: java -jar leader.jar P1
* @param args
*/
public static void main(String args[]) {
String processName = args.length == 0 ? SIMULATOR : args[0];
// Create RMI registry to bind processes on the given port.
try {
LocateRegistry.createRegistry(PORT);
System.out.println("Leader election simulator started, terminating this process will also end the simulation.");
} catch (Exception e) { /* RMI registry already created */ }
// Bind this process server to the RMI registry with the given uri to keep running.
try {
Naming.rebind(URI + processName, new ProcessImpl(processName));
} catch (Exception e) {
System.out.println("Error while binding the process [" + processName + "] to the RMI registry.");
}
// Different console for simulator and process for validating the algorithm.
if(processName.compareToIgnoreCase(SIMULATOR) == 0) {
System.out.println("*This process will not participate in leader election and will just monitor the simulation.");
System.out.println("Please start individual processes in new consoles. \nUsage: java -jar leader.jar P1 \t(where p1 is a unique process name)");
ALIVE = false; /* do not participate in leader election */
while(true) { /* Monitor simulation till it gets terminated */
monitorSimulation();
}
} else {
System.out.print("Process [" + processName + "] started ");
// Monitor this process till it is alive or auto healed */
monitorProcess();
System.out.print("to participate in leader election.\n");
}
}
/**
* This method monitors the process to participate the leader election.
* @param processName
*/
private static void monitorProcess() {
Thread monitorThread = new Thread() {
public void run() {
while(true) {
try { /* Add some random delay to slow down the process */
Thread.sleep(RANDOM.nextInt(3000) + 2000);
} catch (Exception e) { /* do nothing */ }
if(!LEADER && ALIVE) {
// Check if there is another leader process.
boolean isAnyLeader = false;
for (int i = 0; i < getProcessCount(); i++) {
try {
Process process = getProcess(Naming.list(URI)[i]);
if(process.isAlive() && process.isLeader()) {
isAnyLeader = true;
System.out.println("\nLeader process in distributed system is: " + process.getProcessName());
}
} catch (Exception e) { /* do nothing */ }
}
// if there is no leader, ask for election
if(!isAnyLeader) {
election();
}
}
// Purposefully introduce an error randomly ( for unlucky 13 !! ) for simulation, reset all values and then let auto heal the process.
if(ALIVE && RANDOM.nextInt(1000) % 13 == 0){
System.out.println("\nProcess [" + PROCESS_NAME + "] encountered an error and became unresponsive now.");
System.out.println("You may wait to get this auto healed or terminate the process by pressing 'Ctrl + C'");
ALIVE = false;
LEADER = false;
try {
Thread.sleep(5000);
ALIVE = true;
LOGICAL_CLOCK = RANDOM.nextInt(1000);
} catch (Exception e) { /* do nothing*/ }
}
// Monitor and terminate the process if RMI registry gets unavailable.
try {
Naming.lookup(URI + PROCESS_NAME);
} catch (Exception e) {
System.out.println("\nLeader election simulator stopped, terminating this process.");
System.exit(0);
}
}
};
};
monitorThread.start();
}
/**
* This method gets the overall process count in RMI registry.
* @return
*/
private static int getProcessCount() {
int processCount = 0;
try {
processCount = Naming.list(URI).length;
} catch (Exception e) { /* do nothing */ }
return processCount;
}
/**
* This method gets a process of given name within distributed environment.
* @param processUri
* @return
*/
private static Process getProcess(String processUri) {
String pName = processUri.substring(processUri.lastIndexOf("/") + 1);
try {
return (Process)Naming.lookup(URI + pName);
} catch (Exception e) {
return null;
}
}
/**
* This method perform the election to choose a leader.
* Step 1: Sends inquiry messages to all other N-1 processes, terminates on any -ve response.
* Step 2: Announces victory to all other N-1 processes, asks for re-election for any contradiction.
*/
private static void election() {
System.out.println("\nNo leader elected in distributed system, calling inquiry.");
boolean inquerySuccess = true;
int processCount = getProcessCount();
for (int i = 0; i < processCount; i++) {
try {
Process process = getProcess(Naming.list(URI)[i]);
if(process.getProcessName().equalsIgnoreCase(PROCESS_NAME)) { /* ignore self inquiry */
continue;
}
if(process.isAlive()){
if(!process.inquiry(new Message(PROCESS_NAME, LOGICAL_CLOCK))) { /* another process may be the leader */
inquerySuccess = false;
break;
}
}
} catch (Exception e) { /* do nothing */ }
}
// if still there is no leader, seems I am the Leader !!
if(inquerySuccess) {
System.out.println("\nProcess [" + PROCESS_NAME+ "] seems to be a leader, announcing victory.");
// Sending the victory message to all other processes
boolean isLeader = true;
for (int i = 0; i < processCount; i++) {
try {
Process process = getProcess(Naming.list(URI)[i]);
if(process.getProcessName().equalsIgnoreCase(PROCESS_NAME)) { /* ignore self victory */
continue;
}
if(process.isAlive()){
if(!process.victory(new Message(PROCESS_NAME, LOGICAL_CLOCK))) {
isLeader = false; /* re-election */
System.out.println("Victory countered by process [" + process.getProcessName() + "], re-election.");
}
}
} catch (Exception e) { /* do nothing */ }
}
if(isLeader) {
LEADER = true;
System.out.println("Process [" + PROCESS_NAME+ "] is elacted as a leader: " + new Date());
} else {
election();
}
}
}
/**
* This method monitors the simulation and provides details of every participating processes.
*/
private static void monitorSimulation() {
try { /* Monitor the simulation periodically */
Thread.sleep(100);
} catch (Exception e) { /* do nothing */ }
int processCount = getProcessCount();
if(processCount == 1) {
return;
}
System.out.println("\nTotal number of candidate processes in leader election: " + (processCount - 1));
String pName = "";
for (int i = 0; i < processCount; i++) {
try {
Process process = getProcess(Naming.list(URI)[i]);
pName = Naming.list(URI)[i].substring(Naming.list(URI)[i].lastIndexOf("/") + 1);
if(SIMULATOR.equalsIgnoreCase(pName)) {
continue;
}
System.out.println("Process ["+ pName + "] is up with clock value (" + process.getLogicalClock() + ") and status: " + (process.isAlive() ? (process.isLeader() ? "Leader" : "Alive") : "Unresponsive"));
} catch (Exception e) {
System.out.println("Process ["+ pName + "] is down and disconnected, restart it to participate again.");;
}
}
}
}