I wanted to share some updates I made to ensure compatibility with Redis Cluster. Because a single command or Lua script that touches keys on different shards fails in a cluster environment, I made three key adjustments:
- Script Separation: `redisWorkerStreamName` and `redisWorkerGroupName` are global variables, and the original combined script referenced the worker stream directly inside the script body without declaring it as a key. In Redis Cluster, every key a script touches must hash to the same slot, so the combined stream-existence check and message addition could fail whenever the worker stream and the room stream live on different shards. I've therefore split the related operations into separate Lua scripts (a sketch of how they can be registered on a cluster client follows the refactored code).
The original script:
addMessage: redis.defineScript({
  NUMBER_OF_KEYS: 1,
  SCRIPT: `
    if redis.call("EXISTS", KEYS[1]) == 0 then
      redis.call("XADD", "${this.redisWorkerStreamName}", "*", "compact", KEYS[1])
    end
    redis.call("XADD", KEYS[1], "*", "m", ARGV[1])
  `,
  transformArguments(key, message) {
    return [key, message];
  },
  transformReply(x) {
    return x;
  },
}),
And it has been refactored into these more focused scripts:
// Checks whether the room stream already exists; routed by the stream key.
checkStreamExists: redis.defineScript({
  NUMBER_OF_KEYS: 1,
  SCRIPT: `
    return redis.call("EXISTS", KEYS[1])
  `,
  transformArguments(streamKey) {
    return [streamKey];
  },
  transformReply(reply) {
    return reply === 1;
  },
}),
// Registers a room stream on the worker stream. The room stream key is passed
// as ARGV[1] rather than as a key, so the call is routed solely by the worker
// stream key and never spans two slots.
addCompactToWorkerStream: redis.defineScript({
  NUMBER_OF_KEYS: 1,
  SCRIPT: `
    redis.call("XADD", KEYS[1], "*", "compact", ARGV[1])
  `,
  transformArguments(workerStreamKey, streamKey) {
    return [workerStreamKey, streamKey];
  },
  transformReply(x) {
    return x;
  },
}),
// Appends an encoded message to the room stream; routed by the stream key.
addMessageToStream: redis.defineScript({
  NUMBER_OF_KEYS: 1,
  SCRIPT: `
    redis.call("XADD", KEYS[1], "*", "m", ARGV[1])
  `,
  transformArguments(key, message) {
    return [key, message];
  },
  transformReply(x) {
    return x;
  },
}),
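For context, here is a minimal sketch of how scripts like these can be registered on a node-redis cluster client. The root node URL and the surrounding setup are assumptions for illustration, not the project's actual configuration:

```js
import { createCluster, defineScript } from 'redis';

// Sketch only: registering one of the scripts on a cluster client.
// The URL is a placeholder; the other two scripts are registered the same way.
const redis = createCluster({
  rootNodes: [{ url: 'redis://127.0.0.1:6379' }],
  scripts: {
    checkStreamExists: defineScript({
      NUMBER_OF_KEYS: 1,
      SCRIPT: 'return redis.call("EXISTS", KEYS[1])',
      transformArguments(streamKey) {
        return [streamKey];
      },
      transformReply(reply) {
        return reply === 1;
      },
    }),
    // addCompactToWorkerStream and addMessageToStream follow the same pattern.
  },
});

await redis.connect();
// Each script becomes a method on the client and is routed by its first key,
// e.g. await redis.checkStreamExists(streamKey);
```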
- Individual Script Functions: Each step of the process is now handled by its own dedicated Lua script, so every call touches keys in a single hash slot and can be routed to the right node by the cluster client. Here's the corresponding adjustment in the application logic (an equivalent call sequence against the cluster client is sketched after the updated function):
Original function:
async addMessage(path, room, docid, m) {
  if (m[0] === protocol.messageSync && m[1] === protocol.messageSyncStep2) {
    if (m.byteLength < 4) {
      return promise.resolve();
    }
    m[1] = protocol.messageSyncUpdate;
  }
  return this.redis.addMessage(
    computeRedisRoomStreamName(path, room, docid, this.prefix),
    m
  );
}
Updated function:
async addMessage(path, room, docid, m) {
if (m[0] === protocol.messageSync && m[1] === protocol.messageSyncStep2) {
if (m.byteLength < 4) {
return promise.resolve();
}
m[1] = protocol.messageSyncUpdate;
}
const streamKey = computeRedisRoomStreamName(
path,
room,
docid,
this.prefix
);
const exists = await this.redis.checkStreamExists(streamKey);
if (!exists) {
await this.redis.addCompactToWorkerStream(
this.redisWorkerStreamName,
streamKey
);
}
return this.redis.addMessageToStream(streamKey, m);
}
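To make the routing explicit, here is a rough sketch of the same sequence written directly against a cluster client set up as in the snippet above; the worker stream name, prefix, room, document id, and message bytes are all placeholders:

```js
// Placeholder values, for illustration only.
const workerStreamKey = 'y:worker';
const streamKey = computeRedisRoomStreamName('/docs', 'my-room', 'index', 'y');
const message = Buffer.from([0]); // stands in for an encoded sync message

// Each call touches keys in a single hash slot, so the cluster client can
// route every command to the right node on its own.
const exists = await redis.checkStreamExists(streamKey); // routed by streamKey
if (!exists) {
  // routed by workerStreamKey; streamKey travels as ARGV[1], not as a key
  await redis.addCompactToWorkerStream(workerStreamKey, streamKey);
}
await redis.addMessageToStream(streamKey, message); // routed by streamKey
```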
- Stream Key Management: To ensure that related keys for a document map to the same hash slot, I changed how stream keys are computed so that the room and document identifiers form a Redis Cluster hash tag (a short illustration follows the updated computation):
Original key computation:
export const computeRedisRoomStreamName = (room, docid, prefix) =>
  `${prefix}:room:${encodeURIComponent(room)}:${encodeURIComponent(docid)}`;
Updated key computation:
export const computeRedisRoomStreamName = (path, room, docid, prefix) => {
  return `${prefix}:{${encodeURIComponent(room)}:${encodeURIComponent(docid)}}:stream`;
};
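The curly braces are a Redis Cluster hash tag: when a key contains `{...}`, only the substring inside the braces is hashed to pick the slot. A quick illustration with made-up values:

```js
// Made-up path, room, and document id, for illustration only.
computeRedisRoomStreamName('/docs', 'my-room', 'index', 'y');
// -> 'y:{my-room:index}:stream'

// Any other key that embeds the same {room:docid} tag, for example a
// hypothetical 'y:{my-room:index}:awareness', is guaranteed to hash to the
// same slot, so multi-key operations on a document's keys stay cluster-safe.
```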
These changes were essential to ensure that the application runs smoothly across multiple nodes in a Redis Cluster, avoiding the cross-slot command executions that were causing failures. I hope these insights are helpful for your future projects.