lecture4非阻塞通信.ppt

上传人:小飞机 文档编号:6510881 上传时间:2023-11-07 格式:PPT 页数:91 大小:344.49KB
返回 下载 相关 举报
lecture4非阻塞通信.ppt_第1页
第1页 / 共91页
lecture4非阻塞通信.ppt_第2页
第2页 / 共91页
lecture4非阻塞通信.ppt_第3页
第3页 / 共91页
lecture4非阻塞通信.ppt_第4页
第4页 / 共91页
lecture4非阻塞通信.ppt_第5页
第5页 / 共91页
点击查看更多>>
资源描述

《lecture4非阻塞通信.ppt》由会员分享,可在线阅读,更多相关《lecture4非阻塞通信.ppt(91页珍藏版)》请在三一办公上搜索。

1、第四章 非阻塞通信,主要内容,线程阻塞的概念Java.nio包中类的介绍:ServerSocketChannelSocketChannelSelector/SelectionKey/ByteBuffer等阻塞模式与非阻塞模式的实现,4.1线程阻塞的概念,处于阻塞状态的线程共同特征:1、放弃CPU,暂停运行,只有等待导致阻塞的原因消除,才能恢复运行;2、或者被其他线程中断,该线程会退出阻塞状态,并且抛出InterruptedException,4.1线程阻塞的原因,1、线程执行Thread.sleep(int n);2、线程要执行一段同步代码3、线程执行了一个对象的wait()方法,只有其他线程

2、执行了该对象的notify()或notifyAll()方法,才能将其唤醒4、线程执行I/O操作,如ReadLine之类的方法。,客户程序线程进入阻塞状态的情况,1、客户程序与服务器建立连接时,会进入阻塞状态,直到连接成功,线程才返回2、线程从Socket的输入流读入数据时,如果没有足够的数据,就进入阻塞状态,直到读到足够数据,或到达输入流末尾,或者异常,才会返回。输入流的不同读方法:int read():输入一个字节就足够int read(byte buff):输入流字节数和数组长度相同String readLine():输入流中有一行字符串就足够,需要BufferReader的此方法。,客户

3、程序线程进入阻塞状态的情况,3、线程从Socket的输出流写数据时,可能进入阻塞状态,等待输出了所有数据或者出现异常,才从输出流的write()方法返回或者异常中断4、调用Socket的close()方法时如果设置了关闭Socket的延迟时间,会进入阻塞,直到底层 Socket发完所有数据,或者超过setSoLinger()方法设置的延迟时间,才从close()方法返回。,服务器程线程进入阻塞状态的情况,1、线程执行ServerSocket的accept()方法2、线程从Socket的输入流读入数据,同客户端3、线程从Socket的输出流写出数据,同客户端综上,通过Socket的输入输出流来读

4、写数据时,都可能进入阻塞状态。这种可能出现阻塞的输入输出操作称为阻塞I/O与此对照,如果这种输入输出流读写操作不发生阻塞,则称为非阻塞I/O,4.1.2服务器多线程处理阻塞通信局限,创建ServerSocket,接收连接accept,委派任务给工作线程,等待任务,接收请求数据,发送响应数据,关闭连接,通信结束?,否,主线程,工作线程,红色标志步骤为可能阻塞,一旦阻塞,需要转让线程CPU使用权,4.1.2服务器多线程处理阻塞通信局限,局限:每个线程分配独立的堆栈空间,线程调度、同步、死锁可能很多时间浪费在阻塞IO操作上,线程切换频繁,非阻塞通信的基本思想,一心多用轮询while 处理流程Whil

5、e(一直等待,直到有接收连接就绪事件、读就绪事件、写就绪事件发生)/阻塞If(有客户连接)接收客户连接;/非阻塞If(Socket输入流有可读数据)从输入流中读数据;/非阻塞If(Socket输出流可写数据)向输出流中写数据;/非阻塞,Java.nio包中的主要类,以下支持阻塞和非阻塞通信的类ServerSocketChannel:ServerSocket的替代类,支持阻塞与非阻塞通信SocketChannel:Socket的替代类,同上Selector:为ServerSocketChannel监控接收连接就绪事件,为SocketChannel监控连接就绪、读就绪、写就绪事件。,Java.ni

