# -*- coding: utf-8 -*-
# ELEKTRONN2 Toolkit
# Copyright (c) 2015 Marius Killinger
# All rights reserved
# TODO: Python 3 compatibility
__all__ = ['KnossosArray', 'KnossosArrayMulti']
import glob
import itertools
import re
import os
import traceback
import ctypes
import multiprocessing as mp
from collections import deque
import numpy as np
from .. import utils
def our_glob(s):
    """Glob pattern *s* and return matches with OS path separators normalised to "/"."""
    return [match.replace(os.path.sep, "/") for match in glob.glob(s)]
def get_remaining_size(fd):
    """Return the number of bytes between *fd*'s current position and EOF.

    Uses ``os.fstat`` on the underlying descriptor when available; file-like
    objects without ``fileno()`` fall back to ``os.path.getsize(fd.name)``.
    """
    pos = fd.tell()
    try:
        fileno = fd.fileno()
    except AttributeError:
        # no real descriptor (e.g. an in-memory wrapper exposing .name)
        return os.path.getsize(fd.name) - pos
    return os.fstat(fileno).st_size - pos
def load_rawfile(path, out, offset=0):
    """Read raw bytes from *path* into the numpy array *out* (filled in-place).

    :param path: file to read
    :param out: writable numpy array; exactly ``out.size * out.itemsize``
        bytes are read into its buffer
    :param offset: number of bytes to skip at the beginning of the file
    :return: *out*
    :raises ValueError: if the file holds fewer bytes than *out* requires
    :raises IOError: if the read returned fewer bytes than expected
    """
    # ``with`` guarantees the descriptor is closed even on the error paths
    # (the original left the file open when raising).
    with open(path, 'rb') as fd:
        if offset > 0:
            fd.seek(offset, 1)
        size = get_remaining_size(fd)
        nbytes = out.size * out.itemsize
        if nbytes > size:
            raise ValueError(
                "Not enough bytes left in file for specified shape and type")
        nbytesread = fd.readinto(out.data)
        if nbytesread != nbytes:
            raise IOError("Didn't read as many bytes as expected")
    return out
class LoadProc(mp.Process):
    """Worker process that fills a shared ``mp.Array`` from a raw cube file.

    The uint8 file at ``path`` is read, copied into the shared float buffer
    and scaled to [0, 1]. ``status`` (an ``mp.Event``) is set when loading
    failed, so the parent can detect the error.
    """
    def __init__(self, mp_array, path, dtype, status):
        super(LoadProc, self).__init__()
        self.path = path
        self.mp_array = mp_array
        self.status = status
        # typed view onto the shared buffer; written by run() in the child
        self.array = np.frombuffer(mp_array.get_obj(), dtype)

    def run(self):
        try:
            self.mp_array.acquire()
            raw = np.empty(len(self.array), np.uint8)
            load_rawfile(self.path, raw)
            self.array[:] = raw
            self.array /= 255  # scale uint8 range into [0, 1]
            self.mp_array.release()
        except Exception:
            # Narrowed from a bare ``except:`` so SystemExit/KeyboardInterrupt
            # are not swallowed; the lock is still released and the parent is
            # notified through ``status``.
            self.mp_array.release()
            traceback.print_exc()
            self.status.set()
