Streaming OpenAI in Elixir Phoenix Part II

This is part II in a series about streaming OpenAI chat completions in Elixir.

  1. In part I, we implement a module and API endpoint for streaming chat completions.
  2. Part II (this post) revisits stream parsing and why you may want stateful parsing.
  3. Part III uses Phoenix LiveView to stream completions to users connected to your site.

You may not need this

The previous post implements a working integration against OpenAI. In practice I have not encountered the problems this post describes and aims to solve. It may be best to stick with the simpler implementation and leave this as more of an intellectual exercise.

I did not thoroughly investigate, but I did not see these potential problems addressed in the community-maintained OpenAI Elixir packages either. Please correct me if I’m wrong!

Revisiting the parser

In the previous post, we implemented streaming event parsing using the following code:

defp parse(chunk) do
  chunk
  |> String.split("data: ")
  |> Enum.map(&String.trim/1)
  |> Enum.map(&decode/1)
  |> Enum.reject(&is_nil/1)
end

defp decode(""), do: nil
defp decode("[DONE]"), do: nil
defp decode(data), do: Jason.decode!(data)

The input to parse/1 (the chunk) was assumed to be zero or more complete events, e.g.:

data: {"id":"chatcmpl-8eqSTjwUmipuvL2s8CzjmXd1dTS08","object":"chat.completion.chunk","created":1704745461,"model":"gpt-3.5-turbo-0613","system_fingerprint":null,"choices":[{"index":0,"delta":{"content":"Hello"},"logprobs":null,"finish_reason":null}]}

data: {"id":"chatcmpl-8eqSTjwUmipuvL2s8CzjmXd1dTS08","object":"chat.completion.chunk","created":1704745461,"model":"gpt-3.5-turbo-0613","system_fingerprint":null,"choices":[{"index":0,"delta":{"content":"!"},"logprobs":null,"finish_reason":null}]}

However, what if we were to receive a partial event? That is, what if the bytes that comprise one or more of the events in the stream arrive at different times? For example, let’s take the first event from above and say it arrived in stages.

Stage 1:

data: {"id":"chatcmpl-8eqSTjwUmipuvL2s8CzjmXd1dTS08","object":"chat.completion.chunk","created":1704745

Stage 2:

461,"model":"gpt-3.5-turbo-0613","system_fingerprint":null,"choices":[{"index":0,"delta":{"content":"Hello"},"logprobs":null,"finish_reason":null}]}

If this were to happen, the parser would fail to extract the events correctly as it currently assumes the entire event is present when invoking parse/1.
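
To see the failure concretely, here is a minimal sketch that runs only the split and trim steps of parse/1 on each stage separately. The shortened payloads and the split helper are illustrative assumptions, and the decode step is left out so the snippet has no dependencies.

```elixir
# Shortened, hypothetical payloads standing in for the two stages above.
stage_one = ~s(data: {"id":"chatcmpl-123","created":1704745)
stage_two = ~s(461,"model":"gpt-3.5-turbo-0613"}\n\n)

# The split and trim steps of parse/1, without the decode step.
split = fn chunk ->
  chunk
  |> String.split("data: ")
  |> Enum.map(&String.trim/1)
  |> Enum.reject(&(&1 == ""))
end

fragments_one = split.(stage_one)
fragments_two = split.(stage_two)

IO.inspect(fragments_one)
IO.inspect(fragments_two)
```

Each call yields a single fragment, and neither fragment is a complete JSON document, so handing either one to Jason.decode!/1 would raise rather than produce an event.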

HTTP buffering

Some server environments buffer data and send it to the client only once the buffer reaches a certain size. Even when the source server does not do this, a proxy (or some other middleman) sitting between the server and the client might. In either case, a client reading from a stream of events may receive an event in pieces and be responsible for stitching the pieces back together.

If we want a robust implementation that can withstand this scenario, we’ll need to introduce state into our parser.

Note: In practice, I frequently work with streaming APIs and have implemented zero-dependency clients against OpenAI, Anthropic, Cohere, Google, and more without ever experiencing this issue.

Updating our parser

Before introducing state, let’s rework the parser to remove the assumption that an entire event is present at once.

Among the greatest joys of working with Elixir is iterating over binaries (strings) and using pattern matching to extract or otherwise modify the input in some way. Whether or not we need to optimize our parser around the above scenario, reworking it in this way will be a fun exercise!

Let’s reimplement the parser to take a buffer argument and iterate over the binary using pattern matching and recursion. We’ll add two new functions, parse/2 and parse/3.

def parse(buffer, chunk) do
  parse(buffer, chunk, [])
end

defp parse(buffer, chunk, events) do
  # TODO
end

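parse/2 will be the public interface for callers, while parse/3 will implement the parsing logic. The arguments are:

  1. buffer: iodata holding the characters of an event that has not been fully received yet.
  2. chunk: the binary data that just arrived from the stream.
  3. events: the list of events decoded so far.

With those in place, parse/3 can be written as three clauses: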

defp parse([buffer | "\n"], "\n" <> rest, events) do
  case IO.iodata_to_binary(buffer) do
    "data: [DONE]" ->
      parse([], rest, events)

    "data: " <> event ->
      parse([], rest, [Jason.decode!(event) | events])
  end
end

defp parse(buffer, <<char::utf8, rest::binary>>, events) do
  parse([buffer | <<char::utf8>>], rest, events)
end

defp parse(buffer, "", events) do
  {buffer, Enum.reverse(events)}
end

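That’s a lot of functionality in only a few lines of code, so let’s unpack it. At a high level:

  1. The first clause matches when the buffer ends with a newline and the chunk begins with another one. That blank line marks the end of an event, so we convert the buffer to a binary, skip it if it is the [DONE] sentinel or otherwise strip the "data: " prefix and decode the JSON, and then continue with an empty buffer.
  2. The second clause takes the next UTF-8 character off the front of the chunk and appends it to the buffer.
  3. The third clause matches once the chunk is exhausted and returns the leftover buffer along with the events in the order they arrived.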

Pattern matching is used extensively here, not only in the function heads but also in the case expression. We destructure binaries in multiple places using declarative syntax. Modifiers like ::utf8 ensure that we consume whole characters at a time rather than a fragment of one (UTF-8 is a variable-width encoding, so a single character can span several bytes). For more on this, I recommend the Elixir guides and The Absolute Minimum Every Software Developer Must Know About Unicode in 2023.
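
As a small, standalone illustration of what the ::utf8 modifier changes, the sketch below destructures the same string with and without it. The sample string is an arbitrary choice for this example.

```elixir
# "é" occupies two bytes in UTF-8 (195, 169) and is followed by "!" (33).
string = "\u00E9!"

# Without a modifier, the first segment matches a single byte and cuts the
# character in half, leaving a remainder that is not valid UTF-8.
<<byte, rest_bytes::binary>> = string
IO.inspect({byte, String.valid?(rest_bytes)})

# With ::utf8, the first segment matches a whole code point.
<<char::utf8, rest::binary>> = string
IO.inspect({char, rest})

# A lone leading byte is not a complete character, so the ::utf8 pattern
# does not match it at all.
partial_match? = match?(<<_::utf8, _::binary>>, <<195>>)
IO.inspect(partial_match?)
```

The last check hints at an edge case worth keeping in mind: if a chunk boundary were to fall in the middle of a multi-byte character, the ::utf8 clause of parse/3 would not match the leftover byte.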

Lastly, we use iodata as our buffer for efficiency. Naively, we could have used a string as our buffer and appended each UTF-8 character to it as we went. However, each append can force the runtime to copy the existing string into a new, larger one, which becomes expensive as the buffer grows. Instead, we add each character to a nested list and make a single call at the end to convert it into a string.
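
To make the shape of that buffer visible, here is a minimal sketch that grows iodata the same way parse/3 does and then flattens it. IodataSketch and append_chars/2 are hypothetical names used only for this illustration.

```elixir
defmodule IodataSketch do
  # Hypothetical helper that mirrors how parse/3 grows its buffer: each
  # character becomes the tail of a new list cell wrapping the old buffer.
  def append_chars(buffer, <<char::utf8, rest::binary>>) do
    append_chars([buffer | <<char::utf8>>], rest)
  end

  def append_chars(buffer, ""), do: buffer
end

buffer = IodataSketch.append_chars([], "data")

# The buffer is a nested, improper list rather than a string.
IO.inspect(buffer)

# A single call flattens it into a binary once the event is complete.
binary = IO.iodata_to_binary(buffer)
IO.inspect(binary)
```

Because the newest character always sits in the tail position, the first clause of parse/3 can check whether the buffer ends in a newline with the pattern [buffer | "\n"].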

An aside on binary pattern matching

I’ve written many low-level stream parsers in TypeScript using TransformStreams and Uint8Arrays and I can confidently say it is SO MUCH MORE enjoyable to do this in Elixir.

This is one of those simple ideas which after you have seen it makes you wonder how any language could be without it.

Joe Armstrong on binary pattern matching and the bit syntax in A History of Erlang

A quick test

To make this concrete and ensure it’s working, let’s add a quick unit test. Create test/my_app/openai_test.exs and add the following code.

defmodule MyApp.OpenaiTest do
  use MyApp.DataCase

  alias MyApp.Openai

  test "can parse complete chunks" do
    event_one = %{
      "choices" => [
        %{
          "delta" => %{"content" => "Hello"},
          "finish_reason" => nil,
          "index" => 0,
          "logprobs" => nil
        }
      ],
      "created" => 1_704_745_461,
      "id" => "chatcmpl-8eqSTjwUmipuvL2s8CzjmXd1dTS08",
      "model" => "gpt-3.5-turbo-0613",
      "object" => "chat.completion.chunk",
      "system_fingerprint" => nil
    }

    event_two = %{
      "choices" => [
        %{"delta" => %{"content" => "!"}, "finish_reason" => nil, "index" => 0, "logprobs" => nil}
      ],
      "created" => 1_704_745_461,
      "id" => "chatcmpl-8eqSTjwUmipuvL2s8CzjmXd1dTS08",
      "model" => "gpt-3.5-turbo-0613",
      "object" => "chat.completion.chunk",
      "system_fingerprint" => nil
    }

    chunk = """
    data: #{Jason.encode!(event_one)}

    data: #{Jason.encode!(event_two)}

    data: [DONE]

    """

    assert {[], [^event_one, ^event_two]} = Openai.parse([], chunk)
  end
end

Hopefully this example clarifies the expected inputs and outputs to parse/2.

Swapping out parse/1

At this point, we can replace the previous parsing logic, parse/1, with parse/2.

 def chat_completion(request, callback) do
   Req.post(@chat_completions_url,
     json: set_stream(request, true),
     auth: {:bearer, api_key()},
     into: fn {:data, data}, acc ->
-      Enum.each(parse(data), callback)
+      {_buffer, events} = parse([], data)
+      Enum.each(events, callback)
       {:cont, acc}
     end
   )
 end

- defp parse(chunk) do
-   chunk
-   |> String.split("data: ")
-   |> Enum.map(&String.trim/1)
-   |> Enum.map(&decode/1)
-   |> Enum.reject(&is_nil/1)
- end
-
- defp decode(""), do: nil
- defp decode("[DONE]"), do: nil
- defp decode(data), do: Jason.decode!(data)

Introducing state

It’s time to address the problem we described above: If a streaming chunk arrives containing an incomplete event, our previous parsing logic will fail. Our new parsing logic will also fail as it is currently written, but we can change that by introducing state.

Before we continue, let’s add a test that makes it clear where state is needed.

test "can parse incomplete chunks" do
  chunk_one =
    """
    data: {"id":"chatcmpl-8eqSTjwUmipuvL2s8CzjmXd1dTS08","object":"chat.completion.chunk","created":1704745
    """
    |> String.trim()

  chunk_two = """
  461,"model":"gpt-3.5-turbo-0613","system_fingerprint":null,"choices":[{"index":0,"delta":{"content":"Hello"},"logprobs":null,"finish_reason":null}]}

  """

  expected_event = %{
    "choices" => [
      %{
        "delta" => %{"content" => "Hello"},
        "finish_reason" => nil,
        "index" => 0,
        "logprobs" => nil
      }
    ],
    "created" => 1_704_745_461,
    "id" => "chatcmpl-8eqSTjwUmipuvL2s8CzjmXd1dTS08",
    "model" => "gpt-3.5-turbo-0613",
    "object" => "chat.completion.chunk",
    "system_fingerprint" => nil
  }

  assert {buffer, []} = Openai.parse([], chunk_one)
  assert {[], [^expected_event]} = Openai.parse(buffer, chunk_two)
end

In this test case, notice how it takes two calls to parse/2 in order to construct a single event, with the buffer from the first call passed to the second. This is where we’ll need to keep state—between calls to parse/2.
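
The same hand-off can be sketched outside of a test by folding over a list of chunks and threading the buffer through each call. FrameSketch is a hypothetical, dependency-free copy of the parser that collects the raw payload strings instead of calling Jason.decode!/1, and the chunk contents are made up for the example.

```elixir
defmodule FrameSketch do
  # Same shape as parse/2, but collects raw payload strings instead of
  # decoded JSON so the sketch needs no dependencies.
  def parse(buffer, chunk), do: parse(buffer, chunk, [])

  defp parse([buffer | "\n"], "\n" <> rest, events) do
    case IO.iodata_to_binary(buffer) do
      "data: [DONE]" -> parse([], rest, events)
      "data: " <> payload -> parse([], rest, [payload | events])
    end
  end

  defp parse(buffer, <<char::utf8, rest::binary>>, events) do
    parse([buffer | <<char::utf8>>], rest, events)
  end

  defp parse(buffer, "", events), do: {buffer, Enum.reverse(events)}
end

# One event and the [DONE] sentinel, split at arbitrary points.
chunks = [
  ~s(data: {"content":"Hel),
  ~s(lo"}\n\ndata: [DO),
  ~s(NE]\n\n)
]

# The buffer returned by one call is the buffer passed to the next.
{buffer, events} =
  Enum.reduce(chunks, {[], []}, fn chunk, {buffer, acc} ->
    {buffer, events} = FrameSketch.parse(buffer, chunk)
    {buffer, acc ++ events}
  end)

IO.inspect({buffer, events})
```

Enum.reduce/3 can carry the buffer here because all of the chunks are available up front. The streaming callback sees one chunk at a time, which is why the buffer has to live somewhere between invocations.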

Admittedly, this is where Elixir can feel more difficult than a language like TypeScript or Go. Since Elixir’s data is immutable, we cannot simply update a variable or modify an existing data structure. To solve this, Elixir (more accurately, the underlying virtual machine) has a notion of processes, and processes are stateful. While this feels like friction at first, processes are useful in ways beyond state, so we often gain other benefits when leveraging them (e.g., concurrency, fault tolerance).

Thankfully, we do not have to deal with most of the mechanics of processes because Elixir provides a built-in abstraction around state called agents. Let’s update chat_completion/2 to use an agent.

def chat_completion(request, callback) do
  # Initialize buffer state
  {:ok, agent} = Agent.start_link(fn -> [] end)

  response =
    Req.post(@chat_completions_url,
      json: set_stream(request, true),
      auth: {:bearer, api_key()},
      into: fn {:data, data}, acc ->
        # Get previous buffer value
        buffer = Agent.get(agent, & &1)

        {buffer, events} = parse(buffer, data)
        Enum.each(events, callback)

        # Update buffer value with the result from calling parse/2
        :ok = Agent.update(agent, fn _ -> buffer end)

        {:cont, acc}
      end
    )

  # Make sure we shut the agent down
  :ok = Agent.stop(agent)

  response
end

The agent preserves the buffer between the arrival of different chunks. With that, our parser is now stateful and can handle chunks containing incomplete events.
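
Stripped of the HTTP request, the agent's role can be sketched on its own. In this example on_chunk is a hypothetical stand-in for the into: callback, and it simply appends each chunk to the buffer rather than parsing it.

```elixir
# Start an agent whose state is an empty iodata buffer.
{:ok, agent} = Agent.start_link(fn -> [] end)

# Hypothetical stand-in for the `into:` callback: it receives one chunk at a
# time and has no way to hand a buffer to its next invocation directly.
on_chunk = fn chunk ->
  buffer = Agent.get(agent, & &1)
  :ok = Agent.update(agent, fn _ -> [buffer | chunk] end)
end

Enum.each([~s(data: {"a"), ~s(:1})], on_chunk)

# The agent has kept everything written by the separate invocations.
buffered = agent |> Agent.get(& &1) |> IO.iodata_to_binary()
IO.inspect(buffered)

# Stop the agent once the stream is finished.
:ok = Agent.stop(agent)
```

Each invocation of the function reads the state left behind by the previous one, which is the only thing chat_completion/2 needs from the agent.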

Conclusion

Parsing binaries in Elixir is some of my favorite code to write (I wrote a parser for the KDL language largely for this reason). Perhaps that’s because of all the times I’ve had to do it without the luxury of pattern matching and the bit syntax.

The final part in this series takes our streaming chat completions and renders them in a UI using LiveView.