Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions data-service/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion data-service/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@
"dependencies": {
"express": "^4.18.2",
"cors": "^2.8.5",
"uuid": "^9.0.1"
"uuid": "^9.0.1",
"proper-lockfile": "^4.1.2"
},
"devDependencies": {
"@types/express": "^4.17.21",
"@types/cors": "^2.8.19",
"@types/node": "^20.8.0",
"@types/uuid": "^9.0.7",
"@types/proper-lockfile": "^4.1.4",
"typescript": "^5.2.2",
"ts-node": "^10.9.1"
}
Expand Down
29 changes: 23 additions & 6 deletions data-service/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import express, { Request, Response, NextFunction } from 'express';
import cors from 'cors';
import fs from 'fs/promises';
import path from 'path';
import { lock } from 'proper-lockfile';

interface InventoryItem {
productId: string;
Expand Down Expand Up @@ -66,9 +67,17 @@ class DataService {
}

private async saveOrders(orders: OrderDatabase): Promise<void> {
const tempData = JSON.stringify(orders, null, 2);
await fs.writeFile(this.ordersPath, tempData);
this.cachedOrders = orders;
let release: (() => Promise<void>) | null = null;
try {
release = await lock(this.ordersPath, { retries: 5 });
const tempData = JSON.stringify(orders, null, 2);
await fs.writeFile(this.ordersPath, tempData);
this.cachedOrders = orders;
} finally {
if (release) {
await release();
}
}
}

private async loadInventory(): Promise<InventoryDatabase> {
Expand All @@ -83,9 +92,17 @@ class DataService {
}

private async saveInventory(inventory: InventoryDatabase): Promise<void> {
const tempData = JSON.stringify(inventory, null, 2);
await fs.writeFile(this.inventoryPath, tempData);
this.cachedInventory = inventory;
let release: (() => Promise<void>) | null = null;
try {
release = await lock(this.inventoryPath, { retries: 5 });
const tempData = JSON.stringify(inventory, null, 2);
await fs.writeFile(this.inventoryPath, tempData);
this.cachedInventory = inventory;
} finally {
if (release) {
await release();
}
}
}

private getDefaultInventory(): InventoryDatabase {
Expand Down
2 changes: 2 additions & 0 deletions inventory-service/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# Core dependencies for Inventory Service
pika==1.3.2
retry==0.9.2

requests==2.31.0

# Testing dependencies
Expand Down
97 changes: 54 additions & 43 deletions inventory-service/src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import requests
from datetime import datetime
from typing import Dict, List, Optional
from retry import retry

import pika # type: ignore
from pika.adapters.asyncio_connection import AsyncioConnection # type: ignore
Expand Down Expand Up @@ -112,6 +113,20 @@ def get_all_inventory(self) -> List[Dict]:
logger.error(f"Error fetching all inventory: {e}")
return []

def update_inventory(self, product_name: str, quantity: int, reserved: int) -> bool:
"""Update inventory for a product"""
try:
response = self.session.put(
f"{self.base_url}/inventory/{product_name}",
json={'quantity': quantity, 'reservedQuantity': reserved},
timeout=self.timeout
)
response.raise_for_status()
return response.json()['success']
except requests.exceptions.RequestException as e:
logger.error(f"Error updating inventory for {product_name}: {e}")
return False

async def wait_for_data_service(base_url: str = "http://localhost:3002", max_attempts: int = 10, interval_seconds: int = 10):
"""Wait for data service to become available"""
print("🔍 Checking data service connection...")
Expand Down Expand Up @@ -141,6 +156,7 @@ def __init__(self, rabbitmq_url: str = 'amqp://admin:admin123@localhost:5672'):
self.connection = None
self.channel = None
self.data_client = DataServiceClient()
self._inventory_lock = asyncio.Lock()

self.exchange_name = 'orders.exchange'
self.order_created_queue = 'order.created'
Expand Down Expand Up @@ -240,7 +256,7 @@ def _on_order_status_queue_declare(self, method):

logger.info("👂 Ready to process inventory orders")

def _on_order_created_message(self, channel, method, properties, body) -> None:
async def _on_order_created_message(self, channel, method, properties, body) -> None:
"""Handle order created message"""
try:
message = json.loads(body.decode('utf-8'))
Expand All @@ -250,7 +266,7 @@ def _on_order_created_message(self, channel, method, properties, body) -> None:

logger.info(f"📥 Processing order {order_id}: {quantity}x {product_name}")

success, status_message = self._process_order(order_id, product_name, quantity)
success, status_message = await self._process_order_with_retry(order_id, product_name, quantity)

status = 'fulfilled' if success else 'failed'
self._send_order_status_update(order_id, status, status_message)
Expand All @@ -261,50 +277,45 @@ def _on_order_created_message(self, channel, method, properties, body) -> None:
logger.error(f"❌ Error processing order created message: {e}")
channel.basic_nack(delivery_tag=method.delivery_tag, requeue=True)

@retry(tries=3, delay=1, logger=logger)
async def _process_order_with_retry(self, order_id: str, product_name: str, quantity: int) -> tuple[bool, str]:
"""Process order with retry mechanism for handling transient failures"""
async with self._inventory_lock:
return self._process_order(order_id, product_name, quantity)


def _process_order(self, order_id: str, product_name: str, quantity: int) -> tuple[bool, str]:
"""Process order fulfillment"""
try:
item = self.data_client.get_inventory_item(product_name)
if not item:
message = f"Product '{product_name}' not found in inventory"
logger.warning(f"⚠️ Order {order_id}: {message}")
return False, message

available_check = self.data_client.check_availability(product_name, quantity)
if not available_check:
item = self.data_client.get_inventory_item(product_name)
if item:
available = item['quantity'] - item['reservedQuantity']
message = f"Insufficient stock for {product_name}. Available: {available}, Requested: {quantity}"
else:
message = f"Could not determine availability for {product_name}"
logger.warning(f"⚠️ Order {order_id}: {message}")
return False, message

logger.info(f"🔄 Order {order_id}: Availability confirmed for {quantity}x {product_name}")

if not self.data_client.reserve_stock(product_name, quantity):
item = self.data_client.get_inventory_item(product_name)
if item:
available = item['quantity'] - item['reservedQuantity']
logger.warning(f"⚠️ Order {order_id}: Stock changed during processing - Available: {available}")
message = f"Failed to reserve stock for {product_name}"
logger.error(f"❌ Order {order_id}: {message}")
return False, message

if not self.data_client.fulfill_order(product_name, quantity):
self.data_client.cancel_reservation(product_name, quantity)
message = f"Failed to fulfill order for {product_name}"
logger.error(f"❌ Order {order_id}: {message}")
return False, message

message = f"Successfully fulfilled {quantity}x {product_name}"
logger.info(f"✅ Order {order_id}: {message}")
return True, message
item = self.data_client.get_inventory_item(product_name)
if not item:
message = f"Product '{product_name}' not found in inventory"
logger.warning(f"⚠️ Order {order_id}: {message}")
return False, message

available = item['quantity'] - item['reservedQuantity']
if available < quantity:
message = f"Insufficient stock for {product_name}. Available: {available}, Requested: {quantity}"
logger.warning(f"⚠️ Order {order_id}: {message}")
return False, message

logger.info(f"🔄 Order {order_id}: Availability confirmed for {quantity}x {product_name}")

new_reserved_quantity = item['reservedQuantity'] + quantity
if not self.data_client.update_inventory(product_name, item['quantity'], new_reserved_quantity):
message = f"Failed to reserve stock for {product_name}"
logger.error(f"❌ Order {order_id}: {message}")
return False, message

if not self.data_client.fulfill_order(product_name, quantity):
self.data_client.cancel_reservation(product_name, quantity)
message = f"Failed to fulfill order for {product_name}"
logger.error(f"❌ Order {order_id}: {message}")
return False, message

message = f"Successfully fulfilled {quantity}x {product_name}"
logger.info(f"✅ Order {order_id}: {message}")
return True, message

except Exception as e:
logger.error(f"❌ Error processing order {order_id}: {e}")
return False, f"Internal error processing order: {str(e)}"

def _send_order_status_update(self, order_id: str, status: str, message: str) -> None:
"""Send order status update via RabbitMQ"""
Expand Down
Loading