Implementing an SSE provider

Hi,

I am currently trying to implement real time collaboration for a Map application where app state must be communicated in real time to other clients. This is a generic JSON configuration object with some settings, colors, etc.

I have decided to implement this using POST requests + Server Sent Events. I have been able to get awareness working perfectly like this, but run into issues when trying to send updates on documents which are initially loaded from a database. My approach (which im 99% sure is erroneous) has been to (at the UI client) fetch the persisted configuration from our psql database, create a new Y.Doc on each browser client, then insert the persisted configuration into the new doc via Map.set()'s. On any further updates, i encodeStateAsUpdate and send that via POST up to the server, which simply throws the message into a Redis pub/sub channel, which an SSE endpoint subscribes to which then send those messages down to the clients.

For context, my backend is a set of load balanced instances of the same api, so multiple api instances can be handling requests from multiple users

My issue is that occasionally, changes from one client are successfully propagated to other clients, but not applied. However, when I push an edit from the client where the changes weren’t applied, the changes do get applied on the original client. In that initial state where one client hasn’t pushed edits though, it sometimes gets stuck receiving updates without applying them. After reading a bunch here, my guess is that I’m either initializing the documents incorrectly, or not properly performing the sync steps described in the docs. That said, I wanted to try and implement a SSE Provider similar to the existing WebSocket provider in order to potentially solve my problems, and I had the following questions i was hoping to get answered. Lastly, if there’s anything else glaringly wrong with my approach i would love to get some feedback.

  1. Is there anything i could do with this current implementation that would fix my issues so that I don’t have to implement this provider?

  2. In this new provider, if the server is handling receiving and pushing messages in different request handlers, how does syncing work?

  3. Does a Y.Doc instance need to exist on the server in order to sync properly with the clients? How does this impact or how is it impacted by #1?

  4. Who is responsible for writing the latest updates to a database for persistence? Specifically i was wondering how the discrepancy between the latest persisted document and latest document (from ongoing edits) would impact users joining the document while it is already being edited by other users.

Thanks for any help, and i would be happy to provide any specific information that would help clarify or solve my issue here

So i was just thinking about how i could implement the sync protocol and i think i could accomplish it with a little more PubSub. If i have a channel for syncEvents which is scoped to client_id, i could push a SSE with info for sync1, and the client can POST their info for sync1, where the handler can push the message into a docId:clientId:sync1 channel, and the active SSE connection can pick it up in a similar fashion to the WS provider. And then same thing could be mirrored for sync2.

I assume this means the Y.Doc would live in the SSE GET handler.

Hi, welcome.

Can you clarify if you are saving the encoded binary or the JSON representation of the Doc to disk?

I don’t know of any network providers besides y-websocket, so that’s probably your best example to base yours on if you want to use a different transportation protocol.

Hi! The post, sse messages, and pub/sub publishers all send the base64 data. At the moment im only initializing the Y.doc’s from disk since i havent figured out how to persist the changes yet. So i initialize the client Y.doc’s separately with data from psql, then propagate further changes, and when all clients leave then the next client will start over from the persisted state since nothing was saved

Binary YJS data, or JSON data? This is what I’m trying to get at, since CRDT’s only work if you save the entire version history.

Sorry, the document is from an old implementation and is stored in plain JSON which I add to a new y.doc by iterating over the keys and performing Map.set’s

Do you convert the old document to a YJS Doc as a one-time migration, or are you saying this happens every time the app is loaded?

My current rough implementation performs this initialization whenever a client navigates to the “app” page in my React App, so it happens when the app is loaded. I got the impression from some other posts here that if i wanted to initialize a document from storage and have all clients use it, i would have to make a new doc on the server side, then populate it, then propagate it. Its not currently necessary for me to persist the the history of the document between sessions, so i figured initializing a new doc with the plain JSON values would be sufficient

Yeah, that’s what I was worried about. Unfortunately CRDT’s only work if you store the entire history. This is the only way that they can ensure that offline edits are applied correctly. How do you know when a client has closed their browser, rather than just having connection issues? If you’re creating an offline-first app, you never have full knowledge of when clients are connected and when they modify the Doc.

The only way you can safely throw away the history is to ensure that all clients are synced AND all clients close their browser windows so that nobody holds onto the old history. This is going to be really difficult in a real-world environment where clients can go offline (and continue to make edits) at any time. YJS can’t really handle this. Some people truncate the history from time to time to save disk space and coordinate a migration of each of the clients when that occurs. I don’t think it is advisable to do that every time the app starts though.

1 Like

To answer some of your original questions (though it may be moot if you are not persisting the full Doc history)…

Your current implementation is a YJS provider (or at least it’s trying to be). Anything that tries to sync a YJS Doc is effectively a provider.

