Connectome-based predictive modeling (CPM) with Python using Ray for parallel processing
master @ 75f720e

Workflow Type: Python
Work-in-progress

Installation

Other than cloning this repository, you need to have bash installed (which is most likely the case if you use Linux, *BSD or even MacOS). For the Python code, the arguably easiest and cleanest way is to set up a Python virtual environment and install the dependencies there:

$ python3 -m venv ./hcp-suite-venv # Setup the virtual environment
$ source ./hcp-suite-venv/bin/activate # Activate the virtual environment
$ pip install pandas pingouin networkx nilearn nibabel ray # Install dependencies within the virtual environment
$ pip install ipython jupyterlab # Install interactive Python shells to run hcp-suite code in

Usage: CPM tutorial

The following tutorial uses the gambling task as an example, the variable to be predicted is BMI. Differing commands for resting-state fMRI are provided for the corresponding steps.

Overview

Python code in this repository is written to be used in an interactive Python shell. Either Jupyter Lab or Jupyer Notebook is recommended as plots and images are conveniently displayed in-line but any Python shell (e.g. iPython) should work.

Generally speaking, the procedure is as follows:

  1. Downloading data of subjects to be included in the analysis
  2. Parcellation of CIFTI files with task.sh prepare (at the code's current state, the HCP's folder structure is assumed) 3a. Extraction of task-based fMRI time series based on EV files provided by the HCP with get_ev_timeseries() from hcpsuite.py 3b.
  3. Computing of correlation matrices via compute_correlations() from hcpsuite.py
  4. Performing of CPM, the functions are provided by cpm_ray.py
  5. Establishing of statistical significance by way of permutation testing (also cpm_ray.py)

The following code snippets are to be run in an interactive Python shell (e.g. the "Console" of Jupyter Lab):

Downloading data

We will use Amazon Web Services to download HCP data. Set up access to HCP data via Amazon Web Services by following their documentation. You should be provided with an AWS Access Key ID and Secret Access Key we are going to put into Python variables (in quotation marks) for easy access:

aws_access_key_id="Replace with your access key ID"
aws_secret_access_key="Replace with your secret access key"

# Also, specify a location you want to download files to. We will use this variable repeatedly.
data_dir = "./HCP_1200"

We now need a list of subject IDs to be included in the analysis. Save them in a simple text file with one line per subject ID. For the rest of this tutorial, we will use the gambling task as an example (the list of IDs will be called gambling_subjects.ids). As preprocessing for resting-state data differs, we will provide resting-state specific instructions when needed (the list of IDs will be called rest_subjects.ids).

import sys

sys.path.append("../hcp-suite/lib") # This assumes the hcp-suite directory is in the current working directory's parent directory. You can also use absolute paths.
from hcpsuite import *
from cpm_ray import *

subj_ids = load_list_from_file("./gambling_subjects.ids") # Change the path to gambling_subjects.ids if it resides elsewhere

# Check the number of subjects
len(subj_ids)

Next, we will use the function download_hcp_files() to download all needed files from the HCP's AWS storage. This function takes the following arguments in this order: List of subject IDs, download location, task name, AWS access key ID, and AWS secret access key. It will return a list of missing subjects we capture in the variable missing_subjs:

missing_subjs = download_hcp_files(subj_ids, data_dir, "gambling", aws_access_key_id, aws_secret_access_key)

The previous command will print a list of missing subjects (if any) since not all data from all subjects is available on AWS, unfortunately. You can either obtain data from other sources or remove the missing subjects from our list of subject IDs:

for missing_subj in missing_subjs:
    subj_ids.remove(missing_subj)

Parcellation

Parcellation involves combining voxels of the "raw" CIFTI files into parcels as specified by the combined cortical, subcortical, and cerebellar parcellation we dubbed "RenTianGlasser" after the authors of the individual parcellations.

Task-based fMRI
parcellate_ciftis(subj_ids=subj_ids,
                  parcellation_fname="./hcp-suite/data/parcellations/RenTianGlasser.dlabel.nii",
                  task="gambling",
                  data_dir=data_dir) # If you have installed Connectome Workbench in a non-standard way,
                                     # so that 'wb_command' is not in your $PATH, add
                                     # 'wb_command="/replace/with/path/to/wb_command"'
Resting-state fMRI

As resting-state fMRI data was collected in two separate sessions called REST1 and REST2, we need to parcellate twice:

# First for REST1

parcellate_ciftis(subj_ids=subj_ids, 
                  parcellation_fname="./hcp-suite/data/parcellations/RenTianGlasser.dlabel.nii", 
                  task="REST1", 
                  data_dir=data_dir) # If you have installed Connectome Workbench in a non-standard way, 
                                     # so that 'wb_command' is not in your $PATH, add 
                                     # 'wb_command="/replace/with/path/to/wb_command"'

