DaskArraySerializer
The DaskArraySerializer is designed to serialize Dask Arrays to appropriate
file types (hdf5 for local files, or zarr on s3 storage).
from dask.array.core import Array
class DaskArraySerializer
serializer_class = Array
Accepted Protocols are FILE_PROTOCOL and
S3_PROTOCOL.
File protocol interface
The file protocol used by DaskArraySerializer requires an object that provides
an open() method which can be used like Python's built-in open().
def pickle(cls, obj: Array, protocols: dict):
    """Write a Dask array to a new HDF5 file through the file protocol.

    When ``FILE_PROTOCOL`` is present in ``protocols``, a fresh name is
    obtained from ``generate_name()``, the file is opened for binary writing
    via the protocol's ``open()``, and ``obj`` is stored into an HDF5 dataset
    created at ``cls.datapath`` with the array's shape, dtype and chunk size.

    Args:
        obj: The Dask array to store.
        protocols: Mapping of protocol keys to protocol objects. Only the
            ``FILE_PROTOCOL`` entry is used here.

    NOTE(review): this excerpt shows no return value and no handling for the
    case where ``FILE_PROTOCOL`` is absent -- confirm against the full
    implementation.
    """
    if FILE_PROTOCOL in protocols:
        file = protocols[FILE_PROTOCOL]
        filename = generate_name()
        # Context managers release both handles even if dataset creation or
        # the store step raises; on exit the HDF5 file is closed first and
        # the underlying raw file second, same order as the explicit closes.
        with file.open(filename, "wb") as f, h5py.File(f, mode="w") as hdf:
            dset = hdf.create_dataset(
                cls.datapath,
                obj.shape,
                dtype=obj.dtype,
                chunks=obj.chunksize,
            )
            da.store(obj, dset)
def unpickle(cls, pid, protocols: dict):
    """Open the HDF5 dataset referenced by ``pid`` through the file protocol.

    ``pid`` is unpacked as a two-item sequence; its second item is the name
    of the file to open. When ``FILE_PROTOCOL`` is present in ``protocols``,
    that file is opened for binary reading and the dataset stored at
    ``cls.datapath`` is looked up.

    Args:
        pid: Two-item identifier; only the second item (file name) is used.
        protocols: Mapping of protocol keys to protocol objects. Only the
            ``FILE_PROTOCOL`` entry is used here.

    NOTE(review): the excerpt ends after the dataset lookup -- how ``dset``
    is turned back into a Dask array and returned is not shown here.
    """
    _, stored_name = pid
    if FILE_PROTOCOL in protocols:
        # Neither handle is closed in this excerpt; presumably the dataset
        # is read after this point -- confirm before adding any close().
        raw = protocols[FILE_PROTOCOL].open(stored_name, "rb")
        hdf = h5py.File(raw, mode="r")
        dset = hdf[cls.datapath]
S3 protocol interface
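The S3 protocol used by DaskArraySerializer requires a dictionary with
“client” and “folder” keys. The “client” value should have the S3FileSystem
interface as in s3fs.S3FileSystem. The “folder” value is the S3 path under
which arrays are stored; it is joined with the serializer's datapath to form
the root of the zarr group.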
def pickle(cls, obj: Array, protocols: dict):
    """Write a Dask array into the serializer's zarr group on S3.

    When ``S3_PROTOCOL`` is present in ``protocols``, the zarr group is
    obtained from ``cls.get_s3_root()``, a fresh name is obtained from
    ``generate_name()``, and ``obj`` is stored into a dataset of that name
    created with the array's shape, chunk size and dtype.

    Args:
        obj: The Dask array to store.
        protocols: Mapping of protocol keys to protocol objects. Only the
            ``S3_PROTOCOL`` entry is used here.

    NOTE(review): this excerpt shows no return value -- confirm against the
    full implementation.
    """
    if S3_PROTOCOL in protocols:
        # The group is resolved before the name is generated, matching the
        # order in which the two helpers are invoked.
        group = cls.get_s3_root(protocols[S3_PROTOCOL])
        array_name = generate_name()
        target = group.create_dataset(
            array_name,
            shape=obj.shape,
            chunks=obj.chunksize,
            dtype=obj.dtype,
        )
        da.store(obj, target)
def unpickle(cls, pid, protocols: dict):
    """Look up the zarr dataset referenced by ``pid`` through the S3 protocol.

    ``pid`` is unpacked as a two-item sequence; its second item is the name
    of the dataset inside the group returned by ``cls.get_s3_root()``.

    Args:
        pid: Two-item identifier; only the second item (dataset name) is used.
        protocols: Mapping of protocol keys to protocol objects. Only the
            ``S3_PROTOCOL`` entry is used here.

    NOTE(review): the excerpt ends after the dataset lookup -- how ``dset``
    is turned back into a Dask array and returned is not shown here.
    """
    _, array_name = pid
    if S3_PROTOCOL in protocols:
        group = cls.get_s3_root(protocols[S3_PROTOCOL])
        dset = group[array_name]
def get_s3_root(cls, s3):
    """Return the zarr group rooted at this serializer's S3 location.

    Args:
        s3: Mapping with a ``"client"`` entry (passed to ``s3fs.S3Map`` as
            its ``s3`` argument) and a ``"folder"`` entry (path prefix).

    Returns:
        The result of ``zarr.group()`` on an ``s3fs.S3Map`` whose root is
        ``s3["folder"]`` joined with ``cls.datapath``.
    """
    # Read "client" before "folder" so a mapping missing both keys fails on
    # the same key as before.
    fs = s3["client"]
    location = posixpath.join(s3["folder"], cls.datapath)
    mapping = s3fs.S3Map(root=location, s3=fs, check=False)
    return zarr.group(store=mapping)