diff --git a/.github/workflows/cron-webhooks-triggers.yml b/.github/workflows/cron-webhooks-triggers.yml new file mode 100644 index 0000000000..20f54dbf0c --- /dev/null +++ b/.github/workflows/cron-webhooks-triggers.yml @@ -0,0 +1,23 @@ +name: Cron - webhookTriggers + +on: + # "Scheduled workflows run on the latest commit on the default or base branch." + # — https://docs.github.com/en/actions/learn-github-actions/events-that-trigger-workflows#schedule + schedule: + # Runs “every 5 minutes” (see https://crontab.guru) + - cron: "*/5 * * * *" +jobs: + cron-webhookTriggers: + env: + APP_URL: ${{ secrets.APP_URL }} + CRON_API_KEY: ${{ secrets.CRON_API_KEY }} + runs-on: ubuntu-latest + steps: + - name: cURL request + if: ${{ env.APP_URL && env.CRON_API_KEY }} + run: | + curl ${{ secrets.APP_URL }}/api/cron/webhookTriggers \ + -X POST \ + -H 'content-type: application/json' \ + -H 'authorization: ${{ secrets.CRON_API_KEY }}' \ + --fail diff --git a/apps/web/pages/api/cron/webhookTriggers.ts b/apps/web/pages/api/cron/webhookTriggers.ts new file mode 100644 index 0000000000..f262146429 --- /dev/null +++ b/apps/web/pages/api/cron/webhookTriggers.ts @@ -0,0 +1 @@ +export { default } from "@calcom/features/webhooks/lib/cron"; diff --git a/packages/app-store/zapier/api/subscriptions/addSubscription.ts b/packages/app-store/zapier/api/subscriptions/addSubscription.ts index 56bbe16a4d..3bc3ecc2f4 100644 --- a/packages/app-store/zapier/api/subscriptions/addSubscription.ts +++ b/packages/app-store/zapier/api/subscriptions/addSubscription.ts @@ -2,8 +2,8 @@ import type { Prisma } from "@prisma/client"; import type { NextApiRequest, NextApiResponse } from "next"; import { v4 } from "uuid"; -import { scheduleTrigger } from "@calcom/app-store/zapier/lib/nodeScheduler"; import findValidApiKey from "@calcom/features/ee/api-keys/lib/findValidApiKey"; +import { scheduleTrigger } from "@calcom/features/webhooks/lib/scheduleTrigger"; import { defaultHandler, defaultResponder } from "@calcom/lib/server"; import prisma from "@calcom/prisma"; import { BookingStatus, WebhookTriggerEvents } from "@calcom/prisma/enums"; diff --git a/packages/features/bookings/lib/handleCancelBooking.ts b/packages/features/bookings/lib/handleCancelBooking.ts index 277aaae159..a007e7f74b 100644 --- a/packages/features/bookings/lib/handleCancelBooking.ts +++ b/packages/features/bookings/lib/handleCancelBooking.ts @@ -5,7 +5,6 @@ import appStore from "@calcom/app-store"; import { getCalendar } from "@calcom/app-store/_utils/getCalendar"; import { FAKE_DAILY_CREDENTIAL } from "@calcom/app-store/dailyvideo/lib/VideoApiAdapter"; import { DailyLocationType } from "@calcom/app-store/locations"; -import { cancelScheduledJobs } from "@calcom/app-store/zapier/lib/nodeScheduler"; import { deleteMeeting, updateMeeting } from "@calcom/core/videoClient"; import dayjs from "@calcom/dayjs"; import { sendCancelledEmails, sendCancelledSeatEmails } from "@calcom/emails"; @@ -16,6 +15,7 @@ import { sendCancelledReminders } from "@calcom/features/ee/workflows/lib/remind import { deleteScheduledSMSReminder } from "@calcom/features/ee/workflows/lib/reminders/smsReminderManager"; import { deleteScheduledWhatsappReminder } from "@calcom/features/ee/workflows/lib/reminders/whatsappReminderManager"; import getWebhooks from "@calcom/features/webhooks/lib/getWebhooks"; +import { cancelScheduledJobs } from "@calcom/features/webhooks/lib/scheduleTrigger"; import type { EventTypeInfo } from "@calcom/features/webhooks/lib/sendPayload"; import sendPayload from "@calcom/features/webhooks/lib/sendPayload"; import { isPrismaObjOrUndefined, parseRecurringEvent } from "@calcom/lib"; diff --git a/packages/features/bookings/lib/handleConfirmation.ts b/packages/features/bookings/lib/handleConfirmation.ts index 22dd544914..6af8f7b5f5 100644 --- a/packages/features/bookings/lib/handleConfirmation.ts +++ b/packages/features/bookings/lib/handleConfirmation.ts @@ -1,12 +1,12 @@ import type { Prisma, Workflow, WorkflowsOnEventTypes, WorkflowStep } from "@prisma/client"; -import { scheduleTrigger } from "@calcom/app-store/zapier/lib/nodeScheduler"; import type { EventManagerUser } from "@calcom/core/EventManager"; import EventManager from "@calcom/core/EventManager"; import { sendScheduledEmails } from "@calcom/emails"; import { isEventTypeOwnerKYCVerified } from "@calcom/features/ee/workflows/lib/isEventTypeOwnerKYCVerified"; import { scheduleWorkflowReminders } from "@calcom/features/ee/workflows/lib/reminders/reminderScheduler"; import getWebhooks from "@calcom/features/webhooks/lib/getWebhooks"; +import { scheduleTrigger } from "@calcom/features/webhooks/lib/scheduleTrigger"; import type { EventTypeInfo } from "@calcom/features/webhooks/lib/sendPayload"; import sendPayload from "@calcom/features/webhooks/lib/sendPayload"; import { getTeamIdFromEventType } from "@calcom/lib/getTeamIdFromEventType"; diff --git a/packages/features/bookings/lib/handleNewBooking.ts b/packages/features/bookings/lib/handleNewBooking.ts index 03702f100d..af39a15eac 100644 --- a/packages/features/bookings/lib/handleNewBooking.ts +++ b/packages/features/bookings/lib/handleNewBooking.ts @@ -19,7 +19,6 @@ import { } from "@calcom/app-store/locations"; import type { EventTypeAppsList } from "@calcom/app-store/utils"; import { getAppFromSlug } from "@calcom/app-store/utils"; -import { cancelScheduledJobs, scheduleTrigger } from "@calcom/app-store/zapier/lib/nodeScheduler"; import EventManager from "@calcom/core/EventManager"; import { getEventName } from "@calcom/core/event"; import { getUserAvailability } from "@calcom/core/getUserAvailability"; @@ -48,6 +47,7 @@ import { import { getFullName } from "@calcom/features/form-builder/utils"; import type { GetSubscriberOptions } from "@calcom/features/webhooks/lib/getWebhooks"; import getWebhooks from "@calcom/features/webhooks/lib/getWebhooks"; +import { cancelScheduledJobs, scheduleTrigger } from "@calcom/features/webhooks/lib/scheduleTrigger"; import { isPrismaObjOrUndefined, parseRecurringEvent } from "@calcom/lib"; import { getVideoCallUrlFromCalEvent } from "@calcom/lib/CalEventParser"; import { checkRateLimitAndThrowError } from "@calcom/lib/checkRateLimitAndThrowError"; diff --git a/packages/features/webhooks/lib/cron.ts b/packages/features/webhooks/lib/cron.ts new file mode 100644 index 0000000000..089ddbbc5a --- /dev/null +++ b/packages/features/webhooks/lib/cron.ts @@ -0,0 +1,96 @@ +/* Cron job for scheduled webhook events triggers */ +import type { NextApiRequest, NextApiResponse } from "next"; + +import dayjs from "@calcom/dayjs"; +import { defaultHandler } from "@calcom/lib/server"; +import prisma from "@calcom/prisma"; + +async function handler(req: NextApiRequest, res: NextApiResponse) { + const apiKey = req.headers.authorization || req.query.apiKey; + if (process.env.CRON_API_KEY !== apiKey) { + res.status(401).json({ message: "Not authenticated" }); + return; + } + + // get jobs that should be run + const jobsToRun = await prisma.webhookScheduledTriggers.findMany({ + where: { + startAfter: { + lte: dayjs().toISOString(), + }, + }, + }); + + // run jobs + for (const job of jobsToRun) { + try { + await fetch(job.subscriberUrl, { + method: "POST", + body: job.payload, + }); + } catch (error) { + console.log(`Error running webhook trigger (retry count: ${job.retryCount}): ${error}`); + + // if job fails, retry again for 5 times. + if (job.retryCount < 5) { + await prisma.webhookScheduledTriggers.update({ + where: { + id: job.id, + }, + data: { + retryCount: { + increment: 1, + }, + startAfter: dayjs() + .add(5 * (job.retryCount + 1), "minutes") + .toISOString(), + }, + }); + return res.json({ ok: false }); + } + } + + const parsedJobPayload = JSON.parse(job.payload) as { + id: number; // booking id + endTime: string; + scheduledJobs: string[]; + triggerEvent: string; + }; + + // clean finished job + await prisma.webhookScheduledTriggers.delete({ + where: { + id: job.id, + }, + }); + + const booking = await prisma.booking.findUnique({ + where: { id: parsedJobPayload.id }, + select: { id: true, scheduledJobs: true }, + }); + if (!booking) { + console.log("Error finding booking in webhook trigger:", parsedJobPayload); + return res.json({ ok: false }); + } + + //remove scheduled job from bookings once triggered + const updatedScheduledJobs = booking.scheduledJobs.filter((scheduledJob) => { + return scheduledJob !== job.jobName; + }); + + await prisma.booking.update({ + where: { + id: booking.id, + }, + data: { + scheduledJobs: updatedScheduledJobs, + }, + }); + } + + res.json({ ok: true }); +} + +export default defaultHandler({ + POST: Promise.resolve({ default: handler }), +}); diff --git a/packages/app-store/zapier/lib/nodeScheduler.ts b/packages/features/webhooks/lib/scheduleTrigger.ts similarity index 55% rename from packages/app-store/zapier/lib/nodeScheduler.ts rename to packages/features/webhooks/lib/scheduleTrigger.ts index e6f7b16436..8b610d55a7 100644 --- a/packages/app-store/zapier/lib/nodeScheduler.ts +++ b/packages/features/webhooks/lib/scheduleTrigger.ts @@ -1,5 +1,3 @@ -import schedule from "node-schedule"; - import prisma from "@calcom/prisma"; import { WebhookTriggerEvents } from "@calcom/prisma/enums"; @@ -9,45 +7,32 @@ export async function scheduleTrigger( subscriber: { id: string; appId: string | null } ) { try { - //schedule job to call subscriber url at the end of meeting - // FIXME: in-process scheduling - job will vanish on server crash / restart - const job = schedule.scheduleJob( - `${subscriber.appId}_${subscriber.id}`, - booking.endTime, - async function () { - const body = JSON.stringify({ triggerEvent: WebhookTriggerEvents.MEETING_ENDED, ...booking }); - await fetch(subscriberUrl, { - method: "POST", - body, - }); + const payload = JSON.stringify({ triggerEvent: WebhookTriggerEvents.MEETING_ENDED, ...booking }); + const jobName = `${subscriber.appId}_${subscriber.id}`; - //remove scheduled job from bookings once triggered - const updatedScheduledJobs = booking.scheduledJobs.filter((scheduledJob) => { - return scheduledJob !== `${subscriber.appId}_${subscriber.id}`; - }); - - await prisma.booking.update({ - where: { - id: booking.id, - }, - data: { - scheduledJobs: updatedScheduledJobs, - }, - }); - } - ); + // add scheduled job to database + const createTrigger = prisma.webhookScheduledTriggers.create({ + data: { + jobName, + payload, + startAfter: booking.endTime, + subscriberUrl, + }, + }); //add scheduled job name to booking - await prisma.booking.update({ + const updateBooking = prisma.booking.update({ where: { id: booking.id, }, data: { scheduledJobs: { - push: job.name, + push: jobName, }, }, }); + + await prisma.$transaction([createTrigger, updateBooking]); } catch (error) { console.error("Error cancelling scheduled jobs", error); } @@ -64,16 +49,20 @@ export async function cancelScheduledJobs( const promises = booking.scheduledJobs.map(async (scheduledJob) => { if (appId) { if (scheduledJob.startsWith(appId)) { - if (schedule.scheduledJobs[scheduledJob]) { - schedule.scheduledJobs[scheduledJob].cancel(); - } + await prisma.webhookScheduledTriggers.deleteMany({ + where: { + jobName: scheduledJob, + }, + }); scheduledJobs = scheduledJobs?.filter((job) => scheduledJob !== job) || []; } } else { //if no specific appId given, delete all scheduled jobs of booking - if (schedule.scheduledJobs[scheduledJob]) { - schedule.scheduledJobs[scheduledJob].cancel(); - } + await prisma.webhookScheduledTriggers.deleteMany({ + where: { + jobName: scheduledJob, + }, + }); scheduledJobs = []; } diff --git a/packages/prisma/migrations/20230815080823_zapier_scheduled_triggers/migration.sql b/packages/prisma/migrations/20230815080823_zapier_scheduled_triggers/migration.sql new file mode 100644 index 0000000000..358ebaf242 --- /dev/null +++ b/packages/prisma/migrations/20230815080823_zapier_scheduled_triggers/migration.sql @@ -0,0 +1,12 @@ +-- CreateTable +CREATE TABLE "WebhookScheduledTriggers" ( + "id" SERIAL NOT NULL, + "jobName" TEXT NOT NULL, + "subscriberUrl" TEXT NOT NULL, + "payload" TEXT NOT NULL, + "startAfter" TIMESTAMP(3) NOT NULL, + "retryCount" INTEGER NOT NULL DEFAULT 0, + "createdAt" TIMESTAMP(3) DEFAULT CURRENT_TIMESTAMP, + + CONSTRAINT "WebhookScheduledTriggers_pkey" PRIMARY KEY ("id") +); diff --git a/packages/prisma/schema.prisma b/packages/prisma/schema.prisma index ec45c33d0c..5e066435d7 100644 --- a/packages/prisma/schema.prisma +++ b/packages/prisma/schema.prisma @@ -807,6 +807,16 @@ model WorkflowReminder { @@index([seatReferenceId]) } +model WebhookScheduledTriggers { + id Int @id @default(autoincrement()) + jobName String + subscriberUrl String + payload String + startAfter DateTime + retryCount Int @default(0) + createdAt DateTime? @default(now()) +} + enum WorkflowTemplates { REMINDER CUSTOM diff --git a/packages/trpc/server/routers/loggedInViewer/deleteCredential.handler.ts b/packages/trpc/server/routers/loggedInViewer/deleteCredential.handler.ts index 4d676f9bfb..716f3cf843 100644 --- a/packages/trpc/server/routers/loggedInViewer/deleteCredential.handler.ts +++ b/packages/trpc/server/routers/loggedInViewer/deleteCredential.handler.ts @@ -1,10 +1,10 @@ import z from "zod"; import { getCalendar } from "@calcom/app-store/_utils/getCalendar"; -import { cancelScheduledJobs } from "@calcom/app-store/zapier/lib/nodeScheduler"; import { DailyLocationType } from "@calcom/core/location"; import { sendCancelledEmails } from "@calcom/emails"; import { getCalEventResponses } from "@calcom/features/bookings/lib/getCalEventResponses"; +import { cancelScheduledJobs } from "@calcom/features/webhooks/lib/scheduleTrigger"; import { isPrismaObjOrUndefined, parseRecurringEvent } from "@calcom/lib"; import getPaymentAppData from "@calcom/lib/getPaymentAppData"; import { deletePayment } from "@calcom/lib/payment/deletePayment"; diff --git a/packages/trpc/server/routers/viewer/bookings/requestReschedule.handler.ts b/packages/trpc/server/routers/viewer/bookings/requestReschedule.handler.ts index 324ed23057..096f229200 100644 --- a/packages/trpc/server/routers/viewer/bookings/requestReschedule.handler.ts +++ b/packages/trpc/server/routers/viewer/bookings/requestReschedule.handler.ts @@ -2,7 +2,6 @@ import type { BookingReference, EventType } from "@prisma/client"; import type { TFunction } from "next-i18next"; import { getCalendar } from "@calcom/app-store/_utils/getCalendar"; -import { cancelScheduledJobs } from "@calcom/app-store/zapier/lib/nodeScheduler"; import { CalendarEventBuilder } from "@calcom/core/builders/CalendarEvent/builder"; import { CalendarEventDirector } from "@calcom/core/builders/CalendarEvent/director"; import { deleteMeeting } from "@calcom/core/videoClient"; @@ -13,6 +12,7 @@ import { deleteScheduledWhatsappReminder } from "@calcom/ee/workflows/lib/remind import { sendRequestRescheduleEmail } from "@calcom/emails"; import { getCalEventResponses } from "@calcom/features/bookings/lib/getCalEventResponses"; import getWebhooks from "@calcom/features/webhooks/lib/getWebhooks"; +import { cancelScheduledJobs } from "@calcom/features/webhooks/lib/scheduleTrigger"; import sendPayload from "@calcom/features/webhooks/lib/sendPayload"; import { isPrismaObjOrUndefined } from "@calcom/lib"; import { getTeamIdFromEventType } from "@calcom/lib/getTeamIdFromEventType";