From the “my blog is actually my code backup store” department: this is a simple function I run on Google Cloud that accepts a base64-encoded zip file, then unzips its contents into a Google Cloud Storage bucket.
import functions_framework
import base64
from google.cloud import storage
import zipfile
import os
import datetime
import pytz

@functions_framework.http
def hello_http(request):
    # Pull out the base64-encoded data and decode it.
    request_bytes = base64.b64decode(request.get_data())
    print(request.method)
    print(request.content_length)
    print("request load: " + str(len(request_bytes)))

    # Generate a working-directory prefix from the current Chicago time,
    # truncated to seconds, with colons swapped out (e.g. 2023-01-15T09-30-00).
    datetime_string = datetime.datetime.now(pytz.timezone("US/Central")).isoformat()
    print("Current Chicago Date-Time: %s" % (datetime_string))
    directory_prefix = datetime_string[:19].replace(":", "-")

    # Dump the original zip to GCS for safekeeping.
    # Note: GCS object names should not start with a slash.
    gcs_client = storage.Client()
    gcs_bucket = gcs_client.get_bucket("bucket name goes here")
    file_blob = storage.Blob("zipped/" + directory_prefix + ".zip", gcs_bucket)
    file_blob.upload_from_string(request_bytes, content_type="application/zip", client=gcs_client)

    # Dump the zip to the function's temporary directory; the with block
    # flushes and closes the file on exit.
    with open("/tmp/" + directory_prefix + ".zip", "wb") as zip_out:
        zip_out.write(request_bytes)

    # Extract the zipfile (extractall creates the target directories).
    with zipfile.ZipFile("/tmp/" + directory_prefix + ".zip") as zip_file:
        zip_file.extractall("/tmp/local/unzip/" + directory_prefix + "/")

    # Upload each extracted file; this assumes a flat zip with no subdirectories.
    for file_name in os.listdir("/tmp/local/unzip/" + directory_prefix + "/"):
        print(str(file_name))
        file_blob = storage.Blob("open/" + directory_prefix + "/" + file_name, gcs_bucket)
        file_blob.upload_from_filename("/tmp/local/unzip/" + directory_prefix + "/" + file_name, client=gcs_client)

    print("end")
    return str(len(request_bytes))
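To invoke it, POST the base64-encoded zip as the raw request body. Here's a minimal client sketch; the function URL and archive name are placeholders, so substitute your own deployed trigger URL:

import base64
import requests

# Hypothetical URL; replace with your function's actual trigger URL.
FUNCTION_URL = "https://us-central1-my-project.cloudfunctions.net/hello_http"

# Read the zip and base64-encode it, since the function b64-decodes the body.
with open("archive.zip", "rb") as f:
    encoded = base64.b64encode(f.read())

response = requests.post(FUNCTION_URL, data=encoded)
print(response.status_code, response.text)  # body is the decoded byte count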
You may want to alter the date reference (the US/Central timezone), but otherwise it’s a small and efficient tool for moving data somewhere I can easily find it later by date.
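If you’d rather change the timezone and drop the pytz dependency at the same time, the standard library’s zoneinfo module (Python 3.9+) produces the same prefix:

import datetime
from zoneinfo import ZoneInfo

# Same prefix logic as above, using the standard library instead of pytz.
datetime_string = datetime.datetime.now(ZoneInfo("US/Central")).isoformat()
directory_prefix = datetime_string[:19].replace(":", "-")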