-
Notifications
You must be signed in to change notification settings - Fork 0
feat(event): optimize the event service #4
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
📋 Review Summary
This pull request introduces a significant and well-designed optimization for the event service. The new version-based approach to switch between the old and new event mechanisms is a good strategy for a phased rollout. The new event service is well-structured into different services for handling historical, real-time, and solidified events, which improves modularity. The addition of unit tests for the new services is commendable and crucial for ensuring the stability of this critical component.
🔍 General Feedback
- The overall implementation is of high quality. The code is well-organized and the logic for handling chain reorganization in the new event service seems robust.
- I've pointed out a few minor areas for improvement regarding thread safety and configuration management. Addressing these will make the implementation even more solid.
- Good job on the extensive test coverage for the new feature.
|
|
||
| isRunning = true; | ||
|
|
||
| new Thread(() -> syncEvent()).start(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🟢 For consistency with other services in this PR like BlockEventLoad, RealtimeEventService, and SolidEventService, consider using ExecutorServiceManager to create and manage the thread instead of creating a new Thread directly.
| isRunning = true; | |
| new Thread(() -> syncEvent()).start(); | |
| private final ScheduledExecutorService executor = ExecutorServiceManager | |
| .newSingle-threadScheduledExecutor("history-event"); | |
| public void init() { | |
| if (instance.getStartSyncBlockNum() <= 0) { | |
| initEventService(manager.getChainBaseManager().getHeadBlockId()); | |
| return; | |
| } | |
| isRunning = true; | |
| executor.schedule(() -> syncEvent(), 0, TimeUnit.MILLISECONDS); | |
| logger.info("History event service start."); | |
| } |
| private SolidEventService solidEventService; | ||
|
|
||
| private static BlockingQueue<Event> queue = new LinkedBlockingQueue<>(); | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🟢 The maxEventSize is hardcoded. It would be more flexible if this value was configurable, for instance, through the application configuration file. This would allow operators to tune the queue size based on their specific hardware and load expectations without changing the code.
| private int maxEventSize = Args.getInstance().getEventPluginConfig().getSendQueueLength(); |
| long num = blockEvent.getBlockId().getNum(); | ||
| List<BlockEvent> list = numMap.get(num); | ||
| if (list == null) { | ||
| list = new ArrayList<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🟡 The add method has a potential race condition. If multiple threads call this method with the same blockEvent.getBlockId().getNum() and the map does not yet contain this number, they could both execute list = new ArrayList<>() and numMap.put(num, list), leading to one of the lists being lost. Also, ArrayList is not thread-safe, so list.add(blockEvent) could lead to issues if multiple threads add to the same list.
Consider using computeIfAbsent and a thread-safe list implementation like CopyOnWriteArrayList or synchronizing the access to the list.
| long num = blockEvent.getBlockId().getNum(); | |
| List<BlockEvent> list = numMap.get(num); | |
| if (list == null) { | |
| list = new ArrayList<>(); | |
| long num = blockEvent.getBlockId().getNum(); | |
| List<BlockEvent> list = numMap.computeIfAbsent(num, k -> new CopyOnWriteArrayList<>()); | |
| list.add(blockEvent); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This pull request introduces a significant and well-structured optimization for the event service. The changes are modular, include comprehensive unit tests, and thoughtfully handle backward compatibility through a versioning mechanism. The new event service appears to be designed with concurrency and chain reorganizations in mind.
🔍 General Feedback
- The introduction of a dedicated event service with clear separation of concerns for historical, real-time, and solid block processing is a strong architectural improvement.
- The addition of unit tests for the new components is excellent and crucial for such a critical part of the system.
- The use of
synchronizedblocks and concurrent data structures for thread safety is noted and appreciated.
Overall, this is a high-quality contribution. The provided comments are minor suggestions for potential refinement.
|
|
||
| @Slf4j(topic = "event") | ||
| public class BlockEventCache { | ||
| @Getter | ||
| private static volatile long solidNum; | ||
|
|
||
| @Getter | ||
| private static volatile BlockEvent head; | ||
|
|
||
| @Getter | ||
| private static volatile BlockCapsule.BlockId solidId; | ||
|
|
||
| private static Map<BlockCapsule.BlockId, BlockEvent> blockEventMap = new ConcurrentHashMap<>(); | ||
|
|
||
| private static Map<Long, List<BlockEvent>> numMap = new ConcurrentHashMap<>(); | ||
|
|
||
| public static BlockEvent getBlockEvent(BlockCapsule.BlockId blockId) { | ||
| return blockEventMap.get(blockId); | ||
| } | ||
|
|
||
| public static void init(BlockCapsule.BlockId blockId) { | ||
| blockEventMap.clear(); | ||
| numMap.clear(); | ||
| solidNum = blockId.getNum(); | ||
| head = new BlockEvent(blockId); | ||
| solidId = blockId; | ||
| List<BlockEvent> list = new ArrayList<>(); | ||
| list.add(head); | ||
| numMap.put(blockId.getNum(), list); | ||
| blockEventMap.put(blockId, head); | ||
| } | ||
|
|
||
| public static void add(BlockEvent blockEvent) throws EventException { | ||
| logger.info("Add block event, {}", blockEvent.getBlockId().getString(), | ||
| blockEvent.getParentId().getString()); | ||
| if (blockEventMap.get(blockEvent.getParentId()) == null) { | ||
| throw new EventException("unlink BlockEvent, " | ||
| + blockEvent.getBlockId().getString() + ", " | ||
| + blockEvent.getParentId().getString()); | ||
| } | ||
|
|
||
| long num = blockEvent.getBlockId().getNum(); | ||
| List<BlockEvent> list = numMap.get(num); | ||
| if (list == null) { | ||
| list = new ArrayList<>(); | ||
| numMap.put(num, list); | ||
| } | ||
| list.add(blockEvent); | ||
|
|
||
| blockEventMap.put(blockEvent.getBlockId(), blockEvent); | ||
|
|
||
| if (num > head.getBlockId().getNum()) { | ||
| head = blockEvent; | ||
| } | ||
|
|
||
| if (blockEvent.getSolidId().getNum() > solidId.getNum()) { | ||
| solidId = blockEvent.getSolidId(); | ||
| } | ||
| } | ||
|
|
||
| public static void remove(BlockCapsule.BlockId solidId) { | ||
| logger.info("Remove solidId {}, solidNum {}, {}, {}", | ||
| solidId.getString(), solidNum, numMap.size(), blockEventMap.size()); | ||
| numMap.forEach((k, v) -> { | ||
| if (k < solidId.getNum()) { | ||
| v.forEach(value -> blockEventMap.remove(value.getBlockId())); | ||
| numMap.remove(k); | ||
| } | ||
| }); | ||
| solidNum = solidId.getNum(); | ||
| } | ||
|
|
||
| public static List<BlockEvent> getSolidBlockEvents(BlockCapsule.BlockId solidId) { | ||
| List<BlockEvent> blockEvents = new ArrayList<>(); | ||
| BlockCapsule.BlockId tmp = solidId; | ||
| while (tmp.getNum() > solidNum) { | ||
| BlockEvent blockEvent = blockEventMap.get(tmp); | ||
| blockEvents.add(blockEvent); | ||
| tmp = blockEvent.getParentId(); | ||
| } | ||
|
|
||
| return Lists.reverse(blockEvents); | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider refactoring this class to be a Spring-managed singleton component (@Component with singleton scope). This would align better with the dependency injection pattern used elsewhere in the project, improve testability by allowing the cache to be mocked, and make the lifecycle management clearer.
| @Slf4j(topic = "event") | |
| public class BlockEventCache { | |
| @Getter | |
| private static volatile long solidNum; | |
| @Getter | |
| private static volatile BlockEvent head; | |
| @Getter | |
| private static volatile BlockCapsule.BlockId solidId; | |
| private static Map<BlockCapsule.BlockId, BlockEvent> blockEventMap = new ConcurrentHashMap<>(); | |
| private static Map<Long, List<BlockEvent>> numMap = new ConcurrentHashMap<>(); | |
| public static BlockEvent getBlockEvent(BlockCapsule.BlockId blockId) { | |
| return blockEventMap.get(blockId); | |
| } | |
| public static void init(BlockCapsule.BlockId blockId) { | |
| blockEventMap.clear(); | |
| numMap.clear(); | |
| solidNum = blockId.getNum(); | |
| head = new BlockEvent(blockId); | |
| solidId = blockId; | |
| List<BlockEvent> list = new ArrayList<>(); | |
| list.add(head); | |
| numMap.put(blockId.getNum(), list); | |
| blockEventMap.put(blockId, head); | |
| } | |
| public static void add(BlockEvent blockEvent) throws EventException { | |
| logger.info("Add block event, {}", blockEvent.getBlockId().getString(), | |
| blockEvent.getParentId().getString()); | |
| if (blockEventMap.get(blockEvent.getParentId()) == null) { | |
| throw new EventException("unlink BlockEvent, " | |
| + blockEvent.getBlockId().getString() + ", " | |
| + blockEvent.getParentId().getString()); | |
| } | |
| long num = blockEvent.getBlockId().getNum(); | |
| List<BlockEvent> list = numMap.get(num); | |
| if (list == null) { | |
| list = new ArrayList<>(); | |
| numMap.put(num, list); | |
| } | |
| list.add(blockEvent); | |
| blockEventMap.put(blockEvent.getBlockId(), blockEvent); | |
| if (num > head.getBlockId().getNum()) { | |
| head = blockEvent; | |
| } | |
| if (blockEvent.getSolidId().getNum() > solidId.getNum()) { | |
| solidId = blockEvent.getSolidId(); | |
| } | |
| } | |
| public static void remove(BlockCapsule.BlockId solidId) { | |
| logger.info("Remove solidId {}, solidNum {}, {}, {}", | |
| solidId.getString(), solidNum, numMap.size(), blockEventMap.size()); | |
| numMap.forEach((k, v) -> { | |
| if (k < solidId.getNum()) { | |
| v.forEach(value -> blockEventMap.remove(value.getBlockId())); | |
| numMap.remove(k); | |
| } | |
| }); | |
| solidNum = solidId.getNum(); | |
| } | |
| public static List<BlockEvent> getSolidBlockEvents(BlockCapsule.BlockId solidId) { | |
| List<BlockEvent> blockEvents = new ArrayList<>(); | |
| BlockCapsule.BlockId tmp = solidId; | |
| while (tmp.getNum() > solidNum) { | |
| BlockEvent blockEvent = blockEventMap.get(tmp); | |
| blockEvents.add(blockEvent); | |
| tmp = blockEvent.getParentId(); | |
| } | |
| return Lists.reverse(blockEvents); | |
| } | |
| } | |
| @Component | |
| @Slf4j(topic = "event") | |
| public class BlockEventCache { | |
| @Getter | |
| private volatile long solidNum; | |
| @Getter | |
| private volatile BlockEvent head; | |
| @Getter | |
| private volatile BlockCapsule.BlockId solidId; | |
| private Map<BlockCapsule.BlockId, BlockEvent> blockEventMap = new ConcurrentHashMap<>(); | |
| private Map<Long, List<BlockEvent>> numMap = new ConcurrentHashMap<>(); | |
| public BlockEvent getBlockEvent(BlockCapsule.BlockId blockId) { | |
| return blockEventMap.get(blockId); | |
| } | |
| public void init(BlockCapsule.BlockId blockId) { | |
| blockEventMap.clear(); | |
| numMap.clear(); | |
| solidNum = blockId.getNum(); | |
| head = new BlockEvent(blockId); | |
| solidId = blockId; | |
| List<BlockEvent> list = new ArrayList<>(); | |
| list.add(head); | |
| numMap.put(blockId.getNum(), list); | |
| blockEventMap.put(blockId, head); | |
| } | |
| public void add(BlockEvent blockEvent) throws EventException { | |
| logger.info("Add block event, {}", blockEvent.getBlockId().getString(), | |
| blockEvent.getParentId().getString()); | |
| if (blockEventMap.get(blockEvent.getParentId()) == null) { | |
| throw new EventException("unlink BlockEvent, " | |
| + blockEvent.getBlockId().getString() + ", " | |
| + blockEvent.getParentId().getString()); | |
| } | |
| long num = blockEvent.getBlockId().getNum(); | |
| List<BlockEvent> list = numMap.get(num); | |
| if (list == null) { | |
| list = new ArrayList<>(); | |
| numMap.put(num, list); | |
| } | |
| list.add(blockEvent); | |
| blockEventMap.put(blockEvent.getBlockId(), blockEvent); | |
| if (num > head.getBlockId().getNum()) { | |
| head = blockEvent; | |
| } | |
| if (blockEvent.getSolidId().getNum() > solidId.getNum()) { | |
| solidId = blockEvent.getSolidId(); | |
| } | |
| } | |
| public void remove(BlockCapsule.BlockId solidId) { | |
| logger.info("Remove solidId {}, solidNum {}, {}, {}", | |
| solidId.getString(), solidNum, numMap.size(), blockEventMap.size()); | |
| numMap.forEach((k, v) -> { | |
| if (k < solidId.getNum()) { | |
| v.forEach(value -> blockEventMap.remove(value.getBlockId())); | |
| numMap.remove(k); | |
| } | |
| }); | |
| solidNum = solidId.getNum(); | |
| } | |
| public List<BlockEvent> getSolidBlockEvents(BlockCapsule.BlockId solidId) { | |
| List<BlockEvent> blockEvents = new ArrayList<>(); | |
| BlockCapsule.BlockId tmp = solidId; | |
| while (tmp.getNum() > solidNum) { | |
| BlockEvent blockEvent = blockEventMap.get(tmp); | |
| blockEvents.add(blockEvent); | |
| tmp = blockEvent.getParentId(); | |
| } | |
| return Lists.reverse(blockEvents); | |
| } | |
| } |
| } | ||
| initEventService(manager.getChainBaseManager().getBlockIdByNum(endNum)); | ||
| } catch (InterruptedException e1) { | ||
| logger.warn("Sync event interrupted."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For improved clarity and maintainability, consider adding a comment to explain the purpose of this sleep.
| logger.warn("Sync event interrupted."); | |
| Thread.sleep(30); // Prevent busy-waiting while syncing history |
7e22411 to
8767e88
Compare
What does this PR do?
Optimize the event service, refer to issue tronprotocol#6153
Why are these changes required?
This PR has been tested by:
Follow up
Extra details