From acbfbdec81644ab8239269a110e0107ff219ac7f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Paul=20Gro=C3=9Fmann?= Date: Mon, 1 Jun 2026 14:19:26 +0200 Subject: [PATCH 1/2] feat: added test for pullJobChan and simplified before each in PullJob tests --- pkg/pubsub/subscriber_test.go | 47 +++++++++++++---------------------- 1 file changed, 17 insertions(+), 30 deletions(-) diff --git a/pkg/pubsub/subscriber_test.go b/pkg/pubsub/subscriber_test.go index 3f303f1..a78e7e6 100644 --- a/pkg/pubsub/subscriber_test.go +++ b/pkg/pubsub/subscriber_test.go @@ -94,28 +94,15 @@ var _ = Describe("Pull messages", func() { var _ = Describe("PullJob", func() { BeforeEach(func(ctx context.Context) { - // making sure everything is empty, and acking everything, stopping when len = 0 - subscriber := pubsub.NewSubscriber(topicId, subscriptionId, pubsub.WithHTTPRoundTripper(rt), pubsub.WithHost(environment)) - - Eventually(func(g Gomega) bool { - msgs, err := subscriber.Pull(ctx, pubsub.WithMaxMessages(13)) - g.Expect(err).ToNot(HaveOccurred()) - - if len(msgs) == 0 { - return true - } - - err = subscriber.Ack(ctx, msgs.GetAckIDs()) - g.Expect(err).ToNot(HaveOccurred()) - - return false - }).WithTimeout(10 * time.Second).WithPolling(500 * time.Millisecond).Should(BeTrue()) + publisher := pubsub.NewPublisher(topicId, pubsub.WithHTTPRoundTripper(rt), pubsub.WithHost(environment)) + // making sure everything is empty + err := publisher.Purge(ctx) + Expect(err).ToNot(HaveOccurred()) // publishing test messages - publisher := pubsub.NewPublisher(topicId, pubsub.WithHTTPRoundTripper(rt), pubsub.WithHost(environment)) - messagesToPublish := pubsub.StringsToBase64("testMessage", "testMessage2") + messagesToPublish := pubsub.StringsToBase64("testMessage") - _, err := publisher.Publish(ctx, messagesToPublish) + _, err = publisher.Publish(ctx, messagesToPublish) Expect(err).ToNot(HaveOccurred()) }) @@ -123,23 +110,23 @@ var _ = Describe("PullJob", func() { It("should receive a message from channel", func(ctx context.Context) { subscriber := pubsub.NewSubscriber(topicId, subscriptionId, pubsub.WithHTTPRoundTripper(rt), pubsub.WithHost(environment)) - var receivedMessages pubsub.PullMessages - Eventually(func(g Gomega) { - msgs, err := subscriber.Pull(ctx, pubsub.WithMaxMessages(1)) - g.Expect(err).ToNot(HaveOccurred()) - if len(msgs) > 0 { - receivedMessages = msgs - } - g.Expect(receivedMessages).ToNot(BeEmpty()) - }).WithContext(ctx).Should(Succeed()) + ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + jobChan, err := subscriber.PullJobChan(ctx, pubsub.WithInterval(100*time.Millisecond)) + Expect(err).ToNot(HaveOccurred()) + + receivedMessages := <-jobChan Expect(receivedMessages).To(HaveLen(1)) decodedStrings, err := pubsub.Base64ToStrings(string(receivedMessages[0].Data)) Expect(err).ToNot(HaveOccurred()) - Expect(decodedStrings[0]).To(Or(Equal("testMessage"), Equal("testMessage2"))) + Expect(decodedStrings[0]).To(Equal("testMessage")) err = subscriber.Ack(ctx, receivedMessages.GetAckIDs()) Expect(err).ToNot(HaveOccurred()) + + cancel() + subscriber.Wait() }) }) @@ -156,7 +143,7 @@ var _ = Describe("PullJob", func() { Expect(messages).To(HaveLen(1)) decoded, err := pubsub.Base64ToStrings(string(messages[0].Data)) Expect(err).ToNot(HaveOccurred()) - Expect(decoded[0]).To(Or(Equal("testMessage"), Equal("testMessage2"))) + Expect(decoded[0]).To(Equal("testMessage")) err = subscriber.Ack(ctx, messages.GetAckIDs()) Expect(err).ToNot(HaveOccurred()) callbackInvoked.Store(true) From 0006590a51f5fbeee939a25451097e1a2e53dbeb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Paul=20Gro=C3=9Fmann?= Date: Mon, 1 Jun 2026 14:38:54 +0200 Subject: [PATCH 2/2] refactor: made reading from PullJobChan safer by using gomega assertion --- pkg/pubsub/subscriber_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/pubsub/subscriber_test.go b/pkg/pubsub/subscriber_test.go index a78e7e6..7ed8412 100644 --- a/pkg/pubsub/subscriber_test.go +++ b/pkg/pubsub/subscriber_test.go @@ -116,7 +116,8 @@ var _ = Describe("PullJob", func() { jobChan, err := subscriber.PullJobChan(ctx, pubsub.WithInterval(100*time.Millisecond)) Expect(err).ToNot(HaveOccurred()) - receivedMessages := <-jobChan + var receivedMessages pubsub.PullMessages + Eventually(jobChan, "5s").Should(Receive(&receivedMessages)) Expect(receivedMessages).To(HaveLen(1)) decodedStrings, err := pubsub.Base64ToStrings(string(receivedMessages[0].Data))