package it.unimi.di.law.warc.processors;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.martiansoftware.jsap.FlaggedOption;
import com.martiansoftware.jsap.JSAP;
import com.martiansoftware.jsap.JSAPResult;
import com.martiansoftware.jsap.Parameter;
import com.martiansoftware.jsap.SimpleJSAP;
import com.martiansoftware.jsap.Switch;
import com.martiansoftware.jsap.UnflaggedOption;
import it.unimi.di.law.bubing.store.UnbufferedFileStore;
import it.unimi.di.law.warc.filters.Filter;
import it.unimi.di.law.warc.filters.parser.FilterParser;
import it.unimi.di.law.warc.io.CompressedWarcCachingReader;
import it.unimi.di.law.warc.io.WarcFormatException;
import it.unimi.di.law.warc.io.WarcReader;
import it.unimi.di.law.warc.records.WarcRecord;
import it.unimi.di.law.warc.util.ReorderingBlockingQueue;
import it.unimi.dsi.fastutil.io.FastBufferedInputStream;
import it.unimi.dsi.fastutil.io.FastBufferedOutputStream;
import it.unimi.dsi.fastutil.objects.ObjectArrayList;
import it.unimi.dsi.fastutil.objects.ObjectListIterator;
import it.unimi.dsi.lang.FlyweightPrototype;
import it.unimi.dsi.lang.ObjectParser;
import it.unimi.dsi.logging.ProgressLogger;
import java.io.Closeable;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PrintStream;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import net.htmlparser.jericho.HTMLElementName;
import org.apache.commons.lang.mutable.MutableBoolean;
import org.apache.commons.lang.mutable.MutableLong;
import org.apache.http.util.Args;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/bubing-0.9.11.jar:it/unimi/di/law/warc/processors/ParallelFilteredProcessorRunner.class */
/**
 * Applies a sequence of {@linkplain Processor processors} to the WARC records read from a store, handing every
 * non-{@code null} result to the {@linkplain Writer writer} paired with the processor.
 *
 * <p>Records can optionally be filtered. Processing can happen {@linkplain #runSequentially() sequentially} or
 * {@linkplain #run(int) in parallel}. In the parallel case, every processing thread works on its own copy of the steps
 * and of the filter. Results, tagged with their store position, are handed to a single flushing thread through a
 * {@link ReorderingBlockingQueue}, which is expected to release them in position order.
 */
public class ParallelFilteredProcessorRunner {
    private static final Logger LOGGER = LoggerFactory.getLogger(ParallelFilteredProcessorRunner.class);

    /** The processing steps, in the order in which they were {@linkplain #add added}. */
    private final ObjectArrayList<Step<?>> steps;
    /** The stream the store is read from. */
    private final InputStream in;
    /** The filter records must pass in order to be processed, or {@code null} to process every record. */
    private final Filter<WarcRecord> filter;
    /** Connects processing threads to the flushing thread; created anew by each call to {@link #run(int)}. */
    private ReorderingBlockingQueue<Result<?>[]> queue;
    /** Set by the flushing thread when writing fails, so that processing threads stop claiming new records. */
    protected volatile IOException flushingThreadException;

    /**
     * Takes result arrays from {@link #queue} and writes every non-{@code null} result, until
     * {@link Result#END_OF_RESULTS} is taken.
     *
     * <p>If a write fails, the failure is recorded in {@link #flushingThreadException}. The thread then keeps draining
     * (and discarding) the queue up to the end marker, so that processing threads blocked on {@code put()} can
     * terminate. Finally the failure is rethrown.
     */
    public final class FlushingThread implements Callable<Void> {
        private FlushingThread() {
        }

        @Override
        public Void call() throws Exception {
            final ProgressLogger pl = newProgressLogger();
            pl.start("Scanning...");
            Exception failure = null;
            for (;;) {
                final Result<?>[] results = queue.take();
                if (results == Result.END_OF_RESULTS) break;
                // After a failure we only drain, so that producers never block forever on a full queue.
                if (failure != null) continue;
                try {
                    for (final Result<?> result : results) {
                        if (result != null) result.write();
                    }
                } catch (final Exception e) {
                    LOGGER.error("Exception in flushing thread", e);
                    failure = e;
                    flushingThreadException = e instanceof IOException ? (IOException) e : new IOException(e);
                    continue;
                }
                pl.lightUpdate();
            }
            pl.done();
            if (failure != null) throw failure;
            return null;
        }
    }

