Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: eclipse-paho/paho.mqtt.golang
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: v1.4.2
Choose a base ref
...
head repository: eclipse-paho/paho.mqtt.golang
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: master
Choose a head ref

Commits on Dec 20, 2022

  1. Fixed to avoid panic when keep alive is 1 #531.

    Signed-off-by: Daichi Tomaru <banaoa7543@gmail.com>
    tomatod committed Dec 20, 2022
    Copy the full SHA
    a0e4e11 View commit details

Commits on Dec 21, 2022

  1. Merge pull request #622 from tomatod/fix_panic_when_keepalive_zero

    Resolve panic when keep alive is set to 1 (second)
    
    #closes 531
    MattBrittan authored Dec 21, 2022
    Copy the full SHA
    d9dea69 View commit details

Commits on Dec 22, 2022

  1. Clear URL User before creating websocket connection (allowing MQTT us…

    …ername/password to be specified in URL)
    
    Closes #623
    MattBrittan committed Dec 22, 2022
    Copy the full SHA
    3a8066f View commit details
  2. Merge pull request #624 from ChIoT-Tech/master

    Allow MQTT username/password in URL when connecting via Websockets
    MattBrittan authored Dec 22, 2022
    Copy the full SHA
    4b066a0 View commit details

Commits on Dec 31, 2022

  1. Add back-off controller for sleep before reconnection when connection…

    … lost is detected immediately after connecting. #589
    
    Signed-off-by: Daichi Tomaru <banaoa7543@gmail.com>
    tomatod committed Dec 31, 2022
    Copy the full SHA
    d174b9a View commit details

Commits on Jan 8, 2023

  1. Merge pull request #625 from tomatod/connect_retry_backoff - Prevent …

    …reconnect loops
    
    Add back-off controller for sleep time of reconnection  when connection lost is detected immediately after connecting. #589
    This issue could be caused by an invalid publish request (which leads to the broker dropping the connection immediately).
    MattBrittan authored Jan 8, 2023
    Copy the full SHA
    e3fa503 View commit details

Commits on Mar 6, 2023

  1. Copy the full SHA
    602b06b View commit details
  2. Copy the full SHA
    c1df6d0 View commit details

Commits on Mar 13, 2023

  1. Fix for PR #624 as per comment from @Tieske (previous version worked …

    …for the first, but not subsequent, connections)
    
    Closes #623
    MattBrittan committed Mar 13, 2023
    Copy the full SHA
    e3d0846 View commit details
  2. Merge pull request #638 from ChIoT-Tech/master

    Resolve issue introduced in PR #624 (Websocket authentication)  as per comment from @Tieske (previous version worked for the first, but not subsequent, connections)
    MattBrittan authored Mar 13, 2023
    Copy the full SHA
    5e01522 View commit details

Commits on Mar 16, 2023

  1. Copy the full SHA
    aa0a8ad View commit details

Commits on Aug 10, 2023

  1. Wrap connection errors

    adriansmares committed Aug 10, 2023
    Copy the full SHA
    ace7b4e View commit details
  2. Update dependencies

    adriansmares committed Aug 10, 2023
    Copy the full SHA
    9ac3838 View commit details
  3. Wrap connection network errors so they are accessible (requires go1.20)

    Allows the use of [Errors.Is](https://pkg.go.dev/errors#Is) to access further information on the reason a connection attempt failed.
    Note: This used functionality introduced in Go 1.20 so that is now in `go.mod` (in line with supporting the current and one old release).
    MattBrittan authored Aug 10, 2023
    Copy the full SHA
    7b759f1 View commit details

Commits on Nov 7, 2023

  1. Copy the full SHA
    6eae012 View commit details
  2. Merge pull request #658 from ChIoT-Tech/master

    Update dependencies (mainly to address x/net security warnings)
    MattBrittan authored Nov 7, 2023
    Copy the full SHA
    c26bc8b View commit details
  3. Clarify use of token.WaitTimeout

    Ref issue #656
    MattBrittan committed Nov 7, 2023
    Copy the full SHA
    e855a0b View commit details
  4. Merge pull request #659 from ChIoT-Tech/master

    Clarify use of token.WaitTimeout
    MattBrittan authored Nov 7, 2023
    Copy the full SHA
    5786441 View commit details

Commits on Dec 19, 2023

  1. Copy the full SHA
    6f31b3d View commit details
  2. Merge pull request #662 from avmunm/fix-661-make-ClientOptionsReader-…

    …mockable
    
    Add NewClientOptionsReader for mocking purposes.
    
    closes #661
    MattBrittan authored Dec 19, 2023
    Copy the full SHA
    6d8e0a7 View commit details

Commits on Jan 6, 2024

  1. Add paho project policy docs

    Copied from https://www.eclipse.org/projects/tools/documentation.php?id=iot.paho
    SECURITY.md modified to meet this projects needs.
    MattBrittan committed Jan 6, 2024
    Copy the full SHA
    68320a2 View commit details
  2. Add paho project policy docs

    Copied from https://www.eclipse.org/projects/tools/documentation.php?id=iot.paho
    SECURITY.md modified to meet this projects needs.
    MattBrittan authored Jan 6, 2024
    Copy the full SHA
    41dbed3 View commit details

Commits on Jan 7, 2024

  1. Copy the full SHA
    67a8ac3 View commit details
  2. Copy the full SHA
    8f6a9be View commit details

Commits on Feb 9, 2024

  1. fix: fix keep-alive timeouts on small intervals

    The check interval calculation in the `ping.go` file has been adjusted for when the keep-alive time is less than or equal to 10. Instead of dividing by 2, the check interval now is calculated with a division by 4, ensuring a more frequent ping check. This is required as MQTT brokers typically detect a timeout if the client did not send a ping within the specified keep-alive interval times 1.5. If the client only checks for due pings every half-interval, this commonly leads to timeouts. An example: a 5 second interval means that every 2.5 seconds checks are made for due pings. However, if checks have occurred at 2.49 seconds since the last ping and 4.99 seconds, the next one might happen slightly after 7.5 seconds, making the broker detect a timeout. Therefore, we increase the frequency of ping checks by reducing the check interval.
    lefinal committed Feb 9, 2024
    Copy the full SHA
    08d0637 View commit details
  2. Increase frequency of PING checks when using low (<10s) KeepAlive

    fix keep-alive timeouts on small intervals
    MattBrittan authored Feb 9, 2024
    Copy the full SHA
    f21bdb1 View commit details

Commits on Mar 21, 2024

  1. Replace the time.After with the timer for efficiency.

    Signed-off-by: Daniele Vasselli <vasselli.daniele@gmail.com>
    Daniele Vasselli committed Mar 21, 2024
    Copy the full SHA
    d0120ee View commit details
  2. Remove the channel draining since it is not reused.

    Signed-off-by Daniele Vasselli <vasselli.daniele@gmail.com>
    Daniele Vasselli committed Mar 21, 2024
    Copy the full SHA
    8b638fb View commit details
  3. Merge pull request #671 from DVasselli/clientTimer

    Replace the time.After with the timer for efficiency.
    MattBrittan authored Mar 21, 2024
    Copy the full SHA
    1a63b63 View commit details

Commits on Mar 26, 2024

  1. Merge pull request #665 from vruge/patch-1

    fix: deprecation warnings for ioutil
    MattBrittan authored Mar 26, 2024
    Copy the full SHA
    b7215e4 View commit details

Commits on Apr 2, 2024

  1. Resolve "cannot use os.ReadDir" introduced in PR #665

    Matt Brittan committed Apr 2, 2024
    Copy the full SHA
    fe38f80 View commit details

Commits on May 19, 2024

  1. Copy the full SHA
    386b731 View commit details

Commits on May 20, 2024

  1. Copy the full SHA
    71f9814 View commit details
  2. Resolve potential goroutine leak wien Disconnect called

    If `Disconnect` was called whilst a connection attempt was in progress a goroutine leak occurred. This change allows the connection attempt to complete as normal (including calling the `OnConnect` callback) before the Disconnect is handled.
    
    closes #675
    MattBrittan authored May 20, 2024
    Copy the full SHA
    6801721 View commit details

Commits on Jul 31, 2024

  1. Update dependencies

    Matt Brittan committed Jul 31, 2024
    Copy the full SHA
    8768f3b View commit details
  2. Merge pull request #683 from ChIoT-Tech/master

    Update dependencies in preperation for v1.5 release
    MattBrittan authored Jul 31, 2024
    Copy the full SHA
    714f7c0 View commit details

Commits on Sep 3, 2024

  1. Copy the full SHA
    91231b3 View commit details
  2. Resolve "missing go.sum entry" in docker build

    Update go dependencies for pub and sub in docker containers as part of the build process
    
    closes #690
    MattBrittan authored Sep 3, 2024
    Copy the full SHA
    514b7fa View commit details

Commits on Mar 4, 2025

  1. Update Go version and dependencies

    This is mainly to resolve warnings issued by security scanners.
    Matt Brittan committed Mar 4, 2025
    Copy the full SHA
    3c20bed View commit details
  2. Update Go version and dependencies

    This is mainly to resolve warnings issued by security scanners.
    MattBrittan authored Mar 4, 2025
    Copy the full SHA
    22cef27 View commit details
Showing with 460 additions and 64 deletions.
  1. +93 −0 CODE_OF_CONDUCT.md
  2. +4 −2 README.md
  3. +13 −0 SECURITY.md
  4. +104 −0 backoff.go
  5. +68 −0 backoff_test.go
  6. +25 −22 client.go
  7. +6 −1 client_test.go
  8. +1 −1 cmd/docker/publisher/dockerfile
  9. +6 −3 cmd/docker/publisher/go.mod
  10. +23 −2 cmd/docker/publisher/go.sum
  11. +1 −1 cmd/docker/subscriber/dockerfile
  12. +6 −3 cmd/docker/subscriber/go.mod
  13. +23 −0 cmd/docker/subscriber/go.sum
  14. +2 −2 cmd/ssl/main.go
  15. +9 −4 filestore.go
  16. +1 −3 fvt/docker/docker-compose.yml
  17. +4 −4 go.mod
  18. +10 −10 go.sum
  19. +6 −2 netconn.go
  20. +15 −0 options_reader.go
  21. +4 −4 ping.go
  22. +18 −0 token.go
  23. +18 −0 token_test.go
93 changes: 93 additions & 0 deletions CODE_OF_CONDUCT.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
# Community Code of Conduct

**Version 2.0
January 1, 2023**

## Our Pledge

In the interest of fostering an open and welcoming environment, we as community members, contributors, Committers[^1], and Project Leads (collectively "Contributors") pledge to make participation in our projects and our community a harassment-free and inclusive experience for everyone.

This Community Code of Conduct ("Code") outlines our behavior expectations as members of our community in all Eclipse Foundation activities, both offline and online. It is not intended to govern scenarios or behaviors outside of the scope of Eclipse Foundation activities. Nor is it intended to replace or supersede the protections offered to all our community members under the law. Please follow both the spirit and letter of this Code and encourage other Contributors to follow these principles into our work. Failure to read or acknowledge this Code does not excuse a Contributor from compliance with the Code.

## Our Standards

Examples of behavior that contribute to creating a positive and professional environment include:

- Using welcoming and inclusive language;
- Actively encouraging all voices;
- Helping others bring their perspectives and listening actively. If you find yourself dominating a discussion, it is especially important to encourage other voices to join in;
- Being respectful of differing viewpoints and experiences;
- Gracefully accepting constructive criticism;
- Focusing on what is best for the community;
- Showing empathy towards other community members;
- Being direct but professional; and
- Leading by example by holding yourself and others accountable

Examples of unacceptable behavior by Contributors include:

- The use of sexualized language or imagery;
- Unwelcome sexual attention or advances;
- Trolling, insulting/derogatory comments, and personal or political attacks;
- Public or private harassment, repeated harassment;
- Publishing others' private information, such as a physical or electronic address, without explicit permission;
- Violent threats or language directed against another person;
- Sexist, racist, or otherwise discriminatory jokes and language;
- Posting sexually explicit or violent material;
- Sharing private content, such as emails sent privately or non-publicly, or unlogged forums such as IRC channel history;
- Personal insults, especially those using racist or sexist terms;
- Excessive or unnecessary profanity;
- Advocating for, or encouraging, any of the above behavior; and
- Other conduct which could reasonably be considered inappropriate in a professional setting

## Our Responsibilities

With the support of the Eclipse Foundation employees, consultants, officers, and directors (collectively, the "Staff"), Committers, and Project Leads, the Eclipse Foundation Conduct Committee (the "Conduct Committee") is responsible for clarifying the standards of acceptable behavior. The Conduct Committee takes appropriate and fair corrective action in response to any instances of unacceptable behavior.

## Scope

This Code applies within all Project, Working Group, and Interest Group spaces and communication channels of the Eclipse Foundation (collectively, "Eclipse spaces"), within any Eclipse-organized event or meeting, and in public spaces when an individual is representing an Eclipse Foundation Project, Working Group, Interest Group, or their communities. Examples of representing a Project or community include posting via an official social media account, personal accounts, or acting as an appointed representative at an online or offline event. Representation of Projects, Working Groups, and Interest Groups may be further defined and clarified by Committers, Project Leads, or the Staff.

## Enforcement

Instances of abusive, harassing, or otherwise unacceptable behavior may be reported by contacting the Conduct Committee via conduct@eclipse-foundation.org. All complaints will be reviewed and investigated and will result in a response that is deemed necessary and appropriate to the circumstances. Without the explicit consent of the reporter, the Conduct Committee is obligated to maintain confidentiality with regard to the reporter of an incident. The Conduct Committee is further obligated to ensure that the respondent is provided with sufficient information about the complaint to reply. If such details cannot be provided while maintaining confidentiality, the Conduct Committee will take the respondent‘s inability to provide a defense into account in its deliberations and decisions. Further details of enforcement guidelines may be posted separately.

Staff, Committers and Project Leads have the right to report, remove, edit, or reject comments, commits, code, wiki edits, issues, and other contributions that are not aligned to this Code, or to block temporarily or permanently any Contributor for other behaviors that they deem inappropriate, threatening, offensive, or harmful. Any such actions will be reported to the Conduct Committee for transparency and record keeping.

Any Staff (including officers and directors of the Eclipse Foundation), Committers, Project Leads, or Conduct Committee members who are the subject of a complaint to the Conduct Committee will be recused from the process of resolving any such complaint.

## Responsibility

The responsibility for administering this Code rests with the Conduct Committee, with oversight by the Executive Director and the Board of Directors. For additional information on the Conduct Committee and its process, please write to <conduct@eclipse-foundation.org>.

## Investigation of Potential Code Violations

All conflict is not bad as a healthy debate may sometimes be necessary to push us to do our best. It is, however, unacceptable to be disrespectful or offensive, or violate this Code. If you see someone engaging in objectionable behavior violating this Code, we encourage you to address the behavior directly with those involved. If for some reason, you are unable to resolve the matter or feel uncomfortable doing so, or if the behavior is threatening or harassing, please report it following the procedure laid out below.

Reports should be directed to <conduct@eclipse-foundation.org>. It is the Conduct Committee’s role to receive and address reported violations of this Code and to ensure a fair and speedy resolution.

The Eclipse Foundation takes all reports of potential Code violations seriously and is committed to confidentiality and a full investigation of all allegations. The identity of the reporter will be omitted from the details of the report supplied to the accused. Contributors who are being investigated for a potential Code violation will have an opportunity to be heard prior to any final determination. Those found to have violated the Code can seek reconsideration of the violation and disciplinary action decisions. Every effort will be made to have all matters disposed of within 60 days of the receipt of the complaint.

## Actions
Contributors who do not follow this Code in good faith may face temporary or permanent repercussions as determined by the Conduct Committee.

This Code does not address all conduct. It works in conjunction with our [Communication Channel Guidelines](https://www.eclipse.org/org/documents/communication-channel-guidelines/), [Social Media Guidelines](https://www.eclipse.org/org/documents/social_media_guidelines.php), [Bylaws](https://www.eclipse.org/org/documents/eclipse-foundation-be-bylaws-en.pdf), and [Internal Rules](https://www.eclipse.org/org/documents/ef-be-internal-rules.pdf) which set out additional protections for, and obligations of, all contributors. The Foundation has additional policies that provide further guidance on other matters.

It’s impossible to spell out every possible scenario that might be deemed a violation of this Code. Instead, we rely on one another’s good judgment to uphold a high standard of integrity within all Eclipse Spaces. Sometimes, identifying the right thing to do isn’t an easy call. In such a scenario, raise the issue as early as possible.

## No Retaliation

The Eclipse community relies upon and values the help of Contributors who identify potential problems that may need to be addressed within an Eclipse Space. Any retaliation against a Contributor who raises an issue honestly is a violation of this Code. That a Contributor has raised a concern honestly or participated in an investigation, cannot be the basis for any adverse action, including threats, harassment, or discrimination. If you work with someone who has raised a concern or provided information in an investigation, you should continue to treat the person with courtesy and respect. If you believe someone has retaliated against you, report the matter as described by this Code. Honest reporting does not mean that you have to be right when you raise a concern; you just have to believe that the information you are providing is accurate.

False reporting, especially when intended to retaliate or exclude, is itself a violation of this Code and will not be accepted or tolerated.

Everyone is encouraged to ask questions about this Code. Your feedback is welcome, and you will get a response within three business days. Write to <conduct@eclipse-foundation.org>.

## Amendments

The Eclipse Foundation Board of Directors may amend this Code from time to time and may vary the procedures it sets out where appropriate in a particular case.

### Attribution

This Code was inspired by the [Contributor Covenant](https://www.contributor-covenant.org/), version 1.4, available [here](https://www.contributor-covenant.org/version/1/4/code-of-conduct/).

[^1]: Capitalized terms used herein without definition shall have the meanings assigned to them in the Bylaws.
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -113,7 +113,9 @@ identifier; this is as per the [spec](https://docs.oasis-open.org/mqtt/mqtt/v3.1
not received, disconnecting` errors).
* When QOS1+ subscriptions have been created previously and you connect with `CleanSession` set to false it is possible
that the broker will deliver retained messages before `Subscribe` can be called. To process these messages either
configure a handler with `AddRoute` or set a `DefaultPublishHandler`.
configure a handler with `AddRoute` or set a `DefaultPublishHandler`. If there is no handler (or `DefaultPublishHandler`)
then inbound messages will not be acknowledged. Adding a handler (even if it's `opts.SetDefaultPublishHandler(func(mqtt.Client, mqtt.Message) {})`)
is highly recommended to avoid inadvertently hitting inflight message limits.
* Loss of network connectivity may not be detected immediately. If this is an issue then consider setting
`ClientOptions.KeepAlive` (sends regular messages to check the link is active).
* Reusing a `Client` is not completely safe. After calling `Disconnect` please create a new Client (`NewClient()`) rather
@@ -193,4 +195,4 @@ Discussion of the Paho clients takes place on the [Eclipse paho-dev mailing list

General questions about the MQTT protocol are discussed in the [MQTT Google Group](https://groups.google.com/forum/?hl=en-US&fromgroups#!forum/mqtt).

There is much more information available via the [MQTT community site](http://mqtt.org).
There is much more information available via the [MQTT community site](http://mqtt.org).
13 changes: 13 additions & 0 deletions SECURITY.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Security Policy

This project implements the Eclipse Foundation Security Policy

* https://www.eclipse.org/security

## Supported Versions

Only the most recent release of the client will be supported with security updates.

## Reporting a Vulnerability

Please report vulnerabilities to the Eclipse Foundation Security Team at security@eclipse.org
104 changes: 104 additions & 0 deletions backoff.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Copyright (c) 2021 IBM Corp and others.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v2.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* https://www.eclipse.org/legal/epl-2.0/
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Matt Brittan
* Daichi Tomaru
*/

package mqtt

import (
"sync"
"time"
)

// Controller for sleep with backoff when the client attempts reconnection
// It has statuses for each situations cause reconnection.
type backoffController struct {
sync.RWMutex
statusMap map[string]*backoffStatus
}

type backoffStatus struct {
lastSleepPeriod time.Duration
lastErrorTime time.Time
}

func newBackoffController() *backoffController {
return &backoffController{
statusMap: map[string]*backoffStatus{},
}
}

// Calculate next sleep period from the specified parameters.
// Returned values are next sleep period and whether the error situation is continual.
// If connection errors continuouslly occurs, its sleep period is exponentially increased.
// Also if there is a lot of time between last and this error, sleep period is initialized.
func (b *backoffController) getBackoffSleepTime(
situation string, initSleepPeriod time.Duration, maxSleepPeriod time.Duration, processTime time.Duration, skipFirst bool,
) (time.Duration, bool) {
// Decide first sleep time if the situation is not continual.
var firstProcess = func(status *backoffStatus, init time.Duration, skip bool) (time.Duration, bool) {
if skip {
status.lastSleepPeriod = 0
return 0, false
}
status.lastSleepPeriod = init
return init, false
}

// Prioritize maxSleep.
if initSleepPeriod > maxSleepPeriod {
initSleepPeriod = maxSleepPeriod
}
b.Lock()
defer b.Unlock()

status, exist := b.statusMap[situation]
if !exist {
b.statusMap[situation] = &backoffStatus{initSleepPeriod, time.Now()}
return firstProcess(b.statusMap[situation], initSleepPeriod, skipFirst)
}

oldTime := status.lastErrorTime
status.lastErrorTime = time.Now()

// When there is a lot of time between last and this error, sleep period is initialized.
if status.lastErrorTime.Sub(oldTime) > (processTime * 2 + status.lastSleepPeriod) {
return firstProcess(status, initSleepPeriod, skipFirst)
}

if status.lastSleepPeriod == 0 {
status.lastSleepPeriod = initSleepPeriod
return initSleepPeriod, true
}

if nextSleepPeriod := status.lastSleepPeriod * 2; nextSleepPeriod <= maxSleepPeriod {
status.lastSleepPeriod = nextSleepPeriod
} else {
status.lastSleepPeriod = maxSleepPeriod
}

return status.lastSleepPeriod, true
}

// Execute sleep the time returned from getBackoffSleepTime.
func (b *backoffController) sleepWithBackoff(
situation string, initSleepPeriod time.Duration, maxSleepPeriod time.Duration, processTime time.Duration, skipFirst bool,
) (time.Duration, bool) {
sleep, isFirst := b.getBackoffSleepTime(situation, initSleepPeriod, maxSleepPeriod, processTime, skipFirst)
if sleep != 0 {
time.Sleep(sleep)
}
return sleep, isFirst
}
68 changes: 68 additions & 0 deletions backoff_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright (c) 2021 IBM Corp and others.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v2.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* https://www.eclipse.org/legal/epl-2.0/
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Matt Brittan
* Daichi Tomaru
*/

package mqtt

import (
"testing"
"time"
)

func TestGetBackoffSleepTime(t *testing.T) {
// Test for adding new situation
controller := newBackoffController()
if s, c := controller.getBackoffSleepTime("not-exist", 1 * time.Second, 5 * time.Second, 1 * time.Second, false); !((s == 1 * time.Second) && !c) {
t.Errorf("When new situation is added, period should be initSleepPeriod and naturally it shouldn't be continual error. s:%d c%t", s, c)
}

// Test for the continual error in the same situation and suppression of sleep period by maxSleepPeriod
controller.getBackoffSleepTime("multi", 10 * time.Second, 30 * time.Second, 1 * time.Second, false)
if s, c := controller.getBackoffSleepTime("multi", 10 * time.Second, 30 * time.Second, 1 * time.Second, false); !((s == 20 * time.Second) && c) {
t.Errorf("When same situation is called again, period should be increased and it should be regarded as a continual error. s:%d c%t", s, c)
}
if s, c := controller.getBackoffSleepTime("multi", 10 * time.Second, 30 * time.Second, 1 * time.Second, false); !((s == 30 * time.Second) && c) {
t.Errorf("A same situation is called three times. 10 * 2 * 2 = 40 but maxSleepPeriod is 30. So the next period should be 30. s:%d c%t", s, c)
}

// Test for initialization by elapsed time.
controller.getBackoffSleepTime("elapsed", 1 * time.Second, 128 * time.Second, 1 * time.Second, false)
controller.getBackoffSleepTime("elapsed", 1 * time.Second, 128 * time.Second, 1 * time.Second, false)
time.Sleep((1 * 2 + 1 * 2 + 1) * time.Second)
if s, c := controller.getBackoffSleepTime("elapsed", 1 * time.Second, 128 * time.Second, 1 * time.Second, false); !((s == 1 * time.Second) && !c) {
t.Errorf("Initialization should be triggered by elapsed time. s:%d c%t", s, c)
}

// Test when initial and max period is same.
controller.getBackoffSleepTime("same", 2 * time.Second, 2 * time.Second, 1 * time.Second, false)
if s, c := controller.getBackoffSleepTime("same", 2 * time.Second, 2 * time.Second, 1 * time.Second, false); !((s == 2 * time.Second) && c) {
t.Errorf("Sleep time should be always 2. s:%d c%t", s, c)
}

// Test when initial period > max period.
controller.getBackoffSleepTime("bigger", 5 * time.Second, 2 * time.Second, 1 * time.Second, false)
if s, c := controller.getBackoffSleepTime("bigger", 5 * time.Second, 2 * time.Second, 1 * time.Second, false); !((s == 2 * time.Second) && c) {
t.Errorf("Sleep time should be 2. s:%d c%t", s, c)
}

// Test when first sleep is skipped.
if s, c := controller.getBackoffSleepTime("skip", 3 * time.Second, 12 * time.Second, 1 * time.Second, true); !((s == 0) && !c) {
t.Errorf("Sleep time should be 0 because of skip. s:%d c%t", s, c)
}
if s, c := controller.getBackoffSleepTime("skip", 3 * time.Second, 12 * time.Second, 1 * time.Second, true); !((s == 3 * time.Second) && c) {
t.Errorf("Sleep time should be 3. s:%d c%t", s, c)
}
}
47 changes: 25 additions & 22 deletions client.go
Original file line number Diff line number Diff line change
@@ -141,6 +141,8 @@ type client struct {
stop chan struct{} // Closed to request that workers stop
workers sync.WaitGroup // used to wait for workers to complete (ping, keepalive, errwatch, resume)
commsStopped chan struct{} // closed when the comms routines have stopped (kept running until after workers have closed to avoid deadlocks)

backoff *backoffController
}

// NewClient will create an MQTT v3.1.1 client with all of the options specified
@@ -169,6 +171,7 @@ func NewClient(o *ClientOptions) Client {
c.msgRouter.setDefaultHandler(c.options.DefaultPublishHandler)
c.obound = make(chan *PacketAndToken)
c.oboundP = make(chan *PacketAndToken)
c.backoff = newBackoffController()
return c
}

@@ -302,10 +305,16 @@ func (c *client) Connect() Token {
func (c *client) reconnect(connectionUp connCompletedFn) {
DEBUG.Println(CLI, "enter reconnect")
var (
sleep = 1 * time.Second
conn net.Conn
initSleep = 1 * time.Second
conn net.Conn
)

// If the reason of connection lost is same as the before one, sleep timer is set before attempting connection is started.
// Sleep time is exponentially increased as the same situation continues
if slp, isContinual := c.backoff.sleepWithBackoff("connectionLost", initSleep, c.options.MaxReconnectInterval, 3*time.Second, true); isContinual {
DEBUG.Println(CLI, "Detect continual connection lost after reconnect, slept for", int(slp.Seconds()), "seconds")
}

for {
if nil != c.options.OnReconnecting {
c.options.OnReconnecting(c, &c.options)
@@ -315,15 +324,8 @@ func (c *client) reconnect(connectionUp connCompletedFn) {
if err == nil {
break
}
DEBUG.Println(CLI, "Reconnect failed, sleeping for", int(sleep.Seconds()), "seconds:", err)
time.Sleep(sleep)
if sleep < c.options.MaxReconnectInterval {
sleep *= 2
}

if sleep > c.options.MaxReconnectInterval {
sleep = c.options.MaxReconnectInterval
}
sleep, _ := c.backoff.sleepWithBackoff("attemptReconnection", initSleep, c.options.MaxReconnectInterval, c.options.ConnectTimeout, false)
DEBUG.Println(CLI, "Reconnect failed, slept for", int(sleep.Seconds()), "seconds:", err)

if c.status.ConnectionStatus() != reconnecting { // Disconnect may have been called
if err := connectionUp(false); err != nil { // Should always return an error
@@ -425,7 +427,7 @@ func (c *client) attemptConnection() (net.Conn, byte, bool, error) {
if rc != packets.ErrNetworkError { // mqtt error
err = packets.ConnErrors[rc]
} else { // network error (if this occurred in ConnectMQTT then err will be nil)
err = fmt.Errorf("%s : %s", packets.ConnErrors[rc], err)
err = fmt.Errorf("%w : %w", packets.ConnErrors[rc], err)
}
}
return conn, rc, sessionPresent, err
@@ -599,17 +601,14 @@ func (c *client) startCommsWorkers(conn net.Conn, connectionUp connCompletedFn,
c.workers.Add(1) // Done will be called when ackOut is closed
ackOut := c.msgRouter.matchAndDispatch(incomingPubChan, c.options.Order, c)

// The connection is now ready for use (we spin up a few go routines below). It is possible that
// Disconnect has been called in the interim...
// The connection is now ready for use (we spin up a few go routines below).
// It is possible that Disconnect has been called in the interim...
// issue 675:we will allow the connection to complete before the Disconnect is allowed to proceed
// as if a Disconnect event occurred immediately after connectionUp(true) completed.
if err := connectionUp(true); err != nil {
DEBUG.Println(CLI, err)
close(c.stop) // Tidy up anything we have already started
close(incomingPubChan)
c.workers.Wait()
c.conn.Close()
c.conn = nil
return false
ERROR.Println(CLI, err)
}

DEBUG.Println(CLI, "client is connected/reconnected")
if c.options.OnConnect != nil {
go c.options.OnConnect(c)
@@ -797,9 +796,13 @@ func (c *client) Publish(topic string, qos byte, retained bool, payload interfac
if publishWaitTimeout == 0 {
publishWaitTimeout = time.Second * 30
}

t := time.NewTimer(publishWaitTimeout)
defer t.Stop()

select {
case c.obound <- &PacketAndToken{p: pub, t: token}:
case <-time.After(publishWaitTimeout):
case <-t.C:
token.setError(errors.New("publish was broken by timeout"))
}
}
7 changes: 6 additions & 1 deletion client_test.go
Original file line number Diff line number Diff line change
@@ -63,7 +63,12 @@ func TestCustomConnectionFunction(t *testing.T) {
client := NewClient(options)

// Try to connect using custom function, wait for 2 seconds, to pass MQTT first message
if token := client.Connect(); token.WaitTimeout(2*time.Second) && token.Error() != nil {
// Note that the token should NOT complete (because a CONNACK is never sent)
token := client.Connect()
if token.WaitTimeout(2 * time.Second) {
t.Fatal("token should not complete") // should be blocked waiting for CONNACK
}
if token.Error() != nil { // Should never have an error
t.Fatalf("%v", token.Error())
}

2 changes: 1 addition & 1 deletion cmd/docker/publisher/dockerfile
Original file line number Diff line number Diff line change
@@ -4,7 +4,7 @@ ENV CGO_ENABLED 0

ADD . /pub_src
WORKDIR /pub_src
RUN go build -gcflags "all=-N -l" -o /pub
RUN go mod tidy && go build -gcflags "all=-N -l" -o /pub

# Final stage
FROM alpine:latest
9 changes: 6 additions & 3 deletions cmd/docker/publisher/go.mod
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
module publisher

go 1.15
go 1.23

require github.com/eclipse/paho.mqtt.golang v1.5.0

require (
github.com/eclipse/paho.mqtt.golang v1.3.5
golang.org/x/net v0.0.0-20201029055024-942e2f445f3c // indirect
github.com/gorilla/websocket v1.5.3 // indirect
golang.org/x/net v0.35.0 // indirect
golang.org/x/sync v0.11.0 // indirect
)
25 changes: 23 additions & 2 deletions cmd/docker/publisher/go.sum
Original file line number Diff line number Diff line change
@@ -1,15 +1,36 @@
github.com/eclipse/paho.mqtt.golang v1.2.1-0.20200918111050-ba85050a1f23 h1:znRijtV5P9m5mmDsy4oesCPlCIPDILTj4wosaZWsTpY=
github.com/eclipse/paho.mqtt.golang v1.2.1-0.20200918111050-ba85050a1f23/go.mod h1:eTzb4gxwwyWpqBUHGQZ4ABAV7+Jgm1PklsYT/eo8Hcc=
github.com/eclipse/paho.mqtt.golang v1.3.5 h1:sWtmgNxYM9P2sP+xEItMozsR3w0cqZFlqnNN1bdl41Y=
github.com/eclipse/paho.mqtt.golang v1.3.5/go.mod h1:eTzb4gxwwyWpqBUHGQZ4ABAV7+Jgm1PklsYT/eo8Hcc=
github.com/eclipse/paho.mqtt.golang v1.4.2 h1:66wOzfUHSSI1zamx7jR6yMEI5EuHnT1G6rNA5PM12m4=
github.com/eclipse/paho.mqtt.golang v1.4.2/go.mod h1:JGt0RsEwEX+Xa/agj90YJ9d9DH2b7upDZMK9HRbFvCA=
github.com/eclipse/paho.mqtt.golang v1.4.3 h1:2kwcUGn8seMUfWndX0hGbvH8r7crgcJguQNCyp70xik=
github.com/eclipse/paho.mqtt.golang v1.4.3/go.mod h1:CSYvoAlsMkhYOXh/oKyxa8EcBci6dVkLCbo5tTC1RIE=
github.com/eclipse/paho.mqtt.golang v1.5.0 h1:EH+bUVJNgttidWFkLLVKaQPGmkTUfQQqjOsyvMGvD6o=
github.com/eclipse/paho.mqtt.golang v1.5.0/go.mod h1:du/2qNQVqJf/Sqs4MEL77kR8QTqANF7XU7Fk0aOTAgk=
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20201029055024-942e2f445f3c h1:rpcgRPA7OvNEOdprt2Wx8/Re2cBTd8NPo/lvo3AyMqk=
golang.org/x/net v0.0.0-20201029055024-942e2f445f3c/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.8.0 h1:Zrh2ngAOFYneWTAIAPethzeaQLuHwhuBkuV6ZiRnUaQ=
golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc=
golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys=
golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE=
golang.org/x/net v0.35.0 h1:T5GQRQb2y08kTAByq9L4/bz8cipCdA8FbRTXewonqY8=
golang.org/x/net v0.35.0/go.mod h1:EglIi67kWsHKlRzzVMUD93VMSWGFOMSZgxFjparz1Qk=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.11.0 h1:GGz8+XQP4FvTTrjZPzNKTMFtSXH80RAzG+5ghFPgK9w=
golang.org/x/sync v0.11.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
2 changes: 1 addition & 1 deletion cmd/docker/subscriber/dockerfile
Original file line number Diff line number Diff line change
@@ -4,7 +4,7 @@ ENV CGO_ENABLED 0

ADD . /sub_src
WORKDIR /sub_src
RUN go build -gcflags "all=-N -l" -o /sub
RUN go mod tidy && go build -gcflags "all=-N -l" -o /sub

# Final stage
FROM alpine:latest
9 changes: 6 additions & 3 deletions cmd/docker/subscriber/go.mod
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
module subscriber

go 1.15
go 1.23

require github.com/eclipse/paho.mqtt.golang v1.5.0

require (
github.com/eclipse/paho.mqtt.golang v1.3.5
golang.org/x/net v0.0.0-20201029055024-942e2f445f3c // indirect
github.com/gorilla/websocket v1.5.3 // indirect
golang.org/x/net v0.35.0 // indirect
golang.org/x/sync v0.11.0 // indirect
)
23 changes: 23 additions & 0 deletions cmd/docker/subscriber/go.sum
Original file line number Diff line number Diff line change
@@ -2,14 +2,37 @@ github.com/eclipse/paho.mqtt.golang v1.2.1-0.20200918111050-ba85050a1f23 h1:znRi
github.com/eclipse/paho.mqtt.golang v1.2.1-0.20200918111050-ba85050a1f23/go.mod h1:eTzb4gxwwyWpqBUHGQZ4ABAV7+Jgm1PklsYT/eo8Hcc=
github.com/eclipse/paho.mqtt.golang v1.3.5 h1:sWtmgNxYM9P2sP+xEItMozsR3w0cqZFlqnNN1bdl41Y=
github.com/eclipse/paho.mqtt.golang v1.3.5/go.mod h1:eTzb4gxwwyWpqBUHGQZ4ABAV7+Jgm1PklsYT/eo8Hcc=
github.com/eclipse/paho.mqtt.golang v1.4.2 h1:66wOzfUHSSI1zamx7jR6yMEI5EuHnT1G6rNA5PM12m4=
github.com/eclipse/paho.mqtt.golang v1.4.2/go.mod h1:JGt0RsEwEX+Xa/agj90YJ9d9DH2b7upDZMK9HRbFvCA=
github.com/eclipse/paho.mqtt.golang v1.4.3 h1:2kwcUGn8seMUfWndX0hGbvH8r7crgcJguQNCyp70xik=
github.com/eclipse/paho.mqtt.golang v1.4.3/go.mod h1:CSYvoAlsMkhYOXh/oKyxa8EcBci6dVkLCbo5tTC1RIE=
github.com/eclipse/paho.mqtt.golang v1.5.0 h1:EH+bUVJNgttidWFkLLVKaQPGmkTUfQQqjOsyvMGvD6o=
github.com/eclipse/paho.mqtt.golang v1.5.0/go.mod h1:du/2qNQVqJf/Sqs4MEL77kR8QTqANF7XU7Fk0aOTAgk=
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20201029055024-942e2f445f3c h1:rpcgRPA7OvNEOdprt2Wx8/Re2cBTd8NPo/lvo3AyMqk=
golang.org/x/net v0.0.0-20201029055024-942e2f445f3c/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.8.0 h1:Zrh2ngAOFYneWTAIAPethzeaQLuHwhuBkuV6ZiRnUaQ=
golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc=
golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys=
golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE=
golang.org/x/net v0.35.0 h1:T5GQRQb2y08kTAByq9L4/bz8cipCdA8FbRTXewonqY8=
golang.org/x/net v0.35.0/go.mod h1:EglIi67kWsHKlRzzVMUD93VMSWGFOMSZgxFjparz1Qk=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.11.0 h1:GGz8+XQP4FvTTrjZPzNKTMFtSXH80RAzG+5ghFPgK9w=
golang.org/x/sync v0.11.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
4 changes: 2 additions & 2 deletions cmd/ssl/main.go
Original file line number Diff line number Diff line change
@@ -48,7 +48,7 @@ import (
"crypto/tls"
"crypto/x509"
"fmt"
"io/ioutil"
"os"
"time"

MQTT "github.com/eclipse/paho.mqtt.golang"
@@ -59,7 +59,7 @@ func NewTLSConfig() *tls.Config {
// Alternatively, manually add CA certificates to
// default openssl CA bundle.
certpool := x509.NewCertPool()
pemCerts, err := ioutil.ReadFile("samplecerts/CAfile.pem")
pemCerts, err := os.ReadFile("samplecerts/CAfile.pem")
if err == nil {
certpool.AppendCertsFromPEM(pemCerts)
}
13 changes: 9 additions & 4 deletions filestore.go
Original file line number Diff line number Diff line change
@@ -19,7 +19,7 @@
package mqtt

import (
"io/ioutil"
"io/fs"
"os"
"path"
"sort"
@@ -159,15 +159,20 @@ func (store *FileStore) Reset() {
func (store *FileStore) all() []string {
var err error
var keys []string
var files fileInfos

if !store.opened {
ERROR.Println(STR, "trying to use file store, but not open")
return nil
}

files, err = ioutil.ReadDir(store.directory)
entries, err := os.ReadDir(store.directory)
chkerr(err)
files := make(fileInfos, 0, len(entries))
for _, entry := range entries {
info, err := entry.Info()
chkerr(err)
files = append(files, info)
}
sort.Sort(files)
for _, f := range files {
DEBUG.Println(STR, "file in All():", f.Name())
@@ -246,7 +251,7 @@ func exists(file string) bool {
return true
}

type fileInfos []os.FileInfo
type fileInfos []fs.FileInfo

func (f fileInfos) Len() int {
return len(f)
4 changes: 1 addition & 3 deletions fvt/docker/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
version: '3.6'
services:
mosquitto:
container_name: mosquitto-test
@@ -13,7 +12,6 @@ services:
source: ./mosquitto.conf
target: /mosquitto/config/mosquitto.conf
read_only: true
volume:
nocopy: true



8 changes: 4 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
module github.com/eclipse/paho.mqtt.golang

go 1.14
go 1.23

require (
github.com/gorilla/websocket v1.4.2
golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
github.com/gorilla/websocket v1.5.3
golang.org/x/net v0.35.0
golang.org/x/sync v0.11.0
)
20 changes: 10 additions & 10 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0 h1:Jcxah/M+oLZ/R4/z5RzfPzGbPXnVDPkEDtf2JnuxN+U=
golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys=
golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE=
golang.org/x/net v0.35.0 h1:T5GQRQb2y08kTAByq9L4/bz8cipCdA8FbRTXewonqY8=
golang.org/x/net v0.35.0/go.mod h1:EglIi67kWsHKlRzzVMUD93VMSWGFOMSZgxFjparz1Qk=
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.11.0 h1:GGz8+XQP4FvTTrjZPzNKTMFtSXH80RAzG+5ghFPgK9w=
golang.org/x/sync v0.11.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
8 changes: 6 additions & 2 deletions netconn.go
Original file line number Diff line number Diff line change
@@ -40,10 +40,14 @@ import (
func openConnection(uri *url.URL, tlsc *tls.Config, timeout time.Duration, headers http.Header, websocketOptions *WebsocketOptions, dialer *net.Dialer) (net.Conn, error) {
switch uri.Scheme {
case "ws":
conn, err := NewWebsocket(uri.String(), nil, timeout, headers, websocketOptions)
dialURI := *uri // #623 - Gorilla Websockets does not accept URL's where uri.User != nil
dialURI.User = nil
conn, err := NewWebsocket(dialURI.String(), nil, timeout, headers, websocketOptions)
return conn, err
case "wss":
conn, err := NewWebsocket(uri.String(), tlsc, timeout, headers, websocketOptions)
dialURI := *uri // #623 - Gorilla Websockets does not accept URL's where uri.User != nil
dialURI.User = nil
conn, err := NewWebsocket(dialURI.String(), tlsc, timeout, headers, websocketOptions)
return conn, err
case "mqtt", "tcp":
allProxy := os.Getenv("all_proxy")
15 changes: 15 additions & 0 deletions options_reader.go
Original file line number Diff line number Diff line change
@@ -30,6 +30,21 @@ type ClientOptionsReader struct {
options *ClientOptions
}

// NewOptionsReader creates a ClientOptionsReader, this should only be used for mocking purposes.
//
// An example implementation:
//
// func (c *mqttClientMock) OptionsReader() mqtt.ClientOptionsReader {
// opts := mqtt.NewClientOptions()
// opts.UserName = "TestUserName"
// return mqtt.NewOptionsReader(opts)
// }
func NewOptionsReader(o *ClientOptions) ClientOptionsReader {
return ClientOptionsReader{
options: o,
}
}

// Servers returns a slice of the servers defined in the clientoptions
func (r *ClientOptionsReader) Servers() []*url.URL {
s := make([]*url.URL, len(r.options.Servers))
8 changes: 4 additions & 4 deletions ping.go
Original file line number Diff line number Diff line change
@@ -32,16 +32,16 @@ import (
func keepalive(c *client, conn io.Writer) {
defer c.workers.Done()
DEBUG.Println(PNG, "keepalive starting")
var checkInterval int64
var checkInterval time.Duration
var pingSent time.Time

if c.options.KeepAlive > 10 {
checkInterval = 5
checkInterval = 5 * time.Second
} else {
checkInterval = c.options.KeepAlive / 2
checkInterval = time.Duration(c.options.KeepAlive) * time.Second / 4
}

intervalTicker := time.NewTicker(time.Duration(checkInterval * int64(time.Second)))
intervalTicker := time.NewTicker(checkInterval)
defer intervalTicker.Stop()

for {
18 changes: 18 additions & 0 deletions token.go
Original file line number Diff line number Diff line change
@@ -17,6 +17,7 @@
package mqtt

import (
"errors"
"sync"
"time"

@@ -202,3 +203,20 @@ type UnsubscribeToken struct {
type DisconnectToken struct {
baseToken
}

// TimedOut is the error returned by WaitTimeout when the timeout expires
var TimedOut = errors.New("context canceled")

// WaitTokenTimeout is a utility function used to simplify the use of token.WaitTimeout
// token.WaitTimeout may return `false` due to time out but t.Error() still results
// in nil.
// `if t := client.X(); t.WaitTimeout(time.Second) && t.Error() != nil {` may evaluate
// to false even if the operation fails.
// It is important to note that if TimedOut is returned, then the operation may still be running
// and could eventually complete successfully.
func WaitTokenTimeout(t Token, d time.Duration) error {
if !t.WaitTimeout(d) {
return TimedOut
}
return t.Error()
}
18 changes: 18 additions & 0 deletions token_test.go
Original file line number Diff line number Diff line change
@@ -40,3 +40,21 @@ func TestWaitTimeout(t *testing.T) {
t.Fatal("Should have succeeded")
}
}

func TestWaitTokenTimeout(t *testing.T) {
b := baseToken{}

if !errors.Is(WaitTokenTimeout(&b, time.Second), TimedOut) {
t.Fatal("Should have failed")
}

// Now let's confirm that WaitTimeout returns correct error
b = baseToken{complete: make(chan struct{})}
testError := errors.New("test")
go func(bt *baseToken) {
bt.setError(testError)
}(&b)
if !errors.Is(WaitTokenTimeout(&b, 5*time.Second), testError) {
t.Fatal("Unexpected error received")
}
}