Commit db268eb4 authored by Nawasan Wisitsingkhon's avatar Nawasan Wisitsingkhon

finish: convert to tokio

but don't test yet.
parent 47372ac6
...@@ -4,7 +4,7 @@ use crate::{ ...@@ -4,7 +4,7 @@ use crate::{
}; };
use clap::{command, CommandFactory, Parser, Subcommand}; use clap::{command, CommandFactory, Parser, Subcommand};
use clap_complete::{generate, Shell}; use clap_complete::{generate, Shell};
use std::{io, thread}; use std::io;
#[derive(Parser)] #[derive(Parser)]
#[command(name = "tenjin",author, version, about, long_about = None)] #[command(name = "tenjin",author, version, about, long_about = None)]
...@@ -34,7 +34,7 @@ enum Commands { ...@@ -34,7 +34,7 @@ enum Commands {
Generate { shell: Shell }, Generate { shell: Shell },
} }
#[derive(Subcommand)] #[derive(Subcommand, Clone)]
pub enum Controllers { pub enum Controllers {
/// Openflow 1.3 with Controller13 /// Openflow 1.3 with Controller13
Ctrl13, Ctrl13,
...@@ -42,7 +42,7 @@ pub enum Controllers { ...@@ -42,7 +42,7 @@ pub enum Controllers {
Ctrl10, Ctrl10,
} }
pub fn system() { pub async fn system() {
let cli = Cli::parse(); let cli = Cli::parse();
match cli.command { match cli.command {
Commands::Run { Commands::Run {
...@@ -50,32 +50,19 @@ pub fn system() { ...@@ -50,32 +50,19 @@ pub fn system() {
port, port,
listen, listen,
} => { } => {
// creat runner function to run inside thread spawn for p in port.iter() {
let runner = match controller { let addr = format!("{}:{}", listen, p);
let controller = controller.clone();
tokio::spawn(async move {
match controller {
Some(controller) => match controller { Some(controller) => match controller {
Controllers::Ctrl13 => |addr: &str| { Controllers::Ctrl13 => Controller13::new().listener(&addr).await,
Controller13::new().listener(addr); Controllers::Ctrl10 => Controller10::new().listener(&addr).await,
},
Controllers::Ctrl10 => |addr: &str| {
Controller10::new().listener(addr);
},
}, },
// Set Default Controller at here // Set Default Controller at here
None => |addr: &str| { None => Controller13::new().listener(&addr).await,
Controller13::new().listener(addr);
},
};
// spawn and run threads
let mut thread_list = Vec::new();
for p in port.iter() {
let addr = format!("{}:{}", listen, p);
let t = thread::spawn(move || {
runner(&addr);
});
thread_list.push(t);
} }
for th in thread_list { });
let _ = th.join();
} }
} }
Commands::Generate { shell } => { Commands::Generate { shell } => {
......
#![allow(unused)] #![allow(unused)]
#![allow(unused_variables)] #![allow(unused_variables)]
use std::{collections::HashMap, net::TcpStream}; use std::collections::HashMap;
use tokio::net::TcpStream;
use crate::{ use crate::{
etherparser::ether_type::EtherType, etherparser::ether_type::EtherType,
...@@ -29,7 +30,12 @@ impl ControllerFrame10 for Controller10 { ...@@ -29,7 +30,12 @@ impl ControllerFrame10 for Controller10 {
/** /**
* Start here for handle packetIn message. * Start here for handle packetIn message.
*/ */
fn packet_in_handler(&mut self, xid: u32, packetin: PacketInEvent, stream: &mut TcpStream) { async fn packet_in_handler(
&mut self,
xid: u32,
packetin: PacketInEvent,
stream: &mut TcpStream,
) {
let pkt = match packetin.ether_parse() { let pkt = match packetin.ether_parse() {
Ok(pkt) => pkt, Ok(pkt) => pkt,
Err(_) => return, Err(_) => return,
...@@ -64,21 +70,23 @@ impl ControllerFrame10 for Controller10 { ...@@ -64,21 +70,23 @@ impl ControllerFrame10 for Controller10 {
match_fields.mac_dest = Some(mac_dst); match_fields.mac_dest = Some(mac_dst);
match_fields.mac_src = Some(mac_src); match_fields.mac_src = Some(mac_src);
if let Some(buf_id) = packetin.buf_id { if let Some(buf_id) = packetin.buf_id {
self.add_flow(xid, 1, match_fields, &actions, Some(buf_id as u32), stream); self.add_flow(xid, 1, match_fields, &actions, Some(buf_id as u32), stream)
.await;
return; return;
} else { } else {
self.add_flow(xid, 1, match_fields, &actions, None, stream); self.add_flow(xid, 1, match_fields, &actions, None, stream)
.await;
} }
} }
let packet_out = self let packet_out = self
.ofp() .ofp()
.packet_out(Some(packetin.in_port), packetin.payload, actions); .packet_out(Some(packetin.in_port), packetin.payload, actions);
self.send_msg(packet_out, xid, stream); self.send_msg(packet_out, xid, stream).await;
} }
} }
impl Controller10 { impl Controller10 {
fn add_flow( async fn add_flow(
&self, &self,
xid: u32, xid: u32,
priority: u16, priority: u16,
...@@ -92,5 +100,6 @@ impl Controller10 { ...@@ -92,5 +100,6 @@ impl Controller10 {
xid, xid,
stream, stream,
) )
.await;
} }
} }
use tenjin::{example::Controller13, openflow::ofp13::ControllerFrame13}; use tenjin::{example::{Controller10, Controller13}, openflow::{ofp10::ControllerFrame10, ofp13::ControllerFrame13}};
/** /**
* If you prefer to run only Controller without cli. * If you prefer to run only Controller without cli.
...@@ -12,6 +12,6 @@ fn main() { ...@@ -12,6 +12,6 @@ fn main() {
*/ */
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
let controller = Controller13::new(); let controller = Controller10::new();
controller.listener("127.0.0.1:6653").await; controller.listener("127.0.0.1:6653").await;
} }
use crate::openflow::ofp10::{self, ErrorEvent, Msg, PacketInEvent}; use tokio::{
use std::{ io::{AsyncReadExt, AsyncWriteExt},
io::{Read, Write},
net::TcpStream, net::TcpStream,
}; };
use crate::openflow::ofp10::{self, ErrorEvent, Msg, PacketInEvent};
use std::future::Future;
use super::{ use super::{
events::{echo_reply::EchoReplyEvent, EchoRequestEvent}, events::{echo_reply::EchoReplyEvent, EchoRequestEvent},
tcp_listener_handler, MessageMarshal, OfpMsgEvent, Openflow10, OpenflowHeader, tcp_listener_handler, MessageMarshal, OfpMsgEvent, Openflow10, OpenflowHeader,
}; };
pub trait ControllerFrame10 pub trait ControllerFrame10: Send {
where
Self: 'static,
{
fn ofp(&self) -> ofp10::Openflow10 { fn ofp(&self) -> ofp10::Openflow10 {
Openflow10::new() Openflow10::new()
} }
fn packet_in_handler(&mut self, xid: u32, packetin: PacketInEvent, stream: &mut TcpStream); fn packet_in_handler(
&mut self,
xid: u32,
packetin: PacketInEvent,
stream: &mut TcpStream,
) -> impl Future<Output = ()> + Send;
fn new() -> Self; fn new() -> Self;
fn listener(&self, address: &str) fn listener(&self, address: &str) -> impl Future<Output = ()> + Send
where where
Self: Sized, Self: Sized + 'static,
Self: Send,
Self: Clone, Self: Clone,
Self: Sync,
{ {
async move {
println!("server run at {}", address); println!("server run at {}", address);
let _ = tcp_listener_handler(address, self.clone()); let _ = tcp_listener_handler(address, self).await;
}
} }
fn handle_header(&mut self, buf: &mut Vec<u8>) -> Option<(u8, usize, u32)> { fn handle_header(&mut self, buf: &mut Vec<u8>) -> Option<(u8, usize, u32)> {
...@@ -37,33 +43,53 @@ where ...@@ -37,33 +43,53 @@ where
} }
} }
fn request_handler(&mut self, buf: &mut Vec<u8>, stream: &mut TcpStream) { fn request_handler(
&mut self,
buf: &mut Vec<u8>,
stream: &mut TcpStream,
) -> impl Future<Output = ()> + Send
where
Self: Sync,
{
async move {
let ofp = self.ofp(); let ofp = self.ofp();
let (message, pkt_size, xid) = match self.handle_header(buf) { let (message, pkt_size, xid) = match self.handle_header(buf) {
Some(header) => header, Some(header) => header,
None => return, None => return,
}; };
let mut payload = vec![0u8; pkt_size]; let mut payload = vec![0u8; pkt_size];
let _ = stream.read(&mut payload); let _ = stream.read(&mut payload).await;
let message = ofp.msg_parse(message as u8); let message = ofp.msg_parse(message as u8);
match message { match message {
Msg::Hello => self.hello_handler(xid, stream), Msg::Hello => self.hello_handler(xid, stream).await,
Msg::Error => match ErrorEvent::parse(&payload) { Msg::Error => {
Ok(error) => self.error_handler(error), if let Ok(error) = ErrorEvent::parse(&payload) {
Err(_) => (), self.error_handler(error)
}, }
}
Msg::EchoRequest => { Msg::EchoRequest => {
self.echo_request_handler(xid, EchoRequestEvent::new(payload), stream) self.echo_request_handler(xid, EchoRequestEvent::new(payload), stream)
.await
} }
Msg::PacketIn => match PacketInEvent::parse(&payload) { Msg::PacketIn => match PacketInEvent::parse(&payload) {
Ok(pkt_in) => self.packet_in_handler(xid, pkt_in, stream), Ok(pkt_in) => self.packet_in_handler(xid, pkt_in, stream).await,
Err(_) => (), Err(_) => (),
}, },
_ => (), _ => (),
} }
} }
}
fn send_msg<MSM: MessageMarshal>(&self, msg: MSM, xid: u32, stream: &mut TcpStream) { fn send_msg<MSM: MessageMarshal + Send>(
&self,
msg: MSM,
xid: u32,
stream: &mut TcpStream,
) -> impl Future<Output = ()> + Send
where
Self: Sync,
{
async move {
let ofp = self.ofp(); let ofp = self.ofp();
let mut header_bytes: Vec<u8> = Vec::new(); let mut header_bytes: Vec<u8> = Vec::new();
let mut body_bytes: Vec<u8> = Vec::new(); let mut body_bytes: Vec<u8> = Vec::new();
...@@ -72,19 +98,36 @@ where ...@@ -72,19 +98,36 @@ where
let ofp_header = ofp.header(msg.msg_usize() as u8, body_bytes.len() as u16, xid); let ofp_header = ofp.header(msg.msg_usize() as u8, body_bytes.len() as u16, xid);
ofp_header.marshal(&mut header_bytes); ofp_header.marshal(&mut header_bytes);
header_bytes.append(&mut body_bytes); header_bytes.append(&mut body_bytes);
let _ = stream.write_all(&header_bytes); let _ = stream.write_all(&header_bytes).await;
}
} }
/** /**
* for handle message * for handle message
*/ */
fn hello_handler(&self, xid: u32, stream: &mut TcpStream) { fn hello_handler(&self, xid: u32, stream: &mut TcpStream) -> impl Future<Output = ()> + Send
self.send_msg(self.ofp().fetures_req(), xid, stream); where
Self: Sync,
{
async move {
self.send_msg(self.ofp().fetures_req(), xid, stream).await;
}
} }
fn error_handler(&self, error: ErrorEvent) { fn error_handler(&self, error: ErrorEvent) {
println!("Error {:?}", error.error_type); println!("Error {:?}", error.error_type);
} }
fn echo_request_handler(&self, xid: u32, echo: EchoRequestEvent, stream: &mut TcpStream) { fn echo_request_handler(
self.send_msg(EchoReplyEvent::new(echo.payload), xid, stream); &self,
xid: u32,
echo: EchoRequestEvent,
stream: &mut TcpStream,
) -> impl Future<Output = ()> + Send
where
Self: Sync,
{
async move {
self.send_msg(EchoReplyEvent::new(echo.payload), xid, stream)
.await;
}
} }
} }
use std::{io::Read, net::TcpListener, thread};
use crate::openflow::ofp10::HelloEvent;
use super::{ControllerFrame10, OfpMsgEvent}; use super::{ControllerFrame10, OfpMsgEvent};
use crate::openflow::ofp10::HelloEvent;
use tokio::{
io::AsyncReadExt,
net::{TcpListener, TcpStream},
};
pub fn tcp_listener_handler( pub async fn tcp_listener_handler(
address: &str, address: &str,
controller: impl ControllerFrame10 + Send + 'static + Clone, controller: &(impl ControllerFrame10 + 'static + Clone + Sync),
) -> Result<(), std::io::Error> { ) -> Result<(), std::io::Error> {
let listener = TcpListener::bind(address)?; let listener = TcpListener::bind(address).await?;
for stream in listener.incoming() { loop {
match stream { let (mut stream, _) = listener.accept().await?;
Ok(mut stream) => {
if let Ok(addr) = stream.peer_addr() { if let Ok(addr) = stream.peer_addr() {
println!("server has connection from {}", addr); println!("server has connection from {}", addr);
} }
let mut ctrl = controller.clone(); let mut ctrl = controller.clone();
thread::spawn(move || { tokio::spawn(async move {
ctrl.send_msg(HelloEvent::new(), 0, &mut stream); processing(&mut ctrl, &mut stream).await;
});
}
}
async fn processing(ctrl: &mut (impl ControllerFrame10 + Clone + Sync), stream: &mut TcpStream) {
ctrl.send_msg(HelloEvent::new(), 0, stream).await;
let ofp_size = ctrl.ofp().header_size(); let ofp_size = ctrl.ofp().header_size();
let mut buffer = vec![0u8; ofp_size]; let mut buffer = vec![0u8; ofp_size];
loop { loop {
match stream.read(&mut buffer) { match stream.read(&mut buffer).await {
Ok(v) if v > 0 => { Ok(v) if v > 0 => {
ctrl.request_handler(&mut buffer, &mut stream); ctrl.request_handler(&mut buffer, stream).await;
} }
Ok(_) => { Ok(_) => {
break; break;
...@@ -35,10 +41,4 @@ pub fn tcp_listener_handler( ...@@ -35,10 +41,4 @@ pub fn tcp_listener_handler(
} }
} }
} }
});
}
Err(_) => panic!("Connection failed!"),
}
}
Ok(())
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment