Não pode escolher mais do que 25 tópicos Os tópicos devem começar com uma letra ou um número, podem incluir traços ('-') e podem ter até 35 caracteres.

1182 linhas
26 KiB

  1. package client
  2. import (
  3. "errors"
  4. "io"
  5. "strings"
  6. "sync"
  7. "time"
  8. "github.com/yosssi/gmq/mqtt"
  9. "github.com/yosssi/gmq/mqtt/packet"
  10. )
  11. // Minimum and maximum Packet Identifiers
  12. const (
  13. minPacketID uint16 = 1
  14. maxPacketID uint16 = 65535
  15. )
  16. // Error values
  17. var (
  18. ErrAlreadyConnected = errors.New("the Client has already connected to the Server")
  19. ErrNotYetConnected = errors.New("the Client has not yet connected to the Server")
  20. ErrCONNACKTimeout = errors.New("the CONNACK Packet was not received within a reasonalbe amount of time")
  21. ErrPINGRESPTimeout = errors.New("the PINGRESP Packet was not received within a reasonalbe amount of time")
  22. ErrPacketIDExhaused = errors.New("Packet Identifiers are exhausted")
  23. ErrInvalidPINGRESP = errors.New("invalid PINGRESP Packet")
  24. ErrInvalidSUBACK = errors.New("invalid SUBACK Packet")
  25. )
  26. // Client represents a Client.
  27. type Client struct {
  28. // muConn is the Mutex for the Network Connection.
  29. muConn sync.RWMutex
  30. // conn is the Network Connection.
  31. conn *connection
  32. // muSess is the Mutex for the Session.
  33. muSess sync.RWMutex
  34. // sess is the Session.
  35. sess *session
  36. // wg is the Wait Group for the goroutines
  37. // which are launched by the New method.
  38. wg sync.WaitGroup
  39. // disconnc is the channel which handles the signal
  40. // to disconnect the Network Connection.
  41. disconnc chan struct{}
  42. // disconnEndc is the channel which ends the goroutine
  43. // which disconnects the Network Connection.
  44. disconnEndc chan struct{}
  45. // errorHandler is the error handler.
  46. errorHandler ErrorHandler
  47. }
  48. // Connect establishes a Network Connection to the Server and
  49. // sends a CONNECT Packet to the Server.
  50. func (cli *Client) Connect(opts *ConnectOptions) error {
  51. // Lock for the connection.
  52. cli.muConn.Lock()
  53. // Unlock.
  54. defer cli.muConn.Unlock()
  55. // Return an error if the Client has already connected to the Server.
  56. if cli.conn != nil {
  57. return ErrAlreadyConnected
  58. }
  59. // Initialize the options.
  60. if opts == nil {
  61. opts = &ConnectOptions{}
  62. }
  63. // Establish a Network Connection.
  64. conn, err := newConnection(opts.Network, opts.Address, opts.TLSConfig)
  65. if err != nil {
  66. return err
  67. }
  68. // Set the Network Connection to the Client.
  69. cli.conn = conn
  70. // Lock for reading and updating the Session.
  71. cli.muSess.Lock()
  72. // Create a Session or reuse the current Session.
  73. if opts.CleanSession || cli.sess == nil {
  74. // Create a Session and set it to the Client.
  75. cli.sess = newSession(opts.CleanSession, opts.ClientID)
  76. } else {
  77. // Reuse the Session and set its Client Identifier to the options.
  78. opts.ClientID = cli.sess.clientID
  79. }
  80. // Unlock.
  81. cli.muSess.Unlock()
  82. // Send a CONNECT Packet to the Server.
  83. err = cli.sendCONNECT(&packet.CONNECTOptions{
  84. ClientID: opts.ClientID,
  85. UserName: opts.UserName,
  86. Password: opts.Password,
  87. CleanSession: opts.CleanSession,
  88. KeepAlive: opts.KeepAlive,
  89. WillTopic: opts.WillTopic,
  90. WillMessage: opts.WillMessage,
  91. WillQoS: opts.WillQoS,
  92. WillRetain: opts.WillRetain,
  93. })
  94. if err != nil {
  95. // Close the Network Connection.
  96. cli.conn.Close()
  97. // Clean the Network Connection and the Session if necessary.
  98. cli.clean()
  99. return err
  100. }
  101. // Launch a goroutine which waits for receiving the CONNACK Packet.
  102. cli.conn.wg.Add(1)
  103. go cli.waitPacket(cli.conn.connack, opts.CONNACKTimeout, ErrCONNACKTimeout)
  104. // Launch a goroutine which receives a Packet from the Server.
  105. cli.conn.wg.Add(1)
  106. go cli.receivePackets()
  107. // Launch a goroutine which sends a Packet to the Server.
  108. cli.conn.wg.Add(1)
  109. go cli.sendPackets(time.Duration(opts.KeepAlive), opts.PINGRESPTimeout)
  110. // Resend the unacknowledged PUBLISH and PUBREL Packets to the Server
  111. // if the Clean Session is false.
  112. if !opts.CleanSession {
  113. // Lock for reading and updating the Session.
  114. cli.muSess.Lock()
  115. // Unlock.
  116. defer cli.muSess.Unlock()
  117. for id, p := range cli.sess.sendingPackets {
  118. // Extract the MQTT Control MQTT Control Packet type.
  119. ptype, err := p.Type()
  120. if err != nil {
  121. return err
  122. }
  123. switch ptype {
  124. case packet.TypePUBLISH:
  125. // Set the DUP flag of the PUBLISH Packet to true.
  126. p.(*packet.PUBLISH).DUP = true
  127. // Resend the PUBLISH Packet to the Server.
  128. cli.conn.send <- p
  129. case packet.TypePUBREL:
  130. // Resend the PUBREL Packet to the Server.
  131. cli.conn.send <- p
  132. default:
  133. // Delete the Packet from the Session.
  134. delete(cli.sess.sendingPackets, id)
  135. }
  136. }
  137. }
  138. return nil
  139. }
  140. // Disconnect sends a DISCONNECT Packet to the Server and
  141. // closes the Network Connection.
  142. func (cli *Client) Disconnect() error {
  143. // Lock for the disconnection.
  144. cli.muConn.Lock()
  145. // Return an error if the Client has not yet connected to the Server.
  146. if cli.conn == nil {
  147. // Unlock.
  148. cli.muConn.Unlock()
  149. return ErrNotYetConnected
  150. }
  151. // Send a DISCONNECT Packet to the Server.
  152. // Ignore the error returned by the send method because
  153. // we proceed to the subsequent disconnecting processing
  154. // even if the send method returns the error.
  155. cli.send(packet.NewDISCONNECT())
  156. // Close the Network Connection.
  157. if err := cli.conn.Close(); err != nil {
  158. // Unlock.
  159. cli.muConn.Unlock()
  160. return err
  161. }
  162. // Change the state of the Network Connection to disconnected.
  163. cli.conn.disconnected = true
  164. // Send the end signal to the goroutine via the channels.
  165. select {
  166. case cli.conn.sendEnd <- struct{}{}:
  167. default:
  168. }
  169. // Unlock.
  170. cli.muConn.Unlock()
  171. // Wait until all goroutines end.
  172. cli.conn.wg.Wait()
  173. // Lock for cleaning the Network Connection.
  174. cli.muConn.Lock()
  175. // Lock for cleaning the Session.
  176. cli.muSess.Lock()
  177. // Clean the Network Connection and the Session.
  178. cli.clean()
  179. // Unlock.
  180. cli.muSess.Unlock()
  181. // Unlock.
  182. cli.muConn.Unlock()
  183. return nil
  184. }
  185. // Publish sends a PUBLISH Packet to the Server.
  186. func (cli *Client) Publish(opts *PublishOptions) error {
  187. // Lock for reading.
  188. cli.muConn.RLock()
  189. // Unlock.
  190. defer cli.muConn.RUnlock()
  191. // Check the Network Connection.
  192. if cli.conn == nil {
  193. return ErrNotYetConnected
  194. }
  195. // Initialize the options.
  196. if opts == nil {
  197. opts = &PublishOptions{}
  198. }
  199. // Create a PUBLISH Packet.
  200. p, err := cli.newPUBLISHPacket(opts)
  201. if err != nil {
  202. return err
  203. }
  204. // Send the Packet to the Server.
  205. cli.conn.send <- p
  206. return nil
  207. }
  208. // Subscribe sends a SUBSCRIBE Packet to the Server.
  209. func (cli *Client) Subscribe(opts *SubscribeOptions) error {
  210. // Lock for reading and updating.
  211. cli.muConn.Lock()
  212. // Unlock.
  213. defer cli.muConn.Unlock()
  214. // Check the Network Connection.
  215. if cli.conn == nil {
  216. return ErrNotYetConnected
  217. }
  218. // Check the existence of the options.
  219. if opts == nil || len(opts.SubReqs) == 0 {
  220. return packet.ErrInvalidNoSubReq
  221. }
  222. // Define a Packet Identifier.
  223. var packetID uint16
  224. // Define an error.
  225. var err error
  226. // Lock for updating the Session.
  227. cli.muSess.Lock()
  228. defer cli.muSess.Unlock()
  229. // Generate a Packet Identifer.
  230. if packetID, err = cli.generatePacketID(); err != nil {
  231. return err
  232. }
  233. // Create subscription requests for the SUBSCRIBE Packet.
  234. var subReqs []*packet.SubReq
  235. for _, s := range opts.SubReqs {
  236. subReqs = append(subReqs, &packet.SubReq{
  237. TopicFilter: s.TopicFilter,
  238. QoS: s.QoS,
  239. })
  240. }
  241. // Create a SUBSCRIBE Packet.
  242. p, err := packet.NewSUBSCRIBE(&packet.SUBSCRIBEOptions{
  243. PacketID: packetID,
  244. SubReqs: subReqs,
  245. })
  246. if err != nil {
  247. return err
  248. }
  249. // Set the Packet to the Session.
  250. cli.sess.sendingPackets[packetID] = p
  251. // Set the subscription information to
  252. // the Network Connection.
  253. for _, s := range opts.SubReqs {
  254. cli.conn.unackSubs[string(s.TopicFilter)] = s.Handler
  255. }
  256. // Send the Packet to the Server.
  257. cli.conn.send <- p
  258. return nil
  259. }
  260. // Unsubscribe sends an UNSUBSCRIBE Packet to the Server.
  261. func (cli *Client) Unsubscribe(opts *UnsubscribeOptions) error {
  262. // Lock for reading and updating.
  263. cli.muConn.Lock()
  264. // Unlock.
  265. defer cli.muConn.Unlock()
  266. // Check the Network Connection.
  267. if cli.conn == nil {
  268. return ErrNotYetConnected
  269. }
  270. // Check the existence of the options.
  271. if opts == nil || len(opts.TopicFilters) == 0 {
  272. return packet.ErrNoTopicFilter
  273. }
  274. // Define a Packet Identifier.
  275. var packetID uint16
  276. // Define an error.
  277. var err error
  278. // Lock for updating the Session.
  279. cli.muSess.Lock()
  280. defer cli.muSess.Unlock()
  281. // Generate a Packet Identifer.
  282. if packetID, err = cli.generatePacketID(); err != nil {
  283. return err
  284. }
  285. // Create an UNSUBSCRIBE Packet.
  286. p, err := packet.NewUNSUBSCRIBE(&packet.UNSUBSCRIBEOptions{
  287. PacketID: packetID,
  288. TopicFilters: opts.TopicFilters,
  289. })
  290. if err != nil {
  291. return err
  292. }
  293. // Set the Packet to the Session.
  294. cli.sess.sendingPackets[packetID] = p
  295. // Send the Packet to the Server.
  296. cli.conn.send <- p
  297. return nil
  298. }
  299. // Terminate ternimates the Client.
  300. func (cli *Client) Terminate() {
  301. // Send the end signal to the disconnecting goroutine.
  302. cli.disconnEndc <- struct{}{}
  303. // Wait until all goroutines end.
  304. cli.wg.Wait()
  305. }
  306. // send sends an MQTT Control Packet to the Server.
  307. func (cli *Client) send(p packet.Packet) error {
  308. // Return an error if the Client has not yet connected to the Server.
  309. if cli.conn == nil {
  310. return ErrNotYetConnected
  311. }
  312. // Write the Packet to the buffered writer.
  313. if _, err := p.WriteTo(cli.conn.w); err != nil {
  314. return err
  315. }
  316. // Flush the buffered writer.
  317. return cli.conn.w.Flush()
  318. }
  319. // sendCONNECT creates a CONNECT Packet and sends it to the Server.
  320. func (cli *Client) sendCONNECT(opts *packet.CONNECTOptions) error {
  321. // Initialize the options.
  322. if opts == nil {
  323. opts = &packet.CONNECTOptions{}
  324. }
  325. // Create a CONNECT Packet.
  326. p, err := packet.NewCONNECT(opts)
  327. if err != nil {
  328. return err
  329. }
  330. // Send a CONNECT Packet to the Server.
  331. return cli.send(p)
  332. }
  333. // receive receives an MQTT Control Packet from the Server.
  334. func (cli *Client) receive() (packet.Packet, error) {
  335. // Return an error if the Client has not yet connected to the Server.
  336. if cli.conn == nil {
  337. return nil, ErrNotYetConnected
  338. }
  339. // Get the first byte of the Packet.
  340. b, err := cli.conn.r.ReadByte()
  341. if err != nil {
  342. return nil, err
  343. }
  344. // Create the Fixed header.
  345. fixedHeader := packet.FixedHeader([]byte{b})
  346. // Get and decode the Remaining Length.
  347. var mp uint32 = 1 // multiplier
  348. var rl uint32 // the Remaining Length
  349. for {
  350. // Get the next byte of the Packet.
  351. b, err = cli.conn.r.ReadByte()
  352. if err != nil {
  353. return nil, err
  354. }
  355. fixedHeader = append(fixedHeader, b)
  356. rl += uint32(b&0x7F) * mp
  357. if b&0x80 == 0 {
  358. break
  359. }
  360. mp *= 128
  361. }
  362. // Create the Remaining (the Variable header and the Payload).
  363. remaining := make([]byte, rl)
  364. if rl > 0 {
  365. // Get the remaining of the Packet.
  366. if _, err = io.ReadFull(cli.conn.r, remaining); err != nil {
  367. return nil, err
  368. }
  369. }
  370. // Create and return a Packet.
  371. return packet.NewFromBytes(fixedHeader, remaining)
  372. }
  373. // clean cleans the Network Connection and the Session if necessary.
  374. func (cli *Client) clean() {
  375. // Clean the Network Connection.
  376. cli.conn = nil
  377. // Clean the Session if the Clean Session is true.
  378. if cli.sess != nil && cli.sess.cleanSession {
  379. cli.sess = nil
  380. }
  381. }
  382. // waitPacket waits for receiving the Packet.
  383. func (cli *Client) waitPacket(packetc <-chan struct{}, timeout time.Duration, errTimeout error) {
  384. defer cli.conn.wg.Done()
  385. var timeoutc <-chan time.Time
  386. if timeout > 0 {
  387. timeoutc = time.After(timeout * time.Second)
  388. }
  389. select {
  390. case <-packetc:
  391. case <-timeoutc:
  392. // Handle the timeout error.
  393. cli.handleErrorAndDisconn(errTimeout)
  394. }
  395. }
  396. // receivePackets receives Packets from the Server.
  397. func (cli *Client) receivePackets() {
  398. defer func() {
  399. // Close the channel which handles a signal which
  400. // notifies the arrival of the CONNACK Packet.
  401. close(cli.conn.connack)
  402. cli.conn.wg.Done()
  403. }()
  404. for {
  405. // Receive a Packet from the Server.
  406. p, err := cli.receive()
  407. if err != nil {
  408. // Handle the error and disconnect
  409. // the Network Connection.
  410. cli.handleErrorAndDisconn(err)
  411. // End the goroutine.
  412. return
  413. }
  414. // Handle the Packet.
  415. if err := cli.handlePacket(p); err != nil {
  416. // Handle the error and disconnect
  417. // the Network Connection.
  418. cli.handleErrorAndDisconn(err)
  419. // End the goroutine.
  420. return
  421. }
  422. }
  423. }
  424. // handlePacket handles the Packet.
  425. func (cli *Client) handlePacket(p packet.Packet) error {
  426. // Get the MQTT Control Packet type.
  427. ptype, err := p.Type()
  428. if err != nil {
  429. return err
  430. }
  431. switch ptype {
  432. case packet.TypeCONNACK:
  433. cli.handleCONNACK()
  434. return nil
  435. case packet.TypePUBLISH:
  436. return cli.handlePUBLISH(p)
  437. case packet.TypePUBACK:
  438. return cli.handlePUBACK(p)
  439. case packet.TypePUBREC:
  440. return cli.handlePUBREC(p)
  441. case packet.TypePUBREL:
  442. return cli.handlePUBREL(p)
  443. case packet.TypePUBCOMP:
  444. return cli.handlePUBCOMP(p)
  445. case packet.TypeSUBACK:
  446. return cli.handleSUBACK(p)
  447. case packet.TypeUNSUBACK:
  448. return cli.handleUNSUBACK(p)
  449. case packet.TypePINGRESP:
  450. return cli.handlePINGRESP()
  451. default:
  452. return packet.ErrInvalidPacketType
  453. }
  454. }
  455. // handleCONNACK handles the CONNACK Packet.
  456. func (cli *Client) handleCONNACK() {
  457. // Notify the arrival of the CONNACK Packet if possible.
  458. select {
  459. case cli.conn.connack <- struct{}{}:
  460. default:
  461. }
  462. }
  463. // handlePUBLISH handles the PUBLISH Packet.
  464. func (cli *Client) handlePUBLISH(p packet.Packet) error {
  465. // Get the PUBLISH Packet.
  466. publish := p.(*packet.PUBLISH)
  467. switch publish.QoS {
  468. case mqtt.QoS0:
  469. // Lock for reading.
  470. cli.muConn.RLock()
  471. // Unlock.
  472. defer cli.muConn.RUnlock()
  473. // Handle the Application Message.
  474. cli.handleMessage(publish.TopicName, publish.Message)
  475. return nil
  476. case mqtt.QoS1:
  477. // Lock for reading.
  478. cli.muConn.RLock()
  479. // Unlock.
  480. defer cli.muConn.RUnlock()
  481. // Handle the Application Message.
  482. cli.handleMessage(publish.TopicName, publish.Message)
  483. // Create a PUBACK Packet.
  484. puback, err := packet.NewPUBACK(&packet.PUBACKOptions{
  485. PacketID: publish.PacketID,
  486. })
  487. if err != nil {
  488. return err
  489. }
  490. // Send the Packet to the Server.
  491. cli.conn.send <- puback
  492. return nil
  493. default:
  494. // Lock for update.
  495. cli.muSess.Lock()
  496. // Unlock.
  497. defer cli.muSess.Unlock()
  498. // Validate the Packet Identifier.
  499. if _, exist := cli.sess.receivingPackets[publish.PacketID]; exist {
  500. return packet.ErrInvalidPacketID
  501. }
  502. // Set the Packet to the Session.
  503. cli.sess.receivingPackets[publish.PacketID] = p
  504. // Create a PUBREC Packet.
  505. pubrec, err := packet.NewPUBREC(&packet.PUBRECOptions{
  506. PacketID: publish.PacketID,
  507. })
  508. if err != nil {
  509. return err
  510. }
  511. // Send the Packet to the Server.
  512. cli.conn.send <- pubrec
  513. return nil
  514. }
  515. }
  516. // handlePUBACK handles the PUBACK Packet.
  517. func (cli *Client) handlePUBACK(p packet.Packet) error {
  518. // Lock for update.
  519. cli.muSess.Lock()
  520. // Unlock.
  521. defer cli.muSess.Unlock()
  522. // Extract the Packet Identifier of the Packet.
  523. id := p.(*packet.PUBACK).PacketID
  524. // Validate the Packet Identifier.
  525. if err := cli.validatePacketID(cli.sess.sendingPackets, id, packet.TypePUBLISH); err != nil {
  526. return err
  527. }
  528. // Delete the PUBLISH Packet from the Session.
  529. delete(cli.sess.sendingPackets, id)
  530. return nil
  531. }
  532. // handlePUBREC handles the PUBREC Packet.
  533. func (cli *Client) handlePUBREC(p packet.Packet) error {
  534. // Lock for update.
  535. cli.muSess.Lock()
  536. // Unlock.
  537. defer cli.muSess.Unlock()
  538. // Extract the Packet Identifier of the Packet.
  539. id := p.(*packet.PUBREC).PacketID
  540. // Validate the Packet Identifier.
  541. if err := cli.validatePacketID(cli.sess.sendingPackets, id, packet.TypePUBLISH); err != nil {
  542. return err
  543. }
  544. // Create a PUBREL Packet.
  545. pubrel, err := packet.NewPUBREL(&packet.PUBRELOptions{
  546. PacketID: id,
  547. })
  548. if err != nil {
  549. return err
  550. }
  551. // Set the PUBREL Packet to the Session.
  552. cli.sess.sendingPackets[id] = pubrel
  553. // Send the Packet to the Server.
  554. cli.conn.send <- pubrel
  555. return nil
  556. }
  557. // handlePUBREL handles the PUBREL Packet.
  558. func (cli *Client) handlePUBREL(p packet.Packet) error {
  559. // Lock for update.
  560. cli.muSess.Lock()
  561. // Unlock.
  562. defer cli.muSess.Unlock()
  563. // Extract the Packet Identifier of the Packet.
  564. id := p.(*packet.PUBREL).PacketID
  565. // Validate the Packet Identifier.
  566. if err := cli.validatePacketID(cli.sess.receivingPackets, id, packet.TypePUBLISH); err != nil {
  567. return err
  568. }
  569. // Get the Packet from the Session.
  570. publish := cli.sess.receivingPackets[id].(*packet.PUBLISH)
  571. // Lock for reading.
  572. cli.muConn.RLock()
  573. // Handle the Application Message.
  574. cli.handleMessage(publish.TopicName, publish.Message)
  575. // Unlock.
  576. cli.muConn.RUnlock()
  577. // Delete the Packet from the Session
  578. delete(cli.sess.receivingPackets, id)
  579. // Create a PUBCOMP Packet.
  580. pubcomp, err := packet.NewPUBCOMP(&packet.PUBCOMPOptions{
  581. PacketID: id,
  582. })
  583. if err != nil {
  584. return err
  585. }
  586. // Send the Packet to the Server.
  587. cli.conn.send <- pubcomp
  588. return nil
  589. }
  590. // handlePUBCOMP handles the PUBCOMP Packet.
  591. func (cli *Client) handlePUBCOMP(p packet.Packet) error {
  592. // Lock for update.
  593. cli.muSess.Lock()
  594. // Unlock.
  595. defer cli.muSess.Unlock()
  596. // Extract the Packet Identifier of the Packet.
  597. id := p.(*packet.PUBCOMP).PacketID
  598. // Validate the Packet Identifier.
  599. if err := cli.validatePacketID(cli.sess.sendingPackets, id, packet.TypePUBREL); err != nil {
  600. return err
  601. }
  602. // Delete the PUBREL Packet from the Session.
  603. delete(cli.sess.sendingPackets, id)
  604. return nil
  605. }
  606. // handleSUBACK handles the SUBACK Packet.
  607. func (cli *Client) handleSUBACK(p packet.Packet) error {
  608. // Lock for update.
  609. cli.muConn.Lock()
  610. cli.muSess.Lock()
  611. // Unlock.
  612. defer cli.muConn.Unlock()
  613. defer cli.muSess.Unlock()
  614. // Extract the Packet Identifier of the Packet.
  615. id := p.(*packet.SUBACK).PacketID
  616. // Validate the Packet Identifier.
  617. if err := cli.validatePacketID(cli.sess.sendingPackets, id, packet.TypeSUBSCRIBE); err != nil {
  618. return err
  619. }
  620. // Get the subscription requests of the SUBSCRIBE Packet.
  621. subreqs := cli.sess.sendingPackets[id].(*packet.SUBSCRIBE).SubReqs
  622. // Delete the SUBSCRIBE Packet from the Session.
  623. delete(cli.sess.sendingPackets, id)
  624. // Get the Return Codes of the SUBACK Packet.
  625. returnCodes := p.(*packet.SUBACK).ReturnCodes
  626. // Check the lengths of the Return Codes.
  627. if len(returnCodes) != len(subreqs) {
  628. return ErrInvalidSUBACK
  629. }
  630. // Set the subscriptions to the Network Connection.
  631. for i, code := range returnCodes {
  632. // Skip if the Return Code is failure.
  633. if code == packet.SUBACKRetFailure {
  634. continue
  635. }
  636. // Get the Topic Filter.
  637. topicFilter := string(subreqs[i].TopicFilter)
  638. // Move the subscription information from
  639. // unackSubs to ackedSubs.
  640. cli.conn.ackedSubs[topicFilter] = cli.conn.unackSubs[topicFilter]
  641. delete(cli.conn.unackSubs, topicFilter)
  642. }
  643. return nil
  644. }
  645. // handleUNSUBACK handles the UNSUBACK Packet.
  646. func (cli *Client) handleUNSUBACK(p packet.Packet) error {
  647. // Lock for update.
  648. cli.muConn.Lock()
  649. cli.muSess.Lock()
  650. // Unlock.
  651. defer cli.muConn.Unlock()
  652. defer cli.muSess.Unlock()
  653. // Extract the Packet Identifier of the Packet.
  654. id := p.(*packet.UNSUBACK).PacketID
  655. // Validate the Packet Identifier.
  656. if err := cli.validatePacketID(cli.sess.sendingPackets, id, packet.TypeUNSUBSCRIBE); err != nil {
  657. return err
  658. }
  659. // Get the Topic Filters of the UNSUBSCRIBE Packet.
  660. topicFilters := cli.sess.sendingPackets[id].(*packet.UNSUBSCRIBE).TopicFilters
  661. // Delete the UNSUBSCRIBE Packet from the Session.
  662. delete(cli.sess.sendingPackets, id)
  663. // Delete the Topic Filters from the Network Connection.
  664. for _, topicFilter := range topicFilters {
  665. delete(cli.conn.ackedSubs, string(topicFilter))
  666. }
  667. return nil
  668. }
  669. // handlePINGRESP handles the PINGRESP Packet.
  670. func (cli *Client) handlePINGRESP() error {
  671. // Lock for reading and updating pingrespcs.
  672. cli.conn.muPINGRESPs.Lock()
  673. // Check the length of pingrespcs.
  674. if len(cli.conn.pingresps) == 0 {
  675. // Unlock.
  676. cli.conn.muPINGRESPs.Unlock()
  677. // Return an error if there is no channel in pingrespcs.
  678. return ErrInvalidPINGRESP
  679. }
  680. // Get the first channel in pingrespcs.
  681. pingrespc := cli.conn.pingresps[0]
  682. // Remove the first channel from pingrespcs.
  683. cli.conn.pingresps = cli.conn.pingresps[1:]
  684. // Unlock.
  685. cli.conn.muPINGRESPs.Unlock()
  686. // Notify the arrival of the PINGRESP Packet if possible.
  687. select {
  688. case pingrespc <- struct{}{}:
  689. default:
  690. }
  691. return nil
  692. }
  693. // handleError handles the error and disconnects
  694. // the Network Connection.
  695. func (cli *Client) handleErrorAndDisconn(err error) {
  696. // Lock for reading.
  697. cli.muConn.RLock()
  698. // Ignore the error and end the process
  699. // if the Network Connection has already
  700. // been disconnected.
  701. if cli.conn == nil || cli.conn.disconnected {
  702. // Unlock.
  703. cli.muConn.RUnlock()
  704. return
  705. }
  706. // Unlock.
  707. cli.muConn.RUnlock()
  708. // Handle the error.
  709. if cli.errorHandler != nil {
  710. cli.errorHandler(err)
  711. }
  712. // Send a disconnect signal to the goroutine
  713. // via the channel if possible.
  714. select {
  715. case cli.disconnc <- struct{}{}:
  716. default:
  717. }
  718. }
  719. // sendPackets sends Packets to the Server.
  720. func (cli *Client) sendPackets(keepAlive time.Duration, pingrespTimeout time.Duration) {
  721. defer func() {
  722. // Lock for reading and updating pingrespcs.
  723. cli.conn.muPINGRESPs.Lock()
  724. // Close the channels which handle a signal which
  725. // notifies the arrival of the PINGREQ Packet.
  726. for _, pingresp := range cli.conn.pingresps {
  727. close(pingresp)
  728. }
  729. // Initialize pingrespcs
  730. cli.conn.pingresps = make([]chan struct{}, 0)
  731. // Unlock.
  732. cli.conn.muPINGRESPs.Unlock()
  733. cli.conn.wg.Done()
  734. }()
  735. for {
  736. var keepAlivec <-chan time.Time
  737. if keepAlive > 0 {
  738. keepAlivec = time.After(keepAlive * time.Second)
  739. }
  740. select {
  741. case p := <-cli.conn.send:
  742. // Lock for sending the Packet.
  743. cli.muConn.RLock()
  744. // Send the Packet to the Server.
  745. err := cli.send(p)
  746. // Unlock.
  747. cli.muConn.RUnlock()
  748. if err != nil {
  749. // Handle the error and disconnect the Network Connection.
  750. cli.handleErrorAndDisconn(err)
  751. // End this function.
  752. return
  753. }
  754. case <-keepAlivec:
  755. // Lock for sending the Packet.
  756. cli.muConn.RLock()
  757. // Send a PINGREQ Packet to the Server.
  758. err := cli.send(packet.NewPINGREQ())
  759. // Unlock.
  760. cli.muConn.RUnlock()
  761. if err != nil {
  762. // Handle the error and disconnect the Network Connection.
  763. cli.handleErrorAndDisconn(err)
  764. // End this function.
  765. return
  766. }
  767. // Create a channel which handles the signal to notify the arrival of
  768. // the PINGRESP Packet.
  769. pingresp := make(chan struct{})
  770. // Lock for appending the channel to pingrespcs.
  771. cli.conn.muPINGRESPs.Lock()
  772. // Append the channel to pingrespcs.
  773. cli.conn.pingresps = append(cli.conn.pingresps, pingresp)
  774. // Unlock.
  775. cli.conn.muPINGRESPs.Unlock()
  776. // Launch a goroutine which waits for receiving the PINGRESP Packet.
  777. cli.conn.wg.Add(1)
  778. go cli.waitPacket(pingresp, pingrespTimeout, ErrPINGRESPTimeout)
  779. case <-cli.conn.sendEnd:
  780. // End this function.
  781. return
  782. }
  783. }
  784. }
  785. // generatePacketID generates and returns a Packet Identifier.
  786. func (cli *Client) generatePacketID() (uint16, error) {
  787. // Define a Packet Identifier.
  788. id := minPacketID
  789. for {
  790. // Find a Packet Identifier which does not used.
  791. if _, exist := cli.sess.sendingPackets[id]; !exist {
  792. // Return the Packet Identifier.
  793. return id, nil
  794. }
  795. if id == maxPacketID {
  796. break
  797. }
  798. id++
  799. }
  800. // Return an error if available ids are not found.
  801. return 0, ErrPacketIDExhaused
  802. }
  803. // newPUBLISHPacket creates and returns a PUBLISH Packet.
  804. func (cli *Client) newPUBLISHPacket(opts *PublishOptions) (packet.Packet, error) {
  805. // Define a Packet Identifier.
  806. var packetID uint16
  807. if opts.QoS != mqtt.QoS0 {
  808. // Lock for reading and updating the Session.
  809. cli.muSess.Lock()
  810. defer cli.muSess.Unlock()
  811. // Define an error.
  812. var err error
  813. // Generate a Packet Identifer.
  814. if packetID, err = cli.generatePacketID(); err != nil {
  815. return nil, err
  816. }
  817. }
  818. // Create a PUBLISH Packet.
  819. p, err := packet.NewPUBLISH(&packet.PUBLISHOptions{
  820. QoS: opts.QoS,
  821. Retain: opts.Retain,
  822. TopicName: opts.TopicName,
  823. PacketID: packetID,
  824. Message: opts.Message,
  825. })
  826. if err != nil {
  827. return nil, err
  828. }
  829. if opts.QoS != mqtt.QoS0 {
  830. // Set the Packet to the Session.
  831. cli.sess.sendingPackets[packetID] = p
  832. }
  833. // Return the Packet.
  834. return p, nil
  835. }
  836. // validateSendingPacketID checks if the Packet which has
  837. // the Packet Identifier and the MQTT Control Packet type
  838. // specified by the parameters exists in the Session's
  839. // sendingPackets.
  840. func (cli *Client) validatePacketID(packets map[uint16]packet.Packet, id uint16, ptype byte) error {
  841. // Extract the Packet.
  842. p, exist := packets[id]
  843. if !exist {
  844. // Return an error if there is no Packet which has the Packet Identifier
  845. // specified by the parameter.
  846. return packet.ErrInvalidPacketID
  847. }
  848. // Extract the MQTT Control Packet type of the Packet.
  849. t, err := p.Type()
  850. if err != nil {
  851. return err
  852. }
  853. if t != ptype {
  854. // Return an error if the Packet's MQTT Control Packet type does not
  855. // equal to one specified by the parameter.
  856. return packet.ErrInvalidPacketID
  857. }
  858. return nil
  859. }
  860. // handleMessage handles the Application Message.
  861. func (cli *Client) handleMessage(topicName, message []byte) {
  862. // Get the string of the Topic Name.
  863. topicNameStr := string(topicName)
  864. for topicFilter, handler := range cli.conn.ackedSubs {
  865. if handler == nil || !match(topicNameStr, topicFilter) {
  866. continue
  867. }
  868. // Execute the handler.
  869. go handler(topicName, message)
  870. }
  871. }
  872. // New creates and returns a Client.
  873. func New(opts *Options) *Client {
  874. // Initialize the options.
  875. if opts == nil {
  876. opts = &Options{}
  877. }
  878. // Create a Client.
  879. cli := &Client{
  880. disconnc: make(chan struct{}, 1),
  881. disconnEndc: make(chan struct{}),
  882. errorHandler: opts.ErrorHandler,
  883. }
  884. // Launch a goroutine which disconnects the Network Connection.
  885. cli.wg.Add(1)
  886. go func() {
  887. defer func() {
  888. cli.wg.Done()
  889. }()
  890. for {
  891. select {
  892. case <-cli.disconnc:
  893. if err := cli.Disconnect(); err != nil {
  894. if cli.errorHandler != nil {
  895. cli.errorHandler(err)
  896. }
  897. }
  898. case <-cli.disconnEndc:
  899. // End the goroutine.
  900. return
  901. }
  902. }
  903. }()
  904. // Return the Client.
  905. return cli
  906. }
  907. // match checks if the Topic Name matches the Topic Filter.
  908. func match(topicName, topicFilter string) bool {
  909. // Tokenize the Topic Name.
  910. nameTokens := strings.Split(topicName, "/")
  911. nameTokensLen := len(nameTokens)
  912. // Tolenize the Topic Filter.
  913. filterTokens := strings.Split(topicFilter, "/")
  914. for i, t := range filterTokens {
  915. switch t {
  916. case "#":
  917. return i != 0 || !strings.HasPrefix(nameTokens[0], "$")
  918. case "+":
  919. if i == 0 && strings.HasPrefix(nameTokens[0], "$") {
  920. return false
  921. }
  922. if nameTokensLen <= i {
  923. return false
  924. }
  925. default:
  926. if nameTokensLen <= i || t != nameTokens[i] {
  927. return false
  928. }
  929. }
  930. }
  931. return len(filterTokens) == nameTokensLen
  932. }