package de.bsvrz.sys.funclib.communicationStreams;

import de.bsvrz.sys.funclib.concurrent.UnboundedQueue;
import de.bsvrz.sys.funclib.dataSerializer.Deserializer;
import de.bsvrz.sys.funclib.dataSerializer.Serializer;
import de.bsvrz.sys.funclib.dataSerializer.SerializingFactory;
import de.bsvrz.sys.funclib.debug.Debug;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.ProtocolException;
import java.nio.channels.ClosedChannelException;

/* loaded from: input_file:de/bsvrz/sys/funclib/communicationStreams/StreamDemultiplexer.class */
public class StreamDemultiplexer {
    private static Debug _debug = Debug.getLogger();
    private final int _numberOfStreams;
    private final int _blockingFactor;
    private final int _ticketBlockingFactor;
    private final StreamDemultiplexerDirector _director;
    private final DemultiplexerStreaminformations[] _arrayOfStreams;
    private final int[] _numberOfReceivedPackets;
    private final int[] _numberOfTakes;
    private int _numberOfPacketsReceived;
    private int _abortedStreamReceivedData;
    private int _numberOfOverchargesReceiver;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:de/bsvrz/sys/funclib/communicationStreams/StreamDemultiplexer$DemultiplexerStreaminformations.class */
    public static class DemultiplexerStreaminformations {
        private final int _indexOfStream;
        private int _packetIndexToSendNextMaxTicketIndex;
        private int _maxStreamPacketIndex;
        static final /* synthetic */ boolean $assertionsDisabled;
        private int _nextPacketIndex = 1;
        private final UnboundedQueue _bigDataPacketQueue = new UnboundedQueue();
        private final UnboundedQueue _smallDataPacketQueue = new UnboundedQueue();
        private boolean _endStream = false;
        private boolean _streamAborted = false;
        private boolean _streamTerminated = false;
        private boolean _lostConnectionToSender = false;

        public DemultiplexerStreaminformations(int i, int i2, int i3) {
            this._indexOfStream = i;
            this._packetIndexToSendNextMaxTicketIndex = i2;
            this._maxStreamPacketIndex = i3;
        }

        public int getPacketIndexToSendNextMaxTicketIndex() {
            return this._packetIndexToSendNextMaxTicketIndex;
        }

        public void setPacketIndexToSendNextMaxTicketIndex(int i) {
            this._packetIndexToSendNextMaxTicketIndex = i;
        }

        public int getMaxStreamPacketIndex() {
            return this._maxStreamPacketIndex;
        }

        public void setMaxStreamPacketIndex(int i) {
            this._maxStreamPacketIndex = i;
        }

        public int getNextPacketIndex() {
            return this._nextPacketIndex;
        }

        public void setNextPacketIndex(int i) {
            this._nextPacketIndex = i;
        }

        public int getIndexOfStream() {
            return this._indexOfStream;
        }

        public ReferenceDataPacket getReferenceDataPacket() throws InterruptedException {
            return (ReferenceDataPacket) this._bigDataPacketQueue.take();
        }

        public void newDataPacketForStream(ReferenceDataPacket referenceDataPacket) {
            this._bigDataPacketQueue.put(referenceDataPacket);
        }

        public int sizeOfSmallDataPacketQueue() {
            return this._smallDataPacketQueue.size();
        }

        public void putDataSmallDataPacketQueue(byte[] bArr) {
            this._smallDataPacketQueue.put(bArr);
        }

        public byte[] getData() throws InterruptedException {
            if ($assertionsDisabled || this._smallDataPacketQueue.size() > 0) {
                return (byte[]) this._smallDataPacketQueue.take();
            }
            throw new AssertionError("Ein kleines Paket sollte geholt werden, obwohl keines mehr da war.");
        }

        public boolean isEndStream() {
            return this._endStream;
        }

        public boolean isStreamAborted() {
            return this._streamAborted;
        }

        public boolean isStreamTerminated() {
            return this._streamTerminated;
        }

        public boolean isLostConnectionToSender() {
            return this._lostConnectionToSender;
        }

        public void setEndStreamTrue() {
            this._endStream = true;
        }

        public void setStreamAbortedTrue() {
            this._streamAborted = true;
            createUnlockPacket();
        }

        public void setStreamTerminatedTrue() {
            this._streamTerminated = true;
            createUnlockPacket();
        }

        public void setLostConnectionToSenderTrue() {
            this._lostConnectionToSender = true;
            createUnlockPacket();
        }

