MythTV  master
bufferedsocketdevice.cpp
Go to the documentation of this file.
1 // Program Name: bufferedsocketdevice.cpp
3 // Created : Oct. 1, 2005
4 //
5 // Purpose :
6 //
7 // Copyright (c) 2005 David Blain <dblain@mythtv.org>
8 //
9 // Licensed under the GPL v2 or later, see COPYING for details
10 //
12 
13 #include <algorithm>
14 #include <chrono> // for milliseconds
15 #include <thread> // for sleep_for
16 
17 #include "mythtimer.h"
18 #include "bufferedsocketdevice.h"
19 #include "upnputil.h"
20 #include "mythlogging.h"
21 
23 //
25 
27 {
28  m_pSocket = new MSocketDevice();
29 
30  m_pSocket->setSocket ( nSocket, MSocketDevice::Stream );
31  m_pSocket->setBlocking ( false );
32  m_pSocket->setAddressReusable( true );
33 
34  struct linger ling = {1, 1};
35 
36  if ( setsockopt(socket(), SOL_SOCKET, SO_LINGER, (const char *)&ling,
37  sizeof(ling)) < 0)
38  LOG(VB_GENERAL, LOG_ERR,
39  "BufferedSocketDevice: setsockopt - SO_LINGER: " + ENO);
40 
41  m_nDestPort = 0;
42 
44  m_nWriteSize = 0;
45  m_nWriteIndex = 0;
47 }
48 
50 //
52 
53 BufferedSocketDevice::BufferedSocketDevice( MSocketDevice *pSocket /* = nullptr*/,
54  bool bTakeOwnership /* = false */ )
55 {
56  m_pSocket = pSocket;
57 
58  m_nDestPort = 0;
59 
61  m_nWriteSize = 0;
62  m_nWriteIndex = 0;
63  m_bHandleSocketDelete= bTakeOwnership;
64 
65 }
66 
68 //
70 
72 {
73  Close();
74 }
75 
77 //
79 
81 {
82  Flush();
83  ReadBytes();
84 
85  m_bufRead.clear();
87 
88  if (m_pSocket != nullptr)
89  {
90  if (m_pSocket->isValid())
91  m_pSocket->close();
92 
94  delete m_pSocket;
95 
96  m_pSocket = nullptr;
97  }
98 
99 }
100 
102 //
104 
105 bool BufferedSocketDevice::Connect( const QHostAddress &addr, quint16 port )
106 {
107  if (m_pSocket == nullptr)
108  return false;
109 
110  return m_pSocket->connect( addr, port );
111 }
112 
114 //
116 
118 {
119  return( m_pSocket );
120 }
121 
123 //
125 
126 void BufferedSocketDevice::SetSocketDevice( MSocketDevice *pSocket )
127 {
128  if ((m_bHandleSocketDelete) && (m_pSocket != nullptr))
129  delete m_pSocket;
130 
131  m_bHandleSocketDelete = false;
132 
133  m_pSocket = pSocket;
134 }
135 
137 //
139 
141  QHostAddress hostAddress, quint16 nPort)
142 {
143  m_DestHostAddress = hostAddress;
144  m_nDestPort = nPort;
145 }
146 
148 //
150 
151 void BufferedSocketDevice::SetReadBufferSize( qulonglong bufSize )
152 {
153  m_nMaxReadBufferSize = bufSize;
154 }
155 
157 //
159 
161 {
162  return m_nMaxReadBufferSize;
163 }
164 
166 //
168 
170 {
171  if (m_pSocket == nullptr)
172  return m_bufRead.size();
173 
174  qlonglong maxToRead = 0;
175 
176  if ( m_nMaxReadBufferSize > 0 )
177  {
178  maxToRead = m_nMaxReadBufferSize - m_bufRead.size();
179 
180  if ( maxToRead <= 0 )
181  return m_bufRead.size();
182  }
183 
184  qlonglong nbytes = m_pSocket->bytesAvailable();
185  qlonglong nread;
186 
187  QByteArray *a = nullptr;
188 
189  if ( nbytes > 0 )
190  {
191  a = new QByteArray();
192  a->resize(nbytes);
193 
194  nread = m_pSocket->readBlock(
195  a->data(), maxToRead ? std::min(nbytes, maxToRead) : nbytes);
196 
197  if (( nread > 0 ) && ( nread != a->size() ))
198  {
199  // unexpected
200  a->resize( nread );
201  }
202  }
203 
204  if (a)
205  {
206 #if 0
207  QString msg;
208  for( long n = 0; n < a->count(); n++ )
209  msg += QString("%1").arg(a->at(n));
210  LOG(VB_GENERAL, LOG_DEBUG, msg);
211 #endif
212 
213  m_bufRead.append( a );
214  }
215 
216  return m_bufRead.size();
217 }
218 
220 //
222 
223 bool BufferedSocketDevice::ConsumeWriteBuf( qulonglong nbytes )
224 {
225  if ( !nbytes || ((qlonglong)nbytes > m_nWriteSize) )
226  return false;
227 
228  m_nWriteSize -= nbytes;
229 
230  for ( ;; )
231  {
232  QByteArray *a = m_bufWrite.front();
233 
234  if ( m_nWriteIndex + nbytes >= (qulonglong)a->size() )
235  {
236  nbytes -= a->size() - m_nWriteIndex;
237  m_bufWrite.pop_front();
238  delete a;
239 
240  m_nWriteIndex = 0;
241 
242  if ( nbytes == 0 )
243  break;
244  }
245  else
246  {
247  m_nWriteIndex += nbytes;
248  break;
249  }
250  }
251 
252  return true;
253 }
254 
256 //
258 
260 {
261 
262  if ((m_pSocket == nullptr) || !m_pSocket->isValid())
263  return;
264 
265  bool osBufferFull = false;
266  //int consumed = 0;
267 
268  while ( !osBufferFull && ( m_nWriteSize > 0 ) && m_pSocket->isValid())
269  {
270  deque<QByteArray*>::iterator it = m_bufWrite.begin();
271  QByteArray *a = *it;
272 
273  int nwritten = 0;
274  int i = 0;
275 
276  if ( a->size() - m_nWriteIndex < 1460 )
277  {
278  QByteArray out;
279  out.resize(65536);
280 
281  int j = m_nWriteIndex;
282  int s = a->size() - j;
283 
284  while ( a && i+s < out.size() )
285  {
286  memcpy( out.data()+i, a->data()+j, s );
287  j = 0;
288  i += s;
289  ++it;
290  a = *it;
291  s = a ? a->size() : 0;
292  }
293 
294  if (m_nDestPort != 0)
295  nwritten = m_pSocket->writeBlock( out.data(), i, m_DestHostAddress, m_nDestPort );
296  else
297  nwritten = m_pSocket->writeBlock( out.data(), i );
298  }
299  else
300  {
301  // Big block, write it immediately
302  i = a->size() - m_nWriteIndex;
303 
304  if (m_nDestPort != 0)
305  nwritten = m_pSocket->writeBlock( a->data() + m_nWriteIndex, i, m_DestHostAddress, m_nDestPort );
306  else
307  nwritten = m_pSocket->writeBlock( a->data() + m_nWriteIndex, i );
308  }
309 
310  if ( nwritten > 0 )
311  {
312  if ( ConsumeWriteBuf( nwritten ) )
313  {
314  //consumed += nwritten;
315  }
316  }
317 
318  if ( nwritten < i )
319  osBufferFull = true;
320  }
321 }
322 
323 
325 //
327 
329 {
330  return (qlonglong)BytesAvailable();
331 }
332 
334 //
336 
337 qlonglong BufferedSocketDevice::At() const
338 {
339  return( 0 );
340 }
341 
343 //
345 
346 bool BufferedSocketDevice::At( qlonglong index )
347 {
348  ReadBytes();
349 
350  if ( index > m_bufRead.size() )
351  return false;
352 
353  // throw away data 0..index-1
354  m_bufRead.consumeBytes( (qulonglong)index, nullptr );
355 
356  return true;
357 }
358 
360 //
362 
364 {
365  if ( !m_pSocket->isValid() )
366  return true;
367 
368  ReadBytes();
369 
370  return m_bufRead.size() == 0;
371 
372 }
373 
375 //
377 
379 {
380  if ( !m_pSocket->isValid() )
381  return 0;
382 
383  return ReadBytes();
384 }
385 
387 //
389 
391  int msecs, bool *pTimeout /* = nullptr*/ )
392 {
393  bool bTimeout = false;
394 
395  if ( !m_pSocket->isValid() )
396  return 0;
397 
398  qlonglong nBytes = BytesAvailable();
399 
400  if (nBytes == 0)
401  {
402 /*
403  The following code is a possible workaround to the lost request problem
404  I just hate looping too much to put it in. I believe there is something
405  I'm missing that is causing the lost packets... Just need to find it.
406 
407  bTimeout = true;
408  int nCount = 0;
409  int msWait = msecs / 100;
410 
411  while (((nBytes = ReadBytes()) == 0 ) &&
412  (nCount++ < 100 ) &&
413  bTimeout &&
414  m_pSocket->isValid() )
415  {
416  // give up control
417 
418  // should be some multiple of msWait.
419  std::this_thread::sleep_for(std::chrono::milliseconds(1));
420 
421  }
422  }
423 */
424  // -=>TODO: Override the timeout to 1 second... Closes connection sooner
425  // to help recover from lost requests. (hack until better fix found)
426 
427  msecs = 1000;
428 
429  nBytes = m_pSocket->waitForMore( msecs, &bTimeout );
430 
431  if (pTimeout != nullptr)
432  *pTimeout = bTimeout;
433  }
434 
435  return nBytes; // nBytes //m_bufRead.size();
436 }
437 
439 //
441 
443 {
444  return m_nWriteSize;
445 
446 }
447 
449 //
451 
453 {
454  while (!m_bufWrite.empty())
455  {
456  delete m_bufWrite.back();
457  m_bufWrite.pop_back();
458  }
460 }
461 
463 //
465 
467 {
468  m_bufRead.clear();
469 }
470 
472 //
474 
475 qlonglong BufferedSocketDevice::ReadBlock( char *data, qulonglong maxlen )
476 {
477  if ( data == nullptr && maxlen != 0 )
478  return -1;
479 
480  if ( !m_pSocket->isOpen() )
481  return -1;
482 
483  ReadBytes();
484 
485  if ( maxlen >= (qulonglong)m_bufRead.size() )
486  maxlen = m_bufRead.size();
487 
488  m_bufRead.consumeBytes( maxlen, data );
489 
490  return maxlen;
491 
492 }
493 
495 //
497 
498 qlonglong BufferedSocketDevice::WriteBlock( const char *data, qulonglong len )
499 {
500  if ( len == 0 )
501  return 0;
502 
503  QByteArray *a = m_bufWrite.back();
504 
505  bool writeNow = ( (m_nWriteSize + len >= 1400) || (len > 512) );
506 
507  if ( a && (a->size() + len < 128) )
508  {
509  // small buffer, resize
510  int i = a->size();
511 
512  a->resize( i+len );
513  memcpy( a->data()+i, data, len );
514  }
515  else
516  {
517  // append new buffer
518  m_bufWrite.push_back(new QByteArray(data, len));
519  }
520 
521  m_nWriteSize += len;
522 
523  if ( writeNow )
524  Flush();
525 
526  return len;
527 }
528 
530 //
532 
534  const char *data, qulonglong len)
535 {
536  qlonglong nWritten = 0;
537 
538  // must Flush data just in case caller is mixing buffered & un-buffered calls
539 
540  Flush();
541 
542  if (m_nDestPort != 0)
543  nWritten = m_pSocket->writeBlock( data, len, m_DestHostAddress, m_nDestPort );
544  else
545  nWritten = m_pSocket->writeBlock( data, len );
546 
547  return nWritten;
548 }
549 
551 //
553 
555 {
556  if ( m_pSocket->isOpen() )
557  {
558  ReadBytes();
559 
560  if (m_bufRead.size() > 0 )
561  {
562  uchar c;
563 
564  m_bufRead.consumeBytes( 1, (char*)&c );
565 
566  return c;
567  }
568  }
569 
570  return -1;
571 }
572 
574 //
576 
578 {
579  char buf[2];
580 
581  buf[0] = ch;
582 
583  return WriteBlock(buf, 1) == 1 ? ch : -1;
584 }
585 
587 //
589 
591 {
592  return m_bufRead.ungetch( ch );
593 }
594 
596 //
598 
600 {
601  ReadBytes();
602 
603  if (( BytesAvailable() > 0 ) && m_bufRead.scanNewline( nullptr ) )
604  return true;
605 
606  return false;
607 }
608 
610 //
612 
614 {
615  QByteArray a;
616  a.resize(256);
617 
618  ReadBytes();
619 
620  bool nl = m_bufRead.scanNewline( &a );
621 
622  QString s;
623 
624  if ( nl )
625  {
626  At( a.size() ); // skips the data read
627 
628  s = QString( a );
629  }
630 
631  return s;
632 }
633 
635 //
637 
638 QString BufferedSocketDevice::ReadLine( int msecs )
639 {
640  MythTimer timer;
641  QString sLine;
642 
643  if ( CanReadLine() )
644  return( ReadLine() );
645 
646  // ----------------------------------------------------------------------
647  // If the user supplied a timeout, lets loop until we can read a line
648  // or timeout.
649  // ----------------------------------------------------------------------
650 
651  if ( msecs > 0)
652  {
653  bool bTimeout = false;
654 
655  timer.start();
656 
657  while ( !CanReadLine() && !bTimeout )
658  {
659 #if 0
660  LOG(VB_HTTP, LOG_DEBUG, "Can't Read Line... Waiting for more." );
661 #endif
662 
663  WaitForMore( msecs, &bTimeout );
664 
665  if ( timer.elapsed() >= msecs )
666  {
667  bTimeout = true;
668  LOG(VB_HTTP, LOG_INFO, "Exceeded Total Elapsed Wait Time." );
669  }
670  }
671 
672  if (CanReadLine())
673  sLine = ReadLine();
674  }
675 
676  return( sLine );
677 }
678 
680 //
682 
683 quint16 BufferedSocketDevice::Port(void) const
684 {
685  if (m_pSocket)
686  return( m_pSocket->port() );
687 
688  return 0;
689 }
690 
692 //
694 
696 {
697  if (m_pSocket)
698  return( m_pSocket->peerPort() );
699 
700  return 0;
701 }
702 
704 //
706 
707 QHostAddress BufferedSocketDevice::Address() const
708 {
709  if (m_pSocket)
710  return( m_pSocket->address() );
711 
712  QHostAddress tmp;
713 
714  return tmp;
715 }
716 
718 //
720 
722 {
723  if (m_pSocket)
724  return( m_pSocket->peerAddress() );
725 
726  QHostAddress tmp;
727 
728  return tmp;
729 }
730 
qulonglong BytesToWrite() const
A QElapsedTimer based timer to replace use of QTime as a timer.
Definition: mythtimer.h:13
qlonglong WriteBlock(const char *data, qulonglong len)
void SetReadBufferSize(qulonglong)
QHostAddress Address() const
bool Connect(const QHostAddress &addr, quint16 port)
static guint32 * tmp
Definition: goom_core.c:35
deque< QByteArray * > m_bufWrite
bool ConsumeWriteBuf(qulonglong nbytes)
qlonglong WriteBlockDirect(const char *data, qulonglong len)
qulonglong WaitForMore(int msecs, bool *timeout=nullptr)
QHostAddress PeerAddress() const
qlonglong ReadBlock(char *data, qulonglong maxlen)
qint64 m_nWriteIndex
write index
#define ENO
This can be appended to the LOG args with "+".
Definition: mythlogging.h:99
MSocketDevice * SocketDevice()
void SetSocketDevice(MSocketDevice *pSocket)
int elapsed(void) const
Returns milliseconds elapsed since last start() or restart()
Definition: mythtimer.cpp:90
#define LOG(_MASK_, _LEVEL_, _STRING_)
Definition: mythlogging.h:41
qint64 m_nWriteSize
write total buf size
void SetDestAddress(QHostAddress hostAddress, quint16 nPort)
qulonglong ReadBufferSize() const
void start(void)
starts measuring elapsed time.
Definition: mythtimer.cpp:47