Java Network

Материал из SEWiki
Перейти к: навигация, поиск

Что почитать

Thinking In Java Enterprise (русский перевод) → Сетевое программирование с Сокетами и Каналами

Использование select для высокоскоростного сетевого взаимодействия

1. WhoAmI

import java.net.InetAddress;
import java.net.UnknownHostException;

public class WhoAmI {
	public static void main(String[] args) {
        if (args.length != 1) {
            System.err.println("Usage: WhoAmI MachineName");
            System.exit(1);
        }
		try {
	        InetAddress a = InetAddress.getByName(args[0]);
	        InetAddress b = InetAddress.getByName(null);
	        InetAddress c = InetAddress.getByName("localhost");
	        InetAddress d = InetAddress.getByName("127.0.0.1");	        
		    System.out.println(a);			
		    System.out.println(b + " " + c + " " + d);			
		} catch (UnknownHostException e) {
			e.printStackTrace();
		}
	}

}

2. EchoClient

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.Socket;


public class EchoClient {
	   public static void main(String[] args) throws IOException {
		      InetAddress addr = InetAddress.getByName(null);
		      System.out.println("addr = " + addr);
		      Socket socket = new Socket(addr, EchoServer.PORT);
		      // Помещаем все в блок try-finally, чтобы
		      // быть уверенным, что сокет закроется:
		      try {
		         System.out.println("socket = " + socket);
		         BufferedReader in = new BufferedReader(new InputStreamReader(socket
		               .getInputStream()));
		         // Вывод автоматически Output быталкивается PrintWriter'ом.
		         PrintWriter out = new PrintWriter(new BufferedWriter(
		               new OutputStreamWriter(socket.getOutputStream())), true);
		         for (int i = 0; i < 10; i++) {
		            out.println("howdy " + i);
		            String str = in.readLine();
		            System.out.println(str);
		         }
		         out.println("END");
		      }
		      finally {
		         System.out.println("closing...");
		         socket.close();
		      }
		   }
}

3. EchoServer

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;


public class EchoServer {
	   // Выбираем порт вне пределов 1-1024:
	   public static final int PORT = 8080;
	  
	   public static void main(String[] args) throws IOException {
	      ServerSocket s = new ServerSocket(PORT);
	      System.out.println("Started: " + s);
	      try {
	         // Блокирует до тех пор, пока не возникнет соединение:
	         Socket socket = s.accept();
	         try {
	            System.out.println("Connection accepted: " + socket);
	            BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
	            
	            // Вывод автоматически выталкивается из буфера PrintWriter'ом
	            PrintWriter out = new PrintWriter(new BufferedWriter(
	                  new OutputStreamWriter(socket.getOutputStream())), true);
	            while (true) {
	               String str = in.readLine();
	               if (str.equals("END"))
	                  break;
	               System.out.println("Echoing: " + str);
	               out.println(str);
	            }
	            // Всегда закрываем два сокета...
	         }
	         finally {
	            System.out.println("closing...");
	            socket.close();
	         }
	      }
	      finally {
	         s.close();
	      }
	   }
}

4. MultiEchoServer

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;

class OneEchoServer extends Thread {
   private Socket socket;
   private BufferedReader in;
   private PrintWriter out;
  
   public OneEchoServer(Socket s) throws IOException {
      socket = s;
      in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
      // Включаем автоматическое выталкивание:
      out = new PrintWriter(new BufferedWriter(new OutputStreamWriter(socket.getOutputStream())), true);
      // Если любой из вышеприведенных вызовов приведет к
      // возникновению исключения, то вызывающий отвечает за
      // закрытие сокета. В противном случае, нить
      // закроет его.
      start(); // вызываем run()
   }
  
   public void run() {
      try {
         while (true) {
            String str = in.readLine();
            if (str.equals("END"))
               break;
            System.out.println("Echoing: " + str);
            out.println(str);
         }
         System.out.println("closing...");
      }
      catch (IOException e) {
         System.err.println("IO Exception");
      }
      finally {
         try {
            socket.close();
         }
         catch (IOException e) {
            System.err.println("Socket not closed");
         }
      }
   }
}

public class MultiEchoServer {
   static final int PORT = 8080;
  
   public static void main(String[] args) throws IOException {
      ServerSocket s = new ServerSocket(PORT);
      System.out.println("Server Started");
      try {
         while (true) {
            // Блокируется до возникновения нового соединения:
            Socket socket = s.accept();
            try {
               new OneEchoServer(socket);
            }
            catch (IOException e) {
               // Если завершится неудачей, закрывается сокет,
               // в противном случае, нить закроет его:
               socket.close();
            }
         }
      }
      finally {
         s.close();
      }
   }
} // /:~


