(Implementation of the rsync algorithm by Norman Ramsey <nr@alumni.princeton.edu>.)

The Rsync Algorithm

The rsync algorithm is a technique for reducing the cost of a file transfer by avoiding the transfer of blocks that are already at the destination [cite tridgell:rsync]. Imagine we have source and destination computers that have files X and Y respectively, where X and Y are similar.

The algorithm proceeds as follows:

  1. The destination computer divides file Y into blocks of an agreed-upon size N.
  2. For each block, the destination computer computes two functions of the block's contents:
       - a cheap checksum, which can be computed incrementally as a window rolls through the file, and
       - a reliable fingerprint, which identifies the block's contents with high probability.
  3. The destination computer sends a list of fingerprints and checksums to the source computer. Blocks are identified implicitly by the order in which they appear in the list.
  4. The source computer searches through file X to identify blocks that have the same fingerprints as blocks that appear in the list sent from the destination. The checksums are used to find candidate blocks in a single pass through file X. (Blocks with identical fingerprints are presumed to be identical.)
  5. The source computer sends instructions for reconstructing file X at the destination. These instructions avoid transmitting blocks of X that are identical to blocks of Y.
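The five steps can be sketched in miniature, entirely in memory. The sketch below is illustrative only: the raw block contents stand in for both checksums and fingerprints, the scan is naive rather than rolling, and all the names (block_size, signatures, compress, decompress) are hypothetical, not part of the implementation that follows.

```ocaml
let block_size = 4

type instruction = Literal of char | Block of int

(* Steps 1-3: the destination divides y into blocks and sends their
   "signatures" (here, the raw block contents). *)
let signatures y =
  let tbl = Hashtbl.create 16 in
  let n = String.length y / block_size in
  for k = 0 to n - 1 do
    Hashtbl.add tbl (String.sub y (k * block_size) block_size) k
  done;
  tbl

(* Steps 4-5: the source scans x; wherever a window matches a known
   block it emits the block's number, otherwise a literal character. *)
let compress tbl x =
  let rec go i acc =
    if i >= String.length x then List.rev acc
    else if i + block_size <= String.length x
         && Hashtbl.mem tbl (String.sub x i block_size)
    then go (i + block_size)
            (Block (Hashtbl.find tbl (String.sub x i block_size)) :: acc)
    else go (i + 1) (Literal x.[i] :: acc)
  in go 0 []

(* Reconstruction at the destination, copying matched blocks from y. *)
let decompress y instrs =
  let b = Buffer.create 16 in
  List.iter (function
    | Literal c -> Buffer.add_char b c
    | Block k -> Buffer.add_string b (String.sub y (k * block_size) block_size))
    instrs;
  Buffer.contents b

let () =
  let y = "aaaabbbbcccc" and x = "XXbbbbYYaaaa" in
  assert (decompress y (compress (signatures y) x) = x)
```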

Interfaces

Checksums

We can compute the checksum of the whole string, or we can build up a checksum incrementally by appending a character or ``rolling through'' a character stream.

<checksum.mli>=
<checksum.ml>
<checksum.ml>= (<-U)
(* module Checksum *)
module type S = sig
  include Hashtbl.HashedType
    (* gets type t, which probably fits in a machine word *)
  
  val string : string -> t
  
  val append : t -> char -> t
    (* append (string s) c = string (s ^ String.make 1 c) *)
  
  val roll : int -> t -> char -> char -> t
    (* provided r = roll (1 + String.length s), then
         r (string (String.make 1 c ^ s)) c c' = string (s ^ String.make 1 c') *)
end

The costs of append and roll should be small constants. The cost of string should be linear in the size of its argument. roll k may perform some interesting computation, the results of which should be cached.
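As a sanity check on the interface, here is a toy additive checksum (not one of the implementations below) that satisfies the laws stated in the comments above; string, append, and roll follow the signature, and the assertions check the two equations.

```ocaml
(* Toy additive checksum, used only to illustrate the laws above. *)
let string s =
  let sum = ref 0 in
  String.iter (fun c -> sum := !sum + int_of_char c) s;
  !sum

let append sum c = sum + int_of_char c

(* Rolling c out and c' in; for an additive sum the window size is
   irrelevant, so roll ignores its first argument. *)
let roll _size sum c c' = sum - int_of_char c + int_of_char c'

let () =
  (* append (string s) c = string (s ^ String.make 1 c) *)
  assert (append (string "abc") 'd' = string "abcd");
  (* rolling 'a' out of "abcd" and 'e' in gives the checksum of "bcde" *)
  assert (roll 4 (string "abcd") 'a' 'e' = string "bcde")
```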

Fingerprints

The MD5 digest could be a suitable implementation of fingerprints.

<fingerprint.mli>=
<fingerprint.ml>
<fingerprint.ml>= (<-U)
(* module Fingerprint *)
module type S = sig
  type t
  val string : string -> t       (* a reliable fingerprint *)
end
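For instance, the standard library's Digest module (MD5) already provides string : string -> t, so it can be sealed with this signature directly:

```ocaml
(* The stdlib's MD5 implementation matches the fingerprint signature
   as-is; the name MD5 is ours. *)
module MD5 : sig
  type t
  val string : string -> t       (* a reliable fingerprint *)
end = Digest

let () =
  (* equal strings get equal fingerprints; distinct strings almost
     certainly get distinct ones *)
  assert (Digest.string "hello" = Digest.string "hello");
  assert (Digest.string "hello" <> Digest.string "world")
```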

The rsync algorithm

The idea is that synchronization happens in three steps:

  1. The destination computer calls findBlocks to compute the checksums and fingerprints of the blocks of the old file. It sends this block_info to the source computer.
  2. The source computer runs compress over the new file, producing a list of transfer instructions, which it sends back to the destination computer.
  3. The destination computer runs decompress, using the transfer instructions to convert the old file to the new file.
The algorithm is parameterized over both fingerprints and checksums.

<rsync.mli>=
(* module type Rsync *)
module Make :
  functor (Fingerprint : Fingerprint.S) ->
  functor (Checksum : Checksum.S) -> sig
    type block_info = { size : int; blocks : (Checksum.t * Fingerprint.t) list }

    type transfer_instruction 
      = STRING of string
      | BLOCK_NUMBERED of int

    val findBlocks : in_channel -> int -> block_info
      (* pull out blocks of a particular size *)
    val compress : block_info -> in_channel -> transfer_instruction list
      (* compress a file into transfer instructions, using block info *)
    val decompress : 
      int -> in_channel -> out_channel -> transfer_instruction list -> unit
      (* given block size and original file, decompress transfer instructions *)
    end

Implementation

Here's the boilerplate for the implementation.

<rsync.ml>=
module Make (Fingerprint : Fingerprint.S) (Checksum : Checksum.S) = struct
  type block_info = { size : int; blocks : (Checksum.t * Fingerprint.t) list }

  type transfer_instruction 
    = STRING of string
    | BLOCK_NUMBERED of int

  <rsync functions>
end

Compression

In order to find candidate blocks quickly, we need to look up blocks by checksum. I've chosen a hash table that stores a list of (block number, fingerprint) pairs. The list-of-pairs behavior is implicit in the semantics of OCaml hash tables; all I need is to put pairs in, then get lists out with find_all.

<rsync functions>= (<-U) [D->]
module BTab = Hashtbl.Make(Checksum)
let mkBlockTable blocks =
  let t = BTab.create (List.length blocks) in
  let add k (csum, fp) = (BTab.add t csum (k, fp); k+1) in
  let _ = List.fold_left add 0 blocks in
    t
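The list-of-pairs behavior that mkBlockTable relies on can be seen with a plain Hashtbl: several bindings added under one key are all recovered by find_all, most recently added first. The keys below stand in for checksums and the payloads for (block number, fingerprint) pairs; the values are made up for illustration.

```ocaml
let t = Hashtbl.create 16

let () =
  Hashtbl.add t 0xBEEF (0, "fp0");
  Hashtbl.add t 0xBEEF (3, "fp3");   (* same key, second binding *)
  Hashtbl.add t 0xCAFE (1, "fp1")

let () =
  (* find_all returns every binding for the key, newest first *)
  assert (Hashtbl.find_all t 0xBEEF = [(3, "fp3"); (0, "fp0")]);
  (* an absent key yields the empty list, not an exception *)
  assert (Hashtbl.find_all t 0xFEED = [])
```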

To compress, we scan the file one character at a time, while looking at a window of the right size. Before we set up suitable initial conditions, it may be easier to think about the steady state and the loop invariants. We use state variables

  - instr', the reversed list of transfer instructions emitted so far;
  - b, a buffer holding characters that have been read but are known not to start a matched block;
  - q, a queue holding the window of characters currently under consideration;
  - csum, the checksum of the window.

We maintain the following invariants:

  - The queue q holds exactly size characters.
  - csum is the checksum of the contents of q.
  - The characters in b, followed by the characters in q, are exactly those characters that have been read from the file but are not yet accounted for by the instructions in instr'.

The startCompressing function requires the precondition that the buffer and queue be empty.

The implementation of the loop assumes the block table is in blocktab. If we hit in the block table, we have to drain the buffer and the queue, because it's just like starting over again at the beginning of the file.

<compression functions>= (U->) [D->]
let rec compressLoop instr' b q csum infile =
  try (* first case: hit in the block table *)
    match BTab.find_all blocktab csum
    with [] -> raise Not_found
    | candidates ->
        let contents = Buffer.create size in
        let () = Queue.iter (Buffer.add_char contents) q in
        let fp = Fingerprint.string (Buffer.contents contents) in
        let (blockNum, _) = List.find (fun (_, fp') -> fp = fp') candidates in
        let instr' = if Buffer.length b > 0 then STRING (Buffer.contents b) :: instr'
                     else instr' in
        let instr' = BLOCK_NUMBERED blockNum :: instr' in
          ( Buffer.reset b
          ; Queue.clear q
          ; startCompressing instr' b q infile
          )
  with Not_found ->
    <roll one character and try again>

Stepping a character simply means sliding the window while maintaining all the invariants.

<roll one character and try again>= (<-U)
try
  let next = input_char infile  in
  let ()   = Queue.add next q  in
  let prev = Queue.take q  in
  let ()   = Buffer.add_char b prev  in
  let csum = roll csum prev next  in
    compressLoop instr' b q csum infile
with End_of_file ->
  finishCompressing instr' b q

To finish compressing, we drain the queue into the buffer and append the buffer's contents to the transfer instructions.

<compression functions>+= (U->) [<-D->]
and finishCompressing instr' b q = 
  let () = Queue.iter (Buffer.add_char b) q in
    List.rev (STRING (Buffer.contents b) :: instr')

On start of compression, the buffer and queue are empty. The function fillAndSum fills the queue and computes a checksum. We fill the queue and start the main loop. If there aren't enough characters to fill the queue, we finish it off.

<compression functions>+= (U->) [<-D]
and startCompressing instr' b q infile =
  let rec fillAndSum csum k = 
        if k = 0 then csum
        else 
          let c = input_char infile in
            ( Queue.add c q
            ; fillAndSum (Checksum.append csum c) (k-1)
            ) in
    try 
      compressLoop instr' b q (fillAndSum (Checksum.string "") size) infile
    with End_of_file -> 
      finishCompressing instr' b q

Here we put it together.

<rsync functions>+= (<-U) [<-D->]
let compress {size=size; blocks=blocks} infile =
  let blocktab = mkBlockTable blocks in
  let roll = Checksum.roll size in
  <compression functions> in
  let q = Queue.create () in
  let b = Buffer.create size in
    startCompressing [] b q infile

Decompression

This is dead easy. For each instruction we either emit a string or copy a block from the old file.

<rsync functions>+= (<-U) [<-D->]
let decompress size =
  let buf = Bytes.create size in
  fun oldfile newfile instructions ->
    let copyBlock k =
          ( seek_in oldfile (k*size)
          ; (try really_input oldfile buf 0 size with End_of_file -> assert false)
          ; output_bytes newfile buf
          ) in
    let rec emit l = match l
      with STRING s         :: l -> output_string newfile s; emit l
      |    BLOCK_NUMBERED k :: l -> copyBlock k;             emit l
      |    []                    -> ()  in
    emit instructions

Finding blocks

Given really_input, it's easy to get all the blocks.

<rsync functions>+= (<-U) [<-D]
let findBlocks infile size =
  let b = Bytes.create size in
  let rec blocks bs' =
    try
      really_input infile b 0 size;
      let s = Bytes.to_string b in
      blocks ((Checksum.string s, Fingerprint.string s) :: bs')
    with End_of_file ->
      {size=size; blocks=List.rev bs'} in
  blocks []

Testing

This checksum is formed by xoring together all the characters of a block. It does a terrible job of distinguishing blocks, but it made it possible to test the implementation.

<xor.mli>=
module M : Checksum.S
<xor.ml>=
module M : Checksum.S = struct
  type t = int
  let hash = Hashtbl.hash
  let equal = (=)
  let append sum c = sum lxor (int_of_char c) 
  let string s = 
    let rec next k sum =
      try next (k+1) (append sum (String.get s k))
      with Invalid_argument _ -> sum in
    next 0 0
  let roll size sum c c' = append (append sum c) c'
  let toString = string_of_int
end

Here's a somewhat more aggressive xor algorithm that depends on the order in which characters appear. I use 24 bits worth of checksum. I append a character by rotating to the left one bit, then xoring in the new character. This checksum makes the rsync compression run 3 times faster than the previous checksum.

<xorx.ml>=
module M : Checksum.S = struct
  type t = int
  let rotl1 n = 
    let hi = n lsr 23 in
    let lo = n land 0x7fffff in
    (lo lsl 1) lor hi

  let rotl k n = 
    let hi = n lsr (24 - k) in
    let lo = n land (0xffffff lsr k) in
    (lo lsl k) lor hi

  let hash = Hashtbl.hash
  let equal = (=)
  let append sum c = (rotl1 sum) lxor (int_of_char c) 
  let string s = 
    let rec next k sum =
      try next (k+1) (append sum (String.get s k))
      with Invalid_argument _ -> sum in
    next 0 0
  let roll size =
    if size mod 24 = 0 then
      fun sum c c' -> (append sum c') lxor (int_of_char c)
    else
      let rot = rotl (size mod 24)
      in fun sum c c' -> (append sum c') lxor (rot (int_of_char c))
  let toString = string_of_int
end

Note that the roll implementation performs slightly better for block sizes that are a multiple of 24. I have sometimes observed a 3--4% improvement in practice, but the numbers are small and not always reproducible.
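To convince ourselves that the rotating checksum rolls correctly, here is a self-contained restatement of xorx's definitions at top level (string written with a reference loop, as in the module), together with a check of the rolling law for a window size that is not a multiple of 24:

```ocaml
(* Restatement of the xorx checksum, for checking only. *)
let rotl1 n =
  let hi = n lsr 23 in
  let lo = n land 0x7fffff in
  (lo lsl 1) lor hi

let rotl k n =
  let hi = n lsr (24 - k) in
  let lo = n land (0xffffff lsr k) in
  (lo lsl k) lor hi

let append sum c = rotl1 sum lxor int_of_char c

let string s =
  let sum = ref 0 in
  String.iter (fun c -> sum := append !sum c) s;
  !sum

(* The general case of roll: the outgoing character has been rotated
   size times, i.e. (size mod 24) positions, by the time it leaves. *)
let roll size sum c c' = append sum c' lxor rotl (size mod 24) (int_of_char c)

let () =
  (* rolling 'a' out of "abcd" and 'e' in gives the checksum of "bcde" *)
  assert (roll 4 (string "abcd") 'a' 'e' = string "bcde");
  (* rotating a 24-bit value by 23 and then by 1 is the identity *)
  assert (rotl1 (rotl 23 1) = 1)
```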

To test this stuff, we'll want two implementations.

<rtest.mli>=
<rtest.ml>= [D->]
module R' = Rsync.Make (Digest) (Xor.M)   (* old, no longer used *)
module R  = Rsync.Make (Digest) (Xorx.M)

Here we have some test functions that work with files. First, compute and show transfer instructions. The showi function shows the character position of a matched block, which was most helpful for debugging.

<rtest.ml>+= [<-D->]
let info size old =
  let inf  = open_in_bin old  in
  let b    = R.findBlocks inf size  in
  (close_in inf; b)

let showi size info =
  let rec dots k = if k > 0 then "." ^ dots (k - size) else "" in
  let cvt = function (R.BLOCK_NUMBERED k) -> "<" ^ string_of_int (size * k) ^ ">"
                   | (R.STRING s) -> dots (String.length s)
  in  List.fold_left (fun s i -> s ^ cvt i) "" info

Here's a compressor that operates on file names.

<rtest.ml>+= [<-D->]
let comp size old new' = 
  let b    = info size old  in
  let oldf  = open_in_bin old  in
  let newf = open_in_bin new'  in
  let c    = R.compress b newf  in
    (close_in newf; close_in oldf; c)

To measure the compression ratio, we need to be able to compute file sizes.

<rtest.ml>+= [<-D->]
let fsize fname = 
  let f = open_in fname in
  let rec count n = try ignore(input_char f); count (n+1) with End_of_file -> n  in
  let k = count 0 in
  (close_in f; k)

let ratio size old new' =
  let actual = fsize new'  in
  let i = comp size old new'  in
  let sz = function R.STRING s -> String.length s
                  | R.BLOCK_NUMBERED k when k < 254 -> 1
                  | R.BLOCK_NUMBERED k when k < 65535 -> 3
                  | R.BLOCK_NUMBERED k -> 5  in
  let compressed = List.fold_left (fun k b -> 1 + sz b + k) 0 i  in
  ignore (
   Printf.printf "With %d-byte blocks & base file %s, file %s compresses to %2.1f%%\n"
      size old new' (100.0 *. float compressed /. float actual))

A more useful measure might be ``total communication overhead,'' which would also account for the cost of sending the fingerprints and checksums of the blocks. I haven't implemented this measure.

Here's a decompressor, also operating on files.

<rtest.ml>+= [<-D->]
let decomp size original modified info =
  let orig = open_in_bin original  in
  let modi = open_out_bin modified  in
    (R.decompress size orig modi info; close_in orig; close_out modi)

Now, here's a test that compresses, decompresses, and reports the ratio.

<rtest.ml>+= [<-D]
let doit size o n =
  let i = comp size o n in
  let () = decomp size o "rtest.out" i in
  let () = ratio size o n in
  ()
  

And finally, here's code to handle the command line and do something. If I understood the Arg module, I would surely use that parser instead.

<run.ml>=
let k = try int_of_string (Array.get Sys.argv 1) with Invalid_argument _ | Failure _ -> 32;;
let o = try Array.get Sys.argv 2 with Invalid_argument _ -> "prop.orig";;
let n = try Array.get Sys.argv 3 with Invalid_argument _ -> "prop.latest";;
Rtest.doit k o n;;