Ingestion Pipeline

Customers who rely on large volumes of data to drive their business decisions may pull Rutter data and store it themselves on their own cloud services (a database on AWS, GCP, Azure, etc.). This workflow enables them to run complex calculations and analyses at a much larger scale far more effectively.

Requirements

Database

Given that the commerce and accounting models include multiple relations that you might want to take advantage of in your queries, we’d recommend using a relational database like MySQL or Postgres.

Be ready for large amounts of data from some connections. We’ve seen merchants with over a million orders in a year. This can cause tables to fill up quickly, but so far no customer has needed to partition a table. Most of our customers have not needed more than 4 TB of database storage, and some use far less (<1 TB).

Server

In order to fetch Rutter data, most of our customers will rely on a queue and worker system. When a new connection has its data ready, you can add the connection to a queue (such as Amazon SQS or RabbitMQ), and then workers running on any basic server can ingest messages from the queue and start the data fetching process.

This worker and queue system also helps with any incremental updates you want to do.
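As a minimal sketch of the queue-and-worker pattern, the example below uses Python's in-process queue standing in for a managed queue such as Amazon SQS or RabbitMQ. The connection IDs and the ingest function are hypothetical placeholders for your own fetching process:

```python
import queue
import threading

# In production this would be an SQS or RabbitMQ queue; here we use an
# in-process queue so the pattern is self-contained.
connection_queue = queue.Queue()


def ingest_connection(connection_id):
    # Placeholder for the data fetching process described below.
    return 'ingested ' + connection_id


def worker(results):
    while True:
        connection_id = connection_queue.get()
        if connection_id is None:  # sentinel value: shut the worker down
            connection_queue.task_done()
            break
        results.append(ingest_connection(connection_id))
        connection_queue.task_done()


results = []
threads = [threading.Thread(target=worker, args=(results,)) for _ in range(2)]
for t in threads:
    t.start()

# When a new connection has its data ready, enqueue it for a worker.
for conn in ['conn_a', 'conn_b', 'conn_c']:
    connection_queue.put(conn)

connection_queue.join()            # wait until every message is processed
for _ in threads:
    connection_queue.put(None)     # stop the workers
for t in threads:
    t.join()
```

The same shape carries over to a managed queue: the cron job or webhook handler plays the producer role, and any basic server running the worker loop plays the consumer role.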

Fetching Rutter Data

Rutter allows customers to fetch data quickly through the query parameters created_at_min and created_at_max, which are set as UNIX timestamps in milliseconds. Customers typically use a sliding window for the date range (i.e., fetch the previous day’s data each day) to ensure all data is fetched consistently. Depending on how urgently you need the data, you can parallelize your requests by fetching multiple date ranges simultaneously to expedite the process. For example, you can split a day’s worth of data into three windows (00:00–07:59, 08:00–15:59, 16:00–23:59).

Below is an example Python script that pulls all orders on May 25th, 2022:

import base64

import requests

# client_id and client_secret can be found at https://dashboard.rutterapi.com/dashboard
CLIENT_ID = ''
CLIENT_SECRET = ''
# The access token can be found at https://dashboard.rutterapi.com/connections
ACCESS_TOKEN = ''

# May 25th, 2022 (UTC), as UNIX timestamps in milliseconds.
URL = (
    'https://production.rutterapi.com/orders'
    '?access_token=' + ACCESS_TOKEN +
    '&created_at_min=1653436800000'
    '&created_at_max=1653523199000'
)

HEADERS = {
    'Accept': 'application/json',
    'Authorization': 'Basic ' + base64.b64encode(
        (CLIENT_ID + ':' + CLIENT_SECRET).encode()
    ).decode(),
}


def main():
    print('Fetching data...')
    orders = []
    cursor = None
    while True:
        url = URL if cursor is None else URL + '&cursor=' + cursor
        response = requests.get(url, headers=HEADERS)
        response.raise_for_status()
        body = response.json()
        orders.extend(body['orders'])
        # next_cursor is empty once the last page has been fetched.
        cursor = body.get('next_cursor')
        if not cursor:
            break
    print('Fetched %d orders' % len(orders))
    return orders


if __name__ == '__main__':
    main()
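To parallelize as described above, each worker needs its own (created_at_min, created_at_max) pair. The helper below is a sketch of computing those pairs for one UTC day; the function name and the three-way split are illustrative, matching the earlier example:

```python
from datetime import datetime, timezone


def day_windows(year, month, day, parts=3):
    """Split one UTC day into `parts` equal windows, returned as
    (created_at_min, created_at_max) UNIX-millisecond pairs."""
    start = datetime(year, month, day, tzinfo=timezone.utc)
    start_ms = int(start.timestamp() * 1000)
    day_ms = 24 * 60 * 60 * 1000
    step = day_ms // parts
    return [
        (start_ms + i * step, start_ms + (i + 1) * step - 1)
        for i in range(parts)
    ]


# Each (min, max) pair can be fetched by a separate worker in parallel.
windows = day_windows(2022, 5, 25)
```

Each pair plugs directly into the created_at_min and created_at_max query parameters of the script above.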

Storing Rutter Data

We’ve seen some customers break up Rutter’s data schema into two parts: required data (data used in your underwriting model) and non-required data (data you don’t use right now but might want to store). Required data can be enforced as part of a table schema. Non-required data can be placed in a JSON or JSONB column. This makes it so that any changes to non-required data don’t break your data ingestion process.
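A minimal sketch of that split is shown below. The required field names and the raw_data column are hypothetical examples, not Rutter's actual schema:

```python
import json

# Fields your underwriting model depends on; enforced as real columns.
# These names are illustrative, not Rutter's schema.
REQUIRED_FIELDS = {'id', 'total_price', 'created_at'}


def split_order(order):
    """Separate an order payload into enforced columns plus a
    catch-all JSON blob for everything else."""
    required = {k: order[k] for k in REQUIRED_FIELDS}
    extra = {k: v for k, v in order.items() if k not in REQUIRED_FIELDS}
    # `extra` goes into a JSON/JSONB column, so changes to non-required
    # data don't break the ingestion process.
    required['raw_data'] = json.dumps(extra)
    return required


row = split_order({'id': 'o1', 'total_price': 10.0,
                   'created_at': 1653436800000, 'currency': 'USD'})
```

The resulting row can then be inserted into a table whose schema only enforces the required columns.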

Additionally, you may use an input validation library (e.g. Zod for TypeScript). Before adding Rutter’s data to your database, you can run it through the input validation function. If an error is thrown, you can pipe it into Sentry or your preferred error logging system. By adding a monitor on these errors, you can alert your development team immediately when Rutter’s schema has changed, and developers can then migrate tables.
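A hand-rolled sketch of validate-before-insert is below; in practice a validation library (such as pydantic, playing the role Zod plays in TypeScript) would replace the type table. The expected field types here are hypothetical:

```python
# Illustrative expected types for an order payload; a validation library
# would normally define these as a schema or model.
EXPECTED_TYPES = {'id': str, 'total_price': (int, float), 'created_at': int}


def validate_order(order):
    """Return a list of validation errors; empty means safe to insert."""
    errors = []
    for field, expected in EXPECTED_TYPES.items():
        if field not in order:
            errors.append('missing field: %s' % field)
        elif not isinstance(order[field], expected):
            errors.append('bad type for %s' % field)
    return errors


# A payload with a wrong type and a missing field produces two errors.
errors = validate_order({'id': 'o1', 'total_price': 'oops'})
```

Non-empty error lists are what you would pipe into Sentry (or your preferred error logger) so a monitor can alert on schema changes.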

Updating Rutter Data

A common strategy that Rutter strongly recommends is polling to incrementally update connections. Assuming you’re using a queue and worker system, this can be achieved by creating a cron job that runs at a regular interval (based on your company’s needs for data freshness). The cron job can be as simple as querying your database for all live connections and enqueuing an incremental update job for each one. Your workers can then fetch all data updated after the last update time using the update filters (min_updated_at) that Rutter provides.
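The cron job can be sketched as follows. The connection records, the enqueue function, and the last_synced_at_ms bookkeeping are hypothetical stand-ins for your database query and queue client:

```python
import time

# In production this would publish to SQS/RabbitMQ; a list keeps the
# sketch self-contained.
jobs = []


def enqueue_incremental_update(connection_id, min_updated_at):
    jobs.append({'connection_id': connection_id,
                 'min_updated_at': min_updated_at})


def run_cron(live_connections, now_ms=None):
    """Enqueue one incremental-update job per live connection, fetching
    everything updated since the connection's last successful sync."""
    now_ms = now_ms or int(time.time() * 1000)
    for conn in live_connections:
        # Workers pass min_updated_at through to Rutter's update filter.
        enqueue_incremental_update(conn['id'], conn['last_synced_at_ms'])
        conn['last_synced_at_ms'] = now_ms


live = [{'id': 'conn_a', 'last_synced_at_ms': 1653436800000}]
run_cron(live, now_ms=1653523200000)
```

Scheduling run_cron at your desired interval (hourly, daily, etc.) sets the freshness of your stored copy of the data.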

Customers may also leverage Rutter’s webhooks to be notified when data is ready to fetch and when there are changes to their data. Please note that this adds a dependency on sending and receiving webhooks.