AmatsukazeでCM抜きMP4を自動生成するメンテナンスフリーなTVサーバー構築方法(Ubuntu対応版)

※2026/1/21現在の情報になります
※スクリプトや構成のアップデートに合わせて、記事も随時アップデートしています

以前よりUbuntuでTV視聴・録画サーバー(以降、TVサーバーと表記)を設置し、CMを抜いたmp4への変換にはWindowsのAmatsukazeを利用し、バックアップ処理も含めて自動化していました。しかし、TVサーバーのために2台のPCを使うのはもったいないと思い、Ubuntu上で完結するシステムへの切り替えを検討していました。

ネット上にはAmatsukazeを使ってCM抜きmp4を生成する方法の解説記事はあるものの、TVサーバーとの連携にはWindows単体もしくはWindowsとUbuntuを2台利用したものがほとんどで、Ubuntu 1台のみで完結したシステムの記事は見つかりませんでした。

このため、TVサーバーをUbuntuで実装する方法を一から検討し、録画->CM抜き->mp4/m2tsバックアップ->不要ファイル処理を完全自動化することができました。

本記事はUbuntuで完結するTVサーバー仕様のポイントを備忘録としてまとめたものになります。

本記事のTVサーバーのシステム構成

構築したTVサーバーのシステム構成は以下のとおりです。

mp4ファイル再生用のメディアサーバーはTVサーバー上に構築することでPC一台でシステムを完結させることもできます。しかし、mp4ファイルは最終的にNASへバックアップコピーするので、録画中のDrop発生要因を減らすことも考慮し、バックアップ先のNAS上で整理・再生するようにしました。

NASへのコピーはSFTPを使います。Exportされたフォルダをマウントする形式だと電源を入れる順番によってはマウントに失敗することもあるため、コピー開始前にNAS側の異常を検知できるSFTPを使います。

m2tsの保存先については、LAN経由でのコピーは時間がかかる上に、保存後にアクセスすることもないことから、NASへのバックアップではなくPCに直接接続したHDDを保存先にしています。

TVサーバー構築用ソフトウェア

TVサーバーを構築するソフトは、録画用に「EPGStation」、リアルタイム視聴・m2ts再生用に「KonomiTV」を利用します。EPGStationは録画した番組をm2ts形式で保存し、KonomiTVでm2tsファイルを直接再生して視聴できます。

録画用ソフトにEPGStationを使う理由は、RaspberryPi上でも動作するためです。TVサーバーが何らかのトラブルで録画に失敗する場合に備えて、RaspberryPiで録画バックアップサーバーを動かしているため、メインで利用するPC上の録画ソフトがEPGStationであれば録画ルール(時刻・条件・録画ファイル格納ディレクトリ)のデータの同期ができ、運用コストを下げることができます。

このような縛りがなければ、KonomiTVが録画予約までサポートしているECDBを使ってもいいです。

TVサーバー構築用OS

録画用ソフトにECDBを使えばWindowsでTVサーバーを構築することはできますが、セキュリティアップデートでアプリが起動しなくなる、録画途中に負荷がかかる、再起動が発生する、突発的なブルースクリーンが発生するなどが原因で録画が失敗するリスクがあります。

Ubuntuは自動アップデートなどの自動処理は行なわず、OSとしても安定しており、コンソールを使ってファイル操作やログのチェックができることから、UbuntuでTVサーバーを構築しています。

m2tsファイルのバックアップの必要性について

録画した番組を保存したければ、m2tsファイルをほかの媒体にコピーすればよいです。しかし、m2tsファイルは下記の点で保存には不向きです。

  • 100時間で約76GBとなり、1年でTB単位でトータル保存ファイルサイズが増加するため、ストレージコストが高くつく
  • スマホやタブレットへの保存(オフライン視聴)時に保存できる番組数が少なくなる
  • 再生できるプレイヤー・メディアサーバーが少ない
  • CMなど不要な部分が含まれているため、流し見に向いていない

これらは、Amatsukazeを利用しCMを抜いたmp4ファイルに変換し、これを保存することで解決できます。mp4に変換するとファイルサイズは10-30%程度まで縮小できます。

ただし、mp4に変換するとm2tsに記録されているニコニコ動画のコメントも消えてしまうため、これらを残したい場合はm2tsも残す必要があります。またCMカットをやり直すために再度エンコードする可能性もあるので、m2tsファイルは最終的には消す予定でも、しばらくの間は保持していたほうが安心です。

メンテナンスフリーのための自動処理の要件

録画したファイルを整理する際には、下記の作業を行います。

  • SSD/HDD/NASの残容量確認と不要ファイルの削除
  • CM抜きmp4ファイルへ変換
  • NAS,HDDへのコピー

番組を録画したら上記処理を自動で行う手順を考える必要があります。変換、転送処理は簡単ですが、ファイル・フォルダの整理と不要ファイルの削除については、削除ルールを策定しパラメータで処理をコントロールできるようにしなければいけません。

不要ファイルの削除

EPGStationで録画したm2tsファイルの削除は、EGPStationのデータベースとの不整合を防ぐためEPGstationの自動削除機能にまかせたほうが安全です。

保存が必要なm2tsファイルは、自動削除処理が実行される前にHDDへコピーして消去されないようにします。そして、コピー先のディレクトリサイズが指定容量を超過した場合は、指定容量以内に収まるまで更新日時の古いファイルから順番に自動で削除する形で容量オーバーを回避します。ファイルの削除可否はフォルダごとに自動削除対象かどうかを定義します。