5. MultiEchoClient

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.Socket;

class EchoClientThread extends Thread {
   private Socket socket;
   private BufferedReader in;
   private PrintWriter out;
   private static int counter = 0;
   private int id = counter++;
   private static int threadcount = 0;
  
   public static int threadCount() {
      return threadcount;
   }
  
   public EchoClientThread(InetAddress addr) {
      System.out.println("Making client " + id);
      threadcount++;
      try {
         socket = new Socket(addr, MultiEchoServer.PORT);
      }
      catch (IOException e) {
         System.err.println("Socket failed");
         // Если создание сокета провалилось,
         // ничего ненужно чистить.
      }
      try {
         in = new BufferedReader(new InputStreamReader(socket
               .getInputStream()));
         // Включаем автоматическое выталкивание:
         out = new PrintWriter(new BufferedWriter(new OutputStreamWriter(socket.getOutputStream())), true);
         start();
      }
      catch (IOException e) {
         // Сокет должен быть закрыт при любой
         // ошибке, кроме ошибки конструктора сокета:
         try {
            socket.close();
         }
         catch (IOException e2) {
            System.err.println("Socket not closed");
         }
      }
      // В противном случае сокет будет закрыт
      // в методе run() нити.
   }
  
   public void run() {
      try {
         for (int i = 0; i < 25; i++) {
            out.println("Client " + id + ": " + i);
            try {
				Thread.sleep(100);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
            String str = in.readLine();
            System.out.println(str);
         }
         out.println("END");
      }
      catch (IOException e) {
         System.err.println("IO Exception");
      }
      finally {
         // Всегда закрывает:
         try {
            socket.close();
         }
         catch (IOException e) {
            System.err.println("Socket not closed");
         }
         threadcount--; // Завершаем эту нить
      }
   }
}

public class MultiEchoClient {
   static final int MAX_THREADS = 40;
  
   public static void main(String[] args) throws IOException,
         InterruptedException {
      InetAddress addr = InetAddress.getByName(null);
      while (true) {
         if (EchoClientThread.threadCount() < MAX_THREADS) {
            new EchoClientThread(addr);
         }
         Thread.sleep(100);
      }
   }
} // /:~


6. UDPEchoClient

import java.net.*; 

import java.io.*;

public class UDPEchoCLient{
	private InetAddress addr;
	private int port;
	private DatagramSocket ds;
	
	UDPEchoCLient(String host, int port) throws UnknownHostException, SocketException{ 
		this.addr = InetAddress.getByName(host);
		this.port = port; 
		this.ds = new DatagramSocket(); 
	}

	public void sendMessage(String mes){ 
		try{
			byte[] data = mes.getBytes();
			DatagramPacket pack = new DatagramPacket(data, data.length, addr, port); 
			ds.send(pack); 
		} catch(IOException e){
			System.err.println(e); 

		} 
	}
	
	public void close() {
		ds.close();
	}

