当前位置:Gxlcms > mysql > Mina源码阅读笔记(四)—Mina的连接IoConnector2

Mina源码阅读笔记(四)—Mina的连接IoConnector2

时间:2021-07-01 10:21:17 帮助过:48人阅读

接着Mina源码阅读笔记(四)—Mina的连接IoConnector1,我们继续: AbstractIoAcceptor: package org.apache.mina.core.rewrite.service; import java.io.IOException; import java.net.SocketAddress; import java.util.ArrayList; ……

接着Mina源码阅读笔记(四)—Mina的连接IoConnector1,我们继续:

AbstractIoAcceptor:

001 package org.apache.mina.core.rewrite.service;

002

003 import java.io.IOException;

004 import java.net.SocketAddress;

005 import java.util.ArrayList;

006 import java.util.Collections;

007 import java.util.HashSet;

008 import java.util.List;

009 import java.util.Set;

010 import java.util.concurrent.Executor;

011

012 public abstract class AbstractIoAcceptor extends AbstractIoService implements

013 IoAcceptor {

014

015 private final List defaultLocalAddresses = new ArrayList();

016

017 private final List unmodifiableDeffaultLocalAddresses = Collections

018 .unmodifiableList(defaultLocalAddresses);

019

020 private final Set boundAddresses = new HashSet();

021

022 private boolean disconnectOnUnbind = true;

023

024 /** 这里不是很明白,为什么要用protected 而 不是private */

025 protected final Object bindLock = new Object();

026

027 /**

028 * 注意这个构造方法是一定要写的,否则编译不通过:抽象类继承时候,构造方法都要写,而且必须包含super

029 *

030 * @param param

031 * sessionConfig

032 * @param executor

033 */

034 protected AbstractIoAcceptor(Object param, Executor executor) {

035 super(param, executor);

036 defaultLocalAddresses.add(null);

037 }

038

039 @Override

040 public SocketAddress getLocalAddress() {

041

042 Set localAddresses = getLocalAddresses();

043 if (localAddresses.isEmpty()) {

044 return null;

045 }

046 return localAddresses.iterator().next();

047 }

048

049 @Override

050 public final Set getLocalAddresses() {

051 Set localAddresses = new HashSet();

052 synchronized (boundAddresses) {

053 localAddresses.addAll(boundAddresses);

054 }

055 return localAddresses;

056 }

057

058 @Override

059 public void bind(SocketAddress localAddress) throws IOException {

060 // TODO Auto-generated method stub

061

062 }

063

064 @Override

065 public void bind(Iterableextends SocketAddress> localAddresses)

066 throws IOException {

067 // TODO isDisposing()

068

069 if (localAddresses == null) {

070 throw new IllegalArgumentException("localAddresses");

071 }

072

073 List localAddressesCopy = new ArrayList();

074

075 for (SocketAddress a : localAddresses) {

076 // TODO check address type

077 localAddressesCopy.add(a);

078 }

079

080 if (localAddressesCopy.isEmpty()) {

081 throw new IllegalArgumentException("localAddresses is empty");

082 }

083

084 boolean active = false;

085

086 synchronized (bindLock) {

087 synchronized (boundAddresses) {

088 if (boundAddresses.isEmpty()) {

089 active = true;

090 }

091 }

092 }

093 /** implement in abstractIoService */

094 if (getHandler() == null) {

095 throw new IllegalArgumentException("handler is not set");

096 }

097

098 try {

099 Set addresses = bindInternal(localAddressesCopy);

100

101 synchronized (boundAddresses) {

102 boundAddresses.addAll(addresses);

103 }

104 } catch (IOException e) {

105 throw e;

106 } catch (RuntimeException e) {

107 throw e;

108 } catch (Throwable e) {

109 throw new RuntimeException("Filed ti bind");

110 }

111

112 if(active){

113 //do sth

114 }

115 }

116

117 protected abstract Set bindInternal(

118 Listextends SocketAddress> localAddress) throws Exception;

119

120 @Override

121 public void unbind(SocketAddress localAddress) {

122 // TODO Auto-generated method stub

123

124 }

125 }

接下来是polling包中的AbstractPollingIoAcceptor:

01 package org.apache.mina.core.rewrite.polling;

02

03 import java.net.SocketAddress;

04 import java.nio.channels.ServerSocketChannel;

05 import java.util.List;

06 import java.util.Set;

07 import java.util.concurrent.Executor;

08 import java.util.concurrent.Semaphore;

09 import java.util.concurrent.atomic.AtomicReference;

10

11 import org.apache.mina.core.rewrite.service.AbstractIoAcceptor;

12