    /**
     * Computes a result from a record.
     *
     * @param <T> the type of the results.
     */
    public interface Processor<T> extends Closeable, FlyweightPrototype<Processor<T>> {
        /**
         * Processes a record.
         *
         * @param warcRecord the record.
         * @param j the position of the record in the store.
         * @return the result, or {@code null} if there is nothing to write.
         */
        T process(WarcRecord warcRecord, long j);
    }

    /**
     * A processor result bundled with the writer and output stream it must be written to.
     *
     * @param <T> the type of the result.
     */
    public static final class Result<T> {
        /** Marker enqueued (by identity) after the last record to tell the flushing thread to stop. */
        public static final Result<?>[] END_OF_RESULTS = new Result[0];
        private final T result;
        private final Writer<? super T> writer;
        private final PrintStream out;
        private final long storePosition;

        private Result(final T result, final Writer<? super T> writer, final long storePosition, final PrintStream out) {
            this.result = result;
            this.writer = writer;
            this.storePosition = storePosition;
            this.out = out;
        }

        /** Writes this result by delegating to its writer. */
        public void write() throws IOException {
            writer.write(result, storePosition, out);
        }
    }

    /**
     * A processor paired with the writer (and output stream) its results go to.
     *
     * @param <T> the type of the processor results.
     */
    public static class Step<T> implements FlyweightPrototype<Step<T>> {
        private final Processor<T> processor;
        private final Writer<? super T> writer;
        private final PrintStream out;

        private Step(final Processor<T> processor, final Writer<? super T> writer, final PrintStream out) {
            this.processor = processor;
            this.writer = writer;
            this.out = out;
        }

        /**
         * Reads the record from the given reader and processes it.
         *
         * @param warcReader a reader returning the record to process.
         * @param storePosition the position of the record in the store.
         * @return the result to be written, or {@code null} if the processor returned {@code null}.
         */
        public Result<T> run(final WarcReader warcReader, final long storePosition) throws Exception {
            Args.notNull(warcReader, "reader");
            final T processed = processor.process(warcReader.read(), storePosition);
            if (processed == null) return null;
            return new Result<>(processed, writer, storePosition, out);
        }

        /** Returns a step with a copy of the processor, sharing the writer and the output stream. */
        @Override
        public Step<T> copy() {
            return new Step<>(processor.copy(), writer, out);
        }
    }

    /**
     * Writes processor results.
     *
     * @param <T> the type of the results.
     */
    public interface Writer<T> extends Closeable {
        /**
         * Writes a result.
         *
         * @param t the result.
         * @param j the position in the store of the record the result was computed from.
         * @param printStream the stream to write to.
         */
        void write(T t, long j, PrintStream printStream) throws IOException;
    }

    /**
     * Creates a runner.
     *
     * @param inputStream the stream the store is read from.
     * @param filter a filter records must pass in order to be processed, or {@code null}.
     */
    public ParallelFilteredProcessorRunner(final InputStream inputStream, final Filter<WarcRecord> filter) {
        this.steps = new ObjectArrayList<>();
        this.in = inputStream;
        this.filter = filter;
    }

    /**
     * Creates a runner processing every record.
     *
     * @param inputStream the stream the store is read from.
     */
    public ParallelFilteredProcessorRunner(final InputStream inputStream) {
        this(inputStream, null);
    }

    /** Returns a list containing a {@linkplain Step#copy() copy} of each of the given steps. */
    public static ObjectArrayList<Step<?>> copySteps(final ObjectArrayList<Step<?>> steps) {
        final ObjectArrayList<Step<?>> copy = new ObjectArrayList<>(steps.size());
        for (final Step<?> step : steps) copy.add(step.copy());
        return copy;
    }

