Initial commit - split up code

master
Joshua Rubingh 2 years ago
parent 71718c3ced
commit 2849812834

@ -0,0 +1,10 @@
.git
.vscode
venv
log/*
__pycache__
**/__pycache__
**/.env
**/db.sqlite3
docker-compose.yaml

@ -0,0 +1,100 @@
---
kind: pipeline
name: notify-pipeline-start
type: kubernetes
steps:
- name: slack
image: plugins/slack
settings:
webhook:
from_secret: SLACK_WEBHOOK
link_names: true
template: >
{{#if build.pull }}
*Build started*: {{ repo.owner }}/{{ repo.name }} - <https://git.webhosting.rug.nl/{{ repo.owner }}/{{ repo.name }}/pull/{{ build.pull }}|Pull Request #{{ build.pull }}>
{{else}}
*Build started: {{ repo.owner }}/{{ repo.name }} - Build #{{ build.number }}* (type: `{{ build.event }}`)
{{/if}}
Commit: <https://git.webhosting.rug.nl/{{ repo.owner }}/{{ repo.name }}/commit/{{ build.commit }}|{{ truncate build.commit 8 }}>
Branch: <https://git.webhosting.rug.nl/{{ repo.owner }}/{{ repo.name }}/commits/{{ build.branch }}|{{ build.branch }}>
Author: {{ build.author }}
<{{ build.link }}|Visit build page ↗>
---
kind: pipeline
name: build-docker-image
type: kubernetes
steps:
- name: build-docker-image-branch
image: plugins/docker
settings:
cache_from:
- ${DRONE_REPO,,}:${DRONE_SOURCE_BRANCH/\//-}
username:
from_secret: rug_docker_repo_user
password:
from_secret: rug_docker_repo_password
repo: registry.webhosting.rug.nl/${DRONE_REPO,,}
registry: registry.webhosting.rug.nl
dockerfile: docker/Dockerfile.tusd
tags:
- ${DRONE_SOURCE_BRANCH/\//-}
- ${DRONE_SOURCE_BRANCH/\//-}-${DRONE_COMMIT_SHA:0:8}
when:
event:
exclude:
- tag
- name: nginx-frontend-proxy
image: plugins/docker
settings:
cache_from:
- ${DRONE_REPO,,}:${DRONE_SOURCE_BRANCH/\//-}
username:
from_secret: rug_docker_repo_user
password:
from_secret: rug_docker_repo_password
repo: registry.webhosting.rug.nl/${DRONE_REPO,,}-ngx
registry: registry.webhosting.rug.nl
dockerfile: docker/Dockerfile.nginx
tags:
- ${DRONE_SOURCE_BRANCH/\//-}
- ${DRONE_SOURCE_BRANCH/\//-}-${DRONE_COMMIT_SHA:0:8}
when:
event:
exclude:
- tag
---
kind: pipeline
name: notify-pipeline-end
type: kubernetes
steps:
- name: slack
image: plugins/slack
settings:
webhook:
from_secret: SLACK_WEBHOOK
link_names: true
template: >
{{#if build.pull }}
*{{#success build.status}}✔{{ else }}✘{{/success}} {{ uppercasefirst build.status }}*: {{ repo.owner }}/{{ repo.name }} - <https://git.webhosting.rug.nl/{{ repo.owner }}/{{ repo.name }}/pull/{{ build.pull }}|Pull Request #{{ build.pull }}>
{{else}}
*{{#success build.status}}✔{{ else }}✘{{/success}} {{ uppercasefirst build.status }}: {{ repo.owner }}/{{ repo.name }} - Build #{{ build.number }}* (type: `{{ build.event }}`)
{{/if}}
Commit: <https://git.webhosting.rug.nl/{{ repo.owner }}/{{ repo.name }}/commit/{{ build.commit }}|{{ truncate build.commit 8 }}>
Branch: <https://git.webhosting.rug.nl/{{ repo.owner }}/{{ repo.name }}/commits/{{ build.branch }}|{{ build.branch }}>
Author: {{ build.author }}
Duration: {{ since build.created }}
<{{ build.link }}|Visit build page ↗>
depends_on:
- build-docker-image
trigger:
status:
- success
- failure

@ -0,0 +1,12 @@
# TUS Daemon settings
# Change the required variables below to your needs.
# Here you can also override the default variables of the startup.sh script.
# This is the full URL of the REST API server to post updates to during uploads.
WEBHOOK_URL=http://localhost:8000/api/v1/dropoffs/webhook/
# The key of the token created on the REST API server, used to authenticate against that server.
DROPOFF_API_HAWK_KEY=[ENTER_HAWK_KEY]
# The secret value that belongs to the token DROPOFF_API_HAWK_KEY.
DROPOFF_API_HAWK_SECRET=[ENTER_HAWK_SECRET]
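For reference, this is roughly how the hook scripts in this commit consume these settings (python-decouple reads the `.env` file, requests-hawk signs the webhook calls); the JSON payload below is only a placeholder:

```python
import requests
from decouple import config
from requests_hawk import HawkAuth

# Settings defined above; python-decouple falls back to environment variables.
WEBHOOK_URL = config('WEBHOOK_URL')
hawk_auth = HawkAuth(id=config('DROPOFF_API_HAWK_KEY'), key=config('DROPOFF_API_HAWK_SECRET'))

# Sign a webhook call with Hawk; the REST API rejects requests with an invalid key/secret pair.
response = requests.post(WEBHOOK_URL,
                         headers={'HOOK-NAME': 'pre-create', 'Content-Type': 'application/json'},
                         auth=hawk_auth,
                         json={'example': True})
print(response.status_code)
```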

.gitignore vendored

@ -0,0 +1,8 @@
.vscode
venv
log/*
__pycache__
**/__pycache__
**/.env
**/db.sqlite3

@ -0,0 +1,19 @@
Copyright (c) 2013-2017 Transloadit Ltd and Contributors
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
of the Software, and to permit persons to whom the Software is furnished to do
so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

@ -1,3 +1,42 @@
# Upload_Server
# tusd
The Upload Server Daemon (TUSD) with special webhooks
<img alt="Tus logo" src="https://github.com/tus/tus.io/blob/master/assets/img/tus1.png?raw=true" width="30%" align="right" />
> **tus** is a protocol based on HTTP for *resumable file uploads*. Resumable
> means that an upload can be interrupted at any moment and can be resumed without
> re-uploading the previous data again. An interruption may happen willingly, if
> the user wants to pause, or by accident in case of a network issue or server
> outage.
tusd is the official reference implementation of the [tus resumable upload
protocol](http://www.tus.io/protocols/resumable-upload.html). The protocol
specifies a flexible method to upload files to remote servers using HTTP.
The special feature is the ability to pause and resume uploads at any
moment, allowing uploads to continue seamlessly after e.g. network interruptions.
It is capable of accepting uploads with arbitrary sizes and storing them locally
on disk, on Google Cloud Storage or on AWS S3 (or any other S3-compatible
storage system). Due to its modularization and extensibility, support for
nearly any other cloud provider could easily be added to tusd.
**Protocol version:** 1.0.0
This branch contains tusd v1. If you are looking for the previous major release, after which
breaking changes have been introduced, please look at the [0.14.0 tag](https://github.com/tus/tusd/tree/0.14.0).
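For a feel of the protocol, here is a rough sketch of a single small upload done directly with `requests`; the endpoint URL is an assumption (any running tusd instance will do), and a real client should rather use one of the official tus client libraries:

```python
import base64
import requests

ENDPOINT = 'http://localhost:1080/files/'  # assumed tusd endpoint, e.g. behind the nginx proxy
data = b'hello tus'

# 1. Create the upload. Metadata values are base64 encoded, per the tus 1.0.0 creation extension.
metadata = 'filename ' + base64.b64encode(b'hello.txt').decode()
create = requests.post(ENDPOINT, headers={
    'Tus-Resumable': '1.0.0',
    'Upload-Length': str(len(data)),
    'Upload-Metadata': metadata,
})
upload_url = create.headers['Location']  # may be relative to ENDPOINT depending on configuration

# 2. Send the bytes. After an interruption, a HEAD request returns the current offset
#    and this PATCH can simply be repeated from there.
patch = requests.patch(upload_url, data=data, headers={
    'Tus-Resumable': '1.0.0',
    'Upload-Offset': '0',
    'Content-Type': 'application/offset+octet-stream',
})
print(patch.headers['Upload-Offset'])  # equals the upload length once the file is complete
```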
## Documentation
* [Installation](/docs/installation.md)
* [Using the `tusd` binary](/docs/usage-binary.md)
* [Monitoring the server](/docs/monitoring.md)
* [Receiving events with hooks](/docs/hooks.md)
* [Using the tusd package programmatically](/docs/usage-package.md)
* [FAQ & Common issues](/docs/faq.md)
## Build status
![Build Status](https://github.com/tus/tusd/workflows/Test/badge.svg)
## License
This project is licensed under the MIT license, see `LICENSE.txt`.

@ -0,0 +1,31 @@
version: '3'
services:
upload-server:
container_name: upload-server
env_file:
- docker/project.env
# This feels like bad practice....
build:
context: ./
dockerfile: ./docker/Dockerfile.tusd
# command: gunicorn demo_portal.wsgi:application --bind 0.0.0.0:8000 --workers=4
command: ./startup.sh
volumes:
- uploads:/home/app/web/staticfiles
upload-server-nginx:
container_name: upload-server-nginx
env_file:
- docker/project.env
build:
context: ./
dockerfile: ./docker/Dockerfile.nginx
restart: always
ports:
- 1080:80
depends_on:
- upload-server
volumes:
uploads:

@ -0,0 +1,10 @@
FROM nginx:mainline
RUN sed -i 's@user nginx;@user nginx;\nload_module modules/ngx_http_js_module.so;\nload_module modules/ngx_stream_js_module.so;@' /etc/nginx/nginx.conf
COPY ./nginx/tus.vhost.conf /etc/nginx/conf.d/
COPY ./nginx/njs/dropoff_tus.js /etc/nginx/
RUN rm /etc/nginx/conf.d/default.conf
RUN sed -i 's@http://localhost:1080;@http://upload-server:1080;@g' /etc/nginx/conf.d/tus.vhost.conf

@ -0,0 +1,41 @@
# Use the official Python image from the Docker Hub
FROM python:3.8
# PYTHONUNBUFFERED makes Python output unbuffered; PYTHONDONTWRITEBYTECODE prevents __pycache__/ files.
ENV PYTHONUNBUFFERED 1
ENV PYTHONDONTWRITEBYTECODE 1
# Update packages and install encfs for data encryption
# This does not work in Docker/Kubernetes containers because FUSE filesystems cannot be used there
# RUN apt-get update && apt-get install -y encfs
#RUN modprobe fuse
# Create an app user in the app group.
RUN useradd --user-group --create-home --no-log-init --shell /bin/bash app
RUN groupadd fuse
RUN usermod app -a -G fuse
ENV APP_HOME=/home/app/web
# Change the workdir.
WORKDIR ${APP_HOME}
# Copy the code.
COPY ./tusd/. ${APP_HOME}
# Upgrade pip
RUN pip install --upgrade pip wheel
# Install the storage libraries
RUN pip install -r hooks/requirements.txt
RUN rm -Rf hooks/log
RUN mkdir -p hooks/log/
# fix file rights
RUN chown -R app:app $APP_HOME
# Run as user
USER app:app
ENTRYPOINT ["/home/app/web/startup.sh"]

@ -0,0 +1,12 @@
# TUS Daemon settings
# Change the required variables below to your needs.
# Here you can also override the default variables of the startup.sh script.
# This is the full URL of the REST API server to post updates to during uploads.
WEBHOOK_URL=http://localhost:8000/api/v1/dropoffs/webhook/
# The key of the token created on the REST API server, used to authenticate against that server.
DROPOFF_API_HAWK_KEY=[ENTER_HAWK_KEY]
# The secret value that belongs to the token DROPOFF_API_HAWK_KEY.
DROPOFF_API_HAWK_SECRET=[ENTER_HAWK_SECRET]

@ -0,0 +1,12 @@
# TUS Daemon settings
# Change the required variables below to your needs.
# Here you can also override the default variables of the startup.sh script.
# This is the full URL of the REST API server to post updates to during uploads.
WEBHOOK_URL=http://localhost:8000/api/v1/dropoffs/webhook/
# The key of the token created on the REST API server, used to authenticate against that server.
DROPOFF_API_HAWK_KEY=[ENTER_HAWK_KEY]
# The secret value that belongs to the token DROPOFF_API_HAWK_KEY.
DROPOFF_API_HAWK_SECRET=[ENTER_HAWK_SECRET]

@ -0,0 +1,83 @@
The JSON payload of a hook looks like the following:

```json
{
"Upload": {
"ID": "fae477007048c363be15cc0f6757eaf1",
"Size": 243487,
"SizeIsDeferred": false,
"Offset": 243487,
"MetaData": {
"code": "858859",
"filename": "provider_for_google_calendar-68.2.1-tb.xpi",
"filetype": "application/x-xpinstall",
"ip": "127.0.0.1",
"name": "provider_for_google_calendar-68.2.1-tb.xpi",
"relativePath": "null",
"study": "08211acd-f899-41da-a9cf-15a2db78bb03",
"type": "application/x-xpinstall"
},
"IsPartial": false,
"IsFinal": false,
"PartialUploads": null,
"Storage": {
"Path": "/opt/deploy/data_drop-off/upload_data/fae477007048c363be15cc0f6757eaf1",
"Type": "filestore"
}
},
"HTTPRequest": {
"Method": "PATCH",
"URI": "/files/fae477007048c363be15cc0f6757eaf1",
"RemoteAddr": "127.0.0.1:37722",
"Header": {
"Accept": [
"*/*"
],
"Accept-Encoding": [
"gzip, deflate"
],
"Accept-Language": [
"en-US,en;q=0.5"
],
"Connection": [
"upgrade"
],
"Content-Length": [
"243487"
],
"Content-Type": [
"application/offset+octet-stream"
],
"Dnt": [
"1"
],
"Origin": [
"http://localhost:8000"
],
"Referer": [
"http://localhost:8000/drop/08211acd-f899-41da-a9cf-15a2db78bb03"
],
"Tus-Resumable": [
"1.0.0"
],
"Upload-Offset": [
"0"
],
"User-Agent": [
"Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:73.0) Gecko/20100101 Firefox/73.0"
],
"X-Forwarded-For": [
"127.0.0.1"
],
"X-Forwarded-Host": [
"localhost"
],
"X-Forwarded-Proto": [
"http"
],
"X-Real-Ip": [
"127.0.0.1"
]
}
}
}
```
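Hooks receive this JSON on stdin. A minimal sketch of picking out the fields that this project actually uses (field names as in the payload above):

```python
import json
import sys

payload = json.load(sys.stdin)

upload_id = payload['Upload']['ID']
metadata = payload['Upload']['MetaData']
study = metadata.get('study')                            # study UUID set by the upload page
filename = metadata.get('filename')                      # original file name
stored_path = payload['Upload']['Storage'].get('Path')   # where tusd stored the bytes

print(f'Received {filename} for study {study} at {stored_path} (upload {upload_id})')
```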

@ -0,0 +1,46 @@
# In order to change the logging configuration, make a copy of this file and save it as 'logging.custom.ini'
# Then the logger will update the default logging values with your custom logging settings
[loggers]
keys=root
[handlers]
keys=consoleHandler,fileHandler,fileHandlerDebug,syslogHandler
[formatters]
keys=simpleFormatter
[logger_root]
level=DEBUG
# Add fileHandlerDebug to handlers to enable debug logging
# Add syslogHandler to handlers to enable syslog logging.
handlers=consoleHandler,fileHandler,fileHandlerDebug
[handler_consoleHandler]
class=StreamHandler
level=WARNING
formatter=simpleFormatter
args=(sys.stdout,)
[handler_fileHandler]
class=logging.handlers.TimedRotatingFileHandler
level=INFO
formatter=simpleFormatter
args=('log/hooks.log','midnight',1,30)
[handler_fileHandlerDebug]
class=logging.handlers.TimedRotatingFileHandler
level=DEBUG
formatter=simpleFormatter
args=('log/hooks.debug.log','midnight',1,30)
[handler_syslogHandler]
class=logging.handlers.SysLogHandler
level=INFO
formatter=simpleFormatter
# Use '/dev/log' for local syslog. For remote use ('hostip',portnr)
args=('/dev/log','local6')
[formatter_simpleFormatter]
format=%(asctime)s - %(levelname)-7s - %(module)-12s - %(message)s
datefmt=

@ -0,0 +1,46 @@
# In order to change the logging configuration, make a copy of this file and save it as 'logging.custom.ini'
# Then the logger will update the default logging values with your custom logging settings
[loggers]
keys=root
[handlers]
keys=consoleHandler,fileHandler,fileHandlerDebug,syslogHandler
[formatters]
keys=simpleFormatter
[logger_root]
level=DEBUG
# Add fileHandlerDebug to handlers to enable debug logging
# Add syslogHandler to handlers to enable syslog logging.
handlers=consoleHandler,fileHandler
[handler_consoleHandler]
class=StreamHandler
level=WARNING
formatter=simpleFormatter
args=(sys.stdout,)
[handler_fileHandler]
class=logging.handlers.TimedRotatingFileHandler
level=INFO
formatter=simpleFormatter
args=('log/hooks.log','midnight',1,30)
[handler_fileHandlerDebug]
class=logging.handlers.TimedRotatingFileHandler
level=DEBUG
formatter=simpleFormatter
args=('log/hooks.debug.log','midnight',1,30)
[handler_syslogHandler]
class=logging.handlers.SysLogHandler
level=INFO
formatter=simpleFormatter
# Use '/dev/log' for local syslog. For remote use ('hostip',portnr)
args=('/dev/log','local6')
[formatter_simpleFormatter]
format=%(asctime)s - %(levelname)-7s - %(module)-12s - %(message)s
datefmt=

@ -0,0 +1,8 @@
#!/bin/sh
id="$TUS_ID"
offset="$TUS_OFFSET"
size="$TUS_SIZE"
echo "Upload created with ID ${id} and size ${size}"
cat /dev/stdin | jq .

@ -0,0 +1 @@
post-finish.py

@ -0,0 +1,180 @@
#!/usr/bin/env python
import os
import logging
import logging.config
if os.path.isfile('logging.custom.ini'):
logging.config.fileConfig('logging.custom.ini')
else:
logging.config.fileConfig('logging.ini')
logger = logging.getLogger(__name__)
from datetime import datetime
start_time = datetime.now()
import sys
import json
import requests
from requests_hawk import HawkAuth
from decouple import config, Csv
from storage.storage import Storage
import storage.exceptions as StorageException
# What is the default cleaning policy:
# - None: File will be deleted when processed correctly. When there are errors, the file will remain here
# - True: File will ALWAYS be deleted
# - False: File will NEVER be deleted
AUTO_CLEAN = None
# !!!!!! NO CHANGES BELOW THIS LINE !!!!!!
# Tus webhook name
HTTP_HOOK_NAME = 'post-finish'
# Make sure you set the content-type to JSON. Otherwise the Hawk authentication could fail due to a missing content-type header
webhook_headers = {
'HOOK-NAME' : HTTP_HOOK_NAME,
'Content-Type' : 'application/json',
'cache-control' : 'no-cache'
}
# Django webserver with hook url path
WEBHOOK_URL = config('WEBHOOK_URL')
# Creating a Hawk Authentication headers
hawk_auth = HawkAuth(id=config('DROPOFF_API_HAWK_KEY'), key=config('DROPOFF_API_HAWK_SECRET'))
clean_setting = 'force clean' if AUTO_CLEAN else ('all uploads ok' if AUTO_CLEAN is None else 'force keep')
logger.debug(f'Start reading from STDIN for \'{HTTP_HOOK_NAME}\' hook with cleaning setting: {clean_setting}')
# Read stdin input data from the TUS daemon
data = ''.join(sys.stdin.readlines())
# Test if data is valid JSON... just to be sure...
# And we need some data from the JSON as well
study = None
try:
logger.debug('Start parsing STDIN to validate it as JSON data')
upload_data = json.loads(data)
upload_data['error'] = 0
study = upload_data['Upload']['MetaData'].get('study')
logger.debug('STDIN data is valid JSON data')
logger.info(f'Start processing data drop for study ID: \'{study}\'')
except json.decoder.JSONDecodeError as ex:
logger.exception(f'STDIN data is NOT valid JSON data. Will stop processing further. Exception message: {ex}')
# Send an exit code higher than 0 to stop the upload process on the Tus server
sys.exit(1)
uploaded_file = upload_data['Upload']['Storage'].get('Path',None)
if not os.path.exists(uploaded_file):
logger.error(f'Uploaded file for study ID: \'{study}\' is not available on disk! It has vanished from location: {uploaded_file}')
sys.exit(1)
# Here we could add some business logic like virus scanning, unzipping archives, or anonymising the data, etc...
# We know for sure that JSON input data is 'valid'. So we post to the webhook for further checking and actions
try:
# Create a webhook POST request with the needed headers and data.
logger.debug(f'Post hook data back to the API ({WEBHOOK_URL}) with headers: {webhook_headers}')
# An exception will be caught if there are network errors
webhook = requests.post(WEBHOOK_URL, headers=webhook_headers, auth=hawk_auth, json=upload_data)
# print('Webhook result')
# print(webhook.status_code)
# If the POST is OK and we get a 200 status back, the management system is aware of this upload and has stored the info in the database
if webhook.status_code != requests.codes.ok:
logger.error(f'The API did not give back a valid response code for study ID: \'{study}\'. Terminate with HTTP error code: {webhook.status_code}')
sys.exit(1)
# An exception will be caught if the response is not valid JSON
storage_data = webhook.json()
# print('Start processing data')
# print(storage_data)
# Keep track of upload statuses. Either they succeed, or fail
upload_actions = []
# Loop over all the storage actions that came from the API
for storage_action in storage_data['storages']:
# Get the original filename, so that the file can be renamed back to original when the upload is done.
original_file_name = upload_data['Upload']['MetaData'].get('filename',None)
if original_file_name is None:
logger.warning('We are missing the original file name in the uploaded metadata.')
original_file_name = 'We_Do_Not_Know_The_Original_Name.unknown'
# Now we are uploading the files. We are doing this in a normal sync way, so we have to wait here.
# We get back True when the upload was successful, otherwise False.
upload_ok = False
#print('Make storage engine')
#print(storage_action)
try:
encryption_key = None if '' == storage_action.get('encryption_password') else storage_action.get('encryption_password')
data_storage = Storage(storage_action['engine'],
storage_action['location'],
storage_action['username'],
storage_action['password'],
uploaded_file,
os.path.join(storage_action['path'],original_file_name),
encryption_key,
storage_data.get('uploader_name'),
storage_data.get('uploader_email'))
if encryption_key is not None:
data_storage.encrypt_source()
upload_ok = data_storage.upload_file()
except StorageException.FileDoesNotExist as ex:
upload_data['error'] = 1
upload_data['error_message'] = f'Error uploading source file: {original_file_name} with error: {ex}'
logger.error(upload_data['error_message'])
webhook = requests.post(WEBHOOK_URL, headers=webhook_headers, auth=hawk_auth, json=upload_data)
# TODO: Webhook call for reporting issue
except StorageException.InvalidAuthentication as ex:
upload_data['error'] = 1
upload_data['error_message'] = f'Authentication error for user: {storage_action["username"]} with error: {ex}'
logger.error(upload_data['error_message'])
webhook = requests.post(WEBHOOK_URL, headers=webhook_headers, auth=hawk_auth, json=upload_data)
# Store the upload action for later reporting
upload_actions.append({'storage' : storage_action['engine'], 'success' : upload_ok})
if upload_ok:
upload_data['done'] = 1
webhook = requests.post(WEBHOOK_URL, headers=webhook_headers, auth=hawk_auth, json=upload_data)
else:
logger.error(f'Error uploading file for study ID: \'{study}\' to storage \'{storage_action["engine"]}\'')
except requests.exceptions.RequestException as ex:
# Webhook post failed
logger.exception(f'Webhook network error for study ID: \'{study}\'. Exception message: {ex}')
sys.exit(1)
except json.decoder.JSONDecodeError as ex:
# Webhook response data is not valid JSON
logger.exception(f'Webhook response data for study ID: \'{study}\' is not JSON valid. Exception message: {ex}')
sys.exit(1)
uploads_ok = all([storage.get('success', False) for storage in upload_actions])
if not uploads_ok:
logger.error('Error uploading to the following storage(s): {}'.format(','.join([storage['storage'] for storage in upload_actions if not storage['success']])))
# We clean up the files either forced, or when all uploads are OK.
if AUTO_CLEAN or (AUTO_CLEAN is None and uploads_ok):
logger.debug('Clearing original files')
# Remove original file
try:
os.remove(uploaded_file)
except FileNotFoundError as ex:
pass
# Remove the metadata file that comes with the uploaded file
try:
os.remove(f'{uploaded_file}.info')
except FileNotFoundError as ex:
pass
# All done!
logger.info('Done processing data drop for study ID: \'{}\' to storage(s) \'{}\' in {} (h:mm:ss.ms).'.format(study,
','.join([storage['storage'] for storage in upload_actions if storage['success']]),
datetime.now()-start_time))
sys.exit(0)

@ -0,0 +1,4 @@
#!/bin/sh
echo "Upload $TUS_ID ($TUS_SIZE bytes) finished"
cat /dev/stdin | jq .

@ -0,0 +1,8 @@
#!/bin/sh
id="$TUS_ID"
offset="$TUS_OFFSET"
size="$TUS_SIZE"
progress=$((100 * $offset/$size))
echo "Upload ${id} is at ${progress}% (${offset}/${size})"

@ -0,0 +1,4 @@
#!/bin/sh
echo "Upload $TUS_ID terminated"
cat /dev/stdin | jq .

@ -0,0 +1 @@
pre-create.py

@ -0,0 +1,78 @@
#!/usr/bin/env python
import os
import logging
import logging.config
if os.path.isfile('logging.custom.ini'):
logging.config.fileConfig('logging.custom.ini')
else:
logging.config.fileConfig('logging.ini')
logger = logging.getLogger(__name__)
from datetime import datetime
start_time = datetime.now()
import sys
import json
import requests
from requests_hawk import HawkAuth
from decouple import config, Csv
# !!!!!! NO CHANGES BELOW THIS LINE !!!!!!
# Tus webhook name
HTTP_HOOK_NAME = 'pre-create'
# Make sure you set the content-type to JSON. Otherwise the Hawk authentication could fail due to a missing content-type header
webhook_headers = {
'HOOK-NAME' : HTTP_HOOK_NAME,
'Content-Type' : 'application/json',
'cache-control' : 'no-cache'
}
# Django webserver with hook url path
WEBHOOK_URL = config('WEBHOOK_URL')
# Creating a Hawk Authentication headers
hawk_auth = HawkAuth(id=config('DROPOFF_API_HAWK_KEY'), key=config('DROPOFF_API_HAWK_SECRET'))
logger.debug(f'Start reading from STDIN for \'{HTTP_HOOK_NAME}\' hook.')
# Read stdin input data from the TUS daemon
data = ''.join(sys.stdin.readlines())
# Test if data is valid JSON... just to be sure...
# And we need some data from the JSON as well
study = None
try:
logger.debug('Start parsing STDIN to validate it as JSON data')
upload_data = json.loads(data)
study = upload_data['Upload']['MetaData'].get('study')
logger.debug('STDIN data is valid JSON data')
logger.info(f'Start processing data drop for study ID: \'{study}\'')
except json.decoder.JSONDecodeError as ex:
logger.exception(f'STDIN data is NOT valid JSON data. Will stop processing further. Exception message: {ex}')
# Send an exit code higher than 0 to stop the upload process on the Tus server
sys.exit(1)
# We know for sure that JSON input data is 'valid'. So we post to the webhook for further checking and actions
try:
# Create a webhook POST request with the needed headers and data.
logger.debug(f'Post hook data back to the API {WEBHOOK_URL}')
# An exception will be caught if there are network errors
webhook = requests.post(WEBHOOK_URL, headers=webhook_headers, auth=hawk_auth, json=upload_data)
# If the POST is OK and we get a 200 status back, the upload can continue
if webhook.status_code == requests.codes.ok:
# This will make the Tus server continue the upload
logger.info(f'Done processing data drop for study ID: \'{study}\' in {datetime.now()-start_time} (h:mm:ss.ms).')
sys.exit(0)
else:
logger.error(f'Got HTTP status code: {webhook.status_code}')
except requests.exceptions.RequestException as ex:
# Webhook post failed
logger.exception(f'Webhook network error for study ID: \'{study}\'. Exception message: {ex}')
except json.decoder.JSONDecodeError as ex:
# Webhook response data is not valid JSON
logger.exception(f'Webhook response data for study ID: \'{study}\' is not JSON valid. Exception message: {ex}')
# We had some errors, so upload has to be stopped
sys.exit(1)

@ -0,0 +1,7 @@
#!/bin/sh
filename=$(cat /dev/stdin | jq .MetaData.filename)
if [ -z "$filename" ]; then
echo "Error: no filename provided"
exit 1
fi

@ -0,0 +1,8 @@
wheel==0.36.2
python-decouple==3.4
webdavclient3==3.14.5
giteapy==1.0.8
requests==2.25.1
requests-hawk==1.1.0
PyGithub==1.54.1
python-irodsclient==0.8.6

@ -0,0 +1,8 @@
import os
import logging
import logging.config
if os.path.isfile('logging.custom.ini'):
logging.config.fileConfig('logging.custom.ini')
elif os.path.isfile('logging.ini'):
logging.config.fileConfig('logging.ini')

@ -0,0 +1,29 @@
import logging
logger = logging.getLogger(__name__)
from storage.storage import BaseStorage
import os
import shutil
class LocalStorage(BaseStorage):
TYPE = 'fs'
def file_exists(self, filepath):
return os.path.exists(filepath) and os.path.isfile(filepath)
def directory_exists(self, filepath):
return os.path.exists(filepath) and os.path.isdir(filepath)
def _make_folder_action(self, path):
os.makedirs(path)
return True
def _upload_file_action(self, source, destination):
shutil.copy(source, destination)
return True
def _download_file_action(self, source, destination):
shutil.copy(source, destination)
return True

@ -0,0 +1,88 @@
import logging
logger = logging.getLogger(__name__)
from storage.storage import BaseStorage
# Gitea Support - https://pypi.org/project/giteapy/
import base64
import giteapy
from giteapy.rest import ApiException
class GiteaStorage(BaseStorage):
TYPE = 'gitea'
def __init__(self, url = None, username = None, password = None, source = None, destination = None, encryption_key = None, sender_name = None, sender_email = None):
# The repository is added to the url parameter. Use a '#' as separator. The repository needs to be created first.
# Ex: https://git.web.rug.nl/api/v1#RepositoryName
(url, self.repository) = url.split('#')
destination = destination.strip('/')
super().__init__(url, username, password, source, destination, encryption_key, sender_name, sender_email)
# Create a committer object when the data is uploaded through one of the invited accounts.
self.committer = None
if sender_name is not None or sender_email is not None:
self.committer = giteapy.Identity(name = sender_name,email = sender_email)
def __connect(self):
try:
assert(self.client)
except AttributeError:
# Configuration for the GITEA connection
configuration = giteapy.Configuration()
# Overrule the host url....?
configuration.host = self.url
#configuration.debug = False
configuration.api_key['access_token'] = self.password
# Create the client
self.client = giteapy.RepositoryApi(giteapy.ApiClient(configuration))
logger.info(f'Created Gitea connection to url: {self.url}')
def file_exists(self, filepath):
self.__connect()
try:
self.client.repo_get_contents(self.username, self.repository, filepath)
return True
except ApiException:
return False
def directory_exists(self, filepath):
self.__connect()
return self.file_exists(filepath)
def _make_folder_action(self, path):
# Git does not track empty directories, so this action will always succeed
return True
def _upload_file_action(self, source, destination):
self.__connect()
try:
with open(source,'rb') as datafile:
# This is a very big issue. Big files will be stored completely in memory :(
body = giteapy.CreateFileOptions(content = base64.b64encode(datafile.read()).decode(),
message = f'Upload from VRE DataDropOff\n Added file: {destination}',
committer = self.committer)
except Exception:
return False
try:
# Create a file in a repository
api_response = self.client.repo_create_file(self.username, self.repository, destination, body)
return True
except ApiException as ex:
logger.exception(f'Exception when calling RepositoryApi->repo_create_file: {ex}')
return False
def _download_file_action(self, source, destination):
self.__connect()
with open(destination,'wb') as destination_file:
try:
data = self.client.repo_get_contents(self.username, self.repository, source)
destination_file.write(base64.b64decode(data.content))
except ApiException as ex:
logger.exception(f'Exception when calling RepositoryApi->repo_get_contents: {ex}')
return True

@ -0,0 +1,66 @@
import logging
logger = logging.getLogger(__name__)
import os
from storage.storage import BaseStorage
# Github Support - https://pypi.org/project/PyGithub/
from github import Github, InputGitAuthor, GithubObject
from github.GithubException import UnknownObjectException
class GithubStorage(BaseStorage):
TYPE = 'github'
def __init__(self, url = None, username = None, password = None, source = None, destination = None, encryption_key = None, sender_name = None, sender_email = None):
# The repository is added to the url parameter. Use a '#' as separator. The repository needs to be created first.
# Ex: https://api.github.com/#RepositoryName
(url, self.repository) = url.split('#')
destination = destination.strip('/')
super().__init__(url, username, password, source, destination, encryption_key, sender_name, sender_email)
# Create a committer object when the data is uploaded through one of the invited accounts.
self.committer = GithubObject.NotSet
if sender_name is not None or sender_email is not None:
self.committer = InputGitAuthor(name=sender_name, email=sender_email)
def __connect(self):
try:
assert(self.repo)
except AttributeError:
client = Github(self.password)
self.repo = client.get_user().get_repo(self.repository)
logger.info('Created Github.com connection')
def file_exists(self, filepath):
self.__connect()
try:
self.repo.get_contents(filepath)
return True
except UnknownObjectException:
return False
def directory_exists(self, filepath):
return True
def _make_folder_action(self, path):
# On GitHub you cannot create empty directories, so this action will always succeed
return True
def _upload_file_action(self, source, destination):
self.__connect()
# Read the file and post to Github. The library will convert to Base64
with open(source,'rb') as datafile:
self.repo.create_file(destination.strip('/'),f'Upload from VRE DataDropOff\n Added file: {destination}', datafile.read(), committer = self.committer)
return True
def _download_file_action(self, source, destination):
self.__connect()
download = self.repo.get_contents(source)
with open(destination,'wb') as destination_file:
destination_file.write(download.decoded_content)
return True

@ -0,0 +1,138 @@
import logging
logger = logging.getLogger(__name__)
from storage.storage import BaseStorage
import storage.exceptions as StorageException
# iRods support - https://pypi.org/project/python-irodsclient/
import irods
from irods.session import iRODSSession
import atexit
class iRODSStorage(BaseStorage):
TYPE = 'irods'
def __init__(self, url = None, username = None, password = None, source = None, destination = None, encryption_key = None, sender_name = None, sender_email = None):
# The iRODS zone is added to the url parameter. Use a '#' as separator. This needs to be an existing iRODS zone
# Ex: rdms-prod-icat.data.rug.nl#rug
(url, self.irods_zone) = url.split('#')
if destination:
destination = destination.strip('/')
super().__init__(url, username, password, source, destination, encryption_key, sender_name, sender_email)
# We need to clean up the iRODS session. Using atexit is the easiest way.
atexit.register(self.__close)
def __connect(self):
try:
assert(self.client)
except AttributeError:
# Connect to the iRODS server
self.client = None
try:
self.client = iRODSSession(host=self.url, port=1247, user=self.username, password=self.password, zone=self.irods_zone)
# We need to make a call to validate the authentication; by checking the server version we know whether we can authenticate
logger.debug(f'iRODS {self.client.server_version} connection through *native* authentication')
except irods.exception.CAT_INVALID_AUTHENTICATION:
# Authentication scheme is not native (default), so we try PAM here
try:
self.client = iRODSSession(host=self.url, port=1247, user=self.username, password=self.password, zone=self.irods_zone, irods_authentication_scheme='pam')
logger.debug(f'iRODS {self.client.server_version} connection through *PAM* authentication')
except irods.exception.CAT_INVALID_AUTHENTICATION:
# Authentication scheme is not PAM either; last try: 'gis'
try:
self.client = iRODSSession(host=self.url, port=1247, user=self.username, password=self.password, zone=self.irods_zone, irods_authentication_scheme='gis')
logger.debug(f'iRODS {self.client.server_version} connection through *GIS* authentication')
except irods.exception.CAT_INVALID_AUTHENTICATION:
pass
if self.client is None:
logger.error('Unable to login to the iRODS instance. Please check username and password combination!')
raise StorageException.InvalidAuthentication(self.username)
logger.info('Created iRODS connection')
def __close(self):
logger.debug('Closing iRODS storage connection and clean up')
self.client.cleanup()
def _file_exists_action(self, path):
self.__connect()
try:
self.client.data_objects.get(f'/{self.irods_zone}/home/{self.username}/{path}')
except irods.exception.DataObjectDoesNotExist:
logger.debug(f'File \'{path}\' does NOT exist on the iRODS server')
return False
except irods.exception.CollectionDoesNotExist:
logger.debug(f'Parent folder of file \'{path}\' does NOT exist on the iRODS server')
return False
return True
def _directory_exists_action(self, path):
self.__connect()
try:
self.client.collections.get(f'/{self.irods_zone}/home/{self.username}/{path}')
logger.debug(f'Folder \'{path}\' exists on the iRODS server')
except irods.exception.CollectionDoesNotExist:
logger.debug(f'Folder \'{path}\' does NOT exist on the iRODS server')
return False
return True
def _make_folder_action(self, path):
self.__connect()
try:
self.client.collections.create(f'/{self.irods_zone}/home/{self.username}/{path}')
except irods.exception.CollectionDoesNotExist:
logger.debug(f'Parent folder of file \'{path}\' does NOT exist on the iRODS server')
return False
return True
def _upload_file_action(self, source, destination):
self.__connect()
# The upload path consists of a zone, username and path
destination = f'/{self.irods_zone}/home/{self.username}/{destination}'
logger.debug(f'Uploading to file: \'{destination}\'')
try:
obj = self.client.data_objects.create(destination)
logger.debug(f'Created file: \'{destination}\'')
# Open 'both' files and copy 4K data each time.
with obj.open('w') as irods_file, open(source,'rb') as source_file_binary:
while True:
buf = source_file_binary.read(4096)
if buf:
irods_file.write(buf)
else:
break
obj.metadata.add('source',f'Upload from VRE DataDropOff\n Added file: {destination} uploaded by: {self.sender_name}({self.sender_email})')
except irods.exception.OVERWRITE_WITHOUT_FORCE_FLAG:
logger.warning('The uploaded file already exists. So we did NOT upload the new file!')
return False
return True
def _download_file_action(self, source, destination):
self.__connect()
logger.debug(f'Downloading file: \'{source}\' to \'{destination}\'')
try:
obj = self.client.data_objects.get(f'/{self.irods_zone}/home/{self.username}/{source}')
# Open 'both' files and copy 4K data each time.
with obj.open('r') as irods_source_file, open(destination,'wb') as local_destination_file:
while True:
buf = irods_source_file.read(4096)
if buf:
local_destination_file.write(buf)
else:
break
except irods.exception.DataObjectDoesNotExist:
logger.error(f'File: \'{source}\' does not exist on the iRODS server')
return False
return True

@ -0,0 +1,64 @@
import logging
logger = logging.getLogger(__name__)
from storage.storage import BaseStorage
from storage.utils import human_filesize
import storage.exceptions as StorageException
# WebDAV Support - https://pypi.org/project/webdavclient3/
from webdav3.client import Client
from webdav3.exceptions import WebDavException, ResponseErrorCode
class WebDAVStorage(BaseStorage):
TYPE = 'webdav'
def __connect(self):
# Connect to the external storage. This function can be run multiple times; it checks whether it already has a connection to re-use
try:
# When this fails with an Attribute error, that means that the 'client' variable is not set and we need to make a new connection
assert(self.client)
except AttributeError:
# Because the 'client' variable is not known, the WebDAV connection has not been created yet. So do it now!
self.client = Client({
'webdav_hostname' : self.url,
'webdav_login' : self.username,
'webdav_password' : self.password,
})
try:
# Here we abuse the .free check to see if the login credentials do work
free_space = self.client.free()
logger.info(f'Created WebDAV connection to url: \'{self.url}\', with space left: {human_filesize(free_space)}')
except ResponseErrorCode as ex:
# Login went wrong, so delete the client variable for next run/try
del(self.client)
# If there was an authentication error, raise exception and quit.
if 401 == ex.code:
raise StorageException.InvalidAuthentication(self.username)
# TODO: More errors.....
def _file_exists_action(self, path):
self.__connect()
return self.client.check(path)
def _directory_exists_action(self, path):
self.__connect()
return self.client.check(path)
def _make_folder_action(self, path):
self.__connect()
self.client.mkdir(path)
return True
def _upload_file_action(self, source, destination):
self.__connect()
self.client.upload(local_path = source, remote_path = destination)
return True
def _download_file_action(self, source, destination):
self.__connect()
self.client.download(source,destination)
return True

@ -0,0 +1,57 @@
class BaseStorageError(Exception):
pass
class StorageActionNotImplemented(Exception):
def __init__(self, storage, action, message='is not implemented'):
self.storage = storage
self.action = action
self.message = message
super().__init__(self.message)
def __str__(self):
return f'{self.action} on class {self.storage} {self.message}'
class FileDoesNotExist(BaseStorageError):
def __init__(self, source, message='File does not exist on disk'):
self.source = source
self.message = message
super().__init__(self.message)
def __str__(self):
return f'{self.source} -> {self.message}'
class InvalidLocation(BaseStorageError):
def __init__(self, location, message='Location does not exist or is not valid'):
self.location = location
self.message = message
super().__init__(self.message)
def __str__(self):
return f'{self.location} -> {self.message}'
class InvalidAuthentication(BaseStorageError):
def __init__(self, user, message='Authentication failed'):
self.user = user
self.message = message
super().__init__(self.message)
def __str__(self):
return f'{self.user} -> {self.message}'
class UnknownStorageEngine(BaseStorageError):
def __init__(self, engine, message='Storage engine is unknown or not available'):
self.engine = engine
self.message = message
super().__init__(self.message)
def __str__(self):
return f'{self.engine} -> {self.message}'
class MissingEncryptionKey(BaseStorageError):
def __init__(self, message='The encryption keys are missing'):
self.message = message
super().__init__(self.message)
def __str__(self):
return f'{self.message}'

@ -0,0 +1,247 @@
import logging
logger = logging.getLogger(__name__)
import tempfile
from datetime import datetime
import subprocess, shlex
import shutil
import os
import glob
import re
from pathlib import Path
import importlib
import storage.exceptions as StorageException
class Storage():
CLASS_REGEX = re.compile(r'class\s+(?P<class_name>[^\s\(]+)\s*\(\s*BaseStorage\s*\)\s*:')
# This acts like a factory function. It will return a storage object from the requested engine
def __new__(self, storage_type, url = None, username = None, password = None, source = None, destination = None, encryption_key = None, sender_name = None, sender_email = None):
storage_type = storage_type.lower()
engines = Storage.__load_storage_engines()
logger.debug(f'Available storage engines({len(Storage.available_engines())}): {Storage.available_engines()}')
if storage_type not in engines:
raise StorageException.UnknownStorageEngine(storage_type)
return engines[storage_type](url, username, password, source, destination, encryption_key, sender_name, sender_email)
@staticmethod
def __load_storage_engines():
loaded_engines = {}
engines = (Path(__file__)).parent.joinpath('engines')
for engine in [x for x in engines.glob('*.py') if x.is_file()]:
with engine.open() as python_file:
data = python_file.read()
class_name = Storage.CLASS_REGEX.findall(data)
if len(class_name) == 1:
storage_engine_module = importlib.import_module('.{}' . format(engine.stem),package='storage.engines')
storage_engine_class = getattr(storage_engine_module, class_name[0])
loaded_engines[storage_engine_class.TYPE.lower()] = storage_engine_class
return loaded_engines
@staticmethod
def available_engines():
engines = list(Storage.__load_storage_engines().keys())
engines.sort()
return engines
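# A short illustrative example of the factory above (the engine name, URL, credentials and
# paths are hypothetical; see the post-finish hook for real usage):
#
#   webdav = Storage('webdav',
#                    url='https://cloud.example.org/remote.php/webdav',
#                    username='user', password='secret',
#                    source='/tmp/upload.bin', destination='drop/upload.bin')
#   webdav.upload_file()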
class BaseStorage():
ENCFS_XML = '.encfs6.xml'
ENCRYPT_CMD = '/usr/bin/encfs'
FUSER_MOUNT = '/bin/fusermount'
TYPE = ''
def __init__(self, url = None, username = None, password = None, source = None, destination = None, encryption_key = None, sender_name = None, sender_email = None):
if source is not None and not os.path.exists(source):
logger.error(f'Source file is not available on disk! It has vanished from: {source}')
raise StorageException.FileDoesNotExist(source)
# if destination is None:
# logger.error(f'Destination is not valid: {destination}')
# raise StorageException.InvalidLocation(destination)
self.source = source
self.destination_dir = None if destination is None else os.path.dirname(destination)
self.destination_file = None if destination is None else os.path.basename(destination)
self.encryption_key = encryption_key
self.encrypted = False
self.url = url
self.username = username
self.password = password
self.sender_name = sender_name
self.sender_email = sender_email
def encrypt_source(self):
if self.encryption_key is None:
logger.error(f'Cannot encrypt source file {self.source} due to missing encryption key!')
raise StorageException.MissingEncryptionKey()
if self.encrypted:
logger.warning('File is already encrypted')
return True
start_time = datetime.now()
logger.info(f'Encrypting new uploaded file: {self.source}')
encrypted_dir = tempfile.mkdtemp()
logger.debug(f'Created encrypted source folder: {encrypted_dir}')
decoded_dir = tempfile.mkdtemp()
logger.debug(f'Created decoded folder: {decoded_dir}')
new_encryption_setup = True
existing_encfs_file = os.path.join(self.destination_dir,BaseStorage.ENCFS_XML)
logger.debug(f'Check for existing encryption key file \'{existing_encfs_file}\' on the destination location.')
if self.file_exists(existing_encfs_file):
logger.debug(f'Copying existing \'{BaseStorage.ENCFS_XML}\' file...')
self.download_file(existing_encfs_file, os.path.join(encrypted_dir,BaseStorage.ENCFS_XML))
logger.info(f'Using existing \'{existing_encfs_file}\' from destination location.')
new_encryption_setup = False
# Mounting part between source and encrypted folder
# TODO: Check what happens when there are spaces in the dir names... need some quotes I guess
cmd = f'{BaseStorage.ENCRYPT_CMD} --standard -S {encrypted_dir} {decoded_dir}'
logger.debug(f'Creating an encrypted EncFS mount point with command: {cmd}')
process = subprocess.Popen(shlex.split(cmd), stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE)
# Send encryption password
logger.debug('Mounting in action. Sending encryption key...')
(output,error) = process.communicate(input=self.encryption_key.encode())
if process.wait(timeout=30) != 0:
output = output.decode().strip()
logger.error(f'Error creating an encrypted mount with EncFS. Error: \'{output}\'')
raise RuntimeError(f'Mounting error EncFS: {output}')
logger.debug(f'Mountpoint is ready at path: {decoded_dir}')
if new_encryption_setup:
logger.info(f'We have a new \'{BaseStorage.ENCFS_XML}\' file that needs to be moved to the same destination: {self.destination_dir}')
self.upload_file(os.path.join(encrypted_dir,BaseStorage.ENCFS_XML), existing_encfs_file, True)
# Here we ignore the subdirectories on the destination. This will be fixed during the upload
destination_file = os.path.join(decoded_dir,self.destination_dir, self.destination_file)
logger.debug(f'Moving source file \'{self.source}\' to \'{destination_file}\' for encryption.')
os.makedirs(os.path.dirname(destination_file))
shutil.move(self.source,destination_file)
# Here we unmount the decoded directory, so we only have the encrypted data left
logger.debug(f'Encrypting is done, un-mounting decoded folder: {decoded_dir}')
cmd = f'{BaseStorage.FUSER_MOUNT} -u {decoded_dir}'
logger.debug(f'Umounting cmd: {cmd}')
process = subprocess.Popen(shlex.split(cmd), stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE)
if process.wait() != 0:
# TODO: Better error handling... Add raise exception
logger.error(f'Error un-mounting mount point: {decoded_dir}')
raise RuntimeError(f'Un-mounting error EncFS: {process}')
logger.debug(f'Cleanup temporary decoded dir: {decoded_dir}')
shutil.rmtree(decoded_dir)
# Find the newly created encrypted file and move it back to the original source file
# We use the glob function so we can also support subdirectories in the encrypted storage
logger.debug(f'Finding newly created encrypted file in the encrypted source folder: {encrypted_dir}')
encrypted_listing = glob.glob(f'{encrypted_dir}/**',recursive=True)
logger.debug(f'Found encrypted file: {encrypted_listing[-1]}')
# The source file has been changed to the new encrypted file name, so use that for the file upload process
self.source = os.path.join(os.path.dirname(self.source),os.path.basename(encrypted_listing[-1]))
self.destination_file = os.path.basename(self.source)
logger.debug(f'Moving encrypted file {encrypted_listing[-1]} back to original file: {self.source}')
logger.debug(f'Updated the destination file name based on the encrypted name: {self.destination_file}')
shutil.move(encrypted_listing[-1],self.source)
logger.info(f'Encrypted to \'{self.source}\' in {datetime.now() - start_time} (h:mm:ss.ms)')
self.encrypted = True
return True
def file_exists(self, path):
logger.debug(f'Check if file exists at path: \'{path}\' with engine: \'{self.TYPE}\'')
file_exists = self._file_exists_action(path)
exists = 'exists' if file_exists else 'does not exist'
logger.debug(f'File \'{path}\' {exists} on storage \'{self.TYPE}\'')
return file_exists
def upload_file(self, source = None, destination = None, move = False):
source = self.source if source is None else source
destination = os.path.join(self.destination_dir,self.destination_file) if destination is None else destination
upload_ok = None
if source is None or destination is None:
logger.error(f'Error uploading file. Either source: \'{source}\' or destination: \'{destination}\' is not set!')
start_time = datetime.now()
logger.debug(f'Start uploading file: \'{source}\' to: \'{destination}