aboutsummaryrefslogtreecommitdiff
path: root/examples/sound/spustream/interleave.py
blob: 4f4f20fbaf34871d9ac5300a53046a7b749d680a (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
#!/usr/bin/env python3
# Simple .VAG interleaving tool
# (C) 2021-2022 spicyjpeg - MPL licensed

import os, sys
from warnings  import warn
from struct    import Struct
from itertools import zip_longest
from argparse  import ArgumentParser, FileType

VAG_HEADER  = Struct("> 4s 4I 10x H 16s")
VAG_MAGIC   = b"VAGp"
VAGI_MAGIC  = b"VAGi"
VAG_VERSION = 0x20
BUFFER_SIZE = 0x1000
CHUNK_ALIGN = 0x800

## Helpers

def swap_endian(value, size):
	return int.from_bytes(value.to_bytes(size, "big"), "little")

def align(data, size):
	chunks = (len(data) + size - 1) // size

	return data.ljust(chunks * size, b"\x00")

def get_loop_offset(data):
	for index, flag in enumerate(data[1::16]):
		if flag & 0x01:
			return index * 16

	return len(data) - 16

## .VAG file reader

class VAGReader:
	def __init__(self, _file):
		self.file = _file
		header    = _file.read(VAG_HEADER.size)

		(
			magic, _, _,
			self.size,
			self.sample_rate,
			_, _
		) = VAG_HEADER.unpack(header)

		if magic == VAGI_MAGIC:
			raise RuntimeError(f"{_file.name} is an interleaved .VAG file (must be mono)")
		if magic != VAG_MAGIC:
			raise RuntimeError(f"{_file.name} is not a valid .VAG file")

	def read(self, chunk_size):
		for _ in range(0, self.size, chunk_size):
			chunk = self.file.read(chunk_size)

			if len(chunk) < 16:
				break
			if len(chunk) % 16:
				warn(RuntimeWarning(f"{self.file.name} is not 16-byte aligned, trimming"))
				chunk = chunk[0:(len(chunk) // 16) * 16]

			# If there already is an end flag in the chunk replace it with a
			# loop flag, otherwise add a new loop flag at the end.
			end   = get_loop_offset(chunk)
			chunk = bytearray(chunk)

			chunk[end + 1] = 0x03 # Jump to loop point + sustain
			yield chunk.ljust(chunk_size, b"\x00")

## Main

def get_args():
	parser = ArgumentParser(
		description = "Generates interleaved audio stream data from one or more .VAG files."
	)
	parser.add_argument(
		"input_file",
		nargs = "+",
		type  = FileType("rb"),
		help  = "mono input files for each channel in .VAG format"
	)
	parser.add_argument(
		"output_file",
		type = FileType("wb"),
		help = "where to output converted stream data"
	)
	parser.add_argument(
		"-b", "--buffer-size",
		type    = int,
		default = BUFFER_SIZE,
		help    = f"size of each channel buffer in each chunk (default {BUFFER_SIZE})",
		metavar = "bytes"
	)
	parser.add_argument(
		"-a", "--align",
		type    = int,
		default = CHUNK_ALIGN,
		help    = f"pad each chunk to a multiple of the given size (default {CHUNK_ALIGN})",
		metavar = "bytes"
	)
	parser.add_argument(
		"-r", "--raw",
		action = "store_true",
		help   = "do not add an interleaved .VAG header to the output file"
	)

	return parser.parse_args()

def main():
	args = get_args()
	if args.buffer_size % 16:
		raise ValueError("buffer size must be a multiple of 16 bytes")
	if args.buffer_size % args.align:
		warn(RuntimeWarning(f"buffer size should be a multiple of {args.align}"))

	input_files = tuple(map(VAGReader, args.input_file))
	size        = input_files[0].size
	sample_rate = input_files[0].sample_rate

	for vag in input_files[1:]:
		if vag.size != size:
			warn(RuntimeWarning(f"{vag.file.name} has a different file size"))
		if vag.sample_rate != sample_rate:
			warn(RuntimeWarning(f"{vag.file.name} has a different sample rate"))

	interleave = zip_longest(
		*( vag.read(args.buffer_size) for vag in input_files ),
		fillvalue = b"\x00" * args.buffer_size
	)

	with args.output_file as _file:
		if not args.raw:
			header = VAG_HEADER.pack(
				VAGI_MAGIC,
				VAG_VERSION,
				swap_endian(args.buffer_size, 4),
				size,
				sample_rate,
				swap_endian(len(input_files), 2),
				os.path.basename(_file.name).encode()[0:16]
			)

			_file.write(align(header, args.align))

		for chunks in interleave:
			data = b"".join(chunks)

			_file.write(align(data, args.align))

if __name__ == "__main__":
	main()