class KnossosArray(object):
"""
Interfaces with knossos cubes, all axes are in zxy order!
"""
def __init__(self, path, max_ram=1000, n_preload=2, fixed_mag=1):
    """
    :param path: str
        path to the knossos dataset folder (the one containing the mag dirs)
    :param max_ram: int
        approximate cache budget in MB for cube data
    :param n_preload: int
        preload radius in cubes around the current position (halved on axis 0)
    :param fixed_mag: int
        fixes the magnification to one specific value
    """
    path = os.path.expanduser(path)
    # Knossos attributes
    # NOTE: ``np.int``/``np.float`` were removed in NumPy 1.24; the builtin
    # types are exact replacements for those dtype aliases.
    self._shape = np.zeros(3, dtype=int)           # zyx voxel extent
    self._shape_spatial = np.zeros(3, dtype=int)
    self.scale = np.zeros(3, dtype=float)          # zyx voxel scale
    self._knossos_path = None
    self._experiment_name = None
    self._cube_boundary = None
    self._cube_length = 128
    self._cube_sh = [self._cube_length, ] * 3
    self._mag = []
    self._n_f = 1
    # slicing/loading attributes
    self.dtype = np.dtype(np.float32)
    _n_preload = np.ones(3, dtype=float) * n_preload
    _n_preload[0] /= 2  # axis 0 gets only half the preload radius
    self.n_preload = np.ceil(_n_preload).astype(int)
    max_bytes = max_ram * 2 ** 20
    self.cache_size = int(max_bytes / (self._cube_length ** 3 * self.dtype.itemsize))
    if self.cache_size < np.prod(self.n_preload * 2 + 1):
        ram = np.prod(self.n_preload * 2 + 1) * (self._cube_length ** 3 * self.dtype.itemsize) // 2 ** 20
        raise ValueError("For %s preload cubes more RAM is required: %i MB" % (n_preload, ram))
    self.mpa_used = {}    # coords -> shared array currently cached
    self.mpa_empty = []   # pool of free shared arrays (held acquired)
    self.cubes = {}       # coords -> numpy view of cached cube
    self.load_q = deque() # pending [status, proc, mp_array, coords] loads
    self.loading = {}     # coords currently being loaded
    for i in range(self.cache_size):
        mp_array = mp.Array(ctypes.c_float, int(self._cube_length ** 3), lock=True)
        mp_array.acquire()
        self.mpa_empty.append(mp_array)
    self._initialize_from_knossos_path(path, fixed_mag)
    # self.track = []
@property
def shape(self):
    # full (channels-free) zyx voxel extent of the dataset
    return self._shape

@property
def n_f(self):
    # number of feature channels; a plain KnossosArray always has 1
    return self._n_f
def __repr__(self):
    """Summarise shape and cache/loader state for debugging."""
    parts = [
        "<KnossosArray> %s" % (self.shape,),
        "%i cubes in cache" % len(self.cubes),
        "%i mpa in use" % len(self.mpa_used),
        "%i mpa empty" % len(self.mpa_empty),
        "%i mpa loading" % len(self.loading),
        "%i loading q" % len(self.load_q),
    ]
    return "\n".join(parts) + "\n"
def _initialize_from_knossos_path(self, path, fixed_mag=0):
    """ Initializes the dataset by parsing the knossos.conf in path + "mag1"
    :param path: str
        forward-slash separated path to the datasetfolder - not .../mag !
    :param fixed_mag: int
        fixes available mag to one specific value
    :return:
        nothing
    """
    self._knossos_path = path
    all_mag_folders = our_glob(os.path.join(path, "*mag*"))
    if fixed_mag > 0:
        self._mag.append(fixed_mag)
    else:
        # probe powers of two against the available mag folder names
        for mag_test_nb in range(32):
            for mag_folder in all_mag_folders:
                if "mag" + str(2 ** mag_test_nb) in mag_folder:
                    self._mag.append(2 ** mag_test_nb)
                    break
    mag_folder = our_glob(os.path.join(path, "*mag*"))[0].split("/")
    if len(mag_folder[-1]) > 1:
        mag_folder = mag_folder[-1]
    else:
        mag_folder = mag_folder[-2]
    # folder prefix without the trailing mag number (e.g. "mag" from "mag1")
    self._name_mag_folder = \
        mag_folder[:-len(re.findall(r"[\d]+", mag_folder)[-1])]
    try:
        p = our_glob(os.path.join(path, "*mag1"))[0] + "/knossos.conf"
        print("Reading %s" % p)
        with open(p) as f:  # ``with`` closes the file even if reading fails
            lines = f.readlines()
    except Exception:  # narrowed from a bare ``except:``
        raise NotImplementedError("Could not find/read *mag1/knossos.conf")
    parsed_dict = {}
    for line in lines:
        try:
            match = re.search(r'(?P<key>[A-Za-z _]+)'
                              r'((?P<numeric_value>[0-9\.]+)'
                              r'|"(?P<string_value>[A-Za-z0-9._/-]+)");',
                              line).groupdict()
            if match['string_value']:
                val = match['string_value']
            elif '.' in match['numeric_value']:
                val = float(match['numeric_value'])
            elif match['numeric_value']:
                val = int(match['numeric_value'])
            else:
                raise Exception('Malformed knossos.conf')
            parsed_dict[match["key"]] = val
        except Exception:  # tolerate unparsable lines, as before
            print("Unreadable line in knossos.conf - ignored.")
    # NOTE: the parsed keys really do carry a trailing space (regex above
    # includes it in the key group).
    self._shape[2] = parsed_dict['boundary x ']
    self._shape[1] = parsed_dict['boundary y ']
    self._shape[0] = parsed_dict['boundary z ']
    self._shape_spatial[:] = self._shape
    # the boundary is the first cube index for which no cube is available!
    self._cube_boundary = np.ceil(
        self._shape.astype(float) / self._cube_length).astype(int)
    self.scale[2] = parsed_dict['scale x ']
    self.scale[1] = parsed_dict['scale y ']
    self.scale[0] = parsed_dict['scale z ']
    self._experiment_name = parsed_dict['experiment name ']
    if self._experiment_name.endswith("mag1"):
        self._experiment_name = self._experiment_name[:-5]
