(Implementation of the rsync algorithm by Norman Ramsey <nr@alumni.princeton.edu>.)
The rsync algorithm is a technique for reducing the cost of a file transfer by avoiding the transfer of blocks that are already at the destination [cite tridgell:rsync]. Imagine we have source and destination computers that have files X and Y respectively, where X and Y are similar.
The algorithm proceeds in three steps, detailed below: the destination summarizes the blocks of its file, the source uses that summary to compress its file into transfer instructions, and the destination reconstructs the new file from the old file and the instructions.
We can compute the checksum of the whole string, or we can build up a checksum incrementally by appending a character or ``rolling through'' a character stream.
<checksum.mli>=
<checksum.ml>
<checksum.ml>= (<-U)
(* module Checksum *)
module type S = sig
  include Hashtbl.HashedType
    (* gets type t, which probably fits in a machine word *)
  val string : string -> t
  val append : t -> char -> t
    (* append (string s) c = string (s ^ String.make 1 c) *)
  val roll : int -> t -> char -> char -> t
    (* provided r = roll (1 + String.length s), then
       r (string (String.make 1 c ^ s)) c c' = string (s ^ String.make 1 c') *)
end
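The two laws in the comments are easiest to see on a toy instance. The additive checksum below is hypothetical, invented only to illustrate the laws; it is not part of the program.

```ocaml
(* Hypothetical toy checksum: the sum of character codes.  It exists
   only to illustrate the append and roll laws of Checksum.S. *)
module Sum = struct
  type t = int
  let equal = (=)
  let hash = Hashtbl.hash
  let string s =
    let sum = ref 0 in
    String.iter (fun c -> sum := !sum + Char.code c) s;
    !sum
  let append sum c = sum + Char.code c
  (* rolling drops the oldest character and adds the newest *)
  let roll _size sum c c' = sum - Char.code c + Char.code c'
end

let () =
  (* append law: append (string s) c = string (s ^ String.make 1 c) *)
  assert (Sum.append (Sum.string "abc") 'd' = Sum.string "abcd");
  (* roll law, with r = roll (1 + String.length "bc"):
     r (string ("x" ^ "bc")) 'x' 'd' = string ("bc" ^ "d") *)
  let r = Sum.roll 3 in
  assert (r (Sum.string "xbc") 'x' 'd' = Sum.string "bcd")
```

The real implementations below satisfy the same laws; only the arithmetic differs.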
The costs of append and roll should be small constants. The cost of string should be linear in the size of its argument. roll k may perform some interesting computation, the results of which should be cached.
The MD5 digest could be a suitable implementation of fingerprints.
<fingerprint.mli>=
<fingerprint.ml>
<fingerprint.ml>= (<-U)
(* module Fingerprint *)
module type S = sig
  type t
  val string : string -> t  (* a reliable fingerprint *)
end
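As noted above, MD5 would do, and the standard library's Digest module already matches this signature (Digest.t is the 16-byte MD5 digest of a string). A minimal check, with the module type repeated inline so the sketch stands alone:

```ocaml
(* The signature from <fingerprint.ml>, repeated inline so this
   sketch is self-contained. *)
module type FINGERPRINT = sig
  type t
  val string : string -> t  (* a reliable fingerprint *)
end

(* The standard library's Digest (MD5) module satisfies it as-is. *)
module MD5 : FINGERPRINT = Digest

let () =
  (* equal inputs give equal fingerprints; different inputs almost
     surely give different ones *)
  assert (Digest.string "hello" = Digest.string "hello");
  assert (Digest.string "hello" <> Digest.string "hellp")
```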
The idea is that synchronization happens in three steps:

1. The destination computer runs findBlocks to compute the checksums and fingerprints of the blocks of the old file. It sends this block_info to the source computer.
2. The source computer runs compress over the new file, producing a list of transfer instructions, which it sends back to the destination computer.
3. The destination computer runs decompress, using the transfer instructions to convert the old file to the new file.
<rsync.mli>=
(* module type Rsync *)
module Make :
  functor (Fingerprint : Fingerprint.S) ->
  functor (Checksum : Checksum.S) ->
sig
  type block_info = { size : int; blocks : (Checksum.t * Fingerprint.t) list }
  type transfer_instruction = STRING of string | BLOCK_NUMBERED of int
  val findBlocks : in_channel -> int -> block_info
    (* pull out blocks of a particular size *)
  val compress : block_info -> in_channel -> transfer_instruction list
    (* compress a file into transfer instructions, using block info *)
  val decompress : int -> in_channel -> out_channel ->
                   transfer_instruction list -> unit
    (* given block size and original file, decompress transfer instructions *)
end
<rsync.ml>=
module Make (Fingerprint : Fingerprint.S) (Checksum : Checksum.S) = struct
  type block_info = { size : int; blocks : (Checksum.t * Fingerprint.t) list }
  type transfer_instruction = STRING of string | BLOCK_NUMBERED of int
  <rsync functions>
end
To look up a block by its checksum, we store the blocks in a hash table keyed on checksum. Because distinct blocks may share a checksum, we keep every binding and later retrieve all the candidates with find_all.
<rsync functions>= (<-U) [D->]
module BTab = Hashtbl.Make(Checksum)
let mkBlockTable blocks =
  let t = BTab.create (List.length blocks) in
  let add k (csum, fp) = (BTab.add t csum (k, fp); k+1) in
  let _ = List.fold_left add 0 blocks in
  t
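The table deliberately keeps multiple bindings per checksum: distinct blocks can collide on a weak checksum, and Hashtbl.add never overwrites an earlier binding. The standalone sketch below shows why compress can later use find_all to recover every candidate; the IntTab module and block names here are made up for illustration.

```ocaml
(* A checksum-keyed table sketched with plain ints as "checksums". *)
module IntTab = Hashtbl.Make (struct
  type t = int
  let equal = (=)
  let hash = Hashtbl.hash
end)

let () =
  let t = IntTab.create 4 in
  IntTab.add t 7 "block 0";
  IntTab.add t 7 "block 3";  (* a colliding checksum; both bindings survive *)
  (* find_all returns every binding, most recently added first *)
  assert (IntTab.find_all t 7 = ["block 3"; "block 0"]);
  assert (IntTab.find_all t 9 = [])
```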
To compress, we scan the file one character at a time, while looking at a window of the right size. Before we set up suitable initial conditions, it may be easier to think about the steady state and the loop invariants. We use these state variables: b, a buffer; q, a queue, holding the current window of size N; csum, the checksum of the string in q; instr', a list of transfer instructions in reverse order; and infile, the file being compressed.

The invariant is that the entire input file is represented by the instructions in instr', followed by the contents of b, followed by the characters in q, followed by the unread characters from infile. Moreover, instr' is either empty or headed by a BLOCK_NUMBERED instruction.
The startCompressing function requires the precondition that the buffer and queue be empty. The implementation of the loop assumes the block table is in blocktab.
If we hit in the block cache, we have to drain the buffer and the
queue, because it's just like starting over again at the beginning of
the file.
<compression functions>= (U->) [D->]
let rec compressLoop instr' b q csum infile =
  try (* first case: hit in the block table *)
    match BTab.find_all blocktab csum with
      [] -> raise Not_found
    | candidates ->
        let contents = Buffer.create size in
        let () = Queue.iter (Buffer.add_char contents) q in
        let fp = Fingerprint.string (Buffer.contents contents) in
        let (blockNum, _) = List.find (fun (_, fp') -> fp = fp') candidates in
        let instr' =
          if Buffer.length b > 0 then STRING (Buffer.contents b) :: instr'
          else instr' in
        let instr' = BLOCK_NUMBERED blockNum :: instr' in
        ( Buffer.reset b
        ; Queue.clear q
        ; startCompressing instr' b q infile
        )
  with Not_found -> <roll one character and try again>
Stepping a character simply means sliding the window while maintaining all the invariants.
<roll one character and try again>= (<-U)
try
  let next = input_char infile in
  let () = Queue.add next q in
  let prev = Queue.take q in
  let () = Buffer.add_char b prev in
  let csum = roll csum prev next in
  compressLoop instr' b q csum infile
with End_of_file ->
  finishCompressing instr' b q
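The window bookkeeping can be watched in isolation: enqueue the next character, dequeue the oldest, and the queue again holds exactly the current window. A sketch with a three-character window, leaving out the checksum and showing only the queue and buffer moves:

```ocaml
let () =
  let b = Buffer.create 3 in
  let q = Queue.create () in
  String.iter (fun c -> Queue.add c q) "abc";  (* window holds "abc" *)
  (* slide by one character, as in <roll one character and try again> *)
  let next = 'd' in
  Queue.add next q;
  let prev = Queue.take q in
  Buffer.add_char b prev;
  let window = Queue.fold (fun s c -> s ^ String.make 1 c) "" q in
  assert (prev = 'a');              (* the oldest character left the window *)
  assert (window = "bcd");          (* the queue again holds a full window *)
  assert (Buffer.contents b = "a")  (* unmatched text accumulates in b *)
```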
To finish compressing, we drain the queue into the buffer and append the buffer's contents to the transfer instructions.
<compression functions>+= (U->) [<-D->]
and finishCompressing instr' b q =
  let () = Queue.iter (Buffer.add_char b) q in
  List.rev (STRING (Buffer.contents b) :: instr')
On start of compression, the buffer and queue are empty. The function fillAndSum fills the queue and computes a checksum. We fill the queue and start the main loop; if there aren't enough characters to fill the queue, we finish it off.
<compression functions>+= (U->) [<-D]
and startCompressing instr' b q infile =
  let rec fillAndSum csum k =
    if k = 0 then csum
    else
      let c = input_char infile in
      ( Queue.add c q
      ; fillAndSum (Checksum.append csum c) (k-1)
      ) in
  try compressLoop instr' b q (fillAndSum (Checksum.string "") size) infile
  with End_of_file -> finishCompressing instr' b q
<rsync functions>+= (<-U) [<-D->]
let compress {size=size; blocks=blocks} infile =
  let blocktab = mkBlockTable blocks in
  let roll = Checksum.roll size in
  <compression functions> in
  let q = Queue.create () in
  let b = Buffer.create size in
  startCompressing [] b q infile
<rsync functions>+= (<-U) [<-D->]
let decompress size =
  let buf = String.create size in
  fun oldfile newfile instructions ->
    let copyBlock k =
      ( seek_in oldfile (k*size)
      ; (try really_input oldfile buf 0 size
         with End_of_file -> assert false)
      ; output_string newfile buf
      ) in
    let rec emit l = match l with
      STRING s :: l -> output_string newfile s; emit l
    | BLOCK_NUMBERED k :: l -> copyBlock k; emit l
    | [] -> () in
    emit instructions
Given really_input, it's easy to get all the blocks.
<rsync functions>+= (<-U) [<-D]
let findBlocks infile size =
  let b = String.create size in
  let rec blocks bs' =
    try
      really_input infile b 0 size;
      blocks ((Checksum.string b, Fingerprint.string b) :: bs')
    with End_of_file -> {size=size; blocks=List.rev bs'} in
  blocks []
This checksum is formed from xoring together all characters. It does a terrible job distinguishing blocks, but it made it possible to test the implementation.
<xor.mli>= module M : Checksum.S
<xor.ml>=
module M : Checksum.S = struct
  type t = int
  let hash = Hashtbl.hash
  let equal = (=)
  let append sum c = sum lxor (int_of_char c)
  let string s =
    let rec next k sum =
      try next (k+1) (append sum (String.get s k))
      with Invalid_argument _ -> sum in
    next 0 0
  let roll size sum c c' = append (append sum c) c'
  let toString = string_of_int
end
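Even this weak checksum satisfies the roll law from Checksum.S, because xor is its own inverse: xoring the departing character back out and the arriving character in gives the checksum of the new window. A standalone check, with the relevant definitions copied from the module above:

```ocaml
(* Definitions copied from Xor.M so the check is self-contained. *)
let append sum c = sum lxor (int_of_char c)
let string s =
  let rec next k sum =
    try next (k+1) (append sum (String.get s k))
    with Invalid_argument _ -> sum in
  next 0 0
let roll _size sum c c' = append (append sum c) c'

let () =
  (* roll law: r (string ("x" ^ "bc")) 'x' 'd' = string ("bc" ^ "d") *)
  assert (roll 3 (string "xbc") 'x' 'd' = string "bcd")
```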
Here's a somewhat more aggressive xor algorithm that depends on the order in which characters appear. I use 24 bits worth of checksum. I append a character by rotating to the left one bit, then xoring in the new character. This checksum makes the rsync compression run 3 times faster than the previous checksum.
<xorx.ml>=
module M : Checksum.S = struct
  type t = int
  let rotl1 n =
    let hi = n lsr 23 in
    let lo = n land 0x7fffff in
    (lo lsl 1) lor hi
  let rotl k n =
    let hi = n lsr (24 - k) in
    let lo = n land (0xffffff lsr k) in
    (lo lsl k) lor hi
  let hash = Hashtbl.hash
  let equal = (=)
  let append sum c = (rotl1 sum) lxor (int_of_char c)
  let string s =
    let rec next k sum =
      try next (k+1) (append sum (String.get s k))
      with Invalid_argument _ -> sum in
    next 0 0
  let roll size =
    if size mod 24 = 0 then
      fun sum c c' -> (append sum c') lxor (int_of_char c)
    else
      let rot = rotl (size mod 24) in
      fun sum c c' -> (append sum c') lxor (rot (int_of_char c))
  let toString = string_of_int
end
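The correctness of roll here rests on the rotation being 24-periodic: after a character has been rotated through size steps of the window, its net rotation is size mod 24, which is why the departing character can be xored back out after a single rotation by size mod 24 bits. A quick standalone check of the rotation helpers, copied from the module above:

```ocaml
(* 24-bit rotate-left helpers, copied from Xorx.M above. *)
let rotl1 n =
  let hi = n lsr 23 in
  let lo = n land 0x7fffff in
  (lo lsl 1) lor hi

let rotl k n =
  let hi = n lsr (24 - k) in
  let lo = n land (0xffffff lsr k) in
  (lo lsl k) lor hi

let () =
  let rec iter f k x = if k = 0 then x else iter f (k-1) (f x) in
  (* bit 23 wraps around to bit 0 *)
  assert (rotl1 0x800000 = 1);
  (* rotating one bit k times equals rotating once by k bits *)
  assert (iter rotl1 5 0xABCDEF = rotl 5 0xABCDEF);
  (* 24 single-bit rotations are the identity on a 24-bit value *)
  assert (iter rotl1 24 0xABCDEF = 0xABCDEF)
```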
Note the roll implementation will perform slightly better for block sizes that are a multiple of 24. I have sometimes observed a 3--4% improvement in practice, but the numbers are small and not always reproducible.
To test this stuff, we'll want two checksum implementations.
<rtest.mli>=
<rtest.ml>= [D->]
module R' = Rsync.Make (Digest) (Xor.M)   (* old, no longer used *)
module R  = Rsync.Make (Digest) (Xorx.M)
Here we have some test functions that work with files. First, compute and show transfer instructions. The showi function shows the character position of a matched block, which was most helpful for debugging.
<rtest.ml>+= [<-D->]
let info size old =
  let inf = open_in_bin old in
  let b = R.findBlocks inf size in
  (close_in inf; b)
let showi size info =
  let rec dots k = if k > 0 then "." ^ dots (k - size) else "" in
  let cvt = function
      (R.BLOCK_NUMBERED k) -> "<" ^ string_of_int (size * k) ^ ">"
    | (R.STRING s) -> dots (String.length s) in
  List.fold_left (fun s i -> s ^ cvt i) "" info
Here's a compressor that operates on file names.
<rtest.ml>+= [<-D->]
let comp size old new' =
  let b = info size old in
  let oldf = open_in_bin old in
  let newf = open_in_bin new' in
  let c = R.compress b newf in
  (close_in newf; close_in oldf; c)
To measure the compression ratio, we need to be able to compute file sizes.
<rtest.ml>+= [<-D->]
let fsize fname =
  let f = open_in fname in
  let rec count n =
    try ignore (input_char f); count (n+1)
    with End_of_file -> n in
  let k = count 0 in
  (close_in f; k)
let ratio size old new' =
  let actual = fsize new' in
  let i = comp size old new' in
  let sz = function
      R.STRING s -> String.length s
    | R.BLOCK_NUMBERED k when k < 254 -> 1
    | R.BLOCK_NUMBERED k when k < 65535 -> 3
    | R.BLOCK_NUMBERED k -> 5 in
  let compressed = List.fold_left (fun k b -> 1 + sz b + k) 0 i in
  ignore (
    Printf.printf
      "With %d-byte blocks & base file %s, file %s compresses to %2.1f%%\n"
      size old new' (100.0 *. float compressed /. float actual))
A more useful measure might be ``total communication overhead,'' which would also account for the cost of sending the fingerprints and checksums of the blocks. I haven't implemented this measure.
Here's a decompressor, also operating on files.
<rtest.ml>+= [<-D->]
let decomp size original modified info =
  let orig = open_in_bin original in
  let modi = open_out_bin modified in
  (R.decompress size orig modi info; close_in orig; close_out modi)
Now, here's a test that compresses, decompresses, and reports the ratio.
<rtest.ml>+= [<-D]
let doit size o n =
  let i = comp size o n in
  let () = decomp size o "rtest.out" i in
  let () = ratio size o n in
  ()
And finally, here's code to handle the command line and do something. If I understood the Arg module, I would surely use that parser instead.
<run.ml>=
let k = try int_of_string (Array.get Sys.argv 1) with Invalid_argument _ -> 32;;
let o = try Array.get Sys.argv 2 with Invalid_argument _ -> "prop.orig";;
let n = try Array.get Sys.argv 3 with Invalid_argument _ -> "prop.latest";;
Rtest.doit k o n;;
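For what it's worth, the Arg version is short. The sketch below is hypothetical, not the original's interface: it invents a -b flag for the block size plus two anonymous file arguments, and uses Arg.parse_argv on a synthetic argv so it can be run standalone (a real main would call Arg.parse, which reads Sys.argv, and then invoke Rtest.doit).

```ocaml
let () =
  let size = ref 32 in
  let files = ref [] in
  (* hypothetical flag name; the original takes positional arguments *)
  let spec = ["-b", Arg.Set_int size, " block size in bytes"] in
  let usage = "usage: run [-b size] oldfile newfile" in
  (* parse a synthetic command line instead of Sys.argv *)
  let argv = [| "run"; "-b"; "64"; "prop.orig"; "prop.latest" |] in
  Arg.parse_argv ~current:(ref 0) argv spec
    (fun f -> files := f :: !files) usage;
  assert (!size = 64);
  assert (List.rev !files = ["prop.orig"; "prop.latest"])
```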