Java Concurrent

Материал из SEWiki
Перейти к: навигация, поиск

1. Executors

import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class Task implements Runnable {
	private int counter;
	public Task(int num) {
		this.counter = num;
	}
	public void run() {
		while (counter-- > 0) {
			System.out.println(Thread.currentThread() + ": " + counter);
			Thread.yield();
		}
	}
}

public class Main {
	public static void main(String[] args) {
		Random rand = new Random();
//		ExecutorService exec = Executors.newCachedThreadPool();
		ExecutorService exec = Executors.newFixedThreadPool(2);
		for (int i = 0; i < 5; i++) {
			exec.execute(new Task(Math.abs(rand.nextInt())%10));
		}
		exec.shutdown();
	}
}

2. Callable

import java.util.ArrayList;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class CallableTask implements Callable<Integer> {
	private int counter;
	private final int number;
	public CallableTask(int num) {
		this.counter = num;
		this.number = num;
	}
	public Integer call() {
		while (counter-- > 0) {
			System.out.println(Thread.currentThread() + ": " + counter);
			Thread.yield();
		}		
		return number;
	}
}

public class MainCallable {
	public static void main(String[] args) {
		ArrayList<Future<Integer>> results = new ArrayList<>();
		
		ExecutorService exec = Executors.newCachedThreadPool();
		for (int i = 0; i < 5; i++) {
			results.add(exec.submit(new CallableTask(i)));
		}
		exec.shutdown();
		
		for (Future<Integer> fi : results) {
			try {
				System.out.println(fi.get());
			} catch (InterruptedException | ExecutionException e) {
				e.printStackTrace();
			}

			try {
				System.out.println(fi.get(5, TimeUnit.SECONDS));
			} catch (InterruptedException | ExecutionException e) {
				e.printStackTrace();
			} catch (TimeoutException e) {
				e.printStackTrace();
			}
		
		}
	}

}

3. ThreadFactory

import java.util.ArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;

class DaemonThreadFactory implements ThreadFactory {
	public Thread newThread(Runnable r) {
		Thread t = new Thread(r);
		t.setDaemon(true);
		return t;
	}
	
}

public class MainThreadFactory {
	public static void main(String[] args) {
		ArrayList<Future<Integer>> results = new ArrayList<Future<Integer>>();
		
		ExecutorService exec = Executors.newCachedThreadPool(new DaemonThreadFactory());
		for (int i = 0; i < 5; i++) {
			results.add(exec.submit(new CallableTask(i*100)));
		}
		exec.shutdown();
		
		for (Future<Integer> fi : results) {
			try {
				System.out.println(fi.get());
			} catch (InterruptedException | ExecutionException e) {
				e.printStackTrace();
			}
		}
	}

}

4. ThreadLocal

import java.util.concurrent.*;
import java.util.*;
 
class Accessor implements Runnable {
  private final int id;
  public Accessor(int idn) { id = idn; }
  public void run() {
    while(!Thread.currentThread().isInterrupted()) {
      ThreadLocalVariableHolder.increment();
      System.out.println(this);
      Thread.yield();
    }
  }
  public String toString() {
    return "#" + id + ": " +
      ThreadLocalVariableHolder.get();
  }
}
 
class ThreadLocalVariableHolder {
	private static ThreadLocal<Integer> value = new ThreadLocal<Integer>()	{
		private Random rand = new Random(47);
		protected synchronized Integer initialValue() {
			return rand.nextInt(10000);
		}
	};
	public static void increment() {
		value.set(value.get() + 1);
	}
	public static int get() {
		return value.get();
	}	
}

public class ThreadLocalTest {

  public static void main(String[] args) throws Exception {
    ExecutorService exec = Executors.newCachedThreadPool();
    for(int i = 0; i < 5; i++)
      exec.execute(new Accessor(i));
    TimeUnit.SECONDS.sleep(3);   // Небольшая задержка
    exec.shutdownNow();          // Выход из всех объектов Accessor
  }
}

5. CountDownLatch

//: concurrency/CountDownLatchDemo.java
import java.util.concurrent.*;
import java.util.*;
 
// Часть основной задачи.:
class TaskPortion implements Runnable {
  private static int counter = 0;
  private final int id = counter++;
  private static Random rand = new Random(47);
  private final CountDownLatch latch;
  TaskPortion(CountDownLatch latch) {
    this.latch = latch;
  }
  public void run() {
    try {
      doWork();
      latch.countDown();
    } catch(InterruptedException ex) {
      // Приемлемый вариант выхода
    }
  }
  public void doWork() throws InterruptedException {
    TimeUnit.MILLISECONDS.sleep(rand.nextInt(2000));
    System.out.println(this + "completed");
  }
  public String toString() {
    return String.format("%1$-3d ", id);
  }
}
 
// Ожидание по объекту CountDownLatch:
class WaitingTask implements Runnable {
  private static int counter = 0;
  private final int id = counter++;
  private final CountDownLatch latch;
  WaitingTask(CountDownLatch latch) {
    this.latch = latch;
  }
  public void run() {
    try {
      latch.await();
      System.out.println("Latch barrier passed for " + this);
    } catch(InterruptedException ex) {
      System.out.println(this + " interrupted");
    }
  }
  public String toString() {
    return String.format("WaitingTask %1$-3d ", id);
  }
}
 
public class CountDownLatchDemo {
  static final int SIZE = 100;
  public static void main(String[] args) throws Exception {
    ExecutorService exec = Executors.newCachedThreadPool();
    // Все подзадачи совместно используют один объект CountDownLatch:
    CountDownLatch latch = new CountDownLatch(SIZE);
    for(int i = 0; i < 10; i++)
      exec.execute(new WaitingTask(latch));
    for(int i = 0; i < SIZE; i++)
      exec.execute(new TaskPortion(latch));
    System.out.println("Launched all tasks");
    exec.shutdown(); // Выход по завершению всех задач
  }
}

6. Cyclicbarrier

//: concurrency/HorseRace.java
// Using CyclicBarriers.
import java.util.concurrent.*;
import java.util.*;
 
class Horse implements Runnable {
  private static int counter = 0;
  private final int id = counter++;
  private int strides = 0;
  private static Random rand = new Random(47);
  private static CyclicBarrier barrier;
  public Horse(CyclicBarrier b) { barrier = b; }
  public synchronized int getStrides() { return strides; }
  public void run() {
    try {
      while(!Thread.interrupted()) {
        synchronized(this) {
          strides += rand.nextInt(3); // Produces 0, 1 or 2
        }
        barrier.await();
      }
    } catch(InterruptedException e) {
      // Приемлемый вариант выхода
    } catch(BrokenBarrierException e) {
      // Исключение, которое нас интересует
      throw new RuntimeException(e);
    }
  }
  public String toString() { return "Horse " + id + " "; }
  public String tracks() {
    StringBuilder s = new StringBuilder();
    for(int i = 0; i < getStrides(); i++)
      s.append("*");
    s.append(id);
    return s.toString();
  }
}
 
public class HorseRace {
  static final int FINISH_LINE = 25;
  private List<Horse> horses = new ArrayList<Horse>();
  private ExecutorService exec =
    Executors.newCachedThreadPool();
  private CyclicBarrier barrier;
  public HorseRace(int nHorses, final int pause) {
    barrier = new CyclicBarrier(nHorses, new Runnable() {
      public void run() {
        StringBuilder s = new StringBuilder();
        for(int i = 0; i < FINISH_LINE; i++)
          s.append("="); // Забор на беговой дорожке
        System.out.println(s);
        for(Horse horse : horses)
        	System.out.println(horse.tracks());
        for(Horse horse : horses)
          if(horse.getStrides() >= FINISH_LINE) {
        	  System.out.println(horse + "won!");
            exec.shutdownNow();
            return;
          }
        try {
          TimeUnit.MILLISECONDS.sleep(pause);
        } catch(InterruptedException e) {
        	System.out.println("barrier-action sleep interrupted");
        }
      }
    });
    for(int i = 0; i < nHorses; i++) {
      Horse horse = new Horse(barrier);
      horses.add(horse);
      exec.execute(horse);
    }
  }
  public static void main(String[] args) {
    int nHorses = 7;
    int pause = 200;
    if(args.length > 0) { // Необязательный аргумент
      int n = new Integer(args[0]);
      nHorses = n > 0 ? n : nHorses;
    }
    if(args.length > 1) { // Необязательный аргумент
      int p = new Integer(args[1]);
      pause = p > -1 ? p : pause;
    }
    new HorseRace(nHorses, pause);
  }
} /* (Execute to see output) *///:~

7. Lock, Condotion

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class BoundedBuffer {
   final Lock lock = new ReentrantLock();
   final Condition notFull  = lock.newCondition(); 
   final Condition notEmpty = lock.newCondition(); 

   final Object[] items = new Object[100];
   int putptr, takeptr, count;

   public void put(Object x) throws InterruptedException {
     lock.lock();
     try {
       while (count == items.length) 
         notFull.await();
       items[putptr] = x; 
       if (++putptr == items.length) putptr = 0;
       ++count;
       notEmpty.signal();
     } finally {
       lock.unlock();
     }
   }

   public Object take() throws InterruptedException {
     lock.lock();
     try {
       while (count == 0) 
         notEmpty.await();
       Object x = items[takeptr]; 
       if (++takeptr == items.length) takeptr = 0;
       --count;
       notFull.signal();
       return x;
     } finally {
       lock.unlock();
     }
   } 
}

8. Atomic & Speed Test

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;


public class Speed {
	private static final int NUM = 100000000;
	Integer notAtomic = 0;
	int notAtomic2 = 0;
	AtomicInteger atomic = new AtomicInteger(0);
	Lock lock = new ReentrantLock();
	public synchronized Integer incrementNotAtomic() {
	    return notAtomic++;
	}

	public Integer incrementNotAtomic2() {
		synchronized (notAtomic) {
			notAtomic++;
		}
		return notAtomic;
	}
	
	public Integer incrementNotAtomic2a() {
		synchronized (this) {
			notAtomic++;
		}
		return notAtomic;
	}
	
	public Integer incrementNotAtomic3() {
		lock.lock();
		try {
			notAtomic++;
		} finally {
			lock.unlock();
		}
		return notAtomic;
	}
	
	public synchronized Integer incrementNotAtomic4() {
	    return notAtomic2++;
	}

	public Integer incrementNotAtomic5() {
		synchronized (this) {
			notAtomic2++;
		}
		return notAtomic2;
	}
	
	public Integer incrementNotAtomic6() {
		lock.lock();
		try {
			notAtomic2++;
		} finally {
			lock.unlock();
		}
		return notAtomic2;
	}

	public void performTestNotAtomic() {
	    final long start = System.currentTimeMillis();
	    for (int i = 0 ; i < NUM ; i++) {
	        incrementNotAtomic();
	    }
	    System.out.println("Integer Synchronized method: "+(System.currentTimeMillis() - start));
	}

	public void performTestNotAtomic2() {
	    final long start = System.currentTimeMillis();
	    for (int i = 0 ; i < NUM ; i++) {
	        incrementNotAtomic2();
	    }
	    System.out.println("Integer Synchronized Integer: "+(System.currentTimeMillis() - start));
	}

	public void performTestNotAtomic2a() {
	    final long start = System.currentTimeMillis();
	    for (int i = 0 ; i < NUM ; i++) {
	        incrementNotAtomic2a();
	    }
	    System.out.println("Integer Synchronized This: "+(System.currentTimeMillis() - start));
	}

	public void performTestNotAtomic3() {
	    final long start = System.currentTimeMillis();
	    for (int i = 0 ; i < NUM ; i++) {
	        incrementNotAtomic3();
	    }
	    System.out.println("Integer Lock: "+(System.currentTimeMillis() - start));
	}

	public void performTestNotAtomic4() {
	    final long start = System.currentTimeMillis();
	    for (int i = 0 ; i < NUM ; i++) {
	        incrementNotAtomic4();
	    }
	    System.out.println("Int Synchronized method: "+(System.currentTimeMillis() - start));
	}

	public void performTestNotAtomic5() {
	    final long start = System.currentTimeMillis();
	    for (int i = 0 ; i < NUM ; i++) {
	        incrementNotAtomic5();
	    }
	    System.out.println("Int Synchronized This: "+(System.currentTimeMillis() - start));
	}

	public void performTestNotAtomic6() {
	    final long start = System.currentTimeMillis();
	    for (int i = 0 ; i < NUM ; i++) {
	        incrementNotAtomic6();
	    }
	    System.out.println("Int Lock: "+(System.currentTimeMillis() - start));
	}

	public void performTestAtomic() {
	    final long start = System.currentTimeMillis();
	    for (int i = 0 ; i < NUM ; i++) {
	        atomic.getAndIncrement();
	    }
	    System.out.println("Atomic: "+(System.currentTimeMillis() - start));
	}

	public static void main(String[] args) {
		Speed s = new Speed();
		s.performTestAtomic();
		s.performTestNotAtomic();
		s.performTestNotAtomic2();
		s.performTestNotAtomic2a();
		s.performTestNotAtomic3();
		s.performTestNotAtomic4();
		s.performTestNotAtomic5();
		s.performTestNotAtomic6();
	}

}