Java Concurrent
Материал из SEWiki
Содержание
1. Executors
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * Countdown job for an executor demo.
 *
 * <p>Starting from the value given to the constructor, each pass decrements the
 * counter, prints the executing thread together with the new counter value, and
 * then yields the processor. A non-positive start value prints nothing.
 */
class Task implements Runnable {
    /** Remaining passes; ends at one below zero once the countdown has finished. */
    private int counter;

    /**
     * @param num number of lines this task prints when run
     */
    public Task(int num) {
        counter = num;
    }

    /** Prints {@code num-1, num-2, ..., 0}, yielding after every line. */
    @Override
    public void run() {
        while (true) {
            int before = counter;
            counter = before - 1; // the decrement happens even on the final, failing check
            if (before <= 0) {
                return;
            }
            System.out.println(Thread.currentThread() + ": " + counter);
            Thread.yield();
        }
    }
}
/**
 * Executor demo: submits five {@link Task} countdowns of random length (0..9)
 * to a two-thread pool and then lets the pool wind down.
 */
public class Main {
    /**
     * Entry point.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        Random rand = new Random();
        // A fixed pool of two threads; Executors.newCachedThreadPool() would
        // instead create threads on demand.
        ExecutorService exec = Executors.newFixedThreadPool(2);
        for (int i = 0; i < 5; i++) {
            // nextInt(10) is always in [0, 10). Math.abs(rand.nextInt()) % 10 is not:
            // Math.abs(Integer.MIN_VALUE) is still negative, so the remainder can be too.
            exec.execute(new Task(rand.nextInt(10)));
        }
        // Stop accepting new work; tasks already submitted still run to completion.
        exec.shutdown();
    }
}
2. Callable
import java.util.ArrayList;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
 * Countdown job that also produces a result.
 *
 * <p>Prints the executing thread and the remaining count once per pass,
 * yielding after each line, and finally returns the number it was created with.
 */
class CallableTask implements Callable<Integer> {
    /** Value handed back from {@link #call()}; never changes. */
    private final int number;
    /** Working copy that is counted down while the task runs. */
    private int counter;

    /**
     * @param num both the number of lines to print and the value to return
     */
    public CallableTask(int num) {
        number = num;
        counter = num;
    }

    /**
     * Runs the countdown.
     *
     * @return the constructor argument, unchanged
     */
    @Override
    public Integer call() {
        // The yield sits in the update clause: it runs after each printed line.
        for (; counter-- > 0; Thread.yield()) {
            System.out.println(Thread.currentThread() + ": " + counter);
        }
        return number;
    }
}
/**
 * Callable/Future demo: submits five {@link CallableTask}s and prints each
 * result twice, once via the blocking {@code get()} and once via the timed
 * {@code get(timeout, unit)}.
 */
public class MainCallable {
    /**
     * Entry point.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        ArrayList<Future<Integer>> results = new ArrayList<>();
        ExecutorService exec = Executors.newCachedThreadPool();
        for (int i = 0; i < 5; i++) {
            results.add(exec.submit(new CallableTask(i)));
        }
        // No new tasks are accepted; the submitted ones still complete.
        exec.shutdown();
        for (Future<Integer> fi : results) {
            try {
                // Blocks until the task has finished.
                System.out.println(fi.get());
            } catch (InterruptedException e) {
                // Restore the flag and stop waiting; carrying on would just fail
                // again on the next blocking call.
                Thread.currentThread().interrupt();
                e.printStackTrace();
                return;
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
            try {
                // Same result, but waits at most five seconds for it.
                System.out.println(fi.get(5, TimeUnit.SECONDS));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                e.printStackTrace();
                return;
            } catch (ExecutionException | TimeoutException e) {
                e.printStackTrace();
            }
        }
    }
}
3. ThreadFactory
import java.util.ArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
/**
 * Thread factory whose threads are all daemons.
 */
class DaemonThreadFactory implements ThreadFactory {
    /**
     * Wraps the given work in a new, not yet started, daemon thread.
     *
     * @param r the work the thread will execute
     * @return an unstarted thread with its daemon flag set
     */
    @Override
    public Thread newThread(Runnable r) {
        final Thread worker = new Thread(r);
        worker.setDaemon(true);
        return worker;
    }
}
/**
 * ThreadFactory demo: runs five {@link CallableTask}s on a cached pool whose
 * threads come from {@link DaemonThreadFactory}, then prints each result.
 */
public class MainThreadFactory {
    /**
     * Entry point.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        ArrayList<Future<Integer>> results = new ArrayList<>();
        ExecutorService exec = Executors.newCachedThreadPool(new DaemonThreadFactory());
        for (int i = 0; i < 5; i++) {
            results.add(exec.submit(new CallableTask(i * 100)));
        }
        exec.shutdown();
        // The main thread blocks on every future below, so each result is
        // printed before main returns.
        for (Future<Integer> fi : results) {
            try {
                System.out.println(fi.get());
            } catch (InterruptedException e) {
                // Restore the flag and stop waiting instead of swallowing the interrupt.
                Thread.currentThread().interrupt();
                e.printStackTrace();
                return;
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
    }
}
4. ThreadLocal
import java.util.concurrent.*;
import java.util.*;
/**
 * Task that keeps incrementing and printing the calling thread's value in
 * {@link ThreadLocalVariableHolder} until its thread is interrupted.
 */
class Accessor implements Runnable {
    /** Label used in the printed output. */
    private final int id;

    /**
     * @param idn label for this accessor
     */
    public Accessor(int idn) {
        this.id = idn;
    }

    /** Loops: increment, print, yield; stops once the interrupt flag is seen. */
    @Override
    public void run() {
        final Thread self = Thread.currentThread();
        for (;;) {
            if (self.isInterrupted()) {
                return;
            }
            ThreadLocalVariableHolder.increment();
            System.out.println(this);
            Thread.yield();
        }
    }

    /**
     * @return {@code "#<id>: <value>"}, where value is the calling thread's
     *         current thread-local value
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder();
        text.append("#").append(id).append(": ");
        text.append(ThreadLocalVariableHolder.get());
        return text.toString();
    }
}
/**
 * Holds one integer per thread. A thread's value is first drawn from a shared,
 * fixed-seed random generator and can then be read and incremented by that
 * thread only.
 */
class ThreadLocalVariableHolder {
    /** Thread-local cell that seeds each thread's value from a common generator. */
    private static final class SeededCell extends ThreadLocal<Integer> {
        private final Random generator = new Random(47);

        /** Synchronized so that first-time access from several threads is serialized. */
        @Override
        protected synchronized Integer initialValue() {
            return generator.nextInt(10000);
        }
    }

    private static final ThreadLocal<Integer> value = new SeededCell();

    /** Adds one to the calling thread's value. */
    public static void increment() {
        int current = value.get();
        value.set(current + 1);
    }

    /**
     * @return the calling thread's current value
     */
    public static int get() {
        return value.get();
    }
}
/**
 * ThreadLocal demo: lets five {@link Accessor} tasks run for three seconds on
 * a cached pool and then interrupts them all.
 */
public class ThreadLocalTest {
    /**
     * Entry point.
     *
     * @param args ignored
     * @throws Exception if the main thread is interrupted while sleeping
     */
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newCachedThreadPool();
        int accessorId = 0;
        while (accessorId < 5) {
            pool.execute(new Accessor(accessorId));
            accessorId++;
        }
        // A short delay while the accessors run.
        TimeUnit.SECONDS.sleep(3);
        // Interrupt the workers so that every Accessor leaves its loop.
        pool.shutdownNow();
    }
}
5. CountDownLatch
//: concurrency/CountDownLatchDemo.java
import java.util.concurrent.*;
import java.util.*;
// Часть основной задачи.:
class TaskPortion implements Runnable {
private static int counter = 0;
private final int id = counter++;
private static Random rand = new Random(47);
private final CountDownLatch latch;
TaskPortion(CountDownLatch latch) {
this.latch = latch;
}
public void run() {
try {
doWork();
latch.countDown();
} catch(InterruptedException ex) {
// Приемлемый вариант выхода
}
}
public void doWork() throws InterruptedException {
TimeUnit.MILLISECONDS.sleep(rand.nextInt(2000));
System.out.println(this + "completed");
}
public String toString() {
return String.format("%1$-3d ", id);
}
}
/**
 * Waits on a CountDownLatch and reports once the latch has opened, or reports
 * that the wait was interrupted.
 */
class WaitingTask implements Runnable {
    private static int counter = 0;
    /** Sequence number, assigned in construction order. */
    private final int id = counter++;
    private final CountDownLatch latch;

    /**
     * @param latch latch this task blocks on
     */
    WaitingTask(CountDownLatch latch) {
        this.latch = latch;
    }

    /** Blocks until the latch reaches zero, then prints a message. */
    @Override
    public void run() {
        try {
            latch.await();
        } catch (InterruptedException ex) {
            System.out.println(this + " interrupted");
            return;
        }
        System.out.println("Latch barrier passed for " + this);
    }

