Module hub.core.chunk_engine.write
import numpy as np
import pickle
from uuid import uuid1
from hub.core.chunk_engine import generate_chunks
from hub.constants import META_FILENAME, DEFAULT_CHUNK_SIZE
from hub.core.typing import StorageProvider
from typing import Any, Callable, List, Tuple
from .flatten import row_wise_to_bytes
from hub.util.keys import get_meta_key, get_index_map_key, get_chunk_key
from hub.util.array import normalize_and_batchify_shape
def write_tensor_meta(key: str, storage: StorageProvider, meta: dict):
    storage[get_meta_key(key)] = pickle.dumps(meta)


def write_dataset_meta(storage: StorageProvider, meta: dict):
    storage[META_FILENAME] = pickle.dumps(meta)

def write_array(
    array: np.ndarray,
    key: str,
    storage: StorageProvider,
    chunk_size: int = DEFAULT_CHUNK_SIZE,
    batched: bool = False,
    tobytes: Callable[[np.ndarray], bytes] = row_wise_to_bytes,
):
    """Chunk and write an array to storage.

    For more on chunking, see the `generate_chunks` method.

    Args:
        array (np.ndarray): Array to be chunked/written. Batch axis (`array.shape[0]`) is optional; if `array`
            has a batch dimension, pass `batched=True`.
        key (str): Key for where the chunks, index_map, and meta will be located in `storage` relative to its root.
        storage (StorageProvider): StorageProvider for storing the chunks, index_map, and meta.
        chunk_size (int): Desired length of each chunk.
        batched (bool): If True, the provided `array`'s first axis (`shape[0]`) will be treated as its batch axis.
            If False, a new axis will be created with a size of 1 (`array.shape[0] == 1`). default=False
        tobytes (Callable): Callable that flattens an array into bytes. Must accept an `np.ndarray` as its
            argument and return `bytes`.

    Raises:
        NotImplementedError: Do not use this function for writing to a key that already exists.
    """

    array = normalize_and_batchify_shape(array, batched=batched)

    meta_key = get_meta_key(key)
    index_map_key = get_index_map_key(key)
    if meta_key in storage or index_map_key in storage:
        # TODO: when appending is done change this error
        raise NotImplementedError("Appending is not supported yet.")

    index_map: List[dict] = []
    meta = {
        "chunk_size": chunk_size,
        "dtype": array.dtype.name,
        "length": array.shape[0],
        "min_shape": array.shape[1:],
        "max_shape": array.shape[1:],
    }

    for i in range(array.shape[0]):
        sample = array[i]
        b = tobytes(sample)
        index_map_entry = write_bytes(b, key, chunk_size, storage, index_map)

        # shape per sample for dynamic tensors (TODO: if strictly fixed-size, store this in meta)
        index_map_entry["shape"] = sample.shape
        index_map.append(index_map_entry)

    # TODO: don't use pickle
    write_tensor_meta(key, storage, meta)
    storage[index_map_key] = pickle.dumps(index_map)

def write_bytes(
    b: bytes, key: str, chunk_size: int, storage: StorageProvider, index_map: List[dict]
) -> dict:
    """For internal use only. Chunk and write bytes to storage and return the index_map entry.
    The provided bytes are treated as a single sample.

    Args:
        b (bytes): Bytes to be chunked/written. `b` is considered to be 1 sample and will be chunked according
            to `chunk_size`.
        key (str): Key for where the index_map and meta are located in `storage` relative to its root. A subdirectory
            is created under this `key` (defined in `constants.py`), which is where the chunks will be stored.
        chunk_size (int): Desired length of each chunk.
        storage (StorageProvider): StorageProvider for storing the chunks, index_map, and meta.
        index_map (list): List of dictionaries that represent each sample. An entry for `index_map` is returned
            but not appended to `index_map`.

    Returns:
        dict: Index map entry (note: it does not get appended to the `index_map` argument). Dictionary keys:
            chunk_names: Sequential list of names of chunks that were created.
            start_byte: Start byte for this sample. Will be 0 if no previous chunks exist, otherwise will
                be set to the length of the last chunk before writing.
            end_byte: End byte for this sample. Will be equal to the length of the last chunk written to.
    """

    last_chunk_name, last_chunk = _get_last_chunk(key, index_map, storage)

    # if the previous sample left a partially filled chunk, fill it before creating new chunks
    bllc = 0
    extend_last_chunk = False
    if len(index_map) > 0 and len(last_chunk) < chunk_size:
        bllc = chunk_size - len(last_chunk)
        extend_last_chunk = True

    chunk_generator = generate_chunks(b, chunk_size, bytes_left_in_last_chunk=bllc)

    chunk_names = []
    start_byte = 0
    for chunk in chunk_generator:
        if extend_last_chunk:
            # append to the existing last chunk instead of creating a new one
            chunk_name = last_chunk_name

            last_chunk_bytearray = bytearray(last_chunk)
            last_chunk_bytearray.extend(chunk)
            chunk = bytes(last_chunk_bytearray)
            start_byte = index_map[-1]["end_byte"]

            if len(chunk) >= chunk_size:
                extend_last_chunk = False
        else:
            chunk_name = _random_chunk_name()

        end_byte = len(chunk)

        chunk_key = get_chunk_key(key, chunk_name)
        storage[chunk_key] = chunk

        chunk_names.append(chunk_name)

        last_chunk = chunk
        last_chunk_name = chunk_name

    # TODO: encode index_map_entry as array instead of dictionary
    # TODO: encode shape into the sample's bytes instead of index_map
    index_map_entry = {
        "chunk_names": chunk_names,
        "start_byte": start_byte,
        "end_byte": end_byte,
    }

    return index_map_entry

def _get_last_chunk(
    key: str, index_map: List[dict], storage: StorageProvider
) -> Tuple[str, bytes]:
    """For internal use only. Retrieves the name and bytes for the last chunk.

    Args:
        key (str): Key for where the chunks are located in `storage` relative to its root.
        index_map (list): List of dictionaries that maps each sample to the `chunk_names`, `start_byte`, and `end_byte`.
        storage (StorageProvider): StorageProvider where the chunks are stored.

    Returns:
        str: Name of the last chunk. If the last chunk doesn't exist, returns an empty string.
        bytes: Content of the last chunk. If the last chunk doesn't exist, returns empty bytes.
    """

    last_chunk_name = ""
    last_chunk = bytes()

    if len(index_map) > 0:
        last_index_map_entry = index_map[-1]
        last_chunk_name = last_index_map_entry["chunk_names"][-1]
        last_chunk_key = get_chunk_key(key, last_chunk_name)
        last_chunk = storage[last_chunk_key]

    return last_chunk_name, last_chunk


def _random_chunk_name() -> str:
    return str(uuid1())
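A minimal usage sketch for `write_array` (not part of the module). The key name "image" and the small `chunk_size` are arbitrary, and a plain dict stands in for a `StorageProvider`, since this module only uses mapping-style access (`storage[key] = bytes`, `key in storage`, `storage[key]`):

import pickle
import numpy as np
from hub.core.chunk_engine.write import write_array
from hub.util.keys import get_index_map_key, get_meta_key

storage = {}  # stand-in for a StorageProvider (mapping-style access only)

array = np.random.random((3, 28, 28))  # 3 samples of shape (28, 28)
write_array(array, "image", storage, chunk_size=4096, batched=True)

meta = pickle.loads(storage[get_meta_key("image")])
index_map = pickle.loads(storage[get_index_map_key("image")])

print(meta["length"])               # 3
print(meta["dtype"])                # float64
print(index_map[0]["shape"])        # (28, 28)
print(index_map[0]["chunk_names"])  # names of the chunks holding sample 0

Each per-sample entry in the index_map records which chunks hold that sample's bytes and where they start and end, which is what a reader needs in order to slice samples back out of storage.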
Functions
def write_array(array: numpy.ndarray, key: str, storage: StorageProvider, chunk_size: int = 16000000, batched: bool = False, tobytes: Callable[[numpy.ndarray], bytes] = row_wise_to_bytes)

Chunk and write an array to storage.

For more on chunking, see the `generate_chunks` method.

Args
    array : np.ndarray
        Array to be chunked/written. Batch axis (`array.shape[0]`) is optional; if `array` has a batch
        dimension, pass `batched=True`.
    key : str
        Key for where the chunks, index_map, and meta will be located in `storage` relative to its root.
    storage : StorageProvider
        StorageProvider for storing the chunks, index_map, and meta.
    chunk_size : int
        Desired length of each chunk.
    batched : bool
        If True, the provided `array`'s first axis (`shape[0]`) will be treated as its batch axis.
        If False, a new axis will be created with a size of 1 (`array.shape[0] == 1`). default=False
    tobytes : Callable
        Callable that flattens an array into bytes. Must accept an `np.ndarray` as its argument and return `bytes`.

Raises
    NotImplementedError
        Do not use this function for writing to a key that already exists.
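A short sketch of the effect of `batched`; the shapes and key names are arbitrary, and a plain dict again stands in for a `StorageProvider`:

import pickle
import numpy as np
from hub.core.chunk_engine.write import write_array
from hub.util.keys import get_meta_key

storage = {}  # stand-in for a StorageProvider

x = np.zeros((3, 28, 28))

# batched=True: axis 0 is the batch axis, so 3 samples of shape (28, 28) are written.
write_array(x, "with_batch", storage, batched=True)
print(pickle.loads(storage[get_meta_key("with_batch")])["length"])  # 3

# batched=False (default): a batch axis of size 1 is added, so the whole array
# is written as a single sample of shape (3, 28, 28).
write_array(x, "without_batch", storage)
print(pickle.loads(storage[get_meta_key("without_batch")])["length"])  # 1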
def write_bytes(b: bytes, key: str, chunk_size: int, storage: StorageProvider, index_map: List[dict]) -> dict

For internal use only. Chunk and write bytes to storage and return the index_map entry. The provided bytes are treated as a single sample.

Args
    b : bytes
        Bytes to be chunked/written. `b` is considered to be 1 sample and will be chunked according to `chunk_size`.
    key : str
        Key for where the index_map and meta are located in `storage` relative to its root. A subdirectory is
        created under this `key` (defined in `constants.py`), which is where the chunks will be stored.
    chunk_size : int
        Desired length of each chunk.
    storage : StorageProvider
        StorageProvider for storing the chunks, index_map, and meta.
    index_map : list
        List of dictionaries that represent each sample. An entry for `index_map` is returned but not appended
        to `index_map`.

Returns
    dict
        Index map entry (note: it does not get appended to the `index_map` argument). Dictionary keys:
        chunk_names: Sequential list of names of chunks that were created.
        start_byte: Start byte for this sample. Will be 0 if no previous chunks exist, otherwise will be set to
            the length of the last chunk before writing.
        end_byte: End byte for this sample. Will be equal to the length of the last chunk written to.
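`write_bytes` is internal, but a small sketch helps show what an index_map entry looks like when a sample spills across chunks. The byte values, key name, and `chunk_size` are arbitrary, a plain dict stands in for a `StorageProvider`, and the exact split of the second sample assumes `generate_chunks` first yields the piece that fills the previous, partially filled chunk:

from hub.core.chunk_engine.write import write_bytes

storage = {}    # stand-in for a StorageProvider
index_map = []  # entries are returned by write_bytes; the caller appends them
chunk_size = 10

# First sample: 8 bytes land in a single, partially filled chunk.
entry0 = write_bytes(b"A" * 8, "tensor", chunk_size, storage, index_map)
index_map.append(entry0)
print(entry0["start_byte"], entry0["end_byte"])  # expected: 0 8

# Second sample: 2 bytes extend the existing chunk to chunk_size and the remaining
# 6 bytes start a new chunk, so this entry spans two chunks and starts at byte 8
# of the first one.
entry1 = write_bytes(b"B" * 8, "tensor", chunk_size, storage, index_map)
index_map.append(entry1)
print(entry1["start_byte"])        # expected: 8
print(len(entry1["chunk_names"]))  # expected: 2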
def write_dataset_meta(storage: StorageProvider, meta: dict)
Pickles `meta` and stores it under `META_FILENAME` at the root of `storage`.
def write_tensor_meta(key: str, storage: StorageProvider, meta: dict)
Pickles `meta` and stores it under the tensor's meta key (`get_meta_key(key)`) in `storage`.
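Both meta writers simply pickle the given dict into `storage`, so reading a value back is a `pickle.loads` on the corresponding key. A minimal sketch with arbitrary dict contents (the actual meta written by `write_array` has the keys shown in its source above):

import pickle
from hub.constants import META_FILENAME
from hub.core.chunk_engine.write import write_dataset_meta, write_tensor_meta
from hub.util.keys import get_meta_key

storage = {}  # stand-in for a StorageProvider

write_dataset_meta(storage, {"tensors": ["image", "label"]})
write_tensor_meta("image", storage, {"chunk_size": 4096, "dtype": "float64"})

print(pickle.loads(storage[META_FILENAME]))          # {'tensors': ['image', 'label']}
print(pickle.loads(storage[get_meta_key("image")]))  # {'chunk_size': 4096, 'dtype': 'float64'}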