    /**
     * Adds a processing step.
     *
     * @param processor the processor.
     * @param writer the writer for the processor results.
     * @param printStream the stream the writer writes to.
     * @return this runner.
     */
    public <T> ParallelFilteredProcessorRunner add(final Processor<T> processor, final Writer<? super T> writer, final PrintStream printStream) {
        steps.add(new Step<>(processor, writer, printStream));
        return this;
    }

    /** Returns a progress logger configured the same way for both sequential and parallel runs. */
    private static ProgressLogger newProgressLogger() {
        final ProgressLogger pl = new ProgressLogger(LOGGER, 1L, TimeUnit.MINUTES, "records");
        pl.itemsName = "records";
        pl.displayFreeMemory = true;
        pl.displayLocalSpeed = true;
        return pl;
    }

    /**
     * Closes the processor and the writer of every step, attempting all of them.
     *
     * @throws IOException the first exception thrown by a {@code close()}; later ones are attached as suppressed.
     */
    private void closeSteps() throws IOException {
        IOException first = null;
        for (final Step<?> step : steps) {
            for (final Closeable closeable : new Closeable[] { step.processor, step.writer }) {
                try {
                    closeable.close();
                } catch (final IOException e) {
                    if (first == null) first = e;
                    else first.addSuppressed(e);
                }
            }
        }
        if (first != null) throw first;
    }

    /** Rethrows unchecked throwables as they are, and wraps checked ones in a {@link RuntimeException}. */
    private static void propagate(final Throwable cause) {
        if (cause instanceof RuntimeException) throw (RuntimeException) cause;
        if (cause instanceof Error) throw (Error) cause;
        throw new RuntimeException(cause.getMessage(), cause);
    }

    /**
     * Processes the store in the current thread, then closes all processors and writers.
     *
     * <p>Processors and writers are closed even if processing fails.
     */
    public void runSequentially() throws WarcFormatException, Exception {
        try {
            scanSequentially();
        } catch (final Throwable t) {
            try {
                closeSteps();
            } catch (final IOException e) {
                t.addSuppressed(e);
            }
            throw t;
        }
        closeSteps();
    }

    /** Reads every record, and runs every step on the records passing the filter, writing results immediately. */
    private void scanSequentially() throws Exception {
        final CompressedWarcCachingReader reader = new CompressedWarcCachingReader(in);
        final ProgressLogger pl = newProgressLogger();
        pl.start("Scanning...");
        for (long position = 0;; position++) {
            final WarcReader cachedReader = reader.cache();
            if (cachedReader == null) break;
            // The record is materialized here only when a filter needs it; each step reads it again from the cache.
            if (filter == null || filter.apply(cachedReader.read())) {
                for (final Step<?> step : steps) {
                    final Result<?> result = step.run(cachedReader, position);
                    if (result != null) result.write();
                }
            }
            pl.lightUpdate();
        }
        pl.done();
    }

    /** Processes the store in parallel using as many threads as available processors. */
    public void run() throws WarcFormatException, IOException, InterruptedException {
        run(Runtime.getRuntime().availableProcessors());
    }