6、o包中的主要类,SelectionKey:代表ServerSocketChannel 和SocketChannel向Selector注册事件的句柄,当一个SelectionKey对象位于Selector对象的selected-keys集合中时,就表示与这个SelectionKey对象相关的事件发生了。,Java.nio包中的主要类,如何注册事件的呢?ServerSocketChannel向Selector注册接收连接就绪事件的代码如下:注册事件过程SelectionKey key=serverSocketChannel.register(selector,SelectionKey.OP_ACC

7、EPT);,Java.nio包中的主要类,interfaceChannel,interfaceByteChannel,SelectableChannel,ServerSocketChannel,SocketChannel,ServerSocketChannel和SocketChannel都是SelectableChannel的子类,SelectableChannel及其子类都能委托Selector来监控它们可能发生的一些事件,这个委托过程也称为注册事件过程,SelectionKey类静态常量事件类型,SelectionKey的一些静态常量表示事件类型ServerSocketChannel只可能

8、发生一种事件:SelectionKey.OP_ACCEPT:接收就绪事件,表示至少有一个客户连接,服务器可以接收连接。,SelectionKey类静态常量事件类型,SocketChannel可能发生三种事件:SelectionKey.OP_CONNECT:连接就绪事件,表示客户与服务器的连接已经成功建立了。SelectionKey.OP_READ:读就绪事件,输入流中已有可读数据,可以执行读操作了SelectionKey.OP_WRITE:写就绪事件,可以向输出流写数据了。,SocketChannel的接收和发送数据方法,read(ByteBuffer buffer):接收数据,并将数据存到参

9、数指定的ByteBuffer中。write(ByteBuffer buffer):将参数指定的buffer中的数据发送出去。ByteBuffer表示字节缓冲区,两个方法都会操作它。继承于Buffer类。是特定的基本类型元素的线性、有限序列 ByteBuffer中存放的是字节,为转换为字符串,还需要用到Charset类,表示字符编码,提供了从字节流转换为字符串和字符串转换为字节流的方法。,缓冲区Buffer,数据输入输出往往是比较耗时的操作。缓冲区可以从两个方面提高IO操作的效率减少实际的物理读写次数。缓冲区在创建时被分配内存,这块内存一直被重用,减少动态分配和回收的次数。缓冲区属性:容量(ca

10、pacity)、限制(limit)、位置(position).对应的方法:clear()、flip()、rewind(),缓冲区Buffer,容量(capacity):表示缓冲区可以保存多少数据限制(limit):表示缓冲区的当前终点,不能对缓冲区中超过极限的区域进行读写操作,是不应读取或写入的第一个元素的索引 位置(position):是下一个要读取或写入的元素的索引,改变属性,clear()方法:清除此缓冲区。将位置设置为零,限制设置为该容量,并且丢弃标记。在使用序列信道读取或 put 操作填充此缓冲区之前调用此方法。例如:buf.clear();/Prepare buffer for r

11、eading in.read(buf);/Read data 此方法不能实际擦除缓冲区中的数据,但从名称来看似乎能够擦除,因为它多数情况下确实是在擦除数据时使用的,改变属性,filp()方法:反转此缓冲区。首先对当前位置设置限制,然后将该位置设置为零。如果已定义了标记,则丢弃该标记。在序列信道读取或 put 操作之后,调用此方法以做好序列信道写入或相对 get 操作。例如:buf.put(magic);/Prepend header in.read(buf);/Read data into rest of buffer buf.flip();/Flip buffer out.write(buf

12、);/Write header+data to channel,改变属性,rewind()方法:使缓冲区做好了重新读取已包含的数据的准备:它使限制保持不变,并将位置设置为零。重绕此缓冲区。将位置设置为零并丢弃标记。在序列信道写入或 get 操作之前调用此方法(假定已经适当设置了限制)。例如:out.write(buf);/Write remaining data buf.rewind();/Rewind buffer buf.get(array);/Copy data into array,Bufferdemo,importjava.io.FileInputStream;importjava.

