/*
 * Decompiled with CFR 0.152.
 */
package org.keycloak.cluster.infinispan;

import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.infinispan.Cache;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.annotation.ClientCacheEntryCreated;
import org.infinispan.client.hotrod.annotation.ClientCacheEntryModified;
import org.infinispan.client.hotrod.annotation.ClientCacheEntryRemoved;
import org.infinispan.client.hotrod.annotation.ClientListener;
import org.infinispan.client.hotrod.event.ClientCacheEntryCreatedEvent;
import org.infinispan.client.hotrod.event.ClientCacheEntryModifiedEvent;
import org.infinispan.client.hotrod.event.ClientCacheEntryRemovedEvent;
import org.infinispan.client.hotrod.exceptions.HotRodClientException;
import org.infinispan.context.Flag;
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryCreated;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryModified;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryRemoved;
import org.infinispan.notifications.cachelistener.event.CacheEntryCreatedEvent;
import org.infinispan.notifications.cachelistener.event.CacheEntryModifiedEvent;
import org.infinispan.notifications.cachelistener.event.CacheEntryRemovedEvent;
import org.infinispan.persistence.remote.RemoteStore;
import org.jboss.logging.Logger;
import org.keycloak.cluster.ClusterEvent;
import org.keycloak.cluster.ClusterListener;
import org.keycloak.cluster.ClusterProvider;
import org.keycloak.cluster.infinispan.TaskCallback;
import org.keycloak.cluster.infinispan.WrapperClusterEvent;
import org.keycloak.common.util.ConcurrentMultivaluedHashMap;
import org.keycloak.common.util.Retry;
import org.keycloak.connections.infinispan.DefaultInfinispanConnectionProviderFactory;
import org.keycloak.executors.ExecutorsProvider;
import org.keycloak.models.KeycloakSession;

/**
 * Sends and receives {@link ClusterEvent}s through the Infinispan "work" cache.
 *
 * <p>Events are wrapped into a {@link WrapperClusterEvent} and written to the cache under a random
 * key. Cache listeners registered by {@link #create} pick the entries up and dispatch the wrapped
 * events to the {@link ClusterListener}s registered for the event's task key. When a remote cache
 * is available, events that are not restricted to the local DC are written to the remote cache
 * instead of the embedded one.
 *
 * <p>Removal of a cache entry is treated as completion of the task stored under that key, see
 * {@link #taskFinished(String)}.
 */
public class InfinispanNotificationsManager {
    protected static final Logger logger = Logger.getLogger(InfinispanNotificationsManager.class);

    /** Base interval (milliseconds) handed to the {@link Retry} backoff helpers. */
    private static final int BACKOFF_BASE_MILLIS = 10;
    /** Maximum number of attempts for sending to / re-reading from the remote cache. */
    private static final int MAX_BACKOFF_RETRIES = 10;
    /** Lifespan, in seconds, passed to {@code put} for every event entry. */
    private static final long EVENT_LIFESPAN_SECONDS = 120L;
    /** Key prefix for which a missing value is not reported as a warning. */
    private static final String TASK_KEY_PREFIX = "task::";

    private final ConcurrentMultivaluedHashMap<String, ClusterListener> listeners = new ConcurrentMultivaluedHashMap<>();
    private final ConcurrentMap<String, TaskCallback> taskCallbacks = new ConcurrentHashMap<>();
    private final Cache<String, Object> workCache;
    private final RemoteCache<String, Object> workRemoteCache;
    private final String myAddress;
    private final String mySite;
    private final ExecutorService listenersExecutor;

    /**
     * @param workCache         embedded work cache used for local notifications
     * @param workRemoteCache   remote work cache, or {@code null} when no remote store is configured
     * @param myAddress         address of this node, used when wrapping and filtering events
     * @param mySite            site name of this node, used when wrapping and filtering events
     * @param listenersExecutor executor on which HotRod events are processed; {@link #create}
     *                          passes {@code null} when there is no remote cache
     */
    protected InfinispanNotificationsManager(Cache<String, Object> workCache, RemoteCache<String, Object> workRemoteCache, String myAddress, String mySite, ExecutorService listenersExecutor) {
        this.workCache = workCache;
        this.workRemoteCache = workRemoteCache;
        this.myAddress = myAddress;
        this.mySite = mySite;
        this.listenersExecutor = listenersExecutor;
    }

