# This file is part of the Open Data Cube, see https://opendatacube.org for more information
#
# Copyright (c) 2015-2020 ODC Contributors
# SPDX-License-Identifier: Apache-2.0
"""
Tools for dealing with ROIs (Regions of Interest).
In this context ROI is a 2d slice of an image. For example a top left corner of 10 pixels square
will have an ROI that can be constructed with :py:func:`numpy.s_` like this: ``s_[0:10, 0:10]``.
"""
import math
from collections import abc
from typing import List, Optional, Protocol, Sequence, Tuple, Union, overload
import numpy as np
from .math import align_down, align_up, edge_index
from .types import (
ROI,
Chunks2d,
NdROI,
NormalizedROI,
NormalizedSlice,
Shape2d,
SomeIndex2d,
SomeShape,
SomeSlice,
T,
iyx_,
shape_,
)
# This is numeric code, short names make sense in this context, so disabling
# "invalid name" checks for the whole file
# pylint: disable=invalid-name
class WindowFromSlice:
"""Translate numpy slices to rasterio window tuples."""
def __getitem__(self, roi):
if roi is None:
return None
if not isinstance(roi, abc.Sequence) or len(roi) != 2:
raise ValueError("Need 2d roi")
row, col = roi
return (
(0 if row.start is None else row.start, row.stop),
(0 if col.start is None else col.start, col.stop),
)
w_ = WindowFromSlice()
class RoiTiles(Protocol):
"""
Abstraction for 2d slice/shape/chunks lookup.
"""
def __getitem__(self, idx: Union[SomeIndex2d, ROI]) -> Tuple[slice, slice]: ...
def crop(self, roi: ROI) -> "RoiTiles": ...
def tile_shape(self, idx: SomeIndex2d) -> Shape2d: ...
@property
def shape(self) -> Shape2d: ...
@property
def base(self) -> Shape2d: ...
@property
def chunks(self) -> Chunks2d: ...
def locate(self, pix: SomeIndex2d) -> Tuple[int, int]: ...
def __dask_tokenize__(self): ...
def norm_slice_2d(
idx: Union[SomeIndex2d, ROI], shape: Tuple[int, int]
) -> NormalizedROI:
if isinstance(idx, tuple):
return roi_normalise(idx, shape)
return roi_normalise(iyx_(idx).yx, shape)
def _fmt_shape(shape):
n1, n2 = shape.yx
if max(n1, n2) > 10_000:
return f"{n1:_d}x{n2:_d}"
return f"{n1:d}x{n2:d}"
def clip_tiles(
tiles: RoiTiles, selection: Sequence[Tuple[int, int]]
) -> Tuple["RoiTiles", Tuple[slice, slice], List[Tuple[int, int]]]:
"""
Compute cropped version of tiles from a selection of tile indexes.
:return: Cropped RoiTiles object, crop that was applied and tile indexes
changed to the cropped address space.
"""
ii = np.asarray(selection)
y1, x1 = ii.min(axis=0).tolist()
y2, x2 = ii.max(axis=0).tolist()
roi = np.s_[y1 : y2 + 1, x1 : x2 + 1]
sel_new = [(y - y1, x - x1) for y, x in selection]
return tiles.crop(roi), roi, sel_new
class Tiles:
"""
Partition box into tiles.
Turns ``row, col`` index into a 2d ROI of the original box.
"""
def __init__(self, base_shape: SomeShape, tile_shape: SomeShape) -> None:
tile_shape = shape_(tile_shape)
base_shape = shape_(base_shape)
self._tile_shape = tile_shape
self._base_shape = base_shape
ny, nx = (
int(math.ceil(float(N) / n)) for N, n in zip(base_shape.yx, tile_shape.yx)
)
self._shape = shape_((ny, nx))
def crop(self, roi: ROI) -> "Tiles":
base_shape = roi_shape(self[roi])
return Tiles(base_shape, self._tile_shape)
def __getitem__(self, idx: Union[SomeIndex2d, ROI]) -> Tuple[slice, slice]:
def _slice(i: NormalizedSlice, N: int, n: int) -> slice:
_in = i.start * n
_out = i.stop * n
if 0 <= _in < N and _out < N + n:
return slice(_in, min(_out, N))
raise IndexError(f"Index {idx} is out of range")
idx = norm_slice_2d(idx, self._shape.yx)
ir, ic = (
_slice(i, N, n)
for i, N, n in zip(idx, self._base_shape.yx, self._tile_shape.yx)
)
return (ir, ic)
def tile_shape(self, idx: SomeIndex2d) -> Shape2d:
"""
Query shape for a given tile.
:param idx: ``(row, col)`` chunk index, supports numpy style indexing.
:returns: shape of a tile (edge tiles might be smaller)
:raises: :py:class:`IndexError` when index is outside of ``[(0,0) -> .shape)``.
"""
idx = iyx_(idx)
def _sz(i: int, n: int, tile_sz: int, total_sz: int) -> int:
if i < 0: # numpy style index from the right
i = n + i
if 0 <= i < n - 1: # not edge tile
return tile_sz
if i == n - 1: # edge tile
return total_sz - (i * tile_sz)
# out of index case
raise IndexError(f"Index {idx} is out of range")
ny, nx = map(
_sz, idx.yx, self._shape.yx, self._tile_shape.yx, self._base_shape.yx
)
return shape_((ny, nx))
@property
def shape(self) -> Shape2d:
"""Number of tiles along each dimension."""
return self._shape
@property
def base(self) -> Shape2d:
"""Base shape."""
return self._base_shape
@property
def chunks(self) -> Chunks2d:
"""Dask compatible chunk rerpesentation."""
NY, NX = self.shape.yx
ny, nx = self.tile_shape((0, 0)).yx
ny_, nx_ = self.tile_shape((NY - 1, NX - 1))
return (
(ny,) * (NY - 1) + (ny_,),
(nx,) * (NX - 1) + (nx_,),
)
def locate(self, pix: SomeIndex2d) -> Tuple[int, int]:
"""Tile index from pixel coordinate."""
NY, NX = self._base_shape.yx
y, x = iyx_(pix).yx
if y < 0 or y >= NY or x < 0 or x >= NX:
raise IndexError()
ny, nx = self.tile_shape((0, 0)).yx
return (y // ny, x // nx)
def __dask_tokenize__(self):
return (
"odc.geo.roi.Tiles",
*self._shape,
*self._tile_shape,
)
def __str__(self) -> str:
b1, b2, b3 = map(_fmt_shape, [self._shape, self._tile_shape, self._base_shape])
return f"Tiles: {b1}|{b2}px => {b3}px"
def __repr__(self) -> str:
return self.__str__()
def __eq__(self, __value: object) -> bool:
if __value is self:
return True
if not isinstance(__value, Tiles):
return False
return (
self._base_shape == __value._base_shape
and self._tile_shape == __value._tile_shape
)
class VariableSizedTiles:
"""
Partition box into tiles of varying sizes.
Turns ``row, col`` index into a 2d ROI of the original box.
"""
__slots__ = ("_offsets",)
def __init__(self, chunks: Chunks2d) -> None:
self._offsets = tuple(
np.asarray([0, *idx], dtype="int32").cumsum(dtype="int32") for idx in chunks
)
def crop(self, roi: ROI) -> "VariableSizedTiles":
roi = roi_normalise(roi, self.shape.yx)
y, x = (ch[s.start : s.stop] for ch, s in zip(self.chunks, roi))
return VariableSizedTiles((y, x))
def __getitem__(self, idx: Union[SomeIndex2d, ROI]) -> Tuple[slice, slice]:
idx = norm_slice_2d(idx, self.shape.yx)
y, x = (
slice(int(a[i.start]), int(a[i.stop])) for a, i in zip(self._offsets, idx)
)
return (y, x)
def tile_shape(self, idx: SomeIndex2d) -> Shape2d:
"""
Query shape for a given tile.
:param idx: ``(row, col)`` chunk index
:returns: shape of a tile
:raises: :py:class:`IndexError` when index is outside of ``[(0,0) -> .shape)``.
"""
idx = iyx_(idx)
ny, nx = (int(a[i + 1]) - int(a[i]) for a, i in zip(self._offsets, idx.yx))
return Shape2d(x=nx, y=ny)
@property
def shape(self) -> Shape2d:
"""Number of tiles along each dimension."""
ny, nx = (len(idx) - 1 for idx in self._offsets)
return Shape2d(x=nx, y=ny)
@property
def base(self) -> Shape2d:
"""Base shape."""
ny, nx = (int(idx[-1]) for idx in self._offsets)
return Shape2d(x=nx, y=ny)
@property
def chunks(self) -> Tuple[Tuple[int, ...], Tuple[int, ...]]:
"""Dask compatible chunk rerpesentation."""
y, x = (tuple(np.diff(idx).tolist()) for idx in self._offsets)
return (y, x)
def locate(self, pix: SomeIndex2d) -> Tuple[int, int]:
"""Tile index from pixel coordinate."""
NY, NX = self.base.yx
y, x = iyx_(pix).yx
if y < 0 or y >= NY or x < 0 or x >= NX:
raise IndexError()
y, x = (
# 1: because offsets start from 0, n1, n1+n2, ...
int(np.searchsorted(bins[1:], pix, "right"))
for bins, pix in zip(self._offsets, (y, x))
)
return (y, x)
def __dask_tokenize__(self):
return (
"odc.geo.roi.VariableSizedTiles",
*self._offsets,
)
def __str__(self) -> str:
b1, b2, b3 = map(_fmt_shape, [self.shape, self.tile_shape((0, 0)), self.base])
return f"Tiles: {b1}|chunked {b2}px => {b3}px"
def __repr__(self) -> str:
return self.__str__()
def __eq__(self, __value: object) -> bool:
if __value is self:
return True
if not isinstance(__value, VariableSizedTiles):
return False
for a, b in zip(self._offsets, __value._offsets):
if a.shape != b.shape:
return False
if (a != b).any():
return False
return True
def roi_tiles(shape: SomeShape, how: Union[SomeShape, Chunks2d]) -> RoiTiles:
if isinstance(how, (tuple, list)) and isinstance(how[0], (tuple, list)):
y, x = (tuple(i) for i in how)
return VariableSizedTiles((y, x))
return Tiles(shape, how)
[docs]
def polygon_path(
x: np.ndarray,
y: Optional[np.ndarray] = None,
closed: bool = True,
) -> np.ndarray:
"""
Points along axis aligned polygon.
A little bit like :py:func:`numpy.meshgrid`, except returns only boundary points and limited to
a 2d case only.
.. rubric:: Examples
.. code-block::
[0,1] - unit square
[0,1], [0,1] => [[0, 1, 1, 0, 0],
[0, 0, 1, 1, 0]])
# three points per X, two point per Y side
[0,1,2], [7,9] => [[0, 1, 2, 2, 1, 0, 0],
[7, 7, 7, 9, 9, 9, 7]]
:returns:
A ``2xN`` array, ``x, y = polygon_path(...)``
"""
if not isinstance(x, np.ndarray):
x = np.asarray(x)
if y is None:
y = x
if not isinstance(y, np.ndarray):
y = np.asarray(y)
shape = (len(y), len(x))
iy, ix = np.asarray(list(edge_index(shape, closed=closed)), dtype="uint32").T
return np.vstack([x[np.newaxis, ix], y[np.newaxis, iy]])
[docs]
def roi_boundary(roi: NormalizedROI, pts_per_side: int = 2) -> np.ndarray:
"""
Get boundary points from a 2d roi.
roi needs to be in the normalised form, i.e. no open-ended start/stop,
:returns:
``Nx2 <float32>`` array of ``X,Y`` points on the perimeter of the envelope defined by ``roi``
.. seealso:: :py:func:`~odc.geo.roi.roi_normalise`
"""
yy, xx = roi
xx = np.linspace(xx.start, xx.stop, pts_per_side, dtype="float32")
yy = np.linspace(yy.start, yy.stop, pts_per_side, dtype="float32")
return polygon_path(xx, yy, closed=False).T
[docs]
def scaled_down_roi(roi: NormalizedROI, scale: int) -> NormalizedROI:
"""
Compute ROI for a scaled down image.
Given a crop region of the original image compute equivalent crop in the overview image.
:param roi: ROI in the original image
:param scale: integer scale to get scaled down image
:return: ROI in the scaled down image
"""
s1, s2 = (slice(s.start // scale, align_up(s.stop, scale) // scale) for s in roi)
return (s1, s2)
[docs]
def scaled_up_roi(
roi: NormalizedROI, scale: int, shape: Optional[SomeShape] = None
) -> NormalizedROI:
"""
Compute ROI for a scaled up image.
Given a crop region in the original image compute equivalent crop in the upsampled image.
:param roi: ROI in the original image
:param scale: integer scale to get scaled up image
:param shape: Clamp to that shape is supplied
:return: ROI in the scaled up image
"""
s1, s2 = (slice(s.start * scale, s.stop * scale) for s in roi)
if shape is not None:
s1, s2 = (
slice(min(dim, s.start), min(dim, s.stop))
for s, dim in zip([s1, s2], shape_(shape))
)
return (s1, s2)
[docs]
def scaled_down_shape(shape: Tuple[int, ...], scale: int) -> Tuple[int, ...]:
"""
Compute shape of the overview image.
:param shape: Original shape
:param scale: Shrink factor
:return: shape of the overview image
"""
return tuple(align_up(s, scale) // scale for s in shape)
# fmt: off
@overload
def roi_shape(roi: ROI) -> Tuple[int, int]: ...
@overload
def roi_shape(roi: NdROI) -> Tuple[int, ...]: ...
# fmt: on
[docs]
def roi_shape(roi: NdROI) -> Tuple[int, ...]:
"""
Shape of an array after cropping with ``roi``.
Same as ``xx[roi].shape``.
"""
def slice_dim(s: SomeSlice) -> int:
if isinstance(s, int):
return 1
_out = s.stop
if _out is None:
raise ValueError(
"Can't determine shape of the slice with open right-hand side."
)
if s.start is None:
return _out
return _out - s.start
if not isinstance(roi, tuple):
roi = (roi,)
return tuple(slice_dim(s) for s in roi)
[docs]
def roi_is_empty(roi: NdROI) -> bool:
"""
Check if ROI is "empty".
ROI is empty if any dimension is 0 elements wide.
"""
return any(d <= 0 for d in roi_shape(roi))
[docs]
def roi_is_full(roi: NdROI, shape: Union[int, Tuple[int, ...]]) -> bool:
"""
Check if ROI covers the entire region.
:returns: ``True`` if ``roi`` covers region from ``(0,..) -> shape``
:returns: ``False`` if ``roi`` actually crops an image
"""
def slice_full(s: SomeSlice, n: int) -> bool:
if isinstance(s, int):
return n == 1
return s.start in (0, None) and s.stop in (n, None)
if not isinstance(roi, tuple):
roi = (roi,)
if not isinstance(shape, tuple):
shape = (shape,)
return all(slice_full(s, n) for s, n in zip(roi, shape))
def _fill_if_none(x: Optional[T], val_if_none: T) -> T:
return val_if_none if x is None else x
def _norm_slice_or_error(s: SomeSlice) -> NormalizedSlice:
if isinstance(s, int):
start = s
stop = s + 1
step = None
else:
start = _fill_if_none(s.start, 0)
if s.stop is None:
raise ValueError("Can't process open ended slice")
stop = s.stop
step = s.step
if stop < 0 or start < 0:
raise ValueError("Can't process negative offset slice")
return slice(start, stop, step)
def _norm_slice(s: SomeSlice, n: int) -> NormalizedSlice:
if isinstance(s, int):
if s < 0:
s = n + s
return slice(s, s + 1)
start = _fill_if_none(s.start, 0)
stop = _fill_if_none(s.stop, n)
start, stop = (x if x >= 0 else n + x for x in (start, stop))
return slice(start, stop, s.step)
def slice_intersect3(a: SomeSlice, b: SomeSlice) -> Tuple[slice, slice, slice]:
"""
Compute overlap 3 way.
Compute part of a that overlaps b, part of b that overlaps a and the region
of the original region covered by the overlap.
:returns: ``a', b', ab'``, such that ``X[a][a'] == X[b][b'] == X[ab']``
"""
a = _norm_slice_or_error(a)
b = _norm_slice_or_error(b)
na = a.stop - a.start
nb = b.stop - b.start
if a.stop < b.start:
return slice(na, na), slice(0, 0), slice(a.stop, a.stop)
if a.start > b.stop:
return slice(0, 0), slice(nb, nb), slice(a.start, a.start)
_in = max(a.start, b.start)
_out = min(a.stop, b.stop)
return (
slice(_in - a.start, _out - a.start),
slice(_in - b.start, _out - b.start),
slice(_in, _out),
)
def roi_intersect3(
a: Tuple[SomeSlice, ...], b: Tuple[SomeSlice, ...]
) -> Tuple[Tuple[slice, ...], Tuple[slice, ...], Tuple[slice, ...]]:
"""
Compute overlap 3 way.
Compute part of a that overlaps b, part of b that overlaps a and the region
of the original region covered by the overlap.
:returns: ``a', b', ab'``, such that ``X[a][a'] == X[b][b'] == X[ab']``
"""
assert len(a) == len(b)
aa, bb, cc = zip(*[slice_intersect3(a_, b_) for a_, b_ in zip(a, b)])
return aa, bb, cc
# fmt: off
@overload
def roi_normalise(roi: SomeSlice, shape: Union[int, Tuple[int]]) -> NormalizedSlice: ...
@overload
def roi_normalise(roi: ROI, shape: Tuple[int, int]) -> NormalizedROI: ...
@overload
def roi_normalise(roi: Tuple[SomeSlice, ...], shape: Tuple[int, ...]
) -> Tuple[NormalizedSlice, ...]: ...
# fmt: on
[docs]
def roi_normalise(
roi: NdROI, shape: Union[int, Tuple[int, ...]]
) -> Union[NormalizedSlice, Tuple[NormalizedSlice, ...]]:
"""
Normalise ROI.
Fill in missing ``.start/.stop``, also deal with negative values, which are treated as offsets
from the end.
``.step`` parameter is left unchanged.
.. rubric:: Example
.. code-block::
np.s_[:3, 4: ], (10, 20) => np.s_[0:3, 4:20]
np.s_[:3, :-3], (10, 20) => np.s_[0:3, 0:17]
"""
if not isinstance(roi, abc.Sequence):
if isinstance(shape, abc.Sequence):
(shape,) = shape
return _norm_slice(roi, shape)
if not isinstance(shape, abc.Sequence):
shape = (shape,)
return tuple(_norm_slice(s, n) for s, n in zip(roi, shape))
# fmt: off
@overload
def roi_pad(roi: SomeSlice, pad: int, shape: int) -> NormalizedSlice: ...
@overload
def roi_pad(roi: Tuple[SomeSlice, ...], pad: int, shape: Tuple[int, ...]) -> Tuple[NormalizedSlice, ...]: ...
# fmt: on
[docs]
def roi_pad(
roi: NdROI, pad: int, shape: Union[int, Tuple[int, ...]]
) -> Union[NormalizedSlice, Tuple[NormalizedSlice, ...]]:
"""
Pad ROI on each side, with clamping.
Returned ROI is guaranteed to be within ``(0,..) -> shape``.
"""
def pad_slice(s: SomeSlice, n: int) -> NormalizedSlice:
s = _norm_slice(s, n)
return slice(max(0, s.start - pad), min(n, s.stop + pad))
if not isinstance(roi, abc.Sequence):
if isinstance(shape, abc.Sequence):
(shape,) = shape
return pad_slice(roi, shape)
if not isinstance(shape, abc.Sequence):
shape = (shape,)
return tuple(pad_slice(s, n) for s, n in zip(roi, shape))
# fmt: off
@overload
def roi_intersect(a: SomeSlice, b: SomeSlice) -> NormalizedSlice: ...
@overload
def roi_intersect(a: Tuple[SomeSlice, ...], b: Tuple[SomeSlice, ...]) -> Tuple[NormalizedSlice, ...]: ...
# fmt: on
[docs]
def roi_intersect(
a: NdROI, b: NdROI
) -> Union[NormalizedSlice, Tuple[NormalizedSlice, ...]]:
"""
Compute intersection of two ROIs.
.. rubric:: Examples
.. code-block::
s_[1:30], s_[20:40] => s_[20:30]
s_[1:10], s_[20:40] => s_[10:10]
# works for N dimensions
s_[1:10, 11:21], s_[8:12, 10:30] => s_[8:10, 11:21]
"""
def slice_intersect(a: SomeSlice, b: SomeSlice) -> NormalizedSlice:
a = _norm_slice_or_error(a)
b = _norm_slice_or_error(b)
if a.stop < b.start:
return slice(a.stop, a.stop)
if a.start > b.stop:
return slice(a.start, a.start)
_in = max(a.start, b.start)
_out = min(a.stop, b.stop)
return slice(_in, _out)
if not isinstance(a, abc.Sequence):
if isinstance(b, abc.Sequence):
(b,) = b
return slice_intersect(a, b)
if not isinstance(b, abc.Sequence):
b = (b,)
return tuple(slice_intersect(sa, sb) for sa, sb in zip(a, b))
# fmt: off
@overload
def roi_center(roi: SomeSlice) -> float: ...
@overload
def roi_center(roi: Tuple[SomeSlice, ...]) -> Tuple[float, ...]: ...
# fmt: on
[docs]
def roi_center(roi: NdROI) -> Union[float, Tuple[float, ...]]:
"""Return center point of an ``roi``."""
def slice_center(s: SomeSlice) -> float:
s = _norm_slice_or_error(s)
return (s.start + s.stop) * 0.5
if not isinstance(roi, abc.Sequence):
return slice_center(roi)
return tuple(slice_center(s) for s in roi)
[docs]
def roi_from_points(
xy: np.ndarray,
shape: SomeShape,
padding: int = 0,
align: Optional[int] = None,
) -> NormalizedROI:
"""
Build ROI from sample points.
Compute envelope around a bunch of points and return it as an ROI (tuple of
row/col slices)
Returned roi is clipped ``(0,0) --> shape``, so it won't stick outside of the
valid region.
"""
shape = shape_(shape)
def to_roi(*args):
return tuple(slice(int(v[0]), int(v[1])) for v in args)
assert len(shape) == 2
assert xy.ndim == 2 and xy.shape[1] == 2
# keep finite points only
# if any points are not finite remove them
ok_mask = np.isfinite(xy)
if not ok_mask.all():
keep = ok_mask.T[0] * ok_mask.T[1]
xy = xy[keep, :]
if xy.shape[0] == 0:
return np.s_[0:0, 0:0]
ny, nx = shape
_in = np.floor(xy.min(axis=0)).astype("int32") - padding
_out = np.ceil(xy.max(axis=0)).astype("int32") + padding
if align is not None:
_in = align_down(_in, align)
_out = align_up(_out, align)
xx = np.asarray([_in[0], _out[0]])
yy = np.asarray([_in[1], _out[1]])
xx = np.clip(xx, 0, nx, out=xx)
yy = np.clip(yy, 0, ny, out=yy)
return to_roi(yy, xx)