Commit a741b450 authored by Nawasan Wisitsingkhon's avatar Nawasan Wisitsingkhon

ofp13: converted to async with tokio;

I haven't tested yet; Just waiting for ofp10 to convert to async.
parent 28962c3e
......@@ -2,6 +2,21 @@
# It is not intended for manual editing.
version = 3
[[package]]
name = "addr2line"
version = "0.24.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dfbe277e56a376000877090da837660b4427aad530e3028d44e0bffe4f89a1c1"
dependencies = [
"gimli",
]
[[package]]
name = "adler2"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627"
[[package]]
name = "anstream"
version = "0.6.15"
......@@ -51,12 +66,51 @@ dependencies = [
"windows-sys",
]
[[package]]
name = "autocfg"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26"
[[package]]
name = "backtrace"
version = "0.3.74"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8d82cb332cdfaed17ae235a638438ac4d4839913cc2af585c3c6746e8f8bee1a"
dependencies = [
"addr2line",
"cfg-if",
"libc",
"miniz_oxide",
"object",
"rustc-demangle",
"windows-targets",
]
[[package]]
name = "bitflags"
version = "2.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b048fb63fd8b5923fc5aa7b340d8e156aec7ec02f0c78fa8a6ddc2613f6f71de"
[[package]]
name = "byteorder"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b"
[[package]]
name = "bytes"
version = "1.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "428d9aa8fbc0670b7b8d6030a7fadd0f86151cae55e4dbbece15f3780a3dfaf3"
[[package]]
name = "cfg-if"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "clap"
version = "4.5.16"
......@@ -112,18 +166,111 @@ version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3fd119d74b830634cea2a0f58bbd0d54540518a14397557951e79340abc28c0"
[[package]]
name = "gimli"
version = "0.31.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f"
[[package]]
name = "heck"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea"
[[package]]
name = "hermit-abi"
version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024"
[[package]]
name = "is_terminal_polyfill"
version = "1.70.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf"
[[package]]
name = "libc"
version = "0.2.159"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "561d97a539a36e26a9a5fad1ea11a3039a67714694aaa379433e580854bc3dc5"
[[package]]
name = "lock_api"
version = "0.4.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "07af8b9cdd281b7915f413fa73f29ebd5d55d0d3f0155584dade1ff18cea1b17"
dependencies = [
"autocfg",
"scopeguard",
]
[[package]]
name = "memchr"
version = "2.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3"
[[package]]
name = "miniz_oxide"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e2d80299ef12ff69b16a84bb182e3b9df68b5a91574d3d4fa6e41b65deec4df1"
dependencies = [
"adler2",
]
[[package]]
name = "mio"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "80e04d1dcff3aae0704555fe5fee3bcfaf3d1fdf8a7e521d5b9d2b42acb52cec"
dependencies = [
"hermit-abi",
"libc",
"wasi",
"windows-sys",
]
[[package]]
name = "object"
version = "0.36.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aedf0a2d09c573ed1d8d85b30c119153926a2b36dce0ab28322c09a117a4683e"
dependencies = [
"memchr",
]
[[package]]
name = "parking_lot"
version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f1bf18183cf54e8d6059647fc3063646a1801cf30896933ec2311622cc4b9a27"
dependencies = [
"lock_api",
"parking_lot_core",
]
[[package]]
name = "parking_lot_core"
version = "0.9.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e401f977ab385c9e4e3ab30627d6f26d00e2c73eef317493c4ec6d468726cf8"
dependencies = [
"cfg-if",
"libc",
"redox_syscall",
"smallvec",
"windows-targets",
]
[[package]]
name = "pin-project-lite"
version = "0.2.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bda66fc9667c18cb2758a2ac84d1167245054bcf85d5d1aaa6923f45801bdd02"
[[package]]
name = "proc-macro2"
version = "1.0.86"
......@@ -142,6 +289,52 @@ dependencies = [
"proc-macro2",
]
[[package]]
name = "redox_syscall"
version = "0.5.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b6dfecf2c74bce2466cabf93f6664d6998a69eb21e39f4207930065b27b771f"
dependencies = [
"bitflags",
]
[[package]]
name = "rustc-demangle"
version = "0.1.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f"
[[package]]
name = "scopeguard"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
[[package]]
name = "signal-hook-registry"
version = "1.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a9e9e0b4211b72e7b8b6e85c807d36c212bdb33ea8587f7569562a84df5465b1"
dependencies = [
"libc",
]
[[package]]
name = "smallvec"
version = "1.13.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67"
[[package]]
name = "socket2"
version = "0.5.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ce305eb0b4296696835b71df73eb912e0f1ffd2556a501fcede6e0c50349191c"
dependencies = [
"libc",
"windows-sys",
]
[[package]]
name = "strsim"
version = "0.11.1"
......@@ -166,6 +359,36 @@ dependencies = [
"byteorder",
"clap",
"clap_complete",
"tokio",
]
[[package]]
name = "tokio"
version = "1.40.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e2b070231665d27ad9ec9b8df639893f46727666c6767db40317fbe920a5d998"
dependencies = [
"backtrace",
"bytes",
"libc",
"mio",
"parking_lot",
"pin-project-lite",
"signal-hook-registry",
"socket2",
"tokio-macros",
"windows-sys",
]
[[package]]
name = "tokio-macros"
version = "2.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
......@@ -180,6 +403,12 @@ version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821"
[[package]]
name = "wasi"
version = "0.11.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
[[package]]
name = "windows-sys"
version = "0.52.0"
......
......@@ -19,18 +19,18 @@ path = "src/lib.rs"
[[bin]]
name = "tenjin"
path = "src/main.rs"
required-features = ["cli"]
[dependencies]
byteorder = "1.0.0"
clap = { version = "4.5.13", features = ["derive"], optional = true }
clap_complete = { version = "4.5.23", optional = true }
tokio = { version = "1.40.0", features = ["full"] }
[profile.release]
strip = true
[features]
default = []
default = ["full"]
example = []
cli = ["dep:clap", "dep:clap_complete", "example"]
full = ["cli"]
#![allow(unused)]
#![allow(unused_variables)]
use std::{collections::HashMap, net::TcpStream};
use crate::{
etherparser::{ether_type::EtherType, MacAddr},
openflow::ofp13::{
......@@ -10,6 +8,8 @@ use crate::{
ControllerFrame13, FlowModEvent, OfpMsgEvent, PacketInEvent,
},
};
use std::collections::HashMap;
use tokio::net::TcpStream;
/**
* Here is Controller you can modify and write the process or more you need.
* In production please remove allow unused.
......@@ -29,7 +29,7 @@ impl ControllerFrame13 for Controller13 {
/**
* Start here for handle packetIn message.
*/
fn switch_features_handler(
async fn switch_features_handler(
&self,
xid: u32,
features_reply: ofp13::FeaturesReplyEvent,
......@@ -38,8 +38,14 @@ impl ControllerFrame13 for Controller13 {
let matchs = MatchFields::match_all();
let actions = vec![Action::Oputput(ofp13::PseudoPort::Controller(!0))];
self.add_flow(xid, 0, matchs, &actions, 0, None, stream)
.await;
}
fn packet_in_handler(&mut self, xid: u32, packetin: PacketInEvent, stream: &mut TcpStream) {
async fn packet_in_handler(
&mut self,
xid: u32,
packetin: PacketInEvent,
stream: &mut TcpStream,
) {
let pkt = match packetin.ether_parse() {
Ok(pkt) => pkt,
Err(_) => return,
......@@ -74,8 +80,8 @@ impl ControllerFrame13 for Controller13 {
if let ofp13::PseudoPort::PhysicalPort(_) = out_port {
let mut match_fields = MatchFields::match_all();
match_fields.in_port = Some(in_port);
match_fields.eth_dst = Some(MacAddr::from(mac_dst));
match_fields.eth_src = Some(MacAddr::from(mac_src));
match_fields.eth_dst = Some(mac_dst);
match_fields.eth_src = Some(mac_src);
if let Some(buf_id) = packetin.buf_id {
self.add_flow(
xid,
......@@ -85,7 +91,8 @@ impl ControllerFrame13 for Controller13 {
packetin.table_id,
Some(buf_id),
stream,
);
)
.await;
return;
} else {
self.add_flow(
......@@ -96,18 +103,19 @@ impl ControllerFrame13 for Controller13 {
packetin.table_id,
None,
stream,
);
)
.await;
}
}
let packet_out = self
.ofp()
.packet_out(Some(in_port), packetin.payload, actions);
self.send_msg(packet_out, xid, stream);
self.send_msg(packet_out, xid, stream).await;
}
}
impl Controller13 {
fn add_flow(
async fn add_flow(
&self,
xid: u32,
priority: u16,
......@@ -122,5 +130,6 @@ impl Controller13 {
xid,
stream,
)
.await
}
}
use tenjin::cli;
use tenjin::{example::Controller13, openflow::ofp13::ControllerFrame13};
/**
* If you prefer to run only Controller without cli.
......@@ -10,6 +10,8 @@ fn main() {
}
```
*/
fn main() {
cli::system();
#[tokio::main]
async fn main() {
let controller = Controller13::new();
controller.listener("127.0.0.1:6653").await;
}
use crate::openflow::ofp13::{ErrorEvent, Msg, PacketInEvent};
use std::{
io::{Read, Write},
net::TcpStream,
};
use std::future::Future;
use super::{
events::{echo_reply::EchoReplyEvent, EchoRequestEvent},
tcp_listener_handler, FeaturesReplyEvent, MessageMarshal, OfpMsgEvent, Openflow13,
OpenflowHeader,
};
use crate::openflow::ofp13::{ErrorEvent, Msg, PacketInEvent};
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::TcpStream,
};
pub trait ControllerFrame13
where
Self: 'static,
{
pub trait ControllerFrame13: Send {
fn ofp(&self) -> Openflow13 {
Openflow13::new()
}
fn packet_in_handler(&mut self, xid: u32, packetin: PacketInEvent, stream: &mut TcpStream);
fn packet_in_handler(
&mut self,
xid: u32,
packetin: PacketInEvent,
stream: &mut TcpStream,
) -> impl Future<Output = ()> + Send;
fn new() -> Self;
fn listener(&self, address: &str)
fn listener(&self, address: &str) -> impl Future<Output = ()> + Send
where
Self: Sized,
Self: Send,
Self: Sized + 'static,
Self: Clone,
Self: Sync,
{
async move {
println!("server run at {}", address);
let _ = tcp_listener_handler(address, self.clone());
let _ = tcp_listener_handler(address, self).await;
}
}
fn handle_header(&mut self, buf: &mut Vec<u8>) -> Option<(u8, usize, u32)> {
let ofp_header = self.ofp().header_parse(&buf);
let ofp_header = self.ofp().header_parse(buf);
match ofp_header {
Ok(header) => Some((header.message(), header.pkt_size(), header.xid())),
Err(_) => None,
}
}
fn request_handler(&mut self, buf: &mut Vec<u8>, stream: &mut TcpStream) {
fn request_handler(
&mut self,
buf: &mut Vec<u8>,
stream: &mut TcpStream,
) -> impl Future<Output = ()> + Send
where
Self: Sync,
{
async move {
let ofp = self.ofp();
let (message, pkt_size, xid) = match self.handle_header(buf) {
Some(header) => header,
None => return,
};
let mut payload = vec![0u8; pkt_size];
let _ = stream.read(&mut payload);
let message = ofp.msg_parse(message as u8);
let _ = stream.read(&mut payload).await;
let message = ofp.msg_parse(message);
match message {
Msg::Hello => self.hello_handler(xid, stream),
Msg::Error => match ErrorEvent::parse(&payload) {
Ok(error) => self.error_handler(error),
Err(_) => (),
},
Msg::Hello => self.hello_handler(xid, stream).await,
Msg::Error => {
if let Ok(error) = ErrorEvent::parse(&payload) {
self.error_handler(error)
}
}
Msg::EchoRequest => {
self.echo_request_handler(xid, EchoRequestEvent::new(payload), stream)
.await
}
Msg::FeaturesReply => {
if let Ok(features) = FeaturesReplyEvent::parse(&payload) {
self.switch_features_handler(xid, features, stream).await
}
}
Msg::PacketIn => {
if let Ok(pkt_in) = PacketInEvent::parse(&payload) {
self.packet_in_handler(xid, pkt_in, stream).await
}
}
Msg::FeaturesReply => match FeaturesReplyEvent::parse(&payload) {
Ok(features) => self.switch_features_handler(xid, features, stream),
Err(_) => (),
},
Msg::PacketIn => match PacketInEvent::parse(&payload) {
Ok(pkt_in) => self.packet_in_handler(xid, pkt_in, stream),
Err(_) => (),
},
_ => (),
}
}
}
fn send_msg<MSM: MessageMarshal>(&self, msg: MSM, xid: u32, stream: &mut TcpStream) {
fn send_msg<MSM: MessageMarshal + std::marker::Send>(
&self,
msg: MSM,
xid: u32,
stream: &mut TcpStream,
) -> impl Future<Output = ()> + Send
where
Self: Sync,
{
async move {
let ofp = self.ofp();
let mut header_bytes: Vec<u8> = Vec::new();
let mut body_bytes: Vec<u8> = Vec::new();
......@@ -77,20 +104,37 @@ where
let ofp_header = ofp.header(msg.msg_usize() as u8, body_bytes.len() as u16, xid);
ofp_header.marshal(&mut header_bytes);
header_bytes.append(&mut body_bytes);
let _ = stream.write_all(&header_bytes);
let _ = stream.write_all(&header_bytes).await;
}
}
/**
* for handle message
*/
fn hello_handler(&self, xid: u32, stream: &mut TcpStream) {
self.send_msg(self.ofp().fetures_req(), xid, stream);
fn hello_handler(&self, xid: u32, stream: &mut TcpStream) -> impl Future<Output = ()> + Send
where
Self: Sync,
{
async move {
self.send_msg(self.ofp().fetures_req(), xid, stream).await;
}
}
fn error_handler(&self, error: ErrorEvent) {
println!("Error {:?} payload: {:x?}", error.error_type, error.payload);
}
fn echo_request_handler(&self, xid: u32, echo: EchoRequestEvent, stream: &mut TcpStream) {
self.send_msg(EchoReplyEvent::new(echo.payload), xid, stream);
fn echo_request_handler(
&self,
xid: u32,
echo: EchoRequestEvent,
stream: &mut TcpStream,
) -> impl Future<Output = ()> + Send
where
Self: Sync,
{
async move {
self.send_msg(EchoReplyEvent::new(echo.payload), xid, stream)
.await;
}
}
#[allow(unused)]
fn switch_features_handler(
......@@ -98,6 +142,7 @@ where
xid: u32,
features_reply: FeaturesReplyEvent,
stream: &mut TcpStream,
) {
) -> impl std::future::Future<Output = ()> + Send {
async {}
}
}
......@@ -14,7 +14,7 @@ impl Payload {
pub fn marshal(&self, bytes: &mut Vec<u8>) {
match self {
Payload::Buffered(_, buf) | Payload::NoBuffered(buf) => {
let _ = bytes.write_all(&buf);
let _ = bytes.write_all(buf);
}
}
}
......
......@@ -40,7 +40,7 @@ impl OpenflowHeader for OfpHeader {
size_of::<Self>()
}
fn pkt_size(&self) -> usize {
return self.length as usize - size_of::<Self>();
self.length as usize - size_of::<Self>()
}
fn parse(buf: &Vec<u8>) -> Result<Self, Error> {
......
......@@ -12,6 +12,12 @@ impl Openflow13 {
}
}
impl Default for Openflow13 {
fn default() -> Self {
Self::new()
}
}
impl OfpMsgEvent for Openflow13 {
fn header_parse(&self, bytes: &Vec<u8>) -> Result<OfpHeader, std::io::Error> {
OfpHeader::parse(bytes)
......
use std::{io::Read, net::TcpListener, thread};
use crate::openflow::ofp13::HelloEvent;
use super::{ControllerFrame13, OfpMsgEvent};
use crate::openflow::ofp13::HelloEvent;
use tokio::{
io::AsyncReadExt,
net::{TcpListener, TcpStream},
};
pub fn tcp_listener_handler(
pub async fn tcp_listener_handler(
address: &str,
controller: impl ControllerFrame13 + Send + 'static + Clone,
controller: &(impl ControllerFrame13 + 'static + Clone + Sync),
) -> Result<(), std::io::Error> {
let listener = TcpListener::bind(address)?;
for stream in listener.incoming() {
match stream {
Ok(mut stream) => {
let listener = TcpListener::bind(address).await?;
loop {
let (mut stream, _) = listener.accept().await?;
if let Ok(addr) = stream.peer_addr() {
println!("server has connection from {}", addr);
}
let mut ctrl = controller.clone();
thread::spawn(move || {
ctrl.send_msg(HelloEvent::new(), 0, &mut stream);
tokio::spawn(async move {
processing(&mut ctrl, &mut stream).await;
});
}
}
async fn processing(ctrl: &mut (impl ControllerFrame13 + Clone + Sync), stream: &mut TcpStream) {
ctrl.send_msg(HelloEvent::new(), 0, stream).await;
let ofp_size = ctrl.ofp().header_size();
let mut buffer = vec![0u8; ofp_size];
loop {
match stream.read(&mut buffer) {
match stream.read(&mut buffer).await {
Ok(v) if v > 0 => {
ctrl.request_handler(&mut buffer, &mut stream);
ctrl.request_handler(&mut buffer, stream).await;
}
Ok(_) => {
break;
......@@ -35,10 +41,4 @@ pub fn tcp_listener_handler(
}
}
}
});
}
Err(_) => panic!("Connection failed!"),
}
}
Ok(())
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment