diff options
Diffstat (limited to 'Src/replicant/nx/win/NXFileProgressiveDownloader.cpp')
-rw-r--r-- | Src/replicant/nx/win/NXFileProgressiveDownloader.cpp | 749 |
1 files changed, 749 insertions, 0 deletions
diff --git a/Src/replicant/nx/win/NXFileProgressiveDownloader.cpp b/Src/replicant/nx/win/NXFileProgressiveDownloader.cpp new file mode 100644 index 00000000..a8452c72 --- /dev/null +++ b/Src/replicant/nx/win/NXFileProgressiveDownloader.cpp @@ -0,0 +1,749 @@ +#include "NXFileObject.h" +#include "nu/ProgressTracker.h" +#include "nx/nxthread.h" +#include "nx/nxsleep.h" +#include "jnetlib/jnetlib.h" +#include "../nswasabi/AutoCharNX.h" +#include "nswasabi/ReferenceCounted.h" +#include "nu/MessageLoop.h" +#include <time.h> +#include <new> +#include "../../../WAT/WAT.h" + +/* TODO: benski> test this with a server that does not return content-length. I bet we could get it to work */ + +/* TODO: benski> on windows, we can use a single CreateFile HANDLE for both reading and writing + and use ReadFile(..., &overlapped) to maintain two separate file pointers + this should improve performance as they will share the same cache + _might_ have to use async I/O to get it to work (but use it synchronously by waiting on the handle after making the call + */ + +#define HTTP_BUFFER_SIZE 65536 + +class NXFileObject_ProgressiveDownloader; + +enum +{ + MESSAGE_KILL, + MESSAGE_SEEK, + MESSAGE_SIZE, + MESSAGE_ERROR, + MESSAGE_CLOSED, + MESSAGE_CONNECTED, +}; + +char MessageString[6][10] = +{ + "Kill", + "Seek", + "Size", + "Error", + "Closed", + "Connected" +}; + + +struct seek_message_t : public nu::message_node_t +{ + uint64_t start; + uint64_t end; +}; + +struct size_message_t : public nu::message_node_t +{ + uint64_t size; +}; + +struct error_message_t : public nu::message_node_t +{ + int error_code; +}; + +/* This class represents the thread that's actually downloading the content from the server */ +class ProgressiveDownload +{ +public: + ProgressiveDownload(ProgressTracker &progress_tracker, NXFileObject_ProgressiveDownloader &parent); + ~ProgressiveDownload(); + ns_error_t Initialize(nx_uri_t uri, jnl_http_t http, const char *user_agent, nx_uri_t temp_uri); + + void Seek(uint64_t start, uint64_t end); + void Close(); +private: + /* These functions are called on the local thread */ + /* These functions run on the download thread */ + static nx_thread_return_t NXTHREADCALL _ProgressiveThread(nx_thread_parameter_t param) { return ((ProgressiveDownload *)param)->ProgressiveThread(); } + nx_thread_return_t NXTHREADCALL ProgressiveThread(); + int Connect(); + void Internal_Write(const void *data, size_t data_len); + int Wait(int milliseconds); + ns_error_t SetupConnection(uint64_t start_position, uint64_t end_position); + int DoRead(void *buffer, size_t bufferlen); + void ProcessMessage(nu::message_node_t *message); +private: + ProgressTracker &progress_tracker; + NXFileObject_ProgressiveDownloader &parent; + + nx_uri_t temp_filename, url; + FILE *progressive_file_write; + jnl_http_t http; + char *user_agent; + nx_thread_t download_thread; + nu::MessageLoop message_loop; + uint64_t file_size; + int killswitch; +}; + +class NXFileObject_ProgressiveDownloader: public NXFileObject +{ +public: + NXFileObject_ProgressiveDownloader(); + ~NXFileObject_ProgressiveDownloader(); + ns_error_t Initialize(nx_uri_t uri, jnl_http_t http, const char *user_agent); + + bool Available(uint64_t size, uint64_t *available); + + /* API used by ProgressiveDownload */ + void OnFileSize(uint64_t filesize); + void OnConnected(); + void OnError(int error_code); + void OnClosed(); +private: + /* NXFileObject implementation */ + ns_error_t Read(void *buffer, size_t bytes_requested, size_t *bytes_read); + ns_error_t Write(const void *buffer, size_t bytes); + ns_error_t Seek(uint64_t position); + ns_error_t Tell(uint64_t *position); + ns_error_t PeekByte(uint8_t *byte); + ns_error_t Sync(); + ns_error_t Truncate(); + + bool WaitForRead(uint64_t size); + void ProcessMessage(nu::message_node_t *message); + void Wait(unsigned int milliseconds); + + ProgressiveDownload download; + ProgressTracker progress_tracker; + FILE *progressive_file_read; + bool end_of_file; + bool connected; + int error_code; + nu::MessageLoop message_loop; + bool closed; + bool need_seek; // if set to true, we need to fseek(position) +}; + +ProgressiveDownload::ProgressiveDownload(ProgressTracker &progress_tracker, NXFileObject_ProgressiveDownloader &parent) : progress_tracker(progress_tracker), parent(parent) +{ + killswitch=0; + url=0; + temp_filename=0; + progressive_file_write=0; + http=0; + user_agent=0; + download_thread=0; + file_size=0; +} + +ProgressiveDownload::~ProgressiveDownload() +{ + if (download_thread) + { + Close(); + NXThreadJoin(download_thread, 0); + } + + // TODO: flush messages + if (progressive_file_write) + fclose(progressive_file_write); + NXURIRelease(temp_filename); + NXURIRelease(url); + if (http) + jnl_http_release(http); + free(user_agent); +} + +void ProgressiveDownload::Close() +{ + nu::message_node_t *message = message_loop.AllocateMessage(); + message->message = MESSAGE_KILL; + message_loop.PostMessage(message); +} + +void ProgressiveDownload::Seek(uint64_t start, uint64_t end) +{ + seek_message_t *message = (seek_message_t *)message_loop.AllocateMessage(); + message->message = MESSAGE_SEEK; + message->start = start; + message->end = end; + message_loop.PostMessage(message); +} + +ns_error_t ProgressiveDownload::Initialize(nx_uri_t url, jnl_http_t http, const char *user_agent, nx_uri_t temp_filename) +{ + this->url = NXURIRetain(url); + this->temp_filename = NXURIRetain(temp_filename); + if (user_agent) + this->user_agent = strdup(user_agent); + this->http = jnl_http_retain(http); + progressive_file_write = NXFile_fopen(temp_filename, nx_file_FILE_readwrite_binary); + if (progressive_file_write == 0) + return NErr_FailedCreate; + + return NXThreadCreate(&download_thread, _ProgressiveThread, this); +} + +void ProgressiveDownload::ProcessMessage(nu::message_node_t *message) +{ + switch(message->message) + { + case MESSAGE_KILL: + killswitch=1; + break; + case MESSAGE_SEEK: + { + seek_message_t *seek_message = (seek_message_t *)message; + + char buffer[HTTP_BUFFER_SIZE] = {0}; + + /* empty out the jnetlib buffer. that might let us be able to avoid this seek */ + DoRead(buffer, sizeof(buffer)); + + uint64_t new_start, new_end; + if (!progress_tracker.Valid(seek_message->start, seek_message->end) /* double check that we actually need to seek */ + && !progress_tracker.Seek(seek_message->start, seek_message->end, &new_start, &new_end)) + { + int ret = SetupConnection(new_start, new_end); + if (ret == NErr_Success) + ret = Connect(); + if (ret != NErr_Success) + { + parent.OnError(ret); + killswitch=1; + break; + } + + _fseeki64(progressive_file_write, new_start, SEEK_SET); + } + else + parent.OnConnected(); + } + break; + } + + message_loop.FreeMessage(message); +} + +int ProgressiveDownload::Wait(int milliseconds) +{ + for (;;) + { + if (killswitch) + return 1; + + nu::message_node_t *message = message_loop.PeekMessage(milliseconds); + if (message) + ProcessMessage(message); + else + break; + } + + nu::message_node_t *message = message_loop.PeekMessage(milliseconds); + if (message) + ProcessMessage(message); + + return killswitch; +} + +ns_error_t ProgressiveDownload::SetupConnection(uint64_t start_position, uint64_t end_position) +{ + if (!http) + http = jnl_http_create(HTTP_BUFFER_SIZE, 0); + + if (!http) + return NErr_FailedCreate; + + jnl_http_reset_headers(http); + if (user_agent) + jnl_http_addheadervalue(http, "User-Agent", user_agent); + + if (start_position && start_position != (uint64_t)-1) + { + if (end_position == (uint64_t)-1) + { + char temp[128] = {0}; + sprintf(temp, "Range: bytes=%llu-", start_position); + jnl_http_addheader(http, temp); + } + else + { + char temp[128] = {0}; + sprintf(temp, "Range: bytes=%llu-%llu", start_position, end_position); + jnl_http_addheader(http, temp); + } + } + + jnl_http_addheader(http, "Connection: Close"); // TODO: change if we ever want a persistent connection and downloading in chunks + jnl_http_connect(http, AutoCharUTF8(url), 1, "GET"); + + return NErr_Success; +} + +int ProgressiveDownload::Connect() +{ + // TODO: configurable timeout + /* wait for connection */ +#ifdef _DEBUG + const int timeout = 15000; +#else + const int timeout = 15; +#endif + time_t start_time = time(0); + + int http_status = jnl_http_get_status(http); + while (http_status == HTTPGET_STATUS_CONNECTING || http_status == HTTPGET_STATUS_READING_HEADERS) + { + if (Wait(55) != 0) + return NErr_Interrupted; + + int ret = jnl_http_run(http); + if (ret == HTTPGET_RUN_ERROR) + return NErr_ConnectionFailed; + if (start_time + timeout < time(0)) + return NErr_TimedOut; + + http_status = jnl_http_get_status(http); + } + + if (http_status == HTTPGET_STATUS_ERROR) + { + switch(jnl_http_getreplycode(http)) + { + case 400: + return NErr_BadRequest; + case 401: + // TODO: deal with this specially + return NErr_Unauthorized; + case 403: + // TODO: deal with this specially? + return NErr_Forbidden; + case 404: + return NErr_NotFound; + case 405: + return NErr_BadMethod; + case 406: + return NErr_NotAcceptable; + case 407: + // TODO: deal with this specially + return NErr_ProxyAuthenticationRequired; + case 408: + return NErr_RequestTimeout; + case 409: + return NErr_Conflict; + case 410: + return NErr_Gone; + case 500: + return NErr_InternalServerError; + case 503: + return NErr_ServiceUnavailable; + default: + return NErr_ConnectionFailed; + } + } + else + { + if (!file_size) + { + // TODO: check range header for actual size + file_size = jnl_http_content_length(http); + parent.OnFileSize(file_size); + } + parent.OnConnected(); + return NErr_Success; + } +} + +void ProgressiveDownload::Internal_Write(const void *data, size_t data_len) +{ + size_t bytes_written = fwrite(data, 1, data_len, progressive_file_write); + fflush(progressive_file_write); + progress_tracker.Write(bytes_written); +} + +int ProgressiveDownload::DoRead(void *buffer, size_t bufferlen) +{ + int ret = jnl_http_run(http); + size_t bytes_received; + do + { + ret = jnl_http_run(http); + bytes_received = jnl_http_get_bytes(http, buffer, bufferlen); + if (bytes_received) + { + Internal_Write(buffer, bytes_received); + } + /* TODO: benski> should we limit the number of times through this loop? + I'm worried that if data comes in fast enough we might get stuck in this for a long time */ + } while (bytes_received == bufferlen); + return ret; +} + +nx_thread_return_t ProgressiveDownload::ProgressiveThread() +{ + ns_error_t ret; + + if (!http) + { + ret = SetupConnection(0, (uint64_t)-1); + if (ret != NErr_Success) + { + parent.OnError(ret); + parent.OnClosed(); + return 0; + } + } + + + ret = Connect(); + if (ret != NErr_Success) + { + parent.OnError(ret); + } + else + { + for (;;) + { + if (Wait(10) == 1) + break; // killed! + + char buffer[HTTP_BUFFER_SIZE] = {0}; + int ret = DoRead(buffer, sizeof(buffer)); + if (ret == -1) + break; + else if (ret == HTTPGET_RUN_CONNECTION_CLOSED) + { + if (jnl_http_bytes_available(http) == 0) + { + if (progress_tracker.Valid(0, file_size)) + { + // file is completely downloaded. let's gtfo + fclose(progressive_file_write); + progressive_file_write=0; + break; + } + + // if we're not completely full then we need to sit around for a potential MESSAGE_SEEK + //while (Wait(100) == 0) + { + // nop + } + } + } + } + } + + parent.OnClosed(); + return 0; +} + + /* ------------------ */ +NXFileObject_ProgressiveDownloader::NXFileObject_ProgressiveDownloader() : download(progress_tracker, *this) +{ + progressive_file_read=0; + end_of_file=false; + connected=false; + error_code=NErr_Success; + closed = false; + need_seek=false; + position=0; +} + + + +NXFileObject_ProgressiveDownloader::~NXFileObject_ProgressiveDownloader() +{ + download.Close(); + while (!closed) + Wait(10); + if (progressive_file_read) + fclose(progressive_file_read); +} + +void NXFileObject_ProgressiveDownloader::OnConnected() +{ + nu::message_node_t *message = message_loop.AllocateMessage(); + message->message = MESSAGE_CONNECTED; + message_loop.PostMessage(message); +} + +void NXFileObject_ProgressiveDownloader::OnError(int error_code) +{ + error_message_t *message = (error_message_t *)message_loop.AllocateMessage(); + message->message = MESSAGE_ERROR; + message->error_code = error_code; + message_loop.PostMessage(message); +} + +void NXFileObject_ProgressiveDownloader::OnFileSize(uint64_t size) +{ + size_message_t *message = (size_message_t *)message_loop.AllocateMessage(); + message->message = MESSAGE_SIZE; + message->size = size; + message_loop.PostMessage(message); +} + +void NXFileObject_ProgressiveDownloader::OnClosed() +{ + nu::message_node_t *message = message_loop.AllocateMessage(); + message->message = MESSAGE_CLOSED; + message_loop.PostMessage(message); +} + +ns_error_t NXFileObject_ProgressiveDownloader::Initialize(nx_uri_t uri, jnl_http_t http, const char *user_agent) +{ + ReferenceCountedNXURI temp_uri; + NXURICreateTemp(&temp_uri); + ns_error_t ret = download.Initialize(uri, http, user_agent, temp_uri); + if (ret != NErr_Success) + { + closed=true; + return ret; + } + + progressive_file_read = NXFile_fopen(temp_uri, nx_file_FILE_read_binary); + + for (;;) + { + Wait(10); + if (error_code != NErr_Success) + return error_code; + + if (connected) + break; + } + return NErr_Success; +} + +void NXFileObject_ProgressiveDownloader::ProcessMessage(nu::message_node_t *message) +{ + switch(message->message) + { + case MESSAGE_ERROR: + { + error_message_t *seek_message = (error_message_t *)message; + error_code = seek_message->error_code; + } + break; + case MESSAGE_CONNECTED: + connected = true; + break; + case MESSAGE_SIZE: + { + size_message_t *seek_message = (size_message_t *)message; + region.end = seek_message->size; + } + break; + case MESSAGE_CLOSED: + closed=true; + break; + } + + message_loop.FreeMessage(message); +} + +void NXFileObject_ProgressiveDownloader::Wait(unsigned int milliseconds) +{ + for (;;) + { + nu::message_node_t *message = message_loop.PeekMessage(milliseconds); + if (message) + ProcessMessage(message); + else + break; + } + + nu::message_node_t *message = message_loop.PeekMessage(milliseconds); + if (message) + ProcessMessage(message); +} + +bool NXFileObject_ProgressiveDownloader::WaitForRead(uint64_t size) +{ + if (progress_tracker.Valid(position, position+size)) + return true; + + if (need_seek) + { + // give it just a little bit of time to avoid constant reseeks when the download thread is just barely keeping up + Wait(10); + if (progress_tracker.Valid(position, position+size)) + return true; + + connected=false; + error_code=NErr_Success; + download.Seek(position, (uint64_t)position+size); + + for (;;) + { + Wait(10); + if (error_code != NErr_Success) + return false; + + if (connected) + break; + } + } + + while (!progress_tracker.Valid(position, position+size)) + { + Wait(10); + } + + return true; +} + + +ns_error_t NXFileObject_ProgressiveDownloader::Read(void *buffer, size_t bytes_requested, size_t *bytes_read) +{ + if (end_of_file || position >= (region.end - region.start)) + return NErr_EndOfFile; + + // don't allow a read past the end of the file as this will confuse progress_tracker (which doesn't know/care about the file length) + if ((position + bytes_requested) > region.end) + bytes_requested = (size_t)(region.end - position); + + if (WaitForRead((uint64_t)bytes_requested) == false) + { + *bytes_read = 0; + return error_code; + } + + if (need_seek) + { + _fseeki64(progressive_file_read, position, SEEK_SET); + need_seek=false; + } + + /* TODO: benski> if r < bytes_requested, then we need to flush the buffer. + on windows, we can use fflush(progressive_file_read) + on other platforms it's not guaranteed! */ + size_t r = fread(buffer, 1, bytes_requested, progressive_file_read); + this->position += r; + *bytes_read = r; + return NErr_Success; +} + +ns_error_t NXFileObject_ProgressiveDownloader::Seek(uint64_t new_position) +{ + if (new_position >= (region.end - region.start)) + { + this->position = region.end - region.start; + end_of_file=true; + } + else + { + if (new_position == position) + return NErr_Success; + position = new_position; + need_seek=true; + + end_of_file=false; + } + return NErr_Success; +} + +ns_error_t NXFileObject_ProgressiveDownloader::Tell(uint64_t *position) +{ + if (end_of_file) + *position = region.end - region.start; + else + *position = this->position - region.start; + return NErr_Success; +} + +ns_error_t NXFileObject_ProgressiveDownloader::PeekByte(uint8_t *byte) +{ + if (position == region.end) + return NErr_EndOfFile; + + // make sure we have enough room + if (WaitForRead((uint64_t)1) == false) + return error_code; + + if (need_seek) + { + _fseeki64(progressive_file_read, position, SEEK_SET); + need_seek=false; + } + + int read_byte = fgetc(progressive_file_read); + if (read_byte != EOF) + ungetc(read_byte, progressive_file_read); + else + { + /* TODO: benski> if we hit the point, then we actually need to flush the buffer. + on some platforms, fflush(progressive_file_read) will do that, but it's not guaranteed! */ + return NErr_EndOfFile; + } + + *byte = (uint8_t)read_byte; + return NErr_Success; +} + +ns_error_t NXFileObject_ProgressiveDownloader::Sync() +{ + return NErr_NotImplemented; +} + +ns_error_t NXFileObject_ProgressiveDownloader::Truncate() +{ + return NErr_NotImplemented; +} + +ns_error_t NXFileObject_ProgressiveDownloader::Write(const void *buffer, size_t bytes) +{ + return NErr_NotImplemented; +} + +bool NXFileObject_ProgressiveDownloader::Available(uint64_t size, uint64_t *available) +{ + uint64_t end = position+size; + if (end > region.end) + end = region.end; + if (position == region.end) + { + if (available) + *available=0; + return true; + } + return progress_tracker.Valid(position, end, available); +} + +ns_error_t NXFileOpenProgressiveDownloader(nx_file_t *out_file, nx_uri_t filename, nx_file_FILE_flags_t flags, jnl_http_t http, const char *user_agent) +{ + NXFileObject_ProgressiveDownloader *file_object = new (std::nothrow) NXFileObject_ProgressiveDownloader; + if (!file_object) + return NErr_OutOfMemory; + + ns_error_t ret = file_object->Initialize(filename, http, user_agent); + if (ret != NErr_Success) + { + delete file_object; + return ret; + } + + *out_file = (nx_file_t)file_object; + return NErr_Success; +} + +ns_error_t NXFileProgressiveDownloaderAvailable(nx_file_t _f, uint64_t size, uint64_t *available) +{ + if (!_f) + return NErr_BadParameter; + + NXFileObject_ProgressiveDownloader *f = (NXFileObject_ProgressiveDownloader *)_f; + if (f->Available(size, available)) + return NErr_True; + else + return NErr_False; +}
\ No newline at end of file |