-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathRideSharingSystem.java
More file actions
151 lines (150 loc) · 6.39 KB
/
RideSharingSystem.java
File metadata and controls
151 lines (150 loc) · 6.39 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
import java.io.*;
import java.time.LocalDateTime;
import java.util.*;
import java.util.concurrent.*;
/**
 * Small producer/consumer demo.
 *
 * <p>{@link #main(String[])} starts a fixed pool of {@link Worker}s, pushes
 * {@link Task}s onto a shared {@link TaskQueue}, then pushes one
 * {@link Task#POISON} per worker so each worker exits its loop. Every worker
 * appends a result line to a shared in-memory list and to
 * {@code results_java.txt} through a {@link ResultWriter}.
 */
public class RideSharingSystem {
    /** Worker count used when the first argument is missing or rejected. */
    private static final int DEFAULT_WORKERS = 4;
    /** Task count used when the second argument is missing or rejected. */
    private static final int DEFAULT_TASKS = 20;
    /** Output file, truncated on every run. */
    private static final String RESULTS_FILE = "results_java.txt";
    /** How long main waits for the workers to drain the queue. */
    private static final long RUN_TIMEOUT_MINUTES = 2;
    /** How long main waits for workers to stop after they were interrupted. */
    private static final long STOP_TIMEOUT_SECONDS = 5;
    /** Lower bound of the simulated work duration, in milliseconds. */
    private static final int MIN_WORK_MILLIS = 100;
    /** Exclusive upper bound of the random extra work duration, in milliseconds. */
    private static final int WORK_JITTER_MILLIS = 400;

    /** Immutable unit of work: a numeric id plus an opaque payload string. */
    static class Task {
        final int id;
        final String payload;

        Task(int id, String payload) {
            this.id = id;
            this.payload = payload;
        }

        /**
         * Shutdown sentinel. Workers compare against it by identity ({@code ==}),
         * so only this exact instance stops a worker.
         */
        static final Task POISON = new Task(-1, "POISON");
    }

    /** Thin wrapper around an unbounded {@link LinkedBlockingQueue} of tasks. */
    static class TaskQueue {
        private final BlockingQueue<Task> q = new LinkedBlockingQueue<>();

        /**
         * Appends a task to the queue.
         *
         * @throws InterruptedException if the calling thread is interrupted
         */
        public void addTask(Task t) throws InterruptedException {
            q.put(t);
        }

        /**
         * Removes and returns the head of the queue, blocking while it is empty.
         *
         * @throws InterruptedException if the calling thread is interrupted while waiting
         */
        public Task getTask() throws InterruptedException {
            return q.take();
        }
    }

    /** Writes result lines to a shared writer, one whole line per lock acquisition. */
    static class ResultWriter {
        private final BufferedWriter writer;

        /**
         * @param writer destination for result lines; must not be null
         * @throws NullPointerException if {@code writer} is null
         */
        ResultWriter(BufferedWriter writer) {
            // Fail here rather than later inside synchronized (writer).
            this.writer = Objects.requireNonNull(writer, "writer");
        }

        /**
         * Writes {@code result} followed by a line separator and flushes.
         * The three calls run under one lock so lines from different threads
         * are never interleaved.
         *
         * @throws IOException if the underlying writer fails
         */
        public void writeResult(String result) throws IOException {
            synchronized (writer) {
                writer.write(result);
                writer.newLine();
                writer.flush();
            }
        }
    }

    /** Prints a timestamped line tagged with the current thread's name. */
    private static synchronized void log(String msg) {
        System.out.printf("%s [%s] %s%n", LocalDateTime.now(), Thread.currentThread().getName(), msg);
    }

    /**
     * Queue consumer. Takes tasks until it receives {@link Task#POISON} or is
     * interrupted, recording one result line per completed task.
     */
    static class Worker implements Runnable {
        private final String name;
        private final TaskQueue queue;
        private final List<String> results; // shared between workers; main passes a synchronized list
        private final ResultWriter writer;

        /**
         * @param name    label used as the thread name and in result lines
         * @param queue   source of tasks
         * @param results shared list that receives one entry per completed task
         * @param writer  shared sink that receives the same entries
         * @throws NullPointerException if any argument is null
         */
        Worker(String name, TaskQueue queue, List<String> results, ResultWriter writer) {
            this.name = Objects.requireNonNull(name, "name");
            this.queue = Objects.requireNonNull(queue, "queue");
            this.results = Objects.requireNonNull(results, "results");
            this.writer = Objects.requireNonNull(writer, "writer");
        }

        @Override
        public void run() {
            // Rename the executing thread so log() lines carry the worker label.
            Thread.currentThread().setName(name);
            log("started");
            try {
                // Re-check the flag each round: processTask() restores it when
                // its sleep is interrupted, and that must end the loop.
                while (!Thread.currentThread().isInterrupted()) {
                    Task t;
                    try {
                        t = queue.getTask();
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        log("interrupted while waiting for task");
                        break;
                    }
                    if (t == Task.POISON) {
                        log("received poison pill; exiting");
                        break;
                    }
                    try {
                        processTask(t);
                    } catch (RuntimeException e) {
                        // One bad task must not kill the worker. Log the whole
                        // exception (type + message); getMessage() may be null.
                        log("exception processing task " + t.id + ": " + e);
                    }
                }
            } finally {
                log("exiting");
            }
        }

        /**
         * Simulates work by sleeping, then records the result in the shared
         * list and the shared file. If the sleep is interrupted the interrupt
         * flag is restored and nothing is recorded for this task.
         */
        private void processTask(Task t) {
            log("processing task " + t.id);
            // simulate computational work
            try {
                Thread.sleep(MIN_WORK_MILLIS + ThreadLocalRandom.current().nextInt(WORK_JITTER_MILLIS));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                log("interrupted during processing");
                return;
            }
            String result = String.format("Task %d processed by %s (payload=%s)", t.id, name, t.payload);
            // save to shared list
            results.add(result);
            // write to shared file safely
            try {
                writer.writeResult(result);
            } catch (IOException ioe) {
                log("IOException while writing result for task " + t.id + ": " + ioe.getMessage());
            }
            log("completed task " + t.id);
        }
    }

    /**
     * Reads an integer command-line argument.
     *
     * @param args         raw command-line arguments
     * @param index        position of the argument to read
     * @param defaultValue value returned when the argument is absent or rejected
     * @param minValue     smallest accepted value
     * @param label        human-readable name used in the warning message
     * @return the parsed value, or {@code defaultValue} if the argument is
     *         missing, not an integer, or below {@code minValue}
     */
    private static int parseCount(String[] args, int index, int defaultValue, int minValue, String label) {
        if (args.length <= index) {
            return defaultValue;
        }
        String raw = args[index];
        try {
            int value = Integer.parseInt(raw);
            if (value >= minValue) {
                return value;
            }
            log("Ignoring " + label + " '" + raw + "': must be >= " + minValue + "; using " + defaultValue);
        } catch (NumberFormatException e) {
            log("Ignoring " + label + " '" + raw + "': not an integer; using " + defaultValue);
        }
        return defaultValue;
    }

    /**
     * Makes sure no worker is still running: interrupts any that are and waits
     * a bounded time for them to exit. Does nothing if the pool has already
     * terminated. The caller's interrupt status is preserved.
     */
    private static void stopWorkers(ExecutorService pool) {
        if (pool.isTerminated()) {
            return;
        }
        pool.shutdownNow(); // interrupts workers blocked in take() or sleep()
        // Clear a pending interrupt so the bounded wait below can actually wait;
        // it is restored afterwards.
        boolean interrupted = Thread.interrupted();
        try {
            if (!pool.awaitTermination(STOP_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                log("Workers did not stop within " + STOP_TIMEOUT_SECONDS + "s of being interrupted.");
            }
        } catch (InterruptedException e) {
            interrupted = true;
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Entry point.
     *
     * @param args optional: {@code args[0]} = number of workers (>= 1, default 4),
     *             {@code args[1]} = number of tasks (>= 0, default 20). Invalid
     *             values are reported and replaced by the default.
     */
    public static void main(String[] args) {
        int numWorkers = parseCount(args, 0, DEFAULT_WORKERS, 1, "worker count");
        int numTasks = parseCount(args, 1, DEFAULT_TASKS, 0, "task count");
        log("Starting RideSharingSystem with workers=" + numWorkers + ", tasks=" + numTasks);

        TaskQueue queue = new TaskQueue();
        List<String> results = Collections.synchronizedList(new ArrayList<>());
        BufferedWriter bw = null;
        ExecutorService pool = null;
        try {
            bw = new BufferedWriter(new FileWriter(RESULTS_FILE, false));
            ResultWriter resultWriter = new ResultWriter(bw);
            pool = Executors.newFixedThreadPool(numWorkers);

            // start workers; execute() (not submit()) so an uncaught failure is
            // not silently parked in an unread Future
            for (int i = 0; i < numWorkers; i++) {
                pool.execute(new Worker("Worker-" + (i + 1), queue, results, resultWriter));
            }
            // Nothing else will be submitted; already-started workers keep running.
            pool.shutdown();

            // produce tasks
            for (int i = 1; i <= numTasks; i++) {
                try {
                    queue.addTask(new Task(i, "ride-" + i));
                } catch (InterruptedException e) {
                    // Stop producing: with the interrupt pending every later
                    // put() would fail too, and no poison pill would be queued.
                    log("Interrupted while adding task " + i);
                    throw e;
                }
            }
            // add poison pills to stop workers
            for (int i = 0; i < numWorkers; i++) {
                try {
                    queue.addTask(Task.POISON);
                } catch (InterruptedException e) {
                    log("Interrupted while adding poison pill");
                    throw e;
                }
            }

            // wait for workers
            boolean finished = pool.awaitTermination(RUN_TIMEOUT_MINUTES, TimeUnit.MINUTES);
            if (!finished) log("Timeout waiting for workers to finish; some workers may still be running.");
            else log("All workers finished.");
            log("Total results collected in-memory: " + results.size());
        } catch (IOException ioe) {
            log("IOException during setup: " + ioe.getMessage());
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            log("Main thread interrupted while waiting.");
        } finally {
            // Stop workers BEFORE closing the file they write to; otherwise a
            // timeout or interrupt leaves blocked non-daemon threads behind.
            if (pool != null) {
                stopWorkers(pool);
            }
            if (bw != null) {
                try { bw.close(); } catch (IOException e) { log("Error closing writer: " + e.getMessage()); }
            }
        }
        log("RideSharingSystem terminated.");
    }
}