`

java分布式开发TCP/IP NIO无阻塞 Socket((基于消息方式实现系统间的通信) )(转)

 
阅读更多

在java中可以基于java.nio.channels中的Channel和Selector的相关类来实现TCP/IP+NIO方式的系统间通信。

 

用于系统间通信依靠SocketChannel和ServerSocketChannel,SocketChannel用于建立连接,监听事件及操作读写,ServerSocketChannel用于监听端口及监听连接事件,可通过Selector来获取是否有要处理的事件。

 

服务端java代码:

Java代码  收藏代码
  1. package com.java.distributed.message.tcpip;  
  2.   
  3. import java.io.IOException;  
  4. import java.net.InetSocketAddress;  
  5. import java.net.ServerSocket;  
  6. import java.nio.ByteBuffer;  
  7. import java.nio.channels.SelectionKey;  
  8. import java.nio.channels.Selector;  
  9. import java.nio.channels.ServerSocketChannel;  
  10. import java.nio.channels.SocketChannel;  
  11. import java.nio.charset.Charset;  
  12.   
  13. public class NIOServer {  
  14.   
  15.     /** 
  16.      * @param args 
  17.      * @throws IOException  
  18.      */  
  19.     public static void main(String[] args) throws IOException {  
  20.         int port =7889;  
  21.         //打开选择器  
  22.         Selector selector=Selector.open();  
  23.         //打开服务器套接字通道  
  24.         ServerSocketChannel ssc=ServerSocketChannel.open();  
  25.         //检索与此通道关联的服务器套接字  
  26.         ServerSocket serverSocket=ssc.socket();  
  27.         //将 ServerSocket 绑定到特定地址(IP 地址和端口号)  
  28.         serverSocket.bind(new InetSocketAddress(port));  
  29.         System.out.println("server listen on port:"+port);  
  30.           
  31.         //调整通道的阻塞模式  
  32.         ssc.configureBlocking(false);  
  33.         //向给定的选择器注册此通道,返回一个选择键。SelectionKey.OP_ACCEPT--用于套接字接受操作的操作集位     
  34.         ssc.register(selector, SelectionKey.OP_ACCEPT);  
  35.           
  36.         while(true){  
  37.             //timeout:为正,则在等待某个通道准备就绪时最多阻塞 timeout 毫秒;如果为零,则无限期地阻塞;必须为非负数  
  38.             int nKeys=selector.select(1000);  
  39.             if(nKeys>0){  
  40.                   
  41.                 for(SelectionKey key:selector.selectedKeys()){  
  42.                     /*测试此键的通道是否已准备好接受新的套接字连接-- 
  43.                      * 如果此键的通道不支持套接字接受操作,则此方法始终返回 false 
  44.                      * */  
  45.                     if(key.isAcceptable()){  
  46.                         ServerSocketChannel server=(ServerSocketChannel) key.channel();  
  47.                         SocketChannel sc=server.accept();  
  48.                           
  49.                         if(sc==null){  
  50.                             continue;  
  51.                         }  
  52.                         sc.configureBlocking(false);  
  53.                         sc.register(selector, SelectionKey.OP_READ);  
  54.                     }else if(key.isReadable()){  
  55.                         //分配一个新的字节缓冲区  
  56.                         ByteBuffer buffer=ByteBuffer.allocate(1024);  
  57.                         SocketChannel sc=(SocketChannel) key.channel();  
  58.                         int readBytes=0;  
  59.                         String message=null;  
  60.                         try{  
  61.                             int ret;  
  62.                             try{  
  63.                                 while((ret=sc.read(buffer))>0){  
  64.                                     readBytes +=ret;  
  65.                                 }  
  66.                                   
  67.                             }catch(Exception e ){  
  68.                                 readBytes=0;  
  69.                                 //ignore  
  70.                             }finally{  
  71.                                 //反转此缓冲区。首先对当前位置设置限制,然后将该位置设置为零  
  72.                                 buffer.flip();  
  73.                             }  
  74.                               
  75.                             if(readBytes>0){  
  76.                                 message=Charset.forName("UTF-8").decode(buffer).toString();  
  77.                                 buffer=null;  
  78.                             }  
  79.                         }finally{  
  80.                             if(buffer!=null)  
  81.                                 buffer.clear();  
  82.                         }  
  83.                           
  84.                         if(readBytes>0){  
  85.                             System.out.println("message from client:"+message);  
  86.                             if("quit".equalsIgnoreCase(message.trim())){  
  87.                                 sc.close();  
  88.                                 selector.close();  
  89.                                 System.out.println("Server has been shutdown!");  
  90.                                 System.exit(0);  
  91.                             }  
  92.                             String outMessage="server response:"+message;  
  93.                             sc.write(Charset.forName("UTF-8").encode(outMessage));  
  94.                         }  
  95.                           
  96.                     }  
  97.                 }  
  98.                 selector.selectedKeys().clear();  
  99.             }  
  100.           
  101.         }  
  102.     }  
  103. }  

 

 

客户端java代码:

Java代码  收藏代码
  1. package com.java.distributed.message.tcpip;  
  2.   
  3. import java.io.BufferedReader;  
  4. import java.io.IOException;  
  5. import java.io.InputStreamReader;  
  6. import java.net.InetSocketAddress;  
  7. import java.net.SocketAddress;  
  8. import java.nio.ByteBuffer;  
  9. import java.nio.channels.SelectionKey;  
  10. import java.nio.channels.Selector;  
  11. import java.nio.channels.SocketChannel;  
  12. import java.nio.charset.Charset;  
  13.   
  14.   
  15. public class NIOClient {  
  16.   
  17.     /** 
  18.      * @param args 
  19.      * @throws IOException  
  20.      */  
  21.     public static void main(String[] args) throws IOException {  
  22.         int port =7889;  
  23.         SocketChannel channel=SocketChannel.open();  
  24.         channel.configureBlocking(false);  
  25.           
  26.         SocketAddress target=new InetSocketAddress("127.0.0.1",port);  
  27.         channel.connect(target);  
  28.         Selector selector=Selector.open();  
  29.         //用于套接字连接操作的操作集位  
  30.         channel.register(selector, SelectionKey.OP_CONNECT);  
  31.         BufferedReader systemIn=new BufferedReader(new InputStreamReader(System.in));  
  32.           
  33.         while(true){  
  34.             if(channel.isConnected()){  
  35.                 String command=systemIn.readLine();  
  36.                 channel.write(Charset.forName("UTF-8").encode(command));  
  37.                   
  38.                 if(command==null||"quit".equalsIgnoreCase(command.trim())){  
  39.                     systemIn.close();  
  40.                     channel.close();  
  41.                     selector.close();  
  42.                     System.out.println("Client quit !");  
  43.                     System.exit(0);  
  44.                 }  
  45.             }  
  46.             int nKeys=selector.select(1000);  
  47.             if(nKeys>0){  
  48.                 for(SelectionKey key:selector.selectedKeys()){  
  49.                     if(key.isConnectable()){  
  50.                         SocketChannel sc=(SocketChannel) key.channel();  
  51.                         sc.configureBlocking(false);  
  52.                         sc.register(selector, SelectionKey.OP_READ);  
  53.                         sc.finishConnect();  
  54.                     }else if(key.isReadable()){  
  55.                         ByteBuffer buffer=ByteBuffer.allocate(1024);  
  56.                         SocketChannel sc=(SocketChannel) key.channel();  
  57.                         int readBytes=0;  
  58.                         try{  
  59.                             int ret=0;  
  60.                             try{  
  61.                                 while((ret=sc.read(buffer))>0){  
  62.                                     readBytes+=ret;  
  63.                                 }  
  64.                             }finally{  
  65.                                 buffer.flip();  
  66.                             }  
  67.                             if (readBytes > 0) {     
  68.                                 System.out.println(Charset.forName("UTF-8")     
  69.                                         .decode(buffer).toString());     
  70.                                 buffer = null;     
  71.                             }     
  72.   
  73.                         }finally {     
  74.                             if (buffer != null) {     
  75.                                 buffer.clear();     
  76.                             }  
  77.                         }  
  78.                     }  
  79.                 }  
  80.                     selector.selectedKeys().clear();     
  81.             }  
  82.         }  
  83.     }  
  84.   
  85. }  
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics