187 EmbeddedCacheManager cacheManager =
event.getCacheManager();
188 Transport transport = cacheManager.getTransport();
191 if (transport != null && transport.isCoordinator()) {
194 Set<String> removedNodesAddresses =
convertAddresses(event.getOldMembers());
195 removedNodesAddresses.removeAll(newAddresses);
197 if (removedNodesAddresses.isEmpty()) {
201 logger.debugf(
"Nodes %s removed from cluster. Removing tasks locked by this nodes", removedNodesAddresses.toString());
203 Cache<String, Serializable> cache = cacheManager.getCache(InfinispanConnectionProvider.WORK_CACHE_NAME);
205 Iterator<String> toRemove = cache.entrySet().stream().filter(
new Predicate<Map.Entry<String, Serializable>>() {
208 public boolean test(Map.Entry<String, Serializable> entry) {
209 if (!(entry.getValue() instanceof LockEntry)) {
213 LockEntry lock = (LockEntry) entry.getValue();
214 return removedNodesAddresses.contains(lock.getNode());
217 }).map(
new Function<Map.Entry<String, Serializable>, String>() {
220 public String apply(Map.Entry<String, Serializable> entry) {
221 return entry.getKey();
226 while (toRemove.hasNext()) {
227 String rem = toRemove.next();
228 if (
logger.isTraceEnabled()) {
229 logger.tracef(
"Removing task %s due it's node left cluster", rem);
void taskFinished(String taskKey, boolean success)
Definition: InfinispanNotificationsManager.java:285
InfinispanNotificationsManager notificationsManager
Definition: InfinispanClusterProviderFactory.java:74
static final Logger logger
Definition: InfinispanClusterProviderFactory.java:63
Set< String > convertAddresses(Collection< Address > addresses)
Definition: InfinispanClusterProviderFactory.java:240