107         int processors = Runtime.getRuntime().availableProcessors();
   109         ExecutorService localExecutor = Executors.newCachedThreadPool();
   110         Transport transport = 
workCache.getCacheManager().getTransport();
   111         boolean distributed = transport != null;
   112         ExecutorService executorService = distributed ? 
new DefaultExecutorService(
workCache, localExecutor) : localExecutor;
   117             while (!state.isFinished()) {
   118                 int nodesCount = transport==null ? 1 : transport.getMembers().size();
   119                 int distributedWorkersCount = processors * nodesCount;
   121                 log.debugf(
"Starting next iteration with %d workers", distributedWorkersCount);
   123                 List<Integer> segments = state.getUnfinishedSegments(distributedWorkersCount);
   125                 if (
log.isTraceEnabled()) {
   126                     log.trace(
"unfinished segments for this iteration: " + segments);
   129                 List<Future<WorkerResult>> futures = 
new LinkedList<>();
   130                 for (Integer segment : segments) {
   131                     SessionInitializerWorker worker = 
new SessionInitializerWorker();
   137                     Future<WorkerResult> future = executorService.submit(worker);
   141                 for (Future<WorkerResult> future : futures) {
   143                         WorkerResult result = future.get();
   145                         if (result.getSuccess()) {
   146                             int computedSegment = result.getSegment();
   147                             state.markSegmentFinished(computedSegment);
   149                             if (
log.isTraceEnabled()) {
   150                                 log.tracef(
"Segment %d failed to compute", result.getSegment());
   153                     } 
catch (InterruptedException ie) {
   155                         log.error(
"Interruped exception when computed future. Errors: " + errors, ie);
   156                     } 
catch (ExecutionException ee) {
   158                         log.error(
"ExecutionException when computed future. Errors: " + errors, ee);
   163                     throw new RuntimeException(
"Maximum count of worker errors occured. Limit was " + 
maxErrors + 
". See server.log for details");
   168                 log.debugf(
"New initializer state pushed. The state is: %s", state);
   176                 executorService.shutdown();
   178             localExecutor.shutdown();
 void saveStateToCache(final InitializerState state)
Definition: BaseCacheInitializer.java:84
static final Logger log
Definition: InfinispanCacheInitializer.java:47
final int maxErrors
Definition: InfinispanCacheInitializer.java:49
final Cache< String, Serializable > workCache
Definition: BaseCacheInitializer.java:42
void afterAllSessionsLoaded(BaseCacheInitializer initializer)
final SessionLoader sessionLoader
Definition: BaseCacheInitializer.java:43