Spaces:
Sleeping
Sleeping
| # Ultralytics 🚀 AGPL-3.0 License - https://ultralytics.com/license | |
| from collections import deque | |
| import numpy as np | |
| from .basetrack import TrackState | |
| from .byte_tracker import BYTETracker, STrack | |
| from .utils import matching | |
| from .utils.gmc import GMC | |
| from .utils.kalman_filter import KalmanFilterXYWH | |
| class BOTrack(STrack): | |
| """ | |
| An extended version of the STrack class for YOLOv8, adding object tracking features. | |
| This class extends the STrack class to include additional functionalities for object tracking, such as feature | |
| smoothing, Kalman filter prediction, and reactivation of tracks. | |
| Attributes: | |
| shared_kalman (KalmanFilterXYWH): A shared Kalman filter for all instances of BOTrack. | |
| smooth_feat (np.ndarray): Smoothed feature vector. | |
| curr_feat (np.ndarray): Current feature vector. | |
| features (deque): A deque to store feature vectors with a maximum length defined by `feat_history`. | |
| alpha (float): Smoothing factor for the exponential moving average of features. | |
| mean (np.ndarray): The mean state of the Kalman filter. | |
| covariance (np.ndarray): The covariance matrix of the Kalman filter. | |
| Methods: | |
| update_features(feat): Update features vector and smooth it using exponential moving average. | |
| predict(): Predicts the mean and covariance using Kalman filter. | |
| re_activate(new_track, frame_id, new_id): Reactivates a track with updated features and optionally new ID. | |
| update(new_track, frame_id): Update the YOLOv8 instance with new track and frame ID. | |
| tlwh: Property that gets the current position in tlwh format `(top left x, top left y, width, height)`. | |
| multi_predict(stracks): Predicts the mean and covariance of multiple object tracks using shared Kalman filter. | |
| convert_coords(tlwh): Converts tlwh bounding box coordinates to xywh format. | |
| tlwh_to_xywh(tlwh): Convert bounding box to xywh format `(center x, center y, width, height)`. | |
| Examples: | |
| Create a BOTrack instance and update its features | |
| >>> bo_track = BOTrack(tlwh=[100, 50, 80, 40], score=0.9, cls=1, feat=np.random.rand(128)) | |
| >>> bo_track.predict() | |
| >>> new_track = BOTrack(tlwh=[110, 60, 80, 40], score=0.85, cls=1, feat=np.random.rand(128)) | |
| >>> bo_track.update(new_track, frame_id=2) | |
| """ | |
| shared_kalman = KalmanFilterXYWH() | |
| def __init__(self, tlwh, score, cls, feat=None, feat_history=50): | |
| """ | |
| Initialize a BOTrack object with temporal parameters, such as feature history, alpha, and current features. | |
| Args: | |
| tlwh (np.ndarray): Bounding box coordinates in tlwh format (top left x, top left y, width, height). | |
| score (float): Confidence score of the detection. | |
| cls (int): Class ID of the detected object. | |
| feat (np.ndarray | None): Feature vector associated with the detection. | |
| feat_history (int): Maximum length of the feature history deque. | |
| Examples: | |
| Initialize a BOTrack object with bounding box, score, class ID, and feature vector | |
| >>> tlwh = np.array([100, 50, 80, 120]) | |
| >>> score = 0.9 | |
| >>> cls = 1 | |
| >>> feat = np.random.rand(128) | |
| >>> bo_track = BOTrack(tlwh, score, cls, feat) | |
| """ | |
| super().__init__(tlwh, score, cls) | |
| self.smooth_feat = None | |
| self.curr_feat = None | |
| if feat is not None: | |
| self.update_features(feat) | |
| self.features = deque([], maxlen=feat_history) | |
| self.alpha = 0.9 | |
| def update_features(self, feat): | |
| """Update the feature vector and apply exponential moving average smoothing.""" | |
| feat /= np.linalg.norm(feat) | |
| self.curr_feat = feat | |
| if self.smooth_feat is None: | |
| self.smooth_feat = feat | |
| else: | |
| self.smooth_feat = self.alpha * self.smooth_feat + (1 - self.alpha) * feat | |
| self.features.append(feat) | |
| self.smooth_feat /= np.linalg.norm(self.smooth_feat) | |
| def predict(self): | |
| """Predicts the object's future state using the Kalman filter to update its mean and covariance.""" | |
| mean_state = self.mean.copy() | |
| if self.state != TrackState.Tracked: | |
| mean_state[6] = 0 | |
| mean_state[7] = 0 | |
| self.mean, self.covariance = self.kalman_filter.predict(mean_state, self.covariance) | |
| def re_activate(self, new_track, frame_id, new_id=False): | |
| """Reactivates a track with updated features and optionally assigns a new ID.""" | |
| if new_track.curr_feat is not None: | |
| self.update_features(new_track.curr_feat) | |
| super().re_activate(new_track, frame_id, new_id) | |
| def update(self, new_track, frame_id): | |
| """Updates the YOLOv8 instance with new track information and the current frame ID.""" | |
| if new_track.curr_feat is not None: | |
| self.update_features(new_track.curr_feat) | |
| super().update(new_track, frame_id) | |
| def tlwh(self): | |
| """Returns the current bounding box position in `(top left x, top left y, width, height)` format.""" | |
| if self.mean is None: | |
| return self._tlwh.copy() | |
| ret = self.mean[:4].copy() | |
| ret[:2] -= ret[2:] / 2 | |
| return ret | |
| def multi_predict(stracks): | |
| """Predicts the mean and covariance for multiple object tracks using a shared Kalman filter.""" | |
| if len(stracks) <= 0: | |
| return | |
| multi_mean = np.asarray([st.mean.copy() for st in stracks]) | |
| multi_covariance = np.asarray([st.covariance for st in stracks]) | |
| for i, st in enumerate(stracks): | |
| if st.state != TrackState.Tracked: | |
| multi_mean[i][6] = 0 | |
| multi_mean[i][7] = 0 | |
| multi_mean, multi_covariance = BOTrack.shared_kalman.multi_predict(multi_mean, multi_covariance) | |
| for i, (mean, cov) in enumerate(zip(multi_mean, multi_covariance)): | |
| stracks[i].mean = mean | |
| stracks[i].covariance = cov | |
| def convert_coords(self, tlwh): | |
| """Converts tlwh bounding box coordinates to xywh format.""" | |
| return self.tlwh_to_xywh(tlwh) | |
| def tlwh_to_xywh(tlwh): | |
| """Convert bounding box from tlwh (top-left-width-height) to xywh (center-x-center-y-width-height) format.""" | |
| ret = np.asarray(tlwh).copy() | |
| ret[:2] += ret[2:] / 2 | |
| return ret | |
| class BOTSORT(BYTETracker): | |
| """ | |
| An extended version of the BYTETracker class for YOLOv8, designed for object tracking with ReID and GMC algorithm. | |
| Attributes: | |
| proximity_thresh (float): Threshold for spatial proximity (IoU) between tracks and detections. | |
| appearance_thresh (float): Threshold for appearance similarity (ReID embeddings) between tracks and detections. | |
| encoder (Any): Object to handle ReID embeddings, set to None if ReID is not enabled. | |
| gmc (GMC): An instance of the GMC algorithm for data association. | |
| args (Any): Parsed command-line arguments containing tracking parameters. | |
| Methods: | |
| get_kalmanfilter(): Returns an instance of KalmanFilterXYWH for object tracking. | |
| init_track(dets, scores, cls, img): Initialize track with detections, scores, and classes. | |
| get_dists(tracks, detections): Get distances between tracks and detections using IoU and (optionally) ReID. | |
| multi_predict(tracks): Predict and track multiple objects with YOLOv8 model. | |
| Examples: | |
| Initialize BOTSORT and process detections | |
| >>> bot_sort = BOTSORT(args, frame_rate=30) | |
| >>> bot_sort.init_track(dets, scores, cls, img) | |
| >>> bot_sort.multi_predict(tracks) | |
| Note: | |
| The class is designed to work with the YOLOv8 object detection model and supports ReID only if enabled via args. | |
| """ | |
| def __init__(self, args, frame_rate=30): | |
| """ | |
| Initialize YOLOv8 object with ReID module and GMC algorithm. | |
| Args: | |
| args (object): Parsed command-line arguments containing tracking parameters. | |
| frame_rate (int): Frame rate of the video being processed. | |
| Examples: | |
| Initialize BOTSORT with command-line arguments and a specified frame rate: | |
| >>> args = parse_args() | |
| >>> bot_sort = BOTSORT(args, frame_rate=30) | |
| """ | |
| super().__init__(args, frame_rate) | |
| # ReID module | |
| self.proximity_thresh = args.proximity_thresh | |
| self.appearance_thresh = args.appearance_thresh | |
| if args.with_reid: | |
| # Haven't supported BoT-SORT(reid) yet | |
| self.encoder = None | |
| self.gmc = GMC(method=args.gmc_method) | |
| def get_kalmanfilter(self): | |
| """Returns an instance of KalmanFilterXYWH for predicting and updating object states in the tracking process.""" | |
| return KalmanFilterXYWH() | |
| def init_track(self, dets, scores, cls, img=None): | |
| """Initialize object tracks using detection bounding boxes, scores, class labels, and optional ReID features.""" | |
| if len(dets) == 0: | |
| return [] | |
| if self.args.with_reid and self.encoder is not None: | |
| features_keep = self.encoder.inference(img, dets) | |
| return [BOTrack(xyxy, s, c, f) for (xyxy, s, c, f) in zip(dets, scores, cls, features_keep)] # detections | |
| else: | |
| return [BOTrack(xyxy, s, c) for (xyxy, s, c) in zip(dets, scores, cls)] # detections | |
| def get_dists(self, tracks, detections): | |
| """Calculates distances between tracks and detections using IoU and optionally ReID embeddings.""" | |
| dists = matching.iou_distance(tracks, detections) | |
| dists_mask = dists > self.proximity_thresh | |
| if self.args.fuse_score: | |
| dists = matching.fuse_score(dists, detections) | |
| if self.args.with_reid and self.encoder is not None: | |
| emb_dists = matching.embedding_distance(tracks, detections) / 2.0 | |
| emb_dists[emb_dists > self.appearance_thresh] = 1.0 | |
| emb_dists[dists_mask] = 1.0 | |
| dists = np.minimum(dists, emb_dists) | |
| return dists | |
| def multi_predict(self, tracks): | |
| """Predicts the mean and covariance of multiple object tracks using a shared Kalman filter.""" | |
| BOTrack.multi_predict(tracks) | |
| def reset(self): | |
| """Resets the BOTSORT tracker to its initial state, clearing all tracked objects and internal states.""" | |
| super().reset() | |
| self.gmc.reset_params() | |