/*
 * Decompiled with CFR 0.152.
 */
package org.apache.sling.event.dea.impl;

import java.util.Calendar;
import java.util.Dictionary;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.sling.api.resource.PersistenceException;
import org.apache.sling.api.resource.Resource;
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.api.resource.ResourceResolverFactory;
import org.apache.sling.api.resource.ResourceUtil;
import org.apache.sling.discovery.InstanceDescription;
import org.apache.sling.discovery.TopologyEvent;
import org.apache.sling.discovery.TopologyEventListener;
import org.apache.sling.event.dea.impl.ResourceHelper;
import org.apache.sling.settings.SlingSettingsService;
import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceRegistration;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DistributedEventReceiver
implements EventHandler,
Runnable,
TopologyEventListener {
    private static final String TOPIC_STOPPED = "org/apache/sling/event/dea/impl/STOPPED";
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    private final BlockingQueue<Event> writeQueue = new LinkedBlockingQueue<Event>();
    private final ResourceResolverFactory resourceResolverFactory;
    private final String slingId;
    private final String rootPath;
    private final String ownRootPath;
    private final int cleanupPeriod;
    private volatile ResourceResolver writerResolver;
    private volatile boolean running;
    private volatile Set<String> instances;
    private volatile ServiceRegistration<?> serviceRegistration;
    private AtomicInteger successCounter = new AtomicInteger();
    private AtomicInteger failureCounter = new AtomicInteger();
    private final AtomicLong eventCounter = new AtomicLong(0L);

    public DistributedEventReceiver(final BundleContext bundleContext, String rootPath, final String ownRootPath, int cleanupPeriod, ResourceResolverFactory rrFactory, SlingSettingsService settings) {
        this.rootPath = rootPath;
        this.ownRootPath = ownRootPath;
        this.resourceResolverFactory = rrFactory;
        this.slingId = settings.getSlingId();
        this.cleanupPeriod = cleanupPeriod;
        this.running = true;
        Thread writerThread = new Thread(new Runnable(){

            @Override
            public void run() {
                Hashtable<String, Object> props = new Hashtable<String, Object>();
                ((Dictionary)props).put("service.vendor", "The Apache Software Foundation");
                ((Dictionary)props).put("event.topics", "*");
                ((Dictionary)props).put("event.filter", "(event.distribute=*)");
                ((Dictionary)props).put("scheduler.period", 1800L);
                ((Dictionary)props).put("scheduler.concurrent", Boolean.FALSE);
                ((Dictionary)props).put("scheduler.threadpool", "org-apache-sling-event-dea");
                ServiceRegistration reg = bundleContext.registerService(new String[]{EventHandler.class.getName(), Runnable.class.getName(), TopologyEventListener.class.getName()}, (Object)DistributedEventReceiver.this, props);
                DistributedEventReceiver.this.serviceRegistration = reg;
                try {
                    DistributedEventReceiver.this.writerResolver = DistributedEventReceiver.this.resourceResolverFactory.getServiceResourceResolver(null);
                    ResourceUtil.getOrCreateResource((ResourceResolver)DistributedEventReceiver.this.writerResolver, (String)ownRootPath, (String)"sling:Folder", (String)"sling:Folder", (boolean)true);
                }
                catch (Exception e) {
                    DistributedEventReceiver.this.logger.error("Error during resource resolver creation.", (Throwable)e);
                    DistributedEventReceiver.this.running = false;
                }
                try {
                    DistributedEventReceiver.this.processWriteQueue();
                }
                catch (Throwable t) {
                    DistributedEventReceiver.this.logger.error("Writer thread stopped with exception: " + t.getMessage(), t);
                    DistributedEventReceiver.this.running = false;
                }
                if (DistributedEventReceiver.this.writerResolver != null) {
                    DistributedEventReceiver.this.writerResolver.close();
                    DistributedEventReceiver.this.writerResolver = null;
                }
            }
        });
        writerThread.start();
    }

    public void stop() {
        if (this.serviceRegistration != null) {
            this.serviceRegistration.unregister();
            this.serviceRegistration = null;
        }
        this.running = false;
        try {
            this.writeQueue.put(new Event(TOPIC_STOPPED, (Dictionary)null));
        }
        catch (InterruptedException e) {
            this.ignoreException(e);
            Thread.currentThread().interrupt();
        }
    }

    private void processWriteQueue() {
        while (this.running) {
            Event event = null;
            try {
                event = this.writeQueue.take();
            }
            catch (InterruptedException e) {
                this.ignoreException(e);
                Thread.currentThread().interrupt();
                this.running = false;
            }
            if (event == null || !this.running) continue;
            try {
                this.writeEvent(event);
                this.successCounter.incrementAndGet();
            }
            catch (Exception e) {
                this.logger.error("Exception during writing the event to the resource tree.", (Throwable)e);
                this.failureCounter.incrementAndGet();
            }
        }
    }

    private void writeEvent(Event event) throws PersistenceException {
        Calendar now = Calendar.getInstance();
        StringBuilder sb = new StringBuilder(this.ownRootPath);
        sb.append('/');
        sb.append(now.get(1));
        sb.append('/');
        sb.append(now.get(2) + 1);
        sb.append('/');
        sb.append(now.get(5));
        sb.append('/');
        sb.append(now.get(11));
        sb.append('/');
        sb.append(now.get(12));
        sb.append('/');
        sb.append("event-");
        sb.append(String.valueOf(this.eventCounter.getAndIncrement()));
        HashMap<String, Object> properties = new HashMap<String, Object>();
        String[] propNames = event.getPropertyNames();
        if (propNames != null && propNames.length > 0) {
            for (String propName : propNames) {
                properties.put(propName, event.getProperty(propName));
            }
        }
        properties.remove("event.distribute");
        properties.put("event.topics", event.getTopic());
        properties.put("event.application", this.slingId);
        Object oldRT = properties.get("sling:resourceType");
        if (oldRT != null) {
            properties.put("event.dea.sling:resourceType", oldRT);
        }
        properties.put("sling:resourceType", "sling/distributed/event");
        this.writerResolver.refresh();
        ResourceUtil.getOrCreateResource((ResourceResolver)this.writerResolver, (String)sb.toString(), properties, (String)"sling:Folder", (boolean)true);
    }

    public void handleEvent(Event event) {
        try {
            this.writeQueue.put(event);
        }
        catch (InterruptedException ex) {
            this.ignoreException(ex);
            Thread.currentThread().interrupt();
        }
    }

    private void ignoreException(Exception e) {
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("Ignored exception " + e.getMessage(), (Throwable)e);
        }
    }

    @Override
    public void run() {
        this.cleanUpObsoleteInstances();
        this.cleanUpObsoleteEvents();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void cleanUpObsoleteInstances() {
        Set<String> slingIds = this.instances;
        if (slingIds != null) {
            this.instances = null;
            this.logger.debug("Checking for old instance trees for distributed events.");
            try (ResourceResolver resolver = null;){
                resolver = this.resourceResolverFactory.getServiceResourceResolver(null);
                Resource baseResource = resolver.getResource(this.rootPath);
                if (baseResource != null) {
                    ResourceHelper.BatchResourceRemover brr = ResourceHelper.getBatchResourceRemover(50);
                    Iterator iter = baseResource.listChildren();
                    while (iter.hasNext()) {
                        Resource rootResource = (Resource)iter.next();
                        if (slingIds.contains(rootResource.getName())) continue;
                        brr.delete(rootResource);
                    }
                    resolver.commit();
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void cleanUpObsoleteEvents() {
        if (this.cleanupPeriod > 0) {
            this.logger.debug("Cleaning up distributed events, removing all entries older than {} minutes.", (Object)this.cleanupPeriod);
            try (ResourceResolver resolver = null;){
                resolver = this.resourceResolverFactory.getServiceResourceResolver(null);
                ResourceHelper.BatchResourceRemover brr = ResourceHelper.getBatchResourceRemover(50);
                Resource baseResource = resolver.getResource(this.ownRootPath);
                if (baseResource != null) {
                    Calendar oldDate = Calendar.getInstance();
                    oldDate.add(12, -1 * this.cleanupPeriod);
                    int oldYear = oldDate.get(1);
                    Iterator yearIter = baseResource.listChildren();
                    while (yearIter.hasNext()) {
                        Resource yearResource = (Resource)yearIter.next();
                        int year = Integer.valueOf(yearResource.getName());
                        if (year < oldYear) {
                            brr.delete(yearResource);
                            continue;
                        }
                        if (year != oldYear) continue;
                        int oldMonth = oldDate.get(2) + 1;
                        Iterator monthIter = yearResource.listChildren();
                        while (monthIter.hasNext()) {
                            Resource monthResource = (Resource)monthIter.next();
                            int month = Integer.valueOf(monthResource.getName());
                            if (month < oldMonth) {
                                brr.delete(monthResource);
                                continue;
                            }
                            if (month != oldMonth) continue;
                            int oldDay = oldDate.get(5);
                            Iterator dayIter = monthResource.listChildren();
                            while (dayIter.hasNext()) {
                                Resource dayResource = (Resource)dayIter.next();
                                int day = Integer.valueOf(dayResource.getName());
                                if (day < oldDay) {
                                    brr.delete(dayResource);
                                    continue;
                                }
                                if (day != oldDay) continue;
                                int oldHour = oldDate.get(11);
                                Iterator hourIter = dayResource.listChildren();
                                while (hourIter.hasNext()) {
                                    Resource hourResource = (Resource)hourIter.next();
                                    int hour = Integer.valueOf(hourResource.getName());
                                    if (hour < oldHour) {
                                        brr.delete(hourResource);
                                        continue;
                                    }
                                    if (hour != oldHour) continue;
                                    int oldMinute = oldDate.get(12);
                                    Iterator minuteIter = hourResource.listChildren();
                                    while (minuteIter.hasNext()) {
                                        Resource minuteResource = (Resource)minuteIter.next();
                                        int minute = Integer.valueOf(minuteResource.getName());
                                        if (minute >= oldMinute) continue;
                                        brr.delete(minuteResource);
                                    }
                                }
                            }
                        }
                    }
                }
                resolver.commit();
            }
        }
    }

    public void handleTopologyEvent(TopologyEvent event) {
        if (event.getType() == TopologyEvent.Type.TOPOLOGY_CHANGING) {
            this.instances = null;
        } else if ((event.getType() == TopologyEvent.Type.TOPOLOGY_CHANGED || event.getType() == TopologyEvent.Type.TOPOLOGY_INIT) && event.getNewView().getLocalInstance().isLeader()) {
            HashSet<String> set = new HashSet<String>();
            for (InstanceDescription desc : event.getNewView().getInstances()) {
                set.add(desc.getSlingId());
            }
            this.instances = set;
        }
    }

    public int getSuccessCounter() {
        return this.successCounter.get();
    }

    public int getFailureCounter() {
        return this.failureCounter.get();
    }
}

