时间:2021-07-01 10:21:17 帮助过:48人阅读
接着Mina源码阅读笔记(四)—Mina的连接IoConnector1,我们继续: AbstractIoAcceptor: 001 package org.apache.mina.core.rewrite.service; 002 003 import java.io.IOException; 004 import java.net.SocketAddress; 005 import java.util.ArrayList; 0
接着Mina源码阅读笔记(四)—Mina的连接IoConnector1,我们继续:
AbstractIoAcceptor:
001 |
package org.apache.mina.core.rewrite.service; |
002 |
003 |
import java.io.IOException; |
004 |
import java.net.SocketAddress; |
005 |
import java.util.ArrayList; |
006 |
import java.util.Collections; |
007 |
import java.util.HashSet; |
008 |
import java.util.List; |
009 |
import java.util.Set; |
010 |
import java.util.concurrent.Executor; |
011 |
012 |
public abstract class AbstractIoAcceptor extends AbstractIoService implements |
013 |
IoAcceptor
{ |
014 |
015 |
private final List new ArrayList |
016 |
017 |
private final List |
018 |
.unmodifiableList(defaultLocalAddresses); |
019 |
020 |
private final Set new HashSet |
021 |
022 |
private boolean disconnectOnUnbind
= true ; |
023 |
024 |
/**
这里不是很明白,为什么要用protected 而 不是private */ |
025 |
protected final Object
bindLock = new Object(); |
026 |
027 |
/** |
028 |
*
注意这个构造方法是一定要写的,否则编译不通过:抽象类继承时候,构造方法都要写,而且必须包含super |
029 |
* |
030 |
*
@param param |
031 |
*
sessionConfig |
032 |
*
@param executor |
033 |
*/ |
034 |
protected AbstractIoAcceptor(Object
param, Executor executor) { |
035 |
super (param,
executor); |
036 |
defaultLocalAddresses.add( null ); |
037 |
} |
038 |
039 |
@Override |
040 |
public SocketAddress
getLocalAddress() { |
041 |
042 |
Set |
043 |
if (localAddresses.isEmpty())
{ |
044 |
return null ; |
045 |
} |
046 |
return localAddresses.iterator().next(); |
047 |
} |
048 |
049 |
@Override |
050 |
public final Set |
051 |
Set new HashSet |
052 |
synchronized (boundAddresses)
{ |
053 |
localAddresses.addAll(boundAddresses); |
054 |
} |
055 |
return localAddresses; |
056 |
} |
057 |
058 |
@Override |
059 |
public void bind(SocketAddress
localAddress) throws IOException
{ |
060 |
//
TODO Auto-generated method stub |
061 |
062 |
} |
063 |
064 |
@Override |
065 |
public void bind(Iterable extends SocketAddress>
localAddresses) |
066 |
throws IOException
{ |
067 |
//
TODO isDisposing() |
068 |
069 |
if (localAddresses
== null )
{ |
070 |
throw new IllegalArgumentException( "localAddresses" ); |
071 |
} |
072 |
073 |
List new ArrayList |
074 |
075 |
for (SocketAddress
a : localAddresses) { |
076 |
//
TODO check address type |
077 |
localAddressesCopy.add(a); |
078 |
} |
079 |
080 |
if (localAddressesCopy.isEmpty())
{ |
081 |
throw new IllegalArgumentException( "localAddresses
is empty" ); |
082 |
} |
083 |
084 |
boolean active
= false ; |
085 |
086 |
synchronized (bindLock)
{ |
087 |
synchronized (boundAddresses)
{ |
088 |
if (boundAddresses.isEmpty())
{ |
089 |
active
= true ; |
090 |
} |
091 |
} |
092 |
} |
093 |
/**
implement in abstractIoService */ |
094 |
if (getHandler()
== null )
{ |
095 |
throw new IllegalArgumentException( "handler
is not set" ); |
096 |
} |
097 |
098 |
try { |
099 |
Set |
100 |
101 |
synchronized (boundAddresses)
{ |
102 |
boundAddresses.addAll(addresses); |
103 |
} |
104 |
} catch (IOException
e) { |
105 |
throw e; |
106 |
} catch (RuntimeException
e) { |
107 |
throw e; |
108 |
} catch (Throwable
e) { |
109 |
throw new RuntimeException( "Filed
ti bind" ); |
110 |
} |
111 |
|
112 |
if (active){ |
113 |
//do
sth |
114 |
} |
115 |
} |
116 |
117 |
protected abstract Set |
118 |
List extends SocketAddress>
localAddress) throws Exception; |
119 |
120 |
@Override |
121 |
public void unbind(SocketAddress
localAddress) { |
122 |
//
TODO Auto-generated method stub |
123 |
|
124 |
} |
125 |
} |
01 |
package org.apache.mina.core.rewrite.polling; |
02 |
03 |
import java.net.SocketAddress; |
04 |
import java.nio.channels.ServerSocketChannel; |
05 |
import java.util.List; |
06 |
import java.util.Set; |
07 |
import java.util.concurrent.Executor; |
08 |
import java.util.concurrent.Semaphore; |
09 |
import java.util.concurrent.atomic.AtomicReference; |
10 |
11 |
import org.apache.mina.core.rewrite.service.AbstractIoAcceptor; |
12 |
13 |
public abstract class AbstractPollingIoAcceptor extends AbstractIoAcceptor
{ |
14 |
15 |
private final Semaphore
lock = new Semaphore( 1 ); |
16 |
17 |
private volatile boolean selectable; |
18 |
19 |
private AtomicReference
acceptorRef = new AtomicReference(); |
20 |
21 |
/** |
22 |
*
define the num of sockets that can wait to be accepted. |
23 |
*/ |
24 |
protected int backlog
= 50 ; |
25 |
26 |
/** |
27 |
*
一样的,这个构造方法也要写 |
28 |
* |
29 |
*
@param param |
30 |
*
@param executor |
31 |
*/ |
32 |
protected AbstractPollingIoAcceptor(Object
param, Executor executor) { |
33 |
super (param,
executor); |
34 |
//
TODO Auto-generated constructor stub |
35 |
} |
36 |
37 |
/** |
38 |
*
init the polling system. will be called at construction time |
39 |
* |
40 |
*
@throws Exception |
41 |
*/ |
42 |
protected abstract void init() throws Exception; |
43 |
44 |
protected abstract void destory() throws Exception; |
45 |
46 |
protected abstract int select() throws Exception; |
47 |
/**这里有点儿变动*/ |
48 |
protected abstract ServerSocketChannel
open(SocketAddress localAddress) throws Exception; |
49 |
50 |
@Override |
51 |
protected Set |
52 |
List extends SocketAddress>
localAddress) throws Exception
{ |
53 |
//
... |
54 |
try { |
55 |
lock.acquire(); |
56 |
Thread.sleep( 10 ); |
57 |
} finally { |
58 |
lock.release(); |
59 |
} |
60 |
//
... |
61 |
return null ; |
62 |
} |
63 |
64 |
/** |
65 |
*
this class is called by startupAcceptor() method it's a thread accepting |
66 |
*
incoming connections from client |
67 |
* |
68 |
*
@author ChenHui |
69 |
* |
70 |
*/ |
71 |
private class Acceptor implements Runnable
{ |
72 |
@Override |
73 |
public void run()
{ |
74 |
assert (acceptorRef.get()
== this ); |
75 |
76 |
int nHandles
= 0 ; |
77 |
78 |
lock.release(); |
79 |
80 |
while (selectable)
{ |
81 |
try { |
82 |
int selected
= select(); |
83 |
84 |
//
nHandles+=registerHandles(); |
85 |
86 |
if (nHandles
== 0 )
{ |
87 |
acceptorRef.set( null ); |
88 |
//
... |
89 |
} |
90 |
} catch (Exception
e) { |
91 |
92 |
} |
93 |
} |
94 |
} |
95 |
} |
96 |
} |
001 |
package org.apache.mina.rewrite.transport.socket.nio; |
002 |
003 |
import java.net.InetSocketAddress; |
004 |
import java.net.ServerSocket; |
005 |
import java.net.SocketAddress; |
006 |
import java.nio.channels.SelectionKey; |
007 |
import java.nio.channels.Selector; |
008 |
import java.nio.channels.ServerSocketChannel; |
009 |
import java.util.concurrent.Executor; |
010 |
011 |
import org.apache.mina.core.rewrite.polling.AbstractPollingIoAcceptor; |
012 |
import org.apache.mina.rewrite.transport.socket.SocketAcceptor; |
013 |
014 |
public final class NioSocketAcceptor extends AbstractPollingIoAcceptor |
015 |
implements SocketAcceptor
{ |
016 |
017 |
private volatile Selector
selector; |
018 |
019 |
protected NioSocketAcceptor(Object
param, Executor executor) { |
020 |
super (param,
executor); |
021 |
//
TODO Auto-generated constructor stub |
022 |
} |
023 |
024 |
@Override |
025 |
public int getManagedSessionCount()
{ |
026 |
//
TODO Auto-generated method stub |
027 |
return 0 ; |
028 |
} |
029 |
030 |
/** |
031 |
*
这个方法继承自AbstractIoAcceptor |
032 |
* |
033 |
*
The type NioSocketAcceptor must implement the inherited abstract method |
034 |
*
SocketAcceptor.getLocalAddress() to override |
035 |
*
AbstractIoAcceptor.getLocalAddress() |
036 |
*/ |
037 |
@Override |
038 |
public InetSocketAddress
getLocalAddress() { |
039 |
//
TODO Auto-generated method stub |
040 |
return null ; |
041 |
} |
042 |
043 |
@Override |
044 |
public void setDefaultLocalAddress(InetSocketAddress
localAddress) { |
045 |
//
TODO Auto-generated method stub |
046 |
047 |
} |
048 |
049 |
@Override |
050 |
public boolean isReuseAddress()
{ |
051 |
//
TODO Auto-generated method stub |
052 |
return false ; |
053 |
} |
054 |
055 |
@Override |
056 |
protected void init() throws Exception
{ |
057 |
selector
= Selector.open(); |
058 |
} |
059 |
060 |
@Override |
061 |
protected void destory() throws Exception
{ |
062 |
if (selector
!= null )
{ |
063 |
selector.close(); |
064 |
} |
065 |
} |
066 |
067 |
@Override |
068 |
protected int select() throws Exception
{ |
069 |
return selector.select(); |
070 |
} |
071 |
072 |
@Override |
073 |
protected void dispose0() throws Exception
{ |
074 |
//
TODO Auto-generated method stub |
075 |
076 |
} |
077 |
078 |
protected ServerSocketChannel
open(SocketAddress localAddress) |
079 |
throws Exception
{ |
080 |
ServerSocketChannel
channel =ServerSocketChannel.open(); |
081 |
|
082 |
boolean success= false ; |
083 |
|
084 |
try { |
085 |
channel.configureBlocking( false ); |
086 |
|
087 |
ServerSocket
socket=channel.socket(); |
088 |
|
089 |
socket.setReuseAddress(isReuseAddress()); |
090 |
|
091 |
socket.bind(localAddress); |
092 |
|
093 |
channel.register(selector,
SelectionKey.OP_ACCEPT); |
094 |
|
095 |
success= true ; |
096 |
} finally { |
097 |
if (!success){ |
098 |
//close(channel); |
099 |
} |
100 |
} |
101 |
return channel; |
102 |
} |
103 |
104 |
@Override |
105 |
public boolean isActive()
{ |
106 |
//
TODO Auto-generated method stub |
107 |
return false ; |
108 |
} |
109 |
110 |
} |
到此为止,连接部分就都写完了。不过连接部分还有些零碎的东西,比如handler、polling session等。