/*
 * Decompiled with CFR 0.152.
 */
package com.atlassian.migration.agent.queue;

import com.atlassian.annotations.VisibleForTesting;
import com.atlassian.migration.MigrationDarkFeaturesManager;
import com.atlassian.migration.agent.entity.WorkItem;
import com.atlassian.migration.agent.logging.ContextLoggerFactory;
import com.atlassian.migration.agent.queue.QueueConsumer;
import com.atlassian.migration.agent.queue.QueueConsumerConfiguration;
import com.atlassian.migration.agent.service.impl.WorkItemQueue;
import com.atlassian.migration.agent.store.tx.PluginTransactionTemplate;
import com.atlassian.scheduler.JobRunner;
import com.atlassian.scheduler.JobRunnerRequest;
import com.atlassian.scheduler.JobRunnerResponse;
import com.atlassian.scheduler.SchedulerService;
import com.atlassian.scheduler.SchedulerServiceException;
import com.atlassian.scheduler.config.JobConfig;
import com.atlassian.scheduler.config.JobId;
import com.atlassian.scheduler.config.JobRunnerKey;
import com.atlassian.scheduler.config.RunMode;
import com.atlassian.scheduler.config.Schedule;
import com.atlassian.scheduler.status.JobDetails;
import java.time.Duration;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.UUID;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.slf4j.Logger;

