187 lines
5.8 KiB
Python
Executable File
187 lines
5.8 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
from glm import ivec2
|
|
from PIL import Image
|
|
from typing import Iterable
|
|
import av
|
|
import numpy as np
|
|
|
|
SIZE = ivec2(4, 3) * 64 / 4
|
|
SIZE = ivec2(4, 3) * 64 / 3
|
|
LENGTH = SIZE.x * SIZE.y
|
|
RENDER_SCALE = 13
|
|
|
|
|
|
def read_mp4() -> Iterable[np.array]:
|
|
container = av.open("bad_apple.mp4")
|
|
for frame_handler in container.decode(video=0):
|
|
# convert frame to bool array
|
|
image = frame_handler.to_image()
|
|
image = image.resize(SIZE) # downscale
|
|
frame = np.array(image)[:,:,0].T > 128
|
|
yield frame
|
|
|
|
def encode_frames(frames) -> Iterable[bytes]:
|
|
no_compression_cumulative, compression_cumulative = 0, 0
|
|
|
|
frame_prev = np.zeros((SIZE), dtype=bool) # assume black frame at start
|
|
for n, frame in enumerate(frames):
|
|
data = encode_frame(frame, frame_prev)
|
|
yield data
|
|
frame_prev = frame
|
|
|
|
no_compression_cumulative += LENGTH // 8
|
|
compression_cumulative += len(data)
|
|
print("frame",
|
|
f"{n:>5}| len:",
|
|
f"{len(data):>5} =",
|
|
f"{compression_cumulative:>8} /",
|
|
f"{no_compression_cumulative} =>",
|
|
f"{compression_cumulative / no_compression_cumulative * 100:>10.3}% ratio",
|
|
end="\r")
|
|
print()
|
|
|
|
def encode_frame(frame, frame_prev) -> bytes:
|
|
def group(frame_data):
|
|
mask = frame_data[:-1] != frame_data[1:]
|
|
mask = np.concatenate((mask, [True]))
|
|
sizes = np.arange(LENGTH)
|
|
sizes = sizes[mask]
|
|
sizes[1:] -= sizes[:-1]
|
|
sizes[0] += 1
|
|
return zip(sizes, frame_data[mask])
|
|
def group_(frame_data): # slow, but readable
|
|
n = 1
|
|
for a, b in zip(frame_data[:-1], frame_data[1:]):
|
|
if a != b:
|
|
yield n, a
|
|
n = 1
|
|
else:
|
|
n += 1
|
|
yield n, b
|
|
|
|
out = [] # bytearray() ?
|
|
write_byte = out.append
|
|
|
|
prev_skipped = True # don't skip the first one
|
|
n_pixels_left = LENGTH
|
|
|
|
#for n_values, set_value in group((frame != frame_prev).reshape(-1).T):
|
|
for n_values, set_value in group((frame != frame_prev).reshape(-1)):
|
|
assert n_values <= 0x3fff
|
|
|
|
## skip n_values == 1, they can be inferred
|
|
n_pixels_left -= n_values # avoid skipping the last one
|
|
if n_values == 1 and not prev_skipped and n_pixels_left:
|
|
prev_skipped = True # avoid skippin multiple times in a row
|
|
continue
|
|
prev_skipped = False
|
|
|
|
if n_values > 0x3f:
|
|
write_byte( (set_value << 7) | (1 << 6) | (n_values >> 8) )
|
|
write_byte( n_values & 0xff )
|
|
else:
|
|
write_byte( (set_value << 7) | (n_values & 0x3f) )
|
|
|
|
#print(bytes(out))
|
|
return bytes(out)
|
|
|
|
def decode_frame(target_buffer, data, data_offset=0) -> int:
|
|
def read_byte():
|
|
nonlocal data_offset
|
|
try:
|
|
out = data[data_offset]
|
|
except:
|
|
print(f"\n\n{data_offset=} {len(data)=}")
|
|
raise
|
|
data_offset += 1
|
|
return out
|
|
|
|
buffer = np.zeros(LENGTH, dtype=bool)
|
|
buffer_offset = 0
|
|
|
|
prev_value = None
|
|
while buffer_offset != LENGTH:
|
|
|
|
byte = read_byte()
|
|
value = byte >> 7
|
|
if byte & 0x40:
|
|
n_values = ((byte & 0x3f) << 8) | read_byte()
|
|
else:
|
|
n_values = (byte & 0x3f)
|
|
|
|
if prev_value == value: # there was an omitted n_values=1 in between
|
|
buffer[buffer_offset] = not value
|
|
buffer_offset += 1
|
|
|
|
buffer[buffer_offset:buffer_offset + n_values] = value
|
|
buffer_offset += n_values
|
|
prev_value = value
|
|
|
|
#target_buffer ^= buffer.reshape((SIZE.y, SIZE.x)).T
|
|
target_buffer ^= buffer.reshape(SIZE)
|
|
|
|
return data_offset
|
|
|
|
def render_frames(frames, fps = None):
|
|
import pygame
|
|
pygame.init()
|
|
display = pygame.display.set_mode(SIZE * RENDER_SCALE)
|
|
clock = pygame.time.Clock()
|
|
|
|
ignored_rows = 0
|
|
for frame in frames:
|
|
for event in pygame.event.get():
|
|
if event.type == pygame.KEYDOWN and event.key == pygame.K_q:
|
|
pygame.quit(); return
|
|
if event.type == pygame.QUIT:
|
|
pygame.quit(); return
|
|
|
|
# render frame
|
|
draw_buffer = np.zeros((*SIZE, 3), dtype=np.uint8)
|
|
draw_buffer[frame,:] = 0xff
|
|
|
|
if 0: # updated line vizualisation
|
|
for y in range(SIZE.y):
|
|
if not (frame[:, y] != frame_prev[:, y]).any():
|
|
draw_buffer[:, y, 1] = 0
|
|
draw_buffer[:, y, 2] = 0xff
|
|
ignored_rows += 1
|
|
|
|
surf = pygame.surfarray.make_surface(draw_buffer)
|
|
display.blit(pygame.transform.scale(surf, SIZE*RENDER_SCALE), (0, 0))
|
|
|
|
pygame.display.update()
|
|
if fps: clock.tick(fps)
|
|
|
|
pygame.quit()
|
|
|
|
|
|
def main():
|
|
def encode_then_decode(frames):
|
|
decode_buffer = np.zeros(SIZE, dtype=bool)
|
|
for data in encode_frames(frames):
|
|
offset = decode_frame(decode_buffer, data)
|
|
assert len(data) == offset
|
|
yield decode_buffer
|
|
|
|
def decoder(encoded_data, reuse=True):
|
|
data_offset = 0
|
|
decode_buffer = np.zeros(SIZE, dtype=bool)
|
|
while data_offset < len(encoded_data):
|
|
data_offset = decode_frame(decode_buffer, encoded_data, data_offset)
|
|
yield decode_buffer if reuse else decode_buffer.copy()
|
|
|
|
print(f"{SIZE = }")
|
|
print(f"{LENGTH = }")
|
|
print(f"{RENDER_SCALE = }")
|
|
|
|
#render_frames(read_mp4())
|
|
#render_frames(encode_then_decode(read_mp4()))
|
|
with open("bad_apple.bin", "wb") as f: f.write(b''.join(encode_frames(read_mp4())))
|
|
|
|
#with open("bad_apple.bak.bin", "rb") as f: render_frames(decoder(f.read()) )#, fps = 30)
|
|
#with open("bad_apple.bak.bin", "rb") as f: list(encode_frames(decoder(f.read(), reuse=0)))
|
|
#with open("bad_apple.bak.bin", "rb") as f: list(encode_then_decode(decoder(f.read(), reuse=0)))
|
|
|
|
main()
|