/*
 * Decompiled with CFR 0.152.
 */
package com.almworks.structure.confluence.helper;

import com.almworks.structure.confluence.helper.Notification;
import com.almworks.structure.confluence.helper.NotificationSender;
import com.almworks.structure.confluence.helper.NotificationTracker;
import com.almworks.structure.confluence.helper.Subscription;
import com.almworks.structure.confluence.helper.SubscriptionManager;
import com.almworks.structure.confluence.helper.util.OneElementQueue;
import io.atlassian.util.concurrent.ThreadFactories;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;

/**
 * Collects {@link Notification}s in a delay queue and passes them in batches to a
 * {@link NotificationSender} from a single background thread.
 *
 * <p>All queued notifications share one delay: the time remaining until {@code myNextSend}.
 * After every send, {@code myNextSend} is moved forward by the minimal send interval, so the
 * queue does not release anything to the sender thread before that moment.
 *
 * <p>Tuning is done through system properties, all in milliseconds except the queue size:
 * <ul>
 *   <li>{@code structure.notifications.minimalSendInterval} (default 500)</li>
 *   <li>{@code structure.notification.maxNotificationQueueSize} (default 1000000)</li>
 *   <li>{@code structure.notifications.accumulationInterval} (default 100)</li>
 *   <li>{@code structure.notifications.resendPendingNotificationsInterval} (default 30000)</li>
 * </ul>
 */
public class ThrottledNotificationService implements NotificationTracker, DisposableBean, InitializingBean {
    private static final Logger log = LoggerFactory.getLogger(ThrottledNotificationService.class);

    /** Minimal pause between two sends, in nanoseconds. */
    private final long myMinimalSendInterval = TimeUnit.MILLISECONDS.toNanos(Long.getLong("structure.notifications.minimalSendInterval", 500L));
    /**
     * Maximum number of queued notifications; the oldest ones are dropped beyond this.
     * NOTE(review): this property key says "notification" (singular) while the others say
     * "notifications"; the key is left as is so existing configuration keeps working.
     */
    private final int myMaxNotificationQueueSize = Integer.getInteger("structure.notification.maxNotificationQueueSize", 1000000);
    /** How long to wait for a second notification after the first one arrived, in nanoseconds. */
    private final long myNotificationAccumulationInterval = TimeUnit.MILLISECONDS.toNanos(Long.getLong("structure.notifications.accumulationInterval", 100L));
    /** How long to wait for new notifications while the sender reports pending ones, in nanoseconds. */
    private final long myResendPendingNotificationsInterval = TimeUnit.MILLISECONDS.toNanos(Long.getLong("structure.notifications.resendPendingNotificationsInterval", 30000L));

    private final DelayQueue<ThrottledNotification> myNotificationsQueue = new DelayQueue<>();
    /** {@link System#nanoTime()} value before which queued notifications are not released. */
    private final AtomicLong myNextSend = new AtomicLong(System.nanoTime());
    /**
     * Source of the per-notification sequence numbers that define queue order.
     * Kept as a long counter: an int counter would wrap to negative values after 2^31
     * notifications and make newer notifications sort ahead of older ones.
     */
    private final AtomicLong myNotificationsCount = new AtomicLong(0L);
    private final SubscriptionManager mySubscriptionManager;

    /** One round of {@link #send()}; an interrupt ends the round and is passed on to the worker thread. */
    private final Runnable mySendTask = () -> {
        try {
            this.send();
        } catch (InterruptedException e) {
            log.warn("sending notifications thread was interrupted, stopping");
            // Restore the flag so that the executor's worker thread can see the interruption.
            Thread.currentThread().interrupt();
        }
    };

    // Single-threaded executor. Presumably OneElementQueue keeps handing the same send task to the
    // worker so that send() runs in a loop - TODO confirm against OneElementQueue.
    private final ExecutorService mySendTaskExecutor = new ThreadPoolExecutor(
        1,
        1,
        0L,
        TimeUnit.MILLISECONDS,
        new OneElementQueue<Runnable>(() -> this.mySendTask),
        ThreadFactories.namedThreadFactory("structure-notification-sender"),
        new ThreadPoolExecutor.DiscardPolicy());
    private final NotificationSender myNotificationSender;

    /**
     * @param subscriptionManager supplies the active subscriptions for each send round
     * @param notificationSender receives the subscriptions together with the batched notifications
     */
    public ThrottledNotificationService(SubscriptionManager subscriptionManager, NotificationSender notificationSender) {
        this.mySubscriptionManager = subscriptionManager;
        this.myNotificationSender = notificationSender;
        log.warn("starting");
    }

    @Override
    public String toString() {
        return "Structure notification service";
    }

    /** Submits the send task to the sender executor. */
    @Override
    public void afterPropertiesSet() {
        log.warn("{} starting send task", this);
        mySendTaskExecutor.submit(mySendTask);
    }