    /**
     * Processes the store in parallel, then closes all processors and writers.
     *
     * <p>If a processing thread or the flushing thread fails, the remaining records are not processed, processors
     * and writers are still closed, and the first failure is rethrown. Unchecked failures are rethrown as they are;
     * checked ones are wrapped in a {@link RuntimeException}.
     *
     * @param numberOfThreads the number of processing threads; must be positive.
     * @throws IllegalArgumentException if {@code numberOfThreads} is not positive.
     */
    public void run(final int numberOfThreads) throws WarcFormatException, IOException, InterruptedException {
        if (numberOfThreads <= 0) throw new IllegalArgumentException("The number of threads must be positive: " + numberOfThreads);
        queue = new ReorderingBlockingQueue<>(2 * numberOfThreads);
        flushingThreadException = null;

        final ExecutorService flushingExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setPriority(Thread.MAX_PRIORITY).setNameFormat("FlushingThread").build());
        final ExecutorService processingExecutor = Executors.newFixedThreadPool(numberOfThreads, new ThreadFactoryBuilder().setNameFormat("ProcessingThread-%d").build());
        try {
            final Future<Void> flushingFuture = flushingExecutor.submit(new FlushingThread());
            final ExecutorCompletionService<Void> completionService = new ExecutorCompletionService<>(processingExecutor);
            // Shared by all processing threads: the next store position to claim (also the lock) and the end flag.
            final MutableLong nextPosition = new MutableLong();
            final MutableBoolean end = new MutableBoolean();
            // Enqueued for filtered-out (or failed) records: one null result per step, so nothing gets written.
            final Result<?>[] emptyResult = new Result<?>[steps.size()];
            for (int t = numberOfThreads; t-- != 0;) completionService.submit(new ProcessingThread(nextPosition, end, emptyResult));

            Throwable failure = null;
            for (int t = numberOfThreads; t-- != 0;) {
                try {
                    completionService.take().get();
                } catch (final ExecutionException e) {
                    LOGGER.error("Unexpected exception in parallel thread", e.getCause());
                    if (failure == null) failure = e.getCause();
                }
            }

            // Every claimed position has been enqueued, so the end marker goes right after the last one.
            queue.put(Result.END_OF_RESULTS, nextPosition.longValue());
            try {
                flushingFuture.get();
            } catch (final ExecutionException e) {
                if (failure == null) failure = e.getCause();
            }

            try {
                closeSteps();
            } catch (final IOException e) {
                if (failure == null) throw e;
                failure.addSuppressed(e);
            }
            if (failure != null) propagate(failure);
        } finally {
            // On the normal path both executors are idle here; on an abnormal exit this interrupts blocked workers.
            processingExecutor.shutdownNow();
            flushingExecutor.shutdownNow();
        }
    }

    /**
     * A processing thread.
     *
     * <p>It claims records from the shared input under a common lock, runs its private copy of the steps on them,
     * and enqueues exactly one result array for every store position it claims.
     */
    private final class ProcessingThread implements Callable<Void> {
        private final ObjectArrayList<Step<?>> stepsCopy;
        private final CompressedWarcCachingReader reader;
        private final Filter<WarcRecord> filterCopy;
        private final MutableLong nextPosition;
        private final MutableBoolean end;
        private final Result<?>[] emptyResult;

        @SuppressWarnings("unchecked")
        private ProcessingThread(final MutableLong nextPosition, final MutableBoolean end, final Result<?>[] emptyResult) throws WarcFormatException, IOException {
            this.stepsCopy = copySteps(steps);
            this.reader = new CompressedWarcCachingReader(in);
            this.filterCopy = filter == null ? null : (Filter<WarcRecord>) filter.copy();
            this.nextPosition = nextPosition;
            this.end = end;
            this.emptyResult = emptyResult;
        }

        @Override
        public Void call() throws Exception {
            for (;;) {
                final WarcReader cachedReader;
                final long position;
                synchronized (nextPosition) {
                    if (end.booleanValue()) return null;
                    if (flushingThreadException != null) {
                        // Results can no longer be written: stop claiming records.
                        end.setValue(true);
                        return null;
                    }
                    WarcReader read = null;
                    try {
                        read = reader.cache();
                    } catch (final Exception e) {
                        // A read error is treated as the end of the store.
                        LOGGER.error("Exception while reading store", e);
                    }
                    if (read == null) {
                        end.setValue(true);
                        return null;
                    }
                    cachedReader = read;
                    position = nextPosition.longValue();
                    nextPosition.increment();
                }

                final Result<?>[] results;
                try {
                    results = process(cachedReader, position);
                } catch (final Throwable t) {
                    synchronized (nextPosition) {
                        end.setValue(true);
                    }
                    // Fill the claimed position, or the reordering queue would wait for it forever.
                    queue.put(emptyResult, position);
                    throw t;
                }
                queue.put(results, position);
            }
        }

        /** Runs every step on the record, or returns {@link #emptyResult} if the record does not pass the filter. */
        private Result<?>[] process(final WarcReader cachedReader, final long position) throws Exception {
            if (filterCopy != null && !filterCopy.apply(cachedReader.read())) return emptyResult;
            final Result<?>[] results = new Result<?>[stepsCopy.size()];
            for (int s = 0; s < results.length; s++) results[s] = stepsCopy.get(s).run(cachedReader, position);
            return results;
        }
    }

    /**
     * Command-line entry point: builds processors and writers from their specifications and processes a store (or
     * standard input).
     */
    public static void main(final String[] arg) throws Exception {
        final SimpleJSAP jsap = new SimpleJSAP(ParallelFilteredProcessorRunner.class.getName(), "Processes a store.",
                new Parameter[] {
                    new FlaggedOption("filter", JSAP.STRING_PARSER, JSAP.NO_DEFAULT, false, 'f', "filter", "A WarcRecord filter that records must pass in order to be processed."),
                    new FlaggedOption("processor", JSAP.STRING_PARSER, JSAP.NO_DEFAULT, true, 'p', "processor", "A processor to be applied to data.").setAllowMultipleDeclarations(true),
                    new FlaggedOption("writer", JSAP.STRING_PARSER, JSAP.NO_DEFAULT, true, 'w', "writer", "A writer to be applied to the results.").setAllowMultipleDeclarations(true),
                    new FlaggedOption(HTMLElementName.OUTPUT, JSAP.STRING_PARSER, JSAP.NO_DEFAULT, false, 'o', HTMLElementName.OUTPUT, "The output filename (- for stdout).").setAllowMultipleDeclarations(true),
                    new FlaggedOption("threads", JSAP.INTSIZE_PARSER, Integer.toString(Runtime.getRuntime().availableProcessors()), false, 'T', "threads", "The number of threads to be used."),
                    new Switch("sequential", 'S', "sequential"),
                    new UnflaggedOption(UnbufferedFileStore.STORE_NAME, JSAP.STRING_PARSER, false, "The name of the store (if omitted, stdin).")
                });
        final JSAPResult jsapResult = jsap.parse(arg);
        if (jsap.messagePrinted()) return;

        final String[] processorSpec = jsapResult.getStringArray("processor");
        final String[] writerSpec = jsapResult.getStringArray("writer");
        final String[] outputSpec = jsapResult.getStringArray(HTMLElementName.OUTPUT);
        if (processorSpec.length != writerSpec.length) throw new IllegalArgumentException("You must specify the same number of processors and writers");
        if (outputSpec.length != writerSpec.length) throw new IllegalArgumentException("You must specify the same number of output specifications and writers");

        final String filterSpec = jsapResult.getString("filter");
        final boolean fromStore = jsapResult.userSpecified(UnbufferedFileStore.STORE_NAME);
        final InputStream in = fromStore ? new FastBufferedInputStream(new FileInputStream(jsapResult.getString(UnbufferedFileStore.STORE_NAME))) : System.in;
        final PrintStream[] out = new PrintStream[processorSpec.length];
        try {
            final ParallelFilteredProcessorRunner runner = new ParallelFilteredProcessorRunner(in, filterSpec != null ? new FilterParser(WarcRecord.class).parse(filterSpec) : null);
            final String[] packages = { ParallelFilteredProcessorRunner.class.getPackage().getName() };
            for (int s = 0; s < processorSpec.length; s++) {
                out[s] = "-".equals(outputSpec[s]) ? System.out : new PrintStream(new FastBufferedOutputStream(new FileOutputStream(outputSpec[s])), false, "UTF-8");
                runner.add((Processor) ObjectParser.fromSpec(processorSpec[s], Processor.class, packages, new String[] { "getInstance" }), (Writer) ObjectParser.fromSpec(writerSpec[s], Writer.class, packages, new String[] { "getInstance" }), out[s]);
            }
            if (jsapResult.userSpecified("sequential")) runner.runSequentially();
            else runner.run(jsapResult.getInt("threads"));
        } finally {
            // Flush and close outputs (and the store, if we opened it) even when processing fails.
            for (final PrintStream stream : out) {
                if (stream != null) stream.close();
            }
            if (fromStore) in.close();
        }
    }
}
