KEDA Service Bus: all messages are processed by only some pods of a KEDA ScaledJob
Question
I am using Azure Kubernetes Service (AKS) with a KEDA "ScaledJob" for long-running jobs. An Azure Service Bus queue trigger starts the jobs automatically: when I add messages to the queue, KEDA triggers the job and creates a node/pod as configured. However, all the messages are picked up and processed by only some of the pods. I expect each scaled-up pod to process a single message and then terminate.
Following is my YAML file:
apiVersion: keda.sh/v1alpha1
kind: ScaledJob
metadata:
  name: {{ .Chart.Name }}
spec:
  jobTargetRef:
    backoffLimit: 4
    parallelism: 1
    completions: 1
    activeDeadlineSeconds: 300
    template:
      spec:
        imagePullSecrets:
          - name: {{ .Values.image.imagePullSecrets }}
        terminationGracePeriodSeconds: 30
        dnsPolicy: ClusterFirst
        volumes:
          - name: azure
            azureFile:
              shareName: sharenameone
              secretName: secret-sharenameone
              readOnly: true
          - name: one-storage
            emptyDir: {}
          - name: "logging-volume-file"
            persistentVolumeClaim:
              claimName: "azure-file-logging"
        initContainers:
          - name: test-java-init
            image: {{ .Values.global.imageRegistryURI }}/{{ .Values.image.javaInitImage.name}}:{{ .Values.image.javaInitImage.tag }}
            imagePullPolicy: {{ .Values.image.pullPolicy }}
            securityContext:
              readOnlyRootFilesystem: true
            resources:
              requests:
                cpu: 100m
                memory: 300Mi
              limits:
                cpu: 200m
                memory: 400Mi
            volumeMounts:
              - name: azure
                mountPath: /mnt/azure
              - name: one-storage
                mountPath: /certs
        containers:
          - name: {{ .Chart.Name }}
            image: {{ .Values.global.imageRegistryURI }}/tests/{{ .Chart.Name }}:{{ .Values.version }}
            imagePullPolicy: {{ .Values.image.pullPolicy }}
            env:
              {{- include "chart.envVars" . | nindent 14 }}
              - name: JAVA_OPTS
                value: >-
                  {{ .Values.application.javaOpts }}
              - name: application_name
                value: "test_application"
              - name: queueName
                value: "test-queue-name"
              - name: servicebusconnstrenv
                valueFrom:
                  secretKeyRef:
                    name: secrets-service-bus
                    key: service_bus_conn_str
            volumeMounts:
              - name: cert-storage
                mountPath: /certs
              - name: "logging-volume-azure-file"
                mountPath: "/mnt/logging"
            resources:
              {{- toYaml .Values.resources | nindent 14 }}
  pollingInterval: 30
  maxReplicaCount: 5
  successfulJobsHistoryLimit: 5
  failedJobsHistoryLimit: 20
  triggers:
    - type: azure-servicebus
      metadata:
        queueName: "test-queue-name"
        connectionFromEnv: servicebusconnstrenv
        messageCount: "1"
And this is my Azure Function listener:
@FunctionName("TestServiceBusTrigger")
public void TestServiceBusTriggerHandler(
        @ServiceBusQueueTrigger(
                name = "msg",
                queueName = "%TEST_QUEUE_NAME%",
                connection = "ServiceBusConnectionString")
        final String inputMessage,
        final ExecutionContext context) {
    final java.util.logging.Logger contextLogger = context.getLogger();
    System.setProperty("javax.net.ssl.trustStore", "/certs/cacerts");
    try {
        // all the processing goes here
    } catch (Exception e) {
        // Exception handling
    }
}
What configurations need to be added so that each scaled-up pod processes a single message and terminates?
Answer 1
Score: 1
This is not how Azure Functions was designed to work, nor how KEDA is generally meant to be used. Ideally, a container that is already running processes as many messages as it can once it has been provisioned.
That said, if your scenario still requires it, you could write a simple script that uses the Azure Service Bus SDK directly to fetch just one message, process it, and terminate.
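A minimal sketch of that "fetch one message, process it, terminate" flow might look like the following. To keep it runnable without Azure, it does not call the SDK at all: `QueueReceiver`, `InMemoryReceiver`, and `SingleMessageJob` are hypothetical names invented for illustration. In a real job, `QueueReceiver` would presumably be backed by the Service Bus SDK's receiver client in peek-lock mode (in the Java SDK, for example, `ServiceBusReceiverClient.receiveMessages`, `complete`, and `abandon`), and the container would run this program instead of the Functions host.

```java
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Optional;
import java.util.function.Consumer;

/** Hypothetical abstraction over a peek-lock queue receiver (not an SDK type). */
interface QueueReceiver {
    Optional<String> receiveOne(Duration maxWait); // returns at most one message
    void complete(String message);                 // settle: remove from the queue
    void abandon(String message);                  // release the lock for redelivery
}

/** In-memory stand-in so the sketch runs without Azure (assumption, for demo only). */
class InMemoryReceiver implements QueueReceiver {
    final Deque<String> queue = new ArrayDeque<>();

    @Override
    public Optional<String> receiveOne(Duration maxWait) {
        // A real receiver would block up to maxWait; here we just poll once.
        return Optional.ofNullable(queue.pollFirst());
    }

    @Override
    public void complete(String message) {
        // The message was already taken off the deque; nothing left to do.
    }

    @Override
    public void abandon(String message) {
        // Put the message back at the front so another job can pick it up.
        queue.addFirst(message);
    }
}

class SingleMessageJob {
    /**
     * Receives at most one message, hands it to the handler, settles it,
     * and returns a process exit code (0 = success or empty queue, 1 = failure).
     */
    static int runOnce(QueueReceiver receiver, Consumer<String> handler) {
        Optional<String> message = receiver.receiveOne(Duration.ofSeconds(30));
        if (!message.isPresent()) {
            return 0; // queue was empty: exit cleanly
        }
        try {
            handler.accept(message.get());
            receiver.complete(message.get());
            return 0;
        } catch (RuntimeException e) {
            receiver.abandon(message.get());
            return 1; // non-zero exit lets the Job's backoffLimit handle retries
        }
    }

    public static void main(String[] args) {
        InMemoryReceiver receiver = new InMemoryReceiver();
        receiver.queue.add("order-1");
        receiver.queue.add("order-2");
        // Only "order-1" is processed; "order-2" stays queued for the next job.
        int exitCode = runOnce(receiver, body -> System.out.println("processing " + body));
        System.exit(exitCode);
    }
}
```

Because the process exits after one message, the remaining messages stay in the queue for the other jobs that KEDA starts, which is the one-message-per-pod behavior the question asks for.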