    /**
     * Interrupts the sender thread and waits up to twice the minimal send interval for it to stop.
     *
     * @throws Exception if the calling thread is interrupted while waiting
     */
    @Override
    public void destroy() throws Exception {
        log.warn("{} shutting down", this);
        mySendTaskExecutor.shutdownNow();
        boolean terminated = mySendTaskExecutor.awaitTermination(2L * myMinimalSendInterval, TimeUnit.NANOSECONDS);
        if (!terminated) {
            log.warn("{} send task did not stop in time", this);
        }
    }

    /** Queues the notification for the next send round. */
    @Override
    public void record(Notification notification) {
        add(notification);
    }

    /**
     * Puts the notification into the queue and, if the queue has grown beyond the configured
     * maximum, drops notifications from the head of the queue (lowest sequence numbers first).
     */
    private void add(Notification notification) {
        myNotificationsQueue.add(new ThrottledNotification(notification));
        log.debug("n +{}", notification);
        int nExtraEvents = myNotificationsQueue.size() - myMaxNotificationQueueSize;
        if (nExtraEvents <= 0) {
            return;
        }
        log.error("Dropping {} notifications: notification queue size exceeded {}", nExtraEvents, myMaxNotificationQueueSize);
        while (nExtraEvents-- > 0) {
            // peek() + remove() rather than poll(): poll() returns null while the head is still delayed.
            ThrottledNotification head = myNotificationsQueue.peek();
            if (head == null) {
                break;
            }
            myNotificationsQueue.remove(head);
        }
    }

    /**
     * Runs one send round: fetches the active subscriptions, waits for notifications, collects
     * everything the queue releases, passes it to the sender and schedules the next release.
     *
     * <p>When the wait for the first notification times out, the sender is still called, with an
     * empty notification list.
     *
     * @throws InterruptedException if the thread is interrupted while waiting on the queue
     */
    private void send() throws InterruptedException {
        log.debug("send {}. waiting for subscriptions", myNextSend);
        List<Subscription> subscriptions = mySubscriptionManager.getActiveSubscriptions();
        log.debug("subscriptions {}. waiting for notifications", subscriptions);
        ThrottledNotification first = waitForNotifications(subscriptions);
        List<ThrottledNotification> buffer = Collections.emptyList();
        if (first != null) {
            // Give closely following notifications a chance to join the same batch.
            ThrottledNotification second = accumulateNotifications();
            buffer = new ArrayList<>(1 + (second == null ? 0 : 1) + myNotificationsQueue.size());
            buffer.add(first);
            if (second != null) {
                buffer.add(second);
            }
            myNotificationsQueue.drainTo(buffer);
        }
        List<Notification> notifications = buffer.stream()
            .map(ThrottledNotification::getNotification)
            .collect(Collectors.toList());
        log.debug("sending {}", notifications);
        myNotificationSender.send(subscriptions, notifications);
        myNextSend.set(System.nanoTime() + myMinimalSendInterval);
    }

    /**
     * Waits for the first notification of a round.
     *
     * @return the head of the queue, or {@code null} if the sender has pending notifications and
     *         nothing was released within the resend interval
     * @throws InterruptedException if interrupted while waiting
     */
    @Nullable
    private ThrottledNotification waitForNotifications(List<Subscription> subscriptions) throws InterruptedException {
        if (myNotificationSender.hasPendingNotifications(subscriptions)) {
            // Bounded wait, so that the caller gets control back even when nothing new arrives.
            return myNotificationsQueue.poll(myResendPendingNotificationsInterval, TimeUnit.NANOSECONDS);
        }
        return myNotificationsQueue.take();
    }

    /**
     * Waits up to the accumulation interval for one more notification.
     *
     * @return the next notification, or {@code null} if none was released in time
     * @throws InterruptedException if interrupted while waiting
     */
    @Nullable
    private ThrottledNotification accumulateNotifications() throws InterruptedException {
        return myNotificationsQueue.poll(myNotificationAccumulationInterval, TimeUnit.NANOSECONDS);
    }

    /**
     * Queue entry wrapping a notification. Entries are ordered by creation sequence number, and
     * every entry reports the same delay: the time left until the service's next allowed send.
     */
    private final class ThrottledNotification implements Delayed {
        private final Notification myNotification;
        private final long myInstanceId;

        ThrottledNotification(Notification notification) {
            this.myInstanceId = myNotificationsCount.getAndIncrement();
            this.myNotification = notification;
        }

        Notification getNotification() {
            return myNotification;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(myNextSend.get() - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        /** Orders entries by sequence number; only ever compared with other entries of this queue. */
        @Override
        public int compareTo(Delayed o) {
            return Long.compare(myInstanceId, ((ThrottledNotification) o).myInstanceId);
        }
    }
}

