Spent almost 4 days trying to understand Yjs, but alas!

So I have a pretty easy-to-understand use case:

STACK:

  1. ReactJS
  2. NodeJS
  3. SocketIO
  4. MongoDB and Redis
  5. Typescript
  6. Kubernetes (multiple nodes)

OBJECTIVE
Let me explain what I am trying to do:

  1. I am using ProseMirror.
  2. The editor is used by a team of 6 people to send messages to clients in a timeline (just like the Facebook timeline). This message area is not synced.
  3. I need templates so that it is easier to send messages. When sending a message, you can select a predefined template and send it instantly.
  4. Templates need to be created collaboratively by everyone in the team.
  5. Once you create a template, you can use it in the timeline section (which, again, is not realtime collaborative).
  6. Collaboration happens only while you are making a template. I guess this makes sense now.
  7. The Yjs document needs to be preserved in Redis so that any machine (Kubernetes node) can work on the same document.

PROBLEMS
Now about my pain points:

  1. I already have SocketIO working in the front end. I can see a websocket provider (y-websocket) in the Yjs GitHub, but I don't want to use it, and it is not feasible in my use case anyway, because SocketIO supports alternatives to ws (like long polling). So I need a SocketIO connector.
  2. I need my connector to save changes in Redis or Mongo, so that if someone wants to use a template while someone else is modifying it, they can at least get the document.
  3. I need the document in ProseMirror format so I can append to (or replace) the message box content when someone clicks on a certain template.

WHAT I DID SO FAR
Looking at how the y-websocket connector works, I have tried the following things.

  1. I am trying to make my own connector based on socket-io (sharing the code so far in a comment).
  2. I am trying to implement similar logic on the server side (sharing the code so far in a comment).

WHAT I AM NOT GETTING

  1. How the hell the connector works. I don't even have a clue.
  2. How I can keep multiple instances in sync using Redis or whatever else I can use.

CLIENT SIDE of CONNECTOR

import SocketClient from 'socket.io-client'
import * as Y from 'yjs'

// y-protocols
import { Awareness } from 'y-protocols/awareness'
import { writeUpdate, readSyncMessage } from 'y-protocols/sync.js'

// Lib0
import { createEncoder, writeVarUint, toUint8Array } from 'lib0/encoding'
import { createDecoder, readVarUint } from 'lib0/decoding'
import { Observable } from 'lib0/observable'

// Constants
const messageSync = 0
const messageQueryAwareness = 3
const messageAwareness = 1
const messageAuth = 2

export class SocketIOProvider extends Observable<string> {
  private io: SocketIOClient.Socket
  private doc: Y.Doc
  private listeningEvent: string
  private broadcastingEvent: string
  private joinRoomEvent: string
  public awareness: Awareness
  private roomName: string
  private _synced: boolean

  constructor({
    doc,
    io,
    roomName,
    serverURL,
    path,
    listeningEvent,
    broadcastingEvent,
    joinRoomEvent,
    awarenessName,
  }: SocketIOProviderProps) {
    super()
    if (serverURL) {
      this.io = SocketClient(serverURL, {
        transports: ['websocket', 'polling', 'flashsocket'],
        path: path,
        autoConnect: true,
      })
      this.io.connect()
    } else {
      if (!io) {
        throw new Error(
          'Error Making SocketIO Provider. Neither serverURL is passed nor io instance',
        )
      }
      this.io = io
    }
    // Initializing Class Variables
    this.doc = doc
    this._synced = false
    this.listeningEvent = listeningEvent ? listeningEvent : 'LISTEN_EVENT_YJS'
    this.broadcastingEvent = broadcastingEvent
      ? broadcastingEvent
      : 'BROADCAST_EVENT_YJS'
    this.joinRoomEvent = joinRoomEvent ? joinRoomEvent : 'JOIN_ROOM_YJS'
    this.awareness = new Awareness(doc)
    if (awarenessName) {
      this.awareness.setLocalStateField(
        awarenessName.field,
        awarenessName.value,
      )
    }
    this.roomName = roomName ? roomName : 'content-room'

    // Calling Init
    this.Init()
  }

  Init() {
    // Join Room Event
    this.io.emit(this.joinRoomEvent, this.roomName)
    // Listen event
    this.io.on(
      this.listeningEvent,
      (payload: { data: ArrayBuffer | Uint8Array; roomName: string }) => {
        console.log('Got update from server:', payload)
        // Socket.IO may deliver binary data as an ArrayBuffer, so wrap it
        const decoder = createDecoder(new Uint8Array(payload.data))
        const encoder = createEncoder()
        const messageType = readVarUint(decoder)
        switch (messageType) {
          case messageSync: {
            writeVarUint(encoder, messageSync)
            // Use `this` as the transaction origin so the 'update' listener
            // below does not echo remote changes back to the server
            const syncMessageType = readSyncMessage(
              decoder,
              encoder,
              this.doc,
              this,
            )
            console.log('sync message type:', syncMessageType)
            // TODO: set this.synced = true once sync step 2 has been received
            // Without this break the case fell through to the error branch
            break
          }
          default:
            console.error('Unable to compute message')
        }
      },
    )

    // Listen Doc Changes and Emit
    this.doc.on('update', (update: Uint8Array, origin: unknown) => {
      console.log('Local update to emit:', update, origin)
      // Skip updates that this provider applied itself (remote changes)
      if (origin !== this) {
        const encoder = createEncoder()
        writeVarUint(encoder, messageSync)
        writeUpdate(encoder, update)
        this.io.emit(this.broadcastingEvent, {
          data: toUint8Array(encoder),
          roomName: this.roomName,
        })
      }
    })
  }
  get synced() {
    return this._synced
  }

  set synced(state) {
    if (this._synced !== state) {
      this._synced = state
      // emit event
      // this.emit('sync', [state])
    }
  }

  /** Cleanup. To destroy the io instance and other stuff */
  public destroy() {
    this.io.close()
  }
}

export interface SocketIOProviderProps {
  /** yjs doc instance */
  doc: Y.Doc
  /** existing io instance */
  io?: SocketIOClient.Socket
  /** Default: 'content-room' */
  roomName?: string
  /** If you don't have socket io, specify the socket io server url. It will create an instance of socket io for you */
  serverURL?: string
  /** any specific path for server socket io */
  path?: string
  /** Default: "LISTEN_EVENT_YJS" */
  listeningEvent?: string
  /** Default: "BROADCAST_EVENT_YJS" */
  broadcastingEvent?: string
  /** Default: "JOIN_ROOM_YJS" */
  joinRoomEvent?: string
  /** initial awareness to be used */
  awarenessName?: {
    field: string
    value: {
      color: string
      name: string
    }
  }
}

SERVER SIDE

import { Namespace as SocketIONamespace, Server } from 'socket.io'
import { ISocket } from '@interfaces/sockets'
import Cookie from 'cookie'
import cookieParser from 'cookie-parser'
import OurJWT from '../../Organs/jwt'
import { AWJWT } from '@interfaces/utilities'
import { decoding, encoding } from 'lib0'
import { readSyncMessage } from 'y-protocols/sync'

export default class ProsemirrorNamespaceClass {
  public SocketIO: Server
  public Namespace: SocketIONamespace

  constructor(SocketIO: Server) {
    this.SocketIO = SocketIO
    this.Namespace = SocketIO.of('/prose')

    this.initializeConnection()
    // this.initializeAuthentication()
  }

  initializeConnection = (): void => {
    this.Namespace.on('connection', (socket: ISocket) => {
      console.log('connection:', socket.id)
      socket.on('JOIN_TIMELINE_POST', (roomName) => {
        console.log('RoomName:', roomName)
        socket.join(roomName)
      })
      socket.on('PROSEMIRROR_REALTIME_UPDATE', (payload) => {
        console.log('PROSEMIRROR_REALTIME_UPDATE:', payload.data, payload.roomName)
        // const encoder = encoding.createEncoder()
        // const decoder = decoding.createDecoder(payload.data)
        // const messageType = decoding.readVarUint(decoder)
        // console.log('messageType:', messageType)
        // switch (messageType) {
        //   case 0:
        //     encoding.writeVarUint(encoder, 0)
        //     readSyncMessage(decoder, encoder, doc, null)
        //     this.Namespace.to(payload.roomName).emit('PROSEMIRROR_LISTEN_SERVER_UPDATE', realPayload)
        //     break
        // }
      })
      socket.on('disconnect', () => {})
    })
  }

  initializeAuthentication = (): void => {
    // OMITTED FOR SAFETY
  }
}

It is fairly simple to implement your own connector. I think https://github.com/yjs/yjs#Document-Updates nicely explains how syncing in Yjs works.
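
As a rough illustration of what that section describes, here is a minimal sketch of the exchange a connector performs when a peer joins: each side sends its state vector, and the other side answers with only the missing changes. The two in-process docs and the field name 'template' are assumptions for the example; in a real connector these byte arrays would travel over Socket.IO.

```typescript
import * as Y from 'yjs'

// Two docs standing in for two peers (for example a browser and a server).
const docA = new Y.Doc()
const docB = new Y.Doc()

// Each peer makes a change the other one has not seen yet.
docA.getText('template').insert(0, 'Hello ')
docB.getText('template').insert(0, 'world')

// Step 1: each peer describes what it already has.
const stateVectorA = Y.encodeStateVector(docA)
const stateVectorB = Y.encodeStateVector(docB)

// Step 2: each peer computes only the changes the other side is missing.
const diffForB = Y.encodeStateAsUpdate(docA, stateVectorB)
const diffForA = Y.encodeStateAsUpdate(docB, stateVectorA)

Y.applyUpdate(docA, diffForA)
Y.applyUpdate(docB, diffForB)

// Both docs now hold the same text. Yjs decides how the two concurrent
// inserts are ordered, so only equality is checked here.
const textA = docA.getText('template').toString()
const textB = docB.getText('template').toString()
console.log(textA === textB)
```

After this initial exchange, a connector only has to forward the incremental updates emitted by the doc's 'update' event.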

Simply store the document updates in a database. This is also fairly simple. On every change, push the update, or the complete document, to your database. Syncing the servers over PubSub is a bit more complicated. y-redis - a project I’m currently working on - enables you to store document updates efficiently as incremental updates and sync all servers using Redis PubSub.
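
To make the storage part concrete, here is a minimal sketch of persisting incremental updates. The `InMemoryUpdateStore` class, the room name, and the field name are all hypothetical; the Map only stands in for a Redis list or a MongoDB collection, and this is not how y-redis is implemented.

```typescript
import * as Y from 'yjs'

// Hypothetical store: a Map stands in for Redis or MongoDB.
class InMemoryUpdateStore {
  private updates = new Map<string, Uint8Array[]>()

  // Append one incremental update for a room.
  append(room: string, update: Uint8Array): void {
    const list = this.updates.get(room) ?? []
    list.push(update)
    this.updates.set(room, list)
  }

  // Squash all stored updates into a single one to keep the list short.
  compact(room: string): void {
    const list = this.updates.get(room)
    if (list && list.length > 1) {
      this.updates.set(room, [Y.mergeUpdates(list)])
    }
  }

  // Rebuild a document from whatever is stored for the room.
  load(room: string): Y.Doc {
    const doc = new Y.Doc()
    for (const update of this.updates.get(room) ?? []) {
      Y.applyUpdate(doc, update)
    }
    return doc
  }

  // Number of stored updates for a room.
  count(room: string): number {
    return (this.updates.get(room) ?? []).length
  }
}

const store = new InMemoryUpdateStore()
const room = 'template-42'

// On every change, push the update to the store.
const serverDoc = new Y.Doc()
serverDoc.on('update', (update: Uint8Array) => store.append(room, update))

const text = serverDoc.getText('template')
text.insert(0, 'Dear customer,')
text.insert(text.length, ' thank you for your message.')

const countBeforeCompact = store.count(room)
store.compact(room)

// Any other server instance can rebuild the document from the store.
const restored = store.load(room)
const restoredText = restored.getText('template').toString()
console.log(restoredText)
```

Storing updates append-only keeps each write small; the occasional compaction is what stops the list from growing without bound.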

As for getting the document in ProseMirror format, the simplest approach is to let the client do the conversion. Otherwise you need to load ProseMirror on the server side.

Thank you so much for the quick response. I am looking into the details. What I assume is that:

  1. Create a new Y.Doc instance on the client side
  2. Listen to changes in the Y.Doc
  3. Emit the changes to the server (using socket-io)
  4. The server applies the changes to its own document (loaded on the server from Redis) and broadcasts them to everyone in the room
  5. Each client gets the update and applies those changes
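
The five steps above can be sketched without any network, which makes the flow easier to follow. In this sketch the `Room` class, the `connect` helper, and the 'remote' origin marker are hypothetical names; plain function calls stand in for the Socket.IO emit and broadcast, and loading from Redis is left out.

```typescript
import * as Y from 'yjs'

type Send = (update: Uint8Array) => void

// Hypothetical server-side room: one Y.Doc plus the connected clients.
class Room {
  readonly doc = new Y.Doc()
  private clients = new Map<string, Send>()

  // A joining client first receives the full current state.
  join(clientId: string, send: Send): void {
    this.clients.set(clientId, send)
    send(Y.encodeStateAsUpdate(this.doc))
  }

  // Step 4: apply the change to the server doc, then relay it to the others.
  receive(fromClientId: string, update: Uint8Array): void {
    Y.applyUpdate(this.doc, update, fromClientId)
    this.clients.forEach((send, clientId) => {
      if (clientId !== fromClientId) send(update)
    })
  }
}

// Marker used as transaction origin for changes that came from the server.
const REMOTE = 'remote'

function connect(room: Room, clientId: string): Y.Doc {
  // Step 1: create the client doc.
  const doc = new Y.Doc()
  // Steps 2 and 3: listen for local changes and send them to the server.
  doc.on('update', (update: Uint8Array, origin: unknown) => {
    if (origin !== REMOTE) room.receive(clientId, update)
  })
  // Step 5: apply whatever the server sends, tagged so it is not echoed back.
  room.join(clientId, (update) => Y.applyUpdate(doc, update, REMOTE))
  return doc
}

const room = new Room()
const alice = connect(room, 'alice')
alice.getText('template').insert(0, 'Hi')

// Bob joins late and still receives what Alice already typed.
const bob = connect(room, 'bob')
const bobText = bob.getText('template')
bobText.insert(bobText.length, ' there')

console.log(alice.getText('template').toString()) // prints "Hi there"
```

Tagging remote changes with an origin is the important design choice here: without it, every update received from the server would be sent straight back and loop forever.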

If we think about awareness, the same will happen for it, as Y.Doc contains awareness.
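
One nuance worth checking on the awareness point: in y-protocols, `Awareness` is a separate object created from a doc, and it has its own update encoding, so it does not travel inside document updates. The relay pattern is the same though. A minimal sketch, assuming two in-process peers and a hypothetical 'user' field, with a direct function call in place of Socket.IO:

```typescript
import * as Y from 'yjs'
import {
  Awareness,
  encodeAwarenessUpdate,
  applyAwarenessUpdate,
} from 'y-protocols/awareness'

type AwarenessChange = { added: number[]; updated: number[]; removed: number[] }

const docA = new Y.Doc()
const docB = new Y.Doc()
const awarenessA = new Awareness(docA)
const awarenessB = new Awareness(docB)

// Relay awareness changes from A to B. A real connector would emit the
// encoded bytes over Socket.IO instead of applying them directly.
awarenessA.on('update', ({ added, updated, removed }: AwarenessChange) => {
  const changedClients = added.concat(updated, removed)
  const update = encodeAwarenessUpdate(awarenessA, changedClients)
  applyAwarenessUpdate(awarenessB, update, 'remote')
})

// Alice announces who she is (name and color are example values).
awarenessA.setLocalStateField('user', { name: 'Alice', color: '#ffb61e' })

// B now knows A's state, keyed by A's client id.
const seenByB = awarenessB.getStates().get(docA.clientID)
console.log(seenByB)

// Awareness keeps an internal timer, so destroy it when done.
awarenessA.destroy()
awarenessB.destroy()
```

Because awareness state is ephemeral (cursors, names, colors), it normally does not need to be stored in Redis or MongoDB alongside the document.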

That covers the case where I am working on template creation.

Now suppose someone gets the list of templates and clicks one. I can't pass the Y.Doc to the user (well, maybe I can), but the problem is that in this specific ProseMirror instance I am not using any Y plugins. I need normal ProseMirror content.

Looking at the Y.Doc API, I can't find a way to get the ProseMirror document out of it.

As said. Yjs only models the ProseMirror content. You need to create a ProseMirror instance and bind it to a Yjs document in order to get / set the content.

You can retrieve the Yjs document in an XML format by doing yXmlType.toString(). But that’s it.

You can simply encode the Yjs document, send it to the user, and reconstruct it there.

If you just want to apply some initial content as a template, you could also store the template as a Yjs document update, and then apply the document update as the initial content on the server. Then you don’t need the ProseMirror instance.
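
A minimal sketch of that last idea, assuming the template lives in an XML fragment named 'prosemirror' and that the node name 'paragraph' matches your schema (both are assumptions for the example). The template is built by hand here only to keep the sketch self-contained; normally the bound ProseMirror editor would fill the fragment.

```typescript
import * as Y from 'yjs'

// Build a small template document by hand.
const templateDoc = new Y.Doc()
const templateFragment = templateDoc.getXmlFragment('prosemirror')
const paragraph = new Y.XmlElement('paragraph')
paragraph.insert(0, [new Y.XmlText('Thanks for reaching out!')])
templateFragment.insert(0, [paragraph])

// This byte array is what you would keep in Redis or MongoDB.
const storedTemplate: Uint8Array = Y.encodeStateAsUpdate(templateDoc)

// Later, on any server or client: reconstruct the content in a fresh doc.
const freshDoc = new Y.Doc()
Y.applyUpdate(freshDoc, storedTemplate)

// The XML string is the only representation Yjs gives you on its own.
const xml = freshDoc.getXmlFragment('prosemirror').toString()
console.log(xml)
```

To turn the result into an actual ProseMirror document, the fresh doc still has to be bound to a ProseMirror instance, as described above.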