Skip to content

Commit 63abfdb

Browse files
author
Stuart Hart
committed
Revert "fix: properly close executor services and HTTP clients (closes #2)"
This reverts commit a34f94e.
1 parent a34f94e commit 63abfdb

9 files changed

Lines changed: 24 additions & 86 deletions

File tree

commit.bat

Lines changed: 0 additions & 6 deletions
This file was deleted.

geotrack-processing/src/main/java/com/geotrack/processing/consumer/PositionEventConsumer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ public void consume(String payload) {
116116
} catch (Exception e) {
117117
Log.errorf(e, "Failed to process position event: %s", payload);
118118
// SmallRye DLQ strategy will route this to position.dlq
119-
throw new PositionProcessingException("Position processing failed", e);
119+
throw new RuntimeException("Position processing failed", e);
120120
}
121121
}
122122

geotrack-processing/src/main/java/com/geotrack/processing/consumer/PositionProcessingException.java

Lines changed: 0 additions & 8 deletions
This file was deleted.

geotrack-simulator/src/main/java/com/geotrack/simulator/SimulatorCommand.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,6 @@
77
import com.geotrack.simulator.route.RouteReplayer;
88
import io.quarkus.logging.Log;
99
import jakarta.enterprise.context.Dependent;
10-
import org.slf4j.Logger;
11-
import org.slf4j.LoggerFactory;
1210
import jakarta.inject.Inject;
1311
import picocli.CommandLine.Command;
1412
import picocli.CommandLine.Option;
@@ -27,8 +25,6 @@
2725
description = "GeoTrack fleet simulation and GPX route replay")
2826
public class SimulatorCommand implements Runnable {
2927

30-
private static final Logger log = LoggerFactory.getLogger(SimulatorCommand.class);
31-
3228
@Option(names = {"--fleet"}, description = "Run Newcastle demo fleet (4 vehicles)")
3329
boolean fleet;
3430

@@ -78,7 +74,7 @@ private void runFleet() {
7874
var simulator = new FleetSimulator(speedMultiplier).withNewcastleFleet();
7975
var futures = simulator.simulate(position -> {
8076
if (dryRun) {
81-
log.info("[{}] {}, {} | speed={} km/h heading={}°",
77+
System.out.printf("[%s] %.6f, %.6f | speed=%.1f km/h heading=%.0f°%n",
8278
position.assetId(), position.latitude(), position.longitude(),
8379
position.speed(), position.heading());
8480
} else {
@@ -109,7 +105,7 @@ private void runGpxReplay() {
109105

110106
new RouteReplayer(route, assetId, speedMultiplier).replay(position -> {
111107
if (dryRun) {
112-
log.info("[{}] {}, {} | speed={} km/h",
108+
System.out.printf("[%s] %.6f, %.6f | speed=%.1f km/h%n",
113109
position.assetId(), position.latitude(), position.longitude(), position.speed());
114110
} else {
115111
kafkaProducer.send(position);

geotrack-simulator/src/main/java/com/geotrack/simulator/command/LiveIngestCommand.java

Lines changed: 18 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -53,41 +53,27 @@ public void run() {
5353
return t;
5454
});
5555

56-
try {
57-
// Graceful shutdown on SIGINT
58-
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
59-
Log.info("🛑 Shutting down OpenSky ingestor...");
60-
scheduler.shutdown();
61-
shutdownLatch.countDown();
62-
}));
63-
64-
scheduler.scheduleAtFixedRate(() -> {
65-
try {
66-
ingestor.poll();
67-
} catch (Exception e) {
68-
Log.errorf(e, "Error during OpenSky poll cycle");
69-
}
70-
}, 0, intervalSeconds, TimeUnit.SECONDS);
56+
// Graceful shutdown on SIGINT
57+
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
58+
Log.info("🛑 Shutting down OpenSky ingestor...");
59+
scheduler.shutdown();
60+
shutdownLatch.countDown();
61+
}));
7162

72-
// Block until shutdown signal
63+
scheduler.scheduleAtFixedRate(() -> {
7364
try {
74-
shutdownLatch.await();
75-
scheduler.awaitTermination(10, TimeUnit.SECONDS);
76-
} catch (InterruptedException e) {
77-
Thread.currentThread().interrupt();
78-
}
79-
} finally {
80-
if (!scheduler.isShutdown()) {
81-
scheduler.shutdown();
82-
}
83-
try {
84-
if (!scheduler.awaitTermination(10, TimeUnit.SECONDS)) {
85-
scheduler.shutdownNow();
86-
}
87-
} catch (InterruptedException e) {
88-
scheduler.shutdownNow();
89-
Thread.currentThread().interrupt();
65+
ingestor.poll();
66+
} catch (Exception e) {
67+
Log.errorf(e, "Error during OpenSky poll cycle");
9068
}
69+
}, 0, intervalSeconds, TimeUnit.SECONDS);
70+
71+
// Block until shutdown signal
72+
try {
73+
shutdownLatch.await();
74+
scheduler.awaitTermination(5, TimeUnit.SECONDS);
75+
} catch (InterruptedException e) {
76+
Thread.currentThread().interrupt();
9177
}
9278

9379
Log.info("✈ OpenSky ingestor stopped");

geotrack-simulator/src/main/java/com/geotrack/simulator/fleet/FleetSimulator.java

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
import java.util.concurrent.ExecutorService;
1212
import java.util.concurrent.Executors;
1313
import java.util.concurrent.Future;
14-
import java.util.concurrent.TimeUnit;
1514
import java.util.function.Consumer;
1615

1716
/**
@@ -73,14 +72,6 @@ public List<Future<?>> simulate(Consumer<Position> positionConsumer) {
7372
.toList());
7473

7574
executor.shutdown();
76-
try {
77-
if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
78-
executor.shutdownNow();
79-
}
80-
} catch (InterruptedException e) {
81-
executor.shutdownNow();
82-
Thread.currentThread().interrupt();
83-
}
8475
return futures;
8576
}
8677
}

geotrack-simulator/src/main/java/com/geotrack/simulator/live/AISStreamIngestor.java

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@ public class AISStreamIngestor {
4848
private final AtomicInteger totalReceived = new AtomicInteger();
4949
private final AtomicBoolean running = new AtomicBoolean(true);
5050
private final ScheduledExecutorService scheduler;
51-
private HttpClient httpClient;
5251

5352
public AISStreamIngestor(PositionProducer producer, String apiKey) {
5453
this.producer = producer;
@@ -75,18 +74,6 @@ public void start() {
7574
public void stop() {
7675
running.set(false);
7776
scheduler.shutdown();
78-
try {
79-
if (!scheduler.awaitTermination(10, TimeUnit.SECONDS)) {
80-
scheduler.shutdownNow();
81-
}
82-
} catch (InterruptedException e) {
83-
scheduler.shutdownNow();
84-
Thread.currentThread().interrupt();
85-
}
86-
if (httpClient != null) {
87-
httpClient.close();
88-
httpClient = null;
89-
}
9077
}
9178

9279
private void connect() {
@@ -98,8 +85,8 @@ private void connect() {
9885
{"Apikey":"%s","BoundingBoxes":[[[49.0,-12.0],[61.0,3.0]]],"FilterMessageTypes":["PositionReport"]}""",
9986
apiKey);
10087

101-
this.httpClient = HttpClient.newHttpClient();
102-
httpClient.newWebSocketBuilder()
88+
HttpClient.newHttpClient()
89+
.newWebSocketBuilder()
10390
.buildAsync(URI.create(WS_URL), new WebSocket.Listener() {
10491

10592
private final StringBuilder buffer = new StringBuilder();

geotrack-simulator/src/main/java/com/geotrack/simulator/route/GpxParseException.java

Lines changed: 0 additions & 8 deletions
This file was deleted.

geotrack-simulator/src/main/java/com/geotrack/simulator/route/GpxParser.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public List<RoutePoint> parse(InputStream input) {
4545

4646
return points;
4747
} catch (Exception e) {
48-
throw new GpxParseException("Failed to parse GPX: " + e.getMessage(), e);
48+
throw new RuntimeException("Failed to parse GPX: " + e.getMessage(), e);
4949
}
5050
}
5151

0 commit comments

Comments
 (0)