
※2026/1/21現在の情報になります
※スクリプトや構成のアップデートに合わせて、記事も随時アップデートしています
以前よりUbuntuでTV視聴・録画サーバー(以降、TVサーバーと表記)を設置し、CMを抜いたmp4への変換にはWindowsのAmatsukazeを利用し、バックアップ処理も含めて自動化していました。しかし、TVサーバーのために2台のPCを使うのはもったいないと思い、Ubuntu上で完結するシステムへの切り替えを検討していました。
ネット上にはAmatsukazeを使ってCM抜きmp4を生成する方法の解説記事はあるものの、TVサーバーとの連携にはWindows単体もしくはWindowsとUbuntuを2台利用したものがほとんどで、Ubuntu 1台のみで完結したシステムの記事は見つかりませんでした。
このため、TVサーバーをUbuntuで実装する方法を一から検討し、録画->CM抜き->mp4/m2tsバックアップ->不要ファイル処理を完全自動化することができました。
本記事はUbuntuで完結するTVサーバー仕様のポイントを備忘録としてまとめたものになります。
本記事のTVサーバーのシステム構成
構築したTVサーバーのシステム構成は以下のとおりです。

mp4ファイル再生用のメディアサーバーはTVサーバー上に構築することでPC一台でシステムを完結させることもできます。しかし、mp4ファイルは最終的にNASへバックアップコピーするので、録画中のDrop発生要因を減らすことも考慮し、バックアップ先のNAS上で整理・再生するようにしました。
NASへのコピーはSFTPを使います。Exportされたフォルダをマウントする形式だと電源を入れる順番によってはマウントに失敗することもあるため、コピー開始前にNAS側の異常を検知できるSFTPを使います。
m2tsの保存先については、LAN経由でのコピーは時間がかかる上に、保存後にアクセスすることもないことから、NASへのバックアップではなくPCに直接接続したHDDを保存先にしています。
TVサーバー構築用ソフトウェア
TVサーバーを構築するソフトは、録画用に「EPGStation」、リアルタイム視聴・m2ts再生用に「KonomiTV」を利用します。EPGStationは録画した番組をm2ts形式で保存し、KonomiTVでm2tsファイルを直接再生して視聴できます。
録画用ソフトにEPGStationを使う理由は、RaspberryPi上でも動作するためです。TVサーバーが何らかのトラブルで録画に失敗する場合に備えて、RaspberryPiで録画バックアップサーバーを動かしているため、メインで利用するPC上の録画ソフトがEPGStationであれば録画ルール(時刻・条件・録画ファイル格納ディレクトリ)のデータの同期ができ、運用コストを下げることができます。
このような縛りがなければ、KonomiTVが録画予約までサポートしているECDBを使ってもいいです。
TVサーバー構築用OS
録画用ソフトにECDBを使えばWindowsでTVサーバーを構築することはできますが、セキュリティアップデートでアプリが起動しなくなる、録画途中に負荷がかかる、再起動が発生する、突発的なブルースクリーンが発生するなどが原因で録画が失敗するリスクがあります。
Ubuntuは自動アップデートなどの自動処理は行なわず、OSとしても安定しており、コンソールを使ってファイル操作やログのチェックができることから、UbuntuでTVサーバーを構築しています。
m2tsファイルのバックアップの必要性について
録画した番組を保存したければ、m2tsファイルをほかの媒体にコピーすればよいです。しかし、m2tsファイルは下記の点で保存には不向きです。
- 100時間で約76GBとなり、1年でTB単位でトータル保存ファイルサイズが増加するため、ストレージコストが高くつく
- スマホやタブレットへの保存(オフライン視聴)時に保存できる番組数が少なくなる
- 再生できるプレイヤー・メディアサーバーが少ない
- CMなど不要な部分が含まれているため、流し見に向いていない
これらは、Amatsukazeを利用しCMを抜いたmp4ファイルに変換し、これを保存することで解決できます。mp4に変換するとファイルサイズは10-30%程度まで縮小できます。
ただし、mp4に変換するとm2tsに記録されているニコニコ動画のコメントも消えてしまうため、これらを残したい場合はm2tsも残す必要があります。またCMカットをやり直すために再度エンコードする可能性もあるので、m2tsファイルは最終的には消す予定でも、しばらくの間は保持していたほうが安心です。
メンテナンスフリーのための自動処理の要件
録画したファイルを整理する際には、下記の作業を行います。
- SSD/HDD/NASの残容量確認と不要ファイルの削除
- CM抜きmp4ファイルへ変換
- NAS,HDDへのコピー
番組を録画したら上記処理を自動で行う手順を考える必要があります。変換、転送処理は簡単ですが、ファイル・フォルダの整理と不要ファイルの削除については、削除ルールを策定しパラメータで処理をコントロールできるようにしなければいけません。
不要ファイルの削除
EPGStationで録画したm2tsファイルの削除は、EGPStationのデータベースとの不整合を防ぐためEPGstationの自動削除機能にまかせたほうが安全です。
保存が必要なm2tsファイルは、自動削除処理が実行される前にHDDへコピーして消去されないようにします。そして、コピー先のディレクトリサイズが指定容量を超過した場合は、指定容量以内に収まるまで更新日時の古いファイルから順番に自動で削除する形で容量オーバーを回避します。ファイルの削除可否はフォルダごとに自動削除対象かどうかを定義します。
各録画ファイルへ適用する削除ルールは、EPGStationの録画予約時の保存先ディレクトリ指定機能を利用します。ファイル削除ルールごとにディレクトリを以下の4つに分け、録画時にこれらのディレクトリを指定して録画ファイルを自動で分類します。
- no_conversion: CMカットも変換も不要な番組(ニュースなど)
- delete: CMカット後に一定期間保持する番組(バラエティなど)
- delete_after_watch: CMカット後に視聴するまで自動消去しない番組(連ドラなど)
- keep: CMカット後も再エンコードに備えてm2tsを残したい番組(ドキュメンタリー・映画など)
NAS上のファイルについてもm2tsファイルと同様のパラメータで削除ルールを作り、コピー開始前にルールに従って削除します。
ファイル削除によって空ディレクトリもできるため、HDDコピー時に空ディレクトリも削除する処理を実行するようにします。
mp4ファイルへの変換
CMカット機能があるAmatsukazeを使用します。AmatsukazeはUbuntuに対応しているため、Ubuntu上で動作させます。
変換したmp4ファイルを格納するディレクトリはm2tsファイルと同じ構造を維持するようにし、管理の手間を省きます。ここでは/mnt/converted_filesに保存するものとします。
NAS・HDDへコピー
Pythonスクリプトで以下のように処理をします。
- m2tsファイル:直接接続された外付けHDDへコピー
- mp4ファイル: sftp経由でNASへアップロード
一度変換・コピーしたファイルは繰り返し同じ処理が走らないようにデータベースに記録し管理します。
ファイルをコピーする際には録画ファイルのディレクトリ構造を維持してコピーし、管理の手間を省きます。
TVサーバー構築
TVサーバーのPCスペックについて
録画・再生だけであればN100等の低スペックPCでも可能ですが、Amatsukazeでの変換を裏で同時に行う場合はCPU性能が必要になります。
Core i9のミニPCをTVサーバーにして運用した場合は、変換中にDropは発生しておらず、1時間番組の場合ですと録画終了からNASへのアップロード完了までは10分程度で終わります。
N100の場合は同時録画をしている最中に変換処理をしているとDropが発生することがあることを確認しています。この場合、変換処理は深夜などの無録画時間帯で実行することで回避可能です。
Ubuntuのインストール
インストールするPCによりインストール方法が変わるため、ここでの説明は割愛します。以下の記事などを参照してください。
EPGStation、KonomiTVのセットアップ
EPGStationとKonomiTVをUbuntuにインストールする方法は以下の記事にまとめていますので、こちらを参考にしてください。
ECDBは下記記事を参考にするとインストールできます。
Ubuntu上で動作するAmatsukazeをセットアップする
Amatsukazeのインストールや設定は環境に大きく依存するので、マニュアルを参考にしてインストールしてください。
Amatsukazeのセットアップ後、最終的に以下のコマンドで変換キューに追加し、出力先にmp4ファイルができていればよいです。
$ /home/ユーザー名/Amatsukaze/exe_files/AmatsukazeAddTask -f <m2tsファイル> -o <出力先ディレクトリ> -ip localhost -s <プロファイル名> <br>自動処理用Pythonスクリプト
下記が上記要件を取り入れたPythonスクリプトになります。
スクリプトを1ファイルに収めたので、処理内容の確認やカスタマイズはClaude、Copilotなどを利用すると簡単です。
環境依存パラメータについては、ファイル先頭に定義されているConfigクラスにまとめていますので、環境に応じて変更してください。
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
TV録画統合管理スクリプト (Integrated TV Recorder Manager)
【構成】
- Config: 設定管理
- StateManager: 状態管理 (INI)
- DiskOperations (Abstract): ディスク操作の抽象化
- LocalDiskOperations: ローカル操作
- RemoteDiskOperations: SFTP操作
- Cleaner: ディスクの掃除・容量管理ロジック
- Pipelines:
- M2tsConverterPipeline (Phase 1)
- Mp4UploadPipeline (Phase 2)
- M2tsBackupPipeline (Phase 3)
- Logger: 条件付きバッファリングロガー
"""
import configparser
import shutil
import subprocess
import logging
import logging.handlers
import sys
import unicodedata
import fcntl
import posixpath
import stat
import re
import time
from abc import ABC, abstractmethod
from pathlib import Path
from datetime import datetime, timedelta
from dataclasses import dataclass, field
from typing import List, Dict, Generator, Optional, Tuple, Any, Union
try:
import paramiko
except ImportError:
print("【エラー】'paramiko' ライブラリが必要です: pip install paramiko", file=sys.stderr)
sys.exit(1)
# =========================================================
# 0. データ構造 & ユーティリティ
# =========================================================
@dataclass
class FileEntry:
path: Union[Path, str]
name: str
size: int
mtime: datetime
def format_bytes(size: int) -> str:
for unit in ["Byte", "KB", "MB"]:
if size < 1024:
return f"{size:.2f}{unit}" if unit != "Byte" else f"{size}{unit}"
size /= 1024
return f"{size:.2f}GB"
def normalize_str(s: str) -> str:
return unicodedata.normalize('NFC', s) if isinstance(s, str) else s
# =========================================================
# 1. 設定管理 (Configuration)
# =========================================================
@dataclass
class Config:
# --- パス設定 ---
source_dir_m2ts: Path = Path("/mnt/tv-recorder/recorded_files")
dest_dir_hdd: Path = Path("/mnt/hdd/m2ts")
converted_dir: Path = Path("/mnt/converted_files")
ini_file: Path = Path("/home/tv-recorder/Scripts/status_TvRecorder.ini")
log_file: Path = Path("/home/tv-recorder/Scripts/process_TvRecorder.log")
lock_file: Path = Path("/tmp/TvRecorder.lock")
# --- 動作設定 ---
dry_run: bool = False
write_log: bool = False
scan_threshold_sec: int = 10
m2ts_delete_days: int = 30
# --- HDDコピー実行許可時間帯 ---
copy_window_start_hour: int = 2
# --- NAS接続設定 ---
nas_config: Dict = field(default_factory=lambda: {
'host': '192.168.1.3',
'port': 22,
'user': 'tv-recorder',
'password': '********',
'key_file': None,
'dest_dir': '/tv_program/converted_files'
})
# --- 除外設定 ---
skip_folders_m2ts: List[str] = field(default_factory=lambda: ['no_conversion'])
hdd_exclude_dirs: List[str] = field(default_factory=lambda: ['keep'])
converted_exclude_dirs: List[str] = field(default_factory=list)
nas_exclude_dirs: List[str] = field(default_factory=lambda: ['/keep/', '/delete_after_watch/'])
# --- 容量上限設定 (GB) ---
max_size_hdd_gb: float = 4500.0
max_size_converted_gb: float = 210.0
max_size_nas_gb: float = 4000.0
# --- 保持ポリシー ---
retention_policies_hdd: List[Dict] = field(default_factory=lambda: [
{"dir": "delete", "days": 182, "limit_gb": 1000},
{"dir": "delete_after_watch", "days": 365, "limit_gb": 1500}
])
retention_policies_nas: List[Dict] = field(default_factory=lambda: [
{"dir": "delete", "days": 365, "limit_gb": 1500},
])
# --- Amatsukaze設定 ---
amatsukaze_cmd: str = "/home/tv-recorder/Amatsukaze/Amatsukaze/exe_files/AmatsukazeAddTask"
amatsukaze_ip: str = "localhost"
amatsukaze_service: str = "QsvEnc"
@property
def is_hdd_copy_time_window(self) -> bool:
return datetime.now().hour == self.copy_window_start_hour
# =========================================================
# 2. ロガー & 状態管理 & 排他制御
# =========================================================
class ConditionalBufferHandler(logging.Handler):
def __init__(self, log_file: Path):
super().__init__()
self.buffer = []
self.passthrough = False
if not log_file.parent.exists():
log_file.parent.mkdir(parents=True, exist_ok=True)
self.stream_handler = logging.StreamHandler(sys.stdout)
self.file_handler = logging.FileHandler(log_file, encoding='utf-8')
def setFormatter(self, fmt):
super().setFormatter(fmt)
self.stream_handler.setFormatter(fmt)
self.file_handler.setFormatter(fmt)
def emit(self, record):
if getattr(self, 'passthrough', False):
self.stream_handler.handle(record)
self.file_handler.handle(record)
else:
if not hasattr(self, 'buffer'):
self.buffer = []
self.buffer.append(record)
def flush_all(self, should_write: bool):
if should_write:
self.passthrough = True
if hasattr(self, 'buffer'):
for record in self.buffer:
self.stream_handler.handle(record)
self.file_handler.handle(record)
self.stream_handler.flush()
self.file_handler.flush()
self.buffer = []
def close(self):
self.stream_handler.close()
self.file_handler.close()
super().close()
def setup_logger(log_path: Path) -> ConditionalBufferHandler:
root = logging.getLogger()
if root.hasHandlers(): root.handlers.clear()
root.setLevel(logging.INFO)
fmt = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
handler = ConditionalBufferHandler(log_path)
handler.setFormatter(fmt)
root.addHandler(handler)
return handler
def activate_realtime_log():
root = logging.getLogger()
for h in root.handlers:
if isinstance(h, ConditionalBufferHandler):
h.flush_all(True)
def acquire_lock(lock_path: Path):
try:
lock_fd = open(lock_path, 'w')
fcntl.flock(lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
return lock_fd
except IOError:
return None
class StateManager:
SECTION_COPY_HDD = 'CopyHDD'
SECTION_CONVERT = 'Convert'
SECTION_UPLOAD_NAS = 'UploadNAS'
def __init__(self, ini_path: Path):
self.ini_path = ini_path
self.config = configparser.ConfigParser()
self.config.optionxform = str
self._load()
def _load(self):
if self.ini_path.exists():
try:
self.config.read(self.ini_path, encoding='utf-8')
except Exception: pass
for sec in [self.SECTION_COPY_HDD, self.SECTION_CONVERT, self.SECTION_UPLOAD_NAS]:
if sec not in self.config: self.config[sec] = {}
def is_recorded(self, section: str, file_name: str, file_size: int) -> bool:
if section not in self.config: return False
norm_name = normalize_str(file_name)
sec_data = {normalize_str(k): v for k, v in self.config[section].items()}
return sec_data.get(norm_name) == str(file_size)
def is_key_exists(self, section: str, file_name: str) -> bool:
if section not in self.config: return False
norm_name = normalize_str(file_name)
return norm_name in {normalize_str(k) for k in self.config[section].keys()}
def update_entry(self, section: str, file_name: str, file_size: int, dry_run: bool = False):
if dry_run:
logging.info(f" [DryRun] INI更新 [{section}]: {file_name}")
return
try:
sec_data = self.config[section]
if file_name not in sec_data:
items = list(sec_data.items())
sec_data.clear()
sec_data[file_name] = str(file_size)
for k, v in items: sec_data[k] = v
else:
sec_data[file_name] = str(file_size)
with open(self.ini_path, 'w', encoding='utf-8') as f:
self.config.write(f)
except Exception as e:
logging.error(f"INI Update Failed: {e}")
# =========================================================
# 3. ディスク操作の抽象化 (Disk Operations & Cleaner)
# =========================================================
class DiskOperations(ABC):
@abstractmethod
def list_files_recursive(self, root_dir: Union[Path, str], exclude_dirs: List[str] = None) -> Generator[FileEntry, None, None]:
pass
@abstractmethod
def delete_file(self, path: Union[Path, str]) -> bool:
pass
@abstractmethod
def remove_empty_dir(self, path: Union[Path, str]) -> bool:
pass
@abstractmethod
def exists(self, path: Union[Path, str]) -> bool:
pass
class LocalDiskOperations(DiskOperations):
def __init__(self, dry_run: bool):
self.dry_run = dry_run
def list_files_recursive(self, root_dir: Path, exclude_dirs: List[str] = None) -> Generator[FileEntry, None, None]:
exclude_dirs = exclude_dirs or []
excludes = [root_dir / d for d in exclude_dirs]
for p in root_dir.rglob('*'):
if p.is_file():
if any(ex in p.parents for ex in excludes): continue
yield FileEntry(
path=p, name=p.name, size=p.stat().st_size,
mtime=datetime.fromtimestamp(p.stat().st_mtime)
)
def delete_file(self, path: Path) -> bool:
size_str = "Unknown"
try:
size_str = format_bytes(path.stat().st_size)
except Exception: pass
if self.dry_run:
logging.info(f" [DryRun] 削除: {path.name} ({size_str})")
return True
try:
path.unlink()
logging.info(f" 削除: {path.name} ({size_str})")
return True
except Exception: return False
def remove_empty_dir(self, path: Path) -> bool:
if self.dry_run: return True
try:
path.rmdir()
return True
except OSError: return False
def exists(self, path: Path) -> bool:
return path.exists()
class RemoteDiskOperations(DiskOperations):
def __init__(self, sftp, dry_run: bool):
self.sftp = sftp
self.dry_run = dry_run
def list_files_recursive(self, root_dir: str, exclude_dirs: List[str] = None) -> Generator[FileEntry, None, None]:
exclude_markers = exclude_dirs or []
try:
for path, attr, is_dir in self._sftp_walk(root_dir):
if not is_dir:
if any(m in path for m in exclude_markers): continue
yield FileEntry(
path=path, name=posixpath.basename(path), size=attr.st_size,
mtime=datetime.fromtimestamp(attr.st_mtime)
)
except Exception: pass
def delete_file(self, path: str) -> bool:
size_str = "Unknown"
try:
size_str = format_bytes(self.sftp.stat(path).st_size)
except Exception: pass
if self.dry_run:
logging.info(f" [DryRun] 削除(NAS): {posixpath.basename(path)} ({size_str})")
return True
try:
self.sftp.remove(path)
logging.info(f" 削除(NAS): {posixpath.basename(path)} ({size_str})")
if path.endswith('.mp4'):
ass = str(Path(path).with_suffix('.ass'))
if self.exists(ass):
self.sftp.remove(ass)
return True
except Exception: return False
def remove_empty_dir(self, path: str) -> bool:
if self.dry_run: return True
try:
self.sftp.rmdir(path)
return True
except Exception: return False
def exists(self, path: str) -> bool:
try:
self.sftp.stat(path)
return True
except FileNotFoundError: return False
def _sftp_walk(self, remote_path):
try:
for attr in self.sftp.listdir_attr(remote_path):
full_path = posixpath.join(remote_path, attr.filename)
if stat.S_ISDIR(attr.st_mode):
yield from self._sftp_walk(full_path)
yield (full_path, attr, True)
else:
yield (full_path, attr, False)
except Exception: pass
class Cleaner:
def __init__(self, config: Config, ops: DiskOperations, label: str):
self.cfg = config
self.ops = ops
self.label = label
def enforce_size_limit(self, target_dir: Union[Path, str], limit_gb: float, exclude_dirs: List[str]):
if not self.ops.exists(target_dir): return
all_files = list(self.ops.list_files_recursive(target_dir, exclude_dirs=[]))
total_size = sum(f.size for f in all_files)
logging.info(f" {self.label} 現在の全体容量 [{target_dir}]: {format_bytes(total_size)}")
deletable_files = []
for f in all_files:
is_excluded = False
if isinstance(target_dir, Path):
if exclude_dirs and any(ex in f.path.parts for ex in exclude_dirs):
is_excluded = True
else:
if exclude_dirs and any(ex in f.path for ex in exclude_dirs):
is_excluded = True
if not is_excluded:
deletable_files.append(f)
deletable_files.sort(key=lambda x: x.mtime)
limit_bytes = limit_gb * (1024**3)
delete_list = []
if total_size > limit_bytes:
for f in deletable_files:
delete_list.append(f)
total_size -= f.size
if total_size <= limit_bytes: break
if delete_list:
self.cfg.write_log = True
activate_realtime_log()
logging.info(f" {self.label} 全体容量制限チェック [{target_dir}]: 上限 {limit_gb}GB -> {len(delete_list)}ファイル削除")
for f in delete_list:
self.ops.delete_file(f.path)
def apply_retention_policy(self, parent_dir: Union[Path, str], dir_name: str, days: int, limit_gb: int):
if isinstance(parent_dir, Path):
target = parent_dir / dir_name
else:
target = posixpath.join(parent_dir, dir_name)
if not self.ops.exists(target): return
files = list(self.ops.list_files_recursive(target))
total_size = sum(f.size for f in files)
logging.info(f" {self.label} 現在の容量 [{target}]: {format_bytes(total_size)}")
cutoff = datetime.now() - timedelta(days=days)
delete_list = [f for f in files if f.mtime < cutoff]
keep_list = [f for f in files if f.mtime >= cutoff]
keep_list.sort(key=lambda x: x.mtime)
limit_bytes = limit_gb * (1024**3)
current_keep_size = sum(f.size for f in keep_list)
while current_keep_size > limit_bytes and keep_list:
target_f = keep_list.pop(0)
delete_list.append(target_f)
current_keep_size -= target_f.size
if delete_list:
self.cfg.write_log = True
activate_realtime_log()
logging.info(f" {self.label} ポリシー適用 [{dir_name}]: {len(delete_list)}ファイル削除")
for f in delete_list:
self.ops.delete_file(f.path)
def delete_old_files_by_pattern(self, target_dir: Union[Path, str], days: int, pattern: str):
if not self.ops.exists(target_dir): return
logging.info(f" {self.label} 古いファイルの削除チェック: {target_dir} ({days}日以上, {pattern})")
all_files = self.ops.list_files_recursive(target_dir)
cutoff = datetime.now() - timedelta(days=days)
count = 0
for f in all_files:
if isinstance(target_dir, Path):
if not f.path.match(pattern): continue
else:
if not f.name.endswith('.m2ts'): continue
if f.mtime < cutoff:
self.cfg.write_log = True
activate_realtime_log()
self.ops.delete_file(f.path)
count += 1
if count == 0:
logging.info(" -> 削除対象なし")
else:
logging.info(f" -> {count}ファイル削除")
# =========================================================
# 4. パイプライン基底クラス
# =========================================================
class BasePipeline(ABC):
def __init__(self, config: Config, state: StateManager):
self.cfg = config
self.state = state
self.ops = LocalDiskOperations(config.dry_run)
self.cleaner = Cleaner(config, self.ops, "[Local]")
@abstractmethod
def run(self):
pass
def _cleanup_empty_dirs_local(self, root: Path, excludes: List[str] = None):
if not root.exists(): return
excludes = excludes or []
deleted_count = 0
all_dirs = sorted([p for p in root.rglob('*') if p.is_dir()], key=lambda p: len(p.parts), reverse=True)
exclude_paths = [root / e for e in excludes]
for d in all_dirs:
if d in exclude_paths or d == root: continue
try:
if not any(d.iterdir()):
if self.ops.remove_empty_dir(d):
logging.info(f" [Local] 空ディレクトリ削除: {d}")
deleted_count += 1
except OSError: pass
if deleted_count > 0:
self.cfg.write_log = True
activate_realtime_log()
# =========================================================
# 5. 各フェーズの実装
# =========================================================
class M2tsConverterPipeline(BasePipeline):
"""Phase 1: M2TS -> MP4 変換"""
def run(self):
tasks = list(self._scan())
self._cleanup()
if tasks:
self.cfg.write_log = True
activate_realtime_log()
logging.info("=== Phase 1: M2TS変換 (Converter) ===")
logging.info(f"変換対象数: {len(tasks)}")
for task in tasks:
self._process(task)
def _scan(self):
if not self.cfg.source_dir_m2ts.exists(): return
threshold = datetime.now() - timedelta(seconds=self.cfg.scan_threshold_sec)
for p in self.cfg.source_dir_m2ts.rglob('*.m2ts'):
if not p.is_file(): continue
try:
rel = p.relative_to(self.cfg.source_dir_m2ts)
if len(rel.parts) > 1 and rel.parts[0] in self.cfg.skip_folders_m2ts: continue
except ValueError: pass
if datetime.fromtimestamp(p.stat().st_mtime) >= threshold: continue
if self.state.is_recorded(StateManager.SECTION_CONVERT, p.name, p.stat().st_size):
continue
yield {'src': p, 'rel': rel, 'size': p.stat().st_size}
def _cleanup(self):
self.cleaner.enforce_size_limit(self.cfg.converted_dir, self.cfg.max_size_converted_gb, self.cfg.converted_exclude_dirs)
self._cleanup_empty_dirs_local(self.cfg.converted_dir)
def _process(self, task):
src = task['src']
logging.info(f"[Converter] 登録: {src.name} ({format_bytes(task['size'])})")
out_dir = self.cfg.converted_dir / task['rel'].parent
if self._prepare_out_dir(out_dir, src.stem) and self._exec_amatsukaze(src, out_dir):
log_file = out_dir / f"{src.stem}-enc.log"
try:
if not self.cfg.dry_run:
log_file.touch()
logging.info(f" -> エンコードログファイル作成: {log_file.name}")
except Exception as e:
logging.warning(f" -> エンコードログファイル作成失敗: {e}")
self.state.update_entry(StateManager.SECTION_CONVERT, src.name, task['size'], self.cfg.dry_run)
def _prepare_out_dir(self, out_dir, stem):
if self.cfg.dry_run: return True
try:
out_dir.mkdir(parents=True, exist_ok=True)
norm = normalize_str(stem)
for p in out_dir.iterdir():
if p.is_file() and normalize_str(p.stem).startswith(norm): p.unlink()
return True
except Exception: return False
def _exec_amatsukaze(self, src, out_dir):
cmd = [self.cfg.amatsukaze_cmd, "-ip", self.cfg.amatsukaze_ip,
"-s", self.cfg.amatsukaze_service, "-o", str(out_dir), "-f", str(src)]
if self.cfg.dry_run:
logging.info(f" [DryRun] CMD: {cmd}")
return True
try:
subprocess.run(cmd, capture_output=True, text=True, check=True)
return True
except Exception as e:
logging.error(f" -> コマンド失敗: {e}")
return False
class Mp4UploadPipeline(BasePipeline):
"""Phase 2: MP4 -> NAS 転送"""
def __init__(self, config: Config, state: StateManager):
super().__init__(config, state)
self.sftp = None
self.transport = None
def run(self):
if not self.cfg.converted_dir.exists(): return
candidates = list(self._scan_candidates())
if not candidates: return
self.cfg.write_log = True
activate_realtime_log()
logging.info("=== Phase 2: MP4転送 (Upload) ===")
self.sftp, self.transport = self._connect_sftp()
if not self.sftp: return
self.ops = RemoteDiskOperations(self.sftp, self.cfg.dry_run)
self.cleaner = Cleaner(self.cfg, self.ops, "[NAS]")
try:
self._cleanup()
processed = 0
for task in candidates:
if self._check_remote_status(task):
self._process(task)
processed += 1
if processed > 0:
logging.info(f"MP4転送完了数: {processed}")
finally:
if self.transport: self.transport.close()
def _connect_sftp(self):
c = self.cfg.nas_config
try:
t = paramiko.Transport((c['host'], c['port']))
if c['key_file']:
k = paramiko.RSAKey.from_private_key_file(c['key_file'])
t.connect(username=c['user'], pkey=k)
else:
t.connect(username=c['user'], password=c['password'])
return paramiko.SFTPClient.from_transport(t), t
except Exception: return None, None
def _scan_candidates(self) -> Generator[Dict, None, None]:
dest_root = self.cfg.nas_config['dest_dir']
threshold = datetime.now() - timedelta(seconds=self.cfg.scan_threshold_sec)
exclude_ptn = re.compile(r'-\d+$')
for p in self.cfg.converted_dir.rglob('*.mp4'):
if not p.is_file(): continue
if exclude_ptn.search(p.stem): continue
if datetime.fromtimestamp(p.stat().st_mtime) >= threshold: continue
size = p.stat().st_size
if self.state.is_recorded(StateManager.SECTION_UPLOAD_NAS, p.name, size): continue
rel = p.relative_to(self.cfg.converted_dir).as_posix()
dest = posixpath.join(dest_root, rel)
yield {'src': p, 'dest': dest, 'name': p.name, 'size': size, 'type': 'mp4'}
ass = p.with_suffix('.ass')
if ass.exists():
ass_dest = posixpath.join(dest_root, ass.relative_to(self.cfg.converted_dir).as_posix())
yield {'src': ass, 'dest': ass_dest, 'size': ass.stat().st_size, 'type': 'ass'}
def _check_remote_status(self, task) -> bool:
try:
r_size = self.sftp.stat(task['dest']).st_size
if task['size'] == r_size: return False
task['reason'] = "サイズ変更(NAS既存)"
return True
except FileNotFoundError:
task['reason'] = "新規"
return True
def _cleanup(self):
root = self.cfg.nas_config['dest_dir']
nas_excludes = self.cfg.nas_exclude_dirs.copy()
for pol in self.cfg.retention_policies_nas:
self.cleaner.apply_retention_policy(root, pol['dir'], pol['days'], pol['limit_gb'])
nas_excludes.append(f"/{pol['dir']}/")
self.cleaner.enforce_size_limit(root, self.cfg.max_size_nas_gb, nas_excludes)
self._cleanup_empty_dirs_remote(root)
def _cleanup_empty_dirs_remote(self, root: str):
dirs = []
try:
for path, _, is_dir in self.ops._sftp_walk(root):
if is_dir: dirs.append(path)
except Exception: return
dirs.sort(key=lambda s: len(s), reverse=True)
norm_root = root.rstrip('/')
count = 0
for d in dirs:
if posixpath.dirname(d.rstrip('/')) == norm_root: continue
if self.ops.remove_empty_dir(d):
logging.info(f" [NAS] 空ディレクトリ削除: {d}")
count += 1
if count > 0:
self.cfg.write_log = True
activate_realtime_log()
def _process(self, task):
dest_dir = posixpath.dirname(task['dest'])
logging.info(f"[NAS] Upload ({task['reason']}): {task['src'].name} -> {dest_dir}")
if self._upload(task['src'], task['dest']):
if task['type'] == 'mp4':
self.state.update_entry(StateManager.SECTION_UPLOAD_NAS, task['name'], task['size'], self.cfg.dry_run)
def _upload(self, src, dest):
if self.cfg.dry_run:
logging.info(" [DryRun] Uploaded.")
return True
try:
self._mkdir_p(posixpath.dirname(dest))
size_str = format_bytes(src.stat().st_size)
logging.info(f" -> Upload開始 ({size_str})")
start = time.time()
self.sftp.put(str(src), dest)
dur = time.time() - start
logging.info(f" -> Upload完了 (所要時間: {dur:.1f}秒)")
return True
except Exception as e:
logging.error(f" -> Upload失敗: {e}")
return False
def _mkdir_p(self, remote_dir):
if remote_dir in ['/', '.']: return
try: self.sftp.stat(remote_dir)
except FileNotFoundError:
self._mkdir_p(posixpath.dirname(remote_dir))
try: self.sftp.mkdir(remote_dir)
except OSError: pass
class M2tsBackupPipeline(BasePipeline):
"""Phase 3: M2TS -> HDD バックアップ"""
def run(self):
if not self.cfg.is_hdd_copy_time_window and not self.cfg.dry_run: return
tasks = list(self._scan())
self._cleanup()
if tasks:
self.cfg.write_log = True
activate_realtime_log()
dest_path = self.cfg.dest_dir_hdd
pre_size_str = self._get_dir_size_str(dest_path)
logging.info("=== Phase 3: M2TSバックアップ (Backup) ===")
logging.info(f"コピー対象数: {len(tasks)} (現在のディレクトリサイズ [{dest_path}]: {pre_size_str})")
for task in tasks:
self._process(task)
self.cleaner.delete_old_files_by_pattern(self.cfg.source_dir_m2ts, self.cfg.m2ts_delete_days, "*.m2ts")
post_size_str = self._get_dir_size_str(dest_path)
logging.info(f"バックアップ完了後のディレクトリサイズ [{dest_path}]: {post_size_str}")
def _get_dir_size_str(self, target_path: Path) -> str:
if not target_path.exists():
return format_bytes(0)
all_files = self.ops.list_files_recursive(target_path, exclude_dirs=[])
return format_bytes(sum(f.size for f in all_files))
def _scan(self):
if not self.cfg.source_dir_m2ts.exists(): return
threshold = datetime.now() - timedelta(seconds=self.cfg.scan_threshold_sec)
for p in self.cfg.source_dir_m2ts.rglob('*.m2ts'):
if not p.is_file(): continue
try:
rel = p.relative_to(self.cfg.source_dir_m2ts)
if len(rel.parts) > 1 and rel.parts[0] in self.cfg.skip_folders_m2ts: continue
except ValueError: pass
if datetime.fromtimestamp(p.stat().st_mtime) >= threshold: continue
size = p.stat().st_size
dest = self.cfg.dest_dir_hdd / rel
if self.state.is_recorded(StateManager.SECTION_COPY_HDD, p.name, size):
continue
if dest.exists() and dest.stat().st_size == size: continue
yield {'src': p, 'dest': dest, 'size': size}
def _cleanup(self):
for pol in self.cfg.retention_policies_hdd:
self.cleaner.apply_retention_policy(self.cfg.dest_dir_hdd, pol["dir"], pol["days"], pol["limit_gb"])
self.cleaner.enforce_size_limit(self.cfg.dest_dir_hdd, self.cfg.max_size_hdd_gb, self.cfg.hdd_exclude_dirs)
self._cleanup_empty_dirs_local(self.cfg.source_dir_m2ts)
self._cleanup_empty_dirs_local(self.cfg.dest_dir_hdd)
def _process(self, task):
src, dest = task['src'], task['dest']
logging.info(f"[Backup] コピー: {src.name} ({format_bytes(task['size'])})")
if self._copy(src, dest):
self.state.update_entry(StateManager.SECTION_COPY_HDD, src.name, task['size'], self.cfg.dry_run)
def _copy(self, src, dest):
if self.cfg.dry_run:
logging.info(f" [DryRun] Copy to: {dest}")
return True
try:
dest.parent.mkdir(parents=True, exist_ok=True)
if dest.exists(): dest.unlink()
file_size = src.stat().st_size
free_space = shutil.disk_usage(dest.parent).free
if free_space < file_size:
logging.error(f" -> HDD容量不足 (空き: {format_bytes(free_space)}, 必要: {format_bytes(file_size)})")
return False
logging.info(f" -> Copy開始 ({format_bytes(file_size)})")
start = time.time()
shutil.copy2(src, dest)
dur = time.time() - start
logging.info(f" -> Copy完了 (所要時間: {dur:.1f}秒)")
return True
except Exception as e:
logging.error(f" -> Copy失敗: {e}")
return False
# =========================================================
# メイン処理
# =========================================================
def main():
cfg = Config()
lock_fd = acquire_lock(cfg.lock_file)
if lock_fd is None: return
logger = setup_logger(cfg.log_file)
try:
state = StateManager(cfg.ini_file)
mode = "DryRun" if cfg.dry_run else "Production"
logging.info(f"=== TV Recorder Manager Start [{mode}] ===")
M2tsConverterPipeline(cfg, state).run()
Mp4UploadPipeline(cfg, state).run()
M2tsBackupPipeline(cfg, state).run()
logging.info("=== All Finished ===")
except Exception as e:
cfg.write_log = True
logging.exception(f"Unexpected Error: {e}")
finally:
logger.flush_all(cfg.write_log)
logger.close()
if lock_fd: lock_fd.close()
if __name__ == "__main__":
main()このスクリプトを起動すると下記タスクを順次実行して終了します。
- Amatsukazeへ変換タスクを追加
- mp4をNASへコピー
- m2tsをHDDへコピー・空ディレクトリの削除
処理対象のファイルがない場合は何もせずに終了します。常駐するタイプのスクリプトではないので、実行開始にはトリガーが必要になります。
実行開始のトリガーはcronを利用します。通常のテレビ番組は0分台、30分台に終了することが多いため、録画終了からできるだけ早く変換・コピー処理が始まるように、毎時1分を起点として3分おきに実行するようにします。下記はcrontabの設定例になります。
1-59/3 * * * * /usr/bin/python3 /home/tv-recorder/Scripts/TvRecorder.pyN100など非力なCPUでTVサーバーを構築している場合、録画中の変換処理でDropが発生することを避けるため、実行時間を深夜から未明にかけてなど録画番組のない時間帯に限定してスクリプトを起動するように設定すればよいです。
このPythonスクリプトは以下についても考慮しています。
- 録画中のファイルは処理対象から除外する
- HDDアクセスによるDropを避けるため、HDDへコピーする時間帯を制限する
- 変換・コピー処理のログをリアルタイムでログに保存し、コンソール上からもリアルタイムで動作状況を確認できるようにする
- ログの肥大化を避けるため、処理が行われない場合はログ出力しないようにする
- ファイル操作をせずに動作確認できるようにDryRunで動作確認できるようにする
- コピー処理実行時にはHDD/NASにコピー可能かどうかを確認し、コピーできない場合はHDD/NASが復帰したときに処理を再実行する
- 変換・コピー後にファイル移動などで再処理が走らないように、処理を実行したファイルをiniファイルに記録し、次回以降の処理でiniファイルを参照してスキップする
- m2tsを上書きしたら、mp4変換とコピー処理が走り、既存のファイルを上書きするようにする
TVサーバー稼働後の運用とメンテナンス
KonomiTVはディレクトリ階層構造でファイルを表示できないため、1000単位の数の録画ファイルを探すには不向きです。そこで直近1週間程度の録画番組はKonomiTVで再生し、録画後のCM抜きmp4ファイルの再生はNAS上で動作するJellyfinなどのメディアサーバーを使って視聴します。
ファイルは自動整理されるため、基本的にメンテナンスは不要ですが、削除前提で録画したファイルを保存しておきたくなった場合は、保存用ディレクトリにファイルを移動するだけで作業が完了します。
また、ファイルは自動で削除されるので短い期間でNASやHDDの容量が足りなくなることはないですが、保存しておく録画ファイルが増えるとHDD/NASを圧迫してシステムが機能しなくなるため、保存するファイルのトータルサイズとHDD/NASの空き容量は、適宜チェックしておく必要があります。
参考になれば幸いです。