Who am I?

This man has retrograde amnesia!

What's a Logplex?

  • A high volume syslog collector and emitter
  • Messages arrive from all over the place (via syslog, HTTP)
  • Split out into logical channels / topics
  • Delivered to 'drains' attached to channels (udp, tcp syslog, HTTP)
  • Configuration continuously altered at runtime
  • Tail [-f]
  • Logplex is open source

High throughput

My usual approach

  • If you can apply back-pressure, do.
  • {active, once} is great - use it.
  • Your clients will slow down to an input rate you can handle.
  • Job done.
  • Logging systems generally can't.
  • Cry into an emacs buffer repeatedly.
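For systems that *can* apply back-pressure, the `{active, once}` pattern looks roughly like this (an illustrative echo loop, not Logplex code): the socket delivers exactly one message, we do the work, then re-arm. Until we re-arm, the kernel buffers and TCP flow control slows the sender for us.

```erlang
%% Sketch of an {active, once} receive loop. The socket owner gets one
%% {tcp, Sock, Data} message per setopts call; doing the work before
%% re-arming is what creates the back-pressure.
echo_loop(Sock) ->
    ok = inet:setopts(Sock, [{active, once}]),
    receive
        {tcp, Sock, Data} ->
            ok = gen_tcp:send(Sock, Data),  % the "work": echo it back
            echo_loop(Sock);
        {tcp_closed, Sock} ->
            ok
    end.
```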

A brief aside on Syslog

It's the worst

  • Sender -> Receiver: Data Data Data Data Data Data
  • Receiver -> Sender: ...
  • No application layer acknowledgements
  • When you close a connection to a receiver, particularly in the case of error, you don't know how many messages were actually delivered.
  • Even message framing is unreliable (see also)
  • At least it's pipelined
  • And has minimal en/decoding complexity
Have you seen the replacements? Yeesh - they're worstester.
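The framing point above is worth making concrete. RFC 6587 allows two framings for syslog over TCP: octet counting ("LEN SP MSG") and non-transparent framing (newline-delimited, which breaks as soon as a message contains a newline). A minimal sketch of a parser for both, with invented function names, might look like:

```erlang
%% Sketch only: distinguish the two RFC 6587 framings by whether the
%% buffer starts with a digit. Returns {ok, Msg, Rest} or {more, Buf}.
parse_frame(<<C, _/binary>> = Buf) when C >= $1, C =< $9 ->
    %% Octet counting: "LEN SP MSG"
    case binary:split(Buf, <<" ">>) of
        [LenBin, Rest] ->
            Len = binary_to_integer(LenBin),
            case Rest of
                <<Msg:Len/binary, Tail/binary>> -> {ok, Msg, Tail};
                _ -> {more, Buf}   % frame incomplete, wait for more bytes
            end;
        _ ->
            {more, Buf}
    end;
parse_frame(Buf) ->
    %% Non-transparent framing: hope no message contains '\n'...
    case binary:split(Buf, <<"\n">>) of
        [Msg, Tail] -> {ok, Msg, Tail};
        _ -> {more, Buf}
    end.
```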

The pickle

A problem restatement

  • No one wants to slow down to guarantee log delivery
  • If we don't accept messages, the sender will drop them
  • If we close the connection, the sender will drop them (plus a few by accident)
  • A sender can represent a lot of different clients
  • Delaying or dropping messages from one sender unfairly penalizes other clients multiplexed on the same connection

Coping with unbounded input

Start with the basics:

  • Don't serialize on the input/forwarding side
  • Avoid gen_server:call to singleton processes (and equivalent operations)
  • ETS tables (with read_concurrency) are generally good
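The ETS point might be sketched like this (table and key names are invented, not Logplex's): a shared routing table that every forwarder process reads directly, instead of serializing lookups through an owner process.

```erlang
%% Illustrative only. read_concurrency optimizes the table's locking
%% for many concurrent readers at some cost to writes.
setup() ->
    ets:new(channel_drains, [set, public, named_table,
                             {read_concurrency, true}]),
    ets:insert(channel_drains, {<<"chan-1">>, [drain_a, drain_b]}).

%% Any process can route a message without a gen_server:call:
lookup_drains(Channel) ->
    case ets:lookup(channel_drains, Channel) of
        [{_, Drains}] -> Drains;
        [] -> []
    end.
```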

Cool. I did all that.

  • eheap_alloc: Cannot allocate 1459620480 bytes of memory (of type "heap").
  • We're now so good at keeping up with the input volume we're blowing out the message queue of the output side of the system.
  • We can't add backpressure to the input side
  • Sometimes we won't be connected to the output destination
  • Even worse, sometimes the destination we deliver to will run too slowly

Tricks

Be careful what you wish for

A simple tourniquet

Administer only when bleeding profusely

%% The tourniquet: if our own mailbox has backed up past 100k
%% messages, exit (abandoning the backlog) rather than eat all the
%% memory in the VM.
check_overload() ->
    case process_info(self(), message_queue_len) of
        {message_queue_len, N} when N > 100000 ->
            erlang:exit(stage_left);
        _ ->
            ok
    end.

Bounded buffers

A short tale of woe

Version 0

Output process

What I did first.

Version 1

Output process

The obvious thing to do.

Raw TCP Ports

  • Sometimes a receiver stops receiving
  • We'll just add a timeout...
     gen_tcp:send(Port, Data, Timeout)
  • Haha, that doesn't exist.
  • OK, well how on earth does gen_tcp work anyway?
I can't remember why I didn't use {send_timeout, N}

To the rabbithole!

send(S, Packet) when is_port(S) ->
    case inet_db:lookup_socket(S) of
        {ok, Mod} ->
            Mod:send(S, Packet);
        Error ->
            Error
    end.
  • Mod is always prim_inet
  • inet_db stores the module on the port itself, retrieved with erlang:port_get_data/1 (Kinda wonder if I can find a use for that)

prim_inet

send(S, Data) ->
    send(S, Data, []).

send(S, Data, OptList) when is_port(S), is_list(OptList) ->
    try erlang:port_command(S, Data, OptList) of
        false -> % Port busy and nosuspend option passed
            {error,busy};
        true ->
            receive
                {inet_reply,S,Status} ->
                    Status
            end
    catch
        error:_Error ->
             {error,einval}
    end.

Are you thinking what I'm thinking?

send(S, Data, Timeout) ->
    true = erlang:port_command(S, Data, []),
    receive
        {inet_reply, S, Status} ->
            Status
    after Timeout ->
        %% NB: the inet_reply may still arrive later and will need
        %% to be flushed from the mailbox.
        {error, timeout}
    end.

For extra credit, you can make this message-based by starting a timer before the port command, and then watching for completion of the send by either an inet_reply message or a timeout message.

You can check your answer against our example in logplex_tcpsyslog_drain.

For extra extra credit, you could do the same for connect.
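The message-based variant might look roughly like this (a sketch with invented names; logplex_tcpsyslog_drain is the real reference): fire the port command, then treat completion as just another message to the owning process.

```erlang
%% Start the timer first, then queue the data on the port. Both the
%% inet_reply and the timer message land in our mailbox, so the caller
%% is free to do other work in between.
async_send(Sock, Data, Timeout) ->
    Ref = erlang:start_timer(Timeout, self(), send_timeout),
    true = erlang:port_command(Sock, Data, []),
    Ref.

%% ...later, in the process's receive loop:
wait_send(Sock, Ref) ->
    receive
        {inet_reply, Sock, Status} ->
            erlang:cancel_timer(Ref),
            Status;
        {timeout, Ref, send_timeout} ->
            {error, timeout}
    end.
```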

Version 1

  • First approach to a high throughput output process
  • Make all operations asynchronous, or at least synchronous with a timeout
  • Now we don't spend much time blocked, so we can keep up with our message queue

  • Code complexity has gone up

  • We now have to distinguish between a 'connected and ready to send' state and a 'sending' state
  • We have to be very careful adding any third-party libraries to the code

Version N+1

A new hope

Version N+1

Quickly crushed

  • Added SSL, ran into a series of operations that never complete
  • Didn't blow out memory, so that's a plus
  • Do I have to go through all that async stuff again for every library?

Put children to work

Do not fail me a first time

  • Sometimes the client code for a protocol can be complicated
  • Or the protocol can require a lot of back and forth
  • You can still get yourself a send timeout by doing the erlangy thing:
    More processes

  • Move your use of the client code out to a separate process

  • (pre)spawn when you want to connect
  • Send the intermediary the data to deliver, start a timer
  • Do other things
  • Receive a completion message, send more data
  • Receive a timeout message, ...
I love erlang:start_timer/3.
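The intermediary pattern above, sketched with invented names (deliver/2 stands in for whatever client library you're wrapping): the helper is the only process that ever blocks, and the drain just watches for a completion or timer message.

```erlang
%% Push a complicated client (SSL, HTTP, ...) out to a helper process
%% so the drain itself never blocks in the client code.
start_sender(Dest) ->
    Owner = self(),
    spawn_link(fun() -> sender_loop(Owner, Dest) end).

sender_loop(Owner, Dest) ->
    receive
        {send, Ref, Data} ->
            Result = deliver(Dest, Data),   % may block; only the helper waits
            Owner ! {sent, Ref, Result},
            sender_loop(Owner, Dest)
    end.

%% In the drain: hand off the data, start a timer, go do other things.
%% Later we'll receive either {sent, Ref, _} or {timeout, Ref, _}.
send_async(Sender, Data, Timeout) ->
    Ref = erlang:start_timer(Timeout, self(), send_timeout),
    Sender ! {send, Ref, Data},
    Ref.

deliver(_Dest, _Data) -> ok.   % stand-in for the real client call
```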

Wrapping up

  • If clients refuse to slow down, they have chosen message loss as the way to balance the equation
  • They'll be less annoyed if message loss is deliberate (the VM dying is not deliberate)
  • They'll be less annoyed if message loss is explicit (and you'll spend a lot of time chasing it)
  • Going full-async is possible (but comes with a cost)
  • The answer is almost always to add more processes