package fi.laji.datawarehouse.etl.models;

import fi.laji.datawarehouse.dao.DAO;
import fi.laji.datawarehouse.etl.models.ThreadStatuses;
import fi.laji.datawarehouse.etl.models.containers.InPipeData;
import fi.laji.datawarehouse.etl.models.containers.OriginalIds;
import fi.laji.datawarehouse.etl.models.containers.OutPipeData;
import fi.laji.datawarehouse.etl.models.containers.Source;
import fi.laji.datawarehouse.etl.models.dw.Annotation;
import fi.laji.datawarehouse.etl.models.dw.Document;
import fi.laji.datawarehouse.etl.models.dw.DwRoot;
import fi.laji.datawarehouse.etl.models.exceptions.CriticalParseFailure;
import fi.laji.datawarehouse.etl.models.exceptions.ETLException;
import fi.laji.datawarehouse.etl.models.exceptions.UnknownHarmonizingFailure;
import fi.laji.datawarehouse.etl.models.harmonizers.Harmonizer;
import fi.laji.datawarehouse.etl.utils.Const;
import fi.laji.datawarehouse.etl.utils.Util;
import fi.luomus.commons.containers.rdf.Qname;
import fi.luomus.commons.utils.LogUtils;
import fi.luomus.commons.utils.Utils;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/* loaded from: input_file:fi/laji/datawarehouse/etl/models/InPipeReader.class */
public class InPipeReader extends ReaderThread {
    private final Map<String, Harmonizer> harmonizers;
    private final DAO dao;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:fi/laji/datawarehouse/etl/models/InPipeReader$Results.class */
    /**
     * Collects the outcome of processing one batch of in-pipe entries,
     * split into successful entries, failures and permanent failures.
     */
    public static class Results {
        /** Entries registered through {@link #successful(InPipeData)}. */
        private final List<InPipeData> successful = new ArrayList<>();
        /** Failures registered through {@link #failure(InPipeData, String)}. */
        private final List<SaveFailure> failed = new ArrayList<>();
        /** Failures registered through {@link #permanentFailure(InPipeData, String)}. */
        private final List<SaveFailure> permanentlyFailed = new ArrayList<>();

        private Results() {
        }

        /**
         * Records an entry as successfully processed.
         *
         * @param inPipeData the entry to record
         */
        public void successful(InPipeData inPipeData) {
            this.successful.add(inPipeData);
        }

        /**
         * Records a failure for the given entry.
         *
         * @param inPipeData the entry that failed
         * @param str        error message stored with the failure
         */
        public void failure(InPipeData inPipeData, String str) {
            this.failed.add(new SaveFailure(inPipeData, str));
        }

        /**
         * Records a permanent failure for the given entry.
         *
         * @param inPipeData the entry that failed
         * @param str        error message stored with the failure
         */
        public void permanentFailure(InPipeData inPipeData, String str) {
            this.permanentlyFailed.add(new SaveFailure(inPipeData, str));
        }

        /** @return the live list of successfully processed entries */
        public List<InPipeData> getSuccessful() {
            return this.successful;
        }

        /** @return the live list of recorded failures */
        public List<SaveFailure> getFailed() {
            return this.failed;
        }

        /** @return the live list of recorded permanent failures */
        public List<SaveFailure> getPermanentlyFailed() {
            return this.permanentlyFailed;
        }

        /** @return {@code true} when at least one (non-permanent) failure has been recorded */
        public boolean hasFailed() {
            return !getFailed().isEmpty();
        }

