package fi.laji.datawarehouse.etl.models;

import com.google.common.collect.Lists;
import fi.laji.datawarehouse.dao.DAO;
import fi.laji.datawarehouse.dao.oracle.KastikkaDataSourceDefinition;
import fi.laji.datawarehouse.etl.models.ThreadStatuses;
import fi.luomus.commons.config.Config;
import fi.luomus.commons.containers.rdf.Context;
import fi.luomus.commons.containers.rdf.InternalModelToJenaModelConverter;
import fi.luomus.commons.containers.rdf.JenaUtils;
import fi.luomus.commons.containers.rdf.Model;
import fi.luomus.commons.containers.rdf.ObjectLiteral;
import fi.luomus.commons.containers.rdf.ObjectResource;
import fi.luomus.commons.containers.rdf.Predicate;
import fi.luomus.commons.containers.rdf.Qname;
import fi.luomus.commons.containers.rdf.Statement;
import fi.luomus.commons.db.connectivity.SimpleTransactionConnection;
import fi.luomus.commons.db.connectivity.TransactionConnection;
import fi.luomus.commons.reporting.ErrorReporter;
import fi.luomus.commons.utils.Utils;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.jena.riot.RDFFormat;
import org.apache.tomcat.jdbc.pool.DataSource;

/* loaded from: input_file:fi/laji/datawarehouse/etl/models/KastikkaPullReader.class */
public class KastikkaPullReader extends ReaderThread {
    // Same value as the 3600000L literal that read() and tryToRead() return when there is
    // nothing to process or a read failed.
    // NOTE(review): presumably a pause in milliseconds (one hour) - confirm against ReaderThread.
    private static final int PAUSE = 3600000;
    // Collaborators handed in through the constructor.
    private final DAO dao;
    private final ErrorReporter errorReporter;
    // Connection pool created in the constructor from the "KastikkaPull" connection description; closed by close().
    private final DataSource dataSource;
    // Same value as the row limit hard-coded in FETCH_ROWS_SQL below.
    private static final int ROWS_PER_FETCH = 10;
    // Up to 10 rows with inerror = 0, lowest transactionid first.
    private static final String FETCH_ROWS_SQL = " SELECT transactionid, documenturi  FROM transactions  WHERE inerror = 0  ORDER BY transactionid  FETCH FIRST 10 ROWS ONLY  ";
    // A single row with inerror = 1, lowest transactionid first; used only when FETCH_ROWS_SQL returns nothing.
    private static final String FETCH_EROROREUS_ROW_SQL = " SELECT transactionid, documenturi  FROM transactions  WHERE inerror = 1  ORDER BY transactionid  FETCH FIRST 1 ROWS ONLY ";
    // Prefix/suffix around a generated "?,?,..." list: all statement rows of the given subjects, grouped by subject.
    private static final String GET_MODELS_SQL_PRE = " SELECT  subjectname, predicatename, objectname, resourceliteral, langcodefk, contextname, statementid  FROM    rdf_statementview                                                     WHERE   subjectname IN ( ";
    private static final String GET_MODELS_SQL_POST = " ) ORDER BY subjectname ";
    // Prefix around a generated "?,?,..." list: subjects that point at any of the given objects.
    private static final String GET_CHILDREN_SQL_PRE = " SELECT DISTINCT subjectname               FROM  rdf_statementview                   WHERE objectname IN ( ";
    // Predicate added to a parent model (hasPart) in place of the child's isPartOf statement; see addModelLink().
    private static final Predicate MZ_HAS_PART_PREDICATE = new Predicate("MZ.hasPart");
    private static final Predicate MZ_IS_PART_OF_PREDICATE = new Predicate("MZ.isPartOf");
    // Source identifier passed to the ReaderThread constructor and to dao.storeToInPipe().
    public static final Qname SOURCE = new Qname("KE.167");
    // Must be declared after MZ_IS_PART_OF_PREDICATE because static initializers run in textual order.
    // The predicate is embedded through string conversion.
    // NOTE(review): presumably Predicate.toString() yields "MZ.isPartOf" - confirm.
    private static final String GET_CHILDREN_SQL_POST = " ) AND predicatename = '" + MZ_IS_PART_OF_PREDICATE + "' ";

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:fi/laji/datawarehouse/etl/models/KastikkaPullReader$TransactionEntry.class */
    /**
     * Immutable value holder for one pending transaction: the numeric id of the
     * transaction row and the qname of the document that row refers to.
     * The enclosing class reads the fields directly.
     */
    public static class TransactionEntry {
        private final int transactionId;
        private final Qname documentQname;

