Class ArchiveBuffer
- java.lang.Object
-
- java.lang.Thread
-
- de.bsvrz.pua.prot.processing.ProcessingBuffer
-
- de.bsvrz.pua.prot.processing.archivebuffer.ArchiveBuffer
-
- All Implemented Interfaces:
java.lang.Runnable
public class ArchiveBuffer extends ProcessingBuffer
Puffer für die Daten vom Archivsystem, die von der Datenaufbereitung angefordert werden. Der folgende Algorithmus wird für jeden Zeitbereich wiederholt:
Für jedes reale Element, d.h. für jedes reale Attribut und für jede Attributgruppe wird eine Archivanfrage über den Zeitbereich und Datenart gestellt. Der jeweils erste pro Element erhaltene Datensatz wird markiert. Anschließend werden die Datensätze verschränkt. Dabei wird ein "Gewinner" festgestellt. Der Gewinner ist das Element mit dem kleinsten Datenzeitstempel. Danach werden die Werte der übrigen Attribute aufgefüllt und als Ausgangsdatensatz in einem Puffer abgelegt. Im nächsten Schritt wird ein weiterer Datensatz des Gewinners abgerufen, markiert mit den Datensätzen der übrigen Elementen verschränkt und dabei ein neuer "Gewinner" bestimmt(*). Nun wird die Datenaufbereitung benachrichtigt, dass ein Ausgangsdatensatz vorliegt. Grund für diese Verzögerung ist, dass die Bearbeitung eines Ausgangsdatensatzes erst vollständig abgeschlossen ist, wenn der nächste Ausgangsdatensatz erzeugt wurde. Der in (*) gewonnene Ausgangsdatensatz wird im Puffer abgelegt, und der Algorithmus wiederholt sich bis alle Daten vom Archivsystem abgerufen wurden. Ausgangsdatensatz wird ebenfalls in einem Puffer abgelegt. Wird in einem eigenen Thread gestartet, da ggf. auf die Antwort des Archivsystems gewartet werden muss. Da Archivdaten streambasiert abgefragt werden können, stellt der ArchivBuffer sicher, dass sich im Puffer für die Ausgangsdatensätze nur eine bestimmte Anzahl von Einträgen ansammelt. Wird dieses Limit (MAX_THRESHOLD
) erreicht, so stellt der Archivbuffer die arbeit ein, bis sich der Puffer wieder fast vollständig (MIN_THRESHOLD
) geleert hat.
-
-
Field Summary
Fields Modifier and Type Field Description static int
MAX_THRESHOLD
Anzahl Ausgangsdatensätze, die im Ausgangspuffer liegen.static int
MIN_THRESHOLD
Anzahl Ausgangsdatensätze, die mindestens im Ausgangspuffer liegen sollten.-
Fields inherited from class de.bsvrz.pua.prot.processing.ProcessingBuffer
_buffer, _bufferResult, _connection, _dataSetBuilder, _debug, _done, _periods, _processingInformation, _realElements, _tempElements, INITIAL_RINGBUFFER_SIZE
-
-
Constructor Summary
Constructors Constructor Description ArchiveBuffer(de.bsvrz.dav.daf.main.ClientDavInterface dav, ProcessingInterface processor, de.bsvrz.dav.daf.main.config.ConfigurationObject configAuth, ProcessingInformation pi, java.util.List<de.bsvrz.sys.funclib.losb.util.Tuple<java.lang.Long,java.lang.Long>> periods, java.lang.String scriptName, de.bsvrz.dav.daf.accessControl.UserInfo userInfo)
Startet den Online-ProcessingBuffer.
-
Method Summary
Modifier and Type Method Description boolean
applyAggregations(java.util.List<BaseDataSet> baseData)
Führt die Aggregierungen durch.int[]
getLinkedAttributes()
Zeigt an welche Attribute durch die Aggregationspalte
zusammengefasst werden.boolean
hasData()
Zeigt an ob Daten abgeholt werden können.void
init()
Sendet erste Anfragen an das Archivsystem.boolean
isDone()
Zeigt ob der Buffer noch weitere Daten liefern wird.protected boolean
isDoneCollecting()
Überprüft ob die Datensammlung abgeschlossen ist.protected boolean
isListAggregation()
Werden nicht aggregierte Daten versendet?protected void
processNewData(ValueProvider[] elements, java.util.ArrayList<ValueProvider> winners)
Bereitet aus den Ergebnisdatensätzen die Ergebnisdaten auf.protected void
requestData()
Stellt Archivanfragen für alle realen Elemente mit den momentan engetragenen Werten von_archiveUser
void
storeAggregatedData(byte status)
Falls Aggregationsdatensätze vorhanden sind, werden sie in den Ausgangspuffer gelegt.IntermediateDataSet
take()
Liefert einen Ausgangsdatensatz zurück.-
Methods inherited from class de.bsvrz.pua.prot.processing.ProcessingBuffer
abort, applyPostFilter, getInsertEmpty, getProtocolType, getResult, getScriptName, getTimeStampOrigin, getWinners, isAbort, isAggregate, nextInterval, notifyProcessor, run, setAggregate, setDone, setTempAttributes, size, storeDataSet
-
Methods inherited from class java.lang.Thread
activeCount, checkAccess, clone, countStackFrames, currentThread, dumpStack, enumerate, getAllStackTraces, getContextClassLoader, getDefaultUncaughtExceptionHandler, getId, getName, getPriority, getStackTrace, getState, getThreadGroup, getUncaughtExceptionHandler, holdsLock, interrupt, interrupted, isAlive, isDaemon, isInterrupted, join, join, join, onSpinWait, resume, setContextClassLoader, setDaemon, setDefaultUncaughtExceptionHandler, setName, setPriority, setUncaughtExceptionHandler, sleep, sleep, start, stop, suspend, toString, yield
-
-
-
-
Constructor Detail
-
ArchiveBuffer
public ArchiveBuffer(de.bsvrz.dav.daf.main.ClientDavInterface dav, ProcessingInterface processor, de.bsvrz.dav.daf.main.config.ConfigurationObject configAuth, ProcessingInformation pi, java.util.List<de.bsvrz.sys.funclib.losb.util.Tuple<java.lang.Long,java.lang.Long>> periods, java.lang.String scriptName, de.bsvrz.dav.daf.accessControl.UserInfo userInfo) throws de.bsvrz.sys.funclib.losb.exceptions.FailureException
Startet den Online-ProcessingBuffer. Meldet sich für jede benötigten Datenidentifikation beim Datenverteiler an.init()
sollte unverzüglich nach Erzeugen des Objekts aufgerufen werden.- Parameters:
dav
- Verbindung zum Datenverteilerprocessor
- Objekt das die Datenaufbereitung durchführt. Wird jedes Mal benachrichtigt, wenn ein Ausgangsdatensatz vorliegt.configAuth
- Konfigurationsverantwortlicher, dessen Archivsystem verwendet wird.pi
- Informationen zur Datenaufbereitung. Die Zeitbereiche müssen bereits sortiert und zusammengefasst sein!periods
- Zeitbereiche, in denen der Archivbuffer Daten sammeln soll. Inhalt wird geändert!scriptName
- Bezeichnung des Skripts zur Verwendung in Fehlerausgaben.userInfo
- Benutzer für Rechteprüfung- Throws:
de.bsvrz.sys.funclib.losb.exceptions.FailureException
- Fehler bei der Kommunikation mit der Konfiguration
-
-
Method Detail
-
init
public void init() throws de.bsvrz.sys.funclib.losb.exceptions.FailureException, java.lang.InterruptedException
Sendet erste Anfragen an das Archivsystem. Startet Anschließend einen Thread, der die Daten vom Archivsystem entgegennimmt. Falls es zu einem Fehler bei der Archivanfrage kommt, wird der Thread nicht gestartet, undProcessingBuffer.abort()
aufgerufen.- Specified by:
init
in classProcessingBuffer
- Throws:
de.bsvrz.sys.funclib.losb.exceptions.FailureException
- Fehler bei der Archivanfrage.java.lang.InterruptedException
- Warten auf Archivantwort wurde unterbrochen.- See Also:
ProcessingBuffer.init()
-
applyAggregations
public boolean applyAggregations(java.util.List<BaseDataSet> baseData)
Description copied from class:ProcessingBuffer
Führt die Aggregierungen durch. Die Aggregierungen werden jedoch nur durchgeführt, fallsProcessingBuffer.isAggregate()
true
liefert.- Specified by:
applyAggregations
in classProcessingBuffer
- Parameters:
baseData
- Werte des Ausgangsdatensatz. Einträge können von der Methode geändert werden.- Returns:
true
: Der Ausgangsdatensatz soll nachgefiltert und ausgegeben werden.- See Also:
ProcessingBuffer.applyAggregations(List)
-
requestData
protected void requestData() throws de.bsvrz.sys.funclib.losb.exceptions.FailureException, java.lang.InterruptedException
Stellt Archivanfragen für alle realen Elemente mit den momentan engetragenen Werten von_archiveUser
- Throws:
de.bsvrz.sys.funclib.losb.exceptions.FailureException
- Fehler bei der Archivanfrage.java.lang.InterruptedException
- Warten auf Antwortdatensatz wurde unterbrochen.
-
isDoneCollecting
protected boolean isDoneCollecting() throws de.bsvrz.sys.funclib.losb.exceptions.FailureException, java.lang.InterruptedException
Überprüft ob die Datensammlung abgeschlossen ist.- Specified by:
isDoneCollecting
in classProcessingBuffer
- Returns:
True
falls die Datensammlung abgeschlossen ist. Überprüft zudem, wieviele Elemente sich in der Warteschlange befinden. Ist die Warteschlange bereits überMAX_THRESHOLD
gefüllt, so wird der ArchivBuffer angehalten, bis die Queue wieder fast vollständig (MIN_THRESHOLD
) geelert ist.- Throws:
de.bsvrz.sys.funclib.losb.exceptions.FailureException
- Fehler bei der Archivanfrage. Es wird eine Archivanfrage gestellt, wenn Daten für ein weiteres Intervall angefragt werden.java.lang.InterruptedException
- Warten auf Antwortdatensatz wurde unterbrochen.- See Also:
ProcessingBuffer.isDoneCollecting()
-
isDone
public boolean isDone()
Description copied from class:ProcessingBuffer
Zeigt ob der Buffer noch weitere Daten liefern wird.- Specified by:
isDone
in classProcessingBuffer
- Returns:
true
falls noch weitere Daten zu erwarten sind.- See Also:
ProcessingBuffer.isDone()
-
hasData
public boolean hasData()
Zeigt an ob Daten abgeholt werden können.- Specified by:
hasData
in classProcessingBuffer
- Returns:
true
falls Daten mittelstake()
abgeholt werden können.
-
take
public IntermediateDataSet take() throws java.lang.InterruptedException
Description copied from class:ProcessingBuffer
Liefert einen Ausgangsdatensatz zurück. Setzt dabei das 'zeitdauer' Attribut. Liegt keiner vor, kehrt die Methode sofort mit dem Rückgabewert null zurück.- Overrides:
take
in classProcessingBuffer
- Returns:
- Ausgangsdatensatz und Status oder null, falls keiner vorliegt.
- Throws:
java.lang.InterruptedException
- Warten auf Ausgangsdatensatz wurde unterbrochen.- See Also:
ProcessingBuffer.take()
-
isListAggregation
protected boolean isListAggregation()
Description copied from class:ProcessingBuffer
Werden nicht aggregierte Daten versendet?- Specified by:
isListAggregation
in classProcessingBuffer
- Returns:
true
fallsListe
eine der ausgewählten die Aggregationsanwendungen ist.- See Also:
ProcessingBuffer.isListAggregation()
-
getLinkedAttributes
public int[] getLinkedAttributes()
Zeigt an welche Attribute durch die Aggregationspalte
zusammengefasst werden. Zusammengehörige Spalten werden duch die gleichen Nummern gekennzeichnet.- Returns:
- Zusammen gehörende Spalten.
null
falls diespalten
Aggregation nicht verwendet wird.
-
storeAggregatedData
public void storeAggregatedData(byte status)
Description copied from class:ProcessingBuffer
Falls Aggregationsdatensätze vorhanden sind, werden sie in den Ausgangspuffer gelegt.- Specified by:
storeAggregatedData
in classProcessingBuffer
- Parameters:
status
- Status, den der Aggregationsdatensatz erhalten soll.- See Also:
ProcessingBuffer.storeAggregatedData(byte)
-
processNewData
protected void processNewData(ValueProvider[] elements, java.util.ArrayList<ValueProvider> winners) throws de.bsvrz.sys.funclib.losb.exceptions.FailureException, java.lang.InterruptedException
Description copied from class:ProcessingBuffer
Bereitet aus den Ergebnisdatensätzen die Ergebnisdaten auf.- Specified by:
processNewData
in classProcessingBuffer
- Parameters:
elements
- Liste aller Datenlieferanten.winners
- Rückgabe: Liste der Datensätze mit minimalem, nicht aufgefüllten Datenzeitstempel. (Es sollte eine leere ArrayList übergeben werden)- Throws:
de.bsvrz.sys.funclib.losb.exceptions.FailureException
- Fehler bei der Aufbereitung.java.lang.InterruptedException
- Aufbereitung wurde unterbrochen.- See Also:
ProcessingBuffer.processNewData(ValueProvider[], ArrayList)
-
-