open Nonstd
open Pvem_lwt_unix
open Pvem_lwt_unix.Deferred_result
module String = StringLabels
let debug = ref false
let dbg fmt = ksprintf (eprintf "Trakeva_sqlite: %s\n%!") fmt
type t = {
handle: Sqlite3.db;
action_mutex: Lwt_mutex.t;
}
let in_posix_thread ~on_exn f =
Lwt_preemptive.detach (fun () ->
try `Ok (f ())
with e -> on_exn e) ()
let escape_blob s =
let b = Buffer.create (String.length s * 2 + 4) in
Buffer.add_string b "X'";
String.iter s ~f:(fun c ->
Buffer.add_string b (sprintf "%02X" (int_of_char c));
);
Buffer.add_char b '\'';
Buffer.contents b
let default_table = "trakeva_default_table"
let none_blob = "trakeva_none_collection"
let option_equals =
function
| None -> sprintf "= %s" (escape_blob none_blob)
| Some s -> sprintf "= %s" (escape_blob s)
let option_insert =
function
| None -> escape_blob none_blob
| Some s -> escape_blob s
let validate_collection =
function
| None -> ()
| Some c when c = none_blob ->
ksprintf
failwith "The collection name %s is reserved by Trakeva_sqlite" none_blob
| Some _ -> ()
let create_table t =
sprintf "CREATE TABLE IF NOT EXISTS %s (collection BLOB, key BLOB, value BLOB, PRIMARY KEY(collection, key) ON CONFLICT REPLACE)" t
let get_statement table collection key =
sprintf "SELECT value FROM %s WHERE key = %s AND collection %s" table
(escape_blob key)
(option_equals collection)
let get_all_statement table collection =
sprintf "SELECT key FROM %s WHERE collection %s ORDER BY key" table
(option_equals collection)
let keys_statement table collection =
sprintf "SELECT DISTINCT key FROM %s WHERE collection %s"
table (option_equals collection)
let set_statement table collection key value =
sprintf "INSERT OR REPLACE INTO %S (collection, key, value) VALUES (%s, %s, %s)"
table
(option_insert collection)
(escape_blob key) (escape_blob value)
let unset_statement table collection key =
sprintf "DELETE FROM %s WHERE key = %s AND collection %s" table
(escape_blob key)
(option_equals collection)
let with_executed_statement handle statement f =
let prep = Sqlite3.prepare handle statement in
if !debug then
dbg "exec: %S\n → counts: %d, %d"
statement (Sqlite3.column_count prep) (Sqlite3.data_count prep);
(try
let x = f prep in
let _ = Sqlite3.finalize prep in
x
with e ->
let _ = Sqlite3.finalize prep in
raise e)
let is_ok_or_done_exn handle (rc: Sqlite3.Rc.t) =
let open Sqlite3.Rc in
match rc with
| OK -> ()
| DONE -> ()
| _ -> failwith (sprintf "not ok/done: %s (global error: %s)"
(Sqlite3.Rc.to_string rc)
(Sqlite3.errmsg handle))
let get_row_exn prep =
match Sqlite3.step prep with
| Sqlite3.Rc.ROW -> Sqlite3.column prep 0, true
| Sqlite3.Rc.DONE -> Sqlite3.column prep 0, false
| rc -> failwith (sprintf "not a row: %s" (Sqlite3.Rc.to_string rc))
let exec_unit_exn handle statement =
with_executed_statement handle statement (fun prep ->
Sqlite3.step prep |> is_ok_or_done_exn handle)
let string_option_data_exn data =
let open Sqlite3.Data in
begin match data with
| NONE -> failwith "string_option_data_exn: none"
| NULL -> None
| INT _ -> failwith "string_option_data_exn: int"
| FLOAT _ -> failwith "string_option_data_exn: float"
| TEXT s
| BLOB s -> Some s
end
let exec_option_exn handle statement =
with_executed_statement handle statement (fun prep ->
let first, (_ : bool) = get_row_exn prep in
string_option_data_exn first)
let exec_list_exn handle statement =
with_executed_statement handle statement begin fun prep ->
let ret = ref [] in
let rec loop () =
let row, more_to_come = get_row_exn prep in
match string_option_data_exn row with
| Some one -> ret:= one :: !ret;
if more_to_come then loop () else ()
| None -> ()
in
loop ();
!ret
end
let load path =
let on_exn e = `Error (`Database (`Load path, Printexc.to_string e)) in
begin
try if Sys.getenv "TRAKEVA_SQLITE_DEBUG" = "true" then debug := true
with _ -> ()
end;
let action_mutex = Lwt_mutex.create () in
in_posix_thread ~on_exn (fun () ->
if !debug then
dbg "openning: %S" path;
let handle = Sqlite3.db_open ~mutex:`FULL ~cache:`PRIVATE path in
exec_unit_exn handle (create_table default_table);
{handle; action_mutex}
)
let close {handle} =
let on_exn e = `Error (`Database (`Close, Printexc.to_string e)) in
in_posix_thread ~on_exn begin fun () ->
let rec loop = function
| 0 -> failwith "failed to close (busy many times)"
| n ->
if Sqlite3.db_close handle then () else (
Sqlite3.sleep 100 |> ignore;
if !debug then
dbg "closing, %d attempts left" (n - 1);
loop (n - 1)
)
in
loop 5
end
open Trakeva
let get ?collection t ~key =
let statement = get_statement default_table collection key in
let error_loc = `Get (Key_in_collection.create key ?collection) in
let on_exn e = `Error (`Database (error_loc , Printexc.to_string e)) in
in_posix_thread ~on_exn (fun () ->
exec_option_exn t.handle statement
)
let get_all t ~collection =
let statement = get_all_statement default_table (Some collection) in
let error_loc = `Get_all collection in
let on_exn e = `Error (`Database (error_loc , Printexc.to_string e)) in
in_posix_thread ~on_exn (fun () ->
exec_list_exn t.handle statement
)
let iterator t ~collection =
let keys_statement = keys_statement default_table (Some collection) in
let error_loc = `Iter collection in
let on_exn e = `Error (`Database (error_loc , Printexc.to_string e)) in
let state = ref None in
let rec next_exn prep =
if !debug then
dbg "exec: %S\n → counts: %d, %d"
keys_statement (Sqlite3.column_count prep) (Sqlite3.data_count prep);
try
let row, more_to_come = get_row_exn prep in
begin match string_option_data_exn row with
| Some one_key -> Some one_key
| None ->
let _ = Sqlite3.finalize prep in
None
end
with e ->
let _ = Sqlite3.finalize prep in
raise e
in
begin fun () ->
in_posix_thread ~on_exn (fun () ->
match !state with
| None ->
let prep = Sqlite3.prepare t.handle keys_statement in
begin match Sqlite3.prepare_tail prep with
| None -> state := Some prep;
| Some p -> state := Some p;
end;
next_exn prep
| Some prep ->
next_exn prep
)
end
let act t ~(action: Action.t) =
let rec transact (action: Action.t) =
let open Key_in_collection in
let open Action in
match action with
| Set ({ key; collection }, value) ->
validate_collection collection;
let statement = set_statement default_table collection key value in
exec_unit_exn t.handle statement;
true
| Unset { key; collection } ->
validate_collection collection;
let statement = unset_statement default_table collection key in
exec_unit_exn t.handle statement;
true
| Sequence l -> List.for_all l ~f:transact
| Check ({ key; collection }, opt) ->
validate_collection collection;
let statement = get_statement default_table collection key in
exec_option_exn t.handle statement = opt
in
let error_loc = `Act action in
let on_exn e = `Error (`Database (error_loc , Printexc.to_string e)) in
Lwt_mutex.with_lock t.action_mutex (fun () ->
in_posix_thread ~on_exn begin fun () ->
exec_unit_exn t.handle "BEGIN TRANSACTION";
begin match transact action with
| false ->
exec_unit_exn t.handle "ROLLBACK";
`Not_done
| true ->
exec_unit_exn t.handle "COMMIT";
`Done
end
end
)