@Deprecated
public class QueueBroker
implements JobRunner {
    static final JobRunnerKey RUNNER_KEY = JobRunnerKey.of((String)"migration-plugin:-queue-broker-runner-key");
    static final JobId JOB_ID = JobId.of((String)"migration-plugin:queue-broker-job-id");
    static final Duration RESCHEDULE_INTERVAL = Duration.ofSeconds(10L);
    public static final int CONCURRENCY_LEVEL_UPPER_LIMIT = 5;
    private static final int FAILURE_TOLERANCE = 3;
    private static final Supplier<String> CONSUMER_JOB_ID_SUPPLIER = () -> "migration-plugin:" + UUID.randomUUID().toString();
    private static final Logger log = ContextLoggerFactory.getLogger(QueueBroker.class);
    private final PluginTransactionTemplate ptx;
    private final List<QueueConsumer> registeredConsumers;
    private final WorkItemQueue workItemQueue;
    private final SchedulerService schedulerService;
    private final MigrationDarkFeaturesManager migrationDarkFeaturesManager;

    public QueueBroker(PluginTransactionTemplate ptx, List<QueueConsumer> queueConsumers, WorkItemQueue workItemQueue, SchedulerService schedulerService, MigrationDarkFeaturesManager migrationDarkFeaturesManager) {
        this.ptx = ptx;
        this.workItemQueue = workItemQueue;
        this.schedulerService = schedulerService;
        this.registeredConsumers = queueConsumers;
        this.migrationDarkFeaturesManager = migrationDarkFeaturesManager;
        log.info("Started queue broker. Registered consumers: {}", this.registeredConsumers);
    }

    @PostConstruct
    public void postConstruct() throws SchedulerServiceException {
        this.schedulerService.registerJobRunner(RUNNER_KEY, (JobRunner)this);
        log.debug("Successfully registered QueueBroker job {}.", (Object)RUNNER_KEY);
        this.maybeStartPolling();
    }

    @PreDestroy
    public void cleanup() {
        this.schedulerService.unregisterJobRunner(RUNNER_KEY);
        log.debug("Successfully unregistered QueueBroker job: {}", (Object)RUNNER_KEY);
        this.stopPolling();
    }

    public void maybeStartPolling() throws SchedulerServiceException {
        if (!this.isPollingScheduled()) {
            this.schedulerService.scheduleJob(JOB_ID, this.mapSchedulerJobConfig());
            log.debug("Successfully started QueueBroker poller.");
        }
    }

    public boolean isPollingScheduled() {
        return this.schedulerService.getJobDetails(JOB_ID) != null;
    }

    public void stopPolling() {
        this.schedulerService.unscheduleJob(JOB_ID);
        log.debug("Successfully unscheduled QueueBroker job: {}", (Object)JOB_ID);
    }

    public JobConfig mapSchedulerJobConfig() {
        return JobConfig.forJobRunnerKey((JobRunnerKey)RUNNER_KEY).withRunMode(RunMode.RUN_ONCE_PER_CLUSTER).withSchedule(Schedule.forInterval((long)RESCHEDULE_INTERVAL.toMillis(), (Date)new Date(System.currentTimeMillis() + 5000L)));
    }

    public JobRunnerResponse runJob(JobRunnerRequest req) {
        if (this.migrationDarkFeaturesManager.isSchedulerFixEnabled()) {
            return JobRunnerResponse.success((String)"QueueBroker is disabled. No queued Work Items for the QueueConsumers will be processed. To enable, please set the \"migration-agent.disable.scheduler.fixes\" feature flag.");
        }
        try {
            this.registeredConsumers.forEach(this::dispatchBatchToConsumer);
            return JobRunnerResponse.success();
        }
        catch (RuntimeException e) {
            log.error("An unhandled exception occurred when processing a QueueBroker job request. Reason: {}", (Object)e.getMessage(), (Object)e);
            return JobRunnerResponse.failed((String)("QueueBroker job failed with reason " + e.getMessage()));
        }
    }

    private void dispatchBatchToConsumer(QueueConsumer consumer) {
        QueueConsumerConfiguration config = consumer.getConsumerConfiguration();
        this.workItemQueue.performWithLocking(() -> {
            List<WorkItem> batch = this.getNextBatch(config.getConsumerType(), this.getBatchSize(config.getConcurrencyLevel()));
            batch.forEach(workItem -> this.dispatch((WorkItem)workItem, consumer));
            return null;
        });
    }

    private List<WorkItem> getNextBatch(String type, int batchSize) {
        try {
            return this.ptx.write(() -> this.workItemQueue.getBatchByType(type, batchSize).stream().filter(this::recordIsReadyOrStuck).collect(Collectors.toList()));
        }
        catch (RuntimeException e) {
            log.error("An error occurred when getting the next batch for consumer type: {}. Message: {}", new Object[]{type, e.getMessage(), e});
            return Collections.emptyList();
        }
    }

    private void dispatch(WorkItem item, QueueConsumer consumer) {
        String jobId = CONSUMER_JOB_ID_SUPPLIER.get();
        try {
            this.ptx.write(() -> this.workItemQueue.claim(item.getRefId(), jobId));
            log.info("JobID {} successfully claimed work item {} for consumer {}", new Object[]{jobId, item.getRefId(), consumer.getConsumerConfiguration().getConsumerType()});
            consumer.consume(item.getRefId(), jobId);
            log.info("Consumer {} successfully consumed item with ref {}", (Object)consumer.getConsumerConfiguration().getConsumerType(), (Object)item.getRefId());
        }
        catch (RuntimeException e) {
            log.error("An error occurred when attempting to dispatch item {} - item will be skipped.", (Object)item.getRefId(), (Object)e);
            this.ptx.write(() -> this.workItemQueue.dequeue(item.getRefId()));
        }
    }

    private boolean recordIsReadyOrStuck(WorkItem workItem) {
        if (WorkItem.Status.READY == workItem.getStatus()) {
            return true;
        }
        if (WorkItem.Status.STARTED == workItem.getStatus() && this.jobIsGone(workItem)) {
            log.info("Work item with ref {} recorded a jobId {}, but that jobId was not found. Record will be reclaimed.", (Object)workItem.getRefId(), (Object)workItem.getJobId());
            return true;
        }
        return false;
    }

    private boolean jobIsGone(WorkItem workItem) {
        JobDetails details = this.schedulerService.getJobDetails(JobId.of((String)workItem.getJobId()));
        if (details != null) {
            return false;
        }
        int retryCount = this.workItemQueue.incrementAndGetFailureCount(workItem.getRefId());
        log.info("Work item with ref {} recorded a jobId {}, but that jobId was not found. Updated retry count to {}.", new Object[]{workItem.getRefId(), workItem.getJobId(), retryCount});
        if (retryCount < 3) {
            return false;
        }
        this.workItemQueue.resetFailureCount(workItem.getRefId());
        return true;
    }

    @VisibleForTesting
    int getBatchSize(int concurrencyLevel) {
        if (this.migrationDarkFeaturesManager.newExportEnabled()) {
            return concurrencyLevel;
        }
        return Math.min(5, concurrencyLevel);
    }
}

