package it.unimi.di.law.bubing.frontier;

import gate.mimir.web.Index;
import it.unimi.di.law.bubing.Agent;
import it.unimi.di.law.bubing.RuntimeConfiguration;
import it.unimi.di.law.bubing.sieve.AbstractSieve;
import it.unimi.di.law.bubing.sieve.ByteArrayListByteSerializerDeserializer;
import it.unimi.di.law.bubing.sieve.ByteSerializerDeserializer;
import it.unimi.di.law.bubing.sieve.IdentitySieve;
import it.unimi.di.law.bubing.sieve.MercatorSieve;
import it.unimi.di.law.bubing.store.Store;
import it.unimi.di.law.bubing.util.BURL;
import it.unimi.di.law.bubing.util.BubingJob;
import it.unimi.di.law.bubing.util.ByteArrayDiskQueue;
import it.unimi.di.law.bubing.util.ConcurrentCountingMap;
import it.unimi.di.law.bubing.util.FastApproximateByteArrayCache;
import it.unimi.di.law.bubing.util.FetchData;
import it.unimi.di.law.bubing.util.LockFreeQueue;
import it.unimi.di.law.bubing.util.MurmurHash3;
import it.unimi.di.law.bubing.util.Util;
import it.unimi.di.law.warc.io.ParallelBufferedWarcWriter;
import it.unimi.dsi.fastutil.bytes.ByteArrayList;
import it.unimi.dsi.fastutil.io.BinIO;
import it.unimi.dsi.fastutil.io.FastBufferedInputStream;
import it.unimi.dsi.fastutil.io.FastBufferedOutputStream;
import it.unimi.dsi.fastutil.objects.ObjectArrayList;
import it.unimi.dsi.fastutil.objects.ObjectListIterator;
import it.unimi.dsi.jai4j.JobListener;
import it.unimi.dsi.jai4j.NoSuchJobManagerException;
import it.unimi.dsi.stat.SummaryStats;
import it.unimi.dsi.sux4j.mph.AbstractHashFunction;
import it.unimi.dsi.util.BloomFilter;
import it.unimi.dsi.util.Properties;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.security.NoSuchAlgorithmException;
import java.util.Date;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongArray;
import net.htmlparser.jericho.Config;
import net.htmlparser.jericho.LoggerProvider;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.http.HttpHost;
import org.apache.http.client.config.RequestConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xbill.DNS.Lookup;
import org.xbill.DNS.TTL;