    /**
     * Builds a manager and registers its listeners on the work cache and, when a remote store is
     * present, on that store's remote cache.
     *
     * @param session      session used to look up the {@link ExecutorsProvider}
     * @param workCache    embedded work cache
     * @param myAddress    address of this node
     * @param mySite       site name of this node; must not be {@code null} when {@code remoteStores}
     *                     is non-empty
     * @param remoteStores remote stores of the work cache; only the first one is used
     * @return the fully wired manager
     * @throws IllegalStateException if a remote store exists but {@code mySite} is {@code null}
     */
    // The parameter type is a raw RemoteStore, so obtaining the typed remote cache is unchecked.
    @SuppressWarnings({"rawtypes", "unchecked"})
    public static InfinispanNotificationsManager create(KeycloakSession session, Cache<String, Object> workCache, String myAddress, String mySite, Set<RemoteStore> remoteStores) {
        RemoteCache<String, Object> workRemoteCache = null;
        if (!remoteStores.isEmpty()) {
            RemoteStore remoteStore = remoteStores.iterator().next();
            workRemoteCache = remoteStore.getRemoteCache();
            if (mySite == null) {
                throw new IllegalStateException("Multiple datacenters available, but site name is not configured! Check your configuration");
            }
        }

        // The executor is only needed for processing HotRod events.
        ExecutorService listenersExecutor = workRemoteCache == null
                ? null
                : session.getProvider(ExecutorsProvider.class).getExecutor("work-cache-event-listener");

        InfinispanNotificationsManager manager = new InfinispanNotificationsManager(workCache, workRemoteCache, myAddress, mySite, listenersExecutor);

        workCache.addListener(manager.new CacheEntryListener());
        logger.debugf("Added listener for infinispan cache: %s", workCache.getName());

        if (workRemoteCache != null) {
            workRemoteCache.addClientListener(manager.new HotRodListener(workRemoteCache));
            logger.debugf("Added listener for HotRod remoteStore cache: %s", workRemoteCache.getName());
        }
        return manager;
    }

    /**
     * Registers a listener that is invoked for every received event carrying {@code taskKey}.
     */
    void registerListener(String taskKey, ClusterListener task) {
        this.listeners.add(taskKey, task);
    }

    /**
     * Registers {@code callback} for {@code taskKey} unless one is already registered.
     *
     * @return the callback now associated with {@code taskKey}: the previously registered one if
     *         present, otherwise {@code callback}
     */
    TaskCallback registerTaskCallback(String taskKey, TaskCallback callback) {
        TaskCallback existing = this.taskCallbacks.putIfAbsent(taskKey, callback);
        return existing != null ? existing : callback;
    }

    /**
     * Publishes {@code events} under a freshly generated random key.
     *
     * <p>Nothing is sent when {@code events} is {@code null} or empty. The events go to the embedded
     * cache when {@code dcNotify} is {@code LOCAL_DC_ONLY} or when no remote cache exists; otherwise
     * they are written to the remote cache, retrying with backoff on {@link HotRodClientException}.
     *
     * @param taskKey      key identifying which listeners should receive the events
     * @param events       events to publish
     * @param ignoreSender forwarded to {@link WrapperClusterEvent#wrap}
     * @param dcNotify     forwarded to {@link WrapperClusterEvent#wrap}; also selects the target cache
     */
    void notify(String taskKey, Collection<? extends ClusterEvent> events, boolean ignoreSender, ClusterProvider.DCNotify dcNotify) {
        if (events == null || events.isEmpty()) {
            return;
        }

        WrapperClusterEvent wrappedEvent = WrapperClusterEvent.wrap(taskKey, events, this.myAddress, this.mySite, dcNotify, ignoreSender);
        String eventKey = UUID.randomUUID().toString();

        if (logger.isTraceEnabled()) {
            logger.tracef("Sending event with key %s: %s", eventKey, events);
        }

        if (dcNotify == ClusterProvider.DCNotify.LOCAL_DC_ONLY || this.workRemoteCache == null) {
            this.workCache.getAdvancedCache()
                    .withFlags(Flag.IGNORE_RETURN_VALUES, Flag.SKIP_CACHE_STORE)
                    .put(eventKey, wrappedEvent, EVENT_LIFESPAN_SECONDS, TimeUnit.SECONDS);
        } else {
            Retry.executeWithBackoff(iteration -> {
                try {
                    DefaultInfinispanConnectionProviderFactory.runWithReadLockOnCacheManager(() -> this.workRemoteCache.put(eventKey, wrappedEvent, EVENT_LIFESPAN_SECONDS, TimeUnit.SECONDS));
                }
                catch (HotRodClientException re) {
                    if (logger.isDebugEnabled()) {
                        logger.debugf(re, "Failed sending notification to remote cache '%s'. Key: '%s', iteration '%s'. Will try to retry the task", this.workRemoteCache.getName(), eventKey, iteration);
                    }
                    // Rethrow so that the retry helper sees the failure.
                    throw re;
                }
            }, MAX_BACKOFF_RETRIES, BACKOFF_BASE_MILLIS);
        }
    }