# Now for REST2
parcellate_ciftis(subj_ids=subj_ids, 
                  parcellation_fname="./hcp-suite/data/parcellations/RenTianGlasser.dlabel.nii", 
                  task="REST2", 
                  data_dir=data_dir) # If you have installed Connectome Workbench in a non-standard way, 
                                     # so that 'wb_command' is not in your $PATH, add 
                                     # 'wb_command="/replace/with/path/to/wb_command"'											 

Extraction of time series

Task-based fMRI
ev_data_dict = get_ev_timeseries(subj_ids, ["win.txt"], task="gambling",
                                 runs=('LR', 'RL'),
                                 parcellation="RenTianGlasser",
                                 data_dir=data_dir) # If you have installed Connectome Workbench in a non-standard way,
                                                    # so that 'wb_command' is not in your $PATH, add
                                                    # 'wb_command="/replace/with/path/to/wb_command"'

# Now, we save the extraced time series as text files in a directory of our choice
# (in this case: ./GAMBLING_win)
save_data_dict(ev_data_dict, path="./GAMBLING_win")
Resting-state fMRI

Extraction of time series for resting-state fMRI is less complicated via the get_rest_timeseries function:

ts_dict = get_rest_timeseries(subj_ids, data_dir)

# Save time series files in directory "REST"
save_data_dict(ts_dict, path="./REST")

Computation of correlation matrices

We continue in our Python shell to compute correlation matrices:

# We load the saved time series from the previous step
cd GAMBLING_win # save_data_dict() writes file names into file "ts_files" but without paths,
                # thus the easiest way is to change into the directory containing the files
time_series, time_series_files = load_time_series("./ts_files") # ... and read them from there

correlation_measure, correlation_matrices = compute_correlations(time_series, kind='tangent')
# Tangent in our experience provides the best results, but there are alternatives:
# https://nilearn.github.io/dev/modules/generated/nilearn.connectome.ConnectivityMeasure.html

# We then save the matrices into a single file in the NIfTI format for downstream processing
save_matrix(cifti_dim_to_nifti_dim(correlation_matrices), "./GAMBLING_win-tangent.nii.gz")

CPM

For actual CPM, we need to install and run Ray (run this e.g. in your Python virtual environment as described in Installation):

$ pip install ray
$ ray start --head --num-cpus=16 # Run this on your main node.
                                 # Processes take up a lot of RAM, be careful not to use too many CPUs

Optionally add more Ray nodes to form a cluster, see the Ray documentation for details.

Merging behavioral/biometric data in Python

For our analysis, we need values from both the unrestricted and restricted HCP data, which are available as separate CSV files. For easier handling, we merge them into a single CSV file:

import pandas as pd
unrestricted = pd.read_csv("/path/to/unrestricted.csv")
restricted = pd.read_csv("/path/to/restricted.csv")
merged = pd.merge(restricted, unrestricted, on="Subject")
merged.to_csv("./merged.csv") # Save the merged DataFrame as a CSV file in the current directory
Loading and preparing data in Python
behav = 'BMI' # Which variable do we want to predict?
covars = ['Age_in_Yrs', 'Gender' ] # What variables do we want to correct for?

behav_data = get_behav_data("./path/to/merged.csv", ids) # Loading of behavioral and biometrical data as a Pandas DataFrame from a CSV file
# We need to binarize gender for our analysis
behav_data["Gender"] = behav_data["Gender"].replace("F", 0)
behav_data["Gender"] = behav_data["Gender"].replace("M", 1)

functional_data = convert_matrices_to_dataframe(nifti_dim_to_cifti_dim(get_nimg_data("./path/to/GAMBLING_win-tangent.nii.gz")), subj_ids) # Loading of correlation matrices as Pandas DataFrame
Starting the Ray handler

ray_handler() is a Python class through which data management and the starting and coordination of Ray Actors (i.e. the processes working in parallel) is being handled.

n_folds = 128 # In this example, we use 128 folds, which is a good starting point
n_perm = 1000 # How many permutations are we planning to do later on?

ray_handler = RayHandler(
    functional_data.copy(),
    behav_data.copy(),
    behav,
    covars,
    address="auto", # We assume that the main Ray process runs on the same host
    n_perm=n_perm,
) # You can safely ignore the PerformanceWarning messages

ray_handler.add_kfold_indices(n_folds, clean=True) # By setting "clean" to True, we remove twins from the fold so they don't predict each other
ray_handler.upload_data() # Functional and behavioral data are uploaded into the shared storage to which Ray Actors have access
Starting the analysis

First we define the jobs for the actors; a job is a Python list object consisting of the following items: "job type", "fold number", "permutation number". The permutation number for the actual, i.e. unpermutated, data is "-1".

job_list = [["fselection_and_prediction", fold, perm] for perm in [-1] for fold in range(n_folds)] # This is the job list without permutations

# If we wanted to also compute the permutations (which takes a very long time), the job list can be created as follows:
#job_list = [["fselection_and_prediction", fold, perm] for perm in [-1, *range(n_perm)] for fold in range(n_folds)]

ray_handler.start_actors(job_list) # Start computing
Monitoring progress and retrieving results
n_done, n_remaining, n_held = ray_handler.status() # Prints a status report (see screenshot)

results = ray_handler.get_results(n=100) # Retrieving a huge number of results (e.g. when performing permutation analysis)
                                         # and especially from distributed Ray actors can take a long time. Specifying the
                                         # number of results (e.g. n=100) to be retrieved from the results store at once
                                         # allows for a display of progress

# Optional: Save results into a file (NumPy format)
np.save("./GAMBLING_win-results.npy", results)

# This file can be used to restore results
results = np.load("./GAMBLING_win-results.npy", allow_pickle=True).item()

You might consider fetching results and saving them periodically with a simple loop:

sleep_time = 1200 # Sleep for 20 minutes and then rerun loop
n_remaining = 1 # Set to something > 0 to get the loop started
results_path = get_safe_path("./GAMBLING_win-results", ".npy")
while n_remaining > 0: # Run until no jobs to be fetched are remaining
    n_done, n_remaining, n_held = ray_handler.status()
    if n_held > 0:
        results = ray_handler.get_results(n=100)
        # BEWARE: the file in results_path will be overwritten without asking
        # but we have used get_safe_path for risk mitigation
        print("\nSaving results to {}...".format(results_path), end='', flush=True)
        np.save(results_path, results)
        print(" done.")
    else:
        print("\nNo results to fetch and save.")
    print("Sleeping for {} seconds...".format(sleep_time))
    sleep(sleep_time) # Will sleep for sleep_time seconds

Check for completion

Rarely single jobs or actors die before completion. Having run your analyses, you can check your results for completion as follows and rerun analyses as needed (the function check_for_completion will advise you on how do this):

incomplete_jobs = check_for_completion(results)

Presenting results

To plot observed values against values as predicted by the GLM, use plot_predictions():

perm = -1 # Permutation -1 selects unpermutated results
g = plot_predictions(results['prediction_results'][perm], tail="glm", color="green")

To plot permutation results, use plot_permutation_results(), which will take either a list of paths to saved results files (f.i. when you want to combine multiple permutation runs) or the prediction results dictionary, which we will use in the following example:

# plot_permutation_results() will automatically remove incomplete permutations
# and return/print basic descriptive statistics (minimum and maximum r value, total
# number of permutations, and the minimum p value to be achieved with these permutations

plot, min_value, max_value, count, min_p = plot_permutation_results(results['prediction_results'])

For more presentation and plotting examples including static and interactive plots of predictive networks, see save.py in folder utils.

Overlap of predictive networks

cpm_ray.py has a function get_overlap() to create overlap networks of different predictive networks. For example, if you have two different CPM results as generated by results = ray_handler.get_results() and saved in two files results_A.npy and results_B.npy, you can do the following:

# Load results into Python
results_A = np.load("/path/to/results_A.npy", allow_pickle=True).item()
results_B = np.load("/path/to/results_B.npy", allow_pickle=True).item()

# Load coordinates of parcels for plotting
coords = np.loadtxt("/path/to/hcp-suite/data/parcellations/RenTianGlasser.coords")

# Use overlap() function, which takes as its main input a list of results, in this case
# [results_A, results_B]. You can specify a result as the odd one out (e.g. odd_one_out=0
# for results_A), which means that result_A's tails will be switched, so that the positive
# tail result A will be overlapped with the negative tail of result B and vice versa.
#
# overlap() returns a bunch of dictionaries with the tail (positive and negative predictive
# networks)  as a primary key and usually several subkeys. These are more or less
# self-explanatory.

overlap, degrees_sorted, top_n_edges, plot_dict, m_cons = get_overlap([results_A, results_B], odd_one_out=None, coords=coords, plot_top_nodes=True, top_n_edges=50)

Version History

master @ 75f720e (earliest) Created 2nd Jan 2025 at 17:54 by Tobias Bachmann

Fixed p-val_fdr column


Frozen master 75f720e
help Creators and Submitter
Creator
Submitter
Citation
Bachmann, T. (2025). Connectome-based predictive modeling (CPM) with Python using Ray for parallel processing. WorkflowHub. https://doi.org/10.48546/WORKFLOWHUB.WORKFLOW.1234.1
Activity

Views: 133   Downloads: 37

Created: 2nd Jan 2025 at 17:54

help Attributions

None

Total size: 12.1 MB
Powered by
(v.1.16.0-main)
Copyright © 2008 - 2024 The University of Manchester and HITS gGmbH