Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use serde::{Deserialize, Deserializer};
use std::{collections::HashMap, fmt::Display, net::Ipv4Addr, str::FromStr};
use syscalls::Sysno;

pub mod machine;
pub mod script;
pub mod worker;

Expand Down
92 changes: 92 additions & 0 deletions src/machine.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
use crate::script::ast::MachineInstruction;

use log::{debug, trace};
use std::{
fs,
io::{BufReader, prelude::*},
net::TcpListener,
thread,
};

#[derive(Debug)]
pub enum MachineError {
Internal,
}

/// Start a dummy listening server
fn start_server(addr: String, target_port: u16) -> Result<(), MachineError> {
debug!("Starting server at {:?}:{:?}", addr, target_port);

let listener = TcpListener::bind((addr, target_port)).unwrap();

for stream in listener.incoming() {
let mut stream = stream.unwrap();

// As a simplest solution to keep a connection open, spawn a
// thread. It's not the best one though, as we waste resources.
// For the purpose of only keeping connections open we could e.g.
// spawn only two threads, where the first one receives connections
// and adds streams into the list of active, and the second iterates
// through streams and replies. This way the connections will have
// high latency, but for the purpose of networking workload it
// doesn't matter.
thread::spawn(move || {
loop {
let mut buf_reader = BufReader::new(&stream);
let mut buffer = String::new();

match buf_reader.read_line(&mut buffer) {
Ok(0) => {
// EOF, exit
trace!("EOF");
return;
}
Ok(_n) => {
trace!("Received {:?}", buffer);

let response = "hello\n";
match stream.write_all(response.as_bytes()) {
Ok(_) => {
// Response is sent, handle the next one
}
Err(e) => {
trace!("ERROR: sending response, {}", e);
break;
}
}
}
Err(e) => {
trace!("ERROR: reading a line, {}", e)
}
}
}
});
}

Ok(())
}

/// Make sure a path exists
fn ensure_path(path: String) -> Result<(), MachineError> {
debug!("Create path {path:?}");
match fs::create_dir_all(path) {
Ok(()) => {
debug!("Success");
Ok(())
}
Err(e) => {
debug!("Failure {e:?}");
Err(MachineError::Internal)
}
}
}

pub fn apply(instr: MachineInstruction) -> Result<(), MachineError> {
match instr {
MachineInstruction::Server { port } => {
start_server("127.0.0.1".to_string(), port)
}
MachineInstruction::Profile { target: _ } => todo!(),
MachineInstruction::Path { value } => ensure_path(value),
}
}
13 changes: 12 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use berserker::{
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};

use berserker::machine::apply;
use berserker::script::{
ast::Node, parser::parse_instructions, rules::apply_rules,
};
Expand Down Expand Up @@ -65,12 +66,22 @@ fn run_script(script_path: String) -> Vec<(i32, u64)> {
parse_instructions(&std::fs::read_to_string(script_path).unwrap())
.unwrap();

let (_machine, works): (Vec<_>, Vec<_>) =
let (machine, works): (Vec<_>, Vec<_>) =
ast.iter().partition_map(|node| match node {
Node::Work { .. } => Either::Right(node),
Node::Machine { .. } => Either::Left(node),
});

if let Some(m) = machine.into_iter().next() {
let Node::Machine { m_instructions } = m.clone() else {
unreachable!()
};

for instr in m_instructions {
thread::spawn(move || apply(instr.clone()));
}
};

apply_rules(works)
.into_iter()
.flat_map(|node| {
Expand Down
Loading