1. IoSession在服务器端的创建第一步:accept(processor,handle) 接着上节说,我们已经知道,NioSocketAcceptor.bind()后,最终会生成一个在自身维护的executor线程池内提交的任务Acceptor(单一线程),该任务不断的轮询判断服务是否关闭、同时等待新的连接接入。当有新的连接接入时:
1 2 3 4 5 6 Acceptor.run(){ ... processHandles(selectedHandles()); ... }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 private void processHandles (Iterator<ServerSocketChannel> handles) throws Exception { while (handles.hasNext()) { H handle = handles.next(); handles.remove(); S session = accept(processor, handle); if (session == null ) { continue ; } initSession(session, null , null ); session.getProcessor().add(session); } }
第一步accept(processor,handle)方法,调用子类实现的NioSession accept(...)方法,判断了SelectionKey的有效性后,获得连接的通道SocketChannel ch = handle.accept();最后以此为传参创建IoSession对象return new NioSocketSession(this, processor, ch);我们看下构造函数做了什么(为了方便起见,相关的父类构造函数都写在一块了):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 public NioSocketSession (IoService service, IoProcessor<NioSession> processor, SocketChannel channel) { this .service = service; this .handler = service.getHandler(); long currentTime = System.currentTimeMillis(); creationTime = currentTime; lastThroughputCalculationTime = currentTime; lastReadTime = currentTime; lastWriteTime = currentTime; lastIdleTimeForBoth = currentTime; lastIdleTimeForRead = currentTime; lastIdleTimeForWrite = currentTime; closeFuture.addListener(SCHEDULED_COUNTER_RESETTER); sessionId = idGenerator.incrementAndGet(); this .channel = channel; this .processor = processor; filterChain = new DefaultIoFilterChain(this ); config = new SessionConfigImpl(); this .config.setAll(service.getSessionConfig()); }
1.1 IoFuture 我们已经很多次看到IoFuture,有必要来详细了解一下IoFuture的作用。那用过Executor的同学都知道,submit()提交Callback的任务后将返回一个Future表示这个任务的操作结果,这里的IoFuture的作用是类似的,他代表一次相关于IoSession的操作的操作结果,同时,还可以在他完成时获取定义相关的操作。我们看下他的子类结构图
图4.1 IoFuture子类结构图
1 2 3 4 5 6 7 8 9 10 addListener(IoFutureListener<?>) await() await(long ) await(long , TimeUnit) awaitUninterruptibly() awaitUninterruptibly(long ) awaitUninterruptibly(long , TimeUnit) getSession() isDone() removeListener(IoFutureListener<?>)
1 2 3 public interface IoFutureListener <F extends IoFuture > extends EventListener { void operationComplete (F future) ; }
在他的直接实现子类DefaultIoFuture里面,我们看一下该类很主要一个实现方法setValue(Object newValue),所有其他子类都会调用该方法以完成具体的操作:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public void setValue (Object newValue) { synchronized (lock) { if (ready) { return ; } result = newValue; ready = true ; if (waiters > 0 ) { lock.notifyAll(); } } notifyListeners(); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 private void checkDeadLock () { if (!(this instanceof CloseFuture || this instanceof WriteFuture || this instanceof ReadFuture || this instanceof ConnectFuture)) { return ; } StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace(); for (StackTraceElement s : stackTrace) { if (AbstractPollingIoProcessor.class.getName().equals(s.getClassName())) { IllegalStateException e = new IllegalStateException("t" ); e.getStackTrace(); throw new IllegalStateException("DEAD LOCK: " + IoFuture.class.getSimpleName() + ".await() was invoked from an I/O processor thread. " + "Please use " + IoFutureListener.class.getSimpleName() + " or configure a proper thread model alternatively." ); } } for (StackTraceElement s : stackTrace) { try { Class<?> cls = DefaultIoFuture.class.getClassLoader().loadClass(s.getClassName()); if (IoProcessor.class.isAssignableFrom(cls)) { throw new IllegalStateException("DEAD LOCK: " + IoFuture.class.getSimpleName() + ".await() was invoked from an I/O processor thread. " + "Please use " + IoFutureListener.class.getSimpleName() + " or configure a proper thread model alternatively." ); } } catch (Exception cnfe) { } } }
1.2 IoSession的过滤器链 IoSession是有状态的,它包括:
Connected :IoSession已被创建且生效。
Idle :会话在一定时间内没有处理任何请求,它包括下面三个子状态:
Idle for read : 一段时间内没有接受数据。
Idle for write : 一段时间内没有发送数据。
Idle for both : 一段时间内没有产生接受或发送数据的操作。
Closing : IoSession正在关闭中。
Closed : session已经被关闭。
图4.2 IoSession状态变化图
1 filterChain = new DefaultIoFilterChain(this );
2. IoSession在服务器端的创建第二步:initSession(session, null, null) 这一步initSession(session, null, null);主要的有两段可配置的属性,一个就是存储attribute属性键值对的Map,默认是ConcurrentHashMap;另一个就是发送队列,默认是ConcurrentLinkedQueue。若要实现自己的数据结构,需要重写接口IoSessionDataStructureFactory或继承DefaultIoSessionDataStructureFactory。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 try { ((AbstractIoSession) session).setAttributeMap(session.getService().getSessionDataStructureFactory() .getAttributeMap(session)); } catch (IoSessionInitializationException e) { throw e; } catch (Exception e) { throw new IoSessionInitializationException("Failed to initialize an attributeMap." , e); } try { ((AbstractIoSession) session).setWriteRequestQueue(session.getService().getSessionDataStructureFactory() .getWriteRequestQueue(session)); } catch (IoSessionInitializationException e) { throw e; } catch (Exception e) { throw new IoSessionInitializationException("Failed to initialize a writeRequestQueue." , e); }
3. IoSession在服务器端的创建第三步:session.getProcessor().add(session); 这一步就是为我们新生成的IoSession在IoService的Processor池中指定一个Processor处理。分配的方式就是取模分配,完成后在分配Processor中增加该IoSession的引用。SimpleIoProcessorPool中的代码如下:
1 2 3 4 5 public final void add (S session) { getProcessor(session).add(session); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 private IoProcessor<S> getProcessor (S session) { IoProcessor<S> processor = (IoProcessor<S>) session.getAttribute(PROCESSOR); if (processor == null ) { if (disposed || disposing) { throw new IllegalStateException("A disposed processor cannot be accessed." ); } processor = pool[Math.abs((int ) session.getId()) % pool.length]; if (processor == null ) { throw new IllegalStateException("A disposed processor cannot be accessed." ); } session.setAttributeIfAbsent(PROCESSOR, processor); } return processor; }
4. SimpleIoProcessorPool、IoProcess和NioSocketAcceptor之间的关系 上节我们留了个悬念,SimpleIoProcessorPool、IoProcess和IoServices之间的到底是关系呢?我们从SimpleIoProcessorPool初始化池说起:
1 2 3 4 5 for (int i = 1 ; i < pool.length; i++) { ... pool[i] = processorConstructor.newInstance(this .executor); ... }
1 2 3 4 5 6 7 8 9 10 public final void add (S session) { if (disposed || disposing) { throw new IllegalStateException("Already disposed." ); } newSessions.add(session); tartupProcessor(); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 private class Processor implements Runnable { public void run () { ... for (;;) { try { long t0 = System.currentTimeMillis(); int selected = select(1000 ); long t1 = System.currentTimeMillis(); long delta = (t1 - t0); ... nSessions += handleNewSessions(); ... if (selected > 0 ) { process(); } ... nSessions -= removeSessions(); notifyIdleSessions(currentTime); ... if (isDisposing()) { for (Iterator<S> i = allSessions(); i.hasNext();) { scheduleRemove(i.next()); } wakeup(); } } catch ... } ... } }
1 2 3 4 5 6 7 8 9 10 11 SelectableChannel ch = (SelectableChannel) session.getChannel(); ch.configureBlocking(false ); session.setSelectionKey(ch.register(selector, SelectionKey.OP_READ, session)); IoFilterChainBuilder chainBuilder = session.getService().getFilterChainBuilder(); chainBuilder.buildFilterChain(session.getFilterChain()); IoServiceListenerSupport listeners = ((AbstractIoService) session.getService()).getListeners(); listeners.fireSessionCreated(session);
SimpleIoProcessorPool维护一组IoProcess[] ,每个IoProcess都在线程池中绑定了工作线程,该工作线程开启一个selector并监听session交互OP_READ。
1 2 3 4 5 6 7 8 9 public NioProcessor (Executor executor) { super (executor); try { selector = Selector.open(); } catch (IOException e) { throw new RuntimeIoException("Failed to open a selector." , e); } }
1 2 3 NioSocketAcceptor.init() throws Exception { selector = Selector.open(); }
结语 今天分析了IoSession后,mina的框架体系就比较清晰了,不过还有重要的filter、handler等,我们接下来几章继续探讨。