src/torrent/get_file.rs
// main entrance function
pub async fn download_torrent(filename: &String, db: &Db, memory_data: mpsc::Sender<String>, verified_transactions: Arc<Mutex<mpsc::Receiver<Vec<String>>>>) -> Result<(), String> {
// read the torrent from file
let mut torrent = match read_torrent_file(&filename).await {
Ok(t) => t,
Err(err) => return Err(format!("Failed to build Torrent struct: {}", err)),
};
// get the block number from the torrent
let block_number = extract_block_number(&mut torrent).await;
// get the connected nodes from memory
let connected_nodes = get_nodes_from_memory().await;
// downlaod and save the block
download_and_save_block_async(torrent, connected_nodes, block_number, &db, memory_data, verified_transactions).await
}
// function to get the block number from the torrent file
async fn extract_block_number(torrent: &mut Torrent) -> u32 {
if let Some(index) = torrent.info.name.find('.') {
if let Ok(block_number) = torrent.info.name[..index].parse() {
return block_number;
}
}
0
}
// function to read the torrent from file
async fn read_torrent_file(file_path: &str) -> Result<Torrent, io::Error> {
let mut file = File::open(file_path)?;
let mut file_content = String::new();
file.read_to_string(&mut file_content)?;
let torrent: Torrent = serde_json::from_str(&file_content)?;
Ok(torrent)
}
// function to call all the nodes from memory and return the stream
async fn get_nodes_from_memory() -> Vec<(String, u16, Arc<Mutex<TcpStream>>)> {
let memory_storage = GLOBAL_MEMORY.read().unwrap();
let mut nodes = Vec::new();
if let Some(memory) = &*memory_storage {
for connection_info in memory.connection_map.values() {
let ip = connection_info.ip.clone();
let port = connection_info.port;
let stream_arc = Arc::clone(&connection_info.stream);
nodes.push((ip, port, stream_arc));
}
}
drop(memory_storage);
nodes
}
// this function will connect to a given stream and request a piece of the block from the node
async fn request_piece_from_node(
stream: Arc<Mutex<TcpStream>>,
block_number: u32,
piece_number: u8,
piece_length: usize,
total_length: usize,
) -> Result<Vec<u8>, String> {
let mut stream_lock = stream.lock().await;
let exact_piece_size = if (piece_number as usize) * piece_length < total_length {
piece_length
} else {
total_length % piece_length
};
let mut request = Vec::new();
request.push(24u8);
request.extend_from_slice(&(block_number as u32).to_le_bytes());
request.push(piece_number);
stream_lock.write_all(&request).await
.map_err(|e| format!("Failed to send request. Error: {}", e))?;
let mut response = vec![0; exact_piece_size];
stream_lock.read_exact(&mut response).await
.map_err(|e| format!("Failed to read response. Error: {}", e))?;
Ok(response)
}
// Function to save a piece in the named tree "block_pieces"
fn save_piece_to_db(db: &Db, block_number: u32, piece_number: u32, data: &[u8]) {
let tree = db.open_tree("block_pieces").unwrap();
let key = format!("{}-{}", block_number, piece_number);
tree.insert(key.as_bytes(), data).unwrap();
}
// Function to remove all pieces associated with a block from the named tree "block_pieces"
fn remove_block_pieces_from_db(db: &Db, block_number: u32) {
let tree = db.open_tree("block_pieces").unwrap();
let prefix = format!("{}-", block_number);
let iter = tree.range(prefix.as_bytes()..);
for result in iter {
match result {
Ok((key, _value)) => {
if !key.starts_with(prefix.as_bytes()) {
break;
}
tree.remove(key).unwrap();
}
Err(_) => {
// Handle the error as needed
}
}
}
}
// this function requests pieces from different nodes and validates each piece as it is received.
// It adds each piece to the database so others can request a piece while other pieces as still
// being downloaded. When all peices have been aquired it combines and saves, then cleans the
// database.
async fn download_and_save_block_async(
torrent: Torrent,
connected_nodes: Vec<(String, u16, Arc<Mutex<TcpStream>>)>,
block_number: u32,
db: &Db,
memory_data: mpsc::Sender<String>,
verified_transactions: Arc<Mutex<mpsc::Receiver<Vec<String>>>>
) -> Result<(), String> {
let mut pieces = HashMap::new();
for piece_number in 1..=torrent.info.pieces.len() as u8 {
let mut piece_downloading = false;
for (_ip, _port, stream) in &connected_nodes {
if pieces.get(&piece_number).is_some() {
piece_downloading = true;
break;
}
match request_piece_from_node(stream.clone(), block_number, piece_number, torrent.info.piece_length as usize, torrent.info.length as usize).await {
Ok(data) => {
if let Some(expected_hash) = torrent.info.pieces[piece_number as usize - 1].get(&piece_number) {
if validate_hash(&data, expected_hash).await {
save_piece_to_db(db, block_number, piece_number as u32, &data);
pieces.insert(piece_number, data);
piece_downloading = true;
break;
} else {
eprintln!("Hash validation failed for piece {} from node.", piece_number);
}
} else {
eprintln!("Expected hash not found for piece {} from node.", piece_number);
}
}
Err(err) => {
eprintln!("Error downloading piece {} - {}", piece_number, err);
}
}
}
if !piece_downloading {
eprintln!("No node had the piece {}.", piece_number);
}
}
let ordered_pieces: Vec<u8> = (1..=pieces.len() as u8)
.flat_map(|piece_number| pieces.remove(&piece_number).expect("Piece not found"))
.collect();
if validate_hash(&ordered_pieces, &torrent.info.info_hash).await {
let json_block = load_block_from_binary(&ordered_pieces);
let json_block_str: Result<&str, &str> = json_block
.as_ref()
.map(|s| s.as_str())
.map_err(|s| s.as_str());
drop(ordered_pieces);
match json_block_str {
Ok(str_value) => {
let verification_result = verify_block_data(&str_value, db, memory_data, verified_transactions.clone()).await;
match verification_result {
Ok(signatures) => {
let _ = save_block(str_value.to_string(), db, torrent.info.block_hash.clone(), signatures, "updating".to_string()).await;
}
Err(err) => {
println!("Invalid block: {}", err);
}
}
}
Err(_) => {
}
}
} else {
eprintln!("Hash validation failed for complete block");
}
remove_block_pieces_from_db(db, block_number);
Ok(())
}