初学netty 简单理解

netty的 future 比 java 的future 多了监听器的功能,使的future.get()这个阻塞的形式不是必须。
可以用非阻塞的形式。
但是netty 的future 有两个问题 1、一般要和一个执行器executor关联,也就是要有个线程去异步执行task。
2、 一般只能等操作完成或者出现异常错误才会返回,不能在任务过程中去设置success或者fail,也不能设置返回值。。
第一个问题,要和一个异步task队列相关联,所以netty给我实现了 就用把
第二个问题so Promise解决了第二个问题。
那么也就是说 Promise 可以在任务过程中 有条件的设置success 或者fail 或者cancel。

还有Promise是future的子接口 所以一般可以和futrue相等,而且设置值是线程安全的。

比较常用的Promise 是DefaultPromise。这里简单的做一个udp的需求 有一个操作是udp发一个操作报文,那么5秒中之内得到回应的话 就输出操作成功。
如果不成功每5秒钟 重试一次 重试3次后 如果还失败就输出发送重试没成功。
UdpServer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118

public class UdpServer {


/**
* 启动服务
*/
public void bind(int port) {

log.info("-------------------------------udpServer-------------------------");
//表示服务器连接监听线程组,专门接受 accept 新的客户端client 连接
EventLoopGroup bossLoopGroup = new NioEventLoopGroup();

HashMap<InetSocketAddress, Channel> map = new LinkedHashMap<>();

try {
//1,创建netty bootstrap 启动类
Bootstrap serverBootstrap = new Bootstrap();
//2、设置boostrap 的eventLoopGroup线程组
serverBootstrap = serverBootstrap.group(bossLoopGroup);
//3、设置NIO UDP连接通道
serverBootstrap = serverBootstrap.channel(NioDatagramChannel.class);
//4、设置通道参数 SO_BROADCAST广播形式
serverBootstrap = serverBootstrap.option(ChannelOption.SO_BROADCAST, true);
//5、设置处理类 装配流水线
serverBootstrap = serverBootstrap.handler(new BootNettyUdpSimpleChannelInboundHandler(map));
//6、绑定server,通过调用sync()方法异步阻塞,直到绑定成功

ChannelFuture f = serverBootstrap.bind(port).sync();
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
channelFuture.isSuccess();
}
});
log.info(UdpServer.class.getName() + " started and listend on " + f.channel().localAddress());

/* f.channel().eventLoop().execute(new Runnable() {
@Override
public void run() {
f.channel().writeAndFlush(new DatagramPacket(Unpooled.copiedBuffer("ok", CharsetUtil.UTF_8), datagramPacket.sender()));
}
});*/

// ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() << 1);

//起了个netty执行器 相当于起了个线程
DefaultEventExecutor executor=new DefaultEventExecutor();
executor.execute(new Runnable() {
@Override
public void run() {
ChannelFuture cf;

log.info("位置1 "+String.valueOf(Thread.currentThread().getId()));


while (true) {
try {
sleep(3000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
if (map.keySet().size() > 0) {
Channel c=map.entrySet().iterator().next().getValue();
InetSocketAddress add=map.entrySet().iterator().next().getKey();
cf= c.writeAndFlush(new DatagramPacket(Unpooled.copiedBuffer("ok", CharsetUtil.UTF_8), add));

cf.addListener(new GenericFutureListener<Future<? super Void>>() {
@Override
public void operationComplete(Future<? super Void> future) throws Exception {
if(future.isSuccess()){
//懒得用其他结构了 直接在channel里的attr 里设置个状态1 发出去还没回应,2 发出去回应了。
AttributeKey<Integer> bechoAttr = AttributeKey.valueOf("becho");
Attribute<Integer> bechoA=c.attr(bechoAttr);
bechoA.set(1);
String attrKey="becho";
int attrValue=2;
//自己写了个DefaultPromise结果
EchoWrite ew=new EchoWrite();
DefaultPromise tof =ew.echoWrite("echorequest",add,c, attrKey,attrValue,3,5000);
tof.addListener(new GenericFutureListener<Future<? super Boolean>>() {
@Override
public void operationComplete(Future<? super Boolean> future) {
if(future.isSuccess()){
log.info("成功回应");

}
else {log.info("好几次没回应");
}
}
});
}

}
});
break;

}


// else continue;
}
}
});

//7、监听通道关闭事件,应用程序会一直等待,直到channel关闭
f.channel().closeFuture().sync();
}catch(Exception e){
// TODO: handle exception
}finally{
System.out.println("netty udp close!");
//8 关闭EventLoopGroup,
bossLoopGroup.shutdownGracefully();
}
}

}

