package org.aktin.request.manager;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.activation.DataSource;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.aktin.Preferences;
import org.aktin.broker.client.BrokerClient;
import org.aktin.broker.client.auth.HttpApiKeyAuth;
import org.aktin.broker.request.RequestStatus;
import org.aktin.broker.request.RetrievedRequest;
import org.aktin.dwh.PreferenceKey;

/**
 * Consumer of {@link RetrievedRequest}s which uploads each request's result
 * data through a {@link BrokerClient}.
 *
 * <p>Every accepted request is wrapped in an {@link Execution} task and handed
 * to the configured {@link Executor}. When no broker endpoint is configured,
 * accepted requests are only logged and nothing is uploaded.
 */
@Singleton
public class ResultUploader implements Consumer<RetrievedRequest> {
    private static final Logger log = Logger.getLogger(ResultUploader.class.getName());

    /** Broker client used for uploads; {@code null} when no broker endpoint is configured. */
    private BrokerClient client;

    /** Executor that runs the upload tasks; supplied via {@link #setExecutor(Executor)}. */
    private Executor executor;

    /** Size in bytes of the copy buffer allocated for each upload (16 MiB). */
    private int uploadBufferSize = 16 * 1024 * 1024;

    @Inject
    private Preferences prefs;

    /**
     * Upload task for a single request. It acts both as the {@link Runnable}
     * submitted to the executor and as the {@link BrokerClient.OutputWriter}
     * that streams the result data to the broker.
     */
    public class Execution implements Runnable, BrokerClient.OutputWriter {
        private final RetrievedRequest request;

        /** Result data of the request; assigned in {@link #run()} before the upload starts. */
        private DataSource result;

        Execution(RetrievedRequest retrievedRequest) {
            this.request = retrievedRequest;
        }

        /**
         * Changes the status of the request. An {@link IOException} raised by
         * the status change is logged and not propagated.
         *
         * @param requestStatus status to set
         * @param str description passed along with the status, may be {@code null}
         */
        private void changeStatus(RequestStatus requestStatus, String str) {
            try {
                // NOTE(review): the meaning of the first argument (always null here)
                // is not visible in this file - confirm against RetrievedRequest.
                this.request.changeStatus(null, requestStatus, str);
            } catch (IOException e) {
                log.log(Level.SEVERE, "Unable to change status for request " + this.request.getRequestId() + " to " + requestStatus, e);
            }
        }

        /**
         * Fetches the result data of the request, uploads it via the broker
         * client and marks the request as {@code Submitted}. Any failure is
         * logged and the request is marked as {@code Failed} instead.
         */
        @Override
        public void run() {
            try {
                this.result = this.request.getResultData();
                ResultUploader.this.client.putRequestResult(this.request.getRequestId(), this.result.getContentType(), this);
                changeStatus(RequestStatus.Submitted, null);
            } catch (Throwable th) {
                // Broad catch on purpose: this is the task boundary, so every
                // failure has to end up in the log and in the request status.
                log.log(Level.SEVERE, "Query execution failed for request " + this.request.getRequestId(), th);
                changeStatus(RequestStatus.Failed, th.toString());
            }
        }

        /**
         * Copies the result data to the given stream.
         *
         * @param outputStream destination for the result data; it is not closed here
         * @throws IOException if reading the result data or writing to the stream fails
         */
        @Override
        public void write(OutputStream outputStream) throws IOException {
            // try-with-resources closes the input exactly once, also when the copy fails
            try (InputStream in = this.result.getInputStream()) {
                byte[] buffer = new byte[ResultUploader.this.uploadBufferSize];
                int count;
                while ((count = in.read(buffer)) != -1) {
                    outputStream.write(buffer, 0, count);
                }
            }
        }
    }

    /**
     * Sets the executor used to run upload tasks.
     *
     * @param executor executor on which {@link Execution} tasks are run
     */
    @Resource(lookup = "java:comp/DefaultManagedExecutorService")
    public void setExecutor(Executor executor) {
        this.executor = executor;
    }

    /**
     * Creates the broker client from the preferences. If the broker endpoint
     * URI preference is missing or blank, no client is created and a warning
     * is logged; otherwise the client authenticates with the configured key
     * as a bearer API key.
     */
    @PostConstruct
    private void initializeAggregatorClient() {
        String endpoint = this.prefs.get(PreferenceKey.brokerEndpointURI);
        if (endpoint == null || endpoint.trim().isEmpty()) {
            this.client = null;
            log.warning("No broker configured, there will be no aggregator uploads");
            return;
        }
        this.client = new BrokerClient(URI.create(endpoint));
        this.client.setClientAuthenticator(HttpApiKeyAuth.newBearer(this.prefs.get(PreferenceKey.brokerEndpointKeys)));
    }

    /**
     * Schedules the upload of the given request's result data. Without a
     * configured broker client the request is only logged.
     *
     * @param retrievedRequest request whose result data should be uploaded
     */
    @Override
    public void accept(RetrievedRequest retrievedRequest) {
        if (this.client == null) {
            log.warning("Unable to upload data for request " + retrievedRequest.getRequestId() + "- no aggregator specified");
            return;
        }
        this.executor.execute(new Execution(retrievedRequest));
    }
}