        /**
         * @param transactionId id of the transaction row
         * @param documentQname identifier of the document the transaction concerns
         */
        public TransactionEntry(int transactionId, Qname documentQname) {
            this.documentQname = documentQname;
            this.transactionId = transactionId;
        }
    }

    /**
     * Creates the reader and initializes the data source used for pulling.
     *
     * @param dao                  used for error logging and for storing the generated RDF to the in pipe
     * @param config               supplies the "KastikkaPull" connection description
     * @param errorReporter        receives exceptions raised while reading
     * @param threadHandler        forwarded to the {@code ReaderThread} superclass
     * @param threadStatusReporter forwarded to the {@code ReaderThread} superclass
     */
    public KastikkaPullReader(DAO dao, Config config, ErrorReporter errorReporter, ThreadHandler threadHandler, ThreadStatuses.ThreadStatusReporter threadStatusReporter) {
        super(SOURCE, threadHandler, threadStatusReporter);
        this.errorReporter = errorReporter;
        this.dao = dao;
        this.dataSource = KastikkaDataSourceDefinition.initDataSource(config.connectionDescription("KastikkaPull"));
    }

    /**
     * Closes the data source, if one was created.
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        if (this.dataSource == null) {
            return;
        }
        this.dataSource.close();
    }

    /**
     * Performs one pull cycle: opens a connection, delegates to {@link #tryToRead}, and
     * always releases the connection exactly once.
     *
     * <p>Any {@link Exception} is logged through the DAO, sent to the error reporter and
     * answered with {@code PAUSE}; other throwables propagate after the connection has
     * been released.
     *
     * @return the value returned by {@code tryToRead}, or {@code PAUSE} after a failure
     *         (NOTE(review): presumably the delay before the next call - confirm against ReaderThread)
     */
    @Override // fi.laji.datawarehouse.etl.models.ReaderThread
    public long read() {
        TransactionConnection connection = null;
        try {
            connection = getCon();
            return tryToRead(connection);
        } catch (Exception e) {
            reportStatus("Handling exceptions");
            this.dao.logError(this.source, getClass(), e);
            this.errorReporter.report("Kastikka pull", e);
            return PAUSE;
        } finally {
            // Single release point; connection is still null when getCon() itself failed.
            if (connection != null) {
                connection.release();
            }
        }
    }

    /**
     * Takes a connection from the data source and wraps it for transactional use.
     *
     * @return a new transaction connection; the caller is responsible for releasing it
     * @throws SQLException if the data source cannot provide a connection
     */
    private SimpleTransactionConnection getCon() throws SQLException {
        java.sql.Connection pooledConnection = this.dataSource.getConnection();
        return new SimpleTransactionConnection(pooledConnection);
    }

