From bc2137722a23e73a8f63a75e3c2f9c937e7bab13 Mon Sep 17 00:00:00 2001 From: fishmeister09 Date: Thu, 25 Sep 2025 14:26:28 +0530 Subject: [PATCH 1/2] initial changes made by gemini --- data-service/package-lock.json | 51 ++++++++++++++++ data-service/package.json | 4 +- data-service/src/server.ts | 29 +++++++-- inventory-service/requirements.txt | 2 + inventory-service/src/main.py | 97 +++++++++++++++++------------- order-service/src/api.ts | 95 ++++++++++------------------- 6 files changed, 164 insertions(+), 114 deletions(-) diff --git a/data-service/package-lock.json b/data-service/package-lock.json index b91a604..9f56b4e 100644 --- a/data-service/package-lock.json +++ b/data-service/package-lock.json @@ -11,12 +11,14 @@ "dependencies": { "cors": "^2.8.5", "express": "^4.18.2", + "proper-lockfile": "^4.1.2", "uuid": "^9.0.1" }, "devDependencies": { "@types/cors": "^2.8.19", "@types/express": "^4.17.21", "@types/node": "^20.8.0", + "@types/proper-lockfile": "^4.1.4", "@types/uuid": "^9.0.7", "ts-node": "^10.9.1", "typescript": "^5.2.2" @@ -172,6 +174,16 @@ "undici-types": "~6.21.0" } }, + "node_modules/@types/proper-lockfile": { + "version": "4.1.4", + "resolved": "https://registry.npmjs.org/@types/proper-lockfile/-/proper-lockfile-4.1.4.tgz", + "integrity": "sha512-uo2ABllncSqg9F1D4nugVl9v93RmjxF6LJzQLMLDdPaXCUIDPeOJ21Gbqi43xNKzBi/WQ0Q0dICqufzQbMjipQ==", + "dev": true, + "license": "MIT", + "dependencies": { + "@types/retry": "*" + } + }, "node_modules/@types/qs": { "version": "6.14.0", "resolved": "https://registry.npmjs.org/@types/qs/-/qs-6.14.0.tgz", @@ -186,6 +198,13 @@ "dev": true, "license": "MIT" }, + "node_modules/@types/retry": { + "version": "0.12.5", + "resolved": "https://registry.npmjs.org/@types/retry/-/retry-0.12.5.tgz", + "integrity": "sha512-3xSjTp3v03X/lSQLkczaN9UIEwJMoMCA1+Nb5HfbJEQWogdeQIyVtTvxPXDQjZ5zws8rFQfVfRdz03ARihPJgw==", + "dev": true, + "license": "MIT" + }, "node_modules/@types/send": { "version": "0.17.5", "resolved": "https://registry.npmjs.org/@types/send/-/send-0.17.5.tgz", @@ -638,6 +657,12 @@ "url": "https://github.com/sponsors/ljharb" } }, + "node_modules/graceful-fs": { + "version": "4.2.11", + "resolved": "https://registry.npmjs.org/graceful-fs/-/graceful-fs-4.2.11.tgz", + "integrity": "sha512-RbJ5/jmFcNNCcDV5o9eTnBLJ/HszWV0P73bc+Ff4nS/rJj+YaS6IGyiOL0VoBYX+l1Wrl3k63h/KrH+nhJ0XvQ==", + "license": "ISC" + }, "node_modules/has-symbols": { "version": "1.1.0", "resolved": "https://registry.npmjs.org/has-symbols/-/has-symbols-1.1.0.tgz", @@ -844,6 +869,17 @@ "integrity": "sha512-RA1GjUVMnvYFxuqovrEqZoxxW5NUZqbwKtYz/Tt7nXerk0LbLblQmrsgdeOxV5SFHf0UDggjS/bSeOZwt1pmEQ==", "license": "MIT" }, + "node_modules/proper-lockfile": { + "version": "4.1.2", + "resolved": "https://registry.npmjs.org/proper-lockfile/-/proper-lockfile-4.1.2.tgz", + "integrity": "sha512-TjNPblN4BwAWMXU8s9AEz4JmQxnD1NNL7bNOY/AKUzyamc379FWASUhc/K1pL2noVb+XmZKLL68cjzLsiOAMaA==", + "license": "MIT", + "dependencies": { + "graceful-fs": "^4.2.4", + "retry": "^0.12.0", + "signal-exit": "^3.0.2" + } + }, "node_modules/proxy-addr": { "version": "2.0.7", "resolved": "https://registry.npmjs.org/proxy-addr/-/proxy-addr-2.0.7.tgz", @@ -896,6 +932,15 @@ "node": ">= 0.8" } }, + "node_modules/retry": { + "version": "0.12.0", + "resolved": "https://registry.npmjs.org/retry/-/retry-0.12.0.tgz", + "integrity": "sha512-9LkiTwjUh6rT555DtE9rTX+BKByPfrMzEAtnlEtdEwr3Nkffwiihqe2bWADg+OQRjt9gl6ICdmB/ZFDCGAtSow==", + "license": "MIT", + "engines": { + "node": ">= 4" + } + }, "node_modules/safe-buffer": { "version": "5.2.1", "resolved": "https://registry.npmjs.org/safe-buffer/-/safe-buffer-5.2.1.tgz", @@ -1054,6 +1099,12 @@ "url": "https://github.com/sponsors/ljharb" } }, + "node_modules/signal-exit": { + "version": "3.0.7", + "resolved": "https://registry.npmjs.org/signal-exit/-/signal-exit-3.0.7.tgz", + "integrity": "sha512-wnD2ZE+l+SPC/uoS0vXeE9L1+0wuaMqKlfz9AMUo38JsyLSBWSFcHR1Rri62LZc12vLr1gb3jl7iwQhgwpAbGQ==", + "license": "ISC" + }, "node_modules/statuses": { "version": "2.0.1", "resolved": "https://registry.npmjs.org/statuses/-/statuses-2.0.1.tgz", diff --git a/data-service/package.json b/data-service/package.json index bbe4e0b..09f108e 100644 --- a/data-service/package.json +++ b/data-service/package.json @@ -20,13 +20,15 @@ "dependencies": { "express": "^4.18.2", "cors": "^2.8.5", - "uuid": "^9.0.1" + "uuid": "^9.0.1", + "proper-lockfile": "^4.1.2" }, "devDependencies": { "@types/express": "^4.17.21", "@types/cors": "^2.8.19", "@types/node": "^20.8.0", "@types/uuid": "^9.0.7", + "@types/proper-lockfile": "^4.1.4", "typescript": "^5.2.2", "ts-node": "^10.9.1" } diff --git a/data-service/src/server.ts b/data-service/src/server.ts index 274561e..d5b6096 100644 --- a/data-service/src/server.ts +++ b/data-service/src/server.ts @@ -2,6 +2,7 @@ import express, { Request, Response, NextFunction } from 'express'; import cors from 'cors'; import fs from 'fs/promises'; import path from 'path'; +import { lock } from 'proper-lockfile'; interface InventoryItem { productId: string; @@ -66,9 +67,17 @@ class DataService { } private async saveOrders(orders: OrderDatabase): Promise { - const tempData = JSON.stringify(orders, null, 2); - await fs.writeFile(this.ordersPath, tempData); - this.cachedOrders = orders; + let release: (() => Promise) | null = null; + try { + release = await lock(this.ordersPath, { retries: 5 }); + const tempData = JSON.stringify(orders, null, 2); + await fs.writeFile(this.ordersPath, tempData); + this.cachedOrders = orders; + } finally { + if (release) { + await release(); + } + } } private async loadInventory(): Promise { @@ -83,9 +92,17 @@ class DataService { } private async saveInventory(inventory: InventoryDatabase): Promise { - const tempData = JSON.stringify(inventory, null, 2); - await fs.writeFile(this.inventoryPath, tempData); - this.cachedInventory = inventory; + let release: (() => Promise) | null = null; + try { + release = await lock(this.inventoryPath, { retries: 5 }); + const tempData = JSON.stringify(inventory, null, 2); + await fs.writeFile(this.inventoryPath, tempData); + this.cachedInventory = inventory; + } finally { + if (release) { + await release(); + } + } } private getDefaultInventory(): InventoryDatabase { diff --git a/inventory-service/requirements.txt b/inventory-service/requirements.txt index 2c66dec..5b74434 100644 --- a/inventory-service/requirements.txt +++ b/inventory-service/requirements.txt @@ -1,5 +1,7 @@ # Core dependencies for Inventory Service pika==1.3.2 +retry==0.9.2 + requests==2.31.0 # Testing dependencies diff --git a/inventory-service/src/main.py b/inventory-service/src/main.py index e7712c0..5a2e989 100644 --- a/inventory-service/src/main.py +++ b/inventory-service/src/main.py @@ -9,6 +9,7 @@ import requests from datetime import datetime from typing import Dict, List, Optional +from retry import retry import pika # type: ignore from pika.adapters.asyncio_connection import AsyncioConnection # type: ignore @@ -112,6 +113,20 @@ def get_all_inventory(self) -> List[Dict]: logger.error(f"Error fetching all inventory: {e}") return [] + def update_inventory(self, product_name: str, quantity: int, reserved: int) -> bool: + """Update inventory for a product""" + try: + response = self.session.put( + f"{self.base_url}/inventory/{product_name}", + json={'quantity': quantity, 'reservedQuantity': reserved}, + timeout=self.timeout + ) + response.raise_for_status() + return response.json()['success'] + except requests.exceptions.RequestException as e: + logger.error(f"Error updating inventory for {product_name}: {e}") + return False + async def wait_for_data_service(base_url: str = "http://localhost:3002", max_attempts: int = 10, interval_seconds: int = 10): """Wait for data service to become available""" print("🔍 Checking data service connection...") @@ -141,6 +156,7 @@ def __init__(self, rabbitmq_url: str = 'amqp://admin:admin123@localhost:5672'): self.connection = None self.channel = None self.data_client = DataServiceClient() + self._inventory_lock = asyncio.Lock() self.exchange_name = 'orders.exchange' self.order_created_queue = 'order.created' @@ -240,7 +256,7 @@ def _on_order_status_queue_declare(self, method): logger.info("👂 Ready to process inventory orders") - def _on_order_created_message(self, channel, method, properties, body) -> None: + async def _on_order_created_message(self, channel, method, properties, body) -> None: """Handle order created message""" try: message = json.loads(body.decode('utf-8')) @@ -250,7 +266,7 @@ def _on_order_created_message(self, channel, method, properties, body) -> None: logger.info(f"📥 Processing order {order_id}: {quantity}x {product_name}") - success, status_message = self._process_order(order_id, product_name, quantity) + success, status_message = await self._process_order_with_retry(order_id, product_name, quantity) status = 'fulfilled' if success else 'failed' self._send_order_status_update(order_id, status, status_message) @@ -261,50 +277,45 @@ def _on_order_created_message(self, channel, method, properties, body) -> None: logger.error(f"❌ Error processing order created message: {e}") channel.basic_nack(delivery_tag=method.delivery_tag, requeue=True) + @retry(tries=3, delay=1, logger=logger) + async def _process_order_with_retry(self, order_id: str, product_name: str, quantity: int) -> tuple[bool, str]: + """Process order with retry mechanism for handling transient failures""" + async with self._inventory_lock: + return self._process_order(order_id, product_name, quantity) + + def _process_order(self, order_id: str, product_name: str, quantity: int) -> tuple[bool, str]: """Process order fulfillment""" - try: - item = self.data_client.get_inventory_item(product_name) - if not item: - message = f"Product '{product_name}' not found in inventory" - logger.warning(f"⚠️ Order {order_id}: {message}") - return False, message - - available_check = self.data_client.check_availability(product_name, quantity) - if not available_check: - item = self.data_client.get_inventory_item(product_name) - if item: - available = item['quantity'] - item['reservedQuantity'] - message = f"Insufficient stock for {product_name}. Available: {available}, Requested: {quantity}" - else: - message = f"Could not determine availability for {product_name}" - logger.warning(f"⚠️ Order {order_id}: {message}") - return False, message - - logger.info(f"🔄 Order {order_id}: Availability confirmed for {quantity}x {product_name}") - - if not self.data_client.reserve_stock(product_name, quantity): - item = self.data_client.get_inventory_item(product_name) - if item: - available = item['quantity'] - item['reservedQuantity'] - logger.warning(f"⚠️ Order {order_id}: Stock changed during processing - Available: {available}") - message = f"Failed to reserve stock for {product_name}" - logger.error(f"❌ Order {order_id}: {message}") - return False, message - - if not self.data_client.fulfill_order(product_name, quantity): - self.data_client.cancel_reservation(product_name, quantity) - message = f"Failed to fulfill order for {product_name}" - logger.error(f"❌ Order {order_id}: {message}") - return False, message - - message = f"Successfully fulfilled {quantity}x {product_name}" - logger.info(f"✅ Order {order_id}: {message}") - return True, message + item = self.data_client.get_inventory_item(product_name) + if not item: + message = f"Product '{product_name}' not found in inventory" + logger.warning(f"⚠️ Order {order_id}: {message}") + return False, message + + available = item['quantity'] - item['reservedQuantity'] + if available < quantity: + message = f"Insufficient stock for {product_name}. Available: {available}, Requested: {quantity}" + logger.warning(f"⚠️ Order {order_id}: {message}") + return False, message + + logger.info(f"🔄 Order {order_id}: Availability confirmed for {quantity}x {product_name}") + + new_reserved_quantity = item['reservedQuantity'] + quantity + if not self.data_client.update_inventory(product_name, item['quantity'], new_reserved_quantity): + message = f"Failed to reserve stock for {product_name}" + logger.error(f"❌ Order {order_id}: {message}") + return False, message + + if not self.data_client.fulfill_order(product_name, quantity): + self.data_client.cancel_reservation(product_name, quantity) + message = f"Failed to fulfill order for {product_name}" + logger.error(f"❌ Order {order_id}: {message}") + return False, message + + message = f"Successfully fulfilled {quantity}x {product_name}" + logger.info(f"✅ Order {order_id}: {message}") + return True, message - except Exception as e: - logger.error(f"❌ Error processing order {order_id}: {e}") - return False, f"Internal error processing order: {str(e)}" def _send_order_status_update(self, order_id: str, status: str, message: str) -> None: """Send order status update via RabbitMQ""" diff --git a/order-service/src/api.ts b/order-service/src/api.ts index ae36061..9eba49c 100644 --- a/order-service/src/api.ts +++ b/order-service/src/api.ts @@ -72,35 +72,8 @@ class OrderService { console.log('📁 Database operations delegated to data-service'); } - async loadDatabase(): Promise { - try { - const response = await axios.get(`${this.dataServiceUrl}/orders`); - const orders = response.data.orders.map((order: any) => ({ - ...order, - createdAt: new Date(order.createdAt), - updatedAt: new Date(order.updatedAt), - })); - - if (orders.length > 0) { - console.log(`📚 Loaded ${orders.length} orders from data service`); - } - - return { orders, lastId: orders.length }; - } catch (error) { - console.error('❌ Error loading orders from data service:', error); - throw error; - } - } - - async saveDatabase(db: OrderDatabase): Promise { - // Note: Individual order operations are handled by data-service - // This method is kept for compatibility but doesn't perform file operations - console.log(`💾 Database state synchronized via data-service API`); - } async createOrder(request: OrderCreateRequest): Promise { - const db = await this.loadDatabase(); - // Find product in catalog const product = this.products.find( (p) => p.name.toLowerCase() === request.productName.toLowerCase() @@ -198,10 +171,20 @@ class OrderService { } async getAllOrders(): Promise { - const db = await this.loadDatabase(); - return db.orders.sort( - (a, b) => b.createdAt.getTime() - a.createdAt.getTime() - ); + try { + const response = await axios.get(`${this.dataServiceUrl}/orders`); + const orders = response.data.orders.map((order: any) => ({ + ...order, + createdAt: new Date(order.createdAt), + updatedAt: new Date(order.updatedAt), + })); + return orders.sort( + (a, b) => b.createdAt.getTime() - a.createdAt.getTime() + ); + } catch (error) { + console.error('❌ Error loading orders from data service:', error); + throw error; + } } async updateOrderStatus( @@ -209,29 +192,7 @@ class OrderService { status: OrderStatus, message?: string ): Promise { - const db = await this.loadDatabase(); - const order = db.orders.find((o) => o.id === orderId); - - if (!order) { - return null; - } - - const oldStatus = order.status; - console.log(`🔄 Updating order ${orderId} from ${oldStatus} to ${status}`); - - const freshDb = await this.loadDatabase(); - const freshOrder = freshDb.orders.find((o) => o.id === orderId); - - if (!freshOrder) { - console.warn(`⚠️ Order ${orderId} not found after reload`); - return null; - } - - if (freshOrder.status !== oldStatus) { - console.log( - `🔀 Order ${orderId} status changed during processing: ${oldStatus} -> ${freshOrder.status}` - ); - } + console.log(`🔄 Updating order ${orderId} to ${status}`); try { const response = await axios.put( @@ -246,21 +207,27 @@ class OrderService { throw new Error('Failed to update order status in data service'); } - order.status = status; - order.updatedAt = new Date(); - if (message) { - order.notes = (order.notes ? order.notes + '\n' : '') + message; + const updatedOrder = response.data.order; + updatedOrder.createdAt = new Date(updatedOrder.createdAt); + updatedOrder.updatedAt = new Date(updatedOrder.updatedAt); + + if (process.env.NODE_ENV !== 'test') { + console.log(`📝 Order ${orderId} status updated to: ${status}`); } + + return updatedOrder; } catch (error) { + const axiosError = error as any; + if ( + axios.isAxiosError(axiosError) && + axiosError.response?.status === 404 + ) { + console.warn(`⚠️ Order ${orderId} not found in data service`); + return null; + } console.error('❌ Failed to update order status in data service:', error); throw error; } - - if (process.env.NODE_ENV !== 'test') { - console.log(`📝 Order ${orderId} status updated to: ${status}`); - } - - return order; } getProductCatalog(): Product[] { From 11b8be2275242e97f0bbd672ffdd5d3c231437f2 Mon Sep 17 00:00:00 2001 From: fishmeister09 Date: Thu, 25 Sep 2025 14:26:41 +0530 Subject: [PATCH 2/2] fixed some type errors --- order-service/src/api.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/order-service/src/api.ts b/order-service/src/api.ts index 9eba49c..23c03d7 100644 --- a/order-service/src/api.ts +++ b/order-service/src/api.ts @@ -72,7 +72,6 @@ class OrderService { console.log('📁 Database operations delegated to data-service'); } - async createOrder(request: OrderCreateRequest): Promise { // Find product in catalog const product = this.products.find( @@ -179,7 +178,7 @@ class OrderService { updatedAt: new Date(order.updatedAt), })); return orders.sort( - (a, b) => b.createdAt.getTime() - a.createdAt.getTime() + (a: Order, b: Order) => b.createdAt.getTime() - a.createdAt.getTime() ); } catch (error) { console.error('❌ Error loading orders from data service:', error);