Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,9 @@
<version>3.2.0</version>
<configuration>
<java>
<googleJavaFormat>
<version>1.33.0</version>
<style>AOSP</style>
</googleJavaFormat>
<palantirJavaFormat>
<version>2.85.0</version>
</palantirJavaFormat>
<removeUnusedImports />
<formatAnnotations />
</java>
Expand Down
188 changes: 78 additions & 110 deletions src/main/java/io/apitally/common/ApitallyClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,16 @@ public enum HubRequestStatus {
private static final int INITIAL_PERIOD_SECONDS = 3600;
private static final int MAX_QUEUE_TIME_SECONDS = 3600;
private static final int REQUEST_TIMEOUT_SECONDS = 10;
private static final String HUB_BASE_URL =
Optional.ofNullable(System.getenv("APITALLY_HUB_BASE_URL"))
.filter(s -> !s.trim().isEmpty())
.orElse("https://hub.apitally.io");
private static final String HUB_BASE_URL = Optional.ofNullable(System.getenv("APITALLY_HUB_BASE_URL"))
.filter(s -> !s.trim().isEmpty())
.orElse("https://hub.apitally.io");

private static final Logger logger = LoggerFactory.getLogger(ApitallyClient.class);
private static final RetryTemplate retryTemplate =
RetryTemplate.builder()
.maxAttempts(3)
.exponentialBackoff(Duration.ofSeconds(1), 2, Duration.ofSeconds(4), true)
.retryOn(RetryableHubRequestException.class)
.build();
private static final RetryTemplate retryTemplate = RetryTemplate.builder()
.maxAttempts(3)
.exponentialBackoff(Duration.ofSeconds(1), 2, Duration.ofSeconds(4), true)
.retryOn(RetryableHubRequestException.class)
.build();

private final String clientId;
private final String env;
Expand Down Expand Up @@ -87,9 +85,7 @@ public ApitallyClient(String clientId, String env, RequestLoggingConfig requestL
this.requestCounter = new RequestCounter();
this.requestLogger = new RequestLogger(requestLoggingConfig);
this.spanCollector =
new SpanCollector(
requestLoggingConfig.isEnabled()
&& requestLoggingConfig.isTracingEnabled());
new SpanCollector(requestLoggingConfig.isEnabled() && requestLoggingConfig.isTracingEnabled());
this.validationErrorCounter = new ValidationErrorCounter();
this.serverErrorCounter = new ServerErrorCounter();
this.consumerRegistry = new ConsumerRegistry();
Expand Down Expand Up @@ -128,36 +124,32 @@ private void sendStartupData() {
if (startupData == null) {
return;
}
HttpRequest request =
HttpRequest.newBuilder()
.uri(getHubUrl("startup"))
.header("Content-Type", "application/json")
.POST(HttpRequest.BodyPublishers.ofString(startupData.toJSON()))
.build();
sendHubRequest(request)
.thenAccept(
status -> {
if (status == HubRequestStatus.OK) {
startupDataSent = true;
startupData = null;
} else if (status == HubRequestStatus.VALIDATION_ERROR) {
startupDataSent = false;
startupData = null;
} else {
startupDataSent = false;
}
});
HttpRequest request = HttpRequest.newBuilder()
.uri(getHubUrl("startup"))
.header("Content-Type", "application/json")
.POST(HttpRequest.BodyPublishers.ofString(startupData.toJSON()))
.build();
sendHubRequest(request).thenAccept(status -> {
if (status == HubRequestStatus.OK) {
startupDataSent = true;
startupData = null;
} else if (status == HubRequestStatus.VALIDATION_ERROR) {
startupDataSent = false;
startupData = null;
} else {
startupDataSent = false;
}
});
}

private void sendSyncData() {
SyncData data =
new SyncData(
instanceLock.getInstanceUuid(),
requestCounter.getAndResetRequests(),
validationErrorCounter.getAndResetValidationErrors(),
serverErrorCounter.getAndResetServerErrors(),
consumerRegistry.getAndResetConsumers(),
resourceMonitor.getCpuMemoryUsage());
SyncData data = new SyncData(
instanceLock.getInstanceUuid(),
requestCounter.getAndResetRequests(),
validationErrorCounter.getAndResetValidationErrors(),
serverErrorCounter.getAndResetServerErrors(),
consumerRegistry.getAndResetConsumers(),
resourceMonitor.getCpuMemoryUsage());
syncDataQueue.offer(data);

int i = 0;
Expand All @@ -170,12 +162,11 @@ private void sendSyncData() {
// Add random delay between retries
Thread.sleep(100 + random.nextInt(400));
}
HttpRequest request =
HttpRequest.newBuilder()
.uri(getHubUrl("sync"))
.header("Content-Type", "application/json")
.POST(HttpRequest.BodyPublishers.ofString(payload.toJSON()))
.build();
HttpRequest request = HttpRequest.newBuilder()
.uri(getHubUrl("sync"))
.header("Content-Type", "application/json")
.POST(HttpRequest.BodyPublishers.ofString(payload.toJSON()))
.build();
HubRequestStatus status = sendHubRequest(request).join();
if (status == HubRequestStatus.RETRYABLE_ERROR) {
syncDataQueue.offer(payload);
Expand Down Expand Up @@ -204,12 +195,11 @@ private void sendLogData() {
}
}
try (InputStream inputStream = logFile.getInputStream()) {
HttpRequest request =
HttpRequest.newBuilder()
.uri(getHubUrl("log", "uuid=" + logFile.getUuid().toString()))
.header("Content-Type", "application/octet-stream")
.POST(HttpRequest.BodyPublishers.ofInputStream(() -> inputStream))
.build();
HttpRequest request = HttpRequest.newBuilder()
.uri(getHubUrl("log", "uuid=" + logFile.getUuid().toString()))
.header("Content-Type", "application/octet-stream")
.POST(HttpRequest.BodyPublishers.ofInputStream(() -> inputStream))
.build();
HubRequestStatus status = sendHubRequest(request).join();
if (status == HubRequestStatus.PAYMENT_REQUIRED) {
requestLogger.clear();
Expand All @@ -231,84 +221,62 @@ private void sendLogData() {
}

public CompletableFuture<HubRequestStatus> sendHubRequest(HttpRequest request) {
return CompletableFuture.supplyAsync(
() -> {
return CompletableFuture.supplyAsync(() -> {
try {
return retryTemplate.execute(context -> {
try {
return retryTemplate.execute(
context -> {
try {
logger.debug(
"Sending request to Apitally hub: {}",
request.uri());
HttpResponse<String> response =
httpClient.send(
request,
HttpResponse.BodyHandlers.ofString());
if (response.statusCode() >= 200
&& response.statusCode() < 300) {
return HubRequestStatus.OK;
} else if (response.statusCode() == 402) {
return HubRequestStatus.PAYMENT_REQUIRED;
} else if (response.statusCode() == 404) {
enabled = false;
stopSync();
requestLogger.close();
logger.error(
"Invalid Apitally client ID: {}", clientId);
return HubRequestStatus.INVALID_CLIENT_ID;
} else if (response.statusCode() == 422) {
logger.error(
"Received validation error from Apitally hub: {}",
response.body());
return HubRequestStatus.VALIDATION_ERROR;
} else {
throw new RetryableHubRequestException(
"Hub request failed with status code "
+ response.statusCode());
}
} catch (Exception e) {
throw new RetryableHubRequestException(
"Hub request failed with exception: "
+ e.getMessage());
}
});
logger.debug("Sending request to Apitally hub: {}", request.uri());
HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());
if (response.statusCode() >= 200 && response.statusCode() < 300) {
return HubRequestStatus.OK;
} else if (response.statusCode() == 402) {
return HubRequestStatus.PAYMENT_REQUIRED;
} else if (response.statusCode() == 404) {
enabled = false;
stopSync();
requestLogger.close();
logger.error("Invalid Apitally client ID: {}", clientId);
return HubRequestStatus.INVALID_CLIENT_ID;
} else if (response.statusCode() == 422) {
logger.error("Received validation error from Apitally hub: {}", response.body());
return HubRequestStatus.VALIDATION_ERROR;
} else {
throw new RetryableHubRequestException(
"Hub request failed with status code " + response.statusCode());
}
} catch (Exception e) {
logger.error("Error sending request to Apitally hub", e);
return HubRequestStatus.RETRYABLE_ERROR;
throw new RetryableHubRequestException("Hub request failed with exception: " + e.getMessage());
}
});
} catch (Exception e) {
logger.error("Error sending request to Apitally hub", e);
return HubRequestStatus.RETRYABLE_ERROR;
}
});
}

public void startSync() {
if (scheduler == null) {
scheduler =
Executors.newSingleThreadScheduledExecutor(
r -> {
Thread thread = new Thread(r, "apitally-sync");
thread.setDaemon(true);
return thread;
});
scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
Thread thread = new Thread(r, "apitally-sync");
thread.setDaemon(true);
return thread;
});
}

if (syncTask != null) {
syncTask.cancel(false);
}

// Start with shorter initial sync interval
syncTask =
scheduler.scheduleAtFixedRate(
this::sync, 0, INITIAL_SYNC_INTERVAL_SECONDS, TimeUnit.SECONDS);
syncTask = scheduler.scheduleAtFixedRate(this::sync, 0, INITIAL_SYNC_INTERVAL_SECONDS, TimeUnit.SECONDS);

// Schedule a one-time task to switch to regular sync interval
scheduler.schedule(
() -> {
syncTask.cancel(false);
syncTask =
scheduler.scheduleAtFixedRate(
this::sync,
SYNC_INTERVAL_SECONDS,
SYNC_INTERVAL_SECONDS,
TimeUnit.SECONDS);
syncTask = scheduler.scheduleAtFixedRate(
this::sync, SYNC_INTERVAL_SECONDS, SYNC_INTERVAL_SECONDS, TimeUnit.SECONDS);
},
INITIAL_PERIOD_SECONDS,
TimeUnit.SECONDS);
Expand Down
4 changes: 1 addition & 3 deletions src/main/java/io/apitally/common/ConsumerRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,7 @@ public void addOrUpdateConsumer(Consumer consumer) {
}

if (hasChanges) {
consumers.put(
consumer.getIdentifier(),
new Consumer(consumer.getIdentifier(), newName, newGroup));
consumers.put(consumer.getIdentifier(), new Consumer(consumer.getIdentifier(), newName, newGroup));
updated.add(consumer.getIdentifier());
}
}
Expand Down
17 changes: 6 additions & 11 deletions src/main/java/io/apitally/common/InstanceLock.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,8 @@ static InstanceLock create(String clientId, String env, Path lockDir) {
Path lockPath = lockDir.resolve("instance_" + appEnvHash + "_" + slot + ".lock");
FileChannel channel = null;
try {
channel =
FileChannel.open(
lockPath,
StandardOpenOption.CREATE,
StandardOpenOption.READ,
StandardOpenOption.WRITE);
channel = FileChannel.open(
lockPath, StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE);

FileLock lock = channel.tryLock();
if (lock == null) {
Expand All @@ -69,9 +65,9 @@ static InstanceLock create(String clientId, String env, Path lockDir) {
}

FileTime lastModified = Files.getLastModifiedTime(lockPath);
boolean tooOld =
Duration.between(lastModified.toInstant(), Instant.now()).getSeconds()
> MAX_LOCK_AGE_SECONDS;
boolean tooOld = Duration.between(lastModified.toInstant(), Instant.now())
.getSeconds()
> MAX_LOCK_AGE_SECONDS;

String existingUuid = readChannel(channel);
UUID uuid = parseUuid(existingUuid);
Expand Down Expand Up @@ -120,8 +116,7 @@ private static UUID parseUuid(String s) {
}
}

private static String getAppEnvHash(String clientId, String env)
throws NoSuchAlgorithmException {
private static String getAppEnvHash(String clientId, String env) throws NoSuchAlgorithmException {
MessageDigest digest = MessageDigest.getInstance("SHA-256");
byte[] hash = digest.digest((clientId + ":" + env).getBytes(StandardCharsets.UTF_8));
return HexFormat.of().formatHex(hash, 0, 4);
Expand Down
35 changes: 15 additions & 20 deletions src/main/java/io/apitally/common/RequestCounter.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ public void addRequest(
long responseTime,
long requestSize,
long responseSize) {
String key =
String.join("|", consumer, method.toUpperCase(), path, String.valueOf(statusCode));
String key = String.join("|", consumer, method.toUpperCase(), path, String.valueOf(statusCode));

// Increment request count
requestCounts.merge(key, 1, Integer::sum);
Expand Down Expand Up @@ -73,25 +72,21 @@ public List<Requests> getAndResetRequests() {
String path = parts[2];
int statusCode = Integer.parseInt(parts[3]);

Map<Integer, Integer> responseTimeMap =
responseTimes.getOrDefault(key, new ConcurrentHashMap<>());
Map<Integer, Integer> requestSizeMap =
requestSizes.getOrDefault(key, new ConcurrentHashMap<>());
Map<Integer, Integer> responseSizeMap =
responseSizes.getOrDefault(key, new ConcurrentHashMap<>());
Map<Integer, Integer> responseTimeMap = responseTimes.getOrDefault(key, new ConcurrentHashMap<>());
Map<Integer, Integer> requestSizeMap = requestSizes.getOrDefault(key, new ConcurrentHashMap<>());
Map<Integer, Integer> responseSizeMap = responseSizes.getOrDefault(key, new ConcurrentHashMap<>());

Requests item =
new Requests(
consumer,
method,
path,
statusCode,
entry.getValue(),
requestSizeSums.getOrDefault(key, 0L),
responseSizeSums.getOrDefault(key, 0L),
responseTimeMap,
requestSizeMap,
responseSizeMap);
Requests item = new Requests(
consumer,
method,
path,
statusCode,
entry.getValue(),
requestSizeSums.getOrDefault(key, 0L),
responseSizeSums.getOrDefault(key, 0L),
responseTimeMap,
requestSizeMap,
responseSizeMap);
data.add(item);
}

Expand Down
Loading