"""GraphBolt OnDiskDataset."""
import os
from copy import deepcopy
from typing import Dict, List, Union
import pandas as pd
import torch
import yaml
import dgl
from ...base import dgl_warning
from ...data.utils import download, extract_archive
from ..base import etype_str_to_tuple
from ..dataset import Dataset, Task
from ..internal import copy_or_convert_data, get_attributes, read_data
from ..itemset import ItemSet, ItemSetDict
from ..sampling_graph import SamplingGraph
from .fused_csc_sampling_graph import from_dglgraph, FusedCSCSamplingGraph
from .ondisk_metadata import (
OnDiskGraphTopology,
OnDiskMetaData,
OnDiskTaskData,
OnDiskTVTSet,
)
from .torch_based_feature_store import TorchBasedFeatureStore
__all__ = ["OnDiskDataset", "preprocess_ondisk_dataset", "BuiltinDataset"]
def preprocess_ondisk_dataset(
dataset_dir: str, include_original_edge_id: bool = False
) -> str:
"""Preprocess the on-disk dataset. Parse the input config file,
load the data, and save the data in the format that GraphBolt supports.
Parameters
----------
dataset_dir : str
The path to the dataset directory.
include_original_edge_id : bool, optional
Whether to include the original edge id in the FusedCSCSamplingGraph.
Returns
-------
output_config_path : str
The path to the output config file.
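
    Examples
    --------
    A minimal usage sketch; ``./example_dataset`` is a hypothetical directory
    that already contains a ``metadata.yaml``:

    >>> output_config_path = preprocess_ondisk_dataset("./example_dataset")
    >>> # The generated config is placed under the ``preprocessed``
    >>> # subdirectory, e.g. ``./example_dataset/preprocessed/metadata.yaml``.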
"""
# Check if the dataset path is valid.
if not os.path.exists(dataset_dir):
raise RuntimeError(f"Invalid dataset path: {dataset_dir}")
# Check if the dataset_dir is a directory.
if not os.path.isdir(dataset_dir):
raise RuntimeError(
f"The dataset must be a directory. But got {dataset_dir}"
)
# 0. Check if the dataset is already preprocessed.
preprocess_metadata_path = os.path.join("preprocessed", "metadata.yaml")
if os.path.exists(os.path.join(dataset_dir, preprocess_metadata_path)):
print("The dataset is already preprocessed.")
return os.path.join(dataset_dir, preprocess_metadata_path)
print("Start to preprocess the on-disk dataset.")
processed_dir_prefix = "preprocessed"
# Check if the metadata.yaml exists.
metadata_file_path = os.path.join(dataset_dir, "metadata.yaml")
if not os.path.exists(metadata_file_path):
raise RuntimeError("metadata.yaml does not exist.")
# Read the input config.
with open(metadata_file_path, "r") as f:
input_config = yaml.safe_load(f)
# 1. Make `processed_dir_abs` directory if it does not exist.
os.makedirs(os.path.join(dataset_dir, processed_dir_prefix), exist_ok=True)
output_config = deepcopy(input_config)
# 2. Load the edge data and create a DGLGraph.
if "graph" not in input_config:
raise RuntimeError("Invalid config: does not contain graph field.")
is_homogeneous = "type" not in input_config["graph"]["nodes"][0]
if is_homogeneous:
# Homogeneous graph.
num_nodes = input_config["graph"]["nodes"][0]["num"]
edge_data = pd.read_csv(
os.path.join(
dataset_dir, input_config["graph"]["edges"][0]["path"]
),
names=["src", "dst"],
)
src, dst = edge_data["src"].to_numpy(), edge_data["dst"].to_numpy()
g = dgl.graph((src, dst), num_nodes=num_nodes)
else:
# Heterogeneous graph.
# Construct the num nodes dict.
num_nodes_dict = {}
for node_info in input_config["graph"]["nodes"]:
num_nodes_dict[node_info["type"]] = node_info["num"]
# Construct the data dict.
data_dict = {}
for edge_info in input_config["graph"]["edges"]:
edge_data = pd.read_csv(
os.path.join(dataset_dir, edge_info["path"]),
names=["src", "dst"],
)
src = torch.tensor(edge_data["src"])
dst = torch.tensor(edge_data["dst"])
data_dict[etype_str_to_tuple(edge_info["type"])] = (src, dst)
# Construct the heterograph.
g = dgl.heterograph(data_dict, num_nodes_dict)
    # 3. Load the sampling-related node/edge features and add them to
    # the sampling graph.
if input_config["graph"].get("feature_data", None):
for graph_feature in input_config["graph"]["feature_data"]:
            in_memory = graph_feature.get("in_memory", True)
            if graph_feature["domain"] == "node":
                node_data = read_data(
                    os.path.join(dataset_dir, graph_feature["path"]),
                    graph_feature["format"],
                    in_memory=in_memory,
                )
                g.ndata[graph_feature["name"]] = node_data
            elif graph_feature["domain"] == "edge":
                edge_data = read_data(
                    os.path.join(dataset_dir, graph_feature["path"]),
                    graph_feature["format"],
                    in_memory=in_memory,
                )
                g.edata[graph_feature["name"]] = edge_data
# 4. Convert the DGLGraph to a FusedCSCSamplingGraph.
fused_csc_sampling_graph = from_dglgraph(
g, is_homogeneous, include_original_edge_id
)
# 5. Save the FusedCSCSamplingGraph and modify the output_config.
output_config["graph_topology"] = {}
output_config["graph_topology"]["type"] = "FusedCSCSamplingGraph"
output_config["graph_topology"]["path"] = os.path.join(
processed_dir_prefix, "fused_csc_sampling_graph.pt"
)
torch.save(
fused_csc_sampling_graph,
os.path.join(
dataset_dir,
output_config["graph_topology"]["path"],
),
)
del output_config["graph"]
# 6. Load the node/edge features and do necessary conversion.
if input_config.get("feature_data", None):
for feature, out_feature in zip(
input_config["feature_data"], output_config["feature_data"]
):
# Always save the feature in numpy format.
out_feature["format"] = "numpy"
out_feature["path"] = os.path.join(
processed_dir_prefix, feature["path"].replace("pt", "npy")
)
in_memory = (
True if "in_memory" not in feature else feature["in_memory"]
)
copy_or_convert_data(
os.path.join(dataset_dir, feature["path"]),
os.path.join(dataset_dir, out_feature["path"]),
feature["format"],
output_format=out_feature["format"],
in_memory=in_memory,
is_feature=True,
)
# 7. Save tasks and train/val/test split according to the output_config.
if input_config.get("tasks", None):
for input_task, output_task in zip(
input_config["tasks"], output_config["tasks"]
):
for set_name in ["train_set", "validation_set", "test_set"]:
if set_name not in input_task:
continue
for input_set_per_type, output_set_per_type in zip(
input_task[set_name], output_task[set_name]
):
for input_data, output_data in zip(
input_set_per_type["data"], output_set_per_type["data"]
):
                        # Always save the data in numpy format.
                        output_data["format"] = "numpy"
                        output_data["path"] = os.path.join(
                            processed_dir_prefix,
                            os.path.splitext(input_data["path"])[0] + ".npy",
                        )
copy_or_convert_data(
os.path.join(dataset_dir, input_data["path"]),
os.path.join(dataset_dir, output_data["path"]),
input_data["format"],
output_data["format"],
)
# 8. Save the output_config.
output_config_path = os.path.join(dataset_dir, preprocess_metadata_path)
with open(output_config_path, "w") as f:
yaml.dump(output_config, f)
print("Finish preprocessing the on-disk dataset.")
    # 9. Return the path to the output config file.
return output_config_path
class OnDiskTask:
"""An on-disk task.
    An on-disk task is created by ``OnDiskDataset``. It contains the task
    metadata and the train/validation/test sets.
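
    Examples
    --------
    Instances are created by ``OnDiskDataset`` rather than constructed by
    hand; a sketch of accessing one (``dataset`` is assumed to be a loaded
    ``OnDiskDataset``):

    >>> task = dataset.tasks[0]
    >>> train_set = task.train_set
    >>> metadata = task.metadata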
"""
def __init__(
self,
metadata: Dict,
train_set: Union[ItemSet, ItemSetDict],
validation_set: Union[ItemSet, ItemSetDict],
test_set: Union[ItemSet, ItemSetDict],
):
"""Initialize a task.
Parameters
----------
metadata : Dict
Metadata.
train_set : Union[ItemSet, ItemSetDict]
Training set.
validation_set : Union[ItemSet, ItemSetDict]
Validation set.
test_set : Union[ItemSet, ItemSetDict]
Test set.
"""
self._metadata = metadata
self._train_set = train_set
self._validation_set = validation_set
self._test_set = test_set
@property
def metadata(self) -> Dict:
"""Return the task metadata."""
return self._metadata
@property
def train_set(self) -> Union[ItemSet, ItemSetDict]:
"""Return the training set."""
return self._train_set
@property
def validation_set(self) -> Union[ItemSet, ItemSetDict]:
"""Return the validation set."""
return self._validation_set
@property
def test_set(self) -> Union[ItemSet, ItemSetDict]:
"""Return the test set."""
return self._test_set
def __repr__(self) -> str:
return _ondisk_task_str(self)
class OnDiskDataset(Dataset):
"""An on-disk dataset which reads graph topology, feature data and
Train/Validation/Test set from disk.
Due to limited resources, the data which are too large to fit into RAM will
remain on disk while others reside in RAM once ``OnDiskDataset`` is
initialized. This behavior could be controled by user via ``in_memory``
field in YAML file. All paths in YAML file are relative paths to the
dataset directory.

    A full example of the YAML file is as follows:

    .. code-block:: yaml

dataset_name: graphbolt_test
graph:
nodes:
- type: paper # could be omitted for homogeneous graph.
num: 1000
- type: author
num: 1000
edges:
- type: author:writes:paper # could be omitted for homogeneous graph.
format: csv # Can be csv only.
path: edge_data/author-writes-paper.csv
- type: paper:cites:paper
format: csv
path: edge_data/paper-cites-paper.csv
feature_data:
- domain: node
type: paper # could be omitted for homogeneous graph.
name: feat
format: numpy
in_memory: false # If not specified, default to true.
path: node_data/paper-feat.npy
- domain: edge
type: "author:writes:paper"
name: feat
format: numpy
in_memory: false
path: edge_data/author-writes-paper-feat.npy
tasks:
- name: "edge_classification"
num_classes: 10
train_set:
- type: paper # could be omitted for homogeneous graph.
data: # multiple data sources could be specified.
- name: node_pairs
format: numpy # Can be numpy or torch.
in_memory: true # If not specified, default to true.
path: set/paper-train-node_pairs.npy
- name: labels
format: numpy
path: set/paper-train-labels.npy
validation_set:
- type: paper
data:
- name: node_pairs
format: numpy
path: set/paper-validation-node_pairs.npy
- name: labels
format: numpy
path: set/paper-validation-labels.npy
test_set:
- type: paper
data:
- name: node_pairs
format: numpy
path: set/paper-test-node_pairs.npy
- name: labels
format: numpy
path: set/paper-test-labels.npy

    Parameters
    ----------
    path : str
        The YAML file path.
    include_original_edge_id : bool, optional
        Whether to include the original edge id in the FusedCSCSamplingGraph.
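
    Examples
    --------
    A minimal usage sketch; ``path/to/dataset`` is a hypothetical directory
    containing a ``metadata.yaml`` such as the one above:

    >>> dataset = OnDiskDataset("path/to/dataset").load()
    >>> graph = dataset.graph
    >>> feature = dataset.feature
    >>> train_set = dataset.tasks[0].train_set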
"""
def __init__(
self, path: str, include_original_edge_id: bool = False
) -> None:
# Always call the preprocess function first. If already preprocessed,
# the function will return the original path directly.
self._dataset_dir = path
yaml_path = preprocess_ondisk_dataset(path, include_original_edge_id)
with open(yaml_path) as f:
self._yaml_data = yaml.load(f, Loader=yaml.loader.SafeLoader)
self._loaded = False
def _convert_yaml_path_to_absolute_path(self):
"""Convert the path in YAML file to absolute path."""
if "graph_topology" in self._yaml_data:
self._yaml_data["graph_topology"]["path"] = os.path.join(
self._dataset_dir, self._yaml_data["graph_topology"]["path"]
)
if "feature_data" in self._yaml_data:
for feature in self._yaml_data["feature_data"]:
feature["path"] = os.path.join(
self._dataset_dir, feature["path"]
)
if "tasks" in self._yaml_data:
for task in self._yaml_data["tasks"]:
for set_name in ["train_set", "validation_set", "test_set"]:
if set_name not in task:
continue
for set_per_type in task[set_name]:
for data in set_per_type["data"]:
data["path"] = os.path.join(
self._dataset_dir, data["path"]
)
    def load(self):
"""Load the dataset."""
self._convert_yaml_path_to_absolute_path()
self._meta = OnDiskMetaData(**self._yaml_data)
self._dataset_name = self._meta.dataset_name
self._graph = self._load_graph(self._meta.graph_topology)
self._feature = TorchBasedFeatureStore(self._meta.feature_data)
self._tasks = self._init_tasks(self._meta.tasks)
self._all_nodes_set = self._init_all_nodes_set(self._graph)
self._loaded = True
return self
@property
def yaml_data(self) -> Dict:
"""Return the YAML data."""
return self._yaml_data
@property
def tasks(self) -> List[Task]:
"""Return the tasks."""
self._check_loaded()
return self._tasks
@property
def graph(self) -> SamplingGraph:
"""Return the graph."""
self._check_loaded()
return self._graph
@property
def feature(self) -> TorchBasedFeatureStore:
"""Return the feature."""
self._check_loaded()
return self._feature
@property
def dataset_name(self) -> str:
"""Return the dataset name."""
self._check_loaded()
return self._dataset_name
@property
def all_nodes_set(self) -> Union[ItemSet, ItemSetDict]:
"""Return the itemset containing all nodes."""
self._check_loaded()
return self._all_nodes_set
def _init_tasks(self, tasks: List[OnDiskTaskData]) -> List[OnDiskTask]:
"""Initialize the tasks."""
ret = []
if tasks is None:
return ret
for task in tasks:
ret.append(
OnDiskTask(
task.extra_fields,
self._init_tvt_set(task.train_set),
self._init_tvt_set(task.validation_set),
self._init_tvt_set(task.test_set),
)
)
return ret
def _check_loaded(self):
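        """Check that ``load()`` has been called before data is accessed."""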
assert self._loaded, (
"Please ensure that you have called the OnDiskDataset.load() method"
+ " to properly load the data."
)
def _load_graph(
self, graph_topology: OnDiskGraphTopology
) -> FusedCSCSamplingGraph:
"""Load the graph topology."""
if graph_topology is None:
return None
if graph_topology.type == "FusedCSCSamplingGraph":
return torch.load(graph_topology.path)
raise NotImplementedError(
f"Graph topology type {graph_topology.type} is not supported."
)
def _init_tvt_set(
self, tvt_set: List[OnDiskTVTSet]
) -> Union[ItemSet, ItemSetDict]:
"""Initialize the TVT set."""
ret = None
if (tvt_set is None) or (len(tvt_set) == 0):
return ret
if tvt_set[0].type is None:
assert (
len(tvt_set) == 1
), "Only one TVT set is allowed if type is not specified."
ret = ItemSet(
tuple(
read_data(data.path, data.format, data.in_memory)
for data in tvt_set[0].data
),
names=tuple(data.name for data in tvt_set[0].data),
)
else:
            data = {}
            for tvt in tvt_set:
                data[tvt.type] = ItemSet(
                    tuple(
                        read_data(item.path, item.format, item.in_memory)
                        for item in tvt.data
                    ),
                    names=tuple(item.name for item in tvt.data),
                )
ret = ItemSetDict(data)
return ret
def _init_all_nodes_set(self, graph) -> Union[ItemSet, ItemSetDict]:
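        """Initialize the itemset that contains all nodes of the graph."""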
if graph is None:
dgl_warning(
"`all_node_set` is returned as None, since graph is None."
)
return None
num_nodes = graph.num_nodes
if isinstance(num_nodes, int):
return ItemSet(num_nodes, names="seed_nodes")
else:
data = {
node_type: ItemSet(num_node, names="seed_nodes")
for node_type, num_node in num_nodes.items()
}
return ItemSetDict(data)
class BuiltinDataset(OnDiskDataset):
"""A utility class to download built-in dataset from AWS S3 and load it as
:class:`OnDiskDataset`.
Available built-in datasets include:
**cora**
The cora dataset is a homogeneous citation network dataset, which is
designed for the node classification task.
**ogbn-mag**
The ogbn-mag dataset is a heterogeneous network composed of a subset of
the Microsoft Academic Graph (MAG). See more details in
`ogbn-mag <https://ogb.stanford.edu/docs/nodeprop/#ogbn-mag>`_.
.. note::
Reverse edges are added to the original graph and duplicated
edges are removed.
**ogbl-citation2**
The ogbl-citation2 dataset is a directed graph, representing the
citation network between a subset of papers extracted from MAG. See
more details in `ogbl-citation2
<https://ogb.stanford.edu/docs/linkprop/#ogbl-citation2>`_.
.. note::
Reverse edges are added to the original graph and duplicated
edges are removed.
**ogbn-arxiv**
The ogbn-arxiv dataset is a directed graph, representing the citation
network between all Computer Science (CS) arXiv papers indexed by MAG.
See more details in `ogbn-arxiv
<https://ogb.stanford.edu/docs/nodeprop/#ogbn-arxiv>`_.
.. note::
Reverse edges are added to the original graph and duplicated
edges are removed.
**ogbn-products**
The ogbn-products dataset is an undirected and unweighted graph,
representing an Amazon product co-purchasing network. See more details
in `ogbn-products
<https://ogb.stanford.edu/docs/nodeprop/#ogbn-products>`_.
.. note::
Reverse edges are added to the original graph.
Node features are stored as float32.
**ogb-lsc-mag240m**
The ogb-lsc-mag240m dataset is a heterogeneous academic graph extracted
from the Microsoft Academic Graph (MAG). See more details in
`ogb-lsc-mag240m <https://ogb.stanford.edu/docs/lsc/mag240m/>`_.
.. note::
Reverse edges are added to the original graph.
Parameters
----------
name : str
The name of the builtin dataset.
root : str, optional
        The root directory of the dataset. Defaults to ``datasets``.
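
    Examples
    --------
    A minimal usage sketch; on first use the dataset is downloaded to
    ``datasets/cora`` before being loaded:

    >>> dataset = BuiltinDataset("cora").load()
    >>> graph = dataset.graph
    >>> tasks = dataset.tasks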
"""
    # For datasets smaller than 30 GB, we use the base URL.
    # Otherwise, we use the accelerated URL.
_base_url = "https://data.dgl.ai/dataset/graphbolt/"
_accelerated_url = (
"https://dgl-data.s3-accelerate.amazonaws.com/dataset/graphbolt/"
)
_datasets = [
"cora",
"ogbn-mag",
"ogbl-citation2",
"ogbn-products",
"ogbn-arxiv",
]
_large_datasets = ["ogb-lsc-mag240m"]
_all_datasets = _datasets + _large_datasets
def __init__(self, name: str, root: str = "datasets") -> OnDiskDataset:
dataset_dir = os.path.join(root, name)
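        # Download and extract the zipped dataset if it is not on disk yet.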
if not os.path.exists(dataset_dir):
if name not in self._all_datasets:
raise RuntimeError(
f"Dataset {name} is not available. Available datasets are "
f"{self._all_datasets}."
)
url = (
self._accelerated_url
if name in self._large_datasets
else self._base_url
)
url += name + ".zip"
os.makedirs(root, exist_ok=True)
zip_file_path = os.path.join(root, name + ".zip")
download(url, path=zip_file_path)
extract_archive(zip_file_path, root, overwrite=True)
os.remove(zip_file_path)
super().__init__(dataset_dir)
def _ondisk_task_str(task: OnDiskTask) -> str:
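    """Return a multi-line string representation of an ``OnDiskTask``."""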
final_str = "OnDiskTask("
indent_len = len(final_str)
def _add_indent(_str, indent):
lines = _str.split("\n")
lines = [lines[0]] + [" " * indent + line for line in lines[1:]]
return "\n".join(lines)
attributes = get_attributes(task)
attributes.reverse()
for name in attributes:
if name[0] == "_":
continue
val = getattr(task, name)
final_str += (
f"{name}={_add_indent(str(val), indent_len + len(name) + 1)},\n"
+ " " * indent_len
)
return final_str[:-indent_len] + ")"