        /*
         * Accessor constructor left over from decompilation; the argument is ignored.
         * Kept so that call sites passing null keep compiling.
         */
        /* synthetic */ Results(Results results) {
            this();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:fi/laji/datawarehouse/etl/models/InPipeReader$SaveFailure.class */
    /**
     * Immutable pairing of an in-pipe entry with the error message
     * describing why handling it failed.
     */
    public static class SaveFailure {
        private final InPipeData entry;
        private final String reason;

        /**
         * @param inPipeData the entry that failed
         * @param str        the error message for the failure
         */
        public SaveFailure(InPipeData inPipeData, String str) {
            this.reason = str;
            this.entry = inPipeData;
        }

        /** @return the error message given at construction */
        public String getErrorMessage() {
            return this.reason;
        }

        /** @return the entry given at construction */
        public InPipeData getData() {
            return this.entry;
        }
    }

    /**
     * Creates a reader backed by the given DAO and harmonizers.
     *
     * @param qname                forwarded to the {@code ReaderThread} constructor
     * @param dao                  data access object used for all in-pipe and out-pipe operations
     * @param threadHandler        forwarded to the {@code ReaderThread} constructor
     * @param threadStatusReporter forwarded to the {@code ReaderThread} constructor
     * @param map                  harmonizers, looked up by the content type of each in-pipe entry
     */
    public InPipeReader(Qname qname, DAO dao, ThreadHandler threadHandler, ThreadStatuses.ThreadStatusReporter threadStatusReporter, Map<String, Harmonizer> map) {
        super(qname, threadHandler, threadStatusReporter);
        this.harmonizers = map;
        this.dao = dao;
    }

    /**
     * Looks up the harmonizer registered under the given key.
     *
     * @param str lookup key (callers pass the content type of an in-pipe entry)
     * @return the registered harmonizer, or whatever the map returns for a missing key
     */
    private Harmonizer getHarmonizer(String str) {
        Harmonizer registered = this.harmonizers.get(str);
        return registered;
    }

    /**
     * Performs one read round: fetches unprocessed in-pipe entries for this source
     * (falling back to previously failed attempts when there are none), processes
     * them and reports successes, failures and permanent failures back to the DAO.
     *
     * <p>Return values (presumably a delay in milliseconds before the next round;
     * confirm against {@code ReaderThread}):
     * <ul>
     *   <li>{@code 60000} when there is nothing to process,</li>
     *   <li>{@code 10} after a processed round, unless it was a retry round that still had failures,</li>
     *   <li>{@code 900000} after a retry round with failures or after any unexpected exception.</li>
     * </ul>
     *
     * @return the value described above
     */
    @Override // fi.laji.datawarehouse.etl.models.ReaderThread
    protected long read() {
        boolean retryingFailedAttempts = false;
        try {
            reportStatus("Getting unprocessed from in-pipe");
            List<InPipeData> batch = this.dao.getUnprocessedNotErroneousInOrderFromInPipe(this.source);
            if (batch.isEmpty()) {
                // Nothing new: fall back to entries whose earlier attempts failed.
                batch = this.dao.getFailedAttemptDataFromInPipe(this.source);
                if (batch.isEmpty()) {
                    return 60000L;
                }
                retryingFailedAttempts = true;
            }
            try {
                validateSource(batch);
                Results outcome = process(batch);
                markSuccessfullyProcessed(outcome);
                markFailed(outcome);
                markPermanentlyFailed(outcome);
                if (retryingFailedAttempts && outcome.hasFailed()) {
                    // The retry round failed again: use the long return value.
                    return 900000L;
                }
                return 10L;
            } catch (Throwable th) {
                // The whole batch is reported as attempted with the stack trace attached.
                reportStatus("Handling unknown exceptions");
                this.dao.reportAttempted(batch, LogUtils.buildStackTrace(th));
                return 900000L;
            }
        } catch (Throwable th2) {
            reportStatus("Handling unknown pipe read exceptions");
            // Log under this class so the error is attributed to the in-pipe reader.
            this.dao.logError(this.source, InPipeReader.class, th2);
            return 900000L;
        }
    }

    /**
     * Reports all successfully processed entries of the given results to the DAO in one call.
     *
     * @param results outcome of a processing round
     */
    private void markSuccessfullyProcessed(Results results) {
        reportStatus("Reporting in pipe data successes");
        List<InPipeData> processed = results.getSuccessful();
        this.dao.reportProcessed(processed);
    }

    /**
     * Reports each recorded failure to the DAO as an attempted entry,
     * one DAO call per failure with its own error message.
     *
     * @param results outcome of a processing round
     */
    private void markFailed(Results results) {
        List<SaveFailure> failures = results.getFailed();
        for (SaveFailure failure : failures) {
            reportStatus("Reporting in pipe data failures");
            InPipeData[] single = new InPipeData[]{failure.getData()};
            this.dao.reportAttempted(Utils.list(single), failure.getErrorMessage());
        }
    }

    /**
     * Reports each recorded permanent failure to the DAO,
     * one DAO call per failure with its own error message.
     *
     * @param results outcome of a processing round
     */
    private void markPermanentlyFailed(Results results) {
        List<SaveFailure> permanentFailures = results.getPermanentlyFailed();
        for (SaveFailure failure : permanentFailures) {
            reportStatus("Reporting in pipe data permanent failures");
            InPipeData[] single = new InPipeData[]{failure.getData()};
            this.dao.reportPermantentyFailed(Utils.list(single), failure.getErrorMessage());
        }
    }

    /**
     * Harmonizes and validates every entry of the batch, collects the resulting
     * out-pipe rows, stores annotations, and finally writes all collected rows to
     * the out-pipe in one call.
     *
     * <p>Per-entry errors do not abort the batch: {@link CriticalParseFailure},
     * {@link UnknownHarmonizingFailure} and {@link IllegalAccessException} are
     * recorded as permanent failures; any other throwable is logged and recorded
     * as a (retryable) failure.
     *
     * @param list in-pipe entries to process
     * @return the per-entry outcome of the round
     * @throws Exception if storing to the out-pipe fails
     */
    private Results process(List<InPipeData> list) throws Exception {
        Results results = new Results();
        List<OutPipeData> outPipeDatas = new ArrayList<>();
        for (InPipeData inPipeData : list) {
            try {
                List<DwRoot> roots = harmonizeAndValidate(inPipeData);
                setOutPipeDatas(outPipeDatas, inPipeData, roots);
                storeAnnotationsNotifyReprocess(inPipeData, roots);
                results.successful(inPipeData);
            } catch (CriticalParseFailure | UnknownHarmonizingFailure | IllegalAccessException e) {
                reportStatus("Handling permanent parsing exceptions");
                results.permanentFailure(inPipeData, LogUtils.buildStackTrace(e));
            } catch (Throwable th) {
                reportStatus("Handling unknown parsing exceptions");
                String stackTrace = LogUtils.buildStackTrace(th);
                this.dao.logError(this.source, InPipeReader.class, th);
                results.failure(inPipeData, stackTrace);
            }
        }
        reportStatus("Saving out-pipe datas (" + outPipeDatas.size() + ")");
        // Only signal an out-pipe job when the store call reports affected rows.
        if (storeToOutPipe(outPipeDatas) > 0) {
            getThreadHandler().reportOutPipeJob(this.source);
        }
        return results;
    }

    /**
     * Stores the annotations found in the given roots and, when any were stored,
     * marks the annotated documents for reprocessing.
     *
     * @param inPipeData the in-pipe entry the roots came from; its id is passed on when storing
     * @param list       harmonized roots of that entry
     */
    private void storeAnnotationsNotifyReprocess(InPipeData inPipeData, List<DwRoot> list) {
        reportStatus("Storing annotations");
        List<Annotation> stored = storeAnnotations(list, inPipeData.getId());
        if (!stored.isEmpty()) {
            reportStatus("Marking annotated documents for reprocessing");
            markForReprocess(stored);
        }
    }

    /**
     * Appends one out-pipe row to {@code list} for every root that is not annotation-only.
     * Each row carries the in-pipe id, this reader's source, the root's document id,
     * the root serialized as JSON, and the attempt count of the in-pipe entry.
     *
     * @param list       accumulator the new rows are added to
     * @param inPipeData the in-pipe entry the roots came from
     * @param list2      harmonized roots of that entry
     */
    private void setOutPipeDatas(List<OutPipeData> list, InPipeData inPipeData, List<DwRoot> list2) {
        long inPipeId = inPipeData.getId();
        for (DwRoot root : list2) {
            if (root.hasOnlyAnnotations()) {
                continue;
            }
            OutPipeData row = new OutPipeData(-1L, inPipeId, this.source, root.getDocumentId(), root.toJSON().toString());
            list.add(row.setAttemptCount(inPipeData.getAttemptCount()));
        }
    }

    /**
     * Marks the out-pipe document of every given annotation for reprocessing and
     * then reports an out-pipe job once per distinct source of the affected documents.
     * An annotation whose root document is not found in the out-pipe is logged as an
     * error and otherwise skipped.
     *
     * @param list annotations whose root documents should be reprocessed
     */
    private void markForReprocess(List<Annotation> list) {
        HashSet<Qname> affectedSources = new HashSet<>();
        for (Annotation annotation : list) {
            OutPipeData outPipeData = this.dao.getOutPipeData(annotation.getRootID());
            if (outPipeData == null) {
                this.dao.logError(this.source, InPipeReader.class, new IllegalStateException("Annotated document not found from out-pipe: " + annotation.getRootID().toURI()));
                continue;
            }
            getThreadHandler().markDocumentForReprocess(outPipeData);
            this.dao.markReprocessOutPipe(outPipeData.getId());
            affectedSources.add(outPipeData.getSource());
        }
        // The set de-duplicates, so each source is notified once.
        for (Qname affectedSource : affectedSources) {
            getThreadHandler().reportOutPipeJob(affectedSource);
        }
    }

    /**
     * Validates every harmonized root. The source id of each root is always checked
     * against this reader's source; the remaining checks depend on the kind of root:
     * <ul>
     *   <li>delete request: only the document id is validated,</li>
     *   <li>root with annotations: it must contain only annotations, and each
     *       annotation's id, root id and (when present) target id are validated,</li>
     *   <li>otherwise: the document id and the public and private documents are
     *       validated, and splitted public documents are rejected.</li>
     * </ul>
     *
     * @param list harmonized roots to validate
     * @throws CriticalParseFailure   if a root mixes annotations and documents or has splitted documents
     *                                (declared also for the id validation helpers)
     * @throws IllegalAccessException if a source id does not match this reader's source
     * @throws Exception              declared by the original signature
     */
    private void validate(List<DwRoot> list) throws CriticalParseFailure, IllegalAccessException, Exception {
        for (DwRoot root : list) {
            validateSource(root.getSourceId());

            if (root.isDeleteRequest()) {
                validateDocumentId(root.getDocumentId());
                continue;
            }

            if (root.hasAnnotations()) {
                if (!root.hasOnlyAnnotations()) {
                    throw new CriticalParseFailure("Root has annotations and documents");
                }
                for (Annotation annotation : root.getAnnotations()) {
                    Util.validateIncomingId(annotation.getId(), "annotationId");
                    Util.validateIncomingId(annotation.getRootID(), "rootId");
                    if (annotation.getTargetID() != null) {
                        Util.validateIncomingId(annotation.getTargetID(), "targetId");
                    }
                }
                continue;
            }

            validateDocumentId(root.getDocumentId());
            validate(root.getPublicDocument());
            validate(root.getPrivateDocument());
            if (root.hasSplittedPublicDocuments()) {
                throw new CriticalParseFailure("Root has splitted documents. This feature is ment only for the warehouse internally.");
            }
        }
    }

    /**
     * Validates the source of every entry in the batch, stopping at the first violation.
     *
     * @param list in-pipe entries to check
     * @throws IllegalAccessException if any entry's source is unknown or not a warehouse source
     */
    private void validateSource(List<InPipeData> list) throws IllegalAccessException {
        for (InPipeData entry : list) {
            validateSource(entry);
        }
    }

    /**
     * Checks that the entry's source is registered with the DAO (looked up by its URI)
     * and is flagged as a warehouse source.
     *
     * @param inPipeData the entry whose source is checked
     * @throws IllegalAccessException if the source is not registered or is not a warehouse source
     */
    private void validateSource(InPipeData inPipeData) throws IllegalAccessException {
        Source registered = (Source) this.dao.getSources().get(inPipeData.getSource().toURI());
        if (registered == null) {
            throw new IllegalAccessException("Unkown source " + inPipeData.getSource());
        }
        if (registered.isWarehouseSource()) {
            return;
        }
        throw new IllegalAccessException("Source " + inPipeData.getSource() + " is not allowed to insert data into warehouse!");
    }

    /**
     * Checks that the given source id equals this reader's own source.
     *
     * @param qname source id found in the incoming data
     * @throws IllegalAccessException if the ids differ
     */
    private void validateSource(Qname qname) throws IllegalAccessException {
        if (this.source.equals(qname)) {
            return;
        }
        throw new IllegalAccessException("Given sourceId " + qname + " does not match authentication source: " + this.source);
    }

    /**
     * Validates a single document: its source id must match this reader's source and
     * its incoming ids must pass the document's own validation. A {@code null}
     * document is accepted and skipped.
     *
     * @param document the document to validate, may be {@code null}
     * @throws CriticalParseFailure   declared for the document's id validation
     * @throws IllegalAccessException if the document's source id does not match this reader's source
     */
    private void validate(Document document) throws CriticalParseFailure, IllegalAccessException {
        if (document != null) {
            validateSource(document.getSourceId());
            document.validateIncomingIds();
        }
    }

    /**
     * Validates an incoming document id by delegating to
     * {@code Util.validateIncomingDocumentId}.
     *
     * @param qname the document id to validate
     * @throws CriticalParseFailure declared for the delegate's validation failure
     */
    private void validateDocumentId(Qname qname) throws CriticalParseFailure {
        Util.validateIncomingDocumentId(qname);
    }

    /**
     * Harmonizes the raw data of an in-pipe entry with the harmonizer registered
     * for its content type and validates the resulting roots.
     *
     * @param inPipeData the entry to harmonize
     * @return the harmonized and validated roots, never empty
     * @throws CriticalParseFailure   (as {@code Exception}) if no harmonizer exists for the
     *                                content type or harmonizing produced no roots
     * @throws IllegalAccessException if validation finds a source mismatch
     * @throws Exception              for any other harmonizing or validation failure
     */
    private List<DwRoot> harmonizeAndValidate(InPipeData inPipeData) throws IllegalAccessException, Exception {
        reportStatus("Harmonizing " + inPipeData.getId());
        String contentType = inPipeData.getContentType();
        Harmonizer harmonizer = getHarmonizer(contentType);
        if (harmonizer == null) {
            throw new CriticalParseFailure("Not yet implemented for " + contentType);
        }

        List<DwRoot> roots = harmonizer.harmonize(inPipeData.getData(), inPipeData.getSource());
        if (roots.isEmpty()) {
            throw new CriticalParseFailure("No data");
        }

        reportStatus("Validating " + inPipeData.getId());
        validate(roots);
        return roots;
    }

    /**
     * Stores the given rows to the out-pipe, skipping the DAO call entirely for an empty list.
     *
     * @param list rows to store
     * @return {@code 0} for an empty list, otherwise the number of affected rows reported by the DAO
     * @throws IllegalAccessException declared by the original signature
     */
    private int storeToOutPipe(List<OutPipeData> list) throws IllegalAccessException {
        return list.isEmpty() ? 0 : this.dao.storeToOutPipeAndReturnNumberOfAffectedRows(list);
    }

    /**
     * Stores the annotations contained in the given roots and returns those that
     * were actually stored.
     *
     * <p>For each annotation: if its root id looks like a splitted document id, the
     * root and target ids are first replaced with the original ids (when known).
     * The annotation is then stored only if {@link #shouldOverrideExisting(long, Long)}
     * accepts it against the in-pipe id already recorded for that annotation id;
     * otherwise it is skipped. For a stored annotation that is marked deleted,
     * unsent notifications for it are removed.
     *
     * @param list harmonized roots that may carry annotations
     * @param j    id of the in-pipe entry the roots came from
     * @return the annotations that were stored, in encounter order
     */
    private List<Annotation> storeAnnotations(List<DwRoot> list, long j) {
        List<Annotation> candidates = new ArrayList<>();
        for (DwRoot dwRoot : list) {
            if (dwRoot.hasAnnotations()) {
                candidates.addAll(dwRoot.getAnnotations());
            }
        }

        // Build the result separately instead of removing from the list being iterated.
        List<Annotation> stored = new ArrayList<>(candidates.size());
        for (Annotation annotation : candidates) {
            if (looksLikeSplittedDocumentId(annotation.getRootID())) {
                setOriginalIdsToSplitted(annotation);
            }
            if (!shouldOverrideExisting(j, this.dao.getExistingAnnotationInPipeId(annotation.getId()))) {
                continue;
            }
            this.dao.storeAnnotation(annotation, j);
            if (annotation.isDeleted()) {
                this.dao.removeUnsentNotifications(annotation.getId());
            }
            stored.add(annotation);
        }
        return stored;
    }

    /**
     * Decides whether an annotation coming from in-pipe entry {@code j} may replace
     * what is already stored.
     *
     * @param j id of the in-pipe entry currently being processed
     * @param l in-pipe id already recorded for the annotation, or {@code null} if none
     * @return {@code true} when nothing is recorded yet or the recorded id is not greater than {@code j}
     */
    private boolean shouldOverrideExisting(long j, Long l) {
        if (l == null) {
            return true;
        }
        return l.longValue() <= j;
    }

    /**
     * Rewrites the annotation's root id and target id to the original document and
     * unit ids that the DAO has on record for the annotation's current root id.
     * Leaves the annotation untouched when the DAO has no record.
     *
     * @param annotation the annotation to rewrite in place
     * @throws ETLException if setting the ids raises a {@link CriticalParseFailure}
     */
    private void setOriginalIdsToSplitted(Annotation annotation) {
        OriginalIds originalIds = this.dao.getOriginalIdsOfSplittedDocument(annotation.getRootID());
        if (originalIds != null) {
            try {
                annotation.setRootID(originalIds.getDocumentId());
                annotation.setTargetID(originalIds.getUnitId());
            } catch (CriticalParseFailure e) {
                throw new ETLException("Impossible state", e);
            }
        }
    }

    /**
     * Tells whether the id's string form starts with the splitted-document namespace prefix.
     *
     * @param qname the id to inspect
     * @return {@code true} if the id starts with {@code Const.SPLITTED_DOCUMENT_ID_QNAME_NAMESPACE_PREFIX}
     */
    private boolean looksLikeSplittedDocumentId(Qname qname) {
        String id = qname.toString();
        return id.startsWith(Const.SPLITTED_DOCUMENT_ID_QNAME_NAMESPACE_PREFIX);
    }

    /** Intentionally empty: this method performs no cleanup. */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
    }
}
