SDN课设,使用Ryu来控制,啃源码可真的累呀

Github源码在此

项目背景

​ 目前智慧城市越来越发展,但是随着各种网络设备以及传感器的应用,城市的网络负担也在加大。如果数据分析等操作都由城市的数据中心来做,那么一定网络流量不堪重负。雾计算,是一个好思想,然而,某些时候又需要整体进行调控,我们使用SDN的思想来实现。

​ 这是一个交通路口的信号灯及背后处理器的拓扑,我们都知道,可以通过当前路况来智能调整交通信号灯的时长,使得交通更加通畅。我们设想,在车流量不是很大的时候,摄像头10.1.1.100收集车流信息,可以由局域网中的MCU(10.3.3.150)做决策,调节其时长;然而在上下班高峰期,就需要将车流信息上传到数据处理中心(10.2.2.130),数据中心根据全城的交通情况做决策,这样就避免各个路口自己做决策结果冲突了。

​ 但是,一个摄像头显然不会有这么智能的功能,知道什么时候将流发给局域网的MCU还是远程的数据中心。我们就像,设置一个虚拟IP(10.1.1.77)与对应的虚拟MAC(66:66:66:66:66:66),路由器在收到这个目的地址为虚拟IP的包之后,根据Ryu的控制,选择将其发送给近处还是远处进行处理,而MCU和数据中心可以正常向摄像头发信息。这样以来,通过虚拟IP,对于摄像头来说就是透明的了,它只管发,不用管别的。

肝课设过程中的笔记

方法一

1
2
3
graph TD;
虚拟机搭建拓扑-->配置成为OVS交换机-->某个虚拟机安装ryu作为控制器-->使用命令将tyu指定为OVS所用的控制器;

相当于将Ubuntu作为ovs的交换机了,好处:更加模拟现实环境,直接用linux系统。可能存在的问题:这个拓扑在清零的时候可能会有问题,比如arp表清除等(不能做到像mininet仿真的时候每次运行拓扑就从零开始。)

Ubuntu安装ovs配置

Ubuntu安装Open vSwitch

ryu RESTAPI

RYU核心源码解读:

RYU入门

RYU官方文档(其中有对模拟arp的介绍,并且可以查看guide1.dox)

RYU:OpenFlow协议源码

方法二

使用mininet搭建整个拓扑,ryu也是直接作为这个拓扑网络的控制器

优点:方便调试、更新,使用wireshark抓包,ping和tcpdump进行流量创建(这一点方法一也可以做到)

缺点:都是在一个虚拟机上进行的,互联互通有可能成为问题

Ryu+mininet+Wireshark

一个mininet+ryu逐步控制的例子

Mininet与真实网络连接

Mininet中host与外网通信

一个抓包例子

正式开始

ryu源码分析之packet类

ryupacket源码

ryu packet文档

结合这个例子学习构造包下发

逐级封装:

1
2
3
4
5
6
7
8
9
e = ethernet.ethernet(dst='ff:ff:ff:ff:ff:ff',
src='08:60:6e:7f:74:e7',
ethertype=ether.ETH_TYPE_ARP)
a = arp.arp(hwtype=1, proto=0x0800, hlen=6, plen=4, opcode=2,
src_mac='08:60:6e:7f:74:e7', src_ip='192.0.2.1',
dst_mac='00:00:00:00:00:00', dst_ip='192.0.2.2')
p = packet.Packet()
p.add_protocol(e)
p.add_protocol(a)

添加action事件下发包

1
2
3
actions = [parser.OFPActionOutput(port=port)]
out =parser.OFPPacketOut(datapath=datapath,buffer_id=ofproto.OFP_NO_BUFFER,in_port=ofproto.OFPP_CONTROLLER,actions=actions,data=data)
datapath.send_msg(out)

Ryu RESTFUL协议远程获取控制面信息

跨网段通信所需

OFPMatch 607行

OFPActinOutput 4682行

OFPInstructionActions 4550行

1
2
3
4
5
6
7
8
pkt = packet.Packet(msg.data)

eth = pkt.get_protocols(ethernet.ethernet)[0]
arp_pkt = pkt.get_protocol(arp.arp)

if eth:
eth_dst = eth.dst
eth_src = eth.src
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
if udp_pkt:
print('***********收到虚拟udp包*********')
match = parser.OFPMatch(eth_dst='66:66:66:66:66:66')
print("match:", match)

#kwargs = dict(eth_dst='66:66:66:66:66:66')
#match1 = parser.OFPMatch(** kwargs)
#print("match1:", match1)

actions = [parser.OFPActionSetField(ipv4_dst='10.1.1.110'),parser.OFPActionSetField(eth_dst=src_mac),parser.OFPActionOutput(in_port)]


# datapath = self._find_dp(int(switch))
# assert datapath is not None

inst = [parser.OFPInstructionActions(ofproto.OFPIT_APPLY_ACTIONS, actions)]

# idle and hardtimeout set to 0,making the entry permanent
# reference openflow spec
mod = datapath.ofproto_parser.OFPFlowMod(
datapath=datapath,
match=match,
idle_timeout=0,
hard_timeout=0,
priority=32768,
instructions=inst
)
datapath.send_msg(mod)

RYU官方文档中,给了基于RESTFUL协议的对外通信API,Ryu RESTFUL协议远程获取控制面信息,封装在了ryu.app.ofctl_rest类中,主要提供了以下几种操作:

1.获取ovs交换机信息

2.获取流信息

3.添加流、添加新的组

由于是基于RESTFUL API,因此我们可以利用python的requests模块很轻松的使用,其方法和使用一般的http方法并无不同。

为了获取RYU所管理的网桥信息,首先根据官方文档得url:”http://localhost:8080:stats/switchs",使用GET方法,得到所有的网桥dpid,存入列表中。

1
2
3
4
5
6
7
8
9
10
11
def __init__(self):
self.URL='http://localhost:8080'
self.dpid=[]
def getall_sw(self):
url=self.URL+'/stats/switches'
data = requests.get(url)
if data.status_code == 200:
data_all = data.json()
print(data_all)
for i in data_all:
self.dpid.append(i)

在遍历这个列表,使用url:”http://localhost:8080:stats/flows",再加上列表中的dpid,组合成得到这个dpid网桥的具体信息,通过GET方法,将得到的数据转化成json格式,再提取其中的action模式、流过包数量、流过字节数等字段,得到想要的输出信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def getall_sw_data(self):
for i in self.dpid:
url=self.URL+'/stats/flow/'+str(i)
data = requests.get(url)
if data.status_code == 200:
data_all = data.json()[str(i)]
action=data_all[0]['actions']
packet_count=data_all[3]['packet_count']
priority=data_all[6]['priority']
byte_count=data_all[8]['byte_count']
print("交换机{}的action模式为{}\n"\
"流过的包为{}\n"\
"流过的字节为{}\n"\
"优先级为{}\n".format(i,action,packet_count,byte_count,priority))

整体的代码分析

按照如图的拓扑:

55行的代码

1
self.host_mac_to={}

是为了将每个设备的id和其MAC地址相对应,方便后面我们知道是哪个设备。

89行:

1
2
self.arp_table.update({VIP:VMAC})
self.arp_table.update({VIP2:VMAC})

将虚拟地址的IP和MAC提前准备好,这样才能够发arp回复。

当摄像头第一次向虚拟地址发包的时候,因为没有mac地址,会发arp请求(578行),我们会给个虚拟arp回复。

随后,摄像头拿到mac成帧,向外发包,通过439行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 发往虚拟ip
if pkt_eth.dst == VMAC:
flag = 1
print('!!!!!发往虚拟ip!!!!!')

tsecond = time.asctime(time.localtime(time.time()))
second = tsecond.split(' ')[3].split(':')[2]
print(second, '!!GET TIME!!')

if pkt_ipv4:
if second <= '20': # 前20s发往本地计算单元
if pkt_ipv4.src == pc1_ip:
print('前20s,pc1发往pc2')
dstmac = pc2_mac
dstip = pc2_ip
if pkt_ipv4.src == pc4_ip:
print('前20s,pc4发往pc5')
dstmac = pc5_mac
dstip = pc5_ip
data = msg.data
actions = [parser.OFPActionSetField(ipv4_dst=dstip),
parser.OFPActionSetField(eth_dst=dstmac), parser.OFPActionOutput(port=2)]
self.sendpaket(parser, ofproto, data, datapath, actions, buffer_id, in_port)

