#!/usr/bin/env python3
"""Encode a black/white video into a compact run-length delta stream.

Pipeline: every video frame is downscaled to ``SIZE`` pixels, thresholded to
a boolean image, compared with the previous frame, and the resulting
"pixel changed" bitmap is run-length encoded.

Encoding of one run (see ``encode_frame`` / ``decode_frame``):

* short run, 1 byte:  ``v0nnnnnn``            -> value ``v``, length 1..63
* long run, 2 bytes:  ``v1nnnnnn nnnnnnnn``   -> value ``v``, length 64..16383
* a run of length 1 may be omitted entirely; the decoder notices that two
  consecutive stored runs carry the same value and re-inserts the single
  opposite pixel between them.

A frame has no explicit terminator: it ends once all pixels of the frame
have been produced.
"""
from typing import Iterable

import av
import numpy as np
from glm import ivec2
from PIL import Image  # noqa: F401  (kept: other parts of the project may rely on it)

# Frame size in pixels as (x, y), 4:3 aspect ratio.
# NOTE(review): relies on ivec2 division yielding an integer vector -- confirm.
# Alternative, lower resolution: ivec2(4, 3) * 64 / 4
SIZE = ivec2(4, 3) * 64 / 3
LENGTH = SIZE.x * SIZE.y  # number of pixels in one frame
RENDER_SCALE = 13  # window pixels per frame pixel in render_frames()

_SHORT_RUN_MAX = 0x3F  # longest run that fits the 1-byte form (6 bits)
_MAX_RUN = 0x3FFF  # longest run that fits the 2-byte form (14 bits)


def read_mp4(path="bad_apple.mp4") -> Iterable[np.ndarray]:
    """Yield the frames of the video at *path* as boolean arrays.

    Each frame is resized to ``SIZE``, reduced to its first colour channel,
    transposed, and thresholded (``True`` where the channel value is > 128).
    The container is closed when the generator finishes or is closed.
    """
    with av.open(path) as container:
        for frame_handler in container.decode(video=0):
            image = frame_handler.to_image().resize(SIZE)  # downscale
            yield np.array(image)[:, :, 0].T > 128


def encode_frames(frames) -> Iterable[bytes]:
    """Yield the encoded bytes of every frame in *frames*.

    Each frame is encoded as a delta against the previous one; the frame
    before the first is assumed to be all black (all ``False``). A running
    compression statistic is printed on a single, continuously overwritten
    terminal line.
    """
    no_compression_cumulative, compression_cumulative = 0, 0
    frame_prev = np.zeros(SIZE, dtype=bool)  # assume black frame at start
    for n, frame in enumerate(frames):
        data = encode_frame(frame, frame_prev)
        # Copy: the producer may hand out the same buffer for every frame,
        # in which case keeping a reference would make every delta empty.
        frame_prev = np.array(frame)
        yield data
        no_compression_cumulative += LENGTH // 8
        compression_cumulative += len(data)
        print("frame",
              f"{n:>5}| len:",
              f"{len(data):>5} =",
              f"{compression_cumulative:>8} /",
              f"{no_compression_cumulative} =>",
              f"{compression_cumulative / no_compression_cumulative * 100:>10.3}% ratio",
              end="\r")
    print()


def _run_lengths(flat):
    """Return ``[(length, value), ...]`` for the consecutive runs in *flat*."""
    flat = np.asarray(flat)
    if flat.size == 0:
        return []
    # A run ends wherever the next element differs, and at the last element.
    is_run_end = np.concatenate((flat[:-1] != flat[1:], [True]))
    ends = np.flatnonzero(is_run_end)
    lengths = np.diff(ends, prepend=-1)
    # tolist() converts to plain Python ints/bools for the bit arithmetic.
    return list(zip(lengths.tolist(), flat[ends].tolist()))


def encode_frame(frame, frame_prev) -> bytes:
    """Encode *frame* as run-length data relative to *frame_prev*.

    The two frames are compared element-wise; the flattened "changed" bitmap
    is split into runs and each run is written in the 1- or 2-byte form
    described in the module docstring.

    Raises:
        ValueError: if a run is longer than the format can represent.
    """
    changed = (np.asarray(frame) != np.asarray(frame_prev)).reshape(-1)
    out = bytearray()
    prev_skipped = True  # never skip the first run: the decoder needs its value
    n_pixels_left = changed.size
    for n_values, set_value in _run_lengths(changed):
        if n_values > _MAX_RUN:
            raise ValueError(
                f"run of {n_values} pixels exceeds the format limit of {_MAX_RUN}")
        n_pixels_left -= n_values
        # Runs of length 1 can be inferred by the decoder, but never skip two
        # runs in a row and never skip the last run of the frame.
        if n_values == 1 and not prev_skipped and n_pixels_left:
            prev_skipped = True
            continue
        prev_skipped = False
        if n_values > _SHORT_RUN_MAX:
            out.append((set_value << 7) | (1 << 6) | (n_values >> 8))
            out.append(n_values & 0xFF)
        else:
            out.append((set_value << 7) | n_values)
    return bytes(out)


def decode_frame(target_buffer, data, data_offset=0) -> int:
    """Decode one frame from *data* and XOR it into *target_buffer* in place.

    Args:
        target_buffer: boolean array holding the previously decoded frame;
            it is updated to the new frame.
        data: indexable sequence of byte values (e.g. ``bytes``).
        data_offset: index in *data* where the frame starts.

    Returns:
        The index in *data* just past the decoded frame.

    Raises:
        IndexError: if *data* ends before the frame is complete.
        ValueError: if the runs describe more pixels than the frame holds.
    """
    def read_byte():
        nonlocal data_offset
        try:
            out = data[data_offset]
        except IndexError:
            print(f"\n\n{data_offset=} {len(data)=}")
            raise
        data_offset += 1
        return out

    length = target_buffer.size
    buffer = np.zeros(length, dtype=bool)
    buffer_offset = 0
    prev_value = None
    while buffer_offset < length:
        byte = read_byte()
        value = byte >> 7
        if byte & 0x40:
            n_values = ((byte & 0x3F) << 8) | read_byte()
        else:
            n_values = byte & 0x3F
        if prev_value == value:  # there was an omitted n_values=1 in between
            buffer[buffer_offset] = not value
            buffer_offset += 1
        buffer[buffer_offset:buffer_offset + n_values] = value
        buffer_offset += n_values
        prev_value = value
    if buffer_offset != length:
        # Slice assignment clamps silently, so an overshoot must be detected
        # explicitly instead of reading on until the data runs out.
        raise ValueError(
            f"corrupt frame data: decoded {buffer_offset} pixels, expected {length}")
    target_buffer ^= buffer.reshape(target_buffer.shape)
    return data_offset


def render_frames(frames, fps=None, *, highlight_unchanged_rows=False):
    """Show *frames* in a pygame window scaled by ``RENDER_SCALE``.

    Playback stops when the frames are exhausted, the window is closed, or
    ``q`` is pressed. pygame is shut down in every case, including errors.

    Args:
        frames: iterable of boolean arrays of shape ``SIZE``.
        fps: if truthy, limit playback to this many frames per second.
        highlight_unchanged_rows: debug aid; tint every row (fixed y) that is
            identical to the same row of the previous frame.
    """
    import pygame

    pygame.init()
    try:
        window_size = SIZE * RENDER_SCALE
        display = pygame.display.set_mode(window_size)
        clock = pygame.time.Clock()
        frame_prev = None
        for frame in frames:
            for event in pygame.event.get():
                if event.type == pygame.QUIT:
                    return
                if event.type == pygame.KEYDOWN and event.key == pygame.K_q:
                    return

            # render frame: white where the frame is True, black elsewhere
            draw_buffer = np.zeros((*SIZE, 3), dtype=np.uint8)
            draw_buffer[frame, :] = 0xFF
            if highlight_unchanged_rows:
                if frame_prev is not None:
                    unchanged = ~(frame != frame_prev).any(axis=0)
                    draw_buffer[:, unchanged, 1] = 0
                    draw_buffer[:, unchanged, 2] = 0xFF
                # Copy: the producer may reuse one buffer for every frame.
                frame_prev = frame.copy()

            surf = pygame.surfarray.make_surface(draw_buffer)
            display.blit(pygame.transform.scale(surf, window_size), (0, 0))
            pygame.display.update()
            if fps:
                clock.tick(fps)
    finally:
        pygame.quit()


def main():
    """Encode ``bad_apple.mp4`` into ``bad_apple.bin``."""
    def encode_then_decode(frames):
        """Round-trip *frames* through the codec, yielding the decoded buffer."""
        decode_buffer = np.zeros(SIZE, dtype=bool)
        for data in encode_frames(frames):
            offset = decode_frame(decode_buffer, data)
            assert len(data) == offset
            yield decode_buffer

    def decoder(encoded_data, reuse=True):
        """Yield every frame in *encoded_data*.

        With ``reuse`` the same (mutated) buffer is yielded each time;
        otherwise every frame is an independent copy.
        """
        data_offset = 0
        decode_buffer = np.zeros(SIZE, dtype=bool)
        while data_offset < len(encoded_data):
            data_offset = decode_frame(decode_buffer, encoded_data, data_offset)
            yield decode_buffer if reuse else decode_buffer.copy()

    print(f"{SIZE = }")
    print(f"{LENGTH = }")
    print(f"{RENDER_SCALE = }")

    # Alternative modes, kept for manual experiments:
    #render_frames(read_mp4())
    #render_frames(encode_then_decode(read_mp4()))
    #with open("bad_apple.bak.bin", "rb") as f: render_frames(decoder(f.read()) )#, fps = 30)
    #with open("bad_apple.bak.bin", "rb") as f: list(encode_frames(decoder(f.read(), reuse=0)))
    #with open("bad_apple.bak.bin", "rb") as f: list(encode_then_decode(decoder(f.read(), reuse=0)))
    with open("bad_apple.bin", "wb") as f:
        # Stream frame by frame instead of joining everything in memory.
        f.writelines(encode_frames(read_mp4()))


if __name__ == "__main__":
    main()