def _request_cube(self, coords):
    """Start an asynchronous load of the cube at zyx cube-index ``coords``.

    Takes a free shared array from the pool, spawns a ``LoadProc`` and
    records the pending load in ``load_q``/``loading``. The cube must not
    already be cached or loading, and a free array must be available.
    """
    assert (coords not in self.cubes) and (coords not in self.loading)
    assert len(self.mpa_empty)
    # self.track.append(coords)
    coords_xyz = [coords[2], coords[1], coords[0]] # knossos has xyz order, we have zyx
    mag = self._mag[0]
    # knossos layout: <path><magdir><mag>/xNNNN/yNNNN/zNNNN/<name>_mag<m>_x..._y..._z....raw
    path = self._knossos_path+self._name_mag_folder+\
           str(mag) + "/x%04d/y%04d/z%04d/" \
           % (coords_xyz[0], coords_xyz[1], coords_xyz[2]) + \
           self._experiment_name + '_mag' + str(mag) + \
           "_x%04d_y%04d_z%04d.raw" \
           % (coords_xyz[0], coords_xyz[1], coords_xyz[2])
    if not os.path.exists(path):
        raise ValueError("Slice out of bounce, no cube %s "
                         "[Note: not all cubes exist within bounding box!].)" %(coords,))
    mp_array = self.mpa_empty.pop()
    status = mp.Event() # is not set
    # pool arrays are held acquired; release so the child can acquire it
    mp_array.release()
    proc = LoadProc(mp_array, path, self.dtype, status)
    proc.start()
    self.load_q.append([status, proc, mp_array, coords])
    self.loading[coords]=True
def _clear_q(self, wait_for):
    """Transfer finished loads from ``load_q`` into the cube cache.

    :param wait_for: ``None``: pop only the prefix of the queue whose loader
        processes have already terminated (minimal blocking); ``'all'``:
        drain the whole queue, waiting as needed; a coords tuple: pop (and
        wait) until that particular cube has arrived, then stop.
    :raises Exception: if a loader process signalled an error via its status
        event.
    """
    if wait_for is None:
        states = [q[1].is_alive() for q in self.load_q]
        try:
            max_i = states.index(True)
        except ValueError:  # all have terminated already (narrowed bare except)
            max_i = len(self.load_q)
    else:
        # 'all' or a specific coords tuple: allow draining the whole queue
        max_i = len(self.load_q)
    i = 0
    while i < max_i:
        status, proc, mp_array, coords = self.load_q.popleft()
        if proc.is_alive():
            print("WAITING for cube %s" % (coords,))
            proc.join()
        if status.is_set():
            raise Exception("An error happened in one of the loading processes")
        # re-acquire the shared array: the child released it after writing
        mp_array.acquire()
        np_array = np.frombuffer(mp_array.get_obj(), self.dtype).reshape(self._cube_sh)
        assert coords not in self.cubes, "%s" % (coords,)
        self.cubes[coords] = np_array
        self.mpa_used[coords] = mp_array
        self.loading.pop(coords)
        if coords == wait_for:
            break
        i += 1