就像前面写的,对包进行拆分、组装,更换目的IP,然后再从对应端口发出去。

为了模拟,我们假装每分钟的20s~40s,为上下班高峰期,此时会将报文转发到远程的服务器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
if second > '20' and second <= '40':  # 中间20s发往远程服务器
if pkt_ipv4.src == pc1_ip:
print('中间20s,pc1发往pc3')
smac = gate1_mac

if pkt_ipv4.src == pc4_ip:
print('中间20s,pc4发往pc3')
smac = gate3_mac

actions = [parser.OFPActionOutput(port=gate_port)]
data = msg.data
self.sendpaket(parser, ofproto, data, datapath, actions, buffer_id, in_port)

actions = [parser.OFPActionSetField(eth_src=smac),
parser.OFPActionSetField(ipv4_dst=pc3_ip),
parser.OFPActionSetField(eth_dst=gate2_mac),
parser.OFPActionOutput(port=gate_port)]
data = msg.data
datapath = self.datapaths[0]
self.sendpaket(parser, ofproto, data, datapath, actions, buffer_id, in_port)

actions = [parser.OFPActionSetField(eth_src=gate2_mac),
parser.OFPActionSetField(ipv4_dst=pc3_ip),
parser.OFPActionSetField(eth_dst=pc3_mac), parser.OFPActionOutput(port=1)]
data = msg.data
datapath = self.datapaths[0]
self.sendpaket(parser, ofproto, data, datapath, actions, buffer_id, in_port)

额外功能:

204行:

1
2
3
4
5
6
7
8
9
# ipv6 广播报文目的Mac开头都是33:33“
if '33:33' in dst_mac[:5]:
# the controller has not flooded this packet before
if (src_mac,dst_mac) not in self.flood_history[dpid]:
# we remember this packet
self.flood_history[dpid].append((src_mac,dst_mac))
else:
# the controller have flooded this packet before,we do nothing and return
return

防止ipv6的广播风暴。

为了能够检测整个网络的效果,还写了一个flask作为后端的web服务器,可以随时在线上查看网络的负载以及路由器的情况:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
import requests
import json
class ryu_restful:

def __init__(self):
self.URL='http://localhost:8080'
self.dpid=[]
def getall_sw(self):
url=self.URL+'/stats/switches'
data = requests.get(url)
if data.status_code == 200:
data_all = data.json()
print(data_all)
for i in data_all:
self.dpid.append(i)


def getall_sw_data(self):
for i in self.dpid:
url=self.URL+'/stats/flow/'+str(i)
data = requests.get(url)
if data.status_code == 200:
data_all = data.json()[str(i)]
action=data_all[0]['actions']
packet_count=data_all[3]['packet_count']
priority=data_all[6]['priority']
byte_count=data_all[8]['byte_count']


print("交换机{}的action模式为{}\n"\
"流过的包为{}\n"\
"流过的字节为{}\n"\
"优先级为{}\n".format(i,action,packet_count,byte_count,priority))


def add_flow(self):
url=sel.URL+'/stats/flowentry/add'
data= '{"dpid": 1,"cookie": 1,"cookie_mask": 1,"table_id": 0,"idle_timeout": 30,'\
' "hard_timeout": 30, "priority": 11111,"flags": 1,"match":{"in_port":1}, "actions":[{"type":"OUTPUT","port": 2}]}'

res=requests.post(url=url,data=data)
print(res.text)


def get_flow(self):

for i in self.dpid:
url=self.URL+'/stats/flow'+str(i)
data=requests.get(url)
if data.status_code==200:
data_all=data.json()[str(i)][0]
match=data_all['match']
actions=data_all['actions']
print('流{}的匹配表为{},动作为{}'.format(i,match,actions))



if __name__ == "__main__":
ryu=ryu_restful()
ryu.getall_sw()
ryu.getall_sw_data()
ryu.get_flow()

最后的效果

向虚拟地址发UDP报文:

远程监测网络运行情况: