Export a Dataset
In this tutorial, you'll learn how to read and export records.
In [2]:
Copied!
%load_ext autoreload
%autoreload 2
from typing import cast
import os
import dotenv
from tqdm.auto import tqdm
import datamol as dm
import pandas as pd
import zarr
from openff.toolkit import Molecule
import qcelemental as qcel
from qcportal import PortalClient
_ = dotenv.load_dotenv("../../openfractal_test_secrets.env")
%load_ext autoreload
%autoreload 2
from typing import cast
import os
import dotenv
from tqdm.auto import tqdm
import datamol as dm
import pandas as pd
import zarr
from openff.toolkit import Molecule
import qcelemental as qcel
from qcportal import PortalClient
_ = dotenv.load_dotenv("../../openfractal_test_secrets.env")
Initialize the client and list the datasets¶
In [4]:
Copied!
# Connect to the openfractal test server.
# Credentials are read from the environment (loaded earlier via dotenv) —
# never hardcode usernames or passwords in a notebook.
client = PortalClient(
    address="https://openfractal-test-pgzbs3yryq-uc.a.run.app",
    username=os.environ["OPENFRACTAL_USER_3_USERNAME"],
    password=os.environ["OPENFRACTAL_USER_3_PASSWORD"],
)
# Bare last expression: rich display of the connected client.
client
# NOTE(review): the duplicated cell below appears to be a docs-export artifact.
client = PortalClient(
    address="https://openfractal-test-pgzbs3yryq-uc.a.run.app",
    username=os.environ["OPENFRACTAL_USER_3_USERNAME"],
    password=os.environ["OPENFRACTAL_USER_3_PASSWORD"],
)
client
Out[4]:
PortalClient
- Server: openfractal-test
- Address: https://openfractal-test-pgzbs3yryq-uc.a.run.app/
- Username: read_default
Let's list the available datasets.
In [5]:
Copied!
# List the datasets visible to the logged-in user on this server.
client.list_datasets()
client.list_datasets()
Out[5]:
[{'id': 4, 'dataset_type': 'singlepoint', 'dataset_name': 'dataset_demo_5077749542'}]
Load a dataset given its name and fetch its records¶
In [6]:
Copied!
# Load a singlepoint dataset by name.
# NOTE(review): the rendered output (Out[6]) shows name='dataset_demo_5077749542',
# not 'dataset_demo_4321690179' — this cell's name looks stale; confirm which
# dataset is intended.
dataset_name = "dataset_demo_4321690179"
ds = client.get_dataset("singlepoint", dataset_name)
ds
dataset_name = "dataset_demo_4321690179"
ds = client.get_dataset("singlepoint", dataset_name)
ds
Out[6]:
SinglepointDataset(id=4, dataset_type='singlepoint', name='dataset_demo_5077749542', description='my great dataset!', tagline='', tags=['demo_local'], group='default', visibility=True, provenance={}, default_tag='demo_local', default_priority=<PriorityEnum.normal: 1>, owner_user='admin_default', owner_group=None, metadata={}, extras={}, entry_names_=[], specifications_={}, entries_={}, record_map_={}, contributed_values_=None, auto_fetch_missing=True)
Re-run the cell below regularly to refresh the status of the computations.
In [7]:
Copied!
# Show the number of records per (specification, status) pair.
# Re-run this cell to refresh the counts while computations are running.
print(ds.status_table())
print(ds.status_table())
specification complete error -------------------------- ---------- ------- simple_qm_calculation_demo 8 2
In [8]:
Copied!
# Flags controlling what is fetched for every record.
progress = True  # show tqdm progress bars
status = None  # None fetches records in any status (complete, error, ...)
fetch_error = True  # eagerly pull error info for failed records
fetch_wfn = True  # eagerly pull the stored wavefunction
records_list = []
for spec_name in tqdm(ds.specification_names, disable=not progress):
    record_iterator = ds.iterate_records(
        specification_names=spec_name,
        force_refetch=True,
        fetch_updated=True,
        status=status,
    )
    for _, _, record in tqdm(record_iterator, disable=not progress, leave=False):
        if fetch_error:
            # Accessing the lazy property triggers a fetch from the server,
            # so the data ends up in record.dict() below.
            record.error
        if fetch_wfn:
            record.wavefunction  # type: ignore
        record_dict = record.dict()
        record_dict["specification_name"] = spec_name
        records_list.append(record_dict)