	public static void main(String[] args) {
		try {
			UDPEchoCLient sndr = new UDPEchoCLient("localhost", 1050);
			for (String mes : args)
				sndr.sendMessage(mes);
			sndr.close();
		} catch (UnknownHostException | SocketException e) {
			e.printStackTrace();
		} 
	} 
}

7. UDPEchoServer

import java.net.DatagramPacket;
import java.net.DatagramSocket;


public class UDPEchoServer {
	public static void main(String[] args) {
		try {
			DatagramSocket ds = new DatagramSocket(1050); 

			while (true){
				DatagramPacket pack = new DatagramPacket(new byte[1024], 1024);
				ds.receive(pack);
				System.out.println(new String(pack.getData())); 
			} 
		} catch (Exception e) {
			System.out.println(e); 
		}  
	}
}

8. NonBlockingIO

import java.net.*;
import java.nio.channels.*;
import java.util.*;
import java.io.*;

/**
* Цель: Показать как использовать селектор. Нет чтения/записи, просто
* показывается готовность к совершению операции.
*
* Алгоритм: -> Создаем селектор. -> Создаем канал -> Связываем сокет,
* ассоциированный с каналом, с <клиентским портом> -> Конфигурируем канал, как
* не блокирующий -> Регестрируем канал в селекторе. -> Вызываем метод select( ),
* чтобы он блокировал выполнение до тех пор, пока канал не будет готов. (как
* это предполагается методом select(long timeout) -> Получаем множество ключей,
* относящихся к готовому каналу для работы, основной интерес состоит в том,
* когда они зарегестрированя с помощью селектора. -> Перебираем ключи. -> Для
* каждого ключа проверяем, что соответствующий канал готов к работе, в которой
* он заинтересован. -> Если он готов, печатаем сообщение о готовности.
*
* Примечание: -> Необходим запущенный MultiJabberServer на локальной машине. Вы
* запускаете его и соединяетесь с локальным MultiJabberServer -> Он может стать
* причиной исключения в MultiJabberServer, но это исключение ожидаемо.
*/
public class NonBlockingIO {
   public static void main(String[] args) throws IOException {
      if (args.length < 2) {
         System.out.println("Usage: java <client port> <local server port>");
         System.exit(1);
      }
      int cPort = Integer.parseInt(args[0]);
      int sPort = Integer.parseInt(args[1]);
      SocketChannel ch = SocketChannel.open();
      Selector sel = Selector.open();
      try {
         ch.socket().bind(new InetSocketAddress(cPort));
         ch.configureBlocking(false);
         // Канал заинтересован в выполнении чтения/записи/соединении
         ch.register(sel, SelectionKey.OP_READ | SelectionKey.OP_WRITE
               | SelectionKey.OP_CONNECT);
         // Разблокируем, когда готовы к чтению/записи/соединению
         sel.select();
         // Ключи, относящиеся к готовому каналу, канал заинтересован
         // в работе, которая может быть выполненаin can be
         // без блокирования.
         Iterator<?> it = sel.selectedKeys().iterator();
         while (it.hasNext()) {
            SelectionKey key = (SelectionKey) it.next();
            it.remove();
            // Если связанный с ключом канал готов к соединению?
            // if((key.readyOps() & SelectionKey.OP_CONNECT) != 0) {
            if (key.isConnectable()) {
               InetAddress ad = InetAddress.getLocalHost();
               System.out.println("Connect will not block");
               // Вы должны проверить возвращаемое значение,
               // чтобы убедиться, что он соединен. Этот не блокированный
               // вызов может вернуться без соединения, когда
               // нет сервера, к которому вы пробуете подключиться
               // Поэтому вы вызываете finishConnect(), который завершает
               // операцию соединения.
               if (!ch.connect(new InetSocketAddress(ad, sPort)))
                  ch.finishConnect();
            }
            // Если канал, связанный с ключом, готов к чтению?
            // if((key.readyOps() & SelectionKey.OP_READ) != 0)
            if (key.isReadable())
               System.out.println("Read will not block");
            // Готов ли канал, связанный с ключом, к записи?
            // if((key.readyOps() & SelectionKey.OP_WRITE) != 0)
            if (key.isWritable())
               System.out.println("Write will not block");
         }
      }
      finally {
         ch.close();
         sel.close();
      }
   }
} // /:~

9. EchoServerNIO

import java.io.*;
import java.net.*;
import java.nio.*;
import java.nio.channels.*;
import java.nio.charset.*;
import java.util.*;

/**
* Сервер принимает соединения не блокирующим способом. Когда соединение
* установлено, создается сокет, который регистрируется с селектором для
* чтения/записи. Чтение/запись выполняется над этим сокетом, когда селектор
* разблокируется. Эта программа работает точно так же, как и MultiJabberServer.
*/
public class EchoServerNIO {
   public static final int PORT = 8080;
  
   public static void main(String[] args) throws IOException {
      // Канал будет читать данные в ByteBuffer, посылаемые
      // методом PrintWriter.println(). Декодирование этого потока
      // байт требует кодовой страницы для кодировки по умолчанию.
      Charset cs = Charset.forName(System.getProperty("file.encoding"));
      ByteBuffer buffer = ByteBuffer.allocate(16);
      SocketChannel ch = null;
      ServerSocketChannel ssc = ServerSocketChannel.open();
      Selector sel = Selector.open();
      try {
         ssc.configureBlocking(false);
         // Локальныйы адрес, на котором он будет слушать соединения
         // Примечание: Socket.getChannel() возвращает null, если с ним не
         // ассоциирован канал, как показано ниже.
         // т.е выражение (ssc.socket().getChannel() != null) справедливо
         ssc.socket().bind(new InetSocketAddress(PORT));
         // Канал заинтересован в событиях OP_ACCEPT
         ssc.register(sel, SelectionKey.OP_ACCEPT);
         System.out.println("Server on port: " + PORT);
         while (true) {
            sel.select();
            Iterator<?> it = sel.selectedKeys().iterator();
            while (it.hasNext()) {
               SelectionKey skey = (SelectionKey) it.next();
               it.remove();
               if (skey.isAcceptable()) {
                  ch = ssc.accept();
                  System.out.println("Accepted connection from:"
                        + ch.socket());
                  ch.configureBlocking(false);
                  ch.register(sel, SelectionKey.OP_READ);
               }
               else {
                  // Обратите внимание, что не выполняется проверка, если
                  // в канал можно писать или читать - для упрощения.
                  ch = (SocketChannel) skey.channel();
                  ch.read(buffer);
                  CharBuffer cb = cs.decode((ByteBuffer) buffer.flip());
                  String response = cb.toString();
                  System.out.print("Echoing : " + response);
                  ch.write((ByteBuffer) buffer.rewind());
                  if (response.indexOf("END") != -1)
                     ch.close();
                  buffer.clear();
               }
            }
         }
      }
      finally {
         if (ch != null)
            ch.close();
         ssc.close();
         sel.close();
      }
   }
} // /:~

10. Worker

import java.io.*;
import java.util.logging.*;

public class Worker extends Thread {
   public static final Logger logger = Logger.getLogger("Worker");
   private String workerId;
   private Runnable task;
   // Необходима ссылка на пул нитей в котором существует нить, чтобы
   // нить могла добавить себя в пул нитей по завершению работы.
   private ThreadPool threadpool;
   static {
      try {
         logger.setUseParentHandlers(false);
         FileHandler ferr = new FileHandler("WorkerErr.log");
         ferr.setFormatter(new SimpleFormatter());
         logger.addHandler(ferr);
      }
      catch (IOException e) {
         System.out.println("Logger not initialized..");
      }
   }
  
   public Worker(String id, ThreadPool pool) {
      workerId = id;
      threadpool = pool;
      start();
   }
  
   // ThreadPool, когда ставит в расписание задачу, использует этот метод
   // для делегирования задачи Worker-нити. Кроме того для установки
   // задачи (типа Runnable) он также переключает ожидающий метод
   // run() на начало выполнения задачи.
   public void setTask(Runnable t) {
      task = t;
      synchronized (this) {
         notify();
      }
   }
  
   public void run() {
      try {
         while (!threadpool.isStopped()) {
            synchronized (this) {
               if (task != null) {
                  try {
                     task.run(); // Запускаем задачу
                  }
                  catch (Exception e) {
                     logger.log(Level.SEVERE,
                           "Exception in source Runnable task", e);
                  }
                  // Возвращает себя в пул нитей
                  threadpool.putWorker(this);
               }
               wait();
            }
         }
         System.out.println(this + " Stopped");
      }
      catch (InterruptedException e) {
         throw new RuntimeException(e);
      }
   }
  
   public String toString() {
      return "Worker : " + workerId;
   }
} // /:~

11. ThreadPool

import java.util.*;

public class ThreadPool extends Thread {
   private static final int DEFAULT_NUM_WORKERS = 5;
   private LinkedList<Worker> workerPool = new LinkedList<Worker>();
   private LinkedList<Runnable> taskList = new LinkedList<Runnable>();
   private boolean stopped = false;
  
   public ThreadPool() {
      this(DEFAULT_NUM_WORKERS);
   }
  
   public ThreadPool(int numOfWorkers) {
      for (int i = 0; i < numOfWorkers; i++)
         workerPool.add(new Worker("" + i, this));
      start();
   }
  
   public void run() {
      try {
         while (!stopped) {
            if (taskList.isEmpty()) {
               synchronized (taskList) {
                  // Если очередь пустая, подождать, пока будет добавлена
                  // задача
                  taskList.wait();
               }
            }
            else if (workerPool.isEmpty()) {
               synchronized (workerPool) {
                  // Если нет рабочих нитей, подождать, пока
                  // пока не появится
                  workerPool.wait();
               }
            }
            // Запускаем следующую задачу из расписания задач
            getWorker().setTask((Runnable) taskList.removeLast());
         }
      }
      catch (InterruptedException e) {
         throw new RuntimeException(e);
      }
   }
  
   public void addTask(Runnable task) {
      taskList.addFirst(task);
      synchronized (taskList) {
         taskList.notify(); // Если добавлена новая задача, уведомляем
      }
   }
  