def preload(self, position, start_end=None, sync=False):
    """
    preloads around position preload distance but at least to cover start-end

    :param position: zyx voxel position around which cubes are prefetched
    :param start_end: optional (start, end) cube index ranges that must be
        covered (end exclusive); those cubes are requested before the margin
    :param sync: block until all queued cubes have finished loading
    :return: the list of required cube indices if ``start_end`` was given
    """
    position = np.array(position, dtype=int)
    center_cube = np.floor(position / self._cube_length).astype(int)
    start = np.maximum(center_cube - self.n_preload, 0)
    end = np.minimum(center_cube + self.n_preload + 1, self._cube_boundary)
    # preload indices
    ijk_pre = list(itertools.product(range(start[0], end[0]),
                                     range(start[1], end[1]),
                                     range(start[2], end[2])))
    # requested indices:
    if start_end is not None:
        start_r, end_r = start_end
        if np.any(end_r > self._cube_boundary):
            print("Warning: Slice at %s might be out of bounce (no cube %s)"
                  % (position, end_r - 1))
        ijk_req = list(itertools.product(range(start_r[0], end_r[0]),
                                         range(start_r[1], end_r[1]),
                                         range(start_r[2], end_r[2])))
        ijk_req_set = set(ijk_req)
        # required cubes first, then the non-overlapping preload margin
        ijk_pre = [c for c in ijk_pre if c not in ijk_req_set]
        ijk = ijk_req + ijk_pre
    else:
        ijk = ijk_pre
    fetch = []
    keep = {}
    for coords in ijk:
        if coords in self.cubes:
            keep[coords] = True
        elif coords not in self.loading:
            fetch.append(coords)
    if len(fetch) > len(self.mpa_empty):  # need to release arrays first
        n = len(fetch) - len(self.mpa_empty)
        if len(ijk) > self.cache_size:
            f = float(len(ijk)) / self.cache_size
            raise ValueError("The requested array+preload margin is %.2f x"
                             " too large for the cache limit" % f)
        self._release_cubes(center_cube, keep, n)
    for coords in fetch:
        self._request_cube(coords)
    if sync:
        self._clear_q("all")
    if start_end is not None:
        return ijk_req
def _release_cube(self, coords):
    """Drop ``coords`` from the cache and return its shared array to the free pool."""
    del self.cubes[coords]
    self.mpa_empty.append(self.mpa_used.pop(coords))
def _release_cubes(self, center_cube, keep, n):
    """Evict at least ``n`` cached cubes, farthest from ``center_cube`` first.

    Cubes listed in ``keep`` are never evicted (axis-0 distance counts
    double in the eviction metric). If fewer than ``n`` cubes are currently
    evictable, pending loads are first cleared from the queue and the method
    recurses for the remainder.
    """
    release = []
    for coords in self.cubes:
        if coords not in keep:
            dist = np.linalg.norm((center_cube - coords) * [2, 1, 1])
            release.append((coords, dist))
    release.sort(key=lambda x: x[1], reverse=True)
    if len(release) < n:  # not enough cubes available, they are still in loading q
        for coords, _ in release:  # release the possible cubes first
            self._release_cube(coords)
        n_left = n - len(release)
        # now clear the q
        try:  # without waiting
            self._clear_q(None)
            self._release_cubes(center_cube, keep, n_left)
        except Exception:  # narrowed from bare except; if it doesn't work, wait for all
            self._clear_q('all')
            self._release_cubes(center_cube, keep, n_left)
        return
    for i in range(n):
        self._release_cube(release[i][0])
def _fetch_cube(self, coords):
    """
    Fetch a cube that is in cache or in loading q. i.e. this cube must have been preloaded
    Don't use this to request/preload a cube!
    """
    if coords in self.cubes:
        return self.cubes[coords]
    # not cached yet: it must be in flight, so wait for exactly this cube
    assert coords in self.loading
    self._clear_q(coords)
    return self.cubes[coords]