Not sure on this one.

According to the docs, “it is possible to sync clients and compute delta updates without loading the Yjs document to memory.” See: Document Updates | Yjs Docs

Typically that’s a separate provider, such as y-mongodb-provider that is loaded on the server. It subscribes to updates on the Doc (which may, independently, be applied by the network provider) and saves them to the db. Users will get real-time updates through the network provider and are typically not be affected by the db provider persisting them in the background.

It sounds like an update is getting dropped or their histories are deviating. Each update increases the client’s clock by 1. Only updates of successive clock values are applied, and YJS will wait until the missing updates arrive.

1 Like

Wow! Lots of valuable information here, thank you for the time you’re putting into these responses.

I think I understand what you mean. If i’m understanding correctly, If all clients disconnect but one remains, any subsequent connections will be “fresh” docs but someone will have the old history which will cause problems. I guess I misunderstood how the “sync” protocol works; i thought it would solve this in the sense that if i connect to the server and i have “new” document, and then if there is history being passed around by other clients, the “sync” would allow me to get it and apply it to my doc, effectively bringing it up to date. Does this only work for documents that share common history?

With that said, could you help me understand what the steps are for loading the most up to date document if i have a persisted document in psql (lets say i regularly persisted the base64 of some encodeStateAsUpdateCall), and then i connect to the server and want the most recent document displayed on my screen, with subsequent updates from other clients being applied.

My guess is if we’re storing the base64, we convert that to uInt8 and applyAsUpdate on a new Y.Doc. Then… sync?

Yes, exactly.

Correct. You can only sync Docs with a common history. new Doc() shares a common history with all Docs, because it has no history! But as soon as you apply an update (or call Map.set or any other shared type mutation) it won’t be able to sync with other Docs. This explains why your original client (with updates a,b,c,d,e) was able to get updates from the other client (with updates a,b,c) but not the other way around (f) until the missing updates came in (d,e).

Loading the updates and recreating the Doc is pretty easy: Just convert them from base64 to uint8 and call Y.applyUpdate(doc, update). You can call this on any number of updates, and it’s idempotent.

The sync protocol I can’t really speak to, as I haven’t built a network provider myself, but it’s described here: y-protocols/PROTOCOL.md at master · yjs/y-protocols · GitHub

Got it, thank you, this was very helpful. My key take-aways from the above are that

  1. I should persist the base64 data via some database provider so that when a new client loads in, they can get updates without issue
  2. For docs to sync they must share a common history, i.e. they must contain the same updates.

This leaves one last question that i have, though it might be a bit off topic relative to the initial inspiration of this post. Since syncing came up, I came across the section in the docs that talks about “calculating diffs” and im a bit confused. I understand in the “push the whole document” implementation how updates occur. I receive constant updates of the full state from some server (or maybe peers), which are from all the other clients. However, as i understand it in this implementation i am sending out my state vector, and i am being sent back a diff i must apply.

  1. In this implementation is the server maintaining a Y.Doc it uses to calculate the diff and send back to the client that made the request? Or is the server forwarding the state vector to all other clients, and each client is responding with their diff, which the original requesting client then applies all of?

  2. If im sending out my state vector (presumably on local document change), if i stop making changes, does my local document ever get updated by the other clients? From my understanding, this implementation implies that i’m only ever sending out diffs in response to other clients sending out their state vectors?

I think the server is maintaining a Y.Doc that it uses to calculate the diff and send back to the client that made the request. I don’t think the server is forwarding any state vectors…but I’m not super knowledgeable about this functionality.

Think of two-way data binding as involving two phases: the initial sync and the ongoing sync.

The initial sync requires Doc A to send its updates to Doc B, and Doc B to send its updates to Doc A. State vectors allow only the necessary updates to be sent, rather than all updates.

const stateVector1 = Y.encodeStateVector(ydoc1)
const stateVector2 = Y.encodeStateVector(ydoc2)
const diff1 = Y.encodeStateAsUpdate(ydoc1, stateVector2)
const diff2 = Y.encodeStateAsUpdate(ydoc2, stateVector1)
Y.applyUpdate(ydoc1, diff2)
Y.applyUpdate(ydoc2, diff1)

Once the two docs are synced, then you just need to subscribe to updates in both directions to keep them in sync:

doc1.on('update', update => {
  Y.applyUpdate(doc2, update)
})

doc2.on('update', update => {
  Y.applyUpdate(doc1, update)
})

The state vector is only used in the initial sync, so your question about sending out a state vector on local change doesn’t really make sense.

In a client-server model, the server can send updates directly to the client after the initial sync.

1 Like

Ohhhhhh, i thought the state vector was used for each update. So you send the state vectors for the initial update, and then after sync you just forward all updates from any client to other clients. I think that’s probably the root of my confusion surrounding the forwarding of state vectors, i thought this was how updates were being communicated in general but i see now that doesn’t make sense. After you’re synced, i suppose every subsequent update is necessary so you just send them as you produce them. Thank you! I think with this information i can implement this

1 Like

For others who found this post, i found this article which also helped me understand and visualize all this behavior. This figure in particular helped me understand the syncing.

One of the mistakes i was making which resulted in the inability of one client to apply changes from another was to skip the syncing step.

2 Likes

I’m wondering how this knowledge would transfer to a system like mine where there are multiple servers running which serve the overall client pool. Is there any documentation out there on this? Sending updates from clients to others is simple since i don’t need to maintain anything on the server to do that, but what about syncing? When a client joins, they should be able to sync with any server, and it should work even if their SSE GET request is with one server, but their POST requests are with another (due to load balancing).

And since the server count can increase or decrease at any time (GCP CloudRun autoscaling), it seems precarious to have the servers maintain a Y.Doc since they also would have to sync with the other servers when they spin up.

Consider the following:

  1. Client A opens the document page
  2. Client A sends SyncStep1 to Server A
  3. Server A responds to Client A with SyncStep2 (updates)
  4. Server A now then sends SyncStep1 to Client A (what updates do you have that i dont)
  5. Client A sends a POST request with SyncStep2 but it gets routed to Server B

Or maybe that post request goes to a totally new Server C since A and B were congested, and Server C has no active Y.Doc it was maintaining so it has to somehow get synced as well… It seems like maintaining Y.Docs on the server could cause problems in these circumstances. So im wondering if theres a solution to this where I dont have to maintain them. Maybe i can forward those sync requests to the clients as opposed to trying to do it on the server, but on the surface this seems like it could result in a lot of requests containing a lot of redundant data.

Try searching the forums/repo for “horizontal scaling”. You’re right that the typical client-server setup for YJS assumes that the same server is receiving and responding to all requests. In order to load balance, you need redis or some kind of global pub-sub channel so that the server pool can properly direct requests.

The only place I’ve seen this done is here:

The websocket-server that comes with y-websocket essentially maintains a copy of the y-js document(s) in memory and syncs it between different clients connected to the same doc.

The websocket-server in this repo isolates the updates that clients send to it, persists these updates to the database, and publishes these updates (using redis-pubsub) in a channel for the document. Also, when a doc is created for the first time in the websocket-server, the server reads all the updates stored in the database, and applies those updates to the document, effectively initializing it.

This makes the websocket-server provided in this repo persistent and horizontally-scalable on paper.

1 Like

Thanks for the information, I was able to find a relevant thread from this forum and got learned some helpful techniques from the repository and the posts. I hadn’t considered using Redis Lists as a way of solving the cross-server syncronization issue. If a new server joins in, they can instantiate a Y.Doc from storage, then read any outstanding edits from the redis updates queue, and then they should be up to date with the room. This seems like an effective way to make the server group behave like a single server. The only thing i was kind of curious about is a line from the redis.ts file in the repo you linked:

if (len > 100) {
    redis.pipeline()
      .lpopBuffer(getDocUpdatesKey(doc))
      .rpushBuffer(getDocUpdatesKey(doc), Buffer.from(update))
      .expire(getDocUpdatesKey(doc), 300)
      .exec()
  }

The implementation appears to persist every change, then publish the change to a Pub/Sub channel, and push it to the queue. So, connected servers get their updates quickly, new servers can get the latest updates, and the document is persisted regularly (maybe too regularly?). One thing i’m wondering is, what if the persistence DB went down while clients were editing for long enough that updates arent persisted (imagine db errors dont crash the server or exit the handler) but make their way through the 100 size queue of updates, and then the db comes back online. If multiple clients are editing, 100 updates could go by quickly, and now the DB is missing updates that the active clients are using, so new clients wouldnt be able to sync right?

It’s not clear to me what the solution to this is. Maybe you just have to make sure db persistence, queue pushes, and publishes happen in a transaction? But that means every update would have to wait for db writes. Or db errors throw on the server when you try to make updates. In either case, it would be nice to have a strategy that allows something like the DB to go down and come back without stopping updates from traveling to/from clients. I figured DB persistence should happen as some sort of redis queue pop that happens every 30 seconds or something.

P.S. Checked the implementation again and saw

persistUpdate(doc, update)
      .catch((err) => {
        serverLogger.error(err);
        closeConn(doc, origin);
      })
    ;

So it seems like if theres any db problems with this implementation it kills the connection. I also realized that encodeStateAsUpdate encodes the entire document state as an update, so as long as i’m using the server doc’s to overwrite the persisted document then it should be fine once the DB comes back online

1 Like