BootNettyUdpSimpleChannelInboundHandler类服务端消息处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class BootNettyUdpSimpleChannelInboundHandler extends SimpleChannelInboundHandler<DatagramPacket> {

HashMap<InetSocketAddress, Channel> map;

public BootNettyUdpSimpleChannelInboundHandler(HashMap<InetSocketAddress, Channel> map) {
this.map = map;
}

@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, DatagramPacket datagramPacket) throws Exception {
try {

log.info("位置2 "+String.valueOf(Thread.currentThread().getId()));
String strdata = datagramPacket.content().toString(CharsetUtil.UTF_8);
if(strdata.equals("echo")){ //得到的消息如果是echo 那么设置状态2
AttributeKey<Integer> bechoAttr = AttributeKey.valueOf("becho");
Attribute<Integer> bechoA=channelHandlerContext.channel().attr(bechoAttr);
bechoA.set(2);
}
//打印收到的消息
log.info("---------------------receive data--------------------------");
InetSocketAddress remoteAddress = datagramPacket.sender();
String ip = remoteAddress.getAddress().getHostAddress();
int port = remoteAddress.getPort();
map.put(remoteAddress,channelHandlerContext.channel());
log.info(ip+" "+port);
log.info(strdata);
log.info("---------------------receive data--------------------------");

//收到udp消息后,可通过此方式原路返回的方式返回消息,例如返回时间戳
// channelHandlerContext.writeAndFlush(new DatagramPacket(Unpooled.copiedBuffer("ok", CharsetUtil.UTF_8), datagramPacket.sender()));
} catch (Exception e) {

}

}

做了一个通用的EchoWrite 辅助类 这个类 实现超时重传。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
public class EchoWrite {
EventExecutor e;
DefaultPromise t;
public EchoWrite() {
this.e = new DefaultEventExecutor();
t=new DefaultPromise(this.e);
}

public <T> DefaultPromise echoWrite(String msg, InetSocketAddress add ,Channel c,String attrKey ,T attrValue, int times, int timeout) {
e.submit(new Runnable() {

@Override
public void run() {
int times1 = 0;
log.info("位置3 "+String.valueOf(Thread.currentThread().getId()));
c.writeAndFlush(new DatagramPacket(Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8), add));
log.info("发出echo");
AttributeKey<Integer> bechoAttr = AttributeKey.valueOf(attrKey);
Attribute<Integer> bechoA = c.attr(bechoAttr);
while (times1 < times) {
try {
sleep(timeout);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}

bechoA = c.attr(bechoAttr);
if (bechoA.get() == attrValue) {

t.setSuccess(true);
break;
} else {
c.writeAndFlush(new DatagramPacket(Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8), add));
times1++;
log.info("发出echo");

continue;
}
}
if (times1 == times) {
Exception ex = new Exception("超时没有回应");
t.setFailure(ex);

}

}



});

return t;
}
}

客户端就简单了 只需给服务端发一个报文,让服务端得到channel ,还有就会收到echorequest 发回echo就行了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public class UdpClient {
public static void main(String[] args) throws Exception{
int port = 51000;
if(args != null && args.length > 0){
port = Integer.valueOf(args[0]);
}
new UdpClient().run(port);
}

public void bindclient(){
int port = 51000;
try {
run(port);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public void run(int port) throws Exception{
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();

try {

bootstrap.group(eventLoopGroup)
.channel(NioDatagramChannel.class)
.option(ChannelOption.SO_BROADCAST, true)
.handler(new UdpClientHandler());

Channel channel = bootstrap.bind(0).sync().channel();
AttributeKey<Integer> timesAttrKey = AttributeKey.valueOf("times");
Attribute<Integer> timeA=channel.attr(timesAttrKey);
timeA.set(0);
//向网段内所有机器广播
channel.writeAndFlush(
new DatagramPacket(
Unpooled.copiedBuffer("发个测试信息", CharsetUtil.UTF_8),
// new InetSocketAddress("255.255.255.255", port)
new InetSocketAddress("127.0.0.1", port)
)
).sync();

//客户端等待15s用于接收服务端的应答消息,然后退出并释放资源


// while(true); //这里注意 如果注释掉 那么客户端只会给服务器发个消息 不会回应echorequest 如果不注释 那么会发回应echo。

}catch (Exception e){
e.printStackTrace();
}finally {
eventLoopGroup.shutdownGracefully();
}


}
}

客户端消息处理handler

1
2
3
4
5
6
7
8
9
10
public class UdpClientHandler extends SimpleChannelInboundHandler<DatagramPacket> {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, DatagramPacket packet) throws Exception {
String resp = packet.content().toString(CharsetUtil.UTF_8);
System.out.println("客户端接收结果:" + resp);
//channelHandlerContext.close();
channelHandlerContext.writeAndFlush(new DatagramPacket(Unpooled.copiedBuffer("echo", CharsetUtil.UTF_8), packet.sender()));
}
}

可能用定时器实现更方便 ,这里就是简单的sleep了 回头定时器任务试试。