API rate limit

Introduction

The API rate limit is the maximum number of calls you can send to the CGC public API within a predefined time frame. That limit is 1000 requests within 5 minutes. Once the limit is reached, no further calls are accepted by the API server until the 5-minute interval ends.

All rate limit information is returned to the user in the following HTTP headers:

  1. X-RateLimit-Limit - the rate limit itself; currently 1000 requests per five minutes.
  2. X-RateLimit-Remaining - the number of calls you can still make before hitting the limit.
  3. X-RateLimit-Reset - the time when the limit will be reset, given as a Unix timestamp.

To learn how you can write optimized code and make the most of the CGC public API regardless of the rate limit, please consult the examples below.

Each example first illustrates what un-optimized code can look like and what makes it less ideal given the rate limit, followed by a concrete recommendation on how to optimize it.

Note that these examples assume that you are using the Python client for the CGC public API.
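All of the snippets on this page assume that an api client object already exists. As a point of reference, here is a minimal sketch of how such a client might be initialized and how the rate-limit headers can be inspected; the token is a placeholder, and the limit, remaining and reset_time properties are those exposed by the sevenbridges-python client at the time of writing, so check your client version:

import sevenbridges as sbg

# initialize the client against the CGC public API endpoint (placeholder token)
api = sbg.Api(
    url="https://cgc-api.sbgenomics.com/v2",
    token="<your authentication token>"
)

# the client mirrors the X-RateLimit-* headers of the most recent response
print(api.limit)       # corresponds to X-RateLimit-Limit
print(api.remaining)   # corresponds to X-RateLimit-Remaining
print(api.reset_time)  # corresponds to X-RateLimit-Reset

The client library also provides error handlers (for example, rate_limit_sleeper) that can pause and retry automatically when the limit is hit; consult the client documentation for details.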

Submitting tasks for execution

There are two methods for starting multiple tasks on CGC:

  1. Submit tasks one by one inside a loop.
  2. Submit a batch task. A batch task can consist of many child tasks and can be created with a single API call, whereas submitting tasks inside a loop requires one API call for each task. Using batch tasks is therefore the recommended approach.

Not optimized for rate limit

In the example below, we iterate over samples and submit tasks one by one (in this case, we run Salmon for RNA-seq analysis). This example assumes that the input FASTQ files have already been grouped by sample in a dictionary.

for sample_id, fastq_files in samples.items():
     
    inputs = {
        "reads": fastq_files,
        "transcriptome_fasta_or_salmon_index_archive": index_file,
        "gtf": gene_map_file
    }
     
    # one API call is issued here for every sample in the loop
    task = api.tasks.create(
        name="Salmon {}".format(sample_id),
        project=project,
        app=salmon_workflow.id,
        inputs=inputs
    )

Optimized for rate limit

In this example, which is optimized for the rate limit, we run Salmon as a batch task for all input files at once, using sample_id as the batching criterion. This example assumes that the sample_id metadata field is already set for all input files.

inputs = {
    "reads": fastq_files,
    "transcriptome_fasta_or_salmon_index_archive": index_file,
    "gtf": gene_map_file
}
 
# a single API call creates the batch task, which spawns one child task per sample_id
batch_task = api.tasks.create(
    name="Salmon batch",
    project=project,
    app=salmon_workflow.id,
    inputs=inputs,
    batch_input="reads",
    batch_by={
        "type": "CRITERIA",
        "criteria": ["metadata.sample_id"]
    }
)

Copying files between projects

Instead of copying individual files, which will make one API call per file, we recommend using a bulk API call. This way you can copy up to 100 files with a single API call.

Not optimized for rate limit

Copying individual files requires two API calls for each file: one to find the file by name, and another one to copy it. We recommend using the bulk API call instead.

for name in source_file_names:
    f = api.files.query(project=src_project, names=[name])[0]
    f.copy(project=target_project)

Optimized for rate limit

Using a bulk API call, you can copy up to 100 files at a time.

def bulk_copy_files(files_to_copy, target_project):
    "Copies files in batches of size 100"
     
    final_responses = {}
     
    for i in range(0, len(files_to_copy), 100):
         
        files = files_to_copy[i:i + 100]
        responses = api.actions.bulk_copy_files(files, target_project.id)
         
        for fileid, response in responses.items():
            if response['status'] != 'OK':
                raise Exception(
                    "Error copying {}: {}".format(fileid, response)
                )
                 
        final_responses.update(responses)
     
    return final_responses
     
     
files_to_copy = list(
    api.files.query(
        project=src_project,
        names=source_file_names,
        limit=100
    ).all()
)
 
responses = bulk_copy_files(files_to_copy, target_project)

Importing files from a volume

The CGC API allows you to import files from a volume in bulk rather than one by one. Using the bulk API feature, you can import up to 100 files per call.

Not optimized for rate limit

Importing files one by one requires a separate API call for each file. We recommend using the bulk API call instead.

for f in files_to_import:
     
    imported_file = api.imports.submit_import(
        volume=volume,
        project=dest_project,
        location='christian_demo_files/' + f,
        overwrite=True
    )

Optimized for rate limit

Using the bulk API feature, you can submit import requests for up to 100 files with a single API call.

import time

def bulk_import_files(file_names, volume, location, project, overwrite=True, chunk_size=100):
    "Imports a list of files from a volume in bulk"

    def is_running(response):
        # an invalid response carries the error on the response itself
        if not response.valid:
            raise Exception(
                '\n'.join([
                    str(response.error),
                    response.error.message,
                    response.error.more_info
                ]))
        # a valid response may still describe a failed import
        if response.resource.error:
            raise Exception(
                '\n'.join([
                    str(response.resource.error),
                    response.resource.error.message,
                    response.resource.error.more_info
                ]))
        return response.resource.state not in ["COMPLETED", "FAILED", "ABORTED"]
     
    final_responses = []
     
    # import files in batches of 100 each
    for i in range(0, len(file_names), chunk_size):
         
        # setup list of dictionary with import requests
        imports = [
            {
                'volume': volume,
                'location': location + '/' + fn,
                'project': project,
                'name': fn,
                'overwrite': overwrite
            }
            for fn in file_names[i:i + chunk_size]
        ]
 
        # initiate bulk import of batch and wait until finished
        responses = api.imports.bulk_submit(imports)
        while any([is_running(r) for r in responses]):
            time.sleep(10)
            responses = api.imports.bulk_get([r.resource for r in responses])
             
        final_responses.extend(responses)
         
    return final_responses
 
 
responses = bulk_import_files(
    file_names=files_to_import,
    volume=volume,
    location="christian_demo_files",
    project=dest_project
)

Updating file metadata

Metadata for multiple files can be set using a bulk API call instead of one call per file. Setting metadata for the files is typically required before they can be provided as input to a CWL workflow.

In the examples below, we will assume that there is a list of FASTQ files for a specific sample and we want to set both sample_id and paired_end metadata information for all of them.

Not optimized for rate limit

In this example, which is not optimized for the rate limit, we iterate over all FASTQ files and set metadata for each file individually, issuing one API call per file.

# set metadata for forward read files
for fastq in forward_reads:
    fastq.metadata['sample_id'] = 'my-sample'
    fastq.metadata['paired_end'] = '1'
    fastq.save()
 
# set metadata for reverse read files
for fastq in reverse_reads:
    fastq.metadata['sample_id'] = 'my-sample'
    fastq.metadata['paired_end'] = '2'
    fastq.save()

Optimized for rate limit

An optimal way to update metadata for multiple files is to use a bulk API call and update metadata for up to 100 files per call.

def bulk_update_metadata(files, metadata, replace=False):
    """Updates metadata for list of files in bulk. Input lists must be ordered
    pairs, i.e. the first element in list 'files' corresponds to first element
    in list 'metadata' etc."""
     
    final_responses = []
     
    # process in batches of 100
    for i in range(0, len(files), 100):
 
        files_chunk = files[i:i + 100]
        md_chunk = metadata[i:i + 100]

        # make sure metadata attribute is set for all files before update;
        # avoids lazy fetching of metadata for each file in subsequent loop
        md_missing = any([not f.field('metadata') for f in files_chunk])
        if md_missing:
            files_chunk = [r.resource for r in api.files.bulk_get(files_chunk)]

        # set metadata for each file
        for f, md in zip(files_chunk, md_chunk):
            f.metadata = md

        # update or replace existing metadata
        if replace:
            responses = api.files.bulk_update(files_chunk)
        else:
            responses = api.files.bulk_edit(files_chunk)
 
        # check for errors
        for r in responses:
            if not r.valid:
                raise Exception(
                    '\n'.join([str(r.error), r.error.message, r.error.more_info])
                )
 
        final_responses.extend(responses)
         
    return final_responses
 
metadata = []
for fastq in forward_reads:
    metadata.append({"sample_id" : "my-sample", "paired_end" : "1"})
for fastq in reverse_reads:
    metadata.append({"sample_id" : "my-sample", "paired_end" : "2"})
   
responses = bulk_update_metadata(forward_reads + reverse_reads, metadata)

Deleting multiple files

This example shows how you can delete multiple files with the API rate limit in mind. The optimal way to delete multiple files is via a bulk API call, which can delete up to 100 files at a time.

Not optimized for rate limit

Fetch and delete files one by one using a loop.

for fn in source_file_names:
    f = api.files.query(project=src_project, names=[fn])[0]
    f.delete()

Optimized for rate limit

Fetch all files at once and then use a bulk API call to delete them in batches of 100 files or less.

def bulk_delete_files(files_to_delete, chunk_size=100):
    "Deletes files in bulk, 100 files per API call (max)"
     
    final_responses = []
     
    for i in range(0, len(files_to_delete), chunk_size):
         
        files = files_to_delete[i:i + chunk_size]
        responses = api.files.bulk_delete(files)
         
        for idx, r in enumerate(responses):
            if not r.valid:
                raise Exception(
                    '\n'.join([
                        str(r.error) + ": " + r.error.message,
                        r.error.more_info,
                        files[idx].name
                    ]))
         
        final_responses.extend(responses)
         
    return final_responses
 
files_to_delete = list(
    api.files.query(
        project=src_project,
        names=source_file_names,
        limit=100
    ).all())
 
responses = bulk_delete_files(files_to_delete)

Exporting files to a volume

Note: When exporting a file from the CGC to an attached volume, export is possible only to a volume that is in the same location (cloud provider and region) as the project from which the file is being exported.

The goal here is to export files from a CGC project to a volume (cloud bucket). Please note that export to a volume is available only via the API (including the API client libraries) and through the Seven Bridges CLI.

Again, CGC bulk API calls should be used to reduce the overall number of API calls. Note that the examples below make use of the copy_only export feature, which requires advance_access to be activated when initializing the API.
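As a quick illustration, and assuming the same sevenbridges-python client as above, Advance Access can be enabled when the client is created (the URL and token are placeholders):

import sevenbridges as sbg

# advance_access is required for the copy_only bulk export feature
api = sbg.Api(
    url="https://cgc-api.sbgenomics.com/v2",
    token="<your authentication token>",
    advance_access=True
)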

Not optimized for rate limit

In this example, files are fetched and exported in a loop, one by one.

for name in source_file_names:
     
    f = api.files.query(project=src_project, names=[name])[0]
     
    export = api.exports.submit_export(
        file=f,
        volume=volume,
        location="christian_demo_files/" + f.name,
        overwrite=True,
        copy_only=False
    )

Optimized for rate limit

Fetch and export files in bulk.

import time

def bulk_export_files(files, volume, location, overwrite=True, copy_only=False, chunk_size=100):
    "Exports a list of files to a volume in bulk"
 
    def is_running(response):
        if not response.valid:
            raise Exception(
                '\n'.join([
                    str(response.error),
                    response.error.message,
                    response.error.more_info
                ]))
        return response.resource.state not in ["COMPLETED", "FAILED", "ABORTED"]
     
    final_responses = []
 
    # export files in batches of 100 files each
    for i in range(0, len(files), chunk_size):
         
        # setup list of dictionary with export requests
        exports = [
            {
                'file': f,
                'volume': volume,
                'location': location + '/' + f.name, 
                'overwrite': overwrite
            }
            for f in files[i:i + chunk_size]
        ]
 
        # initiate bulk export of this batch and wait until finished
        responses = api.exports.bulk_submit(exports, copy_only=copy_only)
        while any([is_running(r) for r in responses]):
            time.sleep(10)
            responses = api.exports.bulk_get([r.resource for r in responses])
             
        final_responses.extend(responses)
         
    return final_responses
 
files_to_export = list(
    api.files.query(
        project=src_project,
        names=source_file_names,
        limit=100
    ).all())
 
responses = bulk_export_files(
    files=files_to_export,
    volume=volume,
    location='christian_demo_files',
    copy_only=False
)

Setting maximum pagination limit in queries

Several API calls allow setting a pagination limit to the number of results that are returned. Changing the default pagination limit (50) to its allowed maximum value (100) cuts the number of required API calls in half when iterating over the entire result set of a query.

A pagination limit can be set for various API calls, but we recommend that you set it for the following queries, as they tend to return the largest result sets (see the sketch after this list):

  • api.files.query()
  • api.projects.query()
  • api.tasks.query()
  • task.get_batch_children()
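For instance, a file query like the ones used throughout this page can request the maximum page size as sketched below; this is a minimal illustration and assumes src_project is an existing project object:

# fetch matching files 100 at a time instead of the default 50
files = list(
    api.files.query(project=src_project, limit=100).all()
)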

Not optimized for rate limit

Here is an example of a project query that uses the default pagination limit of 50.

for project in api.projects.query().all():
    print(project)

Optimized for rate limit

In the example below, the limit is set to its allowed maximum value of 100.

for project in api.projects.query(limit=100).all():
    print(project)

Finding a project by name

Not optimized for rate limit

Iterate over all projects and compare names.

project = [
    p for p in api.projects.query().all()
    if p.name == project_name
][0]

Optimized for rate limit

Use the name query parameter to restrict the search results. The query parameter performs a partial match, so comparing names is still required to ensure an exact match.

project = [
    p for p in api.projects.query(name=project_name).all()
    if p.name == project_name
][0]