4.12Spezielle threadsichere Datenstrukturen
In Zeiten der Kernexplosion – jedes moderne Handy hat heute schon mindestens zwei CPU-Kerne – ändert sich auch die Programmierung grundlegend. Datenstrukturen, auf die Programme nebenläufig zugreifen, müssen auf diese Situation vorbereitet sein, denn andernfalls gehen Daten verloren, tauchen doppelt auf, oder die Datenstruktur ist komplett unbrauchbar.
4.12.1Zu Beginn nur synchronisierte Datenstrukturen in Java 1.0
Als Java geboren wurde, gab es im Grunde nur zwei Datenstrukturen: Vector und Hashtable. Es gab keine Mengen oder Queues, wer die benötigte, musste die Funktionalität mit Vector und Hashtable realisieren oder neu programmieren. Das java.util-Paket enthält mit Stack und Properties noch zwei weitere Datenstrukturen, doch Stack ist eine simple Unterklasse von Vector und Properties eine einfache Unterklasse von Hashtable, sodass sich die Implementierungsdetails von Vector/Hashtable auf die Unterklassen übertragen.
Eine wichtige Eigenschaft der Klassen Vector und Hashtable ist, dass sie synchronisiert sind. Das bedeutet, dass ein Thread, der eine Methode aufruft, damit allen anderen Threads so lange den Zutritt verwehrt, bis die Operation angeschlossen ist. Erst dann öffnet sich das Tor wieder für andere Methoden.
Eine Dauersynchronisation klingt zwar nett, hat aber zwei Nachteile: Zum einen kostete insbesondere zu Anfangszeiten der virtuellen Maschinen die Steuerung der Zugriffe Geschwindigkeit, und zum anderen muss bedacht werden, dass einige Operationen ja grundsätzlich schon parallel ohne Synchronisation abgearbeitet werden können, insbesondere alle Leseoperationen.
4.12.2Nicht synchronisierte Datenstrukturen in der Standard-Collection-API
Bei den aktuellen Klassen wie ArrayList, TreeSet oder HashMap sind die Methoden nicht mehr automatisch synchronized. Nutzen Programme diese Datenstrukturen nicht nebenläufig, gibt es folglich auch keinen Geschwindigkeitsverlust. Allerdings müssen Entwickler sich nun selbst um eine korrekte Sperrung kümmern.
Sollen Listen, Mengen oder Assoziativspeicher vor nebenläufigen Änderungen sicher sein, gibt es zwei Möglichkeiten: einmal über spezielle so genannte Wait-free- bzw. Lock-free-Algorithmen, die tatsächlich parallele Zugriffe – wenn möglich – ohne Lock erlauben, und einmal über synchronisierende Wrapper.
Wait-free-Algorithmen
Wenn zum Beispiel bei einer verketteten Liste ein Thread vorn etwas anhängt und der andere hinten etwas entfernt, ist das tatsächlich nebenläufig möglich, und es muss nicht die ganze Datenstruktur gelockt werden. Auch bei anderen Datenstrukturen kann direkt ohne Lock eine Operation durchgeführt werden, und es muss nur im Spezialfall eine besondere Absicherung vorgenommen werden.
Hierfür gibt es neue Klassen, die nicht im Paket java.util liegen, sondern in einem Unterpaket, java.util.concurrent. So lässt sich leicht merken: Bis auf Vector und Hashtable sind alle Datenstrukturen in java.util nicht threadsicher, alle in java.util.concurrent schon.
4.12.3Nebenläufiger Assoziativspeicher und die Schnittstelle ConcurrentMap
Die Schnittstelle java.util.concurrent.ConcurrentMap erweitert die Schnittstelle java.util.Map, führt aber keine Methoden hinzu, sondern dokumentiert nur das Verhalten von elf Operationen, dass diese threadsicher und atomar sind. Die Implementierungen der Schnittstelle sind:
ConcurrentHashMap: Ein schneller threadsicherer Assoziativspeicher nach dem Hashing-Verfahren. Seit Java 8 gibt es über 30 neue Methoden, die insbesondere die funktionale Programmierung unterstützen. Auch neu in Java 8 ist die Rückgabe von keySet(), die nun nicht mehr Set<K> ist, sondern ConcurrentHashMap.KeySetView<K,V>, eine besondere Set-Implementierung; hier nutzt Java 8 also kovariante Rückgabetypen.
ConcurrentSkipListMap: Ein threadsicherer Assoziativspeicher, der automatisch sortiert ist, also vergleichbar mit TreeMap. Die Klasse implementiert ConcurrentNavigableMap (die wiederum NavigableMap erweitert), eine Schnittstelle, die Methoden zur Teilmengenbildung vorgibt.
Beide sind sehr schnelle Datenstrukturen für gleichzeitig operierende Threads, die dann auch keine ConcurrentModificationException auslösen, wenn es parallele Veränderungen über einen Iterator gibt.
4.12.4ConcurrentLinkedQueue
Obwohl es keine Schnittstellen ConcurrentSet und ConcurrentList gibt, existiert zumindest eine Klasse ConcurrentLinkedQueue, die eine threadsichere und wartefreie Liste (genauer Queue) ist. Wie der Name schon andeutet, beruht die Realisierung auf verketteten Listen und nicht auf Arrays. Ein eigenes ConcurrentSet könnte auf der Basis von ConcurrentHashMap implementiert werden, so wie auch HashSet intern mit einer HashMap realisiert ist. Die Klasse ConcurrentSkipListSet ist eine performante nebenläufige Implementierung der Schnittstelle NavigableSet.
4.12.5CopyOnWriteArrayList und CopyOnWriteArraySet
Ist die Anzahl der Leseoperationen hoch, kann es sich lohnen, bei jedem Schreibzugriff erst die Daten zu kopieren und dann das Element hinzuzufügen, damit im Hintergrund andere Threads ohne einen Lock, der für das Schreiben nötig ist, Daten lesen können. Zwei dieser Datenstrukturen bietet Java an: CopyOnWriteArrayList für Listen und CopyOnWriteArraySet für Mengen. Die Klassen sind genau dann optimal, wenn wenig verändert – das ist teuer – und fast ausschließlich gelesen wird. Auch führen die Klassen zu keiner ConcurrentModificationException beim Ändern und gleichzeitigen Ablauf über Iteratoren. Denn falls die CopyOnWriteXXX-Datenstruktur verändert wird, arbeiten die »alten« Interessenten ja mit der herkömmlichen Version. Wenn zum Beispiel ein Iterator durch die Daten läuft und es dann eine Änderung gibt, so führt die Änderung zu einer Kopie der Daten und zur anschließenden Veränderung. Kommt eine Anfrage an die Datenstruktur nach der Veränderung, so bekommt der Anfrager Daten aus der neuen veränderten Datenstruktur. Kommt eine Anfrage während der Veränderung, also zu dem Zeitpunkt, an dem die Veränderung noch nicht abgeschlossen ist, so ist weiterhin die alte Version die einzig korrekte, und Daten kommen aus dieser CopyOnWriteXXX-Datenstruktur.
4.12.6Wrapper zur Synchronisation
Können zur Absicherung nebenläufiger Operationen die oben aufgeführten Datenstrukturen aus java.util.concurrent nicht verwendet werden, etwa bei Java 1.4 oder bei eigenen, nicht nebenläufig implementierten Versionen von Set, Map, List und Queue, lassen sich Zugriffe auf die Datenstrukturen extern synchronisieren. Dazu fordern statische Methoden wie synchronizedXXX(…) einen Wrapper an, der sich um die existierende Datenstruktur legt. Die Wrapper arbeiten mit einem Lock, und Parallelität in den Datenstrukturen ist nicht gegeben.
[zB]Beispiel
Eine synchronisierte Liste:
Der generische Typ der Datenstruktur geht auch weiter an den Wrapper.
Die statischen synchronizedXXX(…)-Methoden liefern eine neue Sammlung, die sich wie eine Hülle um die existierende Datenstruktur legt und alle Methodenaufrufe synchronisiert. Paralleler Zugriff darf natürlich dann nur noch über den Wrapper laufen, wobei nichtparalleler Zugriff weiterhin über die Original-Datenstruktur möglich ist.
static <T> Collection<T> synchronizedCollection(Collection<T> c)
static <K,V> SortedMap<K,V> synchronizedSortedMap(SortedMap<K,V> m)
static <T> SortedSet<T> synchronizedSortedSet(SortedSet<T> s)
static <T> NavigableSet<T> synchronizedNavigableSet(NavigableSet<T> s)
Neu in Java 8.static <K,V> NavigableMap<K,V> synchronizedNavigableMap(NavigableMap<K,V> m)
Neu in Java 8. Liefert synchronisierte, also threadsichere Datenstrukturen.
[»]Hinweis zu Iteratoren von synchronisierten Wrappern
Mit dem Wrapper ist der nebenläufige Zugriff über die Methoden gesichert, aber nicht der Zugriff über den Iterator. Ist syncList eine synchronisierte Datenstruktur, die ein Iterator ablaufen möchte und soll während des Ablaufens jeder andere Zugriff gesperrt sein, so ist Folgendes zu schreiben:
synchronized ( syncList ) {
Iterator<Byte> iter = syncList.iterator();
}
Die Synchronisation ist immer auf dem Wrapper und nicht auf dem »Gewrappten«.
4.12.7Blockierende Warteschlangen
Die Schnittstelle BlockingQueue steht für besondere Queues, die blockieren können. Das kann aus zwei Gründen geschehen: Entweder sind keine Daten zu entnehmen, da die Queue leer ist, oder eine maximale Anzahl von zu haltenden Elementen ist erreicht. Besonders in Produzenten-/Konsumenten-Szenarien sind blockierende Warteschlangen sehr nützlich.
Eine performante threadsichere Implementierung ist in der Praxis sehr wichtig, und die Java SE-Bibliothek hat zwei besondere Realisierungen auf Lager:
ArrayBlockingQueue: Queue immer mit einer maximalen Kapazität, intern realisiert mit einem Feld
LinkedBlockingQueue: Queue unbeschränkt oder mit maximaler Kapazität, intern realisiert durch eine verkettete Liste
PriorityBlockingQueue: eine blockierende PriorityQueue
Die anderen Realisierungen wie DelayQueue sind für uns jetzt nicht relevant.
Das Schöne an blockierenden Warteschlangen ist ihr Verhalten in Produzenten-/Konsumenten-Verhältnissen; ein Thread (oder beliebig viele Threads) setzt (viele setzen) Daten in die Queue, ein Thread (oder beliebig viele Threads) holt (viele Threads holen) die Daten wieder raus. Bei einer Queue ist es ja so, dass sie nach dem FIFO-Verfahren arbeitet, das heißt, die Daten, die zuerst hineingelegt wurden, werden auch zuerst verarbeitet. Bei einer Prioritätswarteschlange ist das etwas anders, aber dazu gleich mehr.
4.12.8ArrayBlockingQueue und LinkedBlockingQueue
Bei den normalen blockierenden Datenstrukturen geht es darum, sich zwischen ArrayBlockingQueue und LinkedBlockingQueue zu entscheiden. Der wesentliche Unterschied ist – bis auf die interne Realisierung – dass die ArrayBlockingQueue immer mit einer maximalen Schranke von Elementen konfiguriert werden muss; ist diese Schranke erreicht, blockiert die Queue und nimmt keine neuen Elemente mehr an. Die LinkedBlockingQueue kann unbegrenzt wachsen (also bis Integer.MAX_VALUE). Ist die Queue dann offen, wird nur geblockt, wenn kein Element vorhanden ist, ein Thread aber eines entnehmen möchte.
Beispiel mit unbeschränkter LinkedBlockingQueue
Dazu ein einfaches Beispiel: Wir haben zwei Produzenten für Meldungen und einen Konsumenten, der sie nach Eingang einfach auf der Konsole ausgibt:
Listing 4.39com/tutego/insel/thread/concurrent/LoggingInQueue.java, LoggingInQueue
private static final BlockingQueue<String> messages =
new LinkedBlockingQueue<>();
private static class MessageOutputter extends Thread {
@Override public void run() {
while ( true )
try {
long startTime = System.currentTimeMillis();
System.out.printf( "%s (Wartete %d ms)%n",
messages.take(),
System.currentTimeMillis() – startTime ); }
catch ( InterruptedException e ) { }
}
}
private static class UserMessageProducer extends Thread {
@Override public void run() {
for( int i = 0; ; i++ )
messages.add( "msg " + i + " " +
JOptionPane.showInputDialog( "Meldung eingeben" ) );
}
}
private static class DiskspaceMessageProducer extends Thread {
@Override public void run() {
for( int i = 0; ; i++ ) {
String dir = System.getProperty( "user.dir" );
messages.add( "spc " + i + " " + new File( dir ).getFreeSpace() );
try { TimeUnit.SECONDS.sleep( 1 ); }
catch ( InterruptedException e ) { }
}
}
}
public static void main( String[] args ) {
new MessageOutputter().start();
new UserMessageProducer().start();
new DiskspaceMessageProducer().start();
}
}
Auf der einen Seite steht der Thread, der die blockiere Queue abhorcht. Wir fragen die Daten mit take() ab, damit die Methode – und somit der Thread – blockiert, falls keine Daten in der Queue sind. Die BlockingQueue-Schnittstelle definiert auch noch andere Methoden wie poll() oder peek(), aber die blockieren nicht, sondern liefern null, wenn keine Daten in der Queue sind – eigentlich sind die Methoden unpassend in einer blockierenden Datenstruktur, aber BlockingQueue erbt diese Methoden von Queue. (Leser sollten zum Test take() durch poll() ersetzen und das Ergebnis testen.) Um eine Vorstellung zu bekommen, wie lange take() warten muss, holen wir über System.currentTimeMillis() die Systemzeit in Millisekunden relativ zum 1.1.1970 vor dem Aufruf von take() sowie nach dem Aufruf von take() – die Differenz der Zeiten gibt uns eine Idee von der Dauer, während der der Thread wartet und auch keine Rechenzeit verbraucht.
Die Produzenten sind in unserem Fall zwei Threads. Der eine holt permanent die Anzahl freier Bytes auf der Festplatte des Benutzers, der andere Thread offeriert einen Benutzerdialog, und die Benutzereingabe kommt auf den Schirm. Gestartet mit ein paar Eingaben ist die Ausgabe dann auch recht unspektakulär:
spc 1 55942631424 (Wartete 943 ms)
spc 2 55943192576 (Wartete 1005 ms)
spc 3 55943192576 (Wartete 1007 ms)
msg 0 eingabe (Wartete 716 ms)
spc 4 55943192576 (Wartete 290 ms)
msg 1 eingabe (Wartete 822 ms)
spc 5 55943192576 (Wartete 180 ms)
msg 2 eingabe (Wartete 112 ms)
4.12.9PriorityBlockingQueue
Eine blockierende Prioritätswarteschlange ist eine besondere Prioritätswarteschlange, die threadsicher ist und den Entnehmer-Thread blockiert, wenn kein Element vorhanden ist, also die Queue leer ist. Für blockierende Prioritätswarteschlangen bietet Java eine Klasse: PriorityBlockingQueue; sie implementiert PriorityQueue. Die Sortierung ist entweder natürlich oder über einen externen Comparator geregelt, bei Letzterem muss das Vergleichsobjekt im Konstruktor übergeben werden.
Tickets mit Priorität
Unser nächstes Beispiel legt Tickets in eine blockierende Prioritätswarteschlange. Tickets mit der höchsten Priorität sollen nach vorne wandern. Gleichzeitig messen wir die Zeit beim Anlegen des Tickets, um bei Tickets mit gleicher Priorität dem Ticket den Vorzug zu geben, das als Erstes angelegt wurde.
Listing 4.40com/tutego/insel/thread/concurrent/Ticket.java
import java.util.Date;
class Ticket implements Comparable<Ticket> {
enum Priority { SEVERE, NORMAL }
private final Priority priority;
private final String message;
private final Date arrivalDate;
Ticket( Priority priority, String message ) {
this.priority = priority;
this.message = message;
arrivalDate = new Date();
}
@Override public int compareTo( Ticket that ) {
int ticketPriority = this.priority.compareTo( that.priority );
// Wenn Ticket-Priorität ungleich 0, dann ist ein Ticket wichtiger als das
// andere und die Zeit spielt keine Rolle.
if ( ticketPriority != 0 )
return ticketPriority;
// Wenn Ticket-Priorität gleich 0, dann sind beide Tickets
// gleich wichtig. Die Zeit kommt dann als Kriterium hinzu.
return this.arrivalDate.compareTo( that.arrivalDate );
}
@Override public String toString() {
return String.format( "%tT.%1$TL (%s) kam '%s'",
arrivalDate, priority, message );
}
}
Die Ticket-Klasse implementiert Comparable, sodass es eine natürliche Ordnung der Elemente gibt.
Ein Test, der die Ordnung bei der Sortierung zeigt, ist schnell geschrieben:
Listing 4.41com/tutego/insel/thread/concurrent/TicketDemo.java, main()
new Ticket( Priority.NORMAL, "Kein Senf" ),
new Ticket( Priority.SEVERE, "Feuer" ),
new Ticket( Priority.NORMAL, "Bier warm" ),
new Ticket( Priority.SEVERE, "Erdbeben" ) );
Collections.sort( tickets );
System.out.println( tickets );
Die Ausgabe ist wie:
13:45:20.777 (SEVERE) kam 'Erdbeben'
13:45:20.777 (NORMAL) kam 'Kein Senf'
13:45:20.777 (NORMAL) kam 'Bier warm'
Die Ausgabe macht zwei Dinge deutlich: Zunächst, dass die wichtigen Meldungen vorne liegen, und dann als zweites, dass zuerst eingefügte Meldungen auch zeitlich vor den nachfolgenden Meldungen liegen (wobei das Programm so schnell ist, dass die Zeit gar nicht als Komponente berücksichtigt werden kann).
Tickets generieren und mit der blockierenden Warteschlange verarbeiten
Im letzten Schritt können wir zwei Threads starten, wobei ein Thread neue Tickets (mit zufälliger Wichtigkeit) in die Queue legt und ein anderer Thread die Nachrichten permanent entnimmt:
Listing 4.42com/tutego/insel/thread/concurrent/PrioritizedTicketsQueue.java
import java.util.concurrent.*;
import com.tutego.insel.util.concurrent.Ticket.Priority;
public class PrioritizedTicketsQueue {
private static final BlockingQueue<Ticket> tickets =
new PriorityBlockingQueue<>();
private static class TicketProducer extends Thread {
@Override public void run() {
while ( true ) {
Ticket ticket = new Ticket(
Priority.values()[ (int)(Math.random() * 2) ], "Hilfe!" );
tickets.add( ticket );
System.out.println( "Neues Ticket: " + ticket );
try { TimeUnit.MILLISECONDS.sleep( (int)(Math.random() * 2000) ); }
catch ( InterruptedException e ) { /* Empty */ }
}
}
}
private static class TicketSolver extends Thread {
@Override public void run() {
while ( true ) {
try {
System.out.println( tickets.take() );
TimeUnit.SECONDS.sleep( 1 );
}
catch ( InterruptedException e ) { /* Empty */ }
}
}
}
public static void main( String[] args ) {
new TicketProducer().start();
new TicketSolver().start();
}
}
Lassen wir das Programm starten, kann die Ausgabe wie folgt sein:
14:28:55.422 (SEVERE) kam 'Hilfe!'
Neues Ticket: 14:28:56.038 (NORMAL) kam 'Hilfe!'
Neues Ticket: 14:28:56.445 (NORMAL) kam 'Hilfe!'
14:28:56.038 (NORMAL) kam 'Hilfe!'
Neues Ticket: 14:28:57.213 (NORMAL) kam 'Hilfe!'
Neues Ticket: 14:28:57.264 (NORMAL) kam 'Hilfe!'
Neues Ticket: 14:28:57.443 (SEVERE) kam 'Hilfe!'
14:28:57.443 (SEVERE) kam 'Hilfe!'
14:28:56.445 (NORMAL) kam 'Hilfe!'
Neues Ticket: 14:28:58.618 (SEVERE) kam 'Hilfe!'
Neues Ticket: 14:28:59.025 (SEVERE) kam 'Hilfe!'
14:28:58.618 (SEVERE) kam 'Hilfe!'
Neues Ticket: 14:29:00.418 (SEVERE) kam 'Hilfe!'
14:28:59.025 (SEVERE) kam 'Hilfe!'
Interessant ist die fett gedruckte Zeile, denn sie zeigt deutlich, dass – obwohl zwei Tickets vorher generiert wurden – diese durch die spätere Meldung verdrängt werden, da die spätere Meldung eine höhere Wichtigkeit hat als die beiden Meldungen zuvor.
4.12.10Transfer-Warteschlangen – TransferQueue und LinkedTransferQueue
Setzen Produzenten etwas in eine Queue, so ist das in der Regel eine Ablegeoperation, die entkoppelt von der Entnahme ist. Vielleicht muss ein Ableger noch warten, wenn die Queue voll ist, aber sobald Platz ist, kehrt ein Geber-Thread sofort zurück. Ob ein Konsument vorhanden ist, spielt erst einmal keine Rolle.
Eine Unterschnittstelle von BlockingQueue ist TransferQueue. Sie deklariert neue Transfermethoden, die direkt Daten vom Produzenten zum wartenden Konsumenten bringen bzw. melden, wenn es keinen wartenden Thread gibt. Anders als bei den anderen Queues wächst bei den Transfermethoden die Queue nicht, denn ein xxxTransfer(…) ist kein add(…). Die Transfermethoden realisieren immer einen direkten Weg von einem Produzenten zu einem Konsumenten, der etwa mit take() schon auf Daten wartet. Insgesamt deklariert die Schnittstelle drei Übertragungsmethoden:
void transfer(E e) throws InterruptedException
boolean tryTransfer(E e)
boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException
Die erste Methode versucht eine Übermittlung, und gibt es keinen wartenden Konsumenten, blockiert transfer(…) so lange, bis das Datum abgeholt ist. Die zweite Methode ist ungeduldiger, denn gibt es keinen Konsumenten, wartet tryTransfer(..) nicht, sondern kehrt direkt mit der Rückgabe false zurück. Die zweite tryTransfer(…)-Variante bietet ein Zeitintervall, in dem ein Konsument die Daten abholen muss.
Die Java SE bietet bisher nur eine realisierende Klasse der Schnittstelle: LinkedTransferQueue.