AnyCable off Rails: connecting Twilio streams with Hanami


WebSockets have been around for years, but their popularity isn’t showing any signs of fading. They still power most of the real-time web, primarily because they’re supported by 98% of browsers in use today (Opera Mini users, we miss you). However, browsers, mobile applications, and other clients aren’t the only ones using this technology. There are situations when communication between servers requires a high-throughput channel, and that’s where WebSockets come into play. In this post, we explore how to tame server-to-server WebSocket communication and put it under the control of a Ruby app with the help of AnyCable.

Today, building a WebSocket-driven web application is a piece of cake: major web frameworks provide everything you need out of the box. There are tons of client libraries and PaaS services—the surplus of choice is the only problem.

Dealing with server-to-server connections is not that simple: we need to come up with a protocol and implement both server and client solutions. It becomes even more complicated when we have to integrate with a third-party provider using a unique protocol: we’re likely to build everything from scratch.

What can a WebSocket-based integration between servers look like? One example is the so-called (okay, we made it up) WebSocketHooks: a concept similar to webhooks, but instead of performing an HTTP request to your server, a third party establishes a WebSocket connection and streams data. Thus, you need to somehow handle these connections in your, say, Rails application, but you can’t rely on Action Cable because this third party doesn’t speak the Action Cable protocol.

And that’s where AnyCable enters the story. The “Any” part of AnyCable is not a marketing gimmick: it actually reflects the true nature of the project and its core motto, “connect everyone with anything”. Today, you’ll learn how to build a WebSocket application on top of AnyCable-Go to support custom protocols and build sophisticated data processing workflows, all controlled by your Ruby application. We’ll cover the following topics: using AnyCable-Go as a library, processing Twilio media streams with AnyCable, and connecting AnyCable to Hanami.

AnyCable-Go as a library

AnyCable-Go is a WebSocket server that powers AnyCable applications. It’s a core component that is responsible for handling connections, dealing with broadcasts, and so on. Usually, AnyCable-Go is used as a standalone service launched via the official CLI, anycable-go, or a Docker image. However, if you take a look at the source code, you can see that the application has a modular architecture and can itself be seen as a framework for building custom WebSocket applications. There are over a dozen packages responsible for different aspects of the application lifecycle:

$ ls -l

cli/
common/
encoders/
metrics/
node/
pubsub/
rpc/
server/
...

The list above shows the most important components of AnyCable-Go and sheds light on its architecture. Unsurprisingly, the cli package is responsible for the command-line interface: parsing options, configuring, and running the application. The rpc and pubsub components are responsible for RPC communication and receiving broadcast messages, respectively. If we need to build a new broadcasting implementation (like we did with NATS), we just need to build a new pubsub implementation. There’s no need to touch any other components. The same holds for RPC controllers.

The encoders package is of particular importance for us: it abstracts the concept of Encoder, a component responsible for transforming WebSocket messages to and from internal representation (which is described in the common package, by the way). (In the OSS version of AnyCable, we only have a JSON encoder, but the Pro version comes with three more: two for binary protocols and one more for Apollo GraphQL support.)

How can you add a custom encoder for AnyCable-Go? Well, in Go, we cannot simply write a new .go file, put it somewhere, and load it at runtime. We need to rebuild the entire application. One option is to fork the anycable/anycable-go project and maintain it till the end of time. The risks and maintenance overhead of this approach are huge. So, we need a better option: let’s use AnyCable-Go as a library!

Since AnyCable-Go was originally designed to be distributed as a service, and not a library, it can be tricky to get started with it in a blank Go project. Luckily, we have you covered—let us introduce the AnyCable-Go scaffold!

AnyCable-Go scaffold is a minimal Go application that demonstrates the most useful APIs and integration points. With its help, you can focus on implementing your extension right away without thinking of how to glue pieces together. It also comes with useful CI workflows, tests and linters configured, and a Makefile with useful commands.

Let’s explore the most important bits of the source code.

NOTE: Here and below we omit parts of the source code (e.g., error handling) for better readability.

In the cmd package, we can see the initialization code for the binary:

func main() {
  conf := config.NewConfig()

  anyconf, err, ok := acli.NewConfigFromCLI(
    os.Args,
    acli.WithCLIName("mycable"),
    acli.WithCLIUsageHeader("MyCable, the custom AnyCable-Go build"),
    acli.WithCLIVersion(version.Version()),
    acli.WithCLICustomOptions(cli.CustomOptions(conf)),
  )

  // error handling

  if err := cli.Run(conf, anyconf); err != nil {
    fmt.Fprintf(os.Stderr, "%v", err)
    os.Exit(1)
  }
}

Here you can extend the set of configuration parameters for your application (WithCLICustomOptions) and change the metadata information. So, if you build the app (make build) and run dist/mycable -h, you can see all the AnyCable settings listed along with yours.

The cli package contains the application initialization code:

func initAnyCableRunner(appConf *config.Config, anyConf *aconfig.Config) (*acli.Runner, error) {
  opts := []acli.Option{
    acli.WithDefaultSubscriber(),
    acli.WithWebSocketEndpoint("/ws", myWebsocketHandler(appConf)),
  }

  if appConf.FakeRPC {
    opts = append(opts, acli.WithController(func(m *metrics.Metrics, c *aconfig.Config) (node.Controller, error) {
      return fake_rpc.NewController(), nil
    }))
  } else {
    opts = append(opts, acli.WithDefaultRPCController())
  }

  return acli.NewRunner(anyConf, opts)
}

This is where you inject your custom logic. The example code demonstrates how to define an additional WebSocket endpoint ("/ws") with a custom handler. The built-in "/cable" endpoint would work, too (we’ll use this feature in our demo application, see below). The handler code shows how to attach a custom encoder and an executor to a WebSocket session:

func myWebsocketHandler(config *config.Config) func(n *node.Node, c *aconfig.Config) (http.Handler, error) {
  return func(n *node.Node, c *aconfig.Config) (http.Handler, error) {
    extractor := ws.DefaultHeadersExtractor{Headers: c.Headers, Cookies: c.Cookies}

    executor := custom.NewExecutor(n)

    return ws.WebsocketHandler([]string{}, &extractor, &c.WS, func(wsc *websocket.Conn, info *ws.RequestInfo, callback func()) error {
      wrappedConn := ws.NewConnection(wsc)
      session := node.NewSession(n, wrappedConn, info.URL, info.Headers, info.UID)

      session.SetEncoder(custom.Encoder{})
      session.SetExecutor(executor)

      // Invokes Authenticate RPC method
      _, err := n.Authenticate(session)

      if err != nil {
        return err
      }

      return session.Serve(callback)
    }), nil
  }
}

What is an executor? Well, not every protocol can be directly translated into Action Cable communication flow. An executor solves this problem by intercepting incoming messages and translating them into RPC calls, or handling them inside the Go application.

The starter code also provides a fake RPC controller implementation. This can be used to stub RPC calls, so you can test the Go application in isolation.

Now that we’ve learned the basics of AnyCable-Go, let’s build something cool!

Processing Twilio Media streams with AnyCable

It’s not easy to find an example of a WebSocketHook in the wild. Okay, we’ve had experience with just one—Twilio Media streams.

Twilio provides a collection of communication-related services, from old-school SMS messaging and phone calls to email marketing and real-time video tools. The Programmable Voice APIs allow you to make and receive phone calls. So, with Twilio, you can build voice assistants, auto-responders or other kinds of virtual assistants.

One of the distinguishing features of the Twilio Voice API is the ability to receive voice data in real-time via media streams. A media stream can be created for a phone call to send audio bytes and metadata via a WebSocket connection to your server. How can we leverage this feature?

Let’s build a phone call monitoring application. The app will provide a dashboard to watch live calls (made via Twilio) and will also allow you to “peek” at a call: see a transcript of what’s being said, also in real time.

The stack we’ll use today includes the following components, besides AnyCable and Twilio:

  • Vosk server: it provides offline speech recognition capabilities.
  • Hanami: a Ruby web framework. Why should everyone just use Rails?

The complete data flow looks like this:

  • The Hanami application initiates a phone call (our robot asks the recipient an important question: “Why do you love Ruby?”).
  • Twilio initiates a media stream.
  • AnyCable accepts the media stream’s WebSocket connection and authenticates it (by calling Hanami via AnyCable RPC).
  • AnyCable consumes media packets, sends them to Vosk, and transmits the recognition results back to Hanami app via RPC.
  • As the phone call ends, Twilio closes the media stream’s connection and AnyCable notifies Hanami about it (via RPC, as always).

Figure: Twilio AnyCable application architecture

As you can see, we try to keep as little logic as possible in the AnyCable-Go application; all the business-logic aspects are delegated to the Ruby app. This is an important motivation behind using AnyCable as a framework for handling Twilio media streams: you can easily integrate this service with your existing Ruby or Rails application without spreading the logic across applications.

Using AnyCable as a framework for building a real-time service simplifies the integration of this service with the existing Ruby/Rails application and helps keep business logic in one place.

The final source code can be found on GitHub. Let’s look at the most interesting parts, starting with the Go application.

Translating from Twilio to AnyCable

The first required component for our application is a Twilio encoder. Twilio has decent documentation; we can find the specification of the incoming WebSocket messages there and use it as a reference for our encoder.

We define a Go struct for each message type and then use a generic DecodeMessage struct as a container for incoming messages:

type DecodeMessage struct {
  Event     string `json:"event"`
  StreamSID string `json:"streamSid"`

  Start StartPayload `json:"start,omitempty"`
  Media MediaPayload `json:"media,omitempty"`
  Stop  StopPayload  `json:"stop,omitempty"`
  Mark  MarkPayload  `json:"mark,omitempty"`
}

Then, in the encoder’s Decode function, we transform the DecodeMessage into a common.Message, which is AnyCable’s internal representation of incoming messages:

func (Encoder) Decode(raw []byte) (*common.Message, error) {
  twMsg := &DecodeMessage{}

  if err := json.Unmarshal(raw, twMsg); err != nil {
    return nil, err
  }

  var data interface{}

  switch twMsg.Event {
  case StartEvent:
    data = twMsg.Start
  case MediaEvent:
    data = twMsg.Media
  case MarkEvent:
    data = twMsg.Mark
  case StopEvent:
    data = twMsg.Stop
  }

  msg := common.Message{Command: twMsg.Event, Identifier: twMsg.StreamSID, Data: data}

  return &msg, nil
}
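To make the decoding step easier to try out, here is a minimal, self-contained sketch that uses only the standard library. The Message struct is a simplified stand-in for AnyCable's common.Message, the payload structs carry only a few fields, and the SID values in the sample are made up; treat it as an illustration of the idea rather than the demo's actual encoder.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Message is a simplified stand-in for AnyCable's common.Message (assumption).
type Message struct {
	Command    string
	Identifier string
	Data       interface{}
}

// Only the fields used later in the article are declared here.
type StartPayload struct {
	AccountSID string `json:"accountSid"`
	CallSID    string `json:"callSid"`
}

type MediaPayload struct {
	Payload string `json:"payload"`
}

type twilioMessage struct {
	Event     string       `json:"event"`
	StreamSID string       `json:"streamSid"`
	Start     StartPayload `json:"start"`
	Media     MediaPayload `json:"media"`
}

// Decode maps a raw Twilio frame onto the generic message shape:
// the event name becomes the command, the stream SID becomes the identifier.
func Decode(raw []byte) (*Message, error) {
	var tw twilioMessage
	if err := json.Unmarshal(raw, &tw); err != nil {
		return nil, err
	}

	msg := &Message{Command: tw.Event, Identifier: tw.StreamSID}

	switch tw.Event {
	case "start":
		msg.Data = tw.Start
	case "media":
		msg.Data = tw.Media
	}

	return msg, nil
}

func main() {
	// Made-up SIDs, for illustration only.
	raw := []byte(`{"event":"start","streamSid":"MZ123","start":{"accountSid":"AC123","callSid":"CA123"}}`)

	msg, err := Decode(raw)
	if err != nil {
		panic(err)
	}

	start := msg.Data.(StartPayload)
	fmt.Println(msg.Command, msg.Identifier, start.AccountSID, start.CallSID)
}
```

Events without a matching case (for example, “mark”) still produce a message, just with nil data, so the caller can decide to ignore them.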

The Twilio commands (“start”, “media”, etc.) do not match the Action Cable commands (“subscribe”, “message”, etc.). Moreover, we do not have any subscriptions for Twilio streams. So, we need to treat the incoming messages stream differently. For that, we need to define an executor:

// Handling Twilio events and transforming them into Action Cable commands
type Executor struct {
  node node.AppNode
  conf *config.Config
}

// HandleCommand is responsible for handling incoming messages; here msg has been decoded
// with the Twilio encoder
func (ex *Executor) HandleCommand(s *node.Session, msg *common.Message) error {
  // ...
  if msg.Command == StartEvent {
    // ...
  }

  if msg.Command == MediaEvent {
    // ...
  }

  // Ignore everything else
  return nil
}

We only care about two events: “start” and “media”. The first, “start”, carries metadata about the call, which we can use for authentication. “media” messages contain actual audio bytes. Let’s talk a bit about the speech recognition part of the service.

Vosk integration

Disclaimer: The implementation of this recognition service is meant for demonstration purposes only. Do not use this in production.

We created a separate component called Streamer, which is responsible for consuming audio bytes and sending them to a Vosk server via gRPC.

There are two API functions: KickOff() and Push(msg *Packet). The first one creates a gRPC client and starts a bi-directional gRPC stream (StreamingRecognize):

func (s *Streamer) KickOff(ctx context.Context) error {
  // ...
  conn, _ := grpc.Dial(s.config.VoskRPC, dialOptions...)
  s.client = vosk.NewSttServiceClient(conn)

  stream, _ := s.client.StreamingRecognize(cancelCtx)

  stream.Send(&vosk.StreamingRecognitionRequest{
    StreamingRequest: &vosk.StreamingRecognitionRequest_Config{
      Config: &vosk.RecognitionConfig{
        Specification: &vosk.RecognitionSpec{
          SampleRateHertz: 8000,
          PartialResults:  s.config.PartialRecognize,
        },
      },
    },
  })

  s.stream = stream

  go s.readFromStream()

  return nil
}

Note that we also send the first message over the stream to configure the recognition service. We must provide the audio sample rate (Twilio uses 8kHz) and can also turn partial results on or off. When partial results are on, the recognition results arrive quickly, but they can be incomplete and less accurate; there are multiple results for the same phrase.

In this function, we also start a goroutine to read results from the stream:

func (s *Streamer) readFromStream() {
  for {
    resp, err := s.stream.Recv()

    if err == nil {
      chunk := resp.GetChunks()[0]
      alt := chunk.Alternatives[0]

      if alt.Text == "" && chunk.Final {
        s.log.Debugf("recognition completed")
        break
      }

      if alt.Text != "" {
        s.sendResultFunction(&Response{Message: alt.Text, Final: chunk.Final, Event: "transcript"})
      }
    } else {
      // error handling
      break
    }
  }

  s.conn.Close()
}

The sendResultFunction function is a callback we defined in the executor (see below).

Finally, the Push() function:

func (s *Streamer) Push(msg *Packet) error {
  s.buf.Write(msg.Audio)

  if s.buf.Len() > bytesPerFlush {
    s.stream.Send(&vosk.StreamingRecognitionRequest{
      StreamingRequest: &vosk.StreamingRecognitionRequest_AudioContent{
        AudioContent: s.buf.Bytes(),
      },
    })

    s.buf.Reset()
  }

  return nil
}

We buffer the audio before sending it to Vosk in order not to overload the recognition service with requests. For transcription purposes, splitting audio into hundred-millisecond chunks is totally sufficient.
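The buffering threshold can be derived from the audio format. The sketch below assumes 16-bit PCM at 8 kHz and a 100 ms flush window, which gives 1600 bytes; the actual bytesPerFlush value used in the demo is not shown in this post, and the Batcher type, its flush callback, and the 320-byte packet size are hypothetical names and numbers for illustration.

```go
package main

import (
	"bytes"
	"fmt"
)

const (
	sampleRate     = 8000 // Hz, the rate Twilio streams use
	bytesPerSample = 2    // 16-bit PCM (assumption)
	flushMillis    = 100  // target chunk duration (assumption)

	// 8000 samples/s * 2 bytes * 0.1 s = 1600 bytes
	bytesPerFlush = sampleRate * bytesPerSample * flushMillis / 1000
)

// Batcher accumulates audio and hands it over in larger chunks.
type Batcher struct {
	buf   bytes.Buffer
	flush func(chunk []byte)
}

// Push appends audio and flushes once the buffer exceeds the threshold.
func (b *Batcher) Push(audio []byte) {
	b.buf.Write(audio)

	if b.buf.Len() > bytesPerFlush {
		// Copy the bytes: the buffer's backing array is reused after Reset.
		chunk := make([]byte, b.buf.Len())
		copy(chunk, b.buf.Bytes())

		b.flush(chunk)
		b.buf.Reset()
	}
}

func main() {
	b := &Batcher{flush: func(chunk []byte) {
		fmt.Printf("flushing %d bytes\n", len(chunk))
	}}

	// Hypothetical packets of 320 PCM bytes each.
	for i := 0; i < 12; i++ {
		b.Push(make([]byte, 320))
	}
}
```

With these numbers, a flush happens on every sixth packet: five packets add up to exactly 1600 bytes, which does not yet exceed the threshold.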

Now, let’s show the “media” part of the executor:

if msg.Command == MediaEvent {
  twilioMsg := msg.Data.(MediaPayload)

  var t *streamer.Streamer

  if rawStreamer, ok := s.InternalState["streamer"]; ok {
    t = rawStreamer.(*streamer.Streamer)
  }

  audioBytes, _ := base64.StdEncoding.DecodeString(twilioMsg.Payload)

  err = t.Push(&streamer.Packet{Audio: g711.DecodeUlaw(audioBytes)})

  return err
}

We must perform some manipulations with the audio bytes before sending them to the streamer. Twilio sends audio/x-mulaw bytes as base64-encoded strings, so, first, we need to decode them. Then, we must convert mulaw to PCM (the only codec supported by Vosk out of the box).
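For readers curious about what the two conversion steps involve, here is a standalone sketch that decodes a base64 payload and expands each mu-law byte into a 16-bit little-endian PCM sample. The expansion is a hand-written version of the standard G.711 mu-law formula, used here as a stand-in for the g711 library call from the snippet above; DecodePayload and ulawToLinear are illustrative names, not part of the demo.

```go
package main

import (
	"encoding/base64"
	"encoding/binary"
	"fmt"
)

// ulawToLinear expands one G.711 mu-law byte into a 16-bit linear sample.
func ulawToLinear(u byte) int16 {
	u = ^u // mu-law bytes are stored inverted

	sign := u & 0x80
	exponent := (u >> 4) & 0x07
	mantissa := u & 0x0F

	// 0x84 (132) is the bias added during encoding.
	sample := ((int(mantissa) << 3) + 0x84) << exponent
	sample -= 0x84

	if sign != 0 {
		return int16(-sample)
	}
	return int16(sample)
}

// DecodePayload turns a base64-encoded mu-law payload into
// 16-bit little-endian PCM bytes (two bytes per input byte).
func DecodePayload(payload string) ([]byte, error) {
	ulaw, err := base64.StdEncoding.DecodeString(payload)
	if err != nil {
		return nil, err
	}

	pcm := make([]byte, 2*len(ulaw))
	for i, b := range ulaw {
		binary.LittleEndian.PutUint16(pcm[2*i:], uint16(ulawToLinear(b)))
	}

	return pcm, nil
}

func main() {
	// "/wCA" is base64 for the bytes 0xFF, 0x00, 0x80:
	// silence, the most negative and the most positive mu-law values.
	pcm, err := DecodePayload("/wCA")
	if err != nil {
		panic(err)
	}

	for i := 0; i < len(pcm); i += 2 {
		fmt.Println(int16(binary.LittleEndian.Uint16(pcm[i:])))
	}
}
```

The output doubles in size because every 8-bit mu-law value becomes a 16-bit sample, which is worth keeping in mind when choosing a buffering threshold.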

As you can see, we use the streamer struct stored in the session’s internal state: how do we put it there?

Authentication and streamer initialization

Security matters. Our WebSocket server is open to the public (so Twilio can reach it); thus, we need to protect it from unauthorized access.

As soon as Twilio sends a “start” message, we must authenticate the call. For that, we obtain the Twilio account SID (unique identifier) and send it over to AnyCable RPC (Hanami). In Ruby, we can compare it with the known Twilio account identifier, and approve or reject the connection:

if msg.Command == StartEvent {
  start, _ := msg.Data.(StartPayload)

  // We add the account SID as a header to the session,
  // so we can access it via request.env["HTTP_X_TWILIO_ACCOUNT"] in Ruby.
  s.GetEnv().SetHeader("x-twilio-account", start.AccountSID)
  _, err := ex.node.Authenticate(s)

  if err != nil {
    return err
  }

  // We need to perform an additional RPC call to initialize the channel subscription
  // and notify about the call start.
  ex.node.Subscribe(s, &common.Message{Identifier: channelId(start.CallSID), Command: "subscribe"})

  ex.initStreamer(s, start.CallSID)

  return nil
}

We use the call SID as a part of the channel identifier, so in Ruby, we’ll be able to access it via params[:sid]. This is how we distinguish calls from each other.
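The channelId helper is not shown in this post. In the Action Cable protocol, a channel identifier is a JSON string with a "channel" key, and any extra keys become channel params on the Ruby side. A possible sketch of such a helper follows; the channel name "twilio" is a hypothetical value and must match whatever identifier the Ruby channel is registered under.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// channelIdentifier mirrors the Action Cable identifier shape:
// the "channel" key selects the channel, the remaining keys become params.
type channelIdentifier struct {
	Channel string `json:"channel"`
	SID     string `json:"sid"`
}

// channelID builds an identifier carrying the call SID as the "sid" param.
// The channel name "twilio" is an assumption made for this sketch.
func channelID(callSID string) string {
	// Marshaling a struct with two string fields cannot fail.
	raw, _ := json.Marshal(channelIdentifier{Channel: "twilio", SID: callSID})
	return string(raw)
}

func main() {
	fmt.Println(channelID("CA123"))
}
```

Using a struct rather than a map keeps the key order stable, so the same call SID always yields the same identifier string.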

The initStreamer function does exactly what it says it does. The part worth showing here is the transcription result callback:

func (ex *Executor) initStreamer(s *node.Session, sid string) error {
  identifier := channelId(sid)

  st := streamer.NewStreamer(ex.conf)

  st.OnResponse(func(response *streamer.Response) {
    _, performError := ex.node.Perform(s, &common.Message{
      Identifier: identifier,
      Command:    "message",
      Data: string(
        utils.ToJSON(map[string]interface{}{
          "action": "handle_message",
          "result": response,
        })),
    })
  })

  st.KickOff(context.Background())

  s.InternalState["streamer"] = st

  return nil
}

We use the node.Perform() function, which performs the respective RPC call under the hood: the #handle_message method will be called on a channel object in Ruby.

That’s pretty much it for the Go application. But before jumping into the Ruby part, let’s talk about how we can test in isolation.

Using wsdirector to emulate Twilio media streams

Developing a feature which heavily relies on a third-party service is always painful. How can you test an application which transcribes a phone call in real time? You’ll be constantly calling yourself and speaking gibberish on the phone (don’t do this in a public workspace 🙃). It’s not only inconvenient but also time-consuming. We Rubyists always look for ways to improve our productivity, and this situation is no exception.

AnyCable is more than a service or a library, it’s an ecosystem. There are plenty of useful tools for any real-time occasions. The one that helped us with testing Twilio media streams is wsdirector. We spent a few minutes and recorded a handful of fixtures: media stream dumps in YAML format. Now, instead of making real phone calls, we can just run a wsdirector CLI to emulate them:

wsdirector -f etc/fixtures/wsdirector/ruby.yml -u ws://localhost:8080/streams

This was especially helpful to test and tune our Vosk integration. As a side effect, you don’t need a Twilio account to play with the demo application—just make the calls via wsdirector!

Faking Vosk

Speaking of DX, we’ve also made it possible to avoid running a Vosk server (which eats a lot of RAM and is painfully slow within Docker on M1 Macs) by creating a fake Vosk server with Ruby and grpc_kit. Thanks to gRPC, all we need to implement is a Ruby class with the #streaming_recognize method; most of the code we autogenerated using gRPC tools.

Our fake Vosk uses ffaker to generate random phrases in response to incoming requests. So, combining wsdirector, a fake RPC controller, and a fake Vosk server, we can test the Go app completely in isolation.

Connecting AnyCable to Hanami

Usually, we integrate AnyCable into Rails applications. (That also was the case for our original work on Twilio media streams for a client.) However, we decided to add an additional twist to this article and go off the Rails, so to speak.

At Evil Martians, we’ve been familiar with Hanami since its early days (and we even used it in production back when it was still called Lotus), but the recent 2.0 release was new to us. So, we decided to give it a try.

The Hanami documentation is a good starting point. We followed the guide closely to bootstrap an application:

$ hanami new kaisen
...

We did skip the part related to persistence—we don’t need it for this demo.

We then faced our first challenge—Hanami, as of v2.0, doesn’t provide a view layer at all (it’s planned for 2.1). We wanted to keep things simple and avoid adding yet another (frontend) application; we were looking for a full-stack way of building Hanami web applications. The search wasn’t fruitful, so we decided to build it from scratch.

Using Phlex for views

Phlex is a Ruby framework for building views. It’s relatively new, but recently hit its first major release; so, it’s ready to be a part of your application.

In Phlex, each view is represented as a Ruby class. Here we can see some similarities to View Component. Unlike View Component though, Phlex also allows you to declare HTML via Ruby by using methods and blocks. Thus, you don’t need to switch between multiple files, a template and a Ruby class, to modify a component.

Here is, for example, our main application view class:

class Show < View
  option :call_sid, optional: true
  option :phone, optional: true

  def template
    div(class: "min-w-full flex flex-row") do
      div(class: "w-1/3 border-r border-red-100 mr-4") do
        a(href: path_for(:calls)) { h2(class: "font-bold text-2xl mb-5") { "Calls" } }

        render Form.new(phone:)

        hr(class: "border-red-100 mt-2")

        div(id: "calls", class: "pr-2") do
          stream_from("calls")
        end
      end

      render Events.new(call_sid:)
    end
  end
end

If you’ve ever worked with React, you’re probably experiencing flashbacks right now.

We can organize views into components (see Form and Events above), use OOP features, and so on. For example, our base View class contains some helpers and adds dry-initializer for more convenient constructor parameter declaration:

module Kaisen
  class View < Phlex::HTML
    extend Dry::Initializer

    private

    def path_for(...) = ::Hanami.app["routes"].path(...)
  end
end

Finally, we added a helper to our base action class to infer a view from the action. Here’s how we use it:

module Calls
  class Show < Kaisen::Action
    def handle(request, response)
      call_sid = request.params[:id]
      response.body = phlex(locals: {call_sid:})
    end
  end
end

The interface is obviously inspired by Rails (e.g., locals). We believe that it’s better to stick to well-known patterns to reduce mental overhead for developers switching between frameworks.

(Let’s not dilute the post with unnecessary code snippets. The helper implementation can be found in the source code.)

Using Vite for assets

A view layer consists not only of HTML but also of assets (JavaScript, CSS, etc.). This challenge was the simplest one to overcome: Vite is the way. Luckily, Vite Ruby is a framework-agnostic project, so we knew we’d be able to make it work with Hanami.

We found the existing vite-hanami gem, which was built for Hanami 1.3, and cherry-picked some bits from it to make Vite Ruby work with Hanami 2.0. Since Vite depends on the HTML implementation, our Vite helpers rely on Phlex. For example:

def vite_client
  return unless src = vite_manifest.vite_client_src

  script(src: src, type: "module")
end

def vite_javascript(name, **options)
  entries = vite_manifest.resolve_entries(*name, type: :javascript)
  return unless entries

  entries.first.last.each do |src|
    script(src:, **options)
  end
end

The full source code can be found here.

We also found that Hanami comes with a very restrictive Content Security Policy by default, so we had to adjust it to allow serving Vite assets:

environment :development do
  # Allow @vite/client to hot reload changes in development
  config.actions.content_security_policy[:script_src] += " 'unsafe-eval' 'unsafe-inline'"
  config.actions.content_security_policy[:connect_src] += " ws://#{ ViteRuby.config.host_with_port }"
  config.actions.content_security_policy[:style_src] += " 'unsafe-eval'"
end

Finally, to serve assets, we added a couple of Rack middlewares:

environment :development do
  config.middleware.use(ViteRuby::DevServerProxy) if ViteRuby.run_proxy?
  config.middleware.use Rack::Static, { urls: ["/vite-dev/"], root: "public" }
end

The Rack::Static middleware is used to serve precompiled assets if there is no Vite dev server running.

Getting reactivity ready with Cable Ready

Although we configured Vite and made it possible to use JS in our Hanami application, we decided to rely on as little JS code as possible. Instead, we wanted to follow the HTML-over-the-wire approach.

Hotwire was our first candidate. The JavaScript part of Hotwire (the most important part) is not coupled with Rails; thus, it’s possible to use Hotwire with a web application built with any framework, not only Rails. Nevertheless, we decided to go all in on Rails alternatives. Thus, we chose Cable Ready.

Cable Ready 5.0 comes with a #cable_ready_stream_from helper which has a similar interface to Turbo Streams. That means we can subscribe to server updates by dropping an HTML element on the page, no JavaScript required.

Then, we can send Cable Ready operations from the server to perform DOM modifications. Cable Ready comes with dozens of operations out of the box, so you can do pretty much anything. (And if something is missing, you can always define a custom operation.)

Setting up Cable Ready on the client side didn’t require a lot of effort:

import CableReady from 'cable_ready';
import { createConsumer } from "@anycable/web";

const consumer = createConsumer();
CableReady.initialize({ consumer });

Note that we also added the AnyCable client library as a WebSocket client implementation.

Unfortunately, we couldn’t use the cable_ready gem: it depends on Rails components, such as Action Pack and Active Support. (It could’ve worked out, but we decided to stay Rails-free.)

To use the #cable_ready_stream_from feature, we had to re-implement the stream signing functionality. Cable Ready uses the MessageVerifier class from Active Support for that. Our signer needs only Base64 and OpenSSL:

class StreamName
  def signed(name)
    data = ::Base64.strict_encode64(name.to_json)
    digest = generate_digest(data)
    "#{data}--#{digest}"
  end

  private

  def generate_digest(data)
    require "openssl" unless defined?(OpenSSL)
    OpenSSL::HMAC.hexdigest(OpenSSL::Digest::SHA256.new, ::Hanami.app["settings"][:cable_ready_sign_key], data)
  end
end

The implementation is fully compatible with the one that comes with the cable_ready gem.
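To illustrate what a consumer of such a signed name has to do, here is a standalone Go sketch that produces and verifies the same "data--digest" format: base64 of the JSON-encoded name, followed by a hex HMAC-SHA256 of that base64 string. This is not AnyCable-Go's actual verification code; the Sign and Verify names and the sample key are assumptions made for the example.

```go
package main

import (
	"crypto/hmac"
	"crypto/sha256"
	"encoding/base64"
	"encoding/hex"
	"encoding/json"
	"errors"
	"fmt"
	"strings"
)

// digest returns the hex-encoded HMAC-SHA256 of data.
func digest(key, data string) string {
	mac := hmac.New(sha256.New, []byte(key))
	mac.Write([]byte(data))
	return hex.EncodeToString(mac.Sum(nil))
}

// Sign mirrors the Ruby signer: base64(JSON(name)) + "--" + digest.
// Note: json.Marshal escapes <, > and & by default, so names containing
// those characters may be encoded differently than by Ruby's to_json.
func Sign(key, name string) (string, error) {
	raw, err := json.Marshal(name)
	if err != nil {
		return "", err
	}
	data := base64.StdEncoding.EncodeToString(raw)
	return data + "--" + digest(key, data), nil
}

// Verify checks the signature and returns the original stream name.
func Verify(key, signed string) (string, error) {
	// The base64 alphabet has no "-", so the separator is unambiguous.
	parts := strings.SplitN(signed, "--", 2)
	if len(parts) != 2 {
		return "", errors.New("malformed signed stream name")
	}
	data, sig := parts[0], parts[1]

	// Constant-time comparison avoids leaking digest prefixes.
	if !hmac.Equal([]byte(sig), []byte(digest(key, data))) {
		return "", errors.New("signature mismatch")
	}

	raw, err := base64.StdEncoding.DecodeString(data)
	if err != nil {
		return "", err
	}

	var name string
	if err := json.Unmarshal(raw, &name); err != nil {
		return "", err
	}
	return name, nil
}

func main() {
	signed, err := Sign("s3cr3t", "calls") // sample key, not a real one
	if err != nil {
		panic(err)
	}

	name, err := Verify("s3cr3t", signed)
	fmt.Println(name, err)
}
```

The digest covers the base64 string rather than the raw name, so verification can happen before any decoding of untrusted input.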

Backporting operations (enough for our demo) resulted in some copy-pasting. You can find the result here.

To make it easy to access the #cable_ready object anywhere in the codebase, we created a Hanami provider:

Hanami.app.register_provider(:cable_ready) do
  prepare do
    require "cable_ready/hanami"
  end

  start do
    broadcaster = Kaisen::CableReady::Hanami::Broadcaster.new
    stream_name = Kaisen::CableReady::Hanami::StreamName.new

    register "cable_ready", broadcaster
    register "cable_ready_stream_name", stream_name
  end
end

Providers are the way to configure application dependencies. They may look similar to Rails initializers, but they are more powerful and better designed (no global state, lifecycle events, etc.).

Now, you can add the #cable_ready broadcaster to your class by injecting it as a dependency:

class MyClass
  include Deps["cable_ready"]

  def broadcast_something = cable_ready.action(...).broadcast_to("test")
end

Finally, we added the <cable-ready-stream-from> HTML element support to Phlex:

class View < Phlex::HTML
  register_element :cable_ready_stream_from

  private

  def stream_from(name)
    cable_ready_stream_from(identifier: ::Hanami.app["cable_ready_stream_name"].signed(name))
  end
end

With this configuration, we can drop #stream_from(name) in any view to subscribe to the stream updates. The WebSocket client will automatically be initiated and connected to our Go application (to the /cable endpoint serving regular Action Cable clients). At this point, we don’t even need an RPC server: AnyCable supports signed Cable Ready streams out of the box.

That said, we still need a broadcasting component to send updates from Hanami to AnyCable.

Integrating AnyCable via LiteCable

Our Go application uses AnyCable RPC to authenticate streams and send transcription results. Thus, we need to define a channel to handle these requests.

For that, we can use Lite Cable—a lightweight implementation of Action Cable. It comes with the same Connection and Channel abstractions and supports the minimal viable subset of the Action Cable API.

Here is the definition of the Connection class in our Hanami application:

class Connection < LiteCable::Connection::Base
  def connect
    sid = request.env["HTTP_X_TWILIO_ACCOUNT"]
    return unless sid

    twilio_account_sid = Hanami.app["settings"].twilio_account_sid
    reject_unauthorized_connection unless sid == twilio_account_sid
  end
end

That’s where we check that the media stream connection is made from our Twilio account. We read the value of the x-twilio-account HTTP header (via request.env), and then, only if it’s present, we compare it with the application’s Twilio account ID.

We also define a Twilio channel to handle events from the stream and broadcast updates via Cable Ready:

class Twilio < Channel
  def subscribed
    cable_ready.append(
      selector: "#calls",
      html: render_call(call_sid:)
    ).broadcast_to("calls")

    cable_ready.append(
      selector: "#events",
      html: render_event(text: "Call started", event_type: "start")
    ).broadcast_to("call_#{call_sid}")
  end

  def unsubscribed
    # ...
  end

  def handle_message(data)
    data.fetch("result").values_at("id", "text", "event") => id, text, event_type

    cable_ready.append_or_replace(
      selector: "#events",
      target: "#event_#{id}",
      html: render_event(id:, text:, event_type:)
    ).broadcast_to("call_#{call_sid}")
  end

  private

  def call_sid = params["sid"]

  def render_event(**)
    Views::Calls::Show::Event.new(**).call
  end

  def render_call(**)
    Views::Calls::Show::Call.new(**).call
  end
end

That’s pretty much it!

The final piece of the puzzle is a publish/subscribe service. This is a component that helps to distribute broadcasts across connected clients. With Action Cable, we usually use Redis for that. With AnyCable, we can avoid adding one more infrastructure dependency by using an embedded NATS server. We enable it in our Go application by setting the default environment variables:

# .env
ANYCABLE_EMBED_NATS=true
ANYCABLE_BROADCAST_ADAPTER=nats

In the Hanami application, we configured AnyCable to use NATS via the application settings:

# config/settings.rb
module Kaisen
  class Settings < Hanami::Settings
    # ...
    setting :anycable_broadcast_adapter, default: "nats", constructor: Types::String
  end
end

To make AnyCable understand Hanami settings, we added a custom loader for Anyway Config (which powers AnyCable configuration). You can find it here.

To verify that our setup works as expected, we can emulate a phone call using the ruby.yml wsdirector scenario. You should see something like this in your browser:

Twilio calls monitor demo

Alright, that was a heck of a journey, but now it’s time to say goodbye!

In this post, we learned that AnyCable can truly live up to the “Any” part of its name if you start using it as a library. Whenever you need to connect your Ruby or Rails application to a WebSocket stream, or when you have to support proprietary client applications speaking foreign protocols, AnyCable is there to help you.

We also hope that our experiments with Hanami will encourage you to pay attention to this Ruby framework and its ecosystem.

P.S. Don’t forget to star the demo application repo on GitHub!

Do you hear that sound? The cosmic phone is ringing, and it’s Evil Martians on the line! We’re ready to assist with your project: whether it’s a Rails project or a web or mobile application, if you’re in need of expert problem solving with product design, frontend, backend, or software reliability, we’re here, ready to answer the call! Reach out to us!