13、io.FileOutputStream;importjava.nio.ByteBuffer;importjava.nio.channels.FileChannel;,Bufferdemo,publicclassBufferDemo publicstaticvoidmain(Stringargs)throwsException./分配一个非直接缓冲区 ByteBufferbb=ByteBuffer.allocate(100);/向缓冲区写入0到100的字节制 for(inti=0;i100;i+).byteb=(byte)(Math.random()*100);bb.put(b);(写入文件前的

14、缓冲区数据);bb.flip();while(bb.hasRemaining()System.out.print(bb.get()+);(“读取缓冲区数据完毕”);,Bufferdemo,/获取一个关联到文件buffer.txt的信道 FileChannelfc=newFileOutputStream(buffer.txt).getChannel();/将缓冲区数据写到文件中 bb.flip();fc.write(bb);/防止缓存 fc.force(true);/关闭信道 fc.close();bb=null;fc=null;,Bufferdemo,/下面从文件中读取数据 fc=newFil

15、eInputStream(buffer.txt).getChannel();ByteBufferbb2=ByteBuffer.allocate(int)fc.size();fc.read(bb2);(从文件读取的缓冲区数据);bb2.flip();while(bb2.hasRemaining()System.out.print(bb2.get()+);();fc.close();bb2=null;fc=null;,缓冲区Buffer的创建方法,Buffer类是一个抽象类,共有8个具体缓冲类,其中最基本的缓冲区是ByteBuffer,它存放的数据单元是字节。ByteBuffer类并没有提供公开的

16、构造方法,但是提供了两个获得ByteBuffer实例的静态工厂方法allocate(int capacity):返回一个容量大小为capacity的缓冲区directAllocate(int capacity):返回一个容量大小为capacity的直接缓冲区,开销相对大,只有当缓冲区较大并长期存在,或经常需要重用,缓冲区Buffer的读写方法,get():相对读。从缓冲区的当前位置读取一个单元的数据,读完后位置加一;get(int index).put():相对写。向缓冲区的当前位置写入一个单元的数据,然后位置加一;put(int index).还有一个compact()方法:将已读或写的数据

17、删除,然后将剩下的数据放到0至limit-position区域,缓冲区Buffer的创建方法,Buffer类是一个抽象类,共有8个具体缓冲类,其中最基本的缓冲区是ByteBuffer,它存放的数据单元是字节。提示:计算机中信息的存储单位(1)位(Bit):表示一个二进制数码0或1,是计算机存储处理信息的最基本的单位。(2)字节(Byte):一个字节由8个位组成。它表示作为一个完整处理单位的8个二进制数码。现目前计算机上多使用美国国家信息交换标准代码ASCII编码(由美国国家标准委员会制定)如:字符“A”的二进制编码是“0100 0001”即41H或65D“#”的二进制编码是“0010 0011

18、”即23H或35D(3)字(Word):16个位为一个字,它代表计算机处理指令或数据的二进制数位数,是计算机进行数据存储和数据处理的运算单位。通常称16位是一个字,32位是一个双字,64位是两个双字。,字符编码Charset,ByteBuffer encode(String str)ByteBuffer encode(CharBuffer cb)CharBuffer decode(ByteBuffer bb)创建方法:Charset cs=Charset.forName(“GBK”);,通道,通道用来连接缓冲区与数据源或者数据汇,Java.nio包中Channel的主要层次结构,interfa

19、ceChannel,interfaceByteChannel,SelectableChannel,ServerSocketChannel,SocketChannel,interfaceReadableByteChannel,interfaceWritableByteChannel,interfaceScatteringByteChannel,interfaceGatheringByteChannel,interfaceFileChannel,java.nio.channels.Channel只声明了两个方法1、close()2、isOpen().通道在创建时被打开,一旦关闭,就不能再打开了,R

20、ead(ByteBuffer bbs):把从数据源中读取的数据依次填充到bbs中,Read(ByteBuffer bb),通道Channel,通道用来连接缓冲区与数据源或者数据汇,ReadableByteChannel,ByteBuffer,数据汇,WritableByteChannel,数据源,SelectableChannel类,是一种支持阻塞和非阻塞I/O的通道。在非阻塞模式下,读写数据不会阻塞,并且它可以向Selector注册读就绪和写就绪事件,等到事件发生时,就可以进行相关操作了。主要方法:SelectableChannel configureBlocking(boolean blo

21、ck):默认阻塞模式public SelectionKey register(Selector sel,int ops)public SelectionKey register(Selector sel,int ops,Object attachment):附件对象为SelectionKey关联,当注册事件发生且需处理时,可从SelectionKey中获得这个附件,该附件可用来包含与处理该事件相关的信息:如下:,SelectableChannel类,案例一、ByteBuffer buffer=ByteBuffer.allocate(1024);socketChannel.register(se

22、lector,SelectionKey.OP_READ|SelectionKey.OP_WRITE,buffer);/以下取得附件对象,请参看如下二案例二、Target target=new Target();target.channel.register(selector,SelectionKey.OP_CONNECT,target);/以下取得附件对象Target target=(Target)selectionKey.attachment();,ServerSocketChannel类,继承SelectableChannel类的以下方法:SelectableChannel configu

23、reBlocking(boolean block)register(Selector sel,int ops,Object attachment)每个该对象都与一个ServerSocket对象关联,并可通过以下方式把服务器进程绑定到一个本地端口:ServerSocketChannel.socket().bind(port)是ServerSocket类的替代类,其他主要方法:,ServerSocketChannel类,3、public static ServerSocketChannel open()静态工厂方法:返回ServerSocketChannel 对象,这个对象没有与任何本地端口进行绑

24、定,并处于阻塞工资模式。如果希望改为非阻塞模式,则调用configureBlocking(false),ServerSocketChannel类,4、public SocketChannel accept():类似ServerSocket的accept()方法,用于接收客户连接。如果ServerSocketChannel处于非阻塞模式,当没有客户连接时,立即返回null。否则会一直等,直到有客户连接才返回。该方法返回的SocketChannel 对象是阻塞模式对象,如果需要改为非阻塞模式,必须执行:socketChannel.configureBlocking(false);,ServerSo

25、cketChannel类,5、public int validOps()返回ServerSocketChannel所能产生的事件6、public ServerSocket socket()返回与ServerSocketChannel关联的ServerSocket对象,每个ServerSocketChannel对象都与一个ServerSocket对象关联。,SocketChannel类,继承SelectableChannel类的以下方法:public SelectableChannel configureBlocking(boolean block)register(Selector sel,i

26、nt ops)public SelectionKey register(Selector sel,int ops,Object attachment),SocketChannel类,是Socket类的替代类,其连接和创建方法如下:public boolean connect(SocketAddress remote)public static SocketChannel open()public static SocketChannel open(SocketAddress remote)SocketChannel没有public类型的构造方法,必须通过它的静态方法open()来创建Socke

27、tChannel对象open也可以没有参数。有参数的会建立与远程服务器的连接。返回的对象处于阻塞模式,如果需要非阻塞,则需要调用该对象的configureBlocking(false)。,SocketChannel类,public final int validOps():返回SocketChannel所能产生的事件,3个。public Socket socket():返回与SocketChannel关联的Socket对象,每个SocketChannel对象都与一个socket对象关联。public boolean isConnected():判断底层Socket是否已建立了远程连接。publ

28、ic boolean isConnectionPending():判断是否正在进行连接。,SocketChannel类,public boolean connect(SocketAddress remote):使底层socket建立远程连接。当SocketChannel处于非阻塞模式时,如果立即连接成功,则返回true,否则返回false,之后必须使用finishConnect()方法来完成连接。public boolean finishConnect():试图完成连接远程服务器的操作。在非阻塞模式下,建立连接从connect()方法开始,到调用finishConnect()方法结束。而在阻塞

29、模式下,如果连接没有完成,则会进入阻塞状态,直到连接完成或者异常。,SocketChannel类,实现了ByteChannel接口,所以实现以下方法:read(ByteBuffer dst)write(ByteBuffer src),SocketChannel类read/write方法,4、int read(ByteBuffer dst):从Channel中读入若干字节,把它们存放到参数指定的ByteBuffer中。,capacity,limit,position,r,socketChannel.read(ByteBuffer dst),capacity,limit,position,r,n,

30、Read()方法读入n个字节,SocketChannel类read/write方法,4、int read(ByteBuffer dst):在阻塞模式下,read()方法会争取读到r个字节,如果输入流中不足r个字节,就进入阻塞状态,直到读到r个字节,或者输入流末尾,或者异常,capacity,limit,position,r,socketChannel.read(ByteBuffer dst),capacity,limit,position,r,n,SocketChannel类read/write方法,4、int read(ByteBuffer dst):在非阻塞模式下,read()方法会有多少

31、读多少,总会立即返回,如果返回-1,就表示读到了输入流的末尾。,capacity,limit,position,r,socketChannel.read(ByteBuffer dst),capacity,limit,position,r,n,SocketChannel类read/write方法,5、int write(ByteBuffer src):把src指定的ByteBuffer中的字节写到Channel中。,capacity,limit,position,r,socketChannel.write(ByteBuffer src),capacity,limit,position,r,n,w

32、rite()方法写入n个字节,SocketChannel类read/write方法,5、int write(ByteBuffer src):在阻塞模式下,write方法会争取输出r个字节,如果底层网络的输出缓冲区不能容纳r个字节,就进入阻塞状态。,capacity,limit,position,r,socketChannel.write(ByteBuffer src),capacity,limit,position,r,n,write()方法写入n个字节,SocketChannel类read/write方法,5、int write(ByteBuffer src):在非阻塞模式下,write方法

33、会能输出多少就输出多少,然后立即返回。,capacity,limit,position,r,socketChannel.write(ByteBuffer src),capacity,limit,position,r,n,write()方法写入n个字节,Selector类,只要ServerSocketChannel及SocketChannel向Selector注册了特定的事件,Selector就会监控它们相关事件是否发生。一个Selector对象中包含3类SelectionKey集合:all-keys集合:当前所有向Selector对象注册的SelectionKey的集合,Selector的ke

34、ys()方法返回该集合selected-keys 集合:相关事件已被selector捕获的selectionKey的集合,Selector的selectedKeys()方法返回该集合Cancelled-keys集合:已经取消的selectionKey的集合,无方法访问该集合,Selector类,all-keys集合:什么时候会加入SelectionKey对象呢?只要执行register方法!什么时候删除呢?不能使用该集合对象的remove方法,会引起异常,,Selector类,selected-keys集合:什么时候会加入SelectionKey对象呢?在执行Selector对象的select

35、()方法时,如果注册的相关事件发生了,这个SelectionKey对象就会加入该集合什么时候删除呢?可以直接调用其remove方法,Selector类,Cancelled-keys集合:什么时候会加入SelectionKey对象呢?在执行SelectionKey对象的cancel()方法或者关闭了相关的Channel对象时,这个SelectionKey对象就会加入该集合什么时候删除呢?再次执行Selector对象的select()方法时,Selector类,创建方法:public static Selector open()public boolean isOpen():取决于close方法p

36、ublic void close():释放selector的占用资源public int selectNow():返回当前事件已经发生的个数,采用非阻塞方式。,Selector类,public int select()或者select(long timeout):返回当前事件已经发生的个数,采用阻塞方式,如果没有任何事件发生,就会阻塞下去,直到至少有一件事件发生,或者其他线程调用wakeup或超时。wakeup():让线程中的select方法返回。selectedKeys():返回selected-keys集合,存放已经发生的相关事件的Selection-Key对象。,SelectionKey

37、类,通过Channel对象的register方法创建该对象其工作过程请结合三个SelectionKey集合来理解SelectionKey定义了四种事件:SelectionKey.OP_ACCEPT:常量值为16SelectionKey.OP_CONNECT:常量值为8SelectionKey.OP_READ:常量值为1SelectionKey.OP_WRITE:常量值为4其相关方法:获取状态、关联对象、附件等,服务器编程范例,1、采用阻塞模式,用线程池中的工作线程处理每个客户的连接:4-1例程2、采用非阻塞模式,单个线程同时负责接收和响应请求:4-2例程3、一个线程用阻塞模式负责连接多个客户请

38、求,另一个线程负责与多个客户进行数据交换:4-3例程,创建阻塞的EchoServer,当ServerSocketChannel和SocketChannel采用默认的阻塞模式时,同样需要多个线程。如下:,创建阻塞的EchoServer,public EchoServer()throws IOException executorService=Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()*POOL_MULTIPLE);serverSocketChannel=ServerSocketChannel.op

39、en();serverSocketChannel.socket().setReuseAddress(true);serverSocketChannel.socket().bind(new InetSocketAddress(port);(服务器启动);public void service()while(true)SocketChannel socketChannel=null;try socketChannel=serverSocketChannel.accept();executorService.execute(new Handler(socketChannel);catch(IOExc

40、eption e)e.printStackTrace();,创建阻塞的EchoServer,class Handler implements Runnable private SocketChannel socketChannel;public Handler(SocketChannel socketChannel)this.socketChannel=socketChannel;public void run()handle(socketChannel);,创建阻塞的EchoServer,public void handle(SocketChannel socketChannel)try S

41、ocket socket=socketChannel.socket();(接收到客户连接,来自:+socket.getInetAddress()+:+socket.getPort();BufferedReader br=getReader(socket);PrintWriter pw=getWriter(socket);String msg=null;while(msg=br.readLine()!=null)pw.println(echo(msg);if(msg.equals(bye)break;catch(IOException e)e.printStackTrace();finally

42、try if(socketChannel!=null)socketChannel.close();catch(IOException e)e.printStackTrace();,客户程序,public class EchoClient private SocketChannel socketChannel=null;public EchoClient()throws IOException socketChannel=SocketChannel.open();InetAddress ia=InetAddress.getLocalHost();InetSocketAddress isa=new

43、 InetSocketAddress(ia,8000);socketChannel.connect(isa);(与服务器的连接建立成功);public static void main(String args)throws IOException new EchoClient().talk();,客户程序,public void talk()throws IOException try BufferedReader br=getReader(socketChannel.socket();PrintWriter pw=getWriter(socketChannel.socket();Buffer

44、edReader localReader=new BufferedReader(new InputStreamReader(System.in);String msg=null;while(msg=localReader.readLine()!=null)pw.println(msg);System.out.println(br.readLine();if(msg.equals(bye)break;catch(IOException e)e.printStackTrace();finally trysocketChannel.close();catch(IOException e)e.prin

45、tStackTrace();,创建非阻塞的EchoServer,在非阻塞模式下,EchoServer只要启动一个主线程,就能同时处理3件事:1、接收客户连接2、接收客户发送的数据3、向客户发回响应数据EchoServer委托Selector来负责监控接收连接就绪事件、读就绪事件、写就绪事件。,创建非阻塞的EchoServer,public class EchoServer private Selector selector=null;private ServerSocketChannel serverSocketChannel=null;private int port=8000;privat

46、e Charset charset=Charset.forName(GBK);public EchoServer()throws IOException selector=Selector.open();/创建一个Selector对象 serverSocketChannel=ServerSocketChannel.open();/创建ServerSocketChannel serverSocketChannel.socket().setReuseAddress(true);serverSocketChannel.configureBlocking(false);/使其为非阻塞模式工作 serv

47、erSocketChannel.socket().bind(new InetSocketAddress(port);(服务器启动);,创建非阻塞的EchoServer.service(),public void service()throws IOException serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT);while(selector.select()0)Set readyKeys=selector.selectedKeys();Iterator it=readyKeys.iterator();while(it

48、.hasNext()SelectionKey key=null;try key=(SelectionKey)it.next();it.remove();,创建非阻塞的EchoServer.service,if(key.isAcceptable()ServerSocketChannel ssc=(ServerSocketChannel)key.channel();SocketChannel socketChannel=(SocketChannel)ssc.accept();(接收到客户连接,来自:+socketChannel.socket().getInetAddress()+:+socketC

49、hannel.socket().getPort();socketChannel.configureBlocking(false);ByteBuffer buffer=ByteBuffer.allocate(1024);socketChannel.register(selector,SelectionKey.OP_READ|SelectionKey.OP_WRITE,buffer);if(key.isReadable()receive(key);if(key.isWritable()send(key);,创建非阻塞的EchoServer.service,catch(IOException e)e

50、.printStackTrace();tryif(key!=null)key.cancel();key.channel().close();catch(Exceptionex)e.printStackTrace();/#while/#while,创建非阻塞的EchoServer-send,public void send(SelectionKey key)throws IOException ByteBuffer buffer=(ByteBuffer)key.attachment();SocketChannel socketChannel=(SocketChannel)key.channel(

展开阅读全文
相关资源
猜你喜欢
相关搜索

当前位置:首页 > 生活休闲 > 在线阅读


备案号:宁ICP备20000045号-2

经营许可证:宁B2-20210002

宁公网安备 64010402000987号