package fr.in2p3.cc.storage.treqs2.core.dispatcher;

import fr.in2p3.cc.storage.treqs2.core.entity.TreqsDispatchedFiles;
import fr.in2p3.cc.storage.treqs2.core.entity.TreqsFile;
import fr.in2p3.cc.storage.treqs2.core.entity.TreqsStatus;
import fr.in2p3.cc.storage.treqs2.core.messaging.MessagingManager;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.Session;
import jersey.repackaged.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:fr/in2p3/cc/storage/treqs2/core/dispatcher/Dispatcher.class */
public class Dispatcher implements MessageListener {
    private static final Logger LOGGER = LoggerFactory.getLogger(Dispatcher.class);
    private static final int DISPATCHER_THREADS_POOL_SIZE = 8;
    private static final int CLEANNING_SCHEDULER_RATE = 900;
    private static final int DISPATCHING_TIMEOUT = 3;
    private static final int UNDISPATCHING_TIMEOUT = 3;
    private ExecutorService dispatchedThreadsPool = null;
    private ScheduledExecutorService cleaningThreadPool = null;
    private ConcurrentHashMap<String, TreqsDispatchedFiles> dispatchedFilesMap = null;
    private boolean isShutdown = false;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:fr/in2p3/cc/storage/treqs2/core/dispatcher/Dispatcher$CleaningTask.class */
    public class CleaningTask implements Runnable {
        CleaningTask() {
        }

        @Override // java.lang.Runnable
        public void run() {
            Dispatcher.LOGGER.info("Cleaning the map of useless objects");
            Iterator it = Dispatcher.this.dispatchedFilesMap.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry entry = (Map.Entry) it.next();
                if (((TreqsDispatchedFiles) entry.getValue()).getDispatchedFilesStatus().equals(TreqsStatus.DispatchedFilesStatus.ENDED)) {
                    Dispatcher.LOGGER.info("Removing ENDED TreqsDispatchedFiles object for tape {}", entry.getKey());
                    it.remove();
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:fr/in2p3/cc/storage/treqs2/core/dispatcher/Dispatcher$DispatchTreqsFile.class */
    public class DispatchTreqsFile implements Callable {
        protected TreqsFile treqsFile;

        public DispatchTreqsFile(TreqsFile treqsFile) {
            this.treqsFile = null;
            this.treqsFile = treqsFile;
        }

        @Override // java.util.concurrent.Callable
        public TreqsFile call() throws Exception {
            Dispatcher.LOGGER.debug("Start dispatching TreqsFile object (filename={}, tapename={})", this.treqsFile.getFilename(), this.treqsFile.getTape().getTapeName());
            String tapeName = this.treqsFile.getTape().getTapeName();
            TreqsDispatchedFiles treqsDispatchedFiles = new TreqsDispatchedFiles(this.treqsFile.getTape());
            TreqsDispatchedFiles treqsDispatchedFiles2 = (TreqsDispatchedFiles) Dispatcher.this.dispatchedFilesMap.putIfAbsent(tapeName, treqsDispatchedFiles);
            if (treqsDispatchedFiles2 != null) {
                if (treqsDispatchedFiles2.getDispatchedFilesStatus() == TreqsStatus.DispatchedFilesStatus.ENDED) {
                    treqsDispatchedFiles2.reset(this.treqsFile.getTape());
                }
                treqsDispatchedFiles2.add(this.treqsFile);
            } else {
                treqsDispatchedFiles.add(this.treqsFile);
            }
            Dispatcher.LOGGER.info("Success dispatching TreqsFile object (filename = {}, tapename = {})", this.treqsFile.getFilename(), this.treqsFile.getTape().getTapeName());
            return this.treqsFile;
        }

        TreqsFile createTreqsFileForJMS(String str) {
            TreqsFile treqsFile = new TreqsFile();
            treqsFile.markAsEnded(TreqsStatus.FileSubStatus.FAILED, str);
            return treqsFile;
        }
    }

    /* loaded from: input_file:fr/in2p3/cc/storage/treqs2/core/dispatcher/Dispatcher$DispatcherHolder.class */
    private static class DispatcherHolder {
        private static final Dispatcher INSTANCE = new Dispatcher();

        private DispatcherHolder() {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:fr/in2p3/cc/storage/treqs2/core/dispatcher/Dispatcher$UndispatchTreqsFile.class */
    public class UndispatchTreqsFile extends DispatchTreqsFile {
        public UndispatchTreqsFile(TreqsFile treqsFile) {
            super(treqsFile);
        }

        @Override // fr.in2p3.cc.storage.treqs2.core.dispatcher.Dispatcher.DispatchTreqsFile, java.util.concurrent.Callable
        public TreqsFile call() throws Exception {
            Dispatcher.LOGGER.debug("Start UNdispatching TreqsFile object (filename={}, tapename={})", this.treqsFile.getFilename(), this.treqsFile.getTape().getTapeName());
            TreqsDispatchedFiles treqsDispatchedFiles = (TreqsDispatchedFiles) Dispatcher.this.dispatchedFilesMap.get(this.treqsFile.getTape().getTapeName());
            if (treqsDispatchedFiles == null) {
                throw new Exception("TreqsFile object not found");
            }
            if (treqsDispatchedFiles.remove(this.treqsFile)) {
                Dispatcher.LOGGER.info("TreqsFile object (filename={} , tapename={}) has been removed from dispatched files list", this.treqsFile.getFilename(), this.treqsFile.getTape().getTapeName());
                this.treqsFile.markAsEnded(TreqsStatus.FileSubStatus.CANCELLED);
            } else {
                Dispatcher.LOGGER.error("TreqsFile object (filename={} , tapename={}) has NOT been removed from dispatched files list (file already or currently being staged)", this.treqsFile.getFilename(), this.treqsFile.getTape().getTapeName());
            }
            return this.treqsFile;
        }
    }

    public static Dispatcher getInstance() {
        return DispatcherHolder.INSTANCE;
    }

    protected Dispatcher() {
        startup();
    }

    public void startup() {
        LOGGER.info("Starting dispatcher ... ");
        if (this.dispatchedThreadsPool != null) {
            this.dispatchedThreadsPool.shutdownNow();
        }
        this.dispatchedThreadsPool = Executors.newFixedThreadPool(DISPATCHER_THREADS_POOL_SIZE, new ThreadFactoryBuilder().setNameFormat("Dispatcher").build());
        if (this.dispatchedFilesMap != null) {
            this.dispatchedFilesMap.clear();
        }
        if (this.cleaningThreadPool != null) {
            this.cleaningThreadPool.shutdownNow();
        }
        this.cleaningThreadPool = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("Cleaner").build());
        this.cleaningThreadPool.scheduleAtFixedRate(new CleaningTask(), 900L, 900L, TimeUnit.SECONDS);
        if (this.dispatchedFilesMap != null) {
            this.dispatchedFilesMap.clear();
        }
        this.dispatchedFilesMap = new ConcurrentHashMap<>();
        this.isShutdown = false;
    }

    public void shutdown() {
        LOGGER.info("Shutdown dispatcher ... ");
        LOGGER.info("Shutdown dispatcher >  shutdown dispatchedThreadsPool ... ");
        if (this.dispatchedThreadsPool != null) {
            boolean z = false;
            while (((ThreadPoolExecutor) this.dispatchedThreadsPool).getActiveCount() > 0 && !z) {
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                    z = true;
                }
            }
            int i = 0;
            boolean z2 = false;
            while (i < 5 && !z2) {
                try {
                    Thread.sleep(200L);
                } catch (InterruptedException e2) {
                    z2 = true;
                }
                if (((ThreadPoolExecutor) this.dispatchedThreadsPool).getActiveCount() == 0) {
                    i++;
                }
            }
            this.dispatchedThreadsPool.shutdown();
            try {
                try {
                    this.dispatchedThreadsPool.awaitTermination(10L, TimeUnit.SECONDS);
                    this.isShutdown = true;
                } catch (InterruptedException e3) {
                    LOGGER.error("Exception occured with dispatchedThreadsPool, while shutdown (exception was {})", e3);
                    Thread.currentThread().interrupt();
                    this.isShutdown = true;
                }
            } catch (Throwable th) {
                this.isShutdown = true;
                throw th;
            }
        }
        LOGGER.info("Shutdown dispatcher > dispatchedThreadsPool stopped ");
        LOGGER.info("Shutdown dispatcher >  shutdown cleaningThreadPool ... ");
        if (this.cleaningThreadPool != null) {
            this.cleaningThreadPool.shutdownNow();
            try {
                this.cleaningThreadPool.awaitTermination(30L, TimeUnit.SECONDS);
            } catch (InterruptedException e4) {
                LOGGER.error("Exception occured with cleaningThreadPool, while shutdown (exception was {})", e4);
                Thread.currentThread().interrupt();
            }
        }
        LOGGER.info("Shutdown dispatcher > cleaningThreadPool stopped ");
    }

    public ConcurrentHashMap<String, TreqsDispatchedFiles> getDispatchedFilesMap() {
        return this.dispatchedFilesMap;
    }

    public int getDispatchedThreadsPoolActiveCount() {
        return ((ThreadPoolExecutor) this.dispatchedThreadsPool).getActiveCount();
    }

    public boolean isShutdown() {
        return this.isShutdown;
    }

    public void onMessage(Message message) {
        try {
            Queue jMSDestination = message.getJMSDestination();
            if (jMSDestination instanceof Queue) {
                String queueName = jMSDestination.getQueueName();
                ObjectMessage objectMessage = (ObjectMessage) message;
                LOGGER.debug("Received message on queue " + queueName);
                boolean z = -1;
                switch (queueName.hashCode()) {
                    case -1870429366:
                        if (queueName.equals(MessagingManager.MSG_UNDISPATCH_FILE)) {
                            z = true;
                            break;
                        }
                        break;
                    case 1607937475:
                        if (queueName.equals(MessagingManager.MSG_DISPATCH_FILE)) {
                            z = false;
                            break;
                        }
                        break;
                }
                switch (z) {
                    case false:
                        dispatchTreqsFile((TreqsFile) objectMessage.getObject());
                        break;
                    case true:
                        unDispatchTreqsFile((TreqsFile) objectMessage.getObject());
                        break;
                    default:
                        throw new UnsupportedOperationException(" Queue " + queueName + " is not supported yet.");
                }
            }
        } catch (JMSException e) {
            LOGGER.error("JMSException while receiving message (exception was {})", e);
        }
    }

    public void dispatchTreqsFile(TreqsFile treqsFile) {
        if (treqsFile == null || treqsFile.getTape() == null || treqsFile.getTape().getTapeName() == null) {
            return;
        }
        final String filename = treqsFile.getFilename();
        final DispatchTreqsFile dispatchTreqsFile = new DispatchTreqsFile(treqsFile);
        final Future submit = this.dispatchedThreadsPool.submit(dispatchTreqsFile);
        this.dispatchedThreadsPool.submit(new Runnable() { // from class: fr.in2p3.cc.storage.treqs2.core.dispatcher.Dispatcher.1
            @Override // java.lang.Runnable
            public void run() {
                TreqsFile treqsFile2 = null;
                try {
                    try {
                        try {
                            treqsFile2 = (TreqsFile) submit.get(3L, TimeUnit.SECONDS);
                            Dispatcher.LOGGER.debug("Sending JMS message after dispatching file {}", treqsFile2.getFilename());
                            Dispatcher.this.sendMessage(treqsFile2);
                        } catch (TimeoutException e) {
                            Dispatcher.LOGGER.error("Abnormal end while dispatching file {}, timeout of {} s reached (exception was {})", new Object[]{filename, 3, e});
                            treqsFile2 = dispatchTreqsFile.createTreqsFileForJMS("dispatching failed due to timeout");
                            Dispatcher.LOGGER.debug("Sending JMS message after dispatching file {}", treqsFile2.getFilename());
                            Dispatcher.this.sendMessage(treqsFile2);
                        }
                    } catch (InterruptedException | ExecutionException e2) {
                        Dispatcher.LOGGER.error("Abnormal end of dispatching file {} (exception was {})", filename, e2);
                        treqsFile2 = dispatchTreqsFile.createTreqsFileForJMS("dispatching failed");
                        Dispatcher.LOGGER.debug("Sending JMS message after dispatching file {}", treqsFile2.getFilename());
                        Dispatcher.this.sendMessage(treqsFile2);
                    }
                } catch (Throwable th) {
                    Dispatcher.LOGGER.debug("Sending JMS message after dispatching file {}", treqsFile2.getFilename());
                    Dispatcher.this.sendMessage(treqsFile2);
                    throw th;
                }
            }
        });
    }

    public void unDispatchTreqsFile(TreqsFile treqsFile) {
        if (treqsFile == null || treqsFile.getTape() == null || treqsFile.getTape().getTapeName() == null) {
            return;
        }
        final String filename = treqsFile.getFilename();
        final UndispatchTreqsFile undispatchTreqsFile = new UndispatchTreqsFile(treqsFile);
        final Future submit = this.dispatchedThreadsPool.submit(undispatchTreqsFile);
        this.dispatchedThreadsPool.submit(new Runnable() { // from class: fr.in2p3.cc.storage.treqs2.core.dispatcher.Dispatcher.2
            @Override // java.lang.Runnable
            public void run() {
                TreqsFile treqsFile2 = null;
                try {
                    try {
                        try {
                            treqsFile2 = (TreqsFile) submit.get(3L, TimeUnit.SECONDS);
                            Dispatcher.LOGGER.debug("Sending JMS message after unDispatching file {}", treqsFile2.getFilename());
                            Dispatcher.this.sendMessage(treqsFile2);
                        } catch (TimeoutException e) {
                            Dispatcher.LOGGER.error("Abnormal end while UNdispatching file {}, timeout of {} s reached, (exception was {})", new Object[]{filename, 3, e});
                            treqsFile2 = undispatchTreqsFile.createTreqsFileForJMS("undispatching (cancel) failed due to timeout");
                            Dispatcher.LOGGER.debug("Sending JMS message after unDispatching file {}", treqsFile2.getFilename());
                            Dispatcher.this.sendMessage(treqsFile2);
                        }
                    } catch (InterruptedException | ExecutionException e2) {
                        Dispatcher.LOGGER.error("Abnormal end of UNdispatching file {} (exception was {})", filename, e2);
                        treqsFile2 = undispatchTreqsFile.createTreqsFileForJMS("undispatching (cancel) failed");
                        Dispatcher.LOGGER.debug("Sending JMS message after unDispatching file {}", treqsFile2.getFilename());
                        Dispatcher.this.sendMessage(treqsFile2);
                    }
                } catch (Throwable th) {
                    Dispatcher.LOGGER.debug("Sending JMS message after unDispatching file {}", treqsFile2.getFilename());
                    Dispatcher.this.sendMessage(treqsFile2);
                    throw th;
                }
            }
        });
    }

    protected void sendMessage(TreqsFile treqsFile) {
        if (treqsFile != null) {
            Session session = null;
            try {
                try {
                    MessageProducer producer = MessagingManager.getInstance().getProducer(MessagingManager.MSG_STATUS_FILE);
                    session = MessagingManager.getInstance().getConnection().createSession(false, 1);
                    ObjectMessage createObjectMessage = session.createObjectMessage(TreqsFile.class);
                    createObjectMessage.setObject(treqsFile);
                    producer.send(createObjectMessage);
                    LOGGER.debug("Sent MSG_STATUS_FILE for " + treqsFile.getFilename());
                    session.close();
                    if (session != null) {
                        try {
                            session.close();
                        } catch (JMSException e) {
                            LOGGER.error("JMSException while closing session, (exception was {})", e);
                        }
                    }
                } catch (Throwable th) {
                    if (session != null) {
                        try {
                            session.close();
                        } catch (JMSException e2) {
                            LOGGER.error("JMSException while closing session, (exception was {})", e2);
                        }
                    }
                    throw th;
                }
            } catch (JMSException e3) {
                LOGGER.error("JMSException while sending file {}, (exception was {})", treqsFile.getFilename(), e3);
                if (session != null) {
                    try {
                        session.close();
                    } catch (JMSException e4) {
                        LOGGER.error("JMSException while closing session, (exception was {})", e4);
                    }
                }
            }
        }
    }
}