13 public abstract class AbstractPollingIoAcceptor extends AbstractIoAcceptor {

14

15 private final Semaphore lock = new Semaphore(1);

16

17 private volatile boolean selectable;

18

19 private AtomicReference acceptorRef = new AtomicReference();

20

21 /**

22 * define the num of sockets that can wait to be accepted.

23 */

24 protected int backlog = 50;

25

26 /**

27 * 一样的,这个构造方法也要写

28 *

29 * @param param

30 * @param executor

31 */

32 protected AbstractPollingIoAcceptor(Object param, Executor executor) {

33 super(param, executor);

34 // TODO Auto-generated constructor stub

35 }

36

37 /**

38 * init the polling system. will be called at construction time

39 *

40 * @throws Exception

41 */

42 protected abstract void init() throws Exception;

43

44 protected abstract void destory() throws Exception;

45

46 protected abstract int select() throws Exception;

47 /**这里有点儿变动*/

48 protected abstract ServerSocketChannel open(SocketAddress localAddress) throws Exception;

49

50 @Override

51 protected Set bindInternal(

52 Listextends SocketAddress> localAddress) throws Exception {

53 // ...

54 try {

55 lock.acquire();

56 Thread.sleep(10);

57 } finally {

58 lock.release();

59 }

60 // ...

61 return null;

62 }

63

64 /**

65 * this class is called by startupAcceptor() method it's a thread accepting

66 * incoming connections from client

67 *

68 * @author ChenHui

69 *

70 */

71 private class Acceptor implements Runnable {

72 @Override

73 public void run() {

74 assert (acceptorRef.get() == this);

75

76 int nHandles = 0;

77

78 lock.release();

79

80 while (selectable) {

81 try {

82 int selected = select();

83

84 // nHandles+=registerHandles();

85

86 if (nHandles == 0) {

87 acceptorRef.set(null);

88 // ...

89 }

90 } catch (Exception e) {

91

92 }

93 }

94 }

95 }

96 }

好了,最后看NioSocketAcceptor:

001 package org.apache.mina.rewrite.transport.socket.nio;

002

003 import java.net.InetSocketAddress;

004 import java.net.ServerSocket;

005 import java.net.SocketAddress;

006 import java.nio.channels.SelectionKey;

007 import java.nio.channels.Selector;

008 import java.nio.channels.ServerSocketChannel;

009 import java.util.concurrent.Executor;

010

011 import org.apache.mina.core.rewrite.polling.AbstractPollingIoAcceptor;

012 import org.apache.mina.rewrite.transport.socket.SocketAcceptor;

013

014 public final class NioSocketAcceptor extends AbstractPollingIoAcceptor

015 implements SocketAcceptor {

016

017 private volatile Selector selector;

018

019 protected NioSocketAcceptor(Object param, Executor executor) {

020 super(param, executor);

021 // TODO Auto-generated constructor stub

022 }

023

024 @Override

025 public int getManagedSessionCount() {

026 // TODO Auto-generated method stub

027 return 0;

028 }

029

030 /**

031 * 这个方法继承自AbstractIoAcceptor

032 *

033 * The type NioSocketAcceptor must implement the inherited abstract method

034 * SocketAcceptor.getLocalAddress() to override

035 * AbstractIoAcceptor.getLocalAddress()

036 */

037 @Override

038 public InetSocketAddress getLocalAddress() {

039 // TODO Auto-generated method stub

040 return null;

041 }

042

043 @Override

044 public void setDefaultLocalAddress(InetSocketAddress localAddress) {

045 // TODO Auto-generated method stub

046

047 }

048

049 @Override

050 public boolean isReuseAddress() {

051 // TODO Auto-generated method stub

052 return false;

053 }

054

055 @Override

056 protected void init() throws Exception {

057 selector = Selector.open();

058 }

059

060 @Override

061 protected void destory() throws Exception {

062 if (selector != null) {

063 selector.close();

064 }

065 }

066

067 @Override

068 protected int select() throws Exception {

069 return selector.select();

070 }

071

072 @Override

073 protected void dispose0() throws Exception {

074 // TODO Auto-generated method stub

075

076 }

077

078 protected ServerSocketChannel open(SocketAddress localAddress)

079 throws Exception {

080 ServerSocketChannel channel =ServerSocketChannel.open();

081

082 boolean success=false;

083

084 try{

085 channel.configureBlocking(false);

086

087 ServerSocket socket=channel.socket();

088

089 socket.setReuseAddress(isReuseAddress());

090

091 socket.bind(localAddress);

092

093 channel.register(selector, SelectionKey.OP_ACCEPT);

094

095 success=true;

096 }finally{

097 if(!success){

098 //close(channel);

099 }

100 }

101 return channel;

102 }

103

104 @Override

105 public boolean isActive() {

106 // TODO Auto-generated method stub

107 return false;

108 }

109

110 }

------------------------------------------------------

到此为止,连接部分就都写完了。连接部分还有些零碎的东西,比如handler、polling、session等。

人气教程排行