# Integration framework

This guide will help you get familiar with our integration framework, whether you want to understand how it works or you are going to implement a new integration into it.
## Terminology

- **Integration**: a crowd.dev-supported integration that a user can connect and configure. Our integration framework then processes it, which should result in `activities` and `members` being created.
- **Integration service**: an implementation of the `IntegrationServiceBase` abstract class that ties into our integration framework.
- **Integration stream**: when a check or onboarding is triggered, our integration service starts processing an integration by preparing `streams` that can be processed one by one. Examples of a single stream:
  - a Slack channel ID that we can use to call the Slack API to get all members and messages from
  - a Discord thread ID that we can use with the Discord API to get all messages in a single thread
  - a Dev.to article ID that we need to check for comments through the Dev.to API
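To make the stream concept concrete, here is a minimal sketch of what such stream descriptors could look like. The `PendingStreamSketch` shape and the example identifier formats are assumptions for illustration only; the real `IPendingStream` interface lives in the crowd.dev backend and may differ.

```typescript
// Hypothetical shape -- the real IPendingStream interface may differ.
// A stream is assumed to be identified by a string value plus optional metadata.
interface PendingStreamSketch {
  value: string // e.g. a Slack channel ID or a Dev.to article ID
  metadata?: Record<string, unknown>
}

// Example streams an integration service might prepare for one run:
const streams: PendingStreamSketch[] = [
  { value: 'C024BE91L' },                                              // Slack channel ID
  { value: 'discord-thread:9876543210' },                              // Discord thread ID
  { value: 'devto-article:123456', metadata: { checkComments: true } }, // Dev.to article ID
]

console.log(streams.length) // 3
```

Each descriptor is small and self-contained, so the framework can process streams one by one and retry individual streams independently.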
## Important files and processes within the integration framework

The main part of the framework is `IntegrationProcessor`. This file ties all the integration framework parts into one process, abstracting away internal details that someone who is just creating a new integration doesn't need to know.
There are two ways that integration processing gets triggered:

1. A user connects an integration, which triggers a `NodeWorkerIntegrationProcessMessage` SQS message to the `nodejs-worker`, which in turn calls the `IntegrationProcessor.process` method. This way the integration is processed in `onboarding` mode, which means the framework tries to process the whole integration instead of, for example, checking only the last day of messages on a `Slack` integration.
2. The `job-generator` triggers a `tick` every minute and calls the `IntegrationProcessor.processTick` method to determine which integrations to run without `onboarding` mode. If an integration has a configuration that reads `20 ticks between checks`, then every 20 minutes the framework triggers the `IntegrationProcessor.processCheck` method to see if there are any integrations of that type in the database. If there are, it triggers a `NodeWorkerIntegrationProcessMessage` SQS message to the `nodejs-worker`, which in turn calls the `IntegrationProcessor.process` method, but without `onboarding` set to true.
Whichever way the integration processing gets triggered, it always ends up calling `IntegrationProcessor.process`. This method checks whether the integration type is supported by checking if the `integrationServices` array contains an integration service that supports the type.

All supported integrations are defined in the `IntegrationProcessor` constructor like so:
```typescript
this.integrationServices = [
  new DevtoIntegrationService(),
  new DiscordIntegrationService(),
  new TwitterIntegrationService(),
  new TwitterReachIntegrationService(),
  new SlackIntegrationService(),
  new GithubIntegrationService(),
]
```
If you implement a new integration service, you also have to add it here if you want the integration processor to pick it up.

All these integration services implement the `IntegrationServiceBase` abstract class. This class requires you to call its constructor, providing it with the integration type that the service will support (this should be a new type) and the number of ticks that have to pass before the integration is checked. If the number of ticks is negative, the integration will never be checked and will only be triggered during onboarding (i.e. when a user connects the integration manually). If the number of ticks is 0, it will be checked every tick.
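The type lookup described above can be sketched as follows. The `ServiceSketch` class, `findService` helper, and type strings here are illustrative assumptions, not the real `IntegrationProcessor` internals:

```typescript
// Simplified stand-in for an integration service carrying its supported type.
class ServiceSketch {
  constructor(public readonly type: string) {}
}

// Mirrors the idea of the integrationServices array: one entry per supported type.
const integrationServices = [
  new ServiceSketch('devto'),
  new ServiceSketch('slack'),
  new ServiceSketch('my-new-type'), // <- a newly added service must appear here
]

// Sketch of the support check: find the service matching the integration's type.
function findService(type: string): ServiceSketch | undefined {
  return integrationServices.find((s) => s.type === type)
}

console.log(findService('slack') !== undefined) // true  -> supported
console.log(findService('unknown'))             // undefined -> unsupported type
```

If the lookup returns nothing, the integration type is simply not supported, which is why forgetting to register a new service in the array means it never runs.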
There are also a few other parameters that can be set inside `IntegrationServiceBase`:

- `globalLimit` & `limitResetFrequencySeconds`: how many records we should process within a certain time frame before we stop. This is useful if you want to tell the `IntegrationProcessor` that we have a global limit for an integration where we can, for example, execute only 1,000,000 API calls per day and need to stop after that and wait for another day.
- `onboardingLimitModifierFactor`: when onboarding, we can perhaps allow more data to be processed than during regular checks, and we multiply `globalLimit` by this factor to do so. For example: if you are onboarding with a rate limit of 1000 requests/minute, and surpassing it means paying extra, you could allow that just for onboarding by setting this factor to 2 — `globalLimit` then becomes 2000 instead of the initial 1000.
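Putting the constructor arguments and these parameters together, a new service's setup could look like the sketch below. The base class here is a simplified stand-in for the real `IntegrationServiceBase` (the actual constructor signature and property names may differ), and `my-integration` is a hypothetical type:

```typescript
// Simplified stand-in for IntegrationServiceBase; property names follow the document.
abstract class IntegrationServiceBaseSketch {
  public globalLimit = 0
  public limitResetFrequencySeconds = 0
  public onboardingLimitModifierFactor = 1

  protected constructor(
    public readonly type: string,
    // negative = onboarding only, 0 = every tick, N = checked every N ticks
    public readonly ticksBetweenChecks: number,
  ) {}
}

class MyIntegrationService extends IntegrationServiceBaseSketch {
  constructor() {
    super('my-integration', 20) // checked every 20 ticks (= every 20 minutes)
    this.globalLimit = 1000                // e.g. 1000 API calls...
    this.limitResetFrequencySeconds = 60   // ...per minute
    this.onboardingLimitModifierFactor = 2 // allow double the limit while onboarding
  }
}

const svc = new MyIntegrationService()
// During onboarding the effective limit is globalLimit * onboardingLimitModifierFactor:
const onboardingLimit = svc.globalLimit * svc.onboardingLimitModifierFactor
console.log(onboardingLimit) // 2000
```

This matches the worked example above: a 1000 requests/minute limit with a factor of 2 yields an effective onboarding limit of 2000.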
The `IntegrationServiceBase` abstract class forces you to implement a few methods and gives you the option to implement a few more if needed. Here are all the methods you can implement, in the order in which they are executed by the `IntegrationProcessor.process` method:
- **[Optional]** `preprocess`: called once every time `IntegrationProcessor.process` starts. It should load all the data that will be required to process the whole integration and all its streams, and set that data on the `context.pipelineData` property.
- **[Optional]** `createMemberAttributes`: called only if member attributes weren't already created for the tenant whose integration we are currently processing. It should create all the required and custom member attributes that the integration supports.
- **[Required]** `getStreams`: called only if the framework didn't have to delay/restart streams from a previous integration run (which can happen if a certain stream's processing fails and we need to retry it a couple of times, or if we hit a rate limit and need to wait a bit before continuing with stream processing). The method should return all the streams that should be processed within this integration processing run. Note that while processing a single stream we can also generate more streams and include them in the current integration processing. For example, if we have a stream whose endpoint returns a paginated response, we can generate a new stream that points to the next page and return it to the framework, and it will get processed later.
- **[Required]** `processStream`: called to process each individual stream. The expected result is described by this interface:

  ```typescript
  export interface IProcessStreamResults {
    // result of stream processing are operations that have to be done after
    operations: IStreamResultOperation[]

    // which was the last record that was processed in the current stream
    lastRecord?: any

    // last record timestamp in the current stream
    lastRecordTimestamp?: number

    // if processing of the current stream results in new streams they should be returned here
    newStreams?: IPendingStream[]

    // if processing of the current stream results in the next page of the same stream it should be returned here
    nextPageStream?: IPendingStream

    // seconds to pause before continuing with integration processing for the remaining streams
    sleep?: number
  }
  ```

  With this you can return an instruction to bulk insert new members and activities by providing the relevant `operations`. You can also provide the framework with `newStreams` to process, and if the current stream is paginated and you have detected a new page, you should return it as a stream in the `nextPageStream` property. If you return the `sleep` property with a positive integer, the framework will pause the integration processing, process all `operations`, and restart the processing after that many seconds pass.
- **[Optional]** `isProcessingFinished`: called if the previously processed stream returned `nextPageStream`. It's usually used to avoid processing every available page, for example by limiting yourself to data from the last day.
- **[Optional]** `postprocess`: called once at the end of `IntegrationProcessor.process` and used to modify integration data or settings in the database if needed. For example, you can cache small amounts of data in the `context.integration.settings` property, but be careful because it also contains the settings the user provided when connecting the integration in the first place.
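The pagination flow described above — `processStream` handling one page and emitting a `nextPageStream` for the next — can be sketched as follows. The interfaces are simplified stand-ins based on the `IProcessStreamResults` shape quoted above, and `fetchPage` is a hypothetical API helper, not a real framework function:

```typescript
// Simplified stand-ins for the real framework interfaces.
interface PendingStream { value: string; metadata?: Record<string, unknown> }
interface ProcessStreamResults {
  operations: unknown[]          // would be bulk member/activity insert operations
  lastRecord?: unknown
  newStreams?: PendingStream[]
  nextPageStream?: PendingStream
  sleep?: number
}

// Pretend paginated API: two pages of records.
function fetchPage(page: number): { records: string[]; hasNext: boolean } {
  const data: Record<number, string[]> = { 1: ['a', 'b'], 2: ['c'] }
  return { records: data[page] ?? [], hasNext: page < 2 }
}

// Sketch of a processStream implementation for one paginated stream.
function processStream(stream: PendingStream): ProcessStreamResults {
  const page = Number(stream.metadata?.page ?? 1)
  const { records, hasNext } = fetchPage(page)
  return {
    operations: records,
    lastRecord: records[records.length - 1],
    // Returning nextPageStream tells the framework there is another page to process.
    nextPageStream: hasNext
      ? { value: stream.value, metadata: { page: page + 1 } }
      : undefined,
  }
}

const first = processStream({ value: 'channel:C024BE91L' })
console.log(first.operations.length, first.nextPageStream !== undefined) // 2 true
const second = processStream(first.nextPageStream!)
console.log(second.nextPageStream) // undefined -> last page reached
```

In the real framework, `isProcessingFinished` would be the place to cut this chain short, for example once the records are older than the time window you care about.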
We suggest you check out the other integration services that are already implemented. This will give you a practical view of how you are supposed to implement a new integration. Currently the simplest integration is `DevtoIntegrationService` and the most difficult one is `TwitterIntegrationService`, mainly because of rate limits and other limitations.
## Database tables related to integration processing

- `integrations`: here we track all integrations that a tenant has connected, with all their settings.
- `integrationRuns`: in this table we track the processing of an integration. Here you will find data about individual runs, such as when a run started and how it ended (the `state`, `processedAt`, and `error` columns show the results).
- `integrationStreams`: in this table we track streams being processed by an individual integration run. Here you can see which stream is currently being processed, which were successfully processed, and which failed with an error.
## Useful scripts

We have a few scripts that you can use while you debug/implement an integration service. In the `backend` folder you can run:

- `npm run script:process-integration`: specify the ID of an integration from the `integrations` database table, and it will start a new integration processing run for you. If you run the command as is, it will print out help text. Example usage in your local environment:

```bash
cd backend
source source-local.sh && npm run script:process-integration -- -o -i <some-id>
# or if you want to start multiple integration runs
source source-local.sh && npm run script:process-integration -- -o -i <some-id>,<some-other-id>,<third-id>
```

- `npm run script:continue-run`: specify an integration run ID from the `integrationRuns` table for a run that got stuck, and the script will start processing its pending/errored streams again. Example usage in your local environment:

```bash
cd backend
source source-local.sh && npm run script:continue-run -- -r <some-id>
```