how to use "in" with multiple unique_together fileds

I want to get many result from multiple unique_together fileds.
now i can use

ContentType.objects.filter(Q(app_label=‘auth’, model=‘user’) | Q(app_label=‘pxuser’, model=‘user’))

it got sql

(‘SELECT django_content_type.id, django_content_type.app_label, django_content_type.model FROM django_content_type WHERE ((django_content_type.app_label = %s AND django_content_type.model = %s) OR (django_cont ent_type.app_label = %s AND django_content_type.model = %s))’, (‘auth’, ‘user’, ‘pxuser’, ‘user’))

is there any orm way to got sql like

select * from django_content_type where (app_label, model) in ((‘auth’, ‘group’), (‘pxuser’, ‘user’));

You’re looking to do perform your WHERE clause on a ROW. Or at least that’s the name for it in PostgreSQL. I know this is possible for PostgreSQL, though I haven’t tried it for other databases. The solution is a bit of a hack, but it does what you want.

from django.db.models import Func, Value

app_model_pairs = [('auth', 'group'), ('pxuser', 'user')]
ContentType.objects.annotate(
    app_model_name=Func('app_label', 'model', function="ROW"),
).filter(app_model_name__in=[Value(*pair) for pair in app_model_pairs])
1 Like

mysql not support ROW.

in mysql the sql will get right result

select * from django_content_type where (app_label, model) in ((‘auth’, ‘group’), (‘pxuser’, ‘user’));

I played around with mysql a bit to find an answer for you. Turns out MySQL doesn’t like ROW constructors in the returned SELECT clause. Everything else, works well from what I can see. You’ll need to use a Subquery to avoid returning the row constructor:

subquery = ContentType.objects.annotate(
    app_model_name=Func('app_label', 'model', function="ROW"),
).filter(
    pk=OuterRef('pk'),
    app_model_name__in=[Value(pair) for pair in app_model_pairs],
)
ContentType.objects.filter(Exists(subquery.values('pk')))

this is ok for ContentType, but in my models(unique_together has diffrent field type)

class TestModel(model.BaseModel):
     field1 = models.CharField(max_length=100)
     field2 = models.IntegerField()
     class Meta:
         unique_together = (('field1', 'field2'),)
subquery = TestModel.objects.annotate(
    test=Func('field1', 'field2', function="ROW"),
).filter(
    pk=OuterRef('pk'),
    test__in=[Value(('a', 1), ('b', 2))],
)
TestModel.objects.filter(Exists(subquery.values('pk')))

got
" django.core.exceptions.FieldError: Expression contains mixed types: CharField, IntegerField. You must set output_field."
in Func._resolve_output_field()

set output_field=models.CharField() to Func got expected result

subquery = TestModel.objects.annotate(
    test=Func('field1', 'field2', function="ROW", output_field=models.CharField()),
).filter(
    pk=OuterRef('pk'),
    test__in=[Value(('a', 1), ('b', 2))],
)
TestModel.objects.filter(Exists(subquery.values('pk')))

I’m glad you were able to figure it out.

this code is ok for mysql and postgresql. It not use subquery. Is there any way I can set annotation_mask in queryset(function annotate or other)?

many_queryset = queryset.annotate(**{
    temp_name: Func(*field_names, function="ROW", output_field=models.CharField())
}).filter(**{
    "%s__in" % temp_name: [Value(pair) for pair in field_values]
})
many_queryset.query.set_annotation_mask(())
list(many_queryset)

this code will support both mysql,postgresql and oracle and it donot use subquery

import operator
from functools import reduce

from django.contrib.contenttypes.models import ContentType
from django.db import connections, models
from django.db.models import Value, Func, Q
from django.utils.crypto import get_random_string


class TupleValue(Value):

    def __init__(self, value):
        if isinstance(value, (list, tuple)):
            value = tuple([TupleValue(v) for v in value])
        super().__init__(value)

    def as_sql(self, compiler, connection):
        if self.value and isinstance(self.value, (list, tuple)):
            sqls = list()
            params = list()
            for val in self.value:
                if isinstance(val, Value):
                    sql, param = val.as_sql(compiler, connection)
                    sqls.append(sql)
                    params.extend(param)
            return '(' + ', '.join(sqls) + ')', params
        else:
            return super().as_sql(compiler, connection)


def get_many(model_cls, field_names, field_values):
    queryset = model_cls.objects
    many_queryset = None
    if len(field_names) > 1 and len(field_values) > 1:
        temp_name = "django_cool_temp_field_name_" + get_random_string(8)
        if connections[queryset.db].vendor in ("oracle", "mysql", "postgresql"):
            many_queryset = queryset.annotate(**{
                temp_name: Func(*field_names, function="", output_field=models.CharField())
            }).filter(**{
                "%s__in" % temp_name: [TupleValue(value) for value in field_values]
            })
            many_queryset.query.set_annotation_mask(())
    if many_queryset is None:
        many_queryset = queryset.filter(
            reduce(operator.or_, [Q(**dict(zip(field_names, field_value))) for field_value in field_values])
        )
    return list(many_queryset)


get_many(ContentType, field_names=('app_label', 'model'), field_values=[('auth', 'group'), ('pxuser', 'user')])