Module hub.core.chunk_engine.tests.test_benchmark_chunk_engine
from typing import Tuple
import numpy as np
import pytest
from hub.constants import GB
from hub.core.chunk_engine import read_array, write_array
from hub.core.chunk_engine.tests.common import (
    get_random_array,
    TENSOR_KEY,
)
from hub.core.tests.common import parametrize_all_caches, parametrize_all_storages
from hub.core.typing import StorageProvider
from hub.tests.common_benchmark import (
    parametrize_benchmark_shapes,
    parametrize_benchmark_chunk_sizes,
    parametrize_benchmark_dtypes,
    parametrize_benchmark_num_batches,
)

def single_benchmark_write(info, key, arrays, chunk_size, storage, batched):
    # The benchmark fixture may call this target several times, so derive a
    # fresh key from the iteration counter to avoid overwriting a previous
    # round's tensor.
    actual_key = "%s_%i" % (key, info["iteration"])
    for a_in in arrays:
        write_array(
            array=a_in,
            key=actual_key,
            storage=storage,
            chunk_size=chunk_size,
            batched=batched,
        )
    info["iteration"] += 1
    return actual_key

def benchmark_write(benchmark, shape, dtype, chunk_size, num_batches, storage):
    """
    Benchmark `write_array`.

    Samples have FIXED shapes (must have the same shapes).
    Samples are provided WITH a batch axis.
    """
    arrays = [get_random_array(shape, dtype) for _ in range(num_batches)]

    # Record the total amount of generated input data in the benchmark report.
    gbs = (np.prod(shape) * num_batches * np.dtype(dtype).itemsize) / GB
    benchmark.extra_info["input_array_gigabytes"] = gbs

    info = {"iteration": 0}
    benchmark(
        single_benchmark_write,
        info,
        TENSOR_KEY,
        arrays,
        chunk_size,
        storage,
        batched=True,
    )
    storage.clear()

def single_benchmark_read(key, storage):
    read_array(key, storage)

def benchmark_read(benchmark, shape, dtype, chunk_size, num_batches, storage):
    """
    Benchmark `read_array`.

    Samples have FIXED shapes (must have the same shapes).
    Samples are provided WITH a batch axis.
    """
    arrays = [get_random_array(shape, dtype) for _ in range(num_batches)]

    # Write the data once, outside the timed callable, so that only the read
    # path is measured.
    info = {"iteration": 0}
    actual_key = single_benchmark_write(
        info, TENSOR_KEY, arrays, chunk_size, storage, batched=True
    )
    benchmark(single_benchmark_read, actual_key, storage)
    storage.clear()

@pytest.mark.benchmark(group="chunk_engine_write_memory")
@parametrize_benchmark_shapes
@parametrize_benchmark_num_batches
@parametrize_benchmark_chunk_sizes
@parametrize_benchmark_dtypes
def test_write_memory(
    benchmark,
    shape: Tuple[int, ...],
    chunk_size: int,
    num_batches: int,
    dtype: str,
    memory_storage: StorageProvider,
):
    benchmark_write(
        benchmark=benchmark,
        shape=shape,
        dtype=dtype,
        chunk_size=chunk_size,
        num_batches=num_batches,
        storage=memory_storage,
    )

@pytest.mark.benchmark(group="chunk_engine_read_memory")
@parametrize_benchmark_shapes
@parametrize_benchmark_num_batches
@parametrize_benchmark_chunk_sizes
@parametrize_benchmark_dtypes
def test_read_memory(
    benchmark,
    shape: Tuple[int, ...],
    chunk_size: int,
    num_batches: int,
    dtype: str,
    memory_storage: StorageProvider,
):
    benchmark_read(benchmark, shape, dtype, chunk_size, num_batches, memory_storage)
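
Why the `info["iteration"]` counter: pytest-benchmark's `benchmark` fixture invokes the target callable repeatedly (warmup plus measured rounds), so `single_benchmark_write` derives a fresh key per call to keep rounds from colliding. A minimal sketch of that interaction, with a hypothetical stand-in for the fixture (nothing below is part of hub):

from typing import Callable, Dict

def fake_benchmark(fn: Callable, *args, **kwargs):
    # Stand-in for pytest-benchmark's `benchmark` fixture, which also calls
    # the target more than once.
    result = None
    for _ in range(3):  # e.g. one warmup call plus two measured rounds
        result = fn(*args, **kwargs)
    return result

def target(info: Dict[str, int], key: str) -> str:
    # Mirrors single_benchmark_write's key scheme: one unique key per call.
    actual_key = "%s_%i" % (key, info["iteration"])
    info["iteration"] += 1
    return actual_key

print(fake_benchmark(target, {"iteration": 0}, "tensor"))  # -> tensor_2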
Functions
def benchmark_read(benchmark, shape, dtype, chunk_size, num_batches, storage)
Benchmark `read_array`. Samples have FIXED shapes (must have the same shapes). Samples are provided WITH a batch axis.
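"Samples are provided WITH a batch axis" is taken here to mean that the leading axis of each generated array is the batch dimension (hence `batched=True` in the preparatory write). A hypothetical numpy illustration, independent of hub:

import numpy as np

# A (16, 100, 100) float32 array read as a batch of sixteen 100x100 samples,
# assuming the first axis is the batch dimension.
batch = np.random.uniform(size=(16, 100, 100)).astype("float32")
print(batch.shape[0])  # -> 16 samples in this batch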
def benchmark_write(benchmark, shape, dtype, chunk_size, num_batches, storage)
Benchmark `write_array`. Samples have FIXED shapes (must have the same shapes). Samples are provided WITH a batch axis.
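For reference, `input_array_gigabytes` is just the total bytes generated across all batches divided by `hub.constants.GB`. A worked example, assuming `GB == 2 ** 30` (the actual value lives in hub.constants):

import numpy as np

GB = 2 ** 30  # assumption; the real constant comes from hub.constants
shape, num_batches, dtype = (512, 512, 3), 10, "uint8"
gbs = (np.prod(shape) * num_batches * np.dtype(dtype).itemsize) / GB
print(round(gbs, 4))  # -> 0.0073 (about 7.9 MB of input data)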
def single_benchmark_read(key, storage)
Read a single tensor back from storage; the smallest unit timed by the read benchmarks.
def single_benchmark_write(info, key, arrays, chunk_size, storage, batched)
Write each array in `arrays` under a key derived from `info["iteration"]`, advance the counter, and return the key that was used.
def test_read_memory(benchmark, shape: Tuple[int, ...], chunk_size: int, num_batches: int, dtype: str, memory_storage: StorageProvider)

Benchmark `read_array` against the in-memory storage provider; parametrized over shapes, batch counts, chunk sizes, and dtypes.
def test_write_memory(benchmark, shape: Tuple[int, ...], chunk_size: int, num_batches: int, dtype: str, memory_storage: StorageProvider)

Benchmark `write_array` against the in-memory storage provider; parametrized over shapes, batch counts, chunk sizes, and dtypes.