Integration framework

This guide will help you get familiar with our integration framework, whether you want to understand how it works or you are going to implement a new integration into it.

Terminology

  • Integration: a crowd.dev-supported integration that a user can connect and configure; our integration framework then processes it, which should result in activities and members being created.
  • Integration Service: an implementation of the IntegrationServiceBase abstract class that ties into our integration framework.
  • Integration Stream: when a check or onboarding is triggered, our integration service starts processing an integration by preparing streams that can be processed one by one. Examples of a single stream:
    • A Slack channel ID that we can use to call the Slack API to get all members and messages from a channel.
    • A Discord thread ID that we can use to call the Discord API to get all messages in a single thread.
    • A Dev.to article ID that we need to check for comments through the Dev.to API.
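As a purely illustrative sketch, a pending stream can be thought of as an identifier plus the metadata needed to fetch its data later. The PendingStream interface and field names below are assumptions for illustration, not the framework's actual types:

```typescript
// Hypothetical stream descriptors (illustrative shape, not the real types).
interface PendingStream {
  value: string // stream identifier, e.g. an entity or endpoint name
  metadata?: Record<string, unknown>
}

// A Slack channel we still have to fetch members and messages for:
const slackChannelStream: PendingStream = {
  value: 'channel',
  metadata: { channelId: 'C0123456789' },
}

// A Dev.to article whose comments we still have to check:
const devtoArticleStream: PendingStream = {
  value: 'article-comments',
  metadata: { articleId: 42 },
}
```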

Important files and processes within the integration framework

The main part of the framework is IntegrationProcessor.

This file ties all the integration framework parts into a single process, abstracting away the internals that someone who is just creating a new integration doesn't need to know about.

There are two ways that integration processing gets triggered:

  1. A user connects an integration, which triggers a NodeWorkerIntegrationProcessMessage SQS message to the nodejs-worker, which in turn calls the IntegrationProcessor.process method. In this case the integration is processed in onboarding mode, which means the framework tries to process the whole integration instead of, for example, checking only the last day of messages on a Slack integration.

  2. job-generator triggers a tick every minute and calls the IntegrationProcessor.processTick method to decide which integrations to run without onboarding mode. If an integration service is configured with 20 ticks between checks, then every 20 minutes the framework triggers the IntegrationProcessor.processCheck method, looks for integrations of that type in the database, and sends a NodeWorkerIntegrationProcessMessage SQS message to the nodejs-worker, which in turn calls IntegrationProcessor.process, but without onboarding set to true.
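The tick gating described above can be sketched like this. This is an assumed simplification of the scheduling logic, not the actual processTick implementation:

```typescript
// Assumed tick gating: a service with ticksBetweenChecks = 20 is checked
// every 20th tick, 0 means every tick, and a negative value means the
// integration is only ever processed during onboarding.
function shouldCheck(tick: number, ticksBetweenChecks: number): boolean {
  if (ticksBetweenChecks < 0) return false // onboarding-only
  if (ticksBetweenChecks === 0) return true // every tick
  return tick % ticksBetweenChecks === 0
}
```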

Whichever way integration processing gets triggered, it always ends up calling IntegrationProcessor.process. This method checks whether the integration type is supported by looking for an integration service in the integrationServices array that supports that type.
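A minimal sketch of that lookup, with stand-in types (the real logic lives in IntegrationProcessor.process and the real services carry much more state):

```typescript
// Stand-in for an integration service: all we need for the lookup is
// the type it supports.
interface ServiceLike {
  type: string
}

const integrationServices: ServiceLike[] = [
  { type: 'devto' },
  { type: 'slack' },
]

// Find the service that supports the given integration type, if any.
function findService(type: string): ServiceLike | undefined {
  return integrationServices.find((s) => s.type === type)
}
```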

All supported integrations are defined in IntegrationProcessor constructor like so:

    this.integrationServices = [
      new DevtoIntegrationService(),
      new DiscordIntegrationService(),
      new TwitterIntegrationService(),
      new TwitterReachIntegrationService(),
      new SlackIntegrationService(),
      new GithubIntegrationService(),
    ]

If you implement a new integration service, you also have to add it here if you want the integration processor to pick it up.

All these integration services implement IntegrationServiceBase abstract class.

This class requires you to call its constructor, providing it with the integration type that the service will support (this should be a new type) and the number of ticks that have to pass before the integration is checked. If the number of ticks is negative, the integration will never be checked and will only be triggered during onboarding (i.e. when a user connects the integration manually). If the number of ticks is 0, it will be checked on every tick.
There are also a few other parameters that can be set inside IntegrationServiceBase:

  • globalLimit & limitResetFrequencySeconds: how many records we should process within a certain time frame before we stop. This is useful if you want to tell the IntegrationProcessor that an integration has a global limit, for example that we can execute only 1,000,000 API calls per day and have to stop and wait for the next day.
  • onboardingLimitModifierFactor: during onboarding we can perhaps allow more data to be processed than during regular checks, and we do so by multiplying globalLimit by this factor. For example: if you have a rate limit of 1000 requests/minute and surpassing it costs extra, you can perhaps allow that just for onboarding and set this factor to 2, which means globalLimit becomes 2000 instead of the initially set 1000.
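Putting these constructor parameters together, a new service's constructor might look like the sketch below. The IntegrationServiceBase stub is defined here only so the example compiles; the real base class lives in the crowd.dev codebase, and MyIntegrationService / 'my-integration' are made-up names:

```typescript
// Stub base class, defined only so this sketch is self-contained.
abstract class IntegrationServiceBase {
  public globalLimit = -1
  public limitResetFrequencySeconds = -1
  public onboardingLimitModifierFactor = 1

  protected constructor(
    public readonly type: string,
    public readonly ticksBetweenChecks: number,
  ) {}
}

// Hypothetical new service wiring up the parameters described above.
class MyIntegrationService extends IntegrationServiceBase {
  constructor() {
    super('my-integration', 20) // checked every 20 ticks (~20 minutes)
    this.globalLimit = 1000 // process at most 1000 records...
    this.limitResetFrequencySeconds = 60 * 60 * 24 // ...per day
    this.onboardingLimitModifierFactor = 2 // allow 2000 during onboarding
  }
}
```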

The IntegrationServiceBase abstract class forces you to implement a few methods and gives you the option to implement a few more if needed. Here are all the methods you can implement, in the order in which they are executed by the IntegrationProcessor.process method:

  • [Optional] preprocess: this method gets called once every time IntegrationProcessor.process starts. It should load all the data that will be required to process the whole integration and all its streams, and set that data in the context.pipelineData property.

  • [Optional] createMemberAttributes: this method gets called only if member attributes weren't already created for the tenant we are currently processing the integration for. It should create all the required and custom member attributes that the integration supports.

  • [Required] getStreams: this method only gets called if the framework didn't have to delay/restart streams from a previous integration run (this can happen if processing of a certain stream fails and we need to retry it a couple of times, or if we hit a rate limit and need to wait for a bit before continuing with stream processing). The method should return all the streams that should be processed within this integration processing run. Note that while processing a single stream we can also generate more streams and include them in this run: for example, if we have a stream whose endpoint returns a paginated response, we can generate a new stream that points to the next page and return it to the framework, and it will get processed later.

  • [Required] processStream: this method gets called to process each individual stream. The expected result is described by the following interface:

    export interface IProcessStreamResults {
      // result of stream processing are operations that have to be done after
      operations: IStreamResultOperation[]
    
      // which was the last record that was processed in the current stream
      lastRecord?: any
    
      // last record timestamp in the current stream
      lastRecordTimestamp?: number
    
      // if processing of the current stream results in new streams they should be returned here
      newStreams?: IPendingStream[]
    
      // if processing of the current stream results in the next page of the same stream it should be returned here
      nextPageStream?: IPendingStream
    
      // seconds to pause between continuing with integration processing for the remaining streams
      sleep?: number
    }
    

    With this you can return instructions to bulk insert new members and activities by providing the relevant operations. You can also give the framework newStreams to process, and if the current stream is paginated and you have detected a new page, you should return it as a stream in the nextPageStream property. If you return the sleep property with a positive integer, the framework will pause integration processing, process all operations, and restart processing after that many seconds have passed.

  • [Optional] isProcessingFinished: this method gets called if the previous stream we processed returned nextPageStream. It's usually used to prevent processing all available pages, for example by limiting yourself to data from just the last day.

  • [Optional] postprocess: this method gets called once at the end of IntegrationProcessor.process and is used to modify integration data or settings in the database if needed. For example, you can cache small amounts of data in the context.integration.settings property, but be careful because it also contains the settings the user provided when connecting the integration in the first place.
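A minimal skeleton of the two required methods might look like this. The types are simplified stand-ins (the real IProcessStreamResults interface is shown above and has more fields), and ExampleService together with its pretend three-page API is made up for illustration:

```typescript
// Simplified stand-in types; the real interfaces have more fields.
interface PendingStream {
  value: string
  metadata?: Record<string, unknown>
}

interface StreamResult {
  operations: unknown[]
  nextPageStream?: PendingStream
}

class ExampleService {
  // getStreams: return the initial streams for this processing run.
  async getStreams(): Promise<PendingStream[]> {
    return [{ value: 'articles', metadata: { page: 1 } }]
  }

  // processStream: process one stream and tell the framework what to do
  // next. Here we pretend the API has exactly 3 pages, so pages 1 and 2
  // return a nextPageStream and page 3 ends the pagination.
  async processStream(stream: PendingStream): Promise<StreamResult> {
    const page = (stream.metadata?.page as number) ?? 1
    return {
      operations: [], // bulk member/activity operations would go here
      nextPageStream:
        page < 3
          ? { value: stream.value, metadata: { page: page + 1 } }
          : undefined,
    }
  }
}
```

In a real service, processStream would call the third-party API and translate the response into member and activity operations.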

We suggest you check out the other integration services that are already implemented here. This will give you a practical view of how you are supposed to implement a new integration. Currently the simplest integration is DevtoIntegrationService and the most difficult one is TwitterIntegrationService, mainly because of rate limits and other limitations.

Database tables related to the integration processing

  • integrations: Here we track all integrations that a tenant has connected, along with all their settings
  • integrationRuns: In this table, we track the processing of an integration - here you will find data about individual runs like when it started and how it ended (state, processedAt and error columns show the results).
  • integrationStreams: In this table, we track streams being processed by an individual integration run. Here you can see which stream is currently being processed, which were successfully processed, and which failed with an error.

Useful scripts

We have a few scripts that you can use while you debug/implement an integration service.

In the backend folder you can run:

  • npm run script:process-integration - Here you specify the ID of an integration from the integrations database table and it will start a new integration processing run for you. If you run the command as is, it will print out help text. Example usage on your local environment:

        cd backend
        source source-local.sh && npm run script:process-integration -- -o -i <some-id>
        # or if you want to start multiple integration runs
        source source-local.sh && npm run script:process-integration -- -o -i <some-id>,<some-other-id>,<third-id>

  • npm run script:continue-run - Here you specify an integration run ID from the integrationRuns table for a run that got stuck, and it will start processing the pending/errored streams again.
    Example usage on your local environment:

        cd backend
        source source-local.sh && npm run script:continue-run -- -r <some-id>