# Build the DataFrame once from the list of dicts (avoids quadratic concat).
records = pd.DataFrame(records_list)
records = records.sort_values("id")
records = records.reset_index(drop=True)
records
# Flags controlling what is fetched for every record.
progress = True  # show tqdm progress bars
status = None  # None fetches records in any status (complete, error, ...)
fetch_error = True  # eagerly pull error info for failed records
fetch_wfn = True  # eagerly pull the stored wavefunction
records_list = []
for spec_name in tqdm(ds.specification_names, disable=not progress):
    record_iterator = ds.iterate_records(
        specification_names=spec_name,
        force_refetch=True,
        fetch_updated=True,
        status=status,
    )
    for _, _, record in tqdm(record_iterator, disable=not progress, leave=False):
        if fetch_error:
            # Accessing the lazy property triggers a fetch from the server,
            # so the data ends up in record.dict() below.
            record.error
        if fetch_wfn:
            record.wavefunction  # type: ignore
        record_dict = record.dict()
        record_dict["specification_name"] = spec_name
        records_list.append(record_dict)
# Build the DataFrame once from the list of dicts (avoids quadratic concat).
records = pd.DataFrame(records_list)
records = records.sort_values("id")
records = records.reset_index(drop=True)
records
0it [00:00, ?it/s]
Out[8]:
id | record_type | is_service | properties | extras | status | manager_name | created_on | modified_on | owner_user | owner_group | compute_history_ | task_ | service_ | comments_ | native_files_ | specification | molecule_id | molecule_ | wavefunction_ | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | 11 | singlepoint | False | {'pe energy': 0.0, 'scf dipole': [0.0335809823... | {} | RecordStatusEnum.complete | manager_demo_local-boromir-86f6c0ce-e825-41f3-... | 2023-06-12 20:22:35.161042 | 2023-06-12 20:23:29.625597 | admin_default | None | [{'id': 1, 'record_id': 11, 'status': 'RecordS... | None | None | None | None | {'program': 'psi4', 'driver': 'SinglepointDriv... | 28 | None | {'compression_type': 'CompressionEnum.zstd', '... |
1 | 12 | singlepoint | False | {'pe energy': 0.0, 'scf dipole': [-0.191476353... | {} | RecordStatusEnum.complete | manager_demo_local-boromir-86f6c0ce-e825-41f3-... | 2023-06-12 20:22:35.161047 | 2023-06-12 20:23:29.788551 | admin_default | None | [{'id': 2, 'record_id': 12, 'status': 'RecordS... | None | None | None | None | {'program': 'psi4', 'driver': 'SinglepointDriv... | 21 | None | {'compression_type': 'CompressionEnum.zstd', '... |
2 | 13 | singlepoint | False | {'pe energy': 0.0, 'scf dipole': [0.0032987900... | {} | RecordStatusEnum.complete | manager_demo_local-boromir-86f6c0ce-e825-41f3-... | 2023-06-12 20:22:35.161048 | 2023-06-12 20:23:29.856461 | admin_default | None | [{'id': 3, 'record_id': 13, 'status': 'RecordS... | None | None | None | None | {'program': 'psi4', 'driver': 'SinglepointDriv... | 24 | None | {'compression_type': 'CompressionEnum.zstd', '... |
3 | 14 | singlepoint | False | {'pe energy': 0.0, 'scf dipole': [-0.356400390... | {} | RecordStatusEnum.complete | manager_demo_local-boromir-86f6c0ce-e825-41f3-... | 2023-06-12 20:22:35.161049 | 2023-06-12 20:24:00.283171 | admin_default | None | [{'id': 4, 'record_id': 14, 'status': 'RecordS... | None | None | None | None | {'program': 'psi4', 'driver': 'SinglepointDriv... | 20 | None | {'compression_type': 'CompressionEnum.zstd', '... |
4 | 15 | singlepoint | False | {'pe energy': 0.0, 'scf dipole': [0.0085374704... | {} | RecordStatusEnum.complete | manager_demo_local-boromir-86f6c0ce-e825-41f3-... | 2023-06-12 20:22:35.161050 | 2023-06-12 20:24:00.373625 | admin_default | None | [{'id': 5, 'record_id': 15, 'status': 'RecordS... | None | None | None | None | {'program': 'psi4', 'driver': 'SinglepointDriv... | 27 | None | {'compression_type': 'CompressionEnum.zstd', '... |
5 | 16 | singlepoint | False | {'pe energy': 0.0, 'scf dipole': [0.8039027310... | {} | RecordStatusEnum.complete | manager_demo_local-boromir-86f6c0ce-e825-41f3-... | 2023-06-12 20:22:35.161051 | 2023-06-12 20:24:30.943935 | admin_default | None | [{'id': 6, 'record_id': 16, 'status': 'RecordS... | None | None | None | None | {'program': 'psi4', 'driver': 'SinglepointDriv... | 23 | None | {'compression_type': 'CompressionEnum.zstd', '... |
6 | 17 | singlepoint | False | None | None | RecordStatusEnum.error | manager_demo_local-boromir-86f6c0ce-e825-41f3-... | 2023-06-12 20:22:35.161052 | 2023-06-12 20:24:31.153263 | admin_default | None | [{'id': 7, 'record_id': 17, 'status': 'RecordS... | None | None | None | None | {'program': 'psi4', 'driver': 'SinglepointDriv... | 26 | None | None |
7 | 18 | singlepoint | False | {'pe energy': 0.0, 'scf dipole': [-0.453720071... | {} | RecordStatusEnum.complete | manager_demo_local-boromir-86f6c0ce-e825-41f3-... | 2023-06-12 20:22:35.161053 | 2023-06-12 20:25:01.445537 | admin_default | None | [{'id': 8, 'record_id': 18, 'status': 'RecordS... | None | None | None | None | {'program': 'psi4', 'driver': 'SinglepointDriv... | 29 | None | {'compression_type': 'CompressionEnum.zstd', '... |
8 | 19 | singlepoint | False | {'pe energy': 0.0, 'scf dipole': [0.0792046587... | {} | RecordStatusEnum.complete | manager_demo_local-boromir-86f6c0ce-e825-41f3-... | 2023-06-12 20:22:35.161054 | 2023-06-12 20:25:01.517410 | admin_default | None | [{'id': 9, 'record_id': 19, 'status': 'RecordS... | None | None | None | None | {'program': 'psi4', 'driver': 'SinglepointDriv... | 25 | None | {'compression_type': 'CompressionEnum.zstd', '... |
9 | 20 | singlepoint | False | None | None | RecordStatusEnum.error | manager_demo_local-boromir-86f6c0ce-e825-41f3-... | 2023-06-12 20:22:35.161054 | 2023-06-12 20:25:01.602811 | admin_default | None | [{'id': 10, 'record_id': 20, 'status': 'Record... | None | None | None | None | {'program': 'psi4', 'driver': 'SinglepointDriv... | 22 | None | None |
End-to-end export to Zarr¶
The function below is an opinionated, end-to-end export pipeline. It is probably not optimal, since many of the outputs are not stored as arrays.
Use it as a guide to export and store only the information relevant to your use case.
In [9]:
Copied!
def export_dataset_to_zarr(
    client: PortalClient,
    dataset_name: str,
    dataset_type: str,
    output_file: str,
    chunksize: int = 1_000,
    progress: bool = True,
    progress_leave: bool = False,
):
    """Export every completed record of a dataset to a Zarr store.

    Store layout: one group per unique 3D system at
    ``/molecules/<molecule_hash>`` (holding the SMILES, the hash and the
    conformations), with one ``specifications/<specification_name>`` subgroup
    per record whose attributes hold the record metadata, the QM properties
    and the wavefunction.

    Args:
        client: An authenticated ``PortalClient``.
        dataset_name: Name of the dataset on the server.
        dataset_type: Dataset type, e.g. ``"singlepoint"``.
        output_file: Path of the Zarr store to create (mode ``"w"`` —
            an existing store at that path is overwritten).
        chunksize: Number of entry names fetched per server round-trip.
        progress: Whether to display progress bars.
        progress_leave: Whether to leave the outer progress bar once done.

    Returns:
        The root ``zarr.Group`` of the written store.

    Raises:
        ValueError: If the same (molecule, specification) pair occurs twice.
    """
    # This could easily be parallelized if we want to.
    # NOTE: parallelization will not work when writing to a ZIP store.

    # Get the dataset
    ds = client.get_dataset(dataset_type=dataset_type, dataset_name=dataset_name)

    root = zarr.open(output_file, mode="w")
    root = cast(zarr.Group, root)

    molecules_group = root.create_group("/molecules")

    for i in tqdm(
        range(0, len(ds.entry_names), chunksize),
        disable=not progress,
        leave=progress_leave,
    ):
        # Pull completed records for those entry names
        chunk_entry_names = ds.entry_names[i : i + chunksize]
        records = ds.iterate_records(
            entry_names=chunk_entry_names, status="complete", force_refetch=True
        )

        for entry_name, specification_name, record in records:
            # Accessing these lazy properties triggers a fetch from the
            # server so that `record.dict()` below contains the data.
            record.molecule
            record.wavefunction
            record.compute_history_  # consider not fetching it

            # Get dict
            record_dict = record.dict()

            # Get infos about the molecule (3D system)
            mol = record_dict["molecule_"]
            smiles = mol["extras"]["canonical_isomeric_explicit_hydrogen_mapped_smiles"]
            molecule_hash = mol["identifiers"]["molecule_hash"]
            conformations = mol["geometry"]

            # Get (or create) the group for that molecule
            if molecule_hash in molecules_group:
                molecule_group = molecules_group[molecule_hash]
            else:
                molecule_group = molecules_group.create_group(molecule_hash)

            # Save infos about the molecule
            molecule_group.attrs["smiles"] = smiles
            molecule_group.attrs["molecule_hash"] = molecule_hash
            molecule_group["conformations"] = conformations

            # Get the group holding all the specifications for that molecule
            if "specifications" in molecule_group:
                specifications_group = molecule_group["specifications"]
            else:
                specifications_group = molecule_group.create_group("specifications")

            # A given (molecule, specification) pair must be unique.
            if specification_name in specifications_group:
                raise ValueError(
                    f"The specification '{specification_name}' already exists."
                )
            specification_group = specifications_group.create_group(specification_name)

            specification_group.attrs["entry_name"] = entry_name
            specification_group.attrs["specification_name"] = specification_name

            # Save the infos of the record as attributes
            specification_group.attrs["id"] = record_dict["id"]
            specification_group.attrs["record_type"] = record_dict["record_type"]
            specification_group.attrs["is_service"] = record_dict["is_service"]
            specification_group.attrs["extras"] = record_dict["extras"]
            specification_group.attrs["status"] = record_dict["status"]
            specification_group.attrs["manager_name"] = record_dict["manager_name"]
            specification_group.attrs["created_on"] = record_dict[
                "created_on"
            ].isoformat()
            specification_group.attrs["modified_on"] = record_dict[
                "modified_on"
            ].isoformat()
            specification_group.attrs["owner_user"] = record_dict["owner_user"]
            specification_group.attrs["owner_group"] = record_dict["owner_group"]
            specification_group.attrs["compute_history_"] = record_dict[
                "compute_history_"
            ]
            specification_group.attrs["task_"] = record_dict["task_"]
            specification_group.attrs["service_"] = record_dict["service_"]
            specification_group.attrs["comments_"] = record_dict["comments_"]
            specification_group.attrs["native_files_"] = record_dict["native_files_"]
            specification_group.attrs["molecule_id"] = record_dict["molecule_id"]
            specification_group.attrs["specification"] = record_dict["specification"]

            # For now we save the QM properties and the wavefunction as
            # attributes as well. This is obviously NOT IDEAL.
            # BUG FIX: the original stored record_dict["specification"] under
            # the "wavefunction_" key; store the actual wavefunction instead.
            specification_group.attrs["wavefunction_"] = record_dict["wavefunction_"]
            specification_group.attrs["properties"] = record_dict["properties"]

    # Cleanup (only needed for a ZIP store; a no-op for directory stores)
    root.store.close()

    return root
def export_dataset_to_zarr(
    client: PortalClient,
    dataset_name: str,
    dataset_type: str,
    output_file: str,
    chunksize: int = 1_000,
    progress: bool = True,
    progress_leave: bool = False,
):
    """Export every completed record of a dataset to a Zarr store.

    Store layout: one group per unique 3D system at
    ``/molecules/<molecule_hash>`` (holding the SMILES, the hash and the
    conformations), with one ``specifications/<specification_name>`` subgroup
    per record whose attributes hold the record metadata, the QM properties
    and the wavefunction.

    Args:
        client: An authenticated ``PortalClient``.
        dataset_name: Name of the dataset on the server.
        dataset_type: Dataset type, e.g. ``"singlepoint"``.
        output_file: Path of the Zarr store to create (mode ``"w"`` —
            an existing store at that path is overwritten).
        chunksize: Number of entry names fetched per server round-trip.
        progress: Whether to display progress bars.
        progress_leave: Whether to leave the outer progress bar once done.

    Returns:
        The root ``zarr.Group`` of the written store.

    Raises:
        ValueError: If the same (molecule, specification) pair occurs twice.
    """
    # This could easily be parallelized if we want to.
    # NOTE: parallelization will not work when writing to a ZIP store.

    # Get the dataset
    ds = client.get_dataset(dataset_type=dataset_type, dataset_name=dataset_name)

    root = zarr.open(output_file, mode="w")
    root = cast(zarr.Group, root)

    molecules_group = root.create_group("/molecules")

    for i in tqdm(
        range(0, len(ds.entry_names), chunksize),
        disable=not progress,
        leave=progress_leave,
    ):
        # Pull completed records for those entry names
        chunk_entry_names = ds.entry_names[i : i + chunksize]
        records = ds.iterate_records(
            entry_names=chunk_entry_names, status="complete", force_refetch=True
        )

        for entry_name, specification_name, record in records:
            # Accessing these lazy properties triggers a fetch from the
            # server so that `record.dict()` below contains the data.
            record.molecule
            record.wavefunction
            record.compute_history_  # consider not fetching it

            # Get dict
            record_dict = record.dict()

            # Get infos about the molecule (3D system)
            mol = record_dict["molecule_"]
            smiles = mol["extras"]["canonical_isomeric_explicit_hydrogen_mapped_smiles"]
            molecule_hash = mol["identifiers"]["molecule_hash"]
            conformations = mol["geometry"]

            # Get (or create) the group for that molecule
            if molecule_hash in molecules_group:
                molecule_group = molecules_group[molecule_hash]
            else:
                molecule_group = molecules_group.create_group(molecule_hash)

            # Save infos about the molecule
            molecule_group.attrs["smiles"] = smiles
            molecule_group.attrs["molecule_hash"] = molecule_hash
            molecule_group["conformations"] = conformations

            # Get the group holding all the specifications for that molecule
            if "specifications" in molecule_group:
                specifications_group = molecule_group["specifications"]
            else:
                specifications_group = molecule_group.create_group("specifications")

            # A given (molecule, specification) pair must be unique.
            if specification_name in specifications_group:
                raise ValueError(
                    f"The specification '{specification_name}' already exists."
                )
            specification_group = specifications_group.create_group(specification_name)

            specification_group.attrs["entry_name"] = entry_name
            specification_group.attrs["specification_name"] = specification_name

            # Save the infos of the record as attributes
            specification_group.attrs["id"] = record_dict["id"]
            specification_group.attrs["record_type"] = record_dict["record_type"]
            specification_group.attrs["is_service"] = record_dict["is_service"]
            specification_group.attrs["extras"] = record_dict["extras"]
            specification_group.attrs["status"] = record_dict["status"]
            specification_group.attrs["manager_name"] = record_dict["manager_name"]
            specification_group.attrs["created_on"] = record_dict[
                "created_on"
            ].isoformat()
            specification_group.attrs["modified_on"] = record_dict[
                "modified_on"
            ].isoformat()
            specification_group.attrs["owner_user"] = record_dict["owner_user"]
            specification_group.attrs["owner_group"] = record_dict["owner_group"]
            specification_group.attrs["compute_history_"] = record_dict[
                "compute_history_"
            ]
            specification_group.attrs["task_"] = record_dict["task_"]
            specification_group.attrs["service_"] = record_dict["service_"]
            specification_group.attrs["comments_"] = record_dict["comments_"]
            specification_group.attrs["native_files_"] = record_dict["native_files_"]
            specification_group.attrs["molecule_id"] = record_dict["molecule_id"]
            specification_group.attrs["specification"] = record_dict["specification"]

            # For now we save the QM properties and the wavefunction as
            # attributes as well. This is obviously NOT IDEAL.
            # BUG FIX: the original stored record_dict["specification"] under
            # the "wavefunction_" key; store the actual wavefunction instead.
            specification_group.attrs["wavefunction_"] = record_dict["wavefunction_"]
            specification_group.attrs["properties"] = record_dict["properties"]

    # Cleanup (only needed for a ZIP store; a no-op for directory stores)
    root.store.close()

    return root
In [10]:
Copied!
# Export the demo dataset to a local Zarr store.
# NOTE(review): hardcoded absolute path — prefer a configurable output
# directory (e.g. a pathlib.Path constant in the config cell) so the
# notebook runs on other machines.
dataset_name = "dataset_demo_5077749542"
root = export_dataset_to_zarr(
    client=client,
    dataset_name=dataset_name,
    dataset_type="singlepoint",
    output_file="/home/hadim/test_openfractal.zarr",
    chunksize=1_000,
)
dataset_name = "dataset_demo_5077749542"
root = export_dataset_to_zarr(
    client=client,
    dataset_name=dataset_name,
    dataset_type="singlepoint",
    output_file="/home/hadim/test_openfractal.zarr",
    chunksize=1_000,
)
0%| | 0/1 [00:00<?, ?it/s]
In [86]:
Copied!