Streaming Data

Working with the Firehose and Jetstream

One of the core primitives of the AT Protocol is the firehose. It is an authenticated stream of events, meaning the repository updates it carries are cryptographically signed, and it is used to efficiently sync user updates (posts, likes, follows, handle changes, etc.).

Many AT applications that need to stream incoming data use the firehose, from feed generators and labelers to bots and search engines. In the AT ecosystem, many different endpoints serve these firehose APIs. Each PDS serves a stream of all of the activity on the repos it is responsible for. From there, Relays aggregate the streams of each participating PDS into a single unified stream: the firehose.
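
The aggregation step can be pictured with a small in-process toy model. This is only a sketch of the fan-in idea, not how a real Relay is implemented: the names pds_stream and relay, the example hostnames, and the event strings are all hypothetical, and the per-event sequence number is used here purely for illustration.

```python
import asyncio


async def pds_stream(name, events, out_queue):
    """Hypothetical PDS: pushes its own repo events onto the relay's queue."""
    for event in events:
        await out_queue.put((name, event))
        await asyncio.sleep(0)  # yield so other producers can interleave


async def relay(sources):
    """Merge several per-PDS streams into one sequence-numbered stream."""
    queue = asyncio.Queue()
    producers = [
        asyncio.create_task(pds_stream(name, events, queue))
        for name, events in sources.items()
    ]
    # Wait for all producers to finish, then drain the queue in arrival order
    await asyncio.gather(*producers)
    merged = []
    seq = 0
    while not queue.empty():
        name, event = queue.get_nowait()
        seq += 1
        merged.append({"seq": seq, "source": name, "event": event})
    return merged


# Hypothetical sources: two PDS hosts, each with its own activity
sources = {
    "pds-a.example": ["post by alice", "like by alice"],
    "pds-b.example": ["follow by bob"],
}
firehose = asyncio.run(relay(sources))
for item in firehose:
    print(item)
```

A consumer of the merged list sees one ordered stream and does not need to know how many PDS hosts contributed to it, which is the property the Relay provides at network scale.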

Firehose

Anyone can connect to the firehose without authentication — this is a core feature of the protocol. To get started, open a WebSocket connection to any provider of the com.atproto.sync.subscribeRepos endpoint:

$ websocat wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos

From here, you would need to read each message as it comes in, and decode the associated data. Our Go SDK is currently the most feature-complete for interacting with the firehose directly.

Bear in mind that the firehose output format is one of the more complex parts of AT: consuming it involves decoding binary CBOR data and CAR files. Additionally, the volume of data has increased rapidly as the network has grown. The full synchronization firehose is core network infrastructure, but for end users such as feed developers, we provide an alternative streaming solution called Jetstream.

Jetstream

Jetstream has a few key advantages:

  • simple JSON encoding
  • reduced bandwidth, and compression
  • ability to filter by collection (NSID) or repo (DID)

A Jetstream server consumes from the firehose and fans out to many subscribers. It is open source, implemented in Go, and simple to self-host. An official Go client library is included, and community client libraries have also been developed.

You can work with Jetstream like any other WebSocket. Connect to a Jetstream endpoint such as wss://jetstream2.us-east.bsky.network/subscribe, and add a wantedCollections query parameter, such as ?wantedCollections=app.bsky.feed.post, to filter by collection.
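
When more than one filter is needed, it can be easier to build the URL programmatically. The sketch below assumes that filters are passed by repeating the query parameter once per value, and that the repo (DID) filter is named wantedDids; both are assumptions to check against the Jetstream documentation. The helper name build_jetstream_url is hypothetical.

```python
from urllib.parse import urlencode

# Endpoint taken from the text above; other Jetstream instances are assumed to work the same way
JETSTREAM_ENDPOINT = "wss://jetstream2.us-east.bsky.network/subscribe"


def build_jetstream_url(endpoint, wanted_collections=(), wanted_dids=()):
    """Return a subscribe URL with one query parameter per filter value."""
    params = [("wantedCollections", nsid) for nsid in wanted_collections]
    # Assumption: wantedDids is the name of the repo (DID) filter parameter
    params += [("wantedDids", did) for did in wanted_dids]
    if not params:
        return endpoint  # no filters: subscribe to everything
    return f"{endpoint}?{urlencode(params)}"


url = build_jetstream_url(
    JETSTREAM_ENDPOINT,
    wanted_collections=["app.bsky.feed.post", "app.bsky.feed.like"],
)
print(url)
```

Using urlencode also percent-encodes characters such as the colons in a DID, so the values do not need to be escaped by hand.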

Here is a Python example:

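import asyncio

import websockets  # third-party: pip install websockets

# Jetstream endpoint, filtered to post records only
uri = "wss://jetstream2.us-east.bsky.network/subscribe?wantedCollections=app.bsky.feed.post"


async def listen_to_websocket():
    # Open the connection; it is closed automatically when the block exits
    async with websockets.connect(uri) as websocket:
        while True:
            try:
                # Each message is one JSON-encoded event
                message = await websocket.recv()
                print(message)
            except websockets.ConnectionClosed as e:
                print(f"Connection closed: {e}")
                break
            except Exception as e:
                # Log unexpected errors and keep listening
                print(f"Error: {e}")


if __name__ == "__main__":
    asyncio.run(listen_to_websocket())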
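
Because each Jetstream message is JSON, a received message can be decoded with the standard json module instead of being printed raw. The sketch below runs offline against a hand-written sample. The field names (kind, commit, operation, collection, record) are an assumption about the Jetstream message layout, the sample values are made up, and extract_post_text is a hypothetical helper.

```python
import json

# Hypothetical sample message; the field names are an assumption about Jetstream's JSON layout
SAMPLE_MESSAGE = json.dumps({
    "did": "did:plc:example",
    "time_us": 1725911162329308,
    "kind": "commit",
    "commit": {
        "operation": "create",
        "collection": "app.bsky.feed.post",
        "rkey": "examplerkey",
        "record": {"$type": "app.bsky.feed.post", "text": "hello world"},
    },
})


def extract_post_text(message):
    """Return the text of a newly created post, or None for any other event."""
    event = json.loads(message)
    if event.get("kind") != "commit":
        return None  # not a repo commit, so there is no record to read
    commit = event.get("commit", {})
    if commit.get("operation") != "create":
        return None  # updates and deletes are skipped in this sketch
    if commit.get("collection") != "app.bsky.feed.post":
        return None  # some other record type
    return commit.get("record", {}).get("text")


print(extract_post_text(SAMPLE_MESSAGE))
```

In a listener like the one above, the same function could be called on each value returned by recv(), with None results ignored.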

Further Reading and Resources