import csv
import os
import pandas as pd
from cvkit.pose_estimation.data_readers.cvkit_datastore import CVKitDataStore3D
from cvkit.pose_estimation.data_readers.datastore_interface import DataStoreInterface, DataStoreStats
from cvkit.pose_estimation.data_readers.deeplabcut_datastore import DeeplabcutDataStore
from cvkit.pose_estimation.data_readers.flattened_datastore import FlattenedDataStore
datastore_readers = {DeeplabcutDataStore.FLAVOR: DeeplabcutDataStore,
FlattenedDataStore.FLAVOR: FlattenedDataStore, CVKitDataStore3D.FLAVOR: CVKitDataStore3D}
[docs]def initialize_datastore_reader(body_parts, path, reader_type, dimension=3) -> DataStoreInterface:
"""Detects underlying class based on reader_type. And generates appropriate :py:class:`~cvkit.pose_estimation.data_readers.datastore_interface.DataStoreInterface` subclass instance.
:param body_parts: List of body parts
:type body_parts: list[str]
:param path: Path to the data file
:type path: str
:param reader_type: A string to identify underlying data reader type. Refer to the :py:attr:`~cvkit.pose_estimation.data_readers.datastore_interface.DataStoreInterface.FLAVOR` attributes of each implementation.
:type reader_type: str
:param dimension: Dimension of the data
:type dimension: int
:return: An instance of appropriate subclass of :py:class:`~cvkit.pose_estimation.data_readers.datastore_interface.DataStoreInterface`.
:rtype: :py:class:`~cvkit.pose_estimation.data_readers.datastore_interface.DataStoreInterface`
"""
try:
reader = datastore_readers[reader_type]
return reader(body_parts, path, dimension)
except Exception as e:
raise Exception(f"Potentially incorrect reader type selected ({reader_type})\n" + str(e))
return None
[docs]def convert_data_flavor(source: DataStoreInterface, target: DataStoreInterface):
"""Converts one data flavor to another.
:param source: Source datastore instance.
:type source: :py:class:`~cvkit.pose_estimation.data_readers.datastore_interface.DataStoreInterface`.
:param target: Empty target datastore instance.
:type target: :py:class:`~cvkit.pose_estimation.data_readers.datastore_interface.DataStoreInterface`
"""
assert not os.path.exists(target.path)
writer = csv.writer(open(target.path, 'w'), delimiter=target.SEP)
writer.writerows(target.get_header_rows())
for index, skeleton in source.row_iterator():
if index % 200 == 0:
print(f'\r{index}/{len(source)}', end='')
writer.writerow(target.convert_to_list(index, skeleton))
class SequentialDatastoreBuilder:
def __init__(self, flavor, body_parts, dimension=3, buffer_size=1024):
self.data_store = initialize_datastore_reader(body_parts, None, flavor, dimension)
self.buffer_size = buffer_size
self.buffer = []
self.current_index = 0
def append(self, skeleton):
if skeleton is not None:
self.buffer.append(self.data_store.convert_to_list(self.current_index, skeleton, 0))
self.buffer[-1].append(self.data_store.BEHAVIOUR_SEP.join(skeleton.behaviour))
else:
self.buffer.append([None] * len(self.data_store.body_parts) + 1)
if len(self.buffer) >= self.buffer_size:
self._flush_buffer()
self.current_index += 1
def _flush_buffer(self):
if len(self.buffer) > 0:
self.data_store.data = pd.concat(
[self.data_store.data, pd.DataFrame(self.buffer, columns=self.data_store.data.columns)],
ignore_index=True)
self.buffer.clear()
def get_datastore(self):
self._flush_buffer()
return self.data_store
def build_empty_skeleton(self):
return self.data_store.build_empty_skeleton()