   public void putWorker(Worker worker) {
      workerPool.addFirst(worker);
      // Здесь может быть случай, когда вы будете иметь пул из 5 нитей,
      // а будет требоваться больше. Это происходит тогда, когда требуется
      // рабочая нить,
      // но ее нет (свободной), тогда просто блокируем пул нитей.
      // Это событие, при котором появляется свободная рабочая нить в пуле
      // нитей
      // Поэтому эта нить посылает уведомление и разблокирует
      // нить ThreadPool, ожидающую пул нитей
      synchronized (workerPool) {
         workerPool.notify();
      }
   }
  
   private Worker getWorker() {
      return (Worker) workerPool.removeLast();
   }
  
   public boolean isStopped() {
      return stopped;
   }
  
   public void stopThreads() {
      stopped = true;
      Iterator<Worker> it = workerPool.iterator();
      while (it.hasNext()) {
         Worker w = it.next();
         synchronized (w) {
            w.notify();
         }
      }
   } // Junit test
  
   public void testThreadPool() {
      ThreadPool tp = new ThreadPool();
      for (int i = 0; i < 10; i++) {
         tp.addTask(new Runnable() {
            public void run() {
               System.out.println("A");
            }
         });
      }
      tp.stopThreads();
   }
} // /:~

12. MultiEchoServerPool

import java.io.*;

import java.net.*;

import java.nio.*;

import java.nio.channels.*;

import java.nio.charset.*;

import java.util.*;

class ServeOneEcho implements Runnable {
   private SocketChannel channel;
   private Selector sel;
  
   public ServeOneEcho(SocketChannel ch) throws IOException {
      channel = ch;
      sel = Selector.open();
   }
  
   public void run() {
      ByteBuffer buffer = ByteBuffer.allocate(16);
      boolean read = false, done = false;
      String response = null;
      try {
         channel.register(sel, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
         while (!done) {
            sel.select();
            Iterator<?> it = sel.selectedKeys().iterator();
            while (it.hasNext()) {
               SelectionKey key = (SelectionKey) it.next();
               it.remove();
               if (key.isReadable() && !read) {
                  if (channel.read(buffer) > 0)
                     read = true;
                  CharBuffer cb = MultiEchoServerPool.CS
                        .decode((ByteBuffer) buffer.flip());
                  response = cb.toString();
               }
               if (key.isWritable() && read) {
                  System.out.print("Echoing : " + response);
                  channel.write((ByteBuffer) buffer.rewind());
                  if (response.indexOf("END") != -1)
                     done = true;
                  buffer.clear();
                  read = false;
               }
            }
         }
      }
      catch (IOException e) {
         // будет поймано Worker.java и залогировано.
         // Необходимо выбросить исключение времени выполнения, так как мы не
         // можем
         // оставить IOException
         throw new RuntimeException(e);
      }
      finally {
         try {
            channel.close();
         }
         catch (IOException e) {
            System.out.println("Channel not closed.");
            // Выбрасываем это, чтобы рабочая нить могла залогировать.
            throw new RuntimeException(e);
         }
      }
   }
}

public class MultiEchoServerPool {
   public static final int PORT = 8080;
   private static String encoding = System.getProperty("file.encoding");
   public static final Charset CS = Charset.forName(encoding);
   // Создаем пул нитей с 20 рабочими нитями.
   private static ThreadPool pool = new ThreadPool(20);
  
   public static void main(String[] args) throws IOException {
      ServerSocketChannel ssc = ServerSocketChannel.open();
      Selector sel = Selector.open();
      try {
         ssc.configureBlocking(false);
         ssc.socket().bind(new InetSocketAddress(PORT));
         ssc.register(sel, SelectionKey.OP_ACCEPT);
         System.out.println("Server on port: " + PORT);
         while (true) {
            sel.select();
            Iterator<?> it = sel.selectedKeys().iterator();
            while (it.hasNext()) {
               SelectionKey skey = (SelectionKey) it.next();
               it.remove();
               if (skey.isAcceptable()) {
                  SocketChannel channel = ssc.accept();
                  System.out.println("Accepted connection from:"
                        + channel.socket());
                  channel.configureBlocking(false);
                  // Отделяем события и ассоциированное действие
                  pool.addTask(new ServeOneEcho(channel));
               }
            }
         }
      }
      finally {
         ssc.close();
         sel.close();
      }
   }
} // /:~