Commit f34edd49 authored by Nawasan Wisitsingkhon's avatar Nawasan Wisitsingkhon

use same controller in multi thread and it don't need to create new Controller in thread

parent 0bde5d15
#![allow(unused)]
#![allow(unused_variables)]
/**
* Here is Controller you can modify and write the process or more you need.
* In production please remove allow unused.
*/
use crate::openflow::events::PacketInEvent;
use crate::openflow::{controller_frame::ControllerFrame, traiter::OfpMsgEvent};
use std::{collections::HashMap, net::TcpStream};
pub struct Controller<OME: OfpMsgEvent> {
ofp: OME,
mac_to_port: HashMap<u64, u16>,
}
impl<OME: OfpMsgEvent> ControllerFrame<OME> for Controller<OME> {
fn get_ofp(&self) -> &impl OfpMsgEvent {
&self.ofp
}
fn new(ofp: OME) -> Self {
Self {
ofp,
mac_to_port: HashMap::new(),
}
}
/**
* Start here for handle packetIn message.
*/
fn packet_in_handler(&mut self, xid: u32, packetin: PacketInEvent, stream: &mut TcpStream) {}
}
...@@ -2,4 +2,5 @@ pub mod openflow; ...@@ -2,4 +2,5 @@ pub mod openflow;
pub mod etherparser; pub mod etherparser;
mod tcp_listener; pub mod controller;
\ No newline at end of file pub use controller::Controller;
\ No newline at end of file
use tenjin::openflow::messages::Openflow10; use tenjin::{
use tenjin::openflow::Controller; openflow::{controller_frame::ControllerFrame, ofp_v1_0::Openflow10},
Controller,
};
extern crate byteorder; extern crate byteorder;
fn main() -> Result<(), std::io::Error> { fn main() -> Result<(), std::io::Error> {
// Controller::listener("127.0.0.1:6633", Openflow10::new());
// tcp_listener_handler(Controller::new(Openflow10), "127.0.0.1:6633");
Controller::listener("127.0.0.1:6633", Openflow10::new()); Controller::listener("127.0.0.1:6633", Openflow10::new());
Ok(()) Ok(())
} }
use std::{ use std::{
collections::HashMap,
io::{Read, Write}, io::{Read, Write},
net::TcpStream, net::TcpStream,
}; };
use crate::tcp_listener::tcp_listener_handler;
use super::{ use super::{
events::PacketInEvent, events::PacketInEvent,
messages::{ messages::{
traiter::{MessageMarshal, OfpMsgEvent}, traiter::{MessageMarshal, OfpMsgEvent},
OfpMsg, OfpMsg,
}, },
OfpHeader, tcp_listener_handler, OfpHeader,
}; };
pub struct Controller<OME: OfpMsgEvent> { pub trait ControllerFrame<OME: OfpMsgEvent> {
pub ofp: OME, fn get_ofp(&self) -> &impl OfpMsgEvent;
mac_to_port: HashMap<u64, u16>, fn packet_in_handler(&mut self, xid: u32, packetin: PacketInEvent, stream: &mut TcpStream);
} fn new(ofp: OME) -> Self;
impl<OME: OfpMsgEvent> Controller<OME> {
pub const OFP_1_0: u8 = 1;
pub fn new(ofp: OME) -> Self {
Self {
ofp,
mac_to_port: HashMap::new(),
}
}
pub fn listener(address: &str, ofp: OME) { fn listener(address: &str, ofp: OME) {
tcp_listener_handler(ofp.version() as u8, address); tcp_listener_handler::<OME>(address, ofp.version() as u8);
} }
pub fn request_handler(&mut self, buf: &mut Vec<u8>, stream: &mut TcpStream) { fn request_handler(&mut self, buf: &mut Vec<u8>, stream: &mut TcpStream) {
let ofp = self.get_ofp();
let ofp_header = OfpHeader::parse(&buf); let ofp_header = OfpHeader::parse(&buf);
let mut payload = vec![0u8; ofp_header.pkt_size()]; let mut payload = vec![0u8; ofp_header.pkt_size()];
let _ = stream.read(&mut payload); let _ = stream.read(&mut payload);
let message = self.ofp.msg_parse(ofp_header.message as u16); let message = ofp.msg_parse(ofp_header.message as u16);
match message { match message {
OfpMsg::Hello => self.send_msg(self.ofp.fetures_req(), ofp_header.xid, stream), OfpMsg::Hello => self.send_msg(ofp.fetures_req(), ofp_header.xid, stream),
OfpMsg::FeaturesReq => todo!(), OfpMsg::FeaturesReq => todo!(),
OfpMsg::PacketIn => { OfpMsg::PacketIn => {
self.packet_in_handler(ofp_header.xid, PacketInEvent::parse(&payload), stream); self.packet_in_handler(ofp_header.xid, PacketInEvent::parse(&payload), stream);
...@@ -49,34 +38,16 @@ impl<OME: OfpMsgEvent> Controller<OME> { ...@@ -49,34 +38,16 @@ impl<OME: OfpMsgEvent> Controller<OME> {
} }
} }
pub fn send_msg<T: MessageMarshal>(&self, msg: T, xid: u32, stream: &mut TcpStream) { fn send_msg<MSM: MessageMarshal>(&self, msg: MSM, xid: u32, stream: &mut TcpStream) {
let ofp = self.get_ofp();
let mut header_bytes: Vec<u8> = Vec::new(); let mut header_bytes: Vec<u8> = Vec::new();
let mut body_bytes: Vec<u8> = Vec::new(); let mut body_bytes: Vec<u8> = Vec::new();
msg.marshal(&mut body_bytes); msg.marshal(&mut body_bytes);
let ofp_header = let ofp_header = ofp.header(msg.msg_usize(ofp) as u8, body_bytes.len() as u16, xid);
self.ofp
.header(msg.msg_usize(&self.ofp) as u8, body_bytes.len() as u16, xid);
ofp_header.marshal(&mut header_bytes); ofp_header.marshal(&mut header_bytes);
header_bytes.append(&mut body_bytes); header_bytes.append(&mut body_bytes);
let _ = stream.write_all(&header_bytes); let _ = stream.write_all(&header_bytes);
} }
/**
* example of sending message
*/
pub fn hello(&self, stream: &mut TcpStream) {
let hello_msg = self.ofp.hello_event();
self.send_msg(hello_msg, 0, stream);
}
pub fn fetures_req(&self, xid: u32, stream: &mut TcpStream) {
let fetreq_msg = self.ofp.fetures_req();
self.send_msg(fetreq_msg, xid, stream);
}
pub fn packet_in_handler(&mut self, xid: u32, packetin: PacketInEvent, stream: &mut TcpStream) {
let ether = packetin.payload;
self.mac_to_port.insert(ether.mac_src, packetin.port);
}
} }
...@@ -25,7 +25,9 @@ impl OfpMsgEvent for Openflow10 { ...@@ -25,7 +25,9 @@ impl OfpMsgEvent for Openflow10 {
fn fetures_req(&self) -> FeaturesReq { fn fetures_req(&self) -> FeaturesReq {
FeaturesReq::new() FeaturesReq::new()
} }
fn ofp_version() -> usize {
1
}
fn version(&self) -> usize { fn version(&self) -> usize {
1 1
} }
......
...@@ -22,6 +22,7 @@ pub trait MessageMarshal { ...@@ -22,6 +22,7 @@ pub trait MessageMarshal {
pub trait OfpMsgEvent { pub trait OfpMsgEvent {
fn header(&self, message: u8, length: u16, xid: u32) -> OfpHeader; fn header(&self, message: u8, length: u16, xid: u32) -> OfpHeader;
fn version(&self) -> usize; fn version(&self) -> usize;
fn ofp_version() -> usize;
fn header_size(&self) -> usize; fn header_size(&self) -> usize;
fn msg_usize(&self, msg: OfpMsg) -> usize; fn msg_usize(&self, msg: OfpMsg) -> usize;
......
pub mod header; pub mod header;
pub use header::OfpHeader; pub use header::OfpHeader;
pub mod controller; pub mod controller_frame;
pub use controller::Controller;
pub mod events; pub mod events;
...@@ -11,3 +10,6 @@ pub use ofp_port::{OfpPort, PseudoPort}; ...@@ -11,3 +10,6 @@ pub use ofp_port::{OfpPort, PseudoPort};
pub mod messages; pub mod messages;
pub use messages::{ofp_v1_0, traiter}; pub use messages::{ofp_v1_0, traiter};
pub mod tcp_listener;
pub use tcp_listener::tcp_listener_handler;
\ No newline at end of file
use crate::ofp_from_version; use crate::openflow::{messages::Openflow10, traiter::OfpMsgEvent};
use crate::openflow::{messages::Openflow10, traiter::OfpMsgEvent, Controller}; use crate::{ofp_from_version, Controller};
use std::sync::{Arc, Mutex};
use std::{io::Read, net::TcpListener, thread}; use std::{io::Read, net::TcpListener, thread};
pub fn tcp_listener_handler(ofp_version: u8, address: &str) { use super::controller_frame::ControllerFrame;
use super::events::HelloEvent;
pub fn tcp_listener_handler<OME: OfpMsgEvent>(address: &str, ofp_version: u8) {
let controller = Arc::new(Mutex::from(Controller::new(ofp_from_version!(ofp_version))));
let listener = TcpListener::bind(address).unwrap(); let listener = TcpListener::bind(address).unwrap();
for stream in listener.incoming() { for stream in listener.incoming() {
match stream { match stream {
Ok(mut stream) => { Ok(mut stream) => {
let controller_clone = controller.clone();
thread::spawn(move || { thread::spawn(move || {
/* controller_clone
* when spawn new thread. .lock()
* The Controller will be create. .unwrap()
*/ .send_msg(HelloEvent::new(), 0, &mut stream);
let mut controller = Controller::new(ofp_from_version!(ofp_version)); let ofp_size = controller_clone.lock().unwrap().get_ofp().header_size();
controller.hello(&mut stream); // let ofp = controller.lock().unwrap().get_ofp();
let mut buffer = vec![0u8; controller.ofp.header_size()]; let mut buffer = vec![0u8; ofp_size];
loop { loop {
match stream.read(&mut buffer) { match stream.read(&mut buffer) {
Ok(v) if v > 0 => { Ok(v) if v > 0 => {
controller.request_handler(&mut buffer, &mut stream); controller_clone
.lock()
.unwrap()
.request_handler(&mut buffer, &mut stream);
} }
Ok(_) | Err(_) => break, Ok(_) | Err(_) => break,
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment