Java Network
Материал из SEWiki
Содержание
Что почитать
Thinking In Java Enterprise (русский перевод) → Сетевое программирование с Сокетами и Каналами
Использование select для высокоскоростного сетевого взаимодействия
1. WhoAmI
import java.net.InetAddress;
import java.net.UnknownHostException;
public class WhoAmI {
public static void main(String[] args) {
if (args.length != 1) {
System.err.println("Usage: WhoAmI MachineName");
System.exit(1);
}
try {
InetAddress a = InetAddress.getByName(args[0]);
InetAddress b = InetAddress.getByName(null);
InetAddress c = InetAddress.getByName("localhost");
InetAddress d = InetAddress.getByName("127.0.0.1");
System.out.println(a);
System.out.println(b + " " + c + " " + d);
} catch (UnknownHostException e) {
e.printStackTrace();
}
}
}
2. EchoClient
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.Socket;
public class EchoClient {
public static void main(String[] args) throws IOException {
InetAddress addr = InetAddress.getByName(null);
System.out.println("addr = " + addr);
Socket socket = new Socket(addr, EchoServer.PORT);
// Помещаем все в блок try-finally, чтобы
// быть уверенным, что сокет закроется:
try {
System.out.println("socket = " + socket);
BufferedReader in = new BufferedReader(new InputStreamReader(socket
.getInputStream()));
// Вывод автоматически Output быталкивается PrintWriter'ом.
PrintWriter out = new PrintWriter(new BufferedWriter(
new OutputStreamWriter(socket.getOutputStream())), true);
for (int i = 0; i < 10; i++) {
out.println("howdy " + i);
String str = in.readLine();
System.out.println(str);
}
out.println("END");
}
finally {
System.out.println("closing...");
socket.close();
}
}
}
3. EchoServer
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
public class EchoServer {
// Выбираем порт вне пределов 1-1024:
public static final int PORT = 8080;
public static void main(String[] args) throws IOException {
ServerSocket s = new ServerSocket(PORT);
System.out.println("Started: " + s);
try {
// Блокирует до тех пор, пока не возникнет соединение:
Socket socket = s.accept();
try {
System.out.println("Connection accepted: " + socket);
BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
// Вывод автоматически выталкивается из буфера PrintWriter'ом
PrintWriter out = new PrintWriter(new BufferedWriter(
new OutputStreamWriter(socket.getOutputStream())), true);
while (true) {
String str = in.readLine();
if (str.equals("END"))
break;
System.out.println("Echoing: " + str);
out.println(str);
}
// Всегда закрываем два сокета...
}
finally {
System.out.println("closing...");
socket.close();
}
}
finally {
s.close();
}
}
}
4. MultiEchoServer
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
class OneEchoServer extends Thread {
private Socket socket;
private BufferedReader in;
private PrintWriter out;
public OneEchoServer(Socket s) throws IOException {
socket = s;
in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
// Включаем автоматическое выталкивание:
out = new PrintWriter(new BufferedWriter(new OutputStreamWriter(socket.getOutputStream())), true);
// Если любой из вышеприведенных вызовов приведет к
// возникновению исключения, то вызывающий отвечает за
// закрытие сокета. В противном случае, нить
// закроет его.
start(); // вызываем run()
}
public void run() {
try {
while (true) {
String str = in.readLine();
if (str.equals("END"))
break;
System.out.println("Echoing: " + str);
out.println(str);
}
System.out.println("closing...");
}
catch (IOException e) {
System.err.println("IO Exception");
}
finally {
try {
socket.close();
}
catch (IOException e) {
System.err.println("Socket not closed");
}
}
}
}
public class MultiEchoServer {
static final int PORT = 8080;
public static void main(String[] args) throws IOException {
ServerSocket s = new ServerSocket(PORT);
System.out.println("Server Started");
try {
while (true) {
// Блокируется до возникновения нового соединения:
Socket socket = s.accept();
try {
new OneEchoServer(socket);
}
catch (IOException e) {
// Если завершится неудачей, закрывается сокет,
// в противном случае, нить закроет его:
socket.close();
}
}
}
finally {
s.close();
}
}
} // /:~
5. MultiEchoClient
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.Socket;
class EchoClientThread extends Thread {
private Socket socket;
private BufferedReader in;
private PrintWriter out;
private static int counter = 0;
private int id = counter++;
private static int threadcount = 0;
public static int threadCount() {
return threadcount;
}
public EchoClientThread(InetAddress addr) {
System.out.println("Making client " + id);
threadcount++;
try {
socket = new Socket(addr, MultiEchoServer.PORT);
}
catch (IOException e) {
System.err.println("Socket failed");
// Если создание сокета провалилось,
// ничего ненужно чистить.
}
try {
in = new BufferedReader(new InputStreamReader(socket
.getInputStream()));
// Включаем автоматическое выталкивание:
out = new PrintWriter(new BufferedWriter(new OutputStreamWriter(socket.getOutputStream())), true);
start();
}
catch (IOException e) {
// Сокет должен быть закрыт при любой
// ошибке, кроме ошибки конструктора сокета:
try {
socket.close();
}
catch (IOException e2) {
System.err.println("Socket not closed");
}
}
// В противном случае сокет будет закрыт
// в методе run() нити.
}
public void run() {
try {
for (int i = 0; i < 25; i++) {
out.println("Client " + id + ": " + i);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
String str = in.readLine();
System.out.println(str);
}
out.println("END");
}
catch (IOException e) {
System.err.println("IO Exception");
}
finally {
// Всегда закрывает:
try {
socket.close();
}
catch (IOException e) {
System.err.println("Socket not closed");
}
threadcount--; // Завершаем эту нить
}
}
}
public class MultiEchoClient {
static final int MAX_THREADS = 40;
public static void main(String[] args) throws IOException,
InterruptedException {
InetAddress addr = InetAddress.getByName(null);
while (true) {
if (EchoClientThread.threadCount() < MAX_THREADS) {
new EchoClientThread(addr);
}
Thread.sleep(100);
}
}
} // /:~
6. UDPEchoClient
import java.net.*;
import java.io.*;
public class UDPEchoCLient{
private InetAddress addr;
private int port;
private DatagramSocket ds;
UDPEchoCLient(String host, int port) throws UnknownHostException, SocketException{
this.addr = InetAddress.getByName(host);
this.port = port;
this.ds = new DatagramSocket();
}
public void sendMessage(String mes){
try{
byte[] data = mes.getBytes();
DatagramPacket pack = new DatagramPacket(data, data.length, addr, port);
ds.send(pack);
} catch(IOException e){
System.err.println(e);
}
}
public void close() {
ds.close();
}
public static void main(String[] args) {
try {
UDPEchoCLient sndr = new UDPEchoCLient("localhost", 1050);
for (String mes : args)
sndr.sendMessage(mes);
sndr.close();
} catch (UnknownHostException | SocketException e) {
e.printStackTrace();
}
}
}
7. UDPEchoServer
import java.net.DatagramPacket;
import java.net.DatagramSocket;
public class UDPEchoServer {
public static void main(String[] args) {
try {
DatagramSocket ds = new DatagramSocket(1050);
while (true){
DatagramPacket pack = new DatagramPacket(new byte[1024], 1024);
ds.receive(pack);
System.out.println(new String(pack.getData()));
}
} catch (Exception e) {
System.out.println(e);
}
}
}
8. NonBlockingIO
import java.net.*;
import java.nio.channels.*;
import java.util.*;
import java.io.*;
/**
* Цель: Показать как использовать селектор. Нет чтения/записи, просто
* показывается готовность к совершению операции.
*
* Алгоритм: -> Создаем селектор. -> Создаем канал -> Связываем сокет,
* ассоциированный с каналом, с <клиентским портом> -> Конфигурируем канал, как
* не блокирующий -> Регестрируем канал в селекторе. -> Вызываем метод select( ),
* чтобы он блокировал выполнение до тех пор, пока канал не будет готов. (как
* это предполагается методом select(long timeout) -> Получаем множество ключей,
* относящихся к готовому каналу для работы, основной интерес состоит в том,
* когда они зарегестрированя с помощью селектора. -> Перебираем ключи. -> Для
* каждого ключа проверяем, что соответствующий канал готов к работе, в которой
* он заинтересован. -> Если он готов, печатаем сообщение о готовности.
*
* Примечание: -> Необходим запущенный MultiJabberServer на локальной машине. Вы
* запускаете его и соединяетесь с локальным MultiJabberServer -> Он может стать
* причиной исключения в MultiJabberServer, но это исключение ожидаемо.
*/
public class NonBlockingIO {
public static void main(String[] args) throws IOException {
if (args.length < 2) {
System.out.println("Usage: java <client port> <local server port>");
System.exit(1);
}
int cPort = Integer.parseInt(args[0]);
int sPort = Integer.parseInt(args[1]);
SocketChannel ch = SocketChannel.open();
Selector sel = Selector.open();
try {
ch.socket().bind(new InetSocketAddress(cPort));
ch.configureBlocking(false);
// Канал заинтересован в выполнении чтения/записи/соединении
ch.register(sel, SelectionKey.OP_READ | SelectionKey.OP_WRITE
| SelectionKey.OP_CONNECT);
// Разблокируем, когда готовы к чтению/записи/соединению
sel.select();
// Ключи, относящиеся к готовому каналу, канал заинтересован
// в работе, которая может быть выполненаin can be
// без блокирования.
Iterator<?> it = sel.selectedKeys().iterator();
while (it.hasNext()) {
SelectionKey key = (SelectionKey) it.next();
it.remove();
// Если связанный с ключом канал готов к соединению?
// if((key.readyOps() & SelectionKey.OP_CONNECT) != 0) {
if (key.isConnectable()) {
InetAddress ad = InetAddress.getLocalHost();
System.out.println("Connect will not block");
// Вы должны проверить возвращаемое значение,
// чтобы убедиться, что он соединен. Этот не блокированный
// вызов может вернуться без соединения, когда
// нет сервера, к которому вы пробуете подключиться
// Поэтому вы вызываете finishConnect(), который завершает
// операцию соединения.
if (!ch.connect(new InetSocketAddress(ad, sPort)))
ch.finishConnect();
}
// Если канал, связанный с ключом, готов к чтению?
// if((key.readyOps() & SelectionKey.OP_READ) != 0)
if (key.isReadable())
System.out.println("Read will not block");
// Готов ли канал, связанный с ключом, к записи?
// if((key.readyOps() & SelectionKey.OP_WRITE) != 0)
if (key.isWritable())
System.out.println("Write will not block");
}
}
finally {
ch.close();
sel.close();
}
}
} // /:~
9. EchoServerNIO
import java.io.*;
import java.net.*;
import java.nio.*;
import java.nio.channels.*;
import java.nio.charset.*;
import java.util.*;
/**
* Сервер принимает соединения не блокирующим способом. Когда соединение
* установлено, создается сокет, который регистрируется с селектором для
* чтения/записи. Чтение/запись выполняется над этим сокетом, когда селектор
* разблокируется. Эта программа работает точно так же, как и MultiJabberServer.
*/
public class EchoServerNIO {
public static final int PORT = 8080;
public static void main(String[] args) throws IOException {
// Канал будет читать данные в ByteBuffer, посылаемые
// методом PrintWriter.println(). Декодирование этого потока
// байт требует кодовой страницы для кодировки по умолчанию.
Charset cs = Charset.forName(System.getProperty("file.encoding"));
ByteBuffer buffer = ByteBuffer.allocate(16);
SocketChannel ch = null;
ServerSocketChannel ssc = ServerSocketChannel.open();
Selector sel = Selector.open();
try {
ssc.configureBlocking(false);
// Локальныйы адрес, на котором он будет слушать соединения
// Примечание: Socket.getChannel() возвращает null, если с ним не
// ассоциирован канал, как показано ниже.
// т.е выражение (ssc.socket().getChannel() != null) справедливо
ssc.socket().bind(new InetSocketAddress(PORT));
// Канал заинтересован в событиях OP_ACCEPT
ssc.register(sel, SelectionKey.OP_ACCEPT);
System.out.println("Server on port: " + PORT);
while (true) {
sel.select();
Iterator<?> it = sel.selectedKeys().iterator();
while (it.hasNext()) {
SelectionKey skey = (SelectionKey) it.next();
it.remove();
if (skey.isAcceptable()) {
ch = ssc.accept();
System.out.println("Accepted connection from:"
+ ch.socket());
ch.configureBlocking(false);
ch.register(sel, SelectionKey.OP_READ);
}
else {
// Обратите внимание, что не выполняется проверка, если
// в канал можно писать или читать - для упрощения.
ch = (SocketChannel) skey.channel();
ch.read(buffer);
CharBuffer cb = cs.decode((ByteBuffer) buffer.flip());
String response = cb.toString();
System.out.print("Echoing : " + response);
ch.write((ByteBuffer) buffer.rewind());
if (response.indexOf("END") != -1)
ch.close();
buffer.clear();
}
}
}
}
finally {
if (ch != null)
ch.close();
ssc.close();
sel.close();
}
}
} // /:~
10. Worker
import java.io.*;
import java.util.logging.*;
public class Worker extends Thread {
public static final Logger logger = Logger.getLogger("Worker");
private String workerId;
private Runnable task;
// Необходима ссылка на пул нитей в котором существует нить, чтобы
// нить могла добавить себя в пул нитей по завершению работы.
private ThreadPool threadpool;
static {
try {
logger.setUseParentHandlers(false);
FileHandler ferr = new FileHandler("WorkerErr.log");
ferr.setFormatter(new SimpleFormatter());
logger.addHandler(ferr);
}
catch (IOException e) {
System.out.println("Logger not initialized..");
}
}
public Worker(String id, ThreadPool pool) {
workerId = id;
threadpool = pool;
start();
}
// ThreadPool, когда ставит в расписание задачу, использует этот метод
// для делегирования задачи Worker-нити. Кроме того для установки
// задачи (типа Runnable) он также переключает ожидающий метод
// run() на начало выполнения задачи.
public void setTask(Runnable t) {
task = t;
synchronized (this) {
notify();
}
}
public void run() {
try {
while (!threadpool.isStopped()) {
synchronized (this) {
if (task != null) {
try {
task.run(); // Запускаем задачу
}
catch (Exception e) {
logger.log(Level.SEVERE,
"Exception in source Runnable task", e);
}
// Возвращает себя в пул нитей
threadpool.putWorker(this);
}
wait();
}
}
System.out.println(this + " Stopped");
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
public String toString() {
return "Worker : " + workerId;
}
} // /:~
11. ThreadPool
import java.util.*;
public class ThreadPool extends Thread {
private static final int DEFAULT_NUM_WORKERS = 5;
private LinkedList<Worker> workerPool = new LinkedList<Worker>();
private LinkedList<Runnable> taskList = new LinkedList<Runnable>();
private boolean stopped = false;
public ThreadPool() {
this(DEFAULT_NUM_WORKERS);
}
public ThreadPool(int numOfWorkers) {
for (int i = 0; i < numOfWorkers; i++)
workerPool.add(new Worker("" + i, this));
start();
}
public void run() {
try {
while (!stopped) {
if (taskList.isEmpty()) {
synchronized (taskList) {
// Если очередь пустая, подождать, пока будет добавлена
// задача
taskList.wait();
}
}
else if (workerPool.isEmpty()) {
synchronized (workerPool) {
// Если нет рабочих нитей, подождать, пока
// пока не появится
workerPool.wait();
}
}
// Запускаем следующую задачу из расписания задач
getWorker().setTask((Runnable) taskList.removeLast());
}
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
public void addTask(Runnable task) {
taskList.addFirst(task);
synchronized (taskList) {
taskList.notify(); // Если добавлена новая задача, уведомляем
}
}
public void putWorker(Worker worker) {
workerPool.addFirst(worker);
// Здесь может быть случай, когда вы будете иметь пул из 5 нитей,
// а будет требоваться больше. Это происходит тогда, когда требуется
// рабочая нить,
// но ее нет (свободной), тогда просто блокируем пул нитей.
// Это событие, при котором появляется свободная рабочая нить в пуле
// нитей
// Поэтому эта нить посылает уведомление и разблокирует
// нить ThreadPool, ожидающую пул нитей
synchronized (workerPool) {
workerPool.notify();
}
}
private Worker getWorker() {
return (Worker) workerPool.removeLast();
}
public boolean isStopped() {
return stopped;
}
public void stopThreads() {
stopped = true;
Iterator<Worker> it = workerPool.iterator();
while (it.hasNext()) {
Worker w = it.next();
synchronized (w) {
w.notify();
}
}
} // Junit test
public void testThreadPool() {
ThreadPool tp = new ThreadPool();
for (int i = 0; i < 10; i++) {
tp.addTask(new Runnable() {
public void run() {
System.out.println("A");
}
});
}
tp.stopThreads();
}
} // /:~
12. MultiEchoServerPool
import java.io.*;
import java.net.*;
import java.nio.*;
import java.nio.channels.*;
import java.nio.charset.*;
import java.util.*;
class ServeOneEcho implements Runnable {
private SocketChannel channel;
private Selector sel;
public ServeOneEcho(SocketChannel ch) throws IOException {
channel = ch;
sel = Selector.open();
}
public void run() {
ByteBuffer buffer = ByteBuffer.allocate(16);
boolean read = false, done = false;
String response = null;
try {
channel.register(sel, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
while (!done) {
sel.select();
Iterator<?> it = sel.selectedKeys().iterator();
while (it.hasNext()) {
SelectionKey key = (SelectionKey) it.next();
it.remove();
if (key.isReadable() && !read) {
if (channel.read(buffer) > 0)
read = true;
CharBuffer cb = MultiEchoServerPool.CS
.decode((ByteBuffer) buffer.flip());
response = cb.toString();
}
if (key.isWritable() && read) {
System.out.print("Echoing : " + response);
channel.write((ByteBuffer) buffer.rewind());
if (response.indexOf("END") != -1)
done = true;
buffer.clear();
read = false;
}
}
}
}
catch (IOException e) {
// будет поймано Worker.java и залогировано.
// Необходимо выбросить исключение времени выполнения, так как мы не
// можем
// оставить IOException
throw new RuntimeException(e);
}
finally {
try {
channel.close();
}
catch (IOException e) {
System.out.println("Channel not closed.");
// Выбрасываем это, чтобы рабочая нить могла залогировать.
throw new RuntimeException(e);
}
}
}
}
public class MultiEchoServerPool {
public static final int PORT = 8080;
private static String encoding = System.getProperty("file.encoding");
public static final Charset CS = Charset.forName(encoding);
// Создаем пул нитей с 20 рабочими нитями.
private static ThreadPool pool = new ThreadPool(20);
public static void main(String[] args) throws IOException {
ServerSocketChannel ssc = ServerSocketChannel.open();
Selector sel = Selector.open();
try {
ssc.configureBlocking(false);
ssc.socket().bind(new InetSocketAddress(PORT));
ssc.register(sel, SelectionKey.OP_ACCEPT);
System.out.println("Server on port: " + PORT);
while (true) {
sel.select();
Iterator<?> it = sel.selectedKeys().iterator();
while (it.hasNext()) {
SelectionKey skey = (SelectionKey) it.next();
it.remove();
if (skey.isAcceptable()) {
SocketChannel channel = ssc.accept();
System.out.println("Accepted connection from:"
+ channel.socket());
channel.configureBlocking(false);
// Отделяем события и ассоциированное действие
pool.addTask(new ServeOneEcho(channel));
}
}
}
}
finally {
ssc.close();
sel.close();
}
}
} // /:~