The Database Storage (DBStorage) service is responsible for persisting trading data and managing user records in PostgreSQL. It consumes messages from Redis Streams, stores closed orders in the database, handles user creation/updates, and serves queries for historical order data.Location:apps/DBstorage/src/index.ts:1Database: PostgreSQL with Prisma ORMMessage Source: Redis Streams (dbStorageStream)
The DBStorage service uses a single consumer that processes messages sequentially to maintain data consistency and avoid race conditions in the database.
Handles user creation and updates with conflict resolution:
if (result.function === "createUser") { const { userId, userEmail } = result.message || {}; if (!userId || !userEmail) { console.log("createUser missing fields", result.message); return; } try { // Check if user exists by email const existingUserByEmail = await prisma.user.findUnique({ where: { email: userEmail } }); if (existingUserByEmail) { // User exists with this email, update userID if different if (existingUserByEmail.userID !== userId) { await prisma.user.update({ where: { email: userEmail }, data: { userID: userId } }); console.log("Updated existing user's userID", { userId, userEmail }); } else { console.log("User already exists with same userID", { userId, userEmail }); } } else { // Check if userID already exists const existingUserByID = await prisma.user.findUnique({ where: { userID: userId } }); if (existingUserByID) { // Update email for existing userID await prisma.user.update({ where: { userID: userId }, data: { email: userEmail } }); console.log("Updated existing user's email", { userId, userEmail }); } else { // Create new user await prisma.user.create({ data: { userID: userId, email: userEmail } }); console.log("Created new user", { userId, userEmail }); } } } catch (error) { console.error("Error in createUser:", error); // Conflict resolution: find and update const existingUser = await prisma.user.findFirst({ where: { OR: [ { email: userEmail }, { userID: userId } ] } }); if (existingUser) { await prisma.user.update({ where: { id: existingUser.id }, data: { userID: userId, email: userEmail } }); console.log("Updated existing user to resolve conflict", { userId, userEmail }); } }}
Unique Constraint Handling: The user creation logic handles race conditions where the same user might be created by both Engine and DBStorage simultaneously. It uses conflict resolution to ensure data consistency.
model User { id Int @id @default(autoincrement()) userID String @unique email String @unique balance Float @default(10000.0) orders Orders[] @@index([userID]) @@index([email])}
-- Index on userId for fast user-specific queriesCREATE INDEX idx_orders_userId ON Orders(userId);-- Index on closeTime for chronological sortingCREATE INDEX idx_orders_closeTime ON Orders(closeTime DESC);-- Composite index for filtered queriesCREATE INDEX idx_orders_user_time ON Orders(userId, closeTime DESC);