(* O'Caml high level Freenet client module *) (* by Travis Bemann *) (* *) (* This program is free software; you can redistribute it and/or *) (* modify it under the terms of the GNU Lesser General Public *) (* License as published by the Free Software Foundation; either *) (* version 2 of the License, or (at your option) any later version. *) (* *) (* This program is distributed in the hope that it will be useful, *) (* but WITHOUT ANY WARRANTY; without even the implied warranty of *) (* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *) (* GNU Lesser General Public License for more details. *) open Hlfreenet open Fieldset open Fstream (* Type for all callbacks (which have single parameters) *) type 'a action_result = Result of 'a | Error of exn (* This is the class type for node_async, and defines its *) (* interface. *) class type node_async_type = object method actions_max : int method splitfile_threads_max : int method block_len : int method attempts_max : int method dnf_retry_htl_mult : float method splitfile_block_len_try : int method set_actions_max : int -> unit method set_splitfile_threads_max : int -> unit method set_block_len : int -> unit method set_attempts_max : int -> unit method set_dnf_retry_htl_mult : float -> unit method set_splitfile_block_len_try : int -> unit method actions : int method wait_all : unit method request : callback:(fieldset action_result -> unit) -> uri:string -> htl:int -> stream:fstream_out -> unit method request_control_info : callback:(control_info action_result -> unit) -> uri:string -> htl:int -> unit method request_map : callback:(map action_result -> unit) -> uri:string -> htl:int -> unit method insert : callback:(string action_result -> unit) -> uri:string -> htl:int -> map:map -> stream:fstream_in -> unit method insert_no_map : callback:(string action_result -> unit) -> uri:string -> htl:int -> control_info:control_info -> stream:fstream_in -> unit method insert_info : callback:(string action_result -> unit) -> uri:string -> htl:int -> info:fieldset -> stream:fstream_in -> unit method insert_chk : callback:(string action_result -> unit) -> htl:int -> map:map -> stream:fstream_in -> unit method insert_chk_no_map : callback:(string action_result -> unit) -> htl:int -> control_info:control_info -> stream:fstream_in -> unit method insert_chk_info : callback:(string action_result -> unit) -> htl:int -> info:fieldset -> stream:fstream_in -> unit method insert_splitfile_blocks : callback:(control_split_file action_result -> unit) -> htl:int -> stream:fstream_in -> unit method insert_splitfile : callback:(string action_result -> unit) -> uri:string -> htl:int -> info:fieldset -> stream:fstream_in -> unit method insert_splitfile_chk : callback:(string action_result -> unit) -> htl:int -> info:fieldset -> stream:fstream_in -> unit method generate_chk : callback:(string action_result -> unit) -> map:map -> stream:fstream_in -> unit method generate_chk_no_map : callback:(string action_result -> unit) -> control_info:control_info -> stream:fstream_in -> unit method generate_chk_info : callback:(string action_result -> unit) -> info:fieldset -> stream:fstream_in -> unit method generate_svk : callback:((string * string) action_result -> unit) -> unit end (* This class forms a general multithreaded, asynchronous node *) (* interface; note that it encapsulates Hlfreenet, and that *) (* Hlfreenet handles splitfile request and insert multithreading. *) class node_async_core ~addr ~port ~actions_max:actions_max_init ~splitfile_threads_max:splitfile_threads_max_init ~block_len:block_len_init ~attempts_max:attempts_max_init ~dnf_retry_htl_mult:dnf_retry_htl_mult_init ~splitfile_block_len_try:splitfile_block_len_try_init = object (self) val params_mutex = Mutex.create () val mutable actions_max = actions_max_init val mutable splitfile_threads_max = splitfile_threads_max_init val mutable block_len = block_len_init val mutable attempts_max = attempts_max_init val mutable dnf_retry_htl_mult = dnf_retry_htl_mult_init val mutable splitfile_block_len_try = splitfile_block_len_try_init val node_mutex = Mutex.create () val mutable node = new node_hl ~addr:addr ~port:port ~threads_max:splitfile_threads_max_init ~block_len:block_len_init ~attempts_max:attempts_max_init ~dnf_retry_htl_mult:dnf_retry_htl_mult_init ~splitfile_block_len_try:splitfile_block_len_try_init val actions_mutex = Mutex.create () val mutable actions = 0 val action_queue_mutex = Mutex.create () val action_queue = Queue.create () (* Do an asynchronous high level request, and call a callback in a *) (* new thread once it is complete with the requested file's *) (* metadata. *) method request ~callback ~uri ~htl ~stream = let request () = try self#callback_result callback (node#request_hl ~uri:uri ~htl:htl ~stream:stream) with except -> self#callback_error callback except in self#create_thread request (* Do an asynchronous high level request, and call a callback in a *) (* new thread once it is complete with the requested file's control *) (* datastructure and info metadata pair; do not follow control *) (* metadata. *) method request_control_info ~callback ~uri ~htl = let request () = try self#callback_result callback (node#request_hl_control_info ~uri:uri ~htl:htl) with except -> self#callback_error callback except in self#create_thread request (* Do an asynchronous high level request, and call a callback in a *) (* new thread once it is complete with the requested file's map *) (* datastructure; do not follow control metadata. *) method request_map ~callback ~uri ~htl = let request () = try self#callback_result callback (node#request_hl_map ~uri:uri ~htl:htl) with except -> self#callback_error callback except in self#create_thread request (* Do an asynchronous high level insert of a file with a map *) (* datastructure, and call a callback in a new thread once it is *) (* complete with the inserted file's actual URI. *) method insert ~callback ~uri ~htl ~map ~stream = let insert () = try self#callback_result callback (node#insert_hl ~uri:uri ~htl:htl ~map:map ~stream:stream) with except -> self#callback_error callback except in self#create_thread insert (* Do an asynchronous high level insert of a file with a control *) (* datastructure and info metadata pair, and call a callback in a *) (* new thread once it is complete with the inserted file's actual*) (* URI. *) method insert_no_map ~callback ~uri ~htl ~control_info ~stream = let insert () = try self#callback_result callback (node#insert_hl_no_map ~uri:uri ~htl:htl ~control_info:control_info ~stream:stream) with except -> self#callback_error callback except in self#create_thread insert (* Do an asynchronous high level insert of a file with only info *) (* metadata, and call a callback in a new thread once it is *) (* complete with the inserted file's actual URI. *) method insert_info ~callback ~uri ~htl ~info ~stream = let insert () = try self#callback_result callback (node#insert_hl_info ~uri:uri ~htl:htl ~info:info ~stream:stream) with except -> self#callback_error callback except in self#create_thread insert (* Do an asynchronous high level insert of a file as a CHK with a *) (* map datastructure, and call a callback in a new thread once it *) (* is complete with the inserted file's actual URI. *) method insert_chk ~callback ~htl ~map ~stream = let insert () = try self#callback_result callback (node#insert_hl_chk ~htl:htl ~map:map ~stream:stream) with except -> self#callback_error callback except in self#create_thread insert (* Do an asynchronous high level insert of a file as a CHK with a *) (* control datastructure and info metadata pair, and call a *) (* callback in a new thread once it is complete with the inserted *) (* file's actual URI. *) method insert_chk_no_map ~callback ~htl ~control_info ~stream = let insert () = try self#callback_result callback (node#insert_hl_chk_no_map ~htl:htl ~control_info:control_info ~stream:stream) with except -> self#callback_error callback except in self#create_thread insert (* Do an asynchronous high level insert of a file as a CHK with *) (* onlyb info metadata, and call a callback in a new thread once it *) (* is complete with the inserted file's actual URI. *) method insert_chk_info ~callback ~htl ~info ~stream = let insert () = try self#callback_result callback (node#insert_hl_chk_info ~htl:htl ~info:info ~stream:stream) with except -> self#callback_error callback except in self#create_thread insert (* Do an asynchronous high level insert of a file as a group of *) (* splitfile blocks, and call a callback in a new thread once it *) (* is complete with the resulting splitfile datastructure. *) method insert_splitfile_blocks ~callback ~htl ~stream = let insert () = try self#callback_result callback (node#insert_hl_splitfile_blocks ~htl:htl ~stream:stream) with except -> self#callback_error callback except in self#create_thread insert (* Do an asynchronous high level insert of a file as a splitfile *) (* and insert the resulting splitfile control metadata and info *) (* metadata at a specified URI, and call a callback in a new thread *) (* once it is complete with the real inserted URI. *) method insert_splitfile ~callback ~uri ~htl ~info ~stream = let insert () = try self#callback_result callback (node#insert_hl_splitfile ~uri:uri ~htl:htl ~info:info ~stream:stream) with except -> self#callback_error callback except in self#create_thread insert (* Do an asynchronous high level insert of a file as a splitfile *) (* and insert the resulting splitfile control metadata and info *) (* metadata in a CHK, and call a callback in a new thread once it *) (* is complete with the real inserted URI. *) method insert_splitfile_chk ~callback ~htl ~info ~stream = let insert () = try self#callback_result callback (node#insert_hl_splitfile_chk ~htl:htl ~info:info ~stream:stream) with except -> self#callback_error callback except in self#create_thread insert (* Do an asynchronous high level CHK generation from a stream with *) (* a map datastructure. *) method generate_chk ~callback ~map ~stream = let generate () = try self#callback_result callback (node#generate_chk_hl ~map:map ~stream:stream) with except -> self#callback_error callback except in self#create_thread generate (* Do an asynchronous high level CHK generation from a stream with *) (* a control datastructure and info metadata pair. *) method generate_chk_no_map ~callback ~control_info ~stream = let generate () = try self#callback_result callback (node#generate_chk_hl_no_map ~control_info:control_info ~stream:stream) with except -> self#callback_error callback except in self#create_thread generate (* Do an asynchronous high level CHK generation from a stream with *) (* info metadata. *) method generate_chk_info ~callback ~info ~stream = let generate () = try self#callback_result callback (node#generate_chk_hl_info ~info:info ~stream:stream) with except -> self#callback_error callback except in self#create_thread generate (* Do an asynchronous SVK private-public key pair generation, where *) (* the first item of the tuple returned is the private key and the *) (* second item of the tupble returned is the public key. *) method generate_svk ~callback = let generate () = try self#callback_result callback (node#generate_svk) with except -> self#callback_error callback except in self#create_thread generate (* Get the current number of actions. *) method actions = Mutex.lock actions_mutex; let actions = actions in Mutex.unlock actions_mutex; actions (* Wait for all actions to complete. *) method wait_all = Mutex.lock actions_mutex; if actions > 0 then begin Mutex.unlock actions_mutex; Thread.yield (); self#wait_all end else Mutex.unlock actions_mutex (* Create a single new thread, with all the thread quota handling *) method private create_thread thunk = Mutex.lock actions_mutex; if actions < self#actions_max then begin actions <- actions + 1; Mutex.unlock actions_mutex; ignore (Thread.create thunk ()) end else begin Mutex.lock action_queue_mutex; Queue.add thunk action_queue; Mutex.unlock action_queue_mutex; Mutex.unlock actions_mutex end (* Handle callback signaling and new action dequeuing. *) method private callback_send : 'a . ('a action_result -> unit) -> 'a action_result -> unit = fun thunk data -> Mutex.lock action_queue_mutex; try let next_action = Queue.take action_queue in ignore (Thread.create next_action ()); Mutex.unlock action_queue_mutex; thunk data with Queue.Empty -> Mutex.lock actions_mutex; actions <- actions - 1; Mutex.unlock actions_mutex; Mutex.unlock action_queue_mutex; thunk data (* Send a successful result to a callback *) method private callback_result : 'a . ('a action_result -> unit) -> 'a -> unit = fun thunk result -> self#callback_send thunk (Result result) (* Set an exception to a callback *) method private callback_error : 'a . ('a action_result -> unit) -> exn -> unit = fun thunk except -> self#callback_send thunk (Error except) (* Simple getter methods *) method actions_max = Mutex.lock params_mutex; let actions_max = actions_max in Mutex.unlock params_mutex; actions_max method splitfile_threads_max = Mutex.lock params_mutex; let splitfile_threads_max = splitfile_threads_max in Mutex.unlock params_mutex; splitfile_threads_max method block_len = Mutex.lock params_mutex; let block_len = block_len in Mutex.unlock params_mutex; block_len method attempts_max = Mutex.lock params_mutex; let attempts_max = attempts_max in Mutex.unlock params_mutex; attempts_max method dnf_retry_htl_mult = Mutex.lock params_mutex; let dnf_retry_htl_mult = dnf_retry_htl_mult in Mutex.unlock params_mutex; dnf_retry_htl_mult method splitfile_block_len_try = Mutex.lock params_mutex; let splitfile_block_len_try = splitfile_block_len_try in Mutex.unlock params_mutex; splitfile_block_len_try (* Simple setter methods *) method set_actions_max actions_max_new = Mutex.lock params_mutex; actions_max <- actions_max_new; Mutex.unlock params_mutex; self#update_node method set_splitfile_threads_max splitfile_threads_max_new = Mutex.lock params_mutex; splitfile_threads_max <- splitfile_threads_max_new; Mutex.unlock params_mutex; self#update_node method set_block_len block_len_new = Mutex.lock params_mutex; block_len <- block_len_new; Mutex.unlock params_mutex; self#update_node method set_attempts_max attempts_max_new = Mutex.lock params_mutex; attempts_max <- attempts_max_new; Mutex.unlock params_mutex; self#update_node method set_dnf_retry_htl_mult dnf_retry_htl_mult_new = Mutex.lock params_mutex; dnf_retry_htl_mult <- dnf_retry_htl_mult_new; Mutex.unlock params_mutex; self#update_node method set_splitfile_block_len_try splitfile_block_len_try_new = Mutex.lock params_mutex; splitfile_block_len_try <- splitfile_block_len_try_new; Mutex.unlock params_mutex; self#update_node (* Regenerate the node object based on the current settings *) method private update_node = Mutex.lock params_mutex; Mutex.lock node_mutex; node <- new node_hl ~addr:addr ~port:port ~threads_max:splitfile_threads_max ~block_len:block_len ~attempts_max:attempts_max ~dnf_retry_htl_mult:dnf_retry_htl_mult ~splitfile_block_len_try:splitfile_block_len_try; Mutex.unlock params_mutex; Mutex.unlock node_mutex end (* This class forms the external interface for instantiating a *) (* general multithreaded, asynchronous node interface; note that it *) (* encapsulates Hlfreenet, and that Hlfreenet handles splitfile *) (* request and insert multithreading. *) class node_async ~addr ~port ~actions_max ~splitfile_threads_max ~block_len ~attempts_max ~dnf_retry_htl_mult ~splitfile_block_len_try = (node_async_core ~addr:addr ~port:port ~actions_max:actions_max ~splitfile_threads_max:splitfile_threads_max ~block_len:block_len ~attempts_max:attempts_max ~dnf_retry_htl_mult:dnf_retry_htl_mult ~splitfile_block_len_try:splitfile_block_len_try : node_async_type)