Conduit.

Conduit is a library which wants to replace your protocol implementation. Instead to use Unix.read and Unix.write, we use:

We assume that Conduit is MirageOS-compatible (instead of Unix module).

A ping-pong client.

We will start with a getline function:

val getline : Conduit_lwt.flow ->
  ([ `Line of string | `End_of_flow ], Conduit_lwt.error) result Lwt.t

As the POSIX getline (see man 3 getline). The ping-pong client sends a line to a ping-pong server and, if it receives then "pong", it continues to talk until it has nothing to send. Otherwise, it closes the connection.

let client ~resolvers edn =
  Conduit_lwt.resolve ~resolvers edn >>? fun flow ->
  let rec go () = match input_line stdin with
    | line ->
      ( Conduit_lwt.send flow (Cstruct.of_string (line ^ "\n")) >>? fun _ ->
        Format.printf "> %s.\n%!" line ;
        getline flow >>? function
        | `Line "pong" -> Format.printf "< pong.\n%!" ; go ()
        | `Line line -> Format.printf "< %s.\n%!" ; Conduit_lwt.close flow
        | `End_of_flow -> Conduit_lwt.close flow )
    | exception End_of_file -> Conduit_lwt.close flow in
  go ()

getline can be implemented as well with Conduit.S.recv.

A ping-pong server.

A ping-pong server accepts a client and if it receives:

Conduit_lwt (and Conduit_async) provides a simple function Conduit_lwt.serve to initiate a service. Let's implement the server logic:

let handler flow =
  let rec go () =
    getline flow >>= function
    | Ok `End_of_flow | Error _ -> Conduit_lwt.close flow
    | Ok (`Line "ping") ->
      Conduit_lwt.send flow (Cstruct.of_string "pong\n") >>? fun _ -> go ()
    | Ok (`Line "pong") ->
      Conduit_lwt.send flow (Cstruct.of_string "ping\n") >>? fun _ -> go ()
    | Ok (`Line line) ->
      Conduit_lwt.send flow (Cstruct.of_string (line ^ "\n")) >>? fun _ ->
      Conduit_lwt.close flow in
  go () >>= function
  | Error err -> failwith "%a" Conduit_lwt.pp_error err
  | Ok () -> Lwt.return_unit

let server cfg ~protocol ~service =
  Conduit_lwt.serve
    ~handler:(fun flow -> handler (Conduit_lwt.pack protocol flow))
    ~service cfg

Now, we are able to start clients and a simple server. At this stage of the development, we did not choose the protocol implementation (such as a Unix.socket). We will see how to inject a protocol behind Conduit.

In our code, we use Conduit.S.pack which wants to hide the given flow. Indeed, the flow given by Conduit_lwt.serve has the concrete type Lwt_unix.file_descr so we must abstract it to a Conduit.S.flow.

Initiate a server & a client.

Conduit_lwt propose a Conduit-compatible TCP/IP protocol: Conduit_lwt.TCP (and the same goes for Conduit_async.TCP). This is your host's TCP/IP stack. Let's launch the ping-pong server with that:

let fiber ~uri =
  let host = Uri.host_with_default ~default:"127.0.0.1" uri in
  let port = Option.value ~default:8080 (Uri.port uri) in
  let cfg : Conduit_lwt.TCP.configuration =
    { Conduit_lwt.TCP.sockaddr= Unix.(ADDR_INET (inet_addr_of_string host, port))
    ; capacity= 40 } in
  let _always, run = server cfg
    ~protocol:Conduit_lwt.TCP.protocol
    ~service:Conduit_lwt.TCP.service in
  run ()

let () =
  let uri = Uri.of_string Sys.argv.(1) in  
  Lwt_main.run (fiber ~uri)

We enforce to use the TCP/IP protocol in that case and we give a service's defined configuration value to launch our ping-pong service. cfg depends on service. Indeed, the type of Conduit_lwt.TCP.service is:

type configuration = { sockaddr : Lwt_unix.sockaddr; capacity : int }

val service : (configuration, Service.t, Protocol.flow) Conduit_lwt.service

For the client, we must construct a Conduit.resolvers and give an Conduit.Endpoint.t. We will make them from an Uri.t:

let conduit_of_uri uri =
  let host = Uri.host_with_default ~default:"localhost" uri in
  let port = Option.value ~default:8080 (Uri.port uri) in
  match Uri.scheme uri with
  | Some "pg" | None ->
    let resolvers =
      Conduit.empty
      |> Conduit_lwt.add Conduit_lwt.TCP.protocol (Conduit_lwt.TCP.resolve ~port) in
    resolvers, Conduit_lwt.Endpoint.v host
  | Some scheme -> invalid_arg "Invalid scheme: %s" scheme

let fiber ~uri =
  let resolvers, edn = conduit_of_uri uri in
  client ~resolvers edn >>= function
  | Ok () -> Lwt.return_unit
  | Error err -> failwith "%a" Conduit_lwt.pp_error err

let () =
  let uri = Uri.of_string Sys.argv.(1) in
  Lwt_main.run (fiber ~uri)

Compilation & run.

Let's try our binaries, we use 2 shells ($1> and $2>):

$1> ocamlfind opt -thread -linkpkg -package conduit-lwt,uri client.ml -o client
$2> ocamlfind opt -thread -linkpkg -package conduit-lwt,uri server.ml -o server
$2> ./server pg://127.0.0.1/
$1> ./client pg://localhost/
 1> ping
 1> > ping
 1> < ping
 1> pong
 1> > pong
 1> < ping

Upgrade our protocol with crypto.

It seems that we exchange lot of secret information! So we should upgrade to TLS. At this stage, our server and our client function should not change - and this is the final goal of Conduit: abstract the protocol.

Conduit_lwt_tls.TCP provides, as Conduit_lwt.TCP two values:

val protocol :
  ( Lwt_unix.sockaddr * Tls.Config.client,
    Protocol.flow protocol_with_tls )
  protocol

val service :
  ( configuration * Tls.Config.server,
    Service.t service_with_tls,
    Protocol.flow protocol_with_tls )
  service

So, instead to use Conduit_lwt.TCP.protocol and Conduit_lwt.TCP.service, we will use them to upgrade our protocol (when the scheme is "pgs"):

server.ml
let load_file filename =
  let ic = open_in filename in
  let ln = in_channel_length ic in
  let rs = Bytes.create ln in
  really_input ic rs 0 ln ;
  close_in ic ;
  Cstruct.of_bytes rs

let config cert key =
  let cert = load_file cert in
  let key = load_file key in
  match
    (X509.Certificate.decode_pem_multiple cert, X509.Private_key.decode_pem key)
  with
  | Ok certs, Ok (`RSA key) ->
      Tls.Config.server ~certificates:(`Single (certs, key)) ()
  | _ -> failwith "Invalid key or certificate"

let fiber ~uri =
  let host = Uri.host_with_default ~default:"127.0.0.1" uri in
  let cfg ~port =
    { Conduit_lwt.TCP.sockaddr= Unix.(ADDR_INET (inet_addr_of_string host, port))
    ; capacity= 40 } in
  let _always, run = match Uri.scheme uri with
    | None | Some "pg" ->
      let port = Option.value ~default:8080 (Uri.port uri) in
      server (cfg ~port)
        ~protocol:Conduit_lwt.TCP.protocol
        ~service:Conduit_lwt.TCP.service
    | Some "pgs" ->
      let port = Option.value ~default:4343 (Uri.port uri) in
      let cfg = cfg ~port, config "server.pem" "server.key" in
      server cfg
        ~protocol:Conduit_lwt_tls.TCP.protocol
        ~service:Conduit_lwt_tls.TCP.service
    | Some scheme -> invalid_arg "Invalid scheme: %s" scheme in
  run ()

let () =
  let uri = Uri.of_string Sys.argv.(1) in  
  Lwt_main.run (fiber ~uri)

And for the client, we just need to update the function conduit_of_uri:

client.ml
let tls_config = Tls.Config.client ~authenticator:(fun ~host:_ _ -> Ok None) ()

let conduit_of_uri uri =
  let host = Uri.host_with_default ~default:"localhost" uri in
  let edn = Conduit_lwt.Endpoint.v host in
  let resolvers = match Uri.scheme uri with
    | Some "pg" ->
      let open Conduit_lwt.TCP in
      let port = Option.value ~default:8080 (Uri.port uri) in 
      let resolvers =
        Conduit.empty
        |> Conduit_lwt.add protocol (TCP.resolve ~port) in
      resolvers
    | Some "pgs" ->
      let open Conduit_lwt_tls.TCP in
      let port = Option.value ~default:4343 (Uri.port uri) in 
      let resolvers =
        Conduit.empty
        |> Conduit_lwt.add protocol (resolve ~port ~config:tls_config) in
      resolvers
    | None ->
      let module TCP = Conduit_lwt.TCP in
      let module TLS = Conduit_lwt_tls.TCP in
      let u_port = Option.value ~default:8080 (Uri.port uri) in 
      let s_port = Option.value ~default:4343 (Uri.port uri) in 
      let resolvers =
        Conduit.empty
        |> Conduit_lwt.add ~priority:10 TLS.protocol
             (TLS.resolve ~port:s_port ~config:tls_config)
        |> Conduit_lwt.add TCP.protocol (TCP.resolve ~port:u_port) in
      resolvers
    | Some scheme -> Fmt.invalid_arg "Invalid scheme: %s" scheme in
  resolvers, edn

Above, we can see that for the default case (when we don't have a scheme), we inject two protocols, the secure one (with a priority) and the unsecure one. Conduit will try the secure one and if it fails, it fallbacks to the second one. This case can appear when you want to communicate with a peer and you have multiple possibilies such as http and https.

Compilation & run.

Let's show result:

$1> ocamlfind opt -thread -linkpkg -package conduit-lwt-tls,mirage-crypto-rng.unix \
  client.ml -o client
$2> ocamlfind opt -thread -linkpkg -package conduit-lwt-tls,mirage-crypto-rng.unix \
  server.ml -o server
$2> ./server pgs://127.0.0.1/
$1> ./client localhost
 1> pong
 1> > pong
 1> < ping

Conclusion.

From a client logic and a server logic, with Conduit, we are able to abstract both over the protocol implementation. Conduit comes with several implementations (TCP/IP, TLS and SSL) but a protocol implementer can add its own protocol implementation only with the core of Conduit.

Then, from the user point-of-view, he can decide which protocol he wants to use. We see that upgrading a existing code-base to another protocol has no cost as long as the code-base use Conduit.

Of course, the end-user (our fiber functions) must be updated according new protocols but it does not imply any change on the logic of the server and the logic of the client: this is the goal of Conduit!

MirageOS compatibility.

For MirageOS, it's hard to abstract the entire stack with functors to be able to launch a simple HTTP request - and it's even more difficult with a (non-required) cryptographic layered protocol such as TLS. Conduit should help users and protocol implementers to be compatible with MirageOS.

As you can see, Conduit is not mandatory at the logic of your protocol and you are able to functorize this part with something close to Conduit.S (as cohttp does). The application of your functor with Conduit lets the user to orchestrate your implementation with others protocols (such as mirage-tcpip) without anothers functors.

Of course, Conduit is not only about MirageOS when the question of the abstraction can appear for anyone: which TLS implementation should I use? Should I enforce one of them? Again, it's about abstraction and Conduit is a response about that more generally.