    /**
     * Processes the next batch of transaction entries inside one transaction:
     * loads the document models, serializes them to RDF/XML, stores the result to the
     * in pipe, deletes the processed entries and commits.
     *
     * <p>If any step after fetching the entries fails, the entries are marked erroneous
     * and that marking is committed before the original failure is rethrown. A failure
     * of the marking itself is attached to the original failure as a suppressed exception,
     * so the root cause is never lost.
     *
     * @param transactionConnection open connection; a transaction is started on it here
     * @return {@code PAUSE} when there was nothing to process, otherwise {@code 10}
     *         (NOTE(review): presumably a short delay so the next batch follows quickly - confirm)
     * @throws Exception whatever the failing step threw
     */
    private long tryToRead(TransactionConnection transactionConnection) throws Exception {
        transactionConnection.startTransaction();
        reportStatus("Getting next entries");
        List<TransactionEntry> entries = getNextEntries(transactionConnection);
        if (entries.isEmpty()) {
            return PAUSE;
        }
        try {
            reportStatus("Getting document models for " + entries.size() + " transactions");
            Collection<Model> documentModels = getDocumentModels(entries, transactionConnection);
            if (documentModels.isEmpty()) {
                // NOTE(review): the entries are neither deleted nor marked erroneous here, so the
                // same rows will be selected again on the next call - confirm this is intended.
                return PAUSE;
            }
            reportStatus("Generating rdf+xml");
            String rdfXml = generateRdf(documentModels);
            reportStatus("Storing to in pipe");
            this.dao.storeToInPipe(SOURCE, rdfXml, "application/rdf+xml");
            reportStatus("Deleting transaction entries");
            deleteTransactionEntries(transactionConnection, entries);
            getThreadHandler().reportInPipeJob(this.source);
            reportStatus("Done, will sleep..");
            transactionConnection.commitTransaction();
            return 10L;
        } catch (Throwable failure) {
            reportStatus("Handling exceptions");
            try {
                setTransactionEntriesErroneous(transactionConnection, entries);
                transactionConnection.commitTransaction();
            } catch (Exception markingFailure) {
                // Keep the root cause as the primary exception; the bookkeeping failure rides along.
                failure.addSuppressed(markingFailure);
            }
            throw failure;
        }
    }

    /**
     * Deletes the rows of the given entries from the {@code transactions} table.
     *
     * @param transactionConnection connection to run the statement on
     * @param list                  entries whose transaction ids are deleted
     * @throws SQLException if the statement fails
     */
    private void deleteTransactionEntries(TransactionConnection transactionConnection, List<TransactionEntry> list) throws SQLException {
        final String deleteSql = "DELETE FROM transactions";
        executeUpdate(transactionConnection, deleteSql, list);
    }

    /**
     * Increments the {@code inerror} counter of the given entries.
     *
     * <p>The fetch queries select only {@code inerror = 0} and {@code inerror = 1}, so an
     * entry whose counter reaches 2 is matched by neither of them.
     *
     * @param transactionConnection connection to run the statement on
     * @param list                  entries whose counter is incremented
     * @throws SQLException if the statement fails
     */
    private void setTransactionEntriesErroneous(TransactionConnection transactionConnection, List<TransactionEntry> list) throws SQLException {
        final String markSql = "UPDATE transactions SET inerror = inerror + 1";
        executeUpdate(transactionConnection, markSql, list);
    }

    /**
     * Runs {@code str} restricted to the transaction ids of {@code list}
     * ({@code ... WHERE transactionid IN (?,?,...)}).
     *
     * <p>An empty list is a no-op: it would otherwise produce the invalid SQL
     * {@code IN ()}, and there is nothing to update anyway.
     *
     * @param transactionConnection connection to prepare the statement on
     * @param str                   statement head without a WHERE clause
     * @param list                  entries whose transaction ids are bound as parameters
     * @throws SQLException if preparing, binding or executing fails
     */
    private void executeUpdate(TransactionConnection transactionConnection, String str, List<TransactionEntry> list) throws SQLException {
        if (list.isEmpty()) {
            return;
        }
        PreparedStatement statement = null;
        try {
            statement = transactionConnection.prepareStatement(generateUpdateStatement(str, list));
            int parameterIndex = 1; // JDBC parameters are 1-based
            for (TransactionEntry entry : list) {
                statement.setInt(parameterIndex++, entry.transactionId);
            }
            statement.executeUpdate();
        } finally {
            // statement is still null here when prepareStatement itself failed
            Utils.close(statement);
        }
    }

    /**
     * Appends {@code " WHERE transactionid IN (?,?,...)"} to the given statement head,
     * with one placeholder per entry.
     *
     * @param str  statement head
     * @param list entries; only their count is used
     * @return the complete parameterized statement
     */
    private String generateUpdateStatement(String str, List<TransactionEntry> list) {
        StringBuilder sql = new StringBuilder().append(str).append(" WHERE transactionid IN (");
        int placeholders = list.size();
        for (int i = 0; i < placeholders; i++) {
            if (i > 0) {
                sql.append(",");
            }
            sql.append("?");
        }
        return sql.append(")").toString();
    }

    /**
     * Fetches the next batch of entries, starting with the non-erroneous ones.
     *
     * @param transactionConnection connection to query
     * @return the entries to process; empty when nothing is pending
     * @throws SQLException if the query fails
     */
    private List<TransactionEntry> getNextEntries(TransactionConnection transactionConnection) throws SQLException {
        final boolean erroneousOnly = false;
        return getNextEntries(transactionConnection, erroneousOnly);
    }

    /**
     * Fetches pending entries with either the regular query or the erroneous-row query.
     *
     * <p>When the regular query ({@code z == false}) yields nothing, the erroneous-row
     * query is tried once as a fallback. The first statement and result set are closed
     * before the fallback runs, so two cursors are never open at the same time.
     *
     * @param transactionConnection connection to query
     * @param z                     {@code true} to select a single erroneous row,
     *                              {@code false} to select regular rows with fallback
     * @return the selected entries, possibly empty
     * @throws SQLException if a query fails
     */
    private List<TransactionEntry> getNextEntries(TransactionConnection transactionConnection, boolean z) throws SQLException {
        List<TransactionEntry> entries = new ArrayList<>();
        PreparedStatement statement = null;
        ResultSet rows = null;
        try {
            statement = transactionConnection.prepareStatement(z ? FETCH_EROROREUS_ROW_SQL : FETCH_ROWS_SQL);
            rows = statement.executeQuery();
            while (rows.next()) {
                entries.add(new TransactionEntry(rows.getInt(1), new Qname(rows.getString(2))));
            }
        } finally {
            Utils.close(statement, rows);
        }
        if (entries.isEmpty() && !z) {
            return getNextEntries(transactionConnection, true);
        }
        return entries;
    }

    /**
     * Loads the models of all documents referenced by the given entries, together with
     * the models reachable from them through {@code addDocumentTreeModels}.
     *
     * @param list                  transaction entries; duplicate document ids are queried once
     * @param transactionConnection connection to query
     * @return the loaded models (a view of the internal map's values); empty when {@code list} is empty
     * @throws SQLException if a query fails
     */
    private Collection<Model> getDocumentModels(List<TransactionEntry> list, TransactionConnection transactionConnection) throws SQLException {
        if (list.isEmpty()) {
            return Collections.emptyList();
        }
        // Distinct document ids in first-seen order.
        List<String> documentIds = new ArrayList<>();
        for (TransactionEntry entry : list) {
            String documentId = entry.documentQname.toString();
            boolean alreadyListed = documentIds.contains(documentId);
            if (!alreadyListed) {
                documentIds.add(documentId);
            }
        }
        Map<String, Model> modelsBySubject = new HashMap<>();
        addDocumentTreeModels(modelsBySubject, documentIds, transactionConnection);
        return modelsBySubject.values();
    }

    /**
     * Loads the models for the given subject ids into {@code map} and then recursively
     * does the same for their children, finally linking each child to its parent.
     *
     * <p>Ids are queried in batches of at most 500 placeholders per statement.
     * NOTE(review): the recursion ends only when a level has no children; cyclic
     * isPartOf data would presumably recurse without end - confirm the data rules this out.
     *
     * @param map                   receives subject id to model
     * @param list                  subject ids to load at this level
     * @param transactionConnection connection to query
     * @throws SQLException if a query fails
     */
    private void addDocumentTreeModels(Map<String, Model> map, List<String> list, TransactionConnection transactionConnection) throws SQLException {
        List<String> loadedSubjects = new ArrayList<>();
        for (List<String> batch : Lists.partition(list, 500)) {
            if (batch.isEmpty()) {
                continue;
            }
            reportStatus("Getting document model for " + batch.size() + " ids " + batch.get(0) + " ... ");
            StringBuilder sql = new StringBuilder(GET_MODELS_SQL_PRE);
            inClause(sql, batch);
            sql.append(GET_MODELS_SQL_POST);
            PreparedStatement statement = null;
            ResultSet rows = null;
            try {
                statement = transactionConnection.prepareStatement(sql.toString());
                rows = executeSelect(batch, statement);
                loadedSubjects.addAll(addModels(map, rows));
            } finally {
                Utils.close(statement, rows);
            }
        }
        if (loadedSubjects.isEmpty()) {
            return;
        }
        List<String> children = getChildren(loadedSubjects, transactionConnection);
        if (!children.isEmpty()) {
            // Load the whole subtree first, then replace isPartOf links with hasPart links on the parents.
            addDocumentTreeModels(map, children, transactionConnection);
            addModelLinks(map, children);
        }
    }

