import os
import numpy as np
import pandas as pd
from cvkit.pose_estimation import Skeleton, Part
from cvkit.pose_estimation.data_readers.datastore_interface import DataStoreInterface
class DeeplabcutDataStore(DataStoreInterface):
    """
    Implements a datastore reader for DeepLabCut compatible data files.

    The table is held in ``self.data``, a :class:`pandas.DataFrame` whose columns
    form a three-level index ``(scorer, bodypart, coord)`` with ``coord`` being one
    of ``x``, ``y`` or ``likelihood``. An extra ``(scorer, 'behaviour', 'name')``
    column stores the behaviour labels of each row joined with ``BEHAVIOUR_SEP``.
    Rows are kept sorted by index.

    :param body_parts: list of column names
    :param path: path to data file; when it is ``None`` or does not exist an
        empty table is created instead
    :param dimensions: data dimension. Accepted for signature compatibility only;
        the value is ignored and ``DIMENSIONS`` (2) is always used
    """
    FLAVOR = "deeplabcut"
    DIMENSIONS = 2
    # Scorer label used for tables that are created from scratch.
    _DEFAULT_SCORER = "CVKit3D"
    # Per-body-part coordinate columns, in file order.
    _COORDS = ("x", "y", "likelihood")

    def __init__(self, body_parts, path, dimensions=2):
        super().__init__(body_parts, path, DeeplabcutDataStore.DIMENSIONS)
        if path is not None and os.path.exists(path):
            # Every cell is read as text; coordinate columns are cast to float below.
            self.data = pd.read_csv(path, header=[0, 1, 2], index_col=0, dtype='unicode')
        else:
            # Body parts are laid out in reverse order to keep the column order
            # produced by the previous one-frame-at-a-time construction.
            columns = pd.MultiIndex.from_product(
                [[self._DEFAULT_SCORER], list(body_parts)[::-1], list(self._COORDS)],
                names=["scorer", "bodyparts", "coords"],
            )
            self.data = pd.DataFrame(columns=columns)
        # The scorer is taken from the first column; a table without any column
        # (no file and no body parts) falls back to the default scorer.
        if len(self.data.columns) > 0:
            self.scorer = self.data.columns[0][0]
        else:
            self.scorer = self._DEFAULT_SCORER
        if (self.scorer, 'behaviour', 'name') not in self.data.columns:
            self.data[self.scorer, 'behaviour', 'name'] = ""
        float_columns = {}
        for bodypart in body_parts:
            if (self.scorer, bodypart, 'x') not in self.data.columns:
                # Body part missing from the file: add placeholder columns.
                self.data[(self.scorer, bodypart, 'x')] = -1
                self.data[(self.scorer, bodypart, 'y')] = -1
                self.data[(self.scorer, bodypart, 'likelihood')] = 0.0
            for coord in self._COORDS:
                float_columns[(self.scorer, bodypart, coord)] = float
        if float_columns:
            # Single cast for all body parts instead of one frame copy per part.
            self.data = self.data.astype(float_columns)
        if not self.data.index.is_monotonic_increasing:
            self.data.sort_index(inplace=True)

    def _split_behaviour(self, value) -> list:
        """
        Turn a stored behaviour cell into a list of behaviour names.

        :param value: content of the ``(scorer, 'behaviour', 'name')`` cell
        :return: the names obtained by splitting on ``BEHAVIOUR_SEP``; an empty
            list when the cell is missing (NaN) or an empty string
        """
        if pd.isna(value) or value == "":
            return []
        return value.split(self.BEHAVIOUR_SEP)

    def get_header_string(self):
        """
        Build the three header rows of the table.

        :return: a list of three lists, one per column level. Each starts with the
            level label (``scorer``, ``bodyparts``, ``coords``) followed by that
            level's value for every column
        """
        columns = self.data.columns
        return [
            [label, *columns.get_level_values(level)]
            for level, label in enumerate(('scorer', 'bodyparts', 'coords'))
        ]

    @staticmethod
    def convert_to_list(index, skeleton, threshold=0.8):
        """
        Flatten a skeleton into a single row.

        :param index: row index, placed first in the output
        :param skeleton: skeleton providing ``body_parts`` and per-part lookup
        :param threshold: unused; kept for interface compatibility
        :return: ``[index, <coordinates of part 1>, likelihood 1, ...]`` following
            the order of ``skeleton.body_parts``
        """
        out = [index]
        for name in skeleton.body_parts:
            part = skeleton[name]
            out.extend(part.tolist())
            out.append(part.likelihood)
        return out

    def delete_part(self, index, name, force_remove=False):
        """
        Mark a body part as deleted by setting its likelihood to 0.0.

        :param index: row index
        :param name: body part name
        :param force_remove: when true the likelihood is written even if ``index``
            is not yet present in the table
        """
        if force_remove or index in self.data.index:
            self.data.loc[index, (self.scorer, name, 'likelihood')] = 0.0

    def set_behaviour(self, index, behaviour) -> None:
        """
        Store the behaviour labels of a row.

        :param index: row index
        :param behaviour: iterable of behaviour names; stored joined with
            ``BEHAVIOUR_SEP``
        """
        self.data.loc[index, (self.scorer, 'behaviour', 'name')] = self.BEHAVIOUR_SEP.join(behaviour)

    def get_behaviour(self, index) -> list:
        """
        Read the behaviour labels of a row.

        :param index: row index
        :return: list of behaviour names; empty when the row does not exist or has
            no behaviour stored
        """
        if index not in self.data.index:
            return []
        return self._split_behaviour(self.data.loc[index, (self.scorer, 'behaviour', 'name')])

    def get_part(self, index, name) -> Part:
        """
        Read one body part from one row.

        :param index: row index
        :param name: body part name
        :return: the stored part; when the row does not exist, a part filled with
            ``MAGIC_NUMBER`` and a likelihood of 0.0
        """
        if index not in self.data.index:
            return Part([self.MAGIC_NUMBER] * self.DIMENSIONS, name, 0.0)
        x = self.data.loc[index, (self.scorer, name, 'x')]
        y = self.data.loc[index, (self.scorer, name, 'y')]
        likelihood = self.data.loc[index, (self.scorer, name, 'likelihood')]
        return Part([x, y], name, likelihood)

    def set_part(self, index, part: Part) -> None:
        """
        Write one body part into one row and keep the table sorted.

        :param index: row index
        :param part: part to store; its ``name`` selects the columns
        """
        name = part.name
        self.data.loc[index, (self.scorer, name, 'x')] = part[0]
        self.data.loc[index, (self.scorer, name, 'y')] = part[1]
        self.data.loc[index, (self.scorer, name, 'likelihood')] = part.likelihood
        # Re-sort in case the assignment left the index out of order.
        if not self.data.index.is_monotonic_increasing:
            self.data.sort_index(inplace=True)

    def part_iterator(self, part):
        """
        Iterate over every row of a single body part.

        :param part: body part name
        :return: generator of ``(index, Part)`` tuples
        """
        for index, row in self.data.loc[:, (self.scorer, part)].iterrows():
            yield index, self.build_part(row, part)

    def build_part(self, row, name):
        """
        Build a part from a row holding ``x``, ``y`` and ``likelihood`` entries.

        :param row: mapping-like row of a single body part
        :param name: body part name
        :return: the corresponding :class:`Part`
        """
        return Part([row['x'], row['y']], name, row['likelihood'])

    def get_part_slice(self, slice_indices: list, name: str) -> np.ndarray:
        """
        Read one body part for a range of rows.

        :param slice_indices: ``[start, stop]``; rows are selected by label from
            ``start`` to ``stop - 1`` inclusive
        :param name: body part name
        :return: array with one :class:`Part` per selected row
        """
        rows = self.data.loc[slice_indices[0]:slice_indices[1] - 1, (self.scorer, name)]
        return rows.apply(lambda row: self.build_part(row, name), axis=1).to_numpy()

    def set_part_slice(self, slice_indices: list, name: str, data: np.ndarray) -> None:
        """
        Write one body part for a range of rows.

        :param slice_indices: ``[start, stop]``; rows are selected by label from
            ``start`` to ``stop - 1`` inclusive
        :param name: body part name
        :param data: one part per selected row; each element must support
            ``element[0]``, ``element[1]`` and ``element.likelihood``
        """
        rows = slice(slice_indices[0], slice_indices[1] - 1)
        # Elements are read one at a time so that the one-dimensional array of
        # parts returned by get_part_slice can be written back unchanged.
        self.data.loc[rows, (self.scorer, name, 'x')] = [d[0] for d in data]
        self.data.loc[rows, (self.scorer, name, 'y')] = [d[1] for d in data]
        self.data.loc[rows, (self.scorer, name, 'likelihood')] = [d.likelihood for d in data]

    def build_skeleton(self, row) -> Skeleton:
        """
        Build a skeleton from a full table row.

        :param row: row indexed by ``(scorer, bodypart, coord)``
        :return: skeleton holding every body part of this store together with the
            row's behaviour labels
        """
        part_map = {}
        likelihood_map = {}
        for name in self.body_parts:
            part_map[name] = [float(row[(self.scorer, name, 'x')]), float(row[(self.scorer, name, 'y')])]
            likelihood_map[name] = float(row[(self.scorer, name, 'likelihood')])
        behaviour = self._split_behaviour(row[(self.scorer, 'behaviour', 'name')])
        return Skeleton(self.body_parts, part_map=part_map, likelihood_map=likelihood_map,
                        behaviour=behaviour, dims=self.DIMENSIONS)

    def get_valid_marker(self, name, threshold=0.01):
        """
        Find the first row in which a body part is reliable enough.

        :param name: body part name
        :param threshold: minimum likelihood (inclusive)
        :return: the part from the first row whose likelihood reaches
            ``threshold``, or ``None`` when there is no such row or the lookup fails
        """
        try:
            sub_df = self.data[self.data[(self.scorer, name, 'likelihood')] >= threshold]
            if len(sub_df) > 0:
                return Part([sub_df[(self.scorer, name, 'x')].iloc[0], sub_df[(self.scorer, name, 'y')].iloc[0]],
                            name, sub_df[(self.scorer, name, 'likelihood')].iloc[0])
        except Exception as e:
            # Best effort: report the failure and fall through to "no marker".
            print(name, e)
        return None