107 int processors = Runtime.getRuntime().availableProcessors();
109 ExecutorService localExecutor = Executors.newCachedThreadPool();
110 Transport transport =
workCache.getCacheManager().getTransport();
111 boolean distributed = transport != null;
112 ExecutorService executorService = distributed ?
new DefaultExecutorService(
workCache, localExecutor) : localExecutor;
117 while (!state.isFinished()) {
118 int nodesCount = transport==null ? 1 : transport.getMembers().size();
119 int distributedWorkersCount = processors * nodesCount;
121 log.debugf(
"Starting next iteration with %d workers", distributedWorkersCount);
123 List<Integer> segments = state.getUnfinishedSegments(distributedWorkersCount);
125 if (
log.isTraceEnabled()) {
126 log.trace(
"unfinished segments for this iteration: " + segments);
129 List<Future<WorkerResult>> futures =
new LinkedList<>();
130 for (Integer segment : segments) {
131 SessionInitializerWorker worker =
new SessionInitializerWorker();
137 Future<WorkerResult> future = executorService.submit(worker);
141 for (Future<WorkerResult> future : futures) {
143 WorkerResult result = future.get();
145 if (result.getSuccess()) {
146 int computedSegment = result.getSegment();
147 state.markSegmentFinished(computedSegment);
149 if (
log.isTraceEnabled()) {
150 log.tracef(
"Segment %d failed to compute", result.getSegment());
153 }
catch (InterruptedException ie) {
155 log.error(
"Interruped exception when computed future. Errors: " + errors, ie);
156 }
catch (ExecutionException ee) {
158 log.error(
"ExecutionException when computed future. Errors: " + errors, ee);
163 throw new RuntimeException(
"Maximum count of worker errors occured. Limit was " +
maxErrors +
". See server.log for details");
168 log.debugf(
"New initializer state pushed. The state is: %s", state);
176 executorService.shutdown();
178 localExecutor.shutdown();
void saveStateToCache(final InitializerState state)
Definition: BaseCacheInitializer.java:84
static final Logger log
Definition: InfinispanCacheInitializer.java:47
final int maxErrors
Definition: InfinispanCacheInitializer.java:49
final Cache< String, Serializable > workCache
Definition: BaseCacheInitializer.java:42
void afterAllSessionsLoaded(BaseCacheInitializer initializer)
final SessionLoader sessionLoader
Definition: BaseCacheInitializer.java:43