        public void createUnlockPacket() {
            this._bigDataPacketQueue.put(new ReferenceDataPacket(-999, "Dies ist ein unlockPacket. Es dient nur dazu, um wartende Threads aus der dataQueue eines Streams zu befreien. Der Stream sollte zu diesem Zeitpunkt bereits abgebrochen sein".getBytes()));
        }

        public int sizeOfDataQueue() {
            return this._bigDataPacketQueue.size();
        }

        static {
            $assertionsDisabled = !StreamDemultiplexer.class.desiredAssertionStatus();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:de/bsvrz/sys/funclib/communicationStreams/StreamDemultiplexer$ReferenceDataPacket.class */
    public static class ReferenceDataPacket {
        private final byte[] _data;
        private final int _streamPacketIndex;

        ReferenceDataPacket(int i, byte[] bArr) {
            this._streamPacketIndex = i;
            this._data = bArr;
        }

        public byte[] getData() {
            return this._data;
        }

        public int getStreamPacketIndex() {
            return this._streamPacketIndex;
        }
    }

    public StreamDemultiplexer(int i, int i2, StreamDemultiplexerDirector streamDemultiplexerDirector) {
        this._numberOfStreams = i;
        if (i2 < 1) {
            throw new IllegalArgumentException("Der blockingFactor muß größer gleich 1 sein");
        }
        this._blockingFactor = i2;
        if (this._blockingFactor == 1) {
            this._ticketBlockingFactor = 1;
        } else {
            this._ticketBlockingFactor = this._blockingFactor / 2;
        }
        this._arrayOfStreams = new DemultiplexerStreaminformations[this._numberOfStreams];
        for (int i3 = 0; i3 < this._arrayOfStreams.length; i3++) {
            this._arrayOfStreams[i3] = new DemultiplexerStreaminformations(i3, this._ticketBlockingFactor, this._blockingFactor);
        }
        this._director = streamDemultiplexerDirector;
        this._numberOfReceivedPackets = new int[this._numberOfStreams];
        this._numberOfTakes = new int[this._numberOfStreams];
    }

    public void abort(int i) {
        _debug.fine("Abort Stream: " + i);
        DemultiplexerStreaminformations demultiplexerStreaminformations = this._arrayOfStreams[i];
        synchronized (demultiplexerStreaminformations) {
            if (!demultiplexerStreaminformations.isEndStream() && !demultiplexerStreaminformations.isStreamAborted() && !demultiplexerStreaminformations.isStreamTerminated() && !demultiplexerStreaminformations.isLostConnectionToSender()) {
                demultiplexerStreaminformations.setStreamAbortedTrue();
                try {
                    sendNewTicketIndexToSender(i, -1);
                } catch (IOException e) {
                    e.printStackTrace();
                    _debug.error("Ein Fehler beim serialisieren/deserialisieren: " + e);
                }
            }
        }
    }

    public byte[] take(int i) throws InterruptedException, IllegalStateException, ProtocolException, ClosedChannelException {
        DemultiplexerStreaminformations demultiplexerStreaminformations = this._arrayOfStreams[i];
        synchronized (demultiplexerStreaminformations._smallDataPacketQueue) {
            boolean z = false;
            synchronized (demultiplexerStreaminformations) {
                if (demultiplexerStreaminformations.isEndStream()) {
                    demultiplexerStreaminformations.createUnlockPacket();
                    return null;
                }
                if (demultiplexerStreaminformations.isStreamAborted()) {
                    demultiplexerStreaminformations.createUnlockPacket();
                    throw new IllegalStateException("Fehler Stream (Index: " + i + "): Der Stream wurde mit 'abort'von der Empfängerapplikation abgebrochen und dann erneut mit 'take' aufgerufen");
                }
                if (demultiplexerStreaminformations.isStreamTerminated()) {
                    demultiplexerStreaminformations.createUnlockPacket();
                    throw new ProtocolException("Fehler Stream (Index: " + i + "): Ein Nutzdatenpaket wurde entweder doppelt empfangen oder ist verschwunden. Der Empfänger(StreamDemultplexer) hat den Stream abgebrochen und den Sender (StreamMultiplexer) benachrichtigt");
                }
                if (demultiplexerStreaminformations.isLostConnectionToSender()) {
                    demultiplexerStreaminformations.createUnlockPacket();
                    throw new ClosedChannelException();
                }
                if (demultiplexerStreaminformations.sizeOfSmallDataPacketQueue() == 0) {
                    z = true;
                }
                if (z) {
                    unpackBigPacket(demultiplexerStreaminformations);
                }
                synchronized (demultiplexerStreaminformations) {
                    if (demultiplexerStreaminformations.isEndStream()) {
                        demultiplexerStreaminformations.createUnlockPacket();
                        return null;
                    }
                    if (demultiplexerStreaminformations.isStreamAborted()) {
                        demultiplexerStreaminformations.createUnlockPacket();
                        throw new IllegalStateException("Fehler Stream (Index: " + i + "): Der Stream wurde mit 'abort'von der Empfängerapplikation abgebrochen und dann erneut mit 'take' aufgerufen");
                    }
                    if (demultiplexerStreaminformations.isStreamTerminated()) {
                        demultiplexerStreaminformations.createUnlockPacket();
                        throw new ProtocolException("Fehler Stream (Index: " + i + "): Ein Nutzdatenpaket wurde entweder doppelt empfangen oder ist verschwunden. Der Empfänger(StreamDemultplexer) hat den Stream abgebrochen und den Sender (StreamMultiplexer) benachrichtigt");
                    }
                    if (demultiplexerStreaminformations.isLostConnectionToSender()) {
                        demultiplexerStreaminformations.createUnlockPacket();
                        throw new ClosedChannelException();
                    }
                    byte[] data = demultiplexerStreaminformations.getData();
                    if (data == null) {
                        demultiplexerStreaminformations.setEndStreamTrue();
                    }
                    return data;
                }
            }
        }
    }

    private void unpackBigPacket(DemultiplexerStreaminformations demultiplexerStreaminformations) throws InterruptedException {
        int indexOfStream;
        ReferenceDataPacket referenceDataPacket = demultiplexerStreaminformations.getReferenceDataPacket();
        int i = 0;
        boolean z = false;
        boolean z2 = false;
        synchronized (demultiplexerStreaminformations) {
            indexOfStream = demultiplexerStreaminformations.getIndexOfStream();
            if (demultiplexerStreaminformations.getPacketIndexToSendNextMaxTicketIndex() == referenceDataPacket.getStreamPacketIndex()) {
                i = demultiplexerStreaminformations.getPacketIndexToSendNextMaxTicketIndex() + this._blockingFactor;
                int packetIndexToSendNextMaxTicketIndex = demultiplexerStreaminformations.getPacketIndexToSendNextMaxTicketIndex() + this._ticketBlockingFactor;
                demultiplexerStreaminformations.setMaxStreamPacketIndex(i);
                demultiplexerStreaminformations.setPacketIndexToSendNextMaxTicketIndex(packetIndexToSendNextMaxTicketIndex);
                z = true;
            }
            boolean z3 = false;
            Deserializer createDeserializer = SerializingFactory.createDeserializer(new ByteArrayInputStream(referenceDataPacket.getData()));
            while (!z3) {
                try {
                    int readInt = createDeserializer.readInt();
                    if (readInt >= 0) {
                        demultiplexerStreaminformations.putDataSmallDataPacketQueue(createDeserializer.readBytes(readInt));
                    } else if (readInt == -1) {
                        z2 = true;
                        z3 = true;
                        demultiplexerStreaminformations.putDataSmallDataPacketQueue(null);
                        _debug.fine("Beim auspacken eines großen Pakets, Stream(" + indexOfStream + ") wurde ein null-Paket mit Index(Index des großen Pakets) " + referenceDataPacket.getStreamPacketIndex() + " gefunden");
                    } else {
                        z3 = true;
                        _debug.finer("Ein großes Paket wurde ausgepackt, es enthielt nur Nutzdaten. Stream:" + indexOfStream + " Index des großen Pakets: " + referenceDataPacket.getStreamPacketIndex());
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
        if (z && !z2) {
            try {
                sendNewTicketIndexToSender(indexOfStream, i);
            } catch (IOException e2) {
                e2.printStackTrace();
                _debug.error("Ein Fehler beim serialisieren/deserialisieren: " + e2);
            }
        }
        this._numberOfTakes[indexOfStream] = this._numberOfTakes[indexOfStream] + 1;
        _debug.finer("Take Stream(" + indexOfStream + ") liefert: Paket mit Index " + referenceDataPacket.getStreamPacketIndex() + " Insgesamt wurden " + this._numberOfTakes[indexOfStream] + " takes auf diesem Stream ausgeführt");
    }

    public void receivedDataFromSender(byte[] bArr) throws IOException {
        Deserializer createDeserializer = SerializingFactory.createDeserializer(new ByteArrayInputStream(bArr));
        int readInt = createDeserializer.readInt();
        int readInt2 = createDeserializer.readInt();
        ReferenceDataPacket referenceDataPacket = new ReferenceDataPacket(readInt2, createDeserializer.readBytes(createDeserializer.readInt()));
        DemultiplexerStreaminformations demultiplexerStreaminformations = this._arrayOfStreams[readInt];
        synchronized (demultiplexerStreaminformations) {
            if (demultiplexerStreaminformations.isEndStream() || demultiplexerStreaminformations.isStreamAborted() || demultiplexerStreaminformations.isStreamTerminated() || demultiplexerStreaminformations.isLostConnectionToSender()) {
                _debug.info("Der Stream war bereits beendet, aber es kamen noch Daten für ihn (" + readInt + ")");
                this._abortedStreamReceivedData++;
            } else if (readInt2 == demultiplexerStreaminformations.getNextPacketIndex()) {
                demultiplexerStreaminformations.newDataPacketForStream(referenceDataPacket);
                demultiplexerStreaminformations.setNextPacketIndex(readInt2 + 1);
                if (demultiplexerStreaminformations.sizeOfDataQueue() > this._blockingFactor) {
                    this._numberOfOverchargesReceiver++;
                    _debug.error("Überlastung des Empfängers: Stream (" + readInt + ") Paketindex (" + referenceDataPacket.getStreamPacketIndex() + ") _blockingFactor (" + this._blockingFactor + ") Größe Dataqueue (" + demultiplexerStreaminformations.sizeOfDataQueue() + ") Der Fehler ist " + this._numberOfOverchargesReceiver + " vorgekommen");
                    throw new IllegalStateException("Der Empfängerpuffer wurde über das erlaubte Maß belaßtet");
                }
            } else {
                demultiplexerStreaminformations.setStreamTerminatedTrue();
                _debug.error("Stream (" + readInt + ") wurde beendet, weil Nutzdatenpakete in der falschen Reihenfolge/gar nicht ankamen. Erwartet wurde ein Paket mit dem Index: " + demultiplexerStreaminformations.getNextPacketIndex() + " Empfangen wurde ein mit dem Index: " + referenceDataPacket.getStreamPacketIndex());
                sendNewTicketIndexToSender(readInt, -1);
            }
        }
        this._numberOfPacketsReceived++;
        this._numberOfReceivedPackets[readInt] = this._numberOfReceivedPackets[readInt] + 1;
    }

    private void sendNewTicketIndexToSender(int i, int i2) throws IOException {
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        Serializer createSerializer = SerializingFactory.createSerializer(byteArrayOutputStream);
        createSerializer.writeInt(i);
        createSerializer.writeInt(i2);
        this._director.sendNewTicketIndexToSender(byteArrayOutputStream.toByteArray());
        _debug.finer("Ein Ticket wird an den Sender verschickt: Stream " + i + " neuer MaxIndex: " + i2);
    }

    public void killAllStreams() {
        _debug.fine("Die Empfängerapplikation bricht alle Streams ab, da die Leitung zur Senderapplikation nicht mehr vorhanden ist.");
        for (int i = 0; i < this._arrayOfStreams.length; i++) {
            DemultiplexerStreaminformations demultiplexerStreaminformations = this._arrayOfStreams[i];
            synchronized (demultiplexerStreaminformations) {
                demultiplexerStreaminformations.setLostConnectionToSenderTrue();
            }
        }
    }

    private void printByteArrayScreen(byte[] bArr) {
        String str = "Nutzdaten: ";
        if (bArr == null) {
            System.out.println("Das Nutzdatenpaket hatte als Inhalt NULL");
            return;
        }
        for (byte b : bArr) {
            str = str + ((char) b);
        }
        System.out.println(str);
        System.out.println("");
    }

    private void printDebugVariables() {
        System.out.println("*****************Debug Variablen StreamDemultiplexer (Empfänger)*************************");
        System.out.println("");
        System.out.println("Anzahl Streams, die Abgebrochen wurden, aber noch Daten empfangen haben (kein Fehler): " + this._abortedStreamReceivedData);
        System.out.println("Anzahl empfangender Pakete über alle Streams: " + this._numberOfPacketsReceived);
        System.out.println("Anzahl der Fälle, wo der Sender überlastet wurde (dies ist ein schwerer Fehler, die Zahl sollte 0 sein): " + this._numberOfOverchargesReceiver);
    }
}
