Uploading media to Gemini
This notebook processes an experiment file, uploading each media element with the Files API and associating it with the ID of the uploaded file.
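Each line of the experiment file is a JSON object whose `prompt` field (when it is a list) holds messages with `parts`; any part that is a dict containing a `"media"` key names a file under the media directory. A minimal sketch of one such record, assuming a `"text"` key for text parts (the text schema is not inspected by this notebook):

In [ ]:
import json

# Hypothetical record illustrating the shape this notebook expects;
# only the "media" key of each part is actually inspected below.
record = {
    "prompt": [
        {
            "parts": [
                {"text": "Describe this image."},
                {"media": "example.png"},
            ]
        }
    ]
}
print(json.dumps(record))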
In [ ]:
import os
import json
import time
import tqdm
import base64
import hashlib
import google.generativeai as genai
In [ ]:
# Set the location of the experiment and media
experiment_location = "data/input"
filename = "gemini-multimodal-example.jsonl"
media_location = "data/media"
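Before running the pipeline, a small sanity check (illustrative, not required) can confirm both locations exist:

In [ ]:
# Optional check (illustrative): confirm the experiment file and media
# directory exist before processing.
assert os.path.isdir(media_location), f"missing media dir: {media_location}"
assert os.path.isfile(f"{experiment_location}/{filename}")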
In [ ]:
# Load the GEMINI_API_KEY from the environment
GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY")
if GEMINI_API_KEY is None:
    raise ValueError("GEMINI_API_KEY is not set")
genai.configure(api_key=GEMINI_API_KEY)
In [ ]:
def compute_sha256_base64(file_path, chunk_size=8192):
    """
    Compute the SHA256 hash of the file at 'file_path' and return it as a
    base64-encoded string.
    """
    hasher = hashlib.sha256()
    with open(file_path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            hasher.update(chunk)
    return base64.b64encode(hasher.digest()).decode("utf-8")


def remote_file_hash_base64(remote_file):
    """
    Convert a remote file's SHA256 hash (stored as a hex-encoded UTF-8 bytes
    object) to a base64-encoded string, matching compute_sha256_base64's output.
    """
    hex_str = remote_file.sha256_hash.decode("utf-8")
    raw_bytes = bytes.fromhex(hex_str)
    return base64.b64encode(raw_bytes).decode("utf-8")


def wait_for_processing(file_obj, poll_interval=10):
    """
    Poll until the file is no longer in the 'PROCESSING' state.
    Returns the updated file object.
    """
    while file_obj.state.name == "PROCESSING":
        print("Waiting for file to be processed...")
        time.sleep(poll_interval)
        file_obj = genai.get_file(file_obj.name)
    return file_obj


def upload(file_path, already_uploaded_files):
    """
    Upload the file at 'file_path' if it hasn't been uploaded yet.
    If a file with the same SHA256 (base64-encoded) hash already exists,
    return its name without re-uploading. Otherwise upload the file, wait
    for it to be processed, and record it in 'already_uploaded_files'.
    Returns a (file name, updated mapping) tuple; raises a ValueError if
    processing fails.
    """
    local_hash = compute_sha256_base64(file_path)
    if local_hash in already_uploaded_files:
        return already_uploaded_files[local_hash], already_uploaded_files
    # Upload the file if it hasn't been found.
    file_obj = genai.upload_file(path=file_path)
    file_obj = wait_for_processing(file_obj)
    if file_obj.state.name == "FAILED":
        raise ValueError("File processing failed")
    already_uploaded_files[local_hash] = file_obj.name
    return already_uploaded_files[local_hash], already_uploaded_files
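The deduplication hinges on the local base64 SHA256 matching the converted remote hash for the same bytes. A quick sanity check, assuming at least one file has already been uploaded (`some_local_file` is a hypothetical path, purely illustrative):

In [ ]:
# Illustrative check: a freshly computed local hash should appear among the
# converted remote hashes if this exact file was uploaded before.
some_local_file = f"{media_location}/example.png"  # hypothetical path
local = compute_sha256_base64(some_local_file)
remotes = {remote_file_hash_base64(rf) for rf in genai.list_files()}
print("already uploaded:", local in remotes)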
In [ ]:
# Retrieve already uploaded files
uploaded_files = {
    remote_file_hash_base64(remote_file): remote_file.name
    for remote_file in genai.list_files()
}
print(f"Found {len(uploaded_files)} files already uploaded")
In [ ]:
files_to_upload = set()
experiment_path = f"{experiment_location}/{filename}"

# Read and collect media file paths
with open(experiment_path, "r") as f:
    lines = f.readlines()

data_list = []
for line in lines:
    data = json.loads(line)
    data_list.append(data)
    if not isinstance(data.get("prompt"), list):
        continue
    files_to_upload.update(
        f'{media_location}/{part["media"]}'
        for prompt in data["prompt"]
        for part in prompt.get("parts", [])
        if isinstance(part, dict) and "media" in part
    )

# Upload files and store mappings
genai_files = {}
for file_path in tqdm.tqdm(files_to_upload):
    uploaded_filename, uploaded_files = upload(file_path, uploaded_files)
    genai_files[file_path] = uploaded_filename

# Modify data to include uploaded filenames
for data in data_list:
    if isinstance(data.get("prompt"), list):
        for prompt in data["prompt"]:
            for part in prompt.get("parts", []):
                if isinstance(part, dict) and "media" in part:
                    file_path = f'{media_location}/{part["media"]}'
                    if file_path in genai_files:
                        part["uploaded_filename"] = genai_files[file_path]
                    else:
                        print(f"Failed to find {file_path} in genai_files")

# Write modified data back to the JSONL file
with open(experiment_path, "w") as f:
    for data in data_list:
        f.write(json.dumps(data) + "\n")
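With `uploaded_filename` recorded in each media part, a downstream step can resolve the uploaded file by name and pass it to a model. A minimal sketch, assuming a `gemini-1.5-flash` model and that the first record's last part is a media part (both choices are illustrative, not part of this pipeline):

In [ ]:
# Illustrative follow-up: resolve an uploaded file and use it in a request.
model = genai.GenerativeModel("gemini-1.5-flash")  # assumed model name
first_part = data_list[0]["prompt"][0]["parts"][-1]  # hypothetical media part
file_obj = genai.get_file(first_part["uploaded_filename"])
response = model.generate_content([file_obj, "Describe this media."])
print(response.text)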