diff options
Diffstat (limited to 'Minecraft.Client/Common/Network/Sony/SQRNetworkPlayer.cpp')
| -rw-r--r-- | Minecraft.Client/Common/Network/Sony/SQRNetworkPlayer.cpp | 258 |
1 files changed, 235 insertions, 23 deletions
diff --git a/Minecraft.Client/Common/Network/Sony/SQRNetworkPlayer.cpp b/Minecraft.Client/Common/Network/Sony/SQRNetworkPlayer.cpp index e244b34c..a040b28b 100644 --- a/Minecraft.Client/Common/Network/Sony/SQRNetworkPlayer.cpp +++ b/Minecraft.Client/Common/Network/Sony/SQRNetworkPlayer.cpp @@ -16,6 +16,15 @@ #endif +//#define PRINT_ACK_STATS + +#ifdef __PS3__ +static const int sc_wouldBlockFlag = CELL_RUDP_ERROR_WOULDBLOCK; +#else // __ORBIS__ +static const int sc_wouldBlockFlag = SCE_RUDP_ERROR_WOULDBLOCK; +#endif + + static const bool sc_verbose = false; @@ -84,6 +93,8 @@ SQRNetworkPlayer::SQRNetworkPlayer(SQRNetworkManager *manager, eSQRNetworkPlayer m_host = onHost; m_manager = manager; m_customData = 0; + m_acksOutstanding = 0; + m_totalBytesInSendQueue = 0; if( pUID ) { memcpy(&m_ISD.m_UID,pUID,sizeof(PlayerUID)); @@ -102,6 +113,7 @@ SQRNetworkPlayer::SQRNetworkPlayer(SQRNetworkManager *manager, eSQRNetworkPlayer } SetNameFromUID(); InitializeCriticalSection(&m_csQueue); + InitializeCriticalSection(&m_csAcks); #ifdef __ORBIS__ if(IsLocal()) { @@ -109,6 +121,14 @@ SQRNetworkPlayer::SQRNetworkPlayer(SQRNetworkManager *manager, eSQRNetworkPlayer } #endif +#ifndef _CONTENT_PACKAGE + m_minAckTime = INT_MAX; + m_maxAckTime = 0; + m_totalAcks = 0; + m_totalAckTime = 0; + m_averageAckTime = 0; +#endif + } SQRNetworkPlayer::~SQRNetworkPlayer() @@ -190,13 +210,8 @@ bool SQRNetworkPlayer::HasSmallIdConfirmed() // To confirm to the host that we are ready, send a single byte with our small id. void SQRNetworkPlayer::ConfirmReady() { -#ifdef __PS3__ - int ret = cellRudpWrite( m_rudpCtx, &m_ISD, sizeof(InitSendData), CELL_RUDP_MSG_LATENCY_CRITICAL ); -#else //__ORBIS__ - int ret = sceRudpWrite( m_rudpCtx, &m_ISD, sizeof(InitSendData), SCE_RUDP_MSG_LATENCY_CRITICAL ); -#endif - // TODO - error handling here? - assert ( ret == sizeof(InitSendData) ); + SendInternal(&m_ISD, sizeof(InitSendData), e_flag_AckNotRequested); + // Final flag for a local player on the client, as we are now safe to send data on to the host m_host ? app.DebugPrintf(sc_verbose, "host : ") : app.DebugPrintf(sc_verbose, "client:"); app.DebugPrintf(sc_verbose, ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Small ID confirmed\n"); @@ -205,8 +220,9 @@ void SQRNetworkPlayer::ConfirmReady() // Attempt to send data, of any size, from this player to that specified by pPlayerTarget. This may not be possible depending on the two players, due to // our star shaped network connectivity. Data may be any size, and is copied so on returning from this method it does not need to be preserved. -void SQRNetworkPlayer::SendData( SQRNetworkPlayer *pPlayerTarget, const void *data, unsigned int dataSize ) +void SQRNetworkPlayer::SendData( SQRNetworkPlayer *pPlayerTarget, const void *data, unsigned int dataSize, bool ack ) { + AckFlags ackFlags = ack ? e_flag_AckRequested : e_flag_AckNotRequested; // Our network is connected as a star. If we are the host, then we can send to any remote player. If we're a client, we can send only to the host. // The host can also send to other local players, but this doesn't need to go through Rudp. if( m_host ) @@ -224,7 +240,7 @@ void SQRNetworkPlayer::SendData( SQRNetworkPlayer *pPlayerTarget, const void *da else if( ( m_type == SNP_TYPE_HOST ) && ( pPlayerTarget->m_type == SNP_TYPE_REMOTE ) ) { // Rudp communication from host to remote player - handled by remote player instance - pPlayerTarget->SendInternal(data,dataSize); + pPlayerTarget->SendInternal(data,dataSize, ackFlags); } else { @@ -237,7 +253,7 @@ void SQRNetworkPlayer::SendData( SQRNetworkPlayer *pPlayerTarget, const void *da if( ( m_type == SNP_TYPE_LOCAL ) && ( pPlayerTarget->m_type == SNP_TYPE_HOST ) ) { // Rudp communication from client to host - handled by this player instace - SendInternal(data, dataSize); + SendInternal(data, dataSize, ackFlags); } else { @@ -250,15 +266,30 @@ void SQRNetworkPlayer::SendData( SQRNetworkPlayer *pPlayerTarget, const void *da // Internal send function - to simplify the number of mechanisms we have for sending data, this method just adds the data to be send to the player's internal queue, // and then calls SendMoreInternal. This method can take any size of data, which it will split up into payload size chunks before sending. All input data is copied // into internal buffers. -void SQRNetworkPlayer::SendInternal(const void *data, unsigned int dataSize) +void SQRNetworkPlayer::SendInternal(const void *data, unsigned int dataSize, AckFlags ackFlags) { EnterCriticalSection(&m_csQueue); - + bool bOutstandingPackets = (m_sendQueue.size() > 0); // check if there are still packets in the queue, we won't be calling SendMoreInternal here if there are QueuedSendBlock sendBlock; unsigned char *dataCurrent = (unsigned char *)data; unsigned int dataRemaining = dataSize; + if(ackFlags == e_flag_AckReturning) + { + // no data, just the flag + assert(dataSize == 0); + assert(data == NULL); + int dataSize = dataRemaining; + if( dataSize > SNP_MAX_PAYLOAD ) dataSize = SNP_MAX_PAYLOAD; + sendBlock.start = NULL; + sendBlock.end = NULL; + sendBlock.current = NULL; + sendBlock.ack = ackFlags; + m_sendQueue.push(sendBlock); + } + else + { while( dataRemaining ) { int dataSize = dataRemaining; @@ -266,19 +297,203 @@ void SQRNetworkPlayer::SendInternal(const void *data, unsigned int dataSize) sendBlock.start = new unsigned char [dataSize]; sendBlock.end = sendBlock.start + dataSize; sendBlock.current = sendBlock.start; + sendBlock.ack = ackFlags; memcpy( sendBlock.start, dataCurrent, dataSize); m_sendQueue.push(sendBlock); dataRemaining -= dataSize; dataCurrent += dataSize; } - // Now try and send as much as we can - SendMoreInternal(); + } + m_totalBytesInSendQueue += dataSize; + + // if the queue had something in it already, then the UDP callback will fire and call SendMoreInternal + // so we don't call it here, to avoid a deadlock + if(!bOutstandingPackets) + { + // Now try and send as much as we can + SendMoreInternal(); + } LeaveCriticalSection(&m_csQueue); } +int SQRNetworkPlayer::WriteDataPacket(const void* data, int dataSize, AckFlags ackFlags) + { + DataPacketHeader header(dataSize, ackFlags); + int headerSize = sizeof(header); + int packetSize = dataSize+headerSize; + unsigned char* packetData = new unsigned char[packetSize]; + *((DataPacketHeader*)packetData) = header; + memcpy(&packetData[headerSize], data, dataSize); + +#ifndef _CONTENT_PACKAGE + if(ackFlags == e_flag_AckRequested) + m_ackStats.push_back(System::currentTimeMillis()); +#endif + +#ifdef __PS3__ + int ret = cellRudpWrite( m_rudpCtx, packetData, packetSize, 0);//CELL_RUDP_MSG_LATENCY_CRITICAL ); +#else // __ORBIS__ && __PSVITA__ + int ret = sceRudpWrite( m_rudpCtx, packetData, packetSize, 0);//SCE_RUDP_MSG_LATENCY_CRITICAL ); +#endif + if(ret == sc_wouldBlockFlag) + { + // nothing was sent! + } + else + { + assert(ret==packetSize || ret > headerSize); // we must make sure we've sent the entire packet or the header and some data at least + ret -= headerSize; + if(ackFlags == e_flag_AckRequested) + { + EnterCriticalSection(&m_csAcks); + m_acksOutstanding++; + LeaveCriticalSection(&m_csAcks); + } + } + delete packetData; + + return ret; +} + +int SQRNetworkPlayer::GetPacketDataSize() +{ + unsigned int ackFlag; + int headerSize = sizeof(ackFlag); +#ifdef __PS3__ + unsigned int packetSize = cellRudpGetSizeReadable(m_rudpCtx); +#else + unsigned int packetSize = sceRudpGetSizeReadable(m_rudpCtx); +#endif + if(packetSize == 0) + return 0; + + unsigned int dataSize = packetSize - headerSize; + assert(dataSize >= 0); + if(dataSize == 0) + { + // header only, must just be an ack returning + ReadAck(); + } + return dataSize; +} + +int SQRNetworkPlayer::ReadDataPacket(void* data, int dataSize) +{ + int headerSize = sizeof(DataPacketHeader); + int packetSize = dataSize+headerSize; + + unsigned char* packetData = new unsigned char[packetSize]; +#ifdef __PS3__ + int bytesRead = cellRudpRead( m_rudpCtx, packetData, packetSize, 0, NULL ); +#else // __ORBIS__ && __PSVITA__ + int bytesRead = sceRudpRead( m_rudpCtx, packetData, packetSize, 0, NULL ); +#endif + if(bytesRead == sc_wouldBlockFlag) + { + delete packetData; + return 0; + } + // check the header, and see if we need to send back an ack + DataPacketHeader header = *((DataPacketHeader*)packetData); + if(header.GetAckFlags() == e_flag_AckRequested) + { + // Don't send the ack back directly from here, as this is called from a rudp event callback, and we end up in a thread lock situation between the lock librudp uses + // internally (which is locked already here since we are being called in the event handler), and our own lock that we do for processing our write queue + m_manager->RequestWriteAck(GetSmallId()); + } + else + { + assert(header.GetAckFlags() == e_flag_AckNotRequested); + } + if(bytesRead > 0) + { + bytesRead -= headerSize; + memcpy(data, &packetData[headerSize], bytesRead); + } + assert(header.GetDataSize() == bytesRead); + + delete packetData; + + return bytesRead; +} + + + +void SQRNetworkPlayer::ReadAck() +{ + DataPacketHeader header; +#ifdef __PS3__ + int bytesRead = cellRudpRead( m_rudpCtx, &header, sizeof(header), 0, NULL ); +#else // __ORBIS__ && __PSVITA__ + int bytesRead = sceRudpRead( m_rudpCtx, &header, sizeof(header), 0, NULL ); +#endif + if(bytesRead == sc_wouldBlockFlag) + { + return; + } + + assert(header.GetAckFlags() == e_flag_AckReturning); + EnterCriticalSection(&m_csAcks); + m_acksOutstanding--; + assert(m_acksOutstanding >=0); + LeaveCriticalSection(&m_csAcks); + +#ifndef _CONTENT_PACKAGE +#ifdef PRINT_ACK_STATS + __int64 timeTaken = System::currentTimeMillis() - m_ackStats[0]; + if(timeTaken < m_minAckTime) + m_minAckTime = timeTaken; + if(timeTaken > m_maxAckTime) + m_maxAckTime = timeTaken; + m_totalAcks++; + m_totalAckTime += timeTaken; + m_averageAckTime = m_totalAckTime / m_totalAcks; + app.DebugPrintf("RUDP ctx : %d : Time taken for ack - %4d ms : min - %4d : max %4d : avg %4d\n", m_rudpCtx, timeTaken, m_minAckTime, m_maxAckTime, m_averageAckTime); + m_ackStats.erase(m_ackStats.begin()); +#endif +#endif +} + +void SQRNetworkPlayer::WriteAck() +{ + SendInternal(NULL, 0, e_flag_AckReturning); +} + +int SQRNetworkPlayer::GetOutstandingAckCount() +{ + return m_manager->GetOutstandingAckCount(this); +} + +int SQRNetworkPlayer::GetTotalOutstandingAckCount() +{ + return m_acksOutstanding; +} + +int SQRNetworkPlayer::GetTotalSendQueueBytes() +{ + return m_totalBytesInSendQueue; +} + +int SQRNetworkPlayer::GetTotalSendQueueMessages() +{ + CriticalSectionScopeLock lock(&m_csQueue); + return m_sendQueue.size(); + +} + +int SQRNetworkPlayer::GetSendQueueSizeBytes() +{ + return m_manager->GetSendQueueSizeBytes(); +} + +int SQRNetworkPlayer::GetSendQueueSizeMessages() +{ + return m_manager->GetSendQueueSizeMessages(); +} + // Internal send function. This attempts to send as many elements in the queue as possible until the write function tells us that we can't send any more. This way, @@ -287,6 +502,8 @@ void SQRNetworkPlayer::SendInternal(const void *data, unsigned int dataSize) void SQRNetworkPlayer::SendMoreInternal() { EnterCriticalSection(&m_csQueue); + assert(m_sendQueue.size() > 0); // this should never be called with an empty queue. + bool keepSending; do { @@ -296,17 +513,12 @@ void SQRNetworkPlayer::SendMoreInternal() // Attempt to send the full data in the first element in our queue unsigned char *data= m_sendQueue.front().current; int dataSize = m_sendQueue.front().end - m_sendQueue.front().current; -#ifdef __PS3__ - int ret = cellRudpWrite( m_rudpCtx, data, dataSize, 0);//CELL_RUDP_MSG_LATENCY_CRITICAL ); - int wouldBlockFlag = CELL_RUDP_ERROR_WOULDBLOCK; + int ret = WriteDataPacket(data, dataSize, m_sendQueue.front().ack); -#else // __ORBIS__ - int ret = sceRudpWrite( m_rudpCtx, data, dataSize, 0);//CELL_RUDP_MSG_LATENCY_CRITICAL ); - int wouldBlockFlag = SCE_RUDP_ERROR_WOULDBLOCK; -#endif if( ret == dataSize ) { // Fully sent, remove from queue - will loop in the while loop to see if there's anything else in the queue we could send + m_totalBytesInSendQueue -= ret; delete [] m_sendQueue.front().start; m_sendQueue.pop(); if( m_sendQueue.size() ) @@ -314,15 +526,15 @@ void SQRNetworkPlayer::SendMoreInternal() keepSending = true; } } - else if( ( ret >= 0 ) || ( ret == wouldBlockFlag ) ) + else if( ( ret >= 0 ) || ( ret == sc_wouldBlockFlag ) ) { - // Things left to send - adjust this element in the queue int remainingBytes; if( ret >= 0 ) { // Only ret bytes sent so far + m_totalBytesInSendQueue -= ret; remainingBytes = dataSize - ret; assert(remainingBytes > 0 ); } |
