|
63 | 63 | import java.util.Queue;
|
64 | 64 | import java.util.Set;
|
65 | 65 | import java.util.UUID;
|
66 |
| -import java.util.concurrent.ConcurrentHashMap; |
67 | 66 | import java.util.concurrent.ConcurrentLinkedQueue;
|
68 | 67 | import java.util.concurrent.ExecutionException;
|
69 | 68 | import java.util.concurrent.Executors;
|
@@ -160,7 +159,6 @@ public class LocalNode extends Node implements Closeable {
|
160 | 159 | private final Optional<Path> statusFilePath;
|
161 | 160 | private final Optional<Path> sessionHistoryFilePath;
|
162 | 161 | private final Queue<SessionHistoryEntry> sessionHistory = new ConcurrentLinkedQueue<>();
|
163 |
| - private final Map<SessionId, Instant> sessionStartTimes = new ConcurrentHashMap<>(); |
164 | 162 |
|
165 | 163 | protected LocalNode(
|
166 | 164 | Tracer tracer,
|
@@ -308,6 +306,7 @@ protected LocalNode(
|
308 | 306 |
|
309 | 307 | bus.addListener(SessionStartedEvent.listener(this::recordSessionStart));
|
310 | 308 | bus.addListener(SessionClosedEvent.listener(this::recordSessionStop));
|
| 309 | + bus.addListener(NodeHeartBeatEvent.listener(this::cleanupSessionHistory)); |
311 | 310 |
|
312 | 311 | shutdown =
|
313 | 312 | () -> {
|
@@ -1138,33 +1137,58 @@ private void recordSessionStart(SessionId sessionId) {
|
1138 | 1137 | return;
|
1139 | 1138 | }
|
1140 | 1139 | Instant startTime = Instant.now();
|
1141 |
| - sessionStartTimes.put(sessionId, startTime); |
1142 | 1140 | sessionHistory.add(new SessionHistoryEntry(sessionId, startTime, null));
|
1143 | 1141 | writeSessionHistoryToFile();
|
1144 | 1142 | }
|
1145 | 1143 |
|
1146 | 1144 | private void recordSessionStop(SessionId sessionId) {
|
1147 |
| - if (!isSessionOwner(sessionId)) { |
1148 |
| - return; |
1149 |
| - } |
1150 | 1145 | Instant stopTime = Instant.now();
|
1151 |
| - Instant startTime = sessionStartTimes.remove(sessionId); |
1152 |
| - if (startTime != null) { |
1153 |
| - // Find and update the existing history entry |
1154 |
| - sessionHistory.stream() |
1155 |
| - .filter(entry -> entry.getSessionId().equals(sessionId)) |
1156 |
| - .findFirst() |
1157 |
| - .ifPresent(entry -> entry.setStopTime(stopTime)); |
1158 |
| - writeSessionHistoryToFile(); |
| 1146 | + // Find and update the existing history entry |
| 1147 | + sessionHistory.stream() |
| 1148 | + .filter(entry -> entry.getSessionId().equals(sessionId)) |
| 1149 | + .findFirst() |
| 1150 | + .ifPresent( |
| 1151 | + entry -> { |
| 1152 | + entry.setStopTime(stopTime); |
| 1153 | + writeSessionHistoryToFile(); |
| 1154 | + }); |
| 1155 | + } |
| 1156 | + |
| 1157 | + private void cleanupSessionHistory(NodeStatus status) { |
| 1158 | + int maxHistorySize = 100; |
| 1159 | + if (!status.getNodeId().equals(getId()) || sessionHistory.size() < maxHistorySize) { |
| 1160 | + return; |
1159 | 1161 | }
|
| 1162 | + |
| 1163 | + // Keep only the last 100 completed sessions |
| 1164 | + List<SessionHistoryEntry> completedSessions = |
| 1165 | + sessionHistory.stream() |
| 1166 | + .filter(entry -> entry.getStopTime() != null) |
| 1167 | + .sorted( |
| 1168 | + (a, b) -> |
| 1169 | + b.getStartTime().compareTo(a.getStartTime())) // Sort by start time descending |
| 1170 | + .limit(100) |
| 1171 | + .collect(Collectors.toList()); |
| 1172 | + |
| 1173 | + // Keep all ongoing sessions |
| 1174 | + List<SessionHistoryEntry> ongoingSessions = |
| 1175 | + sessionHistory.stream() |
| 1176 | + .filter(entry -> entry.getStopTime() == null) |
| 1177 | + .collect(Collectors.toList()); |
| 1178 | + |
| 1179 | + // Clear and rebuild the history queue |
| 1180 | + sessionHistory.clear(); |
| 1181 | + sessionHistory.addAll(completedSessions); |
| 1182 | + sessionHistory.addAll(ongoingSessions); |
| 1183 | + |
| 1184 | + // Write the cleaned history to file |
| 1185 | + writeSessionHistoryToFile(); |
1160 | 1186 | }
|
1161 | 1187 |
|
1162 | 1188 | private void writeSessionHistoryToFile() {
|
1163 | 1189 | if (sessionHistoryFilePath.isPresent()) {
|
1164 | 1190 | try {
|
1165 | 1191 | List<SessionHistoryEntry> sortedHistory = new ArrayList<>(sessionHistory);
|
1166 |
| - sortedHistory.sort((a, b) -> a.getStartTime().compareTo(b.getStartTime())); |
1167 |
| - |
1168 | 1192 | String historyJson = JSON.toJson(sortedHistory);
|
1169 | 1193 | Files.write(
|
1170 | 1194 | sessionHistoryFilePath.get(),
|
|
0 commit comments