aboutsummaryrefslogtreecommitdiff
path: root/Minecraft.World/Socket.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'Minecraft.World/Socket.cpp')
-rw-r--r--Minecraft.World/Socket.cpp534
1 files changed, 534 insertions, 0 deletions
diff --git a/Minecraft.World/Socket.cpp b/Minecraft.World/Socket.cpp
new file mode 100644
index 00000000..4b703403
--- /dev/null
+++ b/Minecraft.World/Socket.cpp
@@ -0,0 +1,534 @@
+#include "stdafx.h"
+#include "InputOutputStream.h"
+#include "SocketAddress.h"
+#include "Socket.h"
+#include "ThreadName.h"
+#include "..\Minecraft.Client\ServerConnection.h"
+#include <algorithm>
+#include "..\Minecraft.Client\PS3\PS3Extras\ShutdownManager.h"
+
+// This current socket implementation is for the creation of a single local link. 2 sockets can be created, one for either end of this local
+// link, the end (0 or 1) is passed as a parameter to the ctor.
+
+CRITICAL_SECTION Socket::s_hostQueueLock[2];
+std::queue<byte> Socket::s_hostQueue[2];
+Socket::SocketOutputStreamLocal *Socket::s_hostOutStream[2];
+Socket::SocketInputStreamLocal *Socket::s_hostInStream[2];
+ServerConnection *Socket::s_serverConnection = NULL;
+
+void Socket::Initialise(ServerConnection *serverConnection)
+{
+ s_serverConnection = serverConnection;
+
+ // Only initialise everything else once - just setting up static data, one time xrnm things, thread for ticking sockets
+ static bool init = false;
+ if( init )
+ {
+ for( int i = 0; i < 2; i++ )
+ {
+ if(TryEnterCriticalSection(&s_hostQueueLock[i]))
+ {
+ // Clear the queue
+ std::queue<byte> empty;
+ std::swap( s_hostQueue[i], empty );
+ LeaveCriticalSection(&s_hostQueueLock[i]);
+ }
+ s_hostOutStream[i]->m_streamOpen = true;
+ s_hostInStream[i]->m_streamOpen = true;
+ }
+ return;
+ }
+ init = true;
+
+ for( int i = 0; i < 2; i++ )
+ {
+ InitializeCriticalSection(&Socket::s_hostQueueLock[i]);
+ s_hostOutStream[i] = new SocketOutputStreamLocal(i);
+ s_hostInStream[i] = new SocketInputStreamLocal(i);
+ }
+}
+
+Socket::Socket(bool response)
+{
+ m_hostServerConnection = true;
+ m_hostLocal = true;
+ if( response )
+ {
+ m_end = SOCKET_SERVER_END;
+ }
+ else
+ {
+ m_end = SOCKET_CLIENT_END;
+ Socket *socket = new Socket(1);
+ s_serverConnection->NewIncomingSocket(socket);
+ }
+
+ for( int i = 0; i < 2; i++ )
+ {
+ m_endClosed[i] = false;
+ }
+ m_socketClosedEvent = NULL;
+ createdOk = true;
+ networkPlayerSmallId = g_NetworkManager.GetHostPlayer()->GetSmallId();
+}
+
+Socket::Socket(INetworkPlayer *player, bool response /* = false*/, bool hostLocal /*= false*/)
+{
+ m_hostServerConnection = false;
+ m_hostLocal = hostLocal;
+
+ for( int i = 0; i < 2; i++ )
+ {
+ InitializeCriticalSection(&m_queueLockNetwork[i]);
+ m_inputStream[i] = NULL;
+ m_outputStream[i] = NULL;
+ m_endClosed[i] = false;
+ }
+
+ if(!response || hostLocal)
+ {
+ m_inputStream[0] = new SocketInputStreamNetwork(this,0);
+ m_outputStream[0] = new SocketOutputStreamNetwork(this,0);
+ m_end = SOCKET_CLIENT_END;
+ }
+ if(response || hostLocal)
+ {
+ m_inputStream[1] = new SocketInputStreamNetwork(this,1);
+ m_outputStream[1] = new SocketOutputStreamNetwork(this,1);
+ m_end = SOCKET_SERVER_END;
+ }
+ m_socketClosedEvent = new C4JThread::Event;
+ //printf("New socket made %s\n", player->GetGamertag() );
+ networkPlayerSmallId = player->GetSmallId();
+ createdOk = true;
+}
+
+SocketAddress *Socket::getRemoteSocketAddress()
+{
+ return NULL;
+}
+
+INetworkPlayer *Socket::getPlayer()
+{
+ return g_NetworkManager.GetPlayerBySmallId(networkPlayerSmallId);
+}
+
+void Socket::setPlayer(INetworkPlayer *player)
+{
+ if(player!=NULL)
+ {
+ networkPlayerSmallId = player->GetSmallId();
+ }
+ else
+ {
+ networkPlayerSmallId = 0;
+ }
+}
+
+void Socket::pushDataToQueue(const BYTE * pbData, DWORD dwDataSize, bool fromHost /*= true*/)
+{
+ int queueIdx = SOCKET_CLIENT_END;
+ if(!fromHost)
+ queueIdx = SOCKET_SERVER_END;
+
+ if( queueIdx != m_end && !m_hostLocal )
+ {
+ app.DebugPrintf("SOCKET: Error pushing data to queue. End is %d but queue idx id %d\n", m_end, queueIdx);
+ return;
+ }
+
+ EnterCriticalSection(&m_queueLockNetwork[queueIdx]);
+ for( unsigned int i = 0; i < dwDataSize; i++ )
+ {
+ m_queueNetwork[queueIdx].push(*pbData++);
+ }
+ LeaveCriticalSection(&m_queueLockNetwork[queueIdx]);
+}
+
+void Socket::addIncomingSocket(Socket *socket)
+{
+ if( s_serverConnection != NULL )
+ {
+ s_serverConnection->NewIncomingSocket(socket);
+ }
+}
+
+InputStream *Socket::getInputStream(bool isServerConnection)
+{
+ if( !m_hostServerConnection )
+ {
+ if( m_hostLocal )
+ {
+ if( isServerConnection )
+ {
+ return m_inputStream[SOCKET_SERVER_END];
+ }
+ else
+ {
+ return m_inputStream[SOCKET_CLIENT_END];
+ }
+ }
+ else
+ {
+ return m_inputStream[m_end];
+ }
+ }
+ else
+ {
+ return s_hostInStream[m_end];
+ }
+}
+
+void Socket::setSoTimeout(int a )
+{
+}
+
+void Socket::setTrafficClass( int a )
+{
+}
+
+Socket::SocketOutputStream *Socket::getOutputStream(bool isServerConnection)
+{
+ if( !m_hostServerConnection )
+ {
+ if( m_hostLocal )
+ {
+ if( isServerConnection )
+ {
+ return m_outputStream[SOCKET_SERVER_END];
+ }
+ else
+ {
+ return m_outputStream[SOCKET_CLIENT_END];
+ }
+ }
+ else
+ {
+ return m_outputStream[m_end];
+ }
+ }
+ else
+ {
+ return s_hostOutStream[ 1 - m_end ];
+ }
+}
+
+bool Socket::close(bool isServerConnection)
+{
+ bool allClosed = false;
+ if( m_hostLocal )
+ {
+ if( isServerConnection )
+ {
+ m_endClosed[SOCKET_SERVER_END] = true;
+ if(m_endClosed[SOCKET_CLIENT_END])
+ {
+ allClosed = true;
+ }
+ }
+ else
+ {
+ m_endClosed[SOCKET_CLIENT_END] = true;
+ if(m_endClosed[SOCKET_SERVER_END])
+ {
+ allClosed = true;
+ }
+ }
+ }
+ else
+ {
+ allClosed = true;
+ m_endClosed[m_end] = true;
+ }
+ if( allClosed && m_socketClosedEvent != NULL )
+ {
+ m_socketClosedEvent->Set();
+ }
+ if(allClosed) createdOk = false;
+ return allClosed;
+}
+
+/////////////////////////////////// Socket for input, on local connection ////////////////////
+
+Socket::SocketInputStreamLocal::SocketInputStreamLocal(int queueIdx)
+{
+ m_streamOpen = true;
+ m_queueIdx = queueIdx;
+}
+
+// Try and get an input byte, blocking until one is available
+int Socket::SocketInputStreamLocal::read()
+{
+ while(m_streamOpen && ShutdownManager::ShouldRun(ShutdownManager::eConnectionReadThreads))
+ {
+ if(TryEnterCriticalSection(&s_hostQueueLock[m_queueIdx]))
+ {
+ if( s_hostQueue[m_queueIdx].size() )
+ {
+ byte retval = s_hostQueue[m_queueIdx].front();
+ s_hostQueue[m_queueIdx].pop();
+ LeaveCriticalSection(&s_hostQueueLock[m_queueIdx]);
+ return retval;
+ }
+ LeaveCriticalSection(&s_hostQueueLock[m_queueIdx]);
+ }
+ Sleep(1);
+ }
+ return -1;
+}
+
+// Try and get an input array of bytes, blocking until enough bytes are available
+int Socket::SocketInputStreamLocal::read(byteArray b)
+{
+ return read(b, 0, b.length);
+}
+
+// Try and get an input range of bytes, blocking until enough bytes are available
+int Socket::SocketInputStreamLocal::read(byteArray b, unsigned int offset, unsigned int length)
+{
+ while(m_streamOpen)
+ {
+ if(TryEnterCriticalSection(&s_hostQueueLock[m_queueIdx]))
+ {
+ if( s_hostQueue[m_queueIdx].size() >= length )
+ {
+ for( unsigned int i = 0; i < length; i++ )
+ {
+ b[i+offset] = s_hostQueue[m_queueIdx].front();
+ s_hostQueue[m_queueIdx].pop();
+ }
+ LeaveCriticalSection(&s_hostQueueLock[m_queueIdx]);
+ return length;
+ }
+ LeaveCriticalSection(&s_hostQueueLock[m_queueIdx]);
+ }
+ Sleep(1);
+ }
+ return -1;
+}
+
+void Socket::SocketInputStreamLocal::close()
+{
+ m_streamOpen = false;
+ EnterCriticalSection(&s_hostQueueLock[m_queueIdx]);
+ s_hostQueue[m_queueIdx].empty();
+ LeaveCriticalSection(&s_hostQueueLock[m_queueIdx]);
+}
+
+/////////////////////////////////// Socket for output, on local connection ////////////////////
+
+Socket::SocketOutputStreamLocal::SocketOutputStreamLocal(int queueIdx)
+{
+ m_streamOpen = true;
+ m_queueIdx = queueIdx;
+}
+
+void Socket::SocketOutputStreamLocal::write(unsigned int b)
+{
+ if( m_streamOpen != true )
+ {
+ return;
+ }
+ EnterCriticalSection(&s_hostQueueLock[m_queueIdx]);
+ s_hostQueue[m_queueIdx].push((byte)b);
+ LeaveCriticalSection(&s_hostQueueLock[m_queueIdx]);
+}
+
+void Socket::SocketOutputStreamLocal::write(byteArray b)
+{
+ write(b, 0, b.length);
+}
+
+void Socket::SocketOutputStreamLocal::write(byteArray b, unsigned int offset, unsigned int length)
+{
+ if( m_streamOpen != true )
+ {
+ return;
+ }
+ MemSect(12);
+ EnterCriticalSection(&s_hostQueueLock[m_queueIdx]);
+ for( unsigned int i = 0; i < length; i++ )
+ {
+ s_hostQueue[m_queueIdx].push(b[offset+i]);
+ }
+ LeaveCriticalSection(&s_hostQueueLock[m_queueIdx]);
+ MemSect(0);
+}
+
+void Socket::SocketOutputStreamLocal::close()
+{
+ m_streamOpen = false;
+ EnterCriticalSection(&s_hostQueueLock[m_queueIdx]);
+ s_hostQueue[m_queueIdx].empty();
+ LeaveCriticalSection(&s_hostQueueLock[m_queueIdx]);
+}
+
+/////////////////////////////////// Socket for input, on network connection ////////////////////
+
+Socket::SocketInputStreamNetwork::SocketInputStreamNetwork(Socket *socket, int queueIdx)
+{
+ m_streamOpen = true;
+ m_queueIdx = queueIdx;
+ m_socket = socket;
+}
+
+// Try and get an input byte, blocking until one is available
+int Socket::SocketInputStreamNetwork::read()
+{
+ while(m_streamOpen && ShutdownManager::ShouldRun(ShutdownManager::eConnectionReadThreads))
+ {
+ if(TryEnterCriticalSection(&m_socket->m_queueLockNetwork[m_queueIdx]))
+ {
+ if( m_socket->m_queueNetwork[m_queueIdx].size() )
+ {
+ byte retval = m_socket->m_queueNetwork[m_queueIdx].front();
+ m_socket->m_queueNetwork[m_queueIdx].pop();
+ LeaveCriticalSection(&m_socket->m_queueLockNetwork[m_queueIdx]);
+ return retval;
+ }
+ LeaveCriticalSection(&m_socket->m_queueLockNetwork[m_queueIdx]);
+ }
+ Sleep(1);
+ }
+ return -1;
+}
+
+// Try and get an input array of bytes, blocking until enough bytes are available
+int Socket::SocketInputStreamNetwork::read(byteArray b)
+{
+ return read(b, 0, b.length);
+}
+
+// Try and get an input range of bytes, blocking until enough bytes are available
+int Socket::SocketInputStreamNetwork::read(byteArray b, unsigned int offset, unsigned int length)
+{
+ while(m_streamOpen)
+ {
+ if(TryEnterCriticalSection(&m_socket->m_queueLockNetwork[m_queueIdx]))
+ {
+ if( m_socket->m_queueNetwork[m_queueIdx].size() >= length )
+ {
+ for( unsigned int i = 0; i < length; i++ )
+ {
+ b[i+offset] = m_socket->m_queueNetwork[m_queueIdx].front();
+ m_socket->m_queueNetwork[m_queueIdx].pop();
+ }
+ LeaveCriticalSection(&m_socket->m_queueLockNetwork[m_queueIdx]);
+ return length;
+ }
+ LeaveCriticalSection(&m_socket->m_queueLockNetwork[m_queueIdx]);
+ }
+ Sleep(1);
+ }
+ return -1;
+}
+
+void Socket::SocketInputStreamNetwork::close()
+{
+ m_streamOpen = false;
+}
+
+/////////////////////////////////// Socket for output, on network connection ////////////////////
+
+Socket::SocketOutputStreamNetwork::SocketOutputStreamNetwork(Socket *socket, int queueIdx)
+{
+ m_queueIdx = queueIdx;
+ m_socket = socket;
+ m_streamOpen = true;
+}
+
+void Socket::SocketOutputStreamNetwork::write(unsigned int b)
+{
+ if( m_streamOpen != true ) return;
+ byteArray barray;
+ byte bb;
+ bb = (byte)b;
+ barray.data = &bb;
+ barray.length = 1;
+ write(barray, 0, 1);
+
+}
+
+void Socket::SocketOutputStreamNetwork::write(byteArray b)
+{
+ write(b, 0, b.length);
+}
+
+void Socket::SocketOutputStreamNetwork::write(byteArray b, unsigned int offset, unsigned int length)
+{
+ writeWithFlags(b, offset, length, 0);
+}
+
+void Socket::SocketOutputStreamNetwork::writeWithFlags(byteArray b, unsigned int offset, unsigned int length, int flags)
+{
+ if( m_streamOpen != true ) return;
+ if( length == 0 ) return;
+
+ // If this is a local connection, don't bother going through QNet as it just delivers it straight anyway
+ if( m_socket->m_hostLocal )
+ {
+ // We want to write to the queue for the other end of this socket stream
+ int queueIdx = m_queueIdx;
+ if(queueIdx == SOCKET_CLIENT_END)
+ queueIdx = SOCKET_SERVER_END;
+ else
+ queueIdx = SOCKET_CLIENT_END;
+
+ EnterCriticalSection(&m_socket->m_queueLockNetwork[queueIdx]);
+ for( unsigned int i = 0; i < length; i++ )
+ {
+ m_socket->m_queueNetwork[queueIdx].push(b[offset+i]);
+ }
+ LeaveCriticalSection(&m_socket->m_queueLockNetwork[queueIdx]);
+ }
+ else
+ {
+ XRNM_SEND_BUFFER buffer;
+ buffer.pbyData = &b[offset];
+ buffer.dwDataSize = length;
+
+ INetworkPlayer *hostPlayer = g_NetworkManager.GetHostPlayer();
+ if(hostPlayer == NULL)
+ {
+ app.DebugPrintf("Trying to write to network, but the hostPlayer is NULL\n");
+ return;
+ }
+ INetworkPlayer *socketPlayer = m_socket->getPlayer();
+ if(socketPlayer == NULL)
+ {
+ app.DebugPrintf("Trying to write to network, but the socketPlayer is NULL\n");
+ return;
+ }
+
+ if( m_queueIdx == SOCKET_SERVER_END )
+ {
+ //printf( "Sent %u bytes of data from \"%ls\" to \"%ls\"\n",
+ //buffer.dwDataSize,
+ //hostPlayer->GetGamertag(),
+ //m_socket->networkPlayer->GetGamertag());
+
+ hostPlayer->SendData(socketPlayer, buffer.pbyData, buffer.dwDataSize, QNET_SENDDATA_RELIABLE | QNET_SENDDATA_SEQUENTIAL | flags);
+
+ // DWORD queueSize = hostPlayer->GetSendQueueSize( NULL, QNET_GETSENDQUEUESIZE_BYTES );
+ // if( queueSize > 24000 )
+ // {
+ // //printf("Queue size is: %d, forcing doWork()\n",queueSize);
+ // g_NetworkManager.DoWork();
+ // }
+ }
+ else
+ {
+ //printf( "Sent %u bytes of data from \"%ls\" to \"%ls\"\n",
+ //buffer.dwDataSize,
+ //m_socket->networkPlayer->GetGamertag(),
+ //hostPlayer->GetGamertag());
+
+ socketPlayer->SendData(hostPlayer, buffer.pbyData, buffer.dwDataSize, QNET_SENDDATA_RELIABLE | QNET_SENDDATA_SEQUENTIAL | flags);
+ }
+ }
+}
+
+void Socket::SocketOutputStreamNetwork::close()
+{
+ m_streamOpen = false;
+} \ No newline at end of file