public class InQueuesMgr
extends java.lang.Object
Warteschlangen-Manager. Verteilt reinkommende Datentelegramm mit Datensätzen/Archivanfragen/Archiv-Informationsanfragen/ Backup-/Restore-/HeaderRestore-/LZV/Delete-/RequestGap-Aufträgen auf die jeweilige Warteschlange.
Modifier and Type | Class and Description |
---|---|
class |
InQueuesMgr.ArchiveDataReceiver
Empfaengerklasse für Archivdaten
|
class |
InQueuesMgr.ArchiveSettingsReceiver
Empfaengerklasse für Archiveinstellungen
|
class |
InQueuesMgr.DataAckSender |
class |
InQueuesMgr.DataReceiver
Abstrakte Klasse, die zum Empfang bestimmter Daten (z.B. von Archiv-Queries) verwendet werden kann.
|
class |
InQueuesMgr.QueryReceiver
Empfaengerklasse für Archivanfragen
|
Modifier and Type | Field and Description |
---|---|
static int |
NUM_OF_NONARCH_QUEUES |
static int |
NUM_OF_SINGLETASKS |
static int |
Q_ARC_SETT_IDX |
static int |
Q_ARCINF_IDX |
static int |
Q_BACKUP_IDX |
static int |
Q_DELETE_IDX |
static int |
Q_HDR_REST_IDX |
static int |
Q_LZV_IDX |
static int |
Q_NUM_QRY_INF_IDX |
static int |
Q_QRYA_IDX
Indexe der Eingangswarteschlangen
|
static int |
Q_QRYB_IDX |
static int |
Q_QRYC_IDX |
static int |
Q_REQ_GAP_IDX |
static int |
Q_RESTORE_IDX |
static int |
Q_SIMPARAM_IDX |
static int |
ST_ARC_SETT_IDX |
static int |
ST_BACKUP_IDX |
static int |
ST_DELETE_IDX |
static int |
ST_HDR_REST_IDX |
static int |
ST_LZV_IDX |
static int |
ST_NUM_QRY_INF_IDX |
static int |
ST_REQ_GAP_IDX |
static int |
ST_RESTORE_IDX |
Constructor and Description |
---|
InQueuesMgr(ArchiveManager aMgr,
DataIdentTree dTree,
int numOfArchQueuesOnline,
int numOfArchQueuesReq,
int totalCapacityOfOnlineQueues,
int totalCapacityOfRequestedQueues,
long minWaitNanosPerSubscription,
long maxWaitNanosPerSubscription,
int subscriptionSlidingWindowSize)
Erzeugt den Warteschlangen-Manager.
|
Modifier and Type | Method and Description |
---|---|
int |
calcOnlineArchivTaskIndex(long objID,
long atgID,
long aspID,
int simVar)
Verteilung der Datenidentifikationen auf die Online-Archiv-Tasks erfolgt anhand der IDs.
|
int |
calcReqArchivTaskIndex(long objID,
long atgID,
long aspID,
int simVar)
Verteilung der Datenidentifikationen auf die Nachgefordert-Archiv-Tasks erfolgt anhand der IDs.
|
void |
checkForSuspendNonWriteTask()
Prueft ob Tasks temporaer blockiert werden sollen und blockiert ggf. den aufrufenden Task.
|
int |
countOnlineDataInQueues() |
void |
decrOnlineDataInQueues() |
InQueuesMgr.ArchiveDataReceiver |
getArchiveDataReceiver()
Die Klasse
ArchivConfig meldet die zu archivierenden Daten mit dem Objekt als Empfaenger an, das von dieser Methode geliefert wird. |
float[][] |
getArchiveQueuesOnlineStatus() |
float[][] |
getArchiveQueuesRequestedStatus() |
ArchiveSettingsTask |
getArchiveSettingsTask()
Liefert den Task, der für die Bearbeitung der Archiveinstellungen verantwortlich ist.
|
long |
getCloseContainerSuccess() |
InQueuesMgr.DataAckSender |
getDataAckSender()
Die Klasse
ArchivConfig meldet die Quittungen für die zu archivierenden Daten mit dem Objekt als Sender an, das von dieser Methode geliefert wird. |
long |
getFailedCountOnline() |
long |
getFailedCountRequested() |
long |
getFailedCountTotal() |
int |
getHiQueryTaskNum() |
int |
getInitialCapacityOfOnlineQueues()
Initiale Maximalgröße eines Ringpuffers für aktuelle Datensätze.
|
int |
getInitialCapacityOfRequestedQueue()
Kapazität des Ringpuffers für nachgelieferte Datensätze
|
int |
getLoQueryTaskNum() |
int |
getMidQueryTaskNum() |
float[][] |
getNonArchiveQueuesStatus() |
int |
getNumOfArchQueuesOnline()
Zahl der Tasks, die aktuelle Datensätze archivieren (sollte Primzahl sein)
|
int |
getNumOfArchQueuesReq()
Zahl der Tasks, die nachgelieferte Datensätze archivieren
|
void |
getObjectsFromDav()
In dieser Methode wird allen Tasks (falls notwendig) die Moeglichkeit gegeben, benoetigte Objekte vom DAV zu laden.
|
ArchiveTask |
getOnlineArchiveTask(long objId,
long atgId,
long aspId,
int simVar)
Nur für Testzwecke: Liefert den gewuenschten ArchiveTask
|
int |
getOnlineQueueResizeBlockSize()
Größe der Blocks, in denen die Online-Queues vergrößert oder verkleinert werden
|
long |
getProcessedCountOnline() |
long |
getProcessedCountRequested() |
long |
getProcessedCountTotal() |
long |
getQueuedCountOnline() |
long |
getQueuedCountTotal() |
double |
getQueueLoad() |
long |
getQueuesCountRequested() |
long |
getReceivedCountOnline() |
SingleTask |
getSingleTask(int taskIndex)
Liefert den gewuenschten SingleTask (um etwa Listener einzuhaengen)
|
long |
getSuccessCountOnline() |
long |
getSuccessCountRequested() |
long |
getSuccessCountTotal() |
int |
getTotalCapacityOfOnlineQueues()
Gesamtgröße (Summe) der Online-Queues
|
int |
getTotalCapacityOfRequestedQueues()
Gesamtgröße (Summe) der Nachgefordert-Queues
|
int |
getTotalOnlineQueuesCapa() |
void |
insertCloseContainer(long objID,
ArchiveTask.CloseContainerObject cco)
Fuegt einen nachgelieferten Datensatz in die entsprechende Queue ein.
|
void |
insertInArchiveQueueReq(long archiveTime,
ArchiveData ad)
Fuegt einen nachgeforderten Datensatz in die entsprechende Queue ein.
|
boolean |
insertInBackupQueue(ResultData resultData)
Fügt dem Sicherungs-Task einen Auftrag hinzu.
|
boolean |
insertInDeleteRegularQueue(ResultData resultData)
Fügt dem Lösch-Task (automatisches Löschen) einen Auftrag hinzu.
|
boolean |
insertInLZVQueue(ResultData resultData)
Fügt dem LZV-Task einen Auftrag hinzu.
|
boolean |
insertInRequestQueue(ResultData resultData)
Fügt dem Nachfordern-Task einen Auftrag hinzu.
|
boolean |
insertInSimVarDeleteQueue(ResultData resultData)
Fügt dem Simulationsvarianten-Lösch-Task einen Auftrag hinzu.
|
boolean |
insertSimVarParam(SimulationResultData resultData)
Fügt dem Parametrierungs-Task einen weiteren Auftrag hinzu.
|
protected void |
logCapaNoLongerExceeded(int maxsize,
java.lang.String qname,
int leftouts) |
protected void |
logSendMsgCapaExceeded(int maxsize,
java.lang.String qname,
ResultData rd)
Betriebsmeldung absetzen: Warteschlange voll.
|
void |
resetDSCounter()
Setzt alle Zaehler auf 0.
|
void |
ringOfDeath(Task doomedTask) |
void |
setFreeDiskSpace(long freeDiskSpace)
Aktualisiert den freien Speicherplatz
|
void |
setQueryTaskNumbers(int numHi,
int numMid,
int numLo)
Setzt die Anzahl der Threads für Archivanfragen auf die angegebenen Werte.
|
void |
startAllTasks()
Startet alle Tasks (Multi- und Single-Tasks) sowie die Archiv-Tasks.
|
void |
stopAllTasks()
Beendet alle Tasks (Multi- und Single-Tasks) sowie die Archiv-Tasks.
|
void |
subscribe(ClientDavInterface davCon,
SystemObject so,
DataDescription dd)
Anmelden mit Verzögerung (Sliding-Window-Mechanismus)
|
void |
subscribeQueries()
In dieser Methode wird allen Tasks (falls notwendig) die Moeglichkeit gegeben, sich auf Objekte anzumelden.
|
void |
subscribeSettings()
In dieser Methode wird allen Tasks (falls notwendig) die Moeglichkeit gegeben, sich auf Objekte anzumelden.
|
public static final int ST_ARC_SETT_IDX
public static final int ST_BACKUP_IDX
public static final int ST_RESTORE_IDX
public static final int ST_HDR_REST_IDX
public static final int ST_LZV_IDX
public static final int ST_DELETE_IDX
public static final int ST_REQ_GAP_IDX
public static final int ST_NUM_QRY_INF_IDX
public static final int NUM_OF_SINGLETASKS
public static final int Q_QRYA_IDX
Indexe der Eingangswarteschlangen
public static final int Q_QRYB_IDX
public static final int Q_QRYC_IDX
public static final int Q_ARCINF_IDX
public static final int Q_BACKUP_IDX
public static final int Q_RESTORE_IDX
public static final int Q_HDR_REST_IDX
public static final int Q_LZV_IDX
public static final int Q_DELETE_IDX
public static final int Q_REQ_GAP_IDX
public static final int Q_ARC_SETT_IDX
public static final int Q_SIMPARAM_IDX
public static final int Q_NUM_QRY_INF_IDX
public static final int NUM_OF_NONARCH_QUEUES
public InQueuesMgr(ArchiveManager aMgr, DataIdentTree dTree, int numOfArchQueuesOnline, int numOfArchQueuesReq, int totalCapacityOfOnlineQueues, int totalCapacityOfRequestedQueues, long minWaitNanosPerSubscription, long maxWaitNanosPerSubscription, int subscriptionSlidingWindowSize)
Erzeugt den Warteschlangen-Manager.
aMgr
- Archiv-ManagerdTree
- DataIdentTreenumOfArchQueuesOnline
- Anzahl Queues für Online-DatensätzenumOfArchQueuesReq
- Anzahl Queues für nachgeforderte DatensätzetotalCapacityOfOnlineQueues
- totalCapacityOfRequestedQueues
- minWaitNanosPerSubscription
- Minimale Wartezeit in Nanosekunden zwischen 2 AnmeldungenmaxWaitNanosPerSubscription
- Maximale Wartezeit in Nanosekunden zwischen 2 AnmeldungensubscriptionSlidingWindowSize
- Maximale Anzahl an unbestätigten gleichzeitigen Anmeldevorgängen (wird evtl. durch maxWaitNanosPerSubscription überschrieben)public void subscribe(ClientDavInterface davCon, SystemObject so, DataDescription dd) throws java.lang.InterruptedException
Anmelden mit Verzögerung (Sliding-Window-Mechanismus)
davCon
- Datenverteilerverbindungso
- Objektdd
- DataDescriptionjava.lang.InterruptedException
public final int getNumOfArchQueuesReq()
Zahl der Tasks, die nachgelieferte Datensätze archivieren
public final int getNumOfArchQueuesOnline()
Zahl der Tasks, die aktuelle Datensätze archivieren (sollte Primzahl sein)
public void setQueryTaskNumbers(int numHi, int numMid, int numLo)
Setzt die Anzahl der Threads für Archivanfragen auf die angegebenen Werte. Falls Threads entfernt werden, werden noch behandelte Anfragen zuende bearbeitet.
numHi
- Anzahl Tasks der Prioritaet “hoch”numMid
- Anzahl Tasks der Prioritaet “mittel”numLo
- Anzahl Tasks der Prioritaet “niedrig”public int getHiQueryTaskNum()
public int getMidQueryTaskNum()
public int getLoQueryTaskNum()
public void checkForSuspendNonWriteTask() throws java.lang.InterruptedException
Prueft ob Tasks temporaer blockiert werden sollen und blockiert ggf. den aufrufenden Task.
Diese Methode und #suspendNonWriteTasks(boolean)
verwenden eine reduzierte Synchronisierung um die Zahl der durchlaufenen synchronized-Bloecke gering zu halten.
java.lang.InterruptedException
public void getObjectsFromDav()
In dieser Methode wird allen Tasks (falls notwendig) die Moeglichkeit gegeben, benoetigte Objekte vom DAV zu laden.
public void subscribeSettings()
In dieser Methode wird allen Tasks (falls notwendig) die Moeglichkeit gegeben, sich auf Objekte anzumelden. Abmelden geschieht zentral ueber den ConnectionManager
.
public void subscribeQueries()
In dieser Methode wird allen Tasks (falls notwendig) die Moeglichkeit gegeben, sich auf Objekte anzumelden. Abmelden geschieht zentral ueber den ConnectionManager
.
public void startAllTasks()
Startet alle Tasks (Multi- und Single-Tasks) sowie die Archiv-Tasks.
public void stopAllTasks() throws java.lang.InterruptedException
Beendet alle Tasks (Multi- und Single-Tasks) sowie die Archiv-Tasks.
java.lang.InterruptedException
public void ringOfDeath(Task doomedTask)
public int calcOnlineArchivTaskIndex(long objID, long atgID, long aspID, int simVar)
Verteilung der Datenidentifikationen auf die Online-Archiv-Tasks erfolgt anhand der IDs. Jede Datenidentifikation wird somit immer in dieselbe ArchiveQueue eingefuegt. Auf diese Weise kann die Reihenfolge des Eintreffens wesentlich einfacher erhalten werden.
objID
- Objekt-IDatgID
- Attributgruppen-IDaspID
- Aspekt-IDsimVar
- Simulationsvariantepublic int calcReqArchivTaskIndex(long objID, long atgID, long aspID, int simVar)
Verteilung der Datenidentifikationen auf die Nachgefordert-Archiv-Tasks erfolgt anhand der IDs. Jede Datenidentifikation wird somit immer in dieselbe ArchiveQueue eingefuegt. Auf diese Weise kann die Reihenfolge des Eintreffens wesentlich einfacher erhalten werden.
objID
- Objekt-IDatgID
- Attributgruppen-IDaspID
- Aspekt-IDsimVar
- Simulationsvariantepublic void insertInArchiveQueueReq(long archiveTime, ArchiveData ad)
Fuegt einen nachgeforderten Datensatz in die entsprechende Queue ein.
aTime
- Archivzeitrd
- Datensatzpublic boolean insertInSimVarDeleteQueue(ResultData resultData)
Fügt dem Simulationsvarianten-Lösch-Task einen Auftrag hinzu.
resultData
- Auftrag.true
falls der Auftrag eingefügt werden konnte. false
sonst.public boolean insertSimVarParam(SimulationResultData resultData)
Fügt dem Parametrierungs-Task einen weiteren Auftrag hinzu.
resultData
- Auftrag. Enthält Informationen, auf welche Datenidentifikationen sich das Archivsystem anmelden soll.true
falls der Auftrag eingefügt werden konnte. false
sonst.public boolean insertInDeleteRegularQueue(ResultData resultData)
Fügt dem Lösch-Task (automatisches Löschen) einen Auftrag hinzu.
resultData
- Auftrag.true
falls der Auftrag eingefügt werden konnte. false
sonst.public boolean insertInRequestQueue(ResultData resultData)
Fügt dem Nachfordern-Task einen Auftrag hinzu.
resultData
- Auftrag.true
falls der Auftrag eingefügt werden konnte. false
sonst.public boolean insertInLZVQueue(ResultData resultData)
Fügt dem LZV-Task einen Auftrag hinzu.
resultData
- Auftrag.true
falls der Auftrag eingefügt werden konnte. false
sonst.public boolean insertInBackupQueue(ResultData resultData)
Fügt dem Sicherungs-Task einen Auftrag hinzu.
resultData
- Auftrag.true
falls der Auftrag eingefügt werden konnte. false
sonst.public void insertCloseContainer(long objID, ArchiveTask.CloseContainerObject cco)
Fuegt einen nachgelieferten Datensatz in die entsprechende Queue ein.
aTime
- Archivzeitrd
- Datensatzprotected void logSendMsgCapaExceeded(int maxsize, java.lang.String qname, ResultData rd)
Betriebsmeldung absetzen: Warteschlange voll.
maxsize
- Kapazität der Warteschlangeqname
- Name der Warteschlangerd
- Datensatz.protected void logCapaNoLongerExceeded(int maxsize, java.lang.String qname, int leftouts)
public float[][] getArchiveQueuesOnlineStatus()
public float[][] getArchiveQueuesRequestedStatus()
public float[][] getNonArchiveQueuesStatus()
public int getTotalOnlineQueuesCapa()
public double getQueueLoad()
public int countOnlineDataInQueues()
public void resetDSCounter()
Setzt alle Zaehler auf 0.
public void decrOnlineDataInQueues()
public ArchiveTask getOnlineArchiveTask(long objId, long atgId, long aspId, int simVar)
Nur für Testzwecke: Liefert den gewuenschten ArchiveTask
objId
- Objekt-IDatgId
- Attributgruppen-IDaspId
- Aspekt-IDsimVar
- SimulationsvariantecalcArchivTaskIndex(long,long,long, int, int)
public ArchiveSettingsTask getArchiveSettingsTask()
Liefert den Task, der für die Bearbeitung der Archiveinstellungen verantwortlich ist. Dies ist notwendig, damit der ArchiveManager
beim Start einen ArchiveSettingsTask.ArSSettingListener
einhaengen und darauf warten kann, dass die Bearbeitung der Archiveinstellungen abgeschlossen ist. Das Archivsystem wartet beim Start auf die Archiv-Einstellungen, da es nicht sinnvoll ist, die ersten Sekunden mit Default-Werten loszulaufen.
public SingleTask getSingleTask(int taskIndex)
Liefert den gewuenschten SingleTask (um etwa Listener einzuhaengen)
taskIndex
- Index der internen SingleTask-Listepublic InQueuesMgr.ArchiveDataReceiver getArchiveDataReceiver()
Die Klasse ArchivConfig
meldet die zu archivierenden Daten mit dem Objekt als Empfaenger an, das von dieser Methode geliefert wird.
public InQueuesMgr.DataAckSender getDataAckSender()
Die Klasse ArchivConfig
meldet die Quittungen für die zu archivierenden Daten mit dem Objekt als Sender an, das von dieser Methode geliefert wird.
public void setFreeDiskSpace(long freeDiskSpace)
Aktualisiert den freien Speicherplatz
freeDiskSpace
- freier Speicherplatz in Bytespublic int getInitialCapacityOfOnlineQueues()
Initiale Maximalgröße eines Ringpuffers für aktuelle Datensätze. Die MaximalGröße kann sich auf Kosten anderer Puffer aendern. Mit dynamischen PufferGrößen kann man Lastspitzen besser abfangen.
public int getInitialCapacityOfRequestedQueue()
Kapazität des Ringpuffers für nachgelieferte Datensätze
public int getTotalCapacityOfOnlineQueues()
Gesamtgröße (Summe) der Online-Queues
public int getTotalCapacityOfRequestedQueues()
Gesamtgröße (Summe) der Nachgefordert-Queues
public int getOnlineQueueResizeBlockSize()
Größe der Blocks, in denen die Online-Queues vergrößert oder verkleinert werden
public long getReceivedCountOnline()
resetDSCounter()
von getArchiveDataReceiver()
empfangenen Datensätze, egal ob diese archiviert werden oder nicht. Kann zur Test-Synchronisation verwendet werden.public long getQueuedCountTotal()
resetDSCounter()
Datensätze, die in eine Queue eingefügt wurden.public long getQueuedCountOnline()
resetDSCounter()
Online-Datensätze, die in eine Queue eingefügt wurden.public long getQueuesCountRequested()
resetDSCounter()
nachgeforderten Datensätze, die in eine Queue eingefügt wurden.public long getFailedCountTotal()
resetDSCounter()
Datensätze, die nicht erfolgreich archiviert wurdenpublic long getSuccessCountTotal()
resetDSCounter()
Datensätze, die erfolgreich archiviert wurdenpublic long getFailedCountOnline()
resetDSCounter()
Online-Datensätze, die nicht erfolgreich archiviert wurdenpublic long getSuccessCountOnline()
resetDSCounter()
Online-Datensätze, die erfolgreich archiviert wurdenpublic long getFailedCountRequested()
resetDSCounter()
nachgeforderten Datensätze, die nicht erfolgreich archiviert wurdenpublic long getSuccessCountRequested()
resetDSCounter()
nachgeforderten Datensätze, die erfolgreich archiviert wurdenpublic long getProcessedCountTotal()
resetDSCounter()
Datensätze, die verarbeitet wurdenpublic long getProcessedCountOnline()
resetDSCounter()
Online-Datensätze, die verarbeitet wurdenpublic long getProcessedCountRequested()
resetDSCounter()
nachgeforderten Datensätze, die verarbeitet wurdenpublic long getCloseContainerSuccess()
resetDSCounter()
erfolgreich verarbeiteten Close-Container-Datensätze. Kann zur Test-Synchronisation verwendet werden.