    /**
     * Applies {@code addModelLink} to every id in {@code list}.
     *
     * @param map  loaded models by subject id
     * @param list child subject ids to link to their parents
     */
    private void addModelLinks(Map<String, Model> map, List<String> list) {
        for (String childId : list) {
            addModelLink(map, childId);
        }
    }

    /**
     * Turns the child's first {@code MZ.isPartOf} statement into a {@code MZ.hasPart}
     * statement on the parent model: the statement is removed from the child and a
     * link pointing to the child is added to the parent.
     *
     * <p>Does nothing when the child has no isPartOf statement or the statement's
     * resource has no qname.
     *
     * @param map loaded models by subject id
     * @param str subject id of the child
     * @throws IllegalStateException if the child or its parent is not present in {@code map};
     *                               raised before any model is modified
     */
    private void addModelLink(Map<String, Model> map, String str) {
        Model childModel = map.get(str);
        if (childModel == null) {
            throw new IllegalStateException("No model loaded for child " + str);
        }
        Iterator<?> partOfStatements = childModel.getStatements(MZ_IS_PART_OF_PREDICATE.getQname()).iterator();
        if (!partOfStatements.hasNext()) {
            return;
        }
        // Only the first isPartOf statement is considered.
        Statement partOf = (Statement) partOfStatements.next();
        String parentId = partOf.getObjectResource().getQname();
        long statementId = partOf.getId();
        if (parentId == null) {
            return;
        }
        Model parentModel = map.get(parentId);
        if (parentModel == null) {
            throw new IllegalStateException("Parent " + parentId + " of child " + str + " is not among the loaded models");
        }
        childModel.removeStatement(statementId);
        parentModel.addStatement(new Statement(MZ_HAS_PART_PREDICATE, new ObjectResource(str)));
    }

    /**
     * Reads statement rows and groups them into one model per subject.
     *
     * <p>A new model is started whenever the subject in column 1 differs from the
     * previous row's subject, so rows of one subject must be adjacent (the query that
     * feeds this method orders by subject). Each new model is put into {@code map},
     * replacing any model already stored under that subject.
     *
     * @param map       receives subject id to model
     * @param resultSet rows positioned before the first row
     * @return subject ids in the order their models were created
     * @throws SQLException if reading the rows fails
     */
    private List<String> addModels(Map<String, Model> map, ResultSet resultSet) throws SQLException {
        List<String> subjects = new ArrayList<>();
        String currentSubject = "";
        Model currentModel = null;
        while (resultSet.next()) {
            String subject = resultSet.getString(1);
            boolean sameSubject = subject.equals(currentSubject);
            if (!sameSubject) {
                currentModel = new Model(new Qname(subject));
                currentSubject = subject;
                subjects.add(subject);
                map.put(subject, currentModel);
            }
            toModel(resultSet, currentModel);
        }
        return subjects;
    }

    /**
     * Finds the subjects that reference any of the given ids through the
     * {@code MZ.isPartOf} predicate.
     *
     * <p>Ids are queried in batches of at most 500 placeholders per statement; results
     * of all batches are concatenated.
     *
     * @param list                  parent subject ids
     * @param transactionConnection connection to query
     * @return child subject ids, possibly empty
     * @throws SQLException if a query fails
     */
    private List<String> getChildren(List<String> list, TransactionConnection transactionConnection) throws SQLException {
        List<String> children = new ArrayList<>();
        for (List<String> batch : Lists.partition(list, 500)) {
            if (batch.isEmpty()) {
                continue;
            }
            reportStatus("Getting children of " + batch.size() + " ids " + batch.get(0) + " ... ");
            StringBuilder sql = new StringBuilder(GET_CHILDREN_SQL_PRE);
            inClause(sql, batch);
            sql.append(GET_CHILDREN_SQL_POST);
            PreparedStatement statement = null;
            ResultSet rows = null;
            try {
                statement = transactionConnection.prepareStatement(sql.toString());
                rows = executeSelect(batch, statement);
                while (rows.next()) {
                    children.add(rows.getString(1));
                }
            } finally {
                Utils.close(statement, rows);
            }
        }
        return children;
    }

    /**
     * Appends one {@code ?} placeholder per list element, separated by commas
     * (for example {@code ?,?,?}); appends nothing for an empty list.
     *
     * @param sb   builder to append to
     * @param list values that will later be bound; only the count is used
     */
    private void inClause(StringBuilder sb, List<String> list) {
        int placeholders = list.size();
        for (int i = 0; i < placeholders; i++) {
            if (i > 0) {
                sb.append(",");
            }
            sb.append("?");
        }
    }

    /**
     * Binds the list values as string parameters 1..n and executes the query.
     *
     * @param list              values to bind, in placeholder order
     * @param preparedStatement statement with at least {@code list.size()} placeholders
     * @return the open result set; the caller is responsible for closing it
     * @throws SQLException if binding or executing fails
     */
    private ResultSet executeSelect(List<String> list, PreparedStatement preparedStatement) throws SQLException {
        int position = 0;
        for (String value : list) {
            position++; // JDBC parameters are 1-based
            preparedStatement.setString(position, value);
        }
        return preparedStatement.executeQuery();
    }

    /**
     * Converts the current row into a statement and adds it to {@code model}.
     *
     * <p>Columns are read by position: 2 predicate, 3 object resource, 4 literal value,
     * 5 language code, 6 context, 7 statement id (the layout of the models query).
     * A non-null literal wins over the resource; a row with neither is skipped.
     *
     * @param resultSet result set positioned on the row to convert
     * @param model     model that receives the statement
     * @throws SQLException if reading a column fails
     */
    public void toModel(ResultSet resultSet, Model model) throws SQLException {
        String predicateName = resultSet.getString(2);
        String resourceName = resultSet.getString(3);
        String literalValue = resultSet.getString(4);
        String langCode = resultSet.getString(5);
        String contextName = resultSet.getString(6);
        long statementId = resultSet.getLong(7);

        Context context = null;
        if (contextName != null) {
            context = new Context(contextName);
        }
        if (literalValue == null && resourceName == null) {
            return;
        }

        Statement statement;
        if (literalValue == null) {
            statement = new Statement(new Predicate(predicateName), new ObjectResource(resourceName), context);
        } else {
            statement = new Statement(new Predicate(predicateName), new ObjectLiteral(clean(literalValue), langCode), context);
        }
        statement.setId(statementId);
        model.addStatement(statement);
    }

    /**
     * Replaces every character rejected by {@code invalid} with a space and trims the
     * result.
     *
     * <p>Uses a {@link StringBuilder} so the cost is linear in the input length instead
     * of building a new string per character.
     *
     * @param str text to clean; must not be null
     * @return the cleaned, trimmed text
     */
    private String clean(String str) {
        StringBuilder cleaned = new StringBuilder(str.length());
        for (int i = 0; i < str.length(); i++) {
            char c = str.charAt(i);
            cleaned.append(invalid(c) ? ' ' : c);
        }
        return cleaned.toString().trim();
    }

    /**
     * Tells whether the character belongs to the Unicode general category
     * "control" ({@link Character#CONTROL}).
     *
     * @param c character to test
     * @return {@code true} for control characters
     */
    private boolean invalid(char c) {
        int category = Character.getType(c);
        return category == Character.CONTROL;
    }

    /**
     * Converts the models to a Jena model and serializes it with the
     * {@code RDFXML_ABBREV} format.
     *
     * @param collection models to serialize
     * @return the serialized RDF
     */
    private String generateRdf(Collection<Model> collection) {
        InternalModelToJenaModelConverter converter = new InternalModelToJenaModelConverter(collection);
        return JenaUtils.getSerialized(converter.getJenaModel(), RDFFormat.RDFXML_ABBREV);
    }
}