各録画ファイルへ適用する削除ルールは、EPGStationの録画予約時の保存先ディレクトリ指定機能を利用します。ファイル削除ルールごとにディレクトリを以下の4つに分け、録画時にこれらのディレクトリを指定して録画ファイルを自動で分類します。

  • no_conversion: CMカットも変換も不要な番組(ニュースなど)
  • delete: CMカット後に一定期間保持する番組(バラエティなど)
  • delete_after_watch: CMカット後に視聴するまで自動消去しない番組(連ドラなど)
  • keep: CMカット後も再エンコードに備えてm2tsを残したい番組(ドキュメンタリー・映画など)

NAS上のファイルについてもm2tsファイルと同様のパラメータで削除ルールを作り、コピー開始前にルールに従って削除します。

ファイル削除によって空ディレクトリもできるため、HDDコピー時に空ディレクトリも削除する処理を実行するようにします。

mp4ファイルへの変換

CMカット機能があるAmatsukazeを使用します。AmatsukazeはUbuntuに対応しているため、Ubuntu上で動作させます。

変換したmp4ファイルを格納するディレクトリはm2tsファイルと同じ構造を維持するようにし、管理の手間を省きます。ここでは/mnt/converted_filesに保存するものとします。

NAS・HDDへコピー

Pythonスクリプトで以下のように処理をします。

  • m2tsファイル:直接接続された外付けHDDへコピー
  • mp4ファイル: sftp経由でNASへアップロード

一度変換・コピーしたファイルは繰り返し同じ処理が走らないようにデータベースに記録し管理します。

ファイルをコピーする際には録画ファイルのディレクトリ構造を維持してコピーし、管理の手間を省きます。

TVサーバー構築

TVサーバーのPCスペックについて

録画・再生だけであればN100等の低スペックPCでも可能ですが、Amatsukazeでの変換を裏で同時に行う場合はCPU性能が必要になります。

Core i9のミニPCをTVサーバーにして運用した場合は、変換中にDropは発生しておらず、1時間番組の場合ですと録画終了からNASへのアップロード完了までは10分程度で終わります。

N100の場合は同時録画をしている最中に変換処理をしているとDropが発生することがあることを確認しています。この場合、変換処理は深夜などの無録画時間帯で実行することで回避可能です。

Ubuntuのインストール

インストールするPCによりインストール方法が変わるため、ここでの説明は割愛します。以下の記事などを参照してください。

EPGStation、KonomiTVのセットアップ

EPGStationとKonomiTVをUbuntuにインストールする方法は以下の記事にまとめていますので、こちらを参考にしてください。

ECDBは下記記事を参考にするとインストールできます。

【2023年10月】Ubuntu + Mirakurun + EDCB-Wine + KonomiTV (px4_drv + recisdb + ISDBScanner) でパパッと Linux 録画鯖構築の手引き

Ubuntu上で動作するAmatsukazeをセットアップする

Amatsukazeのインストールや設定は環境に大きく依存するので、マニュアルを参考にしてインストールしてください。

Amatsukazeのセットアップ後、最終的に以下のコマンドで変換キューに追加し、出力先にmp4ファイルができていればよいです。

$ /home/ユーザー名/Amatsukaze/exe_files/AmatsukazeAddTask -f <m2tsファイル> -o <出力先ディレクトリ> -ip localhost -s <プロファイル名> <br>

自動処理用Pythonスクリプト

下記が上記要件を取り入れたPythonスクリプトになります。

スクリプトを1ファイルに収めたので、処理内容の確認やカスタマイズはClaude、Copilotなどを利用すると簡単です。

環境依存パラメータについては、ファイル先頭に定義されているConfigクラスにまとめていますので、環境に応じて変更してください。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
TV録画統合管理スクリプト (Integrated TV Recorder Manager)
【構成】
  - Config: 設定管理
  - StateManager: 状態管理 (INI)
  - DiskOperations (Abstract): ディスク操作の抽象化
    - LocalDiskOperations: ローカル操作
    - RemoteDiskOperations: SFTP操作
  - Cleaner: ディスクの掃除・容量管理ロジック
  - Pipelines:
    - M2tsConverterPipeline (Phase 1)
    - Mp4UploadPipeline (Phase 2)
    - M2tsBackupPipeline (Phase 3)
  - Logger: 条件付きバッファリングロガー