/* loaded from: input_file:WEB-INF/lib/bubing-0.9.11.jar:it/unimi/di/law/bubing/frontier/Frontier.class */
public class Frontier implements JobListener<BubingJob>, AbstractSieve.NewFlowReceiver<ByteArrayList> {
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) Frontier.class);
    /** Buffer size handed to ByteArrayDiskQueue.createNew for {@link #readyURLs} (same value the constructor uses; unit presumably bytes — TODO confirm). */
    public static final int READY_URLS_BUFFER_SIZE = 67108864;
    /** Assigned in this class's static initializer (not visible here). */
    public static final InetAddress[] LOOPBACK;
    /** Name of the file, inside {@code rc.storeDir}, to which the constructor opens the robots.txt WARC writer. */
    private static final String ROBOTS_STORE = "robots.warc.gz";
    /** Milliseconds added to the current time to set {@link #nextFlush} whenever the sieve accepts a URL. */
    public static final long MIN_FLUSH_INTERVAL = 10000;
    /** Initial value of, and growth step for, {@link #requiredFrontSize}. */
    public static final long FRONT_INCREASE = 1000;
    /** Assigned in this class's static initializer (not visible here). */
    public static final AbstractHashFunction<byte[]> BYTE_ARRAY_HASHING_STRATEGY;
    /** Hash function passed to the sieve; assigned in this class's static initializer (not visible here). */
    public static final AbstractHashFunction<ByteArrayList> BYTE_ARRAY_LIST_HASHING_STRATEGY;
    /** Store handed to parsing threads and closed by close(). */
    protected final Store store;
    /** Agent deciding whether a URL is local (agent.local) and receiving non-local submissions. */
    protected final Agent agent;
    public final RuntimeConfiguration rc;
    /** IdentitySieve when rc.sieveSize == 0, MercatorSieve otherwise; this frontier is its NewFlowReceiver. */
    public AbstractSieve<ByteArrayList, Void> sieve;
    /** Disk queue filled by append() with URLs coming out of the sieve. */
    public ByteArrayDiskQueue readyURLs;
    /** Bounded (capacity 1024) queue filled by receive() with URLs sent by other agents. */
    public ArrayBlockingQueue<ByteArrayList> quickReceivedURLs;
    /** Disk queue stored under {@code rc.frontierDir/received}. */
    public ByteArrayDiskQueue receivedURLs;
    public final ObjectArrayList<ParsingThread> parsingThreads;
    /** WARC writer on {@link #ROBOTS_STORE}; closed by close(). */
    public final ParallelBufferedWarcWriter robotsWarcParallelOutputStream;
    // The following counters are persisted by snap() and reloaded by restore().
    public final AtomicLong pathQueriesInQueues;
    /** Compared against rc.workbenchMaxByteSize by workbenchIsFull(). */
    public final AtomicLong weightOfpathQueriesInQueues;
    public final AtomicLong brokenVisitStates;
    public final AtomicLong numberOfReceivedURLs;
    /** Set to now + MIN_FLUSH_INTERVAL whenever the sieve accepts a URL; read outside this view. */
    protected volatile long nextFlush;
    public final Workbench workbench;
    public final DelayQueue<VisitState> unknownHosts;
    /** restore() adds here the visit states that had no workbench entry when snapped. */
    public final LinkedBlockingQueue<VisitState> newVisitStates;
    /** Bloom filter sized on rc.maxUrls for a new crawl, reloaded by restore() otherwise. */
    public BloomFilter<Void> digests;
    protected final ObjectArrayList<DNSThread> dnsThreads;
    private final ObjectArrayList<FetchingThread> fetchingThreads;
    protected final Distributor distributor;
    /** enqueue()/enqueueLocal() proceed only when add() on this cache returns true. */
    public final FastApproximateByteArrayCache urlCache;
    protected final WorkbenchVirtualizer virtualizer;
    // Visit-state hand-off queues; close() drains todo and done, releasing their states to the workbench.
    public final LockFreeQueue<VisitState> todo;
    public final LockFreeQueue<VisitState> done;
    /** Visit states whose path+queries must be reloaded from the virtualizer. */
    protected final LockFreeQueue<VisitState> refill;
    /** Grown by FRONT_INCREASE steps in updateRequestedFrontSize(), capped at workbenchSizeInPathQueries / 2. */
    public final AtomicLong requiredFrontSize;
    // Updated by updateFetchingThreadsWaitingStats() and cleared by resetFetchingThreadsWaitingStats().
    public final AtomicLong fetchingThreadWaits;
    public final AtomicLong fetchingThreadWaitingTimeSum;
    /** close() waits for parsing threads to drain this queue. */
    public final LockFreeQueue<FetchData> results;
    private TodoThread todoThread;
    private DoneThread doneThread;
    /** Six counters: index 0 = other statuses, indices 1..5 = 1xx..5xx (see the ARCHETYPES* property keys). */
    public final AtomicLong[] archetypesStatus;
    public final SummaryStats outdegree;
    public final SummaryStats externalOutdegree;
    public final SummaryStats contentLength;
    public final AtomicLong contentTypeText;
    public final AtomicLong contentTypeImage;
    public final AtomicLong contentTypeApplication;
    public final AtomicLong contentTypeOthers;
    public final AtomicLong duplicates;
    public final AtomicLong fetchedResources;
    public final AtomicLong fetchedRobots;
    public final AtomicLong transferredBytes;
    /** Per scheme+authority counts compared against rc.maxUrlsPerSchemeAuthority; replaced by restore(). */
    public ConcurrentCountingMap schemeAuthority2Count = new ConcurrentCountingMap();
    /** Forty slots, allocated in the constructor; updated outside this view. */
    public final AtomicLongArray speedDist;
    /** Initialized to 1 / rc.schemeAuthorityDelay; persisted by snap(). */
    protected double averageSpeed;
    /** Initialized to rc.workbenchMaxByteSize / 100; half of it caps requiredFrontSize. */
    public volatile long workbenchSizeInPathQueries;
    /** Request configuration with redirects disabled. */
    public final RequestConfig defaultRequestConfig;
    /** Request configuration following up to five redirects, used for robots.txt. */
    public final RequestConfig robotsRequestConfig;
    /** Lock on schemeAuthority2Count held between prepareToAppend() and finishedAppending(). */
    private ConcurrentCountingMap.LockedMap lockedMap;

    /* loaded from: input_file:WEB-INF/lib/bubing-0.9.11.jar:it/unimi/di/law/bubing/frontier/Frontier$PropertyKeys.class */
    /**
     * Keys of the scalar properties that snap() writes to {@code frontier.data} and restore() reads back.
     *
     * <p>Keys are presumably derived from the constant names, so renaming a constant would break reading
     * existing snaps — TODO confirm against {@code it.unimi.dsi.util.Properties}. Some constants (e.g.
     * CURRENTQUEUE, VIRTUALQUEUESIZES, DISTRIBUTORWARMUP) are not written by the visible snap(); their use,
     * if any, is outside this view.
     */
    public enum PropertyKeys {
        PATHQUERIESINQUEUES,
        WEIGHTOFPATHQUERIESINQUEUES,
        BROKENVISITSTATES,
        NUMBEROFRECEIVEDURLS,
        REQUIREDFRONTSIZE,
        FETCHINGTHREADWAITS,
        FETCHINGTHREADWAITINGTIMESUM,
        // ARCHETYPES* map to archetypesStatus[0..5]: "others", then 1xx..5xx.
        ARCHETYPESOTHERS,
        ARCHETYPES1XX,
        ARCHETYPES2XX,
        ARCHETYPES3XX,
        ARCHETYPES4XX,
        ARCHETYPES5XX,
        DUPLICATES,
        FETCHEDRESOURCES,
        FETCHEDROBOTS,
        TRANSFERREDBYTES,
        AVERAGESPEED,
        CURRENTQUEUE,
        VIRTUALQUEUESIZES,
        VIRTUALQUEUESBIRTHTIME,
        READYURLSSIZE,
        RECEIVEDURLSSIZE,
        DISTRIBUTORWARMUP,
        DISTRIBUTORVISITSTATESONDISK,
        // Wall-clock time (System.currentTimeMillis()) at which the snap started.
        EPOCH,
        CRAWLDURATION,
        VISITSTATESETSIZE,
        WORKBENCHENTRYSETSIZE
    }

    public Frontier(RuntimeConfiguration runtimeConfiguration, Store store, Agent agent) throws IOException, IllegalArgumentException, ConfigurationException, ClassNotFoundException, InterruptedException {
        this.rc = runtimeConfiguration;
        this.workbenchSizeInPathQueries = runtimeConfiguration.workbenchMaxByteSize / 100;
        this.averageSpeed = 1.0d / runtimeConfiguration.schemeAuthorityDelay;
        File file = new File(runtimeConfiguration.storeDir, ROBOTS_STORE);
        LOGGER.info("Opening file " + file + " to write robots.txt");
        this.robotsWarcParallelOutputStream = new ParallelBufferedWarcWriter(new FastBufferedOutputStream(new FileOutputStream(file, !runtimeConfiguration.crawlIsNew)), true);
        this.urlCache = new FastApproximateByteArrayCache(runtimeConfiguration.urlCacheMaxByteSize);
        this.store = store;
        if (runtimeConfiguration.sieveSize == 0) {
            this.sieve = new IdentitySieve(this, new ByteArrayListByteSerializerDeserializer(), ByteSerializerDeserializer.VOID, BYTE_ARRAY_LIST_HASHING_STRATEGY, null);
        } else {
            this.sieve = new MercatorSieve(runtimeConfiguration.crawlIsNew, runtimeConfiguration.sieveDir, runtimeConfiguration.sieveSize, runtimeConfiguration.sieveStoreIOBufferByteSize, runtimeConfiguration.sieveAuxFileIOBufferByteSize, this, new ByteArrayListByteSerializerDeserializer(), ByteSerializerDeserializer.VOID, BYTE_ARRAY_LIST_HASHING_STRATEGY, null);
        }
        this.agent = agent;
        this.workbench = new Workbench();
        this.unknownHosts = new DelayQueue<>();
        this.virtualizer = new WorkbenchVirtualizer(this);
        this.pathQueriesInQueues = new AtomicLong();
        this.weightOfpathQueriesInQueues = new AtomicLong();
        this.brokenVisitStates = new AtomicLong();
        this.fetchedResources = new AtomicLong();
        this.fetchedRobots = new AtomicLong();
        this.transferredBytes = new AtomicLong();
        this.speedDist = new AtomicLongArray(40);
        this.archetypesStatus = new AtomicLong[6];
        for (int i = 0; i < 6; i++) {
            this.archetypesStatus[i] = new AtomicLong();
        }
        this.outdegree = new SummaryStats();
        this.externalOutdegree = new SummaryStats();
        this.contentLength = new SummaryStats();
        this.contentTypeText = new AtomicLong();
        this.contentTypeImage = new AtomicLong();
        this.contentTypeApplication = new AtomicLong();
        this.contentTypeOthers = new AtomicLong();
        this.duplicates = new AtomicLong();
        this.numberOfReceivedURLs = new AtomicLong();
        this.requiredFrontSize = new AtomicLong(1000L);
        this.fetchingThreadWaits = new AtomicLong();
        this.fetchingThreadWaitingTimeSum = new AtomicLong();
        this.defaultRequestConfig = RequestConfig.custom().setSocketTimeout(runtimeConfiguration.socketTimeout).setConnectTimeout(runtimeConfiguration.connectionTimeout).setConnectionRequestTimeout(runtimeConfiguration.connectionTimeout).setCookieSpec(runtimeConfiguration.cookiePolicy).setRedirectsEnabled(false).setProxy(runtimeConfiguration.proxyHost.length() > 0 ? new HttpHost(runtimeConfiguration.proxyHost, runtimeConfiguration.proxyPort) : null).build();
        this.robotsRequestConfig = RequestConfig.custom().setSocketTimeout(runtimeConfiguration.socketTimeout).setConnectTimeout(runtimeConfiguration.connectionTimeout).setConnectionRequestTimeout(runtimeConfiguration.connectionTimeout).setCookieSpec(runtimeConfiguration.cookiePolicy).setRedirectsEnabled(true).setMaxRedirects(5).setProxy(runtimeConfiguration.proxyHost.length() > 0 ? new HttpHost(runtimeConfiguration.proxyHost, runtimeConfiguration.proxyPort) : null).build();
        this.dnsThreads = new ObjectArrayList<>();
        this.fetchingThreads = new ObjectArrayList<>();
        this.parsingThreads = new ObjectArrayList<>();
        this.newVisitStates = new LinkedBlockingQueue<>();
        this.todo = new LockFreeQueue<>();
        this.done = new LockFreeQueue<>();
        this.refill = new LockFreeQueue<>();
        this.results = new LockFreeQueue<>();
        this.distributor = new Distributor(this);
        Config.LoggerProvider = LoggerProvider.SLF4J;
        this.quickReceivedURLs = new ArrayBlockingQueue<>(1024);
        if (runtimeConfiguration.crawlIsNew) {
            this.digests = BloomFilter.create(Math.max(1L, runtimeConfiguration.maxUrls), runtimeConfiguration.bloomFilterPrecision);
            this.readyURLs = ByteArrayDiskQueue.createNew(new File(runtimeConfiguration.frontierDir, Index.READY), 67108864, true);
            this.receivedURLs = ByteArrayDiskQueue.createNew(new File(runtimeConfiguration.frontierDir, "received"), 16384, true);
            this.distributor.statsThread.start(0L);
        } else {
            restore();
        }
        this.distributor.start();
        TodoThread todoThread = new TodoThread(this);
        this.todoThread = todoThread;
        todoThread.start();
        DoneThread doneThread = new DoneThread(this);
        this.doneThread = doneThread;
        doneThread.start();
        Lookup.getDefaultCache(1).setMaxEntries(runtimeConfiguration.dnsCacheMaxSize);
        Lookup.getDefaultCache(1).setMaxCache((int) Math.min(runtimeConfiguration.dnsPositiveTtl, TTL.MAX_VALUE));
        Lookup.getDefaultCache(1).setMaxNCache((int) Math.min(runtimeConfiguration.dnsNegativeTtl, TTL.MAX_VALUE));
        Lookup.getDefaultResolver().setTimeout(60);
    }

    /**
     * Sets the number of DNS threads.
     *
     * <p>When shrinking, the surplus threads get their {@code stop} flag set and are dropped from the list
     * without being joined (nothing is logged). When growing, the missing threads are created with consecutive
     * indices, started and appended, and the new count is logged.
     *
     * @param i the desired number of threads; must be positive.
     * @throws IllegalArgumentException if {@code i} is not positive.
     */
    public void dnsThreads(int i) throws IllegalArgumentException {
        if (i <= 0) {
            throw new IllegalArgumentException();
        }
        synchronized (dnsThreads) {
            final int current = dnsThreads.size();
            if (i < current) {
                for (int index = i; index < current; index++) {
                    dnsThreads.get(index).stop = true;
                }
                dnsThreads.size(i);
                return;
            }
            for (int missing = i - current; missing > 0; missing--) {
                final DNSThread thread = new DNSThread(this, dnsThreads.size());
                thread.start();
                dnsThreads.add(thread);
            }
            LOGGER.info("Number of DNS Threads set to " + i);
        }
    }

    /**
     * Sets the number of fetching threads.
     *
     * <p>When shrinking, the surplus threads get their {@code stop} flag set and are dropped from the list
     * without being joined (nothing is logged). When growing, the missing threads are created with consecutive
     * indices, started and appended, and the new count is logged.
     *
     * @param i the desired number of threads; must be positive.
     * @throws IllegalArgumentException if {@code i} is not positive.
     * @throws NoSuchAlgorithmException propagated from the FetchingThread constructor.
     * @throws IOException propagated from the FetchingThread constructor.
     */
    public void fetchingThreads(int i) throws IllegalArgumentException, NoSuchAlgorithmException, IOException {
        if (i <= 0) {
            throw new IllegalArgumentException();
        }
        synchronized (fetchingThreads) {
            final int current = fetchingThreads.size();
            if (i < current) {
                for (int index = i; index < current; index++) {
                    fetchingThreads.get(index).stop = true;
                }
                fetchingThreads.size(i);
                return;
            }
            for (int missing = i - current; missing > 0; missing--) {
                final FetchingThread thread = new FetchingThread(this, fetchingThreads.size());
                thread.start();
                fetchingThreads.add(thread);
            }
            LOGGER.info("Number of Fetching Threads set to " + i);
        }
    }

    /**
     * Sets the number of parsing threads.
     *
     * <p>When shrinking, the surplus threads get their {@code stop} flag set and are dropped from the list
     * without being joined (nothing is logged). When growing, the missing threads are created on
     * {@link #store} with consecutive indices, started and appended, and the new count is logged.
     *
     * @param i the desired number of threads; must be positive.
     * @throws IllegalArgumentException if {@code i} is not positive.
     */
    public void parsingThreads(int i) throws IllegalArgumentException {
        if (i <= 0) {
            throw new IllegalArgumentException();
        }
        synchronized (parsingThreads) {
            final int current = parsingThreads.size();
            if (i < current) {
                for (int index = i; index < current; index++) {
                    parsingThreads.get(index).stop = true;
                }
                parsingThreads.size(i);
                return;
            }
            for (int missing = i - current; missing > 0; missing--) {
                final ParsingThread thread = new ParsingThread(this, store, parsingThreads.size());
                thread.start();
                parsingThreads.add(thread);
            }
            LOGGER.info("Number of Parsing Threads set to " + i);
        }
    }

    /**
     * Enqueues a normalized URL, routing it to the local sieve or to the agent responsible for it.
     *
     * <p>The URL is dropped if its scheme+authority already reached {@code rc.maxUrlsPerSchemeAuthority}, or if
     * {@link #urlCache} rejects it ({@code add} returns {@code false}). Otherwise a {@link BubingJob} is built.
     * If the agent says the job is local, the URL goes into the sieve, and {@link #nextFlush} is postponed by
     * {@link #MIN_FLUSH_INTERVAL} when the sieve accepts it. Non-local jobs are submitted to the agent; submission
     * failures are logged and otherwise ignored.
     *
     * @param byteArrayList the URL, as a normalized byte array.
     */
    public void enqueue(ByteArrayList byteArrayList) throws IOException, InterruptedException {
        final byte[] elements = byteArrayList.elements();
        // Short-circuit: the cache is consulted only for URLs whose scheme+authority is still under quota.
        if (schemeAuthority2Count.get(elements, 0, BURL.startOfpathAndQuery(elements)) >= rc.maxUrlsPerSchemeAuthority || !urlCache.add(byteArrayList)) {
            return;
        }
        final BubingJob bubingJob = new BubingJob(byteArrayList);
        if (agent.local(bubingJob)) {
            if (sieve.enqueue(byteArrayList, null)) {
                nextFlush = System.currentTimeMillis() + MIN_FLUSH_INTERVAL;
            }
            return;
        }
        try {
            if (LOGGER.isTraceEnabled()) {
                LOGGER.trace("Sending out scheme+authority {} with path+query {}", Util.toString(BURL.schemeAndAuthorityAsByteArray(elements)), Util.toString(BURL.pathAndQueryAsByteArray(byteArrayList)));
            }
            // The job is submitted as-is; it is not an Agent and must not be cast to one.
            agent.submit(bubingJob);
        } catch (NoSuchJobManagerException e) {
            LOGGER.warn("Impossible to submit URL " + BURL.fromNormalizedByteArray(byteArrayList.toByteArray()), e);
        } catch (IllegalStateException e) {
            LOGGER.warn("Impossible to submit URL " + BURL.fromNormalizedByteArray(byteArrayList.toByteArray()), e);
        }
    }

    /**
     * Tells whether the total weight of the path+queries held in queues has reached {@code rc.workbenchMaxByteSize}.
     *
     * @return {@code true} if {@link #weightOfpathQueriesInQueues} is at least {@code rc.workbenchMaxByteSize}.
     */
    public boolean workbenchIsFull() {
        final long currentWeight = weightOfpathQueriesInQueues.get();
        final boolean full = currentWeight >= rc.workbenchMaxByteSize;
        return full;
    }

    /**
     * Enqueues a normalized URL directly into the local sieve, without asking the agent whether it is local.
     *
     * <p>The URL is dropped if its scheme+authority already reached {@code rc.maxUrlsPerSchemeAuthority}, or if
     * {@link #urlCache} rejects it. When the sieve accepts it, {@link #nextFlush} is postponed by
     * {@link #MIN_FLUSH_INTERVAL} (the same rule {@code enqueue} applies to local URLs).
     *
     * @param byteArrayList the URL, as a normalized byte array.
     */
    public void enqueueLocal(ByteArrayList byteArrayList) throws IOException, InterruptedException {
        final byte[] elements = byteArrayList.elements();
        // Each check runs only if the previous one passed.
        if (schemeAuthority2Count.get(elements, 0, BURL.startOfpathAndQuery(elements)) < rc.maxUrlsPerSchemeAuthority
                && urlCache.add(byteArrayList)
                && sieve.enqueue(byteArrayList, null)) {
            nextFlush = System.currentTimeMillis() + MIN_FLUSH_INTERVAL;
        }
    }

    /**
     * Receives a job from another agent and puts its URL in {@link #quickReceivedURLs}.
     *
     * <p>{@code put} blocks while the bounded queue is full. Failures are logged and the URL is dropped. If the
     * wait is interrupted, the thread's interrupt status is restored so that the caller can observe it.
     *
     * @param bubingJob the received job.
     */
    @Override // it.unimi.dsi.jai4j.JobListener
    public void receive(BubingJob bubingJob) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Receiving job {}", bubingJob.url);
        }
        try {
            quickReceivedURLs.put(bubingJob.url);
        } catch (InterruptedException e) {
            // Do not swallow the interruption: re-assert it for the calling thread.
            Thread.currentThread().interrupt();
            LOGGER.error("Error while enqueueing " + bubingJob.url, e);
        } catch (RuntimeException e) {
            LOGGER.error("Error while enqueueing " + bubingJob.url, e);
        }
    }

    /**
     * Shuts the frontier down.
     *
     * <p>Steps, in order:
     * <ol>
     * <li>interrupt the todo thread, then join the distributor and the todo thread;</li>
     * <li>stop and join the DNS threads;</li>
     * <li>stop the fetching threads (escalating to abort() and then interrupt() if they do not exit) and join them;</li>
     * <li>stop and join the done thread;</li>
     * <li>wait until {@link #results} is drained or no parsing thread is alive, then stop and join the parsing threads;</li>
     * <li>close the robots WARC writer, the store and the fetching threads;</li>
     * <li>release the visit states left in {@link #todo} and {@link #done} to the workbench, refill the empty ones
     * from the virtualizer, close the sieve and signal the statistics thread.</li>
     * </ol>
     */
    public void close() throws IOException, InterruptedException {
        // z / z2: whether any fetching thread was still alive at the end of the first / second wait phase.
        boolean z;
        boolean z2;
        this.todoThread.interrupt();
        this.distributor.join();
        LOGGER.info("Joined distributor");
        this.todoThread.join();
        LOGGER.info("Joined todo thread");
        ObjectListIterator<DNSThread> it2 = this.dnsThreads.iterator();
        while (it2.hasNext()) {
            it2.next().stop = true;
        }
        ObjectListIterator<DNSThread> it3 = this.dnsThreads.iterator();
        while (it3.hasNext()) {
            it3.next().join();
        }
        LOGGER.info("Joined DNS threads");
        ObjectListIterator<FetchingThread> it4 = this.fetchingThreads.iterator();
        while (it4.hasNext()) {
            it4.next().stop = true;
        }
        // Phase 1: poll once per second, for up to 2 * rc.socketTimeout ms, until no fetching thread is alive.
        long currentTimeMillis = System.currentTimeMillis();
        do {
            Thread.sleep(1000L);
            z = false;
            ObjectListIterator<FetchingThread> it5 = this.fetchingThreads.iterator();
            while (it5.hasNext()) {
                z |= it5.next().isAlive();
            }
            if (!z) {
                break;
            }
        } while (System.currentTimeMillis() - currentTimeMillis < this.rc.socketTimeout * 2);
        // Some threads survived the stop flag: ask every fetching thread to abort.
        if (z) {
            ObjectListIterator<FetchingThread> it6 = this.fetchingThreads.iterator();
            while (it6.hasNext()) {
                it6.next().abort();
            }
        }
        // Phase 2: the same bounded wait again (it runs even if phase 1 already saw all threads dead).
        long currentTimeMillis2 = System.currentTimeMillis();
        do {
            Thread.sleep(1000L);
            z2 = false;
            ObjectListIterator<FetchingThread> it7 = this.fetchingThreads.iterator();
            while (it7.hasNext()) {
                z2 |= it7.next().isAlive();
            }
            if (!z2) {
                break;
            }
        } while (System.currentTimeMillis() - currentTimeMillis2 < this.rc.socketTimeout * 2);
        // Last resort before joining: interrupt the survivors.
        if (z2) {
            LOGGER.error("Some fetching threads are still alive");
            ObjectListIterator<FetchingThread> it8 = this.fetchingThreads.iterator();
            while (it8.hasNext()) {
                it8.next().interrupt();
            }
        }
        ObjectListIterator<FetchingThread> it9 = this.fetchingThreads.iterator();
        while (it9.hasNext()) {
            it9.next().join();
        }
        LOGGER.info("Joined fetching threads");
        // Fixed 2 s pause before stopping the done thread; presumably lets it process pending states — TODO confirm.
        Thread.sleep(2000L);
        this.doneThread.stop = true;
        this.doneThread.join();
        LOGGER.info("Joined done thread");
        // Wait (polling every second) until all results are consumed, giving up if no parsing thread is alive.
        while (true) {
            if (this.results.size() == 0) {
                break;
            }
            boolean z3 = false;
            ObjectListIterator<ParsingThread> it10 = this.parsingThreads.iterator();
            while (it10.hasNext()) {
                z3 |= it10.next().isAlive();
            }
            if (!z3) {
                LOGGER.error("No parsing thread alive: some results might not have been parsed");
                break;
            }
            Thread.sleep(1000L);
        }
        if (this.results.size() == 0) {
            LOGGER.info("All results have been parsed");
        }
        ObjectListIterator<ParsingThread> it11 = this.parsingThreads.iterator();
        while (it11.hasNext()) {
            it11.next().stop = true;
        }
        ObjectListIterator<ParsingThread> it12 = this.parsingThreads.iterator();
        while (it12.hasNext()) {
            it12.next().join();
        }
        this.robotsWarcParallelOutputStream.close();
        this.store.close();
        LOGGER.info("Joined parsing threads and closed stores");
        ObjectListIterator<FetchingThread> it13 = this.fetchingThreads.iterator();
        while (it13.hasNext()) {
            it13.next().close();
        }
        LOGGER.info("Closed fetching threads");
        // Give every visit state still in the todo queue back to the workbench.
        while (true) {
            VisitState poll = this.todo.poll();
            if (poll == null) {
                break;
            } else {
                this.workbench.release(poll);
            }
        }
        // Release done visit states; those not marked with nextFetch == Long.MAX_VALUE that are empty in memory
        // but still have path+queries in the virtualizer are queued for a refill first.
        while (true) {
            VisitState poll2 = this.done.poll();
            if (poll2 == null) {
                break;
            }
            if (poll2.nextFetch != Long.MAX_VALUE && this.virtualizer.count(poll2) > 0 && poll2.isEmpty()) {
                this.refill.add(poll2);
            }
            this.workbench.release(poll2);
        }
        // Reload up to pathQueryLimit() path+queries for each refill candidate; once drained, close the sieve
        // and signal the statistics thread.
        while (true) {
            VisitState poll3 = this.refill.poll();
            if (poll3 == null) {
                this.sieve.close();
                this.distributor.statsThread.done();
                return;
            } else {
                if (this.virtualizer.dequeuePathQueries(poll3, poll3.pathQueryLimit()) == 0) {
                    LOGGER.info("No URLs on disk during last refill: " + poll3);
                }
                if (poll3.acquired) {
                    LOGGER.warn("Visit state in the poll queue is acquired: " + poll3);
                }
            }
        }
    }

    /**
     * Start of a batch of new URLs from the sieve this frontier was registered with. Locks the per-scheme+authority
     * counts so that {@link #append(long, ByteArrayList)} reads them through {@link #lockedMap}.
     */
    @Override // it.unimi.di.law.bubing.sieve.AbstractSieve.NewFlowReceiver
    public void prepareToAppend() throws IOException {
        lockedMap = schemeAuthority2Count.lock();
    }

    /**
     * Moves one new URL into {@link #readyURLs}, unless its scheme+authority has already reached
     * {@code rc.maxUrlsPerSchemeAuthority}.
     *
     * @param j not used by this implementation.
     * @param byteArrayList the URL, as a normalized byte array.
     */
    @Override // it.unimi.di.law.bubing.sieve.AbstractSieve.NewFlowReceiver
    public void append(long j, ByteArrayList byteArrayList) throws IOException {
        final byte[] url = byteArrayList.elements();
        final int schemeAuthorityEnd = BURL.startOfpathAndQuery(url);
        final boolean overQuota = lockedMap.get(url, 0, schemeAuthorityEnd) >= rc.maxUrlsPerSchemeAuthority;
        if (overQuota) {
            return;
        }
        readyURLs.enqueue(url, 0, byteArrayList.size());
    }

    /** End of a batch: releases the lock taken by {@link #prepareToAppend()}. */
    @Override // it.unimi.di.law.bubing.sieve.AbstractSieve.NewFlowReceiver
    public synchronized void finishedAppending() throws IOException {
        lockedMap.unlock();
    }

    /** No-op: this frontier needs no action when the sieve stops appending. */
    @Override // it.unimi.di.law.bubing.sieve.AbstractSieve.NewFlowReceiver
    public void noMoreAppend() throws IOException {
        // Intentionally empty.
    }

    /**
     * Grows {@link #requiredFrontSize} by {@link #FRONT_INCREASE} when the current front has reached it.
     *
     * <p>The front is the approximate workbench size plus the todo queue, minus broken visit states. The new value
     * is capped at half of {@link #workbenchSizeInPathQueries}. If the capped value equals the current one, nothing
     * is updated or logged. This avoids logging the same "Required front size" line on every call once the cap
     * has been reached. The update is a single compare-and-set: if another thread changed the value concurrently,
     * this call does nothing.
     */
    public void updateRequestedFrontSize() {
        final long required = requiredFrontSize.get();
        final long front = (workbench.approximatedSize() + todo.size()) - workbench.broken.get();
        if (front < required) {
            return;
        }
        final long next = Math.min(required + FRONT_INCREASE, workbenchSizeInPathQueries / 2);
        if (next == required) {
            return; // Already at the cap: a CAS to the same value would only produce a redundant log line.
        }
        if (requiredFrontSize.compareAndSet(required, next)) {
            LOGGER.info("Required front size: " + requiredFrontSize.get());
        }
    }

    /**
     * Records one fetching-thread wait: increments {@link #fetchingThreadWaits} and adds {@code j} to
     * {@link #fetchingThreadWaitingTimeSum}.
     *
     * @param j the duration of the wait (unit as supplied by the caller; presumably milliseconds — TODO confirm).
     */
    public void updateFetchingThreadsWaitingStats(long j) {
        fetchingThreadWaits.getAndIncrement();
        fetchingThreadWaitingTimeSum.getAndAdd(j);
    }

    /** Zeroes the fetching-thread wait counters (count first, then total time; the pair is not reset atomically). */
    public void resetFetchingThreadsWaitingStats() {
        final long zero = 0;
        fetchingThreadWaits.set(zero);
        fetchingThreadWaitingTimeSum.set(zero);
    }

    /**
     * Writes a snapshot of the frontier state to the {@code snap} subdirectory of {@code rc.frontierDir}.
     *
     * <p>In order, this method:
     * <ol>
     * <li>calls {@code removeRobots()} on every visit state and emits the final statistics;</li>
     * <li>writes the scalar counters to {@code frontier.data};</li>
     * <li>closes the workbench virtualizer and freezes the ready/received disk queues, recording their sizes;</li>
     * <li>serializes the digest Bloom filter and the per-scheme+authority counts;</li>
     * <li>writes all visit states, preceded by their count, to {@code workbench}.</li>
     * </ol>
     * If the snap directory cannot be created, no snapshot file is written.
     *
     * @throws ConfigurationException if saving {@code frontier.data} fails (presumably from {@code Properties.save}).
     * @throws IOException if any of the snapshot files cannot be written.
     */
    public void snap() throws ConfigurationException, IllegalArgumentException, IOException {
        for (VisitState visitState : this.distributor.schemeAuthority2VisitState.visitStates()) {
            if (visitState != null) {
                visitState.removeRobots();
            }
        }
        LOGGER.info("Final statistics");
        this.distributor.statsThread.emit();
        this.distributor.statsThread.run();
        final File file = new File(this.rc.frontierDir, "snap");
        LOGGER.info("Started snapping to " + file);
        if (file.exists()) {
            LOGGER.warn("Already existing snap directory " + file + ": data will be overwritten (this shouldn't happen)");
        } else if (!file.mkdir()) {
            LOGGER.error("Could not create snap directory " + file + ": will not produce snap");
            return;
        }
        LOGGER.info("Snapping scalar data");
        final Properties properties = new Properties();
        properties.addProperty(PropertyKeys.EPOCH, System.currentTimeMillis());
        properties.setHeader("Snap started at " + new Date());
        properties.addProperty(PropertyKeys.PATHQUERIESINQUEUES, this.pathQueriesInQueues.get());
        properties.addProperty(PropertyKeys.WEIGHTOFPATHQUERIESINQUEUES, this.weightOfpathQueriesInQueues.get());
        properties.addProperty(PropertyKeys.BROKENVISITSTATES, this.brokenVisitStates.get());
        properties.addProperty(PropertyKeys.NUMBEROFRECEIVEDURLS, this.numberOfReceivedURLs.get());
        properties.addProperty(PropertyKeys.REQUIREDFRONTSIZE, this.requiredFrontSize.get());
        properties.addProperty(PropertyKeys.FETCHINGTHREADWAITS, this.fetchingThreadWaits.get());
        properties.addProperty(PropertyKeys.FETCHINGTHREADWAITINGTIMESUM, this.fetchingThreadWaitingTimeSum.get());
        properties.addProperty(PropertyKeys.ARCHETYPESOTHERS, this.archetypesStatus[0].get());
        properties.addProperty(PropertyKeys.ARCHETYPES1XX, this.archetypesStatus[1].get());
        properties.addProperty(PropertyKeys.ARCHETYPES2XX, this.archetypesStatus[2].get());
        properties.addProperty(PropertyKeys.ARCHETYPES3XX, this.archetypesStatus[3].get());
        properties.addProperty(PropertyKeys.ARCHETYPES4XX, this.archetypesStatus[4].get());
        properties.addProperty(PropertyKeys.ARCHETYPES5XX, this.archetypesStatus[5].get());
        properties.addProperty(PropertyKeys.DUPLICATES, this.duplicates.get());
        properties.addProperty(PropertyKeys.FETCHEDRESOURCES, this.fetchedResources.get());
        properties.addProperty(PropertyKeys.FETCHEDROBOTS, this.fetchedRobots.get());
        properties.addProperty(PropertyKeys.TRANSFERREDBYTES, this.transferredBytes.get());
        properties.addProperty(PropertyKeys.AVERAGESPEED, this.averageSpeed);
        properties.addProperty(PropertyKeys.CRAWLDURATION, this.distributor.statsThread.requestLogger.millis());
        properties.addProperty(PropertyKeys.VISITSTATESETSIZE, this.distributor.schemeAuthority2VisitState.size());
        properties.addProperty(PropertyKeys.WORKBENCHENTRYSETSIZE, this.workbench.numberOfWorkbenchEntries());
        LOGGER.info("Storing virtualizer states");
        this.virtualizer.close();
        LOGGER.info("Freezing byte disk queues");
        // Each queue's size is recorded before the queue is frozen.
        properties.addProperty(PropertyKeys.READYURLSSIZE, this.readyURLs.size64());
        this.readyURLs.freeze();
        properties.addProperty(PropertyKeys.RECEIVEDURLSSIZE, this.receivedURLs.size64());
        this.receivedURLs.freeze();
        properties.save(new File(file, "frontier.data"));
        LOGGER.info("Storing digests");
        BinIO.storeObject(this.digests, new File(file, "digests"));
        LOGGER.info("Storing counts");
        BinIO.storeObject(this.schemeAuthority2Count, new File(file, "schemeAuthority2Count"));
        LOGGER.info("Storing visit states");
        // try-with-resources: the stream is closed even if serializing a visit state throws.
        try (ObjectOutputStream objectOutputStream = new ObjectOutputStream(new FastBufferedOutputStream(new FileOutputStream(new File(file, "workbench"))))) {
            // The count precedes the records, so the visit states are scanned twice. Acquired states are
            // reported but still saved.
            long visitStateCount = 0;
            for (VisitState visitState : this.distributor.schemeAuthority2VisitState.visitStates()) {
                if (visitState != null) {
                    if (visitState.acquired) {
                        LOGGER.error("Acquired visit state: " + visitState);
                    }
                    visitStateCount++;
                }
            }
            objectOutputStream.writeLong(visitStateCount);
            // Record layout: visit state object, boolean "has workbench entry", then the entry's IP address if present.
            for (VisitState visitState : this.distributor.schemeAuthority2VisitState.visitStates()) {
                if (visitState != null) {
                    objectOutputStream.writeObject(visitState);
                    final boolean hasWorkbenchEntry = visitState.workbenchEntry != null;
                    objectOutputStream.writeBoolean(hasWorkbenchEntry);
                    if (hasWorkbenchEntry) {
                        Util.writeByteArray(visitState.workbenchEntry.ipAddress, objectOutputStream);
                    }
                }
            }
        }
    }

    public void restore() throws ConfigurationException, IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException {
        File file = new File(this.rc.frontierDir, "snap");
        if (!file.exists() || !file.isDirectory()) {
            LOGGER.error("Trying to restore state from snap directory " + file + ", but it does not exist or is not a directory");
            return;
        }
        LOGGER.info("Restoring data from " + file);
        LOGGER.info("Restoring scalar data");
        Properties properties = new Properties(new File(file, "frontier.data"));
        long j = properties.getLong(PropertyKeys.EPOCH);
        this.transferredBytes.set(properties.getLong(PropertyKeys.TRANSFERREDBYTES));
        this.pathQueriesInQueues.set(properties.getLong(PropertyKeys.PATHQUERIESINQUEUES));
        this.weightOfpathQueriesInQueues.set(properties.getLong(PropertyKeys.WEIGHTOFPATHQUERIESINQUEUES));
        this.brokenVisitStates.set(properties.getLong(PropertyKeys.BROKENVISITSTATES));
        this.numberOfReceivedURLs.set(properties.getLong(PropertyKeys.NUMBEROFRECEIVEDURLS));
        this.requiredFrontSize.set(properties.getLong(PropertyKeys.REQUIREDFRONTSIZE));
        this.fetchingThreadWaits.set(properties.getLong(PropertyKeys.FETCHINGTHREADWAITS));
        this.fetchingThreadWaitingTimeSum.set(properties.getLong(PropertyKeys.FETCHINGTHREADWAITINGTIMESUM));
        this.archetypesStatus[0].set(properties.getLong(PropertyKeys.ARCHETYPESOTHERS));
        this.archetypesStatus[1].set(properties.getLong(PropertyKeys.ARCHETYPES1XX));
        this.archetypesStatus[2].set(properties.getLong(PropertyKeys.ARCHETYPES2XX));
        this.archetypesStatus[3].set(properties.getLong(PropertyKeys.ARCHETYPES3XX));
        this.archetypesStatus[4].set(properties.getLong(PropertyKeys.ARCHETYPES4XX));
        this.archetypesStatus[5].set(properties.getLong(PropertyKeys.ARCHETYPES5XX));
        this.duplicates.set(properties.getLong(PropertyKeys.DUPLICATES));
        this.fetchedResources.set(properties.getLong(PropertyKeys.FETCHEDRESOURCES));
        this.fetchedRobots.set(properties.getLong(PropertyKeys.FETCHEDROBOTS));
        this.transferredBytes.set(properties.getLong(PropertyKeys.TRANSFERREDBYTES));
        this.averageSpeed = properties.getDouble(PropertyKeys.AVERAGESPEED);
        this.distributor.schemeAuthority2VisitState.ensureCapacity(properties.getInt(PropertyKeys.VISITSTATESETSIZE));
        this.workbench.address2WorkbenchEntry.ensureCapacity(properties.getInt(PropertyKeys.WORKBENCHENTRYSETSIZE));
        LOGGER.info("Restoring digests");
        this.digests = (BloomFilter) BinIO.loadObject(new File(file, "digests"));
        LOGGER.info("Restoring counts");
        this.schemeAuthority2Count = (ConcurrentCountingMap) BinIO.loadObject(new File(file, "schemeAuthority2Count"));
        LOGGER.info("Restoring workbench");
        ObjectInputStream objectInputStream = new ObjectInputStream(new FastBufferedInputStream(new FileInputStream(new File(file, "workbench"))));
        long readLong = objectInputStream.readLong();
        long j2 = readLong;
        while (true) {
            try {
                long j3 = j2;
                j2 = j3 - 1;
                if (j3 == 0) {
                    break;
                }
                VisitState visitState = (VisitState) objectInputStream.readObject();
                visitState.frontier = this;
                this.distributor.schemeAuthority2VisitState.add(visitState);
                boolean readBoolean = objectInputStream.readBoolean();
                if (visitState.lastRobotsFetch == Long.MAX_VALUE) {
                    visitState.forciblyEnqueueRobotsFirst();
                }
                if (readBoolean) {
                    visitState.setWorkbenchEntry(this.workbench.getWorkbenchEntry(Util.readByteArray(objectInputStream)));
                } else {
                    this.newVisitStates.add(visitState);
                }
                if (visitState.isEmpty() && this.virtualizer.count(visitState) > 0) {
                    LOGGER.error("Empty visit state, URLs on disk: " + visitState);
                    this.refill.add(visitState);
                }
            } catch (EOFException e) {
                LOGGER.error("Workbench stream too short: " + j2 + " visit states missing out of " + readLong);
            }
        }
        objectInputStream.close();
        this.virtualizer.readMetadata();
        LOGGER.info("Defreezing byte disk queues");
        this.readyURLs = ByteArrayDiskQueue.createFromFile(properties.getLong(PropertyKeys.READYURLSSIZE), new File(this.rc.frontierDir, Index.READY), 67108864, true);
        this.receivedURLs = ByteArrayDiskQueue.createFromFile(properties.getLong(PropertyKeys.RECEIVEDURLSSIZE), new File(this.rc.frontierDir, "received"), 16384, true);
        File file2 = new File(file + "-" + j);
        LOGGER.info("Renaming snap directory " + file + " to " + file2);
        if (!file.renameTo(file2)) {
            LOGGER.error("Could not rename snap directory");
        }
        this.distributor.statsThread.start(properties.getLong(PropertyKeys.CRAWLDURATION));
        LOGGER.info("Starting statistics");
        this.distributor.statsThread.emit();
        this.distributor.statsThread.run();
    }

    /**
     * Gives access to the statistics thread held by this frontier's distributor.
     *
     * @return the {@link StatsThread} stored in {@code distributor.statsThread}.
     */
    public StatsThread getStatsThread() {
        final StatsThread statsThread = distributor.statsThread;
        return statsThread;
    }

    /**
     * Computes the total number of archetypes recorded so far, summed over all status classes.
     *
     * <p>The snapshot-restore code fills index 0 from the "others" counter and indices 1-5 from
     * the 1xx-5xx counters. Each counter is read once, in index order. The result is a plain
     * sum of those reads, not an atomic snapshot of all counters together.
     *
     * @return the sum of all entries of {@code archetypesStatus}.
     */
    public long archetypes() {
        long total = 0L;
        int index = 0;
        while (index < archetypesStatus.length) {
            total += archetypesStatus[index++].get();
        }
        return total;
    }

    /*
     * Class-level constants:
     *  - LOOPBACK: a one-element array holding the IPv4 loopback address 127.0.0.1.
     *  - BYTE_ARRAY_HASHING_STRATEGY / BYTE_ARRAY_LIST_HASHING_STRATEGY: hash functions that
     *    delegate to MurmurHash3.hash for byte[] and ByteArrayList keys respectively.
     * The anonymous classes keep their original declaration order, so their generated names
     * (Frontier$1, Frontier$2) and serialVersionUID stay the same.
     */
    static {
        final byte[] loopbackOctets = { 127, 0, 0, 1 };
        try {
            // getByAddress declares UnknownHostException; a 4-byte raw address is always accepted.
            LOOPBACK = new InetAddress[] { InetAddress.getByAddress(loopbackOctets) };
        } catch (UnknownHostException e) {
            throw new RuntimeException(e.getMessage(), e);
        }

        BYTE_ARRAY_HASHING_STRATEGY = new AbstractHashFunction<byte[]>() {
            private static final long serialVersionUID = 1;

            @Override
            public long getLong(Object key) {
                final byte[] bytes = (byte[]) key;
                return MurmurHash3.hash(bytes);
            }
        };

        BYTE_ARRAY_LIST_HASHING_STRATEGY = new AbstractHashFunction<ByteArrayList>() {
            private static final long serialVersionUID = 1;

            @Override
            public long getLong(Object key) {
                final ByteArrayList bytes = (ByteArrayList) key;
                return MurmurHash3.hash(bytes);
            }
        };
    }
}
