diff --git a/pkg/pubsub/subscriber_test.go b/pkg/pubsub/subscriber_test.go index 3f303f1..7ed8412 100644 --- a/pkg/pubsub/subscriber_test.go +++ b/pkg/pubsub/subscriber_test.go @@ -94,28 +94,15 @@ var _ = Describe("Pull messages", func() { var _ = Describe("PullJob", func() { BeforeEach(func(ctx context.Context) { - // making sure everything is empty, and acking everything, stopping when len = 0 - subscriber := pubsub.NewSubscriber(topicId, subscriptionId, pubsub.WithHTTPRoundTripper(rt), pubsub.WithHost(environment)) - - Eventually(func(g Gomega) bool { - msgs, err := subscriber.Pull(ctx, pubsub.WithMaxMessages(13)) - g.Expect(err).ToNot(HaveOccurred()) - - if len(msgs) == 0 { - return true - } - - err = subscriber.Ack(ctx, msgs.GetAckIDs()) - g.Expect(err).ToNot(HaveOccurred()) - - return false - }).WithTimeout(10 * time.Second).WithPolling(500 * time.Millisecond).Should(BeTrue()) + publisher := pubsub.NewPublisher(topicId, pubsub.WithHTTPRoundTripper(rt), pubsub.WithHost(environment)) + // making sure everything is empty + err := publisher.Purge(ctx) + Expect(err).ToNot(HaveOccurred()) // publishing test messages - publisher := pubsub.NewPublisher(topicId, pubsub.WithHTTPRoundTripper(rt), pubsub.WithHost(environment)) - messagesToPublish := pubsub.StringsToBase64("testMessage", "testMessage2") + messagesToPublish := pubsub.StringsToBase64("testMessage") - _, err := publisher.Publish(ctx, messagesToPublish) + _, err = publisher.Publish(ctx, messagesToPublish) Expect(err).ToNot(HaveOccurred()) }) @@ -123,23 +110,24 @@ var _ = Describe("PullJob", func() { It("should receive a message from channel", func(ctx context.Context) { subscriber := pubsub.NewSubscriber(topicId, subscriptionId, pubsub.WithHTTPRoundTripper(rt), pubsub.WithHost(environment)) - var receivedMessages pubsub.PullMessages - Eventually(func(g Gomega) { - msgs, err := subscriber.Pull(ctx, pubsub.WithMaxMessages(1)) - g.Expect(err).ToNot(HaveOccurred()) - if len(msgs) > 0 { - receivedMessages = msgs - } - g.Expect(receivedMessages).ToNot(BeEmpty()) - }).WithContext(ctx).Should(Succeed()) + ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + jobChan, err := subscriber.PullJobChan(ctx, pubsub.WithInterval(100*time.Millisecond)) + Expect(err).ToNot(HaveOccurred()) + + var receivedMessages pubsub.PullMessages + Eventually(jobChan, "5s").Should(Receive(&receivedMessages)) Expect(receivedMessages).To(HaveLen(1)) decodedStrings, err := pubsub.Base64ToStrings(string(receivedMessages[0].Data)) Expect(err).ToNot(HaveOccurred()) - Expect(decodedStrings[0]).To(Or(Equal("testMessage"), Equal("testMessage2"))) + Expect(decodedStrings[0]).To(Equal("testMessage")) err = subscriber.Ack(ctx, receivedMessages.GetAckIDs()) Expect(err).ToNot(HaveOccurred()) + + cancel() + subscriber.Wait() }) }) @@ -156,7 +144,7 @@ var _ = Describe("PullJob", func() { Expect(messages).To(HaveLen(1)) decoded, err := pubsub.Base64ToStrings(string(messages[0].Data)) Expect(err).ToNot(HaveOccurred()) - Expect(decoded[0]).To(Or(Equal("testMessage"), Equal("testMessage2"))) + Expect(decoded[0]).To(Equal("testMessage")) err = subscriber.Ack(ctx, messages.GetAckIDs()) Expect(err).ToNot(HaveOccurred()) callbackInvoked.Store(true)