    /**
     * Dispatches a cache value to the listeners registered for its event key.
     *
     * <p>Values that are not a {@link WrapperClusterEvent} are ignored; a {@code null} value is
     * additionally reported with a warning unless {@code key} starts with {@code "task::"}. Events
     * for which {@link WrapperClusterEvent#rejectEvent} returns {@code true} are dropped.
     *
     * @param key cache key the value was stored under
     * @param obj value read from the cache, possibly {@code null}
     */
    private void eventReceived(String key, Object obj) {
        if (!(obj instanceof WrapperClusterEvent)) {
            if (obj == null && !key.startsWith(TASK_KEY_PREFIX)) {
                logger.warnf("Event object wasn't available in remote cache after event was received. Event key: %s", key);
            }
            return;
        }

        WrapperClusterEvent event = (WrapperClusterEvent) obj;
        if (event.rejectEvent(this.myAddress, this.mySite)) {
            return;
        }

        String eventKey = event.getEventKey();
        if (logger.isTraceEnabled()) {
            logger.tracef("Received event: %s", event);
        }

        List<ClusterListener> myListeners = this.listeners.get(eventKey);
        if (myListeners == null) {
            return;
        }
        for (ClusterEvent clusterEvent : event.getDelegateEvents()) {
            // NOTE(review): relies on ClusterEvent being usable as a consumer of ClusterListener;
            // that contract is declared outside this file.
            myListeners.forEach(clusterEvent);
        }
    }

    /**
     * Marks the task registered under {@code taskKey} as successfully finished and releases anyone
     * waiting on its completion latch. Does nothing when no callback is registered for the key.
     */
    void taskFinished(String taskKey) {
        TaskCallback callback = this.taskCallbacks.remove(taskKey);
        if (callback == null) {
            return;
        }
        if (logger.isDebugEnabled()) {
            logger.debugf("Finished task '%s' with '%b'", taskKey, true);
        }
        callback.setSuccess(true);
        callback.getTaskCompletedLatch().countDown();
    }

    /**
     * Listener for the embedded work cache: created/modified entries are dispatched as events,
     * removed entries complete the task stored under the removed key.
     */
    @Listener(observation=Listener.Observation.POST)
    public class CacheEntryListener {
        @CacheEntryCreated
        public void cacheEntryCreated(CacheEntryCreatedEvent<String, Object> event) {
            InfinispanNotificationsManager.this.eventReceived(event.getKey(), event.getValue());
        }

        @CacheEntryModified
        public void cacheEntryModified(CacheEntryModifiedEvent<String, Object> event) {
            InfinispanNotificationsManager.this.eventReceived(event.getKey(), event.getNewValue());
        }

        @CacheEntryRemoved
        public void cacheEntryRemoved(CacheEntryRemovedEvent<String, Object> event) {
            InfinispanNotificationsManager.this.taskFinished(event.getKey());
        }
    }

    /**
     * Listener for the remote (HotRod) work cache. Client events only carry the key, so the value
     * is fetched from the remote cache on the listeners executor before being dispatched.
     */
    @ClientListener
    public class HotRodListener {
        private final RemoteCache<String, Object> remoteCache;

        public HotRodListener(RemoteCache<String, Object> remoteCache) {
            this.remoteCache = remoteCache;
        }

        @ClientCacheEntryCreated
        public void created(ClientCacheEntryCreatedEvent<String> event) {
            this.hotrodEventReceived(event.getKey());
        }

        @ClientCacheEntryModified
        public void updated(ClientCacheEntryModifiedEvent<String> event) {
            this.hotrodEventReceived(event.getKey());
        }

        @ClientCacheEntryRemoved
        public void removed(ClientCacheEntryRemovedEvent<String> event) {
            InfinispanNotificationsManager.this.taskFinished(event.getKey());
        }

        /**
         * Schedules fetching and dispatching of the value stored under {@code key}.
         *
         * <p>If the value is not yet readable, the read is repeated up to
         * {@code MAX_BACKOFF_RETRIES} times with an increasing sleep in between. An interrupt while
         * sleeping restores the interrupt flag and abandons the event.
         *
         * @throws RejectedExecutionException if the executor rejects the task for a reason other
         *                                    than being terminated or shutting down
         */
        private void hotrodEventReceived(String key) {
            try {
                InfinispanNotificationsManager.this.listenersExecutor.submit(() -> {
                    Supplier<Object> fetchEvent = () -> this.remoteCache.get(key);
                    Object event = DefaultInfinispanConnectionProviderFactory.runWithReadLockOnCacheManager(fetchEvent);
                    for (int iteration = 0; event == null && iteration < MAX_BACKOFF_RETRIES; ++iteration) {
                        try {
                            Thread.sleep(Retry.computeBackoffInterval(BACKOFF_BASE_MILLIS, iteration));
                        }
                        catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            return;
                        }
                        event = DefaultInfinispanConnectionProviderFactory.runWithReadLockOnCacheManager(fetchEvent);
                    }
                    InfinispanNotificationsManager.this.eventReceived(key, event);
                });
            }
            catch (RejectedExecutionException ree) {
                String message = ree.getMessage();
                if (message != null && (message.contains("Terminated") || message.contains("Shutting down"))) {
                    // Expected while the server is stopping: warn only, do not escalate or rethrow.
                    logger.warnf("Rejected submitting of the event for key: %s because server is shutting down or pool was terminated.", key);
                    logger.debug(ree);
                    return;
                }
                logger.errorf("Rejected submitting of the event for key: %s. Server going to shutdown or pool exhausted. Pool: %s", key, InfinispanNotificationsManager.this.listenersExecutor.toString());
                throw ree;
            }
        }
    }
}