#@utils.timeit
def cut_slice(self, shape, offset, out=None):
    """Cut a dense zyx block of ``shape`` voxels starting at ``offset``.

    All cubes overlapping the window are (pre)loaded, then each cube's
    intersection with the window is copied into place.

    :param shape: zyx extent of the slice in voxels
    :param offset: zyx start position in voxels
    :param out: optional preallocated array (must match dtype and shape)
    :return: the filled array
    """
    shape = np.array(shape, dtype=int)
    offset = np.array(offset, dtype=int)
    start = np.floor(offset / self._cube_length).astype(int)
    end = np.floor((offset + shape - 1) / self._cube_length).astype(int) + 1
    position = offset + shape // 2
    ijk = self.preload(position=position, start_end=(start, end), sync=False)
    if out is None:
        out = np.empty(shape, self.dtype)
    else:
        assert out.dtype == self.dtype
        assert np.allclose(out.shape, shape)
    l = self._cube_length
    for i, j, k in ijk:  # flat triple-nested loop over involved cubes
        values = self._fetch_cube((i, j, k))
        # source window inside the cube
        xl = max(offset[0] - i * l, 0)
        xh = min(offset[0] - i * l + shape[0], l)
        yl = max(offset[1] - j * l, 0)
        yh = min(offset[1] - j * l + shape[1], l)
        zl = max(offset[2] - k * l, 0)
        zh = min(offset[2] - k * l + shape[2], l)
        cut = values[xl:xh, yl:yh, zl:zh]
        # destination window inside ``out``
        xl1 = max(i * l - offset[0], 0)
        xh1 = min((i + 1) * l - offset[0], shape[0])
        yl1 = max(j * l - offset[1], 0)
        yh1 = min((j + 1) * l - offset[1], shape[1])
        zl1 = max(k * l - offset[2], 0)
        zh1 = min((k + 1) * l - offset[2], shape[2])
        out[xl1:xh1, yl1:yh1, zl1:zh1] = cut
    return out
def _normalise_idx(self, idx):
    """Convert a 3-tuple of slices into ``(shape, offset, normalised_slices)``.

    :raises ValueError: for integer indices or out-of-range slices
    :raises NotImplementedError: for step != 1
    """
    new_idx = []
    shape = np.zeros(3, dtype=int)
    offset = np.zeros(3, dtype=int)
    for i, sl in enumerate(idx):
        if isinstance(sl, int):
            raise ValueError("Cannot slice single slices, only ranges")
        n = self._shape_spatial[i]
        start, stop, step = sl.indices(n)
        # ``slice.indices`` silently clamps to [0, n], so check the raw
        # bounds: requests reaching past the array must fail loudly.
        # (The previous ``stop >= n`` check also rejected valid slices that
        # extend exactly to the array boundary — off-by-one.)
        if (sl.start is not None and not 0 <= sl.start <= n) or \
           (sl.stop is not None and not 0 <= sl.stop <= n):
            raise ValueError("Slice %s out of bounce for knossos "
                             "array of shape %s" % (idx[i], n))
        if step != 1:
            raise NotImplementedError("Subsampled slicing not implemented:"
                                      "instructins: use mag(stride) cubes "
                                      "and reuse slicing function")
        new_idx.append(slice(start, stop, step))
        shape[i] = (stop - start) // step
        offset[i] = start
    return shape, offset, new_idx
def __getitem__(self, idx):
    """Numpy-style slicing; an optional leading full slice adds a channel axis."""
    assert len(idx) in (3, 4)
    add_channel = (len(idx) == 4)
    if add_channel:
        # the channel axis can only be the full slice; there is one channel
        assert idx[0].start is None and idx[0].stop is None and idx[0].step is None
        idx = idx[1:]
    shape, offset, new_idx = self._normalise_idx(idx)
    out = self.cut_slice(shape, offset)
    return out[None] if add_channel else out
class KnossosArrayMulti(KnossosArray):
def __init__(self, path_prefix, feature_paths, max_ram=3000, n_preload=2, fixed_mag=1):
    """Bundle one KnossosArray per feature path; the RAM budget is split evenly.

    NOTE: the base class ``__init__`` is not invoked (matches the original);
    each sub-array manages its own cache.
    """
    self.dtype = np.dtype(np.float32)
    self._n_f = len(feature_paths)
    ram_per_array = max_ram // self._n_f
    self._arrays = [
        KnossosArray(os.path.expanduser(os.path.join(path_prefix, feature)),
                     ram_per_array, n_preload, fixed_mag)
        for feature in feature_paths
    ]
    self._shape = (self._n_f,) + tuple(self._arrays[0].shape)
    self._shape_spatial = self._arrays[0].shape
def preload(self, position, sync=True):
    """Preload every feature sub-array around ``position``."""
    if len(position) == 4:
        position = position[1:]  # drop the leading channel coordinate
    for sub_array in self._arrays:
        sub_array.preload(position, sync=sync)
