import os
import numpy as np
import pandas as pd
from cvkit.pose_estimation import Skeleton, Part
from cvkit.pose_estimation.data_readers.datastore_interface import DataStoreInterface
from cvkit.pose_estimation.utils import convert_to_numpy
class CVKitDataStore3D(DataStoreInterface):
    """
    Datastore reader/writer for BU-CVKit's n-dimensional data files.

    Expects a delimited text file (delimiter ``SEP``) whose header contains a
    single column per keypoint; all coordinates of a keypoint are stored
    together in one cell. A ``behaviour`` column holds the behaviour labels of
    a row joined by ``BEHAVIOUR_SEP``.

    ``BEHAVIOUR_SEP``, ``MAGIC_NUMBER``, ``DIMENSIONS`` and ``body_parts`` are
    read from ``self`` but are not defined in this class; they are presumably
    provided by ``DataStoreInterface`` -- confirm against the base class.

    :param body_parts: list of column names (one per keypoint)
    :param path: path to data file; when ``None`` or not present on disk an
        empty table with the ``body_parts`` columns is created instead
    :param dimensions: data dimension
    """

    FLAVOR = "CVKit3D"
    SEP = ';'

    def __init__(self, body_parts, path, dimensions=3):
        super().__init__(body_parts, path, dimensions)
        self.path = path
        if path is not None and os.path.exists(path):
            # Read with the same delimiter that save_file() writes with, so
            # overriding SEP in a subclass keeps reading and writing in sync.
            self.data = pd.read_csv(path, sep=self.SEP)
        else:
            self.data = pd.DataFrame(columns=body_parts)
        # Guarantee that every expected column exists, even for files that
        # were written with a different (smaller) set of keypoints.
        for part in body_parts:
            if part not in self.data.columns:
                self.data[part] = ""
        if "behaviour" not in self.data.columns:
            self.data['behaviour'] = ""
        if not self.data.index.is_monotonic_increasing:
            self.data.sort_index(inplace=True)

    def _likelihood(self, point) -> float:
        """Return 0.0 when every coordinate equals ``MAGIC_NUMBER``, else 1.0."""
        return float(not all(point == self.MAGIC_NUMBER))

    def _split_behaviour(self, raw) -> list:
        """
        Turn a raw ``behaviour`` cell into a list of labels.

        Missing values and the empty string both yield ``[]``. The empty
        string has to be special-cased because ``"".split(sep)`` returns
        ``['']`` rather than an empty list.
        """
        if pd.isna(raw) or raw == "":
            return []
        return raw.split(self.BEHAVIOUR_SEP)

    def save_file(self, path: str = None) -> None:
        """
        Write the table to disk, sorted by index and without the index column.

        :param path: destination file; defaults to the path given at construction
        """
        if path is None:
            path = self.path
        self.data.sort_index(inplace=True)
        self.data.to_csv(path, index=False, sep=self.SEP)

    def delete_part(self, index, name, force_remove=False):
        """
        Clear the cell of keypoint ``name`` at row ``index`` by writing ``pd.NA``.

        :param index: row label
        :param name: keypoint column name
        :param force_remove: when true, write even if ``index`` is not yet a
            row label of the table
        """
        if force_remove or index in self.data.index:
            self.data.loc[index, name] = pd.NA

    def set_behaviour(self, index, behaviour: list) -> None:
        """
        Store the behaviour labels of row ``index`` as one joined string.

        :param index: row label
        :param behaviour: list of label strings
        """
        self.data.loc[index, 'behaviour'] = self.BEHAVIOUR_SEP.join(behaviour)

    def get_behaviour(self, index) -> list:
        """
        Return the behaviour labels of row ``index``.

        :param index: row label
        :return: list of labels; ``[]`` when the row does not exist or has no
            behaviour stored
        """
        if index not in self.data.index:
            return []
        return self._split_behaviour(self.data.loc[index, 'behaviour'])

    def get_part_slice(self, slice_indices: list, name: str) -> np.ndarray:
        """
        Return the parts of keypoint ``name`` for a range of rows.

        :param slice_indices: ``[start, stop]`` row labels; ``stop`` is exclusive
        :param name: keypoint column name
        :return: array holding one ``build_part`` result per row
        """
        # .loc slices include both end labels, hence the "- 1".
        first, last = slice_indices[0], slice_indices[1] - 1
        cells = self.data.loc[first:last, name]
        return cells.map(lambda cell: self.build_part(cell, name)).to_numpy()

    def set_part_slice(self, slice_indices: list, name: str, data: np.ndarray) -> None:
        """
        Overwrite keypoint ``name`` for a range of rows.

        :param slice_indices: ``[start, stop]`` row labels; ``stop`` is exclusive
        :param name: keypoint column name
        :param data: array with one entry per row along its first axis
        """
        # A 1-D object array is used so that each row's values end up as a
        # single cell entry -- assumes data is (rows, dims); TODO confirm.
        place_holder = np.empty((data.shape[0],), dtype=object)
        place_holder[:] = data.tolist()
        self.data.loc[slice_indices[0]:slice_indices[1] - 1, name] = place_holder

    def get_part(self, index, name) -> Part:
        """
        Return keypoint ``name`` at row ``index``.

        :param index: row label
        :param name: keypoint column name
        :return: the stored part, or a part filled with ``MAGIC_NUMBER`` and
            likelihood 0.0 when the row does not exist
        """
        if index not in self.data.index:
            return Part([self.MAGIC_NUMBER] * self.DIMENSIONS, name, 0.0)
        pt = convert_to_numpy(self.data.loc[index, name])
        return Part(pt, name, self._likelihood(pt))

    def set_part(self, index, part: Part) -> None:
        """
        Store ``part`` (as the string form of its coordinate list) at row ``index``.

        :param index: row label
        :param part: part to store; its ``name`` selects the column
        """
        name = part.name
        self.data.loc[index, name] = str(part.tolist())
        # Re-sort if the write left the row labels out of order.
        if not self.data.index.is_monotonic_increasing:
            self.data.sort_index(inplace=True)

    def build_skeleton(self, row) -> Skeleton:
        """
        Build a ``Skeleton`` from one table row.

        :param row: mapping from column name to cell value
        :return: skeleton holding every body part of this datastore, a
            likelihood per part and the row's behaviour labels
        """
        part_map = {}
        likelihood_map = {}
        for name in self.body_parts:
            part_map[name] = convert_to_numpy(row[name])
            likelihood_map[name] = self._likelihood(part_map[name])
        behaviour = self._split_behaviour(row['behaviour'])
        return Skeleton(self.body_parts, part_map=part_map, likelihood_map=likelihood_map,
                        behaviour=behaviour)

    def build_part(self, row, name):
        """
        Build a ``Part`` from a single cell value.

        :param row: raw cell value of one keypoint
        :param name: keypoint name
        :return: part with likelihood 0.0 if all coordinates equal
            ``MAGIC_NUMBER``, otherwise 1.0
        """
        pt = convert_to_numpy(row)
        return Part(pt, name, self._likelihood(pt))

    @staticmethod
    def convert_to_list(index, skeleton, threshold=0.8):
        """
        Convert a skeleton to a list with one entry per body part.

        :param index: unused; kept so the signature stays unchanged
        :param skeleton: skeleton to convert
        :param threshold: parts that do not compare greater than this value
            are emitted as ``None`` (presumably the comparison is on the
            part's likelihood -- confirm against ``Part``)
        :return: list of coordinate lists / ``None`` in ``skeleton.body_parts`` order
        """
        converted = []
        for name in skeleton.body_parts:
            part = skeleton[name]
            converted.append(part.tolist() if part > threshold else None)
        return converted