import os
import numpy as np
import pandas as pd
from cvkit.pose_estimation import Skeleton, Part
from cvkit.pose_estimation.data_readers.datastore_interface import DataStoreInterface
[docs]class FlattenedDataStore(DataStoreInterface):
"""
Implements a datastore reader for flattened csv files. Expects a csv file where all dimensions are flattened.
The header should contain 3 consecutive columns per keypoint.
Ex. Snout_1,Snout_2,Snout_3 for x,y, and z values
:param body_parts: list of column names
:param path: path to data file
:param dimension: data dimension
"""
FLAVOR = "flattened"
DIMENSIONS = 3
def __init__(self, body_parts, path, dimension=3):
super(FlattenedDataStore, self).__init__(body_parts, path, dimension=dimension)
self.path = path
if path is not None and os.path.exists(path):
self.data = pd.read_csv(path, sep=',')
else:
columns = []
for part in body_parts:
columns.extend([f"{part}_{i}" for i in range(1, self.DIMENSIONS + 1)])
self.data = pd.DataFrame(columns=columns)
for part in body_parts:
for dim in range(1, self.DIMENSIONS + 1):
if f"{part}_{dim}" not in self.data.columns:
self.data[f"{part}_{dim}"] = ""
if "behaviour" not in self.data.columns:
self.data['behaviour'] = ""
if not self.data.index.is_monotonic_increasing:
self.data.sort_index(inplace=True)
[docs] def save_file(self, path: str = None) -> None:
if path is None:
path = self.path
self.data.sort_index(inplace=True)
self.data.to_csv(path, index=False, sep=self.SEP)
[docs] def delete_part(self, index, name, force_remove=False):
if force_remove or index in self.data.index:
self.data.loc[index, [f"{name}_{i}" for i in range(1, self.DIMENSIONS + 1)]] = pd.NA
[docs] def set_behaviour(self, index, behaviour: list) -> None:
self.data.loc[index, 'behaviour'] = self.BEHAVIOUR_SEP.join(behaviour)
[docs] def get_behaviour(self, index) -> list:
if index in self.data.index and not pd.isna(self.data.loc[index, 'behaviour']):
return self.data.loc[index, 'behaviour'].split(self.BEHAVIOUR_SEP)
else:
return []
[docs] def get_part_slice(self, slice_indices: list, name: str) -> np.ndarray:
return self.data.loc[slice_indices[0]:slice_indices[1] - 1,
[f"{name}_{i}" for i in range(1, self.DIMENSIONS + 1)]].apply(
lambda x: self.build_part(x, name), axis=1).to_numpy()
[docs] def set_part_slice(self, slice_indices: list, name: str, data: np.ndarray) -> None:
for i in range(1, self.DIMENSIONS + 1):
self.data.loc[slice_indices[0]:slice_indices[1] - 1, f"{name}_{i}"] = [d[i - 1] for d in data]
[docs] def get_part(self, index, name) -> Part:
if index in self.data.index:
pt = np.array([self.data.loc[index, f"{name}_{i}"] for i in range(1, self.DIMENSIONS + 1)])
if any(np.isnan(pt)):
pt = np.array([self.MAGIC_NUMBER] * self.DIMENSIONS)
return Part(pt, name, float(not all(pt == self.MAGIC_NUMBER)))
else:
return Part([self.MAGIC_NUMBER] * self.DIMENSIONS, name, 0.0)
[docs] def set_part(self, index, part: Part) -> None:
name = part.name
for i in range(1, part.shape[0] + 1):
self.data.loc[index, f"{name}_{i}"] = part[i - 1] if part[i - 1] != self.MAGIC_NUMBER else pd.NA
if not self.data.index.is_monotonic_increasing:
self.data.sort_index(inplace=True)
[docs] def build_skeleton(self, row) -> Skeleton:
part_map = {}
likelihood_map = {}
for name in self.body_parts:
part_map[name] = np.array([row[f"{name}_{i}"] for i in range(1, self.DIMENSIONS + 1)])
if any(np.isnan(part_map[name])):
part_map[name] = np.array([self.MAGIC_NUMBER] * self.DIMENSIONS)
likelihood_map[name] = float(not all(part_map[name] == self.MAGIC_NUMBER))
behaviour = [] if pd.isna(row['behaviour']) else row['behaviour'].split(self.BEHAVIOUR_SEP)
return Skeleton(self.body_parts, part_map=part_map, likelihood_map=likelihood_map,
behaviour=behaviour,
dims=self.DIMENSIONS)
[docs] def build_part(self, row, name):
pt = row.to_numpy()
if any(np.isnan(pt)):
pt = np.array([self.MAGIC_NUMBER] * self.DIMENSIONS)
return Part(pt, name, float(not all(pt == self.MAGIC_NUMBER)))
[docs] def part_iterator(self, part):
for index, row in self.data.loc[:, [f'{part}_{i + 1}' for i in range(self.DIMENSIONS)]].iterrows():
yield index, self.build_part(row, part)
[docs] @staticmethod
def convert_to_list(index, skeleton, threshold=0.8):
out = []
for part in skeleton.body_parts:
out.extend(skeleton[part].tolist() if skeleton[part] > threshold else ['', '', ''])
return out