def __getitem__(self, idx):
    """Slice as (feature, z, y, x); the first axis selects feature arrays.

    :param idx: 4-tuple of slices; ``idx[0]`` picks the feature channels
    :return: float32 array of shape (n_selected_features,) + spatial shape
    """
    assert len(idx) == 4
    feature_slice = idx[0]
    idx = idx[1:]
    shape, offset, new_idx = self._normalise_idx(idx)
    feature_indices = list(range(*feature_slice.indices(self._n_f)))
    out = np.empty((len(feature_indices),) + tuple(shape), self.dtype)
    # BUG FIX: ``out`` was previously indexed with the absolute feature
    # index ``i``; for a feature slice not starting at 0 that raised
    # IndexError (out only has len(feature_indices) entries). Index the
    # output by position instead.
    for slot, feat in enumerate(feature_indices):
        self._arrays[feat].cut_slice(shape, offset, out[slot])
    return out
def cut_slice(self, shape, offset, out=None):
    """Cut the same spatial block from every feature array, stacked on axis 0."""
    if out is None:
        out = np.empty((self._n_f,) + tuple(shape), self.dtype)
    for channel, sub_array in enumerate(self._arrays):
        sub_array.cut_slice(shape, offset, out[channel])
    return out
def __repr__(self):
    """Show the stacked shape followed by the first sub-array's cache state."""
    return "<KnossosArray> %s\nFirst sub-array:\n%s" % (self.shape,
                                                        repr(self._arrays[0]))
class KnossosArrayMultiMy(KnossosArrayMulti):
    """Wrapper that maps the underlying feature channels onto two outputs.

    Channel 0 is passed through unchanged; channel 1 is computed as
    ``max(max(ch1, ch2) - ch3, 0)`` from the remaining channels.
    """
    def __getitem__(self, idx):
        tmp = super(KnossosArrayMultiMy, self).__getitem__(idx)
        sh = list(tmp.shape)
        sh[0] = 2
        out = np.empty(sh, dtype=np.float32)
        out[0] = tmp[0]
        # fold channels 1-3 into a single non-negative channel
        out[1] = np.maximum(np.maximum(tmp[1], tmp[2]) - tmp[3], 0)
        return out

    @property
    def n_f(self):
        # BUG FIX: ``return self.n_f`` recursed infinitely. This class
        # always exposes exactly 2 channels (cf. ``shape`` and __getitem__).
        return 2

    @property
    def shape(self):
        # same spatial shape as the underlying arrays, but 2 channels
        sh = list(self._shape)
        sh[0] = 2
        return tuple(sh)
if __name__ == "__main__":
import time, sys
sys.path.append(os.path.expanduser("~/axon/mkilling/devel/"))
from knossos_utils import KnossosDataset
from elektronn2.utils.plotting import scroll_plot, scroll_plot2
path = os.path.expanduser("~/lustre/sdorkenw/j0126_3d_rrbarrier/")
arr = KnossosArray(path, max_ram=1200, n_preload=2)
arr.preload([0,0,0], sync=True)
sh = np.array([120,240,240])
off = np.array([0,0,0]) + np.random.randint(0, 500, size=3)
# a = arr[0:80, 0:220, 0:230]
# a0 = arr[0:100, 0:400, 0:200]
# a1 = arr[0:100, 100:400+100, 100:200+100]
# b = arr[0:640, 0:640, 0:384]
# c = arr[500:600, 500:800, 500:800]
#
kds = KnossosDataset()
kds.initialize_from_knossos_path(path)
get = utils.timeit(kds.from_raw_cubes_to_matrix)
# a0 = arr.cut_slice(sh, off)
# a1 = get(sh[[2,1,0]], off[[2,1,0]])
# a2 = (a1.T).astype(np.float32)/255
# print(np.allclose(a0, a2))
# fig = scroll_plot2([a0, a2], 'ab')
t = 0
out = np.empty(sh, dtype=np.float32)
N = 500
for i in range(N):
t0 = time.time()
a = arr.cut_slice(sh, off, out=out)
t += time.time()-t0
time.sleep(0.2)
off += np.random.randint(0, 10, size=3)
print("%.3f slices/s" %(float(N)/t))
arr._clear_q(None)
fig = scroll_plot(out, 'a')