How to integrate ypy-websocket with django channels (for websocket) and redis (as data store)

Thanks for this great library. I am trying to keep my backend in django channels for websocket. I am trying to understand how can I integrate ypy-websocket so that I can store all the updates in a redis (or some other datastore - whatever is efficitent for persistence). my questions:

  1. how can django channels post update to a ystore?
  2. how can client receive the existing document when they first connect?
  3. how can I convert the binary data in the store to some readable format to allow elasticsearch on these docs?

Thanks in advance.

Hi @anuj,

I can’t answer any specifics on ypy-websocket. However, I know that they already have a persistence layer. Maybe you can post a question in the repository because David (the author) is not active here.

Ypy and Yjs share a similar updates API: Document Updates - Yjs Docs

I recommend storing the Yjs state somewhere and using that as the source of truth for manipulations. The easiest approach is to store the encoded Yjs document in a database whenever the document changes (after a debounce). An optimization would be to store incremental updates instead of rewriting the whole document all the time.

Whenever you store the Yjs document, you can transform the document to plain text (or HTML) and send it to elasticsearch. It is hard to write a generic persistence adapter that works for all. So you will likely have to adapt ypy-websocket to suit your needs.

hi @dmonad

thanks for your response.

I was able to transform the document to plain text.

I was also able to run it using redis as a store. I am creating Ydoc when the room is created and apply updates from Redis.

I am using redis list to store the document (each key is doc_id and value is a list of updates).

below is how I am generating the doc for existing data and send to client.

ydoc = Y.YDoc()
for update in redis.lrange(doc_key, 0, -1):
    Y.apply_update(ydoc, update)

state = Y.encode_state_vector(self.room.ydoc)
msg = create_sync_step1_message(state)
# send the msg to client

but the problem is that this first sync takes time (few sec) (as apply updates takes time as the document size increases).

what is the best way to resolve this?

I recommend applying several updates in a single transaction.

In Yjs you can use

Y.transact(() => {
  updates.forEach(update => Y.applyUpdate(ydoc, update))
})

This will reduce the overhead of sending an event for every single incremental change.

Next, you should optimize and reduce the number of updates. One approach is to merge all updates from time to time and replace the existing list with a list containing only a single merged document. Most systems want to merge the state anyway and sync it to a persistent database. Once you do this you could clear the list.

Thanks again Kevin for quick reply.

I liked the idea of merging the updates but I think merging will lose metadata along with updates. will see if we can keep that information separately in the database.

Your replies boost my confidence and helped me solve a lot of problems.

1 Like

Metadata is never lost in Yjs. However, applying changes will remove content that is marked as deleted. So, after merging updates you are not able to restore old states. If you want to be able to restore old states (e.g. using Y snapshots), then you can simply disable garbage collection when merging updates (ydoc.gc = false).

Another option would be to use Y.mergeUpdates([update1, update2]) which will simply merge updates without performing garbage collection.

Thanks Kevin @dmonad I applied your suggestions and now it’s much better. Thanks for your work and help.

1 Like

Hey @anuj, were you able to get this to work?

I already have Django + channels for WebSocket connection. I have a requirement to add collaborative editing and looking for ways to integrate ypy-websocket with channels to create an endpoint.

Any suggestion on how best to go about it?

hi @shajha yes, I am using the modified version of process_message method from ypy-websocket.

    async def connect(self):
        self.group_name = 'some group name'
        self.ydoc = Y.YDoc()
        # you can keep it empty for new doc or apply updates from db
        self.accept()
        # if you are using updating doc from database, you might want to send the sync1 message
        state = Y.encode_state_vector(self.ydoc)
        msg = create_sync_step1_message(state)
        await self.send_message(msg)

    async def send_message(self, bytes_data):
        if not bytes_data:
            return
        # Send message to room group
        await self.channel_layer.group_send(self.group_name, {"type": "chat_message", "message": bytes_data})

    async def receive(self, text_data=None, bytes_data=None):
        await self.send_message(bytes_data)
        update = await self.process_message(bytes_data, self.ydoc)
        # save this update to your database, i am saving in redis list.
    
    async def process_message(self, message: bytes, ydoc: Y.YDoc):
        if message[0] == YMessageType.SYNC:
            message_type = message[1]
            msg = message[2:]
            if message_type == YMessageType.SYNC_STEP1:
                state = read_message(msg)
                update = Y.encode_state_as_update(ydoc, state)
                reply = create_sync_step2_message(update)
                await self.send_message(reply)
            elif message_type in (YMessageType.SYNC_STEP2, YMessageType.SYNC_UPDATE):
                update = read_message(msg)
                Y.apply_update(ydoc, update)
                return update

Thanks, @anuj! This is very helpful.

Quick question -

  1. create_sync_step1_message(state) , create_sync_step2_message(update) and read_message(msg) seems like local fn that handles state + db sync. Is this understanding correct?

  2. What’s the definition of YMessageType is this from the package itself or a variable you’ve declared?

Again, sorry for asking to be spoon-fed here. I understand I can dig further myself with what you have already shared.

They are all from ypy-websocket/yutils.py at main · y-crdt/ypy-websocket · GitHub

Hi @anuj,
It would be great if you could contribute to ypy-websocket, for instance by adding your Redis backend in the Y stores. We currently support file-based as well as SQLite-based stores.
BTW, Y stores are currently part of ypy-websocket, but they should ideally live in their own package, as they shouldn’t be tied to the transport layer.
Also, let me know if it would be better in Django if ypy-websocket supported ASGI. There is an open issue for that.

Hi @davidbrochart I am happy to contribute to ypy-websocket. I will work on adding redis backend to the Y stores.

I looked into the issue regarding ypy-websocket support for ASGI. I am not sure if adding it in Django is better because django has django-channels library (which is supported by Django community itself). For FastAPI and others it might be more helpful.

2 Likes