"""
import configparser
import shutil
import subprocess
import logging
import logging.handlers
import sys
import unicodedata
import fcntl
import posixpath
import stat
import re
import time
from abc import ABC, abstractmethod
from pathlib import Path
from datetime import datetime, timedelta
from dataclasses import dataclass, field
from typing import List, Dict, Generator, Optional, Tuple, Any, Union
try:
    import paramiko
except ImportError:
    print("【エラー】'paramiko' ライブラリが必要です: pip install paramiko", file=sys.stderr)
    sys.exit(1)
# =========================================================
# 0. データ構造 & ユーティリティ
# =========================================================
@dataclass
class FileEntry:
    path: Union[Path, str]
    name: str
    size: int
    mtime: datetime
def format_bytes(size: int) -> str:
    for unit in ["Byte", "KB", "MB"]:
        if size < 1024:
            return f"{size:.2f}{unit}" if unit != "Byte" else f"{size}{unit}"
        size /= 1024
    return f"{size:.2f}GB"
def normalize_str(s: str) -> str:
    return unicodedata.normalize('NFC', s) if isinstance(s, str) else s
# =========================================================
# 1. 設定管理 (Configuration)
# =========================================================
@dataclass
class Config:
    # --- パス設定 ---
    source_dir_m2ts: Path = Path("/mnt/tv-recorder/recorded_files")
    dest_dir_hdd: Path = Path("/mnt/hdd/m2ts")
    converted_dir: Path = Path("/mnt/converted_files")
    
    ini_file: Path = Path("/home/tv-recorder/Scripts/status_TvRecorder.ini")
    log_file: Path = Path("/home/tv-recorder/Scripts/process_TvRecorder.log")
    lock_file: Path = Path("/tmp/TvRecorder.lock")
    # --- 動作設定 ---
    dry_run: bool = False
    write_log: bool = False
    scan_threshold_sec: int = 10
    m2ts_delete_days: int = 30
    # --- HDDコピー実行許可時間帯 ---
    copy_window_start_hour: int = 2
    # --- NAS接続設定 ---
    nas_config: Dict = field(default_factory=lambda: {
        'host': '192.168.1.3',
        'port': 22,
        'user': 'tv-recorder',
        'password': '********',
        'key_file': None,
        'dest_dir': '/tv_program/converted_files'
    })
    # --- 除外設定 ---
    skip_folders_m2ts: List[str] = field(default_factory=lambda: ['no_conversion'])
    hdd_exclude_dirs: List[str] = field(default_factory=lambda: ['keep'])
    converted_exclude_dirs: List[str] = field(default_factory=list)
    nas_exclude_dirs: List[str] = field(default_factory=lambda: ['/keep/', '/delete_after_watch/'])
    # --- 容量上限設定 (GB) ---
    max_size_hdd_gb: float = 4500.0
    max_size_converted_gb: float = 210.0
    max_size_nas_gb: float = 4000.0
    # --- 保持ポリシー ---
    retention_policies_hdd: List[Dict] = field(default_factory=lambda: [
        {"dir": "delete", "days": 182, "limit_gb": 1000},
        {"dir": "delete_after_watch", "days": 365, "limit_gb": 1500}
    ])
    retention_policies_nas: List[Dict] = field(default_factory=lambda: [
        {"dir": "delete", "days": 365, "limit_gb": 1500},
    ])
    # --- Amatsukaze設定 ---
    amatsukaze_cmd: str = "/home/tv-recorder/Amatsukaze/Amatsukaze/exe_files/AmatsukazeAddTask"
    amatsukaze_ip: str = "localhost"
    amatsukaze_service: str = "QsvEnc"
    @property
    def is_hdd_copy_time_window(self) -> bool:
        return datetime.now().hour == self.copy_window_start_hour
# =========================================================
# 2. ロガー & 状態管理 & 排他制御
# =========================================================
class ConditionalBufferHandler(logging.Handler):
    def __init__(self, log_file: Path):
        super().__init__()
        self.buffer = []
        self.passthrough = False
        
        if not log_file.parent.exists():
            log_file.parent.mkdir(parents=True, exist_ok=True)
            
        self.stream_handler = logging.StreamHandler(sys.stdout)
        self.file_handler = logging.FileHandler(log_file, encoding='utf-8')
    def setFormatter(self, fmt):
        super().setFormatter(fmt)
        self.stream_handler.setFormatter(fmt)
        self.file_handler.setFormatter(fmt)
    def emit(self, record):
        if getattr(self, 'passthrough', False):
            self.stream_handler.handle(record)
            self.file_handler.handle(record)
        else:
            if not hasattr(self, 'buffer'):
                self.buffer = []
            self.buffer.append(record)
    def flush_all(self, should_write: bool):
        if should_write:
            self.passthrough = True
            if hasattr(self, 'buffer'):
                for record in self.buffer:
                    self.stream_handler.handle(record)
                    self.file_handler.handle(record)
                self.stream_handler.flush()
                self.file_handler.flush()
        self.buffer = []
    def close(self):
        self.stream_handler.close()
        self.file_handler.close()
        super().close()
def setup_logger(log_path: Path) -> ConditionalBufferHandler:
    root = logging.getLogger()
    if root.hasHandlers(): root.handlers.clear()
    root.setLevel(logging.INFO)
    fmt = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
    handler = ConditionalBufferHandler(log_path)
    handler.setFormatter(fmt)
    root.addHandler(handler)
    return handler
def activate_realtime_log():
    root = logging.getLogger()
    for h in root.handlers:
        if isinstance(h, ConditionalBufferHandler):
            h.flush_all(True)
def acquire_lock(lock_path: Path):
    try:
        lock_fd = open(lock_path, 'w')
        fcntl.flock(lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
        return lock_fd
    except IOError:
        return None
class StateManager:
    SECTION_COPY_HDD = 'CopyHDD'
    SECTION_CONVERT = 'Convert'
    SECTION_UPLOAD_NAS = 'UploadNAS'
    def __init__(self, ini_path: Path):
        self.ini_path = ini_path
        self.config = configparser.ConfigParser()
        self.config.optionxform = str
        self._load()
    def _load(self):
        if self.ini_path.exists():
            try:
                self.config.read(self.ini_path, encoding='utf-8')
            except Exception: pass
        for sec in [self.SECTION_COPY_HDD, self.SECTION_CONVERT, self.SECTION_UPLOAD_NAS]:
            if sec not in self.config: self.config[sec] = {}
    def is_recorded(self, section: str, file_name: str, file_size: int) -> bool:
        if section not in self.config: return False
        norm_name = normalize_str(file_name)
        sec_data = {normalize_str(k): v for k, v in self.config[section].items()}
        return sec_data.get(norm_name) == str(file_size)
    def is_key_exists(self, section: str, file_name: str) -> bool:
        if section not in self.config: return False
        norm_name = normalize_str(file_name)
        return norm_name in {normalize_str(k) for k in self.config[section].keys()}
    def update_entry(self, section: str, file_name: str, file_size: int, dry_run: bool = False):
        if dry_run:
            logging.info(f"  [DryRun] INI更新 [{section}]: {file_name}")
            return
        try:
            sec_data = self.config[section]
            if file_name not in sec_data:
                items = list(sec_data.items())
                sec_data.clear()
                sec_data[file_name] = str(file_size)
                for k, v in items: sec_data[k] = v
            else:
                sec_data[file_name] = str(file_size)
            
            with open(self.ini_path, 'w', encoding='utf-8') as f:
                self.config.write(f)
        except Exception as e:
            logging.error(f"INI Update Failed: {e}")
# =========================================================
# 3. ディスク操作の抽象化 (Disk Operations & Cleaner)
# =========================================================
class DiskOperations(ABC):
    @abstractmethod
    def list_files_recursive(self, root_dir: Union[Path, str], exclude_dirs: List[str] = None) -> Generator[FileEntry, None, None]:
        pass
    @abstractmethod
    def delete_file(self, path: Union[Path, str]) -> bool:
        pass
    @abstractmethod
    def remove_empty_dir(self, path: Union[Path, str]) -> bool:
        pass
    @abstractmethod
    def exists(self, path: Union[Path, str]) -> bool:
        pass
class LocalDiskOperations(DiskOperations):
    def __init__(self, dry_run: bool):
        self.dry_run = dry_run
    def list_files_recursive(self, root_dir: Path, exclude_dirs: List[str] = None) -> Generator[FileEntry, None, None]:
        exclude_dirs = exclude_dirs or []
        excludes = [root_dir / d for d in exclude_dirs]
        
        for p in root_dir.rglob('*'):
            if p.is_file():
                if any(ex in p.parents for ex in excludes): continue
                yield FileEntry(
                    path=p, name=p.name, size=p.stat().st_size,
                    mtime=datetime.fromtimestamp(p.stat().st_mtime)
                )
    def delete_file(self, path: Path) -> bool:
        size_str = "Unknown"
        try:
            size_str = format_bytes(path.stat().st_size)
        except Exception: pass
        if self.dry_run:
            logging.info(f"    [DryRun] 削除: {path.name} ({size_str})")
            return True
        try:
            path.unlink()
            logging.info(f"    削除: {path.name} ({size_str})")
            return True
        except Exception: return False
    def remove_empty_dir(self, path: Path) -> bool:
        if self.dry_run: return True
        try:
            path.rmdir()
            return True
        except OSError: return False
    def exists(self, path: Path) -> bool:
        return path.exists()
class RemoteDiskOperations(DiskOperations):
    def __init__(self, sftp, dry_run: bool):
        self.sftp = sftp
        self.dry_run = dry_run
    def list_files_recursive(self, root_dir: str, exclude_dirs: List[str] = None) -> Generator[FileEntry, None, None]:
        exclude_markers = exclude_dirs or []
        try:
            for path, attr, is_dir in self._sftp_walk(root_dir):
                if not is_dir:
                    if any(m in path for m in exclude_markers): continue
                    yield FileEntry(
                        path=path, name=posixpath.basename(path), size=attr.st_size,
                        mtime=datetime.fromtimestamp(attr.st_mtime)
                    )
        except Exception: pass
    def delete_file(self, path: str) -> bool:
        size_str = "Unknown"
        try:
            size_str = format_bytes(self.sftp.stat(path).st_size)
        except Exception: pass
        if self.dry_run:
            logging.info(f"    [DryRun] 削除(NAS): {posixpath.basename(path)} ({size_str})")
            return True
        try:
            self.sftp.remove(path)
            logging.info(f"    削除(NAS): {posixpath.basename(path)} ({size_str})")
            
            if path.endswith('.mp4'):
                ass = str(Path(path).with_suffix('.ass'))
                if self.exists(ass):
                    self.sftp.remove(ass)
            return True
        except Exception: return False
    def remove_empty_dir(self, path: str) -> bool:
        if self.dry_run: return True
        try:
            self.sftp.rmdir(path)
            return True
        except Exception: return False
    def exists(self, path: str) -> bool:
        try:
            self.sftp.stat(path)
            return True
        except FileNotFoundError: return False
    def _sftp_walk(self, remote_path):
        try:
            for attr in self.sftp.listdir_attr(remote_path):
                full_path = posixpath.join(remote_path, attr.filename)
                if stat.S_ISDIR(attr.st_mode):
                    yield from self._sftp_walk(full_path)
                    yield (full_path, attr, True)
                else:
                    yield (full_path, attr, False)
        except Exception: pass
class Cleaner:
    def __init__(self, config: Config, ops: DiskOperations, label: str):
        self.cfg = config
        self.ops = ops
        self.label = label
    def enforce_size_limit(self, target_dir: Union[Path, str], limit_gb: float, exclude_dirs: List[str]):
        if not self.ops.exists(target_dir): return
        all_files = list(self.ops.list_files_recursive(target_dir, exclude_dirs=[]))
        total_size = sum(f.size for f in all_files)
        
        logging.info(f"  {self.label} 現在の全体容量 [{target_dir}]: {format_bytes(total_size)}")
        deletable_files = []
        for f in all_files:
            is_excluded = False
            if isinstance(target_dir, Path):
                if exclude_dirs and any(ex in f.path.parts for ex in exclude_dirs):
                    is_excluded = True
            else:
                if exclude_dirs and any(ex in f.path for ex in exclude_dirs):
                    is_excluded = True
            
            if not is_excluded:
                deletable_files.append(f)
        deletable_files.sort(key=lambda x: x.mtime)
        
        limit_bytes = limit_gb * (1024**3)
        delete_list = []
        
        if total_size > limit_bytes:
            for f in deletable_files:
                delete_list.append(f)
                total_size -= f.size
                if total_size <= limit_bytes: break
        
        if delete_list:
            self.cfg.write_log = True
            activate_realtime_log()
            logging.info(f"  {self.label} 全体容量制限チェック [{target_dir}]: 上限 {limit_gb}GB -> {len(delete_list)}ファイル削除")
            for f in delete_list:
                self.ops.delete_file(f.path)
    def apply_retention_policy(self, parent_dir: Union[Path, str], dir_name: str, days: int, limit_gb: int):
        if isinstance(parent_dir, Path):
            target = parent_dir / dir_name
        else:
            target = posixpath.join(parent_dir, dir_name)
        if not self.ops.exists(target): return
        files = list(self.ops.list_files_recursive(target))
        total_size = sum(f.size for f in files)
        logging.info(f"  {self.label} 現在の容量 [{target}]: {format_bytes(total_size)}")
        cutoff = datetime.now() - timedelta(days=days)
        delete_list = [f for f in files if f.mtime < cutoff]
        keep_list = [f for f in files if f.mtime >= cutoff]
        keep_list.sort(key=lambda x: x.mtime)
        limit_bytes = limit_gb * (1024**3)
        current_keep_size = sum(f.size for f in keep_list)
        while current_keep_size > limit_bytes and keep_list:
            target_f = keep_list.pop(0)
            delete_list.append(target_f)
            current_keep_size -= target_f.size
        if delete_list:
            self.cfg.write_log = True
            activate_realtime_log()
            logging.info(f"  {self.label} ポリシー適用 [{dir_name}]: {len(delete_list)}ファイル削除")
            for f in delete_list:
                self.ops.delete_file(f.path)
    def delete_old_files_by_pattern(self, target_dir: Union[Path, str], days: int, pattern: str):
        if not self.ops.exists(target_dir): return
        
        logging.info(f"  {self.label} 古いファイルの削除チェック: {target_dir} ({days}日以上, {pattern})")
        
        all_files = self.ops.list_files_recursive(target_dir)
        cutoff = datetime.now() - timedelta(days=days)
        
        count = 0
        for f in all_files:
            if isinstance(target_dir, Path):
                if not f.path.match(pattern): continue
            else:
                if not f.name.endswith('.m2ts'): continue
            if f.mtime < cutoff:
                self.cfg.write_log = True
                activate_realtime_log()
                self.ops.delete_file(f.path)
                count += 1
        
        if count == 0:
            logging.info("    -> 削除対象なし")
        else:
            logging.info(f"    -> {count}ファイル削除")
# =========================================================
# 4. パイプライン基底クラス
# =========================================================
class BasePipeline(ABC):
    def __init__(self, config: Config, state: StateManager):
        self.cfg = config
        self.state = state
        self.ops = LocalDiskOperations(config.dry_run)
        self.cleaner = Cleaner(config, self.ops, "[Local]")
    @abstractmethod
    def run(self):
        pass
    def _cleanup_empty_dirs_local(self, root: Path, excludes: List[str] = None):
        if not root.exists(): return
        excludes = excludes or []
        deleted_count = 0
        all_dirs = sorted([p for p in root.rglob('*') if p.is_dir()], key=lambda p: len(p.parts), reverse=True)
        exclude_paths = [root / e for e in excludes]
        for d in all_dirs:
            if d in exclude_paths or d == root: continue
            try:
                if not any(d.iterdir()):
                    if self.ops.remove_empty_dir(d):
                        logging.info(f"  [Local] 空ディレクトリ削除: {d}")
                        deleted_count += 1
            except OSError: pass
        if deleted_count > 0:
            self.cfg.write_log = True
            activate_realtime_log()
# =========================================================
# 5. 各フェーズの実装
# =========================================================
class M2tsConverterPipeline(BasePipeline):
    """Phase 1: M2TS -> MP4 変換"""
    
    def run(self):
        tasks = list(self._scan())
        self._cleanup()
        if tasks:
            self.cfg.write_log = True
            activate_realtime_log()
            logging.info("=== Phase 1: M2TS変換 (Converter) ===")
            logging.info(f"変換対象数: {len(tasks)}")
            for task in tasks:
                self._process(task)
    def _scan(self):
        if not self.cfg.source_dir_m2ts.exists(): return
        threshold = datetime.now() - timedelta(seconds=self.cfg.scan_threshold_sec)
        
        for p in self.cfg.source_dir_m2ts.rglob('*.m2ts'):
            if not p.is_file(): continue
            try:
                rel = p.relative_to(self.cfg.source_dir_m2ts)
                if len(rel.parts) > 1 and rel.parts[0] in self.cfg.skip_folders_m2ts: continue
            except ValueError: pass
            if datetime.fromtimestamp(p.stat().st_mtime) >= threshold: continue
            if self.state.is_recorded(StateManager.SECTION_CONVERT, p.name, p.stat().st_size):
                continue
                
            yield {'src': p, 'rel': rel, 'size': p.stat().st_size}
    def _cleanup(self):
        self.cleaner.enforce_size_limit(self.cfg.converted_dir, self.cfg.max_size_converted_gb, self.cfg.converted_exclude_dirs)
        self._cleanup_empty_dirs_local(self.cfg.converted_dir)
    def _process(self, task):
        src = task['src']
        logging.info(f"[Converter] 登録: {src.name} ({format_bytes(task['size'])})")
        out_dir = self.cfg.converted_dir / task['rel'].parent
        
        if self._prepare_out_dir(out_dir, src.stem) and self._exec_amatsukaze(src, out_dir):
            log_file = out_dir / f"{src.stem}-enc.log"
            try:
                if not self.cfg.dry_run:
                    log_file.touch()
                logging.info(f"  -> エンコードログファイル作成: {log_file.name}")
            except Exception as e:
                logging.warning(f"  -> エンコードログファイル作成失敗: {e}")
            self.state.update_entry(StateManager.SECTION_CONVERT, src.name, task['size'], self.cfg.dry_run)
    def _prepare_out_dir(self, out_dir, stem):
        if self.cfg.dry_run: return True
        try:
            out_dir.mkdir(parents=True, exist_ok=True)
            norm = normalize_str(stem)
            for p in out_dir.iterdir():
                if p.is_file() and normalize_str(p.stem).startswith(norm): p.unlink()
            return True
        except Exception: return False
    def _exec_amatsukaze(self, src, out_dir):
        cmd = [self.cfg.amatsukaze_cmd, "-ip", self.cfg.amatsukaze_ip,
               "-s", self.cfg.amatsukaze_service, "-o", str(out_dir), "-f", str(src)]
        if self.cfg.dry_run:
            logging.info(f"  [DryRun] CMD: {cmd}")
            return True
        try:
            subprocess.run(cmd, capture_output=True, text=True, check=True)
            return True
        except Exception as e:
            logging.error(f"  -> コマンド失敗: {e}")
            return False
class Mp4UploadPipeline(BasePipeline):
    """Phase 2: MP4 -> NAS 転送"""
    def __init__(self, config: Config, state: StateManager):
        super().__init__(config, state)
        self.sftp = None
        self.transport = None
    def run(self):
        if not self.cfg.converted_dir.exists(): return
        
        candidates = list(self._scan_candidates())
        if not candidates: return
        self.cfg.write_log = True
        activate_realtime_log()
        logging.info("=== Phase 2: MP4転送 (Upload) ===")
        
        self.sftp, self.transport = self._connect_sftp()
        if not self.sftp: return
        self.ops = RemoteDiskOperations(self.sftp, self.cfg.dry_run)
        self.cleaner = Cleaner(self.cfg, self.ops, "[NAS]")
        try:
            self._cleanup()
            
            processed = 0
            for task in candidates:
                if self._check_remote_status(task):
                    self._process(task)
                    processed += 1
            
            if processed > 0:
                logging.info(f"MP4転送完了数: {processed}")
        finally:
            if self.transport: self.transport.close()
    def _connect_sftp(self):
        c = self.cfg.nas_config
        try:
            t = paramiko.Transport((c['host'], c['port']))
            if c['key_file']:
                k = paramiko.RSAKey.from_private_key_file(c['key_file'])
                t.connect(username=c['user'], pkey=k)
            else:
                t.connect(username=c['user'], password=c['password'])
            return paramiko.SFTPClient.from_transport(t), t
        except Exception: return None, None
    def _scan_candidates(self) -> Generator[Dict, None, None]:
        dest_root = self.cfg.nas_config['dest_dir']
        threshold = datetime.now() - timedelta(seconds=self.cfg.scan_threshold_sec)
        exclude_ptn = re.compile(r'-\d+$')
        for p in self.cfg.converted_dir.rglob('*.mp4'):
            if not p.is_file(): continue
            if exclude_ptn.search(p.stem): continue
            if datetime.fromtimestamp(p.stat().st_mtime) >= threshold: continue
            size = p.stat().st_size
            if self.state.is_recorded(StateManager.SECTION_UPLOAD_NAS, p.name, size): continue
            rel = p.relative_to(self.cfg.converted_dir).as_posix()
            dest = posixpath.join(dest_root, rel)
            
            yield {'src': p, 'dest': dest, 'name': p.name, 'size': size, 'type': 'mp4'}
            ass = p.with_suffix('.ass')
            if ass.exists():
                ass_dest = posixpath.join(dest_root, ass.relative_to(self.cfg.converted_dir).as_posix())
                yield {'src': ass, 'dest': ass_dest, 'size': ass.stat().st_size, 'type': 'ass'}
    def _check_remote_status(self, task) -> bool:
        try:
            r_size = self.sftp.stat(task['dest']).st_size
            if task['size'] == r_size: return False
            task['reason'] = "サイズ変更(NAS既存)"
            return True
        except FileNotFoundError:
            task['reason'] = "新規"
            return True
    def _cleanup(self):
        root = self.cfg.nas_config['dest_dir']
        nas_excludes = self.cfg.nas_exclude_dirs.copy()
        
        for pol in self.cfg.retention_policies_nas:
            self.cleaner.apply_retention_policy(root, pol['dir'], pol['days'], pol['limit_gb'])
            nas_excludes.append(f"/{pol['dir']}/")
            
        self.cleaner.enforce_size_limit(root, self.cfg.max_size_nas_gb, nas_excludes)
        self._cleanup_empty_dirs_remote(root)
    def _cleanup_empty_dirs_remote(self, root: str):
        dirs = []
        try:
            for path, _, is_dir in self.ops._sftp_walk(root):
                if is_dir: dirs.append(path)
        except Exception: return
        dirs.sort(key=lambda s: len(s), reverse=True)
        norm_root = root.rstrip('/')
        
        count = 0
        for d in dirs:
            if posixpath.dirname(d.rstrip('/')) == norm_root: continue
            if self.ops.remove_empty_dir(d):
                logging.info(f"  [NAS] 空ディレクトリ削除: {d}")
                count += 1
        if count > 0:
            self.cfg.write_log = True
            activate_realtime_log()
    def _process(self, task):
        dest_dir = posixpath.dirname(task['dest'])
        logging.info(f"[NAS] Upload ({task['reason']}): {task['src'].name} -> {dest_dir}")
        if self._upload(task['src'], task['dest']):
            if task['type'] == 'mp4':
                self.state.update_entry(StateManager.SECTION_UPLOAD_NAS, task['name'], task['size'], self.cfg.dry_run)
    def _upload(self, src, dest):
        if self.cfg.dry_run:
            logging.info("  [DryRun] Uploaded.")
            return True
        try:
            self._mkdir_p(posixpath.dirname(dest))
            
            size_str = format_bytes(src.stat().st_size)
            logging.info(f"  -> Upload開始 ({size_str})")
            start = time.time()
            self.sftp.put(str(src), dest)
            dur = time.time() - start
            logging.info(f"  -> Upload完了 (所要時間: {dur:.1f}秒)")
            return True
        except Exception as e:
            logging.error(f"  -> Upload失敗: {e}")
            return False
    def _mkdir_p(self, remote_dir):
        if remote_dir in ['/', '.']: return
        try: self.sftp.stat(remote_dir)
        except FileNotFoundError:
            self._mkdir_p(posixpath.dirname(remote_dir))
            try: self.sftp.mkdir(remote_dir)
            except OSError: pass
class M2tsBackupPipeline(BasePipeline):
    """Phase 3: M2TS -> HDD バックアップ"""
    def run(self):
        if not self.cfg.is_hdd_copy_time_window and not self.cfg.dry_run: return
        tasks = list(self._scan())
        self._cleanup()
        if tasks:
            self.cfg.write_log = True
            activate_realtime_log()
            
            dest_path = self.cfg.dest_dir_hdd
            pre_size_str = self._get_dir_size_str(dest_path)
            logging.info("=== Phase 3: M2TSバックアップ (Backup) ===")
            logging.info(f"コピー対象数: {len(tasks)} (現在のディレクトリサイズ [{dest_path}]: {pre_size_str})")
            for task in tasks:
                self._process(task)
            
            self.cleaner.delete_old_files_by_pattern(self.cfg.source_dir_m2ts, self.cfg.m2ts_delete_days, "*.m2ts")
            
            post_size_str = self._get_dir_size_str(dest_path)
            logging.info(f"バックアップ完了後のディレクトリサイズ [{dest_path}]: {post_size_str}")
    def _get_dir_size_str(self, target_path: Path) -> str:
        if not target_path.exists():
            return format_bytes(0)
        all_files = self.ops.list_files_recursive(target_path, exclude_dirs=[])
        return format_bytes(sum(f.size for f in all_files))
    def _scan(self):
        if not self.cfg.source_dir_m2ts.exists(): return
        threshold = datetime.now() - timedelta(seconds=self.cfg.scan_threshold_sec)
        for p in self.cfg.source_dir_m2ts.rglob('*.m2ts'):
            if not p.is_file(): continue
            try:
                rel = p.relative_to(self.cfg.source_dir_m2ts)
                if len(rel.parts) > 1 and rel.parts[0] in self.cfg.skip_folders_m2ts: continue
            except ValueError: pass
            if datetime.fromtimestamp(p.stat().st_mtime) >= threshold: continue
            size = p.stat().st_size
            dest = self.cfg.dest_dir_hdd / rel
            if self.state.is_recorded(StateManager.SECTION_COPY_HDD, p.name, size):
                continue
            
            if dest.exists() and dest.stat().st_size == size: continue
            
            yield {'src': p, 'dest': dest, 'size': size}
    def _cleanup(self):
        for pol in self.cfg.retention_policies_hdd:
            self.cleaner.apply_retention_policy(self.cfg.dest_dir_hdd, pol["dir"], pol["days"], pol["limit_gb"])
        self.cleaner.enforce_size_limit(self.cfg.dest_dir_hdd, self.cfg.max_size_hdd_gb, self.cfg.hdd_exclude_dirs)
        
        self._cleanup_empty_dirs_local(self.cfg.source_dir_m2ts)
        self._cleanup_empty_dirs_local(self.cfg.dest_dir_hdd)
    def _process(self, task):
        src, dest = task['src'], task['dest']
        logging.info(f"[Backup] コピー: {src.name} ({format_bytes(task['size'])})")
        
        if self._copy(src, dest):
            self.state.update_entry(StateManager.SECTION_COPY_HDD, src.name, task['size'], self.cfg.dry_run)
    def _copy(self, src, dest):
        if self.cfg.dry_run:
            logging.info(f"  [DryRun] Copy to: {dest}")
            return True
        try:
            dest.parent.mkdir(parents=True, exist_ok=True)
            if dest.exists(): dest.unlink()
            
            file_size = src.stat().st_size
            free_space = shutil.disk_usage(dest.parent).free
            if free_space < file_size:
                logging.error(f"  -> HDD容量不足 (空き: {format_bytes(free_space)}, 必要: {format_bytes(file_size)})")
                return False
            
            logging.info(f"  -> Copy開始 ({format_bytes(file_size)})")
            start = time.time()
            shutil.copy2(src, dest)
            dur = time.time() - start
            logging.info(f"  -> Copy完了 (所要時間: {dur:.1f}秒)")
            return True
        except Exception as e:
            logging.error(f"  -> Copy失敗: {e}")
            return False
# =========================================================
# メイン処理
# =========================================================
def main():
    cfg = Config()
    
    lock_fd = acquire_lock(cfg.lock_file)
    if lock_fd is None: return
    logger = setup_logger(cfg.log_file)
    try:
        state = StateManager(cfg.ini_file)
        mode = "DryRun" if cfg.dry_run else "Production"
        
        logging.info(f"=== TV Recorder Manager Start [{mode}] ===")
        M2tsConverterPipeline(cfg, state).run()
        Mp4UploadPipeline(cfg, state).run()
        M2tsBackupPipeline(cfg, state).run()
        logging.info("=== All Finished ===")
    except Exception as e:
        cfg.write_log = True
        logging.exception(f"Unexpected Error: {e}")
    finally:
        logger.flush_all(cfg.write_log)
        logger.close()
        if lock_fd: lock_fd.close()
if __name__ == "__main__":
    main()

このスクリプトを起動すると下記タスクを順次実行して終了します。

  • Amatsukazeへ変換タスクを追加
  • mp4をNASへコピー
  • m2tsをHDDへコピー・空ディレクトリの削除

処理対象のファイルがない場合は何もせずに終了します。常駐するタイプのスクリプトではないので、実行開始にはトリガーが必要になります。

実行開始のトリガーはcronを利用します。通常のテレビ番組は0分台、30分台に終了することが多いため、録画終了からできるだけ早く変換・コピー処理が始まるように、毎時1分を起点として3分おきに実行するようにします。下記はcrontabの設定例になります。

1-59/3 * * * * /usr/bin/python3 /home/tv-recorder/Scripts/TvRecorder.py

N100など非力なCPUでTVサーバーを構築している場合、録画中の変換処理でDropが発生することを避けるため、実行時間を深夜から未明にかけてなど録画番組のない時間帯に限定してスクリプトを起動するように設定すればよいです。

このPythonスクリプトは以下についても考慮しています。

  • 録画中のファイルは処理対象から除外する
  • HDDアクセスによるDropを避けるため、HDDへコピーする時間帯を制限する
  • 変換・コピー処理のログをリアルタイムでログに保存し、コンソール上からもリアルタイムで動作状況を確認できるようにする
  • ログの肥大化を避けるため、処理が行われない場合はログ出力しないようにする
  • ファイル操作をせずに動作確認できるようにDryRunで動作確認できるようにする
  • コピー処理実行時にはHDD/NASにコピー可能かどうかを確認し、コピーできない場合はHDD/NASが復帰したときに処理を再実行する
  • 変換・コピー後にファイル移動などで再処理が走らないように、処理を実行したファイルをiniファイルに記録し、次回以降の処理でiniファイルを参照してスキップする
  • m2tsを上書きしたら、mp4変換とコピー処理が走り、既存のファイルを上書きするようにする

TVサーバー稼働後の運用とメンテナンス

KonomiTVはディレクトリ階層構造でファイルを表示できないため、1000単位の数の録画ファイルを探すには不向きです。そこで直近1週間程度の録画番組はKonomiTVで再生し、録画後のCM抜きmp4ファイルの再生はNAS上で動作するJellyfinなどのメディアサーバーを使って視聴します。

ファイルは自動整理されるため、基本的にメンテナンスは不要ですが、削除前提で録画したファイルを保存しておきたくなった場合は、保存用ディレクトリにファイルを移動するだけで作業が完了します。

また、ファイルは自動で削除されるので短い期間でNASやHDDの容量が足りなくなることはないですが、保存しておく録画ファイルが増えるとHDD/NASを圧迫してシステムが機能しなくなるため、保存するファイルのトータルサイズとHDD/NASの空き容量は、適宜チェックしておく必要があります。

参考になれば幸いです。