    /**
     * @return {@code "WaitingTask "} plus the id left-justified in three
     *         columns, followed by a space
     */
    @Override
    public String toString() {
        return "WaitingTask " + String.format("%-3d", id) + " ";
    }
}
/**
 * CountDownLatch demo: ten {@link WaitingTask}s block on a latch that is
 * released after {@link #SIZE} {@link TaskPortion}s have each counted it down.
 */
public class CountDownLatchDemo {
    /** Number of task portions, and therefore the latch's initial count. */
    static final int SIZE = 100;

    /**
     * Entry point.
     *
     * @param args ignored
     * @throws Exception declared for parity with the original demo
     */
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newCachedThreadPool();
        // All subtasks share a single CountDownLatch.
        CountDownLatch sharedLatch = new CountDownLatch(SIZE);
        for (int waiters = 10; waiters > 0; waiters--) {
            pool.execute(new WaitingTask(sharedLatch));
        }
        for (int portions = SIZE; portions > 0; portions--) {
            pool.execute(new TaskPortion(sharedLatch));
        }
        System.out.println("Launched all tasks");
        // Exit once all tasks have completed.
        pool.shutdown();
    }
}
6. CyclicBarrier
//: concurrency/HorseRace.java
// Using CyclicBarriers.
import java.util.concurrent.*;
import java.util.*;
/**
 * A race participant. Each round the horse advances by 0, 1 or 2 strides and
 * then waits at the barrier it was constructed with until the round completes.
 *
 * <p>The barrier is held per instance. It used to be a static field assigned
 * in the constructor, which meant every horse silently used the barrier passed
 * to the most recently constructed one.
 */
class Horse implements Runnable {
    private static int counter = 0;
    private static Random rand = new Random(47);
    /** Sequence number, assigned in construction order. */
    private final int id = counter++;
    private final CyclicBarrier barrier;
    /** Distance covered so far; guarded by this horse's monitor. */
    private int strides = 0;

    /**
     * @param b barrier this horse waits on after each round
     */
    public Horse(CyclicBarrier b) {
        this.barrier = b;
    }

    /**
     * @return strides covered so far
     */
    public synchronized int getStrides() {
        return strides;
    }

    /**
     * Runs rounds until the thread is interrupted.
     *
     * @throws RuntimeException wrapping a {@link BrokenBarrierException} if the
     *         barrier is broken while this horse is waiting on it
     */
    @Override
    public void run() {
        try {
            while (!Thread.interrupted()) {
                synchronized (this) {
                    strides += rand.nextInt(3); // Produces 0, 1 or 2
                }
                barrier.await();
            }
        } catch (InterruptedException e) {
            // Acceptable way to exit.
        } catch (BrokenBarrierException e) {
            // This is the exception we care about.
            throw new RuntimeException(e);
        }
    }

    /**
     * @return {@code "Horse <id> "}
     */
    @Override
    public String toString() {
        return "Horse " + id + " ";
    }

    /**
     * @return one {@code *} per stride covered, followed by the horse's id
     */
    public String tracks() {
        // Read the stride count once so the track is drawn from a single snapshot.
        int covered = getStrides();
        StringBuilder s = new StringBuilder();
        for (int i = 0; i < covered; i++) {
            s.append("*");
        }
        s.append(id);
        return s.toString();
    }
}
/**
 * CyclicBarrier demo: horses advance in rounds. After each round the barrier
 * action draws the track and either declares a winner and stops the pool, or
 * pauses before the next round.
 */
public class HorseRace {
    /** Strides needed to win; also the width of the drawn fence. */
    static final int FINISH_LINE = 25;
    private final List<Horse> horses = new ArrayList<>();
    private final ExecutorService exec =
        Executors.newCachedThreadPool();
    private final CyclicBarrier barrier;

    /**
     * Creates the horses and starts the race immediately.
     *
     * @param nHorses number of horses, which is also the barrier's party count
     * @param pause milliseconds to sleep between rounds
     */
    public HorseRace(int nHorses, final int pause) {
        barrier = new CyclicBarrier(nHorses, new Runnable() {
            @Override
            public void run() {
                StringBuilder s = new StringBuilder();
                for (int i = 0; i < FINISH_LINE; i++) {
                    s.append("="); // The fence on the racetrack
                }
                System.out.println(s);
                for (Horse horse : horses) {
                    System.out.println(horse.tracks());
                }
                for (Horse horse : horses) {
                    if (horse.getStrides() >= FINISH_LINE) {
                        System.out.println(horse + "won!");
                        // Interrupts every horse so the race ends.
                        exec.shutdownNow();
                        return;
                    }
                }
                try {
                    TimeUnit.MILLISECONDS.sleep(pause);
                } catch (InterruptedException e) {
                    System.out.println("barrier-action sleep interrupted");
                }
            }
        });
        for (int i = 0; i < nHorses; i++) {
            Horse horse = new Horse(barrier);
            horses.add(horse);
            exec.execute(horse);
        }
    }

    /**
     * Entry point.
     *
     * @param args optional: {@code args[0]} is the number of horses (used when
     *        positive), {@code args[1]} is the pause in milliseconds (used when
     *        non-negative)
     * @throws NumberFormatException if a supplied argument is not an integer
     */
    public static void main(String[] args) {
        int nHorses = 7;
        int pause = 200;
        if (args.length > 0) { // Optional argument
            // Integer.parseInt replaces the deprecated new Integer(String).
            int n = Integer.parseInt(args[0]);
            nHorses = n > 0 ? n : nHorses;
        }
        if (args.length > 1) { // Optional argument
            int p = Integer.parseInt(args[1]);
            pause = p > -1 ? p : pause;
        }
        new HorseRace(nHorses, pause);
    }
} /* (Execute to see output) *///:~
7. Lock, Condition
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
 * Fixed-capacity FIFO buffer guarded by one lock and two conditions.
 *
 * <p>{@link #put(Object)} blocks while the buffer is full and
 * {@link #take()} blocks while it is empty. Storage is a circular array
 * indexed by {@code putptr} and {@code takeptr}.
 */
class BoundedBuffer {
    final Lock lock = new ReentrantLock();
    /** Signalled by take() when a slot has been freed. */
    final Condition notFull = lock.newCondition();
    /** Signalled by put() when an element has been stored. */
    final Condition notEmpty = lock.newCondition();
    final Object[] items;
    int putptr, takeptr, count;

    /** Creates a buffer with the default capacity of 100 elements. */
    public BoundedBuffer() {
        this(100);
    }

    /**
     * Creates a buffer holding at most {@code capacity} elements.
     *
     * @param capacity maximum number of buffered elements; must be positive
     * @throws IllegalArgumentException if {@code capacity} is not positive
     */
    public BoundedBuffer(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive: " + capacity);
        }
        items = new Object[capacity];
    }

    /**
     * Appends an element, waiting while the buffer is full.
     *
     * @param x element to store
     * @throws InterruptedException if interrupted while waiting for space
     */
    public void put(Object x) throws InterruptedException {
        lock.lock();
        try {
            // Loop, not if: the condition is re-checked after every wake-up.
            while (count == items.length) {
                notFull.await();
            }
            items[putptr] = x;
            if (++putptr == items.length) {
                putptr = 0; // wrap around
            }
            ++count;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes and returns the oldest element, waiting while the buffer is empty.
     *
     * @return the element that has been in the buffer longest
     * @throws InterruptedException if interrupted while waiting for an element
     */
    public Object take() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0) {
                notEmpty.await();
            }
            Object x = items[takeptr];
            // Drop the buffer's reference so the element can be collected once
            // the caller is done with it.
            items[takeptr] = null;
            if (++takeptr == items.length) {
                takeptr = 0; // wrap around
            }
            --count;
            notFull.signal();
            return x;
        } finally {
            lock.unlock();
        }
    }
}
8. Atomic & Speed Test
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
 * Timing comparison of several ways to increment a counter.
 *
 * <p>Each performTest* method calls one increment variant NUM times in a plain
 * loop and prints the elapsed wall-clock milliseconds. main() runs the tests
 * one after another on a single Speed instance.
 */
public class Speed {
// Number of increments performed by each test.
private static final int NUM = 100000000;
// Boxed counter: every ++ unboxes, adds one, and stores a new Integer reference.
Integer notAtomic = 0;
// Primitive counter used by variants 4-6.
int notAtomic2 = 0;
// Counter for the atomic variant.
AtomicInteger atomic = new AtomicInteger(0);
// Explicit lock shared by the lock-based variants (3 and 6).
Lock lock = new ReentrantLock();
/**
 * Boxed counter, synchronized method.
 *
 * @return the value held before the increment (post-increment expression)
 */
public synchronized Integer incrementNotAtomic() {
return notAtomic++;
}
/**
 * Boxed counter, synchronized on the counter object itself.
 *
 * NOTE(review): the monitor is whichever Integer the field references on
 * entry, and the field is reassigned inside the block, so successive calls
 * lock different objects. The returned value is read after the block ends.
 *
 * @return the field's value as read after the synchronized block
 */
public Integer incrementNotAtomic2() {
synchronized (notAtomic) {
notAtomic++;
}
return notAtomic;
}
/**
 * Boxed counter, synchronized on this.
 *
 * @return the field's value as read after the synchronized block
 */
public Integer incrementNotAtomic2a() {
synchronized (this) {
notAtomic++;
}
return notAtomic;
}
/**
 * Boxed counter, guarded by the explicit lock.
 *
 * @return the field's value as read after the lock has been released
 */
public Integer incrementNotAtomic3() {
lock.lock();
try {
notAtomic++;
} finally {
// Always release, even if the increment throws.
lock.unlock();
}
return notAtomic;
}
/**
 * Primitive counter, synchronized method.
 *
 * @return the value held before the increment, boxed for the return
 */
public synchronized Integer incrementNotAtomic4() {
return notAtomic2++;
}
/**
 * Primitive counter, synchronized on this.
 *
 * @return the field's value as read after the synchronized block, boxed
 */
public Integer incrementNotAtomic5() {
synchronized (this) {
notAtomic2++;
}
return notAtomic2;
}
/**
 * Primitive counter, guarded by the explicit lock.
 *
 * @return the field's value as read after the lock has been released, boxed
 */
public Integer incrementNotAtomic6() {
lock.lock();
try {
notAtomic2++;
} finally {
// Always release, even if the increment throws.
lock.unlock();
}
return notAtomic2;
}
/** Times NUM calls of incrementNotAtomic() and prints the elapsed milliseconds. */
public void performTestNotAtomic() {
final long start = System.currentTimeMillis();
for (int i = 0 ; i < NUM ; i++) {
incrementNotAtomic();
}
System.out.println("Integer Synchronized method: "+(System.currentTimeMillis() - start));
}
/** Times NUM calls of incrementNotAtomic2() and prints the elapsed milliseconds. */
public void performTestNotAtomic2() {
final long start = System.currentTimeMillis();
for (int i = 0 ; i < NUM ; i++) {
incrementNotAtomic2();
}
System.out.println("Integer Synchronized Integer: "+(System.currentTimeMillis() - start));
}
/** Times NUM calls of incrementNotAtomic2a() and prints the elapsed milliseconds. */
public void performTestNotAtomic2a() {
final long start = System.currentTimeMillis();
for (int i = 0 ; i < NUM ; i++) {
incrementNotAtomic2a();
}
System.out.println("Integer Synchronized This: "+(System.currentTimeMillis() - start));
}
/** Times NUM calls of incrementNotAtomic3() and prints the elapsed milliseconds. */
public void performTestNotAtomic3() {
final long start = System.currentTimeMillis();
for (int i = 0 ; i < NUM ; i++) {
incrementNotAtomic3();
}
System.out.println("Integer Lock: "+(System.currentTimeMillis() - start));
}
/** Times NUM calls of incrementNotAtomic4() and prints the elapsed milliseconds. */
public void performTestNotAtomic4() {
final long start = System.currentTimeMillis();
for (int i = 0 ; i < NUM ; i++) {
incrementNotAtomic4();
}
System.out.println("Int Synchronized method: "+(System.currentTimeMillis() - start));
}
/** Times NUM calls of incrementNotAtomic5() and prints the elapsed milliseconds. */
public void performTestNotAtomic5() {
final long start = System.currentTimeMillis();
for (int i = 0 ; i < NUM ; i++) {
incrementNotAtomic5();
}
System.out.println("Int Synchronized This: "+(System.currentTimeMillis() - start));
}
/** Times NUM calls of incrementNotAtomic6() and prints the elapsed milliseconds. */
public void performTestNotAtomic6() {
final long start = System.currentTimeMillis();
for (int i = 0 ; i < NUM ; i++) {
incrementNotAtomic6();
}
System.out.println("Int Lock: "+(System.currentTimeMillis() - start));
}
/** Times NUM calls of AtomicInteger.getAndIncrement() and prints the elapsed milliseconds. */
public void performTestAtomic() {
final long start = System.currentTimeMillis();
for (int i = 0 ; i < NUM ; i++) {
atomic.getAndIncrement();
}
System.out.println("Atomic: "+(System.currentTimeMillis() - start));
}
/**
 * Runs every timing test once, in a fixed order, on one shared instance.
 *
 * @param args ignored
 */
public static void main(String[] args) {
Speed s = new Speed();
s.performTestAtomic();
s.performTestNotAtomic();
s.performTestNotAtomic2();
s.performTestNotAtomic2a();
s.performTestNotAtomic3();
s.performTestNotAtomic4();
s.performTestNotAtomic5();
s.performTestNotAtomic6();
}
}